feat: Implement IOx Function Registry for regex_match/regex_not_match (#4431)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-04-27 06:33:19 -04:00 committed by GitHub
parent b01a5bb0a9
commit f0ceca985b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 217 additions and 62 deletions

1
Cargo.lock generated
View File

@ -4623,6 +4623,7 @@ dependencies = [
"chrono",
"datafusion 0.1.0",
"itertools",
"lazy_static",
"observability_deps",
"regex",
"regex-syntax",

View File

@ -327,18 +327,16 @@ impl PredicateBuilder {
/// Builds a regex matching expression from the provided column name and
/// pattern. Values not matching the regex will be filtered out.
pub fn build_regex_match_expr(self, column: &str, pattern: impl Into<String>) -> Self {
self.regex_match_expr(column, pattern, true)
pub fn build_regex_match_expr(mut self, column: &str, pattern: impl Into<String>) -> Self {
let expr = query_functions::regex_match_expr(col(column), pattern.into());
self.inner.exprs.push(expr);
self
}
/// Builds a regex "not matching" expression from the provided column name
/// and pattern. Values *matching* the regex will be filtered out.
pub fn build_regex_not_match_expr(self, column: &str, pattern: impl Into<String>) -> Self {
self.regex_match_expr(column, pattern, false)
}
fn regex_match_expr(mut self, column: &str, pattern: impl Into<String>, matches: bool) -> Self {
let expr = query_functions::regex::regex_match_expr(col(column), pattern.into(), matches);
pub fn build_regex_not_match_expr(mut self, column: &str, pattern: impl Into<String>) -> Self {
let expr = query_functions::regex_not_match_expr(col(column), pattern.into());
self.inner.exprs.push(expr);
self
}

View File

@ -10,6 +10,7 @@ arrow = { version = "12", features = ["prettyprint"] }
chrono = { version = "0.4", default-features = false }
datafusion = { path = "../datafusion" }
itertools = "0.10.2"
lazy_static = "1.4.0"
observability_deps = { path = "../observability_deps" }
regex = "1"
regex-syntax = "0.6.25"

View File

@ -8,13 +8,54 @@
clippy::use_self,
clippy::clone_on_ref_ptr
)]
use datafusion::{
logical_plan::{Expr, FunctionRegistry},
prelude::lit,
};
/// Grouping by structs
pub mod group_by;
/// Regular Expressions
pub mod regex;
mod regex;
/// Flux selector expressions
pub mod selectors;
/// Time window and groupin
pub mod window;
/// Function registry
mod registry;
/// Return an Expr that invokes a InfluxRPC compatible regex match to
/// determine which values satisfy the pattern. Equivalent to:
///
/// ```text
/// col ~= /pattern/
/// ```
pub fn regex_match_expr(input: Expr, pattern: String) -> Expr {
registry()
.udf(regex::REGEX_MATCH_UDF_NAME)
.expect("RegexMatch function not registered")
.call(vec![input, lit(pattern)])
}
/// Return an Expr that invokes a InfluxRPC compatible regex match to
/// determine which values do not satisfy the pattern. Equivalent to:
///
/// ```text
/// col !~ /pattern/
/// ```
pub fn regex_not_match_expr(input: Expr, pattern: String) -> Expr {
registry()
.udf(regex::REGEX_NOT_MATCH_UDF_NAME)
.expect("NotRegexMatch function not registered")
.call(vec![input, lit(pattern)])
}
/// Return an [`FunctionRegistry`] with the implementations of IOx UDFs
pub fn registry() -> &'static dyn FunctionRegistry {
registry::instance()
}

View File

@ -1,21 +1,48 @@
use std::sync::Arc;
use arrow::{
array::{ArrayRef, BooleanArray, StringArray},
array::{as_string_array, ArrayRef, BooleanArray},
datatypes::DataType,
};
use datafusion::{
error::DataFusionError,
logical_expr::Volatility,
logical_plan::{create_udf, Expr},
physical_plan::functions::make_scalar_function,
logical_expr::{ScalarFunctionImplementation, ScalarUDF, Volatility},
logical_plan::create_udf,
physical_plan::ColumnarValue,
scalar::ScalarValue,
};
/// The name of the regex_match UDF given to DataFusion.
pub const REGEX_MATCH_UDF_NAME: &str = "RegexMatch";
pub(crate) const REGEX_MATCH_UDF_NAME: &str = "RegexMatch";
/// The name of the not_regex_match UDF given to DataFusion.
pub const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexNotMatch";
pub(crate) const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexNotMatch";
lazy_static::lazy_static! {
pub(crate) static ref REGEX_MATCH_UDF: Arc<ScalarUDF> = Arc::new(
create_udf(
REGEX_MATCH_UDF_NAME,
// takes two arguments: regex, pattern
vec![DataType::Utf8, DataType::Utf8],
Arc::new(DataType::Boolean),
Volatility::Stable,
regex_match_expr_impl(true),
)
);
}
lazy_static::lazy_static! {
pub(crate) static ref REGEX_NOT_MATCH_UDF: Arc<ScalarUDF> = Arc::new(
create_udf(
REGEX_NOT_MATCH_UDF_NAME,
// takes two arguments: regex, pattern
vec![DataType::Utf8, DataType::Utf8],
Arc::new(DataType::Boolean),
Volatility::Stable,
regex_match_expr_impl(false),
)
);
}
/// Given a column containing string values and a single regex pattern,
/// `regex_match_expr` determines which values satisfy the pattern and which do
@ -29,55 +56,74 @@ pub const REGEX_NOT_MATCH_UDF_NAME: &str = "RegexNotMatch";
/// This UDF is designed to support the regex operator that can be pushed down
/// via the InfluxRPC API.
///
pub fn regex_match_expr(input: Expr, pattern: String, matches: bool) -> Expr {
// N.B., this function does not utilise the Arrow regexp compute kernel because
// in order to act as a filter it needs to return a boolean array of comparison
// results, not an array of strings as the regex compute kernel does.
fn regex_match_expr_impl(matches: bool) -> ScalarFunctionImplementation {
// N.B., this function does not utilise the Arrow regexp compute
// kernel because in order to act as a filter it needs to return a
// boolean array of comparison results, not an array of strings as
// the regex compute kernel does and it needs to implement the
// regexp syntax for influxrpc.
// Attempt to make the pattern compatible with what is accepted by
// the golang regexp library which is different than Rust's regexp
let pattern = clean_non_meta_escapes(pattern);
let func = move |args: &[ColumnarValue]| {
assert_eq!(args.len(), 2); // only works over a single column and pattern at a time.
let func = move |args: &[ArrayRef]| {
assert_eq!(args.len(), 1); // only works over a single column at a time.
let pattern = match &args[1] {
// second arg was array (not constant)
ColumnarValue::Array(_) => {
return Err(DataFusionError::NotImplemented(format!(
"regex_match({}) with non scalar patterns not yet implemented",
matches
)))
}
ColumnarValue::Scalar(ScalarValue::Utf8(pattern)) => pattern,
ColumnarValue::Scalar(arg) => {
return Err(DataFusionError::Internal(format!(
"Expected string pattern to regex match({}), got: {:?}",
matches, arg
)))
}
};
let input_arr = &args[0].as_any().downcast_ref::<StringArray>().unwrap();
let pattern = pattern.as_ref().ok_or_else(|| {
DataFusionError::NotImplemented(
"NULL patterns not supported in regex match".to_string(),
)
})?;
// Attempt to make the pattern compatible with what is accepted by
// the golang regexp library which is different than Rust's regexp
let pattern = clean_non_meta_escapes(pattern);
let pattern = regex::Regex::new(&pattern).map_err(|e| {
DataFusionError::Internal(format!("error compiling regex pattern: {}", e))
})?;
let results = input_arr
.iter()
.map(|row| {
// in arrow, any value can be null.
// Here we decide to make our UDF to return null when either base or exponent is null.
row.map(|v| pattern.is_match(v) == matches)
})
.collect::<BooleanArray>();
match &args[0] {
ColumnarValue::Array(arr) => {
let results = as_string_array(arr)
.iter()
.map(|row| {
// in arrow, any value can be null.
// Here we decide to make our UDF to return null when either base or exponent is null.
row.map(|v| pattern.is_match(v) == matches)
})
.collect::<BooleanArray>();
Ok(Arc::new(results) as ArrayRef)
Ok(ColumnarValue::Array(Arc::new(results) as ArrayRef))
}
ColumnarValue::Scalar(ScalarValue::Utf8(row)) => {
let res = row.as_ref().map(|v| pattern.is_match(v) == matches);
Ok(ColumnarValue::Scalar(ScalarValue::Boolean(res)))
}
ColumnarValue::Scalar(v) => {
return Err(DataFusionError::Internal(format!(
"regex_match({}) expected first argument to be utf8, got ('{}')",
matches, v
)))
}
}
};
// make_scalar_function is a helper to support accepting scalar values as
// well as arrays.
let func = make_scalar_function(func);
let udf_name = if matches {
REGEX_MATCH_UDF_NAME
} else {
REGEX_NOT_MATCH_UDF_NAME
};
let udf = create_udf(
udf_name,
vec![DataType::Utf8],
Arc::new(DataType::Boolean),
Volatility::Stable,
func,
);
udf.call(vec![input])
Arc::new(func)
}
fn is_valid_character_after_escape(c: char) -> bool {
@ -104,9 +150,9 @@ fn is_valid_character_after_escape(c: char) -> bool {
/// golang, used by the influx storage rpc.
///
/// See <https://github.com/rust-lang/regex/issues/501> for more details
fn clean_non_meta_escapes(pattern: String) -> String {
fn clean_non_meta_escapes(pattern: &str) -> String {
if pattern.is_empty() {
return pattern;
return pattern.to_string();
}
#[derive(Debug, Copy, Clone)]
@ -165,11 +211,11 @@ mod test {
datasource::MemTable,
error::DataFusionError,
logical_plan::{col, Expr},
prelude::SessionContext,
prelude::{lit, SessionContext},
};
use std::sync::Arc;
use super::clean_non_meta_escapes;
use super::*;
#[tokio::test]
async fn regex_match_expr() {
@ -235,7 +281,14 @@ mod test {
];
for (pattern, matches, expected) in cases.into_iter() {
let regex_expr = super::regex_match_expr(col("words"), pattern.to_string(), matches);
let args = vec![col("words"), lit(pattern)];
let regex_expr = if matches {
REGEX_MATCH_UDF.call(args)
} else {
REGEX_NOT_MATCH_UDF.call(args)
};
let actual = run_plan(regex_expr).await.unwrap();
assert_eq!(
@ -249,7 +302,7 @@ mod test {
#[tokio::test]
async fn regex_match_expr_invalid_regex() {
// an invalid regex pattern
let regex_expr = super::regex_match_expr(col("words"), "[".to_string(), true);
let regex_expr = crate::regex_match_expr(col("words"), "[".to_string());
let actual = run_plan(regex_expr).await.expect_err("expected error");
assert!(actual.to_string().contains("error compiling regex pattern"))
@ -328,7 +381,7 @@ mod test {
];
for (pattern, expected) in cases {
let cleaned_pattern = clean_non_meta_escapes(pattern.to_string());
let cleaned_pattern = clean_non_meta_escapes(pattern);
assert_eq!(
cleaned_pattern, expected,
"Expected '{}' to be cleaned to '{}', got '{}'",

View File

@ -0,0 +1,55 @@
use std::{collections::HashSet, sync::Arc};
use datafusion::{
common::{DataFusionError, Result as DataFusionResult},
logical_expr::{AggregateUDF, ScalarUDF},
logical_plan::FunctionRegistry,
};
use crate::regex;
lazy_static::lazy_static! {
static ref REGISTRY: IOxFunctionRegistry = IOxFunctionRegistry::new();
}
/// Lookup for all DataFusion User Defined Functions used by IOx
#[derive(Debug)]
pub(crate) struct IOxFunctionRegistry {}
impl IOxFunctionRegistry {
fn new() -> Self {
Self {}
}
}
impl FunctionRegistry for IOxFunctionRegistry {
fn udfs(&self) -> HashSet<String> {
[regex::REGEX_MATCH_UDF_NAME, regex::REGEX_NOT_MATCH_UDF_NAME]
.into_iter()
.map(|s| s.to_string())
.collect()
}
fn udf(&self, name: &str) -> DataFusionResult<Arc<ScalarUDF>> {
match name {
regex::REGEX_MATCH_UDF_NAME => Ok(regex::REGEX_MATCH_UDF.clone()),
regex::REGEX_NOT_MATCH_UDF_NAME => Ok(regex::REGEX_NOT_MATCH_UDF.clone()),
_ => Err(DataFusionError::Plan(format!(
"IOx FunctionRegistry does not contain function '{}'",
name
))),
}
}
fn udaf(&self, name: &str) -> DataFusionResult<Arc<AggregateUDF>> {
Err(DataFusionError::Plan(format!(
"IOx FunctionRegistry does not contain user defined aggregate function '{}'",
name
)))
}
}
/// Return a reference to the global function registry
pub(crate) fn instance() -> &'static IOxFunctionRegistry {
&REGISTRY
}

View File

@ -29,7 +29,6 @@ use predicate::{
PredicateBuilder,
};
use query::{Aggregate as QueryAggregate, WindowDuration};
use query_functions::regex::regex_match_expr;
use snafu::{OptionExt, ResultExt, Snafu};
#[derive(Debug, Snafu)]
@ -590,7 +589,14 @@ fn build_regex_match_expr(matches: bool, mut inputs: Vec<Expr>) -> Result<Expr>
return InternalInvalidRegexExprReferenceSnafu.fail();
};
Ok(regex_match_expr(inputs.remove(0), pattern, matches))
if matches {
Ok(query_functions::regex_match_expr(inputs.remove(0), pattern))
} else {
Ok(query_functions::regex_not_match_expr(
inputs.remove(0),
pattern,
))
}
}
_ => InternalInvalidRegexExprChildrenSnafu { num_children }.fail(),
}