Rename retention service

pull/10616/head
Edd Robinson 2018-10-10 12:36:02 +01:00
parent 1f2f03d93b
commit d5ed17adb2
4 changed files with 39 additions and 48 deletions

View File

@ -217,7 +217,7 @@ func platformF(cmd *cobra.Command, args []string) {
config.EngineOptions.WALEnabled = true // Enable a disk-based WAL.
config.EngineOptions.Config = config.Config
engine := storage.NewEngine(enginePath, config, storage.WithRetentionService(bucketSvc))
engine := storage.NewEngine(enginePath, config, storage.WithRetentionEnforcer(bucketSvc))
engine.WithLogger(logger)
reg.MustRegister(engine.PrometheusCollectors()...)

View File

@ -30,12 +30,12 @@ type Engine struct {
engineID *int // Not used by default.
nodeID *int // Not used by default.
mu sync.RWMutex
open bool
index *tsi1.Index
sfile *tsdb.SeriesFile
engine *tsm1.Engine
retentionService *retentionService
mu sync.RWMutex
open bool
index *tsi1.Index
sfile *tsdb.SeriesFile
engine *tsm1.Engine
retentionEnforcer *retentionEnforcer
logger *zap.Logger
}
@ -67,12 +67,12 @@ var WithNodeID = func(id int) Option {
}
}
// WithRetentionService initialises a retention service on the engine.
// WithRetentionService must be called after other options to ensure that all
// WithRetentionEnforcer initialises a retention enforcer on the engine.
// WithRetentionEnforcer must be called after other options to ensure that all
// metrics are labelled correctly.
var WithRetentionService = func(finder BucketFinder) Option {
var WithRetentionEnforcer = func(finder BucketFinder) Option {
return func(e *Engine) {
e.retentionService = newRetentionService(e, finder, e.config.RetentionInterval)
e.retentionEnforcer = newRetentionEnforcer(e, finder, e.config.RetentionInterval)
labels := prometheus.Labels(map[string]string{"status": ""})
if e.engineID != nil {
@ -82,8 +82,8 @@ var WithRetentionService = func(finder BucketFinder) Option {
if e.nodeID != nil {
labels["node_id"] = fmt.Sprint(*e.nodeID)
}
e.retentionService.defaultMetricLabels = labels
e.retentionService.retentionMetrics = newRetentionMetrics(labels)
e.retentionEnforcer.defaultMetricLabels = labels
e.retentionEnforcer.retentionMetrics = newRetentionMetrics(labels)
}
}
@ -132,7 +132,7 @@ func (e *Engine) WithLogger(log *zap.Logger) {
e.sfile.WithLogger(e.logger)
e.index.WithLogger(e.logger)
e.engine.WithLogger(e.logger)
e.retentionService.WithLogger(e.logger)
e.retentionEnforcer.WithLogger(e.logger)
}
// PrometheusCollectors returns all the prometheus collectors associated with
@ -142,7 +142,7 @@ func (e *Engine) PrometheusCollectors() []prometheus.Collector {
// TODO(edd): Get prom metrics for TSM.
// TODO(edd): Get prom metrics for index.
// TODO(edd): Get prom metrics for series file.
metrics = append(metrics, e.retentionService.PrometheusCollectors()...)
metrics = append(metrics, e.retentionEnforcer.PrometheusCollectors()...)
return metrics
}
@ -169,7 +169,7 @@ func (e *Engine) Open() error {
}
e.engine.SetCompactionsEnabled(true) // TODO(edd):is this needed?
if err := e.retentionService.Open(); err != nil {
if err := e.retentionEnforcer.Open(); err != nil {
return err
}
@ -188,7 +188,7 @@ func (e *Engine) Close() error {
}
e.open = false
if err := e.retentionService.Close(); err != nil {
if err := e.retentionEnforcer.Close(); err != nil {
return err
}

View File

@ -12,7 +12,6 @@ import (
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxql"
"github.com/influxdata/platform"
"github.com/influxdata/platform/kit/prom"
"github.com/influxdata/platform/models"
"github.com/influxdata/platform/tsdb"
"github.com/prometheus/client_golang/prometheus"
@ -25,14 +24,6 @@ const (
engineAPITimeout = time.Minute
)
type RetentionService interface {
prom.PrometheusCollector
Open() error
Close() error
WithLogger(*zap.Logger)
}
// A Deleter implementation is capable of deleting data from a storage engine.
type Deleter interface {
CreateSeriesCursor(context.Context, SeriesCursorRequest, influxql.Expr) (SeriesCursor, error)
@ -47,9 +38,9 @@ type BucketFinder interface {
// ErrServiceClosed is returned when the service is unavailable.
var ErrServiceClosed = errors.New("service is currently closed")
// The retentionService periodically removes data that is outside of the retention
// The retentionEnforcer periodically removes data that is outside of the retention
// period of the bucket associated with the data.
type retentionService struct {
type retentionEnforcer struct {
// Engine provides access to data stored on the engine
Engine Deleter
@ -69,11 +60,11 @@ type retentionService struct {
wg sync.WaitGroup
}
// newRetentionService returns a new Service that performs deletes
// every interval period. Setting interval to 0 is equivalent to disabling the
// service.
func newRetentionService(engine Deleter, bucketService BucketFinder, interval int64) *retentionService {
s := &retentionService{
// newRetentionEnforcer returns a new enforcer that ensures expired data is
// deleted every interval period. Setting interval to 0 is equivalent to
// disabling the service.
func newRetentionEnforcer(engine Deleter, bucketService BucketFinder, interval int64) *retentionEnforcer {
s := &retentionEnforcer{
Engine: engine,
BucketService: bucketService,
logger: zap.NewNop(),
@ -83,7 +74,7 @@ func newRetentionService(engine Deleter, bucketService BucketFinder, interval in
}
// metricLabels returns a new copy of the default metric labels.
func (s *retentionService) metricLabels() prometheus.Labels {
func (s *retentionEnforcer) metricLabels() prometheus.Labels {
labels := make(map[string]string, len(s.defaultMetricLabels))
for k, v := range s.defaultMetricLabels {
labels[k] = v
@ -92,7 +83,7 @@ func (s *retentionService) metricLabels() prometheus.Labels {
}
// WithLogger sets the logger l on the service. It must be called before Open.
func (s *retentionService) WithLogger(l *zap.Logger) {
func (s *retentionEnforcer) WithLogger(l *zap.Logger) {
if s == nil {
return // Not initialised
}
@ -101,7 +92,7 @@ func (s *retentionService) WithLogger(l *zap.Logger) {
// Open opens the service, which begins the process of removing expired data.
// Re-opening the service once it's open is a no-op.
func (s *retentionService) Open() error {
func (s *retentionEnforcer) Open() error {
if s == nil || s.closing() != nil {
return nil // Not initialised or already open.
}
@ -124,7 +115,7 @@ func (s *retentionService) Open() error {
// run periodically expires (deletes) all data that's fallen outside of the
// retention period for the associated bucket.
func (s *retentionService) run() {
func (s *retentionEnforcer) run() {
if s.interval == 0 {
s.logger.Info("Service disabled")
return
@ -170,7 +161,7 @@ func (s *retentionService) run() {
//
// Any series data that (1) belongs to a bucket in the provided map and
// (2) falls outside the bucket's indicated retention period will be deleted.
func (s *retentionService) expireData(rpByBucketID map[string]time.Duration, now time.Time) error {
func (s *retentionEnforcer) expireData(rpByBucketID map[string]time.Duration, now time.Time) error {
_, logEnd := logger.NewOperation(s.logger, "Data deletion", "data_deletion")
defer logEnd()
@ -240,7 +231,7 @@ func (s *retentionService) expireData(rpByBucketID map[string]time.Duration, now
// getRetentionPeriodPerBucket returns a map of (bucket ID -> retention period)
// for all buckets.
func (s *retentionService) getRetentionPeriodPerBucket() (map[string]time.Duration, error) {
func (s *retentionEnforcer) getRetentionPeriodPerBucket() (map[string]time.Duration, error) {
ctx, cancel := context.WithTimeout(context.Background(), bucketAPITimeout)
defer cancel()
buckets, _, err := s.BucketService.FindBuckets(ctx, platform.BucketFilter{})
@ -255,7 +246,7 @@ func (s *retentionService) getRetentionPeriodPerBucket() (map[string]time.Durati
}
// closing returns a channel to signal that the service is closing.
func (s *retentionService) closing() chan struct{} {
func (s *retentionEnforcer) closing() chan struct{} {
s.mu.RLock()
defer s.mu.RUnlock()
return s._closing
@ -265,7 +256,7 @@ func (s *retentionService) closing() chan struct{} {
//
// If a delete of data is in-progress, then it will be allowed to complete before
// Close returns. Re-closing the service once it's closed is a no-op.
func (s *retentionService) Close() error {
func (s *retentionEnforcer) Close() error {
if s == nil || s.closing() == nil {
return nil // Not initialised or already closed.
}
@ -283,14 +274,14 @@ func (s *retentionService) Close() error {
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (s *retentionService) PrometheusCollectors() []prometheus.Collector {
func (s *retentionEnforcer) PrometheusCollectors() []prometheus.Collector {
if s.retentionMetrics != nil {
return s.retentionMetrics.PrometheusCollectors()
}
return nil
}
// A BucketService is an platform.BucketService that the RetentionService can open,
// A BucketService is an platform.BucketService that the retentionEnforcer can open,
// close and log.
type BucketService interface {
platform.BucketService

View File

@ -17,7 +17,7 @@ import (
func TestService_Open(t *testing.T) {
t.Run("negative interval", func(t *testing.T) {
service := newRetentionService(nil, nil, -1)
service := newRetentionEnforcer(nil, nil, -1)
defer service.Close()
if err := service.Open(); err == nil {
t.Fatal("didn't get error, expected one")
@ -25,7 +25,7 @@ func TestService_Open(t *testing.T) {
})
t.Run("disabled service", func(t *testing.T) {
service := newRetentionService(NewTestEngine(), NewTestBucketFinder(), 0)
service := newRetentionEnforcer(NewTestEngine(), NewTestBucketFinder(), 0)
defer service.Close()
if err := service.Open(); err != nil {
t.Fatalf("got error %v", err)
@ -48,7 +48,7 @@ func TestService_Open(t *testing.T) {
})
t.Run("idempotency", func(t *testing.T) {
service := newRetentionService(NewTestEngine(), NewTestBucketFinder(), 0)
service := newRetentionEnforcer(NewTestEngine(), NewTestBucketFinder(), 0)
defer service.Close()
if err := service.Open(); err != nil {
t.Fatalf("got error %v", err)
@ -63,7 +63,7 @@ func TestService_Open(t *testing.T) {
func TestService_Close(t *testing.T) {
t.Run("idempotency", func(t *testing.T) {
service := newRetentionService(NewTestEngine(), NewTestBucketFinder(), 1)
service := newRetentionEnforcer(NewTestEngine(), NewTestBucketFinder(), 1)
if err := service.Open(); err != nil {
t.Fatalf("got error %v", err)
}
@ -81,7 +81,7 @@ func TestService_Close(t *testing.T) {
func TestService_expireData(t *testing.T) {
engine := NewTestEngine()
service := newRetentionService(engine, NewTestBucketFinder(), 0)
service := newRetentionEnforcer(engine, NewTestBucketFinder(), 0)
now := time.Date(2018, 4, 10, 23, 12, 33, 0, time.UTC)
t.Run("no rpByBucketID", func(t *testing.T) {