diff --git a/log.go b/log.go index dbc61189bc..01efc08a66 100644 --- a/log.go +++ b/log.go @@ -1,6 +1,7 @@ package raft import ( + "bufio" "errors" "fmt" "io" @@ -55,9 +56,10 @@ func (l *Log) NewCommand(name string) (Command, error) { } // Make a copy of the command. - copy, ok := reflect.New(reflect.ValueOf(command).Type()).Interface().(Command) + v := reflect.New(reflect.Indirect(reflect.ValueOf(command)).Type()).Interface() + copy, ok := v.(Command) if !ok { - panic(fmt.Sprintf("raft.Log: Command type already exists: %s", command.Name())) + panic(fmt.Sprintf("raft.Log: Unable to copy command: %s (%v)", command.Name(), reflect.ValueOf(v).Kind().String())) } return copy, nil } @@ -89,22 +91,26 @@ func (l *Log) Open(path string) error { return err } defer file.Close() + reader := bufio.NewReader(file) // Read the file and decode entries. - eof := false - for !eof { + for { + if _, err := reader.Peek(1); err == io.EOF { + break + } + // Instantiate log entry and decode into it. entry := NewLogEntry(l, 0, 0, nil) - err := entry.Decode(l.file) - if err == io.EOF { - eof = true - } else if err != nil { + err := entry.Decode(reader) + if err != nil { return err } // Append entry. l.entries = append(l.entries, entry) } + + file.Close() } // Open the file for appending. diff --git a/log_entry.go b/log_entry.go index 5e3c166801..bfd75d98d7 100644 --- a/log_entry.go +++ b/log_entry.go @@ -3,6 +3,7 @@ package raft import ( "bufio" "bytes" + "errors" "hash/crc32" "fmt" "io" @@ -18,8 +19,8 @@ import ( // A log entry stores a single item in the log. type LogEntry struct { log *Log - term uint64 index uint64 + term uint64 command Command } @@ -30,11 +31,11 @@ type LogEntry struct { //------------------------------------------------------------------------------ // Creates a new log entry associated with a log. -func NewLogEntry(log *Log, term uint64, index uint64, command Command) *LogEntry { +func NewLogEntry(log *Log, index uint64, term uint64, command Command) *LogEntry { return &LogEntry{ log: log, - term: term, index: index, + term: term, command: command, } } @@ -51,6 +52,10 @@ func NewLogEntry(log *Log, term uint64, index uint64, command Command) *LogEntry // Encodes the log entry to a buffer. func (e *LogEntry) Encode(w io.Writer) error { + if w == nil { + return errors.New("raft.LogEntry: Writer required to encode") + } + encodedCommand, err := json.Marshal(e.command) if err != nil { return err @@ -58,7 +63,7 @@ func (e *LogEntry) Encode(w io.Writer) error { // Write log line to temporary buffer. var b bytes.Buffer - if _, err = fmt.Fprintf(&b, "%08x %08x %s %s\n", e.term, e.index, e.command.Name(), encodedCommand); err != nil { + if _, err = fmt.Fprintf(&b, "%016x %016x %s %s\n", e.index, e.term, e.command.Name(), encodedCommand); err != nil { return err } @@ -66,52 +71,61 @@ func (e *LogEntry) Encode(w io.Writer) error { checksum := crc32.ChecksumIEEE(b.Bytes()) // Write log entry with checksum. - _, err = fmt.Fprintf(w, "%04x %s", checksum, b.String()) + _, err = fmt.Fprintf(w, "%08x %s", checksum, b.String()) return err } // Decodes the log entry from a buffer. func (e *LogEntry) Decode(r io.Reader) error { + if r == nil { + return errors.New("raft.LogEntry: Reader required to decode") + } + // Read the expected checksum first. var checksum uint32 - fmt.Fscanf(r, "%04x ", &checksum) + if _, err := fmt.Fscanf(r, "%08x", &checksum); err != nil { + return fmt.Errorf("raft.LogEntry: Unable to read checksum: %v", err) + } // Read the rest of the line. - line, err := bufio.NewReader(r).ReadString('\n') + bufr := bufio.NewReader(r) + if c, _ := bufr.ReadByte(); c != ' ' { + return fmt.Errorf("raft.LogEntry: Expected space, received %02x", c) + } + + line, err := bufr.ReadString('\n') if err == io.EOF { return fmt.Errorf("raft.LogEntry: Unexpected EOF") } else if err != nil { - return err + return fmt.Errorf("raft.LogEntry: Unable to read line: %v", err) } b := bytes.NewBufferString(line) // Verify checksum. bchecksum := crc32.ChecksumIEEE(b.Bytes()) if checksum != bchecksum { - return fmt.Errorf("Invalid checksum: Expected %04x, received %04x", checksum, bchecksum) + return fmt.Errorf("raft.LogEntry: Invalid checksum: Expected %08x, calculated %08x", checksum, bchecksum) } // Read term, index and command name. var commandName string - if _, err := fmt.Fscanf(b, "%08x %08x %s ", &e.term, &e.index, commandName); err != nil { - return err + if _, err := fmt.Fscanf(b, "%016x %016x %s ", &e.index, &e.term, &commandName); err != nil { + return fmt.Errorf("raft.LogEntry: Unable to scan: %v", err) } // Instantiate command by name. command, err := e.log.NewCommand(commandName) if err != nil { - return err + return fmt.Errorf("raft.LogEntry: Unable to instantiate command (%s): %v", commandName, err) } // Deserialize command. if err = json.NewDecoder(b).Decode(&command); err != nil { - return err - } - - // Make sure there's only a newline and EOF remaining. - if c, _ := b.ReadByte(); c != '\n' { - return fmt.Errorf("raft.LogEntry: Expected newline, received %02x", c) + return fmt.Errorf("raft.LogEntry: Unable to decode: %v", err) } + e.command = command + + // Make sure there's only an EOF remaining. if c, err := b.ReadByte(); err != io.EOF { return fmt.Errorf("raft.LogEntry: Expected EOL, received %02x", c) } diff --git a/log_test.go b/log_test.go index 3cd098ce8e..24a008cda8 100644 --- a/log_test.go +++ b/log_test.go @@ -4,6 +4,7 @@ import ( "testing" "io/ioutil" "os" + "reflect" ) //------------------------------------------------------------------------------ @@ -12,13 +13,20 @@ import ( // //------------------------------------------------------------------------------ -func setupLogFile() string { +func getLogPath() string { f, _ := ioutil.TempFile("", "raft-log-") f.Close() os.Remove(f.Name()) return f.Name() } +func setupLog(content string) string { + f, _ := ioutil.TempFile("", "raft-log-") + f.Write([]byte(content)) + f.Close() + return f.Name() +} + type TestCommand1 struct { Val string `json:"val"` I int `json:"i"` @@ -28,31 +36,78 @@ func (c TestCommand1) Name() string { return "cmd_1" } +type TestCommand2 struct { + X int `json:"x"` +} + +func (c TestCommand2) Name() string { + return "cmd_2" +} + //------------------------------------------------------------------------------ // // Tests // //------------------------------------------------------------------------------ -// Ensure that we can encode log entries. +// Ensure that we can append to a new log. func TestLogNewLog(t *testing.T) { - path := setupLogFile() + path := getLogPath() log := NewLog() log.AddCommandType(&TestCommand1{}) + log.AddCommandType(&TestCommand2{}) if err := log.Open(path); err != nil { t.Fatalf("Unable to open log: %v", err) } defer log.Close() defer os.Remove(path) - err := log.Append(NewLogEntry(log, 1, 2, &TestCommand1{"foo", 20})) - if err != nil { + if err := log.Append(NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})); err != nil { + t.Fatalf("Unable to append: %v", err) + } + if err := log.Append(NewLogEntry(log, 2, 1, &TestCommand2{100})); err != nil { + t.Fatalf("Unable to append: %v", err) + } + if err := log.Append(NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})); err != nil { t.Fatalf("Unable to append: %v", err) } - expected := `a9f602d5 00000001 00000002 cmd_1 {"val":"foo","i":20}`+"\n" + expected := + `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n" + + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}`+"\n" + + `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n" actual, _ := ioutil.ReadFile(path) if string(actual) != expected { - t.Fatalf("Unexpected buffer:\nexp: %s\ngot: %s", expected, string(actual)) + t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual)) + } +} + +// Ensure that we can decode and encode to an existing log. +func TestLogExistingLog(t *testing.T) { + path := setupLog( + `cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n" + + `4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}`+"\n" + + `6ac5807c 0000000000000003 0000000000000002 cmd_1 {"val":"bar","i":0}`+"\n") + log := NewLog() + log.AddCommandType(&TestCommand1{}) + log.AddCommandType(&TestCommand2{}) + if err := log.Open(path); err != nil { + t.Fatalf("Unable to open log: %v", err) + } + defer log.Close() + defer os.Remove(path) + + // Validate existing log entries. + if len(log.entries) != 3 { + t.Fatalf("Expected 3 entries, got %d", len(log.entries)) + } + if !reflect.DeepEqual(log.entries[0], NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})) { + t.Fatalf("Unexpected entry[0]: %v", log.entries[0]) + } + if !reflect.DeepEqual(log.entries[1], NewLogEntry(log, 2, 1, &TestCommand2{100})) { + t.Fatalf("Unexpected entry[1]: %v", log.entries[1]) + } + if !reflect.DeepEqual(log.entries[2], NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})) { + t.Fatalf("Unexpected entry[2]: %v", log.entries[2]) } } diff --git a/util.go b/util.go new file mode 100644 index 0000000000..92e30e4816 --- /dev/null +++ b/util.go @@ -0,0 +1,11 @@ +package raft + +import ( + "fmt" + "os" +) + +// Writes to standard error. +func warn(msg string, v ...interface{}) { + fmt.Fprintf(os.Stderr, msg+"\n", v...) +}