2013-04-14 21:37:33 +00:00
|
|
|
package raft
|
|
|
|
|
2013-04-17 02:28:08 +00:00
|
|
|
import (
|
2013-06-24 16:52:51 +00:00
|
|
|
"encoding/json"
|
2013-04-17 02:28:08 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
2013-06-08 02:19:18 +00:00
|
|
|
"io/ioutil"
|
2013-06-03 21:58:12 +00:00
|
|
|
"os"
|
2013-06-05 17:38:49 +00:00
|
|
|
"path"
|
2013-06-08 02:19:18 +00:00
|
|
|
"sort"
|
|
|
|
"sync"
|
|
|
|
"time"
|
2013-04-17 02:28:08 +00:00
|
|
|
)
|
|
|
|
|
2013-04-14 21:37:33 +00:00
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Constants
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// Server states, as stored in Server.state and reported by Server.State.
const (
	// Stopped is the state of a server that is not running.
	Stopped = "stopped"
	// Follower is the state of a server that follows a leader.
	Follower = "follower"
	// Candidate is the state of a server that is running for election.
	Candidate = "candidate"
	// Leader is the state of the server that leads the cluster.
	Leader = "leader"
)
|
|
|
|
|
|
|
|
// Default timer settings applied by NewServer.
const (
	// DefaultHeartbeatTimeout is the initial heartbeat timeout of a server.
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	// DefaultElectionTimeout is the initial minimum election timeout; the
	// election timer's maximum is set to twice this value.
	DefaultElectionTimeout = 150 * time.Millisecond
)
|
|
|
|
|
2013-05-28 18:46:27 +00:00
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Errors
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// Sentinel errors returned by the server.
var (
	// NotLeaderError is returned by Do when the server is not the leader.
	NotLeaderError = errors.New("raft.Server: Not current leader")
	// DuplicatePeerError reports a duplicate peer.
	DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
)
|
2013-05-27 02:04:41 +00:00
|
|
|
|
2013-04-14 21:37:33 +00:00
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Typedefs
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
|
|
// A server is involved in the consensus protocol and can act as a follower,
|
|
|
|
// candidate or a leader.
|
|
|
|
type Server struct {
|
2013-06-24 16:52:51 +00:00
|
|
|
name string
|
|
|
|
path string
|
|
|
|
state string
|
|
|
|
transporter Transporter
|
|
|
|
context interface{}
|
|
|
|
currentTerm uint64
|
|
|
|
|
|
|
|
votedFor string
|
|
|
|
log *Log
|
|
|
|
leader string
|
|
|
|
peers map[string]*Peer
|
|
|
|
mutex sync.Mutex
|
2013-06-13 18:03:32 +00:00
|
|
|
|
2013-06-08 02:19:18 +00:00
|
|
|
electionTimer *Timer
|
|
|
|
heartbeatTimeout time.Duration
|
2013-06-13 18:03:32 +00:00
|
|
|
response chan FlushResponse
|
2013-06-24 16:52:51 +00:00
|
|
|
stepDown chan uint64
|
2013-06-13 18:03:32 +00:00
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
currentSnapshot *Snapshot
|
|
|
|
lastSnapshot *Snapshot
|
|
|
|
stateMachine StateMachine
|
2013-04-14 21:37:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Constructor
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
2013-04-17 02:28:08 +00:00
|
|
|
// Creates a new server with a log at the given path.
|
2013-06-12 16:47:48 +00:00
|
|
|
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}) (*Server, error) {
|
2013-04-17 02:28:08 +00:00
|
|
|
if name == "" {
|
|
|
|
return nil, errors.New("raft.Server: Name cannot be blank")
|
|
|
|
}
|
2013-05-28 19:57:38 +00:00
|
|
|
if transporter == nil {
|
2013-06-08 02:41:36 +00:00
|
|
|
panic("raft: Transporter required")
|
2013-05-28 19:57:38 +00:00
|
|
|
}
|
|
|
|
|
2013-04-17 02:28:08 +00:00
|
|
|
s := &Server{
|
2013-05-10 14:47:24 +00:00
|
|
|
name: name,
|
|
|
|
path: path,
|
2013-05-28 19:57:38 +00:00
|
|
|
transporter: transporter,
|
2013-06-12 16:47:48 +00:00
|
|
|
stateMachine: stateMachine,
|
2013-06-03 02:43:40 +00:00
|
|
|
context: context,
|
2013-05-10 14:47:24 +00:00
|
|
|
state: Stopped,
|
|
|
|
peers: make(map[string]*Peer),
|
|
|
|
log: NewLog(),
|
|
|
|
electionTimer: NewTimer(DefaultElectionTimeout, DefaultElectionTimeout*2),
|
2013-05-05 20:26:04 +00:00
|
|
|
heartbeatTimeout: DefaultHeartbeatTimeout,
|
2013-04-14 21:37:33 +00:00
|
|
|
}
|
2013-04-30 04:13:50 +00:00
|
|
|
|
|
|
|
// Setup apply function.
|
2013-06-25 20:20:53 +00:00
|
|
|
s.log.ApplyFunc = func(c Command) (interface{}, error) {
|
2013-06-09 05:39:50 +00:00
|
|
|
result, err := c.Apply(s)
|
|
|
|
return result, err
|
2013-04-30 04:13:50 +00:00
|
|
|
}
|
|
|
|
|
2013-04-17 02:28:08 +00:00
|
|
|
return s, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Accessors
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
2013-05-01 05:21:56 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// General
|
|
|
|
//--------------------------------------
|
|
|
|
|
2013-04-17 02:28:08 +00:00
|
|
|
// Retrieves the name of the server.
|
|
|
|
func (s *Server) Name() string {
|
|
|
|
return s.name
|
|
|
|
}
|
|
|
|
|
|
|
|
// Retrieves the storage path for the server.
|
|
|
|
func (s *Server) Path() string {
|
|
|
|
return s.path
|
|
|
|
}
|
2013-05-05 19:36:23 +00:00
|
|
|
|
2013-06-03 13:51:52 +00:00
|
|
|
func (s *Server) Leader() string {
|
2013-06-03 01:18:25 +00:00
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
return s.leader
|
2013-06-08 02:19:18 +00:00
|
|
|
}
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// Retrieves the peers of the server
|
2013-06-23 18:42:31 +00:00
|
|
|
func (s *Server) Peers() map[string]*Peer {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
return s.peers
|
|
|
|
}
|
|
|
|
|
2013-05-28 19:57:38 +00:00
|
|
|
// Retrieves the object that transports requests.
|
|
|
|
func (s *Server) Transporter() Transporter {
|
|
|
|
return s.transporter
|
|
|
|
}
|
|
|
|
|
2013-06-11 22:30:13 +00:00
|
|
|
func (s *Server) SetTransporter(t Transporter) {
|
|
|
|
s.transporter = t
|
|
|
|
}
|
|
|
|
|
2013-06-03 02:43:40 +00:00
|
|
|
// Retrieves the context passed into the constructor.
|
|
|
|
func (s *Server) Context() interface{} {
|
|
|
|
return s.context
|
|
|
|
}
|
|
|
|
|
2013-04-17 02:28:08 +00:00
|
|
|
// Retrieves the log path for the server.
|
|
|
|
func (s *Server) LogPath() string {
|
|
|
|
return fmt.Sprintf("%s/log", s.path)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Retrieves the current state of the server.
|
2013-04-17 02:32:49 +00:00
|
|
|
func (s *Server) State() string {
|
2013-06-23 18:42:31 +00:00
|
|
|
// s.mutex.Lock()
|
|
|
|
// defer s.mutex.Unlock()
|
2013-04-17 02:28:08 +00:00
|
|
|
return s.state
|
2013-04-14 21:37:33 +00:00
|
|
|
}
|
|
|
|
|
2013-06-10 04:47:59 +00:00
|
|
|
// Retrieves the current term of the server.
|
|
|
|
func (s *Server) Term() uint64 {
|
2013-05-08 03:56:32 +00:00
|
|
|
s.mutex.Lock()
|
2013-05-25 05:37:56 +00:00
|
|
|
defer s.mutex.Unlock()
|
2013-06-10 04:47:59 +00:00
|
|
|
return s.currentTerm
|
2013-04-14 21:37:33 +00:00
|
|
|
}
|
|
|
|
|
2013-05-03 04:16:39 +00:00
|
|
|
// Retrieves the name of the candidate this server voted for in this term.
|
|
|
|
func (s *Server) VotedFor() string {
|
|
|
|
s.mutex.Lock()
|
2013-05-25 05:37:56 +00:00
|
|
|
defer s.mutex.Unlock()
|
2013-05-03 04:16:39 +00:00
|
|
|
return s.votedFor
|
|
|
|
}
|
|
|
|
|
2013-05-08 03:56:32 +00:00
|
|
|
// Retrieves whether the server's log has no entries.
|
|
|
|
func (s *Server) IsLogEmpty() bool {
|
2013-06-03 19:13:38 +00:00
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
2013-05-08 03:56:32 +00:00
|
|
|
return s.log.IsEmpty()
|
|
|
|
}
|
|
|
|
|
2013-05-08 20:22:08 +00:00
|
|
|
// A list of all the log entries. This should only be used for debugging purposes.
|
|
|
|
func (s *Server) LogEntries() []*LogEntry {
|
2013-06-03 19:13:38 +00:00
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
2013-05-08 20:22:08 +00:00
|
|
|
if s.log != nil {
|
|
|
|
return s.log.entries
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2013-06-03 19:13:38 +00:00
|
|
|
// A reference to the command name of the last entry.
|
|
|
|
func (s *Server) LastCommandName() string {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
if s.log != nil {
|
|
|
|
return s.log.LastCommandName()
|
|
|
|
}
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
|
2013-05-01 05:21:56 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// Membership
|
|
|
|
//--------------------------------------
|
|
|
|
|
2013-04-28 04:51:17 +00:00
|
|
|
// Retrieves the number of member servers in the consensus.
|
2013-05-01 05:11:23 +00:00
|
|
|
func (s *Server) MemberCount() int {
|
2013-06-22 15:38:48 +00:00
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
2013-06-22 15:35:30 +00:00
|
|
|
return len(s.peers) + 1
|
2013-04-28 04:51:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Retrieves the number of servers required to make a quorum.
|
2013-05-01 05:11:23 +00:00
|
|
|
func (s *Server) QuorumSize() int {
|
2013-04-28 04:51:17 +00:00
|
|
|
return (s.MemberCount() / 2) + 1
|
|
|
|
}
|
|
|
|
|
2013-05-01 05:21:56 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// Election timeout
|
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Retrieves the election timeout.
|
|
|
|
func (s *Server) ElectionTimeout() time.Duration {
|
2013-05-05 20:01:06 +00:00
|
|
|
return s.electionTimer.MinDuration()
|
2013-05-01 05:21:56 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Sets the election timeout.
|
|
|
|
func (s *Server) SetElectionTimeout(duration time.Duration) {
|
2013-05-05 20:01:06 +00:00
|
|
|
s.electionTimer.SetMinDuration(duration)
|
|
|
|
s.electionTimer.SetMaxDuration(duration * 2)
|
2013-05-01 05:21:56 +00:00
|
|
|
}
|
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
func (s *Server) StartElectionTimeout() {
|
|
|
|
s.electionTimer.Reset()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *Server) StartHeartbeatTimeout() {
|
|
|
|
for _, peer := range s.peers {
|
|
|
|
peer.StartHeartbeatTimeout()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-05-05 21:41:55 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// Heartbeat timeout
|
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Retrieves the heartbeat timeout.
|
|
|
|
func (s *Server) HeartbeatTimeout() time.Duration {
|
|
|
|
return s.heartbeatTimeout
|
|
|
|
}
|
|
|
|
|
|
|
|
// Sets the heartbeat timeout.
|
|
|
|
func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
|
|
|
|
s.heartbeatTimeout = duration
|
|
|
|
for _, peer := range s.peers {
|
|
|
|
peer.SetHeartbeatTimeout(duration)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-04-14 21:37:33 +00:00
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
//
|
|
|
|
// Methods
|
|
|
|
//
|
|
|
|
//------------------------------------------------------------------------------
|
2013-04-17 02:28:08 +00:00
|
|
|
|
|
|
|
//--------------------------------------
|
2013-06-24 16:52:51 +00:00
|
|
|
// Initialization
|
2013-04-17 02:28:08 +00:00
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Starts the server with a log at the given path.
|
2013-06-10 04:47:59 +00:00
|
|
|
func (s *Server) Initialize() error {
|
2013-04-17 02:28:08 +00:00
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
|
|
|
|
// Exit if the server is already running.
|
|
|
|
if s.Running() {
|
|
|
|
return errors.New("raft.Server: Server already running")
|
|
|
|
}
|
2013-04-28 21:23:21 +00:00
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// Initialize response channel
|
|
|
|
s.response = make(chan FlushResponse, 128)
|
2013-06-03 21:58:12 +00:00
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// Create snapshot directory if not exist
|
2013-06-08 02:19:18 +00:00
|
|
|
os.Mkdir(s.path+"/snapshot", 0700)
|
2013-06-03 21:58:12 +00:00
|
|
|
|
2013-04-17 02:28:08 +00:00
|
|
|
// Initialize the log and load it up.
|
|
|
|
if err := s.log.Open(s.LogPath()); err != nil {
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("log error")
|
2013-04-17 02:28:08 +00:00
|
|
|
s.unload()
|
2013-04-28 21:23:21 +00:00
|
|
|
return fmt.Errorf("raft.Server: %v", err)
|
2013-04-17 02:28:08 +00:00
|
|
|
}
|
|
|
|
|
2013-05-10 03:50:57 +00:00
|
|
|
// Update the term to the last term in the log.
|
|
|
|
s.currentTerm = s.log.CurrentTerm()
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
return nil
|
|
|
|
}
|
2013-06-23 18:42:31 +00:00
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// Start the sever as a follower
|
|
|
|
func (s *Server) StartFollower() {
|
2013-04-17 02:32:49 +00:00
|
|
|
// Update the state.
|
|
|
|
s.state = Follower
|
2013-05-05 21:41:55 +00:00
|
|
|
|
|
|
|
// Start the election timeout.
|
2013-06-04 13:38:02 +00:00
|
|
|
c := make(chan bool)
|
2013-06-24 16:52:51 +00:00
|
|
|
s.electionTimer.Reset()
|
2013-06-04 13:38:02 +00:00
|
|
|
go s.electionTimeoutFunc(c)
|
2013-06-08 02:19:18 +00:00
|
|
|
<-c
|
2013-06-24 16:52:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Start the sever as a leader
|
|
|
|
func (s *Server) StartLeader() error {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
|
|
|
|
// Start as leader.
|
|
|
|
s.currentTerm++
|
|
|
|
s.state = Leader
|
|
|
|
s.leader = s.name
|
|
|
|
s.electionTimer.Pause()
|
|
|
|
|
|
|
|
// Leader need to collect appendLog response
|
|
|
|
go s.commitCenter()
|
2013-04-17 02:28:08 +00:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// Collect response from followers. If more than the
|
|
|
|
// majority of the followers append a log entry, the
|
|
|
|
// leader will commit the log entry
|
2013-06-23 18:42:31 +00:00
|
|
|
func (s *Server) commitCenter() {
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("collecting data")
|
2013-06-23 18:42:31 +00:00
|
|
|
for {
|
2013-06-24 16:52:51 +00:00
|
|
|
var response FlushResponse
|
2013-06-23 18:42:31 +00:00
|
|
|
|
|
|
|
select {
|
2013-06-24 16:52:51 +00:00
|
|
|
case response = <-s.response:
|
2013-06-23 18:42:31 +00:00
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
case term := <-s.stepDown:
|
|
|
|
s.setCurrentTerm(term)
|
|
|
|
return
|
2013-06-23 18:42:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if response.peer != nil {
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[CommitCenter] Receive respone from ", response.peer.Name(), response.success)
|
2013-06-23 18:42:31 +00:00
|
|
|
}
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// TODO: UINT64 SORTING
|
|
|
|
// Convert uint64 to int, since go does not have a built in
|
|
|
|
// func to sort uint64
|
|
|
|
|
|
|
|
// when the leader is the only member in the cluster, it can commit
|
|
|
|
// the log immediately
|
2013-06-23 18:42:31 +00:00
|
|
|
if s.QuorumSize() < 2 {
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[CommitCenter] Commit ", s.log.CurrentIndex())
|
2013-06-24 03:41:43 +00:00
|
|
|
|
|
|
|
commited := int(s.log.CommitIndex())
|
|
|
|
commit := int(s.log.CurrentIndex())
|
|
|
|
|
|
|
|
s.log.SetCommitIndex(uint64(commit))
|
|
|
|
|
|
|
|
for i := commited; i < commit; i++ {
|
|
|
|
select {
|
2013-06-24 16:52:51 +00:00
|
|
|
case s.log.entries[i-int(s.log.startIndex)].commit <- true:
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("notify")
|
2013-06-24 03:41:43 +00:00
|
|
|
continue
|
2013-06-24 16:52:51 +00:00
|
|
|
// we have a buffered commit channel, it should return immediately
|
2013-06-24 03:41:43 +00:00
|
|
|
default:
|
2013-06-24 16:52:51 +00:00
|
|
|
panic("Cannot send commit nofication")
|
2013-06-24 03:41:43 +00:00
|
|
|
}
|
|
|
|
}
|
2013-06-23 18:42:31 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// TODO: Current we use sort which is O(NlogN).
|
|
|
|
// we should record the previous infomation and
|
|
|
|
// find the index to commit in O(1)
|
|
|
|
|
2013-06-23 18:42:31 +00:00
|
|
|
var data []int
|
|
|
|
data = append(data, int(s.log.CurrentIndex()))
|
|
|
|
|
|
|
|
for _, peer := range s.peers {
|
|
|
|
data = append(data, int(peer.prevLogIndex))
|
|
|
|
}
|
|
|
|
|
|
|
|
sort.Ints(data)
|
2013-06-24 16:52:51 +00:00
|
|
|
|
|
|
|
// We can commit upto the index which the mojarity
|
|
|
|
// of the members have appended.
|
|
|
|
commit := data[s.QuorumSize()-1]
|
2013-06-23 18:42:31 +00:00
|
|
|
|
2013-06-24 03:41:43 +00:00
|
|
|
commited := int(s.log.CommitIndex())
|
|
|
|
|
|
|
|
if commit > commited {
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[CommitCenter] Going to Commit ", commit)
|
2013-06-23 18:42:31 +00:00
|
|
|
s.log.SetCommitIndex(uint64(commit))
|
2013-06-24 03:41:43 +00:00
|
|
|
|
|
|
|
for i := commited; i < commit; i++ {
|
|
|
|
select {
|
2013-06-24 16:52:51 +00:00
|
|
|
case s.log.entries[i-int(s.log.startIndex)].commit <- true:
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("notify")
|
2013-06-24 03:41:43 +00:00
|
|
|
continue
|
|
|
|
default:
|
|
|
|
continue
|
|
|
|
}
|
2013-06-23 18:42:31 +00:00
|
|
|
}
|
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[CommitCenter] Commit ", commit)
|
2013-06-23 18:42:31 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
func (s *Server) commitNotify() {
|
2013-06-23 18:42:31 +00:00
|
|
|
|
2013-04-17 02:28:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Shuts down the server.
|
|
|
|
func (s *Server) Stop() {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
s.unload()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Unloads the server.
|
|
|
|
func (s *Server) unload() {
|
2013-05-27 00:02:31 +00:00
|
|
|
// Kill the election timer.
|
2013-06-03 23:16:50 +00:00
|
|
|
if s.electionTimer != nil {
|
|
|
|
s.electionTimer.Stop()
|
|
|
|
s.electionTimer = nil
|
|
|
|
}
|
2013-05-10 14:47:24 +00:00
|
|
|
|
2013-05-27 00:02:31 +00:00
|
|
|
// Remove peers.
|
|
|
|
for _, peer := range s.peers {
|
|
|
|
peer.stop()
|
|
|
|
}
|
2013-06-11 22:30:13 +00:00
|
|
|
// wait for all previous flush ends
|
|
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
|
2013-05-27 00:02:31 +00:00
|
|
|
s.peers = make(map[string]*Peer)
|
|
|
|
|
|
|
|
// Close the log.
|
2013-04-17 02:28:08 +00:00
|
|
|
if s.log != nil {
|
|
|
|
s.log.Close()
|
|
|
|
s.log = nil
|
|
|
|
}
|
2013-04-17 02:32:49 +00:00
|
|
|
|
|
|
|
s.state = Stopped
|
2013-04-17 02:28:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Checks if the server is currently running.
|
|
|
|
func (s *Server) Running() bool {
|
|
|
|
return s.state != Stopped
|
|
|
|
}
|
|
|
|
|
|
|
|
//--------------------------------------
|
|
|
|
// Commands
|
|
|
|
//--------------------------------------
|
|
|
|
|
2013-04-28 04:51:17 +00:00
|
|
|
// Attempts to execute a command and replicate it. The function will return
|
|
|
|
// when the command has been successfully committed or an error has occurred.
|
|
|
|
|
2013-06-25 20:20:53 +00:00
|
|
|
func (s *Server) Do(command Command) (interface{}, error) {
|
2013-05-27 02:04:41 +00:00
|
|
|
if s.state != Leader {
|
2013-06-09 05:39:50 +00:00
|
|
|
return nil, NotLeaderError
|
2013-05-27 02:04:41 +00:00
|
|
|
}
|
2013-05-27 02:06:08 +00:00
|
|
|
|
2013-05-05 19:36:23 +00:00
|
|
|
entry := s.log.CreateEntry(s.currentTerm, command)
|
|
|
|
if err := s.log.AppendEntry(entry); err != nil {
|
2013-06-09 05:39:50 +00:00
|
|
|
return nil, err
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
2013-05-05 20:26:04 +00:00
|
|
|
|
2013-06-23 18:42:31 +00:00
|
|
|
s.response <- FlushResponse{s.currentTerm, true, nil, nil}
|
2013-05-05 19:36:23 +00:00
|
|
|
|
2013-06-24 03:41:43 +00:00
|
|
|
// to speed up the response time
|
2013-06-09 05:39:50 +00:00
|
|
|
for _, peer := range s.peers {
|
2013-06-10 04:47:59 +00:00
|
|
|
peer.heartbeatTimer.fire()
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[Do] join!")
|
2013-06-24 03:41:43 +00:00
|
|
|
|
|
|
|
// timeout here
|
|
|
|
select {
|
2013-06-24 16:52:51 +00:00
|
|
|
case <-entry.commit:
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[Do] finish!")
|
2013-06-24 16:52:51 +00:00
|
|
|
return entry.result, nil
|
|
|
|
case <-time.After(time.Second):
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[Do] fail!")
|
2013-06-24 16:52:51 +00:00
|
|
|
return nil, errors.New("Command commit fails")
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
2013-04-28 04:51:17 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Appends a log entry from the leader to this server.
|
2013-04-30 04:13:50 +00:00
|
|
|
func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
|
2013-04-28 04:51:17 +00:00
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
2013-05-05 21:41:55 +00:00
|
|
|
// If the server is stopped then reject it.
|
|
|
|
if !s.Running() {
|
2013-05-10 14:47:24 +00:00
|
|
|
return NewAppendEntriesResponse(s.currentTerm, false, 0), fmt.Errorf("raft.Server: Server stopped")
|
2013-05-05 21:41:55 +00:00
|
|
|
}
|
|
|
|
|
2013-04-30 04:13:50 +00:00
|
|
|
// If the request is coming from an old term then reject it.
|
2013-06-23 18:42:31 +00:00
|
|
|
|
2013-04-30 04:13:50 +00:00
|
|
|
if req.Term < s.currentTerm {
|
2013-05-10 14:47:24 +00:00
|
|
|
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term")
|
2013-04-30 04:13:50 +00:00
|
|
|
}
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("Peer ", s.Name(), "received heartbeat from ", req.LeaderName,
|
2013-06-24 16:52:51 +00:00
|
|
|
" ", req.Term, " ", s.currentTerm, " ", time.Now())
|
2013-06-10 04:47:59 +00:00
|
|
|
|
2013-04-30 04:13:50 +00:00
|
|
|
s.setCurrentTerm(req.Term)
|
2013-06-08 02:19:18 +00:00
|
|
|
|
2013-06-03 13:51:52 +00:00
|
|
|
// Update the current leader.
|
|
|
|
s.leader = req.LeaderName
|
2013-04-30 04:13:50 +00:00
|
|
|
|
|
|
|
// Reset election timeout.
|
2013-06-03 23:16:50 +00:00
|
|
|
if s.electionTimer != nil {
|
|
|
|
s.electionTimer.Reset()
|
|
|
|
}
|
2013-06-08 02:19:18 +00:00
|
|
|
|
2013-04-30 04:13:50 +00:00
|
|
|
// Reject if log doesn't contain a matching previous entry.
|
2013-05-01 04:44:16 +00:00
|
|
|
if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
|
2013-05-10 14:47:24 +00:00
|
|
|
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
|
2013-04-30 04:13:50 +00:00
|
|
|
}
|
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("Peer ", s.Name(), "after truncate ")
|
2013-06-24 03:41:43 +00:00
|
|
|
|
2013-04-30 04:13:50 +00:00
|
|
|
// Append entries to the log.
|
|
|
|
if err := s.log.AppendEntries(req.Entries); err != nil {
|
2013-05-10 14:47:24 +00:00
|
|
|
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
|
2013-04-30 04:13:50 +00:00
|
|
|
}
|
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("Peer ", s.Name(), "after append ")
|
2013-04-30 04:13:50 +00:00
|
|
|
// Commit up to the commit index.
|
|
|
|
if err := s.log.SetCommitIndex(req.CommitIndex); err != nil {
|
2013-05-10 14:47:24 +00:00
|
|
|
return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
|
2013-06-08 02:19:18 +00:00
|
|
|
}
|
2013-04-30 04:13:50 +00:00
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("Peer ", s.Name(), "after commit ")
|
2013-06-24 03:41:43 +00:00
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("Peer ", s.Name(), "reply heartbeat from ", req.LeaderName,
|
2013-06-24 16:52:51 +00:00
|
|
|
" ", req.Term, " ", s.currentTerm, " ", time.Now())
|
2013-05-10 14:47:24 +00:00
|
|
|
return NewAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), nil
|
2013-04-28 04:51:17 +00:00
|
|
|
}
|
|
|
|
|
2013-05-05 19:36:23 +00:00
|
|
|
// Creates an AppendEntries request.
|
2013-05-28 19:57:38 +00:00
|
|
|
func (s *Server) createAppendEntriesRequest(prevLogIndex uint64) *AppendEntriesRequest {
|
2013-05-05 19:36:23 +00:00
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
return s.createInternalAppendEntriesRequest(prevLogIndex)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Creates an AppendEntries request without a lock.
|
2013-05-28 19:57:38 +00:00
|
|
|
func (s *Server) createInternalAppendEntriesRequest(prevLogIndex uint64) *AppendEntriesRequest {
|
2013-05-05 19:36:23 +00:00
|
|
|
if s.log == nil {
|
2013-05-28 19:57:38 +00:00
|
|
|
return nil
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
|
|
|
entries, prevLogTerm := s.log.GetEntriesAfter(prevLogIndex)
|
|
|
|
req := NewAppendEntriesRequest(s.currentTerm, s.name, prevLogIndex, prevLogTerm, entries, s.log.CommitIndex())
|
2013-05-28 19:57:38 +00:00
|
|
|
return req
|
2013-05-05 19:36:23 +00:00
|
|
|
}
|
|
|
|
|
2013-04-28 21:23:21 +00:00
|
|
|
//--------------------------------------
|
2013-05-01 05:11:23 +00:00
|
|
|
// Promotion
|
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Promotes the server to a candidate and then requests votes from peers. If
|
|
|
|
// enough votes are received then the server becomes the leader. If this
|
|
|
|
// server is elected then true is returned. If another server is elected then
|
|
|
|
// false is returned.
|
2013-05-01 05:21:56 +00:00
|
|
|
func (s *Server) promote() (bool, error) {
|
2013-06-06 03:25:17 +00:00
|
|
|
|
2013-05-01 05:11:23 +00:00
|
|
|
for {
|
|
|
|
// Start a new election.
|
2013-06-03 23:16:50 +00:00
|
|
|
term, lastLogIndex, lastLogTerm, err := s.promoteToCandidate()
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
2013-05-01 05:11:23 +00:00
|
|
|
|
|
|
|
// Request votes from each of our peers.
|
|
|
|
c := make(chan *RequestVoteResponse, len(s.peers))
|
|
|
|
for _, _peer := range s.peers {
|
|
|
|
peer := _peer
|
|
|
|
go func() {
|
|
|
|
req := NewRequestVoteRequest(term, s.name, lastLogIndex, lastLogTerm)
|
|
|
|
req.peer = peer
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln(s.Name(), "Send Vote Request to ", peer.Name())
|
2013-05-28 19:57:38 +00:00
|
|
|
if resp, _ := s.transporter.SendVoteRequest(s, peer, req); resp != nil {
|
2013-05-10 03:50:57 +00:00
|
|
|
resp.peer = peer
|
|
|
|
c <- resp
|
|
|
|
}
|
2013-05-01 05:11:23 +00:00
|
|
|
}()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Collect votes until we have a quorum.
|
|
|
|
votes := map[string]bool{}
|
2013-06-24 03:41:43 +00:00
|
|
|
|
2013-05-01 05:11:23 +00:00
|
|
|
elected := false
|
2013-06-24 03:41:43 +00:00
|
|
|
timeout := false
|
|
|
|
|
2013-05-01 05:11:23 +00:00
|
|
|
for {
|
2013-06-24 16:52:51 +00:00
|
|
|
// if timeout happened, restart the promotion
|
2013-06-24 03:41:43 +00:00
|
|
|
if timeout {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2013-05-01 05:11:23 +00:00
|
|
|
// Add up all our votes.
|
|
|
|
votesGranted := 1
|
|
|
|
for _, value := range votes {
|
|
|
|
if value {
|
|
|
|
votesGranted++
|
|
|
|
}
|
|
|
|
}
|
2013-06-24 03:41:43 +00:00
|
|
|
|
2013-05-01 05:11:23 +00:00
|
|
|
// If we received enough votes then stop waiting for more votes.
|
|
|
|
if votesGranted >= s.QuorumSize() {
|
|
|
|
elected = true
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
// Collect votes from peers.
|
|
|
|
select {
|
|
|
|
case resp := <-c:
|
2013-05-01 05:21:56 +00:00
|
|
|
if resp != nil {
|
|
|
|
// Step down if we discover a higher term.
|
|
|
|
if resp.Term > term {
|
2013-06-03 23:16:50 +00:00
|
|
|
s.mutex.Lock()
|
2013-05-01 05:21:56 +00:00
|
|
|
s.setCurrentTerm(term)
|
2013-06-03 23:16:50 +00:00
|
|
|
if s.electionTimer != nil {
|
|
|
|
s.electionTimer.Reset()
|
|
|
|
}
|
|
|
|
s.mutex.Unlock()
|
2013-05-01 05:21:56 +00:00
|
|
|
return false, fmt.Errorf("raft.Server: Higher term discovered, stepping down: (%v > %v)", resp.Term, term)
|
|
|
|
}
|
2013-05-01 05:11:23 +00:00
|
|
|
votes[resp.peer.Name()] = resp.VoteGranted
|
|
|
|
}
|
2013-05-27 00:02:31 +00:00
|
|
|
case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
|
2013-06-24 03:41:43 +00:00
|
|
|
timeout = true
|
2013-05-01 05:11:23 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// If we received enough votes then promote to leader and stop this election.
|
|
|
|
if elected && s.promoteToLeader(term, lastLogIndex, lastLogTerm) {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
// If we are no longer in the same term then another server must have been elected.
|
2013-05-08 03:56:32 +00:00
|
|
|
s.mutex.Lock()
|
2013-05-01 05:11:23 +00:00
|
|
|
if s.currentTerm != term {
|
2013-05-08 03:56:32 +00:00
|
|
|
s.mutex.Unlock()
|
2013-05-01 05:21:56 +00:00
|
|
|
return false, fmt.Errorf("raft.Server: Term changed during election, stepping down: (%v > %v)", s.currentTerm, term)
|
2013-05-01 05:11:23 +00:00
|
|
|
}
|
2013-05-08 03:56:32 +00:00
|
|
|
s.mutex.Unlock()
|
2013-05-01 05:11:23 +00:00
|
|
|
}
|
|
|
|
|
2013-05-01 05:21:56 +00:00
|
|
|
return true, nil
|
2013-05-01 05:11:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Promotes the server to a candidate and increases the election term. The
|
|
|
|
// term and log state are returned for use in the RPCs.
|
2013-06-03 23:16:50 +00:00
|
|
|
func (s *Server) promoteToCandidate() (uint64, uint64, uint64, error) {
|
2013-05-01 05:11:23 +00:00
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
|
2013-06-03 23:16:50 +00:00
|
|
|
// Ignore promotion if the server is not a follower.
|
|
|
|
if s.state != Follower && s.state != Candidate {
|
|
|
|
return 0, 0, 0, fmt.Errorf("raft: Invalid promotion state: %s", s.state)
|
|
|
|
}
|
|
|
|
|
2013-05-01 05:21:56 +00:00
|
|
|
// Move server to become a candidate, increase our term & vote for ourself.
|
2013-05-01 05:11:23 +00:00
|
|
|
s.state = Candidate
|
|
|
|
s.currentTerm++
|
2013-05-01 05:21:56 +00:00
|
|
|
s.votedFor = s.name
|
2013-06-03 01:18:25 +00:00
|
|
|
s.leader = ""
|
2013-06-24 03:41:43 +00:00
|
|
|
|
2013-05-01 05:11:23 +00:00
|
|
|
// Pause the election timer while we're a candidate.
|
|
|
|
s.electionTimer.Pause()
|
2013-06-24 03:41:43 +00:00
|
|
|
|
2013-05-01 05:11:23 +00:00
|
|
|
// Return server state so we can check for it during leader promotion.
|
2013-06-23 18:42:31 +00:00
|
|
|
lastLogIndex, lastLogTerm := s.log.LastInfo()
|
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[PromoteToCandidate] Follower ", s.Name(),
|
2013-06-23 18:42:31 +00:00
|
|
|
"promote to candidate[", lastLogIndex, ",", lastLogTerm, "]")
|
|
|
|
|
2013-06-03 23:16:50 +00:00
|
|
|
return s.currentTerm, lastLogIndex, lastLogTerm, nil
|
2013-05-01 05:11:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Promotes the server from a candidate to a leader. This can only occur if
|
|
|
|
// the server is in the state that it assumed when the candidate election
|
|
|
|
// began. This is because another server may have won the election and caused
|
|
|
|
// the state to change.
|
|
|
|
func (s *Server) promoteToLeader(term uint64, lastLogIndex uint64, lastLogTerm uint64) bool {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
|
|
|
|
// Ignore promotion if we are not a candidate.
|
|
|
|
if s.state != Candidate {
|
2013-06-24 16:52:51 +00:00
|
|
|
panic("promote to leader but not candidate")
|
2013-05-01 05:11:23 +00:00
|
|
|
}
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// TODO: should panic or just a false?
|
|
|
|
|
2013-05-01 05:11:23 +00:00
|
|
|
// Disallow promotion if the term or log does not match what we currently have.
|
2013-06-23 18:42:31 +00:00
|
|
|
logIndex, logTerm := s.log.LastInfo()
|
2013-05-01 05:11:23 +00:00
|
|
|
if s.currentTerm != term || logIndex != lastLogIndex || logTerm != lastLogTerm {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2013-05-05 21:41:55 +00:00
|
|
|
// Move server to become a leader and begin peer heartbeats.
|
2013-05-01 05:11:23 +00:00
|
|
|
s.state = Leader
|
2013-06-03 01:18:25 +00:00
|
|
|
s.leader = s.name
|
2013-06-23 18:42:31 +00:00
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// Begin to collect response from followers
|
|
|
|
go s.commitCenter()
|
|
|
|
|
|
|
|
// Update the peers prevLogIndex to leader's lastLogIndex
|
|
|
|
// Start heartbeat
|
2013-05-05 21:41:55 +00:00
|
|
|
for _, peer := range s.peers {
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[Leader] Set ", peer.Name(), "Prev to", lastLogIndex)
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-06-11 22:30:13 +00:00
|
|
|
peer.prevLogIndex = lastLogIndex
|
2013-05-05 21:41:55 +00:00
|
|
|
peer.resume()
|
|
|
|
}
|
2013-05-01 05:21:56 +00:00
|
|
|
|
2013-05-01 05:11:23 +00:00
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
//--------------------------------------
|
|
|
|
// Request Vote
|
2013-04-28 21:23:21 +00:00
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Requests a vote from a server. A vote can be obtained if the vote's term is
|
|
|
|
// at the server's current term and the server has not made a vote yet. A vote
|
|
|
|
// can also be obtained if the term is greater than the server's current term.
|
2013-05-05 21:41:55 +00:00
|
|
|
func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, error) {
|
2013-04-28 21:23:21 +00:00
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
2013-06-23 18:42:31 +00:00
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("Peer ", s.Name(), "receive vote request from ", req.CandidateName)
|
|
|
|
//debugln("[RequestVote] got the lock")
|
2013-05-08 03:56:32 +00:00
|
|
|
// Fail if the server is not running.
|
|
|
|
if !s.Running() {
|
|
|
|
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Server is stopped")
|
|
|
|
}
|
|
|
|
|
2013-04-28 21:23:21 +00:00
|
|
|
// If the request is coming from an old term then reject it.
|
|
|
|
if req.Term < s.currentTerm {
|
2013-05-05 21:41:55 +00:00
|
|
|
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm)
|
2013-04-28 21:23:21 +00:00
|
|
|
}
|
2013-04-30 04:13:50 +00:00
|
|
|
s.setCurrentTerm(req.Term)
|
2013-04-28 21:23:21 +00:00
|
|
|
|
|
|
|
// If we've already voted for a different candidate then don't vote for this candidate.
|
|
|
|
if s.votedFor != "" && s.votedFor != req.CandidateName {
|
2013-05-05 21:41:55 +00:00
|
|
|
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor)
|
2013-04-28 21:23:21 +00:00
|
|
|
}
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// If the candidate's log is not at least as up-to-date as
|
|
|
|
// our last log then don't vote.
|
2013-06-23 18:42:31 +00:00
|
|
|
lastIndex, lastTerm := s.log.LastInfo()
|
|
|
|
if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
|
|
|
|
return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Out-of-date log: [%v/%v] > [%v/%v]", lastIndex, lastTerm, req.LastLogIndex, req.LastLogTerm)
|
2013-04-28 22:49:52 +00:00
|
|
|
}
|
2013-04-28 21:23:21 +00:00
|
|
|
|
2013-04-28 22:49:52 +00:00
|
|
|
// If we made it this far then cast a vote and reset our election time out.
|
|
|
|
s.votedFor = req.CandidateName
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln(s.Name(), "Vote for ", req.CandidateName)
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-06-03 23:16:50 +00:00
|
|
|
if s.electionTimer != nil {
|
|
|
|
s.electionTimer.Reset()
|
|
|
|
}
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-05-05 21:41:55 +00:00
|
|
|
return NewRequestVoteResponse(s.currentTerm, true), nil
|
2013-04-28 21:23:21 +00:00
|
|
|
}
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// Updates the current term on the server if the term is greater than the
|
2013-04-30 04:13:50 +00:00
|
|
|
// server's current term. When the term is changed then the server's vote is
|
|
|
|
// cleared and its state is changed to be a follower.
|
|
|
|
func (s *Server) setCurrentTerm(term uint64) {
|
|
|
|
if term > s.currentTerm {
|
|
|
|
s.votedFor = ""
|
2013-06-24 16:52:51 +00:00
|
|
|
|
|
|
|
if s.state == Leader {
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln(s.Name(), " step down to a follower")
|
2013-06-24 16:52:51 +00:00
|
|
|
|
|
|
|
// stop heartbeats
|
|
|
|
for _, peer := range s.peers {
|
|
|
|
peer.pause()
|
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case s.stepDown <- term:
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2013-05-05 21:41:55 +00:00
|
|
|
}
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-04-30 04:13:50 +00:00
|
|
|
s.state = Follower
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-06-23 18:42:31 +00:00
|
|
|
// update term after stop all the peer
|
|
|
|
s.currentTerm = term
|
2013-05-05 21:41:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Listens to the election timeout and kicks off a new election.
|
2013-06-04 13:38:02 +00:00
|
|
|
func (s *Server) electionTimeoutFunc(startChannel chan bool) {
|
|
|
|
startChannel <- true
|
2013-05-05 21:41:55 +00:00
|
|
|
for {
|
|
|
|
// Grab the current timer channel.
|
|
|
|
s.mutex.Lock()
|
|
|
|
var c chan time.Time
|
|
|
|
if s.electionTimer != nil {
|
|
|
|
c = s.electionTimer.C()
|
|
|
|
}
|
|
|
|
s.mutex.Unlock()
|
|
|
|
|
|
|
|
// If the channel or timer are gone then exit.
|
|
|
|
if c == nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
// If an election times out then promote this server. If the channel
|
|
|
|
// closes then that means the server has stopped so kill the function.
|
2013-05-10 14:47:24 +00:00
|
|
|
if _, ok := <-c; ok {
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("[ElectionTimeout] ", s.Name(), " ", time.Now())
|
2013-05-05 21:41:55 +00:00
|
|
|
s.promote()
|
|
|
|
} else {
|
|
|
|
break
|
|
|
|
}
|
2013-04-30 04:13:50 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-04-17 02:32:49 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// Membership
|
|
|
|
//--------------------------------------
|
|
|
|
|
2013-05-28 18:46:27 +00:00
|
|
|
// Adds a peer to the server. This should be called by a system's join command
|
|
|
|
// within the context so that it is within the context of the server lock.
|
|
|
|
func (s *Server) AddPeer(name string) error {
|
|
|
|
// Do not allow peers to be added twice.
|
2013-06-10 04:47:59 +00:00
|
|
|
|
2013-05-28 18:46:27 +00:00
|
|
|
if s.peers[name] != nil {
|
|
|
|
return DuplicatePeerError
|
2013-04-17 02:32:49 +00:00
|
|
|
}
|
2013-04-28 21:23:21 +00:00
|
|
|
|
2013-05-28 18:46:27 +00:00
|
|
|
// Only add the peer if it doesn't have the same name.
|
|
|
|
if s.name != name {
|
2013-06-25 20:11:48 +00:00
|
|
|
//debugln("Add peer ", name)
|
2013-05-28 18:46:27 +00:00
|
|
|
peer := NewPeer(s, name, s.heartbeatTimeout)
|
2013-06-04 13:35:43 +00:00
|
|
|
if s.state == Leader {
|
|
|
|
peer.resume()
|
|
|
|
}
|
2013-05-28 18:46:27 +00:00
|
|
|
s.peers[peer.name] = peer
|
2013-04-28 21:23:21 +00:00
|
|
|
|
2013-06-05 00:02:45 +00:00
|
|
|
}
|
2013-05-28 18:46:27 +00:00
|
|
|
return nil
|
2013-04-17 02:32:49 +00:00
|
|
|
}
|
2013-06-05 17:57:31 +00:00
|
|
|
|
|
|
|
// Removes a peer from the server. This should be called by a system's join command
|
|
|
|
// within the context so that it is within the context of the server lock.
|
|
|
|
func (s *Server) RemovePeer(name string) error {
|
|
|
|
// Ignore removal of the server itself.
|
|
|
|
if s.name == name {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// Return error if peer doesn't exist.
|
|
|
|
peer := s.peers[name]
|
2013-06-07 05:58:41 +00:00
|
|
|
if peer == nil {
|
2013-06-05 17:57:31 +00:00
|
|
|
return fmt.Errorf("raft: Peer not found: %s", name)
|
|
|
|
}
|
|
|
|
|
2013-06-07 05:58:41 +00:00
|
|
|
// Flush entries to the peer first.
|
|
|
|
if s.state == Leader {
|
2013-06-24 16:52:51 +00:00
|
|
|
if _, _, err := peer.flush(); err != nil {
|
2013-06-07 05:58:41 +00:00
|
|
|
warn("raft: Unable to notify peer of removal: %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-06-05 17:57:31 +00:00
|
|
|
// Stop peer and remove it.
|
|
|
|
peer.stop()
|
|
|
|
delete(s.peers, name)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
2013-06-06 03:32:52 +00:00
|
|
|
|
2013-06-03 21:58:12 +00:00
|
|
|
//--------------------------------------
|
|
|
|
// Log compaction
|
|
|
|
//--------------------------------------
|
|
|
|
|
|
|
|
// Creates a snapshot request.
|
|
|
|
func (s *Server) createSnapshotRequest() *SnapshotRequest {
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
|
|
|
return NewSnapshotRequest(s.name, s.lastSnapshot)
|
|
|
|
}
|
|
|
|
|
2013-06-05 05:56:59 +00:00
|
|
|
// The background snapshot function
|
2013-06-05 00:02:45 +00:00
|
|
|
func (s *Server) Snapshot() {
|
|
|
|
for {
|
2013-06-05 05:56:59 +00:00
|
|
|
// TODO: change this... to something reasonable
|
2013-06-12 16:47:48 +00:00
|
|
|
time.Sleep(60 * time.Second)
|
2013-06-23 18:42:31 +00:00
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
s.takeSnapshot()
|
2013-06-05 00:02:45 +00:00
|
|
|
}
|
|
|
|
}
|
2013-06-03 21:58:12 +00:00
|
|
|
|
2013-06-05 00:02:45 +00:00
|
|
|
func (s *Server) takeSnapshot() error {
|
2013-06-03 21:58:12 +00:00
|
|
|
//TODO put a snapshot mutex
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln("take Snapshot")
|
2013-06-03 21:58:12 +00:00
|
|
|
if s.currentSnapshot != nil {
|
|
|
|
return errors.New("handling snapshot")
|
|
|
|
}
|
2013-06-05 05:56:59 +00:00
|
|
|
|
2013-06-03 21:58:12 +00:00
|
|
|
lastIndex, lastTerm := s.log.CommitInfo()
|
|
|
|
|
|
|
|
if lastIndex == 0 || lastTerm == 0 {
|
|
|
|
return errors.New("No logs")
|
|
|
|
}
|
|
|
|
|
|
|
|
path := s.SnapshotPath(lastIndex, lastTerm)
|
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
var state []byte
|
|
|
|
var err error
|
2013-06-06 04:14:07 +00:00
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
if s.stateMachine != nil {
|
|
|
|
state, err = s.stateMachine.Save()
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
} else {
|
|
|
|
state = []byte{0}
|
2013-06-06 04:14:07 +00:00
|
|
|
}
|
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
var peerNames []string
|
|
|
|
|
|
|
|
for _, peer := range s.peers {
|
|
|
|
peerNames = append(peerNames, peer.Name())
|
2013-06-06 04:14:07 +00:00
|
|
|
}
|
2013-06-12 16:47:48 +00:00
|
|
|
peerNames = append(peerNames, s.Name())
|
2013-06-06 04:14:07 +00:00
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path}
|
2013-06-03 21:58:12 +00:00
|
|
|
|
2013-06-05 00:02:45 +00:00
|
|
|
s.saveSnapshot()
|
2013-06-05 05:56:59 +00:00
|
|
|
|
2013-06-05 17:38:49 +00:00
|
|
|
s.log.Compact(lastIndex, lastTerm)
|
2013-06-05 05:56:59 +00:00
|
|
|
|
2013-06-03 21:58:12 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Retrieves the log path for the server.
|
2013-06-05 00:02:45 +00:00
|
|
|
func (s *Server) saveSnapshot() error {
|
2013-06-05 05:56:59 +00:00
|
|
|
|
2013-06-03 21:58:12 +00:00
|
|
|
if s.currentSnapshot == nil {
|
|
|
|
return errors.New("no snapshot to save")
|
|
|
|
}
|
|
|
|
|
|
|
|
err := s.currentSnapshot.Save()
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2013-06-08 02:19:18 +00:00
|
|
|
|
2013-06-03 21:58:12 +00:00
|
|
|
tmp := s.lastSnapshot
|
|
|
|
s.lastSnapshot = s.currentSnapshot
|
2013-06-05 05:56:59 +00:00
|
|
|
|
|
|
|
// delete the previous snapshot if there is any change
|
2013-06-12 16:47:48 +00:00
|
|
|
if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
|
2013-06-05 00:02:45 +00:00
|
|
|
tmp.Remove()
|
|
|
|
}
|
2013-06-03 21:58:12 +00:00
|
|
|
s.currentSnapshot = nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2013-06-05 00:02:45 +00:00
|
|
|
// Retrieves the log path for the server.
|
|
|
|
func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
|
2013-06-05 17:38:49 +00:00
|
|
|
return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
|
2013-06-05 00:02:45 +00:00
|
|
|
}
|
|
|
|
|
2013-06-08 02:19:18 +00:00
|
|
|
func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, error) {
|
2013-06-03 21:58:12 +00:00
|
|
|
//
|
|
|
|
s.mutex.Lock()
|
|
|
|
defer s.mutex.Unlock()
|
2013-06-05 05:56:59 +00:00
|
|
|
|
2013-06-06 04:14:07 +00:00
|
|
|
s.stateMachine.Recovery(req.State)
|
2013-06-03 21:58:12 +00:00
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
//recovery the cluster configuration
|
|
|
|
for _, peerName := range req.Peers {
|
|
|
|
s.AddPeer(peerName)
|
|
|
|
}
|
|
|
|
|
2013-06-03 21:58:12 +00:00
|
|
|
//update term and index
|
2013-06-06 03:25:17 +00:00
|
|
|
s.currentTerm = req.LastTerm
|
2013-06-12 16:47:48 +00:00
|
|
|
|
2013-06-06 03:25:17 +00:00
|
|
|
s.log.UpdateCommitIndex(req.LastIndex)
|
2013-06-12 16:47:48 +00:00
|
|
|
|
2013-06-06 03:25:17 +00:00
|
|
|
snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
|
2013-06-12 16:47:48 +00:00
|
|
|
|
|
|
|
s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-06-08 02:19:18 +00:00
|
|
|
s.saveSnapshot()
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-06-06 03:25:17 +00:00
|
|
|
s.log.Compact(req.LastIndex, req.LastTerm)
|
2013-06-05 00:02:45 +00:00
|
|
|
|
2013-06-06 03:25:17 +00:00
|
|
|
return NewSnapshotResponse(req.LastTerm, true, req.LastIndex), nil
|
2013-06-03 21:58:12 +00:00
|
|
|
|
|
|
|
}
|
|
|
|
|
2013-06-05 05:56:59 +00:00
|
|
|
// Load a snapshot at restart
|
2013-06-03 21:58:12 +00:00
|
|
|
func (s *Server) LoadSnapshot() error {
|
2013-06-05 17:38:49 +00:00
|
|
|
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
|
2013-06-03 21:58:12 +00:00
|
|
|
if err != nil {
|
2013-06-24 16:52:51 +00:00
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
return err
|
2013-06-03 21:58:12 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
filenames, err := dir.Readdirnames(-1)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
dir.Close()
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
dir.Close()
|
|
|
|
if len(filenames) == 0 {
|
|
|
|
return errors.New("no snapshot")
|
|
|
|
}
|
|
|
|
|
|
|
|
// not sure how many snapshot we should keep
|
|
|
|
sort.Strings(filenames)
|
2013-06-08 02:19:18 +00:00
|
|
|
snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])
|
2013-06-03 21:58:12 +00:00
|
|
|
|
|
|
|
// should not file
|
|
|
|
file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
|
|
|
|
defer file.Close()
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
2013-06-24 16:52:51 +00:00
|
|
|
// TODO check checksum first
|
2013-06-05 05:56:59 +00:00
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
var snapshotBytes []byte
|
|
|
|
var checksum []byte
|
2013-06-06 20:54:27 +00:00
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
n, err := fmt.Fscanf(file, "%08x\n", &checksum)
|
2013-06-03 21:58:12 +00:00
|
|
|
|
|
|
|
if err != nil {
|
2013-06-06 04:14:07 +00:00
|
|
|
return err
|
2013-06-03 21:58:12 +00:00
|
|
|
}
|
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
if n != 1 {
|
2013-06-06 04:14:07 +00:00
|
|
|
return errors.New("Bad snapshot file")
|
2013-06-03 21:58:12 +00:00
|
|
|
}
|
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
snapshotBytes, _ = ioutil.ReadAll(file)
|
2013-06-25 20:11:48 +00:00
|
|
|
debugln(string(snapshotBytes))
|
2013-06-12 16:47:48 +00:00
|
|
|
|
|
|
|
err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
|
2013-06-06 20:54:27 +00:00
|
|
|
|
2013-06-06 04:14:07 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
err = s.stateMachine.Recovery(s.lastSnapshot.State)
|
|
|
|
|
|
|
|
for _, peerName := range s.lastSnapshot.Peers {
|
|
|
|
s.AddPeer(peerName)
|
|
|
|
}
|
2013-06-06 04:14:07 +00:00
|
|
|
|
2013-06-12 16:47:48 +00:00
|
|
|
s.log.SetStartTerm(s.lastSnapshot.LastTerm)
|
|
|
|
s.log.SetStartIndex(s.lastSnapshot.LastIndex)
|
|
|
|
s.log.UpdateCommitIndex(s.lastSnapshot.LastIndex)
|
2013-06-05 00:02:45 +00:00
|
|
|
|
2013-06-03 21:58:12 +00:00
|
|
|
return err
|
|
|
|
}
|