From 50b6ace75f239b3c2a5c6307837f859e747bb6bd Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 20 Nov 2017 13:42:10 -0700
Subject: [PATCH] Fix wait reused while disabling compactions

---
 CHANGELOG.md                    |  1 +
 tsdb/engine/tsm1/engine.go      | 56 ++++++++++++++++++++++++++++++---
 tsdb/engine/tsm1/engine_test.go | 29 +++++++++++++++++
 3 files changed, 81 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index d892741234..1fe0b1d557 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 
 - [#9065](https://github.com/influxdata/influxdb/pull/9065): Refuse extra arguments to influx CLI
 - [#9058](https://github.com/influxdata/influxdb/issues/9058): Fix space required after regex operator. Thanks @stop-start!
+- [#9109](https://github.com/influxdata/influxdb/issues/9109): Fix: panic: sync: WaitGroup is reused before previous Wait has returned
 
 ## v1.4.2 [2017-11-15]
 
diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index f28d5070bd..a85930116e 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -282,17 +282,44 @@ func (e *Engine) disableLevelCompactions(wait bool) {
 		e.levelWorkers += 1
 	}
 
+	// Hold onto the current done channel so we can wait on it if necessary.
+	waitCh := e.done
+
 	if old == 0 && e.done != nil {
+		// It's possible we have closed the done channel and released the lock, and another
+		// goroutine has attempted to disable compactions. We're currently in the process of
+		// disabling them, so check for this and wait until the original call completes.
+		select {
+		case <-e.done:
+			e.mu.Unlock()
+			return
+		default:
+		}
+
 		// Prevent new compactions from starting
 		e.Compactor.DisableCompactions()
 
 		// Stop all background compaction goroutines
 		close(e.done)
-		e.done = nil
+		e.mu.Unlock()
+		e.wg.Wait()
 
+		// Signal that all goroutines have exited.
+		e.mu.Lock()
+		e.done = nil
+		e.mu.Unlock()
+		return
+	}
+	e.mu.Unlock()
+
+	// Compactions were already disabled.
+	if waitCh == nil {
+		return
 	}
 
-	e.mu.Unlock()
+	// We were not the first caller to disable compactions, and they were in the process
+	// of being disabled. Wait for them to complete before returning.
+	<-waitCh
 	e.wg.Wait()
 }
 
@@ -323,15 +350,34 @@ func (e *Engine) enableSnapshotCompactions() {
 
 func (e *Engine) disableSnapshotCompactions() {
 	e.mu.Lock()
-
+	var wait bool
 	if e.snapDone != nil {
+		// We may be in the process of stopping snapshots. See if the channel
+		// was closed.
+		select {
+		case <-e.snapDone:
+			e.mu.Unlock()
+			return
+		default:
+		}
+
 		close(e.snapDone)
-		e.snapDone = nil
 		e.Compactor.DisableSnapshots()
+		wait = true
+
+	}
+	e.mu.Unlock()
+
+	// Wait for the snapshot goroutine to exit.
+	if wait {
+		e.snapWG.Wait()
 	}
 
+	// Signal that the goroutines have exited and everything is stopped by setting
+	// snapDone to nil.
+	e.mu.Lock()
+	e.snapDone = nil
 	e.mu.Unlock()
-	e.snapWG.Wait()
 
 	// If the cache is empty, free up its resources as well.
 	if e.Cache.Size() == 0 {
diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go
index 470e30974a..84c37d8281 100644
--- a/tsdb/engine/tsm1/engine_test.go
+++ b/tsdb/engine/tsm1/engine_test.go
@@ -816,6 +816,35 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
 	}
 }
 
+// This test ensures that a "sync: WaitGroup is reused before previous Wait has returned"
+// panic is not raised.
+func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
+	t.Parallel()
+
+	e := MustOpenDefaultEngine()
+	defer e.Close()
+
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 1000; i++ {
+			e.SetCompactionsEnabled(true)
+			e.SetCompactionsEnabled(false)
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 1000; i++ {
+			e.SetCompactionsEnabled(false)
+			e.SetCompactionsEnabled(true)
+		}
+	}()
+	wg.Wait()
+}
+
 func BenchmarkEngine_CreateIterator_Count_1K(b *testing.B) {
 	benchmarkEngineCreateIteratorCount(b, 1000)
 }
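Note for reviewers: the shutdown pattern this patch adopts is easier to follow in isolation. The sketch below is a minimal, self-contained Go program using hypothetical names (worker, enable, and disable do not exist in the engine). It shows why the first caller to disable must close the done channel, release the lock, wait on the WaitGroup, and only then clear the channel, while concurrent callers consult the channel instead of touching the WaitGroup.

// Minimal sketch of the pattern, with hypothetical names; not engine code.
package main

import "sync"

// worker mimics the engine's compaction lifecycle: enable starts one
// background goroutine tracked by wg, disable stops it.
type worker struct {
	mu   sync.Mutex
	wg   sync.WaitGroup
	done chan struct{}
}

func (w *worker) enable() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.done != nil {
		// Already enabled, or a disable is still draining; skip for brevity.
		return
	}
	w.done = make(chan struct{})
	w.wg.Add(1) // safe: done was nil, so no Wait can be in flight
	go func(done chan struct{}) {
		defer w.wg.Done()
		<-done // stand-in for the compaction loop
	}(w.done)
}

func (w *worker) disable() {
	w.mu.Lock()
	waitCh := w.done
	if waitCh == nil {
		w.mu.Unlock()
		return // already fully disabled
	}
	select {
	case <-waitCh:
		// Another caller closed the channel and is mid-shutdown.
		w.mu.Unlock()
		return
	default:
	}
	// We are the first caller: stop the goroutine, then wait outside the lock.
	close(waitCh)
	w.mu.Unlock()
	w.wg.Wait()

	// Only now is it safe for a future enable to call wg.Add again.
	w.mu.Lock()
	w.done = nil
	w.mu.Unlock()
}

func main() {
	var w worker
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				w.enable()
				w.disable()
			}
		}()
	}
	wg.Wait()
}

Because wg.Add only runs while done is nil, and done only becomes nil after wg.Wait has returned, Add can never overlap an in-flight Wait; that overlap is exactly the reuse the Go runtime panics on.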