diff --git a/Cargo.lock b/Cargo.lock index b32c8efb57..bd6c09e084 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4623,6 +4623,7 @@ dependencies = [ "chrono", "datafusion 0.1.0", "itertools", + "lazy_static", "observability_deps", "regex", "regex-syntax", diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index d1648b7b05..a64cc39572 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -327,18 +327,16 @@ impl PredicateBuilder { /// Builds a regex matching expression from the provided column name and /// pattern. Values not matching the regex will be filtered out. - pub fn build_regex_match_expr(self, column: &str, pattern: impl Into<String>) -> Self { - self.regex_match_expr(column, pattern, true) + pub fn build_regex_match_expr(mut self, column: &str, pattern: impl Into<String>) -> Self { + let expr = query_functions::regex_match_expr(col(column), pattern.into()); + self.inner.exprs.push(expr); + self } /// Builds a regex "not matching" expression from the provided column name /// and pattern. Values *matching* the regex will be filtered out.
- pub fn build_regex_not_match_expr(self, column: &str, pattern: impl Into<String>) -> Self { - self.regex_match_expr(column, pattern, false) - } - - fn regex_match_expr(mut self, column: &str, pattern: impl Into<String>, matches: bool) -> Self { - let expr = query_functions::regex::regex_match_expr(col(column), pattern.into(), matches); + pub fn build_regex_not_match_expr(mut self, column: &str, pattern: impl Into<String>) -> Self { + let expr = query_functions::regex_not_match_expr(col(column), pattern.into()); self.inner.exprs.push(expr); self } diff --git a/query_functions/Cargo.toml b/query_functions/Cargo.toml index 6d6ff8923f..4ab91d2eea 100644 --- a/query_functions/Cargo.toml +++ b/query_functions/Cargo.toml @@ -10,6 +10,7 @@ arrow = { version = "12", features = ["prettyprint"] } chrono = { version = "0.4", default-features = false } datafusion = { path = "../datafusion" } itertools = "0.10.2" +lazy_static = "1.4.0" observability_deps = { path = "../observability_deps" } regex = "1" regex-syntax = "0.6.25" diff --git a/query_functions/src/lib.rs b/query_functions/src/lib.rs index 883953b73e..82f079acc4 100644 --- a/query_functions/src/lib.rs +++ b/query_functions/src/lib.rs @@ -8,13 +8,54 @@ clippy::use_self, clippy::clone_on_ref_ptr )] + +use datafusion::{ + logical_plan::{Expr, FunctionRegistry}, + prelude::lit, +}; + +/// Grouping by structs pub mod group_by; /// Regular Expressions -pub mod regex; +mod regex; /// Flux selector expressions pub mod selectors; /// Time window and groupin pub mod window; + +/// Function registry +mod registry; + +/// Return an Expr that invokes an InfluxRPC compatible regex match to +/// determine which values satisfy the pattern.
Equivalent to: +/// +/// ```text +/// col =~ /pattern/ +/// ``` +pub fn regex_match_expr(input: Expr, pattern: String) -> Expr { + registry() + .udf(regex::REGEX_MATCH_UDF_NAME) + .expect("RegexMatch function not registered") + .call(vec![input, lit(pattern)]) +} + +/// Return an Expr that invokes an InfluxRPC compatible regex match to +/// determine which values do not satisfy the pattern. Equivalent to: +/// +/// ```text +/// col !~ /pattern/ +/// ``` +pub fn regex_not_match_expr(input: Expr, pattern: String) -> Expr { + registry() + .udf(regex::REGEX_NOT_MATCH_UDF_NAME) + .expect("NotRegexMatch function not registered") + .call(vec![input, lit(pattern)]) +} + +/// Return a [`FunctionRegistry`] with the implementations of IOx UDFs +pub fn registry() -> &'static dyn FunctionRegistry { + registry::instance() +} diff --git a/query_functions/src/regex.rs b/query_functions/src/regex.rs index 53f79198a8..566422062c 100644 --- a/query_functions/src/regex.rs +++ b/query_functions/src/regex.rs @@ -1,21 +1,48 @@ use std::sync::Arc; use arrow::{ - array::{ArrayRef, BooleanArray, StringArray}, + array::{as_string_array, ArrayRef, BooleanArray}, datatypes::DataType, }; use datafusion::{ error::DataFusionError, - logical_expr::Volatility, - logical_plan::{create_udf, Expr}, - physical_plan::functions::make_scalar_function, + logical_expr::{ScalarFunctionImplementation, ScalarUDF, Volatility}, + logical_plan::create_udf, + physical_plan::ColumnarValue, + scalar::ScalarValue, }; /// The name of the regex_match UDF given to DataFusion. -pub const REGEX_MATCH_UDF_NAME: &str = "RegexMatch"; +pub(crate) const REGEX_MATCH_UDF_NAME: &str = "RegexMatch"; /// The name of the not_regex_match UDF given to DataFusion. -pub const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexNotMatch"; +pub(crate) const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexNotMatch"; + +lazy_static::lazy_static!
{ + pub(crate) static ref REGEX_MATCH_UDF: Arc<ScalarUDF> = Arc::new( + create_udf( + REGEX_MATCH_UDF_NAME, + // takes two arguments: input, pattern + vec![DataType::Utf8, DataType::Utf8], + Arc::new(DataType::Boolean), + Volatility::Stable, + regex_match_expr_impl(true), + ) + ); +} + +lazy_static::lazy_static! { + pub(crate) static ref REGEX_NOT_MATCH_UDF: Arc<ScalarUDF> = Arc::new( + create_udf( + REGEX_NOT_MATCH_UDF_NAME, + // takes two arguments: input, pattern + vec![DataType::Utf8, DataType::Utf8], + Arc::new(DataType::Boolean), + Volatility::Stable, + regex_match_expr_impl(false), + ) + ); +} /// Given a column containing string values and a single regex pattern, /// `regex_match_expr` determines which values satisfy the pattern and which do @@ -29,55 +56,74 @@ pub const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexNotMatch"; /// This UDF is designed to support the regex operator that can be pushed down /// via the InfluxRPC API. /// -pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr { - // N.B., this function does not utilise the Arrow regexp compute kernel because - // in order to act as a filter it needs to return a boolean array of comparison - // results, not an array of strings as the regex compute kernel does. +fn regex_match_expr_impl(matches: bool) -> ScalarFunctionImplementation { + // N.B., this function does not utilise the Arrow regexp compute + // kernel because in order to act as a filter it needs to return a + // boolean array of comparison results, not an array of strings as + // the regex compute kernel does and it needs to implement the + // regexp syntax for influxrpc. - // Attempt to make the pattern compatible with what is accepted by - // the golang regexp library which is different than Rust's regexp - let pattern = clean_non_meta_escapes(pattern); + let func = move |args: &[ColumnarValue]| { + assert_eq!(args.len(), 2); // only works over a single column and pattern at a time.
- let func = move |args: &[ArrayRef]| { - assert_eq!(args.len(), 1); // only works over a single column at a time. + let pattern = match &args[1] { + // second arg was array (not constant) + ColumnarValue::Array(_) => { + return Err(DataFusionError::NotImplemented(format!( + "regex_match({}) with non scalar patterns not yet implemented", + matches + ))) + } + ColumnarValue::Scalar(ScalarValue::Utf8(pattern)) => pattern, + ColumnarValue::Scalar(arg) => { + return Err(DataFusionError::Internal(format!( + "Expected string pattern to regex match({}), got: {:?}", + matches, arg + ))) + } + }; - let input_arr = &args[0].as_any().downcast_ref::<StringArray>().unwrap(); + let pattern = pattern.as_ref().ok_or_else(|| { + DataFusionError::NotImplemented( + "NULL patterns not supported in regex match".to_string(), + ) + })?; + + // Attempt to make the pattern compatible with what is accepted by + // the golang regexp library which is different than Rust's regexp + let pattern = clean_non_meta_escapes(pattern); let pattern = regex::Regex::new(&pattern).map_err(|e| { DataFusionError::Internal(format!("error compiling regex pattern: {}", e)) })?; - let results = input_arr - .iter() - .map(|row| { - // in arrow, any value can be null. - // Here we decide to make our UDF to return null when either base or exponent is null. - row.map(|v| pattern.is_match(v) == matches) - }) - .collect::<BooleanArray>(); + match &args[0] { + ColumnarValue::Array(arr) => { + let results = as_string_array(arr) + .iter() + .map(|row| { + // in arrow, any value can be null. + // Here we decide to make our UDF return null when the input value is null.
+ row.map(|v| pattern.is_match(v) == matches) + }) + .collect::<BooleanArray>(); - Ok(Arc::new(results) as ArrayRef) + Ok(ColumnarValue::Array(Arc::new(results) as ArrayRef)) + } + ColumnarValue::Scalar(ScalarValue::Utf8(row)) => { + let res = row.as_ref().map(|v| pattern.is_match(v) == matches); + Ok(ColumnarValue::Scalar(ScalarValue::Boolean(res))) + } + ColumnarValue::Scalar(v) => { + return Err(DataFusionError::Internal(format!( + "regex_match({}) expected first argument to be utf8, got ('{}')", + matches, v + ))) + } + } }; - // make_scalar_function is a helper to support accepting scalar values as - // well as arrays. - let func = make_scalar_function(func); - - let udf_name = if matches { - REGEX_MATCH_UDF_NAME - } else { - REGEX_NOT_MATCH_UDF_NAME - }; - - let udf = create_udf( - udf_name, - vec![DataType::Utf8], - Arc::new(DataType::Boolean), - Volatility::Stable, - func, - ); - - udf.call(vec![input]) + Arc::new(func) } fn is_valid_character_after_escape(c: char) -> bool { @@ -104,9 +150,9 @@ fn is_valid_character_after_escape(c: char) -> bool { /// golang, used by the influx storage rpc.
/// /// See for more details -fn clean_non_meta_escapes(pattern: String) -> String { +fn clean_non_meta_escapes(pattern: &str) -> String { if pattern.is_empty() { - return pattern; + return pattern.to_string(); } #[derive(Debug, Copy, Clone)] @@ -165,11 +211,11 @@ mod test { datasource::MemTable, error::DataFusionError, logical_plan::{col, Expr}, - prelude::SessionContext, + prelude::{lit, SessionContext}, }; use std::sync::Arc; - use super::clean_non_meta_escapes; + use super::*; #[tokio::test] async fn regex_match_expr() { @@ -235,7 +281,14 @@ mod test { ]; for (pattern, matches, expected) in cases.into_iter() { - let regex_expr = super::regex_match_expr(col("words"), pattern.to_string(), matches); + let args = vec![col("words"), lit(pattern)]; + + let regex_expr = if matches { + REGEX_MATCH_UDF.call(args) + } else { + REGEX_NOT_MATCH_UDF.call(args) + }; + let actual = run_plan(regex_expr).await.unwrap(); assert_eq!( @@ -249,7 +302,7 @@ mod test { #[tokio::test] async fn regex_match_expr_invalid_regex() { // an invalid regex pattern - let regex_expr = super::regex_match_expr(col("words"), "[".to_string(), true); + let regex_expr = crate::regex_match_expr(col("words"), "[".to_string()); let actual = run_plan(regex_expr).await.expect_err("expected error"); assert!(actual.to_string().contains("error compiling regex pattern")) @@ -328,7 +381,7 @@ mod test { ]; for (pattern, expected) in cases { - let cleaned_pattern = clean_non_meta_escapes(pattern.to_string()); + let cleaned_pattern = clean_non_meta_escapes(pattern); assert_eq!( cleaned_pattern, expected, "Expected '{}' to be cleaned to '{}', got '{}'", diff --git a/query_functions/src/registry.rs b/query_functions/src/registry.rs new file mode 100644 index 0000000000..1f8c25a95a --- /dev/null +++ b/query_functions/src/registry.rs @@ -0,0 +1,55 @@ +use std::{collections::HashSet, sync::Arc}; + +use datafusion::{ + common::{DataFusionError, Result as DataFusionResult}, + logical_expr::{AggregateUDF, ScalarUDF}, + 
logical_plan::FunctionRegistry, +}; + +use crate::regex; + +lazy_static::lazy_static! { + static ref REGISTRY: IOxFunctionRegistry = IOxFunctionRegistry::new(); +} + +/// Lookup for all DataFusion User Defined Functions used by IOx +#[derive(Debug)] +pub(crate) struct IOxFunctionRegistry {} + +impl IOxFunctionRegistry { + fn new() -> Self { + Self {} + } +} + +impl FunctionRegistry for IOxFunctionRegistry { + fn udfs(&self) -> HashSet<String> { + [regex::REGEX_MATCH_UDF_NAME, regex::REGEX_NOT_MATCH_UDF_NAME] + .into_iter() + .map(|s| s.to_string()) + .collect() + } + + fn udf(&self, name: &str) -> DataFusionResult<Arc<ScalarUDF>> { + match name { + regex::REGEX_MATCH_UDF_NAME => Ok(regex::REGEX_MATCH_UDF.clone()), + regex::REGEX_NOT_MATCH_UDF_NAME => Ok(regex::REGEX_NOT_MATCH_UDF.clone()), + _ => Err(DataFusionError::Plan(format!( + "IOx FunctionRegistry does not contain function '{}'", + name + ))), + } + } + + fn udaf(&self, name: &str) -> DataFusionResult<Arc<AggregateUDF>> { + Err(DataFusionError::Plan(format!( + "IOx FunctionRegistry does not contain user defined aggregate function '{}'", + name + ))) + } +} + +/// Return a reference to the global function registry +pub(crate) fn instance() -> &'static IOxFunctionRegistry { + &REGISTRY +} diff --git a/service_grpc_influxrpc/src/expr.rs b/service_grpc_influxrpc/src/expr.rs index 4f98798687..18767b9a60 100644 --- a/service_grpc_influxrpc/src/expr.rs +++ b/service_grpc_influxrpc/src/expr.rs @@ -29,7 +29,6 @@ use predicate::{ PredicateBuilder, }; use query::{Aggregate as QueryAggregate, WindowDuration}; -use query_functions::regex::regex_match_expr; use snafu::{OptionExt, ResultExt, Snafu}; #[derive(Debug, Snafu)] @@ -590,7 +589,14 @@ fn build_regex_match_expr(matches: bool, mut inputs: Vec) -> Result return InternalInvalidRegexExprReferenceSnafu.fail(); }; - Ok(regex_match_expr(inputs.remove(0), pattern, matches)) + if matches { + Ok(query_functions::regex_match_expr(inputs.remove(0), pattern)) + } else { + Ok(query_functions::regex_not_match_expr( +
inputs.remove(0), + pattern, + )) + } } _ => InternalInvalidRegexExprChildrenSnafu { num_children }.fail(), }