Re-use WALSegmentReaders at startup

pull/8804/head
Jason Wilder 2017-09-07 12:56:17 -06:00
parent e39276b96f
commit 5581f8b4ae
2 changed files with 22 additions and 5 deletions

View File

@ -601,6 +601,8 @@ func NewCacheLoader(files []string) *CacheLoader {
// file is truncated up to and including the last valid byte, and processing
// continues with the next segment file.
func (cl *CacheLoader) Load(cache *Cache) error {
var r *WALSegmentReader
for _, fn := range cl.files {
if err := func() error {
f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666)
@ -621,8 +623,12 @@ func (cl *CacheLoader) Load(cache *Cache) error {
return nil
}
r := NewWALSegmentReader(f)
defer r.Close()
if r == nil {
r = NewWALSegmentReader(f)
defer r.Close()
} else {
r.Reset(f)
}
for r.Next() {
entry, err := r.Read()
@ -647,7 +653,7 @@ func (cl *CacheLoader) Load(cache *Cache) error {
}
}
return nil
return r.Close()
}(); err != nil {
return err
}

View File

@ -1081,7 +1081,7 @@ func (w *WALSegmentWriter) close() error {
// WALSegmentReader reads WAL segments.
type WALSegmentReader struct {
rc io.ReadCloser
r io.Reader
r *bufio.Reader
entry WALEntry
n int64
err error
@ -1091,10 +1091,18 @@ type WALSegmentReader struct {
func NewWALSegmentReader(r io.ReadCloser) *WALSegmentReader {
return &WALSegmentReader{
rc: r,
r: bufio.NewReaderSize(r, 1024*1024),
r: bufio.NewReader(r),
}
}
func (r *WALSegmentReader) Reset(rc io.ReadCloser) {
r.rc = rc
r.r.Reset(rc)
r.entry = nil
r.n = 0
r.err = nil
}
// Next indicates if there is a value to read.
func (r *WALSegmentReader) Next() bool {
var nReadOK int
@ -1187,6 +1195,9 @@ func (r *WALSegmentReader) Error() error {
// Close closes the underlying io.Reader.
func (r *WALSegmentReader) Close() error {
if r.rc == nil {
return nil
}
return r.rc.Close()
}