diff --git a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql index 23f78b8642..ef72030af9 100644 --- a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql +++ b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql @@ -289,7 +289,10 @@ SELECT usage_idle, bytes_free, device, cpu FROM cpu, disk GROUP BY device, cpu; -- Aggregate queries -- +SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0; +SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1; SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0; +SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1 GROUP BY tag0; SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0, non_existent; SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY non_existent; SELECT COUNT(f64), COUNT(f64) + COUNT(f64), COUNT(f64) * 3 FROM m0; @@ -298,6 +301,17 @@ SELECT COUNT(f64) as the_count, SUM(non_existent) as foo FROM m0; -- non-existent columns in an aggregate expression should evaluate to NULL SELECT COUNT(f64) as the_count, SUM(f64) + SUM(non_existent) as foo FROM m0; +-- measurements with different schema +SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk; +SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk GROUP BY TIME(10s) FILL(none); + +-- TODO(sgc): The following two queries produce incorrect results +-- See: https://github.com/influxdata/influxdb_iox/issues/7361 +-- using aggregates across measurements +-- SELECT MEAN(usage_idle) + MEAN(bytes_free) FROM cpu, disk; +-- using aggregates with missing fields +-- SELECT MEAN(usage_idle) + MEAN(foo) FROM cpu; + SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s) FILL(none); -- supports offset parameter SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s, 1s) FILL(none); @@ -317,12 +331,43 @@ SELECT SUM(usage_idle) FROM cpu, disk GROUP BY cpu; SELECT COUNT(usage_idle) + usage_idle FROM cpu; SELECT COUNT(usage_idle), usage_idle FROM cpu; +-- +-- gap filling via FILL clause +-- + +-- Default FILL(null) when FILL is omitted +SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s); +SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s); +SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(null); +SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(null); +SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(previous); +SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(previous); +SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14); +SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14); + +-- correct data type of FILL(value) depending on the data type of the aggregate +SELECT COUNT(usage_idle), SUM(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14); +SELECT COUNT(bytes_free), SUM(bytes_free) FROM disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14); + +-- can combine multiple aggregates into a binary expression +SELECT COUNT(bytes_free) as a, SUM(bytes_free) as b, COUNT(bytes_free) + SUM(bytes_free) as "a+b" FROM disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(5); + +-- grouping by tags +SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu; +SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(null); +SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(null); +SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(previous); +SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(previous); +SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(3.14); +SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(3.14); + +-- Succeeds without upper bound +-- NOTE: expected to return no data, as there is none within the time range +SELECT COUNT(usage_idle) FROM cpu WHERE time >= now() - 2m GROUP BY TIME(30s) FILL(null); + -- Unimplemented cases --- TODO(sgc): No gap filling --- Default FILL(null) when FILL is omitted -SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s); -SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s) FILL(previous); +SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(linear); -- LIMIT and OFFSET aren't supported with aggregates and groups SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu LIMIT 1; diff --git a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected index ff513df086..e2b3936ce8 100644 --- a/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected +++ b/influxdb_iox/tests/query_tests2/cases/in/issue_6112.influxql.expected @@ -417,12 +417,12 @@ Error while planning query: Error during planning: invalid number of arguments f | cpu | 2022-10-31T02:00:10Z | 2.99 | | | cpu | 2022-10-31T02:00:10Z | 0.99 | | | cpu | 2022-10-31T02:00:10Z | 1.99 | | -| disk | 2022-10-31T02:00:00Z | | 219838.0 | -| disk | 2022-10-31T02:00:00Z | | 319838.0 | -| disk | 2022-10-31T02:00:00Z | | 419838.0 | -| disk | 2022-10-31T02:00:10Z | | 219833.0 | -| disk | 2022-10-31T02:00:10Z | | 319833.0 | -| disk | 2022-10-31T02:00:10Z | | 419833.0 | +| disk | 2022-10-31T02:00:00Z | | 219838 | +| disk | 2022-10-31T02:00:00Z | | 319838 | +| disk | 2022-10-31T02:00:00Z | | 419838 | +| disk | 2022-10-31T02:00:10Z | | 219833 | +| disk | 2022-10-31T02:00:10Z | | 319833 | +| disk | 2022-10-31T02:00:10Z | | 419833 | +------------------+----------------------+------------+------------+ -- InfluxQL: SELECT cpu, usage_idle FROM cpu; +------------------+----------------------+-----------+------------+ @@ -489,12 +489,12 @@ Error while planning query: Error during planning: invalid number of arguments f | cpu | 2022-10-31T02:00:10Z | cpu0 | 0.99 | | | cpu | 2022-10-31T02:00:00Z | cpu1 | 1.98 | | | cpu | 2022-10-31T02:00:10Z | cpu1 | 1.99 | | -| disk | 2022-10-31T02:00:00Z | | | 1234.0 | -| disk | 2022-10-31T02:00:00Z | | | 2234.0 | -| disk | 2022-10-31T02:00:00Z | | | 3234.0 | -| disk | 2022-10-31T02:00:10Z | | | 1239.0 | -| disk | 2022-10-31T02:00:10Z | | | 2239.0 | -| disk | 2022-10-31T02:00:10Z | | | 3239.0 | +| disk | 2022-10-31T02:00:00Z | | | 1234 | +| disk | 2022-10-31T02:00:00Z | | | 2234 | +| disk | 2022-10-31T02:00:00Z | | | 3234 | +| disk | 2022-10-31T02:00:10Z | | | 1239 | +| disk | 2022-10-31T02:00:10Z | | | 2239 | +| disk | 2022-10-31T02:00:10Z | | | 3239 | +------------------+----------------------+-----------+------------+------------+ -- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, non_existent; +------------------+----------------------+-----------+--------------+------------+------------+ @@ -506,12 +506,12 @@ Error while planning query: Error during planning: invalid number of arguments f | cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 | | | cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 | | | cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 | | -| disk | 2022-10-31T02:00:00Z | | | | 1234.0 | -| disk | 2022-10-31T02:00:00Z | | | | 2234.0 | -| disk | 2022-10-31T02:00:00Z | | | | 3234.0 | -| disk | 2022-10-31T02:00:10Z | | | | 1239.0 | -| disk | 2022-10-31T02:00:10Z | | | | 2239.0 | -| disk | 2022-10-31T02:00:10Z | | | | 3239.0 | +| disk | 2022-10-31T02:00:00Z | | | | 1234 | +| disk | 2022-10-31T02:00:00Z | | | | 2234 | +| disk | 2022-10-31T02:00:00Z | | | | 3234 | +| disk | 2022-10-31T02:00:10Z | | | | 1239 | +| disk | 2022-10-31T02:00:10Z | | | | 2239 | +| disk | 2022-10-31T02:00:10Z | | | | 3239 | +------------------+----------------------+-----------+--------------+------------+------------+ -- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, device; +------------------+----------------------+-----------+---------+------------+------------+ @@ -523,12 +523,12 @@ Error while planning query: Error during planning: invalid number of arguments f | cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 | | | cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 | | | cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 | | -| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234.0 | -| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239.0 | -| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2234.0 | -| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239.0 | -| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234.0 | -| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239.0 | +| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234 | +| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239 | +| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2234 | +| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239 | +| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234 | +| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239 | +------------------+----------------------+-----------+---------+------------+------------+ -- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY device, cpu; +------------------+----------------------+-----------+---------+------------+------------+ @@ -540,12 +540,12 @@ Error while planning query: Error during planning: invalid number of arguments f | cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 | | | cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 | | | cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 | | -| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234.0 | -| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239.0 | -| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2234.0 | -| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239.0 | -| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234.0 | -| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239.0 | +| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234 | +| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239 | +| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2234 | +| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239 | +| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234 | +| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239 | +------------------+----------------------+-----------+---------+------------+------------+ -- InfluxQL: SELECT usage_idle, bytes_free, device, cpu FROM cpu, disk GROUP BY device, cpu; +------------------+----------------------+------------+------------+---------+-----------+ @@ -557,13 +557,26 @@ Error while planning query: Error during planning: invalid number of arguments f | cpu | 2022-10-31T02:00:10Z | 0.99 | | | cpu0 | | cpu | 2022-10-31T02:00:00Z | 1.98 | | | cpu1 | | cpu | 2022-10-31T02:00:10Z | 1.99 | | | cpu1 | -| disk | 2022-10-31T02:00:00Z | | 1234.0 | disk1s1 | | -| disk | 2022-10-31T02:00:10Z | | 1239.0 | disk1s1 | | -| disk | 2022-10-31T02:00:00Z | | 2234.0 | disk1s2 | | -| disk | 2022-10-31T02:00:10Z | | 2239.0 | disk1s2 | | -| disk | 2022-10-31T02:00:00Z | | 3234.0 | disk1s5 | | -| disk | 2022-10-31T02:00:10Z | | 3239.0 | disk1s5 | | +| disk | 2022-10-31T02:00:00Z | | 1234 | disk1s1 | | +| disk | 2022-10-31T02:00:10Z | | 1239 | disk1s1 | | +| disk | 2022-10-31T02:00:00Z | | 2234 | disk1s2 | | +| disk | 2022-10-31T02:00:10Z | | 2239 | disk1s2 | | +| disk | 2022-10-31T02:00:00Z | | 3234 | disk1s5 | | +| disk | 2022-10-31T02:00:10Z | | 3239 | disk1s5 | | +------------------+----------------------+------------+------------+---------+-----------+ +-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0; ++------------------+----------------------+-------+--------------------+--------------------+ +| iox::measurement | time | count | sum | stddev | ++------------------+----------------------+-------+--------------------+--------------------+ +| m0 | 1970-01-01T00:00:00Z | 7 | 102.30000000000001 | 4.8912945019454614 | ++------------------+----------------------+-------+--------------------+--------------------+ +-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1; ++------------------+----------------------+-------+--------------------+--------------------+ +| iox::measurement | time | count | sum | stddev | ++------------------+----------------------+-------+--------------------+--------------------+ +| m0 | 1970-01-01T00:00:00Z | 7 | 102.30000000000001 | 4.8912945019454614 | +| m1 | 1970-01-01T00:00:00Z | 3 | 402.8 | 57.4494850571642 | ++------------------+----------------------+-------+--------------------+--------------------+ -- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0; +------------------+----------------------+-------+-------+------+-------------------+ | iox::measurement | time | tag0 | count | sum | stddev | @@ -572,6 +585,16 @@ Error while planning query: Error during planning: invalid number of arguments f | m0 | 1970-01-01T00:00:00Z | val01 | 1 | 11.3 | | | m0 | 1970-01-01T00:00:00Z | val02 | 1 | 10.4 | | +------------------+----------------------+-------+-------+------+-------------------+ +-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1 GROUP BY tag0; ++------------------+----------------------+-------+-------+-------+-------------------+ +| iox::measurement | time | tag0 | count | sum | stddev | ++------------------+----------------------+-------+-------+-------+-------------------+ +| m0 | 1970-01-01T00:00:00Z | val00 | 5 | 80.6 | 5.085961069453836 | +| m0 | 1970-01-01T00:00:00Z | val01 | 1 | 11.3 | | +| m0 | 1970-01-01T00:00:00Z | val02 | 1 | 10.4 | | +| m1 | 1970-01-01T00:00:00Z | val00 | 2 | 301.1 | 70.7813887967734 | +| m1 | 1970-01-01T00:00:00Z | val01 | 1 | 101.7 | | ++------------------+----------------------+-------+-------+-------+-------------------+ -- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0, non_existent; +------------------+----------------------+--------------+-------+-------+------+-------------------+ | iox::measurement | time | non_existent | tag0 | count | sum | stddev | @@ -604,6 +627,22 @@ Error while planning query: Error during planning: invalid number of arguments f +------------------+----------------------+-----------+-----+ | m0 | 1970-01-01T00:00:00Z | 7 | | +------------------+----------------------+-----------+-----+ +-- InfluxQL: SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk; ++------------------+----------------------+--------------------+--------+ +| iox::measurement | time | mean | mean_1 | ++------------------+----------------------+--------------------+--------+ +| cpu | 1970-01-01T00:00:00Z | 1.9850000000000003 | | +| disk | 1970-01-01T00:00:00Z | | 2236.5 | ++------------------+----------------------+--------------------+--------+ +-- InfluxQL: SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk GROUP BY TIME(10s) FILL(none); ++------------------+----------------------+--------------------+--------+ +| iox::measurement | time | mean | mean_1 | ++------------------+----------------------+--------------------+--------+ +| cpu | 2022-10-31T02:00:00Z | 1.9799999999999998 | | +| cpu | 2022-10-31T02:00:10Z | 1.9900000000000002 | | +| disk | 2022-10-31T02:00:00Z | | 2234.0 | +| disk | 2022-10-31T02:00:10Z | | 2239.0 | ++------------------+----------------------+--------------------+--------+ -- InfluxQL: SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s) FILL(none); +------------------+----------------------+-------+------+ | iox::measurement | time | count | sum | @@ -668,10 +707,281 @@ Error while planning query: Error during planning: invalid number of arguments f Error while planning query: Error during planning: mixing aggregate and non-aggregate columns is not supported -- InfluxQL: SELECT COUNT(usage_idle), usage_idle FROM cpu; Error while planning query: Error during planning: mixing aggregate and non-aggregate columns is not supported --- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s); -Error while planning query: This feature is not implemented: FILL(NULL) --- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s) FILL(previous); -Error while planning query: This feature is not implemented: FILL(PREVIOUS) +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s); ++------------------+----------------------+-------+ +| iox::measurement | time | count | ++------------------+----------------------+-------+ +| cpu | 2022-10-31T02:00:00Z | 6 | +| cpu | 2022-10-31T02:00:30Z | | +| cpu | 2022-10-31T02:01:00Z | | +| cpu | 2022-10-31T02:01:30Z | | ++------------------+----------------------+-------+ +-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s); ++------------------+----------------------+-------+---------+ +| iox::measurement | time | count | count_1 | ++------------------+----------------------+-------+---------+ +| cpu | 2022-10-31T02:00:00Z | 6 | | +| cpu | 2022-10-31T02:00:30Z | | | +| cpu | 2022-10-31T02:01:00Z | | | +| cpu | 2022-10-31T02:01:30Z | | | +| disk | 2022-10-31T02:00:00Z | | 6 | +| disk | 2022-10-31T02:00:30Z | | | +| disk | 2022-10-31T02:01:00Z | | | +| disk | 2022-10-31T02:01:30Z | | | ++------------------+----------------------+-------+---------+ +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(null); ++------------------+----------------------+-------+ +| iox::measurement | time | count | ++------------------+----------------------+-------+ +| cpu | 2022-10-31T02:00:00Z | 6 | +| cpu | 2022-10-31T02:00:30Z | | +| cpu | 2022-10-31T02:01:00Z | | +| cpu | 2022-10-31T02:01:30Z | | ++------------------+----------------------+-------+ +-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(null); ++------------------+----------------------+-------+---------+ +| iox::measurement | time | count | count_1 | ++------------------+----------------------+-------+---------+ +| cpu | 2022-10-31T02:00:00Z | 6 | | +| cpu | 2022-10-31T02:00:30Z | | | +| cpu | 2022-10-31T02:01:00Z | | | +| cpu | 2022-10-31T02:01:30Z | | | +| disk | 2022-10-31T02:00:00Z | | 6 | +| disk | 2022-10-31T02:00:30Z | | | +| disk | 2022-10-31T02:01:00Z | | | +| disk | 2022-10-31T02:01:30Z | | | ++------------------+----------------------+-------+---------+ +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(previous); ++------------------+----------------------+-------+ +| iox::measurement | time | count | ++------------------+----------------------+-------+ +| cpu | 2022-10-31T02:00:00Z | 6 | +| cpu | 2022-10-31T02:00:30Z | 6 | +| cpu | 2022-10-31T02:01:00Z | 6 | +| cpu | 2022-10-31T02:01:30Z | 6 | ++------------------+----------------------+-------+ +-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(previous); ++------------------+----------------------+-------+---------+ +| iox::measurement | time | count | count_1 | ++------------------+----------------------+-------+---------+ +| cpu | 2022-10-31T02:00:00Z | 6 | | +| cpu | 2022-10-31T02:00:30Z | 6 | | +| cpu | 2022-10-31T02:01:00Z | 6 | | +| cpu | 2022-10-31T02:01:30Z | 6 | | +| disk | 2022-10-31T02:00:00Z | | 6 | +| disk | 2022-10-31T02:00:30Z | | 6 | +| disk | 2022-10-31T02:01:00Z | | 6 | +| disk | 2022-10-31T02:01:30Z | | 6 | ++------------------+----------------------+-------+---------+ +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14); ++------------------+----------------------+-------+ +| iox::measurement | time | count | ++------------------+----------------------+-------+ +| cpu | 2022-10-31T02:00:00Z | 6 | +| cpu | 2022-10-31T02:00:30Z | 3 | +| cpu | 2022-10-31T02:01:00Z | 3 | +| cpu | 2022-10-31T02:01:30Z | 3 | ++------------------+----------------------+-------+ +-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14); ++------------------+----------------------+-------+---------+ +| iox::measurement | time | count | count_1 | ++------------------+----------------------+-------+---------+ +| cpu | 2022-10-31T02:00:00Z | 6 | | +| cpu | 2022-10-31T02:00:30Z | 3 | | +| cpu | 2022-10-31T02:01:00Z | 3 | | +| cpu | 2022-10-31T02:01:30Z | 3 | | +| disk | 2022-10-31T02:00:00Z | | 6 | +| disk | 2022-10-31T02:00:30Z | | 3 | +| disk | 2022-10-31T02:01:00Z | | 3 | +| disk | 2022-10-31T02:01:30Z | | 3 | ++------------------+----------------------+-------+---------+ +-- InfluxQL: SELECT COUNT(usage_idle), SUM(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14); ++------------------+----------------------+-------+--------------------+ +| iox::measurement | time | count | sum | ++------------------+----------------------+-------+--------------------+ +| cpu | 2022-10-31T02:00:00Z | 6 | 11.910000000000002 | +| cpu | 2022-10-31T02:00:30Z | 3 | 3.14 | +| cpu | 2022-10-31T02:01:00Z | 3 | 3.14 | +| cpu | 2022-10-31T02:01:30Z | 3 | 3.14 | ++------------------+----------------------+-------+--------------------+ +-- InfluxQL: SELECT COUNT(bytes_free), SUM(bytes_free) FROM disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14); ++------------------+----------------------+-------+-------+ +| iox::measurement | time | count | sum | ++------------------+----------------------+-------+-------+ +| disk | 2022-10-31T02:00:00Z | 6 | 13419 | +| disk | 2022-10-31T02:00:30Z | 3 | 3 | +| disk | 2022-10-31T02:01:00Z | 3 | 3 | +| disk | 2022-10-31T02:01:30Z | 3 | 3 | ++------------------+----------------------+-------+-------+ +-- InfluxQL: SELECT COUNT(bytes_free) as a, SUM(bytes_free) as b, COUNT(bytes_free) + SUM(bytes_free) as "a+b" FROM disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(5); ++------------------+----------------------+---+-------+-------+ +| iox::measurement | time | a | b | "a+b" | ++------------------+----------------------+---+-------+-------+ +| disk | 2022-10-31T02:00:00Z | 6 | 13419 | 13425 | +| disk | 2022-10-31T02:00:30Z | 5 | 5 | 10 | +| disk | 2022-10-31T02:01:00Z | 5 | 5 | 10 | +| disk | 2022-10-31T02:01:30Z | 5 | 5 | 10 | ++------------------+----------------------+---+-------+-------+ +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu; ++------------------+----------------------+-----------+-------+ +| iox::measurement | time | cpu | count | ++------------------+----------------------+-----------+-------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu-total | | +| cpu | 2022-10-31T02:01:00Z | cpu-total | | +| cpu | 2022-10-31T02:01:30Z | cpu-total | | +| cpu | 2022-10-31T02:00:00Z | cpu0 | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu0 | | +| cpu | 2022-10-31T02:01:00Z | cpu0 | | +| cpu | 2022-10-31T02:01:30Z | cpu0 | | +| cpu | 2022-10-31T02:00:00Z | cpu1 | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu1 | | +| cpu | 2022-10-31T02:01:00Z | cpu1 | | +| cpu | 2022-10-31T02:01:30Z | cpu1 | | ++------------------+----------------------+-----------+-------+ +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(null); ++------------------+----------------------+-----------+-------+ +| iox::measurement | time | cpu | count | ++------------------+----------------------+-----------+-------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu-total | | +| cpu | 2022-10-31T02:01:00Z | cpu-total | | +| cpu | 2022-10-31T02:01:30Z | cpu-total | | +| cpu | 2022-10-31T02:00:00Z | cpu0 | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu0 | | +| cpu | 2022-10-31T02:01:00Z | cpu0 | | +| cpu | 2022-10-31T02:01:30Z | cpu0 | | +| cpu | 2022-10-31T02:00:00Z | cpu1 | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu1 | | +| cpu | 2022-10-31T02:01:00Z | cpu1 | | +| cpu | 2022-10-31T02:01:30Z | cpu1 | | ++------------------+----------------------+-----------+-------+ +-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(null); ++------------------+----------------------+-----------+---------+-------+---------+ +| iox::measurement | time | cpu | device | count | count_1 | ++------------------+----------------------+-----------+---------+-------+---------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2 | | +| cpu | 2022-10-31T02:00:30Z | cpu-total | | | | +| cpu | 2022-10-31T02:01:00Z | cpu-total | | | | +| cpu | 2022-10-31T02:01:30Z | cpu-total | | | | +| cpu | 2022-10-31T02:00:00Z | cpu0 | | 2 | | +| cpu | 2022-10-31T02:00:30Z | cpu0 | | | | +| cpu | 2022-10-31T02:01:00Z | cpu0 | | | | +| cpu | 2022-10-31T02:01:30Z | cpu0 | | | | +| cpu | 2022-10-31T02:00:00Z | cpu1 | | 2 | | +| cpu | 2022-10-31T02:00:30Z | cpu1 | | | | +| cpu | 2022-10-31T02:01:00Z | cpu1 | | | | +| cpu | 2022-10-31T02:01:30Z | cpu1 | | | | +| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 2 | +| disk | 2022-10-31T02:00:30Z | | disk1s1 | | | +| disk | 2022-10-31T02:01:00Z | | disk1s1 | | | +| disk | 2022-10-31T02:01:30Z | | disk1s1 | | | +| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2 | +| disk | 2022-10-31T02:00:30Z | | disk1s2 | | | +| disk | 2022-10-31T02:01:00Z | | disk1s2 | | | +| disk | 2022-10-31T02:01:30Z | | disk1s2 | | | +| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 2 | +| disk | 2022-10-31T02:00:30Z | | disk1s5 | | | +| disk | 2022-10-31T02:01:00Z | | disk1s5 | | | +| disk | 2022-10-31T02:01:30Z | | disk1s5 | | | ++------------------+----------------------+-----------+---------+-------+---------+ +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(previous); ++------------------+----------------------+-----------+-------+ +| iox::measurement | time | cpu | count | ++------------------+----------------------+-----------+-------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu-total | 2 | +| cpu | 2022-10-31T02:01:00Z | cpu-total | 2 | +| cpu | 2022-10-31T02:01:30Z | cpu-total | 2 | +| cpu | 2022-10-31T02:00:00Z | cpu0 | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu0 | 2 | +| cpu | 2022-10-31T02:01:00Z | cpu0 | 2 | +| cpu | 2022-10-31T02:01:30Z | cpu0 | 2 | +| cpu | 2022-10-31T02:00:00Z | cpu1 | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu1 | 2 | +| cpu | 2022-10-31T02:01:00Z | cpu1 | 2 | +| cpu | 2022-10-31T02:01:30Z | cpu1 | 2 | ++------------------+----------------------+-----------+-------+ +-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(previous); ++------------------+----------------------+-----------+---------+-------+---------+ +| iox::measurement | time | cpu | device | count | count_1 | ++------------------+----------------------+-----------+---------+-------+---------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2 | | +| cpu | 2022-10-31T02:00:30Z | cpu-total | | 2 | | +| cpu | 2022-10-31T02:01:00Z | cpu-total | | 2 | | +| cpu | 2022-10-31T02:01:30Z | cpu-total | | 2 | | +| cpu | 2022-10-31T02:00:00Z | cpu0 | | 2 | | +| cpu | 2022-10-31T02:00:30Z | cpu0 | | 2 | | +| cpu | 2022-10-31T02:01:00Z | cpu0 | | 2 | | +| cpu | 2022-10-31T02:01:30Z | cpu0 | | 2 | | +| cpu | 2022-10-31T02:00:00Z | cpu1 | | 2 | | +| cpu | 2022-10-31T02:00:30Z | cpu1 | | 2 | | +| cpu | 2022-10-31T02:01:00Z | cpu1 | | 2 | | +| cpu | 2022-10-31T02:01:30Z | cpu1 | | 2 | | +| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 2 | +| disk | 2022-10-31T02:00:30Z | | disk1s1 | | 2 | +| disk | 2022-10-31T02:01:00Z | | disk1s1 | | 2 | +| disk | 2022-10-31T02:01:30Z | | disk1s1 | | 2 | +| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2 | +| disk | 2022-10-31T02:00:30Z | | disk1s2 | | 2 | +| disk | 2022-10-31T02:01:00Z | | disk1s2 | | 2 | +| disk | 2022-10-31T02:01:30Z | | disk1s2 | | 2 | +| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 2 | +| disk | 2022-10-31T02:00:30Z | | disk1s5 | | 2 | +| disk | 2022-10-31T02:01:00Z | | disk1s5 | | 2 | +| disk | 2022-10-31T02:01:30Z | | disk1s5 | | 2 | ++------------------+----------------------+-----------+---------+-------+---------+ +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(3.14); ++------------------+----------------------+-----------+-------+ +| iox::measurement | time | cpu | count | ++------------------+----------------------+-----------+-------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu-total | 3 | +| cpu | 2022-10-31T02:01:00Z | cpu-total | 3 | +| cpu | 2022-10-31T02:01:30Z | cpu-total | 3 | +| cpu | 2022-10-31T02:00:00Z | cpu0 | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu0 | 3 | +| cpu | 2022-10-31T02:01:00Z | cpu0 | 3 | +| cpu | 2022-10-31T02:01:30Z | cpu0 | 3 | +| cpu | 2022-10-31T02:00:00Z | cpu1 | 2 | +| cpu | 2022-10-31T02:00:30Z | cpu1 | 3 | +| cpu | 2022-10-31T02:01:00Z | cpu1 | 3 | +| cpu | 2022-10-31T02:01:30Z | cpu1 | 3 | ++------------------+----------------------+-----------+-------+ +-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(3.14); ++------------------+----------------------+-----------+---------+-------+---------+ +| iox::measurement | time | cpu | device | count | count_1 | ++------------------+----------------------+-----------+---------+-------+---------+ +| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2 | | +| cpu | 2022-10-31T02:00:30Z | cpu-total | | 3 | | +| cpu | 2022-10-31T02:01:00Z | cpu-total | | 3 | | +| cpu | 2022-10-31T02:01:30Z | cpu-total | | 3 | | +| cpu | 2022-10-31T02:00:00Z | cpu0 | | 2 | | +| cpu | 2022-10-31T02:00:30Z | cpu0 | | 3 | | +| cpu | 2022-10-31T02:01:00Z | cpu0 | | 3 | | +| cpu | 2022-10-31T02:01:30Z | cpu0 | | 3 | | +| cpu | 2022-10-31T02:00:00Z | cpu1 | | 2 | | +| cpu | 2022-10-31T02:00:30Z | cpu1 | | 3 | | +| cpu | 2022-10-31T02:01:00Z | cpu1 | | 3 | | +| cpu | 2022-10-31T02:01:30Z | cpu1 | | 3 | | +| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 2 | +| disk | 2022-10-31T02:00:30Z | | disk1s1 | | 3 | +| disk | 2022-10-31T02:01:00Z | | disk1s1 | | 3 | +| disk | 2022-10-31T02:01:30Z | | disk1s1 | | 3 | +| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2 | +| disk | 2022-10-31T02:00:30Z | | disk1s2 | | 3 | +| disk | 2022-10-31T02:01:00Z | | disk1s2 | | 3 | +| disk | 2022-10-31T02:01:30Z | | disk1s2 | | 3 | +| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 2 | +| disk | 2022-10-31T02:00:30Z | | disk1s5 | | 3 | +| disk | 2022-10-31T02:01:00Z | | disk1s5 | | 3 | +| disk | 2022-10-31T02:01:30Z | | disk1s5 | | 3 | ++------------------+----------------------+-----------+---------+-------+---------+ +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= now() - 2m GROUP BY TIME(30s) FILL(null); +++ +++ +-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(linear); +Error while planning query: This feature is not implemented: FILL(LINEAR) -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu LIMIT 1; Error while planning query: This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause -- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1; diff --git a/influxdb_iox/tests/query_tests2/setups.rs b/influxdb_iox/tests/query_tests2/setups.rs index e06613a1c9..c557f1132f 100644 --- a/influxdb_iox/tests/query_tests2/setups.rs +++ b/influxdb_iox/tests/query_tests2/setups.rs @@ -1240,12 +1240,12 @@ pub static SETUPS: Lazy> = Lazy::new(|| { cpu,host=host1,cpu=cpu0 usage_idle=0.99,usage_system=0.1 1667181610000000000 cpu,host=host1,cpu=cpu1 usage_idle=1.98,usage_system=1.2 1667181600000000000 cpu,host=host1,cpu=cpu1 usage_idle=1.99,usage_system=1.1 1667181610000000000 - disk,host=host1,device=disk1s1 bytes_free=1234,bytes_used=219838 1667181600000000000 - disk,host=host1,device=disk1s1 bytes_free=1239,bytes_used=219833 1667181610000000000 - disk,host=host1,device=disk1s2 bytes_free=2234,bytes_used=319838 1667181600000000000 - disk,host=host1,device=disk1s2 bytes_free=2239,bytes_used=319833 1667181610000000000 - disk,host=host1,device=disk1s5 bytes_free=3234,bytes_used=419838 1667181600000000000 - disk,host=host1,device=disk1s5 bytes_free=3239,bytes_used=419833 1667181610000000000 + disk,host=host1,device=disk1s1 bytes_free=1234i,bytes_used=219838i 1667181600000000000 + disk,host=host1,device=disk1s1 bytes_free=1239i,bytes_used=219833i 1667181610000000000 + disk,host=host1,device=disk1s2 bytes_free=2234i,bytes_used=319838i 1667181600000000000 + disk,host=host1,device=disk1s2 bytes_free=2239i,bytes_used=319833i 1667181610000000000 + disk,host=host1,device=disk1s5 bytes_free=3234i,bytes_used=419838i 1667181600000000000 + disk,host=host1,device=disk1s5 bytes_free=3239i,bytes_used=419833i 1667181610000000000 "# .to_string(), ), diff --git a/iox_query/src/exec.rs b/iox_query/src/exec.rs index af3d5a30a1..c5aa569aba 100644 --- a/iox_query/src/exec.rs +++ b/iox_query/src/exec.rs @@ -4,7 +4,7 @@ pub(crate) mod context; pub mod field; pub mod fieldlist; -pub(crate) mod gapfill; +pub mod gapfill; mod non_null_checker; mod query_tracing; mod schema_pivot; diff --git a/iox_query/src/exec/gapfill/mod.rs b/iox_query/src/exec/gapfill/mod.rs index d0b700e58b..97471c5041 100644 --- a/iox_query/src/exec/gapfill/mod.rs +++ b/iox_query/src/exec/gapfill/mod.rs @@ -34,15 +34,20 @@ use self::stream::GapFillStream; /// A logical node that represents the gap filling operation. #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct GapFill { - input: Arc, - group_expr: Vec, - aggr_expr: Vec, - params: GapFillParams, + /// The incoming logical plan + pub input: Arc, + /// Grouping expressions + pub group_expr: Vec, + /// Aggregate expressions + pub aggr_expr: Vec, + /// Parameters to configure the behavior of the + /// gap-filling operation + pub params: GapFillParams, } /// Parameters to the GapFill operation #[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub(crate) struct GapFillParams { +pub struct GapFillParams { /// The stride argument from the call to DATE_BIN_GAPFILL pub stride: Expr, /// The source time column @@ -149,7 +154,8 @@ impl GapFillParams { } impl GapFill { - pub(crate) fn try_new( + /// Create a new gap-filling operator. + pub fn try_new( input: Arc, group_expr: Vec, aggr_expr: Vec, diff --git a/iox_query/src/logical_optimizer/handle_gapfill.rs b/iox_query/src/logical_optimizer/handle_gapfill.rs index 8ef87c6429..1d96aae6e6 100644 --- a/iox_query/src/logical_optimizer/handle_gapfill.rs +++ b/iox_query/src/logical_optimizer/handle_gapfill.rs @@ -1,7 +1,7 @@ //! An optimizer rule that transforms a plan //! to fill gaps in time series data. -mod range_predicate; +pub mod range_predicate; use crate::exec::gapfill::{FillStrategy, GapFill, GapFillParams}; use datafusion::{ diff --git a/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs b/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs index 20a0cdeb68..353e15bacc 100644 --- a/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs +++ b/iox_query/src/logical_optimizer/handle_gapfill/range_predicate.rs @@ -14,7 +14,7 @@ use datafusion::{ /// Given a plan and a column, finds the predicates that use that column /// and return a range with expressions for upper and lower bounds. -pub(super) fn find_time_range(plan: &LogicalPlan, time_col: &Column) -> Result>> { +pub fn find_time_range(plan: &LogicalPlan, time_col: &Column) -> Result>> { let mut v = TimeRangeVisitor { col: time_col.clone(), range: TimeRange::default(), diff --git a/iox_query/src/logical_optimizer/mod.rs b/iox_query/src/logical_optimizer/mod.rs index aeea68e9bf..6e88a65bd2 100644 --- a/iox_query/src/logical_optimizer/mod.rs +++ b/iox_query/src/logical_optimizer/mod.rs @@ -8,6 +8,7 @@ use self::{ mod handle_gapfill; mod influx_regex_to_datafusion_regex; +pub use handle_gapfill::range_predicate; /// Register IOx-specific logical [`OptimizerRule`]s with the SessionContext /// diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index 2ada36aab5..d4999930c6 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -8,8 +8,7 @@ use crate::plan::planner_time_range_expression::{ duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr, }; use crate::plan::rewriter::rewrite_statement; -use crate::plan::util::{binary_operator_to_df_operator, Schemas}; -use crate::plan::util_copy::rebase_expr; +use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas}; use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_to_data_type}; use arrow::datatypes::DataType; use chrono_tz::Tz; @@ -21,9 +20,9 @@ use datafusion::logical_expr::logical_plan::builder::project; use datafusion::logical_expr::logical_plan::Analyze; use datafusion::logical_expr::utils::{expr_as_column_expr, find_aggregate_exprs}; use datafusion::logical_expr::{ - binary_expr, date_bin, expr, lit, lit_timestamp_nano, AggregateFunction, AggregateUDF, - BinaryExpr, BuiltinScalarFunction, Explain, Expr, ExprSchemable, LogicalPlan, - LogicalPlanBuilder, Operator, PlanType, ScalarUDF, TableSource, ToStringifiedPlan, + binary_expr, col, date_bin, expr, lit, lit_timestamp_nano, now, Aggregate, AggregateFunction, + AggregateUDF, BinaryExpr, BuiltinScalarFunction, Explain, Expr, ExprSchemable, Extension, + LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ScalarUDF, TableSource, ToStringifiedPlan, }; use datafusion_util::{lit_dict, AsExpr}; use generated_types::influxdata::iox::querier::v1::InfluxQlMetadata; @@ -43,6 +42,8 @@ use influxdb_influxql_parser::{ select::{Field, FieldList, FromMeasurementClause, MeasurementSelection, SelectStatement}, statement::Statement, }; +use iox_query::exec::gapfill::{FillStrategy, GapFill, GapFillParams}; +use iox_query::logical_optimizer::range_predicate::find_time_range; use itertools::Itertools; use once_cell::sync::Lazy; use query_functions::clean_non_meta_escapes; @@ -52,7 +53,7 @@ use schema::{ }; use std::collections::{HashSet, VecDeque}; use std::fmt::Debug; -use std::ops::{ControlFlow, Deref}; +use std::ops::{Bound, ControlFlow, Deref, Range}; use std::str::FromStr; use std::sync::Arc; @@ -144,6 +145,10 @@ impl<'a> Context<'a> { ..*self } } + + fn fill(&self) -> FillClause { + self.fill.unwrap_or_default() + } } #[allow(missing_debug_implementations)] @@ -403,14 +408,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // Transform InfluxQL AST field expressions to a list of DataFusion expressions. let select_exprs = self.field_list_to_exprs(ctx, &plan, fields, &schemas)?; - let (plan, select_exprs_post_aggr) = self.select_aggregate( - plan, - fields, - select_exprs, - select.group_by.as_ref(), - group_by_tag_set, - &schemas, - )?; + let (plan, select_exprs_post_aggr) = + self.select_aggregate(ctx, plan, fields, select_exprs, group_by_tag_set, &schemas)?; // Wrap the plan in a `LogicalPlan::Projection` from the select expressions project( @@ -422,13 +421,17 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fn select_aggregate( &self, + ctx: &Context<'_>, input: LogicalPlan, fields: &[Field], select_exprs: Vec, - group_by: Option<&GroupByClause>, group_by_tag_set: &[&str], schemas: &Schemas, ) -> Result<(LogicalPlan, Vec)> { + let Some(time_column_index) = find_time_column_index(fields) else { + return Err(DataFusionError::Internal("unable to find time column".to_owned())) + }; + // Find a list of unique aggregate expressions from the projection. // // For example, a projection such as: @@ -443,14 +446,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> { return Ok((input, select_exprs)); } - let aggr_group_by_exprs = if let Some(group_by) = group_by { + let aggr_group_by_exprs = if let Some(group_by) = ctx.group_by { let mut group_by_exprs = Vec::new(); if group_by.time_dimension().is_some() { // Include the GROUP BY TIME(..) expression - if let Some(index) = find_time_column_index(fields) { - group_by_exprs.push(select_exprs[index].clone()); - } + group_by_exprs.push(select_exprs[time_column_index].clone()); } // Exclude tags that do not exist in the current table schema. @@ -475,6 +476,48 @@ impl<'a> InfluxQLToLogicalPlan<'a> { .aggregate(aggr_group_by_exprs.clone(), aggr_exprs.clone())? .build()?; + let fill_option = ctx.fill(); + + // Wrap the plan in a GapFill operator if the statement specifies a `GROUP BY TIME` clause and + // the FILL option is one of + // + // * `null` + // * `previous` + // * `literal` value + // * `linear` + // + let plan = if ctx.group_by.and_then(|gb| gb.time_dimension()).is_some() + && fill_option != FillClause::None + { + let args = match select_exprs[time_column_index].clone().unalias() { + Expr::ScalarFunction { + fun: BuiltinScalarFunction::DateBin, + args, + } => args, + _ => { + // The InfluxQL planner adds the `date_bin` function, + // so this condition represents an internal failure. + return Err(DataFusionError::Internal( + "expected DATE_BIN function".to_owned(), + )); + } + }; + + let fill_strategy = match fill_option { + FillClause::Null | FillClause::Value(_) => FillStrategy::Null, + FillClause::Previous => FillStrategy::PrevNullAsMissing, + FillClause::Linear => { + return Err(DataFusionError::NotImplemented(fill_option.to_string())) + } + + FillClause::None => unreachable!(), + }; + + build_gap_fill_node(plan, time_column_index, args, fill_strategy)? + } else { + plan + }; + // Combine the aggregate columns and group by expressions, which represents // the final projection from the aggregate operator. let aggr_projection_exprs = [aggr_group_by_exprs, aggr_exprs].concat(); @@ -486,11 +529,29 @@ impl<'a> InfluxQLToLogicalPlan<'a> { .map(|expr| expr_as_column_expr(expr, &plan)) .collect::>>()?; + // Create a literal expression for `value` if the strategy + // is `FILL()` + let fill_if_null = match fill_option { + FillClause::Value(v) => Some(v), + _ => None, + }; + // Rewrite the aggregate columns from the projection, so that the expressions // refer to the columns from the aggregate projection let select_exprs_post_aggr = select_exprs .iter() - .map(|expr| rebase_expr(expr, &aggr_projection_exprs, &plan)) + .zip(fields) + .map(|(expr, f)| { + // This implements the `FILL()` strategy, by coalescing any aggregate + // expressions to `` when they are `NULL`. + let fill_if_null = if fill_if_null.is_some() && is_aggregate_field(f) { + fill_if_null + } else { + None + }; + + rebase_expr(expr, &aggr_projection_exprs, &fill_if_null, &plan) + }) .collect::>>()?; // Strip the NULL columns, which are tags that do not exist in the aggregate @@ -682,14 +743,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // or binning the time. if let Some(group_by) = ctx.group_by { if let Some(dim) = group_by.time_dimension() { - // Not supported until date_bin_gapfill is complete - let fill = ctx.fill.unwrap_or_default(); - if fill != FillClause::None { - return Err(DataFusionError::NotImplemented(format!( - "{fill}" - ))); - } - let stride = expr_to_df_interval_dt(&dim.interval)?; let offset = if let Some(offset) = &dim.offset { duration_expr_to_nanoseconds(offset)? @@ -983,6 +1036,94 @@ impl<'a> InfluxQLToLogicalPlan<'a> { } } +/// Returns a [`LogicalPlan`] that performs gap-filling for the `input` plan. +/// +/// # Arguments +/// +/// * `input` - An aggregate plan which requires gap-filling. +/// * `date_bin_index` - The index of the field in the input schema that refers to the `date_bin` expression. +/// * `date_bin_args` - The list of arguments passed to the `date_bin` function, used to configure the gap-fill parameters. +/// * `fill_strategy` - The strategy used to fill gaps in the data. +fn build_gap_fill_node( + input: LogicalPlan, + date_bin_index: usize, + date_bin_args: Vec, + fill_strategy: FillStrategy, +) -> Result { + // Extract the gap-fill parameters from the arguments to the `DATE_BIN` function. + // Any unexpected conditions represents an internal error, as the `DATE_BIN` function is + // added by the planner. + let (stride, time_range, origin) = if date_bin_args.len() == 3 { + let time_col = date_bin_args[1].try_into_col().map_err(|_| { + DataFusionError::Internal( + "DATE_BIN requires a column as the source argument".to_string(), + ) + })?; + + // Ensure that a time range was specified and is valid for gap filling + let time_range = match find_time_range(input.inputs()[0], &time_col)? { + // Follow the InfluxQL behaviour to use an upper bound of `now` when + // not found: + // + // See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L172-L176 + Range { + start, + end: Bound::Unbounded, + } => Range { + start, + end: Bound::Excluded(now()), + }, + time_range => time_range, + }; + + ( + date_bin_args[0].clone(), + time_range, + date_bin_args[2].clone(), + ) + } else { + // This is an internal error as the date_bin function is added by the planner and should + // always contain the correct number of arguments. + return Err(DataFusionError::Internal(format!( + "DATE_BIN expects 3 arguments, got {}", + date_bin_args.len() + ))); + }; + + let aggr = Aggregate::try_from_plan(&input)?; + let mut new_group_expr: Vec<_> = aggr + .schema + .fields() + .iter() + .map(|f| Expr::Column(f.qualified_column())) + .collect(); + let aggr_expr = new_group_expr.split_off(aggr.group_expr.len()); + + // The fill strategy for InfluxQL is specified at the query level + let fill_strategy = aggr_expr + .iter() + .cloned() + .map(|e| (e, fill_strategy.clone())) + .collect(); + + let time_column = col(input.schema().fields()[date_bin_index].qualified_column()); + + Ok(LogicalPlan::Extension(Extension { + node: Arc::new(GapFill::try_new( + Arc::new(input), + new_group_expr, + aggr_expr, + GapFillParams { + stride, + time_column, + origin, + time_range, + fill_strategy, + }, + )?), + })) +} + /// Adds [`InfluxQlMetadata`] to the `plan`. /// /// **Note** @@ -1025,13 +1166,18 @@ fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result< /// Returns `true` if any expressions refer to an aggregate function. fn has_aggregate_exprs(fields: &FieldList) -> bool { - fields.iter().any(|f| { - walk_expr(&f.expr, &mut |e| match e { - IQLExpr::Call { name, .. } if is_aggregate_function(name) => ControlFlow::Break(()), - _ => ControlFlow::Continue(()), - }) - .is_break() + fields.iter().any(is_aggregate_field) +} + +/// A utility function that checks whether `f` is an +/// aggregate field or not. An aggregate field is one that contains at least one +/// call to an aggregate function. +fn is_aggregate_field(f: &Field) -> bool { + walk_expr(&f.expr, &mut |e| match e { + IQLExpr::Call { name, .. } if is_aggregate_function(name) => ControlFlow::Break(()), + _ => ControlFlow::Continue(()), }) + .is_break() } /// Find all the columns where the resolved data type @@ -1762,105 +1908,191 @@ mod test { mod select_aggregate { use super::*; - #[test] - fn test_single_measurement() { - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data"), @r###" + mod single_measurement { + use super::*; + + #[test] + fn no_group_by() { + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N] Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] "###); - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY non_existent"), @r###" + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY non_existent"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), non_existent:Null;N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, NULL AS non_existent, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), non_existent:Null;N, count:Int64;N] Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] "###); - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo"), @r###" + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo"), @r###" Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N] Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] "###); - // The `COUNT(f64_field)` aggregate is only projected ones in the Aggregate and reused in the projection - assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) + COUNT(f64_field), COUNT(f64_field) * 3 FROM data"), @r###" + // The `COUNT(f64_field)` aggregate is only projected ones in the Aggregate and reused in the projection + assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) + COUNT(f64_field), COUNT(f64_field) * 3 FROM data"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field_count_f64_field:Int64;N, count_f64_field:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count, COUNT(data.f64_field) + COUNT(data.f64_field) AS count_f64_field_count_f64_field, COUNT(data.f64_field) * Int64(3) AS count_f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field_count_f64_field:Int64;N, count_f64_field:Int64;N] Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] "###); - // non-existent tags are excluded from the Aggregate groupBy and Sort operators - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo, non_existent"), @r###" + // non-existent tags are excluded from the Aggregate groupBy and Sort operators + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo, non_existent"), @r###" Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, non_existent:Null;N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, NULL AS non_existent, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, non_existent:Null;N, count:Int64;N] Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] "###); - // Fallible + // Aggregate expression is projected once and reused in final projection + assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) * 2 FROM data"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count, COUNT(data.f64_field) * Int64(2) AS count_f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field:Int64;N] + Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); - // Cannot combine aggregate and non-aggregate columns in the projection - assert_snapshot!(plan("SELECT COUNT(f64_field), f64_field FROM data"), @"Error during planning: mixing aggregate and non-aggregate columns is not supported"); - assert_snapshot!(plan("SELECT COUNT(f64_field) + f64_field FROM data"), @"Error during planning: mixing aggregate and non-aggregate columns is not supported"); - } + // Fallible - #[test] - fn test_single_measurement_group_by_time() { - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(none)"), @r###" + // Cannot combine aggregate and non-aggregate columns in the projection + assert_snapshot!(plan("SELECT COUNT(f64_field), f64_field FROM data"), @"Error during planning: mixing aggregate and non-aggregate columns is not supported"); + assert_snapshot!(plan("SELECT COUNT(f64_field) + f64_field FROM data"), @"Error during planning: mixing aggregate and non-aggregate columns is not supported"); + } + + #[test] + fn group_by_time() { + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(none)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] "###); - // supports offset parameter - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s, 5s) FILL(none)"), @r###" + // supports offset parameter + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s, 5s) FILL(none)"), @r###" Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(5000000000, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] "###); - } - - /// These tests validate the planner returns an error when using features that - /// are not implemented. - mod not_implemented { - use super::*; - - #[test] - fn test_with_limit_or_offset() { - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data LIMIT 1"), @r###" - Limit: skip=0, fetch=1 [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N] - Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N] - Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N] - Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N] - TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] - "###); - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data OFFSET 1"), @r###" - Limit: skip=1, fetch=None [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N] - Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N] - Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N] - Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N] - TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] - "###); } #[test] - fn test_group_by_time_precision() { - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10u) FILL(none)"), @"This feature is not implemented: interval limited to a precision of milliseconds. See https://github.com/influxdata/influxdb_iox/issues/7204"); - } + fn group_by_time_gapfill() { + // No time bounds + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); + + // No lower time bounds + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time < '2022-10-31T02:02:00Z' GROUP BY TIME(10s)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(TimestampNanosecond(1667181720000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Filter: data.time < TimestampNanosecond(1667181720000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); + + // No upper time bounds + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time >= '2022-10-31T02:00:00Z' GROUP BY TIME(10s)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Included(TimestampNanosecond(1667181600000000000, None))..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Filter: data.time >= TimestampNanosecond(1667181600000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); - #[test] - fn test_single_measurement_group_by_time_gapfill() { // Default is FILL(null) - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s)"), @"This feature is not implemented: FILL(NULL)"); - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(null)"), @"This feature is not implemented: FILL(NULL)"); - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(linear)"), @"This feature is not implemented: FILL(LINEAR)"); - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(previous)"), @"This feature is not implemented: FILL(PREVIOUS)"); - assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(0)"), @"This feature is not implemented: FILL(0)"); + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(10s)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Included(TimestampNanosecond(1667181600000000000, None))..Excluded(TimestampNanosecond(1667181720000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Filter: data.time >= TimestampNanosecond(1667181600000000000, None) AND data.time < TimestampNanosecond(1667181720000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(null)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(previous)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + GapFill: groupBy=[[time]], aggr=[[LOCF(null-as-missing, COUNT(data.f64_field))]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(0)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce(COUNT(data.f64_field), Int64(0)) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); + + // Coalesces the fill value, which is a float, to the matching type of a `COUNT` aggregate. + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce(COUNT(data.f64_field), Int64(3)) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N] + GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); + + // Aggregates as part of a binary expression + assert_snapshot!(plan("SELECT COUNT(f64_field) + MEAN(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_f64_field_mean_f64_field:Float64;N] + Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce(COUNT(data.f64_field), Int64(3)) + coalesce(AVG(data.f64_field), Float64(3.2)) AS count_f64_field_mean_f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_f64_field_mean_f64_field:Float64;N] + GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N] + Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N] + TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N] + "###); + } + + /// These tests validate the planner returns an error when using features that + /// are not implemented or supported. + mod not_implemented { + use super::*; + + /// Tracked by + #[test] + fn with_limit_or_offset() { + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo LIMIT 1"), @"This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause"); + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo OFFSET 1"), @"This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause"); + } + + /// Tracked by + #[test] + fn group_by_time_precision() { + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10u) FILL(none)"), @"This feature is not implemented: interval limited to a precision of milliseconds. See https://github.com/influxdata/influxdb_iox/issues/7204"); + } + + /// Tracked by + #[test] + fn group_by_time_gapfill() { + assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(linear)"), @"This feature is not implemented: FILL(LINEAR)"); + } } } } diff --git a/iox_query_influxql/src/plan/util.rs b/iox_query_influxql/src/plan/util.rs index d2d1871a03..a7bcbe1808 100644 --- a/iox_query_influxql/src/plan/util.rs +++ b/iox_query_influxql/src/plan/util.rs @@ -1,6 +1,10 @@ +use crate::plan::util_copy; +use arrow::datatypes::DataType; use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result}; -use datafusion::logical_expr::Operator; +use datafusion::logical_expr::utils::expr_as_column_expr; +use datafusion::logical_expr::{coalesce, lit, Expr, ExprSchemable, LogicalPlan, Operator}; use influxdb_influxql_parser::expression::BinaryOperator; +use influxdb_influxql_parser::literal::Number; use influxdb_influxql_parser::string::Regex; use query_functions::clean_non_meta_escapes; use schema::Schema; @@ -51,3 +55,65 @@ pub(crate) fn parse_regex(re: &Regex) -> Result { DataFusionError::External(format!("invalid regular expression '{re}': {e}").into()) }) } + +/// Returns `n` as a literal expression of the specified `data_type`. +fn number_to_expr(n: &Number, data_type: DataType) -> Result { + Ok(match (n, data_type) { + (Number::Integer(v), DataType::Int64) => lit(*v), + (Number::Integer(v), DataType::Float64) => lit(*v as f64), + (Number::Integer(v), DataType::UInt64) => lit(*v as u64), + (Number::Float(v), DataType::Int64) => lit(*v as i64), + (Number::Float(v), DataType::Float64) => lit(*v), + (Number::Float(v), DataType::UInt64) => lit(*v as u64), + (n, data_type) => { + // The only output data types expected are Int64, Float64 or UInt64 + return Err(DataFusionError::Internal(format!( + "no conversion from {n} to {data_type}" + ))); + } + }) +} + +/// Rebuilds an `Expr` as a projection on top of a collection of `Expr`'s. +/// +/// For example, the expression `a + b < 1` would require, as input, the 2 +/// individual columns, `a` and `b`. But, if the base expressions already +/// contain the `a + b` result, then that may be used in lieu of the `a` and +/// `b` columns. +/// +/// This is useful in the context of a query like: +/// +/// SELECT a + b < 1 ... GROUP BY a + b +/// +/// where post-aggregation, `a + b` need not be a projection against the +/// individual columns `a` and `b`, but rather it is a projection against the +/// `a + b` found in the GROUP BY. +/// +/// `fill_if_null` will be used to coalesce any expressions from `NULL`. +/// This is used with the `FILL()` strategy. +pub(crate) fn rebase_expr( + expr: &Expr, + base_exprs: &[Expr], + fill_if_null: &Option, + plan: &LogicalPlan, +) -> Result { + if let Some(value) = fill_if_null { + util_copy::clone_with_replacement(expr, &|nested_expr| { + Ok(if base_exprs.contains(nested_expr) { + let col_expr = expr_as_column_expr(nested_expr, plan)?; + let data_type = col_expr.get_type(plan.schema())?; + Some(coalesce(vec![col_expr, number_to_expr(value, data_type)?])) + } else { + None + }) + }) + } else { + util_copy::clone_with_replacement(expr, &|nested_expr| { + Ok(if base_exprs.contains(nested_expr) { + Some(expr_as_column_expr(nested_expr, plan)?) + } else { + None + }) + }) + } +} diff --git a/iox_query_influxql/src/plan/util_copy.rs b/iox_query_influxql/src/plan/util_copy.rs index 94b8bfd2a1..d420ca7caa 100644 --- a/iox_query_influxql/src/plan/util_copy.rs +++ b/iox_query_influxql/src/plan/util_copy.rs @@ -17,32 +17,6 @@ use datafusion::logical_expr::{ LogicalPlan, }; -/// Rebuilds an `Expr` as a projection on top of a collection of `Expr`'s. -/// -/// For example, the expression `a + b < 1` would require, as input, the 2 -/// individual columns, `a` and `b`. But, if the base expressions already -/// contain the `a + b` result, then that may be used in lieu of the `a` and -/// `b` columns. -/// -/// This is useful in the context of a query like: -/// -/// SELECT a + b < 1 ... GROUP BY a + b -/// -/// where post-aggregation, `a + b` need not be a projection against the -/// individual columns `a` and `b`, but rather it is a projection against the -/// `a + b` found in the GROUP BY. -/// -/// Source: -pub(crate) fn rebase_expr(expr: &Expr, base_exprs: &[Expr], plan: &LogicalPlan) -> Result { - clone_with_replacement(expr, &|nested_expr| { - if base_exprs.contains(nested_expr) { - Ok(Some(expr_as_column_expr(nested_expr, plan)?)) - } else { - Ok(None) - } - }) -} - /// Returns a cloned `Expr`, but any of the `Expr`'s in the tree may be /// replaced/customized by the replacement function. /// @@ -62,7 +36,7 @@ pub(crate) fn rebase_expr(expr: &Expr, base_exprs: &[Expr], plan: &LogicalPlan) /// `clone_with_replacement()`. /// /// Source: -fn clone_with_replacement(expr: &Expr, replacement_fn: &F) -> Result +pub(super) fn clone_with_replacement(expr: &Expr, replacement_fn: &F) -> Result where F: Fn(&Expr) -> Result>, {