package storageflux

import (
	"fmt"
	"math"
	"sync"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/array"
	"github.com/influxdata/flux/arrow"
	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/interval"
	"github.com/influxdata/flux/memory"
	"github.com/influxdata/flux/values"

	"github.com/influxdata/influxdb/v2/kit/platform/errors"
	"github.com/influxdata/influxdb/v2/models"
	storage "github.com/influxdata/influxdb/v2/storage/reads"
	"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
	"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
|
|
{{range .}}
|
|
//
|
|
// *********** {{.Name}} ***********
|
|
//
|
|
|
|
type {{.name}}Table struct {
|
|
table
|
|
mu sync.Mutex
|
|
cur cursors.{{.Name}}ArrayCursor
|
|
alloc memory.Allocator
|
|
}
|
|
|
|
func new{{.Name}}Table(
|
|
done chan struct{},
|
|
cur cursors.{{.Name}}ArrayCursor,
|
|
bounds execute.Bounds,
|
|
key flux.GroupKey,
|
|
cols []flux.ColMeta,
|
|
tags models.Tags,
|
|
defs [][]byte,
|
|
cache *tagsCache,
|
|
alloc memory.Allocator,
|
|
) *{{.name}}Table {
|
|
t := &{{.name}}Table{
|
|
table: newTable(done, bounds, key, cols, defs, cache, alloc),
|
|
cur: cur,
|
|
}
|
|
t.readTags(tags)
|
|
t.init(t.advance)
|
|
|
|
return t
|
|
}
|
|
|
|
func (t *{{.name}}Table) Close() {
|
|
t.mu.Lock()
|
|
if t.cur != nil {
|
|
t.cur.Close()
|
|
t.cur = nil
|
|
}
|
|
t.mu.Unlock()
|
|
}
|
|
|
|
func (t *{{.name}}Table) Statistics() cursors.CursorStats {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
cur := t.cur
|
|
if cur == nil {
|
|
return cursors.CursorStats{}
|
|
}
|
|
cs := cur.Stats()
|
|
return cursors.CursorStats{
|
|
ScannedValues: cs.ScannedValues,
|
|
ScannedBytes: cs.ScannedBytes,
|
|
}
|
|
}
|
|
|
|
func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error {
|
|
return t.do(f, t.advance)
|
|
}
|
|
|
|
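// advance reads the next batch of values from the array cursor into a
// freshly allocated column buffer. It returns false once the cursor is
// exhausted.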
func (t *{{.name}}Table) advance() bool {
	a := t.cur.Next()
	l := a.Len()
	if l == 0 {
		return false
	}

	// Retrieve the buffer for the data to avoid allocating
	// additional slices. If the buffer is still being used
	// because the references were retained, then we will
	// allocate a new buffer.
	cr := t.allocateBuffer(l)
	cr.cols[timeColIdx] = arrow.NewInt(a.Timestamps, t.alloc)
	cr.cols[valueColIdx] = t.toArrowBuffer(a.Values)
	t.appendTags(cr)
	t.appendBounds(cr)
	return true
}

// window table
type {{.name}}WindowTable struct {
	{{.name}}Table
	arr          *cursors.{{.Name}}Array
	windowBounds interval.Bounds
	idxInArr     int
	createEmpty  bool
	timeColumn   string
	isAggregate  bool
	window       interval.Window
	{{if eq .Name "Integer"}}fillValue *{{.Type}}{{end}}
}

func new{{.Name}}WindowTable(
	done chan struct{},
	cur cursors.{{.Name}}ArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	createEmpty bool,
	timeColumn string,
	isAggregate bool,
	{{if eq .Name "Integer"}}fillValue *{{.Type}},{{end}}
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc memory.Allocator,
) *{{.name}}WindowTable {
	t := &{{.name}}WindowTable{
		{{.name}}Table: {{.name}}Table{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
		},
		window:      window,
		createEmpty: createEmpty,
		timeColumn:  timeColumn,
		isAggregate: isAggregate,
		{{if eq .Name "Integer"}}fillValue: fillValue,{{end}}
	}
	if t.createEmpty {
		start := int64(bounds.Start)
		t.windowBounds = window.GetLatestBounds(values.Time(start))
	}
	t.readTags(tags)
	t.init(t.advance)

	return t
}

func (t *{{.name}}WindowTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

// createNextBufferTimes will read the timestamps from the array
// cursor and construct the values for the next buffer.
func (t *{{.name}}WindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) {
	startB := arrow.NewIntBuilder(t.alloc)
	stopB := arrow.NewIntBuilder(t.alloc)

	if t.createEmpty {
		// There are no more windows when the start time is greater
		// than or equal to the stop time.
		if startT := int64(t.windowBounds.Start()); startT >= int64(t.bounds.Stop) {
			return nil, nil, false
		}

		// Create a buffer with the buffer size.
		// TODO(jsternberg): Calculate the exact size with max points as the maximum.
		startB.Resize(storage.MaxPointsPerBlock)
		stopB.Resize(storage.MaxPointsPerBlock)
		for ; ; t.windowBounds = t.window.NextBounds(t.windowBounds) {
			startT, stopT := t.getWindowBoundsFor(t.windowBounds)
			if startT >= int64(t.bounds.Stop) {
				break
			}
			startB.Append(startT)
			stopB.Append(stopT)
		}
		start = startB.NewIntArray()
		stop = stopB.NewIntArray()
		return start, stop, true
	}

	// Retrieve the next buffer so we can copy the timestamps.
	if !t.nextBuffer() {
		return nil, nil, false
	}

	// Copy over the timestamps from the next buffer and adjust
	// times for the boundaries.
	startB.Resize(len(t.arr.Timestamps))
	stopB.Resize(len(t.arr.Timestamps))
	for _, stopT := range t.arr.Timestamps {
		bounds := t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stopT)))
		startT, stopT := t.getWindowBoundsFor(bounds)
		startB.Append(startT)
		stopB.Append(stopT)
	}
	start = startB.NewIntArray()
	stop = stopB.NewIntArray()
	return start, stop, true
}

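// getWindowBoundsFor clamps the given window bounds to the bounds of the
// table so that no window extends past the queried time range.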
func (t *{{.name}}WindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) {
	beg := int64(bounds.Start())
	end := int64(bounds.Stop())
	if beg < int64(t.bounds.Start) {
		beg = int64(t.bounds.Start)
	}
	if end > int64(t.bounds.Stop) {
		end = int64(t.bounds.Stop)
	}
	return beg, end
}

// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
func (t *{{.name}}WindowTable) nextAt(stop int64) (v {{.Type}}, ok bool) {
	if !t.nextBuffer() {
		return
	} else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) {
		return
	}
	v, ok = t.arr.Values[t.idxInArr], true
	t.idxInArr++
	return v, ok
}

// isInWindow will check if the given time may be used within the window
// denoted by the stop timestamp. The stop may be a truncated stop time
// because of a restricted boundary.
//
// When used with an aggregate, ts will be the true stop time returned
// by storage. When used with a selector, it will be the real time
// for the point.
func (t *{{.name}}WindowTable) isInWindow(stop int64, ts int64) bool {
	// Retrieve the boundary associated with this stop time.
	// This will be the boundary for the previous nanosecond.
	bounds := t.window.GetLatestBounds(values.Time(stop - 1))
	start, stop := int64(bounds.Start()), int64(bounds.Stop())

	// For an aggregate, the timestamp will be the stop time of the boundary.
	if t.isAggregate {
		return start < ts && ts <= stop
	}

	// For a selector, the timestamp should be within the boundary.
	return start <= ts && ts < stop
}

// nextBuffer will ensure the array cursor is filled
// and will return true if there is at least one value
// that can be read from it.
func (t *{{.name}}WindowTable) nextBuffer() bool {
	// Discard the current array cursor if we have
	// exceeded it.
	if t.arr != nil && t.idxInArr >= t.arr.Len() {
		t.arr = nil
	}

	// Retrieve the next array cursor if needed.
	if t.arr == nil {
		arr := t.cur.Next()
		if arr.Len() == 0 {
			return false
		}
		t.arr, t.idxInArr = arr, 0
	}
	return true
}

// appendValues will scan the timestamps and append values
// that match those timestamps from the buffer.
func (t *{{.name}}WindowTable) appendValues(intervals []int64, appendValue func(v {{.Type}}), appendNull func()) {
	for i := 0; i < len(intervals); i++ {
		if v, ok := t.nextAt(intervals[i]); ok {
			appendValue(v)
			continue
		}
		appendNull()
	}
}

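// advance produces the next buffer of windowed values. It constructs the
// start and stop timestamps for each window, merges the cursor values into
// those windows, and lays out the columns according to timeColumn.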
func (t *{{.name}}WindowTable) advance() bool {
	if !t.nextBuffer() {
		return false
	}
	// Create the timestamps for the next window.
	start, stop, ok := t.createNextBufferTimes()
	if !ok {
		return false
	}
	values := t.mergeValues(stop.Int64Values())

	// Retrieve the buffer for the data to avoid allocating
	// additional slices. If the buffer is still being used
	// because the references were retained, then we will
	// allocate a new buffer.
	cr := t.allocateBuffer(stop.Len())
	if t.timeColumn != "" {
		switch t.timeColumn {
		case execute.DefaultStopColLabel:
			cr.cols[timeColIdx] = stop
			start.Release()
		case execute.DefaultStartColLabel:
			cr.cols[timeColIdx] = start
			stop.Release()
		}
		cr.cols[valueColIdx] = values
		t.appendBounds(cr)
	} else {
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[valueColIdxWithoutTime] = values
	}
	t.appendTags(cr)
	return true
}

// This table implementation will not have any empty windows.
type {{.name}}WindowSelectorTable struct {
	{{.name}}Table
	timeColumn string
	window     interval.Window
}

func new{{.Name}}WindowSelectorTable(
	done chan struct{},
	cur cursors.{{.Name}}ArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc memory.Allocator,
) *{{.name}}WindowSelectorTable {
	t := &{{.name}}WindowSelectorTable{
		{{.name}}Table: {{.name}}Table{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
		},
		window:     window,
		timeColumn: timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}

func (t *{{.name}}WindowSelectorTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

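// advance emits one buffer per batch returned by the cursor, pairing every
// point with the start and stop times of the window that contains it.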
func (t *{{.name}}WindowSelectorTable) advance() bool {
	arr := t.cur.Next()
	if arr.Len() == 0 {
		return false
	}

	cr := t.allocateBuffer(arr.Len())

	switch t.timeColumn {
	case execute.DefaultStartColLabel:
		cr.cols[timeColIdx] = t.startTimes(arr)
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		cr.cols[timeColIdx] = t.stopTimes(arr)
		t.appendBounds(cr)
	default:
		cr.cols[startColIdx] = t.startTimes(arr)
		cr.cols[stopColIdx] = t.stopTimes(arr)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	}

	cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(cr)
	return true
}

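// startTimes returns the window start time for each timestamp in arr,
// clamped so that the first window never starts before the query range.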
func (t *{{.name}}WindowSelectorTable) startTimes(arr *cursors.{{.Name}}Array) *array.Int {
	start := arrow.NewIntBuilder(t.alloc)
	start.Resize(arr.Len())

	rangeStart := int64(t.bounds.Start)

	for _, v := range arr.Timestamps {
		if windowStart := int64(t.window.GetLatestBounds(values.Time(v)).Start()); windowStart < rangeStart {
			start.Append(rangeStart)
		} else {
			start.Append(windowStart)
		}
	}
	return start.NewIntArray()
}

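// stopTimes returns the window stop time for each timestamp in arr,
// clamped so that the last window never stops after the query range.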
func (t *{{.name}}WindowSelectorTable) stopTimes(arr *cursors.{{.Name}}Array) *array.Int {
	stop := arrow.NewIntBuilder(t.alloc)
	stop.Resize(arr.Len())

	rangeStop := int64(t.bounds.Stop)

	for _, v := range arr.Timestamps {
		if windowStop := int64(t.window.GetLatestBounds(values.Time(v)).Stop()); windowStop > rangeStop {
			stop.Append(rangeStop)
		} else {
			stop.Append(windowStop)
		}
	}
	return stop.NewIntArray()
}

// This table implementation may contain empty windows
// in addition to non-empty windows.
type {{.name}}EmptyWindowSelectorTable struct {
	{{.name}}Table
	arr          *cursors.{{.Name}}Array
	idx          int
	rangeStart   int64
	rangeStop    int64
	windowBounds interval.Bounds
	timeColumn   string
	window       interval.Window
}

func new{{.Name}}EmptyWindowSelectorTable(
	done chan struct{},
	cur cursors.{{.Name}}ArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc memory.Allocator,
) *{{.name}}EmptyWindowSelectorTable {
	rangeStart := int64(bounds.Start)
	rangeStop := int64(bounds.Stop)
	t := &{{.name}}EmptyWindowSelectorTable{
		{{.name}}Table: {{.name}}Table{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
		},
		arr:          cur.Next(),
		rangeStart:   rangeStart,
		rangeStop:    rangeStop,
		windowBounds: window.GetLatestBounds(values.Time(rangeStart)),
		window:       window,
		timeColumn:   timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}

func (t *{{.name}}EmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

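// advance walks the windows between the range start and stop, emitting a
// row for every window. Windows with no matching point receive null values.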
func (t *{{.name}}EmptyWindowSelectorTable) advance() bool {
	if t.arr.Len() == 0 {
		return false
	}

	values := t.arrowBuilder()
	values.Resize(storage.MaxPointsPerBlock)

	var cr *colReader

	switch t.timeColumn {
	case execute.DefaultStartColLabel:
		start := t.startTimes(values)
		cr = t.allocateBuffer(start.Len())
		cr.cols[timeColIdx] = start
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		stop := t.stopTimes(values)
		cr = t.allocateBuffer(stop.Len())
		cr.cols[timeColIdx] = stop
		t.appendBounds(cr)
	default:
		start, stop, time := t.startStopTimes(values)
		cr = t.allocateBuffer(time.Len())
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[timeColIdx] = time
	}

	cr.cols[valueColIdx] = values.New{{.ArrowType}}Array()
	t.appendTags(cr)
	return true
}

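// startTimes appends one start time per window until the range or the block
// size limit is reached, filling builder with the point that falls inside
// each window or with null when the window is empty.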
func (t *{{.name}}EmptyWindowSelectorTable) startTimes(builder *array.{{.ArrowType}}Builder) *array.Int {
	start := arrow.NewIntBuilder(t.alloc)
	start.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {
		// The first window should start at the
		// beginning of the time range.
		if int64(t.windowBounds.Start()) < t.rangeStart {
			start.Append(t.rangeStart)
		} else {
			start.Append(int64(t.windowBounds.Start()))
		}

		var v int64

		if t.arr.Len() == 0 {
			v = math.MaxInt64
		} else {
			v = t.arr.Timestamps[t.idx]
		}

		// If the current timestamp falls within the
		// current window, append the value to the
		// builder, otherwise append a null value.
		if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// If the current array is non-empty and has
		// been read in its entirety, call Next().
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if start.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return start.NewIntArray()
}

func (t *{{.name}}EmptyWindowSelectorTable) stopTimes(builder *array.{{.ArrowType}}Builder) *array.Int {
	stop := arrow.NewIntBuilder(t.alloc)
	stop.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {
		// The last window should stop at the end of
		// the time range.
		if int64(t.windowBounds.Stop()) > t.rangeStop {
			stop.Append(t.rangeStop)
		} else {
			stop.Append(int64(t.windowBounds.Stop()))
		}

		var v int64

		if t.arr.Len() == 0 {
			v = math.MaxInt64
		} else {
			v = t.arr.Timestamps[t.idx]
		}

		// If the current timestamp falls within the
		// current window, append the value to the
		// builder, otherwise append a null value.
		if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// If the current array is non-empty and has
		// been read in its entirety, call Next().
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if stop.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return stop.NewIntArray()
}

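// startStopTimes is the combination of startTimes and stopTimes: for each
// window it records the start, stop, and point time (null when the window
// is empty) while filling builder with the corresponding values.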
func (t *{{.name}}EmptyWindowSelectorTable) startStopTimes(builder *array.{{.ArrowType}}Builder) (*array.Int, *array.Int, *array.Int) {
	start := arrow.NewIntBuilder(t.alloc)
	start.Resize(storage.MaxPointsPerBlock)

	stop := arrow.NewIntBuilder(t.alloc)
	stop.Resize(storage.MaxPointsPerBlock)

	time := arrow.NewIntBuilder(t.alloc)
	time.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {
		// The first window should start at the
		// beginning of the time range.
		if int64(t.windowBounds.Start()) < t.rangeStart {
			start.Append(t.rangeStart)
		} else {
			start.Append(int64(t.windowBounds.Start()))
		}

		// The last window should stop at the end of
		// the time range.
		if int64(t.windowBounds.Stop()) > t.rangeStop {
			stop.Append(t.rangeStop)
		} else {
			stop.Append(int64(t.windowBounds.Stop()))
		}

		var v int64

		if t.arr.Len() == 0 {
			v = math.MaxInt64
		} else {
			v = t.arr.Timestamps[t.idx]
		}

		// If the current timestamp falls within the
		// current window, append the value to the
		// builder, otherwise append a null value.
		if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
			time.Append(v)
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			time.AppendNull()
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// If the current array is non-empty and has
		// been read in its entirety, call Next().
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if time.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return start.NewIntArray(), stop.NewIntArray(), time.NewIntArray()
}

// group table

type {{.name}}GroupTable struct {
	table
	mu  sync.Mutex
	gc  storage.GroupCursor
	cur cursors.{{.Name}}ArrayCursor
}

func new{{.Name}}GroupTable(
	done chan struct{},
	gc storage.GroupCursor,
	cur cursors.{{.Name}}ArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc memory.Allocator,
) *{{.name}}GroupTable {
	t := &{{.name}}GroupTable{
		table: newTable(done, bounds, key, cols, defs, cache, alloc),
		gc:    gc,
		cur:   cur,
	}
	t.readTags(tags)
	t.init(t.advance)

	return t
}

func (t *{{.name}}GroupTable) Close() {
	t.mu.Lock()
	if t.cur != nil {
		t.cur.Close()
		t.cur = nil
	}
	if t.gc != nil {
		t.gc.Close()
		t.gc = nil
	}
	t.mu.Unlock()
}

func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

func (t *{{.name}}GroupTable) advance() bool {
	if t.cur == nil {
		// For group aggregates, we attempt to fetch all the series and all the
		// table buffers within those series at once and merge them into a
		// single row the first time this advance() is called. By the end of
		// that process, t.advanceCursor() has returned false and t.cur has
		// become nil, but we still need to return true to indicate that there
		// is data to be returned. On the second call to advance(), t.cur is
		// already nil, so we return false immediately.
		return false
	}
	var arr *cursors.{{.Name}}Array
	var len int
	for {
		arr = t.cur.Next()
		len = arr.Len()
		if len > 0 {
			break
		}
		if !t.advanceCursor() {
			return false
		}
	}

	// handle the group without aggregate case
	if t.gc.Aggregate() == nil {
		// Retrieve the buffer for the data to avoid allocating
		// additional slices. If the buffer is still being used
		// because the references were retained, then we will
		// allocate a new buffer.
		colReader := t.allocateBuffer(len)
		colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
		colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
		t.appendTags(colReader)
		t.appendBounds(colReader)
		return true
	}

	aggregate, err := make{{.Name}}AggregateAccumulator(t.gc.Aggregate().Type)
	if err != nil {
		t.err = err
		return false
	}

	aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags)
	for {
		arr = t.cur.Next()
		if arr.Len() > 0 {
			aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags)
			continue
		}

		if !t.advanceCursor() {
			break
		}
	}
	timestamp, value, tags := aggregate.Result()

	colReader := t.allocateBuffer(1)
	if IsSelector(t.gc.Aggregate()) {
		colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
		colReader.cols[valueColIdx] = t.toArrowBuffer([]{{.Type}}{value})
	} else {
		colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]{{.Type}}{value})
	}
	t.appendTheseTags(colReader, tags)
	t.appendBounds(colReader)
	return true
}

type {{.Name}}AggregateAccumulator interface {
	// AccumulateFirst receives an initial array of items to select from.
	// It selects an item and stores the state. Afterwards, more data can
	// be supplied with AccumulateMore and the results can be requested at
	// any time. Without a call to AccumulateFirst the results are not
	// defined.
	AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte)

	// AccumulateMore receives additional array elements to select from.
	AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte)

	// Result returns the item selected from the data received so far.
	Result() (int64, {{.Type}}, [][]byte)
}

// The selector method takes a ( timestamp, value ) pair, a
// ( []timestamp, []value ) pair, and a starting index. It applies the selector
// to the single value and the array, starting at the supplied index. It
// returns -1 if the single value is selected and a non-negative value if an
// item from the array is selected.
type {{.name}}SelectorMethod func(int64, {{.Type}}, []int64, []{{.Type}}, int) int

// The selector accumulator tracks the currently-selected item.
type {{.name}}SelectorAccumulator struct {
	selector {{.name}}SelectorMethod

	ts   int64
	v    {{.Type}}
	tags [][]byte
}

func (a *{{.name}}SelectorAccumulator) AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) {
	index := a.selector(timestamps[0], values[0], timestamps, values, 1)
	if index < 0 {
		a.ts = timestamps[0]
		a.v = values[0]
	} else {
		a.ts = timestamps[index]
		a.v = values[index]
	}
	a.tags = make([][]byte, len(tags))
	copy(a.tags, tags)
}

func (a *{{.name}}SelectorAccumulator) AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) {
	index := a.selector(a.ts, a.v, timestamps, values, 0)
	if index >= 0 {
		a.ts = timestamps[index]
		a.v = values[index]

		if len(tags) > cap(a.tags) {
			a.tags = make([][]byte, len(tags))
		} else {
			a.tags = a.tags[:len(tags)]
		}
		copy(a.tags, tags)
	}
}

func (a *{{.name}}SelectorAccumulator) Result() (int64, {{.Type}}, [][]byte) {
	return a.ts, a.v, a.tags
}

{{if and (ne .Name "Boolean") (ne .Name "String")}}

// The aggregate method takes a value, an array of values, and a starting
// index, applies an aggregate operation over the value and the array, starting
// at the given index, and returns the result.
type {{.name}}AggregateMethod func({{.Type}}, []{{.Type}}, int) {{.Type}}

type {{.name}}AggregateAccumulator struct {
	aggregate {{.name}}AggregateMethod
	accum     {{.Type}}

	// For pure aggregates it doesn't matter what we return for tags, but
	// we need to satisfy the interface. We will just return the most
	// recently seen tags.
	tags [][]byte
}

func (a *{{.name}}AggregateAccumulator) AccumulateFirst(timestamps []int64, values []{{.Type}}, tags [][]byte) {
	a.accum = a.aggregate(values[0], values, 1)
	a.tags = tags
}

func (a *{{.name}}AggregateAccumulator) AccumulateMore(timestamps []int64, values []{{.Type}}, tags [][]byte) {
	a.accum = a.aggregate(a.accum, values, 0)
	a.tags = tags
}

// For group aggregates (non-selectors), the timestamp is always math.MaxInt64.
// Their final result does not contain _time, so this timestamp value can be
// anything and it won't matter.
func (a *{{.name}}AggregateAccumulator) Result() (int64, {{.Type}}, [][]byte) {
	return math.MaxInt64, a.accum, a.tags
}

{{end}}

// make{{.Name}}AggregateAccumulator returns the interface implementation for
// aggregating returned points within the same group. The incoming points are
// the ones returned for each series and the struct returned here will
// aggregate the aggregates.
func make{{.Name}}AggregateAccumulator(agg datatypes.Aggregate_AggregateType) ({{.Name}}AggregateAccumulator, error) {
	switch agg {
	case datatypes.Aggregate_AggregateTypeFirst:
		return &{{.name}}SelectorAccumulator{selector: selectorFirstGroups{{.Name}}}, nil
	case datatypes.Aggregate_AggregateTypeLast:
		return &{{.name}}SelectorAccumulator{selector: selectorLastGroups{{.Name}}}, nil
	case datatypes.Aggregate_AggregateTypeCount:
		{{if eq .Name "Integer"}}
		return &{{.name}}AggregateAccumulator{aggregate: aggregateCountGroups{{.Name}}}, nil
		{{else}}
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate count: {{.Name}}",
		}
		{{end}}
	case datatypes.Aggregate_AggregateTypeSum:
		{{if and (ne .Name "Boolean") (ne .Name "String")}}
		return &{{.name}}AggregateAccumulator{aggregate: aggregateSumGroups{{.Name}}}, nil
		{{else}}
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate sum: {{.Name}}",
		}
		{{end}}
	case datatypes.Aggregate_AggregateTypeMin:
		{{if and (ne .Name "Boolean") (ne .Name "String")}}
		return &{{.name}}SelectorAccumulator{selector: selectorMinGroups{{.Name}}}, nil
		{{else}}
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate min: {{.Name}}",
		}
		{{end}}
	case datatypes.Aggregate_AggregateTypeMax:
		{{if and (ne .Name "Boolean") (ne .Name "String")}}
		return &{{.name}}SelectorAccumulator{selector: selectorMaxGroups{{.Name}}}, nil
		{{else}}
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate max: {{.Name}}",
		}
		{{end}}
	default:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg),
		}
	}
}

{{if and (ne .Name "Boolean") (ne .Name "String")}}
func selectorMinGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) int {
	index := -1

	for ; i < len(values); i++ {
		if v > values[i] {
			index = i
			v = values[i]
		}
	}

	return index
}
{{end}}

{{if and (ne .Name "Boolean") (ne .Name "String")}}
func selectorMaxGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) int {
	index := -1

	for ; i < len(values); i++ {
		if v < values[i] {
			index = i
			v = values[i]
		}
	}

	return index
}
{{end}}

{{if eq .Name "Integer"}}
func aggregateCountGroups{{.Name}}(accum {{.Type}}, values []{{.Type}}, i int) {{.Type}} {
	return aggregateSumGroups{{.Name}}(accum, values, i)
}
{{end}}

{{if and (ne .Name "Boolean") (ne .Name "String")}}
func aggregateSumGroups{{.Name}}(sum {{.Type}}, values []{{.Type}}, i int) {{.Type}} {
	for ; i < len(values); i++ {
		sum += values[i]
	}
	return sum
}
{{end}}

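// selectorFirstGroups{{.Name}} selects the value with the earliest timestamp.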
func selectorFirstGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) int {
	index := -1

	for ; i < len(values); i++ {
		if ts > timestamps[i] {
			index = i
			ts = timestamps[i]
		}
	}

	return index
}

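// selectorLastGroups{{.Name}} selects the value with the latest timestamp,
// preferring the later element when timestamps are equal.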
func selectorLastGroups{{.Name}}(ts int64, v {{.Type}}, timestamps []int64, values []{{.Type}}, i int) int {
	index := -1

	for ; i < len(values); i++ {
		if ts <= timestamps[i] {
			index = i
			ts = timestamps[i]
		}
	}

	return index
}

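// advanceCursor closes the current series cursor and advances the group
// cursor to the next series with a cursor of the expected type. It returns
// false when the group is exhausted or a cursor of the wrong type is found.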
func (t *{{.name}}GroupTable) advanceCursor() bool {
	t.cur.Close()
	t.cur = nil
	for t.gc.Next() {
		cur := t.gc.Cursor()
		if cur == nil {
			continue
		}

		if typedCur, ok := cur.(cursors.{{.Name}}ArrayCursor); !ok {
			// TODO(sgc): error or skip?
			cur.Close()
			t.err = &errors.Error{
				Code: errors.EInvalid,
				Err: &GroupCursorError{
					typ:    "{{.name}}",
					cursor: cur,
				},
			}
			return false
		} else {
			t.readTags(t.gc.Tags())
			t.cur = typedCur
			return true
		}
	}
	return false
}

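// Statistics reports the cursor statistics for the currently open series
// cursor, or zero statistics when the cursor has already been closed.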
func (t *{{.name}}GroupTable) Statistics() cursors.CursorStats {
	if t.cur == nil {
		return cursors.CursorStats{}
	}
	cs := t.cur.Stats()
	return cursors.CursorStats{
		ScannedValues: cs.ScannedValues,
		ScannedBytes:  cs.ScannedBytes,
	}
}
{{end}}