diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go
index 2718609b5b..be3c4bae9c 100644
--- a/tsdb/index/tsi1/partition.go
+++ b/tsdb/index/tsi1/partition.go
@@ -10,6 +10,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 	"unsafe"
 
@@ -62,7 +63,7 @@ type Partition struct {
 	// Fieldset shared with engine.
 	fieldset *tsdb.MeasurementFieldSet
 
-	currentCompactionN int // counter of in-progress compactions
+	currentCompactionN atomic.Int32 // counter of in-progress compactions
 
 	// Directory of the Partition's index files.
 	path string
@@ -348,10 +349,8 @@ func (p *Partition) buildSeriesSet() error {
 }
 
 // CurrentCompactionN returns the number of compactions currently running.
-func (p *Partition) CurrentCompactionN() int {
-	p.mu.RLock()
-	defer p.mu.RUnlock()
-	return p.currentCompactionN
+func (p *Partition) CurrentCompactionN() int32 {
+	return p.currentCompactionN.Load()
 }
 
 // Wait will block until all compactions are finished.
@@ -359,11 +358,31 @@ func (p *Partition) CurrentCompactionN() int {
 func (p *Partition) Wait() {
 	ticker := time.NewTicker(10 * time.Millisecond)
 	defer ticker.Stop()
+
+	// Warn (and re-arm the timer) if compactions appear stuck longer than this.
+	timeoutDuration := 24 * time.Hour
+	startTime := time.Now()
+
 	for {
 		if p.CurrentCompactionN() == 0 {
 			return
 		}
 		<-ticker.C
+		if time.Since(startTime) >= timeoutDuration {
+			// NOTE(review): fileSet is read under mu elsewhere in this file;
+			// hold the read lock while snapshotting file paths for logging.
+			p.mu.RLock()
+			fs := p.fileSet.Files()
+			files := make([]string, 0, len(fs))
+			for _, v := range fs {
+				files = append(files, v.Path())
+			}
+			p.mu.RUnlock()
+			p.logger.Warn("Partition.Wait() timed out waiting for compactions to complete",
+				zap.Int32("stuck_compactions", p.CurrentCompactionN()), zap.Duration("timeout", timeoutDuration),
+				zap.Strings("files", files))
+			startTime = time.Now()
+		}
 	}
 }
 
@@ -1040,14 +1057,17 @@ func (p *Partition) compact() {
 	}
 
 	// Mark the level as compacting.
 	p.levelCompacting[0] = true
-	p.currentCompactionN++
+	p.currentCompactionN.Add(1)
 	go func() {
+		defer func() {
+			p.mu.Lock()
+			p.currentCompactionN.Add(-1)
+			p.levelCompacting[0] = false
+			p.mu.Unlock()
+			p.Compact()
+		}()
+
 		p.compactLogFile(logFile)
-		p.mu.Lock()
-		p.currentCompactionN--
-		p.levelCompacting[0] = false
-		p.mu.Unlock()
-		p.Compact()
 	}()
 }
@@ -1079,20 +1099,21 @@
 		// Execute in closure to save reference to the group within the loop.
 		func(files []*IndexFile, level int) {
 			// Start compacting in a separate goroutine.
-			p.currentCompactionN++
+			p.currentCompactionN.Add(1)
 			go func() {
+				defer func() {
+					// Ensure compaction lock for the level is released.
+					p.mu.Lock()
+					p.levelCompacting[level] = false
+					p.currentCompactionN.Add(-1)
+					p.mu.Unlock()
+
+					// Check for new compactions
+					p.Compact()
+				}()
 				// Compact to a new level.
 				p.compactToLevel(files, level+1, interrupt)
-
-				// Ensure compaction lock for the level is released.
-				p.mu.Lock()
-				p.levelCompacting[level] = false
-				p.currentCompactionN--
-				p.mu.Unlock()
-
-				// Check for new compactions
-				p.Compact()
 			}()
 		}(files, level)
 	}