add server state mutex to avoid state race condition

pull/820/head
Xiang Li 2013-07-05 10:44:03 -07:00
parent fe55d87640
commit cbceb05801
4 changed files with 159 additions and 44 deletions

32
peer.go
View File

@ -2,9 +2,9 @@ package raft
import (
"errors"
"fmt"
"sync"
"time"
"fmt"
)
//------------------------------------------------------------------------------
@ -185,18 +185,19 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
var resp *AppendEntriesResponse
select {
case <-time.After(p.server.heartbeatTimeout):
// how to decide?
case <-time.After(p.server.heartbeatTimeout * 2):
resp = nil
case resp = <-respChan:
}
debugln("receive flush response from ", p.Name())
if resp == nil {
debugln("receive flush timeout from ", p.Name())
return 0, false, fmt.Errorf("AppendEntries timeout: %s", p.Name())
}
debugln("receive flush response from ", p.Name())
// If successful then update the previous log index. If it was
// unsuccessful then decrement the previous log index and we'll try again
@ -204,8 +205,8 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
if resp.Success {
if len(req.Entries) > 0 {
p.prevLogIndex = req.Entries[len(req.Entries)-1].Index
debugln("Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
}
debugln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
} else {
if resp.Term > p.server.currentTerm {
@ -214,8 +215,10 @@ func (p *Peer) sendFlushRequest(req *AppendEntriesRequest) (uint64, bool, error)
// we may miss a response from peer
if resp.CommitIndex > p.prevLogIndex {
debugln(p.server.GetState()+": Peer ", p.Name(), "'s' log update to ", p.prevLogIndex)
p.prevLogIndex = resp.CommitIndex
} else if p.prevLogIndex > 0 {
debugln("Peer ", p.Name(), "'s' step back to ", p.prevLogIndex)
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
// problem.
@ -257,13 +260,20 @@ func (p *Peer) heartbeat() {
} else {
// shutdown the heartbeat
if f.term > p.server.currentTerm {
debugln("[Heartbeat] SetpDown!")
select {
case p.server.stepDown <- f.term:
return
default:
return
p.server.stateMutex.Lock()
if p.server.state == Leader {
p.server.state = Follower
select {
case p.server.stepDown <- f.term:
p.server.currentTerm = f.term
default:
panic("heartbeat cannot step down")
}
}
p.server.stateMutex.Unlock()
return
}
}

121
server.go
View File

@ -56,11 +56,12 @@ type Server struct {
context interface{}
currentTerm uint64
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.Mutex
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.Mutex
stateMutex sync.Mutex
electionTimer *Timer
heartbeatTimeout time.Duration
@ -97,7 +98,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
state: Stopped,
peers: make(map[string]*Peer),
log: NewLog(),
stepDown: make(chan uint64),
stepDown: make(chan uint64, 1),
stop: make(chan bool),
electionTimer: NewTimer(DefaultElectionTimeout, DefaultElectionTimeout*2),
heartbeatTimeout: DefaultHeartbeatTimeout,
@ -220,7 +221,7 @@ func (s *Server) LastCommandName() string {
// Get the state of the server for debugging
func (s *Server) GetState() string {
return fmt.Sprintf("State: %s, Term: %v, Index: %v ", s.state, s.currentTerm, s.CommittedIndex())
return fmt.Sprintf("Name: %s, State: %s, Term: %v, Index: %v ", s.name, s.state, s.currentTerm, s.CommittedIndex())
}
//--------------------------------------
@ -351,6 +352,7 @@ func (s *Server) StartServerLoop(role string) {
role = Candidate
case Candidate:
debugln(s.GetState() + "start Candiate")
stop, leader = s.startCandidateLoop()
s.votedFor = ""
@ -366,14 +368,17 @@ func (s *Server) StartServerLoop(role string) {
role = Follower
}
debugln(s.GetState() + "stop Candiate")
case Leader:
debugln(s.GetState() + "start Leader")
stop = s.startLeaderLoop()
if stop {
return
}
role = Follower
debugln(s.GetState() + "stop Leader")
}
}
}
@ -387,13 +392,13 @@ func (s *Server) StartFollower() {
// Start the sever as a leader
func (s *Server) StartLeader() {
s.state = Candidate
s.currentTerm++
go s.StartServerLoop(Leader)
}
// Shuts down the server.
func (s *Server) Stop() {
s.mutex.Lock()
if s.state == Follower {
s.electionTimer.Stop()
} else {
@ -413,8 +418,9 @@ func (s *Server) unload() {
// Close the log.
if s.log != nil {
// still some concurrency issue with stop
// need lock
s.log.Close()
s.log = nil
}
}
@ -461,6 +467,7 @@ func (s *Server) startFollowerLoop() (stop bool) {
func (s *Server) startCandidateLoop() (stop bool, leader bool) {
// the server must be a follower
if s.state != Follower && s.state != Stopped {
panic("startCandidateLoop")
}
@ -499,6 +506,7 @@ func (s *Server) startCandidateLoop() (stop bool, leader bool) {
}
if stop {
return true, false
}
@ -523,17 +531,36 @@ func (s *Server) startCandidateLoop() (stop bool, leader bool) {
// Step down if currentTerm changes (§5.5)
func (s *Server) startLeaderLoop() bool {
if s.state != Candidate && s.state != Stopped {
panic(s.Name() + " promote to leader but not candidate " + s.state)
// when the server goes into this loop,
// the leader may have been stepped down to follower!
// we cannot assume the the server is a candidate when
// get into this func
// The request vote func may let it step down
// That happens when we receive the majority votes, but
// another candidate start a new term and has not vote for us
// after it send vote request, the leader will stepdown before
// it enter this func
// Move server to become a leader and begin peer heartbeats.
s.stateMutex.Lock()
if s.state == Candidate {
s.state = Leader
s.leader = s.name
} else {
s.stateMutex.Unlock()
return false
}
s.state = Leader
s.stateMutex.Unlock()
logIndex, _ := s.log.LastInfo()
// Move server to become a leader and begin peer heartbeats.
s.state = Leader
s.leader = s.name
// after here we let the leader stepdown in the startLeaderSelect loop
// Update the peers prevLogIndex to leader's lastLogIndex
// Start heartbeat
@ -566,6 +593,9 @@ func (s *Server) startCandidateSelect(c chan *RequestVoteResponse) (bool, bool,
// Collect votes until we have a quorum.
votesGranted := 1
debugln(s.GetState() + "start Select")
defer debugln(s.GetState() + "end Select")
for {
// If we received enough votes then stop waiting for more votes.
@ -576,26 +606,43 @@ func (s *Server) startCandidateSelect(c chan *RequestVoteResponse) (bool, bool,
// Collect votes from peers.
select {
case resp := <-c:
debugln(s.GetState() + "select recv vote")
if resp != nil {
if resp.VoteGranted == true {
votesGranted++
} else if resp.Term > s.currentTerm {
s.stateMutex.Lock()
// go from internal path
// we may need to eat the stepdown
select {
case <-s.stepDown:
default:
}
s.state = Follower
s.currentTerm = resp.Term
debugln(s.GetState() + "select step down")
s.stateMutex.Unlock()
return false, false, false
}
}
case term := <-s.stepDown:
s.currentTerm = term
case <-s.stepDown:
debugln(s.GetState() + "select step down")
return false, false, false
// TODO: do we calculate the overall timeout? or timeout for each vote?
// Some issue here
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
debugln(s.GetState() + "select timeout")
return false, true, false
case <-s.stop:
debugln(s.GetState() + "select stop")
return false, false, true
}
@ -619,9 +666,9 @@ func (s *Server) startLeaderSelect() bool {
count++
}
case term := <-s.stepDown:
case <-s.stepDown:
// stepdown to follower
s.currentTerm = term
return false
case <-s.stop:
@ -682,16 +729,34 @@ func (s *Server) startLeaderSelect() bool {
// when the command has been successfully committed or an error has occurred.
func (s *Server) Do(command Command) (interface{}, error) {
// race here
// chance to append entry when we are not leader
// after the check, the leader may stepdown
// but log appended with the newest term
// which means the command from this follower can be commited
// which will cause a panic
s.stateMutex.Lock()
if s.state != Leader {
s.stateMutex.Unlock()
return nil, NotLeaderError
}
entry := s.log.CreateEntry(s.currentTerm, command)
// we get the term of the server
// when we are sure the server is leader
term := s.currentTerm
s.stateMutex.Unlock()
entry := s.log.CreateEntry(term, command)
if err := s.log.AppendEntry(entry); err != nil {
return nil, err
}
s.response <- FlushResponse{s.currentTerm, true, nil, nil}
s.response <- FlushResponse{term, true, nil, nil}
// to speed up the response time
// TODO: think about this carefully
@ -843,14 +908,25 @@ func (s *Server) setCurrentTerm(term uint64) {
if term > s.currentTerm {
s.votedFor = ""
s.stateMutex.Lock()
if s.state == Leader || s.state == Candidate {
debugln(s.Name(), " should step down to a follower from ", s.state)
s.stepDown <- term
s.state = Follower
select {
case s.stepDown <- term:
default:
panic("cannot stepdown")
}
debugln(s.Name(), " step down to a follower from ", s.state)
s.currentTerm = term
s.stateMutex.Unlock()
return
}
s.stateMutex.Unlock()
// update term after stop all the peer
s.currentTerm = term
}
@ -879,6 +955,7 @@ func (s *Server) AddPeer(name string) error {
s.peers[peer.name] = peer
}
return nil
}

View File

@ -1,6 +1,7 @@
package raft
import (
"fmt"
"reflect"
"strconv"
"sync"
@ -75,6 +76,10 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
t.Fatalf("First vote should not have been denied (%v)", err)
}
resp, err = server.RequestVote(NewRequestVoteRequest(3, "bar", 0, 0))
// now stepdown is done by channel, need time
time.Sleep(5 * time.Millisecond)
if !(resp.Term == 3 && resp.VoteGranted && server.VotedFor() == "bar" && err == nil) {
t.Fatalf("Second vote should have been approved (%v)", err)
}
@ -229,6 +234,7 @@ func TestServerAppendEntries(t *testing.T) {
// Send zero entries and commit everything.
resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 3, 1, []*LogEntry{}, 3))
if !(resp.Term == 2 && resp.Success && err == nil) {
t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err)
}
@ -242,6 +248,7 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Initialize()
server.StartLeader()
defer server.Stop()
server.currentTerm = 2
@ -261,6 +268,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Initialize()
server.StartLeader()
defer server.Stop()
// Append single entry + commit.
@ -388,7 +396,7 @@ func TestServerMultiNode(t *testing.T) {
var names []string
n := 5
n := 9
// add n servers
for i := 1; i <= n; i++ {
@ -413,6 +421,7 @@ func TestServerMultiNode(t *testing.T) {
server.SetElectionTimeout(testElectionTimeout)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.StartFollower()
time.Sleep(10 * time.Millisecond)
}
if _, err := leader.Do(&joinCommand{Name: name}); err != nil {
t.Fatalf("Unable to join server[%s]: %v", name, err)
@ -431,17 +440,21 @@ func TestServerMultiNode(t *testing.T) {
}
mutex.Unlock()
for i := 0; i < 200000; i++ {
i++
debugln("Round ", i)
for i := 0; i < 20000000; i++ {
retry := 0
fmt.Println("Round ", i)
num := strconv.Itoa(i%(len(servers)) + 1)
num_1 := strconv.Itoa((i+3)%(len(servers)) + 1)
toStop := servers[num]
toStop_1 := servers[num_1]
// Stop the first server and wait for a re-election.
time.Sleep(100 * time.Millisecond)
debugln("Disconnect ", toStop.Name())
debugln("disconnect ", num, " ", num_1)
toStop.SetTransporter(disTransporter)
toStop_1.SetTransporter(disTransporter)
time.Sleep(200 * time.Millisecond)
// Check that either server 2 or 3 is the leader now.
//mutex.Lock()
@ -450,7 +463,7 @@ func TestServerMultiNode(t *testing.T) {
for key, value := range servers {
debugln("Play begin")
if key != num {
if key != num && key != num_1 {
if value.State() == Leader {
debugln("Found leader")
for i := 0; i < 10; i++ {
@ -467,23 +480,37 @@ func TestServerMultiNode(t *testing.T) {
}
for {
for key, value := range servers {
if key != num {
if key != num && key != num_1 {
if value.State() == Leader {
leader++
}
debugln(value.Name(), " ", value.currentTerm, " ", value.state)
}
}
if leader > 1 {
if retry < 300 {
debugln("retry")
retry++
leader = 0
Debug = true
time.Sleep(100 * time.Millisecond)
continue
}
t.Fatalf("wrong leader number %v", leader)
}
if leader == 0 {
leader = 0
time.Sleep(100 * time.Millisecond)
continue
if retry < 300 {
retry++
fmt.Println("retry 0")
leader = 0
time.Sleep(100 * time.Millisecond)
continue
}
t.Fatalf("wrong leader number %v", leader)
}
if leader == 1 {
Debug = false
break
}
}
@ -491,6 +518,7 @@ func TestServerMultiNode(t *testing.T) {
//mutex.Unlock()
toStop.SetTransporter(transporter)
toStop_1.SetTransporter(transporter)
}
}

View File

@ -48,10 +48,10 @@ func NewTimer(minDuration time.Duration, maxDuration time.Duration) *Timer {
}
return &Timer{
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
minDuration: minDuration,
maxDuration: maxDuration,
state: READY,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
stop: make(chan bool, 1),
fire: make(chan time.Time),
}