Convert cache to LRU
parent
2ae2157d02
commit
2c4c79f110
|
@ -1,6 +1,7 @@
|
|||
package tsi1
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
@ -26,6 +27,9 @@ import (
|
|||
// IndexName is the name of the index.
|
||||
const IndexName = tsdb.TSI1IndexName
|
||||
|
||||
// DefaultSeriesIDSetCacheSize is the default number of series ID sets to cache.
|
||||
const DefaultSeriesIDSetCacheSize = 10
|
||||
|
||||
// ErrCompactionInterrupted is returned if compactions are disabled or
|
||||
// an index is closed while a compaction is occurring.
|
||||
var ErrCompactionInterrupted = errors.New("tsi1: compaction interrupted")
|
||||
|
@ -117,7 +121,8 @@ type Index struct {
|
|||
opened bool
|
||||
|
||||
cacheMu sync.RWMutex
|
||||
sscache map[string]map[string]map[string]*tsdb.SeriesIDSet
|
||||
sscache map[string]map[string]map[string]*list.Element
|
||||
ssevict *list.List
|
||||
|
||||
// The following may be set when initializing an Index.
|
||||
path string // Root directory of the index partitions.
|
||||
|
@ -149,7 +154,8 @@ func (i *Index) UniqueReferenceID() uintptr {
|
|||
// NewIndex returns a new instance of Index.
|
||||
func NewIndex(sfile *tsdb.SeriesFile, database string, options ...IndexOption) *Index {
|
||||
idx := &Index{
|
||||
sscache: map[string]map[string]map[string]*tsdb.SeriesIDSet{},
|
||||
sscache: map[string]map[string]map[string]*list.Element{},
|
||||
ssevict: list.New(),
|
||||
maxLogFileSize: tsdb.DefaultMaxIndexLogFileSize,
|
||||
logger: zap.NewNop(),
|
||||
version: Version,
|
||||
|
@ -882,22 +888,20 @@ func (i *Index) TagKeySeriesIDIterator(name, key []byte) (tsdb.SeriesIDIterator,
|
|||
return tsdb.MergeSeriesIDIterators(a...), nil
|
||||
}
|
||||
|
||||
func (i *Index) tagValueSeriesIDSet(name, key, value []byte) *tsdb.SeriesIDSet {
|
||||
func (i *Index) tagValueSeriesIDSet(name, key, value []byte) *list.Element {
|
||||
i.cacheMu.RLock()
|
||||
defer i.cacheMu.RUnlock()
|
||||
if tkmap, ok := i.sscache[string(name)]; ok {
|
||||
if tvmap, ok := tkmap[string(key)]; ok {
|
||||
if ss, ok := tvmap[string(value)]; ok {
|
||||
return ss.Clone() // to be safe.
|
||||
if ele, ok := tvmap[string(value)]; ok {
|
||||
return ele
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) putTagValueSeriesIDSet(name, key, value []byte, ss *tsdb.SeriesIDSet) {
|
||||
i.cacheMu.Lock()
|
||||
defer i.cacheMu.Unlock()
|
||||
func (i *Index) putTagValueSeriesIDSet(name, key, value []byte, ss *list.Element) {
|
||||
if mmap, ok := i.sscache[string(name)]; ok {
|
||||
if tkmap, ok := mmap[string(key)]; ok {
|
||||
|
||||
|
@ -912,13 +916,13 @@ func (i *Index) putTagValueSeriesIDSet(name, key, value []byte, ss *tsdb.SeriesI
|
|||
}
|
||||
|
||||
// No series set map for the tag key
|
||||
mmap[string(key)] = map[string]*tsdb.SeriesIDSet{string(value): ss}
|
||||
mmap[string(key)] = map[string]*list.Element{string(value): ss}
|
||||
return
|
||||
}
|
||||
|
||||
// No map for the measurement
|
||||
i.sscache[string(name)] = map[string]map[string]*tsdb.SeriesIDSet{
|
||||
string(key): map[string]*tsdb.SeriesIDSet{string(value): ss},
|
||||
i.sscache[string(name)] = map[string]map[string]*list.Element{
|
||||
string(key): map[string]*list.Element{string(value): ss},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -926,8 +930,13 @@ func (i *Index) putTagValueSeriesIDSet(name, key, value []byte, ss *tsdb.SeriesI
|
|||
func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
|
||||
// Check series ID set cache...
|
||||
if ss := i.tagValueSeriesIDSet(name, key, value); ss != nil {
|
||||
// fmt.Printf("Using cached set for %q %q %q\n", name, key, value)
|
||||
return tsdb.NewSeriesIDSetIterator(ss), nil
|
||||
// Move this series set to the front of the eviction queue.
|
||||
i.cacheMu.Lock()
|
||||
i.ssevict.MoveToFront(ss)
|
||||
i.cacheMu.Unlock()
|
||||
|
||||
// Return a clone because the set is mutable.
|
||||
return tsdb.NewSeriesIDSetIterator(ss.Value.(*ssElement).SeriesIDSet.Clone()), nil
|
||||
}
|
||||
|
||||
fmt.Printf("CACHE MISS %q %q %q\n", name, key, value)
|
||||
|
@ -946,7 +955,27 @@ func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesID
|
|||
|
||||
// Check if the iterator contains only series id sets. Cache them...
|
||||
if ssitr, ok := itr.(tsdb.SeriesIDSetIterator); ok {
|
||||
i.putTagValueSeriesIDSet(name, key, value, ssitr.SeriesIDSet())
|
||||
i.cacheMu.Lock()
|
||||
// Create element and put at front of eviction queue.
|
||||
ele := i.ssevict.PushFront(&ssElement{
|
||||
name: name,
|
||||
key: key,
|
||||
value: value,
|
||||
SeriesIDSet: ssitr.SeriesIDSet(),
|
||||
})
|
||||
|
||||
// Add element to cache.
|
||||
i.putTagValueSeriesIDSet(name, key, value, ele)
|
||||
|
||||
// Does something need to be evicted from the cache?
|
||||
if i.ssevict.Len() > DefaultSeriesIDSetCacheSize {
|
||||
e := i.ssevict.Back()
|
||||
i.ssevict.Remove(e)
|
||||
|
||||
ele := e.Value.(*ssElement)
|
||||
delete(i.sscache[string(ele.name)][string(ele.key)], string(ele.value))
|
||||
}
|
||||
i.cacheMu.Unlock()
|
||||
} else {
|
||||
fmt.Printf("UNABLE TO PUT %T for %q %q %q\n", itr, name, key, value)
|
||||
}
|
||||
|
@ -1061,3 +1090,12 @@ func IsIndexDir(path string) (bool, error) {
|
|||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// ssElement is is stored within the LRU. It makes it possible to do constant
|
||||
// time access via a hashmap and also storage inside a linked list.
|
||||
type ssElement struct {
|
||||
name []byte
|
||||
key []byte
|
||||
value []byte
|
||||
SeriesIDSet *tsdb.SeriesIDSet
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue