diff --git a/storage/engine.go b/storage/engine.go index 9793d45002..d5dbbce666 100644 --- a/storage/engine.go +++ b/storage/engine.go @@ -251,7 +251,15 @@ func (e *Engine) replayWAL() error { return err case *wal.DeleteBucketRangeWALEntry: - return e.deleteBucketRangeLocked(en.OrgID, en.BucketID, en.Min, en.Max) + var pred tsm1.Predicate + if len(en.Predicate) > 0 { + pred, err = tsm1.UnmarshalPredicate(en.Predicate) + if err != nil { + return err + } + } + + return e.deleteBucketRangeLocked(en.OrgID, en.BucketID, en.Min, en.Max, pred) } return nil @@ -524,18 +532,44 @@ func (e *Engine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64) return err } - return e.deleteBucketRangeLocked(orgID, bucketID, min, max) + return e.deleteBucketRangeLocked(orgID, bucketID, min, max, nil) +} + +// DeleteBucketRangePredicate deletes an entire bucket from the storage engine. +func (e *Engine) DeleteBucketRangePredicate(orgID, bucketID platform.ID, + min, max int64, pred tsm1.Predicate) error { + + e.mu.RLock() + defer e.mu.RUnlock() + if e.closing == nil { + return ErrEngineClosed + } + + // Marshal the predicate to add it to the WAL. + predData, err := pred.Marshal() + if err != nil { + return err + } + + // Add the delete to the WAL to be replayed if there is a crash or shutdown. + if _, err := e.wal.DeleteBucketRange(orgID, bucketID, min, max, predData); err != nil { + return err + } + + return e.deleteBucketRangeLocked(orgID, bucketID, min, max, pred) } // deleteBucketRangeLocked does the work of deleting a bucket range and must be called under // some sort of lock. -func (e *Engine) deleteBucketRangeLocked(orgID, bucketID platform.ID, min, max int64) error { +func (e *Engine) deleteBucketRangeLocked(orgID, bucketID platform.ID, + min, max int64, pred tsm1.Predicate) error { + // TODO(edd): we need to clean up how we're encoding the prefix so that we // don't have to remember to get it right everywhere we need to touch TSM data. encoded := tsdb.EncodeName(orgID, bucketID) name := models.EscapeMeasurement(encoded[:]) - return e.engine.DeletePrefixRange(name, min, max, nil) + return e.engine.DeletePrefixRange(name, min, max, pred) } // SeriesCardinality returns the number of series in the engine. diff --git a/storage/engine_test.go b/storage/engine_test.go index 9436b50cc9..0deeef6c77 100644 --- a/storage/engine_test.go +++ b/storage/engine_test.go @@ -12,7 +12,9 @@ import ( "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/storage" + "github.com/influxdata/influxdb/storage/reads/datatypes" "github.com/influxdata/influxdb/tsdb" + "github.com/influxdata/influxdb/tsdb/tsm1" ) func TestEngine_WriteAndIndex(t *testing.T) { @@ -250,6 +252,72 @@ func TestEngine_DeleteBucket(t *testing.T) { } } +func TestEngine_DeleteBucket_Predicate(t *testing.T) { + engine := NewDefaultEngine() + defer engine.Close() + engine.MustOpen() + + p := func(m, f string, kvs ...string) models.Point { + tags := map[string]string{models.FieldKeyTagKey: f, models.MeasurementTagKey: m} + for i := 0; i < len(kvs)-1; i += 2 { + tags[kvs[i]] = kvs[i+1] + } + return models.MustNewPoint( + tsdb.EncodeNameString(engine.org, engine.bucket), + models.NewTags(tags), + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + ) + } + + err := engine.Engine.WritePoints(context.TODO(), []models.Point{ + p("cpu", "value", "tag1", "val1"), + p("cpu", "value", "tag2", "val2"), + p("cpu", "value", "tag3", "val3"), + p("mem", "value", "tag1", "val1"), + p("mem", "value", "tag2", "val2"), + p("mem", "value", "tag3", "val3"), + }) + if err != nil { + t.Fatal(err) + } + + // Check the series cardinality. + if got, exp := engine.SeriesCardinality(), int64(6); got != exp { + t.Fatalf("got %d series, exp %d series in index", got, exp) + } + + // Construct a predicate to remove tag2 + pred, err := tsm1.NewProtobufPredicate(&datatypes.Predicate{ + Root: &datatypes.Node{ + NodeType: datatypes.NodeTypeComparisonExpression, + Value: &datatypes.Node_Comparison_{Comparison: datatypes.ComparisonEqual}, + Children: []*datatypes.Node{ + {NodeType: datatypes.NodeTypeTagRef, + Value: &datatypes.Node_TagRefValue{TagRefValue: "tag2"}, + }, + {NodeType: datatypes.NodeTypeLiteral, + Value: &datatypes.Node_StringValue{StringValue: "val2"}, + }, + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + // Remove the matching series. + if err := engine.DeleteBucketRangePredicate(engine.org, engine.bucket, + math.MinInt64, math.MaxInt64, pred); err != nil { + t.Fatal(err) + } + + // Check only matching series were removed. + if got, exp := engine.SeriesCardinality(), int64(4); got != exp { + t.Fatalf("got %d series, exp %d series in index", got, exp) + } +} + func TestEngine_OpenClose(t *testing.T) { engine := NewDefaultEngine() engine.MustOpen()