Fix timer cleanup.

pull/820/head
Ben Johnson 2013-05-26 18:02:31 -06:00
parent 1337175e63
commit 7503eee58b
6 changed files with 71 additions and 33 deletions

View File

@ -19,10 +19,10 @@ type AppendEntriesRequest struct {
// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
peer *Peer
Term uint64 `json:"term"`
Success bool `json:"success"`
CommitIndex uint64 `json:"commitIndex"`
peer *Peer
Term uint64 `json:"term"`
Success bool `json:"success"`
CommitIndex uint64 `json:"commitIndex"`
}
//------------------------------------------------------------------------------
@ -46,8 +46,8 @@ func NewAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64
// Creates a new AppendEntries response.
func NewAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse {
return &AppendEntriesResponse{
Term: term,
Success: success,
CommitIndex: commitIndex,
Term: term,
Success: success,
CommitIndex: commitIndex,
}
}

View File

@ -235,8 +235,16 @@ func (s *Server) Stop() {
// Unloads the server.
func (s *Server) unload() {
// Kill the election timer.
s.electionTimer.Stop()
// Remove peers.
for _, peer := range s.peers {
peer.stop()
}
s.peers = make(map[string]*Peer)
// Close the log.
if s.log != nil {
s.log.Close()
s.log = nil
@ -316,7 +324,7 @@ loop:
return fmt.Errorf("raft.Server: Higher term discovered, stepping down: (%v > %v)", s.currentTerm, currentTerm)
}
responseCount++
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout() * 2):
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
break loop
}
}
@ -461,7 +469,7 @@ func (s *Server) promote() (bool, error) {
}
votes[resp.peer.Name()] = resp.VoteGranted
}
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout() * 2):
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
break loop
}
}
@ -646,5 +654,7 @@ func (s *Server) Join(name string) error {
}
// Request membership if we are joining to another server.
return s.executeDoHandler(NewPeer(s, name, s.heartbeatTimeout), command)
peer := NewPeer(s, name, s.heartbeatTimeout)
defer peer.stop()
return s.executeDoHandler(peer, command)
}

View File

@ -21,6 +21,7 @@ import (
func TestServerRequestVote(t *testing.T) {
server := newTestServer("1")
server.Start()
defer server.Stop()
resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0))
if !(resp.Term == 1 && resp.VoteGranted && err == nil) {
t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err)
@ -33,6 +34,7 @@ func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
server.state = Leader
server.currentTerm = 2
server.Start()
defer server.Stop()
resp, err := server.RequestVote(NewRequestVoteRequest(1, "foo", 0, 0))
if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Stale term: 1 < 2") {
t.Fatalf("Invalid request vote response: %v/%v (%v)", resp.Term, resp.VoteGranted, err)
@ -47,6 +49,7 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
server := newTestServer("1")
server.currentTerm = 2
server.Start()
defer server.Stop()
resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0))
if !(resp.Term == 2 && resp.VoteGranted && err == nil) {
t.Fatalf("First vote should not have been denied (%v)", err)
@ -62,6 +65,7 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
server := newTestServer("1")
server.currentTerm = 2
server.Start()
defer server.Stop()
resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 0, 0))
if !(resp.Term == 2 && resp.VoteGranted && server.VotedFor() == "foo" && err == nil) {
t.Fatalf("First vote should not have been denied (%v)", err)
@ -81,6 +85,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
if err := server.Start(); err != nil {
t.Fatalf("Unable to start server: %v", err)
}
defer server.Stop()
resp, err := server.RequestVote(NewRequestVoteRequest(2, "foo", 2, 2))
if !(resp.Term == 2 && !resp.VoteGranted && err != nil && err.Error() == "raft.Server: Out-of-date log: [3/2] > [2/2]") {
@ -108,6 +113,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
func TestServerPromoteSelf(t *testing.T) {
server := newTestServer("1")
server.Start()
defer server.Stop()
if success, err := server.promote(); !(success && err == nil && server.state == Leader) {
t.Fatalf("Server self-promotion failed: %v (%v)", server.state, err)
}
@ -119,6 +125,9 @@ func TestServerPromote(t *testing.T) {
servers.SetRequestVoteHandler(func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
})
for _, server := range servers {
defer server.Stop()
}
leader := servers[0]
if success, err := leader.promote(); !(success && err == nil && leader.state == Leader) {
t.Fatalf("Server promotion in cluster failed: %v (%v)", leader.state, err)
@ -133,6 +142,9 @@ func TestServerPromoteDoubleElection(t *testing.T) {
servers.SetRequestVoteHandler(func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
})
for _, server := range servers {
defer server.Stop()
}
leader := servers[0]
if success, err := leader.promote(); !(success && err == nil && leader.state == Leader && leader.currentTerm == 2) {
t.Fatalf("Server promotion in cluster failed: %v (%v)", leader.state, err)
@ -183,8 +195,6 @@ func TestServerAppendEntries(t *testing.T) {
if index, term := server.log.CommitInfo(); !(index == 3 && term == 1) {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
server.Stop()
}
// Ensure that entries with stale terms are rejected.
@ -269,6 +279,7 @@ func TestServerSingleNode(t *testing.T) {
if err := server.Start(); err != nil {
t.Fatalf("Unable to start server: %v", err)
}
defer server.Stop()
if server.state != Follower {
t.Fatalf("Unexpected server state: %v", server.state)
}
@ -294,6 +305,9 @@ func TestServerMultiNode(t *testing.T) {
var mutex sync.Mutex
names := []string{"1", "2", "3"}
servers := map[string]*Server{}
for _, server := range servers {
defer server.Stop()
}
for _, name := range names {
server := newTestServer(name)
server.SetElectionTimeout(TestElectionTimeout)

View File

@ -9,7 +9,7 @@ import (
// the returned channel.
func afterBetween(min time.Duration, max time.Duration) <-chan time.Time {
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
d, delta := min, (max-min)
d, delta := min, (max - min)
if delta > 0 {
d += time.Duration(rand.Int63n(int64(delta)))
}

View File

@ -17,6 +17,10 @@ import (
// number between a min and max duration.
type Timer struct {
c chan time.Time
// Used to break the goroutine listening for the internalTimer since the
// Timer struct won't close its channel automatically.
resetChannel chan bool
rand *rand.Rand
minDuration time.Duration
maxDuration time.Duration
@ -42,10 +46,10 @@ func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer {
panic("raft.Timer: Minimum duration cannot be greater than maximum duration")
}
return &Timer{
c: make(chan time.Time, 1),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
minDuration: minDuration,
maxDuration: maxDuration,
c: make(chan time.Time, 1),
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
minDuration: minDuration,
maxDuration: maxDuration,
}
}
@ -107,10 +111,7 @@ func (t *Timer) Stop() {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.internalTimer != nil {
t.internalTimer.Stop()
t.internalTimer = nil
}
t.stopInternalTimer()
if t.c != nil {
close(t.c)
@ -123,9 +124,16 @@ func (t *Timer) Pause() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.stopInternalTimer()
}
// Stops the timer and closes the channel.
func (t *Timer) stopInternalTimer() {
if t.internalTimer != nil {
t.internalTimer.Stop()
t.internalTimer = nil
close(t.resetChannel)
t.resetChannel = nil
}
}
@ -136,7 +144,7 @@ func (t *Timer) Reset() {
// Stop the timer if it's already running.
if t.internalTimer != nil {
t.internalTimer.Stop()
t.stopInternalTimer()
}
// Start a timer that will go off between the min and max duration.
@ -145,24 +153,19 @@ func (t *Timer) Reset() {
d += time.Duration(t.rand.Int63n(int64(t.maxDuration - t.minDuration)))
}
t.internalTimer = time.NewTimer(d)
t.resetChannel = make(chan bool, 1)
internalTimer, resetChannel := t.internalTimer, t.resetChannel
go func() {
defer func() {
recover()
}()
// Retrieve the current internal timer.
t.mutex.Lock()
internalTimer := t.internalTimer
t.mutex.Unlock()
// If the timer exists then grab the value from the channel and pass
// it through to the timer's external channel.
if internalTimer != nil {
if v, ok := <-internalTimer.C; ok {
select {
case v, ok := <-internalTimer.C:
if ok {
t.mutex.Lock()
t.c <- v
t.mutex.Unlock()
}
case <-resetChannel:
}
}()
}

11
z_test.go Normal file
View File

@ -0,0 +1,11 @@
package raft
import (
"testing"
"time"
)
func TestGC(t *testing.T) {
<-time.After(500 * time.Millisecond)
// panic("Oh god no!")
}