Add TSI index metrics

pull/10616/head
Edd Robinson 2018-11-29 14:13:16 +00:00
parent c76626accf
commit 7960ccc320
8 changed files with 505 additions and 18 deletions

View File

@ -19,7 +19,7 @@ type Metrics struct {
// These metrics have an extra label status = {"hit", "miss"}
Gets *prometheus.CounterVec // Number of times item retrieved.
Puts *prometheus.CounterVec // Number of times item retrieved.
Puts *prometheus.CounterVec // Number of times item inserted.
}
// NewMetrics initialises prometheus metrics for tracking an RHH hashmap.

View File

@ -138,6 +138,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
// Set default metrics labels.
e.engine.SetDefaultMetricLabels(e.defaultMetricLabels)
e.sfile.SetDefaultMetricLabels(e.defaultMetricLabels)
e.index.SetDefaultMetricLabels(e.defaultMetricLabels)
return e
}
@ -165,8 +166,8 @@ func (e *Engine) WithLogger(log *zap.Logger) {
// the engine and its components.
func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
// TODO(edd): Get prom metrics for index.
metrics = append(metrics, e.sfile.PrometheusCollectors()...)
metrics = append(metrics, e.index.PrometheusCollectors()...)
metrics = append(metrics, e.engine.PrometheusCollectors()...)
metrics = append(metrics, e.retentionEnforcer.PrometheusCollectors()...)
return metrics

View File

@ -70,7 +70,7 @@ func newSeriesFileMetrics(labels prometheus.Labels) *seriesFileMetrics {
Subsystem: seriesFileSubsystem,
Name: "index_compactions_active",
Help: "Number of active index compactions.",
}, names),
}, durationCompaction),
CompactionDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: seriesFileSubsystem,

View File

@ -5,13 +5,14 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/influxdata/platform/logger"
"github.com/influxdata/platform/pkg/rhh"
"os"
"path/filepath"
"sort"
"sync"
"github.com/influxdata/platform/logger"
"github.com/influxdata/platform/pkg/rhh"
"github.com/cespare/xxhash"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/binaryutil"
@ -67,7 +68,10 @@ func (f *SeriesFile) WithLogger(log *zap.Logger) {
// SetDefaultMetricLabels sets the default labels for metrics on the Series File.
// It must be called before the SeriesFile is opened.
func (f *SeriesFile) SetDefaultMetricLabels(labels prometheus.Labels) {
f.defaultMetricLabels = labels
f.defaultMetricLabels = make(prometheus.Labels, len(labels))
for k, v := range labels {
f.defaultMetricLabels[k] = v
}
f.defaultMetricLabels["partition_id"] = "" // All metrics have partition_id as a label.
}

View File

@ -5,6 +5,7 @@ import (
"sync"
"github.com/influxdata/platform/tsdb"
"github.com/prometheus/client_golang/prometheus"
)
// TagValueSeriesIDCache is an LRU cache for series id sets associated with
@ -24,6 +25,7 @@ type TagValueSeriesIDCache struct {
cache map[string]map[string]map[string]*list.Element
evictor *list.List
tracker *cacheTracker
capacity int
}
@ -32,6 +34,7 @@ func NewTagValueSeriesIDCache(c int) *TagValueSeriesIDCache {
return &TagValueSeriesIDCache{
cache: map[string]map[string]map[string]*list.Element{},
evictor: list.New(),
tracker: newCacheTracker(newCacheMetrics(nil)),
capacity: c,
}
}
@ -48,11 +51,13 @@ func (c *TagValueSeriesIDCache) get(name, key, value []byte) *tsdb.SeriesIDSet {
if mmap, ok := c.cache[string(name)]; ok {
if tkmap, ok := mmap[string(key)]; ok {
if ele, ok := tkmap[string(value)]; ok {
c.tracker.IncGetHit()
c.evictor.MoveToFront(ele) // This now becomes most recently used.
return ele.Value.(*seriesIDCacheElement).SeriesIDSet
}
}
}
c.tracker.IncGetMiss()
return nil
}
@ -100,6 +105,7 @@ func (c *TagValueSeriesIDCache) Put(name, key, value []byte, ss *tsdb.SeriesIDSe
// Check under the write lock if the relevant item is now in the cache.
if c.exists(name, key, value) {
c.Unlock()
c.tracker.IncPutHit()
return
}
defer c.Unlock()
@ -136,6 +142,7 @@ func (c *TagValueSeriesIDCache) Put(name, key, value []byte, ss *tsdb.SeriesIDSe
EVICT:
c.checkEviction()
c.tracker.IncPutMiss()
}
// Delete removes x from the tuple {name, key, value} if it exists.
@ -153,16 +160,21 @@ func (c *TagValueSeriesIDCache) delete(name, key, value []byte, x tsdb.SeriesID)
if ele, ok := tkmap[string(value)]; ok {
if ss := ele.Value.(*seriesIDCacheElement).SeriesIDSet; ss != nil {
ele.Value.(*seriesIDCacheElement).SeriesIDSet.Remove(x)
c.tracker.IncDeletesHit()
return
}
}
}
}
c.tracker.IncDeletesMiss()
}
// checkEviction checks if the cache is too big, and evicts the least recently used
// item if it is.
func (c *TagValueSeriesIDCache) checkEviction() {
if c.evictor.Len() <= c.capacity {
l := c.evictor.Len()
c.tracker.SetSize(uint64(l))
if l <= c.capacity {
return
}
@ -184,6 +196,13 @@ func (c *TagValueSeriesIDCache) checkEviction() {
if len(c.cache[string(name)]) == 0 {
delete(c.cache, string(name))
}
c.tracker.IncEvictions()
}
func (c *TagValueSeriesIDCache) PrometheusCollectors() []prometheus.Collector {
var collectors []prometheus.Collector
collectors = append(collectors, c.tracker.metrics.PrometheusCollectors()...)
return collectors
}
// seriesIDCacheElement is an item stored within a cache.
@ -193,3 +212,48 @@ type seriesIDCacheElement struct {
value []byte
SeriesIDSet *tsdb.SeriesIDSet
}
type cacheTracker struct {
metrics *cacheMetrics
}
func newCacheTracker(metrics *cacheMetrics) *cacheTracker {
return &cacheTracker{metrics: metrics}
}
func (t *cacheTracker) SetSize(sz uint64) {
labels := t.metrics.Labels()
t.metrics.Size.With(labels).Set(float64(sz))
}
func (t *cacheTracker) incGet(status string) {
labels := t.metrics.Labels()
labels["status"] = status
t.metrics.Gets.With(labels).Inc()
}
func (t *cacheTracker) IncGetHit() { t.incGet("hit") }
func (t *cacheTracker) IncGetMiss() { t.incGet("miss") }
func (t *cacheTracker) incPut(status string) {
labels := t.metrics.Labels()
labels["status"] = status
t.metrics.Puts.With(labels).Inc()
}
func (t *cacheTracker) IncPutHit() { t.incPut("hit") }
func (t *cacheTracker) IncPutMiss() { t.incPut("miss") }
func (t *cacheTracker) incDeletes(status string) {
labels := t.metrics.Labels()
labels["status"] = status
t.metrics.Deletes.With(labels).Inc()
}
func (t *cacheTracker) IncDeletesHit() { t.incDeletes("hit") }
func (t *cacheTracker) IncDeletesMiss() { t.incDeletes("miss") }
func (t *cacheTracker) IncEvictions() {
labels := t.metrics.Labels()
t.metrics.Evictions.With(labels).Inc()
}

View File

@ -13,6 +13,8 @@ import (
"sync/atomic"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
"bytes"
"sort"
@ -109,7 +111,10 @@ type Index struct {
partitions []*Partition
opened bool
defaultLabels prometheus.Labels
tagValueCache *TagValueSeriesIDCache
partitionMetrics *partitionMetrics // Maintain a single set of partition metrics to be shared by partition.
// The following may be set when initializing an Index.
path string // Root directory of the index partitions.
@ -137,6 +142,7 @@ func (i *Index) UniqueReferenceID() uintptr {
func NewIndex(sfile *tsdb.SeriesFile, c Config, options ...IndexOption) *Index {
idx := &Index{
tagValueCache: NewTagValueSeriesIDCache(DefaultSeriesIDSetCacheSize),
partitionMetrics: newPartitionMetrics(nil),
maxLogFileSize: int64(c.MaxIndexLogFileSize),
logger: zap.NewNop(),
version: Version,
@ -151,6 +157,16 @@ func NewIndex(sfile *tsdb.SeriesFile, c Config, options ...IndexOption) *Index {
return idx
}
// SetDefaultMetricLabels sets the default labels on the trackers.
func (i *Index) SetDefaultMetricLabels(labels prometheus.Labels) {
i.defaultLabels = make(prometheus.Labels, len(labels))
for k, v := range labels {
i.defaultLabels[k] = v
}
i.tagValueCache.tracker = newCacheTracker(newCacheMetrics(labels))
i.partitionMetrics = newPartitionMetrics(labels)
}
// Bytes estimates the memory footprint of this Index, in bytes.
func (i *Index) Bytes() int {
var b int
@ -218,6 +234,7 @@ func (i *Index) Open() error {
p.nosync = i.disableFsync
p.logbufferSize = i.logfileBufferSize
p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1)))
p.tracker = newPartitionTracker(i.partitionMetrics, j)
i.partitions[j] = p
}
@ -1517,6 +1534,14 @@ func (i *Index) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byte,
return tsdb.DifferenceSeriesIDIterators(mitr, tsdb.MergeSeriesIDIterators(itrs...)), nil
}
// PrometheusCollectors returns all of the metrics for the index.
func (i *Index) PrometheusCollectors() []prometheus.Collector {
var collectors []prometheus.Collector
collectors = append(collectors, i.tagValueCache.PrometheusCollectors()...)
collectors = append(collectors, i.partitionMetrics.PrometheusCollectors()...)
return collectors
}
// IsIndexDir returns true if directory contains at least one partition directory.
func IsIndexDir(path string) (bool, error) {
fis, err := ioutil.ReadDir(path)

229
tsdb/tsi1/metrics.go Normal file
View File

@ -0,0 +1,229 @@
package tsi1
import (
"fmt"
"sort"
"github.com/prometheus/client_golang/prometheus"
)
// namespace is the leading part of all published metrics for the Storage service.
const namespace = "storage"
const cacheSubsystem = "tsi_cache" // sub-system associated with TSI index cache.
const partitionSubsystem = "tsi_index" // sub-system associated with the TSI index.
type cacheMetrics struct {
labels prometheus.Labels
Size *prometheus.GaugeVec // Size of the cache.
// These metrics have an extra label status = {"hit", "miss"}
Gets *prometheus.CounterVec // Number of times item retrieved.
Puts *prometheus.CounterVec // Number of times item inserted.
Deletes *prometheus.CounterVec // Number of times item deleted.
Evictions *prometheus.CounterVec // Number of times item deleted.
}
// newCacheMetrics initialises the prometheus metrics for tracking the Series File.
func newCacheMetrics(labels prometheus.Labels) *cacheMetrics {
var names []string
for k := range labels {
names = append(names, k)
}
sort.Strings(names)
statusNames := append(names, "status")
sort.Strings(statusNames)
return &cacheMetrics{
labels: labels,
Size: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: cacheSubsystem,
Name: "size",
Help: "Number of items residing in the cache.",
}, names),
Gets: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: cacheSubsystem,
Name: "get_total",
Help: "Total number of gets on cache.",
}, statusNames),
Puts: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: cacheSubsystem,
Name: "put_total",
Help: "Total number of insertions in cache.",
}, statusNames),
Deletes: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: cacheSubsystem,
Name: "deletes_total",
Help: "Total number of deletions in cache.",
}, statusNames),
Evictions: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: cacheSubsystem,
Name: "evictions_total",
Help: "Total number of cache evictions.",
}, names),
}
}
// Labels returns a copy of labels for use with RHH metrics.
func (m *cacheMetrics) Labels() prometheus.Labels {
l := make(map[string]string, len(m.labels))
for k, v := range m.labels {
l[k] = v
}
return l
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (m *cacheMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
m.Size,
m.Gets,
m.Puts,
m.Deletes,
m.Evictions,
}
}
type partitionMetrics struct {
labels prometheus.Labels
SeriesCreated *prometheus.CounterVec // Number of series created in Series File.
SeriesCreatedDuration *prometheus.HistogramVec // Distribution of time to insert series.
SeriesDropped *prometheus.CounterVec // Number of series removed from index.
Series *prometheus.GaugeVec // Number of series.
Measurements *prometheus.GaugeVec // Number of measurements.
DiskSize *prometheus.GaugeVec // Size occupied on disk.
// This metrics has a "type" = {index, log}
FilesTotal *prometheus.GaugeVec // files on disk.
// This metric has a "level" metric.
CompactionsActive *prometheus.GaugeVec // Number of active compactions.
// These metrics have a "level" metric.
// The following metrics include a "status" = {ok, error}` label
CompactionDuration *prometheus.HistogramVec // Duration of compactions.
Compactions *prometheus.CounterVec // Total number of compactions.
}
// newPartitionMetrics initialises the prometheus metrics for tracking the TSI partitions.
func newPartitionMetrics(labels prometheus.Labels) *partitionMetrics {
names := []string{"partition_id"} // All metrics have a partition
for k := range labels {
names = append(names, k)
}
sort.Strings(names)
// type = {"index", "log"}
fileNames := append(names, "type")
sort.Strings(fileNames)
// level = [0, 7]
compactionNames := append(names, "level")
sort.Strings(compactionNames)
// status = {"ok", "error"}
attemptedCompactionNames := append(compactionNames, "status")
sort.Strings(attemptedCompactionNames)
return &partitionMetrics{
labels: labels,
SeriesCreated: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "series_created",
Help: "Number of series created in the partition.",
}, names),
SeriesCreatedDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "series_created_duration_ns",
Help: "Time taken in nanosecond to create single series.",
// 30 buckets spaced exponentially between 100ns and ~19 us.
Buckets: prometheus.ExponentialBuckets(100.0, 1.2, 30),
}, names),
SeriesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "series_dropped",
Help: "Number of series dropped from the partition.",
}, names),
Series: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "series_total",
Help: "Number of series in the partition.",
}, names),
Measurements: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "measurements_total",
Help: "Number of series in the partition.",
}, names),
FilesTotal: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "files_total",
Help: "Number of files in the partition.",
}, fileNames),
DiskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "disk_bytes",
Help: "Number of bytes TSI partition is using on disk.",
}, names),
CompactionsActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "compactions_active",
Help: "Number of active partition compactions.",
}, compactionNames),
CompactionDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "compactions_duration_seconds",
Help: "Time taken for a successful compaction of partition.",
// 30 buckets spaced exponentially between 1s and ~10 minutes.
Buckets: prometheus.ExponentialBuckets(1.0, 1.25, 30),
}, compactionNames),
Compactions: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: partitionSubsystem,
Name: "compactions",
Help: "Number of compactions.",
}, attemptedCompactionNames),
}
}
// Labels returns a copy of labels for use with TSI partition metrics.
func (m *partitionMetrics) Labels(partition int) prometheus.Labels {
l := make(map[string]string, len(m.labels))
for k, v := range m.labels {
l[k] = v
}
// N.B all series file metrics include the partition. So it's included here.
l["partition_id"] = fmt.Sprint(partition)
return l
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (m *partitionMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
m.SeriesCreated,
m.SeriesCreatedDuration,
m.SeriesDropped,
m.Series,
m.Measurements,
m.FilesTotal,
m.DiskSize,
m.CompactionsActive,
m.CompactionDuration,
m.Compactions,
}
}

View File

@ -54,6 +54,8 @@ type Partition struct {
// Measurement stats
stats MeasurementCardinalityStats
tracker *partitionTracker
// Fast series lookup of series IDs in the series file that have been present
// in this partition. This set tracks both insertions and deletions of a series.
seriesIDSet *tsdb.SeriesIDSet
@ -92,7 +94,7 @@ type Partition struct {
// NewPartition returns a new instance of Partition.
func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
return &Partition{
partition := &Partition{
closing: make(chan struct{}),
path: path,
sfile: sfile,
@ -106,6 +108,11 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
logger: zap.NewNop(),
version: Version,
}
base := filepath.Base(path)
id, _ := strconv.Atoi(base) // Ignore error because we will re-check during Open.
partition.tracker = newPartitionTracker(newPartitionMetrics(nil), id)
return partition
}
// bytes estimates the memory footprint of this Partition, in bytes.
@ -244,6 +251,10 @@ func (p *Partition) Open() error {
if err := p.buildSeriesSet(); err != nil {
return err
}
p.tracker.SetSeries(p.seriesIDSet.Cardinality())
p.tracker.SetFiles(uint64(len(p.fileSet.IndexFiles())), "index")
p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log")
p.tracker.SetDiskSize(uint64(p.fileSet.Size()))
// Mark opened.
p.opened = true
@ -472,6 +483,11 @@ func (p *Partition) prependActiveLogFile() error {
if err := p.writeStatsFile(); err != nil {
return err
}
// Set the file metrics again.
p.tracker.SetFiles(uint64(len(p.fileSet.IndexFiles())), "index")
p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log")
p.tracker.SetDiskSize(uint64(p.fileSet.Size()))
return nil
}
@ -663,6 +679,7 @@ func (p *Partition) createSeriesListIfNotExists(collection *tsdb.SeriesCollectio
defer fs.Release()
// Ensure fileset cannot change during insert.
now := time.Now()
p.mu.RLock()
// Insert series into log file.
ids, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, collection)
@ -675,9 +692,26 @@ func (p *Partition) createSeriesListIfNotExists(collection *tsdb.SeriesCollectio
if err := p.CheckLogFile(); err != nil {
return nil, err
}
// NOTE(edd): if this becomes expensive then we can move the count into the
// log file.
var totalNew uint64
for _, id := range ids {
if !id.IsZero() {
totalNew++
}
}
if totalNew > 0 {
p.tracker.AddSeriesCreated(totalNew, time.Since(now))
p.tracker.AddSeries(totalNew)
p.tracker.SetDiskSize(uint64(p.fileSet.Size()))
}
return ids, nil
}
// DropSeries removes the provided series id from the index.
//
// TODO(edd): We should support a bulk drop here.
func (p *Partition) DropSeries(seriesID tsdb.SeriesID) error {
// Ignore if the series is already deleted.
if !p.seriesIDSet.Contains(seriesID) {
@ -691,6 +725,8 @@ func (p *Partition) DropSeries(seriesID tsdb.SeriesID) error {
// Update series set.
p.seriesIDSet.Remove(seriesID)
p.tracker.AddSeriesDropped(1)
p.tracker.SubSeries(1)
// Swap log file, if necessary.
return p.CheckLogFile()
@ -924,6 +960,21 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
assert(len(files) >= 2, "at least two index files are required for compaction")
assert(level > 0, "cannot compact level zero")
var err error
var start time.Time
p.tracker.IncActiveCompaction(level)
// Set the relevant metrics at the end of any compaction.
defer func() {
p.tracker.SetFiles(uint64(len(p.fileSet.IndexFiles())), "index")
p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log")
p.tracker.SetDiskSize(uint64(p.fileSet.Size()))
p.tracker.DecActiveCompaction(level)
success := err == nil
p.tracker.CompactionAttempted(level, success, time.Since(start))
}()
// Build a logger for this compaction.
log, logEnd := logger.NewOperation(p.logger, "TSI level compaction", "tsi1_compact_to_level", zap.Int("tsi1_level", level))
defer logEnd()
@ -942,12 +993,12 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
defer once.Do(func() { IndexFiles(files).Release() })
// Track time to compact.
start := time.Now()
start = time.Now()
// Create new index file.
path := filepath.Join(p.path, FormatIndexFileName(p.NextSequence(), level))
f, err := os.Create(path)
if err != nil {
var f *os.File
if f, err = os.Create(path);err != nil {
log.Error("Cannot create compaction files", zap.Error(err))
return
}
@ -960,14 +1011,14 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
// Compact all index files to new index file.
lvl := p.levels[level]
n, err := IndexFiles(files).CompactTo(f, p.sfile, lvl.M, lvl.K, interrupt)
if err != nil {
var n int64
if n, err = IndexFiles(files).CompactTo(f, p.sfile, lvl.M, lvl.K, interrupt); err != nil {
log.Error("Cannot compact index files", zap.Error(err))
return
}
// Close file.
if err := f.Close(); err != nil {
if err = f.Close(); err != nil {
log.Error("Error closing index file", zap.Error(err))
return
}
@ -975,13 +1026,13 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
// Reopen as an index file.
file := NewIndexFile(p.sfile)
file.SetPath(path)
if err := file.Open(); err != nil {
if err = file.Open(); err != nil {
log.Error("Cannot open new index file", zap.Error(err))
return
}
// Obtain lock to swap in index file and write manifest.
if err := func() error {
if err = func() error {
p.mu.Lock()
defer p.mu.Unlock()
@ -1021,10 +1072,10 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
for _, f := range files {
log.Info("Removing index file", zap.String("path", f.Path()))
if err := f.Close(); err != nil {
if err = f.Close(); err != nil {
log.Error("Cannot close index file", zap.Error(err))
return
} else if err := os.Remove(f.Path()); err != nil {
} else if err = os.Remove(f.Path()); err != nil {
log.Error("Cannot remove index file", zap.Error(err))
return
}
@ -1228,6 +1279,119 @@ func (p *Partition) MeasurementCardinalityStats() MeasurementCardinalityStats {
return stats
}
type partitionTracker struct {
metrics *partitionMetrics
id int // ID of partition.
}
func newPartitionTracker(metrics *partitionMetrics, partition int) *partitionTracker {
return &partitionTracker{
metrics: metrics,
id: partition,
}
}
// AddSeriesCreated increases the number of series created in the partition by n
// and sets a sample of the time taken to create a series.
func (t *partitionTracker) AddSeriesCreated(n uint64, d time.Duration) {
labels := t.metrics.Labels(t.id)
t.metrics.SeriesCreated.With(labels).Add(float64(n))
if n == 0 {
return // Nothing to record
}
perseries := d.Seconds() / float64(n)
t.metrics.SeriesCreatedDuration.With(labels).Observe(perseries)
}
// AddSeriesDropped increases the number of series dropped in the partition by n.
func (t *partitionTracker) AddSeriesDropped(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.SeriesDropped.With(labels).Add(float64(n))
}
// SetSeries sets the number of series in the partition.
func (t *partitionTracker) SetSeries(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Series.With(labels).Set(float64(n))
}
// AddSeries increases the number of series in the partition by n.
func (t *partitionTracker) AddSeries(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Series.With(labels).Add(float64(n))
}
// SubSeries decreases the number of series in the partition by n.
func (t *partitionTracker) SubSeries(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Series.With(labels).Sub(float64(n))
}
// SetMeasurements sets the number of measurements in the partition.
func (t *partitionTracker) SetMeasurements(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Measurements.With(labels).Set(float64(n))
}
// AddMeasurements increases the number of measurements in the partition by n.
func (t *partitionTracker) AddMeasurements(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Measurements.With(labels).Add(float64(n))
}
// SubMeasurements decreases the number of measurements in the partition by n.
func (t *partitionTracker) SubMeasurements(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Measurements.With(labels).Sub(float64(n))
}
// SetFiles sets the number of files in the partition.
func (t *partitionTracker) SetFiles(n uint64, typ string) {
labels := t.metrics.Labels(t.id)
labels["type"] = typ
t.metrics.FilesTotal.With(labels).Set(float64(n))
}
// SetDiskSize sets the size of files in the partition.
func (t *partitionTracker) SetDiskSize(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.DiskSize.With(labels).Set(float64(n))
}
// IncActiveCompaction increments the number of active compactions for the provided level.
func (t *partitionTracker) IncActiveCompaction(level int) {
labels := t.metrics.Labels(t.id)
labels["level"] = fmt.Sprint(level)
t.metrics.CompactionsActive.With(labels).Inc()
}
// DecActiveCompaction decrements the number of active compactions for the provided level.
func (t *partitionTracker) DecActiveCompaction(level int) {
labels := t.metrics.Labels(t.id)
labels["level"] = fmt.Sprint(level)
t.metrics.CompactionsActive.With(labels).Dec()
}
// CompactionAttempted updates the number of compactions attempted for the provided level.
func (t *partitionTracker) CompactionAttempted(level int, success bool, d time.Duration) {
labels := t.metrics.Labels(t.id)
labels["level"] = fmt.Sprint(level)
if success {
t.metrics.CompactionDuration.With(labels).Observe(d.Seconds())
labels["status"] = "ok"
t.metrics.Compactions.With(labels).Inc()
return
}
labels["status"] = "error"
t.metrics.Compactions.With(labels).Inc()
}
// unionStringSets returns the union of two sets
func unionStringSets(a, b map[string]struct{}) map[string]struct{} {
other := make(map[string]struct{})