Log deserialization.

pull/820/head
Ben Johnson 2013-04-15 20:47:59 -06:00
parent 3841bb03a4
commit 2ff4241451
4 changed files with 119 additions and 33 deletions

22
log.go
View File

@ -1,6 +1,7 @@
package raft
import (
"bufio"
"errors"
"fmt"
"io"
@ -55,9 +56,10 @@ func (l *Log) NewCommand(name string) (Command, error) {
}
// Make a copy of the command.
copy, ok := reflect.New(reflect.ValueOf(command).Type()).Interface().(Command)
v := reflect.New(reflect.Indirect(reflect.ValueOf(command)).Type()).Interface()
copy, ok := v.(Command)
if !ok {
panic(fmt.Sprintf("raft.Log: Command type already exists: %s", command.Name()))
panic(fmt.Sprintf("raft.Log: Unable to copy command: %s (%v)", command.Name(), reflect.ValueOf(v).Kind().String()))
}
return copy, nil
}
@ -89,22 +91,26 @@ func (l *Log) Open(path string) error {
return err
}
defer file.Close()
reader := bufio.NewReader(file)
// Read the file and decode entries.
eof := false
for !eof {
for {
if _, err := reader.Peek(1); err == io.EOF {
break
}
// Instantiate log entry and decode into it.
entry := NewLogEntry(l, 0, 0, nil)
err := entry.Decode(l.file)
if err == io.EOF {
eof = true
} else if err != nil {
err := entry.Decode(reader)
if err != nil {
return err
}
// Append entry.
l.entries = append(l.entries, entry)
}
file.Close()
}
// Open the file for appending.

View File

@ -3,6 +3,7 @@ package raft
import (
"bufio"
"bytes"
"errors"
"hash/crc32"
"fmt"
"io"
@ -18,8 +19,8 @@ import (
// A log entry stores a single item in the log.
type LogEntry struct {
log *Log
term uint64
index uint64
term uint64
command Command
}
@ -30,11 +31,11 @@ type LogEntry struct {
//------------------------------------------------------------------------------
// Creates a new log entry associated with a log.
func NewLogEntry(log *Log, term uint64, index uint64, command Command) *LogEntry {
func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry {
return &LogEntry{
log: log,
term: term,
index: index,
term: term,
command: command,
}
}
@ -51,6 +52,10 @@ func NewLogEntry(log *Log, term uint64, index uint64, command Command) *LogEntry
// Encodes the log entry to a buffer.
func (e *LogEntry) Encode(w io.Writer) error {
if w == nil {
return errors.New("raft.LogEntry: Writer required to encode")
}
encodedCommand, err := json.Marshal(e.command)
if err != nil {
return err
@ -58,7 +63,7 @@ func (e *LogEntry) Encode(w io.Writer) error {
// Write log line to temporary buffer.
var b bytes.Buffer
if _, err = fmt.Fprintf(&b, "%08x %08x %s %s\n", e.term, e.index, e.command.Name(), encodedCommand); err != nil {
if _, err = fmt.Fprintf(&b, "%016x %016x %s %s\n", e.index, e.term, e.command.Name(), encodedCommand); err != nil {
return err
}
@ -66,52 +71,61 @@ func (e *LogEntry) Encode(w io.Writer) error {
checksum := crc32.ChecksumIEEE(b.Bytes())
// Write log entry with checksum.
_, err = fmt.Fprintf(w, "%04x %s", checksum, b.String())
_, err = fmt.Fprintf(w, "%08x %s", checksum, b.String())
return err
}
// Decodes the log entry from a buffer.
func (e *LogEntry) Decode(r io.Reader) error {
if r == nil {
return errors.New("raft.LogEntry: Reader required to decode")
}
// Read the expected checksum first.
var checksum uint32
fmt.Fscanf(r, "%04x ", &checksum)
if _, err := fmt.Fscanf(r, "%08x", &checksum); err != nil {
return fmt.Errorf("raft.LogEntry: Unable to read checksum: %v", err)
}
// Read the rest of the line.
line, err := bufio.NewReader(r).ReadString('\n')
bufr := bufio.NewReader(r)
if c, _ := bufr.ReadByte(); c != ' ' {
return fmt.Errorf("raft.LogEntry: Expected space, received %02x", c)
}
line, err := bufr.ReadString('\n')
if err == io.EOF {
return fmt.Errorf("raft.LogEntry: Unexpected EOF")
} else if err != nil {
return err
return fmt.Errorf("raft.LogEntry: Unable to read line: %v", err)
}
b := bytes.NewBufferString(line)
// Verify checksum.
bchecksum := crc32.ChecksumIEEE(b.Bytes())
if checksum != bchecksum {
return fmt.Errorf("Invalid checksum: Expected %04x, received %04x", checksum, bchecksum)
return fmt.Errorf("raft.LogEntry: Invalid checksum: Expected %08x, calculated %08x", checksum, bchecksum)
}
// Read term, index and command name.
var commandName string
if _, err := fmt.Fscanf(b, "%08x %08x %s ", &e.term, &e.index, commandName); err != nil {
return err
if _, err := fmt.Fscanf(b, "%016x %016x %s ", &e.index, &e.term, &commandName); err != nil {
return fmt.Errorf("raft.LogEntry: Unable to scan: %v", err)
}
// Instantiate command by name.
command, err := e.log.NewCommand(commandName)
if err != nil {
return err
return fmt.Errorf("raft.LogEntry: Unable to instantiate command (%s): %v", commandName, err)
}
// Deserialize command.
if err = json.NewDecoder(b).Decode(&command); err != nil {
return err
}
// Make sure there's only a newline and EOF remaining.
if c, _ := b.ReadByte(); c != '\n' {
return fmt.Errorf("raft.LogEntry: Expected newline, received %02x", c)
return fmt.Errorf("raft.LogEntry: Unable to decode: %v", err)
}
e.command = command
// Make sure there's only an EOF remaining.
if c, err := b.ReadByte(); err != io.EOF {
return fmt.Errorf("raft.LogEntry: Expected EOL, received %02x", c)
}

View File

@ -4,6 +4,7 @@ import (
"testing"
"io/ioutil"
"os"
"reflect"
)
//------------------------------------------------------------------------------
@ -12,13 +13,20 @@ import (
//
//------------------------------------------------------------------------------
func setupLogFile() string {
func getLogPath() string {
f, _ := ioutil.TempFile("", "raft-log-")
f.Close()
os.Remove(f.Name())
return f.Name()
}
func setupLog(content string) string {
f, _ := ioutil.TempFile("", "raft-log-")
f.Write([]byte(content))
f.Close()
return f.Name()
}
type TestCommand1 struct {
Val string `json:"val"`
I int `json:"i"`
@ -28,31 +36,78 @@ func (c TestCommand1) Name() string {
return "cmd_1"
}
type TestCommand2 struct {
X int `json:"x"`
}
func (c TestCommand2) Name() string {
return "cmd_2"
}
//------------------------------------------------------------------------------
//
// Tests
//
//------------------------------------------------------------------------------
// Ensure that we can encode log entries.
// Ensure that we can append to a new log.
func TestLogNewLog(t *testing.T) {
path := setupLogFile()
path := getLogPath()
log := NewLog()
log.AddCommandType(&TestCommand1{})
log.AddCommandType(&TestCommand2{})
if err := log.Open(path); err != nil {
t.Fatalf("Unable to open log: %v", err)
}
defer log.Close()
defer os.Remove(path)
err := log.Append(NewLogEntry(log, 1, 2, &TestCommand1{"foo", 20}))
if err != nil {
if err := log.Append(NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})); err != nil {
t.Fatalf("Unable to append: %v", err)
}
if err := log.Append(NewLogEntry(log, 2, 1, &TestCommand2{100})); err != nil {
t.Fatalf("Unable to append: %v", err)
}
if err := log.Append(NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})); err != nil {
t.Fatalf("Unable to append: %v", err)
}
expected := `a9f602d5 00000001 00000002 cmd_1 {"val":"foo","i":20}`+"\n"
expected :=
`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n" +
`4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}`+"\n" +
`6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n"
actual, _ := ioutil.ReadFile(path)
if string(actual) != expected {
t.Fatalf("Unexpected buffer:\nexp: %s\ngot: %s", expected, string(actual))
t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual))
}
}
// Ensure that we can decode and encode to an existing log.
func TestLogExistingLog(t *testing.T) {
path := setupLog(
`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n" +
`4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}`+"\n" +
`6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n")
log := NewLog()
log.AddCommandType(&TestCommand1{})
log.AddCommandType(&TestCommand2{})
if err := log.Open(path); err != nil {
t.Fatalf("Unable to open log: %v", err)
}
defer log.Close()
defer os.Remove(path)
// Validate existing log entries.
if len(log.entries) != 3 {
t.Fatalf("Expected 3 entries, got %d", len(log.entries))
}
if !reflect.DeepEqual(log.entries[0], NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})) {
t.Fatalf("Unexpected entry[0]: %v", log.entries[0])
}
if !reflect.DeepEqual(log.entries[1], NewLogEntry(log, 2, 1, &TestCommand2{100})) {
t.Fatalf("Unexpected entry[1]: %v", log.entries[1])
}
if !reflect.DeepEqual(log.entries[2], NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})) {
t.Fatalf("Unexpected entry[2]: %v", log.entries[2])
}
}

11
util.go Normal file
View File

@ -0,0 +1,11 @@
package raft
import (
"fmt"
"os"
)
// Writes to standard error.
func warn(msg string, v ...interface{}) {
fmt.Fprintf(os.Stderr, msg+"\n", v...)
}