feat: expose regex =~ and !~ operators to gRPC API

pull/24376/head
Edd Robinson 2021-05-06 19:04:28 +01:00
parent eae3fec571
commit beee3115f4
3 changed files with 135 additions and 17 deletions

View File

@ -26,7 +26,7 @@ pub const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexMatch";
/// This UDF is designed to support the regex operator that can be pushed down
/// via the InfluxRPC API.
///
pub(crate) fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr {
pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr {
// N.B., this function does not utilise the Arrow regexp compute kernel because
// in order to act as a filter it needs to return a boolean array of comparison
// results, not an array of strings as the regex compute kernel does.

View File

@ -10,6 +10,7 @@ use std::{convert::TryFrom, fmt};
use datafusion::{
logical_plan::{binary_expr, Expr, Operator},
prelude::*,
scalar::ScalarValue,
};
use generated_types::{
aggregate::AggregateType as RPCAggregateType, node::Comparison as RPCComparison,
@ -20,9 +21,10 @@ use generated_types::{
use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
use observability_deps::tracing::warn;
use query::func::regex;
use query::group_by::{Aggregate as QueryAggregate, WindowDuration};
use query::predicate::PredicateBuilder;
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
@ -66,19 +68,14 @@ pub enum Error {
#[snafu(display("Internal error: found field tag reference in unexpected location"))]
InternalInvalidFieldReference {},
#[snafu(display(
"Error creating predicate: Regular expression predicates are not supported: {}",
regexp
))]
RegExpLiteralNotSupported { regexp: String },
#[snafu(display("Invalid regex pattern"))]
RegExpPatternInvalid {},
#[snafu(display("Error creating predicate: Regular expression predicates are not supported"))]
RegExpNotSupported {},
#[snafu(display("Internal error: regex expression input not expected"))]
InternalInvalidRegexExprReference {},
#[snafu(display(
"Error creating predicate: Not Regular expression predicates are not supported"
))]
NotRegExpNotSupported {},
#[snafu(display("Internal error: incorrect number of nodes: {:?}", num_children))]
InternalInvalidRegexExprChildren { num_children: usize },
#[snafu(display("Error creating predicate: StartsWith comparisons not supported"))]
StartsWithNotSupported {},
@ -452,7 +449,7 @@ fn build_node(value: RPCValue, inputs: Vec<Expr>) -> Result<Expr> {
RPCValue::IntValue(v) => Ok(lit(v)),
RPCValue::UintValue(v) => Ok(lit(v)),
RPCValue::FloatValue(f) => Ok(lit(f)),
RPCValue::RegexValue(regexp) => RegExpLiteralNotSupported { regexp }.fail(),
RPCValue::RegexValue(pattern) => Ok(lit(pattern)),
RPCValue::TagRefValue(tag_name) => Ok(col(&make_tag_name(tag_name)?)),
RPCValue::FieldRefValue(field_name) => Ok(col(&field_name)),
RPCValue::Logical(logical) => build_logical_node(logical, inputs),
@ -471,7 +468,7 @@ fn build_logical_node(logical: i32, inputs: Vec<Expr>) -> Result<Expr> {
}
}
/// Creates an expr from a "Comparsion" Node
/// Creates an expr from a "Comparison" Node
fn build_comparison_node(comparison: i32, inputs: Vec<Expr>) -> Result<Expr> {
let comparison_enum = RPCComparison::from_i32(comparison);
@ -479,8 +476,8 @@ fn build_comparison_node(comparison: i32, inputs: Vec<Expr>) -> Result<Expr> {
Some(RPCComparison::Equal) => build_binary_expr(Operator::Eq, inputs),
Some(RPCComparison::NotEqual) => build_binary_expr(Operator::NotEq, inputs),
Some(RPCComparison::StartsWith) => StartsWithNotSupported {}.fail(),
Some(RPCComparison::Regex) => RegExpNotSupported {}.fail(),
Some(RPCComparison::NotRegex) => NotRegExpNotSupported {}.fail(),
Some(RPCComparison::Regex) => build_regex_match_expr(true, inputs),
Some(RPCComparison::NotRegex) => build_regex_match_expr(false, inputs),
Some(RPCComparison::Lt) => build_binary_expr(Operator::Lt, inputs),
Some(RPCComparison::Lte) => build_binary_expr(Operator::LtEq, inputs),
Some(RPCComparison::Gt) => build_binary_expr(Operator::Gt, inputs),
@ -505,6 +502,24 @@ fn build_binary_expr(op: Operator, inputs: Vec<Expr>) -> Result<Expr> {
}
}
// Creates a DataFusion ScalarUDF expression that performs a regex matching
// operation.
//
// Expects exactly two inputs: the expression to match against (e.g. a tag
// column reference) followed by a UTF-8 literal holding the regex pattern.
fn build_regex_match_expr(matches: bool, mut inputs: Vec<Expr>) -> Result<Expr> {
    let num_children = inputs.len();
    if num_children != 2 {
        return InternalInvalidRegexExprChildren { num_children }.fail();
    }

    // The second child must be a UTF-8 string literal carrying the pattern;
    // a NULL literal or any other expression kind is rejected.
    let pattern = match inputs.remove(1) {
        Expr::Literal(ScalarValue::Utf8(pattern)) => pattern.context(RegExpPatternInvalid)?,
        _ => return InternalInvalidRegexExprReference.fail(),
    };

    // The remaining child is the expression the regex is applied to.
    Ok(regex::regex_match_expr(inputs.remove(0), pattern, matches))
}
pub fn make_read_group_aggregate(
aggregate: Option<RPCAggregate>,
group: RPCGroup,

View File

@ -294,6 +294,47 @@ async fn measurement_fields_endpoint(
assert_eq!(field.timestamp, scenario.ns_since_epoch() + 4);
}
#[tokio::test]
pub async fn regex_operator_test() {
    let fixture = ServerFixture::create_shared().await;
    let mut management = fixture.management_client();
    let mut storage_client = StorageClient::new(fixture.grpc_channel());
    let influxdb2 = fixture.influxdb2_client();

    // Set up a database populated with the shared read_group data set.
    let scenario = Scenario::new();
    scenario.create_database(&mut management).await;
    load_read_group_data(&influxdb2, &scenario).await;

    let read_source = scenario.read_source();

    // read_filter(predicate: host =~ /^b.+/) over the entire time range.
    let request = ReadFilterRequest {
        read_source: read_source.clone(),
        range: Some(TimestampRange {
            start: 0,
            end: 2001, // include all data
        }),
        predicate: Some(make_regex_match_predicate("host", "^b.+")),
    };

    // Only series whose `host` tag matches the regex (host=bar) are expected.
    let expected = vec![
        "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
        "FloatPointsFrame, timestamps: [1000, 2000], values: \"20,21\"",
        "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0",
        "FloatPointsFrame, timestamps: [1000, 2000], values: \"81,82\"",
        "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
        "FloatPointsFrame, timestamps: [1000, 2000], values: \"40,41\"",
        "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0",
        "FloatPointsFrame, timestamps: [1000, 2000], values: \"51,52\"",
    ];

    assert_eq!(
        do_read_filter_request(&mut storage_client, request).await,
        expected,
    );
}
#[tokio::test]
pub async fn read_group_test() {
let fixture = ServerFixture::create_shared().await;
@ -654,6 +695,41 @@ fn make_tag_predicate(tag_name: impl Into<String>, tag_value: impl Into<String>)
}
}
/// Create a predicate representing `tag_name ~= /pattern/`
///
/// The constitution of this request was formed by looking at a real request
/// made to storage, which looked like this:
///
/// root:<
///         node_type:COMPARISON_EXPRESSION
///         children:<node_type:TAG_REF tag_ref_value:"tag_key_name" >
///         children:<node_type:LITERAL regex_value:"pattern" >
///         comparison:REGEX
/// >
fn make_regex_match_predicate(
    tag_key_name: impl Into<String>,
    pattern: impl Into<String>,
) -> Predicate {
    // Left operand: a reference to the tag key being matched.
    let tag_ref = Node {
        node_type: NodeType::TagRef as i32,
        children: vec![],
        value: Some(Value::TagRefValue(tag_key_name.into().into())),
    };

    // Right operand: the regex pattern as a literal node.
    let pattern_literal = Node {
        node_type: NodeType::Literal as i32,
        children: vec![],
        value: Some(Value::RegexValue(pattern.into())),
    };

    Predicate {
        root: Some(Node {
            node_type: NodeType::ComparisonExpression as i32,
            children: vec![tag_ref, pattern_literal],
            value: Some(Value::Comparison(Comparison::Regex as _)),
        }),
    }
}
/// Create a predicate representing _f=field_name in the horrible gRPC structs
fn make_field_predicate(field_name: impl Into<String>) -> Predicate {
Predicate {
@ -676,6 +752,33 @@ fn make_field_predicate(field_name: impl Into<String>) -> Predicate {
}
}
/// Make a read_filter request and return the results in a comparable format
///
/// (The original doc comment said "read_group" — a copy-paste from
/// `do_read_group_request`; this function issues `read_filter`.)
async fn do_read_filter_request(
    storage_client: &mut StorageClient<tonic::transport::Channel>,
    request: ReadFilterRequest,
) -> Vec<String> {
    let request = tonic::Request::new(request);

    let read_filter_response = storage_client
        .read_filter(request)
        .await
        .expect("successful read_filter call");

    // Drain the response stream into a vector of response messages.
    let responses: Vec<_> = read_filter_response
        .into_inner()
        .try_collect()
        .await
        .unwrap();

    // Flatten every frame's payload across all responses into one list.
    let frames: Vec<_> = responses
        .into_iter()
        .flat_map(|r| r.frames)
        .flat_map(|f| f.data)
        .collect();

    dump_data_frames(&frames)
}
/// Make a read_group request and returns the results in a comparable format
async fn do_read_group_request(
storage_client: &mut StorageClient<tonic::transport::Channel>,