feat: aggregate array cursors
parent
3b7cb529dc
commit
b7ac9f07be
|
@ -18,7 +18,7 @@ type windowAggregateResultSet struct {
|
|||
req *datatypes.ReadWindowAggregateRequest
|
||||
seriesCursor SeriesCursor
|
||||
seriesRow SeriesRow
|
||||
arrayCursors *arrayCursors
|
||||
arrayCursors multiShardCursors
|
||||
cursor cursors.Cursor
|
||||
err error
|
||||
}
|
||||
|
@ -57,7 +57,7 @@ func NewWindowAggregateResultSet(ctx context.Context, req *datatypes.ReadWindowA
|
|||
ctx: ctx,
|
||||
req: req,
|
||||
seriesCursor: cursor,
|
||||
arrayCursors: newArrayCursors(ctx, req.Range.Start, req.Range.End, ascending),
|
||||
arrayCursors: newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, ascending),
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -18,6 +18,107 @@ const (
|
|||
MaxPointsPerBlock = 1000
|
||||
)
|
||||
|
||||
func newLimitArrayCursor(cur cursors.Cursor) cursors.Cursor {
|
||||
switch cur := cur.(type) {
|
||||
{{range .}}{{/* every type supports limit */}}
|
||||
case cursors.{{.Name}}ArrayCursor:
|
||||
return new{{.Name}}LimitArrayCursor(cur)
|
||||
{{end}}
|
||||
default:
|
||||
panic(fmt.Sprintf("unreachable: %T", cur))
|
||||
}
|
||||
}
|
||||
|
||||
func newWindowFirstArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
|
||||
if window.Every.IsZero() {
|
||||
return newLimitArrayCursor(cur)
|
||||
}
|
||||
switch cur := cur.(type) {
|
||||
{{range .}}{{/* every type supports first */}}
|
||||
case cursors.{{.Name}}ArrayCursor:
|
||||
return new{{.Name}}WindowFirstArrayCursor(cur, window)
|
||||
{{end}}
|
||||
default:
|
||||
panic(fmt.Sprintf("unreachable: %T", cur))
|
||||
}
|
||||
}
|
||||
|
||||
func newWindowLastArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
|
||||
if window.Every.IsZero() {
|
||||
return newLimitArrayCursor(cur)
|
||||
}
|
||||
switch cur := cur.(type) {
|
||||
{{range .}}{{/* every type supports last */}}
|
||||
case cursors.{{.Name}}ArrayCursor:
|
||||
return new{{.Name}}WindowLastArrayCursor(cur, window)
|
||||
{{end}}
|
||||
default:
|
||||
panic(fmt.Sprintf("unreachable: %T", cur))
|
||||
}
|
||||
}
|
||||
|
||||
func newWindowCountArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
|
||||
switch cur := cur.(type) {
|
||||
{{range .}}{{/* every type supports count */}}
|
||||
case cursors.{{.Name}}ArrayCursor:
|
||||
return new{{.Name}}WindowCountArrayCursor(cur, window)
|
||||
{{end}}
|
||||
default:
|
||||
panic(fmt.Sprintf("unreachable: %T", cur))
|
||||
}
|
||||
}
|
||||
|
||||
func newWindowSumArrayCursor(cur cursors.Cursor, window execute.Window) (cursors.Cursor, error) {
|
||||
switch cur := cur.(type) {
|
||||
{{range .}}
|
||||
{{$Type := .Name}}
|
||||
{{range .Aggs}}
|
||||
{{if eq .Name "Sum"}}
|
||||
case cursors.{{$Type}}ArrayCursor:
|
||||
return new{{$Type}}WindowSumArrayCursor(cur, window), nil
|
||||
{{end}}
|
||||
{{end}}{{/* for each supported agg fn */}}
|
||||
{{end}}{{/* for each field type */}}
|
||||
default:
|
||||
return nil, &influxdb.Error{
|
||||
Code: influxdb.EInvalid,
|
||||
Msg: fmt.Sprintf("unsupported input type for sum aggregate: %s", arrayCursorType(cur)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newWindowMinArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
|
||||
switch cur := cur.(type) {
|
||||
{{range .}}
|
||||
{{$Type := .Name}}
|
||||
{{range .Aggs}}
|
||||
{{if eq .Name "Min"}}
|
||||
case cursors.{{$Type}}ArrayCursor:
|
||||
return new{{$Type}}WindowMinArrayCursor(cur, window)
|
||||
{{end}}
|
||||
{{end}}{{/* for each supported agg fn */}}
|
||||
{{end}}{{/* for each field type */}}
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported for aggregate min: %T", cur))
|
||||
}
|
||||
}
|
||||
|
||||
func newWindowMaxArrayCursor(cur cursors.Cursor, window execute.Window) cursors.Cursor {
|
||||
switch cur := cur.(type) {
|
||||
{{range .}}
|
||||
{{$Type := .Name}}
|
||||
{{range .Aggs}}
|
||||
{{if eq .Name "Max"}}
|
||||
case cursors.{{$Type}}ArrayCursor:
|
||||
return new{{$Type}}WindowMaxArrayCursor(cur, window)
|
||||
{{end}}
|
||||
{{end}}{{/* for each supported agg fn */}}
|
||||
{{end}}{{/* for each field type */}}
|
||||
default:
|
||||
panic(fmt.Sprintf("unsupported for aggregate max: %T", cur))
|
||||
}
|
||||
}
|
||||
|
||||
func newWindowMeanArrayCursor(cur cursors.Cursor, window execute.Window) (cursors.Cursor, error) {
|
||||
switch cur := cur.(type) {
|
||||
{{range .}}
|
||||
|
@ -112,82 +213,6 @@ LOOP:
|
|||
return c.res
|
||||
}
|
||||
|
||||
type {{.name}}ArrayCursor struct {
|
||||
cursors.{{.Name}}ArrayCursor
|
||||
cursorContext
|
||||
filter *{{$type}}
|
||||
}
|
||||
|
||||
|
||||
func (c *{{.name}}ArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor, cursorIterators cursors.CursorIterators, cond expression) {
|
||||
if cond != nil {
|
||||
if c.filter == nil {
|
||||
c.filter = new{{.Name}}FilterArrayCursor(cond)
|
||||
}
|
||||
c.filter.reset(cur)
|
||||
cur = c.filter
|
||||
}
|
||||
|
||||
c.{{.Name}}ArrayCursor = cur
|
||||
c.itrs = cursorIterators
|
||||
c.err = nil
|
||||
}
|
||||
|
||||
|
||||
func (c *{{.name}}ArrayCursor) Err() error { return c.err }
|
||||
|
||||
func (c *{{.name}}ArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.{{.Name}}ArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *{{.name}}ArrayCursor) Next() {{$arrayType}} {
|
||||
for {
|
||||
a := c.{{.Name}}ArrayCursor.Next()
|
||||
if a.Len() == 0 {
|
||||
if c.nextArrayCursor() {
|
||||
continue
|
||||
}
|
||||
}
|
||||
return a
|
||||
}
|
||||
}
|
||||
|
||||
func (c *{{.name}}ArrayCursor) nextArrayCursor() bool {
|
||||
if len(c.itrs) < 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
c.{{.Name}}ArrayCursor.Close()
|
||||
|
||||
var itr cursors.CursorIterator
|
||||
var cur cursors.Cursor
|
||||
itr, c.itrs = c.itrs[0], c.itrs[1:]
|
||||
cur, _ = itr.Next(c.ctx, c.req)
|
||||
|
||||
itr = nil
|
||||
|
||||
var ok bool
|
||||
if cur != nil {
|
||||
var next cursors.{{.Name}}ArrayCursor
|
||||
next, ok = cur.(cursors.{{.Name}}ArrayCursor)
|
||||
if !ok {
|
||||
cur.Close()
|
||||
next = {{.Name}}EmptyArrayCursor
|
||||
itr = nil
|
||||
c.err = errors.New("expected {{.name}} cursor")
|
||||
} else {
|
||||
if c.filter != nil {
|
||||
c.filter.reset(next)
|
||||
next = c.filter
|
||||
}
|
||||
}
|
||||
c.{{.Name}}ArrayCursor = next
|
||||
} else {
|
||||
c.{{.Name}}ArrayCursor = {{.Name}}EmptyArrayCursor
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
type {{.name}}MultiShardArrayCursor struct {
|
||||
cursors.{{.Name}}ArrayCursor
|
||||
cursorContext
|
||||
|
@ -206,7 +231,6 @@ func (c *{{.name}}MultiShardArrayCursor) reset(cur cursors.{{.Name}}ArrayCursor,
|
|||
c.{{.Name}}ArrayCursor = cur
|
||||
c.itrs = itrs
|
||||
c.err = nil
|
||||
c.count = 0
|
||||
}
|
||||
|
||||
|
||||
|
@ -224,14 +248,6 @@ func (c *{{.name}}MultiShardArrayCursor) Next() {{$arrayType}} {
|
|||
continue
|
||||
}
|
||||
}
|
||||
c.count += int64(a.Len())
|
||||
if c.count > c.limit {
|
||||
diff := c.count - c.limit
|
||||
c.count -= diff
|
||||
rem := int64(a.Len()) - diff
|
||||
a.Timestamps = a.Timestamps[:rem]
|
||||
a.Values = a.Values[:rem]
|
||||
}
|
||||
return a
|
||||
}
|
||||
}
|
||||
|
@ -270,79 +286,162 @@ func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
{{if .Agg}}
|
||||
{{$type := print .name "ArraySumCursor"}}
|
||||
{{$Type := print .Name "ArraySumCursor"}}
|
||||
|
||||
|
||||
type {{$type}} struct {
|
||||
type {{.name}}LimitArrayCursor struct {
|
||||
cursors.{{.Name}}ArrayCursor
|
||||
ts [1]int64
|
||||
vs [1]{{.Type}}
|
||||
res {{$arrayType}}
|
||||
done bool
|
||||
}
|
||||
|
||||
func new{{$Type}}(cur cursors.{{.Name}}ArrayCursor) *{{$type}} {
|
||||
return &{{$type}}{
|
||||
func new{{.Name}}LimitArrayCursor(cur cursors.{{.Name}}ArrayCursor) *{{.name}}LimitArrayCursor {
|
||||
return &{{.name}}LimitArrayCursor{
|
||||
{{.Name}}ArrayCursor: cur,
|
||||
res: &cursors.{{.Name}}Array{},
|
||||
res: cursors.New{{.Name}}ArrayLen(1),
|
||||
}
|
||||
}
|
||||
|
||||
func (c {{$type}}) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() }
|
||||
func (c *{{.name}}LimitArrayCursor) Stats() cursors.CursorStats { return c.{{.Name}}ArrayCursor.Stats() }
|
||||
|
||||
func (c {{$type}}) Next() {{$arrayType}} {
|
||||
func (c *{{.name}}LimitArrayCursor) Next() {{$arrayType}} {
|
||||
if c.done {
|
||||
return &cursors.{{.Name}}Array{}
|
||||
}
|
||||
a := c.{{.Name}}ArrayCursor.Next()
|
||||
if len(a.Timestamps) == 0 {
|
||||
return a
|
||||
}
|
||||
c.done = true
|
||||
c.res.Timestamps[0] = a.Timestamps[0]
|
||||
c.res.Values[0] = a.Values[0]
|
||||
return c.res
|
||||
}
|
||||
|
||||
ts := a.Timestamps[0]
|
||||
var acc {{.Type}}
|
||||
type {{.name}}WindowLastArrayCursor struct {
|
||||
cursors.{{.Name}}ArrayCursor
|
||||
windowEnd int64
|
||||
res {{$arrayType}}
|
||||
tmp {{$arrayType}}
|
||||
window execute.Window
|
||||
}
|
||||
|
||||
for {
|
||||
for _, v := range a.Values {
|
||||
acc += v
|
||||
}
|
||||
a = c.{{.Name}}ArrayCursor.Next()
|
||||
if len(a.Timestamps) == 0 {
|
||||
c.ts[0] = ts
|
||||
c.vs[0] = acc
|
||||
c.res.Timestamps = c.ts[:]
|
||||
c.res.Values = c.vs[:]
|
||||
return c.res
|
||||
}
|
||||
// Window array cursors assume that every != 0 && every != MaxInt64.
|
||||
// Such a cursor will panic in the first case and possibly overflow in the second.
|
||||
func new{{.Name}}WindowLastArrayCursor(cur cursors.{{.Name}}ArrayCursor, window execute.Window) *{{.name}}WindowLastArrayCursor {
|
||||
return &{{.name}}WindowLastArrayCursor{
|
||||
{{.Name}}ArrayCursor: cur,
|
||||
windowEnd: math.MinInt64,
|
||||
res: cursors.New{{.Name}}ArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.{{.Name}}Array{},
|
||||
window: window,
|
||||
}
|
||||
}
|
||||
|
||||
{{end}}
|
||||
|
||||
type {{.Name}}CountArrayCursor struct {
|
||||
cursors.{{.Name}}ArrayCursor
|
||||
}
|
||||
|
||||
func (c *{{.Name}}CountArrayCursor) Stats() cursors.CursorStats {
|
||||
func (c *{{.name}}WindowLastArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.{{.Name}}ArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *{{.Name}}CountArrayCursor) Next() *cursors.IntegerArray {
|
||||
a := c.{{.Name}}ArrayCursor.Next()
|
||||
if len(a.Timestamps) == 0 {
|
||||
return &cursors.IntegerArray{}
|
||||
func (c *{{.name}}WindowLastArrayCursor) Next() *cursors.{{.Name}}Array {
|
||||
cur := -1
|
||||
|
||||
NEXT:
|
||||
var a *cursors.{{.Name}}Array
|
||||
|
||||
if c.tmp.Len() > 0 {
|
||||
a = c.tmp
|
||||
} else {
|
||||
a = c.{{.Name}}ArrayCursor.Next()
|
||||
}
|
||||
|
||||
ts := a.Timestamps[0]
|
||||
var acc int64
|
||||
for {
|
||||
acc += int64(len(a.Timestamps))
|
||||
if a.Len() == 0 {
|
||||
c.res.Timestamps = c.res.Timestamps[:cur+1]
|
||||
c.res.Values = c.res.Values[:cur+1]
|
||||
return c.res
|
||||
}
|
||||
|
||||
for i, t := range a.Timestamps {
|
||||
if t >= c.windowEnd {
|
||||
cur++
|
||||
}
|
||||
|
||||
if cur == MaxPointsPerBlock {
|
||||
c.tmp.Timestamps = a.Timestamps[i:]
|
||||
c.tmp.Values = a.Values[i:]
|
||||
return c.res
|
||||
}
|
||||
|
||||
c.res.Timestamps[cur] = t
|
||||
c.res.Values[cur] = a.Values[i]
|
||||
|
||||
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
|
||||
}
|
||||
|
||||
c.tmp.Timestamps = nil
|
||||
c.tmp.Values = nil
|
||||
|
||||
goto NEXT
|
||||
}
|
||||
|
||||
type {{.name}}WindowFirstArrayCursor struct {
|
||||
cursors.{{.Name}}ArrayCursor
|
||||
windowEnd int64
|
||||
res {{$arrayType}}
|
||||
tmp {{$arrayType}}
|
||||
window execute.Window
|
||||
}
|
||||
|
||||
// Window array cursors assume that every != 0 && every != MaxInt64.
|
||||
// Such a cursor will panic in the first case and possibly overflow in the second.
|
||||
func new{{.Name}}WindowFirstArrayCursor(cur cursors.{{.Name}}ArrayCursor, window execute.Window) *{{.name}}WindowFirstArrayCursor {
|
||||
return &{{.name}}WindowFirstArrayCursor{
|
||||
{{.Name}}ArrayCursor: cur,
|
||||
windowEnd: math.MinInt64,
|
||||
res: cursors.New{{.Name}}ArrayLen(MaxPointsPerBlock),
|
||||
tmp: &cursors.{{.Name}}Array{},
|
||||
window: window,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *{{.name}}WindowFirstArrayCursor) Stats() cursors.CursorStats {
|
||||
return c.{{.Name}}ArrayCursor.Stats()
|
||||
}
|
||||
|
||||
func (c *{{.name}}WindowFirstArrayCursor) Next() *cursors.{{.Name}}Array {
|
||||
c.res.Timestamps = c.res.Timestamps[:0]
|
||||
c.res.Values = c.res.Values[:0]
|
||||
|
||||
NEXT:
|
||||
var a *cursors.{{.Name}}Array
|
||||
|
||||
if c.tmp.Len() > 0 {
|
||||
a = c.tmp
|
||||
} else {
|
||||
a = c.{{.Name}}ArrayCursor.Next()
|
||||
if len(a.Timestamps) == 0 {
|
||||
res := cursors.NewIntegerArrayLen(1)
|
||||
res.Timestamps[0] = ts
|
||||
res.Values[0] = acc
|
||||
return res
|
||||
}
|
||||
|
||||
if a.Len() == 0 {
|
||||
return c.res
|
||||
}
|
||||
|
||||
for i, t := range a.Timestamps {
|
||||
if t < c.windowEnd {
|
||||
continue
|
||||
}
|
||||
|
||||
c.windowEnd = int64(c.window.GetEarliestBounds(values.Time(t)).Stop)
|
||||
|
||||
c.res.Timestamps = append(c.res.Timestamps, t)
|
||||
c.res.Values = append(c.res.Values, a.Values[i])
|
||||
|
||||
if c.res.Len() == MaxPointsPerBlock {
|
||||
c.tmp.Timestamps = a.Timestamps[i+1:]
|
||||
c.tmp.Values = a.Values[i+1:]
|
||||
return c.res
|
||||
}
|
||||
}
|
||||
|
||||
c.tmp.Timestamps = nil
|
||||
c.tmp.Values = nil
|
||||
|
||||
goto NEXT
|
||||
}
|
||||
|
||||
{{/* create an aggregate cursor for each aggregate function supported by the type */}}
|
||||
|
|
|
@ -3,68 +3,171 @@
|
|||
"Name":"Float",
|
||||
"name":"float",
|
||||
"Type":"float64",
|
||||
"ValueType":"FloatValue",
|
||||
"Nil":"0",
|
||||
"Agg":true,
|
||||
"Aggs": [
|
||||
{
|
||||
"Name":"Mean",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var sum float64; var count int64",
|
||||
"Accumulate":"sum += a.Values[rowIdx]; count++",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = sum / float64(count)",
|
||||
"AccReset":"sum = 0; count = 0"
|
||||
}
|
||||
{
|
||||
"Name":"Count",
|
||||
"OutputTypeName":"Integer",
|
||||
"AccDecls":"var acc int64 = 0",
|
||||
"Accumulate":"acc++",
|
||||
"AccEmit": "c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = 0"
|
||||
},
|
||||
{
|
||||
"Name":"Sum",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var acc float64 = 0",
|
||||
"Accumulate":"acc += a.Values[rowIdx]",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = 0"
|
||||
},
|
||||
{
|
||||
"Name":"Min",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var acc float64 = math.MaxFloat64; var tsAcc int64",
|
||||
"Accumulate":"if !windowHasPoints || a.Values[rowIdx] < acc { acc = a.Values[rowIdx]; tsAcc = a.Timestamps[rowIdx] }",
|
||||
"AccEmit":"c.res.Timestamps[pos] = tsAcc; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = math.MaxFloat64"
|
||||
},
|
||||
{
|
||||
"Name":"Max",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var acc float64 = -math.MaxFloat64; var tsAcc int64",
|
||||
"Accumulate":"if !windowHasPoints || a.Values[rowIdx] > acc { acc = a.Values[rowIdx]; tsAcc = a.Timestamps[rowIdx] }",
|
||||
"AccEmit":"c.res.Timestamps[pos] = tsAcc; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = -math.MaxFloat64"
|
||||
},
|
||||
{
|
||||
"Name":"Mean",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var sum float64; var count int64",
|
||||
"Accumulate":"sum += a.Values[rowIdx]; count++",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = sum / float64(count)",
|
||||
"AccReset":"sum = 0; count = 0"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name":"Integer",
|
||||
"name":"integer",
|
||||
"Type":"int64",
|
||||
"ValueType":"IntegerValue",
|
||||
"Nil":"0",
|
||||
"Agg":true,
|
||||
"Aggs": [
|
||||
{
|
||||
"Name":"Mean",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var sum int64; var count int64",
|
||||
"Accumulate":"sum += a.Values[rowIdx]; count++",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)",
|
||||
"AccReset":"sum = 0; count = 0"
|
||||
}
|
||||
{
|
||||
"Name":"Count",
|
||||
"OutputTypeName":"Integer",
|
||||
"AccDecls":"var acc int64 = 0",
|
||||
"Accumulate":"acc++",
|
||||
"AccEmit": "c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = 0"
|
||||
},
|
||||
{
|
||||
"Name":"Sum",
|
||||
"OutputTypeName":"Integer",
|
||||
"AccDecls":"var acc int64 = 0",
|
||||
"Accumulate":"acc += a.Values[rowIdx]",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = 0"
|
||||
},
|
||||
{
|
||||
"Name":"Min",
|
||||
"OutputTypeName":"Integer",
|
||||
"AccDecls":"var acc int64 = math.MaxInt64; var tsAcc int64",
|
||||
"Accumulate":"if !windowHasPoints || a.Values[rowIdx] < acc { acc = a.Values[rowIdx]; tsAcc = a.Timestamps[rowIdx] }",
|
||||
"AccEmit":"c.res.Timestamps[pos] = tsAcc; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = math.MaxInt64"
|
||||
},
|
||||
{
|
||||
"Name":"Max",
|
||||
"OutputTypeName":"Integer",
|
||||
"AccDecls":"var acc int64 = math.MinInt64; var tsAcc int64",
|
||||
"Accumulate":"if !windowHasPoints || a.Values[rowIdx] > acc { acc = a.Values[rowIdx]; tsAcc = a.Timestamps[rowIdx] }",
|
||||
"AccEmit":"c.res.Timestamps[pos] = tsAcc; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = math.MinInt64"
|
||||
},
|
||||
{
|
||||
"Name":"Mean",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var sum int64; var count int64",
|
||||
"Accumulate":"sum += a.Values[rowIdx]; count++",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)",
|
||||
"AccReset":"sum = 0; count = 0"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name":"Unsigned",
|
||||
"name":"unsigned",
|
||||
"Type":"uint64",
|
||||
"ValueType":"UnsignedValue",
|
||||
"Nil":"0",
|
||||
"Agg":true,
|
||||
"Aggs": [
|
||||
{
|
||||
"Name":"Mean",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var sum uint64; var count int64",
|
||||
"Accumulate":"sum += a.Values[rowIdx]; count++",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)",
|
||||
"AccReset":"sum = 0; count = 0"
|
||||
}
|
||||
{
|
||||
"Name":"Count",
|
||||
"OutputTypeName":"Integer",
|
||||
"AccDecls":"var acc int64 = 0",
|
||||
"Accumulate":"acc++",
|
||||
"AccEmit": "c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = 0"
|
||||
},
|
||||
{
|
||||
"Name":"Sum",
|
||||
"OutputTypeName":"Unsigned",
|
||||
"AccDecls":"var acc uint64 = 0",
|
||||
"Accumulate":"acc += a.Values[rowIdx]",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = 0"
|
||||
},
|
||||
{
|
||||
"Name":"Min",
|
||||
"OutputTypeName":"Unsigned",
|
||||
"AccDecls":"var acc uint64 = math.MaxUint64; var tsAcc int64",
|
||||
"Accumulate":"if !windowHasPoints || a.Values[rowIdx] < acc { acc = a.Values[rowIdx]; tsAcc = a.Timestamps[rowIdx] }",
|
||||
"AccEmit":"c.res.Timestamps[pos] = tsAcc; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = math.MaxUint64"
|
||||
},
|
||||
{
|
||||
"Name":"Max",
|
||||
"OutputTypeName":"Unsigned",
|
||||
"AccDecls":"var acc uint64 = 0; var tsAcc int64",
|
||||
"Accumulate":"if !windowHasPoints || a.Values[rowIdx] > acc { acc = a.Values[rowIdx]; tsAcc = a.Timestamps[rowIdx] }",
|
||||
"AccEmit":"c.res.Timestamps[pos] = tsAcc; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = 0"
|
||||
},
|
||||
{
|
||||
"Name":"Mean",
|
||||
"OutputTypeName":"Float",
|
||||
"AccDecls":"var sum uint64; var count int64",
|
||||
"Accumulate":"sum += a.Values[rowIdx]; count++",
|
||||
"AccEmit":"c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = float64(sum) / float64(count)",
|
||||
"AccReset":"sum = 0; count = 0"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name":"String",
|
||||
"name":"string",
|
||||
"Type":"string",
|
||||
"ValueType":"StringValue",
|
||||
"Nil":"\"\""
|
||||
"Aggs": [
|
||||
{
|
||||
"Name":"Count",
|
||||
"OutputTypeName":"Integer",
|
||||
"AccDecls":"var acc int64 = 0",
|
||||
"Accumulate":"acc++",
|
||||
"AccEmit": "c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = 0"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"Name":"Boolean",
|
||||
"name":"boolean",
|
||||
"Type":"bool",
|
||||
"ValueType":"BooleanValue",
|
||||
"Nil":"false"
|
||||
"Aggs": [
|
||||
{
|
||||
"Name":"Count",
|
||||
"OutputTypeName":"Integer",
|
||||
"AccDecls":"var acc int64 = 0",
|
||||
"Accumulate":"acc++",
|
||||
"AccEmit": "c.res.Timestamps[pos] = windowEnd; c.res.Values[pos] = acc",
|
||||
"AccReset":"acc = 0"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
|
|
|
@ -18,6 +18,10 @@ func (v *singleValue) Value(key string) (interface{}, bool) {
|
|||
}
|
||||
|
||||
func newAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error) {
|
||||
switch agg.Type {
|
||||
case datatypes.AggregateTypeFirst, datatypes.AggregateTypeLast:
|
||||
return newLimitArrayCursor(cursor), nil
|
||||
}
|
||||
return newWindowAggregateArrayCursor(ctx, agg, execute.Window{}, cursor)
|
||||
}
|
||||
|
||||
|
@ -25,11 +29,20 @@ func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate
|
|||
if cursor == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch agg.Type {
|
||||
case datatypes.AggregateTypeSum:
|
||||
return newSumArrayCursor(cursor), nil
|
||||
case datatypes.AggregateTypeCount:
|
||||
return newCountArrayCursor(cursor), nil
|
||||
return newWindowCountArrayCursor(cursor, window), nil
|
||||
case datatypes.AggregateTypeSum:
|
||||
return newWindowSumArrayCursor(cursor, window)
|
||||
case datatypes.AggregateTypeFirst:
|
||||
return newWindowFirstArrayCursor(cursor, window), nil
|
||||
case datatypes.AggregateTypeLast:
|
||||
return newWindowLastArrayCursor(cursor, window), nil
|
||||
case datatypes.AggregateTypeMin:
|
||||
return newWindowMinArrayCursor(cursor, window), nil
|
||||
case datatypes.AggregateTypeMax:
|
||||
return newWindowMaxArrayCursor(cursor, window), nil
|
||||
case datatypes.AggregateTypeMean:
|
||||
return newWindowMeanArrayCursor(cursor, window)
|
||||
default:
|
||||
|
@ -38,21 +51,28 @@ func newWindowAggregateArrayCursor(ctx context.Context, agg *datatypes.Aggregate
|
|||
}
|
||||
}
|
||||
|
||||
type arrayCursors struct {
|
||||
type cursorContext struct {
|
||||
ctx context.Context
|
||||
req *cursors.CursorRequest
|
||||
itrs cursors.CursorIterators
|
||||
err error
|
||||
}
|
||||
|
||||
type multiShardArrayCursors struct {
|
||||
ctx context.Context
|
||||
req cursors.CursorRequest
|
||||
|
||||
cursors struct {
|
||||
i integerArrayCursor
|
||||
f floatArrayCursor
|
||||
u unsignedArrayCursor
|
||||
b booleanArrayCursor
|
||||
s stringArrayCursor
|
||||
i integerMultiShardArrayCursor
|
||||
f floatMultiShardArrayCursor
|
||||
u unsignedMultiShardArrayCursor
|
||||
b booleanMultiShardArrayCursor
|
||||
s stringMultiShardArrayCursor
|
||||
}
|
||||
}
|
||||
|
||||
func newArrayCursors(ctx context.Context, start, end int64, asc bool) *arrayCursors {
|
||||
m := &arrayCursors{
|
||||
func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool) *multiShardArrayCursors {
|
||||
m := &multiShardArrayCursors{
|
||||
ctx: ctx,
|
||||
req: cursors.CursorRequest{
|
||||
Ascending: asc,
|
||||
|
@ -75,134 +95,6 @@ func newArrayCursors(ctx context.Context, start, end int64, asc bool) *arrayCurs
|
|||
return m
|
||||
}
|
||||
|
||||
func (m *arrayCursors) createCursor(seriesRow SeriesRow) cursors.Cursor {
|
||||
m.req.Name = seriesRow.Name
|
||||
m.req.Tags = seriesRow.SeriesTags
|
||||
m.req.Field = seriesRow.Field
|
||||
|
||||
var cond expression
|
||||
if seriesRow.ValueCond != nil {
|
||||
cond = &astExpr{seriesRow.ValueCond}
|
||||
}
|
||||
|
||||
if seriesRow.Query == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var itr cursors.CursorIterator
|
||||
var cur cursors.Cursor
|
||||
for cur == nil && len(seriesRow.Query) > 0 {
|
||||
itr, seriesRow.Query = seriesRow.Query[0], seriesRow.Query[1:]
|
||||
cur, _ = itr.Next(m.ctx, &m.req)
|
||||
|
||||
switch c := cur.(type) {
|
||||
case cursors.IntegerArrayCursor:
|
||||
m.cursors.i.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.i
|
||||
case cursors.FloatArrayCursor:
|
||||
m.cursors.f.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.f
|
||||
case cursors.UnsignedArrayCursor:
|
||||
m.cursors.u.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.u
|
||||
case cursors.StringArrayCursor:
|
||||
m.cursors.s.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.s
|
||||
case cursors.BooleanArrayCursor:
|
||||
m.cursors.b.reset(c, seriesRow.Query, cond)
|
||||
return &m.cursors.b
|
||||
default:
|
||||
panic(fmt.Sprintf("unreachable: %T", cur))
|
||||
}
|
||||
}
|
||||
// if cursor is nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func newSumArrayCursor(cur cursors.Cursor) cursors.Cursor {
|
||||
switch cur := cur.(type) {
|
||||
case cursors.FloatArrayCursor:
|
||||
return newFloatArraySumCursor(cur)
|
||||
case cursors.IntegerArrayCursor:
|
||||
return newIntegerArraySumCursor(cur)
|
||||
case cursors.UnsignedArrayCursor:
|
||||
return newUnsignedArraySumCursor(cur)
|
||||
default:
|
||||
// TODO(sgc): propagate an error instead?
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func newCountArrayCursor(cur cursors.Cursor) cursors.Cursor {
|
||||
switch cur := cur.(type) {
|
||||
case cursors.FloatArrayCursor:
|
||||
return &FloatCountArrayCursor{FloatArrayCursor: cur}
|
||||
case cursors.IntegerArrayCursor:
|
||||
return &IntegerCountArrayCursor{IntegerArrayCursor: cur}
|
||||
case cursors.UnsignedArrayCursor:
|
||||
return &UnsignedCountArrayCursor{UnsignedArrayCursor: cur}
|
||||
case cursors.StringArrayCursor:
|
||||
return &StringCountArrayCursor{StringArrayCursor: cur}
|
||||
case cursors.BooleanArrayCursor:
|
||||
return &BooleanCountArrayCursor{BooleanArrayCursor: cur}
|
||||
default:
|
||||
panic(fmt.Sprintf("unreachable: %T", cur))
|
||||
}
|
||||
}
|
||||
|
||||
type cursorContext struct {
|
||||
ctx context.Context
|
||||
req *cursors.CursorRequest
|
||||
itrs cursors.CursorIterators
|
||||
limit int64
|
||||
count int64
|
||||
err error
|
||||
}
|
||||
|
||||
type multiShardArrayCursors struct {
|
||||
ctx context.Context
|
||||
limit int64
|
||||
req cursors.CursorRequest
|
||||
|
||||
cursors struct {
|
||||
i integerMultiShardArrayCursor
|
||||
f floatMultiShardArrayCursor
|
||||
u unsignedMultiShardArrayCursor
|
||||
b booleanMultiShardArrayCursor
|
||||
s stringMultiShardArrayCursor
|
||||
}
|
||||
}
|
||||
|
||||
func newMultiShardArrayCursors(ctx context.Context, start, end int64, asc bool, limit int64) *multiShardArrayCursors {
|
||||
if limit < 0 {
|
||||
limit = 1
|
||||
}
|
||||
|
||||
m := &multiShardArrayCursors{
|
||||
ctx: ctx,
|
||||
limit: limit,
|
||||
req: cursors.CursorRequest{
|
||||
Ascending: asc,
|
||||
StartTime: start,
|
||||
EndTime: end,
|
||||
},
|
||||
}
|
||||
|
||||
cc := cursorContext{
|
||||
ctx: ctx,
|
||||
limit: limit,
|
||||
req: &m.req,
|
||||
}
|
||||
|
||||
m.cursors.i.cursorContext = cc
|
||||
m.cursors.f.cursorContext = cc
|
||||
m.cursors.u.cursorContext = cc
|
||||
m.cursors.b.cursorContext = cc
|
||||
m.cursors.s.cursorContext = cc
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
|
||||
m.req.Name = row.Name
|
||||
m.req.Tags = row.SeriesTags
|
||||
|
@ -244,7 +136,3 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {
|
|||
panic(fmt.Sprintf("unreachable: %T", cur))
|
||||
}
|
||||
}
|
||||
|
||||
func (m *multiShardArrayCursors) newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error) {
|
||||
return newAggregateArrayCursor(ctx, agg, cursor)
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -16,10 +16,6 @@ import (
|
|||
{{$ColType := .Name}}
|
||||
{{$colType := .name}}
|
||||
|
||||
{{range .Aggs}}
|
||||
{{if eq .Name "Mean"}}
|
||||
{{$Agg := .Name}}
|
||||
|
||||
type Mock{{$ColType}}ArrayCursor struct {
|
||||
CloseFunc func()
|
||||
ErrFunc func() error
|
||||
|
@ -33,6 +29,8 @@ func (c *Mock{{$ColType}}ArrayCursor) Stats() cursors.CursorStats { return c.St
|
|||
func (c *Mock{{$ColType}}ArrayCursor) Next() *cursors.{{$ColType}}Array { return c.NextFunc() }
|
||||
|
||||
func TestNewAggregateArrayCursor_{{$ColType}}(t *testing.T) {
|
||||
{{range .Aggs}}
|
||||
{{$Agg := .Name}}
|
||||
t.Run("{{$Agg}}", func(t *testing.T) {
|
||||
want := &{{$colType}}Window{{$Agg}}ArrayCursor{
|
||||
{{$ColType}}ArrayCursor: &Mock{{$ColType}}ArrayCursor{},
|
||||
|
@ -50,9 +48,12 @@ func TestNewAggregateArrayCursor_{{$ColType}}(t *testing.T) {
|
|||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
{{end}}
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursorMonths_{{$ColType}}(t *testing.T) {
|
||||
{{range .Aggs}}
|
||||
{{$Agg := .Name}}
|
||||
t.Run("{{$Agg}}", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(int64(time.Hour), 0, false),
|
||||
|
@ -76,9 +77,12 @@ func TestNewWindowAggregateArrayCursorMonths_{{$ColType}}(t *testing.T) {
|
|||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
{{end}}
|
||||
}
|
||||
|
||||
func TestNewWindowAggregateArrayCursor_{{$ColType}}(t *testing.T) {
|
||||
{{range .Aggs}}
|
||||
{{$Agg := .Name}}
|
||||
t.Run("{{$Agg}}", func(t *testing.T) {
|
||||
window := execute.Window{
|
||||
Every: values.MakeDuration(0, 1, false),
|
||||
|
@ -102,8 +106,6 @@ func TestNewWindowAggregateArrayCursor_{{$ColType}}(t *testing.T) {
|
|||
t.Fatalf("did not get expected cursor; -got/+want:\n%v", diff)
|
||||
}
|
||||
})
|
||||
{{end}}
|
||||
}
|
||||
{{end}}
|
||||
{{end}}
|
||||
{{end}}{{/* range over each supported field type */}}
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
|
@ -55,7 +54,7 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, new
|
|||
o(g)
|
||||
}
|
||||
|
||||
g.arrayCursors = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true, math.MaxInt64)
|
||||
g.arrayCursors = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true)
|
||||
|
||||
for i, k := range req.GroupKeys {
|
||||
g.keys[i] = []byte(k)
|
||||
|
@ -267,6 +266,8 @@ type groupNoneCursor struct {
|
|||
cur SeriesCursor
|
||||
row SeriesRow
|
||||
keys [][]byte
|
||||
cursor cursors.Cursor
|
||||
err error
|
||||
}
|
||||
|
||||
func (c *groupNoneCursor) Err() error { return nil }
|
||||
|
@ -288,15 +289,20 @@ func (c *groupNoneCursor) Next() bool {
|
|||
|
||||
c.row = *row
|
||||
|
||||
return true
|
||||
c.cursor, c.err = c.createCursor(c.row)
|
||||
return c.err == nil
|
||||
}
|
||||
|
||||
func (c *groupNoneCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
|
||||
cur = c.arrayCursors.createCursor(c.row)
|
||||
if c.agg != nil {
|
||||
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur)
|
||||
}
|
||||
return cur, err
|
||||
}
|
||||
|
||||
func (c *groupNoneCursor) Cursor() cursors.Cursor {
|
||||
cur := c.arrayCursors.createCursor(c.row)
|
||||
if c.agg != nil {
|
||||
cur, _ = c.arrayCursors.newAggregateCursor(c.ctx, c.agg, cur)
|
||||
}
|
||||
return cur
|
||||
return c.cursor
|
||||
}
|
||||
|
||||
type groupByCursor struct {
|
||||
|
@ -307,6 +313,8 @@ type groupByCursor struct {
|
|||
seriesRows []*SeriesRow
|
||||
keys [][]byte
|
||||
vals [][]byte
|
||||
cursor cursors.Cursor
|
||||
err error
|
||||
}
|
||||
|
||||
func (c *groupByCursor) reset(seriesRows []*SeriesRow) {
|
||||
|
@ -327,17 +335,22 @@ func (c *groupByCursor) Aggregate() *datatypes.Aggregate {
|
|||
func (c *groupByCursor) Next() bool {
|
||||
if c.i < len(c.seriesRows) {
|
||||
c.i++
|
||||
return true
|
||||
c.cursor, c.err = c.createCursor(*c.seriesRows[c.i-1])
|
||||
return c.err == nil
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *groupByCursor) Cursor() cursors.Cursor {
|
||||
cur := c.arrayCursors.createCursor(*c.seriesRows[c.i-1])
|
||||
func (c *groupByCursor) createCursor(seriesRow SeriesRow) (cur cursors.Cursor, err error) {
|
||||
cur = c.arrayCursors.createCursor(seriesRow)
|
||||
if c.agg != nil {
|
||||
cur, _ = c.arrayCursors.newAggregateCursor(c.ctx, c.agg, cur)
|
||||
cur, err = newAggregateArrayCursor(c.ctx, c.agg, cur)
|
||||
}
|
||||
return cur
|
||||
return cur, err
|
||||
}
|
||||
|
||||
func (c *groupByCursor) Cursor() cursors.Cursor {
|
||||
return c.cursor
|
||||
}
|
||||
|
||||
func (c *groupByCursor) Stats() cursors.CursorStats {
|
||||
|
|
|
@ -2,7 +2,6 @@ package reads
|
|||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/influxdata/influxdb/v2/models"
|
||||
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
|
||||
|
@ -11,12 +10,10 @@ import (
|
|||
|
||||
type multiShardCursors interface {
|
||||
createCursor(row SeriesRow) cursors.Cursor
|
||||
newAggregateCursor(ctx context.Context, agg *datatypes.Aggregate, cursor cursors.Cursor) (cursors.Cursor, error)
|
||||
}
|
||||
|
||||
type resultSet struct {
|
||||
ctx context.Context
|
||||
agg *datatypes.Aggregate
|
||||
seriesCursor SeriesCursor
|
||||
seriesRow SeriesRow
|
||||
arrayCursors multiShardCursors
|
||||
|
@ -26,7 +23,7 @@ func NewFilteredResultSet(ctx context.Context, req *datatypes.ReadFilterRequest,
|
|||
return &resultSet{
|
||||
ctx: ctx,
|
||||
seriesCursor: seriesCursor,
|
||||
arrayCursors: newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true, math.MaxInt64),
|
||||
arrayCursors: newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,11 +55,7 @@ func (r *resultSet) Next() bool {
|
|||
}
|
||||
|
||||
func (r *resultSet) Cursor() cursors.Cursor {
|
||||
cur := r.arrayCursors.createCursor(r.seriesRow)
|
||||
if r.agg != nil {
|
||||
cur, _ = r.arrayCursors.newAggregateCursor(r.ctx, r.agg, cur)
|
||||
}
|
||||
return cur
|
||||
return r.arrayCursors.createCursor(r.seriesRow)
|
||||
}
|
||||
|
||||
func (r *resultSet) Tags() models.Tags {
|
||||
|
|
Loading…
Reference in New Issue