Refactor server event loop.

pull/820/head
Ben Johnson 2013-07-07 14:21:04 -06:00
parent ce3dbb5dfb
commit 08e2d519ae
7 changed files with 505 additions and 673 deletions

View File

@ -11,8 +11,12 @@ import (
//
//------------------------------------------------------------------------------
var Debug bool = false
var Trace bool = false
const (
Debug = 1
Trace = 2
)
var LogLevel int = 0
var logger *log.Logger
func init() {
@ -32,7 +36,7 @@ func init() {
// Prints to the standard logger if debug mode is enabled. Arguments
// are handled in the manner of fmt.Print.
func debug(v ...interface{}) {
if Debug || Trace {
if LogLevel >= Debug {
logger.Print(v...)
}
}
@ -40,7 +44,7 @@ func debug(v ...interface{}) {
// Prints to the standard logger if debug mode is enabled. Arguments
// are handled in the manner of fmt.Printf.
func debugf(format string, v ...interface{}) {
if Debug || Trace {
if LogLevel >= Debug {
logger.Printf(format, v...)
}
}
@ -48,7 +52,7 @@ func debugf(format string, v ...interface{}) {
// Prints to the standard logger if debug mode is enabled. Arguments
// are handled in the manner of fmt.Println.
func debugln(v ...interface{}) {
if Debug || Trace {
if LogLevel >= Debug {
logger.Println(v...)
}
}
@ -60,7 +64,7 @@ func debugln(v ...interface{}) {
// Prints to the standard logger if trace debugging is enabled. Arguments
// are handled in the manner of fmt.Print.
func trace(v ...interface{}) {
if Trace {
if LogLevel >= Trace {
logger.Print(v...)
}
}
@ -68,7 +72,7 @@ func trace(v ...interface{}) {
// Prints to the standard logger if trace debugging is enabled. Arguments
// are handled in the manner of fmt.Printf.
func tracef(format string, v ...interface{}) {
if Trace {
if LogLevel >= Trace {
logger.Printf(format, v...)
}
}
@ -76,7 +80,7 @@ func tracef(format string, v ...interface{}) {
// Prints to the standard logger if trace debugging is enabled. Arguments
// are handled in the manner of fmt.Println.
func traceln(v ...interface{}) {
if Trace {
if LogLevel >= Trace {
logger.Println(v...)
}
}

2
log.go
View File

@ -205,7 +205,7 @@ func (l *Log) createEntry(term uint64, command Command) *LogEntry {
func (l *Log) getEntry(index uint64) *LogEntry {
l.mutex.Lock()
defer l.mutex.Unlock()
if index <= l.startIndex || index > (l.startIndex+uint64(len(l.entries))) {
return nil
}

292
peer.go
View File

@ -1,8 +1,6 @@
package raft
import (
"errors"
"fmt"
"sync"
"time"
)
@ -19,14 +17,8 @@ type Peer struct {
name string
prevLogIndex uint64
mutex sync.Mutex
heartbeatTimer *timer
}
type flushResponse struct {
term uint64
success bool
err error
name string
stopChan chan bool
heartbeatTimeout time.Duration
}
//------------------------------------------------------------------------------
@ -37,13 +29,12 @@ type flushResponse struct {
// Creates a new peer.
func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
p := &Peer{
return &Peer{
server: server,
name: name,
heartbeatTimer: newTimer(heartbeatTimeout, heartbeatTimeout),
stopChan: make(chan bool),
heartbeatTimeout: heartbeatTimeout,
}
return p
}
//------------------------------------------------------------------------------
@ -59,12 +50,9 @@ func (p *Peer) Name() string {
// Sets the heartbeat timeout.
func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
p.heartbeatTimer.setDuration(duration)
p.heartbeatTimeout = duration
}
func (p *Peer) startHeartbeat() {
go p.heartbeat()
}
//------------------------------------------------------------------------------
//
@ -73,14 +61,19 @@ func (p *Peer) startHeartbeat() {
//------------------------------------------------------------------------------
//--------------------------------------
// State
// Heartbeat
//--------------------------------------
// Stops the peer entirely.
func (p *Peer) stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.heartbeatTimer.stop()
// Starts the peer heartbeat.
func (p *Peer) startHeartbeat() {
c := make(chan bool)
go p.heartbeat(c)
<- c
}
// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat() {
p.stopChan <- true
}
//--------------------------------------
@ -99,179 +92,110 @@ func (p *Peer) clone() *Peer {
}
//--------------------------------------
// Flush
// Heartbeat
//--------------------------------------
// Sends an AppendEntries RPC but does not obtain a lock
// on the server.
func (p *Peer) flush() (uint64, bool, error) {
// We need to hold the log lock to create AppendEntriesRequest
// avoid snapshot to delete the desired entries before AEQ()
req := p.server.createAppendEntriesRequest(p.prevLogIndex)
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeat(c chan bool) {
c <- true
if req != nil {
return p.sendFlushRequest(req)
for {
select {
case <-p.stopChan:
return
case <-time.After(p.heartbeatTimeout):
p.flush()
}
}
}
//--------------------------------------
// Append Entries
//--------------------------------------
// Sends an AppendEntries RPC.
func (p *Peer) flush() {
entries, prevLogTerm := p.server.log.getEntriesAfter(p.prevLogIndex)
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, p.server.name, p.prevLogIndex, prevLogTerm, entries, p.server.log.commitIndex))
} else {
req := p.server.createSnapshotRequest()
return p.sendSnapshotRequest(req)
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
}
// Sends an AppendEntries request to the peer through the transport.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
resp := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)
if resp == nil {
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
return
}
traceln("peer.flush.recv: ", p.Name())
// If successful then update the previous log index.
if resp.Success {
if len(req.Entries) > 0 {
p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
}
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
} else {
// we may miss a response from peer
if resp.CommitIndex >= p.prevLogIndex {
p.prevLogIndex = resp.CommitIndex
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
} else if p.prevLogIndex > 0 {
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
// problem.
p.prevLogIndex--
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
}
}
// Send response to server for processing.
p.server.send(resp)
}
// Sends an Snapshot request to the peer through the transport.
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
debugln("peer.snap.send: ", p.name)
resp := p.server.transporter.SendSnapshotRequest(p.server, p, req)
if resp == nil {
debugln("peer.snap.timeout: ", p.name)
return
}
debugln("peer.snap.recv: ", p.name)
// If successful then update the previous log index.
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {
debugln("peer.snap.failed: ", p.name)
}
// Send response to server for processing.
p.server.send(resp)
}
//--------------------------------------
// Vote Requests
//--------------------------------------
// Sends a RequestVote RPC to the peer.
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
debugln("peer.vote: ", p.server.Name(), "->", p.Name())
req.peer = p
debugln(p.server.Name(), "Send Vote Request to ", p.Name())
if resp, _ := p.server.transporter.SendVoteRequest(p.server, p, req); resp != nil {
if resp := p.server.transporter.SendVoteRequest(p.server, p, req); resp != nil {
resp.peer = p
c <- resp
}
}
// send Snapshot Request
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) (uint64, bool, error) {
// Ignore any null requests.
if req == nil {
return 0, false, errors.New("raft.Peer: Request required")
}
// Generate an snapshot request based on the state of the server and
// log. Send the request through the user-provided handler and process the
// result.
resp, err := p.server.transporter.SendSnapshotRequest(p.server, p, req)
if resp == nil {
return 0, false, err
}
// If successful then update the previous log index. If it was
// unsuccessful then decrement the previous log index and we'll try again
// next time.
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {
panic(resp)
}
return resp.Term, resp.Success, err
}
// Flushes a request through the server's transport.
func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) {
// Ignore any null requests.
if req == nil {
return 0, false, errors.New("raft.Peer: Request required")
}
// Generate an AppendEntries request based on the state of the server and
// log. Send the request through the user-provided handler and process the
// result.
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
respChan := make(chan *AppendEntriesResponse, 2)
go func() {
tranResp, _ := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)
respChan <- tranResp
}()
var resp *AppendEntriesResponse
select {
// how to decide?
case <-time.After(p.server.heartbeatTimeout * 2):
resp = nil
case resp = <-respChan:
}
if resp == nil {
traceln("receive flush timeout from ", p.Name())
return 0, false, fmt.Errorf("AppendEntries timeout: %s", p.Name())
}
traceln("peer.flush.recv: ", p.Name())
// If successful then update the previous log index. If it was
// unsuccessful then decrement the previous log index and we'll try again
// next time.
if resp.Success {
if len(req.Entries) > 0 {
p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
}
traceln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
} else {
if resp.Term > p.server.currentTerm {
return resp.Term, false, errors.New("Step down")
}
// we may miss a response from peer
if resp.CommitIndex >= p.prevLogIndex {
traceln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
p.prevLogIndex = resp.CommitIndex
} else if p.prevLogIndex > 0 {
traceln("Peer ", p.Name(), "'s' step back to ", p.prevLogIndex)
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
// problem.
p.prevLogIndex--
}
}
return resp.Term, resp.Success, nil
}
//--------------------------------------
// Heartbeat
//--------------------------------------
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeat() {
for {
// (1) timeout/fire happens, flush the peer
// (2) stopped, return
if p.heartbeatTimer.start() {
var f flushResponse
f.name = p.name
f.term, f.success, f.err = p.flush()
// if the peer successfully appended the log entry
// we will tell the commit center
if f.success {
if p.prevLogIndex > p.server.log.commitIndex {
traceln("[Heartbeat] Peer", p.Name(), "send to commit center")
p.server.response <- f
traceln("[Heartbeat] Peer", p.Name(), "back from commit center")
}
} else {
// shutdown the heartbeat
if f.term > p.server.currentTerm {
p.server.stateMutex.Lock()
if p.server.state == Leader {
p.server.state = Follower
select {
case p.server.stepDown <- f.term:
p.server.currentTerm = f.term
default:
panic("heartbeat cannot step down")
}
}
p.server.stateMutex.Unlock()
return
}
}
} else {
// shutdown
return
}
}
}

639
server.go
View File

@ -31,6 +31,8 @@ const (
DefaultElectionTimeout = 150 * time.Millisecond
)
var stopValue interface{}
//------------------------------------------------------------------------------
//
// Errors
@ -39,6 +41,7 @@ const (
var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
var CommandTimeoutError = errors.New("raft: Command timeout")
//------------------------------------------------------------------------------
//
@ -62,11 +65,11 @@ type Server struct {
peers map[string]*Peer
mutex sync.Mutex
stateMutex sync.Mutex
commitCount int
electionTimer *timer
heartbeatTimeout time.Duration
c chan interface{}
response chan flushResponse
c chan *event
stepDown chan uint64
stopChan chan bool
@ -75,6 +78,13 @@ type Server struct {
stateMachine StateMachine
}
// An event to be processed by the server's event loop.
type event struct {
target interface{}
returnValue interface{}
c chan error
}
//------------------------------------------------------------------------------
//
// Constructor
@ -99,8 +109,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
state: Stopped,
peers: make(map[string]*Peer),
log: newLog(),
c: make(chan interface{}, 256),
response: make(chan flushResponse, 128),
c: make(chan *event, 256),
stepDown: make(chan uint64, 1),
stopChan: make(chan bool),
electionTimer: newTimer(DefaultElectionTimeout, DefaultElectionTimeout*2),
@ -288,8 +297,7 @@ func (s *Server) Initialize() error {
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
debugln("raft: Log error: %s", err)
s.unload()
s.debugln("raft: Log error: %s", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
@ -301,44 +309,22 @@ func (s *Server) Initialize() error {
// Start the sever as a follower
func (s *Server) StartFollower() {
go s.loop(Follower)
s.state = Follower
go s.loop()
}
// Start the sever as a leader
func (s *Server) StartLeader() {
s.state = Candidate
s.state = Leader
s.currentTerm++
go s.loop(Leader)
go s.loop()
}
// Shuts down the server.
func (s *Server) Stop() {
s.mutex.Lock()
if s.state == Follower {
s.electionTimer.stop()
} else {
s.mutex.Unlock()
s.stopChan <- true
}
s.unload()
}
// Unloads the server.
func (s *Server) unload() {
// Kill the election timer.
s.state = Stopped
// wait for all previous flush ends
time.Sleep(100 * time.Millisecond)
// Close the log.
if s.log != nil {
// still some concurrency issue with stop
// need lock
s.log.close()
}
s.send(&stopValue)
s.log.close()
}
// Checks if the server is currently running.
@ -346,6 +332,22 @@ func (s *Server) Running() bool {
return s.state != Stopped
}
//--------------------------------------
// Term
//--------------------------------------
// Sets the current term for the server. This is only used when an external
// current term is found.
func (s *Server) setCurrentTerm(term uint64, leaderName string) {
if term > s.currentTerm {
s.state = Follower
s.currentTerm = term
s.leader = leaderName
s.votedFor = ""
}
}
//--------------------------------------
// Event Loop
//--------------------------------------
@ -362,74 +364,81 @@ func (s *Server) Running() bool {
// |_______________________|____________________________________ |
//
// The main event loop for the server
func (s *Server) loop(role string) {
debugln("server.loop.start ", role)
defer debugln("server.loop.end")
func (s *Server) loop() {
defer s.debugln("server.loop.end")
for {
switch role {
s.debugln("server.loop.run ", s.state)
switch s.state {
case Follower:
if stopped := s.followerLoop(); stopped {
return
}
role = Candidate
s.followerLoop()
case Candidate:
stopped, leader := s.candidateLoop()
s.votedFor = ""
if stopped {
return
} else if leader {
role = Leader
} else {
role = Follower
}
s.candidateLoop()
case Leader:
if stopped := s.leaderLoop(); stopped {
return
}
role = Follower
s.leaderLoop()
case Stopped:
return
}
}
}
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *Server) send(value interface{}) (interface{}, error) {
event := s.sendAsync(value)
err := <-event.c
return event.returnValue, err
}
func (s *Server) sendAsync(value interface{}) *event {
event := &event{target: value, c: make(chan error, 1)}
s.c <- event
return event
}
// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if election timeout elapses without either:
// 1.Receiving valid AppendEntries RPC, or
// 2.Granting vote to candidate
func (s *Server) followerLoop() bool {
func (s *Server) followerLoop() {
s.state = Follower
// (1) Timeout: Promote to candidate and return
// (2) Stopped: Due to receive heartbeat, continue
for {
if s.state == Stopped {
return true
var err error
select {
case e := <-s.c:
if e.target == &stopValue {
s.state = Stopped
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue = s.processRequestVoteRequest(req)
}
// Callback to event.
e.c <- err
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
s.state = Candidate
}
if s.electionTimer.start() {
return false
} else {
s.electionTimer.ready()
continue
// Exit loop on state change.
if s.state != Follower {
break
}
}
}
// The event loop that is run when the server is in a Candidate state.
func (s *Server) candidateLoop() (bool, bool) {
if s.state != Follower && s.state != Stopped {
panic(fmt.Sprintf("Invalid candidate loop state: %s", s.state))
}
s.state = Candidate
s.leader = ""
func (s *Server) candidateLoop() {
lastLogIndex, lastLogTerm := s.log.lastInfo()
s.leader = ""
for {
// Increment current term, vote for self.
@ -437,10 +446,10 @@ func (s *Server) candidateLoop() (bool, bool) {
s.votedFor = s.name
// Send RequestVote RPCs to all other servers.
c := make(chan *RequestVoteResponse, len(s.peers))
respChan := make(chan *RequestVoteResponse, len(s.peers))
req := newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm)
for _, peer := range s.peers {
go peer.sendVoteRequest(req, c)
go peer.sendVoteRequest(req, respChan)
}
// Wait for either:
@ -448,126 +457,210 @@ func (s *Server) candidateLoop() (bool, bool) {
// * AppendEntries RPC received from new leader: step down.
// * Election timeout elapses without election resolution: increment term, start new election
// * Discover higher term: step down (§5.1)
if elected, timeout, stopped := s.startCandidateSelect(c); elected {
return false, true
} else if timeout {
continue
} else if stopped {
return true, false
} else {
return false, false
votesGranted := 1
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
for {
// If we received enough votes then stop waiting for more votes.
s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
if votesGranted >= s.QuorumSize() {
s.state = Leader
break
}
// Collect votes from peers.
select {
case resp := <-respChan:
if resp.VoteGranted {
s.debugln("server.candidate.vote.granted: ", votesGranted)
votesGranted++
} else if resp.Term > s.currentTerm {
s.debugln("server.candidate.vote.failed")
s.setCurrentTerm(resp.Term, "")
break
}
case e := <- s.c:
var err error
if e.target == &stopValue {
s.state = Stopped
break
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if _, ok := e.target.(*AppendEntriesRequest); ok {
err = NotLeaderError
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue = s.processRequestVoteRequest(req)
}
// Callback to event.
e.c <- err
case <-timeoutChan:
break
}
}
if s.state != Candidate {
break
}
}
}
// The event loop that is run when the server is in a Candidate state.
// Initialize nextIndex for each to last log index + 1
// Send initial empty AppendEntries RPCs (heartbeat) to each
// follower; repeat during idle periods to prevent election
// timeouts (§5.2)
// Accept commands from clients, append new entries to local
// log (§5.3)
// Whenever last log index ≥ nextIndex for a follower, send
// AppendEntries RPC with log entries starting at nextIndex,
// update nextIndex if successful (§5.3)
// If AppendEntries fails because of log inconsistency,
// decrement nextIndex and retry (§5.3)
// Mark entries committed if stored on a majority of servers
// and some entry from current term is stored on a majority of
// servers. Apply newly committed entries to state machine.
// Step down if currentTerm changes (§5.5)
func (s *Server) leaderLoop() bool {
func (s *Server) leaderLoop() {
s.state = Leader
s.leader = s.name
s.commitCount = 0
logIndex, _ := s.log.lastInfo()
// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
for _, peer := range s.peers {
peer.prevLogIndex = logIndex
peer.heartbeatTimer.ready()
peer.startHeartbeat()
}
// Begin to collect response from followers
stopped := false
count := 1
for {
var err error
select {
case response := <-s.response:
debugln("[CommitCenter] Receive response from ", response.name, response.success)
if response.success {
count++
if count >= s.QuorumSize() {
s.updateCommitIndex()
}
case e := <-s.c:
s.debugln("server.leader.select")
if e.target == &stopValue {
s.state = Stopped
} else if command, ok := e.target.(Command); ok {
s.processCommand(command, e)
continue
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue = s.processAppendEntriesRequest(req)
} else if resp, ok := e.target.(*AppendEntriesResponse); ok {
s.processAppendEntriesResponse(resp)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue = s.processRequestVoteRequest(req)
}
case <-s.stepDown:
stopped = false
break
case <-s.stopChan:
stopped = true
// Callback to event.
e.c <- err
}
// Exit loop on state change.
if s.state != Leader {
break
}
}
// Stop all peers.
for _, peer := range s.peers {
peer.stop()
peer.stopHeartbeat()
}
return stopped
}
// Votes received from majority of servers: become leader
// Election timeout elapses without election resolution:
// Discover higher term: step down
func (s *Server) startCandidateSelect(respChan chan *RequestVoteResponse) (bool, bool, bool) {
votesGranted := 1
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
//--------------------------------------
// Commands
//--------------------------------------
for {
// If we received enough votes then stop waiting for more votes.
if votesGranted >= s.QuorumSize() {
return true, false, false
}
// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
// Collect votes from peers.
func (s *Server) Do(command Command) (interface{}, error) {
return s.send(command)
}
// Processes a command.
func (s *Server) processCommand(command Command, e *event) {
s.debugln("server.command.process")
// Create an entry for the command in the log.
entry := s.log.createEntry(s.currentTerm, command)
if err := s.log.appendEntry(entry); err != nil {
s.debugln("server.command.log.error:", err)
e.c <- err
return
}
// Issue a callback for the entry once it's committed.
go func() {
// Wait for the entry to be committed.
select {
case resp := <-respChan:
if resp.VoteGranted == true {
votesGranted++
} else if resp.Term > s.currentTerm {
s.stateMutex.Lock()
select {
case <-s.stepDown:
default:
}
s.state = Follower
s.currentTerm = resp.Term
s.stateMutex.Unlock()
return false, false, false
}
case <-s.stepDown:
return false, false, false
case <-timeoutChan:
return false, true, false
case <-s.stopChan:
return false, false, true
case <-entry.commit:
s.debugln("server.command.commit")
e.returnValue = entry.result
entry.result = nil
e.c <- nil
case <-time.After(time.Second):
s.debugln("server.command.timeout")
e.c <- CommandTimeoutError
}
}()
}
// Issue an append entries response for the server.
s.sendAsync(newAppendEntriesResponse(s.currentTerm, true, s.log.commitIndex))
}
// Updates the commit index to the highest index committed by the quorum.
func (s *Server) updateCommitIndex() {
//--------------------------------------
// Append Entries
//--------------------------------------
// Appends zero or more log entry from the leader to this server.
func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
ret, _ := s.send(req)
resp, _ := ret.(*AppendEntriesResponse)
return resp
}
// Processes the "append entries" request.
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) *AppendEntriesResponse {
if req.Term < s.currentTerm {
s.debugln("server.ae.error: stale term")
return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex)
}
// Update term and leader.
s.setCurrentTerm(req.Term, req.LeaderName)
// Reject if log doesn't contain a matching previous entry.
if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
s.debugln("server.ae.truncate.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex)
}
// Append entries to the log.
if err := s.log.appendEntries(req.Entries); err != nil {
s.debugln("server.ae.append.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex)
}
// Commit up to the commit index.
if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
s.debugln("server.ae.commit.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex)
}
return newAppendEntriesResponse(s.currentTerm, true, s.log.commitIndex)
}
// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
// If we find a higher term then change to a follower and exit.
if resp.Term > s.currentTerm {
s.setCurrentTerm(resp.Term, "")
return
}
// Ignore response if it's not successful.
if !resp.Success {
return
}
// Increment the commit count to make sure we have a quorum before committing.
s.commitCount++
if s.commitCount < s.QuorumSize() {
return
}
// Determine the committed index that a majority has.
var indices []uint64
indices = append(indices, s.log.currentIndex())
@ -583,119 +676,13 @@ func (s *Server) updateCommitIndex() {
if commitIndex > committedIndex {
s.log.setCommitIndex(commitIndex)
for i := committedIndex; i < commitIndex; i++ {
if entry := s.log.getEntry(i+1); entry != nil {
s.log.entries[i-s.log.startIndex].commit <- true
if entry := s.log.getEntry(i + 1); entry != nil {
entry.commit <- true
}
}
}
}
//--------------------------------------
// Commands
//--------------------------------------
// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *Server) Do(command Command) (interface{}, error) {
s.stateMutex.Lock()
if s.state != Leader {
s.stateMutex.Unlock()
return nil, NotLeaderError
}
// we get the term of the server
// when we are sure the server is leader
term := s.currentTerm
s.stateMutex.Unlock()
entry := s.log.createEntry(term, command)
if err := s.log.appendEntry(entry); err != nil {
return nil, err
}
s.response <- flushResponse{term, true, nil, s.name}
// timeout here
select {
case <-entry.commit:
debugln("[Do] finish!")
result := entry.result
entry.result = nil
return result, nil
case <-time.After(time.Second):
debugln("[Do] fail!")
return nil, errors.New("Command commit fails")
}
}
// Appends a log entry from the leader to this server.
func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
// If the server is stopped then reject it.
if !s.Running() {
return newAppendEntriesResponse(s.currentTerm, false, 0), fmt.Errorf("raft.Server: Server stopped")
}
// If the request is coming from an old term then reject it.
if req.Term < s.currentTerm {
return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), fmt.Errorf("raft.Server: Stale request term")
}
traceln("Peer ", s.Name(), "received heartbeat from ", req.LeaderName,
" ", req.Term, " ", s.currentTerm, " ", time.Now())
s.setCurrentTerm(req.Term)
// Update the current leader.
s.leader = req.LeaderName
// Reset election timeout.
if s.electionTimer != nil {
s.electionTimer.stop()
}
// Reject if log doesn't contain a matching previous entry.
if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), err
}
traceln("Peer ", s.Name(), "after truncate ")
// Append entries to the log.
if err := s.log.appendEntries(req.Entries); err != nil {
return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), err
}
traceln("Peer ", s.Name(), "commit index ", req.CommitIndex, " from ",
req.LeaderName)
// Commit up to the commit index.
if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
return newAppendEntriesResponse(s.currentTerm, false, s.log.commitIndex), err
}
traceln("Peer ", s.Name(), "after commit ")
traceln("Peer ", s.Name(), "reply heartbeat from ", req.LeaderName,
" ", req.Term, " ", s.currentTerm, " ", time.Now())
return newAppendEntriesResponse(s.currentTerm, true, s.log.commitIndex), nil
}
// Creates an AppendEntries request. Can return a nil request object if the
// index doesn't exist because of a snapshot.
func (s *Server) createAppendEntriesRequest(prevLogIndex uint64) *AppendEntriesRequest {
entries, prevLogTerm := s.log.getEntriesAfter(prevLogIndex)
if entries != nil {
return newAppendEntriesRequest(s.currentTerm, s.name, prevLogIndex, prevLogTerm, entries, s.log.commitIndex)
} else {
return nil
}
}
//--------------------------------------
// Request Vote
//--------------------------------------
@ -703,79 +690,40 @@ func (s *Server) createAppendEntriesRequest(prevLogIndex uint64) *AppendEntriesR
// Requests a vote from a server. A vote can be obtained if the vote's term is
// at the server's current term and the server has not made a vote yet. A vote
// can also be obtained if the term is greater than the server's current term.
func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
debugln("Peer ", s.Name(), "receive vote request from ", req.CandidateName)
//debugln("[RequestVote] got the lock")
// Fail if the server is not running.
if !s.Running() {
return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Server is stopped")
}
func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
ret, _ := s.send(req)
resp, _ := ret.(*RequestVoteResponse)
return resp
}
// Processes a "request vote" request.
func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) *RequestVoteResponse {
// If the request is coming from an old term then reject it.
if req.Term < s.currentTerm {
return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm)
s.debugln("server.rv.error: stale term")
return newRequestVoteResponse(s.currentTerm, false)
}
s.setCurrentTerm(req.Term)
s.setCurrentTerm(req.Term, "")
// If we've already voted for a different candidate then don't vote for this candidate.
if s.votedFor != "" && s.votedFor != req.CandidateName {
debugln("already vote for ", s.votedFor, " false to ", req.CandidateName)
return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor)
s.debugln("server.rv.error: duplicate vote: ", req.CandidateName)
return newRequestVoteResponse(s.currentTerm, false)
}
// If the candidate's log is not at least as up-to-date as
// our last log then don't vote.
// If the candidate's log is not at least as up-to-date as our last log then don't vote.
lastIndex, lastTerm := s.log.lastInfo()
if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
debugln("my log is more up-to-date")
return newRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Out-of-date log: [%v/%v] > [%v/%v]", lastIndex, lastTerm, req.LastLogIndex, req.LastLogTerm)
s.debugln("server.rv.error: out of date log: ", req.CandidateName)
return newRequestVoteResponse(s.currentTerm, false)
}
// If we made it this far then cast a vote and reset our election time out.
s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
s.votedFor = req.CandidateName
debugln(s.Name(), "Vote for ", req.CandidateName, "at term", req.Term)
if s.electionTimer != nil {
s.electionTimer.stop()
}
return newRequestVoteResponse(s.currentTerm, true), nil
}
// Updates the current term on the server if the term is greater than the
// server's current term. When the term is changed then the server's vote is
// cleared and its state is changed to be a follower.
func (s *Server) setCurrentTerm(term uint64) {
if term > s.currentTerm {
s.votedFor = ""
s.stateMutex.Lock()
if s.state == Leader || s.state == Candidate {
debugln(s.Name(), " should step down to a follower from ", s.state)
s.state = Follower
select {
case s.stepDown <- term:
default:
panic("cannot stepdown")
}
debugln(s.Name(), " step down to a follower from ", s.state)
s.currentTerm = term
s.stateMutex.Unlock()
return
}
s.stateMutex.Unlock()
// update term after stop all the peer
s.currentTerm = term
}
return newRequestVoteResponse(s.currentTerm, true)
}
//--------------------------------------
@ -793,7 +741,7 @@ func (s *Server) AddPeer(name string) error {
// Only add the peer if it doesn't have the same name.
if s.name != name {
//debugln("Add peer ", name)
//s.debugln("Add peer ", name)
peer := newPeer(s, name, s.heartbeatTimeout)
if s.state == Leader {
peer.startHeartbeat()
@ -820,13 +768,11 @@ func (s *Server) RemovePeer(name string) error {
// Flush entries to the peer first.
if s.state == Leader {
if _, _, err := peer.flush(); err != nil {
warn("raft: Unable to notify peer of removal: %v", err)
}
peer.flush()
}
// Stop peer and remove it.
peer.stop()
peer.stopHeartbeat()
delete(s.peers, name)
return nil
@ -836,13 +782,6 @@ func (s *Server) RemovePeer(name string) error {
// Log compaction
//--------------------------------------
// Creates a snapshot request.
func (s *Server) createSnapshotRequest() *SnapshotRequest {
s.mutex.Lock()
defer s.mutex.Unlock()
return newSnapshotRequest(s.name, s.lastSnapshot)
}
// The background snapshot function
func (s *Server) Snapshot() {
for {
@ -855,7 +794,7 @@ func (s *Server) Snapshot() {
func (s *Server) takeSnapshot() error {
//TODO put a snapshot mutex
debugln("take Snapshot")
s.debugln("take Snapshot")
if s.currentSnapshot != nil {
return errors.New("handling snapshot")
}
@ -1003,27 +942,27 @@ func (s *Server) LoadSnapshot() error {
}
snapshotBytes, _ = ioutil.ReadAll(file)
debugln(string(snapshotBytes))
s.debugln(string(snapshotBytes))
// Generate checksum.
byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
if uint32(checksum) != byteChecksum {
debugln(checksum, " ", byteChecksum)
s.debugln(checksum, " ", byteChecksum)
return errors.New("bad snapshot file")
}
err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
if err != nil {
debugln("unmarshal error: ", err)
s.debugln("unmarshal error: ", err)
return err
}
err = s.stateMachine.Recovery(s.lastSnapshot.State)
if err != nil {
debugln("recovery error: ", err)
s.debugln("recovery error: ", err)
return err
}
@ -1037,3 +976,17 @@ func (s *Server) LoadSnapshot() error {
return err
}
//--------------------------------------
// Debugging
//--------------------------------------
func (s *Server) debugln(v ...interface{}) {
debugf("[%s] %s", s.name, fmt.Sprintln(v...))
}
func (s *Server) traceln(v ...interface{}) {
tracef("[%s] %s", s.name, fmt.Sprintln(v...))
}

View File

@ -25,9 +25,9 @@ func TestServerRequestVote(t *testing.T) {
server.Initialize()
server.StartLeader()
defer server.Stop()
resp, err := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0))
if !(resp.Term == 1 && resp.VoteGranted && err == nil) {
t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err)
resp := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0))
if resp.Term != 1 || !resp.VoteGranted {
t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
}
}
@ -38,9 +38,9 @@ func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
server.StartLeader()
server.currentTerm = 2
defer server.Stop()
resp, err := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0))
if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Stale term: 1 < 2") {
t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err)
resp := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0))
if resp.Term != 2 || resp.VoteGranted {
t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
}
if server.currentTerm != 2 && server.state != Follower {
t.Fatalf("Server did not update term and demote: %v / %v", server.currentTerm, server.state)
@ -54,38 +54,35 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
server.StartLeader()
server.currentTerm = 2
defer server.Stop()
resp, err := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0))
if !(resp.Term == 2 && resp.VoteGranted && err == nil) {
t.Fatalf("First vote should not have been denied (%v)", err)
resp := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0))
if resp.Term != 2 || !resp.VoteGranted {
t.Fatalf("First vote should not have been denied")
}
resp, err = server.RequestVote(newRequestVoteRequest(2, "bar", 0, 0))
if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Already voted for foo") {
t.Fatalf("Second vote should have been denied (%v)", err)
resp = server.RequestVote(newRequestVoteRequest(2, "bar", 0, 0))
if resp.Term != 2 || resp.VoteGranted {
t.Fatalf("Second vote should have been denied")
}
}
// // Ensure that a vote request is approved if vote occurs in a new term.
// Ensure that a vote request is approved if vote occurs in a new term.
func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Initialize()
server.StartLeader()
server.currentTerm = 2
defer server.Stop()
resp, err := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0))
if !(resp.Term == 2 && resp.VoteGranted && server.VotedFor() == "foo" && err == nil) {
t.Fatalf("First vote should not have been denied (%v)", err)
resp := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0))
if resp.Term != 2 || !resp.VoteGranted || server.VotedFor() != "foo" {
t.Fatalf("First vote should not have been denied")
}
resp, err = server.RequestVote(newRequestVoteRequest(3, "bar", 0, 0))
resp = server.RequestVote(newRequestVoteRequest(3, "bar", 0, 0))
// now stepdown is done by channel, need time
time.Sleep(5 * time.Millisecond)
if !(resp.Term == 3 && resp.VoteGranted && server.VotedFor() == "bar" && err == nil) {
t.Fatalf("Second vote should have been approved (%v)", err)
if resp.Term != 3 || !resp.VoteGranted || server.VotedFor() != "bar" {
t.Fatalf("Second vote should have been approved")
}
}
// // Ensure that a vote request is denied if the log is out of date.
// Ensure that a vote request is denied if the log is out of date.
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
server := newTestServerWithLog("1", &testTransporter{},
`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n"+
@ -97,21 +94,21 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
defer server.Stop()
resp, err := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 2))
if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [2/2]") {
t.Fatalf("Stale index vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err)
resp := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 2))
if resp.Term != 2 || resp.VoteGranted {
t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
}
resp, err = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 1))
if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [3/1]") {
t.Fatalf("Stale term vote should have been denied [%v/%v] (%v)", resp.Term, resp.VoteGranted, err)
resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 1))
if resp.Term != 2 || resp.VoteGranted {
t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
}
resp, err = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
if !(resp.Term == 2 && resp.VoteGranted && err == nil) {
t.Fatalf("Matching log vote should have been granted (%v)", err)
resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
if resp.Term != 2 || !resp.VoteGranted {
t.Fatalf("Matching log vote should have been granted")
}
resp, err = server.RequestVote(newRequestVoteRequest(2, "foo", 4, 3))
if !(resp.Term == 2 && resp.VoteGranted && err == nil) {
t.Fatalf("Ahead-of-log vote should have been granted (%v)", err)
resp = server.RequestVote(newRequestVoteRequest(2, "foo", 4, 3))
if resp.Term != 2 || !resp.VoteGranted {
t.Fatalf("Ahead-of-log vote should have been granted")
}
}
@ -121,7 +118,6 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
func TestServerPromoteSelf(t *testing.T) {
debugln("---TestServerPromoteSelf---")
server := newTestServer("1", &testTransporter{})
server.Initialize()
server.StartFollower()
@ -138,69 +134,28 @@ func TestServerPromoteSelf(t *testing.T) {
func TestServerPromote(t *testing.T) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return lookup[peer.Name()].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return lookup[peer.Name()].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
lookup["1"].state = Follower
lookup["2"].state = Follower
lookup["3"].state = Follower
servers[0].StartFollower()
servers[1].StartFollower()
servers[2].StartFollower()
leader := servers[0]
time.Sleep(50 * time.Millisecond)
leader.StartFollower()
time.Sleep(200 * time.Millisecond)
if leader.state != Leader {
t.Fatalf("Server promotion failed: %v", leader.state)
if servers[0].state != Leader && servers[1].state != Leader && servers[2].state != Leader {
t.Fatalf("No leader elected: (%s, %s, %s)", servers[0].state, servers[1].state, servers[2].state)
}
for _, server := range servers {
server.Stop()
}
}
// Ensure that a server will restart election if not enough votes are obtained before timeout.
func TestServerPromoteDoubleElection(t *testing.T) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
resp, err := lookup[peer.Name()].RequestVote(req)
return resp, err
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
resp, err := lookup[peer.Name()].AppendEntries(req)
return resp, err
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
lookup["2"].currentTerm, lookup["2"].votedFor = 1, "2"
lookup["3"].currentTerm, lookup["3"].votedFor = 1, "3"
lookup["1"].state = Follower
lookup["2"].state = Follower
lookup["3"].state = Follower
leader := servers[0]
leader.StartFollower()
time.Sleep(400 * time.Millisecond)
if lookup["2"].votedFor != "1" {
t.Fatalf("Unexpected vote for server 2: %v", lookup["2"].votedFor)
}
if lookup["3"].votedFor != "1" {
t.Fatalf("Unexpected vote for server 3: %v", lookup["3"].votedFor)
}
for _, server := range servers {
server.Stop()
}
}
//--------------------------------------
// Append Entries
//--------------------------------------
@ -214,30 +169,30 @@ func TestServerAppendEntries(t *testing.T) {
// Append single entry.
entries := []*LogEntry{newLogEntry(nil, 1, 1, &testCommand1{"foo", 10})}
resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 0))
if !(resp.Term == 1 && resp.Success && err == nil) {
t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err)
resp := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 0))
if resp.Term != 1 || !resp.Success {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
if index, term := server.log.commitInfo(); !(index == 0 && term == 0) {
if index, term := server.log.commitInfo(); index != 0 || term != 0 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
// Append multiple entries + commit the last one.
entries = []*LogEntry{newLogEntry(nil, 2, 1, &testCommand1{"bar", 20}), newLogEntry(nil, 3, 1, &testCommand1{"baz", 30})}
resp, err = server.AppendEntries(newAppendEntriesRequest(1, "ldr", 1, 1, entries, 1))
if !(resp.Term == 1 && resp.Success && err == nil) {
t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err)
resp = server.AppendEntries(newAppendEntriesRequest(1, "ldr", 1, 1, entries, 1))
if resp.Term != 1 || !resp.Success {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
if index, term := server.log.commitInfo(); !(index == 1 && term == 1) {
if index, term := server.log.commitInfo(); index != 1 || term != 1 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
// Send zero entries and commit everything.
resp, err = server.AppendEntries(newAppendEntriesRequest(2, "ldr", 3, 1, []*LogEntry{}, 3))
if !(resp.Term == 2 && resp.Success && err == nil) {
t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err)
resp = server.AppendEntries(newAppendEntriesRequest(2, "ldr", 3, 1, []*LogEntry{}, 3))
if resp.Term != 2 || !resp.Success {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
if index, term := server.log.commitInfo(); !(index == 3 && term == 1) {
if index, term := server.log.commitInfo(); index != 3 || term != 1 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
}
@ -253,11 +208,11 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
// Append single entry.
entries := []*LogEntry{newLogEntry(nil, 1, 1, &testCommand1{"foo", 10})}
resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 0))
if !(resp.Term == 2 && !resp.Success && err != nil && err.Error() == "raft.Server: Stale request term") {
t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err)
resp := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 0))
if resp.Term != 2 || resp.Success {
t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
}
if index, term := server.log.commitInfo(); !(index == 0 && term == 0) {
if index, term := server.log.commitInfo(); index != 0 || term != 0 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
}
@ -275,16 +230,16 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
newLogEntry(nil, 1, 1, &testCommand1{"foo", 10}),
newLogEntry(nil, 2, 1, &testCommand1{"foo", 15}),
}
resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 2))
if !(resp.Term == 1 && resp.Success && err == nil) {
t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err)
resp := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 2))
if resp.Term != 1 || !resp.Success {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
// Append entry again (post-commit).
entries = []*LogEntry{newLogEntry(nil, 2, 1, &testCommand1{"bar", 20})}
resp, err = server.AppendEntries(newAppendEntriesRequest(1, "ldr", 2, 1, entries, 1))
if !(resp.Term == 1 && !resp.Success && err != nil && err.Error() == "raft.Log: Cannot append entry with earlier index in the same term (1:2 <= 1:2)") {
t.Fatalf("AppendEntries should have failed: %v/%v : %v", resp.Term, resp.Success, err)
resp = server.AppendEntries(newAppendEntriesRequest(1, "ldr", 2, 1, entries, 1))
if resp.Term != 1 || resp.Success {
t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
}
}
@ -301,16 +256,16 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
// Append single entry + commit.
entries := []*LogEntry{entry1, entry2}
resp, err := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 1))
if !(resp.Term == 1 && resp.Success && err == nil && server.log.commitIndex == 1 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2})) {
t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err)
resp := server.AppendEntries(newAppendEntriesRequest(1, "ldr", 0, 0, entries, 1))
if resp.Term != 1 || !resp.Success || server.log.commitIndex != 1 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2}) {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
// Append entry that overwrites the second (uncommitted) entry.
entries = []*LogEntry{entry3}
resp, err = server.AppendEntries(newAppendEntriesRequest(2, "ldr", 1, 1, entries, 2))
if !(resp.Term == 2 && resp.Success && err == nil && server.log.commitIndex == 2 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3})) {
t.Fatalf("AppendEntries should have succeeded: %v/%v : %v", resp.Term, resp.Success, err)
resp = server.AppendEntries(newAppendEntriesRequest(2, "ldr", 1, 1, entries, 2))
if resp.Term != 2 || !resp.Success || server.log.commitIndex != 2 || !reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3}) {
t.Fatalf("AppendEntries should have succeeded: %v/%v", resp.Term, resp.Success)
}
}
@ -348,7 +303,7 @@ func TestServerSingleNode(t *testing.T) {
}
server.StartLeader()
time.Sleep(200 * time.Millisecond)
time.Sleep(50 * time.Millisecond)
// Join the server to itself.
if _, err := server.Do(&joinCommand{Name: "1"}); err != nil {
@ -374,23 +329,19 @@ func TestServerMultiNode(t *testing.T) {
servers := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
s := servers[peer.name]
resp, err := s.RequestVote(req)
return resp, err
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return servers[peer.name].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
s := servers[peer.name]
resp, err := s.AppendEntries(req)
return resp, err
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return servers[peer.name].AppendEntries(req)
}
disTransporter := &testTransporter{}
disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return nil, nil
disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return nil
}
disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
return nil, nil
disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return nil
}
var names []string
@ -492,7 +443,7 @@ func TestServerMultiNode(t *testing.T) {
debugln("retry")
retry++
leader = 0
Debug = true
LogLevel = Debug
time.Sleep(100 * time.Millisecond)
continue
}
@ -509,7 +460,7 @@ func TestServerMultiNode(t *testing.T) {
t.Fatalf("wrong leader number %v", leader)
}
if leader == 1 {
Debug = false
LogLevel = 0
break
}
}

12
test.go
View File

@ -96,20 +96,20 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
//--------------------------------------
type testTransporter struct {
sendVoteRequestFunc func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error)
sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error)
sendSnapshotRequestFunc func(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error)
sendVoteRequestFunc func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
sendAppendEntriesRequestFunc func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
sendSnapshotRequestFunc func(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
}
func (t *testTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
func (t *testTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return t.sendVoteRequestFunc(server, peer, req)
}
func (t *testTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
func (t *testTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return t.sendAppendEntriesRequestFunc(server, peer, req)
}
func (t *testTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error) {
func (t *testTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
return t.sendSnapshotRequestFunc(server, peer, req)
}

View File

@ -9,7 +9,7 @@ package raft
// Transporter is the interface for allowing the host application to transport
// requests to other nodes.
type Transporter interface {
SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error)
SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error)
SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) (*SnapshotResponse, error)
SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
}