diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 0d792ad9e2..3d27c81109 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -58,13 +58,13 @@ const ( // Engine represents a storage engine with compressed blocks. type Engine struct { - mu sync.RWMutex - done chan struct{} - snapshotterDone chan struct{} - wg sync.WaitGroup - snapshotterWg sync.WaitGroup - levelCompactionsEnabled bool - snapshotCompactionsEnabled bool + mu sync.RWMutex + wg sync.WaitGroup + done chan struct{} + levelWorkers int + + snapDone chan struct{} + snapWG sync.WaitGroup path string logger *log.Logger // Logger to be used for important messages @@ -155,80 +155,87 @@ func (e *Engine) SetEnabled(enabled bool) { func (e *Engine) SetCompactionsEnabled(enabled bool) { if enabled { e.enableSnapshotCompactions() - e.enableLevelCompactions() - + e.enableLevelCompactions(0) } else { e.disableSnapshotCompactions() - e.disableLevelCompactions() + e.disableLevelCompactions(0) } } -func (e *Engine) enableLevelCompactions() { +func (e *Engine) enableLevelCompactions(n int) { e.mu.Lock() - if e.levelCompactionsEnabled { + e.levelWorkers -= n + if e.levelWorkers != 0 || e.done != nil { + // still waiting on more workers or already enabled e.mu.Unlock() return } - e.levelCompactionsEnabled = true + + // last one to enable, start things back up e.Compactor.EnableCompactions() - e.done = make(chan struct{}) - e.mu.Unlock() + quit := make(chan struct{}) + e.done = quit e.wg.Add(4) - go e.compactTSMFull() - go e.compactTSMLevel(true, 1) - go e.compactTSMLevel(true, 2) - go e.compactTSMLevel(false, 3) -} - -func (e *Engine) disableLevelCompactions() { - e.mu.Lock() - if !e.levelCompactionsEnabled { - e.mu.Unlock() - return - } - // Prevent new compactions from starting - e.levelCompactionsEnabled = false - e.Compactor.DisableCompactions() e.mu.Unlock() - // Stop all background compaction goroutines - close(e.done) + go func() { defer e.wg.Done(); e.compactTSMFull(quit) }() + go func() { defer e.wg.Done(); e.compactTSMLevel(true, 1, quit) }() + go func() { defer e.wg.Done(); e.compactTSMLevel(true, 2, quit) }() + go func() { defer e.wg.Done(); e.compactTSMLevel(false, 3, quit) }() +} - // Wait for compaction goroutines to exit +func (e *Engine) disableLevelCompactions(n int) { + e.mu.Lock() + old := e.levelWorkers + e.levelWorkers += n + + if old == 0 && e.done != nil { + // Prevent new compactions from starting + e.Compactor.DisableCompactions() + + // Stop all background compaction goroutines + close(e.done) + e.done = nil + } + + e.mu.Unlock() e.wg.Wait() - if err := e.cleanup(); err != nil { - e.logger.Printf("error cleaning up temp file: %v", err) + if old == 0 { // first to disable should cleanup + if err := e.cleanup(); err != nil { + e.logger.Printf("error cleaning up temp file: %v", err) + } } } func (e *Engine) enableSnapshotCompactions() { e.mu.Lock() - if e.snapshotCompactionsEnabled { + if e.snapDone != nil { e.mu.Unlock() return } - e.snapshotCompactionsEnabled = true - e.snapshotterDone = make(chan struct{}) e.Compactor.EnableSnapshots() + quit := make(chan struct{}) + e.snapDone = quit + e.snapWG.Add(1) e.mu.Unlock() - e.snapshotterWg.Add(1) - go e.compactCache() + go func() { defer e.snapWG.Done(); e.compactCache(quit) }() } func (e *Engine) disableSnapshotCompactions() { e.mu.Lock() - if !e.snapshotCompactionsEnabled { - e.mu.Unlock() - return + + if e.snapDone != nil { + e.Compactor.DisableSnapshots() + close(e.snapDone) + e.snapDone = nil } - e.snapshotCompactionsEnabled = false - e.Compactor.DisableSnapshots() + e.mu.Unlock() - e.snapshotterWg.Wait() + e.snapWG.Wait() } // Path returns the path the engine was opened with. @@ -309,8 +316,6 @@ func (e *Engine) Statistics(tags map[string]string) []models.Statistic { // Open opens and initializes the engine. func (e *Engine) Open() error { - e.done = make(chan struct{}) - if err := os.MkdirAll(e.path, 0777); err != nil { return err } @@ -688,8 +693,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys []string, min, max int64) error { // so that snapshotting does not stop while writing out tombstones. If it is stopped, // and writing tombstones takes a long time, writes can get rejected due to the cache // filling up. - e.disableLevelCompactions() - defer e.enableLevelCompactions() + e.disableLevelCompactions(1) + defer e.enableLevelCompactions(1) // keyMap is used to see if a given key should be deleted. seriesKey // are the measurement + tagset (minus separate & field) @@ -860,22 +865,13 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( } // compactCache continually checks if the WAL cache should be written to disk -func (e *Engine) compactCache() { - defer e.snapshotterWg.Done() +func (e *Engine) compactCache(quit <-chan struct{}) { for { select { - case <-e.snapshotterDone: + case <-quit: return default: - e.mu.RLock() - enabled := e.snapshotCompactionsEnabled - e.mu.RUnlock() - - if !enabled { - return - } - e.Cache.UpdateAge() if e.ShouldCompactCache(e.WAL.LastWriteTime()) { start := time.Now() @@ -907,12 +903,10 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool { time.Now().Sub(lastWriteTime) > e.CacheFlushWriteColdDuration } -func (e *Engine) compactTSMLevel(fast bool, level int) { - defer e.wg.Done() - +func (e *Engine) compactTSMLevel(fast bool, level int, quit <-chan struct{}) { for { select { - case <-e.done: + case <-quit: return default: @@ -997,12 +991,10 @@ func (e *Engine) compactTSMLevel(fast bool, level int) { } } -func (e *Engine) compactTSMFull() { - defer e.wg.Done() - +func (e *Engine) compactTSMFull(quit <-chan struct{}) { for { select { - case <-e.done: + case <-quit: return default: