diff --git a/Cargo.lock b/Cargo.lock index 15f7c57759..8fa9abeece 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1343,8 +1343,8 @@ dependencies = [ [[package]] name = "datafusion" -version = "26.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a" +version = "27.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" dependencies = [ "ahash 0.8.3", "arrow", @@ -1367,8 +1367,8 @@ dependencies = [ "futures", "glob", "hashbrown 0.14.0", - "indexmap 1.9.3", - "itertools 0.10.5", + "indexmap 2.0.0", + "itertools 0.11.0", "lazy_static", "log", "num_cpus", @@ -1379,7 +1379,7 @@ dependencies = [ "pin-project-lite", "rand", "smallvec", - "sqlparser 0.34.0", + "sqlparser", "tempfile", "tokio", "tokio-util", @@ -1391,8 +1391,8 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "26.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a" +version = "27.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" dependencies = [ "arrow", "arrow-array", @@ -1400,13 +1400,13 @@ dependencies = [ "num_cpus", "object_store", "parquet", - "sqlparser 0.34.0", + "sqlparser", ] [[package]] name = "datafusion-execution" -version = "26.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a" +version = "27.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" dependencies = [ "dashmap", "datafusion-common", @@ -1422,22 +1422,22 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "26.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a" +version = "27.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" dependencies = [ "ahash 0.8.3", "arrow", "datafusion-common", "lazy_static", - "sqlparser 0.34.0", + "sqlparser", "strum 0.25.0", "strum_macros 0.25.0", ] [[package]] name = "datafusion-optimizer" -version = "26.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a" +version = "27.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" dependencies = [ "arrow", "async-trait", @@ -1446,15 +1446,15 @@ dependencies = [ "datafusion-expr", "datafusion-physical-expr", "hashbrown 0.14.0", - "itertools 0.10.5", + "itertools 0.11.0", "log", "regex-syntax 0.7.3", ] [[package]] name = "datafusion-physical-expr" -version = "26.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a" +version = "27.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" dependencies = [ "ahash 0.8.3", "arrow", @@ -1469,8 +1469,8 @@ dependencies = [ "datafusion-row", "half 2.3.1", "hashbrown 0.14.0", - "indexmap 1.9.3", - "itertools 0.10.5", + "indexmap 2.0.0", + "itertools 0.11.0", "lazy_static", "libc", "md-5", @@ -1485,8 +1485,8 @@ dependencies = [ [[package]] name = "datafusion-proto" -version = "26.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a" +version = "27.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" dependencies = [ "arrow", "chrono", @@ -1499,8 +1499,8 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "26.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a" +version = "27.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" dependencies = [ "arrow", "datafusion-common", @@ -1510,15 +1510,15 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "26.0.0" -source = "git+https://github.com/apache/arrow-datafusion.git?rev=e0330d6c957c724fcc91b673c6ae10c535d9a33a#e0330d6c957c724fcc91b673c6ae10c535d9a33a" +version = "27.0.0" +source = "git+https://github.com/apache/arrow-datafusion.git?rev=46752163bd4f30f778850160513e8ca7f15fcf14#46752163bd4f30f778850160513e8ca7f15fcf14" dependencies = [ "arrow", "arrow-schema", "datafusion-common", "datafusion-expr", "log", - "sqlparser 0.34.0", + "sqlparser", ] [[package]] @@ -2582,7 +2582,7 @@ version = "0.1.0" dependencies = [ "generated_types", "snafu", - "sqlparser 0.35.0", + "sqlparser", "workspace-hack", ] @@ -4171,7 +4171,7 @@ dependencies = [ "query_functions", "schema", "snafu", - "sqlparser 0.35.0", + "sqlparser", "test_helpers", "workspace-hack", ] @@ -5235,16 +5235,6 @@ dependencies = [ "unicode_categories", ] -[[package]] -name = "sqlparser" -version = "0.34.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37d3706eefb17039056234df6b566b0014f303f867f2656108334a55b8096f59" -dependencies = [ - "log", - "sqlparser_derive", -] - [[package]] name = "sqlparser" version = "0.35.0" @@ -5252,6 +5242,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca597d77c98894be1f965f2e4e2d2a61575d4998088e655476c73715c54b2b43" dependencies = [ "log", + "sqlparser_derive", ] [[package]] @@ -6729,6 +6720,7 @@ dependencies = [ "sha2", "similar", "smallvec", + "sqlparser", "sqlx", "sqlx-core", "sqlx-macros", diff --git a/Cargo.toml b/Cargo.toml index 11eb30392f..1c6a8e3c82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,8 +120,8 @@ license = "MIT OR Apache-2.0" [workspace.dependencies] arrow = { version = "42.0.0" } arrow-flight = { version = "42.0.0" } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a", default-features = false } -datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a" } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14", default-features = false } +datafusion-proto = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14" } hashbrown = { version = "0.14.0" } object_store = { version = "0.6.0" } parquet = { version = "42.0.0" } diff --git a/datafusion_util/src/lib.rs b/datafusion_util/src/lib.rs index d65f7da229..b62cf3b800 100644 --- a/datafusion_util/src/lib.rs +++ b/datafusion_util/src/lib.rs @@ -412,7 +412,7 @@ mod tests { let ts_predicate_expr = make_range_expr(101, 202, "time"); let expected_string = "TimestampNanosecond(101, None) <= time AND time < TimestampNanosecond(202, None)"; - let actual_string = format!("{ts_predicate_expr:?}"); + let actual_string = format!("{ts_predicate_expr}"); assert_eq!(actual_string, expected_string); } diff --git a/influxdb_iox/tests/query_tests/cases/in/date_bin.sql.expected b/influxdb_iox/tests/query_tests/cases/in/date_bin.sql.expected index dcbde0c76f..8b00d17ba1 100644 --- a/influxdb_iox/tests/query_tests/cases/in/date_bin.sql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/date_bin.sql.expected @@ -109,7 +109,7 @@ | physical_plan | ProjectionExec: expr=[date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))@0 as month, COUNT(cpu.user)@1 as COUNT(cpu.user)] | | | AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))@0 as date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))], aggr=[COUNT(cpu.user)] | | | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "date_bin(Utf8(\"1 month\"),cpu.time,Utf8(\"1970-12-31T00:15:00Z\"))", index: 0 }], 4), input_partitions=4 | +| | RepartitionExec: partitioning=Hash([date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))@0], 4), input_partitions=4 | | | AggregateExec: mode=Partial, gby=[date_bin(79228162514264337593543950336, time@0, 31450500000000000) as date_bin(Utf8("1 month"),cpu.time,Utf8("1970-12-31T00:15:00Z"))], aggr=[COUNT(cpu.user)] | | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | CoalesceBatchesExec: target_batch_size=8192 | diff --git a/influxdb_iox/tests/query_tests/cases/in/gapfill.sql.expected b/influxdb_iox/tests/query_tests/cases/in/gapfill.sql.expected index 15a8e4e257..84e628f08b 100644 --- a/influxdb_iox/tests/query_tests/cases/in/gapfill.sql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/gapfill.sql.expected @@ -23,7 +23,7 @@ | plan_type | plan | ---------- | logical_plan | Projection: date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time) AS minute, COUNT(cpu.user) | -| | GapFill: groupBy=[[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)]], aggr=[[COUNT(cpu.user)]], time_column=date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time), stride=IntervalMonthDayNano("600000000000"), range=Included(TimestampNanosecond(957528000000000000, None))..Included(TimestampNanosecond(957531540000000000, None)) | +| | GapFill: groupBy=[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[[COUNT(cpu.user)]], time_column=date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time), stride=IntervalMonthDayNano("600000000000"), range=Included(Literal(TimestampNanosecond(957528000000000000, None)))..Included(Literal(TimestampNanosecond(957531540000000000, None))) | | | Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("600000000000"), cpu.time) AS date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)]], aggr=[[COUNT(cpu.user)]] | | | TableScan: cpu projection=[time, user], full_filters=[cpu.time >= TimestampNanosecond(957528000000000000, None), cpu.time <= TimestampNanosecond(957531540000000000, None)] | | physical_plan | ProjectionExec: expr=[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@0 as minute, COUNT(cpu.user)@1 as COUNT(cpu.user)] | @@ -32,7 +32,7 @@ | | SortExec: expr=[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@0 ASC] | | | AggregateExec: mode=FinalPartitioned, gby=[date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@0 as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[COUNT(cpu.user)] | | | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "date_bin_gapfill(IntervalMonthDayNano(\"600000000000\"),cpu.time)", index: 0 }], 4), input_partitions=4 | +| | RepartitionExec: partitioning=Hash([date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@0], 4), input_partitions=4 | | | AggregateExec: mode=Partial, gby=[date_bin(600000000000, time@0) as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[COUNT(cpu.user)] | | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | | | CoalesceBatchesExec: target_batch_size=8192 | @@ -116,7 +116,7 @@ Error during planning: gap-filling query is missing lower time bound | plan_type | plan | ---------- | logical_plan | Projection: cpu.region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time) AS minute, AVG(cpu.user) AS locf(AVG(cpu.user)) | -| | GapFill: groupBy=[[cpu.region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)]], aggr=[[LOCF(AVG(cpu.user))]], time_column=date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time), stride=IntervalMonthDayNano("600000000000"), range=Included(TimestampNanosecond(957528000000000000, None))..Included(TimestampNanosecond(957531540000000000, None)) | +| | GapFill: groupBy=[cpu.region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[[LOCF(AVG(cpu.user))]], time_column=date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time), stride=IntervalMonthDayNano("600000000000"), range=Included(Literal(TimestampNanosecond(957528000000000000, None)))..Included(Literal(TimestampNanosecond(957531540000000000, None))) | | | Aggregate: groupBy=[[cpu.region, date_bin(IntervalMonthDayNano("600000000000"), cpu.time) AS date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)]], aggr=[[AVG(cpu.user)]] | | | TableScan: cpu projection=[region, time, user], full_filters=[cpu.time >= TimestampNanosecond(957528000000000000, None), cpu.time <= TimestampNanosecond(957531540000000000, None)] | | physical_plan | ProjectionExec: expr=[region@0 as region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 as minute, AVG(cpu.user)@2 as locf(AVG(cpu.user))] | @@ -125,7 +125,7 @@ Error during planning: gap-filling query is missing lower time bound | | SortExec: expr=[region@0 ASC,date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 ASC] | | | AggregateExec: mode=FinalPartitioned, gby=[region@0 as region, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1 as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[AVG(cpu.user)], ordering_mode=PartiallyOrdered | | | CoalesceBatchesExec: target_batch_size=8192 | -| | RepartitionExec: partitioning=Hash([Column { name: "region", index: 0 }, Column { name: "date_bin_gapfill(IntervalMonthDayNano(\"600000000000\"),cpu.time)", index: 1 }], 4), input_partitions=1 | +| | RepartitionExec: partitioning=Hash([region@0, date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)@1], 4), input_partitions=1 | | | AggregateExec: mode=Partial, gby=[region@0 as region, date_bin(600000000000, time@1) as date_bin_gapfill(IntervalMonthDayNano("600000000000"),cpu.time)], aggr=[AVG(cpu.user)], ordering_mode=PartiallyOrdered | | | CoalesceBatchesExec: target_batch_size=8192 | | | FilterExec: time@1 >= 957528000000000000 AND time@1 <= 957531540000000000 | diff --git a/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected b/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected index f20e8397ec..9a5eff5cd6 100644 --- a/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected @@ -933,7 +933,7 @@ name: physical_plan ProjectionExec: expr=[m0 as iox::measurement, 0 as time, tag0@0 as tag0, COUNT(m0.f64)@1 as count, SUM(m0.f64)@2 as sum, STDDEV(m0.f64)@3 as stddev] AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[COUNT(m0.f64), SUM(m0.f64), STDDEV(m0.f64)] CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([Column { name: "tag0", index: 0 }], 4), input_partitions=4 + RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=4 RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 AggregateExec: mode=Partial, gby=[tag0@1 as tag0], aggr=[COUNT(m0.f64), SUM(m0.f64), STDDEV(m0.f64)] ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet]]}, projection=[f64, tag0] @@ -942,7 +942,7 @@ name: physical_plan RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=4 AggregateExec: mode=FinalPartitioned, gby=[tag0@0 as tag0], aggr=[COUNT(m1.f64), SUM(m1.f64), STDDEV(m1.f64)], ordering_mode=FullyOrdered CoalesceBatchesExec: target_batch_size=8192 - RepartitionExec: partitioning=Hash([Column { name: "tag0", index: 0 }], 4), input_partitions=1 + RepartitionExec: partitioning=Hash([tag0@0], 4), input_partitions=1 AggregateExec: mode=Partial, gby=[tag0@1 as tag0], aggr=[COUNT(m1.f64), SUM(m1.f64), STDDEV(m1.f64)], ordering_mode=FullyOrdered ParquetExec: file_groups={1 group: [[1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[f64, tag0], output_ordering=[tag0@1 ASC] -- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1 GROUP BY tag0; diff --git a/iox_query/src/exec/gapfill/mod.rs b/iox_query/src/exec/gapfill/mod.rs index d1bf380712..95f806c74b 100644 --- a/iox_query/src/exec/gapfill/mod.rs +++ b/iox_query/src/exec/gapfill/mod.rs @@ -223,12 +223,18 @@ impl UserDefinedLogicalNodeCore for GapFill { }) .collect::>() .join(", "); + + let group_expr = self + .group_expr + .iter() + .map(|e| e.to_string()) + .collect::>() + .join(", "); + write!( f, - "{}: groupBy=[{:?}], aggr=[[{}]], time_column={}, stride={}, range={:?}", + "{}: groupBy=[{group_expr}], aggr=[[{aggr_expr}]], time_column={}, stride={}, range={:?}", self.name(), - self.group_expr, - aggr_expr, self.params.time_column, self.params.stride, self.params.time_range, @@ -740,7 +746,7 @@ mod test { format_logical_plan(&plan), @r###" --- - - " GapFill: groupBy=[[loc, time]], aggr=[[temp]], time_column=time, stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))" + - " GapFill: groupBy=[loc, time], aggr=[[temp]], time_column=time, stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))" - " TableScan: temps" "### ); diff --git a/iox_query/src/logical_optimizer/handle_gapfill.rs b/iox_query/src/logical_optimizer/handle_gapfill.rs index 1973877b3a..5bb719cf2f 100644 --- a/iox_query/src/logical_optimizer/handle_gapfill.rs +++ b/iox_query/src/logical_optimizer/handle_gapfill.rs @@ -859,7 +859,7 @@ mod test { format_optimized_plan(&plan)?, @r###" --- - - "GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))" + - "GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))" - " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp)]]" - " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)" - " TableScan: temps" @@ -889,7 +889,7 @@ mod test { format_optimized_plan(&plan)?, @r###" --- - - "GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None))]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None)), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))" + - "GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None))], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None)), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))" - " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time, TimestampNanosecond(7, None)) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(7, None))]], aggr=[[AVG(temps.temp)]]" - " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)" - " TableScan: temps" @@ -918,7 +918,7 @@ mod test { format_optimized_plan(&plan)?, @r###" --- - - "GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), temps.loc]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))" + - "GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), temps.loc], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))" - " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), temps.loc]], aggr=[[AVG(temps.temp)]]" - " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)" - " TableScan: temps" @@ -970,7 +970,7 @@ mod test { @r###" --- - "Projection: date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), AVG(temps.temp)" - - " GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))" + - " GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[AVG(temps.temp)]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))" - " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp)]]" - " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)" - " TableScan: temps" @@ -1005,7 +1005,7 @@ mod test { @r###" --- - "Projection: date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), AVG(temps.temp) AS locf(AVG(temps.temp)), MIN(temps.temp) AS locf(MIN(temps.temp))" - - " GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[LOCF(AVG(temps.temp)), LOCF(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))" + - " GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[LOCF(AVG(temps.temp)), LOCF(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))" - " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp), MIN(temps.temp)]]" - " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)" - " TableScan: temps" @@ -1039,7 +1039,7 @@ mod test { @r###" --- - "Projection: date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), MIN(temps.temp) AS locf(MIN(temps.temp)) AS locf_min_temp" - - " GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp), LOCF(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))" + - " GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[AVG(temps.temp), LOCF(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))" - " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp), MIN(temps.temp)]]" - " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)" - " TableScan: temps" @@ -1074,7 +1074,7 @@ mod test { @r###" --- - "Projection: date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), AVG(temps.temp) AS interpolate(AVG(temps.temp)), MIN(temps.temp) AS interpolate(MIN(temps.temp))" - - " GapFill: groupBy=[[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[INTERPOLATE(AVG(temps.temp)), INTERPOLATE(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(TimestampNanosecond(1000, None))..Excluded(TimestampNanosecond(2000, None))" + - " GapFill: groupBy=[date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)], aggr=[[INTERPOLATE(AVG(temps.temp)), INTERPOLATE(MIN(temps.temp))]], time_column=date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time), stride=IntervalDayTime(\"60000\"), range=Included(Literal(TimestampNanosecond(1000, None)))..Excluded(Literal(TimestampNanosecond(2000, None)))" - " Aggregate: groupBy=[[date_bin(IntervalDayTime(\"60000\"), temps.time) AS date_bin_gapfill(IntervalDayTime(\"60000\"),temps.time)]], aggr=[[AVG(temps.temp), MIN(temps.temp)]]" - " Filter: temps.time >= TimestampNanosecond(1000, None) AND temps.time < TimestampNanosecond(2000, None)" - " TableScan: temps" diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index 24f5f7c42a..ca67d73072 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -3763,7 +3763,7 @@ mod test { Filter: NOT difference IS NULL [time:Timestamp(Nanosecond, None);N, difference:Float64;N] Projection: time, difference(AVG(cpu.usage_idle)) AS difference [time:Timestamp(Nanosecond, None);N, difference:Float64;N] WindowAggr: windowExpr=[[AggregateUDF { name: "difference", signature: Signature { type_signature: OneOf([Exact([Int64]), Exact([UInt64]), Exact([Float64])]), volatility: Immutable }, fun: "" }(AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS difference(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, difference(AVG(cpu.usage_idle)):Float64;N] - GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] + GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] @@ -3789,7 +3789,7 @@ mod test { Filter: NOT non_negative_difference IS NULL [time:Timestamp(Nanosecond, None);N, non_negative_difference:Float64;N] Projection: time, non_negative_difference(AVG(cpu.usage_idle)) AS non_negative_difference [time:Timestamp(Nanosecond, None);N, non_negative_difference:Float64;N] WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_difference", signature: Signature { type_signature: OneOf([Exact([Int64]), Exact([UInt64]), Exact([Float64])]), volatility: Immutable }, fun: "" }(AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_difference(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, non_negative_difference(AVG(cpu.usage_idle)):Float64;N] - GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] + GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] @@ -3815,7 +3815,7 @@ mod test { Filter: NOT moving_average IS NULL [time:Timestamp(Nanosecond, None);N, moving_average:Float64;N] Projection: time, moving_average(AVG(cpu.usage_idle),Int64(3)) AS moving_average [time:Timestamp(Nanosecond, None);N, moving_average:Float64;N] WindowAggr: windowExpr=[[AggregateUDF { name: "moving_average", signature: Signature { type_signature: OneOf([Exact([Int64, Int64]), Exact([UInt64, Int64]), Exact([Float64, Int64])]), volatility: Immutable }, fun: "" }(AVG(cpu.usage_idle), Int64(3)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS moving_average(AVG(cpu.usage_idle),Int64(3))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, moving_average(AVG(cpu.usage_idle),Int64(3)):Float64;N] - GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] + GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] @@ -3844,7 +3844,7 @@ mod test { Filter: NOT derivative IS NULL [time:Timestamp(Nanosecond, None);N, derivative:Float64;N] Projection: time, derivative(AVG(cpu.usage_idle)) AS derivative [time:Timestamp(Nanosecond, None);N, derivative:Float64;N] WindowAggr: windowExpr=[[AggregateUDF { name: "derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, derivative(AVG(cpu.usage_idle)):Float64;N] - GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] + GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] @@ -3870,7 +3870,7 @@ mod test { Filter: NOT non_negative_derivative IS NULL [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N] Projection: time, non_negative_derivative(AVG(cpu.usage_idle)) AS non_negative_derivative [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N] WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, non_negative_derivative(AVG(cpu.usage_idle)):Float64;N] - GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] + GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] @@ -3923,7 +3923,7 @@ mod test { use super::*; #[test] - fn test_selectors() { + fn test_selectors_query() { // single-selector query assert_snapshot!(plan("SELECT LAST(usage_idle) FROM cpu"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N] @@ -3931,6 +3931,9 @@ mod test { Aggregate: groupBy=[[]], aggr=[[selector_last(cpu.usage_idle, cpu.time)]] [selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); + } + #[test] + fn test_selectors_query_grouping() { // single-selector, grouping by tags assert_snapshot!(plan("SELECT LAST(usage_idle) FROM cpu GROUP BY cpu"), @r###" Sort: cpu ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, cpu:Dictionary(Int32, Utf8);N, last:Float64;N] @@ -3938,27 +3941,33 @@ mod test { Aggregate: groupBy=[[cpu.cpu]], aggr=[[selector_last(cpu.usage_idle, cpu.time)]] [cpu:Dictionary(Int32, Utf8);N, selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); - + } + #[test] + fn test_selectors_aggregate_gby_time() { // aggregate query, as we're grouping by time assert_snapshot!(plan("SELECT LAST(usage_idle) FROM cpu GROUP BY TIME(5s)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N] Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, (selector_last(cpu.usage_idle,cpu.time))[value] AS last [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N] - GapFill: groupBy=[[time]], aggr=[[selector_last(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] + GapFill: groupBy=[time], aggr=[[selector_last(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_last(cpu.usage_idle, cpu.time)]] [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); - + } + #[test] + fn test_selectors_aggregate_gby_time_gapfill() { // aggregate query, grouping by time with gap filling assert_snapshot!(plan("SELECT FIRST(usage_idle) FROM cpu GROUP BY TIME(5s) FILL(0)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N] Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, (coalesce_struct(selector_first(cpu.usage_idle,cpu.time), Struct({value:Float64(0),time:TimestampNanosecond(0, None)})))[value] AS first [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N] - GapFill: groupBy=[[time]], aggr=[[selector_first(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] + GapFill: groupBy=[time], aggr=[[selector_first(cpu.usage_idle,cpu.time)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_first(cpu.usage_idle, cpu.time)]] [time:Timestamp(Nanosecond, None);N, selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); - + } + #[test] + fn test_selectors_aggregate_multi_selectors() { // aggregate query, as we're specifying multiple selectors or aggregates assert_snapshot!(plan("SELECT LAST(usage_idle), FIRST(usage_idle) FROM cpu"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, first:Float64;N] @@ -3966,13 +3975,18 @@ mod test { Aggregate: groupBy=[[]], aggr=[[selector_last(cpu.usage_idle, cpu.time), selector_first(cpu.usage_idle, cpu.time)]] [selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N, selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); + } + #[test] + fn test_selectors_and_aggregate() { assert_snapshot!(plan("SELECT LAST(usage_idle), COUNT(usage_idle) FROM cpu"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, count:Int64;N] Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, TimestampNanosecond(0, None) AS time, (selector_last(cpu.usage_idle,cpu.time))[value] AS last, COUNT(cpu.usage_idle) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), last:Float64;N, count:Int64;N] Aggregate: groupBy=[[]], aggr=[[selector_last(cpu.usage_idle, cpu.time), COUNT(cpu.usage_idle)]] [selector_last(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N, COUNT(cpu.usage_idle):Int64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); - + } + #[test] + fn test_selectors_additional_fields() { // additional fields assert_snapshot!(plan("SELECT LAST(usage_idle), usage_system FROM cpu"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N] @@ -3980,29 +3994,40 @@ mod test { Aggregate: groupBy=[[]], aggr=[[selector_last(cpu.usage_idle, cpu.time, cpu.usage_system)]] [selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); + } + #[test] + fn test_selectors_additional_fields_2() { assert_snapshot!(plan("SELECT LAST(usage_idle), usage_system FROM cpu GROUP BY TIME(5s)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N] Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, (selector_last(cpu.usage_idle,cpu.time,cpu.usage_system))[value] AS last, (selector_last(cpu.usage_idle,cpu.time,cpu.usage_system))[other_1] AS usage_system [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N] - GapFill: groupBy=[[time]], aggr=[[selector_last(cpu.usage_idle,cpu.time,cpu.usage_system)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] + GapFill: groupBy=[time], aggr=[[selector_last(cpu.usage_idle,cpu.time,cpu.usage_system)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_last(cpu.usage_idle, cpu.time, cpu.usage_system)]] [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); + } + #[test] + fn test_selectors_additional_fields_3() { assert_snapshot!(plan("SELECT LAST(usage_idle), usage_system FROM cpu GROUP BY TIME(5s) FILL(0)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N] Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, (coalesce_struct(selector_last(cpu.usage_idle,cpu.time,cpu.usage_system), Struct({value:Float64(0),time:TimestampNanosecond(0, None),other_1:Float64(0)})))[value] AS last, (coalesce_struct(selector_last(cpu.usage_idle,cpu.time,cpu.usage_system), Struct({value:Float64(0),time:TimestampNanosecond(0, None),other_1:Float64(0)})))[other_1] AS usage_system [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, last:Float64;N, usage_system:Float64;N] - GapFill: groupBy=[[time]], aggr=[[selector_last(cpu.usage_idle,cpu.time,cpu.usage_system)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] + GapFill: groupBy=[time], aggr=[[selector_last(cpu.usage_idle,cpu.time,cpu.usage_system)]], time_column=time, stride=IntervalMonthDayNano("5000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("5000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[selector_last(cpu.usage_idle, cpu.time, cpu.usage_system)]] [time:Timestamp(Nanosecond, None);N, selector_last(cpu.usage_idle,cpu.time,cpu.usage_system):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); + } + #[test] + fn test_selectors_additional_fields_4() { assert_snapshot!(plan("SELECT FIRST(f), first FROM name_clash"), @r###" Sort: time ASC NULLS LAST, first_1 ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N, first_1:Dictionary(Int32, Utf8);N] Projection: Dictionary(Int32, Utf8("name_clash")) AS iox::measurement, (selector_first(name_clash.f,name_clash.time,name_clash.first))[time] AS time, (selector_first(name_clash.f,name_clash.time,name_clash.first))[value] AS first, (selector_first(name_clash.f,name_clash.time,name_clash.first))[other_1] AS first_1 [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N, first_1:Dictionary(Int32, Utf8);N] Aggregate: groupBy=[[]], aggr=[[selector_first(name_clash.f, name_clash.time, name_clash.first)]] [selector_first(name_clash.f,name_clash.time,name_clash.first):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "other_1", data_type: Dictionary(Int32, Utf8), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] TableScan: name_clash [f:Float64;N, first:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None)] "###); - + } + #[test] + fn test_selectors_query_other_other_selectors_1() { // Validate we can call the remaining supported selector functions assert_snapshot!(plan("SELECT FIRST(usage_idle) FROM cpu"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, first:Float64;N] @@ -4010,19 +4035,28 @@ mod test { Aggregate: groupBy=[[]], aggr=[[selector_first(cpu.usage_idle, cpu.time)]] [selector_first(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); + } + #[test] + fn test_selectors_query_other_other_selectors_2() { assert_snapshot!(plan("SELECT MAX(usage_idle) FROM cpu"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, max:Float64;N] Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, (selector_max(cpu.usage_idle,cpu.time))[time] AS time, (selector_max(cpu.usage_idle,cpu.time))[value] AS max [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, max:Float64;N] Aggregate: groupBy=[[]], aggr=[[selector_max(cpu.usage_idle, cpu.time)]] [selector_max(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); + } + #[test] + fn test_selectors_query_other_other_selectors_3() { assert_snapshot!(plan("SELECT MIN(usage_idle) FROM cpu"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, min:Float64;N] Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, (selector_min(cpu.usage_idle,cpu.time))[time] AS time, (selector_min(cpu.usage_idle,cpu.time))[value] AS min [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, min:Float64;N] Aggregate: groupBy=[[]], aggr=[[selector_min(cpu.usage_idle, cpu.time)]] [selector_min(cpu.usage_idle,cpu.time):Struct([Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "time", data_type: Timestamp(Nanosecond, None), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]);N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); + } + #[test] + fn test_selectors_invalid_arguments_3() { // Invalid number of arguments assert_snapshot!(plan("SELECT MIN(usage_idle, usage_idle) FROM cpu"), @"Error during planning: invalid number of arguments for min, expected 1, got 2"); } @@ -4671,7 +4705,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4684,7 +4718,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time < '2022-10-31T02:02:00Z' GROUP BY TIME(10s)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1667181719999999999, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1667181719999999999, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time <= TimestampNanosecond(1667181719999999999, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4697,7 +4731,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time >= '2022-10-31T02:00:00Z' GROUP BY TIME(10s)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Included(TimestampNanosecond(1667181600000000000, None))..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Included(Literal(TimestampNanosecond(1667181600000000000, None)))..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time >= TimestampNanosecond(1667181600000000000, None) AND data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4710,7 +4744,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(10s)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Included(TimestampNanosecond(1667181600000000000, None))..Included(TimestampNanosecond(1667181719999999999, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Included(Literal(TimestampNanosecond(1667181600000000000, None)))..Included(Literal(TimestampNanosecond(1667181719999999999, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time >= TimestampNanosecond(1667181600000000000, None) AND data.time <= TimestampNanosecond(1667181719999999999, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4722,7 +4756,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4734,7 +4768,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(null)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4746,7 +4780,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(previous)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[LOCF(COUNT(data.f64_field))]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[LOCF(COUNT(data.f64_field))]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4758,7 +4792,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(0)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce_struct(COUNT(data.f64_field), Int64(0)) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4770,7 +4804,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(linear)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[INTERPOLATE(COUNT(data.f64_field))]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[INTERPOLATE(COUNT(data.f64_field))]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4783,7 +4817,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce_struct(COUNT(data.f64_field), Int64(3)) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] - GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] @@ -4796,7 +4830,7 @@ mod test { assert_snapshot!(plan("SELECT COUNT(f64_field) + MEAN(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_mean:Float64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce_struct(COUNT(data.f64_field), Int64(3)) + coalesce_struct(AVG(data.f64_field), Float64(3.2)) AS count_mean [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_mean:Float64;N] - GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N] + GapFill: groupBy=[time], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N] Filter: data.time <= TimestampNanosecond(1672531200000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] diff --git a/iox_query_influxql/src/plan/udaf.rs b/iox_query_influxql/src/plan/udaf.rs index 3b96f34a95..571c5f064b 100644 --- a/iox_query_influxql/src/plan/udaf.rs +++ b/iox_query_influxql/src/plan/udaf.rs @@ -3,7 +3,7 @@ use arrow::array::{Array, ArrayRef, Int64Array}; use arrow::datatypes::{DataType, TimeUnit}; use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue}; use datafusion::logical_expr::{ - Accumulator, AccumulatorFunctionImplementation, AggregateUDF, ReturnTypeFunction, Signature, + Accumulator, AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature, StateTypeFunction, TypeSignature, Volatility, }; use once_cell::sync::Lazy; @@ -20,7 +20,7 @@ pub(crate) const MOVING_AVERAGE_NAME: &str = "moving_average"; /// Definition of the `MOVING_AVERAGE` user-defined aggregate function. pub(crate) static MOVING_AVERAGE: Lazy> = Lazy::new(|| { let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64))); - let accumulator: AccumulatorFunctionImplementation = + let accumulator: AccumulatorFactoryFunction = Arc::new(|_| Ok(Box::new(AvgNAccumulator::new(&DataType::Float64)))); let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![]))); Arc::new(AggregateUDF::new( @@ -159,7 +159,7 @@ pub(crate) const DIFFERENCE_NAME: &str = "difference"; /// Definition of the `DIFFERENCE` user-defined aggregate function. pub(crate) static DIFFERENCE: Lazy> = Lazy::new(|| { let return_type: ReturnTypeFunction = Arc::new(|dt| Ok(Arc::new(dt[0].clone()))); - let accumulator: AccumulatorFunctionImplementation = + let accumulator: AccumulatorFactoryFunction = Arc::new(|dt| Ok(Box::new(DifferenceAccumulator::new(dt)))); let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![]))); Arc::new(AggregateUDF::new( @@ -248,7 +248,7 @@ pub(crate) const NON_NEGATIVE_DIFFERENCE_NAME: &str = "non_negative_difference"; /// Definition of the `NON_NEGATIVE_DIFFERENCE` user-defined aggregate function. pub(crate) static NON_NEGATIVE_DIFFERENCE: Lazy> = Lazy::new(|| { let return_type: ReturnTypeFunction = Arc::new(|dt| Ok(Arc::new(dt[0].clone()))); - let accumulator: AccumulatorFunctionImplementation = Arc::new(|dt| { + let accumulator: AccumulatorFactoryFunction = Arc::new(|dt| { Ok(Box::new(NonNegative::<_>::new(DifferenceAccumulator::new( dt, )))) @@ -314,7 +314,7 @@ pub(crate) const DERIVATIVE_NAME: &str = "derivative"; pub(crate) fn derivative_udf(unit: i64) -> AggregateUDF { let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64))); - let accumulator: AccumulatorFunctionImplementation = + let accumulator: AccumulatorFactoryFunction = Arc::new(move |_| Ok(Box::new(DerivativeAccumulator::new(unit)))); let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![]))); let sig = Signature::one_of( @@ -344,7 +344,7 @@ pub(crate) const NON_NEGATIVE_DERIVATIVE_NAME: &str = "non_negative_derivative"; pub(crate) fn non_negative_derivative_udf(unit: i64) -> AggregateUDF { let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64))); - let accumulator: AccumulatorFunctionImplementation = Arc::new(move |_| { + let accumulator: AccumulatorFactoryFunction = Arc::new(move |_| { Ok(Box::new(NonNegative::<_>::new(DerivativeAccumulator::new( unit, )))) diff --git a/query_functions/src/registry.rs b/query_functions/src/registry.rs index d04b5cf861..a4f920db1c 100644 --- a/query_functions/src/registry.rs +++ b/query_functions/src/registry.rs @@ -3,7 +3,7 @@ use std::{collections::HashSet, sync::Arc}; use datafusion::{ common::{DataFusionError, Result as DataFusionResult}, execution::FunctionRegistry, - logical_expr::{AggregateUDF, ScalarUDF}, + logical_expr::{AggregateUDF, ScalarUDF, WindowUDF}, }; use once_cell::sync::Lazy; @@ -55,6 +55,12 @@ impl FunctionRegistry for IOxFunctionRegistry { "IOx FunctionRegistry does not contain user defined aggregate function '{name}'" ))) } + + fn udwf(&self, name: &str) -> DataFusionResult> { + Err(DataFusionError::Plan(format!( + "IOx FunctionRegistry does not contain user defined window function '{name}'" + ))) + } } /// Return a reference to the global function registry diff --git a/query_functions/src/selectors.rs b/query_functions/src/selectors.rs index 736e116e92..a0dc07de8a 100644 --- a/query_functions/src/selectors.rs +++ b/query_functions/src/selectors.rs @@ -100,7 +100,7 @@ use std::{fmt::Debug, sync::Arc}; use arrow::datatypes::DataType; use datafusion::{ error::Result as DataFusionResult, - logical_expr::{AccumulatorFunctionImplementation, Signature, Volatility}, + logical_expr::{AccumulatorFactoryFunction, Signature, Volatility}, physical_plan::{udaf::AggregateUDF, Accumulator}, prelude::SessionContext, }; @@ -218,7 +218,7 @@ impl FactoryBuilder { } /// Returns a function that instantiates the accumulator, consuming self - fn build_accumulator_factory(self) -> AccumulatorFunctionImplementation { + fn build_accumulator_factory(self) -> AccumulatorFactoryFunction { let Self { selector_type } = self; Arc::new(move |return_type| { diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 018a55068e..90bdcbb109 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -30,9 +30,9 @@ bytes = { version = "1" } chrono = { version = "0.4", default-features = false, features = ["alloc", "clock", "serde"] } crossbeam-utils = { version = "0.8" } crypto-common = { version = "0.1", default-features = false, features = ["std"] } -datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a" } -datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } -datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "e0330d6c957c724fcc91b673c6ae10c535d9a33a", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14" } +datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } +datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "46752163bd4f30f778850160513e8ca7f15fcf14", default-features = false, features = ["crypto_expressions", "regex_expressions", "unicode_expressions"] } digest = { version = "0.10", features = ["mac", "std"] } either = { version = "1" } fixedbitset = { version = "0.4" } @@ -78,6 +78,7 @@ serde_json = { version = "1", features = ["raw_value"] } sha2 = { version = "0.10" } similar = { version = "2", features = ["inline"] } smallvec = { version = "1", default-features = false, features = ["union"] } +sqlparser = { version = "0.35", features = ["visitor"] } sqlx = { version = "0.6", features = ["json", "postgres", "runtime-tokio-rustls", "sqlite", "tls", "uuid"] } sqlx-core = { version = "0.6", default-features = false, features = ["any", "migrate", "postgres", "runtime-tokio-rustls", "sqlite", "uuid"] } thrift = { version = "0.17" }