Add Peer.LastActivity().

pull/820/head
Ben Johnson 2014-02-28 09:20:00 -07:00
parent 6251f50349
commit c2dc64e2a6
1 changed files with 11 additions and 0 deletions

11
peer.go
View File

@ -20,6 +20,7 @@ type Peer struct {
mutex sync.RWMutex
stopChan chan bool
heartbeatInterval time.Duration
lastActivity time.Time
}
//------------------------------------------------------------------------------
@ -90,6 +91,11 @@ func (p *Peer) stopHeartbeat(flush bool) {
p.stopChan <- flush
}
// LastActivity returns the last time any response was received from the peer.
func (p *Peer) LastActivity() time.Time {
return p.lastActivity
}
//--------------------------------------
// Copying
//--------------------------------------
@ -103,6 +109,7 @@ func (p *Peer) clone() *Peer {
Name: p.Name,
ConnectionString: p.ConnectionString,
prevLogIndex: p.prevLogIndex,
lastActivity: p.lastActivity,
}
}
@ -176,6 +183,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
// If successful then update the previous log index.
p.mutex.Lock()
p.lastActivity = time.Now()
if resp.Success() {
if len(req.Entries) > 0 {
p.prevLogIndex = req.Entries[len(req.Entries)-1].GetIndex()
@ -243,6 +251,7 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
// If successful, the peer should have been to snapshot state
// Send it the snapshot!
p.lastActivity = time.Now()
if resp.Success {
p.sendSnapshotRecoveryRequest()
} else {
@ -263,6 +272,7 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
return
}
p.lastActivity = time.Now()
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {
@ -283,6 +293,7 @@ func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteRespo
req.peer = p
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
debugln("peer.vote.recv: ", p.server.Name(), "<-", p.Name)
p.lastActivity = time.Now()
resp.peer = p
c <- resp
} else {