commit
99f0d0aaf5
94
server.go
94
server.go
|
@ -538,24 +538,27 @@ func (s *Server) followerLoop() {
|
|||
case e := <-s.c:
|
||||
if e.target == &stopValue {
|
||||
s.setState(Stopped)
|
||||
} else if command, ok := e.target.(JoinCommand); ok {
|
||||
//If no log entries exist and a self-join command is issued
|
||||
//then immediately become leader and commit entry.
|
||||
if s.log.currentIndex() == 0 && command.NodeName() == s.Name() {
|
||||
s.debugln("selfjoin and promote to leader")
|
||||
s.setState(Leader)
|
||||
s.processCommand(command, e)
|
||||
} else {
|
||||
} else {
|
||||
switch req := e.target.(type) {
|
||||
case JoinCommand:
|
||||
//If no log entries exist and a self-join command is issued
|
||||
//then immediately become leader and commit entry.
|
||||
if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
|
||||
s.debugln("selfjoin and promote to leader")
|
||||
s.setState(Leader)
|
||||
s.processCommand(req, e)
|
||||
} else {
|
||||
err = NotLeaderError
|
||||
}
|
||||
case *AppendEntriesRequest:
|
||||
e.returnValue, update = s.processAppendEntriesRequest(req)
|
||||
case *RequestVoteRequest:
|
||||
e.returnValue, update = s.processRequestVoteRequest(req)
|
||||
case *SnapshotRequest:
|
||||
e.returnValue = s.processSnapshotRequest(req)
|
||||
default:
|
||||
err = NotLeaderError
|
||||
}
|
||||
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
|
||||
e.returnValue, update = s.processAppendEntriesRequest(req)
|
||||
} else if req, ok := e.target.(*RequestVoteRequest); ok {
|
||||
e.returnValue, update = s.processRequestVoteRequest(req)
|
||||
} else if req, ok := e.target.(*SnapshotRequest); ok {
|
||||
e.returnValue = s.processSnapshotRequest(req)
|
||||
} else {
|
||||
err = NotLeaderError
|
||||
}
|
||||
|
||||
// Callback to event.
|
||||
|
@ -635,14 +638,16 @@ func (s *Server) candidateLoop() {
|
|||
var err error
|
||||
if e.target == &stopValue {
|
||||
s.setState(Stopped)
|
||||
} else if _, ok := e.target.(Command); ok {
|
||||
err = NotLeaderError
|
||||
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
|
||||
e.returnValue, _ = s.processAppendEntriesRequest(req)
|
||||
} else if req, ok := e.target.(*RequestVoteRequest); ok {
|
||||
e.returnValue, _ = s.processRequestVoteRequest(req)
|
||||
} else {
|
||||
switch req := e.target.(type) {
|
||||
case Command:
|
||||
err = NotLeaderError
|
||||
case *AppendEntriesRequest:
|
||||
e.returnValue, _ = s.processAppendEntriesRequest(req)
|
||||
case *RequestVoteRequest:
|
||||
e.returnValue, _ = s.processRequestVoteRequest(req)
|
||||
}
|
||||
}
|
||||
|
||||
// Callback to event.
|
||||
e.c <- err
|
||||
|
||||
|
@ -666,7 +671,7 @@ func (s *Server) candidateLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
// The event loop that is run when the server is in a Candidate state.
|
||||
// The event loop that is run when the server is in a Leader state.
|
||||
func (s *Server) leaderLoop() {
|
||||
s.setState(Leader)
|
||||
s.syncedPeer = make(map[string]bool)
|
||||
|
@ -688,15 +693,18 @@ func (s *Server) leaderLoop() {
|
|||
case e := <-s.c:
|
||||
if e.target == &stopValue {
|
||||
s.setState(Stopped)
|
||||
} else if command, ok := e.target.(Command); ok {
|
||||
s.processCommand(command, e)
|
||||
continue
|
||||
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
|
||||
e.returnValue, _ = s.processAppendEntriesRequest(req)
|
||||
} else if resp, ok := e.target.(*AppendEntriesResponse); ok {
|
||||
s.processAppendEntriesResponse(resp)
|
||||
} else if req, ok := e.target.(*RequestVoteRequest); ok {
|
||||
e.returnValue, _ = s.processRequestVoteRequest(req)
|
||||
} else {
|
||||
switch req := e.target.(type) {
|
||||
case Command:
|
||||
s.processCommand(req, e)
|
||||
continue
|
||||
case *AppendEntriesRequest:
|
||||
e.returnValue, _ = s.processAppendEntriesRequest(req)
|
||||
case *AppendEntriesResponse:
|
||||
s.processAppendEntriesResponse(req)
|
||||
case *RequestVoteRequest:
|
||||
e.returnValue, _ = s.processRequestVoteRequest(req)
|
||||
}
|
||||
}
|
||||
|
||||
// Callback to event.
|
||||
|
@ -726,16 +734,18 @@ func (s *Server) snapshotLoop() {
|
|||
|
||||
if e.target == &stopValue {
|
||||
s.setState(Stopped)
|
||||
} else if _, ok := e.target.(Command); ok {
|
||||
err = NotLeaderError
|
||||
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
|
||||
e.returnValue, _ = s.processAppendEntriesRequest(req)
|
||||
} else if req, ok := e.target.(*RequestVoteRequest); ok {
|
||||
e.returnValue, _ = s.processRequestVoteRequest(req)
|
||||
} else if req, ok := e.target.(*SnapshotRecoveryRequest); ok {
|
||||
e.returnValue = s.processSnapshotRecoveryRequest(req)
|
||||
} else {
|
||||
switch req := e.target.(type) {
|
||||
case Command:
|
||||
err = NotLeaderError
|
||||
case *AppendEntriesRequest:
|
||||
e.returnValue, _ = s.processAppendEntriesRequest(req)
|
||||
case *RequestVoteRequest:
|
||||
e.returnValue, _ = s.processRequestVoteRequest(req)
|
||||
case *SnapshotRecoveryRequest:
|
||||
e.returnValue = s.processSnapshotRecoveryRequest(req)
|
||||
}
|
||||
}
|
||||
|
||||
// Callback to event.
|
||||
e.c <- err
|
||||
|
||||
|
|
Loading…
Reference in New Issue