commit
34aa0be4c1
128
log.go
128
log.go
|
@ -139,61 +139,56 @@ func (l *Log) open(path string) error {
|
|||
defer l.mutex.Unlock()
|
||||
|
||||
// Read all the entries from the log if one exists.
|
||||
var lastIndex int = 0
|
||||
if _, err := os.Stat(path); !os.IsNotExist(err) {
|
||||
// Open the log file.
|
||||
file, err := os.Open(path)
|
||||
if err != nil {
|
||||
var readBytes int64
|
||||
|
||||
var err error
|
||||
debugln("log.open.open ", path)
|
||||
// open log file
|
||||
l.file, err = os.OpenFile(path, os.O_RDWR, 0600)
|
||||
l.path = path
|
||||
|
||||
if err != nil {
|
||||
// if the log file does not exist before
|
||||
// we create the log file and set commitIndex to 0
|
||||
if os.IsNotExist(err) {
|
||||
l.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
|
||||
debugln("log.open.create ", path)
|
||||
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
reader := bufio.NewReader(file)
|
||||
|
||||
// Read the file and decode entries.
|
||||
for {
|
||||
if _, err := reader.Peek(1); err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
// Instantiate log entry and decode into it.
|
||||
entry, _ := newLogEntry(l, 0, 0, nil)
|
||||
n, err := entry.decode(reader)
|
||||
if err != nil {
|
||||
file.Close()
|
||||
if err = os.Truncate(path, int64(lastIndex)); err != nil {
|
||||
return fmt.Errorf("raft.Log: Unable to recover: %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// Append entry.
|
||||
l.entries = append(l.entries, entry)
|
||||
l.commitIndex = entry.Index
|
||||
|
||||
// Lookup and decode command.
|
||||
command, err := newCommand(entry.CommandName, entry.Command)
|
||||
if err != nil {
|
||||
file.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
// Apply the command.
|
||||
returnValue, err := l.ApplyFunc(command)
|
||||
l.results = append(l.results, &logResult{returnValue: returnValue, err: err})
|
||||
|
||||
lastIndex += n
|
||||
}
|
||||
|
||||
file.Close()
|
||||
}
|
||||
|
||||
// Open the file for appending.
|
||||
var err error
|
||||
l.file, err = os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
l.path = path
|
||||
debugln("log.open.exist ", path)
|
||||
|
||||
reader := bufio.NewReader(l.file)
|
||||
|
||||
// Read the file and decode entries.
|
||||
for {
|
||||
if _, err := reader.Peek(1); err == io.EOF {
|
||||
debugln("open.log.append: finish ")
|
||||
break
|
||||
}
|
||||
|
||||
// Instantiate log entry and decode into it.
|
||||
entry, _ := newLogEntry(l, 0, 0, nil)
|
||||
entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)
|
||||
|
||||
n, err := entry.decode(reader)
|
||||
if err != nil {
|
||||
if err = os.Truncate(path, readBytes); err != nil {
|
||||
return fmt.Errorf("raft.Log: Unable to recover: %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// Append entry.
|
||||
l.entries = append(l.entries, entry)
|
||||
debugln("open.log.append log index ", entry.Index)
|
||||
|
||||
readBytes += int64(n)
|
||||
}
|
||||
l.results = make([]*logResult, len(l.entries))
|
||||
debugln("open.log.recovery number of log ", len(l.entries))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -385,11 +380,6 @@ func (l *Log) setCommitIndex(index uint64) error {
|
|||
entryIndex := i - 1 - l.startIndex
|
||||
entry := l.entries[entryIndex]
|
||||
|
||||
// Write to storage.
|
||||
if _, err := entry.encode(l.file); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update commit index.
|
||||
l.commitIndex = entry.Index
|
||||
|
||||
|
@ -406,6 +396,14 @@ func (l *Log) setCommitIndex(index uint64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Set the commitIndex at the head of the log file to the current
|
||||
// commit Index. This should be called after obtained a log lock
|
||||
func (l *Log) flushCommitIndex() {
|
||||
l.file.Seek(0, os.SEEK_SET)
|
||||
fmt.Fprintf(l.file, "%8x\n", l.commitIndex)
|
||||
l.file.Seek(0, os.SEEK_END)
|
||||
}
|
||||
|
||||
//--------------------------------------
|
||||
// Truncation
|
||||
//--------------------------------------
|
||||
|
@ -431,6 +429,9 @@ func (l *Log) truncate(index uint64, term uint64) error {
|
|||
|
||||
// If we're truncating everything then just clear the entries.
|
||||
if index == l.startIndex {
|
||||
debugln("log.truncate.clear")
|
||||
l.file.Truncate(0)
|
||||
l.file.Seek(0, os.SEEK_SET)
|
||||
l.entries = []*LogEntry{}
|
||||
} else {
|
||||
// Do not truncate if the entry at index does not have the matching term.
|
||||
|
@ -443,6 +444,9 @@ func (l *Log) truncate(index uint64, term uint64) error {
|
|||
// Otherwise truncate up to the desired entry.
|
||||
if index < l.startIndex+uint64(len(l.entries)) {
|
||||
debugln("log.truncate.finish")
|
||||
position := l.entries[index-l.startIndex].Position
|
||||
l.file.Truncate(position)
|
||||
l.file.Seek(position, os.SEEK_SET)
|
||||
l.entries = l.entries[0 : index-l.startIndex]
|
||||
}
|
||||
}
|
||||
|
@ -488,6 +492,15 @@ func (l *Log) appendEntry(entry *LogEntry) error {
|
|||
}
|
||||
}
|
||||
|
||||
position, _ := l.file.Seek(0, os.SEEK_CUR)
|
||||
|
||||
entry.Position = position
|
||||
|
||||
// Write to storage.
|
||||
if _, err := entry.encode(l.file); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Append to entries list if stored on disk.
|
||||
l.entries = append(l.entries, entry)
|
||||
l.results = append(l.results, nil)
|
||||
|
@ -523,6 +536,9 @@ func (l *Log) compact(index uint64, term uint64) error {
|
|||
return err
|
||||
}
|
||||
for _, entry := range entries {
|
||||
position, _ := l.file.Seek(0, os.SEEK_CUR)
|
||||
entry.Position = position
|
||||
|
||||
if _, err = entry.encode(file); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ type LogEntry struct {
|
|||
Term uint64
|
||||
CommandName string
|
||||
Command []byte
|
||||
Position int64 // position in the log file
|
||||
commit chan bool
|
||||
}
|
||||
|
||||
|
@ -62,11 +63,10 @@ func (e *LogEntry) encode(w io.Writer) (int, error) {
|
|||
err := p.Marshal(pb)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return -1, err
|
||||
}
|
||||
|
||||
_, err = fmt.Fprintf(w, "%x", len(p.Bytes()))
|
||||
_, err = fmt.Fprintf(w, "%x\n", len(p.Bytes()))
|
||||
|
||||
if err != nil {
|
||||
return -1, err
|
||||
|
@ -80,10 +80,9 @@ func (e *LogEntry) encode(w io.Writer) (int, error) {
|
|||
func (e *LogEntry) decode(r io.Reader) (int, error) {
|
||||
|
||||
var length int
|
||||
_, err := fmt.Fscanf(r, "%x", &length)
|
||||
_, err := fmt.Fscanf(r, "%x\n", &length)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return -1, err
|
||||
}
|
||||
|
||||
|
@ -100,7 +99,7 @@ func (e *LogEntry) decode(r io.Reader) (int, error) {
|
|||
err = p.Unmarshal(pb)
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return -1, err
|
||||
}
|
||||
|
||||
e.Term = pb.GetTerm()
|
||||
|
|
28
log_test.go
28
log_test.go
|
@ -115,6 +115,7 @@ func TestLogRecovery(t *testing.T) {
|
|||
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
|
||||
f, _ := ioutil.TempFile("", "raft-log-")
|
||||
|
||||
e0.encode(f)
|
||||
e1.encode(f)
|
||||
f.WriteString("CORRUPT!")
|
||||
|
@ -160,7 +161,7 @@ func TestLogTruncate(t *testing.T) {
|
|||
if err := log.open(path); err != nil {
|
||||
t.Fatalf("Unable to open log: %v", err)
|
||||
}
|
||||
defer log.close()
|
||||
|
||||
defer os.Remove(path)
|
||||
|
||||
entry1, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||
|
@ -200,4 +201,29 @@ func TestLogTruncate(t *testing.T) {
|
|||
t.Fatalf("Truncating at last commit should work: %v\n\nEntries:\nActual: %v\nExpected: %v", err, log.entries, []*LogEntry{entry1, entry2})
|
||||
}
|
||||
|
||||
// Append after truncate
|
||||
if err := log.appendEntry(entry3); err != nil {
|
||||
t.Fatalf("Unable to append after truncate: %v", err)
|
||||
}
|
||||
|
||||
log.close()
|
||||
|
||||
// Recovery the truncated log
|
||||
log = newLog()
|
||||
if err := log.open(path); err != nil {
|
||||
t.Fatalf("Unable to open log: %v", err)
|
||||
}
|
||||
// Validate existing log entries.
|
||||
if len(log.entries) != 3 {
|
||||
t.Fatalf("Expected 3 entries, got %d", len(log.entries))
|
||||
}
|
||||
if log.entries[0].Index != 1 || log.entries[0].Term != 1 {
|
||||
t.Fatalf("Unexpected entry[0]: %v", log.entries[0])
|
||||
}
|
||||
if log.entries[1].Index != 2 || log.entries[1].Term != 1 {
|
||||
t.Fatalf("Unexpected entry[1]: %v", log.entries[1])
|
||||
}
|
||||
if log.entries[2].Index != 3 || log.entries[2].Term != 2 {
|
||||
t.Fatalf("Unexpected entry[2]: %v", log.entries[2])
|
||||
}
|
||||
}
|
||||
|
|
2
peer.go
2
peer.go
|
@ -252,7 +252,7 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
|
|||
return
|
||||
}
|
||||
// Send response to server for processing.
|
||||
p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)})
|
||||
p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)})
|
||||
}
|
||||
|
||||
//--------------------------------------
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: snapshot_recovery_resquest.proto
|
||||
// source: snapshot_recovery_request.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
package protobuf
|
||||
|
|
78
server.go
78
server.go
|
@ -5,6 +5,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -80,6 +81,8 @@ type Server struct {
|
|||
lastSnapshot *Snapshot
|
||||
stateMachine StateMachine
|
||||
maxLogEntriesPerRequest uint64
|
||||
|
||||
confFile *os.File
|
||||
}
|
||||
|
||||
// An event to be processed by the server's event loop.
|
||||
|
@ -185,7 +188,7 @@ func (s *Server) Context() interface{} {
|
|||
|
||||
// Retrieves the log path for the server.
|
||||
func (s *Server) LogPath() string {
|
||||
return fmt.Sprintf("%s/log", s.path)
|
||||
return path.Join(s.path, "log")
|
||||
}
|
||||
|
||||
// Retrieves the current state of the server.
|
||||
|
@ -316,16 +319,62 @@ func (s *Server) Initialize() error {
|
|||
}
|
||||
|
||||
// Create snapshot directory if not exist
|
||||
os.Mkdir(s.path+"/snapshot", 0700)
|
||||
os.Mkdir(path.Join(s.path, "snapshot"), 0700)
|
||||
|
||||
// Initialize the log and load it up.
|
||||
if err := s.log.open(s.LogPath()); err != nil {
|
||||
s.debugln("raft: Log error: %s", err)
|
||||
s.debugln("raft: Log error: ", err)
|
||||
return fmt.Errorf("raft: Initialization error: %s", err)
|
||||
}
|
||||
|
||||
if err := s.readConf(); err != nil {
|
||||
s.debugln("raft: Conf file error: ", err)
|
||||
return fmt.Errorf("raft: Initialization error: %s", err)
|
||||
}
|
||||
|
||||
// Update the term to the last term in the log.
|
||||
s.currentTerm = s.log.currentTerm()
|
||||
_, s.currentTerm = s.log.lastInfo()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Read the configuration for the server.
|
||||
func (s *Server) readConf() error {
|
||||
var err error
|
||||
confPath := path.Join(s.path, "conf")
|
||||
s.debugln("readConf.open ", confPath)
|
||||
// open conf file
|
||||
s.confFile, err = os.OpenFile(confPath, os.O_RDWR, 0600)
|
||||
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
s.confFile, err = os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE, 0600)
|
||||
debugln("readConf.create ", confPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
var peerName string
|
||||
_, err = fmt.Fscanf(s.confFile, "%s\n", &peerName)
|
||||
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
s.debugln("server.peer.conf: finish")
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
s.debugln("server.peer.conf.read: ", peerName)
|
||||
|
||||
peer := newPeer(s, peerName, s.heartbeatTimeout)
|
||||
|
||||
s.peers[peer.name] = peer
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -333,6 +382,7 @@ func (s *Server) Initialize() error {
|
|||
// Start the sever as a follower
|
||||
func (s *Server) StartFollower() {
|
||||
s.setState(Follower)
|
||||
s.debugln("follower starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex)
|
||||
go s.loop()
|
||||
}
|
||||
|
||||
|
@ -340,7 +390,7 @@ func (s *Server) StartFollower() {
|
|||
func (s *Server) StartLeader() {
|
||||
s.setState(Leader)
|
||||
s.currentTerm++
|
||||
s.debugln("leader start at term: ", s.currentTerm, " index: ", s.log.currentIndex())
|
||||
s.debugln("leader starts at term: ", s.currentTerm, " index: ", s.log.currentIndex(), " commitIndex: ", s.log.commitIndex)
|
||||
go s.loop()
|
||||
}
|
||||
|
||||
|
@ -578,6 +628,7 @@ func (s *Server) leaderLoop() {
|
|||
logIndex, _ := s.log.lastInfo()
|
||||
|
||||
// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
|
||||
s.debugln("leaderLoop.set.PrevIndex to ", logIndex)
|
||||
for _, peer := range s.peers {
|
||||
peer.setPrevLogIndex(logIndex)
|
||||
peer.startHeartbeat()
|
||||
|
@ -667,6 +718,7 @@ func (s *Server) processCommand(command Command, e *event) {
|
|||
|
||||
// Create an entry for the command in the log.
|
||||
entry, err := s.log.createEntry(s.currentTerm, command)
|
||||
|
||||
if err != nil {
|
||||
s.debugln("server.command.log.entry.error:", err)
|
||||
e.c <- err
|
||||
|
@ -868,6 +920,11 @@ func (s *Server) AddPeer(name string) error {
|
|||
|
||||
// Only add the peer if it doesn't have the same name.
|
||||
if s.name != name {
|
||||
_, err := fmt.Fprintln(s.confFile, name)
|
||||
s.debugln("server.peer.conf.write: ", name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
peer := newPeer(s, name, s.heartbeatTimeout)
|
||||
if s.State() == Leader {
|
||||
peer.startHeartbeat()
|
||||
|
@ -896,8 +953,19 @@ func (s *Server) RemovePeer(name string) error {
|
|||
|
||||
// Stop peer and remove it.
|
||||
peer.stopHeartbeat()
|
||||
|
||||
delete(s.peers, name)
|
||||
|
||||
s.confFile.Truncate(0)
|
||||
s.confFile.Seek(0, os.SEEK_SET)
|
||||
|
||||
for peer := range s.peers {
|
||||
_, err := fmt.Fprintln(s.confFile, peer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
|
|||
|
||||
resp := server.RequestVote(newRequestVoteRequest(3, "foo", 3, 3))
|
||||
if resp.Term != 3 || resp.VoteGranted {
|
||||
t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
|
||||
t.Fatalf("Stale index vote should have been denied [%v/%v/%v]", resp.Term, resp.VoteGranted)
|
||||
}
|
||||
resp = server.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
|
||||
if resp.Term != 3 || resp.VoteGranted {
|
||||
|
|
5
test.go
5
test.go
|
@ -37,6 +37,7 @@ func getLogPath() string {
|
|||
|
||||
func setupLog(entries []*LogEntry) (*Log, string) {
|
||||
f, _ := ioutil.TempFile("", "raft-log-")
|
||||
|
||||
for _, entry := range entries {
|
||||
entry.encode(f)
|
||||
}
|
||||
|
@ -75,6 +76,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
entry.encode(f)
|
||||
}
|
||||
|
@ -95,10 +97,10 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
|
|||
}
|
||||
for _, server := range servers {
|
||||
server.SetHeartbeatTimeout(testHeartbeatTimeout)
|
||||
server.Initialize()
|
||||
for _, peer := range servers {
|
||||
server.AddPeer(peer.Name())
|
||||
}
|
||||
server.Initialize()
|
||||
}
|
||||
return servers
|
||||
}
|
||||
|
@ -129,7 +131,6 @@ func (t *testTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer
|
|||
return t.SendSnapshotRecoveryRequest(server, peer, req)
|
||||
}
|
||||
|
||||
|
||||
type testStateMachine struct {
|
||||
saveFunc func() ([]byte, error)
|
||||
recoveryFunc func([]byte) error
|
||||
|
|
Loading…
Reference in New Issue