refactor(influxql): make MOVING_AVERAGE a user-defined window function (#8377)

Update the implementation of the MOVING_AVERAGE function to be a
user-defined window function allowing the values to be calculated
for the entire window in one go.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Martin Hilton 2023-08-01 07:30:06 +01:00 committed by GitHub
parent de79619e71
commit 25c3ce805d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 119 additions and 157 deletions

View File

@ -7,7 +7,6 @@ mod planner_rewrite_expression;
mod planner_time_range_expression;
mod rewriter;
mod test_utils;
mod udaf;
mod udf;
mod util;
mod util_copy;

View File

@ -9,7 +9,6 @@ use crate::plan::planner::select::{
};
use crate::plan::planner_time_range_expression::time_range_to_df_expr;
use crate::plan::rewriter::{find_table_names, rewrite_statement, ProjectionType};
use crate::plan::udaf::MOVING_AVERAGE;
use crate::plan::udf::{
cumulative_sum, derivative, difference, find_window_udfs, moving_average,
non_negative_derivative, non_negative_difference,
@ -18,8 +17,8 @@ use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, IQLSchema};
use crate::plan::var_ref::var_ref_data_type_to_data_type;
use crate::plan::{planner_rewrite_expression, udf, util_copy};
use crate::window::{
CUMULATIVE_SUM, DERIVATIVE, DIFFERENCE, NON_NEGATIVE_DERIVATIVE, NON_NEGATIVE_DIFFERENCE,
PERCENT_ROW_NUMBER,
CUMULATIVE_SUM, DERIVATIVE, DIFFERENCE, MOVING_AVERAGE, NON_NEGATIVE_DERIVATIVE,
NON_NEGATIVE_DIFFERENCE, PERCENT_ROW_NUMBER,
};
use arrow::array::{StringBuilder, StringDictionaryBuilder};
use arrow::datatypes::{DataType, Field as ArrowField, Int32Type, Schema as ArrowSchema};
@ -1470,14 +1469,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
match udf::WindowFunction::try_from_scalar_udf(Arc::clone(&fun)) {
Some(udf::WindowFunction::MovingAverage) => Ok(Expr::WindowFunction(WindowFunction {
fun: window_function::WindowFunction::AggregateUDF(MOVING_AVERAGE.clone()),
fun: MOVING_AVERAGE.clone(),
args,
partition_by,
order_by,
window_frame: WindowFrame {
units: WindowFrameUnits::Rows,
start_bound: WindowFrameBound::Preceding(ScalarValue::Null),
end_bound: WindowFrameBound::CurrentRow,
end_bound: WindowFrameBound::Following(ScalarValue::Null),
},
})
.alias(alias)),
@ -3966,7 +3965,7 @@ mod test {
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, moving_average [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), moving_average:Float64;N]
Filter: NOT moving_average IS NULL [time:Timestamp(Nanosecond, None), moving_average:Float64;N]
Projection: cpu.time AS time, moving_average(cpu.usage_idle,Int64(3)) AS moving_average [time:Timestamp(Nanosecond, None), moving_average:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "moving_average", signature: Signature { type_signature: OneOf([Exact([Int64, Int64]), Exact([UInt64, Int64]), Exact([Float64, Int64])]), volatility: Immutable }, fun: "<FUNC>" }(cpu.usage_idle, Int64(3)) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS moving_average(cpu.usage_idle,Int64(3))]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, moving_average(cpu.usage_idle,Int64(3)):Float64;N]
WindowAggr: windowExpr=[[moving_average(cpu.usage_idle, Int64(3)) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS moving_average(cpu.usage_idle,Int64(3))]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, moving_average(cpu.usage_idle,Int64(3)):Float64;N]
TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]
"###);
@ -3976,7 +3975,7 @@ mod test {
Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, moving_average [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, moving_average:Float64;N]
Filter: NOT moving_average IS NULL [time:Timestamp(Nanosecond, None);N, moving_average:Float64;N]
Projection: time, moving_average(AVG(cpu.usage_idle),Int64(3)) AS moving_average [time:Timestamp(Nanosecond, None);N, moving_average:Float64;N]
WindowAggr: windowExpr=[[AggregateUDF { name: "moving_average", signature: Signature { type_signature: OneOf([Exact([Int64, Int64]), Exact([UInt64, Int64]), Exact([Float64, Int64])]), volatility: Immutable }, fun: "<FUNC>" }(AVG(cpu.usage_idle), Int64(3)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS moving_average(AVG(cpu.usage_idle),Int64(3))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, moving_average(AVG(cpu.usage_idle),Int64(3)):Float64;N]
WindowAggr: windowExpr=[[moving_average(AVG(cpu.usage_idle), Int64(3)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS moving_average(AVG(cpu.usage_idle),Int64(3))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, moving_average(AVG(cpu.usage_idle),Int64(3)):Float64;N]
GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N]
Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N]

View File

@ -1,149 +0,0 @@
use crate::{error, NUMERICS};
use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::datatypes::DataType;
use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{
Accumulator, AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature,
StateTypeFunction, TypeSignature, Volatility,
};
use once_cell::sync::Lazy;
use std::sync::Arc;
/// Name of the `MOVING_AVERAGE` user-defined aggregate function.
pub(crate) const MOVING_AVERAGE_NAME: &str = "moving_average";
/// Definition of the `MOVING_AVERAGE` user-defined aggregate function.
pub(crate) static MOVING_AVERAGE: Lazy<Arc<AggregateUDF>> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64)));
let accumulator: AccumulatorFactoryFunction =
Arc::new(|_| Ok(Box::new(AvgNAccumulator::new(&DataType::Float64))));
let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![])));
Arc::new(AggregateUDF::new(
MOVING_AVERAGE_NAME,
&Signature::one_of(
NUMERICS
.iter()
.map(|dt| TypeSignature::Exact(vec![dt.clone(), DataType::Int64]))
.collect(),
Volatility::Immutable,
),
&return_type,
&accumulator,
// State shouldn't be called, so no schema to report
&state_type,
))
});
/// A moving average accumulator that accumulates exactly `N` values
/// before producing a non-null result.
#[derive(Debug)]
struct AvgNAccumulator {
/// The data type of the values being accumulated in [`Self::all_values`].
data_type: DataType,
all_values: Vec<ScalarValue>,
/// Holds the number of non-null values to be accumulated and represents
/// the second argument to the UDAF.
n: usize,
/// The index into [`Self::all_values`] to store the next non-null value.
i: usize,
/// `true` if the last value observed was `NULL`
last_is_null: bool,
}
impl AvgNAccumulator {
/// Creates a new `AvgNAccumulator`
fn new(datatype: &DataType) -> Self {
Self {
data_type: datatype.clone(),
all_values: vec![],
n: 0,
i: 0,
last_is_null: true,
}
}
}
impl Accumulator for AvgNAccumulator {
/// `state` is only called when used as an aggregate function. It can be
/// can safely left unimplemented, as this accumulator is only used as a window aggregate.
///
/// See: <https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#tymethod.state>
fn state(&self) -> Result<Vec<ScalarValue>> {
error::internal("unexpected call to AvgNAccumulator::state")
}
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
assert_eq!(values.len(), 2, "AVG_N expects two arguments");
// The second element of the values array is the second argument to the `moving_average`
// function, which specifies the minimum number of values that must be aggregated.
//
// INVARIANT:
// The planner and rewriter guarantee that the second argument is
// always a numeric constant.
//
// See: FieldChecker::check_moving_average
let n_values = downcast_value!(&values[1], Int64Array);
let n = n_values.value(0) as usize;
// first observation of the second argument, N
if self.n == 0 {
assert!(self.all_values.is_empty());
self.n = n;
self.all_values = vec![ScalarValue::try_from(&self.data_type)?; n];
}
let array = &values[0];
match array.len() {
1 => {
let value = ScalarValue::try_from_array(array, 0)?;
self.last_is_null = value.is_null();
if !self.last_is_null {
self.all_values[self.i] = value;
self.i = (self.i + 1) % self.n;
}
Ok(())
}
n => error::internal(format!(
"AvgNAccumulator: unexpected number of rows: got {n}, expected 1"
)),
}
}
/// `merge_batch` is only called when used as an aggregate function. It can be
/// can safely left unimplemented, as this accumulator is only used as a window aggregate.
///
/// See: <https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.Accumulator.html#tymethod.state>
fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> {
error::internal("unexpected call to AvgNAccumulator::merge_batch")
}
fn evaluate(&self) -> Result<ScalarValue> {
if self.last_is_null || self.all_values.iter().any(|v| v.is_null()) {
return ScalarValue::try_from(&self.data_type);
}
let sum: ScalarValue = self
.all_values
.iter()
.cloned()
.reduce(|acc, v| acc.add(v).unwrap())
// safe to unwrap, as all_values is known to contain only the same primitive values
.unwrap();
let n = self.n as f64;
Ok(match sum {
ScalarValue::Float64(Some(v)) => ScalarValue::from(v / n),
ScalarValue::Int64(Some(v)) => ScalarValue::from(v as f64 / n),
ScalarValue::UInt64(Some(v)) => ScalarValue::from(v as f64 / n),
_ => return error::internal("unexpected scalar value type"),
})
}
fn size(&self) -> usize {
std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.all_values)
- std::mem::size_of_val(&self.all_values)
+ self.data_type.size()
- std::mem::size_of_val(&self.data_type)
}
}

View File

@ -9,6 +9,7 @@ use std::sync::Arc;
mod cumulative_sum;
mod derivative;
mod difference;
mod moving_average;
mod non_negative;
mod percent_row_number;
@ -54,6 +55,20 @@ pub(crate) static DIFFERENCE: Lazy<WindowFunction> = Lazy::new(|| {
)))
});
/// Definition of the `MOVING_AVERAGE` user-defined window function.
pub(crate) static MOVING_AVERAGE: Lazy<WindowFunction> = Lazy::new(|| {
let return_type: ReturnTypeFunction = Arc::new(moving_average::return_type);
let partition_evaluator_factory: PartitionEvaluatorFactory =
Arc::new(moving_average::partition_evaluator_factory);
WindowFunction::WindowUDF(Arc::new(WindowUDF::new(
moving_average::NAME,
&moving_average::SIGNATURE,
&return_type,
&partition_evaluator_factory,
)))
});
const NON_NEGATIVE_DERIVATIVE_NAME: &str = "non_negative_derivative";
/// Definition of the `NON_NEGATIVE_DERIVATIVE` user-defined window function.

View File

@ -0,0 +1,98 @@
use crate::{error, NUMERICS};
use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::datatypes::DataType;
use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue};
use datafusion::logical_expr::{PartitionEvaluator, Signature, TypeSignature, Volatility};
use once_cell::sync::Lazy;
use std::collections::VecDeque;
use std::sync::Arc;
/// The name of the moving average window function.
pub(super) const NAME: &str = "moving_average";
/// Valid signatures for the moving average window function.
pub(super) static SIGNATURE: Lazy<Signature> = Lazy::new(|| {
Signature::one_of(
NUMERICS
.iter()
.map(|dt| TypeSignature::Exact(vec![dt.clone(), DataType::Int64]))
.collect(),
Volatility::Immutable,
)
});
/// Calculate the return type given the function signature.
pub(super) fn return_type(_: &[DataType]) -> Result<Arc<DataType>> {
Ok(Arc::new(DataType::Float64))
}
/// Create a new partition_evaluator_factory.
pub(super) fn partition_evaluator_factory() -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(AvgNPartitionEvaluator {}))
}
/// PartitionEvaluator which returns a moving average of the input data..
#[derive(Debug)]
struct AvgNPartitionEvaluator {}
impl PartitionEvaluator for AvgNPartitionEvaluator {
fn evaluate_all(&mut self, values: &[ArrayRef], _num_rows: usize) -> Result<Arc<dyn Array>> {
assert_eq!(values.len(), 2, "AVG_N expects two arguments");
// The second element of the values array is the second argument to the `moving_average`
// function, which specifies the minimum number of values that must be aggregated.
//
// INVARIANT:
// The planner and rewriter guarantee that the second argument is
// always a numeric constant.
//
// See: FieldChecker::check_moving_average
let n_values = downcast_value!(&values[1], Int64Array);
let n = n_values.value(0);
let array = &values[0];
let mut deq: VecDeque<f64> = VecDeque::new();
let mut avg_n: Vec<ScalarValue> = vec![];
for idx in 0..array.len() {
let value = match ScalarValue::try_from_array(&array, idx)? {
ScalarValue::Float64(o) => o,
ScalarValue::Int64(o) => o.map(|v| v as f64),
ScalarValue::UInt64(o) => o.map(|v| v as f64),
_ => {
return error::internal(format!(
"unsupported data type for moving_average ({})",
array.data_type()
));
}
};
match value {
None => {
avg_n.push(ScalarValue::Float64(None));
continue;
}
Some(v) => {
deq.push_back(v);
if deq.len() > n as usize {
deq.pop_front();
}
if deq.len() != n as usize {
avg_n.push(ScalarValue::Float64(None));
continue;
}
avg_n.push(ScalarValue::Float64(Some(
deq.iter().sum::<f64>() / n as f64,
)));
}
}
}
Ok(Arc::new(ScalarValue::iter_to_array(avg_n)?))
}
fn uses_window_frame(&self) -> bool {
false
}
fn include_rank(&self) -> bool {
false
}
}