feat(influxql): Implement DELETE & DROP MEASUREMENT

pull/19387/head
Ben Johnson 2020-08-21 13:32:46 -06:00
parent 0780232b83
commit 1501351623
11 changed files with 159 additions and 87 deletions

View File

@ -115,8 +115,8 @@ func (t *TemporaryEngine) SeriesCardinality() int64 {
}
// DeleteBucketRangePredicate will delete a bucket from the range and predicate.
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred)
func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred, opts)
}

View File

@ -11,5 +11,10 @@ type Predicate interface {
// DeleteService will delete a bucket from the range and predict.
type DeleteService interface {
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID ID, min, max int64, pred Predicate) error
DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID ID, min, max int64, pred Predicate, opts DeletePrefixRangeOptions) error
}
type DeletePrefixRangeOptions struct {
// If true, does not delete underlying series when all data has been deleted.
KeepSeries bool
}

View File

@ -13,6 +13,7 @@ import (
pcontext "github.com/influxdata/influxdb/v2/context"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/predicate"
"github.com/influxdata/influxdb/v2/tsdb/tsm1"
"go.uber.org/zap"
)
@ -121,6 +122,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
dr.Start,
dr.Stop,
dr.Predicate,
influxdb.DeletePrefixRangeOptions{KeepSeries: dr.KeepSeries},
)
if err != nil {
h.HandleHTTPError(ctx, err, w)
@ -155,28 +157,33 @@ func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.O
}
type deleteRequest struct {
Org *influxdb.Organization
Bucket *influxdb.Bucket
Start int64
Stop int64
Predicate influxdb.Predicate
Org *influxdb.Organization
Bucket *influxdb.Bucket
Start int64
Stop int64
Predicate influxdb.Predicate
KeepSeries bool
}
type deleteRequestDecode struct {
Start string `json:"start"`
Stop string `json:"stop"`
Predicate string `json:"predicate"`
Start string `json:"start"`
Stop string `json:"stop"`
Predicate string `json:"predicate"`
PredicateBytes []byte `json:"predicate_bytes"`
KeepSeries bool `json:"keep_series"`
}
// DeleteRequest is the request send over http to delete points.
type DeleteRequest struct {
OrgID string `json:"-"`
Org string `json:"-"` // org name
BucketID string `json:"-"`
Bucket string `json:"-"`
Start string `json:"start"`
Stop string `json:"stop"`
Predicate string `json:"predicate"`
OrgID string `json:"-"`
Org string `json:"-"` // org name
BucketID string `json:"-"`
Bucket string `json:"-"`
Start string `json:"start"`
Stop string `json:"stop"`
Predicate string `json:"predicate"`
PredicateBytes []byte `json:"predicate_bytes"`
KeepSeries bool `json:"keep_series"`
}
func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
@ -188,7 +195,8 @@ func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
Err: err,
}
}
*dr = deleteRequest{}
*dr = deleteRequest{KeepSeries: drd.KeepSeries}
start, err := time.Parse(time.RFC3339Nano, drd.Start)
if err != nil {
return &influxdb.Error{
@ -208,12 +216,22 @@ func (dr *deleteRequest) UnmarshalJSON(b []byte) error {
}
}
dr.Stop = stop.UnixNano()
node, err := predicate.Parse(drd.Predicate)
if err != nil {
return err
if len(drd.PredicateBytes) != 0 {
if dr.Predicate, err = tsm1.UnmarshalPredicate(drd.PredicateBytes); err != nil {
return err
}
} else {
node, err := predicate.Parse(drd.Predicate)
if err != nil {
return err
}
if dr.Predicate, err = predicate.New(node); err != nil {
return err
}
}
dr.Predicate, err = predicate.New(node)
return err
return nil
}
// DeleteService sends data over HTTP to delete points.

View File

@ -10,20 +10,20 @@ var _ influxdb.DeleteService = &DeleteService{}
// DeleteService is a mock delete server.
type DeleteService struct {
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error
DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error
}
// NewDeleteService returns a mock DeleteService where its methods will return
// zero values.
func NewDeleteService() DeleteService {
return DeleteService{
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
return nil
},
}
}
//DeleteBucketRangePredicate calls DeleteBucketRangePredicateF.
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred)
func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred, opts)
}

View File

@ -336,7 +336,7 @@ func (e *Engine) replayWAL() error {
}
}
return e.deleteBucketRangeLocked(context.Background(), en.OrgID, en.BucketID, en.Min, en.Max, pred)
return e.deleteBucketRangeLocked(context.Background(), en.OrgID, en.BucketID, en.Min, en.Max, pred, influxdb.DeletePrefixRangeOptions{KeepSeries: en.KeepSeries})
}
return nil
@ -663,12 +663,12 @@ func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb
return err
}
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, nil)
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, nil, influxdb.DeletePrefixRangeOptions{})
}
// DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
// deleted must be in [min, max], and the key must match the predicate if provided.
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -693,18 +693,18 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID
return err
}
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, pred)
return e.deleteBucketRangeLocked(ctx, orgID, bucketID, min, max, pred, opts)
}
// deleteBucketRangeLocked does the work of deleting a bucket range and must be called under
// some sort of lock.
func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred tsm1.Predicate) error {
func (e *Engine) deleteBucketRangeLocked(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred tsm1.Predicate, opts influxdb.DeletePrefixRangeOptions) error {
// TODO(edd): we need to clean up how we're encoding the prefix so that we
// don't have to remember to get it right everywhere we need to touch TSM data.
encoded := tsdb.EncodeName(orgID, bucketID)
name := models.EscapeMeasurement(encoded[:])
return e.engine.DeletePrefixRange(ctx, name, min, max, pred)
return e.engine.DeletePrefixRange(ctx, name, min, max, pred, opts)
}
// CreateBackup creates a "snapshot" of all TSM data in the Engine.

View File

@ -311,7 +311,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) {
// Remove the matching series.
if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
math.MinInt64, math.MaxInt64, pred); err != nil {
math.MinInt64, math.MaxInt64, pred, influxdb.DeletePrefixRangeOptions{}); err != nil {
t.Fatal(err)
}
@ -341,7 +341,7 @@ func TestEngine_DeleteBucket_Predicate(t *testing.T) {
// Remove the matching series.
if err := engine.DeleteBucketRangePredicate(context.Background(), engine.org, engine.bucket,
math.MinInt64, math.MaxInt64, pred); err != nil {
math.MinInt64, math.MaxInt64, pred, influxdb.DeletePrefixRangeOptions{}); err != nil {
t.Fatal(err)
}

View File

@ -63,6 +63,10 @@ const (
// DeleteBucketRangeWALEntryType indicates a delete bucket range entry.
DeleteBucketRangeWALEntryType WalEntryType = 0x04
// DeleteBucketRangeKeepSeriesWALEntryType indicates a delete bucket range entry
// but the underlying series are not deleted.
DeleteBucketRangeKeepSeriesWALEntryType WalEntryType = 0x05
)
var (
@ -993,10 +997,11 @@ func (w *WriteWALEntry) Type() WalEntryType {
// DeleteBucketRangeWALEntry represents the deletion of data in a bucket.
type DeleteBucketRangeWALEntry struct {
OrgID influxdb.ID
BucketID influxdb.ID
Min, Max int64
Predicate []byte
OrgID influxdb.ID
BucketID influxdb.ID
Min, Max int64
Predicate []byte
KeepSeries bool
}
// MarshalBinary returns a binary representation of the entry in a new byte slice.
@ -1062,6 +1067,9 @@ func (w *DeleteBucketRangeWALEntry) Encode(b []byte) ([]byte, error) {
// Type returns DeleteBucketRangeWALEntryType.
func (w *DeleteBucketRangeWALEntry) Type() WalEntryType {
if w.KeepSeries {
return DeleteBucketRangeKeepSeriesWALEntryType
}
return DeleteBucketRangeWALEntryType
}
@ -1203,13 +1211,15 @@ func (r *WALSegmentReader) Next() bool {
}
// and marshal it and send it to the cache
switch WalEntryType(entryType) {
switch typ := WalEntryType(entryType); typ {
case WriteWALEntryType:
r.entry = &WriteWALEntry{
Values: make(map[string][]value.Value),
}
case DeleteBucketRangeWALEntryType:
r.entry = &DeleteBucketRangeWALEntry{}
case DeleteBucketRangeWALEntryType, DeleteBucketRangeKeepSeriesWALEntryType:
r.entry = &DeleteBucketRangeWALEntry{
KeepSeries: typ == DeleteBucketRangeKeepSeriesWALEntryType,
}
default:
r.err = fmt.Errorf("unknown wal entry type: %v", entryType)
return true

View File

@ -216,50 +216,87 @@ func TestWALWriter_WriteMulti_Multiple(t *testing.T) {
}
func TestWALWriter_DeleteBucketRange(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w := NewWALSegmentWriter(f)
t.Run("DeleteSeries", func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w := NewWALSegmentWriter(f)
entry := &DeleteBucketRangeWALEntry{
OrgID: influxdb.ID(1),
BucketID: influxdb.ID(2),
Min: 3,
Max: 4,
Predicate: []byte("predicate"),
}
entry := &DeleteBucketRangeWALEntry{
OrgID: influxdb.ID(1),
BucketID: influxdb.ID(2),
Min: 3,
Max: 4,
Predicate: []byte("predicate"),
KeepSeries: false,
}
if err := w.Write(mustMarshalEntry(entry)); err != nil {
fatal(t, "write points", err)
}
if err := w.Write(mustMarshalEntry(entry)); err != nil {
fatal(t, "write points", err)
} else if err := w.Flush(); err != nil {
fatal(t, "flush", err)
}
if err := w.Flush(); err != nil {
fatal(t, "flush", err)
}
if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
r := NewWALSegmentReader(f)
if !r.Next() {
t.Fatalf("expected next, got false")
}
r := NewWALSegmentReader(f)
if e, err := r.Read(); err != nil {
fatal(t, "read entry", err)
} else if e, ok := e.(*DeleteBucketRangeWALEntry); !ok {
t.Fatalf("expected WriteWALEntry: got %#v", e)
} else if !reflect.DeepEqual(entry, e) {
t.Fatalf("expected %+v but got %+v", entry, e)
} else if got, want := e.Type(), DeleteBucketRangeWALEntryType; got != want {
t.Fatalf("Type()=%v, want %v", got, want)
}
})
if !r.Next() {
t.Fatalf("expected next, got false")
}
t.Run("KeepSeries", func(t *testing.T) {
dir := MustTempDir()
defer os.RemoveAll(dir)
f := MustTempFile(dir)
w := NewWALSegmentWriter(f)
we, err := r.Read()
if err != nil {
fatal(t, "read entry", err)
}
entry := &DeleteBucketRangeWALEntry{
OrgID: influxdb.ID(1),
BucketID: influxdb.ID(2),
Min: 3,
Max: 4,
Predicate: []byte("predicate"),
KeepSeries: true,
}
e, ok := we.(*DeleteBucketRangeWALEntry)
if !ok {
t.Fatalf("expected WriteWALEntry: got %#v", e)
}
if err := w.Write(mustMarshalEntry(entry)); err != nil {
fatal(t, "write points", err)
} else if err := w.Flush(); err != nil {
fatal(t, "flush", err)
}
if !reflect.DeepEqual(entry, e) {
t.Fatalf("expected %+v but got %+v", entry, e)
}
if _, err := f.Seek(0, io.SeekStart); err != nil {
fatal(t, "seek", err)
}
r := NewWALSegmentReader(f)
if !r.Next() {
t.Fatalf("expected next, got false")
}
if e, err := r.Read(); err != nil {
fatal(t, "read entry", err)
} else if e, ok := e.(*DeleteBucketRangeWALEntry); !ok {
t.Fatalf("expected WriteWALEntry: got %#v", e)
} else if !reflect.DeepEqual(entry, e) {
t.Fatalf("expected %+v but got %+v", entry, e)
} else if got, want := e.Type(), DeleteBucketRangeKeepSeriesWALEntryType; got != want {
t.Fatalf("Type()=%v, want %v", got, want)
}
})
}
func TestWAL_ClosedSegments(t *testing.T) {

View File

@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb"
@ -19,7 +20,7 @@ import (
// DeletePrefixRange removes all TSM data belonging to a bucket, and removes all index
// and series file data associated with the bucket. The provided time range ensures
// that only bucket data for that range is removed.
func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate) error {
func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, max int64, pred Predicate, opts influxdb.DeletePrefixRangeOptions) error {
span, ctx := tracing.StartSpanFromContext(rootCtx)
span.LogKV("name_prefix", fmt.Sprintf("%x", name),
"min", time.Unix(0, min), "max", time.Unix(0, max),
@ -198,7 +199,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
span.LogKV("cache_cardinality", keysChecked)
span.Finish()
if len(possiblyDead.keys) > 0 {
if len(possiblyDead.keys) > 0 && !opts.KeepSeries {
buf := make([]byte, 1024)
// TODO(jeff): all of these methods have possible errors which opens us to partial

View File

@ -7,6 +7,7 @@ import (
"reflect"
"testing"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb/tsm1"
)
@ -44,7 +45,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@ -90,7 +91,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
iter.Close()
// Deleting remaining series should remove them from the series.
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 9, nil); err != nil {
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 9, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@ -149,7 +150,7 @@ func BenchmarkEngine_DeletePrefixRange(b *testing.B) {
}
b.StartTimer()
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
b.Fatal(err)
} else if err := e.Close(); err != nil {
b.Fatal(err)

View File

@ -58,7 +58,7 @@ func TestIndex_SeriesIDSet(t *testing.T) {
// Drop all the series for the gpu measurement and they should no longer
// be in the series ID set.
if err := engine.DeletePrefixRange(context.Background(), []byte("gpu"), math.MinInt64, math.MaxInt64, nil); err != nil {
if err := engine.DeletePrefixRange(context.Background(), []byte("gpu"), math.MinInt64, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
t.Fatal(err)
}
@ -331,7 +331,7 @@ func BenchmarkEngine_DeletePrefixRange_Cache(b *testing.B) {
}
b.StartTimer()
if err := e.DeletePrefixRange(context.Background(), []byte("0000000011221111000000001122112"), 0, math.MaxInt64, nil); err != nil {
if err := e.DeletePrefixRange(context.Background(), []byte("0000000011221111000000001122112"), 0, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
b.Fatal(err)
}
}
@ -346,7 +346,7 @@ func BenchmarkEngine_DeletePrefixRange_Cache(b *testing.B) {
}
b.StartTimer()
if err := e.DeletePrefixRange(context.Background(), []byte("fooasdasdasdasdasd"), 0, math.MaxInt64, nil); err != nil {
if err := e.DeletePrefixRange(context.Background(), []byte("fooasdasdasdasdasd"), 0, math.MaxInt64, nil, influxdb.DeletePrefixRangeOptions{}); err != nil {
b.Fatal(err)
}
}
@ -534,7 +534,7 @@ func (e *Engine) MustDeleteBucketRange(orgID, bucketID influxdb.ID, min, max int
encoded := tsdb.EncodeName(orgID, bucketID)
name := models.EscapeMeasurement(encoded[:])
err := e.DeletePrefixRange(context.Background(), name, min, max, nil)
err := e.DeletePrefixRange(context.Background(), name, min, max, nil, influxdb.DeletePrefixRangeOptions{})
if err != nil {
panic(err)
}