From cae0e85ed75cb6fde220a1452156bd7b1fde0e08 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 6 Mar 2015 11:12:54 -0700 Subject: [PATCH] Update higher term on leader during stream request. This commit adds a check for newer terms in the streaming endpoint. --- raft/log.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/raft/log.go b/raft/log.go index 026869d628..9c16025804 100644 --- a/raft/log.go +++ b/raft/log.go @@ -82,6 +82,7 @@ type Log struct { state State // current node state heartbeats chan heartbeat // incoming heartbeat channel + terms chan uint64 // incoming channel of newer terms lastLogTerm uint64 // highest term in the log lastLogIndex uint64 // highest index in the log @@ -143,6 +144,7 @@ func NewLog() *Log { Transport: &HTTPTransport{}, Rand: rand.NewSource(time.Now().UnixNano()).Int63, heartbeats: make(chan heartbeat, 1), + terms: make(chan uint64, 1), } l.SetLogOutput(os.Stderr) return l @@ -880,9 +882,20 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State { case <-closing: // wait for state change. return Stopped + case newTerm := <-l.terms: // step down on higher term + if newTerm > term { + l.mu.Lock() + l.term = newTerm + l.truncate() + l.mu.Unlock() + return Follower + } + continue + case hb := <-l.heartbeats: // step down on higher term if hb.term > term { l.mu.Lock() + l.term = hb.term l.truncate() l.mu.Unlock() return Follower @@ -1461,12 +1474,16 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error // Do not begin streaming if: // 1. Node is not the leader. - // 2. Term is earlier than current term. + // 2. Term is after current term. // 3. Index is after the commit index. if l.state != Leader { return nil, ErrNotLeader } else if term > l.term { - return nil, ErrStaleTerm + select { + case l.terms <- term: + default: + } + return nil, ErrNotLeader } else if index > l.lastLogIndex { return nil, ErrUncommittedIndex }