diff --git a/Cargo.lock b/Cargo.lock index a32044b3e7..9e40bdc4e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1399,7 +1399,7 @@ dependencies = [ [[package]] name = "datafusion" version = "16.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6a050058bd704f73b38106b7abf21dc4539eebc#e6a050058bd704f73b38106b7abf21dc4539eebc" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=350cb47289a76e579b221fe374e4cf09db332569#350cb47289a76e579b221fe374e4cf09db332569" dependencies = [ "ahash 0.8.2", "arrow", @@ -1445,7 +1445,7 @@ dependencies = [ [[package]] name = "datafusion-common" version = "16.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6a050058bd704f73b38106b7abf21dc4539eebc#e6a050058bd704f73b38106b7abf21dc4539eebc" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=350cb47289a76e579b221fe374e4cf09db332569#350cb47289a76e579b221fe374e4cf09db332569" dependencies = [ "arrow", "chrono", @@ -1458,7 +1458,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "16.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6a050058bd704f73b38106b7abf21dc4539eebc#e6a050058bd704f73b38106b7abf21dc4539eebc" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=350cb47289a76e579b221fe374e4cf09db332569#350cb47289a76e579b221fe374e4cf09db332569" dependencies = [ "ahash 0.8.2", "arrow", @@ -1470,7 +1470,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "16.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6a050058bd704f73b38106b7abf21dc4539eebc#e6a050058bd704f73b38106b7abf21dc4539eebc" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=350cb47289a76e579b221fe374e4cf09db332569#350cb47289a76e579b221fe374e4cf09db332569" dependencies = [ "arrow", "async-trait", @@ -1486,7 +1486,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "16.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6a050058bd704f73b38106b7abf21dc4539eebc#e6a050058bd704f73b38106b7abf21dc4539eebc" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=350cb47289a76e579b221fe374e4cf09db332569#350cb47289a76e579b221fe374e4cf09db332569" dependencies = [ "ahash 0.8.2", "arrow", @@ -1516,7 +1516,7 @@ dependencies = [ [[package]] name = "datafusion-proto" version = "16.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6a050058bd704f73b38106b7abf21dc4539eebc#e6a050058bd704f73b38106b7abf21dc4539eebc" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=350cb47289a76e579b221fe374e4cf09db332569#350cb47289a76e579b221fe374e4cf09db332569" dependencies = [ "arrow", "chrono", @@ -1533,7 +1533,7 @@ dependencies = [ [[package]] name = "datafusion-row" version = "16.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6a050058bd704f73b38106b7abf21dc4539eebc#e6a050058bd704f73b38106b7abf21dc4539eebc" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=350cb47289a76e579b221fe374e4cf09db332569#350cb47289a76e579b221fe374e4cf09db332569" dependencies = [ "arrow", "datafusion-common", @@ -1544,7 +1544,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "16.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e6a050058bd704f73b38106b7abf21dc4539eebc#e6a050058bd704f73b38106b7abf21dc4539eebc" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=350cb47289a76e579b221fe374e4cf09db332569#350cb47289a76e579b221fe374e4cf09db332569" dependencies = [ "arrow-schema", "datafusion-common", diff --git a/Cargo.toml b/Cargo.toml index 8d273eae60..78177c5e2c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,8 +116,8 @@ license = "MIT OR Apache-2.0" [workspace.dependencies] arrow = { version = "31.0.0" } arrow-flight = { version = "31.0.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="e6a050058bd704f73b38106b7abf21dc4539eebc", default-features = false } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="e6a050058bd704f73b38106b7abf21dc4539eebc" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="350cb47289a76e579b221fe374e4cf09db332569", default-features = false } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="350cb47289a76e579b221fe374e4cf09db332569" } hashbrown = { version = "0.13.2" } parquet = { version = "31.0.0" } diff --git a/query_tests/cases/in/dedup_and_predicates_parquet.expected b/query_tests/cases/in/dedup_and_predicates_parquet.expected index d56213f83e..69c97c6cad 100644 --- a/query_tests/cases/in/dedup_and_predicates_parquet.expected +++ b/query_tests/cases/in/dedup_and_predicates_parquet.expected @@ -17,7 +17,7 @@ | physical_plan | SortExec: [tag@2 ASC NULLS LAST] | | | CoalescePartitionsExec | | | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [tag@2 ASC,time@3 ASC] | | | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | | | UnionExec | @@ -94,7 +94,7 @@ | | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@3 = 0 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [tag@2 ASC,time@3 ASC] | | | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | | | UnionExec | diff --git a/query_tests/cases/in/dedup_and_predicates_parquet_ingester.expected b/query_tests/cases/in/dedup_and_predicates_parquet_ingester.expected index c7fa1ecd41..c2514b7050 100644 --- a/query_tests/cases/in/dedup_and_predicates_parquet_ingester.expected +++ b/query_tests/cases/in/dedup_and_predicates_parquet_ingester.expected @@ -17,7 +17,7 @@ | physical_plan | SortExec: [tag@2 ASC NULLS LAST] | | | CoalescePartitionsExec | | | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [tag@2 ASC,time@3 ASC] | | | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | | | UnionExec | @@ -97,7 +97,7 @@ | | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@3 = 0 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [tag@2 ASC,time@3 ASC] | | | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] | | | UnionExec | diff --git a/query_tests/cases/in/duplicates_ingester.expected b/query_tests/cases/in/duplicates_ingester.expected index 73cad6fe3c..5461ec8b2a 100644 --- a/query_tests/cases/in/duplicates_ingester.expected +++ b/query_tests/cases/in/duplicates_ingester.expected @@ -10,7 +10,7 @@ | physical_plan | SortExec: [time@0 ASC NULLS LAST,state@1 ASC NULLS LAST,city@2 ASC NULLS LAST] | | | CoalescePartitionsExec | | | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 | | | UnionExec | | | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] | | | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] | @@ -31,7 +31,7 @@ | logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area | | | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] | | physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 | | | UnionExec | | | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] | | | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] | @@ -58,14 +58,14 @@ | | ProjectionExec: expr=[state@0 as name] | | | UnionExec | | | ProjectionExec: expr=[state@1 as state] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [state@1 ASC,city@0 ASC,time@2 ASC] | | | SortPreservingMergeExec: [state@1 ASC,city@0 ASC,time@2 ASC] | | | UnionExec | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@1 ASC, city@0 ASC, time@2 ASC], projection=[city, state, time] | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@1 ASC, city@0 ASC, time@2 ASC], projection=[city, state, time] | | | ProjectionExec: expr=[state@1 as state] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [city@0 ASC,state@1 ASC,time@2 ASC] | | | SortExec: [city@0 ASC,state@1 ASC,time@2 ASC] | | | RecordBatchesExec: batches_groups=1 batches=1 | @@ -73,14 +73,14 @@ | | ProjectionExec: expr=[city@0 as name] | | | UnionExec | | | ProjectionExec: expr=[city@0 as city] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [state@1 ASC,city@0 ASC,time@2 ASC] | | | SortPreservingMergeExec: [state@1 ASC,city@0 ASC,time@2 ASC] | | | UnionExec | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@1 ASC, city@0 ASC, time@2 ASC], projection=[city, state, time] | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@1 ASC, city@0 ASC, time@2 ASC], projection=[city, state, time] | | | ProjectionExec: expr=[city@0 as city] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [city@0 ASC,state@1 ASC,time@2 ASC] | | | SortExec: [city@0 ASC,state@1 ASC,time@2 ASC] | | | RecordBatchesExec: batches_groups=1 batches=1 | diff --git a/query_tests/cases/in/duplicates_parquet.expected b/query_tests/cases/in/duplicates_parquet.expected index 14b4a28b8d..636e9e420e 100644 --- a/query_tests/cases/in/duplicates_parquet.expected +++ b/query_tests/cases/in/duplicates_parquet.expected @@ -10,7 +10,7 @@ | physical_plan | SortExec: [time@0 ASC NULLS LAST,state@1 ASC NULLS LAST,city@2 ASC NULLS LAST] | | | CoalescePartitionsExec | | | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 | | | UnionExec | | | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] | | | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] | @@ -28,7 +28,7 @@ | logical_plan | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area | | | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] | | physical_plan | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 | | | UnionExec | | | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC] | | | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC] | @@ -52,7 +52,7 @@ | | ProjectionExec: expr=[state@0 as name] | | | UnionExec | | | ProjectionExec: expr=[state@1 as state] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [state@1 ASC,city@0 ASC,time@2 ASC] | | | SortPreservingMergeExec: [state@1 ASC,city@0 ASC,time@2 ASC] | | | UnionExec | @@ -62,7 +62,7 @@ | | ProjectionExec: expr=[city@0 as name] | | | UnionExec | | | ProjectionExec: expr=[city@0 as city] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [state@1 ASC,city@0 ASC,time@2 ASC] | | | SortPreservingMergeExec: [state@1 ASC,city@0 ASC,time@2 ASC] | | | UnionExec | @@ -87,7 +87,7 @@ | | ProjectionExec: expr=[area@0 as area, city@1 as city, max_temp@2 as max_temp, min_temp@3 as min_temp, state@4 as state, time@5 as time], metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | | | FilterExec: state@4 = MA, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | -| | RepartitionExec: partitioning=RoundRobinBatch(4), metrics=[fetch_time=1.234ms, repart_time=1.234ms, send_time=1.234ms] | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3, metrics=[fetch_time=1.234ms, repart_time=1.234ms, send_time=1.234ms] | | | UnionExec, metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=10, spill_count=0, spilled_bytes=0] | | | DeduplicateExec: [state@4 ASC,city@1 ASC,time@5 ASC], metrics=[elapsed_compute=1.234ms, mem_used=0, num_dupes=2, output_rows=5, spill_count=0, spilled_bytes=0] | | | SortPreservingMergeExec: [state@4 ASC,city@1 ASC,time@5 ASC], metrics=[elapsed_compute=1.234ms, mem_used=0, output_rows=7, spill_count=0, spilled_bytes=0] | diff --git a/query_tests/cases/in/duplicates_parquet_many.expected b/query_tests/cases/in/duplicates_parquet_many.expected index 899284c45a..ea77e5a3fa 100644 --- a/query_tests/cases/in/duplicates_parquet_many.expected +++ b/query_tests/cases/in/duplicates_parquet_many.expected @@ -19,7 +19,7 @@ | | AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1)), SUM(m.f)] | | | UnionExec | | | ProjectionExec: expr=[f@0 as f] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [tag@1 ASC,time@2 ASC] | | | SortPreservingMergeExec: [tag@1 ASC,time@2 ASC] | | | UnionExec | diff --git a/query_tests/cases/in/pushdown.expected b/query_tests/cases/in/pushdown.expected index 336aa522df..633f5a97b2 100644 --- a/query_tests/cases/in/pushdown.expected +++ b/query_tests/cases/in/pushdown.expected @@ -20,7 +20,7 @@ | logical_plan | Projection: restaurant.count, restaurant.system, restaurant.time, restaurant.town | | | TableScan: restaurant projection=[count, system, time, town] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[count, system, time, town] | | | | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -47,7 +47,7 @@ | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200), pruning_predicate=count_max@0 > 200, projection=[count, system, time, town] | | | | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -62,7 +62,7 @@ | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: CAST(count@0 AS Float64) > 200 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=CAST(count AS Float64) > Float64(200), projection=[count, system, time, town] | | | | +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -77,7 +77,7 @@ | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 4 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(4), pruning_predicate=system_max@0 > 4, projection=[count, system, time, town] | | | | +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -103,7 +103,7 @@ | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 AND town@3 != tewsbury | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")), pruning_predicate=count_max@0 > 200 AND town_min@1 != tewsbury OR tewsbury != town_max@2, projection=[count, system, time, town] | | | | +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -128,7 +128,7 @@ | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")) AND (system = Float64(5) OR town = Dictionary(Int32, Utf8("lawrence"))), pruning_predicate=count_max@0 > 200 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2, projection=[count, system, time, town] | | | || physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 AND town@3 != tewsbury AND system@1 = 5 OR town@3 = lawrence AND count@0 < 40000 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND town != Dictionary(Int32, Utf8("tewsbury")) AND (system = Float64(5) OR town = Dictionary(Int32, Utf8("lawrence"))) AND count < UInt64(40000), pruning_predicate=count_max@0 > 200 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 <= 5 AND 5 <= system_max@4 OR town_min@1 <= lawrence AND lawrence <= town_max@2 AND count_min@5 < 40000, projection=[count, system, time, town] | | | || physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: count@0 > 200 AND count@0 < 40000 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=count > UInt64(200) AND count < UInt64(40000), pruning_predicate=count_max@0 > 200 AND count_min@1 < 40000, projection=[count, system, time, town] | | | | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -205,7 +205,7 @@ | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 4 AND system@1 < 7 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(4) AND system < Float64(7), pruning_predicate=system_max@0 > 4 AND system_min@1 < 7, projection=[count, system, time, town] | | | | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -229,7 +229,7 @@ | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 5 AND system@1 < 7 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND system < Float64(7), pruning_predicate=system_max@0 > 5 AND system_min@1 < 7, projection=[count, system, time, town] | | | | +---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -252,7 +252,7 @@ | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 5 AND town@3 != tewsbury AND 7 > system@1 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND town != Dictionary(Int32, Utf8("tewsbury")) AND Float64(7) > system, pruning_predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7, projection=[count, system, time, town] | | | | +---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ @@ -274,7 +274,7 @@ | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: system@1 > 5 AND tewsbury != town@3 AND system@1 < 7 AND count@0 = 632 OR town@3 = reading | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=system > Float64(5) AND Dictionary(Int32, Utf8("tewsbury")) != town AND system < Float64(7) AND (count = UInt64(632) OR town = Dictionary(Int32, Utf8("reading"))), pruning_predicate=system_max@0 > 5 AND town_min@1 != tewsbury OR tewsbury != town_max@2 AND system_min@3 < 7 AND count_min@4 <= 632 AND 632 <= count_max@5 OR town_min@1 <= reading AND reading <= town_max@2, projection=[count, system, time, town] | | | || | Filter: Float64(5) < restaurant.system AND restaurant.town != Dictionary(Int32, Utf8("tewsbury")) AND restaurant.system < Float64(7) AND (restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading"))) AND restaurant.time > TimestampNanosecond(130, None) | | | TableScan: restaurant projection=[count, system, time, town], partial_filters=[Float64(5) < restaurant.system, restaurant.town != Dictionary(Int32, Utf8("tewsbury")), restaurant.system < Float64(7), restaurant.count = UInt64(632) OR restaurant.town = Dictionary(Int32, Utf8("reading")), restaurant.time > TimestampNanosecond(130, None)] | | physical_plan | ProjectionExec: expr=[count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: 5 < system@1 AND town@3 != tewsbury AND system@1 < 7 AND count@0 = 632 OR town@3 = reading AND time@2 > 130 | | | EmptyExec: produce_one_row=false | @@ -332,7 +332,7 @@ | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %foo% OR CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %bar% OR CAST(restaurant.town AS Utf8)restaurant.town@0 LIKE %baz% AND CAST(restaurant.town AS Utf8)restaurant.town@0 NOT LIKE %one% AND CAST(restaurant.town AS Utf8)restaurant.town@0 NOT LIKE %two% | | | ProjectionExec: expr=[CAST(town@3 AS Utf8) as CAST(restaurant.town AS Utf8)restaurant.town, count@0 as count, system@1 as system, time@2 as time, town@3 as town] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=(CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%foo%") OR CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%bar%") OR CAST(town AS Utf8) AS restaurant.town LIKE Utf8("%baz%")) AND CAST(town AS Utf8) AS restaurant.town NOT LIKE Utf8("%one%") AND CAST(town AS Utf8) AS restaurant.town NOT LIKE Utf8("%two%") AND (CAST(town AS Utf8) LIKE Utf8("%foo%") OR CAST(town AS Utf8) LIKE Utf8("%bar%") OR CAST(town AS Utf8) LIKE Utf8("%baz%")) AND CAST(town AS Utf8) NOT LIKE Utf8("%one%") AND CAST(town AS Utf8) NOT LIKE Utf8("%two%"), projection=[count, system, time, town] | | | | +---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/query_tests/cases/in/retention.expected b/query_tests/cases/in/retention.expected index 4f1229c635..98e209f4a9 100644 --- a/query_tests/cases/in/retention.expected +++ b/query_tests/cases/in/retention.expected @@ -18,7 +18,7 @@ | physical_plan | SortExec: [host@0 ASC NULLS LAST,load@1 ASC NULLS LAST,time@2 ASC NULLS LAST] | | | CoalescePartitionsExec | | | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [host@0 ASC,time@2 ASC] | | | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] | | | UnionExec | @@ -51,7 +51,7 @@ | | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: host@0 != b | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [host@0 ASC,time@2 ASC] | | | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] | | | UnionExec | diff --git a/query_tests/cases/in/several_chunks.expected b/query_tests/cases/in/several_chunks.expected index e028700b8c..8a491b925f 100644 --- a/query_tests/cases/in/several_chunks.expected +++ b/query_tests/cases/in/several_chunks.expected @@ -20,7 +20,7 @@ | logical_plan | Projection: h2o.city, h2o.other_temp, h2o.state, h2o.temp, h2o.time | | | TableScan: h2o projection=[city, other_temp, state, temp, time] | | physical_plan | ProjectionExec: expr=[city@0 as city, other_temp@1 as other_temp, state@2 as state, temp@3 as temp, time@4 as time] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 | | | UnionExec | | | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] | | | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] | @@ -57,7 +57,7 @@ | physical_plan | ProjectionExec: expr=[temp@1 as temp, other_temp@0 as other_temp, time@2 as time] | | | UnionExec | | | ProjectionExec: expr=[other_temp@1 as other_temp, temp@3 as temp, time@4 as time] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] | | | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] | | | UnionExec | @@ -65,7 +65,7 @@ | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] | | | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] | | | ProjectionExec: expr=[other_temp@1 as other_temp, temp@3 as temp, time@4 as time] | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] | | | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] | | | RecordBatchesExec: batches_groups=1 batches=1 | @@ -83,7 +83,7 @@ | physical_plan | ProjectionExec: expr=[city@0 as city, other_temp@1 as other_temp, state@2 as state, temp@3 as temp, time@4 as time] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@4 >= 250 | -| | RepartitionExec: partitioning=RoundRobinBatch(4) | +| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 | | | UnionExec | | | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] | | | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] | diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 1bae8f8392..56eba1327a 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -29,7 +29,7 @@ bytes = { version = "1", features = ["std"] } chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "iana-time-zone", "serde", "std", "winapi"] } crossbeam-utils = { version = "0.8", features = ["std"] } crypto-common = { version = "0.1", default-features = false, features = ["std"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e6a050058bd704f73b38106b7abf21dc4539eebc", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "350cb47289a76e579b221fe374e4cf09db332569", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] } digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] } either = { version = "1", features = ["use_std"] } fixedbitset = { version = "0.4", features = ["std"] }