refactor(query/stdlib): modify storage filters to use the predicate directly (#17650)

The storage filters are modified to use the predicates directly so we do
not have to pass `semantic.FunctionExpression` around. Instead, since
simple expressions are all that are supported anyway, we transform
suitable function expressions into predicates as part of the push down
rule and this simplifies the influxdb reader code.

This also moves the storage predicate conversion code into the standard
library package as it is the only location that uses this code now that
the predicate conversion is done as part of the push down rule.

This refactor was prompted by another refactor of the
`semantic.FunctionExpression` that would cause it to always contain a
`semantic.Block`. Since the push down filter needs the expressions and
to combine them, this refactor allows us to avoid constructing a combined
filter inside of blocks, which gives us better type safety.
pull/17653/head
Jonathan A. Sternberg 2020-04-07 10:45:08 -05:00 committed by GitHub
parent 17c244d1a7
commit 1bb08ceaf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 111 additions and 173 deletions

View File

@ -7,9 +7,9 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
)
const (
@ -50,12 +50,10 @@ type ReadRangePhysSpec struct {
Bucket string
BucketID string
// FilterSet is set to true if there is a filter.
FilterSet bool
// Filter is the filter to use when calling into
// storage. It must be possible to push down this
// filter.
Filter *semantic.FunctionExpression
Filter *datatypes.Predicate
Bounds flux.Bounds
}
@ -64,19 +62,8 @@ func (s *ReadRangePhysSpec) Kind() plan.ProcedureKind {
return ReadRangePhysKind
}
func (s *ReadRangePhysSpec) Copy() plan.ProcedureSpec {
ns := new(ReadRangePhysSpec)
ns.Bucket = s.Bucket
ns.BucketID = s.BucketID
ns.FilterSet = s.FilterSet
if ns.FilterSet {
ns.Filter = s.Filter.Copy().(*semantic.FunctionExpression)
}
ns.Bounds = s.Bounds
return ns
ns := *s
return &ns
}
func (s *ReadRangePhysSpec) LookupBucketID(ctx context.Context, orgID influxdb.ID, buckets BucketLookup) (influxdb.ID, error) {

View File

@ -134,7 +134,7 @@ func (PushDownFilterRule) Rewrite(pn plan.Node) (plan.Node, bool, error) {
return pn, false, nil
}
bodyExpr, ok := getFunctionBodyExpr(filterSpec.Fn.Fn.Block)
bodyExpr, ok := filterSpec.Fn.Fn.GetFunctionBodyExpression()
if !ok {
return pn, false, nil
}
@ -159,17 +159,26 @@ func (PushDownFilterRule) Rewrite(pn plan.Node) (plan.Node, bool, error) {
}
pushable, _ = rewritePushableExpr(pushable)
newFromSpec := fromSpec.Copy().(*ReadRangePhysSpec)
if newFromSpec.FilterSet {
newBody := semantic.ExprsToConjunction(newFromSpec.Filter.Block.Body.(semantic.Expression), pushable)
newFromSpec.Filter.Block.Body = newBody
} else {
newFromSpec.FilterSet = true
// NOTE: We lose the scope here, but that is ok because we can't push down the scope to storage.
newFromSpec.Filter = filterSpec.Fn.Fn.Copy().(*semantic.FunctionExpression)
newFromSpec.Filter.Block.Body = pushable
// Convert the pushable expression to a storage predicate.
predicate, err := ToStoragePredicate(pushable, paramName)
if err != nil {
return nil, false, err
}
// If the filter has already been set, then combine the existing predicate
// with the new one.
if fromSpec.Filter != nil {
mergedPredicate, err := mergePredicates(ast.AndOperator, fromSpec.Filter, predicate)
if err != nil {
return nil, false, err
}
predicate = mergedPredicate
}
// Copy the specification and set the predicate.
newFromSpec := fromSpec.Copy().(*ReadRangePhysSpec)
newFromSpec.Filter = predicate
if notPushable == nil {
// All predicates could be pushed down, so eliminate the filter
mergedNode, err := plan.MergeToPhysicalNode(pn, fromNode, newFromSpec)
@ -185,7 +194,11 @@ func (PushDownFilterRule) Rewrite(pn plan.Node) (plan.Node, bool, error) {
}
newFilterSpec := filterSpec.Copy().(*universe.FilterProcedureSpec)
newFilterSpec.Fn.Fn.Block.Body = notPushable
newFilterSpec.Fn.Fn.Block.Body = &semantic.Block{
Body: []semantic.Statement{
&semantic.ReturnStatement{Argument: notPushable},
},
}
if err := pn.ReplaceSpec(newFilterSpec); err != nil {
return nil, false, err
}
@ -623,25 +636,3 @@ func (SortedPivotRule) Rewrite(pn plan.Node) (plan.Node, bool, error) {
}
return pn, false, nil
}
// getFunctionBodyExpr extracts the single expression that makes up the
// body of a function block.
//
// When the body is a block, it must consist of exactly one statement and
// that statement must be a return; the returned expression is the return
// argument. When the body is itself an expression, it is returned as is.
// In every other case the second result is false.
func getFunctionBodyExpr(fn *semantic.FunctionBlock) (semantic.Expression, bool) {
	// A block body qualifies only when it is a lone return statement.
	if block, isBlock := fn.Body.(*semantic.Block); isBlock {
		if len(block.Body) != 1 {
			return nil, false
		}
		if ret, isReturn := block.Body[0].(*semantic.ReturnStatement); isReturn {
			return ret.Argument, true
		}
		return nil, false
	}

	// A bare expression body is already in the form we want.
	if expr, isExpr := fn.Body.(semantic.Expression); isExpr {
		return expr, true
	}
	return nil, false
}

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/universe"
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
)
func fluxTime(t int64) flux.Time {
@ -155,24 +156,17 @@ func TestPushDownFilterRule(t *testing.T) {
}
}
makeExprBody := func(expr *semantic.FunctionExpression) *semantic.FunctionExpression {
switch e := expr.Block.Body.(type) {
case *semantic.Block:
if len(e.Body) != 1 {
panic("more than one statement in function body")
}
returnExpr, ok := e.Body[0].(*semantic.ReturnStatement)
if !ok {
panic("non-return statement in function body")
}
newExpr := expr.Copy().(*semantic.FunctionExpression)
newExpr.Block.Body = returnExpr.Argument.Copy()
return newExpr
case semantic.Expression:
return expr
default:
panic("unexpected function body type")
toStoragePredicate := func(fn *semantic.FunctionExpression) *datatypes.Predicate {
body, ok := fn.GetFunctionBodyExpression()
if !ok {
panic("more than one statement in function body")
}
predicate, err := influxdb.ToStoragePredicate(body, "r")
if err != nil {
panic(err)
}
return predicate
}
tests := []plantest.RuleTestCase{
@ -196,9 +190,8 @@ func TestPushDownFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeExprBody(pushableFn1),
Bounds: bounds,
Filter: toStoragePredicate(pushableFn1),
}),
},
},
@ -227,9 +220,8 @@ func TestPushDownFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter1_filter2", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeExprBody(pushableFn1and2),
Bounds: bounds,
Filter: toStoragePredicate(pushableFn1and2),
}),
},
},
@ -254,12 +246,11 @@ func TestPushDownFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeExprBody(pushableFn1),
Bounds: bounds,
Filter: toStoragePredicate(pushableFn1),
}),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{
Fn: makeResolvedFilterFn(makeExprBody(unpushableFn)),
Fn: makeResolvedFilterFn(unpushableFn),
}),
},
Edges: [][2]int{
@ -293,9 +284,8 @@ func TestPushDownFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeExprBody(pushableFn1),
Bounds: bounds,
Filter: toStoragePredicate(pushableFn1),
}),
},
},
@ -338,9 +328,8 @@ func TestPushDownFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeExprBody(executetest.FunctionExpression(t, `(r) => r.host != ""`)),
Bounds: bounds,
Filter: toStoragePredicate(executetest.FunctionExpression(t, `(r) => r.host != ""`)),
}),
},
},
@ -364,9 +353,8 @@ func TestPushDownFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeExprBody(executetest.FunctionExpression(t, `(r) => r.host == ""`)),
Bounds: bounds,
Filter: toStoragePredicate(executetest.FunctionExpression(t, `(r) => r.host == ""`)),
}),
},
},
@ -408,9 +396,8 @@ func TestPushDownFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeExprBody(executetest.FunctionExpression(t, `(r) => r.host != ""`)),
Bounds: bounds,
Filter: toStoragePredicate(executetest.FunctionExpression(t, `(r) => r.host != ""`)),
}),
},
},
@ -434,9 +421,8 @@ func TestPushDownFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeExprBody(executetest.FunctionExpression(t, `(r) => r._value == ""`)),
Bounds: bounds,
Filter: toStoragePredicate(executetest.FunctionExpression(t, `(r) => r._value == ""`)),
}),
},
},
@ -479,9 +465,8 @@ func TestPushDownFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeExprBody(executetest.FunctionExpression(t, `(r) => r.host == "cpu" and r.host != ""`)),
Bounds: bounds,
Filter: toStoragePredicate(executetest.FunctionExpression(t, `(r) => r.host == "cpu" and r.host != ""`)),
}),
},
},
@ -754,8 +739,8 @@ func TestReadTagKeysRule(t *testing.T) {
},
}
if filter {
s.FilterSet = true
s.Filter = filterSpec.Fn.Fn
bodyExpr, _ := filterSpec.Fn.Fn.GetFunctionBodyExpression()
s.Filter, _ = influxdb.ToStoragePredicate(bodyExpr, "r")
}
return &s
}
@ -971,8 +956,8 @@ func TestReadTagValuesRule(t *testing.T) {
TagKey: "host",
}
if filter {
s.FilterSet = true
s.Filter = filterSpec.Fn.Fn
bodyExpr, _ := filterSpec.Fn.Fn.GetFunctionBodyExpression()
s.Filter, _ = influxdb.ToStoragePredicate(bodyExpr, "r")
}
return &s
}

View File

@ -10,7 +10,6 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
platform "github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/tracing"
"github.com/influxdata/influxdb/v2/query"
@ -181,10 +180,6 @@ func createReadFilterSource(s plan.ProcedureSpec, id execute.DatasetID, a execut
return nil, err
}
var filter *semantic.FunctionExpression
if spec.FilterSet {
filter = spec.Filter
}
return ReadFilterSource(
id,
deps.Reader,
@ -192,7 +187,7 @@ func createReadFilterSource(s plan.ProcedureSpec, id execute.DatasetID, a execut
OrganizationID: orgID,
BucketID: bucketID,
Bounds: *bounds,
Predicate: filter,
Predicate: spec.Filter,
},
a,
), nil
@ -258,10 +253,6 @@ func createReadGroupSource(s plan.ProcedureSpec, id execute.DatasetID, a execute
return nil, err
}
var filter *semantic.FunctionExpression
if spec.FilterSet {
filter = spec.Filter
}
return ReadGroupSource(
id,
deps.Reader,
@ -270,7 +261,7 @@ func createReadGroupSource(s plan.ProcedureSpec, id execute.DatasetID, a execute
OrganizationID: orgID,
BucketID: bucketID,
Bounds: *bounds,
Predicate: filter,
Predicate: spec.Filter,
},
GroupMode: query.ToGroupMode(spec.GroupMode),
GroupKeys: spec.GroupKeys,
@ -297,11 +288,6 @@ func createReadTagKeysSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID,
return nil, err
}
var filter *semantic.FunctionExpression
if spec.FilterSet {
filter = spec.Filter
}
bounds := a.StreamContext().Bounds()
return ReadTagKeysSource(
dsid,
@ -311,7 +297,7 @@ func createReadTagKeysSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID,
OrganizationID: orgID,
BucketID: bucketID,
Bounds: *bounds,
Predicate: filter,
Predicate: spec.Filter,
},
},
a,
@ -366,11 +352,6 @@ func createReadTagValuesSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID
return nil, err
}
var filter *semantic.FunctionExpression
if spec.FilterSet {
filter = spec.Filter
}
bounds := a.StreamContext().Bounds()
return ReadTagValuesSource(
dsid,
@ -380,7 +361,7 @@ func createReadTagValuesSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID
OrganizationID: orgID,
BucketID: bucketID,
Bounds: *bounds,
Predicate: filter,
Predicate: spec.Filter,
},
TagKey: spec.TagKey,
},

View File

@ -1,4 +1,4 @@
package storageflux
package influxdb
import (
"fmt"
@ -10,12 +10,10 @@ import (
"github.com/pkg/errors"
)
func toStoragePredicate(f *semantic.FunctionExpression) (*datatypes.Predicate, error) {
if f.Block.Parameters == nil || len(f.Block.Parameters.List) != 1 {
return nil, errors.New("storage predicate functions must have exactly one parameter")
}
root, err := toStoragePredicateHelper(f.Block.Body.(semantic.Expression), f.Block.Parameters.List[0].Key.Name)
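// ToStoragePredicate will convert a semantic expression into a predicate that can be
// sent down to the storage layer. The objectName is the name of the parameter of the
// function the expression was taken from (callers pass, for example, "r").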
func ToStoragePredicate(n semantic.Expression, objectName string) (*datatypes.Predicate, error) {
root, err := toStoragePredicateHelper(n, objectName)
if err != nil {
return nil, err
}
@ -25,6 +23,39 @@ func toStoragePredicate(f *semantic.FunctionExpression) (*datatypes.Predicate, e
}, nil
}
// mergePredicates combines one or more storage predicates into a single
// predicate by joining their root nodes with the given logical operator.
//
// Only ast.AndOperator and ast.OrOperator are supported; any other operator
// results in an error. An error is also returned when no predicates are
// given or when any of the given predicates is nil. When a single predicate
// is given, the result is a new predicate that shares the input's root node.
func mergePredicates(op ast.LogicalOperatorKind, predicates ...*datatypes.Predicate) (*datatypes.Predicate, error) {
	if len(predicates) == 0 {
		return nil, errors.New("at least one predicate is needed")
	}

	var value datatypes.Node_Logical
	switch op {
	case ast.AndOperator:
		value = datatypes.LogicalAnd
	case ast.OrOperator:
		value = datatypes.LogicalOr
	default:
		return nil, fmt.Errorf("unknown logical operator %v", op)
	}

	// Reject nil predicates up front so that a bad argument is reported as
	// an error rather than a nil pointer dereference while building the tree.
	for i, p := range predicates {
		if p == nil {
			return nil, fmt.Errorf("predicate at index %d is nil", i)
		}
	}

	// Nest the predicates backwards. This way we get a tree like this:
	// a AND (b AND c)
	root := predicates[len(predicates)-1].Root
	for i := len(predicates) - 2; i >= 0; i-- {
		root = &datatypes.Node{
			NodeType: datatypes.NodeTypeLogicalExpression,
			Value:    &datatypes.Node_Logical_{Logical: value},
			Children: []*datatypes.Node{
				predicates[i].Root,
				root,
			},
		}
	}
	return &datatypes.Predicate{
		Root: root,
	}, nil
}
func toStoragePredicateHelper(n semantic.Expression, objectName string) (*datatypes.Node, error) {
switch n := n.(type) {
case *semantic.LogicalExpression:

View File

@ -7,8 +7,8 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
@ -27,9 +27,8 @@ type ReadFilterSpec struct {
OrganizationID influxdb.ID
BucketID influxdb.ID
Bounds execute.Bounds
Predicate *semantic.FunctionExpression
Bounds execute.Bounds
Predicate *datatypes.Predicate
}
type ReadGroupSpec struct {

View File

@ -54,41 +54,23 @@ func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, a
}
func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error) {
var predicate *datatypes.Predicate
if spec.Predicate != nil {
p, err := toStoragePredicate(spec.Predicate)
if err != nil {
return nil, err
}
predicate = p
}
return &tagKeysIterator{
ctx: ctx,
bounds: spec.Bounds,
s: r.s,
readSpec: spec,
predicate: predicate,
predicate: spec.Predicate,
alloc: alloc,
}, nil
}
func (r *storeReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error) {
var predicate *datatypes.Predicate
if spec.Predicate != nil {
p, err := toStoragePredicate(spec.Predicate)
if err != nil {
return nil, err
}
predicate = p
}
return &tagValuesIterator{
ctx: ctx,
bounds: spec.Bounds,
s: r.s,
readSpec: spec,
predicate: predicate,
predicate: spec.Predicate,
alloc: alloc,
}, nil
}
@ -118,18 +100,9 @@ func (fi *filterIterator) Do(f func(flux.Table) error) error {
return err
}
var predicate *datatypes.Predicate
if fi.spec.Predicate != nil {
p, err := toStoragePredicate(fi.spec.Predicate)
if err != nil {
return err
}
predicate = p
}
var req datatypes.ReadFilterRequest
req.ReadSource = any
req.Predicate = predicate
req.Predicate = fi.spec.Predicate
req.Range.Start = int64(fi.spec.Bounds.Start)
req.Range.End = int64(fi.spec.Bounds.Stop)
@ -242,18 +215,9 @@ func (gi *groupIterator) Do(f func(flux.Table) error) error {
return err
}
var predicate *datatypes.Predicate
if gi.spec.Predicate != nil {
p, err := toStoragePredicate(gi.spec.Predicate)
if err != nil {
return err
}
predicate = p
}
var req datatypes.ReadGroupRequest
req.ReadSource = any
req.Predicate = predicate
req.Predicate = gi.spec.Predicate
req.Range.Start = int64(gi.spec.Bounds.Start)
req.Range.End = int64(gi.spec.Bounds.Stop)