pull/820/head
commit
10f8b9f836
18
log.go
18
log.go
|
@ -233,7 +233,7 @@ func (l *Log) containsEntry(index uint64, term uint64) bool {
|
|||
// Retrieves a list of entries after a given index as well as the term of the
|
||||
// index provided. A nil list of entries is returned if the index no longer
|
||||
// exists because a snapshot was made.
|
||||
func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) {
|
||||
func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]*LogEntry, uint64) {
|
||||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
|
||||
|
@ -256,8 +256,15 @@ func (l *Log) getEntriesAfter(index uint64) ([]*LogEntry, uint64) {
|
|||
|
||||
traceln("log.entriesAfter.partial: ", index, " ", l.entries[len(l.entries)-1].Index)
|
||||
|
||||
// Determine the term at the given entry and return a subslice.
|
||||
return l.entries[index-l.startIndex:], l.entries[index-1-l.startIndex].Term
|
||||
entries := l.entries[index-l.startIndex:]
|
||||
length := len(entries)
|
||||
|
||||
if uint64(length) < maxLogEntriesPerRequest {
|
||||
// Determine the term at the given entry and return a subslice.
|
||||
return entries, l.entries[index-1-l.startIndex].Term
|
||||
} else {
|
||||
return entries[:maxLogEntriesPerRequest], l.entries[index-1-l.startIndex].Term
|
||||
}
|
||||
}
|
||||
|
||||
// Retrieves the return value and error for an entry. The result can only exist
|
||||
|
@ -340,8 +347,11 @@ func (l *Log) setCommitIndex(index uint64) error {
|
|||
l.mutex.Lock()
|
||||
defer l.mutex.Unlock()
|
||||
|
||||
// this is not error any more after limited the number of sending entries
|
||||
// commit up to what we already have
|
||||
if index > l.startIndex+uint64(len(l.entries)) {
|
||||
return fmt.Errorf("raft.Log: Commit index (%d) out of range (%d)", index, len(l.entries))
|
||||
debugln("raft.Log: Commit index", index, "set back to ", len(l.entries))
|
||||
index = l.startIndex + uint64(len(l.entries))
|
||||
}
|
||||
|
||||
// Do not allow previous indices to be committed again.
|
||||
|
|
2
peer.go
2
peer.go
|
@ -139,7 +139,7 @@ func (p *Peer) heartbeat(c chan bool) {
|
|||
case <-time.After(p.heartbeatTimeout):
|
||||
debugln("peer.heartbeat.run: ", p.Name())
|
||||
prevLogIndex := p.getPrevLogIndex()
|
||||
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex)
|
||||
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
|
||||
|
||||
if p.server.State() != Leader {
|
||||
return
|
||||
|
|
34
server.go
34
server.go
|
@ -27,6 +27,10 @@ const (
|
|||
Leader = "leader"
|
||||
)
|
||||
|
||||
const (
|
||||
MaxLogEntriesPerRequest = 200
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultHeartbeatTimeout = 50 * time.Millisecond
|
||||
DefaultElectionTimeout = 150 * time.Millisecond
|
||||
|
@ -72,9 +76,10 @@ type Server struct {
|
|||
electionTimeout time.Duration
|
||||
heartbeatTimeout time.Duration
|
||||
|
||||
currentSnapshot *Snapshot
|
||||
lastSnapshot *Snapshot
|
||||
stateMachine StateMachine
|
||||
currentSnapshot *Snapshot
|
||||
lastSnapshot *Snapshot
|
||||
stateMachine StateMachine
|
||||
maxLogEntriesPerRequest uint64
|
||||
}
|
||||
|
||||
// An event to be processed by the server's event loop.
|
||||
|
@ -100,17 +105,18 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
|
|||
}
|
||||
|
||||
s := &Server{
|
||||
name: name,
|
||||
path: path,
|
||||
transporter: transporter,
|
||||
stateMachine: stateMachine,
|
||||
context: context,
|
||||
state: Stopped,
|
||||
peers: make(map[string]*Peer),
|
||||
log: newLog(),
|
||||
c: make(chan *event, 256),
|
||||
electionTimeout: DefaultElectionTimeout,
|
||||
heartbeatTimeout: DefaultHeartbeatTimeout,
|
||||
name: name,
|
||||
path: path,
|
||||
transporter: transporter,
|
||||
stateMachine: stateMachine,
|
||||
context: context,
|
||||
state: Stopped,
|
||||
peers: make(map[string]*Peer),
|
||||
log: newLog(),
|
||||
c: make(chan *event, 256),
|
||||
electionTimeout: DefaultElectionTimeout,
|
||||
heartbeatTimeout: DefaultHeartbeatTimeout,
|
||||
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
|
||||
}
|
||||
|
||||
// Setup apply function.
|
||||
|
|
Loading…
Reference in New Issue