chore: Update DataFusion again (#7777)

* chore: Update datafusion again

* chore: Run cargo hakari tasks

---------

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2023-05-15 08:38:45 -04:00 committed by GitHub
parent 8543c3ca1f
commit 7735e7c95b
10 changed files with 96 additions and 91 deletions
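
Most of the churn in the Rust files below is mechanical: in the DataFusion revision pinned here, the `Expr::ScalarUDF`, `Expr::ScalarFunction`, `Expr::AggregateUDF`, `Expr::InList`, `Expr::InSubquery`, and `Expr::Placeholder` variants are tuple variants wrapping dedicated structs from `datafusion::logical_expr::expr`, rather than struct variants. A minimal sketch of the before/after shape, assuming the re-export paths used elsewhere in this repository (the `date_bin_call` and `udf_call` helpers and the `udf` argument are hypothetical stand-ins, not code from this PR):

```rust
use std::sync::Arc;

use datafusion::logical_expr::expr::{ScalarFunction, ScalarUDF};
use datafusion::logical_expr::{BuiltinScalarFunction, Expr};

/// Build a call to the built-in `date_bin` function.
fn date_bin_call(args: Vec<Expr>) -> Expr {
    // Old (DataFusion 23): Expr::ScalarFunction { fun: BuiltinScalarFunction::DateBin, args }
    Expr::ScalarFunction(ScalarFunction {
        fun: BuiltinScalarFunction::DateBin,
        args,
    })
}

/// Build a call to a user-defined scalar function.
fn udf_call(udf: Arc<datafusion::logical_expr::ScalarUDF>, args: Vec<Expr>) -> Expr {
    // Old (DataFusion 23): Expr::ScalarUDF { fun: udf, args }
    Expr::ScalarUDF(ScalarUDF { fun: udf, args })
}
```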

Cargo.lock (generated)

@@ -1432,8 +1432,8 @@ dependencies = [

 [[package]]
 name = "datafusion"
-version = "23.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5"
+version = "24.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
 dependencies = [
  "ahash 0.8.3",
  "arrow",
@@ -1481,8 +1481,8 @@ dependencies = [

 [[package]]
 name = "datafusion-common"
-version = "23.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5"
+version = "24.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
 dependencies = [
  "arrow",
  "arrow-array",
@@ -1495,8 +1495,8 @@ dependencies = [

 [[package]]
 name = "datafusion-execution"
-version = "23.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5"
+version = "24.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
 dependencies = [
  "dashmap",
  "datafusion-common",
@@ -1512,8 +1512,8 @@ dependencies = [

 [[package]]
 name = "datafusion-expr"
-version = "23.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5"
+version = "24.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
 dependencies = [
  "ahash 0.8.3",
  "arrow",
@@ -1523,8 +1523,8 @@ dependencies = [

 [[package]]
 name = "datafusion-optimizer"
-version = "23.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5"
+version = "24.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
 dependencies = [
  "arrow",
  "async-trait",
@@ -1540,8 +1540,8 @@ dependencies = [

 [[package]]
 name = "datafusion-physical-expr"
-version = "23.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5"
+version = "24.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
 dependencies = [
  "ahash 0.8.3",
  "arrow",
@@ -1572,8 +1572,8 @@ dependencies = [

 [[package]]
 name = "datafusion-proto"
-version = "23.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5"
+version = "24.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
 dependencies = [
  "arrow",
  "chrono",
@@ -1586,8 +1586,8 @@ dependencies = [

 [[package]]
 name = "datafusion-row"
-version = "23.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5"
+version = "24.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
 dependencies = [
  "arrow",
  "datafusion-common",
@@ -1597,8 +1597,8 @@ dependencies = [

 [[package]]
 name = "datafusion-sql"
-version = "23.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=06e9f53637f20dd91bef43b74942ec36c38c22d5#06e9f53637f20dd91bef43b74942ec36c38c22d5"
+version = "24.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=496fc399de700ae14fab436fdff8711cd3132436#496fc399de700ae14fab436fdff8711cd3132436"
 dependencies = [
  "arrow",
  "arrow-schema",


@@ -115,8 +115,8 @@ license = "MIT OR Apache-2.0"
 [workspace.dependencies]
 arrow = { version = "38.0.0" }
 arrow-flight = { version = "38.0.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="06e9f53637f20dd91bef43b74942ec36c38c22d5", default-features = false }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="06e9f53637f20dd91bef43b74942ec36c38c22d5" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="496fc399de700ae14fab436fdff8711cd3132436", default-features = false }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="496fc399de700ae14fab436fdff8711cd3132436" }
 hashbrown = { version = "0.13.2" }
 parquet = { version = "38.0.0" }
 tonic = { version = "0.9.2", features = ["tls", "tls-webpki-roots"] }


@@ -23,13 +23,10 @@ use datafusion::arrow::datatypes::{DataType, Fields};
 use datafusion::common::{DataFusionError, ToDFSchema};
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::TaskContext;
-use datafusion::execution::memory_pool::UnboundedMemoryPool;
 use datafusion::logical_expr::expr::Sort;
 use datafusion::physical_expr::execution_props::ExecutionProps;
 use datafusion::physical_expr::{create_physical_expr, PhysicalExpr};
 use datafusion::physical_optimizer::pruning::PruningPredicate;
-use datafusion::physical_plan::common::SizedRecordBatchStream;
-use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
 use datafusion::physical_plan::{collect, EmptyRecordBatchStream, ExecutionPlan};
 use datafusion::prelude::{lit, Column, Expr, SessionContext};
 use datafusion::{
@@ -245,24 +242,18 @@ where

 /// Create a SendableRecordBatchStream a RecordBatch
 pub fn stream_from_batch(schema: SchemaRef, batch: RecordBatch) -> SendableRecordBatchStream {
-    stream_from_batches(schema, vec![Arc::new(batch)])
+    stream_from_batches(schema, vec![batch])
 }

 /// Create a SendableRecordBatchStream from Vec of RecordBatches with the same schema
 pub fn stream_from_batches(
     schema: SchemaRef,
-    batches: Vec<Arc<RecordBatch>>,
+    batches: Vec<RecordBatch>,
 ) -> SendableRecordBatchStream {
     if batches.is_empty() {
         return Box::pin(EmptyRecordBatchStream::new(schema));
     }
-
-    // TODO should track this memory properly
-    let dummy_pool = Arc::new(UnboundedMemoryPool::default()) as _;
-    let dummy_metrics = ExecutionPlanMetricsSet::new();
-    let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, &dummy_pool, 0);
-    let stream = SizedRecordBatchStream::new(batches[0].schema(), batches, mem_metrics);
-    Box::pin(stream)
+    Box::pin(MemoryStream::new_with_schema(batches, schema))
 }

 /// Create a SendableRecordBatchStream that sends back no RecordBatches with a specific schema

@@ -1281,7 +1281,7 @@ mod tests {
             .map(|batches| {
                 let batches = batches
                     .into_iter()
-                    .map(|chunk| Arc::new(parse_to_record_batch(Arc::clone(&schema), &chunk)))
+                    .map(|chunk| parse_to_record_batch(Arc::clone(&schema), &chunk))
                     .collect::<Vec<_>>();

                 stream_from_batches(Arc::clone(&schema), batches)

@@ -8,8 +8,9 @@ use datafusion::{
     common::tree_node::{RewriteRecursion, TreeNode, TreeNodeRewriter, VisitRecursion},
     error::{DataFusionError, Result},
     logical_expr::{
-        utils::expr_to_columns, Aggregate, BuiltinScalarFunction, Extension, LogicalPlan,
-        Projection,
+        expr::{ScalarFunction, ScalarUDF},
+        utils::expr_to_columns,
+        Aggregate, BuiltinScalarFunction, Extension, LogicalPlan, Projection,
     },
     optimizer::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule},
     prelude::{col, Expr},
@@ -330,7 +331,7 @@ impl TreeNodeRewriter for DateBinGapfillRewriter {
     type N = Expr;
     fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
         match expr {
-            Expr::ScalarUDF { fun, .. } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
+            Expr::ScalarUDF(ScalarUDF { fun, .. }) if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
                 Ok(RewriteRecursion::Mutate)
             }
             _ => Ok(RewriteRecursion::Continue),
@@ -342,12 +343,12 @@ impl TreeNodeRewriter for DateBinGapfillRewriter {
         // so that everything stays wired up.
         let orig_name = expr.display_name()?;
         match expr {
-            Expr::ScalarUDF { fun, args } if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
+            Expr::ScalarUDF(ScalarUDF { fun, args }) if fun.name == DATE_BIN_GAPFILL_UDF_NAME => {
                 self.args = Some(args.clone());
-                Ok(Expr::ScalarFunction {
+                Ok(Expr::ScalarFunction(ScalarFunction {
                     fun: BuiltinScalarFunction::DateBin,
                     args,
-                }
+                })
                 .alias(orig_name))
             }
             _ => Ok(expr),
@@ -442,7 +443,7 @@ impl TreeNodeRewriter for FillFnRewriter {
     type N = Expr;
     fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
         match expr {
-            Expr::ScalarUDF { fun, .. } if udf_to_fill_strategy(&fun.name).is_some() => {
+            Expr::ScalarUDF(ScalarUDF { fun, .. }) if udf_to_fill_strategy(&fun.name).is_some() => {
                 Ok(RewriteRecursion::Mutate)
             }
             _ => Ok(RewriteRecursion::Continue),
@@ -452,10 +453,12 @@ impl TreeNodeRewriter for FillFnRewriter {
     fn mutate(&mut self, expr: Expr) -> Result<Expr> {
         let orig_name = expr.display_name()?;
         match expr {
-            Expr::ScalarUDF { ref fun, .. } if udf_to_fill_strategy(&fun.name).is_none() => {
+            Expr::ScalarUDF(ScalarUDF { ref fun, .. })
+                if udf_to_fill_strategy(&fun.name).is_none() =>
+            {
                 Ok(expr)
             }
-            Expr::ScalarUDF { fun, mut args } => {
+            Expr::ScalarUDF(ScalarUDF { fun, mut args }) => {
                 let fs = udf_to_fill_strategy(&fun.name).expect("must be a fill fn");
                 let arg = args.remove(0);
                 self.add_fill_strategy(arg.clone(), fs)?;
@@ -484,7 +487,7 @@ fn count_udf(e: &Expr, name: &str) -> Result<usize> {
     let mut count = 0;
     e.apply(&mut |expr| {
         match expr {
-            Expr::ScalarUDF { fun, .. } if fun.name == name => {
+            Expr::ScalarUDF(ScalarUDF { fun, .. }) if fun.name == name => {
                 count += 1;
             }
             _ => (),
@@ -522,6 +525,7 @@ mod test {
     use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
     use datafusion::error::Result;
+    use datafusion::logical_expr::expr::ScalarUDF;
     use datafusion::logical_expr::{logical_plan, LogicalPlan, LogicalPlanBuilder};
     use datafusion::optimizer::optimizer::Optimizer;
     use datafusion::optimizer::OptimizerContext;
@@ -562,24 +566,24 @@ mod test {
         if let Some(origin) = origin {
             args.push(origin)
         }
-        Ok(Expr::ScalarUDF {
+        Ok(Expr::ScalarUDF(ScalarUDF {
             fun: query_functions::registry().udf(DATE_BIN_GAPFILL_UDF_NAME)?,
             args,
-        })
+        }))
     }

     fn locf(arg: Expr) -> Result<Expr> {
-        Ok(Expr::ScalarUDF {
+        Ok(Expr::ScalarUDF(ScalarUDF {
             fun: query_functions::registry().udf(LOCF_UDF_NAME)?,
             args: vec![arg],
-        })
+        }))
     }

     fn interpolate(arg: Expr) -> Result<Expr> {
-        Ok(Expr::ScalarUDF {
+        Ok(Expr::ScalarUDF(ScalarUDF {
             fun: query_functions::registry().udf(INTERPOLATE_UDF_NAME)?,
             args: vec![arg],
-        })
+        }))
     }

     fn optimize(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {


@@ -1,7 +1,7 @@
 use datafusion::{
     common::{tree_node::TreeNodeRewriter, DFSchema},
     error::DataFusionError,
-    logical_expr::{utils::from_plan, LogicalPlan, Operator},
+    logical_expr::{expr::ScalarUDF, utils::from_plan, LogicalPlan, Operator},
     optimizer::{utils::rewrite_preserving_name, OptimizerConfig, OptimizerRule},
     prelude::{binary_expr, lit, Expr},
     scalar::ScalarValue,
@@ -72,7 +72,7 @@ impl TreeNodeRewriter for InfluxRegexToDataFusionRegex {
     fn mutate(&mut self, expr: Expr) -> Result<Expr, DataFusionError> {
         match expr {
-            Expr::ScalarUDF { fun, mut args } => {
+            Expr::ScalarUDF(ScalarUDF { fun, mut args }) => {
                 if (args.len() == 2)
                     && ((fun.name == REGEX_MATCH_UDF_NAME)
                         || (fun.name == REGEX_NOT_MATCH_UDF_NAME))
@@ -88,7 +88,7 @@ impl TreeNodeRewriter for InfluxRegexToDataFusionRegex {
                     }
                 }

-                Ok(Expr::ScalarUDF { fun, args })
+                Ok(Expr::ScalarUDF(ScalarUDF { fun, args }))
             }
             _ => Ok(expr),
         }


@@ -20,6 +20,7 @@ use chrono_tz::Tz;
 use datafusion::catalog::TableReference;
 use datafusion::common::{DFSchema, DFSchemaRef, Result, ScalarValue, ToDFSchema};
 use datafusion::datasource::{provider_as_source, MemTable};
+use datafusion::logical_expr::expr::ScalarFunction;
 use datafusion::logical_expr::expr_rewriter::normalize_col;
 use datafusion::logical_expr::logical_plan::builder::project;
 use datafusion::logical_expr::logical_plan::Analyze;
@@ -617,10 +618,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
             && fill_option != FillClause::None
         {
             let args = match select_exprs[time_column_index].clone().unalias() {
-                Expr::ScalarFunction {
+                Expr::ScalarFunction(ScalarFunction {
                     fun: BuiltinScalarFunction::DateBin,
                     args,
-                } => args,
+                }) => args,
                 _ => {
                     // The InfluxQL planner adds the `date_bin` function,
                     // so this condition represents an internal failure.
@@ -1159,13 +1160,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
                 if args.len() != 2 {
                     error::query("invalid number of arguments for log, expected 2, got 1")
                 } else {
-                    Ok(Expr::ScalarFunction {
+                    Ok(Expr::ScalarFunction(ScalarFunction {
                         fun: BuiltinScalarFunction::Log,
                         args: args.into_iter().rev().collect(),
-                    })
+                    }))
                 }
             }
-            fun => Ok(Expr::ScalarFunction { fun, args }),
+            fun => Ok(Expr::ScalarFunction(ScalarFunction { fun, args })),
         }
     }
@@ -1411,7 +1412,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
             // - not null if it had any non-null values
             //
             // note that since we only have a single row, this is efficient
-            .project([Expr::ScalarFunction {
+            .project([Expr::ScalarFunction(ScalarFunction {
                 fun: BuiltinScalarFunction::MakeArray,
                 args: tags
                     .iter()
@@ -1421,7 +1422,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
                         when(tag_col.gt(lit(0)), lit(*tag)).end()
                     })
                     .collect::<Result<Vec<_>, _>>()?,
-            }
+            })
             .alias(tag_key_col)])?
             // roll our single array row into one row per tag key
             .unnest_column(tag_key_df_col)?


@@ -8,6 +8,9 @@
 //!
 //! NOTE
 use datafusion::common::Result;
+use datafusion::logical_expr::expr::{
+    AggregateUDF, InList, InSubquery, Placeholder, ScalarFunction, ScalarUDF,
+};
 use datafusion::logical_expr::{
     expr::{
         AggregateFunction, Between, BinaryExpr, Case, Cast, Expr, GetIndexedField, GroupingSet,
@@ -84,14 +87,16 @@ where
                 .collect::<Result<Vec<_>>>()?,
             window_frame.clone(),
         ))),
-        Expr::AggregateUDF { fun, args, filter } => Ok(Expr::AggregateUDF {
-            fun: fun.clone(),
-            args: args
-                .iter()
-                .map(|e| clone_with_replacement(e, replacement_fn))
-                .collect::<Result<Vec<Expr>>>()?,
-            filter: filter.clone(),
-        }),
+        Expr::AggregateUDF(AggregateUDF { fun, args, filter }) => {
+            Ok(Expr::AggregateUDF(AggregateUDF {
+                fun: fun.clone(),
+                args: args
+                    .iter()
+                    .map(|e| clone_with_replacement(e, replacement_fn))
+                    .collect::<Result<Vec<Expr>>>()?,
+                filter: filter.clone(),
+            }))
+        }
         Expr::Alias(nested_expr, alias_name) => Ok(Expr::Alias(
             Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
             alias_name.clone(),
@@ -107,18 +112,18 @@ where
             Box::new(clone_with_replacement(low, replacement_fn)?),
             Box::new(clone_with_replacement(high, replacement_fn)?),
         ))),
-        Expr::InList {
+        Expr::InList(InList {
             expr: nested_expr,
             list,
             negated,
-        } => Ok(Expr::InList {
+        }) => Ok(Expr::InList(InList {
             expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
             list: list
                 .iter()
                 .map(|e| clone_with_replacement(e, replacement_fn))
                 .collect::<Result<Vec<Expr>>>()?,
             negated: *negated,
-        }),
+        })),
         Expr::BinaryExpr(BinaryExpr { left, right, op }) => {
             Ok(Expr::BinaryExpr(BinaryExpr::new(
                 Box::new(clone_with_replacement(left, replacement_fn)?),
@@ -182,20 +187,22 @@ where
                 None => None,
             },
         ))),
-        Expr::ScalarFunction { fun, args } => Ok(Expr::ScalarFunction {
-            fun: fun.clone(),
-            args: args
-                .iter()
-                .map(|e| clone_with_replacement(e, replacement_fn))
-                .collect::<Result<Vec<Expr>>>()?,
-        }),
-        Expr::ScalarUDF { fun, args } => Ok(Expr::ScalarUDF {
+        Expr::ScalarFunction(ScalarFunction { fun, args }) => {
+            Ok(Expr::ScalarFunction(ScalarFunction {
+                fun: fun.clone(),
+                args: args
+                    .iter()
+                    .map(|e| clone_with_replacement(e, replacement_fn))
+                    .collect::<Result<Vec<Expr>>>()?,
+            }))
+        }
+        Expr::ScalarUDF(ScalarUDF { fun, args }) => Ok(Expr::ScalarUDF(ScalarUDF {
             fun: fun.clone(),
             args: args
                 .iter()
                 .map(|arg| clone_with_replacement(arg, replacement_fn))
                 .collect::<Result<Vec<Expr>>>()?,
-        }),
+        })),
         Expr::Negative(nested_expr) => Ok(Expr::Negative(Box::new(
             clone_with_replacement(nested_expr, replacement_fn)?,
         ))),
@@ -256,15 +263,15 @@ where
             | Expr::ScalarVariable(_, _)
             | Expr::Exists { .. }
             | Expr::ScalarSubquery(_) => Ok(expr.clone()),
-        Expr::InSubquery {
+        Expr::InSubquery(InSubquery {
             expr: nested_expr,
             subquery,
             negated,
-        } => Ok(Expr::InSubquery {
+        }) => Ok(Expr::InSubquery(InSubquery {
             expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
             subquery: subquery.clone(),
             negated: *negated,
-        }),
+        })),
         Expr::Wildcard => Ok(Expr::Wildcard),
         Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
         Expr::GetIndexedField(GetIndexedField { key, expr }) => {
@@ -301,10 +308,12 @@ where
             )))
         }
         },
-        Expr::Placeholder { id, data_type } => Ok(Expr::Placeholder {
-            id: id.clone(),
-            data_type: data_type.clone(),
-        }),
+        Expr::Placeholder(Placeholder { id, data_type }) => {
+            Ok(Expr::Placeholder(Placeholder {
+                id: id.clone(),
+                data_type: data_type.clone(),
+            }))
+        }
     }
 }
 }


@@ -181,10 +181,10 @@ fn scalar_coalesce_struct(scalar1: ScalarValue, scalar2: &ScalarValue) -> Scalar
 ///
 /// See [module-level docs](self) for more information.
 pub fn coalesce_struct(args: Vec<Expr>) -> Expr {
-    Expr::ScalarUDF {
+    Expr::ScalarUDF(datafusion::logical_expr::expr::ScalarUDF {
         fun: Arc::clone(&COALESCE_STRUCT_UDF),
         args,
-    }
+    })
 }

 #[cfg(test)]


@@ -30,9 +30,9 @@ bytes = { version = "1" }
 chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
 crossbeam-utils = { version = "0.8" }
 crypto-common = { version = "0.1", default-features = false, features = ["std"] }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "06e9f53637f20dd91bef43b74942ec36c38c22d5" }
-datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "06e9f53637f20dd91bef43b74942ec36c38c22d5", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
-datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "06e9f53637f20dd91bef43b74942ec36c38c22d5", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "496fc399de700ae14fab436fdff8711cd3132436" }
+datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "496fc399de700ae14fab436fdff8711cd3132436", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "496fc399de700ae14fab436fdff8711cd3132436", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
 digest = { version = "0.10", features = ["mac", "std"] }
 either = { version = "1" }
 fixedbitset = { version = "0.4" }