storage: add predicate deletes to the engine interface

pull/13371/head
Jeff Wendling 2019-04-29 10:33:10 -06:00 committed by Jeff Wendling
parent 740d669514
commit e84d4625a5
2 changed files with 106 additions and 4 deletions

View File

@ -251,7 +251,15 @@ func (e *Engine) replayWAL() error {
return err
case *wal.DeleteBucketRangeWALEntry:
return e.deleteBucketRangeLocked(en.OrgID, en.BucketID, en.Min, en.Max)
var pred tsm1.Predicate
if len(en.Predicate) > 0 {
pred, err = tsm1.UnmarshalPredicate(en.Predicate)
if err != nil {
return err
}
}
return e.deleteBucketRangeLocked(en.OrgID, en.BucketID, en.Min, en.Max, pred)
}
return nil
@ -524,18 +532,44 @@ func (e *Engine) DeleteBucketRange(orgID, bucketID platform.ID, min, max int64)
return err
}
return e.deleteBucketRangeLocked(orgID, bucketID, min, max)
return e.deleteBucketRangeLocked(orgID, bucketID, min, max, nil)
}
// DeleteBucketRangePredicate deletes an entire bucket from the storage engine.
func (e *Engine) DeleteBucketRangePredicate(orgID, bucketID platform.ID,
min, max int64, pred tsm1.Predicate) error {
e.mu.RLock()
defer e.mu.RUnlock()
if e.closing == nil {
return ErrEngineClosed
}
// Marshal the predicate to add it to the WAL.
predData, err := pred.Marshal()
if err != nil {
return err
}
// Add the delete to the WAL to be replayed if there is a crash or shutdown.
if _, err := e.wal.DeleteBucketRange(orgID, bucketID, min, max, predData); err != nil {
return err
}
return e.deleteBucketRangeLocked(orgID, bucketID, min, max, pred)
}
// deleteBucketRangeLocked does the work of deleting a bucket range and must be called under
// some sort of lock.
func (e *Engine) deleteBucketRangeLocked(orgID, bucketID platform.ID, min, max int64) error {
func (e *Engine) deleteBucketRangeLocked(orgID, bucketID platform.ID,
min, max int64, pred tsm1.Predicate) error {
// TODO(edd): we need to clean up how we're encoding the prefix so that we
// don't have to remember to get it right everywhere we need to touch TSM data.
encoded := tsdb.EncodeName(orgID, bucketID)
name := models.EscapeMeasurement(encoded[:])
return e.engine.DeletePrefixRange(name, min, max, nil)
return e.engine.DeletePrefixRange(name, min, max, pred)
}
// SeriesCardinality returns the number of series in the engine.

View File

@ -12,7 +12,9 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/tsdb/tsm1"
)
func TestEngine_WriteAndIndex(t *testing.T) {
@ -250,6 +252,72 @@ func TestEngine_DeleteBucket(t *testing.T) {
}
}
func TestEngine_DeleteBucket_Predicate(t *testing.T) {
engine := NewDefaultEngine()
defer engine.Close()
engine.MustOpen()
p := func(m, f string, kvs ...string) models.Point {
tags := map[string]string{models.FieldKeyTagKey: f, models.MeasurementTagKey: m}
for i := 0; i < len(kvs)-1; i += 2 {
tags[kvs[i]] = kvs[i+1]
}
return models.MustNewPoint(
tsdb.EncodeNameString(engine.org, engine.bucket),
models.NewTags(tags),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
}
err := engine.Engine.WritePoints(context.TODO(), []models.Point{
p("cpu", "value", "tag1", "val1"),
p("cpu", "value", "tag2", "val2"),
p("cpu", "value", "tag3", "val3"),
p("mem", "value", "tag1", "val1"),
p("mem", "value", "tag2", "val2"),
p("mem", "value", "tag3", "val3"),
})
if err != nil {
t.Fatal(err)
}
// Check the series cardinality.
if got, exp := engine.SeriesCardinality(), int64(6); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
// Construct a predicate to remove tag2
pred, err := tsm1.NewProtobufPredicate(&datatypes.Predicate{
Root: &datatypes.Node{
NodeType: datatypes.NodeTypeComparisonExpression,
Value: &datatypes.Node_Comparison_{Comparison: datatypes.ComparisonEqual},
Children: []*datatypes.Node{
{NodeType: datatypes.NodeTypeTagRef,
Value: &datatypes.Node_TagRefValue{TagRefValue: "tag2"},
},
{NodeType: datatypes.NodeTypeLiteral,
Value: &datatypes.Node_StringValue{StringValue: "val2"},
},
},
},
})
if err != nil {
t.Fatal(err)
}
// Remove the matching series.
if err := engine.DeleteBucketRangePredicate(engine.org, engine.bucket,
math.MinInt64, math.MaxInt64, pred); err != nil {
t.Fatal(err)
}
// Check only matching series were removed.
if got, exp := engine.SeriesCardinality(), int64(4); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
}
func TestEngine_OpenClose(t *testing.T) {
engine := NewDefaultEngine()
engine.MustOpen()