influxdb/peer.go

286 lines
7.0 KiB
Go
Raw Normal View History

2013-04-28 04:51:17 +00:00
package raft
2013-05-05 19:36:23 +00:00
import (
"errors"
"fmt"
2013-05-05 19:36:23 +00:00
"sync"
2013-05-05 20:26:04 +00:00
"time"
2013-05-05 19:36:23 +00:00
)
2013-04-28 04:51:17 +00:00
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
2013-05-05 19:36:23 +00:00
server *Server
2013-04-28 04:51:17 +00:00
name string
2013-05-05 19:36:23 +00:00
prevLogIndex uint64
mutex sync.Mutex
2013-05-05 20:26:04 +00:00
heartbeatTimer *Timer
}
type FlushResponse struct {
2013-06-24 16:52:51 +00:00
term uint64
success bool
2013-06-24 16:52:51 +00:00
err error
peer *Peer
2013-04-28 04:51:17 +00:00
}
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------
// Creates a new peer.
2013-05-05 20:26:04 +00:00
func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
p := &Peer{
2013-05-05 20:26:04 +00:00
server: server,
name: name,
heartbeatTimer: NewTimer(heartbeatTimeout, heartbeatTimeout),
2013-05-05 19:36:23 +00:00
}
2013-05-10 14:47:24 +00:00
return p
2013-04-28 04:51:17 +00:00
}
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------
// Name returns the name this peer was created with.
func (p *Peer) Name() string {
	name := p.name
	return name
}
2013-05-05 19:36:23 +00:00
2013-05-05 20:26:04 +00:00
// HeartbeatTimeout reports the heartbeat interval, taken from the minimum
// duration of the peer's heartbeat timer.
func (p *Peer) HeartbeatTimeout() time.Duration {
	timer := p.heartbeatTimer
	return timer.MinDuration()
}
// SetHeartbeatTimeout changes the heartbeat interval by updating the
// duration of the peer's heartbeat timer.
func (p *Peer) SetHeartbeatTimeout(duration time.Duration) {
	timer := p.heartbeatTimer
	timer.SetDuration(duration)
}
func (p *Peer) StartHeartbeat() {
go p.heartbeat()
2013-06-12 16:47:48 +00:00
}
2013-05-05 19:36:23 +00:00
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
//--------------------------------------
// State
//--------------------------------------
// stop shuts the peer down by stopping its heartbeat timer while holding the
// peer's mutex.
// NOTE(review): heartbeat() returns once the timer's Start reports false;
// presumably Stop is what causes that — confirm against Timer.
func (p *Peer) stop() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	timer := p.heartbeatTimer
	timer.Stop()
}
2013-06-26 18:25:22 +00:00
//--------------------------------------
// Copying
//--------------------------------------
// Clones the state of the peer. The clone is not attached to a server and
// the heartbeat timer will not exist.
func (p *Peer) clone() *Peer {
p.mutex.Lock()
defer p.mutex.Unlock()
return &Peer{
2013-06-27 00:12:44 +00:00
name: p.name,
2013-06-26 18:25:22 +00:00
prevLogIndex: p.prevLogIndex,
}
}
//--------------------------------------
// Flush
//--------------------------------------
2013-06-24 16:52:51 +00:00
// Sends an AppendEntries RPC but does not obtain a lock
// on the server.
func (p *Peer) flush() (uint64, bool, error) {
2013-06-25 21:41:42 +00:00
// We need to hold the log lock to create AppendEntriesRequest
2013-06-06 03:25:17 +00:00
// avoid snapshot to delete the desired entries before AEQ()
2013-06-25 21:41:42 +00:00
req := p.server.createAppendEntriesRequest(p.prevLogIndex)
2013-06-24 16:52:51 +00:00
2013-06-25 21:41:42 +00:00
if req != nil {
return p.sendFlushRequest(req)
2013-06-06 03:25:17 +00:00
} else {
2013-06-25 21:41:42 +00:00
req := p.server.createSnapshotRequest()
2013-06-05 00:02:45 +00:00
return p.sendSnapshotRequest(req)
2013-06-03 21:58:12 +00:00
}
2013-06-08 02:19:18 +00:00
2013-05-05 19:36:23 +00:00
}
2013-07-01 15:46:53 +00:00
// send VoteRequest Request
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
2013-07-01 15:46:53 +00:00
req.peer = p
debugln(p.server.Name(), "Send Vote Request to ", p.Name())
if resp, _ := p.server.transporter.SendVoteRequest(p.server, p, req); resp != nil {
resp.peer = p
c <- resp
}
}
2013-06-05 05:56:59 +00:00
// send Snapshot Request
2013-06-08 02:19:18 +00:00
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error) {
2013-06-03 21:58:12 +00:00
// Ignore any null requests.
if req == nil {
return 0, false, errors.New("raft.Peer: Request required")
}
// Generate an snapshot request based on the state of the server and
// log. Send the request through the user-provided handler and process the
// result.
resp, err := p.server.transporter.SendSnapshotRequest(p.server, p, req)
2013-06-03 21:58:12 +00:00
if resp == nil {
return 0, false, err
}
// If successful then update the previous log index. If it was
// unsuccessful then decrement the previous log index and we'll try again
// next time.
if resp.Success {
2013-06-05 00:02:45 +00:00
p.prevLogIndex = req.LastIndex
2013-06-05 05:56:59 +00:00
2013-06-03 21:58:12 +00:00
} else {
panic(resp)
}
2013-06-08 02:19:18 +00:00
return resp.Term, resp.Success, err
2013-06-03 21:58:12 +00:00
}
2013-05-28 19:57:38 +00:00
// Flushes a request through the server's transport.
func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) {
// Ignore any null requests.
if req == nil {
return 0, false, errors.New("raft.Peer: Request required")
2013-05-05 19:36:23 +00:00
}
2013-06-05 05:56:59 +00:00
2013-05-05 19:36:23 +00:00
// Generate an AppendEntries request based on the state of the server and
// log. Send the request through the user-provided handler and process the
// result.
2013-06-25 20:11:48 +00:00
//debugln("flush to ", p.Name())
2013-06-26 18:10:58 +00:00
debugln("[HeartBeat] Leader ", p.server.Name(), " to ", p.Name(), " ", len(req.Entries), " ", time.Now())
respChan := make(chan *AppendEntriesResponse, 2)
go func() {
tranResp, _ := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)
respChan <- tranResp
}()
var resp *AppendEntriesResponse
select {
// how to decide?
case <-time.After(p.server.heartbeatTimeout * 2):
resp = nil
2013-06-05 05:56:59 +00:00
case resp = <-respChan:
}
2013-05-05 19:36:23 +00:00
if resp == nil {
debugln("receive flush timeout from ", p.Name())
return 0, false, fmt.Errorf("AppendEntries timeout: %s", p.Name())
2013-05-05 19:36:23 +00:00
}
debugln("receive flush response from ", p.Name())
2013-05-05 20:26:04 +00:00
2013-05-05 19:36:23 +00:00
// If successful then update the previous log index. If it was
// unsuccessful then decrement the previous log index and we'll try again
// next time.
if resp.Success {
if len(req.Entries) > 0 {
2013-05-08 20:22:08 +00:00
p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
2013-05-05 19:36:23 +00:00
}
debugln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
2013-05-05 19:36:23 +00:00
} else {
if resp.Term > p.server.currentTerm {
return resp.Term, false, errors.New("Step down")
}
// we may miss a response from peer
if resp.CommitIndex >= p.prevLogIndex {
debugln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
p.prevLogIndex = resp.CommitIndex
} else if p.prevLogIndex > 0 {
debugln("Peer ", p.Name(), "'s' step back to ", p.prevLogIndex)
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
// problem.
p.prevLogIndex--
2013-05-05 19:36:23 +00:00
}
}
return resp.Term, resp.Success, nil
2013-05-05 19:36:23 +00:00
}
//--------------------------------------
// Heartbeat
//--------------------------------------
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeat() {
for {
2013-06-08 02:19:18 +00:00
// (1) timeout/fire happens, flush the peer
// (2) stopped, return
2013-06-24 16:52:51 +00:00
if p.heartbeatTimer.Start() {
2013-06-24 16:52:51 +00:00
var f FlushResponse
f.peer = p
2013-06-24 16:52:51 +00:00
f.term, f.success, f.err = p.flush()
2013-06-24 16:52:51 +00:00
// if the peer successfully appended the log entry
// we will tell the commit center
if f.success {
if p.prevLogIndex > p.server.log.CommitIndex() {
2013-06-25 20:11:48 +00:00
debugln("[Heartbeat] Peer", p.Name(), "send to commit center")
2013-06-11 22:30:13 +00:00
p.server.response <- f
2013-06-25 20:11:48 +00:00
debugln("[Heartbeat] Peer", p.Name(), "back from commit center")
}
2013-06-24 16:52:51 +00:00
} else {
// shutdown the heartbeat
if f.term > p.server.currentTerm {
p.server.stateMutex.Lock()
if p.server.state == Leader {
p.server.state = Follower
select {
case p.server.stepDown <- f.term:
p.server.currentTerm = f.term
default:
panic("heartbeat cannot step down")
}
}
p.server.stateMutex.Unlock()
return
}
}
} else {
// shutdown
return
}
}
}