influxdb/storage/flux/table.gen.go.tmpl

package storageflux
import (
"fmt"
"math"
"sync"
"github.com/apache/arrow/go/arrow/array"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/models"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
storage "github.com/influxdata/influxdb/v2/storage/reads"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
{{range .}}
//
// *********** {{.Name}} ***********
//
type {{.name}}Table struct {
table
mu sync.Mutex
cur cursors.{{.Name}}ArrayCursor
alloc *memory.Allocator
}
func new{{.Name}}Table(
done chan struct{},
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
) *{{.name}}Table {
t := &{{.name}}Table{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
cur: cur,
}
t.readTags(tags)
t.init(t.advance)
return t
}
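// Close releases the underlying array cursor, if one is still open.
// It is safe to call Close more than once.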
func (t *{{.name}}Table) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
t.mu.Unlock()
}
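// Statistics reports the number of values and bytes scanned by the
// underlying cursor. It returns zero statistics once the cursor has
// been closed.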
func (t *{{.name}}Table) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return cursors.CursorStats{}
}
cs := cur.Stats()
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
}
func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
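// advance reads the next block of values from the array cursor into a
// fresh column buffer. It returns false once the cursor is exhausted.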
func (t *{{.name}}Table) advance() bool {
a := t.cur.Next()
l := a.Len()
if l == 0 {
return false
}
// Retrieve the buffer for the data to avoid allocating
// additional slices. If the buffer is still being used
// because the references were retained, then we will
// allocate a new buffer.
cr := t.allocateBuffer(l)
cr.cols[timeColIdx] = arrow.NewInt(a.Timestamps, t.alloc)
cr.cols[valueColIdx] = t.toArrowBuffer(a.Values)
t.appendTags(cr)
t.appendBounds(cr)
return true
}
// window table
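// {{.name}}WindowTable splits the values read from the cursor into
// fixed windows and, when createEmpty is set, also produces rows for
// windows that contain no points.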
type {{.name}}WindowTable struct {
{{.name}}Table
arr *cursors.{{.Name}}Array
nextTS int64
idxInArr int
createEmpty bool
timeColumn string
window execute.Window
{{if eq .Name "Integer"}}fillValue *{{.Type}}{{end}}
}
func new{{.Name}}WindowTable(
done chan struct{},
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
window execute.Window,
createEmpty bool,
timeColumn string,
{{if eq .Name "Integer"}}fillValue *{{.Type}},{{end}}
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
) *{{.name}}WindowTable {
t := &{{.name}}WindowTable{
{{.name}}Table: {{.name}}Table{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
cur: cur,
},
window: window,
createEmpty: createEmpty,
timeColumn: timeColumn,
{{if eq .Name "Integer"}}fillValue: fillValue,{{end}}
}
if t.createEmpty {
start := int64(bounds.Start)
t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop)
}
t.readTags(tags)
t.init(t.advance)
return t
}
func (t *{{.name}}WindowTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
// createNextBufferTimes will read the timestamps from the array
// cursor and construct the values for the next buffer.
func (t *{{.name}}WindowTable) createNextBufferTimes() (start, stop *array.Int64, ok bool) {
startB := arrow.NewIntBuilder(t.alloc)
stopB := arrow.NewIntBuilder(t.alloc)
if t.createEmpty {
// There are no more windows when the start time is greater
// than or equal to the stop time.
subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive())
if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) {
return nil, nil, false
}
// Size the builders to the maximum block size.
// TODO(jsternberg): Calculate the exact size with max points as the maximum.
startB.Resize(storage.MaxPointsPerBlock)
stopB.Resize(storage.MaxPointsPerBlock)
for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) {
startT, stopT := t.getWindowBoundsFor(t.nextTS)
if startT >= int64(t.bounds.Stop) {
break
}
startB.Append(startT)
stopB.Append(stopT)
}
start = startB.NewInt64Array()
stop = stopB.NewInt64Array()
return start, stop, true
}
// Retrieve the next buffer so we can copy the timestamps.
if !t.nextBuffer() {
return nil, nil, false
}
// Copy over the timestamps from the next buffer and adjust
// times for the boundaries.
startB.Resize(len(t.arr.Timestamps))
stopB.Resize(len(t.arr.Timestamps))
for _, stopT := range t.arr.Timestamps {
startT, stopT := t.getWindowBoundsFor(stopT)
startB.Append(startT)
stopB.Append(stopT)
}
start = startB.NewInt64Array()
stop = stopB.NewInt64Array()
return start, stop, true
}
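// getWindowBoundsFor returns the start and stop times of the window
// that ends at ts, clamped to the bounds of the table.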
func (t *{{.name}}WindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) {
subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive())
startT, stopT = int64(values.Time(ts).Add(subEvery)), ts
if startT < int64(t.bounds.Start) {
startT = int64(t.bounds.Start)
}
if stopT > int64(t.bounds.Stop) {
stopT = int64(t.bounds.Stop)
}
return startT, stopT
}
// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
func (t *{{.name}}WindowTable) nextAt(ts int64) (v {{.Type}}, ok bool) {
if !t.nextBuffer() {
return
} else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) {
return
}
v, ok = t.arr.Values[t.idxInArr], true
t.idxInArr++
return v, ok
}
// isInWindow checks whether the given stop time can be used within
// the window whose stop time is ts. The ts value may be a truncated
// stop time because of a restricted boundary, while stop is the true
// stop time returned by storage.
func (t *{{.name}}WindowTable) isInWindow(ts int64, stop int64) bool {
// This method checks if the stop time is a valid stop time for
// that interval. This calculation is different from the calculation
// of the window itself. For example, for a 10 second window that
// starts at 20 seconds, we would include points between [20, 30).
// The stop time for this interval would be 30, but because the stop
// time can be truncated, valid stop times fall anywhere in (20, 30].
// The storage engine always produces 30 as the end time, but the stop
// time may have been truncated because of the boundary, which is why
// we check this range instead of checking whether the two values are
// equal.
subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive())
start := int64(values.Time(stop).Add(subEvery))
return start < ts && ts <= stop
}
// nextBuffer will ensure the array cursor is filled
// and will return true if there is at least one value
// that can be read from it.
func (t *{{.name}}WindowTable) nextBuffer() bool {
// Discard the current array cursor if we have
// exceeded it.
if t.arr != nil && t.idxInArr >= t.arr.Len() {
t.arr = nil
}
// Retrieve the next array cursor if needed.
if t.arr == nil {
arr := t.cur.Next()
if arr.Len() == 0 {
return false
}
t.arr, t.idxInArr = arr, 0
}
return true
}
// appendValues will scan the timestamps and append values
// that match those timestamps from the buffer.
func (t *{{.name}}WindowTable) appendValues(intervals []int64, appendValue func(v {{.Type}}), appendNull func()) {
for i := 0; i < len(intervals); i++ {
if v, ok := t.nextAt(intervals[i]); ok {
appendValue(v)
continue
}
appendNull()
}
}
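// advance builds the column buffer for the next set of windows. When
// createEmpty is set, a row is produced for every window within the
// bounds, including windows that contain no points.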
func (t *{{.name}}WindowTable) advance() bool {
if !t.nextBuffer() {
return false
}
// Create the timestamps for the next window.
start, stop, ok := t.createNextBufferTimes()
if !ok {
return false
}
values := t.mergeValues(stop.Int64Values())
// Retrieve the buffer for the data to avoid allocating
// additional slices. If the buffer is still being used
// because the references were retained, then we will
// allocate a new buffer.
cr := t.allocateBuffer(stop.Len())
if t.timeColumn != "" {
switch t.timeColumn {
case execute.DefaultStopColLabel:
cr.cols[timeColIdx] = stop
start.Release()
case execute.DefaultStartColLabel:
cr.cols[timeColIdx] = start
stop.Release()
}
cr.cols[valueColIdx] = values
t.appendBounds(cr)
} else {
cr.cols[startColIdx] = start
cr.cols[stopColIdx] = stop
cr.cols[valueColIdxWithoutTime] = values
}
t.appendTags(cr)
return true
}
// This table implementation will not have any empty windows.
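// {{.name}}WindowSelectorTable emits the point selected from each
// window together with the window start and stop times.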
type {{.name}}WindowSelectorTable struct {
{{.name}}Table
timeColumn string
window execute.Window
}
func new{{.Name}}WindowSelectorTable(
done chan struct{},
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
window execute.Window,
timeColumn string,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
) *{{.name}}WindowSelectorTable {
t := &{{.name}}WindowSelectorTable{
{{.name}}Table: {{.name}}Table{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
cur: cur,
},
window: window,
timeColumn: timeColumn,
}
t.readTags(tags)
t.init(t.advance)
return t
}
func (t *{{.name}}WindowSelectorTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
func (t *{{.name}}WindowSelectorTable) advance() bool {
arr := t.cur.Next()
if arr.Len() == 0 {
return false
}
cr := t.allocateBuffer(arr.Len())
switch t.timeColumn {
case execute.DefaultStartColLabel:
cr.cols[timeColIdx] = t.startTimes(arr)
t.appendBounds(cr)
case execute.DefaultStopColLabel:
cr.cols[timeColIdx] = t.stopTimes(arr)
t.appendBounds(cr)
default:
cr.cols[startColIdx] = t.startTimes(arr)
cr.cols[stopColIdx] = t.stopTimes(arr)
cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
}
cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
t.appendTags(cr)
return true
}
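// startTimes returns the window start time for each selected point,
// clamped to the start of the query range.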
func (t *{{.name}}WindowSelectorTable) startTimes(arr *cursors.{{.Name}}Array) *array.Int64 {
start := arrow.NewIntBuilder(t.alloc)
start.Resize(arr.Len())
rangeStart := int64(t.bounds.Start)
for _, v := range arr.Timestamps {
if windowStart := int64(t.window.GetEarliestBounds(values.Time(v)).Start); windowStart < rangeStart {
start.Append(rangeStart)
} else {
start.Append(windowStart)
}
}
return start.NewInt64Array()
}
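// stopTimes returns the window stop time for each selected point,
// clamped to the end of the query range.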
func (t *{{.name}}WindowSelectorTable) stopTimes(arr *cursors.{{.Name}}Array) *array.Int64 {
stop := arrow.NewIntBuilder(t.alloc)
stop.Resize(arr.Len())
rangeStop := int64(t.bounds.Stop)
for _, v := range arr.Timestamps {
if windowStop := int64(t.window.GetEarliestBounds(values.Time(v)).Stop); windowStop > rangeStop {
stop.Append(rangeStop)
} else {
stop.Append(windowStop)
}
}
return stop.NewInt64Array()
}
// This table implementation may contain empty windows
// in addition to non-empty windows.
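// {{.name}}EmptyWindowSelectorTable walks every window within the
// query range and emits a null row for windows that contain no points.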
type {{.name}}EmptyWindowSelectorTable struct {
{{.name}}Table
arr *cursors.{{.Name}}Array
idx int
rangeStart int64
rangeStop int64
windowStart int64
windowStop int64
timeColumn string
window execute.Window
}
func new{{.Name}}EmptyWindowSelectorTable(
done chan struct{},
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
window execute.Window,
timeColumn string,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
) *{{.name}}EmptyWindowSelectorTable {
rangeStart := int64(bounds.Start)
rangeStop := int64(bounds.Stop)
t := &{{.name}}EmptyWindowSelectorTable{
{{.name}}Table: {{.name}}Table{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
cur: cur,
},
arr: cur.Next(),
rangeStart: rangeStart,
rangeStop: rangeStop,
windowStart: int64(window.GetEarliestBounds(values.Time(rangeStart)).Start),
windowStop: int64(window.GetEarliestBounds(values.Time(rangeStart)).Stop),
window: window,
timeColumn: timeColumn,
}
t.readTags(tags)
t.init(t.advance)
return t
}
func (t *{{.name}}EmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
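// advance emits up to storage.MaxPointsPerBlock rows per buffer, one
// row per window.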
func (t *{{.name}}EmptyWindowSelectorTable) advance() bool {
if t.arr.Len() == 0 {
return false
}
values := t.arrowBuilder()
values.Resize(storage.MaxPointsPerBlock)
var cr *colReader
switch t.timeColumn {
case execute.DefaultStartColLabel:
start := t.startTimes(values)
cr = t.allocateBuffer(start.Len())
cr.cols[timeColIdx] = start
t.appendBounds(cr)
case execute.DefaultStopColLabel:
stop := t.stopTimes(values)
cr = t.allocateBuffer(stop.Len())
cr.cols[timeColIdx] = stop
t.appendBounds(cr)
default:
start, stop, time := t.startStopTimes(values)
cr = t.allocateBuffer(time.Len())
cr.cols[startColIdx] = start
cr.cols[stopColIdx] = stop
cr.cols[timeColIdx] = time
}
cr.cols[valueColIdx] = values.New{{.ArrowType}}Array()
t.appendTags(cr)
return true
}
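// startTimes appends one window start time per window and mirrors each
// window's value, or a null, into the supplied value builder.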
func (t *{{.name}}EmptyWindowSelectorTable) startTimes(builder *array.{{.ArrowType}}Builder) *array.Int64 {
start := arrow.NewIntBuilder(t.alloc)
start.Resize(storage.MaxPointsPerBlock)
for t.windowStart < t.rangeStop {
// The first window should start at the
// beginning of the time range.
if t.windowStart < t.rangeStart {
start.Append(t.rangeStart)
} else {
start.Append(t.windowStart)
}
var v int64
if t.arr.Len() == 0 {
v = math.MaxInt64
} else {
v = t.arr.Timestamps[t.idx]
}
// If the current timestamp falls within the
// current window, append the value to the
// builder, otherwise append a null value.
if t.windowStart <= v && v < t.windowStop {
t.append(builder, t.arr.Values[t.idx])
t.idx++
} else {
builder.AppendNull()
}
t.windowStart = int64(values.Time(t.windowStart).Add(t.window.Every))
t.windowStop = int64(values.Time(t.windowStop).Add(t.window.Every))
// If the current array is non-empty and has
// been read in its entirety, call Next().
if t.arr.Len() > 0 && t.idx == t.arr.Len() {
t.arr = t.cur.Next()
t.idx = 0
}
if start.Len() == storage.MaxPointsPerBlock {
break
}
}
return start.NewInt64Array()
}
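// stopTimes appends one window stop time per window and mirrors each
// window's value, or a null, into the supplied value builder.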
func (t *{{.name}}EmptyWindowSelectorTable) stopTimes(builder *array.{{.ArrowType}}Builder) *array.Int64 {
stop := arrow.NewIntBuilder(t.alloc)
stop.Resize(storage.MaxPointsPerBlock)
for t.windowStart < t.rangeStop {
// The last window should stop at the end of
// the time range.
if t.windowStop > t.rangeStop {
stop.Append(t.rangeStop)
} else {
stop.Append(t.windowStop)
}
var v int64
if t.arr.Len() == 0 {
v = math.MaxInt64
} else {
v = t.arr.Timestamps[t.idx]
}
// If the current timestamp falls within the
// current window, append the value to the
// builder, otherwise append a null value.
if t.windowStart <= v && v < t.windowStop {
t.append(builder, t.arr.Values[t.idx])
t.idx++
} else {
builder.AppendNull()
}
t.windowStart = int64(values.Time(t.windowStart).Add(t.window.Every))
t.windowStop = int64(values.Time(t.windowStop).Add(t.window.Every))
// If the current array is non-empty and has
// been read in its entirety, call Next().
if t.arr.Len() > 0 && t.idx == t.arr.Len() {
t.arr = t.cur.Next()
t.idx = 0
}
if stop.Len() == storage.MaxPointsPerBlock {
break
}
}
return stop.NewInt64Array()
}
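// startStopTimes appends the start, stop, and point time for every
// window; empty windows receive a null time and a null value.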
func (t *{{.name}}EmptyWindowSelectorTable) startStopTimes(builder *array.{{.ArrowType}}Builder) (*array.Int64, *array.Int64, *array.Int64) {
start := arrow.NewIntBuilder(t.alloc)
start.Resize(storage.MaxPointsPerBlock)
stop := arrow.NewIntBuilder(t.alloc)
stop.Resize(storage.MaxPointsPerBlock)
time := arrow.NewIntBuilder(t.alloc)
time.Resize(storage.MaxPointsPerBlock)
for t.windowStart < t.rangeStop {
// The first window should start at the
// beginning of the time range.
if t.windowStart < t.rangeStart {
start.Append(t.rangeStart)
} else {
start.Append(t.windowStart)
}
// The last window should stop at the end of
// the time range.
if t.windowStop > t.rangeStop {
stop.Append(t.rangeStop)
} else {
stop.Append(t.windowStop)
}
var v int64
if t.arr.Len() == 0 {
v = math.MaxInt64
} else {
v = t.arr.Timestamps[t.idx]
}
// If the current timestamp falls within the
// current window, append the value to the
// builder, otherwise append a null value.
if t.windowStart <= v && v < t.windowStop {
time.Append(v)
t.append(builder, t.arr.Values[t.idx])
t.idx++
} else {
time.AppendNull()
builder.AppendNull()
}
t.windowStart = int64(values.Time(t.windowStart).Add(t.window.Every))
t.windowStop = int64(values.Time(t.windowStop).Add(t.window.Every))
// If the current array is non-empty and has
// been read in its entirety, call Next().
if t.arr.Len() > 0 && t.idx == t.arr.Len() {
t.arr = t.cur.Next()
t.idx = 0
}
if time.Len() == storage.MaxPointsPerBlock {
break
}
}
return start.NewInt64Array(), stop.NewInt64Array(), time.NewInt64Array()
}
// group table
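// {{.name}}GroupTable merges every series that belongs to the group
// cursor. When an aggregate is requested, the series are reduced to a
// single row.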
type {{.name}}GroupTable struct {
table
mu sync.Mutex
gc storage.GroupCursor
cur cursors.{{.Name}}ArrayCursor
}
func new{{.Name}}GroupTable(
done chan struct{},
gc storage.GroupCursor,
cur cursors.{{.Name}}ArrayCursor,
bounds execute.Bounds,
key flux.GroupKey,
cols []flux.ColMeta,
tags models.Tags,
defs [][]byte,
cache *tagsCache,
alloc *memory.Allocator,
) *{{.name}}GroupTable {
t := &{{.name}}GroupTable{
table: newTable(done, bounds, key, cols, defs, cache, alloc),
gc: gc,
cur: cur,
}
t.readTags(tags)
t.init(t.advance)
return t
}
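// Close releases the array cursor and the group cursor, if they are
// still open.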
func (t *{{.name}}GroupTable) Close() {
t.mu.Lock()
if t.cur != nil {
t.cur.Close()
t.cur = nil
}
if t.gc != nil {
t.gc.Close()
t.gc = nil
}
t.mu.Unlock()
}
func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
func (t *{{.name}}GroupTable) advance() bool {
if t.cur == nil {
// For group aggregates, the first call to advance() reads all of the series
// and all of the table buffers within those series, merging them into a
// single row. At the end of that process t.advanceCursor() returns false and
// t.cur becomes nil, but advance() still returns true to indicate that there
// is data to be returned. On the second call t.cur is already nil, so we
// return false immediately.
return false
}
var arr *cursors.{{.Name}}Array
var len int
for {
arr = t.cur.Next()
len = arr.Len()
if len > 0 {
break
}
if !t.advanceCursor() {
return false
}
}
// handle the group without aggregate case
if t.gc.Aggregate() == nil {
// Retrieve the buffer for the data to avoid allocating
// additional slices. If the buffer is still being used
// because the references were retained, then we will
// allocate a new buffer.
colReader := t.allocateBuffer(len)
colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
t.appendTags(colReader)
t.appendBounds(colReader)
return true
}
aggregate, err := determine{{.Name}}AggregateMethod(t.gc.Aggregate().Type)
if err != nil {
t.err = err
return false
}
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps, values := []int64{ts}, []{{.Type}}{v}
for {
arr = t.cur.Next()
if arr.Len() > 0 {
ts, v := aggregate(arr.Timestamps, arr.Values)
timestamps = append(timestamps, ts)
values = append(values, v)
continue
}
if !t.advanceCursor() {
break
}
}
timestamp, value := aggregate(timestamps, values)
colReader := t.allocateBuffer(1)
if IsSelector(t.gc.Aggregate()) {
colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
colReader.cols[valueColIdx] = t.toArrowBuffer([]{{.Type}}{value})
} else {
colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]{{.Type}}{value})
}
t.appendTags(colReader)
t.appendBounds(colReader)
return true
}
type {{.name}}AggregateMethod func([]int64, []{{.Type}}) (int64, {{.Type}})
// determine{{.Name}}AggregateMethod returns the method for aggregating
// points returned within the same group. The incoming points are the
// per-series aggregates, so the method returned here aggregates those
// aggregates.
func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({{.name}}AggregateMethod, error){
switch agg {
case datatypes.AggregateTypeFirst:
return aggregateFirstGroups{{.Name}}, nil
case datatypes.AggregateTypeLast:
return aggregateLastGroups{{.Name}}, nil
case datatypes.AggregateTypeCount:
{{if eq .Name "Integer"}}
return aggregateCountGroups{{.Name}}, nil
{{else}}
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: "unsupported for aggregate count: {{.Name}}",
}
{{end}}
case datatypes.AggregateTypeSum:
{{if and (ne .Name "Boolean") (ne .Name "String")}}
return aggregateSumGroups{{.Name}}, nil
{{else}}
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: "unsupported for aggregate sum: {{.Name}}",
}
{{end}}
case datatypes.AggregateTypeMin:
{{if and (ne .Name "Boolean") (ne .Name "String")}}
return aggregateMinGroups{{.Name}}, nil
{{else}}
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: "unsupported for aggregate min: {{.Name}}",
}
{{end}}
case datatypes.AggregateTypeMax:
{{if and (ne .Name "Boolean") (ne .Name "String")}}
return aggregateMaxGroups{{.Name}}, nil
{{else}}
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: "unsupported for aggregate max: {{.Name}}",
}
{{end}}
default:
return nil, &influxdb.Error {
Code: influxdb.EInvalid,
Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg),
}
}
}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
func aggregateMinGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if value > values[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
{{end}}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
func aggregateMaxGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if value < values[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
{{end}}
// For group count and sum, the timestamp returned is always math.MaxInt64.
// Their final result does not contain _time, so the timestamp value does not
// matter.
{{if eq .Name "Integer"}}
func aggregateCountGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
return aggregateSumGroups{{.Name}}(timestamps, values)
}
{{end}}
{{if and (ne .Name "Boolean") (ne .Name "String")}}
func aggregateSumGroups{{.Name}}(_ []int64, values []{{.Type}}) (int64, {{.Type}}) {
var sum {{.Type}}
for _, v := range values {
sum += v
}
return math.MaxInt64, sum
}
{{end}}
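// aggregateFirstGroups{{.Name}} returns the value with the earliest timestamp.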
func aggregateFirstGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if timestamp > timestamps[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
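// aggregateLastGroups{{.Name}} returns the value with the latest timestamp.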
func aggregateLastGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) {
value := values[0]
timestamp := timestamps[0]
for i := 1; i < len(values); i++ {
if timestamp < timestamps[i] {
value = values[i]
timestamp = timestamps[i]
}
}
return timestamp, value
}
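// advanceCursor closes the current series cursor and advances the group
// cursor to the next series with a matching cursor type. It returns
// false when the group is exhausted or a cursor of an unexpected type
// is encountered.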
func (t *{{.name}}GroupTable) advanceCursor() bool {
t.cur.Close()
t.cur = nil
for t.gc.Next() {
cur := t.gc.Cursor()
if cur == nil {
continue
}
if typedCur, ok := cur.(cursors.{{.Name}}ArrayCursor); !ok {
// TODO(sgc): error or skip?
cur.Close()
t.err = &influxdb.Error {
Code: influxdb.EInvalid,
Err: &GroupCursorError {
typ: "{{.name}}",
cursor: cur,
},
}
return false
} else {
t.readTags(t.gc.Tags())
t.cur = typedCur
return true
}
}
return false
}
func (t *{{.name}}GroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
}
{{end}}