chore: Update datafusion (#5928)

* chore: Upgrade datafusion

* chore: Update for new API

* chore: Update expected output

* fix: Update comment

* fix: compilation

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-10-20 10:37:49 -04:00 committed by GitHub
parent e1d37b52b2
commit 7781ed0455
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 53 additions and 52 deletions

16
Cargo.lock generated
View File

@ -1135,7 +1135,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "13.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd"
dependencies = [
"ahash 0.8.0",
"arrow",
@ -1179,7 +1179,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "13.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd"
dependencies = [
"arrow",
"object_store",
@ -1191,7 +1191,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "13.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd"
dependencies = [
"ahash 0.8.0",
"arrow",
@ -1203,7 +1203,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "13.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd"
dependencies = [
"arrow",
"async-trait",
@ -1218,7 +1218,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "13.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd"
dependencies = [
"ahash 0.8.0",
"arrow",
@ -1242,7 +1242,7 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "13.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd"
dependencies = [
"arrow",
"datafusion 13.0.0",
@ -1255,7 +1255,7 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "13.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd"
dependencies = [
"arrow",
"datafusion-common",
@ -1266,7 +1266,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "13.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=feff5dc805449afa4a7ecb5ca88b979ad5739cce#feff5dc805449afa4a7ecb5ca88b979ad5739cce"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=57e445aadcc87cad33de8a969eb4203b219ec9dd#57e445aadcc87cad33de8a969eb4203b219ec9dd"
dependencies = [
"arrow",
"datafusion-common",

View File

@ -9,6 +9,6 @@ description = "Re-exports datafusion at a specific version"
# Rename to workaround doctest bug
# Turn off optional datafusion features (e.g. don't get support for crypto functions or avro)
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="feff5dc805449afa4a7ecb5ca88b979ad5739cce", default-features = false, package = "datafusion" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="feff5dc805449afa4a7ecb5ca88b979ad5739cce" }
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="57e445aadcc87cad33de8a969eb4203b219ec9dd", default-features = false, package = "datafusion" }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="57e445aadcc87cad33de8a969eb4203b219ec9dd" }
workspace-hack = { path = "../workspace-hack"}

View File

@ -227,6 +227,15 @@ impl ParquetStorage {
let path = path.object_store_path();
trace!(path=?path, "fetching parquet data for filtered read");
// set up "fake" DataFusion session (TODO thread the real one
// down so config options set on query context take effect here)
let object_store = Arc::clone(&self.object_store);
let session_ctx = SessionContext::new();
let task_ctx = Arc::new(TaskContext::from(&session_ctx));
task_ctx
.runtime_env()
.register_object_store("iox", "iox", object_store);
// Compute final (output) schema after selection
let schema = Arc::new(
select_schema(selection, &schema)
@ -256,17 +265,11 @@ impl ParquetStorage {
projection: None,
limit: None,
table_partition_cols: vec![],
// TODO avoid this `copied_config` when config_options are directly available on context
config_options: session_ctx.copied_config().config_options(),
};
let exec = ParquetExec::new(base_config, expr, None);
// set up "fake" DataFusion session
let object_store = Arc::clone(&self.object_store);
let session_ctx = SessionContext::new();
let task_ctx = Arc::new(TaskContext::from(&session_ctx));
task_ctx
.runtime_env()
.register_object_store("iox", "iox", object_store);
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&schema),
futures::stream::once(execute_stream(Arc::new(exec), task_ctx)).try_flatten(),

View File

@ -2,6 +2,7 @@
use datafusion::{
arrow::datatypes::SchemaRef as ArrowSchemaRef,
config::ConfigOptions,
datasource::{
file_format::{parquet::ParquetFormat, FileFormat},
listing::PartitionedFile,
@ -212,6 +213,7 @@ impl ParquetFileReader {
projection: None,
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
};
// set up enough datafusion context to do the real read session

View File

@ -135,23 +135,21 @@
| 872 | 6 | 1970-01-01T00:00:00.000000110Z | lawrence |
+-------+--------+--------------------------------+-----------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and town != 'tewsbury' and (system =5 or town = 'lawrence') and count < 40000;
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64)restaurant.count > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")) AND CAST(restaurant.count AS Int64)restaurant.count < Int64(40000) |
| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")), CAST(restaurant.count AS Int64) < Int64(40000)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND town@4 != tewsbury AND system@2 = 5 OR town@4 = lawrence AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 |
| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [town != Dictionary(Int32, Utf8("tewsbury"))] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64)restaurant.count > Int64(200) AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")) AND CAST(restaurant.count AS Int64)restaurant.count < Int64(40000) |
| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system = Float64(5) OR restaurant.town = Dictionary(Int32, Utf8("lawrence")), CAST(restaurant.count AS Int64) < Int64(40000)] |
| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND town@4 != tewsbury AND system@2 = 5 OR town@4 = lawrence AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 |
| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate exprs: [town != Dictionary(Int32, Utf8("tewsbury"))] |
| | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * from restaurant where count > 200 and count < 40000;
-- Results After Sorting
+-------+--------+--------------------------------+-----------+
@ -164,23 +162,21 @@
| 872 | 6 | 1970-01-01T00:00:00.000000110Z | lawrence |
+-------+--------+--------------------------------+-----------+
-- SQL: EXPLAIN SELECT * from restaurant where count > 200 and count < 40000;
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64)restaurant.count > Int64(200) AND CAST(restaurant.count AS Int64)restaurant.count < Int64(40000) |
| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), CAST(restaurant.count AS Int64) < Int64(40000)] |
| physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 |
| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
| | |
+---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | Filter: CAST(restaurant.count AS Int64)restaurant.count > Int64(200) AND CAST(restaurant.count AS Int64)restaurant.count < Int64(40000) |
| | Projection: CAST(restaurant.count AS Int64) AS CAST(restaurant.count AS Int64)restaurant.count, restaurant.count, restaurant.system, restaurant.time, restaurant.town |
| | TableScan: restaurant projection=[count, system, time, town], partial_filters=[CAST(restaurant.count AS Int64) > Int64(200), CAST(restaurant.count AS Int64) < Int64(40000)] |
| physical_plan | ProjectionExec: expr=[count@1 as count, system@2 as system, time@3 as time, town@4 as town] |
| | CoalesceBatchesExec: target_batch_size=4096 |
| | FilterExec: CAST(restaurant.count AS Int64)restaurant.count@0 > 200 AND CAST(restaurant.count AS Int64)restaurant.count@0 < 40000 |
| | ProjectionExec: expr=[CAST(count@0 AS Int64) as CAST(restaurant.count AS Int64)restaurant.count, count@0 as count, system@1 as system, time@2 as time, town@3 as town] |
| | RepartitionExec: partitioning=RoundRobinBatch(4) |
| | IOxReadFilterNode: table_name=restaurant, chunks=1 predicate=Predicate |
| | |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-- SQL: SELECT * from restaurant where system > 4.0 and system < 7.0;
-- Results After Sorting
+-------+--------+--------------------------------+-----------+