Add cluster interfaces, refactor cluster config and server into cluster package.
* fix backfill of continuous queries to actually workpull/249/head
parent
b8ba76c05d
commit
3df7afaed9
|
@ -2,6 +2,7 @@ package http
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"cluster"
|
||||
"common"
|
||||
"coordinator"
|
||||
"encoding/base64"
|
||||
|
@ -98,7 +99,7 @@ func (self *MockEngine) RunQuery(_ common.User, _ string, query string, localOnl
|
|||
type MockCoordinator struct {
|
||||
coordinator.Coordinator
|
||||
series []*protocol.Series
|
||||
continuousQueries map[string][]*coordinator.ContinuousQuery
|
||||
continuousQueries map[string][]*cluster.ContinuousQuery
|
||||
deleteQueries []*parser.DeleteQuery
|
||||
db string
|
||||
droppedDb string
|
||||
|
@ -119,8 +120,8 @@ func (self *MockCoordinator) CreateDatabase(_ common.User, db string, _ uint8) e
|
|||
return nil
|
||||
}
|
||||
|
||||
func (self *MockCoordinator) ListDatabases(_ common.User) ([]*coordinator.Database, error) {
|
||||
return []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}}, nil
|
||||
func (self *MockCoordinator) ListDatabases(_ common.User) ([]*cluster.Database, error) {
|
||||
return []*cluster.Database{&cluster.Database{"db1", 1}, &cluster.Database{"db2", 1}}, nil
|
||||
}
|
||||
|
||||
func (self *MockCoordinator) DropDatabase(_ common.User, db string) error {
|
||||
|
@ -154,7 +155,7 @@ func (self *MockCoordinator) ListContinuousQueries(_ common.User, db string) ([]
|
|||
}
|
||||
|
||||
func (self *MockCoordinator) CreateContinuousQuery(_ common.User, db string, query string) error {
|
||||
self.continuousQueries[db] = append(self.continuousQueries[db], &coordinator.ContinuousQuery{2, query})
|
||||
self.continuousQueries[db] = append(self.continuousQueries[db], &cluster.ContinuousQuery{2, query})
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -172,9 +173,9 @@ func (self *ApiSuite) formatUrl(path string, args ...interface{}) string {
|
|||
|
||||
func (self *ApiSuite) SetUpSuite(c *C) {
|
||||
self.coordinator = &MockCoordinator{
|
||||
continuousQueries: map[string][]*coordinator.ContinuousQuery{
|
||||
"db1": []*coordinator.ContinuousQuery{
|
||||
&coordinator.ContinuousQuery{1, "select * from foo into bar;"},
|
||||
continuousQueries: map[string][]*cluster.ContinuousQuery{
|
||||
"db1": []*cluster.ContinuousQuery{
|
||||
&cluster.ContinuousQuery{1, "select * from foo into bar;"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
@ -753,10 +754,10 @@ func (self *ApiSuite) TestDatabasesIndex(c *C) {
|
|||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err, IsNil)
|
||||
users := []*coordinator.Database{}
|
||||
users := []*cluster.Database{}
|
||||
err = json.Unmarshal(body, &users)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", uint8(1)}, &coordinator.Database{"db2", uint8(1)}})
|
||||
c.Assert(users, DeepEquals, []*cluster.Database{&cluster.Database{"db1", uint8(1)}, &cluster.Database{"db2", uint8(1)}})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -771,10 +772,10 @@ func (self *ApiSuite) TestBasicAuthentication(c *C) {
|
|||
body, err := ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
|
||||
users := []*coordinator.Database{}
|
||||
users := []*cluster.Database{}
|
||||
err = json.Unmarshal(body, &users)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}})
|
||||
c.Assert(users, DeepEquals, []*cluster.Database{&cluster.Database{"db1", 1}, &cluster.Database{"db2", 1}})
|
||||
}
|
||||
|
||||
func (self *ApiSuite) TestContinuousQueryOperations(c *C) {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package coordinator
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@ -14,6 +14,15 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// defined by cluster config (in cluster package)
|
||||
type QuerySpec interface {
|
||||
GetStartTime() time.Time
|
||||
GetEndTime() time.Time
|
||||
Database() string
|
||||
TableNames() []string
|
||||
IsRegex() bool
|
||||
}
|
||||
|
||||
/*
|
||||
This struct stores all the metadata confiugration information about a running cluster. This includes
|
||||
the servers in the cluster and their state, databases, users, and which continuous queries are running.
|
||||
|
@ -25,22 +34,23 @@ import (
|
|||
*/
|
||||
type ClusterConfiguration struct {
|
||||
createDatabaseLock sync.RWMutex
|
||||
databaseReplicationFactors map[string]uint8
|
||||
DatabaseReplicationFactors map[string]uint8
|
||||
usersLock sync.RWMutex
|
||||
clusterAdmins map[string]*clusterAdmin
|
||||
dbUsers map[string]map[string]*dbUser
|
||||
clusterAdmins map[string]*ClusterAdmin
|
||||
dbUsers map[string]map[string]*DbUser
|
||||
servers []*ClusterServer
|
||||
serversLock sync.RWMutex
|
||||
continuousQueries map[string][]*ContinuousQuery
|
||||
continuousQueriesLock sync.RWMutex
|
||||
parsedContinuousQueries map[string]map[uint32]*parser.SelectQuery
|
||||
ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery
|
||||
continuousQueryTimestamp time.Time
|
||||
hasRunningServers bool
|
||||
localServerId uint32
|
||||
LocalServerId uint32
|
||||
ClusterVersion uint32
|
||||
config *configuration.Configuration
|
||||
addedLocalServerWait chan bool
|
||||
addedLocalServer bool
|
||||
connectionCreator func(string) ServerConnection
|
||||
}
|
||||
|
||||
type ContinuousQuery struct {
|
||||
|
@ -53,16 +63,17 @@ type Database struct {
|
|||
ReplicationFactor uint8 `json:"replicationFactor"`
|
||||
}
|
||||
|
||||
func NewClusterConfiguration(config *configuration.Configuration) *ClusterConfiguration {
|
||||
func NewClusterConfiguration(config *configuration.Configuration, connectionCreator func(string) ServerConnection) *ClusterConfiguration {
|
||||
return &ClusterConfiguration{
|
||||
databaseReplicationFactors: make(map[string]uint8),
|
||||
clusterAdmins: make(map[string]*clusterAdmin),
|
||||
dbUsers: make(map[string]map[string]*dbUser),
|
||||
DatabaseReplicationFactors: make(map[string]uint8),
|
||||
clusterAdmins: make(map[string]*ClusterAdmin),
|
||||
dbUsers: make(map[string]map[string]*DbUser),
|
||||
continuousQueries: make(map[string][]*ContinuousQuery),
|
||||
parsedContinuousQueries: make(map[string]map[uint32]*parser.SelectQuery),
|
||||
ParsedContinuousQueries: make(map[string]map[uint32]*parser.SelectQuery),
|
||||
servers: make([]*ClusterServer, 0),
|
||||
config: config,
|
||||
addedLocalServerWait: make(chan bool, 1),
|
||||
connectionCreator: connectionCreator,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -83,7 +94,7 @@ func (self *ClusterConfiguration) WaitForLocalServerLoaded() {
|
|||
}
|
||||
|
||||
func (self *ClusterConfiguration) GetReplicationFactor(database *string) uint8 {
|
||||
return self.databaseReplicationFactors[*database]
|
||||
return self.DatabaseReplicationFactors[*database]
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) IsActive() bool {
|
||||
|
@ -118,9 +129,9 @@ func (self *ClusterConfiguration) GetServerById(id *uint32) *ClusterServer {
|
|||
return nil
|
||||
}
|
||||
|
||||
type serverToQuery struct {
|
||||
server *ClusterServer
|
||||
ringLocationsToQuery uint32
|
||||
type ServerToQuery struct {
|
||||
Server *ClusterServer
|
||||
RingLocationToQuery uint32
|
||||
}
|
||||
|
||||
// This function will return an array of servers to query and the number of ring locations to return per server.
|
||||
|
@ -130,21 +141,21 @@ type serverToQuery struct {
|
|||
// if you have a cluster with databases with RFs of 1, 2, and 3: optimal cluster sizes would be 6, 12, 18, 24, 30, etc.
|
||||
// If that's not the case, one or more servers will have to filter out data from other servers on the fly, which could
|
||||
// be a little more expensive.
|
||||
func (self *ClusterConfiguration) GetServersToMakeQueryTo(database *string) (servers []*serverToQuery, replicationFactor uint32) {
|
||||
func (self *ClusterConfiguration) GetServersToMakeQueryTo(database *string) (servers []*ServerToQuery, replicationFactor uint32) {
|
||||
replicationFactor = uint32(self.GetReplicationFactor(database))
|
||||
replicationFactorInt := int(replicationFactor)
|
||||
index := 0
|
||||
for i, s := range self.servers {
|
||||
if s.Id == self.localServerId {
|
||||
if s.Id == self.LocalServerId {
|
||||
index = i % replicationFactorInt
|
||||
break
|
||||
}
|
||||
}
|
||||
servers = make([]*serverToQuery, 0, len(self.servers)/replicationFactorInt)
|
||||
servers = make([]*ServerToQuery, 0, len(self.servers)/replicationFactorInt)
|
||||
serverCount := len(self.servers)
|
||||
for ; index < serverCount; index += replicationFactorInt {
|
||||
server := self.servers[index]
|
||||
servers = append(servers, &serverToQuery{server, replicationFactor})
|
||||
servers = append(servers, &ServerToQuery{server, replicationFactor})
|
||||
}
|
||||
// need to maybe add a server and set which ones filter their data
|
||||
if serverCount%replicationFactorInt != 0 {
|
||||
|
@ -175,10 +186,10 @@ func (self *ClusterConfiguration) GetServersToMakeQueryTo(database *string) (ser
|
|||
*/
|
||||
lastIndexAdded := index - replicationFactorInt
|
||||
if serverCount-lastIndexAdded == replicationFactorInt {
|
||||
servers[0].ringLocationsToQuery = uint32(replicationFactorInt - 1)
|
||||
servers = append(servers, &serverToQuery{self.servers[0], replicationFactor})
|
||||
servers[0].RingLocationToQuery = uint32(replicationFactorInt - 1)
|
||||
servers = append(servers, &ServerToQuery{self.servers[0], replicationFactor})
|
||||
} else {
|
||||
servers[0].ringLocationsToQuery = uint32(replicationFactorInt - 1)
|
||||
servers[0].RingLocationToQuery = uint32(replicationFactorInt - 1)
|
||||
}
|
||||
}
|
||||
return servers, replicationFactor
|
||||
|
@ -198,7 +209,7 @@ func (self *ClusterConfiguration) GetRingFilterFunction(database string, countOf
|
|||
serversToInclude := make([]*ClusterServer, 0, countOfServersToInclude)
|
||||
countServers := int(countOfServersToInclude)
|
||||
for i, s := range self.servers {
|
||||
if s.Id == self.localServerId {
|
||||
if s.Id == self.LocalServerId {
|
||||
serversToInclude = append(serversToInclude, s)
|
||||
for j := i - 1; j >= 0 && len(serversToInclude) < countServers; j-- {
|
||||
serversToInclude = append(serversToInclude, self.servers[j])
|
||||
|
@ -307,7 +318,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
|
|||
server.Connect()
|
||||
} else if !self.addedLocalServer {
|
||||
log.Info("Added the local server")
|
||||
self.localServerId = server.Id
|
||||
self.LocalServerId = server.Id
|
||||
self.addedLocalServerWait <- true
|
||||
self.addedLocalServer = true
|
||||
}
|
||||
|
@ -317,8 +328,8 @@ func (self *ClusterConfiguration) GetDatabases() []*Database {
|
|||
self.createDatabaseLock.RLock()
|
||||
defer self.createDatabaseLock.RUnlock()
|
||||
|
||||
dbs := make([]*Database, 0, len(self.databaseReplicationFactors))
|
||||
for name, rf := range self.databaseReplicationFactors {
|
||||
dbs := make([]*Database, 0, len(self.DatabaseReplicationFactors))
|
||||
for name, rf := range self.DatabaseReplicationFactors {
|
||||
dbs = append(dbs, &Database{Name: name, ReplicationFactor: rf})
|
||||
}
|
||||
return dbs
|
||||
|
@ -328,10 +339,10 @@ func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor
|
|||
self.createDatabaseLock.Lock()
|
||||
defer self.createDatabaseLock.Unlock()
|
||||
|
||||
if _, ok := self.databaseReplicationFactors[name]; ok {
|
||||
if _, ok := self.DatabaseReplicationFactors[name]; ok {
|
||||
return fmt.Errorf("database %s exists", name)
|
||||
}
|
||||
self.databaseReplicationFactors[name] = replicationFactor
|
||||
self.DatabaseReplicationFactors[name] = replicationFactor
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -339,11 +350,11 @@ func (self *ClusterConfiguration) DropDatabase(name string) error {
|
|||
self.createDatabaseLock.Lock()
|
||||
defer self.createDatabaseLock.Unlock()
|
||||
|
||||
if _, ok := self.databaseReplicationFactors[name]; !ok {
|
||||
if _, ok := self.DatabaseReplicationFactors[name]; !ok {
|
||||
return fmt.Errorf("Database %s doesn't exist", name)
|
||||
}
|
||||
|
||||
delete(self.databaseReplicationFactors, name)
|
||||
delete(self.DatabaseReplicationFactors, name)
|
||||
|
||||
self.usersLock.Lock()
|
||||
defer self.usersLock.Unlock()
|
||||
|
@ -360,8 +371,8 @@ func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string)
|
|||
self.continuousQueries = map[string][]*ContinuousQuery{}
|
||||
}
|
||||
|
||||
if self.parsedContinuousQueries == nil {
|
||||
self.parsedContinuousQueries = map[string]map[uint32]*parser.SelectQuery{}
|
||||
if self.ParsedContinuousQueries == nil {
|
||||
self.ParsedContinuousQueries = map[string]map[uint32]*parser.SelectQuery{}
|
||||
}
|
||||
|
||||
maxId := uint32(0)
|
||||
|
@ -377,10 +388,10 @@ func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string)
|
|||
}
|
||||
|
||||
queryId := maxId + 1
|
||||
if self.parsedContinuousQueries[db] == nil {
|
||||
self.parsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{queryId: selectQuery}
|
||||
if self.ParsedContinuousQueries[db] == nil {
|
||||
self.ParsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{queryId: selectQuery}
|
||||
} else {
|
||||
self.parsedContinuousQueries[db][queryId] = selectQuery
|
||||
self.ParsedContinuousQueries[db][queryId] = selectQuery
|
||||
}
|
||||
self.continuousQueries[db] = append(self.continuousQueries[db], &ContinuousQuery{queryId, query})
|
||||
|
||||
|
@ -405,7 +416,7 @@ func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) er
|
|||
q := self.continuousQueries[db]
|
||||
q[len(q)-1], q[i], q = nil, q[len(q)-1], q[:len(q)-1]
|
||||
self.continuousQueries[db] = q
|
||||
delete(self.parsedContinuousQueries[db], id)
|
||||
delete(self.ParsedContinuousQueries[db], id)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -431,7 +442,7 @@ func (self *ClusterConfiguration) GetDbUsers(db string) (names []string) {
|
|||
return
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) GetDbUser(db, username string) *dbUser {
|
||||
func (self *ClusterConfiguration) GetDbUser(db, username string) *DbUser {
|
||||
self.usersLock.RLock()
|
||||
defer self.usersLock.RUnlock()
|
||||
|
||||
|
@ -442,7 +453,7 @@ func (self *ClusterConfiguration) GetDbUser(db, username string) *dbUser {
|
|||
return dbUsers[username]
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) SaveDbUser(u *dbUser) {
|
||||
func (self *ClusterConfiguration) SaveDbUser(u *DbUser) {
|
||||
self.usersLock.Lock()
|
||||
defer self.usersLock.Unlock()
|
||||
db := u.GetDb()
|
||||
|
@ -455,7 +466,7 @@ func (self *ClusterConfiguration) SaveDbUser(u *dbUser) {
|
|||
return
|
||||
}
|
||||
if dbUsers == nil {
|
||||
dbUsers = map[string]*dbUser{}
|
||||
dbUsers = map[string]*DbUser{}
|
||||
self.dbUsers[db] = dbUsers
|
||||
}
|
||||
dbUsers[u.GetName()] = u
|
||||
|
@ -471,7 +482,7 @@ func (self *ClusterConfiguration) ChangeDbUserPassword(db, username, hash string
|
|||
if dbUsers[username] == nil {
|
||||
return fmt.Errorf("Invalid username %s", username)
|
||||
}
|
||||
dbUsers[username].changePassword(hash)
|
||||
dbUsers[username].ChangePassword(hash)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -486,14 +497,14 @@ func (self *ClusterConfiguration) GetClusterAdmins() (names []string) {
|
|||
return
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) GetClusterAdmin(username string) *clusterAdmin {
|
||||
func (self *ClusterConfiguration) GetClusterAdmin(username string) *ClusterAdmin {
|
||||
self.usersLock.RLock()
|
||||
defer self.usersLock.RUnlock()
|
||||
|
||||
return self.clusterAdmins[username]
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) SaveClusterAdmin(u *clusterAdmin) {
|
||||
func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin) {
|
||||
self.usersLock.Lock()
|
||||
defer self.usersLock.Unlock()
|
||||
if u.IsDeleted() {
|
||||
|
@ -506,26 +517,26 @@ func (self *ClusterConfiguration) SaveClusterAdmin(u *clusterAdmin) {
|
|||
func (self *ClusterConfiguration) GetDatabaseReplicationFactor(name string) uint8 {
|
||||
self.createDatabaseLock.RLock()
|
||||
defer self.createDatabaseLock.RUnlock()
|
||||
return self.databaseReplicationFactors[name]
|
||||
return self.DatabaseReplicationFactors[name]
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) Save() ([]byte, error) {
|
||||
log.Debug("Dumping the cluster configuration")
|
||||
data := struct {
|
||||
Databases map[string]uint8
|
||||
Admins map[string]*clusterAdmin
|
||||
DbUsers map[string]map[string]*dbUser
|
||||
Admins map[string]*ClusterAdmin
|
||||
DbUsers map[string]map[string]*DbUser
|
||||
Servers []*ClusterServer
|
||||
HasRunningServers bool
|
||||
LocalServerId uint32
|
||||
ClusterVersion uint32
|
||||
}{
|
||||
self.databaseReplicationFactors,
|
||||
self.DatabaseReplicationFactors,
|
||||
self.clusterAdmins,
|
||||
self.dbUsers,
|
||||
self.servers,
|
||||
self.hasRunningServers,
|
||||
self.localServerId,
|
||||
self.LocalServerId,
|
||||
self.ClusterVersion,
|
||||
}
|
||||
|
||||
|
@ -542,8 +553,8 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
|
|||
log.Debug("Recovering the cluster configuration")
|
||||
data := struct {
|
||||
Databases map[string]uint8
|
||||
Admins map[string]*clusterAdmin
|
||||
DbUsers map[string]map[string]*dbUser
|
||||
Admins map[string]*ClusterAdmin
|
||||
DbUsers map[string]map[string]*DbUser
|
||||
Servers []*ClusterServer
|
||||
HasRunningServers bool
|
||||
LocalServerId uint32
|
||||
|
@ -555,26 +566,29 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
self.databaseReplicationFactors = data.Databases
|
||||
self.DatabaseReplicationFactors = data.Databases
|
||||
self.clusterAdmins = data.Admins
|
||||
self.dbUsers = data.DbUsers
|
||||
|
||||
// copy the protobuf client from the old servers
|
||||
oldServers := map[string]*ProtobufClient{}
|
||||
oldServers := map[string]ServerConnection{}
|
||||
for _, server := range self.servers {
|
||||
oldServers[server.ProtobufConnectionString] = server.protobufClient
|
||||
oldServers[server.ProtobufConnectionString] = server.connection
|
||||
}
|
||||
|
||||
self.servers = data.Servers
|
||||
for _, server := range self.servers {
|
||||
server.protobufClient = oldServers[server.ProtobufConnectionString]
|
||||
if server.protobufClient == nil {
|
||||
server.Connect()
|
||||
server.connection = oldServers[server.ProtobufConnectionString]
|
||||
if server.connection == nil {
|
||||
server.connection = self.connectionCreator(server.ProtobufConnectionString)
|
||||
if server.ProtobufConnectionString != self.config.ProtobufConnectionString() {
|
||||
server.Connect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.hasRunningServers = data.HasRunningServers
|
||||
self.localServerId = data.LocalServerId
|
||||
self.LocalServerId = data.LocalServerId
|
||||
self.ClusterVersion = data.ClusterVersion
|
||||
|
||||
if self.addedLocalServer {
|
||||
|
@ -594,3 +608,50 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) AuthenticateDbUser(db, username, password string) (common.User, error) {
|
||||
dbUsers := self.dbUsers[db]
|
||||
if dbUsers == nil || dbUsers[username] == nil {
|
||||
return nil, common.NewAuthorizationError("Invalid username/password")
|
||||
}
|
||||
user := dbUsers[username]
|
||||
if user.isValidPwd(password) {
|
||||
return user, nil
|
||||
}
|
||||
return nil, common.NewAuthorizationError("Invalid username/password")
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) AuthenticateClusterAdmin(username, password string) (common.User, error) {
|
||||
user := self.clusterAdmins[username]
|
||||
if user == nil {
|
||||
return nil, common.NewAuthorizationError("Invalid username/password")
|
||||
}
|
||||
if user.isValidPwd(password) {
|
||||
return user, nil
|
||||
}
|
||||
return nil, common.NewAuthorizationError("Invalid username/password")
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) HasContinuousQueries() bool {
|
||||
return self.continuousQueries != nil && len(self.continuousQueries) > 0
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) LastContinuousQueryRunTime() time.Time {
|
||||
return self.continuousQueryTimestamp
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) SetLastContinuousQueryRunTime(t time.Time) {
|
||||
self.continuousQueryTimestamp = t
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) GetMapForJsonSerialization() map[string]interface{} {
|
||||
jsonObject := make(map[string]interface{})
|
||||
dbs := make([]string, 0)
|
||||
for db, _ := range self.DatabaseReplicationFactors {
|
||||
dbs = append(dbs, db)
|
||||
}
|
||||
jsonObject["databases"] = dbs
|
||||
jsonObject["cluster_admins"] = self.clusterAdmins
|
||||
jsonObject["database_users"] = self.dbUsers
|
||||
return jsonObject
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package coordinator
|
||||
package cluster
|
||||
|
||||
import (
|
||||
log "code.google.com/p/log4go"
|
||||
|
@ -12,7 +12,12 @@ type ClusterServer struct {
|
|||
State ServerState
|
||||
RaftConnectionString string
|
||||
ProtobufConnectionString string
|
||||
protobufClient *ProtobufClient
|
||||
connection ServerConnection
|
||||
}
|
||||
|
||||
type ServerConnection interface {
|
||||
Connect()
|
||||
MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error
|
||||
}
|
||||
|
||||
type ServerState int
|
||||
|
@ -25,10 +30,20 @@ const (
|
|||
Potential
|
||||
)
|
||||
|
||||
func NewClusterServer(raftName, raftConnectionString, protobufConnectionString string, connection ServerConnection) *ClusterServer {
|
||||
return &ClusterServer{
|
||||
RaftName: raftName,
|
||||
RaftConnectionString: raftConnectionString,
|
||||
ProtobufConnectionString: protobufConnectionString,
|
||||
connection: connection,
|
||||
}
|
||||
}
|
||||
|
||||
// in the coordinator test we don't want to create protobuf servers,
|
||||
// so we just ignore creating a protobuf client when the connection
|
||||
// string has a 0 port
|
||||
func shouldConnect(addr string) bool {
|
||||
log.Debug("SHOULD CONNECT: ", addr)
|
||||
_, port, err := net.SplitHostPort(addr)
|
||||
if err != nil {
|
||||
log.Error("Error parsing address '%s': %s", addr, err)
|
||||
|
@ -43,19 +58,15 @@ func shouldConnect(addr string) bool {
|
|||
}
|
||||
|
||||
func (self *ClusterServer) Connect() {
|
||||
if self.protobufClient != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if !shouldConnect(self.ProtobufConnectionString) {
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("ClusterServer: %d connecting to: %s", self.Id, self.ProtobufConnectionString)
|
||||
self.protobufClient = NewProtobufClient(self.ProtobufConnectionString)
|
||||
self.connection.Connect()
|
||||
}
|
||||
|
||||
func (self *ClusterServer) MakeRequest(request *protocol.Request, responseStream chan *protocol.Response) error {
|
||||
self.Connect()
|
||||
return self.protobufClient.MakeRequest(request, responseStream)
|
||||
return self.connection.MakeRequest(request, responseStream)
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package coordinator
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"code.google.com/p/go.crypto/bcrypt"
|
||||
|
@ -40,7 +40,7 @@ func (self *CommonUser) IsDeleted() bool {
|
|||
return self.IsUserDeleted
|
||||
}
|
||||
|
||||
func (self *CommonUser) changePassword(hash string) error {
|
||||
func (self *CommonUser) ChangePassword(hash string) error {
|
||||
self.Hash = hash
|
||||
userCache.Delete(self.CacheKey)
|
||||
return nil
|
||||
|
@ -78,23 +78,23 @@ func (self *CommonUser) HasReadAccess(name string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
type clusterAdmin struct {
|
||||
type ClusterAdmin struct {
|
||||
CommonUser `json:"common"`
|
||||
}
|
||||
|
||||
func (self *clusterAdmin) IsClusterAdmin() bool {
|
||||
func (self *ClusterAdmin) IsClusterAdmin() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (self *clusterAdmin) HasWriteAccess(_ string) bool {
|
||||
func (self *ClusterAdmin) HasWriteAccess(_ string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (self *clusterAdmin) HasReadAccess(_ string) bool {
|
||||
func (self *ClusterAdmin) HasReadAccess(_ string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type dbUser struct {
|
||||
type DbUser struct {
|
||||
CommonUser `json:"common"`
|
||||
Db string `json:"db"`
|
||||
WriteTo []*Matcher `json:"write_matchers"`
|
||||
|
@ -102,11 +102,11 @@ type dbUser struct {
|
|||
IsAdmin bool `json:"is_admin"`
|
||||
}
|
||||
|
||||
func (self *dbUser) IsDbAdmin(db string) bool {
|
||||
func (self *DbUser) IsDbAdmin(db string) bool {
|
||||
return self.IsAdmin && self.Db == db
|
||||
}
|
||||
|
||||
func (self *dbUser) HasWriteAccess(name string) bool {
|
||||
func (self *DbUser) HasWriteAccess(name string) bool {
|
||||
for _, matcher := range self.WriteTo {
|
||||
if matcher.Matches(name) {
|
||||
return true
|
||||
|
@ -116,7 +116,7 @@ func (self *dbUser) HasWriteAccess(name string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (self *dbUser) HasReadAccess(name string) bool {
|
||||
func (self *DbUser) HasReadAccess(name string) bool {
|
||||
for _, matcher := range self.ReadFrom {
|
||||
if matcher.Matches(name) {
|
||||
return true
|
||||
|
@ -126,13 +126,11 @@ func (self *dbUser) HasReadAccess(name string) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (self *dbUser) GetDb() string {
|
||||
func (self *DbUser) GetDb() string {
|
||||
return self.Db
|
||||
}
|
||||
|
||||
// private funcs
|
||||
|
||||
func hashPassword(password string) ([]byte, error) {
|
||||
func HashPassword(password string) ([]byte, error) {
|
||||
// The second arg is the cost of the hashing, higher is slower but makes it harder
|
||||
// to brute force, since it will be really slow and impractical
|
||||
return bcrypt.GenerateFromPassword([]byte(password), 10)
|
|
@ -1,4 +1,4 @@
|
|||
package coordinator
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"common"
|
||||
|
@ -13,7 +13,7 @@ var root common.User
|
|||
|
||||
func (self *UserSuite) SetUpSuite(c *C) {
|
||||
user := &clusterAdmin{CommonUser{"root", "", false, "root"}}
|
||||
c.Assert(user.changePassword("password"), IsNil)
|
||||
c.Assert(user.ChangePassword("password"), IsNil)
|
||||
root = user
|
||||
}
|
||||
|
||||
|
@ -21,9 +21,9 @@ func (self *UserSuite) TestProperties(c *C) {
|
|||
u := clusterAdmin{CommonUser{Name: "root"}}
|
||||
c.Assert(u.IsClusterAdmin(), Equals, true)
|
||||
c.Assert(u.GetName(), Equals, "root")
|
||||
hash, err := hashPassword("foobar")
|
||||
hash, err := HashPassword("foobar")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(u.changePassword(string(hash)), IsNil)
|
||||
c.Assert(u.ChangePassword(string(hash)), IsNil)
|
||||
c.Assert(u.isValidPwd("foobar"), Equals, true)
|
||||
c.Assert(u.isValidPwd("password"), Equals, false)
|
||||
|
||||
|
@ -31,9 +31,9 @@ func (self *UserSuite) TestProperties(c *C) {
|
|||
c.Assert(dbUser.IsClusterAdmin(), Equals, false)
|
||||
c.Assert(dbUser.IsDbAdmin("db"), Equals, true)
|
||||
c.Assert(dbUser.GetName(), Equals, "db_user")
|
||||
hash, err = hashPassword("password")
|
||||
hash, err = HashPassword("password")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(dbUser.changePassword(string(hash)), IsNil)
|
||||
c.Assert(dbUser.ChangePassword(string(hash)), IsNil)
|
||||
c.Assert(dbUser.isValidPwd("password"), Equals, true)
|
||||
c.Assert(dbUser.isValidPwd("password1"), Equals, false)
|
||||
}
|
|
@ -50,6 +50,7 @@ func (self *ClientServerSuite) TestClientCanMakeRequests(c *C) {
|
|||
go protobufServer.ListenAndServe()
|
||||
c.Assert(protobufServer, Not(IsNil))
|
||||
protobufClient := NewProtobufClient("localhost:8091")
|
||||
protobufClient.Connect()
|
||||
responseStream := make(chan *protocol.Response, 1)
|
||||
|
||||
mock := `
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package coordinator
|
||||
|
||||
import (
|
||||
"cluster"
|
||||
log "code.google.com/p/log4go"
|
||||
"fmt"
|
||||
"github.com/goraft/raft"
|
||||
"time"
|
||||
)
|
||||
|
@ -39,7 +41,7 @@ func (c *SetContinuousQueryTimestampCommand) CommandName() string {
|
|||
}
|
||||
|
||||
func (c *SetContinuousQueryTimestampCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.SetContinuousQueryTimestamp(c.Timestamp)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -58,7 +60,7 @@ func (c *CreateContinuousQueryCommand) CommandName() string {
|
|||
}
|
||||
|
||||
func (c *CreateContinuousQueryCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.CreateContinuousQuery(c.Database, c.Query)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -77,7 +79,7 @@ func (c *DeleteContinuousQueryCommand) CommandName() string {
|
|||
}
|
||||
|
||||
func (c *DeleteContinuousQueryCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.DeleteContinuousQuery(c.Database, c.Id)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -95,7 +97,7 @@ func (c *DropDatabaseCommand) CommandName() string {
|
|||
}
|
||||
|
||||
func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.DropDatabase(c.Name)
|
||||
return nil, err
|
||||
}
|
||||
|
@ -114,16 +116,16 @@ func (c *CreateDatabaseCommand) CommandName() string {
|
|||
}
|
||||
|
||||
func (c *CreateDatabaseCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.CreateDatabase(c.Name, c.ReplicationFactor)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
type SaveDbUserCommand struct {
|
||||
User *dbUser `json:"user"`
|
||||
User *cluster.DbUser `json:"user"`
|
||||
}
|
||||
|
||||
func NewSaveDbUserCommand(u *dbUser) *SaveDbUserCommand {
|
||||
func NewSaveDbUserCommand(u *cluster.DbUser) *SaveDbUserCommand {
|
||||
return &SaveDbUserCommand{
|
||||
User: u,
|
||||
}
|
||||
|
@ -134,7 +136,7 @@ func (c *SaveDbUserCommand) CommandName() string {
|
|||
}
|
||||
|
||||
func (c *SaveDbUserCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
config.SaveDbUser(c.User)
|
||||
log.Debug("(raft:%s) Created user %s:%s", server.Name(), c.User.Db, c.User.Name)
|
||||
return nil, nil
|
||||
|
@ -160,15 +162,15 @@ func (c *ChangeDbUserPassword) CommandName() string {
|
|||
|
||||
func (c *ChangeDbUserPassword) Apply(server raft.Server) (interface{}, error) {
|
||||
log.Debug("(raft:%s) changing db user password for %s:%s", server.Name(), c.Database, c.Username)
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
return nil, config.ChangeDbUserPassword(c.Database, c.Username, c.Hash)
|
||||
}
|
||||
|
||||
type SaveClusterAdminCommand struct {
|
||||
User *clusterAdmin `json:"user"`
|
||||
User *cluster.ClusterAdmin `json:"user"`
|
||||
}
|
||||
|
||||
func NewSaveClusterAdminCommand(u *clusterAdmin) *SaveClusterAdminCommand {
|
||||
func NewSaveClusterAdminCommand(u *cluster.ClusterAdmin) *SaveClusterAdminCommand {
|
||||
return &SaveClusterAdminCommand{
|
||||
User: u,
|
||||
}
|
||||
|
@ -179,16 +181,16 @@ func (c *SaveClusterAdminCommand) CommandName() string {
|
|||
}
|
||||
|
||||
func (c *SaveClusterAdminCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
config.SaveClusterAdmin(c.User)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type AddPotentialServerCommand struct {
|
||||
Server *ClusterServer
|
||||
Server *cluster.ClusterServer
|
||||
}
|
||||
|
||||
func NewAddPotentialServerCommand(s *ClusterServer) *AddPotentialServerCommand {
|
||||
func NewAddPotentialServerCommand(s *cluster.ClusterServer) *AddPotentialServerCommand {
|
||||
return &AddPotentialServerCommand{Server: s}
|
||||
}
|
||||
|
||||
|
@ -197,17 +199,18 @@ func (c *AddPotentialServerCommand) CommandName() string {
|
|||
}
|
||||
|
||||
func (c *AddPotentialServerCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
fmt.Println("COMMMAND: adding potent server")
|
||||
config.AddPotentialServer(c.Server)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type UpdateServerStateCommand struct {
|
||||
ServerId uint32
|
||||
State ServerState
|
||||
State cluster.ServerState
|
||||
}
|
||||
|
||||
func NewUpdateServerStateCommand(serverId uint32, state ServerState) *UpdateServerStateCommand {
|
||||
func NewUpdateServerStateCommand(serverId uint32, state cluster.ServerState) *UpdateServerStateCommand {
|
||||
return &UpdateServerStateCommand{ServerId: serverId, State: state}
|
||||
}
|
||||
|
||||
|
@ -216,7 +219,7 @@ func (c *UpdateServerStateCommand) CommandName() string {
|
|||
}
|
||||
|
||||
func (c *UpdateServerStateCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
config := server.Context().(*ClusterConfiguration)
|
||||
config := server.Context().(*cluster.ClusterConfiguration)
|
||||
err := config.UpdateServerState(c.ServerId, c.State)
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package coordinator
|
||||
|
||||
import (
|
||||
"cluster"
|
||||
log "code.google.com/p/log4go"
|
||||
"common"
|
||||
"datastore"
|
||||
|
@ -18,7 +19,7 @@ import (
|
|||
)
|
||||
|
||||
type CoordinatorImpl struct {
|
||||
clusterConfiguration *ClusterConfiguration
|
||||
clusterConfiguration *cluster.ClusterConfiguration
|
||||
raftServer ClusterConsensus
|
||||
datastore datastore.Datastore
|
||||
requestId uint32
|
||||
|
@ -67,7 +68,7 @@ func init() {
|
|||
}
|
||||
}
|
||||
|
||||
func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *ClusterConfiguration) *CoordinatorImpl {
|
||||
func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *cluster.ClusterConfiguration) *CoordinatorImpl {
|
||||
coordinator := &CoordinatorImpl{
|
||||
clusterConfiguration: clusterConfiguration,
|
||||
raftServer: raftServer,
|
||||
|
@ -98,18 +99,18 @@ func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query
|
|||
isDbUser := !user.IsClusterAdmin()
|
||||
responseChannels := make([]chan *protocol.Response, 0, len(servers)+1)
|
||||
queryString := query.GetQueryString()
|
||||
var localServerToQuery *serverToQuery
|
||||
var localServerToQuery *cluster.ServerToQuery
|
||||
for _, server := range servers {
|
||||
if server.server.Id == self.clusterConfiguration.localServerId {
|
||||
if server.Server.Id == self.clusterConfiguration.LocalServerId {
|
||||
localServerToQuery = server
|
||||
} else {
|
||||
request := &protocol.Request{Type: &queryRequest, Query: &queryString, Id: &id, Database: &db, UserName: &userName, IsDbUser: &isDbUser}
|
||||
if server.ringLocationsToQuery != replicationFactor {
|
||||
r := server.ringLocationsToQuery
|
||||
if server.RingLocationToQuery != replicationFactor {
|
||||
r := server.RingLocationToQuery
|
||||
request.RingLocationsToQuery = &r
|
||||
}
|
||||
responseChan := make(chan *protocol.Response, 3)
|
||||
server.server.MakeRequest(request, responseChan)
|
||||
server.Server.MakeRequest(request, responseChan)
|
||||
responseChannels = append(responseChannels, responseChan)
|
||||
}
|
||||
}
|
||||
|
@ -131,8 +132,8 @@ func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query
|
|||
|
||||
go func() {
|
||||
var ringFilter func(database, series *string, time *int64) bool
|
||||
if replicationFactor != localServerToQuery.ringLocationsToQuery {
|
||||
ringFilter = self.clusterConfiguration.GetRingFilterFunction(db, localServerToQuery.ringLocationsToQuery)
|
||||
if replicationFactor != localServerToQuery.RingLocationToQuery {
|
||||
ringFilter = self.clusterConfiguration.GetRingFilterFunction(db, localServerToQuery.RingLocationToQuery)
|
||||
}
|
||||
self.datastore.ExecuteQuery(user, db, query, sendFromLocal, ringFilter)
|
||||
local <- &protocol.Response{Type: &endStreamResponse}
|
||||
|
@ -419,11 +420,11 @@ func (self *CoordinatorImpl) SyncLogIteration() {
|
|||
servers := self.clusterConfiguration.Servers()
|
||||
|
||||
replicationFactors := map[uint8]bool{}
|
||||
for _, replicationFactor := range self.clusterConfiguration.databaseReplicationFactors {
|
||||
for _, replicationFactor := range self.clusterConfiguration.DatabaseReplicationFactors {
|
||||
replicationFactors[replicationFactor] = true
|
||||
}
|
||||
|
||||
localId := self.clusterConfiguration.localServerId
|
||||
localId := self.clusterConfiguration.LocalServerId
|
||||
|
||||
for replicationFactor, _ := range replicationFactors {
|
||||
for _, owningServer := range servers {
|
||||
|
@ -478,7 +479,7 @@ func (self *CoordinatorImpl) SyncLogIteration() {
|
|||
}
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) getLastAndCurrentSequenceNumbers(replicationFactor uint8, originatingServer, owningServer *ClusterServer) (uint64, uint64, error) {
|
||||
func (self *CoordinatorImpl) getLastAndCurrentSequenceNumbers(replicationFactor uint8, originatingServer, owningServer *cluster.ClusterServer) (uint64, uint64, error) {
|
||||
lastKnownSequenceNumber, err := self.GetLastSequenceNumber(replicationFactor, originatingServer.Id, owningServer.Id)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
|
@ -497,7 +498,7 @@ func (self *CoordinatorImpl) GetLastSequenceNumber(replicationFactor uint8, orig
|
|||
&replicationFactor, &owningServer, &originatingServer)
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) getCurrentSequenceNumber(replicationFactor uint8, originatingServer, owningServer *ClusterServer) (uint64, error) {
|
||||
func (self *CoordinatorImpl) getCurrentSequenceNumber(replicationFactor uint8, originatingServer, owningServer *cluster.ClusterServer) (uint64, error) {
|
||||
id := atomic.AddUint32(&self.requestId, uint32(1))
|
||||
replicationFactor32 := uint32(replicationFactor)
|
||||
database := ""
|
||||
|
@ -611,9 +612,9 @@ func (self *CoordinatorImpl) WriteSeriesData(user common.User, db string, series
|
|||
}
|
||||
|
||||
func (self *CoordinatorImpl) ProcessContinuousQueries(db string, series *protocol.Series) {
|
||||
if self.clusterConfiguration.parsedContinuousQueries != nil {
|
||||
if self.clusterConfiguration.ParsedContinuousQueries != nil {
|
||||
incomingSeriesName := *series.Name
|
||||
for _, query := range self.clusterConfiguration.parsedContinuousQueries[db] {
|
||||
for _, query := range self.clusterConfiguration.ParsedContinuousQueries[db] {
|
||||
groupByClause := query.GetGroupByClause()
|
||||
if groupByClause.Elems != nil {
|
||||
continue
|
||||
|
@ -722,7 +723,7 @@ func (self *CoordinatorImpl) DeleteSeriesData(user common.User, db string, query
|
|||
|
||||
servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db)
|
||||
for _, server := range servers {
|
||||
if err := self.handleSeriesDelete(user, server.server, db, query); err != nil {
|
||||
if err := self.handleSeriesDelete(user, server.Server, db, query); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -739,17 +740,17 @@ func (self *CoordinatorImpl) createRequest(requestType protocol.Request_Type, da
|
|||
return &protocol.Request{Type: &requestType, Database: database, Id: &id}
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *ClusterServer, database string, query *parser.DeleteQuery) error {
|
||||
func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *cluster.ClusterServer, database string, query *parser.DeleteQuery) error {
|
||||
owner, servers := self.clusterConfiguration.GetReplicas(server, &database)
|
||||
|
||||
request := self.createRequest(proxyDelete, &database)
|
||||
queryStr := query.GetQueryStringWithTimeCondition()
|
||||
request.Query = &queryStr
|
||||
request.OriginatingServerId = &self.clusterConfiguration.localServerId
|
||||
request.OriginatingServerId = &self.clusterConfiguration.LocalServerId
|
||||
request.ClusterVersion = &self.clusterConfiguration.ClusterVersion
|
||||
request.OwnerServerId = &owner.Id
|
||||
|
||||
if server.Id == self.clusterConfiguration.localServerId {
|
||||
if server.Id == self.clusterConfiguration.LocalServerId {
|
||||
// this is a local delete
|
||||
replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database)
|
||||
err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id)
|
||||
|
@ -771,17 +772,17 @@ func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *Cluste
|
|||
return self.proxyUntilSuccess(servers, request)
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) handleDropDatabase(server *ClusterServer, database string) error {
|
||||
func (self *CoordinatorImpl) handleDropDatabase(server *cluster.ClusterServer, database string) error {
|
||||
owner, servers := self.clusterConfiguration.GetReplicas(server, &database)
|
||||
|
||||
request := self.createRequest(proxyDropDatabase, &database)
|
||||
request.OriginatingServerId = &self.clusterConfiguration.localServerId
|
||||
request.OriginatingServerId = &self.clusterConfiguration.LocalServerId
|
||||
request.ClusterVersion = &self.clusterConfiguration.ClusterVersion
|
||||
request.OwnerServerId = &owner.Id
|
||||
replicationFactor := uint32(self.clusterConfiguration.GetDatabaseReplicationFactor(database))
|
||||
request.ReplicationFactor = &replicationFactor
|
||||
|
||||
if server.Id == self.clusterConfiguration.localServerId {
|
||||
if server.Id == self.clusterConfiguration.LocalServerId {
|
||||
// this is a local delete
|
||||
replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database)
|
||||
err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id)
|
||||
|
@ -803,18 +804,18 @@ func (self *CoordinatorImpl) handleDropDatabase(server *ClusterServer, database
|
|||
return self.proxyUntilSuccess(servers, request)
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) handleDropSeries(server *ClusterServer, database, series string) error {
|
||||
func (self *CoordinatorImpl) handleDropSeries(server *cluster.ClusterServer, database, series string) error {
|
||||
owner, servers := self.clusterConfiguration.GetReplicas(server, &database)
|
||||
|
||||
request := self.createRequest(proxyDropSeries, &database)
|
||||
request.OriginatingServerId = &self.clusterConfiguration.localServerId
|
||||
request.OriginatingServerId = &self.clusterConfiguration.LocalServerId
|
||||
request.ClusterVersion = &self.clusterConfiguration.ClusterVersion
|
||||
request.OwnerServerId = &owner.Id
|
||||
request.Series = &protocol.Series{Name: &series}
|
||||
replicationFactor := uint32(self.clusterConfiguration.GetDatabaseReplicationFactor(database))
|
||||
request.ReplicationFactor = &replicationFactor
|
||||
|
||||
if server.Id == self.clusterConfiguration.localServerId {
|
||||
if server.Id == self.clusterConfiguration.LocalServerId {
|
||||
// this is a local delete
|
||||
replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database)
|
||||
err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id)
|
||||
|
@ -851,12 +852,12 @@ func (self *CoordinatorImpl) handleClusterWrite(serverIndex *int, db *string, se
|
|||
|
||||
request := self.createRequest(proxyWrite, db)
|
||||
request.Series = series
|
||||
request.OriginatingServerId = &self.clusterConfiguration.localServerId
|
||||
request.OriginatingServerId = &self.clusterConfiguration.LocalServerId
|
||||
request.ClusterVersion = &self.clusterConfiguration.ClusterVersion
|
||||
request.OwnerServerId = &owner.Id
|
||||
|
||||
for _, s := range servers {
|
||||
if s.Id == self.clusterConfiguration.localServerId {
|
||||
if s.Id == self.clusterConfiguration.LocalServerId {
|
||||
// TODO: make storing of the data and logging of the request atomic
|
||||
replicationFactor := self.clusterConfiguration.GetReplicationFactor(db)
|
||||
err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id)
|
||||
|
@ -882,9 +883,9 @@ func (self *CoordinatorImpl) handleClusterWrite(serverIndex *int, db *string, se
|
|||
|
||||
// This method will attemp to proxy the request until the call to proxy returns nil. If no server succeeds,
|
||||
// the last err value will be returned.
|
||||
func (self *CoordinatorImpl) proxyUntilSuccess(servers []*ClusterServer, request *protocol.Request) (err error) {
|
||||
func (self *CoordinatorImpl) proxyUntilSuccess(servers []*cluster.ClusterServer, request *protocol.Request) (err error) {
|
||||
for _, s := range servers {
|
||||
if s.Id != self.clusterConfiguration.localServerId {
|
||||
if s.Id != self.clusterConfiguration.LocalServerId {
|
||||
err = self.proxyWrite(s, request)
|
||||
if err == nil {
|
||||
return nil
|
||||
|
@ -894,7 +895,7 @@ func (self *CoordinatorImpl) proxyUntilSuccess(servers []*ClusterServer, request
|
|||
return
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) proxyWrite(clusterServer *ClusterServer, request *protocol.Request) error {
|
||||
func (self *CoordinatorImpl) proxyWrite(clusterServer *cluster.ClusterServer, request *protocol.Request) error {
|
||||
originatingServerId := request.OriginatingServerId
|
||||
request.OriginatingServerId = nil
|
||||
defer func() { request.OriginatingServerId = originatingServerId }()
|
||||
|
@ -984,7 +985,7 @@ func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replica
|
|||
return nil
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*Database, error) {
|
||||
func (self *CoordinatorImpl) ListDatabases(user common.User) ([]*cluster.Database, error) {
|
||||
if !user.IsClusterAdmin() {
|
||||
return nil, common.NewAuthorizationError("Insufficient permission to list databases")
|
||||
}
|
||||
|
@ -1022,16 +1023,16 @@ func (self *CoordinatorImpl) ListSeries(user common.User, database string) ([]*p
|
|||
isDbUser := !user.IsClusterAdmin()
|
||||
responseChannels := make([]chan *protocol.Response, 0, len(servers)+1)
|
||||
for _, server := range servers {
|
||||
if server.server.Id == self.clusterConfiguration.localServerId {
|
||||
if server.Server.Id == self.clusterConfiguration.LocalServerId {
|
||||
continue
|
||||
}
|
||||
request := &protocol.Request{Type: &listSeriesRequest, Id: &id, Database: &database, UserName: &userName, IsDbUser: &isDbUser}
|
||||
if server.ringLocationsToQuery != replicationFactor {
|
||||
r := server.ringLocationsToQuery
|
||||
if server.RingLocationToQuery != replicationFactor {
|
||||
r := server.RingLocationToQuery
|
||||
request.RingLocationsToQuery = &r
|
||||
}
|
||||
responseChan := make(chan *protocol.Response, 3)
|
||||
server.server.protobufClient.MakeRequest(request, responseChan)
|
||||
server.Server.MakeRequest(request, responseChan)
|
||||
responseChannels = append(responseChannels, responseChan)
|
||||
}
|
||||
|
||||
|
@ -1072,7 +1073,7 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
|
|||
} else {
|
||||
servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db)
|
||||
for _, server := range servers {
|
||||
if err := self.handleDropDatabase(server.server, db); err != nil {
|
||||
if err := self.handleDropDatabase(server.Server, db); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -1098,7 +1099,7 @@ func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) err
|
|||
|
||||
servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db)
|
||||
for _, server := range servers {
|
||||
if err := self.handleDropSeries(server.server, db, series); err != nil {
|
||||
if err := self.handleDropSeries(server.Server, db, series); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -1108,27 +1109,15 @@ func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) err
|
|||
|
||||
func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) {
|
||||
log.Debug("(raft:%s) Authenticating password for %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username)
|
||||
dbUsers := self.clusterConfiguration.dbUsers[db]
|
||||
if dbUsers == nil || dbUsers[username] == nil {
|
||||
return nil, common.NewAuthorizationError("Invalid username/password")
|
||||
}
|
||||
user := dbUsers[username]
|
||||
if user.isValidPwd(password) {
|
||||
user, err := self.clusterConfiguration.AuthenticateDbUser(db, username, password)
|
||||
if user != nil {
|
||||
log.Debug("(raft:%s) User %s authenticated succesfuly", self.raftServer.(*RaftServer).raftServer.Name(), username)
|
||||
return user, nil
|
||||
}
|
||||
return nil, common.NewAuthorizationError("Invalid username/password")
|
||||
return user, err
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) AuthenticateClusterAdmin(username, password string) (common.User, error) {
|
||||
user := self.clusterConfiguration.clusterAdmins[username]
|
||||
if user == nil {
|
||||
return nil, common.NewAuthorizationError("Invalid username/password")
|
||||
}
|
||||
if user.isValidPwd(password) {
|
||||
return user, nil
|
||||
}
|
||||
return nil, common.NewAuthorizationError("Invalid username/password")
|
||||
return self.clusterConfiguration.AuthenticateClusterAdmin(username, password)
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) ListClusterAdmins(requester common.User) ([]string, error) {
|
||||
|
@ -1148,11 +1137,11 @@ func (self *CoordinatorImpl) CreateClusterAdminUser(requester common.User, usern
|
|||
return fmt.Errorf("%s isn't a valid username", username)
|
||||
}
|
||||
|
||||
if self.clusterConfiguration.clusterAdmins[username] != nil {
|
||||
if self.clusterConfiguration.GetClusterAdmin(username) != nil {
|
||||
return fmt.Errorf("User %s already exists", username)
|
||||
}
|
||||
|
||||
return self.raftServer.SaveClusterAdminUser(&clusterAdmin{CommonUser{Name: username, CacheKey: username}})
|
||||
return self.raftServer.SaveClusterAdminUser(&cluster.ClusterAdmin{cluster.CommonUser{Name: username, CacheKey: username}})
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, username string) error {
|
||||
|
@ -1160,7 +1149,7 @@ func (self *CoordinatorImpl) DeleteClusterAdminUser(requester common.User, usern
|
|||
return common.NewAuthorizationError("Insufficient permissions")
|
||||
}
|
||||
|
||||
user := self.clusterConfiguration.clusterAdmins[username]
|
||||
user := self.clusterConfiguration.GetClusterAdmin(username)
|
||||
if user == nil {
|
||||
return fmt.Errorf("User %s doesn't exists", username)
|
||||
}
|
||||
|
@ -1174,16 +1163,16 @@ func (self *CoordinatorImpl) ChangeClusterAdminPassword(requester common.User, u
|
|||
return common.NewAuthorizationError("Insufficient permissions")
|
||||
}
|
||||
|
||||
user := self.clusterConfiguration.clusterAdmins[username]
|
||||
user := self.clusterConfiguration.GetClusterAdmin(username)
|
||||
if user == nil {
|
||||
return fmt.Errorf("Invalid user name %s", username)
|
||||
}
|
||||
|
||||
hash, err := hashPassword(password)
|
||||
hash, err := cluster.HashPassword(password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
user.changePassword(string(hash))
|
||||
user.ChangePassword(string(hash))
|
||||
return self.raftServer.SaveClusterAdminUser(user)
|
||||
}
|
||||
|
||||
|
@ -1201,13 +1190,12 @@ func (self *CoordinatorImpl) CreateDbUser(requester common.User, db, username st
|
|||
}
|
||||
|
||||
self.CreateDatabase(requester, db, uint8(1)) // ignore the error since the db may exist
|
||||
dbUsers := self.clusterConfiguration.dbUsers[db]
|
||||
if dbUsers != nil && dbUsers[username] != nil {
|
||||
if self.clusterConfiguration.GetDbUser(db, username) != nil {
|
||||
return fmt.Errorf("User %s already exists", username)
|
||||
}
|
||||
matchers := []*Matcher{&Matcher{true, ".*"}}
|
||||
matchers := []*cluster.Matcher{&cluster.Matcher{true, ".*"}}
|
||||
log.Debug("(raft:%s) Creating uesr %s:%s", self.raftServer.(*RaftServer).raftServer.Name(), db, username)
|
||||
return self.raftServer.SaveDbUser(&dbUser{CommonUser{Name: username, CacheKey: db + "%" + username}, db, matchers, matchers, false})
|
||||
return self.raftServer.SaveDbUser(&cluster.DbUser{cluster.CommonUser{Name: username, CacheKey: db + "%" + username}, db, matchers, matchers, false})
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username string) error {
|
||||
|
@ -1215,12 +1203,10 @@ func (self *CoordinatorImpl) DeleteDbUser(requester common.User, db, username st
|
|||
return common.NewAuthorizationError("Insufficient permissions")
|
||||
}
|
||||
|
||||
dbUsers := self.clusterConfiguration.dbUsers[db]
|
||||
if dbUsers == nil || dbUsers[username] == nil {
|
||||
user := self.clusterConfiguration.GetDbUser(db, username)
|
||||
if user == nil {
|
||||
return fmt.Errorf("User %s doesn't exists", username)
|
||||
}
|
||||
|
||||
user := dbUsers[username]
|
||||
user.CommonUser.IsUserDeleted = true
|
||||
return self.raftServer.SaveDbUser(user)
|
||||
}
|
||||
|
@ -1238,7 +1224,7 @@ func (self *CoordinatorImpl) ChangeDbUserPassword(requester common.User, db, use
|
|||
return common.NewAuthorizationError("Insufficient permissions")
|
||||
}
|
||||
|
||||
hash, err := hashPassword(password)
|
||||
hash, err := cluster.HashPassword(password)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1250,12 +1236,10 @@ func (self *CoordinatorImpl) SetDbAdmin(requester common.User, db, username stri
|
|||
return common.NewAuthorizationError("Insufficient permissions")
|
||||
}
|
||||
|
||||
dbUsers := self.clusterConfiguration.dbUsers[db]
|
||||
if dbUsers == nil || dbUsers[username] == nil {
|
||||
user := self.clusterConfiguration.GetDbUser(db, username)
|
||||
if user == nil {
|
||||
return fmt.Errorf("Invalid username %s", username)
|
||||
}
|
||||
|
||||
user := dbUsers[username]
|
||||
user.IsAdmin = isAdmin
|
||||
self.raftServer.SaveDbUser(user)
|
||||
return nil
|
||||
|
@ -1294,9 +1278,9 @@ func (self *CoordinatorImpl) ReplicateDelete(request *protocol.Request) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, replicas []*ClusterServer) {
|
||||
func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, replicas []*cluster.ClusterServer) {
|
||||
for _, server := range replicas {
|
||||
if server.Id != self.clusterConfiguration.localServerId {
|
||||
if server.Id != self.clusterConfiguration.LocalServerId {
|
||||
err := server.MakeRequest(request, nil)
|
||||
if err != nil {
|
||||
log.Warn("REPLICATION ERROR: ", request.GetSequenceNumber(), err)
|
||||
|
@ -1306,7 +1290,7 @@ func (self *CoordinatorImpl) sendRequestToReplicas(request *protocol.Request, re
|
|||
}
|
||||
|
||||
func (self *CoordinatorImpl) sequenceNumberWithServerId(n uint64) uint64 {
|
||||
return n*HOST_ID_OFFSET + uint64(self.clusterConfiguration.localServerId)
|
||||
return n*HOST_ID_OFFSET + uint64(self.clusterConfiguration.LocalServerId)
|
||||
}
|
||||
|
||||
func isValidName(name string) bool {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package coordinator
|
||||
|
||||
import (
|
||||
"cluster"
|
||||
. "common"
|
||||
"configuration"
|
||||
"datastore"
|
||||
|
@ -112,10 +113,14 @@ func clean(servers ...*RaftServer) {
|
|||
}
|
||||
}
|
||||
|
||||
func newProtobufClient(connectString string) cluster.ServerConnection {
|
||||
return NewProtobufClient(connectString)
|
||||
}
|
||||
|
||||
func newConfigAndServer(c *C) *RaftServer {
|
||||
path, err := ioutil.TempDir(os.TempDir(), "influxdb")
|
||||
c.Assert(err, IsNil)
|
||||
config := NewClusterConfiguration(&configuration.Configuration{})
|
||||
config := cluster.NewClusterConfiguration(&configuration.Configuration{}, newProtobufClient)
|
||||
setupConfig := &configuration.Configuration{Hostname: "localhost", RaftDir: path, RaftServerPort: 0}
|
||||
server := NewRaftServer(setupConfig, config)
|
||||
return server
|
||||
|
@ -191,6 +196,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) {
|
|||
server.port = port
|
||||
server.name = name
|
||||
// defer clean(server)
|
||||
fmt.Println("STARTING UP.......")
|
||||
err = server.ListenAndServe()
|
||||
c.Assert(err, IsNil)
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
|
@ -199,6 +205,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) {
|
|||
assertConfigContains(server.port, dbname, true, c)
|
||||
}
|
||||
|
||||
fmt.Println("NEW SERVER JOINING...")
|
||||
// make another server join the cluster
|
||||
server2 := newConfigAndServer(c)
|
||||
defer clean(server2)
|
||||
|
@ -206,6 +213,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) {
|
|||
c.Assert(err, IsNil)
|
||||
server2.config.SeedServers = []string{fmt.Sprintf("http://localhost:%d", server.port)}
|
||||
server2.Serve(l)
|
||||
fmt.Println("DONE...")
|
||||
time.Sleep(SERVER_STARTUP_TIME)
|
||||
for i := 0; i < 1000; i++ {
|
||||
dbname := fmt.Sprintf("db%d", i)
|
||||
|
@ -278,14 +286,6 @@ func (self *CoordinatorSuite) TestCanElectNewLeaderAndRecover(c *C) {
|
|||
assertConfigContains(servers[2].port, "db6", true, c)
|
||||
}
|
||||
|
||||
func (self *UserSuite) BenchmarkHashing(c *C) {
|
||||
for i := 0; i < c.N; i++ {
|
||||
pwd := fmt.Sprintf("password%d", i)
|
||||
_, err := hashPassword(pwd)
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
}
|
||||
|
||||
func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) {
|
||||
servers := startAndVerifyCluster(3, c)
|
||||
defer clean(servers...)
|
||||
|
@ -311,7 +311,7 @@ func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) {
|
|||
coordinator := NewCoordinatorImpl(&DatastoreMock{}, server, server.clusterConfig)
|
||||
dbs, err := coordinator.ListDatabases(root)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(dbs, DeepEquals, []*Database{&Database{"db1", 1}})
|
||||
c.Assert(dbs, DeepEquals, []*cluster.Database{&cluster.Database{"db1", 1}})
|
||||
}
|
||||
|
||||
// if the db is dropped it should remove the users as well
|
||||
|
@ -583,7 +583,7 @@ func (self *CoordinatorSuite) TestCanCreateDatabaseWithNameAndReplicationFactor(
|
|||
time.Sleep(REPLICATION_LAG)
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
databases := servers[i].clusterConfig.databaseReplicationFactors
|
||||
databases := servers[i].clusterConfig.DatabaseReplicationFactors
|
||||
c.Assert(databases, DeepEquals, map[string]uint8{
|
||||
"db1": 1,
|
||||
"db2": 1,
|
||||
|
@ -655,16 +655,16 @@ func (self *CoordinatorSuite) TestServersGetUniqueIdsAndCanActivateCluster(c *C)
|
|||
defer clean(servers...)
|
||||
|
||||
// ensure they're all in the same order across the cluster
|
||||
expectedServers := servers[0].clusterConfig.servers
|
||||
expectedServers := servers[0].clusterConfig.Servers()
|
||||
for _, server := range servers {
|
||||
c.Assert(server.clusterConfig.servers, HasLen, len(expectedServers))
|
||||
c.Assert(server.clusterConfig.Servers(), HasLen, len(expectedServers))
|
||||
for i, clusterServer := range expectedServers {
|
||||
c.Assert(server.clusterConfig.servers[i].Id, Equals, clusterServer.Id)
|
||||
c.Assert(server.clusterConfig.Servers()[i].Id, Equals, clusterServer.Id)
|
||||
}
|
||||
}
|
||||
// ensure cluster server ids are unique
|
||||
idMap := make(map[uint32]bool)
|
||||
for _, clusterServer := range servers[0].clusterConfig.servers {
|
||||
for _, clusterServer := range servers[0].clusterConfig.Servers() {
|
||||
_, ok := idMap[clusterServer.Id]
|
||||
c.Assert(ok, Equals, false)
|
||||
idMap[clusterServer.Id] = true
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package coordinator
|
||||
|
||||
import (
|
||||
"cluster"
|
||||
"common"
|
||||
"net"
|
||||
"parser"
|
||||
|
@ -23,7 +24,7 @@ type Coordinator interface {
|
|||
DropSeries(user common.User, db, series string) error
|
||||
CreateDatabase(user common.User, db string, replicationFactor uint8) error
|
||||
ForceCompaction(user common.User) error
|
||||
ListDatabases(user common.User) ([]*Database, error)
|
||||
ListDatabases(user common.User) ([]*cluster.Database, error)
|
||||
ListSeries(user common.User, database string) ([]*protocol.Series, error)
|
||||
ReplicateWrite(request *protocol.Request) error
|
||||
ReplicateDelete(request *protocol.Request) error
|
||||
|
@ -67,14 +68,14 @@ type ClusterConsensus interface {
|
|||
DropDatabase(name string) error
|
||||
CreateContinuousQuery(db string, query string) error
|
||||
DeleteContinuousQuery(db string, id uint32) error
|
||||
SaveClusterAdminUser(u *clusterAdmin) error
|
||||
SaveDbUser(user *dbUser) error
|
||||
SaveClusterAdminUser(u *cluster.ClusterAdmin) error
|
||||
SaveDbUser(user *cluster.DbUser) error
|
||||
ChangeDbUserPassword(db, username string, hash []byte) error
|
||||
|
||||
// an insert index of -1 will append to the end of the ring
|
||||
AddServer(server *ClusterServer, insertIndex int) error
|
||||
AddServer(server *cluster.ClusterServer, insertIndex int) error
|
||||
// only servers that are in a Potential state can be moved around in the ring
|
||||
MovePotentialServer(server *ClusterServer, insertIndex int) error
|
||||
MovePotentialServer(server *cluster.ClusterServer, insertIndex int) error
|
||||
/*
|
||||
Activate tells the cluster to start sending writes to this node.
|
||||
The node will also make requests to the other servers to backfill any
|
||||
|
@ -82,11 +83,11 @@ type ClusterConsensus interface {
|
|||
Once the new node updates it state to "Running" the other servers will
|
||||
delete all of the data that they no longer have to keep from the ring
|
||||
*/
|
||||
ActivateServer(server *ClusterServer) error
|
||||
ActivateServer(server *cluster.ClusterServer) error
|
||||
|
||||
// Efficient method to have a potential server take the place of a running (or downed)
|
||||
// server. The replacement must have a state of "Potential" for this to work.
|
||||
ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error
|
||||
ReplaceServer(oldServer *cluster.ClusterServer, replacement *cluster.ClusterServer) error
|
||||
|
||||
AssignEngineAndCoordinator(engine queryRunner, coordinator *CoordinatorImpl) error
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ type ProtobufClient struct {
|
|||
requestBuffer map[uint32]*runningRequest
|
||||
connectionStatus uint32
|
||||
reconnectWait sync.WaitGroup
|
||||
connectCalled bool
|
||||
}
|
||||
|
||||
type runningRequest struct {
|
||||
|
@ -37,13 +38,19 @@ const (
|
|||
)
|
||||
|
||||
func NewProtobufClient(hostAndPort string) *ProtobufClient {
|
||||
client := &ProtobufClient{hostAndPort: hostAndPort, requestBuffer: make(map[uint32]*runningRequest), connectionStatus: IS_CONNECTED}
|
||||
return &ProtobufClient{hostAndPort: hostAndPort, requestBuffer: make(map[uint32]*runningRequest), connectionStatus: IS_CONNECTED}
|
||||
}
|
||||
|
||||
func (self *ProtobufClient) Connect() {
|
||||
if self.connectCalled {
|
||||
return
|
||||
}
|
||||
self.connectCalled = true
|
||||
go func() {
|
||||
client.reconnect()
|
||||
client.readResponses()
|
||||
self.reconnect()
|
||||
self.readResponses()
|
||||
}()
|
||||
go client.peridicallySweepTimedOutRequests()
|
||||
return client
|
||||
go self.peridicallySweepTimedOutRequests()
|
||||
}
|
||||
|
||||
func (self *ProtobufClient) Close() {
|
||||
|
|
|
@ -2,6 +2,7 @@ package coordinator
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"cluster"
|
||||
log "code.google.com/p/log4go"
|
||||
"common"
|
||||
"datastore"
|
||||
|
@ -16,7 +17,7 @@ import (
|
|||
type ProtobufRequestHandler struct {
|
||||
db datastore.Datastore
|
||||
coordinator Coordinator
|
||||
clusterConfig *ClusterConfiguration
|
||||
clusterConfig *cluster.ClusterConfiguration
|
||||
writeOk protocol.Response_Type
|
||||
}
|
||||
|
||||
|
@ -27,7 +28,7 @@ var (
|
|||
sequenceNumberResponse = protocol.Response_SEQUENCE_NUMBER
|
||||
)
|
||||
|
||||
func NewProtobufRequestHandler(db datastore.Datastore, coordinator Coordinator, clusterConfig *ClusterConfiguration) *ProtobufRequestHandler {
|
||||
func NewProtobufRequestHandler(db datastore.Datastore, coordinator Coordinator, clusterConfig *cluster.ClusterConfiguration) *ProtobufRequestHandler {
|
||||
return &ProtobufRequestHandler{db: db, coordinator: coordinator, writeOk: protocol.Response_WRITE_OK, clusterConfig: clusterConfig}
|
||||
}
|
||||
|
||||
|
@ -35,7 +36,7 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
|
|||
if *request.Type == protocol.Request_PROXY_WRITE {
|
||||
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
|
||||
|
||||
request.OriginatingServerId = &self.clusterConfig.localServerId
|
||||
request.OriginatingServerId = &self.clusterConfig.LocalServerId
|
||||
// TODO: make request logging and datastore write atomic
|
||||
replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database)
|
||||
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
|
||||
|
@ -53,7 +54,7 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
|
|||
} else if *request.Type == protocol.Request_PROXY_DROP_SERIES {
|
||||
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
|
||||
|
||||
request.OriginatingServerId = &self.clusterConfig.localServerId
|
||||
request.OriginatingServerId = &self.clusterConfig.LocalServerId
|
||||
replicationFactor := uint8(*request.ReplicationFactor)
|
||||
// TODO: make request logging and datastore write atomic
|
||||
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
|
||||
|
@ -85,7 +86,7 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
|
|||
} else if *request.Type == protocol.Request_PROXY_DROP_DATABASE {
|
||||
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
|
||||
|
||||
request.OriginatingServerId = &self.clusterConfig.localServerId
|
||||
request.OriginatingServerId = &self.clusterConfig.LocalServerId
|
||||
replicationFactor := uint8(*request.ReplicationFactor)
|
||||
// TODO: make request logging and datastore write atomic
|
||||
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
|
||||
|
@ -117,7 +118,7 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
|
|||
} else if *request.Type == protocol.Request_PROXY_DELETE {
|
||||
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}
|
||||
|
||||
request.OriginatingServerId = &self.clusterConfig.localServerId
|
||||
request.OriginatingServerId = &self.clusterConfig.LocalServerId
|
||||
// TODO: make request logging and datastore write atomic
|
||||
replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database)
|
||||
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
|
||||
|
|
|
@ -2,6 +2,7 @@ package coordinator
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"cluster"
|
||||
log "code.google.com/p/log4go"
|
||||
"common"
|
||||
"configuration"
|
||||
|
@ -39,7 +40,7 @@ type RaftServer struct {
|
|||
router *mux.Router
|
||||
raftServer raft.Server
|
||||
httpServer *http.Server
|
||||
clusterConfig *ClusterConfiguration
|
||||
clusterConfig *cluster.ClusterConfiguration
|
||||
mutex sync.RWMutex
|
||||
listener net.Listener
|
||||
closing bool
|
||||
|
@ -58,7 +59,7 @@ var replicateWrite = protocol.Request_REPLICATION_WRITE
|
|||
var replicateDelete = protocol.Request_REPLICATION_DELETE
|
||||
|
||||
// Creates a new server.
|
||||
func NewRaftServer(config *configuration.Configuration, clusterConfig *ClusterConfiguration) *RaftServer {
|
||||
func NewRaftServer(config *configuration.Configuration, clusterConfig *cluster.ClusterConfiguration) *RaftServer {
|
||||
if !registeredCommands {
|
||||
registeredCommands = true
|
||||
for _, command := range internalRaftCommands {
|
||||
|
@ -190,7 +191,7 @@ func (s *RaftServer) DropDatabase(name string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *RaftServer) SaveDbUser(u *dbUser) error {
|
||||
func (s *RaftServer) SaveDbUser(u *cluster.DbUser) error {
|
||||
command := NewSaveDbUserCommand(u)
|
||||
_, err := s.doOrProxyCommand(command, "save_db_user")
|
||||
return err
|
||||
|
@ -202,16 +203,16 @@ func (s *RaftServer) ChangeDbUserPassword(db, username string, hash []byte) erro
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *RaftServer) SaveClusterAdminUser(u *clusterAdmin) error {
|
||||
func (s *RaftServer) SaveClusterAdminUser(u *cluster.ClusterAdmin) error {
|
||||
command := NewSaveClusterAdminCommand(u)
|
||||
_, err := s.doOrProxyCommand(command, "save_cluster_admin_user")
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *RaftServer) CreateRootUser() error {
|
||||
u := &clusterAdmin{CommonUser{"root", "", false, "root"}}
|
||||
hash, _ := hashPassword(DEFAULT_ROOT_PWD)
|
||||
u.changePassword(string(hash))
|
||||
u := &cluster.ClusterAdmin{cluster.CommonUser{"root", "", false, "root"}}
|
||||
hash, _ := cluster.HashPassword(DEFAULT_ROOT_PWD)
|
||||
u.ChangePassword(string(hash))
|
||||
return s.SaveClusterAdminUser(u)
|
||||
}
|
||||
|
||||
|
@ -222,27 +223,26 @@ func (s *RaftServer) SetContinuousQueryTimestamp(timestamp time.Time) error {
|
|||
}
|
||||
|
||||
func (s *RaftServer) CreateContinuousQuery(db string, query string) error {
|
||||
// if there are already-running queries, we need to initiate a backfill
|
||||
if !s.clusterConfig.continuousQueryTimestamp.IsZero() {
|
||||
selectQuery, err := parser.ParseSelectQuery(query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse continuous query: %s", query)
|
||||
}
|
||||
selectQuery, err := parser.ParseSelectQuery(query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse continuous query: %s", query)
|
||||
}
|
||||
|
||||
duration, err := selectQuery.GetGroupByClause().GetGroupByTime()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Couldn't get group by time for continuous query: %s", err)
|
||||
}
|
||||
duration, err := selectQuery.GetGroupByClause().GetGroupByTime()
|
||||
if err != nil {
|
||||
return fmt.Errorf("Couldn't get group by time for continuous query: %s", err)
|
||||
}
|
||||
|
||||
if duration != nil {
|
||||
zeroTime := time.Time{}
|
||||
currentBoundary := time.Now().Truncate(*duration)
|
||||
go s.runContinuousQuery(db, selectQuery, zeroTime, currentBoundary)
|
||||
}
|
||||
if duration != nil {
|
||||
zeroTime := time.Time{}
|
||||
currentBoundary := time.Now().Truncate(*duration)
|
||||
go s.runContinuousQuery(db, selectQuery, zeroTime, currentBoundary)
|
||||
} else {
|
||||
// TODO: make continuous queries backfill for queries that don't have a group by time
|
||||
}
|
||||
|
||||
command := NewCreateContinuousQueryCommand(db, query)
|
||||
_, err := s.doOrProxyCommand(command, "create_cq")
|
||||
_, err = s.doOrProxyCommand(command, "create_cq")
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -252,19 +252,19 @@ func (s *RaftServer) DeleteContinuousQuery(db string, id uint32) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *RaftServer) ActivateServer(server *ClusterServer) error {
|
||||
func (s *RaftServer) ActivateServer(server *cluster.ClusterServer) error {
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (s *RaftServer) AddServer(server *ClusterServer, insertIndex int) error {
|
||||
func (s *RaftServer) AddServer(server *cluster.ClusterServer, insertIndex int) error {
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (s *RaftServer) MovePotentialServer(server *ClusterServer, insertIndex int) error {
|
||||
func (s *RaftServer) MovePotentialServer(server *cluster.ClusterServer, insertIndex int) error {
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (s *RaftServer) ReplaceServer(oldServer *ClusterServer, replacement *ClusterServer) error {
|
||||
func (s *RaftServer) ReplaceServer(oldServer *cluster.ClusterServer, replacement *cluster.ClusterServer) error {
|
||||
return errors.New("not implemented")
|
||||
}
|
||||
|
||||
|
@ -356,11 +356,9 @@ func (s *RaftServer) startRaft() error {
|
|||
log.Error(err)
|
||||
}
|
||||
|
||||
command := NewAddPotentialServerCommand(&ClusterServer{
|
||||
RaftName: name,
|
||||
RaftConnectionString: connectionString,
|
||||
ProtobufConnectionString: s.config.ProtobufConnectionString(),
|
||||
})
|
||||
protobufConnectString := s.config.ProtobufConnectionString()
|
||||
clusterServer := cluster.NewClusterServer(name, connectionString, protobufConnectString, NewProtobufClient(protobufConnectString))
|
||||
command := NewAddPotentialServerCommand(clusterServer)
|
||||
_, err = s.doOrProxyCommand(command, "add_server")
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -413,14 +411,14 @@ func (s *RaftServer) raftLeaderLoop(loopTimer *time.Ticker) {
|
|||
}
|
||||
|
||||
func (s *RaftServer) checkContinuousQueries() {
|
||||
if s.clusterConfig.continuousQueries == nil || len(s.clusterConfig.continuousQueries) == 0 {
|
||||
if !s.clusterConfig.HasContinuousQueries() {
|
||||
return
|
||||
}
|
||||
|
||||
runTime := time.Now()
|
||||
queriesDidRun := false
|
||||
|
||||
for db, queries := range s.clusterConfig.parsedContinuousQueries {
|
||||
for db, queries := range s.clusterConfig.ParsedContinuousQueries {
|
||||
for _, query := range queries {
|
||||
groupByClause := query.GetGroupByClause()
|
||||
|
||||
|
@ -436,9 +434,10 @@ func (s *RaftServer) checkContinuousQueries() {
|
|||
}
|
||||
|
||||
currentBoundary := runTime.Truncate(*duration)
|
||||
lastBoundary := s.clusterConfig.continuousQueryTimestamp.Truncate(*duration)
|
||||
lastRun := s.clusterConfig.LastContinuousQueryRunTime()
|
||||
lastBoundary := lastRun.Truncate(*duration)
|
||||
|
||||
if currentBoundary.After(s.clusterConfig.continuousQueryTimestamp) {
|
||||
if currentBoundary.After(lastRun) {
|
||||
s.runContinuousQuery(db, query, lastBoundary, currentBoundary)
|
||||
queriesDidRun = true
|
||||
}
|
||||
|
@ -446,13 +445,13 @@ func (s *RaftServer) checkContinuousQueries() {
|
|||
}
|
||||
|
||||
if queriesDidRun {
|
||||
s.clusterConfig.continuousQueryTimestamp = runTime
|
||||
s.clusterConfig.SetLastContinuousQueryRunTime(runTime)
|
||||
s.SetContinuousQueryTimestamp(runTime)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, start time.Time, end time.Time) {
|
||||
clusterAdmin := s.clusterConfig.clusterAdmins["root"]
|
||||
clusterAdmin := s.clusterConfig.GetClusterAdmin("root")
|
||||
intoClause := query.GetIntoClause()
|
||||
targetName := intoClause.Target.Name
|
||||
sequenceNumber := uint64(1)
|
||||
|
@ -592,16 +591,17 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
|
|||
// it's a new server the cluster has never seen, make it a potential
|
||||
if server == nil {
|
||||
log.Info("Adding new server to the cluster config %s", command.Name)
|
||||
addServer := NewAddPotentialServerCommand(&ClusterServer{
|
||||
RaftName: command.Name,
|
||||
RaftConnectionString: command.ConnectionString,
|
||||
ProtobufConnectionString: command.ProtobufConnectionString,
|
||||
})
|
||||
client := NewProtobufClient(command.ProtobufConnectionString)
|
||||
fmt.Println("CREATED CLIENT")
|
||||
clusterServer := cluster.NewClusterServer(command.Name, command.ConnectionString, command.ProtobufConnectionString, client)
|
||||
fmt.Println("CREATED SERVER")
|
||||
addServer := NewAddPotentialServerCommand(clusterServer)
|
||||
if _, err := s.raftServer.Do(addServer); err != nil {
|
||||
log.Error("Error joining raft server: ", err, command)
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
fmt.Println("DID COMMAND...")
|
||||
}
|
||||
log.Info("Server %s already exist in the cluster config", command.Name)
|
||||
} else {
|
||||
|
@ -617,15 +617,7 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
|
|||
}
|
||||
|
||||
func (s *RaftServer) configHandler(w http.ResponseWriter, req *http.Request) {
|
||||
jsonObject := make(map[string]interface{})
|
||||
dbs := make([]string, 0)
|
||||
for db, _ := range s.clusterConfig.databaseReplicationFactors {
|
||||
dbs = append(dbs, db)
|
||||
}
|
||||
jsonObject["databases"] = dbs
|
||||
jsonObject["cluster_admins"] = s.clusterConfig.clusterAdmins
|
||||
jsonObject["database_users"] = s.clusterConfig.dbUsers
|
||||
js, err := json.Marshal(jsonObject)
|
||||
js, err := json.Marshal(s.clusterConfig.GetMapForJsonSerialization())
|
||||
if err != nil {
|
||||
log.Error("ERROR marshalling config: ", err)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
package coordinator
|
||||
|
||||
import (
|
||||
"cluster"
|
||||
"protocol"
|
||||
"time"
|
||||
)
|
||||
|
||||
// duration 1h, 1d, 7d
|
||||
// split duration to n shards
|
||||
// if n > 1
|
||||
// hash using either random or series name
|
||||
|
||||
// These are things that the Coordinator need (defined in Coordinator, will have to import cluster package)
|
||||
type ShardAwareObject interface {
|
||||
GetShards(querySpec cluster.QuerySpec) []Shard
|
||||
GetShardById(id uint32) Shard
|
||||
}
|
||||
|
||||
type Shard interface {
|
||||
Id() uint32
|
||||
StartTime() time.Time
|
||||
EndTime() time.Time
|
||||
SeriesNames() []string
|
||||
Write([]*protocol.Series) error
|
||||
Query(cluster.QuerySpec, chan *protocol.Response) error
|
||||
}
|
Loading…
Reference in New Issue