Fix timer race condition.

pull/820/head
Ben Johnson 2013-06-03 19:16:50 -04:00
parent ba9b7739e4
commit c544519c7c
4 changed files with 61 additions and 16 deletions

View File

@ -150,6 +150,10 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeatTimeoutFunc() {
// Initialize the timer here since there can be a delay before this
// goroutine actually starts.
p.heartbeatTimer.Reset()
for {
// Grab the current timer channel.
p.mutex.Lock()

View File

@ -265,7 +265,6 @@ func (s *Server) Start() error {
// Start the election timeout.
go s.electionTimeoutFunc()
s.electionTimer.Reset()
return nil
}
@ -280,7 +279,10 @@ func (s *Server) Stop() {
// Unloads the server.
func (s *Server) unload() {
// Kill the election timer.
s.electionTimer.Stop()
if s.electionTimer != nil {
s.electionTimer.Stop()
s.electionTimer = nil
}
// Remove peers.
for _, peer := range s.peers {
@ -369,8 +371,12 @@ func (s *Server) do(command Command) error {
if err != nil {
return
} else if term > currentTerm {
s.mutex.Lock()
s.setCurrentTerm(term)
s.electionTimer.Reset()
if s.electionTimer != nil {
s.electionTimer.Reset()
}
s.mutex.Unlock()
return
}
@ -433,7 +439,9 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
s.leader = req.LeaderName
// Reset election timeout.
s.electionTimer.Reset()
if s.electionTimer != nil {
s.electionTimer.Reset()
}
// Reject if log doesn't contain a matching previous entry.
if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
@ -481,7 +489,10 @@ func (s *Server) createInternalAppendEntriesRequest(prevLogIndex uint64) *Append
func (s *Server) promote() (bool, error) {
for {
// Start a new election.
term, lastLogIndex, lastLogTerm := s.promoteToCandidate()
term, lastLogIndex, lastLogTerm, err := s.promoteToCandidate()
if err != nil {
return false, err
}
// Request votes from each of our peers.
c := make(chan *RequestVoteResponse, len(s.peers))
@ -521,8 +532,12 @@ func (s *Server) promote() (bool, error) {
if resp != nil {
// Step down if we discover a higher term.
if resp.Term > term {
s.mutex.Lock()
s.setCurrentTerm(term)
s.electionTimer.Reset()
if s.electionTimer != nil {
s.electionTimer.Reset()
}
s.mutex.Unlock()
return false, fmt.Errorf("raft.Server: Higher term discovered, stepping down: (%v > %v)", resp.Term, term)
}
votes[resp.peer.Name()] = resp.VoteGranted
@ -551,10 +566,15 @@ func (s *Server) promote() (bool, error) {
// Promotes the server to a candidate and increases the election term. The
// term and log state are returned for use in the RPCs.
func (s *Server) promoteToCandidate() (term uint64, lastLogIndex uint64, lastLogTerm uint64) {
func (s *Server) promoteToCandidate() (uint64, uint64, uint64, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
// Ignore promotion if the server is not a follower.
if s.state != Follower && s.state != Candidate {
return 0, 0, 0, fmt.Errorf("raft: Invalid promotion state: %s", s.state)
}
// Move server to become a candidate, increase our term & vote for ourself.
s.state = Candidate
s.currentTerm++
@ -564,8 +584,8 @@ func (s *Server) promoteToCandidate() (term uint64, lastLogIndex uint64, lastLog
s.electionTimer.Pause()
// Return server state so we can check for it during leader promotion.
lastLogIndex, lastLogTerm = s.log.CommitInfo()
return s.currentTerm, lastLogIndex, lastLogTerm
lastLogIndex, lastLogTerm := s.log.CommitInfo()
return s.currentTerm, lastLogIndex, lastLogTerm, nil
}
// Promotes the server from a candidate to a leader. This can only occur if
@ -632,7 +652,9 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err
// If we made it this far then cast a vote and reset our election time out.
s.votedFor = req.CandidateName
s.electionTimer.Reset()
if s.electionTimer != nil {
s.electionTimer.Reset()
}
return NewRequestVoteResponse(s.currentTerm, true), nil
}
@ -652,6 +674,14 @@ func (s *Server) setCurrentTerm(term uint64) {
// Listens to the election timeout and kicks off a new election.
func (s *Server) electionTimeoutFunc() {
// Initialize the timer here since there can be a delay before this
// goroutine actually starts.
s.mutex.Lock()
if s.electionTimer != nil {
s.electionTimer.Reset()
}
s.mutex.Unlock()
for {
// Grab the current timer channel.
s.mutex.Lock()

View File

@ -126,6 +126,9 @@ func TestServerPromote(t *testing.T) {
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
return lookup[peer.Name()].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
for _, server := range servers {
defer server.Stop()
@ -141,7 +144,12 @@ func TestServerPromoteDoubleElection(t *testing.T) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
resp, err := lookup[peer.Name()].RequestVote(req)
return resp, err
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
resp, err := lookup[peer.Name()].AppendEntries(req)
return resp, err
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
lookup["2"].currentTerm, lookup["2"].votedFor = 1, "2"
@ -155,11 +163,12 @@ func TestServerPromoteDoubleElection(t *testing.T) {
if success, err := leader.promote(); !(success && err == nil && leader.state == Leader && leader.currentTerm == 2) {
t.Fatalf("Server promotion in cluster failed: %v (%v)", leader.state, err)
}
if lookup["2"].VotedFor() != "1" {
t.Fatalf("Unexpected vote for server 2: %v", lookup["2"].VotedFor())
time.Sleep(50 * time.Millisecond)
if lookup["2"].votedFor != "1" {
t.Fatalf("Unexpected vote for server 2: %v", lookup["2"].votedFor)
}
if lookup["3"].VotedFor() != "1" {
t.Fatalf("Unexpected vote for server 3: %v", lookup["3"].VotedFor())
if lookup["3"].votedFor != "1" {
t.Fatalf("Unexpected vote for server 3: %v", lookup["3"].votedFor)
}
}

View File

@ -162,7 +162,9 @@ func (t *Timer) Reset() {
case v, ok := <-internalTimer.C:
if ok {
t.mutex.Lock()
t.c <- v
if t.c != nil {
t.c <- v
}
t.mutex.Unlock()
}
case <-resetChannel: