From 102fcd671bf09d25f27bb85b983dbe6f8c3d228e Mon Sep 17 00:00:00 2001
From: David Norton
Date: Tue, 12 Nov 2019 18:02:41 -0500
Subject: [PATCH] fix(tsm1): make Digest() safe for concurrent use

This change adds a lock around digest creation so that it is safe for
concurrent calls. Prior to this change, calls from multiple goroutines
resulted in "Digest aborted, problem renaming tmp digest" errors.
---
 tsdb/engine/tsm1/engine.go      |  6 ++++
 tsdb/engine/tsm1/engine_test.go | 58 +++++++++++++++++++++++++++++++++
 2 files changed, 64 insertions(+)

diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index acfc1ccd82..a6fa11fe61 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -196,6 +196,9 @@ type Engine struct {
 
 	// seriesTypeMap maps a series key to field type
 	seriesTypeMap *radix.Tree
+
+	// muDigest ensures only one goroutine can generate a digest at a time.
+	muDigest sync.Mutex
 }
 
 // NewEngine returns a new instance of Engine.
@@ -283,6 +286,9 @@ func (e *Engine) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
 
 // Digest returns a reader for the shard's digest.
 func (e *Engine) Digest() (io.ReadCloser, int64, error) {
+	e.muDigest.Lock()
+	defer e.muDigest.Unlock()
+
 	log, logEnd := logger.NewOperation(e.logger, "Engine digest", "tsm1_digest")
 	defer logEnd()
 
diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go
index 51116b0aeb..a5ccfe1121 100644
--- a/tsdb/engine/tsm1/engine_test.go
+++ b/tsdb/engine/tsm1/engine_test.go
@@ -320,6 +320,64 @@ type span struct {
 	tspan *tsm1.DigestTimeSpan
 }
 
+// Ensure engine handles concurrent calls to Digest().
+func TestEngine_Digest_Concurrent(t *testing.T) {
+	e := MustOpenEngine(inmem.IndexName)
+	defer e.Close()
+
+	if err := e.Open(); err != nil {
+		t.Fatalf("failed to open tsm1 engine: %s", err.Error())
+	}
+
+	// Create a few points.
+	points := []models.Point{
+		MustParsePointString("cpu,host=A value=1.1 1000000000"),
+		MustParsePointString("cpu,host=B value=1.2 2000000000"),
+	}
+
+	if err := e.WritePoints(points); err != nil {
+		t.Fatalf("failed to write points: %s", err.Error())
+	}
+
+	// Force a compaction.
+	e.ScheduleFullCompaction()
+
+	// Start multiple waiting goroutines, ready to call Digest(). The errs
+	// channel is buffered so that no goroutine can block on a send; each
+	// goroutine sends at most one error.
+	const callers = 100
+	start := make(chan struct{})
+	errs := make(chan error, callers)
+	var wg sync.WaitGroup
+	for n := 0; n < callers; n++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			<-start
+			r, _, err := e.Digest()
+			if err != nil {
+				errs <- err
+				return
+			}
+			// Release the digest reader so the test does not leak it.
+			if err := r.Close(); err != nil {
+				errs <- err
+			}
+		}()
+	}
+
+	// Signal all goroutines to call Digest(), then wait for them to finish
+	// before the deferred e.Close() can run.
+	close(start)
+	wg.Wait()
+	close(errs)
+
+	// Check for digest errors.
+	for err := range errs {
+		t.Error(err)
+	}
+}
+
 // Ensure that the engine will backup any TSM files created since the passed in time
 func TestEngine_Backup(t *testing.T) {
 	sfile := MustOpenSeriesFile()