From 07cdb99192c6822647d5abffdd55a435e62752db Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Fri, 22 Jul 2022 11:26:42 +0200 Subject: [PATCH] chore: Revert "Sync ReadWindowAggregate API: TagKeyMetaNames" (#5184) We're noticing a possible regression (OOMs) in our testing cluster that roughly correlates with this. --- .../platform/storage/storage_common.proto | 5 +- influxdb_iox/src/commands/storage/request.rs | 1 - service_grpc_influxrpc/src/service.rs | 53 ++++++------------- test_helpers_end_to_end/src/grpc.rs | 3 +- 4 files changed, 19 insertions(+), 43 deletions(-) diff --git a/generated_types/protos/influxdata/platform/storage/storage_common.proto b/generated_types/protos/influxdata/platform/storage/storage_common.proto index 14cd2a181d..d006699e4f 100644 --- a/generated_types/protos/influxdata/platform/storage/storage_common.proto +++ b/generated_types/protos/influxdata/platform/storage/storage_common.proto @@ -294,9 +294,8 @@ message ReadWindowAggregateRequest { int64 Offset = 6; repeated Aggregate aggregate = 5; Window window = 7; - // TagKeyMetaNames determines the key format used for the measurement and field - // tags. - TagKeyMetaNames tag_key_meta_names = 8; + reserved 8; + reserved "tag_key_meta_names"; } message TagValuesGroupedByMeasurementAndTagKeyRequest { diff --git a/influxdb_iox/src/commands/storage/request.rs b/influxdb_iox/src/commands/storage/request.rs index 6698f98a64..9d619d661b 100644 --- a/influxdb_iox/src/commands/storage/request.rs +++ b/influxdb_iox/src/commands/storage/request.rs @@ -85,7 +85,6 @@ pub fn read_window_aggregate( offset, aggregate, window, - tag_key_meta_names: TagKeyMetaNames::Text as i32, }) } diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index 1c23abe882..9fecc3af87 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -330,20 +330,12 @@ where let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys) .context(ConvertingReadGroupAggregateSnafu { aggregate_string })?; - let results = query_group_impl( - Arc::clone(&db), - db_name, - range, - predicate, - gby_agg, - TagKeyMetaNames::Text, - &ctx, - ) - .await - .map_err(|e| e.to_status())? - .into_iter() - .map(Ok) - .collect::>(); + let results = query_group_impl(Arc::clone(&db), db_name, range, predicate, gby_agg, &ctx) + .await + .map_err(|e| e.to_status())? + .into_iter() + .map(Ok) + .collect::>(); if results.iter().all(|r| r.is_ok()) { query_completed_token.set_success(); @@ -397,10 +389,9 @@ where offset, aggregate, window, - tag_key_meta_names, } = req; - info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(), ?tag_key_meta_names, "read_window_aggregate"); + info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(), "read_window_aggregate"); let aggregate_string = format!( "aggregate: {:?}, window_every: {:?}, offset: {:?}, window: {:?}", @@ -410,20 +401,12 @@ where let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window) .context(ConvertingWindowAggregateSnafu { aggregate_string })?; - let results = query_group_impl( - Arc::clone(&db), - db_name, - range, - predicate, - gby_agg, - TagKeyMetaNames::from_i32(tag_key_meta_names).unwrap_or_default(), - &ctx, - ) - .await - .map_err(|e| e.to_status())? - .into_iter() - .map(Ok) - .collect::>(); + let results = query_group_impl(Arc::clone(&db), db_name, range, predicate, gby_agg, &ctx) + .await + .map_err(|e| e.to_status())? + .into_iter() + .map(Ok) + .collect::>(); if results.iter().all(|r| r.is_ok()) { query_completed_token.set_success(); @@ -1292,7 +1275,6 @@ async fn query_group_impl( range: Option, rpc_predicate: Option, gby_agg: GroupByAndAggregate, - tag_key_meta_names: TagKeyMetaNames, ctx: &IOxSessionContext, ) -> Result, Error> where @@ -1337,8 +1319,9 @@ where .context(GroupingSeriesSnafu { db_name }) .log_if_error("Running Grouped SeriesSet Plan")?; - let tag_key_binary_format = tag_key_meta_names == TagKeyMetaNames::Binary; - let response = series_or_groups_to_read_response(series_or_groups, tag_key_binary_format); + // ReadGroupRequest does not have a field to control the format of + // _measurement and _field tag keys, so always request in string format. + let response = series_or_groups_to_read_response(series_or_groups, false); Ok(vec![response]) } @@ -2858,7 +2841,6 @@ mod tests { }], // old skool window definition window: None, - tag_key_meta_names: TagKeyMetaNames::Text as i32, }; let frames = fixture @@ -2923,7 +2905,6 @@ mod tests { negative: false, }), }), - tag_key_meta_names: TagKeyMetaNames::Text as i32, }; let frames = fixture @@ -2972,7 +2953,6 @@ mod tests { }], // old skool window definition window: None, - tag_key_meta_names: TagKeyMetaNames::Text as i32, }; let response_string = fixture @@ -3195,7 +3175,6 @@ mod tests { negative: false, }), }), - tag_key_meta_names: TagKeyMetaNames::Text as i32, }; let streaming_resp = service .read_window_aggregate(tonic::Request::new(request)) diff --git a/test_helpers_end_to_end/src/grpc.rs b/test_helpers_end_to_end/src/grpc.rs index 6a36afc429..fe675217b2 100644 --- a/test_helpers_end_to_end/src/grpc.rs +++ b/test_helpers_end_to_end/src/grpc.rs @@ -5,7 +5,7 @@ use generated_types::{ read_group_request::Group, Aggregate, MeasurementFieldsRequest, MeasurementNamesRequest, MeasurementTagKeysRequest, MeasurementTagValuesRequest, Node, Predicate, ReadFilterRequest, ReadGroupRequest, ReadSource, - ReadWindowAggregateRequest, TagKeyMetaNames, TagKeysRequest, TagValuesRequest, TimestampRange, + ReadWindowAggregateRequest, TagKeysRequest, TagValuesRequest, TimestampRange, }; use prost::Message; @@ -346,7 +346,6 @@ impl GrpcRequestBuilder { offset: self.offset.expect("no offset specified"), aggregate, window: None, - tag_key_meta_names: TagKeyMetaNames::Text as i32, }) } }