diff --git a/append_entries_request.go b/append_entries_request.go index 071c72b1ac..038bd16e3d 100644 --- a/append_entries_request.go +++ b/append_entries_request.go @@ -1,13 +1,12 @@ package raft import ( - "bytes" - "encoding/binary" + "code.google.com/p/goprotobuf/proto" + "github.com/coreos/go-raft/protobuf" "io" + "io/ioutil" ) -const appendEntriesRequestHeaderSize = 4 + 8 + 8 + 8 + 8 + 4 + 4 - // The request sent to a server to append entries to the log. type AppendEntriesRequest struct { Term uint64 @@ -30,84 +29,75 @@ func newAppendEntriesRequest(term uint64, prevLogIndex uint64, prevLogTerm uint6 } } +// Encodes the AppendEntriesRequest to a buffer. Returns the number of bytes +// written and any error that may have occurred. func (req *AppendEntriesRequest) encode(w io.Writer) (int, error) { - leaderNameSize := len(req.LeaderName) - b := make([]byte, appendEntriesRequestHeaderSize + leaderNameSize) - // Write request. - binary.BigEndian.PutUint32(b[0:4], protocolVersion) - binary.BigEndian.PutUint64(b[4:12], req.Term) - binary.BigEndian.PutUint64(b[12:20], req.PrevLogIndex) - binary.BigEndian.PutUint64(b[20:28], req.PrevLogTerm) - binary.BigEndian.PutUint64(b[28:36], req.CommitIndex) - binary.BigEndian.PutUint32(b[36:40], uint32(leaderNameSize)) - binary.BigEndian.PutUint32(b[40:44], uint32(len(req.Entries))) - copy(b[44:44+leaderNameSize], []byte(req.LeaderName)) + protoEntries := make([]*protobuf.ProtoAppendEntriesRequest_ProtoLogEntry, len(req.Entries)) - // Append entries. - buf := bytes.NewBuffer(b) - for _, entry := range req.Entries { - if _, err := entry.encode(buf); err != nil { - return 0, err + for i, entry := range req.Entries { + protoEntries[i] = &protobuf.ProtoAppendEntriesRequest_ProtoLogEntry{ + Index: proto.Uint64(entry.Index), + Term: proto.Uint64(entry.Term), + CommandName: proto.String(entry.CommandName), + Command: entry.Command, } } - return w.Write(buf.Bytes()) + p := proto.NewBuffer(nil) + + pb := &protobuf.ProtoAppendEntriesRequest{ + Term: proto.Uint64(req.Term), + PrevLogIndex: proto.Uint64(req.PrevLogIndex), + PrevLogTerm: proto.Uint64(req.PrevLogTerm), + CommitIndex: proto.Uint64(req.CommitIndex), + LeaderName: proto.String(req.LeaderName), + Entries: protoEntries, + } + err := p.Marshal(pb) + + if err != nil { + return -1, err + } + + return w.Write(p.Bytes()) } +// Decodes the AppendEntriesRequest from a buffer. Returns the number of bytes read and +// any error that occurs. func (req *AppendEntriesRequest) decode(r io.Reader) (int, error) { - var eof error - header := make([]byte, appendEntriesRequestHeaderSize) - if n, err := r.Read(header); err == io.EOF { - return n, io.ErrUnexpectedEOF - } else if err != nil { - return n, err - } - entryCount := int(binary.BigEndian.Uint32(header[40:44])) + data, err := ioutil.ReadAll(r) - // Read leader name. - leaderName := make([]byte, binary.BigEndian.Uint32(header[36:40])) - if n, err := r.Read(leaderName); err == io.EOF { - if err == io.EOF && n != len(leaderName) { - return appendEntriesRequestHeaderSize+n, io.ErrUnexpectedEOF - } else { - eof = io.EOF - } - } else if err != nil { - return appendEntriesRequestHeaderSize+n, err + if err != nil { + return -1, err } - totalBytes := appendEntriesRequestHeaderSize + len(leaderName) - // Read entries. - entries := []*LogEntry{} - for i:=0; i s.currentTerm { s.setCurrentTerm(resp.Term, "", false) @@ -782,6 +784,7 @@ func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse { // Processes a "request vote" request. func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) { + // If the request is coming from an old term then reject it. if req.Term < s.currentTerm { s.debugln("server.rv.error: stale term") diff --git a/server_test.go b/server_test.go index 796fd22f48..0719435404 100644 --- a/server_test.go +++ b/server_test.go @@ -87,9 +87,9 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { // Ensure that a vote request is denied if the log is out of date. func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { - e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:20}) - e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X:100}) - e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val:"bar", I:0}) + e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) + e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) + e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0}) server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2}) server.Initialize() server.StartLeader() @@ -175,7 +175,7 @@ func TestServerAppendEntries(t *testing.T) { defer server.Stop() // Append single entry. - e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:10}) + e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) entries := []*LogEntry{e} resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) if resp.Term != 1 || !resp.Success { @@ -186,8 +186,8 @@ func TestServerAppendEntries(t *testing.T) { } // Append multiple entries + commit the last one. - e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val:"bar", I:20}) - e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val:"baz", I:30}) + e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20}) + e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30}) entries = []*LogEntry{e1, e2} resp = server.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries)) if resp.Term != 1 || !resp.Success { @@ -217,7 +217,7 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { server.currentTerm = 2 // Append single entry. - e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:10}) + e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) entries := []*LogEntry{e} resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries)) if resp.Term != 2 || resp.Success { @@ -237,8 +237,8 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { defer server.Stop() // Append single entry + commit. - e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:10}) - e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val:"foo", I:15}) + e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15}) entries := []*LogEntry{e1, e2} resp := server.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries)) if resp.Term != 1 || !resp.Success { @@ -246,7 +246,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) { } // Append entry again (post-commit). - e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val:"bar", I:20}) + e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20}) entries = []*LogEntry{e} resp = server.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries)) if resp.Term != 1 || resp.Success { @@ -261,9 +261,9 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) { server.StartLeader() defer server.Stop() - entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val:"foo", I:10}) - entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val:"foo", I:15}) - entry3, _ := newLogEntry(nil, 2, 2, &testCommand1{Val:"bar", I:20}) + entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) + entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15}) + entry3, _ := newLogEntry(nil, 2, 2, &testCommand1{Val: "bar", I: 20}) // Append single entry + commit. entries := []*LogEntry{entry1, entry2} @@ -291,7 +291,7 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) { server.StartFollower() defer server.Stop() var err error - if _, err = server.Do(&testCommand1{Val:"foo", I:10}); err != NotLeaderError { + if _, err = server.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError { t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err) } } diff --git a/test.go b/test.go index b771ac1a61..7275e4705a 100644 --- a/test.go +++ b/test.go @@ -40,14 +40,18 @@ func setupLog(entries []*LogEntry) (*Log, string) { for _, entry := range entries { entry.encode(f) } - f.Close() - + err := f.Close() + + if err != nil { + panic(err) + } + log := newLog() log.ApplyFunc = func(c Command) (interface{}, error) { return nil, nil } if err := log.open(f.Name()); err != nil { - panic("Unable to open log") + panic(err) } return log, f.Name() } @@ -151,7 +155,6 @@ func (c *joinCommand) Apply(server *Server) (interface{}, error) { return nil, err } - //-------------------------------------- // Command1 //--------------------------------------