diff --git a/iox_query_influxql/src/plan/mod.rs b/iox_query_influxql/src/plan/mod.rs index 15e6f5e957..df2be38a0d 100644 --- a/iox_query_influxql/src/plan/mod.rs +++ b/iox_query_influxql/src/plan/mod.rs @@ -7,7 +7,6 @@ mod planner_rewrite_expression; mod planner_time_range_expression; mod rewriter; mod test_utils; -mod udaf; mod udf; mod util; mod util_copy; diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index 8529a97475..1a79943e77 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -9,7 +9,6 @@ use crate::plan::planner::select::{ }; use crate::plan::planner_time_range_expression::time_range_to_df_expr; use crate::plan::rewriter::{find_table_names, rewrite_statement, ProjectionType}; -use crate::plan::udaf::MOVING_AVERAGE; use crate::plan::udf::{ cumulative_sum, derivative, difference, find_window_udfs, moving_average, non_negative_derivative, non_negative_difference, @@ -18,8 +17,8 @@ use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, IQLSchema}; use crate::plan::var_ref::var_ref_data_type_to_data_type; use crate::plan::{planner_rewrite_expression, udf, util_copy}; use crate::window::{ - CUMULATIVE_SUM, DERIVATIVE, DIFFERENCE, NON_NEGATIVE_DERIVATIVE, NON_NEGATIVE_DIFFERENCE, - PERCENT_ROW_NUMBER, + CUMULATIVE_SUM, DERIVATIVE, DIFFERENCE, MOVING_AVERAGE, NON_NEGATIVE_DERIVATIVE, + NON_NEGATIVE_DIFFERENCE, PERCENT_ROW_NUMBER, }; use arrow::array::{StringBuilder, StringDictionaryBuilder}; use arrow::datatypes::{DataType, Field as ArrowField, Int32Type, Schema as ArrowSchema}; @@ -1470,14 +1469,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> { match udf::WindowFunction::try_from_scalar_udf(Arc::clone(&fun)) { Some(udf::WindowFunction::MovingAverage) => Ok(Expr::WindowFunction(WindowFunction { - fun: window_function::WindowFunction::AggregateUDF(MOVING_AVERAGE.clone()), + fun: MOVING_AVERAGE.clone(), args, partition_by, order_by, window_frame: WindowFrame { units: WindowFrameUnits::Rows, start_bound: WindowFrameBound::Preceding(ScalarValue::Null), - end_bound: WindowFrameBound::CurrentRow, + end_bound: WindowFrameBound::Following(ScalarValue::Null), }, }) .alias(alias)), @@ -3966,7 +3965,7 @@ mod test { Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, moving_average [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None), moving_average:Float64;N] Filter: NOT moving_average IS NULL [time:Timestamp(Nanosecond, None), moving_average:Float64;N] Projection: cpu.time AS time, moving_average(cpu.usage_idle,Int64(3)) AS moving_average [time:Timestamp(Nanosecond, None), moving_average:Float64;N] - WindowAggr: windowExpr=[[AggregateUDF { name: "moving_average", signature: Signature { type_signature: OneOf([Exact([Int64, Int64]), Exact([UInt64, Int64]), Exact([Float64, Int64])]), volatility: Immutable }, fun: "" }(cpu.usage_idle, Int64(3)) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS moving_average(cpu.usage_idle,Int64(3))]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, moving_average(cpu.usage_idle,Int64(3)):Float64;N] + WindowAggr: windowExpr=[[moving_average(cpu.usage_idle, Int64(3)) ORDER BY [cpu.time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS moving_average(cpu.usage_idle,Int64(3))]] [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N, moving_average(cpu.usage_idle,Int64(3)):Float64;N] TableScan: cpu [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] "###); @@ -3976,7 +3975,7 @@ mod test { Projection: Dictionary(Int32, Utf8("cpu")) AS iox::measurement, time, moving_average [iox::measurement:Dictionary(Int32, Utf8), time:Timestamp(Nanosecond, None);N, moving_average:Float64;N] Filter: NOT moving_average IS NULL [time:Timestamp(Nanosecond, None);N, moving_average:Float64;N] Projection: time, moving_average(AVG(cpu.usage_idle),Int64(3)) AS moving_average [time:Timestamp(Nanosecond, None);N, moving_average:Float64;N] - WindowAggr: windowExpr=[[AggregateUDF { name: "moving_average", signature: Signature { type_signature: OneOf([Exact([Int64, Int64]), Exact([UInt64, Int64]), Exact([Float64, Int64])]), volatility: Immutable }, fun: "" }(AVG(cpu.usage_idle), Int64(3)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS moving_average(AVG(cpu.usage_idle),Int64(3))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, moving_average(AVG(cpu.usage_idle),Int64(3)):Float64;N] + WindowAggr: windowExpr=[[moving_average(AVG(cpu.usage_idle), Int64(3)) ORDER BY [time ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS moving_average(AVG(cpu.usage_idle),Int64(3))]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N, moving_average(AVG(cpu.usage_idle),Int64(3)):Float64;N] GapFill: groupBy=[time], aggr=[[AVG(cpu.usage_idle)]], time_column=time, stride=IntervalMonthDayNano("10000000000"), range=Unbounded..Included(Literal(TimestampNanosecond(1672531200000000000, None))) [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("10000000000"), cpu.time, TimestampNanosecond(0, None)) AS time]], aggr=[[AVG(cpu.usage_idle)]] [time:Timestamp(Nanosecond, None);N, AVG(cpu.usage_idle):Float64;N] Filter: cpu.time <= TimestampNanosecond(1672531200000000000, None) [cpu:Dictionary(Int32, Utf8);N, host:Dictionary(Int32, Utf8);N, region:Dictionary(Int32, Utf8);N, time:Timestamp(Nanosecond, None), usage_idle:Float64;N, usage_system:Float64;N, usage_user:Float64;N] diff --git a/iox_query_influxql/src/plan/udaf.rs b/iox_query_influxql/src/plan/udaf.rs deleted file mode 100644 index 3b0eaed6a6..0000000000 --- a/iox_query_influxql/src/plan/udaf.rs +++ /dev/null @@ -1,149 +0,0 @@ -use crate::{error, NUMERICS}; -use arrow::array::{Array, ArrayRef, Int64Array}; -use arrow::datatypes::DataType; -use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue}; -use datafusion::logical_expr::{ - Accumulator, AccumulatorFactoryFunction, AggregateUDF, ReturnTypeFunction, Signature, - StateTypeFunction, TypeSignature, Volatility, -}; -use once_cell::sync::Lazy; -use std::sync::Arc; - -/// Name of the `MOVING_AVERAGE` user-defined aggregate function. -pub(crate) const MOVING_AVERAGE_NAME: &str = "moving_average"; - -/// Definition of the `MOVING_AVERAGE` user-defined aggregate function. -pub(crate) static MOVING_AVERAGE: Lazy> = Lazy::new(|| { - let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Float64))); - let accumulator: AccumulatorFactoryFunction = - Arc::new(|_| Ok(Box::new(AvgNAccumulator::new(&DataType::Float64)))); - let state_type: StateTypeFunction = Arc::new(|_| Ok(Arc::new(vec![]))); - Arc::new(AggregateUDF::new( - MOVING_AVERAGE_NAME, - &Signature::one_of( - NUMERICS - .iter() - .map(|dt| TypeSignature::Exact(vec![dt.clone(), DataType::Int64])) - .collect(), - Volatility::Immutable, - ), - &return_type, - &accumulator, - // State shouldn't be called, so no schema to report - &state_type, - )) -}); - -/// A moving average accumulator that accumulates exactly `N` values -/// before producing a non-null result. -#[derive(Debug)] -struct AvgNAccumulator { - /// The data type of the values being accumulated in [`Self::all_values`]. - data_type: DataType, - all_values: Vec, - /// Holds the number of non-null values to be accumulated and represents - /// the second argument to the UDAF. - n: usize, - /// The index into [`Self::all_values`] to store the next non-null value. - i: usize, - /// `true` if the last value observed was `NULL` - last_is_null: bool, -} - -impl AvgNAccumulator { - /// Creates a new `AvgNAccumulator` - fn new(datatype: &DataType) -> Self { - Self { - data_type: datatype.clone(), - all_values: vec![], - n: 0, - i: 0, - last_is_null: true, - } - } -} - -impl Accumulator for AvgNAccumulator { - /// `state` is only called when used as an aggregate function. It can be - /// can safely left unimplemented, as this accumulator is only used as a window aggregate. - /// - /// See: - fn state(&self) -> Result> { - error::internal("unexpected call to AvgNAccumulator::state") - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - assert_eq!(values.len(), 2, "AVG_N expects two arguments"); - - // The second element of the values array is the second argument to the `moving_average` - // function, which specifies the minimum number of values that must be aggregated. - // - // INVARIANT: - // The planner and rewriter guarantee that the second argument is - // always a numeric constant. - // - // See: FieldChecker::check_moving_average - let n_values = downcast_value!(&values[1], Int64Array); - let n = n_values.value(0) as usize; - // first observation of the second argument, N - if self.n == 0 { - assert!(self.all_values.is_empty()); - self.n = n; - self.all_values = vec![ScalarValue::try_from(&self.data_type)?; n]; - } - - let array = &values[0]; - match array.len() { - 1 => { - let value = ScalarValue::try_from_array(array, 0)?; - self.last_is_null = value.is_null(); - if !self.last_is_null { - self.all_values[self.i] = value; - self.i = (self.i + 1) % self.n; - } - - Ok(()) - } - n => error::internal(format!( - "AvgNAccumulator: unexpected number of rows: got {n}, expected 1" - )), - } - } - - /// `merge_batch` is only called when used as an aggregate function. It can be - /// can safely left unimplemented, as this accumulator is only used as a window aggregate. - /// - /// See: - fn merge_batch(&mut self, _states: &[ArrayRef]) -> Result<()> { - error::internal("unexpected call to AvgNAccumulator::merge_batch") - } - - fn evaluate(&self) -> Result { - if self.last_is_null || self.all_values.iter().any(|v| v.is_null()) { - return ScalarValue::try_from(&self.data_type); - } - - let sum: ScalarValue = self - .all_values - .iter() - .cloned() - .reduce(|acc, v| acc.add(v).unwrap()) - // safe to unwrap, as all_values is known to contain only the same primitive values - .unwrap(); - - let n = self.n as f64; - Ok(match sum { - ScalarValue::Float64(Some(v)) => ScalarValue::from(v / n), - ScalarValue::Int64(Some(v)) => ScalarValue::from(v as f64 / n), - ScalarValue::UInt64(Some(v)) => ScalarValue::from(v as f64 / n), - _ => return error::internal("unexpected scalar value type"), - }) - } - - fn size(&self) -> usize { - std::mem::size_of_val(self) + ScalarValue::size_of_vec(&self.all_values) - - std::mem::size_of_val(&self.all_values) - + self.data_type.size() - - std::mem::size_of_val(&self.data_type) - } -} diff --git a/iox_query_influxql/src/window.rs b/iox_query_influxql/src/window.rs index 9e04438780..ced7f04ce1 100644 --- a/iox_query_influxql/src/window.rs +++ b/iox_query_influxql/src/window.rs @@ -9,6 +9,7 @@ use std::sync::Arc; mod cumulative_sum; mod derivative; mod difference; +mod moving_average; mod non_negative; mod percent_row_number; @@ -54,6 +55,20 @@ pub(crate) static DIFFERENCE: Lazy = Lazy::new(|| { ))) }); +/// Definition of the `MOVING_AVERAGE` user-defined window function. +pub(crate) static MOVING_AVERAGE: Lazy = Lazy::new(|| { + let return_type: ReturnTypeFunction = Arc::new(moving_average::return_type); + let partition_evaluator_factory: PartitionEvaluatorFactory = + Arc::new(moving_average::partition_evaluator_factory); + + WindowFunction::WindowUDF(Arc::new(WindowUDF::new( + moving_average::NAME, + &moving_average::SIGNATURE, + &return_type, + &partition_evaluator_factory, + ))) +}); + const NON_NEGATIVE_DERIVATIVE_NAME: &str = "non_negative_derivative"; /// Definition of the `NON_NEGATIVE_DERIVATIVE` user-defined window function. diff --git a/iox_query_influxql/src/window/moving_average.rs b/iox_query_influxql/src/window/moving_average.rs new file mode 100644 index 0000000000..3702e691f4 --- /dev/null +++ b/iox_query_influxql/src/window/moving_average.rs @@ -0,0 +1,98 @@ +use crate::{error, NUMERICS}; +use arrow::array::{Array, ArrayRef, Int64Array}; +use arrow::datatypes::DataType; +use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue}; +use datafusion::logical_expr::{PartitionEvaluator, Signature, TypeSignature, Volatility}; +use once_cell::sync::Lazy; +use std::collections::VecDeque; +use std::sync::Arc; + +/// The name of the moving average window function. +pub(super) const NAME: &str = "moving_average"; + +/// Valid signatures for the moving average window function. +pub(super) static SIGNATURE: Lazy = Lazy::new(|| { + Signature::one_of( + NUMERICS + .iter() + .map(|dt| TypeSignature::Exact(vec![dt.clone(), DataType::Int64])) + .collect(), + Volatility::Immutable, + ) +}); + +/// Calculate the return type given the function signature. +pub(super) fn return_type(_: &[DataType]) -> Result> { + Ok(Arc::new(DataType::Float64)) +} + +/// Create a new partition_evaluator_factory. +pub(super) fn partition_evaluator_factory() -> Result> { + Ok(Box::new(AvgNPartitionEvaluator {})) +} + +/// PartitionEvaluator which returns a moving average of the input data.. +#[derive(Debug)] +struct AvgNPartitionEvaluator {} + +impl PartitionEvaluator for AvgNPartitionEvaluator { + fn evaluate_all(&mut self, values: &[ArrayRef], _num_rows: usize) -> Result> { + assert_eq!(values.len(), 2, "AVG_N expects two arguments"); + + // The second element of the values array is the second argument to the `moving_average` + // function, which specifies the minimum number of values that must be aggregated. + // + // INVARIANT: + // The planner and rewriter guarantee that the second argument is + // always a numeric constant. + // + // See: FieldChecker::check_moving_average + let n_values = downcast_value!(&values[1], Int64Array); + let n = n_values.value(0); + + let array = &values[0]; + let mut deq: VecDeque = VecDeque::new(); + let mut avg_n: Vec = vec![]; + for idx in 0..array.len() { + let value = match ScalarValue::try_from_array(&array, idx)? { + ScalarValue::Float64(o) => o, + ScalarValue::Int64(o) => o.map(|v| v as f64), + ScalarValue::UInt64(o) => o.map(|v| v as f64), + _ => { + return error::internal(format!( + "unsupported data type for moving_average ({})", + array.data_type() + )); + } + }; + match value { + None => { + avg_n.push(ScalarValue::Float64(None)); + continue; + } + Some(v) => { + deq.push_back(v); + if deq.len() > n as usize { + deq.pop_front(); + } + if deq.len() != n as usize { + avg_n.push(ScalarValue::Float64(None)); + continue; + } + avg_n.push(ScalarValue::Float64(Some( + deq.iter().sum::() / n as f64, + ))); + } + } + } + Ok(Arc::new(ScalarValue::iter_to_array(avg_n)?)) + } + + fn uses_window_frame(&self) -> bool { + false + } + + fn include_rank(&self) -> bool { + false + } +}