From 557c735f693047bdd45c59db8bd5f36fefc014ec Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 14 Jul 2013 22:48:41 -0700 Subject: [PATCH] move MLPR frmo log to server --- log.go | 32 +++++++++++++------------------- peer.go | 2 +- server.go | 34 ++++++++++++++++++++-------------- 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/log.go b/log.go index 2e5f79d10b..4d4e885657 100644 --- a/log.go +++ b/log.go @@ -9,10 +9,6 @@ import ( "sync" ) -const ( - MaxLogEntriesPerRequest = 200 -) - //------------------------------------------------------------------------------ // // Typedefs @@ -21,16 +17,15 @@ const ( // A log is a collection of log entries that are persisted to durable storage. type Log struct { - ApplyFunc func(Command) (interface{}, error) - file *os.File - path string - entries []*LogEntry - results []*logResult - commitIndex uint64 - mutex sync.RWMutex - startIndex uint64 // the index before the first entry in the Log entries - startTerm uint64 - maxLogEntriesPerRequest uint64 + ApplyFunc func(Command) (interface{}, error) + file *os.File + path string + entries []*LogEntry + results []*logResult + commitIndex uint64 + mutex sync.RWMutex + startIndex uint64 // the index before the first entry in the Log entries + startTerm uint64 } // The results of the applying a log entry. @@ -48,8 +43,7 @@ type logResult struct { // Creates a new log. func newLog() *Log { return &Log{ - entries: make([]*LogEntry, 0), - maxLogEntriesPerRequest: MaxLogEntriesPerRequest, + entries: make([]*LogEntry, 0), } } @@ -239,7 +233,7 @@ func (l *Log) containsEntry(index uint64, term uint64) bool { // Retrieves a list of entries after a given index as well as the term of the // index provided. A nil list of entries is returned if the index no longer // exists because a snapshot was made. -func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) { +func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]*LogEntry, uint64) { l.mutex.Lock() defer l.mutex.Unlock() @@ -265,11 +259,11 @@ func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) { entries := l.entries[index-l.startIndex:] length := len(entries) - if uint64(length) < l.maxLogEntriesPerRequest { + if uint64(length) < maxLogEntriesPerRequest { // Determine the term at the given entry and return a subslice. return entries, l.entries[index-1-l.startIndex].Term } else { - return entries[:l.maxLogEntriesPerRequest], l.entries[index-1-l.startIndex].Term + return entries[:maxLogEntriesPerRequest], l.entries[index-1-l.startIndex].Term } } diff --git a/peer.go b/peer.go index 23476f81fa..9a97142dec 100644 --- a/peer.go +++ b/peer.go @@ -139,7 +139,7 @@ func (p *Peer) heartbeat(c chan bool) { case <-time.After(p.heartbeatTimeout): debugln("peer.heartbeat.run: ", p.Name()) prevLogIndex := p.getPrevLogIndex() - entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex) + entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest) if p.server.State() != Leader { return diff --git a/server.go b/server.go index 7607679a1c..da89a542c0 100644 --- a/server.go +++ b/server.go @@ -26,6 +26,10 @@ const ( Leader = "leader" ) +const ( + MaxLogEntriesPerRequest = 200 +) + const ( DefaultHeartbeatTimeout = 50 * time.Millisecond DefaultElectionTimeout = 150 * time.Millisecond @@ -70,9 +74,10 @@ type Server struct { electionTimeout time.Duration heartbeatTimeout time.Duration - currentSnapshot *Snapshot - lastSnapshot *Snapshot - stateMachine StateMachine + currentSnapshot *Snapshot + lastSnapshot *Snapshot + stateMachine StateMachine + maxLogEntriesPerRequest uint64 } // An event to be processed by the server's event loop. @@ -98,17 +103,18 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S } s := &Server{ - name: name, - path: path, - transporter: transporter, - stateMachine: stateMachine, - context: context, - state: Stopped, - peers: make(map[string]*Peer), - log: newLog(), - c: make(chan *event, 256), - electionTimeout: DefaultElectionTimeout, - heartbeatTimeout: DefaultHeartbeatTimeout, + name: name, + path: path, + transporter: transporter, + stateMachine: stateMachine, + context: context, + state: Stopped, + peers: make(map[string]*Peer), + log: newLog(), + c: make(chan *event, 256), + electionTimeout: DefaultElectionTimeout, + heartbeatTimeout: DefaultHeartbeatTimeout, + maxLogEntriesPerRequest: MaxLogEntriesPerRequest, } // Setup apply function.