pull/820/head
Ben Johnson 2013-07-07 14:55:55 -06:00
parent 08e2d519ae
commit 743b684370
2 changed files with 26 additions and 31 deletions

28
peer.go
View File

@ -13,11 +13,11 @@ import (
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
server *Server
name string
prevLogIndex uint64
mutex sync.Mutex
stopChan chan bool
server *Server
name string
prevLogIndex uint64
mutex sync.Mutex
stopChan chan bool
heartbeatTimeout time.Duration
}
@ -30,9 +30,9 @@ type Peer struct {
// Creates a new peer.
func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
return &Peer{
server: server,
name: name,
stopChan: make(chan bool),
server: server,
name: name,
stopChan: make(chan bool),
heartbeatTimeout: heartbeatTimeout,
}
}
@ -53,7 +53,6 @@ func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
p.heartbeatTimeout = duration
}
//------------------------------------------------------------------------------
//
// Methods
@ -68,7 +67,7 @@ func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
func (p *Peer) startHeartbeat() {
c := make(chan bool)
go p.heartbeat(c)
<- c
<-c
}
// Stops the peer heartbeat.
@ -103,7 +102,7 @@ func (p *Peer) heartbeat(c chan bool) {
select {
case <-p.stopChan:
return
case <-time.After(p.heartbeatTimeout):
p.flush()
}
@ -142,8 +141,8 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
}
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
} else {
// we may miss a response from peer
if resp.CommitIndex >= p.prevLogIndex {
@ -180,7 +179,7 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
} else {
debugln("peer.snap.failed: ", p.name)
}
// Send response to server for processing.
p.server.send(resp)
}
@ -198,4 +197,3 @@ func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteRespo
c <- resp
}
}

View File

@ -59,12 +59,12 @@ type Server struct {
context interface{}
currentTerm uint64
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.Mutex
stateMutex sync.Mutex
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.Mutex
stateMutex sync.Mutex
commitCount int
electionTimer *timer
@ -347,7 +347,6 @@ func (s *Server) setCurrentTerm(term uint64, leaderName string) {
}
}
//--------------------------------------
// Event Loop
//--------------------------------------
@ -423,11 +422,11 @@ func (s *Server) followerLoop() {
// Callback to event.
e.c <- err
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
s.state = Candidate
}
// Exit loop on state change.
if s.state != Follower {
break
@ -480,7 +479,7 @@ func (s *Server) candidateLoop() {
break
}
case e := <- s.c:
case e := <-s.c:
var err error
if e.target == &stopValue {
s.state = Stopped
@ -500,7 +499,7 @@ func (s *Server) candidateLoop() {
break
}
}
if s.state != Candidate {
break
}
@ -543,7 +542,7 @@ func (s *Server) leaderLoop() {
// Callback to event.
e.c <- err
}
// Exit loop on state change.
if s.state != Leader {
break
@ -649,12 +648,12 @@ func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
s.setCurrentTerm(resp.Term, "")
return
}
// Ignore response if it's not successful.
if !resp.Success {
return
}
// Increment the commit count to make sure we have a quorum before committing.
s.commitCount++
if s.commitCount < s.QuorumSize() {
@ -977,7 +976,6 @@ func (s *Server) LoadSnapshot() error {
return err
}
//--------------------------------------
// Debugging
//--------------------------------------
@ -989,4 +987,3 @@ func (s *Server) debugln(v ...interface{}) {
func (s *Server) traceln(v ...interface{}) {
tracef("[%s] %s", s.name, fmt.Sprintln(v...))
}