diff --git a/src/api/http/api.go b/src/api/http/api.go
index f0286b513e..b1856df7c4 100644
--- a/src/api/http/api.go
+++ b/src/api/http/api.go
@@ -319,8 +319,7 @@ func (self *HttpServer) createDatabase(w libhttp.ResponseWriter, r *libhttp.Requ
 		w.Write([]byte(err.Error()))
 		return
 	}
-	apiKey := r.URL.Query().Get("api_key")
-	err = self.coordinator.CreateDatabase(createRequest.Name, createRequest.ApiKey, apiKey)
+	err = self.coordinator.CreateDatabase(createRequest.Name)
 	if err != nil {
 		w.WriteHeader(libhttp.StatusBadRequest)
 		w.Write([]byte(err.Error()))
diff --git a/src/api/http/api_test.go b/src/api/http/api_test.go
index 4784ec22c7..2bf1aaf32b 100644
--- a/src/api/http/api_test.go
+++ b/src/api/http/api_test.go
@@ -88,12 +88,10 @@ func (self *MockEngine) RunQuery(_ string, query string, yield func(*protocol.Se
 }
 
 type MockCoordinator struct {
-	series           []*protocol.Series
-	db               string
-	droppedDb        string
-	initialApiKey    string
-	requestingApiKey string
-	users            map[string]*coordinator.User
+	series    []*protocol.Series
+	db        string
+	droppedDb string
+	users     map[string]*coordinator.User
 }
 
 func (self *MockCoordinator) DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error {
@@ -105,10 +103,8 @@ func (self *MockCoordinator) WriteSeriesData(db string, series *protocol.Series)
 	return nil
 }
 
-func (self *MockCoordinator) CreateDatabase(db, initialApiKey, requestingApiKey string) error {
+func (self *MockCoordinator) CreateDatabase(db string) error {
 	self.db = db
-	self.initialApiKey = initialApiKey
-	self.requestingApiKey = requestingApiKey
 	return nil
 }
 
@@ -320,8 +316,6 @@ func (self *ApiSuite) TestCreateDatabase(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(resp.StatusCode, Equals, libhttp.StatusCreated)
 	c.Assert(self.coordinator.db, Equals, "foo")
-	c.Assert(self.coordinator.initialApiKey, Equals, "bar")
-	c.Assert(self.coordinator.requestingApiKey, Equals, "asdf")
 }
 
 func (self *ApiSuite) TestDropDatabase(c *C) {
diff --git a/src/coordinator/cluster_configuration.go b/src/coordinator/cluster_configuration.go
index 8ec7d4a026..55144290d7 100644
--- a/src/coordinator/cluster_configuration.go
+++ b/src/coordinator/cluster_configuration.go
@@ -6,92 +6,43 @@ import (
 )
 
 type ClusterConfiguration struct {
-	MaxRingLocation           int64
-	nextDatabaseId            int
-	createDatabaseLock        sync.RWMutex
-	databaseNames             map[string]bool
-	nextDatabaseIdLock        sync.Mutex
-	RingLocationToServers     map[int64][]string
-	ringLocationToServersLock sync.RWMutex
-	ReadApiKeys               map[string]bool
-	readApiKeysLock           sync.RWMutex
-	WriteApiKeys              map[string]bool
-	writeApiKeysLock          sync.RWMutex
-	usersLock                 sync.RWMutex
-	clusterAdmins             map[string]*clusterAdmin
-	dbUsers                   map[string]map[string]*dbUser
+	createDatabaseLock      sync.RWMutex
+	databaseNames           map[string]bool
+	usersLock               sync.RWMutex
+	clusterAdmins           map[string]*clusterAdmin
+	dbUsers                 map[string]map[string]*dbUser
+	activeServerConfig      []*ClusterServer
+	potentialServerConfig   []*ClusterServer
+	rebalancingServerConfig []*ClusterServer
 }
 
-type ApiKeyType int
+const NUMBER_OF_RING_LOCATIONS = 10000
 
-const (
-	ReadKey ApiKeyType = iota
-	WriteKey
-)
-
-func NewClusterConfiguration(maxRingLocation int64) *ClusterConfiguration {
+func NewClusterConfiguration() *ClusterConfiguration {
 	return &ClusterConfiguration{
-		MaxRingLocation:       maxRingLocation,
-		databaseNames:         make(map[string]bool),
-		RingLocationToServers: make(map[int64][]string),
-		ReadApiKeys:           make(map[string]bool),
-		WriteApiKeys:          make(map[string]bool),
-		clusterAdmins:         make(map[string]*clusterAdmin),
-		dbUsers:               make(map[string]map[string]*dbUser),
+		databaseNames:           make(map[string]bool),
+		clusterAdmins:           make(map[string]*clusterAdmin),
+		dbUsers:                 make(map[string]map[string]*dbUser),
+		activeServerConfig:      make([]*ClusterServer, 0),
+		potentialServerConfig:   make([]*ClusterServer, 0),
+		rebalancingServerConfig: make([]*ClusterServer, 0),
 	}
 }
 
-func (self *ClusterConfiguration) AddRingLocationToServer(hostnameAndPort string, ringLocation int64) {
-	self.ringLocationToServersLock.Lock()
-	defer self.ringLocationToServersLock.Unlock()
-	self.RingLocationToServers[ringLocation] = append(self.RingLocationToServers[ringLocation], hostnameAndPort)
+func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
+	self.potentialServerConfig = append(self.potentialServerConfig, server)
 }
 
-func (self *ClusterConfiguration) RemoveRingLocationFromServer(hostnameAndPort string, ringLocation int64) {
-	self.ringLocationToServersLock.Lock()
-	defer self.ringLocationToServersLock.Unlock()
-	oldLocations := self.RingLocationToServers[ringLocation]
-	newLocations := make([]string, 0, len(oldLocations))
-	for _, l := range oldLocations {
-		if l != hostnameAndPort {
-			newLocations = append(newLocations, l)
-		}
-	}
-	self.RingLocationToServers[ringLocation] = newLocations
+func (self *ClusterConfiguration) RebalanceBasedOnPotentialConfig() {
 }
 
-func (self *ClusterConfiguration) AddApiKey(database, key string, apiKeyType ApiKeyType) {
-	if apiKeyType == ReadKey {
-		self.readApiKeysLock.Lock()
-		defer self.readApiKeysLock.Unlock()
-		self.ReadApiKeys[database+key] = true
-	} else {
-		self.writeApiKeysLock.Lock()
-		defer self.writeApiKeysLock.Unlock()
-		self.WriteApiKeys[database+key] = true
-	}
+func (self *ClusterConfiguration) UpdatePotentialServerOrder(serverIds []uint32) {
 }
 
-func (self *ClusterConfiguration) DeleteApiKey(database, key string) {
-	self.readApiKeysLock.Lock()
-	self.writeApiKeysLock.Lock()
-	defer self.readApiKeysLock.Unlock()
-	defer self.writeApiKeysLock.Unlock()
-	fullKey := database + key
-	delete(self.ReadApiKeys, fullKey)
-	delete(self.WriteApiKeys, fullKey)
-}
-
-func (self *ClusterConfiguration) IsValidReadKey(database, key string) bool {
-	self.readApiKeysLock.RLock()
-	defer self.readApiKeysLock.RUnlock()
-	return self.ReadApiKeys[database+key]
-}
-
-func (self *ClusterConfiguration) IsValidWriteKey(database, key string) bool {
-	self.writeApiKeysLock.RLock()
-	defer self.writeApiKeysLock.RUnlock()
-	return self.WriteApiKeys[database+key]
+func (self *ClusterConfiguration) MoveRebalancingToActive() {
+	self.activeServerConfig = self.rebalancingServerConfig
+	self.rebalancingServerConfig = make([]*ClusterServer, 0)
+	self.potentialServerConfig = self.activeServerConfig
 }
 
 func (self *ClusterConfiguration) GetDatabases() map[string]bool {
@@ -128,20 +79,6 @@ func (self *ClusterConfiguration) DropDatabase(name string) error {
 	return nil
 }
 
-func (self *ClusterConfiguration) NextDatabaseId() string {
-	self.nextDatabaseIdLock.Lock()
-	self.nextDatabaseId += 1
-	id := self.nextDatabaseId
-	self.nextDatabaseIdLock.Unlock()
-	return fmt.Sprintf("%d", id)
-}
-
-func (self *ClusterConfiguration) CurrentDatabaseId() string {
-	self.nextDatabaseIdLock.Lock()
-	defer self.nextDatabaseIdLock.Unlock()
-	return fmt.Sprintf("%d", self.nextDatabaseId)
-}
-
 func (self *ClusterConfiguration) SaveDbUser(u *dbUser) {
 	self.usersLock.Lock()
 	defer self.usersLock.Unlock()
diff --git a/src/coordinator/cluster_server.go b/src/coordinator/cluster_server.go
index f2f4440fd7..724f57fe26 100644
--- a/src/coordinator/cluster_server.go
+++ b/src/coordinator/cluster_server.go
@@ -1 +1,15 @@
 package coordinator
+
+type ClusterServer struct {
+	Id       uint32
+	RaftName string
+	State    ServerState
+}
+
+type ServerState int
+
+const (
+	LoadingRingData ServerState = iota
+	DeletingOldData
+	Running
+)
diff --git a/src/coordinator/command.go b/src/coordinator/command.go
index b617100790..80239315f1 100644
--- a/src/coordinator/command.go
+++ b/src/coordinator/command.go
@@ -4,114 +4,6 @@ import (
 	"github.com/goraft/raft"
 )
 
-type AddServerToLocationCommand struct {
-	Host     string `json:"host"`
-	Location int64  `json:"location"`
-}
-
-func NewAddServerToLocationCommand(host string, location int64) *AddServerToLocationCommand {
-	return &AddServerToLocationCommand{
-		Host:     host,
-		Location: location,
-	}
-}
-
-func (c *AddServerToLocationCommand) CommandName() string {
-	return "add"
-}
-
-func (c *AddServerToLocationCommand) Apply(server raft.Server) (interface{}, error) {
-	config := server.Context().(*ClusterConfiguration)
-	config.AddRingLocationToServer(c.Host, c.Location)
-	return nil, nil
-}
-
-type RemoveServerFromLocationCommand struct {
-	Host     string `json:"host"`
-	Location int64  `json:"location"`
-}
-
-func NewRemoveServerFromLocationCommand(host string, location int64) *RemoveServerFromLocationCommand {
-	return &RemoveServerFromLocationCommand{
-		Host:     host,
-		Location: location,
-	}
-}
-
-func (c *RemoveServerFromLocationCommand) CommandName() string {
-	return "remove"
-}
-
-func (c *RemoveServerFromLocationCommand) Apply(server raft.Server) (interface{}, error) {
-	config := server.Context().(*ClusterConfiguration)
-	config.RemoveRingLocationFromServer(c.Host, c.Location)
-	return nil, nil
-}
-
-type AddApiKeyCommand struct {
-	Database string     `json:"database"`
-	ApiKey   string     `json:"api_key"`
-	KeyType  ApiKeyType `json:"key_type"`
-}
-
-func NewAddApikeyCommand(db, key string, keyType ApiKeyType) *AddApiKeyCommand {
-	return &AddApiKeyCommand{
-		Database: db,
-		ApiKey:   key,
-		KeyType:  keyType,
-	}
-}
-
-func (c *AddApiKeyCommand) CommandName() string {
-	return "add_key"
-}
-
-func (c *AddApiKeyCommand) Apply(server raft.Server) (interface{}, error) {
-	config := server.Context().(*ClusterConfiguration)
-	config.AddApiKey(c.Database, c.ApiKey, c.KeyType)
-	return nil, nil
-}
-
-type RemoveApiKeyCommand struct {
-	Database string `json:"database"`
-	ApiKey   string `json:"api_key"`
-}
-
-func NewRemoveApiKeyCommand(db, key string) *RemoveApiKeyCommand {
-	return &RemoveApiKeyCommand{
-		Database: db,
-		ApiKey:   key,
-	}
-}
-
-func (c *RemoveApiKeyCommand) CommandName() string {
-	return "remove_key"
-}
-
-func (c *RemoveApiKeyCommand) Apply(server raft.Server) (interface{}, error) {
-	config := server.Context().(*ClusterConfiguration)
-	config.DeleteApiKey(c.Database, c.ApiKey)
-	return nil, nil
-}
-
-type NextDatabaseIdCommand struct {
-	LastId int `json:"last_id"`
-}
-
-func NewNextDatabaseIdCommand(lastId int) *NextDatabaseIdCommand {
-	return &NextDatabaseIdCommand{lastId}
-}
-
-func (c *NextDatabaseIdCommand) CommandName() string {
-	return "next_db"
-}
-
-func (c *NextDatabaseIdCommand) Apply(server raft.Server) (interface{}, error) {
-	config := server.Context().(*ClusterConfiguration)
-	id := config.NextDatabaseId()
-	return id, nil
-}
-
 type DropDatabaseCommand struct {
 	Name string `json:"name"`
 }
diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go
index 82b5310555..e8f2825360 100644
--- a/src/coordinator/coordinator.go
+++ b/src/coordinator/coordinator.go
@@ -50,17 +50,12 @@ func (self *CoordinatorImpl) WriteSeriesData(db string, series *protocol.Series)
 	return self.datastore.WriteSeriesData(db, series)
 }
 
-func (self *CoordinatorImpl) CreateDatabase(db, initialApiKey, requestingApiKey string) error {
+func (self *CoordinatorImpl) CreateDatabase(db string) error {
 	err := self.raftServer.CreateDatabase(db)
 	if err != nil {
 		return err
 	}
-	err = self.raftServer.AddReadApiKey(db, initialApiKey)
-	if err != nil {
-		return err
-	}
-	err = self.raftServer.AddWriteApiKey(db, initialApiKey)
-	return err
+	return nil
 }
 
 func (self *CoordinatorImpl) DropDatabase(db string) error {
diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go
index 0b3ab7c3a6..6e9fc2c4b0 100644
--- a/src/coordinator/coordinator_test.go
+++ b/src/coordinator/coordinator_test.go
@@ -27,7 +27,6 @@ var nextPortNum int
 var nextDirNum int
 
 const (
-	MAX_RING_LOCATIONS  = 10
 	SERVER_STARTUP_TIME = time.Second * 2 // new cluster will have to create the root user and encrypt the password which takes little over a sec
 	REPLICATION_LAG     = time.Millisecond * 500
 )
@@ -54,7 +53,7 @@ func nextPort() int {
 	// this is a hack for OSX boxes running spotify. It binds to 127.0.0.1:8099. net.Listen doesn't return an
 	// error when listening to :8099. ugh.
 	if 8090+nextPortNum == 8099 {
-		nextPortNum += 1
+		nextPortNum += 2
 	}
 	return 8090 + nextPortNum
 }
@@ -107,7 +106,7 @@ func newConfigAndServer(path string, port int) (*ClusterConfiguration, *RaftServ
 		fullPath = "/tmp/chronos_coordinator_test/" + path
 	}
 	os.MkdirAll(fullPath, 0744)
-	config := NewClusterConfiguration(MAX_RING_LOCATIONS)
+	config := NewClusterConfiguration()
 	server := NewRaftServer(fullPath, "localhost", port, config)
 	return config, server
 }
@@ -168,9 +167,9 @@ func (self *CoordinatorSuite) TestCanRecoverCoordinatorWithData(c *C) {
 	}()
 	time.Sleep(SERVER_STARTUP_TIME)
 	c.Assert(err, Equals, nil)
-	server.AddReadApiKey("db", "key1")
+	server.CreateDatabase("db1")
 
-	assertConfigContains(port, "key1", true, c)
+	assertConfigContains(port, "db1", true, c)
 
 	server.Close()
 	time.Sleep(SERVER_STARTUP_TIME)
@@ -183,7 +182,7 @@ func (self *CoordinatorSuite) TestCanRecoverCoordinatorWithData(c *C) {
 	}()
 	time.Sleep(SERVER_STARTUP_TIME)
 	c.Assert(err, Equals, nil)
-	assertConfigContains(port, "key1", true, c)
+	assertConfigContains(port, "db1", true, c)
 }
 
 func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) {
@@ -215,10 +214,10 @@ func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) {
 	c.Assert(err2, Equals, nil)
 	c.Assert(err, Equals, nil)
 
-	server.AddReadApiKey("db", "key1")
+	server.CreateDatabase("db2")
 	time.Sleep(REPLICATION_LAG)
-	assertConfigContains(port1, "key1", true, c)
-	assertConfigContains(port2, "key1", true, c)
+	assertConfigContains(port1, "db2", true, c)
+	assertConfigContains(port2, "db2", true, c)
 }
 
 func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) {
@@ -248,35 +247,20 @@
 	time.Sleep(SERVER_STARTUP_TIME)
 	c.Assert(err, Equals, nil)
 	c.Assert(err2, Equals, nil)
-	err = server2.AddReadApiKey("db", "key1")
-	c.Assert(err, Equals, nil)
-	err = server2.AddWriteApiKey("db", "key2")
-	c.Assert(err, Equals, nil)
-	err = server2.AddServerToLocation("somehost", int64(-1))
+	err = server2.CreateDatabase("db3")
 	c.Assert(err, Equals, nil)
 	time.Sleep(REPLICATION_LAG)
-	assertConfigContains(port1, "key1", true, c)
-	assertConfigContains(port2, "key1", true, c)
-	assertConfigContains(port1, "key2", true, c)
-	assertConfigContains(port2, "key2", true, c)
-	assertConfigContains(port1, "somehost", true, c)
-	assertConfigContains(port2, "somehost", true, c)
-
-	err = server2.RemoveApiKey("db", "key2")
-	c.Assert(err, Equals, nil)
-	err = server2.RemoveServerFromLocation("somehost", int64(-1))
-	c.Assert(err, Equals, nil)
-	time.Sleep(REPLICATION_LAG)
-	assertConfigContains(port2, "key2", false, c)
-	assertConfigContains(port1, "somehost", false, c)
+	assertConfigContains(port1, "db3", true, c)
+	assertConfigContains(port2, "db3", true, c)
 }
 
 func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
 	defer http.DefaultTransport.(*http.Transport).CloseIdleConnections()
 	logDir := nextDir()
-	port := nextPort()
+	// TODO: make the next port method actually check that the port is open. Skipping some here to make it actually work. ugh.
+	port := nextPort() + 3
 	logDir2 := nextDir()
-	port2 := nextPort()
+	port2 := nextPort() + 3
 
 	defer clearPath(logDir)
 	defer clearPath(logDir2)
@@ -288,9 +272,9 @@ func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
 	defer server.Close()
 	time.Sleep(SERVER_STARTUP_TIME)
 	c.Assert(err, Equals, nil)
-	server.AddReadApiKey("db", "key1")
+	server.CreateDatabase("db4")
 
-	assertConfigContains(port, "key1", true, c)
+	assertConfigContains(port, "db4", true, c)
 
 	_, server2 := newConfigAndServer(logDir2, port2)
 	go func() {
@@ -299,19 +283,19 @@ func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
 	}()
 	defer server2.Close()
 	time.Sleep(SERVER_STARTUP_TIME)
 	c.Assert(err, Equals, nil)
-	assertConfigContains(port2, "key1", true, c)
+	assertConfigContains(port2, "db4", true, c)
 }
 
 func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
 	servers := startAndVerifyCluster(3, c)
 	defer clean(servers)
-	err := servers[0].AddReadApiKey("db", "key1")
+	err := servers[0].CreateDatabase("db5")
 	c.Assert(err, Equals, nil)
 	time.Sleep(REPLICATION_LAG)
-	assertConfigContains(servers[0].port, "key1", true, c)
-	assertConfigContains(servers[1].port, "key1", true, c)
-	assertConfigContains(servers[2].port, "key1", true, c)
+	assertConfigContains(servers[0].port, "db5", true, c)
+	assertConfigContains(servers[1].port, "db5", true, c)
+	assertConfigContains(servers[2].port, "db5", true, c)
 
 	leader, _ := servers[1].leaderConnectString()
 	c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", servers[0].port))
@@ -323,11 +307,11 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
 	time.Sleep(SERVER_STARTUP_TIME)
 	leader, _ = servers[1].leaderConnectString()
 	c.Assert(leader, Not(Equals), fmt.Sprintf("http://localhost:%d", servers[0].port))
-	err = servers[1].AddReadApiKey("db", "key2")
+	err = servers[1].CreateDatabase("db6")
 	c.Assert(err, Equals, nil)
 	time.Sleep(REPLICATION_LAG)
-	assertConfigContains(servers[1].port, "key2", true, c)
-	assertConfigContains(servers[2].port, "key2", true, c)
+	assertConfigContains(servers[1].port, "db6", true, c)
+	assertConfigContains(servers[2].port, "db6", true, c)
 
 	_, server := newConfigAndServer(servers[0].path, servers[0].port)
 	defer server.Close()
@@ -338,17 +322,17 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
 	time.Sleep(SERVER_STARTUP_TIME)
 	c.Assert(err, Equals, nil)
 
-	c.Assert(server.clusterConfig.ReadApiKeys["dbkey1"], Equals, true)
-	c.Assert(server.clusterConfig.ReadApiKeys["dbkey2"], Equals, true)
-	assertConfigContains(server.port, "key1", true, c)
-	assertConfigContains(server.port, "key2", true, c)
+	c.Assert(server.clusterConfig.databaseNames["db5"], Equals, true)
+	c.Assert(server.clusterConfig.databaseNames["db6"], Equals, true)
+	assertConfigContains(server.port, "db5", true, c)
+	assertConfigContains(server.port, "db6", true, c)
 
-	err = server.AddReadApiKey("blah", "sdf")
+	err = server.CreateDatabase("db7")
 	c.Assert(err, Equals, nil)
 	time.Sleep(REPLICATION_LAG)
-	assertConfigContains(servers[0].port, "sdf", true, c)
-	assertConfigContains(servers[1].port, "sdf", true, c)
-	assertConfigContains(servers[2].port, "sdf", true, c)
+	assertConfigContains(servers[0].port, "db7", true, c)
+	assertConfigContains(servers[1].port, "db7", true, c)
+	assertConfigContains(servers[2].port, "db7", true, c)
 }
 
 func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader(c *C) {
@@ -393,12 +377,12 @@ func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader
 	leader, _ := server2.leaderConnectString()
 	c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", port1))
 
-	err = server.AddReadApiKey("db", "key1")
+	err = server.CreateDatabase("db8")
 	c.Assert(err, Equals, nil)
 	time.Sleep(REPLICATION_LAG)
-	assertConfigContains(port1, "key1", true, c)
-	assertConfigContains(port2, "key1", true, c)
-	assertConfigContains(port3, "key1", true, c)
+	assertConfigContains(port1, "db8", true, c)
+	assertConfigContains(port2, "db8", true, c)
+	assertConfigContains(port3, "db8", true, c)
 }
 
 func (self *UserSuite) BenchmarkHashing(c *C) {
@@ -619,28 +603,6 @@ func (self *CoordinatorSuite) TestCanDropDatabaseWithName(c *C) {
 	c.Assert(err, ErrorMatches, ".*db3 doesn't exist.*")
 }
 
-func (self *CoordinatorSuite) TestCanCreateDatabase(c *C) {
-	servers := startAndVerifyCluster(3, c)
-	defer clean(servers)
-	id, _ := servers[0].GetNextDatabaseId()
-	c.Assert(id, Equals, "1")
-	time.Sleep(REPLICATION_LAG)
-	c.Assert(id, Equals, servers[1].clusterConfig.CurrentDatabaseId())
-	c.Assert(id, Equals, servers[2].clusterConfig.CurrentDatabaseId())
-	id2, _ := servers[1].GetNextDatabaseId()
-	id3, _ := servers[2].GetNextDatabaseId()
-	id4, _ := servers[0].GetNextDatabaseId()
-	c.Assert(id2, Equals, "2")
-	c.Assert(id3, Equals, "3")
-	c.Assert(id4, Equals, "4")
-	time.Sleep(REPLICATION_LAG)
-	c.Assert(id4, Equals, servers[1].clusterConfig.CurrentDatabaseId())
-	c.Assert(id4, Equals, servers[2].clusterConfig.CurrentDatabaseId())
-}
-
-func (self *CoordinatorSuite) TestDistributesRingLocationsToNewServer(c *C) {
-}
-
 func (self *CoordinatorSuite) TestWillSetTimestampsAndSequenceNumbersForPointsWithout(c *C) {
 	datastoreMock := &DatastoreMock{}
 	coordinator := NewCoordinatorImpl(datastoreMock, nil, nil)
diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go
index f3c3884652..84ff7a126d 100644
--- a/src/coordinator/interface.go
+++ b/src/coordinator/interface.go
@@ -16,8 +16,8 @@ type Coordinator interface {
 	// 5. TODO: Aggregation on the nodes
 	DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error
 	WriteSeriesData(db string, series *protocol.Series) error
-	CreateDatabase(db, initialApiKey, requestingApiKey string) error
 	DropDatabase(db string) error
+	CreateDatabase(db string) error
 }
 
 type UserManager interface {
diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go
index ab856aeced..e597a5fdaa 100644
--- a/src/coordinator/raft_server.go
+++ b/src/coordinator/raft_server.go
@@ -41,11 +41,6 @@ func NewRaftServer(path string, host string, port int, clusterConfig *ClusterCon
 	if !registeredCommands {
 		// raft.SetLogLevel(raft.Trace)
 		registeredCommands = true
-		raft.RegisterCommand(&AddApiKeyCommand{})
-		raft.RegisterCommand(&RemoveApiKeyCommand{})
-		raft.RegisterCommand(&AddServerToLocationCommand{})
-		raft.RegisterCommand(&RemoveServerFromLocationCommand{})
-		raft.RegisterCommand(&NextDatabaseIdCommand{})
 		raft.RegisterCommand(&CreateDatabaseCommand{})
 		raft.RegisterCommand(&DropDatabaseCommand{})
 		raft.RegisterCommand(&SaveDbUserCommand{})
@@ -111,36 +106,6 @@ func (s *RaftServer) doOrProxyCommand(command raft.Command, commandType string)
 	return nil, nil
 }
 
-func (s *RaftServer) AddReadApiKey(db, key string) error {
-	command := NewAddApikeyCommand(db, key, ReadKey)
-	_, err := s.doOrProxyCommand(command, "add_api_key")
-	return err
-}
-
-func (s *RaftServer) AddWriteApiKey(db, key string) error {
-	command := NewAddApikeyCommand(db, key, WriteKey)
-	_, err := s.doOrProxyCommand(command, "add_api_key")
-	return err
-}
-
-func (s *RaftServer) RemoveApiKey(db, key string) error {
-	command := NewRemoveApiKeyCommand(db, key)
-	_, err := s.doOrProxyCommand(command, "remove_api_key")
-	return err
-}
-
-func (s *RaftServer) AddServerToLocation(host string, location int64) error {
-	command := NewAddServerToLocationCommand(host, location)
-	_, err := s.doOrProxyCommand(command, "add_server")
-	return err
-}
-
-func (s *RaftServer) RemoveServerFromLocation(host string, location int64) error {
-	command := NewRemoveServerFromLocationCommand(host, location)
-	_, err := s.doOrProxyCommand(command, "remove_server")
-	return err
-}
-
 func (s *RaftServer) CreateDatabase(name string) error {
 	command := NewCreateDatabaseCommand(name)
 	_, err := s.doOrProxyCommand(command, "create_db")
@@ -153,12 +118,6 @@ func (s *RaftServer) DropDatabase(name string) error {
 	return err
 }
 
-func (s *RaftServer) GetNextDatabaseId() (string, error) {
-	command := NewNextDatabaseIdCommand(s.clusterConfig.nextDatabaseId)
-	id, err := s.doOrProxyCommand(command, "next_db")
-	return id.(string), err
-}
-
 func (s *RaftServer) SaveDbUser(u *dbUser) error {
 	command := NewSaveDbUserCommand(u)
 	_, err := s.doOrProxyCommand(command, "save_db_user")
@@ -323,23 +282,13 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
 
 func (s *RaftServer) configHandler(w http.ResponseWriter, req *http.Request) {
 	jsonObject := make(map[string]interface{})
-	readKeys := make([]string, 0)
-	for k, _ := range s.clusterConfig.ReadApiKeys {
-		readKeys = append(readKeys, k)
+	dbs := make([]string, 0)
+	for db, _ := range s.clusterConfig.databaseNames {
+		dbs = append(dbs, db)
 	}
-	jsonObject["read_keys"] = readKeys
-	writeKeys := make([]string, 0)
-	for k, _ := range s.clusterConfig.WriteApiKeys {
-		writeKeys = append(writeKeys, k)
-	}
-	jsonObject["write_keys"] = writeKeys
-	locations := make([]map[string]interface{}, 0)
-	for location, servers := range s.clusterConfig.RingLocationToServers {
-		s := servers
-		locations = append(locations, map[string]interface{}{"location": location, "servers": s})
-	}
-	jsonObject["locations"] = locations
-
+	jsonObject["databases"] = dbs
+	jsonObject["cluster_admins"] = s.clusterConfig.clusterAdmins
+	jsonObject["database_users"] = s.clusterConfig.dbUsers
 	js, err := json.Marshal(jsonObject)
 	if err != nil {
 		log.Println("ERROR marshalling config: ", err)
@@ -362,17 +311,7 @@ func (s *RaftServer) processCommandHandler(w http.ResponseWriter, req *http.Requ
 	vars := mux.Vars(req)
 	value := vars["command_type"]
 	var command raft.Command
-	if value == "add_api_key" {
-		command = &AddApiKeyCommand{}
-	} else if value == "remove_api_key" {
-		command = &RemoveApiKeyCommand{}
-	} else if value == "add_server" {
-		command = &AddServerToLocationCommand{}
-	} else if value == "remove_server" {
-		command = &RemoveServerFromLocationCommand{}
-	} else if value == "next_db" {
-		command = &NextDatabaseIdCommand{}
-	} else if value == "create_db" {
+	if value == "create_db" {
 		command = &CreateDatabaseCommand{}
 	} else if value == "drop_db" {
 		command = &DropDatabaseCommand{}
diff --git a/src/engine/engine_test.go b/src/engine/engine_test.go
index ea49e59db0..084a9fc768 100644
--- a/src/engine/engine_test.go
+++ b/src/engine/engine_test.go
@@ -45,7 +45,7 @@ func (self *MockCoordinator) WriteSeriesData(database string, series *protocol.S
 	return nil
 }
 
-func (self *MockCoordinator) CreateDatabase(db, initialApiKey, requestingApiKey string) error {
+func (self *MockCoordinator) CreateDatabase(db string) error {
 	return nil
 }
 
diff --git a/src/parser/query.yacc b/src/parser/query.yacc
index bcbe91484a..312f9e87ab 100644
--- a/src/parser/query.yacc
+++ b/src/parser/query.yacc
@@ -6,7 +6,9 @@
 #include
 #include "query_types.h"
 
+#ifndef __APPLE_CC__
 __asm__(".symver memcpy,memcpy@GLIBC_2.2.5");
+#endif
 
 expression *create_expression(expression *left, char op, expression *right) {
   expression *expr = malloc(sizeof(expression));
diff --git a/src/server/server.go b/src/server/server.go
index 0589723dd6..c479db18e7 100644
--- a/src/server/server.go
+++ b/src/server/server.go
@@ -30,8 +30,7 @@ func main() {
 	config := configuration.LoadConfiguration(*fileName)
 
 	log.Println("Starting Influx Server...")
-	ringSize := int64(1000)
-	clusterConfig := coordinator.NewClusterConfiguration(ringSize)
+	clusterConfig := coordinator.NewClusterConfiguration()
 	os.MkdirAll(config.RaftDir, 0744)
 	raftServer := coordinator.NewRaftServer(config.RaftDir, "localhost", config.RaftServerPort, clusterConfig)
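
Not part of the patch: a self-contained Go sketch of the server-list lifecycle implied by the new ClusterConfiguration and ClusterServer types above (potential servers are registered, and a rebalanced list is later promoted to active). The types here are pared-down stand-ins for the ones added in cluster_configuration.go and cluster_server.go; the real code also tracks databases, users, and locks, and the rebalance step itself is still a stub in this change.

package main

import "fmt"

// Pared-down stand-ins for the types introduced in this patch.
type ServerState int

const (
	LoadingRingData ServerState = iota
	DeletingOldData
	Running
)

type ClusterServer struct {
	Id       uint32
	RaftName string
	State    ServerState
}

type ClusterConfiguration struct {
	activeServerConfig      []*ClusterServer
	potentialServerConfig   []*ClusterServer
	rebalancingServerConfig []*ClusterServer
}

// AddPotentialServer registers a server that should receive data after the
// next rebalance, mirroring the method added in cluster_configuration.go.
func (c *ClusterConfiguration) AddPotentialServer(s *ClusterServer) {
	c.potentialServerConfig = append(c.potentialServerConfig, s)
}

// MoveRebalancingToActive promotes the rebalanced list to active, following
// the patch's method body: rebalancing becomes active and is reset, and the
// potential list then tracks the new active list.
func (c *ClusterConfiguration) MoveRebalancingToActive() {
	c.activeServerConfig = c.rebalancingServerConfig
	c.rebalancingServerConfig = make([]*ClusterServer, 0)
	c.potentialServerConfig = c.activeServerConfig
}

func main() {
	config := &ClusterConfiguration{}
	config.AddPotentialServer(&ClusterServer{Id: 1, RaftName: "node-1", State: LoadingRingData})

	// In the real code RebalanceBasedOnPotentialConfig would populate the
	// rebalancing list; here we copy it by hand to show the state transition.
	config.rebalancingServerConfig = config.potentialServerConfig
	config.MoveRebalancingToActive()
	fmt.Println("active servers:", len(config.activeServerConfig))
}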