// influxdb/raft/log.go

package raft
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"time"
)
// FSM represents the state machine that the log is applied to.
// The FSM must maintain the highest index that it has seen.
type FSM interface {
// Executes a log entry against the state machine.
// Non-repeatable errors such as system and disk errors must panic.
MustApply(*LogEntry)
// Returns the highest index saved to the state machine.
Index() (uint64, error)
// Writes a snapshot of the entire state machine to a writer.
// Returns the index at the point in time of the snapshot.
Snapshot(w io.Writer) (index uint64, err error)
// Reads a snapshot of the entire state machine.
Restore(r io.Reader) error
}
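// A minimal FSM implementation might look like the following sketch. It is
// illustrative only: the type name indexFSM and its field are assumptions,
// not part of this package. It tracks just the highest applied index and
// snapshots that index as an 8-byte big-endian value.
//
//	type indexFSM struct{ index uint64 }
//
//	func (f *indexFSM) MustApply(e *LogEntry) { f.index = e.Index }
//
//	func (f *indexFSM) Index() (uint64, error) { return f.index, nil }
//
//	func (f *indexFSM) Snapshot(w io.Writer) (index uint64, err error) {
//		return f.index, binary.Write(w, binary.BigEndian, f.index)
//	}
//
//	func (f *indexFSM) Restore(r io.Reader) error {
//		return binary.Read(r, binary.BigEndian, &f.index)
//	}
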
const logEntryHeaderSize = 8 + 8 + 8 // sz+index+term
// WaitInterval represents the amount of time between checks to the applied index.
// This is used by clients wanting to wait until a given index is processed.
const WaitInterval = 1 * time.Millisecond
// State represents whether the log is a follower, candidate, or leader.
type State int
// String returns the string representation of the state.
func (s State) String() string {
switch s {
case Stopped:
return "stopped"
case Follower:
return "follower"
case Candidate:
return "candidate"
case Leader:
return "leader"
}
return "unknown"
}
const (
Stopped State = iota
Follower
Candidate
Leader
)
// Log represents a replicated log of commands.
type Log struct {
mu sync.Mutex
id uint64 // log identifier
path string // data directory
config *Config // cluster configuration
state State // current node state
heartbeats chan heartbeat // incoming heartbeat channel
lastLogTerm uint64 // highest term in the log
lastLogIndex uint64 // highest index in the log
term uint64 // current election term
leaderID uint64 // the current leader
votedFor uint64 // candidate voted for in current election term
lastContact time.Time // last contact from the leader
commitIndex uint64 // highest entry to be committed
appliedIndex uint64 // highest entry applied to the state machine
reader io.ReadCloser // incoming stream from leader
writers []*logWriter // outgoing streams to followers
entries []*LogEntry
wg sync.WaitGroup // pending goroutines
closing chan struct{} // close notification
// Network address used to reach the log.
URL *url.URL
// The state machine that log entries will be applied to.
FSM FSM
// The transport used to communicate with other nodes in the cluster.
Transport interface {
Join(u *url.URL, nodeURL *url.URL) (id uint64, leaderID uint64, config *Config, err error)
Leave(u *url.URL, id uint64) error
Heartbeat(u *url.URL, term, commitIndex, leaderID uint64) (lastIndex uint64, err error)
ReadFrom(u *url.URL, id, term, index uint64) (io.ReadCloser, error)
RequestVote(u *url.URL, term, candidateID, lastLogIndex, lastLogTerm uint64) error
}
// Clock is an abstraction of time.
Clock interface {
Now() time.Time
AfterApplyInterval() <-chan chan struct{}
AfterElectionTimeout() <-chan chan struct{}
AfterHeartbeatInterval() <-chan chan struct{}
AfterReconnectTimeout() <-chan chan struct{}
}
// Rand returns a random number.
Rand func() int64
// Sets whether trace messages are logged.
DebugEnabled bool
// Logger logs asynchronous errors that occur within the log.
Logger *log.Logger
}
// NewLog creates a new instance of Log with reasonable defaults.
func NewLog() *Log {
l := &Log{
Clock: NewClock(),
Transport: &HTTPTransport{},
Rand: rand.NewSource(time.Now().UnixNano()).Int63,
heartbeats: make(chan heartbeat, 1),
}
l.SetLogOutput(os.Stderr)
return l
}
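// Typical wiring before Open, as a sketch. The FSM value, URL, and data
// directory below are placeholders, not values this package prescribes;
// Transport already defaults to &HTTPTransport{} in NewLog.
//
//	l := NewLog()
//	l.FSM = &indexFSM{} // any FSM implementation
//	l.URL, _ = url.Parse("http://localhost:8080")
//	if err := l.Open("/tmp/raft"); err != nil {
//		log.Fatal(err)
//	}
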
// Path returns the data path of the Raft log.
// Returns an empty string if the log is closed.
func (l *Log) Path() string { return l.path }
func (l *Log) idPath() string { return filepath.Join(l.path, "id") }
func (l *Log) termPath() string { return filepath.Join(l.path, "term") }
func (l *Log) configPath() string { return filepath.Join(l.path, "config") }
// Opened returns true if the log is currently open.
func (l *Log) Opened() bool {
l.mu.Lock()
defer l.mu.Unlock()
return l.opened()
}
func (l *Log) opened() bool { return l.path != "" }
// ID returns the log's identifier.
func (l *Log) ID() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
return l.id
}
// State returns the current state.
func (l *Log) State() State {
l.mu.Lock()
defer l.mu.Unlock()
return l.state
}
// LastLogIndexTerm returns the last index & term from the log.
func (l *Log) LastLogIndexTerm() (index, term uint64) {
l.mu.Lock()
defer l.mu.Unlock()
return l.lastLogIndex, l.lastLogTerm
}
// CommitIndex returns the highest committed index.
func (l *Log) CommitIndex() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
return l.commitIndex
}
// AppliedIndex returns the highest applied index.
func (l *Log) AppliedIndex() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
return l.appliedIndex
}
// Term returns the current term.
func (l *Log) Term() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
return l.term
}
// Config returns the log's current configuration.
func (l *Log) Config() *Config {
l.mu.Lock()
defer l.mu.Unlock()
if l.config != nil {
return l.config.Clone()
}
return nil
}
// Open initializes the log from a path.
// If the path does not exist then it is created.
func (l *Log) Open(path string) error {
l.mu.Lock()
defer l.mu.Unlock()
// Validate initial log state.
if l.opened() {
return ErrOpen
}
// Create directory, if not exists.
if err := os.MkdirAll(path, 0700); err != nil {
return err
}
l.path = path
// Initialize log identifier.
id, err := l.readID()
if err != nil {
_ = l.close()
return err
}
l.setID(id)
// Initialize log term.
term, err := l.readTerm()
if err != nil {
_ = l.close()
return err
}
l.term = term
l.votedFor = 0
l.lastLogTerm = term
// Read config.
c, err := l.readConfig()
if err != nil {
_ = l.close()
return err
}
l.config = c
// Determine last applied index from FSM.
index, err := l.FSM.Index()
if err != nil {
return err
}
l.tracef("Open: fsm: index=%d", index)
l.lastLogIndex = index
l.appliedIndex = index
l.commitIndex = index
// Start goroutine to apply logs.
l.wg.Add(1)
l.closing = make(chan struct{})
go l.applier(l.closing)
// If a log exists then start the state loop.
if c != nil {
l.Logger.Printf("log open: created at %s, with ID %d, term %d, last applied index of %d",
path, l.id, l.term, l.lastLogIndex)
// If the config only has one node then start it as the leader.
// Otherwise start as a follower.
if len(c.Nodes) == 1 && c.Nodes[0].ID == l.id {
l.Logger.Println("log open: promoting to leader immediately")
l.startStateLoop(l.closing, Leader)
} else {
l.startStateLoop(l.closing, Follower)
}
} else {
l.Logger.Printf("log pending: waiting for initialization or join")
}
return nil
}
// Close closes the log.
func (l *Log) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
return l.close()
}
// close should be called under lock.
func (l *Log) close() error {
l.tracef("closing...")
// Close the reader, if open.
l.closeReader()
// Notify goroutines of closing and wait outside of lock.
if l.closing != nil {
close(l.closing)
l.closing = nil
l.mu.Unlock()
l.wg.Wait()
l.mu.Lock()
}
// Close the writers.
for _, w := range l.writers {
_ = w.Close()
}
l.writers = nil
// Clear log info.
l.setID(0)
l.path = ""
l.lastLogIndex, l.lastLogTerm = 0, 0
l.term, l.votedFor = 0, 0
l.config = nil
l.tracef("closed")
return nil
}
func (l *Log) closeReader() {
if l.reader != nil {
_ = l.reader.Close()
l.reader = nil
}
}
func (l *Log) setID(id uint64) {
l.id = id
l.updateLogPrefix()
}
// readID reads the log identifier from file.
func (l *Log) readID() (uint64, error) {
// Read identifier from disk.
b, err := ioutil.ReadFile(l.idPath())
if os.IsNotExist(err) {
return 0, nil
} else if err != nil {
return 0, err
}
// Parse identifier.
id, err := strconv.ParseUint(string(b), 10, 64)
if err != nil {
return 0, err
}
return id, nil
}
// writeID writes the log identifier to file.
func (l *Log) writeID(id uint64) error {
b := []byte(strconv.FormatUint(id, 10))
return ioutil.WriteFile(l.idPath(), b, 0600)
}
// readTerm reads the log term from file.
func (l *Log) readTerm() (uint64, error) {
// Read term from disk.
b, err := ioutil.ReadFile(l.termPath())
if os.IsNotExist(err) {
return 0, nil
} else if err != nil {
return 0, err
}
// Parse term.
id, err := strconv.ParseUint(string(b), 10, 64)
if err != nil {
return 0, err
}
return id, nil
}
// writeTerm writes the current log term to file.
func (l *Log) writeTerm(term uint64) error {
b := []byte(strconv.FormatUint(term, 10))
return ioutil.WriteFile(l.termPath(), b, 0600)
}
// readConfig reads the configuration from disk.
func (l *Log) readConfig() (*Config, error) {
// Read config from disk.
f, err := os.Open(l.configPath())
if err != nil && !os.IsNotExist(err) {
return nil, err
}
defer func() { _ = f.Close() }()
// Marshal file to a config type.
var config *Config
if f != nil {
config = &Config{}
if err := NewConfigDecoder(f).Decode(config); err != nil {
return nil, err
}
}
return config, nil
}
// writeConfig writes the configuration to disk.
func (l *Log) writeConfig(config *Config) error {
// FIX(benbjohnson): Atomic write.
// Open file.
f, err := os.Create(l.configPath())
if err != nil {
return err
}
defer func() { _ = f.Close() }()
// Marshal config into file.
if err := NewConfigEncoder(f).Encode(config); err != nil {
return err
}
return nil
}
// Initialize initializes a new log.
// Returns an error if log data already exists.
func (l *Log) Initialize() error {
var config *Config
err := func() error {
l.mu.Lock()
defer l.mu.Unlock()
// Return error if log is not open or is already a member of a cluster.
if !l.opened() {
return ErrClosed
} else if l.id != 0 {
return ErrInitialized
}
// Start first node at id 1.
id := uint64(1)
// Generate a new configuration with one node.
config = &Config{MaxNodeID: id}
config.AddNode(id, l.URL)
// Generate new 8-hex digit cluster identifier.
config.ClusterID = uint64(l.Rand())
// Generate log id.
if err := l.writeID(id); err != nil {
return err
}
l.setID(id)
// Automatically promote to leader.
term := uint64(1)
if err := l.writeTerm(term); err != nil {
return fmt.Errorf("write term: %s", err)
}
l.term = term
l.votedFor = 0
l.lastLogTerm = term
// Begin state loop as leader.
l.startStateLoop(l.closing, Leader)
l.Logger.Printf("log initialize: promoted to 'leader' with cluster ID %d, log ID %d, term %d",
config.ClusterID, l.id, l.term)
return nil
}()
if err != nil {
return err
}
// Set initial configuration.
var buf bytes.Buffer
_ = NewConfigEncoder(&buf).Encode(config)
index, err := l.internalApply(LogEntryInitialize, buf.Bytes())
if err != nil {
return err
}
// Wait until entry is applied.
return l.Wait(index)
}
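// Bootstrapping a brand-new single-node cluster is then a matter of calling
// Initialize on an opened, un-joined log (sketch; error handling is
// application-specific):
//
//	if err := l.Initialize(); err != nil {
//		log.Fatal(err)
//	}
//	// The log is now the leader of a one-node cluster (log ID 1, term 1).
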
// SetLogOutput sets the writer used for all Raft log output.
func (l *Log) SetLogOutput(w io.Writer) {
l.Logger = log.New(w, "", log.LstdFlags)
l.updateLogPrefix()
}
func (l *Log) updateLogPrefix() {
var host string
if l.URL != nil {
host = l.URL.Host
}
l.Logger.SetPrefix(fmt.Sprintf("[raft] %s ", host))
}
// trace writes a log message if DebugEnabled is true.
func (l *Log) trace(v ...interface{}) {
if l.DebugEnabled {
l.Logger.Print(v...)
}
}
// tracef writes a formatted log message if DebugEnabled is true.
func (l *Log) tracef(msg string, v ...interface{}) {
if l.DebugEnabled {
l.Logger.Printf(msg+"\n", v...)
}
}
// Leader returns the id and URL associated with the current leader.
// Returns zero and nil if there is no current leader.
func (l *Log) Leader() (id uint64, u *url.URL) {
l.mu.Lock()
defer l.mu.Unlock()
return l.leader()
}
func (l *Log) leader() (id uint64, u *url.URL) {
// Ignore if there's no configuration set.
if l.config == nil {
return
}
// Find node by identifier.
n := l.config.NodeByID(l.leaderID)
if n == nil {
return
}
return n.ID, n.URL
}
// Join contacts a node in the cluster to request membership.
// A log cannot join a cluster if it has already been initialized.
func (l *Log) Join(u *url.URL) error {
// Validate under lock.
var nodeURL *url.URL
if err := func() error {
l.mu.Lock()
defer l.mu.Unlock()
if !l.opened() {
return ErrClosed
} else if l.id != 0 {
return ErrInitialized
} else if l.URL == nil {
return ErrURLRequired
}
nodeURL = l.URL
return nil
}(); err != nil {
return err
}
l.tracef("Join: %s", u)
// Send join request.
id, leaderID, config, err := l.Transport.Join(u, nodeURL)
if err != nil {
return err
}
l.leaderID = leaderID
l.tracef("Join: confirmed")
// Lock once the join request is returned.
l.mu.Lock()
defer l.mu.Unlock()
// Write identifier.
if err := l.writeID(id); err != nil {
return err
}
l.setID(id)
// Write config.
if err := l.writeConfig(config); err != nil {
return err
}
l.config = config
// Begin state loop as follower.
l.startStateLoop(l.closing, Follower)
// Change to a follower state.
l.Logger.Println("log join: entered 'follower' state for cluster at", u, " with log ID", l.id)
return nil
}
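// A node that should join an existing cluster calls Join instead of
// Initialize, pointing at any current member. Sketch only; l2 stands for a
// second opened Log and the peer URL is a placeholder:
//
//	peer, _ := url.Parse("http://localhost:8080")
//	if err := l2.Join(peer); err != nil {
//		log.Fatal(err)
//	}
//	// l2 now runs as a follower and streams entries from the leader.
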
// Leave removes the log from cluster membership and removes the log data.
func (l *Log) Leave() error {
l.mu.Lock()
defer l.mu.Unlock()
// TODO(benbjohnson): Check if open.
// TODO(benbjohnson): Apply remove peer command.
// TODO(benbjohnson): Remove underlying data.
return nil
}
// startStateLoop begins the state loop in a separate goroutine.
// Returns once the state has transitioned to the initial state passed in.
func (l *Log) startStateLoop(closing <-chan struct{}, state State) {
l.wg.Add(1)
go l.stateLoop(closing, state)
// Wait until state change.
for {
if l.state == state {
break
}
runtime.Gosched()
}
}
// stateLoop runs in a separate goroutine and runs the appropriate state loop.
func (l *Log) stateLoop(closing <-chan struct{}, state State) {
defer l.wg.Done()
for {
// Transition to new state.
l.Logger.Printf("log state change: %s => %s", l.state, state)
l.state = state
// Remove previous reader, if one exists.
if l.reader != nil {
_ = l.reader.Close()
}
// Execute the appropriate state loop.
// Each loop returns the next state to transition to.
switch state {
case Stopped:
return
case Follower:
state = l.followerLoop(closing)
case Candidate:
state = l.candidateLoop(closing)
case Leader:
state = l.leaderLoop(closing)
}
}
}
func (l *Log) followerLoop(closing <-chan struct{}) State {
l.tracef("followerLoop")
defer l.tracef("followerLoop: exit")
// Ensure all follower goroutines complete before transitioning to another state.
var wg sync.WaitGroup
defer wg.Wait()
var transitioning = make(chan struct{})
defer close(transitioning)
// Read log from leader in a separate goroutine.
wg.Add(1)
go l.readFromLeader(&wg, transitioning)
for {
select {
case <-closing:
return Stopped
case ch := <-l.Clock.AfterElectionTimeout():
l.closeReader()
close(ch)
return Candidate
case hb := <-l.heartbeats:
l.tracef("followerLoop: heartbeat: term=%d, idx=%d", hb.term, hb.commitIndex)
// Update term, commit index & leader.
l.mu.Lock()
if hb.term > l.term {
l.term = hb.term
l.votedFor = 0
}
if hb.commitIndex > l.commitIndex {
l.commitIndex = hb.commitIndex
}
l.leaderID = hb.leaderID
l.mu.Unlock()
}
}
}
func (l *Log) readFromLeader(wg *sync.WaitGroup, transitioning <-chan struct{}) {
defer wg.Done()
l.tracef("readFromLeader:")
for {
select {
case <-transitioning:
l.tracef("readFromLeader: exiting")
return
default:
}
// Retrieve the term, last commit index, & leader URL.
l.mu.Lock()
id, commitIndex, term := l.id, l.commitIndex, l.term
_, u := l.leader()
l.mu.Unlock()
// If no leader exists then wait momentarily and retry.
if u == nil {
l.tracef("readFromLeader: no leader")
time.Sleep(100 * time.Millisecond)
continue
}
// Connect to leader.
l.tracef("readFromLeader: read from: %s, id=%d, term=%d, index=%d", u.String(), id, term, commitIndex)
r, err := l.Transport.ReadFrom(u, id, term, commitIndex)
if err != nil {
l.Logger.Printf("connect stream: %s", err)
}
// Attach the stream to the log.
if err := l.ReadFrom(r); err != nil {
l.tracef("readFromLeader: read from: disconnect: %s", err)
}
}
}
// truncate removes all uncommitted entries.
func (l *Log) truncate() {
if len(l.entries) == 0 {
return
}
entmin := l.entries[0].Index
assert(l.commitIndex >= entmin, "commit index before lowest entry: commit=%d, entmin=%d", l.commitIndex, entmin)
l.entries = l.entries[:l.commitIndex-entmin]
l.lastLogIndex = l.commitIndex
}
// candidateLoop requests votes from other nodes in an attempt to become leader.
func (l *Log) candidateLoop(closing <-chan struct{}) State {
l.tracef("candidateLoop")
defer l.tracef("candidateLoop: exit")
// TODO: prevote
// Increment term and request votes.
l.mu.Lock()
l.term++
l.votedFor = l.id
term := l.term
l.mu.Unlock()
// Ensure all candidate goroutines complete before transitioning to another state.
var wg sync.WaitGroup
defer wg.Wait()
var transitioning = make(chan struct{})
defer close(transitioning)
// Request votes from peers in a separate goroutine.
wg.Add(1)
elected := make(chan struct{}, 1)
go l.elect(term, elected, &wg, transitioning)
for {
select {
case <-closing:
return Stopped
case hb := <-l.heartbeats:
l.mu.Lock()
if hb.term >= l.term {
l.term = hb.term
l.votedFor = 0
l.leaderID = hb.leaderID
l.mu.Unlock()
return Follower
}
l.mu.Unlock()
case <-elected:
return Leader
case ch := <-l.Clock.AfterElectionTimeout():
close(ch)
return Follower
}
}
}
func (l *Log) elect(term uint64, elected chan struct{}, wg *sync.WaitGroup, transitioning <-chan struct{}) {
defer wg.Done()
// Ensure we are in the same term and copy properties.
l.mu.Lock()
if term != l.term {
l.mu.Unlock()
return
}
id, config := l.id, l.config
lastLogIndex, lastLogTerm := l.lastLogIndex, l.lastLogTerm
l.mu.Unlock()
// Request votes from peers.
votes := make(chan struct{}, len(config.Nodes))
for _, n := range config.Nodes {
if n.ID == id {
continue
}
go func(n *ConfigNode) {
if err := l.Transport.RequestVote(n.URL, term, id, lastLogIndex, lastLogTerm); err != nil {
l.tracef("sendVoteRequests: %s: %s", n.URL.String(), err)
return
}
votes <- struct{}{}
}(n)
}
// Wait until we have a quorum before responding.
voteN := 1
for {
// Signal channel that the log has been elected.
if voteN >= (len(config.Nodes)/2)+1 {
elected <- struct{}{}
return
}
// Wait until log transitions to another state or we receive a vote.
select {
case <-transitioning:
return
case <-votes:
voteN++
}
}
}
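// Quorum arithmetic used above, for concreteness: with len(config.Nodes) == 5
// the candidate needs (5/2)+1 = 3 votes. voteN starts at 1 for its own vote,
// so two successful RequestVote calls are enough to signal elected. With a
// lone node the quorum is 1 and the candidate wins immediately.
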
// leaderLoop periodically sends heartbeats to all followers to maintain dominance.
func (l *Log) leaderLoop(closing <-chan struct{}) State {
l.tracef("leaderLoop")
defer l.tracef("leaderLoop: exit")
// Ensure all leader goroutines complete before transitioning to another state.
var wg sync.WaitGroup
defer wg.Wait()
var transitioning = make(chan struct{})
defer close(transitioning)
// Retrieve leader's term.
l.mu.Lock()
term := l.term
l.mu.Unlock()
// Continually send heartbeats to followers and process their responses.
for {
// Send heartbeat to followers.
wg.Add(1)
committed := make(chan uint64, 1)
go l.heartbeater(term, committed, &wg, transitioning)
// Wait for close, new leader, or new heartbeat response.
select {
case <-closing: // wait for state change.
return Stopped
case hb := <-l.heartbeats: // step down on higher term
if hb.term > term {
l.mu.Lock()
l.truncate()
l.mu.Unlock()
return Follower
}
continue
case commitIndex, ok := <-committed:
// Quorum not reached, try again.
if !ok {
continue
}
// Quorum reached, set new commit index.
l.mu.Lock()
if commitIndex > l.commitIndex {
l.tracef("leaderLoop: committed: idx=%d", commitIndex)
l.commitIndex = commitIndex
}
l.mu.Unlock()
continue
}
}
}
// heartbeater continually sends heartbeats to all peers.
func (l *Log) heartbeater(term uint64, committed chan uint64, wg *sync.WaitGroup, transitioning <-chan struct{}) {
defer wg.Done()
// Ensure term is correct and retrieve current state.
l.mu.Lock()
if l.term != term {
l.mu.Unlock()
return
}
commitIndex, localIndex, leaderID, config := l.commitIndex, l.lastLogIndex, l.id, l.config
l.mu.Unlock()
// Commit latest index if there are no peers.
if config == nil || len(config.Nodes) <= 1 {
time.Sleep(10 * time.Millisecond)
committed <- localIndex
return
}
l.tracef("heartbeater: start: n=%d", len(config.Nodes))
// Send heartbeats to all peers.
peerIndices := make(chan uint64, len(config.Nodes))
for _, n := range config.Nodes {
if n.ID == leaderID {
continue
}
go func(n *ConfigNode) {
//l.tracef("heartbeater: send: url=%s, term=%d, commit=%d, leaderID=%d", n.URL, term, commitIndex, leaderID)
peerIndex, err := l.Transport.Heartbeat(n.URL, term, commitIndex, leaderID)
//l.tracef("heartbeater: recv: url=%s, peerIndex=%d, err=%s", n.URL, peerIndex, err)
if err != nil {
l.Logger.Printf("heartbeater: error: %s", err)
return
}
peerIndices <- peerIndex
}(n)
}
// Wait for heartbeat responses or timeout.
after := l.Clock.AfterHeartbeatInterval()
indexes := make([]uint64, 1, len(config.Nodes))
indexes[0] = localIndex
for {
select {
case <-transitioning:
l.tracef("heartbeater: transitioning")
return
case peerIndex := <-peerIndices:
l.tracef("heartbeater: index: idx=%d, idxs=%+v", peerIndex, indexes)
indexes = append(indexes, peerIndex) // collect responses
case ch := <-after:
// Once enough indices have been collected, determine the committed index.
quorumN := (len(config.Nodes) / 2) + 1
if len(indexes) >= quorumN {
// Return the highest index replicated on at least a quorum of nodes
// (the quorumN-th largest reported index).
sort.Sort(sort.Reverse(uint64Slice(indexes)))
committed <- indexes[quorumN-1]
l.tracef("heartbeater: commit: idx=%d, idxs=%+v", commitIndex, indexes)
} else {
l.tracef("heartbeater: no quorum: idxs=%+v", indexes)
close(committed)
}
close(ch)
return
}
}
}
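// Worked example of the commit calculation above: in a 5-node cluster
// (quorumN = 3), if the leader's local index is 10 and two peers respond
// with 8 and 7 before the heartbeat interval fires, indexes is [10 8 7].
// Sorted descending it is unchanged, and indexes[quorumN-1] = 7 is sent on
// committed, since at least three nodes hold entries through index 7.
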
type heartbeatResponse struct {
peerTerm uint64
peerIndex uint64
}
// check returns errDone if the done channel has been closed or has a
// pending message; otherwise it returns nil.
func check(done chan struct{}) error {
select {
case <-done:
return errDone
default:
return nil
}
}
// Apply executes a command against the log.
// This function returns once the command has been committed to the log.
func (l *Log) Apply(command []byte) (uint64, error) {
return l.internalApply(LogEntryCommand, command)
}
func (l *Log) internalApply(typ LogEntryType, command []byte) (index uint64, err error) {
l.mu.Lock()
defer l.mu.Unlock()
// Do not apply if this node is closed.
// Do not apply if this node is not the leader.
if l.state == Stopped {
return 0, ErrClosed
} else if l.state != Leader {
return 0, ErrNotLeader
}
// Create log entry.
e := LogEntry{
Type: typ,
Index: l.lastLogIndex + 1,
Term: l.term,
Data: command,
}
index = e.Index
// Append to the log.
l.append(&e)
// If there is no config or only one node then move commit index forward.
if l.config == nil || len(l.config.Nodes) <= 1 {
l.commitIndex = l.lastLogIndex
}
return
}
// Wait blocks until a given index is applied.
func (l *Log) Wait(index uint64) error {
// TODO(benbjohnson): Check for leadership change (?).
// TODO(benbjohnson): Add timeout.
for {
l.mu.Lock()
state, appliedIndex := l.state, l.appliedIndex
l.mu.Unlock()
if state == Stopped {
return ErrClosed
} else if appliedIndex >= index {
return nil
}
time.Sleep(WaitInterval)
}
}
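// Submitting a command and waiting for the FSM to see it, as a sketch; the
// command bytes are application-defined:
//
//	index, err := l.Apply([]byte("set x=1"))
//	if err != nil {
//		return err
//	}
//	if err := l.Wait(index); err != nil {
//		return err
//	}
//	// MustApply has now been called on the FSM for this entry.
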
// waitCommitted blocks until a given committed index is reached.
func (l *Log) waitCommitted(index uint64) error {
for {
l.mu.Lock()
state, committedIndex := l.state, l.commitIndex
l.mu.Unlock()
if state == Stopped {
return ErrClosed
} else if committedIndex >= index {
return nil
}
time.Sleep(WaitInterval)
}
}
// waitUncommitted blocks until a given uncommitted index is reached.
func (l *Log) waitUncommitted(index uint64) error {
for {
l.mu.Lock()
lastLogIndex := l.lastLogIndex
//l.tracef("waitUncommitted: %s / %d", l.state, l.lastLogIndex)
l.mu.Unlock()
if lastLogIndex >= index {
return nil
}
time.Sleep(WaitInterval)
}
}
// append adds a log entry to the list of entries.
func (l *Log) append(e *LogEntry) {
//l.tracef("append: idx=%d, prev=%d", e.Index, l.lastLogIndex)
assert(e.Index == l.lastLogIndex+1, "non-contiguous log index(%d): idx=%d, prev=%d", l.id, e.Index, l.lastLogIndex)
// Encode entry to a byte slice.
buf := make([]byte, logEntryHeaderSize+len(e.Data))
copy(buf, e.encodedHeader())
copy(buf[logEntryHeaderSize:], e.Data)
// Add to the pending entries list to await application.
l.entries = append(l.entries, e)
l.lastLogIndex = e.Index
l.lastLogTerm = e.Term
// Write to tailing writers.
for i := 0; i < len(l.writers); i++ {
w := l.writers[i]
// If an error occurs then remove the writer and close it.
if _, err := w.Write(buf); err != nil {
l.removeWriter(w)
i--
continue
}
// Flush, if possible.
flushWriter(w.Writer)
}
}
// applier runs in a separate goroutine and applies all entries between the
// previously applied index and the current commit index.
func (l *Log) applier(closing <-chan struct{}) {
defer l.wg.Done()
for {
// Wait for a close signal or timeout.
var confirm chan struct{}
select {
case <-closing:
return
case confirm = <-l.Clock.AfterApplyInterval():
}
//l.tracef("applier")
// Apply all entries committed since the previous apply.
err := func() error {
l.mu.Lock()
defer l.mu.Unlock()
// Verify, under lock, that we're not closing.
select {
case <-closing:
return nil
default:
}
// Ignore if there are no pending entries.
// Ignore if all entries are applied.
if len(l.entries) == 0 {
//l.tracef("applier: no entries")
return nil
} else if l.appliedIndex == l.commitIndex {
//l.tracef("applier: up to date")
return nil
}
// Determine the available range of indices on the log.
entmin, entmax := l.entries[0].Index, l.entries[len(l.entries)-1].Index
assert(entmin <= entmax, "apply: log out of order: min=%d, max=%d", entmin, entmax)
assert(uint64(len(l.entries)) == (entmax-entmin+1), "apply: missing entries: min=%d, max=%d, len=%d", entmin, entmax, len(l.entries))
// Determine the range of indices that should be processed.
// This should be the entry after the last applied index through to
// the committed index.
nextUnappliedIndex, commitIndex := l.appliedIndex+1, l.commitIndex
l.tracef("applier: entries: available=%d-%d, [next,commit]=%d-%d", entmin, entmax, nextUnappliedIndex, commitIndex)
assert(nextUnappliedIndex <= commitIndex, "next unapplied index after commit index: next=%d, commit=%d", nextUnappliedIndex, commitIndex)
// Determine the lowest index to start from.
// This should be the next entry after the last applied entry.
// Ignore if we don't have any entries after the last applied yet.
assert(entmin <= nextUnappliedIndex, "apply: missing entries: min=%d, next=%d", entmin, nextUnappliedIndex)
if nextUnappliedIndex > entmax {
return nil
}
imin := nextUnappliedIndex
// Determine the highest index to go to.
// This should be the committed index.
// If we haven't yet received the committed index then go to the last available.
var imax uint64
if commitIndex <= entmax {
imax = commitIndex
} else {
imax = entmax
}
// Determine entries to apply.
l.tracef("applier: entries: available=%d-%d, applying=%d-%d", entmin, entmax, imin, imax)
entries := l.entries[imin-entmin : imax-entmin+1]
// Determine low water mark for entries to cut off.
for _, w := range l.writers {
if w.snapshotIndex > 0 && w.snapshotIndex < imax {
imax = w.snapshotIndex
}
}
l.entries = l.entries[imax-entmin:]
// Iterate over each entry and apply it.
for _, e := range entries {
// l.tracef("applier: entry: idx=%d", e.Index)
switch e.Type {
case LogEntryCommand, LogEntryNop:
case LogEntryInitialize:
l.mustApplyInitialize(e)
case LogEntryAddPeer:
l.mustApplyAddPeer(e)
case LogEntryRemovePeer:
l.mustApplyRemovePeer(e)
default:
panic("unsupported command type: " + strconv.Itoa(int(e.Type)))
}
// Apply to FSM.
if e.Index > 0 {
l.FSM.MustApply(e)
}
// Increment applied index.
l.appliedIndex++
}
return nil
}()
// If error occurred then log it.
// The log will retry after a given timeout.
if err != nil {
l.Logger.Printf("apply error: %s", err)
// TODO(benbjohnson): Longer timeout before retry?
}
// Signal clock that apply is done.
close(confirm)
}
}
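// Worked example of the index window above, assuming no writer is holding a
// snapshotIndex that lowers imax: with pending entries covering indices 5-12
// (entmin=5, entmax=12), appliedIndex=7, and commitIndex=10, the next
// unapplied index is 8, so imin=8 and imax=10. entries[imin-entmin:imax-entmin+1]
// = entries[3:6] (indices 8, 9, 10) is applied, appliedIndex advances to 10,
// and l.entries is re-sliced to entries[imax-entmin:] so it starts at index 10.
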
// mustApplyInitialize applies a log initialization command by parsing and setting the configuration.
func (l *Log) mustApplyInitialize(e *LogEntry) {
// Parse the configuration from the log entry.
config := &Config{}
if err := NewConfigDecoder(bytes.NewReader(e.Data)).Decode(config); err != nil {
panic("decode: " + err.Error())
}
// Set the last update index on the configuration.
config.Index = e.Index
// TODO(benbjohnson): Lock the log while we update the configuration.
// Persist the configuration to disk.
if err := l.writeConfig(config); err != nil {
panic("write config: " + err.Error())
}
l.config = config
}
// mustApplyAddPeer adds a node to the cluster configuration.
func (l *Log) mustApplyAddPeer(e *LogEntry) {
// Unmarshal node from entry data.
var n *ConfigNode
if err := json.Unmarshal(e.Data, &n); err != nil {
panic("unmarshal: " + err.Error())
}
// Clone configuration.
config := l.config.Clone()
// Increment the node identifier.
config.MaxNodeID++
n.ID = config.MaxNodeID
// Add node to configuration.
if err := config.AddNode(n.ID, n.URL); err != nil {
l.Logger.Panicf("apply: add node: %s", err)
}
// Set configuration index.
config.Index = e.Index
// Write configuration.
if err := l.writeConfig(config); err != nil {
panic("write config: " + err.Error())
}
l.config = config
}
// mustApplyRemovePeer removes a node from the cluster configuration.
func (l *Log) mustApplyRemovePeer(e *LogEntry) error {
// TODO(benbjohnson): Clone configuration.
// TODO(benbjohnson): Remove node from configuration.
// TODO(benbjohnson): Set configuration index.
// TODO(benbjohnson): Write configuration.
return nil
}
// AddPeer creates a new peer in the cluster.
// Returns the new peer's identifier, the leader's identifier, and the current configuration.
func (l *Log) AddPeer(u *url.URL) (uint64, uint64, *Config, error) {
// Validate URL.
if u == nil {
return 0, 0, nil, fmt.Errorf("peer url required")
}
// Apply command.
b, _ := json.Marshal(&ConfigNode{URL: u})
index, err := l.internalApply(LogEntryAddPeer, b)
if err != nil {
return 0, 0, nil, err
}
if err := l.Wait(index); err != nil {
return 0, 0, nil, err
}
// Lock while we look up the node.
l.mu.Lock()
defer l.mu.Unlock()
// Look up node.
n := l.config.NodeByURL(u)
if n == nil {
return 0, 0, nil, fmt.Errorf("node not found")
}
return n.ID, l.leaderID, l.config.Clone(), nil
}
// RemovePeer removes an existing peer from the cluster by id.
func (l *Log) RemovePeer(id uint64) error {
l.mu.Lock()
defer l.mu.Unlock()
// TODO(benbjohnson): Apply removePeerCommand.
return nil
}
// Heartbeat establishes dominance by the current leader.
// Returns the highest written log entry index.
func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (currentIndex uint64, err error) {
l.mu.Lock()
defer l.mu.Unlock()
// Check if log is closed.
if !l.opened() || l.state == Stopped {
l.tracef("Heartbeat: closed")
return 0, ErrClosed
}
// Ignore if the incoming term is less than the log's term.
if term < l.term {
l.tracef("HB: stale term, ignore: %d < %d", term, l.term)
return l.lastLogIndex, ErrStaleTerm
}
// Send heartbeat to channel for the state loop to process.
select {
case l.heartbeats <- heartbeat{term: term, commitIndex: commitIndex, leaderID: leaderID}:
default:
}
l.tracef("HB: (term=%d, commit=%d, leaderID: %d) (index=%d, term=%d)", term, commitIndex, leaderID, l.lastLogIndex, l.term)
return l.lastLogIndex, nil
}
// RequestVote requests a vote from the log.
func (l *Log) RequestVote(term, candidateID, lastLogIndex, lastLogTerm uint64) (err error) {
l.mu.Lock()
defer l.mu.Unlock()
// Check if log is closed.
if !l.opened() {
return ErrClosed
}
defer func() {
l.tracef("RV(term=%d, candidateID=%d, lastLogIndex=%d, lastLogTerm=%d) (err=%v)", term, candidateID, lastLogIndex, lastLogTerm, err)
}()
// Deny vote if:
// 1. Candidate is requesting a vote from an earlier term. (§5.1)
// 2. Already voted for a different candidate in this term. (§5.2)
// 3. Candidate log is less up-to-date than local log. (§5.4)
if term < l.term {
return ErrStaleTerm
} else if term == l.term && l.votedFor != 0 && l.votedFor != candidateID {
return ErrAlreadyVoted
} else if lastLogTerm < l.lastLogTerm {
return ErrOutOfDateLog
} else if lastLogTerm == l.lastLogTerm && lastLogIndex < l.lastLogIndex {
return ErrOutOfDateLog
}
// Vote for candidate.
l.votedFor = candidateID
return nil
}
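// Example of the up-to-date check above, assuming the candidate's term is
// current and the voter has not already voted this term: a candidate whose
// log ends at (term=3, index=20) is refused by a voter whose log ends at
// (term=4, index=15) because lastLogTerm < l.lastLogTerm, but is granted a
// vote by a voter ending at (term=3, index=18) because the terms match and
// 20 >= 18.
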
// WriteEntriesTo attaches a writer to the log from a given index.
// The index specified must be a committed index.
func (l *Log) WriteEntriesTo(w io.Writer, id, term, index uint64) error {
// Validate and initialize the writer.
writer, err := l.initWriter(w, id, term, index)
if err != nil {
return err
}
// Write the snapshot and advance the writer through the log.
// If an error occurs then remove the writer.
if err := l.writeTo(writer, id, term, index); err != nil {
l.mu.Lock()
l.removeWriter(writer)
l.mu.Unlock()
return err
}
// Wait for writer to finish.
<-writer.done
return nil
}
func (l *Log) writeTo(writer *logWriter, id, term, index uint64) error {
// Extract the underlying writer.
w := writer.Writer
// Write snapshot marker byte.
if _, err := w.Write([]byte{logEntrySnapshot}); err != nil {
return err
}
// Begin streaming the snapshot.
snapshotIndex, err := l.FSM.Snapshot(w)
if err != nil {
return err
}
// Write snapshot index at the end and flush.
if err := binary.Write(w, binary.BigEndian, snapshotIndex); err != nil {
return fmt.Errorf("write snapshot index: %s", err)
}
flushWriter(w)
// Write entries since the snapshot occurred and begin tailing writer.
if err := l.advanceWriter(writer, snapshotIndex); err != nil {
return err
}
return nil
}
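// The stream produced for a follower by initWriter and writeTo is therefore:
//
//	config entry (logEntryConfig)
//	-> snapshot marker byte (logEntrySnapshot)
//	-> FSM snapshot bytes
//	-> 8-byte big-endian snapshot index
//	-> replayed and then tailed log entries
//
// which mirrors the branches handled on the receiving side in ReadFrom.
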
// initWriter validates the writer and adds it to the list of writers.
func (l *Log) initWriter(w io.Writer, id, term, index uint64) (*logWriter, error) {
l.mu.Lock()
defer l.mu.Unlock()
// Check if log is closed.
if !l.opened() {
return nil, ErrClosed
}
// Do not begin streaming if:
// 1. Node is not the leader.
// 2. The requested term is newer than this node's term (this leader is stale).
// 3. Index is beyond the last log index.
if l.state != Leader {
return nil, ErrNotLeader
} else if term > l.term {
return nil, ErrStaleTerm
} else if index > l.lastLogIndex {
return nil, ErrUncommittedIndex
}
// OPTIMIZE(benbjohnson): Create buffered output to prevent blocking.
// Write configuration.
var buf bytes.Buffer
err := NewConfigEncoder(&buf).Encode(l.config)
assert(err == nil, "marshal config error: %s", err)
enc := NewLogEntryEncoder(w)
if err := enc.Encode(&LogEntry{Type: logEntryConfig, Data: buf.Bytes()}); err != nil {
return nil, err
}
flushWriter(w)
// Wrap writer and append to log to tail.
writer := &logWriter{
Writer: w,
id: id,
snapshotIndex: l.appliedIndex,
done: make(chan struct{}),
}
l.writers = append(l.writers, writer)
return writer, nil
}
// advanceWriter replays entries since the snapshot's index and begins tailing the log.
func (l *Log) advanceWriter(writer *logWriter, snapshotIndex uint64) error {
l.mu.Lock()
defer l.mu.Unlock()
// Check if writer has been closed during snapshot.
select {
case <-writer.done:
return errors.New("writer closed during snapshot")
default:
}
// Determine the highest snapshot index. The writer's snapshot index can
// be higher if non-command entries have been applied.
if writer.snapshotIndex > snapshotIndex {
snapshotIndex = writer.snapshotIndex
}
snapshotIndex++
// Write pending entries.
if len(l.entries) > 0 {
startIndex := l.entries[0].Index
enc := NewLogEntryEncoder(writer.Writer)
for _, e := range l.entries[snapshotIndex-startIndex:] {
if err := enc.Encode(e); err != nil {
return err
}
}
}
// Flush data.
flushWriter(writer.Writer)
// Clear snapshot index on writer.
writer.snapshotIndex = 0
return nil
}
// removeWriter removes a writer from the list of log writers.
func (l *Log) removeWriter(writer *logWriter) {
l.tracef("removeWriter")
for i, w := range l.writers {
if w == writer {
copy(l.writers[i:], l.writers[i+1:])
l.writers[len(l.writers)-1] = nil
l.writers = l.writers[:len(l.writers)-1]
_ = w.Close()
break
}
}
return
}
// Flush pushes out buffered data for all open writers.
func (l *Log) Flush() {
l.mu.Lock()
defer l.mu.Unlock()
for _, w := range l.writers {
flushWriter(w.Writer)
}
}
// ReadFrom continually reads log entries from a reader.
func (l *Log) ReadFrom(r io.ReadCloser) error {
l.tracef("ReadFrom")
if err := l.initReadFrom(r); err != nil {
return err
}
// If a nil reader is passed in then exit.
if r == nil {
return nil
}
// Continually decode entries.
dec := NewLogEntryDecoder(r)
for {
// Decode single entry.
var e LogEntry
if err := dec.Decode(&e); err == io.EOF {
return nil
} else if err != nil {
return err
}
// If this is a config entry then update the config.
if e.Type == logEntryConfig {
l.tracef("ReadFrom: config")
config := &Config{}
if err := NewConfigDecoder(bytes.NewReader(e.Data)).Decode(config); err != nil {
return err
}
l.mu.Lock()
if err := l.writeConfig(config); err != nil {
l.mu.Unlock()
return err
}
l.config = config
l.mu.Unlock()
continue
}
// If this is a snapshot then load it.
if e.Type == logEntrySnapshot {
l.tracef("ReadFrom: snapshot")
if err := l.FSM.Restore(r); err != nil {
return err
}
l.tracef("ReadFrom: snapshot: restored")
// Read the snapshot index off the end of the snapshot.
var index uint64
if err := binary.Read(r, binary.BigEndian, &index); err != nil {
return fmt.Errorf("read snapshot index: %s", err)
}
l.tracef("ReadFrom: snapshot: index=%d", index)
// Update the indices & clear the entries.
l.mu.Lock()
l.lastLogIndex = index
l.commitIndex = index
l.appliedIndex = index
l.entries = nil
l.mu.Unlock()
continue
}
// Append entry to the log.
l.mu.Lock()
if l.state == Stopped {
l.mu.Unlock()
return nil
}
//l.tracef("ReadFrom: entry: index=%d / prev=%d / commit=%d", e.Index, l.lastLogIndex, l.commitIndex)
l.append(&e)
l.mu.Unlock()
}
}
// initReadFrom initializes the ReadFrom() call under lock and swaps out the reader.
func (l *Log) initReadFrom(r io.ReadCloser) error {
l.mu.Lock()
defer l.mu.Unlock()
// Check if log is closed.
if !l.opened() {
return ErrClosed
}
// Remove previous reader, if one exists.
l.closeReader()
// Set new reader.
l.reader = r
return nil
}
// heartbeat represents an incoming heartbeat.
type heartbeat struct {
term uint64
commitIndex uint64
leaderID uint64
}
// logWriter wraps writers to provide a channel for close notification.
type logWriter struct {
io.Writer
id uint64 // target's log id
snapshotIndex uint64 // snapshot index, if zero then ignored.
done chan struct{} // close notification
}
// Write writes bytes to the underlying writer.
// The write is ignored if the writer is currently snapshotting.
func (w *logWriter) Write(p []byte) (int, error) {
if w.snapshotIndex != 0 {
return 0, nil
}
return w.Writer.Write(p)
}
func (w *logWriter) Close() error {
w.snapshotIndex = 0
close(w.done)
return nil
}
// flushWriter flushes data for writers that implement http.Flusher.
func flushWriter(w io.Writer) {
if w, ok := w.(http.Flusher); ok {
w.Flush()
}
}
// LogEntryType serves as an internal marker for log entries.
// Non-command entry types are handled by the library itself.
type LogEntryType uint8
const (
LogEntryCommand LogEntryType = iota
LogEntryNop
LogEntryInitialize
LogEntryAddPeer
LogEntryRemovePeer
// Internal entry types.
logEntryConfig = 254
logEntrySnapshot = 255
)
// LogEntry represents a single command within the log.
type LogEntry struct {
Type LogEntryType
Index uint64
Term uint64
Data []byte
}
// encodedHeader returns the encoded header for the entry.
func (e *LogEntry) encodedHeader() []byte {
var b [logEntryHeaderSize]byte
binary.BigEndian.PutUint64(b[0:8], (uint64(e.Type)<<56)|uint64(len(e.Data)))
binary.BigEndian.PutUint64(b[8:16], e.Index)
binary.BigEndian.PutUint64(b[16:24], e.Term)
return b[:]
}
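// On the wire the 24-byte header therefore lays out as:
//
//	bytes  0-7 : big-endian uint64 holding the entry type in the top byte
//	             and len(Data) in the low 56 bits
//	bytes  8-15: Index (big-endian uint64)
//	bytes 16-23: Term (big-endian uint64)
//
// followed immediately by Data itself (see append).
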
type uint64Slice []uint64
func (p uint64Slice) Len() int { return len(p) }
func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
var errDone = errors.New("done")
func assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assert failed: "+msg, v...))
}
}
func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) }
func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }
func printstack() {
stack := strings.Join(strings.Split(string(debug.Stack()), "\n")[2:], "\n")
fmt.Fprintln(os.Stderr, stack)
}