package query import ( "context" "fmt" "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/memory" "github.com/influxdata/flux/plan" "github.com/influxdata/influxdb/v2/kit/platform" "github.com/influxdata/influxdb/v2/storage/reads/datatypes" "github.com/influxdata/influxdb/v2/tsdb/cursors" ) // StorageReader is an interface for reading tables from the storage subsystem. type StorageReader interface { ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc *memory.Allocator) (TableIterator, error) ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc *memory.Allocator) (TableIterator, error) ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc *memory.Allocator) (TableIterator, error) ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc *memory.Allocator) (TableIterator, error) ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc *memory.Allocator) (TableIterator, error) ReadSeriesCardinality(ctx context.Context, spec ReadSeriesCardinalitySpec, alloc *memory.Allocator) (TableIterator, error) SupportReadSeriesCardinality(ctx context.Context) bool Close() } type ReadFilterSpec struct { OrganizationID platform.ID BucketID platform.ID Bounds execute.Bounds Predicate *datatypes.Predicate } type ReadGroupSpec struct { ReadFilterSpec GroupMode GroupMode GroupKeys []string AggregateMethod string } func (spec *ReadGroupSpec) Name() string { return fmt.Sprintf("readGroup(%s)", spec.AggregateMethod) } type ReadTagKeysSpec struct { ReadFilterSpec } type ReadTagValuesSpec struct { ReadFilterSpec TagKey string } type ReadSeriesCardinalitySpec struct { ReadFilterSpec } // ReadWindowAggregateSpec defines the options for WindowAggregate. // // Window and the WindowEvery/Offset should be mutually exclusive. If you set either the WindowEvery or Offset with // nanosecond values, then the Window will be ignored. type ReadWindowAggregateSpec struct { ReadFilterSpec WindowEvery int64 Offset int64 Aggregates []plan.ProcedureKind CreateEmpty bool TimeColumn string Window execute.Window // ForceAggregate forces all aggregates to be treated as aggregates. // This forces selectors, which normally don't return values for empty // windows, to return a null value. ForceAggregate bool } func (spec *ReadWindowAggregateSpec) Name() string { var agg string if len(spec.Aggregates) > 0 { agg = string(spec.Aggregates[0]) } return fmt.Sprintf("readWindow(%s)", agg) } // TableIterator is a table iterator that also keeps track of cursor statistics from the storage engine. type TableIterator interface { flux.TableIterator Statistics() cursors.CursorStats } type GroupMode int const ( // GroupModeNone merges all series into a single group. GroupModeNone GroupMode = iota // GroupModeBy produces a table for each unique value of the specified GroupKeys. GroupModeBy ) // ToGroupMode accepts the group mode from Flux and produces the appropriate storage group mode. func ToGroupMode(fluxMode flux.GroupMode) GroupMode { switch fluxMode { case flux.GroupModeNone: return GroupModeNone case flux.GroupModeBy: return GroupModeBy default: panic(fmt.Sprint("unknown group mode: ", fluxMode)) } }