move buf to log struct

pull/820/head
Xiang Li 2013-08-01 17:58:03 -07:00
parent 1ff290c4a6
commit 78cb651d93
8 changed files with 69 additions and 28 deletions

View File

@ -7,6 +7,8 @@ import (
"net/http" "net/http"
) )
import _ "net/http/pprof"
// Parts from this transporter were heavily influenced by Peter Bougon's // Parts from this transporter were heavily influenced by Peter Bougon's
// raft implementation: https://github.com/peterbourgon/raft // raft implementation: https://github.com/peterbourgon/raft
@ -38,6 +40,7 @@ type HTTPMuxer interface {
// Creates a new HTTP transporter with the given path prefix. // Creates a new HTTP transporter with the given path prefix.
func NewHTTPTransporter(prefix string) *HTTPTransporter { func NewHTTPTransporter(prefix string) *HTTPTransporter {
return &HTTPTransporter{ return &HTTPTransporter{
DisableKeepAlives: false,
prefix: prefix, prefix: prefix,
appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"), appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"), requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"),

View File

@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"net" "net"
"net/http" "net/http"
"os"
"runtime/pprof"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -76,6 +78,27 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans
// Wait for configuration to propagate. // Wait for configuration to propagate.
time.Sleep(testHeartbeatTimeout * 2) time.Sleep(testHeartbeatTimeout * 2)
f, _ := os.Create("raftprof")
pprof.StartCPUProfile(f)
c := make(chan bool)
start := time.Now()
for i := 0; i < 1000; i++ {
go send(c, (*servers)[0])
}
for i := 0; i < 1000; i++ {
<-c
}
end := time.Now()
fmt.Println(end.Sub(start), "commands ", 1000*20)
pprof.StopCPUProfile()
// Wait for configuration to propagate.
time.Sleep(testHeartbeatTimeout * 2)
// Execute all the callbacks at the same time. // Execute all the callbacks at the same time.
for _i, _f := range callbacks { for _i, _f := range callbacks {
i, f := _i, _f i, f := _i, _f
@ -88,3 +111,10 @@ func runTestHttpServers(t *testing.T, servers *[]*Server, transporter *HTTPTrans
// Wait until everything is done. // Wait until everything is done.
wg.Wait() wg.Wait()
} }
func send(c chan bool, s *Server) {
for i := 0; i < 20; i++ {
s.Do(&NOPCommand{})
}
c <- true
}

7
log.go
View File

@ -2,8 +2,10 @@ package raft
import ( import (
"bufio" "bufio"
"code.google.com/p/goprotobuf/proto"
"errors" "errors"
"fmt" "fmt"
"github.com/benbjohnson/go-raft/protobuf"
"io" "io"
"os" "os"
"sync" "sync"
@ -26,6 +28,8 @@ type Log struct {
mutex sync.RWMutex mutex sync.RWMutex
startIndex uint64 // the index before the first entry in the Log entries startIndex uint64 // the index before the first entry in the Log entries
startTerm uint64 startTerm uint64
pBuffer *proto.Buffer
pLogEntry *protobuf.ProtoLogEntry
} }
// The results of the applying a log entry. // The results of the applying a log entry.
@ -44,6 +48,8 @@ type logResult struct {
func newLog() *Log { func newLog() *Log {
return &Log{ return &Log{
entries: make([]*LogEntry, 0), entries: make([]*LogEntry, 0),
pBuffer: proto.NewBuffer(nil),
pLogEntry: &protobuf.ProtoLogEntry{},
} }
} }
@ -470,6 +476,7 @@ func (l *Log) appendEntries(entries []*LogEntry) error {
var err error var err error
// Append each entry but exit if we hit an error. // Append each entry but exit if we hit an error.
for _, entry := range entries { for _, entry := range entries {
entry.log = l
if size, err = l.writeEntry(entry, w); err != nil { if size, err = l.writeEntry(entry, w); err != nil {
return err return err
} }

View File

@ -9,9 +9,6 @@ import (
"io" "io"
) )
var p = proto.NewBuffer(nil)
var pb = &protobuf.ProtoLogEntry{}
// A log entry stores a single item in the log. // A log entry stores a single item in the log.
type LogEntry struct { type LogEntry struct {
log *Log log *Log
@ -53,26 +50,26 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr
// Encodes the log entry to a buffer. Returns the number of bytes // Encodes the log entry to a buffer. Returns the number of bytes
// written and any error that may have occurred. // written and any error that may have occurred.
func (e *LogEntry) encode(w io.Writer) (int, error) { func (e *LogEntry) encode(w io.Writer) (int, error) {
defer p.Reset() defer e.log.pBuffer.Reset()
pb.Index = proto.Uint64(e.Index) e.log.pLogEntry.Index = proto.Uint64(e.Index)
pb.Term = proto.Uint64(e.Term) e.log.pLogEntry.Term = proto.Uint64(e.Term)
pb.CommandName = proto.String(e.CommandName) e.log.pLogEntry.CommandName = proto.String(e.CommandName)
pb.Command = e.Command e.log.pLogEntry.Command = e.Command
err := p.Marshal(pb) err := e.log.pBuffer.Marshal(e.log.pLogEntry)
if err != nil { if err != nil {
return -1, err return -1, err
} }
_, err = fmt.Fprintf(w, "%8x\n", len(p.Bytes())) _, err = fmt.Fprintf(w, "%8x\n", len(e.log.pBuffer.Bytes()))
if err != nil { if err != nil {
return -1, err return -1, err
} }
return w.Write(p.Bytes()) return w.Write(e.log.pBuffer.Bytes())
} }
// Decodes the log entry from a buffer. Returns the number of bytes read and // Decodes the log entry from a buffer. Returns the number of bytes read and

View File

@ -62,9 +62,10 @@ func TestLogNewLog(t *testing.T) {
// Ensure that we can decode and encode to an existing log. // Ensure that we can decode and encode to an existing log.
func TestLogExistingLog(t *testing.T) { func TestLogExistingLog(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) tmpLog := newLog()
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0}) e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
log, path := setupLog([]*LogEntry{e0, e1, e2}) log, path := setupLog([]*LogEntry{e0, e1, e2})
defer log.close() defer log.close()
defer os.Remove(path) defer os.Remove(path)
@ -86,9 +87,10 @@ func TestLogExistingLog(t *testing.T) {
// Ensure that we can check the contents of the log by index/term. // Ensure that we can check the contents of the log by index/term.
func TestLogContainsEntries(t *testing.T) { func TestLogContainsEntries(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) tmpLog := newLog()
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0}) e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
log, path := setupLog([]*LogEntry{e0, e1, e2}) log, path := setupLog([]*LogEntry{e0, e1, e2})
defer log.close() defer log.close()
defer os.Remove(path) defer os.Remove(path)
@ -112,8 +114,9 @@ func TestLogContainsEntries(t *testing.T) {
// Ensure that we can recover from an incomplete/corrupt log and continue logging. // Ensure that we can recover from an incomplete/corrupt log and continue logging.
func TestLogRecovery(t *testing.T) { func TestLogRecovery(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) tmpLog := newLog()
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
f, _ := ioutil.TempFile("", "raft-log-") f, _ := ioutil.TempFile("", "raft-log-")
e0.encode(f) e0.encode(f)

View File

@ -29,7 +29,7 @@ const (
) )
const ( const (
MaxLogEntriesPerRequest = 200 MaxLogEntriesPerRequest = 2000
NumberOfLogEntriesAfterSnapshot = 200 NumberOfLogEntriesAfterSnapshot = 200
) )

View File

@ -102,9 +102,10 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
// Ensure that a vote request is denied if the log is out of date. // Ensure that a vote request is denied if the log is out of date.
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) { func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) tmpLog := newLog()
e1, _ := newLogEntry(nil, 2, 1, &testCommand2{X: 100}) e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
e2, _ := newLogEntry(nil, 3, 2, &testCommand1{Val: "bar", I: 0}) e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2}) server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
// start as a follower with term 2 and index 3 // start as a follower with term 2 and index 3
@ -143,7 +144,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader. // // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
func TestServerPromoteSelf(t *testing.T) { func TestServerPromoteSelf(t *testing.T) {
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0}) server := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
// start as a follower // start as a follower

View File

@ -8,8 +8,8 @@ import (
) )
const ( const (
testHeartbeatTimeout = 5 * time.Millisecond testHeartbeatTimeout = 50 * time.Millisecond
testElectionTimeout = 20 * time.Millisecond testElectionTimeout = 200 * time.Millisecond
) )
func init() { func init() {
@ -85,7 +85,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn
func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server { func newTestCluster(names []string, transporter Transporter, lookup map[string]*Server) []*Server {
servers := []*Server{} servers := []*Server{}
e0, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 20}) e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
for _, name := range names { for _, name := range names {
if lookup[name] != nil { if lookup[name] != nil {