From 7d4c3e9c6f477b75e4f6fb01223199944dd21485 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 8 Sep 2014 15:39:05 -0600 Subject: [PATCH] Add single node simulation. --- raft/TODO.md | 12 ++++ raft/errors.go | 6 ++ raft/log.go | 147 ++++++++++++++++++++++++++++++++++++++-------- raft/log_test.go | 64 ++++---------------- raft/raft_test.go | 76 ++++++++++++++++++++++++ 5 files changed, 228 insertions(+), 77 deletions(-) create mode 100644 raft/TODO.md diff --git a/raft/TODO.md b/raft/TODO.md new file mode 100644 index 0000000000..79bb2aaddd --- /dev/null +++ b/raft/TODO.md @@ -0,0 +1,12 @@ +TODO +==== + +## Uncompleted + +- [ ] Log initialization + + +## Completed + +- [x] Encoding +- [x] Streaming diff --git a/raft/errors.go b/raft/errors.go index 3222f0a04a..2d9e9f9198 100644 --- a/raft/errors.go +++ b/raft/errors.go @@ -9,6 +9,12 @@ var ( // ErrAlreadyOpen is returned when opening a log that is already open. ErrAlreadyOpen = errors.New("log already open") + // ErrURLRequired is returned when opening a log without a URL set. + ErrURLRequired = errors.New("url required") + + // ErrLogExists is returned when initializing an already existing log. + ErrLogExists = errors.New("log exists") + // ErrNotLeader is returned performing leader operations on a non-leader. ErrNotLeader = errors.New("not leader") diff --git a/raft/log.go b/raft/log.go index f35717fa7e..50e065af36 100644 --- a/raft/log.go +++ b/raft/log.go @@ -1,8 +1,10 @@ package raft import ( + "crypto/rand" "encoding/binary" "encoding/json" + "fmt" "io" "net/url" "os" @@ -15,7 +17,7 @@ import ( // FSM represents the state machine that the log is applied to. type FSM interface { - Apply([]*LogEntry) + Apply(*LogEntry) error Snapshot(io.Writer) error Restore(io.Reader) error } @@ -35,7 +37,6 @@ const ( type Log struct { mu sync.Mutex - id uint64 // unique log identifier path string // data directory state State // current node state config *Config // cluster configuration @@ -56,6 +57,9 @@ type Log struct { segment *segment // TODO(benbjohnson): support multiple segments + // The locator for the log. This matches the entry in the peer list. + URL *url.URL + // The state machine that log entries will be applied to. FSM FSM @@ -73,6 +77,9 @@ type Log struct { // Clock is an abstraction of the time package. By default it will use // a real-time clock but a mock clock can be used for testing. Clock clock.Clock + + // Rand returns randomly generated bytes. + Rand func([]byte) (n int, err error) } // Path returns the data path of the Raft log. @@ -101,9 +108,11 @@ func (l *Log) Open(path string) error { l.mu.Lock() defer l.mu.Unlock() - // Do not allow an open log to be reopened. + // Validate initial log state. if l.opened() { return ErrAlreadyOpen + } else if l.URL == nil { + return ErrURLRequired } // Create directory, if not exists. @@ -112,10 +121,13 @@ func (l *Log) Open(path string) error { } l.path = path - // Use the realtime clock by default. + // Setup default clock & random source. if l.Clock == nil { l.Clock = clock.New() } + if l.Rand == nil { + l.Rand = rand.Read + } // Read config. if err := l.restoreConfig(); err != nil { @@ -175,6 +187,85 @@ func (l *Log) restoreConfig() error { return nil } +// Initialize a new log. +// Returns an error if log data already exists. +func (l *Log) Initialize() error { + l.mu.Lock() + defer l.mu.Unlock() + + // Return error if entries already exist. + if l.currentIndex > 0 { + return ErrLogExists + } else if l.URL == nil { + return ErrURLRequired + } + + // Generate a new configuration with one node. + config := &Config{Peers: []*url.URL{l.URL}} + + // Generate new 8-hex digit cluster identifier. + var clusterID [4]byte + _, _ = l.Rand(clusterID[:]) + config.ClusterID = fmt.Sprintf("%08x", clusterID[:]) + + // Automatically promote to leader. + l.promoteToLeader() + + // Set initial configuration. + b, _ := config.MarshalJSON() + if err := l.apply(LogEntryConfig, b); err != nil { + return err + } + + return nil +} + +// promoteTo moves the log to a leader state. +func (l *Log) promoteToLeader() { + assert(l.state == Candidate || (l.state == Follower && l.currentIndex == 0), "invalid prev state: %s / %d", l.state, l.currentIndex) + l.state = Leader +} + +// Apply executes a command against the log. +// This function returns once the command has been committed to the log. +func (l *Log) Apply(command []byte) error { + l.mu.Lock() + defer l.mu.Unlock() + return l.apply(LogEntryCommand, command) +} + +func (l *Log) apply(typ LogEntryType, command []byte) error { + // Do not apply if this node is not the leader. + if l.state != Leader { + return ErrNotLeader + } + + // Create log entry. + l.currentIndex++ + e := LogEntry{ + Type: typ, + Index: l.currentIndex + 1, + Term: l.currentTerm, + Data: command, + } + + // Append to the current log segment. + if err := l.segment.append(&e); err != nil { + return err + } + + // TODO(benbjohnson): Wait for consensus. + + // Apply to FSM. + if err := l.FSM.Apply(&e); err != nil { + return err + } + + // TODO(benbjohnson): Add callback. + + return nil +} + // Heartbeat establishes dominance by the current leader. // Returns the current term and highest written log entry index. func (l *Log) Heartbeat(term, commitIndex, leaderID uint64) (uint64, uint64, error) { @@ -346,9 +437,6 @@ func (l *Log) Elect() error { func (l *Log) elect() error { l.state = Candidate // TODO(benbjohnson): Hold election. - - // TEMP: Move to leader. - l.state = Leader return nil } @@ -531,22 +619,41 @@ type segmentWriter struct { // Config represents the configuration for the log. type Config struct { + // Cluster identifier. Used to prevent separate clusters from + // accidentally communicating with one another. + ClusterID string + // List of peers in the cluster. - Peers []*url.URL `json:"peers,omitempty"` + Peers []*url.URL +} + +// configJSONMarshaler represents the JSON serialized form of the Config type. +type configJSONMarshaler struct { + ClusterID string `json:"clusterID"` + Peers []string `json:"peers"` +} + +// MarshalJSON converts a Config into a JSON-formatted byte slice. +func (c *Config) MarshalJSON() ([]byte, error) { + var o configJSONMarshaler + o.ClusterID = c.ClusterID + for _, u := range c.Peers { + o.Peers = append(o.Peers, u.String()) + } + + return json.Marshal(&o) } // UnmarshalJSON parses a JSON-formatted byte slice into a Config instance. func (c *Config) UnmarshalJSON(data []byte) error { - var o struct { - Peers []string `json:"peers"` - } - // Unmarshal into temporary type. + var o configJSONMarshaler if err := json.Unmarshal(data, &o); err != nil { return err } - // Convert data to Config type. + // Convert values to config format. + c.ClusterID = o.ClusterID for _, peer := range o.Peers { u, err := url.Parse(peer) if err != nil { @@ -558,16 +665,8 @@ func (c *Config) UnmarshalJSON(data []byte) error { return nil } -// MarshalJSON converts a Config into a JSON-formatted byte slice. -func (c *Config) MarshalJSON() ([]byte, error) { - var o struct { - Peers []string `json:"peers"` +func assert(condition bool, msg string, v ...interface{}) { + if !condition { + panic(fmt.Sprintf("asser failed: "+msg, v...)) } - - // Convert to temporary type. - for _, u := range c.Peers { - o.Peers = append(o.Peers, u.String()) - } - - return json.Marshal(&o) } diff --git a/raft/log_test.go b/raft/log_test.go index 2ec55fe4d4..c111685118 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -3,12 +3,12 @@ package raft_test import ( "bytes" "log" + "net/url" "os" "reflect" "runtime" "testing" "testing/quick" - "time" "github.com/benbjohnson/clock" "github.com/influxdb/influxdb/raft" @@ -29,57 +29,6 @@ func TestLog_Open(t *testing.T) { } } -// Ensure that a log can read entries from a stream. -func TestLog_ReadWrite(t *testing.T) { - l := NewTestLog() - defer l.Close() - - // Entries to write. - entries := []*raft.LogEntry{ - &raft.LogEntry{Index: 1, Term: 1, Data: []byte{0}}, - &raft.LogEntry{Index: 2, Term: 1, Data: []byte{1}}, - } - - // Create reader. - var w bytes.Buffer - enc := raft.NewLogEntryEncoder(&w) - for _, e := range entries { - if err := enc.Encode(e); err != nil { - t.Fatal("encode: ", err) - } - } - - // Consume from the reader. - if err := l.ReadFrom(&BufferCloser{&w}); err != nil { - t.Fatal("read from: ", err) - } - - // Force an election. - if err := l.Elect(); err != nil { - t.Fatal("elect: ", err) - } - - // Read entries back out of the log. - var r bytes.Buffer - go func() { - if err := l.WriteTo(&BufferCloser{&r}, 0, 0); err != nil { - t.Fatal("write to: ", err) - } - }() - time.Sleep(10 * time.Millisecond) // HACK(benbjohnson) - - // Verify entries. - dec := raft.NewLogEntryDecoder(&r) - for _, exp := range entries { - var e raft.LogEntry - if err := dec.Decode(&e); err != nil { - t.Fatal("decode(0): ", err) - } else if !reflect.DeepEqual(exp, &e) { - t.Fatalf("entry:\n\nexp: %#v\n\ngot: %#v\n\n", exp, &e) - } - } -} - // Ensure that log entries can be encoded to a writer. func TestLogEntryEncoder_Encode(t *testing.T) { var buf bytes.Buffer @@ -224,7 +173,10 @@ type TestLog struct { func NewTestLog() *TestLog { l := NewUnopenedTestLog() if err := l.Open(tempfile()); err != nil { - log.Fatal("open: ", err) + log.Fatalf("open: %s", err) + } + if err := l.Initialize(); err != nil { + log.Fatalf("initialize: %s", err) } return l } @@ -234,9 +186,12 @@ func NewTestLog() *TestLog { func NewUnopenedTestLog() *TestLog { l := &TestLog{ Log: &raft.Log{ + FSM: &TestFSM{}, Clock: clock.NewMockClock(), + Rand: nopRand, }, } + l.URL, _ = url.Parse("//node") return l } @@ -252,3 +207,6 @@ type BufferCloser struct { } func (b *BufferCloser) Close() error { return nil } + +// nopRand implements the raft.Log.Rand interface but does nothing. +func nopRand(b []byte) (int, error) { return len(b), nil } diff --git a/raft/raft_test.go b/raft/raft_test.go index c89e5e8d03..dd68ce2b57 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -1,16 +1,92 @@ package raft_test import ( + "bytes" "fmt" + "io" "io/ioutil" "log" + "net/url" "os" + "testing" + "testing/quick" + + "github.com/benbjohnson/clock" + "github.com/influxdb/influxdb/raft" ) func init() { log.SetFlags(0) } +// Ensure that a single node can process commands applied to the log. +func Test_Simulate_SingleNode(t *testing.T) { + f := func(commands [][]byte) bool { + var fsm TestFSM + l := &raft.Log{ + FSM: &fsm, + Clock: clock.NewMockClock(), + Rand: nopRand, + } + l.URL, _ = url.Parse("//node") + if err := l.Open(tempfile()); err != nil { + log.Fatal("open: ", err) + } + defer os.RemoveAll(l.Path()) + defer l.Close() + + // HACK(benbjohnson): Initialize instead. + if err := l.Initialize(); err != nil { + t.Fatalf("initialize: %s", err) + } + + // Execute a series of commands. + for _, command := range commands { + if err := l.Apply(command); err != nil { + t.Fatalf("apply: %s", err) + } + } + + // Verify the configuration is set. + if fsm.config != `{"clusterID":"00000000","peers":["//node"]}` { + t.Fatalf("unexpected config: %s", fsm.config) + } + + // Verify the commands were executed against the FSM, in order. + for i, command := range commands { + if b := fsm.commands[i]; !bytes.Equal(command, b) { + t.Fatalf("%d. command:\n\nexp: %x\n\n got:%x\n\n", i, command, b) + } + } + + return true + } + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +} + +// TestFSM represents a fake state machine that simple records all commands. +type TestFSM struct { + config string + commands [][]byte +} + +func (fsm *TestFSM) Apply(entry *raft.LogEntry) error { + switch entry.Type { + case raft.LogEntryCommand: + fsm.commands = append(fsm.commands, entry.Data) + case raft.LogEntryConfig: + fsm.config = string(entry.Data) + default: + panic("unknown entry type") + } + return nil +} + +func (fsm *TestFSM) Snapshot(w io.Writer) error { return nil } +func (fsm *TestFSM) Restore(r io.Reader) error { return nil } + // tempfile returns the path to a non-existent file in the temp directory. func tempfile() string { f, _ := ioutil.TempFile("", "raft-")