Merge branch 'master' of https://github.com/goraft/raft into snapshot-testing
Conflicts: snapshot.gopull/820/head
commit
5aa95dfde6
|
@ -1,8 +1,6 @@
|
||||||
go-raft
|
go-raft [](https://drone.io/github.com/goraft/raft/latest) [](https://coveralls.io/r/goraft/raft?branch=master)
|
||||||
=======
|
=======
|
||||||
|
|
||||||
[](https://travis-ci.org/goraft/raft)
|
|
||||||
|
|
||||||
## Overview
|
## Overview
|
||||||
|
|
||||||
This is a Go implementation of the Raft distributed consensus protocol.
|
This is a Go implementation of the Raft distributed consensus protocol.
|
||||||
|
|
|
@ -28,7 +28,7 @@ func createTestAppendEntriesRequest(entryCount int) (*AppendEntriesRequest, []by
|
||||||
entries := make([]*LogEntry, 0)
|
entries := make([]*LogEntry, 0)
|
||||||
for i := 0; i < entryCount; i++ {
|
for i := 0; i < entryCount; i++ {
|
||||||
command := &DefaultJoinCommand{Name: "localhost:1000"}
|
command := &DefaultJoinCommand{Name: "localhost:1000"}
|
||||||
entry, _ := newLogEntry(nil, 1, 2, command)
|
entry, _ := newLogEntry(nil, nil, 1, 2, command)
|
||||||
entries = append(entries, entry)
|
entries = append(entries, entry)
|
||||||
}
|
}
|
||||||
req := newAppendEntriesRequest(1, 1, 1, 1, "leader", entries)
|
req := newAppendEntriesRequest(1, 1, 1, 1, "leader", entries)
|
||||||
|
|
2
event.go
2
event.go
|
@ -9,6 +9,8 @@ const (
|
||||||
|
|
||||||
HeartbeatTimeoutEventType = "heartbeatTimeout"
|
HeartbeatTimeoutEventType = "heartbeatTimeout"
|
||||||
ElectionTimeoutThresholdEventType = "electionTimeoutThreshold"
|
ElectionTimeoutThresholdEventType = "electionTimeoutThreshold"
|
||||||
|
|
||||||
|
HeartbeatEventType = "heartbeat"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Event represents an action that occurred within the Raft library.
|
// Event represents an action that occurred within the Raft library.
|
||||||
|
|
|
@ -5,6 +5,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"path"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Parts from this transporter were heavily influenced by Peter Bougon's
|
// Parts from this transporter were heavily influenced by Peter Bougon's
|
||||||
|
@ -19,12 +21,14 @@ import (
|
||||||
// An HTTPTransporter is a default transport layer used to communicate between
|
// An HTTPTransporter is a default transport layer used to communicate between
|
||||||
// multiple servers.
|
// multiple servers.
|
||||||
type HTTPTransporter struct {
|
type HTTPTransporter struct {
|
||||||
DisableKeepAlives bool
|
DisableKeepAlives bool
|
||||||
prefix string
|
prefix string
|
||||||
appendEntriesPath string
|
appendEntriesPath string
|
||||||
requestVotePath string
|
requestVotePath string
|
||||||
httpClient http.Client
|
snapshotPath string
|
||||||
transport *http.Transport
|
snapshotRecoveryPath string
|
||||||
|
httpClient http.Client
|
||||||
|
Transport *http.Transport
|
||||||
}
|
}
|
||||||
|
|
||||||
type HTTPMuxer interface {
|
type HTTPMuxer interface {
|
||||||
|
@ -40,13 +44,15 @@ type HTTPMuxer interface {
|
||||||
// Creates a new HTTP transporter with the given path prefix.
|
// Creates a new HTTP transporter with the given path prefix.
|
||||||
func NewHTTPTransporter(prefix string) *HTTPTransporter {
|
func NewHTTPTransporter(prefix string) *HTTPTransporter {
|
||||||
t := &HTTPTransporter{
|
t := &HTTPTransporter{
|
||||||
DisableKeepAlives: false,
|
DisableKeepAlives: false,
|
||||||
prefix: prefix,
|
prefix: prefix,
|
||||||
appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
|
appendEntriesPath: joinPath(prefix, "/appendEntries"),
|
||||||
requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"),
|
requestVotePath: joinPath(prefix, "/requestVote"),
|
||||||
transport: &http.Transport{DisableKeepAlives: false},
|
snapshotPath: joinPath(prefix, "/snapshot"),
|
||||||
|
snapshotRecoveryPath: joinPath(prefix, "/snapshotRecovery"),
|
||||||
|
Transport: &http.Transport{DisableKeepAlives: false},
|
||||||
}
|
}
|
||||||
t.httpClient.Transport = t.transport
|
t.httpClient.Transport = t.Transport
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,6 +77,16 @@ func (t *HTTPTransporter) RequestVotePath() string {
|
||||||
return t.requestVotePath
|
return t.requestVotePath
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Retrieves the Snapshot path.
|
||||||
|
func (t *HTTPTransporter) SnapshotPath() string {
|
||||||
|
return t.snapshotPath
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieves the SnapshotRecovery path.
|
||||||
|
func (t *HTTPTransporter) SnapshotRecoveryPath() string {
|
||||||
|
return t.snapshotRecoveryPath
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
//
|
//
|
||||||
// Methods
|
// Methods
|
||||||
|
@ -85,6 +101,8 @@ func (t *HTTPTransporter) RequestVotePath() string {
|
||||||
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
|
func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
|
||||||
mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
|
mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
|
||||||
mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
|
mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
|
||||||
|
mux.HandleFunc(t.SnapshotPath(), t.snapshotHandler(server))
|
||||||
|
mux.HandleFunc(t.SnapshotRecoveryPath(), t.snapshotRecoveryHandler(server))
|
||||||
}
|
}
|
||||||
|
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
@ -99,10 +117,10 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath())
|
url := joinPath(peer.ConnectionString, t.AppendEntriesPath())
|
||||||
traceln(server.Name(), "POST", url)
|
traceln(server.Name(), "POST", url)
|
||||||
|
|
||||||
t.transport.ResponseHeaderTimeout = server.ElectionTimeout()
|
t.Transport.ResponseHeaderTimeout = server.ElectionTimeout()
|
||||||
httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
|
httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
|
||||||
if httpResp == nil || err != nil {
|
if httpResp == nil || err != nil {
|
||||||
traceln("transporter.ae.response.error:", err)
|
traceln("transporter.ae.response.error:", err)
|
||||||
|
@ -146,14 +164,67 @@ func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func joinPath(connectionString, thePath string) string {
|
||||||
|
u, err := url.Parse(connectionString)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
u.Path = path.Join(u.Path, thePath)
|
||||||
|
return u.String()
|
||||||
|
}
|
||||||
|
|
||||||
// Sends a SnapshotRequest RPC to a peer.
|
// Sends a SnapshotRequest RPC to a peer.
|
||||||
func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
|
func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
|
||||||
return nil
|
var b bytes.Buffer
|
||||||
|
if _, err := req.Encode(&b); err != nil {
|
||||||
|
traceln("transporter.rv.encoding.error:", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
url := joinPath(peer.ConnectionString, t.snapshotPath)
|
||||||
|
traceln(server.Name(), "POST", url)
|
||||||
|
|
||||||
|
httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
|
||||||
|
if httpResp == nil || err != nil {
|
||||||
|
traceln("transporter.rv.response.error:", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer httpResp.Body.Close()
|
||||||
|
|
||||||
|
resp := &SnapshotResponse{}
|
||||||
|
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
|
||||||
|
traceln("transporter.rv.decoding.error:", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sends a SnapshotRequest RPC to a peer.
|
// Sends a SnapshotRequest RPC to a peer.
|
||||||
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
|
func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
|
||||||
return nil
|
var b bytes.Buffer
|
||||||
|
if _, err := req.Encode(&b); err != nil {
|
||||||
|
traceln("transporter.rv.encoding.error:", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
url := joinPath(peer.ConnectionString, t.snapshotRecoveryPath)
|
||||||
|
traceln(server.Name(), "POST", url)
|
||||||
|
|
||||||
|
httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
|
||||||
|
if httpResp == nil || err != nil {
|
||||||
|
traceln("transporter.rv.response.error:", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
defer httpResp.Body.Close()
|
||||||
|
|
||||||
|
resp := &SnapshotRecoveryResponse{}
|
||||||
|
if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
|
||||||
|
traceln("transporter.rv.decoding.error:", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
@ -197,3 +268,41 @@ func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handles incoming Snapshot requests.
|
||||||
|
func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
traceln(server.Name(), "RECV /snapshot")
|
||||||
|
|
||||||
|
req := &SnapshotRequest{}
|
||||||
|
if _, err := req.Decode(r.Body); err != nil {
|
||||||
|
http.Error(w, "", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := server.RequestSnapshot(req)
|
||||||
|
if _, err := resp.Encode(w); err != nil {
|
||||||
|
http.Error(w, "", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handles incoming SnapshotRecovery requests.
|
||||||
|
func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
traceln(server.Name(), "RECV /snapshotRecovery")
|
||||||
|
|
||||||
|
req := &SnapshotRecoveryRequest{}
|
||||||
|
if _, err := req.Decode(r.Body); err != nil {
|
||||||
|
http.Error(w, "", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp := server.SnapshotRecoveryRequest(req)
|
||||||
|
if _, err := resp.Encode(w); err != nil {
|
||||||
|
http.Error(w, "", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
101
log.go
101
log.go
|
@ -23,7 +23,6 @@ type Log struct {
|
||||||
file *os.File
|
file *os.File
|
||||||
path string
|
path string
|
||||||
entries []*LogEntry
|
entries []*LogEntry
|
||||||
results []*logResult
|
|
||||||
commitIndex uint64
|
commitIndex uint64
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
startIndex uint64 // the index before the first entry in the Log entries
|
startIndex uint64 // the index before the first entry in the Log entries
|
||||||
|
@ -162,7 +161,7 @@ func (l *Log) open(path string) error {
|
||||||
// Read the file and decode entries.
|
// Read the file and decode entries.
|
||||||
for {
|
for {
|
||||||
// Instantiate log entry and decode into it.
|
// Instantiate log entry and decode into it.
|
||||||
entry, _ := newLogEntry(l, 0, 0, nil)
|
entry, _ := newLogEntry(l, nil, 0, 0, nil)
|
||||||
entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)
|
entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)
|
||||||
|
|
||||||
n, err := entry.decode(l.file)
|
n, err := entry.decode(l.file)
|
||||||
|
@ -191,8 +190,6 @@ func (l *Log) open(path string) error {
|
||||||
|
|
||||||
readBytes += int64(n)
|
readBytes += int64(n)
|
||||||
}
|
}
|
||||||
l.results = make([]*logResult, len(l.entries))
|
|
||||||
|
|
||||||
debugln("open.log.recovery number of log ", len(l.entries))
|
debugln("open.log.recovery number of log ", len(l.entries))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -207,7 +204,11 @@ func (l *Log) close() {
|
||||||
l.file = nil
|
l.file = nil
|
||||||
}
|
}
|
||||||
l.entries = make([]*LogEntry, 0)
|
l.entries = make([]*LogEntry, 0)
|
||||||
l.results = make([]*logResult, 0)
|
}
|
||||||
|
|
||||||
|
// sync to disk
|
||||||
|
func (l *Log) sync() error {
|
||||||
|
return l.file.Sync()
|
||||||
}
|
}
|
||||||
|
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
@ -215,8 +216,8 @@ func (l *Log) close() {
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
|
||||||
// Creates a log entry associated with this log.
|
// Creates a log entry associated with this log.
|
||||||
func (l *Log) createEntry(term uint64, command Command) (*LogEntry, error) {
|
func (l *Log) createEntry(term uint64, command Command, e *ev) (*LogEntry, error) {
|
||||||
return newLogEntry(l, l.nextIndex(), term, command)
|
return newLogEntry(l, e, l.nextIndex(), term, command)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieves an entry from the log. If the entry has been eliminated because
|
// Retrieves an entry from the log. If the entry has been eliminated because
|
||||||
|
@ -266,7 +267,7 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]*
|
||||||
entries := l.entries[index-l.startIndex:]
|
entries := l.entries[index-l.startIndex:]
|
||||||
length := len(entries)
|
length := len(entries)
|
||||||
|
|
||||||
traceln("log.entriesAfter: startIndex:", l.startIndex, " lenght", len(l.entries))
|
traceln("log.entriesAfter: startIndex:", l.startIndex, " length", len(l.entries))
|
||||||
|
|
||||||
if uint64(length) < maxLogEntriesPerRequest {
|
if uint64(length) < maxLogEntriesPerRequest {
|
||||||
// Determine the term at the given entry and return a subslice.
|
// Determine the term at the given entry and return a subslice.
|
||||||
|
@ -276,35 +277,6 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieves the return value and error for an entry. The result can only exist
|
|
||||||
// after the entry has been committed.
|
|
||||||
func (l *Log) getEntryResult(entry *LogEntry, clear bool) (interface{}, error) {
|
|
||||||
l.mutex.RLock()
|
|
||||||
defer l.mutex.RUnlock()
|
|
||||||
|
|
||||||
if entry == nil {
|
|
||||||
panic("raft: Log entry required for error retrieval")
|
|
||||||
}
|
|
||||||
debugln("getEntryResult.result index: ", entry.Index-l.startIndex-1)
|
|
||||||
// If a result exists for the entry then return it with its error.
|
|
||||||
if entry.Index > l.startIndex && entry.Index <= l.startIndex+uint64(len(l.results)) {
|
|
||||||
if result := l.results[entry.Index-l.startIndex-1]; result != nil {
|
|
||||||
|
|
||||||
// keep the records before remove it
|
|
||||||
returnValue, err := result.returnValue, result.err
|
|
||||||
|
|
||||||
// Remove reference to result if it's being cleared after retrieval.
|
|
||||||
if clear {
|
|
||||||
result.returnValue = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return returnValue, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
// Commit
|
// Commit
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
@ -369,7 +341,7 @@ func (l *Log) setCommitIndex(index uint64) error {
|
||||||
// Do not allow previous indices to be committed again.
|
// Do not allow previous indices to be committed again.
|
||||||
|
|
||||||
// This could happens, since the guarantee is that the new leader has up-to-dated
|
// This could happens, since the guarantee is that the new leader has up-to-dated
|
||||||
// log entires rather than has most up-to-dated committed index
|
// log entries rather than has most up-to-dated committed index
|
||||||
|
|
||||||
// For example, Leader 1 send log 80 to follower 2 and follower 3
|
// For example, Leader 1 send log 80 to follower 2 and follower 3
|
||||||
// follower 2 and follow 3 all got the new entries and reply
|
// follower 2 and follow 3 all got the new entries and reply
|
||||||
|
@ -401,8 +373,11 @@ func (l *Log) setCommitIndex(index uint64) error {
|
||||||
|
|
||||||
// Apply the changes to the state machine and store the error code.
|
// Apply the changes to the state machine and store the error code.
|
||||||
returnValue, err := l.ApplyFunc(command)
|
returnValue, err := l.ApplyFunc(command)
|
||||||
debugln("setCommitIndex.set.result index: ", entryIndex)
|
debugf("setCommitIndex.set.result index: %v, entries index: %v", i, entryIndex)
|
||||||
l.results[entryIndex] = &logResult{returnValue: returnValue, err: err}
|
if entry.event != nil {
|
||||||
|
entry.event.returnValue = returnValue
|
||||||
|
entry.event.c <- err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -443,6 +418,14 @@ func (l *Log) truncate(index uint64, term uint64) error {
|
||||||
debugln("log.truncate.clear")
|
debugln("log.truncate.clear")
|
||||||
l.file.Truncate(0)
|
l.file.Truncate(0)
|
||||||
l.file.Seek(0, os.SEEK_SET)
|
l.file.Seek(0, os.SEEK_SET)
|
||||||
|
|
||||||
|
// notify clients if this node is the previous leader
|
||||||
|
for _, entry := range l.entries {
|
||||||
|
if entry.event != nil {
|
||||||
|
entry.event.c <- errors.New("command failed to be committed due to node failure")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
l.entries = []*LogEntry{}
|
l.entries = []*LogEntry{}
|
||||||
} else {
|
} else {
|
||||||
// Do not truncate if the entry at index does not have the matching term.
|
// Do not truncate if the entry at index does not have the matching term.
|
||||||
|
@ -458,6 +441,15 @@ func (l *Log) truncate(index uint64, term uint64) error {
|
||||||
position := l.entries[index-l.startIndex].Position
|
position := l.entries[index-l.startIndex].Position
|
||||||
l.file.Truncate(position)
|
l.file.Truncate(position)
|
||||||
l.file.Seek(position, os.SEEK_SET)
|
l.file.Seek(position, os.SEEK_SET)
|
||||||
|
|
||||||
|
// notify clients if this node is the previous leader
|
||||||
|
for i := index - l.startIndex; i < uint64(len(l.entries)); i++ {
|
||||||
|
entry := l.entries[i]
|
||||||
|
if entry.event != nil {
|
||||||
|
entry.event.c <- errors.New("command failed to be committed due to node failure")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
l.entries = l.entries[0 : index-l.startIndex]
|
l.entries = l.entries[0 : index-l.startIndex]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -490,7 +482,7 @@ func (l *Log) appendEntries(entries []*LogEntry) error {
|
||||||
startPosition += size
|
startPosition += size
|
||||||
}
|
}
|
||||||
w.Flush()
|
w.Flush()
|
||||||
err = l.file.Sync()
|
err = l.sync()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -529,7 +521,6 @@ func (l *Log) appendEntry(entry *LogEntry) error {
|
||||||
|
|
||||||
// Append to entries list if stored on disk.
|
// Append to entries list if stored on disk.
|
||||||
l.entries = append(l.entries, entry)
|
l.entries = append(l.entries, entry)
|
||||||
l.results = append(l.results, nil)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -558,7 +549,6 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) {
|
||||||
|
|
||||||
// Append to entries list if stored on disk.
|
// Append to entries list if stored on disk.
|
||||||
l.entries = append(l.entries, entry)
|
l.entries = append(l.entries, entry)
|
||||||
l.results = append(l.results, nil)
|
|
||||||
|
|
||||||
return int64(size), nil
|
return int64(size), nil
|
||||||
}
|
}
|
||||||
|
@ -570,7 +560,6 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) {
|
||||||
// compact the log before index (including index)
|
// compact the log before index (including index)
|
||||||
func (l *Log) compact(index uint64, term uint64) error {
|
func (l *Log) compact(index uint64, term uint64) error {
|
||||||
var entries []*LogEntry
|
var entries []*LogEntry
|
||||||
var results []*logResult
|
|
||||||
|
|
||||||
l.mutex.Lock()
|
l.mutex.Lock()
|
||||||
defer l.mutex.Unlock()
|
defer l.mutex.Unlock()
|
||||||
|
@ -583,15 +572,14 @@ func (l *Log) compact(index uint64, term uint64) error {
|
||||||
// we just recovery from on snapshot
|
// we just recovery from on snapshot
|
||||||
if index >= l.internalCurrentIndex() {
|
if index >= l.internalCurrentIndex() {
|
||||||
entries = make([]*LogEntry, 0)
|
entries = make([]*LogEntry, 0)
|
||||||
results = make([]*logResult, 0)
|
|
||||||
} else {
|
} else {
|
||||||
// get all log entries after index
|
// get all log entries after index
|
||||||
entries = l.entries[index-l.startIndex:]
|
entries = l.entries[index-l.startIndex:]
|
||||||
results = l.results[index-l.startIndex:]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// create a new log file and add all the entries
|
// create a new log file and add all the entries
|
||||||
file, err := os.OpenFile(l.path+".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
|
new_file_path := l.path + ".new"
|
||||||
|
file, err := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -600,28 +588,29 @@ func (l *Log) compact(index uint64, term uint64) error {
|
||||||
entry.Position = position
|
entry.Position = position
|
||||||
|
|
||||||
if _, err = entry.encode(file); err != nil {
|
if _, err = entry.encode(file); err != nil {
|
||||||
|
file.Close()
|
||||||
|
os.Remove(new_file_path)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// close the current log file
|
file.Sync()
|
||||||
l.file.Close()
|
|
||||||
|
|
||||||
// remove the current log file to .bak
|
old_file := l.file
|
||||||
err = os.Remove(l.path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// rename the new log file
|
// rename the new log file
|
||||||
err = os.Rename(l.path+".new", l.path)
|
err = os.Rename(new_file_path, l.path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
file.Close()
|
||||||
|
os.Remove(new_file_path)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
l.file = file
|
l.file = file
|
||||||
|
|
||||||
|
// close the old log file
|
||||||
|
old_file.Close()
|
||||||
|
|
||||||
// compaction the in memory log
|
// compaction the in memory log
|
||||||
l.entries = entries
|
l.entries = entries
|
||||||
l.results = results
|
|
||||||
l.startIndex = index
|
l.startIndex = index
|
||||||
l.startTerm = term
|
l.startTerm = term
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -17,11 +17,11 @@ type LogEntry struct {
|
||||||
CommandName string
|
CommandName string
|
||||||
Command []byte
|
Command []byte
|
||||||
Position int64 // position in the log file
|
Position int64 // position in the log file
|
||||||
commit chan bool
|
event *ev
|
||||||
}
|
}
|
||||||
|
|
||||||
// Creates a new log entry associated with a log.
|
// Creates a new log entry associated with a log.
|
||||||
func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntry, error) {
|
func newLogEntry(log *Log, event *ev, index uint64, term uint64, command Command) (*LogEntry, error) {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
var commandName string
|
var commandName string
|
||||||
if command != nil {
|
if command != nil {
|
||||||
|
@ -41,7 +41,7 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr
|
||||||
Term: term,
|
Term: term,
|
||||||
CommandName: commandName,
|
CommandName: commandName,
|
||||||
Command: buf.Bytes(),
|
Command: buf.Bytes(),
|
||||||
commit: make(chan bool, 5),
|
event: event,
|
||||||
}
|
}
|
||||||
|
|
||||||
return e, nil
|
return e, nil
|
||||||
|
|
30
log_test.go
30
log_test.go
|
@ -30,15 +30,15 @@ func TestLogNewLog(t *testing.T) {
|
||||||
defer log.close()
|
defer log.close()
|
||||||
defer os.Remove(path)
|
defer os.Remove(path)
|
||||||
|
|
||||||
e, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20})
|
e, _ := newLogEntry(log, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||||
if err := log.appendEntry(e); err != nil {
|
if err := log.appendEntry(e); err != nil {
|
||||||
t.Fatalf("Unable to append: %v", err)
|
t.Fatalf("Unable to append: %v", err)
|
||||||
}
|
}
|
||||||
e, _ = newLogEntry(log, 2, 1, &testCommand2{X: 100})
|
e, _ = newLogEntry(log, nil, 2, 1, &testCommand2{X: 100})
|
||||||
if err := log.appendEntry(e); err != nil {
|
if err := log.appendEntry(e); err != nil {
|
||||||
t.Fatalf("Unable to append: %v", err)
|
t.Fatalf("Unable to append: %v", err)
|
||||||
}
|
}
|
||||||
e, _ = newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0})
|
e, _ = newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||||
if err := log.appendEntry(e); err != nil {
|
if err := log.appendEntry(e); err != nil {
|
||||||
t.Fatalf("Unable to append: %v", err)
|
t.Fatalf("Unable to append: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -63,9 +63,9 @@ func TestLogNewLog(t *testing.T) {
|
||||||
// Ensure that we can decode and encode to an existing log.
|
// Ensure that we can decode and encode to an existing log.
|
||||||
func TestLogExistingLog(t *testing.T) {
|
func TestLogExistingLog(t *testing.T) {
|
||||||
tmpLog := newLog()
|
tmpLog := newLog()
|
||||||
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
|
e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||||
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
|
e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
|
||||||
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
|
e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||||
log, path := setupLog([]*LogEntry{e0, e1, e2})
|
log, path := setupLog([]*LogEntry{e0, e1, e2})
|
||||||
defer log.close()
|
defer log.close()
|
||||||
defer os.Remove(path)
|
defer os.Remove(path)
|
||||||
|
@ -88,9 +88,9 @@ func TestLogExistingLog(t *testing.T) {
|
||||||
// Ensure that we can check the contents of the log by index/term.
|
// Ensure that we can check the contents of the log by index/term.
|
||||||
func TestLogContainsEntries(t *testing.T) {
|
func TestLogContainsEntries(t *testing.T) {
|
||||||
tmpLog := newLog()
|
tmpLog := newLog()
|
||||||
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
|
e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||||
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
|
e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
|
||||||
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
|
e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||||
log, path := setupLog([]*LogEntry{e0, e1, e2})
|
log, path := setupLog([]*LogEntry{e0, e1, e2})
|
||||||
defer log.close()
|
defer log.close()
|
||||||
defer os.Remove(path)
|
defer os.Remove(path)
|
||||||
|
@ -115,8 +115,8 @@ func TestLogContainsEntries(t *testing.T) {
|
||||||
// Ensure that we can recover from an incomplete/corrupt log and continue logging.
|
// Ensure that we can recover from an incomplete/corrupt log and continue logging.
|
||||||
func TestLogRecovery(t *testing.T) {
|
func TestLogRecovery(t *testing.T) {
|
||||||
tmpLog := newLog()
|
tmpLog := newLog()
|
||||||
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
|
e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||||
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
|
e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
|
||||||
f, _ := ioutil.TempFile("", "raft-log-")
|
f, _ := ioutil.TempFile("", "raft-log-")
|
||||||
|
|
||||||
e0.encode(f)
|
e0.encode(f)
|
||||||
|
@ -134,7 +134,7 @@ func TestLogRecovery(t *testing.T) {
|
||||||
defer log.close()
|
defer log.close()
|
||||||
defer os.Remove(f.Name())
|
defer os.Remove(f.Name())
|
||||||
|
|
||||||
e, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bat", I: -5})
|
e, _ := newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bat", I: -5})
|
||||||
if err := log.appendEntry(e); err != nil {
|
if err := log.appendEntry(e); err != nil {
|
||||||
t.Fatalf("Unable to append: %v", err)
|
t.Fatalf("Unable to append: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -167,15 +167,15 @@ func TestLogTruncate(t *testing.T) {
|
||||||
|
|
||||||
defer os.Remove(path)
|
defer os.Remove(path)
|
||||||
|
|
||||||
entry1, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20})
|
entry1, _ := newLogEntry(log, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||||
if err := log.appendEntry(entry1); err != nil {
|
if err := log.appendEntry(entry1); err != nil {
|
||||||
t.Fatalf("Unable to append: %v", err)
|
t.Fatalf("Unable to append: %v", err)
|
||||||
}
|
}
|
||||||
entry2, _ := newLogEntry(log, 2, 1, &testCommand2{X: 100})
|
entry2, _ := newLogEntry(log, nil, 2, 1, &testCommand2{X: 100})
|
||||||
if err := log.appendEntry(entry2); err != nil {
|
if err := log.appendEntry(entry2); err != nil {
|
||||||
t.Fatalf("Unable to append: %v", err)
|
t.Fatalf("Unable to append: %v", err)
|
||||||
}
|
}
|
||||||
entry3, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0})
|
entry3, _ := newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||||
if err := log.appendEntry(entry3); err != nil {
|
if err := log.appendEntry(entry3); err != nil {
|
||||||
t.Fatalf("Unable to append: %v", err)
|
t.Fatalf("Unable to append: %v", err)
|
||||||
}
|
}
|
||||||
|
|
53
peer.go
53
peer.go
|
@ -79,7 +79,7 @@ func (p *Peer) setPrevLogIndex(value uint64) {
|
||||||
|
|
||||||
// Starts the peer heartbeat.
|
// Starts the peer heartbeat.
|
||||||
func (p *Peer) startHeartbeat() {
|
func (p *Peer) startHeartbeat() {
|
||||||
p.stopChan = make(chan bool, 1)
|
p.stopChan = make(chan bool)
|
||||||
c := make(chan bool)
|
c := make(chan bool)
|
||||||
go p.heartbeat(c)
|
go p.heartbeat(c)
|
||||||
<-c
|
<-c
|
||||||
|
@ -87,17 +87,7 @@ func (p *Peer) startHeartbeat() {
|
||||||
|
|
||||||
// Stops the peer heartbeat.
|
// Stops the peer heartbeat.
|
||||||
func (p *Peer) stopHeartbeat(flush bool) {
|
func (p *Peer) stopHeartbeat(flush bool) {
|
||||||
// here is a problem
|
p.stopChan <- flush
|
||||||
// the previous stop is no buffer leader may get blocked
|
|
||||||
// when heartbeat returns
|
|
||||||
// I make the channel with 1 buffer
|
|
||||||
// and try to panic here
|
|
||||||
select {
|
|
||||||
case p.stopChan <- flush:
|
|
||||||
|
|
||||||
default:
|
|
||||||
panic("[" + p.server.Name() + "] cannot stop [" + p.Name + "] heartbeat")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
@ -140,18 +130,21 @@ func (p *Peer) heartbeat(c chan bool) {
|
||||||
// before we can safely remove a node
|
// before we can safely remove a node
|
||||||
// we must flush the remove command to the node first
|
// we must flush the remove command to the node first
|
||||||
p.flush()
|
p.flush()
|
||||||
debugln("peer.heartbeat.stop: ", p.Name)
|
debugln("peer.heartbeat.stop.with.flush: ", p.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-ticker:
|
case <-ticker:
|
||||||
|
start := time.Now()
|
||||||
p.flush()
|
p.flush()
|
||||||
|
duration := time.Now().Sub(start)
|
||||||
|
p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Peer) flush() {
|
func (p *Peer) flush() {
|
||||||
debugln("peer.heartbeat.run: ", p.Name)
|
debugln("peer.heartbeat.flush: ", p.Name)
|
||||||
prevLogIndex := p.getPrevLogIndex()
|
prevLogIndex := p.getPrevLogIndex()
|
||||||
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
|
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
|
||||||
|
|
||||||
|
@ -172,15 +165,16 @@ func (p *Peer) flush() {
|
||||||
|
|
||||||
// Sends an AppendEntries request to the peer through the transport.
|
// Sends an AppendEntries request to the peer through the transport.
|
||||||
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
|
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
|
||||||
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name, " ", len(req.Entries))
|
tracef("peer.append.send: %s->%s [prevLog:%v length: %v]\n",
|
||||||
|
p.server.Name(), p.Name, req.PrevLogIndex, len(req.Entries))
|
||||||
|
|
||||||
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
|
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
|
||||||
if resp == nil {
|
if resp == nil {
|
||||||
p.server.DispatchEvent(newEvent(HeartbeatTimeoutEventType, p, nil))
|
p.server.DispatchEvent(newEvent(HeartbeatTimeoutEventType, p, nil))
|
||||||
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name)
|
debugln("peer.append.timeout: ", p.server.Name(), "->", p.Name)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
traceln("peer.flush.recv: ", p.Name)
|
traceln("peer.append.resp: ", p.server.Name(), "<-", p.Name)
|
||||||
|
|
||||||
// If successful then update the previous log index.
|
// If successful then update the previous log index.
|
||||||
p.mutex.Lock()
|
p.mutex.Lock()
|
||||||
|
@ -194,21 +188,22 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
|
||||||
resp.append = true
|
resp.append = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name, "; idx =", p.prevLogIndex)
|
traceln("peer.append.resp.success: ", p.Name, "; idx =", p.prevLogIndex)
|
||||||
|
|
||||||
// If it was unsuccessful then decrement the previous log index and
|
// If it was unsuccessful then decrement the previous log index and
|
||||||
// we'll try again next time.
|
// we'll try again next time.
|
||||||
} else {
|
} else {
|
||||||
if resp.CommitIndex >= p.prevLogIndex {
|
if resp.CommitIndex >= p.prevLogIndex {
|
||||||
|
|
||||||
// we may miss a response from peer
|
// we may miss a response from peer
|
||||||
// so maybe the peer has commited the logs we sent
|
// so maybe the peer has committed the logs we just sent
|
||||||
// but we did not receive the success reply and did not increase
|
// but we did not receive the successful reply and did not increase
|
||||||
// the prevLogIndex
|
// the prevLogIndex
|
||||||
|
|
||||||
p.prevLogIndex = resp.CommitIndex
|
// peer failed to truncate the log and sent a fail reply at this time
|
||||||
|
// we just need to update peer's prevLog index to commitIndex
|
||||||
|
|
||||||
|
p.prevLogIndex = resp.CommitIndex
|
||||||
|
debugln("peer.append.resp.update: ", p.Name, "; idx =", p.prevLogIndex)
|
||||||
|
|
||||||
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
|
|
||||||
} else if p.prevLogIndex > 0 {
|
} else if p.prevLogIndex > 0 {
|
||||||
// Decrement the previous log index down until we find a match. Don't
|
// Decrement the previous log index down until we find a match. Don't
|
||||||
// let it go below where the peer's commit index is though. That's a
|
// let it go below where the peer's commit index is though. That's a
|
||||||
|
@ -219,7 +214,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
|
||||||
p.prevLogIndex = resp.Index
|
p.prevLogIndex = resp.Index
|
||||||
}
|
}
|
||||||
|
|
||||||
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
|
debugln("peer.append.resp.decrement: ", p.Name, "; idx =", p.prevLogIndex)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
p.mutex.Unlock()
|
p.mutex.Unlock()
|
||||||
|
@ -227,7 +222,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
|
||||||
// Attach the peer to resp, thus server can know where it comes from
|
// Attach the peer to resp, thus server can know where it comes from
|
||||||
resp.peer = p.Name
|
resp.peer = p.Name
|
||||||
// Send response to server for processing.
|
// Send response to server for processing.
|
||||||
p.server.send(resp)
|
p.server.sendAsync(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sends an Snapshot request to the peer through the transport.
|
// Sends an Snapshot request to the peer through the transport.
|
||||||
|
@ -271,7 +266,7 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Send response to server for processing.
|
// Send response to server for processing.
|
||||||
p.server.send(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)})
|
p.server.sendAsync(&AppendEntriesResponse{Term: resp.Term, Success: resp.Success, append: (resp.Term == p.server.currentTerm)})
|
||||||
}
|
}
|
||||||
|
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
@ -283,8 +278,10 @@ func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteRespo
|
||||||
debugln("peer.vote: ", p.server.Name(), "->", p.Name)
|
debugln("peer.vote: ", p.server.Name(), "->", p.Name)
|
||||||
req.peer = p
|
req.peer = p
|
||||||
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
|
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
|
||||||
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name)
|
debugln("peer.vote.recv: ", p.server.Name(), "<-", p.Name)
|
||||||
resp.peer = p
|
resp.peer = p
|
||||||
c <- resp
|
c <- resp
|
||||||
|
} else {
|
||||||
|
debugln("peer.vote.failed: ", p.server.Name(), "<-", p.Name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
105
server.go
105
server.go
|
@ -119,6 +119,7 @@ type server struct {
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
syncedPeer map[string]bool
|
syncedPeer map[string]bool
|
||||||
|
|
||||||
|
stopped chan bool
|
||||||
c chan *ev
|
c chan *ev
|
||||||
electionTimeout time.Duration
|
electionTimeout time.Duration
|
||||||
heartbeatTimeout time.Duration
|
heartbeatTimeout time.Duration
|
||||||
|
@ -166,6 +167,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
|
||||||
state: Stopped,
|
state: Stopped,
|
||||||
peers: make(map[string]*Peer),
|
peers: make(map[string]*Peer),
|
||||||
log: newLog(),
|
log: newLog(),
|
||||||
|
stopped: make(chan bool),
|
||||||
c: make(chan *ev, 256),
|
c: make(chan *ev, 256),
|
||||||
electionTimeout: DefaultElectionTimeout,
|
electionTimeout: DefaultElectionTimeout,
|
||||||
heartbeatTimeout: DefaultHeartbeatTimeout,
|
heartbeatTimeout: DefaultHeartbeatTimeout,
|
||||||
|
@ -279,6 +281,7 @@ func (s *server) setState(state string) {
|
||||||
s.state = state
|
s.state = state
|
||||||
if state == Leader {
|
if state == Leader {
|
||||||
s.leader = s.Name()
|
s.leader = s.Name()
|
||||||
|
s.syncedPeer = make(map[string]bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dispatch state and leader change events.
|
// Dispatch state and leader change events.
|
||||||
|
@ -463,8 +466,9 @@ func (s *server) Start() error {
|
||||||
// Shuts down the server.
|
// Shuts down the server.
|
||||||
func (s *server) Stop() {
|
func (s *server) Stop() {
|
||||||
s.send(&stopValue)
|
s.send(&stopValue)
|
||||||
s.mutex.Lock()
|
|
||||||
defer s.mutex.Unlock()
|
// make sure the server has stopped before we close the log
|
||||||
|
<-s.stopped
|
||||||
s.log.close()
|
s.log.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -553,6 +557,7 @@ func (s *server) loop() {
|
||||||
s.snapshotLoop()
|
s.snapshotLoop()
|
||||||
|
|
||||||
case Stopped:
|
case Stopped:
|
||||||
|
s.stopped <- true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -561,15 +566,26 @@ func (s *server) loop() {
|
||||||
// Sends an event to the event loop to be processed. The function will wait
|
// Sends an event to the event loop to be processed. The function will wait
|
||||||
// until the event is actually processed before returning.
|
// until the event is actually processed before returning.
|
||||||
func (s *server) send(value interface{}) (interface{}, error) {
|
func (s *server) send(value interface{}) (interface{}, error) {
|
||||||
event := s.sendAsync(value)
|
event := &ev{target: value, c: make(chan error, 1)}
|
||||||
|
s.c <- event
|
||||||
err := <-event.c
|
err := <-event.c
|
||||||
return event.returnValue, err
|
return event.returnValue, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) sendAsync(value interface{}) *ev {
|
func (s *server) sendAsync(value interface{}) {
|
||||||
event := &ev{target: value, c: make(chan error, 1)}
|
event := &ev{target: value, c: make(chan error, 1)}
|
||||||
s.c <- event
|
// try a non-blocking send first
|
||||||
return event
|
// in most cases, this should not be blocking
|
||||||
|
// avoid create unnecessary go routines
|
||||||
|
select {
|
||||||
|
case s.c <- event:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
s.c <- event
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// The event loop that is run when the server is in a Follower state.
|
// The event loop that is run when the server is in a Follower state.
|
||||||
|
@ -578,7 +594,6 @@ func (s *server) sendAsync(value interface{}) *ev {
|
||||||
// 1.Receiving valid AppendEntries RPC, or
|
// 1.Receiving valid AppendEntries RPC, or
|
||||||
// 2.Granting vote to candidate
|
// 2.Granting vote to candidate
|
||||||
func (s *server) followerLoop() {
|
func (s *server) followerLoop() {
|
||||||
|
|
||||||
s.setState(Follower)
|
s.setState(Follower)
|
||||||
since := time.Now()
|
since := time.Now()
|
||||||
electionTimeout := s.ElectionTimeout()
|
electionTimeout := s.ElectionTimeout()
|
||||||
|
@ -739,7 +754,6 @@ func (s *server) candidateLoop() {
|
||||||
// The event loop that is run when the server is in a Leader state.
|
// The event loop that is run when the server is in a Leader state.
|
||||||
func (s *server) leaderLoop() {
|
func (s *server) leaderLoop() {
|
||||||
s.setState(Leader)
|
s.setState(Leader)
|
||||||
s.syncedPeer = make(map[string]bool)
|
|
||||||
logIndex, _ := s.log.lastInfo()
|
logIndex, _ := s.log.lastInfo()
|
||||||
|
|
||||||
// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
|
// Update the peers prevLogIndex to leader's lastLogIndex and start heartbeat.
|
||||||
|
@ -786,6 +800,7 @@ func (s *server) leaderLoop() {
|
||||||
for _, peer := range s.peers {
|
for _, peer := range s.peers {
|
||||||
peer.stopHeartbeat(false)
|
peer.stopHeartbeat(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
s.syncedPeer = nil
|
s.syncedPeer = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -837,7 +852,7 @@ func (s *server) processCommand(command Command, e *ev) {
|
||||||
s.debugln("server.command.process")
|
s.debugln("server.command.process")
|
||||||
|
|
||||||
// Create an entry for the command in the log.
|
// Create an entry for the command in the log.
|
||||||
entry, err := s.log.createEntry(s.currentTerm, command)
|
entry, err := s.log.createEntry(s.currentTerm, command, e)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.debugln("server.command.log.entry.error:", err)
|
s.debugln("server.command.log.entry.error:", err)
|
||||||
|
@ -851,36 +866,12 @@ func (s *server) processCommand(command Command, e *ev) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Issue a callback for the entry once it's committed.
|
s.syncedPeer[s.Name()] = true
|
||||||
go func() {
|
if len(s.peers) == 0 {
|
||||||
// Wait for the entry to be committed.
|
commitIndex := s.log.currentIndex()
|
||||||
select {
|
s.log.setCommitIndex(commitIndex)
|
||||||
case <-entry.commit:
|
s.debugln("commit index ", commitIndex)
|
||||||
var err error
|
}
|
||||||
s.debugln("server.command.commit")
|
|
||||||
e.returnValue, err = s.log.getEntryResult(entry, true)
|
|
||||||
e.c <- err
|
|
||||||
case <-time.After(time.Second):
|
|
||||||
s.debugln("server.command.timeout")
|
|
||||||
e.c <- CommandTimeoutError
|
|
||||||
}
|
|
||||||
|
|
||||||
entry.commit = nil
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Issue an append entries response for the server.
|
|
||||||
resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
|
|
||||||
resp.append = true
|
|
||||||
resp.peer = s.Name()
|
|
||||||
|
|
||||||
// this must be async
|
|
||||||
// sendAsync is not really async every time
|
|
||||||
// when the sending speed of the user is larger than
|
|
||||||
// the processing speed of the server, the buffered channel
|
|
||||||
// will be full. Then sendAsync will become sync, which will
|
|
||||||
// cause deadlock here.
|
|
||||||
// so we use a goroutine to avoid the deadlock
|
|
||||||
go s.sendAsync(resp)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
@ -896,7 +887,6 @@ func (s *server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
|
||||||
|
|
||||||
// Processes the "append entries" request.
|
// Processes the "append entries" request.
|
||||||
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
|
func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
|
||||||
|
|
||||||
s.traceln("server.ae.process")
|
s.traceln("server.ae.process")
|
||||||
|
|
||||||
if req.Term < s.currentTerm {
|
if req.Term < s.currentTerm {
|
||||||
|
@ -925,7 +915,7 @@ func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
|
||||||
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
|
return newAppendEntriesResponse(s.currentTerm, false, s.log.currentIndex(), s.log.CommitIndex()), true
|
||||||
}
|
}
|
||||||
|
|
||||||
// once the server appended and commited all the log entries from the leader
|
// once the server appended and committed all the log entries from the leader
|
||||||
|
|
||||||
return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
|
return newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex()), true
|
||||||
}
|
}
|
||||||
|
@ -970,23 +960,10 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
|
||||||
committedIndex := s.log.commitIndex
|
committedIndex := s.log.commitIndex
|
||||||
|
|
||||||
if commitIndex > committedIndex {
|
if commitIndex > committedIndex {
|
||||||
|
// leader needs to do a fsync before committing log entries
|
||||||
|
s.log.sync()
|
||||||
s.log.setCommitIndex(commitIndex)
|
s.log.setCommitIndex(commitIndex)
|
||||||
s.debugln("commit index ", commitIndex)
|
s.debugln("commit index ", commitIndex)
|
||||||
for i := committedIndex; i < commitIndex; i++ {
|
|
||||||
if entry := s.log.getEntry(i + 1); entry != nil {
|
|
||||||
// if the leader is a new one and the entry came from the
|
|
||||||
// old leader, the commit channel will be nil and no go routine
|
|
||||||
// is waiting from this channel
|
|
||||||
// if we try to send to it, the new leader will get stuck
|
|
||||||
if entry.commit != nil {
|
|
||||||
select {
|
|
||||||
case entry.commit <- true:
|
|
||||||
default:
|
|
||||||
panic("server unable to send signal to commit channel")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1008,7 +985,7 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
|
||||||
|
|
||||||
// If the request is coming from an old term then reject it.
|
// If the request is coming from an old term then reject it.
|
||||||
if req.Term < s.Term() {
|
if req.Term < s.Term() {
|
||||||
s.debugln("server.rv.error: stale term")
|
s.debugln("server.rv.deny.vote: cause stale term")
|
||||||
return newRequestVoteResponse(s.currentTerm, false), false
|
return newRequestVoteResponse(s.currentTerm, false), false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1016,7 +993,7 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
|
||||||
|
|
||||||
// If we've already voted for a different candidate then don't vote for this candidate.
|
// If we've already voted for a different candidate then don't vote for this candidate.
|
||||||
if s.votedFor != "" && s.votedFor != req.CandidateName {
|
if s.votedFor != "" && s.votedFor != req.CandidateName {
|
||||||
s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
|
s.debugln("server.deny.vote: cause duplicate vote: ", req.CandidateName,
|
||||||
" already vote for ", s.votedFor)
|
" already vote for ", s.votedFor)
|
||||||
return newRequestVoteResponse(s.currentTerm, false), false
|
return newRequestVoteResponse(s.currentTerm, false), false
|
||||||
}
|
}
|
||||||
|
@ -1024,7 +1001,7 @@ func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
|
||||||
// If the candidate's log is not at least as up-to-date as our last log then don't vote.
|
// If the candidate's log is not at least as up-to-date as our last log then don't vote.
|
||||||
lastIndex, lastTerm := s.log.lastInfo()
|
lastIndex, lastTerm := s.log.lastInfo()
|
||||||
if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
|
if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
|
||||||
s.debugln("server.rv.error: out of date log: ", req.CandidateName,
|
s.debugln("server.deny.vote: cause out of date log: ", req.CandidateName,
|
||||||
"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
|
"Index :[", lastIndex, "]", " [", req.LastLogIndex, "]",
|
||||||
"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
|
"Term :[", lastTerm, "]", " [", req.LastLogTerm, "]")
|
||||||
return newRequestVoteResponse(s.currentTerm, false), false
|
return newRequestVoteResponse(s.currentTerm, false), false
|
||||||
|
@ -1335,7 +1312,7 @@ func (s *server) writeConf() {
|
||||||
confPath := path.Join(s.path, "conf")
|
confPath := path.Join(s.path, "conf")
|
||||||
tmpConfPath := path.Join(s.path, "conf.tmp")
|
tmpConfPath := path.Join(s.path, "conf.tmp")
|
||||||
|
|
||||||
err := ioutil.WriteFile(tmpConfPath, b, 0600)
|
err := writeFileSynced(tmpConfPath, b, 0600)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -1372,9 +1349,13 @@ func (s *server) readConf() error {
|
||||||
//--------------------------------------
|
//--------------------------------------
|
||||||
|
|
||||||
func (s *server) debugln(v ...interface{}) {
|
func (s *server) debugln(v ...interface{}) {
|
||||||
debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
|
if logLevel > Debug {
|
||||||
|
debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *server) traceln(v ...interface{}) {
|
func (s *server) traceln(v ...interface{}) {
|
||||||
tracef("[%s] %s", s.name, fmt.Sprintln(v...))
|
if logLevel > Trace {
|
||||||
|
tracef("[%s] %s", s.name, fmt.Sprintln(v...))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,9 +105,9 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
|
||||||
// Ensure that a vote request is denied if the log is out of date.
|
// Ensure that a vote request is denied if the log is out of date.
|
||||||
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
|
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
|
||||||
tmpLog := newLog()
|
tmpLog := newLog()
|
||||||
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
|
e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||||
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
|
e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
|
||||||
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
|
e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||||
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
|
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
|
||||||
|
|
||||||
// start as a follower with term 2 and index 3
|
// start as a follower with term 2 and index 3
|
||||||
|
@ -145,7 +145,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
|
||||||
|
|
||||||
// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
|
// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
|
||||||
func TestServerPromoteSelf(t *testing.T) {
|
func TestServerPromoteSelf(t *testing.T) {
|
||||||
e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
|
e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||||
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
|
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
|
||||||
|
|
||||||
// start as a follower
|
// start as a follower
|
||||||
|
@ -198,7 +198,7 @@ func TestServerAppendEntries(t *testing.T) {
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
// Append single entry.
|
// Append single entry.
|
||||||
e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
||||||
entries := []*LogEntry{e}
|
entries := []*LogEntry{e}
|
||||||
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
|
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
|
||||||
if resp.Term != 1 || !resp.Success {
|
if resp.Term != 1 || !resp.Success {
|
||||||
|
@ -209,8 +209,8 @@ func TestServerAppendEntries(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append multiple entries + commit the last one.
|
// Append multiple entries + commit the last one.
|
||||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
|
e1, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
|
||||||
e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30})
|
e2, _ := newLogEntry(nil, nil, 3, 1, &testCommand1{Val: "baz", I: 30})
|
||||||
entries = []*LogEntry{e1, e2}
|
entries = []*LogEntry{e1, e2}
|
||||||
resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
|
resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
|
||||||
if resp.Term != 1 || !resp.Success {
|
if resp.Term != 1 || !resp.Success {
|
||||||
|
@ -242,7 +242,7 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
|
||||||
s.(*server).mutex.Unlock()
|
s.(*server).mutex.Unlock()
|
||||||
|
|
||||||
// Append single entry.
|
// Append single entry.
|
||||||
e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
||||||
entries := []*LogEntry{e}
|
entries := []*LogEntry{e}
|
||||||
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
|
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
|
||||||
if resp.Term != 2 || resp.Success {
|
if resp.Term != 2 || resp.Success {
|
||||||
|
@ -260,8 +260,8 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
// Append single entry + commit.
|
// Append single entry + commit.
|
||||||
e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
e1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
||||||
e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
|
e2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
|
||||||
entries := []*LogEntry{e1, e2}
|
entries := []*LogEntry{e1, e2}
|
||||||
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
|
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
|
||||||
if resp.Term != 1 || !resp.Success {
|
if resp.Term != 1 || !resp.Success {
|
||||||
|
@ -269,7 +269,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Append entry again (post-commit).
|
// Append entry again (post-commit).
|
||||||
e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
|
e, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
|
||||||
entries = []*LogEntry{e}
|
entries = []*LogEntry{e}
|
||||||
resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
|
resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
|
||||||
if resp.Term != 1 || resp.Success {
|
if resp.Term != 1 || resp.Success {
|
||||||
|
@ -283,9 +283,9 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
|
||||||
s.Start()
|
s.Start()
|
||||||
defer s.Stop()
|
defer s.Stop()
|
||||||
|
|
||||||
entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
entry1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
||||||
entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
|
entry2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
|
||||||
entry3, _ := newLogEntry(nil, 2, 2, &testCommand1{Val: "bar", I: 20})
|
entry3, _ := newLogEntry(nil, nil, 2, 2, &testCommand1{Val: "bar", I: 20})
|
||||||
|
|
||||||
// Append single entry + commit.
|
// Append single entry + commit.
|
||||||
entries := []*LogEntry{entry1, entry2}
|
entries := []*LogEntry{entry1, entry2}
|
||||||
|
@ -495,7 +495,19 @@ func TestServerMultiNode(t *testing.T) {
|
||||||
clonedReq := &RequestVoteRequest{}
|
clonedReq := &RequestVoteRequest{}
|
||||||
json.Unmarshal(b, clonedReq)
|
json.Unmarshal(b, clonedReq)
|
||||||
|
|
||||||
return target.RequestVote(clonedReq)
|
c := make(chan *RequestVoteResponse)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
c <- target.RequestVote(clonedReq)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case resp := <-c:
|
||||||
|
return resp
|
||||||
|
case <-time.After(time.Millisecond * 200):
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
|
transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
|
||||||
mutex.RLock()
|
mutex.RLock()
|
||||||
|
@ -506,7 +518,18 @@ func TestServerMultiNode(t *testing.T) {
|
||||||
clonedReq := &AppendEntriesRequest{}
|
clonedReq := &AppendEntriesRequest{}
|
||||||
json.Unmarshal(b, clonedReq)
|
json.Unmarshal(b, clonedReq)
|
||||||
|
|
||||||
return target.AppendEntries(clonedReq)
|
c := make(chan *AppendEntriesResponse)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
c <- target.AppendEntries(clonedReq)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case resp := <-c:
|
||||||
|
return resp
|
||||||
|
case <-time.After(time.Millisecond * 200):
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
disTransporter := &testTransporter{}
|
disTransporter := &testTransporter{}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package raft
|
package raft
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
@ -17,7 +16,7 @@ func TestSnapshot(t *testing.T) {
|
||||||
s.Do(&testCommand1{})
|
s.Do(&testCommand1{})
|
||||||
err := s.TakeSnapshot()
|
err := s.TakeSnapshot()
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Equal(t, s.(*server).lastSnapshot.LastIndex, uint64(3))
|
assert.Equal(t, s.(*server).lastSnapshot.LastIndex, uint64(2))
|
||||||
|
|
||||||
// Repeat to make sure new snapshot gets created.
|
// Repeat to make sure new snapshot gets created.
|
||||||
s.Do(&testCommand1{})
|
s.Do(&testCommand1{})
|
||||||
|
@ -35,14 +34,6 @@ func TestSnapshot(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure that snapshotting fails if there are no log entries yet.
|
|
||||||
func TestSnapshotWithNoLog(t *testing.T) {
|
|
||||||
runServerWithMockStateMachine(Leader, func (s Server, m *mock.Mock) {
|
|
||||||
err := s.TakeSnapshot()
|
|
||||||
assert.Equal(t, err, errors.New("No logs"))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure that a snapshot request can be sent and received.
|
// Ensure that a snapshot request can be sent and received.
|
||||||
func TestSnapshotRequest(t *testing.T) {
|
func TestSnapshotRequest(t *testing.T) {
|
||||||
runServerWithMockStateMachine(Follower, func (s Server, m *mock.Mock) {
|
runServerWithMockStateMachine(Follower, func (s Server, m *mock.Mock) {
|
||||||
|
|
2
test.go
2
test.go
|
@ -103,7 +103,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn
|
||||||
|
|
||||||
func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server {
|
func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server {
|
||||||
servers := []Server{}
|
servers := []Server{}
|
||||||
e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
|
e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||||
|
|
||||||
for _, name := range names {
|
for _, name := range names {
|
||||||
if lookup[name] != nil {
|
if lookup[name] != nil {
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
package raft
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
// WriteFile writes data to a file named by filename.
|
||||||
|
// If the file does not exist, WriteFile creates it with permissions perm;
|
||||||
|
// otherwise WriteFile truncates it before writing.
|
||||||
|
// This is copied from ioutil.WriteFile with the addition of a Sync call to
|
||||||
|
// ensure the data reaches the disk.
|
||||||
|
func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
|
||||||
|
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := f.Write(data)
|
||||||
|
if n < len(data) {
|
||||||
|
f.Close()
|
||||||
|
return io.ErrShortWrite
|
||||||
|
}
|
||||||
|
|
||||||
|
err = f.Sync()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return f.Close()
|
||||||
|
}
|
Loading…
Reference in New Issue