Add series file metrics

pull/10616/head
Edd Robinson 2018-11-09 10:16:34 +00:00
parent 4e67e37ca6
commit d1fe2bc188
9 changed files with 355 additions and 19 deletions

pkg/rhh/metrics.go (new file, +38)

@ -0,0 +1,38 @@
package rhh
import (
"sort"
"github.com/prometheus/client_golang/prometheus"
)
type rhhMetrics struct {
labels prometheus.Labels
}
// newRHHMetrics initialises prometheus metrics for tracking an RHH hashmap.
func newRHHMetrics(namespace, subsystem string, labels prometheus.Labels) *rhhMetrics {
var names []string
for k := range labels {
names = append(names, k)
}
sort.Strings(names)
return &rhhMetrics{
labels: labels,
}
}
// Labels returns a copy of labels for use with RHH metrics.
func (m *rhhMetrics) Labels() prometheus.Labels {
l := make(map[string]string, len(m.labels))
for k, v := range m.labels {
l[k] = v
}
return l
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (m *rhhMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{}
}

View File

@ -6,6 +6,7 @@ import (
"sort"
"github.com/cespare/xxhash"
"github.com/prometheus/client_golang/prometheus"
)
// HashMap represents a hash map that implements Robin Hood Hashing.
@ -21,12 +22,15 @@ type HashMap struct {
loadFactor int
tmpKey []byte
tracker *rhhTracker
}
func NewHashMap(opt Options) *HashMap {
m := &HashMap{
capacity: pow2(opt.Capacity), // Limited to 2^64.
loadFactor: opt.LoadFactor,
tracker: newRHHTracker(newRHHMetrics("", "", nil)),
}
m.alloc()
return m
@ -203,6 +207,19 @@ func (m *HashMap) Keys() [][]byte {
return a
}
// PrometheusCollectors returns the metrics associated with this hashmap.
func (m *HashMap) PrometheusCollectors() []prometheus.Collector {
return m.tracker.metrics.PrometheusCollectors()
}
type rhhTracker struct {
metrics *rhhMetrics
}
func newRHHTracker(metrics *rhhMetrics) *rhhTracker {
return &rhhTracker{metrics: metrics}
}
type hashElem struct {
key []byte
value interface{}

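As a rough sketch (not part of this commit): a hashmap's collectors could be gathered as below. rhh.DefaultOptions is assumed to exist as in the upstream package, and the returned slice is currently empty because rhhMetrics defines no collectors yet.

m := rhh.NewHashMap(rhh.DefaultOptions)
m.Put([]byte("series-key"), uint64(1)) // insert a value so the map is non-trivial
collectors := m.PrometheusCollectors() // empty for now; future RHH metrics will surface here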
View File

@ -1,10 +1,10 @@
package storage
import (
"fmt"
"bytes"
"context"
"errors"
"fmt"
"sync"
"time"
@ -136,7 +136,8 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
option(e)
}
// Set default metrics labels.
e.engine.WithDefaultMetricLabels(e.defaultMetricLabels)
e.engine.SetDefaultMetricLabels(e.defaultMetricLabels)
e.sfile.SetDefaultMetricLabels(e.defaultMetricLabels)
return e
}
@ -165,7 +166,7 @@ func (e *Engine) WithLogger(log *zap.Logger) {
func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
// TODO(edd): Get prom metrics for index.
// TODO(edd): Get prom metrics for series file.
metrics = append(metrics, e.sfile.PrometheusCollectors()...)
metrics = append(metrics, e.engine.PrometheusCollectors()...)
metrics = append(metrics, e.retentionEnforcer.PrometheusCollectors()...)
return metrics
@ -200,7 +201,7 @@ func (e *Engine) Open() error {
// For now we will just run on an interval as we only have the retention
// policy enforcer.
e.runRetentionEnforcer()
return nil
}

tsdb/metrics.go (new file, +115)

@ -0,0 +1,115 @@
package tsdb
import (
"fmt"
"sort"
"github.com/prometheus/client_golang/prometheus"
)
// namespace is the leading part of all published metrics for the Storage service.
const namespace = "storage"
const seriesFileSubsystem = "series_file" // sub-system associated with metrics for the Series File.
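// Fully-qualified metric names therefore take the form storage_series_file_<name>, e.g. storage_series_file_series_total.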
type seriesFileMetrics struct {
labels prometheus.Labels
SeriesCreated *prometheus.CounterVec // Number of series created in Series File.
Series *prometheus.GaugeVec // Number of series.
DiskSize *prometheus.GaugeVec // Size occupied on disk.
Segments *prometheus.GaugeVec // Number of segment files.
CompactionsActive *prometheus.GaugeVec // Number of active compactions.
CompactionDuration *prometheus.HistogramVec // Duration of compactions.
// The following metrics include a "status" = {ok, error} label
Compactions *prometheus.CounterVec // Total number of compactions.
}
// newSeriesFileMetrics initialises the prometheus metrics for tracking the Series File.
func newSeriesFileMetrics(labels prometheus.Labels) *seriesFileMetrics {
var names []string
for k := range labels {
names = append(names, k)
}
names = append(names, "partition_id") // All metrics have a partition_id label
sort.Strings(names)
// Append to copies of names so the two label slices don't share a backing array.
totalCompactions := append(append([]string(nil), names...), "status")
sort.Strings(totalCompactions)
durationCompaction := append(append([]string(nil), names...), "component")
sort.Strings(durationCompaction)
return &seriesFileMetrics{
labels: labels,
SeriesCreated: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: seriesFileSubsystem,
Name: "series_created",
Help: "Number of series created in Series File.",
}, names),
Series: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: seriesFileSubsystem,
Name: "series_total",
Help: "Number of series in Series File.",
}, names),
DiskSize: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: seriesFileSubsystem,
Name: "disk_bytes",
Help: "Number of bytes Series File is using on disk.",
}, names),
Segments: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: seriesFileSubsystem,
Name: "segments",
Help: "Number of segment files in Series File.",
}, names),
CompactionsActive: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: seriesFileSubsystem,
Name: "index_compactions_active",
Help: "Number of active compactions.",
}, names),
CompactionDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: seriesFileSubsystem,
Name: "index_compactions_duration_seconds",
Help: "Time taken for a successful compaction of index.",
// 30 buckets spaced exponentially between 5s and ~53 minutes.
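// (Upper bound of the final bucket: 5.0 * 1.25^29 ≈ 3,231s ≈ 53.9 minutes.)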
Buckets: prometheus.ExponentialBuckets(5.0, 1.25, 30),
}, durationCompaction),
Compactions: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: seriesFileSubsystem,
Name: "compactions",
Help: "Number of compactions.",
}, totalCompactions),
}
}
// Labels returns a copy of labels for use with Series File metrics.
func (m *seriesFileMetrics) Labels(partition int) prometheus.Labels {
l := make(map[string]string, len(m.labels))
for k, v := range m.labels {
l[k] = v
}
// N.B. all series file metrics include the partition ID, so it's included here.
l["partition_id"] = fmt.Sprint(partition)
return l
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (m *seriesFileMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
m.SeriesCreated,
m.Series,
m.DiskSize,
m.Segments,
m.CompactionsActive,
m.CompactionDuration,
m.Compactions,
}
}
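As an illustrative usage sketch (not part of this commit): once a SeriesFile has been opened, these collectors can be registered with a Prometheus registry. The path and label set below are hypothetical, and NewSeriesFile/Open/Close are assumed to behave as elsewhere in this change.

sfile := tsdb.NewSeriesFile("/tmp/series-file") // hypothetical path
sfile.SetDefaultMetricLabels(prometheus.Labels{"engine_id": "0"}) // assumed default labels
if err := sfile.Open(); err != nil {
panic(err) // sketch only
}
defer sfile.Close()
reg := prometheus.NewRegistry()
reg.MustRegister(sfile.PrometheusCollectors()...) // shared, per-partition series file metrics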

View File

@ -13,6 +13,7 @@ import (
"github.com/cespare/xxhash"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/binaryutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@ -35,6 +36,9 @@ type SeriesFile struct {
path string
partitions []*SeriesPartition
defaultMetricLabels prometheus.Labels
metrics *seriesFileMetrics
refs sync.RWMutex // RWMutex to track references to the SeriesFile that are in use.
Logger *zap.Logger
@ -53,6 +57,12 @@ func (f *SeriesFile) WithLogger(log *zap.Logger) {
f.Logger = log.With(zap.String("service", "series-file"))
}
// SetDefaultMetricLabels sets the default labels for metrics on the Series File.
// It must be called before the SeriesFile is opened.
func (f *SeriesFile) SetDefaultMetricLabels(labels prometheus.Labels) {
f.defaultMetricLabels = labels
}
// Open memory maps the data file at the file's path.
func (f *SeriesFile) Open() error {
// Wait for all references to be released and prevent new ones from being acquired.
@ -64,12 +74,18 @@ func (f *SeriesFile) Open() error {
return err
}
f.metrics = newSeriesFileMetrics(f.defaultMetricLabels) // All partitions must share the same metrics.
// Open partitions.
f.partitions = make([]*SeriesPartition, 0, SeriesFilePartitionN)
for i := 0; i < SeriesFilePartitionN; i++ {
// TODO(edd): This partition initialisation should be moved up to NewSeriesFile.
p := NewSeriesPartition(i, f.SeriesPartitionPath(i))
p.Logger = f.Logger.With(zap.Int("partition", p.ID()))
// Set the metric tracker on the partition with any injected default labels.
p.tracker = newSeriesPartitionTracker(f.metrics, p.ID())
if err := p.Open(); err != nil {
f.Close()
return err
@ -300,6 +316,16 @@ func (f *SeriesFile) SeriesKeyPartition(key []byte) *SeriesPartition {
return f.partitions[partitionID]
}
// PrometheusCollectors returns all the prometheus metrics associated with the series file.
func (f *SeriesFile) PrometheusCollectors() []prometheus.Collector {
collectors := f.metrics.PrometheusCollectors() // Shared per-partition metrics.
for _, p := range f.partitions {
collectors = append(collectors, p.PrometheusCollectors()...)
}
return collectors
}
// AppendSeriesKey serializes name and tags to a byte slice.
// The total length is prepended as a uvarint.
func AppendSeriesKey(dst []byte, name []byte, tags models.Tags) []byte {

View File

@ -144,6 +144,16 @@ func (idx *SeriesIndex) OnDiskCount() uint64 { return idx.count }
// InMemCount returns the number of series in the in-memory index.
func (idx *SeriesIndex) InMemCount() uint64 { return uint64(len(idx.idOffsetMap)) }
// OnDiskSize returns the on-disk size of the index in bytes.
func (idx *SeriesIndex) OnDiskSize() uint64 { return uint64(len(idx.data)) }
// InMemSize returns the heap size of the index in bytes. The returned value is
// an estimation and does not include all allocated memory.
func (idx *SeriesIndex) InMemSize() uint64 {
n := len(idx.idOffsetMap)
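// Rough estimate: ~16 bytes per idOffsetMap entry (8-byte ID key + 8-byte offset value), plus 8 bytes per tombstoned ID.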
return uint64(2*8*n) + uint64(len(idx.tombstones)*8)
}
func (idx *SeriesIndex) Insert(key []byte, id SeriesIDTyped, offset int64) {
idx.execEntry(SeriesEntryInsertFlag, id, offset, key)
}

View File

@ -8,10 +8,12 @@ import (
"os"
"path/filepath"
"sync"
"time"
"github.com/influxdata/platform/logger"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/pkg/rhh"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -44,7 +46,8 @@ type SeriesPartition struct {
CompactThreshold int
Logger *zap.Logger
tracker *seriesPartitionTracker
Logger *zap.Logger
}
// NewSeriesPartition returns a new instance of SeriesPartition.
@ -54,6 +57,7 @@ func NewSeriesPartition(id int, path string) *SeriesPartition {
path: path,
closing: make(chan struct{}),
CompactThreshold: DefaultSeriesPartitionCompactThreshold,
tracker: newSeriesPartitionTracker(newSeriesFileMetrics(nil), id),
Logger: zap.NewNop(),
seq: uint64(id) + 1,
}
@ -75,7 +79,6 @@ func (p *SeriesPartition) Open() error {
if err := p.openSegments(); err != nil {
return err
}
// Init last segment for writes.
if err := p.activeSegment().InitForWrite(); err != nil {
return err
@ -87,13 +90,14 @@ func (p *SeriesPartition) Open() error {
} else if p.index.Recover(p.segments); err != nil {
return err
}
return nil
}(); err != nil {
p.Close()
return err
}
p.tracker.SetSeries(p.index.Count()) // Set series count metric.
p.tracker.SetDiskSize(p.DiskSize()) // Set on-disk size metric.
return nil
}
@ -134,6 +138,7 @@ func (p *SeriesPartition) openSegments() error {
p.segments = append(p.segments, segment)
}
p.tracker.SetSegments(uint64(len(p.segments)))
return nil
}
@ -170,9 +175,17 @@ func (p *SeriesPartition) ID() int { return p.id }
// Path returns the path to the partition.
func (p *SeriesPartition) Path() string { return p.path }
// Path returns the path to the series index.
// IndexPath returns the path to the series index.
func (p *SeriesPartition) IndexPath() string { return filepath.Join(p.path, "index") }
// PrometheusCollectors returns the collectors associated with the partition.
func (p *SeriesPartition) PrometheusCollectors() []prometheus.Collector {
// SeriesFile metrics
p.mu.RLock()
defer p.mu.RUnlock()
return p.index.keyIDMap.PrometheusCollectors() // Metrics for RHH.
}
// CreateSeriesListIfNotExists creates a list of series in bulk if they don't exist.
// The ids parameter is modified to contain series IDs for all keys belonging to this partition.
// If the type does not match the existing type for the key, a zero id is stored.
@ -283,6 +296,8 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollecti
for _, keyRange := range newKeyRanges {
p.index.Insert(p.seriesKeyByOffset(keyRange.offset), keyRange.id, keyRange.offset)
}
p.tracker.AddSeriesCreated(uint64(len(newKeyRanges))) // Track new series in metric.
p.tracker.AddSeries(uint64(len(newKeyRanges)))
// Check if we've crossed the compaction threshold.
if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) {
@ -290,13 +305,18 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollecti
log, logEnd := logger.NewOperation(p.Logger, "Series partition compaction", "series_partition_compaction", zap.String("path", p.path))
p.wg.Add(1)
p.tracker.IncCompactionsActive()
go func() {
defer p.wg.Done()
compactor := NewSeriesPartitionCompactor()
compactor.cancel = p.closing
if err := compactor.Compact(p); err != nil {
duration, err := compactor.Compact(p)
if err != nil {
p.tracker.IncCompactionErr()
log.Error("series partition compaction failed", zap.Error(err))
} else {
p.tracker.IncCompactionOK(duration)
}
logEnd()
@ -305,6 +325,10 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(collection *SeriesCollecti
p.mu.Lock()
p.compacting = false
p.mu.Unlock()
p.tracker.DecCompactionsActive()
// Disk size may have changed due to compaction.
p.tracker.SetDiskSize(p.DiskSize())
}()
}
@ -348,7 +372,7 @@ func (p *SeriesPartition) DeleteSeriesID(id SeriesID) error {
// Mark tombstone in memory.
p.index.Delete(id)
p.tracker.SubSeries(1)
return nil
}
@ -417,6 +441,21 @@ func (p *SeriesPartition) SeriesCount() uint64 {
return n
}
// DiskSize returns the number of bytes taken up on disk by the partition.
func (p *SeriesPartition) DiskSize() uint64 {
p.mu.RLock()
defer p.mu.RUnlock()
return p.diskSize()
}
func (p *SeriesPartition) diskSize() uint64 {
totalSize := p.index.OnDiskSize()
for _, segment := range p.segments {
totalSize += uint64(len(segment.Data()))
}
return totalSize
}
func (p *SeriesPartition) DisableCompactions() {
p.mu.Lock()
defer p.mu.Unlock()
@ -503,7 +542,8 @@ func (p *SeriesPartition) createSegment() (*SeriesSegment, error) {
if err := segment.InitForWrite(); err != nil {
return nil, err
}
p.tracker.SetSegments(uint64(len(p.segments)))
p.tracker.SetDiskSize(p.diskSize()) // Disk size will change with new segment.
return segment, nil
}
@ -525,6 +565,92 @@ func (p *SeriesPartition) seriesKeyByOffset(offset int64) []byte {
return nil
}
type seriesPartitionTracker struct {
metrics *seriesFileMetrics
id int // ID of partition.
}
func newSeriesPartitionTracker(metrics *seriesFileMetrics, partition int) *seriesPartitionTracker {
return &seriesPartitionTracker{
metrics: metrics,
id: partition,
}
}
// AddSeriesCreated increases the number of series created in the partition by n.
func (t *seriesPartitionTracker) AddSeriesCreated(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.SeriesCreated.With(labels).Add(float64(n))
}
// SetSeries sets the number of series in the partition.
func (t *seriesPartitionTracker) SetSeries(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Series.With(labels).Set(float64(n))
}
// AddSeries increases the number of series in the partition by n.
func (t *seriesPartitionTracker) AddSeries(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Series.With(labels).Add(float64(n))
}
// SubSeries decreases the number of series in the partition by n.
func (t *seriesPartitionTracker) SubSeries(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Series.With(labels).Sub(float64(n))
}
// SetDiskSize sets the number of bytes used by files for the partition.
func (t *seriesPartitionTracker) SetDiskSize(sz uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.DiskSize.With(labels).Set(float64(sz))
}
// SetSegments sets the number of segment files for the partition.
func (t *seriesPartitionTracker) SetSegments(n uint64) {
labels := t.metrics.Labels(t.id)
t.metrics.Segments.With(labels).Set(float64(n))
}
// IncCompactionsActive increments the number of active compactions for the
// components of a partition (index and segments).
func (t *seriesPartitionTracker) IncCompactionsActive() {
labels := t.metrics.Labels(t.id)
labels["component"] = "index" // TODO(edd): when we add segment compactions we will add a new label value.
t.metrics.CompactionsActive.With(labels).Inc()
}
// DecCompactionsActive decrements the number of active compactions for the
// components of a partition (index and segments).
func (t *seriesPartitionTracker) DecCompactionsActive() {
labels := t.metrics.Labels(t.id)
labels["component"] = "index" // TODO(edd): when we add segment compactions we will add a new label value.
t.metrics.CompactionsActive.With(labels).Dec()
}
// incCompactions increments the number of compactions for the partition.
// Callers should use IncCompactionOK and IncCompactionErr.
func (t *seriesPartitionTracker) incCompactions(status string, duration time.Duration) {
if duration > 0 {
labels := t.metrics.Labels(t.id)
labels["component"] = "index"
t.metrics.CompactionDuration.With(labels).Observe(duration.Seconds())
}
labels := t.metrics.Labels(t.id)
labels["status"] = status
t.metrics.Compactions.With(labels).Inc()
}
// IncCompactionOK increments the number of successful compactions for the partition.
func (t *seriesPartitionTracker) IncCompactionOK(duration time.Duration) {
t.incCompactions("ok", duration)
}
// IncCompactionErr increments the number of failed compactions for the partition.
func (t *seriesPartitionTracker) IncCompactionErr() { t.incCompactions("error", 0) }
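// For example, a successful index compaction on partition 3 observes
// storage_series_file_index_compactions_duration_seconds{component="index", partition_id="3", ...}
// and increments storage_series_file_compactions{status="ok", partition_id="3", ...}.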
// SeriesPartitionCompactor represents an object that reindexes a series partition and optionally compacts segments.
type SeriesPartitionCompactor struct {
cancel <-chan struct{}
@ -536,7 +662,7 @@ func NewSeriesPartitionCompactor() *SeriesPartitionCompactor {
}
// Compact rebuilds the series partition index.
func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) (time.Duration, error) {
// Snapshot the partitions and index so we can check tombstones and replay at the end under lock.
p.mu.RLock()
segments := CloneSeriesSegments(p.segments)
@ -544,11 +670,14 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
seriesN := p.index.Count()
p.mu.RUnlock()
now := time.Now()
// Compact index to a temporary location.
indexPath := index.path + ".compacting"
if err := c.compactIndexTo(index, seriesN, segments, indexPath); err != nil {
return err
return 0, err
}
duration := time.Since(now)
// Swap compacted index under lock & replay since compaction.
if err := func() error {
@ -570,10 +699,10 @@ func (c *SeriesPartitionCompactor) Compact(p *SeriesPartition) error {
}
return nil
}(); err != nil {
return err
return 0, err
}
return nil
return duration, nil
}
func (c *SeriesPartitionCompactor) compactIndexTo(index *SeriesIndex, seriesN uint64, segments []*SeriesSegment, path string) error {

View File

@ -250,9 +250,9 @@ func (e *Engine) WithCompactionPlanner(planner CompactionPlanner) {
e.CompactionPlan = planner
}
// WithDefaultMetricLabels sets the default labels for metrics on the engine.
// SetDefaultMetricLabels sets the default labels for metrics on the engine.
// It must be called before the Engine is opened.
func (e *Engine) WithDefaultMetricLabels(labels prometheus.Labels) {
func (e *Engine) SetDefaultMetricLabels(labels prometheus.Labels) {
e.defaultMetricLabels = labels
}

View File

@ -53,7 +53,7 @@ type compactionMetrics struct {
CompactionDuration *prometheus.HistogramVec
CompactionQueue *prometheus.GaugeVec
// The following metrics include a "status" = {ok, error, dropped} label
// The following metrics include a "status" = {ok, error} label
Compactions *prometheus.CounterVec
}
@ -312,7 +312,7 @@ func newWALMetrics(labels prometheus.Labels) *walMetrics {
}
}
// Labels returns a copy of labels for use with file metrics.
// Labels returns a copy of labels for use with WAL metrics.
func (m *walMetrics) Labels() prometheus.Labels {
l := make(map[string]string, len(m.labels))
for k, v := range m.labels {