feat(query): enable min/max pushdown (#20994)

* feat(query): enable min/max pushdown

* fix(query): fix the group last pushdown to use descending cursors

* test(storage): add read group test with no agg

Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
Faith Chikwekwe authored 2021-04-20 12:56:43 -07:00, committed by GitHub
parent 8656c876b9
commit 7bde3413b3
8 changed files with 921 additions and 336 deletions


@@ -0,0 +1,61 @@
package influxdb_test
import "testing/expect"
option now = () => (2030-01-01T00:00:00Z)
testcase push_down_min_bare extends "flux/planner/group_min_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_test.group_min_bare()
}
testcase push_down_min_bare_host extends "flux/planner/group_min_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_test.group_min_bare_host()
}
testcase push_down_min_bare_field extends "flux/planner/group_min_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_test.group_min_bare_field()
}
testcase push_down_max_bare extends "flux/planner/group_max_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_max_test.group_max_bare()
}
testcase push_down_max_bare_host extends "flux/planner/group_max_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_max_test.group_max_bare_host()
}
testcase push_down_max_bare_field extends "flux/planner/group_max_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_max_test.group_max_bare_field()
}
testcase push_down_table_test_min extends "flux/planner/group_min_max_table_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_max_table_test.group_min_table()
}
testcase push_down_table_test_max extends "flux/planner/group_min_max_table_test" {
expect.planner(rules: [
"PushDownGroupAggregateRule": 1,
])
group_min_max_table_test.group_max_table()
}


@@ -30,7 +30,7 @@ func init() {
PushDownWindowAggregateByTimeRule{},
PushDownBareAggregateRule{},
GroupWindowAggregateTransposeRule{},
// PushDownGroupAggregateRule{},
PushDownGroupAggregateRule{},
)
plan.RegisterLogicalRules(
MergeFiltersRule{},

File diff suppressed because it is too large.


@@ -17,6 +17,7 @@ import (
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
storage "github.com/influxdata/influxdb/v2/storage/reads"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
)
{{range .}}
//
@@ -733,20 +734,17 @@ func (t *{{.name}}GroupTable) advance() bool {
return true
}
aggregate, err := determine{{.Name}}AggregateMethod(t.gc.Aggregate().Type)
aggregate, err := make{{.Name}}AggregateAccumulator(t.gc.Aggregate().Type)
if err != nil {
t.err = err
return false
}
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps, values := []int64{ts}, []{{.Type}}{v}
aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags)
for {
arr = t.cur.Next()
if arr.Len() > 0 {
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps = append(timestamps, ts)
values = append(values, v)
aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags)
continue
}
@@ -754,7 +752,7 @@ func (t *{{.name}}GroupTable) advance() bool {
break
}
}
timestamp, value := aggregate(timestamps, values)
timestamp, value, tags := aggregate.Result()
colReader := t.allocateBuffer(1)
if IsSelector(t.gc.Aggregate()) {
@@ -763,26 +761,123 @@ func (t *{{.name}}GroupTable) advance() bool {
} else {
colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]{{.Type}}{value})
}
t.appendTags(colReader)
t.appendTheseTags(colReader, tags)
t.appendBounds(colReader)
return true
}
type {{.name}}AggregateMethod func([]int64, []{{.Type}}) (int64, {{.Type}})
type {{.Name}}AggregateAccumulator interface {
// AccumulateFirst receives an initial array of items to select from.
// It selects an item and stores the state. Afterwards, more data can
// be supplied with AccumulateMore and the results can be requested at
// any time. Without a call to AccumulateFirst the results are not
// defined.
AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte)
// determine{{.Name}}AggregateMethod returns the method for aggregating
// returned points within the same group. The incoming points are the
// ones returned for each series and the method returned here will
// AccumulateMore receives additional array elements to select from.
AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte)
// Result returns the item selected from the data received so far.
Result() (int64, {{.Type}}, [][]byte)
}
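
To make the contract concrete, here is a minimal sketch of a hypothetical float64 accumulator (illustrative names, not the generated ones), driven the same way advance() drives the generated accumulators: seed with AccumulateFirst, fold further batches with AccumulateMore, then read Result.

package main

import "fmt"

// lastAccumulator is a hypothetical float64 accumulator that follows the
// AccumulateFirst / AccumulateMore / Result contract above: it keeps the
// point with the greatest timestamp seen so far, along with its tags.
type lastAccumulator struct {
	ts   int64
	v    float64
	tags [][]byte
}

func (a *lastAccumulator) AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) {
	a.ts, a.v = timestamps[0], values[0]
	for i := 1; i < len(timestamps); i++ {
		if timestamps[i] > a.ts {
			a.ts, a.v = timestamps[i], values[i]
		}
	}
	a.tags = tags // the generated code copies into a reusable buffer instead
}

func (a *lastAccumulator) AccumulateMore(timestamps []int64, values []float64, tags [][]byte) {
	for i := 0; i < len(timestamps); i++ {
		if timestamps[i] > a.ts {
			a.ts, a.v = timestamps[i], values[i]
			a.tags = tags
		}
	}
}

func (a *lastAccumulator) Result() (int64, float64, [][]byte) {
	return a.ts, a.v, a.tags
}

func main() {
	acc := &lastAccumulator{}
	acc.AccumulateFirst([]int64{10, 20}, []float64{1.0, 2.0}, nil)
	acc.AccumulateMore([]int64{30}, []float64{4.0}, nil)
	ts, v, _ := acc.Result()
	fmt.Println(ts, v) // 30 4
}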
// The selector method takes a (timestamp, value) pair, a
// ([]timestamp, []value) pair, and a starting index. It applies the selector
// to the single value and the array, starting at the supplied index. It
// returns -1 if the single value is selected and a non-negative index if an
// item from the array is selected.
type {{.name}}SelectorMethod func(int64, {{.Type}}, []int64, []{{.Type}}, int) (int)
// The selector accumulator tracks the currently selected item.
type {{.name}}SelectorAccumulator struct {
selector {{.name}}SelectorMethod
ts int64
v {{.Type}}
tags [][]byte
}
func (a *{{.name}}SelectorAccumulator) AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) {
index := a.selector(timestamps[0], values[0], timestamps, values, 1)
if index < 0 {
a.ts = timestamps[0]
a.v = values[0]
} else {
a.ts = timestamps[index]
a.v = values[index]
}
a.tags = make([][]byte, len(tags))
copy(a.tags, tags)
}
func (a *{{.name}}SelectorAccumulator) AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) {
index := a.selector(a.ts, a.v, timestamps, values, 0)
if index >= 0 {
a.ts = timestamps[index]
a.v = values[index]
if len(tags) > cap(a.tags) {
a.tags = make([][]byte, len(tags))
} else {
a.tags = a.tags[:len(tags)]
}
copy(a.tags, tags)
}
}
func (a *{{.name}}SelectorAccumulator) Result() (int64, {{.Type}}, [][]byte) {
return a.ts, a.v, a.tags
}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
// The aggregate method takes a value, an array of values, and a starting
// index, applies an aggregate operation over the value and the array, starting
// at the given index, and returns the result.
type {{.name}}AggregateMethod func({{.Type}}, []{{.Type}}, int) ({{.Type}})
type {{.name}}AggregateAccumulator struct {
aggregate {{.name}}AggregateMethod
accum {{.Type}}
// For pure aggregates it doesn't matter what we return for tags, but
// we need to satisfy the interface. We will just return the most
// recently seen tags.
tags [][]byte
}
func (a *{{.name}}AggregateAccumulator) AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) {
a.accum = a.aggregate(values[0], values, 1)
a.tags = tags
}
func (a *{{.name}}AggregateAccumulator) AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) {
a.accum = a.aggregate(a.accum, values, 0)
a.tags = tags
}
// For group aggregates (non-selectors), the timestamp is always math.MaxInt64.
// Their final result does not contain _time, so this timestamp value can be
// anything and it won't matter.
func (a *{{.name}}AggregateAccumulator) Result() (int64, {{.Type}}, [][]byte) {
return math.MaxInt64, a.accum, a.tags
}
{{end}}
// make{{.Name}}AggregateAccumulator returns the interface implementation for
// aggregating returned points within the same group. The incoming points are
// the ones returned for each series and the struct returned here will
// aggregate the aggregates.
func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({{.name}}AggregateMethod, error){
func make{{.Name}}AggregateAccumulator(agg datatypes.Aggregate_AggregateType) ({{.Name}}AggregateAccumulator, error){
switch agg {
case datatypes.AggregateTypeFirst:
return aggregateFirstGroups{{.Name}}, nil
return &{{.name}}SelectorAccumulator{selector: selectorFirstGroups{{.Name}}}, nil
case datatypes.AggregateTypeLast:
return aggregateLastGroups{{.Name}}, nil
return &{{.name}}SelectorAccumulator{selector: selectorLastGroups{{.Name}}}, nil
case datatypes.AggregateTypeCount:
{{if eq .Name "Integer"}}
return aggregateCountGroups{{.Name}}, nil
return &{{.name}}AggregateAccumulator{aggregate: aggregateCountGroups{{.Name}}}, nil
{{else}}
return nil, &errors.Error {
Code: errors.EInvalid,
@@ -791,7 +886,7 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({
{{end}}
case datatypes.AggregateTypeSum:
{{if and (ne .Name "Boolean") (ne .Name "String")}}
return aggregateSumGroups{{.Name}}, nil
return &{{.name}}AggregateAccumulator{aggregate: aggregateSumGroups{{.Name}}}, nil
{{else}}
return nil, &errors.Error {
Code: errors.EInvalid,
@@ -800,7 +895,7 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({
{{end}}
case datatypes.AggregateTypeMin:
{{if and (ne .Name "Boolean") (ne .Name "String")}}
return aggregateMinGroups{{.Name}}, nil
return &{{.name}}SelectorAccumulator{selector: selectorMinGroups{{.Name}}}, nil
{{else}}
return nil, &errors.Error {
Code: errors.EInvalid,
@@ -809,7 +904,7 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({
{{end}}
case datatypes.AggregateTypeMax:
{{if and (ne .Name "Boolean") (ne .Name "String")}}
return aggregateMaxGroups{{.Name}}, nil
return &{{.name}}SelectorAccumulator{selector: selectorMaxGroups{{.Name}}}, nil
{{else}}
return nil, &errors.Error {
Code: errors.EInvalid,
@@ -825,82 +920,74 @@ func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({
}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
func aggregateMinGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
value := values[0]
timestamp := timestamps[0]
func selectorMinGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) {
index := -1
for i := 1; i < len(values); i++ {
if value > values[i] {
value = values[i]
timestamp = timestamps[i]
for ; i < len(values); i++ {
if v > values[i] {
index = i
v = values[i]
}
}
return timestamp, value
return index
}
{{end}}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
func aggregateMaxGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
value := values[0]
timestamp := timestamps[0]
func selectorMaxGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) {
index := -1
for i := 1; i < len(values); i++ {
if value < values[i] {
value = values[i]
timestamp = timestamps[i]
for ; i < len(values); i++ {
if v < values[i] {
index = i
v = values[i]
}
}
return timestamp, value
return index
}
{{end}}
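
To illustrate the -1 convention, here is a hedged, non-templated float64 rendering of the min selector (illustrative name, not the generated one) with two worked calls:

package main

import "fmt"

// selectorMinFloat64 mirrors the templated min selector above: it scans
// values[i:] for something smaller than the current selection v and returns
// the index of the best candidate, or -1 if the current selection stands.
func selectorMinFloat64(ts int64, v float64, timestamps []int64, values []float64, i int) int {
	index := -1
	for ; i < len(values); i++ {
		if v > values[i] {
			index = i
			v = values[i]
		}
	}
	return index
}

func main() {
	// The batch holds a smaller value at index 1, so the caller should
	// switch its selection to timestamps[1], values[1].
	fmt.Println(selectorMinFloat64(100, 3.0, []int64{200, 300}, []float64{5.0, 2.0}, 0)) // 1

	// Nothing smaller: -1 tells the caller to keep its current selection.
	fmt.Println(selectorMinFloat64(100, 3.0, []int64{200}, []float64{4.0}, 0)) // -1
}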
// For group count and sum, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
{{if eq .Name "Integer"}}
func aggregateCountGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
return aggregateSumGroups{{.Name}}(timestamps, values)
func aggregateCountGroups{{.Name}}(accum {{.Type}}, values []{{.Type}}, i int) ({{.Type}}) {
return aggregateSumGroups{{.Name}}(accum, values, i)
}
{{end}}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
func aggregateSumGroups{{.Name}}(_ []int64, values []{{.Type}}) (int64, {{.Type}}) {
var sum {{.Type}}
for _, v := range values {
sum += v
func aggregateSumGroups{{.Name}}(sum {{.Type}}, values []{{.Type}}, i int) ({{.Type}}) {
for ; i < len(values); i++ {
sum += values[i]
}
return math.MaxInt64, sum
return sum
}
{{end}}
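
The starting index is what lets the first batch seed the accumulator: AccumulateFirst passes values[0] as the initial sum and starts folding at index 1, while every later batch starts at 0. A short worked sketch (plain float64, illustrative name):

package main

import "fmt"

// aggregateSumFloat64 mirrors the templated sum aggregate above: fold
// values[i:] into the running sum and return it.
func aggregateSumFloat64(sum float64, values []float64, i int) float64 {
	for ; i < len(values); i++ {
		sum += values[i]
	}
	return sum
}

func main() {
	// First batch {1, 2}: seed with values[0] and start at index 1.
	acc := aggregateSumFloat64(1.0, []float64{1.0, 2.0}, 1)
	// Later batch {4}: fold everything, starting at index 0.
	acc = aggregateSumFloat64(acc, []float64{4.0}, 0)
	fmt.Println(acc) // 7
}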
func aggregateFirstGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
value := values[0]
timestamp := timestamps[0]
func selectorFirstGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) {
index := -1
for i := 1; i < len(values); i++ {
if timestamp > timestamps[i] {
value = values[i]
timestamp = timestamps[i]
for ; i < len(values); i++ {
if ts > timestamps[i] {
index = i
ts = timestamps[i]
}
}
return timestamp, value
return index
}
func aggregateLastGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
value := values[0]
timestamp := timestamps[0]
func selectorLastGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) (int) {
index := -1
for i := 1; i < len(values); i++ {
if timestamp < timestamps[i] {
value = values[i]
timestamp = timestamps[i]
for ; i < len(values); i++ {
if ts < timestamps[i] {
index = i
ts = timestamps[i]
}
}
return timestamp, value
return index
}
func (t *{{.name}}GroupTable) advanceCursor() bool {


@@ -203,7 +203,7 @@ func (t *table) readTags(tags models.Tags) {
for _, tag := range tags {
j := execute.ColIdx(string(tag.Key), t.cols)
// In the case of group aggregate, tags that are not referenced in group() are not included in the result, but
// readTags () still get a complete tag list. Here is just to skip the tags that should not present in the result.
// readTags() still gets a complete tag list. Here we just skip the tags that should not be present in the result.
if j < 0 {
continue
}
@@ -211,16 +211,21 @@ func (t *table) readTags(tags models.Tags) {
}
}
// appendTags fills the colBufs for the tag columns with the tag value.
func (t *table) appendTags(cr *colReader) {
// appendTheseTags fills the colBufs for the tag columns with the given tag values.
func (t *table) appendTheseTags(cr *colReader, tags [][]byte) {
for j := range t.cols {
v := t.tags[j]
v := tags[j]
if v != nil {
cr.cols[j] = t.cache.GetTag(string(v), cr.l, t.alloc)
}
}
}
// appendTags fills the colBufs for the tag columns with the tag values from the table structure.
func (t *table) appendTags(cr *colReader) {
t.appendTheseTags(cr, t.tags)
}
// appendBounds fills the colBufs for the time bounds
func (t *table) appendBounds(cr *colReader) {
start, stop := t.cache.GetBounds(t.bounds, cr.l, t.alloc)


@@ -2887,7 +2887,6 @@ func TestStorageReader_ReadGroup(t *testing.T) {
// values vary among the candidate items for select and the read-group
// operation must track and return the correct set of tags.
func TestStorageReader_ReadGroupSelectTags(t *testing.T) {
t.Skip("fixme")
reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {
spec := Spec(org, bucket,
MeasurementSpec("m0",
@@ -2948,27 +2947,101 @@ func TestStorageReader_ReadGroupSelectTags(t *testing.T) {
},
},
},
}
for _, tt := range cases {
mem := &memory.Allocator{}
got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
GroupMode: query.GroupModeBy,
GroupKeys: []string{"t0"},
AggregateMethod: tt.aggregate,
}, mem)
if err != nil {
t.Fatal(err)
}
t.Run("", func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
GroupMode: query.GroupModeBy,
GroupKeys: []string{"t0"},
AggregateMethod: tt.aggregate,
}, mem)
if err != nil {
t.Fatal(err)
}
if diff := table.Diff(tt.want, got); diff != "" {
t.Errorf("unexpected results -want/+got:\n%s", diff)
}
if diff := table.Diff(tt.want, got); diff != "" {
t.Errorf("unexpected results -want/+got:\n%s", diff)
}
})
}
}
// TestStorageReader_ReadGroupNoAgg exercises the path where no aggregate is specified
func TestStorageReader_ReadGroupNoAgg(t *testing.T) {
reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {
spec := Spec(org, bucket,
MeasurementSpec("m0",
FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),
TagValuesSequence("t1", "b-%s", 0, 2),
),
)
tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:00:40Z")
return datagen.NewSeriesGeneratorFromSpec(spec, tr), tr
})
defer reader.Close()
cases := []struct {
aggregate string
want flux.TableIterator
}{
{
want: static.TableGroup{
static.TableMatrix{
{
static.Table{
static.StringKey("t1", "b-0"),
static.Strings("_measurement", "m0", "m0", "m0", "m0"),
static.Strings("_field", "f0", "f0", "f0", "f0"),
static.TimeKey("_start", "2019-11-25T00:00:00Z"),
static.TimeKey("_stop", "2019-11-25T00:00:40Z"),
static.Times("_time", "2019-11-25T00:00:00Z", "2019-11-25T00:00:10Z", "2019-11-25T00:00:20Z", "2019-11-25T00:00:30Z"),
static.Floats("_value", 1.0, 2.0, 3.0, 4.0),
},
},
{
static.Table{
static.StringKey("t1", "b-1"),
static.Strings("_measurement", "m0", "m0", "m0", "m0"),
static.Strings("_field", "f0", "f0", "f0", "f0"),
static.TimeKey("_start", "2019-11-25T00:00:00Z"),
static.TimeKey("_stop", "2019-11-25T00:00:40Z"),
static.Times("_time", "2019-11-25T00:00:00Z", "2019-11-25T00:00:10Z", "2019-11-25T00:00:20Z", "2019-11-25T00:00:30Z"),
static.Floats("_value", 1.0, 2.0, 3.0, 4.0),
},
},
},
},
},
}
for _, tt := range cases {
t.Run("", func(t *testing.T) {
mem := &memory.Allocator{}
got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
GroupMode: query.GroupModeBy,
GroupKeys: []string{"t1"},
}, mem)
if err != nil {
t.Fatal(err)
}
if diff := table.Diff(tt.want, got); diff != "" {
t.Errorf("unexpected results -want/+got:\n%s", diff)
}
})
}
}


@@ -40,6 +40,13 @@ func GroupOptionNilSortLo() GroupOption {
}
}
// IsAscendingGroupAggregate checks whether this request uses the `last` aggregate type.
// It returns true when an ascending cursor should be used (any other aggregate, or none)
// and false when a descending cursor is needed (when `last` is used).
func IsAscendingGroupAggregate(req *datatypes.ReadGroupRequest) bool {
return req.Aggregate == nil || req.Aggregate.Type != datatypes.AggregateTypeLast
}
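
Assuming the import paths used elsewhere in this diff, a small sketch of how the check behaves: only `last` flips the cursor direction.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/v2/storage/reads"
	"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
)

func main() {
	// `last` is the one aggregate that needs a descending cursor.
	req := &datatypes.ReadGroupRequest{
		Aggregate: &datatypes.Aggregate{Type: datatypes.AggregateTypeLast},
	}
	fmt.Println(reads.IsAscendingGroupAggregate(req)) // false

	req.Aggregate.Type = datatypes.AggregateTypeMin
	fmt.Println(reads.IsAscendingGroupAggregate(req)) // true

	req.Aggregate = nil // no aggregate: plain group read
	fmt.Println(reads.IsAscendingGroupAggregate(req)) // true
}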
func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newSeriesCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet {
g := &groupResultSet{
ctx: ctx,
@@ -54,7 +61,8 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new
o(g)
}
g.arrayCursors = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true)
ascending := IsAscendingGroupAggregate(req)
g.arrayCursors = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, ascending)
for i, k := range req.GroupKeys {
g.keys[i] = []byte(k)


@@ -209,7 +209,11 @@ func (s *Store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest)
return nil, err
}
shardIDs, err := s.findShardIDs(database, rp, false, start, end)
// Due to some optimizations around how flux's `last()` function is implemented with the
// storage engine, we need to detect whether the read request requires a descending
// cursor.
descending := !reads.IsAscendingGroupAggregate(req)
shardIDs, err := s.findShardIDs(database, rp, descending, start, end)
if err != nil {
return nil, err
}