write log entry to file on disk when appendEntry and truncate log file on disk when truncate log entries

pull/820/head
Xiang Li 2013-07-20 19:07:16 -07:00
parent 59de4e7064
commit d286dfdc6c
8 changed files with 139 additions and 57 deletions

140
log.go
View File

@ -139,61 +139,95 @@ func (l *Log) open(path string) error {
defer l.mutex.Unlock() defer l.mutex.Unlock()
// Read all the entries from the log if one exists. // Read all the entries from the log if one exists.
var lastIndex int = 0 var readBytes int64 = 0
if _, err := os.Stat(path); !os.IsNotExist(err) {
// Open the log file.
file, err := os.Open(path)
if err != nil {
return err
}
defer file.Close()
reader := bufio.NewReader(file)
// Read the file and decode entries. var err error
for { debugln("log.open.open ", path)
if _, err := reader.Peek(1); err == io.EOF { // open log file
break l.file, err = os.OpenFile(path, os.O_RDWR, 0600)
} l.path = path
// Instantiate log entry and decode into it. if err != nil {
entry, _ := newLogEntry(l, 0, 0, nil) // if the log file does not exist before
n, err := entry.decode(reader) // we create the log file and set commitIndex to 0
if os.IsNotExist(err) {
l.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
debugln("log.open.create ", path)
if err != nil { if err != nil {
file.Close() panic(err)
if err = os.Truncate(path, int64(lastIndex)); err != nil { }
return fmt.Errorf("raft.Log: Unable to recover: %v", err) // set commitIndex to 0
} _, err = fmt.Fprintf(l.file, "%8x\n", 0)
break
if err != nil {
l.file.Close()
panic(err)
return err
} }
// Append entry. }
l.entries = append(l.entries, entry) return err
}
debugln("log.open.exist ", path)
// if the log file exists
// we read out the commitIndex and apply all the commands
// seek to the end of log file
var commitIndex uint64
_, err = fmt.Fscanf(l.file, "%8x\n", &commitIndex)
debugln("log.open.commitIndex is ", commitIndex)
if err != nil {
panic(err)
return err
}
reader := bufio.NewReader(l.file)
// Read the file and decode entries.
for {
if _, err := reader.Peek(1); err == io.EOF {
break
}
// Instantiate log entry and decode into it.
entry, _ := newLogEntry(l, 0, 0, nil)
n, err := entry.decode(reader)
if err != nil {
//panic(err)
//l.file.Close()
if err = os.Truncate(path, readBytes); err != nil {
return fmt.Errorf("raft.Log: Unable to recover: %v", err)
}
break
}
// Append entry.
l.entries = append(l.entries, entry)
debugln("open.log.append log index ", entry.Index)
// if the entry index less than the known commitIndex
// commit it
if entry.Index < commitIndex {
l.commitIndex = entry.Index l.commitIndex = entry.Index
// Lookup and decode command. // Lookup and decode command.
command, err := newCommand(entry.CommandName, entry.Command) command, err := newCommand(entry.CommandName, entry.Command)
if err != nil { if err != nil {
file.Close() panic(err)
l.file.Close()
return err return err
} }
// Apply the command. // Apply the command.
returnValue, err := l.ApplyFunc(command) _, err = l.ApplyFunc(command)
l.results = append(l.results, &logResult{returnValue: returnValue, err: err})
// Do we really want the result?
// l.results = append(l.results, &logResult{returnValue: returnValue, err: err})
lastIndex += n
} }
readBytes += int64(n)
file.Close()
} }
l.results = make([]*logResult, len(l.entries))
// Open the file for appending. debugln("open.log.recovery number of log ", len(l.entries))
var err error
l.file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
l.path = path
return nil return nil
} }
@ -385,11 +419,6 @@ func (l *Log) setCommitIndex(index uint64) error {
entryIndex := i - 1 - l.startIndex entryIndex := i - 1 - l.startIndex
entry := l.entries[entryIndex] entry := l.entries[entryIndex]
// Write to storage.
if _, err := entry.encode(l.file); err != nil {
return err
}
// Update commit index. // Update commit index.
l.commitIndex = entry.Index l.commitIndex = entry.Index
@ -406,6 +435,12 @@ func (l *Log) setCommitIndex(index uint64) error {
return nil return nil
} }
func (l *Log) flushCommitIndex() {
l.file.Seek(0, os.SEEK_SET)
fmt.Fprintf(l.file, "%8x\n", l.commitIndex)
l.file.Seek(0, os.SEEK_END)
}
//-------------------------------------- //--------------------------------------
// Truncation // Truncation
//-------------------------------------- //--------------------------------------
@ -431,6 +466,10 @@ func (l *Log) truncate(index uint64, term uint64) error {
// If we're truncating everything then just clear the entries. // If we're truncating everything then just clear the entries.
if index == l.startIndex { if index == l.startIndex {
debugln("log.truncate.clear")
// 9 = %8x + '\n'
l.file.Truncate(9)
l.file.Seek(9, os.SEEK_SET)
l.entries = []*LogEntry{} l.entries = []*LogEntry{}
} else { } else {
// Do not truncate if the entry at index does not have the matching term. // Do not truncate if the entry at index does not have the matching term.
@ -443,6 +482,9 @@ func (l *Log) truncate(index uint64, term uint64) error {
// Otherwise truncate up to the desired entry. // Otherwise truncate up to the desired entry.
if index < l.startIndex+uint64(len(l.entries)) { if index < l.startIndex+uint64(len(l.entries)) {
debugln("log.truncate.finish") debugln("log.truncate.finish")
position := l.entries[index-l.startIndex].Position
l.file.Truncate(position)
l.file.Seek(position, os.SEEK_SET)
l.entries = l.entries[0 : index-l.startIndex] l.entries = l.entries[0 : index-l.startIndex]
} }
} }
@ -488,6 +530,15 @@ func (l *Log) appendEntry(entry *LogEntry) error {
} }
} }
position, _ := l.file.Seek(0, os.SEEK_CUR)
entry.Position = position
// Write to storage.
if _, err := entry.encode(l.file); err != nil {
return err
}
// Append to entries list if stored on disk. // Append to entries list if stored on disk.
l.entries = append(l.entries, entry) l.entries = append(l.entries, entry)
l.results = append(l.results, nil) l.results = append(l.results, nil)
@ -523,6 +574,9 @@ func (l *Log) compact(index uint64, term uint64) error {
return err return err
} }
for _, entry := range entries { for _, entry := range entries {
position, _ := l.file.Seek(0, os.SEEK_CUR)
entry.Position = position
if _, err = entry.encode(file); err != nil { if _, err = entry.encode(file); err != nil {
return err return err
} }

View File

@ -16,6 +16,7 @@ type LogEntry struct {
Term uint64 Term uint64
CommandName string CommandName string
Command []byte Command []byte
Position int64 // position in the log file
commit chan bool commit chan bool
} }
@ -62,11 +63,10 @@ func (e *LogEntry) encode(w io.Writer) (int, error) {
err := p.Marshal(pb) err := p.Marshal(pb)
if err != nil { if err != nil {
panic(err)
return -1, err return -1, err
} }
_, err = fmt.Fprintf(w, "%x", len(p.Bytes())) _, err = fmt.Fprintf(w, "%x\n", len(p.Bytes()))
if err != nil { if err != nil {
return -1, err return -1, err
@ -80,10 +80,9 @@ func (e *LogEntry) encode(w io.Writer) (int, error) {
func (e *LogEntry) decode(r io.Reader) (int, error) { func (e *LogEntry) decode(r io.Reader) (int, error) {
var length int var length int
_, err := fmt.Fscanf(r, "%x", &length) _, err := fmt.Fscanf(r, "%x\n", &length)
if err != nil { if err != nil {
panic(err)
return -1, err return -1, err
} }
@ -100,7 +99,7 @@ func (e *LogEntry) decode(r io.Reader) (int, error) {
err = p.Unmarshal(pb) err = p.Unmarshal(pb)
if err != nil { if err != nil {
return 0, err return -1, err
} }
e.Term = pb.GetTerm() e.Term = pb.GetTerm()

View File

@ -1,6 +1,7 @@
package raft package raft
import ( import (
"fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"reflect" "reflect"
@ -115,6 +116,13 @@ func TestLogRecovery(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
f, _ := ioutil.TempFile("", "raft-log-") f, _ := ioutil.TempFile("", "raft-log-")
_, err := fmt.Fprintf(f, "%8x\n", 0)
if err != nil {
f.Close()
panic(err)
}
e0.encode(f) e0.encode(f)
e1.encode(f) e1.encode(f)
f.WriteString("CORRUPT!") f.WriteString("CORRUPT!")

View File

@ -10,7 +10,7 @@ type NOPCommand struct {
// The name of the NOP command in the log // The name of the NOP command in the log
func (c NOPCommand) CommandName() string { func (c NOPCommand) CommandName() string {
return "nop" return "nop\n"
} }
func (c NOPCommand) Apply(server *Server) (interface{}, error) { func (c NOPCommand) Apply(server *Server) (interface{}, error) {

View File

@ -252,7 +252,7 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
return return
} }
// Send response to server for processing. // Send response to server for processing.
p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)}) p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)})
} }
//-------------------------------------- //--------------------------------------

View File

@ -320,12 +320,12 @@ func (s *Server) Initialize() error {
// Initialize the log and load it up. // Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil { if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: %s", err) s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err) return fmt.Errorf("raft: Initialization error: %s", err)
} }
// Update the term to the last term in the log. // Update the term to the last term in the log.
s.currentTerm = s.log.currentTerm() _, s.currentTerm = s.log.lastInfo()
return nil return nil
} }
@ -333,6 +333,7 @@ func (s *Server) Initialize() error {
// Start the sever as a follower // Start the sever as a follower
func (s *Server) StartFollower() { func (s *Server) StartFollower() {
s.setState(Follower) s.setState(Follower)
s.debugln("follower starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex)
go s.loop() go s.loop()
} }
@ -340,7 +341,7 @@ func (s *Server) StartFollower() {
func (s *Server) StartLeader() { func (s *Server) StartLeader() {
s.setState(Leader) s.setState(Leader)
s.currentTerm++ s.currentTerm++
s.debugln("leader start at term: ", s.currentTerm, " index: ", s.log.currentIndex()) s.debugln("leader starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex)
go s.loop() go s.loop()
} }
@ -578,6 +579,7 @@ func (s *Server) leaderLoop() {
logIndex, _ := s.log.lastInfo() logIndex, _ := s.log.lastInfo()
// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat. // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
for _, peer := range s.peers { for _, peer := range s.peers {
peer.setPrevLogIndex(logIndex) peer.setPrevLogIndex(logIndex)
peer.startHeartbeat() peer.startHeartbeat()
@ -667,6 +669,7 @@ func (s *Server) processCommand(command Command, e *event) {
// Create an entry for the command in the log. // Create an entry for the command in the log.
entry, err := s.log.createEntry(s.currentTerm, command) entry, err := s.log.createEntry(s.currentTerm, command)
if err != nil { if err != nil {
s.debugln("server.command.log.entry.error:", err) s.debugln("server.command.log.entry.error:", err)
e.c <- err e.c <- err
@ -861,6 +864,9 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
func (s *Server) AddPeer(name string) error { func (s *Server) AddPeer(name string) error {
s.debugln("server.peer.add: ", name, len(s.peers)) s.debugln("server.peer.add: ", name, len(s.peers))
// Save the configuration of the cluster
s.log.flushCommitIndex()
// Do not allow peers to be added twice. // Do not allow peers to be added twice.
if s.peers[name] != nil { if s.peers[name] != nil {
return nil return nil

View File

@ -100,7 +100,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
resp := server.RequestVote(newRequestVoteRequest(3, "foo", 3, 3)) resp := server.RequestVote(newRequestVoteRequest(3, "foo", 3, 3))
if resp.Term != 3 || resp.VoteGranted { if resp.Term != 3 || resp.VoteGranted {
t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted) t.Fatalf("Stale index vote should have been denied [%v/%v/%v]", resp.Term, resp.VoteGranted)
} }
resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2)) resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
if resp.Term != 3 || resp.VoteGranted { if resp.Term != 3 || resp.VoteGranted {

21
test.go
View File

@ -37,10 +37,19 @@ func getLogPath() string {
func setupLog(entries []*LogEntry) (*Log, string) { func setupLog(entries []*LogEntry) (*Log, string) {
f, _ := ioutil.TempFile("", "raft-log-") f, _ := ioutil.TempFile("", "raft-log-")
// commit Index
_, err := fmt.Fprintf(f, "%8x\n", 0)
if err != nil {
f.Close()
panic(err)
}
for _, entry := range entries { for _, entry := range entries {
entry.encode(f) entry.encode(f)
} }
err := f.Close() err = f.Close()
if err != nil { if err != nil {
panic(err) panic(err)
@ -75,6 +84,13 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn
if err != nil { if err != nil {
panic(err) panic(err)
} }
_, err = fmt.Fprintf(f, "%x\n", len(entries))
if err != nil {
f.Close()
panic(err)
}
for _, entry := range entries { for _, entry := range entries {
entry.encode(f) entry.encode(f)
} }
@ -95,10 +111,10 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
} }
for _, server := range servers { for _, server := range servers {
server.SetHeartbeatTimeout(testHeartbeatTimeout) server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Initialize()
for _, peer := range servers { for _, peer := range servers {
server.AddPeer(peer.Name()) server.AddPeer(peer.Name())
} }
server.Initialize()
} }
return servers return servers
} }
@ -129,7 +145,6 @@ func (t *testTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer
return t.SendSnapshotRecoveryRequest(server, peer, req) return t.SendSnapshotRecoveryRequest(server, peer, req)
} }
type testStateMachine struct { type testStateMachine struct {
saveFunc func() ([]byte, error) saveFunc func() ([]byte, error)
recoveryFunc func([]byte) error recoveryFunc func([]byte) error