Fixed bug where raft snapshots were storing the local server id, which isn't shared state. Updated initialization and recovery to set the local server id based on if a given server's Raft name matches the raft name set locally.

pull/269/head
Paul Dix 2014-02-25 07:28:53 -05:00
parent bcb2f25e6d
commit 679403b464
4 changed files with 39 additions and 109 deletions

View File

@ -50,11 +50,6 @@ const (
/*
This struct stores all the metadata confiugration information about a running cluster. This includes
the servers in the cluster and their state, databases, users, and which continuous queries are running.
ClusterVersion is a monotonically increasing int that keeps track of different server configurations.
For example, when you spin up a cluster and start writing data, the version will be 1. If you expand the
cluster the version will be bumped. Using this the cluster is able to run two versions simultaneously
while the new servers are being brought online.
*/
type ClusterConfiguration struct {
createDatabaseLock sync.RWMutex
@ -68,9 +63,7 @@ type ClusterConfiguration struct {
continuousQueriesLock sync.RWMutex
ParsedContinuousQueries map[string]map[uint32]*parser.SelectQuery
continuousQueryTimestamp time.Time
hasRunningServers bool
LocalServerId uint32
ClusterVersion uint32
config *configuration.Configuration
addedLocalServerWait chan bool
addedLocalServer bool
@ -86,6 +79,7 @@ type ClusterConfiguration struct {
lastShardId uint32
shardsById map[uint32]*ShardData
shardsByIdLock sync.RWMutex
LocalRaftName string
}
type ContinuousQuery struct {
@ -202,22 +196,6 @@ func (self *ClusterConfiguration) GetServerByProtobufConnectionString(connection
return nil
}
func (self *ClusterConfiguration) UpdateServerState(serverId uint32, state ServerState) error {
self.serversLock.Lock()
defer self.serversLock.Unlock()
atomic.AddUint32(&self.ClusterVersion, uint32(1))
for _, server := range self.servers {
if server.Id == serverId {
if state == Running {
self.hasRunningServers = true
}
server.State = state
return nil
}
}
return errors.New(fmt.Sprintf("No server with id %d", serverId))
}
func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
self.serversLock.Lock()
defer self.serversLock.Unlock()
@ -226,10 +204,13 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
server.Id = uint32(len(self.servers))
log.Info("Added server to cluster config: %d, %s, %s", server.Id, server.RaftConnectionString, server.ProtobufConnectionString)
log.Info("Checking whether this is the local server new: %s, local: %s\n", self.config.ProtobufConnectionString(), server.ProtobufConnectionString)
if server.ProtobufConnectionString != self.config.ProtobufConnectionString() {
log.Info("Connecting to ProtobufServer: %s", server.ProtobufConnectionString)
server.connection = self.connectionCreator(server.ProtobufConnectionString)
server.Connect()
if server.RaftName != self.LocalRaftName {
log.Info("Connecting to ProtobufServer: %s", server.ProtobufConnectionString, self.config.ProtobufConnectionString())
if server.connection == nil {
fmt.Println("Creating connection from AddPotentialServer")
server.connection = self.connectionCreator(server.ProtobufConnectionString)
server.Connect()
}
server.SetWriteBuffer(NewWriteBuffer(server, self.wal, server.Id, self.config.PerServerWriteBufferSize))
server.StartHeartbeat()
} else if !self.addedLocalServer {
@ -431,29 +412,23 @@ func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin) {
}
type SavedConfiguration struct {
Databases map[string]uint8
Admins map[string]*ClusterAdmin
DbUsers map[string]map[string]*DbUser
Servers []*ClusterServer
HasRunningServers bool
LocalServerId uint32
ClusterVersion uint32
ShortTermShards []*NewShardData
LongTermShards []*NewShardData
Databases map[string]uint8
Admins map[string]*ClusterAdmin
DbUsers map[string]map[string]*DbUser
Servers []*ClusterServer
ShortTermShards []*NewShardData
LongTermShards []*NewShardData
}
func (self *ClusterConfiguration) Save() ([]byte, error) {
log.Debug("Dumping the cluster configuration")
data := &SavedConfiguration{
Databases: self.DatabaseReplicationFactors,
Admins: self.clusterAdmins,
DbUsers: self.dbUsers,
Servers: self.servers,
HasRunningServers: self.hasRunningServers,
LocalServerId: self.LocalServerId,
ClusterVersion: self.ClusterVersion,
ShortTermShards: self.convertShardsToNewShardData(self.shortTermShards),
LongTermShards: self.convertShardsToNewShardData(self.longTermShards),
Databases: self.DatabaseReplicationFactors,
Admins: self.clusterAdmins,
DbUsers: self.dbUsers,
Servers: self.servers,
ShortTermShards: self.convertShardsToNewShardData(self.shortTermShards),
LongTermShards: self.convertShardsToNewShardData(self.longTermShards),
}
b := bytes.NewBuffer(nil)
@ -517,35 +492,24 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
self.servers = data.Servers
for _, server := range self.servers {
fmt.Println("CONFIG: server: ", server.Id, server)
if server.RaftName == self.LocalRaftName {
fmt.Println("Set local server after recovery")
self.LocalServerId = server.Id
self.addedLocalServerWait <- true
self.addedLocalServer = true
continue
}
server.connection = oldServers[server.ProtobufConnectionString]
if server.connection == nil {
fmt.Println("Creating connection from Recovery")
server.connection = self.connectionCreator(server.ProtobufConnectionString)
if server.ProtobufConnectionString != self.config.ProtobufConnectionString() {
server.SetWriteBuffer(NewWriteBuffer(server, self.wal, server.Id, self.config.PerServerWriteBufferSize))
server.Connect()
server.StartHeartbeat()
}
}
server.StartHeartbeat()
}
self.hasRunningServers = data.HasRunningServers
self.LocalServerId = data.LocalServerId
self.ClusterVersion = data.ClusterVersion
if self.addedLocalServer {
return nil
}
for _, server := range self.servers {
log.Info("Checking whether this is the local server new: %s, local: %s\n", self.config.ProtobufConnectionString(), server.ProtobufConnectionString)
if server.ProtobufConnectionString != self.config.ProtobufConnectionString() {
continue
}
log.Info("Added the local server")
self.addedLocalServerWait <- true
self.addedLocalServer = true
break
}
self.shardsByIdLock.Lock()
@ -714,16 +678,10 @@ func (self *ClusterConfiguration) createShards(microsecondsEpoch int64, shardTyp
}
func (self *ClusterConfiguration) getStartAndEndBasedOnDuration(microsecondsEpoch int64, duration int64) (*time.Time, *time.Time) {
fmt.Println("MS epoch, seconds duration: ", microsecondsEpoch, duration)
startTimeSeconds := microsecondsEpoch / int64(1000) / int64(1000) / duration * duration
startTime := time.Unix(startTimeSeconds, 0)
endTime := time.Unix(startTimeSeconds+duration, 0)
// TODO: remove this....
if !((startTime.UnixNano()/int64(1000)) <= microsecondsEpoch && (endTime.UnixNano()/int64(1000)) > microsecondsEpoch) {
fmt.Println("TIME NOT IN DURATION: ", startTime.UnixNano()/int64(1000), endTime.UnixNano()/int64(1000), microsecondsEpoch)
panic("blah")
}
return &startTime, &endTime
}
@ -742,7 +700,6 @@ func (self *ClusterConfiguration) GetShards(querySpec *parser.QuerySpec) []*Shar
shouldQueryShortTerm, shouldQueryLongTerm := querySpec.ShouldQueryShortTermAndLongTerm()
if shouldQueryLongTerm && shouldQueryShortTerm {
fmt.Println("GetShards: long term and short term")
shards := make([]*ShardData, 0)
shards = append(shards, self.getShardRange(querySpec, self.shortTermShards)...)
shards = append(shards, self.getShardRange(querySpec, self.longTermShards)...)
@ -756,10 +713,8 @@ func (self *ClusterConfiguration) GetShards(querySpec *parser.QuerySpec) []*Shar
var shards []*ShardData
if shouldQueryLongTerm {
fmt.Println("GetShards: long term")
shards = self.getShardRange(querySpec, self.longTermShards)
} else {
fmt.Println("GetShards: short term")
shards = self.getShardRange(querySpec, self.shortTermShards)
}
if querySpec.IsAscending() {
@ -784,10 +739,8 @@ func (self *ClusterConfiguration) GetAllShards() []*ShardData {
}
func (self *ClusterConfiguration) getShardRange(querySpec QuerySpec, shards []*ShardData) []*ShardData {
fmt.Println("---------------------- getShardRange")
startTime := querySpec.GetStartTime().UnixNano() / 1000
endTime := querySpec.GetEndTime().UnixNano() / 1000
fmt.Println("StartTime, EndTime: ", startTime, endTime)
startIndex := -1
endIndex := -1
@ -798,7 +751,6 @@ func (self *ClusterConfiguration) getShardRange(querySpec QuerySpec, shards []*S
// this logic looks a little weird because the shards passed into this function should
// always be passed in time descending order. But start time is low and end time is high. just FYI.
for i, shard := range shards {
fmt.Println("shard: ", shard)
if startIndex == -1 {
if shard.IsMicrosecondInRange(endTime) {
startIndex = i
@ -815,8 +767,7 @@ func (self *ClusterConfiguration) getShardRange(querySpec QuerySpec, shards []*S
if endIndex == -1 {
endIndex = len(shards)
}
fmt.Printf("StartIndex: %d, EndIndex: %d, Len: %d\n", startIndex, endIndex, len(shards[startIndex:endIndex]))
fmt.Println("END ---------------------- getShardRange")
fmt.Printf("getShardRange: StartIndex: %d, EndIndex: %d, Len: %d\n", startIndex, endIndex, len(shards[startIndex:endIndex]))
return shards[startIndex:endIndex]
}
@ -902,17 +853,11 @@ func (self *ClusterConfiguration) AddShards(shards []*NewShardData) ([]*ShardDat
createdShards = append(createdShards, shard)
fmt.Println(self.config.ShortTermShard)
log.Info("%s: %d - start: %s. end: %s. isLocal: %d. servers: %s",
message, shard.Id(),
shard.StartTime().Format("Mon Jan 2 15:04:05 -0700 MST 2006"), shard.EndTime().Format("Mon Jan 2 15:04:05 -0700 MST 2006"),
shard.IsLocal(), shard.ServerIds())
}
fmt.Println("AddShards: ", len(createdShards))
for _, s := range createdShards {
fmt.Println("S: ", s)
}
fmt.Println("****************")
return createdShards, nil
}
@ -987,6 +932,7 @@ func (self *ClusterConfiguration) RecoverFromWAL() error {
} else {
go func(serverId uint32) {
if server.connection == nil {
fmt.Println("Creating connection from WAL: ", serverId, self.LocalServerId)
server.connection = self.connectionCreator(server.ProtobufConnectionString)
server.Connect()
}

View File

@ -13,7 +13,6 @@ func init() {
internalRaftCommands = map[string]raft.Command{}
for _, command := range []raft.Command{
&AddPotentialServerCommand{},
&UpdateServerStateCommand{},
&CreateDatabaseCommand{},
&DropDatabaseCommand{},
&SaveDbUserCommand{},
@ -205,25 +204,6 @@ func (c *AddPotentialServerCommand) Apply(server raft.Server) (interface{}, erro
return nil, nil
}
type UpdateServerStateCommand struct {
ServerId uint32
State cluster.ServerState
}
func NewUpdateServerStateCommand(serverId uint32, state cluster.ServerState) *UpdateServerStateCommand {
return &UpdateServerStateCommand{ServerId: serverId, State: state}
}
func (c *UpdateServerStateCommand) CommandName() string {
return "update_state"
}
func (c *UpdateServerStateCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*cluster.ClusterConfiguration)
err := config.UpdateServerState(c.ServerId, c.State)
return nil, err
}
type InfluxJoinCommand struct {
Name string `json:"name"`
ConnectionString string `json:"connectionString"`

View File

@ -110,6 +110,10 @@ func NewRaftServer(config *configuration.Configuration, clusterConfig *cluster.C
return s
}
func (s *RaftServer) GetRaftName() string {
return s.name
}
func (s *RaftServer) leaderConnectString() (string, bool) {
leader := s.raftServer.Leader()
peers := s.raftServer.Peers()
@ -352,7 +356,7 @@ func (s *RaftServer) startRaft() error {
clusterServer := cluster.NewClusterServer(name,
connectionString,
protobufConnectString,
NewProtobufClient(protobufConnectString, s.config.ProtobufTimeout.Duration),
nil,
s.config.ProtobufHeartbeatInterval.Duration)
command := NewAddPotentialServerCommand(clusterServer)
_, err = s.doOrProxyCommand(command, "add_server")
@ -592,11 +596,10 @@ func (s *RaftServer) joinHandler(w http.ResponseWriter, req *http.Request) {
// it's a new server the cluster has never seen, make it a potential
if server == nil {
log.Info("Adding new server to the cluster config %s", command.Name)
client := NewProtobufClient(command.ProtobufConnectionString, s.config.ProtobufTimeout.Duration)
clusterServer := cluster.NewClusterServer(command.Name,
command.ConnectionString,
command.ProtobufConnectionString,
client,
nil,
s.config.ProtobufHeartbeatInterval.Duration)
addServer := NewAddPotentialServerCommand(clusterServer)
if _, err := s.raftServer.Do(addServer); err != nil {

View File

@ -47,6 +47,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
clusterConfig := cluster.NewClusterConfiguration(config, writeLog, shardDb, newClient)
raftServer := coordinator.NewRaftServer(config, clusterConfig)
clusterConfig.LocalRaftName = raftServer.GetRaftName()
clusterConfig.SetShardCreator(raftServer)
clusterConfig.CreateFutureShardsAutomaticallyBeforeTimeComes()