Merge pull request #9141 from influxdata/jw-waitgroup
Fix wait reused while disabling compactionspull/9151/head
commit
d526b828d1
tsdb/engine/tsm1
|
@ -12,6 +12,7 @@
|
|||
|
||||
- [#9065](https://github.com/influxdata/influxdb/pull/9065): Refuse extra arguments to influx CLI
|
||||
- [#9058](https://github.com/influxdata/influxdb/issues/9058): Fix space required after regex operator. Thanks @stop-start!
|
||||
- [#9109](https://github.com/influxdata/influxdb/issues/9109): Fix: panic: sync: WaitGroup is reused before previous Wait has returned
|
||||
|
||||
## v1.4.2 [2017-11-15]
|
||||
|
||||
|
|
|
@ -282,17 +282,44 @@ func (e *Engine) disableLevelCompactions(wait bool) {
|
|||
e.levelWorkers += 1
|
||||
}
|
||||
|
||||
// Hold onto the current done channel so we can wait on it if necessary
|
||||
waitCh := e.done
|
||||
|
||||
if old == 0 && e.done != nil {
|
||||
// It's possible we have closed the done channel and released the lock and another
|
||||
// goroutine has attempted to disable compactions. We're current in the process of
|
||||
// disabling them so check for this and wait until the original completes.
|
||||
select {
|
||||
case <-e.done:
|
||||
e.mu.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Prevent new compactions from starting
|
||||
e.Compactor.DisableCompactions()
|
||||
|
||||
// Stop all background compaction goroutines
|
||||
close(e.done)
|
||||
e.done = nil
|
||||
e.mu.Unlock()
|
||||
e.wg.Wait()
|
||||
|
||||
// Signal that all goroutines have exited.
|
||||
e.mu.Lock()
|
||||
e.done = nil
|
||||
e.mu.Unlock()
|
||||
return
|
||||
}
|
||||
e.mu.Unlock()
|
||||
|
||||
// Compaction were already disabled.
|
||||
if waitCh == nil {
|
||||
return
|
||||
}
|
||||
|
||||
e.mu.Unlock()
|
||||
// We were not the first caller to disable compactions and they were in the process
|
||||
// of being disabled. Wait for them to complete before returning.
|
||||
<-waitCh
|
||||
e.wg.Wait()
|
||||
}
|
||||
|
||||
|
@ -323,15 +350,34 @@ func (e *Engine) enableSnapshotCompactions() {
|
|||
|
||||
func (e *Engine) disableSnapshotCompactions() {
|
||||
e.mu.Lock()
|
||||
|
||||
var wait bool
|
||||
if e.snapDone != nil {
|
||||
// We may be in the process of stopping snapshots. See if the channel
|
||||
// was closed.
|
||||
select {
|
||||
case <-e.snapDone:
|
||||
e.mu.Unlock()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
close(e.snapDone)
|
||||
e.snapDone = nil
|
||||
e.Compactor.DisableSnapshots()
|
||||
wait = true
|
||||
|
||||
}
|
||||
e.mu.Unlock()
|
||||
|
||||
// Wait for the snapshot goroutine to exit.
|
||||
if wait {
|
||||
e.snapWG.Wait()
|
||||
}
|
||||
|
||||
// Signal that the goroutines are exit and everything is stopped by setting
|
||||
// snapDone to nil.
|
||||
e.mu.Lock()
|
||||
e.snapDone = nil
|
||||
e.mu.Unlock()
|
||||
e.snapWG.Wait()
|
||||
|
||||
// If the cache is empty, free up its resources as well.
|
||||
if e.Cache.Size() == 0 {
|
||||
|
|
|
@ -816,6 +816,35 @@ func TestEngine_CreateCursor_Descending(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// This test ensures that "sync: WaitGroup is reused before previous Wait has returned" is
|
||||
// is not raised.
|
||||
func TestEngine_DisableEnableCompactions_Concurrent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
e := MustOpenDefaultEngine()
|
||||
defer e.Close()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 1000; i++ {
|
||||
e.SetCompactionsEnabled(true)
|
||||
e.SetCompactionsEnabled(false)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 1000; i++ {
|
||||
e.SetCompactionsEnabled(false)
|
||||
e.SetCompactionsEnabled(true)
|
||||
}
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkEngine_CreateIterator_Count_1K(b *testing.B) {
|
||||
benchmarkEngineCreateIteratorCount(b, 1000)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue