refactor(influxql): make DERIVATIVE a user-defined window function (#8265)

Now that user-defined window functions are available, change the
DERIVATIVE and NON_NEGATIVE_DERIVATIVE function implementations to
use user-defined windows functions. This should improve performance
by allowing the entire window to be processed in one go, rather
than processing one row at a time.

The implementation is also moved out of the planner module alongside
the other user-defined window functions.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Martin Hilton 2023-07-20 07:19:02 +01:00 committed by GitHub
parent b3fca844d5
commit 2f2fcb6f05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 209 additions and 247 deletions

View File

@ -9,7 +9,7 @@ use crate::plan::planner::select::{
};
use crate::plan::planner_time_range_expression::time_range_to_df_expr;
use crate::plan::rewriter::{find_table_names, rewrite_statement, ProjectionType};
use crate::plan::udaf::{derivative_udf, non_negative_derivative_udf, MOVING_AVERAGE};
use crate::plan::udaf::MOVING_AVERAGE;
use crate::plan::udf::{
cumulative_sum, derivative, difference, find_window_udfs, moving_average,
non_negative_derivative, non_negative_difference,
@ -17,7 +17,10 @@ use crate::plan::udf::{
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, IQLSchema};
use crate::plan::var_ref::var_ref_data_type_to_data_type;
use crate::plan::{planner_rewrite_expression, udf, util_copy};
use crate::window::{CUMULATIVE_SUM, DIFFERENCE, NON_NEGATIVE_DIFFERENCE, PERCENT_ROW_NUMBER};
use crate::window::{
CUMULATIVE_SUM, DERIVATIVE, DIFFERENCE, NON_NEGATIVE_DERIVATIVE, NON_NEGATIVE_DIFFERENCE,
PERCENT_ROW_NUMBER,
};
use arrow::array::{StringBuilder, StringDictionaryBuilder};
use arrow::datatypes::{DataType, Field as ArrowField, Int32Type, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
@ -1444,17 +1447,17 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
return error::internal(format!("udf_to_expr: unexpected expression: {e}"))
};
fn derivative_unit(ctx: &Context<'_>, args: &Vec<Expr>) -> Result<i64> {
fn derivative_unit(ctx: &Context<'_>, args: &Vec<Expr>) -> Result<ScalarValue> {
if args.len() > 1 {
if let Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(v))) = args[1] {
Ok(v as i64)
if let Expr::Literal(v) = &args[1] {
Ok(v.clone())
} else {
error::internal(format!("udf_to_expr: unexpected expression: {}", args[1]))
}
} else if let Some(interval) = ctx.interval {
Ok(interval.duration)
Ok(ScalarValue::new_interval_mdn(0, 0, interval.duration))
} else {
Ok(1000000000) // 1s
Ok(ScalarValue::new_interval_mdn(0, 0, 1_000_000_000)) // 1s
}
}
@ -1498,31 +1501,35 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
.alias(alias))
}
Some(udf::WindowFunction::Derivative) => Ok(Expr::WindowFunction(WindowFunction {
fun: window_function::WindowFunction::AggregateUDF(
derivative_udf(derivative_unit(ctx, &args)?).into(),
),
args: vec!["time".as_expr(), args[0].clone()],
fun: DERIVATIVE.clone(),
args: vec![
args[0].clone(),
lit(derivative_unit(ctx, &args)?),
"time".as_expr(),
],
partition_by,
order_by,
window_frame: WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::CurrentRow,
end_bound: WindowFrameBound::Following(ScalarValue::Null),
},
})
.alias(alias)),
Some(udf::WindowFunction::NonNegativeDerivative) => {
Ok(Expr::WindowFunction(WindowFunction {
fun: window_function::WindowFunction::AggregateUDF(
non_negative_derivative_udf(derivative_unit(ctx, &args)?).into(),
),
args: vec!["time".as_expr(), args[0].clone()],
fun: NON_NEGATIVE_DERIVATIVE.clone(),
args: vec![
args[0].clone(),
lit(derivative_unit(ctx, &args)?),
"time".as_expr(),
],
partition_by,
order_by,
window_frame: WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::CurrentRow,
end_bound: WindowFrameBound::Following(ScalarValue::Null),
},
})
.alias(alias))
@ -3969,7 +3976,7 @@ mod test {
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), derivative:Float64;N]
Filter: NOT derivative IS NULL [time:Timestamp(Nanosecond, None), derivative:Float64;N]
Projection: cpu.time AS time, derivative(cpu.usage_idle) AS derivative [time:Timestamp(Nanosecond, None), derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "derivative(unit: 1000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(cpu.time, cpu.usage_idle) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS derivative(cpu.usage_idle)]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, derivative(cpu.usage_idle):Float64;N]
WindowAggr: windowExpr=[[derivative(cpu.usage_idle, IntervalMonthDayNano("1000000000"), cpu.time) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS derivative(cpu.usage_idle)]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, derivative(cpu.usage_idle):Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
@ -3979,7 +3986,7 @@ mod test {
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, derivative:Float64;N]
Filter: NOT derivative IS NULL [time:Timestamp(Nanosecond, None);N, derivative:Float64;N]
Projection: time, derivative(AVG(cpu.usage_idle)) AS derivative [time:Timestamp(Nanosecond, None);N, derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, derivative(AVG(cpu.usage_idle)):Float64;N]
WindowAggr: windowExpr=[[derivative(AVG(cpu.usage_idle), IntervalMonthDayNano("10000000000"), time) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, derivative(AVG(cpu.usage_idle)):Float64;N]
GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
@ -3995,7 +4002,7 @@ mod test {
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, non_negative_derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N]
Filter: NOT non_negative_derivative IS NULL [time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N]
Projection: cpu.time AS time, non_negative_derivative(cpu.usage_idle) AS non_negative_derivative [time:Timestamp(Nanosecond, None), non_negative_derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_derivative(unit: 1000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(cpu.time, cpu.usage_idle) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_derivative(cpu.usage_idle)]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, non_negative_derivative(cpu.usage_idle):Float64;N]
WindowAggr: windowExpr=[[non_negative_derivative(cpu.usage_idle, IntervalMonthDayNano("1000000000"), cpu.time) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS non_negative_derivative(cpu.usage_idle)]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, non_negative_derivative(cpu.usage_idle):Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
@ -4005,7 +4012,7 @@ mod test {
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, non_negative_derivative [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N]
Filter: NOT non_negative_derivative IS NULL [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N]
Projection: time, non_negative_derivative(AVG(cpu.usage_idle)) AS non_negative_derivative [time:Timestamp(Nanosecond, None);N, non_negative_derivative:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "non_negative_derivative(unit: 10000000000)", signature: Signature { type_signature: OneOf([Exact([Timestamp(Nanosecond, None), Int64]), Exact([Timestamp(Nanosecond, None), UInt64]), Exact([Timestamp(Nanosecond, None), Float64])]), volatility: Immutable }, fun: "<FUNC>" }(time, AVG(cpu.usage_idle)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS non_negative_derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, non_negative_derivative(AVG(cpu.usage_idle)):Float64;N]
WindowAggr: windowExpr=[[non_negative_derivative(AVG(cpu.usage_idle), IntervalMonthDayNano("10000000000"), time) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS non_negative_derivative(AVG(cpu.usage_idle))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, non_negative_derivative(AVG(cpu.usage_idle)):Float64;N]
GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]

View File

@ -1,13 +1,12 @@
use crate::{error, NUMERICS};
use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::datatypes::{DataType, TimeUnit};
use arrow::datatypes::DataType;
use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{
Accumulator, AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature,
StateTypeFunction, TypeSignature, Volatility,
};
use once_cell::sync::Lazy;
use std::mem::replace;
use std::sync::Arc;
/// Name of the `MOVING_AVERAGE` user-defined aggregate function.
@ -148,222 +147,3 @@ impl Accumulator for AvgNAccumulator {
- std::mem::size_of_val(&self.data_type)
}
}
/// NonNegative is a wrapper around an Accumulator that transposes
/// negative value to be NULL.
#[derive(Debug)]
struct NonNegative<T> {
acc: T,
}
impl<T> NonNegative<T> {
fn new(acc: T) -> Self {
Self { acc }
}
}
impl<T: Accumulator> Accumulator for NonNegative<T> {
fn state(&self) -> Result<Vec<ScalarValue>> {
self.acc.state()
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
self.acc.update_batch(values)
}
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
self.acc.merge_batch(states)
}
fn evaluate(&self) -> Result<ScalarValue> {
Ok(match self.acc.evaluate()? {
ScalarValue::Float64(Some(v)) if v < 0.0 => ScalarValue::Float64(None),
ScalarValue::Int64(Some(v)) if v < 0 => ScalarValue::Int64(None),
v => v,
})
}
fn size(&self) -> usize {
self.acc.size()
}
}
/// Name of the `DERIVATIVE` user-defined aggregate function.
pub(crate) const DERIVATIVE_NAME: &str = "derivative";
pub(crate) fn derivative_udf(unit: i64) -> AggregateUDF {
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
let accumulator: AccumulatorFactoryFunction =
Arc::new(move |_| Ok(Box::new(DerivativeAccumulator::new(unit))));
let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
let sig = Signature::one_of(
NUMERICS
.iter()
.map(|dt| {
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
dt.clone(),
])
})
.collect(),
Volatility::Immutable,
);
AggregateUDF::new(
format!("{DERIVATIVE_NAME}(unit: {unit})").as_str(),
&sig,
&return_type,
&accumulator,
// State shouldn't be called, so no schema to report
&state_type,
)
}
/// Name of the `NON_NEGATIVE_DERIVATIVE` user-defined aggregate function.
pub(crate) const NON_NEGATIVE_DERIVATIVE_NAME: &str = "non_negative_derivative";
pub(crate) fn non_negative_derivative_udf(unit: i64) -> AggregateUDF {
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
let accumulator: AccumulatorFactoryFunction = Arc::new(move |_| {
Ok(Box::new(NonNegative::<_>::new(DerivativeAccumulator::new(
unit,
))))
});
let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
let sig = Signature::one_of(
NUMERICS
.iter()
.map(|dt| {
TypeSignature::Exact(vec![
DataType::Timestamp(TimeUnit::Nanosecond, None),
dt.clone(),
])
})
.collect(),
Volatility::Immutable,
);
AggregateUDF::new(
format!("{NON_NEGATIVE_DERIVATIVE_NAME}(unit: {unit})").as_str(),
&sig,
&return_type,
&accumulator,
// State shouldn't be called, so no schema to report
&state_type,
)
}
#[derive(Debug)]
struct DerivativeAccumulator {
unit: i64,
prev: Option<Point>,
curr: Option<Point>,
}
impl DerivativeAccumulator {
fn new(unit: i64) -> Self {
Self {
unit,
prev: None,
curr: None,
}
}
}
impl Accumulator for DerivativeAccumulator {
/// `state` is only called when used as an aggregate function. It can be
/// can safely left unimplemented, as this accumulator is only used as a window aggregate.
///
/// See: <https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#tymethod.state>
fn state(&self) -> Result<Vec<ScalarValue>> {
error::internal("unexpected call to DerivativeAccumulator::state")
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
}
let times = &values[0];
let arr = &values[1];
for index in 0..arr.len() {
let time = match ScalarValue::try_from_array(times, index)? {
ScalarValue::TimestampNanosecond(Some(ts), _) => ts,
v => {
return Err(DataFusionError::Internal(format!(
"invalid time value: {}",
v
)))
}
};
let curr = Point::new(time, ScalarValue::try_from_array(arr, index)?);
let prev = replace(&mut self.curr, curr);
// don't replace the previous value if the current value has the same timestamp.
if self.prev.is_none()
|| prev
.as_ref()
.is_some_and(|prev| prev.time > self.prev.as_ref().unwrap().time)
{
self.prev = prev
}
}
Ok(())
}
/// `merge_batch` is only called when used as an aggregate function. It can be
/// can safely left unimplemented, as this accumulator is only used as a window aggregate.
///
/// See: <https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#tymethod.state>
fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> {
error::internal("unexpected call to DerivativeAccumulator::merge_batch")
}
fn evaluate(&self) -> Result<ScalarValue> {
Ok(ScalarValue::Float64(
self.curr
.as_ref()
.and_then(|c| c.derivative(self.prev.as_ref(), self.unit)),
))
}
fn size(&self) -> usize {
std::mem::size_of_val(self)
}
}
#[derive(Debug)]
struct Point {
time: i64,
value: ScalarValue,
}
impl Point {
fn new(time: i64, value: ScalarValue) -> Option<Self> {
if value.is_null() {
None
} else {
Some(Self { time, value })
}
}
fn value_as_f64(&self) -> f64 {
match self.value {
ScalarValue::Int64(Some(v)) => v as f64,
ScalarValue::Float64(Some(v)) => v,
ScalarValue::UInt64(Some(v)) => v as f64,
_ => panic!("invalid point {:?}", self),
}
}
fn derivative(&self, prev: Option<&Self>, unit: i64) -> Option<f64> {
prev.and_then(|prev| {
let diff = self.value_as_f64() - prev.value_as_f64();
let elapsed = match self.time - prev.time {
// if the time hasn't changed then it is a NULL.
0 => return None,
v => v,
} as f64;
let devisor = elapsed / (unit as f64);
Some(diff / devisor)
})
}
}

View File

@ -7,7 +7,7 @@
use crate::plan::util_copy::find_exprs_in_exprs;
use crate::{error, NUMERICS};
use arrow::datatypes::DataType;
use arrow::datatypes::{DataType, TimeUnit};
use datafusion::logical_expr::{
Expr, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature, TypeSignature,
Volatility,
@ -131,13 +131,21 @@ pub(crate) fn derivative(args: Vec<Expr>) -> Expr {
/// Definition of the `DERIVATIVE` function.
static DERIVATIVE: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
let return_type_fn: ReturnTypeFunction = Arc::new(|args| Ok(Arc::new(args[0].clone())));
let return_type_fn: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
Arc::new(ScalarUDF::new(
DERIVATIVE_UDF_NAME,
&Signature::one_of(
NUMERICS
.iter()
.map(|dt| TypeSignature::Exact(vec![dt.clone()]))
.flat_map(|dt| {
vec![
TypeSignature::Exact(vec![dt.clone()]),
TypeSignature::Exact(vec![
dt.clone(),
DataType::Duration(TimeUnit::Nanosecond),
]),
]
})
.collect(),
Volatility::Immutable,
),
@ -155,13 +163,21 @@ pub(crate) fn non_negative_derivative(args: Vec<Expr>) -> Expr {
/// Definition of the `NON_NEGATIVE_DERIVATIVE` function.
static NON_NEGATIVE_DERIVATIVE: Lazy<Arc<ScalarUDF>> = Lazy::new(|| {
let return_type_fn: ReturnTypeFunction = Arc::new(|args| Ok(Arc::new(args[0].clone())));
let return_type_fn: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
Arc::new(ScalarUDF::new(
NON_NEGATIVE_DERIVATIVE_UDF_NAME,
&Signature::one_of(
NUMERICS
.iter()
.map(|dt| TypeSignature::Exact(vec![dt.clone()]))
.flat_map(|dt| {
vec![
TypeSignature::Exact(vec![dt.clone()]),
TypeSignature::Exact(vec![
dt.clone(),
DataType::Duration(TimeUnit::Nanosecond),
]),
]
})
.collect(),
Volatility::Immutable,
),

View File

@ -7,6 +7,7 @@ use once_cell::sync::Lazy;
use std::sync::Arc;
mod cumulative_sum;
mod derivative;
mod difference;
mod non_negative;
mod percent_row_number;
@ -25,6 +26,20 @@ pub(crate) static CUMULATIVE_SUM: Lazy<WindowFunction> = Lazy::new(|| {
)))
});
/// Definition of the `DERIVATIVE` user-defined window function.
pub(crate) static DERIVATIVE: Lazy<WindowFunction> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(derivative::return_type);
let partition_evaluator_factory: PartitionEvaluatorFactory =
Arc::new(derivative::partition_evaluator_factory);
WindowFunction::WindowUDF(Arc::new(WindowUDF::new(
derivative::NAME,
&derivative::SIGNATURE,
&return_type,
&partition_evaluator_factory,
)))
});
/// Definition of the `DIFFERENCE` user-defined window function.
pub(crate) static DIFFERENCE: Lazy<WindowFunction> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(difference::return_type);
@ -39,6 +54,25 @@ pub(crate) static DIFFERENCE: Lazy<WindowFunction> = Lazy::new(|| {
)))
});
const NON_NEGATIVE_DERIVATIVE_NAME: &str = "non_negative_derivative";
/// Definition of the `NON_NEGATIVE_DERIVATIVE` user-defined window function.
pub(crate) static NON_NEGATIVE_DERIVATIVE: Lazy<WindowFunction> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(derivative::return_type);
let partition_evaluator_factory: PartitionEvaluatorFactory = Arc::new(|| {
Ok(non_negative::wrapper(
derivative::partition_evaluator_factory()?,
))
});
WindowFunction::WindowUDF(Arc::new(WindowUDF::new(
NON_NEGATIVE_DERIVATIVE_NAME,
&derivative::SIGNATURE,
&return_type,
&partition_evaluator_factory,
)))
});
const NON_NEGATIVE_DIFFERENCE_NAME: &str = "non_negative_difference";
/// Definition of the `NON_NEGATIVE_DIFFERENCE` user-defined window function.

View File

@ -0,0 +1,125 @@
use crate::{error, NUMERICS};
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, TimeUnit};
use datafusion::common::{Result, ScalarValue};
use datafusion::logical_expr::{PartitionEvaluator, Signature, TypeSignature, Volatility};
use once_cell::sync::Lazy;
use std::borrow::Borrow;
use std::sync::Arc;
/// The name of the derivative window function.
pub(super) const NAME: &str = "derivative";
/// Valid signatures for the derivative window function.
pub(super) static SIGNATURE: Lazy<Signature> = Lazy::new(|| {
Signature::one_of(
NUMERICS
.iter()
.map(|dt| {
TypeSignature::Exact(vec![
dt.clone(),
DataType::Duration(TimeUnit::Nanosecond),
DataType::Timestamp(TimeUnit::Nanosecond, None),
])
})
.collect(),
Volatility::Immutable,
)
});
/// Calculate the return type given the function signature.
pub(super) fn return_type(_: &[DataType]) -> Result<Arc<DataType>> {
Ok(Arc::new(DataType::Float64))
}
/// Create a new partition_evaluator_factory.
pub(super) fn partition_evaluator_factory() -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(DifferencePartitionEvaluator {}))
}
/// PartitionEvaluator which returns the derivative between input values,
/// in the provided units.
#[derive(Debug)]
struct DifferencePartitionEvaluator {}
impl PartitionEvaluator for DifferencePartitionEvaluator {
fn evaluate_all(&mut self, values: &[ArrayRef], _num_rows: usize) -> Result<Arc<dyn Array>> {
assert_eq!(values.len(), 3);
let array = Arc::clone(&values[0]);
let times = Arc::clone(&values[2]);
// The second element of the values array is the second argument to
// the 'derivative' function. This specifies the unit duration for the
// derivation to use.
//
// INVARIANT:
// The planner guarantees that the second argument is always a duration
// literal.
let unit = ScalarValue::try_from_array(&values[1], 0)?;
let mut idx: usize = 0;
let mut last: ScalarValue = array.data_type().try_into()?;
let mut last_time: ScalarValue = times.data_type().try_into()?;
let mut derivative: Vec<ScalarValue> = vec![];
while idx < array.len() {
last = ScalarValue::try_from_array(&array, idx)?;
last_time = ScalarValue::try_from_array(&times, idx)?;
derivative.push(ScalarValue::Float64(None));
idx += 1;
if !last.is_null() {
break;
}
}
while idx < array.len() {
let v = ScalarValue::try_from_array(&array, idx)?;
let t = ScalarValue::try_from_array(&times, idx)?;
if v.is_null() {
derivative.push(ScalarValue::Float64(None));
} else {
derivative.push(ScalarValue::Float64(Some(
delta(&v, &last)? / delta_time(&t, &last_time, &unit)?,
)));
last = v.clone();
last_time = t.clone();
}
idx += 1;
}
Ok(Arc::new(ScalarValue::iter_to_array(derivative)?))
}
fn uses_window_frame(&self) -> bool {
false
}
fn include_rank(&self) -> bool {
false
}
}
fn delta(curr: &ScalarValue, prev: &ScalarValue) -> Result<f64> {
match (curr.borrow(), prev.borrow()) {
(ScalarValue::Float64(Some(curr)), ScalarValue::Float64(Some(prev))) => Ok(*curr - *prev),
(ScalarValue::Int64(Some(curr)), ScalarValue::Int64(Some(prev))) => {
Ok(*curr as f64 - *prev as f64)
}
(ScalarValue::UInt64(Some(curr)), ScalarValue::UInt64(Some(prev))) => {
Ok(*curr as f64 - *prev as f64)
}
_ => error::internal("derivative attempted on unsupported values"),
}
}
fn delta_time(curr: &ScalarValue, prev: &ScalarValue, unit: &ScalarValue) -> Result<f64> {
if let (
ScalarValue::TimestampNanosecond(Some(curr), _),
ScalarValue::TimestampNanosecond(Some(prev), _),
ScalarValue::IntervalMonthDayNano(Some(unit)),
) = (curr, prev, unit)
{
Ok((*curr as f64 - *prev as f64) / *unit as f64)
} else {
error::internal("derivative attempted on unsupported values")
}
}