feat(influxql): Implement DELETE & DROP MEASUREMENT
parent
0780232b83
commit
1501351623
|
@ -115,8 +115,8 @@ func (t *TemporaryEngine) SeriesCardinality() int64 {
|
|||
}
|
||||
|
||||
// DeleteBucketRangePredicate will delete a bucket from the range and predicate.
|
||||
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
|
||||
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred)
|
||||
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred, opts)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -11,5 +11,10 @@ type Predicate interface {
|
|||
|
||||
// DeleteService will delete a bucket from the range and predict.
|
||||
type DeleteService interface {
|
||||
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID ID, min, max int64, pred Predicate) error
|
||||
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID ID, min, max int64, pred Predicate, opts DeletePrefixRangeOptions) error
|
||||
}
|
||||
|
||||
type DeletePrefixRangeOptions struct {
|
||||
// If true, does not delete underlying series when all data has been deleted.
|
||||
KeepSeries bool
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
pcontext "github.com/influxdata/influxdb/v2/context"
|
||||
"github.com/influxdata/influxdb/v2/kit/tracing"
|
||||
"github.com/influxdata/influxdb/v2/predicate"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/tsm1"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -121,6 +122,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
|
|||
dr.Start,
|
||||
dr.Stop,
|
||||
dr.Predicate,
|
||||
influxdb.DeletePrefixRangeOptions{KeepSeries: dr.KeepSeries},
|
||||
)
|
||||
if err != nil {
|
||||
h.HandleHTTPError(ctx, err, w)
|
||||
|
@ -155,28 +157,33 @@ func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.O
|
|||
}
|
||||
|
||||
type deleteRequest struct {
|
||||
Org *influxdb.Organization
|
||||
Bucket *influxdb.Bucket
|
||||
Start int64
|
||||
Stop int64
|
||||
Predicate influxdb.Predicate
|
||||
Org *influxdb.Organization
|
||||
Bucket *influxdb.Bucket
|
||||
Start int64
|
||||
Stop int64
|
||||
Predicate influxdb.Predicate
|
||||
KeepSeries bool
|
||||
}
|
||||
|
||||
type deleteRequestDecode struct {
|
||||
Start string `json:"start"`
|
||||
Stop string `json:"stop"`
|
||||
Predicate string `json:"predicate"`
|
||||
Start string `json:"start"`
|
||||
Stop string `json:"stop"`
|
||||
Predicate string `json:"predicate"`
|
||||
PredicateBytes []byte `json:"predicate_bytes"`
|
||||
KeepSeries bool `json:"keep_series"`
|
||||
}
|
||||
|
||||
// DeleteRequest is the request send over http to delete points.
|
||||
type DeleteRequest struct {
|
||||
OrgID string `json:"-"`
|
||||
Org string `json:"-"` // org name
|
||||
BucketID string `json:"-"`
|
||||
Bucket string `json:"-"`
|
||||
Start string `json:"start"`
|
||||
Stop string `json:"stop"`
|
||||
Predicate string `json:"predicate"`
|
||||
OrgID string `json:"-"`
|
||||
Org string `json:"-"` // org name
|
||||
BucketID string `json:"-"`
|
||||
Bucket string `json:"-"`
|
||||
Start string `json:"start"`
|
||||
Stop string `json:"stop"`
|
||||
Predicate string `json:"predicate"`
|
||||
PredicateBytes []byte `json:"predicate_bytes"`
|
||||
KeepSeries bool `json:"keep_series"`
|
||||
}
|
||||
|
||||
func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
|
||||
|
@ -188,7 +195,8 @@ func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
|
|||
Err: err,
|
||||
}
|
||||
}
|
||||
*dr = deleteRequest{}
|
||||
|
||||
*dr = deleteRequest{KeepSeries: drd.KeepSeries}
|
||||
start, err := time.Parse(time.RFC3339Nano, drd.Start)
|
||||
if err != nil {
|
||||
return &influxdb.Error{
|
||||
|
@ -208,12 +216,22 @@ func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
|
|||
}
|
||||
}
|
||||
dr.Stop = stop.UnixNano()
|
||||
node, err := predicate.Parse(drd.Predicate)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
if len(drd.PredicateBytes) != 0 {
|
||||
if dr.Predicate, err = tsm1.UnmarshalPredicate(drd.PredicateBytes); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
node, err := predicate.Parse(drd.Predicate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if dr.Predicate, err = predicate.New(node); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
dr.Predicate, err = predicate.New(node)
|
||||
return err
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteService sends data over HTTP to delete points.
|
||||
|
|
|
@ -10,20 +10,20 @@ var _ influxdb.DeleteService = &DeleteService{}
|
|||
|
||||
// DeleteService is a mock delete server.
|
||||
type DeleteService struct {
|
||||
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error
|
||||
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error
|
||||
}
|
||||
|
||||
// NewDeleteService returns a mock DeleteService where its methods will return
|
||||
// zero values.
|
||||
func NewDeleteService() DeleteService {
|
||||
return DeleteService{
|
||||
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
|
||||
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
//DeleteBucketRangePredicate calls DeleteBucketRangePredicateF.
|
||||
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
|
||||
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred)
|
||||
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred, opts)
|
||||
}
|
||||
|
|
|
@ -336,7 +336,7 @@ func (e *Engine) replayWAL() error {
|
|||
}
|
||||
}
|
||||
|
||||
return e.deleteBucketRangeLocked(context.Background(), en.OrgID, en.BucketID, en.Min, en.Max, pred)
|
||||
return e.deleteBucketRangeLocked(context.Background(), en.OrgID, en.BucketID, en.Min, en.Max, pred, influxdb.DeletePrefixRangeOptions{KeepSeries: en.KeepSeries})
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -663,12 +663,12 @@ func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb
|
|||
return err
|
||||
}
|
||||
|
||||
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, nil)
|
||||
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, nil, influxdb.DeletePrefixRangeOptions{})
|
||||
}
|
||||
|
||||
// DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
|
||||
// deleted must be in [min, max], and the key must match the predicate if provided.
|
||||
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
|
||||
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -693,18 +693,18 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
|
|||
return err
|
||||
}
|
||||
|
||||
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, pred)
|
||||
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, pred, opts)
|
||||
}
|
||||
|
||||
// deleteBucketRangeLocked does the work of deleting a bucket range and must be called under
|
||||
// some sort of lock.
|
||||
func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred tsm1.Predicate) error {
|
||||
func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred tsm1.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||
// TODO(edd): we need to clean up how we're encoding the prefix so that we
|
||||
// don't have to remember to get it right everywhere we need to touch TSM data.
|
||||
encoded := tsdb.EncodeName(orgID, bucketID)
|
||||
name := models.EscapeMeasurement(encoded[:])
|
||||
|
||||
return e.engine.DeletePrefixRange(ctx, name, min, max, pred)
|
||||
return e.engine.DeletePrefixRange(ctx, name, min, max, pred, opts)
|
||||
}
|
||||
|
||||
// CreateBackup creates a "snapshot" of all TSM data in the Engine.
|
||||
|
|
|
@ -311,7 +311,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) {
|
|||
|
||||
// Remove the matching series.
|
||||
if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
|
||||
math.MinInt64, math.MaxInt64, pred); err != nil {
|
||||
math.MinInt64, math.MaxInt64, pred, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -341,7 +341,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) {
|
|||
|
||||
// Remove the matching series.
|
||||
if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
|
||||
math.MinInt64, math.MaxInt64, pred); err != nil {
|
||||
math.MinInt64, math.MaxInt64, pred, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -63,6 +63,10 @@ const (
|
|||
|
||||
// DeleteBucketRangeWALEntryType indicates a delete bucket range entry.
|
||||
DeleteBucketRangeWALEntryType WalEntryType = 0x04
|
||||
|
||||
// DeleteBucketRangeKeepSeriesWALEntryType indicates a delete bucket range entry
|
||||
// but the underlying series are not deleted.
|
||||
DeleteBucketRangeKeepSeriesWALEntryType WalEntryType = 0x05
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -993,10 +997,11 @@ func (w *WriteWALEntry) Type() WalEntryType {
|
|||
|
||||
// DeleteBucketRangeWALEntry represents the deletion of data in a bucket.
|
||||
type DeleteBucketRangeWALEntry struct {
|
||||
OrgID influxdb.ID
|
||||
BucketID influxdb.ID
|
||||
Min, Max int64
|
||||
Predicate []byte
|
||||
OrgID influxdb.ID
|
||||
BucketID influxdb.ID
|
||||
Min, Max int64
|
||||
Predicate []byte
|
||||
KeepSeries bool
|
||||
}
|
||||
|
||||
// MarshalBinary returns a binary representation of the entry in a new byte slice.
|
||||
|
@ -1062,6 +1067,9 @@ func (w *DeleteBucketRangeWALEntry) Encode(b []byte) ([]byte, error) {
|
|||
|
||||
// Type returns DeleteBucketRangeWALEntryType.
|
||||
func (w *DeleteBucketRangeWALEntry) Type() WalEntryType {
|
||||
if w.KeepSeries {
|
||||
return DeleteBucketRangeKeepSeriesWALEntryType
|
||||
}
|
||||
return DeleteBucketRangeWALEntryType
|
||||
}
|
||||
|
||||
|
@ -1203,13 +1211,15 @@ func (r *WALSegmentReader) Next() bool {
|
|||
}
|
||||
|
||||
// and marshal it and send it to the cache
|
||||
switch WalEntryType(entryType) {
|
||||
switch typ := WalEntryType(entryType); typ {
|
||||
case WriteWALEntryType:
|
||||
r.entry = &WriteWALEntry{
|
||||
Values: make(map[string][]value.Value),
|
||||
}
|
||||
case DeleteBucketRangeWALEntryType:
|
||||
r.entry = &DeleteBucketRangeWALEntry{}
|
||||
case DeleteBucketRangeWALEntryType, DeleteBucketRangeKeepSeriesWALEntryType:
|
||||
r.entry = &DeleteBucketRangeWALEntry{
|
||||
KeepSeries: typ == DeleteBucketRangeKeepSeriesWALEntryType,
|
||||
}
|
||||
default:
|
||||
r.err = fmt.Errorf("unknown wal entry type: %v", entryType)
|
||||
return true
|
||||
|
|
|
@ -216,50 +216,87 @@ func TestWALWriter_WriteMulti_Multiple(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestWALWriter_DeleteBucketRange(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
f := MustTempFile(dir)
|
||||
w := NewWALSegmentWriter(f)
|
||||
t.Run("DeleteSeries", func(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
f := MustTempFile(dir)
|
||||
w := NewWALSegmentWriter(f)
|
||||
|
||||
entry := &DeleteBucketRangeWALEntry{
|
||||
OrgID: influxdb.ID(1),
|
||||
BucketID: influxdb.ID(2),
|
||||
Min: 3,
|
||||
Max: 4,
|
||||
Predicate: []byte("predicate"),
|
||||
}
|
||||
entry := &DeleteBucketRangeWALEntry{
|
||||
OrgID: influxdb.ID(1),
|
||||
BucketID: influxdb.ID(2),
|
||||
Min: 3,
|
||||
Max: 4,
|
||||
Predicate: []byte("predicate"),
|
||||
KeepSeries: false,
|
||||
}
|
||||
|
||||
if err := w.Write(mustMarshalEntry(entry)); err != nil {
|
||||
fatal(t, "write points", err)
|
||||
}
|
||||
if err := w.Write(mustMarshalEntry(entry)); err != nil {
|
||||
fatal(t, "write points", err)
|
||||
} else if err := w.Flush(); err != nil {
|
||||
fatal(t, "flush", err)
|
||||
}
|
||||
|
||||
if err := w.Flush(); err != nil {
|
||||
fatal(t, "flush", err)
|
||||
}
|
||||
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||
fatal(t, "seek", err)
|
||||
}
|
||||
|
||||
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||
fatal(t, "seek", err)
|
||||
}
|
||||
r := NewWALSegmentReader(f)
|
||||
if !r.Next() {
|
||||
t.Fatalf("expected next, got false")
|
||||
}
|
||||
|
||||
r := NewWALSegmentReader(f)
|
||||
if e, err := r.Read(); err != nil {
|
||||
fatal(t, "read entry", err)
|
||||
} else if e, ok := e.(*DeleteBucketRangeWALEntry); !ok {
|
||||
t.Fatalf("expected WriteWALEntry: got %#v", e)
|
||||
} else if !reflect.DeepEqual(entry, e) {
|
||||
t.Fatalf("expected %+v but got %+v", entry, e)
|
||||
} else if got, want := e.Type(), DeleteBucketRangeWALEntryType; got != want {
|
||||
t.Fatalf("Type()=%v, want %v", got, want)
|
||||
}
|
||||
})
|
||||
|
||||
if !r.Next() {
|
||||
t.Fatalf("expected next, got false")
|
||||
}
|
||||
t.Run("KeepSeries", func(t *testing.T) {
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
f := MustTempFile(dir)
|
||||
w := NewWALSegmentWriter(f)
|
||||
|
||||
we, err := r.Read()
|
||||
if err != nil {
|
||||
fatal(t, "read entry", err)
|
||||
}
|
||||
entry := &DeleteBucketRangeWALEntry{
|
||||
OrgID: influxdb.ID(1),
|
||||
BucketID: influxdb.ID(2),
|
||||
Min: 3,
|
||||
Max: 4,
|
||||
Predicate: []byte("predicate"),
|
||||
KeepSeries: true,
|
||||
}
|
||||
|
||||
e, ok := we.(*DeleteBucketRangeWALEntry)
|
||||
if !ok {
|
||||
t.Fatalf("expected WriteWALEntry: got %#v", e)
|
||||
}
|
||||
if err := w.Write(mustMarshalEntry(entry)); err != nil {
|
||||
fatal(t, "write points", err)
|
||||
} else if err := w.Flush(); err != nil {
|
||||
fatal(t, "flush", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(entry, e) {
|
||||
t.Fatalf("expected %+v but got %+v", entry, e)
|
||||
}
|
||||
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||
fatal(t, "seek", err)
|
||||
}
|
||||
|
||||
r := NewWALSegmentReader(f)
|
||||
if !r.Next() {
|
||||
t.Fatalf("expected next, got false")
|
||||
}
|
||||
|
||||
if e, err := r.Read(); err != nil {
|
||||
fatal(t, "read entry", err)
|
||||
} else if e, ok := e.(*DeleteBucketRangeWALEntry); !ok {
|
||||
t.Fatalf("expected WriteWALEntry: got %#v", e)
|
||||
} else if !reflect.DeepEqual(entry, e) {
|
||||
t.Fatalf("expected %+v but got %+v", entry, e)
|
||||
} else if got, want := e.Type(), DeleteBucketRangeKeepSeriesWALEntryType; got != want {
|
||||
t.Fatalf("Type()=%v, want %v", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestWAL_ClosedSegments(t *testing.T) {
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/kit/tracing"
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/tsdb"
|
||||
|
@ -19,7 +20,7 @@ import (
|
|||
// DeletePrefixRange removes all TSM data belonging to a bucket, and removes all index
|
||||
// and series file data associated with the bucket. The provided time range ensures
|
||||
// that only bucket data for that range is removed.
|
||||
func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate) error {
|
||||
func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||
span, ctx := tracing.StartSpanFromContext(rootCtx)
|
||||
span.LogKV("name_prefix", fmt.Sprintf("%x", name),
|
||||
"min", time.Unix(0, min), "max", time.Unix(0, max),
|
||||
|
@ -198,7 +199,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
|
|||
span.LogKV("cache_cardinality", keysChecked)
|
||||
span.Finish()
|
||||
|
||||
if len(possiblyDead.keys) > 0 {
|
||||
if len(possiblyDead.keys) > 0 && !opts.KeepSeries {
|
||||
buf := make([]byte, 1024)
|
||||
|
||||
// TODO(jeff): all of these methods have possible errors which opens us to partial
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/tsm1"
|
||||
)
|
||||
|
@ -44,7 +45,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
|
|||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
|
@ -90,7 +91,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
|
|||
iter.Close()
|
||||
|
||||
// Deleting remaining series should remove them from the series.
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 9, nil); err != nil {
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 9, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
|
@ -149,7 +150,7 @@ func BenchmarkEngine_DeletePrefixRange(b *testing.B) {
|
|||
}
|
||||
b.StartTimer()
|
||||
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||
b.Fatal(err)
|
||||
} else if err := e.Close(); err != nil {
|
||||
b.Fatal(err)
|
||||
|
|
|
@ -58,7 +58,7 @@ func TestIndex_SeriesIDSet(t *testing.T) {
|
|||
|
||||
// Drop all the series for the gpu measurement and they should no longer
|
||||
// be in the series ID set.
|
||||
if err := engine.DeletePrefixRange(context.Background(), []byte("gpu"), math.MinInt64, math.MaxInt64, nil); err != nil {
|
||||
if err := engine.DeletePrefixRange(context.Background(), []byte("gpu"), math.MinInt64, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -331,7 +331,7 @@ func BenchmarkEngine_DeletePrefixRange_Cache(b *testing.B) {
|
|||
}
|
||||
b.StartTimer()
|
||||
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("0000000011221111000000001122112"), 0, math.MaxInt64, nil); err != nil {
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("0000000011221111000000001122112"), 0, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -346,7 +346,7 @@ func BenchmarkEngine_DeletePrefixRange_Cache(b *testing.B) {
|
|||
}
|
||||
b.StartTimer()
|
||||
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("fooasdasdasdasdasd"), 0, math.MaxInt64, nil); err != nil {
|
||||
if err := e.DeletePrefixRange(context.Background(), []byte("fooasdasdasdasdasd"), 0, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
@ -534,7 +534,7 @@ func (e *Engine) MustDeleteBucketRange(orgID, bucketID influxdb.ID, min, max int
|
|||
encoded := tsdb.EncodeName(orgID, bucketID)
|
||||
name := models.EscapeMeasurement(encoded[:])
|
||||
|
||||
err := e.DeletePrefixRange(context.Background(), name, min, max, nil)
|
||||
err := e.DeletePrefixRange(context.Background(), name, min, max, nil, influxdb.DeletePrefixRangeOptions{})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue