fix(tsdb): Replace TSI compaction wait group with counter.

Previously the TSI partition would panic if a compaction was
started while `Wait()` was waiting. This commit removes the previous
wait group and replaces it with a simple counter. The `Wait()`
function now polls the counter until it reaches zero.
pull/14902/head
Ben Johnson 2019-09-02 09:37:34 -06:00
parent 7beed29551
commit 729558d64b
1 changed file with 37 additions and 17 deletions

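For context on the commit message above, a minimal standalone sketch of the counter-plus-polling pattern this commit adopts. The partition type and helper names below are illustrative stand-ins, not the actual tsdb API:

package main

import (
	"fmt"
	"sync"
	"time"
)

// partition is an illustrative stand-in for tsi1.Partition; only the
// compaction counter relevant to this commit is modeled here.
type partition struct {
	mu                 sync.RWMutex
	currentCompactionN int // counter of in-progress compactions
}

// compactionN returns the number of compactions currently running,
// mirroring the new CurrentCompactionN().
func (p *partition) compactionN() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return p.currentCompactionN
}

// wait polls the counter until it reaches zero, mirroring the new Wait().
// Unlike sync.WaitGroup, starting another compaction while wait is running
// cannot panic; the poll simply keeps going until the counter drains.
func (p *partition) wait() {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if p.compactionN() == 0 {
			return
		}
	}
}

// startCompaction increments the counter and decrements it when fn returns,
// the same bookkeeping the diff adds around compactToLevel/compactLogFile.
func (p *partition) startCompaction(fn func()) {
	p.mu.Lock()
	p.currentCompactionN++
	p.mu.Unlock()

	go func() {
		fn()
		p.mu.Lock()
		p.currentCompactionN--
		p.mu.Unlock()
	}()
}

func main() {
	p := &partition{}
	p.startCompaction(func() { time.Sleep(300 * time.Millisecond) }) // fake compaction
	p.wait()
	fmt.Println("all compactions finished")
}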

@@ -73,7 +73,7 @@ type Partition struct {
levels []CompactionLevel // compaction levels
levelCompacting []bool // level compaction status
compactionsDisabled int // counter of disables
compactionsWG sync.WaitGroup
currentCompactionN int // counter of in-progress compactions
// Directory of the Partition's index files.
path string
@@ -371,9 +371,9 @@ func (p *Partition) Close() error {
p.resmu.Lock()
defer p.resmu.Unlock()
// Close the resource and wait for any outstanding references.
// Close the resource.
p.res.Close()
p.compactionsWG.Wait()
p.Wait()
// There are now no internal outstanding callers holding a reference
// so we can acquire this mutex to protect against external callers.
@@ -910,9 +910,25 @@ func (p *Partition) EnableCompactions() {
p.compactionsDisabled--
}
// Wait will block until all compactions are finished. Must only be called while they
// are disabled.
func (p *Partition) Wait() { p.compactionsWG.Wait() }
// CurrentCompactionN returns the number of compactions currently running.
func (p *Partition) CurrentCompactionN() int {
p.mu.RLock()
defer p.mu.RUnlock()
return p.currentCompactionN
}
// Wait will block until all compactions are finished.
// Must only be called while they are disabled.
func (p *Partition) Wait() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
if p.CurrentCompactionN() == 0 {
return
}
}
}
// compact compacts contiguous groups of files that are not currently compacting.
func (p *Partition) compact() {
@@ -964,20 +980,20 @@ func (p *Partition) compact() {
p.levelCompacting[level] = true
// Start compacting in a separate goroutine.
p.compactionsWG.Add(1)
p.currentCompactionN++
go func(level int) {
// Compact to a new level.
p.compactToLevel(files, frefs, level+1, ref.Closing())
// Ensure compaction lock for the level is released.
p.mu.Lock()
p.levelCompacting[level] = false
p.mu.Unlock()
// Ensure references are released.
frefs.Release()
ref.Release()
p.compactionsWG.Done()
// Ensure compaction lock for the level is released.
p.mu.Lock()
p.levelCompacting[level] = false
p.currentCompactionN--
p.mu.Unlock()
// Check for new compactions
p.Compact()
@@ -1163,12 +1179,16 @@ func (p *Partition) checkLogFile() error {
}
// Begin compacting in a background goroutine.
p.compactionsWG.Add(1)
p.currentCompactionN++
go func() {
p.compactLogFile(ctx, logFile, ref.Closing())
ref.Release() // release our reference
p.compactionsWG.Done() // compaction is now complete
p.Compact() // check for new compactions
ref.Release() // release our reference
p.mu.Lock()
p.currentCompactionN-- // compaction is now complete
p.mu.Unlock()
p.Compact() // check for new compactions
}()
return nil
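
As the doc comment on Wait() notes, it must only be called while compactions are disabled. A caller-side usage sketch, assuming only the exported DisableCompactions/EnableCompactions/Wait methods already present on Partition (the wrapper function and interface here are illustrative, not part of this diff):

// compactionWaiter lists the three Partition methods a caller needs in order
// to drain compactions; the interface itself is illustrative.
type compactionWaiter interface {
	DisableCompactions()
	EnableCompactions()
	Wait()
}

// drainCompactions stops new compactions from being scheduled, then blocks
// until the in-progress counter reaches zero.
func drainCompactions(p compactionWaiter) {
	p.DisableCompactions()
	defer p.EnableCompactions()
	p.Wait() // polls CurrentCompactionN() every 100ms until it hits zero
}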