before remove the node, flush the remove command to the node first

pull/820/head
Xiang Li 2013-08-12 14:46:32 -07:00
parent 27906fddaa
commit 132dae3022
2 changed files with 33 additions and 23 deletions

46
peer.go
View File

@ -89,14 +89,14 @@ func (p *Peer) startHeartbeat() {
}
// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat() {
func (p *Peer) stopHeartbeat(flush bool) {
// here is a problem
// the previous stop is no buffer leader may get blocked
// when heartbeat returns at line 132
// I make the channel with 1 buffer
// and try to panic here
select {
case p.stopChan <- true:
case p.stopChan <- flush:
default:
panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
@ -132,28 +132,40 @@ func (p *Peer) heartbeat(c chan bool) {
for {
select {
case <-stopChan:
debugln("peer.heartbeat.stop: ", p.Name())
return
case <-time.After(p.heartbeatTimeout):
debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
case flush := <-stopChan:
if !flush {
debugln("peer.heartbeat.stop: ", p.Name())
return
} else {
// before we can safely remove a node
// we must flush the remove command to the node first
p.flushPeer()
debugln("peer.heartbeat.stop: ", p.Name())
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
case <-time.After(p.heartbeatTimeout):
p.flushPeer()
}
}
}
func (p *Peer) flushPeer() {
debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
}
//--------------------------------------
// Append Entries
//--------------------------------------

View File

@ -719,7 +719,7 @@ func (s *Server) leaderLoop() {
// Stop all peers.
for _, peer := range s.peers {
peer.stopHeartbeat()
peer.stopHeartbeat(false)
}
s.syncedPeer = nil
}
@ -1009,10 +1009,6 @@ func (s *Server) AddPeer(name string) error {
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
// Ignore removal of the server itself.
if s.name == name {
return nil
}
// Return error if peer doesn't exist.
peer := s.peers[name]
if peer == nil {
@ -1022,7 +1018,9 @@ func (s *Server) RemovePeer(name string) error {
// TODO: Flush entries to the peer first.
// Stop peer and remove it.
peer.stopHeartbeat()
if s.State() == Leader {
peer.stopHeartbeat(true)
}
delete(s.peers, name)