From 95efaa7a0becd83f88b97d8a706e5cab34aae59c Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 24 Jul 2013 11:03:20 -0700 Subject: [PATCH 1/2] write multiple entries with a buffered io --- log.go | 42 +++++++++++++++++++++++++++++++++++++++++- server.go | 1 + 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/log.go b/log.go index b6801d4f41..f20ab23a8e 100644 --- a/log.go +++ b/log.go @@ -464,12 +464,20 @@ func (l *Log) appendEntries(entries []*LogEntry) error { l.mutex.Lock() defer l.mutex.Unlock() + startPosition, _ := l.file.Seek(0, os.SEEK_CUR) + + w := bufio.NewWriter(l.file) + + var size int64 + var err error // Append each entry but exit if we hit an error. for _, entry := range entries { - if err := l.appendEntry(entry); err != nil { + if size, err = l.appendEntryWithBuffer(entry, w, startPosition); err != nil { return err } + startPosition += size } + w.Flush() return nil } @@ -508,6 +516,38 @@ func (l *Log) appendEntry(entry *LogEntry) error { return nil } +// appendEntry with Buffered io +func (l *Log) appendEntryWithBuffer(entry *LogEntry, w io.Writer, startPosition int64) (int64, error) { + if l.file == nil { + return -1, errors.New("raft.Log: Log is not open") + } + + // Make sure the term and index are greater than the previous. + if len(l.entries) > 0 { + lastEntry := l.entries[len(l.entries)-1] + if entry.Term < lastEntry.Term { + return -1, fmt.Errorf("raft.Log: Cannot append entry with earlier term (%x:%x <= %x:%x)", entry.Term, entry.Index, lastEntry.Term, lastEntry.Index) + } else if entry.Term == lastEntry.Term && entry.Index <= lastEntry.Index { + return -1, fmt.Errorf("raft.Log: Cannot append entry with earlier index in the same term (%x:%x <= %x:%x)", entry.Term, entry.Index, lastEntry.Term, lastEntry.Index) + } + } + + entry.Position = startPosition + + var size int + var err error + // Write to storage. + if size, err = entry.encode(w); err != nil { + return -1, err + } + + // Append to entries list if stored on disk. + l.entries = append(l.entries, entry) + l.results = append(l.results, nil) + + return int64(size), nil +} + //-------------------------------------- // Log compaction //-------------------------------------- diff --git a/server.go b/server.go index 8f9c86251b..f95d8bdaf0 100644 --- a/server.go +++ b/server.go @@ -724,6 +724,7 @@ func (s *Server) processCommand(command Command, e *event) { e.c <- err return } + if err := s.log.appendEntry(entry); err != nil { s.debugln("server.command.log.error:", err) e.c <- err From 085da0973e7486a0b6a9e0872bc249e539bac0c8 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Wed, 24 Jul 2013 11:43:33 -0700 Subject: [PATCH 2/2] refactor on buffer writes --- log.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/log.go b/log.go index f20ab23a8e..d380c93ef3 100644 --- a/log.go +++ b/log.go @@ -472,9 +472,10 @@ func (l *Log) appendEntries(entries []*LogEntry) error { var err error // Append each entry but exit if we hit an error. for _, entry := range entries { - if size, err = l.appendEntryWithBuffer(entry, w, startPosition); err != nil { + if size, err = l.writeEntry(entry, w); err != nil { return err } + entry.Position = startPosition startPosition += size } w.Flush() @@ -517,7 +518,7 @@ func (l *Log) appendEntry(entry *LogEntry) error { } // appendEntry with Buffered io -func (l *Log) appendEntryWithBuffer(entry *LogEntry, w io.Writer, startPosition int64) (int64, error) { +func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) { if l.file == nil { return -1, errors.New("raft.Log: Log is not open") } @@ -532,12 +533,9 @@ func (l *Log) appendEntryWithBuffer(entry *LogEntry, w io.Writer, startPosition } } - entry.Position = startPosition - - var size int - var err error // Write to storage. - if size, err = entry.encode(w); err != nil { + size, err := entry.encode(w) + if err != nil { return -1, err }