2013-04-28 04:51:17 +00:00
|
|
|
package raft
|
|
|
|
|
2013-05-05 19:36:23 +00:00
|
|
|
import (
|
|
|
|
"sync"
|
2013-05-05 20:26:04 +00:00
|
|
|
"time"
|
2013-05-05 19:36:23 +00:00
|
|
|
)
|
|
|
|
|
2013-04-28 04:51:17 +00:00
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Typedefs
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// A peer is a reference to another server involved in the consensus protocol.
|
|
|
|
type Peer struct {
|
2013-07-07 20:55:55 +00:00
|
|
|
server *Server
|
|
|
|
name string
|
|
|
|
prevLogIndex uint64
|
2013-07-07 22:12:24 +00:00
|
|
|
mutex sync.RWMutex
|
2013-07-07 20:55:55 +00:00
|
|
|
stopChan chan bool
|
2013-07-07 20:21:04 +00:00
|
|
|
heartbeatTimeout time.Duration
|
2013-04-28 04:51:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Constructor
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// Creates a new peer.
|
2013-07-06 04:49:47 +00:00
|
|
|
func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
|
2013-07-07 20:21:04 +00:00
|
|
|
return &Peer{
|
2013-07-07 20:55:55 +00:00
|
|
|
server: server,
|
|
|
|
name: name,
|
|
|
|
stopChan: make(chan bool),
|
2013-07-07 20:21:04 +00:00
|
|
|
heartbeatTimeout: heartbeatTimeout,
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
2013-04-28 04:51:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Accessors
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// Retrieves the name of the peer.
|
|
|
|
func (p *Peer) Name() string {
|
|
|
|
return p.name
|
|
|
|
}
|
2013-05-05 19:36:23 +00:00
|
|
|
|
2013-05-05 20:26:04 +00:00
|
|
|
// Sets the heartbeat timeout.
|
2013-07-06 04:49:47 +00:00
|
|
|
func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
|
2013-07-07 20:21:04 +00:00
|
|
|
p.heartbeatTimeout = duration
|
2013-05-05 20:26:04 +00:00
|
|
|
}
|
|
|
|
|
2013-07-07 22:12:24 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// Prev log index
|
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Retrieves the previous log index.
|
|
|
|
func (p *Peer) getPrevLogIndex() uint64 {
|
|
|
|
p.mutex.RLock()
|
|
|
|
defer p.mutex.RUnlock()
|
|
|
|
return p.prevLogIndex
|
|
|
|
}
|
|
|
|
|
|
|
|
// Sets the previous log index.
|
|
|
|
func (p *Peer) setPrevLogIndex(value uint64) {
|
|
|
|
p.mutex.Lock()
|
|
|
|
defer p.mutex.Unlock()
|
|
|
|
p.prevLogIndex = value
|
|
|
|
}
|
|
|
|
|
2013-05-05 19:36:23 +00:00
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Methods
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
2013-05-05 21:41:55 +00:00
|
|
|
//--------------------------------------
|
2013-07-07 20:21:04 +00:00
|
|
|
// Heartbeat
|
2013-05-05 21:41:55 +00:00
|
|
|
//--------------------------------------
|
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
// Starts the peer heartbeat.
|
|
|
|
func (p *Peer) startHeartbeat() {
|
|
|
|
c := make(chan bool)
|
|
|
|
go p.heartbeat(c)
|
2013-07-07 20:55:55 +00:00
|
|
|
<-c
|
2013-07-07 20:21:04 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Stops the peer heartbeat.
|
|
|
|
func (p *Peer) stopHeartbeat() {
|
|
|
|
p.stopChan <- true
|
2013-05-05 21:41:55 +00:00
|
|
|
}
|
|
|
|
|
2013-06-26 18:25:22 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// Copying
|
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Clones the state of the peer. The clone is not attached to a server and
|
|
|
|
// the heartbeat timer will not exist.
|
|
|
|
func (p *Peer) clone() *Peer {
|
|
|
|
p.mutex.Lock()
|
|
|
|
defer p.mutex.Unlock()
|
|
|
|
return &Peer{
|
2013-06-27 00:12:44 +00:00
|
|
|
name: p.name,
|
2013-06-26 18:25:22 +00:00
|
|
|
prevLogIndex: p.prevLogIndex,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-05-05 21:41:55 +00:00
|
|
|
//--------------------------------------
|
2013-07-07 20:21:04 +00:00
|
|
|
// Heartbeat
|
2013-05-05 21:41:55 +00:00
|
|
|
//--------------------------------------
|
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
|
|
|
|
func (p *Peer) heartbeat(c chan bool) {
|
|
|
|
c <- true
|
2013-05-05 19:36:23 +00:00
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-p.stopChan:
|
|
|
|
return
|
2013-07-07 20:55:55 +00:00
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
case <-time.After(p.heartbeatTimeout):
|
2013-07-07 22:12:24 +00:00
|
|
|
if p.server.State() != Leader {
|
|
|
|
return
|
|
|
|
}
|
2013-07-07 20:21:04 +00:00
|
|
|
p.flush()
|
|
|
|
}
|
2013-07-01 15:46:53 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// Append Entries
|
|
|
|
//--------------------------------------
|
2013-06-05 05:56:59 +00:00
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
// Sends an AppendEntries RPC.
|
|
|
|
func (p *Peer) flush() {
|
2013-07-07 22:12:24 +00:00
|
|
|
prevLogIndex := p.getPrevLogIndex()
|
|
|
|
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex)
|
2013-07-07 20:21:04 +00:00
|
|
|
if entries != nil {
|
2013-07-07 22:12:24 +00:00
|
|
|
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, p.server.name, prevLogIndex, prevLogTerm, entries, p.server.log.CommitIndex()))
|
2013-06-03 21:58:12 +00:00
|
|
|
} else {
|
2013-07-07 20:21:04 +00:00
|
|
|
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
|
2013-06-03 21:58:12 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
// Sends an AppendEntries request to the peer through the transport.
|
|
|
|
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
|
2013-07-06 19:41:42 +00:00
|
|
|
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
|
2013-06-28 23:14:41 +00:00
|
|
|
|
2013-07-07 22:12:24 +00:00
|
|
|
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
|
2013-05-05 19:36:23 +00:00
|
|
|
if resp == nil {
|
2013-07-07 20:21:04 +00:00
|
|
|
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
|
|
|
|
return
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
2013-07-06 19:41:42 +00:00
|
|
|
traceln("peer.flush.recv: ", p.Name())
|
2013-05-05 20:26:04 +00:00
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
// If successful then update the previous log index.
|
2013-07-07 22:12:24 +00:00
|
|
|
p.mutex.Lock()
|
2013-05-05 19:36:23 +00:00
|
|
|
if resp.Success {
|
|
|
|
if len(req.Entries) > 0 {
|
2013-05-08 20:22:08 +00:00
|
|
|
p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
2013-07-07 20:21:04 +00:00
|
|
|
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
|
2013-07-03 16:45:57 +00:00
|
|
|
|
2013-07-07 20:55:55 +00:00
|
|
|
// If it was unsuccessful then decrement the previous log index and
|
|
|
|
// we'll try again next time.
|
2013-07-07 20:21:04 +00:00
|
|
|
} else {
|
2013-07-03 16:45:57 +00:00
|
|
|
// we may miss a response from peer
|
2013-07-05 23:45:55 +00:00
|
|
|
if resp.CommitIndex >= p.prevLogIndex {
|
2013-07-03 16:45:57 +00:00
|
|
|
p.prevLogIndex = resp.CommitIndex
|
2013-07-07 20:21:04 +00:00
|
|
|
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
|
2013-07-03 16:45:57 +00:00
|
|
|
} else if p.prevLogIndex > 0 {
|
|
|
|
// Decrement the previous log index down until we find a match. Don't
|
|
|
|
// let it go below where the peer's commit index is though. That's a
|
|
|
|
// problem.
|
|
|
|
p.prevLogIndex--
|
2013-07-07 20:21:04 +00:00
|
|
|
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
2013-07-07 20:21:04 +00:00
|
|
|
}
|
2013-07-07 22:12:24 +00:00
|
|
|
p.mutex.Unlock()
|
2013-05-05 19:36:23 +00:00
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
// Send response to server for processing.
|
|
|
|
p.server.send(resp)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Sends an Snapshot request to the peer through the transport.
|
|
|
|
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
|
|
|
|
debugln("peer.snap.send: ", p.name)
|
|
|
|
|
2013-07-07 22:12:24 +00:00
|
|
|
resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
|
2013-07-07 20:21:04 +00:00
|
|
|
if resp == nil {
|
|
|
|
debugln("peer.snap.timeout: ", p.name)
|
|
|
|
return
|
2013-07-03 16:45:57 +00:00
|
|
|
}
|
2013-07-07 20:21:04 +00:00
|
|
|
|
|
|
|
debugln("peer.snap.recv: ", p.name)
|
|
|
|
|
|
|
|
// If successful then update the previous log index.
|
|
|
|
if resp.Success {
|
2013-07-07 22:12:24 +00:00
|
|
|
p.setPrevLogIndex(req.LastIndex)
|
2013-07-07 20:21:04 +00:00
|
|
|
} else {
|
|
|
|
debugln("peer.snap.failed: ", p.name)
|
|
|
|
}
|
2013-07-07 20:55:55 +00:00
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
// Send response to server for processing.
|
|
|
|
p.server.send(resp)
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
2013-05-05 21:41:55 +00:00
|
|
|
|
|
|
|
//--------------------------------------
|
2013-07-07 20:21:04 +00:00
|
|
|
// Vote Requests
|
2013-05-05 21:41:55 +00:00
|
|
|
//--------------------------------------
|
|
|
|
|
2013-07-07 20:21:04 +00:00
|
|
|
// send VoteRequest Request
|
|
|
|
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
|
|
|
|
debugln("peer.vote: ", p.server.Name(), "->", p.Name())
|
|
|
|
req.peer = p
|
2013-07-07 22:12:24 +00:00
|
|
|
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
|
2013-07-07 20:21:04 +00:00
|
|
|
resp.peer = p
|
|
|
|
c <- resp
|
2013-05-05 21:41:55 +00:00
|
|
|
}
|
|
|
|
}
|