feat(values): add window mean array cursor
parent
c523fd22f7
commit
2acb8b897a
2
go.mod
2
go.mod
|
@ -47,7 +47,7 @@ require (
|
|||
github.com/hashicorp/vault/api v1.0.2
|
||||
github.com/imdario/mergo v0.3.9 // indirect
|
||||
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6
|
||||
github.com/influxdata/flux v0.83.2
|
||||
github.com/influxdata/flux v0.83.3
|
||||
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
|
||||
github.com/influxdata/pkg-config v0.2.3
|
||||
|
|
4
go.sum
4
go.sum
|
@ -321,8 +321,8 @@ github.com/influxdata/arrow/go/arrow v0.0.0-20200917142114-986e413c1705 h1:o8xdE
|
|||
github.com/influxdata/arrow/go/arrow v0.0.0-20200917142114-986e413c1705/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0=
|
||||
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6 h1:OtjKkeWDjUbyMi82C7XXy7Tvm2LXMwiBBXyFIGNPaGA=
|
||||
github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og=
|
||||
github.com/influxdata/flux v0.83.2 h1:b4S5IoEXFUhoqlF9LLRDmvj67cA5UNkBtmmmtUVtuAg=
|
||||
github.com/influxdata/flux v0.83.2/go.mod h1:+6FzHdZdwYjEIa2iuQEJ92x+C2A8X1jI0qdpVT0DJfM=
|
||||
github.com/influxdata/flux v0.83.3 h1:k+6XvZlV82tTNwoM3U9gTZ2W3C2S0Clr/t0bm86/4b0=
|
||||
github.com/influxdata/flux v0.83.3/go.mod h1:+6FzHdZdwYjEIa2iuQEJ92x+C2A8X1jI0qdpVT0DJfM=
|
||||
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0fXO4ZE/lFGIE84G6rIV5SJN3P3sjIXAP1a8eU=
|
||||
github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA=
|
||||
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
# List any generated files here
|
||||
TARGETS = array_cursor.gen.go \
|
||||
array_cursor_test.gen.go
|
||||
|
||||
# List any source files used to generate the targets here
|
||||
SOURCES = gen.go \
|
||||
array_cursor.gen.go.tmpl \
|
||||
array_cursor_test.gen.go.tmpl \
|
||||
array_cursor.gen.go.tmpldata \
|
||||
types.tmpldata
|
||||
|
||||
# List any directories that have their own Makefile here
|
||||
SUBDIRS = datatypes
|
||||
|
||||
# Default target
|
||||
all: $(SUBDIRS) $(TARGETS)
|
||||
|
||||
# Recurse into subdirs for same make goal
|
||||
$(SUBDIRS):
|
||||
$(MAKE) -C $@ $(MAKECMDGOALS)
|
||||
|
||||
# Clean all targets recursively
|
||||
clean: $(SUBDIRS)
|
||||
rm -f $(TARGETS)
|
||||
|
||||
# Define go generate if not already defined
|
||||
GO_GENERATE := go generate
|
||||
|
||||
# Run go generate for the targets
|
||||
$(TARGETS): $(SOURCES)
|
||||
$(GO_GENERATE) -x
|
||||
|
||||
.PHONY: all clean $(SUBDIRS)
|
|
@ -8,7 +8,12 @@ package reads
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/values"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
)
|
||||
|
||||
|
@ -19,6 +24,26 @@ const (
|
|||
MaxPointsPerBlock = 1000
|
||||
)
|
||||
|
||||
func newWindowMeanArrayCursor(cur cursors.Cursor, window execute.Window) (cursors.Cursor, error) {
|
||||
switch cur := cur.(type) {
|
||||
|
||||
case cursors.FloatArrayCursor:
|
||||
return newFloatWindowMeanArrayCursor(cur, window), nil
|
||||
|
||||
case cursors.IntegerArrayCursor:
|
||||
return newIntegerWindowMeanArrayCursor(cur, window), nil
|
||||
|
||||
case cursors.UnsignedArrayCursor:
|
||||
return newUnsignedWindowMeanArrayCursor(cur, window), nil
|
||||
|
||||
default:
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: fmt.Sprintf("unsupported input type for mean aggregate: %s", arrayCursorType(cur)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ********************
|
||||
// Float Array Cursor
|
||||
|
||||
|
@ -241,6 +266,120 @@ func (c *integerFloatCountArrayCursor) Next() *cursors.IntegerArray {
|
|||
}
|
||||
}
|
||||
|
||||
type floatWindowMeanArrayCursor struct {
|
||||
cursors.FloatArrayCursor
|
||||
res *cursors.FloatArray
|
||||
tmp *cursors.FloatArray
|
||||
window execute.Window
|
||||
}
|
||||
|
||||
func newFloatWindowMeanArrayCursor(cur cursors.FloatArrayCursor, window execute.Window) *floatWindowMeanArrayCursor {
|
||||
resLen := MaxPointsPerBlock
|
||||
if window.Every.IsZero() {
|
||||
resLen = 1
|
||||
}
|
||||
return &floatWindowMeanArrayCursor{
|
||||
FloatArrayCursor: cur,
|
||||
res: cursors.NewFloatArrayLen(resLen),
|
||||
tmp: &cursors.FloatArray{},
|
||||
window: window,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *floatWindowMeanArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.FloatArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *floatWindowMeanArrayCursor) Next() *cursors.FloatArray {
|
||||
pos := 0
|
||||
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
|
||||
c.res.Values = c.res.Values[:cap(c.res.Values)]
|
||||
|
||||
var a *cursors.FloatArray
|
||||
if c.tmp.Len() > 0 {
|
||||
a = c.tmp
|
||||
} else {
|
||||
a = c.FloatArrayCursor.Next()
|
||||
}
|
||||
|
||||
if a.Len() == 0 {
|
||||
return &cursors.FloatArray{}
|
||||
}
|
||||
|
||||
rowIdx := 0
|
||||
var sum float64
|
||||
var count int64
|
||||
|
||||
var windowEnd int64
|
||||
if !c.window.Every.IsZero() {
|
||||
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
|
||||
} else {
|
||||
windowEnd = math.MaxInt64
|
||||
}
|
||||
windowHasPoints := false
|
||||
|
||||
// enumerate windows
|
||||
WINDOWS:
|
||||
for {
|
||||
for ; rowIdx < a.Len(); rowIdx++ {
|
||||
ts := a.Timestamps[rowIdx]
|
||||
if !c.window.Every.IsZero() && ts >= windowEnd {
|
||||
// new window detected, close the current window
|
||||
// do not generate a point for empty windows
|
||||
if windowHasPoints {
|
||||
c.res.Timestamps[pos] = windowEnd
|
||||
c.res.Values[pos] = sum / float64(count)
|
||||
pos++
|
||||
if pos >= MaxPointsPerBlock {
|
||||
// the output array is full,
|
||||
// save the remaining points in the input array in tmp.
|
||||
// they will be processed in the next call to Next()
|
||||
c.tmp.Timestamps = a.Timestamps[rowIdx:]
|
||||
c.tmp.Values = a.Values[rowIdx:]
|
||||
break WINDOWS
|
||||
}
|
||||
}
|
||||
|
||||
// start the new window
|
||||
sum = 0
|
||||
count = 0
|
||||
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
|
||||
windowHasPoints = false
|
||||
|
||||
continue WINDOWS
|
||||
} else {
|
||||
sum += a.Values[rowIdx]
|
||||
count++
|
||||
windowHasPoints = true
|
||||
}
|
||||
}
|
||||
|
||||
// Clear buffered timestamps & values if we make it through a cursor.
|
||||
// The break above will skip this if a cursor is partially read.
|
||||
c.tmp.Timestamps = nil
|
||||
c.tmp.Values = nil
|
||||
|
||||
// get the next chunk
|
||||
a = c.FloatArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
// write the final point
|
||||
// do not generate a point for empty windows
|
||||
if windowHasPoints {
|
||||
c.res.Timestamps[pos] = windowEnd
|
||||
c.res.Values[pos] = sum / float64(count)
|
||||
pos++
|
||||
}
|
||||
break WINDOWS
|
||||
}
|
||||
rowIdx = 0
|
||||
}
|
||||
|
||||
c.res.Timestamps = c.res.Timestamps[:pos]
|
||||
c.res.Values = c.res.Values[:pos]
|
||||
|
||||
return c.res
|
||||
}
|
||||
|
||||
type floatEmptyArrayCursor struct {
|
||||
res cursors.FloatArray
|
||||
}
|
||||
|
@ -474,6 +613,120 @@ func (c *integerIntegerCountArrayCursor) Next() *cursors.IntegerArray {
|
|||
}
|
||||
}
|
||||
|
||||
type integerWindowMeanArrayCursor struct {
|
||||
cursors.IntegerArrayCursor
|
||||
res *cursors.FloatArray
|
||||
tmp *cursors.IntegerArray
|
||||
window execute.Window
|
||||
}
|
||||
|
||||
func newIntegerWindowMeanArrayCursor(cur cursors.IntegerArrayCursor, window execute.Window) *integerWindowMeanArrayCursor {
|
||||
resLen := MaxPointsPerBlock
|
||||
if window.Every.IsZero() {
|
||||
resLen = 1
|
||||
}
|
||||
return &integerWindowMeanArrayCursor{
|
||||
IntegerArrayCursor: cur,
|
||||
res: cursors.NewFloatArrayLen(resLen),
|
||||
tmp: &cursors.IntegerArray{},
|
||||
window: window,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *integerWindowMeanArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.IntegerArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *integerWindowMeanArrayCursor) Next() *cursors.FloatArray {
|
||||
pos := 0
|
||||
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
|
||||
c.res.Values = c.res.Values[:cap(c.res.Values)]
|
||||
|
||||
var a *cursors.IntegerArray
|
||||
if c.tmp.Len() > 0 {
|
||||
a = c.tmp
|
||||
} else {
|
||||
a = c.IntegerArrayCursor.Next()
|
||||
}
|
||||
|
||||
if a.Len() == 0 {
|
||||
return &cursors.FloatArray{}
|
||||
}
|
||||
|
||||
rowIdx := 0
|
||||
var sum int64
|
||||
var count int64
|
||||
|
||||
var windowEnd int64
|
||||
if !c.window.Every.IsZero() {
|
||||
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
|
||||
} else {
|
||||
windowEnd = math.MaxInt64
|
||||
}
|
||||
windowHasPoints := false
|
||||
|
||||
// enumerate windows
|
||||
WINDOWS:
|
||||
for {
|
||||
for ; rowIdx < a.Len(); rowIdx++ {
|
||||
ts := a.Timestamps[rowIdx]
|
||||
if !c.window.Every.IsZero() && ts >= windowEnd {
|
||||
// new window detected, close the current window
|
||||
// do not generate a point for empty windows
|
||||
if windowHasPoints {
|
||||
c.res.Timestamps[pos] = windowEnd
|
||||
c.res.Values[pos] = float64(sum) / float64(count)
|
||||
pos++
|
||||
if pos >= MaxPointsPerBlock {
|
||||
// the output array is full,
|
||||
// save the remaining points in the input array in tmp.
|
||||
// they will be processed in the next call to Next()
|
||||
c.tmp.Timestamps = a.Timestamps[rowIdx:]
|
||||
c.tmp.Values = a.Values[rowIdx:]
|
||||
break WINDOWS
|
||||
}
|
||||
}
|
||||
|
||||
// start the new window
|
||||
sum = 0
|
||||
count = 0
|
||||
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
|
||||
windowHasPoints = false
|
||||
|
||||
continue WINDOWS
|
||||
} else {
|
||||
sum += a.Values[rowIdx]
|
||||
count++
|
||||
windowHasPoints = true
|
||||
}
|
||||
}
|
||||
|
||||
// Clear buffered timestamps & values if we make it through a cursor.
|
||||
// The break above will skip this if a cursor is partially read.
|
||||
c.tmp.Timestamps = nil
|
||||
c.tmp.Values = nil
|
||||
|
||||
// get the next chunk
|
||||
a = c.IntegerArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
// write the final point
|
||||
// do not generate a point for empty windows
|
||||
if windowHasPoints {
|
||||
c.res.Timestamps[pos] = windowEnd
|
||||
c.res.Values[pos] = float64(sum) / float64(count)
|
||||
pos++
|
||||
}
|
||||
break WINDOWS
|
||||
}
|
||||
rowIdx = 0
|
||||
}
|
||||
|
||||
c.res.Timestamps = c.res.Timestamps[:pos]
|
||||
c.res.Values = c.res.Values[:pos]
|
||||
|
||||
return c.res
|
||||
}
|
||||
|
||||
type integerEmptyArrayCursor struct {
|
||||
res cursors.IntegerArray
|
||||
}
|
||||
|
@ -707,6 +960,120 @@ func (c *integerUnsignedCountArrayCursor) Next() *cursors.IntegerArray {
|
|||
}
|
||||
}
|
||||
|
||||
type unsignedWindowMeanArrayCursor struct {
|
||||
cursors.UnsignedArrayCursor
|
||||
res *cursors.FloatArray
|
||||
tmp *cursors.UnsignedArray
|
||||
window execute.Window
|
||||
}
|
||||
|
||||
func newUnsignedWindowMeanArrayCursor(cur cursors.UnsignedArrayCursor, window execute.Window) *unsignedWindowMeanArrayCursor {
|
||||
resLen := MaxPointsPerBlock
|
||||
if window.Every.IsZero() {
|
||||
resLen = 1
|
||||
}
|
||||
return &unsignedWindowMeanArrayCursor{
|
||||
UnsignedArrayCursor: cur,
|
||||
res: cursors.NewFloatArrayLen(resLen),
|
||||
tmp: &cursors.UnsignedArray{},
|
||||
window: window,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *unsignedWindowMeanArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.UnsignedArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *unsignedWindowMeanArrayCursor) Next() *cursors.FloatArray {
|
||||
pos := 0
|
||||
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
|
||||
c.res.Values = c.res.Values[:cap(c.res.Values)]
|
||||
|
||||
var a *cursors.UnsignedArray
|
||||
if c.tmp.Len() > 0 {
|
||||
a = c.tmp
|
||||
} else {
|
||||
a = c.UnsignedArrayCursor.Next()
|
||||
}
|
||||
|
||||
if a.Len() == 0 {
|
||||
return &cursors.FloatArray{}
|
||||
}
|
||||
|
||||
rowIdx := 0
|
||||
var sum uint64
|
||||
var count int64
|
||||
|
||||
var windowEnd int64
|
||||
if !c.window.Every.IsZero() {
|
||||
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
|
||||
} else {
|
||||
windowEnd = math.MaxInt64
|
||||
}
|
||||
windowHasPoints := false
|
||||
|
||||
// enumerate windows
|
||||
WINDOWS:
|
||||
for {
|
||||
for ; rowIdx < a.Len(); rowIdx++ {
|
||||
ts := a.Timestamps[rowIdx]
|
||||
if !c.window.Every.IsZero() && ts >= windowEnd {
|
||||
// new window detected, close the current window
|
||||
// do not generate a point for empty windows
|
||||
if windowHasPoints {
|
||||
c.res.Timestamps[pos] = windowEnd
|
||||
c.res.Values[pos] = float64(sum) / float64(count)
|
||||
pos++
|
||||
if pos >= MaxPointsPerBlock {
|
||||
// the output array is full,
|
||||
// save the remaining points in the input array in tmp.
|
||||
// they will be processed in the next call to Next()
|
||||
c.tmp.Timestamps = a.Timestamps[rowIdx:]
|
||||
c.tmp.Values = a.Values[rowIdx:]
|
||||
break WINDOWS
|
||||
}
|
||||
}
|
||||
|
||||
// start the new window
|
||||
sum = 0
|
||||
count = 0
|
||||
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
|
||||
windowHasPoints = false
|
||||
|
||||
continue WINDOWS
|
||||
} else {
|
||||
sum += a.Values[rowIdx]
|
||||
count++
|
||||
windowHasPoints = true
|
||||
}
|
||||
}
|
||||
|
||||
// Clear buffered timestamps & values if we make it through a cursor.
|
||||
// The break above will skip this if a cursor is partially read.
|
||||
c.tmp.Timestamps = nil
|
||||
c.tmp.Values = nil
|
||||
|
||||
// get the next chunk
|
||||
a = c.UnsignedArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
// write the final point
|
||||
// do not generate a point for empty windows
|
||||
if windowHasPoints {
|
||||
c.res.Timestamps[pos] = windowEnd
|
||||
c.res.Values[pos] = float64(sum) / float64(count)
|
||||
pos++
|
||||
}
|
||||
break WINDOWS
|
||||
}
|
||||
rowIdx = 0
|
||||
}
|
||||
|
||||
c.res.Timestamps = c.res.Timestamps[:pos]
|
||||
c.res.Values = c.res.Values[:pos]
|
||||
|
||||
return c.res
|
||||
}
|
||||
|
||||
type unsignedEmptyArrayCursor struct {
|
||||
res cursors.UnsignedArray
|
||||
}
|
||||
|
@ -1103,3 +1470,26 @@ func (c *booleanEmptyArrayCursor) Err() error { return nil }
|
|||
func (c *booleanEmptyArrayCursor) Close() {}
|
||||
func (c *booleanEmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} }
|
||||
func (c *booleanEmptyArrayCursor) Next() *cursors.BooleanArray { return &c.res }
|
||||
|
||||
func arrayCursorType(cur cursors.Cursor) string {
|
||||
switch cur.(type) {
|
||||
|
||||
case cursors.FloatArrayCursor:
|
||||
return "float"
|
||||
|
||||
case cursors.IntegerArrayCursor:
|
||||
return "integer"
|
||||
|
||||
case cursors.UnsignedArrayCursor:
|
||||
return "unsigned"
|
||||
|
||||
case cursors.StringArrayCursor:
|
||||
return "string"
|
||||
|
||||
case cursors.BooleanArrayCursor:
|
||||
return "boolean"
|
||||
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,12 @@ package reads
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/values"
|
||||
"github.com/influxdata/influxdb/v2"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
)
|
||||
|
||||
|
@ -13,6 +18,24 @@ const (
|
|||
MaxPointsPerBlock = 1000
|
||||
)
|
||||
|
||||
func newWindowMeanArrayCursor(cur cursors.Cursor, window execute.Window) (cursors.Cursor, error) {
|
||||
switch cur := cur.(type) {
|
||||
{{range .}}
|
||||
{{$Type := .Name}}
|
||||
{{range .Aggs}}
|
||||
{{if eq .Name "Mean"}}
|
||||
case cursors.{{$Type}}ArrayCursor:
|
||||
return new{{$Type}}WindowMeanArrayCursor(cur, window), nil
|
||||
{{end}}
|
||||
{{end}}{{/* for each supported agg fn */}}
|
||||
{{end}}{{/* for each field type */}}
|
||||
default:
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: fmt.Sprintf("unsupported input type for mean aggregate: %s", arrayCursorType(cur)),
|
||||
}
|
||||
}
|
||||
}
|
||||
{{range .}}
|
||||
{{$arrayType := print "*cursors." .Name "Array"}}
|
||||
{{$type := print .name "ArrayFilterCursor"}}
|
||||
|
@ -249,6 +272,123 @@ func (c *integer{{.Name}}CountArrayCursor) Next() *cursors.IntegerArray {
|
|||
}
|
||||
}
|
||||
|
||||
{{/* create an aggregate cursor for each aggregate function supported by the type */}}
|
||||
{{$Name := .Name}}
|
||||
{{$name := .name}}
|
||||
{{range .Aggs}}
|
||||
{{$aggName := .Name}}
|
||||
|
||||
type {{$name}}Window{{$aggName}}ArrayCursor struct {
|
||||
cursors.{{$Name}}ArrayCursor
|
||||
res *cursors.{{.OutputTypeName}}Array
|
||||
tmp {{$arrayType}}
|
||||
window execute.Window
|
||||
}
|
||||
|
||||
func new{{$Name}}Window{{$aggName}}ArrayCursor(cur cursors.{{$Name}}ArrayCursor, window execute.Window) *{{$name}}Window{{$aggName}}ArrayCursor {
|
||||
resLen := MaxPointsPerBlock
|
||||
if window.Every.IsZero() {
|
||||
resLen = 1
|
||||
}
|
||||
return &{{$name}}Window{{$aggName}}ArrayCursor{
|
||||
{{$Name}}ArrayCursor: cur,
|
||||
res: cursors.New{{.OutputTypeName}}ArrayLen(resLen),
|
||||
tmp: &cursors.{{$Name}}Array{},
|
||||
window: window,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *{{$name}}Window{{$aggName}}ArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.{{$Name}}ArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *{{$name}}Window{{$aggName}}ArrayCursor) Next() *cursors.{{.OutputTypeName}}Array {
|
||||
pos := 0
|
||||
c.res.Timestamps = c.res.Timestamps[:cap(c.res.Timestamps)]
|
||||
c.res.Values = c.res.Values[:cap(c.res.Values)]
|
||||
|
||||
var a *cursors.{{$Name}}Array
|
||||
if c.tmp.Len() > 0 {
|
||||
a = c.tmp
|
||||
} else {
|
||||
a = c.{{$Name}}ArrayCursor.Next()
|
||||
}
|
||||
|
||||
if a.Len() == 0 {
|
||||
return &cursors.{{.OutputTypeName}}Array{}
|
||||
}
|
||||
|
||||
rowIdx := 0
|
||||
{{.AccDecls}}
|
||||
|
||||
var windowEnd int64
|
||||
if !c.window.Every.IsZero() {
|
||||
windowEnd = int64(c.window.GetEarliestBounds(values.Time(a.Timestamps[rowIdx])).Stop)
|
||||
} else {
|
||||
windowEnd = math.MaxInt64
|
||||
}
|
||||
windowHasPoints := false
|
||||
|
||||
// enumerate windows
|
||||
WINDOWS:
|
||||
for {
|
||||
for ; rowIdx < a.Len(); rowIdx++ {
|
||||
ts := a.Timestamps[rowIdx]
|
||||
if !c.window.Every.IsZero() && ts >= windowEnd {
|
||||
// new window detected, close the current window
|
||||
// do not generate a point for empty windows
|
||||
if windowHasPoints {
|
||||
{{.AccEmit}}
|
||||
pos++
|
||||
if pos >= MaxPointsPerBlock {
|
||||
// the output array is full,
|
||||
// save the remaining points in the input array in tmp.
|
||||
// they will be processed in the next call to Next()
|
||||
c.tmp.Timestamps = a.Timestamps[rowIdx:]
|
||||
c.tmp.Values = a.Values[rowIdx:]
|
||||
break WINDOWS
|
||||
}
|
||||
}
|
||||
|
||||
// start the new window
|
||||
{{.AccReset}}
|
||||
windowEnd = int64(c.window.GetEarliestBounds(values.Time(ts)).Stop)
|
||||
windowHasPoints = false
|
||||
|
||||
continue WINDOWS
|
||||
} else {
|
||||
{{.Accumulate}}
|
||||
windowHasPoints = true
|
||||
}
|
||||
}
|
||||
|
||||
// Clear buffered timestamps & values if we make it through a cursor.
|
||||
// The break above will skip this if a cursor is partially read.
|
||||
c.tmp.Timestamps = nil
|
||||
c.tmp.Values = nil
|
||||
|
||||
// get the next chunk
|
||||
a = c.{{$Name}}ArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
// write the final point
|
||||
// do not generate a point for empty windows
|
||||
if windowHasPoints {
|
||||
{{.AccEmit}}
|
||||
pos++
|
||||
}
|
||||
break WINDOWS
|
||||
}
|
||||
rowIdx = 0
|
||||
}
|
||||
|
||||
c.res.Timestamps = c.res.Timestamps[:pos]
|
||||
c.res.Values = c.res.Values[:pos]
|
||||
|
||||
return c.res
|
||||
}
|
||||
|
||||
{{end}}{{/* range .Aggs */}}
|
||||
|
||||
type {{.name}}EmptyArrayCursor struct {
|
||||
res cursors.{{.Name}}Array
|
||||
}
|
||||
|
@ -260,4 +400,15 @@ func (c *{{.name}}EmptyArrayCursor) Close() {}
|
|||
func (c *{{.name}}EmptyArrayCursor) Stats() cursors.CursorStats { return cursors.CursorStats{} }
|
||||
func (c *{{.name}}EmptyArrayCursor) Next() {{$arrayType}} { return &c.res }
|
||||
|
||||
{{end}}
|
||||
{{end}}{{/* range . */}}
|
||||
|
||||
func arrayCursorType(cur cursors.Cursor) string {
|
||||
switch cur.(type) {
|
||||
{{range .}}
|
||||
case cursors.{{.Name}}ArrayCursor:
|
||||
return "{{.name}}"
|
||||
{{end}}{{/* range . */}}
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,17 @@
|
|||
"Type":"float64",
|
||||
"ValueType":"FloatValue",
|
||||
"Nil":"0",
|
||||
"Agg":true
|
||||
"Agg":true,
|
||||
"Aggs": [
|
||||
{
|
||||
"Name":"Mean",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var sum float64; var count int64",
|
||||
"Accumulate":"sum += a.Values[rowIdx]; count++",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = sum / float64(count)",
|
||||
"AccReset":"sum = 0; count = 0"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name":"Integer",
|
||||
|
@ -13,7 +23,17 @@
|
|||
"Type":"int64",
|
||||
"ValueType":"IntegerValue",
|
||||
"Nil":"0",
|
||||
"Agg":true
|
||||
"Agg":true,
|
||||
"Aggs": [
|
||||
{
|
||||
"Name":"Mean",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var sum int64; var count int64",
|
||||
"Accumulate":"sum += a.Values[rowIdx]; count++",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)",
|
||||
"AccReset":"sum = 0; count = 0"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name":"Unsigned",
|
||||
|
@ -21,7 +41,17 @@
|
|||
"Type":"uint64",
|
||||
"ValueType":"UnsignedValue",
|
||||
"Nil":"0",
|
||||
"Agg":true
|
||||
"Agg":true,
|
||||
"Aggs": [
|
||||
{
|
||||
"Name":"Mean",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var sum uint64; var count int64",
|
||||
"Accumulate":"sum += a.Values[rowIdx]; count++",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)",
|
||||
"AccReset":"sum = 0; count = 0"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name":"String",
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
)
|
||||
|
@ -16,16 +17,21 @@ func (v *singleValue) Value(key string) (interface{}, bool) {
|
|||
return v.v, true
|
||||
}
|
||||
|
||||
func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor {
|
||||
if cursor == nil {
|
||||
return nil
|
||||
}
|
||||
func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error) {
|
||||
return newWindowAggregateArrayCursor(ctx, agg, execute.Window{}, cursor)
|
||||
}
|
||||
|
||||
func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, window execute.Window, cursor cursors.Cursor) (cursors.Cursor, error) {
|
||||
if cursor == nil {
|
||||
return nil, nil
|
||||
}
|
||||
switch agg.Type {
|
||||
case datatypes.AggregateTypeSum:
|
||||
return newSumArrayCursor(cursor)
|
||||
return newSumArrayCursor(cursor), nil
|
||||
case datatypes.AggregateTypeCount:
|
||||
return newCountArrayCursor(cursor)
|
||||
return newCountArrayCursor(cursor), nil
|
||||
case datatypes.AggregateTypeMean:
|
||||
return newWindowMeanArrayCursor(cursor, window)
|
||||
default:
|
||||
// TODO(sgc): should be validated higher up
|
||||
panic("invalid aggregate")
|
||||
|
@ -158,6 +164,6 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *multiShardArrayCursors) newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor {
|
||||
func (m *multiShardArrayCursors) newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error) {
|
||||
return newAggregateArrayCursor(ctx, agg, cursor)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,274 @@
|
|||
// Generated by tmpl
|
||||
// https://github.com/benbjohnson/tmpl
|
||||
//
|
||||
// DO NOT EDIT!
|
||||
// Source: array_cursor_test.gen.go.tmpl
|
||||
|
||||
package reads
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/values"
|
||||
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
)
|
||||
|
||||
type MockFloatArrayCursor struct {
|
||||
CloseFunc func()
|
||||
ErrFunc func() error
|
||||
StatsFunc func() cursors.CursorStats
|
||||
NextFunc func() *cursors.FloatArray
|
||||
}
|
||||
|
||||
func (c *MockFloatArrayCursor) Close() { c.CloseFunc() }
|
||||
func (c *MockFloatArrayCursor) Err() error { return c.ErrFunc() }
|
||||
func (c *MockFloatArrayCursor) Stats() cursors.CursorStats { return c.StatsFunc() }
|
||||
func (c *MockFloatArrayCursor) Next() *cursors.FloatArray { return c.NextFunc() }
|
||||
|
||||
func TestNewAggregateArrayCursor_Float(t *testing.T) {
|
||||
t.Run("Mean", func(t *testing.T) {
|
||||
want := &floatWindowMeanArrayCursor{
|
||||
FloatArrayCursor: &MockFloatArrayCursor{},
|
||||
res: cursors.NewFloatArrayLen(1),
|
||||
tmp: &cursors.FloatArray{},
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateTypeMean,
|
||||
}
|
||||
|
||||
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockFloatArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursorMonths_Float(t *testing.T) {
|
||||
t.Run("Mean", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(int64(time.Hour), 0, false),
|
||||
Period: values.MakeDuration(int64(time.Hour), 0, false),
|
||||
}
|
||||
|
||||
want := &floatWindowMeanArrayCursor{
|
||||
FloatArrayCursor: &MockFloatArrayCursor{},
|
||||
res: cursors.NewFloatArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.FloatArray{},
|
||||
window: window,
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateTypeMean,
|
||||
}
|
||||
|
||||
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursor_Float(t *testing.T) {
|
||||
t.Run("Mean", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(0, 1, false),
|
||||
Period: values.MakeDuration(0, 1, false),
|
||||
}
|
||||
|
||||
want := &floatWindowMeanArrayCursor{
|
||||
FloatArrayCursor: &MockFloatArrayCursor{},
|
||||
res: cursors.NewFloatArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.FloatArray{},
|
||||
window: window,
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateTypeMean,
|
||||
}
|
||||
|
||||
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockFloatArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported(floatWindowMeanArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
type MockIntegerArrayCursor struct {
|
||||
CloseFunc func()
|
||||
ErrFunc func() error
|
||||
StatsFunc func() cursors.CursorStats
|
||||
NextFunc func() *cursors.IntegerArray
|
||||
}
|
||||
|
||||
func (c *MockIntegerArrayCursor) Close() { c.CloseFunc() }
|
||||
func (c *MockIntegerArrayCursor) Err() error { return c.ErrFunc() }
|
||||
func (c *MockIntegerArrayCursor) Stats() cursors.CursorStats { return c.StatsFunc() }
|
||||
func (c *MockIntegerArrayCursor) Next() *cursors.IntegerArray { return c.NextFunc() }
|
||||
|
||||
func TestNewAggregateArrayCursor_Integer(t *testing.T) {
|
||||
t.Run("Mean", func(t *testing.T) {
|
||||
want := &integerWindowMeanArrayCursor{
|
||||
IntegerArrayCursor: &MockIntegerArrayCursor{},
|
||||
res: cursors.NewFloatArrayLen(1),
|
||||
tmp: &cursors.IntegerArray{},
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateTypeMean,
|
||||
}
|
||||
|
||||
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockIntegerArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursorMonths_Integer(t *testing.T) {
|
||||
t.Run("Mean", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(int64(time.Hour), 0, false),
|
||||
Period: values.MakeDuration(int64(time.Hour), 0, false),
|
||||
}
|
||||
|
||||
want := &integerWindowMeanArrayCursor{
|
||||
IntegerArrayCursor: &MockIntegerArrayCursor{},
|
||||
res: cursors.NewFloatArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.IntegerArray{},
|
||||
window: window,
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateTypeMean,
|
||||
}
|
||||
|
||||
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursor_Integer(t *testing.T) {
|
||||
t.Run("Mean", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(0, 1, false),
|
||||
Period: values.MakeDuration(0, 1, false),
|
||||
}
|
||||
|
||||
want := &integerWindowMeanArrayCursor{
|
||||
IntegerArrayCursor: &MockIntegerArrayCursor{},
|
||||
res: cursors.NewFloatArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.IntegerArray{},
|
||||
window: window,
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateTypeMean,
|
||||
}
|
||||
|
||||
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockIntegerArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported(integerWindowMeanArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
type MockUnsignedArrayCursor struct {
|
||||
CloseFunc func()
|
||||
ErrFunc func() error
|
||||
StatsFunc func() cursors.CursorStats
|
||||
NextFunc func() *cursors.UnsignedArray
|
||||
}
|
||||
|
||||
func (c *MockUnsignedArrayCursor) Close() { c.CloseFunc() }
|
||||
func (c *MockUnsignedArrayCursor) Err() error { return c.ErrFunc() }
|
||||
func (c *MockUnsignedArrayCursor) Stats() cursors.CursorStats { return c.StatsFunc() }
|
||||
func (c *MockUnsignedArrayCursor) Next() *cursors.UnsignedArray { return c.NextFunc() }
|
||||
|
||||
func TestNewAggregateArrayCursor_Unsigned(t *testing.T) {
|
||||
t.Run("Mean", func(t *testing.T) {
|
||||
want := &unsignedWindowMeanArrayCursor{
|
||||
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
|
||||
res: cursors.NewFloatArrayLen(1),
|
||||
tmp: &cursors.UnsignedArray{},
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateTypeMean,
|
||||
}
|
||||
|
||||
got, _ := newAggregateArrayCursor(context.Background(), agg, &MockUnsignedArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursorMonths_Unsigned(t *testing.T) {
|
||||
t.Run("Mean", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(int64(time.Hour), 0, false),
|
||||
Period: values.MakeDuration(int64(time.Hour), 0, false),
|
||||
}
|
||||
|
||||
want := &unsignedWindowMeanArrayCursor{
|
||||
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
|
||||
res: cursors.NewFloatArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.UnsignedArray{},
|
||||
window: window,
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateTypeMean,
|
||||
}
|
||||
|
||||
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursor_Unsigned(t *testing.T) {
|
||||
t.Run("Mean", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(0, 1, false),
|
||||
Period: values.MakeDuration(0, 1, false),
|
||||
}
|
||||
|
||||
want := &unsignedWindowMeanArrayCursor{
|
||||
UnsignedArrayCursor: &MockUnsignedArrayCursor{},
|
||||
res: cursors.NewFloatArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.UnsignedArray{},
|
||||
window: window,
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateTypeMean,
|
||||
}
|
||||
|
||||
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &MockUnsignedArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported(unsignedWindowMeanArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
package reads
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/values"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
|
||||
)
|
||||
|
||||
{{range .}}
|
||||
{{$ColType := .Name}}
|
||||
{{$colType := .name}}
|
||||
|
||||
{{range .Aggs}}
|
||||
{{if eq .Name "Mean"}}
|
||||
{{$Agg := .Name}}
|
||||
|
||||
type Mock{{$ColType}}ArrayCursor struct {
|
||||
CloseFunc func()
|
||||
ErrFunc func() error
|
||||
StatsFunc func() cursors.CursorStats
|
||||
NextFunc func() *cursors.{{$ColType}}Array
|
||||
}
|
||||
|
||||
func (c *Mock{{$ColType}}ArrayCursor) Close() { c.CloseFunc() }
|
||||
func (c *Mock{{$ColType}}ArrayCursor) Err() error { return c.ErrFunc() }
|
||||
func (c *Mock{{$ColType}}ArrayCursor) Stats() cursors.CursorStats { return c.StatsFunc() }
|
||||
func (c *Mock{{$ColType}}ArrayCursor) Next() *cursors.{{$ColType}}Array { return c.NextFunc() }
|
||||
|
||||
func TestNewAggregateArrayCursor_{{$ColType}}(t *testing.T) {
|
||||
t.Run("{{$Agg}}", func(t *testing.T) {
|
||||
want := &{{$colType}}Window{{$Agg}}ArrayCursor{
|
||||
{{$ColType}}ArrayCursor: &Mock{{$ColType}}ArrayCursor{},
|
||||
res: cursors.New{{.OutputTypeName}}ArrayLen(1),
|
||||
tmp: &cursors.{{$ColType}}Array{},
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateType{{$Agg}},
|
||||
}
|
||||
|
||||
got, _ := newAggregateArrayCursor(context.Background(), agg, &Mock{{$ColType}}ArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursorMonths_{{$ColType}}(t *testing.T) {
|
||||
t.Run("{{$Agg}}", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(int64(time.Hour), 0, false),
|
||||
Period: values.MakeDuration(int64(time.Hour), 0, false),
|
||||
}
|
||||
|
||||
want := &{{$colType}}Window{{$Agg}}ArrayCursor{
|
||||
{{$ColType}}ArrayCursor: &Mock{{$ColType}}ArrayCursor{},
|
||||
res: cursors.New{{.OutputTypeName}}ArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.{{$ColType}}Array{},
|
||||
window: window,
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateType{{$Agg}},
|
||||
}
|
||||
|
||||
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &Mock{{$ColType}}ArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursor_{{$ColType}}(t *testing.T) {
|
||||
t.Run("{{$Agg}}", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(0, 1, false),
|
||||
Period: values.MakeDuration(0, 1, false),
|
||||
}
|
||||
|
||||
want := &{{$colType}}Window{{$Agg}}ArrayCursor{
|
||||
{{$ColType}}ArrayCursor: &Mock{{$ColType}}ArrayCursor{},
|
||||
res: cursors.New{{.OutputTypeName}}ArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.{{$ColType}}Array{},
|
||||
window: window,
|
||||
}
|
||||
|
||||
agg := &datatypes.Aggregate{
|
||||
Type: datatypes.AggregateType{{$Agg}},
|
||||
}
|
||||
|
||||
got, _ := newWindowAggregateArrayCursor(context.Background(), agg, window, &Mock{{$ColType}}ArrayCursor{})
|
||||
|
||||
if diff := cmp.Diff(got, want, cmp.AllowUnexported({{$colType}}Window{{$Agg}}ArrayCursor{})); diff != "" {
|
||||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
}
|
||||
{{end}}
|
||||
{{end}}
|
||||
{{end}}{{/* range over each supported field type */}}
|
||||
|
|
@ -1,11 +1,17 @@
|
|||
package reads
|
||||
|
||||
import (
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/values"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
||||
)
|
||||
|
||||
|
||||
func TestIntegerFilterArrayCursor(t *testing.T) {
|
||||
var i int
|
||||
expr := MockExpression{
|
||||
|
@ -39,17 +45,216 @@ func TestIntegerFilterArrayCursor(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type MockIntegerArrayCursor struct {
|
||||
CloseFunc func()
|
||||
ErrFunc func() error
|
||||
StatsFunc func() cursors.CursorStats
|
||||
NextFunc func() *cursors.IntegerArray
|
||||
func makeIntegerArray(n int, tsStart time.Time, tsStep time.Duration, valueFn func(i int64) int64) *cursors.IntegerArray {
|
||||
ia := &cursors.IntegerArray{
|
||||
Timestamps: make([]int64, n),
|
||||
Values: make([]int64, n),
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
ia.Timestamps[i] = tsStart.UnixNano() + int64(i)*int64(tsStep)
|
||||
ia.Values[i] = valueFn(int64(i))
|
||||
}
|
||||
|
||||
return ia
|
||||
}
|
||||
|
||||
func (c *MockIntegerArrayCursor) Close() { c.CloseFunc() }
|
||||
func (c *MockIntegerArrayCursor) Err() error { return c.ErrFunc() }
|
||||
func (c *MockIntegerArrayCursor) Stats() cursors.CursorStats { return c.StatsFunc() }
|
||||
func (c *MockIntegerArrayCursor) Next() *cursors.IntegerArray { return c.NextFunc() }
|
||||
func makeFloatArray(n int, tsStart time.Time, tsStep time.Duration, valueFn func(i int64) float64) *cursors.FloatArray {
|
||||
fa := &cursors.FloatArray{
|
||||
Timestamps: make([]int64, n),
|
||||
Values: make([]float64, n),
|
||||
}
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
fa.Timestamps[i] = tsStart.UnixNano() + int64(i)*int64(tsStep)
|
||||
fa.Values[i] = valueFn(int64(i))
|
||||
}
|
||||
|
||||
return fa
|
||||
}
|
||||
|
||||
func mustParseTime(ts string) time.Time {
|
||||
t, err := time.Parse(time.RFC3339, ts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
func copyIntegerArray(src *cursors.IntegerArray) *cursors.IntegerArray {
|
||||
dst := cursors.NewIntegerArrayLen(src.Len())
|
||||
copy(dst.Timestamps, src.Timestamps)
|
||||
copy(dst.Values, src.Values)
|
||||
return dst
|
||||
}
|
||||
|
||||
func copyFloatArray(src *cursors.FloatArray) *cursors.FloatArray {
|
||||
dst := cursors.NewFloatArrayLen(src.Len())
|
||||
copy(dst.Timestamps, src.Timestamps)
|
||||
copy(dst.Values, src.Values)
|
||||
return dst
|
||||
}
|
||||
|
||||
type aggArrayCursorTest struct {
|
||||
name string
|
||||
createCursorFn func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor
|
||||
every time.Duration
|
||||
offset time.Duration
|
||||
inputArrays []*cursors.IntegerArray
|
||||
wantIntegers []*cursors.IntegerArray
|
||||
wantFloats []*cursors.FloatArray
|
||||
window execute.Window
|
||||
}
|
||||
|
||||
func (a *aggArrayCursorTest) run(t *testing.T) {
|
||||
t.Helper()
|
||||
t.Run(a.name, func(t *testing.T) {
|
||||
var resultN int
|
||||
mc := &MockIntegerArrayCursor{
|
||||
CloseFunc: func() {},
|
||||
ErrFunc: func() error { return nil },
|
||||
StatsFunc: func() cursors.CursorStats { return cursors.CursorStats{} },
|
||||
NextFunc: func() *cursors.IntegerArray {
|
||||
if resultN < len(a.inputArrays) {
|
||||
a := a.inputArrays[resultN]
|
||||
resultN++
|
||||
return a
|
||||
}
|
||||
return &cursors.IntegerArray{}
|
||||
},
|
||||
}
|
||||
c := a.createCursorFn(mc, int64(a.every), int64(a.offset), a.window)
|
||||
switch cursor := c.(type) {
|
||||
case cursors.IntegerArrayCursor:
|
||||
got := make([]*cursors.IntegerArray, 0, len(a.wantIntegers))
|
||||
for a := cursor.Next(); a.Len() != 0; a = cursor.Next() {
|
||||
got = append(got, copyIntegerArray(a))
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(got, a.wantIntegers); diff != "" {
|
||||
t.Fatalf("did not get expected result from count array cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
case cursors.FloatArrayCursor:
|
||||
got := make([]*cursors.FloatArray, 0, len(a.wantFloats))
|
||||
for a := cursor.Next(); a.Len() != 0; a = cursor.Next() {
|
||||
got = append(got, copyFloatArray(a))
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(got, a.wantFloats); diff != "" {
|
||||
t.Fatalf("did not get expected result from count array cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
default:
|
||||
t.Fatalf("unsupported cursor type: %T", cursor)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestWindowMeanArrayCursor(t *testing.T) {
|
||||
maxTimestamp := time.Unix(0, math.MaxInt64)
|
||||
|
||||
testcases := []aggArrayCursorTest{
|
||||
{
|
||||
name: "no window",
|
||||
every: 0,
|
||||
inputArrays: []*cursors.IntegerArray{
|
||||
makeIntegerArray(
|
||||
5,
|
||||
mustParseTime("2010-01-01T00:00:00Z"), time.Minute,
|
||||
func(i int64) int64 { return i + 1 },
|
||||
),
|
||||
},
|
||||
wantFloats: []*cursors.FloatArray{
|
||||
makeFloatArray(1, maxTimestamp, 0, func(int64) float64 { return 3.0 }),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no window fraction result",
|
||||
every: 0,
|
||||
inputArrays: []*cursors.IntegerArray{
|
||||
makeIntegerArray(
|
||||
6,
|
||||
mustParseTime("2010-01-01T00:00:00Z"), time.Minute,
|
||||
func(i int64) int64 { return i + 1 },
|
||||
),
|
||||
},
|
||||
wantFloats: []*cursors.FloatArray{
|
||||
makeFloatArray(1, maxTimestamp, 0, func(int64) float64 { return 3.5 }),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no window empty",
|
||||
every: 0,
|
||||
inputArrays: []*cursors.IntegerArray{},
|
||||
wantFloats: []*cursors.FloatArray{},
|
||||
},
|
||||
{
|
||||
name: "window",
|
||||
every: 30 * time.Minute,
|
||||
inputArrays: []*cursors.IntegerArray{
|
||||
makeIntegerArray(
|
||||
8,
|
||||
mustParseTime("2010-01-01T00:00:00Z"), 15*time.Minute,
|
||||
func(i int64) int64 {
|
||||
return i
|
||||
},
|
||||
),
|
||||
},
|
||||
wantFloats: []*cursors.FloatArray{
|
||||
makeFloatArray(4, mustParseTime("2010-01-01T00:30:00Z"), 30*time.Minute,
|
||||
func(i int64) float64 { return 0.5 + float64(i)*2 }),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "window offset",
|
||||
every: 30 * time.Minute,
|
||||
offset: 5 * time.Minute,
|
||||
inputArrays: []*cursors.IntegerArray{
|
||||
makeIntegerArray(
|
||||
8,
|
||||
mustParseTime("2010-01-01T00:00:00Z"), 15*time.Minute,
|
||||
func(i int64) int64 {
|
||||
return i
|
||||
},
|
||||
),
|
||||
},
|
||||
wantFloats: []*cursors.FloatArray{
|
||||
makeFloatArray(5, mustParseTime("2010-01-01T00:05:00Z"), 30*time.Minute,
|
||||
func(i int64) float64 { return []float64{0, 1.5, 3.5, 5.5, 7}[i] }),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty window",
|
||||
every: 15 * time.Minute,
|
||||
inputArrays: []*cursors.IntegerArray{
|
||||
makeIntegerArray(
|
||||
2,
|
||||
mustParseTime("2010-01-01T00:05:00Z"), 30*time.Minute,
|
||||
func(i int64) int64 {
|
||||
return 100 + i
|
||||
},
|
||||
),
|
||||
},
|
||||
wantFloats: []*cursors.FloatArray{
|
||||
makeFloatArray(2, mustParseTime("2010-01-01T00:15:00Z"), 30*time.Minute,
|
||||
func(i int64) float64 { return 100 + float64(i) }),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range testcases {
|
||||
tc.createCursorFn = func(cur cursors.IntegerArrayCursor, every, offset int64, window execute.Window) cursors.Cursor {
|
||||
if every != 0 || offset != 0 {
|
||||
everyDur := values.MakeDuration(every, 0, false)
|
||||
offsetDur := values.MakeDuration(offset, 0, false)
|
||||
window = execute.Window{
|
||||
Every: everyDur,
|
||||
Offset: offsetDur,
|
||||
}
|
||||
}
|
||||
return newIntegerWindowMeanArrayCursor(cur, window)
|
||||
}
|
||||
tc.run(t)
|
||||
}
|
||||
}
|
||||
|
||||
type MockExpression struct {
|
||||
EvalBoolFunc func(v Valuer) bool
|
||||
|
|
|
@ -6,12 +6,11 @@ package datatypes
|
|||
import (
|
||||
encoding_binary "encoding/binary"
|
||||
fmt "fmt"
|
||||
_ "github.com/gogo/protobuf/gogoproto"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
io "io"
|
||||
math "math"
|
||||
math_bits "math/bits"
|
||||
|
||||
_ "github.com/gogo/protobuf/gogoproto"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
|
|
|
@ -6,13 +6,12 @@ package datatypes
|
|||
import (
|
||||
encoding_binary "encoding/binary"
|
||||
fmt "fmt"
|
||||
io "io"
|
||||
math "math"
|
||||
math_bits "math/bits"
|
||||
|
||||
_ "github.com/gogo/protobuf/gogoproto"
|
||||
proto "github.com/gogo/protobuf/proto"
|
||||
types "github.com/gogo/protobuf/types"
|
||||
io "io"
|
||||
math "math"
|
||||
math_bits "math/bits"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
package reads
|
||||
|
||||
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata array_cursor.gen.go.tmpl
|
||||
//go:generate env GO111MODULE=on go run github.com/benbjohnson/tmpl -data=@array_cursor.gen.go.tmpldata -o=array_cursor_gen_test.go array_cursor_test.gen.go.tmpl
|
||||
|
|
|
@ -294,7 +294,7 @@ func (c *groupNoneCursor) Next() bool {
|
|||
func (c *groupNoneCursor) Cursor() cursors.Cursor {
|
||||
cur := c.arrayCursors.createCursor(c.row)
|
||||
if c.agg != nil {
|
||||
cur = c.arrayCursors.newAggregateCursor(c.ctx, c.agg, cur)
|
||||
cur, _ = c.arrayCursors.newAggregateCursor(c.ctx, c.agg, cur)
|
||||
}
|
||||
return cur
|
||||
}
|
||||
|
@ -335,7 +335,7 @@ func (c *groupByCursor) Next() bool {
|
|||
func (c *groupByCursor) Cursor() cursors.Cursor {
|
||||
cur := c.arrayCursors.createCursor(*c.seriesRows[c.i-1])
|
||||
if c.agg != nil {
|
||||
cur = c.arrayCursors.newAggregateCursor(c.ctx, c.agg, cur)
|
||||
cur, _ = c.arrayCursors.newAggregateCursor(c.ctx, c.agg, cur)
|
||||
}
|
||||
return cur
|
||||
}
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
type multiShardCursors interface {
|
||||
createCursor(row SeriesRow) cursors.Cursor
|
||||
newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) cursors.Cursor
|
||||
newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error)
|
||||
}
|
||||
|
||||
type resultSet struct {
|
||||
|
@ -60,7 +60,7 @@ func (r *resultSet) Next() bool {
|
|||
func (r *resultSet) Cursor() cursors.Cursor {
|
||||
cur := r.arrayCursors.createCursor(r.seriesRow)
|
||||
if r.agg != nil {
|
||||
cur = r.arrayCursors.newAggregateCursor(r.ctx, r.agg, cur)
|
||||
cur, _ = r.arrayCursors.newAggregateCursor(r.ctx, r.agg, cur)
|
||||
}
|
||||
return cur
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
[
|
||||
{
|
||||
"Name":"Float",
|
||||
"name":"float",
|
||||
"Type":"float64"
|
||||
},
|
||||
{
|
||||
"Name":"Integer",
|
||||
"name":"integer",
|
||||
"Type":"int64"
|
||||
},
|
||||
{
|
||||
"Name":"Unsigned",
|
||||
"name":"unsigned",
|
||||
"Type":"uint64"
|
||||
},
|
||||
{
|
||||
"Name":"String",
|
||||
"name":"string",
|
||||
"Type":"string"
|
||||
},
|
||||
{
|
||||
"Name":"Boolean",
|
||||
"name":"boolean",
|
||||
"Type":"bool"
|
||||
}
|
||||
]
|
Loading…
Reference in New Issue