chore: Update datafusion again (#7353)

* chore: Update DataFusion

* refactor: Update predicate crate for new transform API

* refactor: Update iox_query crate for new APIs

* refactor: Update influxql for new API

* chore: Run cargo hakari tasks

---------

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2023-03-28 18:21:49 +02:00 committed by GitHub
parent b277b620fa
commit 43e236e040
30 changed files with 372 additions and 457 deletions
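
The bulk of the changes below migrate from DataFusion's old rewrite traits (ExprRewriter, ExpressionVisitor, PlanVisitor, TreeNodeRewritable) to the unified datafusion::common::tree_node API at the newly pinned revision. A minimal sketch of the new rewriter shape; the ReplaceFooWithBar rule and the "foo"/"bar" names are hypothetical, only the trait surface is taken from the diffs below:

use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
use datafusion::error::Result;
use datafusion::prelude::{lit, Expr};

// Hypothetical rewriter: replace references to a column named "foo" with the literal "bar".
struct ReplaceFooWithBar;

impl TreeNodeRewriter for ReplaceFooWithBar {
    // The node type is now an associated type rather than the trait being fixed to Expr.
    type N = Expr;

    fn mutate(&mut self, expr: Expr) -> Result<Expr> {
        Ok(match expr {
            Expr::Column(c) if c.name == "foo" => lit("bar"),
            other => other,
        })
    }
}

fn rewrite_example(expr: Expr) -> Result<Expr> {
    // `rewrite` is now provided by the `TreeNode` trait instead of `ExprRewritable`.
    expr.rewrite(&mut ReplaceFooWithBar)
}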

Cargo.lock (generated)

@ -112,16 +112,16 @@ checksum = "f410d3907b6b3647b9e7bca4551274b2e3d716aa940afb67b7287257401da921"
dependencies = [
"ahash 0.8.3",
"arrow-arith",
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-csv",
"arrow-data 34.0.0",
"arrow-data",
"arrow-ipc",
"arrow-json",
"arrow-ord",
"arrow-row",
"arrow-schema 34.0.0",
"arrow-schema",
"arrow-select",
"arrow-string",
"comfy-table",
@ -133,10 +133,10 @@ version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f87391cf46473c9bc53dab68cb8872c3a81d4dfd1703f1c8aa397dba9880a043"
dependencies = [
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"chrono",
"half 2.2.1",
"num",
@ -149,25 +149,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d35d5475e65c57cffba06d0022e3006b677515f99b54af33a7cd54f6cdd4a5b5"
dependencies = [
"ahash 0.8.3",
"arrow-buffer 34.0.0",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"chrono",
"half 2.2.1",
"hashbrown 0.13.2",
"num",
]
[[package]]
name = "arrow-array"
version = "35.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43489bbff475545b78b0e20bde1d22abd6c99e54499839f9e815a2fa5134a51b"
dependencies = [
"ahash 0.8.3",
"arrow-buffer 35.0.0",
"arrow-data 35.0.0",
"arrow-schema 35.0.0",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"chrono",
"chrono-tz",
"half 2.2.1",
@ -185,26 +169,16 @@ dependencies = [
"num",
]
[[package]]
name = "arrow-buffer"
version = "35.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3759e4a52c593281184787af5435671dc8b1e78333e5a30242b2e2d6e3c9d1f"
dependencies = [
"half 2.2.1",
"num",
]
[[package]]
name = "arrow-cast"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a7285272c9897321dfdba59de29f5b05aeafd3cdedf104a941256d155f6d304"
dependencies = [
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
"chrono",
"lexical-core",
@ -217,11 +191,11 @@ version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "981ee4e7f6a120da04e00d0b39182e1eeacccb59c8da74511de753c56b7fddf7"
dependencies = [
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"arrow-data",
"arrow-schema",
"chrono",
"csv",
"csv-core",
@ -236,20 +210,8 @@ version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27cc673ee6989ea6e4b4e8c7d461f7e06026a096c8f0b1a7288885ff71ae1e56"
dependencies = [
"arrow-buffer 34.0.0",
"arrow-schema 34.0.0",
"half 2.2.1",
"num",
]
[[package]]
name = "arrow-data"
version = "35.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19c7787c6cdbf9539b1ffb860bfc18c5848926ec3d62cbd52dc3b1ea35c874fd"
dependencies = [
"arrow-buffer 35.0.0",
"arrow-schema 35.0.0",
"arrow-buffer",
"arrow-schema",
"half 2.2.1",
"num",
]
@ -260,11 +222,11 @@ version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd16945f8f3be0f6170b8ced60d414e56239d91a16a3f8800bc1504bc58b2592"
dependencies = [
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-ipc",
"arrow-schema 34.0.0",
"arrow-schema",
"base64 0.21.0",
"bytes",
"futures",
@ -283,11 +245,11 @@ version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e37b8b69d9e59116b6b538e8514e0ec63a30f08b617ce800d31cb44e3ef64c1a"
dependencies = [
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"arrow-data",
"arrow-schema",
"flatbuffers",
]
@ -297,11 +259,11 @@ version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80c3fa0bed7cfebf6d18e46b733f9cb8a1cb43ce8e6539055ca3e1e48a426266"
dependencies = [
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"arrow-data",
"arrow-schema",
"chrono",
"half 2.2.1",
"indexmap",
@ -316,10 +278,10 @@ version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d247dce7bed6a8d6a3c6debfa707a3a2f694383f0c692a39d736a593eae5ef94"
dependencies = [
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
"num",
]
@ -331,10 +293,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d609c0181f963cea5c70fddf9a388595b5be441f3aa1d1cdbf728ca834bbd3a"
dependencies = [
"ahash 0.8.3",
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"half 2.2.1",
"hashbrown 0.13.2",
]
@ -345,22 +307,16 @@ version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64951898473bfb8e22293e83a44f02874d2257514d49cd95f9aa4afcff183fbc"
[[package]]
name = "arrow-schema"
version = "35.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf6b26f6a6f8410e3b9531cbd1886399b99842701da77d4b4cf2013f7708f20f"
[[package]]
name = "arrow-select"
version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a513d89c2e1ac22b28380900036cf1f3992c6443efc5e079de631dcf83c6888"
dependencies = [
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"num",
]
@ -370,10 +326,10 @@ version = "34.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5288979b2705dae1114c864d73150629add9153b9b8f1d7ee3963db94c372ba5"
dependencies = [
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-data 34.0.0",
"arrow-schema 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-data",
"arrow-schema",
"arrow-select",
"regex",
"regex-syntax",
@ -1497,7 +1453,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1544,10 +1500,10 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068"
dependencies = [
"arrow",
"arrow-array 35.0.0",
"arrow-array",
"chrono",
"num_cpus",
"object_store",
@ -1558,7 +1514,7 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068"
dependencies = [
"dashmap",
"datafusion-common",
@ -1575,7 +1531,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1586,7 +1542,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068"
dependencies = [
"arrow",
"async-trait",
@ -1603,12 +1559,12 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068"
dependencies = [
"ahash 0.8.3",
"arrow",
"arrow-buffer 34.0.0",
"arrow-schema 34.0.0",
"arrow-buffer",
"arrow-schema",
"blake2",
"blake3",
"chrono",
@ -1633,7 +1589,7 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068"
dependencies = [
"arrow",
"chrono",
@ -1641,15 +1597,13 @@ dependencies = [
"datafusion-common",
"datafusion-expr",
"object_store",
"pbjson-build",
"prost",
"prost-build",
]
[[package]]
name = "datafusion-row"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068"
dependencies = [
"arrow",
"datafusion-common",
@ -1660,9 +1614,9 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=74c3955db48f7ef6458125100eed3999512a56ba#74c3955db48f7ef6458125100eed3999512a56ba"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=f30671760285f242950437c3c0f520ef418c1068#f30671760285f242950437c3c0f520ef418c1068"
dependencies = [
"arrow-schema 34.0.0",
"arrow-schema",
"datafusion-common",
"datafusion-expr",
"log",
@ -4135,12 +4089,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ac135ecf63ebb5f53dda0921b0b76d6048b3ef631a5f4760b9e8f863ff00cfa"
dependencies = [
"ahash 0.8.3",
"arrow-array 34.0.0",
"arrow-buffer 34.0.0",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data 34.0.0",
"arrow-data",
"arrow-ipc",
"arrow-schema 34.0.0",
"arrow-schema",
"arrow-select",
"base64 0.21.0",
"brotli",
@ -6848,6 +6802,7 @@ version = "0.1.0"
dependencies = [
"ahash 0.8.3",
"arrow",
"arrow-array",
"arrow-flight",
"arrow-ord",
"arrow-string",


@ -121,8 +121,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "34.0.0" }
arrow-flight = { version = "34.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="74c3955db48f7ef6458125100eed3999512a56ba", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="74c3955db48f7ef6458125100eed3999512a56ba" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="f30671760285f242950437c3c0f520ef418c1068", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="f30671760285f242950437c3c0f520ef418c1068" }
hashbrown = { version = "0.13.2" }
parquet = { version = "34.0.0" }


@ -1,9 +1,8 @@
use std::sync::Arc;
use datafusion::{
catalog::TableReference,
datasource::provider_as_source,
logical_expr::{expr_rewriter::ExprRewritable, LogicalPlanBuilder},
catalog::TableReference, common::tree_node::TreeNode, datasource::provider_as_source,
logical_expr::LogicalPlanBuilder,
};
use observability_deps::tracing::trace;
use predicate::Predicate;


@ -5,12 +5,10 @@ mod range_predicate;
use crate::exec::gapfill::{FillStrategy, GapFill, GapFillParams};
use datafusion::{
common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter, VisitRecursion},
error::{DataFusionError, Result},
logical_expr::{
expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion},
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
utils::expr_to_columns,
Aggregate, BuiltinScalarFunction, Extension, LogicalPlan,
utils::expr_to_columns, Aggregate, BuiltinScalarFunction, Extension, LogicalPlan,
},
optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule},
prelude::{col, Expr},
@ -305,7 +303,8 @@ struct DateBinGapfillRewriter {
args: Option<Vec<Expr>>,
}
impl ExprRewriter for DateBinGapfillRewriter {
impl TreeNodeRewriter for DateBinGapfillRewriter {
type N = Expr;
fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
match expr {
Expr::ScalarUDF { fun, .. } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
@ -330,23 +329,19 @@ impl ExprRewriter for DateBinGapfillRewriter {
}
fn count_date_bin_gapfill(e: &Expr) -> Result<usize> {
struct Finder {
count: usize,
}
impl ExpressionVisitor for Finder {
fn pre_visit(mut self, expr: &Expr) -> Result<Recursion<Self>> {
match expr {
Expr::ScalarUDF { fun, .. } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
self.count += 1;
}
_ => (),
};
Ok(Recursion::Continue(self))
}
}
let f = Finder { count: 0 };
let f = e.accept(f)?;
Ok(f.count)
let mut count = 0;
e.apply(&mut |expr| {
match expr {
Expr::ScalarUDF { fun, .. } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
count += 1;
}
_ => (),
};
Ok(VisitRecursion::Continue)
})
.expect("no errors");
Ok(count)
}
fn check_node(node: &LogicalPlan) -> Result<()> {


@ -2,9 +2,12 @@
use std::ops::{Bound, Range};
use datafusion::{
common::DFSchema,
error::{DataFusionError, Result},
logical_expr::{Between, BinaryExpr, LogicalPlan, Operator, PlanVisitor},
common::{
tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
DFSchema,
},
error::Result,
logical_expr::{Between, BinaryExpr, LogicalPlan, Operator},
optimizer::utils::split_conjunction,
prelude::{Column, Expr},
};
@ -16,7 +19,7 @@ pub(super) fn find_time_range(plan: &LogicalPlan, time_col: &Column) -> Result<R
col: time_col.clone(),
range: TimeRange::default(),
};
plan.accept(&mut v)?;
plan.visit(&mut v)?;
Ok(v.range.0)
}
@ -25,19 +28,19 @@ struct TimeRangeVisitor {
range: TimeRange,
}
impl PlanVisitor for TimeRangeVisitor {
type Error = DataFusionError;
impl TreeNodeVisitor for TimeRangeVisitor {
type N = LogicalPlan;
fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<VisitRecursion> {
match plan {
LogicalPlan::Projection(p) => {
let idx = p.schema.index_of_column(&self.col)?;
match unwrap_alias(&p.expr[idx]) {
Expr::Column(ref c) => {
self.col = c.clone();
Ok(true)
Ok(VisitRecursion::Continue)
}
_ => Ok(false),
_ => Ok(VisitRecursion::Stop),
}
}
LogicalPlan::Filter(f) => {
@ -48,15 +51,15 @@ impl PlanVisitor for TimeRangeVisitor {
range.with_expr(f.input.schema().as_ref(), &self.col, expr)
})?;
self.range = range;
Ok(true)
Ok(VisitRecursion::Continue)
}
// These nodes do not alter their schema, so we can recurse through them
LogicalPlan::Sort(_)
| LogicalPlan::Repartition(_)
| LogicalPlan::Limit(_)
| LogicalPlan::Distinct(_) => Ok(true),
| LogicalPlan::Distinct(_) => Ok(VisitRecursion::Continue),
// At some point we may wish to handle joins here too.
_ => Ok(false),
_ => Ok(VisitRecursion::Stop),
}
}
}
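
The TimeRangeVisitor above shows the visitor half of the migration: PlanVisitor's associated Error type and bool return are gone, replaced by TreeNodeVisitor with a fixed Result and explicit VisitRecursion. A self-contained sketch under the same assumptions; the HasAggregate visitor is hypothetical:

use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion::error::Result;
use datafusion::logical_expr::LogicalPlan;

// Hypothetical visitor: reports whether a plan contains an Aggregate node.
#[derive(Default)]
struct HasAggregate {
    found: bool,
}

impl TreeNodeVisitor for HasAggregate {
    type N = LogicalPlan;

    fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<VisitRecursion> {
        if matches!(plan, LogicalPlan::Aggregate(_)) {
            self.found = true;
            // Early exit; the old PlanVisitor signalled this by returning Ok(false).
            return Ok(VisitRecursion::Stop);
        }
        Ok(VisitRecursion::Continue)
    }
}

fn contains_aggregate(plan: &LogicalPlan) -> Result<bool> {
    let mut visitor = HasAggregate::default();
    plan.visit(&mut visitor)?;
    Ok(visitor.found)
}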


@ -1,7 +1,7 @@
use datafusion::{
common::DFSchema,
common::{tree_node::TreeNodeRewriter, DFSchema},
error::DataFusionError,
logical_expr::{expr_rewriter::ExprRewriter, utils::from_plan, LogicalPlan, Operator},
logical_expr::{utils::from_plan, LogicalPlan, Operator},
optimizer::{utils::rewrite_preserving_name, OptimizerConfig, OptimizerRule},
prelude::{binary_expr, lit, Expr},
scalar::ScalarValue,
@ -67,7 +67,9 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan, DataFusionError> {
from_plan(plan, new_exprs.as_slice(), new_inputs.as_slice())
}
impl ExprRewriter for InfluxRegexToDataFusionRegex {
impl TreeNodeRewriter for InfluxRegexToDataFusionRegex {
type N = Expr;
fn mutate(&mut self, expr: Expr) -> Result<Expr, DataFusionError> {
match expr {
Expr::ScalarUDF { fun, mut args } => {


@ -172,7 +172,8 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use data_types::ChunkId;
use datafusion::{
physical_plan::{expressions::Literal, filter::FilterExec, tree_node::TreeNodeRewritable},
common::tree_node::{Transformed, TreeNode},
physical_plan::{expressions::Literal, filter::FilterExec},
prelude::{col, lit},
scalar::ScalarValue,
};
@ -347,9 +348,9 @@ mod tests {
Some(Arc::new(Literal::new(ScalarValue::from(false)))),
None,
);
return Ok(Some(Arc::new(exec)));
return Ok(Transformed::Yes(Arc::new(exec)));
}
Ok(None)
Ok(Transformed::No(plan))
})
.unwrap();
assert!(extract_chunks(plan.as_ref()).is_none());


@ -1,10 +1,11 @@
use std::sync::Arc;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan},
physical_plan::ExecutionPlan,
};
use predicate::Predicate;
@ -35,7 +36,7 @@ impl PhysicalOptimizerRule for CombineChunks {
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|plan| {
if let Some((schema, chunks, output_sort_key)) = extract_chunks(plan.as_ref()) {
return Ok(Some(chunks_to_physical_nodes(
return Ok(Transformed::Yes(chunks_to_physical_nodes(
&schema,
output_sort_key.as_ref(),
chunks,
@ -44,7 +45,7 @@ impl PhysicalOptimizerRule for CombineChunks {
)));
}
Ok(None)
Ok(Transformed::No(plan))
})
}
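
Every physical optimizer rule in this commit gets the same mechanical change: transform_up now comes from datafusion::common::tree_node::TreeNode, and the closure returns Transformed instead of Option (Ok(Some(new)) becomes Ok(Transformed::Yes(new)), Ok(None) becomes Ok(Transformed::No(plan))). A minimal sketch of the new shape, assuming the TreeNode impl for Arc<dyn ExecutionPlan> at this revision; the rule itself is a placeholder that mirrors the OneUnion optimizer further down:

use std::sync::Arc;
use datafusion::{
    common::tree_node::{Transformed, TreeNode},
    error::Result,
    physical_plan::{union::UnionExec, ExecutionPlan},
};

// Placeholder rule: a UnionExec with exactly one input is replaced by that input.
fn flatten_single_input_unions(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(&|plan| {
        if let Some(union_exec) = plan.as_any().downcast_ref::<UnionExec>() {
            let mut children = union_exec.children();
            if children.len() == 1 {
                // Old API: Ok(Some(new_plan)) signalled "node replaced".
                return Ok(Transformed::Yes(children.remove(0)));
            }
        }
        // Old API: Ok(None) signalled "keep this node as-is".
        Ok(Transformed::No(plan))
    })
}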


@ -1,10 +1,11 @@
use std::{collections::HashSet, sync::Arc};
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan},
physical_plan::ExecutionPlan,
};
use predicate::Predicate;
use schema::{sort::SortKeyBuilder, TIME_COLUMN_NAME};
@ -41,7 +42,7 @@ impl PhysicalOptimizerRule for DedupNullColumns {
assert_eq!(children.len(), 1);
let child = children.remove(0);
let Some((schema, chunks, _output_sort_key)) = extract_chunks(child.as_ref()) else {
return Ok(None);
return Ok(Transformed::No(plan));
};
let pk_cols = dedup_exec.sort_columns();
@ -73,14 +74,14 @@ impl PhysicalOptimizerRule for DedupNullColumns {
);
let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema);
return Ok(Some(Arc::new(DeduplicateExec::new(
return Ok(Transformed::Yes(Arc::new(DeduplicateExec::new(
child,
sort_exprs,
dedup_exec.use_chunk_order_col(),
))));
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -2,10 +2,11 @@ use std::{cmp::Reverse, sync::Arc};
use arrow::compute::SortOptions;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan},
physical_plan::ExecutionPlan,
};
use indexmap::IndexSet;
use predicate::Predicate;
@ -58,7 +59,7 @@ impl PhysicalOptimizerRule for DedupSortOrder {
assert_eq!(children.len(), 1);
let child = children.remove(0);
let Some((schema, chunks, _output_sort_key)) = extract_chunks(child.as_ref()) else {
return Ok(None);
return Ok(Transformed::No(plan))
};
let mut chunk_sort_keys: Vec<IndexSet<_>> = chunks
@ -135,14 +136,14 @@ impl PhysicalOptimizerRule for DedupSortOrder {
);
let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &schema);
return Ok(Some(Arc::new(DeduplicateExec::new(
return Ok(Transformed::Yes(Arc::new(DeduplicateExec::new(
child,
sort_exprs,
dedup_exec.use_chunk_order_col(),
))));
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -2,10 +2,11 @@ use std::sync::Arc;
use data_types::PartitionId;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
physical_plan::{union::UnionExec, ExecutionPlan},
};
use hashbrown::HashMap;
use observability_deps::tracing::warn;
@ -38,7 +39,7 @@ impl PhysicalOptimizerRule for PartitionSplit {
assert_eq!(children.len(), 1);
let child = children.remove(0);
let Some((schema, chunks, output_sort_key)) = extract_chunks(child.as_ref()) else {
return Ok(None);
return Ok(Transformed::No(plan));
};
let mut chunks_by_partition: HashMap<PartitionId, Vec<Arc<dyn QueryChunk>>> =
@ -53,7 +54,7 @@ impl PhysicalOptimizerRule for PartitionSplit {
// If there are not multiple partitions (0 or 1), then this optimizer is a no-op. Signal that to the
// optimizer framework.
if chunks_by_partition.len() < 2 {
return Ok(None);
return Ok(Transformed::No(plan));
}
// Protect against degenerative plans
@ -69,7 +70,7 @@ impl PhysicalOptimizerRule for PartitionSplit {
max_dedup_partition_split,
"cannot split dedup operation based on partition, too many partitions"
);
return Ok(None);
return Ok(Transformed::No(plan));
}
// ensure deterministic order
@ -94,10 +95,10 @@ impl PhysicalOptimizerRule for PartitionSplit {
})
.collect(),
);
return Ok(Some(Arc::new(out)));
return Ok(Transformed::Yes(Arc::new(out)));
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -1,10 +1,11 @@
use std::sync::Arc;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{tree_node::TreeNodeRewritable, ExecutionPlan},
physical_plan::ExecutionPlan,
};
use predicate::Predicate;
@ -31,11 +32,11 @@ impl PhysicalOptimizerRule for RemoveDedup {
assert_eq!(children.len(), 1);
let child = children.remove(0);
let Some((schema, chunks, output_sort_key)) = extract_chunks(child.as_ref()) else {
return Ok(None);
return Ok(Transformed::No(plan));
};
if (chunks.len() < 2) && chunks.iter().all(|c| !c.may_contain_pk_duplicates()) {
return Ok(Some(chunks_to_physical_nodes(
return Ok(Transformed::Yes(chunks_to_physical_nodes(
&schema,
output_sort_key.as_ref(),
chunks,
@ -45,7 +46,7 @@ impl PhysicalOptimizerRule for RemoveDedup {
}
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -1,10 +1,11 @@
use std::sync::Arc;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
physical_plan::{union::UnionExec, ExecutionPlan},
};
use observability_deps::tracing::warn;
use predicate::Predicate;
@ -37,14 +38,14 @@ impl PhysicalOptimizerRule for TimeSplit {
assert_eq!(children.len(), 1);
let child = children.remove(0);
let Some((schema, chunks, output_sort_key)) = extract_chunks(child.as_ref()) else {
return Ok(None);
return Ok(Transformed::No(plan));
};
let groups = group_potential_duplicates(chunks);
// if there are no chunks or there is only one group, we don't need to split
if groups.len() < 2 {
return Ok(None);
return Ok(Transformed::No(plan));
}
// Protect against degenerative plans
@ -60,7 +61,7 @@ impl PhysicalOptimizerRule for TimeSplit {
max_dedup_time_split,
"cannot split dedup operation based on time overlaps, too many groups"
);
return Ok(None);
return Ok(Transformed::No(plan));
}
let out = UnionExec::new(
@ -81,10 +82,10 @@ impl PhysicalOptimizerRule for TimeSplit {
})
.collect(),
);
return Ok(Some(Arc::new(out)));
return Ok(Transformed::Yes(Arc::new(out)));
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -1,21 +1,17 @@
use std::{collections::HashSet, sync::Arc};
use datafusion::{
common::tree_node::{RewriteRecursion, Transformed, TreeNode, TreeNodeRewriter},
config::ConfigOptions,
error::{DataFusionError, Result},
logical_expr::Operator,
physical_expr::{
rewrite::{RewriteRecursion, TreeNodeRewriter},
split_conjunction,
utils::collect_columns,
},
physical_expr::{split_conjunction, utils::collect_columns},
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
empty::EmptyExec,
expressions::{BinaryExpr, Column},
file_format::ParquetExec,
filter::FilterExec,
tree_node::TreeNodeRewritable,
union::UnionExec,
ExecutionPlan, PhysicalExpr,
},
@ -44,7 +40,7 @@ impl PhysicalOptimizerRule for PredicatePushdown {
let child_any = child.as_any();
if let Some(child_empty) = child_any.downcast_ref::<EmptyExec>() {
if !child_empty.produce_one_row() {
return Ok(Some(child));
return Ok(Transformed::Yes(child));
}
} else if let Some(child_union) = child_any.downcast_ref::<UnionExec>() {
let new_inputs = child_union
@ -59,7 +55,7 @@ impl PhysicalOptimizerRule for PredicatePushdown {
})
.collect::<Result<Vec<_>>>()?;
let new_union = UnionExec::new(new_inputs);
return Ok(Some(Arc::new(new_union)));
return Ok(Transformed::Yes(Arc::new(new_union)));
} else if let Some(child_parquet) = child_any.downcast_ref::<ParquetExec>() {
let existing = child_parquet
.predicate()
@ -80,7 +76,7 @@ impl PhysicalOptimizerRule for PredicatePushdown {
None,
)),
)?);
return Ok(Some(new_node));
return Ok(Transformed::Yes(new_node));
} else if let Some(child_dedup) = child_any.downcast_ref::<DeduplicateExec>() {
let dedup_cols = child_dedup.sort_columns();
let (pushdown, no_pushdown): (Vec<_>, Vec<_>) =
@ -112,12 +108,12 @@ impl PhysicalOptimizerRule for PredicatePushdown {
new_node,
)?);
}
return Ok(Some(new_node));
return Ok(Transformed::Yes(new_node));
}
}
}
Ok(None)
Ok(Transformed::No(plan))
})
}
@ -135,7 +131,9 @@ struct ColumnCollector {
cols: HashSet<Column>,
}
impl TreeNodeRewriter<Arc<dyn PhysicalExpr>> for ColumnCollector {
impl TreeNodeRewriter for ColumnCollector {
type N = Arc<dyn PhysicalExpr>;
fn pre_visit(
&mut self,
node: &Arc<dyn PhysicalExpr>,


@ -5,6 +5,7 @@ use std::{
use arrow::datatypes::SchemaRef;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::{DataFusionError, Result},
physical_expr::{
@ -19,7 +20,6 @@ use datafusion::{
filter::FilterExec,
projection::ProjectionExec,
sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
tree_node::TreeNodeRewritable,
union::UnionExec,
ExecutionPlan, PhysicalExpr,
},
@ -53,11 +53,11 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
column_names.push(output_name.as_str());
} else {
// don't bother w/ renames
return Ok(None);
return Ok(Transformed::No(plan));
}
} else {
// don't bother to deal w/ calculation within projection nodes
return Ok(None);
return Ok(Transformed::No(plan));
}
}
@ -67,7 +67,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
child_empty.produce_one_row(),
Arc::new(child_empty.schema().project(&column_indices)?),
);
return Ok(Some(Arc::new(new_child)));
return Ok(Transformed::Yes(Arc::new(new_child)));
} else if let Some(child_union) = child_any.downcast_ref::<UnionExec>() {
let new_inputs = child_union
.inputs()
@ -81,7 +81,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
})
.collect::<Result<Vec<_>>>()?;
let new_union = UnionExec::new(new_inputs);
return Ok(Some(Arc::new(new_union)));
return Ok(Transformed::Yes(Arc::new(new_union)));
} else if let Some(child_parquet) = child_any.downcast_ref::<ParquetExec>() {
let projection = match child_parquet.base_config().projection.as_ref() {
Some(projection) => column_indices
@ -100,7 +100,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
};
let new_child =
ParquetExec::new(base_config, child_parquet.predicate().cloned(), None);
return Ok(Some(Arc::new(new_child)));
return Ok(Transformed::Yes(Arc::new(new_child)));
} else if let Some(child_filter) = child_any.downcast_ref::<FilterExec>() {
let filter_required_cols = collect_columns(child_filter.predicate());
let filter_required_cols = filter_required_cols
@ -124,7 +124,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
},
)?;
return Ok(Some(plan));
return Ok(Transformed::Yes(plan));
} else if let Some(child_sort) = child_any.downcast_ref::<SortExec>() {
let sort_required_cols = child_sort
.expr()
@ -150,7 +150,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
},
)?;
return Ok(Some(plan));
return Ok(Transformed::Yes(plan));
} else if let Some(child_sort) = child_any.downcast_ref::<SortPreservingMergeExec>()
{
let sort_required_cols = child_sort
@ -176,7 +176,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
},
)?;
return Ok(Some(plan));
return Ok(Transformed::Yes(plan));
} else if let Some(child_proj) = child_any.downcast_ref::<ProjectionExec>() {
let expr = column_indices
.iter()
@ -191,7 +191,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
// and miss the optimization of that particular new ProjectionExec
let plan = self.optimize(plan, config)?;
return Ok(Some(plan));
return Ok(Transformed::Yes(plan));
} else if let Some(child_dedup) = child_any.downcast_ref::<DeduplicateExec>() {
let dedup_required_cols = child_dedup.sort_columns();
@ -216,7 +216,7 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
},
)?;
return Ok(Some(plan));
return Ok(Transformed::Yes(plan));
} else if let Some(child_recordbatches) =
child_any.downcast_ref::<RecordBatchesExec>()
{
@ -225,11 +225,11 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
Arc::new(child_recordbatches.schema().project(&column_indices)?),
child_recordbatches.output_sort_key_memo().cloned(),
);
return Ok(Some(Arc::new(new_child)));
return Ok(Transformed::Yes(Arc::new(new_child)));
}
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -1,10 +1,11 @@
use std::sync::Arc;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{sorts::sort::SortExec, tree_node::TreeNodeRewritable, ExecutionPlan},
physical_plan::{sorts::sort::SortExec, ExecutionPlan},
};
/// Removes [`SortExec`] if it is no longer needed.
@ -24,11 +25,11 @@ impl PhysicalOptimizerRule for RedundantSort {
let child = sort_exec.input();
if child.output_ordering() == Some(sort_exec.expr()) {
return Ok(Some(Arc::clone(child)));
return Ok(Transformed::Yes(Arc::clone(child)));
}
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -1,12 +1,11 @@
use std::sync::Arc;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
sorts::sort::SortExec, tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan,
},
physical_plan::{sorts::sort::SortExec, union::UnionExec, ExecutionPlan},
};
/// Pushes [`SortExec`] closer to the data source.
@ -44,11 +43,11 @@ impl PhysicalOptimizerRule for SortPushdown {
})
.collect::<Result<Vec<_>>>()?,
);
return Ok(Some(Arc::new(new_union)));
return Ok(Transformed::Yes(Arc::new(new_union)));
}
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -1,10 +1,11 @@
use std::sync::Arc;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
physical_plan::{union::UnionExec, ExecutionPlan},
};
/// Optimizer that replaces nested [`UnionExec`]s with a single level.
@ -51,11 +52,11 @@ impl PhysicalOptimizerRule for NestedUnion {
}
if found_union {
return Ok(Some(Arc::new(UnionExec::new(children_new))));
return Ok(Transformed::Yes(Arc::new(UnionExec::new(children_new))));
}
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -1,10 +1,11 @@
use std::sync::Arc;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{tree_node::TreeNodeRewritable, union::UnionExec, ExecutionPlan},
physical_plan::{union::UnionExec, ExecutionPlan},
};
/// Optimizer that replaces a [`UnionExec`] that has a single child node with the child node itself.
@ -33,11 +34,11 @@ impl PhysicalOptimizerRule for OneUnion {
if let Some(union_exec) = plan_any.downcast_ref::<UnionExec>() {
let mut children = union_exec.children();
if children.len() == 1 {
return Ok(Some(children.remove(0)));
return Ok(Transformed::Yes(children.remove(0)));
}
}
Ok(None)
Ok(Transformed::No(plan))
})
}


@ -18,13 +18,11 @@ use data_types::{
};
use datafusion::{
self,
common::{DFSchema, ToDFSchema},
common::{tree_node::TreeNodeRewriter, DFSchema, ToDFSchema},
datasource::{provider_as_source, MemTable},
error::{DataFusionError, Result as DatafusionResult},
execution::context::ExecutionProps,
logical_expr::{
expr_rewriter::ExprRewriter, BinaryExpr, ExprSchemable, LogicalPlan, LogicalPlanBuilder,
},
logical_expr::{BinaryExpr, ExprSchemable, LogicalPlan, LogicalPlanBuilder},
optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext},
physical_expr::create_physical_expr,
physical_plan::{
@ -205,7 +203,9 @@ impl<'a> MissingColumnsToNull<'a> {
}
}
impl<'a> ExprRewriter for MissingColumnsToNull<'a> {
impl<'a> TreeNodeRewriter for MissingColumnsToNull<'a> {
type N = Expr;
fn mutate(&mut self, expr: Expr) -> DatafusionResult<Expr> {
// Ideally this would simply find all Expr::Columns and
// replace them with a constant NULL value. However, doing do
@ -358,7 +358,7 @@ pub fn create_basic_summary(
mod tests {
use arrow::datatypes::DataType;
use datafusion::{
logical_expr::expr_rewriter::ExprRewritable,
common::tree_node::TreeNode,
prelude::{col, lit},
scalar::ScalarValue,
};


@ -14,8 +14,9 @@ use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_t
use arrow::datatypes::DataType;
use chrono_tz::Tz;
use datafusion::catalog::TableReference;
use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result, ScalarValue, ToDFSchema};
use datafusion::logical_expr::expr_rewriter::{normalize_col, ExprRewritable, ExprRewriter};
use datafusion::logical_expr::expr_rewriter::normalize_col;
use datafusion::logical_expr::logical_plan::builder::project;
use datafusion::logical_expr::logical_plan::Analyze;
use datafusion::logical_expr::utils::{expr_as_column_expr, find_aggregate_exprs};
@ -1049,7 +1050,9 @@ struct FixRegularExpressions<'a> {
schemas: &'a Schemas,
}
impl<'a> ExprRewriter for FixRegularExpressions<'a> {
impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> {
type N = Expr;
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
match expr {
// InfluxQL evaluates regular expression conditions to false if the column is numeric


@ -123,43 +123,46 @@
//! [`Eval`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4137
use crate::plan::util::Schemas;
use arrow::datatypes::DataType;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::{Result, ScalarValue};
use datafusion::logical_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion::logical_expr::{
binary_expr, cast, coalesce, lit, BinaryExpr, Expr, ExprSchemable, Operator,
};
/// Rewrite the expression tree and return a boolean result.
pub(in crate::plan) fn rewrite_conditional(expr: Expr, schemas: &Schemas) -> Result<Expr> {
let expr = expr.rewrite(&mut RewriteAndCoerce { schemas })?;
let expr = rewrite_expr(expr, schemas)?;
Ok(match expr {
Expr::Literal(ScalarValue::Null) => lit(false),
_ => expr,
})
}
/// The expression was rewritten
fn yes(expr: Expr) -> Result<Transformed<Expr>> {
Ok(Transformed::Yes(expr))
}
/// The expression was not rewritten
fn no(expr: Expr) -> Result<Transformed<Expr>> {
Ok(Transformed::No(expr))
}
/// Rewrite the expression tree and return a result or `NULL` if some of the operands are
/// incompatible.
pub(in crate::plan) fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
expr.rewrite(&mut RewriteAndCoerce { schemas })
}
///
/// Rewrite and coerce the expression tree to model the behavior
/// of an InfluxQL query.
struct RewriteAndCoerce<'a> {
schemas: &'a Schemas,
}
impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
pub(in crate::plan) fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
expr.transform(&|expr| {
match expr {
Expr::BinaryExpr(BinaryExpr {
ref left,
op,
ref right,
}) => {
let lhs_type = left.get_type(&self.schemas.df_schema)?;
let rhs_type = right.get_type(&self.schemas.df_schema)?;
let lhs_type = left.get_type(&schemas.df_schema)?;
let rhs_type = right.get_type(&schemas.df_schema)?;
match (lhs_type, op, rhs_type) {
//
@ -183,7 +186,7 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
DataType::Null,
_,
DataType::Null
) => Ok(lit(ScalarValue::Null)),
) => yes(lit(ScalarValue::Null)),
// NULL using AND or OR is rewritten as `false`, which the optimiser
// may short circuit.
@ -191,12 +194,12 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
DataType::Null,
Operator::Or | Operator::And,
_
) => Ok(binary_expr(lit(false), op, (**right).clone())),
) => yes(binary_expr(lit(false), op, (**right).clone())),
(
_,
Operator::Or | Operator::And,
DataType::Null
) => Ok(binary_expr((**left).clone(), op, lit(false))),
) => yes(binary_expr((**left).clone(), op, lit(false))),
// NULL with other operators is passed through to DataFusion, which is expected
// to evaluate to false.
@ -209,10 +212,10 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
_,
Operator::Eq | Operator::NotEq | Operator::Gt | Operator::Lt | Operator::GtEq | Operator::LtEq,
DataType::Null
) => Ok(expr),
) => no(expr),
// Any other operations with NULL should return false
(DataType::Null, ..) | (.., DataType::Null) => Ok(lit(false)),
(DataType::Null, ..) | (.., DataType::Null) => yes(lit(false)),
//
// Boolean types
@ -222,7 +225,7 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
DataType::Boolean,
Operator::And | Operator::Or | Operator::Eq | Operator::NotEq | Operator::BitwiseAnd | Operator::BitwiseXor | Operator::BitwiseOr,
DataType::Boolean,
) => Ok(rewrite_boolean((**left).clone(), op, (**right).clone())),
) => yes(rewrite_boolean((**left).clone(), op, (**right).clone())),
//
// Numeric types
@ -247,16 +250,16 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
// implementations, however, InfluxQL coalesces the result to `0`.
// See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4268-L4270
Operator::Divide => Ok(coalesce(vec![expr, lit(0_f64)])),
_ => Ok(expr),
Operator::Divide => yes(coalesce(vec![expr, lit(0_f64)])),
_ => no(expr),
},
//
// If either of the types UInt64 and the other is UInt64 or Int64
//
(DataType::UInt64, ..) |
(.., DataType::UInt64) => match op {
Operator::Divide => Ok(coalesce(vec![expr, lit(0_u64)])),
_ => Ok(expr),
Operator::Divide => yes(coalesce(vec![expr, lit(0_u64)])),
_ => no(expr),
}
//
// Finally, if both sides are Int64
@ -269,8 +272,8 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
// Like Float64, dividing by zero should return 0 for InfluxQL
//
// See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4338-L4340
Operator::Divide => Ok(coalesce(vec![expr, lit(0_i64)])),
_ => Ok(expr),
Operator::Divide => yes(coalesce(vec![expr, lit(0_i64)])),
_ => no(expr),
},
//
@ -282,13 +285,13 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
DataType::Utf8,
Operator::Eq | Operator::NotEq | Operator::RegexMatch | Operator::RegexNotMatch | Operator::StringConcat,
DataType::Utf8
) => Ok(expr),
) => no(expr),
// Rewrite the + operator to the string-concatenation operator
(
DataType::Utf8,
Operator::Plus,
DataType::Utf8
) => Ok(binary_expr((**left).clone(), Operator::StringConcat, (**right).clone())),
) => yes(binary_expr((**left).clone(), Operator::StringConcat, (**right).clone())),
//
// Dictionary (tag column) is treated the same as Utf8
@ -302,7 +305,7 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
DataType::Utf8,
Operator::Eq | Operator::NotEq | Operator::RegexMatch | Operator::RegexNotMatch | Operator::StringConcat,
DataType::Dictionary(..)
) => Ok(expr),
) => no(expr),
(
DataType::Dictionary(..),
Operator::Plus,
@ -312,13 +315,13 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
DataType::Utf8,
Operator::Plus,
DataType::Dictionary(..)
) => Ok(expr),
) => no(expr),
//
// Timestamp (time-range) expressions should pass through to DataFusion.
//
(DataType::Timestamp(..), ..) => Ok(expr),
(.., DataType::Timestamp(..)) => Ok(expr),
(DataType::Timestamp(..), ..) => no(expr),
(.., DataType::Timestamp(..)) => no(expr),
//
// Unhandled binary expressions with conditional operators
@ -337,13 +340,13 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
| Operator::And
| Operator::Or,
_
) => Ok(lit(false)),
) => yes(lit(false)),
//
// Everything else should result in `NULL`.
//
// See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4558
_ => Ok(lit(ScalarValue::Null)),
_ => yes(lit(ScalarValue::Null)),
}
}
//
@ -351,9 +354,9 @@ impl<'a> ExprRewriter for RewriteAndCoerce<'a> {
// as it will handle evaluating function calls, etc
//
// See: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4638-L4647
_ => Ok(expr),
_ => no(expr),
}
}
})
}
/// Rewrite conditional operators to `false` and any


@ -22,13 +22,9 @@ use arrow::{
};
use data_types::{InfluxDbType, TableSummary, TimestampRange};
use datafusion::{
common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
error::DataFusionError,
logical_expr::{
binary_expr,
expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion},
utils::expr_to_columns,
BinaryExpr,
},
logical_expr::{binary_expr, utils::expr_to_columns, BinaryExpr},
optimizer::utils::split_conjunction,
physical_expr::execution_props::ExecutionProps,
physical_optimizer::pruning::PruningStatistics,
@ -535,9 +531,10 @@ impl Predicate {
return false;
}
expr.accept(RowBasedVisitor::default())
.expect("never fails")
.row_based
let mut visitor = RowBasedVisitor::default();
expr.visit(&mut visitor).expect("never fails");
visitor.row_based
})
.cloned()
.collect();
@ -622,8 +619,10 @@ impl Default for RowBasedVisitor {
}
}
impl ExpressionVisitor for RowBasedVisitor {
fn pre_visit(mut self, expr: &Expr) -> Result<Recursion<Self>, DataFusionError> {
impl TreeNodeVisitor for RowBasedVisitor {
type N = Expr;
fn pre_visit(&mut self, expr: &Expr) -> Result<VisitRecursion, DataFusionError> {
match expr {
Expr::Alias(_, _)
| Expr::Between { .. }
@ -658,13 +657,13 @@ impl ExpressionVisitor for RowBasedVisitor {
| Expr::SimilarTo { .. }
| Expr::Sort { .. }
| Expr::TryCast { .. }
| Expr::Wildcard => Ok(Recursion::Continue(self)),
| Expr::Wildcard => Ok(VisitRecursion::Continue),
Expr::AggregateFunction { .. }
| Expr::AggregateUDF { .. }
| Expr::GroupingSet(_)
| Expr::WindowFunction { .. } => {
self.row_based = false;
Ok(Recursion::Stop(self))
Ok(VisitRecursion::Stop)
}
}
}


@ -4,12 +4,13 @@ mod measurement_rewrite;
mod rewrite;
mod value_rewrite;
use crate::rpc_predicate::column_rewrite::missing_tag_to_null;
use crate::Predicate;
use datafusion::common::tree_node::TreeNode;
use datafusion::common::ToDFSchema;
use datafusion::error::Result as DataFusionResult;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::expr_rewriter::ExprRewritable;
use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
use datafusion::prelude::{lit, Expr};
use observability_deps::tracing::{debug, trace};
@ -17,7 +18,6 @@ use schema::Schema;
use std::collections::BTreeSet;
use std::sync::Arc;
use self::column_rewrite::MissingTagColumnRewriter;
use self::field_rewrite::FieldProjectionRewriter;
use self::measurement_rewrite::rewrite_measurement_references;
use self::value_rewrite::rewrite_field_value_references;
@ -211,7 +211,6 @@ fn normalize_predicate(
let mut predicate = predicate.clone();
let mut field_projections = FieldProjectionRewriter::new(schema.clone());
let mut missing_tag_columns = MissingTagColumnRewriter::new(schema.clone());
let mut field_value_exprs = vec![];
@ -226,7 +225,8 @@ fn normalize_predicate(
.map(|e| {
debug!(?e, "rewriting expr");
let e = rewrite_measurement_references(table_name, e)
let e = e
.transform(&|e| rewrite_measurement_references(table_name, e))
.map(|e| log_rewrite(e, "rewrite_measurement_references"))
// Rewrite any references to `_value = some_value` to literal true values.
// Keeps track of these expressions, which can then be used to
@ -242,10 +242,10 @@ fn normalize_predicate(
// in the table's schema as tags. Replace any column references that
// do not exist, or that are not tags, with NULL.
// Field values always use `_value` as a name and are handled above.
.and_then(|e| e.rewrite(&mut missing_tag_columns))
.and_then(|e| e.transform(&|e| missing_tag_to_null(&schema, e)))
.map(|e| log_rewrite(e, "missing_columums"))
// apply IOx specific rewrites (that unlock other simplifications)
.and_then(rewrite::rewrite)
.and_then(rewrite::iox_expr_rewrite)
.map(|e| log_rewrite(e, "rewrite"))
// apply type coercion so DataFusion simplification can deal with this
.and_then(|e| simplifier.coerce(e, Arc::clone(&df_schema)))
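
Throughout this file and the rewrite modules that follow, single-purpose ExprRewriter structs become free functions returning Transformed<Expr>, applied with Expr::transform. A minimal sketch of that shape under the same assumptions; unknown_to_null and the "_unknown" column name are illustrative:

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::error::Result;
use datafusion::prelude::{lit, Expr};
use datafusion::scalar::ScalarValue;

// Illustrative rewrite: a column literally named "_unknown" becomes a NULL string literal,
// in the same style as missing_tag_to_null and rewrite_measurement_references below.
fn unknown_to_null(expr: Expr) -> Result<Transformed<Expr>> {
    Ok(match expr {
        Expr::Column(c) if c.name == "_unknown" => Transformed::Yes(lit(ScalarValue::Utf8(None))),
        expr => Transformed::No(expr),
    })
}

fn transform_example(expr: Expr) -> Result<Expr> {
    // A plain function item can be passed where the transform closure is expected.
    expr.transform(&unknown_to_null)
}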


@ -1,53 +1,37 @@
use datafusion::{
error::Result as DataFusionResult, logical_expr::expr_rewriter::ExprRewriter, prelude::*,
common::tree_node::Transformed, error::Result as DataFusionResult, prelude::*,
scalar::ScalarValue,
};
use schema::{InfluxColumnType, Schema};
/// Logic for rewriting expressions from influxrpc that reference non
/// existent columns, or columns that are not tags, to NULL.
#[derive(Debug)]
pub(crate) struct MissingTagColumnRewriter {
/// The input schema
schema: Schema,
pub fn missing_tag_to_null(schema: &Schema, expr: Expr) -> DataFusionResult<Transformed<Expr>> {
Ok(match expr {
Expr::Column(col) if !tag_column_exists(schema, &col)? => Transformed::Yes(lit_null()),
expr => Transformed::No(expr),
})
}
impl MissingTagColumnRewriter {
/// Create a new [`MissingTagColumnRewriter`] targeting the given schema
pub(crate) fn new(schema: Schema) -> Self {
Self { schema }
}
fn tag_column_exists(schema: &Schema, col: &Column) -> DataFusionResult<bool> {
// todo a real error here (rpc_predicates shouldn't have table/relation qualifiers)
assert!(col.relation.is_none());
fn tag_column_exists(&self, col: &Column) -> DataFusionResult<bool> {
// todo a real error here (rpc_predicates shouldn't have table/relation qualifiers)
assert!(col.relation.is_none());
let exists = self
.schema
.find_index_of(&col.name)
.map(|i| self.schema.field(i).0)
.map(|influx_column_type| influx_column_type == InfluxColumnType::Tag)
.unwrap_or(false);
Ok(exists)
}
let exists = schema
.find_index_of(&col.name)
.map(|i| schema.field(i).0)
.map(|influx_column_type| influx_column_type == InfluxColumnType::Tag)
.unwrap_or(false);
Ok(exists)
}
fn lit_null() -> Expr {
lit(ScalarValue::Utf8(None))
}
impl ExprRewriter for MissingTagColumnRewriter {
fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
Ok(match expr {
Expr::Column(col) if !self.tag_column_exists(&col)? => lit_null(),
expr => expr,
})
}
}
#[cfg(test)]
mod tests {
use datafusion::{arrow::datatypes::DataType, logical_expr::expr_rewriter::ExprRewritable};
use datafusion::{arrow::datatypes::DataType, common::tree_node::TreeNode};
use schema::SchemaBuilder;
use super::*;
@ -103,7 +87,7 @@ mod tests {
.build()
.unwrap();
let mut rewriter = MissingTagColumnRewriter::new(schema);
expr.rewrite(&mut rewriter).unwrap()
expr.transform(&|expr| missing_tag_to_null(&schema, expr))
.unwrap()
}
}


@ -4,9 +4,9 @@ use super::FIELD_COLUMN_NAME;
use arrow::array::{as_boolean_array, as_string_array, ArrayRef, StringArray};
use arrow::compute::kernels;
use arrow::record_batch::RecordBatch;
use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion::common::DFSchema;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::logical_expr::expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
use datafusion::optimizer::utils::split_conjunction_owned;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
@ -78,7 +78,8 @@ impl FieldProjectionRewriter {
// Rewrites a single predicate. Does not handle AND specially
fn rewrite_single_conjunct(&mut self, expr: Expr) -> DataFusionResult<Expr> {
let finder = expr.accept(ColumnReferencesFinder::default())?;
let mut finder = ColumnReferencesFinder::default();
expr.visit(&mut finder)?;
// rewrite any expression that only references _field to `true`
match (finder.saw_field_reference, finder.saw_non_field_reference) {
@ -217,8 +218,9 @@ struct ColumnReferencesFinder {
saw_non_field_reference: bool,
}
impl ExpressionVisitor for ColumnReferencesFinder {
fn pre_visit(mut self, expr: &Expr) -> DataFusionResult<Recursion<Self>> {
impl TreeNodeVisitor for ColumnReferencesFinder {
type N = Expr;
fn pre_visit(&mut self, expr: &Expr) -> DataFusionResult<VisitRecursion> {
if let Expr::Column(col) = expr {
if col.name == FIELD_COLUMN_NAME {
self.saw_field_reference = true;
@ -229,9 +231,9 @@ impl ExpressionVisitor for ColumnReferencesFinder {
// terminate early if we have already found both
if self.saw_field_reference && self.saw_non_field_reference {
Ok(Recursion::Stop(self))
Ok(VisitRecursion::Stop)
} else {
Ok(Recursion::Continue(self))
Ok(VisitRecursion::Continue)
}
}
}


@ -1,5 +1,5 @@
use datafusion::common::tree_node::Transformed;
use datafusion::error::Result as DataFusionResult;
use datafusion::logical_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion::prelude::{lit, Column, Expr};
use super::MEASUREMENT_COLUMN_NAME;
@ -9,27 +9,16 @@ use super::MEASUREMENT_COLUMN_NAME;
pub(crate) fn rewrite_measurement_references(
table_name: &str,
expr: Expr,
) -> DataFusionResult<Expr> {
let mut rewriter = MeasurementRewriter { table_name };
expr.rewrite(&mut rewriter)
}
struct MeasurementRewriter<'a> {
table_name: &'a str,
}
impl ExprRewriter for MeasurementRewriter<'_> {
fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
Ok(match expr {
// rewrite col("_measurement") --> "table_name"
Expr::Column(Column { relation, name }) if name == MEASUREMENT_COLUMN_NAME => {
// should not have a qualified foo._measurement
// reference
assert!(relation.is_none());
lit(self.table_name)
}
// no rewrite needed
_ => expr,
})
}
) -> DataFusionResult<Transformed<Expr>> {
Ok(match expr {
// rewrite col("_measurement") --> "table_name"
Expr::Column(Column { relation, name }) if name == MEASUREMENT_COLUMN_NAME => {
// should not have a qualified foo._measurement
// reference
assert!(relation.is_none());
Transformed::Yes(lit(table_name))
}
// no rewrite needed
_ => Transformed::No(expr),
})
}


@ -1,11 +1,7 @@
use datafusion::{
common::tree_node::{Transformed, TreeNode},
error::Result,
logical_expr::{
binary_expr,
expr::Case,
expr_rewriter::{ExprRewritable, ExprRewriter},
BinaryExpr, Operator,
},
logical_expr::{binary_expr, expr::Case, BinaryExpr, Operator},
prelude::Expr,
};
@ -37,8 +33,22 @@ use datafusion::{
/// ELSE tag_col = 'cpu'
/// END
/// ```
pub fn rewrite(expr: Expr) -> Result<Expr> {
expr.rewrite(&mut IOxExprRewriter::new())
pub fn iox_expr_rewrite(expr: Expr) -> Result<Expr> {
expr.transform(&iox_expr_rewrite_inner)
}
fn iox_expr_rewrite_inner(expr: Expr) -> Result<Transformed<Expr>> {
Ok(match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right }) if is_case(&left) && is_comparison(op) => {
Transformed::Yes(inline_case(true, *left, *right, op))
}
Expr::BinaryExpr(BinaryExpr { left, op, right })
if is_case(&right) && is_comparison(op) =>
{
Transformed::Yes(inline_case(false, *left, *right, op))
}
expr => Transformed::No(expr),
})
}
/// Special purpose `Expr` rewrite rules for an Expr that is used as a predicate.
@ -58,15 +68,58 @@ pub fn rewrite(expr: Expr) -> Result<Expr> {
/// Currently it is special cases, but it would be great to generalize
/// it and contribute it back to DataFusion
pub fn simplify_predicate(expr: Expr) -> Result<Expr> {
expr.rewrite(&mut IOxPredicateRewriter::new())
expr.transform(&simplify_predicate_inner)
}
/// see docs on [rewrite]
struct IOxExprRewriter {}
fn simplify_predicate_inner(expr: Expr) -> Result<Transformed<Expr>> {
// look for this structure:
//
// NOT(col IS NULL) AND col = 'foo'
//
// and replace it with
//
// col = 'foo'
//
// Proof:
// Case 1: col is NULL
//
// not (NULL IS NULL) AND col = 'foo'
// not (true) AND NULL = 'foo'
// NULL
//
// Case 2: col is not NULL and not equal to 'foo'
// not (false) AND false
// true AND false
// false
//
// Case 3: col is not NULL and equal to 'foo'
// not (false) AND true
// true AND true
// true
match expr {
Expr::BinaryExpr(BinaryExpr {
left,
op: Operator::And,
right,
}) => {
if let (Some(coll), Some(colr)) = (is_col_not_null(&left), is_col_op_lit(&right)) {
if colr == coll {
return Ok(Transformed::Yes(*right));
}
} else if let (Some(coll), Some(colr)) = (is_col_op_lit(&left), is_col_not_null(&right))
{
if colr == coll {
return Ok(Transformed::Yes(*left));
}
};
impl IOxExprRewriter {
fn new() -> Self {
Self {}
Ok(Transformed::No(Expr::BinaryExpr(BinaryExpr {
left,
op: Operator::And,
right,
})))
}
expr => Ok(Transformed::No(expr)),
}
}
@ -109,24 +162,6 @@ fn is_comparison(op: Operator) -> bool {
}
}
impl ExprRewriter for IOxExprRewriter {
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
match expr {
Expr::BinaryExpr(BinaryExpr { left, op, right })
if is_case(&left) && is_comparison(op) =>
{
Ok(inline_case(true, *left, *right, op))
}
Expr::BinaryExpr(BinaryExpr { left, op, right })
if is_case(&right) && is_comparison(op) =>
{
Ok(inline_case(false, *left, *right, op))
}
expr => Ok(expr),
}
}
}
fn inline_case(case_on_left: bool, left: Expr, right: Expr, op: Operator) -> Expr {
let (when_then_expr, else_expr, other) = match (case_on_left, left, right) {
(
@ -177,15 +212,6 @@ fn inline_case(case_on_left: bool, left: Expr, right: Expr, op: Operator) -> Exp
})
}
/// see docs on [simplify_predicate]
struct IOxPredicateRewriter {}
impl IOxPredicateRewriter {
fn new() -> Self {
Self {}
}
}
/// returns the column name for a column expression
fn is_col(expr: &Expr) -> Option<&str> {
if let Expr::Column(c) = &expr {
@ -226,61 +252,6 @@ fn is_col_op_lit(expr: &Expr) -> Option<&str> {
}
}
impl ExprRewriter for IOxPredicateRewriter {
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
// look for this structure:
//
// NOT(col IS NULL) AND col = 'foo'
//
// and replace it with
//
// col = 'foo'
//
// Proof:
// Case 1: col is NULL
//
// not (NULL IS NULL) AND col = 'foo'
// not (true) AND NULL = 'foo'
// NULL
//
// Case 2: col is not NULL and not equal to 'foo'
// not (false) AND false
// true AND false
// false
//
// Case 3: col is not NULL and equal to 'foo'
// not (false) AND true
// true AND true
// true
match expr {
Expr::BinaryExpr(BinaryExpr {
left,
op: Operator::And,
right,
}) => {
if let (Some(coll), Some(colr)) = (is_col_not_null(&left), is_col_op_lit(&right)) {
if colr == coll {
return Ok(*right);
}
} else if let (Some(coll), Some(colr)) =
(is_col_op_lit(&left), is_col_not_null(&right))
{
if colr == coll {
return Ok(*left);
}
};
Ok(Expr::BinaryExpr(BinaryExpr {
left,
op: Operator::And,
right,
}))
}
expr => Ok(expr),
}
}
}
#[cfg(test)]
mod tests {
use std::ops::Add;
@ -299,7 +270,7 @@ mod tests {
.eq(lit("case2"));
let expected = expr.clone();
assert_eq!(expected, rewrite(expr).unwrap());
assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
}
#[test]
@ -314,7 +285,7 @@ mod tests {
col("tag").eq(lit("bar")),
);
assert_eq!(expected, rewrite(expr).unwrap());
assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
}
#[test]
@ -331,7 +302,7 @@ mod tests {
lit("bar").eq(col("tag")),
);
assert_eq!(expected, rewrite(expr).unwrap());
assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
}
#[test]
@ -358,7 +329,7 @@ mod tests {
)),
);
assert_eq!(expected, rewrite(expr).unwrap());
assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
}
#[test]
@ -404,7 +375,7 @@ mod tests {
expr.clone()
};
assert_eq!(expected, rewrite(expr).unwrap());
assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
}
#[test]
@ -434,7 +405,7 @@ mod tests {
.otherwise(lit("WTF?").eq(lit("is null")))
.unwrap();
assert_eq!(expected, rewrite(expr).unwrap());
assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
}
#[test]
@ -450,7 +421,7 @@ mod tests {
.add(lit(1));
let expected = expr.clone();
assert_eq!(expected, rewrite(expr).unwrap());
assert_eq!(expected, iox_expr_rewrite(expr).unwrap());
}
fn make_case(when_expr: Expr, then_expr: Expr, otherwise_expr: Expr) -> Expr {


@ -1,5 +1,5 @@
use datafusion::common::tree_node::{TreeNode, TreeNodeRewriter};
use datafusion::error::Result as DataFusionResult;
use datafusion::logical_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion::prelude::{lit, Expr};
use crate::ValueExpr;
@ -19,7 +19,9 @@ struct FieldValueRewriter<'a> {
value_exprs: &'a mut Vec<ValueExpr>,
}
impl<'a> ExprRewriter for FieldValueRewriter<'a> {
impl<'a> TreeNodeRewriter for FieldValueRewriter<'a> {
type N = Expr;
fn mutate(&mut self, expr: Expr) -> DataFusionResult<Expr> {
// try and convert Expr into a ValueExpr
match expr.try_into() {


@ -18,6 +18,7 @@ license.workspace = true
[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
arrow = { version = "34", features = ["dyn_cmp_dict", "prettyprint"] }
arrow-array = { version = "34", default-features = false, features = ["chrono-tz"] }
arrow-flight = { version = "34", features = ["flight-sql-experimental"] }
arrow-ord = { version = "34", default-features = false, features = ["dyn_cmp_dict"] }
arrow-string = { version = "34", default-features = false, features = ["dyn_cmp_dict"] }
@ -29,9 +30,9 @@ bytes = { version = "1" }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
crossbeam-utils = { version = "0.8" }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74c3955db48f7ef6458125100eed3999512a56ba" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74c3955db48f7ef6458125100eed3999512a56ba", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "74c3955db48f7ef6458125100eed3999512a56ba", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f30671760285f242950437c3c0f520ef418c1068" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f30671760285f242950437c3c0f520ef418c1068", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "f30671760285f242950437c3c0f520ef418c1068", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1" }
fixedbitset = { version = "0.4" }