feat(influxql): CUMULATIVE_SUM window function (#8248)

* feat(influxql): CUMULATIVE_SUM window function

Implement the InfluxQL CUMULATIVE_SUM window function. This is
implemented as described in
https://docs.influxdata.com/influxdb/v1.8/query_language/functions/#cumulative_sum.

* chore: Add a test demonstrating NULL handling of CUMULATIVE_SUM

---------

Co-authored-by: Stuart Carnie <stuart.carnie@gmail.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Martin Hilton 2023-07-18 07:13:58 +01:00 committed by GitHub
parent 33e41fc5cb
commit d1640bb926
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 357 additions and 11 deletions

View File

@ -147,4 +147,24 @@ SELECT non_negative_derivative(mean(writes)) FROM diskio WHERE time >= 000000013
SELECT non_negative_derivative(mean(writes), 500ms) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
-- group by time and a tag
SELECT non_negative_derivative(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
SELECT non_negative_derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
SELECT non_negative_derivative(mean(usage_idle), 500ms) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
--
-- cumulative_sum
--
SELECT cumulative_sum(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
SELECT cumulative_sum(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
SELECT cumulative_sum(usage_idle), cumulative_sum(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
SELECT cumulative_sum(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
--
-- cumulative_sum + aggregate
--
SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
-- the input data is regular data at 10s intervals, so 7s windows ensure the `mean` generates windows with NULL values to test NULL handling of cumulative_sum
SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
-- group by time and a tag
SELECT cumulative_sum(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;

View File

@ -917,4 +917,180 @@ tags: cpu=cpu1
| time | non_negative_derivative |
+---------------------+-------------------------+
| 1970-01-01T00:02:00 | 0.006111111111111237 |
+---------------------+-------------------------+
+---------------------+-------------------------+
-- InfluxQL: SELECT cumulative_sum(writes) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001;
name: diskio
+---------------------+----------------+
| time | cumulative_sum |
+---------------------+----------------+
| 1970-01-01T00:02:10 | 5592646 |
| 1970-01-01T00:02:20 | 11185456 |
| 1970-01-01T00:02:30 | 16778453 |
| 1970-01-01T00:02:40 | 22371562 |
| 1970-01-01T00:02:50 | 27964781 |
| 1970-01-01T00:03:00 | 33558219 |
| 1970-01-01T00:03:10 | 39151732 |
| 1970-01-01T00:03:20 | 44745321 |
| 1970-01-01T00:03:30 | 50339056 |
+---------------------+----------------+
-- InfluxQL: SELECT cumulative_sum(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
name: cpu
+---------------------+----------------+
| time | cumulative_sum |
+---------------------+----------------+
| 1970-01-01T00:01:00 | 89.5 |
| 1970-01-01T00:01:10 | 178.1 |
| 1970-01-01T00:01:30 | 261.5 |
| 1970-01-01T00:01:40 | 349.2 |
| 1970-01-01T00:02:10 | 439.0 |
| 1970-01-01T00:02:50 | 528.8 |
| 1970-01-01T00:03:00 | 618.8 |
+---------------------+----------------+
-- InfluxQL: SELECT cumulative_sum(usage_idle), cumulative_sum(usage_system) FROM cpu WHERE time >= 0000000060000000000 AND time < 0000000210000000001 AND cpu = 'cpu0';
name: cpu
+---------------------+--------------------+------------------+
| time | cumulative_sum | cumulative_sum_1 |
+---------------------+--------------------+------------------+
| 1970-01-01T00:01:00 | 89.5 | 89.5 |
| 1970-01-01T00:01:10 | 178.1 | 178.1 |
| 1970-01-01T00:01:20 | 266.7 | |
| 1970-01-01T00:01:30 | 350.1 | 261.5 |
| 1970-01-01T00:01:40 | 437.8 | 349.2 |
| 1970-01-01T00:01:50 | 526.5 | |
| 1970-01-01T00:02:00 | 613.4 | |
| 1970-01-01T00:02:10 | 703.1999999999999 | 439.0 |
| 1970-01-01T00:02:20 | 792.1999999999999 | |
| 1970-01-01T00:02:30 | 882.5999999999999 | |
| 1970-01-01T00:02:40 | 972.8 | |
| 1970-01-01T00:02:50 | 1062.6 | 528.8 |
| 1970-01-01T00:03:00 | 1152.6 | 618.8 |
| 1970-01-01T00:03:10 | 1241.3999999999999 | |
+---------------------+--------------------+------------------+
-- InfluxQL: SELECT cumulative_sum(usage_idle) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY cpu;
name: cpu
tags: cpu=cpu0
+---------------------+--------------------+
| time | cumulative_sum |
+---------------------+--------------------+
| 1970-01-01T00:02:10 | 89.8 |
| 1970-01-01T00:02:20 | 178.8 |
| 1970-01-01T00:02:30 | 269.20000000000005 |
| 1970-01-01T00:02:40 | 359.40000000000003 |
| 1970-01-01T00:02:50 | 449.20000000000005 |
| 1970-01-01T00:03:00 | 539.2 |
| 1970-01-01T00:03:10 | 628.0 |
+---------------------+--------------------+
name: cpu
tags: cpu=cpu1
+---------------------+--------------------+
| time | cumulative_sum |
+---------------------+--------------------+
| 1970-01-01T00:02:10 | 99.8 |
| 1970-01-01T00:02:20 | 199.7 |
| 1970-01-01T00:02:30 | 299.6 |
| 1970-01-01T00:02:40 | 399.40000000000003 |
| 1970-01-01T00:02:50 | 499.20000000000005 |
| 1970-01-01T00:03:00 | 599.0 |
| 1970-01-01T00:03:10 | 698.8 |
+---------------------+--------------------+
-- InfluxQL: SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s);
name: diskio
+---------------------+----------------+
| time | cumulative_sum |
+---------------------+----------------+
| 1970-01-01T00:02:06 | 5592646.0 |
| 1970-01-01T00:02:20 | 11185456.0 |
| 1970-01-01T00:02:27 | 16778453.0 |
| 1970-01-01T00:02:34 | 22371562.0 |
| 1970-01-01T00:02:48 | 27964781.0 |
| 1970-01-01T00:02:55 | 33558219.0 |
| 1970-01-01T00:03:09 | 39151732.0 |
| 1970-01-01T00:03:16 | 44745321.0 |
| 1970-01-01T00:03:30 | 50339056.0 |
+---------------------+----------------+
-- InfluxQL: SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(30s);
name: diskio
+---------------------+--------------------+
| time | cumulative_sum |
+---------------------+--------------------+
| 1970-01-01T00:02:00 | 5592728.0 |
| 1970-01-01T00:02:30 | 11185836.333333332 |
| 1970-01-01T00:03:00 | 16779349.666666664 |
| 1970-01-01T00:03:30 | 22373084.666666664 |
+---------------------+--------------------+
-- InfluxQL: SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(0);
name: diskio
+---------------------+----------------+
| time | cumulative_sum |
+---------------------+----------------+
| 1970-01-01T00:02:06 | 5592646.0 |
| 1970-01-01T00:02:13 | 5592646.0 |
| 1970-01-01T00:02:20 | 11185456.0 |
| 1970-01-01T00:02:27 | 16778453.0 |
| 1970-01-01T00:02:34 | 22371562.0 |
| 1970-01-01T00:02:41 | 22371562.0 |
| 1970-01-01T00:02:48 | 27964781.0 |
| 1970-01-01T00:02:55 | 33558219.0 |
| 1970-01-01T00:03:02 | 33558219.0 |
| 1970-01-01T00:03:09 | 39151732.0 |
| 1970-01-01T00:03:16 | 44745321.0 |
| 1970-01-01T00:03:23 | 44745321.0 |
| 1970-01-01T00:03:30 | 50339056.0 |
+---------------------+----------------+
-- InfluxQL: SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(previous);
name: diskio
+---------------------+----------------+
| time | cumulative_sum |
+---------------------+----------------+
| 1970-01-01T00:02:06 | 5592646.0 |
| 1970-01-01T00:02:13 | 11185292.0 |
| 1970-01-01T00:02:20 | 16778102.0 |
| 1970-01-01T00:02:27 | 22371099.0 |
| 1970-01-01T00:02:34 | 27964208.0 |
| 1970-01-01T00:02:41 | 33557317.0 |
| 1970-01-01T00:02:48 | 39150536.0 |
| 1970-01-01T00:02:55 | 44743974.0 |
| 1970-01-01T00:03:02 | 50337412.0 |
| 1970-01-01T00:03:09 | 55930925.0 |
| 1970-01-01T00:03:16 | 61524514.0 |
| 1970-01-01T00:03:23 | 67118103.0 |
| 1970-01-01T00:03:30 | 72711838.0 |
+---------------------+----------------+
-- InfluxQL: SELECT cumulative_sum(mean(writes)) FROM diskio WHERE time >= 0000000130000000000 AND time < 0000000210000000001 GROUP BY time(7s) fill(linear);
name: diskio
+---------------------+----------------+
| time | cumulative_sum |
+---------------------+----------------+
| 1970-01-01T00:02:06 | 5592646.0 |
| 1970-01-01T00:02:13 | 11185374.0 |
| 1970-01-01T00:02:20 | 16778184.0 |
| 1970-01-01T00:02:27 | 22371181.0 |
| 1970-01-01T00:02:34 | 27964290.0 |
| 1970-01-01T00:02:41 | 33557454.0 |
| 1970-01-01T00:02:48 | 39150673.0 |
| 1970-01-01T00:02:55 | 44744111.0 |
| 1970-01-01T00:03:02 | 50337586.5 |
| 1970-01-01T00:03:09 | 55931099.5 |
| 1970-01-01T00:03:16 | 61524688.5 |
| 1970-01-01T00:03:23 | 67118350.5 |
| 1970-01-01T00:03:30 | 72712085.5 |
+---------------------+----------------+
-- InfluxQL: SELECT cumulative_sum(mean(usage_idle)) FROM cpu WHERE time >= 0000000130000000000 AND time < 0000000210000000001 AND cpu =~ /^cpu(0|1)$/ GROUP BY TIME(30s), cpu;
name: cpu
tags: cpu=cpu0
+---------------------+--------------------+
| time | cumulative_sum |
+---------------------+--------------------+
| 1970-01-01T00:02:00 | 89.4 |
| 1970-01-01T00:02:30 | 179.53333333333336 |
| 1970-01-01T00:03:00 | 268.9333333333334 |
+---------------------+--------------------+
name: cpu
tags: cpu=cpu1
+---------------------+--------------------+
| time | cumulative_sum |
+---------------------+--------------------+
| 1970-01-01T00:02:00 | 99.85 |
| 1970-01-01T00:02:30 | 199.68333333333334 |
| 1970-01-01T00:03:00 | 299.48333333333335 |
+---------------------+--------------------+

View File

@ -14,13 +14,13 @@ use crate::plan::udaf::{
NON_NEGATIVE_DIFFERENCE,
};
use crate::plan::udf::{
derivative, difference, find_window_udfs, moving_average, non_negative_derivative,
non_negative_difference,
cumulative_sum, derivative, difference, find_window_udfs, moving_average,
non_negative_derivative, non_negative_difference,
};
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
use crate::plan::var_ref::var_ref_data_type_to_data_type;
use crate::plan::{planner_rewrite_expression, udf, util_copy};
use crate::window::PERCENT_ROW_NUMBER;
use crate::window::{CUMULATIVE_SUM, PERCENT_ROW_NUMBER};
use arrow::array::{StringBuilder, StringDictionaryBuilder};
use arrow::datatypes::{DataType, Field as ArrowField, Int32Type, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
@ -1535,6 +1535,18 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
})
.alias(alias))
}
Some(udf::WindowFunction::CumulativeSum) => Ok(Expr::WindowFunction(WindowFunction {
fun: CUMULATIVE_SUM.clone(),
args,
partition_by,
order_by,
window_frame: WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::Following(ScalarValue::Null),
},
})
.alias(alias)),
None => error::internal(format!(
"unexpected user-defined window function: {}",
fun.name
@ -2067,6 +2079,17 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Ok(non_negative_derivative(eargs))
}
"cumulative_sum" => {
check_arg_count(name, args, 1)?;
// arg0 should be a column or function
let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?;
if let Expr::Literal(ScalarValue::Null) = arg0 {
return Ok(arg0);
}
Ok(cumulative_sum(vec![arg0]))
}
// The TOP/BOTTOM function is handled as a `ProjectionType::TopBottomSelector`
// query, so the planner only needs to project the single column
// argument.
@ -4011,6 +4034,32 @@ mod test {
"###);
}
#[test]
fn test_cumulative_sum() {
// no aggregates
assert_snapshot!(plan("SELECT CUMULATIVE_SUM(usage_idle) FROM cpu"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cumulative_sum:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, cumulative_sum [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), cumulative_sum:Float64;N]
Filter: NOT cumulative_sum IS NULL [time:Timestamp(Nanosecond, None), cumulative_sum:Float64;N]
Projection: cpu.time AS time, cumulative_sum(cpu.usage_idle) AS cumulative_sum [time:Timestamp(Nanosecond, None), cumulative_sum:Float64;N]
WindowAggr: windowExpr=[[cumumlative_sum(cpu.usage_idle) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS cumulative_sum(cpu.usage_idle)]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, cumulative_sum(cpu.usage_idle):Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
// aggregate
assert_snapshot!(plan("SELECT CUMULATIVE_SUM(MEAN(usage_idle)) FROM cpu GROUP BY TIME(10s)"), @r###"
Sort: time ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, cumulative_sum:Float64;N]
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, cumulative_sum [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, cumulative_sum:Float64;N]
Filter: NOT cumulative_sum IS NULL [time:Timestamp(Nanosecond, None);N, cumulative_sum:Float64;N]
Projection: time, cumulative_sum(AVG(cpu.usage_idle)) AS cumulative_sum [time:Timestamp(Nanosecond, None);N, cumulative_sum:Float64;N]
WindowAggr: windowExpr=[[cumumlative_sum(AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS cumulative_sum(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, cumulative_sum(AVG(cpu.usage_idle)):Float64;N]
GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
}
#[test]
fn test_not_implemented() {
assert_snapshot!(plan("SELECT DIFFERENCE(MEAN(usage_idle)), MEAN(usage_idle) FROM cpu GROUP BY TIME(10s)"), @"This feature is not implemented: mixed window-aggregate and aggregate columns, such as DIFFERENCE(MEAN(col)), MEAN(col)");

View File

@ -1338,11 +1338,8 @@ impl FieldChecker {
}
fn check_cumulative_sum(&mut self, args: &[Expr]) -> Result<()> {
self.inc_aggregate_count();
self.inc_window_count();
check_exp_args!("cumulative_sum", 1, args);
set_extra_intervals!(self, 1);
self.check_nested_symbol("cumulative_sum", &args[0])
}

View File

@ -21,6 +21,7 @@ pub(super) enum WindowFunction {
NonNegativeDifference,
Derivative,
NonNegativeDerivative,
CumulativeSum,
}
impl WindowFunction {
@ -32,6 +33,7 @@ impl WindowFunction {
NON_NEGATIVE_DIFFERENCE_UDF_NAME => Some(Self::NonNegativeDifference),
DERIVATIVE_UDF_NAME => Some(Self::Derivative),
NON_NEGATIVE_DERIVATIVE_UDF_NAME => Some(Self::NonNegativeDerivative),
CUMULATIVE_SUM_UDF_NAME => Some(Self::CumulativeSum),
_ => None,
}
}
@ -168,6 +170,29 @@ static NON_NEGATIVE_DERIVATIVE: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
))
});
const CUMULATIVE_SUM_UDF_NAME: &str = "cumulative_sum";
/// Create an expression to represent the `CUMULATIVE_SUM` function.
pub(crate) fn cumulative_sum(args: Vec<Expr>) -> Expr {
CUMULATIVE_SUM.call(args)
}
/// Definition of the `CUMULATIVE_SUM` function.
static CUMULATIVE_SUM: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
let return_type_fn: ReturnTypeFunction = Arc::new(|args| Ok(Arc::new(args[0].clone())));
Arc::new(ScalarUDF::new(
CUMULATIVE_SUM_UDF_NAME,
&Signature::one_of(
NUMERICS
.iter()
.map(|dt| TypeSignature::Exact(vec![dt.clone()]))
.collect(),
Volatility::Immutable,
),
&return_type_fn,
&stand_in_impl(CUMULATIVE_SUM_UDF_NAME),
))
});
/// Returns an implementation that always returns an error.
fn stand_in_impl(name: &'static str) -> ScalarFunctionImplementation {
Arc::new(move |_| error::internal(format!("{name} should not exist in the final logical plan")))

View File

@ -6,8 +6,23 @@ use datafusion::logical_expr::{
use once_cell::sync::Lazy;
use std::sync::Arc;
mod cumulative_sum;
mod percent_row_number;
/// Definition of the `CUMULATIVE_SUM` user-defined window function.
pub(crate) static CUMULATIVE_SUM: Lazy<WindowFunction> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(cumulative_sum::return_type);
let partition_evaluator_factory: PartitionEvaluatorFactory =
Arc::new(cumulative_sum::partition_evaluator_factory);
WindowFunction::WindowUDF(Arc::new(WindowUDF::new(
cumulative_sum::NAME,
&cumulative_sum::SIGNATURE,
&return_type,
&partition_evaluator_factory,
)))
});
/// Definition of the `PERCENT_ROW_NUMBER` user-defined window function.
pub(crate) static PERCENT_ROW_NUMBER: Lazy<WindowFunction> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(percent_row_number::return_type);

View File

@ -0,0 +1,64 @@
use crate::NUMERICS;
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::DataType;
use datafusion::common::{Result, ScalarValue};
use datafusion::logical_expr::{PartitionEvaluator, Signature, TypeSignature, Volatility};
use once_cell::sync::Lazy;
use std::sync::Arc;
/// The name of the cumulative_sum_number window function.
pub(super) const NAME: &str = "cumumlative_sum";
/// Valid signatures for the cumulative_sum window function.
pub(super) static SIGNATURE: Lazy<Signature> = Lazy::new(|| {
Signature::one_of(
NUMERICS
.iter()
.map(|dt| TypeSignature::Exact(vec![dt.clone()]))
.collect(),
Volatility::Immutable,
)
});
/// Calculate the return type given the function signature.
pub(super) fn return_type(sig: &[DataType]) -> Result<Arc<DataType>> {
Ok(Arc::new(sig[0].clone()))
}
/// Create a new partition_evaluator_factory.
pub(super) fn partition_evaluator_factory() -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CumulativeSumPartitionEvaluator {}))
}
/// PartitionEvaluator which returns the cumulative sum of the input.
#[derive(Debug)]
struct CumulativeSumPartitionEvaluator {}
impl PartitionEvaluator for CumulativeSumPartitionEvaluator {
fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<Arc<dyn Array>> {
assert_eq!(values.len(), 1);
let array = Arc::clone(&values[0]);
let mut sum = ScalarValue::new_zero(array.data_type())?;
let mut cumulative: Vec<ScalarValue> = vec![];
for idx in 0..num_rows {
let v = ScalarValue::try_from_array(&array, idx)?;
let res = if v.is_null() {
v
} else {
sum = sum.add(&v)?;
sum.clone()
};
cumulative.push(res);
}
Ok(Arc::new(ScalarValue::iter_to_array(cumulative)?))
}
fn uses_window_frame(&self) -> bool {
false
}
fn include_rank(&self) -> bool {
false
}
}

View File

@ -6,10 +6,10 @@ use datafusion::logical_expr::{PartitionEvaluator, Signature, TypeSignature, Vol
use once_cell::sync::Lazy;
use std::sync::Arc;
/// The name of the percent_row_number aggregate function.
/// The name of the percent_row_number window function.
pub(super) const NAME: &str = "percent_row_number";
/// Valid signatures for the percent_row_number aggregate function.
/// Valid signatures for the percent_row_number window function.
pub(super) static SIGNATURE: Lazy<Signature> = Lazy::new(|| {
Signature::one_of(
vec![