diff --git a/pkg/rhh/metrics.go b/pkg/rhh/metrics.go index 3d7f8840d0..e314339d8d 100644 --- a/pkg/rhh/metrics.go +++ b/pkg/rhh/metrics.go @@ -19,7 +19,7 @@ type Metrics struct { // These metrics have an extra label status = {"hit", "miss"} Gets *prometheus.CounterVec // Number of times item retrieved. - Puts *prometheus.CounterVec // Number of times item retrieved. + Puts *prometheus.CounterVec // Number of times item inserted. } // NewMetrics initialises prometheus metrics for tracking an RHH hashmap. diff --git a/storage/engine.go b/storage/engine.go index 51eaf88dce..dcc3e0659f 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -138,6 +138,7 @@ func NewEngine(path string, c Config, options ...Option) *Engine { // Set default metrics labels. e.engine.SetDefaultMetricLabels(e.defaultMetricLabels) e.sfile.SetDefaultMetricLabels(e.defaultMetricLabels) + e.index.SetDefaultMetricLabels(e.defaultMetricLabels) return e } @@ -165,8 +166,8 @@ func (e *Engine) WithLogger(log *zap.Logger) { // the engine and its components. func (e *Engine) PrometheusCollectors() []prometheus.Collector { var metrics []prometheus.Collector - // TODO(edd): Get prom metrics for index. metrics = append(metrics, e.sfile.PrometheusCollectors()...) + metrics = append(metrics, e.index.PrometheusCollectors()...) metrics = append(metrics, e.engine.PrometheusCollectors()...) metrics = append(metrics, e.retentionEnforcer.PrometheusCollectors()...) return metrics diff --git a/tsdb/metrics.go b/tsdb/metrics.go index 845893c1fc..dbaa2f6218 100644 --- a/tsdb/metrics.go +++ b/tsdb/metrics.go @@ -70,7 +70,7 @@ func newSeriesFileMetrics(labels prometheus.Labels) *seriesFileMetrics { Subsystem: seriesFileSubsystem, Name: "index_compactions_active", Help: "Number of active index compactions.", - }, names), + }, durationCompaction), CompactionDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: seriesFileSubsystem, diff --git a/tsdb/series_file.go b/tsdb/series_file.go index 722e135d8a..95f75a26dc 100644 --- a/tsdb/series_file.go +++ b/tsdb/series_file.go @@ -5,13 +5,14 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/influxdata/platform/logger" - "github.com/influxdata/platform/pkg/rhh" "os" "path/filepath" "sort" "sync" + "github.com/influxdata/platform/logger" + "github.com/influxdata/platform/pkg/rhh" + "github.com/cespare/xxhash" "github.com/influxdata/platform/models" "github.com/influxdata/platform/pkg/binaryutil" @@ -67,7 +68,10 @@ func (f *SeriesFile) WithLogger(log *zap.Logger) { // SetDefaultMetricLabels sets the default labels for metrics on the Series File. // It must be called before the SeriesFile is opened. func (f *SeriesFile) SetDefaultMetricLabels(labels prometheus.Labels) { - f.defaultMetricLabels = labels + f.defaultMetricLabels = make(prometheus.Labels, len(labels)) + for k, v := range labels { + f.defaultMetricLabels[k] = v + } f.defaultMetricLabels["partition_id"] = "" // All metrics have partition_id as a label. 
} diff --git a/tsdb/tsi1/cache.go b/tsdb/tsi1/cache.go index 5a2cb88401..1ee7616f82 100644 --- a/tsdb/tsi1/cache.go +++ b/tsdb/tsi1/cache.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/influxdata/platform/tsdb" + "github.com/prometheus/client_golang/prometheus" ) // TagValueSeriesIDCache is an LRU cache for series id sets associated with @@ -24,6 +25,7 @@ type TagValueSeriesIDCache struct { cache map[string]map[string]map[string]*list.Element evictor *list.List + tracker *cacheTracker capacity int } @@ -32,6 +34,7 @@ func NewTagValueSeriesIDCache(c int) *TagValueSeriesIDCache { return &TagValueSeriesIDCache{ cache: map[string]map[string]map[string]*list.Element{}, evictor: list.New(), + tracker: newCacheTracker(newCacheMetrics(nil)), capacity: c, } } @@ -48,11 +51,13 @@ func (c *TagValueSeriesIDCache) get(name, key, value []byte) *tsdb.SeriesIDSet { if mmap, ok := c.cache[string(name)]; ok { if tkmap, ok := mmap[string(key)]; ok { if ele, ok := tkmap[string(value)]; ok { + c.tracker.IncGetHit() c.evictor.MoveToFront(ele) // This now becomes most recently used. return ele.Value.(*seriesIDCacheElement).SeriesIDSet } } } + c.tracker.IncGetMiss() return nil } @@ -100,6 +105,7 @@ func (c *TagValueSeriesIDCache) Put(name, key, value []byte, ss *tsdb.SeriesIDSe // Check under the write lock if the relevant item is now in the cache. if c.exists(name, key, value) { c.Unlock() + c.tracker.IncPutHit() return } defer c.Unlock() @@ -136,6 +142,7 @@ func (c *TagValueSeriesIDCache) Put(name, key, value []byte, ss *tsdb.SeriesIDSe EVICT: c.checkEviction() + c.tracker.IncPutMiss() } // Delete removes x from the tuple {name, key, value} if it exists. @@ -153,16 +160,21 @@ func (c *TagValueSeriesIDCache) delete(name, key, value []byte, x tsdb.SeriesID) if ele, ok := tkmap[string(value)]; ok { if ss := ele.Value.(*seriesIDCacheElement).SeriesIDSet; ss != nil { ele.Value.(*seriesIDCacheElement).SeriesIDSet.Remove(x) + c.tracker.IncDeletesHit() + return } } } } + c.tracker.IncDeletesMiss() } // checkEviction checks if the cache is too big, and evicts the least recently used // item if it is. func (c *TagValueSeriesIDCache) checkEviction() { - if c.evictor.Len() <= c.capacity { + l := c.evictor.Len() + c.tracker.SetSize(uint64(l)) + if l <= c.capacity { return } @@ -184,6 +196,13 @@ func (c *TagValueSeriesIDCache) checkEviction() { if len(c.cache[string(name)]) == 0 { delete(c.cache, string(name)) } + c.tracker.IncEvictions() +} + +func (c *TagValueSeriesIDCache) PrometheusCollectors() []prometheus.Collector { + var collectors []prometheus.Collector + collectors = append(collectors, c.tracker.metrics.PrometheusCollectors()...) + return collectors } // seriesIDCacheElement is an item stored within a cache. 
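A rough sketch (separate from the patch itself) of how the new cache collectors might be exercised end to end, assuming the exported NewTagValueSeriesIDCache, Put and PrometheusCollectors shown above and a tsdb.NewSeriesIDSet constructor; the registry wiring and byte values here are illustrative only:

```go
package main

import (
	"fmt"

	"github.com/influxdata/platform/tsdb"
	"github.com/influxdata/platform/tsdb/tsi1"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	cache := tsi1.NewTagValueSeriesIDCache(100)

	// Expose the cache metrics on a dedicated registry.
	reg := prometheus.NewRegistry()
	reg.MustRegister(cache.PrometheusCollectors()...)

	// The first Put for a tuple counts as a "miss" (it gets inserted);
	// the second finds the entry already present and counts as a "hit".
	ss := tsdb.NewSeriesIDSet()
	cache.Put([]byte("cpu"), []byte("host"), []byte("server-a"), ss)
	cache.Put([]byte("cpu"), []byte("host"), []byte("server-a"), ss)

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName())
	}
}
```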
@@ -193,3 +212,48 @@ type seriesIDCacheElement struct { value []byte SeriesIDSet *tsdb.SeriesIDSet } + +type cacheTracker struct { + metrics *cacheMetrics +} + +func newCacheTracker(metrics *cacheMetrics) *cacheTracker { + return &cacheTracker{metrics: metrics} +} + +func (t *cacheTracker) SetSize(sz uint64) { + labels := t.metrics.Labels() + t.metrics.Size.With(labels).Set(float64(sz)) +} + +func (t *cacheTracker) incGet(status string) { + labels := t.metrics.Labels() + labels["status"] = status + t.metrics.Gets.With(labels).Inc() +} + +func (t *cacheTracker) IncGetHit() { t.incGet("hit") } +func (t *cacheTracker) IncGetMiss() { t.incGet("miss") } + +func (t *cacheTracker) incPut(status string) { + labels := t.metrics.Labels() + labels["status"] = status + t.metrics.Puts.With(labels).Inc() +} + +func (t *cacheTracker) IncPutHit() { t.incPut("hit") } +func (t *cacheTracker) IncPutMiss() { t.incPut("miss") } + +func (t *cacheTracker) incDeletes(status string) { + labels := t.metrics.Labels() + labels["status"] = status + t.metrics.Deletes.With(labels).Inc() +} + +func (t *cacheTracker) IncDeletesHit() { t.incDeletes("hit") } +func (t *cacheTracker) IncDeletesMiss() { t.incDeletes("miss") } + +func (t *cacheTracker) IncEvictions() { + labels := t.metrics.Labels() + t.metrics.Evictions.With(labels).Inc() +} diff --git a/tsdb/tsi1/index.go b/tsdb/tsi1/index.go index 7411e04c31..3f71b329a7 100644 --- a/tsdb/tsi1/index.go +++ b/tsdb/tsi1/index.go @@ -13,6 +13,8 @@ import ( "sync/atomic" "unsafe" + "github.com/prometheus/client_golang/prometheus" + "bytes" "sort" @@ -109,7 +111,10 @@ type Index struct { partitions []*Partition opened bool + defaultLabels prometheus.Labels + tagValueCache *TagValueSeriesIDCache + partitionMetrics *partitionMetrics // Maintain a single set of partition metrics to be shared by partition. // The following may be set when initializing an Index. path string // Root directory of the index partitions. @@ -137,6 +142,7 @@ func (i *Index) UniqueReferenceID() uintptr { func NewIndex(sfile *tsdb.SeriesFile, c Config, options ...IndexOption) *Index { idx := &Index{ tagValueCache: NewTagValueSeriesIDCache(DefaultSeriesIDSetCacheSize), + partitionMetrics: newPartitionMetrics(nil), maxLogFileSize: int64(c.MaxIndexLogFileSize), logger: zap.NewNop(), version: Version, @@ -151,6 +157,16 @@ func NewIndex(sfile *tsdb.SeriesFile, c Config, options ...IndexOption) *Index { return idx } +// SetDefaultMetricLabels sets the default labels on the trackers. +func (i *Index) SetDefaultMetricLabels(labels prometheus.Labels) { + i.defaultLabels = make(prometheus.Labels, len(labels)) + for k, v := range labels { + i.defaultLabels[k] = v + } + i.tagValueCache.tracker = newCacheTracker(newCacheMetrics(labels)) + i.partitionMetrics = newPartitionMetrics(labels) +} + // Bytes estimates the memory footprint of this Index, in bytes. func (i *Index) Bytes() int { var b int @@ -218,6 +234,7 @@ func (i *Index) Open() error { p.nosync = i.disableFsync p.logbufferSize = i.logfileBufferSize p.logger = i.logger.With(zap.String("tsi1_partition", fmt.Sprint(j+1))) + p.tracker = newPartitionTracker(i.partitionMetrics, j) i.partitions[j] = p } @@ -1517,6 +1534,14 @@ func (i *Index) matchTagValueNotEqualNotEmptySeriesIDIterator(name, key []byte, return tsdb.DifferenceSeriesIDIterators(mitr, tsdb.MergeSeriesIDIterators(itrs...)), nil } +// PrometheusCollectors returns all of the metrics for the index. 
+func (i *Index) PrometheusCollectors() []prometheus.Collector {
+	var collectors []prometheus.Collector
+	collectors = append(collectors, i.tagValueCache.PrometheusCollectors()...)
+	collectors = append(collectors, i.partitionMetrics.PrometheusCollectors()...)
+	return collectors
+}
+
 // IsIndexDir returns true if directory contains at least one partition directory.
 func IsIndexDir(path string) (bool, error) {
 	fis, err := ioutil.ReadDir(path)
diff --git a/tsdb/tsi1/metrics.go b/tsdb/tsi1/metrics.go
new file mode 100644
index 0000000000..f81aa6b59a
--- /dev/null
+++ b/tsdb/tsi1/metrics.go
@@ -0,0 +1,229 @@
+package tsi1
+
+import (
+	"fmt"
+	"sort"
+
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// namespace is the leading part of all published metrics for the Storage service.
+const namespace = "storage"
+
+const cacheSubsystem = "tsi_cache"     // sub-system associated with the TSI index cache.
+const partitionSubsystem = "tsi_index" // sub-system associated with the TSI index.
+
+type cacheMetrics struct {
+	labels prometheus.Labels
+	Size   *prometheus.GaugeVec // Size of the cache.
+
+	// These metrics have an extra label status = {"hit", "miss"}
+	Gets      *prometheus.CounterVec // Number of times item retrieved.
+	Puts      *prometheus.CounterVec // Number of times item inserted.
+	Deletes   *prometheus.CounterVec // Number of times item deleted.
+	Evictions *prometheus.CounterVec // Number of items evicted from the cache.
+}
+
+// newCacheMetrics initialises the prometheus metrics for tracking the TSI tag value cache.
+func newCacheMetrics(labels prometheus.Labels) *cacheMetrics {
+	var names []string
+	for k := range labels {
+		names = append(names, k)
+	}
+	sort.Strings(names)
+
+	statusNames := append(names, "status")
+	sort.Strings(statusNames)
+
+	return &cacheMetrics{
+		labels: labels,
+		Size: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: cacheSubsystem,
+			Name:      "size",
+			Help:      "Number of items residing in the cache.",
+		}, names),
+		Gets: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: cacheSubsystem,
+			Name:      "get_total",
+			Help:      "Total number of gets on cache.",
+		}, statusNames),
+		Puts: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: cacheSubsystem,
+			Name:      "put_total",
+			Help:      "Total number of insertions in cache.",
+		}, statusNames),
+		Deletes: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: cacheSubsystem,
+			Name:      "deletes_total",
+			Help:      "Total number of deletions in cache.",
+		}, statusNames),
+		Evictions: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: cacheSubsystem,
+			Name:      "evictions_total",
+			Help:      "Total number of cache evictions.",
+		}, names),
+	}
+}
+
+// Labels returns a copy of labels for use with cache metrics.
+func (m *cacheMetrics) Labels() prometheus.Labels {
+	l := make(map[string]string, len(m.labels))
+	for k, v := range m.labels {
+		l[k] = v
+	}
+	return l
+}
+
+// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
+func (m *cacheMetrics) PrometheusCollectors() []prometheus.Collector {
+	return []prometheus.Collector{
+		m.Size,
+		m.Gets,
+		m.Puts,
+		m.Deletes,
+		m.Evictions,
+	}
+}
+
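Why Labels() hands back a copy rather than the shared map: the trackers add call-specific keys such as "status" (and, for partitions, "level" and "type"), so returning the shared m.labels map would let concurrent callers race on it and leak keys between metrics. A minimal sketch of the copy-then-augment pattern, using placeholder label names rather than the real defaults:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

// baseLabels stands in for cacheMetrics.labels (placeholder label names).
var baseLabels = prometheus.Labels{"node_id": "0"}

// copyLabels mirrors the Labels() helpers: copy first so callers can add
// call-specific keys such as "status" without mutating the shared map.
func copyLabels() prometheus.Labels {
	l := make(prometheus.Labels, len(baseLabels))
	for k, v := range baseLabels {
		l[k] = v
	}
	return l
}

func incGet(gets *prometheus.CounterVec, status string) {
	labels := copyLabels()
	labels["status"] = status // Only this call sees the extra key.
	gets.With(labels).Inc()
}

func main() {
	gets := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "storage",
		Subsystem: "tsi_cache",
		Name:      "get_total",
		Help:      "Total number of gets on cache.",
	}, []string{"node_id", "status"})

	incGet(gets, "hit")
	incGet(gets, "miss")
}
```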
+type partitionMetrics struct {
+	labels                prometheus.Labels
+	SeriesCreated         *prometheus.CounterVec   // Number of series created in the partition.
+	SeriesCreatedDuration *prometheus.HistogramVec // Distribution of time to insert series.
+	SeriesDropped         *prometheus.CounterVec   // Number of series removed from index.
+	Series                *prometheus.GaugeVec     // Number of series.
+	Measurements          *prometheus.GaugeVec     // Number of measurements.
+	DiskSize              *prometheus.GaugeVec     // Size occupied on disk.
+
+	// This metric has an extra label type = {"index", "log"}
+	FilesTotal *prometheus.GaugeVec // Number of files on disk.
+
+	// This metric has an extra "level" label.
+	CompactionsActive *prometheus.GaugeVec // Number of active compactions.
+
+	// These metrics have an extra "level" label.
+	// They also include a "status" = {ok, error} label.
+	CompactionDuration *prometheus.HistogramVec // Duration of compactions.
+	Compactions        *prometheus.CounterVec   // Total number of compactions.
+}
+
+// newPartitionMetrics initialises the prometheus metrics for tracking the TSI partitions.
+func newPartitionMetrics(labels prometheus.Labels) *partitionMetrics {
+	names := []string{"partition_id"} // All metrics have a partition_id label.
+	for k := range labels {
+		names = append(names, k)
+	}
+	sort.Strings(names)
+
+	// type = {"index", "log"}
+	fileNames := append(names, "type")
+	sort.Strings(fileNames)
+
+	// level = [0, 7]
+	compactionNames := append(names, "level")
+	sort.Strings(compactionNames)
+
+	// status = {"ok", "error"}
+	attemptedCompactionNames := append(compactionNames, "status")
+	sort.Strings(attemptedCompactionNames)
+
+	return &partitionMetrics{
+		labels: labels,
+		SeriesCreated: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "series_created",
+			Help:      "Number of series created in the partition.",
+		}, names),
+		SeriesCreatedDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "series_created_duration_ns",
+			Help:      "Time taken in nanoseconds to create a single series.",
+			// 30 buckets spaced exponentially between 100ns and ~20µs.
+			Buckets: prometheus.ExponentialBuckets(100.0, 1.2, 30),
+		}, names),
+		SeriesDropped: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "series_dropped",
+			Help:      "Number of series dropped from the partition.",
+		}, names),
+		Series: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "series_total",
+			Help:      "Number of series in the partition.",
+		}, names),
+		Measurements: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "measurements_total",
+			Help:      "Number of measurements in the partition.",
+		}, names),
+		FilesTotal: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "files_total",
+			Help:      "Number of files in the partition.",
+		}, fileNames),
+		DiskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "disk_bytes",
+			Help:      "Number of bytes TSI partition is using on disk.",
+		}, names),
+		CompactionsActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "compactions_active",
+			Help:      "Number of active partition compactions.",
+		}, compactionNames),
+		CompactionDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "compactions_duration_seconds",
+			Help:      "Time taken for a successful compaction of partition.",
+			// 30 buckets spaced exponentially between 1s and ~10 minutes.
+			Buckets: prometheus.ExponentialBuckets(1.0, 1.25, 30),
+		}, compactionNames),
+		Compactions: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Namespace: namespace,
+			Subsystem: partitionSubsystem,
+			Name:      "compactions",
+			Help:      "Number of compactions.",
+		}, attemptedCompactionNames),
+	}
+}
+
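For reference on the bucket comments above: prometheus.ExponentialBuckets(start, factor, count) yields edges start * factor^i for i = 0..count-1, so the top edge is start * factor^(count-1) — roughly 19.8µs for the series-creation histogram and roughly 646s (a little under 11 minutes) for the compaction histogram. A small sketch that prints the edges to check this:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Matches the SeriesCreatedDuration buckets: 100ns * 1.2^29 ≈ 19,800ns.
	ns := prometheus.ExponentialBuckets(100.0, 1.2, 30)
	fmt.Printf("first=%.0fns last=%.0fns\n", ns[0], ns[len(ns)-1])

	// Matches the CompactionDuration buckets: 1s * 1.25^29 ≈ 646s.
	secs := prometheus.ExponentialBuckets(1.0, 1.25, 30)
	fmt.Printf("first=%.0fs last=%.0fs\n", secs[0], secs[len(secs)-1])
}
```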
+// Labels returns a copy of labels for use with TSI partition metrics.
+func (m *partitionMetrics) Labels(partition int) prometheus.Labels {
+	l := make(map[string]string, len(m.labels))
+	for k, v := range m.labels {
+		l[k] = v
+	}
+
+	// N.B. all TSI partition metrics include the partition id, so it's added here.
+	l["partition_id"] = fmt.Sprint(partition)
+	return l
+}
+
+// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
+func (m *partitionMetrics) PrometheusCollectors() []prometheus.Collector {
+	return []prometheus.Collector{
+		m.SeriesCreated,
+		m.SeriesCreatedDuration,
+		m.SeriesDropped,
+		m.Series,
+		m.Measurements,
+		m.FilesTotal,
+		m.DiskSize,
+		m.CompactionsActive,
+		m.CompactionDuration,
+		m.Compactions,
+	}
+}
diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go
index fbfe1b7e8e..84d8eb08b2 100644
--- a/tsdb/tsi1/partition.go
+++ b/tsdb/tsi1/partition.go
@@ -54,6 +54,8 @@ type Partition struct {
 	// Measurement stats
 	stats MeasurementCardinalityStats
 
+	tracker *partitionTracker
+
 	// Fast series lookup of series IDs in the series file that have been present
 	// in this partition. This set tracks both insertions and deletions of a series.
 	seriesIDSet *tsdb.SeriesIDSet
@@ -92,7 +94,7 @@ type Partition struct {
 
 // NewPartition returns a new instance of Partition.
 func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
-	return &Partition{
+	partition := &Partition{
 		closing: make(chan struct{}),
 		path:    path,
 		sfile:   sfile,
@@ -106,6 +108,11 @@ func NewPartition(sfile *tsdb.SeriesFile, path string) *Partition {
 		logger:  zap.NewNop(),
 		version: Version,
 	}
+
+	base := filepath.Base(path)
+	id, _ := strconv.Atoi(base) // Ignore error because we will re-check during Open.
+	partition.tracker = newPartitionTracker(newPartitionMetrics(nil), id)
+	return partition
 }
 
 // bytes estimates the memory footprint of this Partition, in bytes.
@@ -244,6 +251,10 @@ func (p *Partition) Open() error {
 	if err := p.buildSeriesSet(); err != nil {
 		return err
 	}
+	p.tracker.SetSeries(p.seriesIDSet.Cardinality())
+	p.tracker.SetFiles(uint64(len(p.fileSet.IndexFiles())), "index")
+	p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log")
+	p.tracker.SetDiskSize(uint64(p.fileSet.Size()))
 
 	// Mark opened.
 	p.opened = true
@@ -472,6 +483,11 @@ func (p *Partition) prependActiveLogFile() error {
 	if err := p.writeStatsFile(); err != nil {
 		return err
 	}
+
+	// Set the file metrics again.
+	p.tracker.SetFiles(uint64(len(p.fileSet.IndexFiles())), "index")
+	p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log")
+	p.tracker.SetDiskSize(uint64(p.fileSet.Size()))
 	return nil
 }
 
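NewPartition above infers the partition id from the final element of the partition path and deliberately ignores the Atoi error, since Open re-validates the directory later. Roughly, with hypothetical paths:

```go
package main

import (
	"fmt"
	"path/filepath"
	"strconv"
)

func main() {
	for _, path := range []string{"/data/index/0", "/data/index/7", "/data/index/tmp"} {
		// Like NewPartition: a non-numeric name simply yields id 0 here,
		// and the real validation happens when the partition is opened.
		id, _ := strconv.Atoi(filepath.Base(path))
		fmt.Println(path, "->", id)
	}
}
```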
@@ -663,6 +679,7 @@ func (p *Partition) createSeriesListIfNotExists(collection *tsdb.SeriesCollectio
 	defer fs.Release()
 
 	// Ensure fileset cannot change during insert.
+	now := time.Now()
 	p.mu.RLock()
 	// Insert series into log file.
 	ids, err := p.activeLogFile.AddSeriesList(p.seriesIDSet, collection)
@@ -675,9 +692,26 @@ func (p *Partition) createSeriesListIfNotExists(collection *tsdb.SeriesCollectio
 	if err := p.CheckLogFile(); err != nil {
 		return nil, err
 	}
+
+	// NOTE(edd): if this becomes expensive then we can move the count into the
+	// log file.
+	var totalNew uint64
+	for _, id := range ids {
+		if !id.IsZero() {
+			totalNew++
+		}
+	}
+	if totalNew > 0 {
+		p.tracker.AddSeriesCreated(totalNew, time.Since(now))
+		p.tracker.AddSeries(totalNew)
+		p.tracker.SetDiskSize(uint64(p.fileSet.Size()))
+	}
 	return ids, nil
 }
 
+// DropSeries removes the provided series id from the index.
+//
+// TODO(edd): We should support a bulk drop here.
 func (p *Partition) DropSeries(seriesID tsdb.SeriesID) error {
 	// Ignore if the series is already deleted.
 	if !p.seriesIDSet.Contains(seriesID) {
@@ -691,6 +725,8 @@ func (p *Partition) DropSeries(seriesID tsdb.SeriesID) error {
 
 	// Update series set.
 	p.seriesIDSet.Remove(seriesID)
+	p.tracker.AddSeriesDropped(1)
+	p.tracker.SubSeries(1)
 
 	// Swap log file, if necessary.
 	return p.CheckLogFile()
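A note on the err rewiring in compactToLevel below: the shadowed `err :=` declarations become plain assignments so that the deferred closure can observe whichever error caused an early return and label the compaction counter accordingly. A self-contained sketch of that shape (names here are illustrative, not from the patch):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// doCompaction mimics the structure used by compactToLevel: a single named err,
// and a deferred closure that records duration plus an ok/error status.
func doCompaction() {
	var err error
	start := time.Now()
	defer func() {
		status := "ok"
		if err != nil {
			status = "error"
		}
		fmt.Printf("compaction status=%s duration=%s\n", status, time.Since(start))
	}()

	// Each step assigns to err (no :=), so the defer above observes it.
	if err = step(); err != nil {
		return
	}
}

func step() error { return errors.New("boom") }

func main() { doCompaction() }
```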
@@ -923,7 +959,22 @@ func (p *Partition) compact() {
 func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-chan struct{}) {
 	assert(len(files) >= 2, "at least two index files are required for compaction")
 	assert(level > 0, "cannot compact level zero")
+
+	var err error
+	var start time.Time
+	p.tracker.IncActiveCompaction(level)
+	// Set the relevant metrics at the end of any compaction.
+	defer func() {
+		p.tracker.SetFiles(uint64(len(p.fileSet.IndexFiles())), "index")
+		p.tracker.SetFiles(uint64(len(p.fileSet.LogFiles())), "log")
+		p.tracker.SetDiskSize(uint64(p.fileSet.Size()))
+		p.tracker.DecActiveCompaction(level)
+
+		success := err == nil
+		p.tracker.CompactionAttempted(level, success, time.Since(start))
+	}()
+
 	// Build a logger for this compaction.
 	log, logEnd := logger.NewOperation(p.logger, "TSI level compaction", "tsi1_compact_to_level", zap.Int("tsi1_level", level))
 	defer logEnd()
@@ -942,12 +993,12 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
 	defer once.Do(func() { IndexFiles(files).Release() })
 
 	// Track time to compact.
-	start := time.Now()
+	start = time.Now()
 
 	// Create new index file.
 	path := filepath.Join(p.path, FormatIndexFileName(p.NextSequence(), level))
-	f, err := os.Create(path)
-	if err != nil {
+	var f *os.File
+	if f, err = os.Create(path); err != nil {
 		log.Error("Cannot create compaction files", zap.Error(err))
 		return
 	}
@@ -960,14 +1011,14 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
 
 	// Compact all index files to new index file.
 	lvl := p.levels[level]
-	n, err := IndexFiles(files).CompactTo(f, p.sfile, lvl.M, lvl.K, interrupt)
-	if err != nil {
+	var n int64
+	if n, err = IndexFiles(files).CompactTo(f, p.sfile, lvl.M, lvl.K, interrupt); err != nil {
 		log.Error("Cannot compact index files", zap.Error(err))
 		return
 	}
 
 	// Close file.
-	if err := f.Close(); err != nil {
+	if err = f.Close(); err != nil {
 		log.Error("Error closing index file", zap.Error(err))
 		return
 	}
@@ -975,13 +1026,13 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
 	// Reopen as an index file.
 	file := NewIndexFile(p.sfile)
 	file.SetPath(path)
-	if err = file.Open(); err != nil {
+	if err = file.Open(); err != nil {
 		log.Error("Cannot open new index file", zap.Error(err))
 		return
 	}
 
 	// Obtain lock to swap in index file and write manifest.
-	if err := func() error {
+	if err = func() error {
 		p.mu.Lock()
 		defer p.mu.Unlock()
@@ -1021,10 +1072,10 @@ func (p *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch
 
 	for _, f := range files {
 		log.Info("Removing index file", zap.String("path", f.Path()))
-		if err := f.Close(); err != nil {
+		if err = f.Close(); err != nil {
 			log.Error("Cannot close index file", zap.Error(err))
 			return
-		} else if err := os.Remove(f.Path()); err != nil {
+		} else if err = os.Remove(f.Path()); err != nil {
 			log.Error("Cannot remove index file", zap.Error(err))
 			return
 		}
@@ -1228,6 +1279,119 @@ func (p *Partition) MeasurementCardinalityStats() MeasurementCardinalityStats {
 	return stats
 }
 
+type partitionTracker struct {
+	metrics *partitionMetrics
+	id      int // ID of partition.
+}
+
+func newPartitionTracker(metrics *partitionMetrics, partition int) *partitionTracker {
+	return &partitionTracker{
+		metrics: metrics,
+		id:      partition,
+	}
+}
+
+// AddSeriesCreated increases the number of series created in the partition by n
+// and records a sample of the per-series creation time.
+func (t *partitionTracker) AddSeriesCreated(n uint64, d time.Duration) {
+	labels := t.metrics.Labels(t.id)
+	t.metrics.SeriesCreated.With(labels).Add(float64(n))
+
+	if n == 0 {
+		return // Nothing to record.
+	}
+
+	// Observe in nanoseconds to match the _ns histogram buckets.
+	perseries := float64(d.Nanoseconds()) / float64(n)
+	t.metrics.SeriesCreatedDuration.With(labels).Observe(perseries)
+}
+
+// AddSeriesDropped increases the number of series dropped in the partition by n.
+func (t *partitionTracker) AddSeriesDropped(n uint64) {
+	labels := t.metrics.Labels(t.id)
+	t.metrics.SeriesDropped.With(labels).Add(float64(n))
+}
+
+// SetSeries sets the number of series in the partition.
+func (t *partitionTracker) SetSeries(n uint64) {
+	labels := t.metrics.Labels(t.id)
+	t.metrics.Series.With(labels).Set(float64(n))
+}
+
+// AddSeries increases the number of series in the partition by n.
+func (t *partitionTracker) AddSeries(n uint64) {
+	labels := t.metrics.Labels(t.id)
+	t.metrics.Series.With(labels).Add(float64(n))
+}
+
+// SubSeries decreases the number of series in the partition by n.
+func (t *partitionTracker) SubSeries(n uint64) {
+	labels := t.metrics.Labels(t.id)
+	t.metrics.Series.With(labels).Sub(float64(n))
+}
+
+// SetMeasurements sets the number of measurements in the partition.
+func (t *partitionTracker) SetMeasurements(n uint64) {
+	labels := t.metrics.Labels(t.id)
+	t.metrics.Measurements.With(labels).Set(float64(n))
+}
+
+// AddMeasurements increases the number of measurements in the partition by n.
+func (t *partitionTracker) AddMeasurements(n uint64) {
+	labels := t.metrics.Labels(t.id)
+	t.metrics.Measurements.With(labels).Add(float64(n))
+}
+
+// SubMeasurements decreases the number of measurements in the partition by n.
+func (t *partitionTracker) SubMeasurements(n uint64) {
+	labels := t.metrics.Labels(t.id)
+	t.metrics.Measurements.With(labels).Sub(float64(n))
+}
+
+// SetFiles sets the number of files of the given type in the partition.
+func (t *partitionTracker) SetFiles(n uint64, typ string) {
+	labels := t.metrics.Labels(t.id)
+	labels["type"] = typ
+	t.metrics.FilesTotal.With(labels).Set(float64(n))
+}
+
+// SetDiskSize sets the number of bytes the partition occupies on disk.
+func (t *partitionTracker) SetDiskSize(n uint64) {
+	labels := t.metrics.Labels(t.id)
+	t.metrics.DiskSize.With(labels).Set(float64(n))
+}
+
+// IncActiveCompaction increments the number of active compactions for the provided level.
+func (t *partitionTracker) IncActiveCompaction(level int) { + labels := t.metrics.Labels(t.id) + labels["level"] = fmt.Sprint(level) + + t.metrics.CompactionsActive.With(labels).Inc() +} + +// DecActiveCompaction decrements the number of active compactions for the provided level. +func (t *partitionTracker) DecActiveCompaction(level int) { + labels := t.metrics.Labels(t.id) + labels["level"] = fmt.Sprint(level) + + t.metrics.CompactionsActive.With(labels).Dec() +} + +// CompactionAttempted updates the number of compactions attempted for the provided level. +func (t *partitionTracker) CompactionAttempted(level int, success bool, d time.Duration) { + labels := t.metrics.Labels(t.id) + labels["level"] = fmt.Sprint(level) + if success { + t.metrics.CompactionDuration.With(labels).Observe(d.Seconds()) + + labels["status"] = "ok" + t.metrics.Compactions.With(labels).Inc() + return + } + + labels["status"] = "error" + t.metrics.Compactions.With(labels).Inc() +} + // unionStringSets returns the union of two sets func unionStringSets(a, b map[string]struct{}) map[string]struct{} { other := make(map[string]struct{})