feat(influxql): add derivative and non_negative_derivative (#8103)

Add the DERIVATIVE and NON_NEGATIVE_DERIVATIVE functions to influxql.
These are used to calculate derivatives over arbitrary time units.
The implementation is modeled after the DIFFERENCE and
NON_NEGATIVE_DIFFERENCE functions, with a difference that the unit
parameters is a configuration of the user-defined aggregator function
and therefore there cannot be a single shared definition of the
function.

The NON_NEGATIVE_DIFFERENCE function implementation has been
refactored to be an arbitrary NON_NEGATIVE wrapper for any Accumulator
function.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Martin Hilton 2023-06-29 06:53:18 +01:00 committed by GitHub
parent 3a8a8a153e
commit 511a0bae78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 1038 additions and 27 deletions

View File

@ -89,4 +89,62 @@ SELECT difference(mean(usage_idle)), non_negative_difference(mean(usage_idle)),
-- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(none);
-- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
-- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
-- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
-- SELECT moving_average(mean(writes), 3), mean(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
--
-- derivative
--
SELECT derivative(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
SELECT derivative(writes, 2s) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
SELECT derivative(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
SELECT derivative(usage_system, 5s) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1';
-- group by a tag
SELECT derivative(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
SELECT derivative(usage_idle, 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
--
-- derivative + aggregate
--
SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
-- the input data is regular data at 10s intervals, so 7s windows ensure the `mean` generates windows with NULL values to test NULL handling of difference
SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
-- group by time and a tag
SELECT derivative(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
SELECT derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
--
-- non_negative_derivative
--
SELECT non_negative_derivative(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
SELECT non_negative_derivative(writes, 2s) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
SELECT non_negative_derivative(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
SELECT non_negative_derivative(usage_system, 5s) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1';
-- group by a tag
SELECT non_negative_derivative(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
SELECT non_negative_derivative(usage_idle, 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
--
-- non_negative_derivative + aggregate
--
SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
-- the input data is regular data at 10s intervals, so 7s windows ensure the `mean` generates windows with NULL values to test NULL handling of difference
SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
-- group by time and a tag
SELECT non_negative_derivative(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
SELECT non_negative_derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;

View File

@ -350,4 +350,571 @@ tags: cpu=cpu1
| 1970-01-01T00:02:00 | 0.36666666666667425 | 0.36666666666667425 | |
| 1970-01-01T00:02:30 | -0.03333333333334565 | | |
| 1970-01-01T00:03:00 | -0.03333333333333144 | | 99.75 |
+---------------------+----------------------+-------------------------+----------------+
+---------------------+----------------------+-------------------------+----------------+
-- InfluxQL: SELECT derivative(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
name: diskio
+---------------------+------------+
| time | derivative |
+---------------------+------------+
| 1970-01-01T00:02:20 | 16.4 |
| 1970-01-01T00:02:30 | 18.7 |
| 1970-01-01T00:02:40 | 11.2 |
| 1970-01-01T00:02:50 | 11.0 |
| 1970-01-01T00:03:00 | 21.9 |
| 1970-01-01T00:03:10 | 7.5 |
| 1970-01-01T00:03:20 | 7.6 |
| 1970-01-01T00:03:30 | 14.6 |
+---------------------+------------+
-- InfluxQL: SELECT derivative(writes, 2s) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
name: diskio
+---------------------+------------+
| time | derivative |
+---------------------+------------+
| 1970-01-01T00:02:20 | 32.8 |
| 1970-01-01T00:02:30 | 37.4 |
| 1970-01-01T00:02:40 | 22.4 |
| 1970-01-01T00:02:50 | 22.0 |
| 1970-01-01T00:03:00 | 43.8 |
| 1970-01-01T00:03:10 | 15.0 |
| 1970-01-01T00:03:20 | 15.2 |
| 1970-01-01T00:03:30 | 29.2 |
+---------------------+------------+
-- InfluxQL: SELECT derivative(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
name: cpu
+---------------------+----------------------+
| time | derivative |
+---------------------+----------------------+
| 1970-01-01T00:01:10 | -0.09000000000000057 |
| 1970-01-01T00:01:30 | -0.25999999999999945 |
| 1970-01-01T00:01:40 | 0.4299999999999997 |
| 1970-01-01T00:02:10 | 0.06999999999999981 |
| 1970-01-01T00:02:50 | 0.0 |
| 1970-01-01T00:03:00 | 0.020000000000000285 |
+---------------------+----------------------+
-- InfluxQL: SELECT derivative(usage_system, 5s) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1';
name: cpu
+---------------------+-----------------------+
| time | derivative |
+---------------------+-----------------------+
| 1970-01-01T00:01:10 | -0.04999999999999716 |
| 1970-01-01T00:02:00 | 0.020000000000000285 |
| 1970-01-01T00:02:10 | -0.05000000000000426 |
| 1970-01-01T00:02:20 | 0.05000000000000426 |
| 1970-01-01T00:03:00 | -0.012500000000001066 |
| 1970-01-01T00:03:10 | 0.0 |
+---------------------+-----------------------+
-- InfluxQL: SELECT derivative(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
name: cpu
tags: cpu=cpu0
+---------------------+-----------------------+
| time | derivative |
+---------------------+-----------------------+
| 1970-01-01T00:02:20 | -0.07999999999999971 |
| 1970-01-01T00:02:30 | 0.14000000000000057 |
| 1970-01-01T00:02:40 | -0.020000000000000285 |
| 1970-01-01T00:02:50 | -0.04000000000000057 |
| 1970-01-01T00:03:00 | 0.020000000000000285 |
| 1970-01-01T00:03:10 | -0.12000000000000029 |
+---------------------+-----------------------+
name: cpu
tags: cpu=cpu1
+---------------------+-----------------------+
| time | derivative |
+---------------------+-----------------------+
| 1970-01-01T00:02:20 | 0.010000000000000852 |
| 1970-01-01T00:02:30 | 0.0 |
| 1970-01-01T00:02:40 | -0.010000000000000852 |
| 1970-01-01T00:02:50 | 0.0 |
| 1970-01-01T00:03:00 | 0.0 |
| 1970-01-01T00:03:10 | 0.0 |
+---------------------+-----------------------+
-- InfluxQL: SELECT derivative(usage_idle, 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
name: cpu
tags: cpu=cpu0
+---------------------+-----------------------+
| time | derivative |
+---------------------+-----------------------+
| 1970-01-01T00:02:20 | -0.039999999999999855 |
| 1970-01-01T00:02:30 | 0.07000000000000028 |
| 1970-01-01T00:02:40 | -0.010000000000000142 |
| 1970-01-01T00:02:50 | -0.020000000000000285 |
| 1970-01-01T00:03:00 | 0.010000000000000142 |
| 1970-01-01T00:03:10 | -0.060000000000000143 |
+---------------------+-----------------------+
name: cpu
tags: cpu=cpu1
+---------------------+-----------------------+
| time | derivative |
+---------------------+-----------------------+
| 1970-01-01T00:02:20 | 0.005000000000000426 |
| 1970-01-01T00:02:30 | 0.0 |
| 1970-01-01T00:02:40 | -0.005000000000000426 |
| 1970-01-01T00:02:50 | 0.0 |
| 1970-01-01T00:03:00 | 0.0 |
| 1970-01-01T00:03:10 | 0.0 |
+---------------------+-----------------------+
-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
name: diskio
+---------------------+------------+
| time | derivative |
+---------------------+------------+
| 1970-01-01T00:02:20 | 82.0 |
| 1970-01-01T00:02:27 | 187.0 |
| 1970-01-01T00:02:34 | 112.0 |
| 1970-01-01T00:02:48 | 55.0 |
| 1970-01-01T00:02:55 | 219.0 |
| 1970-01-01T00:03:09 | 37.5 |
| 1970-01-01T00:03:16 | 76.0 |
| 1970-01-01T00:03:30 | 73.0 |
+---------------------+------------+
-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
name: diskio
+---------------------+--------------------+
| time | derivative |
+---------------------+--------------------+
| 1970-01-01T00:02:20 | 5.857142857142857 |
| 1970-01-01T00:02:27 | 13.357142857142858 |
| 1970-01-01T00:02:34 | 8.0 |
| 1970-01-01T00:02:48 | 3.9285714285714284 |
| 1970-01-01T00:02:55 | 15.642857142857142 |
| 1970-01-01T00:03:09 | 2.6785714285714284 |
| 1970-01-01T00:03:16 | 5.428571428571429 |
| 1970-01-01T00:03:30 | 5.214285714285714 |
+---------------------+--------------------+
-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
name: diskio
+---------------------+-------------------+
| time | derivative |
+---------------------+-------------------+
| 1970-01-01T00:02:00 | 389.3333333330229 |
| 1970-01-01T00:02:30 | 431.0 |
| 1970-01-01T00:03:00 | 405.0 |
| 1970-01-01T00:03:30 | 221.6666666669771 |
+---------------------+-------------------+
-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
name: diskio
+---------------------+--------------------+
| time | derivative |
+---------------------+--------------------+
| 1970-01-01T00:02:00 | 6.488888888883715 |
| 1970-01-01T00:02:30 | 7.183333333333334 |
| 1970-01-01T00:03:00 | 6.75 |
| 1970-01-01T00:03:30 | 3.6944444444496183 |
+---------------------+--------------------+
-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
name: diskio
+---------------------+------------+
| time | derivative |
+---------------------+------------+
| 1970-01-01T00:02:06 | 5592646.0 |
| 1970-01-01T00:02:13 | -5592646.0 |
| 1970-01-01T00:02:20 | 5592810.0 |
| 1970-01-01T00:02:27 | 187.0 |
| 1970-01-01T00:02:34 | 112.0 |
| 1970-01-01T00:02:41 | -5593109.0 |
| 1970-01-01T00:02:48 | 5593219.0 |
| 1970-01-01T00:02:55 | 219.0 |
| 1970-01-01T00:03:02 | -5593438.0 |
| 1970-01-01T00:03:09 | 5593513.0 |
| 1970-01-01T00:03:16 | 76.0 |
| 1970-01-01T00:03:23 | -5593589.0 |
| 1970-01-01T00:03:30 | 5593735.0 |
+---------------------+------------+
-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
name: diskio
+---------------------+---------------------+
| time | derivative |
+---------------------+---------------------+
| 1970-01-01T00:02:06 | 399474.71428571426 |
| 1970-01-01T00:02:13 | -399474.71428571426 |
| 1970-01-01T00:02:20 | 399486.4285714286 |
| 1970-01-01T00:02:27 | 13.357142857142858 |
| 1970-01-01T00:02:34 | 8.0 |
| 1970-01-01T00:02:41 | -399507.78571428574 |
| 1970-01-01T00:02:48 | 399515.64285714284 |
| 1970-01-01T00:02:55 | 15.642857142857142 |
| 1970-01-01T00:03:02 | -399531.28571428574 |
| 1970-01-01T00:03:09 | 399536.64285714284 |
| 1970-01-01T00:03:16 | 5.428571428571429 |
| 1970-01-01T00:03:23 | -399542.0714285714 |
| 1970-01-01T00:03:30 | 399552.5 |
+---------------------+---------------------+
-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
name: diskio
+---------------------+------------+
| time | derivative |
+---------------------+------------+
| 1970-01-01T00:02:13 | 0.0 |
| 1970-01-01T00:02:20 | 164.0 |
| 1970-01-01T00:02:27 | 187.0 |
| 1970-01-01T00:02:34 | 112.0 |
| 1970-01-01T00:02:41 | 0.0 |
| 1970-01-01T00:02:48 | 110.0 |
| 1970-01-01T00:02:55 | 219.0 |
| 1970-01-01T00:03:02 | 0.0 |
| 1970-01-01T00:03:09 | 75.0 |
| 1970-01-01T00:03:16 | 76.0 |
| 1970-01-01T00:03:23 | 0.0 |
| 1970-01-01T00:03:30 | 146.0 |
+---------------------+------------+
-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
name: diskio
+---------------------+--------------------+
| time | derivative |
+---------------------+--------------------+
| 1970-01-01T00:02:13 | 0.0 |
| 1970-01-01T00:02:20 | 11.714285714285714 |
| 1970-01-01T00:02:27 | 13.357142857142858 |
| 1970-01-01T00:02:34 | 8.0 |
| 1970-01-01T00:02:41 | 0.0 |
| 1970-01-01T00:02:48 | 7.857142857142857 |
| 1970-01-01T00:02:55 | 15.642857142857142 |
| 1970-01-01T00:03:02 | 0.0 |
| 1970-01-01T00:03:09 | 5.357142857142857 |
| 1970-01-01T00:03:16 | 5.428571428571429 |
| 1970-01-01T00:03:23 | 0.0 |
| 1970-01-01T00:03:30 | 10.428571428571429 |
+---------------------+--------------------+
-- InfluxQL: SELECT derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
name: diskio
+---------------------+------------+
| time | derivative |
+---------------------+------------+
| 1970-01-01T00:02:13 | 82.0 |
| 1970-01-01T00:02:20 | 82.0 |
| 1970-01-01T00:02:27 | 187.0 |
| 1970-01-01T00:02:34 | 112.0 |
| 1970-01-01T00:02:41 | 55.0 |
| 1970-01-01T00:02:48 | 55.0 |
| 1970-01-01T00:02:55 | 219.0 |
| 1970-01-01T00:03:02 | 37.5 |
| 1970-01-01T00:03:09 | 37.5 |
| 1970-01-01T00:03:16 | 76.0 |
| 1970-01-01T00:03:23 | 73.0 |
| 1970-01-01T00:03:30 | 73.0 |
+---------------------+------------+
-- InfluxQL: SELECT derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
name: diskio
+---------------------+--------------------+
| time | derivative |
+---------------------+--------------------+
| 1970-01-01T00:02:13 | 5.857142857142857 |
| 1970-01-01T00:02:20 | 5.857142857142857 |
| 1970-01-01T00:02:27 | 13.357142857142858 |
| 1970-01-01T00:02:34 | 8.0 |
| 1970-01-01T00:02:41 | 3.9285714285714284 |
| 1970-01-01T00:02:48 | 3.9285714285714284 |
| 1970-01-01T00:02:55 | 15.642857142857142 |
| 1970-01-01T00:03:02 | 2.6785714285714284 |
| 1970-01-01T00:03:09 | 2.6785714285714284 |
| 1970-01-01T00:03:16 | 5.428571428571429 |
| 1970-01-01T00:03:23 | 5.214285714285714 |
| 1970-01-01T00:03:30 | 5.214285714285714 |
+---------------------+--------------------+
-- InfluxQL: SELECT derivative(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
name: cpu
tags: cpu=cpu0
+---------------------+---------------------+
| time | derivative |
+---------------------+---------------------+
| 1970-01-01T00:02:00 | 0.36666666666666003 |
| 1970-01-01T00:02:30 | 1.566666666666677 |
| 1970-01-01T00:03:00 | -0.7333333333333343 |
+---------------------+---------------------+
name: cpu
tags: cpu=cpu1
+---------------------+----------------------+
| time | derivative |
+---------------------+----------------------+
| 1970-01-01T00:02:00 | 0.36666666666667425 |
| 1970-01-01T00:02:30 | -0.03333333333334565 |
| 1970-01-01T00:03:00 | -0.03333333333333144 |
+---------------------+----------------------+
-- InfluxQL: SELECT derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
name: cpu
tags: cpu=cpu0
+---------------------+-----------------------+
| time | derivative |
+---------------------+-----------------------+
| 1970-01-01T00:02:00 | 0.006111111111111 |
| 1970-01-01T00:02:30 | 0.026111111111111286 |
| 1970-01-01T00:03:00 | -0.012222222222222238 |
+---------------------+-----------------------+
name: cpu
tags: cpu=cpu1
+---------------------+------------------------+
| time | derivative |
+---------------------+------------------------+
| 1970-01-01T00:02:00 | 0.006111111111111237 |
| 1970-01-01T00:02:30 | -0.0005555555555557608 |
| 1970-01-01T00:03:00 | -0.000555555555555524 |
+---------------------+------------------------+
-- InfluxQL: SELECT non_negative_derivative(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:20 | 16.4 |
| 1970-01-01T00:02:30 | 18.7 |
| 1970-01-01T00:02:40 | 11.2 |
| 1970-01-01T00:02:50 | 11.0 |
| 1970-01-01T00:03:00 | 21.9 |
| 1970-01-01T00:03:10 | 7.5 |
| 1970-01-01T00:03:20 | 7.6 |
| 1970-01-01T00:03:30 | 14.6 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(writes, 2s) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:20 | 32.8 |
| 1970-01-01T00:02:30 | 37.4 |
| 1970-01-01T00:02:40 | 22.4 |
| 1970-01-01T00:02:50 | 22.0 |
| 1970-01-01T00:03:00 | 43.8 |
| 1970-01-01T00:03:10 | 15.0 |
| 1970-01-01T00:03:20 | 15.2 |
| 1970-01-01T00:03:30 | 29.2 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
name: cpu
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:01:40 | 0.4299999999999997 |
| 1970-01-01T00:02:10 | 0.06999999999999981 |
| 1970-01-01T00:02:50 | 0.0 |
| 1970-01-01T00:03:00 | 0.020000000000000285 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(usage_system, 5s) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu1';
name: cpu
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:00 | 0.020000000000000285 |
| 1970-01-01T00:02:20 | 0.05000000000000426 |
| 1970-01-01T00:03:10 | 0.0 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
name: cpu
tags: cpu=cpu0
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:30 | 0.14000000000000057 |
| 1970-01-01T00:03:00 | 0.020000000000000285 |
+---------------------+-------------------------+
name: cpu
tags: cpu=cpu1
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:20 | 0.010000000000000852 |
| 1970-01-01T00:02:30 | 0.0 |
| 1970-01-01T00:02:50 | 0.0 |
| 1970-01-01T00:03:00 | 0.0 |
| 1970-01-01T00:03:10 | 0.0 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(usage_idle, 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
name: cpu
tags: cpu=cpu0
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:30 | 0.07000000000000028 |
| 1970-01-01T00:03:00 | 0.010000000000000142 |
+---------------------+-------------------------+
name: cpu
tags: cpu=cpu1
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:20 | 0.005000000000000426 |
| 1970-01-01T00:02:30 | 0.0 |
| 1970-01-01T00:02:50 | 0.0 |
| 1970-01-01T00:03:00 | 0.0 |
| 1970-01-01T00:03:10 | 0.0 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:20 | 82.0 |
| 1970-01-01T00:02:27 | 187.0 |
| 1970-01-01T00:02:34 | 112.0 |
| 1970-01-01T00:02:48 | 55.0 |
| 1970-01-01T00:02:55 | 219.0 |
| 1970-01-01T00:03:09 | 37.5 |
| 1970-01-01T00:03:16 | 76.0 |
| 1970-01-01T00:03:30 | 73.0 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:20 | 5.857142857142857 |
| 1970-01-01T00:02:27 | 13.357142857142858 |
| 1970-01-01T00:02:34 | 8.0 |
| 1970-01-01T00:02:48 | 3.9285714285714284 |
| 1970-01-01T00:02:55 | 15.642857142857142 |
| 1970-01-01T00:03:09 | 2.6785714285714284 |
| 1970-01-01T00:03:16 | 5.428571428571429 |
| 1970-01-01T00:03:30 | 5.214285714285714 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:00 | 389.3333333330229 |
| 1970-01-01T00:02:30 | 431.0 |
| 1970-01-01T00:03:00 | 405.0 |
| 1970-01-01T00:03:30 | 221.6666666669771 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:00 | 6.488888888883715 |
| 1970-01-01T00:02:30 | 7.183333333333334 |
| 1970-01-01T00:03:00 | 6.75 |
| 1970-01-01T00:03:30 | 3.6944444444496183 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:06 | 5592646.0 |
| 1970-01-01T00:02:20 | 5592810.0 |
| 1970-01-01T00:02:27 | 187.0 |
| 1970-01-01T00:02:34 | 112.0 |
| 1970-01-01T00:02:48 | 5593219.0 |
| 1970-01-01T00:02:55 | 219.0 |
| 1970-01-01T00:03:09 | 5593513.0 |
| 1970-01-01T00:03:16 | 76.0 |
| 1970-01-01T00:03:30 | 5593735.0 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:06 | 399474.71428571426 |
| 1970-01-01T00:02:20 | 399486.4285714286 |
| 1970-01-01T00:02:27 | 13.357142857142858 |
| 1970-01-01T00:02:34 | 8.0 |
| 1970-01-01T00:02:48 | 399515.64285714284 |
| 1970-01-01T00:02:55 | 15.642857142857142 |
| 1970-01-01T00:03:09 | 399536.64285714284 |
| 1970-01-01T00:03:16 | 5.428571428571429 |
| 1970-01-01T00:03:30 | 399552.5 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:13 | 0.0 |
| 1970-01-01T00:02:20 | 164.0 |
| 1970-01-01T00:02:27 | 187.0 |
| 1970-01-01T00:02:34 | 112.0 |
| 1970-01-01T00:02:41 | 0.0 |
| 1970-01-01T00:02:48 | 110.0 |
| 1970-01-01T00:02:55 | 219.0 |
| 1970-01-01T00:03:02 | 0.0 |
| 1970-01-01T00:03:09 | 75.0 |
| 1970-01-01T00:03:16 | 76.0 |
| 1970-01-01T00:03:23 | 0.0 |
| 1970-01-01T00:03:30 | 146.0 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:13 | 0.0 |
| 1970-01-01T00:02:20 | 11.714285714285714 |
| 1970-01-01T00:02:27 | 13.357142857142858 |
| 1970-01-01T00:02:34 | 8.0 |
| 1970-01-01T00:02:41 | 0.0 |
| 1970-01-01T00:02:48 | 7.857142857142857 |
| 1970-01-01T00:02:55 | 15.642857142857142 |
| 1970-01-01T00:03:02 | 0.0 |
| 1970-01-01T00:03:09 | 5.357142857142857 |
| 1970-01-01T00:03:16 | 5.428571428571429 |
| 1970-01-01T00:03:23 | 0.0 |
| 1970-01-01T00:03:30 | 10.428571428571429 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:13 | 82.0 |
| 1970-01-01T00:02:20 | 82.0 |
| 1970-01-01T00:02:27 | 187.0 |
| 1970-01-01T00:02:34 | 112.0 |
| 1970-01-01T00:02:41 | 55.0 |
| 1970-01-01T00:02:48 | 55.0 |
| 1970-01-01T00:02:55 | 219.0 |
| 1970-01-01T00:03:02 | 37.5 |
| 1970-01-01T00:03:09 | 37.5 |
| 1970-01-01T00:03:16 | 76.0 |
| 1970-01-01T00:03:23 | 73.0 |
| 1970-01-01T00:03:30 | 73.0 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
name: diskio
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:13 | 5.857142857142857 |
| 1970-01-01T00:02:20 | 5.857142857142857 |
| 1970-01-01T00:02:27 | 13.357142857142858 |
| 1970-01-01T00:02:34 | 8.0 |
| 1970-01-01T00:02:41 | 3.9285714285714284 |
| 1970-01-01T00:02:48 | 3.9285714285714284 |
| 1970-01-01T00:02:55 | 15.642857142857142 |
| 1970-01-01T00:03:02 | 2.6785714285714284 |
| 1970-01-01T00:03:09 | 2.6785714285714284 |
| 1970-01-01T00:03:16 | 5.428571428571429 |
| 1970-01-01T00:03:23 | 5.214285714285714 |
| 1970-01-01T00:03:30 | 5.214285714285714 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
name: cpu
tags: cpu=cpu0
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:00 | 0.36666666666666003 |
| 1970-01-01T00:02:30 | 1.566666666666677 |
+---------------------+-------------------------+
name: cpu
tags: cpu=cpu1
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:00 | 0.36666666666667425 |
+---------------------+-------------------------+
-- InfluxQL: SELECT non_negative_derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
name: cpu
tags: cpu=cpu0
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:00 | 0.006111111111111 |
| 1970-01-01T00:02:30 | 0.026111111111111286 |
+---------------------+-------------------------+
name: cpu
tags: cpu=cpu1
+---------------------+-------------------------+
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:00 | 0.006111111111111237 |
+---------------------+-------------------------+

View File

@ -6,8 +6,14 @@ use crate::plan::planner::select::{
};
use crate::plan::planner_time_range_expression::time_range_to_df_expr;
use crate::plan::rewriter::{find_table_names, rewrite_statement, ProjectionType};
use crate::plan::udaf::{DIFFERENCE, MOVING_AVERAGE, NON_NEGATIVE_DIFFERENCE};
use crate::plan::udf::{difference, find_window_udfs, moving_average, non_negative_difference};
use crate::plan::udaf::{
derivative_udf, non_negative_derivative_udf, DIFFERENCE, MOVING_AVERAGE,
NON_NEGATIVE_DIFFERENCE,
};
use crate::plan::udf::{
derivative, difference, find_window_udfs, moving_average, non_negative_derivative,
non_negative_difference,
};
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
use crate::plan::var_ref::var_ref_data_type_to_data_type;
use crate::plan::{error, planner_rewrite_expression, udf, util_copy};
@ -1092,7 +1098,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let window_func_exprs = udfs
.clone()
.into_iter()
.map(|e| Self::udf_to_expr(e, partition_by.clone(), order_by.clone()))
.map(|e| Self::udf_to_expr(ctx, e, partition_by.clone(), order_by.clone()))
.collect::<Result<Vec<_>>>()?;
let plan = LogicalPlanBuilder::from(input)
@ -1118,7 +1124,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
/// Transform a UDF to a window expression.
fn udf_to_expr(e: Expr, partition_by: Vec<Expr>, order_by: Vec<Expr>) -> Result<Expr> {
fn udf_to_expr(
ctx: &Context<'_>,
e: Expr,
partition_by: Vec<Expr>,
order_by: Vec<Expr>,
) -> Result<Expr> {
let alias = e
.display_name()
// display_name is known only to fail with Expr::Sort and Expr::QualifiedWildcard,
@ -1129,6 +1140,20 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
return error::internal(format!("udf_to_expr: unexpected expression: {e}"))
};
fn derivative_unit(ctx: &Context<'_>, args: &Vec<Expr>) -> Result<i64> {
if args.len() > 1 {
if let Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(v))) = args[1] {
Ok(v as i64)
} else {
error::internal(format!("udf_to_expr: unexpected expression: {}", args[1]))
}
} else if let Some(interval) = ctx.interval {
Ok(interval.duration)
} else {
Ok(1000000000) // 1s
}
}
match udf::WindowFunction::try_from_scalar_udf(Arc::clone(&fun)) {
Some(udf::WindowFunction::MovingAverage) => Ok(Expr::WindowFunction(WindowFunction {
fun: window_function::WindowFunction::AggregateUDF(MOVING_AVERAGE.clone()),
@ -1170,6 +1195,36 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
})
.alias(alias))
}
Some(udf::WindowFunction::Derivative) => Ok(Expr::WindowFunction(WindowFunction {
fun: window_function::WindowFunction::AggregateUDF(
derivative_udf(derivative_unit(ctx, &args)?).into(),
),
args: vec!["time".as_expr(), args[0].clone()],
partition_by,
order_by,
window_frame: WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::CurrentRow,
},
})
.alias(alias)),
Some(udf::WindowFunction::NonNegativeDerivative) => {
Ok(Expr::WindowFunction(WindowFunction {
fun: window_function::WindowFunction::AggregateUDF(
non_negative_derivative_udf(derivative_unit(ctx, &args)?).into(),
),
args: vec!["time".as_expr(), args[0].clone()],
partition_by,
order_by,
window_frame: WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::CurrentRow,
},
})
.alias(alias))
}
None => error::internal(format!(
"unexpected user-defined window function: {}",
fun.name
@ -1447,7 +1502,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Some(v.timestamp_nanos()),
None,
))),
Literal::Duration(_) => error::not_implemented("duration literal"),
Literal::Duration(v) => {
Ok(lit(ScalarValue::IntervalMonthDayNano(Some((**v).into()))))
}
Literal::Regex(re) => match scope {
// a regular expression in a projection list is unexpected,
// as it should have been expanded by the rewriter.
@ -1516,6 +1573,22 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
}
fn check_arg_count_range(
name: &str,
args: &[IQLExpr],
min: usize,
max: usize,
) -> Result<()> {
let got = args.len();
if got < min || got > max {
error::query(format!(
"invalid number of arguments for {name}: expected between {min} and {max}, got {got}"
))
} else {
Ok(())
}
}
let Call { name, args } = call;
match name.as_str() {
@ -1624,6 +1697,38 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Ok(moving_average(vec![arg0, lit(arg1)]))
}
"derivative" => {
check_arg_count_range(name, args, 1, 2)?;
// arg0 should be a column or function
let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?;
if let Expr::Literal(ScalarValue::Null) = arg0 {
return Ok(arg0);
}
let mut eargs = vec![arg0];
if args.len() > 1 {
let arg1 = self.expr_to_df_expr(scope, &args[1], schemas)?;
eargs.push(arg1);
}
Ok(derivative(eargs))
}
"non_negative_derivative" => {
check_arg_count_range(name, args, 1, 2)?;
// arg0 should be a column or function
let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?;
if let Expr::Literal(ScalarValue::Null) = arg0 {
return Ok(arg0);
}
let mut eargs = vec![arg0];
if args.len() > 1 {
let arg1 = self.expr_to_df_expr(scope, &args[1], schemas)?;
eargs.push(arg1);
}
Ok(non_negative_derivative(eargs))
}
_ => error::query(format!("Invalid function '{name}'")),
}
}
@ -3491,6 +3596,58 @@ mod test {
assert_snapshot!(plan("SELECT MOVING_AVERAGE(MEAN(usage_idle), usage_system) FROM cpu GROUP BY TIME(10s)"), @"Error during planning: expected integer argument in moving_average()");
}
#[test]
fn test_derivative() {
// no aggregates
assert_snapshot!(plan("SELECT DERIVATIVE(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), derivative:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), derivative:Float64;N]
Filter: NOT derivative IS NULL [time:Timestamp(Nanosecond, None), derivative:Float64;N]
Projection: cpu.time AS time, derivative(cpu.usage_idle) AS derivative [time:Timestamp(Nanosecond, None), derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "derivative(unit: 1000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(cpu.time, cpu.usage_idle) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS derivative(cpu.usage_idle)]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, derivative(cpu.usage_idle):Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
// aggregate
assert_snapshot!(plan("SELECT DERIVATIVE(MEAN(usage_idle)) FROM cpu GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, derivative:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, derivative:Float64;N]
Filter: NOT derivative IS NULL [time:Timestamp(Nanosecond, None);N, derivative:Float64;N]
Projection: time, derivative(AVG(cpu.usage_idle)) AS derivative [time:Timestamp(Nanosecond, None);N, derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, derivative(AVG(cpu.usage_idle)):Float64;N]
GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
}
#[test]
fn test_non_negative_derivative() {
// no aggregates
assert_snapshot!(plan("SELECT NON_NEGATIVE_DERIVATIVE(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, non_negative_derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N]
Filter: NOT non_negative_derivative IS NULL [time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N]
Projection: cpu.time AS time, non_negative_derivative(cpu.usage_idle) AS non_negative_derivative [time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_derivative(unit: 1000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(cpu.time, cpu.usage_idle) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_derivative(cpu.usage_idle)]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, non_negative_derivative(cpu.usage_idle):Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
// aggregate
assert_snapshot!(plan("SELECT NON_NEGATIVE_DERIVATIVE(MEAN(usage_idle)) FROM cpu GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, non_negative_derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N]
Filter: NOT non_negative_derivative IS NULL [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N]
Projection: time, non_negative_derivative(AVG(cpu.usage_idle)) AS non_negative_derivative [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, non_negative_derivative(AVG(cpu.usage_idle)):Float64;N]
GapFill: groupBy=[[time]], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(TimestampNanosecond(1672531200000000000, None)) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
}
#[test]
fn test_not_implemented() {
assert_snapshot!(plan("SELECT DIFFERENCE(MEAN(usage_idle)), MEAN(usage_idle) FROM cpu GROUP BY TIME(10s)"), @"This feature is not implemented: mixed window-aggregate and aggregate columns, such as DIFFERENCE(MEAN(col)), MEAN(col)");

View File

@ -1,12 +1,13 @@
use crate::plan::error;
use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{
Accumulator, AccumulatorFunctionImplementation, AggregateUDF, ReturnTypeFunction, Signature,
StateTypeFunction, TypeSignature, Volatility,
};
use once_cell::sync::Lazy;
use std::mem::replace;
use std::sync::Arc;
/// A list of the numeric types supported by InfluxQL that can be be used
@ -247,8 +248,11 @@ pub(crate) const NON_NEGATIVE_DIFFERENCE_NAME: &str = "non_negative_difference";
/// Definition of the `NON_NEGATIVE_DIFFERENCE` user-defined aggregate function.
pub(crate) static NON_NEGATIVE_DIFFERENCE: Lazy<Arc<AggregateUDF>> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(|dt| Ok(Arc::new(dt[0].clone())));
let accumulator: AccumulatorFunctionImplementation =
Arc::new(|dt| Ok(Box::new(NonNegativeDifferenceAccumulator::new(dt))));
let accumulator: AccumulatorFunctionImplementation = Arc::new(|dt| {
Ok(Box::new(NonNegative::<_>::new(DifferenceAccumulator::new(
dt,
))))
});
let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
Arc::new(AggregateUDF::new(
NON_NEGATIVE_DIFFERENCE_NAME,
@ -266,37 +270,30 @@ pub(crate) static NON_NEGATIVE_DIFFERENCE: Lazy<Arc<AggregateUDF>> = Lazy::new(|
))
});
/// NonNegative is a wrapper around an Accumulator that transposes
/// negative value to be NULL.
#[derive(Debug)]
struct NonNegativeDifferenceAccumulator {
acc: DifferenceAccumulator,
struct NonNegative<T> {
acc: T,
}
impl NonNegativeDifferenceAccumulator {
fn new(data_type: &DataType) -> Self {
let acc = DifferenceAccumulator::new(data_type);
impl<T> NonNegative<T> {
fn new(acc: T) -> Self {
Self { acc }
}
}
impl Accumulator for NonNegativeDifferenceAccumulator {
/// `state` is only called when used as an aggregate function. It can be
/// can safely left unimplemented, as this accumulator is only used as a window aggregate.
///
/// See: <https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#tymethod.state>
impl<T: Accumulator> Accumulator for NonNegative<T> {
fn state(&self) -> Result<Vec<ScalarValue>> {
error::internal("unexpected call to NonNegativeDifferenceAccumulator::state")
self.acc.state()
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
self.acc.update_batch(values)
}
/// `merge_batch` is only called when used as an aggregate function. It can be
/// can safely left unimplemented, as this accumulator is only used as a window aggregate.
///
/// See: <https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#tymethod.state>
fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> {
error::internal("unexpected call to NonNegativeDifferenceAccumulator::merge_batch")
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
self.acc.merge_batch(states)
}
fn evaluate(&self) -> Result<ScalarValue> {
@ -311,3 +308,183 @@ impl Accumulator for NonNegativeDifferenceAccumulator {
self.acc.size()
}
}
/// Name of the `DERIVATIVE` user-defined aggregate function.
pub(crate) const DERIVATIVE_NAME: &str = "derivative";
pub(crate) fn derivative_udf(unit: i64) -> AggregateUDF {
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
let accumulator: AccumulatorFunctionImplementation =
Arc::new(move |_| Ok(Box::new(DerivativeAccumulator::new(unit))));
let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
let sig = Signature::one_of(
NUMERICS
.iter()
.map(|dt| {
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
dt.clone(),
])
})
.collect(),
Volatility::Immutable,
);
AggregateUDF::new(
format!("{DERIVATIVE_NAME}(unit: {unit})").as_str(),
&sig,
&return_type,
&accumulator,
// State shouldn't be called, so no schema to report
&state_type,
)
}
/// Name of the `NON_NEGATIVE_DERIVATIVE` user-defined aggregate function.
pub(crate) const NON_NEGATIVE_DERIVATIVE_NAME: &str = "non_negative_derivative";
pub(crate) fn non_negative_derivative_udf(unit: i64) -> AggregateUDF {
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
let accumulator: AccumulatorFunctionImplementation = Arc::new(move |_| {
Ok(Box::new(NonNegative::<_>::new(DerivativeAccumulator::new(
unit,
))))
});
let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
let sig = Signature::one_of(
NUMERICS
.iter()
.map(|dt| {
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
dt.clone(),
])
})
.collect(),
Volatility::Immutable,
);
AggregateUDF::new(
format!("{NON_NEGATIVE_DERIVATIVE_NAME}(unit: {unit})").as_str(),
&sig,
&return_type,
&accumulator,
// State shouldn't be called, so no schema to report
&state_type,
)
}
#[derive(Debug)]
pub(super) struct DerivativeAccumulator {
unit: i64,
prev: Option<Point>,
curr: Option<Point>,
}
impl DerivativeAccumulator {
fn new(unit: i64) -> Self {
Self {
unit,
prev: None,
curr: None,
}
}
}
impl Accumulator for DerivativeAccumulator {
/// `state` is only called when used as an aggregate function. It can be
/// can safely left unimplemented, as this accumulator is only used as a window aggregate.
///
/// See: <https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#tymethod.state>
fn state(&self) -> Result<Vec<ScalarValue>> {
error::internal("unexpected call to DerivativeAccumulator::state")
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
let times = &values[0];
let arr = &values[1];
for index in 0..arr.len() {
let time = match ScalarValue::try_from_array(times, index)? {
ScalarValue::TimestampNanosecond(Some(ts), _) => ts,
v => {
return Err(DataFusionError::Internal(format!(
"invalid time value: {}",
v
)))
}
};
let curr = Point::new(time, ScalarValue::try_from_array(arr, index)?);
let prev = replace(&mut self.curr, curr);
// don't replace the previous value if the current value has the same timestamp.
if self.prev.is_none()
|| prev
.as_ref()
.is_some_and(|prev| prev.time > self.prev.as_ref().unwrap().time)
{
self.prev = prev
}
}
Ok(())
}
/// `merge_batch` is only called when used as an aggregate function. It can be
/// can safely left unimplemented, as this accumulator is only used as a window aggregate.
///
/// See: <https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#tymethod.state>
fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> {
error::internal("unexpected call to DerivativeAccumulator::merge_batch")
}
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::Float64(
self.curr
.as_ref()
.and_then(|c| c.derivative(self.prev.as_ref(), self.unit)),
))
}
fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}
#[derive(Debug)]
struct Point {
time: i64,
value: ScalarValue,
}
impl Point {
fn new(time: i64, value: ScalarValue) -> Option<Self> {
if value.is_null() {
None
} else {
Some(Self { time, value })
}
}
fn value_as_f64(&self) -> f64 {
match self.value {
ScalarValue::Int64(Some(v)) => v as f64,
ScalarValue::Float64(Some(v)) => v,
ScalarValue::UInt64(Some(v)) => v as f64,
_ => panic!("invalid point {:?}", self),
}
}
fn derivative(&self, prev: Option<&Self>, unit: i64) -> Option<f64> {
prev.and_then(|prev| {
let diff = self.value_as_f64() - prev.value_as_f64();
let elapsed = match self.time - prev.time {
// if the time hasn't changed then it is a NULL.
0 => return None,
v => v,
} as f64;
let devisor = elapsed / (unit as f64);
Some(diff / devisor)
})
}
}

View File

@ -20,6 +20,8 @@ pub(super) enum WindowFunction {
MovingAverage,
Difference,
NonNegativeDifference,
Derivative,
NonNegativeDerivative,
}
impl WindowFunction {
@ -29,6 +31,8 @@ impl WindowFunction {
MOVING_AVERAGE_UDF_NAME => Some(Self::MovingAverage),
DIFFERENCE_UDF_NAME => Some(Self::Difference),
NON_NEGATIVE_DIFFERENCE_UDF_NAME => Some(Self::NonNegativeDifference),
DERIVATIVE_UDF_NAME => Some(Self::Derivative),
NON_NEGATIVE_DERIVATIVE_UDF_NAME => Some(Self::NonNegativeDerivative),
_ => None,
}
}
@ -117,6 +121,54 @@ static NON_NEGATIVE_DIFFERENCE: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
))
});
const DERIVATIVE_UDF_NAME: &str = "derivative";
/// Create an expression to represent the `DERIVATIVE` function.
pub(crate) fn derivative(args: Vec<Expr>) -> Expr {
DERIVATIVE.call(args)
}
/// Definition of the `DERIVATIVE` function.
static DERIVATIVE: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
let return_type_fn: ReturnTypeFunction = Arc::new(|args| Ok(Arc::new(args[0].clone())));
Arc::new(ScalarUDF::new(
DERIVATIVE_UDF_NAME,
&Signature::one_of(
NUMERICS
.iter()
.map(|dt| TypeSignature::Exact(vec![dt.clone()]))
.collect(),
Volatility::Immutable,
),
&return_type_fn,
&stand_in_impl(DERIVATIVE_UDF_NAME),
))
});
const NON_NEGATIVE_DERIVATIVE_UDF_NAME: &str = "non_negative_derivative";
/// Create an expression to represent the `NON_NEGATIVE_DERIVATIVE` function.
pub(crate) fn non_negative_derivative(args: Vec<Expr>) -> Expr {
NON_NEGATIVE_DERIVATIVE.call(args)
}
/// Definition of the `NON_NEGATIVE_DERIVATIVE` function.
static NON_NEGATIVE_DERIVATIVE: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
let return_type_fn: ReturnTypeFunction = Arc::new(|args| Ok(Arc::new(args[0].clone())));
Arc::new(ScalarUDF::new(
NON_NEGATIVE_DERIVATIVE_UDF_NAME,
&Signature::one_of(
NUMERICS
.iter()
.map(|dt| TypeSignature::Exact(vec![dt.clone()]))
.collect(),
Volatility::Immutable,
),
&return_type_fn,
&stand_in_impl(NON_NEGATIVE_DERIVATIVE_UDF_NAME),
))
});
/// Returns an implementation that always returns an error.
fn stand_in_impl(name: &'static str) -> ScalarFunctionImplementation {
Arc::new(move |_| error::internal(format!("{name} should not exist in the final logical plan")))