feat(query): planner rule to push down window and bare first() and la… (#18534)
parent
037777a6ef
commit
95d97c6e29
30
flags.yml
30
flags.yml
|
@ -55,9 +55,33 @@
|
|||
default: false
|
||||
contact: Query Team
|
||||
|
||||
- name: Push Down Window Aggregate Rest
|
||||
description: Enable non-Count, non-Sum variants of PushDownWindowAggregateRule and PushDownWindowAggregateRule (stage 2)
|
||||
key: pushDownWindowAggregateRest
|
||||
- name: Push Down Window Aggregate Min
|
||||
description: Enable Min variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
key: pushDownWindowAggregateMin
|
||||
default: false
|
||||
contact: Query Team
|
||||
|
||||
- name: Push Down Window Aggregate Max
|
||||
description: Enable Max variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
key: pushDownWindowAggregateMax
|
||||
default: false
|
||||
contact: Query Team
|
||||
|
||||
- name: Push Down Window Aggregate Mean
|
||||
description: Enable Mean variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
key: pushDownWindowAggregateMean
|
||||
default: false
|
||||
contact: Query Team
|
||||
|
||||
- name: Push Down Window Aggregate First
|
||||
description: Enable First variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
key: pushDownWindowAggregateFirst
|
||||
default: false
|
||||
contact: Query Team
|
||||
|
||||
- name: Push Down Window Aggregate Last
|
||||
description: Enable Last variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
key: pushDownWindowAggregateLast
|
||||
default: false
|
||||
contact: Query Team
|
||||
|
||||
|
|
|
@ -86,18 +86,74 @@ func PushDownWindowAggregateSum() BoolFlag {
|
|||
return pushDownWindowAggregateSum
|
||||
}
|
||||
|
||||
var pushDownWindowAggregateRest = MakeBoolFlag(
|
||||
"Push Down Window Aggregate Rest",
|
||||
"pushDownWindowAggregateRest",
|
||||
var pushDownWindowAggregateMin = MakeBoolFlag(
|
||||
"Push Down Window Aggregate Min",
|
||||
"pushDownWindowAggregateMin",
|
||||
"Query Team",
|
||||
false,
|
||||
Temporary,
|
||||
false,
|
||||
)
|
||||
|
||||
// PushDownWindowAggregateRest - Enable non-Count, non-Sum variants of PushDownWindowAggregateRule and PushDownWindowAggregateRule (stage 2)
|
||||
func PushDownWindowAggregateRest() BoolFlag {
|
||||
return pushDownWindowAggregateRest
|
||||
// PushDownWindowAggregateMin - Enable Min variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
func PushDownWindowAggregateMin() BoolFlag {
|
||||
return pushDownWindowAggregateMin
|
||||
}
|
||||
|
||||
var pushDownWindowAggregateMax = MakeBoolFlag(
|
||||
"Push Down Window Aggregate Max",
|
||||
"pushDownWindowAggregateMax",
|
||||
"Query Team",
|
||||
false,
|
||||
Temporary,
|
||||
false,
|
||||
)
|
||||
|
||||
// PushDownWindowAggregateMax - Enable Max variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
func PushDownWindowAggregateMax() BoolFlag {
|
||||
return pushDownWindowAggregateMax
|
||||
}
|
||||
|
||||
var pushDownWindowAggregateMean = MakeBoolFlag(
|
||||
"Push Down Window Aggregate Mean",
|
||||
"pushDownWindowAggregateMean",
|
||||
"Query Team",
|
||||
false,
|
||||
Temporary,
|
||||
false,
|
||||
)
|
||||
|
||||
// PushDownWindowAggregateMean - Enable Mean variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
func PushDownWindowAggregateMean() BoolFlag {
|
||||
return pushDownWindowAggregateMean
|
||||
}
|
||||
|
||||
var pushDownWindowAggregateFirst = MakeBoolFlag(
|
||||
"Push Down Window Aggregate First",
|
||||
"pushDownWindowAggregateFirst",
|
||||
"Query Team",
|
||||
false,
|
||||
Temporary,
|
||||
false,
|
||||
)
|
||||
|
||||
// PushDownWindowAggregateFirst - Enable First variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
func PushDownWindowAggregateFirst() BoolFlag {
|
||||
return pushDownWindowAggregateFirst
|
||||
}
|
||||
|
||||
var pushDownWindowAggregateLast = MakeBoolFlag(
|
||||
"Push Down Window Aggregate Last",
|
||||
"pushDownWindowAggregateLast",
|
||||
"Query Team",
|
||||
false,
|
||||
Temporary,
|
||||
false,
|
||||
)
|
||||
|
||||
// PushDownWindowAggregateLast - Enable Last variant of PushDownWindowAggregateRule and PushDownBareAggregateRule
|
||||
func PushDownWindowAggregateLast() BoolFlag {
|
||||
return pushDownWindowAggregateLast
|
||||
}
|
||||
|
||||
var newAuth = MakeBoolFlag(
|
||||
|
@ -247,7 +303,11 @@ var all = []Flag{
|
|||
frontendExample,
|
||||
pushDownWindowAggregateCount,
|
||||
pushDownWindowAggregateSum,
|
||||
pushDownWindowAggregateRest,
|
||||
pushDownWindowAggregateMin,
|
||||
pushDownWindowAggregateMax,
|
||||
pushDownWindowAggregateMean,
|
||||
pushDownWindowAggregateFirst,
|
||||
pushDownWindowAggregateLast,
|
||||
newAuth,
|
||||
sessionService,
|
||||
pushDownGroupAggregateCount,
|
||||
|
@ -267,7 +327,11 @@ var byKey = map[string]Flag{
|
|||
"frontendExample": frontendExample,
|
||||
"pushDownWindowAggregateCount": pushDownWindowAggregateCount,
|
||||
"pushDownWindowAggregateSum": pushDownWindowAggregateSum,
|
||||
"pushDownWindowAggregateRest": pushDownWindowAggregateRest,
|
||||
"pushDownWindowAggregateMin": pushDownWindowAggregateMin,
|
||||
"pushDownWindowAggregateMax": pushDownWindowAggregateMax,
|
||||
"pushDownWindowAggregateMean": pushDownWindowAggregateMean,
|
||||
"pushDownWindowAggregateFirst": pushDownWindowAggregateFirst,
|
||||
"pushDownWindowAggregateLast": pushDownWindowAggregateLast,
|
||||
"newAuth": newAuth,
|
||||
"sessionService": sessionService,
|
||||
"pushDownGroupAggregateCount": pushDownGroupAggregateCount,
|
||||
|
|
|
@ -659,11 +659,13 @@ func (PushDownWindowAggregateRule) Name() string {
|
|||
}
|
||||
|
||||
var windowPushableAggs = []plan.ProcedureKind{
|
||||
universe.CountKind,
|
||||
universe.SumKind,
|
||||
universe.MinKind,
|
||||
universe.MaxKind,
|
||||
universe.MeanKind,
|
||||
universe.CountKind,
|
||||
universe.SumKind,
|
||||
universe.FirstKind,
|
||||
universe.LastKind,
|
||||
}
|
||||
|
||||
func (rule PushDownWindowAggregateRule) Pattern() plan.Pattern {
|
||||
|
@ -687,28 +689,25 @@ func canPushWindowedAggregate(ctx context.Context, fnNode plan.Node) bool {
|
|||
// and check the feature flag associated with the aggregate function.
|
||||
switch fnNode.Kind() {
|
||||
case universe.MinKind:
|
||||
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMin() {
|
||||
if !feature.PushDownWindowAggregateMin().Enabled(ctx) || !caps.HaveMin() {
|
||||
return false
|
||||
}
|
||||
|
||||
minSpec := fnNode.ProcedureSpec().(*universe.MinProcedureSpec)
|
||||
if minSpec.Column != execute.DefaultValueColLabel {
|
||||
return false
|
||||
}
|
||||
case universe.MaxKind:
|
||||
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMax() {
|
||||
if !feature.PushDownWindowAggregateMax().Enabled(ctx) || !caps.HaveMax() {
|
||||
return false
|
||||
}
|
||||
|
||||
maxSpec := fnNode.ProcedureSpec().(*universe.MaxProcedureSpec)
|
||||
if maxSpec.Column != execute.DefaultValueColLabel {
|
||||
return false
|
||||
}
|
||||
case universe.MeanKind:
|
||||
if !feature.PushDownWindowAggregateRest().Enabled(ctx) || !caps.HaveMean() {
|
||||
if !feature.PushDownWindowAggregateMean().Enabled(ctx) || !caps.HaveMean() {
|
||||
return false
|
||||
}
|
||||
|
||||
meanSpec := fnNode.ProcedureSpec().(*universe.MeanProcedureSpec)
|
||||
if len(meanSpec.Columns) != 1 || meanSpec.Columns[0] != execute.DefaultValueColLabel {
|
||||
return false
|
||||
|
@ -717,7 +716,6 @@ func canPushWindowedAggregate(ctx context.Context, fnNode plan.Node) bool {
|
|||
if !feature.PushDownWindowAggregateCount().Enabled(ctx) || !caps.HaveCount() {
|
||||
return false
|
||||
}
|
||||
|
||||
countSpec := fnNode.ProcedureSpec().(*universe.CountProcedureSpec)
|
||||
if len(countSpec.Columns) != 1 || countSpec.Columns[0] != execute.DefaultValueColLabel {
|
||||
return false
|
||||
|
@ -726,11 +724,18 @@ func canPushWindowedAggregate(ctx context.Context, fnNode plan.Node) bool {
|
|||
if !feature.PushDownWindowAggregateSum().Enabled(ctx) || !caps.HaveSum() {
|
||||
return false
|
||||
}
|
||||
|
||||
sumSpec := fnNode.ProcedureSpec().(*universe.SumProcedureSpec)
|
||||
if len(sumSpec.Columns) != 1 || sumSpec.Columns[0] != execute.DefaultValueColLabel {
|
||||
return false
|
||||
}
|
||||
case universe.FirstKind:
|
||||
if !feature.PushDownWindowAggregateFirst().Enabled(ctx) || !caps.HaveFirst() {
|
||||
return false
|
||||
}
|
||||
case universe.LastKind:
|
||||
if !feature.PushDownWindowAggregateLast().Enabled(ctx) || !caps.HaveLast() {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -63,6 +63,8 @@ func (m mockWAC) HaveMax() bool { return m.Have }
|
|||
func (m mockWAC) HaveMean() bool { return m.Have }
|
||||
func (m mockWAC) HaveCount() bool { return m.Have }
|
||||
func (m mockWAC) HaveSum() bool { return m.Have }
|
||||
func (m mockWAC) HaveFirst() bool { return m.Have }
|
||||
func (m mockWAC) HaveLast() bool { return m.Have }
|
||||
|
||||
func fluxTime(t int64) flux.Time {
|
||||
return flux.Time{
|
||||
|
@ -1162,6 +1164,42 @@ func TestReadTagValuesRule(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func minProcedureSpec() *universe.MinProcedureSpec {
|
||||
return &universe.MinProcedureSpec{
|
||||
SelectorConfig: execute.SelectorConfig{Column: execute.DefaultValueColLabel},
|
||||
}
|
||||
}
|
||||
func maxProcedureSpec() *universe.MaxProcedureSpec {
|
||||
return &universe.MaxProcedureSpec{
|
||||
SelectorConfig: execute.SelectorConfig{Column: execute.DefaultValueColLabel},
|
||||
}
|
||||
}
|
||||
func countProcedureSpec() *universe.CountProcedureSpec {
|
||||
return &universe.CountProcedureSpec{
|
||||
AggregateConfig: execute.AggregateConfig{Columns: []string{execute.DefaultValueColLabel}},
|
||||
}
|
||||
}
|
||||
func sumProcedureSpec() *universe.SumProcedureSpec {
|
||||
return &universe.SumProcedureSpec{
|
||||
AggregateConfig: execute.AggregateConfig{Columns: []string{execute.DefaultValueColLabel}},
|
||||
}
|
||||
}
|
||||
func firstProcedureSpec() *universe.FirstProcedureSpec {
|
||||
return &universe.FirstProcedureSpec{
|
||||
SelectorConfig: execute.SelectorConfig{Column: execute.DefaultValueColLabel},
|
||||
}
|
||||
}
|
||||
func lastProcedureSpec() *universe.LastProcedureSpec {
|
||||
return &universe.LastProcedureSpec{
|
||||
SelectorConfig: execute.SelectorConfig{Column: execute.DefaultValueColLabel},
|
||||
}
|
||||
}
|
||||
func meanProcedureSpec() *universe.MeanProcedureSpec {
|
||||
return &universe.MeanProcedureSpec{
|
||||
AggregateConfig: execute.AggregateConfig{Columns: []string{execute.DefaultValueColLabel}},
|
||||
}
|
||||
}
|
||||
|
||||
//
|
||||
// Window Aggregate Testing
|
||||
//
|
||||
|
@ -1170,7 +1208,11 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
|
|||
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
|
||||
feature.PushDownWindowAggregateCount(): true,
|
||||
feature.PushDownWindowAggregateSum(): true,
|
||||
feature.PushDownWindowAggregateRest(): true,
|
||||
feature.PushDownWindowAggregateMin(): true,
|
||||
feature.PushDownWindowAggregateMax(): true,
|
||||
feature.PushDownWindowAggregateMean(): true,
|
||||
feature.PushDownWindowAggregateFirst(): true,
|
||||
feature.PushDownWindowAggregateLast(): true,
|
||||
})
|
||||
|
||||
withFlagger, _ := feature.Annotate(context.Background(), flagger)
|
||||
|
@ -1261,39 +1303,13 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
|
|||
return spec
|
||||
}
|
||||
|
||||
minProcedureSpec := func() *universe.MinProcedureSpec {
|
||||
return &universe.MinProcedureSpec{
|
||||
SelectorConfig: execute.SelectorConfig{Column: "_value"},
|
||||
}
|
||||
}
|
||||
maxProcedureSpec := func() *universe.MaxProcedureSpec {
|
||||
return &universe.MaxProcedureSpec{
|
||||
SelectorConfig: execute.SelectorConfig{Column: "_value"},
|
||||
}
|
||||
}
|
||||
meanProcedureSpec := func() *universe.MeanProcedureSpec {
|
||||
return &universe.MeanProcedureSpec{
|
||||
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
|
||||
}
|
||||
}
|
||||
countProcedureSpec := func() *universe.CountProcedureSpec {
|
||||
return &universe.CountProcedureSpec{
|
||||
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
|
||||
}
|
||||
}
|
||||
sumProcedureSpec := func() *universe.SumProcedureSpec {
|
||||
return &universe.SumProcedureSpec{
|
||||
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
|
||||
}
|
||||
}
|
||||
|
||||
// ReadRange -> window -> min => ReadWindowAggregate
|
||||
tests = append(tests, plantest.RuleTestCase{
|
||||
Context: haveCaps,
|
||||
Name: "SimplePassMin",
|
||||
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
|
||||
Before: simplePlanWithWindowAgg(window1m, "min", minProcedureSpec()),
|
||||
After: simpleResult("min", false),
|
||||
Before: simplePlanWithWindowAgg(window1m, universe.MinKind, minProcedureSpec()),
|
||||
After: simpleResult(universe.MinKind, false),
|
||||
})
|
||||
|
||||
// ReadRange -> window -> max => ReadWindowAggregate
|
||||
|
@ -1301,8 +1317,8 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
|
|||
Context: haveCaps,
|
||||
Name: "SimplePassMax",
|
||||
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
|
||||
Before: simplePlanWithWindowAgg(window1m, "max", maxProcedureSpec()),
|
||||
After: simpleResult("max", false),
|
||||
Before: simplePlanWithWindowAgg(window1m, universe.MaxKind, maxProcedureSpec()),
|
||||
After: simpleResult(universe.MaxKind, false),
|
||||
})
|
||||
|
||||
// ReadRange -> window -> mean => ReadWindowAggregate
|
||||
|
@ -1310,8 +1326,8 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
|
|||
Context: haveCaps,
|
||||
Name: "SimplePassMean",
|
||||
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
|
||||
Before: simplePlanWithWindowAgg(window1m, "mean", meanProcedureSpec()),
|
||||
After: simpleResult("mean", false),
|
||||
Before: simplePlanWithWindowAgg(window1m, universe.MeanKind, meanProcedureSpec()),
|
||||
After: simpleResult(universe.MeanKind, false),
|
||||
})
|
||||
|
||||
// ReadRange -> window -> count => ReadWindowAggregate
|
||||
|
@ -1319,8 +1335,8 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
|
|||
Context: haveCaps,
|
||||
Name: "SimplePassCount",
|
||||
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
|
||||
Before: simplePlanWithWindowAgg(window1m, "count", countProcedureSpec()),
|
||||
After: simpleResult("count", false),
|
||||
Before: simplePlanWithWindowAgg(window1m, universe.CountKind, countProcedureSpec()),
|
||||
After: simpleResult(universe.CountKind, false),
|
||||
})
|
||||
|
||||
// ReadRange -> window -> sum => ReadWindowAggregate
|
||||
|
@ -1328,8 +1344,26 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
|
|||
Context: haveCaps,
|
||||
Name: "SimplePassSum",
|
||||
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
|
||||
Before: simplePlanWithWindowAgg(window1m, "sum", sumProcedureSpec()),
|
||||
After: simpleResult("sum", false),
|
||||
Before: simplePlanWithWindowAgg(window1m, universe.SumKind, sumProcedureSpec()),
|
||||
After: simpleResult(universe.SumKind, false),
|
||||
})
|
||||
|
||||
// ReadRange -> window -> first => ReadWindowAggregate
|
||||
tests = append(tests, plantest.RuleTestCase{
|
||||
Context: haveCaps,
|
||||
Name: "SimplePassFirst",
|
||||
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
|
||||
Before: simplePlanWithWindowAgg(window1m, universe.FirstKind, firstProcedureSpec()),
|
||||
After: simpleResult(universe.FirstKind, false),
|
||||
})
|
||||
|
||||
// ReadRange -> window -> last => ReadWindowAggregate
|
||||
tests = append(tests, plantest.RuleTestCase{
|
||||
Context: haveCaps,
|
||||
Name: "SimplePassLast",
|
||||
Rules: []plan.Rule{influxdb.PushDownWindowAggregateRule{}},
|
||||
Before: simplePlanWithWindowAgg(window1m, universe.LastKind, lastProcedureSpec()),
|
||||
After: simpleResult(universe.LastKind, false),
|
||||
})
|
||||
|
||||
// Rewrite with successors
|
||||
|
@ -1883,85 +1917,13 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestSwitchFillImplRule(t *testing.T) {
|
||||
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
|
||||
feature.MemoryOptimizedFill(): true,
|
||||
})
|
||||
withFlagger, _ := feature.Annotate(context.Background(), flagger)
|
||||
readRange := &influxdb.ReadRangePhysSpec{
|
||||
Bucket: "my-bucket",
|
||||
Bounds: flux.Bounds{
|
||||
Start: fluxTime(5),
|
||||
Stop: fluxTime(10),
|
||||
},
|
||||
}
|
||||
sourceSpec := &universe.DualImplProcedureSpec{
|
||||
ProcedureSpec: &universe.FillProcedureSpec{
|
||||
DefaultCost: plan.DefaultCost{},
|
||||
Column: "_value",
|
||||
Value: values.NewFloat(0),
|
||||
UsePrevious: false,
|
||||
},
|
||||
UseDeprecated: false,
|
||||
}
|
||||
targetSpec := sourceSpec.Copy().(*universe.DualImplProcedureSpec)
|
||||
universe.UseDeprecatedImpl(targetSpec)
|
||||
|
||||
testcases := []plantest.RuleTestCase{
|
||||
{
|
||||
Context: withFlagger,
|
||||
Name: "enable memory optimized fill",
|
||||
Rules: []plan.Rule{influxdb.SwitchFillImplRule{}},
|
||||
Before: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadRange", readRange),
|
||||
plan.CreatePhysicalNode("fill", sourceSpec),
|
||||
},
|
||||
Edges: [][2]int{
|
||||
{0, 1},
|
||||
},
|
||||
},
|
||||
NoChange: true,
|
||||
},
|
||||
{
|
||||
Context: context.Background(),
|
||||
Name: "disable memory optimized fill",
|
||||
Rules: []plan.Rule{influxdb.SwitchFillImplRule{}},
|
||||
Before: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadRange", readRange),
|
||||
plan.CreatePhysicalNode("fill", sourceSpec),
|
||||
},
|
||||
Edges: [][2]int{
|
||||
{0, 1},
|
||||
},
|
||||
},
|
||||
After: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadRange", readRange),
|
||||
plan.CreatePhysicalNode("fill", targetSpec),
|
||||
},
|
||||
Edges: [][2]int{
|
||||
{0, 1},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
tc := tc
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
plantest.PhysicalRuleTestHelper(t, &tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushDownBareAggregateRule(t *testing.T) {
|
||||
// Turn on support for window aggregate count
|
||||
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
|
||||
feature.PushDownWindowAggregateCount(): true,
|
||||
feature.PushDownWindowAggregateSum(): true,
|
||||
feature.PushDownWindowAggregateFirst(): true,
|
||||
feature.PushDownWindowAggregateLast(): true,
|
||||
})
|
||||
|
||||
withFlagger, _ := feature.Annotate(context.Background(), flagger)
|
||||
|
@ -1995,25 +1957,9 @@ func TestPushDownBareAggregateRule(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
minProcedureSpec := func() *universe.MinProcedureSpec {
|
||||
return &universe.MinProcedureSpec{
|
||||
SelectorConfig: execute.SelectorConfig{Column: "_value"},
|
||||
}
|
||||
}
|
||||
countProcedureSpec := func() *universe.CountProcedureSpec {
|
||||
return &universe.CountProcedureSpec{
|
||||
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
|
||||
}
|
||||
}
|
||||
sumProcedureSpec := func() *universe.SumProcedureSpec {
|
||||
return &universe.SumProcedureSpec{
|
||||
AggregateConfig: execute.AggregateConfig{Columns: []string{"_value"}},
|
||||
}
|
||||
}
|
||||
|
||||
testcases := []plantest.RuleTestCase{
|
||||
{
|
||||
// successful push down
|
||||
// ReadRange -> count => ReadWindowAggregate
|
||||
Context: haveCaps,
|
||||
Name: "push down count",
|
||||
Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}},
|
||||
|
@ -2028,12 +1974,12 @@ func TestPushDownBareAggregateRule(t *testing.T) {
|
|||
},
|
||||
After: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadWindowAggregate", readWindowAggregate("count")),
|
||||
plan.CreatePhysicalNode("ReadWindowAggregate", readWindowAggregate(universe.CountKind)),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// successful push down
|
||||
// ReadRange -> sum => ReadWindowAggregate
|
||||
Context: haveCaps,
|
||||
Name: "push down sum",
|
||||
Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}},
|
||||
|
@ -2048,7 +1994,47 @@ func TestPushDownBareAggregateRule(t *testing.T) {
|
|||
},
|
||||
After: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadWindowAggregate", readWindowAggregate("sum")),
|
||||
plan.CreatePhysicalNode("ReadWindowAggregate", readWindowAggregate(universe.SumKind)),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// ReadRange -> first => ReadWindowAggregate
|
||||
Context: haveCaps,
|
||||
Name: "push down first",
|
||||
Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}},
|
||||
Before: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadRange", readRange),
|
||||
plan.CreatePhysicalNode("first", firstProcedureSpec()),
|
||||
},
|
||||
Edges: [][2]int{
|
||||
{0, 1},
|
||||
},
|
||||
},
|
||||
After: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadWindowAggregate", readWindowAggregate(universe.FirstKind)),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
// ReadRange -> last => ReadWindowAggregate
|
||||
Context: haveCaps,
|
||||
Name: "push down last",
|
||||
Rules: []plan.Rule{influxdb.PushDownBareAggregateRule{}},
|
||||
Before: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadRange", readRange),
|
||||
plan.CreatePhysicalNode("last", lastProcedureSpec()),
|
||||
},
|
||||
Edges: [][2]int{
|
||||
{0, 1},
|
||||
},
|
||||
},
|
||||
After: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadWindowAggregate", readWindowAggregate(universe.LastKind)),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -2430,3 +2416,77 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSwitchFillImplRule(t *testing.T) {
|
||||
flagger := mock.NewFlagger(map[feature.Flag]interface{}{
|
||||
feature.MemoryOptimizedFill(): true,
|
||||
})
|
||||
withFlagger, _ := feature.Annotate(context.Background(), flagger)
|
||||
readRange := &influxdb.ReadRangePhysSpec{
|
||||
Bucket: "my-bucket",
|
||||
Bounds: flux.Bounds{
|
||||
Start: fluxTime(5),
|
||||
Stop: fluxTime(10),
|
||||
},
|
||||
}
|
||||
sourceSpec := &universe.DualImplProcedureSpec{
|
||||
ProcedureSpec: &universe.FillProcedureSpec{
|
||||
DefaultCost: plan.DefaultCost{},
|
||||
Column: "_value",
|
||||
Value: values.NewFloat(0),
|
||||
UsePrevious: false,
|
||||
},
|
||||
UseDeprecated: false,
|
||||
}
|
||||
targetSpec := sourceSpec.Copy().(*universe.DualImplProcedureSpec)
|
||||
universe.UseDeprecatedImpl(targetSpec)
|
||||
|
||||
testcases := []plantest.RuleTestCase{
|
||||
{
|
||||
Context: withFlagger,
|
||||
Name: "enable memory optimized fill",
|
||||
Rules: []plan.Rule{influxdb.SwitchFillImplRule{}},
|
||||
Before: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadRange", readRange),
|
||||
plan.CreatePhysicalNode("fill", sourceSpec),
|
||||
},
|
||||
Edges: [][2]int{
|
||||
{0, 1},
|
||||
},
|
||||
},
|
||||
NoChange: true,
|
||||
},
|
||||
{
|
||||
Context: context.Background(),
|
||||
Name: "disable memory optimized fill",
|
||||
Rules: []plan.Rule{influxdb.SwitchFillImplRule{}},
|
||||
Before: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadRange", readRange),
|
||||
plan.CreatePhysicalNode("fill", sourceSpec),
|
||||
},
|
||||
Edges: [][2]int{
|
||||
{0, 1},
|
||||
},
|
||||
},
|
||||
After: &plantest.PlanSpec{
|
||||
Nodes: []plan.Node{
|
||||
plan.CreatePhysicalNode("ReadRange", readRange),
|
||||
plan.CreatePhysicalNode("fill", targetSpec),
|
||||
},
|
||||
Edges: [][2]int{
|
||||
{0, 1},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
tc := tc
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
plantest.PhysicalRuleTestHelper(t, &tc)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,6 +42,8 @@ type WindowAggregateCapability interface {
|
|||
HaveMean() bool
|
||||
HaveCount() bool
|
||||
HaveSum() bool
|
||||
HaveFirst() bool
|
||||
HaveLast() bool
|
||||
}
|
||||
|
||||
// WindowAggregateReader implements the WindowAggregate capability.
|
||||
|
|
|
@ -218,6 +218,8 @@ type WindowAggregateCapability struct {
|
|||
Mean bool
|
||||
Count bool
|
||||
Sum bool
|
||||
First bool
|
||||
Last bool
|
||||
}
|
||||
|
||||
func (w WindowAggregateCapability) HaveMin() bool { return w.Min }
|
||||
|
@ -225,3 +227,5 @@ func (w WindowAggregateCapability) HaveMax() bool { return w.Max }
|
|||
func (w WindowAggregateCapability) HaveMean() bool { return w.Mean }
|
||||
func (w WindowAggregateCapability) HaveCount() bool { return w.Count }
|
||||
func (w WindowAggregateCapability) HaveSum() bool { return w.Sum }
|
||||
func (w WindowAggregateCapability) HaveFirst() bool { return w.First }
|
||||
func (w WindowAggregateCapability) HaveLast() bool { return w.Last }
|
||||
|
|
Loading…
Reference in New Issue