Add log truncation and AppendEntries overwrite.

pull/820/head
Ben Johnson 2013-04-30 22:44:16 -06:00
parent 7106fe616e
commit c8b9d783b7
5 changed files with 126 additions and 8 deletions

39
log.go
View File

@ -272,6 +272,45 @@ func (l *Log) SetCommitIndex(index uint64) error {
return nil
}
//--------------------------------------
// Truncation
//--------------------------------------
// Truncates the log to the given index and term. This only works if the log
// at the index has not been committed.
func (l *Log) Truncate(index uint64, term uint64) error {
l.mutex.Lock()
defer l.mutex.Unlock()
// Do not allow committed entries to be truncated.
if index < l.CommitIndex() {
return fmt.Errorf("raft.Log: Index is already committed (%v): (IDX=%v, TERM=%v)", l.CommitIndex(), index, term)
}
// Do not truncate past end of entries.
if index > uint64(len(l.entries)) {
return fmt.Errorf("raft.Log: Entry index does not exist (MAX=%v): (IDX=%v, TERM=%v)", len(l.entries), index, term)
}
// If we're truncating everything then just clear the entries.
if index == 0 {
l.entries = []*LogEntry{}
} else {
// Do not truncate if the entry at index does not have the matching term.
entry := l.entries[index-1]
if len(l.entries) > 0 && entry.term != term {
return fmt.Errorf("raft.Log: Entry at index does not have matching term (%v): (IDX=%v, TERM=%v)", entry.term, index, term)
}
// Otherwise truncate up to the desired entry.
if index < uint64(len(l.entries)) {
l.entries = l.entries[0:index]
}
}
return nil
}
//--------------------------------------
// Append
//--------------------------------------

View File

@ -13,6 +13,10 @@ import (
//
//------------------------------------------------------------------------------
//--------------------------------------
// Append
//--------------------------------------
// Ensure that we can append to a new log.
func TestLogNewLog(t *testing.T) {
path := getLogPath()
@ -170,3 +174,55 @@ func TestLogRecovery(t *testing.T) {
}
warn("--- END RECOVERY TEST\n")
}
//--------------------------------------
// Append
//--------------------------------------
// Ensure that we can truncate uncommitted entries in the log.
func TestLogTruncate(t *testing.T) {
log, path := setupLog("")
if err := log.Open(path); err != nil {
t.Fatalf("Unable to open log: %v", err)
}
defer log.Close()
defer os.Remove(path)
entry1 := NewLogEntry(log, 1, 1, &TestCommand1{"foo", 20})
if err := log.AppendEntry(entry1); err != nil {
t.Fatalf("Unable to append: %v", err)
}
entry2 := NewLogEntry(log, 2, 1, &TestCommand2{100})
if err := log.AppendEntry(entry2); err != nil {
t.Fatalf("Unable to append: %v", err)
}
entry3 := NewLogEntry(log, 3, 2, &TestCommand1{"bar", 0})
if err := log.AppendEntry(entry3); err != nil {
t.Fatalf("Unable to append: %v", err)
}
if err := log.SetCommitIndex(2); err != nil {
t.Fatalf("Unable to partially commit: %v", err)
}
// Truncate committed entry.
if err := log.Truncate(1, 1); err == nil || err.Error() != "raft.Log: Index is already committed (2): (IDX=1, TERM=1)" {
t.Fatalf("Truncating committed entries shouldn't work: %v", err)
}
// Truncate past end of log.
if err := log.Truncate(4, 2); err == nil || err.Error() != "raft.Log: Entry index does not exist (MAX=3): (IDX=4, TERM=2)" {
t.Fatalf("Truncating past end-of-log shouldn't work: %v", err)
}
// Truncate entry with mismatched term.
if err := log.Truncate(2, 2); err == nil || err.Error() != "raft.Log: Entry at index does not have matching term (1): (IDX=2, TERM=2)" {
t.Fatalf("Truncating mismatched entries shouldn't work: %v", err)
}
// Truncate end of log.
if err := log.Truncate(3, 2); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1,entry2,entry3})) {
t.Fatalf("Truncating end of log should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1,entry2,entry3})
}
// Truncate at last commit.
if err := log.Truncate(2, 1); !(err == nil && reflect.DeepEqual(log.entries, []*LogEntry{entry1,entry2})) {
t.Fatalf("Truncating at last commit should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1,entry2})
}
}

View File

@ -311,12 +311,8 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) (*AppendEntriesRespons
s.electionTimer.Reset()
// Reject if log doesn't contain a matching previous entry.
if req.PrevLogIndex == 0 && req.PrevLogTerm == 0 {
if index, _ := s.log.CommitInfo(); index > 0 {
return NewAppendEntriesResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Log contains previously committed entries: (IDX=%v, TERM=%v)", req.PrevLogIndex, req.PrevLogTerm)
}
} else if !s.log.ContainsEntry(req.PrevLogIndex, req.PrevLogTerm) {
return NewAppendEntriesResponse(s.currentTerm, false), fmt.Errorf("raft.Server: Log does not contain commit: (IDX=%v, TERM=%v)", req.PrevLogIndex, req.PrevLogTerm)
if err := s.log.Truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
return NewAppendEntriesResponse(s.currentTerm, false), err
}
// Append entries to the log.

View File

@ -1,6 +1,7 @@
package raft
import (
"reflect"
"testing"
)
@ -99,6 +100,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
func TestServerAppendEntries(t *testing.T) {
server := newTestServer("1")
server.Start()
defer server.Stop()
// Append single entry.
entries := []*LogEntry{NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})}
@ -136,6 +138,7 @@ func TestServerAppendEntries(t *testing.T) {
func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
server := newTestServer("1")
server.Start()
defer server.Stop()
server.currentTerm = 2
// Append single entry.
@ -153,6 +156,7 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
server := newTestServer("1")
server.Start()
defer server.Stop()
// Append single entry + commit.
entries := []*LogEntry{
@ -172,8 +176,30 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
}
}
// TODO: Reject entries from earlier index or term.
// TODO: Test rollback of uncommitted entries.
// Ensure that we uncommitted entries are rolled back if new entries overwrite them.
func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
server := newTestServer("1")
server.Start()
defer server.Stop()
entry1 := NewLogEntry(nil, 1, 1, &TestCommand1{"foo", 10})
entry2 := NewLogEntry(nil, 2, 1, &TestCommand1{"foo", 15})
entry3 := NewLogEntry(nil, 2, 2, &TestCommand1{"bar", 20})
// Append single entry + commit.
entries := []*LogEntry{entry1, entry2}
resp, err := server.AppendEntries(NewAppendEntriesRequest(1, "ldr", 0, 0, entries, 1))
if !(resp.Term == 1 && resp.Success && err == nil && server.log.CommitIndex() == 1 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry2})) {
t.Fatalf("AppendEntries failed: %v/%v : %v", resp.Term, resp.Success, err)
}
// Append entry that overwrites the second (uncommitted) entry.
entries = []*LogEntry{entry3}
resp, err = server.AppendEntries(NewAppendEntriesRequest(2, "ldr", 1, 1, entries, 2))
if !(resp.Term == 2 && resp.Success && err == nil && server.log.CommitIndex() == 2 && reflect.DeepEqual(server.log.entries, []*LogEntry{entry1, entry3})) {
t.Fatalf("AppendEntries should have succeeded: %v/%v : %v", resp.Term, resp.Success, err)
}
}
//--------------------------------------
// Membership

View File

@ -32,6 +32,7 @@ func setupLogFile(content string) string {
func setupLog(content string) (*Log, string) {
path := setupLogFile(content)
log := NewLog()
log.ApplyFunc = func(c Command) {}
log.AddCommandType(&TestCommand1{})
log.AddCommandType(&TestCommand2{})
if err := log.Open(path); err != nil {