diff --git a/Cargo.lock b/Cargo.lock index 596c32c986..40a3992499 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -112,16 +112,16 @@ checksum = "f410d3907b6b3647b9e7bca4551274b2e3d716aa940afb67b7287257401da921" dependencies = [ "ahash 0.8.3", "arrow-arith", - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", "arrow-csv", - "arrow-data 34.0.0", + "arrow-data", "arrow-ipc", "arrow-json", "arrow-ord", "arrow-row", - "arrow-schema 34.0.0", + "arrow-schema", "arrow-select", "arrow-string", "comfy-table", @@ -133,10 +133,10 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f87391cf46473c9bc53dab68cb8872c3a81d4dfd1703f1c8aa397dba9880a043" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "half 2.2.1", "num", @@ -149,25 +149,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d35d5475e65c57cffba06d0022e3006b677515f99b54af33a7cd54f6cdd4a5b5" dependencies = [ "ahash 0.8.3", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", - "chrono", - "half 2.2.1", - "hashbrown 0.13.2", - "num", -] - -[[package]] -name = "arrow-array" -version = "35.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43489bbff475545b78b0e20bde1d22abd6c99e54499839f9e815a2fa5134a51b" -dependencies = [ - "ahash 0.8.3", - "arrow-buffer 35.0.0", - "arrow-data 35.0.0", - "arrow-schema 35.0.0", + "arrow-buffer", + "arrow-data", + "arrow-schema", "chrono", "chrono-tz", "half 2.2.1", @@ -185,26 +169,16 @@ dependencies = [ "num", ] -[[package]] -name = "arrow-buffer" -version = "35.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3759e4a52c593281184787af5435671dc8b1e78333e5a30242b2e2d6e3c9d1f" -dependencies = [ - "half 2.2.1", - "num", -] - [[package]] name = "arrow-cast" version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a7285272c9897321dfdba59de29f5b05aeafd3cdedf104a941256d155f6d304" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "arrow-select", "chrono", "lexical-core", @@ -217,11 +191,11 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "981ee4e7f6a120da04e00d0b39182e1eeacccb59c8da74511de753c56b7fddf7" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-data", + "arrow-schema", "chrono", "csv", "csv-core", @@ -236,20 +210,8 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27cc673ee6989ea6e4b4e8c7d461f7e06026a096c8f0b1a7288885ff71ae1e56" dependencies = [ - "arrow-buffer 34.0.0", - "arrow-schema 34.0.0", - "half 2.2.1", - "num", -] - -[[package]] -name = "arrow-data" -version = "35.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19c7787c6cdbf9539b1ffb860bfc18c5848926ec3d62cbd52dc3b1ea35c874fd" -dependencies = [ - "arrow-buffer 35.0.0", - "arrow-schema 35.0.0", + "arrow-buffer", + "arrow-schema", "half 2.2.1", "num", ] @@ -260,11 +222,11 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"bd16945f8f3be0f6170b8ced60d414e56239d91a16a3f8800bc1504bc58b2592" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", "arrow-ipc", - "arrow-schema 34.0.0", + "arrow-schema", "base64 0.21.0", "bytes", "futures", @@ -283,11 +245,11 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e37b8b69d9e59116b6b538e8514e0ec63a30f08b617ce800d31cb44e3ef64c1a" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-data", + "arrow-schema", "flatbuffers", ] @@ -297,11 +259,11 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80c3fa0bed7cfebf6d18e46b733f9cb8a1cb43ce8e6539055ca3e1e48a426266" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-data", + "arrow-schema", "chrono", "half 2.2.1", "indexmap", @@ -316,10 +278,10 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d247dce7bed6a8d6a3c6debfa707a3a2f694383f0c692a39d736a593eae5ef94" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "arrow-select", "num", ] @@ -331,10 +293,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d609c0181f963cea5c70fddf9a388595b5be441f3aa1d1cdbf728ca834bbd3a" dependencies = [ "ahash 0.8.3", - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "half 2.2.1", "hashbrown 0.13.2", ] @@ -345,22 +307,16 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64951898473bfb8e22293e83a44f02874d2257514d49cd95f9aa4afcff183fbc" -[[package]] -name = "arrow-schema" -version = "35.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf6b26f6a6f8410e3b9531cbd1886399b99842701da77d4b4cf2013f7708f20f" - [[package]] name = "arrow-select" version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a513d89c2e1ac22b28380900036cf1f3992c6443efc5e079de631dcf83c6888" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "num", ] @@ -370,10 +326,10 @@ version = "34.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5288979b2705dae1114c864d73150629add9153b9b8f1d7ee3963db94c372ba5" dependencies = [ - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", - "arrow-data 34.0.0", - "arrow-schema 34.0.0", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", "arrow-select", "regex", "regex-syntax", @@ -1497,7 +1453,7 @@ dependencies = [ [[package]] name = "datafusion" version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068" dependencies = [ "ahash 0.8.3", "arrow", @@ -1544,10 +1500,10 @@ dependencies = [ [[package]] name = 
"datafusion-common" version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068" dependencies = [ "arrow", - "arrow-array 35.0.0", + "arrow-array", "chrono", "num_cpus", "object_store", @@ -1558,7 +1514,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068" dependencies = [ "dashmap", "datafusion-common", @@ -1575,7 +1531,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068" dependencies = [ "ahash 0.8.3", "arrow", @@ -1586,7 +1542,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068" dependencies = [ "arrow", "async-trait", @@ -1603,12 +1559,12 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068" dependencies = [ "ahash 0.8.3", "arrow", - "arrow-buffer 34.0.0", - "arrow-schema 34.0.0", + "arrow-buffer", + "arrow-schema", "blake2", "blake3", "chrono", @@ -1633,7 +1589,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068" dependencies = [ "arrow", "chrono", @@ -1641,15 +1597,13 @@ dependencies = [ "datafusion-common", "datafusion-expr", "object_store", - "pbjson-build", "prost", - "prost-build", ] [[package]] name = "datafusion-row" version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068" dependencies = [ "arrow", "datafusion-common", @@ -1660,9 +1614,9 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "21.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba" +source = 
"git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068" dependencies = [ - "arrow-schema 34.0.0", + "arrow-schema", "datafusion-common", "datafusion-expr", "log", @@ -4135,12 +4089,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ac135ecf63ebb5f53dda0921b0b76d6048b3ef631a5f4760b9e8f863ff00cfa" dependencies = [ "ahash 0.8.3", - "arrow-array 34.0.0", - "arrow-buffer 34.0.0", + "arrow-array", + "arrow-buffer", "arrow-cast", - "arrow-data 34.0.0", + "arrow-data", "arrow-ipc", - "arrow-schema 34.0.0", + "arrow-schema", "arrow-select", "base64 0.21.0", "brotli", @@ -6848,6 +6802,7 @@ version = "0.1.0" dependencies = [ "ahash 0.8.3", "arrow", + "arrow-array", "arrow-flight", "arrow-ord", "arrow-string", diff --git a/Cargo.toml b/Cargo.toml index 1af05f976c..3b8bb791bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,8 +121,8 @@ license = "MIT OR Apache-2.0" [workspace.dependencies] arrow = { version = "34.0.0" } arrow-flight = { version = "34.0.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="74c3955db48f7ef6458125100eed3999512a56ba", default-features = false } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="74c3955db48f7ef6458125100eed3999512a56ba" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="f30671760285f242950437c3c0f520ef418c1068", default-features = false } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="f30671760285f242950437c3c0f520ef418c1068" } hashbrown = { version = "0.13.2" } parquet = { version = "34.0.0" } diff --git a/iox_query/src/frontend/common.rs b/iox_query/src/frontend/common.rs index c1b1b4fbe6..55a32e0f5b 100644 --- a/iox_query/src/frontend/common.rs +++ b/iox_query/src/frontend/common.rs @@ -1,9 +1,8 @@ use std::sync::Arc; use datafusion::{ - catalog::TableReference, - datasource::provider_as_source, - logical_expr::{expr_rewriter::ExprRewritable, LogicalPlanBuilder}, + catalog::TableReference, common::tree_node::TreeNode, datasource::provider_as_source, + logical_expr::LogicalPlanBuilder, }; use observability_deps::tracing::trace; use predicate::Predicate; diff --git a/iox_query/src/logical_optimizer/handle_gapfill.rs b/iox_query/src/logical_optimizer/handle_gapfill.rs index 98b9eef720..a36e1d631b 100644 --- a/iox_query/src/logical_optimizer/handle_gapfill.rs +++ b/iox_query/src/logical_optimizer/handle_gapfill.rs @@ -5,12 +5,10 @@ mod range_predicate; use crate::exec::gapfill::{FillStrategy, GapFill, GapFillParams}; use datafusion::{ + common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter, VisitRecursion}, error::{DataFusionError, Result}, logical_expr::{ - expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion}, - expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}, - utils::expr_to_columns, - Aggregate, BuiltinScalarFunction, Extension, LogicalPlan, + utils::expr_to_columns, Aggregate, BuiltinScalarFunction, Extension, LogicalPlan, }, optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule}, prelude::{col, Expr}, @@ -305,7 +303,8 @@ struct DateBinGapfillRewriter { args: Option>, } -impl ExprRewriter for DateBinGapfillRewriter { +impl TreeNodeRewriter for DateBinGapfillRewriter { + type N = Expr; fn pre_visit(&mut self, expr: &Expr) -> Result { match expr { Expr::ScalarUDF { fun, .. 
} if fun.name == DATE_BIN_GAPFILL_UDF_NAME => { @@ -330,23 +329,19 @@ impl ExprRewriter for DateBinGapfillRewriter { } fn count_date_bin_gapfill(e: &Expr) -> Result<usize> { - struct Finder { - count: usize, - } - impl ExpressionVisitor for Finder { - fn pre_visit(mut self, expr: &Expr) -> Result<Recursion<Self>> { - match expr { - Expr::ScalarUDF { fun, .. } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => { - self.count += 1; - } - _ => (), - }; - Ok(Recursion::Continue(self)) - } - } - let f = Finder { count: 0 }; - let f = e.accept(f)?; - Ok(f.count) + let mut count = 0; + e.apply(&mut |expr| { + match expr { + Expr::ScalarUDF { fun, .. } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => { + count += 1; + } + _ => (), + }; + Ok(VisitRecursion::Continue) + }) + .expect("no errors"); + + Ok(count) } fn check_node(node: &LogicalPlan) -> Result<()> {
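The rewritten `count_date_bin_gapfill` above shows the general recipe for replacing an `ExpressionVisitor` with the closure-based `TreeNode::apply`: mutable state lives in the enclosing scope instead of being threaded through `self`. A minimal, self-contained sketch of the same pattern against the `datafusion::common::tree_node` API at the revision this PR pins (the `count_columns` helper is illustrative, not part of this change):

```rust
use datafusion::common::tree_node::{TreeNode, VisitRecursion};
use datafusion::error::Result;
use datafusion::prelude::{col, lit, Expr};

/// Count the `Expr::Column` nodes in an expression tree by walking it
/// with `TreeNode::apply`; the closure borrows `count` mutably, which the
/// old move-through-`self` `ExpressionVisitor` protocol could not do.
fn count_columns(e: &Expr) -> Result<usize> {
    let mut count = 0;
    e.apply(&mut |expr| {
        if matches!(expr, Expr::Column(_)) {
            count += 1;
        }
        // Keep descending into the whole tree.
        Ok(VisitRecursion::Continue)
    })?;
    Ok(count)
}

fn main() -> Result<()> {
    let expr = col("a").eq(lit(1)).and(col("b").gt(lit(2)));
    assert_eq!(count_columns(&expr)?, 2);
    Ok(())
}
```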
diff --git a/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs b/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs index 1702df41b4..20a0cdeb68 100644 --- a/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs +++ b/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs @@ -2,9 +2,12 @@ use std::ops::{Bound, Range}; use datafusion::{ - common::DFSchema, - error::{DataFusionError, Result}, - logical_expr::{Between, BinaryExpr, LogicalPlan, Operator, PlanVisitor}, + common::{ + tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}, + DFSchema, + }, + error::Result, + logical_expr::{Between, BinaryExpr, LogicalPlan, Operator}, optimizer::utils::split_conjunction, prelude::{Column, Expr}, }; @@ -16,7 +19,7 @@ pub(super) fn find_time_range(plan: &LogicalPlan, time_col: &Column) -> Result<Range<Bound<Expr>>> { -impl PlanVisitor for TimeRangeVisitor { - fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, DataFusionError> { +impl TreeNodeVisitor for TimeRangeVisitor { + type N = LogicalPlan; + fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<VisitRecursion> { match plan { LogicalPlan::Projection(p) => { let idx = p.schema.index_of_column(&self.col)?; match unwrap_alias(&p.expr[idx]) { Expr::Column(ref c) => { self.col = c.clone(); - Ok(true) + Ok(VisitRecursion::Continue) } - _ => Ok(false), + _ => Ok(VisitRecursion::Stop), } } LogicalPlan::Filter(f) => { @@ -48,15 +51,15 @@ impl PlanVisitor for TimeRangeVisitor { range.with_expr(f.input.schema().as_ref(), &self.col, expr) })?; self.range = range; - Ok(true) + Ok(VisitRecursion::Continue) } // These nodes do not alter their schema, so we can recurse through them LogicalPlan::Sort(_) | LogicalPlan::Repartition(_) | LogicalPlan::Limit(_) - | LogicalPlan::Distinct(_) => Ok(true), + | LogicalPlan::Distinct(_) => Ok(VisitRecursion::Continue), // At some point we may wish to handle joins here too. - _ => Ok(false), + _ => Ok(VisitRecursion::Stop), } } } diff --git a/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs b/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs index 0cc586d843..75ea9f92a6 100644 --- a/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs +++ b/iox_query/src/logical_optimizer/influx_regex_to_datafusion_regex.rs @@ -1,7 +1,7 @@ use datafusion::{ - common::DFSchema, + common::{tree_node::TreeNodeRewriter, DFSchema}, error::DataFusionError, - logical_expr::{expr_rewriter::ExprRewriter, utils::from_plan, LogicalPlan, Operator}, + logical_expr::{utils::from_plan, LogicalPlan, Operator}, optimizer::{utils::rewrite_preserving_name, OptimizerConfig, OptimizerRule}, prelude::{binary_expr, lit, Expr}, scalar::ScalarValue, @@ -67,7 +67,9 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan, DataFusionError> { from_plan(plan, new_exprs.as_slice(), new_inputs.as_slice()) } -impl ExprRewriter for InfluxRegexToDataFusionRegex { +impl TreeNodeRewriter for InfluxRegexToDataFusionRegex { + type N = Expr; + fn mutate(&mut self, expr: Expr) -> Result<Expr, DataFusionError> { match expr { Expr::ScalarUDF { fun, mut args } => { diff --git a/iox_query/src/physical_optimizer/chunk_extraction.rs b/iox_query/src/physical_optimizer/chunk_extraction.rs index 1279c79392..a43b088611 100644 --- a/iox_query/src/physical_optimizer/chunk_extraction.rs +++ b/iox_query/src/physical_optimizer/chunk_extraction.rs @@ -172,7 +172,8 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use data_types::ChunkId; use datafusion::{ - physical_plan::{expressions::Literal, filter::FilterExec, tree_node::TreeNodeRewritable}, + common::tree_node::{Transformed, TreeNode}, + physical_plan::{expressions::Literal, filter::FilterExec}, prelude::{col, lit}, scalar::ScalarValue, }; @@ -347,9 +348,9 @@ mod tests { Some(Arc::new(Literal::new(ScalarValue::from(false)))), None, ); - return Ok(Some(Arc::new(exec))); + return Ok(Transformed::Yes(Arc::new(exec))); } - Ok(None) + Ok(Transformed::No(plan)) }) .unwrap(); assert!(extract_chunks(plan.as_ref()).is_none()); diff --git a/iox_query/src/physical_optimizer/combine_chunks.rs b/iox_query/src/physical_optimizer/combine_chunks.rs index bc06e2d2f2..2840250865 100644 --- a/iox_query/src/physical_optimizer/combine_chunks.rs +++ b/iox_query/src/physical_optimizer/combine_chunks.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan}, + physical_plan::ExecutionPlan, }; use predicate::Predicate; @@ -35,7 +36,7 @@ impl PhysicalOptimizerRule for CombineChunks { ) -> Result<Arc<dyn ExecutionPlan>> { plan.transform_up(&|plan| { if let Some((schema, chunks, output_sort_key)) = extract_chunks(plan.as_ref()) { - return Ok(Some(chunks_to_physical_nodes( + return Ok(Transformed::Yes(chunks_to_physical_nodes( &schema, output_sort_key.as_ref(), chunks, @@ -44,7 +45,7 @@ ))); } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs index 3d36ed7819..609eb6272c 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs @@ -1,10 +1,11 @@ use std::{collections::HashSet, sync::Arc}; use datafusion::{ + common::tree_node::{Transformed, TreeNode},
config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan}, + physical_plan::ExecutionPlan, }; use predicate::Predicate; use schema::{sort::SortKeyBuilder, TIME_COLUMN_NAME}; @@ -41,7 +42,7 @@ impl PhysicalOptimizerRule for DedupNullColumns { assert_eq!(children.len(), 1); let child = children.remove(0); let Some((schema, chunks, _output_sort_key)) = extract_chunks(child.as_ref()) else { - return Ok(None); + return Ok(Transformed::No(plan)); }; let pk_cols = dedup_exec.sort_columns(); @@ -73,14 +74,14 @@ impl PhysicalOptimizerRule for DedupNullColumns { ); let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema); - return Ok(Some(Arc::new(DeduplicateExec::new( + return Ok(Transformed::Yes(Arc::new(DeduplicateExec::new( child, sort_exprs, dedup_exec.use_chunk_order_col(), )))); } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs index 87f713b0ba..cc42e7ef43 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs @@ -2,10 +2,11 @@ use std::{cmp::Reverse, sync::Arc}; use arrow::compute::SortOptions; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan}, + physical_plan::ExecutionPlan, }; use indexmap::IndexSet; use predicate::Predicate; @@ -58,7 +59,7 @@ impl PhysicalOptimizerRule for DedupSortOrder { assert_eq!(children.len(), 1); let child = children.remove(0); let Some((schema, chunks, _output_sort_key)) = extract_chunks(child.as_ref()) else { - return Ok(None); + return Ok(Transformed::No(plan)) }; let mut chunk_sort_keys: Vec> = chunks @@ -135,14 +136,14 @@ impl PhysicalOptimizerRule for DedupSortOrder { ); let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &schema); - return Ok(Some(Arc::new(DeduplicateExec::new( + return Ok(Transformed::Yes(Arc::new(DeduplicateExec::new( child, sort_exprs, dedup_exec.use_chunk_order_col(), )))); } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/dedup/partition_split.rs b/iox_query/src/physical_optimizer/dedup/partition_split.rs index cbc562e6f0..b4dd530803 100644 --- a/iox_query/src/physical_optimizer/dedup/partition_split.rs +++ b/iox_query/src/physical_optimizer/dedup/partition_split.rs @@ -2,10 +2,11 @@ use std::sync::Arc; use data_types::PartitionId; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, + physical_plan::{union::UnionExec, ExecutionPlan}, }; use hashbrown::HashMap; use observability_deps::tracing::warn; @@ -38,7 +39,7 @@ impl PhysicalOptimizerRule for PartitionSplit { assert_eq!(children.len(), 1); let child = children.remove(0); let Some((schema, chunks, output_sort_key)) = extract_chunks(child.as_ref()) else { - return Ok(None); + return Ok(Transformed::No(plan)); }; let mut chunks_by_partition: HashMap<PartitionId, Vec<Arc<dyn QueryChunk>>> = @@ -53,7 +54,7 @@ impl PhysicalOptimizerRule for PartitionSplit { // If there are not multiple partitions (0 or 1), then this optimizer is a no-op. Signal that to the // optimizer framework.
if chunks_by_partition.len() < 2 { - return Ok(None); + return Ok(Transformed::No(plan)); } // Protect against degenerative plans @@ -69,7 +70,7 @@ impl PhysicalOptimizerRule for PartitionSplit { max_dedup_partition_split, "cannot split dedup operation based on partition, too many partitions" ); - return Ok(None); + return Ok(Transformed::No(plan)); } // ensure deterministic order @@ -94,10 +95,10 @@ impl PhysicalOptimizerRule for PartitionSplit { }) .collect(), ); - return Ok(Some(Arc::new(out))); + return Ok(Transformed::Yes(Arc::new(out))); } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs index 60bd73b515..1ac473f7ee 100644 --- a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs +++ b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan}, + physical_plan::ExecutionPlan, }; use predicate::Predicate; @@ -31,11 +32,11 @@ impl PhysicalOptimizerRule for RemoveDedup { assert_eq!(children.len(), 1); let child = children.remove(0); let Some((schema, chunks, output_sort_key)) = extract_chunks(child.as_ref()) else { - return Ok(None); + return Ok(Transformed::No(plan)); }; if (chunks.len() < 2) && chunks.iter().all(|c| !c.may_contain_pk_duplicates()) { - return Ok(Some(chunks_to_physical_nodes( + return Ok(Transformed::Yes(chunks_to_physical_nodes( &schema, output_sort_key.as_ref(), chunks, @@ -45,7 +46,7 @@ impl PhysicalOptimizerRule for RemoveDedup { } } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/dedup/time_split.rs b/iox_query/src/physical_optimizer/dedup/time_split.rs index 61f9aa4022..8f92899f20 100644 --- a/iox_query/src/physical_optimizer/dedup/time_split.rs +++ b/iox_query/src/physical_optimizer/dedup/time_split.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, + physical_plan::{union::UnionExec, ExecutionPlan}, }; use observability_deps::tracing::warn; use predicate::Predicate; @@ -37,14 +38,14 @@ impl PhysicalOptimizerRule for TimeSplit { assert_eq!(children.len(), 1); let child = children.remove(0); let Some((schema, chunks, output_sort_key)) = extract_chunks(child.as_ref()) else { - return Ok(None); + return Ok(Transformed::No(plan)); }; let groups = group_potential_duplicates(chunks); // if there are no chunks or there is only one group, we don't need to split if groups.len() < 2 { - return Ok(None); + return Ok(Transformed::No(plan)); } // Protect against degenerative plans @@ -60,7 +61,7 @@ impl PhysicalOptimizerRule for TimeSplit { max_dedup_time_split, "cannot split dedup operation based on time overlaps, too many groups" ); - return Ok(None); + return Ok(Transformed::No(plan)); } let out = UnionExec::new( @@ -81,10 +82,10 @@ impl PhysicalOptimizerRule for TimeSplit { }) .collect(), ); - return Ok(Some(Arc::new(out))); + return Ok(Transformed::Yes(Arc::new(out))); } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/predicate_pushdown.rs b/iox_query/src/physical_optimizer/predicate_pushdown.rs index 
0386a8f337..2da2ec9869 100644 --- a/iox_query/src/physical_optimizer/predicate_pushdown.rs +++ b/iox_query/src/physical_optimizer/predicate_pushdown.rs @@ -1,21 +1,17 @@ use std::{collections::HashSet, sync::Arc}; use datafusion::{ + common::tree_node::{RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter}, config::ConfigOptions, error::{DataFusionError, Result}, logical_expr::Operator, - physical_expr::{ - rewrite::{RewriteRecursion, TreeNodeRewriter}, - split_conjunction, - utils::collect_columns, - }, + physical_expr::{split_conjunction, utils::collect_columns}, physical_optimizer::PhysicalOptimizerRule, physical_plan::{ empty::EmptyExec, expressions::{BinaryExpr, Column}, file_format::ParquetExec, filter::FilterExec, - tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan, PhysicalExpr, }, @@ -44,7 +40,7 @@ impl PhysicalOptimizerRule for PredicatePushdown { let child_any = child.as_any(); if let Some(child_empty) = child_any.downcast_ref::<EmptyExec>() { if !child_empty.produce_one_row() { - return Ok(Some(child)); + return Ok(Transformed::Yes(child)); } } else if let Some(child_union) = child_any.downcast_ref::<UnionExec>() { let new_inputs = child_union @@ -59,7 +55,7 @@ }) .collect::<Result<Vec<_>>>()?; let new_union = UnionExec::new(new_inputs); - return Ok(Some(Arc::new(new_union))); + return Ok(Transformed::Yes(Arc::new(new_union))); } else if let Some(child_parquet) = child_any.downcast_ref::<ParquetExec>() { let existing = child_parquet .predicate() @@ -80,7 +76,7 @@ None, )), )?); - return Ok(Some(new_node)); + return Ok(Transformed::Yes(new_node)); } else if let Some(child_dedup) = child_any.downcast_ref::<DeduplicateExec>() { let dedup_cols = child_dedup.sort_columns(); let (pushdown, no_pushdown): (Vec<_>, Vec<_>) = @@ -112,12 +108,12 @@ new_node, )?); } - return Ok(Some(new_node)); + return Ok(Transformed::Yes(new_node)); } } } - Ok(None) + Ok(Transformed::No(plan)) }) } @@ -135,7 +131,9 @@ struct ColumnCollector { cols: HashSet, } -impl TreeNodeRewriter<Arc<dyn PhysicalExpr>> for ColumnCollector { +impl TreeNodeRewriter for ColumnCollector { + type N = Arc<dyn PhysicalExpr>; + fn pre_visit( &mut self, node: &Arc<dyn PhysicalExpr>,
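The physical-optimizer rules in this patch all follow the same mechanical translation: the old `TreeNodeRewritable::transform_up` closure returned `Option<Arc<dyn ExecutionPlan>>`, while the new `TreeNode::transform_up` returns `Transformed<Arc<dyn ExecutionPlan>>`, so the closure must now hand the unchanged plan back instead of returning `None`. A toy rule in the same shape, sketched against the pinned DataFusion revision (`strip_coalesce` is illustrative only, not part of this change):

```rust
use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::physical_plan::{coalesce_partitions::CoalescePartitionsExec, ExecutionPlan};

/// Demo rule: replace every `CoalescePartitionsExec` with its input.
/// `Transformed::Yes` signals a rewrite; `Transformed::No` returns
/// ownership of the untouched node, replacing the old `Ok(None)`.
fn strip_coalesce(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(&|plan| {
        if let Some(coalesce) = plan.as_any().downcast_ref::<CoalescePartitionsExec>() {
            return Ok(Transformed::Yes(Arc::clone(coalesce.input())));
        }
        Ok(Transformed::No(plan))
    })
}
```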
diff --git a/iox_query/src/physical_optimizer/projection_pushdown.rs b/iox_query/src/physical_optimizer/projection_pushdown.rs index a465d6af05..04c94589ad 100644 --- a/iox_query/src/physical_optimizer/projection_pushdown.rs +++ b/iox_query/src/physical_optimizer/projection_pushdown.rs @@ -5,6 +5,7 @@ use std::{ use arrow::datatypes::SchemaRef; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::{DataFusionError, Result}, physical_expr::{ @@ -19,7 +20,6 @@ use datafusion::{ filter::FilterExec, projection::ProjectionExec, sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, - tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan, PhysicalExpr, }, @@ -53,11 +53,11 @@ impl PhysicalOptimizerRule for ProjectionPushdown { column_names.push(output_name.as_str()); } else { // don't bother w/ renames - return Ok(None); + return Ok(Transformed::No(plan)); } } else { // don't bother to deal w/ calculation within projection nodes - return Ok(None); + return Ok(Transformed::No(plan)); } } @@ -67,7 +67,7 @@ child_empty.produce_one_row(), Arc::new(child_empty.schema().project(&column_indices)?), ); - return Ok(Some(Arc::new(new_child))); + return Ok(Transformed::Yes(Arc::new(new_child))); } else if let Some(child_union) = child_any.downcast_ref::<UnionExec>() { let new_inputs = child_union .inputs() @@ -81,7 +81,7 @@ }) .collect::<Result<Vec<_>>>()?; let new_union = UnionExec::new(new_inputs); - return Ok(Some(Arc::new(new_union))); + return Ok(Transformed::Yes(Arc::new(new_union))); } else if let Some(child_parquet) = child_any.downcast_ref::<ParquetExec>() { let projection = match child_parquet.base_config().projection.as_ref() { Some(projection) => column_indices @@ -100,7 +100,7 @@ }; let new_child = ParquetExec::new(base_config, child_parquet.predicate().cloned(), None); - return Ok(Some(Arc::new(new_child))); + return Ok(Transformed::Yes(Arc::new(new_child))); } else if let Some(child_filter) = child_any.downcast_ref::<FilterExec>() { let filter_required_cols = collect_columns(child_filter.predicate()); let filter_required_cols = filter_required_cols @@ -124,7 +124,7 @@ }, )?; - return Ok(Some(plan)); + return Ok(Transformed::Yes(plan)); } else if let Some(child_sort) = child_any.downcast_ref::<SortExec>() { let sort_required_cols = child_sort .expr() @@ -150,7 +150,7 @@ }, )?; - return Ok(Some(plan)); + return Ok(Transformed::Yes(plan)); } else if let Some(child_sort) = child_any.downcast_ref::<SortPreservingMergeExec>() { let sort_required_cols = child_sort @@ -176,7 +176,7 @@ }, )?; - return Ok(Some(plan)); + return Ok(Transformed::Yes(plan)); } else if let Some(child_proj) = child_any.downcast_ref::<ProjectionExec>() { let expr = column_indices .iter() @@ -191,7 +191,7 @@ // and miss the optimization of that particular new ProjectionExec let plan = self.optimize(plan, config)?; - return Ok(Some(plan)); + return Ok(Transformed::Yes(plan)); } else if let Some(child_dedup) = child_any.downcast_ref::<DeduplicateExec>() { let dedup_required_cols = child_dedup.sort_columns(); @@ -216,7 +216,7 @@ }, )?; - return Ok(Some(plan)); + return Ok(Transformed::Yes(plan)); } else if let Some(child_recordbatches) = child_any.downcast_ref::<RecordBatchesExec>() { @@ -225,11 +225,11 @@ Arc::new(child_recordbatches.schema().project(&column_indices)?), child_recordbatches.output_sort_key_memo().cloned(), ); - return Ok(Some(Arc::new(new_child))); + return Ok(Transformed::Yes(Arc::new(new_child))); } } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/sort/redundant_sort.rs b/iox_query/src/physical_optimizer/sort/redundant_sort.rs index 01eb7a5578..4edefebb72 100644 --- a/iox_query/src/physical_optimizer/sort/redundant_sort.rs +++ b/iox_query/src/physical_optimizer/sort/redundant_sort.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{sorts::sort::SortExec, tree_node::TreeNodeRewritable, ExecutionPlan}, + physical_plan::{sorts::sort::SortExec, ExecutionPlan}, }; /// Removes [`SortExec`] if it is no longer needed.
@@ -24,11 +25,11 @@ impl PhysicalOptimizerRule for RedundantSort { let child = sort_exec.input(); if child.output_ordering() == Some(sort_exec.expr()) { - return Ok(Some(Arc::clone(child))); + return Ok(Transformed::Yes(Arc::clone(child))); } } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/sort/sort_pushdown.rs b/iox_query/src/physical_optimizer/sort/sort_pushdown.rs index f1fef8328d..7f820d7740 100644 --- a/iox_query/src/physical_optimizer/sort/sort_pushdown.rs +++ b/iox_query/src/physical_optimizer/sort/sort_pushdown.rs @@ -1,12 +1,11 @@ use std::sync::Arc; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{ - sorts::sort::SortExec, tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan, - }, + physical_plan::{sorts::sort::SortExec, union::UnionExec, ExecutionPlan}, }; /// Pushes [`SortExec`] closer to the data source. @@ -44,11 +43,11 @@ impl PhysicalOptimizerRule for SortPushdown { }) .collect::<Result<Vec<_>>>()?, ); - return Ok(Some(Arc::new(new_union))); + return Ok(Transformed::Yes(Arc::new(new_union))); } } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/union/nested_union.rs b/iox_query/src/physical_optimizer/union/nested_union.rs index 97961217fa..4b12cdf49a 100644 --- a/iox_query/src/physical_optimizer/union/nested_union.rs +++ b/iox_query/src/physical_optimizer/union/nested_union.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, + physical_plan::{union::UnionExec, ExecutionPlan}, }; /// Optimizer that replaces nested [`UnionExec`]s with a single level. @@ -51,11 +52,11 @@ impl PhysicalOptimizerRule for NestedUnion { } if found_union { - return Ok(Some(Arc::new(UnionExec::new(children_new)))); + return Ok(Transformed::Yes(Arc::new(UnionExec::new(children_new)))); } } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/physical_optimizer/union/one_union.rs b/iox_query/src/physical_optimizer/union/one_union.rs index fc52130704..311ab1e831 100644 --- a/iox_query/src/physical_optimizer/union/one_union.rs +++ b/iox_query/src/physical_optimizer/union/one_union.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use datafusion::{ + common::tree_node::{Transformed, TreeNode}, config::ConfigOptions, error::Result, physical_optimizer::PhysicalOptimizerRule, - physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan}, + physical_plan::{union::UnionExec, ExecutionPlan}, }; /// Optimizer that replaces a [`UnionExec`] that has a single child node w/ the child node itself.
@@ -33,11 +34,11 @@ impl PhysicalOptimizerRule for OneUnion { if let Some(union_exec) = plan_any.downcast_ref::<UnionExec>() { let mut children = union_exec.children(); if children.len() == 1 { - return Ok(Some(children.remove(0))); + return Ok(Transformed::Yes(children.remove(0))); } } - Ok(None) + Ok(Transformed::No(plan)) }) } diff --git a/iox_query/src/util.rs b/iox_query/src/util.rs index 3167cdacdf..2996026876 100644 --- a/iox_query/src/util.rs +++ b/iox_query/src/util.rs @@ -18,13 +18,11 @@ use data_types::{ }; use datafusion::{ self, - common::{DFSchema, ToDFSchema}, + common::{tree_node::TreeNodeRewriter, DFSchema, ToDFSchema}, datasource::{provider_as_source, MemTable}, error::{DataFusionError, Result as DatafusionResult}, execution::context::ExecutionProps, - logical_expr::{ - expr_rewriter::ExprRewriter, BinaryExpr, ExprSchemable, LogicalPlan, LogicalPlanBuilder, - }, + logical_expr::{BinaryExpr, ExprSchemable, LogicalPlan, LogicalPlanBuilder}, optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}, physical_expr::create_physical_expr, physical_plan::{ @@ -205,7 +203,9 @@ impl<'a> MissingColumnsToNull<'a> { } } -impl<'a> ExprRewriter for MissingColumnsToNull<'a> { +impl<'a> TreeNodeRewriter for MissingColumnsToNull<'a> { + type N = Expr; + fn mutate(&mut self, expr: Expr) -> DatafusionResult<Expr> { // Ideally this would simply find all Expr::Columns and // replace them with a constant NULL value. However, doing so @@ -358,7 +358,7 @@ pub fn create_basic_summary( mod tests { use arrow::datatypes::DataType; use datafusion::{ - logical_expr::expr_rewriter::ExprRewritable, + common::tree_node::TreeNode, prelude::{col, lit}, scalar::ScalarValue, }; diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index e54137369a..2158e78d4b 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -14,8 +14,9 @@ use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_t use arrow::datatypes::DataType; use chrono_tz::Tz; use datafusion::catalog::TableReference; +use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter}; use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, ToDFSchema}; -use datafusion::logical_expr::expr_rewriter::{normalize_col, ExprRewritable, ExprRewriter}; +use datafusion::logical_expr::expr_rewriter::normalize_col; use datafusion::logical_expr::logical_plan::builder::project; use datafusion::logical_expr::logical_plan::Analyze; use datafusion::logical_expr::utils::{expr_as_column_expr, find_aggregate_exprs}; @@ -1049,7 +1050,9 @@ struct FixRegularExpressions<'a> { schemas: &'a Schemas, } -impl<'a> ExprRewriter for FixRegularExpressions<'a> { +impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> { + type N = Expr; + fn mutate(&mut self, expr: Expr) -> Result<Expr> { match expr { // InfluxQL evaluates regular expression conditions to false if the column is numeric
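For the struct-based expression rewriters above (`MissingColumnsToNull`, `FixRegularExpressions`), the migration is essentially only the trait bound: `ExprRewriter` becomes `TreeNodeRewriter` with an associated `type N = Expr`, and the `mutate` body stays as it was. A compilable sketch of that shape against the pinned revision (`ZeroToOne` is a made-up rewriter for illustration):

```rust
use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
use datafusion::error::Result;
use datafusion::prelude::{col, lit, Expr};
use datafusion::scalar::ScalarValue;

/// Rewrites every literal `0_i64` to `1_i64`; `mutate` is applied
/// bottom-up, exactly like the old `ExprRewriter::mutate`.
struct ZeroToOne;

impl TreeNodeRewriter for ZeroToOne {
    type N = Expr;

    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
        Ok(match expr {
            Expr::Literal(ScalarValue::Int64(Some(0))) => lit(1_i64),
            expr => expr,
        })
    }
}

fn main() -> Result<()> {
    let rewritten = col("a").eq(lit(0_i64)).rewrite(&mut ZeroToOne)?;
    assert_eq!(rewritten, col("a").eq(lit(1_i64)));
    Ok(())
}
```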
diff --git a/iox_query_influxql/src/plan/planner_rewrite_expression.rs b/iox_query_influxql/src/plan/planner_rewrite_expression.rs index 1e3bf8f1e6..f4128f8789 100644 --- a/iox_query_influxql/src/plan/planner_rewrite_expression.rs +++ b/iox_query_influxql/src/plan/planner_rewrite_expression.rs @@ -123,43 +123,46 @@ //! [`Eval`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4137 use crate::plan::util::Schemas; use arrow::datatypes::DataType; +use datafusion::common::tree_node::{Transformed, TreeNode}; use datafusion::common::{Result, ScalarValue}; -use datafusion::logical_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; use datafusion::logical_expr::{ binary_expr, cast, coalesce, lit, BinaryExpr, Expr, ExprSchemable, Operator, }; /// Rewrite the expression tree and return a boolean result. pub(in crate::plan) fn rewrite_conditional(expr: Expr, schemas: &Schemas) -> Result<Expr> { - let expr = expr.rewrite(&mut RewriteAndCoerce { schemas })?; + let expr = rewrite_expr(expr, schemas)?; Ok(match expr { Expr::Literal(ScalarValue::Null) => lit(false), _ => expr, }) } +/// The expression was rewritten +fn yes(expr: Expr) -> Result<Transformed<Expr>> { + Ok(Transformed::Yes(expr)) +} + +/// The expression was not rewritten +fn no(expr: Expr) -> Result<Transformed<Expr>> { + Ok(Transformed::No(expr)) +} + /// Rewrite the expression tree and return a result or `NULL` if some of the operands are /// incompatible. -pub(in crate::plan) fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> { - expr.rewrite(&mut RewriteAndCoerce { schemas }) -} - +/// /// Rewrite and coerce the expression tree to model the behavior /// of an InfluxQL query. -struct RewriteAndCoerce<'a> { - schemas: &'a Schemas, -} - -impl<'a> ExprRewriter for RewriteAndCoerce<'a> { - fn mutate(&mut self, expr: Expr) -> Result<Expr> { +pub(in crate::plan) fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> { + expr.transform(&|expr| { match expr { Expr::BinaryExpr(BinaryExpr { ref left, op, ref right, }) => { - let lhs_type = left.get_type(&self.schemas.df_schema)?; - let rhs_type = right.get_type(&self.schemas.df_schema)?; + let lhs_type = left.get_type(&schemas.df_schema)?; + let rhs_type = right.get_type(&schemas.df_schema)?; match (lhs_type, op, rhs_type) { // @@ -183,7 +186,7 @@ DataType::Null, _, DataType::Null - ) => Ok(lit(ScalarValue::Null)), + ) => yes(lit(ScalarValue::Null)), // NULL using AND or OR is rewritten as `false`, which the optimiser // may short circuit. @@ -191,12 +194,12 @@ DataType::Null, Operator::Or | Operator::And, _ - ) => Ok(binary_expr(lit(false), op, (**right).clone())), + ) => yes(binary_expr(lit(false), op, (**right).clone())), ( _, Operator::Or | Operator::And, DataType::Null - ) => Ok(binary_expr((**left).clone(), op, lit(false))), + ) => yes(binary_expr((**left).clone(), op, lit(false))), // NULL with other operators is passed through to DataFusion, which is expected to // evaluate to false. @@ -209,10 +212,10 @@ _, Operator::Eq | Operator::NotEq | Operator::Gt | Operator::Lt | Operator::GtEq | Operator::LtEq, DataType::Null - ) => Ok(expr), + ) => no(expr), // Any other operations with NULL should return false - (DataType::Null, ..) | (.., DataType::Null) => Ok(lit(false)), + (DataType::Null, ..)
| (.., DataType::Null) => yes(lit(false)), // // Boolean types // @@ -222,7 +225,7 @@ DataType::Boolean, Operator::And | Operator::Or | Operator::Eq | Operator::NotEq | Operator::BitwiseAnd | Operator::BitwiseXor | Operator::BitwiseOr, DataType::Boolean, - ) => Ok(rewrite_boolean((**left).clone(), op, (**right).clone())), + ) => yes(rewrite_boolean((**left).clone(), op, (**right).clone())), // // Numeric types // @@ -247,16 +250,16 @@ // implementations, however, InfluxQL coalesces the result to `0`. // See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4268-L4270 - Operator::Divide => Ok(coalesce(vec![expr, lit(0_f64)])), - _ => Ok(expr), + Operator::Divide => yes(coalesce(vec![expr, lit(0_f64)])), + _ => no(expr), }, // // If either of the types is UInt64 and the other is UInt64 or Int64 // (DataType::UInt64, ..) | (.., DataType::UInt64) => match op { - Operator::Divide => Ok(coalesce(vec![expr, lit(0_u64)])), - _ => Ok(expr), + Operator::Divide => yes(coalesce(vec![expr, lit(0_u64)])), + _ => no(expr), } // // Finally, if both sides are Int64 // @@ -269,8 +272,8 @@ // Like Float64, dividing by zero should return 0 for InfluxQL // // See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4338-L4340 - Operator::Divide => Ok(coalesce(vec![expr, lit(0_i64)])), - _ => Ok(expr), + Operator::Divide => yes(coalesce(vec![expr, lit(0_i64)])), + _ => no(expr), }, // @@ -282,13 +285,13 @@ DataType::Utf8, Operator::Eq | Operator::NotEq | Operator::RegexMatch | Operator::RegexNotMatch | Operator::StringConcat, DataType::Utf8 - ) => Ok(expr), + ) => no(expr), // Rewrite the + operator to the string-concatenation operator ( DataType::Utf8, Operator::Plus, DataType::Utf8 - ) => Ok(binary_expr((**left).clone(), Operator::StringConcat, (**right).clone())), + ) => yes(binary_expr((**left).clone(), Operator::StringConcat, (**right).clone())), // // Dictionary (tag column) is treated the same as Utf8 // @@ -302,7 +305,7 @@ DataType::Utf8, Operator::Eq | Operator::NotEq | Operator::RegexMatch | Operator::RegexNotMatch | Operator::StringConcat, DataType::Dictionary(..) - ) => Ok(expr), + ) => no(expr), ( DataType::Dictionary(..), Operator::Plus, @@ -312,13 +315,13 @@ DataType::Utf8, Operator::Plus, DataType::Dictionary(..) - ) => Ok(expr), + ) => no(expr), // // Timestamp (time-range) expressions should pass through to DataFusion. // - (DataType::Timestamp(..), ..) => Ok(expr), - (.., DataType::Timestamp(..)) => Ok(expr), + (DataType::Timestamp(..), ..) => no(expr), + (.., DataType::Timestamp(..)) => no(expr), // // Unhandled binary expressions with conditional operators // @@ -337,13 +340,13 @@ | Operator::And | Operator::Or, _ - ) => Ok(lit(false)), + ) => yes(lit(false)), // // Everything else should result in `NULL`.
// // See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4558 - _ => Ok(lit(ScalarValue::Null)), + _ => yes(lit(ScalarValue::Null)), } } // @@ -351,9 +354,9 @@ // as it will handle evaluating function calls, etc // // See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4638-L4647 - _ => Ok(expr), + _ => no(expr), } - } + }) } /// Rewrite conditional operators to `false` and any diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index 824a9b8660..171232b708 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -22,13 +22,9 @@ use arrow::{ }; use data_types::{InfluxDbType, TableSummary, TimestampRange}; use datafusion::{ + common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}, error::DataFusionError, - logical_expr::{ - binary_expr, - expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}, - utils::expr_to_columns, - BinaryExpr, - }, + logical_expr::{binary_expr, utils::expr_to_columns, BinaryExpr}, optimizer::utils::split_conjunction, physical_expr::execution_props::ExecutionProps, physical_optimizer::pruning::PruningStatistics, @@ -535,9 +531,10 @@ impl Predicate { return false; } - expr.accept(RowBasedVisitor::default()) - .expect("never fails") - .row_based + let mut visitor = RowBasedVisitor::default(); + expr.visit(&mut visitor).expect("never fails"); + + visitor.row_based }) .cloned() .collect(); @@ -622,8 +619,10 @@ impl Default for RowBasedVisitor { } } -impl ExpressionVisitor for RowBasedVisitor { - fn pre_visit(mut self, expr: &Expr) -> Result<Recursion<Self>, DataFusionError> { +impl TreeNodeVisitor for RowBasedVisitor { + type N = Expr; + + fn pre_visit(&mut self, expr: &Expr) -> Result<VisitRecursion, DataFusionError> { match expr { Expr::Alias(_, _) | Expr::Between { .. } @@ -658,13 +657,13 @@ | Expr::SimilarTo { .. } | Expr::Sort { .. } | Expr::TryCast { .. } - | Expr::Wildcard => Ok(Recursion::Continue(self)), + | Expr::Wildcard => Ok(VisitRecursion::Continue), Expr::AggregateFunction { .. } | Expr::AggregateUDF { .. } | Expr::GroupingSet(_) | Expr::WindowFunction { .. } => { self.row_based = false; - Ok(Recursion::Stop(self)) + Ok(VisitRecursion::Stop) } } }
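`RowBasedVisitor` above is the read-only counterpart to the rewriters: `ExpressionVisitor` becomes `TreeNodeVisitor` with `type N = Expr`, `pre_visit` takes `&mut self` instead of consuming and returning `self`, and early termination is `VisitRecursion::Stop` rather than `Recursion::Stop(self)`. A minimal sketch of the same protocol against the pinned revision (`HasLiteral` is illustrative only):

```rust
use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion::error::Result;
use datafusion::prelude::{col, lit, Expr};

/// Records whether any literal occurs in the tree, stopping the walk
/// as soon as one is found.
#[derive(Default)]
struct HasLiteral {
    found: bool,
}

impl TreeNodeVisitor for HasLiteral {
    type N = Expr;

    fn pre_visit(&mut self, expr: &Expr) -> Result<VisitRecursion> {
        if matches!(expr, Expr::Literal(_)) {
            self.found = true;
            return Ok(VisitRecursion::Stop);
        }
        Ok(VisitRecursion::Continue)
    }
}

fn main() -> Result<()> {
    let mut visitor = HasLiteral::default();
    col("a").eq(lit(1)).visit(&mut visitor)?;
    assert!(visitor.found);
    Ok(())
}
```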
diff --git a/predicate/src/rpc_predicate.rs b/predicate/src/rpc_predicate.rs index e6ac8639e9..ce346fbe51 100644 --- a/predicate/src/rpc_predicate.rs +++ b/predicate/src/rpc_predicate.rs @@ -4,12 +4,13 @@ mod measurement_rewrite; mod rewrite; mod value_rewrite; +use crate::rpc_predicate::column_rewrite::missing_tag_to_null; use crate::Predicate; +use datafusion::common::tree_node::TreeNode; use datafusion::common::ToDFSchema; use datafusion::error::Result as DataFusionResult; use datafusion::execution::context::ExecutionProps; -use datafusion::logical_expr::expr_rewriter::ExprRewritable; use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext}; use datafusion::prelude::{lit, Expr}; use observability_deps::tracing::{debug, trace}; @@ -17,7 +18,6 @@ use schema::Schema; use std::collections::BTreeSet; use std::sync::Arc; -use self::column_rewrite::MissingTagColumnRewriter; use self::field_rewrite::FieldProjectionRewriter; use self::measurement_rewrite::rewrite_measurement_references; use self::value_rewrite::rewrite_field_value_references; @@ -211,7 +211,6 @@ fn normalize_predicate( let mut predicate = predicate.clone(); let mut field_projections = FieldProjectionRewriter::new(schema.clone()); - let mut missing_tag_columns = MissingTagColumnRewriter::new(schema.clone()); let mut field_value_exprs = vec![]; @@ -226,7 +225,8 @@ .map(|e| { debug!(?e, "rewriting expr"); - let e = rewrite_measurement_references(table_name, e) + let e = e + .transform(&|e| rewrite_measurement_references(table_name, e)) .map(|e| log_rewrite(e, "rewrite_measurement_references")) // Rewrite any references to `_value = some_value` to literal true values. // Keeps track of these expressions, which can then be used to @@ -242,10 +242,10 @@ // in the table's schema as tags. Replace any column references that // do not exist, or that are not tags, with NULL. // Field values always use `_value` as a name and are handled above. - .and_then(|e| e.rewrite(&mut missing_tag_columns)) + .and_then(|e| e.transform(&|e| missing_tag_to_null(&schema, e))) .map(|e| log_rewrite(e, "missing_columums")) // apply IOx specific rewrites (that unlock other simplifications) - .and_then(rewrite::rewrite) + .and_then(rewrite::iox_expr_rewrite) .map(|e| log_rewrite(e, "rewrite")) // apply type coercion so DataFusion simplification can deal with this .and_then(|e| simplifier.coerce(e, Arc::clone(&df_schema))) diff --git a/predicate/src/rpc_predicate/column_rewrite.rs b/predicate/src/rpc_predicate/column_rewrite.rs index eeed0202bc..c58914fa95 100644 --- a/predicate/src/rpc_predicate/column_rewrite.rs +++ b/predicate/src/rpc_predicate/column_rewrite.rs @@ -1,53 +1,37 @@ use datafusion::{ - error::Result as DataFusionResult, logical_expr::expr_rewriter::ExprRewriter, prelude::*, + common::tree_node::Transformed, error::Result as DataFusionResult, prelude::*, scalar::ScalarValue, }; use schema::{InfluxColumnType, Schema}; /// Logic for rewriting expressions from influxrpc that reference non /// existent columns, or columns that are not tags, to NULL. -#[derive(Debug)] -pub(crate) struct MissingTagColumnRewriter { - /// The input schema - schema: Schema, +pub fn missing_tag_to_null(schema: &Schema, expr: Expr) -> DataFusionResult<Transformed<Expr>> { + Ok(match expr { + Expr::Column(col) if !tag_column_exists(schema, &col)?
=> Transformed::Yes(lit_null()), + expr => Transformed::No(expr), + }) } -impl MissingTagColumnRewriter { - /// Create a new [`MissingTagColumnRewriter`] targeting the given schema - pub(crate) fn new(schema: Schema) -> Self { - Self { schema } - } +fn tag_column_exists(schema: &Schema, col: &Column) -> DataFusionResult<bool> { - fn tag_column_exists(&self, col: &Column) -> DataFusionResult<bool> { - // todo a real error here (rpc_predicates shouldn't have table/relation qualifiers) - assert!(col.relation.is_none()); + // todo a real error here (rpc_predicates shouldn't have table/relation qualifiers) + assert!(col.relation.is_none()); - let exists = self - .schema - .find_index_of(&col.name) - .map(|i| self.schema.field(i).0) - .map(|influx_column_type| influx_column_type == InfluxColumnType::Tag) - .unwrap_or(false); - Ok(exists) - } + let exists = schema + .find_index_of(&col.name) + .map(|i| schema.field(i).0) + .map(|influx_column_type| influx_column_type == InfluxColumnType::Tag) + .unwrap_or(false); + Ok(exists) } fn lit_null() -> Expr { lit(ScalarValue::Utf8(None)) } -impl ExprRewriter for MissingTagColumnRewriter { - fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> { - Ok(match expr { - Expr::Column(col) if !self.tag_column_exists(&col)? => lit_null(), - expr => expr, - }) - } -} - #[cfg(test)] mod tests { - use datafusion::{arrow::datatypes::DataType, logical_expr::expr_rewriter::ExprRewritable}; + use datafusion::{arrow::datatypes::DataType, common::tree_node::TreeNode}; use schema::SchemaBuilder; use super::*; @@ -103,7 +87,7 @@ .build() .unwrap(); - let mut rewriter = MissingTagColumnRewriter::new(schema); - expr.rewrite(&mut rewriter).unwrap() + expr.transform(&|expr| missing_tag_to_null(&schema, expr)) + .unwrap() } } diff --git a/predicate/src/rpc_predicate/field_rewrite.rs b/predicate/src/rpc_predicate/field_rewrite.rs index 0d26c27938..eb8de42e98 100644 --- a/predicate/src/rpc_predicate/field_rewrite.rs +++ b/predicate/src/rpc_predicate/field_rewrite.rs @@ -4,9 +4,9 @@ use super::FIELD_COLUMN_NAME; use arrow::array::{as_boolean_array, as_string_array, ArrayRef, StringArray}; use arrow::compute::kernels; use arrow::record_batch::RecordBatch; +use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}; use datafusion::common::DFSchema; use datafusion::error::{DataFusionError, Result as DataFusionResult}; -use datafusion::logical_expr::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion}; use datafusion::optimizer::utils::split_conjunction_owned; use datafusion::physical_expr::create_physical_expr; use datafusion::physical_expr::execution_props::ExecutionProps; @@ -78,7 +78,8 @@ impl FieldProjectionRewriter { // Rewrites a single predicate.
Does not handle AND specially fn rewrite_single_conjunct(&mut self, expr: Expr) -> DataFusionResult<Expr> { - let finder = expr.accept(ColumnReferencesFinder::default())?; + let mut finder = ColumnReferencesFinder::default(); + expr.visit(&mut finder)?; // rewrite any expression that only references _field to `true` match (finder.saw_field_reference, finder.saw_non_field_reference) { @@ -217,8 +218,9 @@ struct ColumnReferencesFinder { saw_non_field_reference: bool, } -impl ExpressionVisitor for ColumnReferencesFinder { - fn pre_visit(mut self, expr: &Expr) -> DataFusionResult<Recursion<Self>> { +impl TreeNodeVisitor for ColumnReferencesFinder { + type N = Expr; + fn pre_visit(&mut self, expr: &Expr) -> DataFusionResult<VisitRecursion> { if let Expr::Column(col) = expr { if col.name == FIELD_COLUMN_NAME { self.saw_field_reference = true; @@ -229,9 +231,9 @@ // terminate early if we have already found both if self.saw_field_reference && self.saw_non_field_reference { - Ok(Recursion::Stop(self)) + Ok(VisitRecursion::Stop) } else { - Ok(Recursion::Continue(self)) + Ok(VisitRecursion::Continue) } } } diff --git a/predicate/src/rpc_predicate/measurement_rewrite.rs b/predicate/src/rpc_predicate/measurement_rewrite.rs index e5fcf2a0a6..ea367efaae 100644 --- a/predicate/src/rpc_predicate/measurement_rewrite.rs +++ b/predicate/src/rpc_predicate/measurement_rewrite.rs @@ -1,5 +1,5 @@ +use datafusion::common::tree_node::Transformed; use datafusion::error::Result as DataFusionResult; -use datafusion::logical_expr::expr_rewriter::{ExprRewritable, ExprRewriter}; use datafusion::prelude::{lit, Column, Expr}; use super::MEASUREMENT_COLUMN_NAME; @@ -9,27 +9,16 @@ use super::MEASUREMENT_COLUMN_NAME; pub(crate) fn rewrite_measurement_references( table_name: &str, expr: Expr, -) -> DataFusionResult<Expr> { - let mut rewriter = MeasurementRewriter { table_name }; - expr.rewrite(&mut rewriter) -} - -struct MeasurementRewriter<'a> { - table_name: &'a str, -} - -impl ExprRewriter for MeasurementRewriter<'_> { - fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> { - Ok(match expr { - // rewrite col("_measurement") --> "table_name" - Expr::Column(Column { relation, name }) if name == MEASUREMENT_COLUMN_NAME => { - // should not have a qualified foo._measurement - // reference - assert!(relation.is_none()); - lit(self.table_name) - } - // no rewrite needed - _ => expr, - }) - } +) -> DataFusionResult<Transformed<Expr>> { + Ok(match expr { + // rewrite col("_measurement") --> "table_name" + Expr::Column(Column { relation, name }) if name == MEASUREMENT_COLUMN_NAME => { + // should not have a qualified foo._measurement + // reference + assert!(relation.is_none()); + Transformed::Yes(lit(table_name)) + } + // no rewrite needed + _ => Transformed::No(expr), + }) } diff --git a/predicate/src/rpc_predicate/rewrite.rs b/predicate/src/rpc_predicate/rewrite.rs index 4488e86a5a..2c65a0571c 100644 --- a/predicate/src/rpc_predicate/rewrite.rs +++ b/predicate/src/rpc_predicate/rewrite.rs @@ -1,11 +1,7 @@ use datafusion::{ + common::tree_node::{Transformed, TreeNode}, error::Result, - logical_expr::{ - binary_expr, - expr::Case, - expr_rewriter::{ExprRewritable, ExprRewriter}, - BinaryExpr, Operator, - }, + logical_expr::{binary_expr, expr::Case, BinaryExpr, Operator}, prelude::Expr, }; @@ -37,8 +33,22 @@ use datafusion::{ /// ELSE tag_col = 'cpu' /// END /// ``` -pub fn rewrite(expr: Expr) -> Result<Expr> { - expr.rewrite(&mut IOxExprRewriter::new()) +pub fn iox_expr_rewrite(expr: Expr) -> Result<Expr> { + expr.transform(&iox_expr_rewrite_inner) +} + +fn iox_expr_rewrite_inner(expr: Expr) -> Result<Transformed<Expr>> { + Ok(match expr { + Expr::BinaryExpr(BinaryExpr { left, op, right }) if is_case(&left) && is_comparison(op) => { + Transformed::Yes(inline_case(true, *left, *right, op)) + } + Expr::BinaryExpr(BinaryExpr { left, op, right }) + if is_case(&right) && is_comparison(op) => + { + Transformed::Yes(inline_case(false, *left, *right, op)) + } + expr => Transformed::No(expr), + }) }
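Stateless rewrites like `iox_expr_rewrite` above no longer need a struct at all: a plain function returning `Transformed` per node is handed straight to `Expr::transform`. The same idiom in isolation, sketched against the pinned revision (`plus_to_minus` is a toy rule, not part of this change):

```rust
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::logical_expr::{binary_expr, BinaryExpr, Operator};
use datafusion::prelude::{col, lit, Expr};

/// Toy rewrite: turn every `a + b` into `a - b`. Ownership of the node
/// comes in by value and must be handed back via `Transformed::No`
/// when nothing changes.
fn plus_to_minus(expr: Expr) -> Result<Transformed<Expr>> {
    Ok(match expr {
        Expr::BinaryExpr(BinaryExpr { left, op: Operator::Plus, right }) => {
            Transformed::Yes(binary_expr(*left, Operator::Minus, *right))
        }
        expr => Transformed::No(expr),
    })
}

fn main() -> Result<()> {
    let rewritten = (col("a") + lit(1)).transform(&plus_to_minus)?;
    assert_eq!(rewritten, binary_expr(col("a"), Operator::Minus, lit(1)));
    Ok(())
}
```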
diff --git a/predicate/src/rpc_predicate/rewrite.rs b/predicate/src/rpc_predicate/rewrite.rs
index 4488e86a5a..2c65a0571c 100644
--- a/predicate/src/rpc_predicate/rewrite.rs
+++ b/predicate/src/rpc_predicate/rewrite.rs
@@ -1,11 +1,7 @@
 use datafusion::{
+    common::tree_node::{Transformed, TreeNode},
     error::Result,
-    logical_expr::{
-        binary_expr,
-        expr::Case,
-        expr_rewriter::{ExprRewritable, ExprRewriter},
-        BinaryExpr, Operator,
-    },
+    logical_expr::{binary_expr, expr::Case, BinaryExpr, Operator},
     prelude::Expr,
 };
@@ -37,8 +33,22 @@ use datafusion::{
 /// ELSE tag_col = 'cpu'
 /// END
 /// ```
-pub fn rewrite(expr: Expr) -> Result<Expr> {
-    expr.rewrite(&mut IOxExprRewriter::new())
+pub fn iox_expr_rewrite(expr: Expr) -> Result<Expr> {
+    expr.transform(&iox_expr_rewrite_inner)
+}
+
+fn iox_expr_rewrite_inner(expr: Expr) -> Result<Transformed<Expr>> {
+    Ok(match expr {
+        Expr::BinaryExpr(BinaryExpr { left, op, right }) if is_case(&left) && is_comparison(op) => {
+            Transformed::Yes(inline_case(true, *left, *right, op))
+        }
+        Expr::BinaryExpr(BinaryExpr { left, op, right })
+            if is_case(&right) && is_comparison(op) =>
+        {
+            Transformed::Yes(inline_case(false, *left, *right, op))
+        }
+        expr => Transformed::No(expr),
+    })
 }
 
 /// Special purpose `Expr` rewrite rules for an Expr that is used as a predicate.
@@ -58,15 +68,58 @@ pub fn rewrite(expr: Expr) -> Result<Expr> {
 /// Currently it is special cases, but it would be great to generalize
 /// it and contribute it back to DataFusion
 pub fn simplify_predicate(expr: Expr) -> Result<Expr> {
-    expr.rewrite(&mut IOxPredicateRewriter::new())
+    expr.transform(&simplify_predicate_inner)
 }
 
-/// see docs on [rewrite]
-struct IOxExprRewriter {}
+fn simplify_predicate_inner(expr: Expr) -> Result<Transformed<Expr>> {
+    // look for this structure:
+    //
+    // NOT(col IS NULL) AND col = 'foo'
+    //
+    // and replace it with
+    //
+    // col = 'foo'
+    //
+    // Proof:
+    // Case 1: col is NULL
+    //
+    // not (NULL IS NULL) AND col = 'foo'
+    // not (true) AND NULL = 'foo'
+    // NULL
+    //
+    // Case 2: col is not NULL and not equal to 'foo'
+    // not (false) AND false
+    // true AND false
+    // false
+    //
+    // Case 3: col is not NULL and equal to 'foo'
+    // not (false) AND true
+    // true AND true
+    // true
+    match expr {
+        Expr::BinaryExpr(BinaryExpr {
+            left,
+            op: Operator::And,
+            right,
+        }) => {
+            if let (Some(coll), Some(colr)) = (is_col_not_null(&left), is_col_op_lit(&right)) {
+                if colr == coll {
+                    return Ok(Transformed::Yes(*right));
+                }
+            } else if let (Some(coll), Some(colr)) = (is_col_op_lit(&left), is_col_not_null(&right))
+            {
+                if colr == coll {
+                    return Ok(Transformed::Yes(*left));
+                }
+            };
 
-impl IOxExprRewriter {
-    fn new() -> Self {
-        Self {}
+            Ok(Transformed::No(Expr::BinaryExpr(BinaryExpr {
+                left,
+                op: Operator::And,
+                right,
+            })))
+        }
+        expr => Ok(Transformed::No(expr)),
     }
 }
@@ -109,24 +162,6 @@ fn is_comparison(op: Operator) -> bool {
     }
 }
 
-impl ExprRewriter for IOxExprRewriter {
-    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
-        match expr {
-            Expr::BinaryExpr(BinaryExpr { left, op, right })
-                if is_case(&left) && is_comparison(op) =>
-            {
-                Ok(inline_case(true, *left, *right, op))
-            }
-            Expr::BinaryExpr(BinaryExpr { left, op, right })
-                if is_case(&right) && is_comparison(op) =>
-            {
-                Ok(inline_case(false, *left, *right, op))
-            }
-            expr => Ok(expr),
-        }
-    }
-}
-
 fn inline_case(case_on_left: bool, left: Expr, right: Expr, op: Operator) -> Expr {
     let (when_then_expr, else_expr, other) = match (case_on_left, left, right) {
         (
@@ -177,15 +212,6 @@ fn inline_case(case_on_left: bool, left: Expr, right: Expr, op: Operator) -> Expr {
     })
 }
 
-/// see docs on [simplify_predicate]
-struct IOxPredicateRewriter {}
-
-impl IOxPredicateRewriter {
-    fn new() -> Self {
-        Self {}
-    }
-}
-
 /// returns the column name for a column expression
 fn is_col(expr: &Expr) -> Option<&str> {
     if let Expr::Column(c) = &expr {
@@ -226,61 +252,6 @@ fn is_col_op_lit(expr: &Expr) -> Option<&str> {
     }
 }
 
-impl ExprRewriter for IOxPredicateRewriter {
-    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
-        // look for this structure:
-        //
-        // NOT(col IS NULL) AND col = 'foo'
-        //
-        // and replace it with
-        //
-        // col = 'foo'
-        //
-        // Proof:
-        // Case 1: col is NULL
-        //
-        // not (NULL IS NULL) AND col = 'foo'
-        // not (true) AND NULL = 'foo'
-        // NULL
-        //
-        // Case 2: col is not NULL and not equal to 'foo'
-        // not (false) AND false
-        // true AND false
-        // false
-        //
-        // Case 3: col is not NULL and equal to 'foo'
-        // not (false) AND true
-        // true AND true
-        // true
-        match expr {
-            Expr::BinaryExpr(BinaryExpr {
-                left,
-                op: Operator::And,
-                right,
-            }) => {
-                if let (Some(coll), Some(colr)) = (is_col_not_null(&left), is_col_op_lit(&right)) {
-                    if colr == coll {
-                        return Ok(*right);
-                    }
-                } else if let (Some(coll), Some(colr)) =
-                    (is_col_op_lit(&left), is_col_not_null(&right))
-                {
-                    if colr == coll {
-                        return Ok(*left);
-                    }
-                };
-
-                Ok(Expr::BinaryExpr(BinaryExpr {
-                    left,
-                    op: Operator::And,
-                    right,
-                }))
-            }
-            expr => Ok(expr),
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::ops::Add;
@@ -299,7 +270,7 @@ mod tests {
             .eq(lit("case2"));
         let expected = expr.clone();
 
-        assert_eq!(expected, rewrite(expr).unwrap());
+        assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
     }
 
     #[test]
@@ -314,7 +285,7 @@ mod tests {
             col("tag").eq(lit("bar")),
         );
 
-        assert_eq!(expected, rewrite(expr).unwrap());
+        assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
     }
 
     #[test]
@@ -331,7 +302,7 @@ mod tests {
             lit("bar").eq(col("tag")),
         );
 
-        assert_eq!(expected, rewrite(expr).unwrap());
+        assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
     }
 
     #[test]
@@ -358,7 +329,7 @@ mod tests {
             )),
         );
 
-        assert_eq!(expected, rewrite(expr).unwrap());
+        assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
     }
 
     #[test]
@@ -404,7 +375,7 @@ mod tests {
             expr.clone()
         };
 
-        assert_eq!(expected, rewrite(expr).unwrap());
+        assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
    }
 
     #[test]
@@ -434,7 +405,7 @@ mod tests {
             .otherwise(lit("WTF?").eq(lit("is null")))
             .unwrap();
 
-        assert_eq!(expected, rewrite(expr).unwrap());
+        assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
     }
 
     #[test]
@@ -450,7 +421,7 @@ mod tests {
             .add(lit(1));
         let expected = expr.clone();
 
-        assert_eq!(expected, rewrite(expr).unwrap());
+        assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
     }
 
     fn make_case(when_expr: Expr, then_expr: Expr, otherwise_expr: Expr) -> Expr {
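Both public entry points keep their `Expr -> Result<Expr>` signatures, so callers are only affected by the `rewrite` -> `iox_expr_rewrite` rename. A hypothetical test-style sketch of what `simplify_predicate` is expected to do, per the proof comment above (it assumes `is_col_not_null`, whose body is not shown in this hunk, matches the `NOT(col IS NULL)` shape built below):

    use datafusion::error::Result;
    use datafusion::prelude::{col, lit};

    fn demo() -> Result<()> {
        // NOT(tag IS NULL) AND tag = 'foo'
        let pred = (!col("tag").is_null()).and(col("tag").eq(lit("foo")));
        // per the case analysis, the conjunction is equivalent to the
        // right operand alone
        let simplified = simplify_predicate(pred)?;
        assert_eq!(simplified, col("tag").eq(lit("foo")));
        Ok(())
    }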
diff --git a/predicate/src/rpc_predicate/value_rewrite.rs b/predicate/src/rpc_predicate/value_rewrite.rs
index ea2bc6b582..d91946ae4c 100644
--- a/predicate/src/rpc_predicate/value_rewrite.rs
+++ b/predicate/src/rpc_predicate/value_rewrite.rs
@@ -1,5 +1,5 @@
+use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
 use datafusion::error::Result as DataFusionResult;
-use datafusion::logical_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
 use datafusion::prelude::{lit, Expr};
 
 use crate::ValueExpr;
@@ -19,7 +19,9 @@ struct FieldValueRewriter<'a> {
     value_exprs: &'a mut Vec<ValueExpr>,
 }
 
-impl<'a> ExprRewriter for FieldValueRewriter<'a> {
+impl<'a> TreeNodeRewriter for FieldValueRewriter<'a> {
+    type N = Expr;
+
     fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
         // try and convert Expr into a ValueExpr
         match expr.try_into() {
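`FieldValueRewriter` stays a trait implementation rather than becoming a free function because it accumulates state (`value_exprs`) across the walk. A minimal stand-alone `TreeNodeRewriter` of the same shape — the struct and its logic are illustrative only, and `pre_visit` is left to its default:

    use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
    use datafusion::error::Result as DataFusionResult;
    use datafusion::prelude::{col, lit, Expr};

    #[derive(Default)]
    struct ColumnLister {
        // state accumulated during the walk, read out afterwards
        names: Vec<String>,
    }

    impl TreeNodeRewriter for ColumnLister {
        type N = Expr;

        fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
            if let Expr::Column(c) = &expr {
                self.names.push(c.name.clone());
            }
            Ok(expr) // no rewrite, just observe
        }
    }

    fn main() -> DataFusionResult<()> {
        let expr = col("_field").eq(lit("usage")).and(col("host").eq(lit("a")));
        let mut lister = ColumnLister::default();
        // rewrite drives mutate over every node, children before parents
        let _rewritten = expr.rewrite(&mut lister)?;
        assert_eq!(lister.names, vec!["_field", "host"]);
        Ok(())
    }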
["dyn_cmp_dict"] } @@ -29,9 +30,9 @@ bytes = { version = "1" } chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] } crossbeam-utils = { version = "0.8" } crypto-common = { version = "0.1", default-features = false, features = ["std"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74c3955db48f7ef6458125100eed3999512a56ba" } -datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74c3955db48f7ef6458125100eed3999512a56ba", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } -datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74c3955db48f7ef6458125100eed3999512a56ba", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f30671760285f242950437c3c0f520ef418c1068" } +datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f30671760285f242950437c3c0f520ef418c1068", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f30671760285f242950437c3c0f520ef418c1068", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } digest = { version = "0.10", features = ["mac", "std"] } either = { version = "1" } fixedbitset = { version = "0.4" }