diff --git a/cmd/influxd/launcher/engine.go b/cmd/influxd/launcher/engine.go index f6fca802c0..ab670262bb 100644 --- a/cmd/influxd/launcher/engine.go +++ b/cmd/influxd/launcher/engine.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/influxdata/influxql" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/http" "github.com/influxdata/influxdb/v2/kit/platform" @@ -119,8 +121,8 @@ func (t *TemporaryEngine) SeriesCardinality(ctx context.Context, bucketID platfo } // DeleteBucketRangePredicate will delete a bucket from the range and predicate. -func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error { - return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred) +func (t *TemporaryEngine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error { + return t.engine.DeleteBucketRangePredicate(ctx, orgID, bucketID, min, max, pred, measurement) } func (t *TemporaryEngine) CreateBucket(ctx context.Context, b *influxdb.Bucket) error { diff --git a/delete.go b/delete.go index 68bfc25808..2a40546bb3 100644 --- a/delete.go +++ b/delete.go @@ -3,6 +3,8 @@ package influxdb import ( "context" + "github.com/influxdata/influxql" + "github.com/influxdata/influxdb/v2/kit/platform" ) @@ -15,5 +17,5 @@ type Predicate interface { // DeleteService will delete a bucket from the range and predict. type DeleteService interface { - DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred Predicate) error + DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred Predicate, measurement influxql.Expr) error } diff --git a/http/delete_handler.go b/http/delete_handler.go index 1f0d1ad36a..acb7018c5e 100644 --- a/http/delete_handler.go +++ b/http/delete_handler.go @@ -5,9 +5,12 @@ import ( "context" "encoding/json" "fmt" + "io" http "net/http" "time" + "github.com/influxdata/influxql" + "github.com/influxdata/httprouter" "github.com/influxdata/influxdb/v2" pcontext "github.com/influxdata/influxdb/v2/context" @@ -91,7 +94,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) { return } - dr, err := decodeDeleteRequest( + dr, measurement, err := decodeDeleteRequest( ctx, r, h.OrganizationService, h.BucketService, @@ -121,7 +124,7 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) { return } - if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate); err != nil { + if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate, measurement); err != nil { h.HandleHTTPError(ctx, &errors.Error{ Code: errors.EInternal, Op: "http/handleDelete", @@ -139,24 +142,78 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, error) { +func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, influxql.Expr, error) { dr := new(deleteRequest) - err := json.NewDecoder(r.Body).Decode(dr) + buf, err := io.ReadAll(r.Body) if err != nil { - return nil, &errors.Error{ + je := &errors.Error{ Code: errors.EInvalid, - Msg: "invalid request; error parsing request json", + Msg: "error reading json body", Err: err, } + return nil, nil, je } + buffer := bytes.NewBuffer(buf) + err = json.NewDecoder(buffer).Decode(dr) + if err != nil { + je := &errors.Error{ + Code: errors.EInvalid, + Msg: "error decoding json body", + Err: err, + } + return nil, nil, je + } + + var drd deleteRequestDecode + err = json.Unmarshal(buf, &drd) + if err != nil { + je := &errors.Error{ + Code: errors.EInvalid, + Msg: "error decoding json body for predicate", + Err: err, + } + return nil, nil, je + } + var measurementExpr influxql.Expr + if drd.Predicate != "" { + expr, err := influxql.ParseExpr(drd.Predicate) + if err != nil { + return nil, nil, &errors.Error{ + Code: errors.EInvalid, + Msg: "invalid request; error parsing predicate", + Err: err, + } + } + measurementExpr, _, err = influxql.PartitionExpr(influxql.CloneExpr(expr), func(e influxql.Expr) (bool, error) { + switch e := e.(type) { + case *influxql.BinaryExpr: + switch e.Op { + case influxql.EQ, influxql.NEQ, influxql.EQREGEX, influxql.NEQREGEX: + tag, ok := e.LHS.(*influxql.VarRef) + if ok && tag.Val == "_measurement" { + return true, nil + } + } + } + return false, nil + }) + if err != nil { + return nil, nil, &errors.Error{ + Code: errors.EInvalid, + Msg: "invalid request; error partitioning predicate", + Err: err, + } + } + } + if dr.Org, err = queryOrganization(ctx, r, orgSvc); err != nil { - return nil, err + return nil, nil, err } if dr.Bucket, err = queryBucket(ctx, dr.Org.ID, r, bucketSvc); err != nil { - return nil, err + return nil, nil, err } - return dr, nil + return dr, measurementExpr, nil } type deleteRequest struct { diff --git a/http/delete_test.go b/http/delete_test.go index 8e29f64124..eeabb259db 100644 --- a/http/delete_test.go +++ b/http/delete_test.go @@ -72,7 +72,7 @@ func TestDelete(t *testing.T) { contentType: "application/json; charset=utf-8", body: `{ "code": "invalid", - "message": "invalid request; error parsing request json: invalid RFC3339Nano for field start, please format your time with RFC3339Nano format, example: 2009-01-02T23:00:00Z" + "message": "error decoding json body: invalid RFC3339Nano for field start, please format your time with RFC3339Nano format, example: 2009-01-02T23:00:00Z" }`, }, }, @@ -89,7 +89,7 @@ func TestDelete(t *testing.T) { contentType: "application/json; charset=utf-8", body: `{ "code": "invalid", - "message": "invalid request; error parsing request json: invalid RFC3339Nano for field stop, please format your time with RFC3339Nano format, example: 2009-01-01T23:00:00Z" + "message": "error decoding json body: invalid RFC3339Nano for field stop, please format your time with RFC3339Nano format, example: 2009-01-01T23:00:00Z" }`, }, }, @@ -106,7 +106,7 @@ func TestDelete(t *testing.T) { contentType: "application/json; charset=utf-8", body: fmt.Sprintf(`{ "code": "invalid", - "message": "invalid request; error parsing request json: %s" + "message": "error decoding json body: %s" }`, msgStartTooSoon), }, }, @@ -123,7 +123,7 @@ func TestDelete(t *testing.T) { contentType: "application/json; charset=utf-8", body: fmt.Sprintf(`{ "code": "invalid", - "message": "invalid request; error parsing request json: %s" + "message": "error decoding json body: %s" }`, msgStopTooLate), }, }, @@ -321,7 +321,61 @@ func TestDelete(t *testing.T) { statusCode: http.StatusBadRequest, body: `{ "code": "invalid", - "message": "invalid request; error parsing request json: the logical operator OR is not supported yet at position 25" + "message": "error decoding json body: the logical operator OR is not supported yet at position 25" + }`, + }, + }, + { + name: "unsupported delete measurements", + args: args{ + queryParams: map[string][]string{ + "org": {"org1"}, + "bucket": {"buck1"}, + }, + body: []byte(`{ + "start":"2009-01-01T23:00:00Z", + "stop":"2019-11-10T01:00:00Z", + "predicate": "_measurement=\"cpu\" or _measurement=\"mem\"" + }`), + authorizer: &influxdb.Authorization{ + UserID: user1ID, + Status: influxdb.Active, + Permissions: []influxdb.Permission{ + { + Action: influxdb.WriteAction, + Resource: influxdb.Resource{ + Type: influxdb.BucketsResourceType, + ID: influxtesting.IDPtr(platform.ID(2)), + OrgID: influxtesting.IDPtr(platform.ID(1)), + }, + }, + }, + }, + }, + fields: fields{ + DeleteService: mock.NewDeleteService(), + BucketService: &mock.BucketService{ + FindBucketFn: func(ctx context.Context, f influxdb.BucketFilter) (*influxdb.Bucket, error) { + return &influxdb.Bucket{ + ID: platform.ID(2), + Name: "bucket1", + }, nil + }, + }, + OrganizationService: &mock.OrganizationService{ + FindOrganizationF: func(ctx context.Context, f influxdb.OrganizationFilter) (*influxdb.Organization, error) { + return &influxdb.Organization{ + ID: platform.ID(1), + Name: "org1", + }, nil + }, + }, + }, + wants: wants{ + statusCode: http.StatusBadRequest, + body: `{ + "code": "invalid", + "message": "error decoding json body: the logical operator OR is not supported yet at position 19" }`, }, }, @@ -335,7 +389,7 @@ func TestDelete(t *testing.T) { body: []byte(`{ "start":"2009-01-01T23:00:00Z", "stop":"2019-11-10T01:00:00Z", - "predicate": "tag1=\"v1\" and (tag2=\"v2\" and tag3=\"v3\")" + "predicate": "_measurement=\"testing\" and tag1=\"v1\" and (tag2=\"v2\" and tag3=\"v3\")" }`), authorizer: &influxdb.Authorization{ UserID: user1ID, diff --git a/mock/delete.go b/mock/delete.go index 08b3521015..32b792b2d3 100644 --- a/mock/delete.go +++ b/mock/delete.go @@ -3,6 +3,8 @@ package mock import ( "context" + "github.com/influxdata/influxql" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/kit/platform" ) @@ -11,20 +13,20 @@ var _ influxdb.DeleteService = &DeleteService{} // DeleteService is a mock delete server. type DeleteService struct { - DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error + DeleteBucketRangePredicateF func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error } // NewDeleteService returns a mock DeleteService where its methods will return // zero values. func NewDeleteService() DeleteService { return DeleteService{ - DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error { + DeleteBucketRangePredicateF: func(tx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error { return nil }, } } -//DeleteBucketRangePredicate calls DeleteBucketRangePredicateF. -func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error { - return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred) +// DeleteBucketRangePredicate calls DeleteBucketRangePredicateF. +func (s DeleteService) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error { + return s.DeleteBucketRangePredicateF(ctx, orgID, bucketID, min, max, pred, measurement) } diff --git a/storage/engine.go b/storage/engine.go index e2bd8ba48f..9b9142f3f9 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -327,7 +327,7 @@ func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID platform.ID) // DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data // deleted must be in [min, max], and the key must match the predicate if provided. -func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate) error { +func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID platform.ID, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -336,7 +336,7 @@ func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID if e.closing == nil { return ErrEngineClosed } - return e.tsdbStore.DeleteSeriesWithPredicate(ctx, bucketID.String(), min, max, pred) + return e.tsdbStore.DeleteSeriesWithPredicate(ctx, bucketID.String(), min, max, pred, measurement) } // RLockKVStore locks the KV store as well as the engine in preparation for doing a backup. diff --git a/tsdb/store.go b/tsdb/store.go index d1c616f2f0..0d37fd1309 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -15,6 +15,8 @@ import ( "sync" "time" + errors3 "github.com/influxdata/influxdb/v2/pkg/errors" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/influxql/query" errors2 "github.com/influxdata/influxdb/v2/kit/platform/errors" @@ -1335,7 +1337,7 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) { // DeleteSeries loops through the local shards and deletes the series data for // the passed in series keys. -func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string, min, max int64, pred influxdb.Predicate) error { +func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string, min, max int64, pred influxdb.Predicate, measurement influxql.Expr) error { s.mu.RLock() if s.databases[database].hasMultipleIndexTypes() { s.mu.RUnlock() @@ -1355,7 +1357,7 @@ func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string, // of series keys can be very memory intensive if run concurrently. limit := limiter.NewFixed(1) - return s.walkShards(shards, func(sh *Shard) error { + return s.walkShards(shards, func(sh *Shard) (err error) { if err := limit.Take(ctx); err != nil { return err } @@ -1372,12 +1374,43 @@ func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string, return err } + measurementName := make([]byte, 0) + + if measurement != nil { + if m, ok := measurement.(*influxql.BinaryExpr); ok { + rhs, ok := m.RHS.(*influxql.VarRef) + if ok { + measurementName = []byte(rhs.Val) + exists, err := sh.MeasurementExists(measurementName) + if err != nil { + return err + } + if !exists { + return nil + } + } + } + } + // Find matching series keys for each measurement. mitr, err := index.MeasurementIterator() if err != nil { return err } - defer mitr.Close() + defer errors3.Capture(&err, mitr.Close)() + + deleteSeries := func(mm []byte) error { + sitr, err := index.MeasurementSeriesIDIterator(mm) + if err != nil { + return err + } else if sitr == nil { + return nil + } + defer errors3.Capture(&err, sitr.Close)() + + itr := NewSeriesIteratorAdapter(sfile, NewPredicateSeriesIDIterator(sitr, sfile, pred)) + return sh.DeleteSeriesRange(ctx, itr, min, max) + } for { mm, err := mitr.Next() @@ -1387,19 +1420,14 @@ func (s *Store) DeleteSeriesWithPredicate(ctx context.Context, database string, break } - if err := func() error { - sitr, err := index.MeasurementSeriesIDIterator(mm) + // If we are deleting within a measurement and have found a match, we can return after the delete. + if measurementName != nil && bytes.Equal(mm, measurementName) { + return deleteSeries(mm) + } else { + err := deleteSeries(mm) if err != nil { return err - } else if sitr == nil { - return nil } - defer sitr.Close() - - itr := NewSeriesIteratorAdapter(sfile, NewPredicateSeriesIDIterator(sitr, sfile, pred)) - return sh.DeleteSeriesRange(ctx, itr, min, max) - }(); err != nil { - return err } } diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 1043c59eb0..d9b13f03f3 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "github.com/influxdata/influxdb/v2/predicate" "math" "math/rand" "os" @@ -2125,6 +2126,58 @@ func TestStore_TagValues_ConcurrentDropShard(t *testing.T) { } } +func TestStore_DeleteByPredicate(t *testing.T) { + test := func(t *testing.T, index string) error { + s := MustOpenStore(t, index) + defer s.Close() + + s.MustCreateShardWithData("db0", "rp0", 0, + `cpu,host=serverA value=1 0`, + `cpu,region=west value=3 20`, + `cpu,secret=foo value=5 30`, + `mem,secret=foo value=1 30`, + `disk value=4 30`, + ) + + p, err := predicate.Parse(`_measurement="cpu"`) + if err != nil { + return err + } + + pred, err := predicate.New(p) + if err != nil { + return err + } + + expr, err := influxql.ParseExpr(`_measurement="cpu"`) + if err != nil { + return err + } + + err = s.DeleteSeriesWithPredicate(context.Background(), "db0", math.MinInt, math.MaxInt, pred, expr) + if err != nil { + return err + } + + names, err := s.MeasurementNames(context.Background(), query.OpenAuthorizer, "db0", nil) + if err != nil { + return err + } + + require.Equal(t, 2, len(names), "expected cpu to be deleted, leaving 2 measurements") + + return nil + } + + for _, index := range tsdb.RegisteredIndexes() { + t.Run(index, func(t *testing.T) { + if err := test(t, index); err != nil { + t.Fatal(err) + } + }) + } +} + func BenchmarkStore_SeriesCardinality_100_Shards(b *testing.B) { for _, index := range tsdb.RegisteredIndexes() { store := NewStore(b, index)