diff --git a/debug.go b/debug.go index 0f5949ce4b..ec0c6cf048 100644 --- a/debug.go +++ b/debug.go @@ -16,7 +16,7 @@ const ( Trace = 2 ) -var LogLevel int = 0 +var LogLevel int = 2 var logger *log.Logger func init() { diff --git a/join_command.go b/join_command.go new file mode 100644 index 0000000000..1e37c0d3a4 --- /dev/null +++ b/join_command.go @@ -0,0 +1,17 @@ +package raft + +// Join command +type JoinCommand struct { + Name string `json:"name"` +} + +// The name of the Join command in the log +func (c JoinCommand) CommandName() string { + return "raft:join" +} + +func (c JoinCommand) Apply(server *Server) (interface{}, error) { + err := server.AddPeer(c.Name) + + return []byte("join"), err +} \ No newline at end of file diff --git a/leave_command.go b/leave_command.go new file mode 100644 index 0000000000..d4a61536d6 --- /dev/null +++ b/leave_command.go @@ -0,0 +1,17 @@ +package raft + +// Leave command +type LeaveCommand struct { + Name string `json:"name"` +} + +// The name of the Leave command in the log +func (c LeaveCommand) CommandName() string { + return "raft:leave" +} + +func (c LeaveCommand) Apply(server *Server) (interface{}, error) { + err := server.RemovePeer(c.Name) + + return []byte("leave"), err +} \ No newline at end of file diff --git a/nop_command.go b/nop_command.go index 333ebb4b65..e3183cdd88 100644 --- a/nop_command.go +++ b/nop_command.go @@ -10,7 +10,7 @@ type NOPCommand struct { // The name of the NOP command in the log func (c NOPCommand) CommandName() string { - return "nop" + return "raft:nop" } func (c NOPCommand) Apply(server *Server) (interface{}, error) { diff --git a/server.go b/server.go index 66fb4c5dd2..c1da821bb5 100644 --- a/server.go +++ b/server.go @@ -12,6 +12,7 @@ import ( "sort" "sync" "time" + "reflect" ) //------------------------------------------------------------------------------ @@ -65,7 +66,7 @@ type Server struct { transporter Transporter context interface{} currentTerm uint64 - synced bool + promotable bool votedFor string log *Log @@ -309,8 +310,55 @@ func (s *Server) SetHeartbeatTimeout(duration time.Duration) { // Reg the NOPCommand func init() { RegisterCommand(&NOPCommand{}) + RegisterCommand(&JoinCommand{}) } +func (s *Server) Start() error { + // Exit if the server is already running. + if s.state != Stopped { + return errors.New("raft.Server: Server already running") + } + + // Create snapshot directory if not exist + os.Mkdir(path.Join(s.path, "snapshot"), 0700) + + // Initialize the log and load it up. + if err := s.log.open(s.LogPath()); err != nil { + s.debugln("raft: Log error: ", err) + return fmt.Errorf("raft: Initialization error: %s", err) + } + + if err := s.readConf(); err != nil { + s.debugln("raft: Conf file error: ", err) + return fmt.Errorf("raft: Initialization error: %s", err) + } + + // Update the term to the last term in the log. + _, s.currentTerm = s.log.lastInfo() + + s.setState(Follower) + + // If no log entries exist then + // 1. wait for AEs from another node + // 2. wait for self-join command + // to set itself promotable + if (s.log.currentIndex() == 0) { + s.debugln("start as a new raft server") + s.promotable = false + + // If log entries exist then allow promotion to candidate + // if no AEs received. + } else { + s.debugln("start from previous saved state") + s.promotable = true + } + + go s.loop() + + return nil +} + + // Starts the server with a log at the given path. func (s *Server) Initialize() error { @@ -383,8 +431,8 @@ func (s *Server) readConf() error { // Start the sever as a follower // If we set synced to false, the follower will not promote itseft // until it get synced with the cluster once -func (s *Server) StartFollower(synced bool) { - s.synced = synced +func (s *Server) StartFollower(promotable bool) { + s.promotable = promotable s.setState(Follower) s.debugln("follower starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex) go s.loop() @@ -392,7 +440,7 @@ func (s *Server) StartFollower(synced bool) { // Start the sever as a leader func (s *Server) StartLeader() { - s.synced = true + s.promotable = true s.setState(Leader) s.currentTerm++ s.debugln("leader starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex) @@ -514,8 +562,22 @@ func (s *Server) followerLoop() { case e := <-s.c: if e.target == &stopValue { s.setState(Stopped) - } else if _, ok := e.target.(Command); ok { - err = NotLeaderError + } else if command, ok := e.target.(Command); ok { + + if reflect.Typeof(command) == { + //If no log entries exist and a self-join command is issued + //then immediately become leader and commit entry. + if s.log.currentIndex() == 0 && command.Name == s.Name() { + fmt.Println("selfjoin!") + s.promotable = true + s.setState(Leader) + s.processCommand(command, e) + } else { + err = NotLeaderError + } + } else { + err = NotLeaderError + } } else if req, ok := e.target.(*AppendEntriesRequest); ok { e.returnValue, update = s.processAppendEntriesRequest(req) } else if req, ok := e.target.(*RequestVoteRequest); ok { @@ -530,7 +592,7 @@ func (s *Server) followerLoop() { case <-timeoutChan: // only allow synced follower to promote to candidate - if s.synced { + if s.promotable { s.setState(Candidate) } else { update = true @@ -810,7 +872,7 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append // once the server appended and commited all the log entries from the leader // it is synced with the cluster // the follower can promote to candidate if needed - s.synced = true + s.promotable = true return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true } diff --git a/server_test.go b/server_test.go index 544332e957..04a95cd574 100644 --- a/server_test.go +++ b/server_test.go @@ -22,8 +22,10 @@ import ( // Ensure that we can request a vote from a server that has not voted. func TestServerRequestVote(t *testing.T) { server := newTestServer("1", &testTransporter{}) - server.Initialize() - server.StartLeader() + server.Start() + if _, err := server.Do(&JoinCommand{Name: server.Name()}); err != nil { + t.Fatalf("Server %s unable to join: %v", server.Name(), err) + } defer server.Stop() resp := server.RequestVote(newRequestVoteRequest(1, "foo", 0, 0)) if resp.Term != 1 || !resp.VoteGranted { @@ -288,7 +290,7 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { func TestServerDenyCommandExecutionWhenFollower(t *testing.T) { server := newTestServer("1", &testTransporter{}) server.Initialize() - server.StartFollower(true) + server.StartFollower(false) defer server.Stop() var err error if _, err = server.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {