feat: add support for binary tag key format
parent
dde1db40d1
commit
3a7cb119c6
|
@ -6,9 +6,12 @@ use std::{collections::BTreeSet, fmt, sync::Arc};
|
|||
use arrow::datatypes::DataType as ArrowDataType;
|
||||
|
||||
use observability_deps::tracing::trace;
|
||||
use query::exec::{
|
||||
fieldlist::FieldList,
|
||||
seriesset::series::{self, Either},
|
||||
use query::{
|
||||
exec::{
|
||||
fieldlist::FieldList,
|
||||
seriesset::series::{self, Either},
|
||||
},
|
||||
frontend::influxrpc::{FIELD_COLUMN_NAME, MEASUREMENT_COLUMN_NAME},
|
||||
};
|
||||
|
||||
use generated_types::{
|
||||
|
@ -70,13 +73,20 @@ pub fn tag_keys_to_byte_vecs(tag_keys: Arc<BTreeSet<String>>) -> Vec<Vec<u8>> {
|
|||
/// ```
|
||||
///
|
||||
/// The specific type of (*Points) depends on the type of field column.
|
||||
pub fn series_or_groups_to_read_response(series_or_groups: Vec<Either>) -> ReadResponse {
|
||||
///
|
||||
/// If `tag_key_binary_format` is `true` then tag keys for measurements and
|
||||
/// fields are emitted in the canonical TSM format represented by `\x00` and
|
||||
/// `\xff` respectively.
|
||||
pub fn series_or_groups_to_read_response(
|
||||
series_or_groups: Vec<Either>,
|
||||
tag_key_binary_format: bool,
|
||||
) -> ReadResponse {
|
||||
let mut frames = vec![];
|
||||
|
||||
for series_or_group in series_or_groups {
|
||||
match series_or_group {
|
||||
Either::Series(series) => {
|
||||
series_to_frames(&mut frames, series);
|
||||
series_to_frames(&mut frames, series, tag_key_binary_format);
|
||||
}
|
||||
Either::Group(group) => {
|
||||
frames.push(group_to_frame(group));
|
||||
|
@ -89,7 +99,7 @@ pub fn series_or_groups_to_read_response(series_or_groups: Vec<Either>) -> ReadR
|
|||
}
|
||||
|
||||
/// Converts a `Series` into frames for GRPC transport
|
||||
fn series_to_frames(frames: &mut Vec<Frame>, series: series::Series) {
|
||||
fn series_to_frames(frames: &mut Vec<Frame>, series: series::Series, tag_key_binary_format: bool) {
|
||||
let series::Series { tags, data } = series;
|
||||
|
||||
let (data_type, data_frame) = match data {
|
||||
|
@ -116,7 +126,7 @@ fn series_to_frames(frames: &mut Vec<Frame>, series: series::Series) {
|
|||
};
|
||||
|
||||
let series_frame = Data::Series(SeriesFrame {
|
||||
tags: convert_tags(tags),
|
||||
tags: convert_tags(tags, tag_key_binary_format),
|
||||
data_type: data_type.into(),
|
||||
});
|
||||
|
||||
|
@ -147,10 +157,17 @@ fn group_to_frame(group: series::Group) -> Frame {
|
|||
}
|
||||
|
||||
/// Convert the tag=value pairs from Arc<str> to Vec<u8> for gRPC transport
|
||||
fn convert_tags(tags: Vec<series::Tag>) -> Vec<Tag> {
|
||||
fn convert_tags(tags: Vec<series::Tag>, tag_key_binary_format: bool) -> Vec<Tag> {
|
||||
tags.into_iter()
|
||||
.map(|series::Tag { key, value }| Tag {
|
||||
key: key.bytes().collect(),
|
||||
key: match tag_key_binary_format {
|
||||
true => match key.as_ref() {
|
||||
MEASUREMENT_COLUMN_NAME => vec![0_u8],
|
||||
FIELD_COLUMN_NAME => vec![255_u8],
|
||||
_ => key.bytes().collect(),
|
||||
},
|
||||
false => key.bytes().collect(),
|
||||
},
|
||||
value: value.bytes().collect(),
|
||||
})
|
||||
.collect()
|
||||
|
@ -363,10 +380,8 @@ mod tests {
|
|||
.expect("Correctly converted series set");
|
||||
let series: Vec<Either> = series.into_iter().map(|s| s.into()).collect();
|
||||
|
||||
let response = series_or_groups_to_read_response(series);
|
||||
|
||||
let response = series_or_groups_to_read_response(series.clone(), false);
|
||||
let dumped_frames = dump_frames(&response.frames);
|
||||
|
||||
let expected_frames = vec![
|
||||
"SeriesFrame, tags: _measurement=the_table,tag1=val1,_field=string_field, type: 4",
|
||||
"StringPointsFrame, timestamps: [2000, 3000], values: bar,baz",
|
||||
|
@ -385,6 +400,31 @@ mod tests {
|
|||
"Expected:\n{:#?}\nActual:\n{:#?}",
|
||||
expected_frames, dumped_frames
|
||||
);
|
||||
|
||||
//
|
||||
// Convert using binary tag key format.
|
||||
//
|
||||
|
||||
let response = series_or_groups_to_read_response(series, true);
|
||||
let dumped_frames = dump_frames(&response.frames);
|
||||
let expected_frames = vec![
|
||||
"SeriesFrame, tags: \x00=the_table,tag1=val1,<2C>=string_field, type: 4",
|
||||
"StringPointsFrame, timestamps: [2000, 3000], values: bar,baz",
|
||||
"SeriesFrame, tags: \x00=the_table,tag1=val1,<2C>=int_field, type: 1",
|
||||
"IntegerPointsFrame, timestamps: [2000, 3000], values: \"2,3\"",
|
||||
"SeriesFrame, tags: \x00=the_table,tag1=val1,<2C>=uint_field, type: 2",
|
||||
"UnsignedPointsFrame, timestamps: [2000, 3000], values: \"22,33\"",
|
||||
"SeriesFrame, tags: \x00=the_table,tag1=val1,<2C>=float_field, type: 0",
|
||||
"FloatPointsFrame, timestamps: [2000, 3000], values: \"20.1,30.1\"",
|
||||
"SeriesFrame, tags: \x00=the_table,tag1=val1,<2C>=boolean_field, type: 3",
|
||||
"BooleanPointsFrame, timestamps: [2000, 3000], values: false,true",
|
||||
];
|
||||
|
||||
assert_eq!(
|
||||
dumped_frames, expected_frames,
|
||||
"Expected:\n{:#?}\nActual:\n{:#?}",
|
||||
expected_frames, dumped_frames
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -399,7 +439,7 @@ mod tests {
|
|||
partition_key_vals: vec![Arc::from("val1"), Arc::from("val2")],
|
||||
};
|
||||
|
||||
let response = series_or_groups_to_read_response(vec![group.into()]);
|
||||
let response = series_or_groups_to_read_response(vec![group.into()], false);
|
||||
|
||||
let dumped_frames = dump_frames(&response.frames);
|
||||
|
||||
|
|
|
@ -15,8 +15,8 @@ use generated_types::{
|
|||
Int64ValuesResponse, MeasurementFieldsRequest, MeasurementFieldsResponse,
|
||||
MeasurementNamesRequest, MeasurementTagKeysRequest, MeasurementTagValuesRequest, Predicate,
|
||||
ReadFilterRequest, ReadGroupRequest, ReadResponse, ReadSeriesCardinalityRequest,
|
||||
ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest, TagValuesRequest,
|
||||
TimestampRange,
|
||||
ReadWindowAggregateRequest, StringValuesResponse, TagKeyMetaNames, TagKeysRequest,
|
||||
TagValuesRequest, TimestampRange,
|
||||
};
|
||||
use observability_deps::tracing::{error, info};
|
||||
use predicate::predicate::PredicateBuilder;
|
||||
|
@ -892,7 +892,8 @@ where
|
|||
.context(FilteringSeries { db_name })
|
||||
.log_if_error("Running series set plan")?;
|
||||
|
||||
let response = series_or_groups_to_read_response(series_or_groups);
|
||||
let emit_tag_keys_binary_format = req.tag_key_meta_names == TagKeyMetaNames::Binary as i32;
|
||||
let response = series_or_groups_to_read_response(series_or_groups, emit_tag_keys_binary_format);
|
||||
|
||||
Ok(vec![response])
|
||||
}
|
||||
|
@ -956,7 +957,9 @@ where
|
|||
})
|
||||
.log_if_error("Running Grouped SeriesSet Plan")?;
|
||||
|
||||
let response = series_or_groups_to_read_response(series_or_groups);
|
||||
// ReadGroupRequest does not have a field to control the format of
|
||||
// _measurement and _field tag keys, so always request in string format.
|
||||
let response = series_or_groups_to_read_response(series_or_groups, false);
|
||||
|
||||
Ok(vec![response])
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue