diff --git a/generated_types/storage_common.proto b/generated_types/storage_common.proto
index ef31615a39..0a1c16e83d 100644
--- a/generated_types/storage_common.proto
+++ b/generated_types/storage_common.proto
@@ -175,7 +175,10 @@ message TagValuesRequest {
   google.protobuf.Any tags_source = 1 ;
   TimestampRange range = 2; // [(gogoproto.nullable) = false];
   Predicate predicate = 3;
-  string tag_key = 4;
+
+  // string tag_key = 4;
+  // AAL changed from string --> bytes to handle \xff literals in Rust which are not valid UTF-8
+  bytes tag_key = 4;
 }
 
 // Response message for Storage.TagKeys, Storage.TagValues Storage.MeasurementNames,
diff --git a/src/server/rpc.rs b/src/server/rpc.rs
index 61a9375945..9e0d94d9ad 100644
--- a/src/server/rpc.rs
+++ b/src/server/rpc.rs
@@ -1,5 +1,13 @@
 //! This module contains gRPC service implementatations
 
+/// `[0x00]` is the magic value that the storage gRPC layer uses to
+/// encode a tag_key that means "measurement name"
+pub(crate) const TAG_KEY_MEASUREMENT: &[u8] = &[0];
+
+/// `[0xff]` is the magic value that the storage gRPC layer uses
+/// to encode a tag_key that means "field name"
+pub(crate) const TAG_KEY_FIELD: &[u8] = &[255];
+
 pub mod data;
 pub mod expr;
 pub mod input;
diff --git a/src/server/rpc/data.rs b/src/server/rpc/data.rs
index 9e1735636c..e42270c8cf 100644
--- a/src/server/rpc/data.rs
+++ b/src/server/rpc/data.rs
@@ -22,6 +22,7 @@ use generated_types::{
     MeasurementFieldsResponse, ReadResponse, Tag,
 };
 
+use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
 use snafu::Snafu;
 
 #[derive(Debug, Snafu)]
@@ -35,16 +36,20 @@ pub enum Error {
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
-/// Convert a set of tag_keys into a form suitable for gRPC transport
+/// Convert a set of tag_keys into a form suitable for gRPC transport,
+/// adding the special 0x00 (_m) and 0xff (_f) tag keys
 ///
 /// Namely, a Vec<Vec<u8>>, including the measurement and field names
 pub fn tag_keys_to_byte_vecs(tag_keys: Arc<BTreeSet<String>>) -> Vec<Vec<u8>> {
-    // special case measurement and field
-    let specials_iter = vec![b"_field".to_vec(), b"_measurement".to_vec()].into_iter();
-
-    let tag_keys_iter = tag_keys.iter().map(|name| name.bytes().collect());
-
-    specials_iter.chain(tag_keys_iter).collect()
+    // special case measurement (0x00) and field (0xff)
+    // ensuring they are in the correct sort order (first and last, respectively)
+    let mut byte_vecs = Vec::with_capacity(2 + tag_keys.len());
+    byte_vecs.push(TAG_KEY_MEASUREMENT.to_vec()); // Shown as _m == _measurement
+    tag_keys.iter().for_each(|name| {
+        byte_vecs.push(name.bytes().collect());
+    });
+    byte_vecs.push(TAG_KEY_FIELD.to_vec()); // Shown as _f == _field
+    byte_vecs
 }
 
 fn series_set_to_frames(series_set: SeriesSet) -> Result<Vec<Frame>> {
@@ -305,6 +310,33 @@ mod tests {
 
     use super::*;
 
+    #[test]
+    fn test_tag_keys_to_byte_vecs() {
+        fn convert_keys(tag_keys: &[&str]) -> Vec<Vec<u8>> {
+            let tag_keys = tag_keys
+                .iter()
+                .map(|s| s.to_string())
+                .collect::<BTreeSet<String>>();
+
+            tag_keys_to_byte_vecs(Arc::new(tag_keys))
+        }
+
+        assert_eq!(convert_keys(&[]), vec![[0].to_vec(), [255].to_vec()]);
+        assert_eq!(
+            convert_keys(&["key_a"]),
+            vec![[0].to_vec(), b"key_a".to_vec(), [255].to_vec()]
+        );
+        assert_eq!(
+            convert_keys(&["key_a", "key_b"]),
+            vec![
+                [0].to_vec(),
+                b"key_a".to_vec(),
+                b"key_b".to_vec(),
+                [255].to_vec()
+            ]
+        );
+    }
+
     fn series_set_to_read_response(series_set: SeriesSet) -> Result<ReadResponse> {
         let frames = series_set_to_frames(series_set)?;
         Ok(ReadResponse { frames })
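The new encoding relies on byte-wise ordering: `[0x00]` sorts before ordinary UTF-8 tag keys and `[0xff]` sorts after them, so pushing the sentinels first and last keeps the `tag_keys_to_byte_vecs` output in sorted order without a separate sort pass. A minimal, self-contained sketch of that property (plain Rust, not part of the patch):

```rust
fn main() {
    // The measurement sentinel, two ordinary tag keys, and the field sentinel,
    // in the order tag_keys_to_byte_vecs produces them.
    let keys: Vec<Vec<u8>> = vec![vec![0x00], b"host".to_vec(), b"region".to_vec(), vec![0xff]];

    // Byte-wise (lexicographic) sorting leaves the order unchanged: 0x00 is the
    // smallest possible leading byte, and 0xff is larger than any leading byte
    // that can occur in valid UTF-8, so the sentinels stay first and last.
    let mut sorted = keys.clone();
    sorted.sort();
    assert_eq!(keys, sorted);
}
```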
diff --git a/src/server/rpc/expr.rs b/src/server/rpc/expr.rs
index 9fc0e83f20..68a133556a 100644
--- a/src/server/rpc/expr.rs
+++ b/src/server/rpc/expr.rs
@@ -16,8 +16,9 @@ use generated_types::{
     Aggregate as RPCAggregate, Duration as RPCDuration, Node as RPCNode, Predicate as RPCPredicate,
     Window as RPCWindow,
 };
-use query::group_by::{Aggregate as QueryAggregate, GroupByAndAggregate, WindowDuration};
 
+use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
+use query::group_by::{Aggregate as QueryAggregate, GroupByAndAggregate, WindowDuration};
 use query::predicate::PredicateBuilder;
 use snafu::{ResultExt, Snafu};
 use tracing::warn;
@@ -350,27 +351,19 @@ pub trait SpecialTagKeys {
 
 impl SpecialTagKeys for Vec<u8> {
     fn is_measurement(&self) -> bool {
-        self.as_slice() == [0]
+        self.as_slice() == TAG_KEY_MEASUREMENT
     }
 
     /// Return true if this tag key actually refers to a field
    /// name (e.g. _field or _f)
     fn is_field(&self) -> bool {
-        self.as_slice() == [255]
+        self.as_slice() == TAG_KEY_FIELD
     }
 }
 
-impl SpecialTagKeys for String {
-    fn is_measurement(&self) -> bool {
-        self.as_bytes() == [0]
-    }
-
-    /// Return true if this tag key actually refers to a field
-    /// name (e.g. _field or _f)
-    fn is_field(&self) -> bool {
-        self.as_bytes() == [255]
-    }
-}
+// Note that is_field can *NEVER* return true for a `String` because 0xff
+// is not a valid UTF-8 byte, and thus cannot appear in a valid Rust
+// String.
 
 // converts a Node from the RPC layer into a datafusion logical expr
 fn convert_node_to_expr(node: RPCNode) -> Result<Expr> {
@@ -713,9 +706,9 @@ fn format_value<'a>(value: &'a RPCValue, f: &mut fmt::Formatter<'_>) -> fmt::Res
         RegexValue(r) => write!(f, "RegEx:{}", r),
         TagRefValue(bytes) => {
             let temp = String::from_utf8_lossy(bytes);
-            let sval = match *bytes.as_slice() {
-                [0] => "_m[0x00]",
-                [255] => "_f[0xff]",
+            let sval = match bytes.as_slice() {
+                TAG_KEY_MEASUREMENT => "_m[0x00]",
+                TAG_KEY_FIELD => "_f[0xff]",
                 _ => &temp,
             };
             write!(f, "TagRef:{}", sval)
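The comment that replaces the `SpecialTagKeys for String` impl can be checked directly: a byte sequence containing `0xff` can never be turned into a Rust `String`, while the `0x00` measurement sentinel can. A small standalone sketch using only the standard library (not code from this patch):

```rust
fn main() {
    // 0xff never appears in well-formed UTF-8, so a String can never equal [0xff]
    // and is_field() could never have returned true for a String.
    assert!(String::from_utf8(vec![0xff]).is_err());

    // The 0x00 measurement sentinel *is* valid UTF-8 (a NUL character), which is
    // why only the byte-oriented Vec<u8> impl of SpecialTagKeys is kept.
    assert_eq!(String::from_utf8(vec![0x00]).unwrap(), "\u{0}");
}
```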
diff --git a/src/server/rpc/storage.rs b/src/server/rpc/storage.rs
index e9ddff9a7b..77e45001a6 100644
--- a/src/server/rpc/storage.rs
+++ b/src/server/rpc/storage.rs
@@ -20,6 +20,7 @@ use data_types::error::ErrorLogger;
 // For some reason rust thinks these imports are unused, but then
 // complains of unresolved imports if they are not imported.
 use generated_types::{node, Node};
+use query::exec::fieldlist::FieldList;
 use query::group_by::GroupByAndAggregate;
 
 use crate::server::rpc::expr::{self, AddRPCNode, Loggable, SpecialTagKeys};
@@ -140,13 +141,16 @@ pub enum Error {
         source: crate::server::rpc::expr::Error,
     },
 
-    #[snafu(display("Computing series: {}", source))]
+    #[snafu(display("Error computing series: {}", source))]
     ComputingSeriesSet { source: SeriesSetError },
 
-    #[snafu(display("Computing groups series: {}", source))]
+    #[snafu(display("Error converting tag_key to UTF-8 in tag_values request, tag_key value '{}': {}", String::from_utf8_lossy(source.as_bytes()), source))]
+    ConvertingTagKeyInTagValues { source: std::string::FromUtf8Error },
+
+    #[snafu(display("Error computing groups series: {}", source))]
     ComputingGroupedSeriesSet { source: SeriesSetError },
 
-    #[snafu(display("Converting time series into gRPC response: {}", source))]
+    #[snafu(display("Error converting time series into gRPC response: {}", source))]
     ConvertingSeriesSet {
         source: crate::server::rpc::data::Error,
     },
@@ -200,6 +204,7 @@ impl Error {
             Self::ConvertingReadGroupType { .. } => Status::invalid_argument(self.to_string()),
             Self::ConvertingWindowAggregate { .. } => Status::invalid_argument(self.to_string()),
             Self::ComputingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
+            Self::ConvertingTagKeyInTagValues { .. } => Status::invalid_argument(self.to_string()),
             Self::ComputingGroupedSeriesSet { .. } => Status::invalid_argument(self.to_string()),
             Self::ConvertingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
             Self::ConvertingFieldList { .. } => Status::invalid_argument(self.to_string()),
@@ -462,7 +467,7 @@ where
         // Special case a request for 'tag_key=_measurement" means to list all measurements
         let response = if tag_key.is_measurement() {
             info!(
-                "tag_values with tag_key=[x00] for database {}, range: {:?}, predicate: {} --> returning measurement_names",
+                "tag_values with tag_key=[x00] (measurement name) for database {}, range: {:?}, predicate: {} --> returning measurement_names",
                 db_name, range,
                 predicate.loggable()
             );
@@ -473,7 +478,34 @@ where
 
             measurement_name_impl(self.db_store.clone(), self.executor.clone(), db_name, range)
                 .await
+        } else if tag_key.is_field() {
+            info!(
+                "tag_values with tag_key=[xff] (field name) for database {}, range: {:?}, predicate: {} --> returning fields",
+                db_name, range,
+                predicate.loggable()
+            );
+
+            let fieldlist = field_names_impl(
+                self.db_store.clone(),
+                self.executor.clone(),
+                db_name,
+                None,
+                range,
+                predicate,
+            )
+            .await?;
+
+            // Pick out the field names into a Vec<Vec<u8>> for return
+            let values = fieldlist
+                .fields
+                .into_iter()
+                .map(|f| f.name.bytes().collect())
+                .collect::<Vec<_>>();
+
+            Ok(StringValuesResponse { values })
         } else {
+            let tag_key = String::from_utf8(tag_key).context(ConvertingTagKeyInTagValues)?;
+
             info!(
                 "tag_values for database {}, range: {:?}, tag_key: {}, predicate: {}",
                 db_name,
@@ -713,9 +745,9 @@ where
             predicate.loggable()
         );
 
-        let measurement = measurement;
+        let measurement = Some(measurement);
 
-        let response = measurement_fields_impl(
+        let response = field_names_impl(
             self.db_store.clone(),
             self.executor.clone(),
             db_name,
@@ -724,7 +756,12 @@ where
             predicate,
         )
         .await
-        .map_err(|e| e.to_status());
+        .map(|fieldlist| {
+            fieldlist_to_measurement_fields_response(fieldlist)
+                .context(ConvertingFieldList)
+                .map_err(|e| e.to_status())
+        })
+        .map_err(|e| e.to_status())?;
 
         tx.send(response)
             .await
@@ -793,10 +830,10 @@ where
     })?;
 
     // Map the resulting collection of Strings into a Vec<Vec<u8>> for return
-    let values = table_names
+    let values: Vec<Vec<u8>> = table_names
         .iter()
         .map(|name| name.bytes().collect())
-        .collect::<Vec<_>>();
+        .collect();
 
     Ok(StringValuesResponse { values })
 }
@@ -848,6 +885,9 @@ where
 
     // Map the resulting collection of Strings into a Vec<Vec<u8>> for return
     let values = tag_keys_to_byte_vecs(tag_keys);
+    // Debugging help: uncomment this to see what is coming back
+    // info!("Returning tag keys");
+    // values.iter().for_each(|k| info!("  {}", String::from_utf8_lossy(k)));
 
     Ok(StringValuesResponse { values })
 }
@@ -901,10 +941,14 @@ where
     })?;
 
     // Map the resulting collection of Strings into a Vec<Vec<u8>> for return
-    let values = tag_values
+    let values: Vec<Vec<u8>> = tag_values
         .iter()
         .map(|name| name.bytes().collect())
-        .collect::<Vec<_>>();
+        .collect();
+
+    // Debugging help: uncomment to see raw values coming back
+    //info!("Returning tag values");
+    //values.iter().for_each(|k| info!("  {}", String::from_utf8_lossy(k)));
 
     Ok(StringValuesResponse { values })
 }
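For the non-sentinel path the handler now has to turn the raw `bytes` tag_key back into a `String`, and the new `ConvertingTagKeyInTagValues` variant reports failures. A minimal std-only sketch of what that error path sees for a non-UTF-8 key such as `[0, 255]`; the message text mirrors the snafu display attribute above, but this is not the server code itself:

```rust
fn main() {
    let tag_key: Vec<u8> = vec![0, 255]; // not a valid UTF-8 string

    match String::from_utf8(tag_key) {
        Ok(tag_key) => println!("looking up tag values for tag_key {}", tag_key),
        Err(source) => println!(
            "Error converting tag_key to UTF-8 in tag_values request, tag_key value '{}': {}",
            // FromUtf8Error keeps the original bytes, so they can still be shown lossily
            String::from_utf8_lossy(source.as_bytes()),
            source
        ),
    }
}
```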
@@ -1052,15 +1096,15 @@ where
     Ok(())
 }
 
-/// Return fields with optional measurement, timestamp and arbitratry predicates
-async fn measurement_fields_impl<T>(
+/// Return field names, restricted via optional measurement, timestamp and predicate
+async fn field_names_impl<T>(
     db_store: Arc<T>,
     executor: Arc<Executor>,
     db_name: String,
-    measurement: String,
+    measurement: Option<String>,
     range: Option<TimestampRange>,
     rpc_predicate: Option<Predicate>,
-) -> Result<MeasurementFieldsResponse>
+) -> Result<FieldList>
 where
     T: DatabaseStore,
 {
@@ -1068,7 +1112,7 @@ where
 
     let predicate = PredicateBuilder::default()
         .set_range(range)
-        .table_option(Some(measurement))
+        .table_option(measurement)
         .rpc_predicate(rpc_predicate)
         .context(ConvertingPredicate {
             rpc_predicate_string,
@@ -1097,8 +1141,7 @@ where
             source: Box::new(e),
         })?;
 
-    // And convert the result
-    fieldlist_to_measurement_fields_response(fieldlist).context(ConvertingFieldList)
+    Ok(fieldlist)
 }
 
 /// Instantiate a server listening on the specified address
@@ -1150,7 +1193,8 @@ mod tests {
         net::{IpAddr, Ipv4Addr, SocketAddr},
         time::Duration,
     };
-    use test_helpers::tracing::TracingCapture;
+    use test_helpers::{tag_key_bytes_to_strings, tracing::TracingCapture};
+    use tonic::Code;
 
     use futures::prelude::*;
@@ -1276,7 +1320,7 @@ mod tests {
         test_db.set_column_names(to_string_vec(&tag_keys)).await;
 
         let actual_tag_keys = fixture.storage_client.tag_keys(request).await?;
-        let mut expected_tag_keys = vec!["_field", "_measurement"];
+        let mut expected_tag_keys = vec!["_f(0xff)", "_m(0x00)"];
         expected_tag_keys.extend(tag_keys.iter());
 
         assert_eq!(
@@ -1368,7 +1412,7 @@ mod tests {
         let actual_tag_keys = fixture
             .storage_client
             .measurement_tag_keys(request)
             .await?;
-        let mut expected_tag_keys = vec!["_field", "_measurement"];
+        let mut expected_tag_keys = vec!["_f(0xff)", "_m(0x00)"];
         expected_tag_keys.extend(tag_keys.iter());
 
         assert_eq!(
@@ -1416,7 +1460,7 @@ mod tests {
     /// the right parameters are passed into the Database interface
     /// and that the returned values are sent back via gRPC.
     #[tokio::test]
-    async fn test_storage_rpc_tag_values() -> Result<(), tonic::Status> {
+    async fn test_storage_rpc_tag_values() {
         // Start a test gRPC server on a randomally allocated port
         let mut fixture = Fixture::new().await.expect("Connecting to test server");
 
@@ -1450,7 +1494,7 @@ mod tests {
         test_db.set_column_values(to_string_vec(&tag_values)).await;
 
-        let actual_tag_values = fixture.storage_client.tag_values(request).await?;
+        let actual_tag_values = fixture.storage_client.tag_values(request).await.unwrap();
         assert_eq!(
             actual_tag_values, tag_values,
             "unexpected tag values while getting tag values"
         );
@@ -1468,7 +1512,7 @@ mod tests {
             tags_source: source.clone(),
             range: make_timestamp_range(1000, 1500),
             predicate: None,
-            tag_key: "\x00".into(),
+            tag_key: [0].into(),
         };
 
         let lp_data = "h2o,state=CA temp=50.4 1000\n\
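The field-name branch of `tag_values` and the test that follows both reduce a field list to the raw byte values of a `StringValuesResponse`. A sketch of just that mapping, with stand-in `Field`/`FieldList` types (the real ones live in `query::exec::fieldlist` and carry more data):

```rust
// Stand-in types for illustration only; not the ones from query::exec::fieldlist.
struct Field {
    name: String,
}

struct FieldList {
    fields: Vec<Field>,
}

// Mirrors the new tag_values branch: each field name becomes one byte vector.
fn field_names_as_bytes(fieldlist: FieldList) -> Vec<Vec<u8>> {
    fieldlist
        .fields
        .into_iter()
        .map(|f| f.name.into_bytes())
        .collect()
}

fn main() {
    let fieldlist = FieldList {
        fields: vec![Field { name: "Field1".into() }],
    };
    assert_eq!(field_names_as_bytes(fieldlist), vec![b"Field1".to_vec()]);
}
```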
@@ -1479,12 +1523,40 @@ mod tests {
             .await;
 
         let tag_values = vec!["h2o"];
-        let actual_tag_values = fixture.storage_client.tag_values(request).await?;
+        let actual_tag_values = fixture.storage_client.tag_values(request).await.unwrap();
         assert_eq!(
             actual_tag_values, tag_values,
             "unexpected tag values while getting tag values for measurement names"
         );
 
+        // ---
+        // test tag_key = _field means listing all field names
+        // ---
+        let request = TagValuesRequest {
+            tags_source: source.clone(),
+            range: make_timestamp_range(1000, 1500),
+            predicate: None,
+            tag_key: [255].into(),
+        };
+
+        // Setup a single field name (Field1)
+        let fieldlist = FieldList {
+            fields: vec![Field {
+                name: "Field1".into(),
+                data_type: DataType::Utf8,
+                last_timestamp: 1000,
+            }],
+        };
+        let fieldlist_plan = FieldListPlan::Known(Ok(fieldlist));
+        test_db.set_field_colum_names_values(fieldlist_plan).await;
+
+        let expected_tag_values = vec!["Field1"];
+        let actual_tag_values = fixture.storage_client.tag_values(request).await.unwrap();
+        assert_eq!(
+            actual_tag_values, expected_tag_values,
+            "unexpected tag values while getting tag values for field names"
+        );
+
         // ---
         // test error
         // ---
@@ -1513,14 +1585,33 @@ mod tests {
         });
         assert_eq!(test_db.get_column_values_request().await, expected_request);
 
-        Ok(())
+        // ---
+        // test error with non utf8 value
+        // ---
+        let request = TagValuesRequest {
+            tags_source: source.clone(),
+            range: None,
+            predicate: None,
+            tag_key: [0, 255].into(), // this is not a valid UTF-8 string
+        };
+
+        let response = fixture.storage_client.tag_values(request).await;
+        assert!(response.is_err());
+        let response_string = format!("{:?}", response);
+        let expected_error = "Error converting tag_key to UTF-8 in tag_values request";
+        assert!(
+            response_string.contains(expected_error),
+            "'{}' did not contain expected content '{}'",
+            response_string,
+            expected_error
+        );
     }
 
     /// test the plumbing of the RPC layer for measurement_tag_values-- specifically that
     /// the right parameters are passed into the Database interface
     /// and that the returned values are sent back via gRPC.
     #[tokio::test]
-    async fn test_storage_rpc_measurement_tag_values() -> Result<(), tonic::Status> {
+    async fn test_storage_rpc_measurement_tag_values() {
         // Start a test gRPC server on a randomally allocated port
         let mut fixture = Fixture::new().await.expect("Connecting to test server");
 
@@ -1558,7 +1649,9 @@ mod tests {
         let actual_tag_values = fixture
             .storage_client
             .measurement_tag_values(request)
-            .await?;
+            .await
+            .unwrap();
+
         assert_eq!(
             actual_tag_values, tag_values,
             "unexpected tag values while getting tag values",
         );
@@ -1598,8 +1691,6 @@ mod tests {
             column_name: "the_tag_key".into(),
         });
         assert_eq!(test_db.get_column_values_request().await, expected_request);
-
-        Ok(())
     }
 
     #[tokio::test]
@@ -2387,7 +2478,7 @@ mod tests {
             .into_iter()
            .map(|r| r.values.into_iter())
             .flatten()
-            .map(|v| String::from_utf8(v).expect("string value response was not utf8"))
+            .map(tag_key_bytes_to_strings)
             .collect::<Vec<_>>();
 
         strings.sort();
diff --git a/test_helpers/src/lib.rs b/test_helpers/src/lib.rs
index fb421d4bfd..a6de59d60c 100644
--- a/test_helpers/src/lib.rs
+++ b/test_helpers/src/lib.rs
@@ -47,6 +47,18 @@ pub fn str_pair_vec_to_vec(str_vec: &[(&str, &str)]) -> Vec<(Arc<String>, Arc<S
 }
 
+pub fn tag_key_bytes_to_strings(bytes: Vec<u8>) -> String {
+    match bytes.as_slice() {
+        [0] => "_m(0x00)".into(),
+        // note this isn't valid UTF8 and thus would assert below
+        [255] => "_f(0xff)".into(),
+        _ => String::from_utf8(bytes).expect("string value response was not utf8"),
+    }
+}
+
 pub fn enable_logging() {
     std::env::set_var("RUST_LOG", "debug");
     env_logger::init();
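Expected behaviour of the new helper, written out as a usage sketch (assumes the in-repo `test_helpers` crate is available as a dev-dependency; these are the same mappings the end-to-end assertions below rely on):

```rust
use test_helpers::tag_key_bytes_to_strings;

fn main() {
    // Sentinels are rendered in a readable form instead of panicking on non-UTF-8 bytes
    assert_eq!(tag_key_bytes_to_strings(vec![0]), "_m(0x00)");
    assert_eq!(tag_key_bytes_to_strings(vec![255]), "_f(0xff)");

    // Ordinary tag keys and values pass through as UTF-8 strings
    assert_eq!(tag_key_bytes_to_strings(b"host".to_vec()), "host");
}
```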
diff --git a/tests/end-to-end.rs b/tests/end-to-end.rs
index d23791790b..562ca09b58 100644
--- a/tests/end-to-end.rs
+++ b/tests/end-to-end.rs
@@ -293,25 +293,28 @@ async fn read_and_write_data() -> Result<()> {
     let responses: Vec<_> = tag_keys_response.into_inner().try_collect().await?;
 
     let keys = &responses[0].values;
-    let keys: Vec<_> = keys.iter().map(|s| str::from_utf8(s).unwrap()).collect();
+    let keys: Vec<_> = keys
+        .iter()
+        .map(|v| tag_key_bytes_to_strings(v.clone()))
+        .collect();
 
-    assert_eq!(
-        keys,
-        vec!["_field", "_measurement", "host", "name", "region"]
-    );
+    assert_eq!(keys, vec!["_m(0x00)", "host", "name", "region", "_f(0xff)"]);
 
     let tag_values_request = tonic::Request::new(TagValuesRequest {
         tags_source: read_source.clone(),
         range: range.clone(),
         predicate: predicate.clone(),
-        tag_key: String::from("host"),
+        tag_key: b"host".to_vec(),
     });
 
     let tag_values_response = storage_client.tag_values(tag_values_request).await?;
 
     let responses: Vec<_> = tag_values_response.into_inner().try_collect().await?;
 
     let values = &responses[0].values;
-    let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect();
+    let values: Vec<_> = values
+        .iter()
+        .map(|v| tag_key_bytes_to_strings(v.clone()))
+        .collect();
 
     assert_eq!(values, vec!["server01"]);
 
@@ -397,9 +400,12 @@ async fn read_and_write_data() -> Result<()> {
         .await?;
 
     let values = &responses[0].values;
-    let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect();
+    let values: Vec<_> = values
+        .iter()
+        .map(|v| tag_key_bytes_to_strings(v.clone()))
+        .collect();
 
-    assert_eq!(values, vec!["_field", "_measurement", "host", "region"]);
+    assert_eq!(values, vec!["_m(0x00)", "host", "region", "_f(0xff)"]);
 
     let measurement_tag_values_request = tonic::Request::new(MeasurementTagValuesRequest {
         source: read_source.clone(),
@@ -418,7 +424,10 @@ async fn read_and_write_data() -> Result<()> {
         .await?;
 
     let values = &responses[0].values;
-    let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect();
+    let values: Vec<_> = values
+        .iter()
+        .map(|v| tag_key_bytes_to_strings(v.clone()))
+        .collect();
 
     assert_eq!(values, vec!["server01"]);