Allow TSI cache to be dynamically altered
parent 3ea3d90e10
commit e0cddadffd
@@ -273,8 +273,10 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
 			b.Fatal(err)
 		}
 
-	runBenchmark := func(b *testing.B, queryN int) {
-		idx := MustOpenNewIndex(tsi1.NewConfig())
+	runBenchmark := func(b *testing.B, queryN int, cacheSize uint64) {
+		config := tsi1.NewConfig()
+		config.SeriesIDSetCacheSize = cacheSize
+		idx := MustOpenNewIndex(config)
 		var wg sync.WaitGroup
 		begin := make(chan struct{})
 
@@ -343,13 +345,11 @@ func BenchmarkIndex_ConcurrentWriteQuery(b *testing.B) {
 	for _, queryN := range queries {
 		b.Run(fmt.Sprintf("queries %d", queryN), func(b *testing.B) {
 			b.Run("cache", func(b *testing.B) {
-				tsi1.EnableBitsetCache = true
-				runBenchmark(b, queryN)
+				runBenchmark(b, queryN, tsi1.DefaultSeriesIDSetCacheSize)
 			})
 
 			b.Run("no cache", func(b *testing.B) {
-				tsi1.EnableBitsetCache = false
-				runBenchmark(b, queryN)
+				runBenchmark(b, queryN, 0)
 			})
 		})
 	}
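Note: with the package-level EnableBitsetCache switch removed, the benchmark now chooses cache behaviour per run by passing a cache size into the closure. A minimal, self-contained sketch of that sub-benchmark pattern (standard library only; the index and workload here are stand-ins, not the real tsi1 types):

package example

import (
	"fmt"
	"testing"
)

// fakeIndex stands in for the real index under test; a cacheSize of 0
// plays the role of "cache disabled".
type fakeIndex struct{ cacheSize uint64 }

func (idx *fakeIndex) query() {} // placeholder for the concurrent write/query workload

func BenchmarkCacheOnOff(b *testing.B) {
	runBenchmark := func(b *testing.B, queryN int, cacheSize uint64) {
		idx := &fakeIndex{cacheSize: cacheSize}
		for n := 0; n < b.N; n++ {
			for q := 0; q < queryN; q++ {
				idx.query()
			}
		}
	}

	for _, queryN := range []int{1000, 10000} {
		b.Run(fmt.Sprintf("queries %d", queryN), func(b *testing.B) {
			b.Run("cache", func(b *testing.B) { runBenchmark(b, queryN, 1000) })
			b.Run("no cache", func(b *testing.B) { runBenchmark(b, queryN, 0) })
		})
	}
}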
@@ -26,11 +26,11 @@ type TagValueSeriesIDCache struct {
 	evictor *list.List
 
 	tracker  *cacheTracker
-	capacity int
+	capacity uint64
 }
 
 // NewTagValueSeriesIDCache returns a TagValueSeriesIDCache with capacity c.
-func NewTagValueSeriesIDCache(c int) *TagValueSeriesIDCache {
+func NewTagValueSeriesIDCache(c uint64) *TagValueSeriesIDCache {
 	return &TagValueSeriesIDCache{
 		cache:   map[string]map[string]map[string]*list.Element{},
 		evictor: list.New(),
@@ -172,8 +172,8 @@ func (c *TagValueSeriesIDCache) delete(name, key, value []byte, x tsdb.SeriesID)
 // checkEviction checks if the cache is too big, and evicts the least recently used
 // item if it is.
 func (c *TagValueSeriesIDCache) checkEviction() {
-	l := c.evictor.Len()
-	c.tracker.SetSize(uint64(l))
+	l := uint64(c.evictor.Len())
+	c.tracker.SetSize(l)
 	if l <= c.capacity {
 		return
 	}
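For context, the eviction check above follows the standard container/list LRU shape: compare the current list length against the capacity and drop the least recently used element once it is exceeded. A rough, self-contained sketch of that pattern (illustrative names, not the tsi1 implementation); note that with a capacity of 0 every insert is evicted immediately, which is what makes a zero cache size equivalent to disabling the cache:

package example

import "container/list"

// lruCache is an illustrative capacity-bounded LRU keyed by string.
type lruCache struct {
	capacity uint64
	evictor  *list.List               // front = most recently used
	items    map[string]*list.Element // key -> element in evictor
}

type entry struct {
	key   string
	value interface{}
}

func newLRUCache(capacity uint64) *lruCache {
	return &lruCache{
		capacity: capacity,
		evictor:  list.New(),
		items:    map[string]*list.Element{},
	}
}

func (c *lruCache) Put(key string, value interface{}) {
	if el, ok := c.items[key]; ok {
		c.evictor.MoveToFront(el)
		el.Value.(*entry).value = value
		return
	}
	c.items[key] = c.evictor.PushFront(&entry{key: key, value: value})
	c.checkEviction()
}

// checkEviction mirrors the shape of the method in the diff: compare the
// current size against capacity and drop the least recently used entry.
func (c *lruCache) checkEviction() {
	if uint64(c.evictor.Len()) <= c.capacity {
		return
	}
	el := c.evictor.Back() // least recently used
	c.evictor.Remove(el)
	delete(c.items, el.Value.(*entry).key)
}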
@@ -6,6 +6,9 @@ import "github.com/influxdata/platform/toml"
 // write-ahead log file will compact into an index file.
 const DefaultMaxIndexLogFileSize = 1 * 1024 * 1024 // 1MB
 
+// DefaultSeriesIDSetCacheSize is the default number of series ID sets to cache.
+const DefaultSeriesIDSetCacheSize = 1000
+
 // Config holds configurable Index options.
 type Config struct {
 	// MaxIndexLogFileSize is the threshold, in bytes, when an index write-ahead log file will
@@ -13,11 +16,21 @@ type Config struct {
 	// and result in lower heap usage at the expense of write throughput. Higher sizes will
 	// be compacted less frequently, store more series in-memory, and provide higher write throughput.
 	MaxIndexLogFileSize toml.Size `toml:"max-index-log-file-size"`
+
+	// SeriesIDSetCacheSize determines the size taken up by the cache of series ID
+	// sets in the index. Since a series id set is a compressed bitmap of all series ids
+	// matching a tag key/value pair, setting this size does not necessarily limit the
+	// size on heap the cache takes up. Care should be taken.
+	//
+	// The cache uses an LRU strategy for eviction. Setting the value to 0 will
+	// disable the cache.
+	SeriesIDSetCacheSize uint64
 }
 
 // NewConfig returns a new Config.
 func NewConfig() Config {
 	return Config{
-		MaxIndexLogFileSize: toml.Size(DefaultMaxIndexLogFileSize),
+		MaxIndexLogFileSize:  toml.Size(DefaultMaxIndexLogFileSize),
+		SeriesIDSetCacheSize: DefaultSeriesIDSetCacheSize,
 	}
 }
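Assuming the tsi1 package lives at github.com/influxdata/platform/tsdb/tsi1 (an assumption based on the other import paths in this diff, not something the diff states), a caller now sizes or disables the cache through the config rather than through a global flag. A small sketch:

package main

import "github.com/influxdata/platform/tsdb/tsi1" // assumed import path

func main() {
	// Default: DefaultSeriesIDSetCacheSize (1000) series ID sets are cached.
	cfg := tsi1.NewConfig()

	// Shrink the cache, or set it to 0 to disable caching entirely.
	cfg.SeriesIDSetCacheSize = 100

	// cfg is then passed to tsi1.NewIndex along with a series file and options.
	_ = cfg
}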
@@ -1,6 +1,7 @@
 package tsi1
 
 import (
+	"bytes"
 	"errors"
 	"fmt"
 	"io/ioutil"
@@ -8,28 +9,22 @@ import (
 	"path/filepath"
 	"regexp"
 	"runtime"
+	"sort"
-	"strconv"
 	"sync"
 	"sync/atomic"
 	"unsafe"
 
-	"github.com/prometheus/client_golang/prometheus"
-
-	"bytes"
-	"sort"
-
 	"github.com/cespare/xxhash"
 	"github.com/influxdata/influxql"
 	"github.com/influxdata/platform/models"
 	"github.com/influxdata/platform/pkg/slices"
 	"github.com/influxdata/platform/query"
 	"github.com/influxdata/platform/tsdb"
+	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
 )
 
-// DefaultSeriesIDSetCacheSize is the default number of series ID sets to cache.
-const DefaultSeriesIDSetCacheSize = 100
-
 // ErrCompactionInterrupted is returned if compactions are disabled or
 // an index is closed while a compaction is occurring.
 var ErrCompactionInterrupted = errors.New("tsi1: compaction interrupted")
@@ -42,15 +37,6 @@ func init() {
 	}
 	DefaultPartitionN = uint64(i)
+}
-
-	// TODO(edd): To remove when feature finalised.
-	var err error
-	if os.Getenv("INFLUXDB_EXP_TSI_CACHING") != "" {
-		EnableBitsetCache, err = strconv.ParseBool(os.Getenv("INFLUXDB_EXP_TSI_CACHING"))
-		if err != nil {
-			panic(err)
-		}
-	}
-}
 
 // DefaultPartitionN determines how many shards the index will be partitioned into.
@@ -60,9 +46,6 @@ func init() {
 //
 var DefaultPartitionN uint64 = 8
 
-// EnableBitsetCache determines if bitsets are cached.
-var EnableBitsetCache = true
-
 // An IndexOption is a functional option for changing the configuration of
 // an Index.
 type IndexOption func(i *Index)
@@ -123,6 +106,7 @@ type Index struct {
 	logfileBufferSize int         // The size of the buffer used by the LogFile.
 	disableFsync      bool        // Disables flushing buffers and fsyning files. Used when working with indexes offline.
 	logger            *zap.Logger // Index's logger.
+	config            Config      // The index configuration
 
 	// The following must be set when initializing an Index.
 	sfile *tsdb.SeriesFile // series lookup file
@@ -141,11 +125,12 @@ func (i *Index) UniqueReferenceID() uintptr {
 // NewIndex returns a new instance of Index.
 func NewIndex(sfile *tsdb.SeriesFile, c Config, options ...IndexOption) *Index {
 	idx := &Index{
-		tagValueCache:    NewTagValueSeriesIDCache(DefaultSeriesIDSetCacheSize),
+		tagValueCache:    NewTagValueSeriesIDCache(c.SeriesIDSetCacheSize),
 		partitionMetrics: newPartitionMetrics(nil),
 		maxLogFileSize:   int64(c.MaxIndexLogFileSize),
 		logger:           zap.NewNop(),
 		version:          Version,
+		config:           c,
 		sfile:            sfile,
 		PartitionN:       DefaultPartitionN,
 	}
@@ -926,7 +911,7 @@ func (i *Index) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesID
 // tagValueSeriesIDIterator returns a series iterator for a single tag value.
 func (i *Index) tagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
 	// Check series ID set cache...
-	if EnableBitsetCache {
+	if i.config.SeriesIDSetCacheSize > 0 { // Cache enabled.
 		if ss := i.tagValueCache.Get(name, key, value); ss != nil {
 			// Return a clone because the set is mutable.
 			return tsdb.NewSeriesIDSetIterator(ss.Clone()), nil
@@ -944,7 +929,7 @@ func (i *Index) tagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesID
 	}
 
 	itr := tsdb.MergeSeriesIDIterators(a...)
-	if !EnableBitsetCache {
+	if i.config.SeriesIDSetCacheSize == 0 { // Cache disabled.
 		return itr, nil
 	}
 
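The last two hunks bracket a cache-aside flow: when the configured cache size is non-zero a cached series ID set is cloned and returned, otherwise the merged iterator from the partitions is returned without touching the cache. A compact, generic sketch of that flow (hypothetical types and helper names, not the tsi1 API):

package example

// seriesIDSet stands in for tsdb.SeriesIDSet; Clone matters because the
// cached set is mutable and callers must not share it.
type seriesIDSet struct{ ids []uint64 }

func (s *seriesIDSet) Clone() *seriesIDSet {
	out := &seriesIDSet{ids: make([]uint64, len(s.ids))}
	copy(out.ids, s.ids)
	return out
}

type index struct {
	cacheSize uint64
	cache     map[string]*seriesIDSet // illustrative; the real cache is an LRU
}

// lookup follows the cache-aside pattern used in tagValueSeriesIDIterator.
func (i *index) lookup(key string, build func() *seriesIDSet) *seriesIDSet {
	if i.cacheSize > 0 { // cache enabled
		if ss, ok := i.cache[key]; ok {
			return ss.Clone() // return a clone because the set is mutable
		}
	}

	ss := build() // expensive path: consult every partition

	if i.cacheSize == 0 { // cache disabled
		return ss
	}
	i.cache[key] = ss
	return ss
}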