chore: Revert "Sync ReadWindowAggregate API: TagKeyMetaNames" (#5184)
We're noticing a possible regression (OOMs) in our testing cluster that roughly correlates with this.pull/24376/head
parent
e6d68f90b4
commit
07cdb99192
|
|
@ -294,9 +294,8 @@ message ReadWindowAggregateRequest {
|
|||
int64 Offset = 6;
|
||||
repeated Aggregate aggregate = 5;
|
||||
Window window = 7;
|
||||
// TagKeyMetaNames determines the key format used for the measurement and field
|
||||
// tags.
|
||||
TagKeyMetaNames tag_key_meta_names = 8;
|
||||
reserved 8;
|
||||
reserved "tag_key_meta_names";
|
||||
}
|
||||
|
||||
message TagValuesGroupedByMeasurementAndTagKeyRequest {
|
||||
|
|
|
|||
|
|
@ -85,7 +85,6 @@ pub fn read_window_aggregate(
|
|||
offset,
|
||||
aggregate,
|
||||
window,
|
||||
tag_key_meta_names: TagKeyMetaNames::Text as i32,
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -330,20 +330,12 @@ where
|
|||
let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys)
|
||||
.context(ConvertingReadGroupAggregateSnafu { aggregate_string })?;
|
||||
|
||||
let results = query_group_impl(
|
||||
Arc::clone(&db),
|
||||
db_name,
|
||||
range,
|
||||
predicate,
|
||||
gby_agg,
|
||||
TagKeyMetaNames::Text,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| e.to_status())?
|
||||
.into_iter()
|
||||
.map(Ok)
|
||||
.collect::<Vec<_>>();
|
||||
let results = query_group_impl(Arc::clone(&db), db_name, range, predicate, gby_agg, &ctx)
|
||||
.await
|
||||
.map_err(|e| e.to_status())?
|
||||
.into_iter()
|
||||
.map(Ok)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if results.iter().all(|r| r.is_ok()) {
|
||||
query_completed_token.set_success();
|
||||
|
|
@ -397,10 +389,9 @@ where
|
|||
offset,
|
||||
aggregate,
|
||||
window,
|
||||
tag_key_meta_names,
|
||||
} = req;
|
||||
|
||||
info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(), ?tag_key_meta_names, "read_window_aggregate");
|
||||
info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(), "read_window_aggregate");
|
||||
|
||||
let aggregate_string = format!(
|
||||
"aggregate: {:?}, window_every: {:?}, offset: {:?}, window: {:?}",
|
||||
|
|
@ -410,20 +401,12 @@ where
|
|||
let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window)
|
||||
.context(ConvertingWindowAggregateSnafu { aggregate_string })?;
|
||||
|
||||
let results = query_group_impl(
|
||||
Arc::clone(&db),
|
||||
db_name,
|
||||
range,
|
||||
predicate,
|
||||
gby_agg,
|
||||
TagKeyMetaNames::from_i32(tag_key_meta_names).unwrap_or_default(),
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| e.to_status())?
|
||||
.into_iter()
|
||||
.map(Ok)
|
||||
.collect::<Vec<_>>();
|
||||
let results = query_group_impl(Arc::clone(&db), db_name, range, predicate, gby_agg, &ctx)
|
||||
.await
|
||||
.map_err(|e| e.to_status())?
|
||||
.into_iter()
|
||||
.map(Ok)
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
if results.iter().all(|r| r.is_ok()) {
|
||||
query_completed_token.set_success();
|
||||
|
|
@ -1292,7 +1275,6 @@ async fn query_group_impl<D>(
|
|||
range: Option<TimestampRange>,
|
||||
rpc_predicate: Option<Predicate>,
|
||||
gby_agg: GroupByAndAggregate,
|
||||
tag_key_meta_names: TagKeyMetaNames,
|
||||
ctx: &IOxSessionContext,
|
||||
) -> Result<Vec<ReadResponse>, Error>
|
||||
where
|
||||
|
|
@ -1337,8 +1319,9 @@ where
|
|||
.context(GroupingSeriesSnafu { db_name })
|
||||
.log_if_error("Running Grouped SeriesSet Plan")?;
|
||||
|
||||
let tag_key_binary_format = tag_key_meta_names == TagKeyMetaNames::Binary;
|
||||
let response = series_or_groups_to_read_response(series_or_groups, tag_key_binary_format);
|
||||
// ReadGroupRequest does not have a field to control the format of
|
||||
// _measurement and _field tag keys, so always request in string format.
|
||||
let response = series_or_groups_to_read_response(series_or_groups, false);
|
||||
|
||||
Ok(vec![response])
|
||||
}
|
||||
|
|
@ -2858,7 +2841,6 @@ mod tests {
|
|||
}],
|
||||
// old skool window definition
|
||||
window: None,
|
||||
tag_key_meta_names: TagKeyMetaNames::Text as i32,
|
||||
};
|
||||
|
||||
let frames = fixture
|
||||
|
|
@ -2923,7 +2905,6 @@ mod tests {
|
|||
negative: false,
|
||||
}),
|
||||
}),
|
||||
tag_key_meta_names: TagKeyMetaNames::Text as i32,
|
||||
};
|
||||
|
||||
let frames = fixture
|
||||
|
|
@ -2972,7 +2953,6 @@ mod tests {
|
|||
}],
|
||||
// old skool window definition
|
||||
window: None,
|
||||
tag_key_meta_names: TagKeyMetaNames::Text as i32,
|
||||
};
|
||||
|
||||
let response_string = fixture
|
||||
|
|
@ -3195,7 +3175,6 @@ mod tests {
|
|||
negative: false,
|
||||
}),
|
||||
}),
|
||||
tag_key_meta_names: TagKeyMetaNames::Text as i32,
|
||||
};
|
||||
let streaming_resp = service
|
||||
.read_window_aggregate(tonic::Request::new(request))
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use generated_types::{
|
|||
read_group_request::Group,
|
||||
Aggregate, MeasurementFieldsRequest, MeasurementNamesRequest, MeasurementTagKeysRequest,
|
||||
MeasurementTagValuesRequest, Node, Predicate, ReadFilterRequest, ReadGroupRequest, ReadSource,
|
||||
ReadWindowAggregateRequest, TagKeyMetaNames, TagKeysRequest, TagValuesRequest, TimestampRange,
|
||||
ReadWindowAggregateRequest, TagKeysRequest, TagValuesRequest, TimestampRange,
|
||||
};
|
||||
use prost::Message;
|
||||
|
||||
|
|
@ -346,7 +346,6 @@ impl GrpcRequestBuilder {
|
|||
offset: self.offset.expect("no offset specified"),
|
||||
aggregate,
|
||||
window: None,
|
||||
tag_key_meta_names: TagKeyMetaNames::Text as i32,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue