chore: Update DataFusion (#8190)

* chore: Update DataFusion

* chore: Run cargo hakari tasks

* fix: Update for API changes

* fix: use display format

* chore: Update explain plan output

* fix: update plans

---------

Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Andrew Lamb 2023-07-10 05:54:50 -04:00 committed by GitHub
parent 1f1b17f712
commit 3ce11d8d66
13 changed files with 137 additions and 98 deletions

Cargo.lock (generated)

@@ -1343,8 +1343,8 @@ dependencies = [
[[package]]
name = "datafusion"
-version = "26.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a"
+version = "27.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -1367,8 +1367,8 @@ dependencies = [
"futures",
"glob",
"hashbrown 0.14.0",
-"indexmap 1.9.3",
-"itertools 0.10.5",
+"indexmap 2.0.0",
+"itertools 0.11.0",
"lazy_static",
"log",
"num_cpus",
@@ -1379,7 +1379,7 @@ dependencies = [
"pin-project-lite",
"rand",
"smallvec",
-"sqlparser 0.34.0",
+"sqlparser",
"tempfile",
"tokio",
"tokio-util",
@@ -1391,8 +1391,8 @@ dependencies = [
[[package]]
name = "datafusion-common"
-version = "26.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a"
+version = "27.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14"
dependencies = [
"arrow",
"arrow-array",
@@ -1400,13 +1400,13 @@ dependencies = [
"num_cpus",
"object_store",
"parquet",
-"sqlparser 0.34.0",
+"sqlparser",
]
[[package]]
name = "datafusion-execution"
-version = "26.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a"
+version = "27.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14"
dependencies = [
"dashmap",
"datafusion-common",
@@ -1422,22 +1422,22 @@ dependencies = [
[[package]]
name = "datafusion-expr"
-version = "26.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a"
+version = "27.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14"
dependencies = [
"ahash 0.8.3",
"arrow",
"datafusion-common",
"lazy_static",
-"sqlparser 0.34.0",
+"sqlparser",
"strum 0.25.0",
"strum_macros 0.25.0",
]
[[package]]
name = "datafusion-optimizer"
-version = "26.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a"
+version = "27.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14"
dependencies = [
"arrow",
"async-trait",
@@ -1446,15 +1446,15 @@ dependencies = [
"datafusion-expr",
"datafusion-physical-expr",
"hashbrown 0.14.0",
-"itertools 0.10.5",
+"itertools 0.11.0",
"log",
"regex-syntax 0.7.3",
]
[[package]]
name = "datafusion-physical-expr"
-version = "26.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a"
+version = "27.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14"
dependencies = [
"ahash 0.8.3",
"arrow",
@@ -1469,8 +1469,8 @@ dependencies = [
"datafusion-row",
"half 2.3.1",
"hashbrown 0.14.0",
-"indexmap 1.9.3",
-"itertools 0.10.5",
+"indexmap 2.0.0",
+"itertools 0.11.0",
"lazy_static",
"libc",
"md-5",
@@ -1485,8 +1485,8 @@ dependencies = [
[[package]]
name = "datafusion-proto"
-version = "26.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a"
+version = "27.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14"
dependencies = [
"arrow",
"chrono",
@@ -1499,8 +1499,8 @@ dependencies = [
[[package]]
name = "datafusion-row"
-version = "26.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a"
+version = "27.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14"
dependencies = [
"arrow",
"datafusion-common",
@@ -1510,15 +1510,15 @@ dependencies = [
[[package]]
name = "datafusion-sql"
-version = "26.0.0"
-source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a"
+version = "27.0.0"
+source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14"
dependencies = [
"arrow",
"arrow-schema",
"datafusion-common",
"datafusion-expr",
"log",
-"sqlparser 0.34.0",
+"sqlparser",
]
[[package]]
@@ -2582,7 +2582,7 @@ version = "0.1.0"
dependencies = [
"generated_types",
"snafu",
-"sqlparser 0.35.0",
+"sqlparser",
"workspace-hack",
]
@@ -4171,7 +4171,7 @@ dependencies = [
"query_functions",
"schema",
"snafu",
-"sqlparser 0.35.0",
+"sqlparser",
"test_helpers",
"workspace-hack",
]
@@ -5235,16 +5235,6 @@ dependencies = [
"unicode_categories",
]
-[[package]]
-name = "sqlparser"
-version = "0.34.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59"
-dependencies = [
-"log",
-"sqlparser_derive",
-]
[[package]]
name = "sqlparser"
version = "0.35.0"
@@ -5252,6 +5242,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca597d77c98894be1f965f2e4e2d2a61575d4998088e655476c73715c54b2b43"
dependencies = [
"log",
+"sqlparser_derive",
]
[[package]]
@@ -6729,6 +6720,7 @@ dependencies = [
"sha2",
"similar",
"smallvec",
+"sqlparser",
"sqlx",
"sqlx-core",
"sqlx-macros",


@@ -120,8 +120,8 @@ license = "MIT OR Apache-2.0"
[workspace.dependencies]
arrow = { version = "42.0.0" }
arrow-flight = { version = "42.0.0" }
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a", default-features = false }
-datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14", default-features = false }
+datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14" }
hashbrown = { version = "0.14.0" }
object_store = { version = "0.6.0" }
parquet = { version = "42.0.0" }


@@ -412,7 +412,7 @@ mod tests {
let ts_predicate_expr = make_range_expr(101, 202, "time");
let expected_string =
"TimestampNanosecond(101, None) <= time AND time < TimestampNanosecond(202, None)";
-let actual_string = format!("{ts_predicate_expr:?}");
+let actual_string = format!("{ts_predicate_expr}");
assert_eq!(actual_string, expected_string);
}
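The `{:?}` to `{}` switch above tracks an upstream change: as of DataFusion 27, the human-readable rendering of `Expr` lives in a `Display` impl, while `Debug` is derived and prints the raw expression tree. A minimal sketch of the difference, assuming DataFusion >= 27 and the standard prelude (outputs in comments are illustrative):

use datafusion::prelude::{col, lit};

fn main() {
    let expr = col("time").gt_eq(lit(100i64));
    // Display yields the plan-text form, e.g. `time >= Int64(100)`,
    // which is what the assertion above now compares against.
    println!("{expr}");
    // Debug now prints the derived expression-tree representation,
    // so it is no longer suitable for golden-string tests.
    println!("{expr:?}");
}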


@@ -109,7 +109,7 @@
| physical_plan | ProjectionExec: expr=[date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))@0 as month, COUNT(cpu.user)@1 as COUNT(cpu.user)] |
| | AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))@0 as date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))], aggr=[COUNT(cpu.user)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "date_bin(Utf8(\"1 month\"),cpu.time,Utf8(\"1970-12-31T00:15:00Z\"))", index: 0 }], 4), input_partitions=4 |
+| | RepartitionExec: partitioning=Hash([date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))@0], 4), input_partitions=4 |
| | AggregateExec: mode=Partial, gby=[date_bin(79228162514264337593543950336, time@0, 31450500000000000) as date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))], aggr=[COUNT(cpu.user)] |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |


@@ -23,7 +23,7 @@
| plan_type | plan |
----------
| logical_plan | Projection: date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time) AS minute, COUNT(cpu.user) |
-| | GapFill: groupBy=[[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)]], aggr=[[COUNT(cpu.user)]], time_column=date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time), stride=IntervalMonthDayNano("600000000000"), range=Included(TimestampNanosecond(957528000000000000, None))..Included(TimestampNanosecond(957531540000000000, None)) |
+| | GapFill: groupBy=[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[[COUNT(cpu.user)]], time_column=date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time), stride=IntervalMonthDayNano("600000000000"), range=Included(Literal(TimestampNanosecond(957528000000000000, None)))..Included(Literal(TimestampNanosecond(957531540000000000, None))) |
| | Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("600000000000"), cpu.time) AS date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)]], aggr=[[COUNT(cpu.user)]] |
| | TableScan: cpu projection=[time, user], full_filters=[cpu.time >= TimestampNanosecond(957528000000000000, None), cpu.time <= TimestampNanosecond(957531540000000000, None)] |
| physical_plan | ProjectionExec: expr=[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@0 as minute, COUNT(cpu.user)@1 as COUNT(cpu.user)] |
@@ -32,7 +32,7 @@
| | SortExec: expr=[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@0 ASC] |
| | AggregateExec: mode=FinalPartitioned, gby=[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@0 as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[COUNT(cpu.user)] |
| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "date_bin_gapfill(IntervalMonthDayNano(\"600000000000\"),cpu.time)", index: 0 }], 4), input_partitions=4 |
+| | RepartitionExec: partitioning=Hash([date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@0], 4), input_partitions=4 |
| | AggregateExec: mode=Partial, gby=[date_bin(600000000000, time@0) as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[COUNT(cpu.user)] |
| | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 |
| | CoalesceBatchesExec: target_batch_size=8192 |
@@ -116,7 +116,7 @@ Error during planning: gap-filling query is missing lower time bound
| plan_type | plan |
----------
| logical_plan | Projection: cpu.region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time) AS minute, AVG(cpu.user) AS locf(AVG(cpu.user)) |
-| | GapFill: groupBy=[[cpu.region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)]], aggr=[[LOCF(AVG(cpu.user))]], time_column=date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time), stride=IntervalMonthDayNano("600000000000"), range=Included(TimestampNanosecond(957528000000000000, None))..Included(TimestampNanosecond(957531540000000000, None)) |
+| | GapFill: groupBy=[cpu.region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[[LOCF(AVG(cpu.user))]], time_column=date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time), stride=IntervalMonthDayNano("600000000000"), range=Included(Literal(TimestampNanosecond(957528000000000000, None)))..Included(Literal(TimestampNanosecond(957531540000000000, None))) |
| | Aggregate: groupBy=[[cpu.region, date_bin(IntervalMonthDayNano("600000000000"), cpu.time) AS date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)]], aggr=[[AVG(cpu.user)]] |
| | TableScan: cpu projection=[region, time, user], full_filters=[cpu.time >= TimestampNanosecond(957528000000000000, None), cpu.time <= TimestampNanosecond(957531540000000000, None)] |
| physical_plan | ProjectionExec: expr=[region@0 as region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 as minute, AVG(cpu.user)@2 as locf(AVG(cpu.user))] |
@@ -125,7 +125,7 @@ Error during planning: gap-filling query is missing lower time bound
| | SortExec: expr=[region@0 ASC,date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 ASC] |
| | AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[AVG(cpu.user)], ordering_mode=PartiallyOrdered |
| | CoalesceBatchesExec: target_batch_size=8192 |
-| | RepartitionExec: partitioning=Hash([Column { name: "region", index: 0 }, Column { name: "date_bin_gapfill(IntervalMonthDayNano(\"600000000000\"),cpu.time)", index: 1 }], 4), input_partitions=1 |
+| | RepartitionExec: partitioning=Hash([region@0, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1], 4), input_partitions=1 |
| | AggregateExec: mode=Partial, gby=[region@0 as region, date_bin(600000000000, time@1) as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[AVG(cpu.user)], ordering_mode=PartiallyOrdered |
| | CoalesceBatchesExec: target_batch_size=8192 |
| | FilterExec: time@1 >= 957528000000000000 AND time@1 <= 957531540000000000 |


@@ -933,7 +933,7 @@ name: physical_plan
ProjectionExec: expr=[m0 as iox::measurement, 0 as time, tag0@0 as tag0, COUNT(m0.f64)@1 as count, SUM(m0.f64)@2 as sum, STDDEV(m0.f64)@3 as stddev]
AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[COUNT(m0.f64), SUM(m0.f64), STDDEV(m0.f64)]
CoalesceBatchesExec: target_batch_size=8192
-RepartitionExec: partitioning=Hash([Column { name: "tag0", index: 0 }], 4), input_partitions=4
+RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=4
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
AggregateExec: mode=Partial, gby=[tag0@1 as tag0], aggr=[COUNT(m0.f64), SUM(m0.f64), STDDEV(m0.f64)]
ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[f64, tag0]
@@ -942,7 +942,7 @@ name: physical_plan
RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=4
AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[COUNT(m1.f64), SUM(m1.f64), STDDEV(m1.f64)], ordering_mode=FullyOrdered
CoalesceBatchesExec: target_batch_size=8192
-RepartitionExec: partitioning=Hash([Column { name: "tag0", index: 0 }], 4), input_partitions=1
+RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=1
AggregateExec: mode=Partial, gby=[tag0@1 as tag0], aggr=[COUNT(m1.f64), SUM(m1.f64), STDDEV(m1.f64)], ordering_mode=FullyOrdered
ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[f64, tag0], output_ordering=[tag0@1 ASC]
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1 GROUP BY tag0;


@@ -223,12 +223,18 @@ impl UserDefinedLogicalNodeCore for GapFill {
})
.collect::<Vec<String>>()
.join(", ");
+let group_expr = self
+.group_expr
+.iter()
+.map(|e| e.to_string())
+.collect::<Vec<String>>()
+.join(", ");
write!(
f,
"{}: groupBy=[{:?}], aggr=[[{}]], time_column={}, stride={}, range={:?}",
"{}: groupBy=[{group_expr}], aggr=[[{aggr_expr}]], time_column={}, stride={}, range={:?}",
self.name(),
-self.group_expr,
-aggr_expr,
self.params.time_column,
self.params.stride,
self.params.time_range,
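The `group_expr` join above is needed for the same reason: `groupBy` used to be rendered with `{:?}` on the `Vec<Expr>`, which only read well while `Expr`'s `Debug` was the pretty form (hence the doubled brackets, `groupBy=[[...]]`, in the old snapshots: one pair from the format string, one from the `Vec` debug output). With the derived `Debug`, the display node stringifies each expression itself. A standalone sketch of the pattern (the helper name is hypothetical):

// Render a list of expressions via their Display impls, e.g. `loc, time`.
fn fmt_exprs(exprs: &[datafusion::prelude::Expr]) -> String {
    exprs
        .iter()
        .map(|e| e.to_string())
        .collect::<Vec<_>>()
        .join(", ")
}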
@@ -740,7 +746,7 @@
format_logical_plan(&plan),
@r###"
---
- " GapFill: groupBy=[[loc, time]], aggr=[[temp]], time_column=time, stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))"
- " GapFill: groupBy=[loc, time], aggr=[[temp]], time_column=time, stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))"
- " TableScan: temps"
"###
);


@@ -859,7 +859,7 @@ mod test {
format_optimized_plan(&plan)?,
@r###"
---
- "GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))"
- "GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))"
- " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp)]]"
- " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)"
- " TableScan: temps"
@@ -889,7 +889,7 @@
format_optimized_plan(&plan)?,
@r###"
---
- "GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None))]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None)), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))"
- "GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None))], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None)), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))"
- " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time, TimestampNanosecond(7, None)) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None))]], aggr=[[AVG(temps.temp)]]"
- " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)"
- " TableScan: temps"
@@ -918,7 +918,7 @@
format_optimized_plan(&plan)?,
@r###"
---
- "GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), temps.loc]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))"
- "GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), temps.loc], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))"
- " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), temps.loc]], aggr=[[AVG(temps.temp)]]"
- " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)"
- " TableScan: temps"
@@ -970,7 +970,7 @@
@r###"
---
- "Projection: date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), AVG(temps.temp)"
- " GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))"
- " GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))"
- " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp)]]"
- " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)"
- " TableScan: temps"
@@ -1005,7 +1005,7 @@
@r###"
---
- "Projection: date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), AVG(temps.temp) AS locf(AVG(temps.temp)), MIN(temps.temp) AS locf(MIN(temps.temp))"
- " GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[LOCF(AVG(temps.temp)), LOCF(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))"
- " GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[LOCF(AVG(temps.temp)), LOCF(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))"
- " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp), MIN(temps.temp)]]"
- " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)"
- " TableScan: temps"
@@ -1039,7 +1039,7 @@
@r###"
---
- "Projection: date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), MIN(temps.temp) AS locf(MIN(temps.temp)) AS locf_min_temp"
- " GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp), LOCF(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))"
- " GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[AVG(temps.temp), LOCF(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))"
- " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp), MIN(temps.temp)]]"
- " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)"
- " TableScan: temps"
@@ -1074,7 +1074,7 @@
@r###"
---
- "Projection: date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), AVG(temps.temp) AS interpolate(AVG(temps.temp)), MIN(temps.temp) AS interpolate(MIN(temps.temp))"
- " GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[INTERPOLATE(AVG(temps.temp)), INTERPOLATE(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))"
- " GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[INTERPOLATE(AVG(temps.temp)), INTERPOLATE(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))"
- " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp), MIN(temps.temp)]]"
- " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)"
- " TableScan: temps"


@@ -3763,7 +3763,7 @@ mod test {
Filter: NOT difference IS NULL [time:Timestamp(Nanosecond, None);N, difference:Float64;N]
Projection: time, difference(AVG(cpu.usage_idle)) AS difference [time:Timestamp(Nanosecond, None);N, difference:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "difference", signature: Signature { type_signature: OneOf([Exact([Int64]), Exact([UInt64]), Exact([Float64])]), volatility: Immutable }, fun: "<FUNC>" }(AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS difference(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, difference(AVG(cpu.usage_idle)):Float64;N]
-GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
+GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
@@ -3789,7 +3789,7 @@ mod test {
Filter: NOT non_negative_difference IS NULL [time:Timestamp(Nanosecond, None);N, non_negative_difference:Float64;N]
Projection: time, non_negative_difference(AVG(cpu.usage_idle)) AS non_negative_difference [time:Timestamp(Nanosecond, None);N, non_negative_difference:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_difference", signature: Signature { type_signature: OneOf([Exact([Int64]), Exact([UInt64]), Exact([Float64])]), volatility: Immutable }, fun: "<FUNC>" }(AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_difference(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, non_negative_difference(AVG(cpu.usage_idle)):Float64;N]
-GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
+GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
@@ -3815,7 +3815,7 @@ mod test {
Filter: NOT moving_average IS NULL [time:Timestamp(Nanosecond, None);N, moving_average:Float64;N]
Projection: time, moving_average(AVG(cpu.usage_idle),Int64(3)) AS moving_average [time:Timestamp(Nanosecond, None);N, moving_average:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "moving_average", signature: Signature { type_signature: OneOf([Exact([Int64, Int64]), Exact([UInt64, Int64]), Exact([Float64, Int64])]), volatility: Immutable }, fun: "<FUNC>" }(AVG(cpu.usage_idle), Int64(3)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS moving_average(AVG(cpu.usage_idle),Int64(3))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, moving_average(AVG(cpu.usage_idle),Int64(3)):Float64;N]
-GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
+GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
@@ -3844,7 +3844,7 @@ mod test {
Filter: NOT derivative IS NULL [time:Timestamp(Nanosecond, None);N, derivative:Float64;N]
Projection: time, derivative(AVG(cpu.usage_idle)) AS derivative [time:Timestamp(Nanosecond, None);N, derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, derivative(AVG(cpu.usage_idle)):Float64;N]
-GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
+GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
@@ -3870,7 +3870,7 @@ mod test {
Filter: NOT non_negative_derivative IS NULL [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N]
Projection: time, non_negative_derivative(AVG(cpu.usage_idle)) AS non_negative_derivative [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, non_negative_derivative(AVG(cpu.usage_idle)):Float64;N]
-GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
+GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
@@ -3923,7 +3923,7 @@ mod test {
use super::*;
#[test]
-fn test_selectors() {
+fn test_selectors_query() {
// single-selector query
assert_snapshot!(plan("SELECT LAST(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N]
@@ -3931,6 +3931,9 @@ mod test {
Aggregate: groupBy=[[]], aggr=[[selector_last(cpu.usage_idle, cpu.time)]] [selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_query_grouping() {
// single-selector, grouping by tags
assert_snapshot!(plan("SELECT LAST(usage_idle) FROM cpu GROUP BY cpu"), @r###"
Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, cpu:Dictionary(Int32, Utf8);N, last:Float64;N]
@@ -3938,27 +3941,33 @@ mod test {
Aggregate: groupBy=[[cpu.cpu]], aggr=[[selector_last(cpu.usage_idle, cpu.time)]] [cpu:Dictionary(Int32, Utf8);N, selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_aggregate_gby_time() {
// aggregate query, as we're grouping by time
assert_snapshot!(plan("SELECT LAST(usage_idle) FROM cpu GROUP BY TIME(5s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, (selector_last(cpu.usage_idle,cpu.time))[value] AS last [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N]
-GapFill: groupBy=[[time]], aggr=[[selector_last(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
+GapFill: groupBy=[time], aggr=[[selector_last(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_last(cpu.usage_idle, cpu.time)]] [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_aggregate_gby_time_gapfill() {
// aggregate query, grouping by time with gap filling
assert_snapshot!(plan("SELECT FIRST(usage_idle) FROM cpu GROUP BY TIME(5s) FILL(0)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, (coalesce_struct(selector_first(cpu.usage_idle,cpu.time), Struct({value:Float64(0),time:TimestampNanosecond(0, None)})))[value] AS first [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N]
-GapFill: groupBy=[[time]], aggr=[[selector_first(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
+GapFill: groupBy=[time], aggr=[[selector_first(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_first(cpu.usage_idle, cpu.time)]] [time:Timestamp(Nanosecond, None);N, selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_aggregate_multi_selectors() {
// aggregate query, as we're specifying multiple selectors or aggregates
assert_snapshot!(plan("SELECT LAST(usage_idle), FIRST(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, first:Float64;N]
@@ -3966,13 +3975,18 @@ mod test {
Aggregate: groupBy=[[]], aggr=[[selector_last(cpu.usage_idle, cpu.time), selector_first(cpu.usage_idle, cpu.time)]] [selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N, selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_and_aggregate() {
assert_snapshot!(plan("SELECT LAST(usage_idle), COUNT(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, TimestampNanosecond(0, None) AS time, (selector_last(cpu.usage_idle,cpu.time))[value] AS last, COUNT(cpu.usage_idle) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, count:Int64;N]
Aggregate: groupBy=[[]], aggr=[[selector_last(cpu.usage_idle, cpu.time), COUNT(cpu.usage_idle)]] [selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N, COUNT(cpu.usage_idle):Int64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_additional_fields() {
// additional fields
assert_snapshot!(plan("SELECT LAST(usage_idle), usage_system FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N]
@@ -3980,29 +3994,40 @@
Aggregate: groupBy=[[]], aggr=[[selector_last(cpu.usage_idle, cpu.time, cpu.usage_system)]] [selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_additional_fields_2() {
assert_snapshot!(plan("SELECT LAST(usage_idle), usage_system FROM cpu GROUP BY TIME(5s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, (selector_last(cpu.usage_idle,cpu.time,cpu.usage_system))[value] AS last, (selector_last(cpu.usage_idle,cpu.time,cpu.usage_system))[other_1] AS usage_system [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N]
-GapFill: groupBy=[[time]], aggr=[[selector_last(cpu.usage_idle,cpu.time,cpu.usage_system)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
+GapFill: groupBy=[time], aggr=[[selector_last(cpu.usage_idle,cpu.time,cpu.usage_system)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_last(cpu.usage_idle, cpu.time, cpu.usage_system)]] [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_additional_fields_3() {
assert_snapshot!(plan("SELECT LAST(usage_idle), usage_system FROM cpu GROUP BY TIME(5s) FILL(0)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, (coalesce_struct(selector_last(cpu.usage_idle,cpu.time,cpu.usage_system), Struct({value:Float64(0),time:TimestampNanosecond(0, None),other_1:Float64(0)})))[value] AS last, (coalesce_struct(selector_last(cpu.usage_idle,cpu.time,cpu.usage_system), Struct({value:Float64(0),time:TimestampNanosecond(0, None),other_1:Float64(0)})))[other_1] AS usage_system [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N]
-GapFill: groupBy=[[time]], aggr=[[selector_last(cpu.usage_idle,cpu.time,cpu.usage_system)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
+GapFill: groupBy=[time], aggr=[[selector_last(cpu.usage_idle,cpu.time,cpu.usage_system)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_last(cpu.usage_idle, cpu.time, cpu.usage_system)]] [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_additional_fields_4() {
assert_snapshot!(plan("SELECT FIRST(f), first FROM name_clash"), @r###"
Sort: time ASC NULLS LAST, first_1 ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N, first_1:Dictionary(Int32, Utf8);N]
Projection: Dictionary(Int32, Utf8("name_clash")) AS iox::measurement, (selector_first(name_clash.f,name_clash.time,name_clash.first))[time] AS time, (selector_first(name_clash.f,name_clash.time,name_clash.first))[value] AS first, (selector_first(name_clash.f,name_clash.time,name_clash.first))[other_1] AS first_1 [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N, first_1:Dictionary(Int32, Utf8);N]
Aggregate: groupBy=[[]], aggr=[[selector_first(name_clash.f, name_clash.time, name_clash.first)]] [selector_first(name_clash.f,name_clash.time,name_clash.first):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: name_clash [f:Float64;N, first:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)]
"###);
+}
+#[test]
+fn test_selectors_query_other_other_selectors_1() {
// Validate we can call the remaining supported selector functions
assert_snapshot!(plan("SELECT FIRST(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N]
@@ -4010,19 +4035,28 @@
Aggregate: groupBy=[[]], aggr=[[selector_first(cpu.usage_idle, cpu.time)]] [selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_query_other_other_selectors_2() {
assert_snapshot!(plan("SELECT MAX(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, max:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, (selector_max(cpu.usage_idle,cpu.time))[time] AS time, (selector_max(cpu.usage_idle,cpu.time))[value] AS max [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, max:Float64;N]
Aggregate: groupBy=[[]], aggr=[[selector_max(cpu.usage_idle, cpu.time)]] [selector_max(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_query_other_other_selectors_3() {
assert_snapshot!(plan("SELECT MIN(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, min:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, (selector_min(cpu.usage_idle,cpu.time))[time] AS time, (selector_min(cpu.usage_idle,cpu.time))[value] AS min [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, min:Float64;N]
Aggregate: groupBy=[[]], aggr=[[selector_min(cpu.usage_idle, cpu.time)]] [selector_min(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
+}
+#[test]
+fn test_selectors_invalid_arguments_3() {
// Invalid number of arguments
assert_snapshot!(plan("SELECT MIN(usage_idle, usage_idle) FROM cpu"), @"Error during planning: invalid number of arguments for min, expected 1, got 2");
}
@@ -4671,7 +4705,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4684,7 +4718,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time < '2022-10-31T02:02:00Z' GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1667181719999999999, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1667181719999999999, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time <= TimestampNanosecond(1667181719999999999, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4697,7 +4731,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time >= '2022-10-31T02:00:00Z' GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Included(TimestampNanosecond(1667181600000000000, None))..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Included(Literal(TimestampNanosecond(1667181600000000000, None)))..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time >= TimestampNanosecond(1667181600000000000, None) AND data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4710,7 +4744,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Included(TimestampNanosecond(1667181600000000000, None))..Included(TimestampNanosecond(1667181719999999999, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Included(Literal(TimestampNanosecond(1667181600000000000, None)))..Included(Literal(TimestampNanosecond(1667181719999999999, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time >= TimestampNanosecond(1667181600000000000, None) AND data.time <= TimestampNanosecond(1667181719999999999, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4722,7 +4756,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4734,7 +4768,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(null)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4746,7 +4780,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(previous)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[LOCF(COUNT(data.f64_field))]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[LOCF(COUNT(data.f64_field))]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4758,7 +4792,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(0)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce_struct(COUNT(data.f64_field), Int64(0)) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4770,7 +4804,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(linear)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[INTERPOLATE(COUNT(data.f64_field))]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[INTERPOLATE(COUNT(data.f64_field))]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4783,7 +4817,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce_struct(COUNT(data.f64_field), Int64(3)) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
@ -4796,7 +4830,7 @@ mod test {
assert_snapshot!(plan("SELECT COUNT(f64_field) + MEAN(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_mean:Float64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce_struct(COUNT(data.f64_field), Int64(3)) + coalesce_struct(AVG(data.f64_field), Float64(3.2)) AS count_mean [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_mean:Float64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N]
GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N]
Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
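All eight GapFill snapshots above change in the same two ways: groupBy=[[time]] loses its doubled brackets, and each range bound now prints the expression that carries the scalar, so a bare TimestampNanosecond(...) becomes Literal(TimestampNanosecond(...)). A self-contained sketch of that rendering difference, using stand-in types rather than DataFusion's Expr and ScalarValue:

use std::ops::Bound;

// Stand-ins for DataFusion's Expr and ScalarValue; only Debug output matters.
#[derive(Debug)]
enum Expr {
    Literal(Scalar),
}

#[derive(Debug)]
enum Scalar {
    TimestampNanosecond(i64),
}

fn main() {
    // The old snapshots printed the scalar bound directly; the new ones print
    // the wrapping expression.
    let old: Bound<Scalar> = Bound::Included(Scalar::TimestampNanosecond(1672531200000000000));
    let new: Bound<Expr> =
        Bound::Included(Expr::Literal(Scalar::TimestampNanosecond(1672531200000000000)));
    println!("range=Unbounded..{old:?}"); // Included(TimestampNanosecond(1672531200000000000))
    println!("range=Unbounded..{new:?}"); // Included(Literal(TimestampNanosecond(1672531200000000000)))
}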

@ -3,7 +3,7 @@ use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{
Accumulator, AccumulatorFunctionImplementation, AggregateUDF, ReturnTypeFunction, Signature,
Accumulator, AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature,
StateTypeFunction, TypeSignature, Volatility,
};
use once_cell::sync::Lazy;
@ -20,7 +20,7 @@ pub(crate) const MOVING_AVERAGE_NAME: &str = "moving_average";
/// Definition of the `MOVING_AVERAGE` user-defined aggregate function.
pub(crate) static MOVING_AVERAGE: Lazy<Arc<AggregateUDF>> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
let accumulator: AccumulatorFunctionImplementation =
let accumulator: AccumulatorFactoryFunction =
Arc::new(|_| Ok(Box::new(AvgNAccumulator::new(&DataType::Float64))));
let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
Arc::new(AggregateUDF::new(
@ -159,7 +159,7 @@ pub(crate) const DIFFERENCE_NAME: &str = "difference";
/// Definition of the `DIFFERENCE` user-defined aggregate function.
pub(crate) static DIFFERENCE: Lazy<Arc<AggregateUDF>> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(|dt| Ok(Arc::new(dt[0].clone())));
let accumulator: AccumulatorFunctionImplementation =
let accumulator: AccumulatorFactoryFunction =
Arc::new(|dt| Ok(Box::new(DifferenceAccumulator::new(dt))));
let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
Arc::new(AggregateUDF::new(
@ -248,7 +248,7 @@ pub(crate) const NON_NEGATIVE_DIFFERENCE_NAME: &str = "non_negative_difference";
/// Definition of the `NON_NEGATIVE_DIFFERENCE` user-defined aggregate function.
pub(crate) static NON_NEGATIVE_DIFFERENCE: Lazy<Arc<AggregateUDF>> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(|dt| Ok(Arc::new(dt[0].clone())));
let accumulator: AccumulatorFunctionImplementation = Arc::new(|dt| {
let accumulator: AccumulatorFactoryFunction = Arc::new(|dt| {
Ok(Box::new(NonNegative::<_>::new(DifferenceAccumulator::new(
dt,
))))
@ -314,7 +314,7 @@ pub(crate) const DERIVATIVE_NAME: &str = "derivative";
pub(crate) fn derivative_udf(unit: i64) -> AggregateUDF {
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
let accumulator: AccumulatorFunctionImplementation =
let accumulator: AccumulatorFactoryFunction =
Arc::new(move |_| Ok(Box::new(DerivativeAccumulator::new(unit))));
let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
let sig = Signature::one_of(
@ -344,7 +344,7 @@ pub(crate) const NON_NEGATIVE_DERIVATIVE_NAME: &str = "non_negative_derivative";
pub(crate) fn non_negative_derivative_udf(unit: i64) -> AggregateUDF {
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
let accumulator: AccumulatorFunctionImplementation = Arc::new(move |_| {
let accumulator: AccumulatorFactoryFunction = Arc::new(move |_| {
Ok(Box::new(NonNegative::<_>::new(DerivativeAccumulator::new(
unit,
))))
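Every hunk in this file is the same mechanical rename: DataFusion 27 renames the accumulator-factory type alias from AccumulatorFunctionImplementation to AccumulatorFactoryFunction, with the closure shape unchanged. A minimal sketch of the post-upgrade pattern, reusing this module's AvgNAccumulator and assuming the DataFusion 27 AggregateUDF::new signature used above (the "example" name and signature are placeholders):

use std::sync::Arc;

use arrow::datatypes::DataType;
use datafusion::logical_expr::{
    AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature,
    StateTypeFunction, Volatility,
};

fn example_udaf() -> AggregateUDF {
    let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
    // Only the alias changed: this is still a factory closure from the input
    // type to a boxed Accumulator.
    let accumulator: AccumulatorFactoryFunction =
        Arc::new(|_| Ok(Box::new(AvgNAccumulator::new(&DataType::Float64))));
    let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
    AggregateUDF::new(
        "example",
        &Signature::exact(vec![DataType::Float64], Volatility::Immutable),
        &return_type,
        &accumulator,
        &state_type,
    )
}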

@ -3,7 +3,7 @@ use std::{collections::HashSet, sync::Arc};
use datafusion::{
common::{DataFusionError, Result as DataFusionResult},
execution::FunctionRegistry,
logical_expr::{AggregateUDF, ScalarUDF},
logical_expr::{AggregateUDF, ScalarUDF, WindowUDF},
};
use once_cell::sync::Lazy;
@ -55,6 +55,12 @@ impl FunctionRegistry for IOxFunctionRegistry {
"IOx FunctionRegistry does not contain user defined aggregate function '{name}'"
)))
}
fn udwf(&self, name: &str) -> DataFusionResult<Arc<WindowUDF>> {
Err(DataFusionError::Plan(format!(
"IOx FunctionRegistry does not contain user defined window function '{name}'"
)))
}
}
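DataFusion 27 adds window UDFs to the FunctionRegistry trait, so the IOx registry must now provide a udwf lookup alongside udf and udaf; since IOx registers no window functions, it answers every name with a plan error, mirroring the aggregate arm above. A small usage sketch (the row_number name is arbitrary):

use datafusion::execution::FunctionRegistry;

fn expect_no_window_udfs(registry: &dyn FunctionRegistry) {
    // The IOx registry defines scalar and aggregate UDFs only, so any
    // window-UDF lookup fails with the plan error added above.
    assert!(registry.udwf("row_number").is_err());
}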
/// Return a reference to the global function registry

@ -100,7 +100,7 @@ use std::{fmt::Debug, sync::Arc};
use arrow::datatypes::DataType;
use datafusion::{
error::Result as DataFusionResult,
logical_expr::{AccumulatorFunctionImplementation, Signature, Volatility},
logical_expr::{AccumulatorFactoryFunction, Signature, Volatility},
physical_plan::{udaf::AggregateUDF, Accumulator},
prelude::SessionContext,
};
@ -218,7 +218,7 @@ impl FactoryBuilder {
}
/// Returns a function that instantiates the accumulator, consuming self
fn build_accumulator_factory(self) -> AccumulatorFunctionImplementation {
fn build_accumulator_factory(self) -> AccumulatorFactoryFunction {
let Self { selector_type } = self;
Arc::new(move |return_type| {

@ -30,9 +30,9 @@ bytes = { version = "1" }
chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] }
crossbeam-utils = { version = "0.8" }
crypto-common = { version = "0.1", default-features = false, features = ["std"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] }
digest = { version = "0.10", features = ["mac", "std"] }
either = { version = "1" }
fixedbitset = { version = "0.4" }
@ -78,6 +78,7 @@ serde_json = { version = "1", features = ["raw_value"] }
sha2 = { version = "0.10" }
similar = { version = "2", features = ["inline"] }
smallvec = { version = "1", default-features = false, features = ["union"] }
sqlparser = { version = "0.35", features = ["visitor"] }
sqlx = { version = "0.6", features = ["json", "postgres", "runtime-tokio-rustls", "sqlite", "tls", "uuid"] }
sqlx-core = { version = "0.6", default-features = false, features = ["any", "migrate", "postgres", "runtime-tokio-rustls", "sqlite", "uuid"] }
thrift = { version = "0.17" }