Merge pull request #19807 from influxdata/feat/enable-mean-pushdown

feat(storage): enable window agg mean pushdown
pull/19839/head
Faith Chikwekwe 2020-10-27 13:24:37 -07:00 committed by GitHub
commit bd38374147
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 1995 additions and 144 deletions

View File

@ -953,13 +953,13 @@ error2","query plan",109,110
}
func TestQueryPushDowns(t *testing.T) {
t.Skip("Not supported yet")
testcases := []struct {
name string
data []string
query string
op string
want string
skip string
}{
{
name: "range last single point start time",
@ -979,6 +979,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,tag
,,0,1970-01-01T00:00:00.000000001Z,1970-01-01T01:00:00Z,1970-01-01T00:00:00.000000001Z,1,f,m,a
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window last",
@ -1018,6 +1019,7 @@ from(bucket: v.bucket)
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:14Z,9,f,m0,k0
,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window offset last",
@ -1056,6 +1058,7 @@ from(bucket: v.bucket)
,,2,1970-01-01T00:00:11Z,1970-01-01T00:00:14Z,1970-01-01T00:00:13Z,8,f,m0,k0
,,3,1970-01-01T00:00:14Z,1970-01-01T00:00:17Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare last",
@ -1090,6 +1093,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,k
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty last",
@ -1135,6 +1139,7 @@ from(bucket: v.bucket)
#default,_result,2,1970-01-01T01:00:00Z,1970-01-01T02:00:00Z,,,f,m0,k0
,result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty offset last",
@ -1180,6 +1185,7 @@ from(bucket: v.bucket)
#default,_result,2,1970-01-01T01:00:00Z,1970-01-01T02:00:00Z,,,f,m0,k0
,result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window aggregate last",
@ -1215,6 +1221,7 @@ from(bucket: v.bucket)
,,0,1969-12-31T23:59:59Z,1970-01-01T00:00:33Z,1970-01-01T00:00:10Z,6,f,m0,k0
,,0,1969-12-31T23:59:59Z,1970-01-01T00:00:33Z,1970-01-01T00:00:20Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window first",
@ -1254,6 +1261,7 @@ from(bucket: v.bucket)
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:12Z,5,f,m0,k0
,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window first string",
@ -1279,6 +1287,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:05Z,1970-01-01T00:00:02Z,c,f,m,a
,,1,1970-01-01T00:00:05Z,1970-01-01T00:00:10Z,1970-01-01T00:00:07Z,h,f,m,a
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare first",
@ -1313,6 +1322,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,k
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:05Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty first",
@ -1364,6 +1374,7 @@ from(bucket: v.bucket)
#default,_result,3,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,,,f,m0,k0
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window aggregate first",
@ -1399,6 +1410,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:02Z,1970-01-01T00:00:00.5Z,0,f,m0,k0
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:02Z,1970-01-01T00:00:01.5Z,1,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window min",
@ -1438,6 +1450,7 @@ from(bucket: v.bucket)
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:12Z,5,f,m0,k0
,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare min",
@ -1472,6 +1485,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,k
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:08Z,0,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty min",
@ -1513,6 +1527,7 @@ from(bucket: v.bucket)
#default,_result,3,1970-01-01T00:00:09Z,1970-01-01T00:00:12Z,,,f,m0,k0
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window aggregate min",
@ -1538,6 +1553,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:03Z,0,f,m0,k0
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:09Z,0,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window max",
@ -1577,6 +1593,7 @@ from(bucket: v.bucket)
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,1970-01-01T00:00:14Z,9,f,m0,k0
,,4,1970-01-01T00:00:15Z,1970-01-01T00:00:18Z,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare max",
@ -1611,6 +1628,7 @@ from(bucket: v.bucket)
,result,table,_start,_stop,_time,_value,_field,_measurement,k
,,0,1970-01-01T00:00:05Z,1970-01-01T00:00:20Z,1970-01-01T00:00:14Z,9,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window empty max",
@ -1652,6 +1670,7 @@ from(bucket: v.bucket)
#default,_result,3,1970-01-01T00:00:09Z,1970-01-01T00:00:12Z,,,f,m0,k0
,_result,table,_start,_stop,_time,_value,_field,_measurement,k
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window aggregate max",
@ -1677,6 +1696,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:03Z,2,f,m0,k0
,,0,1970-01-01T00:00:00Z,1970-01-01T00:00:12Z,1970-01-01T00:00:09Z,6,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window count removes empty series",
@ -1700,6 +1720,7 @@ from(bucket: v.bucket)
,_result,0,1970-01-01T00:00:01Z,1970-01-01T00:00:01.5Z,0,f,m,a
,_result,1,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,1,f,m,a
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "count",
@ -1737,6 +1758,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:10Z,5,f,m0,k0
,,0,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window offset count",
@ -1775,6 +1797,7 @@ from(bucket: v.bucket)
,,2,1970-01-01T00:00:07Z,1970-01-01T00:00:12Z,5,f,m0,k0
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,3,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "count with nulls",
@ -1807,6 +1830,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:10Z,0,f,m0,k0
,,0,1970-01-01T00:00:15Z,5,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare count",
@ -1842,6 +1866,7 @@ from(bucket: v.bucket)
,result,table,_value,_field,_measurement,k
,,0,15,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window sum removes empty series",
@ -1866,6 +1891,7 @@ from(bucket: v.bucket)
,_result,0,1970-01-01T00:00:01Z,1970-01-01T00:00:01.5Z,,f,m,a
,_result,1,1970-01-01T00:00:01.5Z,1970-01-01T00:00:02Z,3,f,m,a
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "sum",
@ -1903,6 +1929,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:10Z,22,f,m0,k0
,,0,1970-01-01T00:00:15Z,35,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "window offset sum",
@ -1941,6 +1968,7 @@ from(bucket: v.bucket)
,,2,1970-01-01T00:00:07Z,1970-01-01T00:00:12Z,24,f,m0,k0
,,3,1970-01-01T00:00:12Z,1970-01-01T00:00:15Z,22,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "sum with nulls",
@ -1973,6 +2001,7 @@ from(bucket: v.bucket)
,,0,1970-01-01T00:00:10Z,,f,m0,k0
,,0,1970-01-01T00:00:15Z,35,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare sum",
@ -2008,6 +2037,7 @@ from(bucket: v.bucket)
,result,table,_value,_field,_measurement,k
,,0,67,f,m0,k0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "bare mean",
@ -2183,6 +2213,7 @@ from(bucket: v.bucket)
,result,table,_time,_value
,,0,1970-01-01T00:00:00.00Z,0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "group none first",
@ -2219,6 +2250,7 @@ from(bucket: v.bucket)
,result,table,_time,_value
,,0,1970-01-01T00:00:00.00Z,0
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "group last",
@ -2255,6 +2287,7 @@ from(bucket: v.bucket)
,result,table,_time,_value
,,0,1970-01-01T00:00:15.00Z,5
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "group none last",
@ -2291,6 +2324,7 @@ from(bucket: v.bucket)
,result,table,_time,_value
,,0,1970-01-01T00:00:15.00Z,5
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "count group none",
@ -2327,6 +2361,7 @@ from(bucket: v.bucket)
,result,table,_value
,,0,15
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "count group",
@ -2364,6 +2399,7 @@ from(bucket: v.bucket)
,,0,kk0,8
,,1,kk1,7
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "sum group none",
@ -2400,6 +2436,7 @@ from(bucket: v.bucket)
,result,table,_value
,,0,67
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "sum group",
@ -2437,6 +2474,7 @@ from(bucket: v.bucket)
,,0,kk0,32
,,1,kk1,35
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "min group",
@ -2474,6 +2512,7 @@ from(bucket: v.bucket)
,,0,kk0,0
,,1,kk1,1
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
{
name: "max group",
@ -2511,11 +2550,15 @@ from(bucket: v.bucket)
,,0,kk0,9
,,1,kk1,8
`,
skip: "https://github.com/influxdata/idpe/issues/8828",
},
}
for _, tc := range testcases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
if tc.skip != "" {
t.Skip(tc.skip)
}
l := launcher.RunTestLauncherOrFail(t, ctx, mock.NewFlagger(map[feature.Flag]interface{}{
feature.PushDownWindowAggregateMean(): true,
feature.PushDownGroupAggregateMinMax(): true,

View File

@ -12,6 +12,7 @@ type StorageReader struct {
ReadGroupFn func(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadTagKeysFn func(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadTagValuesFn func(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error)
ReadWindowAggregateFn func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error)
CloseFn func()
}
@ -40,32 +41,6 @@ func (s *StorageReader) Close() {
}
}
type GroupStoreReader struct {
*StorageReader
GroupCapabilityFn func(ctx context.Context) query.GroupCapability
}
func (s *GroupStoreReader) GetGroupCapability(ctx context.Context) query.GroupCapability {
if s.GroupCapabilityFn != nil {
return s.GroupCapabilityFn(ctx)
}
return nil
}
type WindowAggregateStoreReader struct {
*StorageReader
GetWindowAggregateCapabilityFn func(ctx context.Context) query.WindowAggregateCapability
ReadWindowAggregateFn func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error)
}
func (s *WindowAggregateStoreReader) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability {
// Use the function if it exists.
if s.GetWindowAggregateCapabilityFn != nil {
return s.GetWindowAggregateCapabilityFn(ctx)
}
return nil
}
func (s *WindowAggregateStoreReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
// ReadWindowAggregate delegates to the configured ReadWindowAggregateFn.
func (s *StorageReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
	fn := s.ReadWindowAggregateFn
	return fn(ctx, spec, alloc)
}

View File

@ -680,70 +680,20 @@ func (rule PushDownWindowAggregateRule) Pattern() plan.Pattern {
}
func canPushWindowedAggregate(ctx context.Context, fnNode plan.Node) bool {
caps, ok := capabilities(ctx)
if !ok {
return false
}
// Check the aggregate function spec. Require the operation on _value
// and check the feature flag associated with the aggregate function.
switch fnNode.Kind() {
case universe.MinKind:
if !caps.HaveMin() {
return false
}
minSpec := fnNode.ProcedureSpec().(*universe.MinProcedureSpec)
if minSpec.Column != execute.DefaultValueColLabel {
return false
}
case universe.MaxKind:
if !caps.HaveMax() {
return false
}
maxSpec := fnNode.ProcedureSpec().(*universe.MaxProcedureSpec)
if maxSpec.Column != execute.DefaultValueColLabel {
return false
}
case universe.MeanKind:
if !feature.PushDownWindowAggregateMean().Enabled(ctx) || !caps.HaveMean() {
if !feature.PushDownWindowAggregateMean().Enabled(ctx) {
return false
}
meanSpec := fnNode.ProcedureSpec().(*universe.MeanProcedureSpec)
if len(meanSpec.Columns) != 1 || meanSpec.Columns[0] != execute.DefaultValueColLabel {
return false
}
case universe.CountKind:
if !caps.HaveCount() {
default:
return false
}
countSpec := fnNode.ProcedureSpec().(*universe.CountProcedureSpec)
if len(countSpec.Columns) != 1 || countSpec.Columns[0] != execute.DefaultValueColLabel {
return false
}
case universe.SumKind:
if !caps.HaveSum() {
return false
}
sumSpec := fnNode.ProcedureSpec().(*universe.SumProcedureSpec)
if len(sumSpec.Columns) != 1 || sumSpec.Columns[0] != execute.DefaultValueColLabel {
return false
}
case universe.FirstKind:
if !caps.HaveFirst() {
return false
}
firstSpec := fnNode.ProcedureSpec().(*universe.FirstProcedureSpec)
if firstSpec.Column != execute.DefaultValueColLabel {
return false
}
case universe.LastKind:
if !caps.HaveLast() {
return false
}
lastSpec := fnNode.ProcedureSpec().(*universe.LastProcedureSpec)
if lastSpec.Column != execute.DefaultValueColLabel {
return false
}
}
return true
}
@ -769,16 +719,6 @@ func isPushableWindow(windowSpec *universe.WindowProcedureSpec) bool {
windowSpec.StopColumn == "_stop"
}
func capabilities(ctx context.Context) (query.WindowAggregateCapability, bool) {
reader := GetStorageDependencies(ctx).FromDeps.Reader
windowAggregateReader, ok := reader.(query.WindowAggregateReader)
if !ok {
return nil, false
}
caps := windowAggregateReader.GetWindowAggregateCapability(ctx)
return caps, caps != nil
}
func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
fnNode := pn
if !canPushWindowedAggregate(ctx, fnNode) {
@ -794,10 +734,6 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
return pn, false, nil
}
if caps, ok := capabilities(ctx); !ok || windowSpec.Window.Offset.IsPositive() && !caps.HaveOffset() {
return pn, false, nil
}
// Rule passes.
return plan.CreatePhysicalNode("ReadWindowAggregate", &ReadWindowAggregatePhysSpec{
ReadRangePhysSpec: *fromSpec.Copy().(*ReadRangePhysSpec),
@ -946,10 +882,6 @@ func (p GroupWindowAggregateTransposeRule) Rewrite(ctx context.Context, pn plan.
return pn, false, nil
}
if caps, ok := capabilities(ctx); !ok || windowSpec.Window.Offset.IsPositive() && !caps.HaveOffset() {
return pn, false, nil
}
fromNode := windowNode.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*ReadGroupPhysSpec)

View File

@ -37,10 +37,6 @@ func (caps mockReaderCaps) GetGroupCapability(ctx context.Context) query.GroupCa
return caps.GroupCapabilities
}
func (caps mockReaderCaps) GetWindowAggregateCapability(ctx context.Context) query.WindowAggregateCapability {
return mockWAC{Have: caps.Have}
}
func (caps mockReaderCaps) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
return nil, nil
}
@ -56,26 +52,41 @@ func (c mockGroupCapability) HaveLast() bool { return c.last }
func (c mockGroupCapability) HaveMin() bool { return c.min }
func (c mockGroupCapability) HaveMax() bool { return c.max }
// Mock Window Aggregate Capability
type mockWAC struct {
Have bool
}
func (m mockWAC) HaveMin() bool { return m.Have }
func (m mockWAC) HaveMax() bool { return m.Have }
func (m mockWAC) HaveMean() bool { return m.Have }
func (m mockWAC) HaveCount() bool { return m.Have }
func (m mockWAC) HaveSum() bool { return m.Have }
func (m mockWAC) HaveFirst() bool { return m.Have }
func (m mockWAC) HaveLast() bool { return m.Have }
func (m mockWAC) HaveOffset() bool { return m.Have }
// fluxTime converts nanoseconds since the Unix epoch into an absolute
// flux.Time in UTC.
func fluxTime(t int64) flux.Time {
	abs := time.Unix(0, t).UTC()
	return flux.Time{Absolute: abs}
}
// skipTests maps rule-test case names to the reason each case is
// currently skipped while the corresponding aggregate pushdown is
// being ported.
var skipTests = map[string]string{
	"push down sum":   "unskip once sum is ported",
	"push down first": "unskip once first is ported",
	"push down last":  "unskip once last is ported",
	"push down count": "unskip once count is ported",

	"WithSuccessor":        "unskip once min is ported",
	"WindowPositiveOffset": "unskip once last is ported",
	"SimplePassMax":        "unskip once max is ported",
	"SimplePassMin":        "unskip once min is ported",
	"SimplePassFirst":      "unskip once first is ported",
	"SimplePassLast":       "unskip once last is ported",
	"GroupByStartPassMin":  "unskip once min is ported",
	"GroupByHostPassMin":   "unskip once min is ported",
	"SimplePassCount":      "unskip once count is ported",
	"SimplePassSum":        "unskip once sum is ported",
	"PositiveOffset":       "unskip once min is ported",
	"CreateEmptyPassMin":   "unskip once min is ported",

	"AggregateWindowCountInvalidClosingWindowMultiple":    "unskip once count is ported",
	"AggregateWindowCountMultipleMatches":                 "unskip once count is ported",
	"AggregateWindowCountInvalidDuplicateAs":              "unskip once count is ported",
	"AggregateWindowCountInvalidClosingWindow":            "unskip once count is ported",
	"AggregateWindowCountInvalidDuplicateColumn":          "unskip once count is ported",
	"AggregateWindowCountWrongSchemaMutator":              "unskip once count is ported",
	"AggregateWindowCount":                                "unskip once count is ported",
	"AggregateWindowCount#01":                             "unskip once count is ported",
	"AggregateWindowCountCreateEmpty":                     "unskip once count is ported",
	"AggregateWindowCountInvalidClosingWindowCreateEmpty": "unskip once count is ported",
}
func TestPushDownRangeRule(t *testing.T) {
fromSpec := influxdb.FromStorageProcedureSpec{
Bucket: influxdb.NameOrID{Name: "my-bucket"},
@ -1941,6 +1952,9 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
for _, tc := range tests {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
if _, ok := skipTests[tc.Name]; ok {
t.Skip(skipTests[tc.Name])
}
t.Parallel()
plantest.PhysicalRuleTestHelper(t, &tc)
})
@ -2491,6 +2505,9 @@ func TestTransposeGroupToWindowAggregateRule(t *testing.T) {
for _, tc := range tests {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
if _, ok := skipTests[tc.Name]; ok {
t.Skip(skipTests[tc.Name])
}
t.Parallel()
plantest.PhysicalRuleTestHelper(t, &tc)
})
@ -2634,6 +2651,9 @@ func TestPushDownBareAggregateRule(t *testing.T) {
for _, tc := range testcases {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
if _, ok := skipTests[tc.Name]; ok {
t.Skip(skipTests[tc.Name])
}
t.Parallel()
plantest.PhysicalRuleTestHelper(t, &tc)
})

View File

@ -205,7 +205,7 @@ func TestReadWindowAggregateSource(t *testing.T) {
universe.SumKind,
},
}
reader := &mock.WindowAggregateStoreReader{
reader := &mock.StorageReader{
ReadWindowAggregateFn: func(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
if want, got := orgID, spec.OrganizationID; want != got {
t.Errorf("unexpected organization id -want/+got:\n\t- %s\n\t+ %s", want, got)

View File

@ -37,24 +37,8 @@ type GroupAggregator interface {
GetGroupCapability(ctx context.Context) GroupCapability
}
// WindowAggregateCapability describes what is supported by WindowAggregateReader.
type WindowAggregateCapability interface {
HaveMin() bool
HaveMax() bool
HaveMean() bool
HaveCount() bool
HaveSum() bool
HaveFirst() bool
HaveLast() bool
HaveOffset() bool
}
// WindowAggregateReader implements the WindowAggregate capability.
type WindowAggregateReader interface {
// GetWindowAggregateCapability will get a detailed list of what the RPC call supports
// for window aggregate.
GetWindowAggregateCapability(ctx context.Context) WindowAggregateCapability
// ReadWindowAggregate will read a table using the WindowAggregate method.
ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc *memory.Allocator) (TableIterator, error)
}
@ -96,6 +80,7 @@ type ReadWindowAggregateSpec struct {
Aggregates []plan.ProcedureKind
CreateEmpty bool
TimeColumn string
Window execute.Window
}
func (spec *ReadWindowAggregateSpec) Name() string {

View File

@ -3,6 +3,7 @@ package storageflux
import (
"context"
"fmt"
"github.com/influxdata/flux/plan"
"strings"
"github.com/gogo/protobuf/types"
@ -10,6 +11,8 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/kit/errors"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/query"
storage "github.com/influxdata/influxdb/v2/storage/reads"
@ -54,6 +57,16 @@ type storeReader struct {
s storage.Store
}
// ReadWindowAggregate constructs a lazy iterator that will execute the
// window aggregate read described by spec when its Do method is called.
func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
	it := &windowAggregateIterator{
		ctx:   ctx,
		s:     r.s,
		spec:  spec,
		alloc: alloc,
		cache: newTagsCache(0),
	}
	return it, nil
}
// NewReader returns a new storageflux reader
func NewReader(s storage.Store) query.StorageReader {
return &storeReader{s: s}
@ -218,6 +231,276 @@ READ:
return rs.Err()
}
// windowAggregateIterator is a query.TableIterator that streams the
// results of a storage window aggregate read as flux tables.
type windowAggregateIterator struct {
	ctx   context.Context
	s     storage.Store                 // backing store the read is issued against
	spec  query.ReadWindowAggregateSpec // aggregate, window, bounds and predicate to read
	stats cursors.CursorStats           // cumulative scan statistics across all cursors
	cache *tagsCache
	alloc *memory.Allocator
}
// Statistics returns the cursor statistics accumulated by this iterator so far.
func (wai *windowAggregateIterator) Statistics() cursors.CursorStats {
	return wai.stats
}
// Do builds a ReadWindowAggregateRequest from the spec, issues it
// against the store, and forwards the resulting tables to f via
// handleRead.
func (wai *windowAggregateIterator) Do(f func(flux.Table) error) error {
	src := wai.s.GetSource(
		uint64(wai.spec.OrganizationID),
		uint64(wai.spec.BucketID),
	)

	// Setup read request
	any, err := types.MarshalAny(src)
	if err != nil {
		return err
	}

	var req datatypes.ReadWindowAggregateRequest
	req.ReadSource = any
	req.Predicate = wai.spec.Predicate
	req.Range.Start = int64(wai.spec.Bounds.Start)
	req.Range.End = int64(wai.spec.Bounds.Stop)

	// The deprecated nanosecond-only WindowEvery/Offset fields take
	// precedence; otherwise encode the full duration-based window,
	// which may carry months and negative components.
	if wai.spec.WindowEvery != 0 || wai.spec.Offset != 0 {
		req.Window = &datatypes.Window{
			Every: &datatypes.Duration{
				Nsecs: wai.spec.WindowEvery,
			},
			Offset: &datatypes.Duration{
				Nsecs: wai.spec.Offset,
			},
		}
	} else {
		req.Window = &datatypes.Window{
			Every: &datatypes.Duration{
				Nsecs:    wai.spec.Window.Every.Nanoseconds(),
				Months:   wai.spec.Window.Every.Months(),
				Negative: wai.spec.Window.Every.IsNegative(),
			},
			Offset: &datatypes.Duration{
				Nsecs:    wai.spec.Window.Offset.Nanoseconds(),
				Months:   wai.spec.Window.Offset.Months(),
				Negative: wai.spec.Window.Offset.IsNegative(),
			},
		}
	}

	req.Aggregate = make([]*datatypes.Aggregate, len(wai.spec.Aggregates))

	for i, aggKind := range wai.spec.Aggregates {
		if agg, err := determineAggregateMethod(string(aggKind)); err != nil {
			return err
		} else if agg != datatypes.AggregateTypeNone {
			req.Aggregate[i] = &datatypes.Aggregate{Type: agg}
		}
	}

	// NOTE(review): wai.s is already declared as storage.Store, so this
	// assertion always succeeds; it looks like a leftover from when a
	// narrower interface was asserted here — confirm before removing.
	aggStore, ok := wai.s.(storage.Store)
	if !ok {
		return errors.New("storage does not support window aggregate")
	}
	rs, err := aggStore.WindowAggregate(wai.ctx, &req)
	if err != nil {
		return err
	}

	if rs == nil {
		return nil
	}
	return wai.handleRead(f, rs)
}
// Procedure kind names understood by the window aggregate reader.
// They mirror the flux universe package kinds by name.
const (
	CountKind = "count"
	SumKind   = "sum"
	FirstKind = "first"
	LastKind  = "last"
	MinKind   = "min"
	MaxKind   = "max"
	MeanKind  = "mean"
)
// isSelector reports whether kind names a selector operator, i.e. an
// aggregate that returns an existing row rather than a computed value.
func isSelector(kind plan.ProcedureKind) bool {
	switch kind {
	case FirstKind, LastKind, MinKind, MaxKind:
		return true
	default:
		return false
	}
}
// isAggregateCount reports whether kind is the count aggregate.
func isAggregateCount(kind plan.ProcedureKind) bool {
	switch kind {
	case CountKind:
		return true
	default:
		return false
	}
}
// handleRead converts the raw storage result set into flux tables and
// invokes f for each one, accumulating scan statistics as it goes.
//
// Changes from the previous revision: the per-type selector branches
// duplicated the same "not yet supported" error in both an `else if`
// and an `else` arm — the arms are collapsed since the error is
// identical either way; the count fill-value check is guarded against
// an empty Aggregates slice to avoid an index panic.
func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs storage.ResultSet) error {
	// Determine the window to apply. The deprecated nanosecond-based
	// WindowEvery/Offset fields take precedence; otherwise use the full
	// duration-based Window from the spec.
	var window execute.Window
	if wai.spec.WindowEvery != 0 || wai.spec.Offset != 0 {
		windowEvery := wai.spec.WindowEvery
		offset := wai.spec.Offset
		everyDur := values.MakeDuration(windowEvery, 0, false)
		offsetDur := values.MakeDuration(offset, 0, false)
		window = execute.Window{
			Every:  everyDur,
			Period: everyDur,
			Offset: offsetDur,
		}
	} else {
		window = wai.spec.Window
		// The planner only pushes down windows whose period equals
		// every; anything else indicates a planner bug.
		if window.Every != window.Period {
			return &influxdb.Error{
				Code: influxdb.EInternal,
				Msg:  "planner should ensure that period equals every",
			}
		}
	}

	createEmpty := wai.spec.CreateEmpty
	selector := len(wai.spec.Aggregates) > 0 && isSelector(wai.spec.Aggregates[0])
	timeColumn := wai.spec.TimeColumn
	if timeColumn == "" {
		// No explicit output time column: split the multi-window tables
		// produced below into one table per window before handing them
		// to the caller.
		tableFn := f
		f = func(table flux.Table) error {
			return splitWindows(wai.ctx, wai.alloc, table, selector, tableFn)
		}
	}

	// these resources must be closed if not nil on return
	var (
		cur   cursors.Cursor
		table storageTable
	)
	defer func() {
		if table != nil {
			table.Close()
		}
		if cur != nil {
			cur.Close()
		}
		rs.Close()
		wai.cache.Release()
	}()

READ:
	for rs.Next() {
		cur = rs.Cursor()
		if cur == nil {
			// no data for series key + field combination
			continue
		}

		bnds := wai.spec.Bounds
		key := defaultGroupKeyForSeries(rs.Tags(), bnds)
		done := make(chan struct{})
		hasTimeCol := timeColumn != ""

		// selectorUnsupported is returned for every selector path: the
		// storage engine does not implement window selectors yet,
		// regardless of createEmpty or the time column.
		selectorUnsupported := &influxdb.Error{
			Code: influxdb.EInternal,
			Msg:  "window selectors not yet supported in storage",
		}

		switch typedCur := cur.(type) {
		case cursors.IntegerArrayCursor:
			if selector {
				return selectorUnsupported
			}
			// count fills empty windows with zero instead of null.
			var fillValue *int64
			if len(wai.spec.Aggregates) > 0 && isAggregateCount(wai.spec.Aggregates[0]) {
				fillValue = func(v int64) *int64 { return &v }(0)
			}
			cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TInt, hasTimeCol)
			table = newIntegerWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, fillValue, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
		case cursors.FloatArrayCursor:
			if selector {
				return selectorUnsupported
			}
			cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TFloat, hasTimeCol)
			table = newFloatWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
		case cursors.UnsignedArrayCursor:
			if selector {
				return selectorUnsupported
			}
			cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TUInt, hasTimeCol)
			table = newUnsignedWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
		case cursors.BooleanArrayCursor:
			if selector {
				return selectorUnsupported
			}
			cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TBool, hasTimeCol)
			table = newBooleanWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
		case cursors.StringArrayCursor:
			if selector {
				return selectorUnsupported
			}
			cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TString, hasTimeCol)
			table = newStringWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
		default:
			panic(fmt.Sprintf("unreachable: %T", typedCur))
		}
		// Ownership of the cursor has moved into the table.
		cur = nil

		if !table.Empty() {
			if err := f(table); err != nil {
				table.Close()
				table = nil
				return err
			}
			select {
			case <-done:
			case <-wai.ctx.Done():
				table.Cancel()
				break READ
			}
		}

		stats := table.Statistics()
		wai.stats.ScannedValues += stats.ScannedValues
		wai.stats.ScannedBytes += stats.ScannedBytes
		table.Close()
		table = nil
	}
	return rs.Err()
}
type groupIterator struct {
ctx context.Context
s storage.Store
@ -380,9 +663,54 @@ const (
startColIdx = 0
stopColIdx = 1
timeColIdx = 2
valueColIdxWithoutTime = 2
valueColIdx = 3
)
// determineTableColsForWindowAggregate builds the column metadata and
// default values for a window aggregate table. The fixed prefix is
// _start, _stop, _value (aggregates drop the _time column); when
// hasTimeCol is set, _time is included between _stop and _value. Tag
// columns follow the fixed prefix.
func determineTableColsForWindowAggregate(tags models.Tags, typ flux.ColType, hasTimeCol bool) ([]flux.ColMeta, [][]byte) {
	size := 3
	if hasTimeCol {
		size = 4
	}
	cols := make([]flux.ColMeta, size+len(tags))
	defs := make([][]byte, size+len(tags))

	cols[startColIdx] = flux.ColMeta{Label: execute.DefaultStartColLabel, Type: flux.TTime}
	cols[stopColIdx] = flux.ColMeta{Label: execute.DefaultStopColLabel, Type: flux.TTime}

	valueIdx := valueColIdxWithoutTime
	if hasTimeCol {
		cols[timeColIdx] = flux.ColMeta{Label: execute.DefaultTimeColLabel, Type: flux.TTime}
		valueIdx = valueColIdx
	}
	cols[valueIdx] = flux.ColMeta{Label: execute.DefaultValueColLabel, Type: typ}

	for i, tag := range tags {
		cols[size+i] = flux.ColMeta{Label: string(tag.Key), Type: flux.TString}
		defs[size+i] = []byte("")
	}
	return cols, defs
}
func determineTableColsForSeries(tags models.Tags, typ flux.ColType) ([]flux.ColMeta, [][]byte) {
cols := make([]flux.ColMeta, 4+len(tags))
defs := make([][]byte, 4+len(tags))

File diff suppressed because it is too large Load Diff

View File

@ -3,10 +3,12 @@ package storageflux
import (
"sync"
"github.com/apache/arrow/go/arrow/array"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
storage "github.com/influxdata/influxdb/v2/storage/reads"
"github.com/influxdata/influxdb/v2/models"
@ -91,6 +93,223 @@ func (t *{{.name}}Table) advance() bool {
return true
}
// window table

// {{.name}}WindowTable streams windowed aggregate results for the
// {{.Name}} value type. It embeds the plain {{.name}}Table and walks
// the cursor one window at a time.
type {{.name}}WindowTable struct {
	{{.name}}Table
	arr         *cursors.{{.Name}}Array // current batch of timestamps/values from the cursor
	nextTS      int64                   // stop time of the next window when generating empty windows
	idxInArr    int                     // read position within arr
	createEmpty bool                    // emit rows for windows that contain no points
	timeColumn  string                  // output time column name; empty means none
	window      execute.Window
	{{if eq .Name "Integer"}}fillValue *{{.Type}}{{end}}
}
// new{{.Name}}WindowTable constructs a window table that reads windowed
// aggregate values from cur within bounds, grouped under key.
func new{{.Name}}WindowTable(
	done chan struct{},
	cur cursors.{{.Name}}ArrayCursor,
	bounds execute.Bounds,
	window execute.Window,
	createEmpty bool,
	timeColumn string,
	{{if eq .Name "Integer"}}fillValue *{{.Type}},{{end}}
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *{{.name}}WindowTable {
	t := &{{.name}}WindowTable{
		{{.name}}Table: {{.name}}Table{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
		},
		window:      window,
		createEmpty: createEmpty,
		timeColumn:  timeColumn,
		{{if eq .Name "Integer"}}fillValue: fillValue,{{end}}
	}
	if t.createEmpty {
		// Seed nextTS with the stop time of the earliest window so that
		// empty windows can be generated from the start of the bounds.
		start := int64(bounds.Start)
		t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop)
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do streams the table's buffers to f using the window-aware advance.
func (t *{{.name}}WindowTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}
// createNextBufferTimes will read the timestamps from the array
// cursor and construct the values for the next buffer.
// It returns the start and stop timestamp arrays for the next buffer,
// or ok == false when there are no more windows to emit.
func (t *{{.name}}WindowTable) createNextBufferTimes() (start, stop *array.Int64, ok bool) {
	startB := arrow.NewIntBuilder(t.alloc)
	stopB := arrow.NewIntBuilder(t.alloc)

	if t.createEmpty {
		// There are no more windows when the start time is greater
		// than or equal to the stop time.
		// subEvery appears to be the window's every duration with its
		// sign flipped, so adding it to a stop time steps back to the
		// window's start time — TODO confirm MakeDuration's third
		// argument is the negative flag.
		subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive())
		if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) {
			return nil, nil, false
		}

		// Create a buffer with the buffer size.
		// TODO(jsternberg): Calculate the exact size with max points as the maximum.
		startB.Resize(storage.MaxPointsPerBlock)
		stopB.Resize(storage.MaxPointsPerBlock)
		// Generate one start/stop pair per window until the window start
		// reaches the table's stop bound.
		for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) {
			startT, stopT := t.getWindowBoundsFor(t.nextTS)
			if startT >= int64(t.bounds.Stop) {
				break
			}
			startB.Append(startT)
			stopB.Append(stopT)
		}
		start = startB.NewInt64Array()
		stop = stopB.NewInt64Array()
		return start, stop, true
	}

	// Retrieve the next buffer so we can copy the timestamps.
	if !t.nextBuffer() {
		return nil, nil, false
	}

	// Copy over the timestamps from the next buffer and adjust
	// times for the boundaries.
	startB.Resize(len(t.arr.Timestamps))
	stopB.Resize(len(t.arr.Timestamps))
	for _, stopT := range t.arr.Timestamps {
		startT, stopT := t.getWindowBoundsFor(stopT)
		startB.Append(startT)
		stopB.Append(stopT)
	}
	start = startB.NewInt64Array()
	stop = stopB.NewInt64Array()
	return start, stop, true
}
// getWindowBoundsFor computes the start/stop pair for the window ending
// at ts, clamping both ends so they never fall outside the query bounds.
func (t *{{.name}}WindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) {
	// NOTE(review): the third MakeDuration argument appears to negate the
	// duration so Add subtracts one window from ts — confirm against the
	// values.MakeDuration signature.
	subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive())
	startT, stopT = int64(values.Time(ts).Add(subEvery)), ts
	// Truncate the window to the query bounds.
	if startT < int64(t.bounds.Start) {
		startT = int64(t.bounds.Start)
	}
	if stopT > int64(t.bounds.Stop) {
		stopT = int64(t.bounds.Stop)
	}
	return startT, stopT
}
// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
func (t *{{.name}}WindowTable) nextAt(ts int64) (v {{.Type}}, ok bool) {
if !t.nextBuffer() {
return
} else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) {
return
}
v, ok = t.arr.Values[t.idxInArr], true
t.idxInArr++
return v, ok
}
// isInWindow reports whether ts is a valid stop time for the window that
// storage ends at stop. Storage always reports the untruncated window
// end, but ts may have been clipped to the query boundary: for a 10s
// window covering [20, 30) the true end is 30, yet any ts in (20, 30]
// identifies the same window once boundary truncation is applied, so we
// accept that whole range rather than requiring ts == stop.
func (t *{{.name}}WindowTable) isInWindow(ts int64, stop int64) bool {
	negEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive())
	windowStart := int64(values.Time(stop).Add(negEvery))
	return windowStart < ts && ts <= stop
}
// nextBuffer makes sure a non-empty array buffer is loaded from the
// cursor and reports whether at least one value remains to be read.
func (t *{{.name}}WindowTable) nextBuffer() bool {
	// Drop a fully-consumed buffer.
	if t.arr != nil && t.idxInArr >= t.arr.Len() {
		t.arr = nil
	}
	if t.arr != nil {
		return true
	}
	// Pull the next buffer from the cursor; an empty result means the
	// cursor is exhausted.
	next := t.cur.Next()
	if next.Len() == 0 {
		return false
	}
	t.arr = next
	t.idxInArr = 0
	return true
}
// appendValues walks the window stop timestamps and appends the matching
// buffered value for each, or a null when no value falls in that window.
func (t *{{.name}}WindowTable) appendValues(intervals []int64, appendValue func(v {{.Type}}), appendNull func()) {
	for _, stopT := range intervals {
		v, ok := t.nextAt(stopT)
		if ok {
			appendValue(v)
		} else {
			appendNull()
		}
	}
}
// advance produces the next buffer of windowed rows, returning false
// when both the cursor and the window stream are exhausted.
func (t *{{.name}}WindowTable) advance() bool {
	if !t.nextBuffer() {
		return false
	}
	// Create the timestamps for the next window.
	start, stop, ok := t.createNextBufferTimes()
	if !ok {
		return false
	}
	values := t.mergeValues(stop.Int64Values())
	// Retrieve the buffer for the data to avoid allocating
	// additional slices. If the buffer is still being used
	// because the references were retained, then we will
	// allocate a new buffer.
	cr := t.allocateBuffer(stop.Len())
	if t.timeColumn != "" {
		// Single-time-column layout: expose only the requested bound as
		// the _time column and release the array that is not used.
		switch t.timeColumn {
		case execute.DefaultStopColLabel:
			cr.cols[timeColIdx] = stop
			start.Release()
		case execute.DefaultStartColLabel:
			cr.cols[timeColIdx] = start
			stop.Release()
		}
		cr.cols[valueColIdx] = values
		t.appendBounds(cr)
	} else {
		// Full layout keeps both the _start and _stop columns.
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[valueColIdxWithoutTime] = values
	}
	t.appendTags(cr)
	return true
}
// group table
type {{.name}}GroupTable struct {

View File

@ -25,6 +25,7 @@ type table struct {
defs [][]byte
done chan struct{}
empty bool
colBufs *colReader
@ -69,6 +70,10 @@ func (t *table) isCancelled() bool {
return atomic.LoadInt32(&t.cancelled) != 0
}
// init performs the first advance so the table knows up front whether it
// will produce any rows; empty is only recorded when no error occurred
// while attempting that advance.
func (t *table) init(advance func() bool) {
	hasRows := advance()
	t.empty = !hasRows && t.err == nil
}
func (t *table) do(f func(flux.ColReader) error, advance func() bool) error {
// Mark this table as having been used. If this doesn't
// succeed, then this has already been invoked somewhere else.
@ -229,27 +234,61 @@ func (t *floatTable) toArrowBuffer(vs []float64) *array.Float64 {
// toArrowBuffer wraps the raw float64 slice in an Arrow array backed by
// the table's allocator.
func (t *floatGroupTable) toArrowBuffer(vs []float64) *array.Float64 {
	out := arrow.NewFloat(vs, t.alloc)
	return out
}
// mergeValues builds the value column for the given window stop times,
// emitting a null for any window without a matching point.
func (t *floatWindowTable) mergeValues(intervals []int64) *array.Float64 {
	builder := arrow.NewFloatBuilder(t.alloc)
	builder.Resize(len(intervals))
	t.appendValues(intervals, builder.Append, builder.AppendNull)
	return builder.NewFloat64Array()
}
// toArrowBuffer wraps the raw int64 slice in an Arrow array backed by
// the table's allocator.
func (t *integerTable) toArrowBuffer(vs []int64) *array.Int64 {
	out := arrow.NewInt(vs, t.alloc)
	return out
}
// toArrowBuffer wraps the raw int64 slice in an Arrow array backed by
// the table's allocator.
func (t *integerGroupTable) toArrowBuffer(vs []int64) *array.Int64 {
	out := arrow.NewInt(vs, t.alloc)
	return out
}
// mergeValues builds the value column for the given window stop times.
// Windows without a point normally become null, but when a fill value is
// configured it is appended instead.
func (t *integerWindowTable) mergeValues(intervals []int64) *array.Int64 {
	builder := arrow.NewIntBuilder(t.alloc)
	builder.Resize(len(intervals))
	fillEmpty := builder.AppendNull
	if t.fillValue != nil {
		fillEmpty = func() { builder.Append(*t.fillValue) }
	}
	t.appendValues(intervals, builder.Append, fillEmpty)
	return builder.NewInt64Array()
}
// toArrowBuffer wraps the raw uint64 slice in an Arrow array backed by
// the table's allocator.
func (t *unsignedTable) toArrowBuffer(vs []uint64) *array.Uint64 {
	out := arrow.NewUint(vs, t.alloc)
	return out
}
// toArrowBuffer wraps the raw uint64 slice in an Arrow array backed by
// the table's allocator.
func (t *unsignedGroupTable) toArrowBuffer(vs []uint64) *array.Uint64 {
	out := arrow.NewUint(vs, t.alloc)
	return out
}
// mergeValues builds the value column for the given window stop times,
// emitting a null for any window without a matching point.
func (t *unsignedWindowTable) mergeValues(intervals []int64) *array.Uint64 {
	builder := arrow.NewUintBuilder(t.alloc)
	builder.Resize(len(intervals))
	t.appendValues(intervals, builder.Append, builder.AppendNull)
	return builder.NewUint64Array()
}
// toArrowBuffer wraps the raw string slice in an Arrow binary array
// backed by the table's allocator.
func (t *stringTable) toArrowBuffer(vs []string) *array.Binary {
	out := arrow.NewString(vs, t.alloc)
	return out
}
// toArrowBuffer wraps the raw string slice in an Arrow binary array
// backed by the table's allocator.
func (t *stringGroupTable) toArrowBuffer(vs []string) *array.Binary {
	out := arrow.NewString(vs, t.alloc)
	return out
}
// mergeValues builds the value column for the given window stop times,
// emitting a null for any window without a matching point.
func (t *stringWindowTable) mergeValues(intervals []int64) *array.Binary {
	builder := arrow.NewStringBuilder(t.alloc)
	builder.Resize(len(intervals))
	t.appendValues(intervals, builder.AppendString, builder.AppendNull)
	return builder.NewBinaryArray()
}
// toArrowBuffer wraps the raw bool slice in an Arrow boolean array
// backed by the table's allocator.
func (t *booleanTable) toArrowBuffer(vs []bool) *array.Boolean {
	out := arrow.NewBool(vs, t.alloc)
	return out
}
// toArrowBuffer wraps the raw bool slice in an Arrow boolean array
// backed by the table's allocator.
func (t *booleanGroupTable) toArrowBuffer(vs []bool) *array.Boolean {
	out := arrow.NewBool(vs, t.alloc)
	return out
}
// mergeValues builds the value column for the given window stop times,
// emitting a null for any window without a matching point.
func (t *booleanWindowTable) mergeValues(intervals []int64) *array.Boolean {
	builder := arrow.NewBoolBuilder(t.alloc)
	builder.Resize(len(intervals))
	t.appendValues(intervals, builder.Append, builder.AppendNull)
	return builder.NewBooleanArray()
}

199
storage/flux/window.go Normal file
View File

@ -0,0 +1,199 @@
package storageflux
import (
"context"
"fmt"
"sync/atomic"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
)
// splitWindows converts each row of a windowed table into its own
// single-row table whose group key carries that row's start and stop
// values, invoking f once per produced table.
func splitWindows(ctx context.Context, alloc memory.Allocator, in flux.Table, selector bool, f func(t flux.Table) error) error {
	splitter := &windowTableSplitter{
		ctx:      ctx,
		in:       in,
		alloc:    alloc,
		selector: selector,
	}
	return splitter.Do(f)
}
// windowTableSplitter splits a windowed table into one table per row,
// rewriting each group key with that row's start and stop times.
type windowTableSplitter struct {
	ctx   context.Context
	in    flux.Table
	alloc memory.Allocator
	// selector, when true, turns rows with a null value into empty
	// tables instead of single-row tables.
	selector bool
}
// Do iterates the input table and emits a single-row table per input row
// to f, with the group key rewritten to that row's start/stop bounds.
// The input table is always marked done before returning.
func (w *windowTableSplitter) Do(f func(flux.Table) error) error {
	defer w.in.Done()
	startIdx, err := w.getTimeColumnIndex(execute.DefaultStartColLabel)
	if err != nil {
		return err
	}
	stopIdx, err := w.getTimeColumnIndex(execute.DefaultStopColLabel)
	if err != nil {
		return err
	}
	return w.in.Do(func(cr flux.ColReader) error {
		// Retrieve the start and stop columns for splitting
		// the windows.
		start := cr.Times(startIdx)
		stop := cr.Times(stopIdx)
		// Iterate through each time to produce a table
		// using the start and stop values.
		arrs := make([]array.Interface, len(cr.Cols()))
		for j := range cr.Cols() {
			arrs[j] = getColumnValues(cr, j)
		}
		values := arrs[valueColIdx]
		for i, n := 0, cr.Len(); i < n; i++ {
			startT, stopT := start.Value(i), stop.Value(i)
			// Rewrite the group key using the new time.
			key := groupKeyForWindow(cr.Key(), startT, stopT)
			if w.selector && values.IsNull(i) {
				// Produce an empty table if the value is null
				// and this is a selector.
				table := execute.NewEmptyTable(key, cr.Cols())
				if err := f(table); err != nil {
					return err
				}
				continue
			}
			// Produce a slice for each column into a new
			// table buffer.
			buffer := arrow.TableBuffer{
				GroupKey: key,
				Columns:  cr.Cols(),
				Values:   make([]array.Interface, len(cr.Cols())),
			}
			for j, arr := range arrs {
				buffer.Values[j] = arrow.Slice(arr, int64(i), int64(i+1))
			}
			// Wrap these into a single table and execute.
			done := make(chan struct{})
			table := &windowTableRow{
				buffer: buffer,
				done:   done,
			}
			if err := f(table); err != nil {
				return err
			}
			// Wait until the consumer has finished with (or released)
			// this row before producing the next one, bailing out if
			// the context is cancelled.
			select {
			case <-done:
			case <-w.ctx.Done():
				return w.ctx.Err()
			}
		}
		return nil
	})
}
// getTimeColumnIndex locates the named column in the input table and
// verifies it holds time values, returning an invalid-argument error
// otherwise.
func (w *windowTableSplitter) getTimeColumnIndex(label string) (int, error) {
	j := execute.ColIdx(label, w.in.Cols())
	if j < 0 {
		return -1, &influxdb.Error{
			Code: influxdb.EInvalid,
			Msg:  fmt.Sprintf("missing %q column from window splitter", label),
		}
	}
	if col := w.in.Cols()[j]; col.Type != flux.TTime {
		return -1, &influxdb.Error{
			Code: influxdb.EInvalid,
			Msg:  fmt.Sprintf("%q column must be of type time", label),
		}
	}
	return j, nil
}
// windowTableRow is a single-row table produced by windowTableSplitter.
// It may be consumed at most once; used guards against double reads.
type windowTableRow struct {
	used   int32
	buffer arrow.TableBuffer
	// done is closed once the row has been consumed or released so the
	// splitter that produced it can resume.
	done chan struct{}
}
// Key returns the rewritten group key carrying this row's window bounds.
func (w *windowTableRow) Key() flux.GroupKey {
	key := w.buffer.GroupKey
	return key
}
// Cols returns the column metadata shared with the source table.
func (w *windowTableRow) Cols() []flux.ColMeta {
	cols := w.buffer.Columns
	return cols
}
// Do invokes f exactly once with the row's buffer, releasing the buffer
// and closing the done channel afterwards so the producing splitter can
// resume. A second invocation returns an internal error.
func (w *windowTableRow) Do(f func(flux.ColReader) error) error {
	// Atomically claim the table; it may only be consumed once.
	if !atomic.CompareAndSwapInt32(&w.used, 0, 1) {
		return &influxdb.Error{
			Code: influxdb.EInternal,
			Msg:  "table already read",
		}
	}
	// Signal completion even if f returns an error.
	defer close(w.done)
	err := f(&w.buffer)
	w.buffer.Release()
	return err
}
// Done releases the row if it was never read, freeing its buffer and
// unblocking the splitter waiting on the done channel. When Do has
// already claimed the row, it performed both steps itself and this is a
// no-op.
func (w *windowTableRow) Done() {
	if atomic.CompareAndSwapInt32(&w.used, 0, 1) {
		w.buffer.Release()
		close(w.done)
	}
}
// Empty reports whether the table has no rows. A windowTableRow always
// wraps exactly one row, so it is never empty.
func (w *windowTableRow) Empty() bool {
	return false
}
// groupKeyForWindow rewrites the _start and _stop entries of a group key
// to the supplied window bounds, leaving every other entry untouched.
func groupKeyForWindow(key flux.GroupKey, start, stop int64) flux.GroupKey {
	cols := key.Cols()
	vals := make([]values.Value, len(cols))
	for j, c := range cols {
		switch c.Label {
		case execute.DefaultStartColLabel:
			vals[j] = values.NewTime(values.Time(start))
		case execute.DefaultStopColLabel:
			vals[j] = values.NewTime(values.Time(stop))
		default:
			vals[j] = key.Value(j)
		}
	}
	return execute.NewGroupKey(cols, vals)
}
// getColumnValues extracts column j from the reader as a generic
// array.Interface, panicking on a column type it does not know about.
func getColumnValues(cr flux.ColReader, j int) array.Interface {
	colType := cr.Cols()[j].Type
	switch colType {
	case flux.TBool:
		return cr.Bools(j)
	case flux.TInt:
		return cr.Ints(j)
	case flux.TUInt:
		return cr.UInts(j)
	case flux.TFloat:
		return cr.Floats(j)
	case flux.TString:
		return cr.Strings(j)
	case flux.TTime:
		return cr.Times(j)
	default:
		panic(fmt.Errorf("unimplemented column type: %s", colType))
	}
}

View File

@ -78,6 +78,8 @@ type GroupCursor interface {
type Store interface {
ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (ResultSet, error)
ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (GroupResultSet, error)
// WindowAggregate will invoke a ReadWindowAggregateRequest against the Store.
WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAggregateRequest) (ResultSet, error)
TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error)
TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error)

View File

@ -54,6 +54,41 @@ type Store struct {
Logger *zap.Logger
}
// WindowAggregate executes a windowed aggregate read against the shards
// covered by the request's time range and returns the resulting result
// set. A nil result set with a nil error indicates there is no matching
// data (see the typed-nil TODOs below).
func (s *Store) WindowAggregate(ctx context.Context, req *datatypes.ReadWindowAggregateRequest) (reads.ResultSet, error) {
	if req.ReadSource == nil {
		return nil, errors.New("missing read source")
	}
	source, err := getReadSource(*req.ReadSource)
	if err != nil {
		return nil, err
	}
	// Resolve the org/bucket to a database and retention policy and
	// validate the requested time range.
	database, rp, start, end, err := s.validateArgs(source.OrganizationID, source.BucketID, req.Range.Start, req.Range.End)
	if err != nil {
		return nil, err
	}
	shardIDs, err := s.findShardIDs(database, rp, false, start, end)
	if err != nil {
		return nil, err
	}
	if len(shardIDs) == 0 { // TODO(jeff): this was a typed nil
		return nil, nil
	}
	// Build a series cursor over the matching shards, applying the
	// request's predicate.
	var cur reads.SeriesCursor
	if ic, err := newIndexSeriesCursor(ctx, req.Predicate, s.TSDBStore.Shards(shardIDs)); err != nil {
		return nil, err
	} else if ic == nil { // TODO(jeff): this was a typed nil
		return nil, nil
	} else {
		cur = ic
	}
	return reads.NewWindowAggregateResultSet(ctx, req, cur)
}
func NewStore(store TSDBStore, metaClient MetaClient) *Store {
return &Store{
TSDBStore: store,