Add multi-node failure with re-election test.

pull/820/head
Ben Johnson 2013-05-05 15:41:55 -06:00
parent 9de292c636
commit f3441b8bfb
5 changed files with 214 additions and 50 deletions

67
peer.go
View File

@ -29,11 +29,16 @@ type Peer struct {
// Creates a new peer.
func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
return &Peer{
p := &Peer{
server: server,
name: name,
heartbeatTimer: NewTimer(heartbeatTimeout, heartbeatTimeout),
}
// Start the heartbeat timeout.
go p.heartbeatTimeoutFunc()
return p
}
//------------------------------------------------------------------------------
@ -63,6 +68,35 @@ func (p *Peer) SetHeartbeatTimeout(duration time.Duration) {
//
//------------------------------------------------------------------------------
//--------------------------------------
// State
//--------------------------------------
// Resumes the peer heartbeating.
func (p *Peer) resume() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.heartbeatTimer.Reset()
}
// Pauses the peer to prevent heartbeating.
func (p *Peer) pause() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.heartbeatTimer.Pause()
}
// Stops the peer entirely.
func (p *Peer) stop() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.heartbeatTimer.Stop()
}
//--------------------------------------
// Flush
//--------------------------------------
// Generates and sends an AppendEntries RPC from the server for the peer.
// This serves to replicate the log and to provide a heartbeat mechanism. It
// returns the current term from the peer, whether the flush was successful
@ -94,6 +128,7 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest, handler func(*Server,
// log. Send the request through the user-provided handler and process the
// result.
resp, err := handler(p.server, p, req)
p.heartbeatTimer.Reset()
if resp == nil {
return 0, false, err
}
@ -113,3 +148,33 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest, handler func(*Server,
return resp.Term, resp.Success, err
}
//--------------------------------------
// Heartbeat
//--------------------------------------
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeatTimeoutFunc() {
for {
// Grab the current timer channel.
p.mutex.Lock()
var c chan time.Time
if p.heartbeatTimer != nil {
c = p.heartbeatTimer.C()
}
p.mutex.Unlock()
// If the channel or timer are gone then exit.
if c == nil {
break
}
// Flush the peer when we get a heartbeat timeout. If the channel is
// closed then the peer is getting cleaned up and we should exit.
if _, ok := <- c; ok {
p.flush()
} else {
break
}
}
}

View File

@ -159,6 +159,26 @@ func (s *Server) SetElectionTimeout(duration time.Duration) {
s.electionTimer.SetMaxDuration(duration * 2)
}
//--------------------------------------
// Heartbeat timeout
//--------------------------------------
// Retrieves the heartbeat timeout.
func (s *Server) HeartbeatTimeout() time.Duration {
return s.heartbeatTimeout
}
// Sets the heartbeat timeout.
func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.heartbeatTimeout = duration
for _, peer := range s.peers {
peer.SetHeartbeatTimeout(duration)
}
}
//------------------------------------------------------------------------------
//
// Methods
@ -187,6 +207,12 @@ func (s *Server) Start() error {
// Update the state.
s.state = Follower
for _, peer := range s.peers {
peer.pause()
}
// Start the election timeout.
go s.electionTimeoutFunc()
return nil
}
@ -200,6 +226,8 @@ func (s *Server) Stop() {
// Unloads the server.
func (s *Server) unload() {
s.electionTimer.Stop()
if s.log != nil {
s.log.Close()
s.log = nil
@ -321,12 +349,20 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
s.mutex.Lock()
defer s.mutex.Unlock()
// If the server is stopped then reject it.
if !s.Running() {
return NewAppendEntriesResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Server stopped")
}
// If the request is coming from an old term then reject it.
if req.Term < s.currentTerm {
return NewAppendEntriesResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale request term")
}
s.setCurrentTerm(req.Term)
s.state = Follower
for _, peer := range s.peers {
peer.pause()
}
// Reset election timeout.
s.electionTimer.Reset()
@ -479,9 +515,11 @@ func (s *Server) promoteToLeader(term uint64, lastLogIndex uint64, lastLogTerm u
return false
}
// Move server to become a leader.
// Move server to become a leader and begin peer heartbeats.
s.state = Leader
// TODO: Begin heartbeat to peers.
for _, peer := range s.peers {
peer.resume()
}
return true
}
@ -493,31 +531,31 @@ func (s *Server) promoteToLeader(term uint64, lastLogIndex uint64, lastLogTerm u
// Requests a vote from a server. A vote can be obtained if the vote's term is
// at the server's current term and the server has not made a vote yet. A vote
// can also be obtained if the term is greater than the server's current term.
func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
// If the request is coming from an old term then reject it.
if req.Term < s.currentTerm {
return NewRequestVoteResponse(s.currentTerm, false)
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm)
}
s.setCurrentTerm(req.Term)
// If we've already voted for a different candidate then don't vote for this candidate.
if s.votedFor != "" && s.votedFor != req.CandidateName {
return NewRequestVoteResponse(s.currentTerm, false)
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor)
}
// If the candidate's log is not at least as up-to-date as our committed log then don't vote.
lastCommitIndex, lastCommitTerm := s.log.CommitInfo()
if lastCommitIndex > req.LastLogIndex || lastCommitTerm > req.LastLogTerm {
return NewRequestVoteResponse(s.currentTerm, false)
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Out-of-date log: [%v/%v] > [%v/%v]", lastCommitIndex, lastCommitTerm, req.LastLogIndex, req.LastLogTerm)
}
// If we made it this far then cast a vote and reset our election time out.
s.votedFor = req.CandidateName
s.electionTimer.Reset()
return NewRequestVoteResponse(s.currentTerm, true)
return NewRequestVoteResponse(s.currentTerm, true), nil
}
// Executes the handler for sending a RequestVote RPC.
@ -536,6 +574,35 @@ func (s *Server) setCurrentTerm(term uint64) {
s.currentTerm = term
s.votedFor = ""
s.state = Follower
for _, peer := range s.peers {
peer.pause()
}
}
}
// Listens to the election timeout and kicks off a new election.
func (s *Server) electionTimeoutFunc() {
for {
// Grab the current timer channel.
s.mutex.Lock()
var c chan time.Time
if s.electionTimer != nil {
c = s.electionTimer.C()
}
s.mutex.Unlock()
// If the channel or timer are gone then exit.
if c == nil {
break
}
// If an election times out then promote this server. If the channel
// closes then that means the server has stopped so kill the function.
if _, ok := <- c; ok {
s.promote()
} else {
break
}
}
}
@ -557,8 +624,9 @@ func (s *Server) Join(name string) error {
// If joining self then promote to leader.
if s.name == name {
s.currentTerm++
s.state = Leader
// TODO: Begin heartbeat to peers.
s.electionTimer.Pause()
return nil
}

View File

@ -20,9 +20,9 @@ import (
// Ensure that we can request a vote from a server that has not voted.
func TestServerRequestVote(t *testing.T) {
server := newTestServer("1")
resp := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0))
if !(resp.Term == 1 && resp.VoteGranted) {
t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0))
if !(resp.Term == 1 && resp.VoteGranted && err == nil) {
t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err)
}
}
@ -31,9 +31,9 @@ func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
server := newTestServer("1")
server.state = Leader
server.currentTerm = 2
resp := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0))
if !(resp.Term == 2 && !resp.VoteGranted) {
t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0))
if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Stale term: 1 < 2") {
t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err)
}
if server.currentTerm != 2 && server.state != Follower {
t.Fatalf("Server did not update term and demote: %v / %v", server.currentTerm, server.state)
@ -44,13 +44,13 @@ func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
server := newTestServer("1")
server.currentTerm = 2
resp := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0))
if !(resp.Term == 2 && resp.VoteGranted) {
t.Fatalf("First vote should not have been denied")
resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0))
if !(resp.Term == 2 && resp.VoteGranted && err == nil) {
t.Fatalf("First vote should not have been denied (%v)", err)
}
resp = server.RequestVote(NewRequestVoteRequest(2, "bar", 0, 0))
if !(resp.Term == 2 && !resp.VoteGranted) {
t.Fatalf("Second vote should have been denied")
resp, err = server.RequestVote(NewRequestVoteRequest(2, "bar", 0, 0))
if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Already voted for foo") {
t.Fatalf("Second vote should have been denied (%v)", err)
}
}
@ -58,13 +58,13 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
server := newTestServer("1")
server.currentTerm = 2
resp := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0))
if !(resp.Term == 2 && resp.VoteGranted && server.VotedFor() == "foo") {
t.Fatalf("First vote should not have been denied")
resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0))
if !(resp.Term == 2 && resp.VoteGranted && server.VotedFor() == "foo" && err == nil) {
t.Fatalf("First vote should not have been denied (%v)", err)
}
resp = server.RequestVote(NewRequestVoteRequest(3, "bar", 0, 0))
if !(resp.Term == 3 && resp.VoteGranted && server.VotedFor() == "bar") {
t.Fatalf("Second vote should have been approved")
resp, err = server.RequestVote(NewRequestVoteRequest(3, "bar", 0, 0))
if !(resp.Term == 3 && resp.VoteGranted && server.VotedFor() == "bar" && err == nil) {
t.Fatalf("Second vote should have been approved (%v)", err)
}
}
@ -76,21 +76,21 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
`6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n")
server.Start()
resp := server.RequestVote(NewRequestVoteRequest(1, "foo", 2, 2))
if !(resp.Term == 1 && !resp.VoteGranted) {
t.Fatalf("Stale index vote should have been denied")
resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 2, 2))
if !(resp.Term == 1 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [2/2]") {
t.Fatalf("Stale index vote should have been denied (%v)", err)
}
resp = server.RequestVote(NewRequestVoteRequest(1, "foo", 3, 1))
if !(resp.Term == 1 && !resp.VoteGranted) {
t.Fatalf("Stale term vote should have been denied")
resp, err = server.RequestVote(NewRequestVoteRequest(1, "foo", 3, 1))
if !(resp.Term == 1 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [3/1]") {
t.Fatalf("Stale term vote should have been denied (%v)", err)
}
resp = server.RequestVote(NewRequestVoteRequest(1, "foo", 3, 2))
if !(resp.Term == 1 && resp.VoteGranted) {
t.Fatalf("Matching log vote should have been granted")
resp, err = server.RequestVote(NewRequestVoteRequest(1, "foo", 3, 2))
if !(resp.Term == 1 && resp.VoteGranted && err == nil) {
t.Fatalf("Matching log vote should have been granted (%v)", err)
}
resp = server.RequestVote(NewRequestVoteRequest(1, "foo", 4, 3))
if !(resp.Term == 1 && resp.VoteGranted) {
t.Fatalf("Ahead-of-log vote should have been granted")
resp, err = server.RequestVote(NewRequestVoteRequest(1, "foo", 4, 3))
if !(resp.Term == 1 && resp.VoteGranted && err == nil) {
t.Fatalf("Ahead-of-log vote should have been granted (%v)", err)
}
}
@ -111,7 +111,7 @@ func TestServerPromoteSelf(t *testing.T) {
func TestServerPromote(t *testing.T) {
servers, lookup := newTestCluster([]string{"1", "2", "3"})
servers.SetRequestVoteHandler(func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req), nil
return lookup[peer.Name()].RequestVote(req)
})
leader := servers[0]
if success, err := leader.promote(); !(success && err == nil && leader.state == Leader) {
@ -125,7 +125,7 @@ func TestServerPromoteDoubleElection(t *testing.T) {
lookup["2"].currentTerm, lookup["2"].votedFor = 1, "2"
lookup["3"].currentTerm, lookup["3"].votedFor = 1, "3"
servers.SetRequestVoteHandler(func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req), nil
return lookup[peer.Name()].RequestVote(req)
})
leader := servers[0]
if success, err := leader.promote(); !(success && err == nil && leader.state == Leader && leader.currentTerm == 2) {
@ -290,12 +290,20 @@ func TestServerMultiNode(t *testing.T) {
servers := map[string]*Server{}
for _, name := range names {
server := newTestServer(name)
server.SetElectionTimeout(TestElectionTimeout)
server.SetHeartbeatTimeout(TestHeartbeatTimeout)
server.DoHandler = func(server *Server, peer *Peer, command Command) error {
mutex.Lock()
s := servers[peer.name]
mutex.Unlock()
return s.Do(command)
}
server.RequestVoteHandler = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
mutex.Lock()
s := servers[peer.name]
mutex.Unlock()
return s.RequestVote(req)
}
server.AppendEntriesHandler = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
mutex.Lock()
s := servers[peer.name]
@ -317,11 +325,23 @@ func TestServerMultiNode(t *testing.T) {
// Check that two peers exist on leader.
mutex.Lock()
defer mutex.Unlock()
leader := servers["1"]
if leader.MemberCount() != 3 {
t.Fatalf("Expected member count to be 3, got %v", leader.MemberCount())
}
mutex.Unlock()
// Stop the first server and wait for a re-election.
time.Sleep(500 * time.Millisecond)
leader.Stop()
time.Sleep(500 * time.Millisecond)
// Check that either server 2 or 3 is the leader now.
mutex.Lock()
if servers["2"].state != Leader && servers["3"].state != Leader {
t.Fatalf("Expected leader re-election: 2=%v, 3=%v", servers["2"].state, servers["3"].state)
}
mutex.Unlock()
// Stop the servers.
for _, server := range servers {

View File

@ -16,7 +16,7 @@ import (
// reset and stop. It also allows for the duration of the timer to be a random
// number between a min and max duration.
type Timer struct {
C chan time.Time
c chan time.Time
rand *rand.Rand
minDuration time.Duration
maxDuration time.Duration
@ -42,7 +42,7 @@ func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer {
panic("raft.Timer: Minimum duration cannot be greater than maximum duration")
}
return &Timer{
C: make(chan time.Time, 1),
c: make(chan time.Time, 1),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
minDuration: minDuration,
maxDuration: maxDuration,
@ -55,6 +55,13 @@ func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer {
//
//------------------------------------------------------------------------------
// Retrieves the timer's channel.
func (t *Timer) C() chan time.Time {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.c
}
// Retrieves the minimum duration of the timer.
func (t *Timer) MinDuration() time.Duration {
return t.minDuration
@ -105,8 +112,9 @@ func (t *Timer) Stop() {
t.internalTimer = nil
}
if t.C != nil {
close(t.C)
if t.c != nil {
close(t.c)
t.c = nil
}
}
@ -132,7 +140,10 @@ func (t *Timer) Reset() {
}
// Start a timer that will go off between the min and max duration.
d := t.minDuration + time.Duration(t.rand.Int63n(int64(t.maxDuration-t.minDuration)))
d := t.minDuration
if t.maxDuration > t.minDuration {
d += time.Duration(t.rand.Int63n(int64(t.maxDuration-t.minDuration)))
}
t.internalTimer = time.NewTimer(d)
go func() {
defer func() {
@ -149,7 +160,7 @@ func (t *Timer) Reset() {
if internalTimer != nil {
if v, ok := <-internalTimer.C; ok {
t.mutex.Lock()
t.C <- v
t.c <- v
t.mutex.Unlock()
}
}

View File

@ -19,7 +19,7 @@ func TestTimerReset(t *testing.T) {
timer := NewTimer(5*time.Millisecond, 10*time.Millisecond)
go func() {
for {
if _, ok := <-timer.C; ok {
if _, ok := <-timer.C(); ok {
mutex.Lock()
count++
timer.Reset()
@ -62,7 +62,7 @@ func TestTimerPause(t *testing.T) {
count := 0
timer := NewTimer(10*time.Millisecond, 20*time.Millisecond)
go func() {
<-timer.C
<-timer.C()
mutex.Lock()
count++
mutex.Unlock()