2013-04-28 04:51:17 +00:00
|
|
|
package raft
|
|
|
|
|
2013-05-05 19:36:23 +00:00
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"sync"
|
2013-05-05 20:26:04 +00:00
|
|
|
"time"
|
2013-05-05 19:36:23 +00:00
|
|
|
)
|
|
|
|
|
2013-04-28 04:51:17 +00:00
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Typedefs
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// A peer is a reference to another server involved in the consensus protocol.
|
|
|
|
type Peer struct {
|
2013-05-05 19:36:23 +00:00
|
|
|
server *Server
|
2013-04-28 04:51:17 +00:00
|
|
|
name string
|
2013-05-05 19:36:23 +00:00
|
|
|
prevLogIndex uint64
|
|
|
|
mutex sync.Mutex
|
2013-05-05 20:26:04 +00:00
|
|
|
heartbeatTimer *Timer
|
2013-04-28 04:51:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Constructor
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// Creates a new peer.
|
2013-05-05 20:26:04 +00:00
|
|
|
func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
|
2013-05-05 21:41:55 +00:00
|
|
|
p := &Peer{
|
2013-05-05 20:26:04 +00:00
|
|
|
server: server,
|
|
|
|
name: name,
|
|
|
|
heartbeatTimer: NewTimer(heartbeatTimeout, heartbeatTimeout),
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
2013-05-10 14:47:24 +00:00
|
|
|
|
2013-05-05 21:41:55 +00:00
|
|
|
// Start the heartbeat timeout.
|
|
|
|
go p.heartbeatTimeoutFunc()
|
|
|
|
|
|
|
|
return p
|
2013-04-28 04:51:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Accessors
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// Retrieves the name of the peer.
|
|
|
|
func (p *Peer) Name() string {
|
|
|
|
return p.name
|
|
|
|
}
|
2013-05-05 19:36:23 +00:00
|
|
|
|
2013-05-05 20:26:04 +00:00
|
|
|
// Retrieves the heartbeat timeout.
|
|
|
|
func (p *Peer) HeartbeatTimeout() time.Duration {
|
|
|
|
return p.heartbeatTimer.MinDuration()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Sets the heartbeat timeout.
|
|
|
|
func (p *Peer) SetHeartbeatTimeout(duration time.Duration) {
|
|
|
|
p.heartbeatTimer.SetDuration(duration)
|
|
|
|
}
|
|
|
|
|
2013-05-05 19:36:23 +00:00
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Methods
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
2013-05-05 21:41:55 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// State
|
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Resumes the peer heartbeating.
|
|
|
|
func (p *Peer) resume() {
|
|
|
|
p.mutex.Lock()
|
|
|
|
defer p.mutex.Unlock()
|
|
|
|
p.heartbeatTimer.Reset()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Pauses the peer to prevent heartbeating.
|
|
|
|
func (p *Peer) pause() {
|
|
|
|
p.mutex.Lock()
|
|
|
|
defer p.mutex.Unlock()
|
|
|
|
p.heartbeatTimer.Pause()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Stops the peer entirely.
|
|
|
|
func (p *Peer) stop() {
|
|
|
|
p.mutex.Lock()
|
|
|
|
defer p.mutex.Unlock()
|
|
|
|
p.heartbeatTimer.Stop()
|
|
|
|
}
|
|
|
|
|
|
|
|
//--------------------------------------
|
|
|
|
// Flush
|
|
|
|
//--------------------------------------
|
|
|
|
|
2013-05-05 19:36:23 +00:00
|
|
|
// Sends an AppendEntries RPC but does not obtain a lock on the server. This
|
|
|
|
// method should only be called from the server.
|
|
|
|
func (p *Peer) internalFlush() (uint64, bool, error) {
|
|
|
|
p.mutex.Lock()
|
|
|
|
defer p.mutex.Unlock()
|
|
|
|
req, handler := p.server.createInternalAppendEntriesRequest(p.prevLogIndex)
|
|
|
|
return p.sendFlushRequest(req, handler)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Flushes a request through a handler.
|
|
|
|
func (p *Peer) sendFlushRequest(req *AppendEntriesRequest, handler func(*Server, *Peer, *AppendEntriesRequest) (*AppendEntriesResponse, error)) (uint64, bool, error) {
|
|
|
|
// Ignore any null requests/handlers.
|
|
|
|
if req == nil || handler == nil {
|
|
|
|
return 0, false, errors.New("raft.Peer: Request or handler required")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Generate an AppendEntries request based on the state of the server and
|
|
|
|
// log. Send the request through the user-provided handler and process the
|
|
|
|
// result.
|
|
|
|
resp, err := handler(p.server, p, req)
|
2013-05-05 21:41:55 +00:00
|
|
|
p.heartbeatTimer.Reset()
|
2013-05-05 19:36:23 +00:00
|
|
|
if resp == nil {
|
|
|
|
return 0, false, err
|
|
|
|
}
|
2013-05-05 20:26:04 +00:00
|
|
|
|
2013-05-05 19:36:23 +00:00
|
|
|
// If successful then update the previous log index. If it was
|
|
|
|
// unsuccessful then decrement the previous log index and we'll try again
|
|
|
|
// next time.
|
|
|
|
if resp.Success {
|
|
|
|
if len(req.Entries) > 0 {
|
2013-05-08 20:22:08 +00:00
|
|
|
p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
|
|
|
} else {
|
2013-05-10 14:47:24 +00:00
|
|
|
// Decrement the previous log index down until we find a match. Don't
|
|
|
|
// let it go below where the peer's commit index is though. That's a
|
|
|
|
// problem.
|
2013-05-05 19:36:23 +00:00
|
|
|
if p.prevLogIndex > 0 {
|
|
|
|
p.prevLogIndex--
|
2013-05-11 03:54:25 +00:00
|
|
|
}
|
|
|
|
if resp.CommitIndex > p.prevLogIndex {
|
|
|
|
p.prevLogIndex = resp.CommitIndex
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return resp.Term, resp.Success, err
|
|
|
|
}
|
2013-05-05 21:41:55 +00:00
|
|
|
|
|
|
|
//--------------------------------------
|
|
|
|
// Heartbeat
|
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
|
|
|
|
func (p *Peer) heartbeatTimeoutFunc() {
|
|
|
|
for {
|
|
|
|
// Grab the current timer channel.
|
|
|
|
p.mutex.Lock()
|
|
|
|
var c chan time.Time
|
|
|
|
if p.heartbeatTimer != nil {
|
|
|
|
c = p.heartbeatTimer.C()
|
|
|
|
}
|
|
|
|
p.mutex.Unlock()
|
|
|
|
|
|
|
|
// If the channel or timer are gone then exit.
|
|
|
|
if c == nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
// Flush the peer when we get a heartbeat timeout. If the channel is
|
|
|
|
// closed then the peer is getting cleaned up and we should exit.
|
2013-05-10 14:47:24 +00:00
|
|
|
if _, ok := <-c; ok {
|
2013-05-28 18:46:27 +00:00
|
|
|
// Retrieve the peer data within a lock that is separate from the
|
|
|
|
// server lock when creating the request. Otherwise a deadlock can
|
|
|
|
// occur.
|
|
|
|
p.mutex.Lock()
|
|
|
|
server, prevLogIndex := p.server, p.prevLogIndex
|
|
|
|
p.mutex.Unlock()
|
|
|
|
|
|
|
|
// Lock the server to create a request.
|
|
|
|
req, handler := server.createAppendEntriesRequest(prevLogIndex)
|
|
|
|
|
|
|
|
p.mutex.Lock()
|
|
|
|
p.sendFlushRequest(req, handler)
|
|
|
|
p.mutex.Unlock()
|
2013-05-05 21:41:55 +00:00
|
|
|
} else {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|