refactor(server) cleanup snapshot
parent
30f261bfe8
commit
7debffbcbb
63
server.go
63
server.go
|
@ -125,8 +125,14 @@ type server struct {
|
|||
electionTimeout time.Duration
|
||||
heartbeatInterval time.Duration
|
||||
|
||||
snapshot *Snapshot
|
||||
pendingSnapshot *Snapshot
|
||||
snapshot *Snapshot
|
||||
|
||||
// PendingSnapshot is an unfinished snapshot.
|
||||
// After the pendingSnapshot is saved to disk,
|
||||
// it will be set to snapshot and also will be
|
||||
// set to nil.
|
||||
pendingSnapshot *Snapshot
|
||||
|
||||
stateMachine StateMachine
|
||||
maxLogEntriesPerRequest uint64
|
||||
|
||||
|
@ -1083,30 +1089,35 @@ func (s *server) RemovePeer(name string) error {
|
|||
//--------------------------------------
|
||||
|
||||
func (s *server) TakeSnapshot() error {
|
||||
// TODO: put a snapshot mutex
|
||||
s.debugln("take Snapshot")
|
||||
if s.stateMachine == nil {
|
||||
return errors.New("Snapshot: Cannot create snapshot. Missing state machine.")
|
||||
}
|
||||
|
||||
// Shortcut without lock
|
||||
// Exit if the server is currently creating a snapshot.
|
||||
if s.pendingSnapshot != nil {
|
||||
return errors.New("Snapshot: Last snapshot is not finished.")
|
||||
}
|
||||
|
||||
// Exit if there are no logs yet in the system.
|
||||
// TODO: acquire the lock and no more committed is allowed
|
||||
// This will be done after finishing refactoring heartbeat
|
||||
s.debugln("take.snapshot")
|
||||
|
||||
lastIndex, lastTerm := s.log.commitInfo()
|
||||
path := s.SnapshotPath(lastIndex, lastTerm)
|
||||
if lastIndex == 0 {
|
||||
return errors.New("No logs")
|
||||
|
||||
// check if there is log has been committed since the
|
||||
// last snapshot.
|
||||
if lastIndex == s.log.startIndex {
|
||||
return nil
|
||||
}
|
||||
|
||||
var state []byte
|
||||
var err error
|
||||
if s.stateMachine != nil {
|
||||
state, err = s.stateMachine.Save()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
state = []byte{0}
|
||||
path := s.SnapshotPath(lastIndex, lastTerm)
|
||||
// Attach snapshot to pending snapshot and save it to disk.
|
||||
s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}
|
||||
|
||||
state, err := s.stateMachine.Save()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Clone the list of peers.
|
||||
|
@ -1116,8 +1127,9 @@ func (s *server) TakeSnapshot() error {
|
|||
}
|
||||
peers = append(peers, &Peer{Name: s.Name(), ConnectionString: s.connectionString})
|
||||
|
||||
// Attach current snapshot and save it to disk.
|
||||
s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
|
||||
// Attach snapshot to pending snapshot and save it to disk.
|
||||
s.pendingSnapshot.Peers = peers
|
||||
s.pendingSnapshot.State = state
|
||||
s.saveSnapshot()
|
||||
|
||||
// We keep some log entries after the snapshot.
|
||||
|
@ -1191,7 +1203,7 @@ func (s *server) SnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *Snapshot
|
|||
func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
|
||||
// Recover state sent from request.
|
||||
if err := s.stateMachine.Recovery(req.State); err != nil {
|
||||
return newSnapshotRecoveryResponse(req.LastTerm, false, req.LastIndex)
|
||||
panic("cannot recover from previous state")
|
||||
}
|
||||
|
||||
// Recover the cluster configuration.
|
||||
|
@ -1212,7 +1224,6 @@ func (s *server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S
|
|||
s.log.compact(req.LastIndex, req.LastTerm)
|
||||
|
||||
return newSnapshotRecoveryResponse(req.LastTerm, true, req.LastIndex)
|
||||
|
||||
}
|
||||
|
||||
// Load a snapshot at restart
|
||||
|
@ -1220,6 +1231,7 @@ func (s *server) LoadSnapshot() error {
|
|||
// Open snapshot/ directory.
|
||||
dir, err := os.OpenFile(path.Join(s.path, "snapshot"), os.O_RDONLY, 0)
|
||||
if err != nil {
|
||||
s.debugln("cannot.open.snapshot: ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1232,7 +1244,8 @@ func (s *server) LoadSnapshot() error {
|
|||
dir.Close()
|
||||
|
||||
if len(filenames) == 0 {
|
||||
return errors.New("no snapshot")
|
||||
s.debugln("no.snapshot.to.load")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Grab the latest snapshot.
|
||||
|
@ -1252,7 +1265,7 @@ func (s *server) LoadSnapshot() error {
|
|||
if err != nil {
|
||||
return err
|
||||
} else if n != 1 {
|
||||
return errors.New("Bad snapshot file")
|
||||
return errors.New("checksum.err: bad.snapshot.file")
|
||||
}
|
||||
|
||||
// Load remaining snapshot contents.
|
||||
|
@ -1270,13 +1283,13 @@ func (s *server) LoadSnapshot() error {
|
|||
|
||||
// Decode snapshot.
|
||||
if err = json.Unmarshal(b, &s.snapshot); err != nil {
|
||||
s.debugln("unmarshal error: ", err)
|
||||
s.debugln("unmarshal.snapshot.error: ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Recover snapshot into state machine.
|
||||
if err = s.stateMachine.Recovery(s.snapshot.State); err != nil {
|
||||
s.debugln("recovery error: ", err)
|
||||
s.debugln("recovery.snapshot.error: ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue