fix(storage): move retention snapshot out of per bucket calls (#14420)
* fix(storage): move retention snapshot out of per bucket calls
Also adds tracking for snapshots from retention and full compactions.
pull/14431/head
parent
c0253d868e
commit
48ee7ada04
|
@ -93,7 +93,7 @@ func WithNodeID(id int) Option {
|
|||
// metrics are labelled correctly.
|
||||
func WithRetentionEnforcer(finder BucketFinder) Option {
|
||||
return func(e *Engine) {
|
||||
e.retentionEnforcer = newRetentionEnforcer(e, finder)
|
||||
e.retentionEnforcer = newRetentionEnforcer(e, e.engine, finder)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -531,11 +531,6 @@ func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
|
|||
|
||||
// DeleteBucketRange deletes an entire bucket from the storage engine.
|
||||
func (e *Engine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error {
|
||||
// Snapshot to clear the cache to reduce write contention.
|
||||
if err := e.engine.WriteSnapshot(context.Background()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.mu.RLock()
|
||||
defer e.mu.RUnlock()
|
||||
if e.closing == nil {
|
||||
|
|
|
@ -3,6 +3,7 @@ package storage
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/influxdata/influxdb/tsdb/tsm1"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
|
@ -22,6 +23,11 @@ type Deleter interface {
|
|||
DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64) error
|
||||
}
|
||||
|
||||
// A Snapshotter implementation can take snapshots of the entire engine.
|
||||
type Snapshotter interface {
|
||||
WriteSnapshot(ctx context.Context, status tsm1.CacheStatus) error
|
||||
}
|
||||
|
||||
// A BucketFinder is responsible for providing access to buckets via a filter.
|
||||
type BucketFinder interface {
|
||||
FindBuckets(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
|
||||
|
@ -36,6 +42,8 @@ type retentionEnforcer struct {
|
|||
// Engine provides access to data stored on the engine
|
||||
Engine Deleter
|
||||
|
||||
Snapshotter Snapshotter
|
||||
|
||||
// BucketService provides an API for retrieving buckets associated with
|
||||
// organisations.
|
||||
BucketService BucketFinder
|
||||
|
@ -48,9 +56,10 @@ type retentionEnforcer struct {
|
|||
// newRetentionEnforcer returns a new enforcer that ensures expired data is
|
||||
// deleted every interval period. Setting interval to 0 is equivalent to
|
||||
// disabling the service.
|
||||
func newRetentionEnforcer(engine Deleter, bucketService BucketFinder) *retentionEnforcer {
|
||||
func newRetentionEnforcer(engine Deleter, snapshotter Snapshotter, bucketService BucketFinder) *retentionEnforcer {
|
||||
return &retentionEnforcer{
|
||||
Engine: engine,
|
||||
Snapshotter: snapshotter,
|
||||
BucketService: bucketService,
|
||||
logger: zap.NewNop(),
|
||||
tracker: newRetentionTracker(newRetentionMetrics(nil), nil),
|
||||
|
@ -111,6 +120,11 @@ func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb.
|
|||
logger, logEnd := logger.NewOperation(ctx, s.logger, "Data deletion", "data_deletion")
|
||||
defer logEnd()
|
||||
|
||||
// Snapshot to clear the cache to reduce write contention.
|
||||
if err := s.Snapshotter.WriteSnapshot(ctx, tsm1.CacheStatusRetention); err != nil && err != tsm1.ErrSnapshotInProgress {
|
||||
logger.Warn("Unable to snapshot cache before retention", zap.Error(err))
|
||||
}
|
||||
|
||||
for _, b := range buckets {
|
||||
if b.RetentionPeriod == 0 {
|
||||
continue
|
||||
|
|
|
@ -2,6 +2,7 @@ package storage
|
|||
|
||||
import (
|
||||
"context"
|
||||
"github.com/influxdata/influxdb/tsdb/tsm1"
|
||||
"math"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
|
@ -16,7 +17,7 @@ import (
|
|||
|
||||
func TestRetentionService(t *testing.T) {
|
||||
engine := NewTestEngine()
|
||||
service := newRetentionEnforcer(engine, NewTestBucketFinder())
|
||||
service := newRetentionEnforcer(engine, &TestSnapshotter{}, NewTestBucketFinder())
|
||||
now := time.Date(2018, 4, 10, 23, 12, 33, 0, time.UTC)
|
||||
|
||||
t.Run("no buckets", func(t *testing.T) {
|
||||
|
@ -164,6 +165,12 @@ func (e *TestEngine) DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int
|
|||
return e.DeleteBucketRangeFn(orgID, bucketID, min, max)
|
||||
}
|
||||
|
||||
type TestSnapshotter struct{}
|
||||
|
||||
func (s *TestSnapshotter) WriteSnapshot(ctx context.Context, status tsm1.CacheStatus) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type TestBucketFinder struct {
|
||||
FindBucketsFn func(context.Context, influxdb.BucketFilter, ...influxdb.FindOptions) ([]*influxdb.Bucket, int, error)
|
||||
}
|
||||
|
|
|
@ -430,7 +430,7 @@ func (e *Engine) disableSnapshotCompactions() {
|
|||
// TSM files. This is an expensive operation.
|
||||
func (e *Engine) ScheduleFullCompaction(ctx context.Context) error {
|
||||
// Snapshot any data in the cache
|
||||
if err := e.WriteSnapshot(ctx); err != nil {
|
||||
if err := e.WriteSnapshot(ctx, CacheStatusFullCompaction); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -775,8 +775,23 @@ func (t *compactionTracker) SetOptimiseQueue(length uint64) { t.SetQueue(4, leng
|
|||
// SetFullQueue sets the queue depth for Full compactions.
|
||||
func (t *compactionTracker) SetFullQueue(length uint64) { t.SetQueue(5, length) }
|
||||
|
||||
func (e *Engine) WriteSnapshot(ctx context.Context, status CacheStatus) error {
|
||||
start := time.Now()
|
||||
err := e.writeSnapshot(ctx)
|
||||
if err != nil && err != errCompactionsDisabled {
|
||||
e.logger.Info("Error writing snapshot", zap.Error(err))
|
||||
}
|
||||
e.compactionTracker.SnapshotAttempted(err == nil || err == errCompactionsDisabled ||
|
||||
err == ErrSnapshotInProgress, status, time.Since(start))
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteSnapshot will snapshot the cache and write a new TSM file with its contents, releasing the snapshot when done.
|
||||
func (e *Engine) WriteSnapshot(ctx context.Context) error {
|
||||
func (e *Engine) writeSnapshot(ctx context.Context) error {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -874,15 +889,13 @@ func (e *Engine) compactCache() {
|
|||
continue
|
||||
}
|
||||
|
||||
span, ctx := tracing.StartSpanFromContextWithOperationName(context.Background(), "Engine.compactCache <-t.C")
|
||||
span, ctx := tracing.StartSpanFromContextWithOperationName(context.Background(), "compact cache")
|
||||
span.LogKV("path", e.path)
|
||||
|
||||
start := time.Now()
|
||||
err := e.WriteSnapshot(ctx)
|
||||
if err != nil && err != errCompactionsDisabled {
|
||||
err := e.WriteSnapshot(ctx, status)
|
||||
if err != nil && err != errCompactionsDisabled && err != ErrSnapshotInProgress {
|
||||
e.logger.Info("Error writing snapshot", zap.Error(err))
|
||||
}
|
||||
e.compactionTracker.SnapshotAttempted(err == nil || err == errCompactionsDisabled, status, time.Since(start))
|
||||
|
||||
span.Finish()
|
||||
}
|
||||
|
@ -895,10 +908,12 @@ type CacheStatus int
|
|||
|
||||
// Possible types of Cache status
|
||||
const (
|
||||
CacheStatusOkay CacheStatus = iota // Cache is Okay - do not snapshot.
|
||||
CacheStatusSizeExceeded // The cache is large enough to be snapshotted.
|
||||
CacheStatusAgeExceeded // The cache is past the age threshold to be snapshotted.
|
||||
CacheStatusColdNoWrites // The cache has not been written to for long enough that it should be snapshotted.
|
||||
CacheStatusOkay CacheStatus = iota // Cache is Okay - do not snapshot.
|
||||
CacheStatusSizeExceeded // The cache is large enough to be snapshotted.
|
||||
CacheStatusAgeExceeded // The cache is past the age threshold to be snapshotted.
|
||||
CacheStatusColdNoWrites // The cache has not been written to for long enough that it should be snapshotted.
|
||||
CacheStatusRetention // The cache was snapshotted before running retention.
|
||||
CacheStatusFullCompaction // The cache was snapshotted as part of a full compaction.
|
||||
)
|
||||
|
||||
// ShouldCompactCache returns a status indicating if the Cache should be
|
||||
|
|
|
@ -3,6 +3,7 @@ package tsm1_test
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"github.com/influxdata/influxdb/tsdb/tsm1"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
|
@ -33,7 +34,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
|
|||
t.Fatalf("failed to write points: %s", err.Error())
|
||||
}
|
||||
|
||||
if err := e.WriteSnapshot(context.Background()); err != nil {
|
||||
if err := e.WriteSnapshot(context.Background(), tsm1.CacheStatusColdNoWrites); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
|
|
|
@ -124,7 +124,7 @@ func TestEngine_SnapshotsDisabled(t *testing.T) {
|
|||
|
||||
// Writing a snapshot should not fail when the snapshot is empty
|
||||
// even if snapshots are disabled.
|
||||
if err := e.WriteSnapshot(context.Background()); err != nil {
|
||||
if err := e.WriteSnapshot(context.Background(), tsm1.CacheStatusColdNoWrites); err != nil {
|
||||
t.Fatalf("failed to snapshot: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
@ -463,7 +463,7 @@ func (e *Engine) MustAddSeries(name string, tags map[string]string) {
|
|||
|
||||
// MustWriteSnapshot forces a snapshot of the engine. Panic on error.
|
||||
func (e *Engine) MustWriteSnapshot() {
|
||||
if err := e.WriteSnapshot(context.Background()); err != nil {
|
||||
if err := e.WriteSnapshot(context.Background(), tsm1.CacheStatusColdNoWrites); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue