with assertion problem at server.go L567

pull/820/head
Xiang Li 2013-07-25 14:26:27 -07:00
parent b01e3e713e
commit 057351ea5c
6 changed files with 111 additions and 13 deletions

View File

@ -16,7 +16,7 @@ const (
Trace = 2
)
var LogLevel int = 0
var LogLevel int = 2
var logger *log.Logger
func init() {

17
join_command.go Normal file
View File

@ -0,0 +1,17 @@
package raft
// Join command
type JoinCommand struct {
Name string `json:"name"`
}
// The name of the Join command in the log
func (c JoinCommand) CommandName() string {
return "raft:join"
}
func (c JoinCommand) Apply(server *Server) (interface{}, error) {
err := server.AddPeer(c.Name)
return []byte("join"), err
}

17
leave_command.go Normal file
View File

@ -0,0 +1,17 @@
package raft
// Leave command
type LeaveCommand struct {
Name string `json:"name"`
}
// The name of the Leave command in the log
func (c LeaveCommand) CommandName() string {
return "raft:leave"
}
func (c LeaveCommand) Apply(server *Server) (interface{}, error) {
err := server.RemovePeer(c.Name)
return []byte("leave"), err
}

View File

@ -10,7 +10,7 @@ type NOPCommand struct {
// The name of the NOP command in the log
func (c NOPCommand) CommandName() string {
return "nop"
return "raft:nop"
}
func (c NOPCommand) Apply(server *Server) (interface{}, error) {

View File

@ -12,6 +12,7 @@ import (
"sort"
"sync"
"time"
"reflect"
)
//------------------------------------------------------------------------------
@ -65,7 +66,7 @@ type Server struct {
transporter Transporter
context interface{}
currentTerm uint64
synced bool
promotable bool
votedFor string
log *Log
@ -309,8 +310,55 @@ func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
// Reg the NOPCommand
func init() {
RegisterCommand(&NOPCommand{})
RegisterCommand(&JoinCommand{})
}
func (s *Server) Start() error {
// Exit if the server is already running.
if s.state != Stopped {
return errors.New("raft.Server: Server already running")
}
// Create snapshot directory if not exist
os.Mkdir(path.Join(s.path, "snapshot"), 0700)
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
// Update the term to the last term in the log.
_, s.currentTerm = s.log.lastInfo()
s.setState(Follower)
// If no log entries exist then
// 1. wait for AEs from another node
// 2. wait for self-join command
// to set itself promotable
if (s.log.currentIndex() == 0) {
s.debugln("start as a new raft server")
s.promotable = false
// If log entries exist then allow promotion to candidate
// if no AEs received.
} else {
s.debugln("start from previous saved state")
s.promotable = true
}
go s.loop()
return nil
}
// Starts the server with a log at the given path.
func (s *Server) Initialize() error {
@ -383,8 +431,8 @@ func (s *Server) readConf() error {
// Start the sever as a follower
// If we set synced to false, the follower will not promote itseft
// until it get synced with the cluster once
func (s *Server) StartFollower(synced bool) {
s.synced = synced
func (s *Server) StartFollower(promotable bool) {
s.promotable = promotable
s.setState(Follower)
s.debugln("follower starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex)
go s.loop()
@ -392,7 +440,7 @@ func (s *Server) StartFollower(synced bool) {
// Start the sever as a leader
func (s *Server) StartLeader() {
s.synced = true
s.promotable = true
s.setState(Leader)
s.currentTerm++
s.debugln("leader starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex)
@ -514,8 +562,22 @@ func (s *Server) followerLoop() {
case e := <-s.c:
if e.target == &stopValue {
s.setState(Stopped)
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if command, ok := e.target.(Command); ok {
if reflect.Typeof(command) == {
//If no log entries exist and a self-join command is issued
//then immediately become leader and commit entry.
if s.log.currentIndex() == 0 && command.Name == s.Name() {
fmt.Println("selfjoin!")
s.promotable = true
s.setState(Leader)
s.processCommand(command, e)
} else {
err = NotLeaderError
}
} else {
err = NotLeaderError
}
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, update = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
@ -530,7 +592,7 @@ func (s *Server) followerLoop() {
case <-timeoutChan:
// only allow synced follower to promote to candidate
if s.synced {
if s.promotable {
s.setState(Candidate)
} else {
update = true
@ -810,7 +872,7 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
// once the server appended and commited all the log entries from the leader
// it is synced with the cluster
// the follower can promote to candidate if needed
s.synced = true
s.promotable = true
return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
}

View File

@ -22,8 +22,10 @@ import (
// Ensure that we can request a vote from a server that has not voted.
func TestServerRequestVote(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Initialize()
server.StartLeader()
server.Start()
if _, err := server.Do(&JoinCommand{Name: server.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", server.Name(), err)
}
defer server.Stop()
resp := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0))
if resp.Term != 1 || !resp.VoteGranted {
@ -288,7 +290,7 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Initialize()
server.StartFollower(true)
server.StartFollower(false)
defer server.Stop()
var err error
if _, err = server.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {