Merge pull request #150 from xiangli-cmu/sync_fix

Fix file issue
pull/820/head
Xiang Li 2014-01-15 04:27:30 -08:00
commit 2c0c8f0635
3 changed files with 52 additions and 11 deletions

28
log.go
View File

@ -206,6 +206,11 @@ func (l *Log) close() {
l.entries = make([]*LogEntry, 0)
}
// sync to disk
func (l *Log) sync() error {
return l.file.Sync()
}
//--------------------------------------
// Entries
//--------------------------------------
@ -477,7 +482,7 @@ func (l *Log) appendEntries(entries []*LogEntry) error {
startPosition += size
}
w.Flush()
err = l.file.Sync()
err = l.sync()
if err != nil {
panic(err)
@ -573,7 +578,8 @@ func (l *Log) compact(index uint64, term uint64) error {
}
// create a new log file and add all the entries
file, err := os.OpenFile(l.path+".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
new_file_path := l.path + ".new"
file, err := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
@ -582,25 +588,27 @@ func (l *Log) compact(index uint64, term uint64) error {
entry.Position = position
if _, err = entry.encode(file); err != nil {
file.Close()
os.Remove(new_file_path)
return err
}
}
// close the current log file
l.file.Close()
file.Sync()
// remove the current log file to .bak
err = os.Remove(l.path)
if err != nil {
return err
}
old_file := l.file
// rename the new log file
err = os.Rename(l.path+".new", l.path)
err = os.Rename(new_file_path, l.path)
if err != nil {
file.Close()
os.Remove(new_file_path)
return err
}
l.file = file
// close the old log file
old_file.Close()
// compaction the in memory log
l.entries = entries
l.startIndex = index

View File

@ -960,6 +960,8 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
committedIndex := s.log.commitIndex
if commitIndex > committedIndex {
// leader needs to do a fsync before committing log entries
s.log.sync()
s.log.setCommitIndex(commitIndex)
s.debugln("commit index ", commitIndex)
}
@ -1329,7 +1331,7 @@ func (s *server) writeConf() {
confPath := path.Join(s.path, "conf")
tmpConfPath := path.Join(s.path, "conf.tmp")
err := ioutil.WriteFile(tmpConfPath, b, 0600)
err := writeFileSynced(tmpConfPath, b, 0600)
if err != nil {
panic(err)

31
util.go Normal file
View File

@ -0,0 +1,31 @@
package raft
import (
"io"
"os"
)
// WriteFile writes data to a file named by filename.
// If the file does not exist, WriteFile creates it with permissions perm;
// otherwise WriteFile truncates it before writing.
// This is copied from ioutil.WriteFile with the addition of a Sync call to
// ensure the data reaches the disk.
func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}
n, err := f.Write(data)
if n < len(data) {
f.Close()
return io.ErrShortWrite
}
err = f.Sync()
if err != nil {
return err
}
return f.Close()
}