feat(storage/flux): support table.fill() within aggregate window pushdown (#21519)

pull/21522/head
Jonathan A. Sternberg 2021-05-19 20:22:56 -05:00 committed by GitHub
parent 35ce6e6946
commit ce48262c31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 543 additions and 142 deletions

View File

@ -119,10 +119,14 @@ type ReadWindowAggregatePhysSpec struct {
Aggregates []plan.ProcedureKind
CreateEmpty bool
TimeColumn string
// ForceAggregate forces the aggregates to be treated as
// aggregates even if they are selectors.
ForceAggregate bool
}
// PlanDetails returns a human-readable summary of the window aggregate
// settings carried by this spec: the window period, the aggregate kinds,
// whether empty windows are created, the time column, and whether the
// aggregates are forced to behave as aggregates.
func (s *ReadWindowAggregatePhysSpec) PlanDetails() string {
	// Only a single return is kept: with the stale pre-ForceAggregate return
	// in front of it, the forceAggregate detail could never be reported.
	return fmt.Sprintf(
		"every = %v, aggregates = %v, createEmpty = %v, timeColumn = \"%s\", forceAggregate = %v",
		s.WindowEvery, s.Aggregates, s.CreateEmpty, s.TimeColumn, s.ForceAggregate,
	)
}
func (s *ReadWindowAggregatePhysSpec) Kind() plan.ProcedureKind {
@ -130,16 +134,12 @@ func (s *ReadWindowAggregatePhysSpec) Kind() plan.ProcedureKind {
}
// Copy returns a copy of the spec that can be modified without affecting
// the receiver.
//
// Every scalar field (including ForceAggregate) is carried over by the
// struct value copy, so newly added fields cannot be forgotten here. The
// embedded ReadRangePhysSpec and the Aggregates slice are then replaced
// with their own copies so that they are not shared with the receiver.
func (s *ReadWindowAggregatePhysSpec) Copy() plan.ProcedureSpec {
	ns := *s
	ns.ReadRangePhysSpec = *s.ReadRangePhysSpec.Copy().(*ReadRangePhysSpec)

	// Give the copy its own backing array. A nil slice is left nil so that
	// the copy still compares equal to the original.
	if s.Aggregates != nil {
		ns.Aggregates = make([]plan.ProcedureKind, len(s.Aggregates))
		copy(ns.Aggregates, s.Aggregates)
	}
	return &ns
}
type ReadTagKeysPhysSpec struct {

View File

@ -11,6 +11,7 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/experimental/table"
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/flux/stdlib/universe"
"github.com/influxdata/flux/values"
@ -27,6 +28,7 @@ func init() {
PushDownReadTagValuesRule{},
SortedPivotRule{},
PushDownWindowAggregateRule{},
PushDownWindowForceAggregateRule{},
PushDownWindowAggregateByTimeRule{},
PushDownBareAggregateRule{},
GroupWindowAggregateTransposeRule{},
@ -752,7 +754,44 @@ func (PushDownWindowAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (p
}), true, nil
}
// PushDownWindowAggregateWithTimeRule will match the given pattern.
// PushDownWindowForceAggregateRule is a planner rule for the pattern
//
//	ReadWindowAggregatePhys |> table.fill()
//
// When the pattern matches, the rule turns on the ForceAggregate switch of
// the ReadWindowAggregate so that selectors are forced to return a null
// value.
//
// The rule is idempotent: applying it more than once has the same effect
// as applying it a single time.
type PushDownWindowForceAggregateRule struct{}

// Name reports the name of this rule.
func (PushDownWindowForceAggregateRule) Name() string {
	const ruleName = "PushDownWindowForceAggregateRule"
	return ruleName
}
// Pattern describes the plan shape this rule rewrites: a table.fill()
// node fed by a ReadWindowAggregatePhys node.
func (PushDownWindowForceAggregateRule) Pattern() plan.Pattern {
	readWindowAggregate := plan.Pat(ReadWindowAggregatePhysKind)
	return plan.Pat(table.FillKind, readWindowAggregate)
}
// Rewrite merges the matched table.fill() node into its ReadWindowAggregate
// predecessor and enables ForceAggregate on the merged node's spec.
//
// pn is the table.fill() node matched by Pattern; its first predecessor is
// the ReadWindowAggregate node. The bare aggregate case (a window of
// math.MaxInt64) is left unchanged. On success the merged node and true are
// returned; if the merge fails, the original node, false and the error are
// returned and the original plan is left untouched.
func (PushDownWindowForceAggregateRule) Rewrite(ctx context.Context, pn plan.Node) (plan.Node, bool, error) {
	windowAggregateNode := pn.Predecessors()[0]
	windowAggregateSpec := windowAggregateNode.ProcedureSpec().(*ReadWindowAggregatePhysSpec)
	if windowAggregateSpec.WindowEvery == flux.ConvertDuration(math.MaxInt64) {
		// Do not apply this transformation to the bare aggregate case.
		// There's virtually no benefit to pushing that down since there are no
		// subsequent transformations to push down and I'm not actually sure the
		// code works properly in that case.
		return pn, false, nil
	}

	// Set the flag on a copy rather than on the matched node's own spec so
	// that a failed merge does not leave the existing plan node modified.
	forcedSpec := windowAggregateSpec.Copy().(*ReadWindowAggregatePhysSpec)
	forcedSpec.ForceAggregate = true

	newNode, err := plan.MergeToPhysicalNode(pn, windowAggregateNode, forcedSpec)
	if err != nil {
		return pn, false, err
	}
	return newNode, true, nil
}
// PushDownWindowAggregateByTimeRule will match the given pattern.
// ReadWindowAggregatePhys |> duplicate |> window(every: inf)
//
// If this pattern matches and the arguments to duplicate are
@ -764,7 +803,7 @@ func (PushDownWindowAggregateByTimeRule) Name() string {
return "PushDownWindowAggregateByTimeRule"
}
func (rule PushDownWindowAggregateByTimeRule) Pattern() plan.Pattern {
func (PushDownWindowAggregateByTimeRule) Pattern() plan.Pattern {
return plan.Pat(universe.WindowKind,
plan.Pat(universe.SchemaMutationKind,
plan.Pat(ReadWindowAggregatePhysKind)))

View File

@ -14,6 +14,7 @@ import (
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/plan/plantest"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/stdlib/experimental/table"
fluxinfluxdb "github.com/influxdata/flux/stdlib/influxdata/influxdb"
"github.com/influxdata/flux/stdlib/universe"
"github.com/influxdata/flux/values"
@ -1942,6 +1943,120 @@ func TestPushDownWindowAggregateRule(t *testing.T) {
}
}
// TestPushDownWindowForceAggregateRule checks that a table.fill() placed
// directly after a ReadWindowAggregate is merged into the read with
// ForceAggregate enabled, that consecutive fills collapse into one merged
// node, and that the bare aggregate (math.MaxInt64 window) is left unchanged.
func TestPushDownWindowForceAggregateRule(t *testing.T) {
	rules := []plan.Rule{
		influxdb.PushDownWindowAggregateRule{},
		influxdb.PushDownWindowForceAggregateRule{},
		influxdb.PushDownWindowAggregateByTimeRule{},
	}

	readRange := influxdb.ReadRangePhysSpec{
		Bucket: "test",
		Bounds: flux.Bounds{
			Start: flux.Time{
				IsRelative: true,
				Relative:   -time.Hour,
			},
			Stop: flux.Time{
				IsRelative: true,
			},
		},
	}

	// windowMax builds a fresh max() window aggregate spec over readRange.
	windowMax := func(every time.Duration, forceAggregate bool) *influxdb.ReadWindowAggregatePhysSpec {
		return &influxdb.ReadWindowAggregatePhysSpec{
			ReadRangePhysSpec: readRange,
			WindowEvery:       flux.ConvertDuration(every),
			Aggregates: []plan.ProcedureKind{
				universe.MaxKind,
			},
			ForceAggregate: forceAggregate,
		}
	}
	// fill builds a table.fill() node with the given id.
	fill := func(id string) plan.Node {
		return plan.CreatePhysicalNode(plan.NodeID(id), &table.FillProcedureSpec{})
	}

	tests := []plantest.RuleTestCase{
		{
			Name:  "simple",
			Rules: rules,
			Before: &plantest.PlanSpec{
				Nodes: []plan.Node{
					plan.CreatePhysicalNode("ReadWindowAggregate", windowMax(5*time.Minute, false)),
					fill("fill"),
				},
				Edges: [][2]int{
					{0, 1},
				},
			},
			After: &plantest.PlanSpec{
				Nodes: []plan.Node{
					plan.CreatePhysicalNode("merged_ReadWindowAggregate_fill", windowMax(5*time.Minute, true)),
				},
			},
		},
		{
			Name:  "idempotent",
			Rules: rules,
			Before: &plantest.PlanSpec{
				Nodes: []plan.Node{
					plan.CreatePhysicalNode("ReadWindowAggregate", windowMax(5*time.Minute, false)),
					fill("fill0"),
					fill("fill1"),
				},
				Edges: [][2]int{
					{0, 1},
					{1, 2},
				},
			},
			After: &plantest.PlanSpec{
				Nodes: []plan.Node{
					plan.CreatePhysicalNode("merged_ReadWindowAggregate_fill0_fill1", windowMax(5*time.Minute, true)),
				},
			},
		},
		{
			Name:  "bare",
			Rules: rules,
			Before: &plantest.PlanSpec{
				Nodes: []plan.Node{
					plan.CreatePhysicalNode("ReadWindowAggregate", windowMax(math.MaxInt64, false)),
					fill("fill"),
				},
				Edges: [][2]int{
					{0, 1},
				},
			},
			NoChange: true,
		},
	}

	for i := range tests {
		tc := tests[i]
		t.Run(tc.Name, func(t *testing.T) {
			plantest.PhysicalRuleTestHelper(t, &tc)
		})
	}
}
func TestTransposeGroupToWindowAggregateRule(t *testing.T) {
// Turn on all variants.
flagger := mock.NewFlagger(map[feature.Flag]interface{}{

View File

@ -355,9 +355,10 @@ func createReadWindowAggregateSource(s plan.ProcedureSpec, id execute.DatasetID,
Period: spec.WindowEvery,
Offset: spec.Offset,
},
Aggregates: spec.Aggregates,
CreateEmpty: spec.CreateEmpty,
TimeColumn: spec.TimeColumn,
Aggregates: spec.Aggregates,
CreateEmpty: spec.CreateEmpty,
TimeColumn: spec.TimeColumn,
ForceAggregate: spec.ForceAggregate,
},
a,
), nil

View File

@ -56,8 +56,10 @@ type ReadTagValuesSpec struct {
TagKey string
}
// ReadWindowAggregateSpec defines the options for WindowAggregate.
//
// Window and the WindowEvery/Offset should be mutually exclusive. If you set either the WindowEvery or Offset with
// nanosecond values, then the Window will be ignored
// nanosecond values, then the Window will be ignored.
type ReadWindowAggregateSpec struct {
ReadFilterSpec
WindowEvery int64
@ -66,6 +68,11 @@ type ReadWindowAggregateSpec struct {
CreateEmpty bool
TimeColumn string
Window execute.Window
// ForceAggregate forces all aggregates to be treated as aggregates.
// This forces selectors, which normally don't return values for empty
// windows, to return a null value.
ForceAggregate bool
}
func (spec *ReadWindowAggregateSpec) Name() string {

View File

@ -730,13 +730,13 @@ READ:
hasTimeCol := timeColumn != ""
switch typedCur := cur.(type) {
case cursors.IntegerArrayCursor:
if !selector {
if !selector || wai.spec.ForceAggregate {
var fillValue *int64
if isAggregateCount(wai.spec.Aggregates[0]) {
fillValue = func(v int64) *int64 { return &v }(0)
}
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TInt, hasTimeCol)
table = newIntegerWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, fillValue, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newIntegerWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, !selector, fillValue, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
table = newIntegerEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
@ -748,9 +748,9 @@ READ:
table = newIntegerWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
case cursors.FloatArrayCursor:
if !selector {
if !selector || wai.spec.ForceAggregate {
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TFloat, hasTimeCol)
table = newFloatWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newFloatWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, !selector, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
table = newFloatEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
@ -762,9 +762,9 @@ READ:
table = newFloatWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
case cursors.UnsignedArrayCursor:
if !selector {
if !selector || wai.spec.ForceAggregate {
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TUInt, hasTimeCol)
table = newUnsignedWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newUnsignedWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, !selector, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
table = newUnsignedEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
@ -776,9 +776,9 @@ READ:
table = newUnsignedWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
case cursors.BooleanArrayCursor:
if !selector {
if !selector || wai.spec.ForceAggregate {
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TBool, hasTimeCol)
table = newBooleanWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newBooleanWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, !selector, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
table = newBooleanEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
@ -790,9 +790,9 @@ READ:
table = newBooleanWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
case cursors.StringArrayCursor:
if !selector {
if !selector || wai.spec.ForceAggregate {
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TString, hasTimeCol)
table = newStringWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newStringWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, !selector, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newStringEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)

View File

@ -111,6 +111,7 @@ type floatWindowTable struct {
idxInArr int
createEmpty bool
timeColumn string
isAggregate bool
window interval.Window
}
@ -121,6 +122,7 @@ func newFloatWindowTable(
window interval.Window,
createEmpty bool,
timeColumn string,
isAggregate bool,
key flux.GroupKey,
cols []flux.ColMeta,
@ -137,6 +139,7 @@ func newFloatWindowTable(
window: window,
createEmpty: createEmpty,
timeColumn: timeColumn,
isAggregate: isAggregate,
}
if t.createEmpty {
start := int64(bounds.Start)
@ -217,10 +220,10 @@ func (t *floatWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, in
// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
func (t *floatWindowTable) nextAt(ts int64) (v float64, ok bool) {
func (t *floatWindowTable) nextAt(stop int64) (v float64, ok bool) {
if !t.nextBuffer() {
return
} else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) {
} else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) {
return
}
v, ok = t.arr.Values[t.idxInArr], true
@ -228,23 +231,26 @@ func (t *floatWindowTable) nextAt(ts int64) (v float64, ok bool) {
return v, ok
}
// isInWindow will check if the given time at stop can be used within
// the window stop time for ts. The ts may be a truncated stop time
// because of a restricted boundary while stop will be the true
// stop time returned by storage.
func (t *floatWindowTable) isInWindow(ts int64, stop int64) bool {
// This method checks if the stop time is a valid stop time for
// that interval. This calculation is different from the calculation
// of the window itself. For example, for a 10 second window that
// starts at 20 seconds, we would include points between [20, 30).
// The stop time for this interval would be 30, but because the stop
// time can be truncated, valid stop times range from anywhere between
// (20, 30]. The storage engine will always produce 30 as the end time
// but we may have truncated the stop time because of the boundary
// and this is why we are checking for this range instead of checking
// if the two values are equal.
start := int64(t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stop))).Start())
return start < ts && ts <= stop
// isInWindow will check if the given time may be used within the window
// denoted by the stop timestamp. The stop may be a truncated stop time
// because of a restricted boundary.
//
// When used with an aggregate, ts will be the true stop time returned
// by storage. When used with a selector, it will be the real time
// for the point.
func (t *floatWindowTable) isInWindow(stop int64, ts int64) bool {
// Retrieve the boundary associated with this stop time.
// This will be the boundary for the previous nanosecond.
bounds := t.window.GetLatestBounds(values.Time(stop - 1))
// Only start is newly declared here; the stop parameter is reassigned to
// the stop of the retrieved boundary and replaces the argument below.
start, stop := int64(bounds.Start()), int64(bounds.Stop())
// For an aggregate, the timestamp will be the stop time of the boundary,
// so the accepted range is (start, stop].
if t.isAggregate {
return start < ts && ts <= stop
}
// For a selector, the timestamp should be within the boundary,
// so the accepted range is [start, stop).
return start <= ts && ts < stop
}
// nextBuffer will ensure the array cursor is filled
@ -1086,6 +1092,7 @@ type integerWindowTable struct {
idxInArr int
createEmpty bool
timeColumn string
isAggregate bool
window interval.Window
fillValue *int64
}
@ -1097,6 +1104,7 @@ func newIntegerWindowTable(
window interval.Window,
createEmpty bool,
timeColumn string,
isAggregate bool,
fillValue *int64,
key flux.GroupKey,
cols []flux.ColMeta,
@ -1113,6 +1121,7 @@ func newIntegerWindowTable(
window: window,
createEmpty: createEmpty,
timeColumn: timeColumn,
isAggregate: isAggregate,
fillValue: fillValue,
}
if t.createEmpty {
@ -1194,10 +1203,10 @@ func (t *integerWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64,
// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
func (t *integerWindowTable) nextAt(ts int64) (v int64, ok bool) {
func (t *integerWindowTable) nextAt(stop int64) (v int64, ok bool) {
if !t.nextBuffer() {
return
} else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) {
} else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) {
return
}
v, ok = t.arr.Values[t.idxInArr], true
@ -1205,23 +1214,26 @@ func (t *integerWindowTable) nextAt(ts int64) (v int64, ok bool) {
return v, ok
}
// isInWindow will check if the given time at stop can be used within
// the window stop time for ts. The ts may be a truncated stop time
// because of a restricted boundary while stop will be the true
// stop time returned by storage.
func (t *integerWindowTable) isInWindow(ts int64, stop int64) bool {
// This method checks if the stop time is a valid stop time for
// that interval. This calculation is different from the calculation
// of the window itself. For example, for a 10 second window that
// starts at 20 seconds, we would include points between [20, 30).
// The stop time for this interval would be 30, but because the stop
// time can be truncated, valid stop times range from anywhere between
// (20, 30]. The storage engine will always produce 30 as the end time
// but we may have truncated the stop time because of the boundary
// and this is why we are checking for this range instead of checking
// if the two values are equal.
start := int64(t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stop))).Start())
return start < ts && ts <= stop
// isInWindow will check if the given time may be used within the window
// denoted by the stop timestamp. The stop may be a truncated stop time
// because of a restricted boundary.
//
// When used with an aggregate, ts will be the true stop time returned
// by storage. When used with a selector, it will be the real time
// for the point.
func (t *integerWindowTable) isInWindow(stop int64, ts int64) bool {
// Retrieve the boundary associated with this stop time.
// This will be the boundary for the previous nanosecond.
bounds := t.window.GetLatestBounds(values.Time(stop - 1))
// Only start is newly declared here; the stop parameter is reassigned to
// the stop of the retrieved boundary and replaces the argument below.
start, stop := int64(bounds.Start()), int64(bounds.Stop())
// For an aggregate, the timestamp will be the stop time of the boundary,
// so the accepted range is (start, stop].
if t.isAggregate {
return start < ts && ts <= stop
}
// For a selector, the timestamp should be within the boundary,
// so the accepted range is [start, stop).
return start <= ts && ts < stop
}
// nextBuffer will ensure the array cursor is filled
@ -2064,6 +2076,7 @@ type unsignedWindowTable struct {
idxInArr int
createEmpty bool
timeColumn string
isAggregate bool
window interval.Window
}
@ -2074,6 +2087,7 @@ func newUnsignedWindowTable(
window interval.Window,
createEmpty bool,
timeColumn string,
isAggregate bool,
key flux.GroupKey,
cols []flux.ColMeta,
@ -2090,6 +2104,7 @@ func newUnsignedWindowTable(
window: window,
createEmpty: createEmpty,
timeColumn: timeColumn,
isAggregate: isAggregate,
}
if t.createEmpty {
start := int64(bounds.Start)
@ -2170,10 +2185,10 @@ func (t *unsignedWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64,
// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
func (t *unsignedWindowTable) nextAt(ts int64) (v uint64, ok bool) {
func (t *unsignedWindowTable) nextAt(stop int64) (v uint64, ok bool) {
if !t.nextBuffer() {
return
} else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) {
} else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) {
return
}
v, ok = t.arr.Values[t.idxInArr], true
@ -2181,23 +2196,26 @@ func (t *unsignedWindowTable) nextAt(ts int64) (v uint64, ok bool) {
return v, ok
}
// isInWindow will check if the given time at stop can be used within
// the window stop time for ts. The ts may be a truncated stop time
// because of a restricted boundary while stop will be the true
// stop time returned by storage.
func (t *unsignedWindowTable) isInWindow(ts int64, stop int64) bool {
// This method checks if the stop time is a valid stop time for
// that interval. This calculation is different from the calculation
// of the window itself. For example, for a 10 second window that
// starts at 20 seconds, we would include points between [20, 30).
// The stop time for this interval would be 30, but because the stop
// time can be truncated, valid stop times range from anywhere between
// (20, 30]. The storage engine will always produce 30 as the end time
// but we may have truncated the stop time because of the boundary
// and this is why we are checking for this range instead of checking
// if the two values are equal.
start := int64(t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stop))).Start())
return start < ts && ts <= stop
// isInWindow will check if the given time may be used within the window
// denoted by the stop timestamp. The stop may be a truncated stop time
// because of a restricted boundary.
//
// When used with an aggregate, ts will be the true stop time returned
// by storage. When used with a selector, it will be the real time
// for the point.
func (t *unsignedWindowTable) isInWindow(stop int64, ts int64) bool {
// Retrieve the boundary associated with this stop time.
// This will be the boundary for the previous nanosecond.
bounds := t.window.GetLatestBounds(values.Time(stop - 1))
// Only start is newly declared here; the stop parameter is reassigned to
// the stop of the retrieved boundary and replaces the argument below.
start, stop := int64(bounds.Start()), int64(bounds.Stop())
// For an aggregate, the timestamp will be the stop time of the boundary,
// so the accepted range is (start, stop].
if t.isAggregate {
return start < ts && ts <= stop
}
// For a selector, the timestamp should be within the boundary,
// so the accepted range is [start, stop).
return start <= ts && ts < stop
}
// nextBuffer will ensure the array cursor is filled
@ -3039,6 +3057,7 @@ type stringWindowTable struct {
idxInArr int
createEmpty bool
timeColumn string
isAggregate bool
window interval.Window
}
@ -3049,6 +3068,7 @@ func newStringWindowTable(
window interval.Window,
createEmpty bool,
timeColumn string,
isAggregate bool,
key flux.GroupKey,
cols []flux.ColMeta,
@ -3065,6 +3085,7 @@ func newStringWindowTable(
window: window,
createEmpty: createEmpty,
timeColumn: timeColumn,
isAggregate: isAggregate,
}
if t.createEmpty {
start := int64(bounds.Start)
@ -3145,10 +3166,10 @@ func (t *stringWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, i
// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
func (t *stringWindowTable) nextAt(ts int64) (v string, ok bool) {
func (t *stringWindowTable) nextAt(stop int64) (v string, ok bool) {
if !t.nextBuffer() {
return
} else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) {
} else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) {
return
}
v, ok = t.arr.Values[t.idxInArr], true
@ -3156,23 +3177,26 @@ func (t *stringWindowTable) nextAt(ts int64) (v string, ok bool) {
return v, ok
}
// isInWindow will check if the given time at stop can be used within
// the window stop time for ts. The ts may be a truncated stop time
// because of a restricted boundary while stop will be the true
// stop time returned by storage.
func (t *stringWindowTable) isInWindow(ts int64, stop int64) bool {
// This method checks if the stop time is a valid stop time for
// that interval. This calculation is different from the calculation
// of the window itself. For example, for a 10 second window that
// starts at 20 seconds, we would include points between [20, 30).
// The stop time for this interval would be 30, but because the stop
// time can be truncated, valid stop times range from anywhere between
// (20, 30]. The storage engine will always produce 30 as the end time
// but we may have truncated the stop time because of the boundary
// and this is why we are checking for this range instead of checking
// if the two values are equal.
start := int64(t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stop))).Start())
return start < ts && ts <= stop
// isInWindow will check if the given time may be used within the window
// denoted by the stop timestamp. The stop may be a truncated stop time
// because of a restricted boundary.
//
// When used with an aggregate, ts will be the true stop time returned
// by storage. When used with a selector, it will be the real time
// for the point.
func (t *stringWindowTable) isInWindow(stop int64, ts int64) bool {
// Retrieve the boundary associated with this stop time.
// This will be the boundary for the previous nanosecond.
bounds := t.window.GetLatestBounds(values.Time(stop - 1))
// Only start is newly declared here; the stop parameter is reassigned to
// the stop of the retrieved boundary and replaces the argument below.
start, stop := int64(bounds.Start()), int64(bounds.Stop())
// For an aggregate, the timestamp will be the stop time of the boundary,
// so the accepted range is (start, stop].
if t.isAggregate {
return start < ts && ts <= stop
}
// For a selector, the timestamp should be within the boundary,
// so the accepted range is [start, stop).
return start <= ts && ts < stop
}
// nextBuffer will ensure the array cursor is filled
@ -3958,6 +3982,7 @@ type booleanWindowTable struct {
idxInArr int
createEmpty bool
timeColumn string
isAggregate bool
window interval.Window
}
@ -3968,6 +3993,7 @@ func newBooleanWindowTable(
window interval.Window,
createEmpty bool,
timeColumn string,
isAggregate bool,
key flux.GroupKey,
cols []flux.ColMeta,
@ -3984,6 +4010,7 @@ func newBooleanWindowTable(
window: window,
createEmpty: createEmpty,
timeColumn: timeColumn,
isAggregate: isAggregate,
}
if t.createEmpty {
start := int64(bounds.Start)
@ -4064,10 +4091,10 @@ func (t *booleanWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64,
// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
func (t *booleanWindowTable) nextAt(ts int64) (v bool, ok bool) {
func (t *booleanWindowTable) nextAt(stop int64) (v bool, ok bool) {
if !t.nextBuffer() {
return
} else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) {
} else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) {
return
}
v, ok = t.arr.Values[t.idxInArr], true
@ -4075,23 +4102,26 @@ func (t *booleanWindowTable) nextAt(ts int64) (v bool, ok bool) {
return v, ok
}
// isInWindow will check if the given time at stop can be used within
// the window stop time for ts. The ts may be a truncated stop time
// because of a restricted boundary while stop will be the true
// stop time returned by storage.
func (t *booleanWindowTable) isInWindow(ts int64, stop int64) bool {
// This method checks if the stop time is a valid stop time for
// that interval. This calculation is different from the calculation
// of the window itself. For example, for a 10 second window that
// starts at 20 seconds, we would include points between [20, 30).
// The stop time for this interval would be 30, but because the stop
// time can be truncated, valid stop times range from anywhere between
// (20, 30]. The storage engine will always produce 30 as the end time
// but we may have truncated the stop time because of the boundary
// and this is why we are checking for this range instead of checking
// if the two values are equal.
start := int64(t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stop))).Start())
return start < ts && ts <= stop
// isInWindow will check if the given time may be used within the window
// denoted by the stop timestamp. The stop may be a truncated stop time
// because of a restricted boundary.
//
// When used with an aggregate, ts will be the true stop time returned
// by storage. When used with a selector, it will be the real time
// for the point.
func (t *booleanWindowTable) isInWindow(stop int64, ts int64) bool {
// Retrieve the boundary associated with this stop time.
// This will be the boundary for the previous nanosecond.
bounds := t.window.GetLatestBounds(values.Time(stop - 1))
// Only start is newly declared here; the stop parameter is reassigned to
// the stop of the retrieved boundary and replaces the argument below.
start, stop := int64(bounds.Start()), int64(bounds.Stop())
// For an aggregate, the timestamp will be the stop time of the boundary,
// so the accepted range is (start, stop].
if t.isAggregate {
return start < ts && ts <= stop
}
// For a selector, the timestamp should be within the boundary,
// so the accepted range is [start, stop).
return start <= ts && ts < stop
}
// nextBuffer will ensure the array cursor is filled

View File

@ -106,6 +106,7 @@ type {{.name}}WindowTable struct {
idxInArr int
createEmpty bool
timeColumn string
isAggregate bool
window interval.Window
{{if eq .Name "Integer"}}fillValue *{{.Type}}{{end}}
}
@ -117,6 +118,7 @@ func new{{.Name}}WindowTable(
window interval.Window,
createEmpty bool,
timeColumn string,
isAggregate bool,
{{if eq .Name "Integer"}}fillValue *{{.Type}},{{end}}
key flux.GroupKey,
cols []flux.ColMeta,
@ -133,6 +135,7 @@ func new{{.Name}}WindowTable(
window: window,
createEmpty: createEmpty,
timeColumn: timeColumn,
isAggregate: isAggregate,
{{if eq .Name "Integer"}}fillValue: fillValue,{{end}}
}
if t.createEmpty {
@ -214,10 +217,10 @@ func (t *{{.name}}WindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64
// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
func (t *{{.name}}WindowTable) nextAt(ts int64) (v {{.Type}}, ok bool) {
func (t *{{.name}}WindowTable) nextAt(stop int64) (v {{.Type}}, ok bool) {
if !t.nextBuffer() {
return
} else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) {
} else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) {
return
}
v, ok = t.arr.Values[t.idxInArr], true
@ -225,23 +228,26 @@ func (t *{{.name}}WindowTable) nextAt(ts int64) (v {{.Type}}, ok bool) {
return v, ok
}
// isInWindow will check if the given time at stop can be used within
// the window stop time for ts. The ts may be a truncated stop time
// because of a restricted boundary while stop will be the true
// stop time returned by storage.
func (t *{{.name}}WindowTable) isInWindow(ts int64, stop int64) bool {
// This method checks if the stop time is a valid stop time for
// that interval. This calculation is different from the calculation
// of the window itself. For example, for a 10 second window that
// starts at 20 seconds, we would include points between [20, 30).
// The stop time for this interval would be 30, but because the stop
// time can be truncated, valid stop times range from anywhere between
// (20, 30]. The storage engine will always produce 30 as the end time
// but we may have truncated the stop time because of the boundary
// and this is why we are checking for this range instead of checking
// if the two values are equal.
start := int64(t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stop))).Start())
return start < ts && ts <= stop
// isInWindow will check if the given time may be used within the window
// denoted by the stop timestamp. The stop may be a truncated stop time
// because of a restricted boundary.
//
// When used with an aggregate, ts will be the true stop time returned
// by storage. When used with a selector, it will be the real time
// for the point.
func (t *{{.name}}WindowTable) isInWindow(stop int64, ts int64) bool {
// Retrieve the boundary associated with this stop time.
// This will be the boundary for the previous nanosecond.
bounds := t.window.GetLatestBounds(values.Time(stop - 1))
// Only start is newly declared here; the stop parameter is reassigned to
// the stop of the retrieved boundary and replaces the argument below.
start, stop := int64(bounds.Start()), int64(bounds.Stop())
// For an aggregate, the timestamp will be the stop time of the boundary,
// so the accepted range is (start, stop].
if t.isAggregate {
return start < ts && ts <= stop
}
// For a selector, the timestamp should be within the boundary,
// so the accepted range is [start, stop).
return start <= ts && ts < stop
}
// nextBuffer will ensure the array cursor is filled

View File

@ -1059,6 +1059,210 @@ func TestStorageReader_ReadWindowAggregate_CreateEmptyByStartTime(t *testing.T)
}
}
// TestStorageReader_ReadWindowAggregate_CreateEmptyAggregateByStopTime
// checks ReadWindowAggregate with CreateEmpty and ForceAggregate set and
// the stop column used as the time column, over 10 second windows.
//
// Each case expects six rows per series. Two of them carry no data:
// count reports 0 for those rows, while min and max report null.
func TestStorageReader_ReadWindowAggregate_CreateEmptyAggregateByStopTime(t *testing.T) {
	const (
		rangeStart = "2019-11-25T00:00:00Z"
		rangeStop  = "2019-11-25T00:01:00Z"
		// Rows are labelled with window stop times, so the first row
		// sits one window length after the start of the range.
		firstTime = "2019-11-25T00:00:10Z"
	)

	reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {
		spec := Spec(org, bucket,
			MeasurementSpec("m0",
				FloatArrayValuesSequence("f0", 15*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),
				TagValuesSequence("t0", "a-%s", 0, 3),
			),
		)
		tr := TimeRange(rangeStart, rangeStop)
		gen := datagen.NewSeriesGeneratorFromSpec(spec, tr)
		return gen, tr
	})
	defer reader.Close()

	cases := []struct {
		aggregate plan.ProcedureKind
		want      flux.TableIterator
	}{
		{
			aggregate: storageflux.CountKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TimeKey("_start", rangeStart),
				static.TimeKey("_stop", rangeStop),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.Times("_time", firstTime, 10, 20, 30, 40, 50),
							static.Ints("_value", 1, 1, 0, 1, 1, 0),
						},
					},
				},
			},
		},
		{
			aggregate: storageflux.MinKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TimeKey("_start", rangeStart),
				static.TimeKey("_stop", rangeStop),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.Times("_time", firstTime, 10, 20, 30, 40, 50),
							static.Floats("_value", 1, 2, nil, 3, 4, nil),
						},
					},
				},
			},
		},
		{
			aggregate: storageflux.MaxKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TimeKey("_start", rangeStart),
				static.TimeKey("_stop", rangeStop),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.Times("_time", firstTime, 10, 20, 30, 40, 50),
							static.Floats("_value", 1, 2, nil, 3, 4, nil),
						},
					},
				},
			},
		},
	}

	for _, tc := range cases {
		t.Run(string(tc.aggregate), func(t *testing.T) {
			mem := &memory.Allocator{}
			every := flux.ConvertDuration(10 * time.Second)

			// Non-overlapping windows: period equals every.
			spec := query.ReadWindowAggregateSpec{
				ReadFilterSpec: query.ReadFilterSpec{
					OrganizationID: reader.Org,
					BucketID:       reader.Bucket,
					Bounds:         reader.Bounds,
				},
				TimeColumn: execute.DefaultStopColLabel,
				Window: execute.Window{
					Every:  every,
					Period: every,
				},
				Aggregates:     []plan.ProcedureKind{tc.aggregate},
				CreateEmpty:    true,
				ForceAggregate: true,
			}

			got, err := reader.ReadWindowAggregate(context.Background(), spec, mem)
			if err != nil {
				t.Fatal(err)
			}

			if diff := table.Diff(tc.want, got); diff != "" {
				t.Errorf("unexpected results -want/+got:\n%s", diff)
			}
		})
	}
}
// TestStorageReader_ReadWindowAggregate_CreateEmptyAggregateByStartTime
// checks ReadWindowAggregate with CreateEmpty and ForceAggregate set and
// the start column used as the time column, over 10 second windows.
//
// Each case expects six rows per series. Two of them carry no data:
// count reports 0 for those rows, while min and max report null.
func TestStorageReader_ReadWindowAggregate_CreateEmptyAggregateByStartTime(t *testing.T) {
	const (
		rangeStart = "2019-11-25T00:00:00Z"
		rangeStop  = "2019-11-25T00:01:00Z"
		// Rows are labelled with window start times, so the first row
		// coincides with the start of the range.
		firstTime = "2019-11-25T00:00:00Z"
	)

	reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {
		spec := Spec(org, bucket,
			MeasurementSpec("m0",
				FloatArrayValuesSequence("f0", 15*time.Second, []float64{1.0, 2.0, 3.0, 4.0}),
				TagValuesSequence("t0", "a-%s", 0, 3),
			),
		)
		tr := TimeRange(rangeStart, rangeStop)
		gen := datagen.NewSeriesGeneratorFromSpec(spec, tr)
		return gen, tr
	})
	defer reader.Close()

	cases := []struct {
		aggregate plan.ProcedureKind
		want      flux.TableIterator
	}{
		{
			aggregate: storageflux.CountKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TimeKey("_start", rangeStart),
				static.TimeKey("_stop", rangeStop),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.Times("_time", firstTime, 10, 20, 30, 40, 50),
							static.Ints("_value", 1, 1, 0, 1, 1, 0),
						},
					},
				},
			},
		},
		{
			aggregate: storageflux.MinKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TimeKey("_start", rangeStart),
				static.TimeKey("_stop", rangeStop),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.Times("_time", firstTime, 10, 20, 30, 40, 50),
							static.Floats("_value", 1, 2, nil, 3, 4, nil),
						},
					},
				},
			},
		},
		{
			aggregate: storageflux.MaxKind,
			want: static.TableGroup{
				static.StringKey("_measurement", "m0"),
				static.StringKey("_field", "f0"),
				static.TimeKey("_start", rangeStart),
				static.TimeKey("_stop", rangeStop),
				static.TableMatrix{
					static.StringKeys("t0", "a-0", "a-1", "a-2"),
					{
						static.Table{
							static.Times("_time", firstTime, 10, 20, 30, 40, 50),
							static.Floats("_value", 1, 2, nil, 3, 4, nil),
						},
					},
				},
			},
		},
	}

	for _, tc := range cases {
		t.Run(string(tc.aggregate), func(t *testing.T) {
			mem := &memory.Allocator{}
			every := flux.ConvertDuration(10 * time.Second)

			// Non-overlapping windows: period equals every.
			spec := query.ReadWindowAggregateSpec{
				ReadFilterSpec: query.ReadFilterSpec{
					OrganizationID: reader.Org,
					BucketID:       reader.Bucket,
					Bounds:         reader.Bounds,
				},
				TimeColumn: execute.DefaultStartColLabel,
				Window: execute.Window{
					Every:  every,
					Period: every,
				},
				Aggregates:     []plan.ProcedureKind{tc.aggregate},
				CreateEmpty:    true,
				ForceAggregate: true,
			}

			got, err := reader.ReadWindowAggregate(context.Background(), spec, mem)
			if err != nil {
				t.Fatal(err)
			}

			if diff := table.Diff(tc.want, got); diff != "" {
				t.Errorf("unexpected results -want/+got:\n%s", diff)
			}
		})
	}
}
func TestStorageReader_ReadWindowAggregate_TruncatedBounds(t *testing.T) {
reader := NewStorageReader(t, func(org, bucket platform.ID) (datagen.SeriesGenerator, datagen.TimeRange) {
spec := Spec(org, bucket,
@ -2947,7 +3151,6 @@ func TestStorageReader_ReadGroupSelectTags(t *testing.T) {
},
},
},
}
for _, tt := range cases {
@ -3031,8 +3234,8 @@ func TestStorageReader_ReadGroupNoAgg(t *testing.T) {
BucketID: reader.Bucket,
Bounds: reader.Bounds,
},
GroupMode: query.GroupModeBy,
GroupKeys: []string{"t1"},
GroupMode: query.GroupModeBy,
GroupKeys: []string{"t1"},
}, mem)
if err != nil {
t.Fatal(err)