influxdb/storage/retention.go

130 lines
3.9 KiB
Go
Raw Normal View History

2018-10-05 16:57:49 +00:00
package storage
import (
"context"
"errors"
"math"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/logger"
2018-10-05 16:57:49 +00:00
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
// bucketAPITimeout bounds the context used when listing buckets from the
// bucket service during a retention pass.
const bucketAPITimeout time.Duration = 10 * time.Second
// A Deleter implementation is capable of deleting data from a storage engine.
type Deleter interface {
DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error
2018-10-05 16:57:49 +00:00
}
// A BucketFinder is responsible for providing access to buckets via a filter.
type BucketFinder interface {
2018-10-09 18:43:10 +00:00
FindBuckets(context.Context, platform.BucketFilter, ...platform.FindOptions) ([]*platform.Bucket, int, error)
2018-10-05 16:57:49 +00:00
}
var (
	// ErrServiceClosed signals that the service is closed and therefore
	// unavailable.
	ErrServiceClosed = errors.New("service is currently closed")
)
2018-10-10 11:36:02 +00:00
// The retentionEnforcer periodically removes data that is outside of the retention
2018-10-05 16:57:49 +00:00
// period of the bucket associated with the data.
2018-10-10 11:36:02 +00:00
type retentionEnforcer struct {
2018-10-05 16:57:49 +00:00
// Engine provides access to data stored on the engine
Engine Deleter
// BucketService provides an API for retrieving buckets associated with
// organisations.
BucketService BucketFinder
logger *zap.Logger
2018-10-05 16:57:49 +00:00
metrics *retentionMetrics
2018-10-05 16:57:49 +00:00
}
2018-10-10 11:36:02 +00:00
// newRetentionEnforcer returns a new enforcer that ensures expired data is
// deleted every interval period. Setting interval to 0 is equivalent to
// disabling the service.
func newRetentionEnforcer(engine Deleter, bucketService BucketFinder) *retentionEnforcer {
2018-10-10 11:36:02 +00:00
s := &retentionEnforcer{
Engine: engine,
BucketService: bucketService,
logger: zap.NewNop(),
2018-10-05 16:57:49 +00:00
}
s.metrics = newRetentionMetrics(nil)
2018-10-05 16:57:49 +00:00
return s
}
// WithLogger sets the logger l on the service. It must be called before Open.
2018-10-10 11:36:02 +00:00
func (s *retentionEnforcer) WithLogger(l *zap.Logger) {
2018-10-10 10:42:37 +00:00
if s == nil {
return // Not initialised
}
s.logger = l.With(zap.String("component", "retention_enforcer"))
2018-10-05 16:57:49 +00:00
}
// run periodically expires (deletes) all data that's fallen outside of the
// retention period for the associated bucket.
2018-10-10 11:36:02 +00:00
func (s *retentionEnforcer) run() {
log, logEnd := logger.NewOperation(s.logger, "Data retention check", "data_retention_check")
defer logEnd()
buckets, err := s.getBucketInformation()
if err != nil {
log.Error("Unable to determine bucket information", zap.Error(err))
2018-10-05 16:57:49 +00:00
return
}
now := time.Now().UTC()
2019-02-04 19:26:21 +00:00
s.expireData(buckets, now)
s.metrics.CheckDuration.With(s.metrics.Labels()).Observe(time.Since(now).Seconds())
2018-10-05 16:57:49 +00:00
}
// expireData runs a delete operation on the storage engine.
//
// Any series data that (1) belongs to a bucket in the provided list and
2018-10-05 16:57:49 +00:00
// (2) falls outside the bucket's indicated retention period will be deleted.
2019-02-04 19:26:21 +00:00
func (s *retentionEnforcer) expireData(buckets []*platform.Bucket, now time.Time) {
2019-01-22 17:31:35 +00:00
logger, logEnd := logger.NewOperation(s.logger, "Data deletion", "data_deletion")
2018-10-05 16:57:49 +00:00
defer logEnd()
2019-02-04 19:26:21 +00:00
labels := s.metrics.Labels()
for _, b := range buckets {
if b.RetentionPeriod == 0 {
continue
2018-10-05 16:57:49 +00:00
}
2019-02-04 19:26:21 +00:00
labels["status"] = "ok"
labels["org_id"] = b.OrganizationID.String()
labels["bucket_id"] = b.ID.String()
max := now.Add(-b.RetentionPeriod).UnixNano()
err := s.Engine.DeleteBucketRange(b.OrganizationID, b.ID, math.MinInt64, max)
if err != nil {
2019-02-04 19:26:21 +00:00
labels["status"] = "error"
2019-01-22 17:31:35 +00:00
logger.Info("unable to delete bucket range",
zap.String("bucket id", b.ID.String()),
zap.String("org id", b.OrganizationID.String()),
zap.Error(err))
2018-10-05 16:57:49 +00:00
}
2019-02-04 19:26:21 +00:00
s.metrics.Checks.With(labels).Inc()
}
2018-10-05 16:57:49 +00:00
}
// getBucketInformation returns a slice of buckets to run retention on.
func (s *retentionEnforcer) getBucketInformation() ([]*platform.Bucket, error) {
2018-10-05 16:57:49 +00:00
ctx, cancel := context.WithTimeout(context.Background(), bucketAPITimeout)
defer cancel()
2018-10-05 16:57:49 +00:00
buckets, _, err := s.BucketService.FindBuckets(ctx, platform.BucketFilter{})
return buckets, err
2018-10-05 16:57:49 +00:00
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
2018-10-10 11:36:02 +00:00
func (s *retentionEnforcer) PrometheusCollectors() []prometheus.Collector {
return s.metrics.PrometheusCollectors()
2018-10-05 16:57:49 +00:00
}