return the index of the commited command to the application via do() and add Index func to get the current committed index of the server

pull/820/head
Xiang Li 2013-06-28 16:14:41 -07:00
parent 52f97d0c3a
commit 5c6766e13e
2 changed files with 31 additions and 5 deletions

19
peer.go
View File

@ -4,6 +4,7 @@ import (
"errors"
"sync"
"time"
"fmt"
)
//------------------------------------------------------------------------------
@ -163,6 +164,11 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
// result.
//debugln("flush to ", p.Name())
debugln("[HeartBeat] Leader ", p.server.Name(), " to ", p.Name(), " ", len(req.Entries), " ", time.Now())
if p.server.State() != Leader {
return 0, false, errors.New("Not leader anymore")
}
resp, err := p.server.transporter.SendAppendEntriesRequest(p.server, p, req)
//debugln("receive flush response from ", p.Name())
@ -180,6 +186,15 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
debugln("Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
}
} else {
if p.server.State() != Leader {
return 0, false, errors.New("Not leader anymore")
}
if resp.Term > p.server.currentTerm {
return resp.Term, false, errors.New("Step down")
}
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
// problem.
@ -187,6 +202,10 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
p.prevLogIndex--
}
if resp.CommitIndex > p.prevLogIndex {
fmt.Println(err)
fmt.Printf("%v %v %v %v", resp.CommitIndex, p.prevLogIndex,
p.server.currentTerm, resp.Term)
panic("ai")
p.prevLogIndex = resp.CommitIndex
}
}

View File

@ -175,6 +175,13 @@ func (s *Server) Term() uint64 {
return s.currentTerm
}
// Retrieves the current committed index of the server.
func (s *Server) CommittedIndex() uint64 {
return s.log.CommitIndex()
}
// Retrieves the name of the candidate this server voted for in this term.
func (s *Server) VotedFor() string {
return s.votedFor
@ -452,14 +459,14 @@ func (s *Server) Running() bool {
// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *Server) Do(command Command) (interface{}, error) {
func (s *Server) Do(command Command) (interface{}, uint64, error) {
if s.state != Leader {
return nil, NotLeaderError
return nil, 0, NotLeaderError
}
entry := s.log.CreateEntry(s.currentTerm, command)
if err := s.log.AppendEntry(entry); err != nil {
return nil, err
return nil, 0, err
}
s.response <- FlushResponse{s.currentTerm, true, nil, nil}
@ -481,10 +488,10 @@ func (s *Server) Do(command Command) (interface{}, error) {
debugln("[Do] finish!")
result := entry.result
entry.result = nil
return result, nil
return result, entry.Index, nil
case <-time.After(time.Second):
debugln("[Do] fail!")
return nil, errors.New("Command commit fails")
return nil, 0, errors.New("Command commit fails")
}
}