chore: Update datafusion again (#7440)

* chore: Update DataFusion

* chore: Update for new API

* chore: Run cargo hakari tasks

* fix: cargo doc

---------

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
pull/24376/head
Andrew Lamb 2023-04-04 17:45:46 +02:00 committed by GitHub
parent 4a9317dfdb
commit badc8865ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 99 additions and 33 deletions

37
Cargo.lock generated
View File

@ -1499,8 +1499,8 @@ dependencies = [
[[package]]
name = "datafusion"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feddb3c94d2b9c0d4cfa074b63d577ee660916ca#feddb3c94d2b9c0d4cfa074b63d577ee660916ca"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=2191a69b2d48b0b7230c9da5a75f86a4e659361b#2191a69b2d48b0b7230c9da5a75f86a4e659361b"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1546,8 +1546,8 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feddb3c94d2b9c0d4cfa074b63d577ee660916ca#feddb3c94d2b9c0d4cfa074b63d577ee660916ca"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=2191a69b2d48b0b7230c9da5a75f86a4e659361b#2191a69b2d48b0b7230c9da5a75f86a4e659361b"
dependencies = [
"arrow",
"arrow-array",
@ -1560,8 +1560,8 @@ dependencies = [
[[package]]
name = "datafusion-execution"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feddb3c94d2b9c0d4cfa074b63d577ee660916ca#feddb3c94d2b9c0d4cfa074b63d577ee660916ca"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=2191a69b2d48b0b7230c9da5a75f86a4e659361b#2191a69b2d48b0b7230c9da5a75f86a4e659361b"
dependencies = [
"dashmap",
"datafusion-common",
@ -1577,8 +1577,8 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feddb3c94d2b9c0d4cfa074b63d577ee660916ca#feddb3c94d2b9c0d4cfa074b63d577ee660916ca"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=2191a69b2d48b0b7230c9da5a75f86a4e659361b#2191a69b2d48b0b7230c9da5a75f86a4e659361b"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1588,8 +1588,8 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feddb3c94d2b9c0d4cfa074b63d577ee660916ca#feddb3c94d2b9c0d4cfa074b63d577ee660916ca"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=2191a69b2d48b0b7230c9da5a75f86a4e659361b#2191a69b2d48b0b7230c9da5a75f86a4e659361b"
dependencies = [
"arrow",
"async-trait",
@ -1605,11 +1605,12 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feddb3c94d2b9c0d4cfa074b63d577ee660916ca#feddb3c94d2b9c0d4cfa074b63d577ee660916ca"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=2191a69b2d48b0b7230c9da5a75f86a4e659361b#2191a69b2d48b0b7230c9da5a75f86a4e659361b"
dependencies = [
"ahash 0.8.3",
"arrow",
"arrow-array",
"arrow-buffer",
"arrow-schema",
"blake2",
@ -1635,8 +1636,8 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feddb3c94d2b9c0d4cfa074b63d577ee660916ca#feddb3c94d2b9c0d4cfa074b63d577ee660916ca"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=2191a69b2d48b0b7230c9da5a75f86a4e659361b#2191a69b2d48b0b7230c9da5a75f86a4e659361b"
dependencies = [
"arrow",
"chrono",
@ -1649,8 +1650,8 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feddb3c94d2b9c0d4cfa074b63d577ee660916ca#feddb3c94d2b9c0d4cfa074b63d577ee660916ca"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=2191a69b2d48b0b7230c9da5a75f86a4e659361b#2191a69b2d48b0b7230c9da5a75f86a4e659361b"
dependencies = [
"arrow",
"datafusion-common",
@ -1660,8 +1661,8 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "21.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feddb3c94d2b9c0d4cfa074b63d577ee660916ca#feddb3c94d2b9c0d4cfa074b63d577ee660916ca"
version = "21.1.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=2191a69b2d48b0b7230c9da5a75f86a4e659361b#2191a69b2d48b0b7230c9da5a75f86a4e659361b"
dependencies = [
"arrow-schema",
"datafusion-common",

View File

@ -121,8 +121,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "36.0.0" }
arrow-flight = { version = "36.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="feddb3c94d2b9c0d4cfa074b63d577ee660916ca", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="feddb3c94d2b9c0d4cfa074b63d577ee660916ca" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="2191a69b2d48b0b7230c9da5a75f86a4e659361b", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="2191a69b2d48b0b7230c9da5a75f86a4e659361b" }
hashbrown = { version = "0.13.2" }
parquet = { version = "36.0.0" }

View File

@ -12,6 +12,7 @@
pub mod config;
pub mod sender;
pub mod sort_exprs;
pub mod watch;
use std::sync::Arc;

View File

@ -0,0 +1,52 @@
use datafusion::{
arrow::compute::SortOptions,
physical_expr::{PhysicalSortExpr, PhysicalSortRequirement},
};
/// Builds a [`PhysicalSortRequirement`] for every sort expression in
/// `exprs`, preserving order.
///
/// Replace with `PhysicalSortExpr::from_sort_exprs` when
/// <https://github.com/apache/arrow-datafusion/pull/5863> is merged
/// upstream.
pub fn requirements_from_sort_exprs<'a>(
    exprs: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> Vec<PhysicalSortRequirement> {
    let mut requirements = Vec::new();
    for sort_expr in exprs {
        requirements.push(PhysicalSortRequirement::from(sort_expr.clone()));
    }
    requirements
}
/// Converts a `PhysicalSortRequirement` into a `PhysicalSortExpr`.
///
/// When the requirement carries no explicit ordering options, the
/// default ordering `ASC, NULLS LAST` is used.
///
/// The default is picked to be consistent with
/// PostgreSQL: <https://www.postgresql.org/docs/current/queries-order.html>
///
/// Replace with `PhysicalSortExpr::from` when
/// <https://github.com/apache/arrow-datafusion.git/pull/5863> is merged
/// upstream.
pub fn into_sort_expr(requirement: PhysicalSortRequirement) -> PhysicalSortExpr {
    // ASC, NULLS LAST — matches PostgreSQL's default sort order.
    let default_options = SortOptions {
        descending: false,
        nulls_first: false,
    };
    PhysicalSortExpr {
        options: requirement.options.unwrap_or(default_options),
        expr: requirement.expr,
    }
}
/// Converts each `PhysicalSortRequirement` in the input to a
/// `PhysicalSortExpr`. If the required ordering is `None` for an entry,
/// the default ordering `ASC, NULLS LAST` is used (see [`into_sort_expr`]).
///
/// Replace with `PhysicalSortExpr::to_sort_exprs` when
/// <https://github.com/apache/arrow-datafusion/pull/5863> is merged
/// upstream.
pub fn requirements_to_sort_exprs(
    required: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> Vec<PhysicalSortExpr> {
    required.into_iter().map(into_sort_expr).collect()
}

View File

@ -19,7 +19,10 @@ use datafusion::{
error::{DataFusionError, Result},
execution::{context::TaskContext, memory_pool::MemoryConsumer},
logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore},
physical_expr::{create_physical_expr, execution_props::ExecutionProps, PhysicalSortExpr},
physical_expr::{
create_physical_expr, execution_props::ExecutionProps, PhysicalSortExpr,
PhysicalSortRequirement,
},
physical_plan::{
expressions::Column,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet},
@ -28,6 +31,7 @@ use datafusion::{
},
prelude::Expr,
};
use datafusion_util::sort_exprs::requirements_from_sort_exprs;
use self::stream::GapFillStream;
@ -469,8 +473,8 @@ impl ExecutionPlan for GapFillExec {
self.input.output_ordering()
}
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![Some(&self.sort_expr)]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![Some(requirements_from_sort_exprs(&self.sort_expr))]
}
fn maintains_input_order(&self) -> Vec<bool> {

View File

@ -12,6 +12,7 @@ use datafusion::{
ExecutionPlan,
},
};
use datafusion_util::sort_exprs::requirements_to_sort_exprs;
use observability_deps::tracing::warn;
use crate::config::IoxConfigExt;
@ -95,7 +96,7 @@ impl PhysicalOptimizerRule for ParquetSortness {
})?;
// did this help?
if transformed_child.output_ordering() == Some(input_ordering) {
if transformed_child.output_ordering() == Some(&input_ordering) {
child = transformed_child;
transformed_any = true;
}
@ -121,13 +122,16 @@ impl PhysicalOptimizerRule for ParquetSortness {
}
}
type ChildWithSorting<'a> = (Arc<dyn ExecutionPlan>, &'a [PhysicalSortExpr]);
type ChildWithSorting<'a> = (Arc<dyn ExecutionPlan>, Vec<PhysicalSortExpr>);
fn detect_children_with_desired_ordering(
plan: &dyn ExecutionPlan,
) -> Option<Vec<ChildWithSorting<'_>>> {
if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
return Some(vec![(Arc::clone(sort_exec.input()), sort_exec.expr())]);
return Some(vec![(
Arc::clone(sort_exec.input()),
sort_exec.expr().to_vec(),
)]);
}
let required_input_ordering = plan.required_input_ordering();
@ -152,7 +156,8 @@ fn detect_children_with_desired_ordering(
.zip(
required_input_ordering
.into_iter()
.map(|expr| expr.expect("just checked")),
.map(|requirement| requirement.expect("just checked"))
.map(requirements_to_sort_exprs),
)
.collect(),
)

View File

@ -5,7 +5,9 @@ mod key_ranges;
use std::{collections::HashSet, fmt, sync::Arc};
use arrow::{error::ArrowError, record_batch::RecordBatch};
use datafusion_util::{watch::WatchedTask, AdapterStream};
use datafusion_util::{
sort_exprs::requirements_from_sort_exprs, watch::WatchedTask, AdapterStream,
};
use crate::CHUNK_ORDER_COLUMN_NAME;
@ -14,6 +16,7 @@ pub use self::algo::RecordBatchDeduplicator;
use datafusion::{
error::{DataFusionError, Result},
execution::context::TaskContext,
physical_expr::PhysicalSortRequirement,
physical_plan::{
expressions::{Column, PhysicalSortExpr},
metrics::{
@ -190,8 +193,8 @@ impl ExecutionPlan for DeduplicateExec {
Some(&self.sort_keys)
}
fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
vec![Some(&self.input_order)]
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![Some(requirements_from_sort_exprs(&self.input_order))]
}
fn maintains_input_order(&self) -> Vec<bool> {

View File

@ -30,9 +30,9 @@ bytes = { version = "1" }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
crossbeam-utils = { version = "0.8" }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "feddb3c94d2b9c0d4cfa074b63d577ee660916ca" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "feddb3c94d2b9c0d4cfa074b63d577ee660916ca", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "feddb3c94d2b9c0d4cfa074b63d577ee660916ca", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2191a69b2d48b0b7230c9da5a75f86a4e659361b" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2191a69b2d48b0b7230c9da5a75f86a4e659361b", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "2191a69b2d48b0b7230c9da5a75f86a4e659361b", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1" }
fixedbitset = { version = "0.4" }