feat: Defer cleanup for log/index compactions, add debug log (#26511) (#26629)

pull/26631/merge
WeblWabl 2025-07-25 13:21:19 -05:00 committed by GitHub
parent 32c046b5ae
commit 6f526cd15e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 43 additions and 22 deletions

View File

@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
@ -62,7 +63,7 @@ type Partition struct {
// Fieldset shared with engine.
fieldset *tsdb.MeasurementFieldSet
currentCompactionN int // counter of in-progress compactions
currentCompactionN atomic.Int32 // counter of in-progress compactions
// Directory of the Partition's index files.
path string
@ -348,10 +349,8 @@ func (p *Partition) buildSeriesSet() error {
}
// CurrentCompactionN returns the number of compactions currently running.
func (p *Partition) CurrentCompactionN() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.currentCompactionN
func (p *Partition) CurrentCompactionN() int32 {
return p.currentCompactionN.Load()
}
// Wait will block until all compactions are finished.
@ -359,11 +358,29 @@ func (p *Partition) CurrentCompactionN() int {
func (p *Partition) Wait() {
	// warnInterval is how long Wait keeps polling before it logs a warning
	// about compactions that are still running. Reaching it does not abort
	// the wait: the warning is emitted, the clock is restarted, and polling
	// continues until the counter reaches zero.
	const warnInterval = 24 * time.Hour

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	startTime := time.Now()
	for p.CurrentCompactionN() != 0 {
		// Exactly one tick per poll of the counter; a second receive here
		// would halve the polling rate.
		<-ticker.C

		if time.Since(startTime) < warnInterval {
			continue
		}

		// Collect the paths of the partition's current files so the warning
		// shows what was on disk while the compactions were outstanding.
		// NOTE(review): p.fileSet is read here without taking p.mu — confirm
		// this is safe.
		files := make([]string, 0)
		for _, f := range p.fileSet.Files() {
			files = append(files, f.Path())
		}
		p.logger.Warn("Partition.Wait() timed out waiting for compactions to complete",
			zap.Int32("stuck_compactions", p.CurrentCompactionN()), zap.Duration("timeout", warnInterval),
			zap.Strings("files", files))

		// Re-arm so the warning repeats once per interval instead of on
		// every subsequent tick.
		startTime = time.Now()
	}
}
@ -1040,14 +1057,17 @@ func (p *Partition) compact() {
}
// Mark the level as compacting.
p.levelCompacting[0] = true
p.currentCompactionN++
p.currentCompactionN.Add(1)
go func() {
defer func() {
p.mu.Lock()
p.currentCompactionN.Add(-1)
p.levelCompacting[0] = false
p.mu.Unlock()
p.Compact()
}()
p.compactLogFile(logFile)
p.mu.Lock()
p.currentCompactionN--
p.levelCompacting[0] = false
p.mu.Unlock()
p.Compact()
}()
}
}
@ -1079,20 +1099,21 @@ func (p *Partition) compact() {
// Execute in closure to save reference to the group within the loop.
func(files []*IndexFile, level int) {
// Start compacting in a separate goroutine.
p.currentCompactionN++
p.currentCompactionN.Add(1)
go func() {
defer func() {
// Ensure compaction lock for the level is released.
p.mu.Lock()
p.levelCompacting[level] = false
p.currentCompactionN.Add(-1)
p.mu.Unlock()
// Check for new compactions
p.Compact()
}()
// Compact to a new level.
p.compactToLevel(files, level+1, interrupt)
// Ensure compaction lock for the level is released.
p.mu.Lock()
p.levelCompacting[level] = false
p.currentCompactionN--
p.mu.Unlock()
// Check for new compactions
p.Compact()
}()
}(files, level)
}