diff --git a/peer.go b/peer.go index e7761dd974..90e67a48a9 100644 --- a/peer.go +++ b/peer.go @@ -89,14 +89,14 @@ func (p *Peer) startHeartbeat() { } // Stops the peer heartbeat. -func (p *Peer) stopHeartbeat() { +func (p *Peer) stopHeartbeat(flush bool) { // here is a problem // the previous stop is no buffer leader may get blocked - // when heartbeat returns at line 132 + // when heartbeat returns // I make the channel with 1 buffer // and try to panic here select { - case p.stopChan <- true: + case p.stopChan <- flush: default: panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat") @@ -132,28 +132,40 @@ func (p *Peer) heartbeat(c chan bool) { for { select { - case <-stopChan: - debugln("peer.heartbeat.stop: ", p.Name()) - return - - case <-time.After(p.heartbeatTimeout): - debugln("peer.heartbeat.run: ", p.Name()) - prevLogIndex := p.getPrevLogIndex() - entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest) - - if p.server.State() != Leader { + case flush := <-stopChan: + if !flush { + debugln("peer.heartbeat.stop: ", p.Name()) + return + } else { + // before we can safely remove a node + // we must flush the remove command to the node first + p.flush() + debugln("peer.heartbeat.stop: ", p.Name()) return } - if entries != nil { - p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries)) - } else { - p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot)) - } + case <-time.After(p.heartbeatTimeout): + p.flush() } } } +func (p *Peer) flush() { + debugln("peer.heartbeat.run: ", p.Name()) + prevLogIndex := p.getPrevLogIndex() + entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest) + + if p.server.State() != Leader { + return + } + + if entries != nil { + p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries)) + } else { + p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot)) + } +} + //-------------------------------------- // Append Entries //-------------------------------------- diff --git a/server.go b/server.go index 154fbebf92..4048c7bacb 100644 --- a/server.go +++ b/server.go @@ -719,7 +719,7 @@ func (s *Server) leaderLoop() { // Stop all peers. for _, peer := range s.peers { - peer.stopHeartbeat() + peer.stopHeartbeat(false) } s.syncedPeer = nil } @@ -1009,10 +1009,6 @@ func (s *Server) AddPeer(name string) error { func (s *Server) RemovePeer(name string) error { s.debugln("server.peer.remove: ", name, len(s.peers)) - // Ignore removal of the server itself. - if s.name == name { - return nil - } // Return error if peer doesn't exist. peer := s.peers[name] if peer == nil { @@ -1022,7 +1018,9 @@ func (s *Server) RemovePeer(name string) error { // TODO: Flush entries to the peer first. // Stop peer and remove it. - peer.stopHeartbeat() + if s.State() == Leader { + peer.stopHeartbeat(true) + } delete(s.peers, name) @@ -1043,16 +1041,7 @@ func (s *Server) RemovePeer(name string) error { // Log compaction //-------------------------------------- -// The background snapshot function -func (s *Server) Snapshot() { - for { - // TODO: change this... to something reasonable - time.Sleep(1 * time.Second) - s.takeSnapshot() - } -} - -func (s *Server) takeSnapshot() error { +func (s *Server) TakeSnapshot() error { //TODO put a snapshot mutex s.debugln("take Snapshot") if s.currentSnapshot != nil {