feat: Teach InfluxQL how to process `FILL(null|previous|<value>)` (#7359)

* chore: Publicise gap-filling APIs

Helps #6916

* feat: IOx learns `FILL(null|previous|<value>)`

Helps #6916

* chore: More test cases

* chore: Revert change to TreeNodeVisitor

* chore: Update snapshot with expected gap-filling changes
pull/24376/head
Stuart Carnie 2023-03-30 10:11:20 +11:00 committed by GitHub
parent f41c1a7945
commit 19a0c7fe9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 807 additions and 173 deletions

View File

@ -289,7 +289,10 @@ SELECT usage_idle, bytes_free, device, cpu FROM cpu, disk GROUP BY device, cpu;
-- Aggregate queries
--
SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0;
SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1;
SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0;
SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1 GROUP BY tag0;
SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0, non_existent;
SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY non_existent;
SELECT COUNT(f64), COUNT(f64) + COUNT(f64), COUNT(f64) * 3 FROM m0;
@ -298,6 +301,17 @@ SELECT COUNT(f64) as the_count, SUM(non_existent) as foo FROM m0;
-- non-existent columns in an aggregate expression should evaluate to NULL
SELECT COUNT(f64) as the_count, SUM(f64) + SUM(non_existent) as foo FROM m0;
-- measurements with different schema
SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk;
SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk GROUP BY TIME(10s) FILL(none);
-- TODO(sgc): The following two queries produce incorrect results
-- See: https://github.com/influxdata/influxdb_iox/issues/7361
-- using aggregates across measurements
-- SELECT MEAN(usage_idle) + MEAN(bytes_free) FROM cpu, disk;
-- using aggregates with missing fields
-- SELECT MEAN(usage_idle) + MEAN(foo) FROM cpu;
SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s) FILL(none);
-- supports offset parameter
SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s, 1s) FILL(none);
@ -317,12 +331,43 @@ SELECT SUM(usage_idle) FROM cpu, disk GROUP BY cpu;
SELECT COUNT(usage_idle) + usage_idle FROM cpu;
SELECT COUNT(usage_idle), usage_idle FROM cpu;
--
-- gap filling via FILL clause
--
-- FILL(null) is the default when the FILL clause is omitted
SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s);
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s);
SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(null);
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(null);
SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(previous);
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(previous);
SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14);
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14);
-- the FILL(<value>) literal is cast to the data type of each aggregate (e.g. 3.14 fills as 3 for integer results and 3.14 for float results)
SELECT COUNT(usage_idle), SUM(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14);
SELECT COUNT(bytes_free), SUM(bytes_free) FROM disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14);
-- can combine multiple aggregates into a binary expression
SELECT COUNT(bytes_free) as a, SUM(bytes_free) as b, COUNT(bytes_free) + SUM(bytes_free) as "a+b" FROM disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(5);
-- grouping by tags
SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu;
SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(null);
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(null);
SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(previous);
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(previous);
SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(3.14);
SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(3.14);
-- Succeeds without an upper time bound in the WHERE clause
-- NOTE: expected to return no data, as there is none within the time range
SELECT COUNT(usage_idle) FROM cpu WHERE time >= now() - 2m GROUP BY TIME(30s) FILL(null);
-- Unimplemented cases
-- TODO(sgc): No gap filling
-- Default FILL(null) when FILL is omitted
SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s);
SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s) FILL(previous);
SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(linear);
-- LIMIT and OFFSET aren't supported with aggregates and groups
SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu LIMIT 1;

View File

@ -417,12 +417,12 @@ Error while planning query: Error during planning: invalid number of arguments f
| cpu | 2022-10-31T02:00:10Z | 2.99 | |
| cpu | 2022-10-31T02:00:10Z | 0.99 | |
| cpu | 2022-10-31T02:00:10Z | 1.99 | |
| disk | 2022-10-31T02:00:00Z | | 219838.0 |
| disk | 2022-10-31T02:00:00Z | | 319838.0 |
| disk | 2022-10-31T02:00:00Z | | 419838.0 |
| disk | 2022-10-31T02:00:10Z | | 219833.0 |
| disk | 2022-10-31T02:00:10Z | | 319833.0 |
| disk | 2022-10-31T02:00:10Z | | 419833.0 |
| disk | 2022-10-31T02:00:00Z | | 219838 |
| disk | 2022-10-31T02:00:00Z | | 319838 |
| disk | 2022-10-31T02:00:00Z | | 419838 |
| disk | 2022-10-31T02:00:10Z | | 219833 |
| disk | 2022-10-31T02:00:10Z | | 319833 |
| disk | 2022-10-31T02:00:10Z | | 419833 |
+------------------+----------------------+------------+------------+
-- InfluxQL: SELECT cpu, usage_idle FROM cpu;
+------------------+----------------------+-----------+------------+
@ -489,12 +489,12 @@ Error while planning query: Error during planning: invalid number of arguments f
| cpu | 2022-10-31T02:00:10Z | cpu0 | 0.99 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | 1.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu1 | 1.99 | |
| disk | 2022-10-31T02:00:00Z | | | 1234.0 |
| disk | 2022-10-31T02:00:00Z | | | 2234.0 |
| disk | 2022-10-31T02:00:00Z | | | 3234.0 |
| disk | 2022-10-31T02:00:10Z | | | 1239.0 |
| disk | 2022-10-31T02:00:10Z | | | 2239.0 |
| disk | 2022-10-31T02:00:10Z | | | 3239.0 |
| disk | 2022-10-31T02:00:00Z | | | 1234 |
| disk | 2022-10-31T02:00:00Z | | | 2234 |
| disk | 2022-10-31T02:00:00Z | | | 3234 |
| disk | 2022-10-31T02:00:10Z | | | 1239 |
| disk | 2022-10-31T02:00:10Z | | | 2239 |
| disk | 2022-10-31T02:00:10Z | | | 3239 |
+------------------+----------------------+-----------+------------+------------+
-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, non_existent;
+------------------+----------------------+-----------+--------------+------------+------------+
@ -506,12 +506,12 @@ Error while planning query: Error during planning: invalid number of arguments f
| cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 | |
| disk | 2022-10-31T02:00:00Z | | | | 1234.0 |
| disk | 2022-10-31T02:00:00Z | | | | 2234.0 |
| disk | 2022-10-31T02:00:00Z | | | | 3234.0 |
| disk | 2022-10-31T02:00:10Z | | | | 1239.0 |
| disk | 2022-10-31T02:00:10Z | | | | 2239.0 |
| disk | 2022-10-31T02:00:10Z | | | | 3239.0 |
| disk | 2022-10-31T02:00:00Z | | | | 1234 |
| disk | 2022-10-31T02:00:00Z | | | | 2234 |
| disk | 2022-10-31T02:00:00Z | | | | 3234 |
| disk | 2022-10-31T02:00:10Z | | | | 1239 |
| disk | 2022-10-31T02:00:10Z | | | | 2239 |
| disk | 2022-10-31T02:00:10Z | | | | 3239 |
+------------------+----------------------+-----------+--------------+------------+------------+
-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY cpu, device;
+------------------+----------------------+-----------+---------+------------+------------+
@ -523,12 +523,12 @@ Error while planning query: Error during planning: invalid number of arguments f
| cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 | |
| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234.0 |
| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239.0 |
| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2234.0 |
| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239.0 |
| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234.0 |
| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239.0 |
| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234 |
| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239 |
| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2234 |
| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239 |
| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234 |
| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239 |
+------------------+----------------------+-----------+---------+------------+------------+
-- InfluxQL: SELECT usage_idle, bytes_free FROM cpu, disk GROUP BY device, cpu;
+------------------+----------------------+-----------+---------+------------+------------+
@ -540,12 +540,12 @@ Error while planning query: Error during planning: invalid number of arguments f
| cpu | 2022-10-31T02:00:10Z | cpu0 | | 0.99 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | | 1.98 | |
| cpu | 2022-10-31T02:00:10Z | cpu1 | | 1.99 | |
| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234.0 |
| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239.0 |
| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2234.0 |
| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239.0 |
| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234.0 |
| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239.0 |
| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 1234 |
| disk | 2022-10-31T02:00:10Z | | disk1s1 | | 1239 |
| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2234 |
| disk | 2022-10-31T02:00:10Z | | disk1s2 | | 2239 |
| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 3234 |
| disk | 2022-10-31T02:00:10Z | | disk1s5 | | 3239 |
+------------------+----------------------+-----------+---------+------------+------------+
-- InfluxQL: SELECT usage_idle, bytes_free, device, cpu FROM cpu, disk GROUP BY device, cpu;
+------------------+----------------------+------------+------------+---------+-----------+
@ -557,13 +557,26 @@ Error while planning query: Error during planning: invalid number of arguments f
| cpu | 2022-10-31T02:00:10Z | 0.99 | | | cpu0 |
| cpu | 2022-10-31T02:00:00Z | 1.98 | | | cpu1 |
| cpu | 2022-10-31T02:00:10Z | 1.99 | | | cpu1 |
| disk | 2022-10-31T02:00:00Z | | 1234.0 | disk1s1 | |
| disk | 2022-10-31T02:00:10Z | | 1239.0 | disk1s1 | |
| disk | 2022-10-31T02:00:00Z | | 2234.0 | disk1s2 | |
| disk | 2022-10-31T02:00:10Z | | 2239.0 | disk1s2 | |
| disk | 2022-10-31T02:00:00Z | | 3234.0 | disk1s5 | |
| disk | 2022-10-31T02:00:10Z | | 3239.0 | disk1s5 | |
| disk | 2022-10-31T02:00:00Z | | 1234 | disk1s1 | |
| disk | 2022-10-31T02:00:10Z | | 1239 | disk1s1 | |
| disk | 2022-10-31T02:00:00Z | | 2234 | disk1s2 | |
| disk | 2022-10-31T02:00:10Z | | 2239 | disk1s2 | |
| disk | 2022-10-31T02:00:00Z | | 3234 | disk1s5 | |
| disk | 2022-10-31T02:00:10Z | | 3239 | disk1s5 | |
+------------------+----------------------+------------+------------+---------+-----------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0;
+------------------+----------------------+-------+--------------------+--------------------+
| iox::measurement | time | count | sum | stddev |
+------------------+----------------------+-------+--------------------+--------------------+
| m0 | 1970-01-01T00:00:00Z | 7 | 102.30000000000001 | 4.8912945019454614 |
+------------------+----------------------+-------+--------------------+--------------------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1;
+------------------+----------------------+-------+--------------------+--------------------+
| iox::measurement | time | count | sum | stddev |
+------------------+----------------------+-------+--------------------+--------------------+
| m0 | 1970-01-01T00:00:00Z | 7 | 102.30000000000001 | 4.8912945019454614 |
| m1 | 1970-01-01T00:00:00Z | 3 | 402.8 | 57.4494850571642 |
+------------------+----------------------+-------+--------------------+--------------------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0;
+------------------+----------------------+-------+-------+------+-------------------+
| iox::measurement | time | tag0 | count | sum | stddev |
@ -572,6 +585,16 @@ Error while planning query: Error during planning: invalid number of arguments f
| m0 | 1970-01-01T00:00:00Z | val01 | 1 | 11.3 | |
| m0 | 1970-01-01T00:00:00Z | val02 | 1 | 10.4 | |
+------------------+----------------------+-------+-------+------+-------------------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0, m1 GROUP BY tag0;
+------------------+----------------------+-------+-------+-------+-------------------+
| iox::measurement | time | tag0 | count | sum | stddev |
+------------------+----------------------+-------+-------+-------+-------------------+
| m0 | 1970-01-01T00:00:00Z | val00 | 5 | 80.6 | 5.085961069453836 |
| m0 | 1970-01-01T00:00:00Z | val01 | 1 | 11.3 | |
| m0 | 1970-01-01T00:00:00Z | val02 | 1 | 10.4 | |
| m1 | 1970-01-01T00:00:00Z | val00 | 2 | 301.1 | 70.7813887967734 |
| m1 | 1970-01-01T00:00:00Z | val01 | 1 | 101.7 | |
+------------------+----------------------+-------+-------+-------+-------------------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64), stddev(f64) FROM m0 GROUP BY tag0, non_existent;
+------------------+----------------------+--------------+-------+-------+------+-------------------+
| iox::measurement | time | non_existent | tag0 | count | sum | stddev |
@ -604,6 +627,22 @@ Error while planning query: Error during planning: invalid number of arguments f
+------------------+----------------------+-----------+-----+
| m0 | 1970-01-01T00:00:00Z | 7 | |
+------------------+----------------------+-----------+-----+
-- InfluxQL: SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk;
+------------------+----------------------+--------------------+--------+
| iox::measurement | time | mean | mean_1 |
+------------------+----------------------+--------------------+--------+
| cpu | 1970-01-01T00:00:00Z | 1.9850000000000003 | |
| disk | 1970-01-01T00:00:00Z | | 2236.5 |
+------------------+----------------------+--------------------+--------+
-- InfluxQL: SELECT MEAN(usage_idle), MEAN(bytes_free) FROM cpu, disk GROUP BY TIME(10s) FILL(none);
+------------------+----------------------+--------------------+--------+
| iox::measurement | time | mean | mean_1 |
+------------------+----------------------+--------------------+--------+
| cpu | 2022-10-31T02:00:00Z | 1.9799999999999998 | |
| cpu | 2022-10-31T02:00:10Z | 1.9900000000000002 | |
| disk | 2022-10-31T02:00:00Z | | 2234.0 |
| disk | 2022-10-31T02:00:10Z | | 2239.0 |
+------------------+----------------------+--------------------+--------+
-- InfluxQL: SELECT COUNT(f64), SUM(f64) FROM m0 GROUP BY TIME(30s) FILL(none);
+------------------+----------------------+-------+------+
| iox::measurement | time | count | sum |
@ -668,10 +707,281 @@ Error while planning query: Error during planning: invalid number of arguments f
Error while planning query: Error during planning: mixing aggregate and non-aggregate columns is not supported
-- InfluxQL: SELECT COUNT(usage_idle), usage_idle FROM cpu;
Error while planning query: Error during planning: mixing aggregate and non-aggregate columns is not supported
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s);
Error while planning query: This feature is not implemented: FILL(NULL)
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY TIME(30s) FILL(previous);
Error while planning query: This feature is not implemented: FILL(PREVIOUS)
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s);
+------------------+----------------------+-------+
| iox::measurement | time | count |
+------------------+----------------------+-------+
| cpu | 2022-10-31T02:00:00Z | 6 |
| cpu | 2022-10-31T02:00:30Z | |
| cpu | 2022-10-31T02:01:00Z | |
| cpu | 2022-10-31T02:01:30Z | |
+------------------+----------------------+-------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s);
+------------------+----------------------+-------+---------+
| iox::measurement | time | count | count_1 |
+------------------+----------------------+-------+---------+
| cpu | 2022-10-31T02:00:00Z | 6 | |
| cpu | 2022-10-31T02:00:30Z | | |
| cpu | 2022-10-31T02:01:00Z | | |
| cpu | 2022-10-31T02:01:30Z | | |
| disk | 2022-10-31T02:00:00Z | | 6 |
| disk | 2022-10-31T02:00:30Z | | |
| disk | 2022-10-31T02:01:00Z | | |
| disk | 2022-10-31T02:01:30Z | | |
+------------------+----------------------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(null);
+------------------+----------------------+-------+
| iox::measurement | time | count |
+------------------+----------------------+-------+
| cpu | 2022-10-31T02:00:00Z | 6 |
| cpu | 2022-10-31T02:00:30Z | |
| cpu | 2022-10-31T02:01:00Z | |
| cpu | 2022-10-31T02:01:30Z | |
+------------------+----------------------+-------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(null);
+------------------+----------------------+-------+---------+
| iox::measurement | time | count | count_1 |
+------------------+----------------------+-------+---------+
| cpu | 2022-10-31T02:00:00Z | 6 | |
| cpu | 2022-10-31T02:00:30Z | | |
| cpu | 2022-10-31T02:01:00Z | | |
| cpu | 2022-10-31T02:01:30Z | | |
| disk | 2022-10-31T02:00:00Z | | 6 |
| disk | 2022-10-31T02:00:30Z | | |
| disk | 2022-10-31T02:01:00Z | | |
| disk | 2022-10-31T02:01:30Z | | |
+------------------+----------------------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(previous);
+------------------+----------------------+-------+
| iox::measurement | time | count |
+------------------+----------------------+-------+
| cpu | 2022-10-31T02:00:00Z | 6 |
| cpu | 2022-10-31T02:00:30Z | 6 |
| cpu | 2022-10-31T02:01:00Z | 6 |
| cpu | 2022-10-31T02:01:30Z | 6 |
+------------------+----------------------+-------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(previous);
+------------------+----------------------+-------+---------+
| iox::measurement | time | count | count_1 |
+------------------+----------------------+-------+---------+
| cpu | 2022-10-31T02:00:00Z | 6 | |
| cpu | 2022-10-31T02:00:30Z | 6 | |
| cpu | 2022-10-31T02:01:00Z | 6 | |
| cpu | 2022-10-31T02:01:30Z | 6 | |
| disk | 2022-10-31T02:00:00Z | | 6 |
| disk | 2022-10-31T02:00:30Z | | 6 |
| disk | 2022-10-31T02:01:00Z | | 6 |
| disk | 2022-10-31T02:01:30Z | | 6 |
+------------------+----------------------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14);
+------------------+----------------------+-------+
| iox::measurement | time | count |
+------------------+----------------------+-------+
| cpu | 2022-10-31T02:00:00Z | 6 |
| cpu | 2022-10-31T02:00:30Z | 3 |
| cpu | 2022-10-31T02:01:00Z | 3 |
| cpu | 2022-10-31T02:01:30Z | 3 |
+------------------+----------------------+-------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14);
+------------------+----------------------+-------+---------+
| iox::measurement | time | count | count_1 |
+------------------+----------------------+-------+---------+
| cpu | 2022-10-31T02:00:00Z | 6 | |
| cpu | 2022-10-31T02:00:30Z | 3 | |
| cpu | 2022-10-31T02:01:00Z | 3 | |
| cpu | 2022-10-31T02:01:30Z | 3 | |
| disk | 2022-10-31T02:00:00Z | | 6 |
| disk | 2022-10-31T02:00:30Z | | 3 |
| disk | 2022-10-31T02:01:00Z | | 3 |
| disk | 2022-10-31T02:01:30Z | | 3 |
+------------------+----------------------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle), SUM(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14);
+------------------+----------------------+-------+--------------------+
| iox::measurement | time | count | sum |
+------------------+----------------------+-------+--------------------+
| cpu | 2022-10-31T02:00:00Z | 6 | 11.910000000000002 |
| cpu | 2022-10-31T02:00:30Z | 3 | 3.14 |
| cpu | 2022-10-31T02:01:00Z | 3 | 3.14 |
| cpu | 2022-10-31T02:01:30Z | 3 | 3.14 |
+------------------+----------------------+-------+--------------------+
-- InfluxQL: SELECT COUNT(bytes_free), SUM(bytes_free) FROM disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(3.14);
+------------------+----------------------+-------+-------+
| iox::measurement | time | count | sum |
+------------------+----------------------+-------+-------+
| disk | 2022-10-31T02:00:00Z | 6 | 13419 |
| disk | 2022-10-31T02:00:30Z | 3 | 3 |
| disk | 2022-10-31T02:01:00Z | 3 | 3 |
| disk | 2022-10-31T02:01:30Z | 3 | 3 |
+------------------+----------------------+-------+-------+
-- InfluxQL: SELECT COUNT(bytes_free) as a, SUM(bytes_free) as b, COUNT(bytes_free) + SUM(bytes_free) as "a+b" FROM disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(5);
+------------------+----------------------+---+-------+-------+
| iox::measurement | time | a | b | "a+b" |
+------------------+----------------------+---+-------+-------+
| disk | 2022-10-31T02:00:00Z | 6 | 13419 | 13425 |
| disk | 2022-10-31T02:00:30Z | 5 | 5 | 10 |
| disk | 2022-10-31T02:01:00Z | 5 | 5 | 10 |
| disk | 2022-10-31T02:01:30Z | 5 | 5 | 10 |
+------------------+----------------------+---+-------+-------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu;
+------------------+----------------------+-----------+-------+
| iox::measurement | time | cpu | count |
+------------------+----------------------+-----------+-------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu-total | |
| cpu | 2022-10-31T02:01:00Z | cpu-total | |
| cpu | 2022-10-31T02:01:30Z | cpu-total | |
| cpu | 2022-10-31T02:00:00Z | cpu0 | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu0 | |
| cpu | 2022-10-31T02:01:00Z | cpu0 | |
| cpu | 2022-10-31T02:01:30Z | cpu0 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu1 | |
| cpu | 2022-10-31T02:01:00Z | cpu1 | |
| cpu | 2022-10-31T02:01:30Z | cpu1 | |
+------------------+----------------------+-----------+-------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(null);
+------------------+----------------------+-----------+-------+
| iox::measurement | time | cpu | count |
+------------------+----------------------+-----------+-------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu-total | |
| cpu | 2022-10-31T02:01:00Z | cpu-total | |
| cpu | 2022-10-31T02:01:30Z | cpu-total | |
| cpu | 2022-10-31T02:00:00Z | cpu0 | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu0 | |
| cpu | 2022-10-31T02:01:00Z | cpu0 | |
| cpu | 2022-10-31T02:01:30Z | cpu0 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu1 | |
| cpu | 2022-10-31T02:01:00Z | cpu1 | |
| cpu | 2022-10-31T02:01:30Z | cpu1 | |
+------------------+----------------------+-----------+-------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(null);
+------------------+----------------------+-----------+---------+-------+---------+
| iox::measurement | time | cpu | device | count | count_1 |
+------------------+----------------------+-----------+---------+-------+---------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2 | |
| cpu | 2022-10-31T02:00:30Z | cpu-total | | | |
| cpu | 2022-10-31T02:01:00Z | cpu-total | | | |
| cpu | 2022-10-31T02:01:30Z | cpu-total | | | |
| cpu | 2022-10-31T02:00:00Z | cpu0 | | 2 | |
| cpu | 2022-10-31T02:00:30Z | cpu0 | | | |
| cpu | 2022-10-31T02:01:00Z | cpu0 | | | |
| cpu | 2022-10-31T02:01:30Z | cpu0 | | | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | | 2 | |
| cpu | 2022-10-31T02:00:30Z | cpu1 | | | |
| cpu | 2022-10-31T02:01:00Z | cpu1 | | | |
| cpu | 2022-10-31T02:01:30Z | cpu1 | | | |
| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 2 |
| disk | 2022-10-31T02:00:30Z | | disk1s1 | | |
| disk | 2022-10-31T02:01:00Z | | disk1s1 | | |
| disk | 2022-10-31T02:01:30Z | | disk1s1 | | |
| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2 |
| disk | 2022-10-31T02:00:30Z | | disk1s2 | | |
| disk | 2022-10-31T02:01:00Z | | disk1s2 | | |
| disk | 2022-10-31T02:01:30Z | | disk1s2 | | |
| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 2 |
| disk | 2022-10-31T02:00:30Z | | disk1s5 | | |
| disk | 2022-10-31T02:01:00Z | | disk1s5 | | |
| disk | 2022-10-31T02:01:30Z | | disk1s5 | | |
+------------------+----------------------+-----------+---------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(previous);
+------------------+----------------------+-----------+-------+
| iox::measurement | time | cpu | count |
+------------------+----------------------+-----------+-------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu-total | 2 |
| cpu | 2022-10-31T02:01:00Z | cpu-total | 2 |
| cpu | 2022-10-31T02:01:30Z | cpu-total | 2 |
| cpu | 2022-10-31T02:00:00Z | cpu0 | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu0 | 2 |
| cpu | 2022-10-31T02:01:00Z | cpu0 | 2 |
| cpu | 2022-10-31T02:01:30Z | cpu0 | 2 |
| cpu | 2022-10-31T02:00:00Z | cpu1 | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu1 | 2 |
| cpu | 2022-10-31T02:01:00Z | cpu1 | 2 |
| cpu | 2022-10-31T02:01:30Z | cpu1 | 2 |
+------------------+----------------------+-----------+-------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(previous);
+------------------+----------------------+-----------+---------+-------+---------+
| iox::measurement | time | cpu | device | count | count_1 |
+------------------+----------------------+-----------+---------+-------+---------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2 | |
| cpu | 2022-10-31T02:00:30Z | cpu-total | | 2 | |
| cpu | 2022-10-31T02:01:00Z | cpu-total | | 2 | |
| cpu | 2022-10-31T02:01:30Z | cpu-total | | 2 | |
| cpu | 2022-10-31T02:00:00Z | cpu0 | | 2 | |
| cpu | 2022-10-31T02:00:30Z | cpu0 | | 2 | |
| cpu | 2022-10-31T02:01:00Z | cpu0 | | 2 | |
| cpu | 2022-10-31T02:01:30Z | cpu0 | | 2 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | | 2 | |
| cpu | 2022-10-31T02:00:30Z | cpu1 | | 2 | |
| cpu | 2022-10-31T02:01:00Z | cpu1 | | 2 | |
| cpu | 2022-10-31T02:01:30Z | cpu1 | | 2 | |
| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 2 |
| disk | 2022-10-31T02:00:30Z | | disk1s1 | | 2 |
| disk | 2022-10-31T02:01:00Z | | disk1s1 | | 2 |
| disk | 2022-10-31T02:01:30Z | | disk1s1 | | 2 |
| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2 |
| disk | 2022-10-31T02:00:30Z | | disk1s2 | | 2 |
| disk | 2022-10-31T02:01:00Z | | disk1s2 | | 2 |
| disk | 2022-10-31T02:01:30Z | | disk1s2 | | 2 |
| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 2 |
| disk | 2022-10-31T02:00:30Z | | disk1s5 | | 2 |
| disk | 2022-10-31T02:01:00Z | | disk1s5 | | 2 |
| disk | 2022-10-31T02:01:30Z | | disk1s5 | | 2 |
+------------------+----------------------+-----------+---------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu FILL(3.14);
+------------------+----------------------+-----------+-------+
| iox::measurement | time | cpu | count |
+------------------+----------------------+-----------+-------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu-total | 3 |
| cpu | 2022-10-31T02:01:00Z | cpu-total | 3 |
| cpu | 2022-10-31T02:01:30Z | cpu-total | 3 |
| cpu | 2022-10-31T02:00:00Z | cpu0 | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu0 | 3 |
| cpu | 2022-10-31T02:01:00Z | cpu0 | 3 |
| cpu | 2022-10-31T02:01:30Z | cpu0 | 3 |
| cpu | 2022-10-31T02:00:00Z | cpu1 | 2 |
| cpu | 2022-10-31T02:00:30Z | cpu1 | 3 |
| cpu | 2022-10-31T02:01:00Z | cpu1 | 3 |
| cpu | 2022-10-31T02:01:30Z | cpu1 | 3 |
+------------------+----------------------+-----------+-------+
-- InfluxQL: SELECT COUNT(usage_idle), COUNT(bytes_free) FROM cpu, disk WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s), cpu, device FILL(3.14);
+------------------+----------------------+-----------+---------+-------+---------+
| iox::measurement | time | cpu | device | count | count_1 |
+------------------+----------------------+-----------+---------+-------+---------+
| cpu | 2022-10-31T02:00:00Z | cpu-total | | 2 | |
| cpu | 2022-10-31T02:00:30Z | cpu-total | | 3 | |
| cpu | 2022-10-31T02:01:00Z | cpu-total | | 3 | |
| cpu | 2022-10-31T02:01:30Z | cpu-total | | 3 | |
| cpu | 2022-10-31T02:00:00Z | cpu0 | | 2 | |
| cpu | 2022-10-31T02:00:30Z | cpu0 | | 3 | |
| cpu | 2022-10-31T02:01:00Z | cpu0 | | 3 | |
| cpu | 2022-10-31T02:01:30Z | cpu0 | | 3 | |
| cpu | 2022-10-31T02:00:00Z | cpu1 | | 2 | |
| cpu | 2022-10-31T02:00:30Z | cpu1 | | 3 | |
| cpu | 2022-10-31T02:01:00Z | cpu1 | | 3 | |
| cpu | 2022-10-31T02:01:30Z | cpu1 | | 3 | |
| disk | 2022-10-31T02:00:00Z | | disk1s1 | | 2 |
| disk | 2022-10-31T02:00:30Z | | disk1s1 | | 3 |
| disk | 2022-10-31T02:01:00Z | | disk1s1 | | 3 |
| disk | 2022-10-31T02:01:30Z | | disk1s1 | | 3 |
| disk | 2022-10-31T02:00:00Z | | disk1s2 | | 2 |
| disk | 2022-10-31T02:00:30Z | | disk1s2 | | 3 |
| disk | 2022-10-31T02:01:00Z | | disk1s2 | | 3 |
| disk | 2022-10-31T02:01:30Z | | disk1s2 | | 3 |
| disk | 2022-10-31T02:00:00Z | | disk1s5 | | 2 |
| disk | 2022-10-31T02:00:30Z | | disk1s5 | | 3 |
| disk | 2022-10-31T02:01:00Z | | disk1s5 | | 3 |
| disk | 2022-10-31T02:01:30Z | | disk1s5 | | 3 |
+------------------+----------------------+-----------+---------+-------+---------+
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= now() - 2m GROUP BY TIME(30s) FILL(null);
++
++
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(30s) FILL(linear);
Error while planning query: This feature is not implemented: FILL(LINEAR)
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu LIMIT 1;
Error while planning query: This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause
-- InfluxQL: SELECT COUNT(usage_idle) FROM cpu GROUP BY cpu OFFSET 1;

View File

@ -1240,12 +1240,12 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
cpu,host=host1,cpu=cpu0 usage_idle=0.99,usage_system=0.1 1667181610000000000
cpu,host=host1,cpu=cpu1 usage_idle=1.98,usage_system=1.2 1667181600000000000
cpu,host=host1,cpu=cpu1 usage_idle=1.99,usage_system=1.1 1667181610000000000
disk,host=host1,device=disk1s1 bytes_free=1234,bytes_used=219838 1667181600000000000
disk,host=host1,device=disk1s1 bytes_free=1239,bytes_used=219833 1667181610000000000
disk,host=host1,device=disk1s2 bytes_free=2234,bytes_used=319838 1667181600000000000
disk,host=host1,device=disk1s2 bytes_free=2239,bytes_used=319833 1667181610000000000
disk,host=host1,device=disk1s5 bytes_free=3234,bytes_used=419838 1667181600000000000
disk,host=host1,device=disk1s5 bytes_free=3239,bytes_used=419833 1667181610000000000
disk,host=host1,device=disk1s1 bytes_free=1234i,bytes_used=219838i 1667181600000000000
disk,host=host1,device=disk1s1 bytes_free=1239i,bytes_used=219833i 1667181610000000000
disk,host=host1,device=disk1s2 bytes_free=2234i,bytes_used=319838i 1667181600000000000
disk,host=host1,device=disk1s2 bytes_free=2239i,bytes_used=319833i 1667181610000000000
disk,host=host1,device=disk1s5 bytes_free=3234i,bytes_used=419838i 1667181600000000000
disk,host=host1,device=disk1s5 bytes_free=3239i,bytes_used=419833i 1667181610000000000
"#
.to_string(),
),

View File

@ -4,7 +4,7 @@
pub(crate) mod context;
pub mod field;
pub mod fieldlist;
pub(crate) mod gapfill;
pub mod gapfill;
mod non_null_checker;
mod query_tracing;
mod schema_pivot;

View File

@ -34,15 +34,20 @@ use self::stream::GapFillStream;
/// A logical node that represents the gap filling operation.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct GapFill {
input: Arc<LogicalPlan>,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,
params: GapFillParams,
/// The incoming logical plan
pub input: Arc<LogicalPlan>,
/// Grouping expressions
pub group_expr: Vec<Expr>,
/// Aggregate expressions
pub aggr_expr: Vec<Expr>,
/// Parameters to configure the behavior of the
/// gap-filling operation
pub params: GapFillParams,
}
/// Parameters to the GapFill operation
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub(crate) struct GapFillParams {
pub struct GapFillParams {
/// The stride argument from the call to DATE_BIN_GAPFILL
pub stride: Expr,
/// The source time column
@ -149,7 +154,8 @@ impl GapFillParams {
}
impl GapFill {
pub(crate) fn try_new(
/// Create a new gap-filling operator.
pub fn try_new(
input: Arc<LogicalPlan>,
group_expr: Vec<Expr>,
aggr_expr: Vec<Expr>,

View File

@ -1,7 +1,7 @@
//! An optimizer rule that transforms a plan
//! to fill gaps in time series data.
mod range_predicate;
pub mod range_predicate;
use crate::exec::gapfill::{FillStrategy, GapFill, GapFillParams};
use datafusion::{

View File

@ -14,7 +14,7 @@ use datafusion::{
/// Given a plan and a column, finds the predicates that use that column
/// and return a range with expressions for upper and lower bounds.
pub(super) fn find_time_range(plan: &LogicalPlan, time_col: &Column) -> Result<Range<Bound<Expr>>> {
pub fn find_time_range(plan: &LogicalPlan, time_col: &Column) -> Result<Range<Bound<Expr>>> {
let mut v = TimeRangeVisitor {
col: time_col.clone(),
range: TimeRange::default(),

View File

@ -8,6 +8,7 @@ use self::{
mod handle_gapfill;
mod influx_regex_to_datafusion_regex;
pub use handle_gapfill::range_predicate;
/// Register IOx-specific logical [`OptimizerRule`]s with the SessionContext
///

View File

@ -8,8 +8,7 @@ use crate::plan::planner_time_range_expression::{
duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr,
};
use crate::plan::rewriter::rewrite_statement;
use crate::plan::util::{binary_operator_to_df_operator, Schemas};
use crate::plan::util_copy::rebase_expr;
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_to_data_type};
use arrow::datatypes::DataType;
use chrono_tz::Tz;
@ -21,9 +20,9 @@ use datafusion::logical_expr::logical_plan::builder::project;
use datafusion::logical_expr::logical_plan::Analyze;
use datafusion::logical_expr::utils::{expr_as_column_expr, find_aggregate_exprs};
use datafusion::logical_expr::{
binary_expr, date_bin, expr, lit, lit_timestamp_nano, AggregateFunction, AggregateUDF,
BinaryExpr, BuiltinScalarFunction, Explain, Expr, ExprSchemable, LogicalPlan,
LogicalPlanBuilder, Operator, PlanType, ScalarUDF, TableSource, ToStringifiedPlan,
binary_expr, col, date_bin, expr, lit, lit_timestamp_nano, now, Aggregate, AggregateFunction,
AggregateUDF, BinaryExpr, BuiltinScalarFunction, Explain, Expr, ExprSchemable, Extension,
LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ScalarUDF, TableSource, ToStringifiedPlan,
};
use datafusion_util::{lit_dict, AsExpr};
use generated_types::influxdata::iox::querier::v1::InfluxQlMetadata;
@ -43,6 +42,8 @@ use influxdb_influxql_parser::{
select::{Field, FieldList, FromMeasurementClause, MeasurementSelection, SelectStatement},
statement::Statement,
};
use iox_query::exec::gapfill::{FillStrategy, GapFill, GapFillParams};
use iox_query::logical_optimizer::range_predicate::find_time_range;
use itertools::Itertools;
use once_cell::sync::Lazy;
use query_functions::clean_non_meta_escapes;
@ -52,7 +53,7 @@ use schema::{
};
use std::collections::{HashSet, VecDeque};
use std::fmt::Debug;
use std::ops::{ControlFlow, Deref};
use std::ops::{Bound, ControlFlow, Deref, Range};
use std::str::FromStr;
use std::sync::Arc;
@ -144,6 +145,10 @@ impl<'a> Context<'a> {
..*self
}
}
fn fill(&self) -> FillClause {
self.fill.unwrap_or_default()
}
}
#[allow(missing_debug_implementations)]
@ -403,14 +408,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(ctx, &plan, fields, &schemas)?;
let (plan, select_exprs_post_aggr) = self.select_aggregate(
plan,
fields,
select_exprs,
select.group_by.as_ref(),
group_by_tag_set,
&schemas,
)?;
let (plan, select_exprs_post_aggr) =
self.select_aggregate(ctx, plan, fields, select_exprs, group_by_tag_set, &schemas)?;
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
project(
@ -422,13 +421,17 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fn select_aggregate(
&self,
ctx: &Context<'_>,
input: LogicalPlan,
fields: &[Field],
select_exprs: Vec<Expr>,
group_by: Option<&GroupByClause>,
group_by_tag_set: &[&str],
schemas: &Schemas,
) -> Result<(LogicalPlan, Vec<Expr>)> {
let Some(time_column_index) = find_time_column_index(fields) else {
return Err(DataFusionError::Internal("unable to find time column".to_owned()))
};
// Find a list of unique aggregate expressions from the projection.
//
// For example, a projection such as:
@ -443,14 +446,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
return Ok((input, select_exprs));
}
let aggr_group_by_exprs = if let Some(group_by) = group_by {
let aggr_group_by_exprs = if let Some(group_by) = ctx.group_by {
let mut group_by_exprs = Vec::new();
if group_by.time_dimension().is_some() {
// Include the GROUP BY TIME(..) expression
if let Some(index) = find_time_column_index(fields) {
group_by_exprs.push(select_exprs[index].clone());
}
group_by_exprs.push(select_exprs[time_column_index].clone());
}
// Exclude tags that do not exist in the current table schema.
@ -475,6 +476,48 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
.aggregate(aggr_group_by_exprs.clone(), aggr_exprs.clone())?
.build()?;
let fill_option = ctx.fill();
// Wrap the plan in a GapFill operator if the statement specifies a `GROUP BY TIME` clause and
// the FILL option is one of
//
// * `null`
// * `previous`
// * `literal` value
// * `linear`
//
let plan = if ctx.group_by.and_then(|gb| gb.time_dimension()).is_some()
&& fill_option != FillClause::None
{
let args = match select_exprs[time_column_index].clone().unalias() {
Expr::ScalarFunction {
fun: BuiltinScalarFunction::DateBin,
args,
} => args,
_ => {
// The InfluxQL planner adds the `date_bin` function,
// so this condition represents an internal failure.
return Err(DataFusionError::Internal(
"expected DATE_BIN function".to_owned(),
));
}
};
let fill_strategy = match fill_option {
FillClause::Null | FillClause::Value(_) => FillStrategy::Null,
FillClause::Previous => FillStrategy::PrevNullAsMissing,
FillClause::Linear => {
return Err(DataFusionError::NotImplemented(fill_option.to_string()))
}
FillClause::None => unreachable!(),
};
build_gap_fill_node(plan, time_column_index, args, fill_strategy)?
} else {
plan
};
// Combine the aggregate columns and group by expressions, which represents
// the final projection from the aggregate operator.
let aggr_projection_exprs = [aggr_group_by_exprs, aggr_exprs].concat();
@ -486,11 +529,29 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
.map(|expr| expr_as_column_expr(expr, &plan))
.collect::<Result<Vec<Expr>>>()?;
// Create a literal expression for `value` if the strategy
// is `FILL(<value>)`
let fill_if_null = match fill_option {
FillClause::Value(v) => Some(v),
_ => None,
};
// Rewrite the aggregate columns from the projection, so that the expressions
// refer to the columns from the aggregate projection
let select_exprs_post_aggr = select_exprs
.iter()
.map(|expr| rebase_expr(expr, &aggr_projection_exprs, &plan))
.zip(fields)
.map(|(expr, f)| {
// This implements the `FILL(<value>)` strategy, by coalescing any aggregate
// expressions to `<value>` when they are `NULL`.
let fill_if_null = if fill_if_null.is_some() && is_aggregate_field(f) {
fill_if_null
} else {
None
};
rebase_expr(expr, &aggr_projection_exprs, &fill_if_null, &plan)
})
.collect::<Result<Vec<Expr>>>()?;
// Strip the NULL columns, which are tags that do not exist in the aggregate
@ -682,14 +743,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// or binning the time.
if let Some(group_by) = ctx.group_by {
if let Some(dim) = group_by.time_dimension() {
// Not supported until date_bin_gapfill is complete
let fill = ctx.fill.unwrap_or_default();
if fill != FillClause::None {
return Err(DataFusionError::NotImplemented(format!(
"{fill}"
)));
}
let stride = expr_to_df_interval_dt(&dim.interval)?;
let offset = if let Some(offset) = &dim.offset {
duration_expr_to_nanoseconds(offset)?
@ -983,6 +1036,94 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
}
/// Returns a [`LogicalPlan`] that performs gap-filling for the `input` plan.
///
/// # Arguments
///
/// * `input` - An aggregate plan which requires gap-filling.
/// * `date_bin_index` - The index of the field in the input schema that refers to the `date_bin` expression.
/// * `date_bin_args` - The list of arguments passed to the `date_bin` function, used to configure the gap-fill parameters.
/// * `fill_strategy` - The strategy used to fill gaps in the data.
///
/// # Errors
///
/// Returns [`DataFusionError::Internal`] when `date_bin_args` does not hold
/// exactly three arguments, when the second argument is not a column, when
/// `input` has no child plan, or when `date_bin_index` is out of bounds for
/// the schema of `input`. Errors from `find_time_range`,
/// `Aggregate::try_from_plan` and `GapFill::try_new` are propagated unchanged.
fn build_gap_fill_node(
    input: LogicalPlan,
    date_bin_index: usize,
    date_bin_args: Vec<Expr>,
    fill_strategy: FillStrategy,
) -> Result<LogicalPlan> {
    // Extract the gap-fill parameters from the arguments to the `DATE_BIN` function.
    // Any unexpected condition represents an internal error, as the `DATE_BIN` function is
    // added by the planner and should always contain the correct number of arguments.
    let [stride, source, origin] = <[Expr; 3]>::try_from(date_bin_args).map_err(|args| {
        DataFusionError::Internal(format!(
            "DATE_BIN expects 3 arguments, got {}",
            args.len()
        ))
    })?;

    let time_col = source.try_into_col().map_err(|_| {
        DataFusionError::Internal("DATE_BIN requires a column as the source argument".to_string())
    })?;

    // The time range is searched for in the plan feeding `input`. Report a
    // missing child as an internal error rather than panicking on the index.
    let aggr_input = input.inputs().into_iter().next().ok_or_else(|| {
        DataFusionError::Internal("expected the aggregate plan to have an input".to_owned())
    })?;

    // Ensure that a time range was specified and is valid for gap filling
    let time_range = match find_time_range(aggr_input, &time_col)? {
        // Follow the InfluxQL behaviour to use an upper bound of `now` when
        // not found:
        //
        // See: https://github.com/influxdata/influxdb/blob/98361e207349a3643bcc332d54b009818fe7585f/query/compile.go#L172-L176
        Range {
            start,
            end: Bound::Unbounded,
        } => Range {
            start,
            end: Bound::Excluded(now()),
        },
        time_range => time_range,
    };

    let aggr = Aggregate::try_from_plan(&input)?;

    // Turn every output field of the aggregate into a column reference, then
    // split the list at the number of grouping expressions: the head holds the
    // grouping columns and the tail holds the aggregate columns.
    let mut new_group_expr: Vec<_> = aggr
        .schema
        .fields()
        .iter()
        .map(|f| Expr::Column(f.qualified_column()))
        .collect();
    let aggr_expr = new_group_expr.split_off(aggr.group_expr.len());

    // The fill strategy for InfluxQL is specified at the query level, so every
    // aggregate column is paired with the same strategy.
    let fill_strategy = aggr_expr
        .iter()
        .cloned()
        .map(|e| (e, fill_strategy.clone()))
        .collect();

    // Checked lookup: an out-of-range index is a planner bug and is reported
    // as an internal error instead of a panic.
    let time_column = input
        .schema()
        .fields()
        .get(date_bin_index)
        .map(|f| col(f.qualified_column()))
        .ok_or_else(|| {
            DataFusionError::Internal(format!(
                "date_bin index {date_bin_index} is out of bounds for the aggregate schema"
            ))
        })?;

    Ok(LogicalPlan::Extension(Extension {
        node: Arc::new(GapFill::try_new(
            Arc::new(input),
            new_group_expr,
            aggr_expr,
            GapFillParams {
                stride,
                time_column,
                origin,
                time_range,
                fill_strategy,
            },
        )?),
    }))
}
/// Adds [`InfluxQlMetadata`] to the `plan`.
///
/// **Note**
@ -1025,13 +1166,18 @@ fn plan_with_metadata(plan: LogicalPlan, metadata: &InfluxQlMetadata) -> Result<
/// Returns `true` if any expressions refer to an aggregate function.
fn has_aggregate_exprs(fields: &FieldList) -> bool {
fields.iter().any(|f| {
walk_expr(&f.expr, &mut |e| match e {
IQLExpr::Call { name, .. } if is_aggregate_function(name) => ControlFlow::Break(()),
_ => ControlFlow::Continue(()),
})
.is_break()
fields.iter().any(is_aggregate_field)
}
/// Reports whether the field `f` is an aggregate field, i.e. whether its
/// expression tree contains at least one call to an aggregate function.
///
/// The walk stops at the first such call.
fn is_aggregate_field(f: &Field) -> bool {
    let outcome = walk_expr(&f.expr, &mut |e| {
        if matches!(e, IQLExpr::Call { name, .. } if is_aggregate_function(name)) {
            ControlFlow::Break(())
        } else {
            ControlFlow::Continue(())
        }
    });
    outcome.is_break()
}
/// Find all the columns where the resolved data type
@ -1762,105 +1908,191 @@ mod test {
mod select_aggregate {
use super::*;
#[test]
fn test_single_measurement() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data"), @r###"
mod single_measurement {
use super::*;
#[test]
fn no_group_by() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N]
Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY non_existent"), @r###"
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY non_existent"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), non_existent:Null;N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, NULL AS non_existent, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), non_existent:Null;N, count:Int64;N]
Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo"), @r###"
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo"), @r###"
Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, count:Int64;N]
Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// The `COUNT(f64_field)` aggregate is only projected once in the Aggregate and reused in the projection
assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) + COUNT(f64_field), COUNT(f64_field) * 3 FROM data"), @r###"
// The `COUNT(f64_field)` aggregate is only projected once in the Aggregate and reused in the projection
assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) + COUNT(f64_field), COUNT(f64_field) * 3 FROM data"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field_count_f64_field:Int64;N, count_f64_field:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count, COUNT(data.f64_field) + COUNT(data.f64_field) AS count_f64_field_count_f64_field, COUNT(data.f64_field) * Int64(3) AS count_f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field_count_f64_field:Int64;N, count_f64_field:Int64;N]
Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// non-existent tags are excluded from the Aggregate groupBy and Sort operators
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo, non_existent"), @r###"
// non-existent tags are excluded from the Aggregate groupBy and Sort operators
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo, non_existent"), @r###"
Sort: foo ASC NULLS LAST, time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, non_existent:Null;N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, data.foo AS foo, NULL AS non_existent, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), foo:Dictionary(Int32, Utf8);N, non_existent:Null;N, count:Int64;N]
Aggregate: groupBy=[[data.foo]], aggr=[[COUNT(data.f64_field)]] [foo:Dictionary(Int32, Utf8);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// Fallible
// Aggregate expression is projected once and reused in final projection
assert_snapshot!(plan("SELECT COUNT(f64_field), COUNT(f64_field) * 2 FROM data"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count, COUNT(data.f64_field) * Int64(2) AS count_f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N, count_f64_field:Int64;N]
Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// Cannot combine aggregate and non-aggregate columns in the projection
assert_snapshot!(plan("SELECT COUNT(f64_field), f64_field FROM data"), @"Error during planning: mixing aggregate and non-aggregate columns is not supported");
assert_snapshot!(plan("SELECT COUNT(f64_field) + f64_field FROM data"), @"Error during planning: mixing aggregate and non-aggregate columns is not supported");
}
// Fallible
#[test]
fn test_single_measurement_group_by_time() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(none)"), @r###"
// Cannot combine aggregate and non-aggregate columns in the projection
assert_snapshot!(plan("SELECT COUNT(f64_field), f64_field FROM data"), @"Error during planning: mixing aggregate and non-aggregate columns is not supported");
assert_snapshot!(plan("SELECT COUNT(f64_field) + f64_field FROM data"), @"Error during planning: mixing aggregate and non-aggregate columns is not supported");
}
#[test]
fn group_by_time() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(none)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// supports offset parameter
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s, 5s) FILL(none)"), @r###"
// supports offset parameter
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s, 5s) FILL(none)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(5000000000, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
}
/// These tests validate the planner returns an error when using features that
/// are not implemented.
mod not_implemented {
use super::*;
#[test]
fn test_with_limit_or_offset() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data LIMIT 1"), @r###"
Limit: skip=0, fetch=1 [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N]
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N]
Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data OFFSET 1"), @r###"
Limit: skip=1, fetch=None [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N]
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, TimestampNanosecond(0, None) AS time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), count:Int64;N]
Aggregate: groupBy=[[]], aggr=[[COUNT(data.f64_field)]] [COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
}
#[test]
fn test_group_by_time_precision() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10u) FILL(none)"), @"This feature is not implemented: interval limited to a precision of milliseconds. See https://github.com/influxdata/influxdb_iox/issues/7204");
}
fn group_by_time_gapfill() {
// No time bounds
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// No lower time bounds
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time < '2022-10-31T02:02:00Z' GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(TimestampNanosecond(1667181720000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time < TimestampNanosecond(1667181720000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// No upper time bounds
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time >= '2022-10-31T02:00:00Z' GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Included(TimestampNanosecond(1667181600000000000, None))..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time >= TimestampNanosecond(1667181600000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
/// Snapshot tests for the logical plans produced by `GROUP BY TIME(..)` queries
/// that gap-fill, covering the default fill, `FILL(null)`, `FILL(previous)` and
/// `FILL(<value>)`.
///
/// `FILL(linear)` is asserted separately in `not_implemented::group_by_time_gapfill`.
#[test]
fn test_single_measurement_group_by_time_gapfill() {
// Default is FILL(null)
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data WHERE time >= '2022-10-31T02:00:00Z' AND time < '2022-10-31T02:02:00Z' GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Included(TimestampNanosecond(1667181600000000000, None))..Excluded(TimestampNanosecond(1667181720000000000, None)) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Filter: data.time >= TimestampNanosecond(1667181600000000000, None) AND data.time < TimestampNanosecond(1667181720000000000, None) [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// No time bounds and no FILL clause: the gap-fill range is unbounded below and ends at now()
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// An explicit FILL(null) produces the same plan as the default
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(null)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// FILL(previous) wraps the aggregate in LOCF within the GapFill node
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(previous)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, COUNT(data.f64_field) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[LOCF(null-as-missing, COUNT(data.f64_field))]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// FILL(<value>) is applied as a coalesce in the projection above the GapFill node
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(0)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce(COUNT(data.f64_field), Int64(0)) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// Coalesces the fill value, which is a float, to the matching type of a `COUNT` aggregate.
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce(COUNT(data.f64_field), Int64(3)) AS count [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count:Int64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
// Aggregates as part of a binary expression
assert_snapshot!(plan("SELECT COUNT(f64_field) + MEAN(f64_field) FROM data GROUP BY TIME(10s) FILL(3.2)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_f64_field_mean_f64_field:Float64;N]
Projection: Dictionary(Int32, Utf8("data")) AS iox::measurement, time, coalesce(COUNT(data.f64_field), Int64(3)) + coalesce(AVG(data.f64_field), Float64(3.2)) AS count_f64_field_mean_f64_field [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, count_f64_field_mean_f64_field:Float64;N]
GapFill: groupBy=[[time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]], time_column=time, stride=IntervalDayTime("10000"), range=Unbounded..Excluded(now()) [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N]
Aggregate: groupBy=[[datebin(IntervalDayTime("10000"), data.time, TimestampNanosecond(0, None)) AS time]], aggr=[[COUNT(data.f64_field), AVG(data.f64_field)]] [time:Timestamp(Nanosecond, None);N, COUNT(data.f64_field):Int64;N, AVG(data.f64_field):Float64;N]
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
}
/// These tests validate the planner returns an error when using features that
/// are not implemented or supported.
///
/// Each assertion pins the exact error text returned by `plan` for the query,
/// so a change in wording or in feature support shows up as a snapshot diff.
mod not_implemented {
use super::*;
/// `GROUP BY` combined with either a `LIMIT` or an `OFFSET` clause is
/// reported as not implemented.
///
/// Tracked by <https://github.com/influxdata/influxdb_iox/issues/6920>
#[test]
fn with_limit_or_offset() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo LIMIT 1"), @"This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause");
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY foo OFFSET 1"), @"This feature is not implemented: GROUP BY combined with LIMIT or OFFSET clause");
}
/// A `GROUP BY TIME` interval of `10u` is rejected; the error states that
/// intervals are limited to a precision of milliseconds.
///
/// Tracked by <https://github.com/influxdata/influxdb_iox/issues/7204>
#[test]
fn group_by_time_precision() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10u) FILL(none)"), @"This feature is not implemented: interval limited to a precision of milliseconds. See https://github.com/influxdata/influxdb_iox/issues/7204");
}
/// `FILL(linear)` is reported as not implemented.
///
/// Tracked by <https://github.com/influxdata/influxdb_iox/issues/6916>
#[test]
fn group_by_time_gapfill() {
assert_snapshot!(plan("SELECT COUNT(f64_field) FROM data GROUP BY TIME(10s) FILL(linear)"), @"This feature is not implemented: FILL(LINEAR)");
}
}
}
}

View File

@ -1,6 +1,10 @@
use crate::plan::util_copy;
use arrow::datatypes::DataType;
use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result};
use datafusion::logical_expr::Operator;
use datafusion::logical_expr::utils::expr_as_column_expr;
use datafusion::logical_expr::{coalesce, lit, Expr, ExprSchemable, LogicalPlan, Operator};
use influxdb_influxql_parser::expression::BinaryOperator;
use influxdb_influxql_parser::literal::Number;
use influxdb_influxql_parser::string::Regex;
use query_functions::clean_non_meta_escapes;
use schema::Schema;
@ -51,3 +55,65 @@ pub(crate) fn parse_regex(re: &Regex) -> Result<regex::Regex> {
DataFusionError::External(format!("invalid regular expression '{re}': {e}").into())
})
}
/// Converts the InfluxQL number `n` into a literal [`Expr`] of type `data_type`.
///
/// Only `Int64`, `Float64` and `UInt64` targets are supported. Values are
/// converted with Rust `as` casts, so a float targeting an integer type is
/// truncated toward zero (e.g. `3.2` becomes `3`), and a negative integer
/// targeting `UInt64` wraps around.
///
/// # Errors
///
/// Returns [`DataFusionError::Internal`] for any other combination of number
/// and target type.
fn number_to_expr(n: &Number, data_type: DataType) -> Result<Expr> {
    let literal = match (&data_type, n) {
        // Signed integer target
        (DataType::Int64, Number::Integer(i)) => lit(*i),
        (DataType::Int64, Number::Float(f)) => lit(*f as i64),
        // Floating point target
        (DataType::Float64, Number::Integer(i)) => lit(*i as f64),
        (DataType::Float64, Number::Float(f)) => lit(*f),
        // Unsigned integer target
        (DataType::UInt64, Number::Integer(i)) => lit(*i as u64),
        (DataType::UInt64, Number::Float(f)) => lit(*f as u64),
        _ => {
            // The only output data types expected are Int64, Float64 or UInt64
            return Err(DataFusionError::Internal(format!(
                "no conversion from {n} to {data_type}"
            )));
        }
    };
    Ok(literal)
}
/// Rewrites `expr` so that any sub-expression found in `base_exprs` becomes a
/// column reference into `plan`, rather than being recomputed from its inputs.
///
/// For example, the expression `a + b < 1` would normally need the two
/// columns `a` and `b` as input. If the base expressions already contain
/// the `a + b` result, that result can be referenced instead.
///
/// This matters for a query such as:
///
/// SELECT a + b < 1 ... GROUP BY a + b
///
/// where, after aggregation, `a + b` is projected from the `a + b` produced
/// by the GROUP BY and not from the individual columns `a` and `b`.
///
/// When `fill_if_null` is `Some`, every rebased column is additionally wrapped
/// in `coalesce(column, <value>)`, with the value converted to the column's
/// data type. This is used with the `FILL(<value>)` strategy.
///
/// # Errors
///
/// Propagates any error from resolving the column expression, from looking up
/// its type, or from converting the fill value to that type.
pub(crate) fn rebase_expr(
    expr: &Expr,
    base_exprs: &[Expr],
    fill_if_null: &Option<Number>,
    plan: &LogicalPlan,
) -> Result<Expr> {
    util_copy::clone_with_replacement(expr, &|candidate| {
        // No replacement is supplied for nodes that are not base expressions.
        if !base_exprs.contains(candidate) {
            return Ok(None);
        }

        let column = expr_as_column_expr(candidate, plan)?;
        let replacement = match fill_if_null {
            Some(fill) => {
                // The fill literal must match the column's type for `coalesce`.
                let column_type = column.get_type(plan.schema())?;
                coalesce(vec![column, number_to_expr(fill, column_type)?])
            }
            None => column,
        };
        Ok(Some(replacement))
    })
}

View File

@ -17,32 +17,6 @@ use datafusion::logical_expr::{
LogicalPlan,
};
/// Rewrites `expr` so that any sub-expression found in `base_exprs` becomes a
/// column reference into `plan`, rather than being recomputed from its inputs.
///
/// For example, the expression `a + b < 1` would normally need the two
/// columns `a` and `b` as input. If the base expressions already contain
/// the `a + b` result, that result can be referenced instead.
///
/// This matters for a query such as:
///
/// SELECT a + b < 1 ... GROUP BY a + b
///
/// where, after aggregation, `a + b` is projected from the `a + b` produced
/// by the GROUP BY and not from the individual columns `a` and `b`.
///
/// Source: <https://github.com/apache/arrow-datafusion/blob/e6d71068474f3b2ef9ad5e9af85f56f0d0560a1b/datafusion/sql/src/utils.rs#L63>
pub(crate) fn rebase_expr(expr: &Expr, base_exprs: &[Expr], plan: &LogicalPlan) -> Result<Expr> {
    clone_with_replacement(expr, &|candidate| {
        // `Some(column)` for a base expression, `None` otherwise; `transpose`
        // surfaces a failed column lookup as the closure's error.
        base_exprs
            .contains(candidate)
            .then(|| expr_as_column_expr(candidate, plan))
            .transpose()
    })
}
/// Returns a cloned `Expr`, but any of the `Expr`'s in the tree may be
/// replaced/customized by the replacement function.
///
@ -62,7 +36,7 @@ pub(crate) fn rebase_expr(expr: &Expr, base_exprs: &[Expr], plan: &LogicalPlan)
/// `clone_with_replacement()`.
///
/// Source: <https://github.com/apache/arrow-datafusion/blob/26e1b20ea/datafusion/sql/src/utils.rs#L153>
fn clone_with_replacement<F>(expr: &Expr, replacement_fn: &F) -> Result<Expr>
pub(super) fn clone_with_replacement<F>(expr: &Expr, replacement_fn: &F) -> Result<Expr>
where
F: Fn(&Expr) -> Result<Option<Expr>>,
{