fix: Refactor storage to properly handle binary and text meta tags (#7012)

* fix: Refactor storage to properly handle binary and text meta tags

* fix: placate linter
pull/24376/head
Stuart Carnie 2023-02-20 08:12:41 +11:00 committed by GitHub
parent d82d00b847
commit b1b7865d35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 208 additions and 104 deletions

View File

@ -1,6 +1,7 @@
pub(crate) mod request;
pub(crate) mod response;
use crate::commands::storage::response::{BinaryTagSchema, TextTagSchema};
use generated_types::{
aggregate::AggregateType, influxdata::platform::storage::read_group_request::Group, Predicate,
};
@ -197,7 +198,7 @@ struct ReadGroup {
)]
group: Group,
#[clap(long, action)]
#[clap(long, action, value_delimiter = ',')]
group_keys: Vec<String>,
}
@ -303,7 +304,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
info!(?request, "read_filter");
let result = client.read_filter(request).await.context(ServerSnafu)?;
match config.format {
Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?,
Format::Pretty => response::pretty_print_frames::<BinaryTagSchema>(&result)
.context(ResponseSnafu)?,
Format::Quiet => {}
}
}
@ -320,7 +322,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
info!(?request, "read_group");
let result = client.read_group(request).await.context(ServerSnafu)?;
match config.format {
Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?,
Format::Pretty => response::pretty_print_frames::<TextTagSchema>(&result)
.context(ResponseSnafu)?,
Format::Quiet => {}
}
}
@ -343,7 +346,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
.context(ServerSnafu)?;
match config.format {
Format::Pretty => response::pretty_print_frames(&result).context(ResponseSnafu)?,
Format::Pretty => response::pretty_print_frames::<TextTagSchema>(&result)
.context(ResponseSnafu)?,
Format::Quiet => {}
}
}

View File

@ -6,7 +6,7 @@ use snafu::Snafu;
use self::generated_types::*;
use super::response::{
tag_key_is_field, tag_key_is_measurement, FIELD_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_BIN,
FIELD_TAG_KEY_BIN, FIELD_TAG_KEY_TEXT, MEASUREMENT_TAG_KEY_BIN, MEASUREMENT_TAG_KEY_TEXT,
};
use ::generated_types::{aggregate::AggregateType, google::protobuf::*};
@ -59,7 +59,7 @@ pub fn read_filter(
read_source: Some(org_bucket),
range: Some(TimestampRange { start, end: stop }),
key_sort: read_filter_request::KeySort::Unspecified as i32, // IOx doesn't support any other sort
tag_key_meta_names: TagKeyMetaNames::Text as i32,
tag_key_meta_names: TagKeyMetaNames::Binary as i32,
}
}
@ -146,6 +146,14 @@ pub fn tag_values(
}
}
pub(crate) fn tag_key_is_measurement(key: &[u8]) -> bool {
(key == MEASUREMENT_TAG_KEY_TEXT) || (key == MEASUREMENT_TAG_KEY_BIN)
}
pub(crate) fn tag_key_is_field(key: &[u8]) -> bool {
(key == FIELD_TAG_KEY_TEXT) || (key == FIELD_TAG_KEY_BIN)
}
#[cfg(test)]
mod test_super {
use std::num::NonZeroU64;

View File

@ -39,8 +39,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
// Prints the provided data frames in a tabular format grouped into tables per
// distinct measurement.
pub fn pretty_print_frames(frames: &[Data]) -> Result<()> {
let rbs = frames_to_record_batches(frames)?;
pub fn pretty_print_frames<T: TagSchema>(frames: &[Data]) -> Result<()> {
let rbs = frames_to_record_batches::<T>(frames)?;
for (k, rb) in rbs {
println!("\n_measurement: {k}");
println!("rows: {:?}\n", &rb.num_rows());
@ -72,18 +72,16 @@ pub fn pretty_print_strings(values: Vec<String>) -> Result<()> {
// This function takes a set of InfluxRPC data frames and converts them into an
// Arrow record batches, which are suitable for pretty printing.
fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBatch>> {
fn frames_to_record_batches<T: TagSchema>(
frames: &[Data],
) -> Result<BTreeMap<String, RecordBatch>> {
// Run through all the frames once to build the schema of each table we need
// to build as a record batch.
let mut table_column_mapping = determine_tag_columns(frames);
let mut table_column_mapping = determine_tag_columns::<T>(frames);
let mut all_tables = BTreeMap::new();
let mut current_table_frame: Option<(IntermediateTable, SeriesFrame)> = None;
if frames.is_empty() {
return Ok(all_tables);
}
for frame in frames {
match frame {
generated_types::read_response::frame::Data::Group(_) => {
@ -93,7 +91,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
.fail();
}
generated_types::read_response::frame::Data::Series(sf) => {
let cur_frame_measurement = &sf.tags[0].value;
let cur_frame_measurement = T::measurement(sf);
// First series frame in result set.
if current_table_frame.is_none() {
@ -113,10 +111,10 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
// Series frame has moved on to a different measurement. Push
// this table into a record batch and onto final results, then
// create a new table.
if measurement(&prev_series_frame) != cur_frame_measurement {
if T::measurement(&prev_series_frame) != cur_frame_measurement {
let rb: RecordBatch = current_table.try_into()?;
all_tables.insert(
String::from_utf8(measurement(&prev_series_frame).to_owned())
String::from_utf8(T::measurement(&prev_series_frame).to_owned())
.context(InvalidMeasurementNameSnafu)?,
rb,
);
@ -142,7 +140,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::FloatPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f.values.iter().copied().map(Some).collect::<Vec<_>>();
column.extend_f64(&values);
@ -153,7 +151,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::IntegerPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f.values.iter().copied().map(Some).collect::<Vec<_>>();
column.extend_i64(&values);
@ -164,7 +162,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::UnsignedPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f.values.iter().copied().map(Some).collect::<Vec<_>>();
column.extend_u64(&values);
@ -175,7 +173,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::BooleanPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f.values.iter().copied().map(Some).collect::<Vec<_>>();
column.extend_bool(&values);
@ -186,7 +184,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
generated_types::read_response::frame::Data::StringPoints(f) => {
// Get field key associated with previous series frame.
let (current_table, prev_series_frame) = current_table_frame.as_mut().unwrap();
let column = current_table.field_column(field_name(prev_series_frame));
let column = current_table.field_column(T::field_name(prev_series_frame));
let values = f
.values
@ -209,11 +207,7 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
// Pad all tag columns with keys present in the previous series frame
// with identical values.
for Tag { key, value } in &prev_series_frame.tags {
if tag_key_is_measurement(key) || tag_key_is_field(key) {
continue;
}
for Tag { ref key, value } in T::tags(prev_series_frame) {
let idx = current_table
.tag_columns
.get(key)
@ -250,13 +244,14 @@ fn frames_to_record_batches(frames: &[Data]) -> Result<BTreeMap<String, RecordBa
}
// Convert and insert current table
let (current_table, prev_series_frame) = current_table_frame.take().unwrap();
let rb: RecordBatch = current_table.try_into()?;
all_tables.insert(
String::from_utf8(measurement(&prev_series_frame).to_owned())
.context(InvalidMeasurementNameSnafu)?,
rb,
);
if let Some((current_table, prev_series_frame)) = current_table_frame.take() {
let rb: RecordBatch = current_table.try_into()?;
all_tables.insert(
String::from_utf8(T::measurement(&prev_series_frame).to_owned())
.context(InvalidMeasurementNameSnafu)?,
rb,
);
}
Ok(all_tables)
}
@ -479,44 +474,31 @@ impl TryFrom<IntermediateTable> for RecordBatch {
// These constants describe known values for the keys associated with
// measurements and fields.
const MEASUREMENT_TAG_KEY_TEXT: [u8; 12] = [
b'_', b'm', b'e', b'a', b's', b'u', b'r', b'e', b'm', b'e', b'n', b't',
];
pub(crate) const MEASUREMENT_TAG_KEY_TEXT: [u8; 12] = *b"_measurement";
pub(crate) const MEASUREMENT_TAG_KEY_BIN: [u8; 1] = [0_u8];
const FIELD_TAG_KEY_TEXT: [u8; 6] = [b'_', b'f', b'i', b'e', b'l', b'd'];
pub(crate) const FIELD_TAG_KEY_TEXT: [u8; 6] = *b"_field";
pub(crate) const FIELD_TAG_KEY_BIN: [u8; 1] = [255_u8];
// Store a collection of column names and types for a single table (measurement).
#[derive(Debug, Default, PartialEq, Eq)]
struct TableColumns {
pub struct TableColumns {
tag_columns: BTreeSet<Vec<u8>>,
field_columns: BTreeMap<Vec<u8>, DataType>,
}
// Given a set of data frames determine from the series frames within the set
// of tag columns for each distinct table (measurement).
fn determine_tag_columns(frames: &[Data]) -> BTreeMap<Vec<u8>, TableColumns> {
fn determine_tag_columns<T: TagSchema>(frames: &[Data]) -> BTreeMap<Vec<u8>, TableColumns> {
let mut schema: BTreeMap<Vec<u8>, TableColumns> = BTreeMap::new();
for frame in frames {
if let Data::Series(sf) = frame {
if let Data::Series(ref sf) = frame {
assert!(!sf.tags.is_empty(), "expected _measurement and _field tags");
// PERF: avoid clone of value
let measurement_name = sf
.tags
.iter()
.find(|t| tag_key_is_measurement(&t.key))
.expect("measurement name not found")
.value
.clone();
let measurement_name = T::measurement(sf).clone();
let table = schema.entry(measurement_name).or_default();
for Tag { key, value } in sf.tags.iter().skip(1) {
if tag_key_is_field(key) {
table.field_columns.insert(value.clone(), sf.data_type());
continue;
}
let field_name = T::field_name(sf).clone();
table.field_columns.insert(field_name, sf.data_type());
for Tag { key, .. } in T::tags(sf) {
// PERF: avoid clone of key
table.tag_columns.insert(key.clone()); // Add column to table schema
}
@ -525,25 +507,67 @@ fn determine_tag_columns(frames: &[Data]) -> BTreeMap<Vec<u8>, TableColumns> {
schema
}
// Extract a reference to the measurement name from a Series frame.
fn measurement(frame: &SeriesFrame) -> &Vec<u8> {
assert!(tag_key_is_measurement(&frame.tags[0].key));
&frame.tags[0].value
pub trait TagSchema {
type IntoIter<'a>: Iterator<Item = &'a Tag>;
/// Returns the value of the measurement meta tag.
fn measurement(frame: &SeriesFrame) -> &Vec<u8>;
/// Returns the value of the field meta tag.
fn field_name(frame: &SeriesFrame) -> &Vec<u8>;
/// Returns the tags without the measurement or field meta tags.
fn tags(frame: &SeriesFrame) -> Self::IntoIter<'_>;
}
// Extract a reference to the field name from a Series frame.
fn field_name(frame: &SeriesFrame) -> &Vec<u8> {
let idx = frame.tags.len() - 1;
assert!(tag_key_is_field(&frame.tags[idx].key));
&frame.tags[idx].value
pub struct BinaryTagSchema;
impl TagSchema for BinaryTagSchema {
type IntoIter<'a> = std::slice::Iter<'a, Tag>;
fn measurement(frame: &SeriesFrame) -> &Vec<u8> {
assert_eq!(frame.tags[0].key, MEASUREMENT_TAG_KEY_BIN);
&frame.tags[0].value
}
fn field_name(frame: &SeriesFrame) -> &Vec<u8> {
let idx = frame.tags.len() - 1;
assert_eq!(frame.tags[idx].key, FIELD_TAG_KEY_BIN);
&frame.tags[idx].value
}
fn tags(frame: &SeriesFrame) -> Self::IntoIter<'_> {
frame.tags[1..frame.tags.len() - 1].iter()
}
}
pub(crate) fn tag_key_is_measurement(key: &[u8]) -> bool {
(key == MEASUREMENT_TAG_KEY_TEXT) || (key == MEASUREMENT_TAG_KEY_BIN)
}
pub struct TextTagSchema;
pub(crate) fn tag_key_is_field(key: &[u8]) -> bool {
(key == FIELD_TAG_KEY_TEXT) || (key == FIELD_TAG_KEY_BIN)
impl TagSchema for TextTagSchema {
type IntoIter<'a> = iter::Filter<std::slice::Iter<'a, Tag>, fn(&&Tag) -> bool>;
fn measurement(frame: &SeriesFrame) -> &Vec<u8> {
let idx = frame
.tags
.binary_search_by(|t| t.key[..].cmp(&MEASUREMENT_TAG_KEY_TEXT[..]))
.expect("missing measurement");
&frame.tags[idx].value
}
fn field_name(frame: &SeriesFrame) -> &Vec<u8> {
let idx = frame
.tags
.binary_search_by(|t| t.key[..].cmp(&FIELD_TAG_KEY_TEXT[..]))
.expect("missing field");
&frame.tags[idx].value
}
fn tags(frame: &SeriesFrame) -> Self::IntoIter<'_> {
frame
.tags
.iter()
.filter(|t| t.key != MEASUREMENT_TAG_KEY_TEXT && t.key != FIELD_TAG_KEY_TEXT)
}
}
#[cfg(test)]
@ -553,15 +577,17 @@ mod test_super {
BooleanPointsFrame, FloatPointsFrame, IntegerPointsFrame, SeriesFrame, StringPointsFrame,
UnsignedPointsFrame,
};
use itertools::Itertools;
use super::*;
// converts a vector of key/value pairs into a vector of `Tag`.
fn make_tags(pairs: &[(&str, &str)]) -> Vec<Tag> {
/// Converts a vector of `(key, value)` tuples into a vector of `Tag`, sorted by key.
fn make_tags(pairs: &[(&[u8], &str)]) -> Vec<Tag> {
pairs
.iter()
.sorted_by(|(a_key, _), (b_key, _)| Ord::cmp(a_key, b_key))
.map(|(key, value)| Tag {
key: key.as_bytes().to_vec(),
key: key.to_vec(),
value: value.as_bytes().to_vec(),
})
.collect::<Vec<_>>()
@ -618,15 +644,32 @@ mod test_super {
all_table_columns
}
trait KeyNames {
const MEASUREMENT_KEY: &'static [u8];
const FIELD_KEY: &'static [u8];
}
struct BinaryKeyNames;
impl KeyNames for BinaryKeyNames {
const MEASUREMENT_KEY: &'static [u8] = &[0_u8];
const FIELD_KEY: &'static [u8] = &[255_u8];
}
struct TextKeyNames;
impl KeyNames for TextKeyNames {
const MEASUREMENT_KEY: &'static [u8] = b"_measurement";
const FIELD_KEY: &'static [u8] = b"_field";
}
// generate a substantial set of frames across multiple tables.
fn gen_frames() -> Vec<Data> {
fn gen_frames<K: KeyNames>() -> Vec<Data> {
vec![
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "cpu"),
("host", "foo"),
("server", "a"),
("_field", "temp"),
(K::MEASUREMENT_KEY, "cpu"),
(b"host", "foo"),
(b"server", "a"),
(K::FIELD_KEY, "temp"),
]),
data_type: DataType::Float as i32,
}),
@ -640,10 +683,10 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "cpu"),
("host", "foo"),
("server", "a"),
("_field", "voltage"),
(K::MEASUREMENT_KEY, "cpu"),
(b"host", "foo"),
(b"server", "a"),
(K::FIELD_KEY, "voltage"),
]),
data_type: DataType::Integer as i32,
}),
@ -653,10 +696,10 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "cpu"),
("host", "foo"),
("new_column", "a"),
("_field", "voltage"),
(K::MEASUREMENT_KEY, "cpu"),
(b"host", "foo"),
(b"new_column", "a"),
(K::FIELD_KEY, "voltage"),
]),
data_type: DataType::Integer as i32,
}),
@ -665,7 +708,10 @@ mod test_super {
values: vec![1000, 2000],
}),
Data::Series(SeriesFrame {
tags: make_tags(&[("_measurement", "another table"), ("_field", "voltage")]),
tags: make_tags(&[
(K::MEASUREMENT_KEY, "another table"),
(K::FIELD_KEY, "voltage"),
]),
data_type: DataType::String as i32,
}),
Data::StringPoints(StringPointsFrame {
@ -674,9 +720,9 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "another table"),
("region", "west"),
("_field", "voltage"),
(K::MEASUREMENT_KEY, "another table"),
(b"region", "west"),
(K::FIELD_KEY, "voltage"),
]),
data_type: DataType::String as i32,
}),
@ -686,9 +732,9 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "another table"),
("region", "north"),
("_field", "bool_field"),
(K::MEASUREMENT_KEY, "another table"),
(b"region", "north"),
(K::FIELD_KEY, "bool_field"),
]),
data_type: DataType::Boolean as i32,
}),
@ -698,9 +744,9 @@ mod test_super {
}),
Data::Series(SeriesFrame {
tags: make_tags(&[
("_measurement", "another table"),
("region", "south"),
("_field", "unsigned_field"),
(K::MEASUREMENT_KEY, "another table"),
(b"region", "south"),
(K::FIELD_KEY, "unsigned_field"),
]),
data_type: DataType::Unsigned as i32,
}),
@ -712,11 +758,15 @@ mod test_super {
}
#[test]
fn test_determine_tag_columns() {
assert!(determine_tag_columns(&[]).is_empty());
fn test_binary_determine_tag_columns() {
assert!(determine_tag_columns::<BinaryTagSchema>(&[]).is_empty());
let frame = Data::Series(SeriesFrame {
tags: make_tags(&[("_measurement", "cpu"), ("server", "a"), ("_field", "temp")]),
tags: make_tags(&[
(BinaryKeyNames::MEASUREMENT_KEY, "cpu"),
(b"server", "a"),
(BinaryKeyNames::FIELD_KEY, "temp"),
]),
data_type: DataType::Float as i32,
});
@ -725,10 +775,10 @@ mod test_super {
&["server"],
&[("temp", DataType::Float)],
)]);
assert_eq!(determine_tag_columns(&[frame]), exp);
assert_eq!(determine_tag_columns::<BinaryTagSchema>(&[frame]), exp);
// larger example
let frames = gen_frames();
let frames = gen_frames::<BinaryKeyNames>();
let exp = make_table_columns(&[
TableColumnInput::new(
@ -746,14 +796,56 @@ mod test_super {
],
),
]);
assert_eq!(determine_tag_columns(&frames), exp);
assert_eq!(determine_tag_columns::<BinaryTagSchema>(&frames), exp);
}
#[test]
fn test_text_determine_tag_columns() {
assert!(determine_tag_columns::<TextTagSchema>(&[]).is_empty());
let frame = Data::Series(SeriesFrame {
tags: make_tags(&[
(b"_measurement", "cpu"),
(b"server", "a"),
(b"_field", "temp"),
]),
data_type: DataType::Float as i32,
});
let exp = make_table_columns(&[TableColumnInput::new(
"cpu",
&["server"],
&[("temp", DataType::Float)],
)]);
assert_eq!(determine_tag_columns::<TextTagSchema>(&[frame]), exp);
// larger example
let frames = gen_frames::<TextKeyNames>();
let exp = make_table_columns(&[
TableColumnInput::new(
"cpu",
&["host", "new_column", "server"],
&[("temp", DataType::Float), ("voltage", DataType::Integer)],
),
TableColumnInput::new(
"another table",
&["region"],
&[
("bool_field", DataType::Boolean),
("unsigned_field", DataType::Unsigned),
("voltage", DataType::String),
],
),
]);
assert_eq!(determine_tag_columns::<TextTagSchema>(&frames), exp);
}
#[test]
fn test_frames_to_into_record_batches() {
let frames = gen_frames();
let frames = gen_frames::<TextKeyNames>();
let rbs = frames_to_record_batches(&frames);
let rbs = frames_to_record_batches::<TextTagSchema>(&frames);
let exp = vec![
(
"another table",