From beee3115f4455671364e514704bc8961f7b6ae3e Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 6 May 2021 19:04:28 +0100 Subject: [PATCH] feat: expose regex =\~ and to gRPC API --- query/src/func/regex.rs | 2 +- src/influxdb_ioxd/rpc/storage/expr.rs | 47 ++++++++---- tests/end_to_end_cases/storage_api.rs | 103 ++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 17 deletions(-) diff --git a/query/src/func/regex.rs b/query/src/func/regex.rs index 9057f59bc8..42135781d7 100644 --- a/query/src/func/regex.rs +++ b/query/src/func/regex.rs @@ -26,7 +26,7 @@ pub const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexMatch"; /// This UDF is designed to support the regex operator that can be pushed down /// via the InfluxRPC API. /// -pub(crate) fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr { +pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr { // N.B., this function does not utilise the Arrow regexp compute kernel because // in order to act as a filter it needs to return a boolean array of comparison // results, not an array of strings as the regex compute kernel does. diff --git a/src/influxdb_ioxd/rpc/storage/expr.rs b/src/influxdb_ioxd/rpc/storage/expr.rs index 5546d43977..1631d79082 100644 --- a/src/influxdb_ioxd/rpc/storage/expr.rs +++ b/src/influxdb_ioxd/rpc/storage/expr.rs @@ -10,6 +10,7 @@ use std::{convert::TryFrom, fmt}; use datafusion::{ logical_plan::{binary_expr, Expr, Operator}, prelude::*, + scalar::ScalarValue, }; use generated_types::{ aggregate::AggregateType as RPCAggregateType, node::Comparison as RPCComparison, @@ -20,9 +21,10 @@ use generated_types::{ use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT}; use observability_deps::tracing::warn; +use query::func::regex; use query::group_by::{Aggregate as QueryAggregate, WindowDuration}; use query::predicate::PredicateBuilder; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; #[derive(Debug, Snafu)] pub enum Error { @@ -66,19 +68,14 @@ pub enum Error { #[snafu(display("Internal error: found field tag reference in unexpected location"))] InternalInvalidFieldReference {}, - #[snafu(display( - "Error creating predicate: Regular expression predicates are not supported: {}", - regexp - ))] - RegExpLiteralNotSupported { regexp: String }, + #[snafu(display("Invalid regex pattern"))] + RegExpPatternInvalid {}, - #[snafu(display("Error creating predicate: Regular expression predicates are not supported"))] - RegExpNotSupported {}, + #[snafu(display("Internal error: regex expression input not expected"))] + InternalInvalidRegexExprReference {}, - #[snafu(display( - "Error creating predicate: Not Regular expression predicates are not supported" - ))] - NotRegExpNotSupported {}, + #[snafu(display("Internal error: incorrect number of nodes: {:?}", num_children))] + InternalInvalidRegexExprChildren { num_children: usize }, #[snafu(display("Error creating predicate: StartsWith comparisons not supported"))] StartsWithNotSupported {}, @@ -452,7 +449,7 @@ fn build_node(value: RPCValue, inputs: Vec) -> Result { RPCValue::IntValue(v) => Ok(lit(v)), RPCValue::UintValue(v) => Ok(lit(v)), RPCValue::FloatValue(f) => Ok(lit(f)), - RPCValue::RegexValue(regexp) => RegExpLiteralNotSupported { regexp }.fail(), + RPCValue::RegexValue(pattern) => Ok(lit(pattern)), RPCValue::TagRefValue(tag_name) => Ok(col(&make_tag_name(tag_name)?)), RPCValue::FieldRefValue(field_name) => Ok(col(&field_name)), RPCValue::Logical(logical) => build_logical_node(logical, inputs), @@ -471,7 +468,7 @@ fn build_logical_node(logical: i32, inputs: Vec) -> Result { } } -/// Creates an expr from a "Comparsion" Node +/// Creates an expr from a "Comparison" Node fn build_comparison_node(comparison: i32, inputs: Vec) -> Result { let comparison_enum = RPCComparison::from_i32(comparison); @@ -479,8 +476,8 @@ fn build_comparison_node(comparison: i32, inputs: Vec) -> Result { Some(RPCComparison::Equal) => build_binary_expr(Operator::Eq, inputs), Some(RPCComparison::NotEqual) => build_binary_expr(Operator::NotEq, inputs), Some(RPCComparison::StartsWith) => StartsWithNotSupported {}.fail(), - Some(RPCComparison::Regex) => RegExpNotSupported {}.fail(), - Some(RPCComparison::NotRegex) => NotRegExpNotSupported {}.fail(), + Some(RPCComparison::Regex) => build_regex_match_expr(true, inputs), + Some(RPCComparison::NotRegex) => build_regex_match_expr(false, inputs), Some(RPCComparison::Lt) => build_binary_expr(Operator::Lt, inputs), Some(RPCComparison::Lte) => build_binary_expr(Operator::LtEq, inputs), Some(RPCComparison::Gt) => build_binary_expr(Operator::Gt, inputs), @@ -505,6 +502,24 @@ fn build_binary_expr(op: Operator, inputs: Vec) -> Result { } } +// Creates a DataFusion ScalarUDF expression that performs a regex matching +// operation. +fn build_regex_match_expr(matches: bool, mut inputs: Vec) -> Result { + let num_children = inputs.len(); + match num_children { + 2 => { + let pattern = if let Expr::Literal(ScalarValue::Utf8(pattern)) = inputs.remove(1) { + pattern.context(RegExpPatternInvalid)? + } else { + return InternalInvalidRegexExprReference.fail(); + }; + + Ok(regex::regex_match_expr(inputs.remove(0), pattern, matches)) + } + _ => InternalInvalidRegexExprChildren { num_children }.fail(), + } +} + pub fn make_read_group_aggregate( aggregate: Option, group: RPCGroup, diff --git a/tests/end_to_end_cases/storage_api.rs b/tests/end_to_end_cases/storage_api.rs index 0df505a404..196c66b914 100644 --- a/tests/end_to_end_cases/storage_api.rs +++ b/tests/end_to_end_cases/storage_api.rs @@ -294,6 +294,47 @@ async fn measurement_fields_endpoint( assert_eq!(field.timestamp, scenario.ns_since_epoch() + 4); } +#[tokio::test] +pub async fn regex_operator_test() { + let fixture = ServerFixture::create_shared().await; + let mut management = fixture.management_client(); + let mut storage_client = StorageClient::new(fixture.grpc_channel()); + let influxdb2 = fixture.influxdb2_client(); + + let scenario = Scenario::new(); + scenario.create_database(&mut management).await; + + load_read_group_data(&influxdb2, &scenario).await; + + let read_source = scenario.read_source(); + + // read_group(group_keys: region, agg: None) + let read_filter_request = ReadFilterRequest { + read_source: read_source.clone(), + range: Some(TimestampRange { + start: 0, + end: 2001, // include all data + }), + predicate: Some(make_regex_match_predicate("host", "^b.+")), + }; + + let expected_frames = vec![ + "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu1,host=bar, type: 0", + "FloatPointsFrame, timestamps: [1000, 2000], values: \"20,21\"", + "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu1,host=bar, type: 0", + "FloatPointsFrame, timestamps: [1000, 2000], values: \"81,82\"", + "SeriesFrame, tags: _field=usage_system,_measurement=cpu,cpu=cpu2,host=bar, type: 0", + "FloatPointsFrame, timestamps: [1000, 2000], values: \"40,41\"", + "SeriesFrame, tags: _field=usage_user,_measurement=cpu,cpu=cpu2,host=bar, type: 0", + "FloatPointsFrame, timestamps: [1000, 2000], values: \"51,52\"", + ]; + + assert_eq!( + do_read_filter_request(&mut storage_client, read_filter_request).await, + expected_frames, + ); +} + #[tokio::test] pub async fn read_group_test() { let fixture = ServerFixture::create_shared().await; @@ -654,6 +695,41 @@ fn make_tag_predicate(tag_name: impl Into, tag_value: impl Into) } } +// Create a predicate representing tag_name ~= /pattern/ +// +// The constitution of this request was formed by looking at a real request +// made to storage, which looked like this: +// +// root:< +// node_type:COMPARISON_EXPRESSION +// children: +// children: +// comparison:REGEX +// > +fn make_regex_match_predicate( + tag_key_name: impl Into, + pattern: impl Into, +) -> Predicate { + Predicate { + root: Some(Node { + node_type: NodeType::ComparisonExpression as i32, + children: vec![ + Node { + node_type: NodeType::TagRef as i32, + children: vec![], + value: Some(Value::TagRefValue(tag_key_name.into().into())), + }, + Node { + node_type: NodeType::Literal as i32, + children: vec![], + value: Some(Value::RegexValue(pattern.into())), + }, + ], + value: Some(Value::Comparison(Comparison::Regex as _)), + }), + } +} + /// Create a predicate representing _f=field_name in the horrible gRPC structs fn make_field_predicate(field_name: impl Into) -> Predicate { Predicate { @@ -676,6 +752,33 @@ fn make_field_predicate(field_name: impl Into) -> Predicate { } } +/// Make a read_group request and returns the results in a comparable format +async fn do_read_filter_request( + storage_client: &mut StorageClient, + request: ReadFilterRequest, +) -> Vec { + let request = tonic::Request::new(request); + + let read_filter_response = storage_client + .read_filter(request) + .await + .expect("successful read_filter call"); + + let responses: Vec<_> = read_filter_response + .into_inner() + .try_collect() + .await + .unwrap(); + + let frames: Vec<_> = responses + .into_iter() + .flat_map(|r| r.frames) + .flat_map(|f| f.data) + .collect(); + + dump_data_frames(&frames) +} + /// Make a read_group request and returns the results in a comparable format async fn do_read_group_request( storage_client: &mut StorageClient,