diff --git a/log.go b/log.go index b6801d4f41..d380c93ef3 100644 --- a/log.go +++ b/log.go @@ -464,12 +464,21 @@ func (l *Log) appendEntries(entries []*LogEntry) error { l.mutex.Lock() defer l.mutex.Unlock() + startPosition, _ := l.file.Seek(0, os.SEEK_CUR) + + w := bufio.NewWriter(l.file) + + var size int64 + var err error // Append each entry but exit if we hit an error. for _, entry := range entries { - if err := l.appendEntry(entry); err != nil { + if size, err = l.writeEntry(entry, w); err != nil { return err } + entry.Position = startPosition + startPosition += size } + w.Flush() return nil } @@ -508,6 +517,35 @@ func (l *Log) appendEntry(entry *LogEntry) error { return nil } +// appendEntry with Buffered io +func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) { + if l.file == nil { + return -1, errors.New("raft.Log: Log is not open") + } + + // Make sure the term and index are greater than the previous. + if len(l.entries) > 0 { + lastEntry := l.entries[len(l.entries)-1] + if entry.Term < lastEntry.Term { + return -1, fmt.Errorf("raft.Log: Cannot append entry with earlier term (%x:%x <= %x:%x)", entry.Term, entry.Index, lastEntry.Term, lastEntry.Index) + } else if entry.Term == lastEntry.Term && entry.Index <= lastEntry.Index { + return -1, fmt.Errorf("raft.Log: Cannot append entry with earlier index in the same term (%x:%x <= %x:%x)", entry.Term, entry.Index, lastEntry.Term, lastEntry.Index) + } + } + + // Write to storage. + size, err := entry.encode(w) + if err != nil { + return -1, err + } + + // Append to entries list if stored on disk. + l.entries = append(l.entries, entry) + l.results = append(l.results, nil) + + return int64(size), nil +} + //-------------------------------------- // Log compaction //-------------------------------------- diff --git a/server.go b/server.go index 8f9c86251b..f95d8bdaf0 100644 --- a/server.go +++ b/server.go @@ -724,6 +724,7 @@ func (s *Server) processCommand(command Command, e *event) { e.c <- err return } + if err := s.log.appendEntry(entry); err != nil { s.debugln("server.command.log.error:", err) e.c <- err