diff --git a/generated_types/protos/influxdata/platform/storage/storage_common.proto b/generated_types/protos/influxdata/platform/storage/storage_common.proto index 35126eeeb0..14cd2a181d 100644 --- a/generated_types/protos/influxdata/platform/storage/storage_common.proto +++ b/generated_types/protos/influxdata/platform/storage/storage_common.proto @@ -64,7 +64,7 @@ message ReadGroupRequest { enum Group { // option (gogoproto.goproto_enum_prefix) = false; - + // GroupNone returns all series as a single group. // The single GroupFrame.TagKeys will be the union of all tag keys. GroupNone = 0; @@ -81,7 +81,7 @@ message ReadGroupRequest { Aggregate aggregate = 6; // Deprecated field only used in TSM storage-related tests. - reserved "Hints"; + reserved "Hints"; } message Aggregate { @@ -204,7 +204,7 @@ message TagKeysRequest { // TagValuesRequest is the request message for Storage.TagValues. message TagValuesRequest { - google.protobuf.Any TagsSource = 1; + google.protobuf.Any TagsSource = 1; TimestampRange range = 2; // [(gogoproto.nullable) = false]; Predicate predicate = 3; bytes tag_key = 4; @@ -294,6 +294,9 @@ message ReadWindowAggregateRequest { int64 Offset = 6; repeated Aggregate aggregate = 5; Window window = 7; + // TagKeyMetaNames determines the key format used for the measurement and field + // tags. + TagKeyMetaNames tag_key_meta_names = 8; } message TagValuesGroupedByMeasurementAndTagKeyRequest { diff --git a/influxdb_iox/src/commands/storage/request.rs b/influxdb_iox/src/commands/storage/request.rs index 9d619d661b..6698f98a64 100644 --- a/influxdb_iox/src/commands/storage/request.rs +++ b/influxdb_iox/src/commands/storage/request.rs @@ -85,6 +85,7 @@ pub fn read_window_aggregate( offset, aggregate, window, + tag_key_meta_names: TagKeyMetaNames::Text as i32, }) } diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index 0003296dfb..94704debc3 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -320,12 +320,20 @@ where let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys) .context(ConvertingReadGroupAggregateSnafu { aggregate_string })?; - let results = query_group_impl(Arc::clone(&db), db_name, range, predicate, gby_agg, &ctx) - .await - .map_err(|e| e.to_status())? - .into_iter() - .map(Ok) - .collect::>(); + let results = query_group_impl( + Arc::clone(&db), + db_name, + range, + predicate, + gby_agg, + TagKeyMetaNames::Text, + &ctx, + ) + .await + .map_err(|e| e.to_status())? + .into_iter() + .map(Ok) + .collect::>(); if results.iter().all(|r| r.is_ok()) { query_completed_token.set_success(); @@ -374,9 +382,10 @@ where offset, aggregate, window, + tag_key_meta_names, } = req; - info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(), "read_window_aggregate"); + info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(), ?tag_key_meta_names, "read_window_aggregate"); let aggregate_string = format!( "aggregate: {:?}, window_every: {:?}, offset: {:?}, window: {:?}", @@ -386,12 +395,20 @@ where let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window) .context(ConvertingWindowAggregateSnafu { aggregate_string })?; - let results = query_group_impl(Arc::clone(&db), db_name, range, predicate, gby_agg, &ctx) - .await - .map_err(|e| e.to_status())? - .into_iter() - .map(Ok) - .collect::>(); + let results = query_group_impl( + Arc::clone(&db), + db_name, + range, + predicate, + gby_agg, + TagKeyMetaNames::from_i32(tag_key_meta_names).unwrap_or_default(), + &ctx, + ) + .await + .map_err(|e| e.to_status())? + .into_iter() + .map(Ok) + .collect::>(); if results.iter().all(|r| r.is_ok()) { query_completed_token.set_success(); @@ -1225,6 +1242,7 @@ async fn query_group_impl( range: Option, rpc_predicate: Option, gby_agg: GroupByAndAggregate, + tag_key_meta_names: TagKeyMetaNames, ctx: &IOxSessionContext, ) -> Result, Error> where @@ -1269,9 +1287,8 @@ where .context(GroupingSeriesSnafu { db_name }) .log_if_error("Running Grouped SeriesSet Plan")?; - // ReadGroupRequest does not have a field to control the format of - // _measurement and _field tag keys, so always request in string format. - let response = series_or_groups_to_read_response(series_or_groups, false); + let tag_key_binary_format = tag_key_meta_names == TagKeyMetaNames::Binary; + let response = series_or_groups_to_read_response(series_or_groups, tag_key_binary_format); Ok(vec![response]) } @@ -2791,6 +2808,7 @@ mod tests { }], // old skool window definition window: None, + tag_key_meta_names: TagKeyMetaNames::Text as i32, }; let frames = fixture @@ -2855,6 +2873,7 @@ mod tests { negative: false, }), }), + tag_key_meta_names: TagKeyMetaNames::Text as i32, }; let frames = fixture @@ -2903,6 +2922,7 @@ mod tests { }], // old skool window definition window: None, + tag_key_meta_names: TagKeyMetaNames::Text as i32, }; let response_string = fixture @@ -3125,6 +3145,7 @@ mod tests { negative: false, }), }), + tag_key_meta_names: TagKeyMetaNames::Text as i32, }; let streaming_resp = service .read_window_aggregate(tonic::Request::new(request)) diff --git a/test_helpers_end_to_end/src/grpc.rs b/test_helpers_end_to_end/src/grpc.rs index fe675217b2..6a36afc429 100644 --- a/test_helpers_end_to_end/src/grpc.rs +++ b/test_helpers_end_to_end/src/grpc.rs @@ -5,7 +5,7 @@ use generated_types::{ read_group_request::Group, Aggregate, MeasurementFieldsRequest, MeasurementNamesRequest, MeasurementTagKeysRequest, MeasurementTagValuesRequest, Node, Predicate, ReadFilterRequest, ReadGroupRequest, ReadSource, - ReadWindowAggregateRequest, TagKeysRequest, TagValuesRequest, TimestampRange, + ReadWindowAggregateRequest, TagKeyMetaNames, TagKeysRequest, TagValuesRequest, TimestampRange, }; use prost::Message; @@ -346,6 +346,7 @@ impl GrpcRequestBuilder { offset: self.offset.expect("no offset specified"), aggregate, window: None, + tag_key_meta_names: TagKeyMetaNames::Text as i32, }) } }