refactor: unpack record batches later during query (#8663)

For #8350 we want to be able to stream record batches from the ingester
instead of waiting to buffer them fully before the query starts. Hence
we can no longer inspect the batches in the "display" implementation of
the plan.

This change mostly contains the display change, not the actual streaming
part. I'll do that in a follow-up.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2023-09-06 10:08:54 +02:00 committed by GitHub
parent ecc3a2c416
commit d0d355ba4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 122 additions and 148 deletions

View File

@ -19,7 +19,7 @@
| | SortPreservingMergeExec: [tag@3 ASC,time@4 ASC,__chunk_order@0 ASC] |
| | UnionExec |
| | SortExec: expr=[tag@3 ASC,time@4 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | RecordBatchesExec: chunks=1 |
| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC] |
| | |
----------
@ -43,7 +43,7 @@
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: tag@3 = A |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | RecordBatchesExec: chunks=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: tag@3 = A |
| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC], predicate=tag@2 = A, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 |
@ -68,7 +68,7 @@
| | SortPreservingMergeExec: [tag@3 ASC,time@4 ASC,__chunk_order@0 ASC] |
| | UnionExec |
| | SortExec: expr=[tag@3 ASC,time@4 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | RecordBatchesExec: chunks=1 |
| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC] |
| | |
----------
@ -95,7 +95,7 @@
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@4 = 0 |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | RecordBatchesExec: chunks=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@4 = 0 |
| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC], predicate=time@3 = 0, pruning_predicate=time_min@0 <= 0 AND 0 <= time_max@1 |
@ -123,7 +123,7 @@
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: tag@3 = A AND time@4 = 0 |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | RecordBatchesExec: chunks=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: tag@3 = A AND time@4 = 0 |
| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, bar, foo, tag, time], output_ordering=[tag@3 ASC, time@4 ASC, __chunk_order@0 ASC], predicate=tag@2 = A AND time@3 = 0, pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3 |

View File

@ -44,7 +44,7 @@
| | ProjectionExec: expr=[time@1 as time, state@2 as state, city@3 as city, min_temp@4 as min_temp, max_temp@5 as max_temp, area@6 as area] |
| | DeduplicateExec: [city@3 ASC,state@2 ASC,time@1 ASC] |
| | SortExec: expr=[city@3 ASC,state@2 ASC,time@1 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
| | RecordBatchesExec: chunks=1 |
| | |
----------
-- SQL: EXPLAIN select time, state, city, min_temp, max_temp, area from h2o;
@ -63,7 +63,7 @@
| | ProjectionExec: expr=[time@1 as time, state@2 as state, city@3 as city, min_temp@4 as min_temp, max_temp@5 as max_temp, area@6 as area] |
| | DeduplicateExec: [city@3 ASC,state@2 ASC,time@1 ASC] |
| | SortExec: expr=[city@3 ASC,state@2 ASC,time@1 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
| | RecordBatchesExec: chunks=1 |
| | |
----------
-- SQL: EXPLAIN select state as name from h2o UNION ALL select city as name from h2o;
@ -87,7 +87,7 @@
| | ProjectionExec: expr=[state@2 as state] |
| | DeduplicateExec: [city@1 ASC,state@2 ASC,time@3 ASC] |
| | SortExec: expr=[city@1 ASC,state@2 ASC,time@3 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
| | RecordBatchesExec: chunks=1 |
| | ProjectionExec: expr=[city@0 as name] |
| | UnionExec |
| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[city] |
@ -98,7 +98,7 @@
| | ProjectionExec: expr=[city@1 as city] |
| | DeduplicateExec: [city@1 ASC,state@2 ASC,time@3 ASC] |
| | SortExec: expr=[city@1 ASC,state@2 ASC,time@3 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
| | RecordBatchesExec: chunks=1 |
| | |
----------
-- SQL: select count(*) from h2o;

View File

@ -22,7 +22,7 @@
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC,__chunk_order@0 ASC] |
| | UnionExec |
| | SortExec: expr=[tag@2 ASC,time@3 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | RecordBatchesExec: chunks=1 |
| | ParquetExec: file_groups={10 groups: [[1/1/1/00000000-0000-0000-0000-00000000000a.parquet], [1/1/1/00000000-0000-0000-0000-00000000000b.parquet], [1/1/1/00000000-0000-0000-0000-00000000000c.parquet], [1/1/1/00000000-0000-0000-0000-00000000000d.parquet], [1/1/1/00000000-0000-0000-0000-00000000000e.parquet], ...]}, projection=[__chunk_order, f, tag, time], output_ordering=[tag@2 ASC, time@3 ASC, __chunk_order@0 ASC] |
| | |
----------

View File

@ -25,7 +25,7 @@
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@3 > <REDACTED> |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | RecordBatchesExec: chunks=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@3 > <REDACTED> |
| | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[__chunk_order, host, load, time], output_ordering=[host@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=time@2 > <REDACTED>, pruning_predicate=time_max@0 > <REDACTED> |
@ -56,7 +56,7 @@
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: host@1 != b AND time@3 > <REDACTED> |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | RecordBatchesExec: chunks=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: host@1 != b AND time@3 > <REDACTED> |
| | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[__chunk_order, host, load, time], output_ordering=[host@1 ASC, time@3 ASC, __chunk_order@0 ASC], predicate=host@0 != b AND time@2 > <REDACTED>, pruning_predicate=(host_min@0 != b OR b != host_max@1) AND time_max@2 > <REDACTED> |

View File

@ -27,7 +27,7 @@
| | ProjectionExec: expr=[city@1 as city, other_temp@2 as other_temp, state@3 as state, temp@4 as temp, time@5 as time] |
| | DeduplicateExec: [city@1 ASC,state@3 ASC,time@5 ASC] |
| | SortExec: expr=[city@1 ASC,state@3 ASC,time@5 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | RecordBatchesExec: chunks=1 |
| | |
----------
-- SQL: select temp, other_temp, time from h2o;
@ -59,7 +59,7 @@
| | ProjectionExec: expr=[temp@3 as temp, other_temp@4 as other_temp, time@5 as time] |
| | DeduplicateExec: [city@1 ASC,state@2 ASC,time@5 ASC] |
| | SortExec: expr=[city@1 ASC,state@2 ASC,time@5 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | RecordBatchesExec: chunks=1 |
| | |
----------
-- SQL: EXPLAIN SELECT * from h2o where time >= to_timestamp('1970-01-01T00:00:00.000000250+00:00');
@ -83,6 +83,6 @@
| | SortExec: expr=[city@1 ASC,state@3 ASC,time@5 ASC,__chunk_order@0 ASC] |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@5 >= 250 |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | RecordBatchesExec: chunks=1 |
| | |
----------

View File

@ -19,7 +19,7 @@
| | SortPreservingMergeExec: [city@1 ASC,state@3 ASC,time@5 ASC,__chunk_order@0 ASC] |
| | UnionExec |
| | SortExec: expr=[city@1 ASC,state@3 ASC,time@5 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | RecordBatchesExec: chunks=1 |
| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, city, other_temp, state, temp, time], output_ordering=[city@1 ASC, state@3 ASC, time@5 ASC, __chunk_order@0 ASC] |
| | |
----------
@ -43,7 +43,7 @@
| | SortPreservingMergeExec: [city@1 ASC,state@2 ASC,time@5 ASC,__chunk_order@0 ASC] |
| | UnionExec |
| | SortExec: expr=[city@1 ASC,state@2 ASC,time@5 ASC,__chunk_order@0 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | RecordBatchesExec: chunks=1 |
| | ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[__chunk_order, city, state, temp, other_temp, time], output_ordering=[city@1 ASC, state@2 ASC, time@5 ASC, __chunk_order@0 ASC] |
| | |
----------

View File

@ -417,12 +417,12 @@ mod test {
- " SortPreservingMergeExec: [tag1@2 DESC,time@3 ASC NULLS LAST]"
- " UnionExec"
- " SortExec: expr=[tag1@2 DESC,time@3 ASC NULLS LAST]"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=5"
- " RecordBatchesExec: chunks=1"
- " SortExec: expr=[tag1@2 DESC,time@3 ASC NULLS LAST]"
- " ProjectionExec: expr=[field_int@1 as field_int, field_int2@2 as field_int2, tag1@3 as tag1, time@4 as time]"
- " DeduplicateExec: [tag1@3 ASC,time@4 ASC]"
- " SortExec: expr=[tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=4"
- " RecordBatchesExec: chunks=1"
"###
);
@ -486,12 +486,12 @@ mod test {
- " SortPreservingMergeExec: [time@3 ASC NULLS LAST,tag1@2 ASC]"
- " UnionExec"
- " SortExec: expr=[time@3 ASC NULLS LAST,tag1@2 ASC]"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=5"
- " RecordBatchesExec: chunks=1"
- " SortExec: expr=[time@3 ASC NULLS LAST,tag1@2 ASC]"
- " ProjectionExec: expr=[field_int@1 as field_int, field_int2@2 as field_int2, tag1@3 as tag1, time@4 as time]"
- " DeduplicateExec: [tag1@3 ASC,time@4 ASC]"
- " SortExec: expr=[tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=4"
- " RecordBatchesExec: chunks=1"
"###
);
@ -567,12 +567,12 @@ mod test {
- " SortPreservingMergeExec: [time@3 ASC NULLS LAST,tag1@2 ASC]"
- " UnionExec"
- " SortExec: expr=[time@3 ASC NULLS LAST,tag1@2 ASC]"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=5"
- " RecordBatchesExec: chunks=1"
- " SortExec: expr=[time@3 ASC NULLS LAST,tag1@2 ASC]"
- " ProjectionExec: expr=[field_int@1 as field_int, field_int2@2 as field_int2, tag1@3 as tag1, time@4 as time]"
- " DeduplicateExec: [tag1@3 ASC,time@4 ASC]"
- " SortExec: expr=[tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=4"
- " RecordBatchesExec: chunks=1"
"###
);

View File

@ -114,15 +114,15 @@ mod tests {
input:
- " UnionExec"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={2 groups: [[4.parquet], [5.parquet]]}"
output:
Ok:
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[2.parquet, 5.parquet], [4.parquet]]}"
"###
);
@ -211,13 +211,13 @@ mod tests {
- " UnionExec"
- " FilterExec: false"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
output:
Ok:
- " UnionExec"
- " FilterExec: false"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
"###
);
}

View File

@ -150,13 +150,13 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={1 group: [[3.parquet]]}, projection=[field, tag1, tag2, time]"
output:
Ok:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={1 group: [[3.parquet]]}, projection=[field, tag1, tag2, time]"
"###
);
@ -183,18 +183,18 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[3.parquet, 5.parquet], [4.parquet, 6.parquet]]}, projection=[field, tag1, tag2, time]"
output:
Ok:
- " UnionExec"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={2 groups: [[3.parquet, 6.parquet], [5.parquet]]}, projection=[field, tag1, tag2, time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[4.parquet]]}, projection=[field, tag1, tag2, time]"
"###
);
@ -238,18 +238,18 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[3.parquet, 5.parquet], [4.parquet, 6.parquet]]}, projection=[field, tag1, tag2, time]"
output:
Ok:
- " UnionExec"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={2 groups: [[3.parquet, 6.parquet], [5.parquet]]}, projection=[field, tag1, tag2, time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[4.parquet]]}, projection=[field, tag1, tag2, time]"
"###
);
@ -275,12 +275,12 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=3 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=3"
output:
Ok:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=3 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=3"
"###
);
}

View File

@ -101,11 +101,11 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
output:
Ok:
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
"###
);
}
@ -123,12 +123,12 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
output:
Ok:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
"###
);
}
@ -147,12 +147,12 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
output:
Ok:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
"###
);
}

View File

@ -145,13 +145,13 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={1 group: [[3.parquet]]}, projection=[field, tag1, tag2, time]"
output:
Ok:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={1 group: [[3.parquet]]}, projection=[field, tag1, tag2, time]"
"###
);
@ -186,18 +186,18 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[3.parquet, 5.parquet], [4.parquet, 6.parquet]]}, projection=[field, tag1, tag2, time]"
output:
Ok:
- " UnionExec"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={2 groups: [[6.parquet, 5.parquet], [3.parquet]]}, projection=[field, tag1, tag2, time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[4.parquet]]}, projection=[field, tag1, tag2, time]"
"###
);
@ -223,12 +223,12 @@ mod tests {
input:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=3 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=3"
output:
Ok:
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=3 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=3"
"###
);
}

View File

@ -1297,10 +1297,10 @@ mod tests {
---
input:
- " ProjectionExec: expr=[tag1@0 as tag1]"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
output:
Ok:
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
"###
);

View File

@ -612,13 +612,13 @@ mod tests {
input:
- " DeduplicateExec: [col2@1 ASC,col1@0 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=0 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=0"
- " ParquetExec: file_groups={1 group: [[1.parquet, 2.parquet]]}, projection=[col1, col2, col3, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, __chunk_order@3 ASC]"
output:
Ok:
- " DeduplicateExec: [col2@1 ASC,col1@0 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=0 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=0"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, col3, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, __chunk_order@3 ASC]"
"###
);

View File

@ -266,7 +266,7 @@ mod test {
- " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8"
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
output:
Ok:
@ -275,7 +275,7 @@ mod test {
- " SortPreservingRepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
"###
);
@ -317,7 +317,7 @@ mod test {
- " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8"
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
output:
Ok:
@ -327,7 +327,7 @@ mod test {
- " SortPreservingRepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
"###
);
@ -358,14 +358,14 @@ mod test {
- " DeduplicateExec: [col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]"
- " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
output:
Ok:
- " DeduplicateExec: [col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]"
- " UnionExec"
- " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
"###
);
@ -402,8 +402,8 @@ mod test {
- " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8"
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " RecordBatchesExec: chunks=2"
output:
Ok:
- " DeduplicateExec: [col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]"
@ -411,8 +411,8 @@ mod test {
- " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8"
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " RecordBatchesExec: chunks=2"
"###
);
}
@ -537,7 +537,7 @@ mod test {
- " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8"
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
output:
Ok:
@ -546,7 +546,7 @@ mod test {
- " SortPreservingRepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " SortExec: expr=[col2@1 ASC,col1@0 ASC,time@3 ASC,__chunk_order@4 ASC]"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
"###
);
@ -586,7 +586,7 @@ mod test {
- " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8"
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
output:
Ok:
@ -596,7 +596,7 @@ mod test {
- " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8"
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col2@1 ASC, col1@0 ASC, time@3 ASC, __chunk_order@4 ASC]"
"###
);
@ -635,7 +635,7 @@ mod test {
- " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8"
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col1@0 ASC, col2@1 ASC, time@3 ASC, __chunk_order@4 ASC]"
output:
Ok:
@ -644,7 +644,7 @@ mod test {
- " RepartitionExec: partitioning=Hash([col2@1, col1@0, time@3, __chunk_order@4], 8), input_partitions=8"
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=4"
- " UnionExec"
- " RecordBatchesExec: batches_groups=2 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=2"
- " ParquetExec: file_groups={2 groups: [[1.parquet], [2.parquet]]}, projection=[col1, col2, field1, time, __chunk_order], output_ordering=[col1@0 ASC, col2@1 ASC, time@3 ASC, __chunk_order@4 ASC]"
"###
);

View File

@ -370,7 +370,7 @@ mod test {
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -387,7 +387,7 @@ mod test {
- " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -408,7 +408,7 @@ mod test {
- " FilterExec: false"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -422,7 +422,7 @@ mod test {
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -468,7 +468,7 @@ mod test {
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -484,7 +484,7 @@ mod test {
---
- " ProjectionExec: expr=[tag1@1 as tag1, time@3 as time]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -513,7 +513,7 @@ mod test {
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " FilterExec: false AND tag1@1 = CAST(foo AS Dictionary(Int32, Utf8))"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -526,7 +526,7 @@ mod test {
---
- " ProjectionExec: expr=[field@0 as field, tag1@1 as tag1, tag2@2 as tag2, time@3 as time]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -577,7 +577,7 @@ mod test {
- " FilterExec: time@3 > 100"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -595,7 +595,7 @@ mod test {
- " FilterExec: time@3 > 100"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -619,7 +619,7 @@ mod test {
- " FilterExec: false AND time@3 > 100"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);
@ -634,7 +634,7 @@ mod test {
- " FilterExec: time@3 > 100"
- " DeduplicateExec: [tag1@1 ASC,tag2@2 ASC,time@3 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[2.parquet]]}, projection=[field, tag1, tag2, time, __chunk_order], output_ordering=[__chunk_order@4 ASC]"
"###
);

View File

@ -470,7 +470,7 @@ mod tests {
@r###"
---
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
"###
);
}
@ -546,7 +546,7 @@ mod tests {
@r###"
---
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[0.parquet]]}"
"###
);
@ -575,7 +575,7 @@ mod tests {
@r###"
---
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=0 total_rows=0"
- " RecordBatchesExec: chunks=1"
- " ParquetExec: file_groups={1 group: [[0.parquet]]}, projection=[tag, __chunk_order], output_ordering=[__chunk_order@1 ASC]"
"###
);

View File

@ -3,10 +3,7 @@
use crate::{statistics::DFStatsAggregator, QueryChunk, CHUNK_ORDER_COLUMN_NAME};
use super::adapter::SchemaAdapterStream;
use arrow::{
datatypes::{Schema, SchemaRef},
record_batch::RecordBatch,
};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion::{
error::DataFusionError,
execution::context::TaskContext,
@ -28,10 +25,13 @@ use std::{
};
/// Implements the DataFusion physical plan interface for [`RecordBatch`]es with automatic projection and NULL-column creation.
///
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
#[derive(Debug)]
pub(crate) struct RecordBatchesExec {
/// Chunks contained in this exec node.
chunks: Vec<(Arc<dyn QueryChunk>, Vec<RecordBatch>)>,
chunks: Vec<Arc<dyn QueryChunk>>,
/// Overall schema.
schema: SchemaRef,
@ -64,47 +64,34 @@ impl RecordBatchesExec {
let chunk_order_only_schema =
chunk_order_field.map(|field| Schema::new(vec![field.clone()]));
let chunks: Vec<_> = chunks
.into_iter()
.map(|chunk| {
let batches = chunk
.data()
.into_record_batches()
.expect("chunk must have record batches");
(chunk, batches)
})
.collect();
let chunks: Vec<_> = chunks.into_iter().collect();
let statistics = chunks
.iter()
.fold(
DFStatsAggregator::new(&schema),
|mut agg, (chunk, _batches)| {
agg.update(&chunk.stats(), chunk.schema().as_arrow().as_ref());
.fold(DFStatsAggregator::new(&schema), |mut agg, chunk| {
agg.update(&chunk.stats(), chunk.schema().as_arrow().as_ref());
if let Some(schema) = chunk_order_only_schema.as_ref() {
let order = chunk.order().get();
let order = ScalarValue::from(order);
agg.update(
&Statistics {
num_rows: Some(0),
total_byte_size: Some(0),
column_statistics: Some(vec![ColumnStatistics {
null_count: Some(0),
max_value: Some(order.clone()),
min_value: Some(order),
distinct_count: Some(1),
}]),
is_exact: true,
},
schema,
);
}
if let Some(schema) = chunk_order_only_schema.as_ref() {
let order = chunk.order().get();
let order = ScalarValue::from(order);
agg.update(
&Statistics {
num_rows: Some(0),
total_byte_size: Some(0),
column_statistics: Some(vec![ColumnStatistics {
null_count: Some(0),
max_value: Some(order.clone()),
min_value: Some(order),
distinct_count: Some(1),
}]),
is_exact: true,
},
schema,
);
}
agg
},
)
agg
})
.build();
let output_ordering = if chunk_order_field.is_some() {
@ -134,7 +121,7 @@ impl RecordBatchesExec {
/// Chunks that make up this node.
pub fn chunks(&self) -> impl Iterator<Item = &Arc<dyn QueryChunk>> {
self.chunks.iter().map(|(chunk, _batches)| chunk)
self.chunks.iter()
}
/// Sort key that was passed to [`chunks_to_physical_nodes`].
@ -190,7 +177,7 @@ impl ExecutionPlan for RecordBatchesExec {
let schema = self.schema();
let (chunk, batches) = &self.chunks[partition];
let chunk = &self.chunks[partition];
let part_schema = chunk.schema().as_arrow();
// The output selection is all the columns in the schema.
@ -216,6 +203,10 @@ impl ExecutionPlan for RecordBatchesExec {
.map(|projection| Arc::new(part_schema.project(projection).expect("projection broken")))
.unwrap_or(part_schema);
let batches = chunk.data().into_record_batches().ok_or_else(|| {
DataFusionError::Execution(String::from("chunk must contain record batches"))
})?;
let stream = Box::pin(MemoryStream::try_new(
batches.clone(),
incomplete_output_schema,
@ -245,26 +236,9 @@ impl ExecutionPlan for RecordBatchesExec {
impl DisplayAs for RecordBatchesExec {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let total_groups = self.chunks.len();
let total_batches = self
.chunks
.iter()
.map(|(_chunk, batches)| batches.len())
.sum::<usize>();
let total_rows = self
.chunks
.iter()
.flat_map(|(_chunk, batches)| batches.iter().map(|batch| batch.num_rows()))
.sum::<usize>();
match t {
DisplayFormatType::Default | DisplayFormatType::Verbose => {
write!(
f,
"RecordBatchesExec: batches_groups={total_groups} batches={total_batches} total_rows={total_rows}",
)
write!(f, "RecordBatchesExec: chunks={}", self.chunks.len(),)
}
}
}

View File

@ -212,7 +212,7 @@ mod tests {
- " DeduplicateExec: [tag1@3 ASC,time@4 ASC]"
- " SortPreservingMergeExec: [tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]"
- " SortExec: expr=[tag1@3 ASC,time@4 ASC,__chunk_order@0 ASC]"
- " RecordBatchesExec: batches_groups=2 batches=2 total_rows=9"
- " RecordBatchesExec: chunks=2"
"###
);