From 838a29cca80e63150ffad5347d367746ecd3c7d0 Mon Sep 17 00:00:00 2001
From: Jason Wilder
Date: Mon, 6 Jun 2016 14:54:08 -0600
Subject: [PATCH] Fix race in cache

If cache.Deduplicate is called while writes are in-flight on the cache, a data race could occur.

WARNING: DATA RACE
Write by goroutine 15:
  runtime.mapassign1()
      /usr/local/go/src/runtime/hashmap.go:429 +0x0
  github.com/influxdata/influxdb/tsdb/engine/tsm1.(*Cache).entry()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache.go:482 +0x27e
  github.com/influxdata/influxdb/tsdb/engine/tsm1.(*Cache).WriteMulti()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache.go:207 +0x3b2
  github.com/influxdata/influxdb/tsdb/engine/tsm1.TestCache_Deduplicate_Concurrent.func1()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache_test.go:421 +0x73

Previous read by goroutine 16:
  runtime.mapiterinit()
      /usr/local/go/src/runtime/hashmap.go:607 +0x0
  github.com/influxdata/influxdb/tsdb/engine/tsm1.(*Cache).Deduplicate()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache.go:272 +0x7c
  github.com/influxdata/influxdb/tsdb/engine/tsm1.TestCache_Deduplicate_Concurrent.func2()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache_test.go:429 +0x69

Goroutine 15 (running) created at:
  github.com/influxdata/influxdb/tsdb/engine/tsm1.TestCache_Deduplicate_Concurrent()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache_test.go:423 +0x3f2
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:473 +0xdc

Goroutine 16 (finished) created at:
  github.com/influxdata/influxdb/tsdb/engine/tsm1.TestCache_Deduplicate_Concurrent()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache_test.go:431 +0x43b
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:473 +0xdc
---
 tsdb/engine/tsm1/cache.go      |  2 ++
 tsdb/engine/tsm1/cache_test.go | 35 ++++++++++++++++++++++++++++++++++-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go
index c1f84f468c..823cefaa59 100644
--- a/tsdb/engine/tsm1/cache.go
+++ b/tsdb/engine/tsm1/cache.go
@@ -268,9 +268,11 @@ func (c *Cache) Snapshot() (*Cache, error) {
 // Deduplicate sorts the snapshot before returning it. The compactor and any queries
 // coming in while it writes will need the values sorted
 func (c *Cache) Deduplicate() {
+	c.mu.RLock()
 	for _, e := range c.store {
 		e.deduplicate()
 	}
+	c.mu.RUnlock()
 }

 // ClearSnapshot will remove the snapshot cache from the list of flushing caches and
diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go
index c45701c81c..3e4984bd9f 100644
--- a/tsdb/engine/tsm1/cache_test.go
+++ b/tsdb/engine/tsm1/cache_test.go
@@ -4,8 +4,10 @@ import (
 	"fmt"
 	"io/ioutil"
 	"math"
+	"math/rand"
 	"os"
 	"reflect"
+	"sync"
 	"testing"

 	"github.com/golang/snappy"
@@ -192,7 +194,7 @@ func TestCache_Cache_Delete_NonExistent(t *testing.T) {
 	c.Delete([]string{"bar"})

 	if got, exp := c.Size(), uint64(0); exp != got {
-		t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
+		t.Fatalf("cache size incorrect exp %d, got %d", exp, got)
 	}
 }

@@ -400,6 +402,37 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
 	}
 }

+func TestCache_Deduplicate_Concurrent(t *testing.T) {
+	values := make(map[string][]Value)
+
+	for i := 0; i < 1000; i++ {
+		for j := 0; j < 100; j++ {
+			values[fmt.Sprintf("cpu%d", i)] = []Value{NewValue(int64(i+j)+int64(rand.Intn(10)), float64(i))}
+		}
+	}
+
+	wg := sync.WaitGroup{}
+	c := NewCache(1000000, "")
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 1000; i++ {
+			c.WriteMulti(values)
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for i := 0; i < 1000; i++ {
+			c.Deduplicate()
+		}
+	}()
+
+	wg.Wait()
+}
+
 // Ensure the CacheLoader can correctly load from a single segment, even if it's corrupted.
 func TestCacheLoader_LoadSingle(t *testing.T) {
 	// Create a WAL segment.