diff --git a/peer.go b/peer.go index 7e1618874f..8f86e8d9ed 100644 --- a/peer.go +++ b/peer.go @@ -123,7 +123,7 @@ func (p *Peer) flush() (uint64, bool, error) { } // send VoteRequest Request -func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse){ +func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) { req.peer = p debugln(p.server.Name(), "Send Vote Request to ", p.Name()) if resp, _ := p.server.transporter.SendVoteRequest(p.server, p, req); resp != nil { @@ -174,16 +174,24 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) //debugln("flush to ", p.Name()) debugln("[HeartBeat] Leader ", p.server.Name(), " to ", p.Name(), " ", len(req.Entries), " ", time.Now()) - if p.server.State() != Leader { - return 0, false, errors.New("Not leader anymore") - } + respChan := make(chan *AppendEntriesResponse, 2) - resp, err := p.server.transporter.SendAppendEntriesRequest(p.server, p, req) + go func() { + tranResp, _ := p.server.transporter.SendAppendEntriesRequest(p.server, p, req) + respChan <- tranResp + }() - //debugln("receive flush response from ", p.Name()) + go func() { + <-time.After(p.server.heartbeatTimeout) + respChan <- nil + }() + + resp := <-respChan + + debugln("receive flush response from ", p.Name()) if resp == nil { - return 0, false, err + return 0, false, errors.New("Network problem") } // If successful then update the previous log index. If it was @@ -196,27 +204,22 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error) } } else { - if p.server.State() != Leader { - return 0, false, errors.New("Not leader anymore") - } - if resp.Term > p.server.currentTerm { return resp.Term, false, errors.New("Step down") } - // Decrement the previous log index down until we find a match. Don't - // let it go below where the peer's commit index is though. That's a - // problem. - if p.prevLogIndex > 0 { + + // we may miss a response from peer + if resp.CommitIndex > p.prevLogIndex { + p.prevLogIndex = resp.CommitIndex + } else if p.prevLogIndex > 0 { + // Decrement the previous log index down until we find a match. Don't + // let it go below where the peer's commit index is though. That's a + // problem. p.prevLogIndex-- } - if resp.CommitIndex > p.prevLogIndex { - debugln("%v %v %v %v", resp.CommitIndex, p.prevLogIndex, - p.server.currentTerm, resp.Term) - panic("commitedIndex is greater than prevLogIndex") - } - } - return resp.Term, resp.Success, err + } + return resp.Term, resp.Success, nil } //--------------------------------------