From c83db5e6e25ec4a7a0cc710c08f33a82dfc11f58 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 26 Nov 2013 23:36:42 -0500 Subject: [PATCH 01/23] refactor(server.go) make async really async. --- server.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/server.go b/server.go index e93dc9445c..8ccedf3dbd 100644 --- a/server.go +++ b/server.go @@ -515,15 +515,17 @@ func (s *server) loop() { // Sends an event to the event loop to be processed. The function will wait // until the event is actually processed before returning. func (s *server) send(value interface{}) (interface{}, error) { - event := s.sendAsync(value) + event := &event{target: value, c: make(chan error, 1)} + s.c <- event err := <-event.c return event.returnValue, err } -func (s *server) sendAsync(value interface{}) *event { - event := &event{target: value, c: make(chan error, 1)} - s.c <- event - return event +func (s *server) sendAsync(value interface{}) { + go func() { + event := &event{target: value, c: make(chan error, 1)} + s.c <- event + }() } // The event loop that is run when the server is in a Follower state. @@ -532,7 +534,6 @@ func (s *server) sendAsync(value interface{}) *event { // 1.Receiving valid AppendEntries RPC, or // 2.Granting vote to candidate func (s *server) followerLoop() { - s.setState(Follower) timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2) @@ -812,12 +813,9 @@ func (s *server) processCommand(command Command, e *event) { resp.peer = s.Name() // this must be async - // sendAsync is not really async every time // when the sending speed of the user is larger than // the processing speed of the server, the buffered channel - // will be full. Then sendAsync will become sync, which will - // cause deadlock here. - // so we use a goroutine to avoid the deadlock + // will be full, which may blocking the sending if not async. go s.sendAsync(resp) } From 3deb46e6f5d1b3c50a59d46e863f0a9224d2ef9b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 26 Nov 2013 23:38:07 -0500 Subject: [PATCH 02/23] refactor(peer.go) use async send if possible --- peer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/peer.go b/peer.go index 05a398178b..acbd8c1642 100644 --- a/peer.go +++ b/peer.go @@ -226,7 +226,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { // Attach the peer to resp, thus server can know where it comes from resp.peer = p.Name // Send response to server for processing. - p.server.send(resp) + p.server.sendAsync(resp) } // Sends an Snapshot request to the peer through the transport. @@ -270,7 +270,7 @@ func (p *Peer) sendSnapshotRecoveryRequest() { return } // Send response to server for processing. - p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)}) + p.server.sendAsync(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)}) } //-------------------------------------- From 1c6f2fcd2759d9a0a240d42f3c83ebf5801d3f0f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Tue, 26 Nov 2013 23:43:02 -0500 Subject: [PATCH 03/23] refactor(server.go) do not construct the string if not necessary --- server.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/server.go b/server.go index 8ccedf3dbd..9b4ef60c9f 100644 --- a/server.go +++ b/server.go @@ -1324,9 +1324,13 @@ func (s *server) readConf() error { //-------------------------------------- func (s *server) debugln(v ...interface{}) { - debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...)) + if logLevel > Debug { + debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...)) + } } func (s *server) traceln(v ...interface{}) { - tracef("[%s] %s", s.name, fmt.Sprintln(v...)) + if logLevel > Trace { + tracef("[%s] %s", s.name, fmt.Sprintln(v...)) + } } From 59cb7259c23e0bce91d0cab4fd229fff00555996 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 Nov 2013 00:30:03 -0500 Subject: [PATCH 04/23] fix(server.go/peer.go) server.stop should stop heartbeat before close the log --- peer.go | 14 ++------------ server.go | 12 +++++++++--- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/peer.go b/peer.go index acbd8c1642..c03f813eea 100644 --- a/peer.go +++ b/peer.go @@ -79,7 +79,7 @@ func (p *Peer) setPrevLogIndex(value uint64) { // Starts the peer heartbeat. func (p *Peer) startHeartbeat() { - p.stopChan = make(chan bool, 1) + p.stopChan = make(chan bool) c := make(chan bool) go p.heartbeat(c) <-c @@ -87,17 +87,7 @@ func (p *Peer) startHeartbeat() { // Stops the peer heartbeat. func (p *Peer) stopHeartbeat(flush bool) { - // here is a problem - // the previous stop is no buffer leader may get blocked - // when heartbeat returns - // I make the channel with 1 buffer - // and try to panic here - select { - case p.stopChan <- flush: - - default: - panic("[" + p.server.Name() + "] cannot stop [" + p.Name + "] heartbeat") - } + p.stopChan <- flush } //-------------------------------------- diff --git a/server.go b/server.go index 9b4ef60c9f..12abf1cfec 100644 --- a/server.go +++ b/server.go @@ -111,7 +111,9 @@ type server struct { mutex sync.RWMutex syncedPeer map[string]bool - c chan *event + c chan *event + stopped chan bool + electionTimeout time.Duration heartbeatTimeout time.Duration @@ -159,6 +161,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S peers: make(map[string]*Peer), log: newLog(), c: make(chan *event, 256), + stopped: make(chan bool), electionTimeout: DefaultElectionTimeout, heartbeatTimeout: DefaultHeartbeatTimeout, maxLogEntriesPerRequest: MaxLogEntriesPerRequest, @@ -429,8 +432,9 @@ func (s *server) Start() error { // Shuts down the server. func (s *server) Stop() { s.send(&stopValue) - s.mutex.Lock() - defer s.mutex.Unlock() + + // make sure the server has stopped before we close the log + <-s.stopped s.log.close() } @@ -507,6 +511,7 @@ func (s *server) loop() { s.snapshotLoop() case Stopped: + s.stopped <- true return } } @@ -727,6 +732,7 @@ func (s *server) leaderLoop() { for _, peer := range s.peers { peer.stopHeartbeat(false) } + s.syncedPeer = nil } From 2252dfea4ca6c39a05d1a3c944a6ed6a0b3e9e22 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 Nov 2013 00:42:01 -0500 Subject: [PATCH 05/23] chrod more clear debug info for committing --- log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/log.go b/log.go index b4cf172e30..9371a71166 100644 --- a/log.go +++ b/log.go @@ -405,7 +405,7 @@ func (l *Log) setCommitIndex(index uint64) error { // Apply the changes to the state machine and store the error code. returnValue, err := l.ApplyFunc(command) - debugln("setCommitIndex.set.result index: ", entryIndex) + debugf("setCommitIndex.set.result index: %v entires index: ", i, entryIndex) l.results[entryIndex] = &logResult{returnValue: returnValue, err: err} } return nil From 6d064b5ca55639d3ebb2428676254a8d55e98560 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 27 Nov 2013 00:50:03 -0500 Subject: [PATCH 06/23] refactor make deny vote more clear --- server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server.go b/server.go index 12abf1cfec..9656c115ce 100644 --- a/server.go +++ b/server.go @@ -951,7 +951,7 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot // If the request is coming from an old term then reject it. if req.Term < s.Term() { - s.debugln("server.rv.error: stale term") + s.debugln("server.rv.deny.vote: cause stale term") return newRequestVoteResponse(s.currentTerm, false), false } @@ -959,7 +959,7 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot // If we've already voted for a different candidate then don't vote for this candidate. if s.votedFor != "" && s.votedFor != req.CandidateName { - s.debugln("server.rv.error: duplicate vote: ", req.CandidateName, + s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName, " already vote for ", s.votedFor) return newRequestVoteResponse(s.currentTerm, false), false } @@ -967,7 +967,7 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot // If the candidate's log is not at least as up-to-date as our last log then don't vote. lastIndex, lastTerm := s.log.lastInfo() if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm { - s.debugln("server.rv.error: out of date log: ", req.CandidateName, + s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName, "Index :[", lastIndex, "]", " [", req.LastLogIndex, "]", "Term :[", lastTerm, "]", " [", req.LastLogTerm, "]") return newRequestVoteResponse(s.currentTerm, false), false From 42e4b8d3f5acadcf5c44654c07cce748a2ea41cc Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Mon, 30 Dec 2013 16:58:38 +0800 Subject: [PATCH 07/23] remove command timeout --- append_entries_request_test.go | 2 +- log.go | 67 ++++++++++++---------------------- log_entry.go | 6 +-- log_test.go | 30 +++++++-------- server.go | 34 +---------------- server_test.go | 28 +++++++------- test.go | 2 +- 7 files changed, 59 insertions(+), 110 deletions(-) diff --git a/append_entries_request_test.go b/append_entries_request_test.go index d8cbce735d..34a827f672 100644 --- a/append_entries_request_test.go +++ b/append_entries_request_test.go @@ -28,7 +28,7 @@ func createTestAppendEntriesRequest(entryCount int) (*AppendEntriesRequest, []by entries := make([]*LogEntry, 0) for i := 0; i < entryCount; i++ { command := &DefaultJoinCommand{Name: "localhost:1000"} - entry, _ := newLogEntry(nil, 1, 2, command) + entry, _ := newLogEntry(nil, nil, 1, 2, command) entries = append(entries, entry) } req := newAppendEntriesRequest(1, 1, 1, 1, "leader", entries) diff --git a/log.go b/log.go index 164958eeb3..7dcb24021b 100644 --- a/log.go +++ b/log.go @@ -23,7 +23,6 @@ type Log struct { file *os.File path string entries []*LogEntry - results []*logResult commitIndex uint64 mutex sync.RWMutex startIndex uint64 // the index before the first entry in the Log entries @@ -162,7 +161,7 @@ func (l *Log) open(path string) error { // Read the file and decode entries. for { // Instantiate log entry and decode into it. - entry, _ := newLogEntry(l, 0, 0, nil) + entry, _ := newLogEntry(l, nil, 0, 0, nil) entry.Position, _ = l.file.Seek(0, os.SEEK_CUR) n, err := entry.decode(l.file) @@ -191,8 +190,6 @@ func (l *Log) open(path string) error { readBytes += int64(n) } - l.results = make([]*logResult, len(l.entries)) - debugln("open.log.recovery number of log ", len(l.entries)) return nil } @@ -207,7 +204,6 @@ func (l *Log) close() { l.file = nil } l.entries = make([]*LogEntry, 0) - l.results = make([]*logResult, 0) } //-------------------------------------- @@ -215,8 +211,8 @@ func (l *Log) close() { //-------------------------------------- // Creates a log entry associated with this log. -func (l *Log) createEntry(term uint64, command Command) (*LogEntry, error) { - return newLogEntry(l, l.nextIndex(), term, command) +func (l *Log) createEntry(term uint64, command Command, e *ev) (*LogEntry, error) { + return newLogEntry(l, e, l.nextIndex(), term, command) } // Retrieves an entry from the log. If the entry has been eliminated because @@ -276,35 +272,6 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]* } } -// Retrieves the return value and error for an entry. The result can only exist -// after the entry has been committed. -func (l *Log) getEntryResult(entry *LogEntry, clear bool) (interface{}, error) { - l.mutex.RLock() - defer l.mutex.RUnlock() - - if entry == nil { - panic("raft: Log entry required for error retrieval") - } - debugln("getEntryResult.result index: ", entry.Index-l.startIndex-1) - // If a result exists for the entry then return it with its error. - if entry.Index > l.startIndex && entry.Index <= l.startIndex+uint64(len(l.results)) { - if result := l.results[entry.Index-l.startIndex-1]; result != nil { - - // keep the records before remove it - returnValue, err := result.returnValue, result.err - - // Remove reference to result if it's being cleared after retrieval. - if clear { - result.returnValue = nil - } - - return returnValue, err - } - } - - return nil, nil -} - //-------------------------------------- // Commit //-------------------------------------- @@ -402,7 +369,10 @@ func (l *Log) setCommitIndex(index uint64) error { // Apply the changes to the state machine and store the error code. returnValue, err := l.ApplyFunc(command) debugln("setCommitIndex.set.result index: ", entryIndex) - l.results[entryIndex] = &logResult{returnValue: returnValue, err: err} + if entry.event != nil { + entry.event.returnValue = returnValue + entry.event.c <- err + } } return nil } @@ -443,6 +413,14 @@ func (l *Log) truncate(index uint64, term uint64) error { debugln("log.truncate.clear") l.file.Truncate(0) l.file.Seek(0, os.SEEK_SET) + + // notify clients if this node is the previous leader + for _, entry := range l.entries { + if entry.event != nil { + entry.event.c <- errors.New("command failed to be committed due to node failure") + } + } + l.entries = []*LogEntry{} } else { // Do not truncate if the entry at index does not have the matching term. @@ -458,6 +436,15 @@ func (l *Log) truncate(index uint64, term uint64) error { position := l.entries[index-l.startIndex].Position l.file.Truncate(position) l.file.Seek(position, os.SEEK_SET) + + // notify clients if this node is the previous leader + for i := index - l.startIndex; i < uint64(len(l.entries)); i++ { + entry := l.entries[i] + if entry.event != nil { + entry.event.c <- errors.New("command failed to be committed due to node failure") + } + } + l.entries = l.entries[0 : index-l.startIndex] } } @@ -529,7 +516,6 @@ func (l *Log) appendEntry(entry *LogEntry) error { // Append to entries list if stored on disk. l.entries = append(l.entries, entry) - l.results = append(l.results, nil) return nil } @@ -558,7 +544,6 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) { // Append to entries list if stored on disk. l.entries = append(l.entries, entry) - l.results = append(l.results, nil) return int64(size), nil } @@ -570,7 +555,6 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) { // compact the log before index (including index) func (l *Log) compact(index uint64, term uint64) error { var entries []*LogEntry - var results []*logResult l.mutex.Lock() defer l.mutex.Unlock() @@ -583,11 +567,9 @@ func (l *Log) compact(index uint64, term uint64) error { // we just recovery from on snapshot if index >= l.internalCurrentIndex() { entries = make([]*LogEntry, 0) - results = make([]*logResult, 0) } else { // get all log entries after index entries = l.entries[index-l.startIndex:] - results = l.results[index-l.startIndex:] } // create a new log file and add all the entries @@ -621,7 +603,6 @@ func (l *Log) compact(index uint64, term uint64) error { // compaction the in memory log l.entries = entries - l.results = results l.startIndex = index l.startTerm = term return nil diff --git a/log_entry.go b/log_entry.go index 7b50dd3fc5..dc62a1d1e7 100644 --- a/log_entry.go +++ b/log_entry.go @@ -17,11 +17,11 @@ type LogEntry struct { CommandName string Command []byte Position int64 // position in the log file - commit chan bool + event *ev } // Creates a new log entry associated with a log. -func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntry, error) { +func newLogEntry(log *Log, event *ev, index uint64, term uint64, command Command) (*LogEntry, error) { var buf bytes.Buffer var commandName string if command != nil { @@ -41,7 +41,7 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr Term: term, CommandName: commandName, Command: buf.Bytes(), - commit: make(chan bool, 5), + event: event, } return e, nil diff --git a/log_test.go b/log_test.go index e890090c35..e2b53c8467 100644 --- a/log_test.go +++ b/log_test.go @@ -30,15 +30,15 @@ func TestLogNewLog(t *testing.T) { defer log.close() defer os.Remove(path) - e, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20}) + e, _ := newLogEntry(log, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) if err := log.appendEntry(e); err != nil { t.Fatalf("Unable to append: %v", err) } - e, _ = newLogEntry(log, 2, 1, &testCommand2{X: 100}) + e, _ = newLogEntry(log, nil, 2, 1, &testCommand2{X: 100}) if err := log.appendEntry(e); err != nil { t.Fatalf("Unable to append: %v", err) } - e, _ = newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0}) + e, _ = newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) if err := log.appendEntry(e); err != nil { t.Fatalf("Unable to append: %v", err) } @@ -63,9 +63,9 @@ func TestLogNewLog(t *testing.T) { // Ensure that we can decode and encode to an existing log. func TestLogExistingLog(t *testing.T) { tmpLog := newLog() - e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) - e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) + e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) log, path := setupLog([]*LogEntry{e0, e1, e2}) defer log.close() defer os.Remove(path) @@ -88,9 +88,9 @@ func TestLogExistingLog(t *testing.T) { // Ensure that we can check the contents of the log by index/term. func TestLogContainsEntries(t *testing.T) { tmpLog := newLog() - e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) - e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) + e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) log, path := setupLog([]*LogEntry{e0, e1, e2}) defer log.close() defer os.Remove(path) @@ -115,8 +115,8 @@ func TestLogContainsEntries(t *testing.T) { // Ensure that we can recover from an incomplete/corrupt log and continue logging. func TestLogRecovery(t *testing.T) { tmpLog := newLog() - e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) + e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100}) f, _ := ioutil.TempFile("", "raft-log-") e0.encode(f) @@ -134,7 +134,7 @@ func TestLogRecovery(t *testing.T) { defer log.close() defer os.Remove(f.Name()) - e, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bat", I: -5}) + e, _ := newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bat", I: -5}) if err := log.appendEntry(e); err != nil { t.Fatalf("Unable to append: %v", err) } @@ -167,15 +167,15 @@ func TestLogTruncate(t *testing.T) { defer os.Remove(path) - entry1, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20}) + entry1, _ := newLogEntry(log, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) if err := log.appendEntry(entry1); err != nil { t.Fatalf("Unable to append: %v", err) } - entry2, _ := newLogEntry(log, 2, 1, &testCommand2{X: 100}) + entry2, _ := newLogEntry(log, nil, 2, 1, &testCommand2{X: 100}) if err := log.appendEntry(entry2); err != nil { t.Fatalf("Unable to append: %v", err) } - entry3, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0}) + entry3, _ := newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) if err := log.appendEntry(entry3); err != nil { t.Fatalf("Unable to append: %v", err) } diff --git a/server.go b/server.go index bb3efd68f6..4513196da0 100644 --- a/server.go +++ b/server.go @@ -837,7 +837,7 @@ func (s *server) processCommand(command Command, e *ev) { s.debugln("server.command.process") // Create an entry for the command in the log. - entry, err := s.log.createEntry(s.currentTerm, command) + entry, err := s.log.createEntry(s.currentTerm, command, e) if err != nil { s.debugln("server.command.log.entry.error:", err) @@ -851,23 +851,6 @@ func (s *server) processCommand(command Command, e *ev) { return } - // Issue a callback for the entry once it's committed. - go func() { - // Wait for the entry to be committed. - select { - case <-entry.commit: - var err error - s.debugln("server.command.commit") - e.returnValue, err = s.log.getEntryResult(entry, true) - e.c <- err - case <-time.After(time.Second): - s.debugln("server.command.timeout") - e.c <- CommandTimeoutError - } - - entry.commit = nil - }() - // Issue an append entries response for the server. resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()) resp.append = true @@ -972,21 +955,6 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) { if commitIndex > committedIndex { s.log.setCommitIndex(commitIndex) s.debugln("commit index ", commitIndex) - for i := committedIndex; i < commitIndex; i++ { - if entry := s.log.getEntry(i + 1); entry != nil { - // if the leader is a new one and the entry came from the - // old leader, the commit channel will be nil and no go routine - // is waiting from this channel - // if we try to send to it, the new leader will get stuck - if entry.commit != nil { - select { - case entry.commit <- true: - default: - panic("server unable to send signal to commit channel") - } - } - } - } } } diff --git a/server_test.go b/server_test.go index f8be91e5f8..3ac36811b0 100644 --- a/server_test.go +++ b/server_test.go @@ -111,9 +111,9 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { // Ensure that a vote request is denied if the log is out of date. func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { tmpLog := newLog() - e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20}) - e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100}) - e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0}) + e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0}) s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2}) // start as a follower with term 2 and index 3 @@ -151,7 +151,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader. func TestServerPromoteSelf(t *testing.T) { - e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20}) + e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20}) s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0}) // start as a follower @@ -204,7 +204,7 @@ func TestServerAppendEntries(t *testing.T) { defer s.Stop() // Append single entry. - e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10}) entries := []*LogEntry{e} resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) if resp.Term != 1 || !resp.Success { @@ -215,8 +215,8 @@ func TestServerAppendEntries(t *testing.T) { } // Append multiple entries + commit the last one. - e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20}) - e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30}) + e1, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20}) + e2, _ := newLogEntry(nil, nil, 3, 1, &testCommand1{Val: "baz", I: 30}) entries = []*LogEntry{e1, e2} resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries)) if resp.Term != 1 || !resp.Success { @@ -248,7 +248,7 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { s.(*server).mutex.Unlock() // Append single entry. - e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10}) entries := []*LogEntry{e} resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) if resp.Term != 2 || resp.Success { @@ -266,8 +266,8 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { defer s.Stop() // Append single entry + commit. - e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) - e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15}) + e1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + e2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15}) entries := []*LogEntry{e1, e2} resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries)) if resp.Term != 1 || !resp.Success { @@ -275,7 +275,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { } // Append entry again (post-commit). - e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20}) + e, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20}) entries = []*LogEntry{e} resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries)) if resp.Term != 1 || resp.Success { @@ -289,9 +289,9 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { s.Start() defer s.Stop() - entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) - entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15}) - entry3, _ := newLogEntry(nil, 2, 2, &testCommand1{Val: "bar", I: 20}) + entry1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + entry2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15}) + entry3, _ := newLogEntry(nil, nil, 2, 2, &testCommand1{Val: "bar", I: 20}) // Append single entry + commit. entries := []*LogEntry{entry1, entry2} diff --git a/test.go b/test.go index dfe0a39f28..3ea9c7e3b1 100644 --- a/test.go +++ b/test.go @@ -103,7 +103,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server { servers := []Server{} - e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20}) + e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20}) for _, name := range names { if lookup[name] != nil { From 3509cfa7b12a39cde502b0d4afa1dd1023ce603c Mon Sep 17 00:00:00 2001 From: Greg Brockman Date: Wed, 1 Jan 2014 23:40:29 -0800 Subject: [PATCH 08/23] Make HttpTranporter.Transport into an exported field With this patch, it becomes possible to use the HTTPTransporter over custom transports, such as Unix sockets. --- http_transporter.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/http_transporter.go b/http_transporter.go index 3e5df39159..a74a5d628e 100644 --- a/http_transporter.go +++ b/http_transporter.go @@ -24,7 +24,7 @@ type HTTPTransporter struct { appendEntriesPath string requestVotePath string httpClient http.Client - transport *http.Transport + Transport *http.Transport } type HTTPMuxer interface { @@ -44,9 +44,9 @@ func NewHTTPTransporter(prefix string) *HTTPTransporter { prefix: prefix, appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"), requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"), - transport: &http.Transport{DisableKeepAlives: false}, + Transport: &http.Transport{DisableKeepAlives: false}, } - t.httpClient.Transport = t.transport + t.httpClient.Transport = t.Transport return t } @@ -102,7 +102,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath()) traceln(server.Name(), "POST", url) - t.transport.ResponseHeaderTimeout = server.ElectionTimeout() + t.Transport.ResponseHeaderTimeout = server.ElectionTimeout() httpResp, err := t.httpClient.Post(url, "application/protobuf", &b) if httpResp == nil || err != nil { traceln("transporter.ae.response.error:", err) From c6a6491d39b4558b2c8306fb39f874e4806b7592 Mon Sep 17 00:00:00 2001 From: Joseph Crail Date: Wed, 8 Jan 2014 22:59:13 -0500 Subject: [PATCH 09/23] Fix spelling errors in comments and strings. --- log.go | 4 ++-- peer.go | 2 +- server.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/log.go b/log.go index 7dcb24021b..1e1eb203b9 100644 --- a/log.go +++ b/log.go @@ -262,7 +262,7 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]* entries := l.entries[index-l.startIndex:] length := len(entries) - traceln("log.entriesAfter: startIndex:", l.startIndex, " lenght", len(l.entries)) + traceln("log.entriesAfter: startIndex:", l.startIndex, " length", len(l.entries)) if uint64(length) < maxLogEntriesPerRequest { // Determine the term at the given entry and return a subslice. @@ -336,7 +336,7 @@ func (l *Log) setCommitIndex(index uint64) error { // Do not allow previous indices to be committed again. // This could happens, since the guarantee is that the new leader has up-to-dated - // log entires rather than has most up-to-dated committed index + // log entries rather than has most up-to-dated committed index // For example, Leader 1 send log 80 to follower 2 and follower 3 // follower 2 and follow 3 all got the new entries and reply diff --git a/peer.go b/peer.go index 1a0f205837..4cfeb4e939 100644 --- a/peer.go +++ b/peer.go @@ -202,7 +202,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { if resp.CommitIndex >= p.prevLogIndex { // we may miss a response from peer - // so maybe the peer has commited the logs we sent + // so maybe the peer has committed the logs we sent // but we did not receive the success reply and did not increase // the prevLogIndex diff --git a/server.go b/server.go index 4513196da0..792b328124 100644 --- a/server.go +++ b/server.go @@ -908,7 +908,7 @@ func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true } - // once the server appended and commited all the log entries from the leader + // once the server appended and committed all the log entries from the leader return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true } From a4ce4f07b97481149345780977bac1a19cf9981f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 9 Jan 2014 18:54:58 +0800 Subject: [PATCH 10/23] refactor(server.go) try non-blocking send first --- server.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 3498a7e4e2..4eb08d44aa 100644 --- a/server.go +++ b/server.go @@ -572,8 +572,17 @@ func (s *server) send(value interface{}) (interface{}, error) { } func (s *server) sendAsync(value interface{}) { + event := &ev{target: value, c: make(chan error, 1)} + // try a non-blocking send first + // in most cases, this should not be blocking + // avoid create unnecessary go routines + select { + case s.c <- event: + return + default: + } + go func() { - event := &ev{target: value, c: make(chan error, 1)} s.c <- event }() } From cede92743124b8dc16909432248139f7ce02eaea Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 9 Jan 2014 21:26:35 +0800 Subject: [PATCH 11/23] refactor(peer.go) clean up debugging info --- peer.go | 32 ++++++++++++++++++-------------- server.go | 1 - 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/peer.go b/peer.go index d2b1b8737d..e3abf6cf96 100644 --- a/peer.go +++ b/peer.go @@ -130,7 +130,7 @@ func (p *Peer) heartbeat(c chan bool) { // before we can safely remove a node // we must flush the remove command to the node first p.flush() - debugln("peer.heartbeat.stop: ", p.Name) + debugln("peer.heartbeat.stop.with.flush: ", p.Name) return } @@ -141,7 +141,7 @@ func (p *Peer) heartbeat(c chan bool) { } func (p *Peer) flush() { - debugln("peer.heartbeat.run: ", p.Name) + debugln("peer.heartbeat.flush: ", p.Name) prevLogIndex := p.getPrevLogIndex() entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest) @@ -162,15 +162,16 @@ func (p *Peer) flush() { // Sends an AppendEntries request to the peer through the transport. func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { - traceln("peer.flush.send: ", p.server.Name(), "->", p.Name, " ", len(req.Entries)) + tracef("peer.append.send: %s->%s [prevLog:%v length: %v]\n", + p.server.Name(), p.Name, req.PrevLogIndex, len(req.Entries)) resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req) if resp == nil { p.server.DispatchEvent(newEvent(HeartbeatTimeoutEventType, p, nil)) - debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name) + debugln("peer.append.timeout: ", p.server.Name(), "->", p.Name) return } - traceln("peer.flush.recv: ", p.Name) + traceln("peer.append.resp: ", p.server.Name(), "<-", p.Name) // If successful then update the previous log index. p.mutex.Lock() @@ -184,21 +185,22 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { resp.append = true } } - traceln("peer.flush.success: ", p.server.Name(), "->", p.Name, "; idx =", p.prevLogIndex) - + traceln("peer.append.resp.success: ", p.Name, "; idx =", p.prevLogIndex) // If it was unsuccessful then decrement the previous log index and // we'll try again next time. } else { if resp.CommitIndex >= p.prevLogIndex { - // we may miss a response from peer - // so maybe the peer has committed the logs we sent - // but we did not receive the success reply and did not increase + // so maybe the peer has committed the logs we just sent + // but we did not receive the successful reply and did not increase // the prevLogIndex - p.prevLogIndex = resp.CommitIndex + // peer failed to truncate the log and sent a fail reply at this time + // we just need to update peer's prevLog index to commitIndex + + p.prevLogIndex = resp.CommitIndex + debugln("peer.append.resp.update: ", p.Name, "; idx =", p.prevLogIndex) - debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex) } else if p.prevLogIndex > 0 { // Decrement the previous log index down until we find a match. Don't // let it go below where the peer's commit index is though. That's a @@ -209,7 +211,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) { p.prevLogIndex = resp.Index } - debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex) + debugln("peer.append.resp.decrement: ", p.Name, "; idx =", p.prevLogIndex) } } p.mutex.Unlock() @@ -273,8 +275,10 @@ func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteRespo debugln("peer.vote: ", p.server.Name(), "->", p.Name) req.peer = p if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil { - debugln("peer.vote: recv", p.server.Name(), "<-", p.Name) + debugln("peer.vote.recv: ", p.server.Name(), "<-", p.Name) resp.peer = p c <- resp + } else { + debugln("peer.vote.failed: ", p.server.Name(), "<-", p.Name) } } diff --git a/server.go b/server.go index 4eb08d44aa..9cf3a4336d 100644 --- a/server.go +++ b/server.go @@ -887,7 +887,6 @@ func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse // Processes the "append entries" request. func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) { - s.traceln("server.ae.process") if req.Term < s.currentTerm { From a778d942b3a01c7d6e3c336e13d1dd4bbff6700f Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 10 Jan 2014 18:05:09 +0800 Subject: [PATCH 12/23] test(server_test.go) Fix a deadlock. We stop all the servers before finishing the MultiNode test. The mock transporter directly calls to the function of the target server. If the target is stopped, the transporter will just hang there waiting for a reply. So the sender server will also hang for a reply. We add a timeout for the reply to solve this problem. --- server_test.go | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/server_test.go b/server_test.go index 3ac36811b0..4994faeddb 100644 --- a/server_test.go +++ b/server_test.go @@ -501,7 +501,19 @@ func TestServerMultiNode(t *testing.T) { clonedReq := &RequestVoteRequest{} json.Unmarshal(b, clonedReq) - return target.RequestVote(clonedReq) + c := make(chan *RequestVoteResponse) + + go func() { + c <- target.RequestVote(clonedReq) + }() + + select { + case resp := <-c: + return resp + case <-time.After(time.Millisecond * 200): + return nil + } + } transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { mutex.RLock() @@ -512,7 +524,18 @@ func TestServerMultiNode(t *testing.T) { clonedReq := &AppendEntriesRequest{} json.Unmarshal(b, clonedReq) - return target.AppendEntries(clonedReq) + c := make(chan *AppendEntriesResponse) + + go func() { + c <- target.AppendEntries(clonedReq) + }() + + select { + case resp := <-c: + return resp + case <-time.After(time.Millisecond * 200): + return nil + } } disTransporter := &testTransporter{} From caaebb1b567852da8ca12ff1869b55c8dfa8e47b Mon Sep 17 00:00:00 2001 From: Baruch Even Date: Mon, 2 Dec 2013 23:21:46 +0200 Subject: [PATCH 13/23] Fix spelling mistake in snapshot.go comment --- snapshot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snapshot.go b/snapshot.go index 4f416f741a..dcdcac304d 100644 --- a/snapshot.go +++ b/snapshot.go @@ -52,7 +52,7 @@ func (ss *Snapshot) save() error { return err } - // force the change writting to disk + // force the change writing to disk file.Sync() return err } From b66cb1a0ded3adbe56041a8a8e1e168e46cd05bc Mon Sep 17 00:00:00 2001 From: Baruch Even Date: Mon, 2 Dec 2013 23:26:53 +0200 Subject: [PATCH 14/23] Correct replacing of log file Do not leak resources in case of errors and ensure data is synced to disk before replacing files. --- log.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/log.go b/log.go index 39da19d95c..182f8253c2 100644 --- a/log.go +++ b/log.go @@ -573,7 +573,8 @@ func (l *Log) compact(index uint64, term uint64) error { } // create a new log file and add all the entries - file, err := os.OpenFile(l.path+".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + new_file_path := l.path + ".new" + file, err := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) if err != nil { return err } @@ -582,25 +583,27 @@ func (l *Log) compact(index uint64, term uint64) error { entry.Position = position if _, err = entry.encode(file); err != nil { + file.Close() + os.Remove(new_file_path) return err } } - // close the current log file - l.file.Close() + file.Sync() - // remove the current log file to .bak - err = os.Remove(l.path) - if err != nil { - return err - } + old_file := l.file // rename the new log file - err = os.Rename(l.path+".new", l.path) + err = os.Rename(new_file_path, l.path) if err != nil { + file.Close() + os.Remove(new_file_path) return err } l.file = file + // close the old log file + old_file.Close() + // compaction the in memory log l.entries = entries l.startIndex = index From 3f983817390ab6ed0022834b9e637b0ab5a81657 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 10 Jan 2014 20:45:53 +0800 Subject: [PATCH 15/23] fix(sync) leader should do a sync before committing log entires As we do not fsync every time log writes to file when the state is leader, we need to do fsync before actually committing the log entries to ensure safety. --- log.go | 7 ++++++- server.go | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/log.go b/log.go index 182f8253c2..d14207f25d 100644 --- a/log.go +++ b/log.go @@ -206,6 +206,11 @@ func (l *Log) close() { l.entries = make([]*LogEntry, 0) } +// sync to disk +func (l *Log) sync() error { + return l.file.Sync() +} + //-------------------------------------- // Entries //-------------------------------------- @@ -477,7 +482,7 @@ func (l *Log) appendEntries(entries []*LogEntry) error { startPosition += size } w.Flush() - err = l.file.Sync() + err = l.sync() if err != nil { panic(err) diff --git a/server.go b/server.go index 9cf3a4336d..8303ae1fea 100644 --- a/server.go +++ b/server.go @@ -960,6 +960,8 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) { committedIndex := s.log.commitIndex if commitIndex > committedIndex { + // leader needs to do a fsync before committing log entries + s.log.sync() s.log.setCommitIndex(commitIndex) s.debugln("commit index ", commitIndex) } From 4a4b7391a3f8d570da3560d0222657a66139e7ea Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 10 Jan 2014 20:49:33 +0800 Subject: [PATCH 16/23] Save config file atomically --- server.go | 2 +- util.go | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 util.go diff --git a/server.go b/server.go index 8303ae1fea..e3b6005dfd 100644 --- a/server.go +++ b/server.go @@ -1331,7 +1331,7 @@ func (s *server) writeConf() { confPath := path.Join(s.path, "conf") tmpConfPath := path.Join(s.path, "conf.tmp") - err := ioutil.WriteFile(tmpConfPath, b, 0600) + err := writeFileSynced(tmpConfPath, b, 0600) if err != nil { panic(err) diff --git a/util.go b/util.go new file mode 100644 index 0000000000..ed0acd2e06 --- /dev/null +++ b/util.go @@ -0,0 +1,25 @@ +package raft + +import ( + "io" + "os" +) + +// WriteFile writes data to a file named by filename. +// If the file does not exist, WriteFile creates it with permissions perm; +// otherwise WriteFile truncates it before writing. +// This is copied from ioutil.WriteFile with the addition of a Sync call to +// ensure the data reaches the disk. +func writeFileSynced(filename string, data []byte, perm os.FileMode) error { + f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + n, err := f.Write(data) + f.Sync() + f.Close() + if err == nil && n < len(data) { + err = io.ErrShortWrite + } + return err +} From 4aa1042d780b0e5e30ac2dcd7951d6b7890cb05c Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 12 Jan 2014 16:01:38 +0800 Subject: [PATCH 17/23] refactor(server.go) Leader do not need to send its own response of receiving a command to itself. We send response to itself before, since it is a simple way to deal one machine cluster. After a little modification, we do not need to do this anymore. --- server.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/server.go b/server.go index 9cf3a4336d..f59ad51fe2 100644 --- a/server.go +++ b/server.go @@ -281,6 +281,7 @@ func (s *server) setState(state string) { s.state = state if state == Leader { s.leader = s.Name() + s.syncedPeer = make(map[string]bool) } // Dispatch state and leader change events. @@ -866,12 +867,12 @@ func (s *server) processCommand(command Command, e *ev) { return } - // Issue an append entries response for the server. - resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()) - resp.append = true - resp.peer = s.Name() - - s.sendAsync(resp) + s.syncedPeer[s.Name()] = true + if len(s.peers) == 0 { + commitIndex := s.log.currentIndex() + s.log.setCommitIndex(commitIndex) + s.debugln("commit index ", commitIndex) + } } //-------------------------------------- From 601a7996939a305f712a86b157f60c873322aab0 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 12 Jan 2014 16:05:31 +0800 Subject: [PATCH 18/23] do not make map twice --- server.go | 1 - 1 file changed, 1 deletion(-) diff --git a/server.go b/server.go index f59ad51fe2..dac211e863 100644 --- a/server.go +++ b/server.go @@ -754,7 +754,6 @@ func (s *server) candidateLoop() { // The event loop that is run when the server is in a Leader state. func (s *server) leaderLoop() { s.setState(Leader) - s.syncedPeer = make(map[string]bool) logIndex, _ := s.log.lastInfo() // Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat. From 6b0e6582268740964659d6f2ebb4031d04a7d631 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 13 Jan 2014 13:07:10 -0800 Subject: [PATCH 19/23] fix(logging): Fix malformed log statement --- log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/log.go b/log.go index 39da19d95c..1d3f92e5b7 100644 --- a/log.go +++ b/log.go @@ -368,7 +368,7 @@ func (l *Log) setCommitIndex(index uint64) error { // Apply the changes to the state machine and store the error code. returnValue, err := l.ApplyFunc(command) - debugf("setCommitIndex.set.result index: %v entires index: ", i, entryIndex) + debugf("setCommitIndex.set.result index: %v, entries index: %v", i, entryIndex) if entry.event != nil { entry.event.returnValue = returnValue entry.event.c <- err From 8ca39ae2237115edc31fd6d8312daac290a1cb87 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 15 Jan 2014 20:26:54 +0800 Subject: [PATCH 20/23] fix(util.go) make writeFileSynced return error properly --- util.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/util.go b/util.go index ed0acd2e06..ff3d62f0c3 100644 --- a/util.go +++ b/util.go @@ -15,11 +15,17 @@ func writeFileSynced(filename string, data []byte, perm os.FileMode) error { if err != nil { return err } + n, err := f.Write(data) - f.Sync() - f.Close() - if err == nil && n < len(data) { - err = io.ErrShortWrite + if n < len(data) { + f.Close() + return io.ErrShortWrite } - return err + + err = f.Sync() + if err != nil { + return err + } + + return f.Close() } From 4157c675e0b6d92461823fcbeb631489910d7ac5 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Fri, 17 Jan 2014 13:37:05 -0500 Subject: [PATCH 21/23] handle Snapshot and SnapshotRecovery requests in the http transporter --- http_transporter.go | 137 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 123 insertions(+), 14 deletions(-) diff --git a/http_transporter.go b/http_transporter.go index a74a5d628e..1ab06dd380 100644 --- a/http_transporter.go +++ b/http_transporter.go @@ -5,6 +5,8 @@ import ( "fmt" "io" "net/http" + "net/url" + "path" ) // Parts from this transporter were heavily influenced by Peter Bougon's @@ -19,12 +21,14 @@ import ( // An HTTPTransporter is a default transport layer used to communicate between // multiple servers. type HTTPTransporter struct { - DisableKeepAlives bool - prefix string - appendEntriesPath string - requestVotePath string - httpClient http.Client - Transport *http.Transport + DisableKeepAlives bool + prefix string + appendEntriesPath string + requestVotePath string + snapshotPath string + snapshotRecoveryPath string + httpClient http.Client + Transport *http.Transport } type HTTPMuxer interface { @@ -40,11 +44,13 @@ type HTTPMuxer interface { // Creates a new HTTP transporter with the given path prefix. func NewHTTPTransporter(prefix string) *HTTPTransporter { t := &HTTPTransporter{ - DisableKeepAlives: false, - prefix: prefix, - appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"), - requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"), - Transport: &http.Transport{DisableKeepAlives: false}, + DisableKeepAlives: false, + prefix: prefix, + appendEntriesPath: joinPath(prefix, "/appendEntries"), + requestVotePath: joinPath(prefix, "/requestVote"), + snapshotPath: joinPath(prefix, "/snapshot"), + snapshotRecoveryPath: joinPath(prefix, "/snapshotRecovery"), + Transport: &http.Transport{DisableKeepAlives: false}, } t.httpClient.Transport = t.Transport return t @@ -71,6 +77,16 @@ func (t *HTTPTransporter) RequestVotePath() string { return t.requestVotePath } +// Retrieves the Snapshot path. +func (t *HTTPTransporter) SnapshotPath() string { + return t.snapshotPath +} + +// Retrieves the SnapshotRecovery path. +func (t *HTTPTransporter) SnapshotRecoveryPath() string { + return t.snapshotRecoveryPath +} + //------------------------------------------------------------------------------ // // Methods @@ -85,6 +101,8 @@ func (t *HTTPTransporter) RequestVotePath() string { func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) { mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server)) mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server)) + mux.HandleFunc(t.SnapshotPath(), t.snapshotHandler(server)) + mux.HandleFunc(t.SnapshotRecoveryPath(), t.snapshotRecoveryHandler(server)) } //-------------------------------------- @@ -99,7 +117,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re return nil } - url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath()) + url := joinPath(peer.ConnectionString, t.AppendEntriesPath()) traceln(server.Name(), "POST", url) t.Transport.ResponseHeaderTimeout = server.ElectionTimeout() @@ -146,14 +164,67 @@ func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques return resp } +func joinPath(connectionString, thePath string) string { + u, err := url.Parse(connectionString) + if err != nil { + panic(err) + } + u.Path = path.Join(u.Path, thePath) + return u.String() +} + // Sends a SnapshotRequest RPC to a peer. func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse { - return nil + var b bytes.Buffer + if _, err := req.Encode(&b); err != nil { + traceln("transporter.rv.encoding.error:", err) + return nil + } + + url := joinPath(peer.ConnectionString, t.snapshotPath) + traceln(server.Name(), "POST", url) + + httpResp, err := t.httpClient.Post(url, "application/protobuf", &b) + if httpResp == nil || err != nil { + traceln("transporter.rv.response.error:", err) + return nil + } + defer httpResp.Body.Close() + + resp := &SnapshotResponse{} + if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF { + traceln("transporter.rv.decoding.error:", err) + return nil + } + + return resp } // Sends a SnapshotRequest RPC to a peer. func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse { - return nil + var b bytes.Buffer + if _, err := req.Encode(&b); err != nil { + traceln("transporter.rv.encoding.error:", err) + return nil + } + + url := joinPath(peer.ConnectionString, t.snapshotRecoveryPath) + traceln(server.Name(), "POST", url) + + httpResp, err := t.httpClient.Post(url, "application/protobuf", &b) + if httpResp == nil || err != nil { + traceln("transporter.rv.response.error:", err) + return nil + } + defer httpResp.Body.Close() + + resp := &SnapshotRecoveryResponse{} + if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF { + traceln("transporter.rv.decoding.error:", err) + return nil + } + + return resp } //-------------------------------------- @@ -197,3 +268,41 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc { } } } + +// Handles incoming Snapshot requests. +func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + traceln(server.Name(), "RECV /snapshot") + + req := &SnapshotRequest{} + if _, err := req.Decode(r.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + return + } + + resp := server.RequestSnapshot(req) + if _, err := resp.Encode(w); err != nil { + http.Error(w, "", http.StatusInternalServerError) + return + } + } +} + +// Handles incoming SnapshotRecovery requests. +func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + traceln(server.Name(), "RECV /snapshotRecovery") + + req := &SnapshotRecoveryRequest{} + if _, err := req.Decode(r.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + return + } + + resp := server.SnapshotRecoveryRequest(req) + if _, err := resp.Encode(w); err != nil { + http.Error(w, "", http.StatusInternalServerError) + return + } + } +} From 116a007266812c3846582d35fce5615d50f03c60 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 19 Jan 2014 14:57:49 -0700 Subject: [PATCH 22/23] Add Drone.io & Coveralls badges. --- README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/README.md b/README.md index 5486cc5695..13e0c8404d 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,6 @@ -go-raft +go-raft [![Build Status](https://drone.io/github.com/goraft/raft/status.png)](https://drone.io/github.com/goraft/raft/latest) [![Coverage Status](https://coveralls.io/repos/goraft/raft/badge.png?branch=master)](https://coveralls.io/r/goraft/raft?branch=master) ======= -[![Build Status](https://travis-ci.org/goraft/raft.png?branch=master)](https://travis-ci.org/goraft/raft) - ## Overview This is a Go implementation of the Raft distributed consensus protocol. From 15dd02e3f916b2e816cbbec1eb5939ed8694a3e1 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Tue, 21 Jan 2014 08:24:28 -0800 Subject: [PATCH 23/23] feat(event): Add HeartbeatEventType --- event.go | 2 ++ peer.go | 3 +++ 2 files changed, 5 insertions(+) diff --git a/event.go b/event.go index 649a1158cd..d6cc3d035d 100644 --- a/event.go +++ b/event.go @@ -9,6 +9,8 @@ const ( HeartbeatTimeoutEventType = "heartbeatTimeout" ElectionTimeoutThresholdEventType = "electionTimeoutThreshold" + + HeartbeatEventType = "heartbeat" ) // Event represents an action that occurred within the Raft library. diff --git a/peer.go b/peer.go index e3abf6cf96..e5ef08c832 100644 --- a/peer.go +++ b/peer.go @@ -135,7 +135,10 @@ func (p *Peer) heartbeat(c chan bool) { } case <-ticker: + start := time.Now() p.flush() + duration := time.Now().Sub(start) + p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil)) } } }