feat: mean,count aggregation for WindowAggregate pushdown in enterprise (#21291)

We support only one aggregate list [mean,count]. All other aggregates
still must be single-element lists.
pull/21334/head
Sam Arnold 2021-04-29 15:30:13 -03:00 committed by GitHub
parent 5d04f0ae20
commit 32aa970eba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 3773 additions and 293 deletions

View File

@ -17,6 +17,7 @@ v1.9.0 [unreleased]
- [#21100](https://github.com/influxdata/influxdb/pull/21100): feat: add memory and concurrency limits in flux controller - [#21100](https://github.com/influxdata/influxdb/pull/21100): feat: add memory and concurrency limits in flux controller
- [#21108](https://github.com/influxdata/influxdb/pull/21108): feat: make flux controller limits configurable - [#21108](https://github.com/influxdata/influxdb/pull/21108): feat: make flux controller limits configurable
- [#21226](https://github.com/influxdata/influxdb/pull/21226): feat: flux upgrade to v0.112.1 - [#21226](https://github.com/influxdata/influxdb/pull/21226): feat: flux upgrade to v0.112.1
- [#21291](https://github.com/influxdata/influxdb/pull/21291): feat: meancount aggregation for WindowAggregate pushdown in enterprise
### Bugfixes ### Bugfixes

View File

@ -27,7 +27,8 @@ type windowAggregateResultSet struct {
// conditions are met, it returns false, otherwise, it returns true. // conditions are met, it returns false, otherwise, it returns true.
func IsAscendingWindowAggregate(req *datatypes.ReadWindowAggregateRequest) bool { func IsAscendingWindowAggregate(req *datatypes.ReadWindowAggregateRequest) bool {
if len(req.Aggregate) != 1 { if len(req.Aggregate) != 1 {
return false // Descending optimization for last only applies when it is the only aggregate.
return true
} }
// The following is an optimization where in the case of a single window, // The following is an optimization where in the case of a single window,
@ -48,7 +49,13 @@ func IsAscendingWindowAggregate(req *datatypes.ReadWindowAggregateRequest) bool
func NewWindowAggregateResultSet(ctx context.Context, req *datatypes.ReadWindowAggregateRequest, cursor SeriesCursor) (ResultSet, error) { func NewWindowAggregateResultSet(ctx context.Context, req *datatypes.ReadWindowAggregateRequest, cursor SeriesCursor) (ResultSet, error) {
if nAggs := len(req.Aggregate); nAggs != 1 { if nAggs := len(req.Aggregate); nAggs != 1 {
return nil, errors.Errorf(errors.InternalError, "attempt to create a windowAggregateResultSet with %v aggregate functions", nAggs) if nAggs == 2 {
if req.Aggregate[0].Type != datatypes.AggregateTypeMean || req.Aggregate[1].Type != datatypes.AggregateTypeCount {
return nil, errors.Errorf(errors.InternalError, "attempt to create a windowAggregateResultSet with %v, %v aggregates", req.Aggregate[0].Type, req.Aggregate[1].Type)
}
} else {
return nil, errors.Errorf(errors.InternalError, "attempt to create a windowAggregateResultSet with %v aggregate functions", nAggs)
}
} }
ascending := IsAscendingWindowAggregate(req) ascending := IsAscendingWindowAggregate(req)
@ -110,7 +117,6 @@ func GetWindow(req *datatypes.ReadWindowAggregateRequest) (interval.Window, erro
} }
func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cursor, error) { func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cursor, error) {
agg := r.req.Aggregate[0]
cursor := r.arrayCursors.createCursor(seriesRow) cursor := r.arrayCursors.createCursor(seriesRow)
@ -121,9 +127,9 @@ func (r *windowAggregateResultSet) createCursor(seriesRow SeriesRow) (cursors.Cu
if window.Every().Nanoseconds() == math.MaxInt64 { if window.Every().Nanoseconds() == math.MaxInt64 {
// This means to aggregate over whole series for the query's time range // This means to aggregate over whole series for the query's time range
return newAggregateArrayCursor(r.ctx, agg, cursor) return newAggregateArrayCursor(r.ctx, r.req.Aggregate, cursor)
} else { } else {
return NewWindowAggregateArrayCursor(r.ctx, agg, window, cursor) return NewWindowAggregateArrayCursor(r.ctx, r.req.Aggregate, window, cursor)
} }
} }

View File

@ -196,6 +196,26 @@ func newWindowMeanArrayCursor(cur cursors.Cursor, window interval.Window) (curso
} }
} }
func newWindowMeanCountArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) {
switch cur := cur.(type) {
case cursors.FloatArrayCursor:
return newFloatWindowMeanCountArrayCursor(cur, window), nil
case cursors.IntegerArrayCursor:
return newIntegerWindowMeanCountArrayCursor(cur, window), nil
case cursors.UnsignedArrayCursor:
return newUnsignedWindowMeanCountArrayCursor(cur, window), nil
default:
return nil, &errors2.Error{
Code: errors2.EInvalid,
Msg: fmt.Sprintf("unsupported input type for meancount aggregate: %s", arrayCursorType(cur)),
}
}
}
// ******************** // ********************
// Float Array Cursor // Float Array Cursor
@ -525,6 +545,7 @@ func (c *floatWindowCountArrayCursor) Stats() cursors.CursorStats {
func (c *floatWindowCountArrayCursor) Next() *cursors.IntegerArray { func (c *floatWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.FloatArray var a *cursors.FloatArray
@ -587,7 +608,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.FloatArrayCursor.Next() a = c.FloatArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -604,6 +624,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -636,6 +657,7 @@ func (c *floatWindowSumArrayCursor) Stats() cursors.CursorStats {
func (c *floatWindowSumArrayCursor) Next() *cursors.FloatArray { func (c *floatWindowSumArrayCursor) Next() *cursors.FloatArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.FloatArray var a *cursors.FloatArray
@ -698,7 +720,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.FloatArrayCursor.Next() a = c.FloatArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -715,6 +736,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -747,6 +769,7 @@ func (c *floatWindowMinArrayCursor) Stats() cursors.CursorStats {
func (c *floatWindowMinArrayCursor) Next() *cursors.FloatArray { func (c *floatWindowMinArrayCursor) Next() *cursors.FloatArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.FloatArray var a *cursors.FloatArray
@ -813,7 +836,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.FloatArrayCursor.Next() a = c.FloatArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -830,6 +852,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -862,6 +885,7 @@ func (c *floatWindowMaxArrayCursor) Stats() cursors.CursorStats {
func (c *floatWindowMaxArrayCursor) Next() *cursors.FloatArray { func (c *floatWindowMaxArrayCursor) Next() *cursors.FloatArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.FloatArray var a *cursors.FloatArray
@ -928,7 +952,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.FloatArrayCursor.Next() a = c.FloatArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -945,6 +968,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -977,6 +1001,7 @@ func (c *floatWindowMeanArrayCursor) Stats() cursors.CursorStats {
func (c *floatWindowMeanArrayCursor) Next() *cursors.FloatArray { func (c *floatWindowMeanArrayCursor) Next() *cursors.FloatArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.FloatArray var a *cursors.FloatArray
@ -1042,7 +1067,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.FloatArrayCursor.Next() a = c.FloatArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -1059,11 +1083,131 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
} }
type floatWindowMeanCountArrayCursor struct {
cursors.FloatArrayCursor
res *cursors.MeanCountArray
tmp *cursors.FloatArray
window interval.Window
}
func newFloatWindowMeanCountArrayCursor(cur cursors.FloatArrayCursor, window interval.Window) *floatWindowMeanCountArrayCursor {
resLen := MaxPointsPerBlock
if window.IsZero() {
resLen = 1
}
return &floatWindowMeanCountArrayCursor{
FloatArrayCursor: cur,
res: cursors.NewMeanCountArrayLen(resLen),
tmp: &cursors.FloatArray{},
window: window,
}
}
func (c *floatWindowMeanCountArrayCursor) Stats() cursors.CursorStats {
return c.FloatArrayCursor.Stats()
}
func (c *floatWindowMeanCountArrayCursor) Next() *cursors.MeanCountArray {
pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values0 = c.res.Values0[:cap(c.res.Values0)]
c.res.Values1 = c.res.Values1[:cap(c.res.Values1)]
var a *cursors.FloatArray
if c.tmp.Len() > 0 {
a = c.tmp
} else {
a = c.FloatArrayCursor.Next()
}
if a.Len() == 0 {
return &cursors.MeanCountArray{}
}
rowIdx := 0
var sum float64
var count int64
var windowEnd int64
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
windowHasPoints := false
// enumerate windows
WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
c.res.Timestamps[pos] = windowEnd
c.res.Values0[pos] = sum / float64(count)
c.res.Values1[pos] = count
pos++
if pos >= MaxPointsPerBlock {
// the output array is full,
// save the remaining points in the input array in tmp.
// they will be processed in the next call to Next()
c.tmp.Timestamps = a.Timestamps[rowIdx:]
c.tmp.Values = a.Values[rowIdx:]
break WINDOWS
}
}
// start the new window
sum = 0
count = 0
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
} else {
sum += a.Values[rowIdx]
count++
windowHasPoints = true
}
}
// Clear buffered timestamps & values if we make it through a cursor.
// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil
c.tmp.Values = nil
// get the next chunk
a = c.FloatArrayCursor.Next()
if a.Len() == 0 {
// write the final point
// do not generate a point for empty windows
if windowHasPoints {
c.res.Timestamps[pos] = windowEnd
c.res.Values0[pos] = sum / float64(count)
c.res.Values1[pos] = count
pos++
}
break WINDOWS
}
rowIdx = 0
}
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values0 = c.res.Values0[:pos]
c.res.Values1 = c.res.Values1[:pos]
return c.res
}
type floatEmptyArrayCursor struct { type floatEmptyArrayCursor struct {
res cursors.FloatArray res cursors.FloatArray
} }
@ -1404,6 +1548,7 @@ func (c *integerWindowCountArrayCursor) Stats() cursors.CursorStats {
func (c *integerWindowCountArrayCursor) Next() *cursors.IntegerArray { func (c *integerWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.IntegerArray var a *cursors.IntegerArray
@ -1466,7 +1611,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.IntegerArrayCursor.Next() a = c.IntegerArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -1483,6 +1627,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -1515,6 +1660,7 @@ func (c *integerWindowSumArrayCursor) Stats() cursors.CursorStats {
func (c *integerWindowSumArrayCursor) Next() *cursors.IntegerArray { func (c *integerWindowSumArrayCursor) Next() *cursors.IntegerArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.IntegerArray var a *cursors.IntegerArray
@ -1577,7 +1723,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.IntegerArrayCursor.Next() a = c.IntegerArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -1594,6 +1739,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -1626,6 +1772,7 @@ func (c *integerWindowMinArrayCursor) Stats() cursors.CursorStats {
func (c *integerWindowMinArrayCursor) Next() *cursors.IntegerArray { func (c *integerWindowMinArrayCursor) Next() *cursors.IntegerArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.IntegerArray var a *cursors.IntegerArray
@ -1692,7 +1839,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.IntegerArrayCursor.Next() a = c.IntegerArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -1709,6 +1855,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -1741,6 +1888,7 @@ func (c *integerWindowMaxArrayCursor) Stats() cursors.CursorStats {
func (c *integerWindowMaxArrayCursor) Next() *cursors.IntegerArray { func (c *integerWindowMaxArrayCursor) Next() *cursors.IntegerArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.IntegerArray var a *cursors.IntegerArray
@ -1807,7 +1955,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.IntegerArrayCursor.Next() a = c.IntegerArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -1824,6 +1971,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -1856,6 +2004,7 @@ func (c *integerWindowMeanArrayCursor) Stats() cursors.CursorStats {
func (c *integerWindowMeanArrayCursor) Next() *cursors.FloatArray { func (c *integerWindowMeanArrayCursor) Next() *cursors.FloatArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.IntegerArray var a *cursors.IntegerArray
@ -1921,7 +2070,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.IntegerArrayCursor.Next() a = c.IntegerArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -1938,11 +2086,131 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
} }
type integerWindowMeanCountArrayCursor struct {
cursors.IntegerArrayCursor
res *cursors.MeanCountArray
tmp *cursors.IntegerArray
window interval.Window
}
func newIntegerWindowMeanCountArrayCursor(cur cursors.IntegerArrayCursor, window interval.Window) *integerWindowMeanCountArrayCursor {
resLen := MaxPointsPerBlock
if window.IsZero() {
resLen = 1
}
return &integerWindowMeanCountArrayCursor{
IntegerArrayCursor: cur,
res: cursors.NewMeanCountArrayLen(resLen),
tmp: &cursors.IntegerArray{},
window: window,
}
}
func (c *integerWindowMeanCountArrayCursor) Stats() cursors.CursorStats {
return c.IntegerArrayCursor.Stats()
}
func (c *integerWindowMeanCountArrayCursor) Next() *cursors.MeanCountArray {
pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values0 = c.res.Values0[:cap(c.res.Values0)]
c.res.Values1 = c.res.Values1[:cap(c.res.Values1)]
var a *cursors.IntegerArray
if c.tmp.Len() > 0 {
a = c.tmp
} else {
a = c.IntegerArrayCursor.Next()
}
if a.Len() == 0 {
return &cursors.MeanCountArray{}
}
rowIdx := 0
var sum float64
var count int64
var windowEnd int64
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
windowHasPoints := false
// enumerate windows
WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
c.res.Timestamps[pos] = windowEnd
c.res.Values0[pos] = sum / float64(count)
c.res.Values1[pos] = count
pos++
if pos >= MaxPointsPerBlock {
// the output array is full,
// save the remaining points in the input array in tmp.
// they will be processed in the next call to Next()
c.tmp.Timestamps = a.Timestamps[rowIdx:]
c.tmp.Values = a.Values[rowIdx:]
break WINDOWS
}
}
// start the new window
sum = 0
count = 0
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
} else {
sum += float64(a.Values[rowIdx])
count++
windowHasPoints = true
}
}
// Clear buffered timestamps & values if we make it through a cursor.
// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil
c.tmp.Values = nil
// get the next chunk
a = c.IntegerArrayCursor.Next()
if a.Len() == 0 {
// write the final point
// do not generate a point for empty windows
if windowHasPoints {
c.res.Timestamps[pos] = windowEnd
c.res.Values0[pos] = sum / float64(count)
c.res.Values1[pos] = count
pos++
}
break WINDOWS
}
rowIdx = 0
}
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values0 = c.res.Values0[:pos]
c.res.Values1 = c.res.Values1[:pos]
return c.res
}
type integerEmptyArrayCursor struct { type integerEmptyArrayCursor struct {
res cursors.IntegerArray res cursors.IntegerArray
} }
@ -2283,6 +2551,7 @@ func (c *unsignedWindowCountArrayCursor) Stats() cursors.CursorStats {
func (c *unsignedWindowCountArrayCursor) Next() *cursors.IntegerArray { func (c *unsignedWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.UnsignedArray var a *cursors.UnsignedArray
@ -2345,7 +2614,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.UnsignedArrayCursor.Next() a = c.UnsignedArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -2362,6 +2630,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -2394,6 +2663,7 @@ func (c *unsignedWindowSumArrayCursor) Stats() cursors.CursorStats {
func (c *unsignedWindowSumArrayCursor) Next() *cursors.UnsignedArray { func (c *unsignedWindowSumArrayCursor) Next() *cursors.UnsignedArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.UnsignedArray var a *cursors.UnsignedArray
@ -2456,7 +2726,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.UnsignedArrayCursor.Next() a = c.UnsignedArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -2473,6 +2742,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -2505,6 +2775,7 @@ func (c *unsignedWindowMinArrayCursor) Stats() cursors.CursorStats {
func (c *unsignedWindowMinArrayCursor) Next() *cursors.UnsignedArray { func (c *unsignedWindowMinArrayCursor) Next() *cursors.UnsignedArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.UnsignedArray var a *cursors.UnsignedArray
@ -2571,7 +2842,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.UnsignedArrayCursor.Next() a = c.UnsignedArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -2588,6 +2858,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -2620,6 +2891,7 @@ func (c *unsignedWindowMaxArrayCursor) Stats() cursors.CursorStats {
func (c *unsignedWindowMaxArrayCursor) Next() *cursors.UnsignedArray { func (c *unsignedWindowMaxArrayCursor) Next() *cursors.UnsignedArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.UnsignedArray var a *cursors.UnsignedArray
@ -2686,7 +2958,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.UnsignedArrayCursor.Next() a = c.UnsignedArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -2703,6 +2974,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -2735,6 +3007,7 @@ func (c *unsignedWindowMeanArrayCursor) Stats() cursors.CursorStats {
func (c *unsignedWindowMeanArrayCursor) Next() *cursors.FloatArray { func (c *unsignedWindowMeanArrayCursor) Next() *cursors.FloatArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.UnsignedArray var a *cursors.UnsignedArray
@ -2800,7 +3073,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.UnsignedArrayCursor.Next() a = c.UnsignedArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -2817,11 +3089,131 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
} }
type unsignedWindowMeanCountArrayCursor struct {
cursors.UnsignedArrayCursor
res *cursors.MeanCountArray
tmp *cursors.UnsignedArray
window interval.Window
}
func newUnsignedWindowMeanCountArrayCursor(cur cursors.UnsignedArrayCursor, window interval.Window) *unsignedWindowMeanCountArrayCursor {
resLen := MaxPointsPerBlock
if window.IsZero() {
resLen = 1
}
return &unsignedWindowMeanCountArrayCursor{
UnsignedArrayCursor: cur,
res: cursors.NewMeanCountArrayLen(resLen),
tmp: &cursors.UnsignedArray{},
window: window,
}
}
func (c *unsignedWindowMeanCountArrayCursor) Stats() cursors.CursorStats {
return c.UnsignedArrayCursor.Stats()
}
func (c *unsignedWindowMeanCountArrayCursor) Next() *cursors.MeanCountArray {
pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values0 = c.res.Values0[:cap(c.res.Values0)]
c.res.Values1 = c.res.Values1[:cap(c.res.Values1)]
var a *cursors.UnsignedArray
if c.tmp.Len() > 0 {
a = c.tmp
} else {
a = c.UnsignedArrayCursor.Next()
}
if a.Len() == 0 {
return &cursors.MeanCountArray{}
}
rowIdx := 0
var sum float64
var count int64
var windowEnd int64
if !c.window.IsZero() {
windowEnd = int64(c.window.GetLatestBounds(values.Time(a.Timestamps[rowIdx])).Stop())
} else {
windowEnd = math.MaxInt64
}
windowHasPoints := false
// enumerate windows
WINDOWS:
for {
for ; rowIdx < a.Len(); rowIdx++ {
ts := a.Timestamps[rowIdx]
if !c.window.IsZero() && ts >= windowEnd {
// new window detected, close the current window
// do not generate a point for empty windows
if windowHasPoints {
c.res.Timestamps[pos] = windowEnd
c.res.Values0[pos] = sum / float64(count)
c.res.Values1[pos] = count
pos++
if pos >= MaxPointsPerBlock {
// the output array is full,
// save the remaining points in the input array in tmp.
// they will be processed in the next call to Next()
c.tmp.Timestamps = a.Timestamps[rowIdx:]
c.tmp.Values = a.Values[rowIdx:]
break WINDOWS
}
}
// start the new window
sum = 0
count = 0
windowEnd = int64(c.window.GetLatestBounds(values.Time(ts)).Stop())
windowHasPoints = false
continue WINDOWS
} else {
sum += float64(a.Values[rowIdx])
count++
windowHasPoints = true
}
}
// Clear buffered timestamps & values if we make it through a cursor.
// The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil
c.tmp.Values = nil
// get the next chunk
a = c.UnsignedArrayCursor.Next()
if a.Len() == 0 {
// write the final point
// do not generate a point for empty windows
if windowHasPoints {
c.res.Timestamps[pos] = windowEnd
c.res.Values0[pos] = sum / float64(count)
c.res.Values1[pos] = count
pos++
}
break WINDOWS
}
rowIdx = 0
}
c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values0 = c.res.Values0[:pos]
c.res.Values1 = c.res.Values1[:pos]
return c.res
}
type unsignedEmptyArrayCursor struct { type unsignedEmptyArrayCursor struct {
res cursors.UnsignedArray res cursors.UnsignedArray
} }
@ -3162,6 +3554,7 @@ func (c *stringWindowCountArrayCursor) Stats() cursors.CursorStats {
func (c *stringWindowCountArrayCursor) Next() *cursors.IntegerArray { func (c *stringWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.StringArray var a *cursors.StringArray
@ -3224,7 +3617,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.StringArrayCursor.Next() a = c.StringArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -3241,6 +3633,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res
@ -3586,6 +3979,7 @@ func (c *booleanWindowCountArrayCursor) Stats() cursors.CursorStats {
func (c *booleanWindowCountArrayCursor) Next() *cursors.IntegerArray { func (c *booleanWindowCountArrayCursor) Next() *cursors.IntegerArray {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
var a *cursors.BooleanArray var a *cursors.BooleanArray
@ -3648,7 +4042,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.BooleanArrayCursor.Next() a = c.BooleanArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -3665,6 +4058,7 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
return c.res return c.res

View File

@ -137,6 +137,25 @@ func newWindowMeanArrayCursor(cur cursors.Cursor, window interval.Window) (curso
} }
} }
} }
func newWindowMeanCountArrayCursor(cur cursors.Cursor, window interval.Window) (cursors.Cursor, error) {
switch cur := cur.(type) {
{{range .}}
{{$Type := .Name}}
{{range .Aggs}}
{{if eq .Name "MeanCount"}}
case cursors.{{$Type}}ArrayCursor:
return new{{$Type}}WindowMeanCountArrayCursor(cur, window), nil
{{end}}
{{end}}{{/* for each supported agg fn */}}
{{end}}{{/* for each field type */}}
default:
return nil, &errors2.Error{
Code: errors2.EInvalid,
Msg: fmt.Sprintf("unsupported input type for meancount aggregate: %s", arrayCursorType(cur)),
}
}
}
{{range .}} {{range .}}
{{$arrayType := print "*cursors." .Name "Array"}} {{$arrayType := print "*cursors." .Name "Array"}}
{{$type := print .name "ArrayFilterCursor"}} {{$type := print .name "ArrayFilterCursor"}}
@ -479,7 +498,12 @@ func (c *{{$name}}Window{{$aggName}}ArrayCursor) Stats() cursors.CursorStats {
func (c *{{$name}}Window{{$aggName}}ArrayCursor) Next() *cursors.{{.OutputTypeName}}Array { func (c *{{$name}}Window{{$aggName}}ArrayCursor) Next() *cursors.{{.OutputTypeName}}Array {
pos := 0 pos := 0
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)] c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
{{if eq .OutputTypeName "MeanCount" }}
c.res.Values0 = c.res.Values0[:cap(c.res.Values0)]
c.res.Values1 = c.res.Values1[:cap(c.res.Values1)]
{{else}}
c.res.Values = c.res.Values[:cap(c.res.Values)] c.res.Values = c.res.Values[:cap(c.res.Values)]
{{end}}
var a *cursors.{{$Name}}Array var a *cursors.{{$Name}}Array
if c.tmp.Len() > 0 { if c.tmp.Len() > 0 {
@ -540,7 +564,6 @@ WINDOWS:
// The break above will skip this if a cursor is partially read. // The break above will skip this if a cursor is partially read.
c.tmp.Timestamps = nil c.tmp.Timestamps = nil
c.tmp.Values = nil c.tmp.Values = nil
// get the next chunk // get the next chunk
a = c.{{$Name}}ArrayCursor.Next() a = c.{{$Name}}ArrayCursor.Next()
if a.Len() == 0 { if a.Len() == 0 {
@ -556,7 +579,12 @@ WINDOWS:
} }
c.res.Timestamps = c.res.Timestamps[:pos] c.res.Timestamps = c.res.Timestamps[:pos]
{{if eq .OutputTypeName "MeanCount" }}
c.res.Values0 = c.res.Values0[:pos]
c.res.Values1 = c.res.Values1[:pos]
{{else}}
c.res.Values = c.res.Values[:pos] c.res.Values = c.res.Values[:pos]
{{end}}
return c.res return c.res
} }

View File

@ -43,6 +43,14 @@
"Accumulate":"sum += a.Values[rowIdx]; count++", "Accumulate":"sum += a.Values[rowIdx]; count++",
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = sum / float64(count)", "AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = sum / float64(count)",
"AccReset":"sum = 0; count = 0" "AccReset":"sum = 0; count = 0"
},
{
"Name":"MeanCount",
"OutputTypeName":"MeanCount",
"AccDecls":"var sum float64; var count int64",
"Accumulate":"sum += a.Values[rowIdx]; count++",
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values0[pos] = sum / float64(count); c.res.Values1[pos] = count",
"AccReset":"sum = 0; count = 0"
} }
] ]
}, },
@ -90,6 +98,14 @@
"Accumulate":"sum += a.Values[rowIdx]; count++", "Accumulate":"sum += a.Values[rowIdx]; count++",
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)", "AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)",
"AccReset":"sum = 0; count = 0" "AccReset":"sum = 0; count = 0"
},
{
"Name":"MeanCount",
"OutputTypeName":"MeanCount",
"AccDecls":"var sum float64; var count int64",
"Accumulate":"sum += float64(a.Values[rowIdx]); count++",
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values0[pos] = sum / float64(count); c.res.Values1[pos] = count",
"AccReset":"sum = 0; count = 0"
} }
] ]
}, },
@ -137,6 +153,14 @@
"Accumulate":"sum += a.Values[rowIdx]; count++", "Accumulate":"sum += a.Values[rowIdx]; count++",
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)", "AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)",
"AccReset":"sum = 0; count = 0" "AccReset":"sum = 0; count = 0"
},
{
"Name":"MeanCount",
"OutputTypeName":"MeanCount",
"AccDecls":"var sum float64; var count int64",
"Accumulate":"sum += float64(a.Values[rowIdx]); count++",
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values0[pos] = sum / float64(count); c.res.Values1[pos] = count",
"AccReset":"sum = 0; count = 0"
} }
] ]
}, },

View File

@ -17,20 +17,20 @@ func (v *singleValue) Value(key string) (interface{}, bool) {
return v.v, true return v.v, true
} }
func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error) { func newAggregateArrayCursor(ctx context.Context, agg []*datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error) {
switch agg.Type { switch agg[0].Type {
case datatypes.AggregateTypeFirst, datatypes.AggregateTypeLast: case datatypes.AggregateTypeFirst, datatypes.AggregateTypeLast:
return newLimitArrayCursor(cursor), nil return newLimitArrayCursor(cursor), nil
} }
return NewWindowAggregateArrayCursor(ctx, agg, interval.Window{}, cursor) return NewWindowAggregateArrayCursor(ctx, agg, interval.Window{}, cursor)
} }
func NewWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, window interval.Window, cursor cursors.Cursor) (cursors.Cursor, error) { func NewWindowAggregateArrayCursor(ctx context.Context, agg []*datatypes.Aggregate, window interval.Window, cursor cursors.Cursor) (cursors.Cursor, error) {
if cursor == nil { if cursor == nil {
return nil, nil return nil, nil
} }
switch agg.Type { switch agg[0].Type {
case datatypes.AggregateTypeCount: case datatypes.AggregateTypeCount:
return newWindowCountArrayCursor(cursor, window), nil return newWindowCountArrayCursor(cursor, window), nil
case datatypes.AggregateTypeSum: case datatypes.AggregateTypeSum:
@ -44,7 +44,11 @@ func NewWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate
case datatypes.AggregateTypeMax: case datatypes.AggregateTypeMax:
return newWindowMaxArrayCursor(cursor, window), nil return newWindowMaxArrayCursor(cursor, window), nil
case datatypes.AggregateTypeMean: case datatypes.AggregateTypeMean:
if len(agg) == 2 && agg[1].Type == datatypes.AggregateTypeCount {
return newWindowMeanCountArrayCursor(cursor, window)
}
return newWindowMeanArrayCursor(cursor, window) return newWindowMeanArrayCursor(cursor, window)
default: default:
// TODO(sgc): should be validated higher up // TODO(sgc): should be validated higher up
panic("invalid aggregate") panic("invalid aggregate")

View File

@ -41,8 +41,8 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
tmp: &cursors.FloatArray{}, tmp: &cursors.FloatArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
@ -59,8 +59,8 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
tmp: &cursors.FloatArray{}, tmp: &cursors.FloatArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeSum, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
@ -77,8 +77,8 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
tmp: &cursors.FloatArray{}, tmp: &cursors.FloatArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMin, &datatypes.Aggregate{Type: datatypes.AggregateTypeMin},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
@ -95,8 +95,8 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
tmp: &cursors.FloatArray{}, tmp: &cursors.FloatArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMax, &datatypes.Aggregate{Type: datatypes.AggregateTypeMax},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
@ -113,8 +113,8 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
tmp: &cursors.FloatArray{}, tmp: &cursors.FloatArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMean, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
@ -124,6 +124,25 @@ func TestNewAggregateArrayCursor_Float(t *testing.T) {
} }
}) })
t.Run("MeanCount", func(t *testing.T) {
want := &floatWindowMeanCountArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
res: cursors.NewMeanCountArrayLen(1),
tmp: &cursors.FloatArray{},
}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
}
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
} }
func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) { func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
@ -142,8 +161,8 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -167,8 +186,8 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeSum, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -192,8 +211,8 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMin, &datatypes.Aggregate{Type: datatypes.AggregateTypeMin},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -217,8 +236,8 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMax, &datatypes.Aggregate{Type: datatypes.AggregateTypeMax},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -242,8 +261,8 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMean, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -253,6 +272,32 @@ func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
} }
}) })
t.Run("MeanCount", func(t *testing.T) {
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowMeanCountArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
res: cursors.NewMeanCountArrayLen(MaxPointsPerBlock),
tmp: &cursors.FloatArray{},
window: window,
}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
}
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
} }
func TestNewWindowAggregateArrayCursor_Float(t *testing.T) { func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
@ -271,8 +316,8 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -296,8 +341,8 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeSum, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -321,8 +366,8 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMin, &datatypes.Aggregate{Type: datatypes.AggregateTypeMin},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -346,8 +391,8 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMax, &datatypes.Aggregate{Type: datatypes.AggregateTypeMax},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -371,8 +416,8 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMean, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
@ -382,6 +427,32 @@ func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
} }
}) })
t.Run("MeanCount", func(t *testing.T) {
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &floatWindowMeanCountArrayCursor{
FloatArrayCursor: &MockFloatArrayCursor{},
res: cursors.NewMeanCountArrayLen(MaxPointsPerBlock),
tmp: &cursors.FloatArray{},
window: window,
}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
}
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
} }
type MockIntegerArrayCursor struct { type MockIntegerArrayCursor struct {
@ -405,8 +476,8 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
tmp: &cursors.IntegerArray{}, tmp: &cursors.IntegerArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
@ -423,8 +494,8 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
tmp: &cursors.IntegerArray{}, tmp: &cursors.IntegerArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeSum, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
@ -441,8 +512,8 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
tmp: &cursors.IntegerArray{}, tmp: &cursors.IntegerArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMin, &datatypes.Aggregate{Type: datatypes.AggregateTypeMin},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
@ -459,8 +530,8 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
tmp: &cursors.IntegerArray{}, tmp: &cursors.IntegerArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMax, &datatypes.Aggregate{Type: datatypes.AggregateTypeMax},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
@ -477,8 +548,8 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
tmp: &cursors.IntegerArray{}, tmp: &cursors.IntegerArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMean, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
@ -488,6 +559,25 @@ func TestNewAggregateArrayCursor_Integer(t *testing.T) {
} }
}) })
t.Run("MeanCount", func(t *testing.T) {
want := &integerWindowMeanCountArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
res: cursors.NewMeanCountArrayLen(1),
tmp: &cursors.IntegerArray{},
}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
}
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
} }
func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) { func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
@ -506,8 +596,8 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -531,8 +621,8 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeSum, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -556,8 +646,8 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMin, &datatypes.Aggregate{Type: datatypes.AggregateTypeMin},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -581,8 +671,8 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMax, &datatypes.Aggregate{Type: datatypes.AggregateTypeMax},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -606,8 +696,8 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMean, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -617,6 +707,32 @@ func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
} }
}) })
t.Run("MeanCount", func(t *testing.T) {
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowMeanCountArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
res: cursors.NewMeanCountArrayLen(MaxPointsPerBlock),
tmp: &cursors.IntegerArray{},
window: window,
}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
}
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
} }
func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) { func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
@ -635,8 +751,8 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -660,8 +776,8 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeSum, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -685,8 +801,8 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMin, &datatypes.Aggregate{Type: datatypes.AggregateTypeMin},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -710,8 +826,8 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMax, &datatypes.Aggregate{Type: datatypes.AggregateTypeMax},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -735,8 +851,8 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMean, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
@ -746,6 +862,32 @@ func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
} }
}) })
t.Run("MeanCount", func(t *testing.T) {
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &integerWindowMeanCountArrayCursor{
IntegerArrayCursor: &MockIntegerArrayCursor{},
res: cursors.NewMeanCountArrayLen(MaxPointsPerBlock),
tmp: &cursors.IntegerArray{},
window: window,
}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
}
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
} }
type MockUnsignedArrayCursor struct { type MockUnsignedArrayCursor struct {
@ -769,8 +911,8 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
tmp: &cursors.UnsignedArray{}, tmp: &cursors.UnsignedArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
@ -787,8 +929,8 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
tmp: &cursors.UnsignedArray{}, tmp: &cursors.UnsignedArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeSum, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
@ -805,8 +947,8 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
tmp: &cursors.UnsignedArray{}, tmp: &cursors.UnsignedArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMin, &datatypes.Aggregate{Type: datatypes.AggregateTypeMin},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
@ -823,8 +965,8 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
tmp: &cursors.UnsignedArray{}, tmp: &cursors.UnsignedArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMax, &datatypes.Aggregate{Type: datatypes.AggregateTypeMax},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
@ -841,8 +983,8 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
tmp: &cursors.UnsignedArray{}, tmp: &cursors.UnsignedArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMean, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
@ -852,6 +994,25 @@ func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
} }
}) })
t.Run("MeanCount", func(t *testing.T) {
want := &unsignedWindowMeanCountArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
res: cursors.NewMeanCountArrayLen(1),
tmp: &cursors.UnsignedArray{},
}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
}
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
} }
func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) { func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
@ -870,8 +1031,8 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -895,8 +1056,8 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeSum, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -920,8 +1081,8 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMin, &datatypes.Aggregate{Type: datatypes.AggregateTypeMin},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -945,8 +1106,8 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMax, &datatypes.Aggregate{Type: datatypes.AggregateTypeMax},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -970,8 +1131,8 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMean, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -981,6 +1142,32 @@ func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
} }
}) })
t.Run("MeanCount", func(t *testing.T) {
window, _ := interval.NewWindow(
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(int64(time.Hour), 0, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowMeanCountArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
res: cursors.NewMeanCountArrayLen(MaxPointsPerBlock),
tmp: &cursors.UnsignedArray{},
window: window,
}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
}
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
} }
func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) { func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
@ -999,8 +1186,8 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -1024,8 +1211,8 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeSum, &datatypes.Aggregate{Type: datatypes.AggregateTypeSum},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -1049,8 +1236,8 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMin, &datatypes.Aggregate{Type: datatypes.AggregateTypeMin},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -1074,8 +1261,8 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMax, &datatypes.Aggregate{Type: datatypes.AggregateTypeMax},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -1099,8 +1286,8 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeMean, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
@ -1110,6 +1297,32 @@ func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
} }
}) })
t.Run("MeanCount", func(t *testing.T) {
window, _ := interval.NewWindow(
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 1, false),
values.MakeDuration(0, 0, false),
)
want := &unsignedWindowMeanCountArrayCursor{
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
res: cursors.NewMeanCountArrayLen(MaxPointsPerBlock),
tmp: &cursors.UnsignedArray{},
window: window,
}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
}
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanCountArrayCursor{}), cmpOptions); diff != "" {
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
}
})
} }
type MockStringArrayCursor struct { type MockStringArrayCursor struct {
@ -1133,8 +1346,8 @@ func TestNewAggregateArrayCursor_String(t *testing.T) {
tmp: &cursors.StringArray{}, tmp: &cursors.StringArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockStringArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockStringArrayCursor{})
@ -1162,8 +1375,8 @@ func TestNewWindowAggregateArrayCursorMonths_String(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockStringArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockStringArrayCursor{})
@ -1191,8 +1404,8 @@ func TestNewWindowAggregateArrayCursor_String(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockStringArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockStringArrayCursor{})
@ -1225,8 +1438,8 @@ func TestNewAggregateArrayCursor_Boolean(t *testing.T) {
tmp: &cursors.BooleanArray{}, tmp: &cursors.BooleanArray{},
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockBooleanArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &MockBooleanArrayCursor{})
@ -1254,8 +1467,8 @@ func TestNewWindowAggregateArrayCursorMonths_Boolean(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockBooleanArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockBooleanArrayCursor{})
@ -1283,8 +1496,8 @@ func TestNewWindowAggregateArrayCursor_Boolean(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateTypeCount, &datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockBooleanArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &MockBooleanArrayCursor{})

View File

@ -39,10 +39,16 @@ func TestNewAggregateArrayCursor_{{$ColType}}(t *testing.T) {
res: cursors.New{{.OutputTypeName}}ArrayLen(1), res: cursors.New{{.OutputTypeName}}ArrayLen(1),
tmp: &cursors.{{$ColType}}Array{}, tmp: &cursors.{{$ColType}}Array{},
} }
{{if eq $Agg "MeanCount"}}
agg := &datatypes.Aggregate{ agg := []*datatypes.Aggregate{
Type: datatypes.AggregateType{{$Agg}}, &datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
{{else}}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateType{{$Agg}}},
}
{{end}}
got, _ := newAggregateArrayCursor(context.Background(), agg, &Mock{{$ColType}}ArrayCursor{}) got, _ := newAggregateArrayCursor(context.Background(), agg, &Mock{{$ColType}}ArrayCursor{})
@ -70,9 +76,16 @@ func TestNewWindowAggregateArrayCursorMonths_{{$ColType}}(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ {{if eq $Agg "MeanCount"}}
Type: datatypes.AggregateType{{$Agg}}, agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
{{else}}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateType{{$Agg}}},
}
{{end}}
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &Mock{{$ColType}}ArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &Mock{{$ColType}}ArrayCursor{})
@ -100,9 +113,16 @@ func TestNewWindowAggregateArrayCursor_{{$ColType}}(t *testing.T) {
window: window, window: window,
} }
agg := &datatypes.Aggregate{ {{if eq $Agg "MeanCount"}}
Type: datatypes.AggregateType{{$Agg}}, agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateTypeMean},
&datatypes.Aggregate{Type: datatypes.AggregateTypeCount},
} }
{{else}}
agg := []*datatypes.Aggregate{
&datatypes.Aggregate{Type: datatypes.AggregateType{{$Agg}}},
}
{{end}}
got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &Mock{{$ColType}}ArrayCursor{}) got, _ := NewWindowAggregateArrayCursor(context.Background(), agg, window, &Mock{{$ColType}}ArrayCursor{})

View File

@ -77,6 +77,21 @@ func makeFloatArray(n int, tsStart time.Time, tsStep time.Duration, valueFn func
return fa return fa
} }
func makeMeanCountArray(n int, tsStart time.Time, tsStep time.Duration, valueFn func(i int64) (float64, int64)) *cursors.MeanCountArray {
fa := &cursors.MeanCountArray{
Timestamps: make([]int64, n),
Values0: make([]float64, n),
Values1: make([]int64, n),
}
for i := 0; i < n; i++ {
fa.Timestamps[i] = tsStart.UnixNano() + int64(i)*int64(tsStep)
fa.Values0[i], fa.Values1[i] = valueFn(int64(i))
}
return fa
}
func mustParseTime(ts string) time.Time { func mustParseTime(ts string) time.Time {
t, err := time.Parse(time.RFC3339, ts) t, err := time.Parse(time.RFC3339, ts)
if err != nil { if err != nil {
@ -99,6 +114,14 @@ func copyFloatArray(src *cursors.FloatArray) *cursors.FloatArray {
return dst return dst
} }
func copyMeanCountArray(src *cursors.MeanCountArray) *cursors.MeanCountArray {
dst := cursors.NewMeanCountArrayLen(src.Len())
copy(dst.Timestamps, src.Timestamps)
copy(dst.Values0, src.Values0)
copy(dst.Values1, src.Values1)
return dst
}
type aggArrayCursorTest struct { type aggArrayCursorTest struct {
name string name string
createCursorFn func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor createCursorFn func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor
@ -107,6 +130,7 @@ type aggArrayCursorTest struct {
inputArrays []*cursors.IntegerArray inputArrays []*cursors.IntegerArray
wantIntegers []*cursors.IntegerArray wantIntegers []*cursors.IntegerArray
wantFloats []*cursors.FloatArray wantFloats []*cursors.FloatArray
wantMeanCounts []*cursors.MeanCountArray
window interval.Window window interval.Window
} }
@ -147,6 +171,15 @@ func (a *aggArrayCursorTest) run(t *testing.T) {
if diff := cmp.Diff(got, a.wantFloats); diff != "" { if diff := cmp.Diff(got, a.wantFloats); diff != "" {
t.Fatalf("did not get expected result from count array cursor; -got/+want:\n%v", diff) t.Fatalf("did not get expected result from count array cursor; -got/+want:\n%v", diff)
} }
case cursors.MeanCountArrayCursor:
got := make([]*cursors.MeanCountArray, 0, len(a.wantMeanCounts))
for a := cursor.Next(); a.Len() != 0; a = cursor.Next() {
got = append(got, copyMeanCountArray(a))
}
if diff := cmp.Diff(got, a.wantMeanCounts); diff != "" {
t.Fatalf("did not get expected result from count array cursor; -got/+want:\n%v", diff)
}
default: default:
t.Fatalf("unsupported cursor type: %T", cursor) t.Fatalf("unsupported cursor type: %T", cursor)
} }
@ -2063,6 +2096,114 @@ func TestWindowMaxArrayCursor(t *testing.T) {
} }
} }
func TestWindowMeanCountArrayCursor(t *testing.T) {
maxTimestamp := time.Unix(0, math.MaxInt64)
testcases := []aggArrayCursorTest{
{
name: "no window",
every: 0,
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
5,
mustParseTime("2010-01-01T00:00:00Z"), time.Minute,
func(i int64) int64 { return i + 1 },
),
},
wantMeanCounts: []*cursors.MeanCountArray{
makeMeanCountArray(1, maxTimestamp, 0, func(int64) (float64, int64) { return 3.0, 5 }),
},
},
{
name: "no window fraction result",
every: 0,
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
6,
mustParseTime("2010-01-01T00:00:00Z"), time.Minute,
func(i int64) int64 { return i + 1 },
),
},
wantMeanCounts: []*cursors.MeanCountArray{
makeMeanCountArray(1, maxTimestamp, 0, func(int64) (float64, int64) { return 3.5, 6 }),
},
},
{
name: "no window empty",
every: 0,
inputArrays: []*cursors.IntegerArray{},
wantMeanCounts: []*cursors.MeanCountArray{},
},
{
name: "window",
every: 30 * time.Minute,
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
8,
mustParseTime("2010-01-01T00:00:00Z"), 15*time.Minute,
func(i int64) int64 {
return i
},
),
},
wantMeanCounts: []*cursors.MeanCountArray{
makeMeanCountArray(4, mustParseTime("2010-01-01T00:30:00Z"), 30*time.Minute,
func(i int64) (float64, int64) { return 0.5 + float64(i)*2, 2 }),
},
},
{
name: "window offset",
every: 30 * time.Minute,
offset: 5 * time.Minute,
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
8,
mustParseTime("2010-01-01T00:00:00Z"), 15*time.Minute,
func(i int64) int64 {
return i
},
),
},
wantMeanCounts: []*cursors.MeanCountArray{
makeMeanCountArray(5, mustParseTime("2010-01-01T00:05:00Z"), 30*time.Minute,
func(i int64) (float64, int64) {
return []float64{0, 1.5, 3.5, 5.5, 7}[i], []int64{1, 2, 2, 2, 1}[i]
}),
},
},
{
name: "empty window",
every: 15 * time.Minute,
inputArrays: []*cursors.IntegerArray{
makeIntegerArray(
2,
mustParseTime("2010-01-01T00:05:00Z"), 30*time.Minute,
func(i int64) int64 {
return 100 + i
},
),
},
wantMeanCounts: []*cursors.MeanCountArray{
makeMeanCountArray(2, mustParseTime("2010-01-01T00:15:00Z"), 30*time.Minute,
func(i int64) (float64, int64) { return 100 + float64(i), 1 }),
},
},
}
for _, tc := range testcases {
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window interval.Window) cursors.Cursor {
if every != 0 || offset != 0 {
window, _ = interval.NewWindow(
values.MakeDuration(every, 0, false),
values.MakeDuration(every, 0, false),
values.MakeDuration(offset, 0, false),
)
}
return newIntegerWindowMeanCountArrayCursor(cur, window)
}
tc.run(t)
}
}
func TestWindowMeanArrayCursor(t *testing.T) { func TestWindowMeanArrayCursor(t *testing.T) {
maxTimestamp := time.Unix(0, math.MaxInt64) maxTimestamp := time.Unix(0, math.MaxInt64)

View File

@ -6,12 +6,11 @@ package datatypes
import ( import (
encoding_binary "encoding/binary" encoding_binary "encoding/binary"
fmt "fmt" fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
io "io" io "io"
math "math" math "math"
math_bits "math/bits" math_bits "math/bits"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
) )
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.

File diff suppressed because it is too large Load Diff

View File

@ -139,6 +139,7 @@ message ReadResponse {
UNSIGNED = 2 [(gogoproto.enumvalue_customname) = "DataTypeUnsigned"]; UNSIGNED = 2 [(gogoproto.enumvalue_customname) = "DataTypeUnsigned"];
BOOLEAN = 3 [(gogoproto.enumvalue_customname) = "DataTypeBoolean"]; BOOLEAN = 3 [(gogoproto.enumvalue_customname) = "DataTypeBoolean"];
STRING = 4 [(gogoproto.enumvalue_customname) = "DataTypeString"]; STRING = 4 [(gogoproto.enumvalue_customname) = "DataTypeString"];
MULTI = 5 [(gogoproto.enumvalue_customname) = "DataTypeMulti"];
} }
message Frame { message Frame {
@ -150,6 +151,7 @@ message ReadResponse {
UnsignedPointsFrame unsigned_points = 4 [(gogoproto.customname) = "UnsignedPoints"]; UnsignedPointsFrame unsigned_points = 4 [(gogoproto.customname) = "UnsignedPoints"];
BooleanPointsFrame boolean_points = 5 [(gogoproto.customname) = "BooleanPoints"]; BooleanPointsFrame boolean_points = 5 [(gogoproto.customname) = "BooleanPoints"];
StringPointsFrame string_points = 6 [(gogoproto.customname) = "StringPoints"]; StringPointsFrame string_points = 6 [(gogoproto.customname) = "StringPoints"];
MultiPointsFrame float_array = 8 [(gogoproto.customname) = "MultiPoints"];
} }
} }
@ -165,6 +167,36 @@ message ReadResponse {
DataType data_type = 2; DataType data_type = 2;
} }
message FloatValues {
repeated double values = 1;
}
message IntegerValues {
repeated int64 values = 1;
}
message UnsignedValues {
repeated uint64 values = 1;
}
message BooleanValues {
repeated bool values = 1;
}
message StringValues {
repeated string values = 1;
}
message AnyPoints {
oneof data {
FloatValues floats = 1;
IntegerValues integers = 2;
UnsignedValues unsigneds = 3;
BooleanValues booleans = 4;
StringValues strings = 5;
}
}
message MultiPointsFrame {
repeated sfixed64 timestamps = 1;
repeated AnyPoints value_arrays = 2 [(gogoproto.nullable) = false];
}
message FloatPointsFrame { message FloatPointsFrame {
repeated sfixed64 timestamps = 1; repeated sfixed64 timestamps = 1;
repeated double values = 2; repeated double values = 2;

View File

@ -2,3 +2,4 @@ package reads
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata array_cursor.gen.go.tmpl //go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata array_cursor.gen.go.tmpl
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata -o=array_cursor_gen_test.go array_cursor_test.gen.go.tmpl //go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata -o=array_cursor_gen_test.go array_cursor_test.gen.go.tmpl
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@types.tmpldata response_writer.gen.go.tmpl

View File

@ -304,7 +304,7 @@ func (c *groupNoneCursor) Next() bool {
func (c *groupNoneCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) { func (c *groupNoneCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
cur = c.arrayCursors.createCursor(c.row) cur = c.arrayCursors.createCursor(c.row)
if c.agg != nil { if c.agg != nil {
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur) cur, err = newAggregateArrayCursor(c.ctx, []*datatypes.Aggregate{c.agg}, cur)
} }
return cur, err return cur, err
} }
@ -352,7 +352,7 @@ func (c *groupByCursor) Next() bool {
func (c *groupByCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) { func (c *groupByCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
cur = c.arrayCursors.createCursor(seriesRow) cur = c.arrayCursors.createCursor(seriesRow)
if c.agg != nil { if c.agg != nil {
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur) cur, err = newAggregateArrayCursor(c.ctx, []*datatypes.Aggregate{c.agg}, cur)
} }
return cur, err return cur, err
} }

View File

@ -26,7 +26,6 @@ func (w *ResponseWriter) getFloatPointsFrame() *datatypes.ReadResponse_Frame_Flo
}, },
} }
} }
return res return res
} }
@ -36,6 +35,28 @@ func (w *ResponseWriter) putFloatPointsFrame(f *datatypes.ReadResponse_Frame_Flo
w.buffer.Float = append(w.buffer.Float, f) w.buffer.Float = append(w.buffer.Float, f)
} }
func (w *ResponseWriter) getFloatValues() *datatypes.ReadResponse_AnyPoints_Floats {
var res *datatypes.ReadResponse_AnyPoints_Floats
if len(w.buffer.FloatValues) > 0 {
i := len(w.buffer.FloatValues) - 1
res = w.buffer.FloatValues[i]
w.buffer.FloatValues[i] = nil
w.buffer.FloatValues = w.buffer.FloatValues[:i]
} else {
res = &datatypes.ReadResponse_AnyPoints_Floats{
Floats: &datatypes.ReadResponse_FloatValues{
Values: make([]float64, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putFloatValues(f *datatypes.ReadResponse_AnyPoints_Floats) {
f.Floats.Values = f.Floats.Values[:0]
w.buffer.FloatValues = append(w.buffer.FloatValues, f)
}
func (w *ResponseWriter) streamFloatArraySeries(cur cursors.FloatArrayCursor) { func (w *ResponseWriter) streamFloatArraySeries(cur cursors.FloatArrayCursor) {
w.sf.DataType = datatypes.DataTypeFloat w.sf.DataType = datatypes.DataTypeFloat
ss := len(w.res.Frames) - 1 ss := len(w.res.Frames) - 1
@ -129,7 +150,6 @@ func (w *ResponseWriter) getIntegerPointsFrame() *datatypes.ReadResponse_Frame_I
}, },
} }
} }
return res return res
} }
@ -139,6 +159,28 @@ func (w *ResponseWriter) putIntegerPointsFrame(f *datatypes.ReadResponse_Frame_I
w.buffer.Integer = append(w.buffer.Integer, f) w.buffer.Integer = append(w.buffer.Integer, f)
} }
func (w *ResponseWriter) getIntegerValues() *datatypes.ReadResponse_AnyPoints_Integers {
var res *datatypes.ReadResponse_AnyPoints_Integers
if len(w.buffer.IntegerValues) > 0 {
i := len(w.buffer.IntegerValues) - 1
res = w.buffer.IntegerValues[i]
w.buffer.IntegerValues[i] = nil
w.buffer.IntegerValues = w.buffer.IntegerValues[:i]
} else {
res = &datatypes.ReadResponse_AnyPoints_Integers{
Integers: &datatypes.ReadResponse_IntegerValues{
Values: make([]int64, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putIntegerValues(f *datatypes.ReadResponse_AnyPoints_Integers) {
f.Integers.Values = f.Integers.Values[:0]
w.buffer.IntegerValues = append(w.buffer.IntegerValues, f)
}
func (w *ResponseWriter) streamIntegerArraySeries(cur cursors.IntegerArrayCursor) { func (w *ResponseWriter) streamIntegerArraySeries(cur cursors.IntegerArrayCursor) {
w.sf.DataType = datatypes.DataTypeInteger w.sf.DataType = datatypes.DataTypeInteger
ss := len(w.res.Frames) - 1 ss := len(w.res.Frames) - 1
@ -232,7 +274,6 @@ func (w *ResponseWriter) getUnsignedPointsFrame() *datatypes.ReadResponse_Frame_
}, },
} }
} }
return res return res
} }
@ -242,6 +283,28 @@ func (w *ResponseWriter) putUnsignedPointsFrame(f *datatypes.ReadResponse_Frame_
w.buffer.Unsigned = append(w.buffer.Unsigned, f) w.buffer.Unsigned = append(w.buffer.Unsigned, f)
} }
func (w *ResponseWriter) getUnsignedValues() *datatypes.ReadResponse_AnyPoints_Unsigneds {
var res *datatypes.ReadResponse_AnyPoints_Unsigneds
if len(w.buffer.UnsignedValues) > 0 {
i := len(w.buffer.UnsignedValues) - 1
res = w.buffer.UnsignedValues[i]
w.buffer.UnsignedValues[i] = nil
w.buffer.UnsignedValues = w.buffer.UnsignedValues[:i]
} else {
res = &datatypes.ReadResponse_AnyPoints_Unsigneds{
Unsigneds: &datatypes.ReadResponse_UnsignedValues{
Values: make([]uint64, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putUnsignedValues(f *datatypes.ReadResponse_AnyPoints_Unsigneds) {
f.Unsigneds.Values = f.Unsigneds.Values[:0]
w.buffer.UnsignedValues = append(w.buffer.UnsignedValues, f)
}
func (w *ResponseWriter) streamUnsignedArraySeries(cur cursors.UnsignedArrayCursor) { func (w *ResponseWriter) streamUnsignedArraySeries(cur cursors.UnsignedArrayCursor) {
w.sf.DataType = datatypes.DataTypeUnsigned w.sf.DataType = datatypes.DataTypeUnsigned
ss := len(w.res.Frames) - 1 ss := len(w.res.Frames) - 1
@ -335,7 +398,6 @@ func (w *ResponseWriter) getStringPointsFrame() *datatypes.ReadResponse_Frame_St
}, },
} }
} }
return res return res
} }
@ -345,6 +407,28 @@ func (w *ResponseWriter) putStringPointsFrame(f *datatypes.ReadResponse_Frame_St
w.buffer.String = append(w.buffer.String, f) w.buffer.String = append(w.buffer.String, f)
} }
func (w *ResponseWriter) getStringValues() *datatypes.ReadResponse_AnyPoints_Strings {
var res *datatypes.ReadResponse_AnyPoints_Strings
if len(w.buffer.StringValues) > 0 {
i := len(w.buffer.StringValues) - 1
res = w.buffer.StringValues[i]
w.buffer.StringValues[i] = nil
w.buffer.StringValues = w.buffer.StringValues[:i]
} else {
res = &datatypes.ReadResponse_AnyPoints_Strings{
Strings: &datatypes.ReadResponse_StringValues{
Values: make([]string, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putStringValues(f *datatypes.ReadResponse_AnyPoints_Strings) {
f.Strings.Values = f.Strings.Values[:0]
w.buffer.StringValues = append(w.buffer.StringValues, f)
}
func (w *ResponseWriter) streamStringArraySeries(cur cursors.StringArrayCursor) { func (w *ResponseWriter) streamStringArraySeries(cur cursors.StringArrayCursor) {
w.sf.DataType = datatypes.DataTypeString w.sf.DataType = datatypes.DataTypeString
ss := len(w.res.Frames) - 1 ss := len(w.res.Frames) - 1
@ -438,7 +522,6 @@ func (w *ResponseWriter) getBooleanPointsFrame() *datatypes.ReadResponse_Frame_B
}, },
} }
} }
return res return res
} }
@ -448,6 +531,28 @@ func (w *ResponseWriter) putBooleanPointsFrame(f *datatypes.ReadResponse_Frame_B
w.buffer.Boolean = append(w.buffer.Boolean, f) w.buffer.Boolean = append(w.buffer.Boolean, f)
} }
func (w *ResponseWriter) getBooleanValues() *datatypes.ReadResponse_AnyPoints_Booleans {
var res *datatypes.ReadResponse_AnyPoints_Booleans
if len(w.buffer.BooleanValues) > 0 {
i := len(w.buffer.BooleanValues) - 1
res = w.buffer.BooleanValues[i]
w.buffer.BooleanValues[i] = nil
w.buffer.BooleanValues = w.buffer.BooleanValues[:i]
} else {
res = &datatypes.ReadResponse_AnyPoints_Booleans{
Booleans: &datatypes.ReadResponse_BooleanValues{
Values: make([]bool, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) putBooleanValues(f *datatypes.ReadResponse_AnyPoints_Booleans) {
f.Booleans.Values = f.Booleans.Values[:0]
w.buffer.BooleanValues = append(w.buffer.BooleanValues, f)
}
func (w *ResponseWriter) streamBooleanArraySeries(cur cursors.BooleanArrayCursor) { func (w *ResponseWriter) streamBooleanArraySeries(cur cursors.BooleanArrayCursor) {
w.sf.DataType = datatypes.DataTypeBoolean w.sf.DataType = datatypes.DataTypeBoolean
ss := len(w.res.Frames) - 1 ss := len(w.res.Frames) - 1

View File

@ -0,0 +1,135 @@
package reads
import (
"github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors"
)
{{with $types := .}}
{{range $k := $types}}
func (w *ResponseWriter) get{{$k.Name}}PointsFrame() *datatypes.ReadResponse_Frame_{{$k.Name}}Points {
var res *datatypes.ReadResponse_Frame_{{$k.Name}}Points
if len(w.buffer.{{$k.Name}}) > 0 {
i := len(w.buffer.{{$k.Name}}) - 1
res = w.buffer.{{$k.Name}}[i]
w.buffer.{{$k.Name}}[i] = nil
w.buffer.{{$k.Name}} = w.buffer.{{$k.Name}}[:i]
} else {
res = &datatypes.ReadResponse_Frame_{{$k.Name}}Points{
{{$k.Name}}Points: &datatypes.ReadResponse_{{$k.Name}}PointsFrame{
Timestamps: make([]int64, 0, batchSize),
Values: make([]{{$k.Type}}, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) put{{$k.Name}}PointsFrame(f *datatypes.ReadResponse_Frame_{{$k.Name}}Points) {
f.{{$k.Name}}Points.Timestamps = f.{{$k.Name}}Points.Timestamps[:0]
f.{{$k.Name}}Points.Values = f.{{$k.Name}}Points.Values[:0]
w.buffer.{{$k.Name}} = append(w.buffer.{{$k.Name}}, f)
}
func (w *ResponseWriter) get{{$k.Name}}Values() *datatypes.ReadResponse_AnyPoints_{{$k.Name}}s {
var res *datatypes.ReadResponse_AnyPoints_{{$k.Name}}s
if len(w.buffer.{{$k.Name}}Values) > 0 {
i := len(w.buffer.{{$k.Name}}Values) - 1
res = w.buffer.{{$k.Name}}Values[i]
w.buffer.{{$k.Name}}Values[i] = nil
w.buffer.{{$k.Name}}Values = w.buffer.{{$k.Name}}Values[:i]
} else {
res = &datatypes.ReadResponse_AnyPoints_{{$k.Name}}s{
{{$k.Name}}s: &datatypes.ReadResponse_{{$k.Name}}Values{
Values: make([]{{$k.Type}}, 0, batchSize),
},
}
}
return res
}
func (w *ResponseWriter) put{{$k.Name}}Values(f *datatypes.ReadResponse_AnyPoints_{{$k.Name}}s) {
f.{{$k.Name}}s.Values = f.{{$k.Name}}s.Values[:0]
w.buffer.{{$k.Name}}Values = append(w.buffer.{{$k.Name}}Values, f)
}
func (w *ResponseWriter) stream{{$k.Name}}ArraySeries(cur cursors.{{$k.Name}}ArrayCursor) {
w.sf.DataType = datatypes.DataType{{$k.Name}}
ss := len(w.res.Frames) - 1
a := cur.Next()
if len(a.Timestamps) == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) stream{{$k.Name}}ArrayPoints(cur cursors.{{$k.Name}}ArrayCursor) {
w.sf.DataType = datatypes.DataType{{$k.Name}}
ss := len(w.res.Frames) - 1
p := w.get{{$k.Name}}PointsFrame()
frame := p.{{$k.Name}}Points
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var seriesValueCount = 0
for {
// If the number of values produced by cur > 1000,
// cur.Next() will produce batches of values that are of
// length ≤ 1000.
// We attempt to limit the frame Timestamps / Values lengths
// the same to avoid allocations. These frames are recycled
// after flushing so that on repeated use there should be enough space
// to append values from a into frame without additional allocations.
a := cur.Next()
if len(a.Timestamps) == 0 {
break
}
seriesValueCount += a.Len()
// As specified in the struct definition, w.sz is an estimated
// size (in bytes) of the buffered data. It is therefore a
// deliberate choice to accumulate using the array Size, which is
// cheap to calculate. Calling frame.Size() can be expensive
// when using varint encoding for numbers.
w.sz += a.Size()
frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
frame.Values = append(frame.Values, a.Values...)
// given the expectation of cur.Next, we attempt to limit
// the number of values appended to the frame to batchSize (1000)
needsFrame := len(frame.Timestamps) >= batchSize
if w.sz >= writeSize {
needsFrame = true
w.Flush()
if w.err != nil {
break
}
}
if needsFrame {
// new frames are returned with Timestamps and Values preallocated
// to a minimum of batchSize length to reduce further allocations.
p = w.get{{$k.Name}}PointsFrame()
frame = p.{{$k.Name}}Points
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
}
}
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
{{end}}
{{end}}

View File

@ -3,11 +3,10 @@ package reads
import ( import (
"fmt" "fmt"
"google.golang.org/grpc/metadata"
"github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage/reads/datatypes" "github.com/influxdata/influxdb/storage/reads/datatypes"
"github.com/influxdata/influxdb/tsdb/cursors" "github.com/influxdata/influxdb/tsdb/cursors"
"google.golang.org/grpc/metadata"
) )
type ResponseStream interface { type ResponseStream interface {
@ -38,13 +37,19 @@ type ResponseWriter struct {
vc int // total value count vc int // total value count
buffer struct { buffer struct {
Float []*datatypes.ReadResponse_Frame_FloatPoints Float []*datatypes.ReadResponse_Frame_FloatPoints
Integer []*datatypes.ReadResponse_Frame_IntegerPoints Integer []*datatypes.ReadResponse_Frame_IntegerPoints
Unsigned []*datatypes.ReadResponse_Frame_UnsignedPoints Unsigned []*datatypes.ReadResponse_Frame_UnsignedPoints
Boolean []*datatypes.ReadResponse_Frame_BooleanPoints Boolean []*datatypes.ReadResponse_Frame_BooleanPoints
String []*datatypes.ReadResponse_Frame_StringPoints String []*datatypes.ReadResponse_Frame_StringPoints
Series []*datatypes.ReadResponse_Frame_Series Series []*datatypes.ReadResponse_Frame_Series
Group []*datatypes.ReadResponse_Frame_Group Group []*datatypes.ReadResponse_Frame_Group
Multi []*datatypes.ReadResponse_Frame_MultiPoints
FloatValues []*datatypes.ReadResponse_AnyPoints_Floats
IntegerValues []*datatypes.ReadResponse_AnyPoints_Integers
UnsignedValues []*datatypes.ReadResponse_AnyPoints_Unsigneds
BooleanValues []*datatypes.ReadResponse_AnyPoints_Booleans
StringValues []*datatypes.ReadResponse_AnyPoints_Strings
} }
hints datatypes.HintFlags hints datatypes.HintFlags
@ -229,6 +234,8 @@ func (w *ResponseWriter) streamCursor(cur cursors.Cursor) {
w.streamBooleanArraySeries(cur) w.streamBooleanArraySeries(cur)
case cursors.StringArrayCursor: case cursors.StringArrayCursor:
w.streamStringArraySeries(cur) w.streamStringArraySeries(cur)
case cursors.MeanCountArrayCursor:
w.streamMeanCountArraySeries(cur)
default: default:
panic(fmt.Sprintf("unreachable: %T", cur)) panic(fmt.Sprintf("unreachable: %T", cur))
} }
@ -245,6 +252,8 @@ func (w *ResponseWriter) streamCursor(cur cursors.Cursor) {
w.streamBooleanArrayPoints(cur) w.streamBooleanArrayPoints(cur)
case cursors.StringArrayCursor: case cursors.StringArrayCursor:
w.streamStringArrayPoints(cur) w.streamStringArrayPoints(cur)
case cursors.MeanCountArrayCursor:
w.streamMeanCountArrayPoints(cur)
default: default:
panic(fmt.Sprintf("unreachable: %T", cur)) panic(fmt.Sprintf("unreachable: %T", cur))
} }
@ -277,6 +286,8 @@ func (w *ResponseWriter) Flush() {
w.putBooleanPointsFrame(p) w.putBooleanPointsFrame(p)
case *datatypes.ReadResponse_Frame_StringPoints: case *datatypes.ReadResponse_Frame_StringPoints:
w.putStringPointsFrame(p) w.putStringPointsFrame(p)
case *datatypes.ReadResponse_Frame_MultiPoints:
w.putMultiPointsFrame(p)
case *datatypes.ReadResponse_Frame_Series: case *datatypes.ReadResponse_Frame_Series:
w.putSeriesFrame(p) w.putSeriesFrame(p)
case *datatypes.ReadResponse_Frame_Group: case *datatypes.ReadResponse_Frame_Group:
@ -285,3 +296,124 @@ func (w *ResponseWriter) Flush() {
} }
w.res.Frames = w.res.Frames[:0] w.res.Frames = w.res.Frames[:0]
} }
// The MultiPoints <==> MeanCount converters do not fit the codegen pattern in response_writer.gen.go
func (w *ResponseWriter) getMultiPointsFrameForMeanCount() *datatypes.ReadResponse_Frame_MultiPoints {
var res *datatypes.ReadResponse_Frame_MultiPoints
if len(w.buffer.Multi) > 0 {
i := len(w.buffer.Multi) - 1
res = w.buffer.Multi[i]
w.buffer.Multi[i] = nil
w.buffer.Multi = w.buffer.Multi[:i]
} else {
res = &datatypes.ReadResponse_Frame_MultiPoints{
MultiPoints: &datatypes.ReadResponse_MultiPointsFrame{
Timestamps: make([]int64, 0, batchSize),
},
}
}
res.MultiPoints.ValueArrays = append(res.MultiPoints.ValueArrays, datatypes.ReadResponse_AnyPoints{Data: w.getFloatValues()})
res.MultiPoints.ValueArrays = append(res.MultiPoints.ValueArrays, datatypes.ReadResponse_AnyPoints{Data: w.getIntegerValues()})
return res
}
func (w *ResponseWriter) putMultiPointsFrame(f *datatypes.ReadResponse_Frame_MultiPoints) {
f.MultiPoints.Timestamps = f.MultiPoints.Timestamps[:0]
for _, v := range f.MultiPoints.ValueArrays {
switch v := v.Data.(type) {
case *datatypes.ReadResponse_AnyPoints_Floats:
w.putFloatValues(v)
case *datatypes.ReadResponse_AnyPoints_Integers:
w.putIntegerValues(v)
case *datatypes.ReadResponse_AnyPoints_Unsigneds:
w.putUnsignedValues(v)
case *datatypes.ReadResponse_AnyPoints_Booleans:
w.putBooleanValues(v)
case *datatypes.ReadResponse_AnyPoints_Strings:
w.putStringValues(v)
}
}
f.MultiPoints.ValueArrays = f.MultiPoints.ValueArrays[:0]
w.buffer.Multi = append(w.buffer.Multi, f)
}
func (w *ResponseWriter) streamMeanCountArraySeries(cur cursors.MeanCountArrayCursor) {
w.sf.DataType = datatypes.DataTypeMulti
ss := len(w.res.Frames) - 1
a := cur.Next()
if len(a.Timestamps) == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}
func (w *ResponseWriter) streamMeanCountArrayPoints(cur cursors.MeanCountArrayCursor) {
w.sf.DataType = datatypes.DataTypeMulti
ss := len(w.res.Frames) - 1
p := w.getMultiPointsFrameForMeanCount()
frame := p.MultiPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
var seriesValueCount = 0
for {
// If the number of values produced by cur > 1000,
// cur.Next() will produce batches of values that are of
// length ≤ 1000.
// We attempt to limit the frame Timestamps / Values lengths
// the same to avoid allocations. These frames are recycled
// after flushing so that on repeated use there should be enough space
// to append values from a into frame without additional allocations.
a := cur.Next()
if len(a.Timestamps) == 0 {
break
}
seriesValueCount += a.Len()
// As specified in the struct definition, w.sz is an estimated
// size (in bytes) of the buffered data. It is therefore a
// deliberate choice to accumulate using the array Size, which is
// cheap to calculate. Calling frame.Size() can be expensive
// when using varint encoding for numbers.
w.sz += a.Size()
frame.Timestamps = append(frame.Timestamps, a.Timestamps...)
// This is guaranteed to be the right layout since we called getMultiPointsFrameForMeanCount.
frame.ValueArrays[0].GetFloats().Values = append(frame.ValueArrays[0].GetFloats().Values, a.Values0...)
frame.ValueArrays[1].GetIntegers().Values = append(frame.ValueArrays[1].GetIntegers().Values, a.Values1...)
// given the expectation of cur.Next, we attempt to limit
// the number of values appended to the frame to batchSize (1000)
needsFrame := len(frame.Timestamps) >= batchSize
if w.sz >= writeSize {
needsFrame = true
w.Flush()
if w.err != nil {
break
}
}
if needsFrame {
// new frames are returned with Timestamps and Values preallocated
// to a minimum of batchSize length to reduce further allocations.
p = w.getMultiPointsFrameForMeanCount()
frame = p.MultiPoints
w.res.Frames = append(w.res.Frames, datatypes.ReadResponse_Frame{Data: p})
}
}
w.vc += seriesValueCount
if seriesValueCount == 0 {
w.sz -= w.sf.Size()
w.putSeriesFrame(w.res.Frames[ss].Data.(*datatypes.ReadResponse_Frame_Series))
w.res.Frames = w.res.Frames[:ss]
} else if w.sz > writeSize {
w.Flush()
}
}

View File

@ -0,0 +1,259 @@
package cursors
{{range .}}
{{- $typename := print .Name "Array" }}
{{- $hasType := or (and .Type true) false }}
type {{ $typename }} struct {
Timestamps []int64
{{- if $hasType }}
Values []{{.Type}}
{{- end }}
}
func New{{$typename}}Len(sz int) *{{$typename}} {
return &{{$typename}}{
Timestamps: make([]int64, sz),
{{- if $hasType }}
Values: make([]{{.Type}}, sz),
{{- end }}
}
}
func (a *{{ $typename }}) MinTime() int64 {
return a.Timestamps[0]
}
func (a *{{ $typename }}) MaxTime() int64 {
return a.Timestamps[len(a.Timestamps)-1]
}
func (a *{{ $typename}}) Len() int {
return len(a.Timestamps)
}
// search performs a binary search for UnixNano() v in a
// and returns the position, i, where v would be inserted.
// An additional check of a.Timestamps[i] == v is necessary
// to determine if the value v exists.
func (a *{{ $typename }}) search(v int64) int {
// Define: f(x) → a.Timestamps[x] < v
// Define: f(-1) == true, f(n) == false
// Invariant: f(lo-1) == true, f(hi) == false
lo := 0
hi := a.Len()
for lo < hi {
mid := int(uint(lo+hi) >> 1)
if a.Timestamps[mid] < v {
lo = mid + 1 // preserves f(lo-1) == true
} else {
hi = mid // preserves f(hi) == false
}
}
// lo == hi
return lo
}
// FindRange returns the positions where min and max would be
// inserted into the array. If a[0].UnixNano() > max or
// a[len-1].UnixNano() < min then FindRange returns (-1, -1)
// indicating the array is outside the [min, max]. The values must
// be deduplicated and sorted before calling FindRange or the results
// are undefined.
func (a *{{ $typename }}) FindRange(min, max int64) (int, int) {
if a.Len() == 0 || min > max {
return -1, -1
}
minVal := a.MinTime()
maxVal := a.MaxTime()
if maxVal < min || minVal > max {
return -1, -1
}
return a.search(min), a.search(max)
}
{{- if $hasType }}
// Exclude removes the subset of values in [min, max]. The values must
// be deduplicated and sorted before calling Exclude or the results are undefined.
func (a *{{ $typename }}) Exclude(min, max int64) {
rmin, rmax := a.FindRange(min, max)
if rmin == -1 && rmax == -1 {
return
}
// a.Timestamps[rmin] ≥ min
// a.Timestamps[rmax] ≥ max
if rmax < a.Len() {
if a.Timestamps[rmax] == max {
rmax++
}
rest := a.Len()-rmax
if rest > 0 {
ts := a.Timestamps[:rmin+rest]
copy(ts[rmin:], a.Timestamps[rmax:])
a.Timestamps = ts
vs := a.Values[:rmin+rest]
copy(vs[rmin:], a.Values[rmax:])
a.Values = vs
return
}
}
a.Timestamps = a.Timestamps[:rmin]
a.Values = a.Values[:rmin]
}
// Include returns the subset values between min and max inclusive. The values must
// be deduplicated and sorted before calling Include or the results are undefined.
func (a *{{ $typename }}) Include(min, max int64) {
rmin, rmax := a.FindRange(min, max)
if rmin == -1 && rmax == -1 {
a.Timestamps = a.Timestamps[:0]
a.Values = a.Values[:0]
return
}
// a.Timestamps[rmin] ≥ min
// a.Timestamps[rmax] ≥ max
if rmax < a.Len() && a.Timestamps[rmax] == max {
rmax++
}
if rmin > -1 {
ts := a.Timestamps[:rmax-rmin]
copy(ts, a.Timestamps[rmin:rmax])
a.Timestamps = ts
vs := a.Values[:rmax-rmin]
copy(vs, a.Values[rmin:rmax])
a.Values = vs
} else {
a.Timestamps = a.Timestamps[:rmax]
a.Values = a.Values[:rmax]
}
}
// Merge overlays b to top of a. If two values conflict with
// the same timestamp, b is used. Both a and b must be sorted
// in ascending order.
func (a *{{ $typename }}) Merge(b *{{ $typename }}) {
if a.Len() == 0 {
*a = *b
return
}
if b.Len() == 0 {
return
}
// Normally, both a and b should not contain duplicates. Due to a bug in older versions, it's
// possible stored blocks might contain duplicate values. Remove them if they exists before
// merging.
// a = a.Deduplicate()
// b = b.Deduplicate()
if a.MaxTime() < b.MinTime() {
a.Timestamps = append(a.Timestamps, b.Timestamps...)
a.Values = append(a.Values, b.Values...)
return
}
if b.MaxTime() < a.MinTime() {
var tmp {{$typename}}
tmp.Timestamps = append(b.Timestamps, a.Timestamps...)
tmp.Values = append(b.Values, a.Values...)
*a = tmp
return
}
out := New{{$typename}}Len(a.Len()+b.Len())
i, j, k := 0, 0, 0
for i < len(a.Timestamps) && j < len(b.Timestamps) {
if a.Timestamps[i] < b.Timestamps[j] {
out.Timestamps[k] = a.Timestamps[i]
out.Values[k] = a.Values[i]
i++
} else if a.Timestamps[i] == b.Timestamps[j] {
out.Timestamps[k] = b.Timestamps[j]
out.Values[k] = b.Values[j]
i++
j++
} else {
out.Timestamps[k] = b.Timestamps[j]
out.Values[k] = b.Values[j]
j++
}
k++
}
if i < len(a.Timestamps) {
n := copy(out.Timestamps[k:], a.Timestamps[i:])
copy(out.Values[k:], a.Values[i:])
k += n
} else if j < len(b.Timestamps) {
n := copy(out.Timestamps[k:], b.Timestamps[j:])
copy(out.Values[k:], b.Values[j:])
k += n
}
a.Timestamps = out.Timestamps[:k]
a.Values = out.Values[:k]
}
{{ else }}
// Exclude removes the subset of timestamps in [min, max]. The timestamps must
// be deduplicated and sorted before calling Exclude or the results are undefined.
func (a *{{ $typename }}) Exclude(min, max int64) {
rmin, rmax := a.FindRange(min, max)
if rmin == -1 && rmax == -1 {
return
}
// a.Timestamps[rmin] ≥ min
// a.Timestamps[rmax] ≥ max
if rmax < a.Len() {
if a.Timestamps[rmax] == max {
rmax++
}
rest := a.Len()-rmax
if rest > 0 {
ts := a.Timestamps[:rmin+rest]
copy(ts[rmin:], a.Timestamps[rmax:])
a.Timestamps = ts
return
}
}
a.Timestamps = a.Timestamps[:rmin]
}
// Contains returns true if values exist between min and max inclusive. The
// values must be sorted before calling Contains or the results are undefined.
func (a *{{ $typename }}) Contains(min, max int64) bool {
rmin, rmax := a.FindRange(min, max)
if rmin == -1 && rmax == -1 {
return false
}
// a.Timestamps[rmin] ≥ min
// a.Timestamps[rmax] ≥ max
if a.Timestamps[rmin] == min {
return true
}
if rmax < a.Len() && a.Timestamps[rmax] == max {
return true
}
return rmax-rmin > 0
}
{{ end }}
{{ end }}

View File

@ -0,0 +1,26 @@
[
{
"Name":"Float",
"Type":"float64"
},
{
"Name":"Integer",
"Type":"int64"
},
{
"Name":"Unsigned",
"Type":"uint64"
},
{
"Name":"String",
"Type":"string"
},
{
"Name":"Boolean",
"Type":"bool"
},
{
"Name":"Timestamp",
"Type": null
}
]

View File

@ -1,5 +1,40 @@
package cursors package cursors
// MeanCountArray is too different to codegen easily
type MeanCountArray struct {
Timestamps []int64
Values0 []float64
Values1 []int64
}
func NewMeanCountArrayLen(sz int) *MeanCountArray {
return &MeanCountArray{
Timestamps: make([]int64, sz),
Values0: make([]float64, sz),
Values1: make([]int64, sz),
}
}
func (a *MeanCountArray) MinTime() int64 {
return a.Timestamps[0]
}
func (a *MeanCountArray) MaxTime() int64 {
return a.Timestamps[len(a.Timestamps)-1]
}
func (a *MeanCountArray) Len() int {
return len(a.Timestamps)
}
// Sizes for each type are different and not codegen-able
func (a *MeanCountArray) Size() int {
// size of timestamps + values
return len(a.Timestamps)*8 + len(a.Values0)*8 + len(a.Values1)*8
}
func (a *FloatArray) Size() int { func (a *FloatArray) Size() int {
// size of timestamps + values // size of timestamps + values
return len(a.Timestamps)*8 + len(a.Values)*8 return len(a.Timestamps)*8 + len(a.Values)*8

View File

@ -39,6 +39,11 @@ type BooleanArrayCursor interface {
Next() *BooleanArray Next() *BooleanArray
} }
type MeanCountArrayCursor interface {
Cursor
Next() *MeanCountArray
}
type CursorRequest struct { type CursorRequest struct {
Name []byte Name []byte
Tags models.Tags Tags models.Tags

View File

@ -1 +1,3 @@
package cursors package cursors
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@arrayvalues.gen.go.tmpldata arrayvalues.gen.go.tmpl