Merge pull request #82 from coreos/master

write multiple entries with a buffered io
pull/820/head
Ben Johnson 2013-07-24 11:46:49 -07:00
commit b6f71abc29
2 changed files with 40 additions and 1 deletions

40
log.go
View File

@ -464,12 +464,21 @@ func (l *Log) appendEntries(entries []*LogEntry) error {
l.mutex.Lock()
defer l.mutex.Unlock()
startPosition, _ := l.file.Seek(0, os.SEEK_CUR)
w := bufio.NewWriter(l.file)
var size int64
var err error
// Append each entry but exit if we hit an error.
for _, entry := range entries {
if err := l.appendEntry(entry); err != nil {
if size, err = l.writeEntry(entry, w); err != nil {
return err
}
entry.Position = startPosition
startPosition += size
}
w.Flush()
return nil
}
@ -508,6 +517,35 @@ func (l *Log) appendEntry(entry *LogEntry) error {
return nil
}
// appendEntry with Buffered io
func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) {
if l.file == nil {
return -1, errors.New("raft.Log: Log is not open")
}
// Make sure the term and index are greater than the previous.
if len(l.entries) > 0 {
lastEntry := l.entries[len(l.entries)-1]
if entry.Term < lastEntry.Term {
return -1, fmt.Errorf("raft.Log: Cannot append entry with earlier term (%x:%x <= %x:%x)", entry.Term, entry.Index, lastEntry.Term, lastEntry.Index)
} else if entry.Term == lastEntry.Term && entry.Index <= lastEntry.Index {
return -1, fmt.Errorf("raft.Log: Cannot append entry with earlier index in the same term (%x:%x <= %x:%x)", entry.Term, entry.Index, lastEntry.Term, lastEntry.Index)
}
}
// Write to storage.
size, err := entry.encode(w)
if err != nil {
return -1, err
}
// Append to entries list if stored on disk.
l.entries = append(l.entries, entry)
l.results = append(l.results, nil)
return int64(size), nil
}
//--------------------------------------
// Log compaction
//--------------------------------------

View File

@ -724,6 +724,7 @@ func (s *Server) processCommand(command Command, e *event) {
e.c <- err
return
}
if err := s.log.appendEntry(entry); err != nil {
s.debugln("server.command.log.error:", err)
e.c <- err