Fix race in disableLevelCompactions

There was a race on the WaitGroup where we could end up calling Add
while another goroutine was still waiting.  The functions were confusing
so they have been simplified a bit since the compactions goroutines
have been reworked a lot already.
pull/8949/head
Jason Wilder 2017-10-10 14:36:21 -06:00
parent 5033783a33
commit 4f8580fbaa
1 changed files with 37 additions and 50 deletions

View File

@ -97,15 +97,11 @@ type Engine struct {
// The following group of fields is used to track the state of level compactions within the // The following group of fields is used to track the state of level compactions within the
// Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is // Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is
// used to signal those goroutines to shutdown. Every request to disable level compactions will // used to signal those goroutines to shutdown.
// call 'Wait' on 'wg', with the first goroutine to arrive (levelWorkers == 0 while holding the
// lock) will close the done channel and re-assign 'nil' to the variable. Re-enabling will
// decrease 'levelWorkers', and when it decreases to zero, level compactions will be started
// back up again.
wg sync.WaitGroup // waitgroup for active level compaction goroutines wg sync.WaitGroup // waitgroup for active level compaction goroutines
done chan struct{} // channel to signal level compactions to stop done chan struct{} // channel to signal level compactions to stop
levelWorkers int // Number of "workers" that expect compactions to be in a disabled state running int32 // running tracks whether compactions are being enabled or disabled
snapDone chan struct{} // channel to signal snapshot compactions to stop snapDone chan struct{} // channel to signal snapshot compactions to stop
snapWG sync.WaitGroup // waitgroup for running snapshot compactions snapWG sync.WaitGroup // waitgroup for running snapshot compactions
@ -211,72 +207,63 @@ func (e *Engine) SetEnabled(enabled bool) {
func (e *Engine) SetCompactionsEnabled(enabled bool) { func (e *Engine) SetCompactionsEnabled(enabled bool) {
if enabled { if enabled {
e.enableSnapshotCompactions() e.enableSnapshotCompactions()
e.enableLevelCompactions(false) e.enableLevelCompactions()
} else { } else {
e.disableSnapshotCompactions() e.disableSnapshotCompactions()
e.disableLevelCompactions(false) e.disableLevelCompactions()
} }
} }
// enableLevelCompactions will request that level compactions start back up again // enableLevelCompactions will request that level compactions start back up again
// func (e *Engine) enableLevelCompactions() {
// 'wait' signifies that a corresponding call to disableLevelCompactions(true) was made at some // See if they are already enabled. If we set this to one, they are disabled
// point, and the associated task that required disabled compactions is now complete // and we're the first to enable.
func (e *Engine) enableLevelCompactions(wait bool) { if !atomic.CompareAndSwapInt32(&e.running, 0, 1) {
// If we don't need to wait, see if we're already enabled
if !wait {
e.mu.RLock()
if e.done != nil {
e.mu.RUnlock()
return
}
e.mu.RUnlock()
}
e.mu.Lock()
if wait {
e.levelWorkers -= 1
}
if e.levelWorkers != 0 || e.done != nil {
// still waiting on more workers or already enabled
e.mu.Unlock()
return return
} }
// last one to enable, start things back up e.mu.Lock()
defer e.mu.Unlock()
if e.done != nil {
return
}
// First one to enable, start things up
e.Compactor.EnableCompactions() e.Compactor.EnableCompactions()
quit := make(chan struct{}) quit := make(chan struct{})
e.done = quit e.done = quit
e.wg.Add(1) e.wg.Add(1)
e.mu.Unlock()
go func() { defer e.wg.Done(); e.compact(quit) }() go func() { defer e.wg.Done(); e.compact(quit) }()
} }
// disableLevelCompactions will stop level compactions before returning. // disableLevelCompactions will stop level compactions before returning.
// func (e *Engine) disableLevelCompactions() {
// If 'wait' is set to true, then a corresponding call to enableLevelCompactions(true) will be // Already disabled?
// required before level compactions will start back up again. if atomic.LoadInt32(&e.running) == 0 {
func (e *Engine) disableLevelCompactions(wait bool) { return
}
e.mu.Lock() e.mu.Lock()
old := e.levelWorkers if e.done == nil {
if wait { e.mu.Unlock()
e.levelWorkers += 1 return
} }
if old == 0 && e.done != nil { // Interrupt and disable running compactions
// Prevent new compactions from starting e.Compactor.DisableCompactions()
e.Compactor.DisableCompactions()
// Stop all background compaction goroutines // Stop all background compaction goroutines
close(e.done) close(e.done)
e.done = nil e.done = nil
}
// Allow goroutines to exit
e.mu.Unlock() e.mu.Unlock()
e.wg.Wait() e.wg.Wait()
atomic.StoreInt32(&e.running, 0)
} }
func (e *Engine) enableSnapshotCompactions() { func (e *Engine) enableSnapshotCompactions() {
@ -967,8 +954,8 @@ func (e *Engine) DeleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
// so that snapshotting does not stop while writing out tombstones. If it is stopped, // so that snapshotting does not stop while writing out tombstones. If it is stopped,
// and writing tombstones takes a long time, writes can get rejected due to the cache // and writing tombstones takes a long time, writes can get rejected due to the cache
// filling up. // filling up.
e.disableLevelCompactions(true) e.disableLevelCompactions()
defer e.enableLevelCompactions(true) defer e.enableLevelCompactions()
tempKeys := seriesKeys[:] tempKeys := seriesKeys[:]
deleteKeys := make([][]byte, 0, len(seriesKeys)) deleteKeys := make([][]byte, 0, len(seriesKeys))