Refactor cache

parent 1ae716b64e
commit 8af7c133db

@@ -0,0 +1,159 @@
package tsi1

import (
	"container/list"
	"fmt"
	"sync"

	"github.com/influxdata/influxdb/tsdb"
)

// TagValueSeriesIDCache is an LRU cache for series id sets associated with
// name -> key -> value mappings. The purpose of the cache is to provide
// efficient means to get sets of series ids that would otherwise involve merging
// many individual bitmaps at query time.
//
// When initialising a TagValueSeriesIDCache a capacity must be provided. When
// more than c items are added to the cache, the least recently used item is
// evicted from the cache.
//
// A TagValueSeriesIDCache comprises a linked list implementation to track the
// order by which items should be evicted from the cache, and a hashmap implementation
// to provide constant time retrievals of items from the cache.
type TagValueSeriesIDCache struct {
	mu      sync.RWMutex
	cache   map[string]map[string]map[string]*list.Element
	evictor *list.List

	capacity int
}

// NewTagValueSeriesIDCache returns a TagValueSeriesIDCache with capacity c.
func NewTagValueSeriesIDCache(c int) *TagValueSeriesIDCache {
	return &TagValueSeriesIDCache{
		cache:    map[string]map[string]map[string]*list.Element{},
		evictor:  list.New(),
		capacity: c,
	}
}

// Get returns the SeriesIDSet associated with the {name, key, value} tuple if it
// exists.
func (c *TagValueSeriesIDCache) Get(name, key, value []byte) *tsdb.SeriesIDSet {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.get(name, key, value)
}

func (c *TagValueSeriesIDCache) get(name, key, value []byte) *tsdb.SeriesIDSet {
	if mmap, ok := c.cache[string(name)]; ok {
		if tkmap, ok := mmap[string(key)]; ok {
			if ele, ok := tkmap[string(value)]; ok {
				c.evictor.MoveToFront(ele) // This now becomes most recently used.
				return ele.Value.(*seriesIDCacheElement).SeriesIDSet
			}
		}
	}
	return nil
}

// addToSet adds x to the SeriesIDSet associated with the tuple {name, key, value}
// if it exists. This method takes a lock on the underlying SeriesIDSet.
//
// NB: this does not count as an access on the set, therefore the set is not promoted
// within the LRU cache.
func (c *TagValueSeriesIDCache) addToSet(name, key, value []byte, x uint64) {
	if mmap, ok := c.cache[string(name)]; ok {
		if tkmap, ok := mmap[string(key)]; ok {
			if ele, ok := tkmap[string(value)]; ok {
				ele.Value.(*seriesIDCacheElement).SeriesIDSet.Add(x)
			}
		}
	}
}

// measurementContainsSets returns true if there are sets cached for the provided measurement.
func (c *TagValueSeriesIDCache) measurementContainsSets(name []byte) bool {
	_, ok := c.cache[string(name)]
	return ok
}

// Put adds the SeriesIDSet to the cache under the tuple {name, key, value}. If
// the cache is at its limit, then the least recently used item is evicted.
func (c *TagValueSeriesIDCache) Put(name, key, value []byte, ss *tsdb.SeriesIDSet) {
	c.mu.Lock()
	// Check under the write lock if the relevant item is now in the cache.
	if c.get(name, key, value) != nil {
		c.mu.Unlock()
		return
	}
	defer c.mu.Unlock()

	// Create list item, and add to the front of the eviction list.
	listElement := c.evictor.PushFront(&seriesIDCacheElement{
		name:        name,
		key:         key,
		value:       value,
		SeriesIDSet: ss,
	})

	// Add the listElement to the set of items.
	if mmap, ok := c.cache[string(name)]; ok {
		if tkmap, ok := mmap[string(key)]; ok {
			if _, ok := tkmap[string(value)]; ok {
				// FIXME(edd): only here whilst testing.
				panic("existence of cache item breaks invariant")
			}

			// Add the set to the map.
			tkmap[string(value)] = listElement
			goto EVICT
		}

		// No series set map for the tag key - first tag value for the tag key.
		mmap[string(key)] = map[string]*list.Element{string(value): listElement}
		goto EVICT
	}

	// No map for the measurement - first tag key for the measurement.
	c.cache[string(name)] = map[string]map[string]*list.Element{
		string(key): map[string]*list.Element{string(value): listElement},
	}

EVICT:
	c.checkEviction()
}

// checkEviction checks if the cache is too big, and evicts the least recently used
// item if it is.
func (c *TagValueSeriesIDCache) checkEviction() {
	if c.evictor.Len() > c.capacity {
		// panic("CACHE FULL") // FIXME(edd) remove
		e := c.evictor.Back() // Least recently used item.
		listElement := e.Value.(*seriesIDCacheElement)
		name := listElement.name
		key := listElement.key
		value := listElement.value
		fmt.Printf("going to remove %q %q %q\n", name, key, value)
		c.evictor.Remove(e)                                        // Remove from evictor.
		delete(c.cache[string(name)][string(key)], string(value)) // Remove from hashmap of items.

		// Check if there are no tag values for the tag key.
		if len(c.cache[string(name)][string(key)]) == 0 {
			delete(c.cache[string(name)], string(key))
		}

		// Check there are no tag keys for the measurement.
		if len(c.cache[string(name)]) == 0 {
			delete(c.cache, string(name))
		}
	}
}

// seriesIDCacheElement is an item stored within a cache.
type seriesIDCacheElement struct {
	name        []byte
	key         []byte
	value       []byte
	SeriesIDSet *tsdb.SeriesIDSet
}
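For orientation, here is a minimal usage sketch of the cache added above. It assumes the same package as the test file that follows; the measurement, tag key, and tag value strings ("cpu", "host", and so on) are illustrative only and not part of the commit.

func exampleTagValueSeriesIDCacheUsage() {
	// Capacity of two: a third entry forces the least recently used one out.
	cache := NewTagValueSeriesIDCache(2)

	cache.Put([]byte("cpu"), []byte("host"), []byte("a"), tsdb.NewSeriesIDSet(1, 2, 3))
	cache.Put([]byte("cpu"), []byte("host"), []byte("b"), tsdb.NewSeriesIDSet(4, 5))

	// A hit on {cpu, host, a} promotes it to most recently used.
	_ = cache.Get([]byte("cpu"), []byte("host"), []byte("a"))

	// Adding a third entry evicts {cpu, host, b}, now the least recently used.
	cache.Put([]byte("cpu"), []byte("host"), []byte("c"), tsdb.NewSeriesIDSet(6))

	if cache.Get([]byte("cpu"), []byte("host"), []byte("b")) != nil {
		panic("expected {cpu, host, b} to have been evicted")
	}
}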
@@ -0,0 +1,146 @@
package tsi1

import (
	"testing"

	"github.com/influxdata/influxdb/tsdb"
)

func TestTagValueSeriesIDCache(t *testing.T) {
	m0k0v0 := tsdb.NewSeriesIDSet(1, 2, 3, 4, 5)
	m0k0v1 := tsdb.NewSeriesIDSet(10, 20, 30, 40, 50)
	m0k1v2 := tsdb.NewSeriesIDSet()
	m1k3v0 := tsdb.NewSeriesIDSet(900, 0, 929)

	cache := TestCache{NewTagValueSeriesIDCache(10)}
	cache.Has(t, "m0", "k0", "v0", nil)

	// Putting something in the cache makes it retrievable.
	cache.PutByString("m0", "k0", "v0", m0k0v0)
	cache.Has(t, "m0", "k0", "v0", m0k0v0)

	// Putting something else under the same key will not replace the original item.
	cache.PutByString("m0", "k0", "v0", tsdb.NewSeriesIDSet(100, 200))
	cache.Has(t, "m0", "k0", "v0", m0k0v0)

	// Add another item to the cache.
	cache.PutByString("m0", "k0", "v1", m0k0v1)
	cache.Has(t, "m0", "k0", "v0", m0k0v0)
	cache.Has(t, "m0", "k0", "v1", m0k0v1)

	// Add some more items.
	cache.PutByString("m0", "k1", "v2", m0k1v2)
	cache.PutByString("m1", "k3", "v0", m1k3v0)
	cache.Has(t, "m0", "k0", "v0", m0k0v0)
	cache.Has(t, "m0", "k0", "v1", m0k0v1)
	cache.Has(t, "m0", "k1", "v2", m0k1v2)
	cache.Has(t, "m1", "k3", "v0", m1k3v0)
}

func TestTagValueSeriesIDCache_eviction(t *testing.T) {
	m0k0v0 := tsdb.NewSeriesIDSet(1, 2, 3, 4, 5)
	m0k0v1 := tsdb.NewSeriesIDSet(10, 20, 30, 40, 50)
	m0k1v2 := tsdb.NewSeriesIDSet()
	m1k3v0 := tsdb.NewSeriesIDSet(900, 0, 929)

	cache := TestCache{NewTagValueSeriesIDCache(4)}
	cache.PutByString("m0", "k0", "v0", m0k0v0)
	cache.PutByString("m0", "k0", "v1", m0k0v1)
	cache.PutByString("m0", "k1", "v2", m0k1v2)
	cache.PutByString("m1", "k3", "v0", m1k3v0)
	cache.Has(t, "m0", "k0", "v0", m0k0v0)
	cache.Has(t, "m0", "k0", "v1", m0k0v1)
	cache.Has(t, "m0", "k1", "v2", m0k1v2)
	cache.Has(t, "m1", "k3", "v0", m1k3v0)

	// Putting another item in the cache will evict m0k0v0.
	m2k0v0 := tsdb.NewSeriesIDSet(8, 8, 8)
	cache.PutByString("m2", "k0", "v0", m2k0v0)
	if got, exp := cache.evictor.Len(), 4; got != exp {
		t.Fatalf("cache size was %d, expected %d", got, exp)
	}
	cache.HasNot(t, "m0", "k0", "v0")
	cache.Has(t, "m0", "k0", "v1", m0k0v1)
	cache.Has(t, "m0", "k1", "v2", m0k1v2)
	cache.Has(t, "m1", "k3", "v0", m1k3v0)
	cache.Has(t, "m2", "k0", "v0", m2k0v0)

	// Putting another item in the cache will evict m0k0v1. That will mean
	// there will be no values left under the tuple {m0, k0}.
	if _, ok := cache.cache[string("m0")][string("k0")]; !ok {
t.Fatalf("Map missing for key %q", "k0")
	}

	m2k0v1 := tsdb.NewSeriesIDSet(8, 8, 8)
	cache.PutByString("m2", "k0", "v1", m2k0v1)
	if got, exp := cache.evictor.Len(), 4; got != exp {
		t.Fatalf("cache size was %d, expected %d", got, exp)
	}
	cache.HasNot(t, "m0", "k0", "v0")
	cache.HasNot(t, "m0", "k0", "v1")
	cache.Has(t, "m0", "k1", "v2", m0k1v2)
	cache.Has(t, "m1", "k3", "v0", m1k3v0)
	cache.Has(t, "m2", "k0", "v0", m2k0v0)
	cache.Has(t, "m2", "k0", "v1", m2k0v1)

	// Further, the map for all tag values for the tuple {m0, k0} should be removed.
	if _, ok := cache.cache[string("m0")][string("k0")]; ok {
		t.Fatalf("Map present for key %q, should be removed", "k0")
	}

	// Putting another item in the cache will evict m0k1v2. That will mean
	// there will be no values left under the tuple {m0}.
	if _, ok := cache.cache[string("m0")]; !ok {
		t.Fatalf("Map missing for measurement %q", "m0")
	}
	m2k0v2 := tsdb.NewSeriesIDSet(8, 9, 9)
	cache.PutByString("m2", "k0", "v2", m2k0v2)
	cache.HasNot(t, "m0", "k0", "v0")
	cache.HasNot(t, "m0", "k0", "v1")
	cache.HasNot(t, "m0", "k1", "v2")
	cache.Has(t, "m1", "k3", "v0", m1k3v0)
	cache.Has(t, "m2", "k0", "v0", m2k0v0)
	cache.Has(t, "m2", "k0", "v1", m2k0v1)
	cache.Has(t, "m2", "k0", "v2", m2k0v2)

	// The map for all tag values for the tuple {m0} should be removed.
	if _, ok := cache.cache[string("m0")]; ok {
t.Fatalf("Map present for key %q, should be removed", "k0")
	}

	// Putting another item in the cache will evict m2k0v0 if we first get m1k3v0,
	// because m2k0v0 will have been used less recently...
	m3k0v0 := tsdb.NewSeriesIDSet(1000)
	cache.Has(t, "m1", "k3", "v0", m1k3v0) // This makes it the most recently used rather than the least.
	cache.PutByString("m3", "k0", "v0", m3k0v0)

	cache.HasNot(t, "m0", "k0", "v0")
	cache.HasNot(t, "m0", "k0", "v1")
	cache.HasNot(t, "m0", "k1", "v2")
	cache.HasNot(t, "m2", "k0", "v0") // This got pushed to the back.

	cache.Has(t, "m1", "k3", "v0", m1k3v0) // This got saved because we looked at it before we added to the cache.
	cache.Has(t, "m2", "k0", "v1", m2k0v1)
	cache.Has(t, "m2", "k0", "v2", m2k0v2)
	cache.Has(t, "m3", "k0", "v0", m3k0v0)
}

type TestCache struct {
	*TagValueSeriesIDCache
}

func (c TestCache) Has(t *testing.T, name, key, value string, ss *tsdb.SeriesIDSet) {
	if got, exp := c.Get([]byte(name), []byte(key), []byte(value)), ss; got != exp {
		t.Fatalf("got set %v, expected %v", got, exp)
	}
}

func (c TestCache) HasNot(t *testing.T, name, key, value string) {
	if got := c.Get([]byte(name), []byte(key), []byte(value)); got != nil {
		t.Fatalf("got non-nil set %v for {%q, %q, %q}", got, name, key, value)
	}
}

func (c TestCache) PutByString(name, key, value string, ss *tsdb.SeriesIDSet) {
	c.Put([]byte(name), []byte(key), []byte(value), ss)
}
@@ -1,7 +1,6 @@
package tsi1

import (
	"container/list"
	"errors"
	"fmt"
	"io/ioutil"

@@ -130,9 +129,7 @@ type Index struct {
	partitions []*Partition
	opened     bool

	cacheMu sync.RWMutex
	sscache map[string]map[string]map[string]*list.Element
	ssevict *list.List
	tagValueCache *TagValueSeriesIDCache

	// The following may be set when initializing an Index.
	path string // Root directory of the index partitions.

@@ -164,8 +161,7 @@ func (i *Index) UniqueReferenceID() uintptr {
// NewIndex returns a new instance of Index.
func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *Index {
	idx := &Index{
		sscache: map[string]map[string]map[string]*list.Element{},
		ssevict: list.New(),
		tagValueCache:  NewTagValueSeriesIDCache(DefaultSeriesIDSetCacheSize),
		maxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
		logger:         zap.NewNop(),
		version:        Version,

@@ -683,7 +679,7 @@ func (i *Index) CreateSeriesListIfNotExists(keys [][]byte, names [][]byte, tagsS
		}

		// Some cached bitset results may need to be updated.
		i.cacheMu.RLock()
		i.tagValueCache.mu.RLock()
		for j, id := range ids {
			if id == 0 {
				continue
@@ -691,29 +687,27 @@
			name := pNames[idx][j]
			tags := pTags[idx][j]
			if tkmap, ok := i.sscache[string(name)]; ok {
			if i.tagValueCache.measurementContainsSets(name) {
				for _, pair := range tags {
					if tvmap, ok := tkmap[string(pair.Key)]; ok {
						// TODO(edd): It's not clear to me yet whether it will be better to take a lock
						// on every series id set, or whether to gather them all up under the cache rlock
						// and then take the cache lock and update them all at once (without invoking a lock
						// on each series id set).
						//
						// Taking the cache lock will block all queries, but is one lock. Taking each series set
						// lock might be many lock/unlocks but will only block a query that needs that particular set.
						//
						// Need to think on it, but I think taking a lock on each series id set is the way to go.
						//
						// One other option here is to take a lock on the series id set when we first encounter it
						// and then keep it locked until we're done with all the ids.
						if ele, ok := tvmap[string(pair.Value)]; ok {
							ele.Value.(*ssElement).SeriesIDSet.Add(id) // Takes a lock on the series id set
						}
					}
					// TODO(edd): It's not clear to me yet whether it will be better to take a lock
					// on every series id set, or whether to gather them all up under the cache rlock
					// and then take the cache lock and update them all at once (without invoking a lock
					// on each series id set).
					//
					// Taking the cache lock will block all queries, but is one lock. Taking each series set
					// lock might be many lock/unlocks but will only block a query that needs that particular set.
					//
					// Need to think on it, but I think taking a lock on each series id set is the way to go.
					//
					// One other option here is to take a lock on the series id set when we first encounter it
					// and then keep it locked until we're done with all the ids.
					//
					// Note: this will only add `id` to the set if it exists.
					i.tagValueCache.addToSet(name, pair.Key, pair.Value, id) // Takes a lock on the series id set
				}
			}
			}
			i.cacheMu.RUnlock()
			i.tagValueCache.mu.RUnlock()

			errC <- err
		}
@@ -753,26 +747,24 @@ func (i *Index) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) erro
	// If there are cached sets for any of the tag pairs, they will need to be
	// updated with the series id.
	i.cacheMu.RLock()
	if tkmap, ok := i.sscache[string(name)]; ok {
	i.tagValueCache.mu.RLock()
	if i.tagValueCache.measurementContainsSets(name) {
		for _, pair := range tags {
			if tvmap, ok := tkmap[string(pair.Key)]; ok {
				// TODO(edd): It's not clear to me yet whether it will be better to take a lock
				// on every series id set, or whether to gather them all up under the cache rlock
				// and then take the cache lock and update them all at once (without invoking a lock
				// on each series id set).
				//
				// Taking the cache lock will block all queries, but is one lock. Taking each series set
				// lock might be many lock/unlocks but will only block a query that needs that particular set.
				//
				// Need to think on it, but I think taking a lock on each series id set is the way to go.
				if ele, ok := tvmap[string(pair.Value)]; ok {
					ele.Value.(*ssElement).SeriesIDSet.Add(ids[0]) // Takes a lock on the series id set
				}
			}
			// TODO(edd): It's not clear to me yet whether it will be better to take a lock
			// on every series id set, or whether to gather them all up under the cache rlock
			// and then take the cache lock and update them all at once (without invoking a lock
			// on each series id set).
			//
			// Taking the cache lock will block all queries, but is one lock. Taking each series set
			// lock might be many lock/unlocks but will only block a query that needs that particular set.
			//
			// Need to think on it, but I think taking a lock on each series id set is the way to go.
			//
			// Note: this will only add `id` to the set if it exists.
			i.tagValueCache.addToSet(name, pair.Key, pair.Value, ids[0]) // Takes a lock on the series id set
		}
	}
	i.cacheMu.RUnlock()
	i.tagValueCache.mu.RUnlock()
	return nil
}

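Aside: the TODO above weighs taking a lock on each series id set against holding a single cache-level lock for the whole batch. A rough sketch of the batched shape it describes follows; the addToSetBatch helper is hypothetical and not part of this commit, and each underlying Add below still takes the set's own lock (avoiding that would need extra support from tsdb.SeriesIDSet).

// addToSetBatch is a hypothetical variant of addToSet that takes the cache's
// write lock once and updates every matching set for the given tag pairs.
func (c *TagValueSeriesIDCache) addToSetBatch(name []byte, tags models.Tags, x uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, pair := range tags {
		c.addToSet(name, pair.Key, pair.Value, x) // each SeriesIDSet.Add still locks its own set
	}
}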
@@ -975,63 +967,14 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator,
	return tsdb.MergeSeriesIDIterators(a...), nil
}

func (i *Index) tagValueSeriesIDSet(name, key, value []byte) *list.Element {
	if !EnableBitsetCache {
		return nil
	}

	i.cacheMu.RLock()
	defer i.cacheMu.RUnlock()
	if tkmap, ok := i.sscache[string(name)]; ok {
		if tvmap, ok := tkmap[string(key)]; ok {
			if ele, ok := tvmap[string(value)]; ok {
				return ele
			}
		}
	}
	return nil
}

func (i *Index) putTagValueSeriesIDSet(name, key, value []byte, ss *list.Element) {
	if !EnableBitsetCache {
		return
	}

	if mmap, ok := i.sscache[string(name)]; ok {
		if tkmap, ok := mmap[string(key)]; ok {

			if _, ok := tkmap[string(value)]; ok {
				// Already have it. Can happen with concurrent write.
				return
			}

			// Add the set to the map.
			tkmap[string(value)] = ss
			return
		}

		// No series set map for the tag key.
		mmap[string(key)] = map[string]*list.Element{string(value): ss}
		return
	}

	// No map for the measurement.
	i.sscache[string(name)] = map[string]map[string]*list.Element{
		string(key): map[string]*list.Element{string(value): ss},
	}
}

// TagValueSeriesIDIterator returns a series iterator for a single tag value.
func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
	// Check series ID set cache...
	if ss := i.tagValueSeriesIDSet(name, key, value); ss != nil {
		// Move this series set to the front of the eviction queue.
		i.cacheMu.Lock()
		i.ssevict.MoveToFront(ss)
		i.cacheMu.Unlock()

		// Return a clone because the set is mutable.
		return tsdb.NewSeriesIDSetIterator(ss.Value.(*ssElement).SeriesIDSet.Clone()), nil
	if EnableBitsetCache {
		if ss := i.tagValueCache.Get(name, key, value); ss != nil {
			// Return a clone because the set is mutable.
			return tsdb.NewSeriesIDSetIterator(ss.Clone()), nil
		}
	}

	a := make([]tsdb.SeriesIDIterator, 0, len(i.partitions))
@@ -1045,43 +988,15 @@ func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesID
	}

	itr := tsdb.MergeSeriesIDIterators(a...)
	if !EnableBitsetCache {
		return itr, nil
	}

	// Check if the iterator contains only series id sets. Cache them...
	if ssitr, ok := itr.(tsdb.SeriesIDSetIterator); ok {
		i.cacheMu.Lock()

		// Check once more under write lock.
		if tkmap, ok := i.sscache[string(name)]; ok {
			if tvmap, ok := tkmap[string(key)]; ok {
				if _, ok := tvmap[string(value)]; ok {
					i.cacheMu.Unlock()
					return itr, nil // Already in cache
				}
			}
		}

		// Create element and put at front of eviction queue.
		ssitr.SeriesIDSet().SetCOW(true)
		ele := i.ssevict.PushFront(&ssElement{
			name:        name,
			key:         key,
			value:       value,
			SeriesIDSet: ssitr.SeriesIDSet(),
		})

		// Add element to cache.
		i.putTagValueSeriesIDSet(name, key, value, ele)

		// Does something need to be evicted from the cache?
		if i.ssevict.Len() > DefaultSeriesIDSetCacheSize {
			panic("CACHE FULL") // FIXME(edd) remove
			e := i.ssevict.Back()
			i.ssevict.Remove(e)

			ele := e.Value.(*ssElement)
			delete(i.sscache[string(ele.name)][string(ele.key)], string(ele.value))
		}
		i.cacheMu.Unlock()
		ss := ssitr.SeriesIDSet()
		ss.SetCOW(true) // This is important to speed the clone up.
		i.tagValueCache.Put(name, key, value, ss)
	} else {
		fmt.Printf("UNABLE TO PUT %T for %q %q %q\n", itr, name, key, value)
	}
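A note on the SetCOW call in the new code above: marking the cached set copy-on-write is what keeps the Clone on the cache-hit path cheap. A rough sketch of the intended behaviour, using only calls that appear in this diff (the exact copy-on-write semantics of tsdb.SeriesIDSet are assumed here, not shown):

ss := tsdb.NewSeriesIDSet(1, 2, 3)
ss.SetCOW(true)        // clones share the underlying bitmap until one side mutates
snapshot := ss.Clone() // cheap on the query path: no deep copy is taken yet
ss.Add(4)              // the writer copies before mutating; snapshot still holds {1, 2, 3}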
@@ -584,8 +584,8 @@ var tsiditr tsdb.SeriesIDIterator
// together. In the case of tsl files the sets need to be cloned and then merged.
//
// Typical results on an i7 laptop
// BenchmarkIndex_IndexFile_TagValueSeriesIDIterator/78888_series_TagValueSeriesIDIterator/cache-8 100000 15049 ns/op 100616 B/op 39 allocs/op
// BenchmarkIndex_IndexFile_TagValueSeriesIDIterator/78888_series_TagValueSeriesIDIterator/no_cache-8 10000 131034 ns/op 125080 B/op 352 allocs/op
// BenchmarkIndex_IndexFile_TagValueSeriesIDIterator/78888_series_TagValueSeriesIDIterator/cache-8 2000000 643 ns/op 744 B/op 13 allocs/op
// BenchmarkIndex_IndexFile_TagValueSeriesIDIterator/78888_series_TagValueSeriesIDIterator/no_cache-8 10000 130749 ns/op 124952 B/op 350 allocs/op
func BenchmarkIndex_IndexFile_TagValueSeriesIDIterator(b *testing.B) {
	var err error
	sfile := NewSeriesFile()
@@ -717,16 +717,14 @@ func BenchmarkIndex_CreateSeriesListIfNotExists(b *testing.B) {
// The idea is to emphasize the performance difference when bitset caching is on and off.
//
// Typical results for an i7 laptop
// BenchmarkIndex_ConcurrentWriteQuery/partition_1/cache-8 1 5519979264 ns/op 19768957384 B/op 40035662 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_1/no_cache-8 1 4244426712 ns/op 5588384496 B/op 69527254 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_2/cache-8 1 5695594405 ns/op 17565272512 B/op 86973739 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_2/no_cache-8 1 4800202002 ns/op 6772213648 B/op 106802140 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_4/cache-8 1 9377769247 ns/op 22060334496 B/op 156652125 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_4/no_cache-8 1 9496432555 ns/op 11124867792 B/op 191979975 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_8/cache-8 1 13687588689 ns/op 24639425936 B/op 285704923 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_8/no_cache-8 1 14852905065 ns/op 21239729512 B/op 391653485 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_16/cache-8 1 18562123757 ns/op 30728182200 B/op 447833013 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_16/no_cache-8 1 19779330203 ns/op 29268458824 B/op 528321987 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_1/queries_100000/cache-8 1 3836451407 ns/op 2453296232 B/op 22648482 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_4/queries_100000/cache-8 1 1836598730 ns/op 2435668224 B/op 22908705 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_8/queries_100000/cache-8 1 1714771527 ns/op 2341518456 B/op 23450621 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_16/queries_100000/cache-8 1 1810658403 ns/op 2401239408 B/op 23868079 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_1/queries_100000/no_cache-8 1 4044478305 ns/op 4414915048 B/op 27292357 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_4/queries_100000/no_cache-8 1 18663345153 ns/op 23035974472 B/op 54015704 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_8/queries_100000/no_cache-8 1 22242979152 ns/op 28178915600 B/op 80156305 allocs/op
// BenchmarkIndex_ConcurrentWriteQuery/partition_16/queries_100000/no_cache-8 1 24817283922 ns/op 34613960984 B/op 150356327 allocs/op
func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
	// Read line-protocol and coerce into tsdb format.
	keys := make([][]byte, 0, 1e6)
@@ -766,13 +764,12 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
		tags = append(tags, pt.Tags())
	}

	runBenchmark := func(b *testing.B, sz int, partitions uint64) {
	runBenchmark := func(b *testing.B, queryN int, partitions uint64) {
		idx := MustOpenIndex(partitions)
		done := make(chan struct{})
		var wg sync.WaitGroup

		// Run concurrent iterator...
		runIter := func(done chan struct{}) {
		runIter := func() {
			keys := [][]string{
				{"m0", "tag2", "value4"},
				{"m1", "tag3", "value5"},
@@ -781,13 +778,7 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
				{"m4", "tag5", "value0"},
			}

			for {
				select {
				case <-done:
					return
				default:
				}

			for i := 0; i < queryN/5; i++ {
				for _, key := range keys {
					itr, err := idx.TagValueSeriesIDIterator([]byte(key[0]), []byte(key[1]), []byte(key[2]))
					if err != nil {
@@ -803,47 +794,52 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
		}

		wg.Add(1)
		go func() { defer wg.Done(); runIter(done) }()
		for j := 0; j < b.N; j++ {
			for i := 0; i < len(keys); i += sz {
				k := keys[i : i+sz]
				n := names[i : i+sz]
				t := tags[i : i+sz]
		go func() { defer wg.Done(); runIter() }()
		batchSize := 10000
		for j := 0; j < 1; j++ {
			for i := 0; i < len(keys); i += batchSize {
				k := keys[i : i+batchSize]
				n := names[i : i+batchSize]
				t := tags[i : i+batchSize]
				if errResult = idx.CreateSeriesListIfNotExists(k, n, t); errResult != nil {
					b.Fatal(err)
				}
			}
			// Reset the index...
			b.StopTimer()

			close(done)
			// Wait for queries to finish.
			wg.Wait()

			// Reset the index...
			b.StopTimer()
			if err := idx.Close(); err != nil {
				b.Fatal(err)
			}

			// Re-open everything.
			idx = MustOpenIndex(partitions)
			done = make(chan struct{})
			wg.Add(1)
			go func() { defer wg.Done(); runIter(done) }()
			go func() { defer wg.Done(); runIter() }()
			b.StartTimer()
		}
	}

	partitions := []uint64{1, 2, 4, 8, 16}
	partitions := []uint64{1, 4, 8, 16}
	queries := []int{1e5}
	for _, partition := range partitions {
		b.Run(fmt.Sprintf("partition %d", partition), func(b *testing.B) {
			b.Run("cache", func(b *testing.B) {
				tsi1.EnableBitsetCache = true
				runBenchmark(b, 10000, partition)
			})
			for _, queryN := range queries {
				b.Run(fmt.Sprintf("queries %d", queryN), func(b *testing.B) {
					b.Run("cache", func(b *testing.B) {
						tsi1.EnableBitsetCache = true
						runBenchmark(b, queryN, partition)
					})

					b.Run("no cache", func(b *testing.B) {
						tsi1.EnableBitsetCache = false
						runBenchmark(b, 10000, partition)
					})
					b.Run("no cache", func(b *testing.B) {
						tsi1.EnableBitsetCache = false
						runBenchmark(b, queryN, partition)
					})
				})
			}
		})
	}
}