move MLPR frmo log to server

pull/820/head
Xiang Li 2013-07-14 22:48:41 -07:00
parent 2e75d0846e
commit 557c735f69
3 changed files with 34 additions and 34 deletions

32
log.go
View File

@ -9,10 +9,6 @@ import (
"sync" "sync"
) )
const (
MaxLogEntriesPerRequest = 200
)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// //
// Typedefs // Typedefs
@ -21,16 +17,15 @@ const (
// A log is a collection of log entries that are persisted to durable storage. // A log is a collection of log entries that are persisted to durable storage.
type Log struct { type Log struct {
ApplyFunc func(Command) (interface{}, error) ApplyFunc func(Command) (interface{}, error)
file *os.File file *os.File
path string path string
entries []*LogEntry entries []*LogEntry
results []*logResult results []*logResult
commitIndex uint64 commitIndex uint64
mutex sync.RWMutex mutex sync.RWMutex
startIndex uint64 // the index before the first entry in the Log entries startIndex uint64 // the index before the first entry in the Log entries
startTerm uint64 startTerm uint64
maxLogEntriesPerRequest uint64
} }
// The results of the applying a log entry. // The results of the applying a log entry.
@ -48,8 +43,7 @@ type logResult struct {
// Creates a new log. // Creates a new log.
func newLog() *Log { func newLog() *Log {
return &Log{ return &Log{
entries: make([]*LogEntry, 0), entries: make([]*LogEntry, 0),
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
} }
} }
@ -239,7 +233,7 @@ func (l *Log) containsEntry(index uint64, term uint64) bool {
// Retrieves a list of entries after a given index as well as the term of the // Retrieves a list of entries after a given index as well as the term of the
// index provided. A nil list of entries is returned if the index no longer // index provided. A nil list of entries is returned if the index no longer
// exists because a snapshot was made. // exists because a snapshot was made.
func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) { func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]*LogEntry, uint64) {
l.mutex.Lock() l.mutex.Lock()
defer l.mutex.Unlock() defer l.mutex.Unlock()
@ -265,11 +259,11 @@ func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) {
entries := l.entries[index-l.startIndex:] entries := l.entries[index-l.startIndex:]
length := len(entries) length := len(entries)
if uint64(length) < l.maxLogEntriesPerRequest { if uint64(length) < maxLogEntriesPerRequest {
// Determine the term at the given entry and return a subslice. // Determine the term at the given entry and return a subslice.
return entries, l.entries[index-1-l.startIndex].Term return entries, l.entries[index-1-l.startIndex].Term
} else { } else {
return entries[:l.maxLogEntriesPerRequest], l.entries[index-1-l.startIndex].Term return entries[:maxLogEntriesPerRequest], l.entries[index-1-l.startIndex].Term
} }
} }

View File

@ -139,7 +139,7 @@ func (p *Peer) heartbeat(c chan bool) {
case <-time.After(p.heartbeatTimeout): case <-time.After(p.heartbeatTimeout):
debugln("peer.heartbeat.run: ", p.Name()) debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex() prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex) entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader { if p.server.State() != Leader {
return return

View File

@ -26,6 +26,10 @@ const (
Leader = "leader" Leader = "leader"
) )
const (
MaxLogEntriesPerRequest = 200
)
const ( const (
DefaultHeartbeatTimeout = 50 * time.Millisecond DefaultHeartbeatTimeout = 50 * time.Millisecond
DefaultElectionTimeout = 150 * time.Millisecond DefaultElectionTimeout = 150 * time.Millisecond
@ -70,9 +74,10 @@ type Server struct {
electionTimeout time.Duration electionTimeout time.Duration
heartbeatTimeout time.Duration heartbeatTimeout time.Duration
currentSnapshot *Snapshot currentSnapshot *Snapshot
lastSnapshot *Snapshot lastSnapshot *Snapshot
stateMachine StateMachine stateMachine StateMachine
maxLogEntriesPerRequest uint64
} }
// An event to be processed by the server's event loop. // An event to be processed by the server's event loop.
@ -98,17 +103,18 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
} }
s := &Server{ s := &Server{
name: name, name: name,
path: path, path: path,
transporter: transporter, transporter: transporter,
stateMachine: stateMachine, stateMachine: stateMachine,
context: context, context: context,
state: Stopped, state: Stopped,
peers: make(map[string]*Peer), peers: make(map[string]*Peer),
log: newLog(), log: newLog(),
c: make(chan *event, 256), c: make(chan *event, 256),
electionTimeout: DefaultElectionTimeout, electionTimeout: DefaultElectionTimeout,
heartbeatTimeout: DefaultHeartbeatTimeout, heartbeatTimeout: DefaultHeartbeatTimeout,
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
} }
// Setup apply function. // Setup apply function.