Merge pull request #9305 from influxdata/jw-compact-disable
Fix TestEngine_DisableEnableCompactions_Concurrent hangpull/9308/head
commit
1abe176a68
|
@ -339,13 +339,12 @@ func (e *Engine) enableLevelCompactions(wait bool) {
|
||||||
|
|
||||||
// last one to enable, start things back up
|
// last one to enable, start things back up
|
||||||
e.Compactor.EnableCompactions()
|
e.Compactor.EnableCompactions()
|
||||||
quit := make(chan struct{})
|
e.done = make(chan struct{})
|
||||||
e.done = quit
|
|
||||||
|
|
||||||
e.wg.Add(1)
|
e.wg.Add(1)
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
|
|
||||||
go func() { defer e.wg.Done(); e.compact(quit) }()
|
go func() { defer e.wg.Done(); e.compact() }()
|
||||||
}
|
}
|
||||||
|
|
||||||
// disableLevelCompactions will stop level compactions before returning.
|
// disableLevelCompactions will stop level compactions before returning.
|
||||||
|
@ -417,12 +416,11 @@ func (e *Engine) enableSnapshotCompactions() {
|
||||||
}
|
}
|
||||||
|
|
||||||
e.Compactor.EnableSnapshots()
|
e.Compactor.EnableSnapshots()
|
||||||
quit := make(chan struct{})
|
e.snapDone = make(chan struct{})
|
||||||
e.snapDone = quit
|
|
||||||
e.snapWG.Add(1)
|
e.snapWG.Add(1)
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
|
|
||||||
go func() { defer e.snapWG.Done(); e.compactCache(quit) }()
|
go func() { defer e.snapWG.Done(); e.compactCache() }()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) disableSnapshotCompactions() {
|
func (e *Engine) disableSnapshotCompactions() {
|
||||||
|
@ -1597,10 +1595,14 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
|
||||||
}
|
}
|
||||||
|
|
||||||
// compactCache continually checks if the WAL cache should be written to disk.
|
// compactCache continually checks if the WAL cache should be written to disk.
|
||||||
func (e *Engine) compactCache(quit <-chan struct{}) {
|
func (e *Engine) compactCache() {
|
||||||
t := time.NewTicker(time.Second)
|
t := time.NewTicker(time.Second)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
for {
|
for {
|
||||||
|
e.mu.RLock()
|
||||||
|
quit := e.snapDone
|
||||||
|
e.mu.RUnlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-quit:
|
case <-quit:
|
||||||
return
|
return
|
||||||
|
@ -1636,11 +1638,15 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
|
||||||
time.Since(lastWriteTime) > e.CacheFlushWriteColdDuration
|
time.Since(lastWriteTime) > e.CacheFlushWriteColdDuration
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) compact(quit <-chan struct{}) {
|
func (e *Engine) compact() {
|
||||||
t := time.NewTicker(time.Second)
|
t := time.NewTicker(time.Second)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
|
e.mu.RLock()
|
||||||
|
quit := e.done
|
||||||
|
e.mu.RUnlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-quit:
|
case <-quit:
|
||||||
return
|
return
|
||||||
|
|
|
@ -983,10 +983,14 @@ func TestIndex_SeriesIDSet(t *testing.T) {
|
||||||
// series for the gpu measurement...
|
// series for the gpu measurement...
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
contains := engine.SeriesIDSet().Contains(id)
|
contains := engine.SeriesIDSet().Contains(id)
|
||||||
if id < 4 && !contains {
|
key, _ := engine.sfile.Series(id)
|
||||||
return fmt.Errorf("bitmap does not contain ID: %d, but should", id)
|
isGpu := bytes.Equal(key, []byte("gpu"))
|
||||||
} else if id >= 4 && contains {
|
isMem := bytes.Equal(key, []byte("mem"))
|
||||||
return fmt.Errorf("bitmap still contains ID: %d after delete", id)
|
|
||||||
|
if (isGpu || isMem) && contains {
|
||||||
|
return fmt.Errorf("bitmap still contains ID: %d after delete: %s", id, string(key))
|
||||||
|
} else if !(isGpu || isMem) && !contains {
|
||||||
|
return fmt.Errorf("bitmap does not contain ID: %d, but should: %s", id, string(key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -997,10 +1001,14 @@ func TestIndex_SeriesIDSet(t *testing.T) {
|
||||||
|
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
contains := engine.SeriesIDSet().Contains(id)
|
contains := engine.SeriesIDSet().Contains(id)
|
||||||
if id < 4 && !contains {
|
key, _ := engine.sfile.Series(id)
|
||||||
return fmt.Errorf("[after re-open] bitmap does not contain ID: %d, but should", id)
|
isGpu := bytes.Equal(key, []byte("gpu"))
|
||||||
} else if id >= 4 && contains {
|
isMem := bytes.Equal(key, []byte("mem"))
|
||||||
return fmt.Errorf("[after re-open] bitmap still contains ID: %d after delete", id)
|
|
||||||
|
if (isGpu || isMem) && contains {
|
||||||
|
return fmt.Errorf("[after re-open] bitmap still contains ID: %d after delete: %s", id, string(key))
|
||||||
|
} else if !(isGpu || isMem) && !contains {
|
||||||
|
return fmt.Errorf("[after re-open] bitmap does not contain ID: %d, but should: %s", id, string(key))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -74,7 +74,6 @@ func (f *SeriesFile) Close() (err error) {
|
||||||
err = e
|
err = e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
f.partitions = nil
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue