influxdb/peer.go

272 lines
7.4 KiB
Go
Raw Normal View History

2013-04-28 04:51:17 +00:00
package raft
2013-05-05 19:36:23 +00:00
import (
"sync"
2013-05-05 20:26:04 +00:00
"time"
2013-05-05 19:36:23 +00:00
)
2013-04-28 04:51:17 +00:00
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
	server           *Server       // local server that owns this peer reference
	name             string        // name identifying the remote server
	prevLogIndex     uint64        // index of the last log entry known to be replicated to this peer
	mutex            sync.RWMutex  // guards prevLogIndex (heartbeat goroutine vs. RPC response handling)
	stopChan         chan bool     // signals the heartbeat goroutine to exit; created with buffer 1 in startHeartbeat
	heartbeatTimeout time.Duration // interval between AppendEntries heartbeats
}
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------
// Creates a new peer.
2013-07-06 04:49:47 +00:00
func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
2013-07-07 20:21:04 +00:00
return &Peer{
2013-07-07 20:55:55 +00:00
server: server,
name: name,
2013-07-07 20:21:04 +00:00
heartbeatTimeout: heartbeatTimeout,
2013-05-05 19:36:23 +00:00
}
2013-04-28 04:51:17 +00:00
}
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------
// Name returns the name of the remote server this peer refers to.
func (p *Peer) Name() string {
	name := p.name
	return name
}
2013-05-05 19:36:23 +00:00
2013-05-05 20:26:04 +00:00
// Sets the heartbeat timeout.
2013-07-06 04:49:47 +00:00
func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
2013-07-07 20:21:04 +00:00
p.heartbeatTimeout = duration
2013-05-05 20:26:04 +00:00
}
2013-07-07 22:12:24 +00:00
//--------------------------------------
// Prev log index
//--------------------------------------
// getPrevLogIndex returns the previous log index under a read lock.
func (p *Peer) getPrevLogIndex() uint64 {
	p.mutex.RLock()
	index := p.prevLogIndex
	p.mutex.RUnlock()
	return index
}
// setPrevLogIndex stores a new previous log index under the write lock.
func (p *Peer) setPrevLogIndex(value uint64) {
	p.mutex.Lock()
	p.prevLogIndex = value
	p.mutex.Unlock()
}
2013-05-05 19:36:23 +00:00
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
//--------------------------------------
2013-07-07 20:21:04 +00:00
// Heartbeat
//--------------------------------------
2013-07-07 20:21:04 +00:00
// Starts the peer heartbeat.
func (p *Peer) startHeartbeat() {
p.stopChan = make(chan bool, 1)
2013-07-07 20:21:04 +00:00
c := make(chan bool)
go p.heartbeat(c)
2013-07-07 20:55:55 +00:00
<-c
2013-07-07 20:21:04 +00:00
}
// Stops the peer heartbeat.
//
// The stop channel is created with a buffer of one (see startHeartbeat), so
// this send normally succeeds even while the heartbeat goroutine is busy.
// An earlier unbuffered design could block the leader here; with the buffer,
// a send that still cannot proceed means a previous stop signal was never
// consumed, and we panic loudly instead of blocking.
//
// NOTE(review): the heartbeat goroutine can also return on its own when
// leadership is lost, leaving nobody to drain the channel — confirm the
// panic path cannot fire in that sequence.
func (p *Peer) stopHeartbeat() {
	select {
	case p.stopChan <- true:

	default:
		panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
	}
}
2013-06-26 18:25:22 +00:00
//--------------------------------------
// Copying
//--------------------------------------
// Clones the state of the peer. The clone is not attached to a server and
// the heartbeat timer will not exist.
func (p *Peer) clone() *Peer {
p.mutex.Lock()
defer p.mutex.Unlock()
return &Peer{
2013-06-27 00:12:44 +00:00
name: p.name,
2013-06-26 18:25:22 +00:00
prevLogIndex: p.prevLogIndex,
}
}
//--------------------------------------
2013-07-07 20:21:04 +00:00
// Heartbeat
//--------------------------------------
2013-07-07 20:21:04 +00:00
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
// Runs in its own goroutine; signals on c once started and loops until a
// stop signal arrives or leadership is lost.
func (p *Peer) heartbeat(c chan bool) {
	// Capture the stop channel locally: startHeartbeat replaces p.stopChan on
	// each (re)start, and this goroutine must keep listening on its own one.
	stopChan := p.stopChan

	// Unblock startHeartbeat now that the goroutine is running.
	c <- true

	debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)

	for {
		select {
		case <-stopChan:
			debugln("peer.heartbeat.stop: ", p.Name())
			return

		// NOTE(review): p.heartbeatTimeout is read here without the mutex
		// while setHeartbeatTimeout may write it concurrently — possible
		// data race; confirm callers' threading.
		case <-time.After(p.heartbeatTimeout):
			debugln("peer.heartbeat.run: ", p.Name())
			prevLogIndex := p.getPrevLogIndex()
			entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)

			// Stop heartbeating as soon as leadership is lost.
			// NOTE(review): returning here without draining stopChan means a
			// later stopHeartbeat() would hit its panic path — confirm intended.
			if p.server.State() != Leader {
				return
			}

			if entries != nil {
				p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
			} else {
				// No entries available after prevLogIndex — presumably the
				// peer has fallen behind the start of the log and needs a
				// snapshot instead; verify getEntriesAfter's nil contract.
				p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
			}
		}
	}
}
2013-07-07 20:21:04 +00:00
//--------------------------------------
// Append Entries
//--------------------------------------
2013-06-05 05:56:59 +00:00
2013-07-07 20:21:04 +00:00
// Sends an AppendEntries request to the peer through the transport, then
// adjusts prevLogIndex from the response and forwards the response to the
// server for processing.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
	traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))

	resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
	// A nil response indicates the transport timed out or failed.
	if resp == nil {
		debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
		return
	}

	traceln("peer.flush.recv: ", p.Name())

	// If successful then update the previous log index.
	p.mutex.Lock()
	if resp.Success {
		if len(req.Entries) > 0 {
			// The peer now holds everything up to the last entry we sent.
			p.prevLogIndex = req.Entries[len(req.Entries)-1].Index

			// if peer append a log entry from the current term
			// we set append to true
			if req.Entries[len(req.Entries)-1].Term == p.server.currentTerm {
				resp.append = true
			}
		}
		traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)

		// If it was unsuccessful then decrement the previous log index and
		// we'll try again next time.
	} else {
		if resp.CommitIndex >= p.prevLogIndex {
			// we may miss a response from peer
			// so maybe the peer has commited the logs we sent
			// but we did not receive the success reply and did not increase
			// the prevLogIndex
			p.prevLogIndex = resp.CommitIndex

			debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
		} else if p.prevLogIndex > 0 {
			// Decrement the previous log index down until we find a match. Don't
			// let it go below where the peer's commit index is though. That's a
			// problem.
			p.prevLogIndex--
			// If decrementing by one is not enough, jump straight back to the
			// index reported by the peer.
			if p.prevLogIndex > resp.Index {
				p.prevLogIndex = resp.Index
			}

			debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
		}
	}
	p.mutex.Unlock()

	// Attach the peer to resp, thus server can know where it comes from
	resp.peer = p.Name()

	// Send response to server for processing.
	p.server.send(resp)
}
// Sends an Snapshot request to the peer through the transport.
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
debugln("peer.snap.send: ", p.name)
2013-07-07 22:12:24 +00:00
resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
2013-07-07 20:21:04 +00:00
if resp == nil {
debugln("peer.snap.timeout: ", p.name)
return
}
2013-07-07 20:21:04 +00:00
debugln("peer.snap.recv: ", p.name)
2013-07-17 00:40:19 +00:00
// If successful, the peer should have been to snapshot state
// Send it the snapshot!
2013-07-07 20:21:04 +00:00
if resp.Success {
p.sendSnapshotRecoveryRequest()
2013-07-07 20:21:04 +00:00
} else {
debugln("peer.snap.failed: ", p.name)
2013-07-17 00:40:19 +00:00
return
2013-07-07 20:21:04 +00:00
}
2013-07-07 20:55:55 +00:00
}
// Sends an Snapshot Recovery request to the peer through the transport.
func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
debugln("peer.snap.recovery.send: ", p.name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {
debugln("peer.snap.recovery.failed: ", p.name)
return
}
2013-07-07 20:21:04 +00:00
// Send response to server for processing.
p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)})
2013-05-05 19:36:23 +00:00
}
//--------------------------------------
2013-07-07 20:21:04 +00:00
// Vote Requests
//--------------------------------------
2013-07-07 20:21:04 +00:00
// send VoteRequest Request
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
debugln("peer.vote: ", p.server.Name(), "->", p.Name())
req.peer = p
2013-07-07 22:12:24 +00:00
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name())
2013-07-07 20:21:04 +00:00
resp.peer = p
c <- resp
}
}