From d0d355ba4dd8358b837fdfeb54cc42c5cc6d6b59 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 6 Sep 2023 10:08:54 +0200 Subject: [PATCH] refactor: unpack record batches later during query (#8663) For #8350 we want to be able to stream record batches from the ingester instead of waiting to buffer them fully before the query starts. Hence we can no longer inspect the batches in the "display" implementation of the plan. This change mostly contains the display change, not the actual streaming part. I'll do that in a follow-up. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- ...d_predicates_parquet_ingester.sql.expected | 10 +- .../cases/in/duplicates_ingester.sql.expected | 8 +- ...cates_parquet_20_and_ingester.sql.expected | 2 +- .../cases/in/retention.sql.expected | 4 +- .../cases/in/several_chunks.sql.expected | 6 +- .../cases/in/two_chunks.sql.expected | 4 +- iox_query/src/frontend/reorg.rs | 12 +-- .../src/physical_optimizer/combine_chunks.rs | 10 +- .../dedup/partition_split.rs | 20 ++-- .../physical_optimizer/dedup/remove_dedup.rs | 12 +-- .../physical_optimizer/dedup/time_split.rs | 14 +-- .../physical_optimizer/projection_pushdown.rs | 4 +- .../sort/parquet_sortness.rs | 4 +- .../sort/push_sort_through_union.rs | 32 +++---- iox_query/src/provider.rs | 24 ++--- iox_query/src/provider/physical.rs | 6 +- iox_query/src/provider/record_batch_exec.rs | 96 +++++++------------ iox_query_influxrpc/src/scan_plan.rs | 2 +- 18 files changed, 122 insertions(+), 148 deletions(-) diff --git a/influxdb_iox/tests/query_tests/cases/in/dedup_and_predicates_parquet_ingester.sql.expected b/influxdb_iox/tests/query_tests/cases/in/dedup_and_predicates_parquet_ingester.sql.expected index eb5b686900..168e2084c5 100644 --- a/influxdb_iox/tests/query_tests/cases/in/dedup_and_predicates_parquet_ingester.sql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/dedup_and_predicates_parquet_ingester.sql.expected @@ -19,7 +19,7 @@ | | SortPreservingMergeExec: [tag@3 ASC,time@4 ASC,__chunk_order@0 ASC] | | | UnionExec | | | SortExec: expr=[tag@3 ASC,time@4 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 | +| | RecordBatchesExec: chunks=1 | | | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC] | | | | ---------- @@ -43,7 +43,7 @@ | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: tag@3 = A | | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 | +| | RecordBatchesExec: chunks=1 | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: tag@3 = A | | | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC], predicate=tag@2 = A, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 | @@ -68,7 +68,7 @@ | | SortPreservingMergeExec: [tag@3 ASC,time@4 ASC,__chunk_order@0 ASC] | | | UnionExec | | | SortExec: expr=[tag@3 ASC,time@4 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 | +| | RecordBatchesExec: chunks=1 | | | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC] | | | | ---------- @@ -95,7 +95,7 @@ | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@4 = 0 | | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 | +| | RecordBatchesExec: chunks=1 | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@4 = 0 | | | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC], predicate=time@3 = 0, pruning_predicate=time_min@0 <= 0 AND 0 <= time_max@1 | @@ -123,7 +123,7 @@ | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: tag@3 = A AND time@4 = 0 | | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 | +| | RecordBatchesExec: chunks=1 | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: tag@3 = A AND time@4 = 0 | | | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC], predicate=tag@2 = A AND time@3 = 0, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3 | diff --git a/influxdb_iox/tests/query_tests/cases/in/duplicates_ingester.sql.expected b/influxdb_iox/tests/query_tests/cases/in/duplicates_ingester.sql.expected index 30a89657d0..0d353b4809 100644 --- a/influxdb_iox/tests/query_tests/cases/in/duplicates_ingester.sql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/duplicates_ingester.sql.expected @@ -44,7 +44,7 @@ | | ProjectionExec: expr=[time@1 as time, state@2 as state, city@3 as city, min_temp@4 as min_temp, max_temp@5 as max_temp, area@6 as area] | | | DeduplicateExec: [city@3 ASC,state@2 ASC,time@1 ASC] | | | SortExec: expr=[city@3 ASC,state@2 ASC,time@1 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 | +| | RecordBatchesExec: chunks=1 | | | | ---------- -- SQL: EXPLAIN select time, state, city, min_temp, max_temp, area from h2o; @@ -63,7 +63,7 @@ | | ProjectionExec: expr=[time@1 as time, state@2 as state, city@3 as city, min_temp@4 as min_temp, max_temp@5 as max_temp, area@6 as area] | | | DeduplicateExec: [city@3 ASC,state@2 ASC,time@1 ASC] | | | SortExec: expr=[city@3 ASC,state@2 ASC,time@1 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 | +| | RecordBatchesExec: chunks=1 | | | | ---------- -- SQL: EXPLAIN select state as name from h2o UNION ALL select city as name from h2o; @@ -87,7 +87,7 @@ | | ProjectionExec: expr=[state@2 as state] | | | DeduplicateExec: [city@1 ASC,state@2 ASC,time@3 ASC] | | | SortExec: expr=[city@1 ASC,state@2 ASC,time@3 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 | +| | RecordBatchesExec: chunks=1 | | | ProjectionExec: expr=[city@0 as name] | | | UnionExec | | | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[city] | @@ -98,7 +98,7 @@ | | ProjectionExec: expr=[city@1 as city] | | | DeduplicateExec: [city@1 ASC,state@2 ASC,time@3 ASC] | | | SortExec: expr=[city@1 ASC,state@2 ASC,time@3 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 | +| | RecordBatchesExec: chunks=1 | | | | ---------- -- SQL: select count(*) from h2o; diff --git a/influxdb_iox/tests/query_tests/cases/in/duplicates_parquet_20_and_ingester.sql.expected b/influxdb_iox/tests/query_tests/cases/in/duplicates_parquet_20_and_ingester.sql.expected index c82ace4f82..a8b1e1aeaa 100644 --- a/influxdb_iox/tests/query_tests/cases/in/duplicates_parquet_20_and_ingester.sql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/duplicates_parquet_20_and_ingester.sql.expected @@ -22,7 +22,7 @@ | | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC,__chunk_order@0 ASC] | | | UnionExec | | | SortExec: expr=[tag@2 ASC,time@3 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 | +| | RecordBatchesExec: chunks=1 | | | ParquetExec: file_groups={10 groups: [[1/1/1/00000000-0000-0000-0000-00000000000a.parquet], [1/1/1/00000000-0000-0000-0000-00000000000b.parquet], [1/1/1/00000000-0000-0000-0000-00000000000c.parquet], [1/1/1/00000000-0000-0000-0000-00000000000d.parquet], [1/1/1/00000000-0000-0000-0000-00000000000e.parquet], ...]}, projection=[__chunk_order, f, tag, time], output_ordering=[tag@2 ASC, time@3 ASC, __chunk_order@0 ASC] | | | | ---------- \ No newline at end of file diff --git a/influxdb_iox/tests/query_tests/cases/in/retention.sql.expected b/influxdb_iox/tests/query_tests/cases/in/retention.sql.expected index da285e8e56..528bfb2c33 100644 --- a/influxdb_iox/tests/query_tests/cases/in/retention.sql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/retention.sql.expected @@ -25,7 +25,7 @@ | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@3 > | | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 | +| | RecordBatchesExec: chunks=1 | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@3 > | | | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[__chunk_order, host, load, time], output_ordering=[host@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=time@2 > , pruning_predicate=time_max@0 > | @@ -56,7 +56,7 @@ | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: host@1 != b AND time@3 > | | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 | +| | RecordBatchesExec: chunks=1 | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: host@1 != b AND time@3 > | | | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[__chunk_order, host, load, time], output_ordering=[host@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=host@0 != b AND time@2 > , pruning_predicate=(host_min@0 != b OR b != host_max@1) AND time_max@2 > | diff --git a/influxdb_iox/tests/query_tests/cases/in/several_chunks.sql.expected b/influxdb_iox/tests/query_tests/cases/in/several_chunks.sql.expected index 1568a7ee73..7674b16a20 100644 --- a/influxdb_iox/tests/query_tests/cases/in/several_chunks.sql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/several_chunks.sql.expected @@ -27,7 +27,7 @@ | | ProjectionExec: expr=[city@1 as city, other_temp@2 as other_temp, state@3 as state, temp@4 as temp, time@5 as time] | | | DeduplicateExec: [city@1 ASC,state@3 ASC,time@5 ASC] | | | SortExec: expr=[city@1 ASC,state@3 ASC,time@5 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 | +| | RecordBatchesExec: chunks=1 | | | | ---------- -- SQL: select temp, other_temp, time from h2o; @@ -59,7 +59,7 @@ | | ProjectionExec: expr=[temp@3 as temp, other_temp@4 as other_temp, time@5 as time] | | | DeduplicateExec: [city@1 ASC,state@2 ASC,time@5 ASC] | | | SortExec: expr=[city@1 ASC,state@2 ASC,time@5 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 | +| | RecordBatchesExec: chunks=1 | | | | ---------- -- SQL: EXPLAIN SELECT * from h2o where time >= to_timestamp('1970-01-01T00:00:00.000000250+00:00'); @@ -83,6 +83,6 @@ | | SortExec: expr=[city@1 ASC,state@3 ASC,time@5 ASC,__chunk_order@0 ASC] | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@5 >= 250 | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 | +| | RecordBatchesExec: chunks=1 | | | | ---------- \ No newline at end of file diff --git a/influxdb_iox/tests/query_tests/cases/in/two_chunks.sql.expected b/influxdb_iox/tests/query_tests/cases/in/two_chunks.sql.expected index b9db74230c..e80dae340d 100644 --- a/influxdb_iox/tests/query_tests/cases/in/two_chunks.sql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/two_chunks.sql.expected @@ -19,7 +19,7 @@ | | SortPreservingMergeExec: [city@1 ASC,state@3 ASC,time@5 ASC,__chunk_order@0 ASC] | | | UnionExec | | | SortExec: expr=[city@1 ASC,state@3 ASC,time@5 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 | +| | RecordBatchesExec: chunks=1 | | | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, city, other_temp, state, temp, time], output_ordering=[city@1 ASC, state@3 ASC, time@5 ASC, __chunk_order@0 ASC] | | | | ---------- @@ -43,7 +43,7 @@ | | SortPreservingMergeExec: [city@1 ASC,state@2 ASC,time@5 ASC,__chunk_order@0 ASC] | | | UnionExec | | | SortExec: expr=[city@1 ASC,state@2 ASC,time@5 ASC,__chunk_order@0 ASC] | -| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 | +| | RecordBatchesExec: chunks=1 | | | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, city, state, temp, other_temp, time], output_ordering=[city@1 ASC, state@2 ASC, time@5 ASC, __chunk_order@0 ASC] | | | | ---------- \ No newline at end of file diff --git a/iox_query/src/frontend/reorg.rs b/iox_query/src/frontend/reorg.rs index cd8838b4c2..1149d6cb44 100644 --- a/iox_query/src/frontend/reorg.rs +++ b/iox_query/src/frontend/reorg.rs @@ -417,12 +417,12 @@ mod test { - " SortPreservingMergeExec: [tag1@2 DESC,time@3 ASC NULLS LAST]" - " UnionExec" - " SortExec: expr=[tag1@2 DESC,time@3 ASC NULLS LAST]" - - " RecordBatchesExec: batches_groups=1 batches=1 total_rows=5" + - " RecordBatchesExec: chunks=1" - " SortExec: expr=[tag1@2 DESC,time@3 ASC NULLS LAST]" - " ProjectionExec: expr=[field_int@1 as field_int, field_int2@2 as field_int2, tag1@3 as tag1, time@4 as time]" - " DeduplicateExec: [tag1@3 ASC,time@4 ASC]" - " SortExec: expr=[tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]" - - " RecordBatchesExec: batches_groups=1 batches=1 total_rows=4" + - " RecordBatchesExec: chunks=1" "### ); @@ -486,12 +486,12 @@ mod test { - " SortPreservingMergeExec: [time@3 ASC NULLS LAST,tag1@2 ASC]" - " UnionExec" - " SortExec: expr=[time@3 ASC NULLS LAST,tag1@2 ASC]" - - " RecordBatchesExec: batches_groups=1 batches=1 total_rows=5" + - " RecordBatchesExec: chunks=1" - " SortExec: expr=[time@3 ASC NULLS LAST,tag1@2 ASC]" - " ProjectionExec: expr=[field_int@1 as field_int, field_int2@2 as field_int2, tag1@3 as tag1, time@4 as time]" - " DeduplicateExec: [tag1@3 ASC,time@4 ASC]" - " SortExec: expr=[tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]" - - " RecordBatchesExec: batches_groups=1 batches=1 total_rows=4" + - " RecordBatchesExec: chunks=1" "### ); @@ -567,12 +567,12 @@ mod test { - " SortPreservingMergeExec: [time@3 ASC NULLS LAST,tag1@2 ASC]" - " UnionExec" - " SortExec: expr=[time@3 ASC NULLS LAST,tag1@2 ASC]" - - " RecordBatchesExec: batches_groups=1 batches=1 total_rows=5" + - " RecordBatchesExec: chunks=1" - " SortExec: expr=[time@3 ASC NULLS LAST,tag1@2 ASC]" - " ProjectionExec: expr=[field_int@1 as field_int, field_int2@2 as field_int2, tag1@3 as tag1, time@4 as time]" - " DeduplicateExec: [tag1@3 ASC,time@4 ASC]" - " SortExec: expr=[tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]" - - " RecordBatchesExec: batches_groups=1 batches=1 total_rows=4" + - " RecordBatchesExec: chunks=1" "### ); diff --git a/iox_query/src/physical_optimizer/combine_chunks.rs b/iox_query/src/physical_optimizer/combine_chunks.rs index 1b103da6d2..a0138cd476 100644 --- a/iox_query/src/physical_optimizer/combine_chunks.rs +++ b/iox_query/src/physical_optimizer/combine_chunks.rs @@ -114,15 +114,15 @@ mod tests { input: - " UnionExec" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={2 groups: [[4.parquet], [5.parquet]]}" output: Ok: - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[2.parquet, 5.parquet], [4.parquet]]}" "### ); @@ -211,13 +211,13 @@ mod tests { - " UnionExec" - " FilterExec: false" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" output: Ok: - " UnionExec" - " FilterExec: false" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" "### ); } diff --git a/iox_query/src/physical_optimizer/dedup/partition_split.rs b/iox_query/src/physical_optimizer/dedup/partition_split.rs index ed1ed1e45e..0715414985 100644 --- a/iox_query/src/physical_optimizer/dedup/partition_split.rs +++ b/iox_query/src/physical_optimizer/dedup/partition_split.rs @@ -150,13 +150,13 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={1 group: [[3.parquet]]}, projection=[field, tag1, tag2, time]" output: Ok: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={1 group: [[3.parquet]]}, projection=[field, tag1, tag2, time]" "### ); @@ -183,18 +183,18 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[3.parquet, 5.parquet], [4.parquet, 6.parquet]]}, projection=[field, tag1, tag2, time]" output: Ok: - " UnionExec" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={2 groups: [[3.parquet, 6.parquet], [5.parquet]]}, projection=[field, tag1, tag2, time]" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[4.parquet]]}, projection=[field, tag1, tag2, time]" "### ); @@ -238,18 +238,18 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[3.parquet, 5.parquet], [4.parquet, 6.parquet]]}, projection=[field, tag1, tag2, time]" output: Ok: - " UnionExec" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={2 groups: [[3.parquet, 6.parquet], [5.parquet]]}, projection=[field, tag1, tag2, time]" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[4.parquet]]}, projection=[field, tag1, tag2, time]" "### ); @@ -275,12 +275,12 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=3 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=3" output: Ok: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=3 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=3" "### ); } diff --git a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs index 65926b3f28..4bfab07150 100644 --- a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs +++ b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs @@ -101,11 +101,11 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" output: Ok: - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" "### ); } @@ -123,12 +123,12 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" output: Ok: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" "### ); } @@ -147,12 +147,12 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" output: Ok: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" "### ); } diff --git a/iox_query/src/physical_optimizer/dedup/time_split.rs b/iox_query/src/physical_optimizer/dedup/time_split.rs index 1e09d2b075..57f18baf5f 100644 --- a/iox_query/src/physical_optimizer/dedup/time_split.rs +++ b/iox_query/src/physical_optimizer/dedup/time_split.rs @@ -145,13 +145,13 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={1 group: [[3.parquet]]}, projection=[field, tag1, tag2, time]" output: Ok: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={1 group: [[3.parquet]]}, projection=[field, tag1, tag2, time]" "### ); @@ -186,18 +186,18 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[3.parquet, 5.parquet], [4.parquet, 6.parquet]]}, projection=[field, tag1, tag2, time]" output: Ok: - " UnionExec" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={2 groups: [[6.parquet, 5.parquet], [3.parquet]]}, projection=[field, tag1, tag2, time]" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[4.parquet]]}, projection=[field, tag1, tag2, time]" "### ); @@ -223,12 +223,12 @@ mod tests { input: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=3 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=3" output: Ok: - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=3 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=3" "### ); } diff --git a/iox_query/src/physical_optimizer/projection_pushdown.rs b/iox_query/src/physical_optimizer/projection_pushdown.rs index 8e2bb26ba7..be263130f5 100644 --- a/iox_query/src/physical_optimizer/projection_pushdown.rs +++ b/iox_query/src/physical_optimizer/projection_pushdown.rs @@ -1297,10 +1297,10 @@ mod tests { --- input: - " ProjectionExec: expr=[tag1@0 as tag1]" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" output: Ok: - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" "### ); diff --git a/iox_query/src/physical_optimizer/sort/parquet_sortness.rs b/iox_query/src/physical_optimizer/sort/parquet_sortness.rs index ce7f4b9cf8..95b710301c 100644 --- a/iox_query/src/physical_optimizer/sort/parquet_sortness.rs +++ b/iox_query/src/physical_optimizer/sort/parquet_sortness.rs @@ -612,13 +612,13 @@ mod tests { input: - " DeduplicateExec: [col2@1 ASC,col1@0 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=0 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=0" - " ParquetExec: file_groups={1 group: [[1.parquet, 2.parquet]]}, projection=[col1, col2, col3, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, __chunk_order@3 ASC]" output: Ok: - " DeduplicateExec: [col2@1 ASC,col1@0 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=0 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=0" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, col3, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, __chunk_order@3 ASC]" "### ); diff --git a/iox_query/src/physical_optimizer/sort/push_sort_through_union.rs b/iox_query/src/physical_optimizer/sort/push_sort_through_union.rs index 2ba3f20440..6563a86512 100644 --- a/iox_query/src/physical_optimizer/sort/push_sort_through_union.rs +++ b/iox_query/src/physical_optimizer/sort/push_sort_through_union.rs @@ -266,7 +266,7 @@ mod test { - " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8" - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" output: Ok: @@ -275,7 +275,7 @@ mod test { - " SortPreservingRepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" "### ); @@ -317,7 +317,7 @@ mod test { - " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8" - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" output: Ok: @@ -327,7 +327,7 @@ mod test { - " SortPreservingRepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" "### ); @@ -358,14 +358,14 @@ mod test { - " DeduplicateExec: [col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]" - " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" output: Ok: - " DeduplicateExec: [col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]" - " UnionExec" - " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" "### ); @@ -402,8 +402,8 @@ mod test { - " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8" - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" + - " RecordBatchesExec: chunks=2" output: Ok: - " DeduplicateExec: [col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]" @@ -411,8 +411,8 @@ mod test { - " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8" - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" + - " RecordBatchesExec: chunks=2" "### ); } @@ -537,7 +537,7 @@ mod test { - " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8" - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" output: Ok: @@ -546,7 +546,7 @@ mod test { - " SortPreservingRepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" "### ); @@ -586,7 +586,7 @@ mod test { - " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8" - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" output: Ok: @@ -596,7 +596,7 @@ mod test { - " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8" - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]" "### ); @@ -635,7 +635,7 @@ mod test { - " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8" - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col1@0 ASC, col2@1 ASC, time@3 ASC, __chunk_order@4 ASC]" output: Ok: @@ -644,7 +644,7 @@ mod test { - " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8" - " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4" - " UnionExec" - - " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=2" - " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col1@0 ASC, col2@1 ASC, time@3 ASC, __chunk_order@4 ASC]" "### ); diff --git a/iox_query/src/provider.rs b/iox_query/src/provider.rs index 54d0e8d140..bc1602baa2 100644 --- a/iox_query/src/provider.rs +++ b/iox_query/src/provider.rs @@ -370,7 +370,7 @@ mod test { - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -387,7 +387,7 @@ mod test { - " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -408,7 +408,7 @@ mod test { - " FilterExec: false" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -422,7 +422,7 @@ mod test { - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -468,7 +468,7 @@ mod test { --- - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -484,7 +484,7 @@ mod test { --- - " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -513,7 +513,7 @@ mod test { - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" - " FilterExec: false AND tag1@1 = CAST(foo AS Dictionary(Int32, Utf8))" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -526,7 +526,7 @@ mod test { --- - " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -577,7 +577,7 @@ mod test { - " FilterExec: time@3 > 100" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -595,7 +595,7 @@ mod test { - " FilterExec: time@3 > 100" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -619,7 +619,7 @@ mod test { - " FilterExec: false AND time@3 > 100" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); @@ -634,7 +634,7 @@ mod test { - " FilterExec: time@3 > 100" - " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]" - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]" "### ); diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs index a94fbdee40..9616653c9d 100644 --- a/iox_query/src/provider/physical.rs +++ b/iox_query/src/provider/physical.rs @@ -470,7 +470,7 @@ mod tests { @r###" --- - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" "### ); } @@ -546,7 +546,7 @@ mod tests { @r###" --- - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[0.parquet]]}" "### ); @@ -575,7 +575,7 @@ mod tests { @r###" --- - " UnionExec" - - " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0" + - " RecordBatchesExec: chunks=1" - " ParquetExec: file_groups={1 group: [[0.parquet]]}, projection=[tag, __chunk_order], output_ordering=[__chunk_order@1 ASC]" "### ); diff --git a/iox_query/src/provider/record_batch_exec.rs b/iox_query/src/provider/record_batch_exec.rs index 3959dca206..a1007fb45b 100644 --- a/iox_query/src/provider/record_batch_exec.rs +++ b/iox_query/src/provider/record_batch_exec.rs @@ -3,10 +3,7 @@ use crate::{statistics::DFStatsAggregator, QueryChunk, CHUNK_ORDER_COLUMN_NAME}; use super::adapter::SchemaAdapterStream; -use arrow::{ - datatypes::{Schema, SchemaRef}, - record_batch::RecordBatch, -}; +use arrow::datatypes::{Schema, SchemaRef}; use datafusion::{ error::DataFusionError, execution::context::TaskContext, @@ -28,10 +25,13 @@ use std::{ }; /// Implements the DataFusion physical plan interface for [`RecordBatch`]es with automatic projection and NULL-column creation. +/// +/// +/// [`RecordBatch`]: arrow::record_batch::RecordBatch #[derive(Debug)] pub(crate) struct RecordBatchesExec { /// Chunks contained in this exec node. - chunks: Vec<(Arc, Vec)>, + chunks: Vec>, /// Overall schema. schema: SchemaRef, @@ -64,47 +64,34 @@ impl RecordBatchesExec { let chunk_order_only_schema = chunk_order_field.map(|field| Schema::new(vec![field.clone()])); - let chunks: Vec<_> = chunks - .into_iter() - .map(|chunk| { - let batches = chunk - .data() - .into_record_batches() - .expect("chunk must have record batches"); - - (chunk, batches) - }) - .collect(); + let chunks: Vec<_> = chunks.into_iter().collect(); let statistics = chunks .iter() - .fold( - DFStatsAggregator::new(&schema), - |mut agg, (chunk, _batches)| { - agg.update(&chunk.stats(), chunk.schema().as_arrow().as_ref()); + .fold(DFStatsAggregator::new(&schema), |mut agg, chunk| { + agg.update(&chunk.stats(), chunk.schema().as_arrow().as_ref()); - if let Some(schema) = chunk_order_only_schema.as_ref() { - let order = chunk.order().get(); - let order = ScalarValue::from(order); - agg.update( - &Statistics { - num_rows: Some(0), - total_byte_size: Some(0), - column_statistics: Some(vec![ColumnStatistics { - null_count: Some(0), - max_value: Some(order.clone()), - min_value: Some(order), - distinct_count: Some(1), - }]), - is_exact: true, - }, - schema, - ); - } + if let Some(schema) = chunk_order_only_schema.as_ref() { + let order = chunk.order().get(); + let order = ScalarValue::from(order); + agg.update( + &Statistics { + num_rows: Some(0), + total_byte_size: Some(0), + column_statistics: Some(vec![ColumnStatistics { + null_count: Some(0), + max_value: Some(order.clone()), + min_value: Some(order), + distinct_count: Some(1), + }]), + is_exact: true, + }, + schema, + ); + } - agg - }, - ) + agg + }) .build(); let output_ordering = if chunk_order_field.is_some() { @@ -134,7 +121,7 @@ impl RecordBatchesExec { /// Chunks that make up this node. pub fn chunks(&self) -> impl Iterator> { - self.chunks.iter().map(|(chunk, _batches)| chunk) + self.chunks.iter() } /// Sort key that was passed to [`chunks_to_physical_nodes`]. @@ -190,7 +177,7 @@ impl ExecutionPlan for RecordBatchesExec { let schema = self.schema(); - let (chunk, batches) = &self.chunks[partition]; + let chunk = &self.chunks[partition]; let part_schema = chunk.schema().as_arrow(); // The output selection is all the columns in the schema. @@ -216,6 +203,10 @@ impl ExecutionPlan for RecordBatchesExec { .map(|projection| Arc::new(part_schema.project(projection).expect("projection broken"))) .unwrap_or(part_schema); + let batches = chunk.data().into_record_batches().ok_or_else(|| { + DataFusionError::Execution(String::from("chunk must contain record batches")) + })?; + let stream = Box::pin(MemoryStream::try_new( batches.clone(), incomplete_output_schema, @@ -245,26 +236,9 @@ impl ExecutionPlan for RecordBatchesExec { impl DisplayAs for RecordBatchesExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let total_groups = self.chunks.len(); - - let total_batches = self - .chunks - .iter() - .map(|(_chunk, batches)| batches.len()) - .sum::(); - - let total_rows = self - .chunks - .iter() - .flat_map(|(_chunk, batches)| batches.iter().map(|batch| batch.num_rows())) - .sum::(); - match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!( - f, - "RecordBatchesExec: batches_groups={total_groups} batches={total_batches} total_rows={total_rows}", - ) + write!(f, "RecordBatchesExec: chunks={}", self.chunks.len(),) } } } diff --git a/iox_query_influxrpc/src/scan_plan.rs b/iox_query_influxrpc/src/scan_plan.rs index e20c0f5819..661a4ee94c 100644 --- a/iox_query_influxrpc/src/scan_plan.rs +++ b/iox_query_influxrpc/src/scan_plan.rs @@ -212,7 +212,7 @@ mod tests { - " DeduplicateExec: [tag1@3 ASC,time@4 ASC]" - " SortPreservingMergeExec: [tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]" - " SortExec: expr=[tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]" - - " RecordBatchesExec: batches_groups=2 batches=2 total_rows=9" + - " RecordBatchesExec: chunks=2" "### );