feat: Port 1.x retention policy enforcement service

The check interval can be configured via the following flag:

```
--storage-retention-check-interval
```
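
For example, a hedged sketch of setting it at startup (this assumes the flag accepts a Go-style duration string, in line with the service's 30-minute default):

```
influxd --storage-retention-check-interval 30m
```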

Closes #19309
Stuart Carnie 2020-09-08 16:02:37 -07:00
parent deb99b3885
commit 7c2be6e780
9 changed files with 698 additions and 439 deletions


@ -435,6 +435,11 @@ func launcherOpts(l *Launcher) []cli.Opt {
Flag: "storage-tsm-use-madv-willneed",
Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.",
},
{
DestP: &l.StorageConfig.RetentionInterval,
Flag: "storage-retention-check-interval",
Desc: "The interval of time when retention policy enforcement checks run.",
},
// InfluxQL Coordinator Config
{
@ -802,7 +807,6 @@ func (m *Launcher) run(ctx context.Context) (err error) {
// the testing engine will write/read into a temporary directory
engine := NewTemporaryEngine(
m.StorageConfig,
storage.WithRetentionEnforcer(ts.BucketService),
storage.WithMetaClient(metaClient),
)
flushers = append(flushers, engine)
@ -816,7 +820,6 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.engine = storage.NewEngine(
m.enginePath,
m.StorageConfig,
storage.WithRetentionEnforcer(ts.BucketService),
storage.WithMetaClient(metaClient),
)
}

go.mod

@ -45,6 +45,7 @@ require (
github.com/google/go-querystring v1.0.0 // indirect
github.com/google/martian v2.1.1-0.20190517191504-25dcb96d9e51+incompatible // indirect
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/go-retryablehttp v0.6.4 // indirect
github.com/hashicorp/raft v1.0.0 // indirect
github.com/hashicorp/vault/api v1.0.2


@ -2,15 +2,16 @@ package storage
import (
"context"
"fmt"
"io"
"path/filepath"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
_ "github.com/influxdata/influxdb/v2/tsdb/engine"
@ -18,28 +19,17 @@ import (
_ "github.com/influxdata/influxdb/v2/tsdb/index/tsi1"
"github.com/influxdata/influxdb/v2/v1/coordinator"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/influxdata/influxdb/v2/v1/services/retention"
"github.com/influxdata/influxql"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
// Static objects to prevent small allocs.
// var timeBytes = []byte("time")
// ErrEngineClosed is returned when a caller attempts to use the engine while
// it's closed.
var ErrEngineClosed = errors.New("engine is closed")
// runner lets us mock out the retention enforcer in tests
type runner interface{ run() }
// runnable is a function that lets the caller know if they can proceed with their
// task. A runnable returns a function that should be called by the caller to
// signal they finished their task.
type runnable func() (done func())
type Engine struct {
config Config
path string
@ -51,56 +41,19 @@ type Engine struct {
pointsWriter interface {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, user meta.User, points []models.Point) error
}
finder BucketFinder
retentionEnforcer runner
retentionEnforcerLimiter runnable
retentionService *retention.Service
defaultMetricLabels prometheus.Labels
writePointsValidationEnabled bool
// Tracks all goroutines started by the Engine.
wg sync.WaitGroup
logger *zap.Logger
}
// Option provides a set
type Option func(*Engine)
// WithRetentionEnforcer initialises a retention enforcer on the engine.
// WithRetentionEnforcer must be called after other options to ensure that all
// metrics are labelled correctly.
func WithRetentionEnforcer(finder BucketFinder) Option {
return func(e *Engine) {
e.finder = finder
// TODO - change retention enforce to take store
// e.retentionEnforcer = newRetentionEnforcer(e, e.engine, finder)
}
}
// WithRetentionEnforcerLimiter sets a limiter used to control when the
// retention enforcer can proceed. If this option is not used then the default
// limiter (or the absence of one) is a no-op, and no limitations will be put
// on running the retention enforcer.
func WithRetentionEnforcerLimiter(f runnable) Option {
return func(e *Engine) {
e.retentionEnforcerLimiter = f
}
}
// WithPageFaultLimiter allows the caller to set the limiter for restricting
// the frequency of page faults.
func WithPageFaultLimiter(limiter *rate.Limiter) Option {
return func(e *Engine) {
// TODO no longer needed
// e.engine.WithPageFaultLimiter(limiter)
// e.index.WithPageFaultLimiter(limiter)
// e.sfile.WithPageFaultLimiter(limiter)
}
}
func WithMetaClient(c MetaClient) Option {
return func(e *Engine) {
e.metaClient = c
@ -108,12 +61,15 @@ func WithMetaClient(c MetaClient) Option {
}
type MetaClient interface {
Database(name string) (di *meta.DatabaseInfo)
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
Database(name string) (di *meta.DatabaseInfo)
Databases() []meta.DatabaseInfo
DeleteShardGroup(database, policy string, id uint64) error
PruneShardGroups() error
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error)
UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error
}
type TSDBStore interface {
@ -155,9 +111,9 @@ func NewEngine(path string, c Config, options ...Option) *Engine {
pw.MetaClient = e.metaClient
e.pointsWriter = pw
if r, ok := e.retentionEnforcer.(*retentionEnforcer); ok {
r.SetDefaultMetricLabels(e.defaultMetricLabels)
}
e.retentionService = retention.NewService(retention.Config{Enabled: true, CheckInterval: c.RetentionInterval})
e.retentionService.TSDBStore = e.tsdbStore
e.retentionService.MetaClient = e.metaClient
return e
}
@ -173,17 +129,15 @@ func (e *Engine) WithLogger(log *zap.Logger) {
pw.Logger = e.logger
}
if r, ok := e.retentionEnforcer.(*retentionEnforcer); ok {
r.WithLogger(e.logger)
if e.retentionService != nil {
e.retentionService.WithLogger(log)
}
}
// PrometheusCollectors returns all the prometheus collectors associated with
// the engine and its components.
func (e *Engine) PrometheusCollectors() []prometheus.Collector {
var metrics []prometheus.Collector
metrics = append(metrics, RetentionPrometheusCollectors()...)
return metrics
return nil
}
// Open opens the store and all underlying resources. It returns an error if
@ -202,14 +156,13 @@ func (e *Engine) Open(ctx context.Context) (err error) {
if err := e.tsdbStore.Open(); err != nil {
return err
}
if err := e.retentionService.Open(); err != nil {
return err
}
e.closing = make(chan struct{})
// TODO(edd) background tasks will be run in priority order via a scheduler.
// For now we will just run on an interval as we only have the retention
// policy enforcer.
if e.retentionEnforcer != nil {
e.runRetentionEnforcer()
}
return nil
}
@ -221,72 +174,6 @@ func (e *Engine) EnableCompactions() {
func (e *Engine) DisableCompactions() {
}
// runRetentionEnforcer runs the retention enforcer in a separate goroutine.
//
// Currently this just runs on an interval, but in the future we will add the
// ability to reschedule the retention enforcement if there are not enough
// resources available.
func (e *Engine) runRetentionEnforcer() {
interval := time.Duration(e.config.RetentionInterval)
if interval == 0 {
e.logger.Info("Retention enforcer disabled")
return // Enforcer disabled.
} else if interval < 0 {
e.logger.Error("Negative retention interval", logger.DurationLiteral("check_interval", interval))
return
}
l := e.logger.With(zap.String("component", "retention_enforcer"), logger.DurationLiteral("check_interval", interval))
l.Info("Starting")
ticker := time.NewTicker(interval)
e.wg.Add(1)
go func() {
defer e.wg.Done()
for {
// It's safe to read closing without a lock because it's never
// modified if this goroutine is active.
select {
case <-e.closing:
l.Info("Stopping")
return
case <-ticker.C:
// canRun will signal to this goroutine that the enforcer can
// run. It will also carry from the blocking goroutine a function
// that needs to be called when the enforcer has finished its work.
canRun := make(chan func())
// This goroutine blocks until the retention enforcer has permission
// to proceed.
go func() {
if e.retentionEnforcerLimiter != nil {
// The limiter will block until the enforcer can proceed.
// The limiter returns a function that needs to be called
// when the enforcer has finished its work.
canRun <- e.retentionEnforcerLimiter()
return
}
canRun <- func() {}
}()
// Is it possible to get a slot? We need to be able to close
// whilst waiting...
select {
case <-e.closing:
l.Info("Stopping")
return
case done := <-canRun:
e.retentionEnforcer.run()
if done != nil {
done()
}
}
}
}
}()
}
// Close closes the store and all underlying resources. It returns an error if
// any of the underlying systems fail to close.
func (e *Engine) Close() error {
@ -301,15 +188,21 @@ func (e *Engine) Close() error {
close(e.closing)
e.mu.RUnlock()
// Wait for any other goroutines to finish.
e.wg.Wait()
e.mu.Lock()
defer e.mu.Unlock()
e.closing = nil
// TODO - Close tsdb store
return nil
var retErr *multierror.Error
if err := e.retentionService.Close(); err != nil {
retErr = multierror.Append(retErr, fmt.Errorf("error closing retention service: %w", err))
}
if err := e.tsdbStore.Close(); err != nil {
retErr = multierror.Append(retErr, fmt.Errorf("error closing TSDB store: %w", err))
}
return retErr.ErrorOrNil()
}
// WritePoints writes the provided points to the engine.


@ -1,90 +0,0 @@
package storage
import (
"sort"
"sync"
"github.com/prometheus/client_golang/prometheus"
)
// The following package variables act as singletons, to be shared by all
// storage.Engine instantiations. This allows multiple Engines to be
// monitored within the same process.
var (
rms *retentionMetrics
mmu sync.RWMutex
)
// RetentionPrometheusCollectors returns all prometheus metrics for retention.
func RetentionPrometheusCollectors() []prometheus.Collector {
mmu.RLock()
defer mmu.RUnlock()
var collectors []prometheus.Collector
if rms != nil {
collectors = append(collectors, rms.PrometheusCollectors()...)
}
return collectors
}
// namespace is the leading part of all published metrics for the Storage service.
const namespace = "storage"
const retentionSubsystem = "retention" // sub-system associated with metrics for writing points.
// retentionMetrics is a set of metrics concerned with tracking data about retention policies.
type retentionMetrics struct {
labels prometheus.Labels
Checks *prometheus.CounterVec
CheckDuration *prometheus.HistogramVec
}
func newRetentionMetrics(labels prometheus.Labels) *retentionMetrics {
var names []string
for k := range labels {
names = append(names, k)
}
sort.Strings(names)
checksNames := append(append([]string(nil), names...), "status")
sort.Strings(checksNames)
checkDurationNames := append(append([]string(nil), names...), "status")
sort.Strings(checkDurationNames)
return &retentionMetrics{
labels: labels,
Checks: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: retentionSubsystem,
Name: "checks_total",
Help: "Number of retention check operations performed.",
}, checksNames),
CheckDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: retentionSubsystem,
Name: "check_duration_seconds",
Help: "Time taken to perform a successful retention check.",
// 25 buckets spaced exponentially between 10s and ~2h
Buckets: prometheus.ExponentialBuckets(10, 1.32, 25),
}, checkDurationNames),
}
}
// Labels returns a copy of labels for use with retention metrics.
func (m *retentionMetrics) Labels() prometheus.Labels {
l := make(map[string]string, len(m.labels))
for k, v := range m.labels {
l[k] = v
}
return l
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (rm *retentionMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
rm.Checks,
rm.CheckDuration,
}
}


@ -2,219 +2,11 @@ package storage
import (
"context"
"errors"
"math"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
const (
bucketAPITimeout = 10 * time.Second
)
// A Deleter implementation is capable of deleting data from a storage engine.
type Deleter interface {
DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error
}
// A Snapshotter implementation can take snapshots of the entire engine.
type Snapshotter interface {
WriteSnapshot(ctx context.Context) error
}
// A BucketFinder is responsible for providing access to buckets via a filter.
type BucketFinder interface {
FindBuckets(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
}
// ErrServiceClosed is returned when the service is unavailable.
var ErrServiceClosed = errors.New("service is currently closed")
// The retentionEnforcer periodically removes data that is outside of the retention
// period of the bucket associated with the data.
type retentionEnforcer struct {
// Engine provides access to data stored on the engine
Engine Deleter
Snapshotter Snapshotter
// BucketService provides an API for retrieving buckets associated with
// organisations.
BucketService BucketFinder
logger *zap.Logger
tracker *retentionTracker
}
// SetDefaultMetricLabels sets the default labels for the retention metrics.
func (s *retentionEnforcer) SetDefaultMetricLabels(defaultLabels prometheus.Labels) {
if s == nil {
return // Not initialized
}
mmu.Lock()
if rms == nil {
rms = newRetentionMetrics(defaultLabels)
}
mmu.Unlock()
s.tracker = newRetentionTracker(rms, defaultLabels)
}
// WithLogger sets the logger l on the service. It must be called before any run calls.
func (s *retentionEnforcer) WithLogger(l *zap.Logger) {
if s == nil {
return // Not initialised
}
s.logger = l.With(zap.String("component", "retention_enforcer"))
}
// run periodically expires (deletes) all data that's fallen outside of the
// retention period for the associated bucket.
func (s *retentionEnforcer) run() {
if s == nil {
return // Not initialized
}
span, ctx := tracing.StartSpanFromContext(context.Background())
defer span.Finish()
log, logEnd := logger.NewOperation(ctx, s.logger, "Data retention check", "data_retention_check")
defer logEnd()
now := time.Now().UTC()
buckets, err := s.getBucketInformation(ctx)
if err != nil {
log.Error("Unable to determine bucket information", zap.Error(err))
} else {
s.expireData(ctx, buckets, now)
}
s.tracker.CheckDuration(time.Since(now), err == nil)
}
// expireData runs a delete operation on the storage engine.
//
// Any series data that (1) belongs to a bucket in the provided list and
// (2) falls outside the bucket's indicated retention period will be deleted.
func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.Bucket, now time.Time) {
logger, logEnd := logger.NewOperation(ctx, s.logger, "Data deletion", "data_deletion",
zap.Int("buckets", len(buckets)))
defer logEnd()
// Snapshot to clear the cache to reduce write contention.
if err := s.Snapshotter.WriteSnapshot(ctx); err != nil && err != tsm1.ErrSnapshotInProgress {
logger.Warn("Unable to snapshot cache before retention", zap.Error(err))
}
var skipInf, skipInvalid int
for _, b := range buckets {
bucketFields := []zapcore.Field{
zap.String("org_id", b.OrgID.String()),
zap.String("bucket_id", b.ID.String()),
zap.Duration("retention_period", b.RetentionPeriod),
zap.String("system_type", b.Type.String()),
}
if b.RetentionPeriod == 0 {
logger.Debug("Skipping bucket with infinite retention", bucketFields...)
skipInf++
continue
} else if !b.OrgID.Valid() || !b.ID.Valid() {
skipInvalid++
logger.Warn("Skipping bucket with invalid fields", bucketFields...)
continue
}
min := int64(math.MinInt64)
max := now.Add(-b.RetentionPeriod).UnixNano()
span, ctx := tracing.StartSpanFromContext(ctx)
span.LogKV(
"bucket_id", b.ID,
"org_id", b.OrgID,
"system_type", b.Type,
"retention_period", b.RetentionPeriod,
"retention_policy", b.RetentionPolicyName,
"from", time.Unix(0, min).UTC(),
"to", time.Unix(0, max).UTC(),
)
err := s.Engine.DeleteBucketRange(ctx, b.OrgID, b.ID, min, max)
if err != nil {
logger.Info("Unable to delete bucket range",
append(bucketFields, zap.Time("min", time.Unix(0, min)), zap.Time("max", time.Unix(0, max)), zap.Error(err))...)
tracing.LogError(span, err)
}
s.tracker.IncChecks(err == nil)
span.Finish()
}
if skipInf > 0 || skipInvalid > 0 {
logger.Info("Skipped buckets", zap.Int("infinite_retention_total", skipInf), zap.Int("invalid_total", skipInvalid))
}
}
// getBucketInformation returns a slice of buckets to run retention on.
func (s *retentionEnforcer) getBucketInformation(ctx context.Context) ([]*influxdb.Bucket, error) {
ctx, cancel := context.WithTimeout(ctx, bucketAPITimeout)
defer cancel()
buckets, _, err := s.BucketService.FindBuckets(ctx, influxdb.BucketFilter{})
return buckets, err
}
//
// metrics tracker
//
type retentionTracker struct {
metrics *retentionMetrics
labels prometheus.Labels
}
func newRetentionTracker(metrics *retentionMetrics, defaultLabels prometheus.Labels) *retentionTracker {
return &retentionTracker{metrics: metrics, labels: defaultLabels}
}
// Labels returns a copy of labels for use with index cache metrics.
func (t *retentionTracker) Labels() prometheus.Labels {
l := make(map[string]string, len(t.labels))
for k, v := range t.labels {
l[k] = v
}
return l
}
// IncChecks signals that a check happened for some bucket.
func (t *retentionTracker) IncChecks(success bool) {
labels := t.Labels()
if success {
labels["status"] = "ok"
} else {
labels["status"] = "error"
}
t.metrics.Checks.With(labels).Inc()
}
// CheckDuration records the overall duration of a full retention check.
func (t *retentionTracker) CheckDuration(dur time.Duration, success bool) {
labels := t.Labels()
if success {
labels["status"] = "ok"
} else {
labels["status"] = "error"
}
t.metrics.CheckDuration.With(labels).Observe(dur.Seconds())
}


@ -0,0 +1,49 @@
package retention
import (
"errors"
"time"
"github.com/influxdata/influxdb/v2/toml"
"github.com/influxdata/influxdb/v2/v1/monitor/diagnostics"
)
// Config represents the configuration for the retention service.
type Config struct {
Enabled bool `toml:"enabled"`
CheckInterval toml.Duration `toml:"check-interval"`
}
// NewConfig returns an instance of Config with defaults.
func NewConfig() Config {
return Config{Enabled: true, CheckInterval: toml.Duration(30 * time.Minute)}
}
// Validate returns an error if the Config is invalid.
func (c Config) Validate() error {
if !c.Enabled {
return nil
}
// TODO: Should we enforce a minimum interval?
// Polling every nanosecond, for instance, will greatly impact performance.
if c.CheckInterval <= 0 {
return errors.New("check-interval must be positive")
}
return nil
}
// Diagnostics returns a diagnostics representation of a subset of the Config.
func (c Config) Diagnostics() (*diagnostics.Diagnostics, error) {
if !c.Enabled {
return diagnostics.RowFromMap(map[string]interface{}{
"enabled": false,
}), nil
}
return diagnostics.RowFromMap(map[string]interface{}{
"enabled": true,
"check-interval": c.CheckInterval,
}), nil
}
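
As a quick illustration of the Config API above, a minimal sketch (the package main wrapper and the chosen interval are only illustrative; it uses the toml.Duration type already imported by the config file in this diff):

```go
package main

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/v2/toml"
	"github.com/influxdata/influxdb/v2/v1/services/retention"
)

func main() {
	// Start from the defaults (enabled, 30-minute check interval) and override.
	c := retention.NewConfig()
	c.CheckInterval = toml.Duration(10 * time.Minute)

	// Validate rejects a non-positive interval while the service is enabled.
	if err := c.Validate(); err != nil {
		fmt.Println("invalid retention config:", err)
		return
	}
	fmt.Println("retention check interval:", time.Duration(c.CheckInterval))
}
```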


@ -0,0 +1,51 @@
package retention_test
import (
"testing"
"time"
"github.com/BurntSushi/toml"
"github.com/influxdata/influxdb/v2/v1/services/retention"
)
func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c retention.Config
if _, err := toml.Decode(`
enabled = true
check-interval = "1s"
`, &c); err != nil {
t.Fatal(err)
}
// Validate configuration.
if !c.Enabled {
t.Fatalf("unexpected enabled state: %v", c.Enabled)
} else if time.Duration(c.CheckInterval) != time.Second {
t.Fatalf("unexpected check interval: %v", c.CheckInterval)
}
}
func TestConfig_Validate(t *testing.T) {
c := retention.NewConfig()
if err := c.Validate(); err != nil {
t.Fatalf("unexpected validation fail from NewConfig: %s", err)
}
c = retention.NewConfig()
c.CheckInterval = 0
if err := c.Validate(); err == nil {
t.Fatal("expected error for check-interval = 0, got nil")
}
c = retention.NewConfig()
c.CheckInterval *= -1
if err := c.Validate(); err == nil {
t.Fatal("expected error for negative check-interval, got nil")
}
c.Enabled = false
if err := c.Validate(); err != nil {
t.Fatalf("unexpected validation fail from disabled config: %s", err)
}
}


@ -0,0 +1,163 @@
// Package retention provides the retention policy enforcement service.
package retention // import "github.com/influxdata/influxdb/services/retention"
import (
"context"
"sync"
"time"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"go.uber.org/zap"
)
// Service represents the retention policy enforcement service.
type Service struct {
MetaClient interface {
Databases() []meta.DatabaseInfo
DeleteShardGroup(database, policy string, id uint64) error
PruneShardGroups() error
}
TSDBStore interface {
ShardIDs() []uint64
DeleteShard(shardID uint64) error
}
config Config
wg sync.WaitGroup
done chan struct{}
logger *zap.Logger
}
// NewService returns a configured retention policy enforcement service.
func NewService(c Config) *Service {
return &Service{
config: c,
logger: zap.NewNop(),
}
}
// Open starts retention policy enforcement.
func (s *Service) Open() error {
if !s.config.Enabled || s.done != nil {
return nil
}
s.logger.Info("Starting retention policy enforcement service",
logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
s.done = make(chan struct{})
s.wg.Add(1)
go func() { defer s.wg.Done(); s.run() }()
return nil
}
// Close stops retention policy enforcement.
func (s *Service) Close() error {
if !s.config.Enabled || s.done == nil {
return nil
}
s.logger.Info("Closing retention policy enforcement service")
close(s.done)
s.wg.Wait()
s.done = nil
return nil
}
// WithLogger sets the logger on the service.
func (s *Service) WithLogger(log *zap.Logger) {
s.logger = log.With(zap.String("service", "retention"))
}
func (s *Service) run() {
ticker := time.NewTicker(time.Duration(s.config.CheckInterval))
defer ticker.Stop()
for {
select {
case <-s.done:
return
case <-ticker.C:
log, logEnd := logger.NewOperation(context.Background(), s.logger, "Retention policy deletion check", "retention_delete_check")
type deletionInfo struct {
db string
rp string
}
deletedShardIDs := make(map[uint64]deletionInfo)
// Mark down if an error occurred during this function so we can inform the
// user that we will try again on the next interval.
// Without the message, they may see the error message and assume they
// have to do it manually.
var retryNeeded bool
dbs := s.MetaClient.Databases()
for _, d := range dbs {
for _, r := range d.RetentionPolicies {
// Build list of already deleted shards.
for _, g := range r.DeletedShardGroups() {
for _, sh := range g.Shards {
deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name}
}
}
// Determine all shards that have expired and need to be deleted.
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
log.Info("Failed to delete shard group",
logger.Database(d.Name),
logger.ShardGroup(g.ID),
logger.RetentionPolicy(r.Name),
zap.Error(err))
retryNeeded = true
continue
}
log.Info("Deleted shard group",
logger.Database(d.Name),
logger.ShardGroup(g.ID),
logger.RetentionPolicy(r.Name))
// Store all the shard IDs that may possibly need to be removed locally.
for _, sh := range g.Shards {
deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name}
}
}
}
}
// Remove shards if we store them locally
for _, id := range s.TSDBStore.ShardIDs() {
if info, ok := deletedShardIDs[id]; ok {
if err := s.TSDBStore.DeleteShard(id); err != nil {
log.Info("Failed to delete shard",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp),
zap.Error(err))
retryNeeded = true
continue
}
log.Info("Deleted shard",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp))
}
}
if err := s.MetaClient.PruneShardGroups(); err != nil {
log.Info("Problem pruning shard groups", zap.Error(err))
retryNeeded = true
}
if retryNeeded {
log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
}
logEnd()
}
}
}
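
For reference, a minimal wiring sketch of the Service shown above. This is hedged: startRetention, the local metaClient/tsdbStore interface names, and the example package are illustrative rather than part of this change; they simply mirror the anonymous interfaces the Service declares and the setup storage.NewEngine now performs internally.

```go
package example

import (
	"time"

	"github.com/influxdata/influxdb/v2/toml"
	"github.com/influxdata/influxdb/v2/v1/services/meta"
	"github.com/influxdata/influxdb/v2/v1/services/retention"
	"go.uber.org/zap"
)

// metaClient and tsdbStore mirror the anonymous interfaces on retention.Service.
type metaClient interface {
	Databases() []meta.DatabaseInfo
	DeleteShardGroup(database, policy string, id uint64) error
	PruneShardGroups() error
}

type tsdbStore interface {
	ShardIDs() []uint64
	DeleteShard(shardID uint64) error
}

// startRetention configures and starts the retention service.
func startRetention(mc metaClient, ts tsdbStore, log *zap.Logger) (*retention.Service, error) {
	svc := retention.NewService(retention.Config{
		Enabled:       true,
		CheckInterval: toml.Duration(30 * time.Minute),
	})
	svc.MetaClient = mc
	svc.TSDBStore = ts
	svc.WithLogger(log)
	if err := svc.Open(); err != nil {
		return nil, err
	}
	return svc, nil // the caller is responsible for svc.Close() on shutdown
}
```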


@ -0,0 +1,397 @@
package retention_test
import (
"bytes"
"fmt"
"reflect"
"sync"
"testing"
"time"
"github.com/influxdata/influxdb/v2/logger"
"github.com/influxdata/influxdb/v2/toml"
"github.com/influxdata/influxdb/v2/v1/internal"
"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/influxdata/influxdb/v2/v1/services/retention"
)
func TestService_OpenDisabled(t *testing.T) {
// Opening a disabled service should be a no-op.
c := retention.NewConfig()
c.Enabled = false
s := NewService(c)
if err := s.Open(); err != nil {
t.Fatal(err)
}
if s.LogBuf.String() != "" {
t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String())
}
}
func TestService_OpenClose(t *testing.T) {
// Open and Close should succeed; repeated calls should be no-ops.
s := NewService(retention.NewConfig())
if err := s.Open(); err != nil {
t.Fatal(err)
}
if s.LogBuf.String() == "" {
t.Fatal("service didn't log anything on open")
}
// Reopening is a no-op
if err := s.Open(); err != nil {
t.Fatal(err)
}
if err := s.Close(); err != nil {
t.Fatal(err)
}
// Re-closing is a no-op
if err := s.Close(); err != nil {
t.Fatal(err)
}
}
func TestService_CheckShards(t *testing.T) {
now := time.Now()
// Account for any time difference that could cause some of the logic in
// this test to fail due to a race condition. If we are at the very end of
// the hour, we can choose a time interval based on one "now" time and then
// run the retention service in the next hour. If we're in one of those
// situations, wait 100 milliseconds until we're in the next hour.
if got, want := now.Add(100*time.Millisecond).Truncate(time.Hour), now.Truncate(time.Hour); !got.Equal(want) {
time.Sleep(100 * time.Millisecond)
}
data := []meta.DatabaseInfo{
{
Name: "db0",
DefaultRetentionPolicy: "rp0",
RetentionPolicies: []meta.RetentionPolicyInfo{
{
Name: "rp0",
ReplicaN: 1,
Duration: time.Hour,
ShardGroupDuration: time.Hour,
ShardGroups: []meta.ShardGroupInfo{
{
ID: 1,
StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour),
EndTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
Shards: []meta.ShardInfo{
{ID: 2},
{ID: 3},
},
},
{
ID: 4,
StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
EndTime: now.Truncate(time.Hour),
Shards: []meta.ShardInfo{
{ID: 5},
{ID: 6},
},
},
{
ID: 7,
StartTime: now.Truncate(time.Hour),
EndTime: now.Truncate(time.Hour).Add(time.Hour),
Shards: []meta.ShardInfo{
{ID: 8},
{ID: 9},
},
},
},
},
},
},
}
config := retention.NewConfig()
config.CheckInterval = toml.Duration(10 * time.Millisecond)
s := NewService(config)
s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
return data
}
done := make(chan struct{})
deletedShardGroups := make(map[string]struct{})
s.MetaClient.DeleteShardGroupFn = func(database, policy string, id uint64) error {
for _, dbi := range data {
if dbi.Name == database {
for _, rpi := range dbi.RetentionPolicies {
if rpi.Name == policy {
for i, sg := range rpi.ShardGroups {
if sg.ID == id {
rpi.ShardGroups[i].DeletedAt = time.Now().UTC()
}
}
}
}
}
}
deletedShardGroups[fmt.Sprintf("%s.%s.%d", database, policy, id)] = struct{}{}
if got, want := deletedShardGroups, map[string]struct{}{
"db0.rp0.1": struct{}{},
}; reflect.DeepEqual(got, want) {
close(done)
} else if len(got) > 1 {
t.Errorf("deleted too many shard groups")
}
return nil
}
pruned := false
closing := make(chan struct{})
s.MetaClient.PruneShardGroupsFn = func() error {
select {
case <-done:
if !pruned {
close(closing)
pruned = true
}
default:
}
return nil
}
deletedShards := make(map[uint64]struct{})
s.TSDBStore.ShardIDsFn = func() []uint64 {
return []uint64{2, 3, 5, 6}
}
s.TSDBStore.DeleteShardFn = func(shardID uint64) error {
deletedShards[shardID] = struct{}{}
return nil
}
if err := s.Open(); err != nil {
t.Fatalf("unexpected open error: %s", err)
}
defer func() {
if err := s.Close(); err != nil {
t.Fatalf("unexpected close error: %s", err)
}
}()
timer := time.NewTimer(100 * time.Millisecond)
select {
case <-done:
timer.Stop()
case <-timer.C:
t.Errorf("timeout waiting for shard groups to be deleted")
return
}
timer = time.NewTimer(100 * time.Millisecond)
select {
case <-closing:
timer.Stop()
case <-timer.C:
t.Errorf("timeout waiting for shards to be deleted")
return
}
if got, want := deletedShards, map[uint64]struct{}{
2: struct{}{},
3: struct{}{},
}; !reflect.DeepEqual(got, want) {
t.Errorf("unexpected deleted shards: got=%#v want=%#v", got, want)
}
}
// This reproduces https://github.com/influxdata/influxdb/issues/8819
func TestService_8819_repro(t *testing.T) {
for i := 0; i < 1000; i++ {
s, errC, done := testService_8819_repro(t)
if err := s.Open(); err != nil {
t.Fatal(err)
}
// Wait for service to run one sweep of all dbs/rps/shards.
if err := <-errC; err != nil {
t.Fatalf("%dth iteration: %v", i, err)
}
// Mark that we do not expect more errors in case it runs one more time.
close(done)
if err := s.Close(); err != nil {
t.Fatal(err)
}
}
}
func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) {
c := retention.NewConfig()
c.CheckInterval = toml.Duration(time.Millisecond)
s := NewService(c)
errC := make(chan error, 1) // Buffer Important to prevent deadlock.
done := make(chan struct{})
// A database and a bunch of shards
var mu sync.Mutex
shards := []uint64{3, 5, 8, 9, 11, 12}
localShards := []uint64{3, 5, 8, 9, 11, 12}
databases := []meta.DatabaseInfo{
{
Name: "db0",
RetentionPolicies: []meta.RetentionPolicyInfo{
{
Name: "autogen",
Duration: 24 * time.Hour,
ShardGroupDuration: 24 * time.Hour,
ShardGroups: []meta.ShardGroupInfo{
{
ID: 1,
StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC),
EndTime: time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC),
Shards: []meta.ShardInfo{
{ID: 3}, {ID: 9},
},
},
{
ID: 2,
StartTime: time.Now().Add(-1 * time.Hour),
EndTime: time.Now(),
DeletedAt: time.Now(),
Shards: []meta.ShardInfo{
{ID: 11}, {ID: 12},
},
},
},
},
},
},
}
sendError := func(err error) {
select {
case errC <- err:
case <-done:
}
}
s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
mu.Lock()
defer mu.Unlock()
return databases
}
s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error {
if database != "db0" {
sendError(fmt.Errorf("wrong db name: %s", database))
return nil
} else if policy != "autogen" {
sendError(fmt.Errorf("wrong rp name: %s", policy))
return nil
} else if id != 1 {
sendError(fmt.Errorf("wrong shard group id: %d", id))
return nil
}
// remove the associated shards (3 and 9) from the shards slice...
mu.Lock()
newShards := make([]uint64, 0, len(shards))
for _, sid := range shards {
if sid != 3 && sid != 9 {
newShards = append(newShards, sid)
}
}
shards = newShards
databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC()
mu.Unlock()
return nil
}
s.MetaClient.PruneShardGroupsFn = func() error {
// When this is called all shards that have been deleted from the meta
// store (expired) should also have been deleted from disk.
// If they haven't then that indicates that shards can be removed from
// the meta store and there can be a race where they haven't yet been
// removed from the local disk and indexes. This has an impact on, for
// example, the max series per database limit.
mu.Lock()
defer mu.Unlock()
for _, lid := range localShards {
var found bool
for _, mid := range shards {
if lid == mid {
found = true
break
}
}
if !found {
sendError(fmt.Errorf("local shard %d present, yet it's missing from meta store. %v -- %v ", lid, shards, localShards))
return nil
}
}
// We should have removed shards 3 and 9
if !reflect.DeepEqual(localShards, []uint64{5, 8}) {
sendError(fmt.Errorf("removed shards still present locally: %v", localShards))
return nil
}
sendError(nil)
return nil
}
s.TSDBStore.ShardIDsFn = func() []uint64 {
mu.Lock()
defer mu.Unlock()
return localShards
}
s.TSDBStore.DeleteShardFn = func(id uint64) error {
var found bool
mu.Lock()
newShards := make([]uint64, 0, len(localShards))
for _, sid := range localShards {
if sid != id {
newShards = append(newShards, sid)
} else {
found = true
}
}
localShards = newShards
mu.Unlock()
if !found {
return fmt.Errorf("shard %d not found locally", id)
}
return nil
}
return s, errC, done
}
type Service struct {
MetaClient *internal.MetaClientMock
TSDBStore *internal.TSDBStoreMock
LogBuf bytes.Buffer
*retention.Service
}
func NewService(c retention.Config) *Service {
s := &Service{
MetaClient: &internal.MetaClientMock{},
TSDBStore: &internal.TSDBStoreMock{},
Service: retention.NewService(c),
}
l := logger.New(&s.LogBuf)
s.WithLogger(l)
s.Service.MetaClient = s.MetaClient
s.Service.TSDBStore = s.TSDBStore
return s
}