feat(tsdb): Implement delete with predicate.

pull/20236/head
Ben Johnson 2020-12-02 12:07:53 -07:00
parent e0543e6e3a
commit 7dafc2cf34
8 changed files with 187 additions and 13 deletions

View File

@ -18,6 +18,7 @@ want to use the default.
1. [20123](https://github.com/influxdata/influxdb/pull/20123): Allow password to be specified as a CLI option in `influx v1 auth set-password`. 1. [20123](https://github.com/influxdata/influxdb/pull/20123): Allow password to be specified as a CLI option in `influx v1 auth set-password`.
1. [20110](https://github.com/influxdata/influxdb/pull/20110): Allow for users to specify where V2 config should be written in `influxd upgrade`. 1. [20110](https://github.com/influxdata/influxdb/pull/20110): Allow for users to specify where V2 config should be written in `influxd upgrade`.
1. [20204](https://github.com/influxdata/influxdb/pull/20204): Improve ID-related error messages for `influx v1 dbrp` commands. 1. [20204](https://github.com/influxdata/influxdb/pull/20204): Improve ID-related error messages for `influx v1 dbrp` commands.
1. [20236](https://github.com/influxdata/influxdb/pull/20236): Delete with predicate.
### Bug Fixes ### Bug Fixes

View File

@ -1,6 +1,7 @@
package launcher_test package launcher_test
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
nethttp "net/http" nethttp "net/http"
@ -156,6 +157,51 @@ func TestLauncher_BucketDelete(t *testing.T) {
} }
} }
func TestLauncher_DeleteWithPredicate(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
// Write data to server.
if resp, err := nethttp.DefaultClient.Do(l.MustNewHTTPRequest("POST", fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, l.Bucket.ID),
"cpu,region=us-east-1 v=1 946684800000000000\n"+
"cpu,region=us-west-1 v=1 946684800000000000\n"+
"mem,region=us-west-1 v=1 946684800000000000\n",
)); err != nil {
t.Fatal(err)
} else if err := resp.Body.Close(); err != nil {
t.Fatal(err)
}
// Execute single write against the server.
s := http.DeleteService{
Addr: l.URL(),
Token: l.Auth.Token,
}
if err := s.DeleteBucketRangePredicate(context.Background(), http.DeleteRequest{
OrgID: l.Org.ID.String(),
BucketID: l.Bucket.ID.String(),
Start: "2000-01-01T00:00:00Z",
Stop: "2000-01-02T00:00:00Z",
Predicate: `_measurement="cpu" AND region="us-west-1"`,
}); err != nil {
t.Fatal(err)
}
// Query server to ensure write persists.
qs := `from(bucket:"BUCKET") |> range(start:2000-01-01T00:00:00Z,stop:2000-01-02T00:00:00Z)`
exp := `,result,table,_start,_stop,_time,_value,_field,_measurement,region` + "\r\n" +
`,_result,0,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,1,v,cpu,us-east-1` + "\r\n" +
`,_result,1,2000-01-01T00:00:00Z,2000-01-02T00:00:00Z,2000-01-01T00:00:00Z,1,v,mem,us-west-1` + "\r\n\r\n"
buf, err := http.SimpleQuery(l.URL(), qs, l.Org.Name, l.Auth.Token)
if err != nil {
t.Fatalf("unexpected error querying server: %v", err)
} else if diff := cmp.Diff(string(buf), exp); diff != "" {
t.Fatal(diff)
}
}
func TestLauncher_UpdateRetentionPolicy(t *testing.T) { func TestLauncher_UpdateRetentionPolicy(t *testing.T) {
l := launcher.RunTestLauncherOrFail(t, ctx, nil) l := launcher.RunTestLauncherOrFail(t, ctx, nil)
l.SetupOrFail(t) l.SetupOrFail(t)

View File

@ -114,16 +114,22 @@ func (h *DeleteHandler) handleDelete(w http.ResponseWriter, r *http.Request) {
return return
} }
h.HandleHTTPError(r.Context(), &influxdb.Error{ if err := h.DeleteService.DeleteBucketRangePredicate(r.Context(), dr.Org.ID, dr.Bucket.ID, dr.Start, dr.Stop, dr.Predicate); err != nil {
Code: influxdb.ENotImplemented, h.HandleHTTPError(ctx, &influxdb.Error{
Op: "http/handleDelete", Code: influxdb.EInternal,
Msg: "Not implemented", Op: "http/handleDelete",
}, w) Msg: fmt.Sprintf("unable to delete: %v", err),
Err: err,
}, w)
return
}
h.log.Debug("Deleted", h.log.Debug("Deleted",
zap.String("orgID", fmt.Sprint(dr.Org.ID.String())), zap.String("orgID", fmt.Sprint(dr.Org.ID.String())),
zap.String("bucketID", fmt.Sprint(dr.Bucket.ID.String())), zap.String("bucketID", fmt.Sprint(dr.Bucket.ID.String())),
) )
w.WriteHeader(http.StatusNoContent)
} }
func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, error) { func decodeDeleteRequest(ctx context.Context, r *http.Request, orgSvc influxdb.OrganizationService, bucketSvc influxdb.BucketService) (*deleteRequest, error) {

View File

@ -228,7 +228,7 @@ func TestDelete(t *testing.T) {
}, },
}, },
wants: wants{ wants: wants{
statusCode: http.StatusNotImplemented, statusCode: http.StatusNoContent,
body: ``, body: ``,
}, },
}, },
@ -333,7 +333,7 @@ func TestDelete(t *testing.T) {
}, },
}, },
wants: wants{ wants: wants{
statusCode: http.StatusNotImplemented, statusCode: http.StatusNoContent,
body: ``, body: ``,
}, },
}, },

View File

@ -16,6 +16,7 @@ func New(n Node) (influxdb.Predicate, error) {
if n == nil { if n == nil {
return nil, nil return nil, nil
} }
dt, err := n.ToDataType() dt, err := n.ToDataType()
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -299,15 +299,18 @@ func (e *Engine) DeleteBucket(ctx context.Context, orgID, bucketID influxdb.ID)
return e.tsdbStore.DeleteDatabase(bucketID.String()) return e.tsdbStore.DeleteDatabase(bucketID.String())
} }
// DeleteBucketRange deletes an entire range of data from the storage engine.
func (e *Engine) DeleteBucketRange(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64) error {
return ErrNotImplemented
}
// DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data // DeleteBucketRangePredicate deletes data within a bucket from the storage engine. Any data
// deleted must be in [min, max], and the key must match the predicate if provided. // deleted must be in [min, max], and the key must match the predicate if provided.
func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error { func (e *Engine) DeleteBucketRangePredicate(ctx context.Context, orgID, bucketID influxdb.ID, min, max int64, pred influxdb.Predicate) error {
return ErrNotImplemented span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ErrEngineClosed
}
return e.tsdbStore.DeleteSeriesWithPredicate(bucketID.String(), min, max, pred)
} }
func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error { func (e *Engine) BackupKVStore(ctx context.Context, w io.Writer) error {

View File

@ -9,6 +9,7 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/influxql/query" "github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/pkg/bytesutil" "github.com/influxdata/influxdb/v2/pkg/bytesutil"
@ -157,6 +158,50 @@ func (e *seriesElemAdapter) Tags() models.Tags { return e.tags }
func (e *seriesElemAdapter) Deleted() bool { return e.deleted } func (e *seriesElemAdapter) Deleted() bool { return e.deleted }
func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr } func (e *seriesElemAdapter) Expr() influxql.Expr { return e.expr }
var _ SeriesIDIterator = (*PredicateSeriesIDIterator)(nil)
type PredicateSeriesIDIterator struct {
itr SeriesIDIterator
sfile *SeriesFile
pred influxdb.Predicate
}
func NewPredicateSeriesIDIterator(itr SeriesIDIterator, sfile *SeriesFile, pred influxdb.Predicate) SeriesIDIterator {
if pred == nil {
return itr
}
return &PredicateSeriesIDIterator{
itr: itr,
sfile: sfile,
pred: pred,
}
}
func (itr *PredicateSeriesIDIterator) Close() error { return itr.itr.Close() }
func (itr *PredicateSeriesIDIterator) Next() (SeriesIDElem, error) {
for {
elem, err := itr.itr.Next()
if elem.SeriesID == 0 || err != nil {
return elem, err
}
// Skip if this key has been tombstoned.
seriesKey := itr.sfile.SeriesKey(elem.SeriesID)
if len(seriesKey) == 0 {
continue
}
name, tags := ParseSeriesKey(seriesKey)
tags = append(models.Tags{{Key: models.MeasurementTagKeyBytes, Value: name}}, tags...)
key := models.MakeKey(name, tags)
if !itr.pred.Matches(key) {
continue
}
return elem, nil
}
}
// SeriesIDElem represents a single series and optional expression. // SeriesIDElem represents a single series and optional expression.
type SeriesIDElem struct { type SeriesIDElem struct {
SeriesID uint64 SeriesID uint64

View File

@ -1278,6 +1278,78 @@ func (s *Store) ShardRelativePath(id uint64) (string, error) {
return relativePath(s.path, shard.path) return relativePath(s.path, shard.path)
} }
// DeleteSeries loops through the local shards and deletes the series data for
// the passed in series keys.
func (s *Store) DeleteSeriesWithPredicate(database string, min, max int64, pred influxdb.Predicate) error {
s.mu.RLock()
if s.databases[database].hasMultipleIndexTypes() {
s.mu.RUnlock()
return ErrMultipleIndexTypes
}
sfile := s.sfiles[database]
if sfile == nil {
s.mu.RUnlock()
// No series file means nothing has been written to this DB and thus nothing to delete.
return nil
}
shards := s.filterShards(byDatabase(database))
epochs := s.epochsForShards(shards)
s.mu.RUnlock()
// Limit to 1 delete for each shard since expanding the measurement into the list
// of series keys can be very memory intensive if run concurrently.
limit := limiter.NewFixed(1)
return s.walkShards(shards, func(sh *Shard) error {
limit.Take()
defer limit.Release()
// install our guard and wait for any prior deletes to finish. the
// guard ensures future deletes that could conflict wait for us.
waiter := epochs[sh.id].WaitDelete(newGuard(min, max, nil, nil))
waiter.Wait()
defer waiter.Done()
index, err := sh.Index()
if err != nil {
return err
}
// Find matching series keys for each measurement.
mitr, err := index.MeasurementIterator()
if err != nil {
return err
}
defer mitr.Close()
for {
mm, err := mitr.Next()
if err != nil {
return err
} else if mm == nil {
break
}
if err := func() error {
sitr, err := index.MeasurementSeriesIDIterator(mm)
if err != nil {
return err
} else if sitr == nil {
return nil
}
defer sitr.Close()
itr := NewSeriesIteratorAdapter(sfile, NewPredicateSeriesIDIterator(sitr, sfile, pred))
return sh.DeleteSeriesRange(itr, min, max)
}(); err != nil {
return err
}
}
return nil
})
}
// DeleteSeries loops through the local shards and deletes the series data for // DeleteSeries loops through the local shards and deletes the series data for
// the passed in series keys. // the passed in series keys.
func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error { func (s *Store) DeleteSeries(database string, sources []influxql.Source, condition influxql.Expr) error {