Fix peer timer race condition.
parent c544519c7c
commit 8efbb1535e
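The race: NewPeer could return before the heartbeatTimeoutFunc goroutine was scheduled, which is why the timer used to be reset inside the goroutine ("there can be a delay before this goroutine actually starts"). The constructor now blocks on a channel until the goroutine signals that it is running. AddPeer additionally resumes a newly added peer when the server is already leader, and TestServerMultiNode shortens its fixed sleeps from 500ms to 100ms.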
peer.go | 12 ++++++------
peer.go
@@ -35,8 +35,10 @@ func NewPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer
 		heartbeatTimer: NewTimer(heartbeatTimeout, heartbeatTimeout),
 	}

-	// Start the heartbeat timeout.
-	go p.heartbeatTimeoutFunc()
+	// Start the heartbeat timeout and wait for the goroutine to start.
+	c := make(chan bool)
+	go p.heartbeatTimeoutFunc(c)
+	<-c

 	return p
 }
@@ -149,10 +151,8 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
 //--------------------------------------

 // Listens to the heartbeat timeout and flushes an AppendEntries RPC.
-func (p *Peer) heartbeatTimeoutFunc() {
-	// Initialize the timer here since there can be a delay before this
-	// goroutine actually starts.
-	p.heartbeatTimer.Reset()
+func (p *Peer) heartbeatTimeoutFunc(startChannel chan bool) {
+	startChannel <- true

 	for {
 		// Grab the current timer channel.
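The start-up handshake above is a small, reusable Go pattern: the constructor hands the goroutine a channel, and the paired send/receive guarantees the goroutine is running before NewPeer returns. A minimal self-contained sketch of the same pattern; the names here (startWorker, tick, stop) are illustrative, not from this repository:

package main

import (
	"fmt"
	"time"
)

// startWorker launches a background loop and blocks until that goroutine
// has actually started, mirroring the c := make(chan bool) / <-c handshake
// in NewPeer above.
func startWorker() chan struct{} {
	started := make(chan bool)
	stop := make(chan struct{})
	go func() {
		started <- true // unblocks the constructor once we are running
		for {
			select {
			case <-stop:
				return
			case <-time.After(10 * time.Millisecond):
				fmt.Println("tick") // stand-in for flushing a heartbeat RPC
			}
		}
	}()
	<-started // without this, callers could race ahead of a goroutine that never started
	return stop
}

func main() {
	stop := startWorker()
	time.Sleep(35 * time.Millisecond)
	close(stop)
}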
@@ -721,6 +721,9 @@ func (s *Server) AddPeer(name string) error {
 	// Only add the peer if it doesn't have the same name.
 	if s.name != name {
 		peer := NewPeer(s, name, s.heartbeatTimeout)
+		if s.state == Leader {
+			peer.resume()
+		}
 		s.peers[peer.name] = peer
 	}

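Raft followers must not send heartbeats, so a peer presumably starts paused and the hunk above has a leader explicitly resume any peer added after it was elected; otherwise that peer would never be heartbeated. A hedged sketch of the invariant, with stand-in types rather than go-raft's actual Peer and Server:

package main

import "fmt"

type state int

const (
	Follower state = iota
	Leader
)

// peer is a stand-in for the real Peer; only the active flag matters here.
type peer struct{ active bool }

func (p *peer) resume() { p.active = true }

// addPeer mirrors the AddPeer hunk: only a leader resumes a new peer,
// since followers must never send AppendEntries heartbeats.
func addPeer(peers map[string]*peer, self string, s state, name string) {
	if self != name {
		p := &peer{} // assumed: new peers start paused
		if s == Leader {
			p.resume()
		}
		peers[name] = p
	}
}

func main() {
	peers := map[string]*peer{}
	addPeer(peers, "1", Leader, "2")
	addPeer(peers, "1", Follower, "3")
	fmt.Println(peers["2"].active, peers["3"].active) // true false
}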
@@ -343,7 +343,8 @@ func TestServerMultiNode(t *testing.T) {
 		mutex.Lock()
 		s := servers[peer.name]
 		mutex.Unlock()
-		return s.RequestVote(req)
+		resp, err := s.RequestVote(req)
+		return resp, err
 	}
 	transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
 		mutex.Lock()
@@ -375,7 +376,7 @@ func TestServerMultiNode(t *testing.T) {
 		servers[name] = server
 		mutex.Unlock()
 	}
-	time.Sleep(500 * time.Millisecond)
+	time.Sleep(100 * time.Millisecond)

 	// Check that two peers exist on leader.
 	mutex.Lock()
@@ -385,9 +386,9 @@ func TestServerMultiNode(t *testing.T) {
 	mutex.Unlock()

 	// Stop the first server and wait for a re-election.
-	time.Sleep(500 * time.Millisecond)
+	time.Sleep(100 * time.Millisecond)
 	leader.Stop()
-	time.Sleep(500 * time.Millisecond)
+	time.Sleep(100 * time.Millisecond)

 	// Check that either server 2 or 3 is the leader now.
 	mutex.Lock()
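Both test hunks cut each fixed sleep from 500ms to 100ms; with the start-up race closed, the cluster settles fast enough for the shorter waits. Fixed sleeps remain timing-sensitive, though. A hedged alternative, not part of this commit: poll for the condition under a deadline.

package rafttest // hypothetical package, for illustration only

import "time"

// waitFor polls cond until it returns true or timeout elapses.
// Illustrative helper only; the commit itself keeps fixed sleeps.
func waitFor(timeout time.Duration, cond func() bool) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return true
		}
		time.Sleep(5 * time.Millisecond)
	}
	return false
}

The second sleep could then become something like waitFor(time.Second, func() bool { /* check that server 2 or 3 became leader */ }), waiting only as long as the re-election actually takes.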