add timer.fire function, which can fire at the timer channel

pull/820/head
Xiang Li 2013-06-09 21:47:59 -07:00
parent 094e77f624
commit 531e12146a
4 changed files with 37 additions and 6 deletions

View File

@ -150,6 +150,13 @@ func (s *Server) State() string {
return s.state
}
// Retrieves the current term of the server.
func (s *Server) Term() uint64 {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.currentTerm
}
// Retrieves the name of the candidate this server voted for in this term.
func (s *Server) VotedFor() string {
s.mutex.Lock()
@ -248,7 +255,7 @@ func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
//--------------------------------------
// Starts the server with a log at the given path.
func (s *Server) Start() error {
func (s *Server) Initialize() error {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -269,21 +276,28 @@ func (s *Server) Start() error {
return fmt.Errorf("raft.Server: %v", err)
}
fmt.Println("curr ", s.currentTerm)
// Update the term to the last term in the log.
s.currentTerm = s.log.CurrentTerm()
fmt.Println("curr ", s.currentTerm)
// Update the state.
s.state = Follower
for _, peer := range s.peers {
peer.pause()
}
return nil
}
func (s *Server) StartFollower() {
// Start the election timeout.
c := make(chan bool)
s.electionTimer.Reset()
go s.electionTimeoutFunc(c)
<-c
return nil
}
// Shuts down the server.
@ -328,7 +342,7 @@ func (s *Server) Running() bool {
// Initializes the server to become leader of a new cluster. This function
// will fail if there is an existing log or the server is already a member in
// an existing cluster.
func (s *Server) Initialize() error {
func (s *Server) StartLeader() error {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -374,7 +388,7 @@ func (s *Server) do(command Command) ([]byte, error) {
for _, peer := range s.peers {
peer.pause()
}
fmt.Println("curr term", s.currentTerm)
// Add a new entry to the log.
entry := s.log.CreateEntry(s.currentTerm, command)
if err := s.log.AppendEntry(entry); err != nil {
@ -390,6 +404,7 @@ func (s *Server) do(command Command) ([]byte, error) {
for _, peer := range s.peers {
peer.resume()
peer.heartbeatTimer.fire()
}
committed := false
@ -433,11 +448,14 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
if req.Term < s.currentTerm {
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term")
}
fmt.Println("my term ", s.currentTerm, " req ", req.Term)
s.setCurrentTerm(req.Term)
// Update the current leader.
s.leader = req.LeaderName
fmt.Println("leader is ", req.LeaderName)
// Reset election timeout.
if s.electionTimer != nil {
s.electionTimer.Reset()
@ -664,6 +682,7 @@ func (s *Server) setCurrentTerm(term uint64) {
if term > s.currentTerm {
s.currentTerm = term
s.votedFor = ""
fmt.Println("go to be follower")
s.state = Follower
for _, peer := range s.peers {
peer.pause()
@ -691,6 +710,7 @@ func (s *Server) electionTimeoutFunc(startChannel chan bool) {
// If an election times out then promote this server. If the channel
// closes then that means the server has stopped so kill the function.
if _, ok := <-c; ok {
fmt.Println("timeout")
s.promote()
} else {
break
@ -706,12 +726,16 @@ func (s *Server) electionTimeoutFunc(startChannel chan bool) {
// within the context so that it is within the context of the server lock.
func (s *Server) AddPeer(name string) error {
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return DuplicatePeerError
}
// Only add the peer if it doesn't have the same name.
if s.name != name {
fmt.Println("Add peer ", name)
peer := NewPeer(s, name, s.heartbeatTimeout)
if s.state == Leader {
peer.resume()

View File

@ -385,6 +385,9 @@ func TestServerMultiNode(t *testing.T) {
if leader.MemberCount() != 3 {
t.Fatalf("Expected member count to be 3, got %v", leader.MemberCount())
}
if servers["2"].State() == Leader || servers["3"].State() == Leader {
t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].state, servers["3"].state)
}
mutex.Unlock()
// Stop the first server and wait for a re-election.

View File

@ -86,7 +86,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
for _, peer := range servers {
server.AddPeer(peer.Name())
}
server.Start()
server.Initialize()
}
return servers
}

View File

@ -137,6 +137,10 @@ func (t *Timer) stopInternalTimer() {
}
}
func (t *Timer) fire() {
t.c <-time.Now()
}
// Stops the timer if it is running and restarts it.
func (t *Timer) Reset() {
t.mutex.Lock()