fix(tsi1): Remove TSI cardinality stats cache
parent
dd95578940
commit
9237ee6a40
|
@ -614,7 +614,7 @@ func (e *Engine) ApplyFnToSeriesIDSet(fn func(*tsdb.SeriesIDSet)) {
|
|||
}
|
||||
|
||||
// MeasurementCardinalityStats returns cardinality stats for all measurements.
|
||||
func (e *Engine) MeasurementCardinalityStats() tsi1.MeasurementCardinalityStats {
|
||||
func (e *Engine) MeasurementCardinalityStats() (tsi1.MeasurementCardinalityStats, error) {
|
||||
return e.index.MeasurementCardinalityStats()
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
package tsi1
|
||||
|
||||
import "github.com/influxdata/influxdb/toml"
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/toml"
|
||||
)
|
||||
|
||||
// DefaultMaxIndexLogFileSize is the default threshold, in bytes, when an index
|
||||
// write-ahead log file will compact into an index file.
|
||||
|
@ -25,6 +29,10 @@ type Config struct {
|
|||
// The cache uses an LRU strategy for eviction. Setting the value to 0 will
|
||||
// disable the cache.
|
||||
SeriesIDSetCacheSize uint64
|
||||
|
||||
// StatsTTL sets the time-to-live for the stats cache. If zero, then caching
|
||||
// is disabled. If set then stats are cached for the given amount of time.
|
||||
StatsTTL time.Duration `toml:"stats-ttl"`
|
||||
}
|
||||
|
||||
// NewConfig returns a new Config.
|
||||
|
|
|
@ -403,39 +403,6 @@ func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.Serie
|
|||
return tsdb.NewSeriesIDSetIterator(ss), nil
|
||||
}
|
||||
|
||||
// Stats computes aggregate measurement cardinality stats from the raw index data.
|
||||
func (fs *FileSet) Stats() MeasurementCardinalityStats {
|
||||
stats := make(MeasurementCardinalityStats)
|
||||
mitr := fs.MeasurementIterator()
|
||||
if mitr == nil {
|
||||
return stats
|
||||
}
|
||||
|
||||
for {
|
||||
// Iterate over each measurement and set cardinality.
|
||||
mm := mitr.Next()
|
||||
if mm == nil {
|
||||
return stats
|
||||
}
|
||||
|
||||
// Obtain all series for measurement.
|
||||
sitr := fs.MeasurementSeriesIDIterator(mm.Name())
|
||||
if sitr == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// All iterators should be series id set iterators except legacy 1.x data.
|
||||
// Skip if it does not conform as aggregation would be too slow.
|
||||
ssitr, ok := sitr.(tsdb.SeriesIDSetIterator)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Set cardinality for the given measurement.
|
||||
stats[string(mm.Name())] = int(ssitr.SeriesIDSet().Cardinality())
|
||||
}
|
||||
}
|
||||
|
||||
// File represents a log or index file.
|
||||
type File interface {
|
||||
Close() error
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/cespare/xxhash"
|
||||
|
@ -126,6 +127,9 @@ type Index struct {
|
|||
// Index's version.
|
||||
version int
|
||||
|
||||
// Cardinality stats caching time-to-live.
|
||||
StatsTTL time.Duration
|
||||
|
||||
// Number of partitions used by the index.
|
||||
PartitionN uint64
|
||||
}
|
||||
|
@ -145,6 +149,7 @@ func NewIndex(sfile *tsdb.SeriesFile, c Config, options ...IndexOption) *Index {
|
|||
version: Version,
|
||||
config: c,
|
||||
sfile: sfile,
|
||||
StatsTTL: c.StatsTTL,
|
||||
PartitionN: DefaultPartitionN,
|
||||
}
|
||||
|
||||
|
@ -243,6 +248,7 @@ func (i *Index) Open(ctx context.Context) error {
|
|||
for j := 0; j < len(i.partitions); j++ {
|
||||
p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j)))
|
||||
p.MaxLogFileSize = i.maxLogFileSize
|
||||
p.StatsTTL = i.StatsTTL
|
||||
p.nosync = i.disableFsync
|
||||
p.logbufferSize = i.logfileBufferSize
|
||||
p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1)))
|
||||
|
@ -1194,25 +1200,13 @@ func (i *Index) SetFieldName(measurement []byte, name string) {}
|
|||
func (i *Index) Rebuild() {}
|
||||
|
||||
// MeasurementCardinalityStats returns cardinality stats for all measurements.
|
||||
func (i *Index) MeasurementCardinalityStats() MeasurementCardinalityStats {
|
||||
func (i *Index) MeasurementCardinalityStats() (MeasurementCardinalityStats, error) {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
|
||||
stats := NewMeasurementCardinalityStats()
|
||||
for _, p := range i.partitions {
|
||||
stats.Add(p.MeasurementCardinalityStats())
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
// ComputeMeasurementCardinalityStats computes the cardinality stats from raw index data.
|
||||
func (i *Index) ComputeMeasurementCardinalityStats() (MeasurementCardinalityStats, error) {
|
||||
i.mu.RLock()
|
||||
defer i.mu.RUnlock()
|
||||
|
||||
stats := NewMeasurementCardinalityStats()
|
||||
for _, p := range i.partitions {
|
||||
pstats, err := p.ComputeMeasurementCardinalityStats()
|
||||
pstats, err := p.MeasurementCardinalityStats()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -338,7 +338,9 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) {
|
|||
t.Run("Empty", func(t *testing.T) {
|
||||
idx := MustOpenIndex(1, tsi1.NewConfig())
|
||||
defer idx.Close()
|
||||
if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{}); diff != "" {
|
||||
if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
})
|
||||
|
@ -355,7 +357,9 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) {
|
|||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 2, "mem": 1}); diff != "" {
|
||||
if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 2, "mem": 1}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
})
|
||||
|
@ -375,14 +379,18 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) {
|
|||
seriesID := idx.SeriesFile.SeriesID([]byte("cpu"), models.NewTags(map[string]string{"region": "west"}), nil)
|
||||
if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1, "mem": 1}); diff != "" {
|
||||
} else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 1, "mem": 1}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
seriesID = idx.SeriesFile.SeriesID([]byte("mem"), models.NewTags(map[string]string{"region": "east"}), nil)
|
||||
if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1}); diff != "" {
|
||||
} else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 1}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
})
|
||||
|
@ -404,14 +412,85 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" {
|
||||
if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
// Reopen and verify count.
|
||||
if err := idx.Reopen(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" {
|
||||
} else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("LargeWithDelete", func(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("short mode, skipping")
|
||||
}
|
||||
config := tsi1.NewConfig()
|
||||
config.MaxIndexLogFileSize = 4096
|
||||
idx := MustOpenIndex(1, config)
|
||||
defer idx.Close()
|
||||
|
||||
a := make([]Series, 1000)
|
||||
for i := range a {
|
||||
a[i] = Series{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": fmt.Sprintf("east%04d", i)})}
|
||||
}
|
||||
if err := idx.CreateSeriesSliceIfNotExists(a); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Issue deletion.
|
||||
if err := idx.DropMeasurement([]byte("cpu")); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Cache", func(t *testing.T) {
|
||||
config := tsi1.NewConfig()
|
||||
config.StatsTTL = 1 * time.Second
|
||||
idx := MustOpenIndex(1, config)
|
||||
defer idx.Close()
|
||||
|
||||
// Insert two series & verify the stats.
|
||||
if err := idx.CreateSeriesSliceIfNotExists([]Series{
|
||||
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})},
|
||||
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 2}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
// Insert one more series and immediately check. No change should occur because the stats are still cached.
|
||||
if err := idx.CreateSeriesSliceIfNotExists([]Series{
|
||||
{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})},
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 2}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
|
||||
// Wait for TTL.
|
||||
time.Sleep(config.StatsTTL)
|
||||
|
||||
// Verify again and stats should be updated.
|
||||
if stats, err := idx.MeasurementCardinalityStats(); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 3}); diff != "" {
|
||||
t.Fatal(diff)
|
||||
}
|
||||
})
|
||||
|
@ -626,31 +705,6 @@ func BenchmarkIndex_CreateSeriesListIfNotExist(b *testing.B) {
|
|||
})
|
||||
}
|
||||
|
||||
func BenchmarkIndex_ComputeMeasurementCardinalityStats(b *testing.B) {
|
||||
idx := MustOpenIndex(1, tsi1.NewConfig())
|
||||
defer idx.Close()
|
||||
|
||||
const n = 10000
|
||||
for i := 0; i < n; i++ {
|
||||
name := []byte(fmt.Sprintf("%08x", i))
|
||||
a := make([]Series, 1000)
|
||||
for j := range a {
|
||||
a[j] = Series{Name: name, Tags: models.NewTags(map[string]string{"region": fmt.Sprintf("east%04d", j)})}
|
||||
}
|
||||
if err := idx.CreateSeriesSliceIfNotExists(a); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
b.Run("", func(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
if _, err := idx.ComputeMeasurementCardinalityStats(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Index is a test wrapper for tsi1.Index.
|
||||
type Index struct {
|
||||
*tsi1.Index
|
||||
|
|
|
@ -58,10 +58,6 @@ type Partition struct {
|
|||
fileSet *FileSet // current file set
|
||||
seq int // file id sequence
|
||||
|
||||
// Computed measurements stats since last compaction.
|
||||
// NOTE: Does not include active log file stats.
|
||||
stats MeasurementCardinalityStats
|
||||
|
||||
// Running statistics
|
||||
tracker *partitionTracker
|
||||
|
||||
|
@ -69,6 +65,11 @@ type Partition struct {
|
|||
// in this partition. This set tracks both insertions and deletions of a series.
|
||||
seriesIDSet *tsdb.SeriesIDSet
|
||||
|
||||
// Stats caching
|
||||
StatsTTL time.Duration
|
||||
statsCache MeasurementCardinalityStats
|
||||
lastStatsTime time.Time
|
||||
|
||||
// Compaction management
|
||||
levels []CompactionLevel // compaction levels
|
||||
levelCompacting []bool // level compaction status
|
||||
|
@ -123,7 +124,6 @@ func (p *Partition) bytes() int {
|
|||
b += int(unsafe.Sizeof(p.activeLogFile)) + p.activeLogFile.bytes()
|
||||
b += int(unsafe.Sizeof(p.fileSet)) + p.fileSet.bytes()
|
||||
b += int(unsafe.Sizeof(p.seq))
|
||||
b += int(unsafe.Sizeof(p.stats))
|
||||
b += int(unsafe.Sizeof(p.tracker))
|
||||
b += int(unsafe.Sizeof(p.seriesIDSet)) + p.seriesIDSet.Bytes()
|
||||
b += int(unsafe.Sizeof(p.levels))
|
||||
|
@ -279,9 +279,6 @@ func (p *Partition) Open() (err error) {
|
|||
p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log")
|
||||
p.tracker.SetDiskSize(uint64(p.fileSet.Size()))
|
||||
|
||||
// Compute initial stats.
|
||||
p.computeStats()
|
||||
|
||||
// Mark opened.
|
||||
p.res.Open()
|
||||
|
||||
|
@ -508,9 +505,6 @@ func (p *Partition) prependActiveLogFile() error {
|
|||
p.replaceFileSet(fileSet)
|
||||
p.manifestSize = manifestSize
|
||||
|
||||
// Compute new stats after fileset has been replaced.
|
||||
p.computeStats()
|
||||
|
||||
// Set the file metrics again.
|
||||
p.tracker.SetFiles(uint64(len(p.fileSet.IndexFiles())), "index")
|
||||
p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log")
|
||||
|
@ -1297,40 +1291,73 @@ func (p *Partition) compactLogFile(ctx context.Context, logFile *LogFile, interr
|
|||
}
|
||||
|
||||
// MeasurementCardinalityStats returns cardinality stats for all measurements.
|
||||
func (p *Partition) MeasurementCardinalityStats() MeasurementCardinalityStats {
|
||||
func (p *Partition) MeasurementCardinalityStats() (MeasurementCardinalityStats, error) {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
stats := p.stats.Clone()
|
||||
|
||||
if p.activeLogFile != nil {
|
||||
stats.Add(p.activeLogFile.MeasurementCardinalityStats())
|
||||
// Return the cached version if caching is enabled and the cache was filled less than StatsTTL ago.
|
||||
if p.StatsTTL > 0 && !p.lastStatsTime.IsZero() && time.Since(p.lastStatsTime) < p.StatsTTL {
|
||||
return p.statsCache.Clone(), nil
|
||||
}
|
||||
return stats
|
||||
|
||||
// If cache is unavailable then generate fresh stats.
|
||||
stats, err := p.measurementCardinalityStats()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
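// Cache the stats if enabled.
// NOTE(review): only p.mu.RLock is held at this point, so the writes to statsCache and
// lastStatsTime below can race with concurrent callers; an exclusive lock looks necessary — confirm.
// NOTE(review): the map stored here is the same one returned to the caller (the cache-hit path
// returns a Clone), so a caller mutating the result would alter the cache — confirm.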
|
||||
if p.StatsTTL > 0 {
|
||||
p.statsCache = stats
|
||||
p.lastStatsTime = time.Now()
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// ComputeMeasurementCardinalityStats computes cardinality stats from raw data.
|
||||
func (p *Partition) ComputeMeasurementCardinalityStats() (MeasurementCardinalityStats, error) {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
|
||||
func (p *Partition) measurementCardinalityStats() (MeasurementCardinalityStats, error) {
|
||||
fs, err := p.fileSet.Duplicate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer fs.Release()
|
||||
return fs.Stats(), nil
|
||||
}
|
||||
|
||||
// computeStats calculates the measurement stats from all files except the active log.
|
||||
// FileSet must already be retained when calling this function.
|
||||
func (p *Partition) computeStats() {
|
||||
// Shallow copy the fileset and trim initial active log file.
|
||||
fs := *p.fileSet
|
||||
fs.files = fs.files[1:]
|
||||
stats := make(MeasurementCardinalityStats)
|
||||
mitr := fs.MeasurementIterator()
|
||||
if mitr == nil {
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// Compute stats on the remaining files.
|
||||
p.stats = fs.Stats()
|
||||
for {
|
||||
// Iterate over each measurement and set cardinality.
|
||||
mm := mitr.Next()
|
||||
if mm == nil {
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// Obtain all series for measurement.
|
||||
sitr := fs.MeasurementSeriesIDIterator(mm.Name())
|
||||
if sitr == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// All iterators should be series id set iterators except legacy 1.x data.
|
||||
// Skip if it does not conform as aggregation would be too slow.
|
||||
ssitr, ok := sitr.(tsdb.SeriesIDSetIterator)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Intersect with partition set to ensure deleted series are removed.
|
||||
set := p.seriesIDSet.And(ssitr.SeriesIDSet())
|
||||
cardinality := int(set.Cardinality())
|
||||
if cardinality == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Set cardinality for the given measurement.
|
||||
stats[string(mm.Name())] = cardinality
|
||||
}
|
||||
}
|
||||
|
||||
type partitionTracker struct {
|
||||
|
|
Loading…
Reference in New Issue