From 7dafc2cf34fa36b3fff74fe8d77fe2fa2c7a3217 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Wed, 2 Dec 2020 12:07:53 -0700 Subject: [PATCH] feat(tsdb): Implement delete with predicate. --- CHANGELOG.md | 1 + cmd/influxd/launcher/storage_test.go | 46 ++++++++++++++++++ http/delete_handler.go | 16 +++++-- http/delete_test.go | 4 +- predicate/predicate.go | 1 + storage/engine.go | 15 +++--- tsdb/index.go | 45 +++++++++++++++++ tsdb/store.go | 72 ++++++++++++++++++++++++++++ 8 files changed, 187 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 428aa6e4c7..0145d22be3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ want to use the default. 1. [20123](https://github.com/influxdata/influxdb/pull/20123): Allow password to be specified as a CLI option in `influx v1 auth set-password`. 1. [20110](https://github.com/influxdata/influxdb/pull/20110): Allow for users to specify where V2 config should be written in `influxd upgrade`. 1. [20204](https://github.com/influxdata/influxdb/pull/20204): Improve ID-related error messages for `influx v1 dbrp` commands. +1. [20236](https://github.com/influxdata/influxdb/pull/20236): Delete with predicate. ### Bug Fixes diff --git a/cmd/influxd/launcher/storage_test.go b/cmd/influxd/launcher/storage_test.go index 8f8bf3d3b5..d9913fa8c0 100644 --- a/cmd/influxd/launcher/storage_test.go +++ b/cmd/influxd/launcher/storage_test.go @@ -1,6 +1,7 @@ package launcher_test import ( + "context" "fmt" "io/ioutil" nethttp "net/http" @@ -156,6 +157,51 @@ func TestLauncher_BucketDelete(t *testing.T) { } } +func TestLauncher_DeleteWithPredicate(t *testing.T) { + l := launcher.RunTestLauncherOrFail(t, ctx, nil) + l.SetupOrFail(t) + defer l.ShutdownOrFail(t, ctx) + + // Write data to server. 
+ if resp, err := nethttp.DefaultClient.Do(l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, l.Bucket.ID), + "cpu,region=us-east-1 v=1 946684800000000000\n"+ + "cpu,region=us-west-1 v=1 946684800000000000\n"+ + "mem,region=us-west-1 v=1 946684800000000000\n", + )); err != nil { + t.Fatal(err) + } else if err := resp.Body.Close(); err != nil { + t.Fatal(err) + } + + // Execute a delete with predicate against the server. + s := http.DeleteService{ + Addr: l.URL(), + Token: l.Auth.Token, + } + if err := s.DeleteBucketRangePredicate(context.Background(), http.DeleteRequest{ + OrgID: l.Org.ID.String(), + BucketID: l.Bucket.ID.String(), + Start: "2000-01-01T00:00:00Z", + Stop: "2000-01-02T00:00:00Z", + Predicate: `_measurement="cpu" AND region="us-west-1"`, + }); err != nil { + t.Fatal(err) + } + + // Query server to ensure only the series matching the predicate was deleted. + qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)` + exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,region` + "\r\n" + + `,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,1,v,cpu,us-east-1` + "\r\n" + + `,_result,1,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,1,v,mem,us-west-1` + "\r\n\r\n" + + buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token) + if err != nil { + t.Fatalf("unexpected error querying server: %v", err) + } else if diff := cmp.Diff(string(buf), exp); diff != "" { + t.Fatal(diff) + } +} + func TestLauncher_UpdateRetentionPolicy(t *testing.T) { l := launcher.RunTestLauncherOrFail(t, ctx, nil) l.SetupOrFail(t) diff --git a/http/delete_handler.go b/http/delete_handler.go index 4d5d150d13..13cd048daf 100644 --- a/http/delete_handler.go +++ b/http/delete_handler.go @@ -114,16 +114,22 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) { return } - h.HandleHTTPError(r.Context(), &influxdb.Error{ - Code: influxdb.ENotImplemented, - Op: "http/handleDelete", 
- Msg: "Not implemented", - }, w) + if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate); err != nil { + h.HandleHTTPError(ctx, &influxdb.Error{ + Code: influxdb.EInternal, + Op: "http/handleDelete", + Msg: fmt.Sprintf("unable to delete: %v", err), + Err: err, + }, w) + return + } h.log.Debug("Deleted", zap.String("orgID", fmt.Sprint(dr.Org.ID.String())), zap.String("bucketID", fmt.Sprint(dr.Bucket.ID.String())), ) + + w.WriteHeader(http.StatusNoContent) } func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, error) { diff --git a/http/delete_test.go b/http/delete_test.go index fca803ef42..4be4f23e1f 100644 --- a/http/delete_test.go +++ b/http/delete_test.go @@ -228,7 +228,7 @@ func TestDelete(t *testing.T) { }, }, wants: wants{ - statusCode: http.StatusNotImplemented, + statusCode: http.StatusNoContent, body: ``, }, }, @@ -333,7 +333,7 @@ func TestDelete(t *testing.T) { }, }, wants: wants{ - statusCode: http.StatusNotImplemented, + statusCode: http.StatusNoContent, body: ``, }, }, diff --git a/predicate/predicate.go b/predicate/predicate.go index 4a7d3f43dd..ffed5161a5 100644 --- a/predicate/predicate.go +++ b/predicate/predicate.go @@ -16,6 +16,7 @@ func New(n Node) (influxdb.Predicate, error) { if n == nil { return nil, nil } + dt, err := n.ToDataType() if err != nil { return nil, err diff --git a/storage/engine.go b/storage/engine.go index 18ecebaeaa..44a28fe2da 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -299,15 +299,18 @@ func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID influxdb.ID) return e.tsdbStore.DeleteDatabase(bucketID.String()) } -// DeleteBucketRange deletes an entire range of data from the storage engine. 
-func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error { - return ErrNotImplemented -} - // DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data // deleted must be in [min, max], and the key must match the predicate if provided. func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error { - return ErrNotImplemented + span, _ := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + e.mu.RLock() + defer e.mu.RUnlock() + if e.closing == nil { + return ErrEngineClosed + } + return e.tsdbStore.DeleteSeriesWithPredicate(bucketID.String(), min, max, pred) } func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error { diff --git a/tsdb/index.go b/tsdb/index.go index 9a64fe6659..0d7740594f 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -9,6 +9,7 @@ import ( "sort" "sync" + "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/pkg/bytesutil" @@ -157,6 +158,50 @@ func (e *seriesElemAdapter) Tags() models.Tags { return e.tags } func (e *seriesElemAdapter) Deleted() bool { return e.deleted } func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr } +var _ SeriesIDIterator = (*PredicateSeriesIDIterator)(nil) + +type PredicateSeriesIDIterator struct { + itr SeriesIDIterator + sfile *SeriesFile + pred influxdb.Predicate +} + +func NewPredicateSeriesIDIterator(itr SeriesIDIterator, sfile *SeriesFile, pred influxdb.Predicate) SeriesIDIterator { + if pred == nil { + return itr + } + return &PredicateSeriesIDIterator{ + itr: itr, + sfile: sfile, + pred: pred, + } +} + +func (itr *PredicateSeriesIDIterator) Close() error { return itr.itr.Close() } + +func (itr *PredicateSeriesIDIterator) Next() (SeriesIDElem, error) { + for { + elem, err := itr.itr.Next() + if elem.SeriesID == 0 
|| err != nil { + return elem, err + } + + // Skip if this key has been tombstoned. + seriesKey := itr.sfile.SeriesKey(elem.SeriesID) + if len(seriesKey) == 0 { + continue + } + + name, tags := ParseSeriesKey(seriesKey) + tags = append(models.Tags{{Key: models.MeasurementTagKeyBytes, Value: name}}, tags...) + key := models.MakeKey(name, tags) + if !itr.pred.Matches(key) { + continue + } + return elem, nil + } +} + // SeriesIDElem represents a single series and optional expression. type SeriesIDElem struct { SeriesID uint64 diff --git a/tsdb/store.go b/tsdb/store.go index 646f2c6e17..b8c1ca02d7 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -1278,6 +1278,78 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) { return relativePath(s.path, shard.path) } +// DeleteSeriesWithPredicate loops through the local shards of database and deletes data +// in the min/max range for each series matching pred (every series if pred is nil). +func (s *Store) DeleteSeriesWithPredicate(database string, min, max int64, pred influxdb.Predicate) error { + s.mu.RLock() + if s.databases[database].hasMultipleIndexTypes() { + s.mu.RUnlock() + return ErrMultipleIndexTypes + } + sfile := s.sfiles[database] + if sfile == nil { + s.mu.RUnlock() + // No series file means nothing has been written to this DB and thus nothing to delete. + return nil + } + shards := s.filterShards(byDatabase(database)) + epochs := s.epochsForShards(shards) + s.mu.RUnlock() + + // Limit to 1 delete for each shard since expanding the measurement into the list + // of series keys can be very memory intensive if run concurrently. + limit := limiter.NewFixed(1) + + return s.walkShards(shards, func(sh *Shard) error { + limit.Take() + defer limit.Release() + + // install our guard and wait for any prior deletes to finish. the + // guard ensures future deletes that could conflict wait for us. 
+ waiter := epochs[sh.id].WaitDelete(newGuard(min, max, nil, nil)) + waiter.Wait() + defer waiter.Done() + + index, err := sh.Index() + if err != nil { + return err + } + + // Find matching series keys for each measurement. + mitr, err := index.MeasurementIterator() + if err != nil { + return err + } + defer mitr.Close() + + for { + mm, err := mitr.Next() + if err != nil { + return err + } else if mm == nil { + break + } + + if err := func() error { + sitr, err := index.MeasurementSeriesIDIterator(mm) + if err != nil { + return err + } else if sitr == nil { + return nil + } + defer sitr.Close() + + itr := NewSeriesIteratorAdapter(sfile, NewPredicateSeriesIDIterator(sitr, sfile, pred)) + return sh.DeleteSeriesRange(itr, min, max) + }(); err != nil { + return err + } + } + + return nil + }) +} + // DeleteSeries loops through the local shards and deletes the series data for // the passed in series keys. func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {