Add log recovery.

pull/820/head
Ben Johnson 2013-04-15 22:02:08 -06:00
parent 2ff4241451
commit c6fd17a0b0
3 changed files with 87 additions and 18 deletions

12
log.go
View File

@ -84,6 +84,7 @@ func (l *Log) AddCommandType(command Command) {
// continue to append entries to the end of the log.
func (l *Log) Open(path string) error {
// Read all the entries from the log if one exists.
var lastIndex int = 0
if _, err := os.Stat(path); !os.IsNotExist(err) {
// Open the log file.
file, err := os.Open(path)
@ -101,10 +102,17 @@ func (l *Log) Open(path string) error {
// Instantiate log entry and decode into it.
entry := NewLogEntry(l, 0, 0, nil)
err := entry.Decode(reader)
n, err := entry.Decode(reader)
if err != nil {
return err
warn("raft.Log: %v", err)
warn("raft.Log: Recovering (%d)", lastIndex)
file.Close()
if err = os.Truncate(path, int64(lastIndex)); err != nil {
return fmt.Errorf("raft.Log: Unable to recover: %v", err)
}
break
}
lastIndex += n
// Append entry.
l.entries = append(l.entries, entry)

View File

@ -75,60 +75,77 @@ func (e *LogEntry) Encode(w io.Writer) error {
return err
}
// Decodes the log entry from a buffer.
func (e *LogEntry) Decode(r io.Reader) error {
// Decodes the log entry from a buffer. Returns the number of bytes read.
func (e *LogEntry) Decode(r io.Reader) (pos int, err error) {
pos = 0
if r == nil {
return errors.New("raft.LogEntry: Reader required to decode")
err = errors.New("raft.LogEntry: Reader required to decode")
return
}
// Read the expected checksum first.
var checksum uint32
if _, err := fmt.Fscanf(r, "%08x", &checksum); err != nil {
return fmt.Errorf("raft.LogEntry: Unable to read checksum: %v", err)
if _, err = fmt.Fscanf(r, "%08x", &checksum); err != nil {
err = fmt.Errorf("raft.LogEntry: Unable to read checksum: %v", err)
return
}
pos += 8
// Read the rest of the line.
bufr := bufio.NewReader(r)
if c, _ := bufr.ReadByte(); c != ' ' {
return fmt.Errorf("raft.LogEntry: Expected space, received %02x", c)
err = fmt.Errorf("raft.LogEntry: Expected space, received %02x", c)
return
}
pos += 1
line, err := bufr.ReadString('\n')
pos += len(line)
if err == io.EOF {
return fmt.Errorf("raft.LogEntry: Unexpected EOF")
err = fmt.Errorf("raft.LogEntry: Unexpected EOF")
return
} else if err != nil {
return fmt.Errorf("raft.LogEntry: Unable to read line: %v", err)
err = fmt.Errorf("raft.LogEntry: Unable to read line: %v", err)
return
}
b := bytes.NewBufferString(line)
// Verify checksum.
bchecksum := crc32.ChecksumIEEE(b.Bytes())
if checksum != bchecksum {
return fmt.Errorf("raft.LogEntry: Invalid checksum: Expected %08x, calculated %08x", checksum, bchecksum)
err = fmt.Errorf("raft.LogEntry: Invalid checksum: Expected %08x, calculated %08x", checksum, bchecksum)
return
}
// Read term, index and command name.
var commandName string
if _, err := fmt.Fscanf(b, "%016x %016x %s ", &e.index, &e.term, &commandName); err != nil {
return fmt.Errorf("raft.LogEntry: Unable to scan: %v", err)
if _, err = fmt.Fscanf(b, "%016x %016x %s ", &e.index, &e.term, &commandName); err != nil {
err = fmt.Errorf("raft.LogEntry: Unable to scan: %v", err)
return
}
// Instantiate command by name.
command, err := e.log.NewCommand(commandName)
if err != nil {
return fmt.Errorf("raft.LogEntry: Unable to instantiate command (%s): %v", commandName, err)
err = fmt.Errorf("raft.LogEntry: Unable to instantiate command (%s): %v", commandName, err)
return
}
// Deserialize command.
if err = json.NewDecoder(b).Decode(&command); err != nil {
return fmt.Errorf("raft.LogEntry: Unable to decode: %v", err)
err = fmt.Errorf("raft.LogEntry: Unable to decode: %v", err)
return
}
e.command = command
// Make sure there's only an EOF remaining.
if c, err := b.ReadByte(); err != io.EOF {
return fmt.Errorf("raft.LogEntry: Expected EOL, received %02x", c)
c, err := b.ReadByte()
if err != io.EOF {
err = fmt.Errorf("raft.LogEntry: Expected EOL, received %02x", c)
return
}
return nil
err = nil
return
}

View File

@ -111,3 +111,47 @@ func TestLogExistingLog(t *testing.T) {
t.Fatalf("Unexpected entry[2]: %v", log.entries[2])
}
}
// Ensure that we can recover from an incomplete/corrupt log and continue logging.
func TestLogRecovery(t *testing.T) {
path := setupLog(
`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n" +
`4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}`+"\n" +
`6ac5807c 0000000000000003 00000000000`)
log := NewLog()
log.AddCommandType(&TestCommand1{})
log.AddCommandType(&TestCommand2{})
if err := log.Open(path); err != nil {
t.Fatalf("Unable to open log: %v", err)
}
defer log.Close()
defer os.Remove(path)
if err := log.Append(NewLogEntry(log, 3, 2, &TestCommand1{"bat", -5})); err != nil {
t.Fatalf("Unable to append: %v", err)
}
// Validate existing log entries.
if len(log.entries) != 3 {
t.Fatalf("Expected 2 entries, got %d", len(log.entries))
}
if !reflect.DeepEqual(log.entries[0], NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})) {
t.Fatalf("Unexpected entry[0]: %v", log.entries[0])
}
if !reflect.DeepEqual(log.entries[1], NewLogEntry(log, 2, 1, &TestCommand2{100})) {
t.Fatalf("Unexpected entry[1]: %v", log.entries[1])
}
if !reflect.DeepEqual(log.entries[2], NewLogEntry(log, 3, 2, &TestCommand1{"bat", -5})) {
t.Fatalf("Unexpected entry[2]: %v", log.entries[2])
}
// Validate log contents.
expected :=
`cf4aab23 0000000000000001 0000000000000001 cmd_1 {"val":"foo","i":20}`+"\n" +
`4c08d91f 0000000000000002 0000000000000001 cmd_2 {"x":100}`+"\n" +
`3f3f884c 0000000000000003 0000000000000002 cmd_1 {"val":"bat","i":-5}`+"\n"
actual, _ := ioutil.ReadFile(path)
if string(actual) != expected {
t.Fatalf("Unexpected buffer:\nexp:\n%s\ngot:\n%s", expected, string(actual))
}
}