diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index 5ae065ea72..823cefaa59 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -268,9 +268,11 @@ func (c *Cache) Snapshot() (*Cache, error) { // Deduplicate sorts the snapshot before returning it. The compactor and any queries // coming in while it writes will need the values sorted func (c *Cache) Deduplicate() { + c.mu.RLock() for _, e := range c.store { e.deduplicate() } + c.mu.RUnlock() } // ClearSnapshot will remove the snapshot cache from the list of flushing caches and @@ -334,21 +336,27 @@ func (c *Cache) DeleteRange(keys []string, min, max int64) { defer c.mu.Unlock() for _, k := range keys { - origSize := c.store[k].size() + // Make sure key exist in the cache, skip if it does not + e, ok := c.store[k] + if !ok { + continue + } + + origSize := e.size() if min == math.MinInt64 && max == math.MaxInt64 { c.size -= uint64(origSize) delete(c.store, k) continue } - c.store[k].filter(min, max) - if c.store[k].count() == 0 { + e.filter(min, max) + if e.count() == 0 { delete(c.store, k) c.size -= uint64(origSize) continue } - c.size -= uint64(origSize - c.store[k].size()) + c.size -= uint64(origSize - e.size()) } } diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index b311adf90f..3e4984bd9f 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -4,8 +4,10 @@ import ( "fmt" "io/ioutil" "math" + "math/rand" "os" "reflect" + "sync" "testing" "github.com/golang/snappy" @@ -146,6 +148,7 @@ func TestCache_DeleteRange_NoValues(t *testing.T) { t.Fatalf("cache values mismatch: got %v, exp %v", got, exp) } } + func TestCache_Cache_Delete(t *testing.T) { v0 := NewValue(1, 1.0) v1 := NewValue(2, 2.0) @@ -185,6 +188,16 @@ func TestCache_Cache_Delete(t *testing.T) { } } +func TestCache_Cache_Delete_NonExistent(t *testing.T) { + c := NewCache(1024, "") + + c.Delete([]string{"bar"}) + + if got, exp := c.Size(), uint64(0); exp != got { + t.Fatalf("cache size incorrect exp %d, got %d", exp, got) + } +} + // This tests writing two batches to the same series. The first batch // is sorted. The second batch is also sorted but contains duplicates. func TestCache_CacheWriteMulti_Duplicates(t *testing.T) { @@ -389,6 +402,37 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) { } } +func TestCache_Deduplicate_Concurrent(t *testing.T) { + values := make(map[string][]Value) + + for i := 0; i < 1000; i++ { + for j := 0; j < 100; j++ { + values[fmt.Sprintf("cpu%d", i)] = []Value{NewValue(int64(i+j)+int64(rand.Intn(10)), float64(i))} + } + } + + wg := sync.WaitGroup{} + c := NewCache(1000000, "") + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + c.WriteMulti(values) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + c.Deduplicate() + } + }() + + wg.Wait() +} + // Ensure the CacheLoader can correctly load from a single segment, even if it's corrupted. func TestCacheLoader_LoadSingle(t *testing.T) { // Create a WAL segment.