fix snapshot related issue

pull/820/head
Xiang Li 2013-09-18 00:19:46 -04:00
parent 0e62627e27
commit dbedc982df
4 changed files with 32 additions and 21 deletions

30
log.go
View File

@ -180,26 +180,23 @@ func (l *Log) open(path string) error {
}
break
}
// Append entry.
l.entries = append(l.entries, entry)
if entry.Index <= l.commitIndex {
command, err := newCommand(entry.CommandName, entry.Command)
if err != nil {
continue
if entry.Index > l.startIndex {
// Append entry.
l.entries = append(l.entries, entry)
if entry.Index <= l.commitIndex {
command, err := newCommand(entry.CommandName, entry.Command)
if err != nil {
continue
}
l.ApplyFunc(command)
}
l.ApplyFunc(command)
debugln("open.log.append log index ", entry.Index)
}
debugln("open.log.append log index ", entry.Index)
readBytes += int64(n)
}
l.results = make([]*logResult, len(l.entries))
l.compact(l.startIndex, l.startTerm)
debugln("open.log.recovery number of log ", len(l.entries))
return nil
}
@ -273,6 +270,8 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]*
entries := l.entries[index-l.startIndex:]
length := len(entries)
traceln("log.entriesAfter: startIndex:", l.startIndex, " lenght", len(l.entries))
if uint64(length) < maxLogEntriesPerRequest {
// Determine the term at the given entry and return a subslice.
return entries, l.entries[index-1-l.startIndex].Term
@ -353,7 +352,10 @@ func (l *Log) lastInfo() (index uint64, term uint64) {
func (l *Log) updateCommitIndex(index uint64) {
l.mutex.Lock()
defer l.mutex.Unlock()
l.commitIndex = index
if index > l.commitIndex {
l.commitIndex = index
}
debugln("update.commit.index ", index)
}
// Updates the commit index and writes entries after that index to the stable storage.

View File

@ -80,6 +80,8 @@ type Server struct {
lastSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
connectionString string
}
// An event to be processed by the server's event loop.
@ -96,7 +98,7 @@ type event struct {
//------------------------------------------------------------------------------
// Creates a new server with a log at the given path.
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}) (*Server, error) {
func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectiongString string) (*Server, error) {
if name == "" {
return nil, errors.New("raft.Server: Name cannot be blank")
}
@ -117,6 +119,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
electionTimeout: DefaultElectionTimeout,
heartbeatTimeout: DefaultHeartbeatTimeout,
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
connectionString: connectiongString,
}
// Setup apply function.
@ -1012,10 +1015,16 @@ func (s *Server) TakeSnapshot() error {
state = []byte{0}
}
var peers []*Peer
peers := make([]*Peer, len(s.peers)+1)
i := 0
for _, peer := range s.peers {
peers = append(peers, peer.clone())
peers[i] = peer.clone()
}
peers[i] = &Peer{
Name: s.Name(),
ConnectionString: s.connectionString,
}
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
@ -1256,7 +1265,7 @@ func (s *Server) readConf() error {
return err
}
s.log.commitIndex = conf.CommitIndex
s.log.updateCommitIndex(conf.CommitIndex)
return nil
}

View File

@ -428,7 +428,7 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 17 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 17)
}
server.Stop()
}

View File

@ -65,12 +65,12 @@ func newTestServer(name string, transporter Transporter) *Server {
if err := os.MkdirAll(p, 0644); err != nil {
panic(err.Error())
}
server, _ := NewServer(name, p, transporter, nil, nil)
server, _ := NewServer(name, p, transporter, nil, nil, "")
return server
}
func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
server, _ := NewServer(name, p, transporter, nil, nil)
server, _ := NewServer(name, p, transporter, nil, nil, "")
return server
}