From 8078b915fd02ee26953d7a9e3095e7c7145e9d9b Mon Sep 17 00:00:00 2001 From: jlapacik Date: Tue, 2 Apr 2019 14:03:25 -0700 Subject: [PATCH] refactor(storage): ReadFilter storage operation --- query/stdlib/influxdata/influxdb/operators.go | 55 ++ query/stdlib/influxdata/influxdb/rules.go | 37 + .../stdlib/influxdata/influxdb/rules_test.go | 120 ++++ query/stdlib/influxdata/influxdb/source.go | 158 +++++ query/stdlib/influxdata/influxdb/storage.go | 69 +- storage/reads/datatypes/storage_common.pb.go | 648 +++++++++++++----- storage/reads/datatypes/storage_common.proto | 9 + storage/reads/reader.go | 153 +++++ storage/reads/resultset.go | 9 + storage/reads/store.go | 2 + storage/readservice/cursor.go | 4 +- storage/readservice/store.go | 33 +- 12 files changed, 1082 insertions(+), 215 deletions(-) create mode 100644 query/stdlib/influxdata/influxdb/operators.go create mode 100644 query/stdlib/influxdata/influxdb/rules.go create mode 100644 query/stdlib/influxdata/influxdb/rules_test.go create mode 100644 query/stdlib/influxdata/influxdb/source.go diff --git a/query/stdlib/influxdata/influxdb/operators.go b/query/stdlib/influxdata/influxdb/operators.go new file mode 100644 index 0000000000..20cca47d80 --- /dev/null +++ b/query/stdlib/influxdata/influxdb/operators.go @@ -0,0 +1,55 @@ +package influxdb + +import ( + "fmt" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/plan" + "github.com/influxdata/flux/values" +) + +const ReadRangePhysKind = "ReadRangePhysKind" + +type ReadRangePhysSpec struct { + plan.DefaultCost + + Bucket string + BucketID string + + Bounds flux.Bounds +} + +func (s *ReadRangePhysSpec) Kind() plan.ProcedureKind { + return ReadRangePhysKind +} +func (s *ReadRangePhysSpec) Copy() plan.ProcedureSpec { + ns := new(ReadRangePhysSpec) + + ns.Bucket = s.Bucket + ns.BucketID = s.BucketID + + ns.Bounds = s.Bounds + + return ns +} + +func (s *ReadRangePhysSpec) PostPhysicalValidate(id plan.NodeID) error { + if s.Bounds.Start.IsZero() && s.Bounds.Stop.IsZero() { + var bucket string + if len(s.Bucket) > 0 { + bucket = s.Bucket + } else { + bucket = s.BucketID + } + return fmt.Errorf(`%s: results from "%s" must be bounded`, id, bucket) + } + return nil +} + +// TimeBounds implements plan.BoundsAwareProcedureSpec. +func (s *ReadRangePhysSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds { + return &plan.Bounds{ + Start: values.ConvertTime(s.Bounds.Start.Time(s.Bounds.Now)), + Stop: values.ConvertTime(s.Bounds.Stop.Time(s.Bounds.Now)), + } +} diff --git a/query/stdlib/influxdata/influxdb/rules.go b/query/stdlib/influxdata/influxdb/rules.go new file mode 100644 index 0000000000..64a4229ddf --- /dev/null +++ b/query/stdlib/influxdata/influxdb/rules.go @@ -0,0 +1,37 @@ +package influxdb + +import ( + "github.com/influxdata/flux/plan" + "github.com/influxdata/flux/stdlib/universe" +) + +// func init() { +// plan.RegisterPhysicalRules( +// PushDownRangeRule{}, +// ) +// } + +// PushDownRangeRule pushes down a range filter to storage +type PushDownRangeRule struct{} + +func (rule PushDownRangeRule) Name() string { + return "PushDownRangeRule" +} + +// Pattern matches 'from |> range' +func (rule PushDownRangeRule) Pattern() plan.Pattern { + return plan.Pat(universe.RangeKind, plan.Pat(FromKind)) +} + +// Rewrite converts 'from |> range' into 'ReadRange' +func (rule PushDownRangeRule) Rewrite(node plan.Node) (plan.Node, bool, error) { + fromNode := node.Predecessors()[0] + fromSpec := fromNode.ProcedureSpec().(*FromProcedureSpec) + + rangeSpec := node.ProcedureSpec().(*universe.RangeProcedureSpec) + return plan.CreatePhysicalNode("ReadRange", &ReadRangePhysSpec{ + Bucket: fromSpec.Bucket, + BucketID: fromSpec.BucketID, + Bounds: rangeSpec.Bounds, + }), true, nil +} diff --git a/query/stdlib/influxdata/influxdb/rules_test.go b/query/stdlib/influxdata/influxdb/rules_test.go new file mode 100644 index 0000000000..9f1c59ce14 --- /dev/null +++ b/query/stdlib/influxdata/influxdb/rules_test.go @@ -0,0 +1,120 @@ +package influxdb_test + +import ( + "testing" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/plan" + "github.com/influxdata/flux/plan/plantest" + "github.com/influxdata/flux/stdlib/universe" + "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb" +) + +func TestPushDownRangeRule(t *testing.T) { + fromSpec := influxdb.FromProcedureSpec{ + Bucket: "my-bucket", + } + rangeSpec := universe.RangeProcedureSpec{ + Bounds: flux.Bounds{ + Start: fluxTime(5), + Stop: fluxTime(10), + }, + } + readRangeSpec := influxdb.ReadRangePhysSpec{ + Bucket: "my-bucket", + Bounds: flux.Bounds{ + Start: fluxTime(5), + Stop: fluxTime(10), + }, + } + + tests := []plantest.RuleTestCase{ + { + Name: "simple", + // from -> range => ReadRange + Rules: []plan.Rule{ + influxdb.PushDownRangeRule{}, + }, + Before: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreateLogicalNode("from", &fromSpec), + plan.CreateLogicalNode("range", &rangeSpec), + }, + Edges: [][2]int{{0, 1}}, + }, + After: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("ReadRange", &readRangeSpec), + }, + }, + }, + { + Name: "with successor", + // from -> range -> count => ReadRange -> count + Rules: []plan.Rule{ + influxdb.PushDownRangeRule{}, + }, + Before: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreateLogicalNode("from", &fromSpec), + plan.CreateLogicalNode("range", &rangeSpec), + plan.CreatePhysicalNode("count", &universe.CountProcedureSpec{}), + }, + Edges: [][2]int{ + {0, 1}, + {1, 2}, + }, + }, + After: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("ReadRange", &readRangeSpec), + plan.CreatePhysicalNode("count", &universe.CountProcedureSpec{}), + }, + Edges: [][2]int{{0, 1}}, + }, + }, + { + Name: "with multiple successors", + // count mean + // \ / count mean + // range => \ / + // | ReadRange + // from + Rules: []plan.Rule{ + influxdb.PushDownRangeRule{}, + }, + Before: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreateLogicalNode("from", &fromSpec), + plan.CreateLogicalNode("range", &rangeSpec), + plan.CreatePhysicalNode("count", &universe.CountProcedureSpec{}), + plan.CreatePhysicalNode("mean", &universe.MeanProcedureSpec{}), + }, + Edges: [][2]int{ + {0, 1}, + {1, 2}, + {1, 3}, + }, + }, + After: &plantest.PlanSpec{ + Nodes: []plan.Node{ + plan.CreatePhysicalNode("ReadRange", &readRangeSpec), + plan.CreatePhysicalNode("count", &universe.CountProcedureSpec{}), + plan.CreatePhysicalNode("mean", &universe.MeanProcedureSpec{}), + }, + Edges: [][2]int{ + {0, 1}, + {0, 2}, + }, + }, + }, + } + + for _, tc := range tests { + tc := tc + t.Run(tc.Name, func(t *testing.T) { + t.Parallel() + plantest.PhysicalRuleTestHelper(t, &tc) + }) + } +} diff --git a/query/stdlib/influxdata/influxdb/source.go b/query/stdlib/influxdata/influxdb/source.go new file mode 100644 index 0000000000..4bc87750b5 --- /dev/null +++ b/query/stdlib/influxdata/influxdb/source.go @@ -0,0 +1,158 @@ +package influxdb + +import ( + "context" + "errors" + "fmt" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/memory" + "github.com/influxdata/flux/plan" + platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/tsdb/cursors" +) + +func init() { + execute.RegisterSource(ReadRangePhysKind, createReadFilterSource) +} + +type runner interface { + run(ctx context.Context) error +} + +type Source struct { + id execute.DatasetID + ts []execute.Transformation + + alloc *memory.Allocator + stats cursors.CursorStats + + runner runner +} + +func (s *Source) Run(ctx context.Context) { + err := s.runner.run(ctx) + for _, t := range s.ts { + t.Finish(s.id, err) + } +} + +func (s *Source) AddTransformation(t execute.Transformation) { + s.ts = append(s.ts, t) +} + +func (s *Source) Metadata() flux.Metadata { + return flux.Metadata{ + "influxdb/scanned-bytes": []interface{}{s.stats.ScannedBytes}, + "influxdb/scanned-values": []interface{}{s.stats.ScannedValues}, + } +} + +func (s *Source) processTables(ctx context.Context, tables TableIterator, watermark execute.Time) error { + err := tables.Do(func(tbl flux.Table) error { + for _, t := range s.ts { + if err := t.Process(s.id, tbl); err != nil { + return err + } + //TODO(nathanielc): Also add mechanism to send UpdateProcessingTime calls, when no data is arriving. + // This is probably not needed for this source, but other sources should do so. + if err := t.UpdateProcessingTime(s.id, execute.Now()); err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + + // Track the number of bytes and values scanned. + stats := tables.Statistics() + s.stats.ScannedValues += stats.ScannedValues + s.stats.ScannedBytes += stats.ScannedBytes + + for _, t := range s.ts { + if err := t.UpdateWatermark(s.id, watermark); err != nil { + return err + } + } + return nil +} + +type readFilterSource struct { + Source + reader Reader + readSpec ReadFilterSpec +} + +func ReadFilterSource(id execute.DatasetID, r Reader, readSpec ReadFilterSpec, alloc *memory.Allocator) execute.Source { + src := new(readFilterSource) + + src.id = id + src.alloc = alloc + + src.reader = r + src.readSpec = readSpec + + src.runner = src + return src +} + +func (s *readFilterSource) run(ctx context.Context) error { + stop := s.readSpec.Bounds.Stop + tables, err := s.reader.ReadFilter( + ctx, + s.readSpec, + s.alloc, + ) + if err != nil { + return err + } + return s.processTables(ctx, tables, stop) +} + +func createReadFilterSource(s plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error) { + spec := s.(*ReadRangePhysSpec) + + bounds := a.StreamContext().Bounds() + if bounds == nil { + return nil, errors.New("nil bounds passed to from") + } + + deps := a.Dependencies()[FromKind].(Dependencies) + + req := query.RequestFromContext(a.Context()) + if req == nil { + return nil, errors.New("missing request on context") + } + + orgID := req.OrganizationID + var bucketID platform.ID + // Determine bucketID + switch { + case spec.Bucket != "": + b, ok := deps.BucketLookup.Lookup(orgID, spec.Bucket) + if !ok { + return nil, fmt.Errorf("could not find bucket %q", spec.Bucket) + } + bucketID = b + case len(spec.BucketID) != 0: + err := bucketID.DecodeFromString(spec.BucketID) + if err != nil { + return nil, err + } + } + + return ReadFilterSource( + id, + deps.Reader, + ReadFilterSpec{ + OrganizationID: orgID, + BucketID: bucketID, + Bounds: *bounds, + }, + a.Allocator(), + ), nil +} diff --git a/query/stdlib/influxdata/influxdb/storage.go b/query/stdlib/influxdata/influxdb/storage.go index 0e7942ad88..1a4a2d794d 100644 --- a/query/stdlib/influxdata/influxdb/storage.go +++ b/query/stdlib/influxdata/influxdb/storage.go @@ -67,81 +67,38 @@ func (l StaticLookup) Watch() <-chan struct{} { // source performs storage reads type source struct { - id execute.DatasetID + Source + reader Reader readSpec ReadSpec window execute.Window bounds execute.Bounds - alloc *memory.Allocator - - ts []execute.Transformation currentTime execute.Time overflow bool - - stats cursors.CursorStats } func NewSource(id execute.DatasetID, r Reader, readSpec ReadSpec, bounds execute.Bounds, w execute.Window, currentTime execute.Time, alloc *memory.Allocator) execute.Source { - return &source{ - id: id, + src := &source{ reader: r, readSpec: readSpec, bounds: bounds, window: w, currentTime: currentTime, - alloc: alloc, - } -} - -func (s *source) AddTransformation(t execute.Transformation) { - s.ts = append(s.ts, t) -} - -func (s *source) Run(ctx context.Context) { - err := s.run(ctx) - for _, t := range s.ts { - t.Finish(s.id, err) - } -} - -func (s *source) Metadata() flux.Metadata { - return flux.Metadata{ - "influxdb/scanned-bytes": []interface{}{s.stats.ScannedBytes}, - "influxdb/scanned-values": []interface{}{s.stats.ScannedValues}, } + src.id = id + src.alloc = alloc + src.runner = src + return src } func (s *source) run(ctx context.Context) error { //TODO(nathanielc): Pass through context to actual network I/O. for tables, mark, ok := s.next(ctx); ok; tables, mark, ok = s.next(ctx) { - err := tables.Do(func(tbl flux.Table) error { - for _, t := range s.ts { - if err := t.Process(s.id, tbl); err != nil { - return err - } - //TODO(nathanielc): Also add mechanism to send UpdateProcessingTime calls, when no data is arriving. - // This is probably not needed for this source, but other sources should do so. - if err := t.UpdateProcessingTime(s.id, execute.Now()); err != nil { - return err - } - } - return nil - }) + err := s.processTables(ctx, tables, mark) if err != nil { return err } - - // Track the number of bytes and values scanned. - stats := tables.Statistics() - s.stats.ScannedValues += stats.ScannedValues - s.stats.ScannedBytes += stats.ScannedBytes - - for _, t := range s.ts { - if err := t.UpdateWatermark(s.id, mark); err != nil { - return err - } - } } return nil } @@ -240,8 +197,18 @@ type ReadSpec struct { RetentionPolicy string // required by InfluxDB OSS } +type ReadFilterSpec struct { + OrganizationID platform.ID + BucketID platform.ID + + Bounds execute.Bounds + + Predicate *semantic.FunctionExpression +} + type Reader interface { Read(ctx context.Context, rs ReadSpec, start, stop execute.Time, alloc *memory.Allocator) (TableIterator, error) + ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc *memory.Allocator) (TableIterator, error) Close() } diff --git a/storage/reads/datatypes/storage_common.pb.go b/storage/reads/datatypes/storage_common.pb.go index 8b8272c35b..8f0c9e6d91 100644 --- a/storage/reads/datatypes/storage_common.pb.go +++ b/storage/reads/datatypes/storage_common.pb.go @@ -60,7 +60,7 @@ func (x ReadRequest_Group) String() string { } func (ReadRequest_Group) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{0, 0} + return fileDescriptor_715e4bf4cdf1f73d, []int{1, 0} } type ReadRequest_HintFlags int32 @@ -92,7 +92,7 @@ func (x ReadRequest_HintFlags) String() string { } func (ReadRequest_HintFlags) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{0, 1} + return fileDescriptor_715e4bf4cdf1f73d, []int{1, 1} } type Aggregate_AggregateType int32 @@ -120,7 +120,7 @@ func (x Aggregate_AggregateType) String() string { } func (Aggregate_AggregateType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{1, 0} + return fileDescriptor_715e4bf4cdf1f73d, []int{2, 0} } type ReadResponse_FrameType int32 @@ -145,7 +145,7 @@ func (x ReadResponse_FrameType) String() string { } func (ReadResponse_FrameType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 0} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 0} } type ReadResponse_DataType int32 @@ -179,9 +179,48 @@ func (x ReadResponse_DataType) String() string { } func (ReadResponse_DataType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 1} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 1} } +type ReadFilterRequest struct { + ReadSource *types.Any `protobuf:"bytes,1,opt,name=read_source,json=readSource,proto3" json:"read_source,omitempty"` + Range TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range"` + Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"` +} + +func (m *ReadFilterRequest) Reset() { *m = ReadFilterRequest{} } +func (m *ReadFilterRequest) String() string { return proto.CompactTextString(m) } +func (*ReadFilterRequest) ProtoMessage() {} +func (*ReadFilterRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_715e4bf4cdf1f73d, []int{0} +} +func (m *ReadFilterRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReadFilterRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReadFilterRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ReadFilterRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadFilterRequest.Merge(m, src) +} +func (m *ReadFilterRequest) XXX_Size() int { + return m.Size() +} +func (m *ReadFilterRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReadFilterRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadFilterRequest proto.InternalMessageInfo + // Request message for Storage.Read. type ReadRequest struct { ReadSource *types.Any `protobuf:"bytes,13,opt,name=read_source,json=readSource,proto3" json:"read_source,omitempty"` @@ -215,7 +254,7 @@ func (m *ReadRequest) Reset() { *m = ReadRequest{} } func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{0} + return fileDescriptor_715e4bf4cdf1f73d, []int{1} } func (m *ReadRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -252,7 +291,7 @@ func (m *Aggregate) Reset() { *m = Aggregate{} } func (m *Aggregate) String() string { return proto.CompactTextString(m) } func (*Aggregate) ProtoMessage() {} func (*Aggregate) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{1} + return fileDescriptor_715e4bf4cdf1f73d, []int{2} } func (m *Aggregate) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -290,7 +329,7 @@ func (m *Tag) Reset() { *m = Tag{} } func (m *Tag) String() string { return proto.CompactTextString(m) } func (*Tag) ProtoMessage() {} func (*Tag) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{2} + return fileDescriptor_715e4bf4cdf1f73d, []int{3} } func (m *Tag) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -328,7 +367,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} } func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3} + return fileDescriptor_715e4bf4cdf1f73d, []int{4} } func (m *ReadResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -373,7 +412,7 @@ func (m *ReadResponse_Frame) Reset() { *m = ReadResponse_Frame{} } func (m *ReadResponse_Frame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_Frame) ProtoMessage() {} func (*ReadResponse_Frame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 0} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 0} } func (m *ReadResponse_Frame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -674,7 +713,7 @@ func (m *ReadResponse_GroupFrame) Reset() { *m = ReadResponse_GroupFrame func (m *ReadResponse_GroupFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_GroupFrame) ProtoMessage() {} func (*ReadResponse_GroupFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 1} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 1} } func (m *ReadResponse_GroupFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -712,7 +751,7 @@ func (m *ReadResponse_SeriesFrame) Reset() { *m = ReadResponse_SeriesFra func (m *ReadResponse_SeriesFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_SeriesFrame) ProtoMessage() {} func (*ReadResponse_SeriesFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 2} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 2} } func (m *ReadResponse_SeriesFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -750,7 +789,7 @@ func (m *ReadResponse_FloatPointsFrame) Reset() { *m = ReadResponse_Floa func (m *ReadResponse_FloatPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_FloatPointsFrame) ProtoMessage() {} func (*ReadResponse_FloatPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 3} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 3} } func (m *ReadResponse_FloatPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -788,7 +827,7 @@ func (m *ReadResponse_IntegerPointsFrame) Reset() { *m = ReadResponse_In func (m *ReadResponse_IntegerPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_IntegerPointsFrame) ProtoMessage() {} func (*ReadResponse_IntegerPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 4} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 4} } func (m *ReadResponse_IntegerPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -826,7 +865,7 @@ func (m *ReadResponse_UnsignedPointsFrame) Reset() { *m = ReadResponse_U func (m *ReadResponse_UnsignedPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_UnsignedPointsFrame) ProtoMessage() {} func (*ReadResponse_UnsignedPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 5} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 5} } func (m *ReadResponse_UnsignedPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -864,7 +903,7 @@ func (m *ReadResponse_BooleanPointsFrame) Reset() { *m = ReadResponse_Bo func (m *ReadResponse_BooleanPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_BooleanPointsFrame) ProtoMessage() {} func (*ReadResponse_BooleanPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 6} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 6} } func (m *ReadResponse_BooleanPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -902,7 +941,7 @@ func (m *ReadResponse_StringPointsFrame) Reset() { *m = ReadResponse_Str func (m *ReadResponse_StringPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_StringPointsFrame) ProtoMessage() {} func (*ReadResponse_StringPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3, 7} + return fileDescriptor_715e4bf4cdf1f73d, []int{4, 7} } func (m *ReadResponse_StringPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -939,7 +978,7 @@ func (m *CapabilitiesResponse) Reset() { *m = CapabilitiesResponse{} } func (m *CapabilitiesResponse) String() string { return proto.CompactTextString(m) } func (*CapabilitiesResponse) ProtoMessage() {} func (*CapabilitiesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4} + return fileDescriptor_715e4bf4cdf1f73d, []int{5} } func (m *CapabilitiesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -975,7 +1014,7 @@ func (m *HintsResponse) Reset() { *m = HintsResponse{} } func (m *HintsResponse) String() string { return proto.CompactTextString(m) } func (*HintsResponse) ProtoMessage() {} func (*HintsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{5} + return fileDescriptor_715e4bf4cdf1f73d, []int{6} } func (m *HintsResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1016,7 +1055,7 @@ func (m *TimestampRange) Reset() { *m = TimestampRange{} } func (m *TimestampRange) String() string { return proto.CompactTextString(m) } func (*TimestampRange) ProtoMessage() {} func (*TimestampRange) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{6} + return fileDescriptor_715e4bf4cdf1f73d, []int{7} } func (m *TimestampRange) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1051,6 +1090,7 @@ func init() { proto.RegisterEnum("influxdata.platform.storage.Aggregate_AggregateType", Aggregate_AggregateType_name, Aggregate_AggregateType_value) proto.RegisterEnum("influxdata.platform.storage.ReadResponse_FrameType", ReadResponse_FrameType_name, ReadResponse_FrameType_value) proto.RegisterEnum("influxdata.platform.storage.ReadResponse_DataType", ReadResponse_DataType_name, ReadResponse_DataType_value) + proto.RegisterType((*ReadFilterRequest)(nil), "influxdata.platform.storage.ReadFilterRequest") proto.RegisterType((*ReadRequest)(nil), "influxdata.platform.storage.ReadRequest") proto.RegisterMapType((map[string]string)(nil), "influxdata.platform.storage.ReadRequest.TraceEntry") proto.RegisterType((*Aggregate)(nil), "influxdata.platform.storage.Aggregate") @@ -1073,104 +1113,107 @@ func init() { func init() { proto.RegisterFile("storage_common.proto", fileDescriptor_715e4bf4cdf1f73d) } var fileDescriptor_715e4bf4cdf1f73d = []byte{ - // 1547 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x57, 0x4f, 0x8f, 0xea, 0xc8, - 0x11, 0xc7, 0xfc, 0xa7, 0xf8, 0x33, 0x7e, 0xbd, 0x64, 0xc4, 0xfa, 0x65, 0xc1, 0x8b, 0xa2, 0x0d, - 0x49, 0x36, 0x4c, 0xc2, 0xee, 0x2a, 0x4f, 0x2f, 0xc9, 0x01, 0xe6, 0xf1, 0x06, 0x32, 0x33, 0x30, - 0x6a, 0x98, 0x28, 0x1b, 0x29, 0x42, 0x3d, 0xd0, 0x78, 0xad, 0x35, 0xb6, 0x63, 0x9b, 0xd5, 0x20, - 0xe5, 0x9e, 0x15, 0xa7, 0xcd, 0x35, 0x12, 0x52, 0xa4, 0x1c, 0x73, 0xcf, 0x67, 0x78, 0xb7, 0xec, - 0x31, 0x27, 0x94, 0xf0, 0x3e, 0x44, 0xa4, 0x9c, 0xa2, 0xee, 0xb6, 0xc1, 0xcc, 0xbc, 0x8c, 0xe0, - 0xd6, 0xf5, 0xab, 0xaa, 0x5f, 0x75, 0x97, 0xab, 0xaa, 0xdb, 0x50, 0x74, 0x3d, 0xcb, 0x21, 0x1a, - 0x1d, 0x8d, 0xad, 0xd9, 0xcc, 0x32, 0xeb, 0xb6, 0x63, 0x79, 0x16, 0x7a, 0xae, 0x9b, 0x53, 0x63, - 0x7e, 0x3f, 0x21, 0x1e, 0xa9, 0xdb, 0x06, 0xf1, 0xa6, 0x96, 0x33, 0xab, 0xfb, 0x96, 0x4a, 0x51, - 0xb3, 0x34, 0x8b, 0xdb, 0x9d, 0xb1, 0x95, 0x70, 0x51, 0x9e, 0x6b, 0x96, 0xa5, 0x19, 0xf4, 0x8c, - 0x4b, 0x77, 0xf3, 0xe9, 0x19, 0x9d, 0xd9, 0xde, 0xc2, 0x57, 0xbe, 0xff, 0x50, 0x49, 0xcc, 0x40, - 0x75, 0x62, 0x3b, 0x74, 0xa2, 0x8f, 0x89, 0x47, 0x05, 0x50, 0xfd, 0x4f, 0x1a, 0xb2, 0x98, 0x92, - 0x09, 0xa6, 0xbf, 0x9f, 0x53, 0xd7, 0x43, 0x6d, 0xc8, 0x3a, 0x94, 0x4c, 0x46, 0xae, 0x35, 0x77, - 0xc6, 0xb4, 0x94, 0x57, 0xa5, 0x5a, 0xb6, 0x51, 0xac, 0x0b, 0xc6, 0x7a, 0xc0, 0x58, 0x6f, 0x9a, - 0x8b, 0x56, 0x61, 0xb3, 0xae, 0x00, 0xf3, 0x1d, 0x70, 0x5b, 0x0c, 0xce, 0x76, 0x8d, 0x0c, 0x38, - 0xf1, 0xf4, 0x19, 0x75, 0x3d, 0x32, 0xb3, 0x47, 0x0e, 0x31, 0x35, 0x5a, 0x8a, 0x72, 0xaa, 0x1f, - 0xd5, 0x9f, 0x38, 0x6c, 0x7d, 0x18, 0xf8, 0x60, 0xe6, 0xd2, 0x3a, 0x7d, 0xb3, 0xae, 0x44, 0x36, - 0xeb, 0x4a, 0x61, 0x1f, 0xc7, 0x05, 0x6f, 0x4f, 0x46, 0x65, 0x80, 0x09, 0x75, 0xc7, 0xd4, 0x9c, - 0xe8, 0xa6, 0x56, 0x8a, 0xa9, 0x52, 0x2d, 0x8d, 0x43, 0x08, 0xfa, 0x18, 0x40, 0x73, 0xac, 0xb9, - 0x3d, 0xfa, 0x92, 0x2e, 0xdc, 0x52, 0x5c, 0x8d, 0xd5, 0x32, 0xad, 0xfc, 0x66, 0x5d, 0xc9, 0x5c, - 0x30, 0xf4, 0x92, 0x2e, 0x5c, 0x9c, 0xd1, 0x82, 0x25, 0x7a, 0x05, 0x09, 0x2e, 0x94, 0xb2, 0xaa, - 0x54, 0x2b, 0x34, 0xea, 0x4f, 0xee, 0x38, 0x94, 0xbb, 0x3a, 0x67, 0xc3, 0xc2, 0x19, 0xbd, 0x82, - 0x0c, 0xd1, 0x34, 0x87, 0x6a, 0xc4, 0xa3, 0xa5, 0x0c, 0x3f, 0xfb, 0x47, 0x4f, 0x32, 0x35, 0x03, - 0x6b, 0xbc, 0x73, 0x64, 0x2c, 0xdb, 0x2f, 0x56, 0x4a, 0x1c, 0xc0, 0x72, 0x13, 0x58, 0xe3, 0x9d, - 0x23, 0x6a, 0x40, 0xce, 0xa5, 0x8e, 0x4e, 0xdd, 0x91, 0xa1, 0xcf, 0x74, 0xaf, 0x94, 0x54, 0xa5, - 0x5a, 0xac, 0x75, 0xb2, 0x59, 0x57, 0xb2, 0x03, 0x8e, 0x5f, 0x31, 0x18, 0x67, 0xdd, 0x9d, 0x80, - 0x3e, 0x83, 0xbc, 0xef, 0x63, 0x4d, 0xa7, 0x2e, 0xf5, 0x4a, 0x29, 0xee, 0x24, 0x6f, 0xd6, 0x95, - 0x9c, 0x70, 0xea, 0x73, 0x1c, 0xfb, 0xd4, 0x42, 0x62, 0xa1, 0x6c, 0x4b, 0x37, 0xbd, 0x20, 0x54, - 0x7a, 0x17, 0xea, 0x86, 0xe3, 0x7e, 0x28, 0x7b, 0x27, 0xa0, 0x21, 0x24, 0x3c, 0x87, 0x8c, 0x69, - 0x09, 0xd4, 0x58, 0x2d, 0xdb, 0xf8, 0xe4, 0xe0, 0x84, 0x0f, 0x99, 0x57, 0xdb, 0xf4, 0x9c, 0x45, - 0x2b, 0xb3, 0x59, 0x57, 0x12, 0x5c, 0xc6, 0x82, 0x0c, 0x7d, 0x0c, 0x89, 0x2f, 0x58, 0x8c, 0x52, - 0x4e, 0x95, 0x6a, 0xa9, 0xd6, 0x29, 0x33, 0xe8, 0x30, 0xe0, 0xbf, 0xeb, 0x4a, 0x86, 0x2d, 0x5e, - 0x1b, 0x44, 0x73, 0xb1, 0x30, 0x52, 0x5e, 0x00, 0xec, 0xd8, 0x90, 0x0c, 0xb1, 0x2f, 0xe9, 0xa2, - 0x24, 0xa9, 0x52, 0x2d, 0x83, 0xd9, 0x12, 0x15, 0x21, 0xf1, 0x15, 0x31, 0xe6, 0xa2, 0x8c, 0x33, - 0x58, 0x08, 0x2f, 0xa3, 0x2f, 0xa4, 0xea, 0x1f, 0x25, 0x48, 0xf0, 0x2f, 0x8f, 0x3e, 0x00, 0xb8, - 0xc0, 0xfd, 0xdb, 0x9b, 0x51, 0xaf, 0xdf, 0x6b, 0xcb, 0x11, 0x25, 0xbf, 0x5c, 0xa9, 0xa2, 0xc4, - 0x7a, 0x96, 0x49, 0xd1, 0x73, 0xc8, 0x08, 0x75, 0xf3, 0xea, 0x4a, 0x96, 0x94, 0xdc, 0x72, 0xa5, - 0xa6, 0xb9, 0xb6, 0x69, 0x18, 0xe8, 0x7d, 0x48, 0x0b, 0x65, 0xeb, 0x73, 0x39, 0xaa, 0x64, 0x97, - 0x2b, 0x35, 0xc5, 0x75, 0xad, 0x05, 0xfa, 0x10, 0x72, 0x42, 0xd5, 0xfe, 0xcd, 0x79, 0xfb, 0x66, - 0x28, 0xc7, 0x94, 0x93, 0xe5, 0x4a, 0xcd, 0x72, 0x75, 0xfb, 0x7e, 0x4c, 0x6d, 0x4f, 0x89, 0x7f, - 0xfd, 0xd7, 0x72, 0xa4, 0xfa, 0x37, 0x09, 0x76, 0x07, 0x63, 0xe1, 0x3a, 0xdd, 0xde, 0x30, 0xd8, - 0x0c, 0x0f, 0xc7, 0xb4, 0x7c, 0x2f, 0xdf, 0x83, 0x82, 0xaf, 0x1c, 0xdd, 0xf4, 0xbb, 0xbd, 0xe1, - 0x40, 0x96, 0x14, 0x79, 0xb9, 0x52, 0x73, 0xc2, 0x42, 0x7c, 0xaa, 0xb0, 0xd5, 0xa0, 0x8d, 0xbb, - 0xed, 0x81, 0x1c, 0x0d, 0x5b, 0x89, 0x32, 0x40, 0x67, 0x50, 0xe4, 0x56, 0x83, 0xf3, 0x4e, 0xfb, - 0xba, 0xc9, 0x4e, 0x37, 0x1a, 0x76, 0xaf, 0xdb, 0x72, 0x5c, 0xf9, 0xce, 0x72, 0xa5, 0x3e, 0x63, - 0xb6, 0x83, 0xf1, 0x17, 0x74, 0x46, 0x9a, 0x86, 0xc1, 0x1a, 0xd9, 0xdf, 0xed, 0x3f, 0x24, 0xc8, - 0x6c, 0x6b, 0x1e, 0x75, 0x20, 0xee, 0x2d, 0x6c, 0xca, 0x53, 0x5e, 0x68, 0x7c, 0x7a, 0x58, 0xa7, - 0xec, 0x56, 0xc3, 0x85, 0x4d, 0x31, 0x67, 0xa8, 0xde, 0x43, 0x7e, 0x0f, 0x46, 0x15, 0x88, 0xfb, - 0x39, 0xe0, 0xfb, 0xd9, 0x53, 0xf2, 0x64, 0x7c, 0x00, 0xb1, 0xc1, 0xed, 0xb5, 0x2c, 0x29, 0xc5, - 0xe5, 0x4a, 0x95, 0xf7, 0xf4, 0x83, 0xf9, 0x0c, 0x7d, 0x08, 0x89, 0xf3, 0xfe, 0x6d, 0x6f, 0x28, - 0x47, 0x95, 0xd3, 0xe5, 0x4a, 0x45, 0x7b, 0x06, 0xe7, 0xd6, 0xdc, 0x0c, 0xf2, 0xff, 0x63, 0x88, - 0x0d, 0x89, 0x16, 0x2e, 0x9e, 0xdc, 0x3b, 0x8a, 0x27, 0xe7, 0x17, 0x4f, 0xf5, 0x4f, 0x05, 0xc8, - 0x89, 0x6a, 0x76, 0x6d, 0xcb, 0x74, 0x29, 0xba, 0x86, 0xe4, 0xd4, 0x21, 0x33, 0xea, 0x96, 0x24, - 0xde, 0x08, 0x67, 0x07, 0x34, 0x82, 0x70, 0xad, 0xbf, 0x66, 0x7e, 0xad, 0x38, 0x9b, 0x97, 0xd8, - 0x27, 0x51, 0xbe, 0x4e, 0x42, 0x82, 0xe3, 0xe8, 0x2a, 0x98, 0x68, 0x29, 0x3e, 0x41, 0x3e, 0x3d, - 0x9c, 0x97, 0x17, 0x19, 0x27, 0xe9, 0x44, 0x82, 0xc9, 0xd6, 0x87, 0xa4, 0x68, 0x79, 0x7e, 0xc4, - 0x6c, 0xe3, 0xb3, 0xc3, 0xe9, 0x44, 0xc5, 0x04, 0x7c, 0x3e, 0x0d, 0xb2, 0x21, 0x37, 0x35, 0x2c, - 0xe2, 0x8d, 0xc4, 0x50, 0xf0, 0x6f, 0x8a, 0x97, 0x47, 0x9c, 0x9e, 0x79, 0x8b, 0x9a, 0x15, 0x89, - 0xe0, 0xf3, 0x26, 0x84, 0x76, 0x22, 0x38, 0x3b, 0xdd, 0x89, 0xe8, 0x1e, 0x0a, 0xba, 0xe9, 0x51, - 0x8d, 0x3a, 0x41, 0xcc, 0x18, 0x8f, 0xf9, 0x8b, 0xc3, 0x63, 0x76, 0x85, 0x7f, 0x38, 0xea, 0xb3, - 0xcd, 0xba, 0x92, 0xdf, 0xc3, 0x3b, 0x11, 0x9c, 0xd7, 0xc3, 0x00, 0xfa, 0x03, 0x9c, 0xcc, 0x4d, - 0x57, 0xd7, 0x4c, 0x3a, 0x09, 0x42, 0xc7, 0x79, 0xe8, 0x5f, 0x1e, 0x1e, 0xfa, 0xd6, 0x27, 0x08, - 0xc7, 0x46, 0xec, 0x9a, 0xdc, 0x57, 0x74, 0x22, 0xb8, 0x30, 0xdf, 0x43, 0xd8, 0xb9, 0xef, 0x2c, - 0xcb, 0xa0, 0xc4, 0x0c, 0x82, 0x27, 0x8e, 0x3d, 0x77, 0x4b, 0xf8, 0x3f, 0x3a, 0xf7, 0x1e, 0xce, - 0xce, 0x7d, 0x17, 0x06, 0x90, 0x07, 0x79, 0xd7, 0x73, 0x74, 0x53, 0x0b, 0x02, 0x27, 0x79, 0xe0, - 0x9f, 0x1f, 0x51, 0x3b, 0xdc, 0x3d, 0x1c, 0x57, 0xdc, 0x45, 0x21, 0xb8, 0x13, 0xc1, 0x39, 0x37, - 0x24, 0xb7, 0x92, 0x10, 0x67, 0xcc, 0xca, 0x3d, 0xc0, 0xae, 0x92, 0xd1, 0x47, 0x90, 0xf6, 0x88, - 0x26, 0x1e, 0x03, 0xac, 0xd3, 0x72, 0xad, 0xec, 0x66, 0x5d, 0x49, 0x0d, 0x89, 0xc6, 0x9f, 0x02, - 0x29, 0x4f, 0x2c, 0x50, 0x0b, 0x90, 0x4d, 0x1c, 0x4f, 0xf7, 0x74, 0xcb, 0x64, 0xd6, 0xa3, 0xaf, - 0x88, 0xc1, 0xaa, 0x93, 0x79, 0x14, 0x37, 0xeb, 0x8a, 0x7c, 0x13, 0x68, 0x2f, 0xe9, 0xe2, 0xd7, - 0xc4, 0x70, 0xb1, 0x6c, 0x3f, 0x40, 0x94, 0x3f, 0x4b, 0x90, 0x0d, 0x55, 0x3d, 0x7a, 0x09, 0x71, - 0x8f, 0x68, 0x41, 0x87, 0xab, 0x4f, 0xbf, 0x86, 0x88, 0xe6, 0xb7, 0x34, 0xf7, 0x41, 0x7d, 0xc8, - 0x30, 0xc3, 0x11, 0x1f, 0x94, 0x51, 0x3e, 0x28, 0x1b, 0x87, 0xe7, 0xef, 0x15, 0xf1, 0x08, 0x1f, - 0x93, 0xe9, 0x89, 0xbf, 0x52, 0x7e, 0x05, 0xf2, 0xc3, 0xd6, 0x61, 0x6f, 0xa9, 0xed, 0xeb, 0x4a, - 0x6c, 0x53, 0xc6, 0x21, 0x04, 0x9d, 0x42, 0x92, 0x8f, 0x2f, 0x91, 0x08, 0x09, 0xfb, 0x92, 0x72, - 0x05, 0xe8, 0x71, 0x4b, 0x1c, 0xc9, 0x16, 0xdb, 0xb2, 0x5d, 0xc3, 0x7b, 0xef, 0xa8, 0xf2, 0x23, - 0xe9, 0xe2, 0xe1, 0xcd, 0x3d, 0xae, 0xdb, 0x23, 0xd9, 0xd2, 0x5b, 0xb6, 0x4b, 0x78, 0xf6, 0xa8, - 0x18, 0x8f, 0x24, 0xcb, 0x04, 0x64, 0xd5, 0x01, 0x64, 0x38, 0x81, 0x7f, 0x55, 0x25, 0xfd, 0x8b, - 0x36, 0xa2, 0xbc, 0xb7, 0x5c, 0xa9, 0x27, 0x5b, 0x95, 0x7f, 0xd7, 0x56, 0x20, 0xb9, 0xbd, 0xaf, - 0xf7, 0x0d, 0xc4, 0x5e, 0xfc, 0x9b, 0xe8, 0xef, 0x12, 0xa4, 0x83, 0xef, 0x8d, 0xbe, 0x0b, 0x89, - 0xd7, 0x57, 0xfd, 0xe6, 0x50, 0x8e, 0x28, 0xcf, 0x96, 0x2b, 0x35, 0x1f, 0x28, 0xf8, 0xa7, 0x47, - 0x2a, 0xa4, 0xba, 0xbd, 0x61, 0xfb, 0xa2, 0x8d, 0x03, 0xca, 0x40, 0xef, 0x7f, 0x4e, 0x54, 0x85, - 0xf4, 0x6d, 0x6f, 0xd0, 0xbd, 0xe8, 0xb5, 0x5f, 0xc9, 0x51, 0x71, 0x47, 0x06, 0x26, 0xc1, 0x37, - 0x62, 0x2c, 0xad, 0x7e, 0xff, 0xaa, 0xdd, 0xec, 0xc9, 0xb1, 0x7d, 0x16, 0x3f, 0xef, 0xa8, 0x0c, - 0xc9, 0xc1, 0x10, 0x77, 0x7b, 0x17, 0x72, 0x5c, 0x41, 0xcb, 0x95, 0x5a, 0x08, 0x0c, 0x44, 0x2a, - 0xfd, 0x8d, 0xff, 0x45, 0x82, 0xe2, 0x39, 0xb1, 0xc9, 0x9d, 0x6e, 0xe8, 0x9e, 0x4e, 0xdd, 0xed, - 0xdd, 0xd8, 0x87, 0xf8, 0x98, 0xd8, 0x41, 0xdf, 0x3c, 0x3d, 0x36, 0xde, 0x45, 0xc0, 0x40, 0x97, - 0x3f, 0xee, 0x30, 0x27, 0x52, 0x7e, 0x06, 0x99, 0x2d, 0x74, 0xd4, 0x7b, 0xef, 0x04, 0xf2, 0xfc, - 0x19, 0x19, 0x30, 0x57, 0x5f, 0xc0, 0x83, 0xff, 0x13, 0xe6, 0xec, 0x7a, 0xc4, 0xf1, 0x38, 0x61, - 0x0c, 0x0b, 0x81, 0x05, 0xa1, 0xe6, 0x84, 0x13, 0xc6, 0x30, 0x5b, 0x36, 0xbe, 0x89, 0x42, 0x6a, - 0x20, 0x36, 0x8d, 0x7e, 0x07, 0x71, 0xd6, 0xae, 0xa8, 0x76, 0xe8, 0xeb, 0x57, 0xf9, 0xc1, 0xc1, - 0xbd, 0xff, 0x13, 0x09, 0x7d, 0x0e, 0xb9, 0x70, 0x5a, 0xd0, 0xe9, 0xa3, 0x5f, 0xba, 0x36, 0xfb, - 0x83, 0x54, 0x7e, 0x7a, 0x74, 0x66, 0xd1, 0x25, 0x88, 0x77, 0xf5, 0xff, 0xe5, 0xfc, 0xe1, 0x93, - 0x9c, 0x7b, 0xc9, 0x6c, 0x7d, 0xff, 0xcd, 0xbf, 0xcb, 0x91, 0x37, 0x9b, 0xb2, 0xf4, 0xed, 0xa6, - 0x2c, 0xfd, 0x6b, 0x53, 0x96, 0xbe, 0x79, 0x5b, 0x8e, 0x7c, 0xfb, 0xb6, 0x1c, 0xf9, 0xe7, 0xdb, - 0x72, 0xe4, 0xb7, 0x7c, 0xfe, 0xb1, 0xf1, 0xe7, 0xde, 0x25, 0x79, 0x90, 0x4f, 0xfe, 0x17, 0x00, - 0x00, 0xff, 0xff, 0x61, 0xb5, 0x14, 0xbe, 0x53, 0x0f, 0x00, 0x00, + // 1588 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x57, 0xcd, 0x6f, 0x23, 0x49, + 0x15, 0x77, 0xfb, 0xdb, 0xcf, 0x1f, 0xe9, 0xd4, 0x9a, 0xc8, 0xdb, 0xc3, 0xda, 0xbd, 0x16, 0x5a, + 0x02, 0x2c, 0x0e, 0x64, 0x77, 0xc5, 0x68, 0x80, 0x83, 0x9d, 0x71, 0x62, 0x93, 0xc4, 0x8e, 0xca, + 0x0e, 0x62, 0x91, 0x90, 0x55, 0xb1, 0xcb, 0xbd, 0xad, 0x6d, 0x77, 0x37, 0xdd, 0xed, 0x55, 0x2c, + 0x71, 0x67, 0xe5, 0x13, 0x5c, 0x91, 0x2c, 0x21, 0x71, 0xe4, 0xce, 0xdf, 0x30, 0x37, 0xf6, 0xc8, + 0xc9, 0x02, 0xcf, 0x89, 0xbf, 0x00, 0x09, 0x2e, 0xa8, 0xaa, 0xba, 0xed, 0x76, 0x32, 0x9b, 0xb5, + 0xe7, 0x56, 0xf5, 0x3e, 0x7e, 0xef, 0x55, 0xbd, 0xaf, 0x2a, 0x28, 0xba, 0x9e, 0xe5, 0x10, 0x8d, + 0x0e, 0x86, 0xd6, 0x64, 0x62, 0x99, 0x35, 0xdb, 0xb1, 0x3c, 0x0b, 0x3d, 0xd3, 0xcd, 0xb1, 0x31, + 0xbd, 0x1f, 0x11, 0x8f, 0xd4, 0x6c, 0x83, 0x78, 0x63, 0xcb, 0x99, 0xd4, 0x7c, 0x49, 0xa5, 0xa8, + 0x59, 0x9a, 0xc5, 0xe5, 0x4e, 0xd8, 0x4a, 0xa8, 0x28, 0xcf, 0x34, 0xcb, 0xd2, 0x0c, 0x7a, 0xc2, + 0x77, 0x77, 0xd3, 0xf1, 0x09, 0x9d, 0xd8, 0xde, 0xcc, 0x67, 0xbe, 0xfb, 0x90, 0x49, 0xcc, 0x80, + 0x75, 0x60, 0x3b, 0x74, 0xa4, 0x0f, 0x89, 0x47, 0x05, 0xa1, 0xfa, 0x6f, 0x09, 0x0e, 0x31, 0x25, + 0xa3, 0x73, 0xdd, 0xf0, 0xa8, 0x83, 0xe9, 0x6f, 0xa7, 0xd4, 0xf5, 0x50, 0x13, 0xb2, 0x0e, 0x25, + 0xa3, 0x81, 0x6b, 0x4d, 0x9d, 0x21, 0x2d, 0x49, 0xaa, 0x74, 0x9c, 0x3d, 0x2d, 0xd6, 0x04, 0x6e, + 0x2d, 0xc0, 0xad, 0xd5, 0xcd, 0x59, 0xa3, 0xb0, 0x5a, 0x56, 0x80, 0x21, 0xf4, 0xb8, 0x2c, 0x06, + 0x67, 0xbd, 0x46, 0x17, 0x90, 0x70, 0x88, 0xa9, 0xd1, 0x52, 0x94, 0x03, 0xfc, 0xa0, 0xf6, 0xc4, + 0x41, 0x6b, 0x7d, 0x7d, 0x42, 0x5d, 0x8f, 0x4c, 0x6c, 0xcc, 0x54, 0x1a, 0xf1, 0x57, 0xcb, 0x4a, + 0x04, 0x0b, 0x7d, 0xf4, 0x12, 0x32, 0x6b, 0xc7, 0x4b, 0x31, 0x0e, 0xf6, 0xc1, 0x93, 0x60, 0x37, + 0x81, 0x34, 0xde, 0x28, 0x56, 0xff, 0x93, 0x86, 0x2c, 0xf3, 0xf4, 0x6b, 0x4e, 0x99, 0x7f, 0xcb, + 0x53, 0x1a, 0x70, 0xe0, 0x05, 0xbe, 0x0f, 0xde, 0xfa, 0xbc, 0x47, 0xec, 0xbc, 0xab, 0x65, 0xa5, + 0xb0, 0x4d, 0xc7, 0x05, 0x6f, 0x6b, 0x8f, 0xca, 0x00, 0x23, 0xea, 0x0e, 0xa9, 0x39, 0xd2, 0x4d, + 0x8d, 0xdf, 0x45, 0x1a, 0x87, 0x28, 0xe8, 0x43, 0x00, 0xcd, 0xb1, 0xa6, 0xf6, 0xe0, 0x73, 0x3a, + 0x73, 0x4b, 0x71, 0x35, 0x76, 0x9c, 0x69, 0xe4, 0x57, 0xcb, 0x4a, 0xe6, 0x82, 0x51, 0x2f, 0xe9, + 0xcc, 0xc5, 0x19, 0x2d, 0x58, 0xa2, 0x97, 0x90, 0xe0, 0x9b, 0x52, 0x56, 0x95, 0x8e, 0x0b, 0xa7, + 0xb5, 0x27, 0x3d, 0x0e, 0xdd, 0x5d, 0x8d, 0xa3, 0x61, 0xa1, 0xcc, 0xc2, 0x43, 0x34, 0xcd, 0xa1, + 0x1a, 0x0b, 0x4f, 0x66, 0x87, 0xf0, 0xd4, 0x03, 0x69, 0xbc, 0x51, 0xdc, 0x0e, 0x72, 0xe2, 0x2d, + 0x83, 0x8c, 0x4e, 0x21, 0xe7, 0x52, 0x47, 0xa7, 0xee, 0xc0, 0xd0, 0x27, 0xba, 0x57, 0x4a, 0xaa, + 0xd2, 0x71, 0xac, 0x71, 0xb0, 0x5a, 0x56, 0xb2, 0x3d, 0x4e, 0xbf, 0x62, 0x64, 0x9c, 0x75, 0x37, + 0x1b, 0xf4, 0x09, 0xe4, 0x7d, 0x1d, 0x6b, 0x3c, 0x76, 0xa9, 0x57, 0x4a, 0x71, 0x25, 0x79, 0xb5, + 0xac, 0xe4, 0x84, 0x52, 0x97, 0xd3, 0xb1, 0x0f, 0x2d, 0x76, 0xcc, 0x94, 0x6d, 0xe9, 0xa6, 0x17, + 0x98, 0x4a, 0x6f, 0x4c, 0xdd, 0x70, 0xba, 0x6f, 0xca, 0xde, 0x6c, 0x50, 0x1f, 0x12, 0x9e, 0x43, + 0x86, 0xb4, 0x04, 0x6a, 0xec, 0x38, 0x7b, 0xfa, 0xd1, 0xce, 0x17, 0xde, 0x67, 0x5a, 0x4d, 0xd3, + 0x73, 0x66, 0x8d, 0xcc, 0x6a, 0x59, 0x49, 0xf0, 0x3d, 0x16, 0x60, 0xe8, 0x43, 0x48, 0x7c, 0xc6, + 0x6c, 0x94, 0x72, 0xaa, 0x74, 0x9c, 0x6a, 0x1c, 0x31, 0x81, 0x16, 0x23, 0xfc, 0x77, 0x59, 0xc9, + 0xb0, 0xc5, 0xb9, 0x41, 0x34, 0x17, 0x0b, 0x21, 0xe5, 0x39, 0xc0, 0x06, 0x0d, 0xc9, 0x10, 0xfb, + 0x9c, 0xce, 0x78, 0x8d, 0x67, 0x30, 0x5b, 0xa2, 0x22, 0x24, 0xbe, 0x20, 0xc6, 0x54, 0xa4, 0x71, + 0x06, 0x8b, 0xcd, 0x8b, 0xe8, 0x73, 0xa9, 0xfa, 0x7b, 0x09, 0x12, 0x3c, 0xf2, 0xe8, 0x3d, 0x80, + 0x0b, 0xdc, 0xbd, 0xbd, 0x19, 0x74, 0xba, 0x9d, 0xa6, 0x1c, 0x51, 0xf2, 0xf3, 0x85, 0x2a, 0x52, + 0xac, 0x63, 0x99, 0x14, 0x3d, 0x83, 0x8c, 0x60, 0xd7, 0xaf, 0xae, 0x64, 0x49, 0xc9, 0xcd, 0x17, + 0x6a, 0x9a, 0x73, 0xeb, 0x86, 0x81, 0xde, 0x85, 0xb4, 0x60, 0x36, 0x3e, 0x95, 0xa3, 0x4a, 0x76, + 0xbe, 0x50, 0x53, 0x9c, 0xd7, 0x98, 0xa1, 0xf7, 0x21, 0x27, 0x58, 0xcd, 0x5f, 0x9d, 0x35, 0x6f, + 0xfa, 0x72, 0x4c, 0x39, 0x98, 0x2f, 0xd4, 0x2c, 0x67, 0x37, 0xef, 0x87, 0xd4, 0xf6, 0x94, 0xf8, + 0x97, 0x7f, 0x29, 0x47, 0xaa, 0x7f, 0x95, 0x60, 0x73, 0x30, 0x66, 0xae, 0xd5, 0xee, 0xf4, 0x03, + 0x67, 0xb8, 0x39, 0xc6, 0xe5, 0xbe, 0x7c, 0x07, 0x0a, 0x3e, 0x73, 0x70, 0xd3, 0x6d, 0x77, 0xfa, + 0x3d, 0x59, 0x52, 0xe4, 0xf9, 0x42, 0xcd, 0x09, 0x09, 0x11, 0xaa, 0xb0, 0x54, 0xaf, 0x89, 0xdb, + 0xcd, 0x9e, 0x1c, 0x0d, 0x4b, 0x89, 0x34, 0x40, 0x27, 0x50, 0xe4, 0x52, 0xbd, 0xb3, 0x56, 0xf3, + 0xba, 0xce, 0x4e, 0x37, 0xe8, 0xb7, 0xaf, 0x9b, 0x72, 0x5c, 0xf9, 0xd6, 0x7c, 0xa1, 0x1e, 0x32, + 0xd9, 0xde, 0xf0, 0x33, 0x3a, 0x21, 0x75, 0xc3, 0x60, 0x85, 0xec, 0x7b, 0xfb, 0x77, 0x09, 0x32, + 0xeb, 0x9c, 0x47, 0x2d, 0x88, 0x7b, 0x33, 0x5b, 0xb4, 0xd5, 0xc2, 0xe9, 0xc7, 0xbb, 0x55, 0xca, + 0x66, 0xd5, 0x9f, 0xd9, 0x14, 0x73, 0x84, 0xea, 0x3d, 0xe4, 0xb7, 0xc8, 0xa8, 0x02, 0x71, 0xff, + 0x0e, 0xb8, 0x3f, 0x5b, 0x4c, 0x7e, 0x19, 0xef, 0x41, 0xac, 0x77, 0x7b, 0x2d, 0x4b, 0x4a, 0x71, + 0xbe, 0x50, 0xe5, 0x2d, 0x7e, 0x6f, 0x3a, 0x41, 0xef, 0x43, 0xe2, 0xac, 0x7b, 0xdb, 0xe9, 0xcb, + 0x51, 0xe5, 0x68, 0xbe, 0x50, 0xd1, 0x96, 0xc0, 0x99, 0x35, 0x35, 0x83, 0xfb, 0xff, 0x21, 0xc4, + 0xfa, 0x44, 0x0b, 0x27, 0x4f, 0xee, 0x0d, 0xc9, 0x93, 0xf3, 0x93, 0xa7, 0xfa, 0xc7, 0x02, 0xe4, + 0x44, 0x36, 0xbb, 0xb6, 0x65, 0xba, 0x14, 0x5d, 0x43, 0x72, 0xec, 0x90, 0x09, 0x75, 0x4b, 0x12, + 0x2f, 0x84, 0x93, 0x1d, 0x0a, 0x41, 0xa8, 0xd6, 0xce, 0x99, 0x9e, 0x3f, 0x1f, 0x7c, 0x10, 0xe5, + 0xcb, 0x24, 0x24, 0x38, 0x1d, 0x5d, 0x05, 0x1d, 0x2d, 0xc5, 0x3b, 0xc8, 0xc7, 0xbb, 0xe3, 0xf2, + 0x24, 0xe3, 0x20, 0xad, 0x48, 0xd0, 0xd9, 0xba, 0x90, 0x14, 0x25, 0xef, 0xcf, 0xc0, 0x4f, 0x76, + 0x87, 0x13, 0x19, 0x13, 0xe0, 0xf9, 0x30, 0xc8, 0x86, 0xdc, 0xd8, 0xb0, 0x88, 0x37, 0x10, 0x4d, + 0xc1, 0x9f, 0x14, 0x2f, 0xf6, 0x38, 0x3d, 0xd3, 0x16, 0x39, 0x2b, 0x2e, 0x82, 0xf7, 0x9b, 0x10, + 0xb5, 0x15, 0xc1, 0xd9, 0xf1, 0x66, 0x8b, 0xee, 0xa1, 0xa0, 0x9b, 0x1e, 0xd5, 0xa8, 0x13, 0xd8, + 0x14, 0x03, 0xf4, 0x67, 0xbb, 0xdb, 0x6c, 0x0b, 0xfd, 0xb0, 0xd5, 0xc3, 0xd5, 0xb2, 0x92, 0xdf, + 0xa2, 0xb7, 0x22, 0x38, 0xaf, 0x87, 0x09, 0xe8, 0x77, 0x70, 0x30, 0x35, 0x5d, 0x5d, 0x33, 0xe9, + 0x28, 0x30, 0x1d, 0xe7, 0xa6, 0x7f, 0xbe, 0xbb, 0xe9, 0x5b, 0x1f, 0x20, 0x6c, 0x1b, 0xb1, 0x31, + 0xb9, 0xcd, 0x68, 0x45, 0x70, 0x61, 0xba, 0x45, 0x61, 0xe7, 0xbe, 0xb3, 0x2c, 0x83, 0x12, 0x33, + 0x30, 0x9e, 0xd8, 0xf7, 0xdc, 0x0d, 0xa1, 0xff, 0xe8, 0xdc, 0x5b, 0x74, 0x76, 0xee, 0xbb, 0x30, + 0x01, 0x79, 0x90, 0x77, 0x3d, 0x47, 0x37, 0xb5, 0xc0, 0x70, 0x92, 0x1b, 0xfe, 0xe9, 0x1e, 0xb9, + 0xc3, 0xd5, 0xc3, 0x76, 0xc5, 0x2c, 0x0a, 0x91, 0x5b, 0x11, 0x9c, 0x73, 0x43, 0xfb, 0x46, 0x12, + 0xe2, 0x0c, 0x59, 0xb9, 0x07, 0xd8, 0x64, 0x32, 0xfa, 0x00, 0xd2, 0x1e, 0xd1, 0xc4, 0x63, 0x80, + 0x55, 0x5a, 0xae, 0x91, 0x5d, 0x2d, 0x2b, 0xa9, 0x3e, 0xd1, 0xf8, 0x53, 0x20, 0xe5, 0x89, 0x05, + 0x6a, 0x00, 0xb2, 0x89, 0xe3, 0xe9, 0x9e, 0x6e, 0x99, 0x4c, 0x7a, 0xf0, 0x05, 0x31, 0x58, 0x76, + 0x32, 0x8d, 0xe2, 0x6a, 0x59, 0x91, 0x6f, 0x02, 0xee, 0x25, 0x9d, 0xfd, 0x92, 0x18, 0x2e, 0x96, + 0xed, 0x07, 0x14, 0xe5, 0x4f, 0x12, 0x64, 0x43, 0x59, 0x8f, 0x5e, 0x40, 0xdc, 0x23, 0x5a, 0x50, + 0xe1, 0xea, 0xd3, 0xaf, 0x21, 0xa2, 0xf9, 0x25, 0xcd, 0x75, 0x50, 0x17, 0x32, 0x4c, 0x70, 0xc0, + 0x1b, 0x65, 0x94, 0x37, 0xca, 0xd3, 0xdd, 0xef, 0xef, 0x25, 0xf1, 0x08, 0x6f, 0x93, 0xe9, 0x91, + 0xbf, 0x52, 0x7e, 0x01, 0xf2, 0xc3, 0xd2, 0x61, 0x6f, 0xa9, 0xf5, 0xeb, 0x4a, 0xb8, 0x29, 0xe3, + 0x10, 0x05, 0x1d, 0x41, 0x92, 0xb7, 0x2f, 0x71, 0x11, 0x12, 0xf6, 0x77, 0xca, 0x15, 0xa0, 0xc7, + 0x25, 0xb1, 0x27, 0x5a, 0x6c, 0x8d, 0x76, 0x0d, 0xef, 0xbc, 0x21, 0xcb, 0xf7, 0x84, 0x8b, 0x87, + 0x9d, 0x7b, 0x9c, 0xb7, 0x7b, 0xa2, 0xa5, 0xd7, 0x68, 0x97, 0x70, 0xf8, 0x28, 0x19, 0xf7, 0x04, + 0xcb, 0x04, 0x60, 0xd5, 0x1e, 0x64, 0x38, 0x80, 0x3f, 0xaa, 0x92, 0xfe, 0xa0, 0x8d, 0x28, 0xef, + 0xcc, 0x17, 0xea, 0xc1, 0x9a, 0xe5, 0xcf, 0xda, 0x0a, 0x24, 0xd7, 0xf3, 0x7a, 0x5b, 0x40, 0xf8, + 0xe2, 0x4f, 0xa2, 0xbf, 0x49, 0x90, 0x0e, 0xe2, 0x8d, 0xbe, 0x0d, 0x89, 0xf3, 0xab, 0x6e, 0xbd, + 0x2f, 0x47, 0x94, 0xc3, 0xf9, 0x42, 0xcd, 0x07, 0x0c, 0x1e, 0x7a, 0xa4, 0x42, 0xaa, 0xdd, 0xe9, + 0x37, 0x2f, 0x9a, 0x38, 0x80, 0x0c, 0xf8, 0x7e, 0x38, 0x51, 0x15, 0xd2, 0xb7, 0x9d, 0x5e, 0xfb, + 0xa2, 0xd3, 0x7c, 0x29, 0x47, 0xc5, 0x8c, 0x0c, 0x44, 0x82, 0x18, 0x31, 0x94, 0x46, 0xb7, 0x7b, + 0xd5, 0xac, 0x77, 0xe4, 0xd8, 0x36, 0x8a, 0x7f, 0xef, 0xa8, 0x0c, 0xc9, 0x5e, 0x1f, 0xb7, 0x3b, + 0x17, 0x72, 0x5c, 0x41, 0xf3, 0x85, 0x5a, 0x08, 0x04, 0xc4, 0x55, 0xfa, 0x8e, 0xff, 0x59, 0x82, + 0xe2, 0x19, 0xb1, 0xc9, 0x9d, 0x6e, 0xe8, 0x9e, 0x4e, 0xdd, 0xf5, 0x6c, 0xec, 0x42, 0x7c, 0x48, + 0xec, 0xa0, 0x6e, 0x9e, 0x6e, 0x1b, 0x6f, 0x02, 0x60, 0x44, 0x97, 0x3f, 0xee, 0x30, 0x07, 0x52, + 0x7e, 0x02, 0x99, 0x35, 0x69, 0xaf, 0xf7, 0xde, 0x01, 0xe4, 0xf9, 0x33, 0x32, 0x40, 0xae, 0x3e, + 0x87, 0x07, 0xff, 0x13, 0xa6, 0xec, 0x7a, 0xc4, 0xf1, 0x38, 0x60, 0x0c, 0x8b, 0x0d, 0x33, 0x42, + 0xcd, 0x11, 0x07, 0x8c, 0x61, 0xb6, 0x3c, 0xfd, 0x5f, 0x14, 0x52, 0x3d, 0xe1, 0x34, 0xfa, 0x0d, + 0xc4, 0x59, 0xb9, 0xa2, 0xe3, 0x5d, 0x5f, 0xbf, 0xca, 0xf7, 0x76, 0xae, 0xfd, 0x1f, 0x49, 0x48, + 0x07, 0xd8, 0x7c, 0x69, 0xd1, 0x37, 0xff, 0x69, 0xb6, 0xfe, 0xbe, 0xfb, 0x99, 0xfa, 0x14, 0x72, + 0xe1, 0x08, 0xa0, 0xa3, 0x47, 0xbf, 0xc7, 0x26, 0xfb, 0x98, 0x2b, 0x3f, 0xde, 0x3b, 0x88, 0xe8, + 0x12, 0xc4, 0x13, 0xfe, 0x6b, 0x31, 0xbf, 0xff, 0x24, 0xe6, 0x56, 0xdc, 0x1a, 0xdf, 0x7d, 0xf5, + 0xaf, 0x72, 0xe4, 0xd5, 0xaa, 0x2c, 0x7d, 0xb5, 0x2a, 0x4b, 0xff, 0x5c, 0x95, 0xa5, 0x3f, 0xbc, + 0x2e, 0x47, 0xbe, 0x7a, 0x5d, 0x8e, 0xfc, 0xe3, 0x75, 0x39, 0xf2, 0x6b, 0xde, 0x6a, 0x59, 0xa7, + 0x75, 0xef, 0x92, 0xdc, 0xc8, 0x47, 0xff, 0x0f, 0x00, 0x00, 0xff, 0xff, 0xf8, 0xcb, 0x1e, 0x6b, + 0xaa, 0x10, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1187,6 +1230,8 @@ const _ = grpc.SupportPackageIsVersion4 type StorageClient interface { // Read performs a read operation using the given ReadRequest Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (Storage_ReadClient, error) + // ReadFilter performs a filter operation at storage + ReadFilter(ctx context.Context, in *ReadFilterRequest, opts ...grpc.CallOption) (Storage_ReadFilterClient, error) // Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine Capabilities(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*CapabilitiesResponse, error) Hints(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*HintsResponse, error) @@ -1232,6 +1277,38 @@ func (x *storageReadClient) Recv() (*ReadResponse, error) { return m, nil } +func (c *storageClient) ReadFilter(ctx context.Context, in *ReadFilterRequest, opts ...grpc.CallOption) (Storage_ReadFilterClient, error) { + stream, err := c.cc.NewStream(ctx, &_Storage_serviceDesc.Streams[1], "/influxdata.platform.storage.Storage/ReadFilter", opts...) + if err != nil { + return nil, err + } + x := &storageReadFilterClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Storage_ReadFilterClient interface { + Recv() (*ReadResponse, error) + grpc.ClientStream +} + +type storageReadFilterClient struct { + grpc.ClientStream +} + +func (x *storageReadFilterClient) Recv() (*ReadResponse, error) { + m := new(ReadResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *storageClient) Capabilities(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*CapabilitiesResponse, error) { out := new(CapabilitiesResponse) err := c.cc.Invoke(ctx, "/influxdata.platform.storage.Storage/Capabilities", in, out, opts...) @@ -1254,6 +1331,8 @@ func (c *storageClient) Hints(ctx context.Context, in *types.Empty, opts ...grpc type StorageServer interface { // Read performs a read operation using the given ReadRequest Read(*ReadRequest, Storage_ReadServer) error + // ReadFilter performs a filter operation at storage + ReadFilter(*ReadFilterRequest, Storage_ReadFilterServer) error // Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine Capabilities(context.Context, *types.Empty) (*CapabilitiesResponse, error) Hints(context.Context, *types.Empty) (*HintsResponse, error) @@ -1284,6 +1363,27 @@ func (x *storageReadServer) Send(m *ReadResponse) error { return x.ServerStream.SendMsg(m) } +func _Storage_ReadFilter_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ReadFilterRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(StorageServer).ReadFilter(m, &storageReadFilterServer{stream}) +} + +type Storage_ReadFilterServer interface { + Send(*ReadResponse) error + grpc.ServerStream +} + +type storageReadFilterServer struct { + grpc.ServerStream +} + +func (x *storageReadFilterServer) Send(m *ReadResponse) error { + return x.ServerStream.SendMsg(m) +} + func _Storage_Capabilities_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(types.Empty) if err := dec(in); err != nil { @@ -1339,10 +1439,61 @@ var _Storage_serviceDesc = grpc.ServiceDesc{ Handler: _Storage_Read_Handler, ServerStreams: true, }, + { + StreamName: "ReadFilter", + Handler: _Storage_ReadFilter_Handler, + ServerStreams: true, + }, }, Metadata: "storage_common.proto", } +func (m *ReadFilterRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadFilterRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.ReadSource != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintStorageCommon(dAtA, i, uint64(m.ReadSource.Size())) + n1, err := m.ReadSource.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + dAtA[i] = 0x12 + i++ + i = encodeVarintStorageCommon(dAtA, i, uint64(m.Range.Size())) + n2, err := m.Range.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + if m.Predicate != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintStorageCommon(dAtA, i, uint64(m.Predicate.Size())) + n3, err := m.Predicate.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n3 + } + return i, nil +} + func (m *ReadRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -1361,11 +1512,11 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.TimestampRange.Size())) - n1, err := m.TimestampRange.MarshalTo(dAtA[i:]) + n4, err := m.TimestampRange.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n1 + i += n4 if m.Descending { dAtA[i] = 0x18 i++ @@ -1395,11 +1546,11 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x2a i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.Predicate.Size())) - n2, err := m.Predicate.MarshalTo(dAtA[i:]) + n5, err := m.Predicate.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n2 + i += n5 } if m.SeriesLimit != 0 { dAtA[i] = 0x30 @@ -1420,11 +1571,11 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x4a i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.Aggregate.Size())) - n3, err := m.Aggregate.MarshalTo(dAtA[i:]) + n6, err := m.Aggregate.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n3 + i += n6 } if len(m.Trace) > 0 { for k, _ := range m.Trace { @@ -1458,11 +1609,11 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x6a i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.ReadSource.Size())) - n4, err := m.ReadSource.MarshalTo(dAtA[i:]) + n7, err := m.ReadSource.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n4 + i += n7 } return i, nil } @@ -1566,11 +1717,11 @@ func (m *ReadResponse_Frame) MarshalTo(dAtA []byte) (int, error) { var l int _ = l if m.Data != nil { - nn5, err := m.Data.MarshalTo(dAtA[i:]) + nn8, err := m.Data.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += nn5 + i += nn8 } return i, nil } @@ -1581,11 +1732,11 @@ func (m *ReadResponse_Frame_Series) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.Series.Size())) - n6, err := m.Series.MarshalTo(dAtA[i:]) + n9, err := m.Series.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n6 + i += n9 } return i, nil } @@ -1595,11 +1746,11 @@ func (m *ReadResponse_Frame_FloatPoints) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x12 i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.FloatPoints.Size())) - n7, err := m.FloatPoints.MarshalTo(dAtA[i:]) + n10, err := m.FloatPoints.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n7 + i += n10 } return i, nil } @@ -1609,11 +1760,11 @@ func (m *ReadResponse_Frame_IntegerPoints) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x1a i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.IntegerPoints.Size())) - n8, err := m.IntegerPoints.MarshalTo(dAtA[i:]) + n11, err := m.IntegerPoints.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n8 + i += n11 } return i, nil } @@ -1623,11 +1774,11 @@ func (m *ReadResponse_Frame_UnsignedPoints) MarshalTo(dAtA []byte) (int, error) dAtA[i] = 0x22 i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.UnsignedPoints.Size())) - n9, err := m.UnsignedPoints.MarshalTo(dAtA[i:]) + n12, err := m.UnsignedPoints.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n9 + i += n12 } return i, nil } @@ -1637,11 +1788,11 @@ func (m *ReadResponse_Frame_BooleanPoints) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x2a i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.BooleanPoints.Size())) - n10, err := m.BooleanPoints.MarshalTo(dAtA[i:]) + n13, err := m.BooleanPoints.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n10 + i += n13 } return i, nil } @@ -1651,11 +1802,11 @@ func (m *ReadResponse_Frame_StringPoints) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x32 i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.StringPoints.Size())) - n11, err := m.StringPoints.MarshalTo(dAtA[i:]) + n14, err := m.StringPoints.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n11 + i += n14 } return i, nil } @@ -1665,11 +1816,11 @@ func (m *ReadResponse_Frame_Group) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x3a i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.Group.Size())) - n12, err := m.Group.MarshalTo(dAtA[i:]) + n15, err := m.Group.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n12 + i += n15 } return i, nil } @@ -1771,8 +1922,8 @@ func (m *ReadResponse_FloatPointsFrame) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintStorageCommon(dAtA, i, uint64(len(m.Values)*8)) for _, num := range m.Values { - f13 := math.Float64bits(float64(num)) - encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(f13)) + f16 := math.Float64bits(float64(num)) + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(f16)) i += 8 } } @@ -1804,22 +1955,22 @@ func (m *ReadResponse_IntegerPointsFrame) MarshalTo(dAtA []byte) (int, error) { } } if len(m.Values) > 0 { - dAtA15 := make([]byte, len(m.Values)*10) - var j14 int + dAtA18 := make([]byte, len(m.Values)*10) + var j17 int for _, num1 := range m.Values { num := uint64(num1) for num >= 1<<7 { - dAtA15[j14] = uint8(uint64(num)&0x7f | 0x80) + dAtA18[j17] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j14++ + j17++ } - dAtA15[j14] = uint8(num) - j14++ + dAtA18[j17] = uint8(num) + j17++ } dAtA[i] = 0x12 i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(j14)) - i += copy(dAtA[i:], dAtA15[:j14]) + i = encodeVarintStorageCommon(dAtA, i, uint64(j17)) + i += copy(dAtA[i:], dAtA18[:j17]) } return i, nil } @@ -1849,21 +2000,21 @@ func (m *ReadResponse_UnsignedPointsFrame) MarshalTo(dAtA []byte) (int, error) { } } if len(m.Values) > 0 { - dAtA17 := make([]byte, len(m.Values)*10) - var j16 int + dAtA20 := make([]byte, len(m.Values)*10) + var j19 int for _, num := range m.Values { for num >= 1<<7 { - dAtA17[j16] = uint8(uint64(num)&0x7f | 0x80) + dAtA20[j19] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j16++ + j19++ } - dAtA17[j16] = uint8(num) - j16++ + dAtA20[j19] = uint8(num) + j19++ } dAtA[i] = 0x12 i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(j16)) - i += copy(dAtA[i:], dAtA17[:j16]) + i = encodeVarintStorageCommon(dAtA, i, uint64(j19)) + i += copy(dAtA[i:], dAtA20[:j19]) } return i, nil } @@ -2040,6 +2191,25 @@ func encodeVarintStorageCommon(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return offset + 1 } +func (m *ReadFilterRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.ReadSource != nil { + l = m.ReadSource.Size() + n += 1 + l + sovStorageCommon(uint64(l)) + } + l = m.Range.Size() + n += 1 + l + sovStorageCommon(uint64(l)) + if m.Predicate != nil { + l = m.Predicate.Size() + n += 1 + l + sovStorageCommon(uint64(l)) + } + return n +} + func (m *ReadRequest) Size() (n int) { if m == nil { return 0 @@ -2414,6 +2584,164 @@ func sovStorageCommon(x uint64) (n int) { func sozStorageCommon(x uint64) (n int) { return sovStorageCommon(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (m *ReadFilterRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadFilterRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadFilterRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ReadSource", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStorageCommon + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStorageCommon + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.ReadSource == nil { + m.ReadSource = &types.Any{} + } + if err := m.ReadSource.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Range", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStorageCommon + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStorageCommon + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Range.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Predicate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStorageCommon + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStorageCommon + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Predicate == nil { + m.Predicate = &Predicate{} + } + if err := m.Predicate.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipStorageCommon(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthStorageCommon + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthStorageCommon + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *ReadRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/storage/reads/datatypes/storage_common.proto b/storage/reads/datatypes/storage_common.proto index 59a298532b..0c936e5353 100644 --- a/storage/reads/datatypes/storage_common.proto +++ b/storage/reads/datatypes/storage_common.proto @@ -16,6 +16,9 @@ service Storage { // Read performs a read operation using the given ReadRequest rpc Read (ReadRequest) returns (stream ReadResponse); + // ReadFilter performs a filter operation at storage + rpc ReadFilter (ReadFilterRequest) returns (stream ReadResponse); + // Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine rpc Capabilities (google.protobuf.Empty) returns (CapabilitiesResponse); @@ -25,6 +28,12 @@ service Storage { // rpc Explain(google.protobuf.Empty) returns (ExplainResponse){} } +message ReadFilterRequest { + google.protobuf.Any read_source = 1 [(gogoproto.customname) = "ReadSource"]; + TimestampRange range = 2 [(gogoproto.nullable) = false]; + Predicate predicate = 3; +} + // Request message for Storage.Read. message ReadRequest { enum Group { diff --git a/storage/reads/reader.go b/storage/reads/reader.go index 9dfb10c861..ff87680208 100644 --- a/storage/reads/reader.go +++ b/storage/reads/reader.go @@ -52,8 +52,138 @@ func (r *storeReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, sto }, nil } +func (r *storeReader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { + return &simpleTableIterator{ + ctx: ctx, + s: r.s, + spec: spec, + alloc: alloc, + }, nil +} + func (r *storeReader) Close() {} +type simpleTableIterator struct { + ctx context.Context + s Store + spec influxdb.ReadFilterSpec + stats cursors.CursorStats + alloc *memory.Allocator +} + +func (bi *simpleTableIterator) Statistics() cursors.CursorStats { return bi.stats } + +func (bi *simpleTableIterator) Do(f func(flux.Table) error) error { + orgID := uint64(bi.spec.OrganizationID) + bucketID := uint64(bi.spec.BucketID) + src := bi.s.GetSourceFrom(orgID, bucketID) + + // Setup read request + any, err := types.MarshalAny(src) + if err != nil { + return err + } + + var predicate *datatypes.Predicate + if bi.spec.Predicate != nil { + p, err := toStoragePredicate(bi.spec.Predicate) + if err != nil { + return err + } + predicate = p + } + + var req datatypes.ReadFilterRequest + req.ReadSource = any + req.Predicate = predicate + req.Range.Start = int64(bi.spec.Bounds.Start) + req.Range.End = int64(bi.spec.Bounds.Stop) + + rs, err := bi.s.ReadFilter(bi.ctx, &req) + if err != nil { + return err + } + + if rs == nil { + return nil + } + + return bi.handleRead(f, rs) +} + +func (bi *simpleTableIterator) handleRead(f func(flux.Table) error, rs ResultSet) error { + // these resources must be closed if not nil on return + var ( + cur cursors.Cursor + table storageTable + ) + + defer func() { + if table != nil { + table.Close() + } + if cur != nil { + cur.Close() + } + rs.Close() + }() + +READ: + for rs.Next() { + cur = rs.Cursor() + if cur == nil { + // no data for series key + field combination + continue + } + + bnds := bi.spec.Bounds + key := defaultGroupKeyForSeries(rs.Tags(), bnds) + done := make(chan struct{}) + switch typedCur := cur.(type) { + case cursors.IntegerArrayCursor: + cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt) + table = newIntegerTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + case cursors.FloatArrayCursor: + cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat) + table = newFloatTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + case cursors.UnsignedArrayCursor: + cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt) + table = newUnsignedTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + case cursors.BooleanArrayCursor: + cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool) + table = newBooleanTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + case cursors.StringArrayCursor: + cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString) + table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + default: + panic(fmt.Sprintf("unreachable: %T", typedCur)) + } + + cur = nil + + if !table.Empty() { + if err := f(table); err != nil { + table.Close() + table = nil + return err + } + select { + case <-done: + case <-bi.ctx.Done(): + table.Cancel() + break READ + } + } + + stats := table.Statistics() + bi.stats.ScannedValues += stats.ScannedValues + bi.stats.ScannedBytes += stats.ScannedBytes + table.Close() + table = nil + } + return rs.Err() +} + type tableIterator struct { ctx context.Context bounds execute.Bounds @@ -442,6 +572,29 @@ func determineTableColsForSeries(tags models.Tags, typ flux.ColType) ([]flux.Col return cols, defs } +func defaultGroupKeyForSeries(tags models.Tags, bnds execute.Bounds) flux.GroupKey { + cols := make([]flux.ColMeta, 2, len(tags)) + vs := make([]values.Value, 2, len(tags)) + cols[0] = flux.ColMeta{ + Label: execute.DefaultStartColLabel, + Type: flux.TTime, + } + vs[0] = values.NewTime(bnds.Start) + cols[1] = flux.ColMeta{ + Label: execute.DefaultStopColLabel, + Type: flux.TTime, + } + vs[1] = values.NewTime(bnds.Stop) + for i := range tags { + cols = append(cols, flux.ColMeta{ + Label: string(tags[i].Key), + Type: flux.TString, + }) + vs = append(vs, values.NewString(string(tags[i].Value))) + } + return execute.NewGroupKey(cols, vs) +} + func groupKeyForSeries(tags models.Tags, readSpec *influxdb.ReadSpec, bnds execute.Bounds) flux.GroupKey { cols := make([]flux.ColMeta, 2, len(tags)) vs := make([]values.Value, 2, len(tags)) diff --git a/storage/reads/resultset.go b/storage/reads/resultset.go index a85478c0fe..bbc43bb031 100644 --- a/storage/reads/resultset.go +++ b/storage/reads/resultset.go @@ -2,6 +2,7 @@ package reads import ( "context" + "math" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/storage/reads/datatypes" @@ -30,6 +31,14 @@ func NewResultSet(ctx context.Context, req *datatypes.ReadRequest, cur SeriesCur } } +func NewResultSetFromFilter(ctx context.Context, req *datatypes.ReadFilterRequest, cur SeriesCursor) ResultSet { + return &resultSet{ + ctx: ctx, + cur: cur, + mb: newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true, math.MaxInt64), + } +} + func (r *resultSet) Err() error { return nil } // Close closes the result set. Close is idempotent. diff --git a/storage/reads/store.go b/storage/reads/store.go index 3c93a12de8..643635fca3 100644 --- a/storage/reads/store.go +++ b/storage/reads/store.go @@ -76,6 +76,8 @@ type GroupCursor interface { type Store interface { Read(ctx context.Context, req *datatypes.ReadRequest) (ResultSet, error) + ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (ResultSet, error) GroupRead(ctx context.Context, req *datatypes.ReadRequest) (GroupResultSet, error) GetSource(rs influxdb.ReadSpec) (proto.Message, error) + GetSourceFrom(orgID, bucketID uint64) proto.Message } diff --git a/storage/readservice/cursor.go b/storage/readservice/cursor.go index 1da0186bd6..7c93e85355 100644 --- a/storage/readservice/cursor.go +++ b/storage/readservice/cursor.go @@ -35,7 +35,7 @@ type indexSeriesCursor struct { hasValueExpr bool } -func newIndexSeriesCursor(ctx context.Context, src *readSource, req *datatypes.ReadRequest, engine *storage.Engine) (*indexSeriesCursor, error) { +func newIndexSeriesCursor(ctx context.Context, src *readSource, predicate *datatypes.Predicate, engine *storage.Engine) (*indexSeriesCursor, error) { queries, err := engine.CreateCursorIterator(ctx) if err != nil { return nil, err @@ -59,7 +59,7 @@ func newIndexSeriesCursor(ctx context.Context, src *readSource, req *datatypes.R } p := &indexSeriesCursor{row: reads.SeriesRow{Query: tsdb.CursorIterators{queries}}} - if root := req.Predicate.GetRoot(); root != nil { + if root := predicate.GetRoot(); root != nil { if p.cond, err = reads.NodeToExpr(root, nil); err != nil { return nil, err } diff --git a/storage/readservice/store.go b/storage/readservice/store.go index 1585b3e6a9..57837e9326 100644 --- a/storage/readservice/store.go +++ b/storage/readservice/store.go @@ -22,6 +22,28 @@ func newStore(engine *storage.Engine) *store { return &store{engine: engine} } +func (s *store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (reads.ResultSet, error) { + if req.ReadSource == nil { + return nil, errors.New("missing read source") + } + + var source readSource + if err := types.UnmarshalAny(req.ReadSource, &source); err != nil { + return nil, err + } + + var cur reads.SeriesCursor + if ic, err := newIndexSeriesCursor(ctx, &source, req.Predicate, s.engine); err != nil { + return nil, err + } else if ic == nil { + return nil, nil + } else { + cur = ic + } + + return reads.NewResultSetFromFilter(ctx, req, cur), nil +} + func (s *store) Read(ctx context.Context, req *datatypes.ReadRequest) (reads.ResultSet, error) { if len(req.GroupKeys) > 0 { panic("Read: len(Grouping) > 0") @@ -49,7 +71,7 @@ func (s *store) Read(ctx context.Context, req *datatypes.ReadRequest) (reads.Res } var cur reads.SeriesCursor - if ic, err := newIndexSeriesCursor(ctx, source, req, s.engine); err != nil { + if ic, err := newIndexSeriesCursor(ctx, source, req.Predicate, s.engine); err != nil { return nil, err } else if ic == nil { return nil, nil @@ -91,7 +113,7 @@ func (s *store) GroupRead(ctx context.Context, req *datatypes.ReadRequest) (read } newCursor := func() (reads.SeriesCursor, error) { - cur, err := newIndexSeriesCursor(ctx, source, req, s.engine) + cur, err := newIndexSeriesCursor(ctx, source, req.Predicate, s.engine) if cur == nil || err != nil { return nil, err } @@ -120,6 +142,13 @@ func (s *store) GetSource(rs influxdb.ReadSpec) (proto.Message, error) { }, nil } +func (s *store) GetSourceFrom(orgID, bucketID uint64) proto.Message { + return &readSource{ + BucketID: bucketID, + OrganizationID: orgID, + } +} + func getReadSource(req *datatypes.ReadRequest) (*readSource, error) { if req.ReadSource == nil { return nil, errors.New("missing read source")