diff --git a/cmd/influx_inspect/buildtsi/buildtsi.go b/cmd/influx_inspect/buildtsi/buildtsi.go index 5592c50e0f..d1cfe49df9 100644 --- a/cmd/influx_inspect/buildtsi/buildtsi.go +++ b/cmd/influx_inspect/buildtsi/buildtsi.go @@ -232,6 +232,7 @@ func IndexShard(sfile *tsdb.SeriesFile, dataDir, walDir string, maxLogFileSize i // Each new series entry in a log file is ~12 bytes so this should // roughly equate to one flush to the file for every batch. tsi1.WithLogFileBufferSize(12*batchSize), + tsi1.DisableMetrics(), // Disable metrics when rebuilding an index ) tsiIndex.WithLogger(log) diff --git a/pkg/rhh/rhh.go b/pkg/rhh/rhh.go index bd986e86ee..782a635018 100644 --- a/pkg/rhh/rhh.go +++ b/pkg/rhh/rhh.go @@ -39,6 +39,8 @@ func NewHashMap(opt Options) *HashMap { loadFactor: opt.LoadFactor, tracker: newRHHTracker(opt.Metrics, opt.Labels), } + m.tracker.enabled = opt.MetricsEnabled + m.alloc() return m } @@ -270,6 +272,7 @@ func (m *HashMap) PrometheusCollectors() []prometheus.Collector { type rhhTracker struct { metrics *Metrics labels prometheus.Labels + enabled bool } // Labels returns a copy of the default labels used by the tracker's metrics. @@ -283,43 +286,71 @@ func (t *rhhTracker) Labels() prometheus.Labels { } func newRHHTracker(metrics *Metrics, defaultLabels prometheus.Labels) *rhhTracker { - return &rhhTracker{metrics: metrics, labels: defaultLabels} + return &rhhTracker{metrics: metrics, labels: defaultLabels, enabled: true} } func (t *rhhTracker) SetLoadFactor(load float64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.LoadFactor.With(labels).Set(load) } func (t *rhhTracker) SetSize(sz uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Size.With(labels).Set(float64(sz)) } func (t *rhhTracker) ObserveGet(d time.Duration) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.GetDuration.With(labels).Observe(float64(d.Nanoseconds())) t.metrics.LastGetDuration.With(labels).Set(float64(d.Nanoseconds())) } func (t *rhhTracker) ObservePut(d time.Duration) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.InsertDuration.With(labels).Observe(float64(d.Nanoseconds())) t.metrics.LastInsertDuration.With(labels).Set(float64(d.Nanoseconds())) } func (t *rhhTracker) SetGrowDuration(d time.Duration) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.LastGrowDuration.With(labels).Set(d.Seconds()) } // TODO(edd): currently no safe way to calculate this concurrently. func (t *rhhTracker) SetProbeCount(length float64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.MeanProbeCount.With(labels).Set(length) } func (t *rhhTracker) incGet(status string) { + if !t.enabled { + return + } + labels := t.Labels() labels["status"] = status t.metrics.Gets.With(labels).Inc() @@ -329,6 +360,10 @@ func (t *rhhTracker) IncGetHit() { t.incGet("hit") } func (t *rhhTracker) IncGetMiss() { t.incGet("miss") } func (t *rhhTracker) incPut(status string) { + if !t.enabled { + return + } + labels := t.Labels() labels["status"] = status t.metrics.Puts.With(labels).Inc() @@ -357,16 +392,18 @@ func (e *hashElem) setKey(v []byte) { // Options represents initialization options that are passed to NewHashMap(). type Options struct { - Capacity int64 - LoadFactor int - Metrics *Metrics - Labels prometheus.Labels + Capacity int64 + LoadFactor int + MetricsEnabled bool + Metrics *Metrics + Labels prometheus.Labels } // DefaultOptions represents a default set of options to pass to NewHashMap(). var DefaultOptions = Options{ - Capacity: 256, - LoadFactor: 90, + Capacity: 256, + LoadFactor: 90, + MetricsEnabled: true, } // HashKey computes a hash of key. Hash is always non-zero. diff --git a/storage/engine.go b/storage/engine.go index 535a177c17..96976e7d71 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -105,7 +105,6 @@ func NewEngine(path string, c Config, options ...Option) *Engine { e := &Engine{ config: c, path: path, - sfile: tsdb.NewSeriesFile(c.GetSeriesFilePath(path)), defaultMetricLabels: prometheus.Labels{}, logger: zap.NewNop(), } diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 7ed8703935..1af091f277 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -44,6 +44,7 @@ type SeriesFile struct { // each partition decorates the same metric measurements with different // partition id label values. defaultMetricLabels prometheus.Labels + metricsEnabled bool refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use. @@ -53,10 +54,9 @@ type SeriesFile struct { // NewSeriesFile returns a new instance of SeriesFile. func NewSeriesFile(path string) *SeriesFile { return &SeriesFile{ - path: path, - // partitionMetrics: newSeriesFileMetrics(nil), - // indexMetrics: rhh.NewMetrics(namespace, seriesFileSubsystem+"_index", nil), - Logger: zap.NewNop(), + path: path, + metricsEnabled: true, + Logger: zap.NewNop(), } } @@ -74,6 +74,12 @@ func (f *SeriesFile) SetDefaultMetricLabels(labels prometheus.Labels) { } } +// DisableMetrics ensures that activity is not collected via the prometheus metrics. +// DisableMetrics must be called before Open. +func (f *SeriesFile) DisableMetrics() { + f.metricsEnabled = false +} + // Open memory maps the data file at the file's path. func (f *SeriesFile) Open() error { _, logEnd := logger.NewOperation(f.Logger, "Opening Series File", "series_file_open", zap.String("path", f.path)) @@ -90,10 +96,10 @@ func (f *SeriesFile) Open() error { // Initialise metrics for trackers. mmu.Lock() - if sms == nil { + if sms == nil && f.metricsEnabled { sms = newSeriesFileMetrics(f.defaultMetricLabels) } - if ims == nil { + if ims == nil && f.metricsEnabled { // Make a copy of the default labels so that another label can be provided. labels := make(prometheus.Labels, len(f.defaultMetricLabels)) for k, v := range f.defaultMetricLabels { @@ -122,9 +128,11 @@ func (f *SeriesFile) Open() error { p.index.rhhMetrics = ims p.index.rhhLabels = labels + p.index.rhhMetricsEnabled = f.metricsEnabled // Set the metric trackers on the partition with any injected default labels. p.tracker = newSeriesPartitionTracker(sms, labels) + p.tracker.enabled = f.metricsEnabled if err := p.Open(); err != nil { f.Close() diff --git a/tsdb/series_index.go b/tsdb/series_index.go index 170d0cc01c..5298da973b 100644 --- a/tsdb/series_index.go +++ b/tsdb/series_index.go @@ -46,8 +46,9 @@ type SeriesIndex struct { // metrics stores a shard instance of some Prometheus metrics. metrics // must be set before Open is called. - rhhMetrics *rhh.Metrics - rhhLabels prometheus.Labels + rhhMetrics *rhh.Metrics + rhhLabels prometheus.Labels + rhhMetricsEnabled bool data []byte // mmap data keyIDData []byte // key/id mmap data @@ -61,7 +62,8 @@ type SeriesIndex struct { func NewSeriesIndex(path string) *SeriesIndex { return &SeriesIndex{ - path: path, + path: path, + rhhMetricsEnabled: true, } } @@ -95,6 +97,7 @@ func (idx *SeriesIndex) Open() (err error) { options := rhh.DefaultOptions options.Metrics = idx.rhhMetrics options.Labels = idx.rhhLabels + options.MetricsEnabled = idx.rhhMetricsEnabled idx.keyIDMap = rhh.NewHashMap(options) idx.idOffsetMap = make(map[SeriesID]int64) diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index 4ef1178be0..44032e69a0 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -561,12 +561,14 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte { type seriesPartitionTracker struct { metrics *seriesFileMetrics labels prometheus.Labels + enabled bool } func newSeriesPartitionTracker(metrics *seriesFileMetrics, defaultLabels prometheus.Labels) *seriesPartitionTracker { return &seriesPartitionTracker{ metrics: metrics, labels: defaultLabels, + enabled: true, } } @@ -581,36 +583,60 @@ func (t *seriesPartitionTracker) Labels() prometheus.Labels { // AddSeriesCreated increases the number of series created in the partition by n. func (t *seriesPartitionTracker) AddSeriesCreated(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.SeriesCreated.With(labels).Add(float64(n)) } // SetSeries sets the number of series in the partition. func (t *seriesPartitionTracker) SetSeries(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Series.With(labels).Set(float64(n)) } // AddSeries increases the number of series in the partition by n. func (t *seriesPartitionTracker) AddSeries(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Series.With(labels).Add(float64(n)) } // SubSeries decreases the number of series in the partition by n. func (t *seriesPartitionTracker) SubSeries(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Series.With(labels).Sub(float64(n)) } // SetDiskSize sets the number of bytes used by files for in partition. func (t *seriesPartitionTracker) SetDiskSize(sz uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.DiskSize.With(labels).Set(float64(sz)) } // SetSegments sets the number of segments files for the partition. func (t *seriesPartitionTracker) SetSegments(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Segments.With(labels).Set(float64(n)) } @@ -618,6 +644,10 @@ func (t *seriesPartitionTracker) SetSegments(n uint64) { // IncCompactionsActive increments the number of active compactions for the // components of a partition (index and segments). func (t *seriesPartitionTracker) IncCompactionsActive() { + if !t.enabled { + return + } + labels := t.Labels() labels["component"] = "index" // TODO(edd): when we add segment compactions we will add a new label value. t.metrics.CompactionsActive.With(labels).Inc() @@ -626,6 +656,10 @@ func (t *seriesPartitionTracker) IncCompactionsActive() { // DecCompactionsActive decrements the number of active compactions for the // components of a partition (index and segments). func (t *seriesPartitionTracker) DecCompactionsActive() { + if !t.enabled { + return + } + labels := t.Labels() labels["component"] = "index" // TODO(edd): when we add segment compactions we will add a new label value. t.metrics.CompactionsActive.With(labels).Dec() @@ -634,6 +668,10 @@ func (t *seriesPartitionTracker) DecCompactionsActive() { // incCompactions increments the number of compactions for the partition. // Callers should use IncCompactionOK and IncCompactionErr. func (t *seriesPartitionTracker) incCompactions(status string, duration time.Duration) { + if !t.enabled { + return + } + if duration > 0 { labels := t.Labels() labels["component"] = "index" diff --git a/tsdb/tsi1/cache.go b/tsdb/tsi1/cache.go index d711fabb8c..7c198525ce 100644 --- a/tsdb/tsi1/cache.go +++ b/tsdb/tsi1/cache.go @@ -216,10 +216,11 @@ type seriesIDCacheElement struct { type cacheTracker struct { metrics *cacheMetrics labels prometheus.Labels + enabled bool } func newCacheTracker(metrics *cacheMetrics, defaultLabels prometheus.Labels) *cacheTracker { - return &cacheTracker{metrics: metrics, labels: defaultLabels} + return &cacheTracker{metrics: metrics, labels: defaultLabels, enabled: true} } // Labels returns a copy of labels for use with index cache metrics. @@ -232,11 +233,19 @@ func (t *cacheTracker) Labels() prometheus.Labels { } func (t *cacheTracker) SetSize(sz uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Size.With(labels).Set(float64(sz)) } func (t *cacheTracker) incGet(status string) { + if !t.enabled { + return + } + labels := t.Labels() labels["status"] = status t.metrics.Gets.With(labels).Inc() @@ -246,6 +255,10 @@ func (t *cacheTracker) IncGetHit() { t.incGet("hit") } func (t *cacheTracker) IncGetMiss() { t.incGet("miss") } func (t *cacheTracker) incPut(status string) { + if !t.enabled { + return + } + labels := t.Labels() labels["status"] = status t.metrics.Puts.With(labels).Inc() @@ -255,6 +268,10 @@ func (t *cacheTracker) IncPutHit() { t.incPut("hit") } func (t *cacheTracker) IncPutMiss() { t.incPut("miss") } func (t *cacheTracker) incDeletes(status string) { + if !t.enabled { + return + } + labels := t.Labels() labels["status"] = status t.metrics.Deletes.With(labels).Inc() @@ -264,6 +281,10 @@ func (t *cacheTracker) IncDeletesHit() { t.incDeletes("hit") } func (t *cacheTracker) IncDeletesMiss() { t.incDeletes("miss") } func (t *cacheTracker) IncEvictions() { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Evictions.With(labels).Inc() } diff --git a/tsdb/tsi1/index.go b/tsdb/tsi1/index.go index ce831fa35a..a2f45c7633 100644 --- a/tsdb/tsi1/index.go +++ b/tsdb/tsi1/index.go @@ -88,6 +88,14 @@ var WithLogFileBufferSize = func(sz int) IndexOption { } } +// DisableMetrics ensures that activity is not collected via the prometheus metrics. +// DisableMetrics must be called before Open. +var DisableMetrics = func() IndexOption { + return func(i *Index) { + i.metricsEnabled = false + } +} + // Index represents a collection of layered index files and WAL. type Index struct { mu sync.RWMutex @@ -98,6 +106,7 @@ type Index struct { tagValueCache *TagValueSeriesIDCache partitionMetrics *partitionMetrics // Maintain a single set of partition metrics to be shared by partition. + metricsEnabled bool // The following may be set when initializing an Index. path string // Root directory of the index partitions. @@ -127,6 +136,7 @@ func NewIndex(sfile *tsdb.SeriesFile, c Config, options ...IndexOption) *Index { idx := &Index{ tagValueCache: NewTagValueSeriesIDCache(c.SeriesIDSetCacheSize), partitionMetrics: newPartitionMetrics(nil), + metricsEnabled: true, maxLogFileSize: int64(c.MaxIndexLogFileSize), logger: zap.NewNop(), version: Version, @@ -210,16 +220,17 @@ func (i *Index) Open() error { } mmu.Lock() - if cms == nil { + if cms == nil && i.metricsEnabled { cms = newCacheMetrics(i.defaultLabels) } - if pms == nil { + if pms == nil && i.metricsEnabled { pms = newPartitionMetrics(i.defaultLabels) } mmu.Unlock() // Set the correct shared metrics on the cache i.tagValueCache.tracker = newCacheTracker(cms, i.defaultLabels) + i.tagValueCache.tracker.enabled = i.metricsEnabled // Initialize index partitions. i.partitions = make([]*Partition, i.PartitionN) @@ -238,6 +249,7 @@ func (i *Index) Open() error { } labels["index_partition"] = fmt.Sprint(j) p.tracker = newPartitionTracker(pms, labels) + p.tracker.enabled = i.metricsEnabled i.partitions[j] = p } diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go index 57e3809b7c..6a611f2d62 100644 --- a/tsdb/tsi1/partition.go +++ b/tsdb/tsi1/partition.go @@ -1294,12 +1294,14 @@ func (p *Partition) MeasurementCardinalityStats() MeasurementCardinalityStats { type partitionTracker struct { metrics *partitionMetrics labels prometheus.Labels + enabled bool // Allows tracker to be disabled. } func newPartitionTracker(metrics *partitionMetrics, defaultLabels prometheus.Labels) *partitionTracker { return &partitionTracker{ metrics: metrics, labels: defaultLabels, + enabled: true, } } @@ -1315,6 +1317,10 @@ func (t *partitionTracker) Labels() prometheus.Labels { // AddSeriesCreated increases the number of series created in the partition by n // and sets a sample of the time taken to create a series. func (t *partitionTracker) AddSeriesCreated(n uint64, d time.Duration) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.SeriesCreated.With(labels).Add(float64(n)) @@ -1328,48 +1334,80 @@ func (t *partitionTracker) AddSeriesCreated(n uint64, d time.Duration) { // AddSeriesDropped increases the number of series dropped in the partition by n. func (t *partitionTracker) AddSeriesDropped(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.SeriesDropped.With(labels).Add(float64(n)) } // SetSeries sets the number of series in the partition. func (t *partitionTracker) SetSeries(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Series.With(labels).Set(float64(n)) } // AddSeries increases the number of series in the partition by n. func (t *partitionTracker) AddSeries(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Series.With(labels).Add(float64(n)) } // SubSeries decreases the number of series in the partition by n. func (t *partitionTracker) SubSeries(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Series.With(labels).Sub(float64(n)) } // SetMeasurements sets the number of measurements in the partition. func (t *partitionTracker) SetMeasurements(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Measurements.With(labels).Set(float64(n)) } // AddMeasurements increases the number of measurements in the partition by n. func (t *partitionTracker) AddMeasurements(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Measurements.With(labels).Add(float64(n)) } // SubMeasurements decreases the number of measurements in the partition by n. func (t *partitionTracker) SubMeasurements(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.Measurements.With(labels).Sub(float64(n)) } // SetFiles sets the number of files in the partition. func (t *partitionTracker) SetFiles(n uint64, typ string) { + if !t.enabled { + return + } + labels := t.Labels() labels["type"] = typ t.metrics.FilesTotal.With(labels).Set(float64(n)) @@ -1377,12 +1415,20 @@ func (t *partitionTracker) SetFiles(n uint64, typ string) { // SetDiskSize sets the size of files in the partition. func (t *partitionTracker) SetDiskSize(n uint64) { + if !t.enabled { + return + } + labels := t.Labels() t.metrics.DiskSize.With(labels).Set(float64(n)) } // IncActiveCompaction increments the number of active compactions for the provided level. func (t *partitionTracker) IncActiveCompaction(level int) { + if !t.enabled { + return + } + labels := t.Labels() labels["level"] = fmt.Sprint(level) @@ -1391,6 +1437,10 @@ func (t *partitionTracker) IncActiveCompaction(level int) { // DecActiveCompaction decrements the number of active compactions for the provided level. func (t *partitionTracker) DecActiveCompaction(level int) { + if !t.enabled { + return + } + labels := t.Labels() labels["level"] = fmt.Sprint(level) @@ -1399,6 +1449,10 @@ func (t *partitionTracker) DecActiveCompaction(level int) { // CompactionAttempted updates the number of compactions attempted for the provided level. func (t *partitionTracker) CompactionAttempted(level int, success bool, d time.Duration) { + if !t.enabled { + return + } + labels := t.Labels() labels["level"] = fmt.Sprint(level) if success {