influxdb/server.go

235 lines
5.4 KiB
Go
Raw Normal View History

2013-04-14 21:37:33 +00:00
package raft
2013-04-17 02:28:08 +00:00
import (
"errors"
"fmt"
"sync"
)
2013-04-14 21:37:33 +00:00
//------------------------------------------------------------------------------
//
// Constants
//
//------------------------------------------------------------------------------
// Values stored in Server.state and reported by Server.State(). A server is
// created in Stopped, Start moves it to Follower, and Join on its own name
// moves it to Leader.
const (
	Stopped   = "stopped"
	Follower  = "follower"
	Candidate = "candidate"
	Leader    = "leader"
)
// Default value for Server.ElectionTimeout.
// NOTE(review): the unit is not stated anywhere in this file (presumably
// milliseconds) -- confirm before relying on it.
const (
	DefaultElectionTimeout = 150
)
//------------------------------------------------------------------------------
//
// Typedefs
//
//------------------------------------------------------------------------------
// A server is involved in the consensus protocol and can act as a follower,
// candidate or a leader.
type Server struct {
2013-04-17 02:28:08 +00:00
name string
path string
2013-04-17 02:32:49 +00:00
state string
2013-04-14 21:37:33 +00:00
currentTerm int
votedFor int
2013-04-17 02:28:08 +00:00
log *Log
replicas []*Replica
mutex sync.Mutex
2013-04-17 02:32:49 +00:00
ElectionTimeout int
2013-04-14 21:37:33 +00:00
}
//--------------------------------------
// Replicas
//--------------------------------------
// A replica is a reference to another server involved in the consensus protocol.
//
// NOTE(review): none of these fields are read or written by the code in this
// file yet; the per-field meanings below are inferred from the names and
// should be confirmed.
type Replica struct {
	name           string // name of the remote server
	voteResponded  bool   // presumably: has it answered the current vote request
	voteGranted    bool   // presumably: did it grant its vote
	nextIndex      int    // presumably: next log index to send to it
	lastAgreeIndex int    // presumably: last log index known to match
}
//--------------------------------------
// Request Vote RPC
//--------------------------------------
// Payload a candidate sends to another server when asking for its vote in a
// leader election. The JSON tags define the names used when the request is
// encoded as JSON.
type RequestVoteRequest struct {
	Term         int `json:"term"`
	CandidateId  int `json:"candidateId"`
	LastLogIndex int `json:"lastLogIndex"`
	LastLogTerm  int `json:"lastLogTerm"`
}
// Reply a server gives to a RequestVoteRequest: the term it reports and
// whether it granted its vote to the candidate.
type RequestVoteResponse struct {
	Term        int  `json:"term"`
	VoteGranted bool `json:"voteGranted"`
}
//--------------------------------------
// Append Entries RPC
//--------------------------------------
// Payload sent to a server asking it to append the given entries to its log.
// The JSON tags define the names used when the request is encoded as JSON.
type AppendEntriesRequest struct {
	Term         int         `json:"term"`
	LeaderId     int         `json:"leaderId"`
	PrevLogIndex int         `json:"prevLogIndex"`
	PrevLogTerm  int         `json:"prevLogTerm"`
	Entries      []*LogEntry `json:"entries"`
	CommitIndex  int         `json:"commitIndex"`
}
// Reply a server gives to an AppendEntriesRequest: the term it reports and
// whether the append succeeded.
type AppendEntriesResponse struct {
	Term    int  `json:"term"`
	Success bool `json:"success"`
}
//------------------------------------------------------------------------------
//
// Constructor
//
//------------------------------------------------------------------------------
2013-04-17 02:28:08 +00:00
// Creates a new server with a log at the given path.
func NewServer(name string, path string) (*Server, error) {
if name == "" {
return nil, errors.New("raft.Server: Name cannot be blank")
}
s := &Server{
name: name,
path: path,
state: Stopped,
log: NewLog(),
2013-04-14 21:37:33 +00:00
}
2013-04-17 02:28:08 +00:00
return s, nil
}
//------------------------------------------------------------------------------
//
// Accessors
//
//------------------------------------------------------------------------------
// Returns the name this server was created with. The field is read without
// taking the server mutex.
func (s *Server) Name() string {
	return s.name
}
// Returns the storage path this server was created with. The field is read
// without taking the server mutex.
func (s *Server) Path() string {
	return s.path
}
// Returns the location of the server's log: the storage path followed by
// "/log". The path is joined verbatim, so an empty storage path yields "/log".
func (s *Server) LogPath() string {
	const logName = "log"
	return fmt.Sprintf("%s/%s", s.path, logName)
}
// Retrieves the current state of the server.
2013-04-17 02:32:49 +00:00
func (s *Server) State() string {
2013-04-17 02:28:08 +00:00
return s.state
2013-04-14 21:37:33 +00:00
}
//------------------------------------------------------------------------------
//
// Methods
//
//------------------------------------------------------------------------------
2013-04-17 02:28:08 +00:00
//--------------------------------------
// State
//--------------------------------------
// Starts the server with a log at the given path.
func (s *Server) Start() error {
s.mutex.Lock()
defer s.mutex.Unlock()
// Exit if the server is already running.
if s.Running() {
return errors.New("raft.Server: Server already running")
}
// Initialize the log and load it up.
if err := s.log.Open(s.LogPath()); err != nil {
s.unload()
return fmt.Errorf("raft.Server: %v", err)
}
2013-04-17 02:32:49 +00:00
// Update the state.
s.state = Follower
2013-04-17 02:28:08 +00:00
return nil
}
// Stops the server by unloading it while holding the server mutex. The mutex
// is released when the call returns.
func (s *Server) Stop() {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.unload()
}
// Unloads the server.
func (s *Server) unload() {
if s.log != nil {
s.log.Close()
s.log = nil
}
2013-04-17 02:32:49 +00:00
s.state = Stopped
2013-04-17 02:28:08 +00:00
}
// Reports whether the server is running, i.e. whether it is in any state
// other than Stopped. The state is read without taking the server mutex.
func (s *Server) Running() bool {
	stopped := s.state == Stopped
	return !stopped
}
//--------------------------------------
// Commands
//--------------------------------------
// Registers a command type with the server's log while holding the server
// mutex. The instance passed in will be copied and deserialized each time a
// new log entry is read. This function will panic if a command type with the
// same name already exists.
func (s *Server) AddCommandType(command Command) {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	target := s.log
	target.AddCommandType(command)
}
2013-04-17 02:32:49 +00:00
//--------------------------------------
// Membership
//--------------------------------------
// Attempts to gain membership through the server with the given name. The
// server mutex is held for the duration of the call.
//
// Returns an error if this server is stopped or already has replicas. When
// name is this server's own name the server promotes itself to Leader. For
// any other name the call currently changes nothing and returns nil.
func (s *Server) Join(name string) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Reject the request unless the server is running and has no members yet.
	switch {
	case !s.Running():
		return errors.New("raft.Server: Cannot join while stopped")
	case len(s.replicas) > 0:
		return errors.New("raft.Server: Cannot join; already in membership")
	}

	// If joining self then promote to leader.
	if name == s.name {
		s.state = Leader
	}
	return nil
}