leader must stop all heartbeat routines before stepdown and truncate its log

pull/820/head
Xiang Li 2014-01-27 09:05:54 -05:00
parent 2987111400
commit bdeac89463
2 changed files with 15 additions and 12 deletions

12
peer.go
View File

@ -146,14 +146,12 @@ func (p *Peer) heartbeat(c chan bool) {
func (p *Peer) flush() {
debugln("peer.heartbeat.flush: ", p.Name)
prevLogIndex := p.getPrevLogIndex()
term := p.server.currentTerm
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
@ -192,13 +190,13 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
} else {
if resp.Term() > p.server.Term() || resp.CommitIndex() > p.server.CommitIndex() {
if resp.Term() > p.server.Term() {
// this happens when there is a new leader comes up that this *leader* has not
// known yet.
// this server can know until the new leader send a ae with higher term
// or this server finish processing this response.
debugln("peer.append.resp.not.update: new.leader.found")
} else if resp.CommitIndex() >= p.prevLogIndex {
} else if resp.Term() == req.Term && resp.CommitIndex() >= p.prevLogIndex {
// we may miss a response from peer
// so maybe the peer has committed the logs we just sent
// but we did not receive the successful reply and did not increase

View File

@ -495,6 +495,12 @@ func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
prevLeader := s.leader
if term > s.currentTerm {
// stop heartbeats before step-down
if s.state == Leader {
for _, peer := range s.peers {
peer.stopHeartbeat(false)
}
}
// update the term and clear vote for
s.state = Follower
s.currentTerm = term
@ -771,6 +777,10 @@ func (s *server) leaderLoop() {
select {
case e := <-s.c:
if e.target == &stopValue {
// Stop all peers before stop
for _, peer := range s.peers {
peer.stopHeartbeat(false)
}
s.setState(Stopped)
} else {
switch req := e.target.(type) {
@ -796,11 +806,6 @@ func (s *server) leaderLoop() {
}
}
// Stop all peers.
for _, peer := range s.peers {
peer.stopHeartbeat(false)
}
s.syncedPeer = nil
}