From 511a0bae7828f362ae40b9ac8c34533faa82e1bc Mon Sep 17 00:00:00 2001 From: Martin Hilton Date: Thu, 29 Jun 2023 06:53:18 +0100 Subject: [PATCH] feat(influxql): add derivative and non_negative_derivative (#8103) Add the DERIVATIVE and NON_NEGATIVE_DERIVATIVE functions to influxql. These are used to calculate derivatives over arbitrary time units. The implementation is modeled after the DIFFERENCE and NON_NEGATIVE_DIFFERENCE functions, with a difference that the unit parameters is a configuration of the user-defined aggregator function and therefore there cannot be a single shared definition of the function. The NON_NEGATIVE_DIFFERENCE function implementation has been refactored to be an arbitrary NON_NEGATIVE wrapper for any Accumulator function. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../query_tests/cases/in/window_like.influxql | 60 +- .../cases/in/window_like.influxql.expected | 569 +++++++++++++++++- iox_query_influxql/src/plan/planner.rs | 167 ++++- iox_query_influxql/src/plan/udaf.rs | 217 ++++++- iox_query_influxql/src/plan/udf.rs | 52 ++ 5 files changed, 1038 insertions(+), 27 deletions(-) diff --git a/influxdb_iox/tests/query_tests/cases/in/window_like.influxql b/influxdb_iox/tests/query_tests/cases/in/window_like.influxql index 8e2a2f392f..abae5a7ac1 100644 --- a/influxdb_iox/tests/query_tests/cases/in/window_like.influxql +++ b/influxdb_iox/tests/query_tests/cases/in/window_like.influxql @@ -89,4 +89,62 @@ SELECT difference(mean(usage_idle)), non_negative_difference(mean(usage_idle)), -- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(none); -- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0); -- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous); --- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); \ No newline at end of file +-- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); + +-- +-- derivative +-- +SELECT derivative(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +SELECT derivative(writes, 2s) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +SELECT derivative(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0'; +SELECT derivative(usage_system, 5s) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1'; +-- group by a tag +SELECT derivative(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu; +SELECT derivative(usage_idle, 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu; + +-- +-- derivative + aggregate +-- +SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +-- the input data is regular data at 10s intervals, so 7s windows ensure the `mean` generates windows with NULL values to test NULL handling of difference +SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0); +SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0); +SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous); +SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous); +SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); +SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); +-- group by time and a tag +SELECT derivative(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu; +SELECT derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu; + +-- +-- non_negative_derivative +-- +SELECT non_negative_derivative(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +SELECT non_negative_derivative(writes, 2s) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +SELECT non_negative_derivative(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0'; +SELECT non_negative_derivative(usage_system, 5s) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1'; +-- group by a tag +SELECT non_negative_derivative(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu; +SELECT non_negative_derivative(usage_idle, 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu; + +-- +-- non_negative_derivative + aggregate +-- +SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +-- the input data is regular data at 10s intervals, so 7s windows ensure the `mean` generates windows with NULL values to test NULL handling of difference +SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0); +SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0); +SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous); +SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous); +SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); +SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); +-- group by time and a tag +SELECT non_negative_derivative(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu; +SELECT non_negative_derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu; \ No newline at end of file diff --git a/influxdb_iox/tests/query_tests/cases/in/window_like.influxql.expected b/influxdb_iox/tests/query_tests/cases/in/window_like.influxql.expected index 276a741127..935e319ac5 100644 --- a/influxdb_iox/tests/query_tests/cases/in/window_like.influxql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/window_like.influxql.expected @@ -350,4 +350,571 @@ tags: cpu=cpu1 | 1970-01-01T00:02:00 | 0.36666666666667425 | 0.36666666666667425 | | | 1970-01-01T00:02:30 | -0.03333333333334565 | | | | 1970-01-01T00:03:00 | -0.03333333333333144 | | 99.75 | -+---------------------+----------------------+-------------------------+----------------+ \ No newline at end of file ++---------------------+----------------------+-------------------------+----------------+ +-- InfluxQL: SELECT derivative(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +name: diskio ++---------------------+------------+ +| time | derivative | ++---------------------+------------+ +| 1970-01-01T00:02:20 | 16.4 | +| 1970-01-01T00:02:30 | 18.7 | +| 1970-01-01T00:02:40 | 11.2 | +| 1970-01-01T00:02:50 | 11.0 | +| 1970-01-01T00:03:00 | 21.9 | +| 1970-01-01T00:03:10 | 7.5 | +| 1970-01-01T00:03:20 | 7.6 | +| 1970-01-01T00:03:30 | 14.6 | ++---------------------+------------+ +-- InfluxQL: SELECT derivative(writes, 2s) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +name: diskio ++---------------------+------------+ +| time | derivative | ++---------------------+------------+ +| 1970-01-01T00:02:20 | 32.8 | +| 1970-01-01T00:02:30 | 37.4 | +| 1970-01-01T00:02:40 | 22.4 | +| 1970-01-01T00:02:50 | 22.0 | +| 1970-01-01T00:03:00 | 43.8 | +| 1970-01-01T00:03:10 | 15.0 | +| 1970-01-01T00:03:20 | 15.2 | +| 1970-01-01T00:03:30 | 29.2 | ++---------------------+------------+ +-- InfluxQL: SELECT derivative(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0'; +name: cpu ++---------------------+----------------------+ +| time | derivative | ++---------------------+----------------------+ +| 1970-01-01T00:01:10 | -0.09000000000000057 | +| 1970-01-01T00:01:30 | -0.25999999999999945 | +| 1970-01-01T00:01:40 | 0.4299999999999997 | +| 1970-01-01T00:02:10 | 0.06999999999999981 | +| 1970-01-01T00:02:50 | 0.0 | +| 1970-01-01T00:03:00 | 0.020000000000000285 | ++---------------------+----------------------+ +-- InfluxQL: SELECT derivative(usage_system, 5s) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1'; +name: cpu ++---------------------+-----------------------+ +| time | derivative | ++---------------------+-----------------------+ +| 1970-01-01T00:01:10 | -0.04999999999999716 | +| 1970-01-01T00:02:00 | 0.020000000000000285 | +| 1970-01-01T00:02:10 | -0.05000000000000426 | +| 1970-01-01T00:02:20 | 0.05000000000000426 | +| 1970-01-01T00:03:00 | -0.012500000000001066 | +| 1970-01-01T00:03:10 | 0.0 | ++---------------------+-----------------------+ +-- InfluxQL: SELECT derivative(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu; +name: cpu +tags: cpu=cpu0 ++---------------------+-----------------------+ +| time | derivative | ++---------------------+-----------------------+ +| 1970-01-01T00:02:20 | -0.07999999999999971 | +| 1970-01-01T00:02:30 | 0.14000000000000057 | +| 1970-01-01T00:02:40 | -0.020000000000000285 | +| 1970-01-01T00:02:50 | -0.04000000000000057 | +| 1970-01-01T00:03:00 | 0.020000000000000285 | +| 1970-01-01T00:03:10 | -0.12000000000000029 | ++---------------------+-----------------------+ +name: cpu +tags: cpu=cpu1 ++---------------------+-----------------------+ +| time | derivative | ++---------------------+-----------------------+ +| 1970-01-01T00:02:20 | 0.010000000000000852 | +| 1970-01-01T00:02:30 | 0.0 | +| 1970-01-01T00:02:40 | -0.010000000000000852 | +| 1970-01-01T00:02:50 | 0.0 | +| 1970-01-01T00:03:00 | 0.0 | +| 1970-01-01T00:03:10 | 0.0 | ++---------------------+-----------------------+ +-- InfluxQL: SELECT derivative(usage_idle, 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu; +name: cpu +tags: cpu=cpu0 ++---------------------+-----------------------+ +| time | derivative | ++---------------------+-----------------------+ +| 1970-01-01T00:02:20 | -0.039999999999999855 | +| 1970-01-01T00:02:30 | 0.07000000000000028 | +| 1970-01-01T00:02:40 | -0.010000000000000142 | +| 1970-01-01T00:02:50 | -0.020000000000000285 | +| 1970-01-01T00:03:00 | 0.010000000000000142 | +| 1970-01-01T00:03:10 | -0.060000000000000143 | ++---------------------+-----------------------+ +name: cpu +tags: cpu=cpu1 ++---------------------+-----------------------+ +| time | derivative | ++---------------------+-----------------------+ +| 1970-01-01T00:02:20 | 0.005000000000000426 | +| 1970-01-01T00:02:30 | 0.0 | +| 1970-01-01T00:02:40 | -0.005000000000000426 | +| 1970-01-01T00:02:50 | 0.0 | +| 1970-01-01T00:03:00 | 0.0 | +| 1970-01-01T00:03:10 | 0.0 | ++---------------------+-----------------------+ +-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +name: diskio ++---------------------+------------+ +| time | derivative | ++---------------------+------------+ +| 1970-01-01T00:02:20 | 82.0 | +| 1970-01-01T00:02:27 | 187.0 | +| 1970-01-01T00:02:34 | 112.0 | +| 1970-01-01T00:02:48 | 55.0 | +| 1970-01-01T00:02:55 | 219.0 | +| 1970-01-01T00:03:09 | 37.5 | +| 1970-01-01T00:03:16 | 76.0 | +| 1970-01-01T00:03:30 | 73.0 | ++---------------------+------------+ +-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +name: diskio ++---------------------+--------------------+ +| time | derivative | ++---------------------+--------------------+ +| 1970-01-01T00:02:20 | 5.857142857142857 | +| 1970-01-01T00:02:27 | 13.357142857142858 | +| 1970-01-01T00:02:34 | 8.0 | +| 1970-01-01T00:02:48 | 3.9285714285714284 | +| 1970-01-01T00:02:55 | 15.642857142857142 | +| 1970-01-01T00:03:09 | 2.6785714285714284 | +| 1970-01-01T00:03:16 | 5.428571428571429 | +| 1970-01-01T00:03:30 | 5.214285714285714 | ++---------------------+--------------------+ +-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +name: diskio ++---------------------+-------------------+ +| time | derivative | ++---------------------+-------------------+ +| 1970-01-01T00:02:00 | 389.3333333330229 | +| 1970-01-01T00:02:30 | 431.0 | +| 1970-01-01T00:03:00 | 405.0 | +| 1970-01-01T00:03:30 | 221.6666666669771 | ++---------------------+-------------------+ +-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +name: diskio ++---------------------+--------------------+ +| time | derivative | ++---------------------+--------------------+ +| 1970-01-01T00:02:00 | 6.488888888883715 | +| 1970-01-01T00:02:30 | 7.183333333333334 | +| 1970-01-01T00:03:00 | 6.75 | +| 1970-01-01T00:03:30 | 3.6944444444496183 | ++---------------------+--------------------+ +-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0); +name: diskio ++---------------------+------------+ +| time | derivative | ++---------------------+------------+ +| 1970-01-01T00:02:06 | 5592646.0 | +| 1970-01-01T00:02:13 | -5592646.0 | +| 1970-01-01T00:02:20 | 5592810.0 | +| 1970-01-01T00:02:27 | 187.0 | +| 1970-01-01T00:02:34 | 112.0 | +| 1970-01-01T00:02:41 | -5593109.0 | +| 1970-01-01T00:02:48 | 5593219.0 | +| 1970-01-01T00:02:55 | 219.0 | +| 1970-01-01T00:03:02 | -5593438.0 | +| 1970-01-01T00:03:09 | 5593513.0 | +| 1970-01-01T00:03:16 | 76.0 | +| 1970-01-01T00:03:23 | -5593589.0 | +| 1970-01-01T00:03:30 | 5593735.0 | ++---------------------+------------+ +-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0); +name: diskio ++---------------------+---------------------+ +| time | derivative | ++---------------------+---------------------+ +| 1970-01-01T00:02:06 | 399474.71428571426 | +| 1970-01-01T00:02:13 | -399474.71428571426 | +| 1970-01-01T00:02:20 | 399486.4285714286 | +| 1970-01-01T00:02:27 | 13.357142857142858 | +| 1970-01-01T00:02:34 | 8.0 | +| 1970-01-01T00:02:41 | -399507.78571428574 | +| 1970-01-01T00:02:48 | 399515.64285714284 | +| 1970-01-01T00:02:55 | 15.642857142857142 | +| 1970-01-01T00:03:02 | -399531.28571428574 | +| 1970-01-01T00:03:09 | 399536.64285714284 | +| 1970-01-01T00:03:16 | 5.428571428571429 | +| 1970-01-01T00:03:23 | -399542.0714285714 | +| 1970-01-01T00:03:30 | 399552.5 | ++---------------------+---------------------+ +-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous); +name: diskio ++---------------------+------------+ +| time | derivative | ++---------------------+------------+ +| 1970-01-01T00:02:13 | 0.0 | +| 1970-01-01T00:02:20 | 164.0 | +| 1970-01-01T00:02:27 | 187.0 | +| 1970-01-01T00:02:34 | 112.0 | +| 1970-01-01T00:02:41 | 0.0 | +| 1970-01-01T00:02:48 | 110.0 | +| 1970-01-01T00:02:55 | 219.0 | +| 1970-01-01T00:03:02 | 0.0 | +| 1970-01-01T00:03:09 | 75.0 | +| 1970-01-01T00:03:16 | 76.0 | +| 1970-01-01T00:03:23 | 0.0 | +| 1970-01-01T00:03:30 | 146.0 | ++---------------------+------------+ +-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous); +name: diskio ++---------------------+--------------------+ +| time | derivative | ++---------------------+--------------------+ +| 1970-01-01T00:02:13 | 0.0 | +| 1970-01-01T00:02:20 | 11.714285714285714 | +| 1970-01-01T00:02:27 | 13.357142857142858 | +| 1970-01-01T00:02:34 | 8.0 | +| 1970-01-01T00:02:41 | 0.0 | +| 1970-01-01T00:02:48 | 7.857142857142857 | +| 1970-01-01T00:02:55 | 15.642857142857142 | +| 1970-01-01T00:03:02 | 0.0 | +| 1970-01-01T00:03:09 | 5.357142857142857 | +| 1970-01-01T00:03:16 | 5.428571428571429 | +| 1970-01-01T00:03:23 | 0.0 | +| 1970-01-01T00:03:30 | 10.428571428571429 | ++---------------------+--------------------+ +-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); +name: diskio ++---------------------+------------+ +| time | derivative | ++---------------------+------------+ +| 1970-01-01T00:02:13 | 82.0 | +| 1970-01-01T00:02:20 | 82.0 | +| 1970-01-01T00:02:27 | 187.0 | +| 1970-01-01T00:02:34 | 112.0 | +| 1970-01-01T00:02:41 | 55.0 | +| 1970-01-01T00:02:48 | 55.0 | +| 1970-01-01T00:02:55 | 219.0 | +| 1970-01-01T00:03:02 | 37.5 | +| 1970-01-01T00:03:09 | 37.5 | +| 1970-01-01T00:03:16 | 76.0 | +| 1970-01-01T00:03:23 | 73.0 | +| 1970-01-01T00:03:30 | 73.0 | ++---------------------+------------+ +-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); +name: diskio ++---------------------+--------------------+ +| time | derivative | ++---------------------+--------------------+ +| 1970-01-01T00:02:13 | 5.857142857142857 | +| 1970-01-01T00:02:20 | 5.857142857142857 | +| 1970-01-01T00:02:27 | 13.357142857142858 | +| 1970-01-01T00:02:34 | 8.0 | +| 1970-01-01T00:02:41 | 3.9285714285714284 | +| 1970-01-01T00:02:48 | 3.9285714285714284 | +| 1970-01-01T00:02:55 | 15.642857142857142 | +| 1970-01-01T00:03:02 | 2.6785714285714284 | +| 1970-01-01T00:03:09 | 2.6785714285714284 | +| 1970-01-01T00:03:16 | 5.428571428571429 | +| 1970-01-01T00:03:23 | 5.214285714285714 | +| 1970-01-01T00:03:30 | 5.214285714285714 | ++---------------------+--------------------+ +-- InfluxQL: SELECT derivative(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu; +name: cpu +tags: cpu=cpu0 ++---------------------+---------------------+ +| time | derivative | ++---------------------+---------------------+ +| 1970-01-01T00:02:00 | 0.36666666666666003 | +| 1970-01-01T00:02:30 | 1.566666666666677 | +| 1970-01-01T00:03:00 | -0.7333333333333343 | ++---------------------+---------------------+ +name: cpu +tags: cpu=cpu1 ++---------------------+----------------------+ +| time | derivative | ++---------------------+----------------------+ +| 1970-01-01T00:02:00 | 0.36666666666667425 | +| 1970-01-01T00:02:30 | -0.03333333333334565 | +| 1970-01-01T00:03:00 | -0.03333333333333144 | ++---------------------+----------------------+ +-- InfluxQL: SELECT derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu; +name: cpu +tags: cpu=cpu0 ++---------------------+-----------------------+ +| time | derivative | ++---------------------+-----------------------+ +| 1970-01-01T00:02:00 | 0.006111111111111 | +| 1970-01-01T00:02:30 | 0.026111111111111286 | +| 1970-01-01T00:03:00 | -0.012222222222222238 | ++---------------------+-----------------------+ +name: cpu +tags: cpu=cpu1 ++---------------------+------------------------+ +| time | derivative | ++---------------------+------------------------+ +| 1970-01-01T00:02:00 | 0.006111111111111237 | +| 1970-01-01T00:02:30 | -0.0005555555555557608 | +| 1970-01-01T00:03:00 | -0.000555555555555524 | ++---------------------+------------------------+ +-- InfluxQL: SELECT non_negative_derivative(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:20 | 16.4 | +| 1970-01-01T00:02:30 | 18.7 | +| 1970-01-01T00:02:40 | 11.2 | +| 1970-01-01T00:02:50 | 11.0 | +| 1970-01-01T00:03:00 | 21.9 | +| 1970-01-01T00:03:10 | 7.5 | +| 1970-01-01T00:03:20 | 7.6 | +| 1970-01-01T00:03:30 | 14.6 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(writes, 2s) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001; +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:20 | 32.8 | +| 1970-01-01T00:02:30 | 37.4 | +| 1970-01-01T00:02:40 | 22.4 | +| 1970-01-01T00:02:50 | 22.0 | +| 1970-01-01T00:03:00 | 43.8 | +| 1970-01-01T00:03:10 | 15.0 | +| 1970-01-01T00:03:20 | 15.2 | +| 1970-01-01T00:03:30 | 29.2 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0'; +name: cpu ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:01:40 | 0.4299999999999997 | +| 1970-01-01T00:02:10 | 0.06999999999999981 | +| 1970-01-01T00:02:50 | 0.0 | +| 1970-01-01T00:03:00 | 0.020000000000000285 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(usage_system, 5s) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1'; +name: cpu ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:00 | 0.020000000000000285 | +| 1970-01-01T00:02:20 | 0.05000000000000426 | +| 1970-01-01T00:03:10 | 0.0 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu; +name: cpu +tags: cpu=cpu0 ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:30 | 0.14000000000000057 | +| 1970-01-01T00:03:00 | 0.020000000000000285 | ++---------------------+-------------------------+ +name: cpu +tags: cpu=cpu1 ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:20 | 0.010000000000000852 | +| 1970-01-01T00:02:30 | 0.0 | +| 1970-01-01T00:02:50 | 0.0 | +| 1970-01-01T00:03:00 | 0.0 | +| 1970-01-01T00:03:10 | 0.0 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(usage_idle, 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu; +name: cpu +tags: cpu=cpu0 ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:30 | 0.07000000000000028 | +| 1970-01-01T00:03:00 | 0.010000000000000142 | ++---------------------+-------------------------+ +name: cpu +tags: cpu=cpu1 ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:20 | 0.005000000000000426 | +| 1970-01-01T00:02:30 | 0.0 | +| 1970-01-01T00:02:50 | 0.0 | +| 1970-01-01T00:03:00 | 0.0 | +| 1970-01-01T00:03:10 | 0.0 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:20 | 82.0 | +| 1970-01-01T00:02:27 | 187.0 | +| 1970-01-01T00:02:34 | 112.0 | +| 1970-01-01T00:02:48 | 55.0 | +| 1970-01-01T00:02:55 | 219.0 | +| 1970-01-01T00:03:09 | 37.5 | +| 1970-01-01T00:03:16 | 76.0 | +| 1970-01-01T00:03:30 | 73.0 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:20 | 5.857142857142857 | +| 1970-01-01T00:02:27 | 13.357142857142858 | +| 1970-01-01T00:02:34 | 8.0 | +| 1970-01-01T00:02:48 | 3.9285714285714284 | +| 1970-01-01T00:02:55 | 15.642857142857142 | +| 1970-01-01T00:03:09 | 2.6785714285714284 | +| 1970-01-01T00:03:16 | 5.428571428571429 | +| 1970-01-01T00:03:30 | 5.214285714285714 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:00 | 389.3333333330229 | +| 1970-01-01T00:02:30 | 431.0 | +| 1970-01-01T00:03:00 | 405.0 | +| 1970-01-01T00:03:30 | 221.6666666669771 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:00 | 6.488888888883715 | +| 1970-01-01T00:02:30 | 7.183333333333334 | +| 1970-01-01T00:03:00 | 6.75 | +| 1970-01-01T00:03:30 | 3.6944444444496183 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:06 | 5592646.0 | +| 1970-01-01T00:02:20 | 5592810.0 | +| 1970-01-01T00:02:27 | 187.0 | +| 1970-01-01T00:02:34 | 112.0 | +| 1970-01-01T00:02:48 | 5593219.0 | +| 1970-01-01T00:02:55 | 219.0 | +| 1970-01-01T00:03:09 | 5593513.0 | +| 1970-01-01T00:03:16 | 76.0 | +| 1970-01-01T00:03:30 | 5593735.0 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:06 | 399474.71428571426 | +| 1970-01-01T00:02:20 | 399486.4285714286 | +| 1970-01-01T00:02:27 | 13.357142857142858 | +| 1970-01-01T00:02:34 | 8.0 | +| 1970-01-01T00:02:48 | 399515.64285714284 | +| 1970-01-01T00:02:55 | 15.642857142857142 | +| 1970-01-01T00:03:09 | 399536.64285714284 | +| 1970-01-01T00:03:16 | 5.428571428571429 | +| 1970-01-01T00:03:30 | 399552.5 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:13 | 0.0 | +| 1970-01-01T00:02:20 | 164.0 | +| 1970-01-01T00:02:27 | 187.0 | +| 1970-01-01T00:02:34 | 112.0 | +| 1970-01-01T00:02:41 | 0.0 | +| 1970-01-01T00:02:48 | 110.0 | +| 1970-01-01T00:02:55 | 219.0 | +| 1970-01-01T00:03:02 | 0.0 | +| 1970-01-01T00:03:09 | 75.0 | +| 1970-01-01T00:03:16 | 76.0 | +| 1970-01-01T00:03:23 | 0.0 | +| 1970-01-01T00:03:30 | 146.0 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:13 | 0.0 | +| 1970-01-01T00:02:20 | 11.714285714285714 | +| 1970-01-01T00:02:27 | 13.357142857142858 | +| 1970-01-01T00:02:34 | 8.0 | +| 1970-01-01T00:02:41 | 0.0 | +| 1970-01-01T00:02:48 | 7.857142857142857 | +| 1970-01-01T00:02:55 | 15.642857142857142 | +| 1970-01-01T00:03:02 | 0.0 | +| 1970-01-01T00:03:09 | 5.357142857142857 | +| 1970-01-01T00:03:16 | 5.428571428571429 | +| 1970-01-01T00:03:23 | 0.0 | +| 1970-01-01T00:03:30 | 10.428571428571429 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:13 | 82.0 | +| 1970-01-01T00:02:20 | 82.0 | +| 1970-01-01T00:02:27 | 187.0 | +| 1970-01-01T00:02:34 | 112.0 | +| 1970-01-01T00:02:41 | 55.0 | +| 1970-01-01T00:02:48 | 55.0 | +| 1970-01-01T00:02:55 | 219.0 | +| 1970-01-01T00:03:02 | 37.5 | +| 1970-01-01T00:03:09 | 37.5 | +| 1970-01-01T00:03:16 | 76.0 | +| 1970-01-01T00:03:23 | 73.0 | +| 1970-01-01T00:03:30 | 73.0 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear); +name: diskio ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:13 | 5.857142857142857 | +| 1970-01-01T00:02:20 | 5.857142857142857 | +| 1970-01-01T00:02:27 | 13.357142857142858 | +| 1970-01-01T00:02:34 | 8.0 | +| 1970-01-01T00:02:41 | 3.9285714285714284 | +| 1970-01-01T00:02:48 | 3.9285714285714284 | +| 1970-01-01T00:02:55 | 15.642857142857142 | +| 1970-01-01T00:03:02 | 2.6785714285714284 | +| 1970-01-01T00:03:09 | 2.6785714285714284 | +| 1970-01-01T00:03:16 | 5.428571428571429 | +| 1970-01-01T00:03:23 | 5.214285714285714 | +| 1970-01-01T00:03:30 | 5.214285714285714 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu; +name: cpu +tags: cpu=cpu0 ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:00 | 0.36666666666666003 | +| 1970-01-01T00:02:30 | 1.566666666666677 | ++---------------------+-------------------------+ +name: cpu +tags: cpu=cpu1 ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:00 | 0.36666666666667425 | ++---------------------+-------------------------+ +-- InfluxQL: SELECT non_negative_derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu; +name: cpu +tags: cpu=cpu0 ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:00 | 0.006111111111111 | +| 1970-01-01T00:02:30 | 0.026111111111111286 | ++---------------------+-------------------------+ +name: cpu +tags: cpu=cpu1 ++---------------------+-------------------------+ +| time | non_negative_derivative | ++---------------------+-------------------------+ +| 1970-01-01T00:02:00 | 0.006111111111111237 | ++---------------------+-------------------------+ \ No newline at end of file diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index c735fdf20f..f7cd18fe4a 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -6,8 +6,14 @@ use crate::plan::planner::select::{ }; use crate::plan::planner_time_range_expression::time_range_to_df_expr; use crate::plan::rewriter::{find_table_names, rewrite_statement, ProjectionType}; -use crate::plan::udaf::{DIFFERENCE, MOVING_AVERAGE, NON_NEGATIVE_DIFFERENCE}; -use crate::plan::udf::{difference, find_window_udfs, moving_average, non_negative_difference}; +use crate::plan::udaf::{ + derivative_udf, non_negative_derivative_udf, DIFFERENCE, MOVING_AVERAGE, + NON_NEGATIVE_DIFFERENCE, +}; +use crate::plan::udf::{ + derivative, difference, find_window_udfs, moving_average, non_negative_derivative, + non_negative_difference, +}; use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas}; use crate::plan::var_ref::var_ref_data_type_to_data_type; use crate::plan::{error, planner_rewrite_expression, udf, util_copy}; @@ -1092,7 +1098,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let window_func_exprs = udfs .clone() .into_iter() - .map(|e| Self::udf_to_expr(e, partition_by.clone(), order_by.clone())) + .map(|e| Self::udf_to_expr(ctx, e, partition_by.clone(), order_by.clone())) .collect::>>()?; let plan = LogicalPlanBuilder::from(input) @@ -1118,7 +1124,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> { } /// Transform a UDF to a window expression. - fn udf_to_expr(e: Expr, partition_by: Vec, order_by: Vec) -> Result { + fn udf_to_expr( + ctx: &Context<'_>, + e: Expr, + partition_by: Vec, + order_by: Vec, + ) -> Result { let alias = e .display_name() // display_name is known only to fail with Expr::Sort and Expr::QualifiedWildcard, @@ -1129,6 +1140,20 @@ impl<'a> InfluxQLToLogicalPlan<'a> { return error::internal(format!("udf_to_expr: unexpected expression: {e}")) }; + fn derivative_unit(ctx: &Context<'_>, args: &Vec) -> Result { + if args.len() > 1 { + if let Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(v))) = args[1] { + Ok(v as i64) + } else { + error::internal(format!("udf_to_expr: unexpected expression: {}", args[1])) + } + } else if let Some(interval) = ctx.interval { + Ok(interval.duration) + } else { + Ok(1000000000) // 1s + } + } + match udf::WindowFunction::try_from_scalar_udf(Arc::clone(&fun)) { Some(udf::WindowFunction::MovingAverage) => Ok(Expr::WindowFunction(WindowFunction { fun: window_function::WindowFunction::AggregateUDF(MOVING_AVERAGE.clone()), @@ -1170,6 +1195,36 @@ impl<'a> InfluxQLToLogicalPlan<'a> { }) .alias(alias)) } + Some(udf::WindowFunction::Derivative) => Ok(Expr::WindowFunction(WindowFunction { + fun: window_function::WindowFunction::AggregateUDF( + derivative_udf(derivative_unit(ctx, &args)?).into(), + ), + args: vec!["time".as_expr(), args[0].clone()], + partition_by, + order_by, + window_frame: WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::Null), + end_bound: WindowFrameBound::CurrentRow, + }, + }) + .alias(alias)), + Some(udf::WindowFunction::NonNegativeDerivative) => { + Ok(Expr::WindowFunction(WindowFunction { + fun: window_function::WindowFunction::AggregateUDF( + non_negative_derivative_udf(derivative_unit(ctx, &args)?).into(), + ), + args: vec!["time".as_expr(), args[0].clone()], + partition_by, + order_by, + window_frame: WindowFrame { + units: WindowFrameUnits::Rows, + start_bound: WindowFrameBound::Preceding(ScalarValue::Null), + end_bound: WindowFrameBound::CurrentRow, + }, + }) + .alias(alias)) + } None => error::internal(format!( "unexpected user-defined window function: {}", fun.name @@ -1447,7 +1502,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Some(v.timestamp_nanos()), None, ))), - Literal::Duration(_) => error::not_implemented("duration literal"), + Literal::Duration(v) => { + Ok(lit(ScalarValue::IntervalMonthDayNano(Some((**v).into())))) + } Literal::Regex(re) => match scope { // a regular expression in a projection list is unexpected, // as it should have been expanded by the rewriter. @@ -1516,6 +1573,22 @@ impl<'a> InfluxQLToLogicalPlan<'a> { } } + fn check_arg_count_range( + name: &str, + args: &[IQLExpr], + min: usize, + max: usize, + ) -> Result<()> { + let got = args.len(); + if got < min || got > max { + error::query(format!( + "invalid number of arguments for {name}: expected between {min} and {max}, got {got}" + )) + } else { + Ok(()) + } + } + let Call { name, args } = call; match name.as_str() { @@ -1624,6 +1697,38 @@ impl<'a> InfluxQLToLogicalPlan<'a> { Ok(moving_average(vec![arg0, lit(arg1)])) } + "derivative" => { + check_arg_count_range(name, args, 1, 2)?; + + // arg0 should be a column or function + let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?; + if let Expr::Literal(ScalarValue::Null) = arg0 { + return Ok(arg0); + } + let mut eargs = vec![arg0]; + if args.len() > 1 { + let arg1 = self.expr_to_df_expr(scope, &args[1], schemas)?; + eargs.push(arg1); + } + + Ok(derivative(eargs)) + } + "non_negative_derivative" => { + check_arg_count_range(name, args, 1, 2)?; + + // arg0 should be a column or function + let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?; + if let Expr::Literal(ScalarValue::Null) = arg0 { + return Ok(arg0); + } + let mut eargs = vec![arg0]; + if args.len() > 1 { + let arg1 = self.expr_to_df_expr(scope, &args[1], schemas)?; + eargs.push(arg1); + } + + Ok(non_negative_derivative(eargs)) + } _ => error::query(format!("Invalid function '{name}'")), } } @@ -3491,6 +3596,58 @@ mod test { assert_snapshot!(plan("SELECT MOVING_AVERAGE(MEAN(usage_idle), usage_system) FROM cpu GROUP BY TIME(10s)"), @"Error during planning: expected integer argument in moving_average()"); } + #[test] + fn test_derivative() { + // no aggregates + assert_snapshot!(plan("SELECT DERIVATIVE(usage_idle) FROM cpu"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), derivative:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), derivative:Float64;N] + Filter: NOT derivative IS NULL [time:Timestamp(Nanosecond, None), derivative:Float64;N] + Projection: cpu.time AS time, derivative(cpu.usage_idle) AS derivative [time:Timestamp(Nanosecond, None), derivative:Float64;N] + WindowAggr: windowExpr=[[AggregateUDF { name: "derivative(unit: 1000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "" }(cpu.time, cpu.usage_idle) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS derivative(cpu.usage_idle)]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, derivative(cpu.usage_idle):Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + // aggregate + assert_snapshot!(plan("SELECT DERIVATIVE(MEAN(usage_idle)) FROM cpu GROUP BY TIME(10s)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, derivative:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, derivative:Float64;N] + Filter: NOT derivative IS NULL [time:Timestamp(Nanosecond, None);N, derivative:Float64;N] + Projection: time, derivative(AVG(cpu.usage_idle)) AS derivative [time:Timestamp(Nanosecond, None);N, derivative:Float64;N] + WindowAggr: windowExpr=[[AggregateUDF { name: "derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, derivative(AVG(cpu.usage_idle)):Float64;N] + GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] + Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] + Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + } + + #[test] + fn test_non_negative_derivative() { + // no aggregates + assert_snapshot!(plan("SELECT NON_NEGATIVE_DERIVATIVE(usage_idle) FROM cpu"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, non_negative_derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N] + Filter: NOT non_negative_derivative IS NULL [time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N] + Projection: cpu.time AS time, non_negative_derivative(cpu.usage_idle) AS non_negative_derivative [time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N] + WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_derivative(unit: 1000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "" }(cpu.time, cpu.usage_idle) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_derivative(cpu.usage_idle)]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, non_negative_derivative(cpu.usage_idle):Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + + // aggregate + assert_snapshot!(plan("SELECT NON_NEGATIVE_DERIVATIVE(MEAN(usage_idle)) FROM cpu GROUP BY TIME(10s)"), @r###" + Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N] + Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, non_negative_derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N] + Filter: NOT non_negative_derivative IS NULL [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N] + Projection: time, non_negative_derivative(AVG(cpu.usage_idle)) AS non_negative_derivative [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N] + WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, non_negative_derivative(AVG(cpu.usage_idle)):Float64;N] + GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] + Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] + Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] + "###); + } + #[test] fn test_not_implemented() { assert_snapshot!(plan("SELECT DIFFERENCE(MEAN(usage_idle)), MEAN(usage_idle) FROM cpu GROUP BY TIME(10s)"), @"This feature is not implemented: mixed window-aggregate and aggregate columns, such as DIFFERENCE(MEAN(col)), MEAN(col)"); diff --git a/iox_query_influxql/src/plan/udaf.rs b/iox_query_influxql/src/plan/udaf.rs index 5f4a761d9d..a1b6eac250 100644 --- a/iox_query_influxql/src/plan/udaf.rs +++ b/iox_query_influxql/src/plan/udaf.rs @@ -1,12 +1,13 @@ use crate::plan::error; use arrow::array::{Array, ArrayRef, Int64Array}; -use arrow::datatypes::DataType; +use arrow::datatypes::{DataType, TimeUnit}; use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue}; use datafusion::logical_expr::{ Accumulator, AccumulatorFunctionImplementation, AggregateUDF, ReturnTypeFunction, Signature, StateTypeFunction, TypeSignature, Volatility, }; use once_cell::sync::Lazy; +use std::mem::replace; use std::sync::Arc; /// A list of the numeric types supported by InfluxQL that can be be used @@ -247,8 +248,11 @@ pub(crate) const NON_NEGATIVE_DIFFERENCE_NAME: &str = "non_negative_difference"; /// Definition of the `NON_NEGATIVE_DIFFERENCE` user-defined aggregate function. pub(crate) static NON_NEGATIVE_DIFFERENCE: Lazy> = Lazy::new(|| { let return_type: ReturnTypeFunction = Arc::new(|dt| Ok(Arc::new(dt[0].clone()))); - let accumulator: AccumulatorFunctionImplementation = - Arc::new(|dt| Ok(Box::new(NonNegativeDifferenceAccumulator::new(dt)))); + let accumulator: AccumulatorFunctionImplementation = Arc::new(|dt| { + Ok(Box::new(NonNegative::<_>::new(DifferenceAccumulator::new( + dt, + )))) + }); let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![]))); Arc::new(AggregateUDF::new( NON_NEGATIVE_DIFFERENCE_NAME, @@ -266,37 +270,30 @@ pub(crate) static NON_NEGATIVE_DIFFERENCE: Lazy> = Lazy::new(| )) }); +/// NonNegative is a wrapper around an Accumulator that transposes +/// negative value to be NULL. #[derive(Debug)] -struct NonNegativeDifferenceAccumulator { - acc: DifferenceAccumulator, +struct NonNegative { + acc: T, } -impl NonNegativeDifferenceAccumulator { - fn new(data_type: &DataType) -> Self { - let acc = DifferenceAccumulator::new(data_type); +impl NonNegative { + fn new(acc: T) -> Self { Self { acc } } } -impl Accumulator for NonNegativeDifferenceAccumulator { - /// `state` is only called when used as an aggregate function. It can be - /// can safely left unimplemented, as this accumulator is only used as a window aggregate. - /// - /// See: +impl Accumulator for NonNegative { fn state(&self) -> Result> { - error::internal("unexpected call to NonNegativeDifferenceAccumulator::state") + self.acc.state() } fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { self.acc.update_batch(values) } - /// `merge_batch` is only called when used as an aggregate function. It can be - /// can safely left unimplemented, as this accumulator is only used as a window aggregate. - /// - /// See: - fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> { - error::internal("unexpected call to NonNegativeDifferenceAccumulator::merge_batch") + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.acc.merge_batch(states) } fn evaluate(&self) -> Result { @@ -311,3 +308,183 @@ impl Accumulator for NonNegativeDifferenceAccumulator { self.acc.size() } } + +/// Name of the `DERIVATIVE` user-defined aggregate function. +pub(crate) const DERIVATIVE_NAME: &str = "derivative"; + +pub(crate) fn derivative_udf(unit: i64) -> AggregateUDF { + let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64))); + let accumulator: AccumulatorFunctionImplementation = + Arc::new(move |_| Ok(Box::new(DerivativeAccumulator::new(unit)))); + let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![]))); + let sig = Signature::one_of( + NUMERICS + .iter() + .map(|dt| { + TypeSignature::Exact(vec![ + DataType::Timestamp(TimeUnit::Nanosecond, None), + dt.clone(), + ]) + }) + .collect(), + Volatility::Immutable, + ); + AggregateUDF::new( + format!("{DERIVATIVE_NAME}(unit: {unit})").as_str(), + &sig, + &return_type, + &accumulator, + // State shouldn't be called, so no schema to report + &state_type, + ) +} + +/// Name of the `NON_NEGATIVE_DERIVATIVE` user-defined aggregate function. +pub(crate) const NON_NEGATIVE_DERIVATIVE_NAME: &str = "non_negative_derivative"; + +pub(crate) fn non_negative_derivative_udf(unit: i64) -> AggregateUDF { + let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64))); + let accumulator: AccumulatorFunctionImplementation = Arc::new(move |_| { + Ok(Box::new(NonNegative::<_>::new(DerivativeAccumulator::new( + unit, + )))) + }); + let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![]))); + let sig = Signature::one_of( + NUMERICS + .iter() + .map(|dt| { + TypeSignature::Exact(vec![ + DataType::Timestamp(TimeUnit::Nanosecond, None), + dt.clone(), + ]) + }) + .collect(), + Volatility::Immutable, + ); + AggregateUDF::new( + format!("{NON_NEGATIVE_DERIVATIVE_NAME}(unit: {unit})").as_str(), + &sig, + &return_type, + &accumulator, + // State shouldn't be called, so no schema to report + &state_type, + ) +} + +#[derive(Debug)] +pub(super) struct DerivativeAccumulator { + unit: i64, + prev: Option, + curr: Option, +} + +impl DerivativeAccumulator { + fn new(unit: i64) -> Self { + Self { + unit, + prev: None, + curr: None, + } + } +} + +impl Accumulator for DerivativeAccumulator { + /// `state` is only called when used as an aggregate function. It can be + /// can safely left unimplemented, as this accumulator is only used as a window aggregate. + /// + /// See: + fn state(&self) -> Result> { + error::internal("unexpected call to DerivativeAccumulator::state") + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let times = &values[0]; + let arr = &values[1]; + for index in 0..arr.len() { + let time = match ScalarValue::try_from_array(times, index)? { + ScalarValue::TimestampNanosecond(Some(ts), _) => ts, + v => { + return Err(DataFusionError::Internal(format!( + "invalid time value: {}", + v + ))) + } + }; + let curr = Point::new(time, ScalarValue::try_from_array(arr, index)?); + let prev = replace(&mut self.curr, curr); + + // don't replace the previous value if the current value has the same timestamp. + if self.prev.is_none() + || prev + .as_ref() + .is_some_and(|prev| prev.time > self.prev.as_ref().unwrap().time) + { + self.prev = prev + } + } + Ok(()) + } + + /// `merge_batch` is only called when used as an aggregate function. It can be + /// can safely left unimplemented, as this accumulator is only used as a window aggregate. + /// + /// See: + fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> { + error::internal("unexpected call to DerivativeAccumulator::merge_batch") + } + + fn evaluate(&self) -> Result { + Ok(ScalarValue::Float64( + self.curr + .as_ref() + .and_then(|c| c.derivative(self.prev.as_ref(), self.unit)), + )) + } + + fn size(&self) -> usize { + std::mem::size_of_val(self) + } +} + +#[derive(Debug)] +struct Point { + time: i64, + value: ScalarValue, +} + +impl Point { + fn new(time: i64, value: ScalarValue) -> Option { + if value.is_null() { + None + } else { + Some(Self { time, value }) + } + } + + fn value_as_f64(&self) -> f64 { + match self.value { + ScalarValue::Int64(Some(v)) => v as f64, + ScalarValue::Float64(Some(v)) => v, + ScalarValue::UInt64(Some(v)) => v as f64, + _ => panic!("invalid point {:?}", self), + } + } + + fn derivative(&self, prev: Option<&Self>, unit: i64) -> Option { + prev.and_then(|prev| { + let diff = self.value_as_f64() - prev.value_as_f64(); + let elapsed = match self.time - prev.time { + // if the time hasn't changed then it is a NULL. + 0 => return None, + v => v, + } as f64; + let devisor = elapsed / (unit as f64); + Some(diff / devisor) + }) + } +} diff --git a/iox_query_influxql/src/plan/udf.rs b/iox_query_influxql/src/plan/udf.rs index 8581ff1f03..728aeffbc1 100644 --- a/iox_query_influxql/src/plan/udf.rs +++ b/iox_query_influxql/src/plan/udf.rs @@ -20,6 +20,8 @@ pub(super) enum WindowFunction { MovingAverage, Difference, NonNegativeDifference, + Derivative, + NonNegativeDerivative, } impl WindowFunction { @@ -29,6 +31,8 @@ impl WindowFunction { MOVING_AVERAGE_UDF_NAME => Some(Self::MovingAverage), DIFFERENCE_UDF_NAME => Some(Self::Difference), NON_NEGATIVE_DIFFERENCE_UDF_NAME => Some(Self::NonNegativeDifference), + DERIVATIVE_UDF_NAME => Some(Self::Derivative), + NON_NEGATIVE_DERIVATIVE_UDF_NAME => Some(Self::NonNegativeDerivative), _ => None, } } @@ -117,6 +121,54 @@ static NON_NEGATIVE_DIFFERENCE: Lazy> = Lazy::new(|| { )) }); +const DERIVATIVE_UDF_NAME: &str = "derivative"; + +/// Create an expression to represent the `DERIVATIVE` function. +pub(crate) fn derivative(args: Vec) -> Expr { + DERIVATIVE.call(args) +} + +/// Definition of the `DERIVATIVE` function. +static DERIVATIVE: Lazy> = Lazy::new(|| { + let return_type_fn: ReturnTypeFunction = Arc::new(|args| Ok(Arc::new(args[0].clone()))); + Arc::new(ScalarUDF::new( + DERIVATIVE_UDF_NAME, + &Signature::one_of( + NUMERICS + .iter() + .map(|dt| TypeSignature::Exact(vec![dt.clone()])) + .collect(), + Volatility::Immutable, + ), + &return_type_fn, + &stand_in_impl(DERIVATIVE_UDF_NAME), + )) +}); + +const NON_NEGATIVE_DERIVATIVE_UDF_NAME: &str = "non_negative_derivative"; + +/// Create an expression to represent the `NON_NEGATIVE_DERIVATIVE` function. +pub(crate) fn non_negative_derivative(args: Vec) -> Expr { + NON_NEGATIVE_DERIVATIVE.call(args) +} + +/// Definition of the `NON_NEGATIVE_DERIVATIVE` function. +static NON_NEGATIVE_DERIVATIVE: Lazy> = Lazy::new(|| { + let return_type_fn: ReturnTypeFunction = Arc::new(|args| Ok(Arc::new(args[0].clone()))); + Arc::new(ScalarUDF::new( + NON_NEGATIVE_DERIVATIVE_UDF_NAME, + &Signature::one_of( + NUMERICS + .iter() + .map(|dt| TypeSignature::Exact(vec![dt.clone()])) + .collect(), + Volatility::Immutable, + ), + &return_type_fn, + &stand_in_impl(NON_NEGATIVE_DERIVATIVE_UDF_NAME), + )) +}); + /// Returns an implementation that always returns an error. fn stand_in_impl(name: &'static str) -> ScalarFunctionImplementation { Arc::new(move |_| error::internal(format!("{name} should not exist in the final logical plan")))