Update term change immediately, only send signal.

pull/2236/head
Ben Johnson 2015-04-09 17:07:12 -06:00
parent 61cd46333a
commit c757039f70
1 changed files with 38 additions and 67 deletions

View File

@ -154,7 +154,7 @@ type Log struct {
// Incoming heartbeats and term changes go to these channels
// and are picked up by the current state.
heartbeats chan heartbeat
terms chan uint64
terms chan struct{}
// Close notification and wait.
wg sync.WaitGroup
@ -201,7 +201,7 @@ func NewLog() *Log {
Transport: &HTTPTransport{},
Rand: rand.NewSource(time.Now().UnixNano()).Int63,
heartbeats: make(chan heartbeat, 10),
terms: make(chan uint64, 10),
terms: make(chan struct{}, 1),
Logger: log.New(os.Stderr, "[raft] ", log.LstdFlags),
}
l.updateLogPrefix()
@ -538,7 +538,11 @@ func (l *Log) setTerm(term uint64) error {
}
// mustSetTerm sets the current term and clears the vote. Panic on error.
func (l *Log) mustSetTerm(term uint64) {
func (l *Log) mustSetTermIfHigher(term uint64) {
if term <= l.term {
return
}
if err := l.setTerm(term); err != nil {
panic("unable to set term: " + err.Error())
}
@ -863,21 +867,12 @@ func (l *Log) followerLoop(closing <-chan struct{}) State {
// Update term, commit index & leader.
l.mu.Lock()
if hb.term > l.term {
l.mustSetTerm(hb.term)
}
l.mustSetTermIfHigher(hb.term)
if hb.commitIndex > l.commitIndex {
l.commitIndex = hb.commitIndex
}
l.leaderID = hb.leaderID
l.mu.Unlock()
case term := <-l.terms:
l.mu.Lock()
if term > l.term {
l.mustSetTerm(term)
}
l.mu.Unlock()
}
}
}
@ -960,9 +955,14 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
// Increment term and request votes.
l.mu.Lock()
l.mustSetTerm(l.term + 1)
l.mustSetTermIfHigher(l.term + 1)
l.votedFor = l.id
term := l.term
select {
case <-l.terms:
default:
}
l.mu.Unlock()
// Ensure all candidate goroutines complete before transitioning to another state.
@ -981,27 +981,13 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State {
return Stopped
case hb := <-l.heartbeats:
l.mu.Lock()
if hb.term >= term {
l.mustSetTerm(hb.term)
l.mustSetTermIfHigher(hb.term)
if hb.term >= l.term {
l.leaderID = hb.leaderID
l.mu.Unlock()
return Follower
}
l.mu.Unlock()
case newTerm := <-l.terms:
// Ignore if it's not after this current term.
if newTerm <= term {
continue
}
// Check against the current term since that may have changed.
l.mu.Lock()
if newTerm > l.term {
l.mustSetTerm(newTerm)
l.mu.Unlock()
return Follower
}
l.mu.Unlock()
case <-l.terms:
return Follower
case <-elected:
return Leader
case ch := <-l.Clock.AfterElectionTimeout():
@ -1034,12 +1020,11 @@ func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup) {
peerTerm, err := l.Transport.RequestVote(n.URL, term, id, lastLogIndex, lastLogTerm)
l.Logger.Printf("send req vote(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (term=%d, err=%v)", term, id, lastLogIndex, lastLogTerm, peerTerm, err)
// If an error occured then send the peer's term.
// If an error occured then update term.
if err != nil {
select {
case l.terms <- peerTerm:
default:
}
l.mu.Lock()
l.mustSetTermIfHigher(peerTerm)
l.mu.Unlock()
return
}
votes <- struct{}{}
@ -1078,6 +1063,11 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
// Retrieve leader's term.
l.mu.Lock()
term := l.term
select {
case <-l.terms:
default:
}
l.mu.Unlock()
// Read log from leader in a separate goroutine.
@ -1092,25 +1082,16 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
case <-closing: // wait for state change.
return Stopped
case newTerm := <-l.terms: // step down on higher term
if newTerm > term {
l.mu.Lock()
l.mustSetTerm(newTerm)
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower
}
continue
case <-l.terms: // step down on higher term
l.mu.Lock()
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower
case hb := <-l.heartbeats: // step down on higher term
if hb.term > term {
l.mu.Lock()
l.mustSetTerm(hb.term)
l.truncateTo(l.commitIndex)
l.mu.Unlock()
return Follower
}
continue
case hb := <-l.heartbeats: // update term, if necessary
l.mu.Lock()
l.mustSetTermIfHigher(hb.term)
l.mu.Unlock()
case commitIndex, ok := <-committed:
// Quorum not reached, try again.
@ -1614,14 +1595,7 @@ func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (
}
// Notify term change.
l.term = term
l.votedFor = 0
if term > l.term {
select {
case l.terms <- term:
default:
}
}
l.mustSetTermIfHigher(term)
// Reject request if log is out of date.
if lastLogTerm < l.lastLogTerm {
@ -1675,10 +1649,7 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error
if l.state != Leader {
return nil, ErrNotLeader
} else if term > l.term {
select {
case l.terms <- term:
default:
}
l.mustSetTermIfHigher(term)
return nil, ErrNotLeader
}