influxdb/server.go

1282 lines
32 KiB
Go
Raw Normal View History

2013-04-14 21:37:33 +00:00
package raft
2013-04-17 02:28:08 +00:00
import (
2013-06-24 16:52:51 +00:00
"encoding/json"
2013-04-17 02:28:08 +00:00
"errors"
"fmt"
2013-07-01 00:55:54 +00:00
"hash/crc32"
2013-06-08 02:19:18 +00:00
"io/ioutil"
2013-06-03 21:58:12 +00:00
"os"
"path"
2013-06-08 02:19:18 +00:00
"sort"
"sync"
"time"
2013-04-17 02:28:08 +00:00
)
2013-04-14 21:37:33 +00:00
//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------
const (
2013-07-17 00:40:19 +00:00
Stopped = "stopped"
Follower = "follower"
Candidate = "candidate"
Leader = "leader"
Snapshotting = "snapshotting"
2013-04-17 02:32:49 +00:00
)
2013-07-15 05:48:41 +00:00
const (
2013-08-02 00:58:03 +00:00
MaxLogEntriesPerRequest = 2000
2013-07-18 23:44:01 +00:00
NumberOfLogEntriesAfterSnapshot = 200
2013-07-15 05:48:41 +00:00
)
2013-04-17 02:32:49 +00:00
const (
2013-04-28 04:51:17 +00:00
DefaultHeartbeatTimeout = 50 * time.Millisecond
2013-04-28 21:23:21 +00:00
DefaultElectionTimeout = 150 * time.Millisecond
2013-04-14 21:37:33 +00:00
)
2013-07-07 20:21:04 +00:00
var stopValue interface{}
//------------------------------------------------------------------------------
//
// Errors
//
//------------------------------------------------------------------------------
var NotLeaderError = errors.New("raft.Server: Not current leader")
var DuplicatePeerError = errors.New("raft.Server: Duplicate peer")
2013-07-07 20:21:04 +00:00
var CommandTimeoutError = errors.New("raft: Command timeout")
2013-04-14 21:37:33 +00:00
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server struct {
2013-06-24 16:52:51 +00:00
name string
path string
state string
transporter Transporter
context interface{}
currentTerm uint64
votedFor string
log *Log
leader string
peers map[string]*Peer
mutex sync.RWMutex
syncedPeer map[string]bool
2013-07-07 20:21:04 +00:00
c chan *event
2013-07-07 22:12:24 +00:00
electionTimeout time.Duration
heartbeatTimeout time.Duration
2013-07-15 05:48:41 +00:00
currentSnapshot *Snapshot
lastSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
2013-09-18 04:19:46 +00:00
connectionString string
2013-04-14 21:37:33 +00:00
}
2013-07-07 20:21:04 +00:00
// An event to be processed by the server's event loop.
type event struct {
target interface{}
returnValue interface{}
c chan error
}
2013-04-14 21:37:33 +00:00
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------
2013-04-17 02:28:08 +00:00
// Creates a new server with a log at the given path.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (*Server, error) {
2013-04-17 02:28:08 +00:00
if name == "" {
return nil, errors.New("raft.Server: Name cannot be blank")
}
2013-05-28 19:57:38 +00:00
if transporter == nil {
2013-06-08 02:41:36 +00:00
panic("raft: Transporter required")
2013-05-28 19:57:38 +00:00
}
2013-04-17 02:28:08 +00:00
s := &Server{
2013-07-15 05:48:41 +00:00
name: name,
path: path,
transporter: transporter,
stateMachine: stateMachine,
context: context,
state: Stopped,
peers: make(map[string]*Peer),
log: newLog(),
c: make(chan *event, 256),
electionTimeout: DefaultElectionTimeout,
heartbeatTimeout: DefaultHeartbeatTimeout,
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
connectionString: connectionString,
2013-04-14 21:37:33 +00:00
}
2013-04-30 04:13:50 +00:00
// Setup apply function.
s.log.ApplyFunc = func(c Command) (interface{}, error) {
result, err := c.Apply(s)
return result, err
2013-04-30 04:13:50 +00:00
}
2013-04-17 02:28:08 +00:00
return s, nil
}
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------
2013-05-01 05:21:56 +00:00
//--------------------------------------
// General
//--------------------------------------
2013-04-17 02:28:08 +00:00
// Retrieves the name of the server.
func (s *Server) Name() string {
return s.name
}
// Retrieves the storage path for the server.
func (s *Server) Path() string {
return s.path
}
2013-05-05 19:36:23 +00:00
2013-07-06 19:41:42 +00:00
// The name of the current leader.
2013-06-03 13:51:52 +00:00
func (s *Server) Leader() string {
2013-06-03 01:18:25 +00:00
return s.leader
2013-06-08 02:19:18 +00:00
}
2013-06-26 18:25:22 +00:00
// Retrieves a copy of the peer data.
func (s *Server) Peers() map[string]*Peer {
s.mutex.Lock()
defer s.mutex.Unlock()
peers := make(map[string]*Peer)
for name, peer := range s.peers {
peers[name] = peer.clone()
}
return peers
}
2013-05-28 19:57:38 +00:00
// Retrieves the object that transports requests.
func (s *Server) Transporter() Transporter {
2013-07-07 22:12:24 +00:00
s.mutex.RLock()
defer s.mutex.RUnlock()
2013-05-28 19:57:38 +00:00
return s.transporter
}
2013-06-11 22:30:13 +00:00
func (s *Server) SetTransporter(t Transporter) {
2013-07-07 22:12:24 +00:00
s.mutex.Lock()
defer s.mutex.Unlock()
2013-06-11 22:30:13 +00:00
s.transporter = t
}
2013-06-03 02:43:40 +00:00
// Retrieves the context passed into the constructor.
func (s *Server) Context() interface{} {
return s.context
}
2013-04-17 02:28:08 +00:00
// Retrieves the log path for the server.
func (s *Server) LogPath() string {
2013-07-23 22:30:14 +00:00
return path.Join(s.path, "log")
2013-04-17 02:28:08 +00:00
}
// Retrieves the current state of the server.
2013-04-17 02:32:49 +00:00
func (s *Server) State() string {
2013-07-07 22:12:24 +00:00
s.mutex.RLock()
defer s.mutex.RUnlock()
2013-04-17 02:28:08 +00:00
return s.state
2013-04-14 21:37:33 +00:00
}
2013-07-07 22:12:24 +00:00
// Sets the state of the server.
func (s *Server) setState(state string) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.state = state
if state == Leader {
s.leader = s.Name()
}
2013-07-07 22:12:24 +00:00
}
// Retrieves the current term of the server.
func (s *Server) Term() uint64 {
return s.currentTerm
2013-04-14 21:37:33 +00:00
}
2013-07-06 19:41:42 +00:00
// Retrieves the current commit index of the server.
func (s *Server) CommitIndex() uint64 {
2013-07-06 04:49:47 +00:00
return s.log.commitIndex
}
2013-05-03 04:16:39 +00:00
// Retrieves the name of the candidate this server voted for in this term.
func (s *Server) VotedFor() string {
return s.votedFor
}
2013-05-08 03:56:32 +00:00
// Retrieves whether the server's log has no entries.
func (s *Server) IsLogEmpty() bool {
2013-07-06 04:49:47 +00:00
return s.log.isEmpty()
2013-05-08 03:56:32 +00:00
}
2013-05-08 20:22:08 +00:00
// A list of all the log entries. This should only be used for debugging purposes.
func (s *Server) LogEntries() []*LogEntry {
2013-07-06 19:41:42 +00:00
return s.log.entries
2013-05-08 20:22:08 +00:00
}
2013-06-03 19:13:38 +00:00
// A reference to the command name of the last entry.
2013-07-06 19:41:42 +00:00
func (s *Server) LastCommandName() string {
return s.log.lastCommandName()
2013-06-03 19:13:38 +00:00
}
// Get the state of the server for debugging
func (s *Server) GetState() string {
2013-07-07 22:12:24 +00:00
s.mutex.RLock()
defer s.mutex.RUnlock()
2013-08-03 02:00:11 +00:00
return fmt.Sprintf("Name: %s, State: %s, Term: %v, CommitedIndex: %v ", s.name, s.state, s.currentTerm, s.log.commitIndex)
}
2013-07-25 23:16:06 +00:00
// Check if the server is promotable
func (s *Server) promotable() bool {
return s.log.currentIndex() > 0
}
2013-05-01 05:21:56 +00:00
//--------------------------------------
// Membership
//--------------------------------------
2013-04-28 04:51:17 +00:00
// Retrieves the number of member servers in the consensus.
2013-05-01 05:11:23 +00:00
func (s *Server) MemberCount() int {
s.mutex.Lock()
defer s.mutex.Unlock()
return len(s.peers) + 1
2013-04-28 04:51:17 +00:00
}
// Retrieves the number of servers required to make a quorum.
2013-05-01 05:11:23 +00:00
func (s *Server) QuorumSize() int {
2013-04-28 04:51:17 +00:00
return (s.MemberCount() / 2) + 1
}
2013-05-01 05:21:56 +00:00
//--------------------------------------
// Election timeout
//--------------------------------------
// Retrieves the election timeout.
func (s *Server) ElectionTimeout() time.Duration {
2013-08-07 04:02:30 +00:00
s.mutex.RLock()
defer s.mutex.RUnlock()
2013-07-07 22:12:24 +00:00
return s.electionTimeout
2013-05-01 05:21:56 +00:00
}
// Sets the election timeout.
func (s *Server) SetElectionTimeout(duration time.Duration) {
2013-08-07 04:02:30 +00:00
s.mutex.Lock()
defer s.mutex.Unlock()
2013-07-07 22:12:24 +00:00
s.electionTimeout = duration
2013-05-01 05:21:56 +00:00
}
//--------------------------------------
// Heartbeat timeout
//--------------------------------------
// Retrieves the heartbeat timeout.
func (s *Server) HeartbeatTimeout() time.Duration {
2013-08-07 04:02:30 +00:00
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.heartbeatTimeout
}
// Sets the heartbeat timeout.
2013-07-06 19:41:42 +00:00
func (s *Server) SetHeartbeatTimeout(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.heartbeatTimeout = duration
for _, peer := range s.peers {
2013-07-06 04:49:47 +00:00
peer.setHeartbeatTimeout(duration)
}
}
2013-04-14 21:37:33 +00:00
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
2013-04-17 02:28:08 +00:00
//--------------------------------------
2013-06-24 16:52:51 +00:00
// Initialization
2013-04-17 02:28:08 +00:00
//--------------------------------------
2013-07-11 05:19:57 +00:00
// Reg the NOPCommand
func init() {
RegisterCommand(&NOPCommand{})
RegisterCommand(&DefaultJoinCommand{})
2013-07-26 20:33:58 +00:00
RegisterCommand(&DefaultLeaveCommand{})
2013-07-11 05:19:57 +00:00
}
2013-07-25 22:47:35 +00:00
// Start as follow
// If log entries exist then allow promotion to candidate if no AEs received.
// If no log entries exist then wait for AEs from another node.
// If no log entries exist and a self-join command is issued then
// immediately become leader and commit entry.
func (s *Server) Start() error {
// Exit if the server is already running.
if s.state != Stopped {
return errors.New("raft.Server: Server already running")
}
// Create snapshot directory if not exist
os.Mkdir(path.Join(s.path, "snapshot"), 0700)
2013-08-13 21:31:19 +00:00
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
2013-08-13 21:31:19 +00:00
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
// Update the term to the last term in the log.
_, s.currentTerm = s.log.lastInfo()
s.setState(Follower)
2013-07-25 22:40:20 +00:00
// If no log entries exist then
// 1. wait for AEs from another node
// 2. wait for self-join command
// to set itself promotable
2013-07-25 23:16:06 +00:00
if !s.promotable() {
s.debugln("start as a new raft server")
2013-07-25 22:40:20 +00:00
// If log entries exist then allow promotion to candidate
// if no AEs received.
} else {
s.debugln("start from previous saved state")
}
2013-08-03 02:00:11 +00:00
debugln(s.GetState())
go s.loop()
return nil
}
2013-07-03 16:53:46 +00:00
// Shuts down the server.
func (s *Server) Stop() {
2013-07-07 20:21:04 +00:00
s.send(&stopValue)
2013-07-07 22:12:24 +00:00
s.mutex.Lock()
defer s.mutex.Unlock()
2013-07-07 20:21:04 +00:00
s.log.close()
2013-07-03 16:53:46 +00:00
}
2013-07-07 20:21:04 +00:00
// Checks if the server is currently running.
func (s *Server) Running() bool {
2013-07-07 22:12:24 +00:00
s.mutex.RLock()
defer s.mutex.RUnlock()
2013-07-07 20:21:04 +00:00
return s.state != Stopped
}
2013-07-03 16:53:46 +00:00
2013-07-07 20:21:04 +00:00
//--------------------------------------
// Term
//--------------------------------------
2013-07-03 16:53:46 +00:00
2013-07-07 20:21:04 +00:00
// Sets the current term for the server. This is only used when an external
// current term is found.
func (s *Server) setCurrentTerm(term uint64, leaderName string, append bool) {
2013-07-07 22:12:24 +00:00
s.mutex.Lock()
defer s.mutex.Unlock()
// update the term and clear vote for
2013-07-07 20:21:04 +00:00
if term > s.currentTerm {
s.state = Follower
s.currentTerm = term
s.leader = leaderName
s.votedFor = ""
return
}
// discover new leader when candidate
// save leader name when follower
if term == s.currentTerm && s.state != Leader && append {
s.state = Follower
s.leader = leaderName
2013-07-03 16:53:46 +00:00
}
2013-07-26 00:49:01 +00:00
2013-07-03 16:53:46 +00:00
}
2013-07-06 19:41:42 +00:00
//--------------------------------------
// Event Loop
//--------------------------------------
2013-07-17 00:40:19 +00:00
// ________
// --|Snapshot| timeout
// | -------- ______
// recover | ^ | |
// snapshot / | |snapshot | |
// higher | | v | recv majority votes
// term | -------- timeout ----------- -----------
// |-> |Follower| ----------> | Candidate |--------------------> | Leader |
// -------- ----------- -----------
// ^ higher term/ | higher term |
// | new leader | |
// |_______________________|____________________________________ |
2013-07-06 19:41:42 +00:00
// The main event loop for the server
2013-07-07 20:21:04 +00:00
func (s *Server) loop() {
defer s.debugln("server.loop.end")
2013-07-06 19:41:42 +00:00
for {
2013-07-07 22:12:24 +00:00
state := s.State()
2013-07-07 22:12:24 +00:00
s.debugln("server.loop.run ", state)
switch state {
2013-07-06 19:41:42 +00:00
case Follower:
2013-07-07 20:21:04 +00:00
s.followerLoop()
2013-07-06 19:41:42 +00:00
case Candidate:
2013-07-07 20:21:04 +00:00
s.candidateLoop()
2013-07-06 19:41:42 +00:00
case Leader:
2013-07-07 20:21:04 +00:00
s.leaderLoop()
2013-07-17 00:40:19 +00:00
case Snapshotting:
s.snapshotLoop()
2013-07-07 20:21:04 +00:00
case Stopped:
return
2013-07-06 19:41:42 +00:00
}
}
}
2013-07-07 20:21:04 +00:00
// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *Server) send(value interface{}) (interface{}, error) {
event := s.sendAsync(value)
err := <-event.c
return event.returnValue, err
}
func (s *Server) sendAsync(value interface{}) *event {
event := &event{target: value, c: make(chan error, 1)}
s.c <- event
return event
}
2013-07-06 19:41:42 +00:00
// The event loop that is run when the server is in a Follower state.
// Responds to RPCs from candidates and leaders.
// Converts to candidate if election timeout elapses without either:
// 1.Receiving valid AppendEntries RPC, or
// 2.Granting vote to candidate
2013-07-07 20:21:04 +00:00
func (s *Server) followerLoop() {
2013-07-08 04:31:58 +00:00
2013-07-07 22:12:24 +00:00
s.setState(Follower)
2013-07-08 04:31:58 +00:00
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
2013-07-03 16:53:46 +00:00
for {
2013-07-07 20:21:04 +00:00
var err error
update := false
2013-07-07 20:21:04 +00:00
select {
case e := <-s.c:
if e.target == &stopValue {
2013-07-07 22:12:24 +00:00
s.setState(Stopped)
2013-08-07 04:29:40 +00:00
} else {
switch req := e.target.(type) {
case JoinCommand:
//If no log entries exist and a self-join command is issued
//then immediately become leader and commit entry.
if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
s.debugln("selfjoin and promote to leader")
s.setState(Leader)
s.processCommand(req, e)
} else {
err = NotLeaderError
}
case *AppendEntriesRequest:
e.returnValue, update = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, update = s.processRequestVoteRequest(req)
case *SnapshotRequest:
e.returnValue = s.processSnapshotRequest(req)
default:
err = NotLeaderError
}
2013-07-07 20:21:04 +00:00
}
2013-07-03 16:53:46 +00:00
2013-07-07 20:21:04 +00:00
// Callback to event.
e.c <- err
2013-07-07 20:55:55 +00:00
2013-07-08 04:31:58 +00:00
case <-timeoutChan:
// only allow synced follower to promote to candidate
2013-07-25 23:16:06 +00:00
if s.promotable() {
s.setState(Candidate)
} else {
update = true
}
2013-07-07 20:21:04 +00:00
}
2013-07-07 20:55:55 +00:00
2013-07-08 04:31:58 +00:00
// Converts to candidate if election timeout elapses without either:
// 1.Receiving valid AppendEntries RPC, or
// 2.Granting vote to candidate
if update {
2013-07-09 03:00:14 +00:00
timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
2013-07-08 04:31:58 +00:00
}
2013-07-07 20:21:04 +00:00
// Exit loop on state change.
2013-07-07 22:12:24 +00:00
if s.State() != Follower {
2013-07-07 20:21:04 +00:00
break
2013-07-03 16:53:46 +00:00
}
}
}
2013-07-06 19:41:42 +00:00
// The event loop that is run when the server is in a Candidate state.
2013-07-07 20:21:04 +00:00
func (s *Server) candidateLoop() {
2013-07-06 04:49:47 +00:00
lastLogIndex, lastLogTerm := s.log.lastInfo()
2013-07-07 20:21:04 +00:00
s.leader = ""
2013-07-03 16:53:46 +00:00
for {
2013-07-06 19:41:42 +00:00
// Increment current term, vote for self.
2013-07-03 16:53:46 +00:00
s.currentTerm++
2013-07-06 19:41:42 +00:00
s.votedFor = s.name
2013-07-03 16:53:46 +00:00
2013-07-06 19:41:42 +00:00
// Send RequestVote RPCs to all other servers.
2013-07-07 20:21:04 +00:00
respChan := make(chan *RequestVoteResponse, len(s.peers))
2013-07-03 16:53:46 +00:00
for _, peer := range s.peers {
2013-07-07 22:12:24 +00:00
go peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
2013-07-03 16:53:46 +00:00
}
2013-07-06 19:41:42 +00:00
// Wait for either:
// * Votes received from majority of servers: become leader
// * AppendEntries RPC received from new leader: step down.
// * Election timeout elapses without election resolution: increment term, start new election
// * Discover higher term: step down (§5.1)
2013-07-07 20:21:04 +00:00
votesGranted := 1
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
2013-07-07 23:52:18 +00:00
timeout := false
2013-07-07 20:21:04 +00:00
for {
// If we received enough votes then stop waiting for more votes.
s.debugln("server.candidate.votes: ", votesGranted, " quorum:", s.QuorumSize())
if votesGranted >= s.QuorumSize() {
2013-07-07 22:12:24 +00:00
s.setState(Leader)
2013-07-07 20:21:04 +00:00
break
}
// Collect votes from peers.
select {
case resp := <-respChan:
if resp.VoteGranted {
s.debugln("server.candidate.vote.granted: ", votesGranted)
votesGranted++
} else if resp.Term > s.currentTerm {
s.debugln("server.candidate.vote.failed")
s.setCurrentTerm(resp.Term, "", false)
} else {
s.debugln("server.candidate.vote: denied")
2013-07-07 20:21:04 +00:00
}
2013-07-07 20:55:55 +00:00
case e := <-s.c:
2013-07-07 20:21:04 +00:00
var err error
if e.target == &stopValue {
2013-07-07 22:12:24 +00:00
s.setState(Stopped)
2013-08-07 04:32:00 +00:00
} else {
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
2013-07-07 20:21:04 +00:00
}
// Callback to event.
e.c <- err
case <-timeoutChan:
2013-07-07 23:52:18 +00:00
timeout = true
}
// both process AER and RVR can make the server to follower
// also break when timeout happens
if s.State() != Candidate || timeout {
2013-07-07 20:21:04 +00:00
break
}
}
2013-07-07 20:55:55 +00:00
2013-07-07 23:52:18 +00:00
// break when we are not candidate
2013-07-07 22:12:24 +00:00
if s.State() != Candidate {
2013-07-07 20:21:04 +00:00
break
2013-07-03 16:53:46 +00:00
}
2013-07-07 23:52:18 +00:00
// continue when timeout happened
2013-07-03 16:53:46 +00:00
}
}
2013-08-07 04:33:37 +00:00
// The event loop that is run when the server is in a Leader state.
2013-07-07 20:21:04 +00:00
func (s *Server) leaderLoop() {
2013-07-07 22:12:24 +00:00
s.setState(Leader)
s.syncedPeer = make(map[string]bool)
2013-07-06 04:49:47 +00:00
logIndex, _ := s.log.lastInfo()
2013-07-03 16:53:46 +00:00
2013-07-06 19:41:42 +00:00
// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
2013-07-03 16:53:46 +00:00
for _, peer := range s.peers {
2013-07-07 22:12:24 +00:00
peer.setPrevLogIndex(logIndex)
2013-07-06 04:49:47 +00:00
peer.startHeartbeat()
2013-07-03 16:53:46 +00:00
}
2013-07-01 15:46:53 +00:00
go s.Do(NOPCommand{})
2013-07-03 16:53:46 +00:00
// Begin to collect response from followers
2013-07-06 19:41:42 +00:00
for {
2013-07-07 20:21:04 +00:00
var err error
2013-07-06 19:41:42 +00:00
select {
2013-07-07 20:21:04 +00:00
case e := <-s.c:
if e.target == &stopValue {
2013-07-07 22:12:24 +00:00
s.setState(Stopped)
2013-08-07 04:35:33 +00:00
} else {
switch req := e.target.(type) {
case Command:
s.processCommand(req, e)
continue
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *AppendEntriesResponse:
s.processAppendEntriesResponse(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
2013-07-06 19:41:42 +00:00
}
2013-07-03 16:53:46 +00:00
2013-07-07 20:21:04 +00:00
// Callback to event.
e.c <- err
}
2013-07-07 20:55:55 +00:00
2013-07-07 20:21:04 +00:00
// Exit loop on state change.
2013-07-07 22:12:24 +00:00
if s.State() != Leader {
2013-07-06 19:41:42 +00:00
break
2013-07-03 16:53:46 +00:00
}
}
2013-07-06 19:41:42 +00:00
// Stop all peers.
for _, peer := range s.peers {
2013-08-13 21:31:19 +00:00
peer.stopHeartbeat(false)
}
s.syncedPeer = nil
}
2013-07-17 00:40:19 +00:00
func (s *Server) snapshotLoop() {
s.setState(Snapshotting)
for {
var err error
e := <-s.c
if e.target == &stopValue {
s.setState(Stopped)
2013-08-07 04:37:50 +00:00
} else {
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
case *SnapshotRecoveryRequest:
e.returnValue = s.processSnapshotRecoveryRequest(req)
}
2013-07-17 00:40:19 +00:00
}
// Callback to event.
e.c <- err
// Exit loop on state change.
if s.State() != Snapshotting {
break
}
}
}
2013-04-17 02:28:08 +00:00
//--------------------------------------
// Commands
//--------------------------------------
2013-04-28 04:51:17 +00:00
// Attempts to execute a command and replicate it. The function will return
// when the command has been successfully committed or an error has occurred.
func (s *Server) Do(command Command) (interface{}, error) {
2013-07-07 20:21:04 +00:00
return s.send(command)
}
2013-07-07 20:21:04 +00:00
// Processes a command.
func (s *Server) processCommand(command Command, e *event) {
s.debugln("server.command.process")
2013-07-07 20:21:04 +00:00
// Create an entry for the command in the log.
entry, err := s.log.createEntry(s.currentTerm, command)
if err != nil {
s.debugln("server.command.log.entry.error:", err)
e.c <- err
return
}
2013-07-06 04:49:47 +00:00
if err := s.log.appendEntry(entry); err != nil {
2013-07-07 20:21:04 +00:00
s.debugln("server.command.log.error:", err)
e.c <- err
return
2013-05-05 19:36:23 +00:00
}
2013-05-05 20:26:04 +00:00
2013-07-07 20:21:04 +00:00
// Issue a callback for the entry once it's committed.
go func() {
// Wait for the entry to be committed.
select {
case <-entry.commit:
2013-07-07 22:12:24 +00:00
var err error
2013-07-07 20:21:04 +00:00
s.debugln("server.command.commit")
2013-07-07 22:12:24 +00:00
e.returnValue, err = s.log.getEntryResult(entry, true)
e.c <- err
2013-07-07 20:21:04 +00:00
case <-time.After(time.Second):
s.debugln("server.command.timeout")
e.c <- CommandTimeoutError
}
}()
// Issue an append entries response for the server.
resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
resp.append = true
resp.peer = s.Name()
2013-08-02 16:03:23 +00:00
// this must be async
// sendAsync is not really async every time
// when the sending speed of the user is larger than
// the processing speed of the server, the buffered channel
// will be full. Then sendAsync will become sync, which will
// cause deadlock here.
// so we use a goroutine to avoid the deadlock
go s.sendAsync(resp)
2013-04-28 04:51:17 +00:00
}
2013-07-07 20:21:04 +00:00
//--------------------------------------
// Append Entries
//--------------------------------------
2013-07-06 19:41:42 +00:00
2013-07-07 20:21:04 +00:00
// Appends zero or more log entry from the leader to this server.
func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse {
ret, _ := s.send(req)
resp, _ := ret.(*AppendEntriesResponse)
return resp
}
2013-07-07 20:21:04 +00:00
// Processes the "append entries" request.
2013-07-08 04:31:58 +00:00
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
2013-07-11 19:56:34 +00:00
2013-07-09 02:55:00 +00:00
s.traceln("server.ae.process")
2013-04-30 04:13:50 +00:00
if req.Term < s.currentTerm {
2013-07-07 20:21:04 +00:00
s.debugln("server.ae.error: stale term")
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), false
2013-04-30 04:13:50 +00:00
}
2013-06-24 16:52:51 +00:00
2013-07-07 20:21:04 +00:00
// Update term and leader.
s.setCurrentTerm(req.Term, req.LeaderName, true)
2013-06-08 02:19:18 +00:00
2013-04-30 04:13:50 +00:00
// Reject if log doesn't contain a matching previous entry.
2013-07-06 04:49:47 +00:00
if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
2013-07-07 20:21:04 +00:00
s.debugln("server.ae.truncate.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
2013-04-30 04:13:50 +00:00
}
// Append entries to the log.
2013-07-06 04:49:47 +00:00
if err := s.log.appendEntries(req.Entries); err != nil {
2013-07-07 20:21:04 +00:00
s.debugln("server.ae.append.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
2013-04-30 04:13:50 +00:00
}
// Commit up to the commit index.
2013-07-06 04:49:47 +00:00
if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
2013-07-07 20:21:04 +00:00
s.debugln("server.ae.commit.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
2013-06-08 02:19:18 +00:00
}
2013-04-30 04:13:50 +00:00
// once the server appended and commited all the log entries from the leader
return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
2013-04-28 04:51:17 +00:00
}
2013-07-07 20:21:04 +00:00
// Processes the "append entries" response from the peer. This is only
// processed when the server is a leader. Responses received during other
// states are dropped.
func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
2013-07-11 19:56:34 +00:00
2013-07-07 20:21:04 +00:00
// If we find a higher term then change to a follower and exit.
if resp.Term > s.currentTerm {
s.setCurrentTerm(resp.Term, "", false)
2013-07-07 20:21:04 +00:00
return
}
2013-07-07 20:55:55 +00:00
// panic response if it's not successful.
2013-07-07 20:21:04 +00:00
if !resp.Success {
return
}
2013-07-07 20:55:55 +00:00
// if one peer successfully append a log from the leader term,
// we add it to the synced list
if resp.append == true {
s.syncedPeer[resp.peer] = true
}
2013-07-07 20:21:04 +00:00
// Increment the commit count to make sure we have a quorum before committing.
if len(s.syncedPeer) < s.QuorumSize() {
2013-07-07 20:21:04 +00:00
return
}
// Determine the committed index that a majority has.
var indices []uint64
indices = append(indices, s.log.currentIndex())
for _, peer := range s.peers {
2013-07-07 22:12:24 +00:00
indices = append(indices, peer.getPrevLogIndex())
2013-07-07 20:21:04 +00:00
}
sort.Sort(uint64Slice(indices))
// We can commit up to the index which the majority of the members have appended.
commitIndex := indices[s.QuorumSize()-1]
committedIndex := s.log.commitIndex
if commitIndex > committedIndex {
s.log.setCommitIndex(commitIndex)
s.debugln("commit index ", commitIndex)
2013-07-07 20:21:04 +00:00
for i := committedIndex; i < commitIndex; i++ {
2013-07-18 23:29:06 +00:00
if entry := s.log.getEntry(i + 1); entry != nil {
2013-07-10 17:31:56 +00:00
// if the leader is a new one and the entry came from the
// old leader, the commit channel will be nil and no go routine
// is waiting from this channel
// if we try to send to it, the new leader will get stuck
if entry.commit != nil {
select {
case entry.commit <- true:
default:
panic("server unable to send signal to commit channel")
}
}
2013-07-07 20:21:04 +00:00
}
}
2013-06-25 21:41:42 +00:00
}
2013-05-05 19:36:23 +00:00
}
2013-05-01 05:11:23 +00:00
//--------------------------------------
// Request Vote
2013-04-28 21:23:21 +00:00
//--------------------------------------
// Requests a vote from a server. A vote can be obtained if the vote's term is
// at the server's current term and the server has not made a vote yet. A vote
// can also be obtained if the term is greater than the server's current term.
2013-07-07 20:21:04 +00:00
func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
ret, _ := s.send(req)
resp, _ := ret.(*RequestVoteResponse)
return resp
}
2013-05-08 03:56:32 +00:00
2013-07-07 20:21:04 +00:00
// Processes a "request vote" request.
2013-07-08 04:31:58 +00:00
func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
2013-07-11 19:56:34 +00:00
2013-04-28 21:23:21 +00:00
// If the request is coming from an old term then reject it.
if req.Term < s.currentTerm {
2013-07-07 20:21:04 +00:00
s.debugln("server.rv.error: stale term")
2013-07-08 04:31:58 +00:00
return newRequestVoteResponse(s.currentTerm, false), false
2013-04-28 21:23:21 +00:00
}
2013-07-03 01:22:37 +00:00
s.setCurrentTerm(req.Term, "", false)
2013-04-28 21:23:21 +00:00
// If we've already voted for a different candidate then don't vote for this candidate.
if s.votedFor != "" && s.votedFor != req.CandidateName {
2013-07-08 04:31:58 +00:00
s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
" already vote for ", s.votedFor)
return newRequestVoteResponse(s.currentTerm, false), false
2013-04-28 21:23:21 +00:00
}
2013-07-07 20:21:04 +00:00
// If the candidate's log is not at least as up-to-date as our last log then don't vote.
2013-07-06 04:49:47 +00:00
lastIndex, lastTerm := s.log.lastInfo()
if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
2013-07-08 04:31:58 +00:00
s.debugln("server.rv.error: out of date log: ", req.CandidateName,
"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
2013-07-08 04:31:58 +00:00
return newRequestVoteResponse(s.currentTerm, false), false
2013-04-28 22:49:52 +00:00
}
2013-04-28 21:23:21 +00:00
2013-04-28 22:49:52 +00:00
// If we made it this far then cast a vote and reset our election time out.
2013-07-07 20:21:04 +00:00
s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
2013-04-28 22:49:52 +00:00
s.votedFor = req.CandidateName
2013-06-24 16:52:51 +00:00
2013-07-08 04:31:58 +00:00
return newRequestVoteResponse(s.currentTerm, true), true
}
2013-04-17 02:32:49 +00:00
//--------------------------------------
// Membership
//--------------------------------------
2013-07-09 02:55:00 +00:00
// Adds a peer to the server.
2013-08-15 23:35:01 +00:00
func (s *Server) AddPeer(name string, connectiongString string) error {
2013-07-09 02:55:00 +00:00
s.debugln("server.peer.add: ", name, len(s.peers))
2013-08-19 15:54:44 +00:00
2013-07-09 02:55:00 +00:00
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return nil
2013-04-17 02:32:49 +00:00
}
2013-04-28 21:23:21 +00:00
// Skip the Peer if it has the same name as the Server
2013-08-19 15:54:44 +00:00
if s.name != name {
peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
2013-08-19 15:54:44 +00:00
if s.State() == Leader {
peer.startHeartbeat()
}
2013-08-13 21:31:19 +00:00
2013-08-19 15:54:44 +00:00
s.peers[peer.Name] = peer
}
2013-08-13 21:31:19 +00:00
2013-08-19 15:54:44 +00:00
// Write the configuration to file.
s.writeConf()
2013-08-13 21:31:19 +00:00
return nil
2013-04-17 02:32:49 +00:00
}
2013-07-09 02:55:00 +00:00
// Removes a peer from the server.
func (s *Server) RemovePeer(name string) error {
2013-07-09 02:55:00 +00:00
s.debugln("server.peer.remove: ", name, len(s.peers))
2013-08-19 15:54:44 +00:00
// Skip the Peer if it has the same name as the Server
if name != s.Name() {
// Return error if peer doesn't exist.
peer := s.peers[name]
if peer == nil {
return fmt.Errorf("raft: Peer not found: %s", name)
}
2013-08-19 15:54:44 +00:00
// Stop peer and remove it.
if s.State() == Leader {
peer.stopHeartbeat(true)
}
2013-08-19 15:54:44 +00:00
delete(s.peers, name)
2013-08-13 21:31:19 +00:00
}
2013-07-23 22:30:14 +00:00
2013-08-19 15:54:44 +00:00
// Write the configuration to file.
s.writeConf()
return nil
}
2013-06-06 03:32:52 +00:00
2013-06-03 21:58:12 +00:00
//--------------------------------------
// Log compaction
//--------------------------------------
2013-08-10 17:53:39 +00:00
func (s *Server) TakeSnapshot() error {
2013-06-03 21:58:12 +00:00
//TODO put a snapshot mutex
2013-07-07 20:21:04 +00:00
s.debugln("take Snapshot")
2013-06-03 21:58:12 +00:00
if s.currentSnapshot != nil {
return errors.New("handling snapshot")
}
2013-06-05 05:56:59 +00:00
2013-07-06 04:49:47 +00:00
lastIndex, lastTerm := s.log.commitInfo()
2013-06-03 21:58:12 +00:00
2013-08-03 02:00:11 +00:00
if lastIndex == 0 {
2013-06-03 21:58:12 +00:00
return errors.New("No logs")
}
path := s.SnapshotPath(lastIndex, lastTerm)
2013-06-12 16:47:48 +00:00
var state []byte
var err error
2013-06-06 04:14:07 +00:00
2013-06-12 16:47:48 +00:00
if s.stateMachine != nil {
state, err = s.stateMachine.Save()
if err != nil {
return err
}
} else {
state = []byte{0}
2013-06-06 04:14:07 +00:00
}
2013-09-18 04:19:46 +00:00
peers := make([]*Peer, len(s.peers)+1)
2013-06-12 16:47:48 +00:00
2013-09-18 04:19:46 +00:00
i := 0
2013-06-12 16:47:48 +00:00
for _, peer := range s.peers {
2013-09-18 04:19:46 +00:00
peers[i] = peer.clone()
2013-09-22 05:15:54 +00:00
i++
2013-09-18 04:19:46 +00:00
}
peers[i] = &Peer{
Name: s.Name(),
ConnectionString: s.connectionString,
2013-06-06 04:14:07 +00:00
}
2013-08-15 23:35:01 +00:00
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
2013-06-03 21:58:12 +00:00
2013-06-05 00:02:45 +00:00
s.saveSnapshot()
2013-06-05 05:56:59 +00:00
2013-07-16 20:16:33 +00:00
// We keep some log entries after the snapshot
// We do not want to send the whole snapshot
// to the slightly slow machines
2013-07-18 23:44:01 +00:00
if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
2013-07-18 23:29:06 +00:00
compactTerm := s.log.getEntry(compactIndex).Term
2013-07-16 20:16:33 +00:00
s.log.compact(compactIndex, compactTerm)
}
2013-06-05 05:56:59 +00:00
2013-06-03 21:58:12 +00:00
return nil
}
// Retrieves the log path for the server.
2013-06-05 00:02:45 +00:00
func (s *Server) saveSnapshot() error {
2013-06-05 05:56:59 +00:00
2013-06-03 21:58:12 +00:00
if s.currentSnapshot == nil {
return errors.New("no snapshot to save")
}
2013-07-06 04:49:47 +00:00
err := s.currentSnapshot.save()
2013-06-03 21:58:12 +00:00
if err != nil {
return err
}
2013-06-08 02:19:18 +00:00
2013-06-03 21:58:12 +00:00
tmp := s.lastSnapshot
s.lastSnapshot = s.currentSnapshot
2013-06-05 05:56:59 +00:00
// delete the previous snapshot if there is any change
2013-06-12 16:47:48 +00:00
if tmp != nil && !(tmp.LastIndex == s.lastSnapshot.LastIndex && tmp.LastTerm == s.lastSnapshot.LastTerm) {
2013-07-06 04:49:47 +00:00
tmp.remove()
2013-06-05 00:02:45 +00:00
}
2013-06-03 21:58:12 +00:00
s.currentSnapshot = nil
return nil
}
2013-06-05 00:02:45 +00:00
// Retrieves the log path for the server.
func (s *Server) SnapshotPath(lastIndex uint64, lastTerm uint64) string {
return path.Join(s.path, "snapshot", fmt.Sprintf("%v_%v.ss", lastTerm, lastIndex))
2013-06-05 00:02:45 +00:00
}
2013-07-18 23:44:01 +00:00
func (s *Server) RequestSnapshot(req *SnapshotRequest) *SnapshotResponse {
2013-07-17 00:40:19 +00:00
ret, _ := s.send(req)
resp, _ := ret.(*SnapshotResponse)
return resp
}
func (s *Server) processSnapshotRequest(req *SnapshotRequest) *SnapshotResponse {
// If the followers log contains an entry at the snapshots last index with a term
// that matches the snapshots last term
// Then the follower already has all the information found in the snapshot
// and can reply false
2013-07-18 23:29:06 +00:00
entry := s.log.getEntry(req.LastIndex)
2013-07-17 00:40:19 +00:00
if entry != nil && entry.Term == req.LastTerm {
return newSnapshotResponse(false)
}
s.setState(Snapshotting)
return newSnapshotResponse(true)
}
func (s *Server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
ret, _ := s.send(req)
resp, _ := ret.(*SnapshotRecoveryResponse)
return resp
}
func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
2013-06-05 05:56:59 +00:00
2013-06-06 04:14:07 +00:00
s.stateMachine.Recovery(req.State)
2013-06-03 21:58:12 +00:00
2013-07-17 00:40:19 +00:00
// clear the peer map
s.peers = make(map[string]*Peer)
// recovery the cluster configuration
2013-08-15 23:35:01 +00:00
for _, peer := range req.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
2013-06-12 16:47:48 +00:00
}
2013-06-03 21:58:12 +00:00
//update term and index
2013-06-06 03:25:17 +00:00
s.currentTerm = req.LastTerm
2013-06-12 16:47:48 +00:00
2013-07-06 04:49:47 +00:00
s.log.updateCommitIndex(req.LastIndex)
2013-06-12 16:47:48 +00:00
2013-06-06 03:25:17 +00:00
snapshotPath := s.SnapshotPath(req.LastIndex, req.LastTerm)
2013-06-12 16:47:48 +00:00
s.currentSnapshot = &Snapshot{req.LastIndex, req.LastTerm, req.Peers, req.State, snapshotPath}
2013-06-24 16:52:51 +00:00
2013-06-08 02:19:18 +00:00
s.saveSnapshot()
2013-06-24 16:52:51 +00:00
2013-07-17 00:40:19 +00:00
// clear the previous log entries
2013-07-06 04:49:47 +00:00
s.log.compact(req.LastIndex, req.LastTerm)
2013-06-05 00:02:45 +00:00
2013-07-17 00:40:19 +00:00
return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
2013-06-03 21:58:12 +00:00
}
2013-06-05 05:56:59 +00:00
// Load a snapshot at restart
2013-06-03 21:58:12 +00:00
func (s *Server) LoadSnapshot() error {
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
2013-06-03 21:58:12 +00:00
if err != nil {
2013-06-24 16:52:51 +00:00
2013-06-12 16:47:48 +00:00
return err
2013-06-03 21:58:12 +00:00
}
filenames, err := dir.Readdirnames(-1)
if err != nil {
dir.Close()
panic(err)
}
dir.Close()
if len(filenames) == 0 {
return errors.New("no snapshot")
}
// not sure how many snapshot we should keep
sort.Strings(filenames)
2013-06-08 02:19:18 +00:00
snapshotPath := path.Join(s.path, "snapshot", filenames[len(filenames)-1])
2013-06-03 21:58:12 +00:00
2013-07-01 02:20:23 +00:00
// should not fail
2013-06-03 21:58:12 +00:00
file, err := os.OpenFile(snapshotPath, os.O_RDONLY, 0)
defer file.Close()
if err != nil {
panic(err)
}
2013-06-24 16:52:51 +00:00
// TODO check checksum first
2013-06-05 05:56:59 +00:00
2013-06-12 16:47:48 +00:00
var snapshotBytes []byte
2013-07-01 02:14:02 +00:00
var checksum uint32
2013-06-06 20:54:27 +00:00
2013-07-01 02:50:48 +00:00
n, err := fmt.Fscanf(file, "%08x\n", &checksum)
2013-06-03 21:58:12 +00:00
if err != nil {
2013-06-06 04:14:07 +00:00
return err
2013-06-03 21:58:12 +00:00
}
2013-06-12 16:47:48 +00:00
if n != 1 {
2013-06-06 04:14:07 +00:00
return errors.New("Bad snapshot file")
2013-06-03 21:58:12 +00:00
}
2013-06-12 16:47:48 +00:00
snapshotBytes, _ = ioutil.ReadAll(file)
2013-07-07 20:21:04 +00:00
s.debugln(string(snapshotBytes))
2013-06-12 16:47:48 +00:00
2013-07-01 00:55:54 +00:00
// Generate checksum.
byteChecksum := crc32.ChecksumIEEE(snapshotBytes)
if uint32(checksum) != byteChecksum {
2013-07-07 20:21:04 +00:00
s.debugln(checksum, " ", byteChecksum)
2013-07-01 00:55:54 +00:00
return errors.New("bad snapshot file")
}
2013-06-12 16:47:48 +00:00
err = json.Unmarshal(snapshotBytes, &s.lastSnapshot)
2013-06-06 20:54:27 +00:00
2013-06-06 04:14:07 +00:00
if err != nil {
2013-07-07 20:21:04 +00:00
s.debugln("unmarshal error: ", err)
2013-06-06 04:14:07 +00:00
return err
}
2013-06-12 16:47:48 +00:00
err = s.stateMachine.Recovery(s.lastSnapshot.State)
2013-07-01 00:55:54 +00:00
if err != nil {
2013-07-07 20:21:04 +00:00
s.debugln("recovery error: ", err)
2013-07-01 02:14:02 +00:00
return err
2013-07-01 00:55:54 +00:00
}
2013-08-15 23:35:01 +00:00
for _, peer := range s.lastSnapshot.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
2013-06-12 16:47:48 +00:00
}
2013-06-06 04:14:07 +00:00
2013-07-06 04:49:47 +00:00
s.log.startTerm = s.lastSnapshot.LastTerm
s.log.startIndex = s.lastSnapshot.LastIndex
s.log.updateCommitIndex(s.lastSnapshot.LastIndex)
2013-06-05 00:02:45 +00:00
2013-06-03 21:58:12 +00:00
return err
}
2013-07-07 20:21:04 +00:00
//--------------------------------------
// Config File
//--------------------------------------
func (s *Server) writeConf() {
2013-08-15 23:35:01 +00:00
peers := make([]*Peer, len(s.peers))
i := 0
2013-08-15 23:35:01 +00:00
for _, peer := range s.peers {
peers[i] = peer.clone()
i++
}
r := &Config{
CommitIndex: s.log.commitIndex,
Peers: peers,
}
b, _ := json.Marshal(r)
confPath := path.Join(s.path, "conf")
tmpConfPath := path.Join(s.path, "conf.tmp")
err := ioutil.WriteFile(tmpConfPath, b, 0600)
if err != nil {
panic(err)
}
os.Rename(tmpConfPath, confPath)
}
// Read the configuration for the server.
func (s *Server) readConf() error {
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
// open conf file
b, err := ioutil.ReadFile(confPath)
if err != nil {
return nil
}
conf := &Config{}
if err = json.Unmarshal(b, conf); err != nil {
return err
}
2013-09-18 04:19:46 +00:00
s.log.updateCommitIndex(conf.CommitIndex)
return nil
}
2013-07-07 20:21:04 +00:00
//--------------------------------------
// Debugging
//--------------------------------------
func (s *Server) debugln(v ...interface{}) {
2013-07-25 22:40:20 +00:00
debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...))
2013-07-07 20:21:04 +00:00
}
func (s *Server) traceln(v ...interface{}) {
tracef("[%s] %s", s.name, fmt.Sprintln(v...))
}