From 0d744dafed22e18629b642293ba825a1405378b8 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Tue, 25 Aug 2015 18:23:24 -0400 Subject: [PATCH] Fix metafile so it doesn't get trampled by other goroutines. Fixes #3832 and fixes #3833 --- tsdb/engine/wal/wal.go | 35 ++++++++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/tsdb/engine/wal/wal.go b/tsdb/engine/wal/wal.go index 0c64d0caa2..e6fa4b8c32 100644 --- a/tsdb/engine/wal/wal.go +++ b/tsdb/engine/wal/wal.go @@ -325,14 +325,14 @@ func (l *Log) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map // is meant to be called by bz1 BEFORE it updates its own index, since the metadata // is flushed here first. func (l *Log) DeleteSeries(keys []string) error { - // we want to stop any writes from happening to ensure the data gets cleared - l.mu.Lock() - defer l.mu.Unlock() - if err := l.flushMetadata(); err != nil { return err } + // we want to stop any writes from happening to ensure the data gets cleared + l.mu.Lock() + defer l.mu.Unlock() + for _, p := range l.partitions { p.deleteSeries(keys) } @@ -435,6 +435,9 @@ func (l *Log) writeSeriesAndFields(fields map[string]*tsdb.MeasurementFields, se // nextMetaFile will close the current file if there is one open and open a new file to log // metadata updates to. This function assumes that you've locked l.mu elsewhere. func (l *Log) nextMetaFile() error { + l.mu.Lock() + defer l.mu.Unlock() + if l.metaFile != nil { if err := l.metaFile.Close(); err != nil { return err @@ -455,7 +458,7 @@ func (l *Log) nextMetaFile() error { return err } - id = int(n) + id = int(n) + 1 } nextFileName := filepath.Join(l.path, fmt.Sprintf("%06d.%s", id, MetaFileExtension)) @@ -620,6 +623,28 @@ func (l *Log) autoflusher(closing chan struct{}) { // metadata from previous files to the index. After a sucessful write, the metadata files // will be removed. While the flush to index is happening we aren't blocked for new metadata writes. func (l *Log) flushMetadata() error { + // make sure there's actually something in the metadata file to flush + size, err := func() (int64, error) { + l.mu.Lock() + defer l.mu.Unlock() + + if l.metaFile == nil { + return 0, nil + } + st, err := l.metaFile.Stat() + if err != nil { + return 0, err + } + + return st.Size(), nil + }() + if err != nil { + return err + } else if size == 0 { + return nil + } + + // we have data, get a list of the existing files and rotate to a new one, then flush files, err := l.metadataFiles() if err != nil { return err