// Generated by tmpl // https://github.com/benbjohnson/tmpl // // DO NOT EDIT! // Source: table.gen.go.tmpl package storageflux import ( "fmt" "math" "sync" "github.com/influxdata/flux" "github.com/influxdata/flux/array" "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/interval" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/values" "github.com/influxdata/influxdb/v2/kit/platform/errors" "github.com/influxdata/influxdb/v2/models" storage "github.com/influxdata/influxdb/v2/storage/reads" "github.com/influxdata/influxdb/v2/storage/reads/datatypes" "github.com/influxdata/influxdb/v2/tsdb/cursors" ) // // *********** Float *********** // type floatTable struct { table mu sync.Mutex cur cursors.FloatArrayCursor alloc *memory.Allocator } func newFloatTable( done chan struct{}, cur cursors.FloatArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *floatTable { t := &floatTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, } t.readTags(tags) t.init(t.advance) return t } func (t *floatTable) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } t.mu.Unlock() } func (t *floatTable) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { return cursors.CursorStats{} } cs := cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } } func (t *floatTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *floatTable) advance() bool { a := t.cur.Next() l := a.Len() if l == 0 { return false } // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
cr := t.allocateBuffer(l) cr.cols[timeColIdx] = arrow.NewInt(a.Timestamps, t.alloc) cr.cols[valueColIdx] = t.toArrowBuffer(a.Values) t.appendTags(cr) t.appendBounds(cr) return true } // window table type floatWindowTable struct { floatTable arr *cursors.FloatArray windowBounds interval.Bounds idxInArr int createEmpty bool timeColumn string isAggregate bool window interval.Window } func newFloatWindowTable( done chan struct{}, cur cursors.FloatArrayCursor, bounds execute.Bounds, window interval.Window, createEmpty bool, timeColumn string, isAggregate bool, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *floatWindowTable { t := &floatWindowTable{ floatTable: floatTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, window: window, createEmpty: createEmpty, timeColumn: timeColumn, isAggregate: isAggregate, } if t.createEmpty { start := int64(bounds.Start) t.windowBounds = window.GetLatestBounds(values.Time(start)) } t.readTags(tags) t.init(t.advance) return t } func (t *floatWindowTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } // createNextBufferTimes will read the timestamps from the array // cursor and construct the values for the next buffer. func (t *floatWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) { startB := arrow.NewIntBuilder(t.alloc) stopB := arrow.NewIntBuilder(t.alloc) if t.createEmpty { // There are no more windows when the start time is greater // than or equal to the stop time. if startT := int64(t.windowBounds.Start()); startT >= int64(t.bounds.Stop) { return nil, nil, false } // Create a buffer with the buffer size. // TODO(jsternberg): Calculate the exact size with max points as the maximum. 
startB.Resize(storage.MaxPointsPerBlock) stopB.Resize(storage.MaxPointsPerBlock) for ; ; t.windowBounds = t.window.NextBounds(t.windowBounds) { startT, stopT := t.getWindowBoundsFor(t.windowBounds) if startT >= int64(t.bounds.Stop) { break } startB.Append(startT) stopB.Append(stopT) } start = startB.NewIntArray() stop = stopB.NewIntArray() return start, stop, true } // Retrieve the next buffer so we can copy the timestamps. if !t.nextBuffer() { return nil, nil, false } // Copy over the timestamps from the next buffer and adjust // times for the boundaries. startB.Resize(len(t.arr.Timestamps)) stopB.Resize(len(t.arr.Timestamps)) for _, stopT := range t.arr.Timestamps { bounds := t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stopT))) startT, stopT := t.getWindowBoundsFor(bounds) startB.Append(startT) stopB.Append(stopT) } start = startB.NewIntArray() stop = stopB.NewIntArray() return start, stop, true } func (t *floatWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) { beg := int64(bounds.Start()) end := int64(bounds.Stop()) if beg < int64(t.bounds.Start) { beg = int64(t.bounds.Start) } if end > int64(t.bounds.Stop) { end = int64(t.bounds.Stop) } return beg, end } // nextAt will retrieve the next value that can be used with // the given stop timestamp. If no values can be used with the timestamp, // it will return the default value and false. func (t *floatWindowTable) nextAt(stop int64) (v float64, ok bool) { if !t.nextBuffer() { return } else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) { return } v, ok = t.arr.Values[t.idxInArr], true t.idxInArr++ return v, ok } // isInWindow will check if the given time may be used within the window // denoted by the stop timestamp. The stop may be a truncated stop time // because of a restricted boundary. // // When used with an aggregate, ts will be the true stop time returned // by storage. When used with an aggregate, it will be the real time // for the point. 
func (t *floatWindowTable) isInWindow(stop int64, ts int64) bool { // Retrieve the boundary associated with this stop time. // This will be the boundary for the previous nanosecond. bounds := t.window.GetLatestBounds(values.Time(stop - 1)) start, stop := int64(bounds.Start()), int64(bounds.Stop()) // For an aggregate, the timestamp will be the stop time of the boundary. if t.isAggregate { return start < ts && ts <= stop } // For a selector, the timestamp should be within the boundary. return start <= ts && ts < stop } // nextBuffer will ensure the array cursor is filled // and will return true if there is at least one value // that can be read from it. func (t *floatWindowTable) nextBuffer() bool { // Discard the current array cursor if we have // exceeded it. if t.arr != nil && t.idxInArr >= t.arr.Len() { t.arr = nil } // Retrieve the next array cursor if needed. if t.arr == nil { arr := t.cur.Next() if arr.Len() == 0 { return false } t.arr, t.idxInArr = arr, 0 } return true } // appendValues will scan the timestamps and append values // that match those timestamps from the buffer. func (t *floatWindowTable) appendValues(intervals []int64, appendValue func(v float64), appendNull func()) { for i := 0; i < len(intervals); i++ { if v, ok := t.nextAt(intervals[i]); ok { appendValue(v) continue } appendNull() } } func (t *floatWindowTable) advance() bool { if !t.nextBuffer() { return false } // Create the timestamps for the next window. start, stop, ok := t.createNextBufferTimes() if !ok { return false } values := t.mergeValues(stop.Int64Values()) // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
cr := t.allocateBuffer(stop.Len()) if t.timeColumn != "" { switch t.timeColumn { case execute.DefaultStopColLabel: cr.cols[timeColIdx] = stop start.Release() case execute.DefaultStartColLabel: cr.cols[timeColIdx] = start stop.Release() } cr.cols[valueColIdx] = values t.appendBounds(cr) } else { cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[valueColIdxWithoutTime] = values } t.appendTags(cr) return true } // This table implementation will not have any empty windows. type floatWindowSelectorTable struct { floatTable timeColumn string window interval.Window } func newFloatWindowSelectorTable( done chan struct{}, cur cursors.FloatArrayCursor, bounds execute.Bounds, window interval.Window, timeColumn string, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *floatWindowSelectorTable { t := &floatWindowSelectorTable{ floatTable: floatTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, window: window, timeColumn: timeColumn, } t.readTags(tags) t.init(t.advance) return t } func (t *floatWindowSelectorTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *floatWindowSelectorTable) advance() bool { arr := t.cur.Next() if arr.Len() == 0 { return false } cr := t.allocateBuffer(arr.Len()) switch t.timeColumn { case execute.DefaultStartColLabel: cr.cols[timeColIdx] = t.startTimes(arr) t.appendBounds(cr) case execute.DefaultStopColLabel: cr.cols[timeColIdx] = t.stopTimes(arr) t.appendBounds(cr) default: cr.cols[startColIdx] = t.startTimes(arr) cr.cols[stopColIdx] = t.stopTimes(arr) cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) } cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(cr) return true } func (t *floatWindowSelectorTable) startTimes(arr *cursors.FloatArray) *array.Int { start := arrow.NewIntBuilder(t.alloc) start.Resize(arr.Len()) rangeStart := int64(t.bounds.Start) for _, v := range arr.Timestamps { 
if windowStart := int64(t.window.GetLatestBounds(values.Time(v)).Start()); windowStart < rangeStart { start.Append(rangeStart) } else { start.Append(windowStart) } } return start.NewIntArray() } func (t *floatWindowSelectorTable) stopTimes(arr *cursors.FloatArray) *array.Int { stop := arrow.NewIntBuilder(t.alloc) stop.Resize(arr.Len()) rangeStop := int64(t.bounds.Stop) for _, v := range arr.Timestamps { if windowStop := int64(t.window.GetLatestBounds(values.Time(v)).Stop()); windowStop > rangeStop { stop.Append(rangeStop) } else { stop.Append(windowStop) } } return stop.NewIntArray() } // This table implementation may contain empty windows // in addition to non-empty windows. type floatEmptyWindowSelectorTable struct { floatTable arr *cursors.FloatArray idx int rangeStart int64 rangeStop int64 windowBounds interval.Bounds timeColumn string window interval.Window } func newFloatEmptyWindowSelectorTable( done chan struct{}, cur cursors.FloatArrayCursor, bounds execute.Bounds, window interval.Window, timeColumn string, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *floatEmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) t := &floatEmptyWindowSelectorTable{ floatTable: floatTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, arr: cur.Next(), rangeStart: rangeStart, rangeStop: rangeStop, windowBounds: window.GetLatestBounds(values.Time(rangeStart)), window: window, timeColumn: timeColumn, } t.readTags(tags) t.init(t.advance) return t } func (t *floatEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *floatEmptyWindowSelectorTable) advance() bool { if t.arr.Len() == 0 { return false } values := t.arrowBuilder() values.Resize(storage.MaxPointsPerBlock) var cr *colReader switch t.timeColumn { case execute.DefaultStartColLabel: start := t.startTimes(values) cr = 
t.allocateBuffer(start.Len()) cr.cols[timeColIdx] = start t.appendBounds(cr) case execute.DefaultStopColLabel: stop := t.stopTimes(values) cr = t.allocateBuffer(stop.Len()) cr.cols[timeColIdx] = stop t.appendBounds(cr) default: start, stop, time := t.startStopTimes(values) cr = t.allocateBuffer(time.Len()) cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[timeColIdx] = time } cr.cols[valueColIdx] = values.NewFloatArray() t.appendTags(cr) return true } func (t *floatEmptyWindowSelectorTable) startTimes(builder *array.FloatBuilder) *array.Int { start := arrow.NewIntBuilder(t.alloc) start.Resize(storage.MaxPointsPerBlock) for int64(t.windowBounds.Start()) < t.rangeStop { // The first window should start at the // beginning of the time range. if int64(t.windowBounds.Start()) < t.rangeStart { start.Append(t.rangeStart) } else { start.Append(int64(t.windowBounds.Start())) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) { t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { builder.AppendNull() } t.windowBounds = t.window.NextBounds(t.windowBounds) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if start.Len() == storage.MaxPointsPerBlock { break } } return start.NewIntArray() } func (t *floatEmptyWindowSelectorTable) stopTimes(builder *array.FloatBuilder) *array.Int { stop := arrow.NewIntBuilder(t.alloc) stop.Resize(storage.MaxPointsPerBlock) for int64(t.windowBounds.Start()) < t.rangeStop { // The last window should stop at the end of // the time range. 
if int64(t.windowBounds.Stop()) > t.rangeStop { stop.Append(t.rangeStop) } else { stop.Append(int64(t.windowBounds.Stop())) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) { t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { builder.AppendNull() } t.windowBounds = t.window.NextBounds(t.windowBounds) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if stop.Len() == storage.MaxPointsPerBlock { break } } return stop.NewIntArray() } func (t *floatEmptyWindowSelectorTable) startStopTimes(builder *array.FloatBuilder) (*array.Int, *array.Int, *array.Int) { start := arrow.NewIntBuilder(t.alloc) start.Resize(storage.MaxPointsPerBlock) stop := arrow.NewIntBuilder(t.alloc) stop.Resize(storage.MaxPointsPerBlock) time := arrow.NewIntBuilder(t.alloc) time.Resize(storage.MaxPointsPerBlock) for int64(t.windowBounds.Start()) < t.rangeStop { // The first window should start at the // beginning of the time range. if int64(t.windowBounds.Start()) < t.rangeStart { start.Append(t.rangeStart) } else { start.Append(int64(t.windowBounds.Start())) } // The last window should stop at the end of // the time range. if int64(t.windowBounds.Stop()) > t.rangeStop { stop.Append(t.rangeStop) } else { stop.Append(int64(t.windowBounds.Stop())) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. 
if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) { time.Append(v) t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { time.AppendNull() builder.AppendNull() } t.windowBounds = t.window.NextBounds(t.windowBounds) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if time.Len() == storage.MaxPointsPerBlock { break } } return start.NewIntArray(), stop.NewIntArray(), time.NewIntArray() } // group table type floatGroupTable struct { table mu sync.Mutex gc storage.GroupCursor cur cursors.FloatArrayCursor } func newFloatGroupTable( done chan struct{}, gc storage.GroupCursor, cur cursors.FloatArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *floatGroupTable { t := &floatGroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), gc: gc, cur: cur, } t.readTags(tags) t.init(t.advance) return t } func (t *floatGroupTable) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } if t.gc != nil { t.gc.Close() t.gc = nil } t.mu.Unlock() } func (t *floatGroupTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *floatGroupTable) advance() bool { if t.cur == nil { // For group aggregates, we will try to get all the series and all table buffers within those series // all at once and merge them into one row when this advance() function is first called. // At the end of this process, t.advanceCursor() already returns false and t.cur becomes nil. // But we still need to return true to indicate that there is data to be returned. // The second time when we call this advance(), t.cur is already nil, so we directly return false. 
return false } var arr *cursors.FloatArray var len int for { arr = t.cur.Next() len = arr.Len() if len > 0 { break } if !t.advanceCursor() { return false } } // handle the group without aggregate case if t.gc.Aggregate() == nil { // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. colReader := t.allocateBuffer(len) colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(colReader) t.appendBounds(colReader) return true } aggregate, err := makeFloatAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } if !t.advanceCursor() { break } } timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer([]float64{value}) } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]float64{value}) } t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } type FloatAggregateAccumulator interface { // AccumulateFirst receives an initial array of items to select from. // It selects an item and stores the state. Afterwards, more data can // be supplied with AccumulateMore and the results can be requested at // any time. Without a call to AccumulateFirst the results are not // defined. AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) // AccumulateMore receives additional array elements to select from. AccumulateMore(timestamps []int64, values []float64, tags [][]byte) // Result returns the item selected from the data received so far. 
Result() (int64, float64, [][]byte) } // The selector method takes a ( timestamp, value ) pair, a // ( []timestamp, []value ) pair, and a starting index. It applies the selector // to the single value and the array, starting at the supplied index. It // returns -1 if the single value is selected and a non-negative value if an // item from the array is selected. type floatSelectorMethod func(int64, float64, []int64, []float64, int) int // The selector accumulator tracks currently-selected item. type floatSelectorAccumulator struct { selector floatSelectorMethod ts int64 v float64 tags [][]byte } func (a *floatSelectorAccumulator) AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) { index := a.selector(timestamps[0], values[0], timestamps, values, 1) if index < 0 { a.ts = timestamps[0] a.v = values[0] } else { a.ts = timestamps[index] a.v = values[index] } a.tags = make([][]byte, len(tags)) copy(a.tags, tags) } func (a *floatSelectorAccumulator) AccumulateMore(timestamps []int64, values []float64, tags [][]byte) { index := a.selector(a.ts, a.v, timestamps, values, 0) if index >= 0 { a.ts = timestamps[index] a.v = values[index] if len(tags) > cap(a.tags) { a.tags = make([][]byte, len(tags)) } else { a.tags = a.tags[:len(tags)] } copy(a.tags, tags) } } func (a *floatSelectorAccumulator) Result() (int64, float64, [][]byte) { return a.ts, a.v, a.tags } // The aggregate method takes a value, an array of values, and a starting // index, applies an aggregate operation over the value and the array, starting // at the given index, and returns the result. type floatAggregateMethod func(float64, []float64, int) float64 type floatAggregateAccumulator struct { aggregate floatAggregateMethod accum float64 // For pure aggregates it doesn't matter what we return for tags, but // we need to satisfy the interface. We will just return the most // recently seen tags. 
tags [][]byte } func (a *floatAggregateAccumulator) AccumulateFirst(timestamps []int64, values []float64, tags [][]byte) { a.accum = a.aggregate(values[0], values, 1) a.tags = tags } func (a *floatAggregateAccumulator) AccumulateMore(timestamps []int64, values []float64, tags [][]byte) { a.accum = a.aggregate(a.accum, values, 0) a.tags = tags } // For group aggregates (non-selectors), the timestamp is always math.MaxInt64. // their final result does not contain _time, so this timestamp value can be // anything and it won't matter. func (a *floatAggregateAccumulator) Result() (int64, float64, [][]byte) { return math.MaxInt64, a.accum, a.tags } // makeFloatAggregateAccumulator returns the interface implementation for // aggregating returned points within the same group. The incoming points are // the ones returned for each series and the struct returned here will // aggregate the aggregates. func makeFloatAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (FloatAggregateAccumulator, error) { switch agg { case datatypes.Aggregate_AggregateTypeFirst: return &floatSelectorAccumulator{selector: selectorFirstGroupsFloat}, nil case datatypes.Aggregate_AggregateTypeLast: return &floatSelectorAccumulator{selector: selectorLastGroupsFloat}, nil case datatypes.Aggregate_AggregateTypeCount: return nil, &errors.Error{ Code: errors.EInvalid, Msg: "unsupported for aggregate count: Float", } case datatypes.Aggregate_AggregateTypeSum: return &floatAggregateAccumulator{aggregate: aggregateSumGroupsFloat}, nil case datatypes.Aggregate_AggregateTypeMin: return &floatSelectorAccumulator{selector: selectorMinGroupsFloat}, nil case datatypes.Aggregate_AggregateTypeMax: return &floatSelectorAccumulator{selector: selectorMaxGroupsFloat}, nil default: return nil, &errors.Error{ Code: errors.EInvalid, Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), } } } func selectorMinGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { index := -1 
for ; i < len(values); i++ { if v > values[i] { index = i v = values[i] } } return index } func selectorMaxGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { index := -1 for ; i < len(values); i++ { if v < values[i] { index = i v = values[i] } } return index } func aggregateSumGroupsFloat(sum float64, values []float64, i int) float64 { for ; i < len(values); i++ { sum += values[i] } return sum } func selectorFirstGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { index := -1 for ; i < len(values); i++ { if ts > timestamps[i] { index = i ts = timestamps[i] } } return index } func selectorLastGroupsFloat(ts int64, v float64, timestamps []int64, values []float64, i int) int { index := -1 for ; i < len(values); i++ { if ts <= timestamps[i] { index = i ts = timestamps[i] } } return index } func (t *floatGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { cur := t.gc.Cursor() if cur == nil { continue } if typedCur, ok := cur.(cursors.FloatArrayCursor); !ok { // TODO(sgc): error or skip? 
cur.Close() t.err = &errors.Error{ Code: errors.EInvalid, Err: &GroupCursorError{ typ: "float", cursor: cur, }, } return false } else { t.readTags(t.gc.Tags()) t.cur = typedCur return true } } return false } func (t *floatGroupTable) Statistics() cursors.CursorStats { if t.cur == nil { return cursors.CursorStats{} } cs := t.cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } } // // *********** Integer *********** // type integerTable struct { table mu sync.Mutex cur cursors.IntegerArrayCursor alloc *memory.Allocator } func newIntegerTable( done chan struct{}, cur cursors.IntegerArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *integerTable { t := &integerTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, } t.readTags(tags) t.init(t.advance) return t } func (t *integerTable) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } t.mu.Unlock() } func (t *integerTable) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { return cursors.CursorStats{} } cs := cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } } func (t *integerTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *integerTable) advance() bool { a := t.cur.Next() l := a.Len() if l == 0 { return false } // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
cr := t.allocateBuffer(l) cr.cols[timeColIdx] = arrow.NewInt(a.Timestamps, t.alloc) cr.cols[valueColIdx] = t.toArrowBuffer(a.Values) t.appendTags(cr) t.appendBounds(cr) return true } // window table type integerWindowTable struct { integerTable arr *cursors.IntegerArray windowBounds interval.Bounds idxInArr int createEmpty bool timeColumn string isAggregate bool window interval.Window fillValue *int64 } func newIntegerWindowTable( done chan struct{}, cur cursors.IntegerArrayCursor, bounds execute.Bounds, window interval.Window, createEmpty bool, timeColumn string, isAggregate bool, fillValue *int64, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *integerWindowTable { t := &integerWindowTable{ integerTable: integerTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, window: window, createEmpty: createEmpty, timeColumn: timeColumn, isAggregate: isAggregate, fillValue: fillValue, } if t.createEmpty { start := int64(bounds.Start) t.windowBounds = window.GetLatestBounds(values.Time(start)) } t.readTags(tags) t.init(t.advance) return t } func (t *integerWindowTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } // createNextBufferTimes will read the timestamps from the array // cursor and construct the values for the next buffer. func (t *integerWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) { startB := arrow.NewIntBuilder(t.alloc) stopB := arrow.NewIntBuilder(t.alloc) if t.createEmpty { // There are no more windows when the start time is greater // than or equal to the stop time. if startT := int64(t.windowBounds.Start()); startT >= int64(t.bounds.Stop) { return nil, nil, false } // Create a buffer with the buffer size. // TODO(jsternberg): Calculate the exact size with max points as the maximum. 
startB.Resize(storage.MaxPointsPerBlock) stopB.Resize(storage.MaxPointsPerBlock) for ; ; t.windowBounds = t.window.NextBounds(t.windowBounds) { startT, stopT := t.getWindowBoundsFor(t.windowBounds) if startT >= int64(t.bounds.Stop) { break } startB.Append(startT) stopB.Append(stopT) } start = startB.NewIntArray() stop = stopB.NewIntArray() return start, stop, true } // Retrieve the next buffer so we can copy the timestamps. if !t.nextBuffer() { return nil, nil, false } // Copy over the timestamps from the next buffer and adjust // times for the boundaries. startB.Resize(len(t.arr.Timestamps)) stopB.Resize(len(t.arr.Timestamps)) for _, stopT := range t.arr.Timestamps { bounds := t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stopT))) startT, stopT := t.getWindowBoundsFor(bounds) startB.Append(startT) stopB.Append(stopT) } start = startB.NewIntArray() stop = stopB.NewIntArray() return start, stop, true } func (t *integerWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) { beg := int64(bounds.Start()) end := int64(bounds.Stop()) if beg < int64(t.bounds.Start) { beg = int64(t.bounds.Start) } if end > int64(t.bounds.Stop) { end = int64(t.bounds.Stop) } return beg, end } // nextAt will retrieve the next value that can be used with // the given stop timestamp. If no values can be used with the timestamp, // it will return the default value and false. func (t *integerWindowTable) nextAt(stop int64) (v int64, ok bool) { if !t.nextBuffer() { return } else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) { return } v, ok = t.arr.Values[t.idxInArr], true t.idxInArr++ return v, ok } // isInWindow will check if the given time may be used within the window // denoted by the stop timestamp. The stop may be a truncated stop time // because of a restricted boundary. // // When used with an aggregate, ts will be the true stop time returned // by storage. When used with an aggregate, it will be the real time // for the point. 
func (t *integerWindowTable) isInWindow(stop int64, ts int64) bool { // Retrieve the boundary associated with this stop time. // This will be the boundary for the previous nanosecond. bounds := t.window.GetLatestBounds(values.Time(stop - 1)) start, stop := int64(bounds.Start()), int64(bounds.Stop()) // For an aggregate, the timestamp will be the stop time of the boundary. if t.isAggregate { return start < ts && ts <= stop } // For a selector, the timestamp should be within the boundary. return start <= ts && ts < stop } // nextBuffer will ensure the array cursor is filled // and will return true if there is at least one value // that can be read from it. func (t *integerWindowTable) nextBuffer() bool { // Discard the current array cursor if we have // exceeded it. if t.arr != nil && t.idxInArr >= t.arr.Len() { t.arr = nil } // Retrieve the next array cursor if needed. if t.arr == nil { arr := t.cur.Next() if arr.Len() == 0 { return false } t.arr, t.idxInArr = arr, 0 } return true } // appendValues will scan the timestamps and append values // that match those timestamps from the buffer. func (t *integerWindowTable) appendValues(intervals []int64, appendValue func(v int64), appendNull func()) { for i := 0; i < len(intervals); i++ { if v, ok := t.nextAt(intervals[i]); ok { appendValue(v) continue } appendNull() } } func (t *integerWindowTable) advance() bool { if !t.nextBuffer() { return false } // Create the timestamps for the next window. start, stop, ok := t.createNextBufferTimes() if !ok { return false } values := t.mergeValues(stop.Int64Values()) // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
cr := t.allocateBuffer(stop.Len()) if t.timeColumn != "" { switch t.timeColumn { case execute.DefaultStopColLabel: cr.cols[timeColIdx] = stop start.Release() case execute.DefaultStartColLabel: cr.cols[timeColIdx] = start stop.Release() } cr.cols[valueColIdx] = values t.appendBounds(cr) } else { cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[valueColIdxWithoutTime] = values } t.appendTags(cr) return true } // This table implementation will not have any empty windows. type integerWindowSelectorTable struct { integerTable timeColumn string window interval.Window } func newIntegerWindowSelectorTable( done chan struct{}, cur cursors.IntegerArrayCursor, bounds execute.Bounds, window interval.Window, timeColumn string, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *integerWindowSelectorTable { t := &integerWindowSelectorTable{ integerTable: integerTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, window: window, timeColumn: timeColumn, } t.readTags(tags) t.init(t.advance) return t } func (t *integerWindowSelectorTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *integerWindowSelectorTable) advance() bool { arr := t.cur.Next() if arr.Len() == 0 { return false } cr := t.allocateBuffer(arr.Len()) switch t.timeColumn { case execute.DefaultStartColLabel: cr.cols[timeColIdx] = t.startTimes(arr) t.appendBounds(cr) case execute.DefaultStopColLabel: cr.cols[timeColIdx] = t.stopTimes(arr) t.appendBounds(cr) default: cr.cols[startColIdx] = t.startTimes(arr) cr.cols[stopColIdx] = t.stopTimes(arr) cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) } cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(cr) return true } func (t *integerWindowSelectorTable) startTimes(arr *cursors.IntegerArray) *array.Int { start := arrow.NewIntBuilder(t.alloc) start.Resize(arr.Len()) rangeStart := int64(t.bounds.Start) for _, v := 
range arr.Timestamps { if windowStart := int64(t.window.GetLatestBounds(values.Time(v)).Start()); windowStart < rangeStart { start.Append(rangeStart) } else { start.Append(windowStart) } } return start.NewIntArray() } func (t *integerWindowSelectorTable) stopTimes(arr *cursors.IntegerArray) *array.Int { stop := arrow.NewIntBuilder(t.alloc) stop.Resize(arr.Len()) rangeStop := int64(t.bounds.Stop) for _, v := range arr.Timestamps { if windowStop := int64(t.window.GetLatestBounds(values.Time(v)).Stop()); windowStop > rangeStop { stop.Append(rangeStop) } else { stop.Append(windowStop) } } return stop.NewIntArray() } // This table implementation may contain empty windows // in addition to non-empty windows. type integerEmptyWindowSelectorTable struct { integerTable arr *cursors.IntegerArray idx int rangeStart int64 rangeStop int64 windowBounds interval.Bounds timeColumn string window interval.Window } func newIntegerEmptyWindowSelectorTable( done chan struct{}, cur cursors.IntegerArrayCursor, bounds execute.Bounds, window interval.Window, timeColumn string, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *integerEmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) t := &integerEmptyWindowSelectorTable{ integerTable: integerTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, arr: cur.Next(), rangeStart: rangeStart, rangeStop: rangeStop, windowBounds: window.GetLatestBounds(values.Time(rangeStart)), window: window, timeColumn: timeColumn, } t.readTags(tags) t.init(t.advance) return t } func (t *integerEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *integerEmptyWindowSelectorTable) advance() bool { if t.arr.Len() == 0 { return false } values := t.arrowBuilder() values.Resize(storage.MaxPointsPerBlock) var cr *colReader switch t.timeColumn { case execute.DefaultStartColLabel: start 
:= t.startTimes(values) cr = t.allocateBuffer(start.Len()) cr.cols[timeColIdx] = start t.appendBounds(cr) case execute.DefaultStopColLabel: stop := t.stopTimes(values) cr = t.allocateBuffer(stop.Len()) cr.cols[timeColIdx] = stop t.appendBounds(cr) default: start, stop, time := t.startStopTimes(values) cr = t.allocateBuffer(time.Len()) cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[timeColIdx] = time } cr.cols[valueColIdx] = values.NewIntArray() t.appendTags(cr) return true } func (t *integerEmptyWindowSelectorTable) startTimes(builder *array.IntBuilder) *array.Int { start := arrow.NewIntBuilder(t.alloc) start.Resize(storage.MaxPointsPerBlock) for int64(t.windowBounds.Start()) < t.rangeStop { // The first window should start at the // beginning of the time range. if int64(t.windowBounds.Start()) < t.rangeStart { start.Append(t.rangeStart) } else { start.Append(int64(t.windowBounds.Start())) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) { t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { builder.AppendNull() } t.windowBounds = t.window.NextBounds(t.windowBounds) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if start.Len() == storage.MaxPointsPerBlock { break } } return start.NewIntArray() } func (t *integerEmptyWindowSelectorTable) stopTimes(builder *array.IntBuilder) *array.Int { stop := arrow.NewIntBuilder(t.alloc) stop.Resize(storage.MaxPointsPerBlock) for int64(t.windowBounds.Start()) < t.rangeStop { // The last window should stop at the end of // the time range. 
if int64(t.windowBounds.Stop()) > t.rangeStop { stop.Append(t.rangeStop) } else { stop.Append(int64(t.windowBounds.Stop())) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) { t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { builder.AppendNull() } t.windowBounds = t.window.NextBounds(t.windowBounds) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if stop.Len() == storage.MaxPointsPerBlock { break } } return stop.NewIntArray() } func (t *integerEmptyWindowSelectorTable) startStopTimes(builder *array.IntBuilder) (*array.Int, *array.Int, *array.Int) { start := arrow.NewIntBuilder(t.alloc) start.Resize(storage.MaxPointsPerBlock) stop := arrow.NewIntBuilder(t.alloc) stop.Resize(storage.MaxPointsPerBlock) time := arrow.NewIntBuilder(t.alloc) time.Resize(storage.MaxPointsPerBlock) for int64(t.windowBounds.Start()) < t.rangeStop { // The first window should start at the // beginning of the time range. if int64(t.windowBounds.Start()) < t.rangeStart { start.Append(t.rangeStart) } else { start.Append(int64(t.windowBounds.Start())) } // The last window should stop at the end of // the time range. if int64(t.windowBounds.Stop()) > t.rangeStop { stop.Append(t.rangeStop) } else { stop.Append(int64(t.windowBounds.Stop())) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. 
if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) { time.Append(v) t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { time.AppendNull() builder.AppendNull() } t.windowBounds = t.window.NextBounds(t.windowBounds) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if time.Len() == storage.MaxPointsPerBlock { break } } return start.NewIntArray(), stop.NewIntArray(), time.NewIntArray() } // group table type integerGroupTable struct { table mu sync.Mutex gc storage.GroupCursor cur cursors.IntegerArrayCursor } func newIntegerGroupTable( done chan struct{}, gc storage.GroupCursor, cur cursors.IntegerArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *integerGroupTable { t := &integerGroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), gc: gc, cur: cur, } t.readTags(tags) t.init(t.advance) return t } func (t *integerGroupTable) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } if t.gc != nil { t.gc.Close() t.gc = nil } t.mu.Unlock() } func (t *integerGroupTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *integerGroupTable) advance() bool { if t.cur == nil { // For group aggregates, we will try to get all the series and all table buffers within those series // all at once and merge them into one row when this advance() function is first called. // At the end of this process, t.advanceCursor() already returns false and t.cur becomes nil. // But we still need to return true to indicate that there is data to be returned. // The second time when we call this advance(), t.cur is already nil, so we directly return false. 
return false } var arr *cursors.IntegerArray var len int for { arr = t.cur.Next() len = arr.Len() if len > 0 { break } if !t.advanceCursor() { return false } } // handle the group without aggregate case if t.gc.Aggregate() == nil { // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. colReader := t.allocateBuffer(len) colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(colReader) t.appendBounds(colReader) return true } aggregate, err := makeIntegerAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } if !t.advanceCursor() { break } } timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer([]int64{value}) } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]int64{value}) } t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } type IntegerAggregateAccumulator interface { // AccumulateFirst receives an initial array of items to select from. // It selects an item and stores the state. Afterwards, more data can // be supplied with AccumulateMore and the results can be requested at // any time. Without a call to AccumulateFirst the results are not // defined. AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) // AccumulateMore receives additional array elements to select from. AccumulateMore(timestamps []int64, values []int64, tags [][]byte) // Result returns the item selected from the data received so far. 
Result() (int64, int64, [][]byte) } // The selector method takes a ( timestamp, value ) pair, a // ( []timestamp, []value ) pair, and a starting index. It applies the selector // to the single value and the array, starting at the supplied index. It // returns -1 if the single value is selected and a non-negative value if an // item from the array is selected. type integerSelectorMethod func(int64, int64, []int64, []int64, int) int // The selector accumulator tracks currently-selected item. type integerSelectorAccumulator struct { selector integerSelectorMethod ts int64 v int64 tags [][]byte } func (a *integerSelectorAccumulator) AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) { index := a.selector(timestamps[0], values[0], timestamps, values, 1) if index < 0 { a.ts = timestamps[0] a.v = values[0] } else { a.ts = timestamps[index] a.v = values[index] } a.tags = make([][]byte, len(tags)) copy(a.tags, tags) } func (a *integerSelectorAccumulator) AccumulateMore(timestamps []int64, values []int64, tags [][]byte) { index := a.selector(a.ts, a.v, timestamps, values, 0) if index >= 0 { a.ts = timestamps[index] a.v = values[index] if len(tags) > cap(a.tags) { a.tags = make([][]byte, len(tags)) } else { a.tags = a.tags[:len(tags)] } copy(a.tags, tags) } } func (a *integerSelectorAccumulator) Result() (int64, int64, [][]byte) { return a.ts, a.v, a.tags } // The aggregate method takes a value, an array of values, and a starting // index, applies an aggregate operation over the value and the array, starting // at the given index, and returns the result. type integerAggregateMethod func(int64, []int64, int) int64 type integerAggregateAccumulator struct { aggregate integerAggregateMethod accum int64 // For pure aggregates it doesn't matter what we return for tags, but // we need to satisfy the interface. We will just return the most // recently seen tags. 
tags [][]byte } func (a *integerAggregateAccumulator) AccumulateFirst(timestamps []int64, values []int64, tags [][]byte) { a.accum = a.aggregate(values[0], values, 1) a.tags = tags } func (a *integerAggregateAccumulator) AccumulateMore(timestamps []int64, values []int64, tags [][]byte) { a.accum = a.aggregate(a.accum, values, 0) a.tags = tags } // For group aggregates (non-selectors), the timestamp is always math.MaxInt64. // their final result does not contain _time, so this timestamp value can be // anything and it won't matter. func (a *integerAggregateAccumulator) Result() (int64, int64, [][]byte) { return math.MaxInt64, a.accum, a.tags } // makeIntegerAggregateAccumulator returns the interface implementation for // aggregating returned points within the same group. The incoming points are // the ones returned for each series and the struct returned here will // aggregate the aggregates. func makeIntegerAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (IntegerAggregateAccumulator, error) { switch agg { case datatypes.Aggregate_AggregateTypeFirst: return &integerSelectorAccumulator{selector: selectorFirstGroupsInteger}, nil case datatypes.Aggregate_AggregateTypeLast: return &integerSelectorAccumulator{selector: selectorLastGroupsInteger}, nil case datatypes.Aggregate_AggregateTypeCount: return &integerAggregateAccumulator{aggregate: aggregateCountGroupsInteger}, nil case datatypes.Aggregate_AggregateTypeSum: return &integerAggregateAccumulator{aggregate: aggregateSumGroupsInteger}, nil case datatypes.Aggregate_AggregateTypeMin: return &integerSelectorAccumulator{selector: selectorMinGroupsInteger}, nil case datatypes.Aggregate_AggregateTypeMax: return &integerSelectorAccumulator{selector: selectorMaxGroupsInteger}, nil default: return nil, &errors.Error{ Code: errors.EInvalid, Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), } } } func selectorMinGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { index 
:= -1 for ; i < len(values); i++ { if v > values[i] { index = i v = values[i] } } return index } func selectorMaxGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { index := -1 for ; i < len(values); i++ { if v < values[i] { index = i v = values[i] } } return index } func aggregateCountGroupsInteger(accum int64, values []int64, i int) int64 { return aggregateSumGroupsInteger(accum, values, i) } func aggregateSumGroupsInteger(sum int64, values []int64, i int) int64 { for ; i < len(values); i++ { sum += values[i] } return sum } func selectorFirstGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { index := -1 for ; i < len(values); i++ { if ts > timestamps[i] { index = i ts = timestamps[i] } } return index } func selectorLastGroupsInteger(ts int64, v int64, timestamps []int64, values []int64, i int) int { index := -1 for ; i < len(values); i++ { if ts <= timestamps[i] { index = i ts = timestamps[i] } } return index } func (t *integerGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { cur := t.gc.Cursor() if cur == nil { continue } if typedCur, ok := cur.(cursors.IntegerArrayCursor); !ok { // TODO(sgc): error or skip? 
cur.Close() t.err = &errors.Error{ Code: errors.EInvalid, Err: &GroupCursorError{ typ: "integer", cursor: cur, }, } return false } else { t.readTags(t.gc.Tags()) t.cur = typedCur return true } } return false } func (t *integerGroupTable) Statistics() cursors.CursorStats { if t.cur == nil { return cursors.CursorStats{} } cs := t.cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } } // // *********** Unsigned *********** // type unsignedTable struct { table mu sync.Mutex cur cursors.UnsignedArrayCursor alloc *memory.Allocator } func newUnsignedTable( done chan struct{}, cur cursors.UnsignedArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *unsignedTable { t := &unsignedTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, } t.readTags(tags) t.init(t.advance) return t } func (t *unsignedTable) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } t.mu.Unlock() } func (t *unsignedTable) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { return cursors.CursorStats{} } cs := cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } } func (t *unsignedTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *unsignedTable) advance() bool { a := t.cur.Next() l := a.Len() if l == 0 { return false } // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
cr := t.allocateBuffer(l) cr.cols[timeColIdx] = arrow.NewInt(a.Timestamps, t.alloc) cr.cols[valueColIdx] = t.toArrowBuffer(a.Values) t.appendTags(cr) t.appendBounds(cr) return true } // window table type unsignedWindowTable struct { unsignedTable arr *cursors.UnsignedArray windowBounds interval.Bounds idxInArr int createEmpty bool timeColumn string isAggregate bool window interval.Window } func newUnsignedWindowTable( done chan struct{}, cur cursors.UnsignedArrayCursor, bounds execute.Bounds, window interval.Window, createEmpty bool, timeColumn string, isAggregate bool, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *unsignedWindowTable { t := &unsignedWindowTable{ unsignedTable: unsignedTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, window: window, createEmpty: createEmpty, timeColumn: timeColumn, isAggregate: isAggregate, } if t.createEmpty { start := int64(bounds.Start) t.windowBounds = window.GetLatestBounds(values.Time(start)) } t.readTags(tags) t.init(t.advance) return t } func (t *unsignedWindowTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } // createNextBufferTimes will read the timestamps from the array // cursor and construct the values for the next buffer. func (t *unsignedWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) { startB := arrow.NewIntBuilder(t.alloc) stopB := arrow.NewIntBuilder(t.alloc) if t.createEmpty { // There are no more windows when the start time is greater // than or equal to the stop time. if startT := int64(t.windowBounds.Start()); startT >= int64(t.bounds.Stop) { return nil, nil, false } // Create a buffer with the buffer size. // TODO(jsternberg): Calculate the exact size with max points as the maximum. 
startB.Resize(storage.MaxPointsPerBlock) stopB.Resize(storage.MaxPointsPerBlock) for ; ; t.windowBounds = t.window.NextBounds(t.windowBounds) { startT, stopT := t.getWindowBoundsFor(t.windowBounds) if startT >= int64(t.bounds.Stop) { break } startB.Append(startT) stopB.Append(stopT) } start = startB.NewIntArray() stop = stopB.NewIntArray() return start, stop, true } // Retrieve the next buffer so we can copy the timestamps. if !t.nextBuffer() { return nil, nil, false } // Copy over the timestamps from the next buffer and adjust // times for the boundaries. startB.Resize(len(t.arr.Timestamps)) stopB.Resize(len(t.arr.Timestamps)) for _, stopT := range t.arr.Timestamps { bounds := t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stopT))) startT, stopT := t.getWindowBoundsFor(bounds) startB.Append(startT) stopB.Append(stopT) } start = startB.NewIntArray() stop = stopB.NewIntArray() return start, stop, true } func (t *unsignedWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) { beg := int64(bounds.Start()) end := int64(bounds.Stop()) if beg < int64(t.bounds.Start) { beg = int64(t.bounds.Start) } if end > int64(t.bounds.Stop) { end = int64(t.bounds.Stop) } return beg, end } // nextAt will retrieve the next value that can be used with // the given stop timestamp. If no values can be used with the timestamp, // it will return the default value and false. func (t *unsignedWindowTable) nextAt(stop int64) (v uint64, ok bool) { if !t.nextBuffer() { return } else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) { return } v, ok = t.arr.Values[t.idxInArr], true t.idxInArr++ return v, ok } // isInWindow will check if the given time may be used within the window // denoted by the stop timestamp. The stop may be a truncated stop time // because of a restricted boundary. // // When used with an aggregate, ts will be the true stop time returned // by storage. When used with an aggregate, it will be the real time // for the point. 
func (t *unsignedWindowTable) isInWindow(stop int64, ts int64) bool {
	// Retrieve the boundary associated with this stop time.
	// This will be the boundary for the previous nanosecond.
	// The stop is exclusive (and may be truncated), so the window ending at
	// stop is the one that contains stop-1.
	bounds := t.window.GetLatestBounds(values.Time(stop - 1))
	// stop is re-assigned here to the untruncated window stop.
	start, stop := int64(bounds.Start()), int64(bounds.Stop())

	// For an aggregate, the timestamp will be the stop time of the boundary.
	if t.isAggregate {
		return start < ts && ts <= stop
	}

	// For a selector, the timestamp should be within the boundary.
	return start <= ts && ts < stop
}

// nextBuffer will ensure the array cursor is filled
// and will return true if there is at least one value
// that can be read from it.
func (t *unsignedWindowTable) nextBuffer() bool {
	// Discard the current array cursor if we have
	// exceeded it.
	if t.arr != nil && t.idxInArr >= t.arr.Len() {
		t.arr = nil
	}

	// Retrieve the next array cursor if needed.
	if t.arr == nil {
		arr := t.cur.Next()
		if arr.Len() == 0 {
			return false
		}
		t.arr, t.idxInArr = arr, 0
	}
	return true
}

// appendValues will scan the timestamps and append values
// that match those timestamps from the buffer. For each interval stop it
// calls appendValue with the value matched by nextAt, or appendNull when no
// value matches.
func (t *unsignedWindowTable) appendValues(intervals []int64, appendValue func(v uint64), appendNull func()) {
	for i := 0; i < len(intervals); i++ {
		if v, ok := t.nextAt(intervals[i]); ok {
			appendValue(v)
			continue
		}
		appendNull()
	}
}

// advance produces the next buffer of windowed rows. It returns false when
// the cursor has no more data or createNextBufferTimes reports that no
// windows remain. The row values come from mergeValues, which is defined
// outside this chunk.
func (t *unsignedWindowTable) advance() bool {
	if !t.nextBuffer() {
		return false
	}
	// Create the timestamps for the next window.
	start, stop, ok := t.createNextBufferTimes()
	if !ok {
		return false
	}
	values := t.mergeValues(stop.Int64Values())

	// Retrieve the buffer for the data to avoid allocating
	// additional slices. If the buffer is still being used
	// because the references were retained, then we will
	// allocate a new buffer.
	cr := t.allocateBuffer(stop.Len())
	if t.timeColumn != "" {
		// Only the bound named by timeColumn becomes the time column. The
		// other bound array is released because nothing references it.
		// NOTE(review): any other non-empty timeColumn leaves the time column
		// unset and releases neither array.
		switch t.timeColumn {
		case execute.DefaultStopColLabel:
			cr.cols[timeColIdx] = stop
			start.Release()
		case execute.DefaultStartColLabel:
			cr.cols[timeColIdx] = start
			stop.Release()
		}
		cr.cols[valueColIdx] = values
		t.appendBounds(cr)
	} else {
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[valueColIdxWithoutTime] = values
	}
	t.appendTags(cr)
	return true
}

// This table implementation will not have any empty windows.
//
// unsignedWindowSelectorTable emits one row per point returned by the cursor,
// annotated with the (range-clamped) bounds of the window containing it.
type unsignedWindowSelectorTable struct {
	unsignedTable
	timeColumn string
	window     interval.Window
}

// newUnsignedWindowSelectorTable builds an unsignedWindowSelectorTable over
// cur and runs t.init(t.advance) before returning it.
func newUnsignedWindowSelectorTable(
	done chan struct{},
	cur cursors.UnsignedArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *unsignedWindowSelectorTable {
	t := &unsignedWindowSelectorTable{
		unsignedTable: unsignedTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
		},
		window:     window,
		timeColumn: timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}

// Do delegates to the shared t.do loop, using advance to produce buffers.
func (t *unsignedWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

// advance converts the next cursor array into a buffer. When timeColumn is
// the start or stop label, that bound becomes the time column. Otherwise
// start, stop and the original point times are all emitted. It returns
// false once the cursor yields an empty array.
func (t *unsignedWindowSelectorTable) advance() bool {
	arr := t.cur.Next()
	if arr.Len() == 0 {
		return false
	}

	cr := t.allocateBuffer(arr.Len())

	switch t.timeColumn {
	case execute.DefaultStartColLabel:
		cr.cols[timeColIdx] = t.startTimes(arr)
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		cr.cols[timeColIdx] = t.stopTimes(arr)
		t.appendBounds(cr)
	default:
		cr.cols[startColIdx] = t.startTimes(arr)
		cr.cols[stopColIdx] = t.stopTimes(arr)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	}

	cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(cr)
	return true
}

// startTimes returns, for each timestamp in arr, the start of the window
// returned by window.GetLatestBounds for it, raised to the range start when
// it falls before it.
func (t *unsignedWindowSelectorTable) startTimes(arr *cursors.UnsignedArray) *array.Int {
	start := arrow.NewIntBuilder(t.alloc)
	start.Resize(arr.Len())

	rangeStart := int64(t.bounds.Start)

	for _, v := range arr.Timestamps {
		if windowStart := int64(t.window.GetLatestBounds(values.Time(v)).Start()); windowStart < rangeStart {
			start.Append(rangeStart)
		} else {
			start.Append(windowStart)
		}
	}
	return start.NewIntArray()
}

// stopTimes returns, for each timestamp in arr, the stop of the window
// returned by window.GetLatestBounds for it, lowered to the range stop when
// it extends past it.
func (t *unsignedWindowSelectorTable) stopTimes(arr *cursors.UnsignedArray) *array.Int {
	stop := arrow.NewIntBuilder(t.alloc)
	stop.Resize(arr.Len())

	rangeStop := int64(t.bounds.Stop)

	for _, v := range arr.Timestamps {
		if windowStop := int64(t.window.GetLatestBounds(values.Time(v)).Stop()); windowStop > rangeStop {
			stop.Append(rangeStop)
		} else {
			stop.Append(windowStop)
		}
	}
	return stop.NewIntArray()
}

// This table implementation may contain empty windows
// in addition to non-empty windows.
//
// Windows are walked one at a time from windowBounds. Each window consumes
// at most one point from the cursor, so the cursor presumably yields at most
// one in-range point per window (a selector result) — confirm.
type unsignedEmptyWindowSelectorTable struct {
	unsignedTable
	// arr is the current cursor array and idx the next unread position in it.
	arr *cursors.UnsignedArray
	idx int
	// rangeStart and rangeStop are the table bounds as int64 nanoseconds.
	rangeStart int64
	rangeStop  int64
	// windowBounds is the next window to emit.
	windowBounds interval.Bounds
	timeColumn   string
	window       interval.Window
}

// newUnsignedEmptyWindowSelectorTable builds an
// unsignedEmptyWindowSelectorTable. It eagerly reads the first cursor array
// and starts the window walk at window.GetLatestBounds(bounds.Start).
func newUnsignedEmptyWindowSelectorTable(
	done chan struct{},
	cur cursors.UnsignedArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *unsignedEmptyWindowSelectorTable {
	rangeStart := int64(bounds.Start)
	rangeStop := int64(bounds.Stop)
	t := &unsignedEmptyWindowSelectorTable{
		unsignedTable: unsignedTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
		},
		arr:          cur.Next(),
		rangeStart:   rangeStart,
		rangeStop:    rangeStop,
		windowBounds: window.GetLatestBounds(values.Time(rangeStart)),
		window:       window,
		timeColumn:   timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}

// Do delegates to the shared t.do loop, using advance to produce buffers.
func (t *unsignedEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

// advance emits up to storage.MaxPointsPerBlock windows, with a null value
// for windows that contain no point. It returns false once the cursor is
// exhausted.
//
// NOTE(review): two behaviours to confirm.
//   - Because advance stops as soon as the cursor is exhausted, windows that
//     did not fit in the buffer which consumed the last point appear to be
//     dropped.
//   - If t.arr ever holds a point no window can consume (e.g. a second point
//     in an already-emitted window), advance keeps returning true with empty
//     buffers after the windows run out.
func (t *unsignedEmptyWindowSelectorTable) advance() bool {
	if t.arr.Len() == 0 {
		return false
	}

	values := t.arrowBuilder()
	values.Resize(storage.MaxPointsPerBlock)

	var cr *colReader

	switch t.timeColumn {
	case execute.DefaultStartColLabel:
		start := t.startTimes(values)
		cr = t.allocateBuffer(start.Len())
		cr.cols[timeColIdx] = start
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		stop := t.stopTimes(values)
		cr = t.allocateBuffer(stop.Len())
		cr.cols[timeColIdx] = stop
		t.appendBounds(cr)
	default:
		start, stop, time := t.startStopTimes(values)
		cr = t.allocateBuffer(time.Len())
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[timeColIdx] = time
	}

	cr.cols[valueColIdx] = values.NewUintArray()
	t.appendTags(cr)
	return true
}

// startTimes walks windows from t.windowBounds until the range is exhausted
// or storage.MaxPointsPerBlock windows were produced. It returns each window's
// start, clamped to rangeStart. For every window it also appends to builder
// either the matching point's value or a null.
func (t *unsignedEmptyWindowSelectorTable) startTimes(builder *array.UintBuilder) *array.Int {
	start := arrow.NewIntBuilder(t.alloc)
	start.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {
		// The first window should start at the
		// beginning of the time range.
		if int64(t.windowBounds.Start()) < t.rangeStart {
			start.Append(t.rangeStart)
		} else {
			start.Append(int64(t.windowBounds.Start()))
		}

		// With no data left, MaxInt64 guarantees no window matches.
		var v int64

		if t.arr.Len() == 0 {
			v = math.MaxInt64
		} else {
			v = t.arr.Timestamps[t.idx]
		}

		// If the current timestamp falls within the
		// current window, append the value to the
		// builder, otherwise append a null value.
		if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// If the current array is non-empty and has
		// been read in its entirety, call Next().
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if start.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return start.NewIntArray()
}

// stopTimes is startTimes' counterpart. It returns each window's stop,
// clamped to rangeStop, while filling builder the same way.
func (t *unsignedEmptyWindowSelectorTable) stopTimes(builder *array.UintBuilder) *array.Int {
	stop := arrow.NewIntBuilder(t.alloc)
	stop.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {
		// The last window should stop at the end of
		// the time range.
		if int64(t.windowBounds.Stop()) > t.rangeStop {
			stop.Append(t.rangeStop)
		} else {
			stop.Append(int64(t.windowBounds.Stop()))
		}

		// With no data left, MaxInt64 guarantees no window matches.
		var v int64

		if t.arr.Len() == 0 {
			v = math.MaxInt64
		} else {
			v = t.arr.Timestamps[t.idx]
		}

		// If the current timestamp falls within the
		// current window, append the value to the
		// builder, otherwise append a null value.
		if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// If the current array is non-empty and has
		// been read in its entirety, call Next().
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if stop.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return stop.NewIntArray()
}

// startStopTimes walks windows like startTimes and returns three arrays:
// the clamped window starts, the clamped window stops, and the point
// timestamps (null for empty windows). It fills builder with the matching
// values or nulls.
func (t *unsignedEmptyWindowSelectorTable) startStopTimes(builder *array.UintBuilder) (*array.Int, *array.Int, *array.Int) {
	start := arrow.NewIntBuilder(t.alloc)
	start.Resize(storage.MaxPointsPerBlock)

	stop := arrow.NewIntBuilder(t.alloc)
	stop.Resize(storage.MaxPointsPerBlock)

	time := arrow.NewIntBuilder(t.alloc)
	time.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {

		// The first window should start at the
		// beginning of the time range.
		if int64(t.windowBounds.Start()) < t.rangeStart {
			start.Append(t.rangeStart)
		} else {
			start.Append(int64(t.windowBounds.Start()))
		}

		// The last window should stop at the end of
		// the time range.
		if int64(t.windowBounds.Stop()) > t.rangeStop {
			stop.Append(t.rangeStop)
		} else {
			stop.Append(int64(t.windowBounds.Stop()))
		}

		// With no data left, MaxInt64 guarantees no window matches.
		var v int64

		if t.arr.Len() == 0 {
			v = math.MaxInt64
		} else {
			v = t.arr.Timestamps[t.idx]
		}

		// If the current timestamp falls within the
		// current window, append the value to the
		// builder, otherwise append a null value.
		if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
			time.Append(v)
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			time.AppendNull()
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// If the current array is non-empty and has
		// been read in its entirety, call Next().
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if time.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return start.NewIntArray(), stop.NewIntArray(), time.NewIntArray()
}

// group table

// unsignedGroupTable reads unsigned series from a group cursor, one series
// cursor at a time. When the group cursor carries an aggregate, all series
// are folded into a single row.
type unsignedGroupTable struct {
	table
	// mu guards cur and gc in Close.
	mu  sync.Mutex
	gc  storage.GroupCursor
	cur cursors.UnsignedArrayCursor
}

// newUnsignedGroupTable builds an unsignedGroupTable starting with the series
// cursor cur. It runs t.init(t.advance) before returning.
func newUnsignedGroupTable(
	done chan struct{},
	gc storage.GroupCursor,
	cur cursors.UnsignedArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *unsignedGroupTable {
	t := &unsignedGroupTable{
		table: newTable(done, bounds, key, cols, defs, cache, alloc),
		gc:    gc,
		cur:   cur,
	}
	t.readTags(tags)
	t.init(t.advance)

	return t
}

// Close closes and clears the series cursor and the group cursor under t.mu.
// Repeated calls are no-ops.
func (t *unsignedGroupTable) Close() {
	t.mu.Lock()
	if t.cur != nil {
		t.cur.Close()
		t.cur = nil
	}
	if t.gc != nil {
		t.gc.Close()
		t.gc = nil
	}
	t.mu.Unlock()
}

// Do delegates to the shared t.do loop, using advance to produce buffers.
func (t *unsignedGroupTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

// advance produces the next buffer. Without an aggregate it emits the next
// non-empty array, moving across series as needed. With an aggregate it
// consumes every remaining series and emits one aggregated row.
func (t *unsignedGroupTable) advance() bool {
	if t.cur == nil {
		// For group aggregates, we will try to get all the series and all table buffers within those series
		// all at once and merge them into one row when this advance() function is first called.
		// At the end of this process, t.advanceCursor() already returns false and t.cur becomes nil.
		// But we still need to return true to indicate that there is data to be returned.
		// The second time when we call this advance(), t.cur is already nil, so we directly return false.
		return false
	}
	var arr *cursors.UnsignedArray
	// NOTE(review): this local shadows the builtin len within advance.
	var len int
	for {
		arr = t.cur.Next()
		len = arr.Len()
		if len > 0 {
			break
		}
		if !t.advanceCursor() {
			return false
		}
	}

	// handle the group without aggregate case
	if t.gc.Aggregate() == nil {
		// Retrieve the buffer for the data to avoid allocating
		// additional slices. If the buffer is still being used
		// because the references were retained, then we will
		// allocate a new buffer.
		colReader := t.allocateBuffer(len)
		colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
		colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
		t.appendTags(colReader)
		t.appendBounds(colReader)
		return true
	}

	aggregate, err := makeUnsignedAggregateAccumulator(t.gc.Aggregate().Type)
	if err != nil {
		t.err = err
		return false
	}

	aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags)
	for {
		arr = t.cur.Next()
		if arr.Len() > 0 {
			aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags)
			continue
		}

		if !t.advanceCursor() {
			break
		}
	}
	timestamp, value, tags := aggregate.Result()

	colReader := t.allocateBuffer(1)
	if IsSelector(t.gc.Aggregate()) {
		colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
		colReader.cols[valueColIdx] = t.toArrowBuffer([]uint64{value})
	} else {
		colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]uint64{value})
	}
	t.appendTheseTags(colReader, tags)
	t.appendBounds(colReader)
	return true
}

type UnsignedAggregateAccumulator interface {
	// AccumulateFirst receives an initial array of items to select from.
	// It selects an item and stores the state. Afterwards, more data can
	// be supplied with AccumulateMore and the results can be requested at
	// any time. Without a call to AccumulateFirst the results are not
	// defined.
	AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte)

	// AccumulateMore receives additional array elements to select from.
	AccumulateMore(timestamps []int64, values []uint64, tags [][]byte)

	// Result returns the item selected from the data received so far.
Result() (int64, uint64, [][]byte) } // The selector method takes a ( timestamp, value ) pair, a // ( []timestamp, []value ) pair, and a starting index. It applies the selector // to the single value and the array, starting at the supplied index. It // returns -1 if the single value is selected and a non-negative value if an // item from the array is selected. type unsignedSelectorMethod func(int64, uint64, []int64, []uint64, int) int // The selector accumulator tracks currently-selected item. type unsignedSelectorAccumulator struct { selector unsignedSelectorMethod ts int64 v uint64 tags [][]byte } func (a *unsignedSelectorAccumulator) AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) { index := a.selector(timestamps[0], values[0], timestamps, values, 1) if index < 0 { a.ts = timestamps[0] a.v = values[0] } else { a.ts = timestamps[index] a.v = values[index] } a.tags = make([][]byte, len(tags)) copy(a.tags, tags) } func (a *unsignedSelectorAccumulator) AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) { index := a.selector(a.ts, a.v, timestamps, values, 0) if index >= 0 { a.ts = timestamps[index] a.v = values[index] if len(tags) > cap(a.tags) { a.tags = make([][]byte, len(tags)) } else { a.tags = a.tags[:len(tags)] } copy(a.tags, tags) } } func (a *unsignedSelectorAccumulator) Result() (int64, uint64, [][]byte) { return a.ts, a.v, a.tags } // The aggregate method takes a value, an array of values, and a starting // index, applies an aggregate operation over the value and the array, starting // at the given index, and returns the result. type unsignedAggregateMethod func(uint64, []uint64, int) uint64 type unsignedAggregateAccumulator struct { aggregate unsignedAggregateMethod accum uint64 // For pure aggregates it doesn't matter what we return for tags, but // we need to satisfy the interface. We will just return the most // recently seen tags. 
tags [][]byte } func (a *unsignedAggregateAccumulator) AccumulateFirst(timestamps []int64, values []uint64, tags [][]byte) { a.accum = a.aggregate(values[0], values, 1) a.tags = tags } func (a *unsignedAggregateAccumulator) AccumulateMore(timestamps []int64, values []uint64, tags [][]byte) { a.accum = a.aggregate(a.accum, values, 0) a.tags = tags } // For group aggregates (non-selectors), the timestamp is always math.MaxInt64. // their final result does not contain _time, so this timestamp value can be // anything and it won't matter. func (a *unsignedAggregateAccumulator) Result() (int64, uint64, [][]byte) { return math.MaxInt64, a.accum, a.tags } // makeUnsignedAggregateAccumulator returns the interface implementation for // aggregating returned points within the same group. The incoming points are // the ones returned for each series and the struct returned here will // aggregate the aggregates. func makeUnsignedAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (UnsignedAggregateAccumulator, error) { switch agg { case datatypes.Aggregate_AggregateTypeFirst: return &unsignedSelectorAccumulator{selector: selectorFirstGroupsUnsigned}, nil case datatypes.Aggregate_AggregateTypeLast: return &unsignedSelectorAccumulator{selector: selectorLastGroupsUnsigned}, nil case datatypes.Aggregate_AggregateTypeCount: return nil, &errors.Error{ Code: errors.EInvalid, Msg: "unsupported for aggregate count: Unsigned", } case datatypes.Aggregate_AggregateTypeSum: return &unsignedAggregateAccumulator{aggregate: aggregateSumGroupsUnsigned}, nil case datatypes.Aggregate_AggregateTypeMin: return &unsignedSelectorAccumulator{selector: selectorMinGroupsUnsigned}, nil case datatypes.Aggregate_AggregateTypeMax: return &unsignedSelectorAccumulator{selector: selectorMaxGroupsUnsigned}, nil default: return nil, &errors.Error{ Code: errors.EInvalid, Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), } } } func selectorMinGroupsUnsigned(ts int64, v uint64, timestamps 
[]int64, values []uint64, i int) int { index := -1 for ; i < len(values); i++ { if v > values[i] { index = i v = values[i] } } return index } func selectorMaxGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { index := -1 for ; i < len(values); i++ { if v < values[i] { index = i v = values[i] } } return index } func aggregateSumGroupsUnsigned(sum uint64, values []uint64, i int) uint64 { for ; i < len(values); i++ { sum += values[i] } return sum } func selectorFirstGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { index := -1 for ; i < len(values); i++ { if ts > timestamps[i] { index = i ts = timestamps[i] } } return index } func selectorLastGroupsUnsigned(ts int64, v uint64, timestamps []int64, values []uint64, i int) int { index := -1 for ; i < len(values); i++ { if ts <= timestamps[i] { index = i ts = timestamps[i] } } return index } func (t *unsignedGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { cur := t.gc.Cursor() if cur == nil { continue } if typedCur, ok := cur.(cursors.UnsignedArrayCursor); !ok { // TODO(sgc): error or skip? 
cur.Close() t.err = &errors.Error{ Code: errors.EInvalid, Err: &GroupCursorError{ typ: "unsigned", cursor: cur, }, } return false } else { t.readTags(t.gc.Tags()) t.cur = typedCur return true } } return false } func (t *unsignedGroupTable) Statistics() cursors.CursorStats { if t.cur == nil { return cursors.CursorStats{} } cs := t.cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } } // // *********** String *********** // type stringTable struct { table mu sync.Mutex cur cursors.StringArrayCursor alloc *memory.Allocator } func newStringTable( done chan struct{}, cur cursors.StringArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *stringTable { t := &stringTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, } t.readTags(tags) t.init(t.advance) return t } func (t *stringTable) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } t.mu.Unlock() } func (t *stringTable) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { return cursors.CursorStats{} } cs := cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } } func (t *stringTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *stringTable) advance() bool { a := t.cur.Next() l := a.Len() if l == 0 { return false } // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
cr := t.allocateBuffer(l) cr.cols[timeColIdx] = arrow.NewInt(a.Timestamps, t.alloc) cr.cols[valueColIdx] = t.toArrowBuffer(a.Values) t.appendTags(cr) t.appendBounds(cr) return true } // window table type stringWindowTable struct { stringTable arr *cursors.StringArray windowBounds interval.Bounds idxInArr int createEmpty bool timeColumn string isAggregate bool window interval.Window } func newStringWindowTable( done chan struct{}, cur cursors.StringArrayCursor, bounds execute.Bounds, window interval.Window, createEmpty bool, timeColumn string, isAggregate bool, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *stringWindowTable { t := &stringWindowTable{ stringTable: stringTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, window: window, createEmpty: createEmpty, timeColumn: timeColumn, isAggregate: isAggregate, } if t.createEmpty { start := int64(bounds.Start) t.windowBounds = window.GetLatestBounds(values.Time(start)) } t.readTags(tags) t.init(t.advance) return t } func (t *stringWindowTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } // createNextBufferTimes will read the timestamps from the array // cursor and construct the values for the next buffer. func (t *stringWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) { startB := arrow.NewIntBuilder(t.alloc) stopB := arrow.NewIntBuilder(t.alloc) if t.createEmpty { // There are no more windows when the start time is greater // than or equal to the stop time. if startT := int64(t.windowBounds.Start()); startT >= int64(t.bounds.Stop) { return nil, nil, false } // Create a buffer with the buffer size. // TODO(jsternberg): Calculate the exact size with max points as the maximum. 
startB.Resize(storage.MaxPointsPerBlock) stopB.Resize(storage.MaxPointsPerBlock) for ; ; t.windowBounds = t.window.NextBounds(t.windowBounds) { startT, stopT := t.getWindowBoundsFor(t.windowBounds) if startT >= int64(t.bounds.Stop) { break } startB.Append(startT) stopB.Append(stopT) } start = startB.NewIntArray() stop = stopB.NewIntArray() return start, stop, true } // Retrieve the next buffer so we can copy the timestamps. if !t.nextBuffer() { return nil, nil, false } // Copy over the timestamps from the next buffer and adjust // times for the boundaries. startB.Resize(len(t.arr.Timestamps)) stopB.Resize(len(t.arr.Timestamps)) for _, stopT := range t.arr.Timestamps { bounds := t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stopT))) startT, stopT := t.getWindowBoundsFor(bounds) startB.Append(startT) stopB.Append(stopT) } start = startB.NewIntArray() stop = stopB.NewIntArray() return start, stop, true } func (t *stringWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) { beg := int64(bounds.Start()) end := int64(bounds.Stop()) if beg < int64(t.bounds.Start) { beg = int64(t.bounds.Start) } if end > int64(t.bounds.Stop) { end = int64(t.bounds.Stop) } return beg, end } // nextAt will retrieve the next value that can be used with // the given stop timestamp. If no values can be used with the timestamp, // it will return the default value and false. func (t *stringWindowTable) nextAt(stop int64) (v string, ok bool) { if !t.nextBuffer() { return } else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) { return } v, ok = t.arr.Values[t.idxInArr], true t.idxInArr++ return v, ok } // isInWindow will check if the given time may be used within the window // denoted by the stop timestamp. The stop may be a truncated stop time // because of a restricted boundary. // // When used with an aggregate, ts will be the true stop time returned // by storage. When used with an aggregate, it will be the real time // for the point. 
func (t *stringWindowTable) isInWindow(stop int64, ts int64) bool { // Retrieve the boundary associated with this stop time. // This will be the boundary for the previous nanosecond. bounds := t.window.GetLatestBounds(values.Time(stop - 1)) start, stop := int64(bounds.Start()), int64(bounds.Stop()) // For an aggregate, the timestamp will be the stop time of the boundary. if t.isAggregate { return start < ts && ts <= stop } // For a selector, the timestamp should be within the boundary. return start <= ts && ts < stop } // nextBuffer will ensure the array cursor is filled // and will return true if there is at least one value // that can be read from it. func (t *stringWindowTable) nextBuffer() bool { // Discard the current array cursor if we have // exceeded it. if t.arr != nil && t.idxInArr >= t.arr.Len() { t.arr = nil } // Retrieve the next array cursor if needed. if t.arr == nil { arr := t.cur.Next() if arr.Len() == 0 { return false } t.arr, t.idxInArr = arr, 0 } return true } // appendValues will scan the timestamps and append values // that match those timestamps from the buffer. func (t *stringWindowTable) appendValues(intervals []int64, appendValue func(v string), appendNull func()) { for i := 0; i < len(intervals); i++ { if v, ok := t.nextAt(intervals[i]); ok { appendValue(v) continue } appendNull() } } func (t *stringWindowTable) advance() bool { if !t.nextBuffer() { return false } // Create the timestamps for the next window. start, stop, ok := t.createNextBufferTimes() if !ok { return false } values := t.mergeValues(stop.Int64Values()) // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. 
cr := t.allocateBuffer(stop.Len()) if t.timeColumn != "" { switch t.timeColumn { case execute.DefaultStopColLabel: cr.cols[timeColIdx] = stop start.Release() case execute.DefaultStartColLabel: cr.cols[timeColIdx] = start stop.Release() } cr.cols[valueColIdx] = values t.appendBounds(cr) } else { cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[valueColIdxWithoutTime] = values } t.appendTags(cr) return true } // This table implementation will not have any empty windows. type stringWindowSelectorTable struct { stringTable timeColumn string window interval.Window } func newStringWindowSelectorTable( done chan struct{}, cur cursors.StringArrayCursor, bounds execute.Bounds, window interval.Window, timeColumn string, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *stringWindowSelectorTable { t := &stringWindowSelectorTable{ stringTable: stringTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, window: window, timeColumn: timeColumn, } t.readTags(tags) t.init(t.advance) return t } func (t *stringWindowSelectorTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *stringWindowSelectorTable) advance() bool { arr := t.cur.Next() if arr.Len() == 0 { return false } cr := t.allocateBuffer(arr.Len()) switch t.timeColumn { case execute.DefaultStartColLabel: cr.cols[timeColIdx] = t.startTimes(arr) t.appendBounds(cr) case execute.DefaultStopColLabel: cr.cols[timeColIdx] = t.stopTimes(arr) t.appendBounds(cr) default: cr.cols[startColIdx] = t.startTimes(arr) cr.cols[stopColIdx] = t.stopTimes(arr) cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) } cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(cr) return true } func (t *stringWindowSelectorTable) startTimes(arr *cursors.StringArray) *array.Int { start := arrow.NewIntBuilder(t.alloc) start.Resize(arr.Len()) rangeStart := int64(t.bounds.Start) for _, v := range 
arr.Timestamps { if windowStart := int64(t.window.GetLatestBounds(values.Time(v)).Start()); windowStart < rangeStart { start.Append(rangeStart) } else { start.Append(windowStart) } } return start.NewIntArray() } func (t *stringWindowSelectorTable) stopTimes(arr *cursors.StringArray) *array.Int { stop := arrow.NewIntBuilder(t.alloc) stop.Resize(arr.Len()) rangeStop := int64(t.bounds.Stop) for _, v := range arr.Timestamps { if windowStop := int64(t.window.GetLatestBounds(values.Time(v)).Stop()); windowStop > rangeStop { stop.Append(rangeStop) } else { stop.Append(windowStop) } } return stop.NewIntArray() } // This table implementation may contain empty windows // in addition to non-empty windows. type stringEmptyWindowSelectorTable struct { stringTable arr *cursors.StringArray idx int rangeStart int64 rangeStop int64 windowBounds interval.Bounds timeColumn string window interval.Window } func newStringEmptyWindowSelectorTable( done chan struct{}, cur cursors.StringArrayCursor, bounds execute.Bounds, window interval.Window, timeColumn string, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *stringEmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) t := &stringEmptyWindowSelectorTable{ stringTable: stringTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, arr: cur.Next(), rangeStart: rangeStart, rangeStop: rangeStop, windowBounds: window.GetLatestBounds(values.Time(rangeStart)), window: window, timeColumn: timeColumn, } t.readTags(tags) t.init(t.advance) return t } func (t *stringEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *stringEmptyWindowSelectorTable) advance() bool { if t.arr.Len() == 0 { return false } values := t.arrowBuilder() values.Resize(storage.MaxPointsPerBlock) var cr *colReader switch t.timeColumn { case execute.DefaultStartColLabel: start := 
t.startTimes(values) cr = t.allocateBuffer(start.Len()) cr.cols[timeColIdx] = start t.appendBounds(cr) case execute.DefaultStopColLabel: stop := t.stopTimes(values) cr = t.allocateBuffer(stop.Len()) cr.cols[timeColIdx] = stop t.appendBounds(cr) default: start, stop, time := t.startStopTimes(values) cr = t.allocateBuffer(time.Len()) cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[timeColIdx] = time } cr.cols[valueColIdx] = values.NewStringArray() t.appendTags(cr) return true } func (t *stringEmptyWindowSelectorTable) startTimes(builder *array.StringBuilder) *array.Int { start := arrow.NewIntBuilder(t.alloc) start.Resize(storage.MaxPointsPerBlock) for int64(t.windowBounds.Start()) < t.rangeStop { // The first window should start at the // beginning of the time range. if int64(t.windowBounds.Start()) < t.rangeStart { start.Append(t.rangeStart) } else { start.Append(int64(t.windowBounds.Start())) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) { t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { builder.AppendNull() } t.windowBounds = t.window.NextBounds(t.windowBounds) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if start.Len() == storage.MaxPointsPerBlock { break } } return start.NewIntArray() } func (t *stringEmptyWindowSelectorTable) stopTimes(builder *array.StringBuilder) *array.Int { stop := arrow.NewIntBuilder(t.alloc) stop.Resize(storage.MaxPointsPerBlock) for int64(t.windowBounds.Start()) < t.rangeStop { // The last window should stop at the end of // the time range. 
if int64(t.windowBounds.Stop()) > t.rangeStop { stop.Append(t.rangeStop) } else { stop.Append(int64(t.windowBounds.Stop())) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) { t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { builder.AppendNull() } t.windowBounds = t.window.NextBounds(t.windowBounds) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if stop.Len() == storage.MaxPointsPerBlock { break } } return stop.NewIntArray() } func (t *stringEmptyWindowSelectorTable) startStopTimes(builder *array.StringBuilder) (*array.Int, *array.Int, *array.Int) { start := arrow.NewIntBuilder(t.alloc) start.Resize(storage.MaxPointsPerBlock) stop := arrow.NewIntBuilder(t.alloc) stop.Resize(storage.MaxPointsPerBlock) time := arrow.NewIntBuilder(t.alloc) time.Resize(storage.MaxPointsPerBlock) for int64(t.windowBounds.Start()) < t.rangeStop { // The first window should start at the // beginning of the time range. if int64(t.windowBounds.Start()) < t.rangeStart { start.Append(t.rangeStart) } else { start.Append(int64(t.windowBounds.Start())) } // The last window should stop at the end of // the time range. if int64(t.windowBounds.Stop()) > t.rangeStop { stop.Append(t.rangeStop) } else { stop.Append(int64(t.windowBounds.Stop())) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. 
if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) { time.Append(v) t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { time.AppendNull() builder.AppendNull() } t.windowBounds = t.window.NextBounds(t.windowBounds) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if time.Len() == storage.MaxPointsPerBlock { break } } return start.NewIntArray(), stop.NewIntArray(), time.NewIntArray() } // group table type stringGroupTable struct { table mu sync.Mutex gc storage.GroupCursor cur cursors.StringArrayCursor } func newStringGroupTable( done chan struct{}, gc storage.GroupCursor, cur cursors.StringArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *stringGroupTable { t := &stringGroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), gc: gc, cur: cur, } t.readTags(tags) t.init(t.advance) return t } func (t *stringGroupTable) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } if t.gc != nil { t.gc.Close() t.gc = nil } t.mu.Unlock() } func (t *stringGroupTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *stringGroupTable) advance() bool { if t.cur == nil { // For group aggregates, we will try to get all the series and all table buffers within those series // all at once and merge them into one row when this advance() function is first called. // At the end of this process, t.advanceCursor() already returns false and t.cur becomes nil. // But we still need to return true to indicate that there is data to be returned. // The second time when we call this advance(), t.cur is already nil, so we directly return false. 
return false } var arr *cursors.StringArray var len int for { arr = t.cur.Next() len = arr.Len() if len > 0 { break } if !t.advanceCursor() { return false } } // handle the group without aggregate case if t.gc.Aggregate() == nil { // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. colReader := t.allocateBuffer(len) colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(colReader) t.appendBounds(colReader) return true } aggregate, err := makeStringAggregateAccumulator(t.gc.Aggregate().Type) if err != nil { t.err = err return false } aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags) for { arr = t.cur.Next() if arr.Len() > 0 { aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags) continue } if !t.advanceCursor() { break } } timestamp, value, tags := aggregate.Result() colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer([]string{value}) } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]string{value}) } t.appendTheseTags(colReader, tags) t.appendBounds(colReader) return true } type StringAggregateAccumulator interface { // AccumulateFirst receives an initial array of items to select from. // It selects an item and stores the state. Afterwards, more data can // be supplied with AccumulateMore and the results can be requested at // any time. Without a call to AccumulateFirst the results are not // defined. AccumulateFirst(timestamps []int64, values []string, tags [][]byte) // AccumulateMore receives additional array elements to select from. AccumulateMore(timestamps []int64, values []string, tags [][]byte) // Result returns the item selected from the data received so far. 
Result() (int64, string, [][]byte) } // The selector method takes a ( timestamp, value ) pair, a // ( []timestamp, []value ) pair, and a starting index. It applies the selector // to the single value and the array, starting at the supplied index. It // returns -1 if the single value is selected and a non-negative value if an // item from the array is selected. type stringSelectorMethod func(int64, string, []int64, []string, int) int // The selector accumulator tracks currently-selected item. type stringSelectorAccumulator struct { selector stringSelectorMethod ts int64 v string tags [][]byte } func (a *stringSelectorAccumulator) AccumulateFirst(timestamps []int64, values []string, tags [][]byte) { index := a.selector(timestamps[0], values[0], timestamps, values, 1) if index < 0 { a.ts = timestamps[0] a.v = values[0] } else { a.ts = timestamps[index] a.v = values[index] } a.tags = make([][]byte, len(tags)) copy(a.tags, tags) } func (a *stringSelectorAccumulator) AccumulateMore(timestamps []int64, values []string, tags [][]byte) { index := a.selector(a.ts, a.v, timestamps, values, 0) if index >= 0 { a.ts = timestamps[index] a.v = values[index] if len(tags) > cap(a.tags) { a.tags = make([][]byte, len(tags)) } else { a.tags = a.tags[:len(tags)] } copy(a.tags, tags) } } func (a *stringSelectorAccumulator) Result() (int64, string, [][]byte) { return a.ts, a.v, a.tags } // makeStringAggregateAccumulator returns the interface implementation for // aggregating returned points within the same group. The incoming points are // the ones returned for each series and the struct returned here will // aggregate the aggregates. 
func makeStringAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (StringAggregateAccumulator, error) {
	// Only the first/last selectors can be combined across series for
	// strings; every other aggregate type is rejected with EInvalid.
	switch agg {
	case datatypes.Aggregate_AggregateTypeFirst:
		return &stringSelectorAccumulator{selector: selectorFirstGroupsString}, nil
	case datatypes.Aggregate_AggregateTypeLast:
		return &stringSelectorAccumulator{selector: selectorLastGroupsString}, nil
	case datatypes.Aggregate_AggregateTypeCount:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate count: String",
		}
	case datatypes.Aggregate_AggregateTypeSum:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate sum: String",
		}
	case datatypes.Aggregate_AggregateTypeMin:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate min: String",
		}
	case datatypes.Aggregate_AggregateTypeMax:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  "unsupported for aggregate max: String",
		}
	default:
		return nil, &errors.Error{
			Code: errors.EInvalid,
			Msg:  fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg),
		}
	}
}

// selectorFirstGroupsString scans timestamps[i:] and returns the index of the
// entry with the earliest timestamp that is strictly earlier than ts, or -1
// when no entry is earlier than ts (meaning the incoming ts/v pair wins).
// The strict comparison means ties keep the earlier-seen candidate.
// v is unused; it is part of the stringSelectorMethod signature.
func selectorFirstGroupsString(ts int64, v string, timestamps []int64, values []string, i int) int {
	index := -1

	for ; i < len(values); i++ {
		if ts > timestamps[i] {
			index = i
			ts = timestamps[i]
		}
	}

	return index
}

// selectorLastGroupsString scans timestamps[i:] and returns the index of the
// entry with the latest timestamp that is greater than or equal to ts, or -1
// when every entry is earlier than ts (meaning the incoming ts/v pair wins).
// The non-strict comparison means ties move the selection to the later entry.
// v is unused; it is part of the stringSelectorMethod signature.
func selectorLastGroupsString(ts int64, v string, timestamps []int64, values []string, i int) int {
	index := -1

	for ; i < len(values); i++ {
		if ts <= timestamps[i] {
			index = i
			ts = timestamps[i]
		}
	}

	return index
}

// advanceCursor closes the current series cursor and moves to the next
// non-nil cursor from the group cursor, reading that series' tags.
// It returns false when the group cursor is exhausted, or when the next
// cursor is not a StringArrayCursor; in that case the cursor is closed
// and t.err is set. t.cur must be non-nil on entry because it is closed
// unconditionally.
func (t *stringGroupTable) advanceCursor() bool {
	t.cur.Close()
	t.cur = nil
	for t.gc.Next() {
		cur := t.gc.Cursor()
		if cur == nil {
			continue
		}

		if typedCur, ok := cur.(cursors.StringArrayCursor); !ok {
			// TODO(sgc): error or skip?
			cur.Close()
			t.err = &errors.Error{
				Code: errors.EInvalid,
				Err: &GroupCursorError{
					typ:    "string",
					cursor: cur,
				},
			}
			return false
		} else {
			t.readTags(t.gc.Tags())
			t.cur = typedCur
			return true
		}
	}
	return false
}

// Statistics reports the scan statistics of the current series cursor,
// or zero statistics when no cursor is held.
//
// NOTE(review): t.cur is read twice here without taking a lock, whereas
// booleanGroupTable.Close in this file takes mu before nil-ing cur.
// Confirm whether stringGroupTable needs the same locking.
func (t *stringGroupTable) Statistics() cursors.CursorStats {
	if t.cur == nil {
		return cursors.CursorStats{}
	}
	cs := t.cur.Stats()
	return cursors.CursorStats{
		ScannedValues: cs.ScannedValues,
		ScannedBytes:  cs.ScannedBytes,
	}
}

//
// *********** Boolean ***********
//

// booleanTable is a flux table backed by a single BooleanArrayCursor.
// mu serializes Close and Statistics, which both touch cur.
type booleanTable struct {
	table
	mu  sync.Mutex
	cur cursors.BooleanArrayCursor
	// NOTE(review): none of the booleanTable literals in this file assign
	// alloc. The allocator is only passed to newTable. Because this field
	// sits at a shallower depth, t.alloc inside booleanTable (and the types
	// embedding it) resolves here rather than to the embedded table's alloc
	// member. booleanGroupTable.advance uses t.alloc with no field of its
	// own, so table does have one. Confirm the shadowing is intended.
	alloc *memory.Allocator
}

// newBooleanTable builds a booleanTable over cur, records the series tags
// and calls t.init with the table's advance method (presumably to prime the
// first buffer; init is defined elsewhere).
func newBooleanTable(
	done chan struct{},
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanTable {
	t := &booleanTable{
		table: newTable(done, bounds, key, cols, defs, cache, alloc),
		cur:   cur,
	}
	t.readTags(tags)
	t.init(t.advance)

	return t
}

// Close closes the cursor under mu and drops the reference. Calling it again
// is a no-op because cur is nil after the first call.
func (t *booleanTable) Close() {
	t.mu.Lock()
	if t.cur != nil {
		t.cur.Close()
		t.cur = nil
	}
	t.mu.Unlock()
}

// Statistics reports the cursor's scan statistics, or zero statistics once
// the cursor has been closed. cur is snapshotted under mu so a concurrent
// Close cannot nil it between the check and the Stats call.
func (t *booleanTable) Statistics() cursors.CursorStats {
	t.mu.Lock()
	defer t.mu.Unlock()
	cur := t.cur
	if cur == nil {
		return cursors.CursorStats{}
	}
	cs := cur.Stats()
	return cursors.CursorStats{
		ScannedValues: cs.ScannedValues,
		ScannedBytes:  cs.ScannedBytes,
	}
}

// Do passes each buffer produced by advance to f via the embedded table's do.
func (t *booleanTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

// advance reads the next array from the cursor and fills a column buffer
// with its timestamps, values, tags and bounds. It returns false once the
// cursor yields an empty array.
func (t *booleanTable) advance() bool {
	a := t.cur.Next()
	l := a.Len()
	if l == 0 {
		return false
	}

	// Retrieve the buffer for the data to avoid allocating
	// additional slices. If the buffer is still being used
	// because the references were retained, then we will
	// allocate a new buffer.
	cr := t.allocateBuffer(l)
	cr.cols[timeColIdx] = arrow.NewInt(a.Timestamps, t.alloc)
	cr.cols[valueColIdx] = t.toArrowBuffer(a.Values)
	t.appendTags(cr)
	t.appendBounds(cr)
	return true
}

// window table
//
// booleanWindowTable emits one row per window. arr is the array currently
// being consumed and idxInArr the index of its next unread value.
// windowBounds is the next window to emit and is only seeded when createEmpty
// is set.
type booleanWindowTable struct {
	booleanTable
	arr          *cursors.BooleanArray
	windowBounds interval.Bounds
	idxInArr     int
	createEmpty  bool
	timeColumn   string
	isAggregate  bool
	window       interval.Window
}

// newBooleanWindowTable builds a booleanWindowTable. When createEmpty is set,
// the first window is the one GetLatestBounds returns for bounds.Start.
func newBooleanWindowTable(
	done chan struct{},
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	createEmpty bool,
	timeColumn string,
	isAggregate bool,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanWindowTable {
	t := &booleanWindowTable{
		booleanTable: booleanTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
		},
		window:      window,
		createEmpty: createEmpty,
		timeColumn:  timeColumn,
		isAggregate: isAggregate,
	}
	if t.createEmpty {
		start := int64(bounds.Start)
		t.windowBounds = window.GetLatestBounds(values.Time(start))
	}
	t.readTags(tags)
	t.init(t.advance)

	return t
}

// Do passes each buffer produced by advance to f via the embedded table's do.
func (t *booleanWindowTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

// createNextBufferTimes will read the timestamps from the array
// cursor and construct the values for the next buffer.
//
// With createEmpty, it emits the start/stop of every remaining window until a
// window's clamped start reaches t.bounds.Stop. The loop itself is not capped
// at MaxPointsPerBlock; it also advances t.windowBounds, so a later call
// returns ok=false. Without createEmpty, each timestamp in t.arr is treated
// as a window stop time, and the window from PrevBounds(GetLatestBounds(ts))
// is clamped to t.bounds. It reads all of t.arr.Timestamps regardless of
// t.idxInArr.
func (t *booleanWindowTable) createNextBufferTimes() (start, stop *array.Int, ok bool) {
	startB := arrow.NewIntBuilder(t.alloc)
	stopB := arrow.NewIntBuilder(t.alloc)

	if t.createEmpty {
		// There are no more windows when the start time is greater
		// than or equal to the stop time.
		if startT := int64(t.windowBounds.Start()); startT >= int64(t.bounds.Stop) {
			return nil, nil, false
		}

		// Create a buffer with the buffer size.
		// TODO(jsternberg): Calculate the exact size with max points as the maximum.
		startB.Resize(storage.MaxPointsPerBlock)
		stopB.Resize(storage.MaxPointsPerBlock)
		for ; ; t.windowBounds = t.window.NextBounds(t.windowBounds) {
			startT, stopT := t.getWindowBoundsFor(t.windowBounds)
			if startT >= int64(t.bounds.Stop) {
				break
			}
			startB.Append(startT)
			stopB.Append(stopT)
		}
		start = startB.NewIntArray()
		stop = stopB.NewIntArray()
		return start, stop, true
	}

	// Retrieve the next buffer so we can copy the timestamps.
	if !t.nextBuffer() {
		return nil, nil, false
	}

	// Copy over the timestamps from the next buffer and adjust
	// times for the boundaries.
	startB.Resize(len(t.arr.Timestamps))
	stopB.Resize(len(t.arr.Timestamps))
	for _, stopT := range t.arr.Timestamps {
		bounds := t.window.PrevBounds(t.window.GetLatestBounds(values.Time(stopT)))
		// The stopT declared here shadows the loop variable; from this point
		// on it is the clamped window stop.
		startT, stopT := t.getWindowBoundsFor(bounds)
		startB.Append(startT)
		stopB.Append(stopT)
	}
	start = startB.NewIntArray()
	stop = stopB.NewIntArray()
	return start, stop, true
}

// getWindowBoundsFor returns the start and stop of bounds clamped to the
// table's overall [t.bounds.Start, t.bounds.Stop] range.
func (t *booleanWindowTable) getWindowBoundsFor(bounds interval.Bounds) (int64, int64) {
	beg := int64(bounds.Start())
	end := int64(bounds.Stop())
	if beg < int64(t.bounds.Start) {
		beg = int64(t.bounds.Start)
	}
	if end > int64(t.bounds.Stop) {
		end = int64(t.bounds.Stop)
	}
	return beg, end
}

// nextAt will retrieve the next value that can be used with
// the given stop timestamp. If no values can be used with the timestamp,
// it will return the default value and false.
// t.idxInArr only advances when a value is actually returned.
func (t *booleanWindowTable) nextAt(stop int64) (v bool, ok bool) {
	if !t.nextBuffer() {
		return
	} else if !t.isInWindow(stop, t.arr.Timestamps[t.idxInArr]) {
		return
	}
	v, ok = t.arr.Values[t.idxInArr], true
	t.idxInArr++
	return v, ok
}

// isInWindow will check if the given time may be used within the window
// denoted by the stop timestamp. The stop may be a truncated stop time
// because of a restricted boundary.
//
// When used with an aggregate, ts will be the true stop time returned
// by storage. When used with a selector, it will be the real time
// for the point.
func (t *booleanWindowTable) isInWindow(stop int64, ts int64) bool {
	// Retrieve the boundary associated with this stop time.
	// This will be the boundary for the previous nanosecond.
	bounds := t.window.GetLatestBounds(values.Time(stop - 1))
	// This short declaration creates start and reassigns the stop parameter
	// to the window's untruncated stop.
	start, stop := int64(bounds.Start()), int64(bounds.Stop())

	// For an aggregate, the timestamp will be the stop time of the boundary.
	if t.isAggregate {
		return start < ts && ts <= stop
	}

	// For a selector, the timestamp should be within the boundary.
	return start <= ts && ts < stop
}

// nextBuffer will ensure the array cursor is filled
// and will return true if there is at least one value
// that can be read from it.
// Fetching a new array resets t.idxInArr to 0.
func (t *booleanWindowTable) nextBuffer() bool {
	// Discard the current array cursor if we have
	// exceeded it.
	if t.arr != nil && t.idxInArr >= t.arr.Len() {
		t.arr = nil
	}

	// Retrieve the next array cursor if needed.
	if t.arr == nil {
		arr := t.cur.Next()
		if arr.Len() == 0 {
			return false
		}
		t.arr, t.idxInArr = arr, 0
	}
	return true
}

// appendValues will scan the timestamps and append values
// that match those timestamps from the buffer.
// Each entry of intervals is a window stop time. At most one buffered value
// is consumed per entry; appendNull is called when nextAt rejects it.
func (t *booleanWindowTable) appendValues(intervals []int64, appendValue func(v bool), appendNull func()) {
	for i := 0; i < len(intervals); i++ {
		if v, ok := t.nextAt(intervals[i]); ok {
			appendValue(v)
			continue
		}
		appendNull()
	}
}

// advance produces the next buffer of windowed rows. It returns false when
// the cursor has no more data or createNextBufferTimes reports no further
// windows. The nextBuffer check runs before createEmpty is consulted, so an
// empty cursor yields no rows even when createEmpty is set.
func (t *booleanWindowTable) advance() bool {
	if !t.nextBuffer() {
		return false
	}
	// Create the timestamps for the next window.
	start, stop, ok := t.createNextBufferTimes()
	if !ok {
		return false
	}
	// mergeValues is defined elsewhere; presumably it yields one value (or
	// null) per stop timestamp — confirm. The local name shadows the values
	// package for the rest of this function.
	values := t.mergeValues(stop.Int64Values())

	// Retrieve the buffer for the data to avoid allocating
	// additional slices. If the buffer is still being used
	// because the references were retained, then we will
	// allocate a new buffer.
	cr := t.allocateBuffer(stop.Len())
	if t.timeColumn != "" {
		// Only one of start/stop becomes the time column; the unused array
		// is released.
		// NOTE(review): any other non-empty timeColumn leaves
		// cols[timeColIdx] unset and releases neither array.
		switch t.timeColumn {
		case execute.DefaultStopColLabel:
			cr.cols[timeColIdx] = stop
			start.Release()
		case execute.DefaultStartColLabel:
			cr.cols[timeColIdx] = start
			stop.Release()
		}
		cr.cols[valueColIdx] = values
		t.appendBounds(cr)
	} else {
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[valueColIdxWithoutTime] = values
	}
	t.appendTags(cr)
	return true
}

// This table implementation will not have any empty windows.
// It emits one row per point, with start/stop derived from the window that
// contains each timestamp.
type booleanWindowSelectorTable struct {
	booleanTable
	timeColumn string
	window     interval.Window
}

// newBooleanWindowSelectorTable builds a booleanWindowSelectorTable, records
// the series tags and calls t.init with its advance method.
func newBooleanWindowSelectorTable(
	done chan struct{},
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanWindowSelectorTable {
	t := &booleanWindowSelectorTable{
		booleanTable: booleanTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
		},
		window:     window,
		timeColumn: timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}

// Do passes each buffer produced by advance to f via the embedded table's do.
func (t *booleanWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

// advance emits one row per point in the next array from the cursor.
// When timeColumn is the start or stop label, that column replaces _time and
// the table bounds are appended. Otherwise, start, stop and the original
// point timestamps are all written.
func (t *booleanWindowSelectorTable) advance() bool {
	arr := t.cur.Next()
	if arr.Len() == 0 {
		return false
	}

	cr := t.allocateBuffer(arr.Len())

	switch t.timeColumn {
	case execute.DefaultStartColLabel:
		cr.cols[timeColIdx] = t.startTimes(arr)
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		cr.cols[timeColIdx] = t.stopTimes(arr)
		t.appendBounds(cr)
	default:
		cr.cols[startColIdx] = t.startTimes(arr)
		cr.cols[stopColIdx] = t.stopTimes(arr)
		cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
	}

	cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
	t.appendTags(cr)
	return true
}

// startTimes returns, for each timestamp in arr, the start of the window
// containing it, raised to the range start when the window begins earlier.
func (t *booleanWindowSelectorTable) startTimes(arr *cursors.BooleanArray) *array.Int {
	start := arrow.NewIntBuilder(t.alloc)
	start.Resize(arr.Len())

	rangeStart := int64(t.bounds.Start)

	for _, v := range arr.Timestamps {
		if windowStart := int64(t.window.GetLatestBounds(values.Time(v)).Start()); windowStart < rangeStart {
			start.Append(rangeStart)
		} else {
			start.Append(windowStart)
		}
	}
	return start.NewIntArray()
}

// stopTimes returns, for each timestamp in arr, the stop of the window
// containing it, lowered to the range stop when the window ends later.
func (t *booleanWindowSelectorTable) stopTimes(arr *cursors.BooleanArray) *array.Int {
	stop := arrow.NewIntBuilder(t.alloc)
	stop.Resize(arr.Len())

	rangeStop := int64(t.bounds.Stop)

	for _, v := range arr.Timestamps {
		if windowStop := int64(t.window.GetLatestBounds(values.Time(v)).Stop()); windowStop > rangeStop {
			stop.Append(rangeStop)
		} else {
			stop.Append(windowStop)
		}
	}
	return stop.NewIntArray()
}

// This table implementation may contain empty windows
// in addition to non-empty windows.
// arr is prefetched in the constructor, and idx is the index of its next
// unread point. windowBounds is the next window to emit; it starts at the
// window GetLatestBounds returns for rangeStart.
type booleanEmptyWindowSelectorTable struct {
	booleanTable
	arr          *cursors.BooleanArray
	idx          int
	rangeStart   int64
	rangeStop    int64
	windowBounds interval.Bounds
	timeColumn   string
	window       interval.Window
}

// newBooleanEmptyWindowSelectorTable builds the table. It reads the first
// array from cur immediately, then records tags and calls t.init.
func newBooleanEmptyWindowSelectorTable(
	done chan struct{},
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	window interval.Window,
	timeColumn string,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanEmptyWindowSelectorTable {
	rangeStart := int64(bounds.Start)
	rangeStop := int64(bounds.Stop)
	t := &booleanEmptyWindowSelectorTable{
		booleanTable: booleanTable{
			table: newTable(done, bounds, key, cols, defs, cache, alloc),
			cur:   cur,
		},
		arr:          cur.Next(),
		rangeStart:   rangeStart,
		rangeStop:    rangeStop,
		windowBounds: window.GetLatestBounds(values.Time(rangeStart)),
		window:       window,
		timeColumn:   timeColumn,
	}
	t.readTags(tags)
	t.init(t.advance)
	return t
}

// Do passes each buffer produced by advance to f via the embedded table's do.
func (t *booleanEmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

// advance emits up to MaxPointsPerBlock windows, with a value or null per
// window. It returns false as soon as the current array is empty.
//
// NOTE(review): once the cursor is exhausted in the middle of a batch, t.arr
// becomes empty. If more than MaxPointsPerBlock empty windows remain before
// rangeStop, the next call returns false and those trailing windows are never
// emitted. Confirm whether that is intended.
func (t *booleanEmptyWindowSelectorTable) advance() bool {
	if t.arr.Len() == 0 {
		return false
	}

	values := t.arrowBuilder()
	values.Resize(storage.MaxPointsPerBlock)

	var cr *colReader

	switch t.timeColumn {
	case execute.DefaultStartColLabel:
		start := t.startTimes(values)
		cr = t.allocateBuffer(start.Len())
		cr.cols[timeColIdx] = start
		t.appendBounds(cr)
	case execute.DefaultStopColLabel:
		stop := t.stopTimes(values)
		cr = t.allocateBuffer(stop.Len())
		cr.cols[timeColIdx] = stop
		t.appendBounds(cr)
	default:
		start, stop, time := t.startStopTimes(values)
		cr = t.allocateBuffer(time.Len())
		cr.cols[startColIdx] = start
		cr.cols[stopColIdx] = stop
		cr.cols[timeColIdx] = time
	}

	cr.cols[valueColIdx] = values.NewBooleanArray()
	t.appendTags(cr)
	return true
}

// startTimes walks windows from t.windowBounds until rangeStop, or until
// MaxPointsPerBlock rows are built. For each window it appends the clamped
// start and either the point at t.idx (into builder) or a null.
//
// At most one point is consumed per window. This presumably relies on the
// cursor yielding at most one point per window — TODO confirm. A second point
// in a window that has already been passed would never satisfy the in-window
// check, so t.idx would stop advancing.
func (t *booleanEmptyWindowSelectorTable) startTimes(builder *array.BooleanBuilder) *array.Int {
	start := arrow.NewIntBuilder(t.alloc)
	start.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {
		// The first window should start at the
		// beginning of the time range.
		if int64(t.windowBounds.Start()) < t.rangeStart {
			start.Append(t.rangeStart)
		} else {
			start.Append(int64(t.windowBounds.Start()))
		}

		var v int64

		// MaxInt64 acts as a sentinel that lies outside every window once the
		// data is exhausted.
		if t.arr.Len() == 0 {
			v = math.MaxInt64
		} else {
			v = t.arr.Timestamps[t.idx]
		}

		// If the current timestamp falls within the
		// current window, append the value to the
		// builder, otherwise append a null value.
		if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// If the current array is non-empty and has
		// been read in its entirety, call Next().
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if start.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return start.NewIntArray()
}

// stopTimes mirrors startTimes but appends each window's stop, clamped to
// rangeStop. The same one-point-per-window assumption applies.
func (t *booleanEmptyWindowSelectorTable) stopTimes(builder *array.BooleanBuilder) *array.Int {
	stop := arrow.NewIntBuilder(t.alloc)
	stop.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {
		// The last window should stop at the end of
		// the time range.
		if int64(t.windowBounds.Stop()) > t.rangeStop {
			stop.Append(t.rangeStop)
		} else {
			stop.Append(int64(t.windowBounds.Stop()))
		}

		var v int64

		// MaxInt64 acts as a sentinel that lies outside every window once the
		// data is exhausted.
		if t.arr.Len() == 0 {
			v = math.MaxInt64
		} else {
			v = t.arr.Timestamps[t.idx]
		}

		// If the current timestamp falls within the
		// current window, append the value to the
		// builder, otherwise append a null value.
		if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// If the current array is non-empty and has
		// been read in its entirety, call Next().
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if stop.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return stop.NewIntArray()
}

// startStopTimes combines startTimes and stopTimes. It also builds a time
// column holding the point's timestamp for filled windows, or null for empty
// windows. The same one-point-per-window assumption applies.
func (t *booleanEmptyWindowSelectorTable) startStopTimes(builder *array.BooleanBuilder) (*array.Int, *array.Int, *array.Int) {
	start := arrow.NewIntBuilder(t.alloc)
	start.Resize(storage.MaxPointsPerBlock)

	stop := arrow.NewIntBuilder(t.alloc)
	stop.Resize(storage.MaxPointsPerBlock)

	time := arrow.NewIntBuilder(t.alloc)
	time.Resize(storage.MaxPointsPerBlock)

	for int64(t.windowBounds.Start()) < t.rangeStop {

		// The first window should start at the
		// beginning of the time range.
		if int64(t.windowBounds.Start()) < t.rangeStart {
			start.Append(t.rangeStart)
		} else {
			start.Append(int64(t.windowBounds.Start()))
		}

		// The last window should stop at the end of
		// the time range.
		if int64(t.windowBounds.Stop()) > t.rangeStop {
			stop.Append(t.rangeStop)
		} else {
			stop.Append(int64(t.windowBounds.Stop()))
		}

		var v int64

		// MaxInt64 acts as a sentinel that lies outside every window once the
		// data is exhausted.
		if t.arr.Len() == 0 {
			v = math.MaxInt64
		} else {
			v = t.arr.Timestamps[t.idx]
		}

		// If the current timestamp falls within the
		// current window, append the value to the
		// builder, otherwise append a null value.
		if int64(t.windowBounds.Start()) <= v && v < int64(t.windowBounds.Stop()) {
			time.Append(v)
			t.append(builder, t.arr.Values[t.idx])
			t.idx++
		} else {
			time.AppendNull()
			builder.AppendNull()
		}

		t.windowBounds = t.window.NextBounds(t.windowBounds)

		// If the current array is non-empty and has
		// been read in its entirety, call Next().
		if t.arr.Len() > 0 && t.idx == t.arr.Len() {
			t.arr = t.cur.Next()
			t.idx = 0
		}

		if time.Len() == storage.MaxPointsPerBlock {
			break
		}
	}
	return start.NewIntArray(), stop.NewIntArray(), time.NewIntArray()
}

// group table

// booleanGroupTable reads every series produced by a GroupCursor. gc yields
// the per-series cursors, and cur is the series currently being read.
// mu serializes Close, which releases both.
type booleanGroupTable struct {
	table
	mu  sync.Mutex
	gc  storage.GroupCursor
	cur cursors.BooleanArrayCursor
}

// newBooleanGroupTable builds a booleanGroupTable starting at cur, records
// the first series' tags and calls t.init with its advance method.
func newBooleanGroupTable(
	done chan struct{},
	gc storage.GroupCursor,
	cur cursors.BooleanArrayCursor,
	bounds execute.Bounds,
	key flux.GroupKey,
	cols []flux.ColMeta,
	tags models.Tags,
	defs [][]byte,
	cache *tagsCache,
	alloc *memory.Allocator,
) *booleanGroupTable {
	t := &booleanGroupTable{
		table: newTable(done, bounds, key, cols, defs, cache, alloc),
		gc:    gc,
		cur:   cur,
	}
	t.readTags(tags)
	t.init(t.advance)

	return t
}

// Close closes the series cursor and the group cursor under mu and drops both
// references. A second call is a no-op.
func (t *booleanGroupTable) Close() {
	t.mu.Lock()
	if t.cur != nil {
		t.cur.Close()
		t.cur = nil
	}
	if t.gc != nil {
		t.gc.Close()
		t.gc = nil
	}
	t.mu.Unlock()
}

// Do passes each buffer produced by advance to f via the embedded table's do.
func (t *booleanGroupTable) Do(f func(flux.ColReader) error) error {
	return t.do(f, t.advance)
}

// advance produces the next buffer.
//
// Without an aggregate, it emits the next non-empty array, moving to the next
// series whenever the current cursor runs dry.
//
// With an aggregate, it drains every remaining series in one call and emits a
// single row. The row has a _time column only when IsSelector reports the
// aggregate as a selector. The tags come from the series that supplied the
// selected point.
func (t *booleanGroupTable) advance() bool {
	if t.cur == nil {
		// For group aggregates, we will try to get all the series and all table buffers within those series
		// all at once and merge them into one row when this advance() function is first called.
		// At the end of this process, t.advanceCursor() already returns false and t.cur becomes nil.
		// But we still need to return true to indicate that there is data to be returned.
		// The second time when we call this advance(), t.cur is already nil, so we directly return false.
		return false
	}
	var arr *cursors.BooleanArray
	// len shadows the builtin within this function.
	var len int
	for {
		arr = t.cur.Next()
		len = arr.Len()
		if len > 0 {
			break
		}
		if !t.advanceCursor() {
			return false
		}
	}

	// handle the group without aggregate case
	if t.gc.Aggregate() == nil {
		// Retrieve the buffer for the data to avoid allocating
		// additional slices. If the buffer is still being used
		// because the references were retained, then we will
		// allocate a new buffer.
		colReader := t.allocateBuffer(len)
		colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc)
		colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values)
		t.appendTags(colReader)
		t.appendBounds(colReader)
		return true
	}

	aggregate, err := makeBooleanAggregateAccumulator(t.gc.Aggregate().Type)
	if err != nil {
		t.err = err
		return false
	}

	aggregate.AccumulateFirst(arr.Timestamps, arr.Values, t.tags)
	for {
		arr = t.cur.Next()
		if arr.Len() > 0 {
			aggregate.AccumulateMore(arr.Timestamps, arr.Values, t.tags)
			continue
		}

		if !t.advanceCursor() {
			break
		}
	}
	timestamp, value, tags := aggregate.Result()

	colReader := t.allocateBuffer(1)
	if IsSelector(t.gc.Aggregate()) {
		colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc)
		colReader.cols[valueColIdx] = t.toArrowBuffer([]bool{value})
	} else {
		colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]bool{value})
	}
	t.appendTheseTags(colReader, tags)
	t.appendBounds(colReader)
	return true
}

// BooleanAggregateAccumulator combines per-series boolean results within a
// group into a single timestamp, value and tag set.
type BooleanAggregateAccumulator interface {
	// AccumulateFirst receives an initial array of items to select from.
	// It selects an item and stores the state. Afterwards, more data can
	// be supplied with AccumulateMore and the results can be requested at
	// any time. Without a call to AccumulateFirst the results are not
	// defined.
	AccumulateFirst(timestamps []int64, values []bool, tags [][]byte)

	// AccumulateMore receives additional array elements to select from.
	AccumulateMore(timestamps []int64, values []bool, tags [][]byte)

	// Result returns the item selected from the data received so far.
	Result() (int64, bool, [][]byte)
}

// The selector method takes a ( timestamp, value ) pair, a
// ( []timestamp, []value ) pair, and a starting index. It applies the selector
// to the single value and the array, starting at the supplied index. It
// returns -1 if the single value is selected and a non-negative value if an
// item from the array is selected.
type booleanSelectorMethod func(int64, bool, []int64, []bool, int) int

// The selector accumulator tracks currently-selected item.
type booleanSelectorAccumulator struct {
	selector booleanSelectorMethod

	ts   int64
	v    bool
	tags [][]byte
}

// AccumulateFirst seeds the selection from timestamps[0]/values[0], lets the
// selector compare it with the rest of the array, and records a copy of tags.
// It indexes element 0, so the arrays must be non-empty.
func (a *booleanSelectorAccumulator) AccumulateFirst(timestamps []int64, values []bool, tags [][]byte) {
	index := a.selector(timestamps[0], values[0], timestamps, values, 1)
	if index < 0 {
		a.ts = timestamps[0]
		a.v = values[0]
	} else {
		a.ts = timestamps[index]
		a.v = values[index]
	}
	// The copy is shallow: the inner []byte slices are still shared with the
	// caller. Presumably they are not mutated in place afterwards — confirm.
	a.tags = make([][]byte, len(tags))
	copy(a.tags, tags)
}

// AccumulateMore replaces the current selection, and its tags, when the
// selector picks an element of the new array over it. The existing tag slice
// is reused when its capacity allows.
func (a *booleanSelectorAccumulator) AccumulateMore(timestamps []int64, values []bool, tags [][]byte) {
	index := a.selector(a.ts, a.v, timestamps, values, 0)
	if index >= 0 {
		a.ts = timestamps[index]
		a.v = values[index]

		if len(tags) > cap(a.tags) {
			a.tags = make([][]byte, len(tags))
		} else {
			a.tags = a.tags[:len(tags)]
		}
		copy(a.tags, tags)
	}
}

// Result returns the selected timestamp, value and tags.
func (a *booleanSelectorAccumulator) Result() (int64, bool, [][]byte) {
	return a.ts, a.v, a.tags
}

// makeBooleanAggregateAccumulator returns the interface implementation for
// aggregating returned points within the same group. The incoming points are
// the ones returned for each series and the struct returned here will
// aggregate the aggregates.
func makeBooleanAggregateAccumulator(agg datatypes.Aggregate_AggregateType) (BooleanAggregateAccumulator, error) { switch agg { case datatypes.Aggregate_AggregateTypeFirst: return &booleanSelectorAccumulator{selector: selectorFirstGroupsBoolean}, nil case datatypes.Aggregate_AggregateTypeLast: return &booleanSelectorAccumulator{selector: selectorLastGroupsBoolean}, nil case datatypes.Aggregate_AggregateTypeCount: return nil, &errors.Error{ Code: errors.EInvalid, Msg: "unsupported for aggregate count: Boolean", } case datatypes.Aggregate_AggregateTypeSum: return nil, &errors.Error{ Code: errors.EInvalid, Msg: "unsupported for aggregate sum: Boolean", } case datatypes.Aggregate_AggregateTypeMin: return nil, &errors.Error{ Code: errors.EInvalid, Msg: "unsupported for aggregate min: Boolean", } case datatypes.Aggregate_AggregateTypeMax: return nil, &errors.Error{ Code: errors.EInvalid, Msg: "unsupported for aggregate max: Boolean", } default: return nil, &errors.Error{ Code: errors.EInvalid, Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), } } } func selectorFirstGroupsBoolean(ts int64, v bool, timestamps []int64, values []bool, i int) int { index := -1 for ; i < len(values); i++ { if ts > timestamps[i] { index = i ts = timestamps[i] } } return index } func selectorLastGroupsBoolean(ts int64, v bool, timestamps []int64, values []bool, i int) int { index := -1 for ; i < len(values); i++ { if ts <= timestamps[i] { index = i ts = timestamps[i] } } return index } func (t *booleanGroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { cur := t.gc.Cursor() if cur == nil { continue } if typedCur, ok := cur.(cursors.BooleanArrayCursor); !ok { // TODO(sgc): error or skip? 
cur.Close() t.err = &errors.Error{ Code: errors.EInvalid, Err: &GroupCursorError{ typ: "boolean", cursor: cur, }, } return false } else { t.readTags(t.gc.Tags()) t.cur = typedCur return true } } return false } func (t *booleanGroupTable) Statistics() cursors.CursorStats { if t.cur == nil { return cursors.CursorStats{} } cs := t.cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } }