commit
925a5afdda
|
@ -1,13 +1,12 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"github.com/coreos/go-raft/protobuf"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
const appendEntriesRequestHeaderSize = 4 + 8 + 8 + 8 + 8 + 4 + 4
|
||||
|
||||
// The request sent to a server to append entries to the log.
|
||||
type AppendEntriesRequest struct {
|
||||
Term uint64
|
||||
|
@ -30,84 +29,75 @@ func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint6
|
|||
}
|
||||
}
|
||||
|
||||
// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (req *AppendEntriesRequest) encode(w io.Writer) (int, error) {
|
||||
leaderNameSize := len(req.LeaderName)
|
||||
b := make([]byte, appendEntriesRequestHeaderSize + leaderNameSize)
|
||||
|
||||
// Write request.
|
||||
binary.BigEndian.PutUint32(b[0:4], protocolVersion)
|
||||
binary.BigEndian.PutUint64(b[4:12], req.Term)
|
||||
binary.BigEndian.PutUint64(b[12:20], req.PrevLogIndex)
|
||||
binary.BigEndian.PutUint64(b[20:28], req.PrevLogTerm)
|
||||
binary.BigEndian.PutUint64(b[28:36], req.CommitIndex)
|
||||
binary.BigEndian.PutUint32(b[36:40], uint32(leaderNameSize))
|
||||
binary.BigEndian.PutUint32(b[40:44], uint32(len(req.Entries)))
|
||||
copy(b[44:44+leaderNameSize], []byte(req.LeaderName))
|
||||
protoEntries := make([]*protobuf.ProtoAppendEntriesRequest_ProtoLogEntry, len(req.Entries))
|
||||
|
||||
// Append entries.
|
||||
buf := bytes.NewBuffer(b)
|
||||
for _, entry := range req.Entries {
|
||||
if _, err := entry.encode(buf); err != nil {
|
||||
return 0, err
|
||||
for i, entry := range req.Entries {
|
||||
protoEntries[i] = &protobuf.ProtoAppendEntriesRequest_ProtoLogEntry{
|
||||
Index: proto.Uint64(entry.Index),
|
||||
Term: proto.Uint64(entry.Term),
|
||||
CommandName: proto.String(entry.CommandName),
|
||||
Command: entry.Command,
|
||||
}
|
||||
}
|
||||
|
||||
return w.Write(buf.Bytes())
|
||||
p := proto.NewBuffer(nil)
|
||||
|
||||
pb := &protobuf.ProtoAppendEntriesRequest{
|
||||
Term: proto.Uint64(req.Term),
|
||||
PrevLogIndex: proto.Uint64(req.PrevLogIndex),
|
||||
PrevLogTerm: proto.Uint64(req.PrevLogTerm),
|
||||
CommitIndex: proto.Uint64(req.CommitIndex),
|
||||
LeaderName: proto.String(req.LeaderName),
|
||||
Entries: protoEntries,
|
||||
}
|
||||
err := p.Marshal(pb)
|
||||
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return w.Write(p.Bytes())
|
||||
}
|
||||
|
||||
// Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (req *AppendEntriesRequest) decode(r io.Reader) (int, error) {
|
||||
var eof error
|
||||
header := make([]byte, appendEntriesRequestHeaderSize)
|
||||
if n, err := r.Read(header); err == io.EOF {
|
||||
return n, io.ErrUnexpectedEOF
|
||||
} else if err != nil {
|
||||
return n, err
|
||||
}
|
||||
entryCount := int(binary.BigEndian.Uint32(header[40:44]))
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
// Read leader name.
|
||||
leaderName := make([]byte, binary.BigEndian.Uint32(header[36:40]))
|
||||
if n, err := r.Read(leaderName); err == io.EOF {
|
||||
if err == io.EOF && n != len(leaderName) {
|
||||
return appendEntriesRequestHeaderSize+n, io.ErrUnexpectedEOF
|
||||
} else {
|
||||
eof = io.EOF
|
||||
}
|
||||
} else if err != nil {
|
||||
return appendEntriesRequestHeaderSize+n, err
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
totalBytes := appendEntriesRequestHeaderSize + len(leaderName)
|
||||
|
||||
// Read entries.
|
||||
entries := []*LogEntry{}
|
||||
for i:=0; i<entryCount; i++ {
|
||||
entry := &LogEntry{}
|
||||
n, err := entry.decode(r)
|
||||
entries = append(entries, entry)
|
||||
totalBytes += n
|
||||
|
||||
if err == io.EOF {
|
||||
if len(entries) == entryCount {
|
||||
err = io.EOF
|
||||
} else {
|
||||
return totalBytes, io.ErrUnexpectedEOF
|
||||
}
|
||||
} else if err != nil {
|
||||
return totalBytes, err
|
||||
totalBytes := len(data)
|
||||
|
||||
pb := &protobuf.ProtoAppendEntriesRequest{}
|
||||
p := proto.NewBuffer(data)
|
||||
|
||||
err = p.Unmarshal(pb)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
req.Term = pb.GetTerm()
|
||||
req.PrevLogIndex = pb.GetPrevLogIndex()
|
||||
req.PrevLogTerm = pb.GetPrevLogTerm()
|
||||
req.CommitIndex = pb.GetCommitIndex()
|
||||
req.LeaderName = pb.GetLeaderName()
|
||||
|
||||
req.Entries = make([]*LogEntry, len(pb.Entries))
|
||||
|
||||
for i, entry := range pb.Entries {
|
||||
req.Entries[i] = &LogEntry{
|
||||
Index: entry.GetIndex(),
|
||||
Term: entry.GetTerm(),
|
||||
CommandName: entry.GetCommandName(),
|
||||
Command: entry.Command,
|
||||
}
|
||||
}
|
||||
|
||||
// Verify that the encoding format can be read.
|
||||
if version := binary.BigEndian.Uint32(header[0:4]); version != protocolVersion {
|
||||
return totalBytes, errUnsupportedLogVersion
|
||||
}
|
||||
|
||||
req.Term = binary.BigEndian.Uint64(header[4:12])
|
||||
req.PrevLogIndex = binary.BigEndian.Uint64(header[12:20])
|
||||
req.PrevLogTerm = binary.BigEndian.Uint64(header[20:28])
|
||||
req.CommitIndex = binary.BigEndian.Uint64(header[28:36])
|
||||
req.LeaderName = string(leaderName)
|
||||
req.Entries = entries
|
||||
|
||||
return totalBytes, eof
|
||||
return totalBytes, nil
|
||||
}
|
||||
|
|
|
@ -5,25 +5,6 @@ import (
|
|||
"testing"
|
||||
)
|
||||
|
||||
// Ensure that we can encode and decode append entries requests.
|
||||
func TestAppendEntriesRequestEncodeDecode(t *testing.T) {
|
||||
var b bytes.Buffer
|
||||
e0, _ := newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})
|
||||
e1, _ := newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1001"})
|
||||
r0 := newAppendEntriesRequest(1, 2, 3, 4, "ldr", []*LogEntry{e0, e1})
|
||||
if _, err := r0.encode(&b); err != nil {
|
||||
t.Fatal("AE request encoding error:", err)
|
||||
}
|
||||
|
||||
r1 := &AppendEntriesRequest{}
|
||||
if _, err := r1.decode(&b); err != nil {
|
||||
t.Fatal("AE request decoding error:", err)
|
||||
}
|
||||
if r1.Term != 1 || r1.PrevLogIndex != 2 || r1.PrevLogTerm != 3 || r1.CommitIndex != 4 || r1.LeaderName != "ldr" || len(r1.Entries) != 2 {
|
||||
t.Fatal("Invalid AE data:", r1.Term, r1.PrevLogIndex, r1.PrevLogTerm, r1.CommitIndex, r1.LeaderName, len(r1.Entries))
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendEntriesRequestEncoding(b *testing.B) {
|
||||
req, tmp := createTestAppendEntriesRequest(2000)
|
||||
b.ResetTimer()
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"github.com/coreos/go-raft/protobuf"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
const appendEntriesResponseHeaderSize = 4 + 8 + 1 + 8 + 8
|
||||
|
||||
// The response returned from a server appending entries to the log.
|
||||
type AppendEntriesResponse struct {
|
||||
Term uint64
|
||||
|
@ -28,41 +28,50 @@ func newAppendEntriesResponse(term uint64, success bool, index uint64, commitInd
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
// Encodes the AppendEntriesResponse to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (resp *AppendEntriesResponse) encode(w io.Writer) (int, error) {
|
||||
b := make([]byte, appendEntriesResponseHeaderSize)
|
||||
|
||||
binary.BigEndian.PutUint32(b[0:4], protocolVersion)
|
||||
binary.BigEndian.PutUint64(b[4:12], resp.Term)
|
||||
bigEndianPutBool(b[12:13], resp.Success)
|
||||
binary.BigEndian.PutUint64(b[13:21], resp.Index)
|
||||
binary.BigEndian.PutUint64(b[21:29], resp.CommitIndex)
|
||||
p := proto.NewBuffer(nil)
|
||||
|
||||
return w.Write(b)
|
||||
pb := &protobuf.ProtoAppendEntriesResponse{
|
||||
Term: proto.Uint64(resp.Term),
|
||||
Index: proto.Uint64(resp.Index),
|
||||
CommitIndex: proto.Uint64(resp.CommitIndex),
|
||||
Success: proto.Bool(resp.Success),
|
||||
}
|
||||
|
||||
err := p.Marshal(pb)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return w.Write(p.Bytes())
|
||||
}
|
||||
|
||||
// Decodes the AppendEntriesResponse from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (resp *AppendEntriesResponse) decode(r io.Reader) (int, error) {
|
||||
var eof error
|
||||
header := make([]byte, appendEntriesResponseHeaderSize)
|
||||
if n, err := r.Read(header); err == io.EOF {
|
||||
if n == len(header) {
|
||||
eof = io.EOF
|
||||
} else {
|
||||
return n, io.ErrUnexpectedEOF
|
||||
}
|
||||
} else if err != nil {
|
||||
return n, err
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// Verify that the encoding format can be read.
|
||||
if version := binary.BigEndian.Uint32(header[0:4]); version != protocolVersion {
|
||||
return appendEntriesResponseHeaderSize, errUnsupportedLogVersion
|
||||
totalBytes := len(data)
|
||||
|
||||
pb := &protobuf.ProtoAppendEntriesResponse{}
|
||||
p := proto.NewBuffer(data)
|
||||
|
||||
err = p.Unmarshal(pb)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
resp.Term = binary.BigEndian.Uint64(header[4:12])
|
||||
resp.Success = bigEndianBool(header[12:13])
|
||||
resp.Index = binary.BigEndian.Uint64(header[13:21])
|
||||
resp.CommitIndex = binary.BigEndian.Uint64(header[21:29])
|
||||
resp.Term = pb.GetTerm()
|
||||
resp.Index = pb.GetIndex()
|
||||
resp.CommitIndex = pb.GetCommitIndex()
|
||||
resp.Success = pb.GetSuccess()
|
||||
|
||||
return appendEntriesResponseHeaderSize, eof
|
||||
return totalBytes, nil
|
||||
}
|
||||
|
|
|
@ -5,23 +5,6 @@ import (
|
|||
"testing"
|
||||
)
|
||||
|
||||
// Ensure that we can encode and decode append entries responses.
|
||||
func TestAppendEntriesResponseEncodeDecode(t *testing.T) {
|
||||
var b bytes.Buffer
|
||||
r0 := newAppendEntriesResponse(1, true, 2, 3)
|
||||
if _, err := r0.encode(&b); err != nil {
|
||||
t.Fatal("AE response encoding error:", err)
|
||||
}
|
||||
|
||||
r1 := &AppendEntriesResponse{}
|
||||
if _, err := r1.decode(&b); err != nil {
|
||||
t.Fatal("AE response decoding error:", err)
|
||||
}
|
||||
if r1.Term != 1 || r1.Success != true || r1.Index != 2 || r1.CommitIndex != 3 {
|
||||
t.Fatal("Invalid AE response data:", r1.Term, r1.Success, r1.Index, r1.CommitIndex)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkAppendEntriesResponseEncoding(b *testing.B) {
|
||||
req, tmp := createTestAppendEntriesResponse(2000)
|
||||
b.ResetTimer()
|
||||
|
|
13
binary.go
13
binary.go
|
@ -1,13 +0,0 @@
|
|||
package raft
|
||||
|
||||
func bigEndianPutBool(b []byte, value bool) {
|
||||
if value {
|
||||
b[0] = 1
|
||||
} else {
|
||||
b[0] = 0
|
||||
}
|
||||
}
|
||||
|
||||
func bigEndianBool(b []byte) bool {
|
||||
return !(b[0] == 0)
|
||||
}
|
|
@ -96,7 +96,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
|
|||
traceln(server.Name(), "POST", url)
|
||||
|
||||
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
|
||||
httpResp, err := client.Post(url, "application/octet-stream", &b)
|
||||
httpResp, err := client.Post(url, "application/protobuf", &b)
|
||||
if httpResp == nil || err != nil {
|
||||
return nil
|
||||
}
|
||||
|
@ -121,7 +121,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
|
|||
traceln(server.Name(), "POST", url)
|
||||
|
||||
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
|
||||
httpResp, err := client.Post(url, "application/octet-stream", &b)
|
||||
httpResp, err := client.Post(url, "application/protobuf", &b)
|
||||
if httpResp == nil || err != nil {
|
||||
return nil
|
||||
}
|
||||
|
|
119
log_entry.go
119
log_entry.go
|
@ -2,25 +2,21 @@ package raft
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"hash/crc32"
|
||||
"fmt"
|
||||
"github.com/coreos/go-raft/protobuf"
|
||||
"io"
|
||||
)
|
||||
|
||||
const logEntryHeaderSize int = 4 + 4 + 8 + 8 + 4 + 4
|
||||
|
||||
var errInvalidChecksum = errors.New("Invalid checksum")
|
||||
|
||||
// A log entry stores a single item in the log.
|
||||
type LogEntry struct {
|
||||
log *Log
|
||||
Index uint64
|
||||
Term uint64
|
||||
log *Log
|
||||
Index uint64
|
||||
Term uint64
|
||||
CommandName string
|
||||
Command []byte
|
||||
commit chan bool
|
||||
Command []byte
|
||||
commit chan bool
|
||||
}
|
||||
|
||||
// Creates a new log entry associated with a log.
|
||||
|
@ -39,12 +35,12 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr
|
|||
}
|
||||
|
||||
e := &LogEntry{
|
||||
log: log,
|
||||
Index: index,
|
||||
Term: term,
|
||||
log: log,
|
||||
Index: index,
|
||||
Term: term,
|
||||
CommandName: commandName,
|
||||
Command: buf.Bytes(),
|
||||
commit: make(chan bool, 5),
|
||||
Command: buf.Bytes(),
|
||||
commit: make(chan bool, 5),
|
||||
}
|
||||
|
||||
return e, nil
|
||||
|
@ -53,65 +49,64 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr
|
|||
// Encodes the log entry to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (e *LogEntry) encode(w io.Writer) (int, error) {
|
||||
commandNameSize, commandSize := len([]byte(e.CommandName)), len(e.Command)
|
||||
b := make([]byte, logEntryHeaderSize + commandNameSize + commandSize)
|
||||
|
||||
// Write log entry.
|
||||
binary.BigEndian.PutUint32(b[4:8], protocolVersion)
|
||||
binary.BigEndian.PutUint64(b[8:16], e.Term)
|
||||
binary.BigEndian.PutUint64(b[16:24], e.Index)
|
||||
binary.BigEndian.PutUint32(b[24:28], uint32(commandNameSize))
|
||||
binary.BigEndian.PutUint32(b[28:32], uint32(commandSize))
|
||||
copy(b[32:32+commandNameSize], []byte(e.CommandName))
|
||||
copy(b[32+commandNameSize:], e.Command)
|
||||
p := proto.NewBuffer(nil)
|
||||
|
||||
// Write checksum.
|
||||
binary.BigEndian.PutUint32(b[0:4], crc32.ChecksumIEEE(b[4:]))
|
||||
pb := &protobuf.ProtoLogEntry{
|
||||
Index: proto.Uint64(e.Index),
|
||||
Term: proto.Uint64(e.Term),
|
||||
CommandName: proto.String(e.CommandName),
|
||||
Command: e.Command,
|
||||
}
|
||||
|
||||
return w.Write(b)
|
||||
err := p.Marshal(pb)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return -1, err
|
||||
}
|
||||
|
||||
_, err = fmt.Fprintf(w, "%x", len(p.Bytes()))
|
||||
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return w.Write(p.Bytes())
|
||||
}
|
||||
|
||||
// Decodes the log entry from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (e *LogEntry) decode(r io.Reader) (int, error) {
|
||||
// Read the header.
|
||||
header := make([]byte, logEntryHeaderSize)
|
||||
if n, err := r.Read(header); err != nil {
|
||||
return n, err
|
||||
|
||||
var length int
|
||||
_, err := fmt.Fscanf(r, "%x", &length)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// Read command name.
|
||||
commandName := make([]byte, binary.BigEndian.Uint32(header[24:28]))
|
||||
if n, err := r.Read(commandName); err != nil {
|
||||
return logEntryHeaderSize+n, err
|
||||
data := make([]byte, length)
|
||||
|
||||
_, err = r.Read(data)
|
||||
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// Read command data.
|
||||
command := make([]byte, binary.BigEndian.Uint32(header[28:32]))
|
||||
if n, err := r.Read(command); err != nil {
|
||||
return logEntryHeaderSize+len(commandName)+n, err
|
||||
}
|
||||
totalBytes := logEntryHeaderSize + len(commandName) + len(command)
|
||||
pb := &protobuf.ProtoLogEntry{}
|
||||
p := proto.NewBuffer(data)
|
||||
err = p.Unmarshal(pb)
|
||||
|
||||
// Verify checksum.
|
||||
checksum := binary.BigEndian.Uint32(header[0:4])
|
||||
crc := crc32.NewIEEE()
|
||||
crc.Write(header[4:])
|
||||
crc.Write(commandName)
|
||||
crc.Write(command)
|
||||
if checksum != crc.Sum32() {
|
||||
return totalBytes, errInvalidChecksum
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Verify that the encoding format can be read.
|
||||
if version := binary.BigEndian.Uint32(header[4:8]); version != protocolVersion {
|
||||
return totalBytes, errUnsupportedLogVersion
|
||||
}
|
||||
|
||||
e.Term = binary.BigEndian.Uint64(header[8:16])
|
||||
e.Index = binary.BigEndian.Uint64(header[16:24])
|
||||
e.CommandName = string(commandName)
|
||||
e.Command = command
|
||||
e.Term = pb.GetTerm()
|
||||
e.Index = pb.GetIndex()
|
||||
e.CommandName = pb.GetCommandName()
|
||||
e.Command = pb.Command
|
||||
|
||||
return totalBytes, nil
|
||||
return length, nil
|
||||
}
|
||||
|
|
|
@ -1,43 +0,0 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
)
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
//
|
||||
// Tests
|
||||
//
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
//--------------------------------------
|
||||
// Encoding
|
||||
//--------------------------------------
|
||||
|
||||
// Ensure that we can encode and decode a log entry.
|
||||
func TestLogEntryEncodeDecode(t *testing.T) {
|
||||
// Create entry.
|
||||
e1, err := newLogEntry(nil, 1, 2, &joinCommand{Name: "localhost:1000"})
|
||||
if err != nil {
|
||||
t.Fatal("Unable to create entry: ", err)
|
||||
}
|
||||
|
||||
// Encode the entry to a buffer.
|
||||
var buf bytes.Buffer
|
||||
if n, err := e1.encode(&buf); n == 0 || err != nil {
|
||||
t.Fatal("Unable to encode entry: ", n, err)
|
||||
}
|
||||
|
||||
// Decode into a new entry.
|
||||
e2 := &LogEntry{}
|
||||
if n, err := e2.decode(&buf); n == 0 || err != nil {
|
||||
t.Fatal("Unable to decode entry: ", n, err)
|
||||
}
|
||||
if e2.Index != 1 || e2.Term != 2 {
|
||||
t.Fatal("Unexpected log entry encoding:", e2.Index, e2.Term)
|
||||
}
|
||||
if e2.CommandName != "test:join" || !bytes.Equal(e1.Command, e2.Command) {
|
||||
t.Fatal("Unexpected log entry command encoding:", e2.CommandName, len(e1.Command), len(e2.Command))
|
||||
}
|
||||
}
|
32
log_test.go
32
log_test.go
|
@ -30,15 +30,15 @@ func TestLogNewLog(t *testing.T) {
|
|||
defer log.close()
|
||||
defer os.Remove(path)
|
||||
|
||||
e, _ := newLogEntry(log, 1, 1, &testCommand1{Val:"foo", I:20})
|
||||
e, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||
if err := log.appendEntry(e); err != nil {
|
||||
t.Fatalf("Unable to append: %v", err)
|
||||
}
|
||||
e, _ = newLogEntry(log, 2, 1, &testCommand2{X:100})
|
||||
e, _ = newLogEntry(log, 2, 1, &testCommand2{X: 100})
|
||||
if err := log.appendEntry(e); err != nil {
|
||||
t.Fatalf("Unable to append: %v", err)
|
||||
}
|
||||
e, _ = newLogEntry(log, 3, 2, &testCommand1{Val:"bar", I:0})
|
||||
e, _ = newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||
if err := log.appendEntry(e); err != nil {
|
||||
t.Fatalf("Unable to append: %v", err)
|
||||
}
|
||||
|
@ -62,9 +62,9 @@ func TestLogNewLog(t *testing.T) {
|
|||
|
||||
// Ensure that we can decode and encode to an existing log.
|
||||
func TestLogExistingLog(t *testing.T) {
|
||||
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:20})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X:100})
|
||||
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val:"bar", I:0})
|
||||
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
|
||||
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||
log, path := setupLog([]*LogEntry{e0, e1, e2})
|
||||
defer log.close()
|
||||
defer os.Remove(path)
|
||||
|
@ -86,9 +86,9 @@ func TestLogExistingLog(t *testing.T) {
|
|||
|
||||
// Ensure that we can check the contents of the log by index/term.
|
||||
func TestLogContainsEntries(t *testing.T) {
|
||||
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:20})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X:100})
|
||||
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val:"bar", I:0})
|
||||
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
|
||||
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||
log, path := setupLog([]*LogEntry{e0, e1, e2})
|
||||
defer log.close()
|
||||
defer os.Remove(path)
|
||||
|
@ -112,8 +112,8 @@ func TestLogContainsEntries(t *testing.T) {
|
|||
|
||||
// Ensure that we can recover from an incomplete/corrupt log and continue logging.
|
||||
func TestLogRecovery(t *testing.T) {
|
||||
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:20})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X:100})
|
||||
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
|
||||
f, _ := ioutil.TempFile("", "raft-log-")
|
||||
e0.encode(f)
|
||||
e1.encode(f)
|
||||
|
@ -130,14 +130,14 @@ func TestLogRecovery(t *testing.T) {
|
|||
defer log.close()
|
||||
defer os.Remove(f.Name())
|
||||
|
||||
e, _ := newLogEntry(log, 3, 2, &testCommand1{Val:"bat", I:-5})
|
||||
e, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bat", I: -5})
|
||||
if err := log.appendEntry(e); err != nil {
|
||||
t.Fatalf("Unable to append: %v", err)
|
||||
}
|
||||
|
||||
// Validate existing log entries.
|
||||
if len(log.entries) != 3 {
|
||||
t.Fatalf("Expected 2 entries, got %d", len(log.entries))
|
||||
t.Fatalf("Expected 3 entries, got %d", len(log.entries))
|
||||
}
|
||||
if log.entries[0].Index != 1 || log.entries[0].Term != 1 {
|
||||
t.Fatalf("Unexpected entry[0]: %v", log.entries[0])
|
||||
|
@ -163,15 +163,15 @@ func TestLogTruncate(t *testing.T) {
|
|||
defer log.close()
|
||||
defer os.Remove(path)
|
||||
|
||||
entry1, _ := newLogEntry(log, 1, 1, &testCommand1{Val:"foo", I:20})
|
||||
entry1, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||
if err := log.appendEntry(entry1); err != nil {
|
||||
t.Fatalf("Unable to append: %v", err)
|
||||
}
|
||||
entry2, _ := newLogEntry(log, 2, 1, &testCommand2{X:100})
|
||||
entry2, _ := newLogEntry(log, 2, 1, &testCommand2{X: 100})
|
||||
if err := log.appendEntry(entry2); err != nil {
|
||||
t.Fatalf("Unable to append: %v", err)
|
||||
}
|
||||
entry3, _ := newLogEntry(log, 3, 2, &testCommand1{Val:"bar", I:0})
|
||||
entry3, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||
if err := log.appendEntry(entry3); err != nil {
|
||||
t.Fatalf("Unable to append: %v", err)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: append_entreis_request.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
package protobuf
|
||||
|
||||
import proto "code.google.com/p/goprotobuf/proto"
|
||||
import json "encoding/json"
|
||||
import math "math"
|
||||
|
||||
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = &json.SyntaxError{}
|
||||
var _ = math.Inf
|
||||
|
||||
type ProtoAppendEntriesRequest struct {
|
||||
Term *uint64 `protobuf:"varint,1,req" json:"Term,omitempty"`
|
||||
PrevLogIndex *uint64 `protobuf:"varint,2,req" json:"PrevLogIndex,omitempty"`
|
||||
PrevLogTerm *uint64 `protobuf:"varint,3,req" json:"PrevLogTerm,omitempty"`
|
||||
CommitIndex *uint64 `protobuf:"varint,4,req" json:"CommitIndex,omitempty"`
|
||||
LeaderName *string `protobuf:"bytes,5,req" json:"LeaderName,omitempty"`
|
||||
Entries []*ProtoAppendEntriesRequest_ProtoLogEntry `protobuf:"bytes,6,rep" json:"Entries,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest) Reset() { *m = ProtoAppendEntriesRequest{} }
|
||||
func (m *ProtoAppendEntriesRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*ProtoAppendEntriesRequest) ProtoMessage() {}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest) GetTerm() uint64 {
|
||||
if m != nil && m.Term != nil {
|
||||
return *m.Term
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest) GetPrevLogIndex() uint64 {
|
||||
if m != nil && m.PrevLogIndex != nil {
|
||||
return *m.PrevLogIndex
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest) GetPrevLogTerm() uint64 {
|
||||
if m != nil && m.PrevLogTerm != nil {
|
||||
return *m.PrevLogTerm
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest) GetCommitIndex() uint64 {
|
||||
if m != nil && m.CommitIndex != nil {
|
||||
return *m.CommitIndex
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest) GetLeaderName() string {
|
||||
if m != nil && m.LeaderName != nil {
|
||||
return *m.LeaderName
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest) GetEntries() []*ProtoAppendEntriesRequest_ProtoLogEntry {
|
||||
if m != nil {
|
||||
return m.Entries
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type ProtoAppendEntriesRequest_ProtoLogEntry struct {
|
||||
Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`
|
||||
Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
|
||||
CommandName *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"`
|
||||
Command []byte `protobuf:"bytes,4,req" json:"Command,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest_ProtoLogEntry) Reset() {
|
||||
*m = ProtoAppendEntriesRequest_ProtoLogEntry{}
|
||||
}
|
||||
func (m *ProtoAppendEntriesRequest_ProtoLogEntry) String() string { return proto.CompactTextString(m) }
|
||||
func (*ProtoAppendEntriesRequest_ProtoLogEntry) ProtoMessage() {}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest_ProtoLogEntry) GetIndex() uint64 {
|
||||
if m != nil && m.Index != nil {
|
||||
return *m.Index
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest_ProtoLogEntry) GetTerm() uint64 {
|
||||
if m != nil && m.Term != nil {
|
||||
return *m.Term
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest_ProtoLogEntry) GetCommandName() string {
|
||||
if m != nil && m.CommandName != nil {
|
||||
return *m.CommandName
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesRequest_ProtoLogEntry) GetCommand() []byte {
|
||||
if m != nil {
|
||||
return m.Command
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package protobuf;
|
||||
|
||||
message ProtoAppendEntriesRequest {
|
||||
required uint64 Term=1;
|
||||
required uint64 PrevLogIndex=2;
|
||||
required uint64 PrevLogTerm=3;
|
||||
required uint64 CommitIndex=4;
|
||||
required string LeaderName=5;
|
||||
|
||||
message ProtoLogEntry {
|
||||
required uint64 Index=1;
|
||||
required uint64 Term=2;
|
||||
required string CommandName=3;
|
||||
required bytes Command=4;
|
||||
}
|
||||
|
||||
repeated ProtoLogEntry Entries=6;
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: append_entries_responses.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
package protobuf
|
||||
|
||||
import proto "code.google.com/p/goprotobuf/proto"
|
||||
import json "encoding/json"
|
||||
import math "math"
|
||||
|
||||
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = &json.SyntaxError{}
|
||||
var _ = math.Inf
|
||||
|
||||
type ProtoAppendEntriesResponse struct {
|
||||
Term *uint64 `protobuf:"varint,1,req" json:"Term,omitempty"`
|
||||
Index *uint64 `protobuf:"varint,2,req" json:"Index,omitempty"`
|
||||
CommitIndex *uint64 `protobuf:"varint,3,req" json:"CommitIndex,omitempty"`
|
||||
Success *bool `protobuf:"varint,4,req" json:"Success,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesResponse) Reset() { *m = ProtoAppendEntriesResponse{} }
|
||||
func (m *ProtoAppendEntriesResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*ProtoAppendEntriesResponse) ProtoMessage() {}
|
||||
|
||||
func (m *ProtoAppendEntriesResponse) GetTerm() uint64 {
|
||||
if m != nil && m.Term != nil {
|
||||
return *m.Term
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesResponse) GetIndex() uint64 {
|
||||
if m != nil && m.Index != nil {
|
||||
return *m.Index
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesResponse) GetCommitIndex() uint64 {
|
||||
if m != nil && m.CommitIndex != nil {
|
||||
return *m.CommitIndex
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoAppendEntriesResponse) GetSuccess() bool {
|
||||
if m != nil && m.Success != nil {
|
||||
return *m.Success
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func init() {
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
package protobuf;
|
||||
|
||||
message ProtoAppendEntriesResponse {
|
||||
required uint64 Term=1;
|
||||
required uint64 Index=2;
|
||||
required uint64 CommitIndex=3;
|
||||
required bool Success=4;
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: log_entry.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
package protobuf
|
||||
|
||||
import proto "code.google.com/p/goprotobuf/proto"
|
||||
import json "encoding/json"
|
||||
import math "math"
|
||||
|
||||
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = &json.SyntaxError{}
|
||||
var _ = math.Inf
|
||||
|
||||
type ProtoLogEntry struct {
|
||||
Index *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`
|
||||
Term *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
|
||||
CommandName *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"`
|
||||
Command []byte `protobuf:"bytes,4,opt" json:"Command,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *ProtoLogEntry) Reset() { *m = ProtoLogEntry{} }
|
||||
func (m *ProtoLogEntry) String() string { return proto.CompactTextString(m) }
|
||||
func (*ProtoLogEntry) ProtoMessage() {}
|
||||
|
||||
func (m *ProtoLogEntry) GetIndex() uint64 {
|
||||
if m != nil && m.Index != nil {
|
||||
return *m.Index
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoLogEntry) GetTerm() uint64 {
|
||||
if m != nil && m.Term != nil {
|
||||
return *m.Term
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoLogEntry) GetCommandName() string {
|
||||
if m != nil && m.CommandName != nil {
|
||||
return *m.CommandName
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (m *ProtoLogEntry) GetCommand() []byte {
|
||||
if m != nil {
|
||||
return m.Command
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
package protobuf;
|
||||
|
||||
message ProtoLogEntry {
|
||||
required uint64 Index=1;
|
||||
required uint64 Term=2;
|
||||
required string CommandName=3;
|
||||
optional bytes Command=4; // for nop-command
|
||||
}
|
|
@ -0,0 +1,57 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: request_vote_request.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
package protobuf
|
||||
|
||||
import proto "code.google.com/p/goprotobuf/proto"
|
||||
import json "encoding/json"
|
||||
import math "math"
|
||||
|
||||
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = &json.SyntaxError{}
|
||||
var _ = math.Inf
|
||||
|
||||
type ProtoRequestVoteRequest struct {
|
||||
Term *uint64 `protobuf:"varint,1,req" json:"Term,omitempty"`
|
||||
LastLogIndex *uint64 `protobuf:"varint,2,req" json:"LastLogIndex,omitempty"`
|
||||
LastLogTerm *uint64 `protobuf:"varint,3,req" json:"LastLogTerm,omitempty"`
|
||||
CandidateName *string `protobuf:"bytes,4,req" json:"CandidateName,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *ProtoRequestVoteRequest) Reset() { *m = ProtoRequestVoteRequest{} }
|
||||
func (m *ProtoRequestVoteRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*ProtoRequestVoteRequest) ProtoMessage() {}
|
||||
|
||||
func (m *ProtoRequestVoteRequest) GetTerm() uint64 {
|
||||
if m != nil && m.Term != nil {
|
||||
return *m.Term
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoRequestVoteRequest) GetLastLogIndex() uint64 {
|
||||
if m != nil && m.LastLogIndex != nil {
|
||||
return *m.LastLogIndex
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoRequestVoteRequest) GetLastLogTerm() uint64 {
|
||||
if m != nil && m.LastLogTerm != nil {
|
||||
return *m.LastLogTerm
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoRequestVoteRequest) GetCandidateName() string {
|
||||
if m != nil && m.CandidateName != nil {
|
||||
return *m.CandidateName
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func init() {
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
package protobuf;
|
||||
|
||||
message ProtoRequestVoteRequest {
|
||||
required uint64 Term=1;
|
||||
required uint64 LastLogIndex=2;
|
||||
required uint64 LastLogTerm=3;
|
||||
required string CandidateName=4;
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
// Code generated by protoc-gen-go.
|
||||
// source: request_vote_responses.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
package protobuf
|
||||
|
||||
import proto "code.google.com/p/goprotobuf/proto"
|
||||
import json "encoding/json"
|
||||
import math "math"
|
||||
|
||||
// Reference proto, json, and math imports to suppress error if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = &json.SyntaxError{}
|
||||
var _ = math.Inf
|
||||
|
||||
type ProtoRequestVoteResponse struct {
|
||||
Term *uint64 `protobuf:"varint,1,req" json:"Term,omitempty"`
|
||||
VoteGranted *bool `protobuf:"varint,2,req" json:"VoteGranted,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *ProtoRequestVoteResponse) Reset() { *m = ProtoRequestVoteResponse{} }
|
||||
func (m *ProtoRequestVoteResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*ProtoRequestVoteResponse) ProtoMessage() {}
|
||||
|
||||
func (m *ProtoRequestVoteResponse) GetTerm() uint64 {
|
||||
if m != nil && m.Term != nil {
|
||||
return *m.Term
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *ProtoRequestVoteResponse) GetVoteGranted() bool {
|
||||
if m != nil && m.VoteGranted != nil {
|
||||
return *m.VoteGranted
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func init() {
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
package protobuf;
|
||||
|
||||
message ProtoRequestVoteResponse {
|
||||
required uint64 Term=1;
|
||||
required bool VoteGranted=2;
|
||||
}
|
10
protocol.go
10
protocol.go
|
@ -1,10 +0,0 @@
|
|||
package raft
|
||||
|
||||
import "errors"
|
||||
|
||||
// The version of the protocol currently used by the Raft library. This
|
||||
// can change as the library evolves and future versions should be
|
||||
// backwards compatible.
|
||||
const protocolVersion uint32 = 1
|
||||
|
||||
var errUnsupportedLogVersion = errors.New("Unsupported log version")
|
|
@ -1,12 +1,12 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"github.com/coreos/go-raft/protobuf"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
const requestVoteRequestHeaderSize = 4 + 8 + 8 + 8 + 4
|
||||
|
||||
// The request sent to a server to vote for a candidate to become a leader.
|
||||
type RequestVoteRequest struct {
|
||||
peer *Peer
|
||||
|
@ -26,52 +26,50 @@ func newRequestVoteRequest(term uint64, candidateName string, lastLogIndex uint6
|
|||
}
|
||||
}
|
||||
|
||||
// Encodes the RequestVoteRequest to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (req *RequestVoteRequest) encode(w io.Writer) (int, error) {
|
||||
candidateNameSize := len(req.CandidateName)
|
||||
b := make([]byte, requestVoteRequestHeaderSize + candidateNameSize)
|
||||
|
||||
// Write request.
|
||||
binary.BigEndian.PutUint32(b[0:4], protocolVersion)
|
||||
binary.BigEndian.PutUint64(b[4:12], req.Term)
|
||||
binary.BigEndian.PutUint64(b[12:20], req.LastLogIndex)
|
||||
binary.BigEndian.PutUint64(b[20:28], req.LastLogTerm)
|
||||
binary.BigEndian.PutUint32(b[28:32], uint32(candidateNameSize))
|
||||
copy(b[32:], []byte(req.CandidateName))
|
||||
p := proto.NewBuffer(nil)
|
||||
|
||||
return w.Write(b)
|
||||
pb := &protobuf.ProtoRequestVoteRequest{
|
||||
Term: proto.Uint64(req.Term),
|
||||
LastLogIndex: proto.Uint64(req.LastLogIndex),
|
||||
LastLogTerm: proto.Uint64(req.LastLogTerm),
|
||||
CandidateName: proto.String(req.CandidateName),
|
||||
}
|
||||
err := p.Marshal(pb)
|
||||
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return w.Write(p.Bytes())
|
||||
}
|
||||
|
||||
// Decodes the RequestVoteRequest from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (req *RequestVoteRequest) decode(r io.Reader) (int, error) {
|
||||
var eof error
|
||||
header := make([]byte, requestVoteRequestHeaderSize)
|
||||
if n, err := r.Read(header); err == io.EOF {
|
||||
return n, io.ErrUnexpectedEOF
|
||||
} else if err != nil {
|
||||
return n, err
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
// Read candidate name.
|
||||
candidateName := make([]byte, binary.BigEndian.Uint32(header[28:32]))
|
||||
if n, err := r.Read(candidateName); err == io.EOF {
|
||||
if err == io.EOF && n != len(candidateName) {
|
||||
return requestVoteRequestHeaderSize+n, io.ErrUnexpectedEOF
|
||||
} else {
|
||||
eof = io.EOF
|
||||
}
|
||||
} else if err != nil {
|
||||
return requestVoteRequestHeaderSize+n, err
|
||||
}
|
||||
totalBytes := requestVoteRequestHeaderSize + len(candidateName)
|
||||
totalBytes := len(data)
|
||||
|
||||
// Verify that the encoding format can be read.
|
||||
if version := binary.BigEndian.Uint32(header[0:4]); version != protocolVersion {
|
||||
return totalBytes, errUnsupportedLogVersion
|
||||
pb := &protobuf.ProtoRequestVoteRequest{}
|
||||
p := proto.NewBuffer(data)
|
||||
|
||||
err = p.Unmarshal(pb)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
req.Term = binary.BigEndian.Uint64(header[4:12])
|
||||
req.LastLogIndex = binary.BigEndian.Uint64(header[12:20])
|
||||
req.LastLogTerm = binary.BigEndian.Uint64(header[20:28])
|
||||
req.CandidateName = string(candidateName)
|
||||
req.Term = pb.GetTerm()
|
||||
req.LastLogIndex = pb.GetLastLogIndex()
|
||||
req.LastLogTerm = pb.GetLastLogTerm()
|
||||
req.CandidateName = pb.GetCandidateName()
|
||||
|
||||
return totalBytes, eof
|
||||
return totalBytes, nil
|
||||
}
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Ensure that we can encode and decode request vote requests.
|
||||
func TestRequestVoteRequestEncodeDecode(t *testing.T) {
|
||||
var b bytes.Buffer
|
||||
r0 := newRequestVoteRequest(1, "ldr", 2, 3)
|
||||
if _, err := r0.encode(&b); err != nil {
|
||||
t.Fatal("RV request encoding error:", err)
|
||||
}
|
||||
|
||||
r1 := &RequestVoteRequest{}
|
||||
if _, err := r1.decode(&b); err != nil {
|
||||
t.Fatal("RV request decoding error:", err)
|
||||
}
|
||||
if r1.Term != 1 || r1.CandidateName != "ldr" || r1.LastLogIndex != 2 || r1.LastLogTerm != 3 {
|
||||
t.Fatal("Invalid RV request data:", r1.Term, r1.CandidateName, r1.LastLogIndex, r1.LastLogTerm)
|
||||
}
|
||||
}
|
|
@ -1,12 +1,12 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"code.google.com/p/goprotobuf/proto"
|
||||
"github.com/coreos/go-raft/protobuf"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
const requestVoteResponseHeaderSize = 4 + 8 + 1
|
||||
|
||||
// The response returned from a server after a vote for a candidate to become a leader.
|
||||
type RequestVoteResponse struct {
|
||||
peer *Peer
|
||||
|
@ -22,36 +22,47 @@ func newRequestVoteResponse(term uint64, voteGranted bool) *RequestVoteResponse
|
|||
}
|
||||
}
|
||||
|
||||
// Encodes the RequestVoteResponse to a buffer. Returns the number of bytes
|
||||
// written and any error that may have occurred.
|
||||
func (resp *RequestVoteResponse) encode(w io.Writer) (int, error) {
|
||||
b := make([]byte, requestVoteResponseHeaderSize)
|
||||
|
||||
binary.BigEndian.PutUint32(b[0:4], protocolVersion)
|
||||
binary.BigEndian.PutUint64(b[4:12], resp.Term)
|
||||
bigEndianPutBool(b[12:13], resp.VoteGranted)
|
||||
p := proto.NewBuffer(nil)
|
||||
|
||||
return w.Write(b)
|
||||
pb := &protobuf.ProtoRequestVoteResponse{
|
||||
Term: proto.Uint64(resp.Term),
|
||||
VoteGranted: proto.Bool(resp.VoteGranted),
|
||||
}
|
||||
|
||||
err := p.Marshal(pb)
|
||||
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return w.Write(p.Bytes())
|
||||
}
|
||||
|
||||
// Decodes the RequestVoteResponse from a buffer. Returns the number of bytes read and
|
||||
// any error that occurs.
|
||||
func (resp *RequestVoteResponse) decode(r io.Reader) (int, error) {
|
||||
var eof error
|
||||
header := make([]byte, requestVoteResponseHeaderSize)
|
||||
if n, err := r.Read(header); err == io.EOF {
|
||||
if n == len(header) {
|
||||
eof = io.EOF
|
||||
} else {
|
||||
return n, io.ErrUnexpectedEOF
|
||||
}
|
||||
} else if err != nil {
|
||||
return n, err
|
||||
data, err := ioutil.ReadAll(r)
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
// Verify that the encoding format can be read.
|
||||
if version := binary.BigEndian.Uint32(header[0:4]); version != protocolVersion {
|
||||
return requestVoteResponseHeaderSize, errUnsupportedLogVersion
|
||||
totalBytes := len(data)
|
||||
|
||||
pb := &protobuf.ProtoRequestVoteResponse{}
|
||||
p := proto.NewBuffer(data)
|
||||
|
||||
err = p.Unmarshal(pb)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
resp.Term = binary.BigEndian.Uint64(header[4:12])
|
||||
resp.VoteGranted = bigEndianBool(header[12:13])
|
||||
resp.Term = pb.GetTerm()
|
||||
resp.VoteGranted = pb.GetVoteGranted()
|
||||
|
||||
return requestVoteResponseHeaderSize, eof
|
||||
return totalBytes, nil
|
||||
}
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
package raft
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// Ensure that we can encode and decode request vote responses.
|
||||
func TestRequestVoteResponseEncodeDecode(t *testing.T) {
|
||||
var b bytes.Buffer
|
||||
r0 := newRequestVoteResponse(1, true)
|
||||
if _, err := r0.encode(&b); err != nil {
|
||||
t.Fatal("RV response encoding error:", err)
|
||||
}
|
||||
|
||||
r1 := &RequestVoteResponse{}
|
||||
if _, err := r1.decode(&b); err != nil {
|
||||
t.Fatal("RV response decoding error:", err)
|
||||
}
|
||||
if r1.Term != 1 || r1.VoteGranted != true {
|
||||
t.Fatal("Invalid RV response data:", r1.Term, r1.VoteGranted)
|
||||
}
|
||||
}
|
|
@ -677,6 +677,7 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
|
|||
|
||||
// Processes the "append entries" request.
|
||||
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
|
||||
|
||||
s.traceln("server.ae.process")
|
||||
|
||||
if req.Term < s.currentTerm {
|
||||
|
@ -712,6 +713,7 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append
|
|||
// processed when the server is a leader. Responses received during other
|
||||
// states are dropped.
|
||||
func (s *Server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
|
||||
|
||||
// If we find a higher term then change to a follower and exit.
|
||||
if resp.Term > s.currentTerm {
|
||||
s.setCurrentTerm(resp.Term, "", false)
|
||||
|
@ -782,6 +784,7 @@ func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
|
|||
|
||||
// Processes a "request vote" request.
|
||||
func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
|
||||
|
||||
// If the request is coming from an old term then reject it.
|
||||
if req.Term < s.currentTerm {
|
||||
s.debugln("server.rv.error: stale term")
|
||||
|
|
|
@ -87,9 +87,9 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
|
|||
|
||||
// Ensure that a vote request is denied if the log is out of date.
|
||||
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
|
||||
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:20})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X:100})
|
||||
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val:"bar", I:0})
|
||||
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
|
||||
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0})
|
||||
server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
|
||||
server.Initialize()
|
||||
server.StartLeader()
|
||||
|
@ -175,7 +175,7 @@ func TestServerAppendEntries(t *testing.T) {
|
|||
defer server.Stop()
|
||||
|
||||
// Append single entry.
|
||||
e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:10})
|
||||
e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
||||
entries := []*LogEntry{e}
|
||||
resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
|
||||
if resp.Term != 1 || !resp.Success {
|
||||
|
@ -186,8 +186,8 @@ func TestServerAppendEntries(t *testing.T) {
|
|||
}
|
||||
|
||||
// Append multiple entries + commit the last one.
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val:"bar", I:20})
|
||||
e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val:"baz", I:30})
|
||||
e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
|
||||
e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30})
|
||||
entries = []*LogEntry{e1, e2}
|
||||
resp = server.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
|
||||
if resp.Term != 1 || !resp.Success {
|
||||
|
@ -217,7 +217,7 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
|
|||
server.currentTerm = 2
|
||||
|
||||
// Append single entry.
|
||||
e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:10})
|
||||
e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
||||
entries := []*LogEntry{e}
|
||||
resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
|
||||
if resp.Term != 2 || resp.Success {
|
||||
|
@ -237,8 +237,8 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
|
|||
defer server.Stop()
|
||||
|
||||
// Append single entry + commit.
|
||||
e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:10})
|
||||
e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val:"foo", I:15})
|
||||
e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
||||
e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
|
||||
entries := []*LogEntry{e1, e2}
|
||||
resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
|
||||
if resp.Term != 1 || !resp.Success {
|
||||
|
@ -246,7 +246,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
|
|||
}
|
||||
|
||||
// Append entry again (post-commit).
|
||||
e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val:"bar", I:20})
|
||||
e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
|
||||
entries = []*LogEntry{e}
|
||||
resp = server.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
|
||||
if resp.Term != 1 || resp.Success {
|
||||
|
@ -261,9 +261,9 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
|
|||
server.StartLeader()
|
||||
defer server.Stop()
|
||||
|
||||
entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:10})
|
||||
entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val:"foo", I:15})
|
||||
entry3, _ := newLogEntry(nil, 2, 2, &testCommand1{Val:"bar", I:20})
|
||||
entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
|
||||
entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
|
||||
entry3, _ := newLogEntry(nil, 2, 2, &testCommand1{Val: "bar", I: 20})
|
||||
|
||||
// Append single entry + commit.
|
||||
entries := []*LogEntry{entry1, entry2}
|
||||
|
@ -291,7 +291,7 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
|
|||
server.StartFollower()
|
||||
defer server.Stop()
|
||||
var err error
|
||||
if _, err = server.Do(&testCommand1{Val:"foo", I:10}); err != NotLeaderError {
|
||||
if _, err = server.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {
|
||||
t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err)
|
||||
}
|
||||
}
|
||||
|
|
11
test.go
11
test.go
|
@ -40,14 +40,18 @@ func setupLog(entries []*LogEntry) (*Log, string) {
|
|||
for _, entry := range entries {
|
||||
entry.encode(f)
|
||||
}
|
||||
f.Close()
|
||||
|
||||
err := f.Close()
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
log := newLog()
|
||||
log.ApplyFunc = func(c Command) (interface{}, error) {
|
||||
return nil, nil
|
||||
}
|
||||
if err := log.open(f.Name()); err != nil {
|
||||
panic("Unable to open log")
|
||||
panic(err)
|
||||
}
|
||||
return log, f.Name()
|
||||
}
|
||||
|
@ -151,7 +155,6 @@ func (c *joinCommand) Apply(server *Server) (interface{}, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
//--------------------------------------
|
||||
// Command1
|
||||
//--------------------------------------
|
||||
|
|
Loading…
Reference in New Issue