feat(influxql): Implement DELETE & DROP MEASUREMENT
parent
0780232b83
commit
1501351623
|
@ -115,8 +115,8 @@ func (t *TemporaryEngine) SeriesCardinality() int64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteBucketRangePredicate will delete a bucket from the range and predicate.
|
// DeleteBucketRangePredicate will delete a bucket from the range and predicate.
|
||||||
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
|
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||||
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred)
|
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred, opts)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,5 +11,10 @@ type Predicate interface {
|
||||||
|
|
||||||
// DeleteService will delete a bucket from the range and predict.
|
// DeleteService will delete a bucket from the range and predict.
|
||||||
type DeleteService interface {
|
type DeleteService interface {
|
||||||
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID ID, min, max int64, pred Predicate) error
|
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID ID, min, max int64, pred Predicate, opts DeletePrefixRangeOptions) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type DeletePrefixRangeOptions struct {
|
||||||
|
// If true, does not delete underlying series when all data has been deleted.
|
||||||
|
KeepSeries bool
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
pcontext "github.com/influxdata/influxdb/v2/context"
|
pcontext "github.com/influxdata/influxdb/v2/context"
|
||||||
"github.com/influxdata/influxdb/v2/kit/tracing"
|
"github.com/influxdata/influxdb/v2/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/v2/predicate"
|
"github.com/influxdata/influxdb/v2/predicate"
|
||||||
|
"github.com/influxdata/influxdb/v2/tsdb/tsm1"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -121,6 +122,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
|
||||||
dr.Start,
|
dr.Start,
|
||||||
dr.Stop,
|
dr.Stop,
|
||||||
dr.Predicate,
|
dr.Predicate,
|
||||||
|
influxdb.DeletePrefixRangeOptions{KeepSeries: dr.KeepSeries},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.HandleHTTPError(ctx, err, w)
|
h.HandleHTTPError(ctx, err, w)
|
||||||
|
@ -155,28 +157,33 @@ func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.O
|
||||||
}
|
}
|
||||||
|
|
||||||
type deleteRequest struct {
|
type deleteRequest struct {
|
||||||
Org *influxdb.Organization
|
Org *influxdb.Organization
|
||||||
Bucket *influxdb.Bucket
|
Bucket *influxdb.Bucket
|
||||||
Start int64
|
Start int64
|
||||||
Stop int64
|
Stop int64
|
||||||
Predicate influxdb.Predicate
|
Predicate influxdb.Predicate
|
||||||
|
KeepSeries bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type deleteRequestDecode struct {
|
type deleteRequestDecode struct {
|
||||||
Start string `json:"start"`
|
Start string `json:"start"`
|
||||||
Stop string `json:"stop"`
|
Stop string `json:"stop"`
|
||||||
Predicate string `json:"predicate"`
|
Predicate string `json:"predicate"`
|
||||||
|
PredicateBytes []byte `json:"predicate_bytes"`
|
||||||
|
KeepSeries bool `json:"keep_series"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteRequest is the request send over http to delete points.
|
// DeleteRequest is the request send over http to delete points.
|
||||||
type DeleteRequest struct {
|
type DeleteRequest struct {
|
||||||
OrgID string `json:"-"`
|
OrgID string `json:"-"`
|
||||||
Org string `json:"-"` // org name
|
Org string `json:"-"` // org name
|
||||||
BucketID string `json:"-"`
|
BucketID string `json:"-"`
|
||||||
Bucket string `json:"-"`
|
Bucket string `json:"-"`
|
||||||
Start string `json:"start"`
|
Start string `json:"start"`
|
||||||
Stop string `json:"stop"`
|
Stop string `json:"stop"`
|
||||||
Predicate string `json:"predicate"`
|
Predicate string `json:"predicate"`
|
||||||
|
PredicateBytes []byte `json:"predicate_bytes"`
|
||||||
|
KeepSeries bool `json:"keep_series"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
|
func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
|
||||||
|
@ -188,7 +195,8 @@ func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
|
||||||
Err: err,
|
Err: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*dr = deleteRequest{}
|
|
||||||
|
*dr = deleteRequest{KeepSeries: drd.KeepSeries}
|
||||||
start, err := time.Parse(time.RFC3339Nano, drd.Start)
|
start, err := time.Parse(time.RFC3339Nano, drd.Start)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &influxdb.Error{
|
return &influxdb.Error{
|
||||||
|
@ -208,12 +216,22 @@ func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dr.Stop = stop.UnixNano()
|
dr.Stop = stop.UnixNano()
|
||||||
node, err := predicate.Parse(drd.Predicate)
|
|
||||||
if err != nil {
|
if len(drd.PredicateBytes) != 0 {
|
||||||
return err
|
if dr.Predicate, err = tsm1.UnmarshalPredicate(drd.PredicateBytes); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
node, err := predicate.Parse(drd.Predicate)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if dr.Predicate, err = predicate.New(node); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
dr.Predicate, err = predicate.New(node)
|
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteService sends data over HTTP to delete points.
|
// DeleteService sends data over HTTP to delete points.
|
||||||
|
|
|
@ -10,20 +10,20 @@ var _ influxdb.DeleteService = &DeleteService{}
|
||||||
|
|
||||||
// DeleteService is a mock delete server.
|
// DeleteService is a mock delete server.
|
||||||
type DeleteService struct {
|
type DeleteService struct {
|
||||||
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error
|
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDeleteService returns a mock DeleteService where its methods will return
|
// NewDeleteService returns a mock DeleteService where its methods will return
|
||||||
// zero values.
|
// zero values.
|
||||||
func NewDeleteService() DeleteService {
|
func NewDeleteService() DeleteService {
|
||||||
return DeleteService{
|
return DeleteService{
|
||||||
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
|
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//DeleteBucketRangePredicate calls DeleteBucketRangePredicateF.
|
//DeleteBucketRangePredicate calls DeleteBucketRangePredicateF.
|
||||||
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
|
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||||
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred)
|
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred, opts)
|
||||||
}
|
}
|
||||||
|
|
|
@ -336,7 +336,7 @@ func (e *Engine) replayWAL() error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.deleteBucketRangeLocked(context.Background(), en.OrgID, en.BucketID, en.Min, en.Max, pred)
|
return e.deleteBucketRangeLocked(context.Background(), en.OrgID, en.BucketID, en.Min, en.Max, pred, influxdb.DeletePrefixRangeOptions{KeepSeries: en.KeepSeries})
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -663,12 +663,12 @@ func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, nil)
|
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, nil, influxdb.DeletePrefixRangeOptions{})
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
|
// DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
|
||||||
// deleted must be in [min, max], and the key must match the predicate if provided.
|
// deleted must be in [min, max], and the key must match the predicate if provided.
|
||||||
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
|
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||||
defer span.Finish()
|
defer span.Finish()
|
||||||
|
|
||||||
|
@ -693,18 +693,18 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, pred)
|
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, pred, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteBucketRangeLocked does the work of deleting a bucket range and must be called under
|
// deleteBucketRangeLocked does the work of deleting a bucket range and must be called under
|
||||||
// some sort of lock.
|
// some sort of lock.
|
||||||
func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred tsm1.Predicate) error {
|
func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred tsm1.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||||
// TODO(edd): we need to clean up how we're encoding the prefix so that we
|
// TODO(edd): we need to clean up how we're encoding the prefix so that we
|
||||||
// don't have to remember to get it right everywhere we need to touch TSM data.
|
// don't have to remember to get it right everywhere we need to touch TSM data.
|
||||||
encoded := tsdb.EncodeName(orgID, bucketID)
|
encoded := tsdb.EncodeName(orgID, bucketID)
|
||||||
name := models.EscapeMeasurement(encoded[:])
|
name := models.EscapeMeasurement(encoded[:])
|
||||||
|
|
||||||
return e.engine.DeletePrefixRange(ctx, name, min, max, pred)
|
return e.engine.DeletePrefixRange(ctx, name, min, max, pred, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateBackup creates a "snapshot" of all TSM data in the Engine.
|
// CreateBackup creates a "snapshot" of all TSM data in the Engine.
|
||||||
|
|
|
@ -311,7 +311,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) {
|
||||||
|
|
||||||
// Remove the matching series.
|
// Remove the matching series.
|
||||||
if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
|
if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
|
||||||
math.MinInt64, math.MaxInt64, pred); err != nil {
|
math.MinInt64, math.MaxInt64, pred, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -341,7 +341,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) {
|
||||||
|
|
||||||
// Remove the matching series.
|
// Remove the matching series.
|
||||||
if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
|
if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
|
||||||
math.MinInt64, math.MaxInt64, pred); err != nil {
|
math.MinInt64, math.MaxInt64, pred, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -63,6 +63,10 @@ const (
|
||||||
|
|
||||||
// DeleteBucketRangeWALEntryType indicates a delete bucket range entry.
|
// DeleteBucketRangeWALEntryType indicates a delete bucket range entry.
|
||||||
DeleteBucketRangeWALEntryType WalEntryType = 0x04
|
DeleteBucketRangeWALEntryType WalEntryType = 0x04
|
||||||
|
|
||||||
|
// DeleteBucketRangeKeepSeriesWALEntryType indicates a delete bucket range entry
|
||||||
|
// but the underlying series are not deleted.
|
||||||
|
DeleteBucketRangeKeepSeriesWALEntryType WalEntryType = 0x05
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -993,10 +997,11 @@ func (w *WriteWALEntry) Type() WalEntryType {
|
||||||
|
|
||||||
// DeleteBucketRangeWALEntry represents the deletion of data in a bucket.
|
// DeleteBucketRangeWALEntry represents the deletion of data in a bucket.
|
||||||
type DeleteBucketRangeWALEntry struct {
|
type DeleteBucketRangeWALEntry struct {
|
||||||
OrgID influxdb.ID
|
OrgID influxdb.ID
|
||||||
BucketID influxdb.ID
|
BucketID influxdb.ID
|
||||||
Min, Max int64
|
Min, Max int64
|
||||||
Predicate []byte
|
Predicate []byte
|
||||||
|
KeepSeries bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarshalBinary returns a binary representation of the entry in a new byte slice.
|
// MarshalBinary returns a binary representation of the entry in a new byte slice.
|
||||||
|
@ -1062,6 +1067,9 @@ func (w *DeleteBucketRangeWALEntry) Encode(b []byte) ([]byte, error) {
|
||||||
|
|
||||||
// Type returns DeleteBucketRangeWALEntryType.
|
// Type returns DeleteBucketRangeWALEntryType.
|
||||||
func (w *DeleteBucketRangeWALEntry) Type() WalEntryType {
|
func (w *DeleteBucketRangeWALEntry) Type() WalEntryType {
|
||||||
|
if w.KeepSeries {
|
||||||
|
return DeleteBucketRangeKeepSeriesWALEntryType
|
||||||
|
}
|
||||||
return DeleteBucketRangeWALEntryType
|
return DeleteBucketRangeWALEntryType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1203,13 +1211,15 @@ func (r *WALSegmentReader) Next() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// and marshal it and send it to the cache
|
// and marshal it and send it to the cache
|
||||||
switch WalEntryType(entryType) {
|
switch typ := WalEntryType(entryType); typ {
|
||||||
case WriteWALEntryType:
|
case WriteWALEntryType:
|
||||||
r.entry = &WriteWALEntry{
|
r.entry = &WriteWALEntry{
|
||||||
Values: make(map[string][]value.Value),
|
Values: make(map[string][]value.Value),
|
||||||
}
|
}
|
||||||
case DeleteBucketRangeWALEntryType:
|
case DeleteBucketRangeWALEntryType, DeleteBucketRangeKeepSeriesWALEntryType:
|
||||||
r.entry = &DeleteBucketRangeWALEntry{}
|
r.entry = &DeleteBucketRangeWALEntry{
|
||||||
|
KeepSeries: typ == DeleteBucketRangeKeepSeriesWALEntryType,
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
r.err = fmt.Errorf("unknown wal entry type: %v", entryType)
|
r.err = fmt.Errorf("unknown wal entry type: %v", entryType)
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -216,50 +216,87 @@ func TestWALWriter_WriteMulti_Multiple(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWALWriter_DeleteBucketRange(t *testing.T) {
|
func TestWALWriter_DeleteBucketRange(t *testing.T) {
|
||||||
dir := MustTempDir()
|
t.Run("DeleteSeries", func(t *testing.T) {
|
||||||
defer os.RemoveAll(dir)
|
dir := MustTempDir()
|
||||||
f := MustTempFile(dir)
|
defer os.RemoveAll(dir)
|
||||||
w := NewWALSegmentWriter(f)
|
f := MustTempFile(dir)
|
||||||
|
w := NewWALSegmentWriter(f)
|
||||||
|
|
||||||
entry := &DeleteBucketRangeWALEntry{
|
entry := &DeleteBucketRangeWALEntry{
|
||||||
OrgID: influxdb.ID(1),
|
OrgID: influxdb.ID(1),
|
||||||
BucketID: influxdb.ID(2),
|
BucketID: influxdb.ID(2),
|
||||||
Min: 3,
|
Min: 3,
|
||||||
Max: 4,
|
Max: 4,
|
||||||
Predicate: []byte("predicate"),
|
Predicate: []byte("predicate"),
|
||||||
}
|
KeepSeries: false,
|
||||||
|
}
|
||||||
|
|
||||||
if err := w.Write(mustMarshalEntry(entry)); err != nil {
|
if err := w.Write(mustMarshalEntry(entry)); err != nil {
|
||||||
fatal(t, "write points", err)
|
fatal(t, "write points", err)
|
||||||
}
|
} else if err := w.Flush(); err != nil {
|
||||||
|
fatal(t, "flush", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := w.Flush(); err != nil {
|
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||||
fatal(t, "flush", err)
|
fatal(t, "seek", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
r := NewWALSegmentReader(f)
|
||||||
fatal(t, "seek", err)
|
if !r.Next() {
|
||||||
}
|
t.Fatalf("expected next, got false")
|
||||||
|
}
|
||||||
|
|
||||||
r := NewWALSegmentReader(f)
|
if e, err := r.Read(); err != nil {
|
||||||
|
fatal(t, "read entry", err)
|
||||||
|
} else if e, ok := e.(*DeleteBucketRangeWALEntry); !ok {
|
||||||
|
t.Fatalf("expected WriteWALEntry: got %#v", e)
|
||||||
|
} else if !reflect.DeepEqual(entry, e) {
|
||||||
|
t.Fatalf("expected %+v but got %+v", entry, e)
|
||||||
|
} else if got, want := e.Type(), DeleteBucketRangeWALEntryType; got != want {
|
||||||
|
t.Fatalf("Type()=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
if !r.Next() {
|
t.Run("KeepSeries", func(t *testing.T) {
|
||||||
t.Fatalf("expected next, got false")
|
dir := MustTempDir()
|
||||||
}
|
defer os.RemoveAll(dir)
|
||||||
|
f := MustTempFile(dir)
|
||||||
|
w := NewWALSegmentWriter(f)
|
||||||
|
|
||||||
we, err := r.Read()
|
entry := &DeleteBucketRangeWALEntry{
|
||||||
if err != nil {
|
OrgID: influxdb.ID(1),
|
||||||
fatal(t, "read entry", err)
|
BucketID: influxdb.ID(2),
|
||||||
}
|
Min: 3,
|
||||||
|
Max: 4,
|
||||||
|
Predicate: []byte("predicate"),
|
||||||
|
KeepSeries: true,
|
||||||
|
}
|
||||||
|
|
||||||
e, ok := we.(*DeleteBucketRangeWALEntry)
|
if err := w.Write(mustMarshalEntry(entry)); err != nil {
|
||||||
if !ok {
|
fatal(t, "write points", err)
|
||||||
t.Fatalf("expected WriteWALEntry: got %#v", e)
|
} else if err := w.Flush(); err != nil {
|
||||||
}
|
fatal(t, "flush", err)
|
||||||
|
}
|
||||||
|
|
||||||
if !reflect.DeepEqual(entry, e) {
|
if _, err := f.Seek(0, io.SeekStart); err != nil {
|
||||||
t.Fatalf("expected %+v but got %+v", entry, e)
|
fatal(t, "seek", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r := NewWALSegmentReader(f)
|
||||||
|
if !r.Next() {
|
||||||
|
t.Fatalf("expected next, got false")
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, err := r.Read(); err != nil {
|
||||||
|
fatal(t, "read entry", err)
|
||||||
|
} else if e, ok := e.(*DeleteBucketRangeWALEntry); !ok {
|
||||||
|
t.Fatalf("expected WriteWALEntry: got %#v", e)
|
||||||
|
} else if !reflect.DeepEqual(entry, e) {
|
||||||
|
t.Fatalf("expected %+v but got %+v", entry, e)
|
||||||
|
} else if got, want := e.Type(), DeleteBucketRangeKeepSeriesWALEntryType; got != want {
|
||||||
|
t.Fatalf("Type()=%v, want %v", got, want)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWAL_ClosedSegments(t *testing.T) {
|
func TestWAL_ClosedSegments(t *testing.T) {
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
"github.com/influxdata/influxdb/v2/kit/tracing"
|
"github.com/influxdata/influxdb/v2/kit/tracing"
|
||||||
"github.com/influxdata/influxdb/v2/models"
|
"github.com/influxdata/influxdb/v2/models"
|
||||||
"github.com/influxdata/influxdb/v2/tsdb"
|
"github.com/influxdata/influxdb/v2/tsdb"
|
||||||
|
@ -19,7 +20,7 @@ import (
|
||||||
// DeletePrefixRange removes all TSM data belonging to a bucket, and removes all index
|
// DeletePrefixRange removes all TSM data belonging to a bucket, and removes all index
|
||||||
// and series file data associated with the bucket. The provided time range ensures
|
// and series file data associated with the bucket. The provided time range ensures
|
||||||
// that only bucket data for that range is removed.
|
// that only bucket data for that range is removed.
|
||||||
func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate) error {
|
func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate, opts influxdb.DeletePrefixRangeOptions) error {
|
||||||
span, ctx := tracing.StartSpanFromContext(rootCtx)
|
span, ctx := tracing.StartSpanFromContext(rootCtx)
|
||||||
span.LogKV("name_prefix", fmt.Sprintf("%x", name),
|
span.LogKV("name_prefix", fmt.Sprintf("%x", name),
|
||||||
"min", time.Unix(0, min), "max", time.Unix(0, max),
|
"min", time.Unix(0, min), "max", time.Unix(0, max),
|
||||||
|
@ -198,7 +199,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
|
||||||
span.LogKV("cache_cardinality", keysChecked)
|
span.LogKV("cache_cardinality", keysChecked)
|
||||||
span.Finish()
|
span.Finish()
|
||||||
|
|
||||||
if len(possiblyDead.keys) > 0 {
|
if len(possiblyDead.keys) > 0 && !opts.KeepSeries {
|
||||||
buf := make([]byte, 1024)
|
buf := make([]byte, 1024)
|
||||||
|
|
||||||
// TODO(jeff): all of these methods have possible errors which opens us to partial
|
// TODO(jeff): all of these methods have possible errors which opens us to partial
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"reflect"
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
"github.com/influxdata/influxdb/v2/models"
|
"github.com/influxdata/influxdb/v2/models"
|
||||||
"github.com/influxdata/influxdb/v2/tsdb/tsm1"
|
"github.com/influxdata/influxdb/v2/tsdb/tsm1"
|
||||||
)
|
)
|
||||||
|
@ -44,7 +45,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
|
||||||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
|
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||||
t.Fatalf("failed to delete series: %v", err)
|
t.Fatalf("failed to delete series: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +91,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
|
||||||
iter.Close()
|
iter.Close()
|
||||||
|
|
||||||
// Deleting remaining series should remove them from the series.
|
// Deleting remaining series should remove them from the series.
|
||||||
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 9, nil); err != nil {
|
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 9, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||||
t.Fatalf("failed to delete series: %v", err)
|
t.Fatalf("failed to delete series: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,7 +150,7 @@ func BenchmarkEngine_DeletePrefixRange(b *testing.B) {
|
||||||
}
|
}
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
|
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
} else if err := e.Close(); err != nil {
|
} else if err := e.Close(); err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
|
|
|
@ -58,7 +58,7 @@ func TestIndex_SeriesIDSet(t *testing.T) {
|
||||||
|
|
||||||
// Drop all the series for the gpu measurement and they should no longer
|
// Drop all the series for the gpu measurement and they should no longer
|
||||||
// be in the series ID set.
|
// be in the series ID set.
|
||||||
if err := engine.DeletePrefixRange(context.Background(), []byte("gpu"), math.MinInt64, math.MaxInt64, nil); err != nil {
|
if err := engine.DeletePrefixRange(context.Background(), []byte("gpu"), math.MinInt64, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -331,7 +331,7 @@ func BenchmarkEngine_DeletePrefixRange_Cache(b *testing.B) {
|
||||||
}
|
}
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
if err := e.DeletePrefixRange(context.Background(), []byte("0000000011221111000000001122112"), 0, math.MaxInt64, nil); err != nil {
|
if err := e.DeletePrefixRange(context.Background(), []byte("0000000011221111000000001122112"), 0, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -346,7 +346,7 @@ func BenchmarkEngine_DeletePrefixRange_Cache(b *testing.B) {
|
||||||
}
|
}
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
if err := e.DeletePrefixRange(context.Background(), []byte("fooasdasdasdasdasd"), 0, math.MaxInt64, nil); err != nil {
|
if err := e.DeletePrefixRange(context.Background(), []byte("fooasdasdasdasdasd"), 0, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
|
||||||
b.Fatal(err)
|
b.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -534,7 +534,7 @@ func (e *Engine) MustDeleteBucketRange(orgID, bucketID influxdb.ID, min, max int
|
||||||
encoded := tsdb.EncodeName(orgID, bucketID)
|
encoded := tsdb.EncodeName(orgID, bucketID)
|
||||||
name := models.EscapeMeasurement(encoded[:])
|
name := models.EscapeMeasurement(encoded[:])
|
||||||
|
|
||||||
err := e.DeletePrefixRange(context.Background(), name, min, max, nil)
|
err := e.DeletePrefixRange(context.Background(), name, min, max, nil, influxdb.DeletePrefixRangeOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue