From 75e0a229e912c307b60d9c1f9d66cab799756bff Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 10 Jul 2013 16:07:14 -0700 Subject: [PATCH 1/4] commit a NOP after the server becomes leader. Commit ohter commands, after the majority of the peers synced by the NOP command --- append_entries.go | 8 +++++-- command.go | 26 +++++++++++++++++++- http_transporter_test.go | 2 +- peer.go | 22 ++++++++++++++++- server.go | 51 ++++++++++++++++++++++++++-------------- server_test.go | 31 ++++++++++++++---------- 6 files changed, 106 insertions(+), 34 deletions(-) diff --git a/append_entries.go b/append_entries.go index f05a0edf9d..b7eceb39e1 100644 --- a/append_entries.go +++ b/append_entries.go @@ -18,10 +18,13 @@ type AppendEntriesRequest struct { // The response returned from a server appending entries to the log. type AppendEntriesResponse struct { - Term uint64 `json:"term"` + Term uint64 `json:"term"` + // the current index of the server Index uint64 `json:"index"` Success bool `json:"success"` CommitIndex uint64 `json:"commitIndex"` + peer string `json:"-"` + append bool `json:"-"` } //------------------------------------------------------------------------------ @@ -43,10 +46,11 @@ func newAppendEntriesRequest(term uint64, leaderName string, prevLogIndex uint64 } // Creates a new AppendEntries response. -func newAppendEntriesResponse(term uint64, success bool, commitIndex uint64) *AppendEntriesResponse { +func newAppendEntriesResponse(term uint64, success bool, index uint64, commitIndex uint64) *AppendEntriesResponse { return &AppendEntriesResponse{ Term: term, Success: success, + Index: index, CommitIndex: commitIndex, } } diff --git a/command.go b/command.go index ceb7172c76..0a0eb5df98 100644 --- a/command.go +++ b/command.go @@ -65,7 +65,31 @@ func RegisterCommand(command Command) { if command == nil { panic(fmt.Sprintf("raft: Cannot register nil")) } else if commandTypes[command.CommandName()] != nil { - panic(fmt.Sprintf("raft: Duplicate registration: %s", command.CommandName())) + // we need to register NOP command at the beginning + // for testing, it may register mutliple times + // i am not quite familiar with reg prorcess + // maybe you can fix it. sorry! + + //panic(fmt.Sprintf("raft: Duplicate registration: %s", command.CommandName())) + return } commandTypes[command.CommandName()] = command } + +//-------------------------------------- +// NOP command +//-------------------------------------- + +// NOP command +type NOPCommand struct { +} + +// The name of the NOP command in the log +func (c NOPCommand) CommandName() string { + return "nop" +} + +// NOP +func (c NOPCommand) Apply(server *Server) (interface{}, error) { + return nil, nil +} diff --git a/http_transporter_test.go b/http_transporter_test.go index ca349c1fed..ddee320c8f 100644 --- a/http_transporter_test.go +++ b/http_transporter_test.go @@ -90,7 +90,7 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans // Setup configuration. for _, server := range *servers { if _, err := (*servers)[0].Do(&joinCommand{Name: server.Name()}); err != nil { - t.Fatal("Server unable to join: %v", err) + t.Fatalf("Server %s unable to join: %v", server.Name(), err) } } diff --git a/peer.go b/peer.go index 00dc2071d6..a54007c67c 100644 --- a/peer.go +++ b/peer.go @@ -16,6 +16,7 @@ type Peer struct { server *Server name string prevLogIndex uint64 + synced bool mutex sync.RWMutex stopChan chan bool heartbeatTimeout time.Duration @@ -174,26 +175,45 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { if resp.Success { if len(req.Entries) > 0 { p.prevLogIndex = req.Entries[len(req.Entries)-1].Index + + // if peer append a log entry from the current term + // we set append to true + if req.Entries[len(req.Entries)-1].Term == p.server.currentTerm { + resp.append = true + } } traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex) // If it was unsuccessful then decrement the previous log index and // we'll try again next time. } else { - // we may miss a response from peer if resp.CommitIndex >= p.prevLogIndex { + + // we may miss a response from peer + // so maybe the peer has commited the logs we sent + // but we did not receive the success reply and did not increase + // the prevLogIndex + p.prevLogIndex = resp.CommitIndex + debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex) } else if p.prevLogIndex > 0 { // Decrement the previous log index down until we find a match. Don't // let it go below where the peer's commit index is though. That's a // problem. p.prevLogIndex-- + // if it not enough, we directly decrease to the index of the + if p.prevLogIndex > resp.Index { + p.prevLogIndex = resp.Index + } + debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex) } } p.mutex.Unlock() + // Attach the peer to resp, thus server can know where it comes from + resp.peer = p.Name() // Send response to server for processing. p.server.send(resp) } diff --git a/server.go b/server.go index bf89242f50..6609de8261 100644 --- a/server.go +++ b/server.go @@ -59,12 +59,12 @@ type Server struct { context interface{} currentTerm uint64 - votedFor string - log *Log - leader string - peers map[string]*Peer - mutex sync.RWMutex - commitCount int + votedFor string + log *Log + leader string + peers map[string]*Peer + mutex sync.RWMutex + syncedPeer map[string]bool c chan *event electionTimeout time.Duration @@ -117,6 +117,8 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S return result, err } + RegisterCommand(&NOPCommand{}) + return s, nil } @@ -327,6 +329,7 @@ func (s *Server) StartFollower() { func (s *Server) StartLeader() { s.setState(Leader) s.currentTerm++ + s.debugln("leader start at term: ", s.currentTerm, " index: ", s.log.currentIndex()) go s.loop() } @@ -554,15 +557,18 @@ func (s *Server) candidateLoop() { // The event loop that is run when the server is in a Candidate state. func (s *Server) leaderLoop() { s.setState(Leader) - s.commitCount = 0 + s.syncedPeer = make(map[string]bool) logIndex, _ := s.log.lastInfo() // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat. for _, peer := range s.peers { + peer.synced = false peer.setPrevLogIndex(logIndex) peer.startHeartbeat() } + go s.Do(NOPCommand{}) + // Begin to collect response from followers for { var err error @@ -595,6 +601,7 @@ func (s *Server) leaderLoop() { for _, peer := range s.peers { peer.stopHeartbeat() } + s.syncedPeer = nil } //-------------------------------------- @@ -636,7 +643,11 @@ func (s *Server) processCommand(command Command, e *event) { }() // Issue an append entries response for the server. - s.sendAsync(newAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex())) + resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()) + resp.append = true + resp.peer = s.Name() + + s.sendAsync(resp) } //-------------------------------------- @@ -656,7 +667,7 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append if req.Term < s.currentTerm { s.debugln("server.ae.error: stale term") - return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), false + return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false } // Update term and leader. @@ -665,22 +676,22 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append // Reject if log doesn't contain a matching previous entry. if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil { s.debugln("server.ae.truncate.error: ", err) - return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true + return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true } // Append entries to the log. if err := s.log.appendEntries(req.Entries); err != nil { s.debugln("server.ae.append.error: ", err) - return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true + return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true } // Commit up to the commit index. if err := s.log.setCommitIndex(req.CommitIndex); err != nil { s.debugln("server.ae.commit.error: ", err) - return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true + return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true } - return newAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), true + return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true } // Processes the "append entries" response from the peer. This is only @@ -693,14 +704,19 @@ func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) { return } - // Ignore response if it's not successful. + // panic response if it's not successful. if !resp.Success { return } + // if one peer successfully append a log from the leader term, + // we add it to the synced list + if resp.append == true { + s.syncedPeer[resp.peer] = true + } + // Increment the commit count to make sure we have a quorum before committing. - s.commitCount++ - if s.commitCount < s.QuorumSize() { + if len(s.syncedPeer) < s.QuorumSize() { return } @@ -771,7 +787,8 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot lastIndex, lastTerm := s.log.lastInfo() if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm { s.debugln("server.rv.error: out of date log: ", req.CandidateName, - "[", lastIndex, "]", " [", req.LastLogIndex, "]") + "Index :[", lastIndex, "]", " [", req.LastLogIndex, "]", + "Term :[", lastTerm, "]", " [", req.LastLogTerm, "]") return newRequestVoteResponse(s.currentTerm, false), false } diff --git a/server_test.go b/server_test.go index ac152193af..8b85621ad3 100644 --- a/server_test.go +++ b/server_test.go @@ -69,13 +69,16 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { server := newTestServer("1", &testTransporter{}) server.Initialize() server.StartLeader() + + time.Sleep(time.Millisecond * 100) + server.currentTerm = 2 defer server.Stop() - resp := server.RequestVote(newRequestVoteRequest(2, "foo", 0, 0)) + resp := server.RequestVote(newRequestVoteRequest(2, "foo", 1, 1)) if resp.Term != 2 || !resp.VoteGranted || server.VotedFor() != "foo" { t.Fatalf("First vote should not have been denied") } - resp = server.RequestVote(newRequestVoteRequest(3, "bar", 0, 0)) + resp = server.RequestVote(newRequestVoteRequest(3, "bar", 1, 1)) if resp.Term != 3 || !resp.VoteGranted || server.VotedFor() != "bar" { t.Fatalf("Second vote should have been approved") @@ -90,24 +93,25 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n") server.Initialize() server.StartLeader() - server.currentTerm = 2 + + time.Sleep(time.Millisecond * 100) defer server.Stop() - resp := server.RequestVote(newRequestVoteRequest(2, "foo", 2, 2)) - if resp.Term != 2 || resp.VoteGranted { + resp := server.RequestVote(newRequestVoteRequest(3, "foo", 3, 3)) + if resp.Term != 3 || resp.VoteGranted { t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted) } - resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 1)) - if resp.Term != 2 || resp.VoteGranted { + resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2)) + if resp.Term != 3 || resp.VoteGranted { t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted) } - resp = server.RequestVote(newRequestVoteRequest(2, "foo", 3, 2)) - if resp.Term != 2 || !resp.VoteGranted { + resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 3)) + if resp.Term != 3 || !resp.VoteGranted { t.Fatalf("Matching log vote should have been granted") } - resp = server.RequestVote(newRequestVoteRequest(2, "foo", 4, 3)) - if resp.Term != 2 || !resp.VoteGranted { + resp = server.RequestVote(newRequestVoteRequest(3, "foo", 5, 4)) + if resp.Term != 3 || !resp.VoteGranted { t.Fatalf("Ahead-of-log vote should have been granted") } } @@ -164,7 +168,10 @@ func TestServerPromote(t *testing.T) { func TestServerAppendEntries(t *testing.T) { server := newTestServer("1", &testTransporter{}) server.Initialize() - server.StartLeader() + // this test should assume that the server is a follower + // the leader will commit itself + server.SetHeartbeatTimeout(time.Second * 10) + server.StartFollower() defer server.Stop() // Append single entry. From b9e1da8ac23927bcfd40f218e8b97480cce73928 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 10 Jul 2013 20:02:24 -0700 Subject: [PATCH 2/4] let knowen peer to rejoin to the cluster --- peer.go | 1 + server.go | 4 +++- server_test.go | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/peer.go b/peer.go index a54007c67c..1b44877db5 100644 --- a/peer.go +++ b/peer.go @@ -250,6 +250,7 @@ func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteRespo debugln("peer.vote: ", p.server.Name(), "->", p.Name()) req.peer = p if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil { + debugln("peer.vote: recv", p.server.Name(), "<-", p.Name()) resp.peer = p c <- resp } diff --git a/server.go b/server.go index d7ee0ad9ee..d9a46aa2bf 100644 --- a/server.go +++ b/server.go @@ -516,6 +516,8 @@ func (s *Server) candidateLoop() { } else if resp.Term > s.currentTerm { s.debugln("server.candidate.vote.failed") s.setCurrentTerm(resp.Term, "", false) + } else { + s.debugln("server.candidate.vote: denied") } case e := <-s.c: @@ -808,7 +810,7 @@ func (s *Server) AddPeer(name string) error { // Do not allow peers to be added twice. if s.peers[name] != nil { - return DuplicatePeerError + return nil } // Only add the peer if it doesn't have the same name. diff --git a/server_test.go b/server_test.go index 8b85621ad3..ad8f04cb68 100644 --- a/server_test.go +++ b/server_test.go @@ -403,7 +403,7 @@ func TestServerMultiNode(t *testing.T) { } mutex.RUnlock() - for i := 0; i < 20; i++ { + for i := 0; i < 200000; i++ { retry := 0 fmt.Println("Round ", i) From 2abebe3065e5a246101fcacf51ce01d55dc02e4a Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 10 Jul 2013 22:19:57 -0700 Subject: [PATCH 3/4] refactor --- command.go | 25 +------------------------ peer.go | 1 - server.go | 8 +++++--- 3 files changed, 6 insertions(+), 28 deletions(-) diff --git a/command.go b/command.go index 0a0eb5df98..ba307f31f0 100644 --- a/command.go +++ b/command.go @@ -65,31 +65,8 @@ func RegisterCommand(command Command) { if command == nil { panic(fmt.Sprintf("raft: Cannot register nil")) } else if commandTypes[command.CommandName()] != nil { - // we need to register NOP command at the beginning - // for testing, it may register mutliple times - // i am not quite familiar with reg prorcess - // maybe you can fix it. sorry! - - //panic(fmt.Sprintf("raft: Duplicate registration: %s", command.CommandName())) + panic(fmt.Sprintf("raft: Duplicate registration: %s", command.CommandName())) return } commandTypes[command.CommandName()] = command } - -//-------------------------------------- -// NOP command -//-------------------------------------- - -// NOP command -type NOPCommand struct { -} - -// The name of the NOP command in the log -func (c NOPCommand) CommandName() string { - return "nop" -} - -// NOP -func (c NOPCommand) Apply(server *Server) (interface{}, error) { - return nil, nil -} diff --git a/peer.go b/peer.go index 1b44877db5..23476f81fa 100644 --- a/peer.go +++ b/peer.go @@ -16,7 +16,6 @@ type Peer struct { server *Server name string prevLogIndex uint64 - synced bool mutex sync.RWMutex stopChan chan bool heartbeatTimeout time.Duration diff --git a/server.go b/server.go index d9a46aa2bf..7607679a1c 100644 --- a/server.go +++ b/server.go @@ -117,8 +117,6 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S return result, err } - RegisterCommand(&NOPCommand{}) - return s, nil } @@ -296,6 +294,11 @@ func (s *Server) SetHeartbeatTimeout(duration time.Duration) { // Initialization //-------------------------------------- +// Reg the NOPCommand +func init() { + RegisterCommand(&NOPCommand{}) +} + // Starts the server with a log at the given path. func (s *Server) Initialize() error { @@ -563,7 +566,6 @@ func (s *Server) leaderLoop() { // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat. for _, peer := range s.peers { - peer.synced = false peer.setPrevLogIndex(logIndex) peer.startHeartbeat() } From 255ce1bc7d212b453df39d2d5a5cc10ea5aad3ef Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 10 Jul 2013 22:20:21 -0700 Subject: [PATCH 4/4] add NOPCommand.go --- NOPCommand.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 NOPCommand.go diff --git a/NOPCommand.go b/NOPCommand.go new file mode 100644 index 0000000000..0d4b6f2d19 --- /dev/null +++ b/NOPCommand.go @@ -0,0 +1,19 @@ +package raft + +//-------------------------------------- +// NOP command +//-------------------------------------- + +// NOP command +type NOPCommand struct { +} + +// The name of the NOP command in the log +func (c NOPCommand) CommandName() string { + return "nop" +} + +// NOP +func (c NOPCommand) Apply(server *Server) (interface{}, error) { + return nil, nil +}