Fix locking on cache

pull/10234/head
Edd Robinson 2018-08-08 23:41:26 +01:00
parent 6d12f5d323
commit 5c88a1dd0e
3 changed files with 101 additions and 8 deletions

View File

@ -2,7 +2,6 @@ package tsi1
import (
"container/list"
"fmt"
"sync"
"github.com/influxdata/influxdb/tsdb"
@ -40,8 +39,8 @@ func NewTagValueSeriesIDCache(c int) *TagValueSeriesIDCache {
// Get returns the SeriesIDSet associated with the {name, key, value} tuple if it
// exists.
func (c *TagValueSeriesIDCache) Get(name, key, value []byte) *tsdb.SeriesIDSet {
c.mu.RLock()
defer c.mu.RUnlock()
c.mu.Lock()
defer c.mu.Unlock()
return c.get(name, key, value)
}
@ -57,6 +56,17 @@ func (c *TagValueSeriesIDCache) get(name, key, value []byte) *tsdb.SeriesIDSet {
return nil
}
// exists returns true if the an item exists for the tuple {name, key, value}.
func (c *TagValueSeriesIDCache) exists(name, key, value []byte) bool {
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
_, ok := tkmap[string(value)]
return ok
}
}
return false
}
// addToSet adds x to the SeriesIDSet associated with the tuple {name, key, value}
// if it exists. This method takes a lock on the underlying SeriesIDSet.
//
@ -66,6 +76,11 @@ func (c *TagValueSeriesIDCache) addToSet(name, key, value []byte, x uint64) {
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
if ele, ok := tkmap[string(value)]; ok {
ss := ele.Value.(*seriesIDCacheElement).SeriesIDSet
if ss == nil {
ele.Value.(*seriesIDCacheElement).SeriesIDSet = tsdb.NewSeriesIDSet(x)
return
}
ele.Value.(*seriesIDCacheElement).SeriesIDSet.Add(x)
}
}
@ -83,7 +98,7 @@ func (c *TagValueSeriesIDCache) measurementContainsSets(name []byte) bool {
func (c *TagValueSeriesIDCache) Put(name, key, value []byte, ss *tsdb.SeriesIDSet) {
c.mu.Lock()
// Check under the write lock if the relevant item is now in the cache.
if c.get(name, key, value) != nil {
if c.exists(name, key, value) {
c.mu.Unlock()
return
}
@ -128,22 +143,21 @@ EVICT:
// item if it is.
func (c *TagValueSeriesIDCache) checkEviction() {
if c.evictor.Len() > c.capacity {
panic("CACHE FULL") // FIXME(edd) remove
e := c.evictor.Back() // Least recently used item.
listElement := e.Value.(*seriesIDCacheElement)
name := listElement.name
key := listElement.key
value := listElement.value
fmt.Printf("goint to remove %q %q %q\n", name, key, value)
c.evictor.Remove(e) // Remove from evictor
delete(c.cache[string(name)][string(key)], string(value)) // Remove from hashmap of items.
// Check if there are no tag values for the tag key.
// Check if there are no more tag values for the tag key.
if len(c.cache[string(name)][string(key)]) == 0 {
delete(c.cache[string(name)], string(key))
}
// Check there are no tag keys for the measurement.
// Check there are no more tag keys for the measurement.
if len(c.cache[string(name)]) == 0 {
delete(c.cache, string(name))
}

View File

@ -1,7 +1,10 @@
package tsi1
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/influxdata/influxdb/tsdb"
)
@ -125,6 +128,74 @@ func TestTagValueSeriesIDCache_eviction(t *testing.T) {
cache.Has(t, "m3", "k0", "v0", m3k0v0)
}
func TestTagValueSeriesIDCache_addToSet(t *testing.T) {
cache := TestCache{NewTagValueSeriesIDCache(4)}
cache.PutByString("m0", "k0", "v0", nil) // Puts a nil set in the cache.
s2 := tsdb.NewSeriesIDSet(100)
cache.PutByString("m0", "k0", "v1", s2)
cache.Has(t, "m0", "k0", "v0", nil)
cache.Has(t, "m0", "k0", "v1", s2)
cache.addToSet([]byte("m0"), []byte("k0"), []byte("v0"), 20) // No non-nil set exists so one will be created
cache.addToSet([]byte("m0"), []byte("k0"), []byte("v1"), 101) // No non-nil set exists so one will be created
cache.Has(t, "m0", "k0", "v1", s2)
ss := cache.GetByString("m0", "k0", "v0")
if !tsdb.NewSeriesIDSet(20).Equals(ss) {
t.Fatalf("series id set was %v", ss)
}
}
func TestTagValueSeriesIDCache_ConcurrentGetPut(t *testing.T) {
if testing.Short() {
t.Skip("Skipping long test")
}
a := []string{"a", "b", "c", "d", "e"}
rnd := func() []byte {
return []byte(a[rand.Intn(len(a)-1)])
}
cache := TestCache{NewTagValueSeriesIDCache(100)}
done := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
}
cache.Put(rnd(), rnd(), rnd(), tsdb.NewSeriesIDSet())
}
}()
}
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
}
_ = cache.Get(rnd(), rnd(), rnd())
}
}()
}
time.Sleep(10 * time.Second)
close(done)
wg.Wait()
}
type TestCache struct {
*TagValueSeriesIDCache
}
@ -141,6 +212,10 @@ func (c TestCache) HasNot(t *testing.T, name, key, value string) {
}
}
func (c TestCache) GetByString(name, key, value string) *tsdb.SeriesIDSet {
return c.Get([]byte(name), []byte(key), []byte(value))
}
func (c TestCache) PutByString(name, key, value string, ss *tsdb.SeriesIDSet) {
c.Put([]byte(name), []byte(key), []byte(value), ss)
}

View File

@ -460,6 +460,10 @@ func TestIndex_TagValueSeriesIDIterator(t *testing.T) {
"mem,region=west,server=c",
})
})
t.Run("no matching series", func(t *testing.T) {
testTagValueSeriesIDIterator(t, "foo", "bar", "zoo", nil)
})
}
// Index is a test wrapper for tsi1.Index.