From 132dae30226adc1b5b056f86ce81bd6891cfa5a8 Mon Sep 17 00:00:00 2001
From: Xiang Li <xiang.li@coreos.com>
Date: Mon, 12 Aug 2013 14:46:32 -0700
Subject: [PATCH] before remove the node, flush the remove command to the node
 first

---
 peer.go   | 46 +++++++++++++++++++++++++++++-----------------
 server.go | 10 ++++------
 2 files changed, 33 insertions(+), 23 deletions(-)

diff --git a/peer.go b/peer.go
index e7761dd974..9e7e1a239f 100644
--- a/peer.go
+++ b/peer.go
@@ -89,14 +89,14 @@ func (p *Peer) startHeartbeat() {
 }
 
 // Stops the peer heartbeat.
-func (p *Peer) stopHeartbeat() {
+func (p *Peer) stopHeartbeat(flush bool) {
 	// here is a problem
 	// the previous stop is no buffer leader may get blocked
 	// when heartbeat returns at line 132
 	// I make the channel with 1 buffer
 	// and try to panic here
 	select {
-	case p.stopChan <- true:
+	case p.stopChan <- flush:
 
 	default:
 		panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
@@ -132,28 +132,40 @@ func (p *Peer) heartbeat(c chan bool) {
 
 	for {
 		select {
-		case <-stopChan:
-			debugln("peer.heartbeat.stop: ", p.Name())
-			return
-
-		case <-time.After(p.heartbeatTimeout):
-			debugln("peer.heartbeat.run: ", p.Name())
-			prevLogIndex := p.getPrevLogIndex()
-			entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
-
-			if p.server.State() != Leader {
+		case flush := <-stopChan:
+			if !flush {
+				debugln("peer.heartbeat.stop: ", p.Name())
+				return
+			} else {
+				// before we can safely remove a node
+				// we must flush the remove command to the node first
+				p.flushPeer()
+				debugln("peer.heartbeat.stop: ", p.Name())
 				return
 			}
 
-			if entries != nil {
-				p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
-			} else {
-				p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
-			}
+		case <-time.After(p.heartbeatTimeout):
+			p.flushPeer()
 		}
 	}
 }
 
+func (p *Peer) flushPeer() {
+	debugln("peer.heartbeat.run: ", p.Name())
+	prevLogIndex := p.getPrevLogIndex()
+	entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
+
+	if p.server.State() != Leader {
+		return
+	}
+
+	if entries != nil {
+		p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
+	} else {
+		p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
+	}
+}
+
 //--------------------------------------
 // Append Entries
 //--------------------------------------
diff --git a/server.go b/server.go
index aac7c683ea..4048c7bacb 100644
--- a/server.go
+++ b/server.go
@@ -719,7 +719,7 @@ func (s *Server) leaderLoop() {
 
 	// Stop all peers.
 	for _, peer := range s.peers {
-		peer.stopHeartbeat()
+		peer.stopHeartbeat(false)
 	}
 	s.syncedPeer = nil
 }
@@ -1009,10 +1009,6 @@ func (s *Server) AddPeer(name string) error {
 func (s *Server) RemovePeer(name string) error {
 	s.debugln("server.peer.remove: ", name, len(s.peers))
 
-	// Ignore removal of the server itself.
-	if s.name == name {
-		return nil
-	}
 	// Return error if peer doesn't exist.
 	peer := s.peers[name]
 	if peer == nil {
@@ -1022,7 +1018,9 @@ func (s *Server) RemovePeer(name string) error {
 	// TODO: Flush entries to the peer first.
 
 	// Stop peer and remove it.
-	peer.stopHeartbeat()
+	if s.State() == Leader {
+		peer.stopHeartbeat(true)
+	}
 
 	delete(s.peers, name)