Merge pull request #88 from coreos/master

encode entry should reuse memory rather than alloc each time
pull/820/head
Ben Johnson 2013-08-01 19:41:47 -07:00
commit 0be5a48b52
9 changed files with 102 additions and 29 deletions

View File

@ -20,7 +20,7 @@ var logLevel int = 0
var logger *log.Logger
func init() {
logger = log.New(os.Stdout, "raft", log.Lmicroseconds)
logger = log.New(os.Stdout, "[raft]", log.Lmicroseconds)
}
//------------------------------------------------------------------------------

View File

@ -38,6 +38,7 @@ type HTTPMuxer interface {
// Creates a new HTTP transporter with the given path prefix.
func NewHTTPTransporter(prefix string) *HTTPTransporter {
return &HTTPTransporter{
DisableKeepAlives: false,
prefix: prefix,
appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"),

View File

@ -88,3 +88,67 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans
// Wait until everything is done.
wg.Wait()
}
func BenchmarkSpeed(b *testing.B) {
transporter := NewHTTPTransporter("/raft")
transporter.DisableKeepAlives = true
servers := []*Server{}
for i:= 0; i < 3; i++ {
port := 9000 + i
// Create raft server.
server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.SetElectionTimeout(testElectionTimeout)
server.Start()
defer server.Stop()
servers = append(servers, server)
// Create listener for HTTP server and start it.
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
}
defer listener.Close()
// Create wrapping HTTP server.
mux := http.NewServeMux()
transporter.Install(server, mux)
httpServer := &http.Server{Addr: fmt.Sprintf(":%d", port), Handler: mux}
go func() { httpServer.Serve(listener) }()
}
// Setup configuration.
for _, server := range servers {
(servers)[0].Do(&DefaultJoinCommand{Name: server.Name()})
}
c := make(chan bool)
// Wait for configuration to propagate.
time.Sleep(2 * time.Second)
b.ResetTimer()
for n := 0; n < b.N; n++ {
for i := 0; i < 1000; i++ {
go send(c, servers[0])
}
for i := 0; i < 1000; i++ {
<-c
}
}
}
func send(c chan bool, s *Server) {
for i := 0; i < 20; i++ {
s.Do(&NOPCommand{})
}
c <- true
}

9
log.go
View File

@ -2,8 +2,10 @@ package raft
import (
"bufio"
"code.google.com/p/goprotobuf/proto"
"errors"
"fmt"
"github.com/benbjohnson/go-raft/protobuf"
"io"
"os"
"sync"
@ -26,6 +28,8 @@ type Log struct {
mutex sync.RWMutex
startIndex uint64 // the index before the first entry in the Log entries
startTerm uint64
pBuffer *proto.Buffer
pLogEntry *protobuf.ProtoLogEntry
}
// The results of the applying a log entry.
@ -43,7 +47,9 @@ type logResult struct {
// Creates a new log.
func newLog() *Log {
return &Log{
entries: make([]*LogEntry, 0),
entries: make([]*LogEntry, 0),
pBuffer: proto.NewBuffer(nil),
pLogEntry: &protobuf.ProtoLogEntry{},
}
}
@ -470,6 +476,7 @@ func (l *Log) appendEntries(entries []*LogEntry) error {
var err error
// Append each entry but exit if we hit an error.
for _, entry := range entries {
entry.log = l
if size, err = l.writeEntry(entry, w); err != nil {
return err
}

View File

@ -50,29 +50,26 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr
// Encodes the log entry to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (e *LogEntry) encode(w io.Writer) (int, error) {
defer e.log.pBuffer.Reset()
p := proto.NewBuffer(nil)
e.log.pLogEntry.Index = proto.Uint64(e.Index)
e.log.pLogEntry.Term = proto.Uint64(e.Term)
e.log.pLogEntry.CommandName = proto.String(e.CommandName)
e.log.pLogEntry.Command = e.Command
pb := &protobuf.ProtoLogEntry{
Index: proto.Uint64(e.Index),
Term: proto.Uint64(e.Term),
CommandName: proto.String(e.CommandName),
Command: e.Command,
}
err := p.Marshal(pb)
err := e.log.pBuffer.Marshal(e.log.pLogEntry)
if err != nil {
return -1, err
}
_, err = fmt.Fprintf(w, "%8x\n", len(p.Bytes()))
_, err = fmt.Fprintf(w, "%8x\n", len(e.log.pBuffer.Bytes()))
if err != nil {
return -1, err
}
return w.Write(p.Bytes())
return w.Write(e.log.pBuffer.Bytes())
}
// Decodes the log entry from a buffer. Returns the number of bytes read and

View File

@ -62,9 +62,10 @@ func TestLogNewLog(t *testing.T) {
// Ensure that we can decode and encode to an existing log.
func TestLogExistingLog(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0})
tmpLog := newLog()
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
log, path := setupLog([]*LogEntry{e0, e1, e2})
defer log.close()
defer os.Remove(path)
@ -86,9 +87,10 @@ func TestLogExistingLog(t *testing.T) {
// Ensure that we can check the contents of the log by index/term.
func TestLogContainsEntries(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0})
tmpLog := newLog()
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
log, path := setupLog([]*LogEntry{e0, e1, e2})
defer log.close()
defer os.Remove(path)
@ -112,8 +114,9 @@ func TestLogContainsEntries(t *testing.T) {
// Ensure that we can recover from an incomplete/corrupt log and continue logging.
func TestLogRecovery(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
tmpLog := newLog()
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
f, _ := ioutil.TempFile("", "raft-log-")
e0.encode(f)

View File

@ -29,7 +29,7 @@ const (
)
const (
MaxLogEntriesPerRequest = 200
MaxLogEntriesPerRequest = 2000
NumberOfLogEntriesAfterSnapshot = 200
)

View File

@ -102,9 +102,10 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
// Ensure that a vote request is denied if the log is out of date.
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0})
tmpLog := newLog()
e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
// start as a follower with term 2 and index 3
@ -143,7 +144,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
func TestServerPromoteSelf(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
// start as a follower

View File

@ -8,8 +8,8 @@ import (
)
const (
testHeartbeatTimeout = 5 * time.Millisecond
testElectionTimeout = 20 * time.Millisecond
testHeartbeatTimeout = 50 * time.Millisecond
testElectionTimeout = 200 * time.Millisecond
)
func init() {
@ -85,7 +85,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn
func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server {
servers := []*Server{}
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
for _, name := range names {
if lookup[name] != nil {