fix: wait for all goroutines to finish before Stop
Changes are as follows: 1. Use a wait group to wait for all goroutines to finish before Stop. 2. Remove the `stop` channel, because its functionality can be replaced by the wait group. 3. Change the type of `stopped` to `chan bool`, since it no longer needs to carry the `stop` channel. 4. Make the `send` function also listen on `stopped`, and cancel the request when the server is stopped. 5. Handle errors in the HTTP handler functions that are caused by canceled requests.
pull/820/head
parent
c6bdf639c2
commit
e63296ee29
|
@ -243,6 +243,10 @@ func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
|
|||
}
|
||||
|
||||
resp := server.AppendEntries(req)
|
||||
if resp == nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if _, err := resp.Encode(w); err != nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
|
@ -262,6 +266,10 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
|
|||
}
|
||||
|
||||
resp := server.RequestVote(req)
|
||||
if resp == nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if _, err := resp.Encode(w); err != nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
|
@ -281,6 +289,10 @@ func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc {
|
|||
}
|
||||
|
||||
resp := server.RequestSnapshot(req)
|
||||
if resp == nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if _, err := resp.Encode(w); err != nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
|
@ -300,6 +312,10 @@ func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFun
|
|||
}
|
||||
|
||||
resp := server.SnapshotRecoveryRequest(req)
|
||||
if resp == nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if _, err := resp.Encode(w); err != nil {
|
||||
http.Error(w, "", http.StatusInternalServerError)
|
||||
return
|
||||
|
|
7
peer.go
7
peer.go
|
@ -82,7 +82,12 @@ func (p *Peer) setPrevLogIndex(value uint64) {
|
|||
func (p *Peer) startHeartbeat() {
|
||||
p.stopChan = make(chan bool)
|
||||
c := make(chan bool)
|
||||
go p.heartbeat(c)
|
||||
|
||||
p.server.routineGroup.Add(1)
|
||||
go func() {
|
||||
defer p.server.routineGroup.Done()
|
||||
p.heartbeat(c)
|
||||
}()
|
||||
<-c
|
||||
}
|
||||
|
||||
|
|
63
server.go
63
server.go
|
@ -55,6 +55,7 @@ const ElectionTimeoutThresholdPercent = 0.8
|
|||
var NotLeaderError = errors.New("raft.Server: Not current leader")
|
||||
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
|
||||
var CommandTimeoutError = errors.New("raft: Command timeout")
|
||||
var StopError = errors.New("raft: Has been stopped")
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
//
|
||||
|
@ -123,7 +124,7 @@ type server struct {
|
|||
mutex sync.RWMutex
|
||||
syncedPeer map[string]bool
|
||||
|
||||
stopped chan chan bool
|
||||
stopped chan bool
|
||||
c chan *ev
|
||||
electionTimeout time.Duration
|
||||
heartbeatInterval time.Duration
|
||||
|
@ -140,6 +141,8 @@ type server struct {
|
|||
maxLogEntriesPerRequest uint64
|
||||
|
||||
connectionString string
|
||||
|
||||
routineGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
// An internal event to be processed by the server's event loop.
|
||||
|
@ -177,7 +180,6 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
|
|||
state: Stopped,
|
||||
peers: make(map[string]*Peer),
|
||||
log: newLog(),
|
||||
stopped: make(chan chan bool),
|
||||
c: make(chan *ev, 256),
|
||||
electionTimeout: DefaultElectionTimeout,
|
||||
heartbeatInterval: DefaultHeartbeatInterval,
|
||||
|
@ -440,6 +442,9 @@ func (s *server) Start() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// stopped needs to be allocated each time server starts
|
||||
// because it is closed at `Stop`.
|
||||
s.stopped = make(chan bool)
|
||||
s.setState(Follower)
|
||||
|
||||
// If no log entries exist then
|
||||
|
@ -457,7 +462,11 @@ func (s *server) Start() error {
|
|||
|
||||
debugln(s.GetState())
|
||||
|
||||
go s.loop()
|
||||
s.routineGroup.Add(1)
|
||||
go func() {
|
||||
defer s.routineGroup.Done()
|
||||
s.loop()
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -501,12 +510,14 @@ func (s *server) Init() error {
|
|||
|
||||
// Shuts down the server.
|
||||
func (s *server) Stop() {
|
||||
stop := make(chan bool)
|
||||
s.stopped <- stop
|
||||
if s.stopped != nil {
|
||||
close(s.stopped)
|
||||
}
|
||||
s.state = Stopped
|
||||
|
||||
// make sure the server has stopped before we close the log
|
||||
<-stop
|
||||
s.routineGroup.Wait()
|
||||
|
||||
s.log.close()
|
||||
}
|
||||
|
||||
|
@ -599,8 +610,12 @@ func (s *server) loop() {
|
|||
func (s *server) send(value interface{}) (interface{}, error) {
|
||||
event := &ev{target: value, c: make(chan error, 1)}
|
||||
s.c <- event
|
||||
err := <-event.c
|
||||
return event.returnValue, err
|
||||
select {
|
||||
case <-s.stopped:
|
||||
return nil, StopError
|
||||
case err := <-event.c:
|
||||
return event.returnValue, err
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) sendAsync(value interface{}) {
|
||||
|
@ -614,7 +629,9 @@ func (s *server) sendAsync(value interface{}) {
|
|||
default:
|
||||
}
|
||||
|
||||
s.routineGroup.Add(1)
|
||||
go func() {
|
||||
defer s.routineGroup.Done()
|
||||
s.c <- event
|
||||
}()
|
||||
}
|
||||
|
@ -633,9 +650,8 @@ func (s *server) followerLoop() {
|
|||
var err error
|
||||
update := false
|
||||
select {
|
||||
case stop := <-s.stopped:
|
||||
case <-s.stopped:
|
||||
s.setState(Stopped)
|
||||
stop <- true
|
||||
return
|
||||
|
||||
case e := <-s.c:
|
||||
|
@ -710,7 +726,11 @@ func (s *server) candidateLoop() {
|
|||
// Send RequestVote RPCs to all other servers.
|
||||
respChan = make(chan *RequestVoteResponse, len(s.peers))
|
||||
for _, peer := range s.peers {
|
||||
go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
|
||||
s.routineGroup.Add(1)
|
||||
go func(peer *Peer) {
|
||||
defer s.routineGroup.Done()
|
||||
peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
|
||||
}(peer)
|
||||
}
|
||||
|
||||
// Wait for either:
|
||||
|
@ -733,9 +753,8 @@ func (s *server) candidateLoop() {
|
|||
|
||||
// Collect votes from peers.
|
||||
select {
|
||||
case stop := <-s.stopped:
|
||||
case <-s.stopped:
|
||||
s.setState(Stopped)
|
||||
stop <- true
|
||||
return
|
||||
|
||||
case resp := <-respChan:
|
||||
|
@ -779,19 +798,22 @@ func (s *server) leaderLoop() {
|
|||
// "Upon election: send initial empty AppendEntries RPCs (heartbeat) to
|
||||
// each server; repeat during idle periods to prevent election timeouts
|
||||
// (§5.2)". The heartbeats started above do the "idle" period work.
|
||||
go s.Do(NOPCommand{})
|
||||
s.routineGroup.Add(1)
|
||||
go func() {
|
||||
defer s.routineGroup.Done()
|
||||
s.Do(NOPCommand{})
|
||||
}()
|
||||
|
||||
// Begin to collect response from followers
|
||||
for s.State() == Leader {
|
||||
var err error
|
||||
select {
|
||||
case stop := <-s.stopped:
|
||||
case <-s.stopped:
|
||||
// Stop all peers before stop
|
||||
for _, peer := range s.peers {
|
||||
peer.stopHeartbeat(false)
|
||||
}
|
||||
s.setState(Stopped)
|
||||
stop <- true
|
||||
return
|
||||
|
||||
case e := <-s.c:
|
||||
|
@ -819,9 +841,8 @@ func (s *server) snapshotLoop() {
|
|||
for s.State() == Snapshotting {
|
||||
var err error
|
||||
select {
|
||||
case stop := <-s.stopped:
|
||||
case <-s.stopped:
|
||||
s.setState(Stopped)
|
||||
stop <- true
|
||||
return
|
||||
|
||||
case e := <-s.c:
|
||||
|
@ -1102,7 +1123,11 @@ func (s *server) RemovePeer(name string) error {
|
|||
// So we might be holding log lock and waiting for log lock,
|
||||
// which lead to a deadlock.
|
||||
// TODO(xiangli) refactor log lock
|
||||
go peer.stopHeartbeat(true)
|
||||
s.routineGroup.Add(1)
|
||||
go func() {
|
||||
defer s.routineGroup.Done()
|
||||
peer.stopHeartbeat(true)
|
||||
}()
|
||||
}
|
||||
|
||||
delete(s.peers, name)
|
||||
|
|
Loading…
Reference in New Issue