fix: Sync ReadWindowAggregate API: TagKeyMetaNames

The storage API has been updated in https://github.com/influxdata/idpe/pull/12868
in January, but since we forked the `.proto` files we never noticed.
pull/24376/head
Marko Mikulicic 2022-07-21 14:26:50 +02:00
parent b35502ce61
commit 21d033eafd
4 changed files with 46 additions and 20 deletions

View File

@ -64,7 +64,7 @@ message ReadGroupRequest {
enum Group {
// option (gogoproto.goproto_enum_prefix) = false;
// GroupNone returns all series as a single group.
// The single GroupFrame.TagKeys will be the union of all tag keys.
GroupNone = 0;
@ -81,7 +81,7 @@ message ReadGroupRequest {
Aggregate aggregate = 6;
// Deprecated field only used in TSM storage-related tests.
reserved "Hints";
reserved "Hints";
}
message Aggregate {
@ -204,7 +204,7 @@ message TagKeysRequest {
// TagValuesRequest is the request message for Storage.TagValues.
message TagValuesRequest {
google.protobuf.Any TagsSource = 1;
google.protobuf.Any TagsSource = 1;
TimestampRange range = 2; // [(gogoproto.nullable) = false];
Predicate predicate = 3;
bytes tag_key = 4;
@ -294,6 +294,9 @@ message ReadWindowAggregateRequest {
int64 Offset = 6;
repeated Aggregate aggregate = 5;
Window window = 7;
// TagKeyMetaNames determines the key format used for the measurement and field
// tags.
TagKeyMetaNames tag_key_meta_names = 8;
}
message TagValuesGroupedByMeasurementAndTagKeyRequest {

View File

@ -85,6 +85,7 @@ pub fn read_window_aggregate(
offset,
aggregate,
window,
tag_key_meta_names: TagKeyMetaNames::Text as i32,
})
}

View File

@ -320,12 +320,20 @@ where
let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys)
.context(ConvertingReadGroupAggregateSnafu { aggregate_string })?;
let results = query_group_impl(Arc::clone(&db), db_name, range, predicate, gby_agg, &ctx)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
let results = query_group_impl(
Arc::clone(&db),
db_name,
range,
predicate,
gby_agg,
TagKeyMetaNames::Text,
&ctx,
)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
if results.iter().all(|r| r.is_ok()) {
query_completed_token.set_success();
@ -374,9 +382,10 @@ where
offset,
aggregate,
window,
tag_key_meta_names,
} = req;
info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(), "read_window_aggregate");
info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(), ?tag_key_meta_names, "read_window_aggregate");
let aggregate_string = format!(
"aggregate: {:?}, window_every: {:?}, offset: {:?}, window: {:?}",
@ -386,12 +395,20 @@ where
let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window)
.context(ConvertingWindowAggregateSnafu { aggregate_string })?;
let results = query_group_impl(Arc::clone(&db), db_name, range, predicate, gby_agg, &ctx)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
let results = query_group_impl(
Arc::clone(&db),
db_name,
range,
predicate,
gby_agg,
TagKeyMetaNames::from_i32(tag_key_meta_names).unwrap_or_default(),
&ctx,
)
.await
.map_err(|e| e.to_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
if results.iter().all(|r| r.is_ok()) {
query_completed_token.set_success();
@ -1225,6 +1242,7 @@ async fn query_group_impl<D>(
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
gby_agg: GroupByAndAggregate,
tag_key_meta_names: TagKeyMetaNames,
ctx: &IOxSessionContext,
) -> Result<Vec<ReadResponse>, Error>
where
@ -1269,9 +1287,8 @@ where
.context(GroupingSeriesSnafu { db_name })
.log_if_error("Running Grouped SeriesSet Plan")?;
// ReadGroupRequest does not have a field to control the format of
// _measurement and _field tag keys, so always request in string format.
let response = series_or_groups_to_read_response(series_or_groups, false);
let tag_key_binary_format = tag_key_meta_names == TagKeyMetaNames::Binary;
let response = series_or_groups_to_read_response(series_or_groups, tag_key_binary_format);
Ok(vec![response])
}
@ -2791,6 +2808,7 @@ mod tests {
}],
// old skool window definition
window: None,
tag_key_meta_names: TagKeyMetaNames::Text as i32,
};
let frames = fixture
@ -2855,6 +2873,7 @@ mod tests {
negative: false,
}),
}),
tag_key_meta_names: TagKeyMetaNames::Text as i32,
};
let frames = fixture
@ -2903,6 +2922,7 @@ mod tests {
}],
// old skool window definition
window: None,
tag_key_meta_names: TagKeyMetaNames::Text as i32,
};
let response_string = fixture
@ -3125,6 +3145,7 @@ mod tests {
negative: false,
}),
}),
tag_key_meta_names: TagKeyMetaNames::Text as i32,
};
let streaming_resp = service
.read_window_aggregate(tonic::Request::new(request))

View File

@ -5,7 +5,7 @@ use generated_types::{
read_group_request::Group,
Aggregate, MeasurementFieldsRequest, MeasurementNamesRequest, MeasurementTagKeysRequest,
MeasurementTagValuesRequest, Node, Predicate, ReadFilterRequest, ReadGroupRequest, ReadSource,
ReadWindowAggregateRequest, TagKeysRequest, TagValuesRequest, TimestampRange,
ReadWindowAggregateRequest, TagKeyMetaNames, TagKeysRequest, TagValuesRequest, TimestampRange,
};
use prost::Message;
@ -346,6 +346,7 @@ impl GrpcRequestBuilder {
offset: self.offset.expect("no offset specified"),
aggregate,
window: None,
tag_key_meta_names: TagKeyMetaNames::Text as i32,
})
}
}