parent 32c046b5ae
commit 6f526cd15e
@@ -10,6 +10,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 	"unsafe"
@@ -62,7 +63,7 @@ type Partition struct {
 	// Fieldset shared with engine.
 	fieldset *tsdb.MeasurementFieldSet

-	currentCompactionN int // counter of in-progress compactions
+	currentCompactionN atomic.Int32 // counter of in-progress compactions

 	// Directory of the Partition's index files.
 	path string
@@ -348,10 +349,8 @@ func (p *Partition) buildSeriesSet() error {
 }

 // CurrentCompactionN returns the number of compactions currently running.
-func (p *Partition) CurrentCompactionN() int {
-	p.mu.RLock()
-	defer p.mu.RUnlock()
-	return p.currentCompactionN
+func (p *Partition) CurrentCompactionN() int32 {
+	return p.currentCompactionN.Load()
 }

 // Wait will block until all compactions are finished.
@@ -359,11 +358,29 @@ func (p *Partition) CurrentCompactionN() int {
 func (p *Partition) Wait() {
 	ticker := time.NewTicker(10 * time.Millisecond)
 	defer ticker.Stop()

+	// Debug level timeout
+	timeoutDuration := 24 * time.Hour
+	startTime := time.Now()
+
 	for {
 		if p.CurrentCompactionN() == 0 {
 			return
 		}
-		<-ticker.C
+		select {
+		case <-ticker.C:
+			elapsed := time.Since(startTime)
+			if elapsed >= timeoutDuration {
+				files := make([]string, 0)
+				for _, v := range p.fileSet.Files() {
+					files = append(files, v.Path())
+				}
+				p.logger.Warn("Partition.Wait() timed out waiting for compactions to complete",
+					zap.Int32("stuck_compactions", p.CurrentCompactionN()), zap.Duration("timeout", timeoutDuration),
+					zap.Strings("files", files))
+				startTime = time.Now()
+			}
+		}
 	}
 }
@@ -1040,14 +1057,17 @@ func (p *Partition) compact() {
 		}
 		// Mark the level as compacting.
 		p.levelCompacting[0] = true
-		p.currentCompactionN++
+		p.currentCompactionN.Add(1)
 		go func() {
+			defer func() {
+				p.mu.Lock()
+				p.currentCompactionN.Add(-1)
+				p.levelCompacting[0] = false
+				p.mu.Unlock()
+				p.Compact()
+			}()
+
 			p.compactLogFile(logFile)
-			p.mu.Lock()
-			p.currentCompactionN--
-			p.levelCompacting[0] = false
-			p.mu.Unlock()
-			p.Compact()
 		}()
 	}
 }
@@ -1079,20 +1099,21 @@ func (p *Partition) compact() {
 		// Execute in closure to save reference to the group within the loop.
 		func(files []*IndexFile, level int) {
 			// Start compacting in a separate goroutine.
-			p.currentCompactionN++
+			p.currentCompactionN.Add(1)
 			go func() {
+				defer func() {
+					// Ensure compaction lock for the level is released.
+					p.mu.Lock()
+					p.levelCompacting[level] = false
+					p.currentCompactionN.Add(-1)
+					p.mu.Unlock()
+
+					// Check for new compactions
+					p.Compact()
+				}()
+
 				// Compact to a new level.
 				p.compactToLevel(files, level+1, interrupt)
-
-				// Ensure compaction lock for the level is released.
-				p.mu.Lock()
-				p.levelCompacting[level] = false
-				p.currentCompactionN--
-				p.mu.Unlock()
-
-				// Check for new compactions
-				p.Compact()
 			}()
 		}(files, level)
 	}
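
Note: the following is a minimal, self-contained Go sketch, not code from this repository, illustrating the two patterns the commit adopts: an atomic in-progress counter decremented in a defer (so a panic in the work cannot leak the count and hang Wait), and a ticker-driven wait loop that warns periodically instead of blocking silently. The worker, start, and wait names are illustrative only.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type worker struct {
	inProgress atomic.Int32 // replaces a mutex-guarded plain int
}

func (w *worker) start(task func()) {
	w.inProgress.Add(1)
	go func() {
		// The defer guarantees the decrement runs even if task()
		// panics or returns early, mirroring the commit's move of
		// cleanup into a deferred closure.
		defer w.inProgress.Add(-1)
		task()
	}()
}

// wait blocks until all tasks finish, emitting a warning each time
// warnEvery elapses with tasks still outstanding (as the commit does
// with a 24h timeout and a zap warning listing the index files).
func (w *worker) wait(warnEvery time.Duration) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	start := time.Now()
	for {
		if w.inProgress.Load() == 0 {
			return
		}
		<-ticker.C
		if time.Since(start) >= warnEvery {
			fmt.Printf("still waiting on %d tasks\n", w.inProgress.Load())
			start = time.Now() // reset so the warning repeats
		}
	}
}

func main() {
	var w worker
	for i := 0; i < 4; i++ {
		w.start(func() { time.Sleep(50 * time.Millisecond) })
	}
	w.wait(time.Second)
	fmt.Println("all tasks finished")
}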