feat: added PushDownWindowAggregate planner rewrite rule (#17898)

Added a (disabled and feature-flagged) planner rule that matches:

ReadRange -> window -> { min, max, mean, count, sum }

The rule requires:
 * the pushDownWindowAggregate{Count,Rest} feature flags enabled
 * having WindowAggregateCapability
   (which StorageReader does not currently have)
 * use of "_value" columns only
 * window.period == window.every
 * window.every.months == 0
 * window.every is positive
 * window.offset == 0
 * standard time columns
 * createEmpty is false
pull/17974/head
Adrian Thurston 2020-05-06 10:27:17 +03:00 committed by GitHub
parent 7cb599c582
commit e51a2b81e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 633 additions and 3 deletions

View File

@ -26,3 +26,15 @@
default: 42
contact: Gavin Cabbage
expose: true
- name: Push Down Window Aggregate Count
description: Enable Count variant of PushDownWindowAggregateRule
key: pushDownWindowAggregateCount
default: false
contact: Query Team
- name: Push Down Window Aggregate Rest
description: Enable non-Count variants of PushDownWindowAggregateRule (stage 2)
key: pushDownWindowAggregateRest
default: false
contact: Query Team

View File

@ -30,12 +30,44 @@ func FrontendExample() IntFlag {
return frontendExample
}
var pushDownWindowAggregateCount = MakeBoolFlag(
"Push Down Window Aggregate Count",
"pushDownWindowAggregateCount",
"Query Team",
false,
Temporary,
false,
)
// PushDownWindowAggregateCount - Enable Count variant of PushDownWindowAggregateRule
func PushDownWindowAggregateCount() BoolFlag {
return pushDownWindowAggregateCount
}
var pushDownWindowAggregateRest = MakeBoolFlag(
"Push Down Window Aggregate Rest",
"pushDownWindowAggregateRest",
"Query Team",
false,
Temporary,
false,
)
// PushDownWindowAggregateRest - Enable non-Count variants of PushDownWindowAggregateRule (stage 2)
func PushDownWindowAggregateRest() BoolFlag {
return pushDownWindowAggregateRest
}
var all = []Flag{
backendExample,
frontendExample,
pushDownWindowAggregateCount,
pushDownWindowAggregateRest,
}
var byKey = map[string]Flag{
"backendExample": backendExample,
"frontendExample": frontendExample,
"backendExample": backendExample,
"frontendExample": frontendExample,
"pushDownWindowAggregateCount": pushDownWindowAggregateCount,
"pushDownWindowAggregateRest": pushDownWindowAggregateRest,
}

View File

@ -11,6 +11,8 @@ import (
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/flux/stdlib/universe"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/query"
)
func init() {
@ -22,6 +24,9 @@ func init() {
PushDownReadTagKeysRule{},
PushDownReadTagValuesRule{},
SortedPivotRule{},
// For this rule to take effect the appropriate capabilities must be
// added AND feature flags must be enabled.
// PushDownWindowAggregateRule{},
)
}
@ -638,3 +643,124 @@ func (SortedPivotRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bo
}
return pn, false, nil
}
//
// Push Down of window aggregates.
// ReadRangePhys |> window |> { min, max, mean, count, sum }
//
type PushDownWindowAggregateRule struct{}
func (PushDownWindowAggregateRule) Name() string {
return "PushDownWindowAggregateRule"
}
func (rule PushDownWindowAggregateRule) Pattern() plan.Pattern {
return plan.OneOf(
[]plan.ProcedureKind{
universe.MinKind,
universe.MaxKind,
universe.MeanKind,
universe.CountKind,
universe.SumKind,
},
plan.Pat(universe.WindowKind, plan.Pat(ReadRangePhysKind)))
}
func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
// Check Capabilities
reader := GetStorageDependencies(ctx).FromDeps.Reader
windowAggregateReader, ok := reader.(query.WindowAggregateReader)
if !ok {
return pn, false, nil
}
caps := windowAggregateReader.GetWindowAggregateCapability(ctx)
if caps == nil {
return pn, false, nil
}
// Check the aggregate function spec. Require operation on _value. There
// are two feature flags covering all cases. One specifically for Count,
// and another for the rest. There are individual capability tests for all
// cases.
fnNode := pn
switch fnNode.Kind() {
case universe.MinKind:
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMin() {
return pn, false, nil
}
minSpec := fnNode.ProcedureSpec().(*universe.MinProcedureSpec)
if minSpec.Column != "_value" {
return pn, false, nil
}
case universe.MaxKind:
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMax() {
return pn, false, nil
}
maxSpec := fnNode.ProcedureSpec().(*universe.MaxProcedureSpec)
if maxSpec.Column != "_value" {
return pn, false, nil
}
case universe.MeanKind:
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMean() {
return pn, false, nil
}
meanSpec := fnNode.ProcedureSpec().(*universe.MeanProcedureSpec)
if len(meanSpec.Columns) != 1 || meanSpec.Columns[0] != "_value" {
return pn, false, nil
}
case universe.CountKind:
if !feature.PushDownWindowAggregateCount().Enabled(ctx) || !caps.HaveCount() {
return pn, false, nil
}
countSpec := fnNode.ProcedureSpec().(*universe.CountProcedureSpec)
if len(countSpec.Columns) != 1 || countSpec.Columns[0] != "_value" {
return pn, false, nil
}
case universe.SumKind:
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveSum() {
return pn, false, nil
}
sumSpec := fnNode.ProcedureSpec().(*universe.SumProcedureSpec)
if len(sumSpec.Columns) != 1 || sumSpec.Columns[0] != "_value" {
return pn, false, nil
}
}
windowNode := fnNode.Predecessors()[0]
windowSpec := windowNode.ProcedureSpec().(*universe.WindowProcedureSpec)
fromNode := windowNode.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*ReadRangePhysSpec)
// every and period must be equal
// every.months must be zero
// every.isNegative must be false
// offset: must be zero
// timeColumn: must be "_time"
// startColumn: must be "_start"
// stopColumn: must be "_stop"
// createEmpty: must be false
window := windowSpec.Window
if !window.Every.Equal(window.Period) ||
window.Every.Months() != 0 ||
window.Every.IsNegative() ||
window.Every.IsZero() ||
!window.Offset.IsZero() ||
windowSpec.TimeColumn != "_time" ||
windowSpec.StartColumn != "_start" ||
windowSpec.StopColumn != "_stop" ||
windowSpec.CreateEmpty {
return pn, false, nil
}
// Rule passes.
return plan.CreatePhysicalNode("ReadWindowAggregate", &ReadWindowAggregatePhysSpec{
ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec),
Aggregates: []plan.ProcedureKind{fnNode.Kind()},
WindowEvery: window.Every.Nanoseconds(),
}), true, nil
}

View File

@ -1,6 +1,7 @@
package influxdb_test
import (
"context"
"testing"
"time"
@ -9,14 +10,45 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/plan/plantest"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/universe"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2/query"
"github.com/influxdata/influxdb/v2/query/stdlib/influxdata/influxdb"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/kit/feature"
)
// A small mock reader so we can indicate if rule-related capabilities are
// present
type mockReaderCaps struct {
query.StorageReader
Have bool
}
func (caps mockReaderCaps) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability {
return mockWAC{ Have: caps.Have }
}
func (caps mockReaderCaps) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
return nil, nil
}
// Mock Window Aggregate Capability
type mockWAC struct {
Have bool
}
func (m mockWAC) HaveMin() bool { return m.Have }
func (m mockWAC) HaveMax() bool { return m.Have }
func (m mockWAC) HaveMean() bool { return m.Have }
func (m mockWAC) HaveCount() bool { return m.Have }
func (m mockWAC) HaveSum() bool { return m.Have }
func fluxTime(t int64) flux.Time {
return flux.Time{
Absolute: time.Unix(0, t).UTC(),
@ -1114,3 +1146,425 @@ func TestReadTagValuesRule(t *testing.T) {
})
}
}
//
// Window Aggregate Testing
//
func TestPushDownWindowAggregateRule(t *testing.T) {
// Turn on all variants.
flagger := mock.NewFlagger(map[feature.Flag] interface{}{
feature.PushDownWindowAggregateCount(): true,
feature.PushDownWindowAggregateRest(): true,
})
withFlagger, _ := feature.Annotate(context.Background(), flagger)
// Construct dependencies either with or without aggregate window caps.
deps := func(have bool) influxdb.StorageDependencies {
return influxdb.StorageDependencies{
FromDeps: influxdb.FromDependencies{
Reader: mockReaderCaps{ Have: have },
Metrics: influxdb.NewMetrics(nil),
},
}
}
haveCaps := deps(true).Inject(withFlagger)
noCaps := deps(false).Inject(withFlagger)
readRange := influxdb.ReadRangePhysSpec{
Bucket: "my-bucket",
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
}
dur1m := values.ConvertDuration(60 * time.Second)
dur2m := values.ConvertDuration(120 * time.Second)
dur0 := values.ConvertDuration(0)
durNeg, _ := values.ParseDuration("-60s")
dur1y, _ := values.ParseDuration("1y")
window := func(dur values.Duration) universe.WindowProcedureSpec{
return universe.WindowProcedureSpec{
Window: plan.WindowSpec{
Every: dur,
Period: dur,
Offset: dur0,
},
TimeColumn: "_time",
StartColumn: "_start",
StopColumn: "_stop",
CreateEmpty: false,
}
}
window1m := window(dur1m)
window2m := window(dur2m)
windowNeg := window(durNeg)
window1y := window(dur1y)
tests := make([]plantest.RuleTestCase, 0)
// construct a simple plan with a specific window
simplePlanWithWindowAgg := func( window universe.WindowProcedureSpec, agg plan.NodeID, spec plan.ProcedureSpec ) *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
plan.CreateLogicalNode("window", &window),
plan.CreateLogicalNode(agg, spec),
},
Edges: [][2]int{
{0, 1},
{1, 2},
},
}
}
// construct a simple result
simpleResult := func( proc plan.ProcedureKind ) *plantest.PlanSpec {
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadWindowAggregate", &influxdb.ReadWindowAggregatePhysSpec{
ReadRangePhysSpec: readRange,
Aggregates: []plan.ProcedureKind{proc},
WindowEvery: 60000000000,
}),
},
}
}
minProcedureSpec := func() *universe.MinProcedureSpec {
return &universe.MinProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column: "_value"},
}
}
maxProcedureSpec := func() *universe.MaxProcedureSpec {
return &universe.MaxProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column: "_value"},
}
}
meanProcedureSpec := func() *universe.MeanProcedureSpec {
return &universe.MeanProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
}
}
countProcedureSpec := func() *universe.CountProcedureSpec {
return &universe.CountProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
}
}
sumProcedureSpec := func() *universe.SumProcedureSpec {
return &universe.SumProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
}
}
// ReadRange -> window -> min => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassMin",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "min", minProcedureSpec() ),
After: simpleResult( "min" ),
})
// ReadRange -> window -> max => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassMax",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "max", maxProcedureSpec() ),
After: simpleResult( "max" ),
})
// ReadRange -> window -> mean => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassMean",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "mean", meanProcedureSpec() ),
After: simpleResult( "mean" ),
})
// ReadRange -> window -> count => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassCount",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "count", countProcedureSpec() ),
After: simpleResult( "count" ),
})
// ReadRange -> window -> sum => ReadWindowAggregate
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "SimplePassSum",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "sum", sumProcedureSpec() ),
After: simpleResult( "sum" ),
})
// Rewrite with successors
// ReadRange -> window -> min -> count {2} => ReadWindowAggregate -> count {2}
tests = append(tests, plantest.RuleTestCase{
Context: haveCaps,
Name: "WithSuccessor",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
plan.CreateLogicalNode("window", &window1m),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{1, 2},
{2, 3},
{2, 4},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadWindowAggregate", &influxdb.ReadWindowAggregatePhysSpec{
ReadRangePhysSpec: readRange,
Aggregates: []plan.ProcedureKind{"min"},
WindowEvery: 60000000000,
}),
plan.CreateLogicalNode("count", countProcedureSpec() ),
plan.CreateLogicalNode("count", countProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{0, 2},
},
},
})
// Helper that adds a test with a simple plan that does not pass due to a
// specified bad window
simpleMinUnchanged := func( name string, window universe.WindowProcedureSpec ) {
// Note: NoChange is not working correctly for these tests. It is
// expecting empty time, start, and stop column fields.
tests = append( tests, plantest.RuleTestCase{
Name: name,
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window, "min", countProcedureSpec() ),
NoChange: true,
})
}
// Condition not met: period not equal to every
badWindow1 := window1m
badWindow1.Window.Period = dur2m
simpleMinUnchanged( "BadPeriod", badWindow1 )
// Condition not met: offset non-zero
badWindow2 := window1m
badWindow2.Window.Offset = dur1m
simpleMinUnchanged( "BadOffset", badWindow2 )
// Condition not met: non-standard _time column
badWindow3 := window1m
badWindow3.TimeColumn = "_timmy"
simpleMinUnchanged( "BadTime", badWindow3 )
// Condition not met: non-standard start column
badWindow4 := window1m
badWindow4.StartColumn = "_stooort"
simpleMinUnchanged( "BadStart", badWindow4 )
// Condition not met: non-standard stop column
badWindow5 := window1m
badWindow5.StopColumn = "_stappp"
simpleMinUnchanged( "BadStop", badWindow5 )
// Condition not met: createEmpty is not false
badWindow6 := window1m
badWindow6.CreateEmpty = true
simpleMinUnchanged( "BadCreateEmpty", badWindow6 )
// Condition not met: duration too long.
simpleMinUnchanged( "WindowTooLarge", window1y )
// Condition not met: neg duration.
simpleMinUnchanged( "WindowNeg", windowNeg )
// Bad min column
// ReadRange -> window -> min => NO-CHANGE
tests = append(tests, plantest.RuleTestCase{
Name: "BadMinCol",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "min", &universe.MinProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column:"_valmoo"},
}),
NoChange: true,
})
// Bad max column
// ReadRange -> window -> max => NO-CHANGE
tests = append(tests, plantest.RuleTestCase{
Name: "BadMaxCol",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "max", &universe.MaxProcedureSpec{
SelectorConfig: execute.SelectorConfig{Column:"_valmoo"},
}),
NoChange: true,
})
// Bad mean columns
// ReadRange -> window -> mean => NO-CHANGE
tests = append(tests, plantest.RuleTestCase{
Name: "BadMeanCol1",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "mean", &universe.MeanProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns:[]string{"_valmoo"}},
}),
NoChange: true,
})
tests = append(tests, plantest.RuleTestCase{
Name: "BadMeanCol2",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "mean", &universe.MeanProcedureSpec{
AggregateConfig: execute.AggregateConfig{Columns:[]string{"_value", "_valmoo"}},
}),
NoChange: true,
})
// No match due to a collapsed node having a successor
// ReadRange -> window -> min
// \-> min
tests = append(tests, plantest.RuleTestCase{
Name: "CollapsedWithSuccessor1",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
plan.CreateLogicalNode("window", &window1m),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("min", minProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{1, 2},
{1, 3},
},
},
NoChange: true,
})
// No match due to a collapsed node having a successor
// ReadRange -> window -> min
// \-> window
tests = append(tests, plantest.RuleTestCase{
Name: "CollapsedWithSuccessor2",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
plan.CreateLogicalNode("window", &window1m),
plan.CreateLogicalNode("min", minProcedureSpec() ),
plan.CreateLogicalNode("window", &window2m),
},
Edges: [][2]int{
{0, 1},
{1, 2},
{0, 3},
},
},
NoChange: true,
})
// No pattern match
// ReadRange -> filter -> window -> min -> NO-CHANGE
pushableFn1 := executetest.FunctionExpression(t, `(r) => true`)
makeResolvedFilterFn := func(expr *semantic.FunctionExpression) interpreter.ResolvedFunction {
return interpreter.ResolvedFunction{
Scope: nil,
Fn: expr,
}
}
noPatternMatch1 := func() *plantest.PlanSpec{
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{
Fn: makeResolvedFilterFn(pushableFn1),
}),
plan.CreateLogicalNode("window", &window1m),
plan.CreateLogicalNode("min", minProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{1, 2},
{2, 3},
},
}
}
tests = append(tests, plantest.RuleTestCase{
Name: "NoPatternMatch1",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: noPatternMatch1(),
NoChange: true,
})
// No pattern match 2
// ReadRange -> window -> filter -> min -> NO-CHANGE
noPatternMatch2 := func() *plantest.PlanSpec{
return &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadRange", &readRange),
plan.CreateLogicalNode("window", &window1m),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{
Fn: makeResolvedFilterFn(pushableFn1),
}),
plan.CreateLogicalNode("min", minProcedureSpec() ),
},
Edges: [][2]int{
{0, 1},
{1, 2},
{2, 3},
},
}
}
tests = append(tests, plantest.RuleTestCase{
Name: "NoPatternMatch2",
Context: haveCaps,
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: noPatternMatch2(),
NoChange: true,
})
// Fail due to no capabilities present.
tests = append(tests, plantest.RuleTestCase{
Context: noCaps,
Name: "FailNoCaps",
Rules: []plan.Rule{ influxdb.PushDownWindowAggregateRule{}},
Before: simplePlanWithWindowAgg( window1m, "count", countProcedureSpec() ),
After: simpleResult( "count" ),
NoChange: true,
})
for _, tc := range tests {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}

View File

@ -25,7 +25,13 @@ type StorageReader interface {
}
// WindowAggregateCapability describes what is supported by WindowAggregateReader.
type WindowAggregateCapability interface{}
type WindowAggregateCapability interface {
HaveMin() bool
HaveMax() bool
HaveMean() bool
HaveCount() bool
HaveSum() bool
}
// WindowAggregateReader implements the WindowAggregate capability.
type WindowAggregateReader interface {