feat(query/stdlib): add min and max to ReadGroup (#19158)

Enables the min and max aggregates for the ReadGroupAggregate pushdown behind a feature flag.

Co-authored-by: Jonathan A. Sternberg <jonathan@influxdata.com>
pull/19222/head
Faith Chikwekwe 2020-08-05 07:40:26 -07:00 committed by GitHub
parent b484bfc34f
commit d48dc690a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1015 additions and 207 deletions

View File

@ -2235,6 +2235,80 @@ from(bucket: v.bucket)
,result,table,kk,_value
,,0,kk0,32
,,1,kk1,35
`,
},
{
name: "min group",
data: []string{
"m0,k=k0,kk=kk0 f=0i 0",
"m0,k=k0,kk=kk1 f=1i 1000000000",
"m0,k=k0,kk=kk0 f=2i 2000000000",
"m0,k=k0,kk=kk1 f=3i 3000000000",
"m0,k=k0,kk=kk0 f=4i 4000000000",
"m0,k=k0,kk=kk1 f=5i 5000000000",
"m0,k=k0,kk=kk0 f=6i 6000000000",
"m0,k=k0,kk=kk1 f=5i 7000000000",
"m0,k=k0,kk=kk0 f=0i 8000000000",
"m0,k=k0,kk=kk1 f=6i 9000000000",
"m0,k=k0,kk=kk0 f=6i 10000000000",
"m0,k=k0,kk=kk1 f=7i 11000000000",
"m0,k=k0,kk=kk0 f=5i 12000000000",
"m0,k=k0,kk=kk1 f=8i 13000000000",
"m0,k=k0,kk=kk0 f=9i 14000000000",
"m0,k=k0,kk=kk1 f=5i 15000000000",
},
op: "readGroup(min)",
query: `
from(bucket: v.bucket)
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|> group(columns: ["kk"])
|> min()
|> keep(columns: ["kk", "_value"])
`,
want: `
#datatype,string,long,string,long
#group,false,false,true,false
#default,_result,,,
,result,table,kk,_value
,,0,kk0,0
,,1,kk1,1
`,
},
{
name: "max group",
data: []string{
"m0,k=k0,kk=kk0 f=0i 0",
"m0,k=k0,kk=kk1 f=1i 1000000000",
"m0,k=k0,kk=kk0 f=2i 2000000000",
"m0,k=k0,kk=kk1 f=3i 3000000000",
"m0,k=k0,kk=kk0 f=4i 4000000000",
"m0,k=k0,kk=kk1 f=5i 5000000000",
"m0,k=k0,kk=kk0 f=6i 6000000000",
"m0,k=k0,kk=kk1 f=5i 7000000000",
"m0,k=k0,kk=kk0 f=0i 8000000000",
"m0,k=k0,kk=kk1 f=6i 9000000000",
"m0,k=k0,kk=kk0 f=6i 10000000000",
"m0,k=k0,kk=kk1 f=7i 11000000000",
"m0,k=k0,kk=kk0 f=5i 12000000000",
"m0,k=k0,kk=kk1 f=8i 13000000000",
"m0,k=k0,kk=kk0 f=9i 14000000000",
"m0,k=k0,kk=kk1 f=5i 15000000000",
},
op: "readGroup(max)",
query: `
from(bucket: v.bucket)
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
|> group(columns: ["kk"])
|> max()
|> keep(columns: ["kk", "_value"])
`,
want: `
#datatype,string,long,string,long
#group,false,false,true,false
#default,_result,,,
,result,table,kk,_value
,,0,kk0,9
,,1,kk1,8
`,
},
}
@ -2247,6 +2321,7 @@ from(bucket: v.bucket)
feature.PushDownWindowAggregateMean(): true,
feature.PushDownWindowAggregateMin(): true,
feature.PushDownWindowAggregateMax(): true,
feature.PushDownGroupAggregateMinMax(): true,
}))
l.SetupOrFail(t)

View File

@ -151,3 +151,9 @@
contact: Monitoring Team
lifetime: temporary
expose: true
- name: Push Down Group Aggregate Min Max
description: Enable the min and max variants of the PushDownGroupAggregate planner rule
key: pushDownGroupAggregateMinMax
default: false
contact: Query Team

View File

@ -282,6 +282,20 @@ func Notebooks() BoolFlag {
return notebooks
}
// pushDownGroupAggregateMinMax gates the min and max variants of the
// PushDownGroupAggregate planner rule. Defaults to false; registered as
// Temporary with the Query Team as contact.
// NOTE(review): this file looks generated from the feature-flag YAML (an
// entry with the same key/name appears in the flags list) — prefer editing
// the YAML and regenerating rather than hand-editing here; confirm.
var pushDownGroupAggregateMinMax = MakeBoolFlag(
	"Push Down Group Aggregate Min Max",
	"pushDownGroupAggregateMinMax",
	"Query Team",
	false,
	Temporary,
	false,
)

// PushDownGroupAggregateMinMax - Enable the min and max variants of the PushDownGroupAggregate planner rule
func PushDownGroupAggregateMinMax() BoolFlag {
	return pushDownGroupAggregateMinMax
}
var all = []Flag{
appMetrics,
backendExample,
@ -303,6 +317,7 @@ var all = []Flag{
useUserPermission,
mergeFiltersRule,
notebooks,
pushDownGroupAggregateMinMax,
}
var byKey = map[string]Flag{
@ -326,4 +341,5 @@ var byKey = map[string]Flag{
"useUserPermission": useUserPermission,
"mergeFiltersRule": mergeFiltersRule,
"notebooks": notebooks,
"pushDownGroupAggregateMinMax": pushDownGroupAggregateMinMax,
}

View File

@ -1023,6 +1023,8 @@ func (rule PushDownGroupAggregateRule) Pattern() plan.Pattern {
universe.SumKind,
universe.FirstKind,
universe.LastKind,
universe.MinKind,
universe.MaxKind,
},
plan.Pat(ReadGroupPhysKind))
}
@ -1075,6 +1077,28 @@ func (PushDownGroupAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (pl
AggregateMethod: universe.LastKind,
})
return node, true, nil
case universe.MinKind:
// ReadGroup() -> min => ReadGroup(min)
if feature.PushDownGroupAggregateMinMax().Enabled(ctx) {
node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
ReadRangePhysSpec: group.ReadRangePhysSpec,
GroupMode: group.GroupMode,
GroupKeys: group.GroupKeys,
AggregateMethod: universe.MinKind,
})
return node, true, nil
}
case universe.MaxKind:
// ReadGroup() -> max => ReadGroup(max)
if feature.PushDownGroupAggregateMinMax().Enabled(ctx) {
node := plan.CreatePhysicalNode("ReadGroupAggregate", &ReadGroupPhysSpec{
ReadRangePhysSpec: group.ReadRangePhysSpec,
GroupMode: group.GroupMode,
GroupKeys: group.GroupKeys,
AggregateMethod: universe.MaxKind,
})
return node, true, nil
}
}
return pn, false, nil
}
@ -1102,6 +1126,12 @@ func canPushGroupedAggregate(ctx context.Context, pn plan.Node) bool {
case universe.LastKind:
agg := pn.ProcedureSpec().(*universe.LastProcedureSpec)
return caps.HaveLast() && agg.Column == execute.DefaultValueColLabel
case universe.MaxKind:
agg := pn.ProcedureSpec().(*universe.MaxProcedureSpec)
return caps.HaveMax() && agg.Column == execute.DefaultValueColLabel
case universe.MinKind:
agg := pn.ProcedureSpec().(*universe.MinProcedureSpec)
return caps.HaveMin() && agg.Column == execute.DefaultValueColLabel
}
return false
}

View File

@ -2672,7 +2672,9 @@ func TestPushDownBareAggregateRule(t *testing.T) {
//
func TestPushDownGroupAggregateRule(t *testing.T) {
// Turn on all flags
ctx, _ := feature.Annotate(context.Background(), mock.NewFlagger(map[feature.Flag]interface{}{}))
ctx, _ := feature.Annotate(context.Background(), mock.NewFlagger(map[feature.Flag]interface{}{
feature.PushDownGroupAggregateMinMax(): true,
}))
caps := func(c query.GroupCapability) context.Context {
deps := influxdb.StorageDependencies{
@ -2726,6 +2728,20 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
},
}
}
minProcedureSpecVal := func() *universe.MinProcedureSpec {
return &universe.MinProcedureSpec{
SelectorConfig: execute.SelectorConfig{
Column: execute.DefaultValueColLabel,
},
}
}
maxProcedureSpecVal := func() *universe.MaxProcedureSpec {
return &universe.MaxProcedureSpec{
SelectorConfig: execute.SelectorConfig{
Column: execute.DefaultValueColLabel,
},
}
}
countProcedureSpec := func() *universe.CountProcedureSpec {
return &universe.CountProcedureSpec{
AggregateConfig: execute.DefaultAggregateConfig,
@ -2829,12 +2845,56 @@ func TestPushDownGroupAggregateRule(t *testing.T) {
// ReadGroup() -> last => ReadGroup() -> last
tests = append(tests, plantest.RuleTestCase{
Context: caps(mockGroupCapability{}),
Name: "RewriteGroupLast",
Name: "NoLastCapability",
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg("last", lastProcedureSpec()),
NoChange: true,
})
// ReadGroup() -> max => ReadGroup(max)
tests = append(tests, plantest.RuleTestCase{
Context: caps(mockGroupCapability{max: true}),
Name: "RewriteGroupMax",
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg("max", maxProcedureSpecVal()),
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroupAggregate", readGroupAgg("max")),
},
},
})
// ReadGroup() -> max => ReadGroup() -> max
tests = append(tests, plantest.RuleTestCase{
Context: caps(mockGroupCapability{}),
Name: "NoMaxCapability",
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg("max", maxProcedureSpecVal()),
NoChange: true,
})
// ReadGroup() -> min => ReadGroup(min)
tests = append(tests, plantest.RuleTestCase{
Context: caps(mockGroupCapability{min: true}),
Name: "RewriteGroupMin",
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg("min", minProcedureSpecVal()),
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("ReadGroupAggregate", readGroupAgg("min")),
},
},
})
// ReadGroup() -> min => ReadGroup() -> min
tests = append(tests, plantest.RuleTestCase{
Context: caps(mockGroupCapability{}),
Name: "NoMinCapability",
Rules: []plan.Rule{influxdb.PushDownGroupAggregateRule{}},
Before: simplePlanWithAgg("min", minProcedureSpecVal()),
NoChange: true,
})
// Rewrite with successors
// ReadGroup() -> count -> sum {2} => ReadGroup(count) -> sum {2}
tests = append(tests, plantest.RuleTestCase{

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/errors"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/query"
@ -273,6 +274,12 @@ func (gi *groupIterator) Do(f func(flux.Table) error) error {
req.Range.Start = int64(gi.spec.Bounds.Start)
req.Range.End = int64(gi.spec.Bounds.Stop)
if len(gi.spec.GroupKeys) > 0 && gi.spec.GroupMode == query.GroupModeNone {
return &influxdb.Error{
Code: influxdb.EInternal,
Msg: "cannot have group mode none with group key values",
}
}
req.Group = convertGroupMode(gi.spec.GroupMode)
req.GroupKeys = gi.spec.GroupKeys

View File

@ -7,6 +7,7 @@
package storageflux
import (
"fmt"
"math"
"sync"
@ -746,49 +747,29 @@ func (t *floatGroupTable) advance() bool {
return true
}
// handle the group with aggregate case
var value float64
// For group count, sum, min, and max, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
// For group first, we need to assign the initial value to math.MaxInt64 so
// we can find the row with the smallest timestamp.
// Do not worry about data with math.MaxInt64 as its real timestamp.
// In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp
// cannot make it through.
var timestamp int64 = math.MaxInt64
if t.gc.Aggregate().Type == datatypes.AggregateTypeLast {
timestamp = math.MinInt64
aggregate, err := determineFloatAggregateMethod(t.gc.Aggregate().Type)
if err != nil {
t.err = err
return false
}
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps, values := []int64{ts}, []float64{v}
for {
// note that for the group aggregate case, len here should always be 1
for i := 0; i < len; i++ {
switch t.gc.Aggregate().Type {
case datatypes.AggregateTypeCount:
panic("unsupported for aggregate count: Float")
case datatypes.AggregateTypeSum:
value += arr.Values[i]
case datatypes.AggregateTypeFirst:
if arr.Timestamps[i] < timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
case datatypes.AggregateTypeLast:
if arr.Timestamps[i] > timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
}
}
arr = t.cur.Next()
len = arr.Len()
if len > 0 {
if arr.Len() > 0 {
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps = append(timestamps, ts)
values = append(values, v)
continue
}
if !t.advanceCursor() {
break
}
}
timestamp, value := aggregate(timestamps, values)
colReader := t.allocateBuffer(1)
if IsSelector(t.gc.Aggregate()) {
colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
@ -801,6 +782,113 @@ func (t *floatGroupTable) advance() bool {
return true
}
type floatAggregateMethod func([]int64, []float64) (int64, float64)

// determineFloatAggregateMethod maps a storage aggregate type to the
// helper that merges per-series float results within a group. Count has
// no float implementation and yields an EInvalid error, as does any
// unrecognized aggregate type.
func determineFloatAggregateMethod(agg datatypes.Aggregate_AggregateType) (floatAggregateMethod, error) {
	invalid := func(msg string) (floatAggregateMethod, error) {
		return nil, &influxdb.Error{
			Code: influxdb.EInvalid,
			Msg:  msg,
		}
	}
	switch agg {
	case datatypes.AggregateTypeCount:
		return invalid("unsupported for aggregate count: Float")
	case datatypes.AggregateTypeSum:
		return aggregateSumGroupsFloat, nil
	case datatypes.AggregateTypeMin:
		return aggregateMinGroupsFloat, nil
	case datatypes.AggregateTypeMax:
		return aggregateMaxGroupsFloat, nil
	case datatypes.AggregateTypeFirst:
		return aggregateFirstGroupsFloat, nil
	case datatypes.AggregateTypeLast:
		return aggregateLastGroupsFloat, nil
	}
	return invalid(fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg))
}
func aggregateMinGroupsFloat(timestamps []int64, values []float64) (int64, float64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if value > values[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
func aggregateMaxGroupsFloat(timestamps []int64, values []float64) (int64, float64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if value < values[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
// For group count and sum, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
func aggregateSumGroupsFloat(_ []int64, values []float64) (int64, float64) {
var sum float64
for _, v := range values {
sum += v
}
return math.MaxInt64, sum
}
func aggregateFirstGroupsFloat(timestamps []int64, values []float64) (int64, float64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if timestamp > timestamps[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
func aggregateLastGroupsFloat(timestamps []int64, values []float64) (int64, float64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if timestamp < timestamps[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
func (t *floatGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
@ -1567,49 +1655,29 @@ func (t *integerGroupTable) advance() bool {
return true
}
// handle the group with aggregate case
var value int64
// For group count, sum, min, and max, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
// For group first, we need to assign the initial value to math.MaxInt64 so
// we can find the row with the smallest timestamp.
// Do not worry about data with math.MaxInt64 as its real timestamp.
// In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp
// cannot make it through.
var timestamp int64 = math.MaxInt64
if t.gc.Aggregate().Type == datatypes.AggregateTypeLast {
timestamp = math.MinInt64
aggregate, err := determineIntegerAggregateMethod(t.gc.Aggregate().Type)
if err != nil {
t.err = err
return false
}
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps, values := []int64{ts}, []int64{v}
for {
// note that for the group aggregate case, len here should always be 1
for i := 0; i < len; i++ {
switch t.gc.Aggregate().Type {
case datatypes.AggregateTypeCount:
fallthrough
case datatypes.AggregateTypeSum:
value += arr.Values[i]
case datatypes.AggregateTypeFirst:
if arr.Timestamps[i] < timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
case datatypes.AggregateTypeLast:
if arr.Timestamps[i] > timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
}
}
arr = t.cur.Next()
len = arr.Len()
if len > 0 {
if arr.Len() > 0 {
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps = append(timestamps, ts)
values = append(values, v)
continue
}
if !t.advanceCursor() {
break
}
}
timestamp, value := aggregate(timestamps, values)
colReader := t.allocateBuffer(1)
if IsSelector(t.gc.Aggregate()) {
colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
@ -1622,6 +1690,114 @@ func (t *integerGroupTable) advance() bool {
return true
}
type integerAggregateMethod func([]int64, []int64) (int64, int64)

// determineIntegerAggregateMethod maps a storage aggregate type to the
// helper that merges per-series integer results within a group. All
// listed aggregates are supported for integers; anything else yields an
// EInvalid error.
func determineIntegerAggregateMethod(agg datatypes.Aggregate_AggregateType) (integerAggregateMethod, error) {
	switch agg {
	case datatypes.AggregateTypeCount:
		return aggregateCountGroupsInteger, nil
	case datatypes.AggregateTypeSum:
		return aggregateSumGroupsInteger, nil
	case datatypes.AggregateTypeMin:
		return aggregateMinGroupsInteger, nil
	case datatypes.AggregateTypeMax:
		return aggregateMaxGroupsInteger, nil
	case datatypes.AggregateTypeFirst:
		return aggregateFirstGroupsInteger, nil
	case datatypes.AggregateTypeLast:
		return aggregateLastGroupsInteger, nil
	}
	return nil, &influxdb.Error{
		Code: influxdb.EInvalid,
		Msg:  fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg),
	}
}
func aggregateMinGroupsInteger(timestamps []int64, values []int64) (int64, int64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if value > values[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
func aggregateMaxGroupsInteger(timestamps []int64, values []int64) (int64, int64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if value < values[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
// For group count and sum, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
func aggregateCountGroupsInteger(timestamps []int64, values []int64) (int64, int64) {
return aggregateSumGroupsInteger(timestamps, values)
}
func aggregateSumGroupsInteger(_ []int64, values []int64) (int64, int64) {
var sum int64
for _, v := range values {
sum += v
}
return math.MaxInt64, sum
}
func aggregateFirstGroupsInteger(timestamps []int64, values []int64) (int64, int64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if timestamp > timestamps[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
func aggregateLastGroupsInteger(timestamps []int64, values []int64) (int64, int64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if timestamp < timestamps[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
func (t *integerGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
@ -2386,49 +2562,29 @@ func (t *unsignedGroupTable) advance() bool {
return true
}
// handle the group with aggregate case
var value uint64
// For group count, sum, min, and max, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
// For group first, we need to assign the initial value to math.MaxInt64 so
// we can find the row with the smallest timestamp.
// Do not worry about data with math.MaxInt64 as its real timestamp.
// In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp
// cannot make it through.
var timestamp int64 = math.MaxInt64
if t.gc.Aggregate().Type == datatypes.AggregateTypeLast {
timestamp = math.MinInt64
aggregate, err := determineUnsignedAggregateMethod(t.gc.Aggregate().Type)
if err != nil {
t.err = err
return false
}
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps, values := []int64{ts}, []uint64{v}
for {
// note that for the group aggregate case, len here should always be 1
for i := 0; i < len; i++ {
switch t.gc.Aggregate().Type {
case datatypes.AggregateTypeCount:
panic("unsupported for aggregate count: Unsigned")
case datatypes.AggregateTypeSum:
value += arr.Values[i]
case datatypes.AggregateTypeFirst:
if arr.Timestamps[i] < timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
case datatypes.AggregateTypeLast:
if arr.Timestamps[i] > timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
}
}
arr = t.cur.Next()
len = arr.Len()
if len > 0 {
if arr.Len() > 0 {
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps = append(timestamps, ts)
values = append(values, v)
continue
}
if !t.advanceCursor() {
break
}
}
timestamp, value := aggregate(timestamps, values)
colReader := t.allocateBuffer(1)
if IsSelector(t.gc.Aggregate()) {
colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
@ -2441,6 +2597,113 @@ func (t *unsignedGroupTable) advance() bool {
return true
}
type unsignedAggregateMethod func([]int64, []uint64) (int64, uint64)

// determineUnsignedAggregateMethod maps a storage aggregate type to the
// helper that merges per-series unsigned results within a group. Count
// has no unsigned implementation and yields an EInvalid error, as does
// any unrecognized aggregate type.
func determineUnsignedAggregateMethod(agg datatypes.Aggregate_AggregateType) (unsignedAggregateMethod, error) {
	invalid := func(msg string) (unsignedAggregateMethod, error) {
		return nil, &influxdb.Error{
			Code: influxdb.EInvalid,
			Msg:  msg,
		}
	}
	switch agg {
	case datatypes.AggregateTypeCount:
		return invalid("unsupported for aggregate count: Unsigned")
	case datatypes.AggregateTypeSum:
		return aggregateSumGroupsUnsigned, nil
	case datatypes.AggregateTypeMin:
		return aggregateMinGroupsUnsigned, nil
	case datatypes.AggregateTypeMax:
		return aggregateMaxGroupsUnsigned, nil
	case datatypes.AggregateTypeFirst:
		return aggregateFirstGroupsUnsigned, nil
	case datatypes.AggregateTypeLast:
		return aggregateLastGroupsUnsigned, nil
	}
	return invalid(fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg))
}
func aggregateMinGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if value > values[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
func aggregateMaxGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if value < values[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
// For group count and sum, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
func aggregateSumGroupsUnsigned(_ []int64, values []uint64) (int64, uint64) {
var sum uint64
for _, v := range values {
sum += v
}
return math.MaxInt64, sum
}
func aggregateFirstGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if timestamp > timestamps[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
func aggregateLastGroupsUnsigned(timestamps []int64, values []uint64) (int64, uint64) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if timestamp < timestamps[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
func (t *unsignedGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
@ -3205,49 +3468,29 @@ func (t *stringGroupTable) advance() bool {
return true
}
// handle the group with aggregate case
var value string
// For group count, sum, min, and max, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
// For group first, we need to assign the initial value to math.MaxInt64 so
// we can find the row with the smallest timestamp.
// Do not worry about data with math.MaxInt64 as its real timestamp.
// In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp
// cannot make it through.
var timestamp int64 = math.MaxInt64
if t.gc.Aggregate().Type == datatypes.AggregateTypeLast {
timestamp = math.MinInt64
aggregate, err := determineStringAggregateMethod(t.gc.Aggregate().Type)
if err != nil {
t.err = err
return false
}
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps, values := []int64{ts}, []string{v}
for {
// note that for the group aggregate case, len here should always be 1
for i := 0; i < len; i++ {
switch t.gc.Aggregate().Type {
case datatypes.AggregateTypeCount:
panic("unsupported for aggregate count: String")
case datatypes.AggregateTypeSum:
panic("unsupported for aggregate sum: String")
case datatypes.AggregateTypeFirst:
if arr.Timestamps[i] < timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
case datatypes.AggregateTypeLast:
if arr.Timestamps[i] > timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
}
}
arr = t.cur.Next()
len = arr.Len()
if len > 0 {
if arr.Len() > 0 {
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps = append(timestamps, ts)
values = append(values, v)
continue
}
if !t.advanceCursor() {
break
}
}
timestamp, value := aggregate(timestamps, values)
colReader := t.allocateBuffer(1)
if IsSelector(t.gc.Aggregate()) {
colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
@ -3260,6 +3503,86 @@ func (t *stringGroupTable) advance() bool {
return true
}
type stringAggregateMethod func([]int64, []string) (int64, string)

// determineStringAggregateMethod maps a storage aggregate type to the
// helper that merges per-series string results within a group. Only
// first and last are implemented for strings; count, sum, min, and max
// yield an EInvalid error, as does any unrecognized aggregate type.
func determineStringAggregateMethod(agg datatypes.Aggregate_AggregateType) (stringAggregateMethod, error) {
	invalid := func(msg string) (stringAggregateMethod, error) {
		return nil, &influxdb.Error{
			Code: influxdb.EInvalid,
			Msg:  msg,
		}
	}
	switch agg {
	case datatypes.AggregateTypeFirst:
		return aggregateFirstGroupsString, nil
	case datatypes.AggregateTypeLast:
		return aggregateLastGroupsString, nil
	case datatypes.AggregateTypeCount:
		return invalid("unsupported for aggregate count: String")
	case datatypes.AggregateTypeSum:
		return invalid("unsupported for aggregate sum: String")
	case datatypes.AggregateTypeMin:
		return invalid("unsupported for aggregate min: String")
	case datatypes.AggregateTypeMax:
		return invalid("unsupported for aggregate max: String")
	}
	return invalid(fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg))
}
// aggregateFirstGroupsString selects the entry with the smallest
// timestamp; on equal timestamps the earliest-seen entry wins.
func aggregateFirstGroupsString(timestamps []int64, values []string) (int64, string) {
	best := 0
	for i := 1; i < len(values); i++ {
		if timestamps[i] < timestamps[best] {
			best = i
		}
	}
	return timestamps[best], values[best]
}

// aggregateLastGroupsString selects the entry with the largest
// timestamp; on equal timestamps the earliest-seen entry wins.
func aggregateLastGroupsString(timestamps []int64, values []string) (int64, string) {
	best := 0
	for i := 1; i < len(values); i++ {
		if timestamps[i] > timestamps[best] {
			best = i
		}
	}
	return timestamps[best], values[best]
}
func (t *stringGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
@ -4024,49 +4347,29 @@ func (t *booleanGroupTable) advance() bool {
return true
}
// handle the group with aggregate case
var value bool
// For group count, sum, min, and max, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
// For group first, we need to assign the initial value to math.MaxInt64 so
// we can find the row with the smallest timestamp.
// Do not worry about data with math.MaxInt64 as its real timestamp.
// In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp
// cannot make it through.
var timestamp int64 = math.MaxInt64
if t.gc.Aggregate().Type == datatypes.AggregateTypeLast {
timestamp = math.MinInt64
aggregate, err := determineBooleanAggregateMethod(t.gc.Aggregate().Type)
if err != nil {
t.err = err
return false
}
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps, values := []int64{ts}, []bool{v}
for {
// note that for the group aggregate case, len here should always be 1
for i := 0; i < len; i++ {
switch t.gc.Aggregate().Type {
case datatypes.AggregateTypeCount:
panic("unsupported for aggregate count: Boolean")
case datatypes.AggregateTypeSum:
panic("unsupported for aggregate sum: Boolean")
case datatypes.AggregateTypeFirst:
if arr.Timestamps[i] < timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
case datatypes.AggregateTypeLast:
if arr.Timestamps[i] > timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
}
}
arr = t.cur.Next()
len = arr.Len()
if len > 0 {
if arr.Len() > 0 {
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps = append(timestamps, ts)
values = append(values, v)
continue
}
if !t.advanceCursor() {
break
}
}
timestamp, value := aggregate(timestamps, values)
colReader := t.allocateBuffer(1)
if IsSelector(t.gc.Aggregate()) {
colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
@ -4079,6 +4382,86 @@ func (t *booleanGroupTable) advance() bool {
return true
}
type booleanAggregateMethod func([]int64, []bool) (int64, bool)

// determineBooleanAggregateMethod maps a storage aggregate type to the
// helper that merges per-series boolean results within a group. Only
// first and last are implemented for booleans; count, sum, min, and max
// yield an EInvalid error, as does any unrecognized aggregate type.
func determineBooleanAggregateMethod(agg datatypes.Aggregate_AggregateType) (booleanAggregateMethod, error) {
	invalid := func(msg string) (booleanAggregateMethod, error) {
		return nil, &influxdb.Error{
			Code: influxdb.EInvalid,
			Msg:  msg,
		}
	}
	switch agg {
	case datatypes.AggregateTypeFirst:
		return aggregateFirstGroupsBoolean, nil
	case datatypes.AggregateTypeLast:
		return aggregateLastGroupsBoolean, nil
	case datatypes.AggregateTypeCount:
		return invalid("unsupported for aggregate count: Boolean")
	case datatypes.AggregateTypeSum:
		return invalid("unsupported for aggregate sum: Boolean")
	case datatypes.AggregateTypeMin:
		return invalid("unsupported for aggregate min: Boolean")
	case datatypes.AggregateTypeMax:
		return invalid("unsupported for aggregate max: Boolean")
	}
	return invalid(fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg))
}
// aggregateFirstGroupsBoolean selects the entry with the smallest
// timestamp; on equal timestamps the earliest-seen entry wins.
func aggregateFirstGroupsBoolean(timestamps []int64, values []bool) (int64, bool) {
	best := 0
	for i := 1; i < len(values); i++ {
		if timestamps[i] < timestamps[best] {
			best = i
		}
	}
	return timestamps[best], values[best]
}

// aggregateLastGroupsBoolean selects the entry with the largest
// timestamp; on equal timestamps the earliest-seen entry wins.
func aggregateLastGroupsBoolean(timestamps []int64, values []bool) (int64, bool) {
	best := 0
	for i := 1; i < len(values); i++ {
		if timestamps[i] > timestamps[best] {
			best = i
		}
	}
	return timestamps[best], values[best]
}
func (t *booleanGroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil

View File

@ -1,6 +1,7 @@
package storageflux
import (
"fmt"
"math"
"sync"
@ -742,49 +743,29 @@ func (t *{{.name}}GroupTable) advance() bool {
return true
}
// handle the group with aggregate case
var value {{.Type}}
// For group count, sum, min, and max, the timestamp here is always math.MaxInt64.
// their final result does not contain _time, so this timestamp value can be anything
// and it won't matter.
// For group first, we need to assign the initial value to math.MaxInt64 so
// we can find the row with the smallest timestamp.
// Do not worry about data with math.MaxInt64 as its real timestamp.
// In OSS we require a |> range() call in the query and a math.MaxInt64 timestamp
// cannot make it through.
var timestamp int64 = math.MaxInt64
if t.gc.Aggregate().Type == datatypes.AggregateTypeLast {
timestamp = math.MinInt64
aggregate, err := determine{{.Name}}AggregateMethod(t.gc.Aggregate().Type)
if err != nil {
t.err = err
return false
}
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps, values := []int64{ts}, []{{.Type}}{v}
for {
// note that for the group aggregate case, len here should always be 1
for i := 0; i < len; i++ {
switch t.gc.Aggregate().Type {
case datatypes.AggregateTypeCount:
{{if eq .Name "Integer"}}fallthrough{{else}}panic("unsupported for aggregate count: {{.Name}}"){{end}}
case datatypes.AggregateTypeSum:
{{if or (eq .Name "String") (eq .Name "Boolean")}}panic("unsupported for aggregate sum: {{.Name}}"){{else}}value += arr.Values[i]{{end}}
case datatypes.AggregateTypeFirst:
if arr.Timestamps[i] < timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
case datatypes.AggregateTypeLast:
if arr.Timestamps[i] > timestamp {
timestamp = arr.Timestamps[i]
value = arr.Values[i]
}
}
}
arr = t.cur.Next()
len = arr.Len()
if len > 0 {
if arr.Len() > 0 {
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps = append(timestamps, ts)
values = append(values, v)
continue
}
if !t.advanceCursor() {
break
}
}
timestamp, value := aggregate(timestamps, values)
colReader := t.allocateBuffer(1)
if IsSelector(t.gc.Aggregate()) {
colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
@ -797,6 +778,141 @@ func (t *{{.name}}GroupTable) advance() bool {
return true
}
// {{.name}}AggregateMethod combines the per-series aggregate results of one
// group into a single (timestamp, value) pair.
type {{.name}}AggregateMethod func([]int64, []{{.Type}}) (int64, {{.Type}})
// determine{{.Name}}AggregateMethod returns the method for aggregating
// returned points within the same group. The incoming points are the
// ones returned for each series and the method returned here will
// aggregate the aggregates.
//
// Aggregates that are meaningless for a given element type (e.g. sum of
// strings) are rejected at runtime with an EInvalid error rather than at
// template-expansion time, so the generated switch stays uniform.
func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({{.name}}AggregateMethod, error){
switch agg {
case datatypes.AggregateTypeFirst:
return aggregateFirstGroups{{.Name}}, nil
case datatypes.AggregateTypeLast:
return aggregateLastGroups{{.Name}}, nil
case datatypes.AggregateTypeCount:
// Count is only generated for Integer, since storage returns counts
// as int64 columns.
{{if eq .Name "Integer"}}
return aggregateCountGroups{{.Name}}, nil
{{else}}
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: "unsupported for aggregate count: {{.Name}}",
}
{{end}}
case datatypes.AggregateTypeSum:
// Sum, min, and max require an ordered, addable/comparable numeric
// type, so Boolean and String are rejected below.
{{if and (ne .Name "Boolean") (ne .Name "String")}}
return aggregateSumGroups{{.Name}}, nil
{{else}}
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: "unsupported for aggregate sum: {{.Name}}",
}
{{end}}
case datatypes.AggregateTypeMin:
{{if and (ne .Name "Boolean") (ne .Name "String")}}
return aggregateMinGroups{{.Name}}, nil
{{else}}
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: "unsupported for aggregate min: {{.Name}}",
}
{{end}}
case datatypes.AggregateTypeMax:
{{if and (ne .Name "Boolean") (ne .Name "String")}}
return aggregateMaxGroups{{.Name}}, nil
{{else}}
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: "unsupported for aggregate max: {{.Name}}",
}
{{end}}
default:
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg),
}
}
}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
// aggregateMinGroups{{.Name}} selects the row with the smallest value among
// the per-series min() results; ties keep the earliest-seen row. The parallel
// slices must be non-empty.
func aggregateMinGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
	minTs, minV := timestamps[0], values[0]
	for i, v := range values[1:] {
		if v < minV {
			minTs, minV = timestamps[i+1], v
		}
	}
	return minTs, minV
}
{{end}}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
// aggregateMaxGroups{{.Name}} selects the row with the largest value among
// the per-series max() results; ties keep the earliest-seen row. The parallel
// slices must be non-empty.
func aggregateMaxGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
	maxTs, maxV := timestamps[0], values[0]
	for i, v := range values[1:] {
		if v > maxV {
			maxTs, maxV = timestamps[i+1], v
		}
	}
	return maxTs, maxV
}
{{end}}
{{if eq .Name "Integer"}}
// aggregateCountGroups{{.Name}} merges the per-series counts into the group
// count by summing them.
//
// For group count and sum, the timestamp here is always math.MaxInt64.
// Their final result does not contain _time, so this timestamp value can be
// anything and it won't matter.
//
// NOTE: this comment block was previously emitted outside the template guard,
// leaving an orphaned count/sum comment above an unrelated function in the
// generated files for non-Integer types; keeping it inside the guard ties it
// to the function it documents.
func aggregateCountGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
	return aggregateSumGroups{{.Name}}(timestamps, values)
}
{{end}}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
// aggregateSumGroups{{.Name}} adds the per-series sums together. The group
// sum result carries no _time column, so math.MaxInt64 is returned as a
// placeholder timestamp; its value never reaches the output.
func aggregateSumGroups{{.Name}}(_ []int64, values []{{.Type}}) (int64, {{.Type}}) {
	var total {{.Type}}
	for _, v := range values {
		total += v
	}
	return math.MaxInt64, total
}
{{end}}
// aggregateFirstGroups{{.Name}} keeps the value carrying the smallest
// timestamp among the per-series first() results; ties keep the
// earliest-seen entry. The parallel slices must be non-empty.
func aggregateFirstGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
	earliest, chosen := timestamps[0], values[0]
	for i, v := range values[1:] {
		if ts := timestamps[i+1]; ts < earliest {
			earliest, chosen = ts, v
		}
	}
	return earliest, chosen
}
// aggregateLastGroups{{.Name}} keeps the value carrying the largest
// timestamp among the per-series last() results; ties keep the
// earliest-seen entry. The parallel slices must be non-empty.
func aggregateLastGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
	latest, chosen := timestamps[0], values[0]
	for i, v := range values[1:] {
		if ts := timestamps[i+1]; ts > latest {
			latest, chosen = ts, v
		}
	}
	return latest, chosen
}
func (t *{{.name}}GroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil

View File

@ -71,7 +71,7 @@ func (t *table) isCancelled() bool {
}
func (t *table) init(advance func() bool) {
t.empty = !advance()
t.empty = !advance() && t.err == nil
}
func (t *table) do(f func(flux.ColReader) error, advance func() bool) error {
@ -82,6 +82,12 @@ func (t *table) do(f func(flux.ColReader) error, advance func() bool) error {
}
defer t.closeDone()
// If an error occurred during initialization, that is
// returned here.
if t.err != nil {
return t.err
}
if !t.Empty() {
t.err = f(t.colBufs)
t.colBufs.Release()

View File

@ -2564,6 +2564,115 @@ func TestStorageReader_EmptyTableNoEmptyWindows(t *testing.T) {
}
}
// TestStorageReader_ReadGroup exercises the ReadGroup pushdown for the
// count, sum, min, and max aggregates over a single measurement with three
// tag values, verifying the aggregated table output for each aggregate kind.
func TestStorageReader_ReadGroup(t *testing.T) {
// Generate 4 repeating float values per series at 10s intervals over a
// 2-minute window (12 points per series), one series per t0 tag value.
reader := NewStorageReader(t, func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
spec := Spec(org, bucket,
MeasurementSpec("m0",
FloatArrayValuesSequence("f0", 10*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),
TagValuesSequence("t0", "a-%s", 0, 3),
),
)
tr := TimeRange("2019-11-25T00:00:00Z", "2019-11-25T00:02:00Z")
return gen.NewSeriesGeneratorFromSpec(spec, tr), tr
})
defer reader.Close()
for _, tt := range []struct {
aggregate string
want flux.TableIterator
}{
{
// count: 12 points per series -> integer 12 per group; no _time column.
aggregate: storageflux.CountKind,
want: static.TableGroup{
static.StringKey("_measurement", "m0"),
static.StringKey("_field", "f0"),
static.TimeKey("_start", "2019-11-25T00:00:00Z"),
static.TimeKey("_stop", "2019-11-25T00:02:00Z"),
static.TableMatrix{
static.StringKeys("t0", "a-0", "a-1", "a-2"),
{
static.Table{
static.Ints("_value", 12),
},
},
},
},
},
{
// sum: 3 cycles of (1+2+3+4) = 30 per group; no _time column.
aggregate: storageflux.SumKind,
want: static.TableGroup{
static.StringKey("_measurement", "m0"),
static.StringKey("_field", "f0"),
static.TimeKey("_start", "2019-11-25T00:00:00Z"),
static.TimeKey("_stop", "2019-11-25T00:02:00Z"),
static.TableMatrix{
static.StringKeys("t0", "a-0", "a-1", "a-2"),
{
static.Table{
static.Floats("_value", 30),
},
},
},
},
},
{
// min is a selector: expects the row (time and value) of the
// first occurrence of the minimum value 1.0.
aggregate: storageflux.MinKind,
want: static.TableGroup{
static.StringKey("_measurement", "m0"),
static.StringKey("_field", "f0"),
static.TimeKey("_start", "2019-11-25T00:00:00Z"),
static.TimeKey("_stop", "2019-11-25T00:02:00Z"),
static.TableMatrix{
static.StringKeys("t0", "a-0", "a-1", "a-2"),
{
static.Table{
static.Times("_time", "2019-11-25T00:00:00Z"),
static.Floats("_value", 1),
},
},
},
},
},
{
// max is a selector: expects the row of the first occurrence of
// the maximum value 4.0 (30s into the window).
aggregate: storageflux.MaxKind,
want: static.TableGroup{
static.StringKey("_measurement", "m0"),
static.StringKey("_field", "f0"),
static.TimeKey("_start", "2019-11-25T00:00:00Z"),
static.TimeKey("_stop", "2019-11-25T00:02:00Z"),
static.TableMatrix{
static.StringKeys("t0", "a-0", "a-1", "a-2"),
{
static.Table{
static.Times("_time", "2019-11-25T00:00:30Z"),
static.Floats("_value", 4),
},
},
},
},
},
} {
mem := &memory.Allocator{}
// Group by measurement, field, and t0 so each series forms its own group.
got, err := reader.ReadGroup(context.Background(), query.ReadGroupSpec{
ReadFilterSpec: query.ReadFilterSpec{
OrganizationID: reader.Org,
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
GroupMode: query.GroupModeBy,
GroupKeys: []string{"_measurement", "_field", "t0"},
AggregateMethod: tt.aggregate,
}, mem)
if err != nil {
t.Fatal(err)
}
if diff := table.Diff(tt.want, got); diff != "" {
t.Errorf("unexpected results -want/+got:\n%s", diff)
}
}
}
func BenchmarkReadFilter(b *testing.B) {
setupFn := func(org, bucket influxdb.ID) (gen.SeriesGenerator, gen.TimeRange) {
tagsSpec := &gen.TagsSpec{