Merge pull request #109 from coreos/upstream

Update
pull/820/head
Ben Johnson 2013-08-12 21:02:44 -07:00
commit b67b95a296
2 changed files with 35 additions and 34 deletions

48
peer.go
View File

@ -89,14 +89,14 @@ func (p *Peer) startHeartbeat() {
}
// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat() {
func (p *Peer) stopHeartbeat(flush bool) {
// here is a problem
// the previous stop is no buffer leader may get blocked
// when heartbeat returns at line 132
// when heartbeat returns
// I make the channel with 1 buffer
// and try to panic here
select {
case p.stopChan <- true:
case p.stopChan <- flush:
default:
panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
@ -132,28 +132,40 @@ func (p *Peer) heartbeat(c chan bool) {
for {
select {
case <-stopChan:
debugln("peer.heartbeat.stop: ", p.Name())
return
case <-time.After(p.heartbeatTimeout):
debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
case flush := <-stopChan:
if !flush {
debugln("peer.heartbeat.stop: ", p.Name())
return
} else {
// before we can safely remove a node
// we must flush the remove command to the node first
p.flush()
debugln("peer.heartbeat.stop: ", p.Name())
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
case <-time.After(p.heartbeatTimeout):
p.flush()
}
}
}
func (p *Peer) flush() {
debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
}
//--------------------------------------
// Append Entries
//--------------------------------------

View File

@ -719,7 +719,7 @@ func (s *Server) leaderLoop() {
// Stop all peers.
for _, peer := range s.peers {
peer.stopHeartbeat()
peer.stopHeartbeat(false)
}
s.syncedPeer = nil
}
@ -1009,10 +1009,6 @@ func (s *Server) AddPeer(name string) error {
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
// Ignore removal of the server itself.
if s.name == name {
return nil
}
// Return error if peer doesn't exist.
peer := s.peers[name]
if peer == nil {
@ -1022,7 +1018,9 @@ func (s *Server) RemovePeer(name string) error {
// TODO: Flush entries to the peer first.
// Stop peer and remove it.
peer.stopHeartbeat()
if s.State() == Leader {
peer.stopHeartbeat(true)
}
delete(s.peers, name)
@ -1043,16 +1041,7 @@ func (s *Server) RemovePeer(name string) error {
// Log compaction
//--------------------------------------
// The background snapshot function
func (s *Server) Snapshot() {
for {
// TODO: change this... to something reasonable
time.Sleep(1 * time.Second)
s.takeSnapshot()
}
}
func (s *Server) takeSnapshot() error {
func (s *Server) TakeSnapshot() error {
//TODO put a snapshot mutex
s.debugln("take Snapshot")
if s.currentSnapshot != nil {