feat: lower Influx regex expressions to DF regex expressions (#6394)
* feat: lower Influx regex experessions to DF regex expressions For #6388. * refactor: address review commentspull/24376/head
parent
6324707110
commit
a5d693eba2
|
@ -19,6 +19,7 @@ use crate::{
|
|||
split::StreamSplitExec,
|
||||
stringset::{IntoStringSet, StringSetRef},
|
||||
},
|
||||
logical_optimizer::iox_optimizer,
|
||||
plan::{
|
||||
fieldlist::FieldListPlan,
|
||||
seriesset::{SeriesSetPlan, SeriesSetPlans},
|
||||
|
@ -212,7 +213,8 @@ impl IOxSessionConfig {
|
|||
.with_query_planner(Arc::new(IOxQueryPlanner {}));
|
||||
|
||||
let state = register_selector_aggregates(state);
|
||||
let state = register_scalar_functions(state);
|
||||
let mut state = register_scalar_functions(state);
|
||||
state.optimizer = iox_optimizer();
|
||||
|
||||
let inner = SessionContext::with_state(state);
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ use std::{any::Any, collections::BTreeSet, fmt::Debug, iter::FromIterator, sync:
|
|||
|
||||
pub mod exec;
|
||||
pub mod frontend;
|
||||
pub mod logical_optimizer;
|
||||
pub mod plan;
|
||||
pub mod provider;
|
||||
pub mod pruning;
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
use datafusion::{
|
||||
common::DFSchema,
|
||||
error::DataFusionError,
|
||||
logical_expr::{expr_rewriter::ExprRewriter, utils::from_plan, LogicalPlan, Operator},
|
||||
optimizer::{utils::rewrite_preserving_name, OptimizerConfig, OptimizerRule},
|
||||
prelude::{binary_expr, lit, Expr},
|
||||
scalar::ScalarValue,
|
||||
};
|
||||
use query_functions::{clean_non_meta_escapes, REGEX_MATCH_UDF_NAME, REGEX_NOT_MATCH_UDF_NAME};
|
||||
|
||||
/// Replaces InfluxDB-specific regex operator with DataFusion regex operator.
|
||||
///
|
||||
/// InfluxDB has a special regex operator that is especially used by Flux/InfluxQL and that excepts certain escape
|
||||
/// sequences that are normal Rust regex crate does NOT support. If the pattern is already known at planning time (i.e.
|
||||
/// it is a constant), then we can clean the escape sequences and just use the ordinary DataFusion regex operator. This
|
||||
/// is desired because the ordinary DataFusion regex operator can be optimized further (e.g. to cheaper `LIKE` expressions).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct InfluxRegexToDataFusionRegex {}
|
||||
|
||||
impl InfluxRegexToDataFusionRegex {
|
||||
/// Create new optimizer rule.
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl OptimizerRule for InfluxRegexToDataFusionRegex {
|
||||
fn optimize(
|
||||
&self,
|
||||
plan: &LogicalPlan,
|
||||
_optimizer_config: &mut OptimizerConfig,
|
||||
) -> Result<LogicalPlan, DataFusionError> {
|
||||
optimize(plan)
|
||||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"influx_regex_to_datafusion_regex"
|
||||
}
|
||||
}
|
||||
|
||||
fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
|
||||
let new_inputs = plan
|
||||
.inputs()
|
||||
.iter()
|
||||
.map(|input| optimize(input))
|
||||
.collect::<Result<Vec<_>, DataFusionError>>()?;
|
||||
|
||||
let mut schema =
|
||||
new_inputs
|
||||
.iter()
|
||||
.map(|input| input.schema())
|
||||
.fold(DFSchema::empty(), |mut lhs, rhs| {
|
||||
lhs.merge(rhs);
|
||||
lhs
|
||||
});
|
||||
|
||||
schema.merge(plan.schema());
|
||||
|
||||
let mut expr_rewriter = InfluxRegexToDataFusionRegex {};
|
||||
|
||||
let new_exprs = plan
|
||||
.expressions()
|
||||
.into_iter()
|
||||
.map(|expr| rewrite_preserving_name(expr, &mut expr_rewriter))
|
||||
.collect::<Result<Vec<_>, DataFusionError>>()?;
|
||||
|
||||
from_plan(plan, new_exprs.as_slice(), new_inputs.as_slice())
|
||||
}
|
||||
|
||||
impl ExprRewriter for InfluxRegexToDataFusionRegex {
|
||||
fn mutate(&mut self, expr: Expr) -> Result<Expr, DataFusionError> {
|
||||
match expr {
|
||||
Expr::ScalarUDF { fun, mut args } => {
|
||||
if (args.len() == 2)
|
||||
&& ((fun.name == REGEX_MATCH_UDF_NAME)
|
||||
|| (fun.name == REGEX_NOT_MATCH_UDF_NAME))
|
||||
{
|
||||
if let Expr::Literal(ScalarValue::Utf8(Some(s))) = &args[1] {
|
||||
let s = clean_non_meta_escapes(s);
|
||||
let op = match fun.name.as_str() {
|
||||
REGEX_MATCH_UDF_NAME => Operator::RegexMatch,
|
||||
REGEX_NOT_MATCH_UDF_NAME => Operator::RegexNotMatch,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
return Ok(binary_expr(args.remove(0), op, lit(s)));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Expr::ScalarUDF { fun, args })
|
||||
}
|
||||
_ => Ok(expr),
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,17 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use datafusion::optimizer::{optimizer::Optimizer, OptimizerConfig};
|
||||
|
||||
use self::influx_regex_to_datafusion_regex::InfluxRegexToDataFusionRegex;
|
||||
|
||||
mod influx_regex_to_datafusion_regex;
|
||||
|
||||
/// Create IOx-specific logical [`Optimizer`].
|
||||
///
|
||||
/// This is mostly the default optimizer that DataFusion provides but with some additional passes.
|
||||
pub fn iox_optimizer() -> Optimizer {
|
||||
let mut opt = Optimizer::new(&OptimizerConfig::default());
|
||||
opt.rules
|
||||
.push(Arc::new(InfluxRegexToDataFusionRegex::new()));
|
||||
opt
|
||||
}
|
|
@ -33,6 +33,7 @@ mod window;
|
|||
/// Function registry
|
||||
mod registry;
|
||||
|
||||
pub use crate::regex::clean_non_meta_escapes;
|
||||
pub use crate::regex::REGEX_MATCH_UDF_NAME;
|
||||
pub use crate::regex::REGEX_NOT_MATCH_UDF_NAME;
|
||||
|
||||
|
|
|
@ -147,7 +147,7 @@ fn is_valid_character_after_escape(c: char) -> bool {
|
|||
/// golang, used by the influx storage rpc.
|
||||
///
|
||||
/// See <https://github.com/rust-lang/regex/issues/501> for more details
|
||||
fn clean_non_meta_escapes(pattern: &str) -> String {
|
||||
pub fn clean_non_meta_escapes(pattern: &str) -> String {
|
||||
if pattern.is_empty() {
|
||||
return pattern.to_string();
|
||||
}
|
||||
|
@ -274,6 +274,18 @@ mod test {
|
|||
"+---------------+--------+",
|
||||
],
|
||||
),
|
||||
(
|
||||
"twi",
|
||||
true, // keep the values matched
|
||||
vec![
|
||||
"+---------------+--------+",
|
||||
"| words | length |",
|
||||
"+---------------+--------+",
|
||||
"| aphex twin | 10 |",
|
||||
"| cocteau twins | 13 |",
|
||||
"+---------------+--------+",
|
||||
],
|
||||
),
|
||||
];
|
||||
|
||||
for (pattern, matches, expected) in cases.into_iter() {
|
||||
|
|
|
@ -320,18 +320,18 @@
|
|||
+-------+--------+--------------------------------+---------+
|
||||
-- SQL: EXPLAIN SELECT * from restaurant where influx_regex_match(town, 'foo|bar|baz') and influx_regex_not_match(town, 'one|two');
|
||||
-- Results After Normalizing UUIDs
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
|
||||
| | Filter: influx_regex_match(CAST(restaurant.town AS Utf8)restaurant.town, Utf8("foo|bar|baz")) AND influx_regex_not_match(CAST(restaurant.town AS Utf8)restaurant.town, Utf8("one|two")) |
|
||||
| | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
|
||||
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[influx_regex_match(CAST(restaurant.town AS Utf8), Utf8("foo|bar|baz")), influx_regex_not_match(CAST(restaurant.town AS Utf8), Utf8("one|two"))] |
|
||||
| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
|
||||
| | CoalesceBatchesExec: target_batch_size=4096 |
|
||||
| | FilterExec: influx_regex_match(CAST(restaurant.town AS Utf8)restaurant.town@0, foo|bar|baz) AND influx_regex_not_match(CAST(restaurant.town AS Utf8)restaurant.town@0, one|two) |
|
||||
| | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
|
||||
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
|
||||
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=influx_regex_match(CAST(town AS Utf8), Utf8("foo|bar|baz")) AND influx_regex_not_match(CAST(town AS Utf8), Utf8("one|two")), projection=[count, system, time, town] |
|
||||
| | |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| plan_type | plan |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
|
||||
| | Filter: (CAST(restaurant.town AS Utf8)restaurant.town ~ Utf8("foo|bar|baz")) AND (CAST(restaurant.town AS Utf8)restaurant.town !~ Utf8("one|two")) |
|
||||
| | Projection: CAST(restaurant.town AS Utf8) AS CAST(restaurant.town AS Utf8)restaurant.town, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
|
||||
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz") AS influx_regex_match(restaurant.town,Utf8("foo|bar|baz")), CAST(restaurant.town AS Utf8) !~ Utf8("one|two") AS influx_regex_not_match(restaurant.town,Utf8("one|two")), CAST(restaurant.town AS Utf8) ~ Utf8("foo|bar|baz"), CAST(restaurant.town AS Utf8) !~ Utf8("one|two")] |
|
||||
| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
|
||||
| | CoalesceBatchesExec: target_batch_size=4096 |
|
||||
| | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 ~ foo|bar|baz AND CAST(restaurant.town AS Utf8)restaurant.town@0 !~ one|two |
|
||||
| | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
|
||||
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
|
||||
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")) AND (CAST(town AS Utf8) ~ Utf8("foo|bar|baz")) AND (CAST(town AS Utf8) !~ Utf8("one|two")), projection=[count, system, time, town] |
|
||||
| | |
|
||||
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
|
Loading…
Reference in New Issue