influxdb/storage/retention.go

130 lines
3.9 KiB
Go

package storage
import (
"context"
"errors"
"math"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/logger"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
// bucketAPITimeout is the deadline placed on the context used when the
// enforcer asks the bucket service for buckets.
const bucketAPITimeout time.Duration = 10 * time.Second
// Deleter is implemented by a storage engine that can remove a time range of
// data belonging to one bucket of one organisation.
type Deleter interface {
	// DeleteBucketRange removes the bucket's data whose timestamps lie between
	// minTime and maxTime. Bound inclusivity and timestamp units are defined
	// by the implementation.
	DeleteBucketRange(orgID, bucketID platform.ID, minTime, maxTime int64) error
}
// BucketFinder looks up the buckets matching a filter. Besides the buckets it
// returns an integer count and an error.
type BucketFinder interface {
	FindBuckets(ctx context.Context, filter platform.BucketFilter, opts ...platform.FindOptions) ([]*platform.Bucket, int, error)
}
var (
	// ErrServiceClosed signals that the service cannot be used because it is
	// currently closed.
	ErrServiceClosed = errors.New("service is currently closed")
)
// retentionEnforcer removes data that has fallen outside the retention period
// of the bucket it belongs to. Each call to run performs one enforcement pass;
// scheduling repeated passes is left to the caller.
type retentionEnforcer struct {
	// Engine is the storage engine that expired data is deleted from.
	Engine Deleter
	// BucketService is queried for the buckets (and their retention periods)
	// that enforcement applies to.
	BucketService BucketFinder

	logger  *zap.Logger       // operational logging; a no-op logger unless WithLogger is used
	metrics *retentionMetrics // check counters and durations, exported via PrometheusCollectors
}
// newRetentionEnforcer returns an enforcer that deletes expired data from
// engine, using bucketService to discover buckets and their retention periods.
//
// The enforcer has no interval of its own: each enforcement pass happens when
// run is called, so periodic scheduling is the caller's responsibility. It
// logs to a no-op logger until WithLogger is called.
func newRetentionEnforcer(engine Deleter, bucketService BucketFinder) *retentionEnforcer {
	return &retentionEnforcer{
		Engine:        engine,
		BucketService: bucketService,
		logger:        zap.NewNop(),
		// NOTE(review): nil is passed through unchanged from the original;
		// presumably it means "no extra metric labels" — confirm in
		// newRetentionMetrics.
		metrics: newRetentionMetrics(nil),
	}
}
// WithLogger sets the logger used by the enforcer. Every entry it writes is
// tagged with component=retention_enforcer.
//
// Calling it on a nil enforcer does nothing. A nil l is replaced with a no-op
// logger, because zap's (*Logger).With dereferences its receiver and would
// otherwise panic. The assignment is not synchronised, so call this before the
// enforcer starts running.
// NOTE(review): the previous doc said "must be called before Open"; no Open
// method is visible in this file — confirm the intended call ordering.
func (s *retentionEnforcer) WithLogger(l *zap.Logger) {
	if s == nil {
		return // not initialised
	}
	if l == nil {
		l = zap.NewNop()
	}
	s.logger = l.With(zap.String("component", "retention_enforcer"))
}
// run performs a single retention pass. It fetches buckets from the bucket
// service and deletes, in each bucket with a retention period, all data that
// has fallen outside that period. It then records how long the deletion phase
// took in the CheckDuration metric.
//
// If buckets cannot be fetched, the error is logged and the pass is abandoned
// without recording a duration.
func (s *retentionEnforcer) run() {
	log, logEnd := logger.NewOperation(s.logger, "Data retention check", "data_retention_check")
	defer logEnd()

	buckets, err := s.getBucketInformation()
	if err != nil {
		log.Error("Unable to determine bucket information", zap.Error(err))
		return
	}

	// start keeps its monotonic clock reading so time.Since measures true
	// elapsed time. Time.UTC strips that reading, so timing from the UTC value
	// would make the metric follow wall-clock jumps (e.g. NTP steps).
	start := time.Now()
	now := start.UTC() // reference instant for retention cutoffs
	s.expireData(buckets, now)
	s.metrics.CheckDuration.With(s.metrics.Labels()).Observe(time.Since(start).Seconds())
}
// expireData deletes, from the storage engine, the data in each of the given
// buckets that is older than that bucket's retention period measured back
// from now. Everything from math.MinInt64 up to the cutoff (in Unix
// nanoseconds) is passed to DeleteBucketRange.
//
// Buckets whose retention period is zero are skipped (no expiry). Negative
// periods are skipped too: they would place the cutoff in the future and
// delete every point in the bucket.
//
// For every bucket processed, the Checks counter is incremented with org_id,
// bucket_id and status ("ok" or "error") labels. A failed deletion is logged
// and does not stop the remaining buckets from being processed.
func (s *retentionEnforcer) expireData(buckets []*platform.Bucket, now time.Time) {
	// Named log (not logger) so the imported logger package is not shadowed.
	log, logEnd := logger.NewOperation(s.logger, "Data deletion", "data_deletion")
	defer logEnd()

	// The three per-bucket keys are overwritten on every iteration, so one
	// label map is reused for all buckets.
	labels := s.metrics.Labels()
	for _, b := range buckets {
		if b.RetentionPeriod <= 0 {
			continue
		}

		labels["status"] = "ok"
		labels["org_id"] = b.OrganizationID.String()
		labels["bucket_id"] = b.ID.String()

		cutoff := now.Add(-b.RetentionPeriod).UnixNano()
		if err := s.Engine.DeleteBucketRange(b.OrganizationID, b.ID, math.MinInt64, cutoff); err != nil {
			labels["status"] = "error"
			log.Info("unable to delete bucket range",
				zap.String("bucket id", b.ID.String()),
				zap.String("org id", b.OrganizationID.String()),
				zap.Error(err))
		}
		s.metrics.Checks.With(labels).Inc()
	}
}
// getBucketInformation queries the bucket service with an empty filter for
// the buckets that retention should be applied to. The query runs under a
// context that expires after bucketAPITimeout. The count returned by
// FindBuckets is discarded; the buckets and error are returned as-is.
func (s *retentionEnforcer) getBucketInformation() ([]*platform.Bucket, error) {
	ctx, cancel := context.WithTimeout(context.Background(), bucketAPITimeout)
	defer cancel()

	var filter platform.BucketFilter
	found, _, err := s.BucketService.FindBuckets(ctx, filter)
	return found, err
}
// PrometheusCollectors implements the prom.PrometheusCollector interface by
// returning the collectors of the enforcer's retention metrics.
func (s *retentionEnforcer) PrometheusCollectors() []prometheus.Collector {
	m := s.metrics
	return m.PrometheusCollectors()
}