chore: Update DataFusion (#8447)

* chore: Update DataFusion pin

* chore: Update for API changes
pull/24376/head
Andrew Lamb 2023-08-08 08:41:14 -05:00 committed by GitHub
parent 71b32fbdba
commit 46bfa0badc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 306 additions and 313 deletions

16
Cargo.lock generated
View File

@@ -1389,7 +1389,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5faa10b2911ecca4c2199f78ae675363c7d8230e#5faa10b2911ecca4c2199f78ae675363c7d8230e"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=99e2cd4b4082296b0e7f98b0fb122861c4f74a11#99e2cd4b4082296b0e7f98b0fb122861c4f74a11"
dependencies = [
"ahash",
"arrow",
@@ -1437,7 +1437,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5faa10b2911ecca4c2199f78ae675363c7d8230e#5faa10b2911ecca4c2199f78ae675363c7d8230e"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=99e2cd4b4082296b0e7f98b0fb122861c4f74a11#99e2cd4b4082296b0e7f98b0fb122861c4f74a11"
dependencies = [
"arrow",
"arrow-array",
@@ -1451,7 +1451,7 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5faa10b2911ecca4c2199f78ae675363c7d8230e#5faa10b2911ecca4c2199f78ae675363c7d8230e"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=99e2cd4b4082296b0e7f98b0fb122861c4f74a11#99e2cd4b4082296b0e7f98b0fb122861c4f74a11"
dependencies = [
"dashmap",
"datafusion-common",
@@ -1468,7 +1468,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5faa10b2911ecca4c2199f78ae675363c7d8230e#5faa10b2911ecca4c2199f78ae675363c7d8230e"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=99e2cd4b4082296b0e7f98b0fb122861c4f74a11#99e2cd4b4082296b0e7f98b0fb122861c4f74a11"
dependencies = [
"ahash",
"arrow",
@@ -1482,7 +1482,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5faa10b2911ecca4c2199f78ae675363c7d8230e#5faa10b2911ecca4c2199f78ae675363c7d8230e"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=99e2cd4b4082296b0e7f98b0fb122861c4f74a11#99e2cd4b4082296b0e7f98b0fb122861c4f74a11"
dependencies = [
"arrow",
"async-trait",
@@ -1499,7 +1499,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5faa10b2911ecca4c2199f78ae675363c7d8230e#5faa10b2911ecca4c2199f78ae675363c7d8230e"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=99e2cd4b4082296b0e7f98b0fb122861c4f74a11#99e2cd4b4082296b0e7f98b0fb122861c4f74a11"
dependencies = [
"ahash",
"arrow",
@@ -1533,7 +1533,7 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5faa10b2911ecca4c2199f78ae675363c7d8230e#5faa10b2911ecca4c2199f78ae675363c7d8230e"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=99e2cd4b4082296b0e7f98b0fb122861c4f74a11#99e2cd4b4082296b0e7f98b0fb122861c4f74a11"
dependencies = [
"arrow",
"chrono",
@@ -1547,7 +1547,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "28.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5faa10b2911ecca4c2199f78ae675363c7d8230e#5faa10b2911ecca4c2199f78ae675363c7d8230e"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=99e2cd4b4082296b0e7f98b0fb122861c4f74a11#99e2cd4b4082296b0e7f98b0fb122861c4f74a11"
dependencies = [
"arrow",
"arrow-schema",

View File

@@ -121,8 +121,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "43.0.0" }
arrow-flight = { version = "43.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "5faa10b2911ecca4c2199f78ae675363c7d8230e", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "5faa10b2911ecca4c2199f78ae675363c7d8230e" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "99e2cd4b4082296b0e7f98b0fb122861c4f74a11", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "99e2cd4b4082296b0e7f98b0fb122861c4f74a11" }
hashbrown = { version = "0.14.0" }
object_store = { version = "0.6.0" }

View File

@@ -39,9 +39,9 @@ use datafusion::logical_expr::utils::{expr_as_column_expr, find_aggregate_exprs}
use datafusion::logical_expr::{
binary_expr, col, date_bin, expr, expr::WindowFunction, lit, lit_timestamp_nano, now, union,
window_function, AggregateFunction, AggregateUDF, Between, BuiltInWindowFunction,
BuiltinScalarFunction, EmptyRelation, Explain, Expr, ExprSchemable, Extension, GetIndexedField,
LogicalPlan, LogicalPlanBuilder, Operator, PlanType, Projection, ScalarUDF, TableSource,
ToStringifiedPlan, WindowFrame, WindowFrameBound, WindowFrameUnits,
BuiltinScalarFunction, EmptyRelation, Explain, Expr, ExprSchemable, Extension, LogicalPlan,
LogicalPlanBuilder, Operator, PlanType, Projection, ScalarUDF, TableSource, ToStringifiedPlan,
WindowFrame, WindowFrameBound, WindowFrameUnits,
};
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::execution_props::ExecutionProps;
@@ -1198,11 +1198,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
aggr_exprs[0] = selector_new.clone();
for (idx, struct_name, out_alias) in fields_to_extract {
select_exprs[idx] = Expr::GetIndexedField(GetIndexedField {
expr: Box::new(selector_new.clone()),
key: ScalarValue::Utf8(Some(struct_name)),
})
.alias(out_alias);
select_exprs[idx] =
selector_new.clone().field(struct_name).alias(out_alias);
should_fill_expr[idx] = true;
}
}
@@ -1244,10 +1241,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
};
Expr::GetIndexedField(GetIndexedField {
expr: Box::new(selector),
key: ScalarValue::Utf8(Some("time".to_owned())),
})
selector.field("time")
} else {
lit_timestamp_nano(0)
}
@@ -2013,10 +2007,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
.call(vec![expr, "time".as_expr()]);
Ok(Expr::GetIndexedField(GetIndexedField {
expr: Box::new(selector_udf),
key: ScalarValue::Utf8(Some("value".to_owned())),
}))
Ok(selector_udf.field("value"))
}
"difference" => {
check_arg_count(name, args, 1)?;

View File

@@ -12,6 +12,7 @@ use datafusion::common::Result;
use datafusion::logical_expr::expr::{
AggregateUDF, Alias, InList, InSubquery, Placeholder, ScalarFunction, ScalarUDF,
};
use datafusion::logical_expr::GetFieldAccess;
use datafusion::logical_expr::{
expr::{
AggregateFunction, Between, BinaryExpr, Case, Cast, Expr, GetIndexedField, GroupingSet,
@@ -20,6 +21,7 @@ use datafusion::logical_expr::{
utils::expr_as_column_expr,
LogicalPlan,
};
use datafusion::physical_plan::expressions::GetFieldAccessExpr;
/// Returns a cloned `Expr`, but any of the `Expr`'s in the tree may be
/// replaced/customized by the replacement function.
@@ -52,273 +54,288 @@
Some(replacement) => Ok(replacement),
// No replacement was provided, clone the node and recursively call
// clone_with_replacement() on any nested expressions.
None => {
match expr {
Expr::AggregateFunction(AggregateFunction {
fun,
args,
distinct,
filter,
order_by,
}) => Ok(Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
args.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
*distinct,
filter.clone(),
order_by.clone(),
))),
Expr::WindowFunction(WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
}) => Ok(Expr::WindowFunction(WindowFunction::new(
fun.clone(),
args.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<_>>>()?,
partition_by
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<_>>>()?,
order_by
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<_>>>()?,
window_frame.clone(),
))),
Expr::AggregateUDF(AggregateUDF {
fun,
args,
filter,
order_by,
}) => Ok(Expr::AggregateUDF(AggregateUDF {
fun: fun.clone(),
args: args
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
filter: filter.clone(),
order_by: order_by.clone(),
})),
Expr::Alias(Alias {
expr: nested_expr,
name: alias_name,
}) => Ok(Expr::Alias(Alias {
expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
name: alias_name.clone(),
})),
Expr::Between(Between {
expr,
negated,
low,
high,
}) => Ok(Expr::Between(Between::new(
Box::new(clone_with_replacement(expr, replacement_fn)?),
*negated,
Box::new(clone_with_replacement(low, replacement_fn)?),
Box::new(clone_with_replacement(high, replacement_fn)?),
))),
Expr::InList(InList {
expr: nested_expr,
list,
negated,
}) => Ok(Expr::InList(InList {
expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
list: list
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
negated: *negated,
})),
Expr::BinaryExpr(BinaryExpr { left, right, op }) => {
Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(clone_with_replacement(left, replacement_fn)?),
*op,
Box::new(clone_with_replacement(right, replacement_fn)?),
)))
}
Expr::Like(Like {
negated,
expr,
pattern,
case_insensitive,
escape_char,
}) => Ok(Expr::Like(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
*case_insensitive,
))),
Expr::SimilarTo(Like {
negated,
expr,
pattern,
case_insensitive,
escape_char,
}) => Ok(Expr::SimilarTo(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
*case_insensitive,
))),
Expr::Case(case) => Ok(Expr::Case(Case::new(
match &case.expr {
Some(case_expr) => {
Some(Box::new(clone_with_replacement(case_expr, replacement_fn)?))
}
None => None,
},
case.when_then_expr
.iter()
.map(|(a, b)| {
Ok((
Box::new(clone_with_replacement(a, replacement_fn)?),
Box::new(clone_with_replacement(b, replacement_fn)?),
))
})
.collect::<Result<Vec<(_, _)>>>()?,
match &case.else_expr {
Some(else_expr) => {
Some(Box::new(clone_with_replacement(else_expr, replacement_fn)?))
}
None => None,
},
))),
Expr::ScalarFunction(ScalarFunction { fun, args }) => {
Ok(Expr::ScalarFunction(ScalarFunction {
fun: fun.clone(),
args: args
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
}))
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => Ok(Expr::ScalarUDF(ScalarUDF {
fun: fun.clone(),
args: args
.iter()
.map(|arg| clone_with_replacement(arg, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
})),
Expr::Negative(nested_expr) => Ok(Expr::Negative(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::Not(nested_expr) => Ok(Expr::Not(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsNotNull(nested_expr) => Ok(Expr::IsNotNull(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::IsNull(nested_expr) => Ok(Expr::IsNull(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsTrue(nested_expr) => Ok(Expr::IsTrue(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsFalse(nested_expr) => Ok(Expr::IsFalse(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsUnknown(nested_expr) => Ok(Expr::IsUnknown(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::IsNotTrue(nested_expr) => Ok(Expr::IsNotTrue(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::IsNotFalse(nested_expr) => Ok(Expr::IsNotFalse(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::IsNotUnknown(nested_expr) => Ok(Expr::IsNotUnknown(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::Cast(Cast { expr, data_type }) => Ok(Expr::Cast(Cast::new(
Box::new(clone_with_replacement(expr, replacement_fn)?),
data_type.clone(),
))),
Expr::TryCast(TryCast {
expr: nested_expr,
data_type,
}) => Ok(Expr::TryCast(TryCast::new(
Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
data_type.clone(),
))),
Expr::Sort(Sort {
expr: nested_expr,
asc,
nulls_first,
}) => Ok(Expr::Sort(Sort::new(
Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
*asc,
*nulls_first,
))),
Expr::Column { .. }
| Expr::OuterReferenceColumn(_, _)
| Expr::Literal(_)
| Expr::ScalarVariable(_, _)
| Expr::Exists { .. }
| Expr::ScalarSubquery(_) => Ok(expr.clone()),
Expr::InSubquery(InSubquery {
expr: nested_expr,
subquery,
negated,
}) => Ok(Expr::InSubquery(InSubquery {
expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
subquery: subquery.clone(),
negated: *negated,
})),
Expr::Wildcard => Ok(Expr::Wildcard),
Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
Expr::GetIndexedField(GetIndexedField { key, expr }) => {
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?),
key.clone(),
)))
}
Expr::GroupingSet(set) => match set {
GroupingSet::Rollup(exprs) => Ok(Expr::GroupingSet(GroupingSet::Rollup(
exprs
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
))),
GroupingSet::Cube(exprs) => Ok(Expr::GroupingSet(GroupingSet::Cube(
exprs
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
))),
GroupingSet::GroupingSets(lists_of_exprs) => {
let mut new_lists_of_exprs = vec![];
for exprs in lists_of_exprs {
new_lists_of_exprs.push(
exprs
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
);
}
Ok(Expr::GroupingSet(GroupingSet::GroupingSets(
new_lists_of_exprs,
)))
}
},
Expr::Placeholder(Placeholder { id, data_type }) => {
Ok(Expr::Placeholder(Placeholder {
id: id.clone(),
data_type: data_type.clone(),
}))
}
None => match expr {
Expr::AggregateFunction(AggregateFunction {
fun,
args,
distinct,
filter,
order_by,
}) => Ok(Expr::AggregateFunction(AggregateFunction::new(
fun.clone(),
args.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
*distinct,
filter.clone(),
order_by.clone(),
))),
Expr::WindowFunction(WindowFunction {
fun,
args,
partition_by,
order_by,
window_frame,
}) => Ok(Expr::WindowFunction(WindowFunction::new(
fun.clone(),
args.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<_>>>()?,
partition_by
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<_>>>()?,
order_by
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<_>>>()?,
window_frame.clone(),
))),
Expr::AggregateUDF(AggregateUDF {
fun,
args,
filter,
order_by,
}) => Ok(Expr::AggregateUDF(AggregateUDF {
fun: fun.clone(),
args: args
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
filter: filter.clone(),
order_by: order_by.clone(),
})),
Expr::Alias(Alias {
expr: nested_expr,
name: alias_name,
}) => Ok(Expr::Alias(Alias {
expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
name: alias_name.clone(),
})),
Expr::Between(Between {
expr,
negated,
low,
high,
}) => Ok(Expr::Between(Between::new(
Box::new(clone_with_replacement(expr, replacement_fn)?),
*negated,
Box::new(clone_with_replacement(low, replacement_fn)?),
Box::new(clone_with_replacement(high, replacement_fn)?),
))),
Expr::InList(InList {
expr: nested_expr,
list,
negated,
}) => Ok(Expr::InList(InList {
expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
list: list
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
negated: *negated,
})),
Expr::BinaryExpr(BinaryExpr { left, right, op }) => {
Ok(Expr::BinaryExpr(BinaryExpr::new(
Box::new(clone_with_replacement(left, replacement_fn)?),
*op,
Box::new(clone_with_replacement(right, replacement_fn)?),
)))
}
}
Expr::Like(Like {
negated,
expr,
pattern,
case_insensitive,
escape_char,
}) => Ok(Expr::Like(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
*case_insensitive,
))),
Expr::SimilarTo(Like {
negated,
expr,
pattern,
case_insensitive,
escape_char,
}) => Ok(Expr::SimilarTo(Like::new(
*negated,
Box::new(clone_with_replacement(expr, replacement_fn)?),
Box::new(clone_with_replacement(pattern, replacement_fn)?),
*escape_char,
*case_insensitive,
))),
Expr::Case(case) => Ok(Expr::Case(Case::new(
match &case.expr {
Some(case_expr) => {
Some(Box::new(clone_with_replacement(case_expr, replacement_fn)?))
}
None => None,
},
case.when_then_expr
.iter()
.map(|(a, b)| {
Ok((
Box::new(clone_with_replacement(a, replacement_fn)?),
Box::new(clone_with_replacement(b, replacement_fn)?),
))
})
.collect::<Result<Vec<(_, _)>>>()?,
match &case.else_expr {
Some(else_expr) => {
Some(Box::new(clone_with_replacement(else_expr, replacement_fn)?))
}
None => None,
},
))),
Expr::ScalarFunction(ScalarFunction { fun, args }) => {
Ok(Expr::ScalarFunction(ScalarFunction {
fun: fun.clone(),
args: args
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
}))
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => Ok(Expr::ScalarUDF(ScalarUDF {
fun: fun.clone(),
args: args
.iter()
.map(|arg| clone_with_replacement(arg, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
})),
Expr::Negative(nested_expr) => Ok(Expr::Negative(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::Not(nested_expr) => Ok(Expr::Not(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsNotNull(nested_expr) => Ok(Expr::IsNotNull(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsNull(nested_expr) => Ok(Expr::IsNull(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsTrue(nested_expr) => Ok(Expr::IsTrue(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsFalse(nested_expr) => Ok(Expr::IsFalse(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsUnknown(nested_expr) => Ok(Expr::IsUnknown(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsNotTrue(nested_expr) => Ok(Expr::IsNotTrue(Box::new(clone_with_replacement(
nested_expr,
replacement_fn,
)?))),
Expr::IsNotFalse(nested_expr) => Ok(Expr::IsNotFalse(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::IsNotUnknown(nested_expr) => Ok(Expr::IsNotUnknown(Box::new(
clone_with_replacement(nested_expr, replacement_fn)?,
))),
Expr::Cast(Cast { expr, data_type }) => Ok(Expr::Cast(Cast::new(
Box::new(clone_with_replacement(expr, replacement_fn)?),
data_type.clone(),
))),
Expr::TryCast(TryCast {
expr: nested_expr,
data_type,
}) => Ok(Expr::TryCast(TryCast::new(
Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
data_type.clone(),
))),
Expr::Sort(Sort {
expr: nested_expr,
asc,
nulls_first,
}) => Ok(Expr::Sort(Sort::new(
Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
*asc,
*nulls_first,
))),
Expr::Column { .. }
| Expr::OuterReferenceColumn(_, _)
| Expr::Literal(_)
| Expr::ScalarVariable(_, _)
| Expr::Exists { .. }
| Expr::ScalarSubquery(_) => Ok(expr.clone()),
Expr::InSubquery(InSubquery {
expr: nested_expr,
subquery,
negated,
}) => Ok(Expr::InSubquery(InSubquery {
expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?),
subquery: subquery.clone(),
negated: *negated,
})),
Expr::Wildcard => Ok(Expr::Wildcard),
Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
Expr::GetIndexedField(GetIndexedField { field, expr }) => {
let field = match field {
GetFieldAccess::NamedStructField { name } => {
GetFieldAccess::NamedStructField { name: name.clone() }
}
GetFieldAccess::ListIndex { key } => GetFieldAccess::ListIndex {
key: Box::new(clone_with_replacement(key.as_ref(), replacement_fn)?),
},
GetFieldAccess::ListRange { start, stop } => GetFieldAccess::ListRange {
start: Box::new(clone_with_replacement(start.as_ref(), replacement_fn)?),
stop: Box::new(clone_with_replacement(stop.as_ref(), replacement_fn)?),
},
};
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(clone_with_replacement(expr.as_ref(), replacement_fn)?),
field,
)))
}
Expr::GroupingSet(set) => match set {
GroupingSet::Rollup(exprs) => Ok(Expr::GroupingSet(GroupingSet::Rollup(
exprs
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
))),
GroupingSet::Cube(exprs) => Ok(Expr::GroupingSet(GroupingSet::Cube(
exprs
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
))),
GroupingSet::GroupingSets(lists_of_exprs) => {
let mut new_lists_of_exprs = vec![];
for exprs in lists_of_exprs {
new_lists_of_exprs.push(
exprs
.iter()
.map(|e| clone_with_replacement(e, replacement_fn))
.collect::<Result<Vec<Expr>>>()?,
);
}
Ok(Expr::GroupingSet(GroupingSet::GroupingSets(
new_lists_of_exprs,
)))
}
},
Expr::Placeholder(Placeholder { id, data_type }) => {
Ok(Expr::Placeholder(Placeholder {
id: id.clone(),
data_type: data_type.clone(),
}))
}
},
}
}

View File

@@ -22,11 +22,8 @@ use data_types::ChunkId;
use datafusion::{
common::DFSchemaRef,
error::DataFusionError,
logical_expr::{
utils::exprlist_to_columns, ExprSchemable, GetIndexedField, LogicalPlan, LogicalPlanBuilder,
},
logical_expr::{utils::exprlist_to_columns, ExprSchemable, LogicalPlan, LogicalPlanBuilder},
prelude::{when, Column, Expr},
scalar::ScalarValue,
};
use datafusion_util::AsExpr;
use futures::{Stream, StreamExt, TryStreamExt};
@@ -1624,22 +1621,10 @@ impl AggExprs {
let selector = make_selector_expr(agg, field.clone())?;
let field_name = field.name;
agg_exprs.push(
Expr::GetIndexedField(GetIndexedField {
expr: Box::new(selector.clone()),
key: ScalarValue::from("value"),
})
.alias(field_name),
);
agg_exprs.push(selector.clone().field("value").alias(field_name));
let time_column_name = format!("{TIME_COLUMN_NAME}_{field_name}");
agg_exprs.push(
Expr::GetIndexedField(GetIndexedField {
expr: Box::new(selector.clone()),
key: ScalarValue::from("time"),
})
.alias(&time_column_name),
);
agg_exprs.push(selector.field("time").alias(&time_column_name));
field_list.push((
Arc::from(field_name), // value name

View File

@@ -28,9 +28,9 @@ bytes = { version = "1" }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
crossbeam-utils = { version = "0.8" }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "5faa10b2911ecca4c2199f78ae675363c7d8230e" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "5faa10b2911ecca4c2199f78ae675363c7d8230e", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "5faa10b2911ecca4c2199f78ae675363c7d8230e", default-features = false, features = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "99e2cd4b4082296b0e7f98b0fb122861c4f74a11" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "99e2cd4b4082296b0e7f98b0fb122861c4f74a11", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "99e2cd4b4082296b0e7f98b0fb122861c4f74a11", default-features = false, features = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions"] }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1", features = ["serde"] }
fixedbitset = { version = "0.4" }