feat: wip
parent
1a7b8983bf
commit
fac5b363c5
|
@ -103,7 +103,7 @@ func (s *Service) WithLogger(log *zap.Logger) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) run() {
|
func (s *Service) run() {
|
||||||
ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
|
ticker := time.NewTicker(time.Duration(2 * time.Minute))
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|
|
@ -1075,18 +1075,21 @@ func (p *Partition) compact() {
|
||||||
p.currentCompactionN++
|
p.currentCompactionN++
|
||||||
p.logger.Warn("currentCompaction INCREASED", zap.Int("currentCompactionN", p.currentCompactionN))
|
p.logger.Warn("currentCompaction INCREASED", zap.Int("currentCompactionN", p.currentCompactionN))
|
||||||
go func() {
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
p.mu.Lock()
|
||||||
|
p.currentCompactionN--
|
||||||
|
p.logger.Warn("currentCompaction DECREASED", zap.Int("currentCompactionN", p.currentCompactionN))
|
||||||
|
p.levelCompacting[0] = false
|
||||||
|
p.mu.Unlock()
|
||||||
|
p.Compact()
|
||||||
|
}()
|
||||||
if p.shouldChaosKill() {
|
if p.shouldChaosKill() {
|
||||||
p.logger.Warn("CHAOS: Randomly killing log file compaction goroutine",
|
p.logger.Warn("CHAOS: Randomly killing log file compaction goroutine",
|
||||||
zap.String("file", logFile.Path()), zap.Int("currentCompactionN", p.currentCompactionN))
|
zap.String("file", logFile.Path()), zap.Int("currentCompactionN", p.currentCompactionN))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p.compactLogFile(logFile)
|
p.compactLogFile(logFile)
|
||||||
p.mu.Lock()
|
|
||||||
p.currentCompactionN--
|
|
||||||
p.logger.Warn("currentCompaction DECREASED", zap.Int("currentCompactionN", p.currentCompactionN))
|
|
||||||
p.levelCompacting[0] = false
|
|
||||||
p.mu.Unlock()
|
|
||||||
p.Compact()
|
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1121,22 +1124,24 @@ func (p *Partition) compact() {
|
||||||
p.currentCompactionN++
|
p.currentCompactionN++
|
||||||
p.logger.Warn("currentCompaction INCREASED", zap.Int("currentCompactionN", p.currentCompactionN))
|
p.logger.Warn("currentCompaction INCREASED", zap.Int("currentCompactionN", p.currentCompactionN))
|
||||||
go func() {
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
// Ensure compaction lock for the level is released.
|
||||||
|
p.mu.Lock()
|
||||||
|
p.levelCompacting[level] = false
|
||||||
|
p.currentCompactionN--
|
||||||
|
p.logger.Warn("currentCompaction DECREASED", zap.Int("currentCompactionN", p.currentCompactionN))
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
// Check for new compactions
|
||||||
|
p.Compact()
|
||||||
|
}()
|
||||||
|
|
||||||
if p.shouldChaosKill() {
|
if p.shouldChaosKill() {
|
||||||
p.logger.Warn("CHAOS: Randomly killing log file compaction goroutine", zap.Int("currentCompactionN", p.currentCompactionN))
|
p.logger.Warn("CHAOS: Randomly killing log file compaction goroutine", zap.Int("currentCompactionN", p.currentCompactionN))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Compact to a new level.
|
// Compact to a new level.
|
||||||
p.compactToLevel(files, level+1, interrupt)
|
p.compactToLevel(files, level+1, interrupt)
|
||||||
|
|
||||||
// Ensure compaction lock for the level is released.
|
|
||||||
p.mu.Lock()
|
|
||||||
p.levelCompacting[level] = false
|
|
||||||
p.currentCompactionN--
|
|
||||||
p.logger.Warn("currentCompaction DECREASED", zap.Int("currentCompactionN", p.currentCompactionN))
|
|
||||||
p.mu.Unlock()
|
|
||||||
|
|
||||||
// Check for new compactions
|
|
||||||
p.Compact()
|
|
||||||
}()
|
}()
|
||||||
}(files, level)
|
}(files, level)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue