Merge pull request #27 from xiangli-cmu/master

Fix commit issue
pull/820/head
Ben Johnson 2013-06-27 14:49:23 -07:00
commit 82ccc09168
3 changed files with 139 additions and 93 deletions

View File

@@ -338,13 +338,26 @@ func (s *Server) StartLeader() error {
// leader will commit the log entry
func (s *Server) commitCenter() {
debugln("collecting data")
count := 1
for {
var response FlushResponse
select {
case response = <-s.response:
// count for success response from peers
if response.success && response.peer != nil {
count++
}
case term := <-s.stepDown:
s.mutex.Lock()
s.setCurrentTerm(term)
s.mutex.Unlock()
return
}
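The loop above counts successful responses from peers (count starts at 1 because the leader counts itself) and bails out if a higher term arrives on s.stepDown. A minimal standalone sketch of that collection pattern, with invented channel and helper names rather than the server's actual fields:

```go
package main

import "fmt"

// waitForQuorum drains acknowledgements until a majority is reached, or gives
// up when a higher term shows up on stepDown, mirroring the select loop in the
// hunk above.
func waitForQuorum(acks <-chan bool, stepDown <-chan uint64, quorum int) (bool, uint64) {
	count := 1 // the leader counts itself
	for count < quorum {
		select {
		case ok := <-acks:
			if ok {
				count++
			}
		case term := <-stepDown:
			return false, term // someone has a higher term; stop acting as leader
		}
	}
	return true, 0
}

func main() {
	acks := make(chan bool, 3)
	acks <- true
	acks <- true
	ok, _ := waitForQuorum(acks, make(chan uint64), 3)
	fmt.Println("quorum reached:", ok) // true
}
```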
@@ -352,37 +365,42 @@ func (s *Server) commitCenter() {
debugln("[CommitCenter] Receive response from ", response.peer.Name(), response.success)
}
// At least one entry from the leader's current term must also be stored on
// a majority of servers
if count >= s.QuorumSize() {
// Determine the committed index that a majority has.
var indices []uint64
indices = append(indices, s.log.CurrentIndex())
for _, peer := range s.peers {
indices = append(indices, peer.prevLogIndex)
}
sort.Sort(Uint64Slice(indices))
// We can commit up to the highest index that the majority
// of the members have appended.
commitIndex := indices[s.QuorumSize()-1]
committedIndex := s.log.CommitIndex()
if commitIndex > committedIndex {
debugln(indices)
debugln("[CommitCenter] Going to Commit ", commitIndex)
s.log.SetCommitIndex(commitIndex)
debugln("[CommitCenter] Commit ", commitIndex)
for i := committedIndex; i < commitIndex; i++ {
select {
case s.log.entries[i-s.log.startIndex].commit <- true:
debugln("notify")
continue
// the commit channel is buffered, so the send should return immediately;
// if it cannot, the entry came from a previous leader and nothing local
// is waiting on it (this used to panic)
default:
debugln("Cannot send commit notification, log from previous leader")
}
}
}
}
}
}
}
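The hunk above only advances the commit index once a quorum of responses has arrived, and then picks the highest index that a majority has already replicated by sorting the leader's own index together with every peer's prevLogIndex. A standalone sketch of that selection rule, using a descending sort and invented names (majorityIndex is not part of this package):

```go
package main

import (
	"fmt"
	"sort"
)

// majorityIndex returns the highest log index that is known to be stored on
// at least quorum servers: after a descending sort, the element at position
// quorum-1 is held (or exceeded) by exactly quorum servers.
func majorityIndex(indices []uint64, quorum int) uint64 {
	sorted := append([]uint64(nil), indices...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] > sorted[j] })
	return sorted[quorum-1]
}

func main() {
	// Leader at index 7; its three peers have replicated up to 5, 7 and 3.
	indices := []uint64{7, 5, 7, 3}
	quorum := len(indices)/2 + 1                // 3 of 4
	fmt.Println(majorityIndex(indices, quorum)) // 5: three servers hold index >= 5
}
```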
@@ -447,9 +465,14 @@ func (s *Server) Do(command Command) (interface{}, error) {
s.response <- FlushResponse{s.currentTerm, true, nil, nil}
// to speed up the response time
for _, peer := range s.peers {
peer.heartbeatTimer.Fire()
}
// TODO: think about this carefully
// fire will speed up response time
// but will reduce throughput
// for _, peer := range s.peers {
// peer.heartbeatTimer.Fire()
// }
debugln("[Do] join!")
// timeout here
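The commented-out Fire() calls trade latency for throughput: nudging every peer's heartbeat timer right after a Do() gets the new entry replicated sooner, while waiting for the regular interval lets more entries ride in a single AppendEntries round. A rough sketch of a timer that can be fired early, purely illustrative and not how this package's heartbeatTimer is implemented:

```go
package main

import (
	"fmt"
	"time"
)

// fireableTicker ticks every interval but can be nudged early via Fire().
type fireableTicker struct {
	C    chan time.Time
	fire chan struct{}
}

func newFireableTicker(interval time.Duration) *fireableTicker {
	t := &fireableTicker{C: make(chan time.Time), fire: make(chan struct{}, 1)}
	go func() {
		for {
			select {
			case now := <-time.After(interval):
				t.C <- now
			case <-t.fire:
				t.C <- time.Now()
			}
		}
	}()
	return t
}

// Fire requests an immediate tick; the buffered channel makes it non-blocking.
func (t *fireableTicker) Fire() {
	select {
	case t.fire <- struct{}{}:
	default:
	}
}

func main() {
	ticker := newFireableTicker(100 * time.Millisecond)
	go func() {
		time.Sleep(10 * time.Millisecond)
		ticker.Fire() // a new command arrived; heartbeat now instead of waiting
	}()
	fmt.Println("heartbeat at", <-ticker.C)
}
```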
@@ -505,7 +528,8 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
}
debugln("Peer ", s.Name(), "after append ")
debugln("Peer ", s.Name(), "commit index ", req.CommitIndex, " from ",
req.LeaderName)
// Commit up to the commit index.
if err := s.log.SetCommitIndex(req.CommitIndex); err != nil {
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
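Here the follower adopts the leader's commit index after a successful append. The general Raft rule is to advance to the leader's commit index but never past the last entry the follower actually holds; a minimal sketch with a hypothetical helper (in this package that clamping is presumably handled inside s.log.SetCommitIndex, which can return an error above):

```go
package main

import "fmt"

// followerCommitIndex applies the follower-side rule from the Raft paper:
// advance the local commit index to the leader's, but never past the last
// entry this follower has actually stored.
func followerCommitIndex(leaderCommit, lastLocalIndex, currentCommit uint64) uint64 {
	if leaderCommit <= currentCommit {
		return currentCommit // nothing new to commit
	}
	if leaderCommit > lastLocalIndex {
		return lastLocalIndex // cannot commit entries we do not have yet
	}
	return leaderCommit
}

func main() {
	// The leader says index 9 is committed, but this follower only has up to 7.
	fmt.Println(followerCommitIndex(9, 7, 4)) // 7
}
```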
@@ -635,6 +659,7 @@ func (s *Server) promoteToCandidate() (uint64, uint64, uint64, error) {
// Ignore promotion if the server is not a follower.
if s.state != Follower && s.state != Candidate {
panic("promote but not a follower")
return 0, 0, 0, fmt.Errorf("raft: Invalid promotion state: %s", s.state)
}
@@ -650,7 +675,8 @@ func (s *Server) promoteToCandidate() (uint64, uint64, uint64, error) {
lastLogIndex, lastLogTerm := s.log.LastInfo()
debugln("[PromoteToCandidate] Follower ", s.Name(),
"promote to candidate[", lastLogIndex, ",", lastLogTerm, "]")
"promote to candidate[", lastLogIndex, ",", lastLogTerm, "]",
"currentTerm ", s.currentTerm)
return s.currentTerm, lastLogIndex, lastLogTerm, nil
}
@@ -665,7 +691,7 @@ func (s *Server) promoteToLeader(term uint64, lastLogIndex uint64, lastLogTerm u
// Ignore promotion if we are not a candidate.
if s.state != Candidate {
panic("promote to leader but not candidate")
panic(s.Name() + " promote to leader but not candidate " + s.state)
}
// TODO: should panic or just a false?
@@ -723,6 +749,7 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err
// If we've already voted for a different candidate then don't vote for this candidate.
if s.votedFor != "" && s.votedFor != req.CandidateName {
debugln("already vote for ", s.votedFor, " false to ", req.CandidateName)
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor)
}
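The check above enforces one vote per term; together with the log up-to-dateness rule exercised by TestServerRequestVoteDenyIfCandidateLogIsBehind below, the decision can be sketched like this (an illustrative helper, not the package's RequestVote, and the req.Term comparison is left out):

```go
package main

import "fmt"

// grantVote returns true if a server that has already voted for votedFor
// ("" if it has not voted this term) should grant its vote to candidate.
func grantVote(votedFor, candidate string, candLastTerm, candLastIndex, ourLastTerm, ourLastIndex uint64) bool {
	if votedFor != "" && votedFor != candidate {
		return false // already committed to someone else this term
	}
	if candLastTerm != ourLastTerm {
		return candLastTerm > ourLastTerm // the higher last log term wins
	}
	return candLastIndex >= ourLastIndex // same last term: the longer log wins
}

func main() {
	fmt.Println(grantVote("", "2", 3, 10, 3, 12)) // false: candidate's log is behind
	fmt.Println(grantVote("", "2", 4, 8, 3, 12))  // true: higher last log term
}
```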
@@ -736,7 +763,7 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err
// If we made it this far then cast a vote and reset our election time out.
s.votedFor = req.CandidateName
debugln(s.Name(), "Vote for ", req.CandidateName)
debugln(s.Name(), "Vote for ", req.CandidateName, "at term", req.Term)
if s.electionTimer != nil {
s.electionTimer.Stop()
@@ -766,6 +793,11 @@ func (s *Server) setCurrentTerm(term uint64) {
default:
}
s.StartElectionTimeout()
// a candidate should also restart its election timeout
} else if s.state == Candidate {
s.StartElectionTimeout()
}
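The new branch makes a candidate, not just a stepping-down leader, restart its election timeout when it observes a higher term. Restarting matters because each restart should draw a fresh randomized timeout, so candidates that split the vote in one term are unlikely to collide again in the next; a sketch of that standard randomization (the exact range StartElectionTimeout uses here may differ):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomElectionTimeout picks a timeout in [base, 2*base) so that two servers
// whose timers are restarted at the same moment rarely expire together.
func randomElectionTimeout(base time.Duration) time.Duration {
	return base + time.Duration(rand.Int63n(int64(base)))
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(randomElectionTimeout(150 * time.Millisecond))
	}
}
```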

View File

@@ -116,70 +116,84 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
// //--------------------------------------
// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
// func TestServerPromoteSelf(t *testing.T) {
// server := newTestServer("1", &testTransporter{})
// server.Initialize()
// server.StartFollower()
// defer server.Stop()
// if success, err := server.promote(); !(success && err == nil && server.state == Leader) {
// t.Fatalf("Server self-promotion failed: %v (%v)", server.state, err)
// }
// }
func TestServerPromoteSelf(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Initialize()
server.StartFollower()
defer server.Stop()
if success, err := server.promote(); !(success && err == nil && server.state == Leader) {
t.Fatalf("Server self-promotion failed: %v (%v)", server.state, err)
}
}

// // Ensure that we can promote a server within a cluster to a leader.
// func TestServerPromote(t *testing.T) {
// lookup := map[string]*Server{}
// transporter := &testTransporter{}
// transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
// return lookup[peer.Name()].RequestVote(req)
// }
// transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
// return lookup[peer.Name()].AppendEntries(req)
// }
// servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
// for _, server := range servers {
// defer server.Stop()
// }
// leader := servers[0]
// leader.StartFollower()
// if success, err := leader.promote(); !(success && err == nil && leader.state == Leader) {
// t.Fatalf("Server promotion in cluster failed: %v (%v)", leader.state, err)
// }
// }
//Ensure that we can promote a server within a cluster to a leader.
func TestServerPromote(t *testing.T) {
debugln("---TestServerPromote---")
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
return lookup[peer.Name()].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
return lookup[peer.Name()].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
lookup["1"].state = Follower
lookup["2"].state = Follower
lookup["3"].state = Follower
leader := servers[0]
if success, err := leader.promote(); !(success && err == nil && leader.state == Leader) {
t.Fatalf("Server self-promotion failed: %v (%v)", leader.state, err)
}
for _, server := range servers {
server.Stop()
}
}
// Ensure that a server will restart election if not enough votes are obtained before timeout.
// func TestServerPromoteDoubleElection(t *testing.T) {
// lookup := map[string]*Server{}
// transporter := &testTransporter{}
// transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
// resp, err := lookup[peer.Name()].RequestVote(req)
// return resp, err
// }
// transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
// resp, err := lookup[peer.Name()].AppendEntries(req)
// return resp, err
// }
// servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
// lookup["2"].currentTerm, lookup["2"].votedFor = 1, "2"
// lookup["3"].currentTerm, lookup["3"].votedFor = 1, "3"
// lookup["2"].electionTimer.Stop()
// lookup["3"].electionTimer.Stop()
// for _, server := range servers {
// defer server.Stop()
// }
// leader := servers[0]
// leader.StartFollower()
// if success, err := leader.promote(); !(success && err == nil && leader.state == Leader && leader.currentTerm == 2) {
// t.Fatalf("Server promotion in cluster failed: %v (%v)", leader.state, err)
// }
// time.Sleep(50 * time.Millisecond)
// if lookup["2"].votedFor != "1" {
// t.Fatalf("Unexpected vote for server 2: %v", lookup["2"].votedFor)
// }
// if lookup["3"].votedFor != "1" {
// t.Fatalf("Unexpected vote for server 3: %v", lookup["3"].votedFor)
// }
// }
func TestServerPromoteDoubleElection(t *testing.T) {
debugln("---TestServerPromoteDoubleElection---")
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) (*RequestVoteResponse, error) {
resp, err := lookup[peer.Name()].RequestVote(req)
return resp, err
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
resp, err := lookup[peer.Name()].AppendEntries(req)
return resp, err
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
lookup["2"].currentTerm, lookup["2"].votedFor = 1, "2"
lookup["3"].currentTerm, lookup["3"].votedFor = 1, "3"
lookup["1"].state = Follower
lookup["2"].state = Follower
lookup["3"].state = Follower
leader := servers[0]
if success, err := leader.promote(); !(success && err == nil && leader.state == Leader) {
t.Fatalf("Server self-promotion failed: %v (%v)", leader.state, err)
}
if lookup["2"].votedFor != "1" {
t.Fatalf("Unexpected vote for server 2: %v", lookup["2"].votedFor)
}
if lookup["3"].votedFor != "1" {
t.Fatalf("Unexpected vote for server 3: %v", lookup["3"].votedFor)
}
for _, server := range servers {
server.Stop()
}
}
//--------------------------------------
// Append Entries

View File

@@ -9,7 +9,7 @@ import (
const (
testHeartbeatTimeout = 10 * time.Millisecond
testElectionTimeout = 50 * time.Millisecond
testElectionTimeout = 100 * time.Millisecond
)
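Doubling testElectionTimeout keeps it well clear of the 10 ms testHeartbeatTimeout, so a healthy leader can send many heartbeats before any follower's election timer fires. A trivial check of that margin using the constants above:

```go
package main

import (
	"fmt"
	"time"
)

const (
	testHeartbeatTimeout = 10 * time.Millisecond
	testElectionTimeout  = 100 * time.Millisecond
)

func main() {
	// Heartbeats that fit into one election timeout; a larger margin makes
	// the test suite less flaky on slow machines.
	ratio := int(testElectionTimeout / testHeartbeatTimeout)
	fmt.Println("heartbeats per election timeout:", ratio) // 10
}
```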
func init() {