diff --git a/log.go b/log.go index 10d8adfe3d..da93b77a53 100644 --- a/log.go +++ b/log.go @@ -139,61 +139,95 @@ func (l *Log) open(path string) error { defer l.mutex.Unlock() // Read all the entries from the log if one exists. - var lastIndex int = 0 - if _, err := os.Stat(path); !os.IsNotExist(err) { - // Open the log file. - file, err := os.Open(path) - if err != nil { - return err - } - defer file.Close() - reader := bufio.NewReader(file) + var readBytes int64 = 0 - // Read the file and decode entries. - for { - if _, err := reader.Peek(1); err == io.EOF { - break - } + var err error + debugln("log.open.open ", path) + // open log file + l.file, err = os.OpenFile(path, os.O_RDWR, 0600) + l.path = path - // Instantiate log entry and decode into it. - entry, _ := newLogEntry(l, 0, 0, nil) - n, err := entry.decode(reader) + if err != nil { + // if the log file does not exist before + // we create the log file and set commitIndex to 0 + if os.IsNotExist(err) { + l.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600) + debugln("log.open.create ", path) if err != nil { - file.Close() - if err = os.Truncate(path, int64(lastIndex)); err != nil { - return fmt.Errorf("raft.Log: Unable to recover: %v", err) - } - break + panic(err) + } + // set commitIndex to 0 + _, err = fmt.Fprintf(l.file, "%8x\n", 0) + + if err != nil { + l.file.Close() + panic(err) + return err } - // Append entry. - l.entries = append(l.entries, entry) + } + return err + } + debugln("log.open.exist ", path) + // if the log file exists + // we read out the commitIndex and apply all the commands + // seek to the end of log file + var commitIndex uint64 + _, err = fmt.Fscanf(l.file, "%8x\n", &commitIndex) + debugln("log.open.commitIndex is ", commitIndex) + if err != nil { + panic(err) + return err + } + + reader := bufio.NewReader(l.file) + + // Read the file and decode entries. + for { + if _, err := reader.Peek(1); err == io.EOF { + break + } + + // Instantiate log entry and decode into it. + entry, _ := newLogEntry(l, 0, 0, nil) + n, err := entry.decode(reader) + if err != nil { + //panic(err) + //l.file.Close() + if err = os.Truncate(path, readBytes); err != nil { + return fmt.Errorf("raft.Log: Unable to recover: %v", err) + } + break + } + + // Append entry. + l.entries = append(l.entries, entry) + debugln("open.log.append log index ", entry.Index) + // if the entry index less than the known commitIndex + // commit it + if entry.Index < commitIndex { + l.commitIndex = entry.Index // Lookup and decode command. command, err := newCommand(entry.CommandName, entry.Command) if err != nil { - file.Close() + panic(err) + l.file.Close() return err } // Apply the command. - returnValue, err := l.ApplyFunc(command) - l.results = append(l.results, &logResult{returnValue: returnValue, err: err}) + _, err = l.ApplyFunc(command) + + // Do we really want the result? + // l.results = append(l.results, &logResult{returnValue: returnValue, err: err}) - lastIndex += n } - - file.Close() + readBytes += int64(n) } - - // Open the file for appending. - var err error - l.file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) - if err != nil { - return err - } - l.path = path + l.results = make([]*logResult, len(l.entries)) + debugln("open.log.recovery number of log ", len(l.entries)) return nil } @@ -385,11 +419,6 @@ func (l *Log) setCommitIndex(index uint64) error { entryIndex := i - 1 - l.startIndex entry := l.entries[entryIndex] - // Write to storage. - if _, err := entry.encode(l.file); err != nil { - return err - } - // Update commit index. l.commitIndex = entry.Index @@ -406,6 +435,12 @@ func (l *Log) setCommitIndex(index uint64) error { return nil } +func (l *Log) flushCommitIndex() { + l.file.Seek(0, os.SEEK_SET) + fmt.Fprintf(l.file, "%8x\n", l.commitIndex) + l.file.Seek(0, os.SEEK_END) +} + //-------------------------------------- // Truncation //-------------------------------------- @@ -431,6 +466,10 @@ func (l *Log) truncate(index uint64, term uint64) error { // If we're truncating everything then just clear the entries. if index == l.startIndex { + debugln("log.truncate.clear") + // 9 = %8x + '\n' + l.file.Truncate(9) + l.file.Seek(9, os.SEEK_SET) l.entries = []*LogEntry{} } else { // Do not truncate if the entry at index does not have the matching term. @@ -443,6 +482,9 @@ func (l *Log) truncate(index uint64, term uint64) error { // Otherwise truncate up to the desired entry. if index < l.startIndex+uint64(len(l.entries)) { debugln("log.truncate.finish") + position := l.entries[index-l.startIndex].Position + l.file.Truncate(position) + l.file.Seek(position, os.SEEK_SET) l.entries = l.entries[0 : index-l.startIndex] } } @@ -488,6 +530,15 @@ func (l *Log) appendEntry(entry *LogEntry) error { } } + position, _ := l.file.Seek(0, os.SEEK_CUR) + + entry.Position = position + + // Write to storage. + if _, err := entry.encode(l.file); err != nil { + return err + } + // Append to entries list if stored on disk. l.entries = append(l.entries, entry) l.results = append(l.results, nil) @@ -523,6 +574,9 @@ func (l *Log) compact(index uint64, term uint64) error { return err } for _, entry := range entries { + position, _ := l.file.Seek(0, os.SEEK_CUR) + entry.Position = position + if _, err = entry.encode(file); err != nil { return err } diff --git a/log_entry.go b/log_entry.go index 5672a05dd0..8d77f02a82 100644 --- a/log_entry.go +++ b/log_entry.go @@ -16,6 +16,7 @@ type LogEntry struct { Term uint64 CommandName string Command []byte + Position int64 // position in the log file commit chan bool } @@ -62,11 +63,10 @@ func (e *LogEntry) encode(w io.Writer) (int, error) { err := p.Marshal(pb) if err != nil { - panic(err) return -1, err } - _, err = fmt.Fprintf(w, "%x", len(p.Bytes())) + _, err = fmt.Fprintf(w, "%x\n", len(p.Bytes())) if err != nil { return -1, err @@ -80,10 +80,9 @@ func (e *LogEntry) encode(w io.Writer) (int, error) { func (e *LogEntry) decode(r io.Reader) (int, error) { var length int - _, err := fmt.Fscanf(r, "%x", &length) + _, err := fmt.Fscanf(r, "%x\n", &length) if err != nil { - panic(err) return -1, err } @@ -100,7 +99,7 @@ func (e *LogEntry) decode(r io.Reader) (int, error) { err = p.Unmarshal(pb) if err != nil { - return 0, err + return -1, err } e.Term = pb.GetTerm() diff --git a/log_test.go b/log_test.go index 2d3dadd899..9ce0919936 100644 --- a/log_test.go +++ b/log_test.go @@ -1,6 +1,7 @@ package raft import ( + "fmt" "io/ioutil" "os" "reflect" @@ -115,6 +116,13 @@ func TestLogRecovery(t *testing.T) { e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) f, _ := ioutil.TempFile("", "raft-log-") + _, err := fmt.Fprintf(f, "%8x\n", 0) + + if err != nil { + f.Close() + panic(err) + } + e0.encode(f) e1.encode(f) f.WriteString("CORRUPT!") diff --git a/nop_command.go b/nop_command.go index 333ebb4b65..925ccd71ec 100644 --- a/nop_command.go +++ b/nop_command.go @@ -10,7 +10,7 @@ type NOPCommand struct { // The name of the NOP command in the log func (c NOPCommand) CommandName() string { - return "nop" + return "nop\n" } func (c NOPCommand) Apply(server *Server) (interface{}, error) { diff --git a/peer.go b/peer.go index 8497ef9797..e7761dd974 100644 --- a/peer.go +++ b/peer.go @@ -252,7 +252,7 @@ func (p *Peer) sendSnapshotRecoveryRequest() { return } // Send response to server for processing. - p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)}) + p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)}) } //-------------------------------------- diff --git a/server.go b/server.go index 7ca54b25a7..dc312b34ec 100644 --- a/server.go +++ b/server.go @@ -320,12 +320,12 @@ func (s *Server) Initialize() error { // Initialize the log and load it up. if err := s.log.open(s.LogPath()); err != nil { - s.debugln("raft: Log error: %s", err) + s.debugln("raft: Log error: ", err) return fmt.Errorf("raft: Initialization error: %s", err) } // Update the term to the last term in the log. - s.currentTerm = s.log.currentTerm() + _, s.currentTerm = s.log.lastInfo() return nil } @@ -333,6 +333,7 @@ func (s *Server) Initialize() error { // Start the sever as a follower func (s *Server) StartFollower() { s.setState(Follower) + s.debugln("follower starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex) go s.loop() } @@ -340,7 +341,7 @@ func (s *Server) StartFollower() { func (s *Server) StartLeader() { s.setState(Leader) s.currentTerm++ - s.debugln("leader start at term: ", s.currentTerm, " index: ", s.log.currentIndex()) + s.debugln("leader starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex) go s.loop() } @@ -578,6 +579,7 @@ func (s *Server) leaderLoop() { logIndex, _ := s.log.lastInfo() // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat. + s.debugln("leaderLoop.set.PrevIndex to ", logIndex) for _, peer := range s.peers { peer.setPrevLogIndex(logIndex) peer.startHeartbeat() @@ -667,6 +669,7 @@ func (s *Server) processCommand(command Command, e *event) { // Create an entry for the command in the log. entry, err := s.log.createEntry(s.currentTerm, command) + if err != nil { s.debugln("server.command.log.entry.error:", err) e.c <- err @@ -861,6 +864,9 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot func (s *Server) AddPeer(name string) error { s.debugln("server.peer.add: ", name, len(s.peers)) + // Save the configuration of the cluster + s.log.flushCommitIndex() + // Do not allow peers to be added twice. if s.peers[name] != nil { return nil diff --git a/server_test.go b/server_test.go index 0719435404..d476ad7f80 100644 --- a/server_test.go +++ b/server_test.go @@ -100,7 +100,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { resp := server.RequestVote(newRequestVoteRequest(3, "foo", 3, 3)) if resp.Term != 3 || resp.VoteGranted { - t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted) + t.Fatalf("Stale index vote should have been denied [%v/%v/%v]", resp.Term, resp.VoteGranted) } resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2)) if resp.Term != 3 || resp.VoteGranted { diff --git a/test.go b/test.go index bcbe06d9a4..7bf3d27d80 100644 --- a/test.go +++ b/test.go @@ -37,10 +37,19 @@ func getLogPath() string { func setupLog(entries []*LogEntry) (*Log, string) { f, _ := ioutil.TempFile("", "raft-log-") + + // commit Index + _, err := fmt.Fprintf(f, "%8x\n", 0) + + if err != nil { + f.Close() + panic(err) + } + for _, entry := range entries { entry.encode(f) } - err := f.Close() + err = f.Close() if err != nil { panic(err) @@ -75,6 +84,13 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn if err != nil { panic(err) } + _, err = fmt.Fprintf(f, "%x\n", len(entries)) + + if err != nil { + f.Close() + panic(err) + } + for _, entry := range entries { entry.encode(f) } @@ -95,10 +111,10 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]* } for _, server := range servers { server.SetHeartbeatTimeout(testHeartbeatTimeout) + server.Initialize() for _, peer := range servers { server.AddPeer(peer.Name()) } - server.Initialize() } return servers } @@ -129,7 +145,6 @@ func (t *testTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer return t.SendSnapshotRecoveryRequest(server, peer, req) } - type testStateMachine struct { saveFunc func() ([]byte, error) recoveryFunc func([]byte) error