Merge pull request #39 from xiangli-cmu/server

Blocking the transportation when the server will stepdown
pull/820/head
Ben Johnson 2013-07-02 19:31:55 -07:00
commit 8b8791eb8f
2 changed files with 42 additions and 31 deletions

View File

@ -370,12 +370,16 @@ func (s *Server) collectVotes(c chan *RequestVoteResponse) (bool, bool) {
// Step down if we discover a higher term.
s.mutex.Lock()
s.setCurrentTerm(resp.Term)
s.state = Follower
s.currentTerm = resp.Term
s.mutex.Unlock()
return false, false
}
}
case term := <- s.stepDown:
s.state = Follower
s.currentTerm = term
// TODO: do we calculate the overall timeout? or timeout for each vote?
// Some issue here
@ -406,11 +410,17 @@ func (s *Server) commitCenter() {
}
case term := <-s.stepDown:
s.mutex.Lock()
// stepdown to follower
s.setCurrentTerm(term)
// stop heartbeats
for _, peer := range s.peers {
peer.stop()
}
s.mutex.Unlock()
s.state = Follower
s.currentTerm = term
s.StartElectionTimeout()
return
}
@ -760,6 +770,7 @@ func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, err
if req.Term < s.currentTerm {
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm)
}
s.setCurrentTerm(req.Term)
// If we've already voted for a different candidate then don't vote for this candidate.
@ -794,30 +805,14 @@ func (s *Server) setCurrentTerm(term uint64) {
if term > s.currentTerm {
s.votedFor = ""
if s.state == Leader {
debugln(s.Name(), " step down to a follower")
if s.state == Leader || s.state == Candidate{
debugln(s.Name(), " should step down to a follower from ", s.state)
// stop heartbeats
for _, peer := range s.peers {
peer.stop()
}
s.stepDown <- term
select {
case s.stepDown <- term:
default:
}
s.StartElectionTimeout()
// candidate should also start timeout
} else if s.state == Candidate {
s.StartElectionTimeout()
debugln(s.Name(), " step down to a follower from ", s.state)
return
}
s.state = Follower
// update term after stop all the peer
s.currentTerm = term
}

View File

@ -428,7 +428,7 @@ func TestServerMultiNode(t *testing.T) {
}
mutex.Unlock()
for i := 0; i < 20; i++ {
for i := 0; i < 200000; i++ {
i++
debugln("Round ", i)
@ -453,20 +453,36 @@ func TestServerMultiNode(t *testing.T) {
for i := 0; i < 10; i++ {
debugln("[Test] do ", value.Name())
if _, err := value.Do(&TestCommand2{X: 1}); err != nil {
t.Fatalf("Unable to do command")
break
}
debugln("[Test] Done")
}
leader++
debugln("Leader is ", value.Name(), " Index ", value.log.commitIndex)
}
debugln("Not Found leader")
}
}
for {
for key, value := range servers {
if key != num {
if value.State() == Leader {
leader++
}
}
}
if leader != 1 {
t.Fatalf("wrong leader number %v", leader)
if leader > 1 {
t.Fatalf("wrong leader number %v", leader)
}
if leader == 0 {
leader = 0
time.Sleep(100 * time.Millisecond)
continue
}
if leader == 1 {
break
}
}
//mutex.Unlock()