Fix race in cache

If cache.Deduplicate is called while writes are in-flight on the cache, a data race
could occur.

WARNING: DATA RACE
Write by goroutine 15:
  runtime.mapassign1()
      /usr/local/go/src/runtime/hashmap.go:429 +0x0
  github.com/influxdata/influxdb/tsdb/engine/tsm1.(*Cache).entry()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache.go:482 +0x27e
  github.com/influxdata/influxdb/tsdb/engine/tsm1.(*Cache).WriteMulti()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache.go:207 +0x3b2
  github.com/influxdata/influxdb/tsdb/engine/tsm1.TestCache_Deduplicate_Concurrent.func1()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache_test.go:421 +0x73

Previous read by goroutine 16:
  runtime.mapiterinit()
      /usr/local/go/src/runtime/hashmap.go:607 +0x0
  github.com/influxdata/influxdb/tsdb/engine/tsm1.(*Cache).Deduplicate()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache.go:272 +0x7c
  github.com/influxdata/influxdb/tsdb/engine/tsm1.TestCache_Deduplicate_Concurrent.func2()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache_test.go:429 +0x69

Goroutine 15 (running) created at:
  github.com/influxdata/influxdb/tsdb/engine/tsm1.TestCache_Deduplicate_Concurrent()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache_test.go:423 +0x3f2
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:473 +0xdc

Goroutine 16 (finished) created at:
  github.com/influxdata/influxdb/tsdb/engine/tsm1.TestCache_Deduplicate_Concurrent()
      /Users/jason/go/src/github.com/influxdata/influxdb/tsdb/engine/tsm1/cache_test.go:431 +0x43b
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:473 +0xdc
pull/6794/head
Jason Wilder 2016-06-06 14:54:08 -06:00
parent bc76048371
commit 838a29cca8
2 changed files with 36 additions and 1 deletions

View File

@ -268,9 +268,11 @@ func (c *Cache) Snapshot() (*Cache, error) {
// Deduplicate sorts the snapshot before returning it. The compactor and any queries
// coming in while it writes will need the values sorted
func (c *Cache) Deduplicate() {
c.mu.RLock()
for _, e := range c.store {
e.deduplicate()
}
c.mu.RUnlock()
}
// ClearSnapshot will remove the snapshot cache from the list of flushing caches and

View File

@ -4,8 +4,10 @@ import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"reflect"
"sync"
"testing"
"github.com/golang/snappy"
@ -192,7 +194,7 @@ func TestCache_Cache_Delete_NonExistent(t *testing.T) {
c.Delete([]string{"bar"})
if got, exp := c.Size(), uint64(0); exp != got {
t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", exp, got)
t.Fatalf("cache size incorrect exp %d, got %d", exp, got)
}
}
@ -400,6 +402,37 @@ func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
}
}
func TestCache_Deduplicate_Concurrent(t *testing.T) {
values := make(map[string][]Value)
for i := 0; i < 1000; i++ {
for j := 0; j < 100; j++ {
values[fmt.Sprintf("cpu%d", i)] = []Value{NewValue(int64(i+j)+int64(rand.Intn(10)), float64(i))}
}
}
wg := sync.WaitGroup{}
c := NewCache(1000000, "")
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
c.WriteMulti(values)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 1000; i++ {
c.Deduplicate()
}
}()
wg.Wait()
}
// Ensure the CacheLoader can correctly load from a single segment, even if it's corrupted.
func TestCacheLoader_LoadSingle(t *testing.T) {
// Create a WAL segment.