Remove api keys from all database creation and tests.

Add ifndef for yacc file to make it compile on OSX.
Remove old server location stuff to make way for how clustering is actually going to work.
pull/17/head
Paul Dix 2013-10-25 11:45:59 -04:00
parent a63d8ce5ac
commit b8f3e02170
12 changed files with 94 additions and 361 deletions

View File

@ -319,8 +319,7 @@ func (self *HttpServer) createDatabase(w libhttp.ResponseWriter, r *libhttp.Requ
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))
return return
} }
apiKey := r.URL.Query().Get("api_key") err = self.coordinator.CreateDatabase(createRequest.Name)
err = self.coordinator.CreateDatabase(createRequest.Name, createRequest.ApiKey, apiKey)
if err != nil { if err != nil {
w.WriteHeader(libhttp.StatusBadRequest) w.WriteHeader(libhttp.StatusBadRequest)
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))

View File

@ -88,12 +88,10 @@ func (self *MockEngine) RunQuery(_ string, query string, yield func(*protocol.Se
} }
type MockCoordinator struct { type MockCoordinator struct {
series []*protocol.Series series []*protocol.Series
db string db string
droppedDb string droppedDb string
initialApiKey string users map[string]*coordinator.User
requestingApiKey string
users map[string]*coordinator.User
} }
func (self *MockCoordinator) DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error { func (self *MockCoordinator) DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error {
@ -105,10 +103,8 @@ func (self *MockCoordinator) WriteSeriesData(db string, series *protocol.Series)
return nil return nil
} }
func (self *MockCoordinator) CreateDatabase(db, initialApiKey, requestingApiKey string) error { func (self *MockCoordinator) CreateDatabase(db string) error {
self.db = db self.db = db
self.initialApiKey = initialApiKey
self.requestingApiKey = requestingApiKey
return nil return nil
} }
@ -320,8 +316,6 @@ func (self *ApiSuite) TestCreateDatabase(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, libhttp.StatusCreated) c.Assert(resp.StatusCode, Equals, libhttp.StatusCreated)
c.Assert(self.coordinator.db, Equals, "foo") c.Assert(self.coordinator.db, Equals, "foo")
c.Assert(self.coordinator.initialApiKey, Equals, "bar")
c.Assert(self.coordinator.requestingApiKey, Equals, "asdf")
} }
func (self *ApiSuite) TestDropDatabase(c *C) { func (self *ApiSuite) TestDropDatabase(c *C) {

View File

@ -6,92 +6,43 @@ import (
) )
type ClusterConfiguration struct { type ClusterConfiguration struct {
MaxRingLocation int64 createDatabaseLock sync.RWMutex
nextDatabaseId int databaseNames map[string]bool
createDatabaseLock sync.RWMutex usersLock sync.RWMutex
databaseNames map[string]bool clusterAdmins map[string]*clusterAdmin
nextDatabaseIdLock sync.Mutex dbUsers map[string]map[string]*dbUser
RingLocationToServers map[int64][]string activeServerConfig []*ClusterServer
ringLocationToServersLock sync.RWMutex potentialServerConfig []*ClusterServer
ReadApiKeys map[string]bool rebalancingServerConfig []*ClusterServer
readApiKeysLock sync.RWMutex
WriteApiKeys map[string]bool
writeApiKeysLock sync.RWMutex
usersLock sync.RWMutex
clusterAdmins map[string]*clusterAdmin
dbUsers map[string]map[string]*dbUser
} }
type ApiKeyType int const NUMBER_OF_RING_LOCATIONS = 10000
const ( func NewClusterConfiguration() *ClusterConfiguration {
ReadKey ApiKeyType = iota
WriteKey
)
func NewClusterConfiguration(maxRingLocation int64) *ClusterConfiguration {
return &ClusterConfiguration{ return &ClusterConfiguration{
MaxRingLocation: maxRingLocation, databaseNames: make(map[string]bool),
databaseNames: make(map[string]bool), clusterAdmins: make(map[string]*clusterAdmin),
RingLocationToServers: make(map[int64][]string), dbUsers: make(map[string]map[string]*dbUser),
ReadApiKeys: make(map[string]bool), activeServerConfig: make([]*ClusterServer, 0),
WriteApiKeys: make(map[string]bool), potentialServerConfig: make([]*ClusterServer, 0),
clusterAdmins: make(map[string]*clusterAdmin), rebalancingServerConfig: make([]*ClusterServer, 0),
dbUsers: make(map[string]map[string]*dbUser),
} }
} }
func (self *ClusterConfiguration) AddRingLocationToServer(hostnameAndPort string, ringLocation int64) { func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
self.ringLocationToServersLock.Lock() self.potentialServerConfig = append(self.potentialServerConfig, server)
defer self.ringLocationToServersLock.Unlock()
self.RingLocationToServers[ringLocation] = append(self.RingLocationToServers[ringLocation], hostnameAndPort)
} }
func (self *ClusterConfiguration) RemoveRingLocationFromServer(hostnameAndPort string, ringLocation int64) { func (self *ClusterConfiguration) RebalanceBasedOnPotentialConfig() {
self.ringLocationToServersLock.Lock()
defer self.ringLocationToServersLock.Unlock()
oldLocations := self.RingLocationToServers[ringLocation]
newLocations := make([]string, 0, len(oldLocations))
for _, l := range oldLocations {
if l != hostnameAndPort {
newLocations = append(newLocations, l)
}
}
self.RingLocationToServers[ringLocation] = newLocations
} }
func (self *ClusterConfiguration) AddApiKey(database, key string, apiKeyType ApiKeyType) { func (self *ClusterConfiguration) UpdatePotentialServerOrder(serverIds []uint32) {
if apiKeyType == ReadKey {
self.readApiKeysLock.Lock()
defer self.readApiKeysLock.Unlock()
self.ReadApiKeys[database+key] = true
} else {
self.writeApiKeysLock.Lock()
defer self.writeApiKeysLock.Unlock()
self.WriteApiKeys[database+key] = true
}
} }
func (self *ClusterConfiguration) DeleteApiKey(database, key string) { func (self *ClusterConfiguration) MoveRebalancingToActive() {
self.readApiKeysLock.Lock() self.activeServerConfig = self.rebalancingServerConfig
self.writeApiKeysLock.Lock() self.rebalancingServerConfig = make([]*ClusterServer, 0)
defer self.readApiKeysLock.Unlock() self.potentialServerConfig = self.activeServerConfig
defer self.writeApiKeysLock.Unlock()
fullKey := database + key
delete(self.ReadApiKeys, fullKey)
delete(self.WriteApiKeys, fullKey)
}
func (self *ClusterConfiguration) IsValidReadKey(database, key string) bool {
self.readApiKeysLock.RLock()
defer self.readApiKeysLock.RUnlock()
return self.ReadApiKeys[database+key]
}
func (self *ClusterConfiguration) IsValidWriteKey(database, key string) bool {
self.writeApiKeysLock.RLock()
defer self.writeApiKeysLock.RUnlock()
return self.WriteApiKeys[database+key]
} }
func (self *ClusterConfiguration) GetDatabases() map[string]bool { func (self *ClusterConfiguration) GetDatabases() map[string]bool {
@ -128,20 +79,6 @@ func (self *ClusterConfiguration) DropDatabase(name string) error {
return nil return nil
} }
func (self *ClusterConfiguration) NextDatabaseId() string {
self.nextDatabaseIdLock.Lock()
self.nextDatabaseId += 1
id := self.nextDatabaseId
self.nextDatabaseIdLock.Unlock()
return fmt.Sprintf("%d", id)
}
func (self *ClusterConfiguration) CurrentDatabaseId() string {
self.nextDatabaseIdLock.Lock()
defer self.nextDatabaseIdLock.Unlock()
return fmt.Sprintf("%d", self.nextDatabaseId)
}
func (self *ClusterConfiguration) SaveDbUser(u *dbUser) { func (self *ClusterConfiguration) SaveDbUser(u *dbUser) {
self.usersLock.Lock() self.usersLock.Lock()
defer self.usersLock.Unlock() defer self.usersLock.Unlock()

View File

@ -1 +1,15 @@
package coordinator package coordinator
type ClusterServer struct {
Id uint32
RaftName string
State ServerState
}
type ServerState int
const (
LoadingRingData ServerState = iota
DeletingOldData
Running
)

View File

@ -4,114 +4,6 @@ import (
"github.com/goraft/raft" "github.com/goraft/raft"
) )
type AddServerToLocationCommand struct {
Host string `json:"host"`
Location int64 `json:"location"`
}
func NewAddServerToLocationCommand(host string, location int64) *AddServerToLocationCommand {
return &AddServerToLocationCommand{
Host: host,
Location: location,
}
}
func (c *AddServerToLocationCommand) CommandName() string {
return "add"
}
func (c *AddServerToLocationCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*ClusterConfiguration)
config.AddRingLocationToServer(c.Host, c.Location)
return nil, nil
}
type RemoveServerFromLocationCommand struct {
Host string `json:"host"`
Location int64 `json:"location"`
}
func NewRemoveServerFromLocationCommand(host string, location int64) *RemoveServerFromLocationCommand {
return &RemoveServerFromLocationCommand{
Host: host,
Location: location,
}
}
func (c *RemoveServerFromLocationCommand) CommandName() string {
return "remove"
}
func (c *RemoveServerFromLocationCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*ClusterConfiguration)
config.RemoveRingLocationFromServer(c.Host, c.Location)
return nil, nil
}
type AddApiKeyCommand struct {
Database string `json:"database"`
ApiKey string `json:"api_key"`
KeyType ApiKeyType `json:"key_type"`
}
func NewAddApikeyCommand(db, key string, keyType ApiKeyType) *AddApiKeyCommand {
return &AddApiKeyCommand{
Database: db,
ApiKey: key,
KeyType: keyType,
}
}
func (c *AddApiKeyCommand) CommandName() string {
return "add_key"
}
func (c *AddApiKeyCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*ClusterConfiguration)
config.AddApiKey(c.Database, c.ApiKey, c.KeyType)
return nil, nil
}
type RemoveApiKeyCommand struct {
Database string `json:"database"`
ApiKey string `json:"api_key"`
}
func NewRemoveApiKeyCommand(db, key string) *RemoveApiKeyCommand {
return &RemoveApiKeyCommand{
Database: db,
ApiKey: key,
}
}
func (c *RemoveApiKeyCommand) CommandName() string {
return "remove_key"
}
func (c *RemoveApiKeyCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*ClusterConfiguration)
config.DeleteApiKey(c.Database, c.ApiKey)
return nil, nil
}
type NextDatabaseIdCommand struct {
LastId int `json:"last_id"`
}
func NewNextDatabaseIdCommand(lastId int) *NextDatabaseIdCommand {
return &NextDatabaseIdCommand{lastId}
}
func (c *NextDatabaseIdCommand) CommandName() string {
return "next_db"
}
func (c *NextDatabaseIdCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*ClusterConfiguration)
id := config.NextDatabaseId()
return id, nil
}
type DropDatabaseCommand struct { type DropDatabaseCommand struct {
Name string `json:"name"` Name string `json:"name"`
} }

View File

@ -50,17 +50,12 @@ func (self *CoordinatorImpl) WriteSeriesData(db string, series *protocol.Series)
return self.datastore.WriteSeriesData(db, series) return self.datastore.WriteSeriesData(db, series)
} }
func (self *CoordinatorImpl) CreateDatabase(db, initialApiKey, requestingApiKey string) error { func (self *CoordinatorImpl) CreateDatabase(db string) error {
err := self.raftServer.CreateDatabase(db) err := self.raftServer.CreateDatabase(db)
if err != nil { if err != nil {
return err return err
} }
err = self.raftServer.AddReadApiKey(db, initialApiKey) return nil
if err != nil {
return err
}
err = self.raftServer.AddWriteApiKey(db, initialApiKey)
return err
} }
func (self *CoordinatorImpl) DropDatabase(db string) error { func (self *CoordinatorImpl) DropDatabase(db string) error {

View File

@ -27,7 +27,6 @@ var nextPortNum int
var nextDirNum int var nextDirNum int
const ( const (
MAX_RING_LOCATIONS = 10
SERVER_STARTUP_TIME = time.Second * 2 // new cluster will have to create the root user and encrypt the password which takes little over a sec SERVER_STARTUP_TIME = time.Second * 2 // new cluster will have to create the root user and encrypt the password which takes little over a sec
REPLICATION_LAG = time.Millisecond * 500 REPLICATION_LAG = time.Millisecond * 500
) )
@ -54,7 +53,7 @@ func nextPort() int {
// this is a hack for OSX boxes running spotify. It binds to 127.0.0.1:8099. net.Listen doesn't return an // this is a hack for OSX boxes running spotify. It binds to 127.0.0.1:8099. net.Listen doesn't return an
// error when listening to :8099. ugh. // error when listening to :8099. ugh.
if 8090+nextPortNum == 8099 { if 8090+nextPortNum == 8099 {
nextPortNum += 1 nextPortNum += 2
} }
return 8090 + nextPortNum return 8090 + nextPortNum
} }
@ -107,7 +106,7 @@ func newConfigAndServer(path string, port int) (*ClusterConfiguration, *RaftServ
fullPath = "/tmp/chronos_coordinator_test/" + path fullPath = "/tmp/chronos_coordinator_test/" + path
} }
os.MkdirAll(fullPath, 0744) os.MkdirAll(fullPath, 0744)
config := NewClusterConfiguration(MAX_RING_LOCATIONS) config := NewClusterConfiguration()
server := NewRaftServer(fullPath, "localhost", port, config) server := NewRaftServer(fullPath, "localhost", port, config)
return config, server return config, server
} }
@ -168,9 +167,9 @@ func (self *CoordinatorSuite) TestCanRecoverCoordinatorWithData(c *C) {
}() }()
time.Sleep(SERVER_STARTUP_TIME) time.Sleep(SERVER_STARTUP_TIME)
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
server.AddReadApiKey("db", "key1") server.CreateDatabase("db1")
assertConfigContains(port, "key1", true, c) assertConfigContains(port, "db1", true, c)
server.Close() server.Close()
time.Sleep(SERVER_STARTUP_TIME) time.Sleep(SERVER_STARTUP_TIME)
@ -183,7 +182,7 @@ func (self *CoordinatorSuite) TestCanRecoverCoordinatorWithData(c *C) {
}() }()
time.Sleep(SERVER_STARTUP_TIME) time.Sleep(SERVER_STARTUP_TIME)
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
assertConfigContains(port, "key1", true, c) assertConfigContains(port, "db1", true, c)
} }
func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) { func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) {
@ -215,10 +214,10 @@ func (self *CoordinatorSuite) TestCanCreateCoordinatorsAndReplicate(c *C) {
c.Assert(err2, Equals, nil) c.Assert(err2, Equals, nil)
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
server.AddReadApiKey("db", "key1") server.CreateDatabase("db2")
time.Sleep(REPLICATION_LAG) time.Sleep(REPLICATION_LAG)
assertConfigContains(port1, "key1", true, c) assertConfigContains(port1, "db2", true, c)
assertConfigContains(port2, "key1", true, c) assertConfigContains(port2, "db2", true, c)
} }
func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) { func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) {
@ -248,35 +247,20 @@ func (self *CoordinatorSuite) TestDoWriteOperationsFromNonLeaderServer(c *C) {
time.Sleep(SERVER_STARTUP_TIME) time.Sleep(SERVER_STARTUP_TIME)
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
c.Assert(err2, Equals, nil) c.Assert(err2, Equals, nil)
err = server2.AddReadApiKey("db", "key1") err = server2.CreateDatabase("db3")
c.Assert(err, Equals, nil)
err = server2.AddWriteApiKey("db", "key2")
c.Assert(err, Equals, nil)
err = server2.AddServerToLocation("somehost", int64(-1))
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
time.Sleep(REPLICATION_LAG) time.Sleep(REPLICATION_LAG)
assertConfigContains(port1, "key1", true, c) assertConfigContains(port1, "db3", true, c)
assertConfigContains(port2, "key1", true, c) assertConfigContains(port2, "db3", true, c)
assertConfigContains(port1, "key2", true, c)
assertConfigContains(port2, "key2", true, c)
assertConfigContains(port1, "somehost", true, c)
assertConfigContains(port2, "somehost", true, c)
err = server2.RemoveApiKey("db", "key2")
c.Assert(err, Equals, nil)
err = server2.RemoveServerFromLocation("somehost", int64(-1))
c.Assert(err, Equals, nil)
time.Sleep(REPLICATION_LAG)
assertConfigContains(port2, "key2", false, c)
assertConfigContains(port1, "somehost", false, c)
} }
func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) { func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
defer http.DefaultTransport.(*http.Transport).CloseIdleConnections() defer http.DefaultTransport.(*http.Transport).CloseIdleConnections()
logDir := nextDir() logDir := nextDir()
port := nextPort() // TODO: make the next port method actually check that the port is open. Skipping some here to make it actually work. ugh.
port := nextPort() + 3
logDir2 := nextDir() logDir2 := nextDir()
port2 := nextPort() port2 := nextPort() + 3
defer clearPath(logDir) defer clearPath(logDir)
defer clearPath(logDir2) defer clearPath(logDir2)
@ -288,9 +272,9 @@ func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
defer server.Close() defer server.Close()
time.Sleep(SERVER_STARTUP_TIME) time.Sleep(SERVER_STARTUP_TIME)
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
server.AddReadApiKey("db", "key1") server.CreateDatabase("db4")
assertConfigContains(port, "key1", true, c) assertConfigContains(port, "db4", true, c)
_, server2 := newConfigAndServer(logDir2, port2) _, server2 := newConfigAndServer(logDir2, port2)
go func() { go func() {
@ -299,19 +283,19 @@ func (self *CoordinatorSuite) TestNewServerJoiningClusterWillPickUpData(c *C) {
defer server2.Close() defer server2.Close()
time.Sleep(SERVER_STARTUP_TIME) time.Sleep(SERVER_STARTUP_TIME)
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
assertConfigContains(port2, "key1", true, c) assertConfigContains(port2, "db4", true, c)
} }
func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) { func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
servers := startAndVerifyCluster(3, c) servers := startAndVerifyCluster(3, c)
defer clean(servers) defer clean(servers)
err := servers[0].AddReadApiKey("db", "key1") err := servers[0].CreateDatabase("db5")
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
time.Sleep(REPLICATION_LAG) time.Sleep(REPLICATION_LAG)
assertConfigContains(servers[0].port, "key1", true, c) assertConfigContains(servers[0].port, "db5", true, c)
assertConfigContains(servers[1].port, "key1", true, c) assertConfigContains(servers[1].port, "db5", true, c)
assertConfigContains(servers[2].port, "key1", true, c) assertConfigContains(servers[2].port, "db5", true, c)
leader, _ := servers[1].leaderConnectString() leader, _ := servers[1].leaderConnectString()
c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", servers[0].port)) c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", servers[0].port))
@ -323,11 +307,11 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
time.Sleep(SERVER_STARTUP_TIME) time.Sleep(SERVER_STARTUP_TIME)
leader, _ = servers[1].leaderConnectString() leader, _ = servers[1].leaderConnectString()
c.Assert(leader, Not(Equals), fmt.Sprintf("http://localhost:%d", servers[0].port)) c.Assert(leader, Not(Equals), fmt.Sprintf("http://localhost:%d", servers[0].port))
err = servers[1].AddReadApiKey("db", "key2") err = servers[1].CreateDatabase("db6")
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
time.Sleep(REPLICATION_LAG) time.Sleep(REPLICATION_LAG)
assertConfigContains(servers[1].port, "key2", true, c) assertConfigContains(servers[1].port, "db6", true, c)
assertConfigContains(servers[2].port, "key2", true, c) assertConfigContains(servers[2].port, "db6", true, c)
_, server := newConfigAndServer(servers[0].path, servers[0].port) _, server := newConfigAndServer(servers[0].path, servers[0].port)
defer server.Close() defer server.Close()
@ -338,17 +322,17 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
time.Sleep(SERVER_STARTUP_TIME) time.Sleep(SERVER_STARTUP_TIME)
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
c.Assert(server.clusterConfig.ReadApiKeys["dbkey1"], Equals, true) c.Assert(server.clusterConfig.databaseNames["db5"], Equals, true)
c.Assert(server.clusterConfig.ReadApiKeys["dbkey2"], Equals, true) c.Assert(server.clusterConfig.databaseNames["db6"], Equals, true)
assertConfigContains(server.port, "key1", true, c) assertConfigContains(server.port, "db5", true, c)
assertConfigContains(server.port, "key2", true, c) assertConfigContains(server.port, "db6", true, c)
err = server.AddReadApiKey("blah", "sdf") err = server.CreateDatabase("db7")
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
time.Sleep(REPLICATION_LAG) time.Sleep(REPLICATION_LAG)
assertConfigContains(servers[0].port, "sdf", true, c) assertConfigContains(servers[0].port, "db7", true, c)
assertConfigContains(servers[1].port, "sdf", true, c) assertConfigContains(servers[1].port, "db7", true, c)
assertConfigContains(servers[2].port, "sdf", true, c) assertConfigContains(servers[2].port, "db7", true, c)
} }
func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader(c *C) { func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader(c *C) {
@ -393,12 +377,12 @@ func (self *CoordinatorSuite) TestCanJoinAClusterWhenNotInitiallyPointedAtLeader
leader, _ := server2.leaderConnectString() leader, _ := server2.leaderConnectString()
c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", port1)) c.Assert(leader, Equals, fmt.Sprintf("http://localhost:%d", port1))
err = server.AddReadApiKey("db", "key1") err = server.CreateDatabase("db8")
c.Assert(err, Equals, nil) c.Assert(err, Equals, nil)
time.Sleep(REPLICATION_LAG) time.Sleep(REPLICATION_LAG)
assertConfigContains(port1, "key1", true, c) assertConfigContains(port1, "db8", true, c)
assertConfigContains(port2, "key1", true, c) assertConfigContains(port2, "db8", true, c)
assertConfigContains(port3, "key1", true, c) assertConfigContains(port3, "db8", true, c)
} }
func (self *UserSuite) BenchmarkHashing(c *C) { func (self *UserSuite) BenchmarkHashing(c *C) {
@ -619,28 +603,6 @@ func (self *CoordinatorSuite) TestCanDropDatabaseWithName(c *C) {
c.Assert(err, ErrorMatches, ".*db3 doesn't exist.*") c.Assert(err, ErrorMatches, ".*db3 doesn't exist.*")
} }
func (self *CoordinatorSuite) TestCanCreateDatabase(c *C) {
servers := startAndVerifyCluster(3, c)
defer clean(servers)
id, _ := servers[0].GetNextDatabaseId()
c.Assert(id, Equals, "1")
time.Sleep(REPLICATION_LAG)
c.Assert(id, Equals, servers[1].clusterConfig.CurrentDatabaseId())
c.Assert(id, Equals, servers[2].clusterConfig.CurrentDatabaseId())
id2, _ := servers[1].GetNextDatabaseId()
id3, _ := servers[2].GetNextDatabaseId()
id4, _ := servers[0].GetNextDatabaseId()
c.Assert(id2, Equals, "2")
c.Assert(id3, Equals, "3")
c.Assert(id4, Equals, "4")
time.Sleep(REPLICATION_LAG)
c.Assert(id4, Equals, servers[1].clusterConfig.CurrentDatabaseId())
c.Assert(id4, Equals, servers[2].clusterConfig.CurrentDatabaseId())
}
func (self *CoordinatorSuite) TestDistributesRingLocationsToNewServer(c *C) {
}
func (self *CoordinatorSuite) TestWillSetTimestampsAndSequenceNumbersForPointsWithout(c *C) { func (self *CoordinatorSuite) TestWillSetTimestampsAndSequenceNumbersForPointsWithout(c *C) {
datastoreMock := &DatastoreMock{} datastoreMock := &DatastoreMock{}
coordinator := NewCoordinatorImpl(datastoreMock, nil, nil) coordinator := NewCoordinatorImpl(datastoreMock, nil, nil)

View File

@ -16,8 +16,8 @@ type Coordinator interface {
// 5. TODO: Aggregation on the nodes // 5. TODO: Aggregation on the nodes
DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error DistributeQuery(db string, query *parser.Query, yield func(*protocol.Series) error) error
WriteSeriesData(db string, series *protocol.Series) error WriteSeriesData(db string, series *protocol.Series) error
CreateDatabase(db, initialApiKey, requestingApiKey string) error
DropDatabase(db string) error DropDatabase(db string) error
CreateDatabase(db string) error
} }
type UserManager interface { type UserManager interface {

View File

@ -41,11 +41,6 @@ func NewRaftServer(path string, host string, port int, clusterConfig *ClusterCon
if !registeredCommands { if !registeredCommands {
// raft.SetLogLevel(raft.Trace) // raft.SetLogLevel(raft.Trace)
registeredCommands = true registeredCommands = true
raft.RegisterCommand(&AddApiKeyCommand{})
raft.RegisterCommand(&RemoveApiKeyCommand{})
raft.RegisterCommand(&AddServerToLocationCommand{})
raft.RegisterCommand(&RemoveServerFromLocationCommand{})
raft.RegisterCommand(&NextDatabaseIdCommand{})
raft.RegisterCommand(&CreateDatabaseCommand{}) raft.RegisterCommand(&CreateDatabaseCommand{})
raft.RegisterCommand(&DropDatabaseCommand{}) raft.RegisterCommand(&DropDatabaseCommand{})
raft.RegisterCommand(&SaveDbUserCommand{}) raft.RegisterCommand(&SaveDbUserCommand{})
@ -111,36 +106,6 @@ func (s *RaftServer) doOrProxyCommand(command raft.Command, commandType string)
return nil, nil return nil, nil
} }
func (s *RaftServer) AddReadApiKey(db, key string) error {
command := NewAddApikeyCommand(db, key, ReadKey)
_, err := s.doOrProxyCommand(command, "add_api_key")
return err
}
func (s *RaftServer) AddWriteApiKey(db, key string) error {
command := NewAddApikeyCommand(db, key, WriteKey)
_, err := s.doOrProxyCommand(command, "add_api_key")
return err
}
func (s *RaftServer) RemoveApiKey(db, key string) error {
command := NewRemoveApiKeyCommand(db, key)
_, err := s.doOrProxyCommand(command, "remove_api_key")
return err
}
func (s *RaftServer) AddServerToLocation(host string, location int64) error {
command := NewAddServerToLocationCommand(host, location)
_, err := s.doOrProxyCommand(command, "add_server")
return err
}
func (s *RaftServer) RemoveServerFromLocation(host string, location int64) error {
command := NewRemoveServerFromLocationCommand(host, location)
_, err := s.doOrProxyCommand(command, "remove_server")
return err
}
func (s *RaftServer) CreateDatabase(name string) error { func (s *RaftServer) CreateDatabase(name string) error {
command := NewCreateDatabaseCommand(name) command := NewCreateDatabaseCommand(name)
_, err := s.doOrProxyCommand(command, "create_db") _, err := s.doOrProxyCommand(command, "create_db")
@ -153,12 +118,6 @@ func (s *RaftServer) DropDatabase(name string) error {
return err return err
} }
func (s *RaftServer) GetNextDatabaseId() (string, error) {
command := NewNextDatabaseIdCommand(s.clusterConfig.nextDatabaseId)
id, err := s.doOrProxyCommand(command, "next_db")
return id.(string), err
}
func (s *RaftServer) SaveDbUser(u *dbUser) error { func (s *RaftServer) SaveDbUser(u *dbUser) error {
command := NewSaveDbUserCommand(u) command := NewSaveDbUserCommand(u)
_, err := s.doOrProxyCommand(command, "save_db_user") _, err := s.doOrProxyCommand(command, "save_db_user")
@ -323,23 +282,13 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
func (s *RaftServer) configHandler(w http.ResponseWriter, req *http.Request) { func (s *RaftServer) configHandler(w http.ResponseWriter, req *http.Request) {
jsonObject := make(map[string]interface{}) jsonObject := make(map[string]interface{})
readKeys := make([]string, 0) dbs := make([]string, 0)
for k, _ := range s.clusterConfig.ReadApiKeys { for db, _ := range s.clusterConfig.databaseNames {
readKeys = append(readKeys, k) dbs = append(dbs, db)
} }
jsonObject["read_keys"] = readKeys jsonObject["databases"] = dbs
writeKeys := make([]string, 0) jsonObject["cluster_admins"] = s.clusterConfig.clusterAdmins
for k, _ := range s.clusterConfig.WriteApiKeys { jsonObject["database_users"] = s.clusterConfig.dbUsers
writeKeys = append(writeKeys, k)
}
jsonObject["write_keys"] = writeKeys
locations := make([]map[string]interface{}, 0)
for location, servers := range s.clusterConfig.RingLocationToServers {
s := servers
locations = append(locations, map[string]interface{}{"location": location, "servers": s})
}
jsonObject["locations"] = locations
js, err := json.Marshal(jsonObject) js, err := json.Marshal(jsonObject)
if err != nil { if err != nil {
log.Println("ERROR marshalling config: ", err) log.Println("ERROR marshalling config: ", err)
@ -362,17 +311,7 @@ func (s *RaftServer) processCommandHandler(w http.ResponseWriter, req *http.Requ
vars := mux.Vars(req) vars := mux.Vars(req)
value := vars["command_type"] value := vars["command_type"]
var command raft.Command var command raft.Command
if value == "add_api_key" { if value == "create_db" {
command = &AddApiKeyCommand{}
} else if value == "remove_api_key" {
command = &RemoveApiKeyCommand{}
} else if value == "add_server" {
command = &AddServerToLocationCommand{}
} else if value == "remove_server" {
command = &RemoveServerFromLocationCommand{}
} else if value == "next_db" {
command = &NextDatabaseIdCommand{}
} else if value == "create_db" {
command = &CreateDatabaseCommand{} command = &CreateDatabaseCommand{}
} else if value == "drop_db" { } else if value == "drop_db" {
command = &DropDatabaseCommand{} command = &DropDatabaseCommand{}

View File

@ -45,7 +45,7 @@ func (self *MockCoordinator) WriteSeriesData(database string, series *protocol.S
return nil return nil
} }
func (self *MockCoordinator) CreateDatabase(db, initialApiKey, requestingApiKey string) error { func (self *MockCoordinator) CreateDatabase(db string) error {
return nil return nil
} }

View File

@ -6,7 +6,9 @@
#include <string.h> #include <string.h>
#include "query_types.h" #include "query_types.h"
#ifndef __APPLE_CC__
__asm__(".symver memcpy,memcpy@GLIBC_2.2.5"); __asm__(".symver memcpy,memcpy@GLIBC_2.2.5");
#endif
expression *create_expression(expression *left, char op, expression *right) { expression *create_expression(expression *left, char op, expression *right) {
expression *expr = malloc(sizeof(expression)); expression *expr = malloc(sizeof(expression));

View File

@ -30,8 +30,7 @@ func main() {
config := configuration.LoadConfiguration(*fileName) config := configuration.LoadConfiguration(*fileName)
log.Println("Starting Influx Server...") log.Println("Starting Influx Server...")
ringSize := int64(1000) clusterConfig := coordinator.NewClusterConfiguration()
clusterConfig := coordinator.NewClusterConfiguration(ringSize)
os.MkdirAll(config.RaftDir, 0744) os.MkdirAll(config.RaftDir, 0744)
raftServer := coordinator.NewRaftServer(config.RaftDir, "localhost", config.RaftServerPort, clusterConfig) raftServer := coordinator.NewRaftServer(config.RaftDir, "localhost", config.RaftServerPort, clusterConfig)