refactor: update window logic to use interval package

pull/19006/head^2
jl 2021-02-03 16:11:32 -08:00 committed by jlapacik
parent 1ad8f8d23d
commit 034280e819
12 changed files with 862 additions and 839 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/gogo/protobuf/types"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/values"
@ -692,6 +693,11 @@ func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs stor
}
}
window, err := interval.NewWindow(wai.spec.Window.Every, wai.spec.Window.Period, wai.spec.Window.Offset)
if err != nil {
return err
}
// these resources must be closed if not nil on return
var (
cur cursors.Cursor
@ -729,72 +735,72 @@ READ:
fillValue = func(v int64) *int64 { return &v }(0)
}
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TInt, hasTimeCol)
table = newIntegerWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, fillValue, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newIntegerWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, fillValue, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
table = newIntegerEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newIntegerEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else {
// Note hasTimeCol == true means that aggregateWindow() was called.
// Because aggregateWindow() ultimately removes empty tables we
// don't bother creating them here.
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
table = newIntegerWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newIntegerWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
case cursors.FloatArrayCursor:
if !selector {
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TFloat, hasTimeCol)
table = newFloatWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newFloatWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
table = newFloatEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newFloatEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else {
// Note hasTimeCol == true means that aggregateWindow() was called.
// Because aggregateWindow() ultimately removes empty tables we
// don't bother creating them here.
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
table = newFloatWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newFloatWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
case cursors.UnsignedArrayCursor:
if !selector {
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TUInt, hasTimeCol)
table = newUnsignedWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newUnsignedWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
table = newUnsignedEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newUnsignedEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else {
// Note hasTimeCol == true means that aggregateWindow() was called.
// Because aggregateWindow() ultimately removes empty tables we
// don't bother creating them here.
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
table = newUnsignedWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newUnsignedWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
case cursors.BooleanArrayCursor:
if !selector {
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TBool, hasTimeCol)
table = newBooleanWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newBooleanWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
table = newBooleanEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newBooleanEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else {
// Note hasTimeCol == true means that aggregateWindow() was called.
// Because aggregateWindow() ultimately removes empty tables we
// don't bother creating them here.
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
table = newBooleanWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newBooleanWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
case cursors.StringArrayCursor:
if !selector {
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TString, hasTimeCol)
table = newStringWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newStringWindowTable(done, typedCur, bnds, window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else if createEmpty && !hasTimeCol {
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newStringEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newStringEmptyWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
} else {
// Note hasTimeCol == true means that aggregateWindow() was called.
// Because aggregateWindow() ultimately removes empty tables we
// don't bother creating them here.
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newStringWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
table = newStringWindowSelectorTable(done, typedCur, bnds, window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
}
default:
panic(fmt.Sprintf("unreachable: %T", typedCur))

File diff suppressed because it is too large Load Diff

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
@ -100,11 +101,11 @@ func (t *{{.name}}Table) advance() bool {
type {{.name}}WindowTable struct {
{{.name}}Table
arr *cursors.{{.Name}}Array
nextTS int64
windowBounds interval.Bounds
idxInArr int
createEmpty bool
timeColumn string
window execute.Window
window interval.Window
{{if eq .Name "Integer"}}fillValue *{{.Type}}{{end}}
}
@ -112,7 +113,7 @@ func new{{.Name}}WindowTable(
done chan struct{},
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
window execute.Window,
window interval.Window,
createEmpty bool,
timeColumn string,
{{if eq .Name "Integer"}}fillValue *{{.Type}},{{end}}
@ -135,7 +136,7 @@ func new{{.Name}}WindowTable(
}
if t.createEmpty {
start := int64(bounds.Start)
t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop)
t.windowBounds = window.GetLatestBounds(values.Time(start))
}
t.readTags(tags)
t.init(t.advance)
@ -156,8 +157,7 @@ func (t *{{.name}}WindowTable) createNextBufferTimes() (start, stop *array.Int64
if t.createEmpty {
// There are no more windows when the start time is greater
// than or equal to the stop time.
subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive())
if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) {
if startT := int64(t.windowBounds.Start()); startT >= int64(t.bounds.Stop) {
return nil, nil, false
}
@ -165,8 +165,8 @@ func (t *{{.name}}WindowTable) createNextBufferTimes() (start, stop *array.Int64
// TODO(jsternberg): Calculate the exact size with max points as the maximum.
startB.Resize(storage.MaxPointsPerBlock)
stopB.Resize(storage.MaxPointsPerBlock)
for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) {
startT, stopT := t.getWindowBoundsFor(t.nextTS)
for ; ; t.windowBounds = t.window.NextBounds(t.windowBounds) {
startT, stopT := t.getWindowBoundsFor(t.windowBounds)
if startT >= int64(t.bounds.Stop) {
break
}
@ -188,7 +188,8 @@ func (t *{{.name}}WindowTable) createNextBufferTimes() (start, stop *array.Int64
startB.Resize(len(t.arr.Timestamps))
stopB.Resize(len(t.arr.Timestamps))
for _, stopT := range t.arr.Timestamps {
startT, stopT := t.getWindowBoundsFor(stopT)
bounds := t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stopT)))
startT, stopT := t.getWindowBoundsFor(bounds)
startB.Append(startT)
stopB.Append(stopT)
}
@ -197,16 +198,16 @@ func (t *{{.name}}WindowTable) createNextBufferTimes() (start, stop *array.Int64
return start, stop, true
}
func (t *{{.name}}WindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) {
subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive())
startT, stopT = int64(values.Time(ts).Add(subEvery)), ts
if startT < int64(t.bounds.Start) {
startT = int64(t.bounds.Start)
func (t *{{.name}}WindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) {
beg := int64(bounds.Start())
end := int64(bounds.Stop())
if beg < int64(t.bounds.Start) {
beg = int64(t.bounds.Start)
}
if stopT > int64(t.bounds.Stop) {
stopT = int64(t.bounds.Stop)
if end > int64(t.bounds.Stop) {
end = int64(t.bounds.Stop)
}
return startT, stopT
return beg, end
}
// nextAt will retrieve the next value that can be used with
@ -238,8 +239,7 @@ func (t *{{.name}}WindowTable) isInWindow(ts int64, stop int64) bool {
// but we may have truncated the stop time because of the boundary
// and this is why we are checking for this range instead of checking
// if the two values are equal.
subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive())
start := int64(values.Time(stop).Add(subEvery))
start := int64(t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stop))).Start())
return start < ts && ts <= stop
}
@ -316,14 +316,14 @@ func (t *{{.name}}WindowTable) advance() bool {
type {{.name}}WindowSelectorTable struct {
{{.name}}Table
timeColumn string
window execute.Window
window interval.Window
}
func new{{.Name}}WindowSelectorTable(
done chan struct{},
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
window execute.Window,
window interval.Window,
timeColumn string,
key flux.GroupKey,
cols []flux.ColMeta,
@ -382,7 +382,7 @@ func (t *{{.name}}WindowSelectorTable) startTimes(arr *cursors.{{.Name}}Array) *
rangeStart := int64(t.bounds.Start)
for _, v := range arr.Timestamps {
if windowStart := int64(t.window.GetEarliestBounds(values.Time(v)).Start); windowStart < rangeStart {
if windowStart := int64(t.window.GetLatestBounds(values.Time(v)).Start()); windowStart < rangeStart {
start.Append(rangeStart)
} else {
start.Append(windowStart)
@ -398,7 +398,7 @@ func (t *{{.name}}WindowSelectorTable) stopTimes(arr *cursors.{{.Name}}Array) *a
rangeStop := int64(t.bounds.Stop)
for _, v := range arr.Timestamps {
if windowStop := int64(t.window.GetEarliestBounds(values.Time(v)).Stop); windowStop > rangeStop {
if windowStop := int64(t.window.GetLatestBounds(values.Time(v)).Stop()); windowStop > rangeStop {
stop.Append(rangeStop)
} else {
stop.Append(windowStop)
@ -415,17 +415,16 @@ type {{.name}}EmptyWindowSelectorTable struct {
idx int
rangeStart int64
rangeStop int64
windowStart int64
windowStop int64
windowBounds interval.Bounds
timeColumn string
window execute.Window
window interval.Window
}
func new{{.Name}}EmptyWindowSelectorTable(
done chan struct{},
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
window execute.Window,
window interval.Window,
timeColumn string,
key flux.GroupKey,
cols []flux.ColMeta,
@ -444,8 +443,7 @@ func new{{.Name}}EmptyWindowSelectorTable(
arr: cur.Next(),
rangeStart: rangeStart,
rangeStop: rangeStop,
windowStart: int64(window.GetEarliestBounds(values.Time(rangeStart)).Start),
windowStop: int64(window.GetEarliestBounds(values.Time(rangeStart)).Stop),
windowBounds: window.GetLatestBounds(values.Time(rangeStart)),
window: window,
timeColumn: timeColumn,
}
@ -496,13 +494,13 @@ func (t *{{.name}}EmptyWindowSelectorTable) startTimes(builder *array.{{.ArrowTy
start := arrow.NewIntBuilder(t.alloc)
start.Resize(storage.MaxPointsPerBlock)
for t.windowStart < t.rangeStop {
for int64(t.windowBounds.Start()) < t.rangeStop {
// The first window should start at the
// beginning of the time range.
if t.windowStart < t.rangeStart {
if int64(t.windowBounds.Start()) < t.rangeStart {
start.Append(t.rangeStart)
} else {
start.Append(t.windowStart)
start.Append(int64(t.windowBounds.Start()))
}
var v int64
@ -516,15 +514,14 @@ func (t *{{.name}}EmptyWindowSelectorTable) startTimes(builder *array.{{.ArrowTy
// If the current timestamp falls within the
// current window, append the value to the
// builder, otherwise append a null value.
if t.windowStart <= v && v < t.windowStop {
if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
t.append(builder, t.arr.Values[t.idx])
t.idx++
} else {
builder.AppendNull()
}
t.windowStart = int64(values.Time(t.windowStart).Add(t.window.Every))
t.windowStop = int64(values.Time(t.windowStop).Add(t.window.Every))
t.windowBounds = t.window.NextBounds(t.windowBounds)
// If the current array is non-empty and has
// been read in its entirety, call Next().
@ -544,13 +541,13 @@ func (t *{{.name}}EmptyWindowSelectorTable) stopTimes(builder *array.{{.ArrowTyp
stop := arrow.NewIntBuilder(t.alloc)
stop.Resize(storage.MaxPointsPerBlock)
for t.windowStart < t.rangeStop {
for int64(t.windowBounds.Start()) < t.rangeStop {
// The last window should stop at the end of
// the time range.
if t.windowStop > t.rangeStop {
if int64(t.windowBounds.Stop()) > t.rangeStop {
stop.Append(t.rangeStop)
} else {
stop.Append(t.windowStop)
stop.Append(int64(t.windowBounds.Stop()))
}
var v int64
@ -564,15 +561,14 @@ func (t *{{.name}}EmptyWindowSelectorTable) stopTimes(builder *array.{{.ArrowTyp
// If the current timestamp falls within the
// current window, append the value to the
// builder, otherwise append a null value.
if t.windowStart <= v && v < t.windowStop {
if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
t.append(builder, t.arr.Values[t.idx])
t.idx++
} else {
builder.AppendNull()
}
t.windowStart = int64(values.Time(t.windowStart).Add(t.window.Every))
t.windowStop = int64(values.Time(t.windowStop).Add(t.window.Every))
t.windowBounds = t.window.NextBounds(t.windowBounds)
// If the current array is non-empty and has
// been read in its entirety, call Next().
@ -598,22 +594,22 @@ func (t *{{.name}}EmptyWindowSelectorTable) startStopTimes(builder *array.{{.Arr
time := arrow.NewIntBuilder(t.alloc)
time.Resize(storage.MaxPointsPerBlock)
for t.windowStart < t.rangeStop {
for int64(t.windowBounds.Start()) < t.rangeStop {
// The first window should start at the
// beginning of the time range.
if t.windowStart < t.rangeStart {
if int64(t.windowBounds.Start()) < t.rangeStart {
start.Append(t.rangeStart)
} else {
start.Append(t.windowStart)
start.Append(int64(t.windowBounds.Start()))
}
// The last window should stop at the end of
// the time range.
if t.windowStop > t.rangeStop {
if int64(t.windowBounds.Stop()) > t.rangeStop {
stop.Append(t.rangeStop)
} else {
stop.Append(t.windowStop)
stop.Append(int64(t.windowBounds.Stop()))
}
var v int64
@ -627,7 +623,7 @@ func (t *{{.name}}EmptyWindowSelectorTable) startStopTimes(builder *array.{{.Arr
// If the current timestamp falls within the
// current window, append the value to the
// builder, otherwise append a null value.
if t.windowStart <= v && v < t.windowStop {
if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
time.Append(v)
t.append(builder, t.arr.Values[t.idx])
t.idx++
@ -636,8 +632,7 @@ func (t *{{.name}}EmptyWindowSelectorTable) startStopTimes(builder *array.{{.Arr
builder.AppendNull()
}
t.windowStart = int64(values.Time(t.windowStart).Add(t.window.Every))
t.windowStop = int64(values.Time(t.windowStop).Add(t.window.Every))
t.windowBounds = t.window.NextBounds(t.windowBounds)
// If the current array is non-empty and has
// been read in its entirety, call Next().

View File

@ -4,7 +4,7 @@ import (
"context"
"math"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2/kit/errors"
"github.com/influxdata/influxdb/v2/kit/tracing"
@ -110,13 +110,12 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
offsetDur = convertNsecs(offset)
}
window := execute.Window{
Every: everyDur,
Period: periodDur,
Offset: offsetDur,
window, err := interval.NewWindow(everyDur, periodDur, offsetDur)
if err != nil {
return nil, err
}
if window.Every.Nanoseconds() == math.MaxInt64 {
if everyDur.Nanoseconds() == math.MaxInt64 {
// This means to aggregate over whole series for the query's time range
return newAggregateArrayCursor(r.ctx, agg, cursor)
} else {

View File

@ -11,7 +11,7 @@ import (
"fmt"
"math"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
@ -47,8 +47,8 @@ func newLimitArrayCursor(cur cursors.Cursor) cursors.Cursor {
}
}
func newWindowFirstArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
if window.Every.IsZero() {
func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
if window.IsZero() {
return newLimitArrayCursor(cur)
}
switch cur := cur.(type) {
@ -73,8 +73,8 @@ func newWindowFirstArrayCursor(cur cursors.Cursor, window execute.Window) cursor
}
}
func newWindowLastArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
if window.Every.IsZero() {
func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
if window.IsZero() {
return newLimitArrayCursor(cur)
}
switch cur := cur.(type) {
@ -99,7 +99,7 @@ func newWindowLastArrayCursor(cur cursors.Cursor, window execute.Window) cursors
}
}
func newWindowCountArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
@ -122,7 +122,7 @@ func newWindowCountArrayCursor(cur cursors.Cursor, window execute.Window) cursor
}
}
func newWindowSumArrayCursor(cur cursors.Cursor, window execute.Window) (cursors.Cursor, error) {
func newWindowSumArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
@ -142,7 +142,7 @@ func newWindowSumArrayCursor(cur cursors.Cursor, window execute.Window) (cursors
}
}
func newWindowMinArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
@ -159,7 +159,7 @@ func newWindowMinArrayCursor(cur cursors.Cursor, window execute.Window) cursors.
}
}
func newWindowMaxArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
@ -176,7 +176,7 @@ func newWindowMaxArrayCursor(cur cursors.Cursor, window execute.Window) cursors.
}
}
func newWindowMeanArrayCursor(cur cursors.Cursor, window execute.Window) (cursors.Cursor, error) {
func newWindowMeanArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
@ -374,12 +374,12 @@ type floatWindowLastArrayCursor struct {
windowEnd int64
res *cursors.FloatArray
tmp *cursors.FloatArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newFloatWindowLastArrayCursor(cur cursors.FloatArrayCursor, window execute.Window) *floatWindowLastArrayCursor {
func newFloatWindowLastArrayCursor(cur cursors.FloatArrayCursor, window interval.Window) *floatWindowLastArrayCursor {
return &floatWindowLastArrayCursor{
FloatArrayCursor: cur,
windowEnd: math.MinInt64,
@ -425,7 +425,7 @@ NEXT:
c.res.Timestamps[cur] = t
c.res.Values[cur] = a.Values[i]
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
}
c.tmp.Timestamps = nil
@ -439,12 +439,12 @@ type floatWindowFirstArrayCursor struct {
windowEnd int64
res *cursors.FloatArray
tmp *cursors.FloatArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newFloatWindowFirstArrayCursor(cur cursors.FloatArrayCursor, window execute.Window) *floatWindowFirstArrayCursor {
func newFloatWindowFirstArrayCursor(cur cursors.FloatArrayCursor, window interval.Window) *floatWindowFirstArrayCursor {
return &floatWindowFirstArrayCursor{
FloatArrayCursor: cur,
windowEnd: math.MinInt64,
@ -480,7 +480,7 @@ NEXT:
continue
}
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
c.res.Timestamps = append(c.res.Timestamps, t)
c.res.Values = append(c.res.Values, a.Values[i])
@ -502,12 +502,12 @@ type floatWindowCountArrayCursor struct {
cursors.FloatArrayCursor
res *cursors.IntegerArray
tmp *cursors.FloatArray
window execute.Window
window interval.Window
}
func newFloatWindowCountArrayCursor(cur cursors.FloatArrayCursor, window execute.Window) *floatWindowCountArrayCursor {
func newFloatWindowCountArrayCursor(cur cursors.FloatArrayCursor, window interval.Window) *floatWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &floatWindowCountArrayCursor{
@ -542,8 +542,8 @@ func (c *floatWindowCountArrayCursor) Next() *cursors.IntegerArray {
var acc int64 = 0
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -554,7 +554,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -573,7 +573,7 @@ WINDOWS:
// start the new window
acc = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -613,12 +613,12 @@ type floatWindowSumArrayCursor struct {
cursors.FloatArrayCursor
res *cursors.FloatArray
tmp *cursors.FloatArray
window execute.Window
window interval.Window
}
func newFloatWindowSumArrayCursor(cur cursors.FloatArrayCursor, window execute.Window) *floatWindowSumArrayCursor {
func newFloatWindowSumArrayCursor(cur cursors.FloatArrayCursor, window interval.Window) *floatWindowSumArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &floatWindowSumArrayCursor{
@ -653,8 +653,8 @@ func (c *floatWindowSumArrayCursor) Next() *cursors.FloatArray {
var acc float64 = 0
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -665,7 +665,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -684,7 +684,7 @@ WINDOWS:
// start the new window
acc = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -724,12 +724,12 @@ type floatWindowMinArrayCursor struct {
cursors.FloatArrayCursor
res *cursors.FloatArray
tmp *cursors.FloatArray
window execute.Window
window interval.Window
}
func newFloatWindowMinArrayCursor(cur cursors.FloatArrayCursor, window execute.Window) *floatWindowMinArrayCursor {
func newFloatWindowMinArrayCursor(cur cursors.FloatArrayCursor, window interval.Window) *floatWindowMinArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &floatWindowMinArrayCursor{
@ -765,8 +765,8 @@ func (c *floatWindowMinArrayCursor) Next() *cursors.FloatArray {
var tsAcc int64
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -777,7 +777,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -796,7 +796,7 @@ WINDOWS:
// start the new window
acc = math.MaxFloat64
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -839,12 +839,12 @@ type floatWindowMaxArrayCursor struct {
cursors.FloatArrayCursor
res *cursors.FloatArray
tmp *cursors.FloatArray
window execute.Window
window interval.Window
}
func newFloatWindowMaxArrayCursor(cur cursors.FloatArrayCursor, window execute.Window) *floatWindowMaxArrayCursor {
func newFloatWindowMaxArrayCursor(cur cursors.FloatArrayCursor, window interval.Window) *floatWindowMaxArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &floatWindowMaxArrayCursor{
@ -880,8 +880,8 @@ func (c *floatWindowMaxArrayCursor) Next() *cursors.FloatArray {
var tsAcc int64
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -892,7 +892,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -911,7 +911,7 @@ WINDOWS:
// start the new window
acc = -math.MaxFloat64
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -954,12 +954,12 @@ type floatWindowMeanArrayCursor struct {
cursors.FloatArrayCursor
res *cursors.FloatArray
tmp *cursors.FloatArray
window execute.Window
window interval.Window
}
func newFloatWindowMeanArrayCursor(cur cursors.FloatArrayCursor, window execute.Window) *floatWindowMeanArrayCursor {
func newFloatWindowMeanArrayCursor(cur cursors.FloatArrayCursor, window interval.Window) *floatWindowMeanArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &floatWindowMeanArrayCursor{
@ -995,8 +995,8 @@ func (c *floatWindowMeanArrayCursor) Next() *cursors.FloatArray {
var count int64
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -1007,7 +1007,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -1027,7 +1027,7 @@ WINDOWS:
// start the new window
sum = 0
count = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -1253,12 +1253,12 @@ type integerWindowLastArrayCursor struct {
windowEnd int64
res *cursors.IntegerArray
tmp *cursors.IntegerArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newIntegerWindowLastArrayCursor(cur cursors.IntegerArrayCursor, window execute.Window) *integerWindowLastArrayCursor {
func newIntegerWindowLastArrayCursor(cur cursors.IntegerArrayCursor, window interval.Window) *integerWindowLastArrayCursor {
return &integerWindowLastArrayCursor{
IntegerArrayCursor: cur,
windowEnd: math.MinInt64,
@ -1304,7 +1304,7 @@ NEXT:
c.res.Timestamps[cur] = t
c.res.Values[cur] = a.Values[i]
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
}
c.tmp.Timestamps = nil
@ -1318,12 +1318,12 @@ type integerWindowFirstArrayCursor struct {
windowEnd int64
res *cursors.IntegerArray
tmp *cursors.IntegerArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newIntegerWindowFirstArrayCursor(cur cursors.IntegerArrayCursor, window execute.Window) *integerWindowFirstArrayCursor {
func newIntegerWindowFirstArrayCursor(cur cursors.IntegerArrayCursor, window interval.Window) *integerWindowFirstArrayCursor {
return &integerWindowFirstArrayCursor{
IntegerArrayCursor: cur,
windowEnd: math.MinInt64,
@ -1359,7 +1359,7 @@ NEXT:
continue
}
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
c.res.Timestamps = append(c.res.Timestamps, t)
c.res.Values = append(c.res.Values, a.Values[i])
@ -1381,12 +1381,12 @@ type integerWindowCountArrayCursor struct {
cursors.IntegerArrayCursor
res *cursors.IntegerArray
tmp *cursors.IntegerArray
window execute.Window
window interval.Window
}
func newIntegerWindowCountArrayCursor(cur cursors.IntegerArrayCursor, window execute.Window) *integerWindowCountArrayCursor {
func newIntegerWindowCountArrayCursor(cur cursors.IntegerArrayCursor, window interval.Window) *integerWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &integerWindowCountArrayCursor{
@ -1421,8 +1421,8 @@ func (c *integerWindowCountArrayCursor) Next() *cursors.IntegerArray {
var acc int64 = 0
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -1433,7 +1433,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -1452,7 +1452,7 @@ WINDOWS:
// start the new window
acc = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -1492,12 +1492,12 @@ type integerWindowSumArrayCursor struct {
cursors.IntegerArrayCursor
res *cursors.IntegerArray
tmp *cursors.IntegerArray
window execute.Window
window interval.Window
}
func newIntegerWindowSumArrayCursor(cur cursors.IntegerArrayCursor, window execute.Window) *integerWindowSumArrayCursor {
func newIntegerWindowSumArrayCursor(cur cursors.IntegerArrayCursor, window interval.Window) *integerWindowSumArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &integerWindowSumArrayCursor{
@ -1532,8 +1532,8 @@ func (c *integerWindowSumArrayCursor) Next() *cursors.IntegerArray {
var acc int64 = 0
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -1544,7 +1544,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -1563,7 +1563,7 @@ WINDOWS:
// start the new window
acc = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -1603,12 +1603,12 @@ type integerWindowMinArrayCursor struct {
cursors.IntegerArrayCursor
res *cursors.IntegerArray
tmp *cursors.IntegerArray
window execute.Window
window interval.Window
}
func newIntegerWindowMinArrayCursor(cur cursors.IntegerArrayCursor, window execute.Window) *integerWindowMinArrayCursor {
func newIntegerWindowMinArrayCursor(cur cursors.IntegerArrayCursor, window interval.Window) *integerWindowMinArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &integerWindowMinArrayCursor{
@ -1644,8 +1644,8 @@ func (c *integerWindowMinArrayCursor) Next() *cursors.IntegerArray {
var tsAcc int64
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -1656,7 +1656,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -1675,7 +1675,7 @@ WINDOWS:
// start the new window
acc = math.MaxInt64
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -1718,12 +1718,12 @@ type integerWindowMaxArrayCursor struct {
cursors.IntegerArrayCursor
res *cursors.IntegerArray
tmp *cursors.IntegerArray
window execute.Window
window interval.Window
}
func newIntegerWindowMaxArrayCursor(cur cursors.IntegerArrayCursor, window execute.Window) *integerWindowMaxArrayCursor {
func newIntegerWindowMaxArrayCursor(cur cursors.IntegerArrayCursor, window interval.Window) *integerWindowMaxArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &integerWindowMaxArrayCursor{
@ -1759,8 +1759,8 @@ func (c *integerWindowMaxArrayCursor) Next() *cursors.IntegerArray {
var tsAcc int64
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -1771,7 +1771,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -1790,7 +1790,7 @@ WINDOWS:
// start the new window
acc = math.MinInt64
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -1833,12 +1833,12 @@ type integerWindowMeanArrayCursor struct {
cursors.IntegerArrayCursor
res *cursors.FloatArray
tmp *cursors.IntegerArray
window execute.Window
window interval.Window
}
func newIntegerWindowMeanArrayCursor(cur cursors.IntegerArrayCursor, window execute.Window) *integerWindowMeanArrayCursor {
func newIntegerWindowMeanArrayCursor(cur cursors.IntegerArrayCursor, window interval.Window) *integerWindowMeanArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &integerWindowMeanArrayCursor{
@ -1874,8 +1874,8 @@ func (c *integerWindowMeanArrayCursor) Next() *cursors.FloatArray {
var count int64
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -1886,7 +1886,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -1906,7 +1906,7 @@ WINDOWS:
// start the new window
sum = 0
count = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -2132,12 +2132,12 @@ type unsignedWindowLastArrayCursor struct {
windowEnd int64
res *cursors.UnsignedArray
tmp *cursors.UnsignedArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newUnsignedWindowLastArrayCursor(cur cursors.UnsignedArrayCursor, window execute.Window) *unsignedWindowLastArrayCursor {
func newUnsignedWindowLastArrayCursor(cur cursors.UnsignedArrayCursor, window interval.Window) *unsignedWindowLastArrayCursor {
return &unsignedWindowLastArrayCursor{
UnsignedArrayCursor: cur,
windowEnd: math.MinInt64,
@ -2183,7 +2183,7 @@ NEXT:
c.res.Timestamps[cur] = t
c.res.Values[cur] = a.Values[i]
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
}
c.tmp.Timestamps = nil
@ -2197,12 +2197,12 @@ type unsignedWindowFirstArrayCursor struct {
windowEnd int64
res *cursors.UnsignedArray
tmp *cursors.UnsignedArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newUnsignedWindowFirstArrayCursor(cur cursors.UnsignedArrayCursor, window execute.Window) *unsignedWindowFirstArrayCursor {
func newUnsignedWindowFirstArrayCursor(cur cursors.UnsignedArrayCursor, window interval.Window) *unsignedWindowFirstArrayCursor {
return &unsignedWindowFirstArrayCursor{
UnsignedArrayCursor: cur,
windowEnd: math.MinInt64,
@ -2238,7 +2238,7 @@ NEXT:
continue
}
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
c.res.Timestamps = append(c.res.Timestamps, t)
c.res.Values = append(c.res.Values, a.Values[i])
@ -2260,12 +2260,12 @@ type unsignedWindowCountArrayCursor struct {
cursors.UnsignedArrayCursor
res *cursors.IntegerArray
tmp *cursors.UnsignedArray
window execute.Window
window interval.Window
}
func newUnsignedWindowCountArrayCursor(cur cursors.UnsignedArrayCursor, window execute.Window) *unsignedWindowCountArrayCursor {
func newUnsignedWindowCountArrayCursor(cur cursors.UnsignedArrayCursor, window interval.Window) *unsignedWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &unsignedWindowCountArrayCursor{
@ -2300,8 +2300,8 @@ func (c *unsignedWindowCountArrayCursor) Next() *cursors.IntegerArray {
var acc int64 = 0
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -2312,7 +2312,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -2331,7 +2331,7 @@ WINDOWS:
// start the new window
acc = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -2371,12 +2371,12 @@ type unsignedWindowSumArrayCursor struct {
cursors.UnsignedArrayCursor
res *cursors.UnsignedArray
tmp *cursors.UnsignedArray
window execute.Window
window interval.Window
}
func newUnsignedWindowSumArrayCursor(cur cursors.UnsignedArrayCursor, window execute.Window) *unsignedWindowSumArrayCursor {
func newUnsignedWindowSumArrayCursor(cur cursors.UnsignedArrayCursor, window interval.Window) *unsignedWindowSumArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &unsignedWindowSumArrayCursor{
@ -2411,8 +2411,8 @@ func (c *unsignedWindowSumArrayCursor) Next() *cursors.UnsignedArray {
var acc uint64 = 0
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -2423,7 +2423,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -2442,7 +2442,7 @@ WINDOWS:
// start the new window
acc = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -2482,12 +2482,12 @@ type unsignedWindowMinArrayCursor struct {
cursors.UnsignedArrayCursor
res *cursors.UnsignedArray
tmp *cursors.UnsignedArray
window execute.Window
window interval.Window
}
func newUnsignedWindowMinArrayCursor(cur cursors.UnsignedArrayCursor, window execute.Window) *unsignedWindowMinArrayCursor {
func newUnsignedWindowMinArrayCursor(cur cursors.UnsignedArrayCursor, window interval.Window) *unsignedWindowMinArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &unsignedWindowMinArrayCursor{
@ -2523,8 +2523,8 @@ func (c *unsignedWindowMinArrayCursor) Next() *cursors.UnsignedArray {
var tsAcc int64
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -2535,7 +2535,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -2554,7 +2554,7 @@ WINDOWS:
// start the new window
acc = math.MaxUint64
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -2597,12 +2597,12 @@ type unsignedWindowMaxArrayCursor struct {
cursors.UnsignedArrayCursor
res *cursors.UnsignedArray
tmp *cursors.UnsignedArray
window execute.Window
window interval.Window
}
func newUnsignedWindowMaxArrayCursor(cur cursors.UnsignedArrayCursor, window execute.Window) *unsignedWindowMaxArrayCursor {
func newUnsignedWindowMaxArrayCursor(cur cursors.UnsignedArrayCursor, window interval.Window) *unsignedWindowMaxArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &unsignedWindowMaxArrayCursor{
@ -2638,8 +2638,8 @@ func (c *unsignedWindowMaxArrayCursor) Next() *cursors.UnsignedArray {
var tsAcc int64
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -2650,7 +2650,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -2669,7 +2669,7 @@ WINDOWS:
// start the new window
acc = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -2712,12 +2712,12 @@ type unsignedWindowMeanArrayCursor struct {
cursors.UnsignedArrayCursor
res *cursors.FloatArray
tmp *cursors.UnsignedArray
window execute.Window
window interval.Window
}
func newUnsignedWindowMeanArrayCursor(cur cursors.UnsignedArrayCursor, window execute.Window) *unsignedWindowMeanArrayCursor {
func newUnsignedWindowMeanArrayCursor(cur cursors.UnsignedArrayCursor, window interval.Window) *unsignedWindowMeanArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &unsignedWindowMeanArrayCursor{
@ -2753,8 +2753,8 @@ func (c *unsignedWindowMeanArrayCursor) Next() *cursors.FloatArray {
var count int64
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -2765,7 +2765,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -2785,7 +2785,7 @@ WINDOWS:
// start the new window
sum = 0
count = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -3011,12 +3011,12 @@ type stringWindowLastArrayCursor struct {
windowEnd int64
res *cursors.StringArray
tmp *cursors.StringArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newStringWindowLastArrayCursor(cur cursors.StringArrayCursor, window execute.Window) *stringWindowLastArrayCursor {
func newStringWindowLastArrayCursor(cur cursors.StringArrayCursor, window interval.Window) *stringWindowLastArrayCursor {
return &stringWindowLastArrayCursor{
StringArrayCursor: cur,
windowEnd: math.MinInt64,
@ -3062,7 +3062,7 @@ NEXT:
c.res.Timestamps[cur] = t
c.res.Values[cur] = a.Values[i]
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
}
c.tmp.Timestamps = nil
@ -3076,12 +3076,12 @@ type stringWindowFirstArrayCursor struct {
windowEnd int64
res *cursors.StringArray
tmp *cursors.StringArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newStringWindowFirstArrayCursor(cur cursors.StringArrayCursor, window execute.Window) *stringWindowFirstArrayCursor {
func newStringWindowFirstArrayCursor(cur cursors.StringArrayCursor, window interval.Window) *stringWindowFirstArrayCursor {
return &stringWindowFirstArrayCursor{
StringArrayCursor: cur,
windowEnd: math.MinInt64,
@ -3117,7 +3117,7 @@ NEXT:
continue
}
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
c.res.Timestamps = append(c.res.Timestamps, t)
c.res.Values = append(c.res.Values, a.Values[i])
@ -3139,12 +3139,12 @@ type stringWindowCountArrayCursor struct {
cursors.StringArrayCursor
res *cursors.IntegerArray
tmp *cursors.StringArray
window execute.Window
window interval.Window
}
func newStringWindowCountArrayCursor(cur cursors.StringArrayCursor, window execute.Window) *stringWindowCountArrayCursor {
func newStringWindowCountArrayCursor(cur cursors.StringArrayCursor, window interval.Window) *stringWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &stringWindowCountArrayCursor{
@ -3179,8 +3179,8 @@ func (c *stringWindowCountArrayCursor) Next() *cursors.IntegerArray {
var acc int64 = 0
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -3191,7 +3191,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -3210,7 +3210,7 @@ WINDOWS:
// start the new window
acc = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
@ -3435,12 +3435,12 @@ type booleanWindowLastArrayCursor struct {
windowEnd int64
res *cursors.BooleanArray
tmp *cursors.BooleanArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newBooleanWindowLastArrayCursor(cur cursors.BooleanArrayCursor, window execute.Window) *booleanWindowLastArrayCursor {
func newBooleanWindowLastArrayCursor(cur cursors.BooleanArrayCursor, window interval.Window) *booleanWindowLastArrayCursor {
return &booleanWindowLastArrayCursor{
BooleanArrayCursor: cur,
windowEnd: math.MinInt64,
@ -3486,7 +3486,7 @@ NEXT:
c.res.Timestamps[cur] = t
c.res.Values[cur] = a.Values[i]
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
}
c.tmp.Timestamps = nil
@ -3500,12 +3500,12 @@ type booleanWindowFirstArrayCursor struct {
windowEnd int64
res *cursors.BooleanArray
tmp *cursors.BooleanArray
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func newBooleanWindowFirstArrayCursor(cur cursors.BooleanArrayCursor, window execute.Window) *booleanWindowFirstArrayCursor {
func newBooleanWindowFirstArrayCursor(cur cursors.BooleanArrayCursor, window interval.Window) *booleanWindowFirstArrayCursor {
return &booleanWindowFirstArrayCursor{
BooleanArrayCursor: cur,
windowEnd: math.MinInt64,
@ -3541,7 +3541,7 @@ NEXT:
continue
}
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
c.res.Timestamps = append(c.res.Timestamps, t)
c.res.Values = append(c.res.Values, a.Values[i])
@ -3563,12 +3563,12 @@ type booleanWindowCountArrayCursor struct {
cursors.BooleanArrayCursor
res *cursors.IntegerArray
tmp *cursors.BooleanArray
window execute.Window
window interval.Window
}
func newBooleanWindowCountArrayCursor(cur cursors.BooleanArrayCursor, window execute.Window) *booleanWindowCountArrayCursor {
func newBooleanWindowCountArrayCursor(cur cursors.BooleanArrayCursor, window interval.Window) *booleanWindowCountArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &booleanWindowCountArrayCursor{
@ -3603,8 +3603,8 @@ func (c *booleanWindowCountArrayCursor) Next() *cursors.IntegerArray {
var acc int64 = 0
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -3615,7 +3615,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -3634,7 +3634,7 @@ WINDOWS:
// start the new window
acc = 0
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS

View File

@ -5,7 +5,7 @@ import (
"fmt"
"math"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
@ -29,8 +29,8 @@ func newLimitArrayCursor(cur cursors.Cursor) cursors.Cursor {
}
}
func newWindowFirstArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
if window.Every.IsZero() {
func newWindowFirstArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
if window.IsZero() {
return newLimitArrayCursor(cur)
}
switch cur := cur.(type) {
@ -43,8 +43,8 @@ func newWindowFirstArrayCursor(cur cursors.Cursor, window execute.Window) cursor
}
}
func newWindowLastArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
if window.Every.IsZero() {
func newWindowLastArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
if window.IsZero() {
return newLimitArrayCursor(cur)
}
switch cur := cur.(type) {
@ -57,7 +57,7 @@ func newWindowLastArrayCursor(cur cursors.Cursor, window execute.Window) cursors
}
}
func newWindowCountArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
func newWindowCountArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
switch cur := cur.(type) {
{{range .}}{{/* every type supports count */}}
case cursors.{{.Name}}ArrayCursor:
@ -68,7 +68,7 @@ func newWindowCountArrayCursor(cur cursors.Cursor, window execute.Window) cursor
}
}
func newWindowSumArrayCursor(cur cursors.Cursor, window execute.Window) (cursors.Cursor, error) {
func newWindowSumArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) {
switch cur := cur.(type) {
{{range .}}
{{$Type := .Name}}
@ -87,7 +87,7 @@ func newWindowSumArrayCursor(cur cursors.Cursor, window execute.Window) (cursors
}
}
func newWindowMinArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
func newWindowMinArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
switch cur := cur.(type) {
{{range .}}
{{$Type := .Name}}
@ -103,7 +103,7 @@ func newWindowMinArrayCursor(cur cursors.Cursor, window execute.Window) cursors.
}
}
func newWindowMaxArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
func newWindowMaxArrayCursor(cur cursors.Cursor, window interval.Window) cursors.Cursor {
switch cur := cur.(type) {
{{range .}}
{{$Type := .Name}}
@ -119,7 +119,7 @@ func newWindowMaxArrayCursor(cur cursors.Cursor, window execute.Window) cursors.
}
}
func newWindowMeanArrayCursor(cur cursors.Cursor, window execute.Window) (cursors.Cursor, error) {
func newWindowMeanArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) {
switch cur := cur.(type) {
{{range .}}
{{$Type := .Name}}
@ -322,12 +322,12 @@ type {{.name}}WindowLastArrayCursor struct {
windowEnd int64
res {{$arrayType}}
tmp {{$arrayType}}
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func new{{.Name}}WindowLastArrayCursor(cur cursors.{{.Name}}ArrayCursor, window execute.Window) *{{.name}}WindowLastArrayCursor {
func new{{.Name}}WindowLastArrayCursor(cur cursors.{{.Name}}ArrayCursor, window interval.Window) *{{.name}}WindowLastArrayCursor {
return &{{.name}}WindowLastArrayCursor{
{{.Name}}ArrayCursor: cur,
windowEnd: math.MinInt64,
@ -373,7 +373,7 @@ NEXT:
c.res.Timestamps[cur] = t
c.res.Values[cur] = a.Values[i]
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
}
c.tmp.Timestamps = nil
@ -387,12 +387,12 @@ type {{.name}}WindowFirstArrayCursor struct {
windowEnd int64
res {{$arrayType}}
tmp {{$arrayType}}
window execute.Window
window interval.Window
}
// Window array cursors assume that every != 0 && every != MaxInt64.
// Such a cursor will panic in the first case and possibly overflow in the second.
func new{{.Name}}WindowFirstArrayCursor(cur cursors.{{.Name}}ArrayCursor, window execute.Window) *{{.name}}WindowFirstArrayCursor {
func new{{.Name}}WindowFirstArrayCursor(cur cursors.{{.Name}}ArrayCursor, window interval.Window) *{{.name}}WindowFirstArrayCursor {
return &{{.name}}WindowFirstArrayCursor{
{{.Name}}ArrayCursor: cur,
windowEnd: math.MinInt64,
@ -428,7 +428,7 @@ NEXT:
continue
}
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
c.windowEnd = int64(c.window.GetLatestBounds(values.Time(t)).Stop())
c.res.Timestamps = append(c.res.Timestamps, t)
c.res.Values = append(c.res.Values, a.Values[i])
@ -456,12 +456,12 @@ type {{$name}}Window{{$aggName}}ArrayCursor struct {
cursors.{{$Name}}ArrayCursor
res *cursors.{{.OutputTypeName}}Array
tmp {{$arrayType}}
window execute.Window
window interval.Window
}
func new{{$Name}}Window{{$aggName}}ArrayCursor(cur cursors.{{$Name}}ArrayCursor, window execute.Window) *{{$name}}Window{{$aggName}}ArrayCursor {
func new{{$Name}}Window{{$aggName}}ArrayCursor(cur cursors.{{$Name}}ArrayCursor, window interval.Window) *{{$name}}Window{{$aggName}}ArrayCursor {
resLen := MaxPointsPerBlock
if window.Every.IsZero() {
if window.IsZero() {
resLen = 1
}
return &{{$name}}Window{{$aggName}}ArrayCursor{
@ -496,8 +496,8 @@ func (c *{{$name}}Window{{$aggName}}ArrayCursor) Next() *cursors.{{.OutputTypeNa
{{.AccDecls}}
var windowEnd int64
if !c.window.Every.IsZero() {
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
@ -508,7 +508,7 @@ WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.Every.IsZero() && ts >= windowEnd {
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
@ -526,7 +526,7 @@ WINDOWS:
// start the new window
{{.AccReset}}
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS

View File

@ -4,7 +4,7 @@ import (
"context"
"fmt"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
@ -22,10 +22,10 @@ func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, curs
case datatypes.AggregateTypeFirst, datatypes.AggregateTypeLast:
return newLimitArrayCursor(cursor), nil
}
return newWindowAggregateArrayCursor(ctx, agg, execute.Window{}, cursor)
return newWindowAggregateArrayCursor(ctx, agg, interval.Window{}, cursor)
}
func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, window execute.Window, cursor cursors.Cursor) (cursors.Cursor, error) {
func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, window interval.Window, cursor cursors.Cursor) (cursors.Cursor, error) {
if cursor == nil {
return nil, nil
}

View File

@ -12,12 +12,14 @@ import (
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
var cmpOptions = cmp.AllowUnexported(interval.Window{})
type MockFloatArrayCursor struct {
CloseFunc func()
ErrFunc func() error
@ -45,7 +47,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -63,7 +65,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowSumArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowSumArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -81,7 +83,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMinArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMinArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -99,7 +101,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMaxArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMaxArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -117,7 +119,7 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -127,10 +129,11 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowCountArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -145,16 +148,17 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Sum", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowSumArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -169,16 +173,17 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowSumArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowSumArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Min", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowMinArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -193,16 +198,17 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMinArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMinArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Max", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowMaxArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -217,16 +223,17 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMaxArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMaxArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Mean", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowMeanArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -241,7 +248,7 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -251,10 +258,11 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowCountArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -269,16 +277,17 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Sum", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowSumArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -293,16 +302,17 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowSumArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowSumArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Min", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowMinArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -317,16 +327,17 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMinArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMinArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Max", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowMaxArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -341,16 +352,17 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMaxArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMaxArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Mean", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowMeanArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
@ -365,7 +377,7 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -399,7 +411,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -417,7 +429,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowSumArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowSumArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -435,7 +447,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMinArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMinArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -453,7 +465,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMaxArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMaxArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -471,7 +483,7 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -481,10 +493,11 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowCountArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -499,16 +512,17 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Sum", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowSumArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -523,16 +537,17 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowSumArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowSumArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Min", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowMinArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -547,16 +562,17 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMinArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMinArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Max", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowMaxArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -571,16 +587,17 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMaxArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMaxArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Mean", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowMeanArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -595,7 +612,7 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -605,10 +622,11 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowCountArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -623,16 +641,17 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Sum", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowSumArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -647,16 +666,17 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowSumArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowSumArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Min", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowMinArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -671,16 +691,17 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMinArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMinArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Max", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowMaxArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -695,16 +716,17 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMaxArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMaxArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Mean", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowMeanArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
@ -719,7 +741,7 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -753,7 +775,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -771,7 +793,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowSumArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowSumArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -789,7 +811,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMinArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMinArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -807,7 +829,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMaxArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMaxArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -825,7 +847,7 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -835,10 +857,11 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowCountArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -853,16 +876,17 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Sum", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowSumArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -877,16 +901,17 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowSumArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowSumArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Min", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowMinArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -901,16 +926,17 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMinArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMinArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Max", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowMaxArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -925,16 +951,17 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMaxArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMaxArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Mean", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowMeanArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -949,7 +976,7 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -959,10 +986,11 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowCountArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -977,16 +1005,17 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Sum", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowSumArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -1001,16 +1030,17 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowSumArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowSumArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Min", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowMinArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -1025,16 +1055,17 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMinArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMinArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Max", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowMaxArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -1049,16 +1080,17 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMaxArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMaxArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
t.Run("Mean", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowMeanArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
@ -1073,7 +1105,7 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -1107,7 +1139,7 @@ func TestNewAggregateArrayCursor_String(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockStringArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(stringWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(stringWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -1117,10 +1149,11 @@ func TestNewAggregateArrayCursor_String(t *testing.T) {
func TestNewWindowAggregateArrayCursorMonths_String(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &stringWindowCountArrayCursor{
StringArrayCursor: &MockStringArrayCursor{},
@ -1135,7 +1168,7 @@ func TestNewWindowAggregateArrayCursorMonths_String(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockStringArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(stringWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(stringWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -1145,10 +1178,11 @@ func TestNewWindowAggregateArrayCursorMonths_String(t *testing.T) {
func TestNewWindowAggregateArrayCursor_String(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &stringWindowCountArrayCursor{
StringArrayCursor: &MockStringArrayCursor{},
@ -1163,7 +1197,7 @@ func TestNewWindowAggregateArrayCursor_String(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockStringArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(stringWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(stringWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -1197,7 +1231,7 @@ func TestNewAggregateArrayCursor_Boolean(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockBooleanArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(booleanWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(booleanWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -1207,10 +1241,11 @@ func TestNewAggregateArrayCursor_Boolean(t *testing.T) {
func TestNewWindowAggregateArrayCursorMonths_Boolean(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &booleanWindowCountArrayCursor{
BooleanArrayCursor: &MockBooleanArrayCursor{},
@ -1225,7 +1260,7 @@ func TestNewWindowAggregateArrayCursorMonths_Boolean(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockBooleanArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(booleanWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(booleanWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -1235,10 +1270,11 @@ func TestNewWindowAggregateArrayCursorMonths_Boolean(t *testing.T) {
func TestNewWindowAggregateArrayCursor_Boolean(t *testing.T) {
t.Run("Count", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &booleanWindowCountArrayCursor{
BooleanArrayCursor: &MockBooleanArrayCursor{},
@ -1253,7 +1289,7 @@ func TestNewWindowAggregateArrayCursor_Boolean(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockBooleanArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(booleanWindowCountArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported(booleanWindowCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})

View File

@ -5,13 +5,15 @@ import (
"testing"
"time"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/values"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
)
var cmpOptions = cmp.AllowUnexported(interval.Window{})
{{range .}}
{{$ColType := .Name}}
{{$colType := .name}}
@ -44,7 +46,7 @@ func TestNewAggregateArrayCursor_{{$ColType}}(t *testing.T) {
got, _ := newAggregateArrayCursor(context.Background(), agg, &Mock{{$ColType}}ArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -55,10 +57,11 @@ func TestNewWindowAggregateArrayCursorMonths_{{$ColType}}(t *testing.T) {
{{range .Aggs}}
{{$Agg := .Name}}
t.Run("{{$Agg}}", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(int64(time.Hour), 0, false),
Period: values.MakeDuration(int64(time.Hour), 0, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &{{$colType}}Window{{$Agg}}ArrayCursor{
{{$ColType}}ArrayCursor: &Mock{{$ColType}}ArrayCursor{},
@ -73,7 +76,7 @@ func TestNewWindowAggregateArrayCursorMonths_{{$ColType}}(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &Mock{{$ColType}}ArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
@ -84,10 +87,11 @@ func TestNewWindowAggregateArrayCursor_{{$ColType}}(t *testing.T) {
{{range .Aggs}}
{{$Agg := .Name}}
t.Run("{{$Agg}}", func(t *testing.T) {
window := execute.Window{
Every: values.MakeDuration(0, 1, false),
Period: values.MakeDuration(0, 1, false),
}
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &{{$colType}}Window{{$Agg}}ArrayCursor{
{{$ColType}}ArrayCursor: &Mock{{$ColType}}ArrayCursor{},
@ -102,7 +106,7 @@ func TestNewWindowAggregateArrayCursor_{{$ColType}}(t *testing.T) {
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &Mock{{$ColType}}ArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{})); diff != "" {
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})

View File

@ -8,7 +8,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
@ -101,13 +101,13 @@ func copyFloatArray(src *cursors.FloatArray) *cursors.FloatArray {
type aggArrayCursorTest struct {
name string
createCursorFn func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor
createCursorFn func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor
every time.Duration
offset time.Duration
inputArrays []*cursors.IntegerArray
wantIntegers []*cursors.IntegerArray
wantFloats []*cursors.FloatArray
window execute.Window
window interval.Window
}
func (a *aggArrayCursorTest) run(t *testing.T) {
@ -616,21 +616,20 @@ func TestWindowFirstArrayCursor(t *testing.T) {
},
}
for _, tc := range testcases {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor {
if every == 0 {
if window.Every.IsZero() {
if window.IsZero() {
return newIntegerLimitArrayCursor(cur)
}
}
// if either the every or offset are set, then create a window for nsec values
// every and window.Every should never BOTH be zero here
if every != 0 || offset != 0 {
everyDur := values.MakeDuration(every, 0, false)
offsetDur := values.MakeDuration(offset, 0, false)
window = execute.Window{
Every: everyDur,
Offset: offsetDur,
}
window, _ = interval.NewWindow(
values.MakeDuration(every, 0, false),
values.MakeDuration(every, 0, false),
values.MakeDuration(offset, 0, false),
)
}
// otherwise just use the window that was passed in
@ -828,14 +827,13 @@ func TestWindowLastArrayCursor(t *testing.T) {
},
}
for _, tc := range testcases {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor {
if every != 0 || offset != 0 {
everyDur := values.MakeDuration(every, 0, false)
offsetDur := values.MakeDuration(offset, 0, false)
window = execute.Window{
Every: everyDur,
Offset: offsetDur,
}
window, _ = interval.NewWindow(
values.MakeDuration(every, 0, false),
values.MakeDuration(every, 0, false),
values.MakeDuration(offset, 0, false),
)
}
return newIntegerWindowLastArrayCursor(cur, window)
}
@ -1255,9 +1253,14 @@ func TestIntegerCountArrayCursor(t *testing.T) {
},
{
name: "monthly spans multiple periods",
window: execute.Window{
Every: values.MakeDuration(0, 1, false),
},
window: func() interval.Window {
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
return window
}(),
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
60,
@ -1276,10 +1279,14 @@ func TestIntegerCountArrayCursor(t *testing.T) {
},
{
name: "monthly window w/ offset",
window: execute.Window{
Every: values.MakeDuration(0, 1, false),
Offset: values.MakeDuration(1209600000000000, 0, false),
},
window: func() interval.Window {
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(1209600000000000, 0, false),
)
return window
}(),
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
60,
@ -1293,9 +1300,14 @@ func TestIntegerCountArrayCursor(t *testing.T) {
},
{
name: "monthly windows",
window: execute.Window{
Every: values.MakeDuration(0, 1, false),
},
window: func() interval.Window {
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
return window
}(),
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
60,
@ -1309,14 +1321,13 @@ func TestIntegerCountArrayCursor(t *testing.T) {
},
}
for _, tc := range testcases {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor {
if every != 0 || offset != 0 {
everyDur := values.MakeDuration(every, 0, false)
offsetDur := values.MakeDuration(offset, 0, false)
window = execute.Window{
Every: everyDur,
Offset: offsetDur,
}
window, _ = interval.NewWindow(
values.MakeDuration(every, 0, false),
values.MakeDuration(every, 0, false),
values.MakeDuration(offset, 0, false),
)
}
return newIntegerWindowCountArrayCursor(cur, window)
}
@ -1587,14 +1598,13 @@ func TestIntegerSumArrayCursor(t *testing.T) {
},
}
for _, tc := range testcases {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor {
if every != 0 || offset != 0 {
everyDur := values.MakeDuration(every, 0, false)
offsetDur := values.MakeDuration(offset, 0, false)
window = execute.Window{
Every: everyDur,
Offset: offsetDur,
}
window, _ = interval.NewWindow(
values.MakeDuration(every, 0, false),
values.MakeDuration(every, 0, false),
values.MakeDuration(offset, 0, false),
)
}
return newIntegerWindowSumArrayCursor(cur, window)
}
@ -1803,9 +1813,14 @@ func TestWindowMinArrayCursor(t *testing.T) {
},
{
name: "monthly windows",
window: execute.Window{
Every: values.MakeDuration(0, 1, false),
},
window: func() interval.Window {
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
return window
}(),
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
1,
@ -1819,14 +1834,13 @@ func TestWindowMinArrayCursor(t *testing.T) {
},
}
for _, tc := range testcases {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor {
if every != 0 || offset != 0 {
everyDur := values.MakeDuration(every, 0, false)
offsetDur := values.MakeDuration(offset, 0, false)
window = execute.Window{
Every: everyDur,
Offset: offsetDur,
}
window, _ = interval.NewWindow(
values.MakeDuration(every, 0, false),
values.MakeDuration(every, 0, false),
values.MakeDuration(offset, 0, false),
)
}
return newIntegerWindowMinArrayCursor(cur, window)
}
@ -2035,14 +2049,13 @@ func TestWindowMaxArrayCursor(t *testing.T) {
},
}
for _, tc := range testcases {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor {
if every != 0 || offset != 0 {
everyDur := values.MakeDuration(every, 0, false)
offsetDur := values.MakeDuration(offset, 0, false)
window = execute.Window{
Every: everyDur,
Offset: offsetDur,
}
window, _ = interval.NewWindow(
values.MakeDuration(every, 0, false),
values.MakeDuration(every, 0, false),
values.MakeDuration(offset, 0, false),
)
}
return newIntegerWindowMaxArrayCursor(cur, window)
}
@ -2142,14 +2155,13 @@ func TestWindowMeanArrayCursor(t *testing.T) {
},
}
for _, tc := range testcases {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor {
if every != 0 || offset != 0 {
everyDur := values.MakeDuration(every, 0, false)
offsetDur := values.MakeDuration(offset, 0, false)
window = execute.Window{
Every: everyDur,
Offset: offsetDur,
}
window, _ = interval.NewWindow(
values.MakeDuration(every, 0, false),
values.MakeDuration(every, 0, false),
values.MakeDuration(offset, 0, false),
)
}
return newIntegerWindowMeanArrayCursor(cur, window)
}

View File

@ -1331,7 +1331,7 @@ func skipPredicate(dAtA []byte) (n int, err error) {
}
var (
ErrInvalidLengthPredicate = fmt.Errorf("proto: negative length found during unmarshalling")
ErrInvalidLengthPredicate = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowPredicate = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupPredicate = fmt.Errorf("proto: unexpected end of group")
)

View File

@ -8154,7 +8154,7 @@ func skipStorageCommon(dAtA []byte) (n int, err error) {
}
var (
ErrInvalidLengthStorageCommon = fmt.Errorf("proto: negative length found during unmarshalling")
ErrInvalidLengthStorageCommon = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowStorageCommon = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupStorageCommon = fmt.Errorf("proto: unexpected end of group")
)