storage: make retention metrics global

When there are multiple engines in the same process, we want the retention
metrics from all of them.
pull/13022/head
Jeff Wendling 2019-03-29 13:27:43 -06:00
parent 83000361c3
commit 3e370b0f99
3 changed files with 109 additions and 37 deletions


@@ -139,9 +139,8 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
 	e.engine.SetDefaultMetricLabels(e.defaultMetricLabels)
 	e.sfile.SetDefaultMetricLabels(e.defaultMetricLabels)
 	e.index.SetDefaultMetricLabels(e.defaultMetricLabels)
-	if e.wal != nil {
-		e.wal.SetDefaultMetricLabels(e.defaultMetricLabels)
-	}
+	e.wal.SetDefaultMetricLabels(e.defaultMetricLabels)
+	e.retentionEnforcer.SetDefaultMetricLabels(e.defaultMetricLabels)

 	return e
 }
@@ -174,7 +173,7 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector {
 	metrics = append(metrics, tsi1.PrometheusCollectors()...)
 	metrics = append(metrics, tsm1.PrometheusCollectors()...)
 	metrics = append(metrics, wal.PrometheusCollectors()...)
-	metrics = append(metrics, e.retentionEnforcer.PrometheusCollectors()...)
+	metrics = append(metrics, RetentionPrometheusCollectors()...)
 	return metrics
 }
@@ -210,7 +209,9 @@ func (e *Engine) Open(ctx context.Context) (err error) {
 	// TODO(edd) background tasks will be run in priority order via a scheduler.
 	// For now we will just run on an interval as we only have the retention
 	// policy enforcer.
-	e.runRetentionEnforcer()
+	if e.retentionEnforcer != nil {
+		e.runRetentionEnforcer()
+	}

 	return nil
 }
@@ -279,11 +280,6 @@ func (e *Engine) runRetentionEnforcer() {
 		return
 	}

-	if e.retentionEnforcer != nil {
-		// Set default metric labels on retention enforcer.
-		e.retentionEnforcer.metrics = newRetentionMetrics(e.defaultMetricLabels)
-	}
-
 	l := e.logger.With(zap.String("component", "retention_enforcer"), logger.DurationLiteral("check_interval", interval))
 	l.Info("Starting")


@@ -2,10 +2,31 @@ package storage

 import (
 	"sort"
+	"sync"

 	"github.com/prometheus/client_golang/prometheus"
 )

+// The following package variables act as singletons, to be shared by all
+// storage.Engine instantiations. This allows multiple Engines to be
+// monitored within the same process.
+var (
+	rms *retentionMetrics
+	mmu sync.RWMutex
+)
+
+// RetentionPrometheusCollectors returns all prometheus metrics for retention.
+func RetentionPrometheusCollectors() []prometheus.Collector {
+	mmu.RLock()
+	defer mmu.RUnlock()
+
+	var collectors []prometheus.Collector
+	if rms != nil {
+		collectors = append(collectors, rms.PrometheusCollectors()...)
+	}
+	return collectors
+}
+
 // namespace is the leading part of all published metrics for the Storage service.
 const namespace = "storage"
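Because rms lives at package scope, the retention collectors exist once per process no matter how many Engines are constructed, and Engine.PrometheusCollectors (first file above) now reaches them through this function. A minimal sketch of registering them directly, assuming the usual github.com/influxdata/influxdb/storage import path and a plain Prometheus registry (this wiring is not part of the diff):

package main

import (
	"github.com/influxdata/influxdb/storage"
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Register the shared retention collectors exactly once per process.
	// Before any engine has called SetDefaultMetricLabels, the returned
	// slice is empty and this is a no-op.
	reg := prometheus.NewRegistry()
	reg.MustRegister(storage.RetentionPrometheusCollectors()...)
}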


@@ -6,7 +6,7 @@ import (
 	"math"
 	"time"

-	platform "github.com/influxdata/influxdb"
+	"github.com/influxdata/influxdb"
 	"github.com/influxdata/influxdb/logger"
 	"github.com/prometheus/client_golang/prometheus"
 	"go.uber.org/zap"
@@ -18,12 +18,12 @@ const (
 // A Deleter implementation is capable of deleting data from a storage engine.
 type Deleter interface {
-	DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error
+	DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64) error
 }

 // A BucketFinder is responsible for providing access to buckets via a filter.
 type BucketFinder interface {
-	FindBuckets(context.Context, platform.BucketFilter, ...platform.FindOptions) ([]*platform.Bucket, int, error)
+	FindBuckets(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
 }

 // ErrServiceClosed is returned when the service is unavailable.
@@ -41,23 +41,37 @@ type retentionEnforcer struct {
 	logger *zap.Logger

-	metrics *retentionMetrics
+	tracker *retentionTracker
 }

 // newRetentionEnforcer returns a new enforcer that ensures expired data is
 // deleted every interval period. Setting interval to 0 is equivalent to
 // disabling the service.
 func newRetentionEnforcer(engine Deleter, bucketService BucketFinder) *retentionEnforcer {
-	s := &retentionEnforcer{
+	return &retentionEnforcer{
 		Engine:        engine,
 		BucketService: bucketService,
 		logger:        zap.NewNop(),
+		tracker:       newRetentionTracker(newRetentionMetrics(nil), nil),
 	}
-	s.metrics = newRetentionMetrics(nil)
-	return s
 }

-// WithLogger sets the logger l on the service. It must be called before Open.
+// SetDefaultMetricLabels sets the default labels for the retention metrics.
+func (s *retentionEnforcer) SetDefaultMetricLabels(defaultLabels prometheus.Labels) {
+	if s == nil {
+		return // Not initialized
+	}
+
+	mmu.Lock()
+	if rms == nil {
+		rms = newRetentionMetrics(defaultLabels)
+	}
+	mmu.Unlock()
+
+	s.tracker = newRetentionTracker(rms, defaultLabels)
+}
+
+// WithLogger sets the logger l on the service. It must be called before any run calls.
 func (s *retentionEnforcer) WithLogger(l *zap.Logger) {
 	if s == nil {
 		return // Not initialised
@@ -68,62 +82,103 @@ func (s *retentionEnforcer) WithLogger(l *zap.Logger) {
 // run periodically expires (deletes) all data that's fallen outside of the
 // retention period for the associated bucket.
 func (s *retentionEnforcer) run() {
 	if s == nil {
 		return // Not initialized
 	}

 	log, logEnd := logger.NewOperation(s.logger, "Data retention check", "data_retention_check")
 	defer logEnd()

+	now := time.Now().UTC()
 	buckets, err := s.getBucketInformation()
 	if err != nil {
 		log.Error("Unable to determine bucket information", zap.Error(err))
-		return
+	} else {
+		s.expireData(buckets, now)
 	}
-
-	now := time.Now().UTC()
-	s.expireData(buckets, now)
-	s.metrics.CheckDuration.With(s.metrics.Labels()).Observe(time.Since(now).Seconds())
+	s.tracker.CheckDuration(time.Since(now), err == nil)
 }
 // expireData runs a delete operation on the storage engine.
 //
 // Any series data that (1) belongs to a bucket in the provided list and
 // (2) falls outside the bucket's indicated retention period will be deleted.
-func (s *retentionEnforcer) expireData(buckets []*platform.Bucket, now time.Time) {
+func (s *retentionEnforcer) expireData(buckets []*influxdb.Bucket, now time.Time) {
 	logger, logEnd := logger.NewOperation(s.logger, "Data deletion", "data_deletion")
 	defer logEnd()

-	labels := s.metrics.Labels()
 	for _, b := range buckets {
 		if b.RetentionPeriod == 0 {
 			continue
 		}

-		labels["status"] = "ok"
-		labels["org_id"] = b.OrganizationID.String()
-		labels["bucket_id"] = b.ID.String()
-
 		max := now.Add(-b.RetentionPeriod).UnixNano()
 		err := s.Engine.DeleteBucketRange(b.OrganizationID, b.ID, math.MinInt64, max)
 		if err != nil {
-			labels["status"] = "error"
 			logger.Info("unable to delete bucket range",
 				zap.String("bucket id", b.ID.String()),
 				zap.String("org id", b.OrganizationID.String()),
 				zap.Error(err))
 		}

-		s.metrics.Checks.With(labels).Inc()
+		s.tracker.IncChecks(b.OrganizationID, b.ID, err == nil)
 	}
 }
 // getBucketInformation returns a slice of buckets to run retention on.
-func (s *retentionEnforcer) getBucketInformation() ([]*platform.Bucket, error) {
+func (s *retentionEnforcer) getBucketInformation() ([]*influxdb.Bucket, error) {
 	ctx, cancel := context.WithTimeout(context.Background(), bucketAPITimeout)
 	defer cancel()

-	buckets, _, err := s.BucketService.FindBuckets(ctx, platform.BucketFilter{})
+	buckets, _, err := s.BucketService.FindBuckets(ctx, influxdb.BucketFilter{})
 	return buckets, err
 }
-// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
-func (s *retentionEnforcer) PrometheusCollectors() []prometheus.Collector {
-	return s.metrics.PrometheusCollectors()
-}
+//
+// metrics tracker
+//
+
+type retentionTracker struct {
+	metrics *retentionMetrics
+	labels  prometheus.Labels
+}
+
+func newRetentionTracker(metrics *retentionMetrics, defaultLabels prometheus.Labels) *retentionTracker {
+	return &retentionTracker{metrics: metrics, labels: defaultLabels}
+}
+
+// Labels returns a copy of labels for use with index cache metrics.
+func (t *retentionTracker) Labels() prometheus.Labels {
+	l := make(map[string]string, len(t.labels))
+	for k, v := range t.labels {
+		l[k] = v
+	}
+	return l
+}
+
+// IncChecks signals that a check happened for some bucket.
+func (t *retentionTracker) IncChecks(orgID, bucketID influxdb.ID, success bool) {
+	labels := t.Labels()
+	labels["org_id"] = orgID.String()
+	labels["bucket_id"] = bucketID.String()
+
+	if success {
+		labels["status"] = "ok"
+	} else {
+		labels["status"] = "error"
+	}
+
+	t.metrics.Checks.With(labels).Inc()
+}
+
+// CheckDuration records the overall duration of a full retention check.
+func (t *retentionTracker) CheckDuration(dur time.Duration, success bool) {
+	labels := t.Labels()
+
+	if success {
+		labels["status"] = "ok"
+	} else {
+		labels["status"] = "error"
+	}
+
+	t.metrics.CheckDuration.With(labels).Observe(dur.Seconds())
+}
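To make the tracker's output concrete: every per-bucket check increments a counter labelled with org_id, bucket_id and a status of "ok" or "error" on top of the engine's default labels, while CheckDuration observes the wall-clock time of the whole sweep with just the status label. The sketch below is hypothetical, test-style code that would have to sit inside package storage (the constructors are unexported), with made-up bucket values and a 1h retention period:

package storage

import (
	"math"
	"time"

	"github.com/influxdata/influxdb"
)

// trackOneBucket is a hypothetical illustration of the bookkeeping run and
// expireData perform for a single bucket whose retention period is one hour.
func trackOneBucket(engine Deleter, orgID, bucketID influxdb.ID) {
	t := newRetentionTracker(newRetentionMetrics(nil), nil)

	now := time.Now().UTC()
	// Everything older than now-1h falls outside the retention period.
	max := now.Add(-time.Hour).UnixNano()
	err := engine.DeleteBucketRange(orgID, bucketID, math.MinInt64, max)

	// Counter labels: org_id, bucket_id and status ("ok" or "error").
	t.IncChecks(orgID, bucketID, err == nil)
	// Duration labels: just status; the value is observed in seconds.
	t.CheckDuration(time.Since(now), err == nil)
}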