package raft

import (
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"sync"
	"time"
)

//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------
const (
	Stopped   = "stopped"
	Follower  = "follower"
	Candidate = "candidate"
	Leader    = "leader"
)

const (
	DefaultHeartbeatTimeout = 50 * time.Millisecond
	DefaultElectionTimeout  = 150 * time.Millisecond
)

//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------
var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")

//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------

// A server is involved in the consensus protocol and can act as a follower,
// candidate, or leader.
type Server struct {
	name             string
	path             string
	state            string
	transporter      Transporter
	context          interface{}
	currentTerm      uint64
	votedFor         string
	log              *Log
	leader           string
	peers            map[string]*Peer
	mutex            sync.Mutex
	electionTimer    *Timer
	heartbeatTimeout time.Duration
	response         chan FlushResponse
	stepDown         chan uint64
	stop             chan bool
	currentSnapshot  *Snapshot
	lastSnapshot     *Snapshot
	stateMachine     StateMachine
}

//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------

// Creates a new server with a log at the given path.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}) (*Server, error) {
	if name == "" {
		return nil, errors.New("raft.Server: Name cannot be blank")
	}
	if transporter == nil {
		panic("raft: Transporter required")
	}

	s := &Server{
		name:             name,
		path:             path,
		transporter:      transporter,
		stateMachine:     stateMachine,
		context:          context,
		state:            Stopped,
		peers:            make(map[string]*Peer),
		log:              NewLog(),
		stepDown:         make(chan uint64),
		stop:             make(chan bool),
		electionTimer:    NewTimer(DefaultElectionTimeout, DefaultElectionTimeout*2),
		heartbeatTimeout: DefaultHeartbeatTimeout,
	}

	// Setup the apply function.
	s.log.ApplyFunc = func(c Command) (interface{}, error) {
		result, err := c.Apply(s)
		return result, err
	}

	return s, nil
}
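
// Usage sketch (illustrative, not part of the original source): how a caller
// might construct and start a server. The name, path, and transporter value
// below are assumptions; any type implementing the Transporter interface will
// do, and the state machine and context may be nil if unused.
//
//	var t Transporter = newMyTransporter() // hypothetical Transporter implementation
//	s, err := NewServer("node1", "/tmp/raft/node1", t, nil, nil)
//	if err != nil {
//		panic(err)
//	}
//	if err := s.Initialize(); err != nil {
//		panic(err)
//	}
//	s.StartFollower() // runs the follower/candidate/leader loop in a goroutine
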
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------

//--------------------------------------
// General
//--------------------------------------

// Retrieves the name of the server.
func (s *Server) Name() string {
	return s.name
}

// Retrieves the storage path for the server.
func (s *Server) Path() string {
	return s.path
}

// Retrieves the name of the current leader.
func (s *Server) Leader() string {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.leader
}

// Retrieves a copy of the peer data.
func (s *Server) Peers() map[string]*Peer {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	peers := make(map[string]*Peer)
	for name, peer := range s.peers {
		peers[name] = peer.clone()
	}
	return peers
}

// Retrieves the object that transports requests.
func (s *Server) Transporter() Transporter {
	return s.transporter
}

// Sets the object that transports requests.
func (s *Server) SetTransporter(t Transporter) {
	s.transporter = t
}

// Retrieves the context passed into the constructor.
func (s *Server) Context() interface{} {
	return s.context
}

// Retrieves the log path for the server.
func (s *Server) LogPath() string {
	return fmt.Sprintf("%s/log", s.path)
}

// Retrieves the current state of the server.
func (s *Server) State() string {
	return s.state
}

// Retrieves the current term of the server.
func (s *Server) Term() uint64 {
	return s.currentTerm
}

// Retrieves the current committed index of the server.
func (s *Server) CommittedIndex() uint64 {
	return s.log.CommitIndex()
}

// Retrieves the name of the candidate this server voted for in this term.
func (s *Server) VotedFor() string {
	return s.votedFor
}

// Retrieves whether the server's log has no entries.
func (s *Server) IsLogEmpty() bool {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.log.IsEmpty()
}

// A list of all the log entries. This should only be used for debugging purposes.
func (s *Server) LogEntries() []*LogEntry {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if s.log != nil {
		return s.log.entries
	}
	return nil
}

// The command name of the last log entry.
func (s *Server) LastCommandName() string {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	if s.log != nil {
		return s.log.LastCommandName()
	}
	return ""
}

// Returns a string describing the state of the server, for debugging.
func (s *Server) GetState() string {
	return fmt.Sprintf("State: %s, Term: %v, Index: %v ", s.state, s.currentTerm, s.CommittedIndex())
}

//--------------------------------------
// Membership
//--------------------------------------

// Retrieves the number of member servers in the consensus.
func (s *Server) MemberCount() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.peers) + 1
}

// Retrieves the number of servers required to make a quorum.
func (s *Server) QuorumSize() int {
	return (s.MemberCount() / 2) + 1
}
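
// For example (illustrative, not in the original source): with 4 peers plus
// this server, MemberCount() is 5 and QuorumSize() is (5/2)+1 = 3, so three
// matching servers are enough to commit an entry or win an election.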

//--------------------------------------
// Election timeout
//--------------------------------------

// Retrieves the election timeout.
func (s *Server) ElectionTimeout() time.Duration {
	return s.electionTimer.MinDuration()
}

// Sets the election timeout.
func (s *Server) SetElectionTimeout(duration time.Duration) {
	s.electionTimer.SetMinDuration(duration)
	s.electionTimer.SetMaxDuration(duration * 2)
}

// Starts the peer heartbeats when the server is promoted to leader.
func (s *Server) StartHeartbeatTimeout() {
	for _, peer := range s.peers {
		peer.StartHeartbeat()
	}
}

//--------------------------------------
// Heartbeat timeout
//--------------------------------------

// Retrieves the heartbeat timeout.
func (s *Server) HeartbeatTimeout() time.Duration {
	return s.heartbeatTimeout
}

// Sets the heartbeat timeout.
func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.heartbeatTimeout = duration
	for _, peer := range s.peers {
		peer.SetHeartbeatTimeout(duration)
	}
}
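
// Tuning sketch (illustrative, not in the original source): callers typically
// keep the heartbeat interval well below the election timeout so a healthy
// leader always beats the followers' election timers. The concrete values
// below are assumptions for illustration, not recommendations from this
// package; the election timer fires at a random point between the duration
// and twice the duration.
//
//	s.SetElectionTimeout(300 * time.Millisecond) // randomized between 300ms and 600ms
//	s.SetHeartbeatTimeout(75 * time.Millisecond)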

//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------

//--------------------------------------
// Initialization
//--------------------------------------

// Initializes the server: opens the log at the server's log path and restores
// the current term from it.
func (s *Server) Initialize() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Exit if the server is already running.
	if s.Running() {
		return errors.New("raft.Server: Server already running")
	}

	// Initialize the response channel.
	s.response = make(chan FlushResponse, 128)

	// Create the snapshot directory if it does not exist.
	os.Mkdir(s.path+"/snapshot", 0700)

	// Initialize the log and load it up.
	if err := s.log.Open(s.LogPath()); err != nil {
		debugln("log error")
		s.unload()
		return fmt.Errorf("raft.Server: %v", err)
	}

	// Update the term to the last term in the log.
	s.currentTerm = s.log.CurrentTerm()

	return nil
}

//                          timeout
//                           ______
//                          |      |
//                          |      |
//                          v      |     recv majority votes
//  --------    timeout    -----------                        --------
// |Follower| ----------> | Candidate |--------------------> | Leader |
//  --------               -----------                        --------
//     ^          stepDown      |          stepDown              |
//     |________________________|________________________________|
//
// The main loop for the server: runs the current role's loop and performs the
// transition it returns until the server is stopped.
func (s *Server) StartServerLoop(role string) {
	stop := false
	leader := false

	defer debugln("server stopped!")

	for {
		switch role {
		case Follower:
			stop = s.startFollowerLoop()
			if stop {
				return
			}
			role = Candidate

		case Candidate:
			stop, leader = s.startCandidateLoop()
			s.votedFor = ""
			if stop {
				return
			}
			if leader {
				role = Leader
			} else {
				role = Follower
			}

		case Leader:
			stop = s.startLeaderLoop()
			if stop {
				return
			}
			role = Follower
		}
	}
}

// Starts the server as a follower.
func (s *Server) StartFollower() {
	go s.StartServerLoop(Follower)
}

// Starts the server as a leader.
func (s *Server) StartLeader() {
	s.state = Candidate
	go s.StartServerLoop(Leader)
}

// Shuts down the server.
func (s *Server) Stop() {
	s.mutex.Lock()
	if s.state == Follower {
		// A follower is blocked on its election timer; stop it directly.
		s.electionTimer.Stop()
	} else {
		// A candidate or leader is blocked in its loop; signal it to stop.
		s.mutex.Unlock()
		s.stop <- true
	}
	s.unload()
}

// Unloads the server.
func (s *Server) unload() {
	s.state = Stopped

	// Wait for any in-flight flushes to finish.
	time.Sleep(100 * time.Millisecond)

	// Close the log.
	if s.log != nil {
		s.log.Close()
		s.log = nil
	}
}

// Checks if the server is currently running.
func (s *Server) Running() bool {
	return s.state != Stopped
}

// Respond to RPCs from candidates and leaders.
// Convert to candidate if the election timeout elapses without either:
//   1. receiving a valid AppendEntries RPC, or
//   2. granting a vote to a candidate.
func (s *Server) startFollowerLoop() (stop bool) {
	s.state = Follower

	// (1) Timeout: promote to candidate and return.
	// (2) Timer stopped: a heartbeat or vote was received, so keep following.
	for {
		if s.State() == Stopped {
			return true
		}

		if s.electionTimer.Start() {
			return false
		} else {
			s.electionTimer.Ready()
			continue
		}
	}
}

// Increment currentTerm, vote for self.
// Reset the election timeout.
// Send RequestVote RPCs to all other servers, then wait for either:
//   - votes received from a majority of servers: become leader
//   - AppendEntries RPC received from a new leader: step down
//   - election timeout elapses without resolution: increment term and
//     start a new election
//   - discovery of a higher term: step down
func (s *Server) startCandidateLoop() (stop bool, leader bool) {
	if s.state != Follower && s.state != Stopped {
		panic("startCandidateLoop")
	}

	s.state = Candidate
	s.leader = ""
	s.votedFor = s.Name()
	lastLogIndex, lastLogTerm := s.log.LastInfo()

	for {
		// Increase the term.
		s.currentTerm++

		// Request votes from each of our peers.
		c := make(chan *RequestVoteResponse, len(s.peers))
		req := NewRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm)
		for _, peer := range s.peers {
			go peer.sendVoteRequest(req, c)
		}

		// Collect the votes.
		elected, timeout, stop := s.startCandidateSelect(c)

		// If we received enough votes then promote to leader and stop this election.
		if elected {
			return false, true
		}

		if timeout {
			debugln(s.Name(), " election timeout, restart")
			// Restart the election.
			continue
		}

		if stop {
			return true, false
		}

		return false, false
	}
}

// Initialize nextIndex for each follower to last log index + 1.
// Send initial empty AppendEntries RPCs (heartbeats) to each follower;
// repeat during idle periods to prevent election timeouts (§5.2).
// Accept commands from clients and append new entries to the local log (§5.3).
// Whenever last log index ≥ nextIndex for a follower, send an AppendEntries
// RPC with log entries starting at nextIndex, and update nextIndex on
// success (§5.3).
// If AppendEntries fails because of log inconsistency, decrement nextIndex
// and retry (§5.3).
// Mark entries committed if they are stored on a majority of servers and at
// least one entry from the current term is stored on a majority of servers.
// Apply newly committed entries to the state machine.
// Step down if currentTerm changes (§5.5).
func (s *Server) startLeaderLoop() bool {
	if s.state != Candidate && s.state != Stopped {
		panic(s.Name() + " promoted to leader but not candidate " + s.state)
	}

	// Move the server to become a leader and begin peer heartbeats.
	s.state = Leader
	logIndex, _ := s.log.LastInfo()
	s.leader = s.name

	// Update each peer's prevLogIndex to the leader's last log index and
	// start its heartbeat.
	for _, peer := range s.peers {
		debugln("[Leader] Set ", peer.Name(), "Prev to", logIndex)

		peer.prevLogIndex = logIndex
		peer.heartbeatTimer.Ready()
		peer.StartHeartbeat()
	}

	// Begin collecting responses from followers.
	stop := s.startLeaderSelect()

	// Stop all peer heartbeats before leaving the leader state.
	for _, peer := range s.peers {
		peer.stop()
	}

	return stop
}

// Collects votes from peers until one of the following happens:
//   - votes are received from a majority of servers: become leader
//   - a higher term is discovered or a step-down is signalled: step down
//   - the election timeout elapses without resolution: restart the election
//   - the server is stopped
// Returns (elected, timeout, stop).
func (s *Server) startCandidateSelect(c chan *RequestVoteResponse) (bool, bool, bool) {
	// Collect votes until we have a quorum.
	votesGranted := 1

	for {
		// If we received enough votes then stop waiting for more votes.
		if votesGranted >= s.QuorumSize() {
			return true, false, false
		}

		// Collect votes from peers.
		select {
		case resp := <-c:
			if resp != nil {
				if resp.VoteGranted == true {
					votesGranted++
				} else if resp.Term > s.currentTerm {
					s.currentTerm = resp.Term
					return false, false, false
				}
			}

		case term := <-s.stepDown:
			s.currentTerm = term
			return false, false, false

		// TODO: do we calculate the overall timeout, or a timeout for each
		// vote? There may be an issue here.
		case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
			return false, true, false

		case <-s.stop:
			return false, false, true
		}
	}
}

// Collects responses from followers. Once a majority of the followers have
// appended a log entry, the leader commits it.
func (s *Server) startLeaderSelect() bool {
	count := 1

	for {
		var response FlushResponse

		select {
		case response = <-s.response:
			// Count successful responses from peers.
			if response.success && response.peer != nil {
				count++
			}

		case term := <-s.stepDown:
			// Step down to follower.
			s.currentTerm = term
			return false

		case <-s.stop:
			return true
		}

		if response.peer != nil {
			debugln("[CommitCenter] Receive response from ", response.peer.Name(), response.success)
		}

		// At least one entry from the leader's current term must also be stored on
		// a majority of servers.
		if count >= s.QuorumSize() {
			// Determine the committed index that a majority has.
			var indices []uint64
			indices = append(indices, s.log.CurrentIndex())
			for _, peer := range s.peers {
				indices = append(indices, peer.prevLogIndex)
			}
			sort.Sort(Uint64Slice(indices))

			// We can commit up to the index which the majority
			// of the members have appended.
			commitIndex := indices[s.QuorumSize()-1]
			committedIndex := s.log.CommitIndex()

			if commitIndex > committedIndex {
				debugln(indices)
				debugln(s.GetState(), "[CommitCenter] Going to Commit ", commitIndex)
				s.log.SetCommitIndex(commitIndex)
				debugln("[CommitCenter] Commit ", commitIndex)

				for i := committedIndex; i < commitIndex; i++ {
					select {
					case s.log.entries[i-s.log.startIndex].commit <- true:
						debugln("notify")
						continue

					// The commit channel is buffered, so this should return
					// immediately if we were the leader when the log entry
					// was received.
					default:
						debugln("Cannot send commit notification, log from previous leader")
					}
				}
			}
		}
	}
}
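
// Worked example (illustrative, not in the original source; assumes
// Uint64Slice sorts ascending): with five members whose match indices are
// leader=10 and peers={9, 9, 7, 5}, the sorted slice is [5 7 9 9 10].
// QuorumSize() is 3, so indices[QuorumSize()-1] = indices[2] = 9; entries up
// to index 9 are stored on three of the five servers, which is a majority,
// so the commit index advances to 9.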

//--------------------------------------
// Commands
//--------------------------------------

// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *Server) Do(command Command) (interface{}, error) {
	if s.state != Leader {
		return nil, NotLeaderError
	}

	entry := s.log.CreateEntry(s.currentTerm, command)
	if err := s.log.AppendEntry(entry); err != nil {
		return nil, err
	}

	s.response <- FlushResponse{s.currentTerm, true, nil, nil}

	// To speed up the response time we could fire the peer heartbeat timers
	// here immediately.
	// TODO: think about this carefully; firing improves response time but
	// reduces throughput.
	// for _, peer := range s.peers {
	// 	peer.heartbeatTimer.Fire()
	// }

	debugln("[Do] join!")

	// Wait for the entry to be committed, or time out.
	select {
	case <-entry.commit:
		debugln("[Do] finish!")
		result := entry.result
		entry.result = nil
		return result, nil

	case <-time.After(time.Second):
		debugln("[Do] fail!")
		return nil, errors.New("Command commit fails")
	}
}
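
// Usage sketch (illustrative, not in the original source): a client-facing
// handler would forward a command to the leader's Do method. WriteCommand is a
// hypothetical type assumed to implement the package's Command interface; it
// is not defined in this file.
//
//	result, err := s.Do(&WriteCommand{Key: "foo", Value: "bar"})
//	if err == NotLeaderError {
//		// redirect the client to s.Leader()
//	} else if err != nil {
//		// the entry was not committed within the timeout
//	} else {
//		_ = result // value returned by the command's Apply
//	}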

// Appends log entries from the leader to this server.
func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesResponse, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// If the server is stopped then reject the request.
	if !s.Running() {
		return NewAppendEntriesResponse(s.currentTerm, false, 0), fmt.Errorf("raft.Server: Server stopped")
	}

	// If the request is coming from an old term then reject it.
	if req.Term < s.currentTerm {
		return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), fmt.Errorf("raft.Server: Stale request term")
	}

	debugln("Peer ", s.Name(), "received heartbeat from ", req.LeaderName,
		" ", req.Term, " ", s.currentTerm, " ", time.Now())

	s.setCurrentTerm(req.Term)

	// Update the current leader.
	s.leader = req.LeaderName

	// Reset the election timeout.
	if s.electionTimer != nil {
		s.electionTimer.Stop()
	}

	// Reject if the log doesn't contain a matching previous entry.
	if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
		return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
	}

	debugln("Peer ", s.Name(), "after truncate ")

	// Append entries to the log.
	if err := s.log.AppendEntries(req.Entries); err != nil {
		return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
	}

	debugln("Peer ", s.Name(), "commit index ", req.CommitIndex, " from ",
		req.LeaderName)

	// Commit up to the commit index.
	if err := s.log.SetCommitIndex(req.CommitIndex); err != nil {
		return NewAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), err
	}

	debugln("Peer ", s.Name(), "after commit ")
	debugln("Peer ", s.Name(), "reply heartbeat from ", req.LeaderName,
		" ", req.Term, " ", s.currentTerm, " ", time.Now())

	return NewAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), nil
}

// Creates an AppendEntries request. Can return a nil request object if the
// index doesn't exist because of a snapshot.
func (s *Server) createAppendEntriesRequest(prevLogIndex uint64) *AppendEntriesRequest {
	if s.log == nil {
		return nil
	}

	entries, prevLogTerm := s.log.GetEntriesAfter(prevLogIndex)
	if entries != nil {
		return NewAppendEntriesRequest(s.currentTerm, s.name, prevLogIndex, prevLogTerm, entries, s.log.CommitIndex())
	} else {
		return nil
	}
}

//--------------------------------------
// Request Vote
//--------------------------------------

// Requests a vote from a server. A vote can be obtained if the vote's term is
// at least the server's current term and the server has not voted yet. A vote
// can also be obtained if the term is greater than the server's current term.
func (s *Server) RequestVote(req *RequestVoteRequest) (*RequestVoteResponse, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	debugln("Peer ", s.Name(), "receive vote request from ", req.CandidateName)
	//debugln("[RequestVote] got the lock")

	// Fail if the server is not running.
	if !s.Running() {
		return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Server is stopped")
	}

	// If the request is coming from an old term then reject it.
	if req.Term < s.currentTerm {
		return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Stale term: %v < %v", req.Term, s.currentTerm)
	}

	s.setCurrentTerm(req.Term)

	// If we've already voted for a different candidate then don't vote for this candidate.
	if s.votedFor != "" && s.votedFor != req.CandidateName {
		debugln("already voted for ", s.votedFor, ", refuse to vote for ", req.CandidateName)
		return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Already voted for %v", s.votedFor)
	}

	// If the candidate's log is not at least as up-to-date as our last log
	// then don't vote.
	lastIndex, lastTerm := s.log.LastInfo()
	if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
		debugln("my log is more up-to-date")
		return NewRequestVoteResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Out-of-date log: [%v/%v] > [%v/%v]", lastIndex, lastTerm, req.LastLogIndex, req.LastLogTerm)
	}

	// If we made it this far then cast a vote and reset our election timeout.
	s.votedFor = req.CandidateName
	debugln(s.Name(), "Vote for ", req.CandidateName, "at term", req.Term)

	if s.electionTimer != nil {
		s.electionTimer.Stop()
	}

	return NewRequestVoteResponse(s.currentTerm, true), nil
}

// Updates the current term on the server if the term is greater than the
// server's current term. When the term is changed the server's vote is
// cleared and its state is changed to follower.
func (s *Server) setCurrentTerm(term uint64) {
	if term > s.currentTerm {
		s.votedFor = ""

		if s.state == Leader || s.state == Candidate {
			debugln(s.Name(), " should step down to a follower from ", s.state)

			s.stepDown <- term
			s.state = Follower
			debugln(s.Name(), " step down to a follower from ", s.state)
			return
		}

		// Update the term after all the peers have been stopped.
		s.currentTerm = term
	}
}

//--------------------------------------
// Membership
//--------------------------------------

// Adds a peer to the server. This should be called by a system's join command
// so that it runs within the context of the server lock.
func (s *Server) AddPeer(name string) error {
	// Do not allow peers to be added twice.
	if s.peers[name] != nil {
		return DuplicatePeerError
	}

	// Only add the peer if it doesn't have the same name as this server.
	if s.name != name {
		//debugln("Add peer ", name)
		peer := NewPeer(s, name, s.heartbeatTimeout)

		if s.state == Leader {
			peer.StartHeartbeat()
		}

		s.peers[peer.name] = peer
	}
	return nil
}

// Removes a peer from the server. This should be called by a system's leave
// command so that it runs within the context of the server lock.
func (s *Server) RemovePeer(name string) error {
	// Ignore removal of the server itself.
	if s.name == name {
		return nil
	}

	// Return an error if the peer doesn't exist.
	peer := s.peers[name]
	if peer == nil {
		return fmt.Errorf("raft: Peer not found: %s", name)
	}

	// Flush entries to the peer first.
	if s.state == Leader {
		if _, _, err := peer.flush(); err != nil {
			warn("raft: Unable to notify peer of removal: %v", err)
		}
	}

	// Stop the peer and remove it.
	peer.stop()
	delete(s.peers, name)

	return nil
}
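
// Usage sketch (illustrative, not in the original source): a cluster-join
// command's Apply method might add the joining node on every server so the
// configuration change is replicated through the log. The JoinCommand type
// and its Name field are assumptions, not part of this file.
//
//	func (c *JoinCommand) Apply(server *Server) (interface{}, error) {
//		return nil, server.AddPeer(c.Name)
//	}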

//--------------------------------------
// Log compaction
//--------------------------------------

// Creates a snapshot request.
func (s *Server) createSnapshotRequest() *SnapshotRequest {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return NewSnapshotRequest(s.name, s.lastSnapshot)
}

// The background snapshot function.
func (s *Server) Snapshot() {
	for {
		// TODO: change this interval to something reasonable.
		time.Sleep(60 * time.Second)

		s.takeSnapshot()
	}
}

func (s *Server) takeSnapshot() error {
	// TODO: put a snapshot mutex.
	debugln("take Snapshot")

	if s.currentSnapshot != nil {
		return errors.New("handling snapshot")
	}

	lastIndex, lastTerm := s.log.CommitInfo()
	if lastIndex == 0 || lastTerm == 0 {
		return errors.New("No logs")
	}

	path := s.SnapshotPath(lastIndex, lastTerm)

	var state []byte
	var err error

	if s.stateMachine != nil {
		state, err = s.stateMachine.Save()
		if err != nil {
			return err
		}
	} else {
		state = []byte{0}
	}

	var peerNames []string
	for _, peer := range s.peers {
		peerNames = append(peerNames, peer.Name())
	}
	peerNames = append(peerNames, s.Name())

	s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path}

	s.saveSnapshot()

	s.log.Compact(lastIndex, lastTerm)

	return nil
}

// Saves the current snapshot to disk and promotes it to be the last snapshot,
// removing the previous one if it changed.
func (s *Server) saveSnapshot() error {
	if s.currentSnapshot == nil {
		return errors.New("no snapshot to save")
	}

	err := s.currentSnapshot.Save()
	if err != nil {
		return err
	}

	tmp := s.lastSnapshot
	s.lastSnapshot = s.currentSnapshot

	// Delete the previous snapshot if there is any change.
	if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
		tmp.Remove()
	}
	s.currentSnapshot = nil

	return nil
}

// Retrieves the snapshot path for the given last index and term.
func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
	return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
}

// Handles a snapshot sent by the leader: restores the state machine, the
// cluster configuration, and the log from the request.
func (s *Server) SnapshotRecovery(req *SnapshotRequest) (*SnapshotResponse, error) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Recover the state machine.
	s.stateMachine.Recovery(req.State)

	// Recover the cluster configuration.
	for _, peerName := range req.Peers {
		s.AddPeer(peerName)
	}

	// Update the term and index.
	s.currentTerm = req.LastTerm
	s.log.UpdateCommitIndex(req.LastIndex)

	snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
	s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}

	s.saveSnapshot()

	s.log.Compact(req.LastIndex, req.LastTerm)

	return NewSnapshotResponse(req.LastTerm, true, req.LastIndex), nil
}

// Loads a snapshot at restart.
func (s *Server) LoadSnapshot() error {
	dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
	if err != nil {
		return err
	}

	filenames, err := dir.Readdirnames(-1)
	if err != nil {
		dir.Close()
		panic(err)
	}
	dir.Close()

	if len(filenames) == 0 {
		return errors.New("no snapshot")
	}

	// Not sure how many snapshots we should keep; use the latest one.
	sort.Strings(filenames)
	snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])

	// Should not fail.
	file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
	defer file.Close()
	if err != nil {
		panic(err)
	}

	// Verify the checksum before decoding the snapshot.
	var snapshotBytes []byte
	var checksum uint32

	n, err := fmt.Fscanf(file, "%08x\n", &checksum)
	if err != nil {
		return err
	}
	if n != 1 {
		return errors.New("Bad snapshot file")
	}

	snapshotBytes, _ = ioutil.ReadAll(file)
	debugln(string(snapshotBytes))

	// Generate the checksum and compare it against the stored one.
	byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
	if uint32(checksum) != byteChecksum {
		debugln(checksum, " ", byteChecksum)
		return errors.New("bad snapshot file")
	}

	err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
	if err != nil {
		debugln("unmarshal error: ", err)
		return err
	}

	err = s.stateMachine.Recovery(s.lastSnapshot.State)
	if err != nil {
		debugln("recovery error: ", err)
		return err
	}

	for _, peerName := range s.lastSnapshot.Peers {
		s.AddPeer(peerName)
	}

	s.log.SetStartTerm(s.lastSnapshot.LastTerm)
	s.log.SetStartIndex(s.lastSnapshot.LastIndex)
	s.log.UpdateCommitIndex(s.lastSnapshot.LastIndex)

	return err
}
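
// Restart sketch (illustrative, not in the original source): one possible
// restart sequence recovers the snapshot before opening the log and starting
// the main loop; the exact ordering depends on the caller. Here t is assumed
// to be a Transporter and sm a StateMachine, and error handling is elided.
//
//	s, _ := NewServer("node1", "/tmp/raft/node1", t, sm, nil)
//	s.LoadSnapshot() // returns an error such as "no snapshot" on a fresh server
//	s.Initialize()   // opens the log and restores the current term
//	s.StartFollower()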