feat(query): add a planner rule to push down bare aggregates (#18144)

pull/18216/head
Christopher M. Wolff 2020-05-21 23:03:09 -07:00 committed by GitHub
parent 0bb847b7e7
commit 53165bfb7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 315 additions and 168 deletions

View File

@ -28,13 +28,13 @@
expose: true
- name: Push Down Window Aggregate Count
description: Enable Count variant of PushDownWindowAggregateRule
description: Enable Count variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
key: pushDownWindowAggregateCount
default: false
contact: Query Team
- name: Push Down Window Aggregate Rest
description: Enable non-Count variants of PushDownWindowAggregateRule (stage 2)
description: Enable non-Count variants of PushDownWindowAggregateRule and PushDownWindowAggregateRule (stage 2)
key: pushDownWindowAggregateRest
default: false
contact: Query Team

View File

@ -39,7 +39,7 @@ var pushDownWindowAggregateCount = MakeBoolFlag(
false,
)
// PushDownWindowAggregateCount - Enable Count variant of PushDownWindowAggregateRule
// PushDownWindowAggregateCount - Enable Count variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
func PushDownWindowAggregateCount() BoolFlag {
return pushDownWindowAggregateCount
}
@ -53,7 +53,7 @@ var pushDownWindowAggregateRest = MakeBoolFlag(
false,
)
// PushDownWindowAggregateRest - Enable non-Count variants of PushDownWindowAggregateRule (stage 2)
// PushDownWindowAggregateRest - Enable non-Count variants of PushDownWindowAggregateRule and PushDownWindowAggregateRule (stage 2)
func PushDownWindowAggregateRest() BoolFlag {
return pushDownWindowAggregateRest
}

View File

@ -2,6 +2,7 @@ package influxdb
import (
"context"
"math"
"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
@ -24,9 +25,10 @@ func init() {
PushDownReadTagKeysRule{},
PushDownReadTagValuesRule{},
SortedPivotRule{},
// For this rule to take effect the appropriate capabilities must be
// For the following two rules to take effect the appropriate capabilities must be
// added AND feature flags must be enabled.
// PushDownWindowAggregateRule{},
// PushDownBareAggregateRule{},
// For this rule to take effect the corresponding feature flags must be
// enabled.
@ -658,82 +660,90 @@ func (PushDownWindowAggregateRule) Name() string {
return "PushDownWindowAggregateRule"
}
var windowPushableAggs = []plan.ProcedureKind{
universe.MinKind,
universe.MaxKind,
universe.MeanKind,
universe.CountKind,
universe.SumKind,
}
func (rule PushDownWindowAggregateRule) Pattern() plan.Pattern {
return plan.OneOf(
[]plan.ProcedureKind{
universe.MinKind,
universe.MaxKind,
universe.MeanKind,
universe.CountKind,
universe.SumKind,
},
return plan.OneOf(windowPushableAggs,
plan.Pat(universe.WindowKind, plan.Pat(ReadRangePhysKind)))
}
func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
func canPushWindowedAggregate(ctx context.Context, fnNode plan.Node) bool {
// Check Capabilities
reader := GetStorageDependencies(ctx).FromDeps.Reader
windowAggregateReader, ok := reader.(query.WindowAggregateReader)
if !ok {
return pn, false, nil
return false
}
caps := windowAggregateReader.GetWindowAggregateCapability(ctx)
if caps == nil {
return pn, false, nil
return false
}
// Check the aggregate function spec. Require operation on _value. There
// are two feature flags covering all cases. One specifically for Count,
// and another for the rest. There are individual capability tests for all
// cases.
fnNode := pn
switch fnNode.Kind() {
case universe.MinKind:
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMin() {
return pn, false, nil
return false
}
minSpec := fnNode.ProcedureSpec().(*universe.MinProcedureSpec)
if minSpec.Column != execute.DefaultValueColLabel {
return pn, false, nil
return false
}
case universe.MaxKind:
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMax() {
return pn, false, nil
return false
}
maxSpec := fnNode.ProcedureSpec().(*universe.MaxProcedureSpec)
if maxSpec.Column != execute.DefaultValueColLabel {
return pn, false, nil
return false
}
case universe.MeanKind:
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMean() {
return pn, false, nil
return false
}
meanSpec := fnNode.ProcedureSpec().(*universe.MeanProcedureSpec)
if len(meanSpec.Columns) != 1 || meanSpec.Columns[0] != execute.DefaultValueColLabel {
return pn, false, nil
return false
}
case universe.CountKind:
if !feature.PushDownWindowAggregateCount().Enabled(ctx) || !caps.HaveCount() {
return pn, false, nil
return false
}
countSpec := fnNode.ProcedureSpec().(*universe.CountProcedureSpec)
if len(countSpec.Columns) != 1 || countSpec.Columns[0] != execute.DefaultValueColLabel {
return pn, false, nil
return false
}
case universe.SumKind:
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveSum() {
return pn, false, nil
return false
}
sumSpec := fnNode.ProcedureSpec().(*universe.SumProcedureSpec)
if len(sumSpec.Columns) != 1 || sumSpec.Columns[0] != execute.DefaultValueColLabel {
return pn, false, nil
return false
}
}
return true
}
func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
fnNode := pn
if !canPushWindowedAggregate(ctx, fnNode) {
return pn, false, nil
}
windowNode := fnNode.Predecessors()[0]
windowSpec := windowNode.ProcedureSpec().(*universe.WindowProcedureSpec)
@ -769,6 +779,35 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
}), true, nil
}
// PushDownBareAggregateRule is a rule that allows pushing down of aggregates
// that are directly over a ReadRange source.
type PushDownBareAggregateRule struct{}
func (p PushDownBareAggregateRule) Name() string {
return "PushDownWindowAggregateRule"
}
func (p PushDownBareAggregateRule) Pattern() plan.Pattern {
return plan.OneOf(windowPushableAggs,
plan.Pat(ReadRangePhysKind))
}
func (p PushDownBareAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
fnNode := pn
if !canPushWindowedAggregate(ctx, fnNode) {
return pn, false, nil
}
fromNode := fnNode.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*ReadRangePhysSpec)
return plan.CreatePhysicalNode("ReadWindowAggregate", &ReadWindowAggregatePhysSpec{
ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec),
Aggregates: []plan.ProcedureKind{fnNode.Kind()},
WindowEvery: math.MaxInt64,
}), true, nil
}
//
// Push Down of group aggregates.
// ReadGroupPhys |> { count }

View File

@ -2,6 +2,7 @@ package influxdb_test
import (
"context"
"math"
"testing"
"time"
@ -16,11 +17,11 @@ import (
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/universe"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/kit/feature"
)
// A small mock reader so we can indicate if rule-related capabilities are
@ -31,7 +32,7 @@ type mockReaderCaps struct {
}
func (caps mockReaderCaps) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability {
return mockWAC{ Have: caps.Have }
return mockWAC{Have: caps.Have}
}
func (caps mockReaderCaps) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
@ -43,11 +44,11 @@ type mockWAC struct {
Have bool
}
func (m mockWAC) HaveMin() bool { return m.Have }
func (m mockWAC) HaveMax() bool { return m.Have }
func (m mockWAC) HaveMean() bool { return m.Have }
func (m mockWAC) HaveMin() bool { return m.Have }
func (m mockWAC) HaveMax() bool { return m.Have }
func (m mockWAC) HaveMean() bool { return m.Have }
func (m mockWAC) HaveCount() bool { return m.Have }
func (m mockWAC) HaveSum() bool { return m.Have }
func (m mockWAC) HaveSum() bool { return m.Have }
func fluxTime(t int64) flux.Time {
return flux.Time{
@ -1152,9 +1153,9 @@ func TestReadTagValuesRule(t *testing.T) {
//
func TestPushDownWindowAggregateRule(t *testing.T) {
// Turn on all variants.
flagger := mock.NewFlagger(map[feature.Flag] interface{}{
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
feature.PushDownWindowAggregateCount(): true,
feature.PushDownWindowAggregateRest(): true,
feature.PushDownWindowAggregateRest(): true,
})
withFlagger, _ := feature.Annotate(context.Background(), flagger)
@ -1163,7 +1164,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
deps := func(have bool) influxdb.StorageDependencies {
return influxdb.StorageDependencies{
FromDeps: influxdb.FromDependencies{
Reader: mockReaderCaps{ Have: have },
Reader: mockReaderCaps{Have: have},
Metrics: influxdb.NewMetrics(nil),
},
}
@ -1186,7 +1187,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
durNeg, _ := values.ParseDuration("-60s")
dur1y, _ := values.ParseDuration("1y")
window := func(dur values.Duration) universe.WindowProcedureSpec{
window := func(dur values.Duration) universe.WindowProcedureSpec {
return universe.WindowProcedureSpec{
Window: plan.WindowSpec{
Every: dur,
@ -1208,7 +1209,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
tests := make([]plantest.RuleTestCase, 0)
// construct a simple plan with a specific window and aggregate function
simplePlanWithWindowAgg := func( window universe.WindowProcedureSpec, agg plan.NodeID, spec plan.ProcedureSpec ) *plantest.PlanSpec {
simplePlanWithWindowAgg := func(window universe.WindowProcedureSpec, agg plan.NodeID, spec plan.ProcedureSpec) *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
@ -1223,13 +1224,13 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
}
// construct a simple result
simpleResult := func( proc plan.ProcedureKind ) *plantest.PlanSpec {
simpleResult := func(proc plan.ProcedureKind) *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadWindowAggregate", &influxdb.ReadWindowAggregatePhysSpec{
ReadRangePhysSpec: readRange,
Aggregates: []plan.ProcedureKind{proc},
WindowEvery: 60000000000,
Aggregates: []plan.ProcedureKind{proc},
WindowEvery: 60000000000,
}),
},
}
@ -1261,66 +1262,64 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
}
}
// ReadRange -> window -> min => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassMin",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "min", minProcedureSpec() ),
After: simpleResult( "min" ),
Name: "SimplePassMin",
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "min", minProcedureSpec()),
After: simpleResult("min"),
})
// ReadRange -> window -> max => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassMax",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "max", maxProcedureSpec() ),
After: simpleResult( "max" ),
Name: "SimplePassMax",
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "max", maxProcedureSpec()),
After: simpleResult("max"),
})
// ReadRange -> window -> mean => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassMean",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "mean", meanProcedureSpec() ),
After: simpleResult( "mean" ),
Name: "SimplePassMean",
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "mean", meanProcedureSpec()),
After: simpleResult("mean"),
})
// ReadRange -> window -> count => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassCount",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "count", countProcedureSpec() ),
After: simpleResult( "count" ),
Name: "SimplePassCount",
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "count", countProcedureSpec()),
After: simpleResult("count"),
})
// ReadRange -> window -> sum => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassSum",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "sum", sumProcedureSpec() ),
After: simpleResult( "sum" ),
Name: "SimplePassSum",
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "sum", sumProcedureSpec()),
After: simpleResult("sum"),
})
// Rewrite with successors
// ReadRange -> window -> min -> count {2} => ReadWindowAggregate -> count {2}
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "WithSuccessor",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Name: "WithSuccessor",
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
plan.CreateLogicalNode("window", &window1m),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("min", minProcedureSpec()),
plan.CreateLogicalNode("count", countProcedureSpec()),
plan.CreateLogicalNode("count", countProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1333,11 +1332,11 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadWindowAggregate", &influxdb.ReadWindowAggregatePhysSpec{
ReadRangePhysSpec: readRange,
Aggregates: []plan.ProcedureKind{"min"},
WindowEvery: 60000000000,
Aggregates: []plan.ProcedureKind{"min"},
WindowEvery: 60000000000,
}),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec()),
plan.CreateLogicalNode("count", countProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1348,14 +1347,14 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
// Helper that adds a test with a simple plan that does not pass due to a
// specified bad window
simpleMinUnchanged := func( name string, window universe.WindowProcedureSpec ) {
simpleMinUnchanged := func(name string, window universe.WindowProcedureSpec) {
// Note: NoChange is not working correctly for these tests. It is
// expecting empty time, start, and stop column fields.
tests = append( tests, plantest.RuleTestCase{
Name: name,
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window, "min", countProcedureSpec() ),
tests = append(tests, plantest.RuleTestCase{
Name: name,
Context: haveCaps,
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window, "min", countProcedureSpec()),
NoChange: true,
})
}
@ -1363,47 +1362,47 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
// Condition not met: period not equal to every
badWindow1 := window1m
badWindow1.Window.Period = dur2m
simpleMinUnchanged( "BadPeriod", badWindow1 )
simpleMinUnchanged("BadPeriod", badWindow1)
// Condition not met: offset non-zero
badWindow2 := window1m
badWindow2.Window.Offset = dur1m
simpleMinUnchanged( "BadOffset", badWindow2 )
simpleMinUnchanged("BadOffset", badWindow2)
// Condition not met: non-standard _time column
badWindow3 := window1m
badWindow3.TimeColumn = "_timmy"
simpleMinUnchanged( "BadTime", badWindow3 )
simpleMinUnchanged("BadTime", badWindow3)
// Condition not met: non-standard start column
badWindow4 := window1m
badWindow4.StartColumn = "_stooort"
simpleMinUnchanged( "BadStart", badWindow4 )
simpleMinUnchanged("BadStart", badWindow4)
// Condition not met: non-standard stop column
badWindow5 := window1m
badWindow5.StopColumn = "_stappp"
simpleMinUnchanged( "BadStop", badWindow5 )
simpleMinUnchanged("BadStop", badWindow5)
// Condition not met: createEmpty is not false
badWindow6 := window1m
badWindow6.CreateEmpty = true
simpleMinUnchanged( "BadCreateEmpty", badWindow6 )
simpleMinUnchanged("BadCreateEmpty", badWindow6)
// Condition not met: duration too long.
simpleMinUnchanged( "WindowTooLarge", window1y )
simpleMinUnchanged("WindowTooLarge", window1y)
// Condition not met: neg duration.
simpleMinUnchanged( "WindowNeg", windowNeg )
simpleMinUnchanged("WindowNeg", windowNeg)
// Bad min column
// ReadRange -> window -> min => NO-CHANGE
tests = append(tests, plantest.RuleTestCase{
Name: "BadMinCol",
Name: "BadMinCol",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "min", &universe.MinProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column:"_valmoo"},
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "min", &universe.MinProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column: "_valmoo"},
}),
NoChange: true,
})
@ -1411,11 +1410,11 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
// Bad max column
// ReadRange -> window -> max => NO-CHANGE
tests = append(tests, plantest.RuleTestCase{
Name: "BadMaxCol",
Name: "BadMaxCol",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "max", &universe.MaxProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column:"_valmoo"},
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "max", &universe.MaxProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column: "_valmoo"},
}),
NoChange: true,
})
@ -1423,20 +1422,20 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
// Bad mean columns
// ReadRange -> window -> mean => NO-CHANGE
tests = append(tests, plantest.RuleTestCase{
Name: "BadMeanCol1",
Name: "BadMeanCol1",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "mean", &universe.MeanProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns:[]string{"_valmoo"}},
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "mean", &universe.MeanProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_valmoo"}},
}),
NoChange: true,
})
tests = append(tests, plantest.RuleTestCase{
Name: "BadMeanCol2",
Name: "BadMeanCol2",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "mean", &universe.MeanProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns:[]string{"_value", "_valmoo"}},
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "mean", &universe.MeanProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value", "_valmoo"}},
}),
NoChange: true,
})
@ -1445,15 +1444,15 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
// ReadRange -> window -> min
// \-> min
tests = append(tests, plantest.RuleTestCase{
Name: "CollapsedWithSuccessor1",
Name: "CollapsedWithSuccessor1",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
plan.CreateLogicalNode("window", &window1m),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("min", minProcedureSpec()),
plan.CreateLogicalNode("min", minProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1468,14 +1467,14 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
// ReadRange -> window -> min
// \-> window
tests = append(tests, plantest.RuleTestCase{
Name: "CollapsedWithSuccessor2",
Name: "CollapsedWithSuccessor2",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
plan.CreateLogicalNode("window", &window1m),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("min", minProcedureSpec()),
plan.CreateLogicalNode("window", &window2m),
},
Edges: [][2]int{
@ -1487,7 +1486,6 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
NoChange: true,
})
// No pattern match
// ReadRange -> filter -> window -> min -> NO-CHANGE
pushableFn1 := executetest.FunctionExpression(t, `(r) => true`)
@ -1498,7 +1496,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
Fn: expr,
}
}
noPatternMatch1 := func() *plantest.PlanSpec{
noPatternMatch1 := func() *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
@ -1506,7 +1504,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
Fn: makeResolvedFilterFn(pushableFn1),
}),
plan.CreateLogicalNode("window", &window1m),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("min", minProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1516,16 +1514,16 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
}
}
tests = append(tests, plantest.RuleTestCase{
Name: "NoPatternMatch1",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: noPatternMatch1(),
Name: "NoPatternMatch1",
Context: haveCaps,
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: noPatternMatch1(),
NoChange: true,
})
// No pattern match 2
// ReadRange -> window -> filter -> min -> NO-CHANGE
noPatternMatch2 := func() *plantest.PlanSpec{
noPatternMatch2 := func() *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
@ -1533,7 +1531,7 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{
Fn: makeResolvedFilterFn(pushableFn1),
}),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("min", minProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1543,20 +1541,20 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
}
}
tests = append(tests, plantest.RuleTestCase{
Name: "NoPatternMatch2",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: noPatternMatch2(),
Name: "NoPatternMatch2",
Context: haveCaps,
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: noPatternMatch2(),
NoChange: true,
})
// Fail due to no capabilities present.
tests = append(tests, plantest.RuleTestCase{
Context: noCaps,
Name: "FailNoCaps",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "count", countProcedureSpec() ),
After: simpleResult( "count" ),
Context: noCaps,
Name: "FailNoCaps",
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg(window1m, "count", countProcedureSpec()),
After: simpleResult("count"),
NoChange: true,
})
@ -1569,28 +1567,138 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
}
}
func TestPushDownBareAggregateRule(t *testing.T) {
// Turn on support for window aggregate count
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
feature.PushDownWindowAggregateCount(): true,
})
withFlagger, _ := feature.Annotate(context.Background(), flagger)
// Construct dependencies either with or without aggregate window caps.
deps := func(have bool) influxdb.StorageDependencies {
return influxdb.StorageDependencies{
FromDeps: influxdb.FromDependencies{
Reader: mockReaderCaps{Have: have},
Metrics: influxdb.NewMetrics(nil),
},
}
}
haveCaps := deps(true).Inject(withFlagger)
noCaps := deps(false).Inject(withFlagger)
readRange := &influxdb.ReadRangePhysSpec{
Bucket: "my-bucket",
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
}
readWindowAggregate := &influxdb.ReadWindowAggregatePhysSpec{
ReadRangePhysSpec: *(readRange.Copy().(*influxdb.ReadRangePhysSpec)),
WindowEvery: math.MaxInt64,
Aggregates: []plan.ProcedureKind{universe.CountKind},
}
minProcedureSpec := func() *universe.MinProcedureSpec {
return &universe.MinProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column: "_value"},
}
}
countProcedureSpec := func() *universe.CountProcedureSpec {
return &universe.CountProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
}
}
testcases := []plantest.RuleTestCase{
{
// successful push down
Context: haveCaps,
Name: "push down count",
Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", readRange),
plan.CreatePhysicalNode("count", countProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadWindowAggregate", readWindowAggregate),
},
},
},
{
// capability not provided in storage layer
Context: noCaps,
Name: "no caps",
Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", readRange),
plan.CreatePhysicalNode("count", countProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
},
},
NoChange: true,
},
{
// unsupported aggregate
Context: haveCaps,
Name: "no push down min",
Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", readRange),
plan.CreatePhysicalNode("count", minProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
},
},
NoChange: true,
},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}
//
// Group Aggregate Testing
//
func TestPushDownGroupAggregateRule(t *testing.T) {
// Turn on all flags
flagger := mock.NewFlagger(map[feature.Flag] interface{}{
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
feature.PushDownGroupAggregateCount(): true,
})
ctx, _ := feature.Annotate(context.Background(), flagger)
readGroupAgg := func(aggregateMethod string) *influxdb.ReadGroupPhysSpec{
readGroupAgg := func(aggregateMethod string) *influxdb.ReadGroupPhysSpec {
return &influxdb.ReadGroupPhysSpec{
ReadRangePhysSpec: influxdb.ReadRangePhysSpec {
ReadRangePhysSpec: influxdb.ReadRangePhysSpec{
Bucket: "my-bucket",
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
},
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement", "tag0", "tag1"},
GroupMode: flux.GroupModeBy,
GroupKeys: []string{"_measurement", "tag0", "tag1"},
AggregateMethod: aggregateMethod,
}
}
@ -1601,7 +1709,7 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
tests := make([]plantest.RuleTestCase, 0)
// construct a simple plan with a specific aggregate
simplePlanWithAgg := func( agg plan.NodeID, spec plan.ProcedureSpec ) *plantest.PlanSpec {
simplePlanWithAgg := func(agg plan.NodeID, spec plan.ProcedureSpec) *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
@ -1614,10 +1722,10 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
}
// construct a simple result
simpleResult := func( proc string ) *plantest.PlanSpec {
simpleResult := func(proc string) *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadGroup", readGroupAgg(proc) ),
plan.CreatePhysicalNode("ReadGroup", readGroupAgg(proc)),
},
}
}
@ -1641,24 +1749,24 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
// ReadGroup -> count => ReadGroup
tests = append(tests, plantest.RuleTestCase{
Context: ctx,
Name: "SimplePassCount",
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg( "count", countProcedureSpec() ),
After: simpleResult( "count" ),
Name: "SimplePassCount",
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg("count", countProcedureSpec()),
After: simpleResult("count"),
})
// Rewrite with successors
// ReadGroup -> count -> sum {2} => ReadGroup -> count {2}
tests = append(tests, plantest.RuleTestCase{
Context: ctx,
Name: "WithSuccessor1",
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Name: "WithSuccessor1",
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("sum", sumProcedureSpec() ),
plan.CreateLogicalNode("sum", sumProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec()),
plan.CreateLogicalNode("sum", sumProcedureSpec()),
plan.CreateLogicalNode("sum", sumProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1668,9 +1776,9 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count") ),
plan.CreateLogicalNode("sum", sumProcedureSpec() ),
plan.CreateLogicalNode("sum", sumProcedureSpec() ),
plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count")),
plan.CreateLogicalNode("sum", sumProcedureSpec()),
plan.CreateLogicalNode("sum", sumProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1684,13 +1792,13 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
// ReadGroup -> count -> count => ReadGroup -> count
tests = append(tests, plantest.RuleTestCase{
Context: ctx,
Name: "WithSuccessor2",
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Name: "WithSuccessor2",
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec()),
plan.CreateLogicalNode("count", countProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1699,8 +1807,8 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count") ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreatePhysicalNode("ReadGroup", readGroupAgg("count")),
plan.CreateLogicalNode("count", countProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1711,27 +1819,27 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
// Bad count column
// ReadGroup -> count => NO-CHANGE
tests = append(tests, plantest.RuleTestCase{
Name: "BadCountCol",
Name: "BadCountCol",
Context: ctx,
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg( "count", &universe.CountProcedureSpec{
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg("count", &universe.CountProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_valmoo"}},
}),
NoChange: true,
})
// No match due to a collapsed node having a successor
// ReadGroup -> count
// ReadGroup -> count
// \-> min
tests = append(tests, plantest.RuleTestCase{
Name: "CollapsedWithSuccessor",
Name: "CollapsedWithSuccessor",
Context: ctx,
Rules: []plan.Rule{ influxdb.PushDownGroupAggregateRule{}},
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec()),
plan.CreateLogicalNode("min", minProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1751,14 +1859,14 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
Fn: expr,
}
}
noPatternMatch1 := func() *plantest.PlanSpec{
noPatternMatch1 := func() *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroup", readGroup()),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{
Fn: makeResolvedFilterFn(pushableFn1),
}),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec()),
},
Edges: [][2]int{
{0, 1},
@ -1767,10 +1875,10 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
}
}
tests = append(tests, plantest.RuleTestCase{
Name: "NoPatternMatch",
Context: ctx,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: noPatternMatch1(),
Name: "NoPatternMatch",
Context: ctx,
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
Before: noPatternMatch1(),
NoChange: true,
})