From fac5b363c59d9ccea56fb5c821c26eed6f6ecad2 Mon Sep 17 00:00:00 2001 From: devanbenz Date: Fri, 13 Jun 2025 12:24:02 -0500 Subject: [PATCH] feat: wip --- services/retention/service.go | 2 +- tsdb/index/tsi1/partition.go | 37 ++++++++++++++++++++--------------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/services/retention/service.go b/services/retention/service.go index aa75beace9..87fb142141 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -103,7 +103,7 @@ func (s *Service) WithLogger(log *zap.Logger) { } func (s *Service) run() { - ticker := time.NewTicker(time.Duration(s.config.CheckInterval)) + ticker := time.NewTicker(time.Duration(2 * time.Minute)) defer ticker.Stop() for { select { diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 41c13d8db6..4e5f0d4ebb 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -1075,18 +1075,21 @@ func (p *Partition) compact() { p.currentCompactionN++ p.logger.Warn("currentCompaction INCREASED", zap.Int("currentCompactionN", p.currentCompactionN)) go func() { + defer func() { + p.mu.Lock() + p.currentCompactionN-- + p.logger.Warn("currentCompaction DECREASED", zap.Int("currentCompactionN", p.currentCompactionN)) + p.levelCompacting[0] = false + p.mu.Unlock() + p.Compact() + }() if p.shouldChaosKill() { p.logger.Warn("CHAOS: Randomly killing log file compaction goroutine", zap.String("file", logFile.Path()), zap.Int("currentCompactionN", p.currentCompactionN)) return } + p.compactLogFile(logFile) - p.mu.Lock() - p.currentCompactionN-- - p.logger.Warn("currentCompaction DECREASED", zap.Int("currentCompactionN", p.currentCompactionN)) - p.levelCompacting[0] = false - p.mu.Unlock() - p.Compact() }() } } @@ -1121,22 +1124,24 @@ func (p *Partition) compact() { p.currentCompactionN++ p.logger.Warn("currentCompaction INCREASED", zap.Int("currentCompactionN", p.currentCompactionN)) go func() { + defer func() { + // Ensure compaction lock for the level is released. + p.mu.Lock() + p.levelCompacting[level] = false + p.currentCompactionN-- + p.logger.Warn("currentCompaction DECREASED", zap.Int("currentCompactionN", p.currentCompactionN)) + p.mu.Unlock() + + // Check for new compactions + p.Compact() + }() + if p.shouldChaosKill() { p.logger.Warn("CHAOS: Randomly killing log file compaction goroutine", zap.Int("currentCompactionN", p.currentCompactionN)) return } // Compact to a new level. p.compactToLevel(files, level+1, interrupt) - - // Ensure compaction lock for the level is released. - p.mu.Lock() - p.levelCompacting[level] = false - p.currentCompactionN-- - p.logger.Warn("currentCompaction DECREASED", zap.Int("currentCompactionN", p.currentCompactionN)) - p.mu.Unlock() - - // Check for new compactions - p.Compact() }() }(files, level) }