feat(influxql): Implement DELETE & DROP MEASUREMENT

Branch:  pull/19387/head
Author:  Ben Johnson
Date:    2020-08-21 13:32:46 -06:00
Parent:  0780232b83
Commit:  1501351623

11 changed files with 159 additions and 87 deletions


@@ -115,8 +115,8 @@ func (t *TemporaryEngine) SeriesCardinality() int64 {
 }

 // DeleteBucketRangePredicate will delete a bucket from the range and predicate.
-func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
-    return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred)
+func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
+    return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred, opts)
 }


@@ -11,5 +11,10 @@ type Predicate interface {

 // DeleteService will delete a bucket from the range and predict.
 type DeleteService interface {
-    DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID ID, min, max int64, pred Predicate) error
+    DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID ID, min, max int64, pred Predicate, opts DeletePrefixRangeOptions) error
 }
+
+type DeletePrefixRangeOptions struct {
+    // If true, does not delete underlying series when all data has been deleted.
+    KeepSeries bool
+}
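For orientation, a minimal compilable sketch of the extended call shape, using the mock delete service that this commit also updates further down; the ID values are arbitrary and only illustrate how the new options parameter is passed.

package main

import (
	"context"
	"math"

	"github.com/influxdata/influxdb/v2"
	"github.com/influxdata/influxdb/v2/mock"
)

func main() {
	// The mock service accepts the new options parameter, so it can stand in
	// for the real engine here. KeepSeries: true asks the delete to remove the
	// matching points while leaving the series metadata in place.
	svc := mock.NewDeleteService()

	err := svc.DeleteBucketRangePredicate(context.Background(),
		influxdb.ID(1), influxdb.ID(2),
		math.MinInt64, math.MaxInt64, nil,
		influxdb.DeletePrefixRangeOptions{KeepSeries: true})
	if err != nil {
		panic(err)
	}
}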


@@ -13,6 +13,7 @@ import (
     pcontext "github.com/influxdata/influxdb/v2/context"
     "github.com/influxdata/influxdb/v2/kit/tracing"
     "github.com/influxdata/influxdb/v2/predicate"
+    "github.com/influxdata/influxdb/v2/tsdb/tsm1"
     "go.uber.org/zap"
 )

@@ -121,6 +122,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
         dr.Start,
         dr.Stop,
         dr.Predicate,
+        influxdb.DeletePrefixRangeOptions{KeepSeries: dr.KeepSeries},
     )
     if err != nil {
         h.HandleHTTPError(ctx, err, w)

@@ -155,28 +157,33 @@ func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.O
 }

 type deleteRequest struct {
     Org        *influxdb.Organization
     Bucket     *influxdb.Bucket
     Start      int64
     Stop       int64
     Predicate  influxdb.Predicate
+    KeepSeries bool
 }

 type deleteRequestDecode struct {
     Start          string `json:"start"`
     Stop           string `json:"stop"`
     Predicate      string `json:"predicate"`
+    PredicateBytes []byte `json:"predicate_bytes"`
+    KeepSeries     bool   `json:"keep_series"`
 }

 // DeleteRequest is the request send over http to delete points.
 type DeleteRequest struct {
     OrgID          string `json:"-"`
     Org            string `json:"-"` // org name
     BucketID       string `json:"-"`
     Bucket         string `json:"-"`
     Start          string `json:"start"`
     Stop           string `json:"stop"`
     Predicate      string `json:"predicate"`
+    PredicateBytes []byte `json:"predicate_bytes"`
+    KeepSeries     bool   `json:"keep_series"`
 }

 func (dr *deleteRequest) UnmarshalJSON(b []byte) error {

@@ -188,7 +195,8 @@ func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
             Err: err,
         }
     }
-    *dr = deleteRequest{}
+    *dr = deleteRequest{KeepSeries: drd.KeepSeries}
     start, err := time.Parse(time.RFC3339Nano, drd.Start)
     if err != nil {
         return &influxdb.Error{

@@ -208,12 +216,22 @@ func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
         }
     }
     dr.Stop = stop.UnixNano()

-    node, err := predicate.Parse(drd.Predicate)
-    if err != nil {
-        return err
-    }
-    dr.Predicate, err = predicate.New(node)
-    return err
+    if len(drd.PredicateBytes) != 0 {
+        if dr.Predicate, err = tsm1.UnmarshalPredicate(drd.PredicateBytes); err != nil {
+            return err
+        }
+    } else {
+        node, err := predicate.Parse(drd.Predicate)
+        if err != nil {
+            return err
+        }
+        if dr.Predicate, err = predicate.New(node); err != nil {
+            return err
+        }
+    }
+
+    return nil
 }

 // DeleteService sends data over HTTP to delete points.
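For completeness, a sketch of a request body the extended decoder would accept. The /api/v2/delete path, org, bucket, and token values are placeholders, and keep_series is only honoured by a server built from this branch.

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// keep_series asks the server to remove the matching points while leaving
	// the series keys in the index, mirroring the KeepSeries option wired into
	// the handler above.
	body := []byte(`{
		"start": "2020-01-01T00:00:00Z",
		"stop": "2020-08-21T00:00:00Z",
		"predicate": "_measurement=\"cpu\"",
		"keep_series": true
	}`)

	req, err := http.NewRequest("POST",
		"http://localhost:8086/api/v2/delete?org=my-org&bucket=my-bucket",
		bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "Token my-token")
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}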


@@ -10,20 +10,20 @@ var _ influxdb.DeleteService = &DeleteService{}

 // DeleteService is a mock delete server.
 type DeleteService struct {
-    DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error
+    DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error
 }

 // NewDeleteService returns a mock DeleteService where its methods will return
 // zero values.
 func NewDeleteService() DeleteService {
     return DeleteService{
-        DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
+        DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
             return nil
         },
     }
 }

 //DeleteBucketRangePredicate calls DeleteBucketRangePredicateF.
-func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
-    return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred)
+func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
+    return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred, opts)
 }


@@ -336,7 +336,7 @@ func (e *Engine) replayWAL() error {
             }
         }

-        return e.deleteBucketRangeLocked(context.Background(), en.OrgID, en.BucketID, en.Min, en.Max, pred)
+        return e.deleteBucketRangeLocked(context.Background(), en.OrgID, en.BucketID, en.Min, en.Max, pred, influxdb.DeletePrefixRangeOptions{KeepSeries: en.KeepSeries})
     }

     return nil

@@ -663,12 +663,12 @@ func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb
         return err
     }

-    return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, nil)
+    return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, nil, influxdb.DeletePrefixRangeOptions{})
 }

 // DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
 // deleted must be in [min, max], and the key must match the predicate if provided.
-func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
+func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
     span, ctx := tracing.StartSpanFromContext(ctx)
     defer span.Finish()

@@ -693,18 +693,18 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
         return err
     }

-    return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, pred)
+    return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, pred, opts)
 }

 // deleteBucketRangeLocked does the work of deleting a bucket range and must be called under
 // some sort of lock.
-func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred tsm1.Predicate) error {
+func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred tsm1.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
     // TODO(edd): we need to clean up how we're encoding the prefix so that we
     // don't have to remember to get it right everywhere we need to touch TSM data.
     encoded := tsdb.EncodeName(orgID, bucketID)
     name := models.EscapeMeasurement(encoded[:])

-    return e.engine.DeletePrefixRange(ctx, name, min, max, pred)
+    return e.engine.DeletePrefixRange(ctx, name, min, max, pred, opts)
 }

 // CreateBackup creates a "snapshot" of all TSM data in the Engine.
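The two lines above the final call in deleteBucketRangeLocked show how a bucket becomes a key prefix for the TSM engine. A small standalone sketch of just that encoding step, with arbitrary example IDs:

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/v2"
	"github.com/influxdata/influxdb/v2/models"
	"github.com/influxdata/influxdb/v2/tsdb"
)

func main() {
	// EncodeName packs the org and bucket IDs into a fixed 16-byte "measurement"
	// name, and EscapeMeasurement escapes it so it is safe to use as a series-key
	// prefix. DeletePrefixRange then deletes everything under that prefix.
	orgID, bucketID := influxdb.ID(0xa), influxdb.ID(0xb)
	encoded := tsdb.EncodeName(orgID, bucketID)
	name := models.EscapeMeasurement(encoded[:])
	fmt.Printf("prefix: %x\n", name)
}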


@@ -311,7 +311,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) {

     // Remove the matching series.
     if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
-        math.MinInt64, math.MaxInt64, pred); err != nil {
+        math.MinInt64, math.MaxInt64, pred, influxdb.DeletePrefixRangeOptions{}); err != nil {
         t.Fatal(err)
     }

@@ -341,7 +341,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) {

     // Remove the matching series.
     if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
-        math.MinInt64, math.MaxInt64, pred); err != nil {
+        math.MinInt64, math.MaxInt64, pred, influxdb.DeletePrefixRangeOptions{}); err != nil {
         t.Fatal(err)
     }


@@ -63,6 +63,10 @@ const (

     // DeleteBucketRangeWALEntryType indicates a delete bucket range entry.
     DeleteBucketRangeWALEntryType WalEntryType = 0x04
+
+    // DeleteBucketRangeKeepSeriesWALEntryType indicates a delete bucket range entry
+    // but the underlying series are not deleted.
+    DeleteBucketRangeKeepSeriesWALEntryType WalEntryType = 0x05
 )

 var (

@@ -993,10 +997,11 @@ func (w *WriteWALEntry) Type() WalEntryType {

 // DeleteBucketRangeWALEntry represents the deletion of data in a bucket.
 type DeleteBucketRangeWALEntry struct {
     OrgID      influxdb.ID
     BucketID   influxdb.ID
     Min, Max   int64
     Predicate  []byte
+    KeepSeries bool
 }

 // MarshalBinary returns a binary representation of the entry in a new byte slice.

@@ -1062,6 +1067,9 @@ func (w *DeleteBucketRangeWALEntry) Encode(b []byte) ([]byte, error) {

 // Type returns DeleteBucketRangeWALEntryType.
 func (w *DeleteBucketRangeWALEntry) Type() WalEntryType {
+    if w.KeepSeries {
+        return DeleteBucketRangeKeepSeriesWALEntryType
+    }
     return DeleteBucketRangeWALEntryType
 }

@@ -1203,13 +1211,15 @@ func (r *WALSegmentReader) Next() bool {
     }

     // and marshal it and send it to the cache
-    switch WalEntryType(entryType) {
+    switch typ := WalEntryType(entryType); typ {
     case WriteWALEntryType:
         r.entry = &WriteWALEntry{
             Values: make(map[string][]value.Value),
         }
-    case DeleteBucketRangeWALEntryType:
-        r.entry = &DeleteBucketRangeWALEntry{}
+    case DeleteBucketRangeWALEntryType, DeleteBucketRangeKeepSeriesWALEntryType:
+        r.entry = &DeleteBucketRangeWALEntry{
+            KeepSeries: typ == DeleteBucketRangeKeepSeriesWALEntryType,
+        }
     default:
         r.err = fmt.Errorf("unknown wal entry type: %v", entryType)
         return true
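A compilable sketch of what the reader change suggests: the KeepSeries flag rides on the WAL entry type byte rather than in the entry payload, so existing 0x04 entries still decode and the flag is reconstructed from the type on replay. The snippet below only exercises Type(); it is illustrative, not part of the commit.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/tsdb/tsm1"
)

func main() {
	keep := &tsm1.DeleteBucketRangeWALEntry{KeepSeries: true}
	drop := &tsm1.DeleteBucketRangeWALEntry{}

	// KeepSeries selects the new 0x05 entry type; the zero value keeps the
	// original 0x04 type, so older WAL segments are unaffected.
	fmt.Println(keep.Type() == tsm1.DeleteBucketRangeKeepSeriesWALEntryType) // true
	fmt.Println(drop.Type() == tsm1.DeleteBucketRangeWALEntryType)           // true
}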


@@ -216,50 +216,87 @@ func TestWALWriter_WriteMulti_Multiple(t *testing.T) {
 }

 func TestWALWriter_DeleteBucketRange(t *testing.T) {
-    dir := MustTempDir()
-    defer os.RemoveAll(dir)
-    f := MustTempFile(dir)
-    w := NewWALSegmentWriter(f)
-
-    entry := &DeleteBucketRangeWALEntry{
-        OrgID:     influxdb.ID(1),
-        BucketID:  influxdb.ID(2),
-        Min:       3,
-        Max:       4,
-        Predicate: []byte("predicate"),
-    }
-
-    if err := w.Write(mustMarshalEntry(entry)); err != nil {
-        fatal(t, "write points", err)
-    }
-
-    if err := w.Flush(); err != nil {
-        fatal(t, "flush", err)
-    }
-
-    if _, err := f.Seek(0, io.SeekStart); err != nil {
-        fatal(t, "seek", err)
-    }
-
-    r := NewWALSegmentReader(f)
-
-    if !r.Next() {
-        t.Fatalf("expected next, got false")
-    }
-
-    we, err := r.Read()
-    if err != nil {
-        fatal(t, "read entry", err)
-    }
-
-    e, ok := we.(*DeleteBucketRangeWALEntry)
-    if !ok {
-        t.Fatalf("expected WriteWALEntry: got %#v", e)
-    }
-
-    if !reflect.DeepEqual(entry, e) {
-        t.Fatalf("expected %+v but got %+v", entry, e)
-    }
+    t.Run("DeleteSeries", func(t *testing.T) {
+        dir := MustTempDir()
+        defer os.RemoveAll(dir)
+        f := MustTempFile(dir)
+        w := NewWALSegmentWriter(f)
+
+        entry := &DeleteBucketRangeWALEntry{
+            OrgID:      influxdb.ID(1),
+            BucketID:   influxdb.ID(2),
+            Min:        3,
+            Max:        4,
+            Predicate:  []byte("predicate"),
+            KeepSeries: false,
+        }
+
+        if err := w.Write(mustMarshalEntry(entry)); err != nil {
+            fatal(t, "write points", err)
+        } else if err := w.Flush(); err != nil {
+            fatal(t, "flush", err)
+        }
+
+        if _, err := f.Seek(0, io.SeekStart); err != nil {
+            fatal(t, "seek", err)
+        }
+
+        r := NewWALSegmentReader(f)
+        if !r.Next() {
+            t.Fatalf("expected next, got false")
+        }
+
+        if e, err := r.Read(); err != nil {
+            fatal(t, "read entry", err)
+        } else if e, ok := e.(*DeleteBucketRangeWALEntry); !ok {
+            t.Fatalf("expected WriteWALEntry: got %#v", e)
+        } else if !reflect.DeepEqual(entry, e) {
+            t.Fatalf("expected %+v but got %+v", entry, e)
+        } else if got, want := e.Type(), DeleteBucketRangeWALEntryType; got != want {
+            t.Fatalf("Type()=%v, want %v", got, want)
+        }
+    })
+
+    t.Run("KeepSeries", func(t *testing.T) {
+        dir := MustTempDir()
+        defer os.RemoveAll(dir)
+        f := MustTempFile(dir)
+        w := NewWALSegmentWriter(f)
+
+        entry := &DeleteBucketRangeWALEntry{
+            OrgID:      influxdb.ID(1),
+            BucketID:   influxdb.ID(2),
+            Min:        3,
+            Max:        4,
+            Predicate:  []byte("predicate"),
+            KeepSeries: true,
+        }
+
+        if err := w.Write(mustMarshalEntry(entry)); err != nil {
+            fatal(t, "write points", err)
+        } else if err := w.Flush(); err != nil {
+            fatal(t, "flush", err)
+        }
+
+        if _, err := f.Seek(0, io.SeekStart); err != nil {
+            fatal(t, "seek", err)
+        }
+
+        r := NewWALSegmentReader(f)
+        if !r.Next() {
+            t.Fatalf("expected next, got false")
+        }
+
+        if e, err := r.Read(); err != nil {
+            fatal(t, "read entry", err)
+        } else if e, ok := e.(*DeleteBucketRangeWALEntry); !ok {
+            t.Fatalf("expected WriteWALEntry: got %#v", e)
+        } else if !reflect.DeepEqual(entry, e) {
+            t.Fatalf("expected %+v but got %+v", entry, e)
+        } else if got, want := e.Type(), DeleteBucketRangeKeepSeriesWALEntryType; got != want {
+            t.Fatalf("Type()=%v, want %v", got, want)
+        }
+    })
 }

 func TestWAL_ClosedSegments(t *testing.T) {


@@ -9,6 +9,7 @@ import (
     "sync"
     "time"

+    "github.com/influxdata/influxdb/v2"
     "github.com/influxdata/influxdb/v2/kit/tracing"
     "github.com/influxdata/influxdb/v2/models"
     "github.com/influxdata/influxdb/v2/tsdb"

@@ -19,7 +20,7 @@ import (

 // DeletePrefixRange removes all TSM data belonging to a bucket, and removes all index
 // and series file data associated with the bucket. The provided time range ensures
 // that only bucket data for that range is removed.
-func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate) error {
+func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate, opts influxdb.DeletePrefixRangeOptions) error {
     span, ctx := tracing.StartSpanFromContext(rootCtx)
     span.LogKV("name_prefix", fmt.Sprintf("%x", name),
         "min", time.Unix(0, min), "max", time.Unix(0, max),

@@ -198,7 +199,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
     span.LogKV("cache_cardinality", keysChecked)
     span.Finish()

-    if len(possiblyDead.keys) > 0 {
+    if len(possiblyDead.keys) > 0 && !opts.KeepSeries {
         buf := make([]byte, 1024)

         // TODO(jeff): all of these methods have possible errors which opens us to partial
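The guard above carries the feature's semantics: with KeepSeries set, matching points are still removed, but the pass that would drop the now-empty series from the index and series file is skipped. That is presumably the behaviour InfluxQL DELETE needs, while the default path matches DROP MEASUREMENT. A sketch contrasting the two calls against an already-open engine; the helper name and wiring are assumptions, only the DeletePrefixRange signature comes from this diff.

package example

import (
	"context"
	"math"

	"github.com/influxdata/influxdb/v2"
	"github.com/influxdata/influxdb/v2/tsdb/tsm1"
)

// dropVsDelete contrasts the two behaviours: the first call removes points and
// drops the emptied series (DROP MEASUREMENT-style), the second removes points
// but keeps the series in the index (DELETE-style). The engine and encoded
// bucket prefix are supplied by the caller.
func dropVsDelete(ctx context.Context, e *tsm1.Engine, prefix []byte) error {
	if err := e.DeletePrefixRange(ctx, prefix, math.MinInt64, math.MaxInt64, nil,
		influxdb.DeletePrefixRangeOptions{}); err != nil {
		return err
	}
	return e.DeletePrefixRange(ctx, prefix, math.MinInt64, math.MaxInt64, nil,
		influxdb.DeletePrefixRangeOptions{KeepSeries: true})
}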


@@ -7,6 +7,7 @@ import (
     "reflect"
     "testing"

+    "github.com/influxdata/influxdb/v2"
     "github.com/influxdata/influxdb/v2/models"
     "github.com/influxdata/influxdb/v2/tsdb/tsm1"
 )

@@ -44,7 +45,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
         t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
     }

-    if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
+    if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
         t.Fatalf("failed to delete series: %v", err)
     }

@@ -90,7 +91,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
     iter.Close()

     // Deleting remaining series should remove them from the series.
-    if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 9, nil); err != nil {
+    if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 9, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
         t.Fatalf("failed to delete series: %v", err)
     }

@@ -149,7 +150,7 @@ func BenchmarkEngine_DeletePrefixRange(b *testing.B) {
         }
         b.StartTimer()

-        if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
+        if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
            b.Fatal(err)
        } else if err := e.Close(); err != nil {
            b.Fatal(err)


@@ -58,7 +58,7 @@ func TestIndex_SeriesIDSet(t *testing.T) {

     // Drop all the series for the gpu measurement and they should no longer
     // be in the series ID set.
-    if err := engine.DeletePrefixRange(context.Background(), []byte("gpu"), math.MinInt64, math.MaxInt64, nil); err != nil {
+    if err := engine.DeletePrefixRange(context.Background(), []byte("gpu"), math.MinInt64, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
         t.Fatal(err)
     }

@@ -331,7 +331,7 @@ func BenchmarkEngine_DeletePrefixRange_Cache(b *testing.B) {
         }
         b.StartTimer()

-        if err := e.DeletePrefixRange(context.Background(), []byte("0000000011221111000000001122112"), 0, math.MaxInt64, nil); err != nil {
+        if err := e.DeletePrefixRange(context.Background(), []byte("0000000011221111000000001122112"), 0, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
             b.Fatal(err)
         }
     }

@@ -346,7 +346,7 @@ func BenchmarkEngine_DeletePrefixRange_Cache(b *testing.B) {
         }
         b.StartTimer()

-        if err := e.DeletePrefixRange(context.Background(), []byte("fooasdasdasdasdasd"), 0, math.MaxInt64, nil); err != nil {
+        if err := e.DeletePrefixRange(context.Background(), []byte("fooasdasdasdasdasd"), 0, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
             b.Fatal(err)
         }
     }

@@ -534,7 +534,7 @@ func (e *Engine) MustDeleteBucketRange(orgID, bucketID influxdb.ID, min, max int
     encoded := tsdb.EncodeName(orgID, bucketID)
     name := models.EscapeMeasurement(encoded[:])

-    err := e.DeletePrefixRange(context.Background(), name, min, max, nil)
+    err := e.DeletePrefixRange(context.Background(), name, min, max, nil, influxdb.DeletePrefixRangeOptions{})
     if err != nil {
         panic(err)
     }