diff --git a/storage/engine.go b/storage/engine.go index e1a2cd1b2a..d7efb2750c 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -614,7 +614,7 @@ func (e *Engine) ApplyFnToSeriesIDSet(fn func(*tsdb.SeriesIDSet)) { } // MeasurementCardinalityStats returns cardinality stats for all measurements. -func (e *Engine) MeasurementCardinalityStats() tsi1.MeasurementCardinalityStats { +func (e *Engine) MeasurementCardinalityStats() (tsi1.MeasurementCardinalityStats, error) { return e.index.MeasurementCardinalityStats() } diff --git a/tsdb/tsi1/config.go b/tsdb/tsi1/config.go index 07cac181f1..2355d20924 100644 --- a/tsdb/tsi1/config.go +++ b/tsdb/tsi1/config.go @@ -1,6 +1,10 @@ package tsi1 -import "github.com/influxdata/influxdb/toml" +import ( + "time" + + "github.com/influxdata/influxdb/toml" +) // DefaultMaxIndexLogFileSize is the default threshold, in bytes, when an index // write-ahead log file will compact into an index file. @@ -25,6 +29,10 @@ type Config struct { // The cache uses an LRU strategy for eviction. Setting the value to 0 will // disable the cache. SeriesIDSetCacheSize uint64 + + // StatsTTL sets the time-to-live for the stats cache. If zero, then caching + // is disabled. If set then stats are cached for the given amount of time. + StatsTTL time.Duration `toml:"stats-ttl"` } // NewConfig returns a new Config. diff --git a/tsdb/tsi1/file_set.go b/tsdb/tsi1/file_set.go index b1241c8cd2..6b11cc0a7a 100644 --- a/tsdb/tsi1/file_set.go +++ b/tsdb/tsi1/file_set.go @@ -403,39 +403,6 @@ func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.Serie return tsdb.NewSeriesIDSetIterator(ss), nil } -// Stats computes aggregate measurement cardinality stats from the raw index data. -func (fs *FileSet) Stats() MeasurementCardinalityStats { - stats := make(MeasurementCardinalityStats) - mitr := fs.MeasurementIterator() - if mitr == nil { - return stats - } - - for { - // Iterate over each measurement and set cardinality. - mm := mitr.Next() - if mm == nil { - return stats - } - - // Obtain all series for measurement. - sitr := fs.MeasurementSeriesIDIterator(mm.Name()) - if sitr == nil { - continue - } - - // All iterators should be series id set iterators except legacy 1.x data. - // Skip if it does not conform as aggregation would be too slow. - ssitr, ok := sitr.(tsdb.SeriesIDSetIterator) - if !ok { - continue - } - - // Set cardinality for the given measurement. - stats[string(mm.Name())] = int(ssitr.SeriesIDSet().Cardinality()) - } -} - // File represents a log or index file. type File interface { Close() error diff --git a/tsdb/tsi1/index.go b/tsdb/tsi1/index.go index 7550c68dc8..744d5cdaed 100644 --- a/tsdb/tsi1/index.go +++ b/tsdb/tsi1/index.go @@ -14,6 +14,7 @@ import ( "strconv" "sync" "sync/atomic" + "time" "unsafe" "github.com/cespare/xxhash" @@ -126,6 +127,9 @@ type Index struct { // Index's version. version int + // Cardinality stats caching time-to-live. + StatsTTL time.Duration + // Number of partitions used by the index. PartitionN uint64 } @@ -145,6 +149,7 @@ func NewIndex(sfile *tsdb.SeriesFile, c Config, options ...IndexOption) *Index { version: Version, config: c, sfile: sfile, + StatsTTL: c.StatsTTL, PartitionN: DefaultPartitionN, } @@ -243,6 +248,7 @@ func (i *Index) Open(ctx context.Context) error { for j := 0; j < len(i.partitions); j++ { p := NewPartition(i.sfile, filepath.Join(i.path, fmt.Sprint(j))) p.MaxLogFileSize = i.maxLogFileSize + p.StatsTTL = i.StatsTTL p.nosync = i.disableFsync p.logbufferSize = i.logfileBufferSize p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1))) @@ -1194,25 +1200,13 @@ func (i *Index) SetFieldName(measurement []byte, name string) {} func (i *Index) Rebuild() {} // MeasurementCardinalityStats returns cardinality stats for all measurements. -func (i *Index) MeasurementCardinalityStats() MeasurementCardinalityStats { +func (i *Index) MeasurementCardinalityStats() (MeasurementCardinalityStats, error) { i.mu.RLock() defer i.mu.RUnlock() stats := NewMeasurementCardinalityStats() for _, p := range i.partitions { - stats.Add(p.MeasurementCardinalityStats()) - } - return stats -} - -// ComputeMeasurementCardinalityStats computes the cardinality stats from raw index data. -func (i *Index) ComputeMeasurementCardinalityStats() (MeasurementCardinalityStats, error) { - i.mu.RLock() - defer i.mu.RUnlock() - - stats := NewMeasurementCardinalityStats() - for _, p := range i.partitions { - pstats, err := p.ComputeMeasurementCardinalityStats() + pstats, err := p.MeasurementCardinalityStats() if err != nil { return nil, err } diff --git a/tsdb/tsi1/index_test.go b/tsdb/tsi1/index_test.go index 6340182a96..835b64869f 100644 --- a/tsdb/tsi1/index_test.go +++ b/tsdb/tsi1/index_test.go @@ -338,7 +338,9 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) { t.Run("Empty", func(t *testing.T) { idx := MustOpenIndex(1, tsi1.NewConfig()) defer idx.Close() - if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{}); diff != "" { + if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{}); diff != "" { t.Fatal(diff) } }) @@ -355,7 +357,9 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) { t.Fatal(err) } - if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 2, "mem": 1}); diff != "" { + if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 2, "mem": 1}); diff != "" { t.Fatal(diff) } }) @@ -375,14 +379,18 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) { seriesID := idx.SeriesFile.SeriesID([]byte("cpu"), models.NewTags(map[string]string{"region": "west"}), nil) if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil { t.Fatal(err) - } else if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1, "mem": 1}); diff != "" { + } else if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 1, "mem": 1}); diff != "" { t.Fatal(diff) } seriesID = idx.SeriesFile.SeriesID([]byte("mem"), models.NewTags(map[string]string{"region": "east"}), nil) if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil { t.Fatal(err) - } else if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1}); diff != "" { + } else if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 1}); diff != "" { t.Fatal(diff) } }) @@ -404,14 +412,85 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) { } } - if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" { + if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" { t.Fatal(diff) } // Reopen and verify count. if err := idx.Reopen(); err != nil { t.Fatal(err) - } else if diff := cmp.Diff(idx.MeasurementCardinalityStats(), tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" { + } else if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 1000000}); diff != "" { + t.Fatal(diff) + } + }) + + t.Run("LargeWithDelete", func(t *testing.T) { + if testing.Short() { + t.Skip("short mode, skipping") + } + config := tsi1.NewConfig() + config.MaxIndexLogFileSize = 4096 + idx := MustOpenIndex(1, config) + defer idx.Close() + + a := make([]Series, 1000) + for i := range a { + a[i] = Series{Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": fmt.Sprintf("east%04d", i)})} + } + if err := idx.CreateSeriesSliceIfNotExists(a); err != nil { + t.Fatal(err) + } + + // Issue deletion. + if err := idx.DropMeasurement([]byte("cpu")); err != nil { + t.Fatal(err) + } else if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{}); diff != "" { + t.Fatal(diff) + } + }) + + t.Run("Cache", func(t *testing.T) { + config := tsi1.NewConfig() + config.StatsTTL = 1 * time.Second + idx := MustOpenIndex(1, config) + defer idx.Close() + + // Insert two series & verify series. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "east"})}, + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "west"})}, + }); err != nil { + t.Fatal(err) + } else if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 2}); diff != "" { + t.Fatal(diff) + } + + // Insert one more series and immediate check. No change should occur. + if err := idx.CreateSeriesSliceIfNotExists([]Series{ + {Name: []byte("cpu"), Tags: models.NewTags(map[string]string{"region": "north"})}, + }); err != nil { + t.Fatal(err) + } else if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 2}); diff != "" { + t.Fatal(diff) + } + + // Wait for TTL. + time.Sleep(config.StatsTTL) + + // Verify again and stats should be updated. + if stats, err := idx.MeasurementCardinalityStats(); err != nil { + t.Fatal(err) + } else if diff := cmp.Diff(stats, tsi1.MeasurementCardinalityStats{"cpu": 3}); diff != "" { t.Fatal(diff) } }) @@ -626,31 +705,6 @@ func BenchmarkIndex_CreateSeriesListIfNotExist(b *testing.B) { }) } -func BenchmarkIndex_ComputeMeasurementCardinalityStats(b *testing.B) { - idx := MustOpenIndex(1, tsi1.NewConfig()) - defer idx.Close() - - const n = 10000 - for i := 0; i < n; i++ { - name := []byte(fmt.Sprintf("%08x", i)) - a := make([]Series, 1000) - for j := range a { - a[j] = Series{Name: name, Tags: models.NewTags(map[string]string{"region": fmt.Sprintf("east%04d", j)})} - } - if err := idx.CreateSeriesSliceIfNotExists(a); err != nil { - b.Fatal(err) - } - } - - b.Run("", func(b *testing.B) { - for i := 0; i < b.N; i++ { - if _, err := idx.ComputeMeasurementCardinalityStats(); err != nil { - b.Fatal(err) - } - } - }) -} - // Index is a test wrapper for tsi1.Index. type Index struct { *tsi1.Index diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go index 32ad459a0a..debaeb6574 100644 --- a/tsdb/tsi1/partition.go +++ b/tsdb/tsi1/partition.go @@ -58,10 +58,6 @@ type Partition struct { fileSet *FileSet // current file set seq int // file id sequence - // Computed measurements stats since last compaction. - // NOTE: Does not include active log file stats. - stats MeasurementCardinalityStats - // Running statistics tracker *partitionTracker @@ -69,6 +65,11 @@ type Partition struct { // in this partition. This set tracks both insertions and deletions of a series. seriesIDSet *tsdb.SeriesIDSet + // Stats caching + StatsTTL time.Duration + statsCache MeasurementCardinalityStats + lastStatsTime time.Time + // Compaction management levels []CompactionLevel // compaction levels levelCompacting []bool // level compaction status @@ -123,7 +124,6 @@ func (p *Partition) bytes() int { b += int(unsafe.Sizeof(p.activeLogFile)) + p.activeLogFile.bytes() b += int(unsafe.Sizeof(p.fileSet)) + p.fileSet.bytes() b += int(unsafe.Sizeof(p.seq)) - b += int(unsafe.Sizeof(p.stats)) b += int(unsafe.Sizeof(p.tracker)) b += int(unsafe.Sizeof(p.seriesIDSet)) + p.seriesIDSet.Bytes() b += int(unsafe.Sizeof(p.levels)) @@ -279,9 +279,6 @@ func (p *Partition) Open() (err error) { p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log") p.tracker.SetDiskSize(uint64(p.fileSet.Size())) - // Compute initial stats. - p.computeStats() - // Mark opened. p.res.Open() @@ -508,9 +505,6 @@ func (p *Partition) prependActiveLogFile() error { p.replaceFileSet(fileSet) p.manifestSize = manifestSize - // Compute new stats after fileset has been replaced. - p.computeStats() - // Set the file metrics again. p.tracker.SetFiles(uint64(len(p.fileSet.IndexFiles())), "index") p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log") @@ -1297,40 +1291,73 @@ func (p *Partition) compactLogFile(ctx context.Context, logFile *LogFile, interr } // MeasurementCardinalityStats returns cardinality stats for all measurements. -func (p *Partition) MeasurementCardinalityStats() MeasurementCardinalityStats { +func (p *Partition) MeasurementCardinalityStats() (MeasurementCardinalityStats, error) { p.mu.RLock() defer p.mu.RUnlock() - stats := p.stats.Clone() - - if p.activeLogFile != nil { - stats.Add(p.activeLogFile.MeasurementCardinalityStats()) + // Return cached version, if enabled and the TTL is less than the last cache time. + if p.StatsTTL > 0 && !p.lastStatsTime.IsZero() && time.Since(p.lastStatsTime) < p.StatsTTL { + return p.statsCache.Clone(), nil } - return stats + + // If cache is unavailable then generate fresh stats. + stats, err := p.measurementCardinalityStats() + if err != nil { + return nil, err + } + + // Cache the stats if enabled. + if p.StatsTTL > 0 { + p.statsCache = stats + p.lastStatsTime = time.Now() + } + + return stats, nil } -// ComputeMeasurementCardinalityStats computes cardinality stats from raw data. -func (p *Partition) ComputeMeasurementCardinalityStats() (MeasurementCardinalityStats, error) { - p.mu.RLock() - defer p.mu.RUnlock() - +func (p *Partition) measurementCardinalityStats() (MeasurementCardinalityStats, error) { fs, err := p.fileSet.Duplicate() if err != nil { return nil, err } defer fs.Release() - return fs.Stats(), nil -} -// computeStats calculates the measurement stats from all files except the active log. -// FileSet must already be retained when calling this function. -func (p *Partition) computeStats() { - // Shallow copy the fileset and trim initial active log file. - fs := *p.fileSet - fs.files = fs.files[1:] + stats := make(MeasurementCardinalityStats) + mitr := fs.MeasurementIterator() + if mitr == nil { + return stats, nil + } - // Compute stats on the remaining files. - p.stats = fs.Stats() + for { + // Iterate over each measurement and set cardinality. + mm := mitr.Next() + if mm == nil { + return stats, nil + } + + // Obtain all series for measurement. + sitr := fs.MeasurementSeriesIDIterator(mm.Name()) + if sitr == nil { + continue + } + + // All iterators should be series id set iterators except legacy 1.x data. + // Skip if it does not conform as aggregation would be too slow. + ssitr, ok := sitr.(tsdb.SeriesIDSetIterator) + if !ok { + continue + } + + // Intersect with partition set to ensure deleted series are removed. + set := p.seriesIDSet.And(ssitr.SeriesIDSet()) + cardinality := int(set.Cardinality()) + if cardinality == 0 { + continue + } + + // Set cardinality for the given measurement. + stats[string(mm.Name())] = cardinality + } } type partitionTracker struct {