diff --git a/query/stdlib/influxdata/influxdb/source.go b/query/stdlib/influxdata/influxdb/source.go index f1da6a9e77..a39ab93820 100644 --- a/query/stdlib/influxdata/influxdb/source.go +++ b/query/stdlib/influxdata/influxdb/source.go @@ -117,7 +117,7 @@ func (s *readFilterSource) run(ctx context.Context) error { } func createReadFilterSource(s plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error) { - span, ctx := tracing.StartSpanFromContext(context.TODO()) + span, ctx := tracing.StartSpanFromContext(a.Context()) defer span.Finish() spec := s.(*ReadRangePhysSpec) @@ -190,7 +190,7 @@ func (s *readGroupSource) run(ctx context.Context) error { } func createReadGroupSource(s plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error) { - span, ctx := tracing.StartSpanFromContext(context.TODO()) + span, ctx := tracing.StartSpanFromContext(a.Context()) defer span.Finish() spec := s.(*ReadGroupPhysSpec) @@ -236,7 +236,7 @@ func createReadGroupSource(s plan.ProcedureSpec, id execute.DatasetID, a execute } func createReadTagKeysSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) { - span, ctx := tracing.StartSpanFromContext(context.TODO()) + span, ctx := tracing.StartSpanFromContext(a.Context()) defer span.Finish() spec := prSpec.(*ReadTagKeysPhysSpec) @@ -300,7 +300,7 @@ func (s *readTagKeysSource) run(ctx context.Context) error { } func createReadTagValuesSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execute.Administration) (execute.Source, error) { - span, ctx := tracing.StartSpanFromContext(context.TODO()) + span, ctx := tracing.StartSpanFromContext(a.Context()) defer span.Finish() spec := prSpec.(*ReadTagValuesPhysSpec) diff --git a/storage/reads/datatypes/storage_common.pb.go b/storage/reads/datatypes/storage_common.pb.go index 92bc288c30..b3310b0f64 100644 --- a/storage/reads/datatypes/storage_common.pb.go +++ b/storage/reads/datatypes/storage_common.pb.go @@ -26,72 +26,76 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package -type ReadRequest_Group int32 +type ReadGroupRequest_Group int32 const ( // GroupNone returns all series as a single group. // The single GroupFrame.TagKeys will be the union of all tag keys. - GroupNone ReadRequest_Group = 0 + GroupNone ReadGroupRequest_Group = 0 // GroupAll returns a unique group for each series. // As an optimization, no GroupFrames will be generated. - GroupAll ReadRequest_Group = 1 + GroupAll ReadGroupRequest_Group = 1 // GroupBy returns a group for each unique value of the specified GroupKeys. - GroupBy ReadRequest_Group = 2 + GroupBy ReadGroupRequest_Group = 2 // GroupExcept in not implemented. - GroupExcept ReadRequest_Group = 3 + GroupExcept ReadGroupRequest_Group = 3 ) -var ReadRequest_Group_name = map[int32]string{ +var ReadGroupRequest_Group_name = map[int32]string{ 0: "GROUP_NONE", 1: "GROUP_ALL", 2: "GROUP_BY", 3: "GROUP_EXCEPT", } -var ReadRequest_Group_value = map[string]int32{ +var ReadGroupRequest_Group_value = map[string]int32{ "GROUP_NONE": 0, "GROUP_ALL": 1, "GROUP_BY": 2, "GROUP_EXCEPT": 3, } -func (x ReadRequest_Group) String() string { - return proto.EnumName(ReadRequest_Group_name, int32(x)) +func (x ReadGroupRequest_Group) String() string { + return proto.EnumName(ReadGroupRequest_Group_name, int32(x)) } -func (ReadRequest_Group) EnumDescriptor() ([]byte, []int) { +func (ReadGroupRequest_Group) EnumDescriptor() ([]byte, []int) { return fileDescriptor_715e4bf4cdf1f73d, []int{1, 0} } -type ReadRequest_HintFlags int32 +// TODO(jlapacik): This field is only used in unit tests. +// Specifically the two tests in group_resultset_test.go. +// This field should be removed and the tests that depend +// on it refactored. +type ReadGroupRequest_HintFlags int32 const ( - HintNone ReadRequest_HintFlags = 0 - HintNoPoints ReadRequest_HintFlags = 1 - HintNoSeries ReadRequest_HintFlags = 2 + HintNone ReadGroupRequest_HintFlags = 0 + HintNoPoints ReadGroupRequest_HintFlags = 1 + HintNoSeries ReadGroupRequest_HintFlags = 2 // HintSchemaAllTime performs schema queries without using time ranges - HintSchemaAllTime ReadRequest_HintFlags = 4 + HintSchemaAllTime ReadGroupRequest_HintFlags = 4 ) -var ReadRequest_HintFlags_name = map[int32]string{ +var ReadGroupRequest_HintFlags_name = map[int32]string{ 0: "HINT_NONE", 1: "HINT_NO_POINTS", 2: "HINT_NO_SERIES", 4: "HINT_SCHEMA_ALL_TIME", } -var ReadRequest_HintFlags_value = map[string]int32{ +var ReadGroupRequest_HintFlags_value = map[string]int32{ "HINT_NONE": 0, "HINT_NO_POINTS": 1, "HINT_NO_SERIES": 2, "HINT_SCHEMA_ALL_TIME": 4, } -func (x ReadRequest_HintFlags) String() string { - return proto.EnumName(ReadRequest_HintFlags_name, int32(x)) +func (x ReadGroupRequest_HintFlags) String() string { + return proto.EnumName(ReadGroupRequest_HintFlags_name, int32(x)) } -func (ReadRequest_HintFlags) EnumDescriptor() ([]byte, []int) { +func (ReadGroupRequest_HintFlags) EnumDescriptor() ([]byte, []int) { return fileDescriptor_715e4bf4cdf1f73d, []int{1, 1} } @@ -120,7 +124,7 @@ func (x Aggregate_AggregateType) String() string { } func (Aggregate_AggregateType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{2, 0} + return fileDescriptor_715e4bf4cdf1f73d, []int{3, 0} } type ReadResponse_FrameType int32 @@ -145,7 +149,7 @@ func (x ReadResponse_FrameType) String() string { } func (ReadResponse_FrameType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 0} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 0} } type ReadResponse_DataType int32 @@ -179,7 +183,7 @@ func (x ReadResponse_DataType) String() string { } func (ReadResponse_DataType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 1} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 1} } type ReadFilterRequest struct { @@ -221,40 +225,60 @@ func (m *ReadFilterRequest) XXX_DiscardUnknown() { var xxx_messageInfo_ReadFilterRequest proto.InternalMessageInfo -// Request message for Storage.Read. +type ReadGroupRequest struct { + ReadSource *types.Any `protobuf:"bytes,1,opt,name=read_source,json=readSource,proto3" json:"read_source,omitempty"` + Range TimestampRange `protobuf:"bytes,2,opt,name=range,proto3" json:"range"` + Predicate *Predicate `protobuf:"bytes,3,opt,name=predicate,proto3" json:"predicate,omitempty"` + // GroupKeys specifies a list of tag keys used to order the data. + // It is dependent on the Group property to determine its behavior. + GroupKeys []string `protobuf:"bytes,4,rep,name=group_keys,json=groupKeys,proto3" json:"group_keys,omitempty"` + Group ReadGroupRequest_Group `protobuf:"varint,5,opt,name=group,proto3,enum=influxdata.platform.storage.ReadGroupRequest_Group" json:"group,omitempty"` + Aggregate *Aggregate `protobuf:"bytes,6,opt,name=aggregate,proto3" json:"aggregate,omitempty"` + Hints HintFlags `protobuf:"fixed32,7,opt,name=hints,proto3,casttype=HintFlags" json:"hints,omitempty"` +} + +func (m *ReadGroupRequest) Reset() { *m = ReadGroupRequest{} } +func (m *ReadGroupRequest) String() string { return proto.CompactTextString(m) } +func (*ReadGroupRequest) ProtoMessage() {} +func (*ReadGroupRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_715e4bf4cdf1f73d, []int{1} +} +func (m *ReadGroupRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReadGroupRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReadGroupRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ReadGroupRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadGroupRequest.Merge(m, src) +} +func (m *ReadGroupRequest) XXX_Size() int { + return m.Size() +} +func (m *ReadGroupRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReadGroupRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadGroupRequest proto.InternalMessageInfo + +// TODO(jlapacik): Remove this message type ReadRequest struct { - ReadSource *types.Any `protobuf:"bytes,13,opt,name=read_source,json=readSource,proto3" json:"read_source,omitempty"` - TimestampRange TimestampRange `protobuf:"bytes,2,opt,name=timestamp_range,json=timestampRange,proto3" json:"timestamp_range"` - // Descending indicates whether points should be returned in descending order. - Descending bool `protobuf:"varint,3,opt,name=descending,proto3" json:"descending,omitempty"` - // GroupKeys specifies a list of tag keys used to order the data. It is dependent on the Group property to determine - // its behavior. - GroupKeys []string `protobuf:"bytes,4,rep,name=group_keys,json=groupKeys,proto3" json:"group_keys,omitempty"` - // - Group ReadRequest_Group `protobuf:"varint,11,opt,name=group,proto3,enum=influxdata.platform.storage.ReadRequest_Group" json:"group,omitempty"` - // Aggregate specifies an optional aggregate to apply to the data. - // TODO(sgc): switch to slice for multiple aggregates in a single request - Aggregate *Aggregate `protobuf:"bytes,9,opt,name=aggregate,proto3" json:"aggregate,omitempty"` - Predicate *Predicate `protobuf:"bytes,5,opt,name=predicate,proto3" json:"predicate,omitempty"` - // SeriesLimit determines the maximum number of series to be returned for the request. Specify 0 for no limit. - SeriesLimit int64 `protobuf:"varint,6,opt,name=series_limit,json=seriesLimit,proto3" json:"series_limit,omitempty"` - // SeriesOffset determines how many series to skip before processing the request. - SeriesOffset int64 `protobuf:"varint,7,opt,name=series_offset,json=seriesOffset,proto3" json:"series_offset,omitempty"` - // PointsLimit determines the maximum number of values per series to be returned for the request. - // Specify 0 for no limit. -1 to return series frames only. - PointsLimit int64 `protobuf:"varint,8,opt,name=points_limit,json=pointsLimit,proto3" json:"points_limit,omitempty"` - // Trace contains opaque data if a trace is active. - Trace map[string]string `protobuf:"bytes,10,rep,name=trace,proto3" json:"trace,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - // Hints is a bitwise OR of HintFlags to control the behavior - // of the read request. - Hints HintFlags `protobuf:"fixed32,12,opt,name=hints,proto3,casttype=HintFlags" json:"hints,omitempty"` } func (m *ReadRequest) Reset() { *m = ReadRequest{} } func (m *ReadRequest) String() string { return proto.CompactTextString(m) } func (*ReadRequest) ProtoMessage() {} func (*ReadRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{1} + return fileDescriptor_715e4bf4cdf1f73d, []int{2} } func (m *ReadRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -291,7 +315,7 @@ func (m *Aggregate) Reset() { *m = Aggregate{} } func (m *Aggregate) String() string { return proto.CompactTextString(m) } func (*Aggregate) ProtoMessage() {} func (*Aggregate) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{2} + return fileDescriptor_715e4bf4cdf1f73d, []int{3} } func (m *Aggregate) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -329,7 +353,7 @@ func (m *Tag) Reset() { *m = Tag{} } func (m *Tag) String() string { return proto.CompactTextString(m) } func (*Tag) ProtoMessage() {} func (*Tag) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{3} + return fileDescriptor_715e4bf4cdf1f73d, []int{4} } func (m *Tag) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -358,7 +382,7 @@ func (m *Tag) XXX_DiscardUnknown() { var xxx_messageInfo_Tag proto.InternalMessageInfo -// Response message for Storage.Read. +// Response message for ReadFilter and ReadGroup type ReadResponse struct { Frames []ReadResponse_Frame `protobuf:"bytes,1,rep,name=frames,proto3" json:"frames"` } @@ -367,7 +391,7 @@ func (m *ReadResponse) Reset() { *m = ReadResponse{} } func (m *ReadResponse) String() string { return proto.CompactTextString(m) } func (*ReadResponse) ProtoMessage() {} func (*ReadResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4} + return fileDescriptor_715e4bf4cdf1f73d, []int{5} } func (m *ReadResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -412,7 +436,7 @@ func (m *ReadResponse_Frame) Reset() { *m = ReadResponse_Frame{} } func (m *ReadResponse_Frame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_Frame) ProtoMessage() {} func (*ReadResponse_Frame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 0} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 0} } func (m *ReadResponse_Frame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -705,7 +729,7 @@ func _ReadResponse_Frame_OneofSizer(msg proto.Message) (n int) { type ReadResponse_GroupFrame struct { // TagKeys TagKeys [][]byte `protobuf:"bytes,1,rep,name=tag_keys,json=tagKeys,proto3" json:"tag_keys,omitempty"` - // PartitionKeyVals is the values of the partition key for this group, order matching ReadRequest.GroupKeys + // PartitionKeyVals is the values of the partition key for this group, order matching ReadGroupRequest.GroupKeys PartitionKeyVals [][]byte `protobuf:"bytes,2,rep,name=partition_key_vals,json=partitionKeyVals,proto3" json:"partition_key_vals,omitempty"` } @@ -713,7 +737,7 @@ func (m *ReadResponse_GroupFrame) Reset() { *m = ReadResponse_GroupFrame func (m *ReadResponse_GroupFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_GroupFrame) ProtoMessage() {} func (*ReadResponse_GroupFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 1} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 1} } func (m *ReadResponse_GroupFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -751,7 +775,7 @@ func (m *ReadResponse_SeriesFrame) Reset() { *m = ReadResponse_SeriesFra func (m *ReadResponse_SeriesFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_SeriesFrame) ProtoMessage() {} func (*ReadResponse_SeriesFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 2} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 2} } func (m *ReadResponse_SeriesFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -789,7 +813,7 @@ func (m *ReadResponse_FloatPointsFrame) Reset() { *m = ReadResponse_Floa func (m *ReadResponse_FloatPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_FloatPointsFrame) ProtoMessage() {} func (*ReadResponse_FloatPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 3} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 3} } func (m *ReadResponse_FloatPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -827,7 +851,7 @@ func (m *ReadResponse_IntegerPointsFrame) Reset() { *m = ReadResponse_In func (m *ReadResponse_IntegerPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_IntegerPointsFrame) ProtoMessage() {} func (*ReadResponse_IntegerPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 4} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 4} } func (m *ReadResponse_IntegerPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -865,7 +889,7 @@ func (m *ReadResponse_UnsignedPointsFrame) Reset() { *m = ReadResponse_U func (m *ReadResponse_UnsignedPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_UnsignedPointsFrame) ProtoMessage() {} func (*ReadResponse_UnsignedPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 5} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 5} } func (m *ReadResponse_UnsignedPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -903,7 +927,7 @@ func (m *ReadResponse_BooleanPointsFrame) Reset() { *m = ReadResponse_Bo func (m *ReadResponse_BooleanPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_BooleanPointsFrame) ProtoMessage() {} func (*ReadResponse_BooleanPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 6} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 6} } func (m *ReadResponse_BooleanPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -941,7 +965,7 @@ func (m *ReadResponse_StringPointsFrame) Reset() { *m = ReadResponse_Str func (m *ReadResponse_StringPointsFrame) String() string { return proto.CompactTextString(m) } func (*ReadResponse_StringPointsFrame) ProtoMessage() {} func (*ReadResponse_StringPointsFrame) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{4, 7} + return fileDescriptor_715e4bf4cdf1f73d, []int{5, 7} } func (m *ReadResponse_StringPointsFrame) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -978,7 +1002,7 @@ func (m *CapabilitiesResponse) Reset() { *m = CapabilitiesResponse{} } func (m *CapabilitiesResponse) String() string { return proto.CompactTextString(m) } func (*CapabilitiesResponse) ProtoMessage() {} func (*CapabilitiesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{5} + return fileDescriptor_715e4bf4cdf1f73d, []int{6} } func (m *CapabilitiesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1014,7 +1038,7 @@ func (m *HintsResponse) Reset() { *m = HintsResponse{} } func (m *HintsResponse) String() string { return proto.CompactTextString(m) } func (*HintsResponse) ProtoMessage() {} func (*HintsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{6} + return fileDescriptor_715e4bf4cdf1f73d, []int{7} } func (m *HintsResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1055,7 +1079,7 @@ func (m *TimestampRange) Reset() { *m = TimestampRange{} } func (m *TimestampRange) String() string { return proto.CompactTextString(m) } func (*TimestampRange) ProtoMessage() {} func (*TimestampRange) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{7} + return fileDescriptor_715e4bf4cdf1f73d, []int{8} } func (m *TimestampRange) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1095,7 +1119,7 @@ func (m *TagKeysRequest) Reset() { *m = TagKeysRequest{} } func (m *TagKeysRequest) String() string { return proto.CompactTextString(m) } func (*TagKeysRequest) ProtoMessage() {} func (*TagKeysRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{8} + return fileDescriptor_715e4bf4cdf1f73d, []int{9} } func (m *TagKeysRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1136,7 +1160,7 @@ func (m *TagValuesRequest) Reset() { *m = TagValuesRequest{} } func (m *TagValuesRequest) String() string { return proto.CompactTextString(m) } func (*TagValuesRequest) ProtoMessage() {} func (*TagValuesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{9} + return fileDescriptor_715e4bf4cdf1f73d, []int{10} } func (m *TagValuesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1174,7 +1198,7 @@ func (m *StringValuesResponse) Reset() { *m = StringValuesResponse{} } func (m *StringValuesResponse) String() string { return proto.CompactTextString(m) } func (*StringValuesResponse) ProtoMessage() {} func (*StringValuesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_715e4bf4cdf1f73d, []int{10} + return fileDescriptor_715e4bf4cdf1f73d, []int{11} } func (m *StringValuesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1204,14 +1228,14 @@ func (m *StringValuesResponse) XXX_DiscardUnknown() { var xxx_messageInfo_StringValuesResponse proto.InternalMessageInfo func init() { - proto.RegisterEnum("influxdata.platform.storage.ReadRequest_Group", ReadRequest_Group_name, ReadRequest_Group_value) - proto.RegisterEnum("influxdata.platform.storage.ReadRequest_HintFlags", ReadRequest_HintFlags_name, ReadRequest_HintFlags_value) + proto.RegisterEnum("influxdata.platform.storage.ReadGroupRequest_Group", ReadGroupRequest_Group_name, ReadGroupRequest_Group_value) + proto.RegisterEnum("influxdata.platform.storage.ReadGroupRequest_HintFlags", ReadGroupRequest_HintFlags_name, ReadGroupRequest_HintFlags_value) proto.RegisterEnum("influxdata.platform.storage.Aggregate_AggregateType", Aggregate_AggregateType_name, Aggregate_AggregateType_value) proto.RegisterEnum("influxdata.platform.storage.ReadResponse_FrameType", ReadResponse_FrameType_name, ReadResponse_FrameType_value) proto.RegisterEnum("influxdata.platform.storage.ReadResponse_DataType", ReadResponse_DataType_name, ReadResponse_DataType_value) proto.RegisterType((*ReadFilterRequest)(nil), "influxdata.platform.storage.ReadFilterRequest") + proto.RegisterType((*ReadGroupRequest)(nil), "influxdata.platform.storage.ReadGroupRequest") proto.RegisterType((*ReadRequest)(nil), "influxdata.platform.storage.ReadRequest") - proto.RegisterMapType((map[string]string)(nil), "influxdata.platform.storage.ReadRequest.TraceEntry") proto.RegisterType((*Aggregate)(nil), "influxdata.platform.storage.Aggregate") proto.RegisterType((*Tag)(nil), "influxdata.platform.storage.Tag") proto.RegisterType((*ReadResponse)(nil), "influxdata.platform.storage.ReadResponse") @@ -1235,114 +1259,106 @@ func init() { func init() { proto.RegisterFile("storage_common.proto", fileDescriptor_715e4bf4cdf1f73d) } var fileDescriptor_715e4bf4cdf1f73d = []byte{ - // 1701 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x58, 0xcd, 0x6f, 0x23, 0x49, - 0x15, 0x77, 0xc7, 0xdf, 0xcf, 0x1f, 0xe9, 0xd4, 0x9a, 0xe0, 0xed, 0x61, 0xed, 0x5e, 0x0b, 0x2d, - 0x81, 0xdd, 0x75, 0x96, 0xec, 0xae, 0x18, 0x0d, 0x70, 0x88, 0x33, 0x9e, 0x38, 0x4c, 0x62, 0x47, - 0x6d, 0x67, 0xc5, 0x22, 0x21, 0xab, 0x12, 0x57, 0x7a, 0x5b, 0xd3, 0xee, 0x6e, 0xba, 0xcb, 0xab, - 0x58, 0xe2, 0xc2, 0x89, 0x95, 0x4f, 0x70, 0x45, 0xb2, 0x84, 0xc4, 0x91, 0x3b, 0x7f, 0xc3, 0xdc, - 0xd8, 0x23, 0x5c, 0x2c, 0xf0, 0x48, 0x48, 0xfc, 0x05, 0x48, 0x9c, 0x50, 0x55, 0x75, 0xd9, 0xed, - 0xc9, 0x4c, 0xc6, 0xce, 0x09, 0xed, 0xad, 0xea, 0x7d, 0xfc, 0x5e, 0xbd, 0x57, 0xef, 0xa3, 0xba, - 0xa1, 0x14, 0x50, 0xd7, 0xc7, 0x26, 0xe9, 0x5f, 0xb9, 0xc3, 0xa1, 0xeb, 0xd4, 0x3d, 0xdf, 0xa5, - 0x2e, 0x7a, 0x60, 0x39, 0xd7, 0xf6, 0xe8, 0x66, 0x80, 0x29, 0xae, 0x7b, 0x36, 0xa6, 0xd7, 0xae, - 0x3f, 0xac, 0x87, 0x92, 0x5a, 0xc9, 0x74, 0x4d, 0x97, 0xcb, 0xed, 0xb3, 0x95, 0x50, 0xd1, 0x1e, - 0x98, 0xae, 0x6b, 0xda, 0x64, 0x9f, 0xef, 0x2e, 0x47, 0xd7, 0xfb, 0x64, 0xe8, 0xd1, 0x71, 0xc8, - 0x7c, 0xfb, 0x65, 0x26, 0x76, 0x24, 0x6b, 0xdb, 0xf3, 0xc9, 0xc0, 0xba, 0xc2, 0x94, 0x08, 0x42, - 0xed, 0xdf, 0x0a, 0xec, 0x18, 0x04, 0x0f, 0x9e, 0x58, 0x36, 0x25, 0xbe, 0x41, 0x7e, 0x35, 0x22, - 0x01, 0x45, 0x4d, 0xc8, 0xf9, 0x04, 0x0f, 0xfa, 0x81, 0x3b, 0xf2, 0xaf, 0x48, 0x59, 0xd1, 0x95, - 0xbd, 0xdc, 0x41, 0xa9, 0x2e, 0x70, 0xeb, 0x12, 0xb7, 0x7e, 0xe8, 0x8c, 0x1b, 0xc5, 0xf9, 0xac, - 0x0a, 0x0c, 0xa1, 0xcb, 0x65, 0x0d, 0xf0, 0x17, 0x6b, 0x74, 0x0c, 0x49, 0x1f, 0x3b, 0x26, 0x29, - 0x6f, 0x71, 0x80, 0xf7, 0xeb, 0x77, 0x38, 0x5a, 0xef, 0x59, 0x43, 0x12, 0x50, 0x3c, 0xf4, 0x0c, - 0xa6, 0xd2, 0x48, 0x3c, 0x9f, 0x55, 0x63, 0x86, 0xd0, 0x47, 0x8f, 0x21, 0xbb, 0x38, 0x78, 0x39, - 0xce, 0xc1, 0xde, 0xbb, 0x13, 0xec, 0x5c, 0x4a, 0x1b, 0x4b, 0xc5, 0xda, 0x7f, 0x32, 0x90, 0x63, - 0x27, 0x7d, 0x8d, 0x97, 0x85, 0x7b, 0x7a, 0x69, 0xc3, 0x36, 0x95, 0x67, 0xef, 0xdf, 0xdb, 0xdf, - 0x5d, 0xe6, 0xef, 0x7c, 0x56, 0x2d, 0xae, 0xd2, 0x8d, 0x22, 0x5d, 0xd9, 0xa3, 0x0a, 0xc0, 0x80, - 0x04, 0x57, 0xc4, 0x19, 0x58, 0x8e, 0xc9, 0x63, 0x91, 0x31, 0x22, 0x14, 0xf4, 0x01, 0x80, 0xe9, - 0xbb, 0x23, 0xaf, 0xff, 0x8c, 0x8c, 0x83, 0x72, 0x42, 0x8f, 0xef, 0x65, 0x1b, 0x85, 0xf9, 0xac, - 0x9a, 0x3d, 0x66, 0xd4, 0xa7, 0x64, 0x1c, 0x18, 0x59, 0x53, 0x2e, 0xd1, 0x63, 0x48, 0xf2, 0x4d, - 0x39, 0xa7, 0x2b, 0x7b, 0xc5, 0x83, 0xfa, 0x9d, 0x27, 0x8e, 0xc4, 0xae, 0xce, 0xd1, 0x0c, 0xa1, - 0xcc, 0xae, 0x07, 0x9b, 0xa6, 0x4f, 0x4c, 0x76, 0x3d, 0xd9, 0x35, 0xae, 0xe7, 0x50, 0x4a, 0x1b, - 0x4b, 0xc5, 0xd5, 0x4b, 0x4e, 0xde, 0xf3, 0x92, 0xd1, 0x01, 0xe4, 0x03, 0xe2, 0x5b, 0x24, 0xe8, - 0xdb, 0xd6, 0xd0, 0xa2, 0xe5, 0x94, 0xae, 0xec, 0xc5, 0x1b, 0xdb, 0xf3, 0x59, 0x35, 0xd7, 0xe5, - 0xf4, 0x53, 0x46, 0x36, 0x72, 0xc1, 0x72, 0x83, 0x3e, 0x85, 0x42, 0xa8, 0xe3, 0x5e, 0x5f, 0x07, - 0x84, 0x96, 0xd3, 0x5c, 0x49, 0x9d, 0xcf, 0xaa, 0x79, 0xa1, 0xd4, 0xe1, 0x74, 0x23, 0x84, 0x16, - 0x3b, 0x66, 0xca, 0x73, 0x2d, 0x87, 0x4a, 0x53, 0x99, 0xa5, 0xa9, 0x73, 0x4e, 0x0f, 0x4d, 0x79, - 0xcb, 0x0d, 0xea, 0x41, 0x92, 0xfa, 0xf8, 0x8a, 0x94, 0x41, 0x8f, 0xef, 0xe5, 0x0e, 0x3e, 0x5e, - 0x3b, 0xe0, 0x3d, 0xa6, 0xd5, 0x74, 0xa8, 0x3f, 0x6e, 0x64, 0xe7, 0xb3, 0x6a, 0x92, 0xef, 0x0d, - 0x01, 0x86, 0x3e, 0x80, 0xe4, 0x17, 0xcc, 0x46, 0x39, 0xaf, 0x2b, 0x7b, 0xe9, 0xc6, 0x2e, 0x13, - 0x68, 0x31, 0xc2, 0x7f, 0x67, 0xd5, 0x2c, 0x5b, 0x3c, 0xb1, 0xb1, 0x19, 0x18, 0x42, 0x48, 0x7b, - 0x08, 0xb0, 0x44, 0x43, 0x2a, 0xc4, 0x9f, 0x91, 0x31, 0xaf, 0xf1, 0xac, 0xc1, 0x96, 0xa8, 0x04, - 0xc9, 0x2f, 0xb1, 0x3d, 0x12, 0x69, 0x9c, 0x35, 0xc4, 0xe6, 0xd1, 0xd6, 0x43, 0xa5, 0xf6, 0x5b, - 0x05, 0x92, 0xfc, 0xe6, 0xd1, 0x3b, 0x00, 0xc7, 0x46, 0xe7, 0xe2, 0xbc, 0xdf, 0xee, 0xb4, 0x9b, - 0x6a, 0x4c, 0x2b, 0x4c, 0xa6, 0xba, 0x48, 0xb1, 0xb6, 0xeb, 0x10, 0xf4, 0x00, 0xb2, 0x82, 0x7d, - 0x78, 0x7a, 0xaa, 0x2a, 0x5a, 0x7e, 0x32, 0xd5, 0x33, 0x9c, 0x7b, 0x68, 0xdb, 0xe8, 0x6d, 0xc8, - 0x08, 0x66, 0xe3, 0x73, 0x75, 0x4b, 0xcb, 0x4d, 0xa6, 0x7a, 0x9a, 0xf3, 0x1a, 0x63, 0xf4, 0x2e, - 0xe4, 0x05, 0xab, 0xf9, 0xf3, 0xa3, 0xe6, 0x79, 0x4f, 0x8d, 0x6b, 0xdb, 0x93, 0xa9, 0x9e, 0xe3, - 0xec, 0xe6, 0xcd, 0x15, 0xf1, 0xa8, 0x96, 0xf8, 0xea, 0x4f, 0x95, 0x58, 0xed, 0xcf, 0x0a, 0x2c, - 0x1d, 0x63, 0xe6, 0x5a, 0x27, 0xed, 0x9e, 0x3c, 0x0c, 0x37, 0xc7, 0xb8, 0xfc, 0x2c, 0xdf, 0x85, - 0x62, 0xc8, 0xec, 0x9f, 0x77, 0x4e, 0xda, 0xbd, 0xae, 0xaa, 0x68, 0xea, 0x64, 0xaa, 0xe7, 0x85, - 0x84, 0xb8, 0xaa, 0xa8, 0x54, 0xb7, 0x69, 0x9c, 0x34, 0xbb, 0xea, 0x56, 0x54, 0x4a, 0xa4, 0x01, - 0xda, 0x87, 0x12, 0x97, 0xea, 0x1e, 0xb5, 0x9a, 0x67, 0x87, 0xcc, 0xbb, 0x7e, 0xef, 0xe4, 0xac, - 0xa9, 0x26, 0xb4, 0x6f, 0x4d, 0xa6, 0xfa, 0x0e, 0x93, 0xed, 0x5e, 0x7d, 0x41, 0x86, 0xf8, 0xd0, - 0xb6, 0x59, 0x21, 0x87, 0xa7, 0xfd, 0xab, 0x02, 0xd9, 0x45, 0xce, 0xa3, 0x16, 0x24, 0xe8, 0xd8, - 0x13, 0x6d, 0xb5, 0x78, 0xf0, 0xc9, 0x7a, 0x95, 0xb2, 0x5c, 0xf5, 0xc6, 0x1e, 0x31, 0x38, 0x42, - 0xed, 0x06, 0x0a, 0x2b, 0x64, 0x54, 0x85, 0x44, 0x18, 0x03, 0x7e, 0x9e, 0x15, 0x26, 0x0f, 0xc6, - 0x3b, 0x10, 0xef, 0x5e, 0x9c, 0xa9, 0x8a, 0x56, 0x9a, 0x4c, 0x75, 0x75, 0x85, 0xdf, 0x1d, 0x0d, - 0xd1, 0xbb, 0x90, 0x3c, 0xea, 0x5c, 0xb4, 0x7b, 0xea, 0x96, 0xb6, 0x3b, 0x99, 0xea, 0x68, 0x45, - 0xe0, 0xc8, 0x1d, 0x39, 0x32, 0xfe, 0x1f, 0x42, 0xbc, 0x87, 0xcd, 0x68, 0xf2, 0xe4, 0x5f, 0x91, - 0x3c, 0xf9, 0x30, 0x79, 0x6a, 0xbf, 0x2f, 0x42, 0x5e, 0x64, 0x73, 0xe0, 0xb9, 0x4e, 0x40, 0xd0, - 0x19, 0xa4, 0xae, 0x7d, 0x3c, 0x24, 0x41, 0x59, 0xe1, 0x85, 0xb0, 0xbf, 0x46, 0x21, 0x08, 0xd5, - 0xfa, 0x13, 0xa6, 0x17, 0xce, 0x87, 0x10, 0x44, 0xfb, 0x2a, 0x05, 0x49, 0x4e, 0x47, 0xa7, 0xb2, - 0xa3, 0xa5, 0x79, 0x07, 0xf9, 0x64, 0x7d, 0x5c, 0x9e, 0x64, 0x1c, 0xa4, 0x15, 0x93, 0x9d, 0xad, - 0x03, 0x29, 0x51, 0xf2, 0xe1, 0x0c, 0xfc, 0x74, 0x7d, 0x38, 0x91, 0x31, 0x12, 0x2f, 0x84, 0x41, - 0x1e, 0xe4, 0xaf, 0x6d, 0x17, 0xd3, 0xbe, 0x68, 0x0a, 0xe1, 0xa4, 0x78, 0xb4, 0x81, 0xf7, 0x4c, - 0x5b, 0xe4, 0xac, 0x08, 0x04, 0xef, 0x37, 0x11, 0x6a, 0x2b, 0x66, 0xe4, 0xae, 0x97, 0x5b, 0x74, - 0x03, 0x45, 0xcb, 0xa1, 0xc4, 0x24, 0xbe, 0xb4, 0x29, 0x06, 0xe8, 0x4f, 0xd6, 0xb7, 0x79, 0x22, - 0xf4, 0xa3, 0x56, 0x77, 0xe6, 0xb3, 0x6a, 0x61, 0x85, 0xde, 0x8a, 0x19, 0x05, 0x2b, 0x4a, 0x40, - 0xbf, 0x86, 0xed, 0x91, 0x13, 0x58, 0xa6, 0x43, 0x06, 0xd2, 0x74, 0x82, 0x9b, 0xfe, 0xe9, 0xfa, - 0xa6, 0x2f, 0x42, 0x80, 0xa8, 0x6d, 0xc4, 0xc6, 0xe4, 0x2a, 0xa3, 0x15, 0x33, 0x8a, 0xa3, 0x15, - 0x0a, 0xf3, 0xfb, 0xd2, 0x75, 0x6d, 0x82, 0x1d, 0x69, 0x3c, 0xb9, 0xa9, 0xdf, 0x0d, 0xa1, 0x7f, - 0xcb, 0xef, 0x15, 0x3a, 0xf3, 0xfb, 0x32, 0x4a, 0x40, 0x14, 0x0a, 0x01, 0xf5, 0x2d, 0xc7, 0x94, - 0x86, 0x53, 0xdc, 0xf0, 0x8f, 0x37, 0xc8, 0x1d, 0xae, 0x1e, 0xb5, 0x2b, 0x66, 0x51, 0x84, 0xdc, - 0x8a, 0x19, 0xf9, 0x20, 0xb2, 0x6f, 0xa4, 0x20, 0xc1, 0x90, 0xb5, 0x1b, 0x80, 0x65, 0x26, 0xa3, - 0xf7, 0x20, 0x43, 0xb1, 0x29, 0x1e, 0x03, 0xac, 0xd2, 0xf2, 0x8d, 0xdc, 0x7c, 0x56, 0x4d, 0xf7, - 0xb0, 0xc9, 0x9f, 0x02, 0x69, 0x2a, 0x16, 0xa8, 0x01, 0xc8, 0xc3, 0x3e, 0xb5, 0xa8, 0xe5, 0x3a, - 0x4c, 0xba, 0xff, 0x25, 0xb6, 0x59, 0x76, 0x32, 0x8d, 0xd2, 0x7c, 0x56, 0x55, 0xcf, 0x25, 0xf7, - 0x29, 0x19, 0x7f, 0x86, 0xed, 0xc0, 0x50, 0xbd, 0x97, 0x28, 0xda, 0x1f, 0x14, 0xc8, 0x45, 0xb2, - 0x1e, 0x3d, 0x82, 0x04, 0xc5, 0xa6, 0xac, 0x70, 0xfd, 0xee, 0xd7, 0x10, 0x36, 0xc3, 0x92, 0xe6, - 0x3a, 0xa8, 0x03, 0x59, 0x26, 0xd8, 0xe7, 0x8d, 0x72, 0x8b, 0x37, 0xca, 0x83, 0xf5, 0xe3, 0xf7, - 0x18, 0x53, 0xcc, 0xdb, 0x64, 0x66, 0x10, 0xae, 0xb4, 0x9f, 0x81, 0xfa, 0x72, 0xe9, 0xb0, 0xb7, - 0xd4, 0xe2, 0x75, 0x25, 0x8e, 0xa9, 0x1a, 0x11, 0x0a, 0xda, 0x85, 0x14, 0x6f, 0x5f, 0x22, 0x10, - 0x8a, 0x11, 0xee, 0xb4, 0x53, 0x40, 0xb7, 0x4b, 0x62, 0x43, 0xb4, 0xf8, 0x02, 0xed, 0x0c, 0xde, - 0x7a, 0x45, 0x96, 0x6f, 0x08, 0x97, 0x88, 0x1e, 0xee, 0x76, 0xde, 0x6e, 0x88, 0x96, 0x59, 0xa0, - 0x3d, 0x85, 0x9d, 0x5b, 0xc9, 0xb8, 0x21, 0x58, 0x56, 0x82, 0xd5, 0xba, 0x90, 0xe5, 0x00, 0xe1, - 0xa8, 0x4a, 0x85, 0x83, 0x36, 0xa6, 0xbd, 0x35, 0x99, 0xea, 0xdb, 0x0b, 0x56, 0x38, 0x6b, 0xab, - 0x90, 0x5a, 0xcc, 0xeb, 0x55, 0x01, 0x71, 0x96, 0x70, 0x12, 0xfd, 0x45, 0x81, 0x8c, 0xbc, 0x6f, - 0xf4, 0x1d, 0x48, 0x3e, 0x39, 0xed, 0x1c, 0xf6, 0xd4, 0x98, 0xb6, 0x33, 0x99, 0xea, 0x05, 0xc9, - 0xe0, 0x57, 0x8f, 0x74, 0x48, 0x9f, 0xb4, 0x7b, 0xcd, 0xe3, 0xa6, 0x21, 0x21, 0x25, 0x3f, 0xbc, - 0x4e, 0x54, 0x83, 0xcc, 0x45, 0xbb, 0x7b, 0x72, 0xdc, 0x6e, 0x3e, 0x56, 0xb7, 0xc4, 0x8c, 0x94, - 0x22, 0xf2, 0x8e, 0x18, 0x4a, 0xa3, 0xd3, 0x39, 0x6d, 0x1e, 0xb6, 0xd5, 0xf8, 0x2a, 0x4a, 0x18, - 0x77, 0x54, 0x81, 0x54, 0xb7, 0x67, 0x9c, 0xb4, 0x8f, 0xd5, 0x84, 0x86, 0x26, 0x53, 0xbd, 0x28, - 0x05, 0x44, 0x28, 0xc3, 0x83, 0xff, 0x51, 0x81, 0xd2, 0x11, 0xf6, 0xf0, 0xa5, 0x65, 0x5b, 0xd4, - 0x22, 0xc1, 0x62, 0x36, 0x76, 0x20, 0x71, 0x85, 0x3d, 0x59, 0x37, 0x77, 0xb7, 0x8d, 0x57, 0x01, - 0x30, 0x62, 0xc0, 0x1f, 0x77, 0x06, 0x07, 0xd2, 0x7e, 0x04, 0xd9, 0x05, 0x69, 0xa3, 0xf7, 0xde, - 0x36, 0x14, 0xf8, 0x33, 0x52, 0x22, 0xd7, 0x1e, 0xc2, 0x4b, 0xdf, 0x27, 0x4c, 0x39, 0xa0, 0xd8, - 0xa7, 0x1c, 0x30, 0x6e, 0x88, 0x0d, 0x33, 0x42, 0x9c, 0x01, 0x07, 0x8c, 0x1b, 0x6c, 0x59, 0xfb, - 0x97, 0x02, 0x45, 0xd9, 0x75, 0x96, 0xdf, 0x5f, 0xac, 0xd6, 0xd7, 0xfe, 0xca, 0xec, 0x61, 0x33, - 0x90, 0xdf, 0x5f, 0x74, 0xb1, 0xfe, 0x7f, 0xfb, 0xca, 0xfc, 0xcd, 0x16, 0xa8, 0x3d, 0x6c, 0x7e, - 0xc6, 0x53, 0xfe, 0x1b, 0xed, 0x2a, 0xfa, 0x36, 0xa4, 0xc3, 0xe1, 0xc2, 0x07, 0x7b, 0xd6, 0x48, - 0x89, 0x71, 0x52, 0xab, 0x43, 0x49, 0xa4, 0xba, 0x8c, 0x42, 0x98, 0xd9, 0xcb, 0xc6, 0xc0, 0x67, - 0x91, 0x6c, 0x0c, 0x07, 0x7f, 0x4f, 0x40, 0xba, 0x2b, 0x2c, 0xa1, 0x5f, 0x42, 0x82, 0xf5, 0x72, - 0xb4, 0xb7, 0xee, 0xa7, 0x91, 0xf6, 0xfd, 0xb5, 0x07, 0xc3, 0x47, 0x0a, 0xb2, 0x00, 0x96, 0xff, - 0x3b, 0xd0, 0x9b, 0x3f, 0x78, 0x57, 0x7e, 0x8c, 0x6c, 0x66, 0xea, 0x73, 0xc8, 0x47, 0xcb, 0x13, - 0xed, 0xde, 0xba, 0xef, 0xe6, 0xd0, 0xa3, 0x63, 0xed, 0x87, 0x1b, 0x57, 0x38, 0x7a, 0x0a, 0xe2, - 0xfb, 0xee, 0xb5, 0x98, 0x3f, 0xb8, 0x13, 0x73, 0xa5, 0xa8, 0xd1, 0x33, 0x90, 0xef, 0x01, 0xf4, - 0xfe, 0x9b, 0x86, 0x74, 0xa4, 0x7e, 0xdf, 0x70, 0xee, 0x57, 0x25, 0xc0, 0x47, 0x0a, 0x72, 0x21, - 0xbb, 0xa8, 0x0e, 0xf4, 0xe1, 0x9b, 0xcc, 0xad, 0x54, 0xd1, 0xbd, 0x0c, 0x36, 0xbe, 0xf7, 0xfc, - 0x9f, 0x95, 0xd8, 0xf3, 0x79, 0x45, 0xf9, 0x7a, 0x5e, 0x51, 0xfe, 0x31, 0xaf, 0x28, 0xbf, 0x7b, - 0x51, 0x89, 0x7d, 0xfd, 0xa2, 0x12, 0xfb, 0xdb, 0x8b, 0x4a, 0xec, 0x17, 0xfc, 0x95, 0xc1, 0x1e, - 0x19, 0xc1, 0x65, 0x8a, 0x87, 0xf0, 0xe3, 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0x3f, 0x26, 0xfa, - 0x7c, 0xa5, 0x13, 0x00, 0x00, + // 1578 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x57, 0xcd, 0x6f, 0x23, 0x49, + 0x15, 0x77, 0xfb, 0x33, 0xfd, 0xfc, 0x91, 0x4e, 0xad, 0x09, 0xd9, 0x1e, 0xd6, 0xee, 0xb5, 0xd0, + 0x12, 0xd8, 0x1d, 0x67, 0xc8, 0x0c, 0x62, 0x34, 0xc0, 0xc1, 0xce, 0x38, 0xb1, 0x49, 0x62, 0x47, + 0x6d, 0x67, 0xc4, 0x20, 0x21, 0xab, 0x92, 0x54, 0x7a, 0x5a, 0x63, 0x77, 0x37, 0xdd, 0xed, 0x51, + 0x2c, 0x71, 0xe1, 0xc4, 0xc8, 0x12, 0x12, 0x5c, 0x91, 0x2c, 0x21, 0x71, 0xe4, 0xce, 0x1f, 0xc0, + 0x69, 0x6e, 0xcc, 0x91, 0x93, 0x05, 0x1e, 0x09, 0x89, 0x7f, 0x81, 0x13, 0xaa, 0xaa, 0x2e, 0xbb, + 0x9d, 0x84, 0xc4, 0xe6, 0xb4, 0x9a, 0x5b, 0xd5, 0xfb, 0xf8, 0xbd, 0xf7, 0xaa, 0xde, 0x47, 0x15, + 0xe4, 0x3d, 0xdf, 0x76, 0xb1, 0x41, 0xba, 0xe7, 0x76, 0xbf, 0x6f, 0x5b, 0x65, 0xc7, 0xb5, 0x7d, + 0x1b, 0x3d, 0x30, 0xad, 0xcb, 0xde, 0xe0, 0xea, 0x02, 0xfb, 0xb8, 0xec, 0xf4, 0xb0, 0x7f, 0x69, + 0xbb, 0xfd, 0x72, 0x20, 0xa9, 0xe6, 0x0d, 0xdb, 0xb0, 0x99, 0xdc, 0x0e, 0x5d, 0x71, 0x15, 0xf5, + 0x81, 0x61, 0xdb, 0x46, 0x8f, 0xec, 0xb0, 0xdd, 0xd9, 0xe0, 0x72, 0x87, 0xf4, 0x1d, 0x7f, 0x18, + 0x30, 0x3f, 0xbd, 0xce, 0xc4, 0x96, 0x60, 0xad, 0x3b, 0x2e, 0xb9, 0x30, 0xcf, 0xb1, 0x4f, 0x38, + 0xa1, 0xf4, 0x6f, 0x09, 0x36, 0x74, 0x82, 0x2f, 0xf6, 0xcd, 0x9e, 0x4f, 0x5c, 0x9d, 0xfc, 0x72, + 0x40, 0x3c, 0x1f, 0xd5, 0x20, 0xed, 0x12, 0x7c, 0xd1, 0xf5, 0xec, 0x81, 0x7b, 0x4e, 0xb6, 0x24, + 0x4d, 0xda, 0x4e, 0xef, 0xe6, 0xcb, 0x1c, 0xb7, 0x2c, 0x70, 0xcb, 0x15, 0x6b, 0x58, 0xcd, 0x4d, + 0x27, 0x45, 0xa0, 0x08, 0x6d, 0x26, 0xab, 0x83, 0x3b, 0x5b, 0xa3, 0x03, 0x48, 0xb8, 0xd8, 0x32, + 0xc8, 0x56, 0x94, 0x01, 0x7c, 0x59, 0xbe, 0x23, 0xd0, 0x72, 0xc7, 0xec, 0x13, 0xcf, 0xc7, 0x7d, + 0x47, 0xa7, 0x2a, 0xd5, 0xf8, 0xbb, 0x49, 0x31, 0xa2, 0x73, 0x7d, 0xf4, 0x1c, 0xe4, 0x99, 0xe3, + 0x5b, 0x31, 0x06, 0xf6, 0xc5, 0x9d, 0x60, 0x27, 0x42, 0x5a, 0x9f, 0x2b, 0x96, 0x7e, 0x9b, 0x04, + 0x85, 0x7a, 0x7a, 0xe0, 0xda, 0x03, 0xe7, 0xa3, 0x0e, 0x15, 0x7d, 0x05, 0x60, 0xd0, 0x28, 0xbb, + 0xaf, 0xc9, 0xd0, 0xdb, 0x8a, 0x6b, 0xb1, 0x6d, 0xb9, 0x9a, 0x9d, 0x4e, 0x8a, 0x32, 0x8b, 0xfd, + 0x90, 0x0c, 0x3d, 0x5d, 0x36, 0xc4, 0x12, 0x35, 0x20, 0xc1, 0x36, 0x5b, 0x09, 0x4d, 0xda, 0xce, + 0xed, 0x3e, 0xbe, 0xd3, 0xde, 0xf5, 0x13, 0x2c, 0xf3, 0x0d, 0x47, 0xa0, 0xee, 0x63, 0xc3, 0x70, + 0x89, 0x41, 0xdd, 0x4f, 0x2e, 0xe1, 0x7e, 0x45, 0x48, 0xeb, 0x73, 0x45, 0xf4, 0x15, 0x24, 0x5e, + 0x99, 0x96, 0xef, 0x6d, 0xa5, 0x34, 0x69, 0x3b, 0x55, 0xdd, 0x9c, 0x4e, 0x8a, 0x89, 0x3a, 0x25, + 0xfc, 0x67, 0x52, 0x94, 0xe9, 0x62, 0xbf, 0x87, 0x0d, 0x4f, 0xe7, 0x42, 0xa5, 0xdf, 0x48, 0x90, + 0x60, 0x4e, 0xa0, 0xcf, 0x00, 0x0e, 0xf4, 0xd6, 0xe9, 0x49, 0xb7, 0xd9, 0x6a, 0xd6, 0x94, 0x88, + 0x9a, 0x1d, 0x8d, 0x35, 0x1e, 0x72, 0xd3, 0xb6, 0x08, 0x7a, 0x00, 0x32, 0x67, 0x57, 0x8e, 0x8e, + 0x14, 0x49, 0xcd, 0x8c, 0xc6, 0xda, 0x1a, 0xe3, 0x56, 0x7a, 0x3d, 0xf4, 0x29, 0xac, 0x71, 0x66, + 0xf5, 0xa5, 0x12, 0x55, 0xd3, 0xa3, 0xb1, 0x96, 0x62, 0xbc, 0xea, 0x10, 0x7d, 0x0e, 0x19, 0xce, + 0xaa, 0xfd, 0x6c, 0xaf, 0x76, 0xd2, 0x51, 0x62, 0xea, 0xfa, 0x68, 0xac, 0xa5, 0x19, 0xbb, 0x76, + 0x75, 0x4e, 0x1c, 0x5f, 0x8d, 0xbf, 0xfd, 0x53, 0x21, 0x52, 0xfa, 0xb3, 0x04, 0x73, 0xf7, 0xa8, + 0xb9, 0x7a, 0xa3, 0xd9, 0x11, 0xce, 0x30, 0x73, 0x94, 0xcb, 0x7c, 0xf9, 0x36, 0xe4, 0x02, 0x66, + 0xf7, 0xa4, 0xd5, 0x68, 0x76, 0xda, 0x8a, 0xa4, 0x2a, 0xa3, 0xb1, 0x96, 0xe1, 0x12, 0x27, 0x36, + 0x0d, 0x2d, 0x2c, 0xd5, 0xae, 0xe9, 0x8d, 0x5a, 0x5b, 0x89, 0x86, 0xa5, 0xda, 0xc4, 0x35, 0x89, + 0x87, 0x76, 0x20, 0xcf, 0xa4, 0xda, 0x7b, 0xf5, 0xda, 0x71, 0x85, 0x46, 0xd7, 0xed, 0x34, 0x8e, + 0x6b, 0x4a, 0x5c, 0xfd, 0xc6, 0x68, 0xac, 0x6d, 0x50, 0xd9, 0xf6, 0xf9, 0x2b, 0xd2, 0xc7, 0x95, + 0x5e, 0x8f, 0xe6, 0x5e, 0xe0, 0x6d, 0x16, 0xd2, 0xf4, 0x32, 0x83, 0x7b, 0x2c, 0xfd, 0x4d, 0x02, + 0x79, 0x76, 0x1b, 0xa8, 0x0e, 0x71, 0x7f, 0xe8, 0xf0, 0x82, 0xc8, 0xed, 0x3e, 0x59, 0xee, 0x0e, + 0xe7, 0xab, 0xce, 0xd0, 0x21, 0x3a, 0x43, 0x28, 0x5d, 0x41, 0x76, 0x81, 0x8c, 0x8a, 0x10, 0x0f, + 0x8e, 0x84, 0xb9, 0xb7, 0xc0, 0x64, 0x67, 0xf3, 0x19, 0xc4, 0xda, 0xa7, 0xc7, 0x8a, 0xa4, 0xe6, + 0x47, 0x63, 0x4d, 0x59, 0xe0, 0xb7, 0x07, 0x7d, 0xf4, 0x39, 0x24, 0xf6, 0x5a, 0xa7, 0xcd, 0x8e, + 0x12, 0x55, 0x37, 0x47, 0x63, 0x0d, 0x2d, 0x08, 0xec, 0xd9, 0x03, 0x4b, 0x5c, 0xc7, 0x43, 0x88, + 0x75, 0xb0, 0x81, 0x14, 0x88, 0xbd, 0x26, 0x43, 0x16, 0x49, 0x46, 0xa7, 0x4b, 0x94, 0x87, 0xc4, + 0x1b, 0xdc, 0x1b, 0xf0, 0x6a, 0xcd, 0xe8, 0x7c, 0x53, 0xfa, 0x7d, 0x0e, 0x32, 0xfc, 0x40, 0x3c, + 0xc7, 0xb6, 0x3c, 0x82, 0x8e, 0x21, 0x79, 0xe9, 0xe2, 0x3e, 0xf1, 0xb6, 0x24, 0x2d, 0xb6, 0x9d, + 0xde, 0xdd, 0xb9, 0xb7, 0x30, 0x84, 0x6a, 0x79, 0x9f, 0xea, 0x05, 0x95, 0x1d, 0x80, 0xa8, 0x6f, + 0x93, 0x90, 0x60, 0x74, 0x74, 0x24, 0x0a, 0x2e, 0xc5, 0x2a, 0xe4, 0xc9, 0xf2, 0xb8, 0x2c, 0xe7, + 0x18, 0x48, 0x3d, 0x22, 0x6a, 0xae, 0x05, 0x49, 0x8f, 0x25, 0x42, 0xd0, 0xbd, 0x7e, 0xb0, 0x3c, + 0x1c, 0x4f, 0x20, 0x81, 0x17, 0xc0, 0x20, 0x07, 0x32, 0x97, 0x3d, 0x1b, 0xfb, 0x5d, 0x87, 0x65, + 0x61, 0xd0, 0xd3, 0x9e, 0xad, 0x10, 0x3d, 0xd5, 0xe6, 0x29, 0xcc, 0x0f, 0x62, 0x7d, 0x3a, 0x29, + 0xa6, 0x43, 0xd4, 0x7a, 0x44, 0x4f, 0x5f, 0xce, 0xb7, 0xe8, 0x0a, 0x72, 0xa6, 0xe5, 0x13, 0x83, + 0xb8, 0xc2, 0x26, 0x6f, 0x7d, 0x3f, 0x5e, 0xde, 0x66, 0x83, 0xeb, 0x87, 0xad, 0x6e, 0x4c, 0x27, + 0xc5, 0xec, 0x02, 0xbd, 0x1e, 0xd1, 0xb3, 0x66, 0x98, 0x80, 0x7e, 0x05, 0xeb, 0x03, 0xcb, 0x33, + 0x0d, 0x8b, 0x5c, 0x08, 0xd3, 0x71, 0x66, 0xfa, 0x27, 0xcb, 0x9b, 0x3e, 0x0d, 0x00, 0xc2, 0xb6, + 0xd1, 0x74, 0x52, 0xcc, 0x2d, 0x32, 0xea, 0x11, 0x3d, 0x37, 0x58, 0xa0, 0xd0, 0xb8, 0xcf, 0x6c, + 0xbb, 0x47, 0xb0, 0x25, 0x8c, 0x27, 0x56, 0x8d, 0xbb, 0xca, 0xf5, 0x6f, 0xc4, 0xbd, 0x40, 0xa7, + 0x71, 0x9f, 0x85, 0x09, 0xc8, 0x87, 0xac, 0xe7, 0xbb, 0xa6, 0x65, 0x08, 0xc3, 0xbc, 0x59, 0xff, + 0x68, 0x85, 0xdc, 0x61, 0xea, 0x61, 0xbb, 0xca, 0x74, 0x52, 0xcc, 0x84, 0xc9, 0xf5, 0x88, 0x9e, + 0xf1, 0x42, 0xfb, 0x6a, 0x12, 0xe2, 0x14, 0x59, 0xbd, 0x02, 0x98, 0x67, 0x32, 0xfa, 0x02, 0xd6, + 0x7c, 0x6c, 0xf0, 0x59, 0x45, 0x2b, 0x2d, 0x53, 0x4d, 0x4f, 0x27, 0xc5, 0x54, 0x07, 0x1b, 0x6c, + 0x52, 0xa5, 0x7c, 0xbe, 0x40, 0x55, 0x40, 0x0e, 0x76, 0x7d, 0xd3, 0x37, 0x6d, 0x8b, 0x4a, 0x77, + 0xdf, 0xe0, 0x1e, 0xcd, 0x4e, 0xaa, 0x91, 0x9f, 0x4e, 0x8a, 0xca, 0x89, 0xe0, 0x1e, 0x92, 0xe1, + 0x0b, 0xdc, 0xf3, 0x74, 0xc5, 0xb9, 0x46, 0x51, 0xff, 0x20, 0x41, 0x3a, 0x94, 0xf5, 0xe8, 0x19, + 0xc4, 0x7d, 0x6c, 0x88, 0x0a, 0xd7, 0xee, 0x9e, 0xdb, 0xd8, 0x08, 0x4a, 0x9a, 0xe9, 0xa0, 0x16, + 0xc8, 0x54, 0xb0, 0xcb, 0x1a, 0x65, 0x94, 0x35, 0xca, 0xdd, 0xe5, 0xcf, 0xef, 0x39, 0xf6, 0x31, + 0x6b, 0x93, 0x6b, 0x17, 0xc1, 0x4a, 0xfd, 0x29, 0x28, 0xd7, 0x4b, 0x07, 0x15, 0x00, 0x7c, 0xf1, + 0x5e, 0xe0, 0x6e, 0x2a, 0x7a, 0x88, 0x82, 0x36, 0x21, 0xc9, 0xda, 0x17, 0x3f, 0x08, 0x49, 0x0f, + 0x76, 0xea, 0x11, 0xa0, 0x9b, 0x25, 0xb1, 0x22, 0x5a, 0x6c, 0x86, 0x76, 0x0c, 0x9f, 0xdc, 0x92, + 0xe5, 0x2b, 0xc2, 0xc5, 0xc3, 0xce, 0xdd, 0xcc, 0xdb, 0x15, 0xd1, 0xd6, 0x66, 0x68, 0x87, 0xb0, + 0x71, 0x23, 0x19, 0x57, 0x04, 0x93, 0x05, 0x58, 0xa9, 0x0d, 0x32, 0x03, 0x08, 0x46, 0x55, 0x32, + 0x98, 0xbb, 0x11, 0xf5, 0x93, 0xd1, 0x58, 0x5b, 0x9f, 0xb1, 0x82, 0xd1, 0x5b, 0x84, 0xe4, 0x6c, + 0x7c, 0x2f, 0x0a, 0x70, 0x5f, 0x82, 0x49, 0xf4, 0x17, 0x09, 0xd6, 0xc4, 0x7d, 0xa3, 0x6f, 0x41, + 0x62, 0xff, 0xa8, 0x55, 0xe9, 0x28, 0x11, 0x75, 0x63, 0x34, 0xd6, 0xb2, 0x82, 0xc1, 0xae, 0x1e, + 0x69, 0x90, 0x6a, 0x34, 0x3b, 0xb5, 0x83, 0x9a, 0x2e, 0x20, 0x05, 0x3f, 0xb8, 0x4e, 0x54, 0x82, + 0xb5, 0xd3, 0x66, 0xbb, 0x71, 0xd0, 0xac, 0x3d, 0x57, 0xa2, 0x7c, 0x46, 0x0a, 0x11, 0x71, 0x47, + 0x14, 0xa5, 0xda, 0x6a, 0x1d, 0xd5, 0x2a, 0x4d, 0x25, 0xb6, 0x88, 0x12, 0x9c, 0x3b, 0x2a, 0x40, + 0xb2, 0xdd, 0xd1, 0x1b, 0xcd, 0x03, 0x25, 0xae, 0xa2, 0xd1, 0x58, 0xcb, 0x09, 0x01, 0x7e, 0x94, + 0x81, 0xe3, 0x7f, 0x94, 0x20, 0xbf, 0x87, 0x1d, 0x7c, 0x66, 0xf6, 0x4c, 0xdf, 0x24, 0xde, 0x6c, + 0x36, 0xb6, 0x20, 0x7e, 0x8e, 0x1d, 0x51, 0x37, 0x77, 0xb7, 0x8d, 0xdb, 0x00, 0x28, 0xd1, 0xab, + 0x59, 0xbe, 0x3b, 0xd4, 0x19, 0x90, 0xfa, 0x43, 0x90, 0x67, 0xa4, 0xf0, 0xc8, 0x96, 0x6f, 0x19, + 0xd9, 0x72, 0x30, 0xb2, 0x9f, 0x45, 0x9f, 0x4a, 0xa5, 0x75, 0xc8, 0xb2, 0xb7, 0xa1, 0x40, 0x2e, + 0x3d, 0x85, 0xdc, 0xe2, 0x0b, 0x9b, 0x2a, 0x7b, 0x3e, 0x76, 0x7d, 0x06, 0x18, 0xd3, 0xf9, 0x86, + 0x1a, 0x21, 0xd6, 0x05, 0x03, 0x8c, 0xe9, 0x74, 0x59, 0xfa, 0x97, 0x04, 0x39, 0xd1, 0x75, 0xe6, + 0xff, 0x03, 0x5a, 0xeb, 0x4b, 0xff, 0x0f, 0x3a, 0xd8, 0xf0, 0xc4, 0xff, 0xc0, 0x9f, 0xad, 0xbf, + 0x6e, 0x5f, 0xa1, 0x5f, 0x47, 0x41, 0xe9, 0x60, 0xe3, 0x05, 0x4b, 0xf9, 0x8f, 0x3a, 0x54, 0xf4, + 0x4d, 0x48, 0x05, 0xc3, 0x85, 0x0d, 0x76, 0x59, 0x4f, 0xf2, 0x71, 0x52, 0x2a, 0x43, 0x9e, 0xa7, + 0xba, 0x38, 0x85, 0x20, 0xb3, 0xe7, 0x8d, 0x81, 0xcd, 0x22, 0xd1, 0x18, 0x76, 0xff, 0x9a, 0x80, + 0x54, 0x9b, 0x5b, 0x42, 0xbf, 0x80, 0x38, 0xed, 0xe5, 0x68, 0x7b, 0x89, 0x76, 0xcf, 0x0e, 0x57, + 0xfd, 0xee, 0xd2, 0x83, 0xe1, 0x91, 0x84, 0x4c, 0x80, 0xf9, 0xa7, 0x1c, 0x95, 0xef, 0x55, 0x5d, + 0xf8, 0xbd, 0xaf, 0x66, 0xca, 0x00, 0x79, 0xf6, 0xa3, 0x43, 0x0f, 0x57, 0xfa, 0xf9, 0xad, 0x66, + 0xe8, 0x25, 0x64, 0xc2, 0x7d, 0x00, 0x6d, 0xde, 0x48, 0xac, 0x5a, 0xdf, 0xf1, 0x87, 0xea, 0xf7, + 0x57, 0x6e, 0x25, 0xe8, 0x10, 0xf8, 0xef, 0xf0, 0x7f, 0x62, 0x7e, 0xef, 0x4e, 0xcc, 0x85, 0xee, + 0x81, 0x5e, 0x83, 0x78, 0x78, 0xa0, 0x2f, 0xef, 0x7b, 0x0d, 0x84, 0x1a, 0xc5, 0x3d, 0x7e, 0xdf, + 0x96, 0x69, 0x8f, 0x24, 0x64, 0x83, 0x3c, 0x2b, 0xc3, 0x7b, 0x4e, 0xff, 0x7a, 0xb9, 0xfe, 0x5f, + 0x06, 0xab, 0xdf, 0x79, 0xf7, 0xcf, 0x42, 0xe4, 0xdd, 0xb4, 0x20, 0xbd, 0x9f, 0x16, 0xa4, 0x7f, + 0x4c, 0x0b, 0xd2, 0xef, 0x3e, 0x14, 0x22, 0xef, 0x3f, 0x14, 0x22, 0x7f, 0xff, 0x50, 0x88, 0xfc, + 0x9c, 0x3d, 0x67, 0xe8, 0x6b, 0xc6, 0x3b, 0x4b, 0xb2, 0x23, 0x7c, 0xfc, 0xdf, 0x00, 0x00, 0x00, + 0xff, 0xff, 0x03, 0xca, 0xf6, 0x4a, 0xb3, 0x12, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1357,10 +1373,12 @@ const _ = grpc.SupportPackageIsVersion4 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type StorageClient interface { - // Read performs a read operation using the given ReadRequest + // TODO(jlapacik): Remove this unsupported call Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (Storage_ReadClient, error) // ReadFilter performs a filter operation at storage ReadFilter(ctx context.Context, in *ReadFilterRequest, opts ...grpc.CallOption) (Storage_ReadFilterClient, error) + // ReadGroup performs a group operation at storage + ReadGroup(ctx context.Context, in *ReadGroupRequest, opts ...grpc.CallOption) (Storage_ReadGroupClient, error) // Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine Capabilities(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*CapabilitiesResponse, error) Hints(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*HintsResponse, error) @@ -1442,6 +1460,38 @@ func (x *storageReadFilterClient) Recv() (*ReadResponse, error) { return m, nil } +func (c *storageClient) ReadGroup(ctx context.Context, in *ReadGroupRequest, opts ...grpc.CallOption) (Storage_ReadGroupClient, error) { + stream, err := c.cc.NewStream(ctx, &_Storage_serviceDesc.Streams[2], "/influxdata.platform.storage.Storage/ReadGroup", opts...) + if err != nil { + return nil, err + } + x := &storageReadGroupClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Storage_ReadGroupClient interface { + Recv() (*ReadResponse, error) + grpc.ClientStream +} + +type storageReadGroupClient struct { + grpc.ClientStream +} + +func (x *storageReadGroupClient) Recv() (*ReadResponse, error) { + m := new(ReadResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *storageClient) Capabilities(ctx context.Context, in *types.Empty, opts ...grpc.CallOption) (*CapabilitiesResponse, error) { out := new(CapabilitiesResponse) err := c.cc.Invoke(ctx, "/influxdata.platform.storage.Storage/Capabilities", in, out, opts...) @@ -1461,7 +1511,7 @@ func (c *storageClient) Hints(ctx context.Context, in *types.Empty, opts ...grpc } func (c *storageClient) TagKeys(ctx context.Context, in *TagKeysRequest, opts ...grpc.CallOption) (Storage_TagKeysClient, error) { - stream, err := c.cc.NewStream(ctx, &_Storage_serviceDesc.Streams[2], "/influxdata.platform.storage.Storage/TagKeys", opts...) + stream, err := c.cc.NewStream(ctx, &_Storage_serviceDesc.Streams[3], "/influxdata.platform.storage.Storage/TagKeys", opts...) if err != nil { return nil, err } @@ -1493,7 +1543,7 @@ func (x *storageTagKeysClient) Recv() (*StringValuesResponse, error) { } func (c *storageClient) TagValues(ctx context.Context, in *TagValuesRequest, opts ...grpc.CallOption) (Storage_TagValuesClient, error) { - stream, err := c.cc.NewStream(ctx, &_Storage_serviceDesc.Streams[3], "/influxdata.platform.storage.Storage/TagValues", opts...) + stream, err := c.cc.NewStream(ctx, &_Storage_serviceDesc.Streams[4], "/influxdata.platform.storage.Storage/TagValues", opts...) if err != nil { return nil, err } @@ -1526,10 +1576,12 @@ func (x *storageTagValuesClient) Recv() (*StringValuesResponse, error) { // StorageServer is the server API for Storage service. type StorageServer interface { - // Read performs a read operation using the given ReadRequest + // TODO(jlapacik): Remove this unsupported call Read(*ReadRequest, Storage_ReadServer) error // ReadFilter performs a filter operation at storage ReadFilter(*ReadFilterRequest, Storage_ReadFilterServer) error + // ReadGroup performs a group operation at storage + ReadGroup(*ReadGroupRequest, Storage_ReadGroupServer) error // Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine Capabilities(context.Context, *types.Empty) (*CapabilitiesResponse, error) Hints(context.Context, *types.Empty) (*HintsResponse, error) @@ -1585,6 +1637,27 @@ func (x *storageReadFilterServer) Send(m *ReadResponse) error { return x.ServerStream.SendMsg(m) } +func _Storage_ReadGroup_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ReadGroupRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(StorageServer).ReadGroup(m, &storageReadGroupServer{stream}) +} + +type Storage_ReadGroupServer interface { + Send(*ReadResponse) error + grpc.ServerStream +} + +type storageReadGroupServer struct { + grpc.ServerStream +} + +func (x *storageReadGroupServer) Send(m *ReadResponse) error { + return x.ServerStream.SendMsg(m) +} + func _Storage_Capabilities_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(types.Empty) if err := dec(in); err != nil { @@ -1687,6 +1760,11 @@ var _Storage_serviceDesc = grpc.ServiceDesc{ Handler: _Storage_ReadFilter_Handler, ServerStreams: true, }, + { + StreamName: "ReadGroup", + Handler: _Storage_ReadGroup_Handler, + ServerStreams: true, + }, { StreamName: "TagKeys", Handler: _Storage_TagKeys_Handler, @@ -1747,7 +1825,7 @@ func (m *ReadFilterRequest) MarshalTo(dAtA []byte) (int, error) { return i, nil } -func (m *ReadRequest) Marshal() (dAtA []byte, err error) { +func (m *ReadGroupRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) n, err := m.MarshalTo(dAtA) @@ -1757,28 +1835,38 @@ func (m *ReadRequest) Marshal() (dAtA []byte, err error) { return dAtA[:n], nil } -func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { +func (m *ReadGroupRequest) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l + if m.ReadSource != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintStorageCommon(dAtA, i, uint64(m.ReadSource.Size())) + n4, err := m.ReadSource.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n4 + } dAtA[i] = 0x12 i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(m.TimestampRange.Size())) - n4, err := m.TimestampRange.MarshalTo(dAtA[i:]) + i = encodeVarintStorageCommon(dAtA, i, uint64(m.Range.Size())) + n5, err := m.Range.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n4 - if m.Descending { - dAtA[i] = 0x18 + i += n5 + if m.Predicate != nil { + dAtA[i] = 0x1a i++ - if m.Descending { - dAtA[i] = 1 - } else { - dAtA[i] = 0 + i = encodeVarintStorageCommon(dAtA, i, uint64(m.Predicate.Size())) + n6, err := m.Predicate.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err } - i++ + i += n6 } if len(m.GroupKeys) > 0 { for _, s := range m.GroupKeys { @@ -1795,79 +1883,45 @@ func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { i += copy(dAtA[i:], s) } } - if m.Predicate != nil { - dAtA[i] = 0x2a - i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(m.Predicate.Size())) - n5, err := m.Predicate.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n5 - } - if m.SeriesLimit != 0 { - dAtA[i] = 0x30 - i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(m.SeriesLimit)) - } - if m.SeriesOffset != 0 { - dAtA[i] = 0x38 - i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(m.SeriesOffset)) - } - if m.PointsLimit != 0 { - dAtA[i] = 0x40 - i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(m.PointsLimit)) - } - if m.Aggregate != nil { - dAtA[i] = 0x4a - i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(m.Aggregate.Size())) - n6, err := m.Aggregate.MarshalTo(dAtA[i:]) - if err != nil { - return 0, err - } - i += n6 - } - if len(m.Trace) > 0 { - for k, _ := range m.Trace { - dAtA[i] = 0x52 - i++ - v := m.Trace[k] - mapSize := 1 + len(k) + sovStorageCommon(uint64(len(k))) + 1 + len(v) + sovStorageCommon(uint64(len(v))) - i = encodeVarintStorageCommon(dAtA, i, uint64(mapSize)) - dAtA[i] = 0xa - i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(len(k))) - i += copy(dAtA[i:], k) - dAtA[i] = 0x12 - i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(len(v))) - i += copy(dAtA[i:], v) - } - } if m.Group != 0 { - dAtA[i] = 0x58 + dAtA[i] = 0x28 i++ i = encodeVarintStorageCommon(dAtA, i, uint64(m.Group)) } - if m.Hints != 0 { - dAtA[i] = 0x65 + if m.Aggregate != nil { + dAtA[i] = 0x32 i++ - encoding_binary.LittleEndian.PutUint32(dAtA[i:], uint32(m.Hints)) - i += 4 - } - if m.ReadSource != nil { - dAtA[i] = 0x6a - i++ - i = encodeVarintStorageCommon(dAtA, i, uint64(m.ReadSource.Size())) - n7, err := m.ReadSource.MarshalTo(dAtA[i:]) + i = encodeVarintStorageCommon(dAtA, i, uint64(m.Aggregate.Size())) + n7, err := m.Aggregate.MarshalTo(dAtA[i:]) if err != nil { return 0, err } i += n7 } + if m.Hints != 0 { + dAtA[i] = 0x3d + i++ + encoding_binary.LittleEndian.PutUint32(dAtA[i:], uint32(m.Hints)) + i += 4 + } + return i, nil +} + +func (m *ReadRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l return i, nil } @@ -2587,16 +2641,21 @@ func (m *ReadFilterRequest) Size() (n int) { return n } -func (m *ReadRequest) Size() (n int) { +func (m *ReadGroupRequest) Size() (n int) { if m == nil { return 0 } var l int _ = l - l = m.TimestampRange.Size() + if m.ReadSource != nil { + l = m.ReadSource.Size() + n += 1 + l + sovStorageCommon(uint64(l)) + } + l = m.Range.Size() n += 1 + l + sovStorageCommon(uint64(l)) - if m.Descending { - n += 2 + if m.Predicate != nil { + l = m.Predicate.Size() + n += 1 + l + sovStorageCommon(uint64(l)) } if len(m.GroupKeys) > 0 { for _, s := range m.GroupKeys { @@ -2604,41 +2663,25 @@ func (m *ReadRequest) Size() (n int) { n += 1 + l + sovStorageCommon(uint64(l)) } } - if m.Predicate != nil { - l = m.Predicate.Size() - n += 1 + l + sovStorageCommon(uint64(l)) - } - if m.SeriesLimit != 0 { - n += 1 + sovStorageCommon(uint64(m.SeriesLimit)) - } - if m.SeriesOffset != 0 { - n += 1 + sovStorageCommon(uint64(m.SeriesOffset)) - } - if m.PointsLimit != 0 { - n += 1 + sovStorageCommon(uint64(m.PointsLimit)) + if m.Group != 0 { + n += 1 + sovStorageCommon(uint64(m.Group)) } if m.Aggregate != nil { l = m.Aggregate.Size() n += 1 + l + sovStorageCommon(uint64(l)) } - if len(m.Trace) > 0 { - for k, v := range m.Trace { - _ = k - _ = v - mapEntrySize := 1 + len(k) + sovStorageCommon(uint64(len(k))) + 1 + len(v) + sovStorageCommon(uint64(len(v))) - n += mapEntrySize + 1 + sovStorageCommon(uint64(mapEntrySize)) - } - } - if m.Group != 0 { - n += 1 + sovStorageCommon(uint64(m.Group)) - } if m.Hints != 0 { n += 5 } - if m.ReadSource != nil { - l = m.ReadSource.Size() - n += 1 + l + sovStorageCommon(uint64(l)) + return n +} + +func (m *ReadRequest) Size() (n int) { + if m == nil { + return 0 } + var l int + _ = l return n } @@ -3176,7 +3219,7 @@ func (m *ReadFilterRequest) Unmarshal(dAtA []byte) error { } return nil } -func (m *ReadRequest) Unmarshal(dAtA []byte) error { +func (m *ReadGroupRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 for iNdEx < l { @@ -3199,383 +3242,13 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error { fieldNum := int32(wire >> 3) wireType := int(wire & 0x7) if wireType == 4 { - return fmt.Errorf("proto: ReadRequest: wiretype end group for non-group") + return fmt.Errorf("proto: ReadGroupRequest: wiretype end group for non-group") } if fieldNum <= 0 { - return fmt.Errorf("proto: ReadRequest: illegal tag %d (wire type %d)", fieldNum, wire) + return fmt.Errorf("proto: ReadGroupRequest: illegal tag %d (wire type %d)", fieldNum, wire) } switch fieldNum { - case 2: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field TimestampRange", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthStorageCommon - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthStorageCommon - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if err := m.TimestampRange.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 3: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Descending", wireType) - } - var v int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - v |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - m.Descending = bool(v != 0) - case 4: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field GroupKeys", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLen |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthStorageCommon - } - postIndex := iNdEx + intStringLen - if postIndex < 0 { - return ErrInvalidLengthStorageCommon - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.GroupKeys = append(m.GroupKeys, string(dAtA[iNdEx:postIndex])) - iNdEx = postIndex - case 5: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Predicate", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthStorageCommon - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthStorageCommon - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Predicate == nil { - m.Predicate = &Predicate{} - } - if err := m.Predicate.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 6: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field SeriesLimit", wireType) - } - m.SeriesLimit = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.SeriesLimit |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 7: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field SeriesOffset", wireType) - } - m.SeriesOffset = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.SeriesOffset |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 8: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field PointsLimit", wireType) - } - m.PointsLimit = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.PointsLimit |= int64(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 9: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Aggregate", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthStorageCommon - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthStorageCommon - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Aggregate == nil { - m.Aggregate = &Aggregate{} - } - if err := m.Aggregate.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 10: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Trace", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - msglen |= int(b&0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthStorageCommon - } - postIndex := iNdEx + msglen - if postIndex < 0 { - return ErrInvalidLengthStorageCommon - } - if postIndex > l { - return io.ErrUnexpectedEOF - } - if m.Trace == nil { - m.Trace = make(map[string]string) - } - var mapkey string - var mapvalue string - for iNdEx < postIndex { - entryPreIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - if fieldNum == 1 { - var stringLenmapkey uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLenmapkey |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLenmapkey := int(stringLenmapkey) - if intStringLenmapkey < 0 { - return ErrInvalidLengthStorageCommon - } - postStringIndexmapkey := iNdEx + intStringLenmapkey - if postStringIndexmapkey < 0 { - return ErrInvalidLengthStorageCommon - } - if postStringIndexmapkey > l { - return io.ErrUnexpectedEOF - } - mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) - iNdEx = postStringIndexmapkey - } else if fieldNum == 2 { - var stringLenmapvalue uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - stringLenmapvalue |= uint64(b&0x7F) << shift - if b < 0x80 { - break - } - } - intStringLenmapvalue := int(stringLenmapvalue) - if intStringLenmapvalue < 0 { - return ErrInvalidLengthStorageCommon - } - postStringIndexmapvalue := iNdEx + intStringLenmapvalue - if postStringIndexmapvalue < 0 { - return ErrInvalidLengthStorageCommon - } - if postStringIndexmapvalue > l { - return io.ErrUnexpectedEOF - } - mapvalue = string(dAtA[iNdEx:postStringIndexmapvalue]) - iNdEx = postStringIndexmapvalue - } else { - iNdEx = entryPreIndex - skippy, err := skipStorageCommon(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthStorageCommon - } - if (iNdEx + skippy) > postIndex { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - m.Trace[mapkey] = mapvalue - iNdEx = postIndex - case 11: - if wireType != 0 { - return fmt.Errorf("proto: wrong wireType = %d for field Group", wireType) - } - m.Group = 0 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowStorageCommon - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - m.Group |= ReadRequest_Group(b&0x7F) << shift - if b < 0x80 { - break - } - } - case 12: - if wireType != 5 { - return fmt.Errorf("proto: wrong wireType = %d for field Hints", wireType) - } - m.Hints = 0 - if (iNdEx + 4) > l { - return io.ErrUnexpectedEOF - } - m.Hints = HintFlags(encoding_binary.LittleEndian.Uint32(dAtA[iNdEx:])) - iNdEx += 4 - case 13: + case 1: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field ReadSource", wireType) } @@ -3611,6 +3284,225 @@ func (m *ReadRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Range", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStorageCommon + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStorageCommon + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Range.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Predicate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStorageCommon + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStorageCommon + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Predicate == nil { + m.Predicate = &Predicate{} + } + if err := m.Predicate.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field GroupKeys", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthStorageCommon + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthStorageCommon + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.GroupKeys = append(m.GroupKeys, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Group", wireType) + } + m.Group = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Group |= ReadGroupRequest_Group(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Aggregate", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthStorageCommon + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthStorageCommon + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Aggregate == nil { + m.Aggregate = &Aggregate{} + } + if err := m.Aggregate.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 7: + if wireType != 5 { + return fmt.Errorf("proto: wrong wireType = %d for field Hints", wireType) + } + m.Hints = 0 + if (iNdEx + 4) > l { + return io.ErrUnexpectedEOF + } + m.Hints = HintFlags(encoding_binary.LittleEndian.Uint32(dAtA[iNdEx:])) + iNdEx += 4 + default: + iNdEx = preIndex + skippy, err := skipStorageCommon(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthStorageCommon + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthStorageCommon + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ReadRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowStorageCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { default: iNdEx = preIndex skippy, err := skipStorageCommon(dAtA[iNdEx:]) diff --git a/storage/reads/datatypes/storage_common.proto b/storage/reads/datatypes/storage_common.proto index f09575a6a5..0d0c550c9b 100644 --- a/storage/reads/datatypes/storage_common.proto +++ b/storage/reads/datatypes/storage_common.proto @@ -13,12 +13,15 @@ option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; service Storage { - // Read performs a read operation using the given ReadRequest + // TODO(jlapacik): Remove this unsupported call rpc Read (ReadRequest) returns (stream ReadResponse); // ReadFilter performs a filter operation at storage rpc ReadFilter (ReadFilterRequest) returns (stream ReadResponse); + // ReadGroup performs a group operation at storage + rpc ReadGroup (ReadGroupRequest) returns (stream ReadResponse); + // Capabilities returns a map of keys and values identifying the capabilities supported by the storage engine rpc Capabilities (google.protobuf.Empty) returns (CapabilitiesResponse); @@ -40,8 +43,11 @@ message ReadFilterRequest { Predicate predicate = 3; } -// Request message for Storage.Read. -message ReadRequest { +message ReadGroupRequest { + google.protobuf.Any read_source = 1 [(gogoproto.customname) = "ReadSource"]; + TimestampRange range = 2 [(gogoproto.nullable) = false]; + Predicate predicate = 3; + enum Group { option (gogoproto.goproto_enum_prefix) = false; @@ -60,6 +66,17 @@ message ReadRequest { GROUP_EXCEPT = 3 [(gogoproto.enumvalue_customname) = "GroupExcept"]; } + // GroupKeys specifies a list of tag keys used to order the data. + // It is dependent on the Group property to determine its behavior. + repeated string group_keys = 4 [(gogoproto.customname) = "GroupKeys"]; + + Group group = 5; + Aggregate aggregate = 6; + + // TODO(jlapacik): This field is only used in unit tests. + // Specifically the two tests in group_resultset_test.go. + // This field should be removed and the tests that depend + // on it refactored. enum HintFlags { option (gogoproto.goproto_enum_prefix) = false; @@ -69,45 +86,12 @@ message ReadRequest { // HintSchemaAllTime performs schema queries without using time ranges HINT_SCHEMA_ALL_TIME = 0x04 [(gogoproto.enumvalue_customname) = "HintSchemaAllTime"]; } - - google.protobuf.Any read_source = 13 [(gogoproto.customname) = "ReadSource"]; - - TimestampRange timestamp_range = 2 [(gogoproto.customname) = "TimestampRange", (gogoproto.nullable) = false]; - - // Descending indicates whether points should be returned in descending order. - bool descending = 3; - - // GroupKeys specifies a list of tag keys used to order the data. It is dependent on the Group property to determine - // its behavior. - repeated string group_keys = 4 [(gogoproto.customname) = "GroupKeys"]; - - // - Group group = 11; - - // Aggregate specifies an optional aggregate to apply to the data. - // TODO(sgc): switch to slice for multiple aggregates in a single request - Aggregate aggregate = 9; - - Predicate predicate = 5; - - // SeriesLimit determines the maximum number of series to be returned for the request. Specify 0 for no limit. - int64 series_limit = 6 [(gogoproto.customname) = "SeriesLimit"]; - - // SeriesOffset determines how many series to skip before processing the request. - int64 series_offset = 7 [(gogoproto.customname) = "SeriesOffset"]; - - // PointsLimit determines the maximum number of values per series to be returned for the request. - // Specify 0 for no limit. -1 to return series frames only. - int64 points_limit = 8 [(gogoproto.customname) = "PointsLimit"]; - - // Trace contains opaque data if a trace is active. - map trace = 10 [(gogoproto.customname) = "Trace"]; - - // Hints is a bitwise OR of HintFlags to control the behavior - // of the read request. - fixed32 hints = 12 [(gogoproto.customname) = "Hints", (gogoproto.casttype) = "HintFlags"]; + fixed32 hints = 7 [(gogoproto.customname) = "Hints", (gogoproto.casttype) = "HintFlags"]; } +// TODO(jlapacik): Remove this message +message ReadRequest {} + message Aggregate { enum AggregateType { option (gogoproto.goproto_enum_prefix) = false; @@ -127,7 +111,7 @@ message Tag { bytes value = 2; } -// Response message for Storage.Read. +// Response message for ReadFilter and ReadGroup message ReadResponse { enum FrameType { option (gogoproto.goproto_enum_prefix) = false; @@ -161,7 +145,7 @@ message ReadResponse { message GroupFrame { // TagKeys repeated bytes tag_keys = 1 [(gogoproto.customname) = "TagKeys"]; - // PartitionKeyVals is the values of the partition key for this group, order matching ReadRequest.GroupKeys + // PartitionKeyVals is the values of the partition key for this group, order matching ReadGroupRequest.GroupKeys repeated bytes partition_key_vals = 2 [(gogoproto.customname) = "PartitionKeyVals"]; } @@ -233,9 +217,3 @@ message TagValuesRequest { message StringValuesResponse { repeated bytes values = 1; } - -//message ExplainRequest { -// ReadRequest read_request = 1 [(gogoproto.customname) = "ReadRequest"]; -//} -// -//message ExplainResponse {} diff --git a/storage/reads/group_resultset.go b/storage/reads/group_resultset.go index b44cdafaf2..caf1737fa8 100644 --- a/storage/reads/group_resultset.go +++ b/storage/reads/group_resultset.go @@ -4,9 +4,10 @@ import ( "bytes" "context" "fmt" - "github.com/influxdata/influxdb/kit/tracing" + "math" "sort" + "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/storage/reads/datatypes" "github.com/influxdata/influxdb/tsdb/cursors" @@ -14,7 +15,7 @@ import ( type groupResultSet struct { ctx context.Context - req *datatypes.ReadRequest + req *datatypes.ReadGroupRequest agg *datatypes.Aggregate mb multiShardCursors @@ -42,7 +43,7 @@ func GroupOptionNilSortLo() GroupOption { } } -func NewGroupResultSet(ctx context.Context, req *datatypes.ReadRequest, newCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet { +func NewGroupResultSet(ctx context.Context, req *datatypes.ReadGroupRequest, newCursorFn func() (SeriesCursor, error), opts ...GroupOption) GroupResultSet { g := &groupResultSet{ ctx: ctx, req: req, @@ -56,7 +57,7 @@ func NewGroupResultSet(ctx context.Context, req *datatypes.ReadRequest, newCurso o(g) } - g.mb = newMultiShardArrayCursors(ctx, req.TimestampRange.Start, req.TimestampRange.End, !req.Descending, req.PointsLimit) + g.mb = newMultiShardArrayCursors(ctx, req.Range.Start, req.Range.End, true, math.MaxInt64) for i, k := range req.GroupKeys { g.keys[i] = []byte(k) diff --git a/storage/reads/group_resultset_test.go b/storage/reads/group_resultset_test.go index 7c1624a96d..c4c2b4257e 100644 --- a/storage/reads/group_resultset_test.go +++ b/storage/reads/group_resultset_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/data/gen" "github.com/influxdata/influxdb/storage/reads" @@ -16,7 +17,7 @@ func TestGroupGroupResultSetSorting(t *testing.T) { tests := []struct { name string cur reads.SeriesCursor - group datatypes.ReadRequest_Group + group datatypes.ReadGroupRequest_Group keys []string exp string }{ @@ -181,7 +182,14 @@ group: var hints datatypes.HintFlags hints.SetHintSchemaAllTime() - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: tt.group, GroupKeys: tt.keys, Hints: hints}, newCursor) + rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ + Group: tt.group, + GroupKeys: tt.keys, + // TODO(jlapacik): + // Hints is not used except for the tests in this file. + // Eventually this field should be removed entirely. + Hints: hints, + }, newCursor) sb := new(strings.Builder) GroupResultSetToString(sb, rs, SkipNilCursor()) @@ -202,7 +210,7 @@ func TestNewGroupResultSet_GroupNone_NoDataReturnsNil(t *testing.T) { )}, nil } - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupNone}, newCursor) + rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupNone}, newCursor) if rs != nil { t.Errorf("expected nil cursor") } @@ -217,7 +225,7 @@ func TestNewGroupResultSet_GroupBy_NoDataReturnsNil(t *testing.T) { )}, nil } - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupBy, GroupKeys: []string{"tag0"}}, newCursor) + rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupBy, GroupKeys: []string{"tag0"}}, newCursor) if rs != nil { t.Errorf("expected nil cursor") } @@ -300,7 +308,14 @@ group: var hints datatypes.HintFlags hints.SetHintSchemaAllTime() - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupBy, GroupKeys: tt.keys, Hints: hints}, newCursor, tt.opts...) + rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{ + Group: datatypes.GroupBy, + GroupKeys: tt.keys, + // TODO(jlapacik): + // Hints is not used except for the tests in this file. + // Eventually this field should be removed entirely. + Hints: hints, + }, newCursor, tt.opts...) sb := new(strings.Builder) GroupResultSetToString(sb, rs, SkipNilCursor()) @@ -364,9 +379,7 @@ func BenchmarkNewGroupResultSet_GroupBy(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - var hints datatypes.HintFlags - hints.SetHintSchemaAllTime() - rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadRequest{Group: datatypes.GroupBy, GroupKeys: []string{"tag2"}, Hints: hints}, newCursor) + rs := reads.NewGroupResultSet(context.Background(), &datatypes.ReadGroupRequest{Group: datatypes.GroupBy, GroupKeys: []string{"tag2"}}, newCursor) rs.Close() } } diff --git a/storage/reads/reader.go b/storage/reads/reader.go index 2614325af6..76bd10d91e 100644 --- a/storage/reads/reader.go +++ b/storage/reads/reader.go @@ -1,7 +1,6 @@ package reads import ( - "bytes" "context" "fmt" "strings" @@ -33,27 +32,20 @@ func NewReader(s Store) influxdb.Reader { } func (r *storeReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, stop execute.Time, alloc *memory.Allocator) (influxdb.TableIterator, error) { - var predicate *datatypes.Predicate - if rs.Predicate != nil { - p, err := toStoragePredicate(rs.Predicate) - if err != nil { - return nil, err - } - predicate = p - } - - return &tableIterator{ - ctx: ctx, - bounds: execute.Bounds{Start: start, Stop: stop}, - s: r.s, - readSpec: rs, - predicate: predicate, - alloc: alloc, - }, nil + return nil, nil } func (r *storeReader) ReadFilter(ctx context.Context, spec influxdb.ReadFilterSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { - return &simpleTableIterator{ + return &filterIterator{ + ctx: ctx, + s: r.s, + spec: spec, + alloc: alloc, + }, nil +} + +func (r *storeReader) ReadGroup(ctx context.Context, spec influxdb.ReadGroupSpec, alloc *memory.Allocator) (influxdb.TableIterator, error) { + return &groupIterator{ ctx: ctx, s: r.s, spec: spec, @@ -103,7 +95,7 @@ func (r *storeReader) ReadTagValues(ctx context.Context, spec influxdb.ReadTagVa func (r *storeReader) Close() {} -type simpleTableIterator struct { +type filterIterator struct { ctx context.Context s Store spec influxdb.ReadFilterSpec @@ -111,12 +103,12 @@ type simpleTableIterator struct { alloc *memory.Allocator } -func (bi *simpleTableIterator) Statistics() cursors.CursorStats { return bi.stats } +func (fi *filterIterator) Statistics() cursors.CursorStats { return fi.stats } -func (bi *simpleTableIterator) Do(f func(flux.Table) error) error { - src := bi.s.GetSource( - uint64(bi.spec.OrganizationID), - uint64(bi.spec.BucketID), +func (fi *filterIterator) Do(f func(flux.Table) error) error { + src := fi.s.GetSource( + uint64(fi.spec.OrganizationID), + uint64(fi.spec.BucketID), ) // Setup read request @@ -126,8 +118,8 @@ func (bi *simpleTableIterator) Do(f func(flux.Table) error) error { } var predicate *datatypes.Predicate - if bi.spec.Predicate != nil { - p, err := toStoragePredicate(bi.spec.Predicate) + if fi.spec.Predicate != nil { + p, err := toStoragePredicate(fi.spec.Predicate) if err != nil { return err } @@ -137,10 +129,10 @@ func (bi *simpleTableIterator) Do(f func(flux.Table) error) error { var req datatypes.ReadFilterRequest req.ReadSource = any req.Predicate = predicate - req.Range.Start = int64(bi.spec.Bounds.Start) - req.Range.End = int64(bi.spec.Bounds.Stop) + req.Range.Start = int64(fi.spec.Bounds.Start) + req.Range.End = int64(fi.spec.Bounds.Stop) - rs, err := bi.s.ReadFilter(bi.ctx, &req) + rs, err := fi.s.ReadFilter(fi.ctx, &req) if err != nil { return err } @@ -149,10 +141,10 @@ func (bi *simpleTableIterator) Do(f func(flux.Table) error) error { return nil } - return bi.handleRead(f, rs) + return fi.handleRead(f, rs) } -func (bi *simpleTableIterator) handleRead(f func(flux.Table) error, rs ResultSet) error { +func (fi *filterIterator) handleRead(f func(flux.Table) error, rs ResultSet) error { // these resources must be closed if not nil on return var ( cur cursors.Cursor @@ -177,25 +169,25 @@ READ: continue } - bnds := bi.spec.Bounds + bnds := fi.spec.Bounds key := defaultGroupKeyForSeries(rs.Tags(), bnds) done := make(chan struct{}) switch typedCur := cur.(type) { case cursors.IntegerArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt) - table = newIntegerTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + table = newIntegerTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.alloc) case cursors.FloatArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat) - table = newFloatTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + table = newFloatTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.alloc) case cursors.UnsignedArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt) - table = newUnsignedTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + table = newUnsignedTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.alloc) case cursors.BooleanArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool) - table = newBooleanTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + table = newBooleanTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.alloc) case cursors.StringArrayCursor: cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString) - table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, bi.alloc) + table = newStringTable(done, typedCur, bnds, key, cols, rs.Tags(), defs, fi.alloc) default: panic(fmt.Sprintf("unreachable: %T", typedCur)) } @@ -210,214 +202,79 @@ READ: } select { case <-done: - case <-bi.ctx.Done(): + case <-fi.ctx.Done(): table.Cancel() break READ } } stats := table.Statistics() - bi.stats.ScannedValues += stats.ScannedValues - bi.stats.ScannedBytes += stats.ScannedBytes + fi.stats.ScannedValues += stats.ScannedValues + fi.stats.ScannedBytes += stats.ScannedBytes table.Close() table = nil } return rs.Err() } -type tableIterator struct { - ctx context.Context - bounds execute.Bounds - s Store - readSpec influxdb.ReadSpec - predicate *datatypes.Predicate - stats cursors.CursorStats - alloc *memory.Allocator +type groupIterator struct { + ctx context.Context + s Store + spec influxdb.ReadGroupSpec + stats cursors.CursorStats + alloc *memory.Allocator } -func (bi *tableIterator) Statistics() cursors.CursorStats { return bi.stats } +func (gi *groupIterator) Statistics() cursors.CursorStats { return gi.stats } -func (bi *tableIterator) Do(f func(flux.Table) error) error { - src := bi.s.GetSource( - uint64(bi.readSpec.OrganizationID), - uint64(bi.readSpec.BucketID), +func (gi *groupIterator) Do(f func(flux.Table) error) error { + src := gi.s.GetSource( + uint64(gi.spec.OrganizationID), + uint64(gi.spec.BucketID), ) // Setup read request - var req datatypes.ReadRequest - if any, err := types.MarshalAny(src); err != nil { + any, err := types.MarshalAny(src) + if err != nil { return err - } else { - req.ReadSource = any - } - req.Predicate = bi.predicate - req.Descending = bi.readSpec.Descending - req.TimestampRange.Start = int64(bi.bounds.Start) - req.TimestampRange.End = int64(bi.bounds.Stop) - req.Group = convertGroupMode(bi.readSpec.GroupMode) - req.GroupKeys = bi.readSpec.GroupKeys - req.SeriesLimit = bi.readSpec.SeriesLimit - req.PointsLimit = bi.readSpec.PointsLimit - req.SeriesOffset = bi.readSpec.SeriesOffset - - if req.PointsLimit == -1 { - req.Hints.SetNoPoints() } - if agg, err := determineAggregateMethod(bi.readSpec.AggregateMethod); err != nil { + var predicate *datatypes.Predicate + if gi.spec.Predicate != nil { + p, err := toStoragePredicate(gi.spec.Predicate) + if err != nil { + return err + } + predicate = p + } + + var req datatypes.ReadGroupRequest + req.ReadSource = any + req.Predicate = predicate + req.Range.Start = int64(gi.spec.Bounds.Start) + req.Range.End = int64(gi.spec.Bounds.Stop) + + req.Group = convertGroupMode(gi.spec.GroupMode) + req.GroupKeys = gi.spec.GroupKeys + + if agg, err := determineAggregateMethod(gi.spec.AggregateMethod); err != nil { return err } else if agg != datatypes.AggregateTypeNone { req.Aggregate = &datatypes.Aggregate{Type: agg} } - switch { - case req.Group != datatypes.GroupAll: - rs, err := bi.s.GroupRead(bi.ctx, &req) - if err != nil { - return err - } - - if rs == nil { - return nil - } - - if req.Hints.NoPoints() { - return bi.handleGroupReadNoPoints(f, rs) - } - return bi.handleGroupRead(f, rs) - - default: - rs, err := bi.s.Read(bi.ctx, &req) - if err != nil { - return err - } - - if rs == nil { - return nil - } - - if req.Hints.NoPoints() { - return bi.handleReadNoPoints(f, rs) - } - return bi.handleRead(f, rs) + rs, err := gi.s.ReadGroup(gi.ctx, &req) + if err != nil { + return err } + + if rs == nil { + return nil + } + return gi.handleRead(f, rs) } -func (bi *tableIterator) handleRead(f func(flux.Table) error, rs ResultSet) error { - // these resources must be closed if not nil on return - var ( - cur cursors.Cursor - table storageTable - ) - - defer func() { - if table != nil { - table.Close() - } - if cur != nil { - cur.Close() - } - rs.Close() - }() - -READ: - for rs.Next() { - cur = rs.Cursor() - if cur == nil { - // no data for series key + field combination - continue - } - - key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds) - done := make(chan struct{}) - switch typedCur := cur.(type) { - case cursors.IntegerArrayCursor: - cols, defs := determineTableColsForSeries(rs.Tags(), flux.TInt) - table = newIntegerTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs, bi.alloc) - case cursors.FloatArrayCursor: - cols, defs := determineTableColsForSeries(rs.Tags(), flux.TFloat) - table = newFloatTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs, bi.alloc) - case cursors.UnsignedArrayCursor: - cols, defs := determineTableColsForSeries(rs.Tags(), flux.TUInt) - table = newUnsignedTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs, bi.alloc) - case cursors.BooleanArrayCursor: - cols, defs := determineTableColsForSeries(rs.Tags(), flux.TBool) - table = newBooleanTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs, bi.alloc) - case cursors.StringArrayCursor: - cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString) - table = newStringTable(done, typedCur, bi.bounds, key, cols, rs.Tags(), defs, bi.alloc) - default: - panic(fmt.Sprintf("unreachable: %T", typedCur)) - } - - cur = nil - - if !table.Empty() { - if err := f(table); err != nil { - table.Close() - table = nil - return err - } - select { - case <-done: - case <-bi.ctx.Done(): - table.Cancel() - break READ - } - } - - stats := table.Statistics() - bi.stats.ScannedValues += stats.ScannedValues - bi.stats.ScannedBytes += stats.ScannedBytes - table.Close() - table = nil - } - return rs.Err() -} - -func (bi *tableIterator) handleReadNoPoints(f func(flux.Table) error, rs ResultSet) error { - // these resources must be closed if not nil on return - var table storageTable - - defer func() { - if table != nil { - table.Close() - } - rs.Close() - }() - -READ: - for rs.Next() { - cur := rs.Cursor() - if !hasPoints(cur) { - // no data for series key + field combination - continue - } - - key := groupKeyForSeries(rs.Tags(), &bi.readSpec, bi.bounds) - done := make(chan struct{}) - cols, defs := determineTableColsForSeries(rs.Tags(), flux.TString) - table = newTableNoPoints(done, bi.bounds, key, cols, rs.Tags(), defs, bi.alloc) - - if err := f(table); err != nil { - table.Close() - table = nil - return err - } - select { - case <-done: - case <-bi.ctx.Done(): - table.Cancel() - break READ - } - - table.Close() - table = nil - } - return rs.Err() -} - -func (bi *tableIterator) handleGroupRead(f func(flux.Table) error, rs GroupResultSet) error { +func (gi *groupIterator) handleRead(f func(flux.Table) error, rs GroupResultSet) error { // these resources must be closed if not nil on return var ( gc GroupCursor @@ -454,24 +311,25 @@ READ: continue } - key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds) + bnds := gi.spec.Bounds + key := groupKeyForGroup(gc.PartitionKeyVals(), &gi.spec, bnds) done := make(chan struct{}) switch typedCur := cur.(type) { case cursors.IntegerArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TInt) - table = newIntegerGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs, bi.alloc) + table = newIntegerGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.alloc) case cursors.FloatArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TFloat) - table = newFloatGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs, bi.alloc) + table = newFloatGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.alloc) case cursors.UnsignedArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TUInt) - table = newUnsignedGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs, bi.alloc) + table = newUnsignedGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.alloc) case cursors.BooleanArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TBool) - table = newBooleanGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs, bi.alloc) + table = newBooleanGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.alloc) case cursors.StringArrayCursor: cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString) - table = newStringGroupTable(done, gc, typedCur, bi.bounds, key, cols, gc.Tags(), defs, bi.alloc) + table = newStringGroupTable(done, gc, typedCur, bnds, key, cols, gc.Tags(), defs, gi.alloc) default: panic(fmt.Sprintf("unreachable: %T", typedCur)) } @@ -487,61 +345,14 @@ READ: } select { case <-done: - case <-bi.ctx.Done(): + case <-gi.ctx.Done(): table.Cancel() break READ } stats := table.Statistics() - bi.stats.ScannedValues += stats.ScannedValues - bi.stats.ScannedBytes += stats.ScannedBytes - table.Close() - table = nil - - gc = rs.Next() - } - return rs.Err() -} - -func (bi *tableIterator) handleGroupReadNoPoints(f func(flux.Table) error, rs GroupResultSet) error { - // these resources must be closed if not nil on return - var ( - gc GroupCursor - table storageTable - ) - - defer func() { - if table != nil { - table.Close() - } - if gc != nil { - gc.Close() - } - rs.Close() - }() - - gc = rs.Next() -READ: - for gc != nil { - key := groupKeyForGroup(gc.PartitionKeyVals(), &bi.readSpec, bi.bounds) - done := make(chan struct{}) - cols, defs := determineTableColsForGroup(gc.Keys(), flux.TString) - table = newGroupTableNoPoints(done, bi.bounds, key, cols, defs, bi.alloc) - gc.Close() - gc = nil - - if err := f(table); err != nil { - table.Close() - table = nil - return err - } - select { - case <-done: - case <-bi.ctx.Done(): - table.Cancel() - break READ - } - + gi.stats.ScannedValues += stats.ScannedValues + gi.stats.ScannedBytes += stats.ScannedBytes table.Close() table = nil @@ -561,7 +372,7 @@ func determineAggregateMethod(agg string) (datatypes.Aggregate_AggregateType, er return 0, fmt.Errorf("unknown aggregate type %q", agg) } -func convertGroupMode(m influxdb.GroupMode) datatypes.ReadRequest_Group { +func convertGroupMode(m influxdb.GroupMode) datatypes.ReadGroupRequest_Group { switch m { case influxdb.GroupModeNone: return datatypes.GroupNone @@ -636,49 +447,6 @@ func defaultGroupKeyForSeries(tags models.Tags, bnds execute.Bounds) flux.GroupK return execute.NewGroupKey(cols, vs) } -func groupKeyForSeries(tags models.Tags, readSpec *influxdb.ReadSpec, bnds execute.Bounds) flux.GroupKey { - cols := make([]flux.ColMeta, 2, len(tags)) - vs := make([]values.Value, 2, len(tags)) - cols[0] = flux.ColMeta{ - Label: execute.DefaultStartColLabel, - Type: flux.TTime, - } - vs[0] = values.NewTime(bnds.Start) - cols[1] = flux.ColMeta{ - Label: execute.DefaultStopColLabel, - Type: flux.TTime, - } - vs[1] = values.NewTime(bnds.Stop) - switch readSpec.GroupMode { - case influxdb.GroupModeBy: - // group key in GroupKeys order, including tags in the GroupKeys slice - for _, k := range readSpec.GroupKeys { - bk := []byte(k) - for _, t := range tags { - if bytes.Equal(t.Key, bk) && len(t.Value) > 0 { - cols = append(cols, flux.ColMeta{ - Label: k, - Type: flux.TString, - }) - vs = append(vs, values.NewString(string(t.Value))) - } - } - } - case influxdb.GroupModeExcept: - // group key in GroupKeys order, skipping tags in the GroupKeys slice - panic("not implemented") - case influxdb.GroupModeDefault, influxdb.GroupModeAll: - for i := range tags { - cols = append(cols, flux.ColMeta{ - Label: string(tags[i].Key), - Type: flux.TString, - }) - vs = append(vs, values.NewString(string(tags[i].Value))) - } - } - return execute.NewGroupKey(cols, vs) -} - func determineTableColsForGroup(tagKeys [][]byte, typ flux.ColType) ([]flux.ColMeta, [][]byte) { cols := make([]flux.ColMeta, 4+len(tagKeys)) defs := make([][]byte, 4+len(tagKeys)) @@ -709,9 +477,9 @@ func determineTableColsForGroup(tagKeys [][]byte, typ flux.ColType) ([]flux.ColM return cols, defs } -func groupKeyForGroup(kv [][]byte, readSpec *influxdb.ReadSpec, bnds execute.Bounds) flux.GroupKey { - cols := make([]flux.ColMeta, 2, len(readSpec.GroupKeys)+2) - vs := make([]values.Value, 2, len(readSpec.GroupKeys)+2) +func groupKeyForGroup(kv [][]byte, spec *influxdb.ReadGroupSpec, bnds execute.Bounds) flux.GroupKey { + cols := make([]flux.ColMeta, 2, len(spec.GroupKeys)+2) + vs := make([]values.Value, 2, len(spec.GroupKeys)+2) cols[0] = flux.ColMeta{ Label: execute.DefaultStartColLabel, Type: flux.TTime, @@ -722,12 +490,12 @@ func groupKeyForGroup(kv [][]byte, readSpec *influxdb.ReadSpec, bnds execute.Bou Type: flux.TTime, } vs[1] = values.NewTime(bnds.Stop) - for i := range readSpec.GroupKeys { - if readSpec.GroupKeys[i] == execute.DefaultStartColLabel || readSpec.GroupKeys[i] == execute.DefaultStopColLabel { + for i := range spec.GroupKeys { + if spec.GroupKeys[i] == execute.DefaultStartColLabel || spec.GroupKeys[i] == execute.DefaultStopColLabel { continue } cols = append(cols, flux.ColMeta{ - Label: readSpec.GroupKeys[i], + Label: spec.GroupKeys[i], Type: flux.TString, }) vs = append(vs, values.NewString(string(kv[i]))) diff --git a/storage/reads/resultset.go b/storage/reads/resultset.go index bbc43bb031..a508d8cb7f 100644 --- a/storage/reads/resultset.go +++ b/storage/reads/resultset.go @@ -22,16 +22,7 @@ type resultSet struct { mb multiShardCursors } -func NewResultSet(ctx context.Context, req *datatypes.ReadRequest, cur SeriesCursor) ResultSet { - return &resultSet{ - ctx: ctx, - agg: req.Aggregate, - cur: cur, - mb: newMultiShardArrayCursors(ctx, req.TimestampRange.Start, req.TimestampRange.End, !req.Descending, req.PointsLimit), - } -} - -func NewResultSetFromFilter(ctx context.Context, req *datatypes.ReadFilterRequest, cur SeriesCursor) ResultSet { +func NewFilteredResultSet(ctx context.Context, req *datatypes.ReadFilterRequest, cur SeriesCursor) ResultSet { return &resultSet{ ctx: ctx, cur: cur, diff --git a/storage/reads/store.go b/storage/reads/store.go index e373ac8811..dedb6be19e 100644 --- a/storage/reads/store.go +++ b/storage/reads/store.go @@ -74,10 +74,14 @@ type GroupCursor interface { } type Store interface { - Read(ctx context.Context, req *datatypes.ReadRequest) (ResultSet, error) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest) (ResultSet, error) - GroupRead(ctx context.Context, req *datatypes.ReadRequest) (GroupResultSet, error) - GetSource(orgID, bucketID uint64) proto.Message + ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (GroupResultSet, error) + TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error) TagValues(ctx context.Context, req *datatypes.TagValuesRequest) (cursors.StringIterator, error) + + // Deprecated method; should use ReadFilter and ReadGroup instead. + Read(ctx context.Context, req *datatypes.ReadRequest) (ResultSet, error) + + GetSource(orgID, bucketID uint64) proto.Message } diff --git a/storage/readservice/store.go b/storage/readservice/store.go index 44fbe285ff..a4371e717d 100644 --- a/storage/readservice/store.go +++ b/storage/readservice/store.go @@ -3,7 +3,6 @@ package readservice import ( "context" "errors" - "math" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" @@ -44,22 +43,10 @@ func (s *store) ReadFilter(ctx context.Context, req *datatypes.ReadFilterRequest cur = ic } - return reads.NewResultSetFromFilter(ctx, req, cur), nil + return reads.NewFilteredResultSet(ctx, req, cur), nil } -func (s *store) Read(ctx context.Context, req *datatypes.ReadRequest) (reads.ResultSet, error) { - if len(req.GroupKeys) > 0 { - panic("Read: len(Grouping) > 0") - } - - if req.Hints.NoPoints() { - req.PointsLimit = -1 - } - - if req.PointsLimit == 0 { - req.PointsLimit = math.MaxInt64 - } - +func (s *store) ReadGroup(ctx context.Context, req *datatypes.ReadGroupRequest) (reads.GroupResultSet, error) { if req.ReadSource == nil { return nil, errors.New("missing read source") } @@ -69,60 +56,6 @@ func (s *store) Read(ctx context.Context, req *datatypes.ReadRequest) (reads.Res return nil, err } - if req.TimestampRange.Start == 0 { - req.TimestampRange.Start = math.MaxInt64 - } - - if req.TimestampRange.End == 0 { - req.TimestampRange.End = math.MaxInt64 - } - - var cur reads.SeriesCursor - if ic, err := newIndexSeriesCursor(ctx, &source, req.Predicate, s.engine); err != nil { - return nil, err - } else if ic == nil { - return nil, nil - } else { - cur = ic - } - - if req.SeriesLimit > 0 || req.SeriesOffset > 0 { - cur = reads.NewLimitSeriesCursor(ctx, cur, req.SeriesLimit, req.SeriesOffset) - } - - return reads.NewResultSet(ctx, req, cur), nil -} - -func (s *store) GroupRead(ctx context.Context, req *datatypes.ReadRequest) (reads.GroupResultSet, error) { - if req.SeriesLimit > 0 || req.SeriesOffset > 0 { - return nil, errors.New("groupRead: SeriesLimit and SeriesOffset not supported when Grouping") - } - - if req.Hints.NoPoints() { - req.PointsLimit = -1 - } - - if req.PointsLimit == 0 { - req.PointsLimit = math.MaxInt64 - } - - if req.ReadSource == nil { - return nil, errors.New("missing read source") - } - - source, err := getReadSource(*req.ReadSource) - if err != nil { - return nil, err - } - - if req.TimestampRange.Start <= 0 { - req.TimestampRange.Start = math.MinInt64 - } - - if req.TimestampRange.End <= 0 { - req.TimestampRange.End = math.MaxInt64 - } - newCursor := func() (reads.SeriesCursor, error) { cur, err := newIndexSeriesCursor(ctx, &source, req.Predicate, s.engine) if cur == nil || err != nil { @@ -134,6 +67,10 @@ func (s *store) GroupRead(ctx context.Context, req *datatypes.ReadRequest) (read return reads.NewGroupResultSet(ctx, req, newCursor), nil } +func (s *store) Read(ctx context.Context, req *datatypes.ReadRequest) (reads.ResultSet, error) { + return nil, nil +} + func (s *store) TagKeys(ctx context.Context, req *datatypes.TagKeysRequest) (cursors.StringIterator, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish()