Fix server startup to wait for local server id after raft server is started
parent
16b0052f59
commit
288f468da8
|
@ -22,6 +22,7 @@ type Server struct {
|
||||||
Config *configuration.Configuration
|
Config *configuration.Configuration
|
||||||
RequestHandler *coordinator.ProtobufRequestHandler
|
RequestHandler *coordinator.ProtobufRequestHandler
|
||||||
stopped bool
|
stopped bool
|
||||||
|
writeLog *wal.WAL
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(config *configuration.Configuration) (*Server, error) {
|
func NewServer(config *configuration.Configuration) (*Server, error) {
|
||||||
|
@ -47,11 +48,6 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
|
||||||
clusterConfig := cluster.NewClusterConfiguration(config, writeLog, shardDb, newClient)
|
clusterConfig := cluster.NewClusterConfiguration(config, writeLog, shardDb, newClient)
|
||||||
raftServer := coordinator.NewRaftServer(config, clusterConfig)
|
raftServer := coordinator.NewRaftServer(config, clusterConfig)
|
||||||
|
|
||||||
log.Info("Waiting for local server to be added")
|
|
||||||
clusterConfig.WaitForLocalServerLoaded()
|
|
||||||
|
|
||||||
writeLog.SetServerId(clusterConfig.ServerId())
|
|
||||||
|
|
||||||
coord := coordinator.NewCoordinatorImpl(db, raftServer, clusterConfig)
|
coord := coordinator.NewCoordinatorImpl(db, raftServer, clusterConfig)
|
||||||
go coord.SyncLogs()
|
go coord.SyncLogs()
|
||||||
requestHandler := coordinator.NewProtobufRequestHandler(db, coord, clusterConfig)
|
requestHandler := coordinator.NewProtobufRequestHandler(db, coord, clusterConfig)
|
||||||
|
@ -71,7 +67,8 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
|
||||||
Coordinator: coord,
|
Coordinator: coord,
|
||||||
AdminServer: adminServer,
|
AdminServer: adminServer,
|
||||||
Config: config,
|
Config: config,
|
||||||
RequestHandler: requestHandler}, nil
|
RequestHandler: requestHandler,
|
||||||
|
writeLog: writeLog}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Server) ListenAndServe() error {
|
func (self *Server) ListenAndServe() error {
|
||||||
|
@ -82,6 +79,15 @@ func (self *Server) ListenAndServe() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Info("Waiting for local server to be added")
|
||||||
|
self.ClusterConfig.WaitForLocalServerLoaded()
|
||||||
|
self.writeLog.SetServerId(self.ClusterConfig.ServerId())
|
||||||
|
|
||||||
|
err = self.recoverFromLog()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
err = self.Coordinator.(*coordinator.CoordinatorImpl).ConnectToProtobufServers(self.Config.ProtobufConnectionString())
|
err = self.Coordinator.(*coordinator.CoordinatorImpl).ConnectToProtobufServers(self.Config.ProtobufConnectionString())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -93,6 +99,11 @@ func (self *Server) ListenAndServe() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *Server) recoverFromLog() error {
|
||||||
|
// TODO: recover from the log: wal.RecoverFromLog....
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (self *Server) Stop() {
|
func (self *Server) Stop() {
|
||||||
if self.stopped {
|
if self.stopped {
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in New Issue