From e63296ee29e1243d2e62742bab4d063c213c5f43 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Tue, 1 Apr 2014 10:21:31 -0700 Subject: [PATCH] fix: wait for all goroutines to finish before Stop Changes are as follows: 1. Use wait group to wait all goroutines to finish before Stop 2. Remove `stop` channel because its functionality could be replaced by the wait group 3. Change `stopped` type to `chan bool`, considering it doesn't need to transfer `stop` 4. Make `send` function also notified by `stopped`, and cancel the request when stopped 5. Error handling in HTTP handler functions which is caused by canceling requests --- http_transporter.go | 16 ++++++++++++ peer.go | 7 ++++- server.go | 63 +++++++++++++++++++++++++++++++-------------- 3 files changed, 66 insertions(+), 20 deletions(-) diff --git a/http_transporter.go b/http_transporter.go index 1ab06dd380..1173bc5e9f 100644 --- a/http_transporter.go +++ b/http_transporter.go @@ -243,6 +243,10 @@ func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc { } resp := server.AppendEntries(req) + if resp == nil { + http.Error(w, "", http.StatusInternalServerError) + return + } if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return @@ -262,6 +266,10 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc { } resp := server.RequestVote(req) + if resp == nil { + http.Error(w, "", http.StatusInternalServerError) + return + } if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return @@ -281,6 +289,10 @@ func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc { } resp := server.RequestSnapshot(req) + if resp == nil { + http.Error(w, "", http.StatusInternalServerError) + return + } if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return @@ -300,6 +312,10 @@ func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFun } resp := server.SnapshotRecoveryRequest(req) + if resp == nil { + http.Error(w, "", http.StatusInternalServerError) + return + } if _, err := resp.Encode(w); err != nil { http.Error(w, "", http.StatusInternalServerError) return diff --git a/peer.go b/peer.go index 83ecc683d4..c7e55b44d6 100644 --- a/peer.go +++ b/peer.go @@ -82,7 +82,12 @@ func (p *Peer) setPrevLogIndex(value uint64) { func (p *Peer) startHeartbeat() { p.stopChan = make(chan bool) c := make(chan bool) - go p.heartbeat(c) + + p.server.routineGroup.Add(1) + go func() { + defer p.server.routineGroup.Done() + p.heartbeat(c) + }() <-c } diff --git a/server.go b/server.go index 080953af60..bc1a232435 100644 --- a/server.go +++ b/server.go @@ -55,6 +55,7 @@ const ElectionTimeoutThresholdPercent = 0.8 var NotLeaderError = errors.New("raft.Server: Not current leader") var DuplicatePeerError = errors.New("raft.Server: Duplicate peer") var CommandTimeoutError = errors.New("raft: Command timeout") +var StopError = errors.New("raft: Has been stopped") //------------------------------------------------------------------------------ // @@ -123,7 +124,7 @@ type server struct { mutex sync.RWMutex syncedPeer map[string]bool - stopped chan chan bool + stopped chan bool c chan *ev electionTimeout time.Duration heartbeatInterval time.Duration @@ -140,6 +141,8 @@ type server struct { maxLogEntriesPerRequest uint64 connectionString string + + routineGroup sync.WaitGroup } // An internal event to be processed by the server's event loop. @@ -177,7 +180,6 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S state: Stopped, peers: make(map[string]*Peer), log: newLog(), - stopped: make(chan chan bool), c: make(chan *ev, 256), electionTimeout: DefaultElectionTimeout, heartbeatInterval: DefaultHeartbeatInterval, @@ -440,6 +442,9 @@ func (s *server) Start() error { return err } + // stopped needs to be allocated each time server starts + // because it is closed at `Stop`. + s.stopped = make(chan bool) s.setState(Follower) // If no log entries exist then @@ -457,7 +462,11 @@ func (s *server) Start() error { debugln(s.GetState()) - go s.loop() + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + s.loop() + }() return nil } @@ -501,12 +510,14 @@ func (s *server) Init() error { // Shuts down the server. func (s *server) Stop() { - stop := make(chan bool) - s.stopped <- stop + if s.stopped != nil { + close(s.stopped) + } s.state = Stopped // make sure the server has stopped before we close the log - <-stop + s.routineGroup.Wait() + s.log.close() } @@ -599,8 +610,12 @@ func (s *server) loop() { func (s *server) send(value interface{}) (interface{}, error) { event := &ev{target: value, c: make(chan error, 1)} s.c <- event - err := <-event.c - return event.returnValue, err + select { + case <-s.stopped: + return nil, StopError + case err := <-event.c: + return event.returnValue, err + } } func (s *server) sendAsync(value interface{}) { @@ -614,7 +629,9 @@ func (s *server) sendAsync(value interface{}) { default: } + s.routineGroup.Add(1) go func() { + defer s.routineGroup.Done() s.c <- event }() } @@ -633,9 +650,8 @@ func (s *server) followerLoop() { var err error update := false select { - case stop := <-s.stopped: + case <-s.stopped: s.setState(Stopped) - stop <- true return case e := <-s.c: @@ -710,7 +726,11 @@ func (s *server) candidateLoop() { // Send RequestVote RPCs to all other servers. respChan = make(chan *RequestVoteResponse, len(s.peers)) for _, peer := range s.peers { - go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan) + s.routineGroup.Add(1) + go func(peer *Peer) { + defer s.routineGroup.Done() + peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan) + }(peer) } // Wait for either: @@ -733,9 +753,8 @@ func (s *server) candidateLoop() { // Collect votes from peers. select { - case stop := <-s.stopped: + case <-s.stopped: s.setState(Stopped) - stop <- true return case resp := <-respChan: @@ -779,19 +798,22 @@ func (s *server) leaderLoop() { // "Upon election: send initial empty AppendEntries RPCs (heartbeat) to // each server; repeat during idle periods to prevent election timeouts // (ยง5.2)". The heartbeats started above do the "idle" period work. - go s.Do(NOPCommand{}) + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + s.Do(NOPCommand{}) + }() // Begin to collect response from followers for s.State() == Leader { var err error select { - case stop := <-s.stopped: + case <-s.stopped: // Stop all peers before stop for _, peer := range s.peers { peer.stopHeartbeat(false) } s.setState(Stopped) - stop <- true return case e := <-s.c: @@ -819,9 +841,8 @@ func (s *server) snapshotLoop() { for s.State() == Snapshotting { var err error select { - case stop := <-s.stopped: + case <-s.stopped: s.setState(Stopped) - stop <- true return case e := <-s.c: @@ -1102,7 +1123,11 @@ func (s *server) RemovePeer(name string) error { // So we might be holding log lock and waiting for log lock, // which lead to a deadlock. // TODO(xiangli) refactor log lock - go peer.stopHeartbeat(true) + s.routineGroup.Add(1) + go func() { + defer s.routineGroup.Done() + peer.stopHeartbeat(true) + }() } delete(s.peers, name)