merge InfluxJoinCommand with AddPotentialServerCommand

pull/482/head
John Shahid 2014-04-28 18:26:32 -04:00
parent 876b05956f
commit 4866197788
3 changed files with 87 additions and 98 deletions

View File

@ -55,34 +55,35 @@ const (
the servers in the cluster and their state, databases, users, and which continuous queries are running.
*/
type ClusterConfiguration struct {
createDatabaseLock sync.RWMutex
DatabaseReplicationFactors map[string]uint8
usersLock sync.RWMutex
clusterAdmins map[string]*ClusterAdmin
dbUsers map[string]map[string]*DbUser
servers []*ClusterServer
serversLock sync.RWMutex
continuousQueries map[string][]*ContinuousQuery
continuousQueriesLock sync.RWMutex
ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery
continuousQueryTimestamp time.Time
LocalServerId uint32
config *configuration.Configuration
addedLocalServerWait chan bool
addedLocalServer bool
connectionCreator func(string) ServerConnection
shardStore LocalShardStore
wal WAL
longTermShards []*ShardData
shortTermShards []*ShardData
random *rand.Rand
lastServerToGetShard *ClusterServer
shardCreator ShardCreator
shardLock sync.Mutex
shardsById map[uint32]*ShardData
shardsByIdLock sync.RWMutex
LocalRaftName string
writeBuffers []*WriteBuffer
createDatabaseLock sync.RWMutex
DatabaseReplicationFactors map[string]uint8
usersLock sync.RWMutex
clusterAdmins map[string]*ClusterAdmin
dbUsers map[string]map[string]*DbUser
servers []*ClusterServer
serversLock sync.RWMutex
continuousQueries map[string][]*ContinuousQuery
continuousQueriesLock sync.RWMutex
ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery
continuousQueryTimestamp time.Time
LocalServerId uint32
LocalProtobufConnectionString string
config *configuration.Configuration
addedLocalServerWait chan bool
addedLocalServer bool
connectionCreator func(string) ServerConnection
shardStore LocalShardStore
wal WAL
longTermShards []*ShardData
shortTermShards []*ShardData
random *rand.Rand
lastServerToGetShard *ClusterServer
shardCreator ShardCreator
shardLock sync.Mutex
shardsById map[uint32]*ShardData
shardsByIdLock sync.RWMutex
LocalRaftName string
writeBuffers []*WriteBuffer
}
type ContinuousQuery struct {
@ -217,22 +218,33 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
server.Id = uint32(len(self.servers))
log.Info("Added server to cluster config: %d, %s, %s", server.Id, server.RaftConnectionString, server.ProtobufConnectionString)
log.Info("Checking whether this is the local server new: %s, local: %s\n", self.config.ProtobufConnectionString(), server.ProtobufConnectionString)
if server.RaftName != self.LocalRaftName {
log.Info("Connecting to ProtobufServer: %s", server.ProtobufConnectionString, self.config.ProtobufConnectionString())
if server.connection == nil {
server.connection = self.connectionCreator(server.ProtobufConnectionString)
server.Connect()
}
writeBuffer := NewWriteBuffer(fmt.Sprintf("%d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize)
self.writeBuffers = append(self.writeBuffers, writeBuffer)
server.SetWriteBuffer(writeBuffer)
server.StartHeartbeat()
} else if !self.addedLocalServer {
if server.RaftName == self.LocalRaftName && self.addedLocalServer {
panic("how did we add the same server twice ?")
}
// if this is the local server unblock WaitForLocalServerLoaded()
// and set the local connection string and id
if server.RaftName == self.LocalRaftName {
log.Info("Added the local server")
self.LocalServerId = server.Id
self.LocalProtobufConnectionString = server.ProtobufConnectionString
self.addedLocalServerWait <- true
self.addedLocalServer = true
return
}
// if this isn't the local server, connect to it
log.Info("Connecting to ProtobufServer: %s", server.ProtobufConnectionString, self.config.ProtobufConnectionString())
if server.connection == nil {
server.connection = self.connectionCreator(server.ProtobufConnectionString)
server.Connect()
}
writeBuffer := NewWriteBuffer(fmt.Sprintf("%d", server.GetId()), server, self.wal, server.Id, self.config.PerServerWriteBufferSize)
self.writeBuffers = append(self.writeBuffers, writeBuffer)
server.SetWriteBuffer(writeBuffer)
server.StartHeartbeat()
return
}
func (self *ClusterConfiguration) GetDatabases() []*Database {
@ -344,6 +356,10 @@ func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQ
return self.continuousQueries[db]
}
func (self *ClusterConfiguration) GetLocalConfiguration() *configuration.Configuration {
return self.config
}
func (self *ClusterConfiguration) GetDbUsers(db string) []common.User {
self.usersLock.RLock()
defer self.usersLock.RUnlock()

View File

@ -3,6 +3,7 @@ package coordinator
import (
"cluster"
"encoding/json"
"fmt"
"io"
"time"
@ -15,7 +16,7 @@ var internalRaftCommands map[string]raft.Command
func init() {
internalRaftCommands = map[string]raft.Command{}
for _, command := range []raft.Command{
&AddPotentialServerCommand{},
&InfluxJoinCommand{},
&CreateDatabaseCommand{},
&DropDatabaseCommand{},
&SaveDbUserCommand{},
@ -216,24 +217,6 @@ func (c *SaveClusterAdminCommand) Apply(server raft.Server) (interface{}, error)
return nil, nil
}
type AddPotentialServerCommand struct {
Server *cluster.ClusterServer
}
func NewAddPotentialServerCommand(s *cluster.ClusterServer) *AddPotentialServerCommand {
return &AddPotentialServerCommand{Server: s}
}
func (c *AddPotentialServerCommand) CommandName() string {
return "add_server"
}
func (c *AddPotentialServerCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*cluster.ClusterConfiguration)
config.AddPotentialServer(c.Server)
return nil, nil
}
type InfluxJoinCommand struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
@ -242,13 +225,31 @@ type InfluxJoinCommand struct {
// The name of the Join command in the log
func (c *InfluxJoinCommand) CommandName() string {
return "raft:join"
return "join"
}
func (c *InfluxJoinCommand) Apply(server raft.Server) (interface{}, error) {
err := server.AddPeer(c.Name, c.ConnectionString)
if err != nil {
return nil, err
}
return []byte("join"), err
clusterConfig := server.Context().(*cluster.ClusterConfiguration)
newServer := clusterConfig.GetServerByRaftName(c.Name)
// it's a new server the cluster has never seen, make it a potential
if newServer != nil {
return nil, fmt.Errorf("Server %s already exist", c.Name)
}
log.Info("Adding new server to the cluster config %s", c.Name)
clusterServer := cluster.NewClusterServer(c.Name,
c.ConnectionString,
c.ProtobufConnectionString,
nil,
clusterConfig.GetLocalConfiguration())
clusterConfig.AddPotentialServer(clusterServer)
return nil, nil
}
func (c *InfluxJoinCommand) NodeName() string {

View File

@ -380,18 +380,6 @@ func (s *RaftServer) startRaft() error {
if err != nil {
log.Error(err)
}
protobufConnectString := s.config.ProtobufConnectionString()
clusterServer := cluster.NewClusterServer(name,
connectionString,
protobufConnectString,
nil,
s.config)
command := NewAddPotentialServerCommand(clusterServer)
_, err = s.doOrProxyCommand(command)
if err != nil {
return err
}
err = s.CreateRootUser()
return err
}
@ -608,6 +596,7 @@ func (s *RaftServer) retryCommand(command raft.Command, retries int) (ret interf
}
func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
// if this is the leader, process the command
if s.raftServer.State() == raft.Leader {
command := &InfluxJoinCommand{}
if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
@ -619,34 +608,17 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
if _, err := s.raftServer.Do(command); err != nil {
log.Error("Can't process %v: %s", command, err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
server := s.clusterConfig.GetServerByRaftName(command.Name)
// it's a new server the cluster has never seen, make it a potential
if server == nil {
log.Info("Adding new server to the cluster config %s", command.Name)
clusterServer := cluster.NewClusterServer(command.Name,
command.ConnectionString,
command.ProtobufConnectionString,
nil,
s.config)
addServer := NewAddPotentialServerCommand(clusterServer)
if _, err := s.raftServer.Do(addServer); err != nil {
log.Error("Error joining raft server: ", err, command)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
log.Info("Server %s already exist in the cluster config", command.Name)
return
}
leader, ok := s.leaderConnectString()
log.Debug("Non-leader redirecting to: (%v, %v)", leader, ok)
if ok {
log.Debug("redirecting to leader to join...")
http.Redirect(w, req, leader+"/join", http.StatusTemporaryRedirect)
} else {
leader, ok := s.leaderConnectString()
log.Debug("Non-leader redirecting to: (%v, %v)", leader, ok)
if ok {
log.Debug("redirecting to leader to join...")
http.Redirect(w, req, leader+"/join", http.StatusTemporaryRedirect)
} else {
http.Error(w, errors.New("Couldn't find leader of the cluster to join").Error(), http.StatusInternalServerError)
}
http.Error(w, errors.New("Couldn't find leader of the cluster to join").Error(), http.StatusInternalServerError)
}
}