Merge pull request #11729 from influxdata/refactor/from-logical-physical

refactor(query): separate "from" into logical and physical procedures

Fixes [influxdata/flux#270](https://github.com/influxdata/flux/issues/270).
pull/11766/head
Christopher M. Wolff 2019-02-07 13:41:07 -08:00 committed by GitHub
commit 81e2f304ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 171 additions and 98 deletions

2
go.mod
View File

@ -61,7 +61,7 @@ require (
github.com/hashicorp/vault v0.11.5
github.com/hashicorp/vault-plugin-secrets-kv v0.0.0-20181106190520-2236f141171e // indirect
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect
github.com/influxdata/flux v0.18.0
github.com/influxdata/flux v0.18.1-0.20190207205759-08f45a8c2bc1
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee // indirect

4
go.sum
View File

@ -222,8 +222,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.18.0 h1:tcbvJrOqnCcbyvXz7cbwEtIGMzr/JEBSDcRbdnRJCDk=
github.com/influxdata/flux v0.18.0/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY=
github.com/influxdata/flux v0.18.1-0.20190207205759-08f45a8c2bc1 h1:2KjMx8Dqp/44UEELbu/JU7z6QLiQgbmgDm+tEAr1A1g=
github.com/influxdata/flux v0.18.1-0.20190207205759-08f45a8c2bc1/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY=
github.com/influxdata/goreleaser v0.97.0-influx/go.mod h1:MnjA0e0Uq6ISqjG1WxxMAl+3VS1QYjILSWVnMYDxasE=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=

View File

@ -37,13 +37,14 @@ func init() {
flux.RegisterOpSpec(FromKind, newFromOp)
plan.RegisterProcedureSpec(FromKind, newFromProcedure, FromKind)
plan.RegisterPhysicalRules(
FromConversionRule{},
MergeFromRangeRule{},
MergeFromFilterRule{},
FromDistinctRule{},
MergeFromGroupRule{},
FromKeysRule{},
)
execute.RegisterSource(FromKind, createFromSource)
execute.RegisterSource(PhysicalFromKind, createFromSource)
}
func createFromOpSpec(args flux.Arguments, a *flux.Administration) (flux.OperationSpec, error) {
@ -79,10 +80,52 @@ func (s *FromOpSpec) Kind() flux.OperationKind {
}
type FromProcedureSpec struct {
plan.DefaultCost
Bucket string
BucketID string
}
func newFromProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
spec, ok := qs.(*FromOpSpec)
if !ok {
return nil, fmt.Errorf("invalid spec type %T", qs)
}
return &FromProcedureSpec{
Bucket: spec.Bucket,
BucketID: spec.BucketID,
}, nil
}
func (s *FromProcedureSpec) Kind() plan.ProcedureKind {
return FromKind
}
func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(FromProcedureSpec)
ns.Bucket = s.Bucket
ns.BucketID = s.BucketID
return ns
}
func (s FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
// FromProcedureSpec has no bounds, so must be invalid.
var bucket string
if len(s.Bucket) > 0 {
bucket = s.Bucket
} else {
bucket = s.BucketID
}
return fmt.Errorf(`%s: results from "%s" must be bounded`, id, bucket)
}
const PhysicalFromKind = "physFrom"
type PhysicalFromProcedureSpec struct {
FromProcedureSpec
plan.DefaultCost
BoundsSet bool
Bounds flux.Bounds
@ -109,24 +152,12 @@ type FromProcedureSpec struct {
AggregateMethod string
}
func newFromProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
spec, ok := qs.(*FromOpSpec)
if !ok {
return nil, fmt.Errorf("invalid spec type %T", qs)
}
return &FromProcedureSpec{
Bucket: spec.Bucket,
BucketID: spec.BucketID,
}, nil
func (PhysicalFromProcedureSpec) Kind() plan.ProcedureKind {
return PhysicalFromKind
}
func (s *FromProcedureSpec) Kind() plan.ProcedureKind {
return FromKind
}
func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(FromProcedureSpec)
func (s *PhysicalFromProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(PhysicalFromProcedureSpec)
ns.Bucket = s.Bucket
ns.BucketID = s.BucketID
@ -160,7 +191,7 @@ func (s *FromProcedureSpec) Copy() plan.ProcedureSpec {
}
// TimeBounds implements plan.BoundsAwareProcedureSpec.
func (s *FromProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds {
func (s *PhysicalFromProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds {
if s.BoundsSet {
bounds := &plan.Bounds{
Start: values.ConvertTime(s.Bounds.Start.Time(s.Bounds.Now)),
@ -171,7 +202,7 @@ func (s *FromProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bou
return nil
}
func (s FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
func (s PhysicalFromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
if !s.BoundsSet || (s.Bounds.Start.IsZero() && s.Bounds.Stop.IsZero()) {
var bucket string
if len(s.Bucket) > 0 {
@ -185,6 +216,30 @@ func (s FromProcedureSpec) PostPhysicalValidate(id plan.NodeID) error {
return nil
}
// FromConversionRule converts a logical `from` node into a physical `from` node.
// TODO(cwolff): this rule can go away when we require a `range`
// to be pushed into a logical `from` to create a physical `from.`
type FromConversionRule struct {
}
func (FromConversionRule) Name() string {
return "FromConversionRule"
}
func (FromConversionRule) Pattern() plan.Pattern {
return plan.Pat(FromKind)
}
func (FromConversionRule) Rewrite(pn plan.PlanNode) (plan.PlanNode, bool, error) {
logicalFromSpec := pn.ProcedureSpec().(*FromProcedureSpec)
newNode := plan.CreatePhysicalNode(pn.ID(), &PhysicalFromProcedureSpec{
FromProcedureSpec: *logicalFromSpec,
})
plan.ReplaceNode(pn, newNode)
return newNode, true, nil
}
// MergeFromRangeRule pushes a `range` into a `from`.
type MergeFromRangeRule struct{}
@ -195,15 +250,15 @@ func (rule MergeFromRangeRule) Name() string {
// Pattern returns the pattern that matches `from -> range`.
func (rule MergeFromRangeRule) Pattern() plan.Pattern {
return plan.Pat(universe.RangeKind, plan.Pat(FromKind))
return plan.Pat(universe.RangeKind, plan.Pat(PhysicalFromKind))
}
// Rewrite attempts to rewrite a `from -> range` into a `FromRange`.
func (rule MergeFromRangeRule) Rewrite(node plan.PlanNode) (plan.PlanNode, bool, error) {
from := node.Predecessors()[0]
fromSpec := from.ProcedureSpec().(*FromProcedureSpec)
fromSpec := from.ProcedureSpec().(*PhysicalFromProcedureSpec)
rangeSpec := node.ProcedureSpec().(*universe.RangeProcedureSpec)
fromRange := fromSpec.Copy().(*FromProcedureSpec)
fromRange := fromSpec.Copy().(*PhysicalFromProcedureSpec)
// Set new bounds to `range` bounds initially
fromRange.Bounds = rangeSpec.Bounds
@ -240,7 +295,7 @@ func (rule MergeFromRangeRule) Rewrite(node plan.PlanNode) (plan.PlanNode, bool,
fromRange.BoundsSet = true
// Finally merge nodes into single operation
merged, err := plan.MergePhysicalPlanNodes(node, from, fromRange)
merged, err := plan.MergeToPhysicalPlanNode(node, from, fromRange)
if err != nil {
return nil, false, err
}
@ -258,13 +313,13 @@ func (MergeFromFilterRule) Name() string {
}
func (MergeFromFilterRule) Pattern() plan.Pattern {
return plan.Pat(universe.FilterKind, plan.Pat(FromKind))
return plan.Pat(universe.FilterKind, plan.Pat(PhysicalFromKind))
}
func (MergeFromFilterRule) Rewrite(filterNode plan.PlanNode) (plan.PlanNode, bool, error) {
filterSpec := filterNode.ProcedureSpec().(*universe.FilterProcedureSpec)
fromNode := filterNode.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*PhysicalFromProcedureSpec)
if fromSpec.AggregateSet || fromSpec.GroupingSet {
return filterNode, false, nil
@ -294,7 +349,7 @@ func (MergeFromFilterRule) Rewrite(filterNode plan.PlanNode) (plan.PlanNode, boo
return filterNode, false, nil
}
newFromSpec := fromSpec.Copy().(*FromProcedureSpec)
newFromSpec := fromSpec.Copy().(*PhysicalFromProcedureSpec)
if newFromSpec.FilterSet {
newBody := semantic.ExprsToConjunction(newFromSpec.Filter.Block.Body.(semantic.Expression), pushable)
newFromSpec.Filter.Block.Body = newBody
@ -306,7 +361,7 @@ func (MergeFromFilterRule) Rewrite(filterNode plan.PlanNode) (plan.PlanNode, boo
if notPushable == nil {
// All predicates could be pushed down, so eliminate the filter
mergedNode, err := plan.MergePhysicalPlanNodes(filterNode, fromNode, newFromSpec)
mergedNode, err := plan.MergeToPhysicalPlanNode(filterNode, fromNode, newFromSpec)
if err != nil {
return nil, false, err
}
@ -468,13 +523,13 @@ func (FromDistinctRule) Name() string {
}
func (FromDistinctRule) Pattern() plan.Pattern {
return plan.Pat(universe.DistinctKind, plan.Pat(FromKind))
return plan.Pat(universe.DistinctKind, plan.Pat(PhysicalFromKind))
}
func (FromDistinctRule) Rewrite(distinctNode plan.PlanNode) (plan.PlanNode, bool, error) {
fromNode := distinctNode.Predecessors()[0]
distinctSpec := distinctNode.ProcedureSpec().(*universe.DistinctProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*PhysicalFromProcedureSpec)
if fromSpec.LimitSet && fromSpec.PointsLimit == -1 {
return distinctNode, false, nil
@ -485,7 +540,7 @@ func (FromDistinctRule) Rewrite(distinctNode plan.PlanNode) (plan.PlanNode, bool
((fromSpec.GroupMode == flux.GroupModeBy && execute.ContainsStr(fromSpec.GroupKeys, distinctSpec.Column)) ||
(fromSpec.GroupMode == flux.GroupModeExcept && !execute.ContainsStr(fromSpec.GroupKeys, distinctSpec.Column)))
if groupStar || groupByColumn {
newFromSpec := fromSpec.Copy().(*FromProcedureSpec)
newFromSpec := fromSpec.Copy().(*PhysicalFromProcedureSpec)
newFromSpec.LimitSet = true
newFromSpec.PointsLimit = -1
if err := fromNode.ReplaceSpec(newFromSpec); err != nil {
@ -505,13 +560,13 @@ func (MergeFromGroupRule) Name() string {
}
func (MergeFromGroupRule) Pattern() plan.Pattern {
return plan.Pat(universe.GroupKind, plan.Pat(FromKind))
return plan.Pat(universe.GroupKind, plan.Pat(PhysicalFromKind))
}
func (MergeFromGroupRule) Rewrite(groupNode plan.PlanNode) (plan.PlanNode, bool, error) {
fromNode := groupNode.Predecessors()[0]
groupSpec := groupNode.ProcedureSpec().(*universe.GroupProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*PhysicalFromProcedureSpec)
if fromSpec.GroupingSet ||
fromSpec.LimitSet ||
@ -527,11 +582,11 @@ func (MergeFromGroupRule) Rewrite(groupNode plan.PlanNode) (plan.PlanNode, bool,
}
}
newFromSpec := fromSpec.Copy().(*FromProcedureSpec)
newFromSpec := fromSpec.Copy().(*PhysicalFromProcedureSpec)
newFromSpec.GroupingSet = true
newFromSpec.GroupMode = groupSpec.GroupMode
newFromSpec.GroupKeys = groupSpec.GroupKeys
merged, err := plan.MergePhysicalPlanNodes(groupNode, fromNode, newFromSpec)
merged, err := plan.MergeToPhysicalPlanNode(groupNode, fromNode, newFromSpec)
if err != nil {
return nil, false, err
}
@ -546,18 +601,18 @@ func (FromKeysRule) Name() string {
}
func (FromKeysRule) Pattern() plan.Pattern {
return plan.Pat(universe.KeysKind, plan.Pat(FromKind))
return plan.Pat(universe.KeysKind, plan.Pat(PhysicalFromKind))
}
func (FromKeysRule) Rewrite(keysNode plan.PlanNode) (plan.PlanNode, bool, error) {
fromNode := keysNode.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec)
fromSpec := fromNode.ProcedureSpec().(*PhysicalFromProcedureSpec)
if fromSpec.LimitSet && fromSpec.PointsLimit == -1 {
return keysNode, false, nil
}
newFromSpec := fromSpec.Copy().(*FromProcedureSpec)
newFromSpec := fromSpec.Copy().(*PhysicalFromProcedureSpec)
newFromSpec.LimitSet = true
newFromSpec.PointsLimit = -1
@ -572,7 +627,7 @@ func (FromKeysRule) Rewrite(keysNode plan.PlanNode) (plan.PlanNode, bool, error)
// https://github.com/influxdata/flux/issues/114
func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) {
spec := prSpec.(*FromProcedureSpec)
spec := prSpec.(*PhysicalFromProcedureSpec)
var w execute.Window
bounds := a.StreamContext().Bounds()
if bounds == nil {

View File

@ -174,14 +174,15 @@ func makeFilterFn(exprs ...semantic.Expression) *semantic.FunctionExpression {
func TestFromRangeRule(t *testing.T) {
var (
fromWithBounds = &influxdb.FromProcedureSpec{
from = &influxdb.FromProcedureSpec{}
fromWithBounds = &influxdb.PhysicalFromProcedureSpec{
BoundsSet: true,
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
}
fromWithIntersectedBounds = &influxdb.FromProcedureSpec{
fromWithIntersectedBounds = &influxdb.PhysicalFromProcedureSpec{
BoundsSet: true,
Bounds: flux.Bounds{
Start: fluxTime(9),
@ -200,7 +201,6 @@ func TestFromRangeRule(t *testing.T) {
Stop: fluxTime(14),
},
}
from = &influxdb.FromProcedureSpec{}
mean = &universe.MeanProcedureSpec{}
count = &universe.CountProcedureSpec{}
)
@ -209,10 +209,10 @@ func TestFromRangeRule(t *testing.T) {
{
Name: "from range",
// from -> range => from
Rules: []plan.Rule{&influxdb.MergeFromRangeRule{}},
Rules: []plan.Rule{&influxdb.FromConversionRule{}, &influxdb.MergeFromRangeRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreateLogicalNode("from", from),
plan.CreatePhysicalNode("range", rangeWithBounds),
},
Edges: [][2]int{{0, 1}},
@ -226,10 +226,10 @@ func TestFromRangeRule(t *testing.T) {
{
Name: "from range with successor node",
// from -> range -> count => from -> count
Rules: []plan.Rule{&influxdb.MergeFromRangeRule{}},
Rules: []plan.Rule{&influxdb.FromConversionRule{}, &influxdb.MergeFromRangeRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreateLogicalNode("from", from),
plan.CreatePhysicalNode("range", rangeWithBounds),
plan.CreatePhysicalNode("count", count),
},
@ -249,10 +249,10 @@ func TestFromRangeRule(t *testing.T) {
{
Name: "from with multiple ranges",
// from -> range -> range => from
Rules: []plan.Rule{&influxdb.MergeFromRangeRule{}},
Rules: []plan.Rule{&influxdb.FromConversionRule{}, &influxdb.MergeFromRangeRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreateLogicalNode("from", from),
plan.CreatePhysicalNode("range0", rangeWithBounds),
plan.CreatePhysicalNode("range1", rangeWithDifferentBounds),
},
@ -274,10 +274,10 @@ func TestFromRangeRule(t *testing.T) {
// range => \ /
// | from
// from
Rules: []plan.Rule{&influxdb.MergeFromRangeRule{}},
Rules: []plan.Rule{&influxdb.FromConversionRule{}, &influxdb.MergeFromRangeRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreateLogicalNode("from", from),
plan.CreatePhysicalNode("range", rangeWithBounds),
plan.CreatePhysicalNode("count", count),
plan.CreatePhysicalNode("yield0", yield("count")),
@ -316,7 +316,7 @@ func TestFromRangeRule(t *testing.T) {
Rules: []plan.Rule{&influxdb.MergeFromRangeRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", &influxdb.PhysicalFromProcedureSpec{}),
plan.CreatePhysicalNode("range", rangeWithBounds),
plan.CreatePhysicalNode("yield0", yield("range")),
plan.CreatePhysicalNode("count", count),
@ -337,21 +337,29 @@ func TestFromRangeRule(t *testing.T) {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.RuleTestHelper(t, &tc)
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}
func TestFromFilterRule(t *testing.T) {
var (
rangeWithBounds = &universe.RangeProcedureSpec{
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
bounds = flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
}
from = &influxdb.FromProcedureSpec{}
physFrom = &influxdb.PhysicalFromProcedureSpec{
BoundsSet: true,
Bounds: bounds,
}
rangeWithBounds = &universe.RangeProcedureSpec{
Bounds: bounds,
}
pushableExpr1 = &semantic.BinaryExpression{Operator: ast.EqualOperator,
Left: &semantic.MemberExpression{Object: &semantic.IdentifierExpression{Name: "r"}, Property: "_measurement"},
Right: &semantic.StringLiteral{Value: "cpu"}}
@ -378,10 +386,10 @@ func TestFromFilterRule(t *testing.T) {
{
Name: "from filter",
// from -> filter => from
Rules: []plan.Rule{influxdb.MergeFromFilterRule{}},
Rules: []plan.Rule{influxdb.MergeFromFilterRule{}, influxdb.MergeFromRangeRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", physFrom),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{Fn: makeFilterFn(pushableExpr1)}),
},
Edges: [][2]int{
@ -390,7 +398,9 @@ func TestFromFilterRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("merged_from_filter", &influxdb.FromProcedureSpec{
plan.CreatePhysicalNode("merged_from_filter", &influxdb.PhysicalFromProcedureSpec{
BoundsSet: true,
Bounds: bounds,
FilterSet: true,
Filter: makeFilterFn(pushableExpr1),
}),
@ -403,7 +413,7 @@ func TestFromFilterRule(t *testing.T) {
Rules: []plan.Rule{influxdb.MergeFromFilterRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", physFrom),
plan.CreatePhysicalNode("filter1", &universe.FilterProcedureSpec{Fn: makeFilterFn(pushableExpr1)}),
plan.CreatePhysicalNode("filter2", &universe.FilterProcedureSpec{Fn: makeFilterFn(pushableExpr2)}),
},
@ -415,7 +425,9 @@ func TestFromFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("merged_from_filter1_filter2",
&influxdb.FromProcedureSpec{
&influxdb.PhysicalFromProcedureSpec{
BoundsSet: true,
Bounds: bounds,
FilterSet: true,
Filter: makeFilterFn(pushableExpr1, pushableExpr2),
}),
@ -428,7 +440,7 @@ func TestFromFilterRule(t *testing.T) {
Rules: []plan.Rule{influxdb.MergeFromFilterRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", physFrom),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{Fn: makeFilterFn(pushableExpr1, unpushableExpr)}),
},
Edges: [][2]int{
@ -438,7 +450,9 @@ func TestFromFilterRule(t *testing.T) {
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from",
&influxdb.FromProcedureSpec{
&influxdb.PhysicalFromProcedureSpec{
BoundsSet: true,
Bounds: bounds,
FilterSet: true,
Filter: makeFilterFn(pushableExpr1),
}),
@ -452,10 +466,10 @@ func TestFromFilterRule(t *testing.T) {
{
Name: "from range filter",
// from -> range -> filter => from
Rules: []plan.Rule{influxdb.MergeFromFilterRule{}, influxdb.MergeFromRangeRule{}},
Rules: []plan.Rule{influxdb.FromConversionRule{}, influxdb.MergeFromFilterRule{}, influxdb.MergeFromRangeRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreateLogicalNode("from", from),
plan.CreatePhysicalNode("range", rangeWithBounds),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{Fn: makeFilterFn(pushableExpr1)}),
},
@ -466,14 +480,14 @@ func TestFromFilterRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("merged_from_range_filter", &influxdb.FromProcedureSpec{
FilterSet: true,
Filter: makeFilterFn(pushableExpr1),
plan.CreatePhysicalNode("merged_from_range_filter", &influxdb.PhysicalFromProcedureSpec{
BoundsSet: true,
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
FilterSet: true,
Filter: makeFilterFn(pushableExpr1),
}),
},
},
@ -484,7 +498,7 @@ func TestFromFilterRule(t *testing.T) {
Rules: []plan.Rule{influxdb.MergeFromFilterRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreateLogicalNode("from", physFrom),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{Fn: makeFilterFn(unpushableExpr)}),
},
Edges: [][2]int{
@ -499,7 +513,7 @@ func TestFromFilterRule(t *testing.T) {
Rules: []plan.Rule{influxdb.MergeFromFilterRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreateLogicalNode("from", physFrom),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{Fn: statementFn}),
},
Edges: [][2]int{
@ -514,20 +528,22 @@ func TestFromFilterRule(t *testing.T) {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.RuleTestHelper(t, &tc)
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}
func TestFromDistinctRule(t *testing.T) {
var from = &influxdb.FromProcedureSpec{}
physFrom := &influxdb.PhysicalFromProcedureSpec{}
tests := []plantest.RuleTestCase{
{
Name: "from distinct",
Rules: []plan.Rule{influxdb.FromDistinctRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", physFrom),
plan.CreatePhysicalNode("distinct", &universe.DistinctProcedureSpec{Column: "_measurement"}),
},
Edges: [][2]int{
@ -536,7 +552,7 @@ func TestFromDistinctRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", &influxdb.FromProcedureSpec{
plan.CreatePhysicalNode("from", &influxdb.PhysicalFromProcedureSpec{
LimitSet: true,
PointsLimit: -1,
}),
@ -553,7 +569,7 @@ func TestFromDistinctRule(t *testing.T) {
Rules: []plan.Rule{influxdb.FromDistinctRule{}, influxdb.MergeFromGroupRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", physFrom),
plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement"},
@ -567,7 +583,7 @@ func TestFromDistinctRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("merged_from_group", &influxdb.FromProcedureSpec{
plan.CreatePhysicalNode("merged_from_group", &influxdb.PhysicalFromProcedureSpec{
GroupingSet: true,
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement"},
@ -585,7 +601,7 @@ func TestFromDistinctRule(t *testing.T) {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.RuleTestHelper(t, &tc)
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}
@ -598,7 +614,8 @@ func TestFromGroupRule(t *testing.T) {
Stop: fluxTime(10),
},
}
from = &influxdb.FromProcedureSpec{}
from = &influxdb.FromProcedureSpec{}
physFrom = &influxdb.PhysicalFromProcedureSpec{}
pushableExpr1 = &semantic.BinaryExpression{Operator: ast.EqualOperator,
Left: &semantic.MemberExpression{Object: &semantic.IdentifierExpression{Name: "r"}, Property: "_measurement"},
@ -611,7 +628,7 @@ func TestFromGroupRule(t *testing.T) {
Rules: []plan.Rule{influxdb.MergeFromGroupRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", physFrom),
plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement"},
@ -625,7 +642,7 @@ func TestFromGroupRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("merged_from_group", &influxdb.FromProcedureSpec{
plan.CreatePhysicalNode("merged_from_group", &influxdb.PhysicalFromProcedureSpec{
GroupingSet: true,
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement"},
@ -643,7 +660,7 @@ func TestFromGroupRule(t *testing.T) {
Rules: []plan.Rule{influxdb.MergeFromGroupRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", physFrom),
plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement"},
@ -660,7 +677,7 @@ func TestFromGroupRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("merged_from_group", &influxdb.FromProcedureSpec{
plan.CreatePhysicalNode("merged_from_group", &influxdb.PhysicalFromProcedureSpec{
GroupingSet: true,
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement"},
@ -676,11 +693,12 @@ func TestFromGroupRule(t *testing.T) {
},
},
{
Name: "from range group distinct group",
Rules: []plan.Rule{influxdb.MergeFromGroupRule{}, influxdb.FromDistinctRule{}, influxdb.MergeFromRangeRule{}},
Name: "from range group distinct group",
Rules: []plan.Rule{influxdb.FromConversionRule{}, influxdb.MergeFromGroupRule{},
influxdb.FromDistinctRule{}, influxdb.MergeFromRangeRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreateLogicalNode("from", from),
plan.CreatePhysicalNode("range", rangeWithBounds),
plan.CreatePhysicalNode("group1", &universe.GroupProcedureSpec{
GroupMode: flux.GroupModeBy,
@ -698,7 +716,7 @@ func TestFromGroupRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("merged_from_range_group1", &influxdb.FromProcedureSpec{
plan.CreatePhysicalNode("merged_from_range_group1", &influxdb.PhysicalFromProcedureSpec{
BoundsSet: true,
Bounds: flux.Bounds{Start: fluxTime(5), Stop: fluxTime(10)},
GroupingSet: true,
@ -722,7 +740,7 @@ func TestFromGroupRule(t *testing.T) {
Rules: []plan.Rule{influxdb.MergeFromGroupRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreateLogicalNode("from", physFrom),
plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
GroupMode: flux.GroupModeExcept,
GroupKeys: []string{"_time", "_value"},
@ -740,7 +758,7 @@ func TestFromGroupRule(t *testing.T) {
Rules: []plan.Rule{influxdb.MergeFromGroupRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", physFrom),
plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
GroupMode: flux.GroupModeExcept,
GroupKeys: []string{"_time"},
@ -758,7 +776,7 @@ func TestFromGroupRule(t *testing.T) {
Rules: []plan.Rule{influxdb.MergeFromGroupRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", from),
plan.CreatePhysicalNode("from", physFrom),
plan.CreatePhysicalNode("group", &universe.GroupProcedureSpec{
GroupMode: flux.GroupModeExcept,
GroupKeys: []string{"_value"},
@ -776,7 +794,7 @@ func TestFromGroupRule(t *testing.T) {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.RuleTestHelper(t, &tc)
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}
@ -788,7 +806,7 @@ func TestFromKeysRule(t *testing.T) {
Rules: []plan.Rule{influxdb.FromKeysRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", &influxdb.FromProcedureSpec{}),
plan.CreatePhysicalNode("from", &influxdb.PhysicalFromProcedureSpec{}),
plan.CreatePhysicalNode("keys", &universe.KeysProcedureSpec{}),
},
Edges: [][2]int{
@ -797,7 +815,7 @@ func TestFromKeysRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", &influxdb.FromProcedureSpec{
plan.CreatePhysicalNode("from", &influxdb.PhysicalFromProcedureSpec{
LimitSet: true,
PointsLimit: -1,
}),
@ -814,7 +832,7 @@ func TestFromKeysRule(t *testing.T) {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.RuleTestHelper(t, &tc)
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}
@ -828,7 +846,7 @@ func TestFromRangeValidation(t *testing.T) {
// \ /
// from
Nodes: []plan.PlanNode{
plan.CreatePhysicalNode("from", &influxdb.FromProcedureSpec{}),
plan.CreatePhysicalNode("from", &influxdb.PhysicalFromProcedureSpec{}),
plantest.CreatePhysicalMockNode("1"),
plantest.CreatePhysicalMockNode("2"),
plantest.CreatePhysicalMockNode("3"),