refactor(storage): ReadFilter storage operation

pull/13250/head
jlapacik 2019-04-02 14:03:25 -07:00
parent 785b787497
commit 8078b915fd
12 changed files with 1082 additions and 215 deletions

View File

@ -0,0 +1,55 @@
package influxdb
import (
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/values"
)
const ReadRangePhysKind = "ReadRangePhysKind"
type ReadRangePhysSpec struct {
plan.DefaultCost
Bucket string
BucketID string
Bounds flux.Bounds
}
func (s *ReadRangePhysSpec) Kind() plan.ProcedureKind {
return ReadRangePhysKind
}
func (s *ReadRangePhysSpec) Copy() plan.ProcedureSpec {
ns := new(ReadRangePhysSpec)
ns.Bucket = s.Bucket
ns.BucketID = s.BucketID
ns.Bounds = s.Bounds
return ns
}
func (s *ReadRangePhysSpec) PostPhysicalValidate(id plan.NodeID) error {
if s.Bounds.Start.IsZero() && s.Bounds.Stop.IsZero() {
var bucket string
if len(s.Bucket) > 0 {
bucket = s.Bucket
} else {
bucket = s.BucketID
}
return fmt.Errorf(`%s: results from "%s" must be bounded`, id, bucket)
}
return nil
}
// TimeBounds implements plan.BoundsAwareProcedureSpec.
func (s *ReadRangePhysSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds {
return &plan.Bounds{
Start: values.ConvertTime(s.Bounds.Start.Time(s.Bounds.Now)),
Stop: values.ConvertTime(s.Bounds.Stop.Time(s.Bounds.Now)),
}
}

View File

@ -0,0 +1,37 @@
package influxdb
import (
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/stdlib/universe"
)
// func init() {
// plan.RegisterPhysicalRules(
// PushDownRangeRule{},
// )
// }
// PushDownRangeRule pushes down a range filter to storage
type PushDownRangeRule struct{}
func (rule PushDownRangeRule) Name() string {
return "PushDownRangeRule"
}
// Pattern matches 'from |> range'
func (rule PushDownRangeRule) Pattern() plan.Pattern {
return plan.Pat(universe.RangeKind, plan.Pat(FromKind))
}
// Rewrite converts 'from |> range' into 'ReadRange'
func (rule PushDownRangeRule) Rewrite(node plan.Node) (plan.Node, bool, error) {
fromNode := node.Predecessors()[0]
fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec)
rangeSpec := node.ProcedureSpec().(*universe.RangeProcedureSpec)
return plan.CreatePhysicalNode("ReadRange", &ReadRangePhysSpec{
Bucket: fromSpec.Bucket,
BucketID: fromSpec.BucketID,
Bounds: rangeSpec.Bounds,
}), true, nil
}

View File

@ -0,0 +1,120 @@
package influxdb_test
import (
"testing"
"github.com/influxdata/flux"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/plan/plantest"
"github.com/influxdata/flux/stdlib/universe"
"github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
)
func TestPushDownRangeRule(t *testing.T) {
fromSpec := influxdb.FromProcedureSpec{
Bucket: "my-bucket",
}
rangeSpec := universe.RangeProcedureSpec{
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
}
readRangeSpec := influxdb.ReadRangePhysSpec{
Bucket: "my-bucket",
Bounds: flux.Bounds{
Start: fluxTime(5),
Stop: fluxTime(10),
},
}
tests := []plantest.RuleTestCase{
{
Name: "simple",
// from -> range => ReadRange
Rules: []plan.Rule{
influxdb.PushDownRangeRule{},
},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("from", &fromSpec),
plan.CreateLogicalNode("range", &rangeSpec),
},
Edges: [][2]int{{0, 1}},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", &readRangeSpec),
},
},
},
{
Name: "with successor",
// from -> range -> count => ReadRange -> count
Rules: []plan.Rule{
influxdb.PushDownRangeRule{},
},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("from", &fromSpec),
plan.CreateLogicalNode("range", &rangeSpec),
plan.CreatePhysicalNode("count", &universe.CountProcedureSpec{}),
},
Edges: [][2]int{
{0, 1},
{1, 2},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", &readRangeSpec),
plan.CreatePhysicalNode("count", &universe.CountProcedureSpec{}),
},
Edges: [][2]int{{0, 1}},
},
},
{
Name: "with multiple successors",
// count mean
// \ / count mean
// range => \ /
// | ReadRange
// from
Rules: []plan.Rule{
influxdb.PushDownRangeRule{},
},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreateLogicalNode("from", &fromSpec),
plan.CreateLogicalNode("range", &rangeSpec),
plan.CreatePhysicalNode("count", &universe.CountProcedureSpec{}),
plan.CreatePhysicalNode("mean", &universe.MeanProcedureSpec{}),
},
Edges: [][2]int{
{0, 1},
{1, 2},
{1, 3},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", &readRangeSpec),
plan.CreatePhysicalNode("count", &universe.CountProcedureSpec{}),
plan.CreatePhysicalNode("mean", &universe.MeanProcedureSpec{}),
},
Edges: [][2]int{
{0, 1},
{0, 2},
},
},
},
}
for _, tc := range tests {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
t.Parallel()
plantest.PhysicalRuleTestHelper(t, &tc)
})
}
}

View File

@ -0,0 +1,158 @@
package influxdb
import (
"context"
"errors"
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/flux/plan"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/tsdb/cursors"
)
func init() {
execute.RegisterSource(ReadRangePhysKind, createReadFilterSource)
}
type runner interface {
run(ctx context.Context) error
}
type Source struct {
id execute.DatasetID
ts []execute.Transformation
alloc *memory.Allocator
stats cursors.CursorStats
runner runner
}
func (s *Source) Run(ctx context.Context) {
err := s.runner.run(ctx)
for _, t := range s.ts {
t.Finish(s.id, err)
}
}
func (s *Source) AddTransformation(t execute.Transformation) {
s.ts = append(s.ts, t)
}
func (s *Source) Metadata() flux.Metadata {
return flux.Metadata{
"influxdb/scanned-bytes": []interface{}{s.stats.ScannedBytes},
"influxdb/scanned-values": []interface{}{s.stats.ScannedValues},
}
}
func (s *Source) processTables(ctx context.Context, tables TableIterator, watermark execute.Time) error {
err := tables.Do(func(tbl flux.Table) error {
for _, t := range s.ts {
if err := t.Process(s.id, tbl); err != nil {
return err
}
//TODO(nathanielc): Also add mechanism to send UpdateProcessingTime calls, when no data is arriving.
// This is probably not needed for this source, but other sources should do so.
if err := t.UpdateProcessingTime(s.id, execute.Now()); err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
// Track the number of bytes and values scanned.
stats := tables.Statistics()
s.stats.ScannedValues += stats.ScannedValues
s.stats.ScannedBytes += stats.ScannedBytes
for _, t := range s.ts {
if err := t.UpdateWatermark(s.id, watermark); err != nil {
return err
}
}
return nil
}
type readFilterSource struct {
Source
reader Reader
readSpec ReadFilterSpec
}
func ReadFilterSource(id execute.DatasetID, r Reader, readSpec ReadFilterSpec, alloc *memory.Allocator) execute.Source {
src := new(readFilterSource)
src.id = id
src.alloc = alloc
src.reader = r
src.readSpec = readSpec
src.runner = src
return src
}
func (s *readFilterSource) run(ctx context.Context) error {
stop := s.readSpec.Bounds.Stop
tables, err := s.reader.ReadFilter(
ctx,
s.readSpec,
s.alloc,
)
if err != nil {
return err
}
return s.processTables(ctx, tables, stop)
}
func createReadFilterSource(s plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error) {
spec := s.(*ReadRangePhysSpec)
bounds := a.StreamContext().Bounds()
if bounds == nil {
return nil, errors.New("nil bounds passed to from")
}
deps := a.Dependencies()[FromKind].(Dependencies)
req := query.RequestFromContext(a.Context())
if req == nil {
return nil, errors.New("missing request on context")
}
orgID := req.OrganizationID
var bucketID platform.ID
// Determine bucketID
switch {
case spec.Bucket != "":
b, ok := deps.BucketLookup.Lookup(orgID, spec.Bucket)
if !ok {
return nil, fmt.Errorf("could not find bucket %q", spec.Bucket)
}
bucketID = b
case len(spec.BucketID) != 0:
err := bucketID.DecodeFromString(spec.BucketID)
if err != nil {
return nil, err
}
}
return ReadFilterSource(
id,
deps.Reader,
ReadFilterSpec{
OrganizationID: orgID,
BucketID: bucketID,
Bounds: *bounds,
},
a.Allocator(),
), nil
}

View File

@ -67,81 +67,38 @@ func (l StaticLookup) Watch() <-chan struct{} {
// source performs storage reads
type source struct {
id execute.DatasetID
Source
reader Reader
readSpec ReadSpec
window execute.Window
bounds execute.Bounds
alloc *memory.Allocator
ts []execute.Transformation
currentTime execute.Time
overflow bool
stats cursors.CursorStats
}
func NewSource(id execute.DatasetID, r Reader, readSpec ReadSpec, bounds execute.Bounds, w execute.Window, currentTime execute.Time, alloc *memory.Allocator) execute.Source {
return &source{
id: id,
src := &source{
reader: r,
readSpec: readSpec,
bounds: bounds,
window: w,
currentTime: currentTime,
alloc: alloc,
}
}
func (s *source) AddTransformation(t execute.Transformation) {
s.ts = append(s.ts, t)
}
func (s *source) Run(ctx context.Context) {
err := s.run(ctx)
for _, t := range s.ts {
t.Finish(s.id, err)
}
}
func (s *source) Metadata() flux.Metadata {
return flux.Metadata{
"influxdb/scanned-bytes": []interface{}{s.stats.ScannedBytes},
"influxdb/scanned-values": []interface{}{s.stats.ScannedValues},
}
src.id = id
src.alloc = alloc
src.runner = src
return src
}
func (s *source) run(ctx context.Context) error {
//TODO(nathanielc): Pass through context to actual network I/O.
for tables, mark, ok := s.next(ctx); ok; tables, mark, ok = s.next(ctx) {
err := tables.Do(func(tbl flux.Table) error {
for _, t := range s.ts {
if err := t.Process(s.id, tbl); err != nil {
return err
}
//TODO(nathanielc): Also add mechanism to send UpdateProcessingTime calls, when no data is arriving.
// This is probably not needed for this source, but other sources should do so.
if err := t.UpdateProcessingTime(s.id, execute.Now()); err != nil {
return err
}
}
return nil
})
err := s.processTables(ctx, tables, mark)
if err != nil {
return err
}
// Track the number of bytes and values scanned.
stats := tables.Statistics()
s.stats.ScannedValues += stats.ScannedValues
s.stats.ScannedBytes += stats.ScannedBytes
for _, t := range s.ts {
if err := t.UpdateWatermark(s.id, mark); err != nil {
return err
}
}
}
return nil
}
@ -240,8 +197,18 @@ type ReadSpec struct {
RetentionPolicy string // required by InfluxDB OSS
}
type ReadFilterSpec struct {
OrganizationID platform.ID
BucketID platform.ID
Bounds execute.Bounds
Predicate *semantic.FunctionExpression
}
type Reader interface {
Read(ctx context.Context, rs ReadSpec, start, stop execute.Time, alloc *memory.Allocator) (TableIterator, error)
ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc *memory.Allocator) (TableIterator, error)
Close()
}

View File

@ -60,7 +60,7 @@ func (x ReadRequest_Group) String() string {
}
func (ReadRequest_Group) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{0, 0}
return fileDescriptor_715e4bf4cdf1f73d, []int{1, 0}
}
type ReadRequest_HintFlags int32
@ -92,7 +92,7 @@ func (x ReadRequest_HintFlags) String() string {
}
func (ReadRequest_HintFlags) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{0, 1}
return fileDescriptor_715e4bf4cdf1f73d, []int{1, 1}
}
type Aggregate_AggregateType int32
@ -120,7 +120,7 @@ func (x Aggregate_AggregateType) String() string {
}
func (Aggregate_AggregateType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{1, 0}
return fileDescriptor_715e4bf4cdf1f73d, []int{2, 0}
}
type ReadResponse_FrameType int32
@ -145,7 +145,7 @@ func (x ReadResponse_FrameType) String() string {
}
func (ReadResponse_FrameType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 0}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 0}
}
type ReadResponse_DataType int32
@ -179,9 +179,48 @@ func (x ReadResponse_DataType) String() string {
}
func (ReadResponse_DataType) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 1}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 1}
}
type ReadFilterRequest struct {
ReadSource *types.Any `protobuf:"bytes,1,opt,name=read_source,json=readSource,proto3" json:"read_source,omitempty"`
Range TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range"`
Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"`
}
func (m *ReadFilterRequest) Reset() { *m = ReadFilterRequest{} }
func (m *ReadFilterRequest) String() string { return proto.CompactTextString(m) }
func (*ReadFilterRequest) ProtoMessage() {}
func (*ReadFilterRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{0}
}
func (m *ReadFilterRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ReadFilterRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ReadFilterRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalTo(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ReadFilterRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReadFilterRequest.Merge(m, src)
}
func (m *ReadFilterRequest) XXX_Size() int {
return m.Size()
}
func (m *ReadFilterRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ReadFilterRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ReadFilterRequest proto.InternalMessageInfo
// Request message for Storage.Read.
type ReadRequest struct {
ReadSource *types.Any `protobuf:"bytes,13,opt,name=read_source,json=readSource,proto3" json:"read_source,omitempty"`
@ -215,7 +254,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{0}
return fileDescriptor_715e4bf4cdf1f73d, []int{1}
}
func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -252,7 +291,7 @@ func (m *Aggregate) Reset() { *m = Aggregate{} }
func (m *Aggregate) String() string { return proto.CompactTextString(m) }
func (*Aggregate) ProtoMessage() {}
func (*Aggregate) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{1}
return fileDescriptor_715e4bf4cdf1f73d, []int{2}
}
func (m *Aggregate) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -290,7 +329,7 @@ func (m *Tag) Reset() { *m = Tag{} }
func (m *Tag) String() string { return proto.CompactTextString(m) }
func (*Tag) ProtoMessage() {}
func (*Tag) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{2}
return fileDescriptor_715e4bf4cdf1f73d, []int{3}
}
func (m *Tag) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -328,7 +367,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} }
func (m *ReadResponse) String() string { return proto.CompactTextString(m) }
func (*ReadResponse) ProtoMessage() {}
func (*ReadResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3}
return fileDescriptor_715e4bf4cdf1f73d, []int{4}
}
func (m *ReadResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -373,7 +412,7 @@ func (m *ReadResponse_Frame) Reset() { *m = ReadResponse_Frame{} }
func (m *ReadResponse_Frame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_Frame) ProtoMessage() {}
func (*ReadResponse_Frame) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 0}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 0}
}
func (m *ReadResponse_Frame) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -674,7 +713,7 @@ func (m *ReadResponse_GroupFrame) Reset() { *m = ReadResponse_GroupFrame
func (m *ReadResponse_GroupFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_GroupFrame) ProtoMessage() {}
func (*ReadResponse_GroupFrame) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 1}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 1}
}
func (m *ReadResponse_GroupFrame) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -712,7 +751,7 @@ func (m *ReadResponse_SeriesFrame) Reset() { *m = ReadResponse_SeriesFra
func (m *ReadResponse_SeriesFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_SeriesFrame) ProtoMessage() {}
func (*ReadResponse_SeriesFrame) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 2}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 2}
}
func (m *ReadResponse_SeriesFrame) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -750,7 +789,7 @@ func (m *ReadResponse_FloatPointsFrame) Reset() { *m = ReadResponse_Floa
func (m *ReadResponse_FloatPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_FloatPointsFrame) ProtoMessage() {}
func (*ReadResponse_FloatPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 3}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 3}
}
func (m *ReadResponse_FloatPointsFrame) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -788,7 +827,7 @@ func (m *ReadResponse_IntegerPointsFrame) Reset() { *m = ReadResponse_In
func (m *ReadResponse_IntegerPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_IntegerPointsFrame) ProtoMessage() {}
func (*ReadResponse_IntegerPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 4}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 4}
}
func (m *ReadResponse_IntegerPointsFrame) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -826,7 +865,7 @@ func (m *ReadResponse_UnsignedPointsFrame) Reset() { *m = ReadResponse_U
func (m *ReadResponse_UnsignedPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_UnsignedPointsFrame) ProtoMessage() {}
func (*ReadResponse_UnsignedPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 5}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 5}
}
func (m *ReadResponse_UnsignedPointsFrame) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -864,7 +903,7 @@ func (m *ReadResponse_BooleanPointsFrame) Reset() { *m = ReadResponse_Bo
func (m *ReadResponse_BooleanPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_BooleanPointsFrame) ProtoMessage() {}
func (*ReadResponse_BooleanPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 6}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 6}
}
func (m *ReadResponse_BooleanPointsFrame) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -902,7 +941,7 @@ func (m *ReadResponse_StringPointsFrame) Reset() { *m = ReadResponse_Str
func (m *ReadResponse_StringPointsFrame) String() string { return proto.CompactTextString(m) }
func (*ReadResponse_StringPointsFrame) ProtoMessage() {}
func (*ReadResponse_StringPointsFrame) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{3, 7}
return fileDescriptor_715e4bf4cdf1f73d, []int{4, 7}
}
func (m *ReadResponse_StringPointsFrame) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -939,7 +978,7 @@ func (m *CapabilitiesResponse) Reset() { *m = CapabilitiesResponse{} }
func (m *CapabilitiesResponse) String() string { return proto.CompactTextString(m) }
func (*CapabilitiesResponse) ProtoMessage() {}
func (*CapabilitiesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{4}
return fileDescriptor_715e4bf4cdf1f73d, []int{5}
}
func (m *CapabilitiesResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -975,7 +1014,7 @@ func (m *HintsResponse) Reset() { *m = HintsResponse{} }
func (m *HintsResponse) String() string { return proto.CompactTextString(m) }
func (*HintsResponse) ProtoMessage() {}
func (*HintsResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{5}
return fileDescriptor_715e4bf4cdf1f73d, []int{6}
}
func (m *HintsResponse) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -1016,7 +1055,7 @@ func (m *TimestampRange) Reset() { *m = TimestampRange{} }
func (m *TimestampRange) String() string { return proto.CompactTextString(m) }
func (*TimestampRange) ProtoMessage() {}
func (*TimestampRange) Descriptor() ([]byte, []int) {
return fileDescriptor_715e4bf4cdf1f73d, []int{6}
return fileDescriptor_715e4bf4cdf1f73d, []int{7}
}
func (m *TimestampRange) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
@ -1051,6 +1090,7 @@ func init() {
proto.RegisterEnum("influxdata.platform.storage.Aggregate_AggregateType", Aggregate_AggregateType_name, Aggregate_AggregateType_value)
proto.RegisterEnum("influxdata.platform.storage.ReadResponse_FrameType", ReadResponse_FrameType_name, ReadResponse_FrameType_value)
proto.RegisterEnum("influxdata.platform.storage.ReadResponse_DataType", ReadResponse_DataType_name, ReadResponse_DataType_value)
proto.RegisterType((*ReadFilterRequest)(nil), "influxdata.platform.storage.ReadFilterRequest")
proto.RegisterType((*ReadRequest)(nil), "influxdata.platform.storage.ReadRequest")
proto.RegisterMapType((map[string]string)(nil), "influxdata.platform.storage.ReadRequest.TraceEntry")
proto.RegisterType((*Aggregate)(nil), "influxdata.platform.storage.Aggregate")
@ -1073,104 +1113,107 @@ func init() {
func init() { proto.RegisterFile("storage_common.proto", fileDescriptor_715e4bf4cdf1f73d) }
var fileDescriptor_715e4bf4cdf1f73d = []byte{
// 1547 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x57, 0x4f, 0x8f, 0xea, 0xc8,
0x11, 0xc7, 0xfc, 0xa7, 0xf8, 0x33, 0x7e, 0xbd, 0x64, 0xc4, 0xfa, 0x65, 0xc1, 0x8b, 0xa2, 0x0d,
0x49, 0x36, 0x4c, 0xc2, 0xee, 0x2a, 0x4f, 0x2f, 0xc9, 0x01, 0xe6, 0xf1, 0x06, 0x32, 0x33, 0x30,
0x6a, 0x98, 0x28, 0x1b, 0x29, 0x42, 0x3d, 0xd0, 0x78, 0xad, 0x35, 0xb6, 0x63, 0x9b, 0xd5, 0x20,
0xe5, 0x9e, 0x15, 0xa7, 0xcd, 0x35, 0x12, 0x52, 0xa4, 0x1c, 0x73, 0xcf, 0x67, 0x78, 0xb7, 0xec,
0x31, 0x27, 0x94, 0xf0, 0x3e, 0x44, 0xa4, 0x9c, 0xa2, 0xee, 0xb6, 0xc1, 0xcc, 0xbc, 0x8c, 0xe0,
0xd6, 0xf5, 0xab, 0xaa, 0x5f, 0x75, 0x97, 0xab, 0xaa, 0xdb, 0x50, 0x74, 0x3d, 0xcb, 0x21, 0x1a,
0x1d, 0x8d, 0xad, 0xd9, 0xcc, 0x32, 0xeb, 0xb6, 0x63, 0x79, 0x16, 0x7a, 0xae, 0x9b, 0x53, 0x63,
0x7e, 0x3f, 0x21, 0x1e, 0xa9, 0xdb, 0x06, 0xf1, 0xa6, 0x96, 0x33, 0xab, 0xfb, 0x96, 0x4a, 0x51,
0xb3, 0x34, 0x8b, 0xdb, 0x9d, 0xb1, 0x95, 0x70, 0x51, 0x9e, 0x6b, 0x96, 0xa5, 0x19, 0xf4, 0x8c,
0x4b, 0x77, 0xf3, 0xe9, 0x19, 0x9d, 0xd9, 0xde, 0xc2, 0x57, 0xbe, 0xff, 0x50, 0x49, 0xcc, 0x40,
0x75, 0x62, 0x3b, 0x74, 0xa2, 0x8f, 0x89, 0x47, 0x05, 0x50, 0xfd, 0x4f, 0x1a, 0xb2, 0x98, 0x92,
0x09, 0xa6, 0xbf, 0x9f, 0x53, 0xd7, 0x43, 0x6d, 0xc8, 0x3a, 0x94, 0x4c, 0x46, 0xae, 0x35, 0x77,
0xc6, 0xb4, 0x94, 0x57, 0xa5, 0x5a, 0xb6, 0x51, 0xac, 0x0b, 0xc6, 0x7a, 0xc0, 0x58, 0x6f, 0x9a,
0x8b, 0x56, 0x61, 0xb3, 0xae, 0x00, 0xf3, 0x1d, 0x70, 0x5b, 0x0c, 0xce, 0x76, 0x8d, 0x0c, 0x38,
0xf1, 0xf4, 0x19, 0x75, 0x3d, 0x32, 0xb3, 0x47, 0x0e, 0x31, 0x35, 0x5a, 0x8a, 0x72, 0xaa, 0x1f,
0xd5, 0x9f, 0x38, 0x6c, 0x7d, 0x18, 0xf8, 0x60, 0xe6, 0xd2, 0x3a, 0x7d, 0xb3, 0xae, 0x44, 0x36,
0xeb, 0x4a, 0x61, 0x1f, 0xc7, 0x05, 0x6f, 0x4f, 0x46, 0x65, 0x80, 0x09, 0x75, 0xc7, 0xd4, 0x9c,
0xe8, 0xa6, 0x56, 0x8a, 0xa9, 0x52, 0x2d, 0x8d, 0x43, 0x08, 0xfa, 0x18, 0x40, 0x73, 0xac, 0xb9,
0x3d, 0xfa, 0x92, 0x2e, 0xdc, 0x52, 0x5c, 0x8d, 0xd5, 0x32, 0xad, 0xfc, 0x66, 0x5d, 0xc9, 0x5c,
0x30, 0xf4, 0x92, 0x2e, 0x5c, 0x9c, 0xd1, 0x82, 0x25, 0x7a, 0x05, 0x09, 0x2e, 0x94, 0xb2, 0xaa,
0x54, 0x2b, 0x34, 0xea, 0x4f, 0xee, 0x38, 0x94, 0xbb, 0x3a, 0x67, 0xc3, 0xc2, 0x19, 0xbd, 0x82,
0x0c, 0xd1, 0x34, 0x87, 0x6a, 0xc4, 0xa3, 0xa5, 0x0c, 0x3f, 0xfb, 0x47, 0x4f, 0x32, 0x35, 0x03,
0x6b, 0xbc, 0x73, 0x64, 0x2c, 0xdb, 0x2f, 0x56, 0x4a, 0x1c, 0xc0, 0x72, 0x13, 0x58, 0xe3, 0x9d,
0x23, 0x6a, 0x40, 0xce, 0xa5, 0x8e, 0x4e, 0xdd, 0x91, 0xa1, 0xcf, 0x74, 0xaf, 0x94, 0x54, 0xa5,
0x5a, 0xac, 0x75, 0xb2, 0x59, 0x57, 0xb2, 0x03, 0x8e, 0x5f, 0x31, 0x18, 0x67, 0xdd, 0x9d, 0x80,
0x3e, 0x83, 0xbc, 0xef, 0x63, 0x4d, 0xa7, 0x2e, 0xf5, 0x4a, 0x29, 0xee, 0x24, 0x6f, 0xd6, 0x95,
0x9c, 0x70, 0xea, 0x73, 0x1c, 0xfb, 0xd4, 0x42, 0x62, 0xa1, 0x6c, 0x4b, 0x37, 0xbd, 0x20, 0x54,
0x7a, 0x17, 0xea, 0x86, 0xe3, 0x7e, 0x28, 0x7b, 0x27, 0xa0, 0x21, 0x24, 0x3c, 0x87, 0x8c, 0x69,
0x09, 0xd4, 0x58, 0x2d, 0xdb, 0xf8, 0xe4, 0xe0, 0x84, 0x0f, 0x99, 0x57, 0xdb, 0xf4, 0x9c, 0x45,
0x2b, 0xb3, 0x59, 0x57, 0x12, 0x5c, 0xc6, 0x82, 0x0c, 0x7d, 0x0c, 0x89, 0x2f, 0x58, 0x8c, 0x52,
0x4e, 0x95, 0x6a, 0xa9, 0xd6, 0x29, 0x33, 0xe8, 0x30, 0xe0, 0xbf, 0xeb, 0x4a, 0x86, 0x2d, 0x5e,
0x1b, 0x44, 0x73, 0xb1, 0x30, 0x52, 0x5e, 0x00, 0xec, 0xd8, 0x90, 0x0c, 0xb1, 0x2f, 0xe9, 0xa2,
0x24, 0xa9, 0x52, 0x2d, 0x83, 0xd9, 0x12, 0x15, 0x21, 0xf1, 0x15, 0x31, 0xe6, 0xa2, 0x8c, 0x33,
0x58, 0x08, 0x2f, 0xa3, 0x2f, 0xa4, 0xea, 0x1f, 0x25, 0x48, 0xf0, 0x2f, 0x8f, 0x3e, 0x00, 0xb8,
0xc0, 0xfd, 0xdb, 0x9b, 0x51, 0xaf, 0xdf, 0x6b, 0xcb, 0x11, 0x25, 0xbf, 0x5c, 0xa9, 0xa2, 0xc4,
0x7a, 0x96, 0x49, 0xd1, 0x73, 0xc8, 0x08, 0x75, 0xf3, 0xea, 0x4a, 0x96, 0x94, 0xdc, 0x72, 0xa5,
0xa6, 0xb9, 0xb6, 0x69, 0x18, 0xe8, 0x7d, 0x48, 0x0b, 0x65, 0xeb, 0x73, 0x39, 0xaa, 0x64, 0x97,
0x2b, 0x35, 0xc5, 0x75, 0xad, 0x05, 0xfa, 0x10, 0x72, 0x42, 0xd5, 0xfe, 0xcd, 0x79, 0xfb, 0x66,
0x28, 0xc7, 0x94, 0x93, 0xe5, 0x4a, 0xcd, 0x72, 0x75, 0xfb, 0x7e, 0x4c, 0x6d, 0x4f, 0x89, 0x7f,
0xfd, 0xd7, 0x72, 0xa4, 0xfa, 0x37, 0x09, 0x76, 0x07, 0x63, 0xe1, 0x3a, 0xdd, 0xde, 0x30, 0xd8,
0x0c, 0x0f, 0xc7, 0xb4, 0x7c, 0x2f, 0xdf, 0x83, 0x82, 0xaf, 0x1c, 0xdd, 0xf4, 0xbb, 0xbd, 0xe1,
0x40, 0x96, 0x14, 0x79, 0xb9, 0x52, 0x73, 0xc2, 0x42, 0x7c, 0xaa, 0xb0, 0xd5, 0xa0, 0x8d, 0xbb,
0xed, 0x81, 0x1c, 0x0d, 0x5b, 0x89, 0x32, 0x40, 0x67, 0x50, 0xe4, 0x56, 0x83, 0xf3, 0x4e, 0xfb,
0xba, 0xc9, 0x4e, 0x37, 0x1a, 0x76, 0xaf, 0xdb, 0x72, 0x5c, 0xf9, 0xce, 0x72, 0xa5, 0x3e, 0x63,
0xb6, 0x83, 0xf1, 0x17, 0x74, 0x46, 0x9a, 0x86, 0xc1, 0x1a, 0xd9, 0xdf, 0xed, 0x3f, 0x24, 0xc8,
0x6c, 0x6b, 0x1e, 0x75, 0x20, 0xee, 0x2d, 0x6c, 0xca, 0x53, 0x5e, 0x68, 0x7c, 0x7a, 0x58, 0xa7,
0xec, 0x56, 0xc3, 0x85, 0x4d, 0x31, 0x67, 0xa8, 0xde, 0x43, 0x7e, 0x0f, 0x46, 0x15, 0x88, 0xfb,
0x39, 0xe0, 0xfb, 0xd9, 0x53, 0xf2, 0x64, 0x7c, 0x00, 0xb1, 0xc1, 0xed, 0xb5, 0x2c, 0x29, 0xc5,
0xe5, 0x4a, 0x95, 0xf7, 0xf4, 0x83, 0xf9, 0x0c, 0x7d, 0x08, 0x89, 0xf3, 0xfe, 0x6d, 0x6f, 0x28,
0x47, 0x95, 0xd3, 0xe5, 0x4a, 0x45, 0x7b, 0x06, 0xe7, 0xd6, 0xdc, 0x0c, 0xf2, 0xff, 0x63, 0x88,
0x0d, 0x89, 0x16, 0x2e, 0x9e, 0xdc, 0x3b, 0x8a, 0x27, 0xe7, 0x17, 0x4f, 0xf5, 0x4f, 0x05, 0xc8,
0x89, 0x6a, 0x76, 0x6d, 0xcb, 0x74, 0x29, 0xba, 0x86, 0xe4, 0xd4, 0x21, 0x33, 0xea, 0x96, 0x24,
0xde, 0x08, 0x67, 0x07, 0x34, 0x82, 0x70, 0xad, 0xbf, 0x66, 0x7e, 0xad, 0x38, 0x9b, 0x97, 0xd8,
0x27, 0x51, 0xbe, 0x4e, 0x42, 0x82, 0xe3, 0xe8, 0x2a, 0x98, 0x68, 0x29, 0x3e, 0x41, 0x3e, 0x3d,
0x9c, 0x97, 0x17, 0x19, 0x27, 0xe9, 0x44, 0x82, 0xc9, 0xd6, 0x87, 0xa4, 0x68, 0x79, 0x7e, 0xc4,
0x6c, 0xe3, 0xb3, 0xc3, 0xe9, 0x44, 0xc5, 0x04, 0x7c, 0x3e, 0x0d, 0xb2, 0x21, 0x37, 0x35, 0x2c,
0xe2, 0x8d, 0xc4, 0x50, 0xf0, 0x6f, 0x8a, 0x97, 0x47, 0x9c, 0x9e, 0x79, 0x8b, 0x9a, 0x15, 0x89,
0xe0, 0xf3, 0x26, 0x84, 0x76, 0x22, 0x38, 0x3b, 0xdd, 0x89, 0xe8, 0x1e, 0x0a, 0xba, 0xe9, 0x51,
0x8d, 0x3a, 0x41, 0xcc, 0x18, 0x8f, 0xf9, 0x8b, 0xc3, 0x63, 0x76, 0x85, 0x7f, 0x38, 0xea, 0xb3,
0xcd, 0xba, 0x92, 0xdf, 0xc3, 0x3b, 0x11, 0x9c, 0xd7, 0xc3, 0x00, 0xfa, 0x03, 0x9c, 0xcc, 0x4d,
0x57, 0xd7, 0x4c, 0x3a, 0x09, 0x42, 0xc7, 0x79, 0xe8, 0x5f, 0x1e, 0x1e, 0xfa, 0xd6, 0x27, 0x08,
0xc7, 0x46, 0xec, 0x9a, 0xdc, 0x57, 0x74, 0x22, 0xb8, 0x30, 0xdf, 0x43, 0xd8, 0xb9, 0xef, 0x2c,
0xcb, 0xa0, 0xc4, 0x0c, 0x82, 0x27, 0x8e, 0x3d, 0x77, 0x4b, 0xf8, 0x3f, 0x3a, 0xf7, 0x1e, 0xce,
0xce, 0x7d, 0x17, 0x06, 0x90, 0x07, 0x79, 0xd7, 0x73, 0x74, 0x53, 0x0b, 0x02, 0x27, 0x79, 0xe0,
0x9f, 0x1f, 0x51, 0x3b, 0xdc, 0x3d, 0x1c, 0x57, 0xdc, 0x45, 0x21, 0xb8, 0x13, 0xc1, 0x39, 0x37,
0x24, 0xb7, 0x92, 0x10, 0x67, 0xcc, 0xca, 0x3d, 0xc0, 0xae, 0x92, 0xd1, 0x47, 0x90, 0xf6, 0x88,
0x26, 0x1e, 0x03, 0xac, 0xd3, 0x72, 0xad, 0xec, 0x66, 0x5d, 0x49, 0x0d, 0x89, 0xc6, 0x9f, 0x02,
0x29, 0x4f, 0x2c, 0x50, 0x0b, 0x90, 0x4d, 0x1c, 0x4f, 0xf7, 0x74, 0xcb, 0x64, 0xd6, 0xa3, 0xaf,
0x88, 0xc1, 0xaa, 0x93, 0x79, 0x14, 0x37, 0xeb, 0x8a, 0x7c, 0x13, 0x68, 0x2f, 0xe9, 0xe2, 0xd7,
0xc4, 0x70, 0xb1, 0x6c, 0x3f, 0x40, 0x94, 0x3f, 0x4b, 0x90, 0x0d, 0x55, 0x3d, 0x7a, 0x09, 0x71,
0x8f, 0x68, 0x41, 0x87, 0xab, 0x4f, 0xbf, 0x86, 0x88, 0xe6, 0xb7, 0x34, 0xf7, 0x41, 0x7d, 0xc8,
0x30, 0xc3, 0x11, 0x1f, 0x94, 0x51, 0x3e, 0x28, 0x1b, 0x87, 0xe7, 0xef, 0x15, 0xf1, 0x08, 0x1f,
0x93, 0xe9, 0x89, 0xbf, 0x52, 0x7e, 0x05, 0xf2, 0xc3, 0xd6, 0x61, 0x6f, 0xa9, 0xed, 0xeb, 0x4a,
0x6c, 0x53, 0xc6, 0x21, 0x04, 0x9d, 0x42, 0x92, 0x8f, 0x2f, 0x91, 0x08, 0x09, 0xfb, 0x92, 0x72,
0x05, 0xe8, 0x71, 0x4b, 0x1c, 0xc9, 0x16, 0xdb, 0xb2, 0x5d, 0xc3, 0x7b, 0xef, 0xa8, 0xf2, 0x23,
0xe9, 0xe2, 0xe1, 0xcd, 0x3d, 0xae, 0xdb, 0x23, 0xd9, 0xd2, 0x5b, 0xb6, 0x4b, 0x78, 0xf6, 0xa8,
0x18, 0x8f, 0x24, 0xcb, 0x04, 0x64, 0xd5, 0x01, 0x64, 0x38, 0x81, 0x7f, 0x55, 0x25, 0xfd, 0x8b,
0x36, 0xa2, 0xbc, 0xb7, 0x5c, 0xa9, 0x27, 0x5b, 0x95, 0x7f, 0xd7, 0x56, 0x20, 0xb9, 0xbd, 0xaf,
0xf7, 0x0d, 0xc4, 0x5e, 0xfc, 0x9b, 0xe8, 0xef, 0x12, 0xa4, 0x83, 0xef, 0x8d, 0xbe, 0x0b, 0x89,
0xd7, 0x57, 0xfd, 0xe6, 0x50, 0x8e, 0x28, 0xcf, 0x96, 0x2b, 0x35, 0x1f, 0x28, 0xf8, 0xa7, 0x47,
0x2a, 0xa4, 0xba, 0xbd, 0x61, 0xfb, 0xa2, 0x8d, 0x03, 0xca, 0x40, 0xef, 0x7f, 0x4e, 0x54, 0x85,
0xf4, 0x6d, 0x6f, 0xd0, 0xbd, 0xe8, 0xb5, 0x5f, 0xc9, 0x51, 0x71, 0x47, 0x06, 0x26, 0xc1, 0x37,
0x62, 0x2c, 0xad, 0x7e, 0xff, 0xaa, 0xdd, 0xec, 0xc9, 0xb1, 0x7d, 0x16, 0x3f, 0xef, 0xa8, 0x0c,
0xc9, 0xc1, 0x10, 0x77, 0x7b, 0x17, 0x72, 0x5c, 0x41, 0xcb, 0x95, 0x5a, 0x08, 0x0c, 0x44, 0x2a,
0xfd, 0x8d, 0xff, 0x45, 0x82, 0xe2, 0x39, 0xb1, 0xc9, 0x9d, 0x6e, 0xe8, 0x9e, 0x4e, 0xdd, 0xed,
0xdd, 0xd8, 0x87, 0xf8, 0x98, 0xd8, 0x41, 0xdf, 0x3c, 0x3d, 0x36, 0xde, 0x45, 0xc0, 0x40, 0x97,
0x3f, 0xee, 0x30, 0x27, 0x52, 0x7e, 0x06, 0x99, 0x2d, 0x74, 0xd4, 0x7b, 0xef, 0x04, 0xf2, 0xfc,
0x19, 0x19, 0x30, 0x57, 0x5f, 0xc0, 0x83, 0xff, 0x13, 0xe6, 0xec, 0x7a, 0xc4, 0xf1, 0x38, 0x61,
0x0c, 0x0b, 0x81, 0x05, 0xa1, 0xe6, 0x84, 0x13, 0xc6, 0x30, 0x5b, 0x36, 0xbe, 0x89, 0x42, 0x6a,
0x20, 0x36, 0x8d, 0x7e, 0x07, 0x71, 0xd6, 0xae, 0xa8, 0x76, 0xe8, 0xeb, 0x57, 0xf9, 0xc1, 0xc1,
0xbd, 0xff, 0x13, 0x09, 0x7d, 0x0e, 0xb9, 0x70, 0x5a, 0xd0, 0xe9, 0xa3, 0x5f, 0xba, 0x36, 0xfb,
0x83, 0x54, 0x7e, 0x7a, 0x74, 0x66, 0xd1, 0x25, 0x88, 0x77, 0xf5, 0xff, 0xe5, 0xfc, 0xe1, 0x93,
0x9c, 0x7b, 0xc9, 0x6c, 0x7d, 0xff, 0xcd, 0xbf, 0xcb, 0x91, 0x37, 0x9b, 0xb2, 0xf4, 0xed, 0xa6,
0x2c, 0xfd, 0x6b, 0x53, 0x96, 0xbe, 0x79, 0x5b, 0x8e, 0x7c, 0xfb, 0xb6, 0x1c, 0xf9, 0xe7, 0xdb,
0x72, 0xe4, 0xb7, 0x7c, 0xfe, 0xb1, 0xf1, 0xe7, 0xde, 0x25, 0x79, 0x90, 0x4f, 0xfe, 0x17, 0x00,
0x00, 0xff, 0xff, 0x61, 0xb5, 0x14, 0xbe, 0x53, 0x0f, 0x00, 0x00,
// 1588 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x57, 0xcd, 0x6f, 0x23, 0x49,
0x15, 0x77, 0xfb, 0xdb, 0xcf, 0x1f, 0xe9, 0xd4, 0x9a, 0xc8, 0xdb, 0xc3, 0xda, 0xbd, 0x16, 0x5a,
0x02, 0x2c, 0x0e, 0x64, 0x77, 0xc5, 0x68, 0x80, 0x83, 0x9d, 0x71, 0x62, 0x93, 0xc4, 0x8e, 0xca,
0x0e, 0x62, 0x91, 0x90, 0x55, 0xb1, 0xcb, 0xbd, 0xad, 0x6d, 0x77, 0x37, 0xdd, 0xed, 0x55, 0x2c,
0x71, 0x67, 0xe5, 0x13, 0x5c, 0x91, 0x2c, 0x21, 0x71, 0xe4, 0xce, 0xdf, 0x30, 0x37, 0xf6, 0xc8,
0xc9, 0x02, 0xcf, 0x89, 0xbf, 0x00, 0x09, 0x2e, 0xa8, 0xaa, 0xba, 0xed, 0x76, 0x32, 0x9b, 0xb5,
0xe7, 0x56, 0xf5, 0x3e, 0x7e, 0xef, 0x55, 0xbd, 0xaf, 0x2a, 0x28, 0xba, 0x9e, 0xe5, 0x10, 0x8d,
0x0e, 0x86, 0xd6, 0x64, 0x62, 0x99, 0x35, 0xdb, 0xb1, 0x3c, 0x0b, 0x3d, 0xd3, 0xcd, 0xb1, 0x31,
0xbd, 0x1f, 0x11, 0x8f, 0xd4, 0x6c, 0x83, 0x78, 0x63, 0xcb, 0x99, 0xd4, 0x7c, 0x49, 0xa5, 0xa8,
0x59, 0x9a, 0xc5, 0xe5, 0x4e, 0xd8, 0x4a, 0xa8, 0x28, 0xcf, 0x34, 0xcb, 0xd2, 0x0c, 0x7a, 0xc2,
0x77, 0x77, 0xd3, 0xf1, 0x09, 0x9d, 0xd8, 0xde, 0xcc, 0x67, 0xbe, 0xfb, 0x90, 0x49, 0xcc, 0x80,
0x75, 0x60, 0x3b, 0x74, 0xa4, 0x0f, 0x89, 0x47, 0x05, 0xa1, 0xfa, 0x6f, 0x09, 0x0e, 0x31, 0x25,
0xa3, 0x73, 0xdd, 0xf0, 0xa8, 0x83, 0xe9, 0x6f, 0xa7, 0xd4, 0xf5, 0x50, 0x13, 0xb2, 0x0e, 0x25,
0xa3, 0x81, 0x6b, 0x4d, 0x9d, 0x21, 0x2d, 0x49, 0xaa, 0x74, 0x9c, 0x3d, 0x2d, 0xd6, 0x04, 0x6e,
0x2d, 0xc0, 0xad, 0xd5, 0xcd, 0x59, 0xa3, 0xb0, 0x5a, 0x56, 0x80, 0x21, 0xf4, 0xb8, 0x2c, 0x06,
0x67, 0xbd, 0x46, 0x17, 0x90, 0x70, 0x88, 0xa9, 0xd1, 0x52, 0x94, 0x03, 0xfc, 0xa0, 0xf6, 0xc4,
0x41, 0x6b, 0x7d, 0x7d, 0x42, 0x5d, 0x8f, 0x4c, 0x6c, 0xcc, 0x54, 0x1a, 0xf1, 0x57, 0xcb, 0x4a,
0x04, 0x0b, 0x7d, 0xf4, 0x12, 0x32, 0x6b, 0xc7, 0x4b, 0x31, 0x0e, 0xf6, 0xc1, 0x93, 0x60, 0x37,
0x81, 0x34, 0xde, 0x28, 0x56, 0xff, 0x93, 0x86, 0x2c, 0xf3, 0xf4, 0x6b, 0x4e, 0x99, 0x7f, 0xcb,
0x53, 0x1a, 0x70, 0xe0, 0x05, 0xbe, 0x0f, 0xde, 0xfa, 0xbc, 0x47, 0xec, 0xbc, 0xab, 0x65, 0xa5,
0xb0, 0x4d, 0xc7, 0x05, 0x6f, 0x6b, 0x8f, 0xca, 0x00, 0x23, 0xea, 0x0e, 0xa9, 0x39, 0xd2, 0x4d,
0x8d, 0xdf, 0x45, 0x1a, 0x87, 0x28, 0xe8, 0x43, 0x00, 0xcd, 0xb1, 0xa6, 0xf6, 0xe0, 0x73, 0x3a,
0x73, 0x4b, 0x71, 0x35, 0x76, 0x9c, 0x69, 0xe4, 0x57, 0xcb, 0x4a, 0xe6, 0x82, 0x51, 0x2f, 0xe9,
0xcc, 0xc5, 0x19, 0x2d, 0x58, 0xa2, 0x97, 0x90, 0xe0, 0x9b, 0x52, 0x56, 0x95, 0x8e, 0x0b, 0xa7,
0xb5, 0x27, 0x3d, 0x0e, 0xdd, 0x5d, 0x8d, 0xa3, 0x61, 0xa1, 0xcc, 0xc2, 0x43, 0x34, 0xcd, 0xa1,
0x1a, 0x0b, 0x4f, 0x66, 0x87, 0xf0, 0xd4, 0x03, 0x69, 0xbc, 0x51, 0xdc, 0x0e, 0x72, 0xe2, 0x2d,
0x83, 0x8c, 0x4e, 0x21, 0xe7, 0x52, 0x47, 0xa7, 0xee, 0xc0, 0xd0, 0x27, 0xba, 0x57, 0x4a, 0xaa,
0xd2, 0x71, 0xac, 0x71, 0xb0, 0x5a, 0x56, 0xb2, 0x3d, 0x4e, 0xbf, 0x62, 0x64, 0x9c, 0x75, 0x37,
0x1b, 0xf4, 0x09, 0xe4, 0x7d, 0x1d, 0x6b, 0x3c, 0x76, 0xa9, 0x57, 0x4a, 0x71, 0x25, 0x79, 0xb5,
0xac, 0xe4, 0x84, 0x52, 0x97, 0xd3, 0xb1, 0x0f, 0x2d, 0x76, 0xcc, 0x94, 0x6d, 0xe9, 0xa6, 0x17,
0x98, 0x4a, 0x6f, 0x4c, 0xdd, 0x70, 0xba, 0x6f, 0xca, 0xde, 0x6c, 0x50, 0x1f, 0x12, 0x9e, 0x43,
0x86, 0xb4, 0x04, 0x6a, 0xec, 0x38, 0x7b, 0xfa, 0xd1, 0xce, 0x17, 0xde, 0x67, 0x5a, 0x4d, 0xd3,
0x73, 0x66, 0x8d, 0xcc, 0x6a, 0x59, 0x49, 0xf0, 0x3d, 0x16, 0x60, 0xe8, 0x43, 0x48, 0x7c, 0xc6,
0x6c, 0x94, 0x72, 0xaa, 0x74, 0x9c, 0x6a, 0x1c, 0x31, 0x81, 0x16, 0x23, 0xfc, 0x77, 0x59, 0xc9,
0xb0, 0xc5, 0xb9, 0x41, 0x34, 0x17, 0x0b, 0x21, 0xe5, 0x39, 0xc0, 0x06, 0x0d, 0xc9, 0x10, 0xfb,
0x9c, 0xce, 0x78, 0x8d, 0x67, 0x30, 0x5b, 0xa2, 0x22, 0x24, 0xbe, 0x20, 0xc6, 0x54, 0xa4, 0x71,
0x06, 0x8b, 0xcd, 0x8b, 0xe8, 0x73, 0xa9, 0xfa, 0x7b, 0x09, 0x12, 0x3c, 0xf2, 0xe8, 0x3d, 0x80,
0x0b, 0xdc, 0xbd, 0xbd, 0x19, 0x74, 0xba, 0x9d, 0xa6, 0x1c, 0x51, 0xf2, 0xf3, 0x85, 0x2a, 0x52,
0xac, 0x63, 0x99, 0x14, 0x3d, 0x83, 0x8c, 0x60, 0xd7, 0xaf, 0xae, 0x64, 0x49, 0xc9, 0xcd, 0x17,
0x6a, 0x9a, 0x73, 0xeb, 0x86, 0x81, 0xde, 0x85, 0xb4, 0x60, 0x36, 0x3e, 0x95, 0xa3, 0x4a, 0x76,
0xbe, 0x50, 0x53, 0x9c, 0xd7, 0x98, 0xa1, 0xf7, 0x21, 0x27, 0x58, 0xcd, 0x5f, 0x9d, 0x35, 0x6f,
0xfa, 0x72, 0x4c, 0x39, 0x98, 0x2f, 0xd4, 0x2c, 0x67, 0x37, 0xef, 0x87, 0xd4, 0xf6, 0x94, 0xf8,
0x97, 0x7f, 0x29, 0x47, 0xaa, 0x7f, 0x95, 0x60, 0x73, 0x30, 0x66, 0xae, 0xd5, 0xee, 0xf4, 0x03,
0x67, 0xb8, 0x39, 0xc6, 0xe5, 0xbe, 0x7c, 0x07, 0x0a, 0x3e, 0x73, 0x70, 0xd3, 0x6d, 0x77, 0xfa,
0x3d, 0x59, 0x52, 0xe4, 0xf9, 0x42, 0xcd, 0x09, 0x09, 0x11, 0xaa, 0xb0, 0x54, 0xaf, 0x89, 0xdb,
0xcd, 0x9e, 0x1c, 0x0d, 0x4b, 0x89, 0x34, 0x40, 0x27, 0x50, 0xe4, 0x52, 0xbd, 0xb3, 0x56, 0xf3,
0xba, 0xce, 0x4e, 0x37, 0xe8, 0xb7, 0xaf, 0x9b, 0x72, 0x5c, 0xf9, 0xd6, 0x7c, 0xa1, 0x1e, 0x32,
0xd9, 0xde, 0xf0, 0x33, 0x3a, 0x21, 0x75, 0xc3, 0x60, 0x85, 0xec, 0x7b, 0xfb, 0x77, 0x09, 0x32,
0xeb, 0x9c, 0x47, 0x2d, 0x88, 0x7b, 0x33, 0x5b, 0xb4, 0xd5, 0xc2, 0xe9, 0xc7, 0xbb, 0x55, 0xca,
0x66, 0xd5, 0x9f, 0xd9, 0x14, 0x73, 0x84, 0xea, 0x3d, 0xe4, 0xb7, 0xc8, 0xa8, 0x02, 0x71, 0xff,
0x0e, 0xb8, 0x3f, 0x5b, 0x4c, 0x7e, 0x19, 0xef, 0x41, 0xac, 0x77, 0x7b, 0x2d, 0x4b, 0x4a, 0x71,
0xbe, 0x50, 0xe5, 0x2d, 0x7e, 0x6f, 0x3a, 0x41, 0xef, 0x43, 0xe2, 0xac, 0x7b, 0xdb, 0xe9, 0xcb,
0x51, 0xe5, 0x68, 0xbe, 0x50, 0xd1, 0x96, 0xc0, 0x99, 0x35, 0x35, 0x83, 0xfb, 0xff, 0x21, 0xc4,
0xfa, 0x44, 0x0b, 0x27, 0x4f, 0xee, 0x0d, 0xc9, 0x93, 0xf3, 0x93, 0xa7, 0xfa, 0xc7, 0x02, 0xe4,
0x44, 0x36, 0xbb, 0xb6, 0x65, 0xba, 0x14, 0x5d, 0x43, 0x72, 0xec, 0x90, 0x09, 0x75, 0x4b, 0x12,
0x2f, 0x84, 0x93, 0x1d, 0x0a, 0x41, 0xa8, 0xd6, 0xce, 0x99, 0x9e, 0x3f, 0x1f, 0x7c, 0x10, 0xe5,
0xcb, 0x24, 0x24, 0x38, 0x1d, 0x5d, 0x05, 0x1d, 0x2d, 0xc5, 0x3b, 0xc8, 0xc7, 0xbb, 0xe3, 0xf2,
0x24, 0xe3, 0x20, 0xad, 0x48, 0xd0, 0xd9, 0xba, 0x90, 0x14, 0x25, 0xef, 0xcf, 0xc0, 0x4f, 0x76,
0x87, 0x13, 0x19, 0x13, 0xe0, 0xf9, 0x30, 0xc8, 0x86, 0xdc, 0xd8, 0xb0, 0x88, 0x37, 0x10, 0x4d,
0xc1, 0x9f, 0x14, 0x2f, 0xf6, 0x38, 0x3d, 0xd3, 0x16, 0x39, 0x2b, 0x2e, 0x82, 0xf7, 0x9b, 0x10,
0xb5, 0x15, 0xc1, 0xd9, 0xf1, 0x66, 0x8b, 0xee, 0xa1, 0xa0, 0x9b, 0x1e, 0xd5, 0xa8, 0x13, 0xd8,
0x14, 0x03, 0xf4, 0x67, 0xbb, 0xdb, 0x6c, 0x0b, 0xfd, 0xb0, 0xd5, 0xc3, 0xd5, 0xb2, 0x92, 0xdf,
0xa2, 0xb7, 0x22, 0x38, 0xaf, 0x87, 0x09, 0xe8, 0x77, 0x70, 0x30, 0x35, 0x5d, 0x5d, 0x33, 0xe9,
0x28, 0x30, 0x1d, 0xe7, 0xa6, 0x7f, 0xbe, 0xbb, 0xe9, 0x5b, 0x1f, 0x20, 0x6c, 0x1b, 0xb1, 0x31,
0xb9, 0xcd, 0x68, 0x45, 0x70, 0x61, 0xba, 0x45, 0x61, 0xe7, 0xbe, 0xb3, 0x2c, 0x83, 0x12, 0x33,
0x30, 0x9e, 0xd8, 0xf7, 0xdc, 0x0d, 0xa1, 0xff, 0xe8, 0xdc, 0x5b, 0x74, 0x76, 0xee, 0xbb, 0x30,
0x01, 0x79, 0x90, 0x77, 0x3d, 0x47, 0x37, 0xb5, 0xc0, 0x70, 0x92, 0x1b, 0xfe, 0xe9, 0x1e, 0xb9,
0xc3, 0xd5, 0xc3, 0x76, 0xc5, 0x2c, 0x0a, 0x91, 0x5b, 0x11, 0x9c, 0x73, 0x43, 0xfb, 0x46, 0x12,
0xe2, 0x0c, 0x59, 0xb9, 0x07, 0xd8, 0x64, 0x32, 0xfa, 0x00, 0xd2, 0x1e, 0xd1, 0xc4, 0x63, 0x80,
0x55, 0x5a, 0xae, 0x91, 0x5d, 0x2d, 0x2b, 0xa9, 0x3e, 0xd1, 0xf8, 0x53, 0x20, 0xe5, 0x89, 0x05,
0x6a, 0x00, 0xb2, 0x89, 0xe3, 0xe9, 0x9e, 0x6e, 0x99, 0x4c, 0x7a, 0xf0, 0x05, 0x31, 0x58, 0x76,
0x32, 0x8d, 0xe2, 0x6a, 0x59, 0x91, 0x6f, 0x02, 0xee, 0x25, 0x9d, 0xfd, 0x92, 0x18, 0x2e, 0x96,
0xed, 0x07, 0x14, 0xe5, 0x4f, 0x12, 0x64, 0x43, 0x59, 0x8f, 0x5e, 0x40, 0xdc, 0x23, 0x5a, 0x50,
0xe1, 0xea, 0xd3, 0xaf, 0x21, 0xa2, 0xf9, 0x25, 0xcd, 0x75, 0x50, 0x17, 0x32, 0x4c, 0x70, 0xc0,
0x1b, 0x65, 0x94, 0x37, 0xca, 0xd3, 0xdd, 0xef, 0xef, 0x25, 0xf1, 0x08, 0x6f, 0x93, 0xe9, 0x91,
0xbf, 0x52, 0x7e, 0x01, 0xf2, 0xc3, 0xd2, 0x61, 0x6f, 0xa9, 0xf5, 0xeb, 0x4a, 0xb8, 0x29, 0xe3,
0x10, 0x05, 0x1d, 0x41, 0x92, 0xb7, 0x2f, 0x71, 0x11, 0x12, 0xf6, 0x77, 0xca, 0x15, 0xa0, 0xc7,
0x25, 0xb1, 0x27, 0x5a, 0x6c, 0x8d, 0x76, 0x0d, 0xef, 0xbc, 0x21, 0xcb, 0xf7, 0x84, 0x8b, 0x87,
0x9d, 0x7b, 0x9c, 0xb7, 0x7b, 0xa2, 0xa5, 0xd7, 0x68, 0x97, 0x70, 0xf8, 0x28, 0x19, 0xf7, 0x04,
0xcb, 0x04, 0x60, 0xd5, 0x1e, 0x64, 0x38, 0x80, 0x3f, 0xaa, 0x92, 0xfe, 0xa0, 0x8d, 0x28, 0xef,
0xcc, 0x17, 0xea, 0xc1, 0x9a, 0xe5, 0xcf, 0xda, 0x0a, 0x24, 0xd7, 0xf3, 0x7a, 0x5b, 0x40, 0xf8,
0xe2, 0x4f, 0xa2, 0xbf, 0x49, 0x90, 0x0e, 0xe2, 0x8d, 0xbe, 0x0d, 0x89, 0xf3, 0xab, 0x6e, 0xbd,
0x2f, 0x47, 0x94, 0xc3, 0xf9, 0x42, 0xcd, 0x07, 0x0c, 0x1e, 0x7a, 0xa4, 0x42, 0xaa, 0xdd, 0xe9,
0x37, 0x2f, 0x9a, 0x38, 0x80, 0x0c, 0xf8, 0x7e, 0x38, 0x51, 0x15, 0xd2, 0xb7, 0x9d, 0x5e, 0xfb,
0xa2, 0xd3, 0x7c, 0x29, 0x47, 0xc5, 0x8c, 0x0c, 0x44, 0x82, 0x18, 0x31, 0x94, 0x46, 0xb7, 0x7b,
0xd5, 0xac, 0x77, 0xe4, 0xd8, 0x36, 0x8a, 0x7f, 0xef, 0xa8, 0x0c, 0xc9, 0x5e, 0x1f, 0xb7, 0x3b,
0x17, 0x72, 0x5c, 0x41, 0xf3, 0x85, 0x5a, 0x08, 0x04, 0xc4, 0x55, 0xfa, 0x8e, 0xff, 0x59, 0x82,
0xe2, 0x19, 0xb1, 0xc9, 0x9d, 0x6e, 0xe8, 0x9e, 0x4e, 0xdd, 0xf5, 0x6c, 0xec, 0x42, 0x7c, 0x48,
0xec, 0xa0, 0x6e, 0x9e, 0x6e, 0x1b, 0x6f, 0x02, 0x60, 0x44, 0x97, 0x3f, 0xee, 0x30, 0x07, 0x52,
0x7e, 0x02, 0x99, 0x35, 0x69, 0xaf, 0xf7, 0xde, 0x01, 0xe4, 0xf9, 0x33, 0x32, 0x40, 0xae, 0x3e,
0x87, 0x07, 0xff, 0x13, 0xa6, 0xec, 0x7a, 0xc4, 0xf1, 0x38, 0x60, 0x0c, 0x8b, 0x0d, 0x33, 0x42,
0xcd, 0x11, 0x07, 0x8c, 0x61, 0xb6, 0x3c, 0xfd, 0x5f, 0x14, 0x52, 0x3d, 0xe1, 0x34, 0xfa, 0x0d,
0xc4, 0x59, 0xb9, 0xa2, 0xe3, 0x5d, 0x5f, 0xbf, 0xca, 0xf7, 0x76, 0xae, 0xfd, 0x1f, 0x49, 0x48,
0x07, 0xd8, 0x7c, 0x69, 0xd1, 0x37, 0xff, 0x69, 0xb6, 0xfe, 0xbe, 0xfb, 0x99, 0xfa, 0x14, 0x72,
0xe1, 0x08, 0xa0, 0xa3, 0x47, 0xbf, 0xc7, 0x26, 0xfb, 0x98, 0x2b, 0x3f, 0xde, 0x3b, 0x88, 0xe8,
0x12, 0xc4, 0x13, 0xfe, 0x6b, 0x31, 0xbf, 0xff, 0x24, 0xe6, 0x56, 0xdc, 0x1a, 0xdf, 0x7d, 0xf5,
0xaf, 0x72, 0xe4, 0xd5, 0xaa, 0x2c, 0x7d, 0xb5, 0x2a, 0x4b, 0xff, 0x5c, 0x95, 0xa5, 0x3f, 0xbc,
0x2e, 0x47, 0xbe, 0x7a, 0x5d, 0x8e, 0xfc, 0xe3, 0x75, 0x39, 0xf2, 0x6b, 0xde, 0x6a, 0x59, 0xa7,
0x75, 0xef, 0x92, 0xdc, 0xc8, 0x47, 0xff, 0x0f, 0x00, 0x00, 0xff, 0xff, 0xf8, 0xcb, 0x1e, 0x6b,
0xaa, 0x10, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -1187,6 +1230,8 @@ const _ = grpc.SupportPackageIsVersion4
type StorageClient interface {
// Read performs a read operation using the given ReadRequest
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (Storage_ReadClient, error)
// ReadFilter performs a filter operation at storage
ReadFilter(ctx context.Context, in *ReadFilterRequest, opts ...grpc.CallOption) (Storage_ReadFilterClient, error)
// Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine
Capabilities(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*CapabilitiesResponse, error)
Hints(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*HintsResponse, error)
@ -1232,6 +1277,38 @@ func (x *storageReadClient) Recv() (*ReadResponse, error) {
return m, nil
}
func (c *storageClient) ReadFilter(ctx context.Context, in *ReadFilterRequest, opts ...grpc.CallOption) (Storage_ReadFilterClient, error) {
stream, err := c.cc.NewStream(ctx, &_Storage_serviceDesc.Streams[1], "/influxdata.platform.storage.Storage/ReadFilter", opts...)
if err != nil {
return nil, err
}
x := &storageReadFilterClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type Storage_ReadFilterClient interface {
Recv() (*ReadResponse, error)
grpc.ClientStream
}
type storageReadFilterClient struct {
grpc.ClientStream
}
func (x *storageReadFilterClient) Recv() (*ReadResponse, error) {
m := new(ReadResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *storageClient) Capabilities(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*CapabilitiesResponse, error) {
out := new(CapabilitiesResponse)
err := c.cc.Invoke(ctx, "/influxdata.platform.storage.Storage/Capabilities", in, out, opts...)
@ -1254,6 +1331,8 @@ func (c *storageClient) Hints(ctx context.Context, in *types.Empty, opts ...grpc
type StorageServer interface {
// Read performs a read operation using the given ReadRequest
Read(*ReadRequest, Storage_ReadServer) error
// ReadFilter performs a filter operation at storage
ReadFilter(*ReadFilterRequest, Storage_ReadFilterServer) error
// Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine
Capabilities(context.Context, *types.Empty) (*CapabilitiesResponse, error)
Hints(context.Context, *types.Empty) (*HintsResponse, error)
@ -1284,6 +1363,27 @@ func (x *storageReadServer) Send(m *ReadResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Storage_ReadFilter_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(ReadFilterRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(StorageServer).ReadFilter(m, &storageReadFilterServer{stream})
}
type Storage_ReadFilterServer interface {
Send(*ReadResponse) error
grpc.ServerStream
}
type storageReadFilterServer struct {
grpc.ServerStream
}
func (x *storageReadFilterServer) Send(m *ReadResponse) error {
return x.ServerStream.SendMsg(m)
}
func _Storage_Capabilities_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(types.Empty)
if err := dec(in); err != nil {
@ -1339,10 +1439,61 @@ var _Storage_serviceDesc = grpc.ServiceDesc{
Handler: _Storage_Read_Handler,
ServerStreams: true,
},
{
StreamName: "ReadFilter",
Handler: _Storage_ReadFilter_Handler,
ServerStreams: true,
},
},
Metadata: "storage_common.proto",
}
func (m *ReadFilterRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReadFilterRequest) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if m.ReadSource != nil {
dAtA[i] = 0xa
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.ReadSource.Size()))
n1, err := m.ReadSource.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
}
dAtA[i] = 0x12
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.Range.Size()))
n2, err := m.Range.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n2
if m.Predicate != nil {
dAtA[i] = 0x1a
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.Predicate.Size()))
n3, err := m.Predicate.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n3
}
return i, nil
}
func (m *ReadRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
@ -1361,11 +1512,11 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x12
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.TimestampRange.Size()))
n1, err := m.TimestampRange.MarshalTo(dAtA[i:])
n4, err := m.TimestampRange.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n1
i += n4
if m.Descending {
dAtA[i] = 0x18
i++
@ -1395,11 +1546,11 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x2a
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.Predicate.Size()))
n2, err := m.Predicate.MarshalTo(dAtA[i:])
n5, err := m.Predicate.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n2
i += n5
}
if m.SeriesLimit != 0 {
dAtA[i] = 0x30
@ -1420,11 +1571,11 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x4a
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.Aggregate.Size()))
n3, err := m.Aggregate.MarshalTo(dAtA[i:])
n6, err := m.Aggregate.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n3
i += n6
}
if len(m.Trace) > 0 {
for k, _ := range m.Trace {
@ -1458,11 +1609,11 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x6a
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.ReadSource.Size()))
n4, err := m.ReadSource.MarshalTo(dAtA[i:])
n7, err := m.ReadSource.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n4
i += n7
}
return i, nil
}
@ -1566,11 +1717,11 @@ func (m *ReadResponse_Frame) MarshalTo(dAtA []byte) (int, error) {
var l int
_ = l
if m.Data != nil {
nn5, err := m.Data.MarshalTo(dAtA[i:])
nn8, err := m.Data.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += nn5
i += nn8
}
return i, nil
}
@ -1581,11 +1732,11 @@ func (m *ReadResponse_Frame_Series) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0xa
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.Series.Size()))
n6, err := m.Series.MarshalTo(dAtA[i:])
n9, err := m.Series.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n6
i += n9
}
return i, nil
}
@ -1595,11 +1746,11 @@ func (m *ReadResponse_Frame_FloatPoints) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x12
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.FloatPoints.Size()))
n7, err := m.FloatPoints.MarshalTo(dAtA[i:])
n10, err := m.FloatPoints.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n7
i += n10
}
return i, nil
}
@ -1609,11 +1760,11 @@ func (m *ReadResponse_Frame_IntegerPoints) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x1a
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.IntegerPoints.Size()))
n8, err := m.IntegerPoints.MarshalTo(dAtA[i:])
n11, err := m.IntegerPoints.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n8
i += n11
}
return i, nil
}
@ -1623,11 +1774,11 @@ func (m *ReadResponse_Frame_UnsignedPoints) MarshalTo(dAtA []byte) (int, error)
dAtA[i] = 0x22
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.UnsignedPoints.Size()))
n9, err := m.UnsignedPoints.MarshalTo(dAtA[i:])
n12, err := m.UnsignedPoints.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n9
i += n12
}
return i, nil
}
@ -1637,11 +1788,11 @@ func (m *ReadResponse_Frame_BooleanPoints) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x2a
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.BooleanPoints.Size()))
n10, err := m.BooleanPoints.MarshalTo(dAtA[i:])
n13, err := m.BooleanPoints.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n10
i += n13
}
return i, nil
}
@ -1651,11 +1802,11 @@ func (m *ReadResponse_Frame_StringPoints) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x32
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.StringPoints.Size()))
n11, err := m.StringPoints.MarshalTo(dAtA[i:])
n14, err := m.StringPoints.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n11
i += n14
}
return i, nil
}
@ -1665,11 +1816,11 @@ func (m *ReadResponse_Frame_Group) MarshalTo(dAtA []byte) (int, error) {
dAtA[i] = 0x3a
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(m.Group.Size()))
n12, err := m.Group.MarshalTo(dAtA[i:])
n15, err := m.Group.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n12
i += n15
}
return i, nil
}
@ -1771,8 +1922,8 @@ func (m *ReadResponse_FloatPointsFrame) MarshalTo(dAtA []byte) (int, error) {
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(len(m.Values)*8))
for _, num := range m.Values {
f13 := math.Float64bits(float64(num))
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(f13))
f16 := math.Float64bits(float64(num))
encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(f16))
i += 8
}
}
@ -1804,22 +1955,22 @@ func (m *ReadResponse_IntegerPointsFrame) MarshalTo(dAtA []byte) (int, error) {
}
}
if len(m.Values) > 0 {
dAtA15 := make([]byte, len(m.Values)*10)
var j14 int
dAtA18 := make([]byte, len(m.Values)*10)
var j17 int
for _, num1 := range m.Values {
num := uint64(num1)
for num >= 1<<7 {
dAtA15[j14] = uint8(uint64(num)&0x7f | 0x80)
dAtA18[j17] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
j14++
j17++
}
dAtA15[j14] = uint8(num)
j14++
dAtA18[j17] = uint8(num)
j17++
}
dAtA[i] = 0x12
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(j14))
i += copy(dAtA[i:], dAtA15[:j14])
i = encodeVarintStorageCommon(dAtA, i, uint64(j17))
i += copy(dAtA[i:], dAtA18[:j17])
}
return i, nil
}
@ -1849,21 +2000,21 @@ func (m *ReadResponse_UnsignedPointsFrame) MarshalTo(dAtA []byte) (int, error) {
}
}
if len(m.Values) > 0 {
dAtA17 := make([]byte, len(m.Values)*10)
var j16 int
dAtA20 := make([]byte, len(m.Values)*10)
var j19 int
for _, num := range m.Values {
for num >= 1<<7 {
dAtA17[j16] = uint8(uint64(num)&0x7f | 0x80)
dAtA20[j19] = uint8(uint64(num)&0x7f | 0x80)
num >>= 7
j16++
j19++
}
dAtA17[j16] = uint8(num)
j16++
dAtA20[j19] = uint8(num)
j19++
}
dAtA[i] = 0x12
i++
i = encodeVarintStorageCommon(dAtA, i, uint64(j16))
i += copy(dAtA[i:], dAtA17[:j16])
i = encodeVarintStorageCommon(dAtA, i, uint64(j19))
i += copy(dAtA[i:], dAtA20[:j19])
}
return i, nil
}
@ -2040,6 +2191,25 @@ func encodeVarintStorageCommon(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *ReadFilterRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.ReadSource != nil {
l = m.ReadSource.Size()
n += 1 + l + sovStorageCommon(uint64(l))
}
l = m.Range.Size()
n += 1 + l + sovStorageCommon(uint64(l))
if m.Predicate != nil {
l = m.Predicate.Size()
n += 1 + l + sovStorageCommon(uint64(l))
}
return n
}
func (m *ReadRequest) Size() (n int) {
if m == nil {
return 0
@ -2414,6 +2584,164 @@ func sovStorageCommon(x uint64) (n int) {
func sozStorageCommon(x uint64) (n int) {
return sovStorageCommon(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *ReadFilterRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorageCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ReadFilterRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ReadFilterRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ReadSource", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorageCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthStorageCommon
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthStorageCommon
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.ReadSource == nil {
m.ReadSource = &types.Any{}
}
if err := m.ReadSource.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Range", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorageCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthStorageCommon
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthStorageCommon
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := m.Range.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Predicate", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowStorageCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthStorageCommon
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthStorageCommon
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Predicate == nil {
m.Predicate = &Predicate{}
}
if err := m.Predicate.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipStorageCommon(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthStorageCommon
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthStorageCommon
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ReadRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0

View File

@ -16,6 +16,9 @@ service Storage {
// Read performs a read operation using the given ReadRequest
rpc Read (ReadRequest) returns (stream ReadResponse);
// ReadFilter performs a filter operation at storage
rpc ReadFilter (ReadFilterRequest) returns (stream ReadResponse);
// Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine
rpc Capabilities (google.protobuf.Empty) returns (CapabilitiesResponse);
@ -25,6 +28,12 @@ service Storage {
// rpc Explain(google.protobuf.Empty) returns (ExplainResponse){}
}
message ReadFilterRequest {
google.protobuf.Any read_source = 1 [(gogoproto.customname) = "ReadSource"];
TimestampRange range = 2 [(gogoproto.nullable) = false];
Predicate predicate = 3;
}
// Request message for Storage.Read.
message ReadRequest {
enum Group {

View File

@ -52,8 +52,138 @@ func (r *storeReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, sto
}, nil
}
func (r *storeReader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) {
return &simpleTableIterator{
ctx: ctx,
s: r.s,
spec: spec,
alloc: alloc,
}, nil
}
func (r *storeReader) Close() {}
type simpleTableIterator struct {
ctx context.Context
s Store
spec influxdb.ReadFilterSpec
stats cursors.CursorStats
alloc *memory.Allocator
}
func (bi *simpleTableIterator) Statistics() cursors.CursorStats { return bi.stats }
func (bi *simpleTableIterator) Do(f func(flux.Table) error) error {
orgID := uint64(bi.spec.OrganizationID)
bucketID := uint64(bi.spec.BucketID)
src := bi.s.GetSourceFrom(orgID, bucketID)
// Setup read request
any, err := types.MarshalAny(src)
if err != nil {
return err
}
var predicate *datatypes.Predicate
if bi.spec.Predicate != nil {
p, err := toStoragePredicate(bi.spec.Predicate)
if err != nil {
return err
}
predicate = p
}
var req datatypes.ReadFilterRequest
req.ReadSource = any
req.Predicate = predicate
req.Range.Start = int64(bi.spec.Bounds.Start)
req.Range.End = int64(bi.spec.Bounds.Stop)
rs, err := bi.s.ReadFilter(bi.ctx, &req)
if err != nil {
return err
}
if rs == nil {
return nil
}
return bi.handleRead(f, rs)
}
func (bi *simpleTableIterator) handleRead(f func(flux.Table) error, rs ResultSet) error {
// these resources must be closed if not nil on return
var (
cur cursors.Cursor
table storageTable
)
defer func() {
if table != nil {
table.Close()
}
if cur != nil {
cur.Close()
}
rs.Close()
}()
READ:
for rs.Next() {
cur = rs.Cursor()
if cur == nil {
// no data for series key + field combination
continue
}
bnds := bi.spec.Bounds
key := defaultGroupKeyForSeries(rs.Tags(), bnds)
done := make(chan struct{})
switch typedCur := cur.(type) {
case cursors.IntegerArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt)
table = newIntegerTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc)
case cursors.FloatArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat)
table = newFloatTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc)
case cursors.UnsignedArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt)
table = newUnsignedTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc)
case cursors.BooleanArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool)
table = newBooleanTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc)
case cursors.StringArrayCursor:
cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString)
table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc)
default:
panic(fmt.Sprintf("unreachable: %T", typedCur))
}
cur = nil
if !table.Empty() {
if err := f(table); err != nil {
table.Close()
table = nil
return err
}
select {
case <-done:
case <-bi.ctx.Done():
table.Cancel()
break READ
}
}
stats := table.Statistics()
bi.stats.ScannedValues += stats.ScannedValues
bi.stats.ScannedBytes += stats.ScannedBytes
table.Close()
table = nil
}
return rs.Err()
}
type tableIterator struct {
ctx context.Context
bounds execute.Bounds
@ -442,6 +572,29 @@ func determineTableColsForSeries(tags models.Tags, typ flux.ColType) ([]flux.Col
return cols, defs
}
func defaultGroupKeyForSeries(tags models.Tags, bnds execute.Bounds) flux.GroupKey {
cols := make([]flux.ColMeta, 2, len(tags))
vs := make([]values.Value, 2, len(tags))
cols[0] = flux.ColMeta{
Label: execute.DefaultStartColLabel,
Type: flux.TTime,
}
vs[0] = values.NewTime(bnds.Start)
cols[1] = flux.ColMeta{
Label: execute.DefaultStopColLabel,
Type: flux.TTime,
}
vs[1] = values.NewTime(bnds.Stop)
for i := range tags {
cols = append(cols, flux.ColMeta{
Label: string(tags[i].Key),
Type: flux.TString,
})
vs = append(vs, values.NewString(string(tags[i].Value)))
}
return execute.NewGroupKey(cols, vs)
}
func groupKeyForSeries(tags models.Tags, readSpec *influxdb.ReadSpec, bnds execute.Bounds) flux.GroupKey {
cols := make([]flux.ColMeta, 2, len(tags))
vs := make([]values.Value, 2, len(tags))

View File

@ -2,6 +2,7 @@ package reads
import (
"context"
"math"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage/reads/datatypes"
@ -30,6 +31,14 @@ func NewResultSet(ctx context.Context, req *datatypes.ReadRequest, cur SeriesCur
}
}
func NewResultSetFromFilter(ctx context.Context, req *datatypes.ReadFilterRequest, cur SeriesCursor) ResultSet {
return &resultSet{
ctx: ctx,
cur: cur,
mb: newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true, math.MaxInt64),
}
}
func (r *resultSet) Err() error { return nil }
// Close closes the result set. Close is idempotent.

View File

@ -76,6 +76,8 @@ type GroupCursor interface {
type Store interface {
Read(ctx context.Context, req *datatypes.ReadRequest) (ResultSet, error)
ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (ResultSet, error)
GroupRead(ctx context.Context, req *datatypes.ReadRequest) (GroupResultSet, error)
GetSource(rs influxdb.ReadSpec) (proto.Message, error)
GetSourceFrom(orgID, bucketID uint64) proto.Message
}

View File

@ -35,7 +35,7 @@ type indexSeriesCursor struct {
hasValueExpr bool
}
func newIndexSeriesCursor(ctx context.Context, src *readSource, req *datatypes.ReadRequest, engine *storage.Engine) (*indexSeriesCursor, error) {
func newIndexSeriesCursor(ctx context.Context, src *readSource, predicate *datatypes.Predicate, engine *storage.Engine) (*indexSeriesCursor, error) {
queries, err := engine.CreateCursorIterator(ctx)
if err != nil {
return nil, err
@ -59,7 +59,7 @@ func newIndexSeriesCursor(ctx context.Context, src *readSource, req *datatypes.R
}
p := &indexSeriesCursor{row: reads.SeriesRow{Query: tsdb.CursorIterators{queries}}}
if root := req.Predicate.GetRoot(); root != nil {
if root := predicate.GetRoot(); root != nil {
if p.cond, err = reads.NodeToExpr(root, nil); err != nil {
return nil, err
}

View File

@ -22,6 +22,28 @@ func newStore(engine *storage.Engine) *store {
return &store{engine: engine}
}
func (s *store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error) {
if req.ReadSource == nil {
return nil, errors.New("missing read source")
}
var source readSource
if err := types.UnmarshalAny(req.ReadSource, &source); err != nil {
return nil, err
}
var cur reads.SeriesCursor
if ic, err := newIndexSeriesCursor(ctx, &source, req.Predicate, s.engine); err != nil {
return nil, err
} else if ic == nil {
return nil, nil
} else {
cur = ic
}
return reads.NewResultSetFromFilter(ctx, req, cur), nil
}
func (s *store) Read(ctx context.Context, req *datatypes.ReadRequest) (reads.ResultSet, error) {
if len(req.GroupKeys) > 0 {
panic("Read: len(Grouping) > 0")
@ -49,7 +71,7 @@ func (s *store) Read(ctx context.Context, req *datatypes.ReadRequest) (reads.Res
}
var cur reads.SeriesCursor
if ic, err := newIndexSeriesCursor(ctx, source, req, s.engine); err != nil {
if ic, err := newIndexSeriesCursor(ctx, source, req.Predicate, s.engine); err != nil {
return nil, err
} else if ic == nil {
return nil, nil
@ -91,7 +113,7 @@ func (s *store) GroupRead(ctx context.Context, req *datatypes.ReadRequest) (read
}
newCursor := func() (reads.SeriesCursor, error) {
cur, err := newIndexSeriesCursor(ctx, source, req, s.engine)
cur, err := newIndexSeriesCursor(ctx, source, req.Predicate, s.engine)
if cur == nil || err != nil {
return nil, err
}
@ -120,6 +142,13 @@ func (s *store) GetSource(rs influxdb.ReadSpec) (proto.Message, error) {
}, nil
}
func (s *store) GetSourceFrom(orgID, bucketID uint64) proto.Message {
return &readSource{
BucketID: bucketID,
OrganizationID: orgID,
}
}
func getReadSource(req *datatypes.ReadRequest) (*readSource, error) {
if req.ReadSource == nil {
return nil, errors.New("missing read source")