diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index f1a61baa3d..ae3c0e0d7b 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -339,13 +339,12 @@ func (e *Engine) enableLevelCompactions(wait bool) {
 
 	// last one to enable, start things back up
 	e.Compactor.EnableCompactions()
-	quit := make(chan struct{})
-	e.done = quit
+	e.done = make(chan struct{})
 	e.wg.Add(1)
 
 	e.mu.Unlock()
 
-	go func() { defer e.wg.Done(); e.compact(quit) }()
+	go func() { defer e.wg.Done(); e.compact() }()
 }
 
 // disableLevelCompactions will stop level compactions before returning.
@@ -417,12 +416,11 @@ func (e *Engine) enableSnapshotCompactions() {
 	}
 
 	e.Compactor.EnableSnapshots()
-	quit := make(chan struct{})
-	e.snapDone = quit
+	e.snapDone = make(chan struct{})
 	e.snapWG.Add(1)
 	e.mu.Unlock()
 
-	go func() { defer e.snapWG.Done(); e.compactCache(quit) }()
+	go func() { defer e.snapWG.Done(); e.compactCache() }()
 }
 
 func (e *Engine) disableSnapshotCompactions() {
@@ -1597,10 +1595,14 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (
 }
 
 // compactCache continually checks if the WAL cache should be written to disk.
-func (e *Engine) compactCache(quit <-chan struct{}) {
+func (e *Engine) compactCache() {
 	t := time.NewTicker(time.Second)
 	defer t.Stop()
 	for {
+		e.mu.RLock()
+		quit := e.snapDone
+		e.mu.RUnlock()
+
 		select {
 		case <-quit:
 			return
@@ -1636,11 +1638,15 @@ func (e *Engine) ShouldCompactCache(lastWriteTime time.Time) bool {
 		time.Since(lastWriteTime) > e.CacheFlushWriteColdDuration
 }
 
-func (e *Engine) compact(quit <-chan struct{}) {
+func (e *Engine) compact() {
 	t := time.NewTicker(time.Second)
 	defer t.Stop()
 
 	for {
+		e.mu.RLock()
+		quit := e.done
+		e.mu.RUnlock()
+
 		select {
 		case <-quit:
 			return
diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go
index 53475c4fbc..779af49b2f 100644
--- a/tsdb/engine/tsm1/engine_test.go
+++ b/tsdb/engine/tsm1/engine_test.go
@@ -983,10 +983,14 @@ func TestIndex_SeriesIDSet(t *testing.T) {
 	// series for the gpu measurement...
 	for _, id := range ids {
 		contains := engine.SeriesIDSet().Contains(id)
-		if id < 4 && !contains {
-			return fmt.Errorf("bitmap does not contain ID: %d, but should", id)
-		} else if id >= 4 && contains {
-			return fmt.Errorf("bitmap still contains ID: %d after delete", id)
+		key, _ := engine.sfile.Series(id)
+		isGpu := bytes.Equal(key, []byte("gpu"))
+		isMem := bytes.Equal(key, []byte("mem"))
+
+		if (isGpu || isMem) && contains {
+			return fmt.Errorf("bitmap still contains ID: %d after delete: %s", id, string(key))
+		} else if !(isGpu || isMem) && !contains {
+			return fmt.Errorf("bitmap does not contain ID: %d, but should: %s", id, string(key))
 		}
 	}
 
@@ -997,10 +1001,14 @@ func TestIndex_SeriesIDSet(t *testing.T) {
 
 	for _, id := range ids {
 		contains := engine.SeriesIDSet().Contains(id)
-		if id < 4 && !contains {
-			return fmt.Errorf("[after re-open] bitmap does not contain ID: %d, but should", id)
-		} else if id >= 4 && contains {
-			return fmt.Errorf("[after re-open] bitmap still contains ID: %d after delete", id)
+		key, _ := engine.sfile.Series(id)
+		isGpu := bytes.Equal(key, []byte("gpu"))
+		isMem := bytes.Equal(key, []byte("mem"))
+
+		if (isGpu || isMem) && contains {
+			return fmt.Errorf("[after re-open] bitmap still contains ID: %d after delete: %s", id, string(key))
+		} else if !(isGpu || isMem) && !contains {
+			return fmt.Errorf("[after re-open] bitmap does not contain ID: %d, but should: %s", id, string(key))
 		}
 	}
 	return nil
diff --git a/tsdb/series_file.go b/tsdb/series_file.go
index 383608f83a..6c27cd7aeb 100644
--- a/tsdb/series_file.go
+++ b/tsdb/series_file.go
@@ -74,7 +74,6 @@ func (f *SeriesFile) Close() (err error) {
 			err = e
 		}
 	}
-	f.partitions = nil
 	return err
 }
 