Add single node simulation.

pull/903/head
Ben Johnson 2014-09-08 15:39:05 -06:00
parent 44a5f90acf
commit 7d4c3e9c6f
5 changed files with 228 additions and 77 deletions

12
raft/TODO.md Normal file
View File

@ -0,0 +1,12 @@
TODO
====
## Uncompleted
- [ ] Log initialization
## Completed
- [x] Encoding
- [x] Streaming

View File

@ -9,6 +9,12 @@ var (
// ErrAlreadyOpen is returned when opening a log that is already open.
ErrAlreadyOpen = errors.New("log already open")
// ErrURLRequired is returned when opening a log without a URL set.
ErrURLRequired = errors.New("url required")
// ErrLogExists is returned when initializing an already existing log.
ErrLogExists = errors.New("log exists")
// ErrNotLeader is returned performing leader operations on a non-leader.
ErrNotLeader = errors.New("not leader")

View File

@ -1,8 +1,10 @@
package raft
import (
"crypto/rand"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net/url"
"os"
@ -15,7 +17,7 @@ import (
// FSM represents the state machine that the log is applied to.
type FSM interface {
Apply([]*LogEntry)
Apply(*LogEntry) error
Snapshot(io.Writer) error
Restore(io.Reader) error
}
@ -35,7 +37,6 @@ const (
type Log struct {
mu sync.Mutex
id uint64 // unique log identifier
path string // data directory
state State // current node state
config *Config // cluster configuration
@ -56,6 +57,9 @@ type Log struct {
segment *segment // TODO(benbjohnson): support multiple segments
// The locator for the log. This matches the entry in the peer list.
URL *url.URL
// The state machine that log entries will be applied to.
FSM FSM
@ -73,6 +77,9 @@ type Log struct {
// Clock is an abstraction of the time package. By default it will use
// a real-time clock but a mock clock can be used for testing.
Clock clock.Clock
// Rand returns randomly generated bytes.
Rand func([]byte) (n int, err error)
}
// Path returns the data path of the Raft log.
@ -101,9 +108,11 @@ func (l *Log) Open(path string) error {
l.mu.Lock()
defer l.mu.Unlock()
// Do not allow an open log to be reopened.
// Validate initial log state.
if l.opened() {
return ErrAlreadyOpen
} else if l.URL == nil {
return ErrURLRequired
}
// Create directory, if not exists.
@ -112,10 +121,13 @@ func (l *Log) Open(path string) error {
}
l.path = path
// Use the realtime clock by default.
// Setup default clock & random source.
if l.Clock == nil {
l.Clock = clock.New()
}
if l.Rand == nil {
l.Rand = rand.Read
}
// Read config.
if err := l.restoreConfig(); err != nil {
@ -175,6 +187,85 @@ func (l *Log) restoreConfig() error {
return nil
}
// Initialize a new log.
// Returns an error if log data already exists.
func (l *Log) Initialize() error {
l.mu.Lock()
defer l.mu.Unlock()
// Return error if entries already exist.
if l.currentIndex > 0 {
return ErrLogExists
} else if l.URL == nil {
return ErrURLRequired
}
// Generate a new configuration with one node.
config := &Config{Peers: []*url.URL{l.URL}}
// Generate new 8-hex digit cluster identifier.
var clusterID [4]byte
_, _ = l.Rand(clusterID[:])
config.ClusterID = fmt.Sprintf("%08x", clusterID[:])
// Automatically promote to leader.
l.promoteToLeader()
// Set initial configuration.
b, _ := config.MarshalJSON()
if err := l.apply(LogEntryConfig, b); err != nil {
return err
}
return nil
}
// promoteTo moves the log to a leader state.
func (l *Log) promoteToLeader() {
assert(l.state == Candidate || (l.state == Follower && l.currentIndex == 0), "invalid prev state: %s / %d", l.state, l.currentIndex)
l.state = Leader
}
// Apply executes a command against the log.
// This function returns once the command has been committed to the log.
func (l *Log) Apply(command []byte) error {
l.mu.Lock()
defer l.mu.Unlock()
return l.apply(LogEntryCommand, command)
}
func (l *Log) apply(typ LogEntryType, command []byte) error {
// Do not apply if this node is not the leader.
if l.state != Leader {
return ErrNotLeader
}
// Create log entry.
l.currentIndex++
e := LogEntry{
Type: typ,
Index: l.currentIndex + 1,
Term: l.currentTerm,
Data: command,
}
// Append to the current log segment.
if err := l.segment.append(&e); err != nil {
return err
}
// TODO(benbjohnson): Wait for consensus.
// Apply to FSM.
if err := l.FSM.Apply(&e); err != nil {
return err
}
// TODO(benbjohnson): Add callback.
return nil
}
// Heartbeat establishes dominance by the current leader.
// Returns the current term and highest written log entry index.
func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (uint64, uint64, error) {
@ -346,9 +437,6 @@ func (l *Log) Elect() error {
func (l *Log) elect() error {
l.state = Candidate
// TODO(benbjohnson): Hold election.
// TEMP: Move to leader.
l.state = Leader
return nil
}
@ -531,22 +619,41 @@ type segmentWriter struct {
// Config represents the configuration for the log.
type Config struct {
// Cluster identifier. Used to prevent separate clusters from
// accidentally communicating with one another.
ClusterID string
// List of peers in the cluster.
Peers []*url.URL `json:"peers,omitempty"`
Peers []*url.URL
}
// configJSONMarshaler represents the JSON serialized form of the Config type.
type configJSONMarshaler struct {
ClusterID string `json:"clusterID"`
Peers []string `json:"peers"`
}
// MarshalJSON converts a Config into a JSON-formatted byte slice.
func (c *Config) MarshalJSON() ([]byte, error) {
var o configJSONMarshaler
o.ClusterID = c.ClusterID
for _, u := range c.Peers {
o.Peers = append(o.Peers, u.String())
}
return json.Marshal(&o)
}
// UnmarshalJSON parses a JSON-formatted byte slice into a Config instance.
func (c *Config) UnmarshalJSON(data []byte) error {
var o struct {
Peers []string `json:"peers"`
}
// Unmarshal into temporary type.
var o configJSONMarshaler
if err := json.Unmarshal(data, &o); err != nil {
return err
}
// Convert data to Config type.
// Convert values to config format.
c.ClusterID = o.ClusterID
for _, peer := range o.Peers {
u, err := url.Parse(peer)
if err != nil {
@ -558,16 +665,8 @@ func (c *Config) UnmarshalJSON(data []byte) error {
return nil
}
// MarshalJSON converts a Config into a JSON-formatted byte slice.
func (c *Config) MarshalJSON() ([]byte, error) {
var o struct {
Peers []string `json:"peers"`
func assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("asser failed: "+msg, v...))
}
// Convert to temporary type.
for _, u := range c.Peers {
o.Peers = append(o.Peers, u.String())
}
return json.Marshal(&o)
}

View File

@ -3,12 +3,12 @@ package raft_test
import (
"bytes"
"log"
"net/url"
"os"
"reflect"
"runtime"
"testing"
"testing/quick"
"time"
"github.com/benbjohnson/clock"
"github.com/influxdb/influxdb/raft"
@ -29,57 +29,6 @@ func TestLog_Open(t *testing.T) {
}
}
// Ensure that a log can read entries from a stream.
func TestLog_ReadWrite(t *testing.T) {
l := NewTestLog()
defer l.Close()
// Entries to write.
entries := []*raft.LogEntry{
&raft.LogEntry{Index: 1, Term: 1, Data: []byte{0}},
&raft.LogEntry{Index: 2, Term: 1, Data: []byte{1}},
}
// Create reader.
var w bytes.Buffer
enc := raft.NewLogEntryEncoder(&w)
for _, e := range entries {
if err := enc.Encode(e); err != nil {
t.Fatal("encode: ", err)
}
}
// Consume from the reader.
if err := l.ReadFrom(&BufferCloser{&w}); err != nil {
t.Fatal("read from: ", err)
}
// Force an election.
if err := l.Elect(); err != nil {
t.Fatal("elect: ", err)
}
// Read entries back out of the log.
var r bytes.Buffer
go func() {
if err := l.WriteTo(&BufferCloser{&r}, 0, 0); err != nil {
t.Fatal("write to: ", err)
}
}()
time.Sleep(10 * time.Millisecond) // HACK(benbjohnson)
// Verify entries.
dec := raft.NewLogEntryDecoder(&r)
for _, exp := range entries {
var e raft.LogEntry
if err := dec.Decode(&e); err != nil {
t.Fatal("decode(0): ", err)
} else if !reflect.DeepEqual(exp, &e) {
t.Fatalf("entry:\n\nexp: %#v\n\ngot: %#v\n\n", exp, &e)
}
}
}
// Ensure that log entries can be encoded to a writer.
func TestLogEntryEncoder_Encode(t *testing.T) {
var buf bytes.Buffer
@ -224,7 +173,10 @@ type TestLog struct {
func NewTestLog() *TestLog {
l := NewUnopenedTestLog()
if err := l.Open(tempfile()); err != nil {
log.Fatal("open: ", err)
log.Fatalf("open: %s", err)
}
if err := l.Initialize(); err != nil {
log.Fatalf("initialize: %s", err)
}
return l
}
@ -234,9 +186,12 @@ func NewTestLog() *TestLog {
func NewUnopenedTestLog() *TestLog {
l := &TestLog{
Log: &raft.Log{
FSM: &TestFSM{},
Clock: clock.NewMockClock(),
Rand: nopRand,
},
}
l.URL, _ = url.Parse("//node")
return l
}
@ -252,3 +207,6 @@ type BufferCloser struct {
}
func (b *BufferCloser) Close() error { return nil }
// nopRand implements the raft.Log.Rand interface but does nothing.
func nopRand(b []byte) (int, error) { return len(b), nil }

View File

@ -1,16 +1,92 @@
package raft_test
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net/url"
"os"
"testing"
"testing/quick"
"github.com/benbjohnson/clock"
"github.com/influxdb/influxdb/raft"
)
func init() {
log.SetFlags(0)
}
// Ensure that a single node can process commands applied to the log.
func Test_Simulate_SingleNode(t *testing.T) {
f := func(commands [][]byte) bool {
var fsm TestFSM
l := &raft.Log{
FSM: &fsm,
Clock: clock.NewMockClock(),
Rand: nopRand,
}
l.URL, _ = url.Parse("//node")
if err := l.Open(tempfile()); err != nil {
log.Fatal("open: ", err)
}
defer os.RemoveAll(l.Path())
defer l.Close()
// HACK(benbjohnson): Initialize instead.
if err := l.Initialize(); err != nil {
t.Fatalf("initialize: %s", err)
}
// Execute a series of commands.
for _, command := range commands {
if err := l.Apply(command); err != nil {
t.Fatalf("apply: %s", err)
}
}
// Verify the configuration is set.
if fsm.config != `{"clusterID":"00000000","peers":["//node"]}` {
t.Fatalf("unexpected config: %s", fsm.config)
}
// Verify the commands were executed against the FSM, in order.
for i, command := range commands {
if b := fsm.commands[i]; !bytes.Equal(command, b) {
t.Fatalf("%d. command:\n\nexp: %x\n\n got:%x\n\n", i, command, b)
}
}
return true
}
if err := quick.Check(f, nil); err != nil {
t.Error(err)
}
}
// TestFSM represents a fake state machine that simple records all commands.
type TestFSM struct {
config string
commands [][]byte
}
func (fsm *TestFSM) Apply(entry *raft.LogEntry) error {
switch entry.Type {
case raft.LogEntryCommand:
fsm.commands = append(fsm.commands, entry.Data)
case raft.LogEntryConfig:
fsm.config = string(entry.Data)
default:
panic("unknown entry type")
}
return nil
}
func (fsm *TestFSM) Snapshot(w io.Writer) error { return nil }
func (fsm *TestFSM) Restore(r io.Reader) error { return nil }
// tempfile returns the path to a non-existent file in the temp directory.
func tempfile() string {
f, _ := ioutil.TempFile("", "raft-")