From 7781ed045501c97975bb79cd6c7835dbfdbd9257 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 20 Oct 2022 10:37:49 -0400 Subject: [PATCH] chore: Update datafusion (#5928) * chore: Upgrade datafusion * chore: Update for new API * chore: Update expected output * fix: Update comment * fix: compilation Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 16 +++---- datafusion/Cargo.toml | 4 +- parquet_file/src/storage.rs | 19 ++++---- parquet_to_line_protocol/src/lib.rs | 2 + query_tests/cases/in/pushdown.expected | 64 ++++++++++++-------------- 5 files changed, 53 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 199fcdf609..4d0daad5ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1135,7 +1135,7 @@ dependencies = [ [[package]] name = "datafusion" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd" dependencies = [ "ahash 0.8.0", "arrow", @@ -1179,7 +1179,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd" dependencies = [ "arrow", "object_store", @@ -1191,7 +1191,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd" dependencies = [ "ahash 0.8.0", "arrow", @@ -1203,7 +1203,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd" dependencies = [ "arrow", "async-trait", @@ -1218,7 +1218,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd" dependencies = [ "ahash 0.8.0", "arrow", @@ -1242,7 +1242,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd" dependencies = [ "arrow", "datafusion 13.0.0", @@ -1255,7 +1255,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd" dependencies = [ "arrow", "datafusion-common", @@ -1266,7 +1266,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "13.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd" dependencies = [ "arrow", "datafusion-common", diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 83e1cfe21a..990a48942a 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -9,6 +9,6 @@ description = "Re-exports datafusion at a specific version" # Rename to workaround doctest bug # Turn off optional datafusion features (e.g. don't get support for crypto functions or avro) -upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="feff5dc805449afa4a7ecb5ca88b979ad5739cce", default-features = false, package = "datafusion" } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="feff5dc805449afa4a7ecb5ca88b979ad5739cce" } +upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="57e445aadcc87cad33de8a969eb4203b219ec9dd", default-features = false, package = "datafusion" } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="57e445aadcc87cad33de8a969eb4203b219ec9dd" } workspace-hack = { path = "../workspace-hack"} diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs index 04443b14e9..fa81897856 100644 --- a/parquet_file/src/storage.rs +++ b/parquet_file/src/storage.rs @@ -227,6 +227,15 @@ impl ParquetStorage { let path = path.object_store_path(); trace!(path=?path, "fetching parquet data for filtered read"); + // set up "fake" DataFusion session (TODO thread the real one + // down so config options set on query context take effect here) + let object_store = Arc::clone(&self.object_store); + let session_ctx = SessionContext::new(); + let task_ctx = Arc::new(TaskContext::from(&session_ctx)); + task_ctx + .runtime_env() + .register_object_store("iox", "iox", object_store); + // Compute final (output) schema after selection let schema = Arc::new( select_schema(selection, &schema) @@ -256,17 +265,11 @@ impl ParquetStorage { projection: None, limit: None, table_partition_cols: vec![], + // TODO avoid this `copied_config` when config_options are directly available on context + config_options: session_ctx.copied_config().config_options(), }; let exec = ParquetExec::new(base_config, expr, None); - // set up "fake" DataFusion session - let object_store = Arc::clone(&self.object_store); - let session_ctx = SessionContext::new(); - let task_ctx = Arc::new(TaskContext::from(&session_ctx)); - task_ctx - .runtime_env() - .register_object_store("iox", "iox", object_store); - Ok(Box::pin(RecordBatchStreamAdapter::new( Arc::clone(&schema), futures::stream::once(execute_stream(Arc::new(exec), task_ctx)).try_flatten(), diff --git a/parquet_to_line_protocol/src/lib.rs b/parquet_to_line_protocol/src/lib.rs index 622fe386e9..0d8ff70210 100644 --- a/parquet_to_line_protocol/src/lib.rs +++ b/parquet_to_line_protocol/src/lib.rs @@ -2,6 +2,7 @@ use datafusion::{ arrow::datatypes::SchemaRef as ArrowSchemaRef, + config::ConfigOptions, datasource::{ file_format::{parquet::ParquetFormat, FileFormat}, listing::PartitionedFile, @@ -212,6 +213,7 @@ impl ParquetFileReader { projection: None, limit: None, table_partition_cols: vec![], + config_options: ConfigOptions::new().into_shareable(), }; // set up enough datafusion context to do the real read session diff --git a/query_tests/cases/in/pushdown.expected b/query_tests/cases/in/pushdown.expected index 5676da55b7..e733ef68c2 100644 --- a/query_tests/cases/in/pushdown.expected +++ b/query_tests/cases/in/pushdown.expected @@ -135,23 +135,21 @@ | 872 | 6 | 1970-01-01T00:00:00.000000110Z | lawrence | +-------+--------+--------------------------------+-----------+ -- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000; -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | Filter: CAST(restaurant.count AS Int64)restaurant.count > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")) AND CAST(restaurant.count AS Int64)restaurant.count < Int64(40000) | -| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")), CAST(restaurant.count AS Int64) < Int64(40000)] | -| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND town@4 != tewsbury AND system@2 = 5 OR town@4 = lawrence AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 | -| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [town != Dictionary(Int32, Utf8("tewsbury"))] | -| | | -+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | Filter: CAST(restaurant.count AS Int64)restaurant.count > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")) AND CAST(restaurant.count AS Int64)restaurant.count < Int64(40000) | +| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")), CAST(restaurant.count AS Int64) < Int64(40000)] | +| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | +| | CoalesceBatchesExec: target_batch_size=4096 | +| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND town@4 != tewsbury AND system@2 = 5 OR town@4 = lawrence AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 | +| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [town != Dictionary(Int32, Utf8("tewsbury"))] | +| | | ++---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * from restaurant where count > 200 and count < 40000; -- Results After Sorting +-------+--------+--------------------------------+-----------+ @@ -164,23 +162,21 @@ | 872 | 6 | 1970-01-01T00:00:00.000000110Z | lawrence | +-------+--------+--------------------------------+-----------+ -- SQL: EXPLAIN SELECT * from restaurant where count > 200 and count < 40000; -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| plan_type | plan | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | Filter: CAST(restaurant.count AS Int64)restaurant.count > Int64(200) AND CAST(restaurant.count AS Int64)restaurant.count < Int64(40000) | -| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town | -| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), CAST(restaurant.count AS Int64) < Int64(40000)] | -| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | -| | CoalesceBatchesExec: target_batch_size=4096 | -| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 | -| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | -| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate | -| | | -+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | Filter: CAST(restaurant.count AS Int64)restaurant.count > Int64(200) AND CAST(restaurant.count AS Int64)restaurant.count < Int64(40000) | +| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town | +| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), CAST(restaurant.count AS Int64) < Int64(40000)] | +| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] | +| | CoalesceBatchesExec: target_batch_size=4096 | +| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 | +| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | +| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate | +| | | ++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -- SQL: SELECT * from restaurant where system > 4.0 and system < 7.0; -- Results After Sorting +-------+--------+--------------------------------+-----------+