diff --git a/src/cluster/cluster_configuration.go b/src/cluster/cluster_configuration.go index afc91a01c8..0db57fb8c5 100644 --- a/src/cluster/cluster_configuration.go +++ b/src/cluster/cluster_configuration.go @@ -55,34 +55,35 @@ const ( the servers in the cluster and their state, databases, users, and which continuous queries are running. */ type ClusterConfiguration struct { - createDatabaseLock sync.RWMutex - DatabaseReplicationFactors map[string]uint8 - usersLock sync.RWMutex - clusterAdmins map[string]*ClusterAdmin - dbUsers map[string]map[string]*DbUser - servers []*ClusterServer - serversLock sync.RWMutex - continuousQueries map[string][]*ContinuousQuery - continuousQueriesLock sync.RWMutex - ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery - continuousQueryTimestamp time.Time - LocalServerId uint32 - config *configuration.Configuration - addedLocalServerWait chan bool - addedLocalServer bool - connectionCreator func(string) ServerConnection - shardStore LocalShardStore - wal WAL - longTermShards []*ShardData - shortTermShards []*ShardData - random *rand.Rand - lastServerToGetShard *ClusterServer - shardCreator ShardCreator - shardLock sync.Mutex - shardsById map[uint32]*ShardData - shardsByIdLock sync.RWMutex - LocalRaftName string - writeBuffers []*WriteBuffer + createDatabaseLock sync.RWMutex + DatabaseReplicationFactors map[string]uint8 + usersLock sync.RWMutex + clusterAdmins map[string]*ClusterAdmin + dbUsers map[string]map[string]*DbUser + servers []*ClusterServer + serversLock sync.RWMutex + continuousQueries map[string][]*ContinuousQuery + continuousQueriesLock sync.RWMutex + ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery + continuousQueryTimestamp time.Time + LocalServerId uint32 + LocalProtobufConnectionString string + config *configuration.Configuration + addedLocalServerWait chan bool + addedLocalServer bool + connectionCreator func(string) ServerConnection + shardStore LocalShardStore + wal WAL + longTermShards []*ShardData + shortTermShards []*ShardData + random *rand.Rand + lastServerToGetShard *ClusterServer + shardCreator ShardCreator + shardLock sync.Mutex + shardsById map[uint32]*ShardData + shardsByIdLock sync.RWMutex + LocalRaftName string + writeBuffers []*WriteBuffer } type ContinuousQuery struct { @@ -217,22 +218,33 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) { server.Id = uint32(len(self.servers)) log.Info("Added server to cluster config: %d, %s, %s", server.Id, server.RaftConnectionString, server.ProtobufConnectionString) log.Info("Checking whether this is the local server new: %s, local: %s\n", self.config.ProtobufConnectionString(), server.ProtobufConnectionString) - if server.RaftName != self.LocalRaftName { - log.Info("Connecting to ProtobufServer: %s", server.ProtobufConnectionString, self.config.ProtobufConnectionString()) - if server.connection == nil { - server.connection = self.connectionCreator(server.ProtobufConnectionString) - server.Connect() - } - writeBuffer := NewWriteBuffer(fmt.Sprintf("%d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize) - self.writeBuffers = append(self.writeBuffers, writeBuffer) - server.SetWriteBuffer(writeBuffer) - server.StartHeartbeat() - } else if !self.addedLocalServer { + + if server.RaftName == self.LocalRaftName && self.addedLocalServer { + panic("how did we add the same server twice ?") + } + + // if this is the local server unblock WaitForLocalServerLoaded() + // and set the local connection string and id + if server.RaftName == self.LocalRaftName { log.Info("Added the local server") self.LocalServerId = server.Id + self.LocalProtobufConnectionString = server.ProtobufConnectionString self.addedLocalServerWait <- true self.addedLocalServer = true + return } + + // if this isn't the local server, connect to it + log.Info("Connecting to ProtobufServer: %s", server.ProtobufConnectionString, self.config.ProtobufConnectionString()) + if server.connection == nil { + server.connection = self.connectionCreator(server.ProtobufConnectionString) + server.Connect() + } + writeBuffer := NewWriteBuffer(fmt.Sprintf("%d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize) + self.writeBuffers = append(self.writeBuffers, writeBuffer) + server.SetWriteBuffer(writeBuffer) + server.StartHeartbeat() + return } func (self *ClusterConfiguration) GetDatabases() []*Database { @@ -344,6 +356,10 @@ func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQ return self.continuousQueries[db] } +func (self *ClusterConfiguration) GetLocalConfiguration() *configuration.Configuration { + return self.config +} + func (self *ClusterConfiguration) GetDbUsers(db string) []common.User { self.usersLock.RLock() defer self.usersLock.RUnlock() diff --git a/src/coordinator/command.go b/src/coordinator/command.go index cd36493e56..e1579e8611 100644 --- a/src/coordinator/command.go +++ b/src/coordinator/command.go @@ -3,6 +3,7 @@ package coordinator import ( "cluster" "encoding/json" + "fmt" "io" "time" @@ -15,7 +16,7 @@ var internalRaftCommands map[string]raft.Command func init() { internalRaftCommands = map[string]raft.Command{} for _, command := range []raft.Command{ - &AddPotentialServerCommand{}, + &InfluxJoinCommand{}, &CreateDatabaseCommand{}, &DropDatabaseCommand{}, &SaveDbUserCommand{}, @@ -216,24 +217,6 @@ func (c *SaveClusterAdminCommand) Apply(server raft.Server) (interface{}, error) return nil, nil } -type AddPotentialServerCommand struct { - Server *cluster.ClusterServer -} - -func NewAddPotentialServerCommand(s *cluster.ClusterServer) *AddPotentialServerCommand { - return &AddPotentialServerCommand{Server: s} -} - -func (c *AddPotentialServerCommand) CommandName() string { - return "add_server" -} - -func (c *AddPotentialServerCommand) Apply(server raft.Server) (interface{}, error) { - config := server.Context().(*cluster.ClusterConfiguration) - config.AddPotentialServer(c.Server) - return nil, nil -} - type InfluxJoinCommand struct { Name string `json:"name"` ConnectionString string `json:"connectionString"` @@ -242,13 +225,31 @@ type InfluxJoinCommand struct { // The name of the Join command in the log func (c *InfluxJoinCommand) CommandName() string { - return "raft:join" + return "join" } func (c *InfluxJoinCommand) Apply(server raft.Server) (interface{}, error) { err := server.AddPeer(c.Name, c.ConnectionString) + if err != nil { + return nil, err + } - return []byte("join"), err + clusterConfig := server.Context().(*cluster.ClusterConfiguration) + + newServer := clusterConfig.GetServerByRaftName(c.Name) + // it's a new server the cluster has never seen, make it a potential + if newServer != nil { + return nil, fmt.Errorf("Server %s already exist", c.Name) + } + + log.Info("Adding new server to the cluster config %s", c.Name) + clusterServer := cluster.NewClusterServer(c.Name, + c.ConnectionString, + c.ProtobufConnectionString, + nil, + clusterConfig.GetLocalConfiguration()) + clusterConfig.AddPotentialServer(clusterServer) + return nil, nil } func (c *InfluxJoinCommand) NodeName() string { diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index f430863e58..55910d1a28 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -380,18 +380,6 @@ func (s *RaftServer) startRaft() error { if err != nil { log.Error(err) } - - protobufConnectString := s.config.ProtobufConnectionString() - clusterServer := cluster.NewClusterServer(name, - connectionString, - protobufConnectString, - nil, - s.config) - command := NewAddPotentialServerCommand(clusterServer) - _, err = s.doOrProxyCommand(command) - if err != nil { - return err - } err = s.CreateRootUser() return err } @@ -608,6 +596,7 @@ func (s *RaftServer) retryCommand(command raft.Command, retries int) (ret interf } func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { + // if this is the leader, process the command if s.raftServer.State() == raft.Leader { command := &InfluxJoinCommand{} if err := json.NewDecoder(req.Body).Decode(&command); err != nil { @@ -619,34 +608,17 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) { if _, err := s.raftServer.Do(command); err != nil { log.Error("Can't process %v: %s", command, err) http.Error(w, err.Error(), http.StatusInternalServerError) - return } - server := s.clusterConfig.GetServerByRaftName(command.Name) - // it's a new server the cluster has never seen, make it a potential - if server == nil { - log.Info("Adding new server to the cluster config %s", command.Name) - clusterServer := cluster.NewClusterServer(command.Name, - command.ConnectionString, - command.ProtobufConnectionString, - nil, - s.config) - addServer := NewAddPotentialServerCommand(clusterServer) - if _, err := s.raftServer.Do(addServer); err != nil { - log.Error("Error joining raft server: ", err, command) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - } - log.Info("Server %s already exist in the cluster config", command.Name) + return + } + + leader, ok := s.leaderConnectString() + log.Debug("Non-leader redirecting to: (%v, %v)", leader, ok) + if ok { + log.Debug("redirecting to leader to join...") + http.Redirect(w, req, leader+"/join", http.StatusTemporaryRedirect) } else { - leader, ok := s.leaderConnectString() - log.Debug("Non-leader redirecting to: (%v, %v)", leader, ok) - if ok { - log.Debug("redirecting to leader to join...") - http.Redirect(w, req, leader+"/join", http.StatusTemporaryRedirect) - } else { - http.Error(w, errors.New("Couldn't find leader of the cluster to join").Error(), http.StatusInternalServerError) - } + http.Error(w, errors.New("Couldn't find leader of the cluster to join").Error(), http.StatusInternalServerError) } }