Merge branch 'main' into er/refactor/time-predicate

pull/24376/head
Edd Robinson 2020-12-02 19:13:12 +00:00 committed by GitHub
commit 05c420cc9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 214 additions and 66 deletions

View File

@ -175,7 +175,10 @@ message TagValuesRequest {
google.protobuf.Any tags_source = 1 ;
TimestampRange range = 2; // [(gogoproto.nullable) = false];
Predicate predicate = 3;
string tag_key = 4;
// string tag_key = 4;
// AAL changed from string --> bytes to handle \xff literals in Rust which are not valid UTF-8
bytes tag_key = 4;
}
// Response message for Storage.TagKeys, Storage.TagValues, Storage.MeasurementNames,

View File

@ -1,5 +1,13 @@
//! This module contains gRPC service implementations
/// `[0x00]` is the magic value that the storage gRPC layer uses to
/// encode a tag_key that means "measurement name"
pub(crate) const TAG_KEY_MEASUREMENT: &[u8] = &[0];
/// `[0xff]` is the magic value that the storage gRPC layer uses
/// to encode a tag_key that means "field name"
pub(crate) const TAG_KEY_FIELD: &[u8] = &[255];
pub mod data;
pub mod expr;
pub mod input;

View File

@ -22,6 +22,7 @@ use generated_types::{
MeasurementFieldsResponse, ReadResponse, Tag,
};
use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
use snafu::Snafu;
#[derive(Debug, Snafu)]
@ -35,16 +36,20 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Convert a set of tag_keys into a form suitable for gRPC transport,
/// adding the special 0x00 (_m) and 0xff (_f) tag keys.
///
/// Namely, a Vec<Vec<u8>>, with the measurement sentinel (`[0x00]`)
/// first and the field sentinel (`[0xff]`) last, so the returned list
/// stays in byte sort order (0x00 sorts before any UTF-8 string and
/// 0xff after).
pub fn tag_keys_to_byte_vecs(tag_keys: Arc<BTreeSet<String>>) -> Vec<Vec<u8>> {
    // special case measurement (0x00) and field (0xff),
    // ensuring they are in the correct sort order (first and last, respectively)
    let mut byte_vecs = Vec::with_capacity(2 + tag_keys.len());
    byte_vecs.push(TAG_KEY_MEASUREMENT.to_vec()); // Shown as _m == _measurement
    // BTreeSet iteration is already sorted, so the middle stays ordered
    tag_keys.iter().for_each(|name| {
        byte_vecs.push(name.bytes().collect());
    });
    byte_vecs.push(TAG_KEY_FIELD.to_vec()); // Shown as _f == _field
    byte_vecs
}
fn series_set_to_frames(series_set: SeriesSet) -> Result<Vec<Frame>> {
@ -305,6 +310,33 @@ mod tests {
use super::*;
#[test]
fn test_tag_keys_to_byte_vecs() {
// Helper: build a sorted BTreeSet from string slices and run the conversion.
fn convert_keys(tag_keys: &[&str]) -> Vec<Vec<u8>> {
let tag_keys = tag_keys
.iter()
.map(|s| s.to_string())
.collect::<BTreeSet<_>>();
tag_keys_to_byte_vecs(Arc::new(tag_keys))
}
// Even with no tag keys, the measurement (0x00) and field (0xff)
// sentinels are always present, in that order.
assert_eq!(convert_keys(&[]), vec![[0].to_vec(), [255].to_vec()]);
// Real tag keys appear between the sentinels...
assert_eq!(
convert_keys(&["key_a"]),
vec![[0].to_vec(), b"key_a".to_vec(), [255].to_vec()]
);
// ...and keep their (BTreeSet) sorted order.
assert_eq!(
convert_keys(&["key_a", "key_b"]),
vec![
[0].to_vec(),
b"key_a".to_vec(),
b"key_b".to_vec(),
[255].to_vec()
]
);
}
fn series_set_to_read_response(series_set: SeriesSet) -> Result<ReadResponse> {
let frames = series_set_to_frames(series_set)?;
Ok(ReadResponse { frames })

View File

@ -16,8 +16,9 @@ use generated_types::{
Aggregate as RPCAggregate, Duration as RPCDuration, Node as RPCNode, Predicate as RPCPredicate,
Window as RPCWindow,
};
use query::group_by::{Aggregate as QueryAggregate, GroupByAndAggregate, WindowDuration};
use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
use query::group_by::{Aggregate as QueryAggregate, GroupByAndAggregate, WindowDuration};
use query::predicate::PredicateBuilder;
use snafu::{ResultExt, Snafu};
use tracing::warn;
@ -350,27 +351,19 @@ pub trait SpecialTagKeys {
impl SpecialTagKeys for Vec<u8> {
    /// Return true if this tag key is the magic measurement-name
    /// sentinel (`[0x00]`, e.g. _measurement or _m)
    fn is_measurement(&self) -> bool {
        self.as_slice() == TAG_KEY_MEASUREMENT
    }

    /// Return true if this tag key actually refers to a field
    /// name (e.g. _field or _f)
    fn is_field(&self) -> bool {
        self.as_slice() == TAG_KEY_FIELD
    }
}
impl SpecialTagKeys for String {
fn is_measurement(&self) -> bool {
self.as_bytes() == [0]
}
/// Return true if this tag key actually refers to a field
/// name (e.g. _field or _f)
fn is_field(&self) -> bool {
self.as_bytes() == [255]
}
}
// Note that is_field can *NEVER* return true for a `String` because 0xff
// is not a valid UTF-8 character, and thus can not be a valid Rust
// String.
// converts a Node from the RPC layer into a datafusion logical expr
fn convert_node_to_expr(node: RPCNode) -> Result<Expr> {
@ -713,9 +706,9 @@ fn format_value<'a>(value: &'a RPCValue, f: &mut fmt::Formatter<'_>) -> fmt::Res
RegexValue(r) => write!(f, "RegEx:{}", r),
TagRefValue(bytes) => {
let temp = String::from_utf8_lossy(bytes);
let sval = match *bytes.as_slice() {
[0] => "_m[0x00]",
[255] => "_f[0xff]",
let sval = match bytes.as_slice() {
TAG_KEY_MEASUREMENT => "_m[0x00]",
TAG_KEY_FIELD => "_f[0xff]",
_ => &temp,
};
write!(f, "TagRef:{}", sval)

View File

@ -20,6 +20,7 @@ use data_types::error::ErrorLogger;
// For some reason rust thinks these imports are unused, but then
// complains of unresolved imports if they are not imported.
use generated_types::{node, Node};
use query::exec::fieldlist::FieldList;
use query::group_by::GroupByAndAggregate;
use crate::server::rpc::expr::{self, AddRPCNode, Loggable, SpecialTagKeys};
@ -140,13 +141,16 @@ pub enum Error {
source: crate::server::rpc::expr::Error,
},
#[snafu(display("Computing series: {}", source))]
#[snafu(display("Error computing series: {}", source))]
ComputingSeriesSet { source: SeriesSetError },
#[snafu(display("Computing groups series: {}", source))]
#[snafu(display("Error converting tag_key to UTF-8 in tag_values request, tag_key value '{}': {}", String::from_utf8_lossy(source.as_bytes()), source))]
ConvertingTagKeyInTagValues { source: std::string::FromUtf8Error },
#[snafu(display("Error computing groups series: {}", source))]
ComputingGroupedSeriesSet { source: SeriesSetError },
#[snafu(display("Converting time series into gRPC response: {}", source))]
#[snafu(display("Error converting time series into gRPC response: {}", source))]
ConvertingSeriesSet {
source: crate::server::rpc::data::Error,
},
@ -200,6 +204,7 @@ impl Error {
Self::ConvertingReadGroupType { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingWindowAggregate { .. } => Status::invalid_argument(self.to_string()),
Self::ComputingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingTagKeyInTagValues { .. } => Status::invalid_argument(self.to_string()),
Self::ComputingGroupedSeriesSet { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingFieldList { .. } => Status::invalid_argument(self.to_string()),
@ -462,7 +467,7 @@ where
// Special case a request for 'tag_key=_measurement" means to list all measurements
let response = if tag_key.is_measurement() {
info!(
"tag_values with tag_key=[x00] for database {}, range: {:?}, predicate: {} --> returning measurement_names",
"tag_values with tag_key=[x00] (measurement name) for database {}, range: {:?}, predicate: {} --> returning measurement_names",
db_name, range,
predicate.loggable()
);
@ -473,7 +478,34 @@ where
measurement_name_impl(self.db_store.clone(), self.executor.clone(), db_name, range)
.await
} else if tag_key.is_field() {
info!(
"tag_values with tag_key=[xff] (field name) for database {}, range: {:?}, predicate: {} --> returning fields",
db_name, range,
predicate.loggable()
);
let fieldlist = field_names_impl(
self.db_store.clone(),
self.executor.clone(),
db_name,
None,
range,
predicate,
)
.await?;
// Pick out the field names into a Vec<Vec<u8>> for return
let values = fieldlist
.fields
.into_iter()
.map(|f| f.name.bytes().collect())
.collect::<Vec<_>>();
Ok(StringValuesResponse { values })
} else {
let tag_key = String::from_utf8(tag_key).context(ConvertingTagKeyInTagValues)?;
info!(
"tag_values for database {}, range: {:?}, tag_key: {}, predicate: {}",
db_name,
@ -713,9 +745,9 @@ where
predicate.loggable()
);
let measurement = measurement;
let measurement = Some(measurement);
let response = measurement_fields_impl(
let response = field_names_impl(
self.db_store.clone(),
self.executor.clone(),
db_name,
@ -724,7 +756,12 @@ where
predicate,
)
.await
.map_err(|e| e.to_status());
.map(|fieldlist| {
fieldlist_to_measurement_fields_response(fieldlist)
.context(ConvertingFieldList)
.map_err(|e| e.to_status())
})
.map_err(|e| e.to_status())?;
tx.send(response)
.await
@ -793,10 +830,10 @@ where
})?;
// Map the resulting collection of Strings into a Vec<Vec<u8>> for return
let values = table_names
let values: Vec<Vec<u8>> = table_names
.iter()
.map(|name| name.bytes().collect())
.collect::<Vec<_>>();
.collect();
Ok(StringValuesResponse { values })
}
@ -848,6 +885,9 @@ where
// Map the resulting collection of Strings into a Vec<Vec<u8>> for return
let values = tag_keys_to_byte_vecs(tag_keys);
// Debugging help: comment this out to see what is coming back
// info!("Returning tag keys");
// values.iter().for_each(|k| info!(" {}", String::from_utf8_lossy(k)));
Ok(StringValuesResponse { values })
}
@ -901,10 +941,14 @@ where
})?;
// Map the resulting collection of Strings into a Vec<Vec<u8>> for return
let values = tag_values
let values: Vec<Vec<u8>> = tag_values
.iter()
.map(|name| name.bytes().collect())
.collect::<Vec<_>>();
.collect();
// Debugging help: uncomment to see raw values coming back
//info!("Returning tag values");
//values.iter().for_each(|k| info!(" {}", String::from_utf8_lossy(k)));
Ok(StringValuesResponse { values })
}
@ -1052,15 +1096,15 @@ where
Ok(())
}
/// Return fields with optional measurement, timestamp and arbitrary predicates
async fn measurement_fields_impl<T>(
/// Return field names, restricted via optional measurement, timestamp and predicate
async fn field_names_impl<T>(
db_store: Arc<T>,
executor: Arc<QueryExecutor>,
db_name: String,
measurement: String,
measurement: Option<String>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
) -> Result<MeasurementFieldsResponse>
) -> Result<FieldList>
where
T: DatabaseStore,
{
@ -1068,7 +1112,7 @@ where
let predicate = PredicateBuilder::default()
.set_range(range)
.table_option(Some(measurement))
.table_option(measurement)
.rpc_predicate(rpc_predicate)
.context(ConvertingPredicate {
rpc_predicate_string,
@ -1097,8 +1141,7 @@ where
source: Box::new(e),
})?;
// And convert the result
fieldlist_to_measurement_fields_response(fieldlist).context(ConvertingFieldList)
Ok(fieldlist)
}
/// Instantiate a server listening on the specified address
@ -1150,7 +1193,8 @@ mod tests {
net::{IpAddr, Ipv4Addr, SocketAddr},
time::Duration,
};
use test_helpers::tracing::TracingCapture;
use test_helpers::{tag_key_bytes_to_strings, tracing::TracingCapture};
use tonic::Code;
use futures::prelude::*;
@ -1276,7 +1320,7 @@ mod tests {
test_db.set_column_names(to_string_vec(&tag_keys)).await;
let actual_tag_keys = fixture.storage_client.tag_keys(request).await?;
let mut expected_tag_keys = vec!["_field", "_measurement"];
let mut expected_tag_keys = vec!["_f(0xff)", "_m(0x00)"];
expected_tag_keys.extend(tag_keys.iter());
assert_eq!(
@ -1368,7 +1412,7 @@ mod tests {
let actual_tag_keys = fixture.storage_client.measurement_tag_keys(request).await?;
let mut expected_tag_keys = vec!["_field", "_measurement"];
let mut expected_tag_keys = vec!["_f(0xff)", "_m(0x00)"];
expected_tag_keys.extend(tag_keys.iter());
assert_eq!(
@ -1416,7 +1460,7 @@ mod tests {
/// the right parameters are passed into the Database interface
/// and that the returned values are sent back via gRPC.
#[tokio::test]
async fn test_storage_rpc_tag_values() -> Result<(), tonic::Status> {
async fn test_storage_rpc_tag_values() {
// Start a test gRPC server on a randomly allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
@ -1450,7 +1494,7 @@ mod tests {
test_db.set_column_values(to_string_vec(&tag_values)).await;
let actual_tag_values = fixture.storage_client.tag_values(request).await?;
let actual_tag_values = fixture.storage_client.tag_values(request).await.unwrap();
assert_eq!(
actual_tag_values, tag_values,
"unexpected tag values while getting tag values"
@ -1468,7 +1512,7 @@ mod tests {
tags_source: source.clone(),
range: make_timestamp_range(1000, 1500),
predicate: None,
tag_key: "\x00".into(),
tag_key: [0].into(),
};
let lp_data = "h2o,state=CA temp=50.4 1000\n\
@ -1479,12 +1523,40 @@ mod tests {
.await;
let tag_values = vec!["h2o"];
let actual_tag_values = fixture.storage_client.tag_values(request).await?;
let actual_tag_values = fixture.storage_client.tag_values(request).await.unwrap();
assert_eq!(
actual_tag_values, tag_values,
"unexpected tag values while getting tag values for measurement names"
);
// ---
// test tag_key = _field means listing all field names
// ---
let request = TagValuesRequest {
tags_source: source.clone(),
range: make_timestamp_range(1000, 1500),
predicate: None,
tag_key: [255].into(),
};
// Setup a single field name (Field1)
let fieldlist = FieldList {
fields: vec![Field {
name: "Field1".into(),
data_type: DataType::Utf8,
last_timestamp: 1000,
}],
};
let fieldlist_plan = FieldListPlan::Known(Ok(fieldlist));
test_db.set_field_colum_names_values(fieldlist_plan).await;
let expected_tag_values = vec!["Field1"];
let actual_tag_values = fixture.storage_client.tag_values(request).await.unwrap();
assert_eq!(
actual_tag_values, expected_tag_values,
"unexpected tag values while getting tag values for field names"
);
// ---
// test error
// ---
@ -1513,14 +1585,33 @@ mod tests {
});
assert_eq!(test_db.get_column_values_request().await, expected_request);
Ok(())
// ---
// test error with non utf8 value
// ---
let request = TagValuesRequest {
tags_source: source.clone(),
range: None,
predicate: None,
tag_key: [0, 255].into(), // this is not a valid UTF-8 string
};
let response = fixture.storage_client.tag_values(request).await;
assert!(response.is_err());
let response_string = format!("{:?}", response);
let expected_error = "Error converting tag_key to UTF-8 in tag_values request";
assert!(
response_string.contains(expected_error),
"'{}' did not contain expected content '{}'",
response_string,
expected_error
);
}
/// test the plumbing of the RPC layer for measurement_tag_values-- specifically that
/// the right parameters are passed into the Database interface
/// and that the returned values are sent back via gRPC.
#[tokio::test]
async fn test_storage_rpc_measurement_tag_values() -> Result<(), tonic::Status> {
async fn test_storage_rpc_measurement_tag_values() {
// Start a test gRPC server on a randomly allocated port
let mut fixture = Fixture::new().await.expect("Connecting to test server");
@ -1558,7 +1649,9 @@ mod tests {
let actual_tag_values = fixture
.storage_client
.measurement_tag_values(request)
.await?;
.await
.unwrap();
assert_eq!(
actual_tag_values, tag_values,
"unexpected tag values while getting tag values",
@ -1598,8 +1691,6 @@ mod tests {
column_name: "the_tag_key".into(),
});
assert_eq!(test_db.get_column_values_request().await, expected_request);
Ok(())
}
#[tokio::test]
@ -2387,7 +2478,7 @@ mod tests {
.into_iter()
.map(|r| r.values.into_iter())
.flatten()
.map(|v| String::from_utf8(v).expect("string value response was not utf8"))
.map(tag_key_bytes_to_strings)
.collect::<Vec<_>>();
strings.sort();

View File

@ -47,6 +47,18 @@ pub fn str_pair_vec_to_vec(str_vec: &[(&str, &str)]) -> Vec<(Arc<String>, Arc<St
.collect()
}
/// Converts bytes representing tag_keys values to Rust strings,
/// handling the special measurement (`[0x00]` -> "_m(0x00)") and
/// field (`[0xff]` -> "_f(0xff)") sentinel values.
///
/// Panics when the bytes are neither a sentinel nor valid UTF-8;
/// `[0xff]` itself is not valid UTF-8, which is why it needs the
/// special case before the conversion below.
pub fn tag_key_bytes_to_strings(bytes: Vec<u8>) -> String {
    if bytes.as_slice() == [0] {
        "_m(0x00)".into()
    } else if bytes.as_slice() == [255] {
        "_f(0xff)".into()
    } else {
        String::from_utf8(bytes).expect("string value response was not utf8")
    }
}
pub fn enable_logging() {
std::env::set_var("RUST_LOG", "debug");
env_logger::init();

View File

@ -293,25 +293,28 @@ async fn read_and_write_data() -> Result<()> {
let responses: Vec<_> = tag_keys_response.into_inner().try_collect().await?;
let keys = &responses[0].values;
let keys: Vec<_> = keys.iter().map(|s| str::from_utf8(s).unwrap()).collect();
let keys: Vec<_> = keys
.iter()
.map(|v| tag_key_bytes_to_strings(v.clone()))
.collect();
assert_eq!(
keys,
vec!["_field", "_measurement", "host", "name", "region"]
);
assert_eq!(keys, vec!["_m(0x00)", "host", "name", "region", "_f(0xff)"]);
let tag_values_request = tonic::Request::new(TagValuesRequest {
tags_source: read_source.clone(),
range: range.clone(),
predicate: predicate.clone(),
tag_key: String::from("host"),
tag_key: b"host".to_vec(),
});
let tag_values_response = storage_client.tag_values(tag_values_request).await?;
let responses: Vec<_> = tag_values_response.into_inner().try_collect().await?;
let values = &responses[0].values;
let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect();
let values: Vec<_> = values
.iter()
.map(|v| tag_key_bytes_to_strings(v.clone()))
.collect();
assert_eq!(values, vec!["server01"]);
@ -397,9 +400,12 @@ async fn read_and_write_data() -> Result<()> {
.await?;
let values = &responses[0].values;
let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect();
let values: Vec<_> = values
.iter()
.map(|v| tag_key_bytes_to_strings(v.clone()))
.collect();
assert_eq!(values, vec!["_field", "_measurement", "host", "region"]);
assert_eq!(values, vec!["_m(0x00)", "host", "region", "_f(0xff)"]);
let measurement_tag_values_request = tonic::Request::new(MeasurementTagValuesRequest {
source: read_source.clone(),
@ -418,7 +424,10 @@ async fn read_and_write_data() -> Result<()> {
.await?;
let values = &responses[0].values;
let values: Vec<_> = values.iter().map(|s| str::from_utf8(s).unwrap()).collect();
let values: Vec<_> = values
.iter()
.map(|v| tag_key_bytes_to_strings(v.clone()))
.collect();
assert_eq!(values, vec!["server01"]);