From 5c88a1dd0e23cbcdf2787272cb820490f1610060 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 8 Aug 2018 23:41:26 +0100 Subject: [PATCH] Fix locking on cache --- tsdb/index/tsi1/cache.go | 30 ++++++++++---- tsdb/index/tsi1/cache_test.go | 75 +++++++++++++++++++++++++++++++++++ tsdb/index/tsi1/index_test.go | 4 ++ 3 files changed, 101 insertions(+), 8 deletions(-) diff --git a/tsdb/index/tsi1/cache.go b/tsdb/index/tsi1/cache.go index a3aacc4027..22a8d9d90e 100644 --- a/tsdb/index/tsi1/cache.go +++ b/tsdb/index/tsi1/cache.go @@ -2,7 +2,6 @@ package tsi1 import ( "container/list" - "fmt" "sync" "github.com/influxdata/influxdb/tsdb" @@ -40,8 +39,8 @@ func NewTagValueSeriesIDCache(c int) *TagValueSeriesIDCache { // Get returns the SeriesIDSet associated with the {name, key, value} tuple if it // exists. func (c *TagValueSeriesIDCache) Get(name, key, value []byte) *tsdb.SeriesIDSet { - c.mu.RLock() - defer c.mu.RUnlock() + c.mu.Lock() + defer c.mu.Unlock() return c.get(name, key, value) } @@ -57,6 +56,17 @@ func (c *TagValueSeriesIDCache) get(name, key, value []byte) *tsdb.SeriesIDSet { return nil } +// exists returns true if the an item exists for the tuple {name, key, value}. +func (c *TagValueSeriesIDCache) exists(name, key, value []byte) bool { + if mmap, ok := c.cache[string(name)]; ok { + if tkmap, ok := mmap[string(key)]; ok { + _, ok := tkmap[string(value)] + return ok + } + } + return false +} + // addToSet adds x to the SeriesIDSet associated with the tuple {name, key, value} // if it exists. This method takes a lock on the underlying SeriesIDSet. // @@ -66,6 +76,11 @@ func (c *TagValueSeriesIDCache) addToSet(name, key, value []byte, x uint64) { if mmap, ok := c.cache[string(name)]; ok { if tkmap, ok := mmap[string(key)]; ok { if ele, ok := tkmap[string(value)]; ok { + ss := ele.Value.(*seriesIDCacheElement).SeriesIDSet + if ss == nil { + ele.Value.(*seriesIDCacheElement).SeriesIDSet = tsdb.NewSeriesIDSet(x) + return + } ele.Value.(*seriesIDCacheElement).SeriesIDSet.Add(x) } } @@ -83,7 +98,7 @@ func (c *TagValueSeriesIDCache) measurementContainsSets(name []byte) bool { func (c *TagValueSeriesIDCache) Put(name, key, value []byte, ss *tsdb.SeriesIDSet) { c.mu.Lock() // Check under the write lock if the relevant item is now in the cache. - if c.get(name, key, value) != nil { + if c.exists(name, key, value) { c.mu.Unlock() return } @@ -128,22 +143,21 @@ EVICT: // item if it is. func (c *TagValueSeriesIDCache) checkEviction() { if c.evictor.Len() > c.capacity { - panic("CACHE FULL") // FIXME(edd) remove e := c.evictor.Back() // Least recently used item. listElement := e.Value.(*seriesIDCacheElement) name := listElement.name key := listElement.key value := listElement.value - fmt.Printf("goint to remove %q %q %q\n", name, key, value) + c.evictor.Remove(e) // Remove from evictor delete(c.cache[string(name)][string(key)], string(value)) // Remove from hashmap of items. - // Check if there are no tag values for the tag key. + // Check if there are no more tag values for the tag key. if len(c.cache[string(name)][string(key)]) == 0 { delete(c.cache[string(name)], string(key)) } - // Check there are no tag keys for the measurement. + // Check there are no more tag keys for the measurement. if len(c.cache[string(name)]) == 0 { delete(c.cache, string(name)) } diff --git a/tsdb/index/tsi1/cache_test.go b/tsdb/index/tsi1/cache_test.go index f77e4c2728..cdcb84dc38 100644 --- a/tsdb/index/tsi1/cache_test.go +++ b/tsdb/index/tsi1/cache_test.go @@ -1,7 +1,10 @@ package tsi1 import ( + "math/rand" + "sync" "testing" + "time" "github.com/influxdata/influxdb/tsdb" ) @@ -125,6 +128,74 @@ func TestTagValueSeriesIDCache_eviction(t *testing.T) { cache.Has(t, "m3", "k0", "v0", m3k0v0) } +func TestTagValueSeriesIDCache_addToSet(t *testing.T) { + cache := TestCache{NewTagValueSeriesIDCache(4)} + cache.PutByString("m0", "k0", "v0", nil) // Puts a nil set in the cache. + s2 := tsdb.NewSeriesIDSet(100) + cache.PutByString("m0", "k0", "v1", s2) + cache.Has(t, "m0", "k0", "v0", nil) + cache.Has(t, "m0", "k0", "v1", s2) + + cache.addToSet([]byte("m0"), []byte("k0"), []byte("v0"), 20) // No non-nil set exists so one will be created + cache.addToSet([]byte("m0"), []byte("k0"), []byte("v1"), 101) // No non-nil set exists so one will be created + cache.Has(t, "m0", "k0", "v1", s2) + + ss := cache.GetByString("m0", "k0", "v0") + if !tsdb.NewSeriesIDSet(20).Equals(ss) { + t.Fatalf("series id set was %v", ss) + } + +} + +func TestTagValueSeriesIDCache_ConcurrentGetPut(t *testing.T) { + if testing.Short() { + t.Skip("Skipping long test") + } + + a := []string{"a", "b", "c", "d", "e"} + rnd := func() []byte { + return []byte(a[rand.Intn(len(a)-1)]) + } + + cache := TestCache{NewTagValueSeriesIDCache(100)} + done := make(chan struct{}) + var wg sync.WaitGroup + + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + } + cache.Put(rnd(), rnd(), rnd(), tsdb.NewSeriesIDSet()) + } + }() + } + + for i := 0; i < 5; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-done: + return + default: + } + _ = cache.Get(rnd(), rnd(), rnd()) + } + }() + } + + time.Sleep(10 * time.Second) + close(done) + wg.Wait() +} + type TestCache struct { *TagValueSeriesIDCache } @@ -141,6 +212,10 @@ func (c TestCache) HasNot(t *testing.T, name, key, value string) { } } +func (c TestCache) GetByString(name, key, value string) *tsdb.SeriesIDSet { + return c.Get([]byte(name), []byte(key), []byte(value)) +} + func (c TestCache) PutByString(name, key, value string, ss *tsdb.SeriesIDSet) { c.Put([]byte(name), []byte(key), []byte(value), ss) } diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go index 7c5396ecc7..dd82d043b9 100644 --- a/tsdb/index/tsi1/index_test.go +++ b/tsdb/index/tsi1/index_test.go @@ -460,6 +460,10 @@ func TestIndex_TagValueSeriesIDIterator(t *testing.T) { "mem,region=west,server=c", }) }) + + t.Run("no matching series", func(t *testing.T) { + testTagValueSeriesIDIterator(t, "foo", "bar", "zoo", nil) + }) } // Index is a test wrapper for tsi1.Index.