influxdb/storage/flux/table.gen.go

4823 lines
121 KiB
Go

// Generated by tmpl
// https://github.com/benbjohnson/tmpl
//
// DO NOT EDIT!
// Source: table.gen.go.tmpl
package storageflux
import (
"fmt"
"math"
"sync"
"github.com/influxdata/flux"
"github.com/influxdata/flux/array"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/values"
"github.com/influxdata/influxdb/v2/kit/platform/errors"
"github.com/influxdata/influxdb/v2/models"
storage "github.com/influxdata/influxdb/v2/storage/reads"
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
"github.com/influxdata/influxdb/v2/tsdb/cursors"
)
//
// *********** Float ***********
//
// floatTable exposes the float points read from a single array cursor as
// column buffers. It embeds the shared table state.
type floatTable struct {
table
// mu is held by Close and Statistics while they access cur.
mu sync.Mutex
// cur is the source cursor; Close closes it and resets it to nil.
cur cursors.FloatArrayCursor
// alloc is the allocator that this type's methods hand to the arrow
// constructors (see advance).
alloc *memory.Allocator
}
// newFloatTable builds a floatTable that reads its points from cur.
//
// done, bounds, key, cols, defs, cache and alloc are forwarded to newTable.
// alloc is additionally stored on the floatTable itself: the type declares
// its own alloc field, and that field is the one methods such as advance
// read through t.alloc. Leaving it unset handed a nil allocator to the
// arrow constructors.
//
// Before returning, the tags are read with readTags and the table is
// initialised with init, which receives this type's advance method.
func newFloatTable(
	done chan struct{},
	cur cursors.FloatArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *floatTable {
	t := &floatTable{
		table: newTable(done, bounds, key, cols, defs, cache, alloc),
		cur:   cur,
		alloc: alloc,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Close closes the underlying cursor and clears the reference to it.
// Calling Close again after the cursor has been cleared is a no-op.
func (t *floatTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.cur == nil {
		return
	}
	t.cur.Close()
	t.cur = nil
}
// Statistics reports the scanned value and byte counts of the current
// cursor. It returns zero statistics once the cursor has been cleared.
func (t *floatTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	var out cursors.CursorStats
	if t.cur != nil {
		stats := t.cur.Stats()
		out.ScannedValues = stats.ScannedValues
		out.ScannedBytes = stats.ScannedBytes
	}
	return out
}
// Do hands f to the shared do helper together with this type's advance
// method.
func (t *floatTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance reads the next array from the cursor and fills a column buffer
// with its timestamps, values, tags and bounds. It returns false when the
// cursor yields an empty array.
func (t *floatTable) advance() bool {
	arr := t.cur.Next()
	n := arr.Len()
	if n == 0 {
		return false
	}

	// allocateBuffer hands back the reusable buffer when possible; a new
	// one is allocated if the previous buffer is still retained elsewhere.
	buf := t.allocateBuffer(n)
	buf.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	buf.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(buf)
	t.appendBounds(buf)
	return true
}
// window table

// floatWindowTable is a floatTable whose output rows are aligned to the
// windows of an interval.Window.
type floatWindowTable struct {
floatTable
// arr is the array currently being consumed; nextBuffer refills it.
arr *cursors.FloatArray
// windowBounds is the next window to emit when createEmpty is set.
windowBounds interval.Bounds
// idxInArr is the index of the next unread point in arr.
idxInArr int
// createEmpty makes createNextBufferTimes generate every window in the
// range instead of deriving windows from the cursor's timestamps.
createEmpty bool
// timeColumn selects whether the start or the stop time fills the time
// column; when empty both start and stop columns are written.
timeColumn string
// isAggregate selects the comparison used by isInWindow.
isAggregate bool
window interval.Window
}
// newFloatWindowTable builds a floatWindowTable that reads from cur and
// aligns its rows to window.
//
// done, bounds, key, cols, defs, cache and alloc are forwarded to newTable.
// alloc is also stored on the embedded floatTable: t.alloc in this type's
// methods resolves to floatTable.alloc, so leaving it unset passed a nil
// allocator to the arrow builders.
//
// When createEmpty is set, the first window is positioned at the window
// containing bounds.Start. Before returning, the tags are read and the
// table is initialised with this type's advance method.
func newFloatWindowTable(
	done chan struct{},
	cur cursors.FloatArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	createEmpty bool,
	timeColumn string,
	isAggregate bool,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *floatWindowTable {
	t := &floatWindowTable{
		floatTable: floatTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
			alloc: alloc,
		},
		window:      window,
		createEmpty: createEmpty,
		timeColumn:  timeColumn,
		isAggregate: isAggregate,
	}
	if t.createEmpty {
		start := int64(bounds.Start)
		t.windowBounds = window.GetLatestBounds(values.Time(start))
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do hands f to the shared do helper together with the window table's own
// advance method.
func (t *floatWindowTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// createNextBufferTimes produces the start and stop time columns for the
// next buffer. With createEmpty set, the windows are generated from
// windowBounds up to the end of the range; otherwise they are derived from
// the timestamps of the next cursor array. ok is false when there is
// nothing left to emit.
func (t *floatWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) {
	startB, stopB := arrow.NewIntBuilder(t.alloc), arrow.NewIntBuilder(t.alloc)

	if !t.createEmpty {
		// Fetch the next buffer so its timestamps can be copied.
		if !t.nextBuffer() {
			return nil, nil, false
		}

		// Each timestamp is mapped to the window preceding the latest
		// window that contains it, clipped to the table's range.
		n := len(t.arr.Timestamps)
		startB.Resize(n)
		stopB.Resize(n)
		for _, ts := range t.arr.Timestamps {
			latest := t.window.GetLatestBounds(values.Time(ts))
			lo, hi := t.getWindowBoundsFor(t.window.PrevBounds(latest))
			startB.Append(lo)
			stopB.Append(hi)
		}
		return startB.NewIntArray(), stopB.NewIntArray(), true
	}

	// No windows remain once the window start reaches the range stop.
	rangeStop := int64(t.bounds.Stop)
	if int64(t.windowBounds.Start()) >= rangeStop {
		return nil, nil, false
	}

	// Create a buffer with the buffer size.
	// TODO(jsternberg): Calculate the exact size with max points as the maximum.
	startB.Resize(storage.MaxPointsPerBlock)
	stopB.Resize(storage.MaxPointsPerBlock)
	for {
		lo, hi := t.getWindowBoundsFor(t.windowBounds)
		if lo >= rangeStop {
			break
		}
		startB.Append(lo)
		stopB.Append(hi)
		t.windowBounds = t.window.NextBounds(t.windowBounds)
	}
	return startB.NewIntArray(), stopB.NewIntArray(), true
}
// getWindowBoundsFor returns the start and stop of bounds, clipped so that
// neither falls outside the table's own range.
func (t *floatWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) {
	rangeStart, rangeStop := int64(t.bounds.Start), int64(t.bounds.Stop)
	lo, hi := int64(bounds.Start()), int64(bounds.Stop())
	if lo < rangeStart {
		lo = rangeStart
	}
	if hi > rangeStop {
		hi = rangeStop
	}
	return lo, hi
}
// nextAt consumes and returns the next buffered value if it belongs to the
// window identified by stop. When no buffer is available, or the next
// point is outside that window, it returns the zero value and false and
// leaves the read position untouched.
func (t *floatWindowTable) nextAt(stop int64) (v float64, ok bool) {
	if !t.nextBuffer() {
		return 0, false
	}
	i := t.idxInArr
	if !t.isInWindow(stop, t.arr.Timestamps[i]) {
		return 0, false
	}
	v = t.arr.Values[i]
	t.idxInArr++
	return v, true
}
// isInWindow reports whether ts may be used for the window denoted by the
// stop timestamp. The stop may be a truncated stop time because of a
// restricted boundary, so the window is looked up from stop-1.
//
// When used with an aggregate, ts is the true stop time returned by
// storage, so the window is treated as (start, stop]. When used with a
// selector, ts is the real time of the point, so the window is treated as
// [start, stop).
func (t *floatWindowTable) isInWindow(stop int64, ts int64) bool {
	// The boundary for the previous nanosecond is the window that ends
	// at (or was truncated to) stop.
	b := t.window.GetLatestBounds(values.Time(stop - 1))
	lo, hi := int64(b.Start()), int64(b.Stop())
	if !t.isAggregate {
		return lo <= ts && ts < hi
	}
	return lo < ts && ts <= hi
}
// nextBuffer makes sure arr holds at least one unread value, pulling a new
// array from the cursor once the current one is used up. It returns false
// when the cursor yields an empty array.
func (t *floatWindowTable) nextBuffer() bool {
	if t.arr != nil {
		if t.idxInArr < t.arr.Len() {
			return true
		}
		// The current array has been fully consumed.
		t.arr = nil
	}

	next := t.cur.Next()
	if next.Len() == 0 {
		return false
	}
	t.arr = next
	t.idxInArr = 0
	return true
}
// appendValues walks the given window stop times in order. For each one it
// calls appendValue with the matching buffered value, or appendNull when
// nextAt finds none.
func (t *floatWindowTable) appendValues(intervals []int64, appendValue func(v float64), appendNull func()) {
	for _, stop := range intervals {
		v, ok := t.nextAt(stop)
		if !ok {
			appendNull()
			continue
		}
		appendValue(v)
	}
}
// advance builds the next column buffer of windowed values. It returns
// false when no buffer is available or no window times can be produced.
func (t *floatWindowTable) advance() bool {
	if !t.nextBuffer() {
		return false
	}

	// Build the window times for this buffer.
	start, stop, ok := t.createNextBufferTimes()
	if !ok {
		return false
	}
	vals := t.mergeValues(stop.Int64Values())

	// allocateBuffer hands back the reusable buffer when possible; a new
	// one is allocated if the previous buffer is still retained elsewhere.
	cr := t.allocateBuffer(stop.Len())

	if t.timeColumn == "" {
		// No time column: emit both window edges alongside the values.
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[valueColIdxWithoutTime] = vals
		t.appendTags(cr)
		return true
	}

	// Only one window edge is kept as the time column; the other array
	// is released.
	if t.timeColumn == execute.DefaultStopColLabel {
		cr.cols[timeColIdx] = stop
		start.Release()
	} else if t.timeColumn == execute.DefaultStartColLabel {
		cr.cols[timeColIdx] = start
		stop.Release()
	}
	cr.cols[valueColIdx] = vals
	t.appendBounds(cr)
	t.appendTags(cr)
	return true
}
// floatWindowSelectorTable emits one row per point returned by the cursor
// and annotates it with the window that contains the point.
// This table implementation will not have any empty windows.
type floatWindowSelectorTable struct {
floatTable
// timeColumn selects whether the window start or stop fills the time
// column; any other value keeps start, stop and the point time.
timeColumn string
window interval.Window
}
// newFloatWindowSelectorTable builds a floatWindowSelectorTable that reads
// from cur and labels each point with its window.
//
// done, bounds, key, cols, defs, cache and alloc are forwarded to newTable.
// alloc is also stored on the embedded floatTable: t.alloc in this type's
// methods resolves to floatTable.alloc, so leaving it unset passed a nil
// allocator to the arrow builders.
//
// Before returning, the tags are read and the table is initialised with
// this type's advance method.
func newFloatWindowSelectorTable(
	done chan struct{},
	cur cursors.FloatArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *floatWindowSelectorTable {
	t := &floatWindowSelectorTable{
		floatTable: floatTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
			alloc: alloc,
		},
		window:     window,
		timeColumn: timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do hands f to the shared do helper together with the selector table's
// own advance method.
func (t *floatWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance reads the next array from the cursor and fills a column buffer
// with one row per point. Depending on timeColumn, the time column holds
// the window start, the window stop, or the point's own timestamp (in
// which case the start and stop columns are written as well). It returns
// false when the cursor yields an empty array.
func (t *floatWindowSelectorTable) advance() bool {
	arr := t.cur.Next()
	n := arr.Len()
	if n == 0 {
		return false
	}

	cr := t.allocateBuffer(n)
	if col := t.timeColumn; col == execute.DefaultStartColLabel {
		cr.cols[timeColIdx] = t.startTimes(arr)
		t.appendBounds(cr)
	} else if col == execute.DefaultStopColLabel {
		cr.cols[timeColIdx] = t.stopTimes(arr)
		t.appendBounds(cr)
	} else {
		cr.cols[startColIdx] = t.startTimes(arr)
		cr.cols[stopColIdx] = t.stopTimes(arr)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	}
	cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(cr)
	return true
}
// startTimes returns, for every timestamp in arr, the start of the window
// containing it, raised to the range start when the window begins earlier.
func (t *floatWindowSelectorTable) startTimes(arr *cursors.FloatArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	lower := int64(t.bounds.Start)
	for _, ts := range arr.Timestamps {
		ws := int64(t.window.GetLatestBounds(values.Time(ts)).Start())
		if ws < lower {
			ws = lower
		}
		b.Append(ws)
	}
	return b.NewIntArray()
}
// stopTimes returns, for every timestamp in arr, the stop of the window
// containing it, lowered to the range stop when the window ends later.
func (t *floatWindowSelectorTable) stopTimes(arr *cursors.FloatArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	upper := int64(t.bounds.Stop)
	for _, ts := range arr.Timestamps {
		ws := int64(t.window.GetLatestBounds(values.Time(ts)).Stop())
		if ws > upper {
			ws = upper
		}
		b.Append(ws)
	}
	return b.NewIntArray()
}
// floatEmptyWindowSelectorTable emits one row per window in the range,
// using null values for windows without a point.
// This table implementation may contain empty windows
// in addition to non-empty windows.
type floatEmptyWindowSelectorTable struct {
floatTable
// arr is the array currently being consumed from the cursor.
arr *cursors.FloatArray
// idx is the index of the next unread point in arr.
idx int
// rangeStart and rangeStop are the table bounds as raw int64 values.
rangeStart int64
rangeStop int64
// windowBounds is the next window to emit.
windowBounds interval.Bounds
// timeColumn selects whether the window start or stop fills the time
// column; any other value keeps start, stop and the point time.
timeColumn string
window interval.Window
}
// newFloatEmptyWindowSelectorTable builds a table that emits a row for
// every window in bounds, reading points from cur.
//
// done, bounds, key, cols, defs, cache and alloc are forwarded to newTable.
// alloc is also stored on the embedded floatTable: t.alloc in this type's
// methods resolves to floatTable.alloc, so leaving it unset passed a nil
// allocator to the arrow builders.
//
// The first array is read from cur immediately and the first window is
// the one containing bounds.Start. Before returning, the tags are read and
// the table is initialised with this type's advance method.
func newFloatEmptyWindowSelectorTable(
	done chan struct{},
	cur cursors.FloatArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *floatEmptyWindowSelectorTable {
	rangeStart := int64(bounds.Start)
	rangeStop := int64(bounds.Stop)
	t := &floatEmptyWindowSelectorTable{
		floatTable: floatTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
			alloc: alloc,
		},
		arr:          cur.Next(),
		rangeStart:   rangeStart,
		rangeStop:    rangeStop,
		windowBounds: window.GetLatestBounds(values.Time(rangeStart)),
		window:       window,
		timeColumn:   timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do hands f to the shared do helper together with this table's own
// advance method.
func (t *floatEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance builds the next column buffer of per-window rows. The time
// helpers fill the value builder while producing the time columns, so the
// buffer is sized from the length of the time column they return. It
// returns false when the current array is empty.
func (t *floatEmptyWindowSelectorTable) advance() bool {
	if t.arr.Len() == 0 {
		return false
	}

	vb := t.arrowBuilder()
	vb.Resize(storage.MaxPointsPerBlock)

	var cr *colReader
	if col := t.timeColumn; col == execute.DefaultStartColLabel {
		times := t.startTimes(vb)
		cr = t.allocateBuffer(times.Len())
		cr.cols[timeColIdx] = times
		t.appendBounds(cr)
	} else if col == execute.DefaultStopColLabel {
		times := t.stopTimes(vb)
		cr = t.allocateBuffer(times.Len())
		cr.cols[timeColIdx] = times
		t.appendBounds(cr)
	} else {
		start, stop, times := t.startStopTimes(vb)
		cr = t.allocateBuffer(times.Len())
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[timeColIdx] = times
	}

	cr.cols[valueColIdx] = vb.NewFloatArray()
	t.appendTags(cr)
	return true
}
// startTimes emits the start time of every remaining window, up to
// storage.MaxPointsPerBlock windows per call, and appends to builder the
// matching point value or a null for each of them. It advances
// windowBounds and the cursor position as it goes.
func (t *floatEmptyWindowSelectorTable) startTimes(builder *array.FloatBuilder) *array.Int {
	starts := arrow.NewIntBuilder(t.alloc)
	starts.Resize(storage.MaxPointsPerBlock)

	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The first window is clipped to the beginning of the range.
		if winStart < t.rangeStart {
			starts.Append(t.rangeStart)
		} else {
			starts.Append(winStart)
		}

		// With no data left, use a timestamp that matches no window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// A point inside the window supplies the value; otherwise the
		// window is empty and gets a null.
		if winStart <= ts && ts < winStop {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// Fetch the next array once a non-empty one is fully consumed.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if starts.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return starts.NewIntArray()
}
// stopTimes emits the stop time of every remaining window, up to
// storage.MaxPointsPerBlock windows per call, and appends to builder the
// matching point value or a null for each of them. It advances
// windowBounds and the cursor position as it goes.
func (t *floatEmptyWindowSelectorTable) stopTimes(builder *array.FloatBuilder) *array.Int {
	stops := arrow.NewIntBuilder(t.alloc)
	stops.Resize(storage.MaxPointsPerBlock)

	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The last window is clipped to the end of the range.
		if winStop > t.rangeStop {
			stops.Append(t.rangeStop)
		} else {
			stops.Append(winStop)
		}

		// With no data left, use a timestamp that matches no window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// A point inside the window supplies the value; otherwise the
		// window is empty and gets a null.
		if winStart <= ts && ts < winStop {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// Fetch the next array once a non-empty one is fully consumed.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if stops.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return stops.NewIntArray()
}
// startStopTimes emits, for every remaining window (up to
// storage.MaxPointsPerBlock per call), its clipped start, its clipped stop
// and the timestamp of the point selected in it. Empty windows get a null
// time and a null value in builder. The arrays are returned in the order
// start, stop, time.
func (t *floatEmptyWindowSelectorTable) startStopTimes(builder *array.FloatBuilder) (*array.Int, *array.Int, *array.Int) {
	starts := arrow.NewIntBuilder(t.alloc)
	starts.Resize(storage.MaxPointsPerBlock)
	stops := arrow.NewIntBuilder(t.alloc)
	stops.Resize(storage.MaxPointsPerBlock)
	times := arrow.NewIntBuilder(t.alloc)
	times.Resize(storage.MaxPointsPerBlock)

	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The first window is clipped to the beginning of the range.
		if winStart < t.rangeStart {
			starts.Append(t.rangeStart)
		} else {
			starts.Append(winStart)
		}

		// The last window is clipped to the end of the range.
		if winStop > t.rangeStop {
			stops.Append(t.rangeStop)
		} else {
			stops.Append(winStop)
		}

		// With no data left, use a timestamp that matches no window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// A point inside the window supplies time and value; otherwise
		// the window is empty and both are null.
		if winStart <= ts && ts < winStop {
			times.Append(ts)
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			times.AppendNull()
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// Fetch the next array once a non-empty one is fully consumed.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if times.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return starts.NewIntArray(), stops.NewIntArray(), times.NewIntArray()
}
// group table

// floatGroupTable reads float points from every series cursor produced by
// a group cursor, optionally reducing them with the group's aggregate.
type floatGroupTable struct {
table
// mu is held by Close while it releases cur and gc.
mu sync.Mutex
// gc yields the series cursors of the group; Close sets it to nil.
gc storage.GroupCursor
// cur is the series cursor currently being read; it is nil once the
// group is exhausted or the table is closed.
cur cursors.FloatArrayCursor
}
// newFloatGroupTable builds a floatGroupTable that starts reading from cur
// and moves on to the remaining cursors of gc. done, bounds, key, cols,
// defs, cache and alloc are forwarded to newTable. Before returning, the
// tags are read and the table is initialised with this type's advance
// method.
func newFloatGroupTable(
	done chan struct{},
	gc storage.GroupCursor,
	cur cursors.FloatArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *floatGroupTable {
	t := new(floatGroupTable)
	t.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	t.gc = gc
	t.cur = cur

	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Close closes the current series cursor and then the group cursor,
// clearing each reference. Resources that were already cleared are
// skipped.
func (t *floatGroupTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if cur := t.cur; cur != nil {
		cur.Close()
		t.cur = nil
	}
	if gc := t.gc; gc != nil {
		gc.Close()
		t.gc = nil
	}
}
// Do hands f to the shared do helper together with the group table's own
// advance method.
func (t *floatGroupTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance produces the next column buffer for the group.
//
// Without an aggregate, each call publishes one non-empty array, moving to
// the next series cursor whenever the current one runs dry. With an
// aggregate, the first call drains every series of the group into a single
// row; that leaves t.cur nil, so the following call returns false.
func (t *floatGroupTable) advance() bool {
	if t.cur == nil {
		// For group aggregates, all series and all of their buffers are
		// consumed and merged into one row on the first call. By the end
		// of that call advanceCursor has returned false and t.cur is nil,
		// yet the call itself returned true because there was a row to
		// deliver. This is the call after that one.
		return false
	}

	// Find the next non-empty array, switching series as needed.
	arr := t.cur.Next()
	for arr.Len() == 0 {
		if !t.advanceCursor() {
			return false
		}
		arr = t.cur.Next()
	}
	n := arr.Len()

	// Group without aggregate: publish the array as-is.
	if t.gc.Aggregate() == nil {
		// allocateBuffer hands back the reusable buffer when possible; a
		// new one is allocated if the previous buffer is still retained.
		cr := t.allocateBuffer(n)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
		cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
		t.appendTags(cr)
		t.appendBounds(cr)
		return true
	}

	acc, err := makeFloatAggregateAccumulator(t.gc.Aggregate().Type)
	if err != nil {
		t.err = err
		return false
	}

	// Fold every remaining array of every remaining series.
	acc.AccumulateFirst(arr.Timestamps, arr.Values, t.tags)
	for {
		if arr = t.cur.Next(); arr.Len() == 0 {
			if !t.advanceCursor() {
				break
			}
			continue
		}
		acc.AccumulateMore(arr.Timestamps, arr.Values, t.tags)
	}

	timestamp, value, tags := acc.Result()

	cr := t.allocateBuffer(1)
	if IsSelector(t.gc.Aggregate()) {
		cr.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
		cr.cols[valueColIdx] = t.toArrowBuffer([]float64{value})
	} else {
		cr.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]float64{value})
	}
	t.appendTheseTags(cr, tags)
	t.appendBounds(cr)
	return true
}
// FloatAggregateAccumulator reduces a sequence of float arrays, each
// supplied with the tags of the series it came from, to a single
// (timestamp, value, tags) result.
type FloatAggregateAccumulator interface {
// AccumulateFirst receives an initial array of items to select from.
// It selects an item and stores the state. Afterwards, more data can
// be supplied with AccumulateMore and the results can be requested at
// any time. Without a call to AccumulateFirst the results are not
// defined.
AccumulateFirst(timestamps []int64, values []float64, tags [][]byte)
// AccumulateMore receives additional array elements to select from.
AccumulateMore(timestamps []int64, values []float64, tags [][]byte)
// Result returns the item selected from the data received so far.
Result() (int64, float64, [][]byte)
}
// floatSelectorMethod is the signature shared by the group selectors.
//
// The selector method takes a ( timestamp, value ) pair, a
// ( []timestamp, []value ) pair, and a starting index. It applies the selector
// to the single value and the array, starting at the supplied index. It
// returns -1 if the single value is selected and a non-negative value if an
// item from the array is selected.
type floatSelectorMethod func(int64, float64, []int64, []float64, int) int
// The selector accumulator tracks currently-selected item.
type floatSelectorAccumulator struct {
// selector decides whether a new array contains a better item.
selector floatSelectorMethod
// ts and v are the timestamp and value of the selected item.
ts int64
v float64
// tags is a private copy of the tags supplied with the selected item.
tags [][]byte
}
// AccumulateFirst seeds the accumulator from the first array. Element 0 is
// the initial candidate and the selector is run over the remaining
// elements; the winner and a copy of tags are stored.
func (a *floatSelectorAccumulator) AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) {
	chosen := a.selector(timestamps[0], values[0], timestamps, values, 1)
	if chosen < 0 {
		// The seed element itself was selected.
		chosen = 0
	}
	a.ts = timestamps[chosen]
	a.v = values[chosen]

	a.tags = append(make([][]byte, 0, len(tags)), tags...)
}
// AccumulateMore runs the selector over a further array against the
// currently selected item. When an array element wins, it replaces the
// stored item and tags is copied into the accumulator, reusing the
// existing tag slice when its capacity allows.
func (a *floatSelectorAccumulator) AccumulateMore(timestamps []int64, values []float64, tags [][]byte) {
	chosen := a.selector(a.ts, a.v, timestamps, values, 0)
	if chosen < 0 {
		// The current selection stands.
		return
	}
	a.ts = timestamps[chosen]
	a.v = values[chosen]

	if n := len(tags); n <= cap(a.tags) {
		a.tags = a.tags[:n]
	} else {
		a.tags = make([][]byte, n)
	}
	copy(a.tags, tags)
}
// Result returns the timestamp, value and tags of the selected item.
func (a *floatSelectorAccumulator) Result() (int64, float64, [][]byte) {
	ts, v, tags := a.ts, a.v, a.tags
	return ts, v, tags
}
// floatAggregateMethod is the signature shared by the group aggregates.
//
// The aggregate method takes a value, an array of values, and a starting
// index, applies an aggregate operation over the value and the array, starting
// at the given index, and returns the result.
type floatAggregateMethod func(float64, []float64, int) float64
// floatAggregateAccumulator keeps the running result of a pure aggregate.
type floatAggregateAccumulator struct {
// aggregate folds an array into the running value.
aggregate floatAggregateMethod
// accum is the running value.
accum float64
// For pure aggregates it doesn't matter what we return for tags, but
// we need to satisfy the interface. We will just return the most
// recently seen tags.
tags [][]byte
}
// AccumulateFirst seeds the running value with element 0 and folds in the
// remaining elements. tags is stored as given, without copying.
func (a *floatAggregateAccumulator) AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) {
	seed := values[0]
	a.accum = a.aggregate(seed, values, 1)
	a.tags = tags
}
// AccumulateMore folds every element of values into the running value.
// tags replaces the stored tags, without copying.
func (a *floatAggregateAccumulator) AccumulateMore(timestamps []int64, values []float64, tags [][]byte) {
	running := a.accum
	a.accum = a.aggregate(running, values, 0)
	a.tags = tags
}
// Result returns the running value with the most recently seen tags.
//
// For group aggregates (non-selectors) the timestamp is always
// math.MaxInt64. Their final result does not contain _time, so this
// timestamp value can be anything and it won't matter.
func (a *floatAggregateAccumulator) Result() (int64, float64, [][]byte) {
	const noTimestamp int64 = math.MaxInt64
	return noTimestamp, a.accum, a.tags
}
// makeFloatAggregateAccumulator returns the accumulator used to combine
// the points of all series within one group. The incoming points are the
// per-series results, so the returned value aggregates the aggregates.
//
// First, last, min and max are selectors; sum is a pure aggregate. Count
// and any unrecognised type yield an EInvalid error.
func makeFloatAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (FloatAggregateAccumulator, error) {
	var selector floatSelectorMethod

	switch agg {
	case datatypes.Aggregate_AggregateTypeFirst:
		selector = selectorFirstGroupsFloat
	case datatypes.Aggregate_AggregateTypeLast:
		selector = selectorLastGroupsFloat
	case datatypes.Aggregate_AggregateTypeMin:
		selector = selectorMinGroupsFloat
	case datatypes.Aggregate_AggregateTypeMax:
		selector = selectorMaxGroupsFloat
	case datatypes.Aggregate_AggregateTypeSum:
		return &floatAggregateAccumulator{aggregate: aggregateSumGroupsFloat}, nil
	case datatypes.Aggregate_AggregateTypeCount:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate count: Float",
		}
	default:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg),
		}
	}

	return &floatSelectorAccumulator{selector: selector}, nil
}
// selectorMinGroupsFloat scans values from index i and returns the index
// of the first occurrence of the smallest element that is strictly less
// than v, or -1 when v is not beaten. ts and timestamps are unused.
func selectorMinGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int {
	best := -1
	for j := i; j < len(values); j++ {
		if candidate := values[j]; candidate < v {
			best, v = j, candidate
		}
	}
	return best
}
// selectorMaxGroupsFloat scans values from index i and returns the index
// of the first occurrence of the largest element that is strictly greater
// than v, or -1 when v is not beaten. ts and timestamps are unused.
func selectorMaxGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int {
	best := -1
	for j := i; j < len(values); j++ {
		if candidate := values[j]; candidate > v {
			best, v = j, candidate
		}
	}
	return best
}
// aggregateSumGroupsFloat adds the elements of values, starting at index
// i, to sum and returns the total.
func aggregateSumGroupsFloat(sum float64, values []float64, i int) float64 {
	total := sum
	for j := i; j < len(values); j++ {
		total += values[j]
	}
	return total
}
// selectorFirstGroupsFloat scans timestamps from index i, bounded by
// len(values), and returns the index of the first occurrence of the
// earliest timestamp that is strictly earlier than ts, or -1 when none is.
// v is unused.
func selectorFirstGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int {
	best := -1
	for j := i; j < len(values); j++ {
		if candidate := timestamps[j]; candidate < ts {
			best, ts = j, candidate
		}
	}
	return best
}
// selectorLastGroupsFloat scans timestamps from index i, bounded by
// len(values), and returns the index of the latest timestamp that is not
// earlier than ts, preferring the last one among equal timestamps, or -1
// when none qualifies. v is unused.
func selectorLastGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int {
	best := -1
	for j := i; j < len(values); j++ {
		if candidate := timestamps[j]; candidate >= ts {
			best, ts = j, candidate
		}
	}
	return best
}
// advanceCursor closes the current series cursor and moves to the next
// non-nil cursor of the group. On success the tags of the new series are
// read and the function returns true. It returns false when the group is
// exhausted, or when the next cursor is not a float cursor, in which case
// that cursor is closed and t.err is set.
func (t *floatGroupTable) advanceCursor() bool {
	t.cur.Close()
	t.cur = nil

	for t.gc.Next() {
		cur := t.gc.Cursor()
		if cur == nil {
			continue
		}

		typedCur, ok := cur.(cursors.FloatArrayCursor)
		if ok {
			t.readTags(t.gc.Tags())
			t.cur = typedCur
			return true
		}

		// TODO(sgc): error or skip?
		cur.Close()
		t.err = &errors.Error{
			Code: errors.EInvalid,
			Err: &GroupCursorError{
				typ:    "float",
				cursor: cur,
			},
		}
		return false
	}
	return false
}
// Statistics reports the scanned value and byte counts of the series
// cursor currently being read. It returns zero statistics when there is no
// current cursor.
//
// The mutex is held while t.cur is inspected, matching Close on this type
// (which clears t.cur under the same lock) and floatTable.Statistics.
func (t *floatGroupTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.cur == nil {
		return cursors.CursorStats{}
	}
	cs := t.cur.Stats()
	return cursors.CursorStats{
		ScannedValues: cs.ScannedValues,
		ScannedBytes:  cs.ScannedBytes,
	}
}
//
// *********** Integer ***********
//
// integerTable exposes the integer points read from a single array cursor
// as column buffers. It embeds the shared table state.
type integerTable struct {
table
// mu is held by Close and Statistics while they access cur.
mu sync.Mutex
// cur is the source cursor; Close closes it and resets it to nil.
cur cursors.IntegerArrayCursor
// alloc is the allocator that this type's methods hand to the arrow
// constructors (see advance).
alloc *memory.Allocator
}
// newIntegerTable builds an integerTable that reads its points from cur.
//
// done, bounds, key, cols, defs, cache and alloc are forwarded to newTable.
// alloc is additionally stored on the integerTable itself: the type
// declares its own alloc field, and that field is the one methods such as
// advance read through t.alloc. Leaving it unset handed a nil allocator to
// the arrow constructors.
//
// Before returning, the tags are read with readTags and the table is
// initialised with init, which receives this type's advance method.
func newIntegerTable(
	done chan struct{},
	cur cursors.IntegerArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *integerTable {
	t := &integerTable{
		table: newTable(done, bounds, key, cols, defs, cache, alloc),
		cur:   cur,
		alloc: alloc,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Close closes the underlying cursor and clears the reference to it.
// Calling Close again after the cursor has been cleared is a no-op.
func (t *integerTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.cur == nil {
		return
	}
	t.cur.Close()
	t.cur = nil
}
// Statistics reports the scanned value and byte counts of the current
// cursor. It returns zero statistics once the cursor has been cleared.
func (t *integerTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	var out cursors.CursorStats
	if t.cur != nil {
		stats := t.cur.Stats()
		out.ScannedValues = stats.ScannedValues
		out.ScannedBytes = stats.ScannedBytes
	}
	return out
}
// Do hands f to the shared do helper together with this type's advance
// method.
func (t *integerTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance reads the next array from the cursor and fills a column buffer
// with its timestamps, values, tags and bounds. It returns false when the
// cursor yields an empty array.
func (t *integerTable) advance() bool {
	arr := t.cur.Next()
	n := arr.Len()
	if n == 0 {
		return false
	}

	// allocateBuffer hands back the reusable buffer when possible; a new
	// one is allocated if the previous buffer is still retained elsewhere.
	buf := t.allocateBuffer(n)
	buf.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	buf.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(buf)
	t.appendBounds(buf)
	return true
}
// window table

// integerWindowTable is an integerTable whose output rows are aligned to
// the windows of an interval.Window.
type integerWindowTable struct {
integerTable
// arr is the array currently being consumed; nextBuffer refills it.
arr *cursors.IntegerArray
// windowBounds is the next window to emit when createEmpty is set.
windowBounds interval.Bounds
// idxInArr is the index of the next unread point in arr.
idxInArr int
// createEmpty makes createNextBufferTimes generate every window in the
// range instead of deriving windows from the cursor's timestamps.
createEmpty bool
// timeColumn selects whether the start or the stop time fills the time
// column; when empty both start and stop columns are written.
timeColumn string
// isAggregate selects the comparison used by isInWindow.
isAggregate bool
window interval.Window
// fillValue is stored by the constructor.
// NOTE(review): presumably the value substituted for empty windows —
// its use is not visible in this part of the file; confirm.
fillValue *int64
}
// newIntegerWindowTable builds an integerWindowTable that reads from cur
// and aligns its rows to window.
//
// done, bounds, key, cols, defs, cache and alloc are forwarded to newTable.
// alloc is also stored on the embedded integerTable: t.alloc in this
// type's methods resolves to integerTable.alloc, so leaving it unset
// passed a nil allocator to the arrow builders.
//
// When createEmpty is set, the first window is positioned at the window
// containing bounds.Start. Before returning, the tags are read and the
// table is initialised with this type's advance method.
func newIntegerWindowTable(
	done chan struct{},
	cur cursors.IntegerArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	createEmpty bool,
	timeColumn string,
	isAggregate bool,
	fillValue *int64,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *integerWindowTable {
	t := &integerWindowTable{
		integerTable: integerTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
			alloc: alloc,
		},
		window:      window,
		createEmpty: createEmpty,
		timeColumn:  timeColumn,
		isAggregate: isAggregate,
		fillValue:   fillValue,
	}
	if t.createEmpty {
		start := int64(bounds.Start)
		t.windowBounds = window.GetLatestBounds(values.Time(start))
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do hands f to the shared do helper together with the window table's own
// advance method.
func (t *integerWindowTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// createNextBufferTimes produces the start and stop time columns for the
// next buffer. With createEmpty set, the windows are generated from
// windowBounds up to the end of the range; otherwise they are derived from
// the timestamps of the next cursor array. ok is false when there is
// nothing left to emit.
func (t *integerWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) {
	startB, stopB := arrow.NewIntBuilder(t.alloc), arrow.NewIntBuilder(t.alloc)

	if !t.createEmpty {
		// Fetch the next buffer so its timestamps can be copied.
		if !t.nextBuffer() {
			return nil, nil, false
		}

		// Each timestamp is mapped to the window preceding the latest
		// window that contains it, clipped to the table's range.
		n := len(t.arr.Timestamps)
		startB.Resize(n)
		stopB.Resize(n)
		for _, ts := range t.arr.Timestamps {
			latest := t.window.GetLatestBounds(values.Time(ts))
			lo, hi := t.getWindowBoundsFor(t.window.PrevBounds(latest))
			startB.Append(lo)
			stopB.Append(hi)
		}
		return startB.NewIntArray(), stopB.NewIntArray(), true
	}

	// No windows remain once the window start reaches the range stop.
	rangeStop := int64(t.bounds.Stop)
	if int64(t.windowBounds.Start()) >= rangeStop {
		return nil, nil, false
	}

	// Create a buffer with the buffer size.
	// TODO(jsternberg): Calculate the exact size with max points as the maximum.
	startB.Resize(storage.MaxPointsPerBlock)
	stopB.Resize(storage.MaxPointsPerBlock)
	for {
		lo, hi := t.getWindowBoundsFor(t.windowBounds)
		if lo >= rangeStop {
			break
		}
		startB.Append(lo)
		stopB.Append(hi)
		t.windowBounds = t.window.NextBounds(t.windowBounds)
	}
	return startB.NewIntArray(), stopB.NewIntArray(), true
}
// getWindowBoundsFor returns the start and stop of bounds, clipped so that
// neither falls outside the table's own range.
func (t *integerWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) {
	rangeStart, rangeStop := int64(t.bounds.Start), int64(t.bounds.Stop)
	lo, hi := int64(bounds.Start()), int64(bounds.Stop())
	if lo < rangeStart {
		lo = rangeStart
	}
	if hi > rangeStop {
		hi = rangeStop
	}
	return lo, hi
}
// nextAt consumes and returns the next buffered value if it belongs to the
// window identified by stop. When no buffer is available, or the next
// point is outside that window, it returns the zero value and false and
// leaves the read position untouched.
func (t *integerWindowTable) nextAt(stop int64) (v int64, ok bool) {
	if !t.nextBuffer() {
		return 0, false
	}
	i := t.idxInArr
	if !t.isInWindow(stop, t.arr.Timestamps[i]) {
		return 0, false
	}
	v = t.arr.Values[i]
	t.idxInArr++
	return v, true
}
// isInWindow reports whether ts may be used for the window denoted by the
// stop timestamp. The stop may be a truncated stop time because of a
// restricted boundary, so the window is looked up from stop-1.
//
// When used with an aggregate, ts is the true stop time returned by
// storage, so the window is treated as (start, stop]. When used with a
// selector, ts is the real time of the point, so the window is treated as
// [start, stop).
func (t *integerWindowTable) isInWindow(stop int64, ts int64) bool {
	// The boundary for the previous nanosecond is the window that ends
	// at (or was truncated to) stop.
	b := t.window.GetLatestBounds(values.Time(stop - 1))
	lo, hi := int64(b.Start()), int64(b.Stop())
	if !t.isAggregate {
		return lo <= ts && ts < hi
	}
	return lo < ts && ts <= hi
}
// nextBuffer makes sure arr holds at least one unread value, pulling a new
// array from the cursor once the current one is used up. It returns false
// when the cursor yields an empty array.
func (t *integerWindowTable) nextBuffer() bool {
	if t.arr != nil {
		if t.idxInArr < t.arr.Len() {
			return true
		}
		// The current array has been fully consumed.
		t.arr = nil
	}

	next := t.cur.Next()
	if next.Len() == 0 {
		return false
	}
	t.arr = next
	t.idxInArr = 0
	return true
}
// appendValues walks the given window stop times in order. For each one it
// calls appendValue with the matching buffered value, or appendNull when
// nextAt finds none.
func (t *integerWindowTable) appendValues(intervals []int64, appendValue func(v int64), appendNull func()) {
	for _, stop := range intervals {
		v, ok := t.nextAt(stop)
		if !ok {
			appendNull()
			continue
		}
		appendValue(v)
	}
}
// advance builds the next column buffer of windowed values. It returns
// false when no buffer is available or no window times can be produced.
func (t *integerWindowTable) advance() bool {
	if !t.nextBuffer() {
		return false
	}

	// Build the window times for this buffer.
	start, stop, ok := t.createNextBufferTimes()
	if !ok {
		return false
	}
	vals := t.mergeValues(stop.Int64Values())

	// allocateBuffer hands back the reusable buffer when possible; a new
	// one is allocated if the previous buffer is still retained elsewhere.
	cr := t.allocateBuffer(stop.Len())

	if t.timeColumn == "" {
		// No time column: emit both window edges alongside the values.
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[valueColIdxWithoutTime] = vals
		t.appendTags(cr)
		return true
	}

	// Only one window edge is kept as the time column; the other array
	// is released.
	if t.timeColumn == execute.DefaultStopColLabel {
		cr.cols[timeColIdx] = stop
		start.Release()
	} else if t.timeColumn == execute.DefaultStartColLabel {
		cr.cols[timeColIdx] = start
		stop.Release()
	}
	cr.cols[valueColIdx] = vals
	t.appendBounds(cr)
	t.appendTags(cr)
	return true
}
// integerWindowSelectorTable emits one row per point returned by the
// cursor and annotates it with the window that contains the point.
// This table implementation will not have any empty windows.
type integerWindowSelectorTable struct {
integerTable
// timeColumn selects whether the window start or stop fills the time
// column; any other value keeps start, stop and the point time.
timeColumn string
window interval.Window
}
// newIntegerWindowSelectorTable builds a window selector table over cur,
// reads the series tags into it and initialises it with its own advance
// method.
func newIntegerWindowSelectorTable(
	done chan struct{},
	cur cursors.IntegerArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *integerWindowSelectorTable {
	t := &integerWindowSelectorTable{
		window:     window,
		timeColumn: timeColumn,
	}
	// Fill in the embedded integerTable.
	t.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	t.cur = cur

	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do hands f to the shared do helper together with this table's advance
// method and returns the error that do reports.
func (t *integerWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance reads the next array from the cursor and fills a column reader
// with it. It reports false when the cursor returns an empty array.
func (t *integerWindowSelectorTable) advance() bool {
	arr := t.cur.Next()
	n := arr.Len()
	if n == 0 {
		return false
	}

	cr := t.allocateBuffer(n)
	switch col := t.timeColumn; col {
	case execute.DefaultStartColLabel:
		// Time column carries the window start.
		cr.cols[timeColIdx] = t.startTimes(arr)
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		// Time column carries the window stop.
		cr.cols[timeColIdx] = t.stopTimes(arr)
		t.appendBounds(cr)
	default:
		// Emit both window boundaries plus the point's own timestamp.
		cr.cols[startColIdx] = t.startTimes(arr)
		cr.cols[stopColIdx] = t.stopTimes(arr)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	}
	cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(cr)
	return true
}
// startTimes returns, for every timestamp in arr, the start of the window
// that contains it, raised to the table's range start when the window
// begins earlier.
func (t *integerWindowSelectorTable) startTimes(arr *cursors.IntegerArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	lower := int64(t.bounds.Start)
	for _, ts := range arr.Timestamps {
		winStart := int64(t.window.GetLatestBounds(values.Time(ts)).Start())
		if winStart < lower {
			winStart = lower
		}
		b.Append(winStart)
	}
	return b.NewIntArray()
}
// stopTimes returns, for every timestamp in arr, the stop of the window
// that contains it, lowered to the table's range stop when the window
// ends later.
func (t *integerWindowSelectorTable) stopTimes(arr *cursors.IntegerArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	upper := int64(t.bounds.Stop)
	for _, ts := range arr.Timestamps {
		winStop := int64(t.window.GetLatestBounds(values.Time(ts)).Stop())
		if winStop > upper {
			winStop = upper
		}
		b.Append(winStop)
	}
	return b.NewIntArray()
}
// integerEmptyWindowSelectorTable walks consecutive windows across the
// query range and pairs each one with a point from the cursor when the
// point's timestamp falls inside it.
//
// This table implementation may contain empty windows
// in addition to non-empty windows.
type integerEmptyWindowSelectorTable struct {
integerTable
// arr is the array most recently returned by the cursor; it is replaced
// once every element has been consumed.
arr *cursors.IntegerArray
// idx is the position in arr of the next unread point.
idx int
// rangeStart and rangeStop are the table bounds as int64 timestamps.
rangeStart int64
rangeStop int64
// windowBounds is the next window to emit. It starts at the window
// returned by GetLatestBounds for rangeStart and moves forward with
// NextBounds.
windowBounds interval.Bounds
// timeColumn selects which time columns advance fills in.
timeColumn string
// window produces the successive window bounds.
window interval.Window
}
// newIntegerEmptyWindowSelectorTable builds an empty-window selector table
// over cur. The first array is read from the cursor right away and the
// first window is the one returned by GetLatestBounds for the range start.
func newIntegerEmptyWindowSelectorTable(
	done chan struct{},
	cur cursors.IntegerArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *integerEmptyWindowSelectorTable {
	t := &integerEmptyWindowSelectorTable{
		rangeStart: int64(bounds.Start),
		rangeStop:  int64(bounds.Stop),
		window:     window,
		timeColumn: timeColumn,
	}
	// Fill in the embedded integerTable, then prime the cursor and window.
	t.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	t.cur = cur
	t.arr = cur.Next()
	t.windowBounds = window.GetLatestBounds(values.Time(t.rangeStart))

	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do hands f to the shared do helper together with this table's advance
// method and returns the error that do reports.
func (t *integerEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance fills the next column reader with up to
// storage.MaxPointsPerBlock windows. It reports false once the current
// cursor array is empty.
func (t *integerEmptyWindowSelectorTable) advance() bool {
	if t.arr.Len() == 0 {
		return false
	}

	// The helpers below append one value (or null) per window to vb while
	// they build the corresponding time columns.
	vb := t.arrowBuilder()
	vb.Resize(storage.MaxPointsPerBlock)

	var cr *colReader
	switch t.timeColumn {
	case execute.DefaultStartColLabel:
		starts := t.startTimes(vb)
		cr = t.allocateBuffer(starts.Len())
		cr.cols[timeColIdx] = starts
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		stops := t.stopTimes(vb)
		cr = t.allocateBuffer(stops.Len())
		cr.cols[timeColIdx] = stops
		t.appendBounds(cr)
	default:
		starts, stops, times := t.startStopTimes(vb)
		cr = t.allocateBuffer(times.Len())
		cr.cols[startColIdx] = starts
		cr.cols[stopColIdx] = stops
		cr.cols[timeColIdx] = times
	}
	cr.cols[valueColIdx] = vb.NewIntArray()
	t.appendTags(cr)
	return true
}
// startTimes emits one start time per window, beginning at t.windowBounds
// and stopping at the range stop or after storage.MaxPointsPerBlock
// windows. For each window it also appends to builder either the current
// point's value, when its timestamp lies in the window, or a null.
func (t *integerEmptyWindowSelectorTable) startTimes(builder *array.IntBuilder) *array.Int {
	starts := arrow.NewIntBuilder(t.alloc)
	starts.Resize(storage.MaxPointsPerBlock)
	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The first window should start at the beginning of the time range.
		if winStart < t.rangeStart {
			starts.Append(t.rangeStart)
		} else {
			starts.Append(winStart)
		}

		// With no data left, use a timestamp that can match no window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// Consume the point if it falls in this window, otherwise
		// record a null for the window.
		if winStart <= ts && ts < winStop {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}
		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// A non-empty array that has been read in full is replaced by
		// the next one from the cursor.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}
		if starts.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return starts.NewIntArray()
}
// stopTimes emits one stop time per window, beginning at t.windowBounds
// and stopping at the range stop or after storage.MaxPointsPerBlock
// windows. For each window it also appends to builder either the current
// point's value, when its timestamp lies in the window, or a null.
func (t *integerEmptyWindowSelectorTable) stopTimes(builder *array.IntBuilder) *array.Int {
	stops := arrow.NewIntBuilder(t.alloc)
	stops.Resize(storage.MaxPointsPerBlock)
	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The last window should stop at the end of the time range.
		if winStop > t.rangeStop {
			stops.Append(t.rangeStop)
		} else {
			stops.Append(winStop)
		}

		// With no data left, use a timestamp that can match no window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// Consume the point if it falls in this window, otherwise
		// record a null for the window.
		if winStart <= ts && ts < winStop {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}
		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// A non-empty array that has been read in full is replaced by
		// the next one from the cursor.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}
		if stops.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return stops.NewIntArray()
}
// startStopTimes emits, per window, its start, its stop and the timestamp
// of the point selected for it (null when the window has no point). The
// point's value, or a null, is appended to builder in step. Iteration
// begins at t.windowBounds and ends at the range stop or after
// storage.MaxPointsPerBlock windows.
func (t *integerEmptyWindowSelectorTable) startStopTimes(builder *array.IntBuilder) (*array.Int, *array.Int, *array.Int) {
	starts := arrow.NewIntBuilder(t.alloc)
	starts.Resize(storage.MaxPointsPerBlock)
	stops := arrow.NewIntBuilder(t.alloc)
	stops.Resize(storage.MaxPointsPerBlock)
	times := arrow.NewIntBuilder(t.alloc)
	times.Resize(storage.MaxPointsPerBlock)
	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The first window should start at the beginning of the time range.
		if winStart < t.rangeStart {
			starts.Append(t.rangeStart)
		} else {
			starts.Append(winStart)
		}
		// The last window should stop at the end of the time range.
		if winStop > t.rangeStop {
			stops.Append(t.rangeStop)
		} else {
			stops.Append(winStop)
		}

		// With no data left, use a timestamp that can match no window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// Consume the point if it falls in this window, otherwise
		// record nulls for both the time and the value.
		if winStart <= ts && ts < winStop {
			times.Append(ts)
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			times.AppendNull()
			builder.AppendNull()
		}
		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// A non-empty array that has been read in full is replaced by
		// the next one from the cursor.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}
		if times.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return starts.NewIntArray(), stops.NewIntArray(), times.NewIntArray()
}
// group table

// integerGroupTable reads the integer series of one group, moving from one
// series cursor to the next through the group cursor.
type integerGroupTable struct {
table
// mu is held by Close while it releases and clears cur and gc.
mu sync.Mutex
// gc supplies the per-series cursors and the optional aggregate.
gc storage.GroupCursor
// cur is the cursor of the series being read; nil once closed or
// exhausted.
cur cursors.IntegerArrayCursor
}
// newIntegerGroupTable builds a group table that starts reading from cur
// and obtains further series cursors from gc. The series tags are read
// and the table is initialised with its own advance method.
func newIntegerGroupTable(
	done chan struct{},
	gc storage.GroupCursor,
	cur cursors.IntegerArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *integerGroupTable {
	t := &integerGroupTable{gc: gc, cur: cur}
	t.table = newTable(done, bounds, key, cols, defs, cache, alloc)

	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Close releases the current series cursor and the group cursor if they
// are still set, and clears both fields so a later call does nothing.
//
// The mutex is released with defer so that it is not left locked if one
// of the underlying Close calls panics.
func (t *integerGroupTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.cur != nil {
		t.cur.Close()
		t.cur = nil
	}
	if t.gc != nil {
		t.gc.Close()
		t.gc = nil
	}
}
// Do hands f to the shared do helper together with this table's advance
// method and returns the error that do reports.
func (t *integerGroupTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance fills the next column reader for the group.
//
// Without an aggregate each call emits one array of points from the
// current series. With an aggregate the first call drains every series of
// the group, folds them through an accumulator and emits a single row.
// Errors from building the accumulator are stored in t.err.
func (t *integerGroupTable) advance() bool {
	if t.cur == nil {
		// For group aggregates, we will try to get all the series and all table buffers within those series
		// all at once and merge them into one row when this advance() function is first called.
		// At the end of this process, t.advanceCursor() already returns false and t.cur becomes nil.
		// But we still need to return true to indicate that there is data to be returned.
		// The second time when we call this advance(), t.cur is already nil, so we directly return false.
		return false
	}

	// Move forward, across series if necessary, to the first non-empty array.
	arr := t.cur.Next()
	for arr.Len() == 0 {
		if !t.advanceCursor() {
			return false
		}
		arr = t.cur.Next()
	}
	n := arr.Len()

	// handle the group without aggregate case
	if t.gc.Aggregate() == nil {
		// Retrieve the buffer for the data to avoid allocating
		// additional slices. If the buffer is still being used
		// because the references were retained, then we will
		// allocate a new buffer.
		cr := t.allocateBuffer(n)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
		cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
		t.appendTags(cr)
		t.appendBounds(cr)
		return true
	}

	acc, err := makeIntegerAggregateAccumulator(t.gc.Aggregate().Type)
	if err != nil {
		t.err = err
		return false
	}

	// Seed the accumulator, then feed it every remaining array of every
	// remaining series in the group.
	acc.AccumulateFirst(arr.Timestamps, arr.Values, t.tags)
	for {
		if arr = t.cur.Next(); arr.Len() == 0 {
			if !t.advanceCursor() {
				break
			}
			continue
		}
		acc.AccumulateMore(arr.Timestamps, arr.Values, t.tags)
	}

	timestamp, value, tags := acc.Result()

	cr := t.allocateBuffer(1)
	if IsSelector(t.gc.Aggregate()) {
		// Selectors keep the timestamp of the selected point.
		cr.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
		cr.cols[valueColIdx] = t.toArrowBuffer([]int64{value})
	} else {
		cr.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]int64{value})
	}
	t.appendTheseTags(cr, tags)
	t.appendBounds(cr)
	return true
}
// IntegerAggregateAccumulator folds the integer points of several series
// of one group into a single (timestamp, value, tags) result.
type IntegerAggregateAccumulator interface {
// AccumulateFirst receives an initial array of items to select from.
// It selects an item and stores the state. Afterwards, more data can
// be supplied with AccumulateMore and the results can be requested at
// any time. Without a call to AccumulateFirst the results are not
// defined.
AccumulateFirst(timestamps []int64, values []int64, tags [][]byte)
// AccumulateMore receives additional array elements to select from.
AccumulateMore(timestamps []int64, values []int64, tags [][]byte)
// Result returns the item selected from the data received so far.
// The three results are, in order, the timestamp, the value and the
// tags.
Result() (int64, int64, [][]byte)
}
// integerSelectorMethod applies a selector to a single (ts, v) pair and to
// the arrays (timestamps, values), scanning the arrays from index start.
// It returns -1 when the single pair is selected and otherwise the
// non-negative index of the selected array item.
type integerSelectorMethod func(ts int64, v int64, timestamps []int64, values []int64, start int) int
// The selector accumulator tracks currently-selected item.
type integerSelectorAccumulator struct {
// selector decides whether a new item replaces the current one.
selector integerSelectorMethod
// ts and v are the timestamp and value of the selected item.
ts int64
v int64
// tags is a private copy of the tags seen with the selected item.
tags [][]byte
}
// AccumulateFirst seeds the accumulator from a non-empty array. The first
// element is compared against the rest by the selector, the winner's
// timestamp and value are stored, and tags is copied into a fresh slice.
func (a *integerSelectorAccumulator) AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) {
	picked := a.selector(timestamps[0], values[0], timestamps, values, 1)
	if picked < 0 {
		// A negative index means element 0 stays selected.
		picked = 0
	}
	a.ts = timestamps[picked]
	a.v = values[picked]

	kept := make([][]byte, len(tags))
	copy(kept, tags)
	a.tags = kept
}
// AccumulateMore offers another array to the selector. When an element of
// the array wins over the stored item, its timestamp and value replace the
// stored ones and tags is copied into a.tags, reusing the existing backing
// array when it is large enough. Otherwise nothing changes.
func (a *integerSelectorAccumulator) AccumulateMore(timestamps []int64, values []int64, tags [][]byte) {
	picked := a.selector(a.ts, a.v, timestamps, values, 0)
	if picked < 0 {
		return
	}
	a.ts = timestamps[picked]
	a.v = values[picked]

	if n := len(tags); n <= cap(a.tags) {
		a.tags = a.tags[:n]
	} else {
		a.tags = make([][]byte, n)
	}
	copy(a.tags, tags)
}
// Result returns the timestamp, value and tags of the item selected so far.
func (a *integerSelectorAccumulator) Result() (int64, int64, [][]byte) {
	ts, v, tags := a.ts, a.v, a.tags
	return ts, v, tags
}
// integerAggregateMethod combines an accumulated value with the elements
// of values from index start onwards and returns the new accumulated
// value.
type integerAggregateMethod func(accum int64, values []int64, start int) int64
// integerAggregateAccumulator keeps a running aggregate over the values
// it is given.
type integerAggregateAccumulator struct {
// aggregate folds new values into accum.
aggregate integerAggregateMethod
// accum is the aggregate computed so far.
accum int64
// For pure aggregates it doesn't matter what we return for tags, but
// we need to satisfy the interface. We will just return the most
// recently seen tags.
tags [][]byte
}
// AccumulateFirst seeds the aggregate with values[0], folds in the
// remaining elements and remembers tags without copying them. values must
// be non-empty; timestamps is not used.
func (a *integerAggregateAccumulator) AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) {
	seed := values[0]
	a.accum = a.aggregate(seed, values, 1)
	a.tags = tags
}
// AccumulateMore folds every element of values into the running aggregate
// and remembers tags without copying them. timestamps is not used.
func (a *integerAggregateAccumulator) AccumulateMore(timestamps []int64, values []int64, tags [][]byte) {
	current := a.accum
	a.accum = a.aggregate(current, values, 0)
	a.tags = tags
}
// Result returns the aggregate computed so far together with the most
// recently seen tags.
//
// For group aggregates (non-selectors), the timestamp is always
// math.MaxInt64. Their final result does not contain _time, so this
// timestamp value can be anything and it won't matter.
func (a *integerAggregateAccumulator) Result() (int64, int64, [][]byte) {
	const placeholderTime int64 = math.MaxInt64
	return placeholderTime, a.accum, a.tags
}
// makeIntegerAggregateAccumulator returns the accumulator used to combine
// the per-series results of one group for the given aggregate type. The
// incoming points are the ones returned for each series, so the
// accumulator aggregates the aggregates. An unsupported type yields an
// EInvalid error.
func makeIntegerAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (IntegerAggregateAccumulator, error) {
	var acc IntegerAggregateAccumulator
	switch agg {
	case datatypes.Aggregate_AggregateTypeFirst:
		acc = &integerSelectorAccumulator{selector: selectorFirstGroupsInteger}
	case datatypes.Aggregate_AggregateTypeLast:
		acc = &integerSelectorAccumulator{selector: selectorLastGroupsInteger}
	case datatypes.Aggregate_AggregateTypeCount:
		acc = &integerAggregateAccumulator{aggregate: aggregateCountGroupsInteger}
	case datatypes.Aggregate_AggregateTypeSum:
		acc = &integerAggregateAccumulator{aggregate: aggregateSumGroupsInteger}
	case datatypes.Aggregate_AggregateTypeMin:
		acc = &integerSelectorAccumulator{selector: selectorMinGroupsInteger}
	case datatypes.Aggregate_AggregateTypeMax:
		acc = &integerSelectorAccumulator{selector: selectorMaxGroupsInteger}
	default:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg),
		}
	}
	return acc, nil
}
// selectorMinGroupsInteger scans values from index i for elements strictly
// smaller than v. It returns the index of the smallest such element (the
// earliest on ties), or -1 when none is smaller than v. ts and timestamps
// are not consulted.
func selectorMinGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int {
	best := -1
	for j := i; j < len(values); j++ {
		if cand := values[j]; cand < v {
			best, v = j, cand
		}
	}
	return best
}
// selectorMaxGroupsInteger scans values from index i for elements strictly
// larger than v. It returns the index of the largest such element (the
// earliest on ties), or -1 when none is larger than v. ts and timestamps
// are not consulted.
func selectorMaxGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int {
	best := -1
	for j := i; j < len(values); j++ {
		if cand := values[j]; cand > v {
			best, v = j, cand
		}
	}
	return best
}
// aggregateCountGroupsInteger combines counts by delegating to
// aggregateSumGroupsInteger: accum plus every element of values from
// index i onwards.
func aggregateCountGroupsInteger(accum int64, values []int64, i int) int64 {
	total := aggregateSumGroupsInteger(accum, values, i)
	return total
}
// aggregateSumGroupsInteger returns sum plus every element of values from
// index i onwards. An index at or past the end leaves sum unchanged.
func aggregateSumGroupsInteger(sum int64, values []int64, i int) int64 {
	total := sum
	for j := i; j < len(values); j++ {
		total += values[j]
	}
	return total
}
// selectorFirstGroupsInteger scans timestamps from index i, over the
// length of values, for entries strictly earlier than ts. It returns the
// index of the earliest such entry (the first on ties), or -1 when none is
// earlier than ts. v is not consulted.
func selectorFirstGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int {
	best := -1
	for j := i; j < len(values); j++ {
		if cand := timestamps[j]; cand < ts {
			best, ts = j, cand
		}
	}
	return best
}
// selectorLastGroupsInteger scans timestamps from index i, over the length
// of values, for entries at or after ts. It returns the index of the
// latest such entry (the last on ties), or -1 when every entry is earlier
// than ts. v is not consulted.
func selectorLastGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int {
	best := -1
	for j := i; j < len(values); j++ {
		if cand := timestamps[j]; cand >= ts {
			best, ts = j, cand
		}
	}
	return best
}
// advanceCursor closes the current series cursor and moves to the next
// series of the group that has a cursor. On success the new series' tags
// are read, t.cur is set and true is returned. It returns false when the
// group has no more series, or when the next cursor is not an integer
// cursor, in which case that cursor is closed and t.err is set. t.cur is
// nil whenever false is returned.
func (t *integerGroupTable) advanceCursor() bool {
	t.cur.Close()
	t.cur = nil

	for t.gc.Next() {
		cur := t.gc.Cursor()
		if cur == nil {
			continue
		}

		typedCur, ok := cur.(cursors.IntegerArrayCursor)
		if ok {
			t.readTags(t.gc.Tags())
			t.cur = typedCur
			return true
		}

		// TODO(sgc): error or skip?
		cur.Close()
		t.err = &errors.Error{
			Code: errors.EInvalid,
			Err: &GroupCursorError{
				typ:    "integer",
				cursor: cur,
			},
		}
		return false
	}
	return false
}
// Statistics returns the scanned-values and scanned-bytes counters of the
// series cursor that is currently set, or zero statistics when there is
// none.
//
// t.cur is read while holding t.mu because Close clears it under the same
// mutex; without the lock a Close between the nil check and the Stats
// call would race with this method.
// NOTE(review): advance and advanceCursor also reassign t.cur without
// taking t.mu — confirm whether they can run concurrently with this call.
func (t *integerGroupTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	cur := t.cur
	if cur == nil {
		return cursors.CursorStats{}
	}
	cs := cur.Stats()
	return cursors.CursorStats{
		ScannedValues: cs.ScannedValues,
		ScannedBytes:  cs.ScannedBytes,
	}
}
//
// *********** Unsigned ***********
//

// unsignedTable reads the points of a single unsigned series from an
// array cursor.
type unsignedTable struct {
table
// mu is held by Close and Statistics while they access cur.
mu sync.Mutex
// cur is the series cursor; Close sets it to nil.
cur cursors.UnsignedArrayCursor
// NOTE(review): none of the constructors visible in this part of the
// file assign alloc — confirm what t.alloc is expected to resolve to.
alloc *memory.Allocator
}
// newUnsignedTable builds a table over cur, reads the series tags into it
// and initialises it with its own advance method.
func newUnsignedTable(
	done chan struct{},
	cur cursors.UnsignedArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *unsignedTable {
	t := &unsignedTable{cur: cur}
	t.table = newTable(done, bounds, key, cols, defs, cache, alloc)

	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Close releases the series cursor if it is still set and clears the
// field so a later call does nothing.
//
// The mutex is released with defer, as in Statistics, so that it is not
// left locked if the cursor's Close panics.
func (t *unsignedTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.cur != nil {
		t.cur.Close()
		t.cur = nil
	}
}
// Statistics returns the scanned-values and scanned-bytes counters of the
// series cursor, or zero statistics when the cursor has been cleared. The
// cursor is read while holding t.mu.
func (t *unsignedTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	var stats cursors.CursorStats
	if t.cur != nil {
		cs := t.cur.Stats()
		stats.ScannedValues = cs.ScannedValues
		stats.ScannedBytes = cs.ScannedBytes
	}
	return stats
}
// Do hands f to the shared do helper together with this table's advance
// method and returns the error that do reports.
func (t *unsignedTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance reads the next array from the cursor and fills a column reader
// with its timestamps, values, tags and bounds. It reports false when the
// cursor returns an empty array.
func (t *unsignedTable) advance() bool {
	arr := t.cur.Next()
	if n := arr.Len(); n > 0 {
		// Retrieve the buffer for the data to avoid allocating
		// additional slices. If the buffer is still being used
		// because the references were retained, then we will
		// allocate a new buffer.
		cr := t.allocateBuffer(n)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
		cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
		t.appendTags(cr)
		t.appendBounds(cr)
		return true
	}
	return false
}
// window table

// unsignedWindowTable groups the values of an unsigned series into
// windows.
type unsignedWindowTable struct {
unsignedTable
// arr is the array currently being consumed; nil until first read.
arr *cursors.UnsignedArray
// windowBounds is the next window to emit; only set and advanced when
// createEmpty is true.
windowBounds interval.Bounds
// idxInArr is the position in arr of the next unread value.
idxInArr int
// createEmpty makes the table emit every window in the range rather
// than deriving windows from the cursor's timestamps.
createEmpty bool
// timeColumn selects which boundary, if any, is used as the time column.
timeColumn string
// isAggregate switches isInWindow between its aggregate and selector
// comparisons.
isAggregate bool
// window produces the window bounds.
window interval.Window
}
// newUnsignedWindowTable builds a window table over cur. When createEmpty
// is set, the first window is the one returned by GetLatestBounds for the
// range start. The series tags are read and the table is initialised with
// its own advance method.
func newUnsignedWindowTable(
	done chan struct{},
	cur cursors.UnsignedArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	createEmpty bool,
	timeColumn string,
	isAggregate bool,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *unsignedWindowTable {
	t := &unsignedWindowTable{
		window:      window,
		createEmpty: createEmpty,
		timeColumn:  timeColumn,
		isAggregate: isAggregate,
	}
	// Fill in the embedded unsignedTable.
	t.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	t.cur = cur

	if createEmpty {
		rangeStart := int64(bounds.Start)
		t.windowBounds = window.GetLatestBounds(values.Time(rangeStart))
	}

	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do hands f to the shared do helper together with this table's advance
// method and returns the error that do reports.
func (t *unsignedWindowTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// createNextBufferTimes builds the start and stop time columns for the
// next buffer. ok is false when there is nothing left to emit.
//
// With createEmpty the windows are generated from t.windowBounds up to the
// range stop; otherwise one window is derived from each timestamp of the
// current cursor array.
func (t *unsignedWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) {
	startB := arrow.NewIntBuilder(t.alloc)
	stopB := arrow.NewIntBuilder(t.alloc)
	rangeStop := int64(t.bounds.Stop)

	if !t.createEmpty {
		// Retrieve the next buffer so we can copy the timestamps.
		if !t.nextBuffer() {
			return nil, nil, false
		}

		// Copy over the timestamps from the next buffer and adjust
		// times for the boundaries.
		n := len(t.arr.Timestamps)
		startB.Resize(n)
		stopB.Resize(n)
		for _, ts := range t.arr.Timestamps {
			// The window used is the one preceding the window that
			// GetLatestBounds reports for the timestamp.
			win := t.window.PrevBounds(t.window.GetLatestBounds(values.Time(ts)))
			winStart, winStop := t.getWindowBoundsFor(win)
			startB.Append(winStart)
			stopB.Append(winStop)
		}
		return startB.NewIntArray(), stopB.NewIntArray(), true
	}

	// There are no more windows when the start time is greater
	// than or equal to the stop time.
	if int64(t.windowBounds.Start()) >= rangeStop {
		return nil, nil, false
	}

	// Create a buffer with the buffer size.
	// TODO(jsternberg): Calculate the exact size with max points as the maximum.
	startB.Resize(storage.MaxPointsPerBlock)
	stopB.Resize(storage.MaxPointsPerBlock)
	for {
		winStart, winStop := t.getWindowBoundsFor(t.windowBounds)
		if winStart >= rangeStop {
			break
		}
		startB.Append(winStart)
		stopB.Append(winStop)
		t.windowBounds = t.window.NextBounds(t.windowBounds)
	}
	return startB.NewIntArray(), stopB.NewIntArray(), true
}
// getWindowBoundsFor returns the start and stop of bounds as int64 values,
// clamped so that neither lies outside the table's own range.
func (t *unsignedWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) {
	lo, hi := int64(t.bounds.Start), int64(t.bounds.Stop)
	beg, end := int64(bounds.Start()), int64(bounds.Stop())
	if beg < lo {
		beg = lo
	}
	if end > hi {
		end = hi
	}
	return beg, end
}
// nextAt returns the next unread value when it belongs to the window that
// ends at stop, and consumes it. When no value is available, or the next
// value belongs to another window, it returns the zero value and false
// and consumes nothing.
func (t *unsignedWindowTable) nextAt(stop int64) (v uint64, ok bool) {
	if !t.nextBuffer() {
		return 0, false
	}
	i := t.idxInArr
	if !t.isInWindow(stop, t.arr.Timestamps[i]) {
		return 0, false
	}
	v = t.arr.Values[i]
	t.idxInArr = i + 1
	return v, true
}
// isInWindow reports whether ts may be used for the window denoted by the
// stop timestamp. The stop may be a truncated stop time because of a
// restricted boundary, so the window is looked up again from stop-1.
//
// When used with an aggregate, ts will be the true stop time returned
// by storage. When used with a selector, it will be the real time
// for the point.
func (t *unsignedWindowTable) isInWindow(stop int64, ts int64) bool {
	// Retrieve the boundary associated with this stop time.
	// This will be the boundary for the previous nanosecond.
	b := t.window.GetLatestBounds(values.Time(stop - 1))
	lo, hi := int64(b.Start()), int64(b.Stop())

	if !t.isAggregate {
		// For a selector, the timestamp should be within the boundary.
		return lo <= ts && ts < hi
	}
	// For an aggregate, the timestamp will be the stop time of the boundary.
	return lo < ts && ts <= hi
}
// nextBuffer makes sure t.arr holds an array with at least one unread
// element. When the current array is missing or has been fully consumed
// the next array is pulled from the cursor. It reports false when the
// cursor returns an empty array.
func (t *unsignedWindowTable) nextBuffer() bool {
	// Fast path: the current array still has unread values.
	if t.arr != nil && t.idxInArr < t.arr.Len() {
		return true
	}

	// Either nothing has been read yet or the array is exhausted.
	t.arr = nil
	next := t.cur.Next()
	if next.Len() == 0 {
		return false
	}
	t.arr = next
	t.idxInArr = 0
	return true
}
// appendValues walks the given window stop timestamps in order. For each
// one it asks nextAt for a matching value and passes it to appendValue;
// when nextAt reports no match, appendNull is called instead.
func (t *unsignedWindowTable) appendValues(intervals []int64, appendValue func(v uint64), appendNull func()) {
	for _, stop := range intervals {
		v, ok := t.nextAt(stop)
		if !ok {
			appendNull()
			continue
		}
		appendValue(v)
	}
}
// advance fills the next column reader for the windowed table. It reports
// false when nextBuffer finds no more values or when no further window
// times can be created.
func (t *unsignedWindowTable) advance() bool {
	if !t.nextBuffer() {
		return false
	}

	// Build the start/stop timestamps for the next batch of windows.
	start, stop, ok := t.createNextBufferTimes()
	if !ok {
		return false
	}
	merged := t.mergeValues(stop.Int64Values())

	// Retrieve the buffer for the data to avoid allocating additional
	// slices; a new one is allocated if the previous is still retained.
	cr := t.allocateBuffer(stop.Len())

	// Without a time column both window boundaries are emitted as columns.
	if t.timeColumn == "" {
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[valueColIdxWithoutTime] = merged
		t.appendTags(cr)
		return true
	}

	// With a time column only one boundary is kept; the other is released.
	switch t.timeColumn {
	case execute.DefaultStopColLabel:
		cr.cols[timeColIdx] = stop
		start.Release()
	case execute.DefaultStartColLabel:
		cr.cols[timeColIdx] = start
		stop.Release()
	}
	cr.cols[valueColIdx] = merged
	t.appendBounds(cr)
	t.appendTags(cr)
	return true
}
// unsignedWindowSelectorTable emits the points read from an unsigned array
// cursor together with window boundary times derived from each point's
// timestamp.
//
// This table implementation will not have any empty windows.
type unsignedWindowSelectorTable struct {
unsignedTable
// timeColumn selects which time columns advance fills in.
timeColumn string
// window is used to look up the bounds that contain a timestamp.
window interval.Window
}
// newUnsignedWindowSelectorTable builds a window selector table over cur,
// reads the series tags into it and initialises it with its own advance
// method.
func newUnsignedWindowSelectorTable(
	done chan struct{},
	cur cursors.UnsignedArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *unsignedWindowSelectorTable {
	t := &unsignedWindowSelectorTable{
		window:     window,
		timeColumn: timeColumn,
	}
	// Fill in the embedded unsignedTable.
	t.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	t.cur = cur

	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do hands f to the shared do helper together with this table's advance
// method and returns the error that do reports.
func (t *unsignedWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance reads the next array from the cursor and fills a column reader
// with it. It reports false when the cursor returns an empty array.
func (t *unsignedWindowSelectorTable) advance() bool {
	arr := t.cur.Next()
	n := arr.Len()
	if n == 0 {
		return false
	}

	cr := t.allocateBuffer(n)
	switch col := t.timeColumn; col {
	case execute.DefaultStartColLabel:
		// Time column carries the window start.
		cr.cols[timeColIdx] = t.startTimes(arr)
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		// Time column carries the window stop.
		cr.cols[timeColIdx] = t.stopTimes(arr)
		t.appendBounds(cr)
	default:
		// Emit both window boundaries plus the point's own timestamp.
		cr.cols[startColIdx] = t.startTimes(arr)
		cr.cols[stopColIdx] = t.stopTimes(arr)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	}
	cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(cr)
	return true
}
// startTimes returns, for every timestamp in arr, the start of the window
// that contains it, raised to the table's range start when the window
// begins earlier.
func (t *unsignedWindowSelectorTable) startTimes(arr *cursors.UnsignedArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	lower := int64(t.bounds.Start)
	for _, ts := range arr.Timestamps {
		winStart := int64(t.window.GetLatestBounds(values.Time(ts)).Start())
		if winStart < lower {
			winStart = lower
		}
		b.Append(winStart)
	}
	return b.NewIntArray()
}
// stopTimes returns, for every timestamp in arr, the stop of the window
// that contains it, lowered to the table's range stop when the window
// ends later.
func (t *unsignedWindowSelectorTable) stopTimes(arr *cursors.UnsignedArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	upper := int64(t.bounds.Stop)
	for _, ts := range arr.Timestamps {
		winStop := int64(t.window.GetLatestBounds(values.Time(ts)).Stop())
		if winStop > upper {
			winStop = upper
		}
		b.Append(winStop)
	}
	return b.NewIntArray()
}
// unsignedEmptyWindowSelectorTable walks consecutive windows across the
// query range and pairs each one with a point from the cursor when the
// point's timestamp falls inside it.
//
// This table implementation may contain empty windows
// in addition to non-empty windows.
type unsignedEmptyWindowSelectorTable struct {
unsignedTable
// arr is the array most recently returned by the cursor; it is replaced
// once every element has been consumed.
arr *cursors.UnsignedArray
// idx is the position in arr of the next unread point.
idx int
// rangeStart and rangeStop are the table bounds as int64 timestamps.
rangeStart int64
rangeStop int64
// windowBounds is the next window to emit. It starts at the window
// returned by GetLatestBounds for rangeStart and moves forward with
// NextBounds.
windowBounds interval.Bounds
// timeColumn selects which time columns advance fills in.
timeColumn string
// window produces the successive window bounds.
window interval.Window
}
// newUnsignedEmptyWindowSelectorTable builds an empty-window selector
// table over cur. The first array is read from the cursor right away and
// the first window is the one returned by GetLatestBounds for the range
// start.
func newUnsignedEmptyWindowSelectorTable(
	done chan struct{},
	cur cursors.UnsignedArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *unsignedEmptyWindowSelectorTable {
	t := &unsignedEmptyWindowSelectorTable{
		rangeStart: int64(bounds.Start),
		rangeStop:  int64(bounds.Stop),
		window:     window,
		timeColumn: timeColumn,
	}
	// Fill in the embedded unsignedTable, then prime the cursor and window.
	t.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	t.cur = cur
	t.arr = cur.Next()
	t.windowBounds = window.GetLatestBounds(values.Time(t.rangeStart))

	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do hands f to the shared do helper together with this table's advance
// method and returns the error that do reports.
func (t *unsignedEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance fills the next column reader with up to
// storage.MaxPointsPerBlock windows. It reports false once the current
// cursor array is empty.
func (t *unsignedEmptyWindowSelectorTable) advance() bool {
	if t.arr.Len() == 0 {
		return false
	}

	// The helpers below append one value (or null) per window to vb while
	// they build the corresponding time columns.
	vb := t.arrowBuilder()
	vb.Resize(storage.MaxPointsPerBlock)

	var cr *colReader
	switch t.timeColumn {
	case execute.DefaultStartColLabel:
		starts := t.startTimes(vb)
		cr = t.allocateBuffer(starts.Len())
		cr.cols[timeColIdx] = starts
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		stops := t.stopTimes(vb)
		cr = t.allocateBuffer(stops.Len())
		cr.cols[timeColIdx] = stops
		t.appendBounds(cr)
	default:
		starts, stops, times := t.startStopTimes(vb)
		cr = t.allocateBuffer(times.Len())
		cr.cols[startColIdx] = starts
		cr.cols[stopColIdx] = stops
		cr.cols[timeColIdx] = times
	}
	cr.cols[valueColIdx] = vb.NewUintArray()
	t.appendTags(cr)
	return true
}
// startTimes emits one start time per window, beginning at t.windowBounds
// and stopping at the range stop or after storage.MaxPointsPerBlock
// windows. For each window it also appends to builder either the current
// point's value, when its timestamp lies in the window, or a null.
func (t *unsignedEmptyWindowSelectorTable) startTimes(builder *array.UintBuilder) *array.Int {
	starts := arrow.NewIntBuilder(t.alloc)
	starts.Resize(storage.MaxPointsPerBlock)
	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The first window should start at the beginning of the time range.
		if winStart < t.rangeStart {
			starts.Append(t.rangeStart)
		} else {
			starts.Append(winStart)
		}

		// With no data left, use a timestamp that can match no window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// Consume the point if it falls in this window, otherwise
		// record a null for the window.
		if winStart <= ts && ts < winStop {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}
		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// A non-empty array that has been read in full is replaced by
		// the next one from the cursor.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}
		if starts.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return starts.NewIntArray()
}
// stopTimes emits one stop time per window, beginning at t.windowBounds
// and stopping at the range stop or after storage.MaxPointsPerBlock
// windows. For each window it also appends to builder either the current
// point's value, when its timestamp lies in the window, or a null.
func (t *unsignedEmptyWindowSelectorTable) stopTimes(builder *array.UintBuilder) *array.Int {
	stops := arrow.NewIntBuilder(t.alloc)
	stops.Resize(storage.MaxPointsPerBlock)
	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The last window should stop at the end of the time range.
		if winStop > t.rangeStop {
			stops.Append(t.rangeStop)
		} else {
			stops.Append(winStop)
		}

		// With no data left, use a timestamp that can match no window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// Consume the point if it falls in this window, otherwise
		// record a null for the window.
		if winStart <= ts && ts < winStop {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}
		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// A non-empty array that has been read in full is replaced by
		// the next one from the cursor.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}
		if stops.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return stops.NewIntArray()
}
// startStopTimes emits, per window, its start, its stop and the timestamp
// of the point selected for it (null when the window has no point). The
// point's value, or a null, is appended to builder in step. Iteration
// begins at t.windowBounds and ends at the range stop or after
// storage.MaxPointsPerBlock windows.
func (t *unsignedEmptyWindowSelectorTable) startStopTimes(builder *array.UintBuilder) (*array.Int, *array.Int, *array.Int) {
	starts := arrow.NewIntBuilder(t.alloc)
	starts.Resize(storage.MaxPointsPerBlock)
	stops := arrow.NewIntBuilder(t.alloc)
	stops.Resize(storage.MaxPointsPerBlock)
	times := arrow.NewIntBuilder(t.alloc)
	times.Resize(storage.MaxPointsPerBlock)
	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The first window should start at the beginning of the time range.
		if winStart < t.rangeStart {
			starts.Append(t.rangeStart)
		} else {
			starts.Append(winStart)
		}
		// The last window should stop at the end of the time range.
		if winStop > t.rangeStop {
			stops.Append(t.rangeStop)
		} else {
			stops.Append(winStop)
		}

		// With no data left, use a timestamp that can match no window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// Consume the point if it falls in this window, otherwise
		// record nulls for both the time and the value.
		if winStart <= ts && ts < winStop {
			times.Append(ts)
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			times.AppendNull()
			builder.AppendNull()
		}
		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// A non-empty array that has been read in full is replaced by
		// the next one from the cursor.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}
		if times.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return starts.NewIntArray(), stops.NewIntArray(), times.NewIntArray()
}
// group table

// unsignedGroupTable reads the unsigned series of one group; gc is the
// group cursor and cur the cursor of the series being read.
type unsignedGroupTable struct {
table
// mu is held by Close while it releases and clears cur and gc.
mu sync.Mutex
// gc is the group cursor; Close closes it and sets it to nil.
gc storage.GroupCursor
// cur is the cursor of the series being read; nil once closed.
cur cursors.UnsignedArrayCursor
}
// newUnsignedGroupTable builds a group table that starts reading from cur
// and keeps gc as its group cursor. The series tags are read and the
// table is initialised with its own advance method.
func newUnsignedGroupTable(
	done chan struct{},
	gc storage.GroupCursor,
	cur cursors.UnsignedArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *unsignedGroupTable {
	t := &unsignedGroupTable{gc: gc, cur: cur}
	t.table = newTable(done, bounds, key, cols, defs, cache, alloc)

	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Close closes the current array cursor and the group cursor, if they are
// still set, and clears both fields. The work is done while holding t.mu.
func (t *unsignedGroupTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if cur := t.cur; cur != nil {
		cur.Close()
		t.cur = nil
	}
	if gc := t.gc; gc != nil {
		gc.Close()
		t.gc = nil
	}
}
// Do passes f and this table's advance method to t.do and returns its result.
func (t *unsignedGroupTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
// advance prepares the next buffer of the group table and reports whether
// one was produced.
//
// Without an aggregate, each call emits the next non-empty array from the
// group's cursors. With an aggregate, the first call drains every cursor of
// the group, folds all arrays into a single row and emits it; on failure to
// build the accumulator the error is stored in t.err and false is returned.
func (t *unsignedGroupTable) advance() bool {
	// A group aggregate consumes every series on its first call, after which
	// advanceCursor has left t.cur nil. That first call still returns true
	// because it produced a row; any later call lands here and stops.
	if t.cur == nil {
		return false
	}

	// Move forward, switching series as needed, until an array has data.
	arr := t.cur.Next()
	n := arr.Len()
	for n <= 0 {
		if !t.advanceCursor() {
			return false
		}
		arr = t.cur.Next()
		n = arr.Len()
	}

	agg := t.gc.Aggregate()
	if agg == nil {
		// Group without aggregate: emit the array as is. The buffer is
		// reused when possible; a new one is allocated if references to the
		// previous one were retained.
		cr := t.allocateBuffer(n)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
		cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
		t.appendTags(cr)
		t.appendBounds(cr)
		return true
	}

	acc, err := makeUnsignedAggregateAccumulator(agg.Type)
	if err != nil {
		t.err = err
		return false
	}

	// Fold the first array, then everything that remains in the group.
	acc.AccumulateFirst(arr.Timestamps, arr.Values, t.tags)
	for more := true; more; {
		if arr = t.cur.Next(); arr.Len() > 0 {
			acc.AccumulateMore(arr.Timestamps, arr.Values, t.tags)
		} else {
			more = t.advanceCursor()
		}
	}

	ts, val, tags := acc.Result()
	cr := t.allocateBuffer(1)
	valueIdx := valueColIdxWithoutTime
	if IsSelector(t.gc.Aggregate()) {
		// Selectors keep the timestamp of the selected point.
		cr.cols[timeColIdx] = arrow.NewInt([]int64{ts}, t.alloc)
		valueIdx = valueColIdx
	}
	cr.cols[valueIdx] = t.toArrowBuffer([]uint64{val})
	t.appendTheseTags(cr, tags)
	t.appendBounds(cr)
	return true
}
// UnsignedAggregateAccumulator folds successive arrays of timestamps and
// unsigned values, together with the tags they were read with, into a single
// (timestamp, value, tags) result.
type UnsignedAggregateAccumulator interface {
// AccumulateFirst receives an initial array of items to select from.
// It selects an item and stores the state. Afterwards, more data can
// be supplied with AccumulateMore and the results can be requested at
// any time. Without a call to AccumulateFirst the results are not
// defined.
AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte)
// AccumulateMore receives additional array elements to select from.
AccumulateMore(timestamps []int64, values []uint64, tags [][]byte)
// Result returns the item selected from the data received so far.
Result() (int64, uint64, [][]byte)
}
// unsignedSelectorMethod compares a single ( timestamp, value ) candidate
// against the items of a ( []timestamp, []value ) pair, beginning at the
// given index. It returns -1 when the single candidate stays selected and the
// index of the winning array item otherwise.
type unsignedSelectorMethod func(int64, uint64, []int64, []uint64, int) int

// unsignedSelectorAccumulator remembers the item currently chosen by its
// selector, along with a private copy of the tags it was read with.
type unsignedSelectorAccumulator struct {
	selector unsignedSelectorMethod

	ts   int64
	v    uint64
	tags [][]byte
}

// AccumulateFirst seeds the accumulator from a non-empty array: the first
// item is the initial candidate and the selector is run over the rest. The
// tags slice is copied so later changes to the caller's slice are not seen.
func (a *unsignedSelectorAccumulator) AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) {
	picked := a.selector(timestamps[0], values[0], timestamps, values, 1)
	if picked < 0 {
		// The first item won against everything after it.
		picked = 0
	}
	a.ts = timestamps[picked]
	a.v = values[picked]
	a.tags = append(make([][]byte, 0, len(tags)), tags...)
}

// AccumulateMore runs the selector with the stored item as candidate against
// a further array. State and tags are replaced only when an array item wins.
func (a *unsignedSelectorAccumulator) AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) {
	picked := a.selector(a.ts, a.v, timestamps, values, 0)
	if picked < 0 {
		return
	}
	a.ts = timestamps[picked]
	a.v = values[picked]

	// Reuse the existing tag storage when it is large enough.
	if need := len(tags); need <= cap(a.tags) {
		a.tags = a.tags[:need]
	} else {
		a.tags = make([][]byte, need)
	}
	copy(a.tags, tags)
}

// Result returns the timestamp, value and tags of the selected item.
func (a *unsignedSelectorAccumulator) Result() (int64, uint64, [][]byte) {
	return a.ts, a.v, a.tags
}
// unsignedAggregateMethod combines a running value with the items of an
// array, beginning at the given index, and returns the combined result.
type unsignedAggregateMethod func(uint64, []uint64, int) uint64

// unsignedAggregateAccumulator keeps the running result of an aggregate
// method.
type unsignedAggregateAccumulator struct {
	aggregate unsignedAggregateMethod
	accum     uint64

	// Tags are irrelevant for a pure aggregate, but the interface requires
	// some to be returned, so the most recently supplied ones are kept.
	tags [][]byte
}

// AccumulateFirst seeds the running result with the first value of a
// non-empty array and aggregates the remaining values into it.
func (a *unsignedAggregateAccumulator) AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) {
	seed := values[0]
	a.accum = a.aggregate(seed, values, 1)
	a.tags = tags
}

// AccumulateMore aggregates every value of a further array into the running
// result.
func (a *unsignedAggregateAccumulator) AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) {
	running := a.accum
	a.accum = a.aggregate(running, values, 0)
	a.tags = tags
}

// Result returns the running result with a fixed timestamp of math.MaxInt64.
// The result of a group aggregate (non-selector) carries no _time, so the
// timestamp value is never used.
func (a *unsignedAggregateAccumulator) Result() (int64, uint64, [][]byte) {
	const unusedTimestamp int64 = math.MaxInt64
	return unusedTimestamp, a.accum, a.tags
}
// makeUnsignedAggregateAccumulator picks the accumulator used to combine the
// points returned for each series of a group, i.e. it aggregates the
// per-series aggregates.
//
// First, last, min and max are selectors; sum is a pure aggregate. Count and
// any unrecognised type yield an errors.EInvalid error.
func makeUnsignedAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (UnsignedAggregateAccumulator, error) {
	var selector unsignedSelectorMethod
	switch agg {
	case datatypes.Aggregate_AggregateTypeFirst:
		selector = selectorFirstGroupsUnsigned
	case datatypes.Aggregate_AggregateTypeLast:
		selector = selectorLastGroupsUnsigned
	case datatypes.Aggregate_AggregateTypeMin:
		selector = selectorMinGroupsUnsigned
	case datatypes.Aggregate_AggregateTypeMax:
		selector = selectorMaxGroupsUnsigned
	case datatypes.Aggregate_AggregateTypeSum:
		return &unsignedAggregateAccumulator{aggregate: aggregateSumGroupsUnsigned}, nil
	case datatypes.Aggregate_AggregateTypeCount:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate count: Unsigned",
		}
	default:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg),
		}
	}
	return &unsignedSelectorAccumulator{selector: selector}, nil
}
// selectorMinGroupsUnsigned looks for a value strictly smaller than v in
// values[i:]. It returns the index of the first occurrence of the smallest
// such value, or -1 when v remains the minimum. ts and timestamps are unused.
func selectorMinGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int {
	selected := -1
	for pos := i; pos < len(values); pos++ {
		if candidate := values[pos]; candidate < v {
			selected, v = pos, candidate
		}
	}
	return selected
}
// selectorMaxGroupsUnsigned looks for a value strictly greater than v in
// values[i:]. It returns the index of the first occurrence of the greatest
// such value, or -1 when v remains the maximum. ts and timestamps are unused.
func selectorMaxGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int {
	selected := -1
	for pos := i; pos < len(values); pos++ {
		if candidate := values[pos]; candidate > v {
			selected, v = pos, candidate
		}
	}
	return selected
}
// aggregateSumGroupsUnsigned adds every element of values[i:] to sum and
// returns the total. Arithmetic is plain uint64 addition.
func aggregateSumGroupsUnsigned(sum uint64, values []uint64, i int) uint64 {
	total := sum
	for pos := i; pos < len(values); pos++ {
		total += values[pos]
	}
	return total
}
// selectorFirstGroupsUnsigned looks for a timestamp strictly earlier than ts
// among the items at positions i..len(values)-1. It returns the index of the
// first occurrence of the earliest such timestamp, or -1 when ts stays the
// earliest. The values themselves are only used for their length.
func selectorFirstGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int {
	selected := -1
	for pos := i; pos < len(values); pos++ {
		if candidate := timestamps[pos]; candidate < ts {
			selected, ts = pos, candidate
		}
	}
	return selected
}
// selectorLastGroupsUnsigned looks for a timestamp at or after ts among the
// items at positions i..len(values)-1. Ties go to the later position, so the
// returned index is the last occurrence of the latest timestamp, or -1 when
// every timestamp is earlier than ts. The values are only used for their
// length.
func selectorLastGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int {
	selected := -1
	for pos := i; pos < len(values); pos++ {
		if candidate := timestamps[pos]; candidate >= ts {
			selected, ts = pos, candidate
		}
	}
	return selected
}
// advanceCursor closes the current array cursor and moves on to the next
// series of the group that has a cursor.
//
// It returns true once a cursor of the expected unsigned type has been
// installed (after reloading the tags for that series). It returns false when
// the group is exhausted, or when a cursor of another type is found, in which
// case that cursor is closed and t.err is set.
func (t *unsignedGroupTable) advanceCursor() bool {
	t.cur.Close()
	t.cur = nil

	for t.gc.Next() {
		next := t.gc.Cursor()
		if next == nil {
			continue
		}

		typed, ok := next.(cursors.UnsignedArrayCursor)
		if ok {
			t.readTags(t.gc.Tags())
			t.cur = typed
			return true
		}

		// TODO(sgc): error or skip?
		next.Close()
		t.err = &errors.Error{
			Code: errors.EInvalid,
			Err: &GroupCursorError{
				typ:    "unsigned",
				cursor: next,
			},
		}
		return false
	}
	return false
}
// Statistics reports the scanned-value and scanned-byte counters of the
// array cursor currently being read, or zero statistics when there is none.
//
// t.mu is held while the cursor is inspected. Close closes and clears t.cur
// under the same lock, so without it a concurrent Close could nil the cursor
// between the nil check and the Stats call.
// NOTE(review): advance and advanceCursor replace t.cur without taking t.mu,
// so this only synchronises with Close — confirm whether callers may invoke
// Statistics while the table is still being read.
func (t *unsignedGroupTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.cur == nil {
		return cursors.CursorStats{}
	}
	cs := t.cur.Stats()
	return cursors.CursorStats{
		ScannedValues: cs.ScannedValues,
		ScannedBytes:  cs.ScannedBytes,
	}
}
//
// *********** String ***********
//
// stringTable reads string values from a single array cursor. It embeds the
// shared table state.
type stringTable struct {
table
// mu guards cur in Close and Statistics.
mu sync.Mutex
// cur is the array cursor being read; Close sets it to nil.
cur cursors.StringArrayCursor
// alloc is declared on this type in addition to whatever the embedded
// table carries, so t.alloc in methods of this type refers to this field.
// NOTE(review): verify that every constructor assigns it.
alloc *memory.Allocator
}
// newStringTable builds a stringTable that reads from cur.
//
// The shared table state is created by newTable from done, bounds, key, cols,
// defs, cache and alloc. The supplied tags are loaded with readTags and the
// table is initialised with its advance method before being returned.
//
// alloc is also stored on the stringTable itself. The type declares its own
// alloc field, which takes precedence over the one promoted from the embedded
// table, so t.alloc in its methods would otherwise be nil and the arrow
// arrays built there would not go through the supplied allocator.
func newStringTable(
	done chan struct{},
	cur cursors.StringArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *stringTable {
	t := &stringTable{
		table: newTable(done, bounds, key, cols, defs, cache, alloc),
		cur:   cur,
		alloc: alloc,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Close closes the array cursor, if one is still set, and clears the field.
// The work is done while holding t.mu.
func (t *stringTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.cur == nil {
		return
	}
	t.cur.Close()
	t.cur = nil
}
// Statistics reports the scanned-value and scanned-byte counters of the
// array cursor, or zero statistics when the cursor has been cleared. The
// cursor is inspected while holding t.mu.
func (t *stringTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	var stats cursors.CursorStats
	if cur := t.cur; cur != nil {
		cs := cur.Stats()
		stats.ScannedValues = cs.ScannedValues
		stats.ScannedBytes = cs.ScannedBytes
	}
	return stats
}
// Do passes f and this table's advance method to t.do and returns its result.
func (t *stringTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
// advance reads the next array from the cursor and turns it into the next
// buffer. It returns false when the cursor yields an empty array.
func (t *stringTable) advance() bool {
	arr := t.cur.Next()
	n := arr.Len()
	if n == 0 {
		return false
	}

	// allocateBuffer reuses the previous buffer when it can; if references
	// to it were retained, a fresh one is allocated instead.
	buf := t.allocateBuffer(n)
	buf.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	buf.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(buf)
	t.appendBounds(buf)
	return true
}
// window table
//
// stringWindowTable extends stringTable with the state needed to emit one
// row per window.
type stringWindowTable struct {
stringTable
// arr is the array most recently read from the cursor; nil when a new one
// must be fetched.
arr *cursors.StringArray
// windowBounds is the next window to emit when createEmpty is set.
windowBounds interval.Bounds
// idxInArr is the index of the next unread item in arr.
idxInArr int
// createEmpty makes the window times come from the range rather than from
// the timestamps returned by the cursor.
createEmpty bool
// timeColumn names the bound (start or stop) emitted as the time column;
// when empty, both start and stop columns are emitted instead.
timeColumn string
// isAggregate selects which end of a window is inclusive in isInWindow.
isAggregate bool
// window computes the window bounds.
window interval.Window
}
// newStringWindowTable builds a stringWindowTable that reads from cur and
// groups the values by window.
//
// The shared table state is created by newTable. When createEmpty is set the
// first window is the one containing bounds.Start. The supplied tags are
// loaded with readTags and the table is initialised with its advance method
// before being returned.
//
// alloc is also stored on the embedded stringTable. That type declares its
// own alloc field, which takes precedence over the one promoted from table,
// so t.alloc in the window methods would otherwise be nil and the arrow
// builders created there would not go through the supplied allocator.
func newStringWindowTable(
	done chan struct{},
	cur cursors.StringArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	createEmpty bool,
	timeColumn string,
	isAggregate bool,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *stringWindowTable {
	t := &stringWindowTable{
		stringTable: stringTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
			alloc: alloc,
		},
		window:      window,
		createEmpty: createEmpty,
		timeColumn:  timeColumn,
		isAggregate: isAggregate,
	}
	if t.createEmpty {
		start := int64(bounds.Start)
		t.windowBounds = window.GetLatestBounds(values.Time(start))
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do passes f and this table's advance method to t.do and returns its result.
func (t *stringWindowTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
// createNextBufferTimes returns the window start and stop times for the next
// buffer; ok is false when there is nothing left to emit.
//
// With createEmpty set, the times are generated from the current window up
// to the end of the range. Otherwise each timestamp of the next array is
// treated as a window stop and mapped back to the window that precedes it.
// In both cases the times are clamped to the table bounds.
func (t *stringWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) {
	startB := arrow.NewIntBuilder(t.alloc)
	stopB := arrow.NewIntBuilder(t.alloc)

	if !t.createEmpty {
		// The timestamps are copied from the next buffer.
		if !t.nextBuffer() {
			return nil, nil, false
		}

		n := len(t.arr.Timestamps)
		startB.Resize(n)
		stopB.Resize(n)
		for _, ts := range t.arr.Timestamps {
			prev := t.window.PrevBounds(t.window.GetLatestBounds(values.Time(ts)))
			begin, end := t.getWindowBoundsFor(prev)
			startB.Append(begin)
			stopB.Append(end)
		}
		return startB.NewIntArray(), stopB.NewIntArray(), true
	}

	// No windows remain once the window start reaches the range stop.
	limit := int64(t.bounds.Stop)
	if int64(t.windowBounds.Start()) >= limit {
		return nil, nil, false
	}

	// Size the builders with the block size.
	// TODO(jsternberg): Calculate the exact size with max points as the maximum.
	startB.Resize(storage.MaxPointsPerBlock)
	stopB.Resize(storage.MaxPointsPerBlock)
	for {
		begin, end := t.getWindowBoundsFor(t.windowBounds)
		if begin >= limit {
			break
		}
		startB.Append(begin)
		stopB.Append(end)
		t.windowBounds = t.window.NextBounds(t.windowBounds)
	}
	return startB.NewIntArray(), stopB.NewIntArray(), true
}
// getWindowBoundsFor returns the start and stop of bounds as int64 values,
// clamped so that neither lies outside the table bounds.
func (t *stringWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) {
	lower, upper := int64(t.bounds.Start), int64(t.bounds.Stop)

	beg, end := int64(bounds.Start()), int64(bounds.Stop())
	if beg < lower {
		beg = lower
	}
	if end > upper {
		end = upper
	}
	return beg, end
}
// nextAt returns the next unread value if it belongs to the window identified
// by the stop timestamp, consuming it in the process. When no data is left or
// the next value belongs to another window, it returns the zero value and
// false and consumes nothing.
func (t *stringWindowTable) nextAt(stop int64) (v string, ok bool) {
	if !t.nextBuffer() {
		return "", false
	}

	pos := t.idxInArr
	if !t.isInWindow(stop, t.arr.Timestamps[pos]) {
		return "", false
	}

	val := t.arr.Values[pos]
	t.idxInArr++
	return val, true
}
// isInWindow reports whether ts may be used for the window denoted by the
// stop timestamp. The stop may have been truncated by a restricted boundary,
// so the window is looked up from the nanosecond just before it.
//
// For an aggregate, ts is the stop time returned by storage and the window is
// treated as (start, stop]. For a selector, ts is the real time of the point
// and the window is treated as [start, stop).
func (t *stringWindowTable) isInWindow(stop int64, ts int64) bool {
	window := t.window.GetLatestBounds(values.Time(stop - 1))
	lower, upper := int64(window.Start()), int64(window.Stop())

	if !t.isAggregate {
		// Selector: the point's own time must fall inside the window.
		return lower <= ts && ts < upper
	}
	// Aggregate: the timestamp is the stop time of the window.
	return lower < ts && ts <= upper
}
// nextBuffer makes sure t.arr holds at least one unread value, reading the
// next array from the cursor when the current one is missing or used up. It
// returns false when the cursor has no more data.
func (t *stringWindowTable) nextBuffer() bool {
	// The current array still has unread values.
	if t.arr != nil && t.idxInArr < t.arr.Len() {
		return true
	}

	// Drop the exhausted array and try to replace it.
	t.arr = nil
	next := t.cur.Next()
	if next.Len() == 0 {
		return false
	}
	t.arr, t.idxInArr = next, 0
	return true
}
// appendValues walks the window stop times in intervals and, for each one,
// calls appendValue with the matching value from the buffer or appendNull
// when no value belongs to that window.
func (t *stringWindowTable) appendValues(intervals []int64, appendValue func(v string), appendNull func()) {
	for _, stop := range intervals {
		v, ok := t.nextAt(stop)
		if !ok {
			appendNull()
			continue
		}
		appendValue(v)
	}
}
// advance builds the next buffer of windowed values and reports whether one
// was produced.
//
// When timeColumn is empty the buffer carries start, stop and value columns.
// Otherwise the start or stop times (as named by timeColumn) become the time
// column, the other array is released, and the table bounds are appended.
func (t *stringWindowTable) advance() bool {
	if !t.nextBuffer() {
		return false
	}

	// Window times for this buffer.
	start, stop, ok := t.createNextBufferTimes()
	if !ok {
		return false
	}
	vals := t.mergeValues(stop.Int64Values())

	// allocateBuffer reuses the previous buffer when it can; if references
	// to it were retained, a fresh one is allocated instead.
	cr := t.allocateBuffer(stop.Len())

	if t.timeColumn == "" {
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[valueColIdxWithoutTime] = vals
		t.appendTags(cr)
		return true
	}

	switch t.timeColumn {
	case execute.DefaultStopColLabel:
		cr.cols[timeColIdx] = stop
		start.Release()
	case execute.DefaultStartColLabel:
		cr.cols[timeColIdx] = start
		stop.Release()
	}
	cr.cols[valueColIdx] = vals
	t.appendBounds(cr)
	t.appendTags(cr)
	return true
}
// This table implementation will not have any empty windows.
//
// stringWindowSelectorTable emits one row per point returned by the cursor
// and derives the window start/stop of each row from the point's timestamp.
type stringWindowSelectorTable struct {
stringTable
// timeColumn names the bound (start or stop) emitted as the time column;
// any other value emits start, stop and time columns.
timeColumn string
// window computes the window bounds for a timestamp.
window interval.Window
}
// newStringWindowSelectorTable builds a stringWindowSelectorTable that reads
// from cur and labels every point with the bounds of its window.
//
// The shared table state is created by newTable. The supplied tags are loaded
// with readTags and the table is initialised with its advance method before
// being returned.
//
// alloc is also stored on the embedded stringTable. That type declares its
// own alloc field, which takes precedence over the one promoted from table,
// so t.alloc in the selector methods would otherwise be nil and the arrow
// builders created there would not go through the supplied allocator.
func newStringWindowSelectorTable(
	done chan struct{},
	cur cursors.StringArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *stringWindowSelectorTable {
	t := &stringWindowSelectorTable{
		stringTable: stringTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
			alloc: alloc,
		},
		window:     window,
		timeColumn: timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do passes f and this table's advance method to t.do and returns its result.
func (t *stringWindowSelectorTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
// advance reads the next array from the cursor and emits it as the next
// buffer, returning false when the array is empty.
//
// When timeColumn names the start or stop column, the corresponding window
// bound of every point is used as the time column and the table bounds are
// appended. Otherwise the buffer carries the window start, the window stop
// and the point's own timestamp.
func (t *stringWindowSelectorTable) advance() bool {
	arr := t.cur.Next()
	if arr.Len() == 0 {
		return false
	}

	cr := t.allocateBuffer(arr.Len())
	if col := t.timeColumn; col == execute.DefaultStartColLabel {
		cr.cols[timeColIdx] = t.startTimes(arr)
		t.appendBounds(cr)
	} else if col == execute.DefaultStopColLabel {
		cr.cols[timeColIdx] = t.stopTimes(arr)
		t.appendBounds(cr)
	} else {
		cr.cols[startColIdx] = t.startTimes(arr)
		cr.cols[stopColIdx] = t.stopTimes(arr)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	}

	cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(cr)
	return true
}
// startTimes returns, for every timestamp in arr, the start of the window
// containing it, raised to the start of the table bounds when the window
// begins earlier.
func (t *stringWindowSelectorTable) startTimes(arr *cursors.StringArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	lower := int64(t.bounds.Start)
	for _, ts := range arr.Timestamps {
		winStart := int64(t.window.GetLatestBounds(values.Time(ts)).Start())
		if winStart < lower {
			winStart = lower
		}
		b.Append(winStart)
	}
	return b.NewIntArray()
}
// stopTimes returns, for every timestamp in arr, the stop of the window
// containing it, lowered to the stop of the table bounds when the window
// ends later.
func (t *stringWindowSelectorTable) stopTimes(arr *cursors.StringArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	upper := int64(t.bounds.Stop)
	for _, ts := range arr.Timestamps {
		winStop := int64(t.window.GetLatestBounds(values.Time(ts)).Stop())
		if winStop > upper {
			winStop = upper
		}
		b.Append(winStop)
	}
	return b.NewIntArray()
}
// This table implementation may contain empty windows
// in addition to non-empty windows.
//
// stringEmptyWindowSelectorTable emits one row per window of the range; a
// window without a point gets a null value.
type stringEmptyWindowSelectorTable struct {
stringTable
// arr is the array currently being consumed; the constructor reads the
// first one from the cursor.
arr *cursors.StringArray
// idx is the index of the next unread item in arr.
idx int
// rangeStart and rangeStop are the table bounds as int64 values; window
// starts and stops are clamped to them.
rangeStart int64
rangeStop int64
// windowBounds is the next window to emit.
windowBounds interval.Bounds
// timeColumn names the bound (start or stop) emitted as the time column;
// any other value emits start, stop and time columns.
timeColumn string
// window computes the window bounds.
window interval.Window
}
// newStringEmptyWindowSelectorTable builds a table that emits one row per
// window of the range, including windows that contain no point.
//
// The shared table state is created by newTable. The first array is read
// from cur immediately and the first window is the one containing
// bounds.Start. The supplied tags are loaded with readTags and the table is
// initialised with its advance method before being returned.
//
// alloc is also stored on the embedded stringTable. That type declares its
// own alloc field, which takes precedence over the one promoted from table,
// so t.alloc in the window methods would otherwise be nil and the arrow
// builders created there would not go through the supplied allocator.
func newStringEmptyWindowSelectorTable(
	done chan struct{},
	cur cursors.StringArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *stringEmptyWindowSelectorTable {
	rangeStart := int64(bounds.Start)
	rangeStop := int64(bounds.Stop)
	t := &stringEmptyWindowSelectorTable{
		stringTable: stringTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
			alloc: alloc,
		},
		arr:          cur.Next(),
		rangeStart:   rangeStart,
		rangeStop:    rangeStop,
		windowBounds: window.GetLatestBounds(values.Time(rangeStart)),
		window:       window,
		timeColumn:   timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Do passes f and this table's advance method to t.do and returns its result.
func (t *stringEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
// advance builds the next buffer of per-window rows and reports whether one
// was produced. It stops as soon as the current array is empty.
//
// When timeColumn names the start or stop column, that bound of each window
// becomes the time column and the table bounds are appended. Otherwise the
// buffer carries start, stop and time columns. The value column is filled by
// the same helper that produces the times.
func (t *stringEmptyWindowSelectorTable) advance() bool {
	if t.arr.Len() == 0 {
		return false
	}

	vb := t.arrowBuilder()
	vb.Resize(storage.MaxPointsPerBlock)

	var cr *colReader
	if col := t.timeColumn; col == execute.DefaultStartColLabel || col == execute.DefaultStopColLabel {
		var times *array.Int
		if col == execute.DefaultStartColLabel {
			times = t.startTimes(vb)
		} else {
			times = t.stopTimes(vb)
		}
		cr = t.allocateBuffer(times.Len())
		cr.cols[timeColIdx] = times
		t.appendBounds(cr)
	} else {
		start, stop, ts := t.startStopTimes(vb)
		cr = t.allocateBuffer(ts.Len())
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[timeColIdx] = ts
	}

	cr.cols[valueColIdx] = vb.NewStringArray()
	t.appendTags(cr)
	return true
}
// startTimes produces the window start column for the next buffer and fills
// builder with the matching values.
//
// Starting at the current window bounds, one row is emitted per window until
// the window start reaches the end of the range or the buffer holds
// storage.MaxPointsPerBlock rows. Starts are clamped to the beginning of the
// range. When the next unread point lies inside the window its value is
// appended and the read index advances; otherwise a null is appended.
func (t *stringEmptyWindowSelectorTable) startTimes(builder *array.StringBuilder) *array.Int {
	startB := arrow.NewIntBuilder(t.alloc)
	startB.Resize(storage.MaxPointsPerBlock)

	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}

		// The first window may begin before the range; clamp it.
		if winStart < t.rangeStart {
			startB.Append(t.rangeStart)
		} else {
			startB.Append(winStart)
		}

		// With no data left, use a timestamp that cannot fall in any window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		if winStart <= ts && ts < int64(t.windowBounds.Stop()) {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// Fetch the next array once a non-empty one is fully consumed.
		if n := t.arr.Len(); n > 0 && t.idx == n {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if startB.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return startB.NewIntArray()
}
// stopTimes produces the window stop column for the next buffer and fills
// builder with the matching values.
//
// Starting at the current window bounds, one row is emitted per window until
// the window start reaches the end of the range or the buffer holds
// storage.MaxPointsPerBlock rows. Stops are clamped to the end of the range.
// When the next unread point lies inside the window its value is appended
// and the read index advances; otherwise a null is appended.
func (t *stringEmptyWindowSelectorTable) stopTimes(builder *array.StringBuilder) *array.Int {
	stopB := arrow.NewIntBuilder(t.alloc)
	stopB.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {
		// The last window may end after the range; clamp it.
		if winStop := int64(t.windowBounds.Stop()); winStop > t.rangeStop {
			stopB.Append(t.rangeStop)
		} else {
			stopB.Append(winStop)
		}

		// With no data left, use a timestamp that cannot fall in any window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		inWindow := int64(t.windowBounds.Start()) <= ts && ts < int64(t.windowBounds.Stop())
		if inWindow {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// Fetch the next array once a non-empty one is fully consumed.
		if n := t.arr.Len(); n > 0 && t.idx == n {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if stopB.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return stopB.NewIntArray()
}
// startStopTimes produces the start, stop and time columns for the next
// buffer and fills builder with the matching values.
//
// Starting at the current window bounds, one row is emitted per window until
// the window start reaches the end of the range or the buffer holds
// storage.MaxPointsPerBlock rows. Window starts and stops are clamped to the
// range. When the next unread point lies inside the window its timestamp and
// value are appended and the read index advances; otherwise a null is
// appended to both the time column and builder.
func (t *stringEmptyWindowSelectorTable) startStopTimes(builder *array.StringBuilder) (*array.Int, *array.Int, *array.Int) {
	startB := arrow.NewIntBuilder(t.alloc)
	startB.Resize(storage.MaxPointsPerBlock)
	stopB := arrow.NewIntBuilder(t.alloc)
	stopB.Resize(storage.MaxPointsPerBlock)
	timeB := arrow.NewIntBuilder(t.alloc)
	timeB.Resize(storage.MaxPointsPerBlock)

	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The first window may begin before the range; clamp it.
		if winStart < t.rangeStart {
			startB.Append(t.rangeStart)
		} else {
			startB.Append(winStart)
		}

		// The last window may end after the range; clamp it.
		if winStop > t.rangeStop {
			stopB.Append(t.rangeStop)
		} else {
			stopB.Append(winStop)
		}

		// With no data left, use a timestamp that cannot fall in any window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		if winStart <= ts && ts < winStop {
			timeB.Append(ts)
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			timeB.AppendNull()
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// Fetch the next array once a non-empty one is fully consumed.
		if n := t.arr.Len(); n > 0 && t.idx == n {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if timeB.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return startB.NewIntArray(), stopB.NewIntArray(), timeB.NewIntArray()
}
// group table
//
// stringGroupTable reads string values for a group of series. It embeds the
// shared table state and tracks both the group cursor and the array cursor
// of the series currently being read.
type stringGroupTable struct {
table
// mu is held by Close while the cursors are closed and cleared.
mu sync.Mutex
// gc yields the cursors of the series in the group; set to nil by Close.
gc storage.GroupCursor
// cur is the array cursor currently being read; nil once it has been
// closed by advanceCursor or Close.
cur cursors.StringArrayCursor
}
// newStringGroupTable builds a stringGroupTable over the group cursor gc
// whose first series is read through cur.
//
// The shared table state comes from newTable; the supplied tags are loaded
// with readTags and the table is initialised with its advance method before
// it is returned.
func newStringGroupTable(
	done chan struct{},
	gc storage.GroupCursor,
	cur cursors.StringArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *stringGroupTable {
	tbl := new(stringGroupTable)
	tbl.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	tbl.gc = gc
	tbl.cur = cur

	tbl.readTags(tags)
	tbl.init(tbl.advance)
	return tbl
}
// Close closes the current array cursor and the group cursor, if they are
// still set, and clears both fields. The work is done while holding t.mu.
func (t *stringGroupTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if cur := t.cur; cur != nil {
		cur.Close()
		t.cur = nil
	}
	if gc := t.gc; gc != nil {
		gc.Close()
		t.gc = nil
	}
}
// Do passes f and this table's advance method to t.do and returns its result.
func (t *stringGroupTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
// advance prepares the next buffer of the group table and reports whether
// one was produced.
//
// Without an aggregate, each call emits the next non-empty array from the
// group's cursors. With an aggregate, the first call drains every cursor of
// the group, folds all arrays into a single row and emits it; on failure to
// build the accumulator the error is stored in t.err and false is returned.
func (t *stringGroupTable) advance() bool {
	// A group aggregate consumes every series on its first call, after which
	// advanceCursor has left t.cur nil. That first call still returns true
	// because it produced a row; any later call lands here and stops.
	if t.cur == nil {
		return false
	}

	// Move forward, switching series as needed, until an array has data.
	arr := t.cur.Next()
	n := arr.Len()
	for n <= 0 {
		if !t.advanceCursor() {
			return false
		}
		arr = t.cur.Next()
		n = arr.Len()
	}

	agg := t.gc.Aggregate()
	if agg == nil {
		// Group without aggregate: emit the array as is. The buffer is
		// reused when possible; a new one is allocated if references to the
		// previous one were retained.
		cr := t.allocateBuffer(n)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
		cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
		t.appendTags(cr)
		t.appendBounds(cr)
		return true
	}

	acc, err := makeStringAggregateAccumulator(agg.Type)
	if err != nil {
		t.err = err
		return false
	}

	// Fold the first array, then everything that remains in the group.
	acc.AccumulateFirst(arr.Timestamps, arr.Values, t.tags)
	for more := true; more; {
		if arr = t.cur.Next(); arr.Len() > 0 {
			acc.AccumulateMore(arr.Timestamps, arr.Values, t.tags)
		} else {
			more = t.advanceCursor()
		}
	}

	ts, val, tags := acc.Result()
	cr := t.allocateBuffer(1)
	valueIdx := valueColIdxWithoutTime
	if IsSelector(t.gc.Aggregate()) {
		// Selectors keep the timestamp of the selected point.
		cr.cols[timeColIdx] = arrow.NewInt([]int64{ts}, t.alloc)
		valueIdx = valueColIdx
	}
	cr.cols[valueIdx] = t.toArrowBuffer([]string{val})
	t.appendTheseTags(cr, tags)
	t.appendBounds(cr)
	return true
}
// StringAggregateAccumulator folds successive arrays of timestamps and
// string values, together with the tags they were read with, into a single
// (timestamp, value, tags) result.
type StringAggregateAccumulator interface {
// AccumulateFirst receives an initial array of items to select from.
// It selects an item and stores the state. Afterwards, more data can
// be supplied with AccumulateMore and the results can be requested at
// any time. Without a call to AccumulateFirst the results are not
// defined.
AccumulateFirst(timestamps []int64, values []string, tags [][]byte)
// AccumulateMore receives additional array elements to select from.
AccumulateMore(timestamps []int64, values []string, tags [][]byte)
// Result returns the item selected from the data received so far.
Result() (int64, string, [][]byte)
}
// stringSelectorMethod compares a single ( timestamp, value ) candidate
// against the items of a ( []timestamp, []value ) pair, beginning at the
// given index. It returns -1 when the single candidate stays selected and the
// index of the winning array item otherwise.
type stringSelectorMethod func(int64, string, []int64, []string, int) int

// stringSelectorAccumulator remembers the item currently chosen by its
// selector, along with a private copy of the tags it was read with.
type stringSelectorAccumulator struct {
	selector stringSelectorMethod

	ts   int64
	v    string
	tags [][]byte
}

// AccumulateFirst seeds the accumulator from a non-empty array: the first
// item is the initial candidate and the selector is run over the rest. The
// tags slice is copied so later changes to the caller's slice are not seen.
func (a *stringSelectorAccumulator) AccumulateFirst(timestamps []int64, values []string, tags [][]byte) {
	picked := a.selector(timestamps[0], values[0], timestamps, values, 1)
	if picked < 0 {
		// The first item won against everything after it.
		picked = 0
	}
	a.ts = timestamps[picked]
	a.v = values[picked]
	a.tags = append(make([][]byte, 0, len(tags)), tags...)
}

// AccumulateMore runs the selector with the stored item as candidate against
// a further array. State and tags are replaced only when an array item wins.
func (a *stringSelectorAccumulator) AccumulateMore(timestamps []int64, values []string, tags [][]byte) {
	picked := a.selector(a.ts, a.v, timestamps, values, 0)
	if picked < 0 {
		return
	}
	a.ts = timestamps[picked]
	a.v = values[picked]

	// Reuse the existing tag storage when it is large enough.
	if need := len(tags); need <= cap(a.tags) {
		a.tags = a.tags[:need]
	} else {
		a.tags = make([][]byte, need)
	}
	copy(a.tags, tags)
}

// Result returns the timestamp, value and tags of the selected item.
func (a *stringSelectorAccumulator) Result() (int64, string, [][]byte) {
	return a.ts, a.v, a.tags
}
// makeStringAggregateAccumulator picks the accumulator used to combine the
// points returned for each series of a group, i.e. it aggregates the
// per-series aggregates.
//
// Only the first and last selectors are available for strings. Count, sum,
// min, max and any unrecognised type yield an errors.EInvalid error.
func makeStringAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (StringAggregateAccumulator, error) {
	var msg string
	switch agg {
	case datatypes.Aggregate_AggregateTypeFirst:
		return &stringSelectorAccumulator{selector: selectorFirstGroupsString}, nil
	case datatypes.Aggregate_AggregateTypeLast:
		return &stringSelectorAccumulator{selector: selectorLastGroupsString}, nil
	case datatypes.Aggregate_AggregateTypeCount:
		msg = "unsupported for aggregate count: String"
	case datatypes.Aggregate_AggregateTypeSum:
		msg = "unsupported for aggregate sum: String"
	case datatypes.Aggregate_AggregateTypeMin:
		msg = "unsupported for aggregate min: String"
	case datatypes.Aggregate_AggregateTypeMax:
		msg = "unsupported for aggregate max: String"
	default:
		msg = fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg)
	}
	return nil, &errors.Error{
		Code: errors.EInvalid,
		Msg:  msg,
	}
}
// selectorFirstGroupsString looks for a timestamp strictly earlier than ts
// among the items at positions i..len(values)-1. It returns the index of the
// first occurrence of the earliest such timestamp, or -1 when ts stays the
// earliest. The values themselves are only used for their length.
func selectorFirstGroupsString(ts int64, v string, timestamps []int64, values []string, i int) int {
	selected := -1
	for pos := i; pos < len(values); pos++ {
		if candidate := timestamps[pos]; candidate < ts {
			selected, ts = pos, candidate
		}
	}
	return selected
}
// selectorLastGroupsString looks for a timestamp at or after ts among the
// items at positions i..len(values)-1. Ties go to the later position, so the
// returned index is the last occurrence of the latest timestamp, or -1 when
// every timestamp is earlier than ts. The values are only used for their
// length.
func selectorLastGroupsString(ts int64, v string, timestamps []int64, values []string, i int) int {
	selected := -1
	for pos := i; pos < len(values); pos++ {
		if candidate := timestamps[pos]; candidate >= ts {
			selected, ts = pos, candidate
		}
	}
	return selected
}
// advanceCursor closes the current array cursor and moves on to the next
// series of the group that has a cursor.
//
// It returns true once a cursor of the expected string type has been
// installed (after reloading the tags for that series). It returns false when
// the group is exhausted, or when a cursor of another type is found, in which
// case that cursor is closed and t.err is set.
func (t *stringGroupTable) advanceCursor() bool {
	t.cur.Close()
	t.cur = nil

	for t.gc.Next() {
		next := t.gc.Cursor()
		if next == nil {
			continue
		}

		typed, ok := next.(cursors.StringArrayCursor)
		if ok {
			t.readTags(t.gc.Tags())
			t.cur = typed
			return true
		}

		// TODO(sgc): error or skip?
		next.Close()
		t.err = &errors.Error{
			Code: errors.EInvalid,
			Err: &GroupCursorError{
				typ:    "string",
				cursor: next,
			},
		}
		return false
	}
	return false
}
// Statistics reports the scanned-value and scanned-byte counters of the
// array cursor currently being read, or zero statistics when there is none.
//
// t.mu is held while the cursor is inspected. Close closes and clears t.cur
// under the same lock, so without it a concurrent Close could nil the cursor
// between the nil check and the Stats call.
// NOTE(review): advance and advanceCursor replace t.cur without taking t.mu,
// so this only synchronises with Close — confirm whether callers may invoke
// Statistics while the table is still being read.
func (t *stringGroupTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.cur == nil {
		return cursors.CursorStats{}
	}
	cs := t.cur.Stats()
	return cursors.CursorStats{
		ScannedValues: cs.ScannedValues,
		ScannedBytes:  cs.ScannedBytes,
	}
}
//
// *********** Boolean ***********
//
// booleanTable reads boolean values from a single array cursor. It embeds
// the shared table state.
type booleanTable struct {
table
// mu guards cur in Close and Statistics.
mu sync.Mutex
// cur is the array cursor being read; Close sets it to nil.
cur cursors.BooleanArrayCursor
// alloc is declared on this type in addition to whatever the embedded
// table carries, so t.alloc in methods of this type refers to this field.
// NOTE(review): verify that every constructor assigns it.
alloc *memory.Allocator
}
// newBooleanTable builds a booleanTable that reads from cur.
//
// The shared table state is created by newTable from done, bounds, key, cols,
// defs, cache and alloc. The supplied tags are loaded with readTags and the
// table is initialised with its advance method before being returned.
//
// alloc is also stored on the booleanTable itself. The type declares its own
// alloc field, which takes precedence over the one promoted from the embedded
// table, so t.alloc in its methods would otherwise be nil and the arrow
// arrays built there would not go through the supplied allocator.
func newBooleanTable(
	done chan struct{},
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanTable {
	t := &booleanTable{
		table: newTable(done, bounds, key, cols, defs, cache, alloc),
		cur:   cur,
		alloc: alloc,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}
// Close closes the array cursor, if one is still set, and clears the field.
// The work is done while holding t.mu.
func (t *booleanTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.cur == nil {
		return
	}
	t.cur.Close()
	t.cur = nil
}
// Statistics reports the scanned-value and scanned-byte counters of the
// array cursor, or zero statistics when the cursor has been cleared. The
// cursor is inspected while holding t.mu.
func (t *booleanTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	var stats cursors.CursorStats
	if cur := t.cur; cur != nil {
		cs := cur.Stats()
		stats.ScannedValues = cs.ScannedValues
		stats.ScannedBytes = cs.ScannedBytes
	}
	return stats
}
// Do passes f and this table's advance method to t.do and returns its result.
func (t *booleanTable) Do(f func(flux.ColReader) error) error {
return t.do(f, t.advance)
}
// advance reads the next array from the cursor and turns it into the next
// buffer. It returns false when the cursor yields an empty array.
func (t *booleanTable) advance() bool {
	arr := t.cur.Next()
	n := arr.Len()
	if n == 0 {
		return false
	}

	// allocateBuffer reuses the previous buffer when it can; if references
	// to it were retained, a fresh one is allocated instead.
	buf := t.allocateBuffer(n)
	buf.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	buf.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(buf)
	t.appendBounds(buf)
	return true
}
// window table
//
// booleanWindowTable extends booleanTable with the state needed to emit one
// row per window.
type booleanWindowTable struct {
booleanTable
// arr holds an array read from the cursor.
// NOTE(review): presumably managed like the other window tables' arr
// (nil when a new array must be fetched) — confirm against nextBuffer.
arr *cursors.BooleanArray
// windowBounds is initialised by the constructor to the window containing
// the start of the range when createEmpty is set.
windowBounds interval.Bounds
// idxInArr is presumably the index of the next unread item in arr — TODO confirm.
idxInArr int
// createEmpty makes the window times come from the range rather than from
// the timestamps returned by the cursor.
createEmpty bool
// timeColumn is stored as supplied to the constructor.
timeColumn string
// isAggregate is stored as supplied to the constructor.
isAggregate bool
// window computes the window bounds.
window interval.Window
}
// newBooleanWindowTable builds a booleanWindowTable around cur. When
// createEmpty is set, the first window is positioned at the start of bounds.
// Tags are then loaded and the table's advance method is handed to init.
func newBooleanWindowTable(
	done chan struct{},
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	createEmpty bool,
	timeColumn string,
	isAggregate bool,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanWindowTable {
	wt := new(booleanWindowTable)
	wt.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	wt.cur = cur
	wt.window = window
	wt.createEmpty = createEmpty
	wt.timeColumn = timeColumn
	wt.isAggregate = isAggregate

	if createEmpty {
		wt.windowBounds = window.GetLatestBounds(values.Time(int64(bounds.Start)))
	}

	wt.readTags(tags)
	wt.init(wt.advance)
	return wt
}
// Do forwards f to do together with this table's advance method and returns
// whatever error do reports.
func (t *booleanWindowTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// createNextBufferTimes produces the window start and stop times for the
// next buffer. ok is false when there is nothing left to produce.
//
// With createEmpty set, the times are generated by walking windows from
// t.windowBounds until a window starts at or after the stop of the table
// bounds. Otherwise one window is derived from each timestamp of the next
// array read from the cursor.
func (t *booleanWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) {
	startB := arrow.NewIntBuilder(t.alloc)
	stopB := arrow.NewIntBuilder(t.alloc)

	if !t.createEmpty {
		// The timestamps are copied from the next array, so one must exist.
		if !t.nextBuffer() {
			return nil, nil, false
		}

		n := len(t.arr.Timestamps)
		startB.Resize(n)
		stopB.Resize(n)
		for _, ts := range t.arr.Timestamps {
			// Each timestamp is treated as a window stop: take the window
			// preceding the one that contains it, clamped to the table bounds.
			latest := t.window.GetLatestBounds(values.Time(ts))
			lo, hi := t.getWindowBoundsFor(t.window.PrevBounds(latest))
			startB.Append(lo)
			stopB.Append(hi)
		}
		return startB.NewIntArray(), stopB.NewIntArray(), true
	}

	// No windows remain once the current window starts at or after the stop
	// time of the table bounds.
	rangeStop := int64(t.bounds.Stop)
	if int64(t.windowBounds.Start()) >= rangeStop {
		return nil, nil, false
	}

	// Start from the block size as the initial capacity.
	// TODO(jsternberg): Calculate the exact size with max points as the maximum.
	startB.Resize(storage.MaxPointsPerBlock)
	stopB.Resize(storage.MaxPointsPerBlock)
	for {
		lo, hi := t.getWindowBoundsFor(t.windowBounds)
		if lo >= rangeStop {
			break
		}
		startB.Append(lo)
		stopB.Append(hi)
		t.windowBounds = t.window.NextBounds(t.windowBounds)
	}
	return startB.NewIntArray(), stopB.NewIntArray(), true
}
// getWindowBoundsFor returns the start and stop of bounds, clamped so that
// the start is never before t.bounds.Start and the stop is never after
// t.bounds.Stop.
func (t *booleanWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) {
	lo, hi := int64(t.bounds.Start), int64(t.bounds.Stop)
	if s := int64(bounds.Start()); s >= lo {
		lo = s
	}
	if e := int64(bounds.Stop()); e <= hi {
		hi = e
	}
	return lo, hi
}
// nextAt returns the next unread value if it belongs to the window ending at
// stop, and consumes it. When no value is available, or the next value does
// not belong to that window, it returns false for both results and consumes
// nothing.
func (t *booleanWindowTable) nextAt(stop int64) (v bool, ok bool) {
	if !t.nextBuffer() {
		return false, false
	}

	i := t.idxInArr
	if !t.isInWindow(stop, t.arr.Timestamps[i]) {
		return false, false
	}

	v = t.arr.Values[i]
	t.idxInArr = i + 1
	return v, true
}
// isInWindow reports whether ts may be used for the window identified by
// stop. The stop may have been truncated by a restricted boundary, so the
// full window is recovered by looking up the window that contains the
// nanosecond just before stop.
//
// When used with an aggregate, ts is the true stop time returned by storage.
// When used with a selector, ts is the real time of the point.
func (t *booleanWindowTable) isInWindow(stop int64, ts int64) bool {
	window := t.window.GetLatestBounds(values.Time(stop - 1))
	lo, hi := int64(window.Start()), int64(window.Stop())

	if t.isAggregate {
		// Aggregate timestamps are window stop times: accept (lo, hi].
		return ts > lo && ts <= hi
	}
	// Selector timestamps are point times: accept [lo, hi).
	return ts >= lo && ts < hi
}
// nextBuffer makes sure t.arr holds at least one unread value, reading a new
// array from the cursor when the current one is missing or fully consumed.
// It returns false when the cursor has nothing more to give.
func (t *booleanWindowTable) nextBuffer() bool {
	// Drop the current array once every element has been consumed.
	if t.arr != nil && t.idxInArr >= t.arr.Len() {
		t.arr = nil
	}
	if t.arr != nil {
		return true
	}

	next := t.cur.Next()
	if next.Len() == 0 {
		return false
	}
	t.arr = next
	t.idxInArr = 0
	return true
}
// appendValues walks the window stop times in intervals. For each one it
// calls appendValue with the matching buffered value, or appendNull when
// nextAt finds no value for that window.
func (t *booleanWindowTable) appendValues(intervals []int64, appendValue func(v bool), appendNull func()) {
	for _, stop := range intervals {
		v, ok := t.nextAt(stop)
		if !ok {
			appendNull()
			continue
		}
		appendValue(v)
	}
}
// advance builds the next buffer of windowed values. It returns false when
// the cursor has no more data or no window times can be produced.
//
// When timeColumn is empty, the start and stop arrays are written to their
// own columns. Otherwise the array matching timeColumn is placed in the time
// column, the other one is released, and the bounds columns are appended.
func (t *booleanWindowTable) advance() bool {
	if !t.nextBuffer() {
		return false
	}

	// Create the timestamps for the next window.
	start, stop, ok := t.createNextBufferTimes()
	if !ok {
		return false
	}
	vals := t.mergeValues(stop.Int64Values())

	// Ask for a buffer sized for these windows. Reusing the previous buffer
	// avoids allocating more slices; a new one is allocated only when the
	// old references were retained and it is therefore still in use.
	cr := t.allocateBuffer(stop.Len())

	if t.timeColumn == "" {
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[valueColIdxWithoutTime] = vals
		t.appendTags(cr)
		return true
	}

	switch t.timeColumn {
	case execute.DefaultStopColLabel:
		cr.cols[timeColIdx] = stop
		start.Release()
	case execute.DefaultStartColLabel:
		cr.cols[timeColIdx] = start
		stop.Release()
	}
	cr.cols[valueColIdx] = vals
	t.appendBounds(cr)
	t.appendTags(cr)
	return true
}
// booleanWindowSelectorTable is a booleanTable that annotates each value
// read from the cursor with the boundaries of the window containing it.
// This table implementation will not have any empty windows.
type booleanWindowSelectorTable struct {
	booleanTable
	// timeColumn names the window boundary written to the time column. Any
	// value other than the default start or stop label makes advance write
	// start, stop and time columns.
	timeColumn string
	// window computes the window boundaries for a timestamp.
	window interval.Window
}
// newBooleanWindowSelectorTable builds a booleanWindowSelectorTable around
// cur, loads the series tags and hands the table's advance method to init.
func newBooleanWindowSelectorTable(
	done chan struct{},
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanWindowSelectorTable {
	st := new(booleanWindowSelectorTable)
	st.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	st.cur = cur
	st.window = window
	st.timeColumn = timeColumn

	st.readTags(tags)
	st.init(st.advance)
	return st
}
// Do forwards f to do together with this table's advance method and returns
// whatever error do reports.
func (t *booleanWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance reads the next array from the cursor and loads it into a column
// reader. It returns false once the cursor yields an empty array.
//
// When timeColumn is the default start or stop label, the matching window
// boundary is written to the time column and the bounds columns are
// appended. Otherwise the window start, window stop and point time are each
// written to their own column.
func (t *booleanWindowSelectorTable) advance() bool {
	arr := t.cur.Next()
	if arr.Len() == 0 {
		return false
	}

	cr := t.allocateBuffer(arr.Len())
	useStart := t.timeColumn == execute.DefaultStartColLabel
	useStop := t.timeColumn == execute.DefaultStopColLabel
	if useStart || useStop {
		if useStart {
			cr.cols[timeColIdx] = t.startTimes(arr)
		} else {
			cr.cols[timeColIdx] = t.stopTimes(arr)
		}
		t.appendBounds(cr)
	} else {
		cr.cols[startColIdx] = t.startTimes(arr)
		cr.cols[stopColIdx] = t.stopTimes(arr)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	}

	cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(cr)
	return true
}
// startTimes returns, for every timestamp in arr, the start of the window
// containing it. A window start earlier than the start of the table bounds
// is replaced by the start of the table bounds.
func (t *booleanWindowSelectorTable) startTimes(arr *cursors.BooleanArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	lowest := int64(t.bounds.Start)
	for _, ts := range arr.Timestamps {
		begin := int64(t.window.GetLatestBounds(values.Time(ts)).Start())
		if begin < lowest {
			begin = lowest
		}
		b.Append(begin)
	}
	return b.NewIntArray()
}
// stopTimes returns, for every timestamp in arr, the stop of the window
// containing it. A window stop later than the stop of the table bounds is
// replaced by the stop of the table bounds.
func (t *booleanWindowSelectorTable) stopTimes(arr *cursors.BooleanArray) *array.Int {
	b := arrow.NewIntBuilder(t.alloc)
	b.Resize(arr.Len())

	highest := int64(t.bounds.Stop)
	for _, ts := range arr.Timestamps {
		end := int64(t.window.GetLatestBounds(values.Time(ts)).Stop())
		if end > highest {
			end = highest
		}
		b.Append(end)
	}
	return b.NewIntArray()
}
// booleanEmptyWindowSelectorTable is a booleanTable that emits one row per
// window across the table bounds, writing a null value for any window that
// has no matching point.
// This table implementation may contain empty windows
// in addition to non-empty windows.
type booleanEmptyWindowSelectorTable struct {
	booleanTable
	// arr is the array currently being consumed; the constructor fills it
	// with the first array read from the cursor.
	arr *cursors.BooleanArray
	// idx is the index of the next unread element of arr.
	idx int
	// rangeStart and rangeStop are the table bounds as int64 values.
	rangeStart int64
	rangeStop int64
	// windowBounds is the next window to emit.
	windowBounds interval.Bounds
	// timeColumn names the window boundary written to the time column. Any
	// value other than the default start or stop label makes advance write
	// start, stop and time columns.
	timeColumn string
	// window computes the window boundaries.
	window interval.Window
}
// newBooleanEmptyWindowSelectorTable builds a
// booleanEmptyWindowSelectorTable around cur. It reads the first array from
// the cursor, positions the first window at the start of bounds, loads the
// series tags and hands the table's advance method to init.
func newBooleanEmptyWindowSelectorTable(
	done chan struct{},
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanEmptyWindowSelectorTable {
	et := new(booleanEmptyWindowSelectorTable)
	et.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	et.cur = cur
	et.arr = cur.Next()
	et.rangeStart = int64(bounds.Start)
	et.rangeStop = int64(bounds.Stop)
	et.windowBounds = window.GetLatestBounds(values.Time(et.rangeStart))
	et.window = window
	et.timeColumn = timeColumn

	et.readTags(tags)
	et.init(et.advance)
	return et
}
// Do forwards f to do together with this table's advance method and returns
// whatever error do reports.
func (t *booleanEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance builds the next buffer of windows. It returns false when the
// current array is empty.
//
// The value builder is filled as a side effect of producing the time
// columns, so the values array is only materialised after startTimes,
// stopTimes or startStopTimes has run.
func (t *booleanEmptyWindowSelectorTable) advance() bool {
	if t.arr.Len() == 0 {
		return false
	}

	vb := t.arrowBuilder()
	vb.Resize(storage.MaxPointsPerBlock)

	var cr *colReader
	if col := t.timeColumn; col == execute.DefaultStartColLabel {
		times := t.startTimes(vb)
		cr = t.allocateBuffer(times.Len())
		cr.cols[timeColIdx] = times
		t.appendBounds(cr)
	} else if col == execute.DefaultStopColLabel {
		times := t.stopTimes(vb)
		cr = t.allocateBuffer(times.Len())
		cr.cols[timeColIdx] = times
		t.appendBounds(cr)
	} else {
		starts, stops, times := t.startStopTimes(vb)
		cr = t.allocateBuffer(times.Len())
		cr.cols[startColIdx] = starts
		cr.cols[stopColIdx] = stops
		cr.cols[timeColIdx] = times
	}

	cr.cols[valueColIdx] = vb.NewBooleanArray()
	t.appendTags(cr)
	return true
}
// startTimes emits one start time per window, beginning at t.windowBounds,
// until a window starts at or after rangeStop or the block is full. For each
// window it also appends to builder either the current point's value, when
// that point falls inside the window, or a null.
func (t *booleanEmptyWindowSelectorTable) startTimes(builder *array.BooleanBuilder) *array.Int {
	starts := arrow.NewIntBuilder(t.alloc)
	starts.Resize(storage.MaxPointsPerBlock)

	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The first window should start at the beginning of the time range.
		if winStart < t.rangeStart {
			starts.Append(t.rangeStart)
		} else {
			starts.Append(winStart)
		}

		// With no data left, use a timestamp that can never fall in a window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// Emit the point when it belongs to this window, otherwise a null.
		if winStart <= ts && ts < winStop {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// A non-empty array that has been fully read is replaced by the next
		// one from the cursor.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if starts.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return starts.NewIntArray()
}
// stopTimes emits one stop time per window, beginning at t.windowBounds,
// until a window starts at or after rangeStop or the block is full. For each
// window it also appends to builder either the current point's value, when
// that point falls inside the window, or a null.
func (t *booleanEmptyWindowSelectorTable) stopTimes(builder *array.BooleanBuilder) *array.Int {
	stops := arrow.NewIntBuilder(t.alloc)
	stops.Resize(storage.MaxPointsPerBlock)

	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The last window should stop at the end of the time range.
		if winStop > t.rangeStop {
			stops.Append(t.rangeStop)
		} else {
			stops.Append(winStop)
		}

		// With no data left, use a timestamp that can never fall in a window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// Emit the point when it belongs to this window, otherwise a null.
		if winStart <= ts && ts < winStop {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// A non-empty array that has been fully read is replaced by the next
		// one from the cursor.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if stops.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return stops.NewIntArray()
}
// startStopTimes emits, for each window beginning at t.windowBounds, its
// start time, its stop time and the time of the point inside it. It stops
// when a window starts at or after rangeStop or the block is full. Windows
// without a matching point get a null time and a null in builder. The
// results are returned in the order start, stop, time.
func (t *booleanEmptyWindowSelectorTable) startStopTimes(builder *array.BooleanBuilder) (*array.Int, *array.Int, *array.Int) {
	starts := arrow.NewIntBuilder(t.alloc)
	starts.Resize(storage.MaxPointsPerBlock)
	stops := arrow.NewIntBuilder(t.alloc)
	stops.Resize(storage.MaxPointsPerBlock)
	times := arrow.NewIntBuilder(t.alloc)
	times.Resize(storage.MaxPointsPerBlock)

	for {
		winStart := int64(t.windowBounds.Start())
		if winStart >= t.rangeStop {
			break
		}
		winStop := int64(t.windowBounds.Stop())

		// The first window should start at the beginning of the time range.
		if winStart < t.rangeStart {
			starts.Append(t.rangeStart)
		} else {
			starts.Append(winStart)
		}

		// The last window should stop at the end of the time range.
		if winStop > t.rangeStop {
			stops.Append(t.rangeStop)
		} else {
			stops.Append(winStop)
		}

		// With no data left, use a timestamp that can never fall in a window.
		ts := int64(math.MaxInt64)
		if t.arr.Len() != 0 {
			ts = t.arr.Timestamps[t.idx]
		}

		// Emit the point when it belongs to this window, otherwise nulls.
		if winStart <= ts && ts < winStop {
			times.Append(ts)
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			times.AppendNull()
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// A non-empty array that has been fully read is replaced by the next
		// one from the cursor.
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if times.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return starts.NewIntArray(), stops.NewIntArray(), times.NewIntArray()
}
// booleanGroupTable is a table that reads boolean values from the cursors
// of a group cursor, one series cursor at a time.
type booleanGroupTable struct {
	table
	// mu is held by Close while it closes and clears cur and gc.
	mu sync.Mutex
	// gc supplies the series cursors, their tags and the optional aggregate.
	gc storage.GroupCursor
	// cur is the series cursor currently being read; advanceCursor replaces it.
	cur cursors.BooleanArrayCursor
}
// newBooleanGroupTable builds a booleanGroupTable that starts reading from
// cur and obtains further cursors from gc. The series tags are loaded and
// the table's advance method is handed to init.
func newBooleanGroupTable(
	done chan struct{},
	gc storage.GroupCursor,
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanGroupTable {
	gt := new(booleanGroupTable)
	gt.table = newTable(done, bounds, key, cols, defs, cache, alloc)
	gt.gc = gc
	gt.cur = cur

	gt.readTags(tags)
	gt.init(gt.advance)
	return gt
}
// Close closes and clears the current series cursor and then the group
// cursor, skipping whichever is already nil. The table mutex is held
// throughout.
func (t *booleanGroupTable) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()

	if cur := t.cur; cur != nil {
		cur.Close()
		t.cur = nil
	}
	if gc := t.gc; gc != nil {
		gc.Close()
		t.gc = nil
	}
}
// Do forwards f to do together with this table's advance method and returns
// whatever error do reports.
func (t *booleanGroupTable) Do(f func(flux.ColReader) error) error {
	next := t.advance
	return t.do(f, next)
}
// advance loads the next buffer for the group and reports whether one was
// produced.
//
// Without an aggregate on the group cursor, each non-empty array read from
// the series cursors becomes its own buffer. With an aggregate, every
// remaining array from every remaining series cursor is folded into an
// accumulator and a single row is produced.
func (t *booleanGroupTable) advance() bool {
	if t.cur == nil {
		// The aggregate path below drains every series cursor in one call.
		// It finishes with t.cur set to nil, yet still returns true for the
		// row it built. The following call lands here and reports that no
		// data remains.
		return false
	}

	// Move past exhausted series cursors until one yields data.
	arr := t.cur.Next()
	for arr.Len() == 0 {
		if !t.advanceCursor() {
			return false
		}
		arr = t.cur.Next()
	}
	n := arr.Len()

	// Group without aggregate: pass the array through as one buffer.
	if t.gc.Aggregate() == nil {
		// Ask for a buffer sized for this array. Reusing the previous buffer
		// avoids allocating more slices; a new one is allocated only when
		// the old references were retained and it is still in use.
		cr := t.allocateBuffer(n)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
		cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
		t.appendTags(cr)
		t.appendBounds(cr)
		return true
	}

	acc, err := makeBooleanAggregateAccumulator(t.gc.Aggregate().Type)
	if err != nil {
		t.err = err
		return false
	}

	acc.AccumulateFirst(arr.Timestamps, arr.Values, t.tags)
	for {
		arr = t.cur.Next()
		if arr.Len() == 0 {
			if !t.advanceCursor() {
				break
			}
			continue
		}
		acc.AccumulateMore(arr.Timestamps, arr.Values, t.tags)
	}

	ts, v, tags := acc.Result()
	cr := t.allocateBuffer(1)
	if IsSelector(t.gc.Aggregate()) {
		cr.cols[timeColIdx] = arrow.NewInt([]int64{ts}, t.alloc)
		cr.cols[valueColIdx] = t.toArrowBuffer([]bool{v})
	} else {
		cr.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]bool{v})
	}
	t.appendTheseTags(cr, tags)
	t.appendBounds(cr)
	return true
}
// BooleanAggregateAccumulator folds arrays of boolean points, supplied one
// array at a time, into a single result.
type BooleanAggregateAccumulator interface {
	// AccumulateFirst receives an initial array of items to select from.
	// It selects an item and stores the state. Afterwards, more data can
	// be supplied with AccumulateMore and the results can be requested at
	// any time. Without a call to AccumulateFirst the results are not
	// defined.
	AccumulateFirst(timestamps []int64, values []bool, tags [][]byte)
	// AccumulateMore receives additional array elements to select from.
	AccumulateMore(timestamps []int64, values []bool, tags [][]byte)
	// Result returns the item selected from the data received so far. The
	// selector implementation in this file returns its timestamp, value and
	// tags, in that order.
	Result() (int64, bool, [][]byte)
}
// booleanSelectorMethod compares a single ( timestamp, value ) pair against
// a ( []timestamp, []value ) pair, scanning the arrays from the supplied
// starting index. It returns -1 when the single pair stays selected and the
// non-negative index of the array item when one of those is selected.
type booleanSelectorMethod func(int64, bool, []int64, []bool, int) int

// booleanSelectorAccumulator remembers the item selected so far, together
// with a copy of the tags that were supplied alongside it.
type booleanSelectorAccumulator struct {
	selector booleanSelectorMethod
	ts       int64
	v        bool
	tags     [][]byte
}

// AccumulateFirst seeds the accumulator from a non-empty array. The first
// element is the initial candidate and the selector is run over the rest.
// The outer tags slice is copied; the byte slices inside it are shared.
func (a *booleanSelectorAccumulator) AccumulateFirst(timestamps []int64, values []bool, tags [][]byte) {
	picked := 0
	if idx := a.selector(timestamps[0], values[0], timestamps, values, 1); idx >= 0 {
		picked = idx
	}
	a.ts, a.v = timestamps[picked], values[picked]
	a.tags = append(make([][]byte, 0, len(tags)), tags...)
}

// AccumulateMore runs the selector over another array. The stored item and
// tags are replaced only when an element of that array is selected; the
// existing tags storage is reused when it is large enough.
func (a *booleanSelectorAccumulator) AccumulateMore(timestamps []int64, values []bool, tags [][]byte) {
	idx := a.selector(a.ts, a.v, timestamps, values, 0)
	if idx < 0 {
		return
	}
	a.ts, a.v = timestamps[idx], values[idx]

	if n := len(tags); n <= cap(a.tags) {
		a.tags = a.tags[:n]
	} else {
		a.tags = make([][]byte, n)
	}
	copy(a.tags, tags)
}

// Result returns the timestamp, value and tags of the selected item.
func (a *booleanSelectorAccumulator) Result() (int64, bool, [][]byte) {
	return a.ts, a.v, a.tags
}
// makeBooleanAggregateAccumulator returns the accumulator used to combine
// the points returned for each series of a group; that is, it aggregates
// the per-series aggregates. Only the first and last selectors are
// available for booleans. Every other aggregate type yields an EInvalid
// error.
func makeBooleanAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (BooleanAggregateAccumulator, error) {
	invalid := func(msg string) error {
		return &errors.Error{
			Code: errors.EInvalid,
			Msg:  msg,
		}
	}

	switch agg {
	case datatypes.Aggregate_AggregateTypeFirst:
		return &booleanSelectorAccumulator{selector: selectorFirstGroupsBoolean}, nil
	case datatypes.Aggregate_AggregateTypeLast:
		return &booleanSelectorAccumulator{selector: selectorLastGroupsBoolean}, nil
	case datatypes.Aggregate_AggregateTypeCount:
		return nil, invalid("unsupported for aggregate count: Boolean")
	case datatypes.Aggregate_AggregateTypeSum:
		return nil, invalid("unsupported for aggregate sum: Boolean")
	case datatypes.Aggregate_AggregateTypeMin:
		return nil, invalid("unsupported for aggregate min: Boolean")
	case datatypes.Aggregate_AggregateTypeMax:
		return nil, invalid("unsupported for aggregate max: Boolean")
	default:
		return nil, invalid(fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg))
	}
}
// selectorFirstGroupsBoolean looks, from index i onwards, for the element
// with the smallest timestamp that is strictly earlier than ts. It returns
// that element's index, or -1 when no element is earlier than ts. On equal
// timestamps the earlier index wins. The v argument is not consulted.
func selectorFirstGroupsBoolean(ts int64, v bool, timestamps []int64, values []bool, i int) int {
	selected := -1
	for j := i; j < len(values); j++ {
		if candidate := timestamps[j]; candidate < ts {
			selected, ts = j, candidate
		}
	}
	return selected
}
// selectorLastGroupsBoolean looks, from index i onwards, for the element
// with the largest timestamp that is not earlier than ts. It returns that
// element's index, or -1 when every element is earlier than ts. On equal
// timestamps the later index wins. The v argument is not consulted.
func selectorLastGroupsBoolean(ts int64, v bool, timestamps []int64, values []bool, i int) int {
	selected := -1
	for j := i; j < len(values); j++ {
		if candidate := timestamps[j]; candidate >= ts {
			selected, ts = j, candidate
		}
	}
	return selected
}
// advanceCursor closes the current series cursor and moves to the next
// non-nil cursor offered by the group cursor, loading that series' tags.
// It returns false, leaving t.cur nil, when the group cursor is exhausted
// or when the next cursor is not a boolean cursor. In the latter case the
// offending cursor is closed and t.err is set.
func (t *booleanGroupTable) advanceCursor() bool {
	t.cur.Close()
	t.cur = nil

	for t.gc.Next() {
		cur := t.gc.Cursor()
		if cur == nil {
			continue
		}

		typed, ok := cur.(cursors.BooleanArrayCursor)
		if !ok {
			// TODO(sgc): error or skip?
			cur.Close()
			t.err = &errors.Error{
				Code: errors.EInvalid,
				Err: &GroupCursorError{
					typ:    "boolean",
					cursor: cur,
				},
			}
			return false
		}

		t.readTags(t.gc.Tags())
		t.cur = typed
		return true
	}
	return false
}
// Statistics reports the scanned value and byte counts of the series cursor
// that is currently set. When no cursor is set, the zero value is returned.
//
// The table mutex is held while the cursor is read. Close clears t.cur under
// the same mutex, so without the lock a concurrent Close could nil the
// cursor between the check and the Stats call. This matches the locking in
// booleanTable.Statistics.
func (t *booleanGroupTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()

	cur := t.cur
	if cur == nil {
		return cursors.CursorStats{}
	}
	cs := cur.Stats()
	return cursors.CursorStats{
		ScannedValues: cs.ScannedValues,
		ScannedBytes:  cs.ScannedBytes,
	}
}