989 lines
28 KiB
Go
989 lines
28 KiB
Go
package storageflux
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/gogo/protobuf/types"
|
|
"github.com/influxdata/flux"
|
|
"github.com/influxdata/flux/execute"
|
|
"github.com/influxdata/flux/memory"
|
|
"github.com/influxdata/flux/plan"
|
|
"github.com/influxdata/flux/values"
|
|
"github.com/influxdata/influxdb/v2"
|
|
"github.com/influxdata/influxdb/v2/models"
|
|
"github.com/influxdata/influxdb/v2/query"
|
|
storage "github.com/influxdata/influxdb/v2/storage/reads"
|
|
"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
|
|
"github.com/influxdata/influxdb/v2/tsdb/cursors"
|
|
)
|
|
|
|
// GroupCursorError is returned when two different cursor types
|
|
// are read for the same table.
|
|
type GroupCursorError struct {
|
|
typ string
|
|
cursor cursors.Cursor
|
|
}
|
|
|
|
func (err *GroupCursorError) Error() string {
|
|
var got string
|
|
switch err.cursor.(type) {
|
|
case cursors.FloatArrayCursor:
|
|
got = "float"
|
|
case cursors.IntegerArrayCursor:
|
|
got = "integer"
|
|
case cursors.UnsignedArrayCursor:
|
|
got = "unsigned"
|
|
case cursors.StringArrayCursor:
|
|
got = "string"
|
|
case cursors.BooleanArrayCursor:
|
|
got = "boolean"
|
|
default:
|
|
got = "invalid"
|
|
}
|
|
return fmt.Sprintf("schema collision: cannot group %s and %s types together", err.typ, got)
|
|
}
|
|
|
|
type storageTable interface {
|
|
flux.Table
|
|
Close()
|
|
Cancel()
|
|
Statistics() cursors.CursorStats
|
|
}
|
|
|
|
type storeReader struct {
|
|
s storage.Store
|
|
}
|
|
|
|
// NewReader returns a new storageflux reader
|
|
func NewReader(s storage.Store) query.StorageReader {
|
|
return &storeReader{s: s}
|
|
}
|
|
|
|
func (r *storeReader) ReadFilter(ctx context.Context, spec query.ReadFilterSpec, alloc *memory.Allocator) (query.TableIterator, error) {
|
|
return &filterIterator{
|
|
ctx: ctx,
|
|
s: r.s,
|
|
spec: spec,
|
|
cache: newTagsCache(0),
|
|
alloc: alloc,
|
|
}, nil
|
|
}
|
|
|
|
func (r *storeReader) ReadGroup(ctx context.Context, spec query.ReadGroupSpec, alloc *memory.Allocator) (query.TableIterator, error) {
|
|
return &groupIterator{
|
|
ctx: ctx,
|
|
s: r.s,
|
|
spec: spec,
|
|
cache: newTagsCache(0),
|
|
alloc: alloc,
|
|
}, nil
|
|
}
|
|
|
|
func (r *storeReader) ReadWindowAggregate(ctx context.Context, spec query.ReadWindowAggregateSpec, alloc *memory.Allocator) (query.TableIterator, error) {
|
|
return &windowAggregateIterator{
|
|
ctx: ctx,
|
|
s: r.s,
|
|
spec: spec,
|
|
cache: newTagsCache(0),
|
|
alloc: alloc,
|
|
}, nil
|
|
}
|
|
|
|
func (r *storeReader) ReadTagKeys(ctx context.Context, spec query.ReadTagKeysSpec, alloc *memory.Allocator) (query.TableIterator, error) {
|
|
return &tagKeysIterator{
|
|
ctx: ctx,
|
|
bounds: spec.Bounds,
|
|
s: r.s,
|
|
readSpec: spec,
|
|
predicate: spec.Predicate,
|
|
alloc: alloc,
|
|
}, nil
|
|
}
|
|
|
|
func (r *storeReader) ReadTagValues(ctx context.Context, spec query.ReadTagValuesSpec, alloc *memory.Allocator) (query.TableIterator, error) {
|
|
return &tagValuesIterator{
|
|
ctx: ctx,
|
|
bounds: spec.Bounds,
|
|
s: r.s,
|
|
readSpec: spec,
|
|
predicate: spec.Predicate,
|
|
alloc: alloc,
|
|
}, nil
|
|
}
|
|
|
|
func (r *storeReader) Close() {}
|
|
|
|
type filterIterator struct {
|
|
ctx context.Context
|
|
s storage.Store
|
|
spec query.ReadFilterSpec
|
|
stats cursors.CursorStats
|
|
cache *tagsCache
|
|
alloc *memory.Allocator
|
|
}
|
|
|
|
func (fi *filterIterator) Statistics() cursors.CursorStats { return fi.stats }
|
|
|
|
func (fi *filterIterator) Do(f func(flux.Table) error) error {
|
|
src := fi.s.GetSource(
|
|
uint64(fi.spec.OrganizationID),
|
|
uint64(fi.spec.BucketID),
|
|
)
|
|
|
|
// Setup read request
|
|
any, err := types.MarshalAny(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var req datatypes.ReadFilterRequest
|
|
req.ReadSource = any
|
|
req.Predicate = fi.spec.Predicate
|
|
req.Range.Start = int64(fi.spec.Bounds.Start)
|
|
req.Range.End = int64(fi.spec.Bounds.Stop)
|
|
|
|
rs, err := fi.s.ReadFilter(fi.ctx, &req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if rs == nil {
|
|
return nil
|
|
}
|
|
|
|
return fi.handleRead(f, rs)
|
|
}
|
|
|
|
func (fi *filterIterator) handleRead(f func(flux.Table) error, rs storage.ResultSet) error {
|
|
// these resources must be closed if not nil on return
|
|
var (
|
|
cur cursors.Cursor
|
|
table storageTable
|
|
)
|
|
|
|
defer func() {
|
|
if table != nil {
|
|
table.Close()
|
|
}
|
|
if cur != nil {
|
|
cur.Close()
|
|
}
|
|
rs.Close()
|
|
fi.cache.Release()
|
|
}()
|
|
|
|
READ:
|
|
for rs.Next() {
|
|
cur = rs.Cursor()
|
|
if cur == nil {
|
|
// no data for series key + field combination
|
|
continue
|
|
}
|
|
|
|
bnds := fi.spec.Bounds
|
|
key := defaultGroupKeyForSeries(rs.Tags(), bnds)
|
|
done := make(chan struct{})
|
|
switch typedCur := cur.(type) {
|
|
case cursors.IntegerArrayCursor:
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
|
|
table = newIntegerTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
|
|
case cursors.FloatArrayCursor:
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
|
|
table = newFloatTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
|
|
case cursors.UnsignedArrayCursor:
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
|
|
table = newUnsignedTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
|
|
case cursors.BooleanArrayCursor:
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
|
|
table = newBooleanTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
|
|
case cursors.StringArrayCursor:
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
|
|
table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.cache, fi.alloc)
|
|
default:
|
|
panic(fmt.Sprintf("unreachable: %T", typedCur))
|
|
}
|
|
|
|
cur = nil
|
|
|
|
if !table.Empty() {
|
|
if err := f(table); err != nil {
|
|
table.Close()
|
|
table = nil
|
|
return err
|
|
}
|
|
select {
|
|
case <-done:
|
|
case <-fi.ctx.Done():
|
|
table.Cancel()
|
|
break READ
|
|
}
|
|
}
|
|
|
|
stats := table.Statistics()
|
|
fi.stats.ScannedValues += stats.ScannedValues
|
|
fi.stats.ScannedBytes += stats.ScannedBytes
|
|
table.Close()
|
|
table = nil
|
|
}
|
|
return rs.Err()
|
|
}
|
|
|
|
type groupIterator struct {
|
|
ctx context.Context
|
|
s storage.Store
|
|
spec query.ReadGroupSpec
|
|
stats cursors.CursorStats
|
|
cache *tagsCache
|
|
alloc *memory.Allocator
|
|
}
|
|
|
|
func (gi *groupIterator) Statistics() cursors.CursorStats { return gi.stats }
|
|
|
|
func (gi *groupIterator) Do(f func(flux.Table) error) error {
|
|
src := gi.s.GetSource(
|
|
uint64(gi.spec.OrganizationID),
|
|
uint64(gi.spec.BucketID),
|
|
)
|
|
|
|
// Setup read request
|
|
any, err := types.MarshalAny(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var req datatypes.ReadGroupRequest
|
|
req.ReadSource = any
|
|
req.Predicate = gi.spec.Predicate
|
|
req.Range.Start = int64(gi.spec.Bounds.Start)
|
|
req.Range.End = int64(gi.spec.Bounds.Stop)
|
|
|
|
if len(gi.spec.GroupKeys) > 0 && gi.spec.GroupMode == query.GroupModeNone {
|
|
return &influxdb.Error{
|
|
Code: influxdb.EInternal,
|
|
Msg: "cannot have group mode none with group key values",
|
|
}
|
|
}
|
|
req.Group = convertGroupMode(gi.spec.GroupMode)
|
|
req.GroupKeys = gi.spec.GroupKeys
|
|
|
|
if agg, err := determineAggregateMethod(gi.spec.AggregateMethod); err != nil {
|
|
return err
|
|
} else if agg != datatypes.AggregateTypeNone {
|
|
req.Aggregate = &datatypes.Aggregate{Type: agg}
|
|
}
|
|
|
|
rs, err := gi.s.ReadGroup(gi.ctx, &req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if rs == nil {
|
|
return nil
|
|
}
|
|
return gi.handleRead(f, rs)
|
|
}
|
|
|
|
func (gi *groupIterator) handleRead(f func(flux.Table) error, rs storage.GroupResultSet) error {
|
|
// these resources must be closed if not nil on return
|
|
var (
|
|
gc storage.GroupCursor
|
|
cur cursors.Cursor
|
|
table storageTable
|
|
)
|
|
|
|
defer func() {
|
|
if table != nil {
|
|
table.Close()
|
|
}
|
|
if cur != nil {
|
|
cur.Close()
|
|
}
|
|
if gc != nil {
|
|
gc.Close()
|
|
}
|
|
rs.Close()
|
|
gi.cache.Release()
|
|
}()
|
|
|
|
gc = rs.Next()
|
|
READ:
|
|
for gc != nil {
|
|
for gc.Next() {
|
|
cur = gc.Cursor()
|
|
if cur != nil {
|
|
break
|
|
}
|
|
}
|
|
|
|
if cur == nil {
|
|
gc.Close()
|
|
gc = rs.Next()
|
|
continue
|
|
}
|
|
|
|
bnds := gi.spec.Bounds
|
|
key := groupKeyForGroup(gc.PartitionKeyVals(), &gi.spec, bnds)
|
|
done := make(chan struct{})
|
|
switch typedCur := cur.(type) {
|
|
case cursors.IntegerArrayCursor:
|
|
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TInt, gc.Aggregate(), key)
|
|
table = newIntegerGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
|
|
case cursors.FloatArrayCursor:
|
|
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TFloat, gc.Aggregate(), key)
|
|
table = newFloatGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
|
|
case cursors.UnsignedArrayCursor:
|
|
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TUInt, gc.Aggregate(), key)
|
|
table = newUnsignedGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
|
|
case cursors.BooleanArrayCursor:
|
|
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TBool, gc.Aggregate(), key)
|
|
table = newBooleanGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
|
|
case cursors.StringArrayCursor:
|
|
cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString, gc.Aggregate(), key)
|
|
table = newStringGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.cache, gi.alloc)
|
|
default:
|
|
panic(fmt.Sprintf("unreachable: %T", typedCur))
|
|
}
|
|
|
|
// table owns these resources and is responsible for closing them
|
|
cur = nil
|
|
gc = nil
|
|
|
|
if err := f(table); err != nil {
|
|
table.Close()
|
|
table = nil
|
|
return err
|
|
}
|
|
select {
|
|
case <-done:
|
|
case <-gi.ctx.Done():
|
|
table.Cancel()
|
|
break READ
|
|
}
|
|
|
|
stats := table.Statistics()
|
|
gi.stats.ScannedValues += stats.ScannedValues
|
|
gi.stats.ScannedBytes += stats.ScannedBytes
|
|
table.Close()
|
|
table = nil
|
|
|
|
gc = rs.Next()
|
|
}
|
|
return rs.Err()
|
|
}
|
|
|
|
func determineAggregateMethod(agg string) (datatypes.Aggregate_AggregateType, error) {
|
|
if agg == "" {
|
|
return datatypes.AggregateTypeNone, nil
|
|
}
|
|
|
|
if t, ok := datatypes.Aggregate_AggregateType_value[strings.ToUpper(agg)]; ok {
|
|
return datatypes.Aggregate_AggregateType(t), nil
|
|
}
|
|
return 0, fmt.Errorf("unknown aggregate type %q", agg)
|
|
}
|
|
|
|
func convertGroupMode(m query.GroupMode) datatypes.ReadGroupRequest_Group {
|
|
switch m {
|
|
case query.GroupModeNone:
|
|
return datatypes.GroupNone
|
|
case query.GroupModeBy:
|
|
return datatypes.GroupBy
|
|
}
|
|
panic(fmt.Sprint("invalid group mode: ", m))
|
|
}
|
|
|
|
const (
|
|
startColIdx = 0
|
|
stopColIdx = 1
|
|
timeColIdx = 2
|
|
valueColIdxWithoutTime = 2
|
|
valueColIdx = 3
|
|
)
|
|
|
|
func determineTableColsForWindowAggregate(tags models.Tags, typ flux.ColType, hasTimeCol bool) ([]flux.ColMeta, [][]byte) {
|
|
var cols []flux.ColMeta
|
|
var defs [][]byte
|
|
|
|
// aggregates remove the _time column
|
|
size := 3
|
|
if hasTimeCol {
|
|
size++
|
|
}
|
|
cols = make([]flux.ColMeta, size+len(tags))
|
|
defs = make([][]byte, size+len(tags))
|
|
cols[startColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStartColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
cols[stopColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStopColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
if hasTimeCol {
|
|
cols[timeColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultTimeColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
cols[valueColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultValueColLabel,
|
|
Type: typ,
|
|
}
|
|
} else {
|
|
cols[valueColIdxWithoutTime] = flux.ColMeta{
|
|
Label: execute.DefaultValueColLabel,
|
|
Type: typ,
|
|
}
|
|
}
|
|
for j, tag := range tags {
|
|
cols[size+j] = flux.ColMeta{
|
|
Label: string(tag.Key),
|
|
Type: flux.TString,
|
|
}
|
|
defs[size+j] = []byte("")
|
|
}
|
|
return cols, defs
|
|
}
|
|
|
|
func determineTableColsForSeries(tags models.Tags, typ flux.ColType) ([]flux.ColMeta, [][]byte) {
|
|
cols := make([]flux.ColMeta, 4+len(tags))
|
|
defs := make([][]byte, 4+len(tags))
|
|
cols[startColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStartColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
cols[stopColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStopColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
cols[timeColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultTimeColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
cols[valueColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultValueColLabel,
|
|
Type: typ,
|
|
}
|
|
for j, tag := range tags {
|
|
cols[4+j] = flux.ColMeta{
|
|
Label: string(tag.Key),
|
|
Type: flux.TString,
|
|
}
|
|
defs[4+j] = []byte("")
|
|
}
|
|
return cols, defs
|
|
}
|
|
|
|
func defaultGroupKeyForSeries(tags models.Tags, bnds execute.Bounds) flux.GroupKey {
|
|
cols := make([]flux.ColMeta, 2, len(tags)+2)
|
|
vs := make([]values.Value, 2, len(tags)+2)
|
|
cols[startColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStartColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
vs[startColIdx] = values.NewTime(bnds.Start)
|
|
cols[stopColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStopColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
vs[stopColIdx] = values.NewTime(bnds.Stop)
|
|
for i := range tags {
|
|
cols = append(cols, flux.ColMeta{
|
|
Label: string(tags[i].Key),
|
|
Type: flux.TString,
|
|
})
|
|
vs = append(vs, values.NewString(string(tags[i].Value)))
|
|
}
|
|
return execute.NewGroupKey(cols, vs)
|
|
}
|
|
|
|
func IsSelector(agg *datatypes.Aggregate) bool {
|
|
if agg == nil {
|
|
return false
|
|
}
|
|
return agg.Type == datatypes.AggregateTypeMin || agg.Type == datatypes.AggregateTypeMax ||
|
|
agg.Type == datatypes.AggregateTypeFirst || agg.Type == datatypes.AggregateTypeLast
|
|
}
|
|
|
|
func determineTableColsForGroup(tagKeys [][]byte, typ flux.ColType, agg *datatypes.Aggregate, groupKey flux.GroupKey) ([]flux.ColMeta, [][]byte) {
|
|
var colSize int
|
|
if agg == nil || IsSelector(agg) {
|
|
// The group without aggregate or with selector (min, max, first, last) case:
|
|
// _start, _stop, _time, _value + tags
|
|
colSize += 4 + len(tagKeys)
|
|
} else {
|
|
// The group aggregate case:
|
|
// Only the group keys + _value are needed.
|
|
// Note that `groupKey` will contain _start, _stop, plus any group columns specified.
|
|
// _start and _stop will always be in the first two slots, see: groupKeyForGroup()
|
|
// For the group aggregate case the output does not contain a _time column.
|
|
|
|
// Also note that if in the future we will add support for mean, then it should also fall onto this branch.
|
|
|
|
colSize = len(groupKey.Cols()) + 1
|
|
}
|
|
|
|
cols := make([]flux.ColMeta, colSize)
|
|
defs := make([][]byte, colSize)
|
|
// No matter this has aggregate, selector, or neither, the first two columns are always _start and _stop
|
|
cols[startColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStartColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
cols[stopColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStopColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
|
|
if agg == nil || IsSelector(agg) {
|
|
// For the group without aggregate or with selector case:
|
|
cols[timeColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultTimeColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
cols[valueColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultValueColLabel,
|
|
Type: typ,
|
|
}
|
|
for j, tag := range tagKeys {
|
|
cols[4+j] = flux.ColMeta{
|
|
Label: string(tag),
|
|
Type: flux.TString,
|
|
}
|
|
defs[4+j] = []byte("")
|
|
}
|
|
} else {
|
|
// Aggregate has no _time
|
|
cols[valueColIdxWithoutTime] = flux.ColMeta{
|
|
Label: execute.DefaultValueColLabel,
|
|
Type: typ,
|
|
}
|
|
// From now on, only include group keys that are not _start and _stop.
|
|
// which are already included as the first two columns
|
|
// This highly depends on the implementation of groupKeyForGroup() which
|
|
// put _start and _stop into the first two slots.
|
|
for j := 2; j < len(groupKey.Cols()); j++ {
|
|
// the starting columns index for other group key columns is 3 (1+j)
|
|
cols[1+j] = flux.ColMeta{
|
|
Label: groupKey.Cols()[j].Label,
|
|
Type: groupKey.Cols()[j].Type,
|
|
}
|
|
defs[1+j] = []byte("")
|
|
}
|
|
}
|
|
return cols, defs
|
|
}
|
|
|
|
func groupKeyForGroup(kv [][]byte, spec *query.ReadGroupSpec, bnds execute.Bounds) flux.GroupKey {
|
|
cols := make([]flux.ColMeta, 2, len(spec.GroupKeys)+2)
|
|
vs := make([]values.Value, 2, len(spec.GroupKeys)+2)
|
|
cols[startColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStartColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
vs[startColIdx] = values.NewTime(bnds.Start)
|
|
cols[stopColIdx] = flux.ColMeta{
|
|
Label: execute.DefaultStopColLabel,
|
|
Type: flux.TTime,
|
|
}
|
|
vs[stopColIdx] = values.NewTime(bnds.Stop)
|
|
for i := range spec.GroupKeys {
|
|
if spec.GroupKeys[i] == execute.DefaultStartColLabel || spec.GroupKeys[i] == execute.DefaultStopColLabel {
|
|
continue
|
|
}
|
|
cols = append(cols, flux.ColMeta{
|
|
Label: spec.GroupKeys[i],
|
|
Type: flux.TString,
|
|
})
|
|
vs = append(vs, values.NewString(string(kv[i])))
|
|
}
|
|
return execute.NewGroupKey(cols, vs)
|
|
}
|
|
|
|
type windowAggregateIterator struct {
|
|
ctx context.Context
|
|
s storage.Store
|
|
spec query.ReadWindowAggregateSpec
|
|
stats cursors.CursorStats
|
|
cache *tagsCache
|
|
alloc *memory.Allocator
|
|
}
|
|
|
|
func (wai *windowAggregateIterator) Statistics() cursors.CursorStats { return wai.stats }
|
|
|
|
func (wai *windowAggregateIterator) Do(f func(flux.Table) error) error {
|
|
src := wai.s.GetSource(
|
|
uint64(wai.spec.OrganizationID),
|
|
uint64(wai.spec.BucketID),
|
|
)
|
|
|
|
// Setup read request
|
|
any, err := types.MarshalAny(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var req datatypes.ReadWindowAggregateRequest
|
|
req.ReadSource = any
|
|
req.Predicate = wai.spec.Predicate
|
|
req.Range.Start = int64(wai.spec.Bounds.Start)
|
|
req.Range.End = int64(wai.spec.Bounds.Stop)
|
|
|
|
req.Window = &datatypes.Window{
|
|
Every: &datatypes.Duration{
|
|
Nsecs: wai.spec.Window.Every.Nanoseconds(),
|
|
Months: wai.spec.Window.Every.Months(),
|
|
Negative: wai.spec.Window.Every.IsNegative(),
|
|
},
|
|
Offset: &datatypes.Duration{
|
|
Nsecs: wai.spec.Window.Offset.Nanoseconds(),
|
|
Months: wai.spec.Window.Offset.Months(),
|
|
Negative: wai.spec.Window.Offset.IsNegative(),
|
|
},
|
|
}
|
|
|
|
req.Aggregate = make([]*datatypes.Aggregate, len(wai.spec.Aggregates))
|
|
|
|
for i, aggKind := range wai.spec.Aggregates {
|
|
if agg, err := determineAggregateMethod(string(aggKind)); err != nil {
|
|
return err
|
|
} else if agg != datatypes.AggregateTypeNone {
|
|
req.Aggregate[i] = &datatypes.Aggregate{Type: agg}
|
|
}
|
|
}
|
|
|
|
rs, err := wai.s.WindowAggregate(wai.ctx, &req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if rs == nil {
|
|
return nil
|
|
}
|
|
return wai.handleRead(f, rs)
|
|
}
|
|
|
|
const (
|
|
CountKind = "count"
|
|
SumKind = "sum"
|
|
FirstKind = "first"
|
|
LastKind = "last"
|
|
MinKind = "min"
|
|
MaxKind = "max"
|
|
MeanKind = "mean"
|
|
)
|
|
|
|
// isSelector returns true if given a procedure kind that represents a selector operator.
|
|
func isSelector(kind plan.ProcedureKind) bool {
|
|
return kind == FirstKind || kind == LastKind || kind == MinKind || kind == MaxKind
|
|
}
|
|
|
|
func (wai *windowAggregateIterator) handleRead(f func(flux.Table) error, rs storage.ResultSet) error {
|
|
createEmpty := wai.spec.CreateEmpty
|
|
|
|
selector := len(wai.spec.Aggregates) > 0 && isSelector(wai.spec.Aggregates[0])
|
|
|
|
timeColumn := wai.spec.TimeColumn
|
|
if timeColumn == "" {
|
|
tableFn := f
|
|
f = func(table flux.Table) error {
|
|
return splitWindows(wai.ctx, wai.alloc, table, selector, tableFn)
|
|
}
|
|
}
|
|
|
|
// these resources must be closed if not nil on return
|
|
var (
|
|
cur cursors.Cursor
|
|
table storageTable
|
|
)
|
|
|
|
defer func() {
|
|
if table != nil {
|
|
table.Close()
|
|
}
|
|
if cur != nil {
|
|
cur.Close()
|
|
}
|
|
rs.Close()
|
|
wai.cache.Release()
|
|
}()
|
|
|
|
READ:
|
|
for rs.Next() {
|
|
cur = rs.Cursor()
|
|
if cur == nil {
|
|
// no data for series key + field combination
|
|
continue
|
|
}
|
|
|
|
bnds := wai.spec.Bounds
|
|
key := defaultGroupKeyForSeries(rs.Tags(), bnds)
|
|
done := make(chan struct{})
|
|
hasTimeCol := timeColumn != ""
|
|
switch typedCur := cur.(type) {
|
|
case cursors.IntegerArrayCursor:
|
|
if !selector {
|
|
var fillValue *int64
|
|
if isAggregateCount(wai.spec.Aggregates[0]) {
|
|
fillValue = func(v int64) *int64 { return &v }(0)
|
|
}
|
|
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TInt, hasTimeCol)
|
|
table = newIntegerWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, fillValue, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else if createEmpty && !hasTimeCol {
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
|
|
table = newIntegerEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else {
|
|
// Note hasTimeCol == true means that aggregateWindow() was called.
|
|
// Because aggregateWindow() ultimately removes empty tables we
|
|
// don't bother creating them here.
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
|
|
table = newIntegerWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
}
|
|
case cursors.FloatArrayCursor:
|
|
if !selector {
|
|
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TFloat, hasTimeCol)
|
|
table = newFloatWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else if createEmpty && !hasTimeCol {
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
|
|
table = newFloatEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else {
|
|
// Note hasTimeCol == true means that aggregateWindow() was called.
|
|
// Because aggregateWindow() ultimately removes empty tables we
|
|
// don't bother creating them here.
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
|
|
table = newFloatWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
}
|
|
case cursors.UnsignedArrayCursor:
|
|
if !selector {
|
|
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TUInt, hasTimeCol)
|
|
table = newUnsignedWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else if createEmpty && !hasTimeCol {
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
|
|
table = newUnsignedEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else {
|
|
// Note hasTimeCol == true means that aggregateWindow() was called.
|
|
// Because aggregateWindow() ultimately removes empty tables we
|
|
// don't bother creating them here.
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
|
|
table = newUnsignedWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
}
|
|
case cursors.BooleanArrayCursor:
|
|
if !selector {
|
|
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TBool, hasTimeCol)
|
|
table = newBooleanWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else if createEmpty && !hasTimeCol {
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
|
|
table = newBooleanEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else {
|
|
// Note hasTimeCol == true means that aggregateWindow() was called.
|
|
// Because aggregateWindow() ultimately removes empty tables we
|
|
// don't bother creating them here.
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
|
|
table = newBooleanWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
}
|
|
case cursors.StringArrayCursor:
|
|
if !selector {
|
|
cols, defs := determineTableColsForWindowAggregate(rs.Tags(), flux.TString, hasTimeCol)
|
|
table = newStringWindowTable(done, typedCur, bnds, wai.spec.Window, createEmpty, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else if createEmpty && !hasTimeCol {
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
|
|
table = newStringEmptyWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
} else {
|
|
// Note hasTimeCol == true means that aggregateWindow() was called.
|
|
// Because aggregateWindow() ultimately removes empty tables we
|
|
// don't bother creating them here.
|
|
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
|
|
table = newStringWindowSelectorTable(done, typedCur, bnds, wai.spec.Window, timeColumn, key, cols, rs.Tags(), defs, wai.cache, wai.alloc)
|
|
}
|
|
default:
|
|
panic(fmt.Sprintf("unreachable: %T", typedCur))
|
|
}
|
|
|
|
cur = nil
|
|
|
|
if !table.Empty() {
|
|
if err := f(table); err != nil {
|
|
table.Close()
|
|
table = nil
|
|
return err
|
|
}
|
|
select {
|
|
case <-done:
|
|
case <-wai.ctx.Done():
|
|
table.Cancel()
|
|
break READ
|
|
}
|
|
}
|
|
|
|
stats := table.Statistics()
|
|
wai.stats.ScannedValues += stats.ScannedValues
|
|
wai.stats.ScannedBytes += stats.ScannedBytes
|
|
table.Close()
|
|
table = nil
|
|
}
|
|
return rs.Err()
|
|
}
|
|
|
|
func isAggregateCount(kind plan.ProcedureKind) bool {
|
|
return kind == CountKind
|
|
}
|
|
|
|
type tagKeysIterator struct {
|
|
ctx context.Context
|
|
bounds execute.Bounds
|
|
s storage.Store
|
|
readSpec query.ReadTagKeysSpec
|
|
predicate *datatypes.Predicate
|
|
alloc *memory.Allocator
|
|
}
|
|
|
|
func (ti *tagKeysIterator) Do(f func(flux.Table) error) error {
|
|
src := ti.s.GetSource(
|
|
uint64(ti.readSpec.OrganizationID),
|
|
uint64(ti.readSpec.BucketID),
|
|
)
|
|
|
|
var req datatypes.TagKeysRequest
|
|
any, err := types.MarshalAny(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
req.TagsSource = any
|
|
req.Predicate = ti.predicate
|
|
req.Range.Start = int64(ti.bounds.Start)
|
|
req.Range.End = int64(ti.bounds.Stop)
|
|
|
|
rs, err := ti.s.TagKeys(ti.ctx, &req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return ti.handleRead(f, rs)
|
|
}
|
|
|
|
func (ti *tagKeysIterator) handleRead(f func(flux.Table) error, rs cursors.StringIterator) error {
|
|
key := execute.NewGroupKey(nil, nil)
|
|
builder := execute.NewColListTableBuilder(key, ti.alloc)
|
|
valueIdx, err := builder.AddCol(flux.ColMeta{
|
|
Label: execute.DefaultValueColLabel,
|
|
Type: flux.TString,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer builder.ClearData()
|
|
|
|
// Add the _start and _stop columns that come from storage.
|
|
if err := builder.AppendString(valueIdx, "_start"); err != nil {
|
|
return err
|
|
}
|
|
if err := builder.AppendString(valueIdx, "_stop"); err != nil {
|
|
return err
|
|
}
|
|
|
|
for rs.Next() {
|
|
v := rs.Value()
|
|
switch v {
|
|
case models.MeasurementTagKey:
|
|
v = "_measurement"
|
|
case models.FieldKeyTagKey:
|
|
v = "_field"
|
|
}
|
|
|
|
if err := builder.AppendString(valueIdx, v); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Construct the table and add to the reference count
|
|
// so we can free the table later.
|
|
tbl, err := builder.Table()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Release the references to the arrays held by the builder.
|
|
builder.ClearData()
|
|
return f(tbl)
|
|
}
|
|
|
|
func (ti *tagKeysIterator) Statistics() cursors.CursorStats {
|
|
return cursors.CursorStats{}
|
|
}
|
|
|
|
type tagValuesIterator struct {
|
|
ctx context.Context
|
|
bounds execute.Bounds
|
|
s storage.Store
|
|
readSpec query.ReadTagValuesSpec
|
|
predicate *datatypes.Predicate
|
|
alloc *memory.Allocator
|
|
}
|
|
|
|
func (ti *tagValuesIterator) Do(f func(flux.Table) error) error {
|
|
src := ti.s.GetSource(
|
|
uint64(ti.readSpec.OrganizationID),
|
|
uint64(ti.readSpec.BucketID),
|
|
)
|
|
|
|
var req datatypes.TagValuesRequest
|
|
any, err := types.MarshalAny(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.TagsSource = any
|
|
|
|
switch ti.readSpec.TagKey {
|
|
case "_measurement":
|
|
req.TagKey = models.MeasurementTagKey
|
|
case "_field":
|
|
req.TagKey = models.FieldKeyTagKey
|
|
default:
|
|
req.TagKey = ti.readSpec.TagKey
|
|
}
|
|
req.Predicate = ti.predicate
|
|
req.Range.Start = int64(ti.bounds.Start)
|
|
req.Range.End = int64(ti.bounds.Stop)
|
|
|
|
rs, err := ti.s.TagValues(ti.ctx, &req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return ti.handleRead(f, rs)
|
|
}
|
|
|
|
func (ti *tagValuesIterator) handleRead(f func(flux.Table) error, rs cursors.StringIterator) error {
|
|
key := execute.NewGroupKey(nil, nil)
|
|
builder := execute.NewColListTableBuilder(key, ti.alloc)
|
|
valueIdx, err := builder.AddCol(flux.ColMeta{
|
|
Label: execute.DefaultValueColLabel,
|
|
Type: flux.TString,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer builder.ClearData()
|
|
|
|
for rs.Next() {
|
|
if err := builder.AppendString(valueIdx, rs.Value()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Construct the table and add to the reference count
|
|
// so we can free the table later.
|
|
tbl, err := builder.Table()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Release the references to the arrays held by the builder.
|
|
builder.ClearData()
|
|
return f(tbl)
|
|
}
|
|
|
|
func (ti *tagValuesIterator) Statistics() cursors.CursorStats {
|
|
return cursors.CursorStats{}
|
|
}
|