Integrate Retention metrics

pull/10616/head
Edd Robinson 2018-10-08 16:49:33 +01:00
parent 116456534a
commit 54d1ac09c1
3 changed files with 128 additions and 19 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/platform/tsdb"
"github.com/influxdata/platform/tsdb/tsi1"
"github.com/influxdata/platform/tsdb/tsm1"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -29,11 +30,12 @@ type Engine struct {
engineID *int // Not used by default.
nodeID *int // Not used by default.
mu sync.RWMutex
open bool
index *tsi1.Index
sfile *tsdb.SeriesFile
engine *tsm1.Engine
mu sync.RWMutex
open bool
index *tsi1.Index
sfile *tsdb.SeriesFile
engine *tsm1.Engine
retentionService *retentionService
logger *zap.Logger
}
@ -65,6 +67,26 @@ var WithNodeID = func(id int) Option {
}
}
// WithRetentionService initialises a retention service on the engine with the
// provided interval. WithRetentionService must be called after other options
// to ensure that all metrics are labelled correctly.
var WithRetentionService = func(finder BucketFinder, interval int64) Option {
return func(e *Engine) {
e.retentionService = newRetentionService(e, finder, interval)
labels := prometheus.Labels(map[string]string{"status": ""})
if e.engineID != nil {
labels["engine_id"] = fmt.Sprint(*e.engineID)
}
if e.nodeID != nil {
labels["node_id"] = fmt.Sprint(*e.nodeID)
}
e.retentionService.defaultMetricLabels = labels
e.retentionService.retentionMetrics = newRetentionMetrics(labels)
}
}
// NewEngine initialises a new storage engine, including a series file, index and
// TSM engine.
func NewEngine(path string, c Config, options ...Option) *Engine {
@ -110,6 +132,10 @@ func (e *Engine) WithLogger(log *zap.Logger) {
e.sfile.WithLogger(e.logger)
e.index.WithLogger(e.logger)
e.engine.WithLogger(e.logger)
if e.retentionService != nil {
e.retentionService.WithLogger(e.logger)
}
}
// Open opens the store and all underlying resources. It returns an error if

65
storage/metrics.go Normal file
View File

@ -0,0 +1,65 @@
package storage
import "github.com/prometheus/client_golang/prometheus"
// namespace is the leading part of all published metrics for the Storage service.
const namespace = "storage"
const retentionSubsystem = "retention" // sub-system associated with metrics for writing points.
// retentionMetrics is a set of metrics concerned with tracking data about retention policies.
type retentionMetrics struct {
Checks *prometheus.CounterVec
CheckDuration *prometheus.HistogramVec
Unprocessable *prometheus.CounterVec
Series *prometheus.CounterVec
}
func newRetentionMetrics(labels prometheus.Labels) *retentionMetrics {
var names []string
for k := range labels {
names = append(names, k)
}
return &retentionMetrics{
Checks: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: retentionSubsystem,
Name: "checks_total",
Help: "Number of retention check operations performed.",
}, names),
CheckDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: retentionSubsystem,
Name: "check_duration_seconds",
Help: "Time taken to perform a successful retention check.",
// 25 buckets spaced exponentially between 10s and ~2h
Buckets: prometheus.ExponentialBuckets(10, 1.32, 25),
}, names),
Unprocessable: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: retentionSubsystem,
Name: "unprocessable_buckets_total",
Help: "Number of buckets that could not be operated on.",
}, names),
Series: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: retentionSubsystem,
Name: "series_total",
Help: "Number of series that a delete was applied to.",
}, names),
}
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (rm *retentionMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
rm.Checks,
rm.CheckDuration,
rm.Unprocessable,
rm.Series,
}
}

View File

@ -62,7 +62,8 @@ type retentionService struct {
logger *zap.Logger
interval time.Duration // Interval that retention service deletes data.
// retentionMetrics *retentionMetrics
retentionMetrics *retentionMetrics
defaultMetricLabels prometheus.Labels // N.B this must not be mutated after Open is called.
mu sync.RWMutex
_closing chan struct{}
@ -79,13 +80,19 @@ func newRetentionService(engine Deleter, bucketService BucketFinder, interval in
BucketService: bucketService,
logger: zap.NewNop(),
interval: time.Duration(interval) * time.Second,
// retentionMetrics: newRetentionMetrics(),
}
return s
}
// metricLabels returns a new copy of the default metric labels.
func (s *retentionService) metricLabels() prometheus.Labels {
labels := make(map[string]string, len(s.defaultMetricLabels))
for k, v := range s.defaultMetricLabels {
labels[k] = v
}
return labels
}
// WithLogger sets the logger l on the service. It must be called before Open.
func (s *retentionService) WithLogger(l *zap.Logger) {
s.logger = l.With(zap.String("service", serviceName))
@ -144,14 +151,16 @@ func (s *retentionService) run() {
}
now := time.Now().UTC()
// status := "ok"
labels := s.metricLabels()
labels["status"] = "ok"
if err := s.expireData(rpByBucketID, now); err != nil {
log.Error("Deletion not successful", zap.Error(err))
// status = "error"
labels["status"] = "error"
}
// s.retentionMetrics.CheckDuration.With(prometheus.Labels{"node_id": fmt.Sprint(s.nodeID), "partition": fmt.Sprint(shardID), "status": status}).Observe(time.Since(now).Seconds())
// s.retentionMetrics.Checks.With(prometheus.Labels{"node_id": fmt.Sprint(s.nodeID), "partition": fmt.Sprint(shardID), "status": status}).Inc()
s.retentionMetrics.CheckDuration.With(labels).Observe(time.Since(now).Seconds())
s.retentionMetrics.Checks.With(labels).Inc()
case <-closingCh:
return
}
@ -209,12 +218,19 @@ func (s *retentionService) expireData(rpByBucketID map[string]time.Duration, now
}
defer func() {
// Track metrics on deletion.
// s.retentionMetrics.Unprocessable.With(prometheus.Labels{"node_id": fmt.Sprint(s.nodeID), "partition": fmt.Sprint(sh.ID()), "status": "bad_measurement"}).Add(float64(len(badMSketch)))
// s.retentionMetrics.Unprocessable.With(prometheus.Labels{"node_id": fmt.Sprint(s.nodeID), "partition": fmt.Sprint(sh.ID()), "status": "missing_bucket"}).Add(float64(len(missingBSketch)))
labels := s.metricLabels()
// s.retentionMetrics.Series.With(prometheus.Labels{"node_id": fmt.Sprint(s.nodeID), "partition": fmt.Sprint(sh.ID()), "status": "ok"}).Add(float64(atomic.LoadUint64(&seriesDeleted)))
// s.retentionMetrics.Series.With(prometheus.Labels{"node_id": fmt.Sprint(s.nodeID), "partition": fmt.Sprint(sh.ID()), "status": "skipped"}).Add(float64(atomic.LoadUint64(&seriesSkipped)))
labels["status"] = "bad_measurement"
s.retentionMetrics.Unprocessable.With(labels).Add(float64(len(badMSketch)))
labels["status"] = "missing_bucket"
s.retentionMetrics.Unprocessable.With(labels).Add(float64(len(missingBSketch)))
labels["status"] = "ok"
s.retentionMetrics.Series.With(labels).Add(float64(atomic.LoadUint64(&seriesDeleted)))
labels["status"] = "skipped"
s.retentionMetrics.Series.With(labels).Add(float64(atomic.LoadUint64(&seriesSkipped)))
}()
return s.Engine.DeleteSeriesRangeWithPredicate(newSeriesIteratorAdapter(cur), fn)
@ -270,8 +286,10 @@ func (s *retentionService) Close() error {
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (s *retentionService) PrometheusCollectors() []prometheus.Collector {
if s.retentionMetrics != nil {
return s.retentionMetrics.PrometheusCollectors()
}
return nil
// return s.retentionMetrics.PrometheusCollectors()
}
// A BucketService is an platform.BucketService that the RetentionService can open,