timeout for transfere. This helps when the node is far left behind. The server will not wait for sending a large package before send the other heartbeat
parent
11de59fa06
commit
d3f5d2b7fc
47
peer.go
47
peer.go
|
@ -123,7 +123,7 @@ func (p *Peer) flush() (uint64, bool, error) {
|
|||
}
|
||||
|
||||
// send VoteRequest Request
|
||||
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse){
|
||||
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
|
||||
req.peer = p
|
||||
debugln(p.server.Name(), "Send Vote Request to ", p.Name())
|
||||
if resp, _ := p.server.transporter.SendVoteRequest(p.server, p, req); resp != nil {
|
||||
|
@ -174,16 +174,24 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
|
|||
//debugln("flush to ", p.Name())
|
||||
debugln("[HeartBeat] Leader ", p.server.Name(), " to ", p.Name(), " ", len(req.Entries), " ", time.Now())
|
||||
|
||||
if p.server.State() != Leader {
|
||||
return 0, false, errors.New("Not leader anymore")
|
||||
}
|
||||
respChan := make(chan *AppendEntriesResponse, 2)
|
||||
|
||||
resp, err := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)
|
||||
go func() {
|
||||
tranResp, _ := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)
|
||||
respChan <- tranResp
|
||||
}()
|
||||
|
||||
//debugln("receive flush response from ", p.Name())
|
||||
go func() {
|
||||
<-time.After(p.server.heartbeatTimeout)
|
||||
respChan <- nil
|
||||
}()
|
||||
|
||||
resp := <-respChan
|
||||
|
||||
debugln("receive flush response from ", p.Name())
|
||||
|
||||
if resp == nil {
|
||||
return 0, false, err
|
||||
return 0, false, errors.New("Network problem")
|
||||
}
|
||||
|
||||
// If successful then update the previous log index. If it was
|
||||
|
@ -196,27 +204,22 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
|
|||
}
|
||||
} else {
|
||||
|
||||
if p.server.State() != Leader {
|
||||
return 0, false, errors.New("Not leader anymore")
|
||||
}
|
||||
|
||||
if resp.Term > p.server.currentTerm {
|
||||
return resp.Term, false, errors.New("Step down")
|
||||
}
|
||||
// Decrement the previous log index down until we find a match. Don't
|
||||
// let it go below where the peer's commit index is though. That's a
|
||||
// problem.
|
||||
if p.prevLogIndex > 0 {
|
||||
|
||||
// we may miss a response from peer
|
||||
if resp.CommitIndex > p.prevLogIndex {
|
||||
p.prevLogIndex = resp.CommitIndex
|
||||
} else if p.prevLogIndex > 0 {
|
||||
// Decrement the previous log index down until we find a match. Don't
|
||||
// let it go below where the peer's commit index is though. That's a
|
||||
// problem.
|
||||
p.prevLogIndex--
|
||||
}
|
||||
if resp.CommitIndex > p.prevLogIndex {
|
||||
debugln("%v %v %v %v", resp.CommitIndex, p.prevLogIndex,
|
||||
p.server.currentTerm, resp.Term)
|
||||
panic("commitedIndex is greater than prevLogIndex")
|
||||
}
|
||||
}
|
||||
|
||||
return resp.Term, resp.Success, err
|
||||
}
|
||||
return resp.Term, resp.Success, nil
|
||||
}
|
||||
|
||||
//--------------------------------------
|
||||
|
|
Loading…
Reference in New Issue