From b66cb1a0ded3adbe56041a8a8e1e168e46cd05bc Mon Sep 17 00:00:00 2001 From: Baruch Even Date: Mon, 2 Dec 2013 23:26:53 +0200 Subject: [PATCH 1/4] Correct replacing of log file Do not leak resources in case of errors and ensure data is synced to disk before replacing files. --- log.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/log.go b/log.go index 39da19d95c..182f8253c2 100644 --- a/log.go +++ b/log.go @@ -573,7 +573,8 @@ func (l *Log) compact(index uint64, term uint64) error { } // create a new log file and add all the entries - file, err := os.OpenFile(l.path+".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + new_file_path := l.path + ".new" + file, err := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) if err != nil { return err } @@ -582,25 +583,27 @@ func (l *Log) compact(index uint64, term uint64) error { entry.Position = position if _, err = entry.encode(file); err != nil { + file.Close() + os.Remove(new_file_path) return err } } - // close the current log file - l.file.Close() + file.Sync() - // remove the current log file to .bak - err = os.Remove(l.path) - if err != nil { - return err - } + old_file := l.file // rename the new log file - err = os.Rename(l.path+".new", l.path) + err = os.Rename(new_file_path, l.path) if err != nil { + file.Close() + os.Remove(new_file_path) return err } l.file = file + // close the old log file + old_file.Close() + // compaction the in memory log l.entries = entries l.startIndex = index From 3f983817390ab6ed0022834b9e637b0ab5a81657 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 10 Jan 2014 20:45:53 +0800 Subject: [PATCH 2/4] fix(sync) leader should do a sync before committing log entires As we do not fsync every time log writes to file when the state is leader, we need to do fsync before actually committing the log entries to ensure safety. --- log.go | 7 ++++++- server.go | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/log.go b/log.go index 182f8253c2..d14207f25d 100644 --- a/log.go +++ b/log.go @@ -206,6 +206,11 @@ func (l *Log) close() { l.entries = make([]*LogEntry, 0) } +// sync to disk +func (l *Log) sync() error { + return l.file.Sync() +} + //-------------------------------------- // Entries //-------------------------------------- @@ -477,7 +482,7 @@ func (l *Log) appendEntries(entries []*LogEntry) error { startPosition += size } w.Flush() - err = l.file.Sync() + err = l.sync() if err != nil { panic(err) diff --git a/server.go b/server.go index 9cf3a4336d..8303ae1fea 100644 --- a/server.go +++ b/server.go @@ -960,6 +960,8 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) { committedIndex := s.log.commitIndex if commitIndex > committedIndex { + // leader needs to do a fsync before committing log entries + s.log.sync() s.log.setCommitIndex(commitIndex) s.debugln("commit index ", commitIndex) } From 4a4b7391a3f8d570da3560d0222657a66139e7ea Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 10 Jan 2014 20:49:33 +0800 Subject: [PATCH 3/4] Save config file atomically --- server.go | 2 +- util.go | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 util.go diff --git a/server.go b/server.go index 8303ae1fea..e3b6005dfd 100644 --- a/server.go +++ b/server.go @@ -1331,7 +1331,7 @@ func (s *server) writeConf() { confPath := path.Join(s.path, "conf") tmpConfPath := path.Join(s.path, "conf.tmp") - err := ioutil.WriteFile(tmpConfPath, b, 0600) + err := writeFileSynced(tmpConfPath, b, 0600) if err != nil { panic(err) diff --git a/util.go b/util.go new file mode 100644 index 0000000000..ed0acd2e06 --- /dev/null +++ b/util.go @@ -0,0 +1,25 @@ +package raft + +import ( + "io" + "os" +) + +// WriteFile writes data to a file named by filename. +// If the file does not exist, WriteFile creates it with permissions perm; +// otherwise WriteFile truncates it before writing. +// This is copied from ioutil.WriteFile with the addition of a Sync call to +// ensure the data reaches the disk. +func writeFileSynced(filename string, data []byte, perm os.FileMode) error { + f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + n, err := f.Write(data) + f.Sync() + f.Close() + if err == nil && n < len(data) { + err = io.ErrShortWrite + } + return err +} From 8ca39ae2237115edc31fd6d8312daac290a1cb87 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 15 Jan 2014 20:26:54 +0800 Subject: [PATCH 4/4] fix(util.go) make writeFileSynced return error properly --- util.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/util.go b/util.go index ed0acd2e06..ff3d62f0c3 100644 --- a/util.go +++ b/util.go @@ -15,11 +15,17 @@ func writeFileSynced(filename string, data []byte, perm os.FileMode) error { if err != nil { return err } + n, err := f.Write(data) - f.Sync() - f.Close() - if err == nil && n < len(data) { - err = io.ErrShortWrite + if n < len(data) { + f.Close() + return io.ErrShortWrite } - return err + + err = f.Sync() + if err != nil { + return err + } + + return f.Close() }