Merge pull request #70 from coreos/master

Commit a NOP after the server becomes leader
pull/820/head
Ben Johnson 2013-07-11 11:26:55 -07:00
commit c0c294e397
7 changed files with 107 additions and 35 deletions

19
NOPCommand.go Normal file
View File

@ -0,0 +1,19 @@
package raft
//--------------------------------------
// NOP command
//--------------------------------------
// NOP command
type NOPCommand struct {
}
// The name of the NOP command in the log
func (c NOPCommand) CommandName() string {
return "nop"
}
// NOP
func (c NOPCommand) Apply(server *Server) (interface{}, error) {
return nil, nil
}

View File

@ -19,9 +19,12 @@ type AppendEntriesRequest struct {
// The response returned from a server appending entries to the log.
type AppendEntriesResponse struct {
Term uint64 `json:"term"`
// the current index of the server
Index uint64 `json:"index"`
Success bool `json:"success"`
CommitIndex uint64 `json:"commitIndex"`
peer string `json:"-"`
append bool `json:"-"`
}
//------------------------------------------------------------------------------
@ -43,10 +46,11 @@ func newAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64
}
// Creates a new AppendEntries response.
func newAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse {
func newAppendEntriesResponse(term uint64, success bool, index uint64, commitIndex uint64) *AppendEntriesResponse {
return &AppendEntriesResponse{
Term: term,
Success: success,
Index: index,
CommitIndex: commitIndex,
}
}

View File

@ -66,6 +66,7 @@ func RegisterCommand(command Command) {
panic(fmt.Sprintf("raft: Cannot register nil"))
} else if commandTypes[command.CommandName()] != nil {
panic(fmt.Sprintf("raft: Duplicate registration: %s", command.CommandName()))
return
}
commandTypes[command.CommandName()] = command
}

View File

@ -90,7 +90,7 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans
// Setup configuration.
for _, server := range *servers {
if _, err := (*servers)[0].Do(&joinCommand{Name: server.Name()}); err != nil {
t.Fatal("Server unable to join: %v", err)
t.Fatalf("Server %s unable to join: %v", server.Name(), err)
}
}

22
peer.go
View File

@ -174,26 +174,45 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
if resp.Success {
if len(req.Entries) > 0 {
p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
// if peer append a log entry from the current term
// we set append to true
if req.Entries[len(req.Entries)-1].Term == p.server.currentTerm {
resp.append = true
}
}
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
} else {
// we may miss a response from peer
if resp.CommitIndex >= p.prevLogIndex {
// we may miss a response from peer
// so maybe the peer has commited the logs we sent
// but we did not receive the success reply and did not increase
// the prevLogIndex
p.prevLogIndex = resp.CommitIndex
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
} else if p.prevLogIndex > 0 {
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
// problem.
p.prevLogIndex--
// if it not enough, we directly decrease to the index of the
if p.prevLogIndex > resp.Index {
p.prevLogIndex = resp.Index
}
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
}
}
p.mutex.Unlock()
// Attach the peer to resp, thus server can know where it comes from
resp.peer = p.Name()
// Send response to server for processing.
p.server.send(resp)
}
@ -230,6 +249,7 @@ func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteRespo
debugln("peer.vote: ", p.server.Name(), "->", p.Name())
req.peer = p
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name())
resp.peer = p
c <- resp
}

View File

@ -64,7 +64,7 @@ type Server struct {
leader string
peers map[string]*Peer
mutex sync.RWMutex
commitCount int
syncedPeer map[string]bool
c chan *event
electionTimeout time.Duration
@ -294,6 +294,11 @@ func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
// Initialization
//--------------------------------------
// Reg the NOPCommand
func init() {
RegisterCommand(&NOPCommand{})
}
// Starts the server with a log at the given path.
func (s *Server) Initialize() error {
@ -327,6 +332,7 @@ func (s *Server) StartFollower() {
func (s *Server) StartLeader() {
s.setState(Leader)
s.currentTerm++
s.debugln("leader start at term: ", s.currentTerm, " index: ", s.log.currentIndex())
go s.loop()
}
@ -513,6 +519,8 @@ func (s *Server) candidateLoop() {
} else if resp.Term > s.currentTerm {
s.debugln("server.candidate.vote.failed")
s.setCurrentTerm(resp.Term, "", false)
} else {
s.debugln("server.candidate.vote: denied")
}
case e := <-s.c:
@ -553,7 +561,7 @@ func (s *Server) candidateLoop() {
// The event loop that is run when the server is in a Candidate state.
func (s *Server) leaderLoop() {
s.setState(Leader)
s.commitCount = 0
s.syncedPeer = make(map[string]bool)
logIndex, _ := s.log.lastInfo()
// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
@ -562,6 +570,8 @@ func (s *Server) leaderLoop() {
peer.startHeartbeat()
}
go s.Do(NOPCommand{})
// Begin to collect response from followers
for {
var err error
@ -594,6 +604,7 @@ func (s *Server) leaderLoop() {
for _, peer := range s.peers {
peer.stopHeartbeat()
}
s.syncedPeer = nil
}
//--------------------------------------
@ -635,7 +646,11 @@ func (s *Server) processCommand(command Command, e *event) {
}()
// Issue an append entries response for the server.
s.sendAsync(newAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()))
resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
resp.append = true
resp.peer = s.Name()
s.sendAsync(resp)
}
//--------------------------------------
@ -655,7 +670,7 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
if req.Term < s.currentTerm {
s.debugln("server.ae.error: stale term")
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), false
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
}
// Update term and leader.
@ -664,22 +679,22 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
// Reject if log doesn't contain a matching previous entry.
if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
s.debugln("server.ae.truncate.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Append entries to the log.
if err := s.log.appendEntries(req.Entries); err != nil {
s.debugln("server.ae.append.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Commit up to the commit index.
if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
s.debugln("server.ae.commit.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
}
return newAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), true
return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}
// Processes the "append entries" response from the peer. This is only
@ -692,14 +707,19 @@ func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
return
}
// Ignore response if it's not successful.
// panic response if it's not successful.
if !resp.Success {
return
}
// if one peer successfully append a log from the leader term,
// we add it to the synced list
if resp.append == true {
s.syncedPeer[resp.peer] = true
}
// Increment the commit count to make sure we have a quorum before committing.
s.commitCount++
if s.commitCount < s.QuorumSize() {
if len(s.syncedPeer) < s.QuorumSize() {
return
}
@ -770,7 +790,8 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
lastIndex, lastTerm := s.log.lastInfo()
if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
s.debugln("server.rv.error: out of date log: ", req.CandidateName,
"[", lastIndex, "]", " [", req.LastLogIndex, "]")
"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
return newRequestVoteResponse(s.currentTerm, false), false
}
@ -791,7 +812,7 @@ func (s *Server) AddPeer(name string) error {
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return DuplicatePeerError
return nil
}
// Only add the peer if it doesn't have the same name.

View File

@ -69,13 +69,16 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Initialize()
server.StartLeader()
time.Sleep(time.Millisecond * 100)
server.currentTerm = 2
defer server.Stop()
resp := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0))
resp := server.RequestVote(newRequestVoteRequest(2, "foo", 1, 1))
if resp.Term != 2 || !resp.VoteGranted || server.VotedFor() != "foo" {
t.Fatalf("First vote should not have been denied")
}
resp = server.RequestVote(newRequestVoteRequest(3, "bar", 0, 0))
resp = server.RequestVote(newRequestVoteRequest(3, "bar", 1, 1))
if resp.Term != 3 || !resp.VoteGranted || server.VotedFor() != "bar" {
t.Fatalf("Second vote should have been approved")
@ -90,24 +93,25 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
`6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n")
server.Initialize()
server.StartLeader()
server.currentTerm = 2
time.Sleep(time.Millisecond * 100)
defer server.Stop()
resp := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 2))
if resp.Term != 2 || resp.VoteGranted {
resp := server.RequestVote(newRequestVoteRequest(3, "foo", 3, 3))
if resp.Term != 3 || resp.VoteGranted {
t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
}
resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 1))
if resp.Term != 2 || resp.VoteGranted {
resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
if resp.Term != 3 || resp.VoteGranted {
t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
}
resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
if resp.Term != 2 || !resp.VoteGranted {
resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 3))
if resp.Term != 3 || !resp.VoteGranted {
t.Fatalf("Matching log vote should have been granted")
}
resp = server.RequestVote(newRequestVoteRequest(2, "foo", 4, 3))
if resp.Term != 2 || !resp.VoteGranted {
resp = server.RequestVote(newRequestVoteRequest(3, "foo", 5, 4))
if resp.Term != 3 || !resp.VoteGranted {
t.Fatalf("Ahead-of-log vote should have been granted")
}
}
@ -164,7 +168,10 @@ func TestServerPromote(t *testing.T) {
func TestServerAppendEntries(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Initialize()
server.StartLeader()
// this test should assume that the server is a follower
// the leader will commit itself
server.SetHeartbeatTimeout(time.Second * 10)
server.StartFollower()
defer server.Stop()
// Append single entry.
@ -396,7 +403,7 @@ func TestServerMultiNode(t *testing.T) {
}
mutex.RUnlock()
for i := 0; i < 20; i++ {
for i := 0; i < 200000; i++ {
retry := 0
fmt.Println("Round ", i)