package storageflux import ( "fmt" "math" "sync" "github.com/apache/arrow/go/arrow/array" "github.com/influxdata/flux" "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/values" "github.com/influxdata/influxdb/v2" "github.com/influxdata/influxdb/v2/models" "github.com/influxdata/influxdb/v2/storage/reads/datatypes" storage "github.com/influxdata/influxdb/v2/storage/reads" "github.com/influxdata/influxdb/v2/tsdb/cursors" ) {{range .}} // // *********** {{.Name}} *********** // type {{.name}}Table struct { table mu sync.Mutex cur cursors.{{.Name}}ArrayCursor alloc *memory.Allocator } func new{{.Name}}Table( done chan struct{}, cur cursors.{{.Name}}ArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *{{.name}}Table { t := &{{.name}}Table{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, } t.readTags(tags) t.init(t.advance) return t } func (t *{{.name}}Table) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } t.mu.Unlock() } func (t *{{.name}}Table) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { return cursors.CursorStats{} } cs := cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } } func (t *{{.name}}Table) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *{{.name}}Table) advance() bool { a := t.cur.Next() l := a.Len() if l == 0 { return false } // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. cr := t.allocateBuffer(l) cr.cols[timeColIdx] = arrow.NewInt(a.Timestamps, t.alloc) cr.cols[valueColIdx] = t.toArrowBuffer(a.Values) t.appendTags(cr) t.appendBounds(cr) return true } // window table type {{.name}}WindowTable struct { {{.name}}Table arr *cursors.{{.Name}}Array nextTS int64 idxInArr int createEmpty bool timeColumn string window execute.Window {{if eq .Name "Integer"}}fillValue *{{.Type}}{{end}} } func new{{.Name}}WindowTable( done chan struct{}, cur cursors.{{.Name}}ArrayCursor, bounds execute.Bounds, window execute.Window, createEmpty bool, timeColumn string, {{if eq .Name "Integer"}}fillValue *{{.Type}},{{end}} key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *{{.name}}WindowTable { t := &{{.name}}WindowTable{ {{.name}}Table: {{.name}}Table{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, window: window, createEmpty: createEmpty, timeColumn: timeColumn, {{if eq .Name "Integer"}}fillValue: fillValue,{{end}} } if t.createEmpty { start := int64(bounds.Start) t.nextTS = int64(window.GetEarliestBounds(values.Time(start)).Stop) } t.readTags(tags) t.init(t.advance) return t } func (t *{{.name}}WindowTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } // createNextBufferTimes will read the timestamps from the array // cursor and construct the values for the next buffer. func (t *{{.name}}WindowTable) createNextBufferTimes() (start, stop *array.Int64, ok bool) { startB := arrow.NewIntBuilder(t.alloc) stopB := arrow.NewIntBuilder(t.alloc) if t.createEmpty { // There are no more windows when the start time is greater // than or equal to the stop time. subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) if startT := int64(values.Time(t.nextTS).Add(subEvery)); startT >= int64(t.bounds.Stop) { return nil, nil, false } // Create a buffer with the buffer size. // TODO(jsternberg): Calculate the exact size with max points as the maximum. startB.Resize(storage.MaxPointsPerBlock) stopB.Resize(storage.MaxPointsPerBlock) for ; ; t.nextTS = int64(values.Time(t.nextTS).Add(t.window.Every)) { startT, stopT := t.getWindowBoundsFor(t.nextTS) if startT >= int64(t.bounds.Stop) { break } startB.Append(startT) stopB.Append(stopT) } start = startB.NewInt64Array() stop = stopB.NewInt64Array() return start, stop, true } // Retrieve the next buffer so we can copy the timestamps. if !t.nextBuffer() { return nil, nil, false } // Copy over the timestamps from the next buffer and adjust // times for the boundaries. startB.Resize(len(t.arr.Timestamps)) stopB.Resize(len(t.arr.Timestamps)) for _, stopT := range t.arr.Timestamps { startT, stopT := t.getWindowBoundsFor(stopT) startB.Append(startT) stopB.Append(stopT) } start = startB.NewInt64Array() stop = stopB.NewInt64Array() return start, stop, true } func (t *{{.name}}WindowTable) getWindowBoundsFor(ts int64) (startT, stopT int64) { subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) startT, stopT = int64(values.Time(ts).Add(subEvery)), ts if startT < int64(t.bounds.Start) { startT = int64(t.bounds.Start) } if stopT > int64(t.bounds.Stop) { stopT = int64(t.bounds.Stop) } return startT, stopT } // nextAt will retrieve the next value that can be used with // the given stop timestamp. If no values can be used with the timestamp, // it will return the default value and false. func (t *{{.name}}WindowTable) nextAt(ts int64) (v {{.Type}}, ok bool) { if !t.nextBuffer() { return } else if !t.isInWindow(ts, t.arr.Timestamps[t.idxInArr]) { return } v, ok = t.arr.Values[t.idxInArr], true t.idxInArr++ return v, ok } // isInWindow will check if the given time at stop can be used within // the window stop time for ts. The ts may be a truncated stop time // because of a restricted boundary while stop will be the true // stop time returned by storage. func (t *{{.name}}WindowTable) isInWindow(ts int64, stop int64) bool { // This method checks if the stop time is a valid stop time for // that interval. This calculation is different from the calculation // of the window itself. For example, for a 10 second window that // starts at 20 seconds, we would include points between [20, 30). // The stop time for this interval would be 30, but because the stop // time can be truncated, valid stop times range from anywhere between // (20, 30]. The storage engine will always produce 30 as the end time // but we may have truncated the stop time because of the boundary // and this is why we are checking for this range instead of checking // if the two values are equal. subEvery := values.MakeDuration(t.window.Every.Nanoseconds(), t.window.Every.Months(), t.window.Every.IsPositive()) start := int64(values.Time(stop).Add(subEvery)) return start < ts && ts <= stop } // nextBuffer will ensure the array cursor is filled // and will return true if there is at least one value // that can be read from it. func (t *{{.name}}WindowTable) nextBuffer() bool { // Discard the current array cursor if we have // exceeded it. if t.arr != nil && t.idxInArr >= t.arr.Len() { t.arr = nil } // Retrieve the next array cursor if needed. if t.arr == nil { arr := t.cur.Next() if arr.Len() == 0 { return false } t.arr, t.idxInArr = arr, 0 } return true } // appendValues will scan the timestamps and append values // that match those timestamps from the buffer. func (t *{{.name}}WindowTable) appendValues(intervals []int64, appendValue func(v {{.Type}}), appendNull func()) { for i := 0; i < len(intervals); i++ { if v, ok := t.nextAt(intervals[i]); ok { appendValue(v) continue } appendNull() } } func (t *{{.name}}WindowTable) advance() bool { if !t.nextBuffer() { return false } // Create the timestamps for the next window. start, stop, ok := t.createNextBufferTimes() if !ok { return false } values := t.mergeValues(stop.Int64Values()) // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. cr := t.allocateBuffer(stop.Len()) if t.timeColumn != "" { switch t.timeColumn { case execute.DefaultStopColLabel: cr.cols[timeColIdx] = stop start.Release() case execute.DefaultStartColLabel: cr.cols[timeColIdx] = start stop.Release() } cr.cols[valueColIdx] = values t.appendBounds(cr) } else { cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[valueColIdxWithoutTime] = values } t.appendTags(cr) return true } // This table implementation will not have any empty windows. type {{.name}}WindowSelectorTable struct { {{.name}}Table timeColumn string window execute.Window } func new{{.Name}}WindowSelectorTable( done chan struct{}, cur cursors.{{.Name}}ArrayCursor, bounds execute.Bounds, window execute.Window, timeColumn string, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *{{.name}}WindowSelectorTable { t := &{{.name}}WindowSelectorTable{ {{.name}}Table: {{.name}}Table{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, window: window, timeColumn: timeColumn, } t.readTags(tags) t.init(t.advance) return t } func (t *{{.name}}WindowSelectorTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *{{.name}}WindowSelectorTable) advance() bool { arr := t.cur.Next() if arr.Len() == 0 { return false } cr := t.allocateBuffer(arr.Len()) switch t.timeColumn { case execute.DefaultStartColLabel: cr.cols[timeColIdx] = t.startTimes(arr) t.appendBounds(cr) case execute.DefaultStopColLabel: cr.cols[timeColIdx] = t.stopTimes(arr) t.appendBounds(cr) default: cr.cols[startColIdx] = t.startTimes(arr) cr.cols[stopColIdx] = t.stopTimes(arr) cr.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) } cr.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(cr) return true } func (t *{{.name}}WindowSelectorTable) startTimes(arr *cursors.{{.Name}}Array) *array.Int64 { start := arrow.NewIntBuilder(t.alloc) start.Resize(arr.Len()) rangeStart := int64(t.bounds.Start) for _, v := range arr.Timestamps { if windowStart := int64(t.window.GetEarliestBounds(values.Time(v)).Start); windowStart < rangeStart { start.Append(rangeStart) } else { start.Append(windowStart) } } return start.NewInt64Array() } func (t *{{.name}}WindowSelectorTable) stopTimes(arr *cursors.{{.Name}}Array) *array.Int64 { stop := arrow.NewIntBuilder(t.alloc) stop.Resize(arr.Len()) rangeStop := int64(t.bounds.Stop) for _, v := range arr.Timestamps { if windowStop := int64(t.window.GetEarliestBounds(values.Time(v)).Stop); windowStop > rangeStop { stop.Append(rangeStop) } else { stop.Append(windowStop) } } return stop.NewInt64Array() } // This table implementation may contain empty windows // in addition to non-empty windows. type {{.name}}EmptyWindowSelectorTable struct { {{.name}}Table arr *cursors.{{.Name}}Array idx int rangeStart int64 rangeStop int64 windowStart int64 windowStop int64 timeColumn string window execute.Window } func new{{.Name}}EmptyWindowSelectorTable( done chan struct{}, cur cursors.{{.Name}}ArrayCursor, bounds execute.Bounds, window execute.Window, timeColumn string, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *{{.name}}EmptyWindowSelectorTable { rangeStart := int64(bounds.Start) rangeStop := int64(bounds.Stop) t := &{{.name}}EmptyWindowSelectorTable{ {{.name}}Table: {{.name}}Table{ table: newTable(done, bounds, key, cols, defs, cache, alloc), cur: cur, }, arr: cur.Next(), rangeStart: rangeStart, rangeStop: rangeStop, windowStart: int64(window.GetEarliestBounds(values.Time(rangeStart)).Start), windowStop: int64(window.GetEarliestBounds(values.Time(rangeStart)).Stop), window: window, timeColumn: timeColumn, } t.readTags(tags) t.init(t.advance) return t } func (t *{{.name}}EmptyWindowSelectorTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *{{.name}}EmptyWindowSelectorTable) advance() bool { if t.arr.Len() == 0 { return false } values := t.arrowBuilder() values.Resize(storage.MaxPointsPerBlock) var cr *colReader switch t.timeColumn { case execute.DefaultStartColLabel: start := t.startTimes(values) cr = t.allocateBuffer(start.Len()) cr.cols[timeColIdx] = start t.appendBounds(cr) case execute.DefaultStopColLabel: stop := t.stopTimes(values) cr = t.allocateBuffer(stop.Len()) cr.cols[timeColIdx] = stop t.appendBounds(cr) default: start, stop, time := t.startStopTimes(values) cr = t.allocateBuffer(time.Len()) cr.cols[startColIdx] = start cr.cols[stopColIdx] = stop cr.cols[timeColIdx] = time } cr.cols[valueColIdx] = values.New{{.ArrowType}}Array() t.appendTags(cr) return true } func (t *{{.name}}EmptyWindowSelectorTable) startTimes(builder *array.{{.ArrowType}}Builder) *array.Int64 { start := arrow.NewIntBuilder(t.alloc) start.Resize(storage.MaxPointsPerBlock) for t.windowStart < t.rangeStop { // The first window should start at the // beginning of the time range. if t.windowStart < t.rangeStart { start.Append(t.rangeStart) } else { start.Append(t.windowStart) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. if t.windowStart <= v && v < t.windowStop { t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { builder.AppendNull() } t.windowStart = int64(values.Time(t.windowStart).Add(t.window.Every)) t.windowStop = int64(values.Time(t.windowStop).Add(t.window.Every)) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if start.Len() == storage.MaxPointsPerBlock { break } } return start.NewInt64Array() } func (t *{{.name}}EmptyWindowSelectorTable) stopTimes(builder *array.{{.ArrowType}}Builder) *array.Int64 { stop := arrow.NewIntBuilder(t.alloc) stop.Resize(storage.MaxPointsPerBlock) for t.windowStart < t.rangeStop { // The last window should stop at the end of // the time range. if t.windowStop > t.rangeStop { stop.Append(t.rangeStop) } else { stop.Append(t.windowStop) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. if t.windowStart <= v && v < t.windowStop { t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { builder.AppendNull() } t.windowStart = int64(values.Time(t.windowStart).Add(t.window.Every)) t.windowStop = int64(values.Time(t.windowStop).Add(t.window.Every)) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if stop.Len() == storage.MaxPointsPerBlock { break } } return stop.NewInt64Array() } func (t *{{.name}}EmptyWindowSelectorTable) startStopTimes(builder *array.{{.ArrowType}}Builder) (*array.Int64, *array.Int64, *array.Int64) { start := arrow.NewIntBuilder(t.alloc) start.Resize(storage.MaxPointsPerBlock) stop := arrow.NewIntBuilder(t.alloc) stop.Resize(storage.MaxPointsPerBlock) time := arrow.NewIntBuilder(t.alloc) time.Resize(storage.MaxPointsPerBlock) for t.windowStart < t.rangeStop { // The first window should start at the // beginning of the time range. if t.windowStart < t.rangeStart { start.Append(t.rangeStart) } else { start.Append(t.windowStart) } // The last window should stop at the end of // the time range. if t.windowStop > t.rangeStop { stop.Append(t.rangeStop) } else { stop.Append(t.windowStop) } var v int64 if t.arr.Len() == 0 { v = math.MaxInt64 } else { v = t.arr.Timestamps[t.idx] } // If the current timestamp falls within the // current window, append the value to the // builder, otherwise append a null value. if t.windowStart <= v && v < t.windowStop { time.Append(v) t.append(builder, t.arr.Values[t.idx]) t.idx++ } else { time.AppendNull() builder.AppendNull() } t.windowStart = int64(values.Time(t.windowStart).Add(t.window.Every)) t.windowStop = int64(values.Time(t.windowStop).Add(t.window.Every)) // If the current array is non-empty and has // been read in its entirety, call Next(). if t.arr.Len() > 0 && t.idx == t.arr.Len() { t.arr = t.cur.Next() t.idx = 0 } if time.Len() == storage.MaxPointsPerBlock { break } } return start.NewInt64Array(), stop.NewInt64Array(), time.NewInt64Array() } // group table type {{.name}}GroupTable struct { table mu sync.Mutex gc storage.GroupCursor cur cursors.{{.Name}}ArrayCursor } func new{{.Name}}GroupTable( done chan struct{}, gc storage.GroupCursor, cur cursors.{{.Name}}ArrayCursor, bounds execute.Bounds, key flux.GroupKey, cols []flux.ColMeta, tags models.Tags, defs [][]byte, cache *tagsCache, alloc *memory.Allocator, ) *{{.name}}GroupTable { t := &{{.name}}GroupTable{ table: newTable(done, bounds, key, cols, defs, cache, alloc), gc: gc, cur: cur, } t.readTags(tags) t.init(t.advance) return t } func (t *{{.name}}GroupTable) Close() { t.mu.Lock() if t.cur != nil { t.cur.Close() t.cur = nil } if t.gc != nil { t.gc.Close() t.gc = nil } t.mu.Unlock() } func (t *{{.name}}GroupTable) Do(f func(flux.ColReader) error) error { return t.do(f, t.advance) } func (t *{{.name}}GroupTable) advance() bool { if t.cur == nil { // For group aggregates, we will try to get all the series and all table buffers within those series // all at once and merge them into one row when this advance() function is first called. // At the end of this process, t.advanceCursor() already returns false and t.cur becomes nil. // But we still need to return true to indicate that there is data to be returned. // The second time when we call this advance(), t.cur is already nil, so we directly return false. return false } var arr *cursors.{{.Name}}Array var len int for { arr = t.cur.Next() len = arr.Len() if len > 0 { break } if !t.advanceCursor() { return false } } // handle the group without aggregate case if t.gc.Aggregate() == nil { // Retrieve the buffer for the data to avoid allocating // additional slices. If the buffer is still being used // because the references were retained, then we will // allocate a new buffer. colReader := t.allocateBuffer(len) colReader.cols[timeColIdx] = arrow.NewInt(arr.Timestamps, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer(arr.Values) t.appendTags(colReader) t.appendBounds(colReader) return true } aggregate, err := determine{{.Name}}AggregateMethod(t.gc.Aggregate().Type) if err != nil { t.err = err return false } ts, v := aggregate(arr.Timestamps, arr.Values) timestamps, values := []int64{ts}, []{{.Type}}{v} for { arr = t.cur.Next() if arr.Len() > 0 { ts, v := aggregate(arr.Timestamps, arr.Values) timestamps = append(timestamps, ts) values = append(values, v) continue } if !t.advanceCursor() { break } } timestamp, value := aggregate(timestamps, values) colReader := t.allocateBuffer(1) if IsSelector(t.gc.Aggregate()) { colReader.cols[timeColIdx] = arrow.NewInt([]int64{timestamp}, t.alloc) colReader.cols[valueColIdx] = t.toArrowBuffer([]{{.Type}}{value}) } else { colReader.cols[valueColIdxWithoutTime] = t.toArrowBuffer([]{{.Type}}{value}) } t.appendTags(colReader) t.appendBounds(colReader) return true } type {{.name}}AggregateMethod func([]int64, []{{.Type}}) (int64, {{.Type}}) // determine{{.Name}}AggregateMethod returns the method for aggregating // returned points within the same group. The incoming points are the // ones returned for each series and the method returned here will // aggregate the aggregates. func determine{{.Name}}AggregateMethod(agg datatypes.Aggregate_AggregateType) ({{.name}}AggregateMethod, error){ switch agg { case datatypes.AggregateTypeFirst: return aggregateFirstGroups{{.Name}}, nil case datatypes.AggregateTypeLast: return aggregateLastGroups{{.Name}}, nil case datatypes.AggregateTypeCount: {{if eq .Name "Integer"}} return aggregateCountGroups{{.Name}}, nil {{else}} return nil, &influxdb.Error { Code: influxdb.EInvalid, Msg: "unsupported for aggregate count: {{.Name}}", } {{end}} case datatypes.AggregateTypeSum: {{if and (ne .Name "Boolean") (ne .Name "String")}} return aggregateSumGroups{{.Name}}, nil {{else}} return nil, &influxdb.Error { Code: influxdb.EInvalid, Msg: "unsupported for aggregate sum: {{.Name}}", } {{end}} case datatypes.AggregateTypeMin: {{if and (ne .Name "Boolean") (ne .Name "String")}} return aggregateMinGroups{{.Name}}, nil {{else}} return nil, &influxdb.Error { Code: influxdb.EInvalid, Msg: "unsupported for aggregate min: {{.Name}}", } {{end}} case datatypes.AggregateTypeMax: {{if and (ne .Name "Boolean") (ne .Name "String")}} return aggregateMaxGroups{{.Name}}, nil {{else}} return nil, &influxdb.Error { Code: influxdb.EInvalid, Msg: "unsupported for aggregate max: {{.Name}}", } {{end}} default: return nil, &influxdb.Error { Code: influxdb.EInvalid, Msg: fmt.Sprintf("unknown/unimplemented aggregate type: %v", agg), } } } {{if and (ne .Name "Boolean") (ne .Name "String")}} func aggregateMinGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { value := values[0] timestamp := timestamps[0] for i := 1; i < len(values); i++ { if value > values[i] { value = values[i] timestamp = timestamps[i] } } return timestamp, value } {{end}} {{if and (ne .Name "Boolean") (ne .Name "String")}} func aggregateMaxGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { value := values[0] timestamp := timestamps[0] for i := 1; i < len(values); i++ { if value < values[i] { value = values[i] timestamp = timestamps[i] } } return timestamp, value } {{end}} // For group count and sum, the timestamp here is always math.MaxInt64. // their final result does not contain _time, so this timestamp value can be anything // and it won't matter. {{if eq .Name "Integer"}} func aggregateCountGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { return aggregateSumGroups{{.Name}}(timestamps, values) } {{end}} {{if and (ne .Name "Boolean") (ne .Name "String")}} func aggregateSumGroups{{.Name}}(_ []int64, values []{{.Type}}) (int64, {{.Type}}) { var sum {{.Type}} for _, v := range values { sum += v } return math.MaxInt64, sum } {{end}} func aggregateFirstGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { value := values[0] timestamp := timestamps[0] for i := 1; i < len(values); i++ { if timestamp > timestamps[i] { value = values[i] timestamp = timestamps[i] } } return timestamp, value } func aggregateLastGroups{{.Name}}(timestamps []int64, values []{{.Type}}) (int64, {{.Type}}) { value := values[0] timestamp := timestamps[0] for i := 1; i < len(values); i++ { if timestamp < timestamps[i] { value = values[i] timestamp = timestamps[i] } } return timestamp, value } func (t *{{.name}}GroupTable) advanceCursor() bool { t.cur.Close() t.cur = nil for t.gc.Next() { cur := t.gc.Cursor() if cur == nil { continue } if typedCur, ok := cur.(cursors.{{.Name}}ArrayCursor); !ok { // TODO(sgc): error or skip? cur.Close() t.err = &influxdb.Error { Code: influxdb.EInvalid, Err: &GroupCursorError { typ: "{{.name}}", cursor: cur, }, } return false } else { t.readTags(t.gc.Tags()) t.cur = typedCur return true } } return false } func (t *{{.name}}GroupTable) Statistics() cursors.CursorStats { if t.cur == nil { return cursors.CursorStats{} } cs := t.cur.Stats() return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } } {{end}}