diff --git a/src/api/http/api_test.go b/src/api/http/api_test.go index 53ad494d44..05c33ca279 100644 --- a/src/api/http/api_test.go +++ b/src/api/http/api_test.go @@ -2,6 +2,7 @@ package http import ( "bytes" + "cluster" "common" "coordinator" "encoding/base64" @@ -98,7 +99,7 @@ func (self *MockEngine) RunQuery(_ common.User, _ string, query string, localOnl type MockCoordinator struct { coordinator.Coordinator series []*protocol.Series - continuousQueries map[string][]*coordinator.ContinuousQuery + continuousQueries map[string][]*cluster.ContinuousQuery deleteQueries []*parser.DeleteQuery db string droppedDb string @@ -119,8 +120,8 @@ func (self *MockCoordinator) CreateDatabase(_ common.User, db string, _ uint8) e return nil } -func (self *MockCoordinator) ListDatabases(_ common.User) ([]*coordinator.Database, error) { - return []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}}, nil +func (self *MockCoordinator) ListDatabases(_ common.User) ([]*cluster.Database, error) { + return []*cluster.Database{&cluster.Database{"db1", 1}, &cluster.Database{"db2", 1}}, nil } func (self *MockCoordinator) DropDatabase(_ common.User, db string) error { @@ -154,7 +155,7 @@ func (self *MockCoordinator) ListContinuousQueries(_ common.User, db string) ([] } func (self *MockCoordinator) CreateContinuousQuery(_ common.User, db string, query string) error { - self.continuousQueries[db] = append(self.continuousQueries[db], &coordinator.ContinuousQuery{2, query}) + self.continuousQueries[db] = append(self.continuousQueries[db], &cluster.ContinuousQuery{2, query}) return nil } @@ -172,9 +173,9 @@ func (self *ApiSuite) formatUrl(path string, args ...interface{}) string { func (self *ApiSuite) SetUpSuite(c *C) { self.coordinator = &MockCoordinator{ - continuousQueries: map[string][]*coordinator.ContinuousQuery{ - "db1": []*coordinator.ContinuousQuery{ - &coordinator.ContinuousQuery{1, "select * from foo into bar;"}, + continuousQueries: map[string][]*cluster.ContinuousQuery{ + "db1": []*cluster.ContinuousQuery{ + &cluster.ContinuousQuery{1, "select * from foo into bar;"}, }, }, } @@ -753,10 +754,10 @@ func (self *ApiSuite) TestDatabasesIndex(c *C) { defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) c.Assert(err, IsNil) - users := []*coordinator.Database{} + users := []*cluster.Database{} err = json.Unmarshal(body, &users) c.Assert(err, IsNil) - c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", uint8(1)}, &coordinator.Database{"db2", uint8(1)}}) + c.Assert(users, DeepEquals, []*cluster.Database{&cluster.Database{"db1", uint8(1)}, &cluster.Database{"db2", uint8(1)}}) } } @@ -771,10 +772,10 @@ func (self *ApiSuite) TestBasicAuthentication(c *C) { body, err := ioutil.ReadAll(resp.Body) c.Assert(err, IsNil) c.Assert(resp.StatusCode, Equals, libhttp.StatusOK) - users := []*coordinator.Database{} + users := []*cluster.Database{} err = json.Unmarshal(body, &users) c.Assert(err, IsNil) - c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}}) + c.Assert(users, DeepEquals, []*cluster.Database{&cluster.Database{"db1", 1}, &cluster.Database{"db2", 1}}) } func (self *ApiSuite) TestContinuousQueryOperations(c *C) { diff --git a/src/coordinator/cluster_configuration.go b/src/cluster/cluster_configuration.go similarity index 77% rename from src/coordinator/cluster_configuration.go rename to src/cluster/cluster_configuration.go index 32d27ad9b1..8586b42613 100644 --- a/src/coordinator/cluster_configuration.go +++ b/src/cluster/cluster_configuration.go @@ -1,4 +1,4 @@ -package coordinator +package cluster import ( "bytes" @@ -14,6 +14,15 @@ import ( "time" ) +// defined by cluster config (in cluster package) +type QuerySpec interface { + GetStartTime() time.Time + GetEndTime() time.Time + Database() string + TableNames() []string + IsRegex() bool +} + /* This struct stores all the metadata confiugration information about a running cluster. This includes the servers in the cluster and their state, databases, users, and which continuous queries are running. @@ -25,22 +34,23 @@ import ( */ type ClusterConfiguration struct { createDatabaseLock sync.RWMutex - databaseReplicationFactors map[string]uint8 + DatabaseReplicationFactors map[string]uint8 usersLock sync.RWMutex - clusterAdmins map[string]*clusterAdmin - dbUsers map[string]map[string]*dbUser + clusterAdmins map[string]*ClusterAdmin + dbUsers map[string]map[string]*DbUser servers []*ClusterServer serversLock sync.RWMutex continuousQueries map[string][]*ContinuousQuery continuousQueriesLock sync.RWMutex - parsedContinuousQueries map[string]map[uint32]*parser.SelectQuery + ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery continuousQueryTimestamp time.Time hasRunningServers bool - localServerId uint32 + LocalServerId uint32 ClusterVersion uint32 config *configuration.Configuration addedLocalServerWait chan bool addedLocalServer bool + connectionCreator func(string) ServerConnection } type ContinuousQuery struct { @@ -53,16 +63,17 @@ type Database struct { ReplicationFactor uint8 `json:"replicationFactor"` } -func NewClusterConfiguration(config *configuration.Configuration) *ClusterConfiguration { +func NewClusterConfiguration(config *configuration.Configuration, connectionCreator func(string) ServerConnection) *ClusterConfiguration { return &ClusterConfiguration{ - databaseReplicationFactors: make(map[string]uint8), - clusterAdmins: make(map[string]*clusterAdmin), - dbUsers: make(map[string]map[string]*dbUser), + DatabaseReplicationFactors: make(map[string]uint8), + clusterAdmins: make(map[string]*ClusterAdmin), + dbUsers: make(map[string]map[string]*DbUser), continuousQueries: make(map[string][]*ContinuousQuery), - parsedContinuousQueries: make(map[string]map[uint32]*parser.SelectQuery), + ParsedContinuousQueries: make(map[string]map[uint32]*parser.SelectQuery), servers: make([]*ClusterServer, 0), config: config, addedLocalServerWait: make(chan bool, 1), + connectionCreator: connectionCreator, } } @@ -83,7 +94,7 @@ func (self *ClusterConfiguration) WaitForLocalServerLoaded() { } func (self *ClusterConfiguration) GetReplicationFactor(database *string) uint8 { - return self.databaseReplicationFactors[*database] + return self.DatabaseReplicationFactors[*database] } func (self *ClusterConfiguration) IsActive() bool { @@ -118,9 +129,9 @@ func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer { return nil } -type serverToQuery struct { - server *ClusterServer - ringLocationsToQuery uint32 +type ServerToQuery struct { + Server *ClusterServer + RingLocationToQuery uint32 } // This function will return an array of servers to query and the number of ring locations to return per server. @@ -130,21 +141,21 @@ type serverToQuery struct { // if you have a cluster with databases with RFs of 1, 2, and 3: optimal cluster sizes would be 6, 12, 18, 24, 30, etc. // If that's not the case, one or more servers will have to filter out data from other servers on the fly, which could // be a little more expensive. -func (self *ClusterConfiguration) GetServersToMakeQueryTo(database *string) (servers []*serverToQuery, replicationFactor uint32) { +func (self *ClusterConfiguration) GetServersToMakeQueryTo(database *string) (servers []*ServerToQuery, replicationFactor uint32) { replicationFactor = uint32(self.GetReplicationFactor(database)) replicationFactorInt := int(replicationFactor) index := 0 for i, s := range self.servers { - if s.Id == self.localServerId { + if s.Id == self.LocalServerId { index = i % replicationFactorInt break } } - servers = make([]*serverToQuery, 0, len(self.servers)/replicationFactorInt) + servers = make([]*ServerToQuery, 0, len(self.servers)/replicationFactorInt) serverCount := len(self.servers) for ; index < serverCount; index += replicationFactorInt { server := self.servers[index] - servers = append(servers, &serverToQuery{server, replicationFactor}) + servers = append(servers, &ServerToQuery{server, replicationFactor}) } // need to maybe add a server and set which ones filter their data if serverCount%replicationFactorInt != 0 { @@ -175,10 +186,10 @@ func (self *ClusterConfiguration) GetServersToMakeQueryTo(database *string) (ser */ lastIndexAdded := index - replicationFactorInt if serverCount-lastIndexAdded == replicationFactorInt { - servers[0].ringLocationsToQuery = uint32(replicationFactorInt - 1) - servers = append(servers, &serverToQuery{self.servers[0], replicationFactor}) + servers[0].RingLocationToQuery = uint32(replicationFactorInt - 1) + servers = append(servers, &ServerToQuery{self.servers[0], replicationFactor}) } else { - servers[0].ringLocationsToQuery = uint32(replicationFactorInt - 1) + servers[0].RingLocationToQuery = uint32(replicationFactorInt - 1) } } return servers, replicationFactor @@ -198,7 +209,7 @@ func (self *ClusterConfiguration) GetRingFilterFunction(database string, countOf serversToInclude := make([]*ClusterServer, 0, countOfServersToInclude) countServers := int(countOfServersToInclude) for i, s := range self.servers { - if s.Id == self.localServerId { + if s.Id == self.LocalServerId { serversToInclude = append(serversToInclude, s) for j := i - 1; j >= 0 && len(serversToInclude) < countServers; j-- { serversToInclude = append(serversToInclude, self.servers[j]) @@ -307,7 +318,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) { server.Connect() } else if !self.addedLocalServer { log.Info("Added the local server") - self.localServerId = server.Id + self.LocalServerId = server.Id self.addedLocalServerWait <- true self.addedLocalServer = true } @@ -317,8 +328,8 @@ func (self *ClusterConfiguration) GetDatabases() []*Database { self.createDatabaseLock.RLock() defer self.createDatabaseLock.RUnlock() - dbs := make([]*Database, 0, len(self.databaseReplicationFactors)) - for name, rf := range self.databaseReplicationFactors { + dbs := make([]*Database, 0, len(self.DatabaseReplicationFactors)) + for name, rf := range self.DatabaseReplicationFactors { dbs = append(dbs, &Database{Name: name, ReplicationFactor: rf}) } return dbs @@ -328,10 +339,10 @@ func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor self.createDatabaseLock.Lock() defer self.createDatabaseLock.Unlock() - if _, ok := self.databaseReplicationFactors[name]; ok { + if _, ok := self.DatabaseReplicationFactors[name]; ok { return fmt.Errorf("database %s exists", name) } - self.databaseReplicationFactors[name] = replicationFactor + self.DatabaseReplicationFactors[name] = replicationFactor return nil } @@ -339,11 +350,11 @@ func (self *ClusterConfiguration) DropDatabase(name string) error { self.createDatabaseLock.Lock() defer self.createDatabaseLock.Unlock() - if _, ok := self.databaseReplicationFactors[name]; !ok { + if _, ok := self.DatabaseReplicationFactors[name]; !ok { return fmt.Errorf("Database %s doesn't exist", name) } - delete(self.databaseReplicationFactors, name) + delete(self.DatabaseReplicationFactors, name) self.usersLock.Lock() defer self.usersLock.Unlock() @@ -360,8 +371,8 @@ func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) self.continuousQueries = map[string][]*ContinuousQuery{} } - if self.parsedContinuousQueries == nil { - self.parsedContinuousQueries = map[string]map[uint32]*parser.SelectQuery{} + if self.ParsedContinuousQueries == nil { + self.ParsedContinuousQueries = map[string]map[uint32]*parser.SelectQuery{} } maxId := uint32(0) @@ -377,10 +388,10 @@ func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) } queryId := maxId + 1 - if self.parsedContinuousQueries[db] == nil { - self.parsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{queryId: selectQuery} + if self.ParsedContinuousQueries[db] == nil { + self.ParsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{queryId: selectQuery} } else { - self.parsedContinuousQueries[db][queryId] = selectQuery + self.ParsedContinuousQueries[db][queryId] = selectQuery } self.continuousQueries[db] = append(self.continuousQueries[db], &ContinuousQuery{queryId, query}) @@ -405,7 +416,7 @@ func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) er q := self.continuousQueries[db] q[len(q)-1], q[i], q = nil, q[len(q)-1], q[:len(q)-1] self.continuousQueries[db] = q - delete(self.parsedContinuousQueries[db], id) + delete(self.ParsedContinuousQueries[db], id) break } } @@ -431,7 +442,7 @@ func (self *ClusterConfiguration) GetDbUsers(db string) (names []string) { return } -func (self *ClusterConfiguration) GetDbUser(db, username string) *dbUser { +func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser { self.usersLock.RLock() defer self.usersLock.RUnlock() @@ -442,7 +453,7 @@ func (self *ClusterConfiguration) GetDbUser(db, username string) *dbUser { return dbUsers[username] } -func (self *ClusterConfiguration) SaveDbUser(u *dbUser) { +func (self *ClusterConfiguration) SaveDbUser(u *DbUser) { self.usersLock.Lock() defer self.usersLock.Unlock() db := u.GetDb() @@ -455,7 +466,7 @@ func (self *ClusterConfiguration) SaveDbUser(u *dbUser) { return } if dbUsers == nil { - dbUsers = map[string]*dbUser{} + dbUsers = map[string]*DbUser{} self.dbUsers[db] = dbUsers } dbUsers[u.GetName()] = u @@ -471,7 +482,7 @@ func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string if dbUsers[username] == nil { return fmt.Errorf("Invalid username %s", username) } - dbUsers[username].changePassword(hash) + dbUsers[username].ChangePassword(hash) return nil } @@ -486,14 +497,14 @@ func (self *ClusterConfiguration) GetClusterAdmins() (names []string) { return } -func (self *ClusterConfiguration) GetClusterAdmin(username string) *clusterAdmin { +func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin { self.usersLock.RLock() defer self.usersLock.RUnlock() return self.clusterAdmins[username] } -func (self *ClusterConfiguration) SaveClusterAdmin(u *clusterAdmin) { +func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin) { self.usersLock.Lock() defer self.usersLock.Unlock() if u.IsDeleted() { @@ -506,26 +517,26 @@ func (self *ClusterConfiguration) SaveClusterAdmin(u *clusterAdmin) { func (self *ClusterConfiguration) GetDatabaseReplicationFactor(name string) uint8 { self.createDatabaseLock.RLock() defer self.createDatabaseLock.RUnlock() - return self.databaseReplicationFactors[name] + return self.DatabaseReplicationFactors[name] } func (self *ClusterConfiguration) Save() ([]byte, error) { log.Debug("Dumping the cluster configuration") data := struct { Databases map[string]uint8 - Admins map[string]*clusterAdmin - DbUsers map[string]map[string]*dbUser + Admins map[string]*ClusterAdmin + DbUsers map[string]map[string]*DbUser Servers []*ClusterServer HasRunningServers bool LocalServerId uint32 ClusterVersion uint32 }{ - self.databaseReplicationFactors, + self.DatabaseReplicationFactors, self.clusterAdmins, self.dbUsers, self.servers, self.hasRunningServers, - self.localServerId, + self.LocalServerId, self.ClusterVersion, } @@ -542,8 +553,8 @@ func (self *ClusterConfiguration) Recovery(b []byte) error { log.Debug("Recovering the cluster configuration") data := struct { Databases map[string]uint8 - Admins map[string]*clusterAdmin - DbUsers map[string]map[string]*dbUser + Admins map[string]*ClusterAdmin + DbUsers map[string]map[string]*DbUser Servers []*ClusterServer HasRunningServers bool LocalServerId uint32 @@ -555,26 +566,29 @@ func (self *ClusterConfiguration) Recovery(b []byte) error { return err } - self.databaseReplicationFactors = data.Databases + self.DatabaseReplicationFactors = data.Databases self.clusterAdmins = data.Admins self.dbUsers = data.DbUsers // copy the protobuf client from the old servers - oldServers := map[string]*ProtobufClient{} + oldServers := map[string]ServerConnection{} for _, server := range self.servers { - oldServers[server.ProtobufConnectionString] = server.protobufClient + oldServers[server.ProtobufConnectionString] = server.connection } self.servers = data.Servers for _, server := range self.servers { - server.protobufClient = oldServers[server.ProtobufConnectionString] - if server.protobufClient == nil { - server.Connect() + server.connection = oldServers[server.ProtobufConnectionString] + if server.connection == nil { + server.connection = self.connectionCreator(server.ProtobufConnectionString) + if server.ProtobufConnectionString != self.config.ProtobufConnectionString() { + server.Connect() + } } } self.hasRunningServers = data.HasRunningServers - self.localServerId = data.LocalServerId + self.LocalServerId = data.LocalServerId self.ClusterVersion = data.ClusterVersion if self.addedLocalServer { @@ -594,3 +608,50 @@ func (self *ClusterConfiguration) Recovery(b []byte) error { return nil } + +func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error) { + dbUsers := self.dbUsers[db] + if dbUsers == nil || dbUsers[username] == nil { + return nil, common.NewAuthorizationError("Invalid username/password") + } + user := dbUsers[username] + if user.isValidPwd(password) { + return user, nil + } + return nil, common.NewAuthorizationError("Invalid username/password") +} + +func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error) { + user := self.clusterAdmins[username] + if user == nil { + return nil, common.NewAuthorizationError("Invalid username/password") + } + if user.isValidPwd(password) { + return user, nil + } + return nil, common.NewAuthorizationError("Invalid username/password") +} + +func (self *ClusterConfiguration) HasContinuousQueries() bool { + return self.continuousQueries != nil && len(self.continuousQueries) > 0 +} + +func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time { + return self.continuousQueryTimestamp +} + +func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time) { + self.continuousQueryTimestamp = t +} + +func (self *ClusterConfiguration) GetMapForJsonSerialization() map[string]interface{} { + jsonObject := make(map[string]interface{}) + dbs := make([]string, 0) + for db, _ := range self.DatabaseReplicationFactors { + dbs = append(dbs, db) + } + jsonObject["databases"] = dbs + jsonObject["cluster_admins"] = self.clusterAdmins + jsonObject["database_users"] = self.dbUsers + return jsonObject +} diff --git a/src/coordinator/cluster_server.go b/src/cluster/cluster_server.go similarity index 64% rename from src/coordinator/cluster_server.go rename to src/cluster/cluster_server.go index c6ae378364..35b82aa867 100644 --- a/src/coordinator/cluster_server.go +++ b/src/cluster/cluster_server.go @@ -1,4 +1,4 @@ -package coordinator +package cluster import ( log "code.google.com/p/log4go" @@ -12,7 +12,12 @@ type ClusterServer struct { State ServerState RaftConnectionString string ProtobufConnectionString string - protobufClient *ProtobufClient + connection ServerConnection +} + +type ServerConnection interface { + Connect() + MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error } type ServerState int @@ -25,10 +30,20 @@ const ( Potential ) +func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection) *ClusterServer { + return &ClusterServer{ + RaftName: raftName, + RaftConnectionString: raftConnectionString, + ProtobufConnectionString: protobufConnectionString, + connection: connection, + } +} + // in the coordinator test we don't want to create protobuf servers, // so we just ignore creating a protobuf client when the connection // string has a 0 port func shouldConnect(addr string) bool { + log.Debug("SHOULD CONNECT: ", addr) _, port, err := net.SplitHostPort(addr) if err != nil { log.Error("Error parsing address '%s': %s", addr, err) @@ -43,19 +58,15 @@ func shouldConnect(addr string) bool { } func (self *ClusterServer) Connect() { - if self.protobufClient != nil { - return - } - if !shouldConnect(self.ProtobufConnectionString) { return } log.Info("ClusterServer: %d connecting to: %s", self.Id, self.ProtobufConnectionString) - self.protobufClient = NewProtobufClient(self.ProtobufConnectionString) + self.connection.Connect() } func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error { self.Connect() - return self.protobufClient.MakeRequest(request, responseStream) + return self.connection.MakeRequest(request, responseStream) } diff --git a/src/coordinator/user.go b/src/cluster/user.go similarity index 79% rename from src/coordinator/user.go rename to src/cluster/user.go index 03fa7c5235..c542a82e31 100644 --- a/src/coordinator/user.go +++ b/src/cluster/user.go @@ -1,4 +1,4 @@ -package coordinator +package cluster import ( "code.google.com/p/go.crypto/bcrypt" @@ -40,7 +40,7 @@ func (self *CommonUser) IsDeleted() bool { return self.IsUserDeleted } -func (self *CommonUser) changePassword(hash string) error { +func (self *CommonUser) ChangePassword(hash string) error { self.Hash = hash userCache.Delete(self.CacheKey) return nil @@ -78,23 +78,23 @@ func (self *CommonUser) HasReadAccess(name string) bool { return false } -type clusterAdmin struct { +type ClusterAdmin struct { CommonUser `json:"common"` } -func (self *clusterAdmin) IsClusterAdmin() bool { +func (self *ClusterAdmin) IsClusterAdmin() bool { return true } -func (self *clusterAdmin) HasWriteAccess(_ string) bool { +func (self *ClusterAdmin) HasWriteAccess(_ string) bool { return true } -func (self *clusterAdmin) HasReadAccess(_ string) bool { +func (self *ClusterAdmin) HasReadAccess(_ string) bool { return true } -type dbUser struct { +type DbUser struct { CommonUser `json:"common"` Db string `json:"db"` WriteTo []*Matcher `json:"write_matchers"` @@ -102,11 +102,11 @@ type dbUser struct { IsAdmin bool `json:"is_admin"` } -func (self *dbUser) IsDbAdmin(db string) bool { +func (self *DbUser) IsDbAdmin(db string) bool { return self.IsAdmin && self.Db == db } -func (self *dbUser) HasWriteAccess(name string) bool { +func (self *DbUser) HasWriteAccess(name string) bool { for _, matcher := range self.WriteTo { if matcher.Matches(name) { return true @@ -116,7 +116,7 @@ func (self *dbUser) HasWriteAccess(name string) bool { return false } -func (self *dbUser) HasReadAccess(name string) bool { +func (self *DbUser) HasReadAccess(name string) bool { for _, matcher := range self.ReadFrom { if matcher.Matches(name) { return true @@ -126,13 +126,11 @@ func (self *dbUser) HasReadAccess(name string) bool { return false } -func (self *dbUser) GetDb() string { +func (self *DbUser) GetDb() string { return self.Db } -// private funcs - -func hashPassword(password string) ([]byte, error) { +func HashPassword(password string) ([]byte, error) { // The second arg is the cost of the hashing, higher is slower but makes it harder // to brute force, since it will be really slow and impractical return bcrypt.GenerateFromPassword([]byte(password), 10) diff --git a/src/coordinator/user_test.go b/src/cluster/user_test.go similarity index 78% rename from src/coordinator/user_test.go rename to src/cluster/user_test.go index ba8a9e36cd..480f3d8d5f 100644 --- a/src/coordinator/user_test.go +++ b/src/cluster/user_test.go @@ -1,4 +1,4 @@ -package coordinator +package cluster import ( "common" @@ -13,7 +13,7 @@ var root common.User func (self *UserSuite) SetUpSuite(c *C) { user := &clusterAdmin{CommonUser{"root", "", false, "root"}} - c.Assert(user.changePassword("password"), IsNil) + c.Assert(user.ChangePassword("password"), IsNil) root = user } @@ -21,9 +21,9 @@ func (self *UserSuite) TestProperties(c *C) { u := clusterAdmin{CommonUser{Name: "root"}} c.Assert(u.IsClusterAdmin(), Equals, true) c.Assert(u.GetName(), Equals, "root") - hash, err := hashPassword("foobar") + hash, err := HashPassword("foobar") c.Assert(err, IsNil) - c.Assert(u.changePassword(string(hash)), IsNil) + c.Assert(u.ChangePassword(string(hash)), IsNil) c.Assert(u.isValidPwd("foobar"), Equals, true) c.Assert(u.isValidPwd("password"), Equals, false) @@ -31,9 +31,9 @@ func (self *UserSuite) TestProperties(c *C) { c.Assert(dbUser.IsClusterAdmin(), Equals, false) c.Assert(dbUser.IsDbAdmin("db"), Equals, true) c.Assert(dbUser.GetName(), Equals, "db_user") - hash, err = hashPassword("password") + hash, err = HashPassword("password") c.Assert(err, IsNil) - c.Assert(dbUser.changePassword(string(hash)), IsNil) + c.Assert(dbUser.ChangePassword(string(hash)), IsNil) c.Assert(dbUser.isValidPwd("password"), Equals, true) c.Assert(dbUser.isValidPwd("password1"), Equals, false) } diff --git a/src/coordinator/client_server_test.go b/src/coordinator/client_server_test.go index e33db73368..5903f4706b 100644 --- a/src/coordinator/client_server_test.go +++ b/src/coordinator/client_server_test.go @@ -50,6 +50,7 @@ func (self *ClientServerSuite) TestClientCanMakeRequests(c *C) { go protobufServer.ListenAndServe() c.Assert(protobufServer, Not(IsNil)) protobufClient := NewProtobufClient("localhost:8091") + protobufClient.Connect() responseStream := make(chan *protocol.Response, 1) mock := ` diff --git a/src/coordinator/command.go b/src/coordinator/command.go index e1b28f7200..bce4849c1e 100644 --- a/src/coordinator/command.go +++ b/src/coordinator/command.go @@ -1,7 +1,9 @@ package coordinator import ( + "cluster" log "code.google.com/p/log4go" + "fmt" "github.com/goraft/raft" "time" ) @@ -39,7 +41,7 @@ func (c *SetContinuousQueryTimestampCommand) CommandName() string { } func (c *SetContinuousQueryTimestampCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) err := config.SetContinuousQueryTimestamp(c.Timestamp) return nil, err } @@ -58,7 +60,7 @@ func (c *CreateContinuousQueryCommand) CommandName() string { } func (c *CreateContinuousQueryCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) err := config.CreateContinuousQuery(c.Database, c.Query) return nil, err } @@ -77,7 +79,7 @@ func (c *DeleteContinuousQueryCommand) CommandName() string { } func (c *DeleteContinuousQueryCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) err := config.DeleteContinuousQuery(c.Database, c.Id) return nil, err } @@ -95,7 +97,7 @@ func (c *DropDatabaseCommand) CommandName() string { } func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) err := config.DropDatabase(c.Name) return nil, err } @@ -114,16 +116,16 @@ func (c *CreateDatabaseCommand) CommandName() string { } func (c *CreateDatabaseCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) err := config.CreateDatabase(c.Name, c.ReplicationFactor) return nil, err } type SaveDbUserCommand struct { - User *dbUser `json:"user"` + User *cluster.DbUser `json:"user"` } -func NewSaveDbUserCommand(u *dbUser) *SaveDbUserCommand { +func NewSaveDbUserCommand(u *cluster.DbUser) *SaveDbUserCommand { return &SaveDbUserCommand{ User: u, } @@ -134,7 +136,7 @@ func (c *SaveDbUserCommand) CommandName() string { } func (c *SaveDbUserCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) config.SaveDbUser(c.User) log.Debug("(raft:%s) Created user %s:%s", server.Name(), c.User.Db, c.User.Name) return nil, nil @@ -160,15 +162,15 @@ func (c *ChangeDbUserPassword) CommandName() string { func (c *ChangeDbUserPassword) Apply(server raft.Server) (interface{}, error) { log.Debug("(raft:%s) changing db user password for %s:%s", server.Name(), c.Database, c.Username) - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) return nil, config.ChangeDbUserPassword(c.Database, c.Username, c.Hash) } type SaveClusterAdminCommand struct { - User *clusterAdmin `json:"user"` + User *cluster.ClusterAdmin `json:"user"` } -func NewSaveClusterAdminCommand(u *clusterAdmin) *SaveClusterAdminCommand { +func NewSaveClusterAdminCommand(u *cluster.ClusterAdmin) *SaveClusterAdminCommand { return &SaveClusterAdminCommand{ User: u, } @@ -179,16 +181,16 @@ func (c *SaveClusterAdminCommand) CommandName() string { } func (c *SaveClusterAdminCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) config.SaveClusterAdmin(c.User) return nil, nil } type AddPotentialServerCommand struct { - Server *ClusterServer + Server *cluster.ClusterServer } -func NewAddPotentialServerCommand(s *ClusterServer) *AddPotentialServerCommand { +func NewAddPotentialServerCommand(s *cluster.ClusterServer) *AddPotentialServerCommand { return &AddPotentialServerCommand{Server: s} } @@ -197,17 +199,18 @@ func (c *AddPotentialServerCommand) CommandName() string { } func (c *AddPotentialServerCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) + fmt.Println("COMMMAND: adding potent server") config.AddPotentialServer(c.Server) return nil, nil } type UpdateServerStateCommand struct { ServerId uint32 - State ServerState + State cluster.ServerState } -func NewUpdateServerStateCommand(serverId uint32, state ServerState) *UpdateServerStateCommand { +func NewUpdateServerStateCommand(serverId uint32, state cluster.ServerState) *UpdateServerStateCommand { return &UpdateServerStateCommand{ServerId: serverId, State: state} } @@ -216,7 +219,7 @@ func (c *UpdateServerStateCommand) CommandName() string { } func (c *UpdateServerStateCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*ClusterConfiguration) + config := server.Context().(*cluster.ClusterConfiguration) err := config.UpdateServerState(c.ServerId, c.State) return nil, err } diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index a874f3dfe6..d6cf009cf2 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -1,6 +1,7 @@ package coordinator import ( + "cluster" log "code.google.com/p/log4go" "common" "datastore" @@ -18,7 +19,7 @@ import ( ) type CoordinatorImpl struct { - clusterConfiguration *ClusterConfiguration + clusterConfiguration *cluster.ClusterConfiguration raftServer ClusterConsensus datastore datastore.Datastore requestId uint32 @@ -67,7 +68,7 @@ func init() { } } -func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *ClusterConfiguration) *CoordinatorImpl { +func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *cluster.ClusterConfiguration) *CoordinatorImpl { coordinator := &CoordinatorImpl{ clusterConfiguration: clusterConfiguration, raftServer: raftServer, @@ -98,18 +99,18 @@ func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query isDbUser := !user.IsClusterAdmin() responseChannels := make([]chan *protocol.Response, 0, len(servers)+1) queryString := query.GetQueryString() - var localServerToQuery *serverToQuery + var localServerToQuery *cluster.ServerToQuery for _, server := range servers { - if server.server.Id == self.clusterConfiguration.localServerId { + if server.Server.Id == self.clusterConfiguration.LocalServerId { localServerToQuery = server } else { request := &protocol.Request{Type: &queryRequest, Query: &queryString, Id: &id, Database: &db, UserName: &userName, IsDbUser: &isDbUser} - if server.ringLocationsToQuery != replicationFactor { - r := server.ringLocationsToQuery + if server.RingLocationToQuery != replicationFactor { + r := server.RingLocationToQuery request.RingLocationsToQuery = &r } responseChan := make(chan *protocol.Response, 3) - server.server.MakeRequest(request, responseChan) + server.Server.MakeRequest(request, responseChan) responseChannels = append(responseChannels, responseChan) } } @@ -131,8 +132,8 @@ func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query go func() { var ringFilter func(database, series *string, time *int64) bool - if replicationFactor != localServerToQuery.ringLocationsToQuery { - ringFilter = self.clusterConfiguration.GetRingFilterFunction(db, localServerToQuery.ringLocationsToQuery) + if replicationFactor != localServerToQuery.RingLocationToQuery { + ringFilter = self.clusterConfiguration.GetRingFilterFunction(db, localServerToQuery.RingLocationToQuery) } self.datastore.ExecuteQuery(user, db, query, sendFromLocal, ringFilter) local <- &protocol.Response{Type: &endStreamResponse} @@ -419,11 +420,11 @@ func (self *CoordinatorImpl) SyncLogIteration() { servers := self.clusterConfiguration.Servers() replicationFactors := map[uint8]bool{} - for _, replicationFactor := range self.clusterConfiguration.databaseReplicationFactors { + for _, replicationFactor := range self.clusterConfiguration.DatabaseReplicationFactors { replicationFactors[replicationFactor] = true } - localId := self.clusterConfiguration.localServerId + localId := self.clusterConfiguration.LocalServerId for replicationFactor, _ := range replicationFactors { for _, owningServer := range servers { @@ -478,7 +479,7 @@ func (self *CoordinatorImpl) SyncLogIteration() { } } -func (self *CoordinatorImpl) getLastAndCurrentSequenceNumbers(replicationFactor uint8, originatingServer, owningServer *ClusterServer) (uint64, uint64, error) { +func (self *CoordinatorImpl) getLastAndCurrentSequenceNumbers(replicationFactor uint8, originatingServer, owningServer *cluster.ClusterServer) (uint64, uint64, error) { lastKnownSequenceNumber, err := self.GetLastSequenceNumber(replicationFactor, originatingServer.Id, owningServer.Id) if err != nil { return 0, 0, err @@ -497,7 +498,7 @@ func (self *CoordinatorImpl) GetLastSequenceNumber(replicationFactor uint8, orig &replicationFactor, &owningServer, &originatingServer) } -func (self *CoordinatorImpl) getCurrentSequenceNumber(replicationFactor uint8, originatingServer, owningServer *ClusterServer) (uint64, error) { +func (self *CoordinatorImpl) getCurrentSequenceNumber(replicationFactor uint8, originatingServer, owningServer *cluster.ClusterServer) (uint64, error) { id := atomic.AddUint32(&self.requestId, uint32(1)) replicationFactor32 := uint32(replicationFactor) database := "" @@ -611,9 +612,9 @@ func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series } func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protocol.Series) { - if self.clusterConfiguration.parsedContinuousQueries != nil { + if self.clusterConfiguration.ParsedContinuousQueries != nil { incomingSeriesName := *series.Name - for _, query := range self.clusterConfiguration.parsedContinuousQueries[db] { + for _, query := range self.clusterConfiguration.ParsedContinuousQueries[db] { groupByClause := query.GetGroupByClause() if groupByClause.Elems != nil { continue @@ -722,7 +723,7 @@ func (self *CoordinatorImpl) DeleteSeriesData(user common.User, db string, query servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db) for _, server := range servers { - if err := self.handleSeriesDelete(user, server.server, db, query); err != nil { + if err := self.handleSeriesDelete(user, server.Server, db, query); err != nil { return err } } @@ -739,17 +740,17 @@ func (self *CoordinatorImpl) createRequest(requestType protocol.Request_Type, da return &protocol.Request{Type: &requestType, Database: database, Id: &id} } -func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *ClusterServer, database string, query *parser.DeleteQuery) error { +func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *cluster.ClusterServer, database string, query *parser.DeleteQuery) error { owner, servers := self.clusterConfiguration.GetReplicas(server, &database) request := self.createRequest(proxyDelete, &database) queryStr := query.GetQueryStringWithTimeCondition() request.Query = &queryStr - request.OriginatingServerId = &self.clusterConfiguration.localServerId + request.OriginatingServerId = &self.clusterConfiguration.LocalServerId request.ClusterVersion = &self.clusterConfiguration.ClusterVersion request.OwnerServerId = &owner.Id - if server.Id == self.clusterConfiguration.localServerId { + if server.Id == self.clusterConfiguration.LocalServerId { // this is a local delete replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database) err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id) @@ -771,17 +772,17 @@ func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *Cluste return self.proxyUntilSuccess(servers, request) } -func (self *CoordinatorImpl) handleDropDatabase(server *ClusterServer, database string) error { +func (self *CoordinatorImpl) handleDropDatabase(server *cluster.ClusterServer, database string) error { owner, servers := self.clusterConfiguration.GetReplicas(server, &database) request := self.createRequest(proxyDropDatabase, &database) - request.OriginatingServerId = &self.clusterConfiguration.localServerId + request.OriginatingServerId = &self.clusterConfiguration.LocalServerId request.ClusterVersion = &self.clusterConfiguration.ClusterVersion request.OwnerServerId = &owner.Id replicationFactor := uint32(self.clusterConfiguration.GetDatabaseReplicationFactor(database)) request.ReplicationFactor = &replicationFactor - if server.Id == self.clusterConfiguration.localServerId { + if server.Id == self.clusterConfiguration.LocalServerId { // this is a local delete replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database) err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id) @@ -803,18 +804,18 @@ func (self *CoordinatorImpl) handleDropDatabase(server *ClusterServer, database return self.proxyUntilSuccess(servers, request) } -func (self *CoordinatorImpl) handleDropSeries(server *ClusterServer, database, series string) error { +func (self *CoordinatorImpl) handleDropSeries(server *cluster.ClusterServer, database, series string) error { owner, servers := self.clusterConfiguration.GetReplicas(server, &database) request := self.createRequest(proxyDropSeries, &database) - request.OriginatingServerId = &self.clusterConfiguration.localServerId + request.OriginatingServerId = &self.clusterConfiguration.LocalServerId request.ClusterVersion = &self.clusterConfiguration.ClusterVersion request.OwnerServerId = &owner.Id request.Series = &protocol.Series{Name: &series} replicationFactor := uint32(self.clusterConfiguration.GetDatabaseReplicationFactor(database)) request.ReplicationFactor = &replicationFactor - if server.Id == self.clusterConfiguration.localServerId { + if server.Id == self.clusterConfiguration.LocalServerId { // this is a local delete replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database) err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id) @@ -851,12 +852,12 @@ func (self *CoordinatorImpl) handleClusterWrite(serverIndex *int, db *string, se request := self.createRequest(proxyWrite, db) request.Series = series - request.OriginatingServerId = &self.clusterConfiguration.localServerId + request.OriginatingServerId = &self.clusterConfiguration.LocalServerId request.ClusterVersion = &self.clusterConfiguration.ClusterVersion request.OwnerServerId = &owner.Id for _, s := range servers { - if s.Id == self.clusterConfiguration.localServerId { + if s.Id == self.clusterConfiguration.LocalServerId { // TODO: make storing of the data and logging of the request atomic replicationFactor := self.clusterConfiguration.GetReplicationFactor(db) err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id) @@ -882,9 +883,9 @@ func (self *CoordinatorImpl) handleClusterWrite(serverIndex *int, db *string, se // This method will attemp to proxy the request until the call to proxy returns nil. If no server succeeds, // the last err value will be returned. -func (self *CoordinatorImpl) proxyUntilSuccess(servers []*ClusterServer, request *protocol.Request) (err error) { +func (self *CoordinatorImpl) proxyUntilSuccess(servers []*cluster.ClusterServer, request *protocol.Request) (err error) { for _, s := range servers { - if s.Id != self.clusterConfiguration.localServerId { + if s.Id != self.clusterConfiguration.LocalServerId { err = self.proxyWrite(s, request) if err == nil { return nil @@ -894,7 +895,7 @@ func (self *CoordinatorImpl) proxyUntilSuccess(servers []*ClusterServer, request return } -func (self *CoordinatorImpl) proxyWrite(clusterServer *ClusterServer, request *protocol.Request) error { +func (self *CoordinatorImpl) proxyWrite(clusterServer *cluster.ClusterServer, request *protocol.Request) error { originatingServerId := request.OriginatingServerId request.OriginatingServerId = nil defer func() { request.OriginatingServerId = originatingServerId }() @@ -984,7 +985,7 @@ func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replica return nil } -func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*Database, error) { +func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*cluster.Database, error) { if !user.IsClusterAdmin() { return nil, common.NewAuthorizationError("Insufficient permission to list databases") } @@ -1022,16 +1023,16 @@ func (self *CoordinatorImpl) ListSeries(user common.User, database string) ([]*p isDbUser := !user.IsClusterAdmin() responseChannels := make([]chan *protocol.Response, 0, len(servers)+1) for _, server := range servers { - if server.server.Id == self.clusterConfiguration.localServerId { + if server.Server.Id == self.clusterConfiguration.LocalServerId { continue } request := &protocol.Request{Type: &listSeriesRequest, Id: &id, Database: &database, UserName: &userName, IsDbUser: &isDbUser} - if server.ringLocationsToQuery != replicationFactor { - r := server.ringLocationsToQuery + if server.RingLocationToQuery != replicationFactor { + r := server.RingLocationToQuery request.RingLocationsToQuery = &r } responseChan := make(chan *protocol.Response, 3) - server.server.protobufClient.MakeRequest(request, responseChan) + server.Server.MakeRequest(request, responseChan) responseChannels = append(responseChannels, responseChan) } @@ -1072,7 +1073,7 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error { } else { servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db) for _, server := range servers { - if err := self.handleDropDatabase(server.server, db); err != nil { + if err := self.handleDropDatabase(server.Server, db); err != nil { return err } } @@ -1098,7 +1099,7 @@ func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) err servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db) for _, server := range servers { - if err := self.handleDropSeries(server.server, db, series); err != nil { + if err := self.handleDropSeries(server.Server, db, series); err != nil { return err } } @@ -1108,27 +1109,15 @@ func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) err func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) { log.Debug("(raft:%s) Authenticating password for %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username) - dbUsers := self.clusterConfiguration.dbUsers[db] - if dbUsers == nil || dbUsers[username] == nil { - return nil, common.NewAuthorizationError("Invalid username/password") - } - user := dbUsers[username] - if user.isValidPwd(password) { + user, err := self.clusterConfiguration.AuthenticateDbUser(db, username, password) + if user != nil { log.Debug("(raft:%s) User %s authenticated succesfuly", self.raftServer.(*RaftServer).raftServer.Name(), username) - return user, nil } - return nil, common.NewAuthorizationError("Invalid username/password") + return user, err } func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string) (common.User, error) { - user := self.clusterConfiguration.clusterAdmins[username] - if user == nil { - return nil, common.NewAuthorizationError("Invalid username/password") - } - if user.isValidPwd(password) { - return user, nil - } - return nil, common.NewAuthorizationError("Invalid username/password") + return self.clusterConfiguration.AuthenticateClusterAdmin(username, password) } func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error) { @@ -1148,11 +1137,11 @@ func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, usern return fmt.Errorf("%s isn't a valid username", username) } - if self.clusterConfiguration.clusterAdmins[username] != nil { + if self.clusterConfiguration.GetClusterAdmin(username) != nil { return fmt.Errorf("User %s already exists", username) } - return self.raftServer.SaveClusterAdminUser(&clusterAdmin{CommonUser{Name: username, CacheKey: username}}) + return self.raftServer.SaveClusterAdminUser(&cluster.ClusterAdmin{cluster.CommonUser{Name: username, CacheKey: username}}) } func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, username string) error { @@ -1160,7 +1149,7 @@ func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, usern return common.NewAuthorizationError("Insufficient permissions") } - user := self.clusterConfiguration.clusterAdmins[username] + user := self.clusterConfiguration.GetClusterAdmin(username) if user == nil { return fmt.Errorf("User %s doesn't exists", username) } @@ -1174,16 +1163,16 @@ func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, u return common.NewAuthorizationError("Insufficient permissions") } - user := self.clusterConfiguration.clusterAdmins[username] + user := self.clusterConfiguration.GetClusterAdmin(username) if user == nil { return fmt.Errorf("Invalid user name %s", username) } - hash, err := hashPassword(password) + hash, err := cluster.HashPassword(password) if err != nil { return err } - user.changePassword(string(hash)) + user.ChangePassword(string(hash)) return self.raftServer.SaveClusterAdminUser(user) } @@ -1201,13 +1190,12 @@ func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username st } self.CreateDatabase(requester, db, uint8(1)) // ignore the error since the db may exist - dbUsers := self.clusterConfiguration.dbUsers[db] - if dbUsers != nil && dbUsers[username] != nil { + if self.clusterConfiguration.GetDbUser(db, username) != nil { return fmt.Errorf("User %s already exists", username) } - matchers := []*Matcher{&Matcher{true, ".*"}} + matchers := []*cluster.Matcher{&cluster.Matcher{true, ".*"}} log.Debug("(raft:%s) Creating uesr %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username) - return self.raftServer.SaveDbUser(&dbUser{CommonUser{Name: username, CacheKey: db + "%" + username}, db, matchers, matchers, false}) + return self.raftServer.SaveDbUser(&cluster.DbUser{cluster.CommonUser{Name: username, CacheKey: db + "%" + username}, db, matchers, matchers, false}) } func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username string) error { @@ -1215,12 +1203,10 @@ func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username st return common.NewAuthorizationError("Insufficient permissions") } - dbUsers := self.clusterConfiguration.dbUsers[db] - if dbUsers == nil || dbUsers[username] == nil { + user := self.clusterConfiguration.GetDbUser(db, username) + if user == nil { return fmt.Errorf("User %s doesn't exists", username) } - - user := dbUsers[username] user.CommonUser.IsUserDeleted = true return self.raftServer.SaveDbUser(user) } @@ -1238,7 +1224,7 @@ func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, use return common.NewAuthorizationError("Insufficient permissions") } - hash, err := hashPassword(password) + hash, err := cluster.HashPassword(password) if err != nil { return err } @@ -1250,12 +1236,10 @@ func (self *CoordinatorImpl) SetDbAdmin(requester common.User, db, username stri return common.NewAuthorizationError("Insufficient permissions") } - dbUsers := self.clusterConfiguration.dbUsers[db] - if dbUsers == nil || dbUsers[username] == nil { + user := self.clusterConfiguration.GetDbUser(db, username) + if user == nil { return fmt.Errorf("Invalid username %s", username) } - - user := dbUsers[username] user.IsAdmin = isAdmin self.raftServer.SaveDbUser(user) return nil @@ -1294,9 +1278,9 @@ func (self *CoordinatorImpl) ReplicateDelete(request *protocol.Request) error { return nil } -func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, replicas []*ClusterServer) { +func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, replicas []*cluster.ClusterServer) { for _, server := range replicas { - if server.Id != self.clusterConfiguration.localServerId { + if server.Id != self.clusterConfiguration.LocalServerId { err := server.MakeRequest(request, nil) if err != nil { log.Warn("REPLICATION ERROR: ", request.GetSequenceNumber(), err) @@ -1306,7 +1290,7 @@ func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, re } func (self *CoordinatorImpl) sequenceNumberWithServerId(n uint64) uint64 { - return n*HOST_ID_OFFSET + uint64(self.clusterConfiguration.localServerId) + return n*HOST_ID_OFFSET + uint64(self.clusterConfiguration.LocalServerId) } func isValidName(name string) bool { diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go index 59e7047fe8..c87ab40377 100644 --- a/src/coordinator/coordinator_test.go +++ b/src/coordinator/coordinator_test.go @@ -1,6 +1,7 @@ package coordinator import ( + "cluster" . "common" "configuration" "datastore" @@ -112,10 +113,14 @@ func clean(servers ...*RaftServer) { } } +func newProtobufClient(connectString string) cluster.ServerConnection { + return NewProtobufClient(connectString) +} + func newConfigAndServer(c *C) *RaftServer { path, err := ioutil.TempDir(os.TempDir(), "influxdb") c.Assert(err, IsNil) - config := NewClusterConfiguration(&configuration.Configuration{}) + config := cluster.NewClusterConfiguration(&configuration.Configuration{}, newProtobufClient) setupConfig := &configuration.Configuration{Hostname: "localhost", RaftDir: path, RaftServerPort: 0} server := NewRaftServer(setupConfig, config) return server @@ -191,6 +196,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) { server.port = port server.name = name // defer clean(server) + fmt.Println("STARTING UP.......") err = server.ListenAndServe() c.Assert(err, IsNil) time.Sleep(SERVER_STARTUP_TIME) @@ -199,6 +205,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) { assertConfigContains(server.port, dbname, true, c) } + fmt.Println("NEW SERVER JOINING...") // make another server join the cluster server2 := newConfigAndServer(c) defer clean(server2) @@ -206,6 +213,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) { c.Assert(err, IsNil) server2.config.SeedServers = []string{fmt.Sprintf("http://localhost:%d", server.port)} server2.Serve(l) + fmt.Println("DONE...") time.Sleep(SERVER_STARTUP_TIME) for i := 0; i < 1000; i++ { dbname := fmt.Sprintf("db%d", i) @@ -278,14 +286,6 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) { assertConfigContains(servers[2].port, "db6", true, c) } -func (self *UserSuite) BenchmarkHashing(c *C) { - for i := 0; i < c.N; i++ { - pwd := fmt.Sprintf("password%d", i) - _, err := hashPassword(pwd) - c.Assert(err, IsNil) - } -} - func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) { servers := startAndVerifyCluster(3, c) defer clean(servers...) @@ -311,7 +311,7 @@ func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) { coordinator := NewCoordinatorImpl(&DatastoreMock{}, server, server.clusterConfig) dbs, err := coordinator.ListDatabases(root) c.Assert(err, IsNil) - c.Assert(dbs, DeepEquals, []*Database{&Database{"db1", 1}}) + c.Assert(dbs, DeepEquals, []*cluster.Database{&cluster.Database{"db1", 1}}) } // if the db is dropped it should remove the users as well @@ -583,7 +583,7 @@ func (self *CoordinatorSuite) TestCanCreateDatabaseWithNameAndReplicationFactor( time.Sleep(REPLICATION_LAG) for i := 0; i < 3; i++ { - databases := servers[i].clusterConfig.databaseReplicationFactors + databases := servers[i].clusterConfig.DatabaseReplicationFactors c.Assert(databases, DeepEquals, map[string]uint8{ "db1": 1, "db2": 1, @@ -655,16 +655,16 @@ func (self *CoordinatorSuite) TestServersGetUniqueIdsAndCanActivateCluster(c *C) defer clean(servers...) // ensure they're all in the same order across the cluster - expectedServers := servers[0].clusterConfig.servers + expectedServers := servers[0].clusterConfig.Servers() for _, server := range servers { - c.Assert(server.clusterConfig.servers, HasLen, len(expectedServers)) + c.Assert(server.clusterConfig.Servers(), HasLen, len(expectedServers)) for i, clusterServer := range expectedServers { - c.Assert(server.clusterConfig.servers[i].Id, Equals, clusterServer.Id) + c.Assert(server.clusterConfig.Servers()[i].Id, Equals, clusterServer.Id) } } // ensure cluster server ids are unique idMap := make(map[uint32]bool) - for _, clusterServer := range servers[0].clusterConfig.servers { + for _, clusterServer := range servers[0].clusterConfig.Servers() { _, ok := idMap[clusterServer.Id] c.Assert(ok, Equals, false) idMap[clusterServer.Id] = true diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index 14f2c301f7..f71bf42d5b 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -1,6 +1,7 @@ package coordinator import ( + "cluster" "common" "net" "parser" @@ -23,7 +24,7 @@ type Coordinator interface { DropSeries(user common.User, db, series string) error CreateDatabase(user common.User, db string, replicationFactor uint8) error ForceCompaction(user common.User) error - ListDatabases(user common.User) ([]*Database, error) + ListDatabases(user common.User) ([]*cluster.Database, error) ListSeries(user common.User, database string) ([]*protocol.Series, error) ReplicateWrite(request *protocol.Request) error ReplicateDelete(request *protocol.Request) error @@ -67,14 +68,14 @@ type ClusterConsensus interface { DropDatabase(name string) error CreateContinuousQuery(db string, query string) error DeleteContinuousQuery(db string, id uint32) error - SaveClusterAdminUser(u *clusterAdmin) error - SaveDbUser(user *dbUser) error + SaveClusterAdminUser(u *cluster.ClusterAdmin) error + SaveDbUser(user *cluster.DbUser) error ChangeDbUserPassword(db, username string, hash []byte) error // an insert index of -1 will append to the end of the ring - AddServer(server *ClusterServer, insertIndex int) error + AddServer(server *cluster.ClusterServer, insertIndex int) error // only servers that are in a Potential state can be moved around in the ring - MovePotentialServer(server *ClusterServer, insertIndex int) error + MovePotentialServer(server *cluster.ClusterServer, insertIndex int) error /* Activate tells the cluster to start sending writes to this node. The node will also make requests to the other servers to backfill any @@ -82,11 +83,11 @@ type ClusterConsensus interface { Once the new node updates it state to "Running" the other servers will delete all of the data that they no longer have to keep from the ring */ - ActivateServer(server *ClusterServer) error + ActivateServer(server *cluster.ClusterServer) error // Efficient method to have a potential server take the place of a running (or downed) // server. The replacement must have a state of "Potential" for this to work. - ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error + ReplaceServer(oldServer *cluster.ClusterServer, replacement *cluster.ClusterServer) error AssignEngineAndCoordinator(engine queryRunner, coordinator *CoordinatorImpl) error diff --git a/src/coordinator/protobuf_client.go b/src/coordinator/protobuf_client.go index 9edd3f00c1..cca074fa1d 100644 --- a/src/coordinator/protobuf_client.go +++ b/src/coordinator/protobuf_client.go @@ -20,6 +20,7 @@ type ProtobufClient struct { requestBuffer map[uint32]*runningRequest connectionStatus uint32 reconnectWait sync.WaitGroup + connectCalled bool } type runningRequest struct { @@ -37,13 +38,19 @@ const ( ) func NewProtobufClient(hostAndPort string) *ProtobufClient { - client := &ProtobufClient{hostAndPort: hostAndPort, requestBuffer: make(map[uint32]*runningRequest), connectionStatus: IS_CONNECTED} + return &ProtobufClient{hostAndPort: hostAndPort, requestBuffer: make(map[uint32]*runningRequest), connectionStatus: IS_CONNECTED} +} + +func (self *ProtobufClient) Connect() { + if self.connectCalled { + return + } + self.connectCalled = true go func() { - client.reconnect() - client.readResponses() + self.reconnect() + self.readResponses() }() - go client.peridicallySweepTimedOutRequests() - return client + go self.peridicallySweepTimedOutRequests() } func (self *ProtobufClient) Close() { diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index 8042a844de..c60ce80572 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -2,6 +2,7 @@ package coordinator import ( "bytes" + "cluster" log "code.google.com/p/log4go" "common" "datastore" @@ -16,7 +17,7 @@ import ( type ProtobufRequestHandler struct { db datastore.Datastore coordinator Coordinator - clusterConfig *ClusterConfiguration + clusterConfig *cluster.ClusterConfiguration writeOk protocol.Response_Type } @@ -27,7 +28,7 @@ var ( sequenceNumberResponse = protocol.Response_SEQUENCE_NUMBER ) -func NewProtobufRequestHandler(db datastore.Datastore, coordinator Coordinator, clusterConfig *ClusterConfiguration) *ProtobufRequestHandler { +func NewProtobufRequestHandler(db datastore.Datastore, coordinator Coordinator, clusterConfig *cluster.ClusterConfiguration) *ProtobufRequestHandler { return &ProtobufRequestHandler{db: db, coordinator: coordinator, writeOk: protocol.Response_WRITE_OK, clusterConfig: clusterConfig} } @@ -35,7 +36,7 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con if *request.Type == protocol.Request_PROXY_WRITE { response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk} - request.OriginatingServerId = &self.clusterConfig.localServerId + request.OriginatingServerId = &self.clusterConfig.LocalServerId // TODO: make request logging and datastore write atomic replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database) err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId) @@ -53,7 +54,7 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con } else if *request.Type == protocol.Request_PROXY_DROP_SERIES { response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk} - request.OriginatingServerId = &self.clusterConfig.localServerId + request.OriginatingServerId = &self.clusterConfig.LocalServerId replicationFactor := uint8(*request.ReplicationFactor) // TODO: make request logging and datastore write atomic err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId) @@ -85,7 +86,7 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con } else if *request.Type == protocol.Request_PROXY_DROP_DATABASE { response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk} - request.OriginatingServerId = &self.clusterConfig.localServerId + request.OriginatingServerId = &self.clusterConfig.LocalServerId replicationFactor := uint8(*request.ReplicationFactor) // TODO: make request logging and datastore write atomic err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId) @@ -117,7 +118,7 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con } else if *request.Type == protocol.Request_PROXY_DELETE { response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk} - request.OriginatingServerId = &self.clusterConfig.localServerId + request.OriginatingServerId = &self.clusterConfig.LocalServerId // TODO: make request logging and datastore write atomic replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database) err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId) diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index 5f73fc9703..fee7970683 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -2,6 +2,7 @@ package coordinator import ( "bytes" + "cluster" log "code.google.com/p/log4go" "common" "configuration" @@ -39,7 +40,7 @@ type RaftServer struct { router *mux.Router raftServer raft.Server httpServer *http.Server - clusterConfig *ClusterConfiguration + clusterConfig *cluster.ClusterConfiguration mutex sync.RWMutex listener net.Listener closing bool @@ -58,7 +59,7 @@ var replicateWrite = protocol.Request_REPLICATION_WRITE var replicateDelete = protocol.Request_REPLICATION_DELETE // Creates a new server. -func NewRaftServer(config *configuration.Configuration, clusterConfig *ClusterConfiguration) *RaftServer { +func NewRaftServer(config *configuration.Configuration, clusterConfig *cluster.ClusterConfiguration) *RaftServer { if !registeredCommands { registeredCommands = true for _, command := range internalRaftCommands { @@ -190,7 +191,7 @@ func (s *RaftServer) DropDatabase(name string) error { return err } -func (s *RaftServer) SaveDbUser(u *dbUser) error { +func (s *RaftServer) SaveDbUser(u *cluster.DbUser) error { command := NewSaveDbUserCommand(u) _, err := s.doOrProxyCommand(command, "save_db_user") return err @@ -202,16 +203,16 @@ func (s *RaftServer) ChangeDbUserPassword(db, username string, hash []byte) erro return err } -func (s *RaftServer) SaveClusterAdminUser(u *clusterAdmin) error { +func (s *RaftServer) SaveClusterAdminUser(u *cluster.ClusterAdmin) error { command := NewSaveClusterAdminCommand(u) _, err := s.doOrProxyCommand(command, "save_cluster_admin_user") return err } func (s *RaftServer) CreateRootUser() error { - u := &clusterAdmin{CommonUser{"root", "", false, "root"}} - hash, _ := hashPassword(DEFAULT_ROOT_PWD) - u.changePassword(string(hash)) + u := &cluster.ClusterAdmin{cluster.CommonUser{"root", "", false, "root"}} + hash, _ := cluster.HashPassword(DEFAULT_ROOT_PWD) + u.ChangePassword(string(hash)) return s.SaveClusterAdminUser(u) } @@ -222,27 +223,26 @@ func (s *RaftServer) SetContinuousQueryTimestamp(timestamp time.Time) error { } func (s *RaftServer) CreateContinuousQuery(db string, query string) error { - // if there are already-running queries, we need to initiate a backfill - if !s.clusterConfig.continuousQueryTimestamp.IsZero() { - selectQuery, err := parser.ParseSelectQuery(query) - if err != nil { - return fmt.Errorf("Failed to parse continuous query: %s", query) - } + selectQuery, err := parser.ParseSelectQuery(query) + if err != nil { + return fmt.Errorf("Failed to parse continuous query: %s", query) + } - duration, err := selectQuery.GetGroupByClause().GetGroupByTime() - if err != nil { - return fmt.Errorf("Couldn't get group by time for continuous query: %s", err) - } + duration, err := selectQuery.GetGroupByClause().GetGroupByTime() + if err != nil { + return fmt.Errorf("Couldn't get group by time for continuous query: %s", err) + } - if duration != nil { - zeroTime := time.Time{} - currentBoundary := time.Now().Truncate(*duration) - go s.runContinuousQuery(db, selectQuery, zeroTime, currentBoundary) - } + if duration != nil { + zeroTime := time.Time{} + currentBoundary := time.Now().Truncate(*duration) + go s.runContinuousQuery(db, selectQuery, zeroTime, currentBoundary) + } else { + // TODO: make continuous queries backfill for queries that don't have a group by time } command := NewCreateContinuousQueryCommand(db, query) - _, err := s.doOrProxyCommand(command, "create_cq") + _, err = s.doOrProxyCommand(command, "create_cq") return err } @@ -252,19 +252,19 @@ func (s *RaftServer) DeleteContinuousQuery(db string, id uint32) error { return err } -func (s *RaftServer) ActivateServer(server *ClusterServer) error { +func (s *RaftServer) ActivateServer(server *cluster.ClusterServer) error { return errors.New("not implemented") } -func (s *RaftServer) AddServer(server *ClusterServer, insertIndex int) error { +func (s *RaftServer) AddServer(server *cluster.ClusterServer, insertIndex int) error { return errors.New("not implemented") } -func (s *RaftServer) MovePotentialServer(server *ClusterServer, insertIndex int) error { +func (s *RaftServer) MovePotentialServer(server *cluster.ClusterServer, insertIndex int) error { return errors.New("not implemented") } -func (s *RaftServer) ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error { +func (s *RaftServer) ReplaceServer(oldServer *cluster.ClusterServer, replacement *cluster.ClusterServer) error { return errors.New("not implemented") } @@ -356,11 +356,9 @@ func (s *RaftServer) startRaft() error { log.Error(err) } - command := NewAddPotentialServerCommand(&ClusterServer{ - RaftName: name, - RaftConnectionString: connectionString, - ProtobufConnectionString: s.config.ProtobufConnectionString(), - }) + protobufConnectString := s.config.ProtobufConnectionString() + clusterServer := cluster.NewClusterServer(name, connectionString, protobufConnectString, NewProtobufClient(protobufConnectString)) + command := NewAddPotentialServerCommand(clusterServer) _, err = s.doOrProxyCommand(command, "add_server") if err != nil { return err @@ -413,14 +411,14 @@ func (s *RaftServer) raftLeaderLoop(loopTimer *time.Ticker) { } func (s *RaftServer) checkContinuousQueries() { - if s.clusterConfig.continuousQueries == nil || len(s.clusterConfig.continuousQueries) == 0 { + if !s.clusterConfig.HasContinuousQueries() { return } runTime := time.Now() queriesDidRun := false - for db, queries := range s.clusterConfig.parsedContinuousQueries { + for db, queries := range s.clusterConfig.ParsedContinuousQueries { for _, query := range queries { groupByClause := query.GetGroupByClause() @@ -436,9 +434,10 @@ func (s *RaftServer) checkContinuousQueries() { } currentBoundary := runTime.Truncate(*duration) - lastBoundary := s.clusterConfig.continuousQueryTimestamp.Truncate(*duration) + lastRun := s.clusterConfig.LastContinuousQueryRunTime() + lastBoundary := lastRun.Truncate(*duration) - if currentBoundary.After(s.clusterConfig.continuousQueryTimestamp) { + if currentBoundary.After(lastRun) { s.runContinuousQuery(db, query, lastBoundary, currentBoundary) queriesDidRun = true } @@ -446,13 +445,13 @@ func (s *RaftServer) checkContinuousQueries() { } if queriesDidRun { - s.clusterConfig.continuousQueryTimestamp = runTime + s.clusterConfig.SetLastContinuousQueryRunTime(runTime) s.SetContinuousQueryTimestamp(runTime) } } func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, start time.Time, end time.Time) { - clusterAdmin := s.clusterConfig.clusterAdmins["root"] + clusterAdmin := s.clusterConfig.GetClusterAdmin("root") intoClause := query.GetIntoClause() targetName := intoClause.Target.Name sequenceNumber := uint64(1) @@ -592,16 +591,17 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { // it's a new server the cluster has never seen, make it a potential if server == nil { log.Info("Adding new server to the cluster config %s", command.Name) - addServer := NewAddPotentialServerCommand(&ClusterServer{ - RaftName: command.Name, - RaftConnectionString: command.ConnectionString, - ProtobufConnectionString: command.ProtobufConnectionString, - }) + client := NewProtobufClient(command.ProtobufConnectionString) + fmt.Println("CREATED CLIENT") + clusterServer := cluster.NewClusterServer(command.Name, command.ConnectionString, command.ProtobufConnectionString, client) + fmt.Println("CREATED SERVER") + addServer := NewAddPotentialServerCommand(clusterServer) if _, err := s.raftServer.Do(addServer); err != nil { log.Error("Error joining raft server: ", err, command) http.Error(w, err.Error(), http.StatusInternalServerError) return } + fmt.Println("DID COMMAND...") } log.Info("Server %s already exist in the cluster config", command.Name) } else { @@ -617,15 +617,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { } func (s *RaftServer) configHandler(w http.ResponseWriter, req *http.Request) { - jsonObject := make(map[string]interface{}) - dbs := make([]string, 0) - for db, _ := range s.clusterConfig.databaseReplicationFactors { - dbs = append(dbs, db) - } - jsonObject["databases"] = dbs - jsonObject["cluster_admins"] = s.clusterConfig.clusterAdmins - jsonObject["database_users"] = s.clusterConfig.dbUsers - js, err := json.Marshal(jsonObject) + js, err := json.Marshal(s.clusterConfig.GetMapForJsonSerialization()) if err != nil { log.Error("ERROR marshalling config: ", err) } diff --git a/src/coordinator/sharding.go b/src/coordinator/sharding.go new file mode 100644 index 0000000000..82f2fdee9d --- /dev/null +++ b/src/coordinator/sharding.go @@ -0,0 +1,27 @@ +package coordinator + +import ( + "cluster" + "protocol" + "time" +) + +// duration 1h, 1d, 7d +// split duration to n shards +// if n > 1 +// hash using either random or series name + +// These are things that the Coordinator need (defined in Coordinator, will have to import cluster package) +type ShardAwareObject interface { + GetShards(querySpec cluster.QuerySpec) []Shard + GetShardById(id uint32) Shard +} + +type Shard interface { + Id() uint32 + StartTime() time.Time + EndTime() time.Time + SeriesNames() []string + Write([]*protocol.Series) error + Query(cluster.QuerySpec, chan *protocol.Response) error +}