From 62fed73bcd2274995aad52bff2f2043ea0b03576 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 17 May 2023 15:57:12 +0200 Subject: [PATCH] refactor: upgrade DataFusion to `19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8` (#7813) I need: - https://github.com/apache/arrow-datafusion/pull/6226. Changes in code due to: - https://github.com/apache/arrow-datafusion/pull/6332 Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 18 ++++++------- Cargo.toml | 4 +-- .../cases/in/duplicates_parquet.sql.expected | 22 ++++++++-------- iox_query_influxql/src/plan/planner.rs | 2 ++ iox_query_influxql/src/plan/util_copy.rs | 26 ++++++++++++------- workspace-hack/Cargo.toml | 6 ++--- 6 files changed, 43 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60172738b3..4c27f0c64d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1441,7 +1441,7 @@ dependencies = [ [[package]] name = "datafusion" version = "24.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2#e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8" dependencies = [ "ahash 0.8.3", "arrow", @@ -1490,7 +1490,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "24.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2#e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8" dependencies = [ "arrow", "arrow-array", @@ -1504,7 +1504,7 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "24.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2#e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8" dependencies = [ "dashmap", "datafusion-common", @@ -1521,7 +1521,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "24.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2#e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8" dependencies = [ "ahash 0.8.3", "arrow", @@ -1532,7 +1532,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "24.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2#e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8" dependencies = [ "arrow", "async-trait", @@ -1549,7 +1549,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "24.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2#e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8" dependencies = [ "ahash 0.8.3", "arrow", @@ -1581,7 +1581,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "24.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2#e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8" dependencies = [ "arrow", "chrono", @@ -1595,7 +1595,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "24.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2#e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8" dependencies = [ "arrow", "datafusion-common", @@ -1606,7 +1606,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "24.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2#e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=19b03240920ad63cac916b42951754c0337bdac8#19b03240920ad63cac916b42951754c0337bdac8" dependencies = [ "arrow", "arrow-schema", diff --git a/Cargo.toml b/Cargo.toml index 73454436df..832a1c56ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,8 +117,8 @@ license = "MIT OR Apache-2.0" [workspace.dependencies] arrow = { version = "39.0.0" } arrow-flight = { version = "39.0.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2", default-features = false } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="19b03240920ad63cac916b42951754c0337bdac8", default-features = false } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="19b03240920ad63cac916b42951754c0337bdac8" } hashbrown = { version = "0.13.2" } parquet = { version = "39.0.0" } tonic = { version = "0.9.2", features = ["tls", "tls-webpki-roots"] } diff --git a/influxdb_iox/tests/query_tests/cases/in/duplicates_parquet.sql.expected b/influxdb_iox/tests/query_tests/cases/in/duplicates_parquet.sql.expected index b8293f2939..20b0c3d3d7 100644 --- a/influxdb_iox/tests/query_tests/cases/in/duplicates_parquet.sql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/duplicates_parquet.sql.expected @@ -96,16 +96,16 @@ ---------- | plan_type | plan | ---------- -| Plan with Metrics | CoalescePartitionsExec, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | -| | UnionExec, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | -| | CoalesceBatchesExec: target_batch_size=8192, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=5, spill_count=0, spilled_bytes=0] | -| | FilterExec: state@4 = MA, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=5, spill_count=0, spilled_bytes=0] | -| | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[area, city, max_temp, min_temp, state, time], output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], predicate=state@4 = MA, pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, metrics=[bytes_scanned=1219, elapsed_compute=1.234ms, mem_used=0, num_predicate_creation_errors=0, output_rows=5, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=5, row_groups_pruned=0, spill_count=0, spilled_bytes=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] | -| | ProjectionExec: expr=[area@1 as area, city@2 as city, max_temp@3 as max_temp, min_temp@4 as min_temp, state@5 as state, time@6 as time], metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=5, spill_count=0, spilled_bytes=0] | -| | DeduplicateExec: [state@5 ASC,city@2 ASC,time@6 ASC], metrics=[elapsed_compute=1.234ms, mem_used=0, num_dupes=2, output_rows=5, spill_count=0, spilled_bytes=0] | -| | SortPreservingMergeExec: [state@5 ASC,city@2 ASC,time@6 ASC,__chunk_order@0 ASC], metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=7, spill_count=0, spilled_bytes=0] | -| | CoalesceBatchesExec: target_batch_size=8192, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=7, spill_count=0, spilled_bytes=0] | -| | FilterExec: state@5 = MA, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=7, spill_count=0, spilled_bytes=0] | -| | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[__chunk_order, area, city, max_temp, min_temp, state, time], output_ordering=[state@5 ASC, city@2 ASC, time@6 ASC, __chunk_order@0 ASC], predicate=state@4 = MA, pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, metrics=[bytes_scanned=1106, elapsed_compute=1.234ms, mem_used=0, num_predicate_creation_errors=0, output_rows=7, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=3, row_groups_pruned=0, spill_count=0, spilled_bytes=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] | +| Plan with Metrics | CoalescePartitionsExec, metrics=[elapsed_compute=1.234ms, output_rows=10] | +| | UnionExec, metrics=[elapsed_compute=1.234ms, output_rows=10] | +| | CoalesceBatchesExec: target_batch_size=8192, metrics=[elapsed_compute=1.234ms, output_rows=5] | +| | FilterExec: state@4 = MA, metrics=[elapsed_compute=1.234ms, output_rows=5] | +| | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[area, city, max_temp, min_temp, state, time], output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], predicate=state@4 = MA, pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, metrics=[bytes_scanned=1683, elapsed_compute=1.234ms, num_predicate_creation_errors=0, output_rows=5, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=5, row_groups_pruned=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] | +| | ProjectionExec: expr=[area@1 as area, city@2 as city, max_temp@3 as max_temp, min_temp@4 as min_temp, state@5 as state, time@6 as time], metrics=[elapsed_compute=1.234ms, output_rows=5] | +| | DeduplicateExec: [state@5 ASC,city@2 ASC,time@6 ASC], metrics=[elapsed_compute=1.234ms, num_dupes=2, output_rows=5] | +| | SortPreservingMergeExec: [state@5 ASC,city@2 ASC,time@6 ASC,__chunk_order@0 ASC], metrics=[elapsed_compute=1.234ms, output_rows=7] | +| | CoalesceBatchesExec: target_batch_size=8192, metrics=[elapsed_compute=1.234ms, output_rows=7] | +| | FilterExec: state@5 = MA, metrics=[elapsed_compute=1.234ms, output_rows=7] | +| | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[__chunk_order, area, city, max_temp, min_temp, state, time], output_ordering=[state@5 ASC, city@2 ASC, time@6 ASC, __chunk_order@0 ASC], predicate=state@4 = MA, pruning_predicate=state_min@0 <= MA AND MA <= state_max@1, metrics=[bytes_scanned=1532, elapsed_compute=1.234ms, num_predicate_creation_errors=0, output_rows=7, page_index_eval_time=1.234ms, page_index_rows_filtered=0, predicate_evaluation_errors=0, pushdown_eval_time=1.234ms, pushdown_rows_filtered=3, row_groups_pruned=0, time_elapsed_opening=1.234ms, time_elapsed_processing=1.234ms, time_elapsed_scanning_total=1.234ms, time_elapsed_scanning_until_data=1.234ms] | | | | ---------- \ No newline at end of file diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index 6d4df48f52..d32c402373 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -1118,6 +1118,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { vec![expr], distinct, None, + None, ))) } "sum" | "stddev" | "mean" | "median" => { @@ -1132,6 +1133,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { vec![expr], false, None, + None, ))) } name @ ("first" | "last" | "min" | "max") => { diff --git a/iox_query_influxql/src/plan/util_copy.rs b/iox_query_influxql/src/plan/util_copy.rs index ca3d4dc449..50f8e7209a 100644 --- a/iox_query_influxql/src/plan/util_copy.rs +++ b/iox_query_influxql/src/plan/util_copy.rs @@ -58,6 +58,7 @@ where args, distinct, filter, + order_by, }) => Ok(Expr::AggregateFunction(AggregateFunction::new( fun.clone(), args.iter() @@ -65,6 +66,7 @@ where .collect::>>()?, *distinct, filter.clone(), + order_by.clone(), ))), Expr::WindowFunction(WindowFunction { fun, @@ -87,16 +89,20 @@ where .collect::>>()?, window_frame.clone(), ))), - Expr::AggregateUDF(AggregateUDF { fun, args, filter }) => { - Ok(Expr::AggregateUDF(AggregateUDF { - fun: fun.clone(), - args: args - .iter() - .map(|e| clone_with_replacement(e, replacement_fn)) - .collect::>>()?, - filter: filter.clone(), - })) - } + Expr::AggregateUDF(AggregateUDF { + fun, + args, + filter, + order_by, + }) => Ok(Expr::AggregateUDF(AggregateUDF { + fun: fun.clone(), + args: args + .iter() + .map(|e| clone_with_replacement(e, replacement_fn)) + .collect::>>()?, + filter: filter.clone(), + order_by: order_by.clone(), + })), Expr::Alias(nested_expr, alias_name) => Ok(Expr::Alias( Box::new(clone_with_replacement(nested_expr, replacement_fn)?), alias_name.clone(), diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index c3b8ac9262..7e8a7aad58 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -30,9 +30,9 @@ bytes = { version = "1" } chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] } crossbeam-utils = { version = "0.8" } crypto-common = { version = "0.1", default-features = false, features = ["std"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2" } -datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } -datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e6d7e46dedbe5046e4606bfd3d7a1199dd0aaae2", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "19b03240920ad63cac916b42951754c0337bdac8" } +datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "19b03240920ad63cac916b42951754c0337bdac8", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "19b03240920ad63cac916b42951754c0337bdac8", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } digest = { version = "0.10", features = ["mac", "std"] } either = { version = "1" } fixedbitset = { version = "0.4" }