Update higher term on leader during stream request.

This commit adds a check for newer terms in the streaming endpoint.
pull/1872/head
Ben Johnson 2015-03-06 11:12:54 -07:00
parent 20b964b496
commit cae0e85ed7
1 changed files with 19 additions and 2 deletions

View File

@ -82,6 +82,7 @@ type Log struct {
state State // current node state
heartbeats chan heartbeat // incoming heartbeat channel
terms chan uint64 // incoming channel of newer terms
lastLogTerm uint64 // highest term in the log
lastLogIndex uint64 // highest index in the log
@ -143,6 +144,7 @@ func NewLog() *Log {
Transport: &HTTPTransport{},
Rand: rand.NewSource(time.Now().UnixNano()).Int63,
heartbeats: make(chan heartbeat, 1),
terms: make(chan uint64, 1),
}
l.SetLogOutput(os.Stderr)
return l
@ -880,9 +882,20 @@ func (l *Log) leaderLoop(closing <-chan struct{}) State {
case <-closing: // wait for state change.
return Stopped
case newTerm := <-l.terms: // step down on higher term
if newTerm > term {
l.mu.Lock()
l.term = newTerm
l.truncate()
l.mu.Unlock()
return Follower
}
continue
case hb := <-l.heartbeats: // step down on higher term
if hb.term > term {
l.mu.Lock()
l.term = hb.term
l.truncate()
l.mu.Unlock()
return Follower
@ -1461,12 +1474,16 @@ func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error
// Do not begin streaming if:
// 1. Node is not the leader.
// 2. Term is earlier than current term.
// 2. Term is after current term.
// 3. Index is after the commit index.
if l.state != Leader {
return nil, ErrNotLeader
} else if term > l.term {
return nil, ErrStaleTerm
select {
case l.terms <- term:
default:
}
return nil, ErrNotLeader
} else if index > l.lastLogIndex {
return nil, ErrUncommittedIndex
}