From b24f9c81ba680ce828d72b14d17486646a0ff4d5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 11 Jul 2023 09:36:38 -0400 Subject: [PATCH] chore: Update DataFusion pin, updates for API changed (#8199) --- Cargo.lock | 104 +++++++++++------- Cargo.toml | 30 +---- compactor/src/components/df_planner/panic.rs | 14 ++- iox_query/src/exec/gapfill/mod.rs | 12 +- iox_query/src/exec/non_null_checker.rs | 22 ++-- iox_query/src/exec/query_tracing.rs | 4 +- iox_query/src/exec/schema_pivot.rs | 20 ++-- iox_query/src/exec/split.rs | 22 ++-- .../handle_gapfill/range_predicate.rs | 4 +- .../physical_optimizer/projection_pushdown.rs | 12 +- iox_query/src/provider/deduplicate.rs | 30 +++-- iox_query/src/provider/record_batch_exec.rs | 20 ++-- iox_query_influxql/src/frontend/planner.rs | 16 ++- iox_query_influxql/src/plan/planner.rs | 13 ++- iox_query_influxql/src/plan/util_copy.rs | 13 ++- predicate/src/lib.rs | 2 +- querier/src/system_tables/mod.rs | 16 ++- workspace-hack/Cargo.toml | 23 ++-- 18 files changed, 220 insertions(+), 157 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 105d143d90..660dd90e99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,8 +161,9 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arrow" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2feeebd77b34b0bc88f224e06d01c27da4733997cc4789a4e056196656cdc59a" dependencies = [ "ahash 0.8.3", "arrow-arith", @@ -182,8 +183,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7173f5dc49c0ecb5135f52565af33afd3fdc9a12d13bd6f9973e8b96305e4b2e" dependencies = [ "arrow-array", "arrow-buffer", @@ -196,8 +198,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63d7ea725f7d1f8bb2cffc53ef538557e95fc802e217d5be25122d402e22f3d0" dependencies = [ "ahash 0.8.3", "arrow-buffer", @@ -212,8 +215,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdbe439e077f484e5000b9e1d47b5e4c0d15f2b311a8f5bcc682553d5d67a722" dependencies = [ "half 2.3.1", "num", @@ -221,8 +225,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93913cc14875770aa1eef5e310765e855effa352c094cb1c7c00607d0f37b4e1" dependencies = [ "arrow-array", "arrow-buffer", @@ -238,8 +243,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef55b67c55ed877e6fe7b923121c19dae5e31ca70249ea2779a17b58fb0fbd9a" dependencies = [ "arrow-array", "arrow-buffer", @@ -256,8 +262,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4f4f4a3c54614126a71ab91f6631c9743eb4643d6e9318b74191da9dc6e028b" dependencies = [ "arrow-buffer", "arrow-schema", @@ -267,8 +274,9 @@ dependencies = [ [[package]] name = "arrow-flight" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1128a9f099b4e8dc9a67aed274061f3cc95afd8b7aab98f2b44cb8b7b542b71" dependencies = [ "arrow-arith", "arrow-array", @@ -293,8 +301,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d41a3659f984a524ef1c2981d43747b24d8eec78e2425267fcd0ef34ce71cd18" dependencies = [ "arrow-array", "arrow-buffer", @@ -306,8 +315,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10b95faa95a378f56ef32d84cc0104ea998c39ef7cd1faaa6b4cebf8ea92846d" dependencies = [ "arrow-array", "arrow-buffer", @@ -316,7 +326,7 @@ dependencies = [ "arrow-schema", "chrono", "half 2.3.1", - "indexmap 1.9.3", + "indexmap 2.0.0", "lexical-core", "num", "serde", @@ -325,8 +335,9 @@ dependencies = [ [[package]] name = "arrow-ord" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c68549a4284d9f8b39586afb8d5ff8158b8f0286353a4844deb1d11cf1ba1f26" dependencies = [ "arrow-array", "arrow-buffer", @@ -339,8 +350,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a75a4a757afc301ce010adadff54d79d66140c4282ed3de565f6ccb716a5cf3" dependencies = [ "ahash 0.8.3", "arrow-array", @@ -353,13 +365,15 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bebcb57eef570b15afbcf2d07d813eb476fde9f6dd69c81004d6476c197e87e" [[package]] name = "arrow-select" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6e2943fa433a48921e914417173816af64eef61c0a3d448280e6c40a62df221" dependencies = [ "arrow-array", "arrow-buffer", @@ -370,14 +384,16 @@ dependencies = [ [[package]] name = "arrow-string" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbc92ed638851774f6d7af1ad900b92bc1486746497511868b4298fcbcfa35af" dependencies = [ "arrow-array", "arrow-buffer", "arrow-data", "arrow-schema", "arrow-select", + "num", "regex", "regex-syntax 0.7.4", ] @@ -1344,7 +1360,7 @@ dependencies = [ [[package]] name = "datafusion" version = "27.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=311e8c7c7ab8c087f072eeac9335394c09ae8185#311e8c7c7ab8c087f072eeac9335394c09ae8185" dependencies = [ "ahash 0.8.3", "arrow", @@ -1392,7 +1408,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "27.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=311e8c7c7ab8c087f072eeac9335394c09ae8185#311e8c7c7ab8c087f072eeac9335394c09ae8185" dependencies = [ "arrow", "arrow-array", @@ -1406,7 +1422,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "27.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=311e8c7c7ab8c087f072eeac9335394c09ae8185#311e8c7c7ab8c087f072eeac9335394c09ae8185" dependencies = [ "dashmap", "datafusion-common", @@ -1423,7 +1439,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "27.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=311e8c7c7ab8c087f072eeac9335394c09ae8185#311e8c7c7ab8c087f072eeac9335394c09ae8185" dependencies = [ "ahash 0.8.3", "arrow", @@ -1437,7 +1453,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "27.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=311e8c7c7ab8c087f072eeac9335394c09ae8185#311e8c7c7ab8c087f072eeac9335394c09ae8185" dependencies = [ "arrow", "async-trait", @@ -1454,13 +1470,14 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "27.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=311e8c7c7ab8c087f072eeac9335394c09ae8185#311e8c7c7ab8c087f072eeac9335394c09ae8185" dependencies = [ "ahash 0.8.3", "arrow", "arrow-array", "arrow-buffer", "arrow-schema", + "base64 0.21.2", "blake2", "blake3", "chrono", @@ -1469,6 +1486,7 @@ dependencies = [ "datafusion-row", "half 2.3.1", "hashbrown 0.14.0", + "hex", "indexmap 2.0.0", "itertools 0.11.0", "lazy_static", @@ -1486,7 +1504,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "27.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=311e8c7c7ab8c087f072eeac9335394c09ae8185#311e8c7c7ab8c087f072eeac9335394c09ae8185" dependencies = [ "arrow", "chrono", @@ -1500,7 +1518,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "27.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=311e8c7c7ab8c087f072eeac9335394c09ae8185#311e8c7c7ab8c087f072eeac9335394c09ae8185" dependencies = [ "arrow", "datafusion-common", @@ -1511,7 +1529,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "27.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=311e8c7c7ab8c087f072eeac9335394c09ae8185#311e8c7c7ab8c087f072eeac9335394c09ae8185" dependencies = [ "arrow", "arrow-schema", @@ -3849,8 +3867,9 @@ dependencies = [ [[package]] name = "parquet" -version = "42.0.0" -source = "git+https://github.com/alamb/arrow-rs.git?branch=alamb/42.0.0_patched#20f6bd7ed730d937abe76ab859088094dee8a5d3" +version = "43.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec7267a9607c3f955d4d0ac41b88a67cecc0d8d009173ad3da390699a6cb3750" dependencies = [ "ahash 0.8.3", "arrow-array", @@ -6686,6 +6705,7 @@ dependencies = [ "hashbrown 0.14.0", "heck", "indexmap 1.9.3", + "indexmap 2.0.0", "itertools 0.10.5", "libc", "lock_api", diff --git a/Cargo.toml b/Cargo.toml index 1c6a8e3c82..b84d776fd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -118,13 +118,14 @@ edition = "2021" license = "MIT OR Apache-2.0" [workspace.dependencies] -arrow = { version = "42.0.0" } -arrow-flight = { version = "42.0.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14", default-features = false } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14" } +arrow = { version = "43.0.0" } +arrow-flight = { version = "43.0.0" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "311e8c7c7ab8c087f072eeac9335394c09ae8185", default-features = false } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "311e8c7c7ab8c087f072eeac9335394c09ae8185" } + hashbrown = { version = "0.14.0" } object_store = { version = "0.6.0" } -parquet = { version = "42.0.0" } +parquet = { version = "43.0.0" } tonic = { version = "0.9.2", features = ["tls", "tls-webpki-roots"] } tonic-build = { version = "0.9.2" } tonic-health = { version = "0.9.2" } @@ -154,22 +155,3 @@ opt-level = 3 [profile.dev.package.similar] opt-level = 3 - -[patch.crates-io] -# TODO remove on upgrade to 43.0.0 -# Use https://github.com/apache/arrow-rs/pull/4467 to get the fix for -# https://github.com/apache/arrow-rs/issues/4459 -parquet = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-buffer = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-schema = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-data = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-array = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-select = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-cast = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-ipc = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-row = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-arith = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-ord = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-string = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } -arrow-flight = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched" } diff --git a/compactor/src/components/df_planner/panic.rs b/compactor/src/components/df_planner/panic.rs index 84fcdd6552..79956fdd0c 100644 --- a/compactor/src/components/df_planner/panic.rs +++ b/compactor/src/components/df_planner/panic.rs @@ -7,8 +7,8 @@ use datafusion::{ execution::context::TaskContext, physical_expr::PhysicalSortExpr, physical_plan::{ - stream::RecordBatchStreamAdapter, ExecutionPlan, Partitioning, SendableRecordBatchStream, - Statistics, + stream::RecordBatchStreamAdapter, DisplayAs, DisplayFormatType, ExecutionPlan, + Partitioning, SendableRecordBatchStream, Statistics, }, }; use schema::SchemaBuilder; @@ -93,6 +93,16 @@ impl ExecutionPlan for PanicPlan { } } +impl DisplayAs for PanicPlan { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "PanicPlan") + } + } + } +} + #[cfg(test)] mod tests { use data_types::CompactionLevel; diff --git a/iox_query/src/exec/gapfill/mod.rs b/iox_query/src/exec/gapfill/mod.rs index 95f806c74b..90b20254be 100644 --- a/iox_query/src/exec/gapfill/mod.rs +++ b/iox_query/src/exec/gapfill/mod.rs @@ -27,7 +27,7 @@ use datafusion::{ physical_plan::{ expressions::Column, metrics::{BaselineMetrics, ExecutionPlanMetricsSet}, - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr, SendableRecordBatchStream, Statistics, }, prelude::Expr, @@ -534,6 +534,12 @@ impl ExecutionPlan for GapFillExec { )?)) } + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +impl DisplayAs for GapFillExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { @@ -566,10 +572,6 @@ impl ExecutionPlan for GapFillExec { } } } - - fn statistics(&self) -> Statistics { - Statistics::default() - } } #[cfg(test)] diff --git a/iox_query/src/exec/non_null_checker.rs b/iox_query/src/exec/non_null_checker.rs index d5de5f34d3..1de84cc656 100644 --- a/iox_query/src/exec/non_null_checker.rs +++ b/iox_query/src/exec/non_null_checker.rs @@ -54,8 +54,8 @@ use datafusion::{ physical_plan::{ expressions::PhysicalSortExpr, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, - Statistics, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + SendableRecordBatchStream, Statistics, }, }; @@ -272,14 +272,6 @@ impl ExecutionPlan for NonNullCheckerExec { Ok(AdapterStream::adapt(self.schema(), rx, handle)) } - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "NonNullCheckerExec") - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } @@ -290,6 +282,16 @@ impl ExecutionPlan for NonNullCheckerExec { } } +impl DisplayAs for NonNullCheckerExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "NonNullCheckerExec") + } + } + } +} + async fn check_for_nulls( mut input_stream: SendableRecordBatchStream, schema: SchemaRef, diff --git a/iox_query/src/exec/query_tracing.rs b/iox_query/src/exec/query_tracing.rs index e5950dce49..b3d37b23b6 100644 --- a/iox_query/src/exec/query_tracing.rs +++ b/iox_query/src/exec/query_tracing.rs @@ -345,7 +345,7 @@ mod tests { physical_plan::{ expressions::PhysicalSortExpr, metrics::{Count, Time, Timestamp}, - Metric, + DisplayAs, Metric, }, }; use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration}; @@ -679,7 +679,9 @@ mod tests { fn metrics(&self) -> Option { self.metrics.clone() } + } + impl DisplayAs for TestExec { fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "TestExec - {}", self.name) } diff --git a/iox_query/src/exec/schema_pivot.rs b/iox_query/src/exec/schema_pivot.rs index 9670044849..b6192f61f1 100644 --- a/iox_query/src/exec/schema_pivot.rs +++ b/iox_query/src/exec/schema_pivot.rs @@ -30,7 +30,6 @@ use arrow::{ error::ArrowError, record_batch::RecordBatch, }; -use datafusion::error::DataFusionError; use datafusion::{ common::{DFSchemaRef, ToDFSchema}, error::{DataFusionError as Error, Result}, @@ -43,6 +42,7 @@ use datafusion::{ Statistics, }, }; +use datafusion::{error::DataFusionError, physical_plan::DisplayAs}; use datafusion_util::{watch::WatchedTask, AdapterStream}; use observability_deps::tracing::debug; @@ -247,14 +247,6 @@ impl ExecutionPlan for SchemaPivotExec { Ok(AdapterStream::adapt(self.schema(), rx, handle)) } - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "SchemaPivotExec") - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } @@ -265,6 +257,16 @@ impl ExecutionPlan for SchemaPivotExec { } } +impl DisplayAs for SchemaPivotExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "SchemaPivotExec") + } + } + } +} + // Algorithm: for each column we haven't seen a value for yet, // check each input row; // diff --git a/iox_query/src/exec/split.rs b/iox_query/src/exec/split.rs index 23ddbd3498..c995dbe837 100644 --- a/iox_query/src/exec/split.rs +++ b/iox_query/src/exec/split.rs @@ -67,8 +67,8 @@ use datafusion::{ physical_plan::{ expressions::PhysicalSortExpr, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput}, - ColumnarValue, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, PhysicalExpr, - SendableRecordBatchStream, Statistics, + ColumnarValue, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + PhysicalExpr, SendableRecordBatchStream, Statistics, }, scalar::ScalarValue, }; @@ -267,14 +267,6 @@ impl ExecutionPlan for StreamSplitExec { } } - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "StreamSplitExec") - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } @@ -286,6 +278,16 @@ impl ExecutionPlan for StreamSplitExec { } } +impl DisplayAs for StreamSplitExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "StreamSplitExec") + } + } + } +} + impl StreamSplitExec { /// if in State::New, sets up the output running and sets self.state --> `Running` fn start_if_needed(&self, context: Arc) -> Result<()> { diff --git a/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs b/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs index c89b7857cb..9965b86f0d 100644 --- a/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs +++ b/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs @@ -7,7 +7,7 @@ use datafusion::{ DFSchema, }, error::Result, - logical_expr::{Between, BinaryExpr, LogicalPlan, Operator}, + logical_expr::{expr::Alias, Between, BinaryExpr, LogicalPlan, Operator}, optimizer::utils::split_conjunction, prelude::{Column, Expr}, }; @@ -79,7 +79,7 @@ impl TreeNodeVisitor for TimeRangeVisitor { fn unwrap_alias(mut e: &Expr) -> &Expr { loop { match e { - Expr::Alias(inner, _) => e = inner.as_ref(), + Expr::Alias(Alias { expr, .. }) => e = expr.as_ref(), e => break e, } } diff --git a/iox_query/src/physical_optimizer/projection_pushdown.rs b/iox_query/src/physical_optimizer/projection_pushdown.rs index 2e8a97fb5f..d3c2c8879e 100644 --- a/iox_query/src/physical_optimizer/projection_pushdown.rs +++ b/iox_query/src/physical_optimizer/projection_pushdown.rs @@ -434,7 +434,7 @@ mod tests { logical_expr::Operator, physical_plan::{ expressions::{BinaryExpr, Literal}, - PhysicalExpr, Statistics, + DisplayAs, PhysicalExpr, Statistics, }, scalar::ScalarValue, }; @@ -1695,6 +1695,12 @@ mod tests { unimplemented!() } + fn statistics(&self) -> datafusion::physical_plan::Statistics { + unimplemented!() + } + } + + impl DisplayAs for TestExec { fn fmt_as( &self, _t: datafusion::physical_plan::DisplayFormatType, @@ -1702,9 +1708,5 @@ mod tests { ) -> std::fmt::Result { write!(f, "Test") } - - fn statistics(&self) -> datafusion::physical_plan::Statistics { - unimplemented!() - } } } diff --git a/iox_query/src/provider/deduplicate.rs b/iox_query/src/provider/deduplicate.rs index 02427b1257..e221c3fd87 100644 --- a/iox_query/src/provider/deduplicate.rs +++ b/iox_query/src/provider/deduplicate.rs @@ -20,8 +20,8 @@ use datafusion::{ metrics::{ self, BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, RecordOutput, }, - DisplayFormatType, Distribution, ExecutionPlan, Partitioning, SendableRecordBatchStream, - Statistics, + DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, Partitioning, + SendableRecordBatchStream, Statistics, }, }; use futures::StreamExt; @@ -267,15 +267,6 @@ impl ExecutionPlan for DeduplicateExec { vec![Distribution::SinglePartition] } - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - let expr: Vec = self.sort_keys.iter().map(|e| e.to_string()).collect(); - write!(f, "DeduplicateExec: [{}]", expr.join(",")) - } - } - } - fn metrics(&self) -> Option { Some(self.metrics.clone_inner()) } @@ -289,6 +280,17 @@ impl ExecutionPlan for DeduplicateExec { } } +impl DisplayAs for DeduplicateExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + let expr: Vec = self.sort_keys.iter().map(|e| e.to_string()).collect(); + write!(f, "DeduplicateExec: [{}]", expr.join(",")) + } + } + } +} + async fn deduplicate( mut input_stream: SendableRecordBatchStream, sort_keys: Vec, @@ -1222,4 +1224,10 @@ mod test { Statistics::default() } } + + impl DisplayAs for DummyExec { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "DummyExec") + } + } } diff --git a/iox_query/src/provider/record_batch_exec.rs b/iox_query/src/provider/record_batch_exec.rs index ded30d8338..3959dca206 100644 --- a/iox_query/src/provider/record_batch_exec.rs +++ b/iox_query/src/provider/record_batch_exec.rs @@ -14,7 +14,7 @@ use datafusion::{ expressions::{Column, PhysicalSortExpr}, memory::MemoryStream, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, - ColumnStatistics, DisplayFormatType, ExecutionPlan, Partitioning, + ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }, scalar::ScalarValue, @@ -234,6 +234,16 @@ impl ExecutionPlan for RecordBatchesExec { Ok(adapter) } + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Statistics { + self.statistics.clone() + } +} + +impl DisplayAs for RecordBatchesExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { let total_groups = self.chunks.len(); @@ -258,12 +268,4 @@ impl ExecutionPlan for RecordBatchesExec { } } } - - fn metrics(&self) -> Option { - Some(self.metrics.clone_inner()) - } - - fn statistics(&self) -> Statistics { - self.statistics.clone() - } } diff --git a/iox_query_influxql/src/frontend/planner.rs b/iox_query_influxql/src/frontend/planner.rs index 998dfbd881..c0de1cdc31 100644 --- a/iox_query_influxql/src/frontend/planner.rs +++ b/iox_query_influxql/src/frontend/planner.rs @@ -17,7 +17,9 @@ use datafusion::datasource::provider_as_source; use datafusion::execution::context::{SessionState, TaskContext}; use datafusion::logical_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource}; use datafusion::physical_expr::PhysicalSortExpr; -use datafusion::physical_plan::{Partitioning, SendableRecordBatchStream}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, Partitioning, SendableRecordBatchStream, +}; use datafusion::{ error::{DataFusionError, Result}, physical_plan::ExecutionPlan, @@ -78,7 +80,7 @@ struct SchemaExec { impl Debug for SchemaExec { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "SchemaExec") + self.fmt_as(DisplayFormatType::Default, f) } } @@ -123,6 +125,16 @@ impl ExecutionPlan for SchemaExec { } } +impl DisplayAs for SchemaExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "SchemaExec") + } + } + } +} + /// Create plans for running InfluxQL queries against databases #[derive(Debug, Default)] pub struct InfluxQLQueryPlanner {} diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index d7ed9e9920..7201fa52a7 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -29,7 +29,7 @@ use datafusion::catalog::TableReference; use datafusion::common::tree_node::{TreeNode, VisitRecursion}; use datafusion::common::{DFSchema, DFSchemaRef, Result, ScalarValue, ToDFSchema}; use datafusion::datasource::{provider_as_source, MemTable}; -use datafusion::logical_expr::expr::ScalarFunction; +use datafusion::logical_expr::expr::{Alias, ScalarFunction}; use datafusion::logical_expr::expr_rewriter::normalize_col; use datafusion::logical_expr::logical_plan::builder::project; use datafusion::logical_expr::logical_plan::Analyze; @@ -813,7 +813,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // Take ownership of the alias, so we don't reallocate, and temporarily place a literal // `NULL` in its place. - let Expr::Alias(_, alias) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else { + let Expr::Alias(Alias{name: alias, ..}) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else { return error::internal("time column is not an alias") }; @@ -1149,7 +1149,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { continue; } let (expr, out_name) = match expr.clone() { - Expr::Alias(expr, out_name) => (*expr, out_name), + Expr::Alias(Alias { + expr, + name: out_name, + }) => (*expr, out_name), _ => { return error::internal("other field is not aliased"); } @@ -1195,7 +1198,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let time_column = { // Take ownership of the alias, so we don't reallocate, and temporarily place a literal // `NULL` in its place. - let Expr::Alias(_, alias) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else { + let Expr::Alias(Alias{name: alias, ..}) = std::mem::replace(&mut select_exprs[time_column_index], lit(ScalarValue::Null)) else { return error::internal("time column is not an alias") }; @@ -2824,7 +2827,7 @@ fn build_gap_fill_node( fill_strategy: FillStrategy, ) -> Result { let (expr, alias) = match time_column { - Expr::Alias(expr, alias) => (expr.as_ref(), alias), + Expr::Alias(Alias { expr, name: alias }) => (expr.as_ref(), alias), _ => return error::internal("expected time column to have an alias function"), }; diff --git a/iox_query_influxql/src/plan/util_copy.rs b/iox_query_influxql/src/plan/util_copy.rs index 1ece258d5b..563b6e2a93 100644 --- a/iox_query_influxql/src/plan/util_copy.rs +++ b/iox_query_influxql/src/plan/util_copy.rs @@ -10,7 +10,7 @@ use datafusion::common::tree_node::{TreeNode, VisitRecursion}; use datafusion::common::Result; use datafusion::logical_expr::expr::{ - AggregateUDF, InList, InSubquery, Placeholder, ScalarFunction, ScalarUDF, + AggregateUDF, Alias, InList, InSubquery, Placeholder, ScalarFunction, ScalarUDF, }; use datafusion::logical_expr::{ expr::{ @@ -104,10 +104,13 @@ where filter: filter.clone(), order_by: order_by.clone(), })), - Expr::Alias(nested_expr, alias_name) => Ok(Expr::Alias( - Box::new(clone_with_replacement(nested_expr, replacement_fn)?), - alias_name.clone(), - )), + Expr::Alias(Alias { + expr: nested_expr, + name: alias_name, + }) => Ok(Expr::Alias(Alias { + expr: Box::new(clone_with_replacement(nested_expr, replacement_fn)?), + name: alias_name.clone(), + })), Expr::Between(Between { expr, negated, diff --git a/predicate/src/lib.rs b/predicate/src/lib.rs index f9ea842655..aa6c509574 100644 --- a/predicate/src/lib.rs +++ b/predicate/src/lib.rs @@ -488,7 +488,7 @@ impl TreeNodeVisitor for RowBasedVisitor { fn pre_visit(&mut self, expr: &Expr) -> Result { match expr { - Expr::Alias(_, _) + Expr::Alias(_) | Expr::Between { .. } | Expr::BinaryExpr { .. } | Expr::Case { .. } diff --git a/querier/src/system_tables/mod.rs b/querier/src/system_tables/mod.rs index c0d053c599..ee4d2a15a5 100644 --- a/querier/src/system_tables/mod.rs +++ b/querier/src/system_tables/mod.rs @@ -3,6 +3,7 @@ use arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::Re use async_trait::async_trait; use data_types::NamespaceId; use datafusion::error::DataFusionError; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datafusion::{ catalog::schema::SchemaProvider, datasource::TableProvider, @@ -140,9 +141,7 @@ struct SystemTableExecutionPlan { impl std::fmt::Debug for SystemTableExecutionPlan { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SystemTableExecutionPlan") - .field("projection", &self.projection) - .finish() + self.fmt_as(DisplayFormatType::Default, f) } } @@ -192,6 +191,17 @@ impl ExecutionPlan for SystemTableExecutionPlan } } +impl DisplayAs for SystemTableExecutionPlan { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => f + .debug_struct("SystemTableExecutionPlan") + .field("projection", &self.projection) + .finish(), + } + } +} + struct SystemTableStream { projected_schema: SchemaRef, projection: Option>, diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 90bdcbb109..72d6c3a077 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -17,11 +17,11 @@ license.workspace = true ### BEGIN HAKARI SECTION [dependencies] ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] } -arrow = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched", features = ["dyn_cmp_dict", "prettyprint"] } -arrow-array = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched", default-features = false, features = ["chrono-tz"] } -arrow-flight = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched", features = ["flight-sql-experimental"] } -arrow-ord = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched", default-features = false, features = ["dyn_cmp_dict"] } -arrow-string = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched", default-features = false, features = ["dyn_cmp_dict"] } +arrow = { version = "43", features = ["dyn_cmp_dict", "prettyprint"] } +arrow-array = { version = "43", default-features = false, features = ["chrono-tz"] } +arrow-flight = { version = "43", features = ["flight-sql-experimental"] } +arrow-ord = { version = "43", default-features = false, features = ["dyn_cmp_dict"] } +arrow-string = { version = "43", default-features = false, features = ["dyn_cmp_dict"] } base64-594e8ee84c453af0 = { package = "base64", version = "0.13" } base64-647d43efb71741da = { package = "base64", version = "0.21" } bitflags = { version = "1" } @@ -30,9 +30,9 @@ bytes = { version = "1" } chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] } crossbeam-utils = { version = "0.8" } crypto-common = { version = "0.1", default-features = false, features = ["std"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14" } -datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } -datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "311e8c7c7ab8c087f072eeac9335394c09ae8185" } +datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "311e8c7c7ab8c087f072eeac9335394c09ae8185", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "311e8c7c7ab8c087f072eeac9335394c09ae8185", default-features = false, features = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions"] } digest = { version = "0.10", features = ["mac", "std"] } either = { version = "1" } fixedbitset = { version = "0.4" } @@ -47,7 +47,8 @@ futures-task = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } getrandom = { version = "0.2", default-features = false, features = ["std"] } hashbrown = { version = "0.14", features = ["raw"] } -indexmap = { version = "1", default-features = false, features = ["std"] } +indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] } +indexmap-f595c2ba2a3f28df = { package = "indexmap", version = "2" } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] } lock_api = { version = "0.4", features = ["arc_lock"] } @@ -59,7 +60,7 @@ num-traits = { version = "0.2", features = ["i128", "libm"] } object_store = { version = "0.6", default-features = false, features = ["aws", "azure", "gcp"] } once_cell = { version = "1", features = ["parking_lot"] } parking_lot = { version = "0.12", features = ["arc_lock"] } -parquet = { git = "https://github.com/alamb/arrow-rs.git", branch = "alamb/42.0.0_patched", features = ["experimental", "object_store"] } +parquet = { version = "43", features = ["experimental", "object_store"] } petgraph = { version = "0.6" } phf_shared = { version = "0.11" } predicates = { version = "3" } @@ -122,7 +123,7 @@ futures-util = { version = "0.3", features = ["channel", "io", "sink"] } getrandom = { version = "0.2", default-features = false, features = ["std"] } hashbrown = { version = "0.14", features = ["raw"] } heck = { version = "0.4", features = ["unicode"] } -indexmap = { version = "1", default-features = false, features = ["std"] } +indexmap-dff4ba8e3ae991db = { package = "indexmap", version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] } lock_api = { version = "0.4", features = ["arc_lock"] }