diff --git a/storage/bucket_service.go b/storage/bucket_service.go index 4353d50dcf..6ef541ec22 100644 --- a/storage/bucket_service.go +++ b/storage/bucket_service.go @@ -10,7 +10,7 @@ import ( // BucketDeleter defines the behaviour of deleting a bucket. type BucketDeleter interface { - DeleteBucket(platform.ID, platform.ID) error + DeleteBucket(context.Context, platform.ID, platform.ID) error } // BucketService wraps an existing platform.BucketService implementation. @@ -102,7 +102,7 @@ func (s *BucketService) DeleteBucket(ctx context.Context, bucketID platform.ID) // The data is dropped first from the storage engine. If this fails for any // reason, then the bucket will still be available in the future to retrieve // the orgID, which is needed for the engine. - if err := s.engine.DeleteBucket(bucket.OrgID, bucketID); err != nil { + if err := s.engine.DeleteBucket(ctx, bucket.OrgID, bucketID); err != nil { return err } return s.inner.DeleteBucket(ctx, bucketID) diff --git a/storage/bucket_service_test.go b/storage/bucket_service_test.go index ee4692a36e..e571a0a29d 100644 --- a/storage/bucket_service_test.go +++ b/storage/bucket_service_test.go @@ -57,7 +57,7 @@ type MockDeleter struct { orgID, bucketID platform.ID } -func (m *MockDeleter) DeleteBucket(orgID, bucketID platform.ID) error { +func (m *MockDeleter) DeleteBucket(_ context.Context, orgID, bucketID platform.ID) error { m.orgID, m.bucketID = orgID, bucketID return nil } diff --git a/storage/engine.go b/storage/engine.go index ed9284f141..e1a2cd1b2a 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -266,7 +266,7 @@ func (e *Engine) replayWAL() error { } } - return e.deleteBucketRangeLocked(en.OrgID, en.BucketID, en.Min, en.Max, pred) + return e.deleteBucketRangeLocked(context.Background(), en.OrgID, en.BucketID, en.Min, en.Max, pred) } return nil @@ -525,12 +525,17 @@ func (e *Engine) CommitSegments(ctx context.Context, segs []string, fn func() er } // DeleteBucket deletes an entire bucket from the storage engine. -func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error { - return e.DeleteBucketRange(orgID, bucketID, math.MinInt64, math.MaxInt64) +func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID platform.ID) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + return e.DeleteBucketRange(ctx, orgID, bucketID, math.MinInt64, math.MaxInt64) } // DeleteBucketRange deletes an entire bucket from the storage engine. -func (e *Engine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) error { +func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID platform.ID, min, max int64) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + e.mu.RLock() defer e.mu.RUnlock() if e.closing == nil { @@ -542,13 +547,14 @@ func (e *Engine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) return err } - return e.deleteBucketRangeLocked(orgID, bucketID, min, max, nil) + return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, nil) } // DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data // deleted must be in [min, max], and the key must match the predicate if provided. 
-func (e *Engine) DeleteBucketRangePredicate(orgID, bucketID platform.ID, - min, max int64, pred tsm1.Predicate) error { +func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred tsm1.Predicate) error { + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() e.mu.RLock() defer e.mu.RUnlock() @@ -567,20 +573,18 @@ func (e *Engine) DeleteBucketRangePredicate(orgID, bucketID platform.ID, return err } - return e.deleteBucketRangeLocked(orgID, bucketID, min, max, pred) + return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, pred) } // deleteBucketRangeLocked does the work of deleting a bucket range and must be called under // some sort of lock. -func (e *Engine) deleteBucketRangeLocked(orgID, bucketID platform.ID, - min, max int64, pred tsm1.Predicate) error { - +func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred tsm1.Predicate) error { // TODO(edd): we need to clean up how we're encoding the prefix so that we // don't have to remember to get it right everywhere we need to touch TSM data. encoded := tsdb.EncodeName(orgID, bucketID) name := models.EscapeMeasurement(encoded[:]) - return e.engine.DeletePrefixRange(name, min, max, pred) + return e.engine.DeletePrefixRange(ctx, name, min, max, pred) } // SeriesCardinality returns the number of series in the engine. diff --git a/storage/engine_test.go b/storage/engine_test.go index 1191ce4c49..1ffa295d8b 100644 --- a/storage/engine_test.go +++ b/storage/engine_test.go @@ -244,7 +244,7 @@ func TestEngine_DeleteBucket(t *testing.T) { } // Remove the original bucket. - if err := engine.DeleteBucket(engine.org, engine.bucket); err != nil { + if err := engine.DeleteBucket(context.Background(), engine.org, engine.bucket); err != nil { t.Fatal(err) } @@ -309,7 +309,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) { } // Remove the matching series. - if err := engine.DeleteBucketRangePredicate(engine.org, engine.bucket, + if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket, math.MinInt64, math.MaxInt64, pred); err != nil { t.Fatal(err) } @@ -339,7 +339,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) { } // Remove the matching series. - if err := engine.DeleteBucketRangePredicate(engine.org, engine.bucket, + if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket, math.MinInt64, math.MaxInt64, pred); err != nil { t.Fatal(err) } @@ -508,7 +508,7 @@ func BenchmarkDeleteBucket(b *testing.B) { b.Run(fmt.Sprintf("cardinality_%d", card), func(b *testing.B) { setup(card) for i := 0; i < b.N; i++ { - if err := engine.DeleteBucket(engine.org, engine.bucket); err != nil { + if err := engine.DeleteBucket(context.Background(), engine.org, engine.bucket); err != nil { b.Fatal(err) } diff --git a/storage/retention.go b/storage/retention.go index 1f3575375a..670a8c1ac4 100644 --- a/storage/retention.go +++ b/storage/retention.go @@ -3,10 +3,11 @@ package storage import ( "context" "errors" - "github.com/influxdata/influxdb/tsdb/tsm1" "math" "time" + "github.com/influxdata/influxdb/tsdb/tsm1" + "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/logger" @@ -20,7 +21,7 @@ const ( // A Deleter implementation is capable of deleting data from a storage engine. 
type Deleter interface { - DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64) error + DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error } // A Snapshotter implementation can take snapshots of the entire engine. @@ -130,7 +131,7 @@ func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb. continue } - span, _ := tracing.StartSpanFromContext(ctx) + span, ctx := tracing.StartSpanFromContext(ctx) span.LogKV( "bucket", b.Name, "org_id", b.OrgID, @@ -138,7 +139,7 @@ func (s *retentionEnforcer) expireData(ctx context.Context, buckets []*influxdb. "retention_policy", b.RetentionPolicyName) max := now.Add(-b.RetentionPeriod).UnixNano() - err := s.Engine.DeleteBucketRange(b.OrgID, b.ID, math.MinInt64, max) + err := s.Engine.DeleteBucketRange(ctx, b.OrgID, b.ID, math.MinInt64, max) if err != nil { logger.Info("unable to delete bucket range", zap.String("bucket id", b.ID.String()), diff --git a/storage/retention_test.go b/storage/retention_test.go index 7086384783..d01210a8e3 100644 --- a/storage/retention_test.go +++ b/storage/retention_test.go @@ -58,7 +58,7 @@ func TestRetentionService(t *testing.T) { } gotMatched := map[string]struct{}{} - engine.DeleteBucketRangeFn = func(orgID, bucketID influxdb.ID, from, to int64) error { + engine.DeleteBucketRangeFn = func(ctx context.Context, orgID, bucketID influxdb.ID, from, to int64) error { if from != math.MinInt64 { t.Fatalf("got from %d, expected %d", from, math.MinInt64) } @@ -144,17 +144,17 @@ func genMeasurementName() []byte { } type TestEngine struct { - DeleteBucketRangeFn func(influxdb.ID, influxdb.ID, int64, int64) error + DeleteBucketRangeFn func(context.Context, influxdb.ID, influxdb.ID, int64, int64) error } func NewTestEngine() *TestEngine { return &TestEngine{ - DeleteBucketRangeFn: func(influxdb.ID, influxdb.ID, int64, int64) error { return nil }, + DeleteBucketRangeFn: func(context.Context, influxdb.ID, influxdb.ID, int64, int64) error { return nil }, } } -func (e *TestEngine) DeleteBucketRange(orgID, bucketID influxdb.ID, min, max int64) error { - return e.DeleteBucketRangeFn(orgID, bucketID, min, max) +func (e *TestEngine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error { + return e.DeleteBucketRangeFn(ctx, orgID, bucketID, min, max) } type TestSnapshotter struct{} diff --git a/tsdb/tsm1/cache.go b/tsdb/tsm1/cache.go index 5c3920baa3..9e63bf83a5 100644 --- a/tsdb/tsm1/cache.go +++ b/tsdb/tsm1/cache.go @@ -2,12 +2,14 @@ package tsm1 import ( "bytes" + "context" "fmt" "math" "sync" "sync/atomic" "time" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/storage/wal" "github.com/influxdata/influxdb/tsdb" @@ -386,7 +388,10 @@ func (c *Cache) Values(key []byte) Values { // DeleteBucketRange removes values for all keys containing points // with timestamps between min and max contained in the bucket identified // by name from the cache. 
-func (c *Cache) DeleteBucketRange(name []byte, min, max int64, pred Predicate) { +func (c *Cache) DeleteBucketRange(ctx context.Context, name []byte, min, max int64, pred Predicate) { + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + // TODO(edd/jeff): find a way to optimize lock usage c.mu.Lock() defer c.mu.Unlock() @@ -498,7 +503,7 @@ func (cl *CacheLoader) Load(cache *Cache) error { encoded := tsdb.EncodeName(en.OrgID, en.BucketID) name := models.EscapeMeasurement(encoded[:]) - cache.DeleteBucketRange(name, en.Min, en.Max, pred) + cache.DeleteBucketRange(context.Background(), name, en.Min, en.Max, pred) return nil } diff --git a/tsdb/tsm1/cache_test.go b/tsdb/tsm1/cache_test.go index 2ee86304d0..be4c045605 100644 --- a/tsdb/tsm1/cache_test.go +++ b/tsdb/tsm1/cache_test.go @@ -1,6 +1,7 @@ package tsm1 import ( + "context" "errors" "fmt" "io/ioutil" @@ -176,7 +177,7 @@ func TestCache_Cache_DeleteBucketRange(t *testing.T) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) } - c.DeleteBucketRange([]byte("bar"), 2, math.MaxInt64, nil) + c.DeleteBucketRange(context.Background(), []byte("bar"), 2, math.MaxInt64, nil) if exp, keys := [][]byte{[]byte("bar"), []byte("foo")}, c.Keys(); !reflect.DeepEqual(keys, exp) { t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys) @@ -215,7 +216,7 @@ func TestCache_DeleteBucketRange_NoValues(t *testing.T) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) } - c.DeleteBucketRange([]byte("foo"), math.MinInt64, math.MaxInt64, nil) + c.DeleteBucketRange(context.Background(), []byte("foo"), math.MinInt64, math.MaxInt64, nil) if exp, keys := 0, len(c.Keys()); !reflect.DeepEqual(keys, exp) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) @@ -250,7 +251,7 @@ func TestCache_DeleteBucketRange_NotSorted(t *testing.T) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) } - c.DeleteBucketRange([]byte("foo"), 1, 3, nil) + c.DeleteBucketRange(context.Background(), []byte("foo"), 1, 3, nil) if exp, keys := 0, len(c.Keys()); !reflect.DeepEqual(keys, exp) { t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys) @@ -268,7 +269,7 @@ func TestCache_DeleteBucketRange_NotSorted(t *testing.T) { func TestCache_DeleteBucketRange_NonExistent(t *testing.T) { c := NewCache(1024) - c.DeleteBucketRange([]byte("bar"), math.MinInt64, math.MaxInt64, nil) + c.DeleteBucketRange(context.Background(), []byte("bar"), math.MinInt64, math.MaxInt64, nil) if got, exp := c.Size(), uint64(0); exp != got { t.Fatalf("cache size incorrect exp %d, got %d", exp, got) @@ -300,7 +301,7 @@ func TestCache_Cache_DeleteBucketRange_WithPredicate(t *testing.T) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) } - c.DeleteBucketRange([]byte("f"), 2, math.MaxInt64, stringPredicate("fee")) + c.DeleteBucketRange(context.Background(), []byte("f"), 2, math.MaxInt64, stringPredicate("fee")) if exp, keys := [][]byte{[]byte("fee"), []byte("foo")}, c.Keys(); !reflect.DeepEqual(keys, exp) { t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys) diff --git a/tsdb/tsm1/engine_delete_prefix.go b/tsdb/tsm1/engine_delete_prefix.go index bac20490d7..d996c5e564 100644 --- a/tsdb/tsm1/engine_delete_prefix.go +++ b/tsdb/tsm1/engine_delete_prefix.go @@ -2,9 +2,11 @@ package tsm1 import ( "bytes" + "context" "math" "sync" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/models" 
"github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/tsdb" @@ -14,7 +16,9 @@ import ( // DeletePrefixRange removes all TSM data belonging to a bucket, and removes all index // and series file data associated with the bucket. The provided time range ensures // that only bucket data for that range is removed. -func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) error { +func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate) error { + span, ctx := tracing.StartSpanFromContext(rootCtx) + defer span.Finish() // TODO(jeff): we need to block writes to this prefix while deletes are in progress // otherwise we can end up in a situation where we have staged data in the cache or // WAL that was deleted from the index, or worse. This needs to happen at a higher @@ -26,9 +30,11 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) // Ensure that the index does not compact away the measurement or series we're // going to delete before we're done with them. + span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "disable index compactions") e.index.DisableCompactions() defer e.index.EnableCompactions() e.index.Wait() + span.Finish() // Disable and abort running compactions so that tombstones added existing tsm // files don't get removed. This would cause deleted measurements/series to @@ -36,11 +42,15 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) // so that snapshotting does not stop while writing out tombstones. If it is stopped, // and writing tombstones takes a long time, writes can get rejected due to the cache // filling up. + span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "disable tsm compactions") e.disableLevelCompactions(true) defer e.enableLevelCompactions(true) + span.Finish() + span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "disable series file compactions") e.sfile.DisableCompactions() defer e.sfile.EnableCompactions() + span.Finish() // TODO(jeff): are the query language values still a thing? // Min and max time in the engine are slightly different from the query language values. @@ -64,6 +74,11 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) possiblyDead.keys = make(map[string]struct{}) if err := e.FileStore.Apply(func(r TSMFile) error { + // TODO(edd): tracing this deep down is currently speculative, so I have + // not added the tracing into the TSMReader API. + span, _ := tracing.StartSpanFromContextWithOperationName(rootCtx, "TSMFile delete prefix") + defer span.Finish() + return r.DeletePrefix(name, min, max, pred, func(key []byte) { possiblyDead.Lock() possiblyDead.keys[string(key)] = struct{}{} @@ -77,6 +92,11 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) // ApplySerialEntryFn cannot return an error in this invocation. _ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error { + // TODO(edd): tracing this deep down is currently speculative, so I have + // not added the tracing into the Cache API. + span, _ := tracing.StartSpanFromContextWithOperationName(rootCtx, "Cache find delete keys") + defer span.Finish() + if !bytes.HasPrefix(k, name) { return nil } @@ -94,14 +114,21 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) }) // Sort the series keys because ApplyEntryFn iterates over the keys randomly. 
+ sortSpan, _ := tracing.StartSpanFromContextWithOperationName(rootCtx, "Cache sort keys") bytesutil.Sort(deleteKeys) + sortSpan.Finish() // Delete from the cache. - e.Cache.DeleteBucketRange(name, min, max, pred) + e.Cache.DeleteBucketRange(ctx, name, min, max, pred) // Now that all of the data is purged, we need to find if some keys are fully deleted // and if so, remove them from the index. if err := e.FileStore.Apply(func(r TSMFile) error { + // TODO(edd): tracing this deep down is currently speculative, so I have + // not added the tracing into the Engine API. + span, _ := tracing.StartSpanFromContextWithOperationName(rootCtx, "TSMFile determine fully deleted") + defer span.Finish() + possiblyDead.RLock() defer possiblyDead.RUnlock() @@ -137,6 +164,11 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) // ApplySerialEntryFn cannot return an error in this invocation. _ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error { + // TODO(edd): tracing this deep down is currently speculative, so I have + // not added the tracing into the Cache API. + span, _ := tracing.StartSpanFromContextWithOperationName(rootCtx, "Cache find delete keys") + defer span.Finish() + if !bytes.HasPrefix(k, name) { return nil } @@ -185,21 +217,26 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) } // Remove the measurement from the index before the series file. + span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "TSI drop measurement") if err := e.index.DropMeasurement(name); err != nil { return err } + span.Finish() // Iterate over the series ids we previously extracted from the index // and remove from the series file. + span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "SFile Delete Series ID") set.ForEachNoLock(func(id tsdb.SeriesID) { if err = e.sfile.DeleteSeriesID(id); err != nil { return } }) + span.Finish() return err } // This is the slow path, when not dropping the entire bucket (measurement) + span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "TSI/SFile Delete keys") for key := range possiblyDead.keys { // TODO(jeff): ugh reduce copies here keyb := []byte(key) @@ -219,6 +256,7 @@ func (e *Engine) DeletePrefixRange(name []byte, min, max int64, pred Predicate) return err } } + span.Finish() } return nil diff --git a/tsdb/tsm1/engine_delete_prefix_test.go b/tsdb/tsm1/engine_delete_prefix_test.go index a537f86a65..0400b51c94 100644 --- a/tsdb/tsm1/engine_delete_prefix_test.go +++ b/tsdb/tsm1/engine_delete_prefix_test.go @@ -3,11 +3,11 @@ package tsm1_test import ( "bytes" "context" - "github.com/influxdata/influxdb/tsdb/tsm1" "reflect" "testing" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/tsdb/tsm1" ) func TestEngine_DeletePrefix(t *testing.T) { @@ -43,7 +43,7 @@ func TestEngine_DeletePrefix(t *testing.T) { t.Fatalf("series count mismatch: exp %v, got %v", exp, got) } - if err := e.DeletePrefixRange([]byte("mm0"), 0, 3, nil); err != nil { + if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil { t.Fatalf("failed to delete series: %v", err) } @@ -89,7 +89,7 @@ func TestEngine_DeletePrefix(t *testing.T) { iter.Close() // Deleting remaining series should remove them from the series. 
- if err := e.DeletePrefixRange([]byte("mm0"), 0, 9, nil); err != nil { + if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 9, nil); err != nil { t.Fatalf("failed to delete series: %v", err) } diff --git a/tsdb/tsm1/engine_test.go b/tsdb/tsm1/engine_test.go index 2e1692706f..8d6fd83e58 100644 --- a/tsdb/tsm1/engine_test.go +++ b/tsdb/tsm1/engine_test.go @@ -61,7 +61,7 @@ func TestIndex_SeriesIDSet(t *testing.T) { // Drop all the series for the gpu measurement and they should no longer // be in the series ID set. - if err := engine.DeletePrefixRange([]byte("gpu"), math.MinInt64, math.MaxInt64, nil); err != nil { + if err := engine.DeletePrefixRange(context.Background(), []byte("gpu"), math.MinInt64, math.MaxInt64, nil); err != nil { t.Fatal(err) } @@ -485,7 +485,7 @@ func (e *Engine) MustDeleteBucketRange(orgID, bucketID influxdb.ID, min, max int encoded := tsdb.EncodeName(orgID, bucketID) name := models.EscapeMeasurement(encoded[:]) - err := e.DeletePrefixRange(name, min, max, nil) + err := e.DeletePrefixRange(context.Background(), name, min, max, nil) if err != nil { panic(err) }
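Taken together, the change threads a context.Context from the bucket service down to the TSM layer and opens tracing spans at each level. The sketch below is not part of the diff; it restates that instrumentation pattern in isolation, assuming the kit/tracing helpers behave exactly as they are called above. The function and step names are hypothetical.

```go
package example

import (
	"context"

	"github.com/influxdata/influxdb/kit/tracing"
)

// doStep stands in for one sub-operation of a larger delete; it is hypothetical.
func doStep() error { return nil }

// instrumentedDelete mirrors the two span styles used in the diff: a span named
// after the calling function at the exported API boundary, and an explicitly
// named child span around an individual step, finished as soon as the step ends.
func instrumentedDelete(ctx context.Context) error {
	// Root span for this call; the returned ctx carries the span, so any callee
	// that receives ctx and starts its own span becomes a child in the trace.
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	// Named child span around a single step. It is finished immediately after
	// the step rather than deferred, so its duration covers only that step.
	stepSpan, _ := tracing.StartSpanFromContextWithOperationName(ctx, "hypothetical step")
	err := doStep()
	stepSpan.Finish()

	return err
}
```

This is also why storage/retention.go now keeps the ctx returned by StartSpanFromContext instead of discarding it: the per-bucket span only becomes the parent of the DeleteBucketRange spans if that derived context is what gets passed down.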
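Callers and test doubles adapt by accepting and forwarding the context. As a rough illustration, assuming the storage.Deleter interface and the influxdb.Bucket fields shown in the retention.go hunks above (the helper name here is made up), a retention-style call site now looks like this:

```go
package example

import (
	"context"
	"math"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/storage"
)

// expireBucket is a hypothetical helper mirroring the retention enforcer's call
// site: whatever context the caller already holds is forwarded unchanged, so the
// engine-level spans attach to the caller's trace rather than starting a fresh root.
func expireBucket(ctx context.Context, d storage.Deleter, b *influxdb.Bucket, maxTime int64) error {
	return d.DeleteBucketRange(ctx, b.OrgID, b.ID, math.MinInt64, maxTime)
}
```

Call sites with no request to attach to, such as WAL replay and the cache loader, pass context.Background(), as the diff does.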