chore: Update datafusion (#6997)

* chore: Update datafusion

* chore: update the plans

* fix: update some plans

* chore: Update plans and port some explain plans to use insta snapshots

* fix: another plan

* chore: Run cargo hakari tasks

---------

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2023-02-16 18:03:25 +01:00 committed by GitHub
parent fea5245148
commit 27890b313f
16 changed files with 121 additions and 100 deletions
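Context for the test changes below: several explain-plan assertions were ported from hand-maintained expected strings to insta inline YAML snapshots. A minimal, self-contained sketch of that pattern (the plan text and test name are illustrative, not taken from this commit; it assumes `insta` is available as a dev-dependency):

```rust
// Sketch only: in the real tests the Vec<String> comes from helpers such as
// format_execution_plan; it is hard-coded here so the example stands alone.
// The leading space on each line makes insta's YAML output quote every entry.
#[test]
fn explain_plan_snapshot_sketch() {
    let plan: Vec<String> = vec![
        " SortExec: expr=[tag@2 ASC NULLS LAST]".to_string(),
        "   UnionExec".to_string(),
    ];
    insta::assert_yaml_snapshot!(
        plan,
        // The inline body after `@` is normally generated/updated with `cargo insta review`.
        @r###"
    ---
    - " SortExec: expr=[tag@2 ASC NULLS LAST]"
    - "   UnionExec"
    "###
    );
}
```

The advantage over manually maintained expected strings is that `cargo insta review` can regenerate the snapshots when plan formatting changes upstream, which is exactly what this DataFusion bump required.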

Cargo.lock generated

@ -1421,7 +1421,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "18.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9#fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=253550c6c936f75f654dcdc9480025a9ef55d9fd#253550c6c936f75f654dcdc9480025a9ef55d9fd"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1467,7 +1467,7 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "18.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9#fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=253550c6c936f75f654dcdc9480025a9ef55d9fd#253550c6c936f75f654dcdc9480025a9ef55d9fd"
dependencies = [
"arrow",
"chrono",
@ -1480,7 +1480,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "18.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9#fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=253550c6c936f75f654dcdc9480025a9ef55d9fd#253550c6c936f75f654dcdc9480025a9ef55d9fd"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1492,7 +1492,7 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "18.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9#fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=253550c6c936f75f654dcdc9480025a9ef55d9fd#253550c6c936f75f654dcdc9480025a9ef55d9fd"
dependencies = [
"arrow",
"async-trait",
@ -1508,7 +1508,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "18.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9#fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=253550c6c936f75f654dcdc9480025a9ef55d9fd#253550c6c936f75f654dcdc9480025a9ef55d9fd"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1538,7 +1538,7 @@ dependencies = [
[[package]]
name = "datafusion-proto"
version = "18.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9#fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=253550c6c936f75f654dcdc9480025a9ef55d9fd#253550c6c936f75f654dcdc9480025a9ef55d9fd"
dependencies = [
"arrow",
"chrono",
@ -1555,7 +1555,7 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "18.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9#fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=253550c6c936f75f654dcdc9480025a9ef55d9fd#253550c6c936f75f654dcdc9480025a9ef55d9fd"
dependencies = [
"arrow",
"datafusion-common",
@ -1566,7 +1566,7 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "18.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9#fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=253550c6c936f75f654dcdc9480025a9ef55d9fd#253550c6c936f75f654dcdc9480025a9ef55d9fd"
dependencies = [
"arrow-schema",
"datafusion-common",


@ -117,8 +117,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "32.0.0" }
arrow-flight = { version = "32.0.0" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev="253550c6c936f75f654dcdc9480025a9ef55d9fd", default-features = false }
datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev="253550c6c936f75f654dcdc9480025a9ef55d9fd" }
hashbrown = { version = "0.13.2" }
parquet = { version = "32.0.0" }


@ -14,7 +14,7 @@
| logical_plan | Sort: table.tag ASC NULLS LAST |
| | Projection: table.bar, table.foo, table.tag, table.time |
| | TableScan: table projection=[bar, foo, tag, time] |
| physical_plan | SortExec: [tag@2 ASC NULLS LAST] |
| physical_plan | SortExec: expr=[tag@2 ASC NULLS LAST] |
| | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
@ -87,7 +87,7 @@
| | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.time = TimestampNanosecond(0, None) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.time = TimestampNanosecond(0, None)] |
| physical_plan | SortExec: [tag@2 ASC NULLS LAST] |
| physical_plan | SortExec: expr=[tag@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |


@ -14,13 +14,13 @@
| logical_plan | Sort: table.tag ASC NULLS LAST |
| | Projection: table.bar, table.foo, table.tag, table.time |
| | TableScan: table projection=[bar, foo, tag, time] |
| physical_plan | SortExec: [tag@2 ASC NULLS LAST] |
| physical_plan | SortExec: expr=[tag@2 ASC NULLS LAST] |
| | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | SortExec: expr=[tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | |
----------
@ -45,7 +45,7 @@
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | SortExec: expr=[tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | |
----------
@ -70,7 +70,7 @@
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | SortExec: expr=[tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | |
----------
@ -90,7 +90,7 @@
| | Projection: table.bar, table.foo, table.tag, table.time |
| | Filter: table.time = TimestampNanosecond(0, None) |
| | TableScan: table projection=[bar, foo, tag, time], partial_filters=[table.time = TimestampNanosecond(0, None)] |
| physical_plan | SortExec: [tag@2 ASC NULLS LAST] |
| physical_plan | SortExec: expr=[tag@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[bar@0 as bar, foo@1 as foo, tag@2 as tag, time@3 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |
@ -100,7 +100,7 @@
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time = TimestampNanosecond(0, None), pruning_predicate=time_min@0 <= 0 AND 0 <= time_max@1, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | SortExec: expr=[tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | |
----------
@ -125,7 +125,7 @@
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=tag = Dictionary(Int32, Utf8("A")) AND time = TimestampNanosecond(0, None), pruning_predicate=tag_min@0 <= A AND A <= tag_max@1 AND time_min@2 <= 0 AND 0 <= time_max@3, output_ordering=[tag@2 ASC, time@3 ASC], projection=[bar, foo, tag, time] |
| | SortExec: [tag@2 ASC,time@3 ASC] |
| | SortExec: expr=[tag@2 ASC,time@3 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=2 |
| | |
----------


@ -31,7 +31,7 @@
| logical_plan | Sort: h2o.time ASC NULLS LAST, h2o.state ASC NULLS LAST, h2o.city ASC NULLS LAST |
| | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area |
| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] |
| physical_plan | SortExec: [time@0 ASC NULLS LAST,state@1 ASC NULLS LAST,city@2 ASC NULLS LAST] |
| physical_plan | SortExec: expr=[time@0 ASC NULLS LAST,state@1 ASC NULLS LAST,city@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | UnionExec |
@ -41,7 +41,7 @@
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | DeduplicateExec: [city@1 ASC,state@4 ASC,time@5 ASC] |
| | SortExec: [city@1 ASC,state@4 ASC,time@5 ASC] |
| | SortExec: expr=[city@1 ASC,state@4 ASC,time@5 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] |
| | |
@ -61,7 +61,7 @@
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@4 ASC, city@1 ASC, time@5 ASC], projection=[area, city, max_temp, min_temp, state, time] |
| | DeduplicateExec: [city@1 ASC,state@4 ASC,time@5 ASC] |
| | SortExec: [city@1 ASC,state@4 ASC,time@5 ASC] |
| | SortExec: expr=[city@1 ASC,state@4 ASC,time@5 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, projection=[area, city, max_temp, min_temp, state, time] |
| | |
@ -87,7 +87,7 @@
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@1 ASC, city@0 ASC, time@2 ASC], projection=[city, state, time] |
| | ProjectionExec: expr=[state@1 as state] |
| | DeduplicateExec: [city@0 ASC,state@1 ASC,time@2 ASC] |
| | SortExec: [city@0 ASC,state@1 ASC,time@2 ASC] |
| | SortExec: expr=[city@0 ASC,state@1 ASC,time@2 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, projection=[state] |
| | ProjectionExec: expr=[city@0 as name] |
@ -100,7 +100,7 @@
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[state@1 ASC, city@0 ASC, time@2 ASC], projection=[city, state, time] |
| | ProjectionExec: expr=[city@0 as city] |
| | DeduplicateExec: [city@0 ASC,state@1 ASC,time@2 ASC] |
| | SortExec: [city@0 ASC,state@1 ASC,time@2 ASC] |
| | SortExec: expr=[city@0 ASC,state@1 ASC,time@2 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=6 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, projection=[city] |
| | |


@ -31,7 +31,7 @@
| logical_plan | Sort: h2o.time ASC NULLS LAST, h2o.state ASC NULLS LAST, h2o.city ASC NULLS LAST |
| | Projection: h2o.time, h2o.state, h2o.city, h2o.min_temp, h2o.max_temp, h2o.area |
| | TableScan: h2o projection=[area, city, max_temp, min_temp, state, time] |
| physical_plan | SortExec: [time@0 ASC NULLS LAST,state@1 ASC NULLS LAST,city@2 ASC NULLS LAST] |
| physical_plan | SortExec: expr=[time@0 ASC NULLS LAST,state@1 ASC NULLS LAST,city@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[time@5 as time, state@4 as state, city@1 as city, min_temp@3 as min_temp, max_temp@2 as max_temp, area@0 as area] |
| | UnionExec |


@ -11,7 +11,7 @@
| | TableScan: cpu projection=[time, user], partial_filters=[cpu.time >= TimestampNanosecond(957528000000000000, None), cpu.time <= TimestampNanosecond(957531540000000000, None)] |
| physical_plan | ProjectionExec: expr=[date_bin_gapfill(IntervalDayTime("600000"),cpu.time,Utf8("1970-01-01T00:00:00Z"))@0 as minute, COUNT(cpu.user)@1 as COUNT(cpu.user)] |
| | GapFillExec: group_expr=[date_bin_gapfill(IntervalDayTime("600000"),cpu.time,Utf8("1970-01-01T00:00:00Z"))@0], aggr_expr=[COUNT(cpu.user)@1], stride=600000, time_range=Included("957528000000000000")..Included("957531540000000000") |
| | SortExec: [date_bin_gapfill(IntervalDayTime("600000"),cpu.time,Utf8("1970-01-01T00:00:00Z"))@0 ASC] |
| | SortExec: expr=[date_bin_gapfill(IntervalDayTime("600000"),cpu.time,Utf8("1970-01-01T00:00:00Z"))@0 ASC] |
| | CoalescePartitionsExec |
| | AggregateExec: mode=FinalPartitioned, gby=[date_bin_gapfill(IntervalDayTime("600000"),cpu.time,Utf8("1970-01-01T00:00:00Z"))@0 as date_bin_gapfill(IntervalDayTime("600000"),cpu.time,Utf8("1970-01-01T00:00:00Z"))], aggr=[COUNT(cpu.user)] |
| | CoalesceBatchesExec: target_batch_size=8192 |


@ -16,7 +16,7 @@
| logical_plan | Sort: cpu.host ASC NULLS LAST, cpu.load ASC NULLS LAST, cpu.time ASC NULLS LAST |
| | Projection: cpu.host, cpu.load, cpu.time |
| | TableScan: cpu projection=[host, load, time] |
| physical_plan | SortExec: [host@0 ASC NULLS LAST,load@1 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| physical_plan | SortExec: expr=[host@0 ASC NULLS LAST,load@1 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] |
| | DeduplicateExec: [host@0 ASC,time@2 ASC] |
| | SortPreservingMergeExec: [host@0 ASC,time@2 ASC] |
@ -46,7 +46,7 @@
| | Projection: cpu.host, cpu.load, cpu.time |
| | Filter: cpu.host != Dictionary(Int32, Utf8("b")) |
| | TableScan: cpu projection=[host, load, time], partial_filters=[cpu.host != Dictionary(Int32, Utf8("b"))] |
| physical_plan | SortExec: [host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| physical_plan | SortExec: expr=[host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] |
| | CoalescePartitionsExec |
| | ProjectionExec: expr=[host@0 as host, load@1 as load, time@2 as time] |
| | CoalesceBatchesExec: target_batch_size=8192 |


@ -28,7 +28,7 @@
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: expr=[city@0 ASC,state@2 ASC,time@4 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[city, other_temp, state, temp, time] |
| | |
@ -64,7 +64,7 @@
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ProjectionExec: expr=[other_temp@1 as other_temp, temp@3 as temp, time@4 as time] |
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: expr=[city@0 ASC,state@2 ASC,time@4 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[other_temp, temp, time] |
| | |
@ -88,7 +88,7 @@
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, predicate=time >= TimestampNanosecond(250, None), pruning_predicate=time_max@0 >= 250, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, predicate=time >= TimestampNanosecond(250, None), pruning_predicate=time_max@0 >= 250, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | DeduplicateExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: expr=[city@0 ASC,state@2 ASC,time@4 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000002.parquet]]}, predicate=time >= TimestampNanosecond(250, None), pruning_predicate=time_max@0 >= 250, projection=[city, other_temp, state, temp, time] |
| | |


@ -20,7 +20,7 @@
| | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: expr=[city@0 ASC,state@2 ASC,time@4 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | |
----------
@ -45,7 +45,7 @@
| | SortPreservingMergeExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | UnionExec |
| | ParquetExec: limit=None, partitions={1 group: [[1/1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, output_ordering=[city@0 ASC, state@2 ASC, time@4 ASC], projection=[city, other_temp, state, temp, time] |
| | SortExec: [city@0 ASC,state@2 ASC,time@4 ASC] |
| | SortExec: expr=[city@0 ASC,state@2 ASC,time@4 ASC] |
| | RecordBatchesExec: batches_groups=1 batches=1 total_rows=1 |
| | |
----------


@ -459,7 +459,10 @@ mod test {
ops::{Bound, Range},
};
use crate::exec::{Executor, ExecutorType};
use crate::{
exec::{Executor, ExecutorType},
test::{format_execution_plan, format_logical_plan},
};
use super::*;
use arrow::{
@ -472,7 +475,7 @@ mod test {
datasource::empty::EmptyTable,
error::Result,
logical_expr::{logical_plan, Extension},
physical_plan::{collect, displayable, expressions::lit as phys_lit, memory::MemoryExec},
physical_plan::{collect, expressions::lit as phys_lit, memory::MemoryExec},
prelude::{col, lit, lit_timestamp_nano, SessionConfig, SessionContext},
scalar::ScalarValue,
};
@ -548,46 +551,52 @@ mod test {
let plan = LogicalPlan::Extension(Extension {
node: Arc::new(gapfill),
});
let expected = "GapFill: groupBy=[[loc, time]], aggr=[[temp]], time_column=time, stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))\
\n TableScan: temps";
assert_eq!(expected, format!("{}", plan.display_indent()));
insta::assert_yaml_snapshot!(
format_logical_plan(&plan),
@r###"
---
- " GapFill: groupBy=[[loc, time]], aggr=[[temp]], time_column=time, stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))"
- " TableScan: temps"
"###
);
Ok(())
}
async fn assert_explain(sql: &str, expected: &str) -> Result<()> {
async fn format_explain(sql: &str) -> Result<Vec<String>> {
let executor = Executor::new_testing();
let context = executor.new_context(ExecutorType::Query);
context
.inner()
.register_table("temps", Arc::new(EmptyTable::new(Arc::new(schema()))))?;
let physical_plan = context.prepare_sql(sql).await?;
let actual_plan = displayable(physical_plan.as_ref()).indent().to_string();
let actual_iter = actual_plan.split('\n');
let expected = expected.split('\n');
expected.zip(actual_iter).for_each(|(expected, actual)| {
assert_eq!(expected, actual, "\ncomplete plan was:\n{actual_plan:?}\n")
});
Ok(())
Ok(format_execution_plan(&physical_plan))
}
#[tokio::test]
async fn plan_gap_fill() -> Result<()> {
// show that the optimizer rule can fire and that physical
// planning will succeed.
let dbg_args = "IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\")";
assert_explain(
"SELECT date_bin_gapfill(interval '1 minute', time, timestamp '1970-01-01T00:00:00Z') AS minute, avg(temp)\
\nFROM temps\
\nWHERE time >= '1980-01-01T00:00:00Z' and time < '1981-01-01T00:00:00Z'
\nGROUP BY minute;",
format!(
"ProjectionExec: expr=[date_bin_gapfill({dbg_args})@0 as minute, AVG(temps.temp)@1 as AVG(temps.temp)]\
\n GapFillExec: group_expr=[date_bin_gapfill({dbg_args})@0], aggr_expr=[AVG(temps.temp)@1], stride=60000, time_range=Included(\"315532800000000000\")..Excluded(\"347155200000000000\")\
\n SortExec: [date_bin_gapfill({dbg_args})@0 ASC]\
\n AggregateExec: mode=Final, gby=[date_bin_gapfill({dbg_args})@0 as date_bin_gapfill({dbg_args})], aggr=[AVG(temps.temp)]"
).as_str()
).await?;
let sql = "SELECT date_bin_gapfill(interval '1 minute', time, timestamp '1970-01-01T00:00:00Z') AS minute, avg(temp)\
\nFROM temps\
\nWHERE time >= '1980-01-01T00:00:00Z' and time < '1981-01-01T00:00:00Z'\
\nGROUP BY minute;";
let explain = format_explain(sql).await?;
insta::assert_yaml_snapshot!(
explain,
@r###"
---
- " ProjectionExec: expr=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))@0 as minute, AVG(temps.temp)@1 as AVG(temps.temp)]"
- " GapFillExec: group_expr=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))@0], aggr_expr=[AVG(temps.temp)@1], stride=60000, time_range=Included(\"315532800000000000\")..Excluded(\"347155200000000000\")"
- " SortExec: expr=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))@0 ASC]"
- " AggregateExec: mode=Final, gby=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))@0 as date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))], aggr=[AVG(temps.temp)]"
- " AggregateExec: mode=Partial, gby=[datebin(60000, time@0, 0) as date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))], aggr=[AVG(temps.temp)]"
- " CoalesceBatchesExec: target_batch_size=8192"
- " FilterExec: time@0 >= 315532800000000000 AND time@0 < 347155200000000000"
- " EmptyExec: produce_one_row=false"
"###
);
Ok(())
}
@ -596,24 +605,30 @@ mod test {
// The call to `date_bin_gapfill` should be last in the SortExec
// expressions, even though it was not last on the SELECT list
// or the GROUP BY clause.
let dbg_args = "IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\")";
assert_explain(
"SELECT \
let sql = "SELECT \
\n loc,\
\n date_bin_gapfill(interval '1 minute', time, timestamp '1970-01-01T00:00:00Z') AS minute,\
\n concat('zz', loc) AS loczz,\
\n avg(temp)\
\nFROM temps\
\nWHERE time >= '1980-01-01T00:00:00Z' and time < '1981-01-01T00:00:00Z'
\nGROUP BY loc, minute, loczz;",
format!(
"ProjectionExec: expr=[loc@0 as loc, date_bin_gapfill({dbg_args})@1 as minute, concat(Utf8(\"zz\"),temps.loc)@2 as loczz, AVG(temps.temp)@3 as AVG(temps.temp)]\
\n GapFillExec: group_expr=[loc@0, date_bin_gapfill({dbg_args})@1, concat(Utf8(\"zz\"),temps.loc)@2], aggr_expr=[AVG(temps.temp)@3], stride=60000, time_range=Included(\"315532800000000000\")..Excluded(\"347155200000000000\")\
\n SortExec: [loc@0 ASC,concat(Utf8(\"zz\"),temps.loc)@2 ASC,date_bin_gapfill({dbg_args})@1 ASC]\
\n AggregateExec: mode=Final, gby=[loc@0 as loc, date_bin_gapfill({dbg_args})@1 as date_bin_gapfill({dbg_args}), concat(Utf8(\"zz\"),temps.loc)@2 as concat(Utf8(\"zz\"),temps.loc)], aggr=[AVG(temps.temp)]"
).as_str()
\nGROUP BY loc, minute, loczz;";
).await?;
let explain = format_explain(sql).await?;
insta::assert_yaml_snapshot!(
explain,
@r###"
---
- " ProjectionExec: expr=[loc@0 as loc, date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))@1 as minute, concat(Utf8(\"zz\"),temps.loc)@2 as loczz, AVG(temps.temp)@3 as AVG(temps.temp)]"
- " GapFillExec: group_expr=[loc@0, date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))@1, concat(Utf8(\"zz\"),temps.loc)@2], aggr_expr=[AVG(temps.temp)@3], stride=60000, time_range=Included(\"315532800000000000\")..Excluded(\"347155200000000000\")"
- " SortExec: expr=[loc@0 ASC,concat(Utf8(\"zz\"),temps.loc)@2 ASC,date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))@1 ASC]"
- " AggregateExec: mode=Final, gby=[loc@0 as loc, date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\"))@1 as date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\")), concat(Utf8(\"zz\"),temps.loc)@2 as concat(Utf8(\"zz\"),temps.loc)], aggr=[AVG(temps.temp)]"
- " AggregateExec: mode=Partial, gby=[loc@1 as loc, datebin(60000, time@0, 0) as date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,Utf8(\"1970-01-01T00:00:00Z\")), concat(zz, loc@1) as concat(Utf8(\"zz\"),temps.loc)], aggr=[AVG(temps.temp)]"
- " CoalesceBatchesExec: target_batch_size=8192"
- " FilterExec: time@0 >= 315532800000000000 AND time@0 < 347155200000000000"
- " EmptyExec: produce_one_row=false"
"###
);
Ok(())
}


@ -5,7 +5,7 @@ use datafusion::{
};
use serde::Serialize;
use crate::test::format_plan;
use crate::test::format_execution_plan;
#[derive(Debug, Serialize)]
pub struct OptimizationTest {
@ -19,10 +19,10 @@ impl OptimizationTest {
O: PhysicalOptimizerRule,
{
Self {
input: format_plan(&input_plan),
input: format_execution_plan(&input_plan),
output: opt
.optimize(input_plan, &ConfigOptions::default())
.map(|plan| format_plan(&plan))
.map(|plan| format_execution_plan(&plan))
.map_err(|e| e.to_string()),
}
}


@ -1352,7 +1352,7 @@ impl Deduplicater {
#[cfg(test)]
mod test {
use super::*;
use crate::test::{format_plan, raw_data, TestChunk};
use crate::test::{format_execution_plan, raw_data, TestChunk};
use arrow::datatypes::DataType;
use arrow_util::{
assert_batches_eq, assert_batches_sorted_eq, test_util::equalize_batch_schemas,
@ -2850,21 +2850,21 @@ mod test {
// plan should include SortExec because chunks are not yet sorted
insta::assert_yaml_snapshot!(
format_plan(&plan),
format_execution_plan(&plan),
@r###"
---
- " UnionExec"
- " DeduplicateExec: [tag1@1 ASC,time@2 ASC]"
- " SortPreservingMergeExec: [tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=10"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=5"
- " DeduplicateExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=4"
- " UnionExec"
@ -2911,7 +2911,7 @@ mod test {
// Plan is very simple with one single RecordBatchesExec that includes 4 chunks
insta::assert_yaml_snapshot!(
format_plan(&plan),
format_execution_plan(&plan),
@r###"
---
- " UnionExec"
@ -3066,7 +3066,7 @@ mod test {
// plan should include SortExec because chunks are not yet sorted on the specified sort_key
insta::assert_yaml_snapshot!(
format_plan(&plan),
format_execution_plan(&plan),
@r###"
---
- " SortPreservingMergeExec: [tag1@1 ASC,time@2 ASC]"
@ -3074,17 +3074,17 @@ mod test {
- " DeduplicateExec: [tag1@1 ASC,time@2 ASC]"
- " SortPreservingMergeExec: [tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=10"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=5"
- " DeduplicateExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=4"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=3"
"###
@ -3132,21 +3132,21 @@ mod test {
// there will be a UnionExec and a SortPreservingMergeExec on top to merge the sorted chunks
// plan should include SortExec because chunks are not yet sorted
insta::assert_yaml_snapshot!(
format_plan(&plan),
format_execution_plan(&plan),
@r###"
---
- " SortPreservingMergeExec: [tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=10"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=5"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=3"
- " SortExec: [tag1@1 ASC,time@2 ASC]"
- " SortExec: expr=[tag1@1 ASC,time@2 ASC]"
- " UnionExec"
- " RecordBatchesExec: batches_groups=1 batches=1 total_rows=4"
"###
@ -3229,7 +3229,7 @@ mod test {
// The plan should look like this. No SortExec at all because
// all chunks are already sorted on the same requested sort key
insta::assert_yaml_snapshot!(
format_plan(&plan),
format_execution_plan(&plan),
@r###"
---
- " SortPreservingMergeExec: [tag1@1 ASC,time@2 ASC]"
@ -3266,7 +3266,7 @@ mod test {
)
.unwrap();
insta::assert_yaml_snapshot!(
format_plan(&plan),
format_execution_plan(&plan),
@r###"
---
- " SortPreservingMergeExec: [tag1@1 ASC,time@2 ASC]"
@ -3437,7 +3437,7 @@ mod test {
// The plan should look like this. No SortExec at all because
// all chunks are already sorted on the same requested sort key
insta::assert_yaml_snapshot!(
format_plan(&plan),
format_execution_plan(&plan),
@r###"
---
- " SortPreservingMergeExec: [tag1@1 ASC,time@2 ASC]"
@ -3491,7 +3491,7 @@ mod test {
// Since all 10 chunks are each sorted on the same output sort key, the plan should scan 10 chunks
// without any SortExec nor DeduplicateExec. Only a UnionExec and a SortPreservingMergeExec on top to merge them
insta::assert_yaml_snapshot!(
format_plan(&plan),
format_execution_plan(&plan),
@r###"
---
- " SortPreservingMergeExec: [tag1@1 ASC,time@2 ASC]"


@ -24,13 +24,13 @@ use data_types::{
ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId, StatValues,
Statistics, TableSummary,
};
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::DataFusionError;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::{catalog::catalog::CatalogProvider, physical_plan::displayable};
use datafusion::{catalog::schema::SchemaProvider, logical_expr::LogicalPlan};
use hashbrown::HashSet;
use itertools::Itertools;
use observability_deps::tracing::debug;
@ -1220,10 +1220,16 @@ pub async fn raw_data(chunks: &[Arc<dyn QueryChunk>]) -> Vec<RecordBatch> {
batches
}
pub fn format_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
let s = displayable(plan.as_ref()).indent().to_string();
pub fn format_logical_plan(plan: &LogicalPlan) -> Vec<String> {
format_lines(&plan.display_indent().to_string())
}
pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
format_lines(&displayable(plan.as_ref()).indent().to_string())
}
fn format_lines(s: &str) -> Vec<String> {
s.trim()
.to_string()
.split('\n')
.map(|s| {
// Always add a leading space to ensure that all lines in the YAML insta snapshots are quoted, otherwise the
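The hunk above is cut off mid-comment. As a hedged sketch of what a helper like `format_lines` plausibly does (split the rendered plan into lines and prefix each with a space so insta's YAML serialization quotes it), under the assumption that it only reshapes the display string:

```rust
// Sketch, not the exact code from this commit: one Vec entry per plan line,
// each prefixed with a space so every line ends up quoted in the YAML snapshot.
fn format_lines(s: &str) -> Vec<String> {
    s.trim()
        .split('\n')
        .map(|line| format!(" {line}"))
        .collect()
}

#[test]
fn format_lines_sketch() {
    let out = format_lines("UnionExec\n  RecordBatchesExec: batches_groups=1\n");
    assert_eq!(out, vec![" UnionExec", "   RecordBatchesExec: batches_groups=1"]);
}
```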


@ -514,7 +514,7 @@ mod tests {
"| logical_plan | Sort: mem.host ASC NULLS LAST, mem.time ASC NULLS LAST |",
"| | Projection: mem.host, mem.perc, mem.time |",
"| | TableScan: mem projection=[host, perc, time] |",
"| physical_plan | SortExec: [host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] |",
"| physical_plan | SortExec: expr=[host@0 ASC NULLS LAST,time@2 ASC NULLS LAST] |",
"| | CoalescePartitionsExec |",
"| | ProjectionExec: expr=[host@0 as host, perc@1 as perc, time@2 as time] |",
"| | UnionExec |",


@ -29,7 +29,7 @@ bytes = { version = "1", features = ["std"] }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "iana-time-zone", "serde", "std", "winapi"] }
crossbeam-utils = { version = "0.8", features = ["std"] }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "fc211c37c67bbcb43e0b1010c36fbf6dd5e0c6b9", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "253550c6c936f75f654dcdc9480025a9ef55d9fd", features = ["async-compression", "bzip2", "compression", "crypto_expressions", "flate2", "regex_expressions", "unicode_expressions", "xz2"] }
digest = { version = "0.10", features = ["alloc", "block-buffer", "core-api", "mac", "std", "subtle"] }
either = { version = "1", features = ["use_std"] }
fixedbitset = { version = "0.4", features = ["std"] }