influxdb/peer.go

243 lines
6.3 KiB
Go
Raw Normal View History

2013-04-28 04:51:17 +00:00
package raft
2013-05-05 19:36:23 +00:00
import (
"errors"
"sync"
2013-05-05 20:26:04 +00:00
"time"
2013-06-05 00:02:45 +00:00
"fmt"
2013-05-05 19:36:23 +00:00
)
2013-04-28 04:51:17 +00:00
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
2013-05-05 19:36:23 +00:00
server *Server
2013-04-28 04:51:17 +00:00
name string
2013-05-05 19:36:23 +00:00
prevLogIndex uint64
mutex sync.Mutex
2013-05-05 20:26:04 +00:00
heartbeatTimer *Timer
2013-04-28 04:51:17 +00:00
}
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------
// Creates a new peer.
2013-05-05 20:26:04 +00:00
func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
p := &Peer{
2013-05-05 20:26:04 +00:00
server: server,
name: name,
heartbeatTimer: NewTimer(heartbeatTimeout, heartbeatTimeout),
2013-05-05 19:36:23 +00:00
}
2013-05-10 14:47:24 +00:00
// Start the heartbeat timeout.
go p.heartbeatTimeoutFunc()
return p
2013-04-28 04:51:17 +00:00
}
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------
// Name returns the peer's name.
func (p *Peer) Name() string {
	name := p.name
	return name
}
2013-05-05 19:36:23 +00:00
2013-05-05 20:26:04 +00:00
// HeartbeatTimeout returns the heartbeat timeout, read as the heartbeat
// timer's minimum duration.
func (p *Peer) HeartbeatTimeout() time.Duration {
	timer := p.heartbeatTimer
	return timer.MinDuration()
}
// SetHeartbeatTimeout sets the heartbeat timeout by handing duration to the
// heartbeat timer.
func (p *Peer) SetHeartbeatTimeout(duration time.Duration) {
	timer := p.heartbeatTimer
	timer.SetDuration(duration)
}
2013-05-05 19:36:23 +00:00
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
//--------------------------------------
// State
//--------------------------------------
// resume restarts heartbeating by resetting the heartbeat timer while holding
// the peer lock.
func (p *Peer) resume() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	timer := p.heartbeatTimer
	timer.Reset()
}
// pause suspends heartbeating by pausing the heartbeat timer while holding
// the peer lock.
func (p *Peer) pause() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	timer := p.heartbeatTimer
	timer.Pause()
}
// stop shuts the peer down by stopping the heartbeat timer while holding the
// peer lock.
func (p *Peer) stop() {
	p.mutex.Lock()
	defer p.mutex.Unlock()

	timer := p.heartbeatTimer
	timer.Stop()
}
//--------------------------------------
// Flush
//--------------------------------------
2013-05-05 19:36:23 +00:00
// Sends an AppendEntries RPC but does not obtain a lock on the server. This
// method should only be called from the server.
func (p *Peer) internalFlush() (uint64, bool, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
2013-06-03 21:58:12 +00:00
2013-06-05 00:02:45 +00:00
fmt.Println("internal flush!")
2013-06-03 21:58:12 +00:00
if p.prevLogIndex < p.server.log.StartIndex() {
req := p.server.createSnapshotRequest()
2013-06-05 00:02:45 +00:00
return p.sendSnapshotRequest(req)
2013-06-03 21:58:12 +00:00
}
2013-05-28 19:57:38 +00:00
req := p.server.createInternalAppendEntriesRequest(p.prevLogIndex)
return p.sendFlushRequest(req)
2013-05-05 19:36:23 +00:00
}
2013-06-03 21:58:12 +00:00
// TODO add this function
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error){
// Ignore any null requests.
if req == nil {
return 0, false, errors.New("raft.Peer: Request required")
}
// Generate an snapshot request based on the state of the server and
// log. Send the request through the user-provided handler and process the
// result.
resp, err := p.server.transporter.SendSnapshotRequest(p.server, p, req)
p.heartbeatTimer.Reset()
if resp == nil {
return 0, false, err
}
// If successful then update the previous log index. If it was
// unsuccessful then decrement the previous log index and we'll try again
// next time.
if resp.Success {
2013-06-05 00:02:45 +00:00
p.prevLogIndex = req.LastIndex
fmt.Println("update peer preindex to ", p.prevLogIndex)
2013-06-03 21:58:12 +00:00
} else {
panic(resp)
}
return resp.Term, resp.Success, err
}
2013-05-28 19:57:38 +00:00
// Flushes a request through the server's transport.
func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) {
// Ignore any null requests.
if req == nil {
return 0, false, errors.New("raft.Peer: Request required")
2013-05-05 19:36:23 +00:00
}
2013-06-05 00:02:45 +00:00
fmt.Println("FLUSH: before trans!")
2013-05-05 19:36:23 +00:00
// Generate an AppendEntries request based on the state of the server and
// log. Send the request through the user-provided handler and process the
// result.
2013-05-28 19:57:38 +00:00
resp, err := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)
2013-06-05 00:02:45 +00:00
fmt.Println("FLUSH: trans finished")
p.heartbeatTimer.Reset()
2013-05-05 19:36:23 +00:00
if resp == nil {
2013-06-05 00:02:45 +00:00
fmt.Println("trans error")
2013-05-05 19:36:23 +00:00
return 0, false, err
}
2013-05-05 20:26:04 +00:00
2013-05-05 19:36:23 +00:00
// If successful then update the previous log index. If it was
// unsuccessful then decrement the previous log index and we'll try again
// next time.
if resp.Success {
2013-06-05 00:02:45 +00:00
fmt.Println("FLUSH: trans success")
2013-05-05 19:36:23 +00:00
if len(req.Entries) > 0 {
2013-05-08 20:22:08 +00:00
p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
2013-05-05 19:36:23 +00:00
}
} else {
2013-05-10 14:47:24 +00:00
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
// problem.
2013-05-05 19:36:23 +00:00
if p.prevLogIndex > 0 {
p.prevLogIndex--
2013-05-11 03:54:25 +00:00
}
if resp.CommitIndex > p.prevLogIndex {
p.prevLogIndex = resp.CommitIndex
2013-05-05 19:36:23 +00:00
}
}
return resp.Term, resp.Success, err
}
//--------------------------------------
// Heartbeat
//--------------------------------------
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeatTimeoutFunc() {
2013-06-05 00:02:45 +00:00
fmt.Println("heart beat")
for {
// Grab the current timer channel.
p.mutex.Lock()
2013-06-05 00:02:45 +00:00
fmt.Println("heart beat: got lock")
var c chan time.Time
if p.heartbeatTimer != nil {
c = p.heartbeatTimer.C()
}
p.mutex.Unlock()
2013-06-05 00:02:45 +00:00
fmt.Println("heart beat: after lock")
// If the channel or timer are gone then exit.
if c == nil {
2013-06-05 00:02:45 +00:00
fmt.Println("heart beat: break")
break
}
// Flush the peer when we get a heartbeat timeout. If the channel is
// closed then the peer is getting cleaned up and we should exit.
2013-05-10 14:47:24 +00:00
if _, ok := <-c; ok {
// Retrieve the peer data within a lock that is separate from the
// server lock when creating the request. Otherwise a deadlock can
// occur.
p.mutex.Lock()
server, prevLogIndex := p.server, p.prevLogIndex
p.mutex.Unlock()
2013-06-05 00:02:45 +00:00
fmt.Println("heart beat, preIndex: ", prevLogIndex, " startIndex:", server.log.StartIndex())
server.log.mutex.Lock()
if prevLogIndex < server.log.StartIndex() {
server.log.mutex.Unlock()
req := server.createSnapshotRequest()
p.mutex.Lock()
p.sendSnapshotRequest(req)
p.mutex.Unlock()
} else {
// Lock the server to create a request.
req := server.createAppendEntriesRequest(prevLogIndex)
server.log.mutex.Unlock()
p.mutex.Lock()
p.sendFlushRequest(req)
p.mutex.Unlock()
}
} else {
break
}
}
}