From 9ff2969d38ce55356df377cbd5c8f6fe3ae90892 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 20 Feb 2015 13:52:12 -0700 Subject: [PATCH 1/2] Add set up code for raft internal tests. --- raft/clock.go | 3 +- raft/internal_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++ raft/log.go | 2 +- raft/log_test.go | 36 +++++------------------ 4 files changed, 79 insertions(+), 30 deletions(-) diff --git a/raft/clock.go b/raft/clock.go index d85fe76ece..27ba32ffa7 100644 --- a/raft/clock.go +++ b/raft/clock.go @@ -43,7 +43,8 @@ func (c *Clock) AfterApplyInterval() <-chan chan struct{} { return newClockChan( // AfterElectionTimeout returns a channel that fires after a duration that is // between the election timeout and double the election timeout. func (c *Clock) AfterElectionTimeout() <-chan chan struct{} { - return newClockChan(c.ElectionTimeout + time.Duration(rand.Intn(int(c.ElectionTimeout)))) + d := c.ElectionTimeout + time.Duration(rand.Intn(int(c.ElectionTimeout))) + return newClockChan(d) } // AfterHeartbeatInterval returns a channel that fires after the heartbeat interval. diff --git a/raft/internal_test.go b/raft/internal_test.go index 1b8575bc9b..a84cf0bd64 100644 --- a/raft/internal_test.go +++ b/raft/internal_test.go @@ -1,5 +1,73 @@ package raft +import ( + "encoding/binary" + "io" + "io/ioutil" + "os" + "testing" +) + +func TestLog_followerLoop(t *testing.T) { + l := NewInitializedLog() + defer CloseLog(l) +} + func (l *Log) WaitUncommitted(index uint64) error { return l.waitUncommitted(index) } func (l *Log) WaitCommitted(index uint64) error { return l.waitCommitted(index) } func (l *Log) WaitApplied(index uint64) error { return l.Wait(index) } + +// NewOpenedLog returns an opened Log. Panic on error. +func NewOpenedLog() *Log { + l := NewLog() + l.FSM = &IndexFSM{} + if err := l.Open(tempfile()); err != nil { + panic(err.Error()) + } + return l +} + +// NewInitializedLog returns an opened & initialized Log. Panic on error. +func NewInitializedLog() *Log { + l := NewOpenedLog() + if err := l.Initialize(); err != nil { + panic(err.Error()) + } + return l +} + +// CloseLog closes a log and deletes its underlying path. +func CloseLog(l *Log) { + defer os.RemoveAll(l.Path()) + l.Close() +} + +// IndexFSM represents a state machine that only records the last applied index. +type IndexFSM struct { + index uint64 +} + +// MustApply updates the index. +func (fsm *IndexFSM) MustApply(entry *LogEntry) { fsm.index = entry.Index } + +// Index returns the highest applied index. +func (fsm *IndexFSM) Index() (uint64, error) { return fsm.index, nil } + +// Snapshot writes the FSM's index as the snapshot. +func (fsm *IndexFSM) Snapshot(w io.Writer) (uint64, error) { + return fsm.index, binary.Write(w, binary.BigEndian, fsm.index) +} + +// Restore reads the snapshot from the reader. +func (fsm *IndexFSM) Restore(r io.Reader) error { + return binary.Read(r, binary.BigEndian, &fsm.index) +} + +// tempfile returns the path to a non-existent file in the temp directory. +func tempfile() string { + f, _ := ioutil.TempFile("", "raft-") + path := f.Name() + f.Close() + os.Remove(path) + return path +} diff --git a/raft/log.go b/raft/log.go index 0f62773cfe..4eec38798b 100644 --- a/raft/log.go +++ b/raft/log.go @@ -1386,7 +1386,7 @@ func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) ( return ErrAlreadyVoted } else if lastLogTerm < l.lastLogTerm { return ErrOutOfDateLog - } else if lastLogTerm == l.term && lastLogIndex < l.lastLogIndex { + } else if lastLogTerm == l.lastLogTerm && lastLogIndex < l.lastLogIndex { return ErrOutOfDateLog } diff --git a/raft/log_test.go b/raft/log_test.go index 001a5e4362..6d18fc8945 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -297,8 +297,9 @@ func TestCluster_Elect_RealTime(t *testing.T) { // Verify FSM indicies match. for i, l := range c.Logs { - if exp, fsm := commandN+minIndex, l.FSM.(*IndexFSM); exp != fsm.index { - t.Errorf("fsm index mismatch(%d): exp=%d, got=%d", i, exp, fsm.index) + fsmIndex, _ := l.FSM.(*raft.IndexFSM).Index() + if exp := commandN + minIndex; exp != fsmIndex { + t.Errorf("fsm index mismatch(%d): exp=%d, got=%d", i, exp, fsmIndex) } } } @@ -333,35 +334,13 @@ func benchmarkClusterApply(b *testing.B, logN int) { // Verify FSM indicies match. for i, l := range c.Logs { - if fsm := l.FSM.(*IndexFSM); index != fsm.index { - b.Errorf("fsm index mismatch(%d): exp=%d, got=%d", i, index, fsm.index) + fsmIndex, _ := l.FSM.(*raft.IndexFSM).Index() + if index != fsmIndex { + b.Errorf("fsm index mismatch(%d): exp=%d, got=%d", i, index, fsmIndex) } } } -// IndexFSM represents a state machine that only records the last applied index. -type IndexFSM struct { - index uint64 -} - -// MustApply updates the index. -func (fsm *IndexFSM) MustApply(entry *raft.LogEntry) { fsm.index = entry.Index } - -// Index returns the highest applied index. -func (fsm *IndexFSM) Index() (uint64, error) { return fsm.index, nil } - -// Snapshot writes the FSM's index as the snapshot. -func (fsm *IndexFSM) Snapshot(w io.Writer) (uint64, error) { - return fsm.index, binary.Write(w, binary.BigEndian, fsm.index) -} - -// Restore reads the snapshot from the reader. -func (fsm *IndexFSM) Restore(r io.Reader) error { - return binary.Read(r, binary.BigEndian, &fsm.index) -} - -func indexFSMFunc() raft.FSM { return &IndexFSM{} } - // Cluster represents a collection of nodes that share the same mock clock. type Cluster struct { Logs []*Log @@ -643,7 +622,8 @@ func (fsm *FSM) Restore(r io.Reader) error { return nil } -func fsmFunc() raft.FSM { return &FSM{} } +func fsmFunc() raft.FSM { return &FSM{} } +func indexFSMFunc() raft.FSM { return &raft.IndexFSM{} } // seq implements the raft.Log#Rand interface and returns incrementing ints. func seq() func() int64 { From a7a8e0bba4135174fd38a984ba01ae1af301db98 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 20 Feb 2015 13:52:45 -0700 Subject: [PATCH 2/2] Fix candidate heartbeat step down. --- raft/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/log.go b/raft/log.go index 4eec38798b..9c674b6e20 100644 --- a/raft/log.go +++ b/raft/log.go @@ -788,7 +788,7 @@ func (l *Log) candidateLoop(closing <-chan struct{}) State { return Stopped case hb := <-l.heartbeats: l.mu.Lock() - if hb.term > l.term { + if hb.term >= l.term { l.term = hb.term l.votedFor = 0 l.leaderID = hb.leaderID