From 0de5a1e309a3c6d649c40862a7af96a77ad67085 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 17 Dec 2020 17:51:36 -0500 Subject: [PATCH] feat: Implement selector functions first, last, min, and max (#565) * feat: Implement selector functions first, last, min, and max * fix: update for changes in arrow * docs: reference to min/max boolean array ticket * docs: add reference to selector structs * docs: Update query/src/func/selectors.rs --- query/src/func.rs | 2 + query/src/func/selectors.rs | 686 +++++++++++++++++++++++++++ query/src/func/selectors/internal.rs | 623 ++++++++++++++++++++++++ query/src/lib.rs | 1 + 4 files changed, 1312 insertions(+) create mode 100644 query/src/func.rs create mode 100644 query/src/func/selectors.rs create mode 100644 query/src/func/selectors/internal.rs diff --git a/query/src/func.rs b/query/src/func.rs new file mode 100644 index 0000000000..0546195c76 --- /dev/null +++ b/query/src/func.rs @@ -0,0 +1,2 @@ +//! Special IOx functions used in DataFusion plans +pub mod selectors; diff --git a/query/src/func/selectors.rs b/query/src/func/selectors.rs new file mode 100644 index 0000000000..431b640455 --- /dev/null +++ b/query/src/func/selectors.rs @@ -0,0 +1,686 @@ +//! Implementaton of InfluxDB "Selector" Functions +//! +//! Selector functions are similar to aggregate functions in that they +//! collapse down an input set of rows into just one. +//! +//! Selector functions are different than aggregate functions because +//! they also return multiple column values rather than a single +//! scalar. Selector functions return the entire row that was +//! "selected" from the timeseries (value and time pair). +//! +//! Note: At the time of writing, DataFusion aggregate functions have +//! no way to handle aggregates that produce multiple columns. +//! +//! This module implements a workaround of "do the aggregation twice +//! with two distinct functions" to get something working. It should +//! should be removed when DataFusion / Arrow has proper support +use std::{fmt::Debug, sync::Arc}; + +use arrow_deps::{ + arrow::{array::ArrayRef, datatypes::DataType}, + datafusion::{ + error::{DataFusionError, Result as DataFusionResult}, + physical_plan::{ + aggregates::{AccumulatorFunctionImplementation, StateTypeFunction}, + functions::{ReturnTypeFunction, Signature}, + udaf::AggregateUDF, + Accumulator, + }, + scalar::ScalarValue, + }, +}; + +// Internal implementations of the selector functions +mod internal; +use internal::{ + BooleanFirstSelector, BooleanLastSelector, BooleanMaxSelector, BooleanMinSelector, + F64FirstSelector, F64LastSelector, F64MaxSelector, F64MinSelector, I64FirstSelector, + I64LastSelector, I64MaxSelector, I64MinSelector, Utf8FirstSelector, Utf8LastSelector, + Utf8MaxSelector, Utf8MinSelector, +}; + +/// Returns a DataFusion user defined aggregate function for computing +/// one field of the first() selector function. +/// +/// Note that until https://issues.apache.org/jira/browse/ARROW-10945 +/// is fixed, selector functions must be computed using two separate +/// function calls, one each for the value and time part +/// +/// first(value_column, timestamp_column) -> value and timestamp +/// +/// timestamp is the minimum value of the timestamp_column +/// +/// value is the value of the value_column at the position of the +/// minimum of the timestamp column. If there are multiple rows with +/// the minimum timestamp value, the value of the value_column is +/// arbitrarily picked +pub fn selector_first(data_type: &DataType, output: SelectorOutput) -> AggregateUDF { + let name = match output { + SelectorOutput::Value => "selector_first_value", + SelectorOutput::Time => "selector_first_time", + }; + + match data_type { + DataType::Float64 => make_uda::(name, output), + DataType::Int64 => make_uda::(name, output), + DataType::Utf8 => make_uda::(name, output), + DataType::Boolean => make_uda::(name, output), + _ => unimplemented!("first not supported for {:?}", data_type), + } +} + +/// Returns a DataFusion user defined aggregate function for computing +/// one field of the last() selector function. +/// +/// Note that until https://issues.apache.org/jira/browse/ARROW-10945 +/// is fixed, selector functions must be computed using two separate +/// function calls, one each for the value and time part +/// +/// selector_last(data_column, timestamp_column) -> value and timestamp +/// +/// timestamp is the maximum value of the timestamp_column +/// +/// value is the value of the data_column at the position of the +/// maximum of the timestamp column. If there are multiple rows with +/// the maximum timestamp value, the value of the data_column is +/// arbitrarily picked +pub fn selector_last(data_type: &DataType, output: SelectorOutput) -> AggregateUDF { + let name = match output { + SelectorOutput::Value => "selector_last_value", + SelectorOutput::Time => "selector_last_time", + }; + + match data_type { + DataType::Float64 => make_uda::(name, output), + DataType::Int64 => make_uda::(name, output), + DataType::Utf8 => make_uda::(name, output), + DataType::Boolean => make_uda::(name, output), + _ => unimplemented!("last not supported for {:?}", data_type), + } +} + +/// Returns a DataFusion user defined aggregate function for computing +/// one field of the min() selector function. +/// +/// Note that until https://issues.apache.org/jira/browse/ARROW-10945 +/// is fixed, selector functions must be computed using two separate +/// function calls, one each for the value and time part +/// +/// selector_min(data_column, timestamp_column) -> value and timestamp +/// +/// value is the minimum value of the data_column +/// +/// timestamp is the value of the timestamp_column at the position of +/// the minimum value_column. If there are multiple rows with the +/// minimum timestamp value, the value of the data_column with the +/// first (earliest/smallest) timestamp is chosen +pub fn selector_min(data_type: &DataType, output: SelectorOutput) -> AggregateUDF { + let name = match output { + SelectorOutput::Value => "selector_min_value", + SelectorOutput::Time => "selector_min_time", + }; + + match data_type { + DataType::Float64 => make_uda::(name, output), + DataType::Int64 => make_uda::(name, output), + DataType::Utf8 => make_uda::(name, output), + DataType::Boolean => make_uda::(name, output), + _ => unimplemented!("min not supported for {:?}", data_type), + } +} + +/// Returns a DataFusion user defined aggregate function for computing +/// one field of the max() selector function. +/// +/// Note that until https://issues.apache.org/jira/browse/ARROW-10945 +/// is fixed, selector functions must be computed using two separate +/// function calls, one each for the value and time part +/// +/// selector_max(data_column, timestamp_column) -> value and timestamp +/// +/// value is the maximum value of the data_column +/// +/// timestamp is the value of the timestamp_column at the position of +/// the maximum value_column. If there are multiple rows with the +/// maximum timestamp value, the value of the data_column with the +/// first (earliest/smallest) timestamp is chosen +pub fn selector_max(data_type: &DataType, output: SelectorOutput) -> AggregateUDF { + let name = match output { + SelectorOutput::Value => "selector_max_value", + SelectorOutput::Time => "selector_max_time", + }; + + match data_type { + DataType::Float64 => make_uda::(name, output), + DataType::Int64 => make_uda::(name, output), + DataType::Utf8 => make_uda::(name, output), + DataType::Boolean => make_uda::(name, output), + _ => unimplemented!("max not supported for {:?}", data_type), + } +} + +/// Implements the logic of the specific selector function (this is a +/// cutdown version of the Accumulator DataFusion trait, to allow +/// sharing between implementations) +trait Selector: Debug + Default + Send + Sync { + /// What type of values does this selector function work with (time is + /// always I64) + fn value_data_type() -> DataType; + + /// return state in a form that DataFusion can store during execution + fn datafusion_state(&self) -> DataFusionResult>; + + /// produces the final value of this selector for the specified output type + fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult; + + /// Update this selector's state based on values in value_arr and time_arr + fn update_batch(&mut self, value_arr: &ArrayRef, time_arr: &ArrayRef) -> DataFusionResult<()>; +} + +// Describes which part of the selector to return: the timestamp or +// the value (when https://issues.apache.org/jira/browse/ARROW-10945 +// is fixed, this enum should be removed) +#[derive(Debug, Clone, Copy)] +pub enum SelectorOutput { + Value, + Time, +} + +impl SelectorOutput { + /// return the data type produced for this type of output + fn return_type(&self, input_type: &DataType) -> DataType { + match self { + Self::Value => input_type.clone(), + // timestamps are always i64 + Self::Time => DataType::Int64, + } + } +} + +/// Factory function for creating the UDA function for DataFusion +fn make_uda(name: &'static str, output: SelectorOutput) -> AggregateUDF +where + SELECTOR: Selector + 'static, +{ + let value_data_type = SELECTOR::value_data_type(); + let input_signature = Signature::Exact(vec![value_data_type.clone(), DataType::Int64]); + + let state_type = Arc::new(vec![value_data_type.clone(), DataType::Int64]); + let state_type_factory: StateTypeFunction = Arc::new(move |_| Ok(state_type.clone())); + + let factory: AccumulatorFunctionImplementation = + Arc::new(move || Ok(Box::new(SelectorAccumulator::::new(output)))); + + let return_type = Arc::new(output.return_type(&value_data_type)); + let return_type_func: ReturnTypeFunction = Arc::new(move |_| Ok(return_type.clone())); + + AggregateUDF::new( + name, + &input_signature, + &return_type_func, + &factory, + &state_type_factory, + ) +} + +/// Structure that implements the Accumultator trait for DataFusion +/// and processes (value, timestamp) pair and computes values +#[derive(Debug)] +struct SelectorAccumulator +where + SELECTOR: Selector, +{ + // The underlying implementation for the selector + selector: SELECTOR, + // Determine which value is output + output: SelectorOutput, +} + +impl SelectorAccumulator +where + SELECTOR: Selector, +{ + pub fn new(output: SelectorOutput) -> Self { + Self { + output, + selector: SELECTOR::default(), + } + } +} + +impl Accumulator for SelectorAccumulator +where + SELECTOR: Selector + 'static, +{ + // this function serializes our state to a vector of + // `ScalarValue`s, which DataFusion uses to pass this state + // between execution stages. + fn state(&self) -> DataFusionResult> { + self.selector.datafusion_state() + } + + fn update(&mut self, _values: &Vec) -> DataFusionResult<()> { + unreachable!("Should only be calling update_batch for performance reasons"); + } + + // this function receives states from other accumulators + // (Vec) and updates the accumulator. + fn merge(&mut self, _states: &Vec) -> DataFusionResult<()> { + unreachable!("Should only be calling merge_batch for performance reasons"); + } + + // Return the final value of this aggregator. + fn evaluate(&self) -> DataFusionResult { + self.selector.evaluate(&self.output) + } + + // This function receives one entry per argument of this + // accumulator and updates the selector state function appropriately + fn update_batch(&mut self, values: &Vec) -> DataFusionResult<()> { + if values.is_empty() { + return Ok(()); + } + + if values.len() != 2 { + return Err(DataFusionError::Internal(format!( + "Internal error: Expected 2 arguments passed to selector function but got {}", + values.len() + ))); + } + + // invoke the actual worker function. + self.selector.update_batch(&values[0], &values[1])?; + Ok(()) + } + + // The input values and accumulator state are the same types for + // selectors, and thus we can merge intermediate states with the + // same function as inputs + fn merge_batch(&mut self, states: &Vec) -> DataFusionResult<()> { + // merge is the same operation as update for these selectors + self.update_batch(states) + } +} + +#[cfg(test)] +mod test { + use arrow_deps::{ + arrow::array::Float64Array, + arrow::array::Int64Array, + arrow::array::StringArray, + arrow::datatypes::{Field, Schema}, + arrow::record_batch::RecordBatch, + arrow::{array::BooleanArray, util::pretty::pretty_format_batches}, + datafusion::logical_plan::Expr, + datafusion::{datasource::MemTable, prelude::*}, + }; + + use super::*; + + #[tokio::test] + async fn test_selector_first() { + let cases = vec![ + ( + selector_first(&DataType::Float64, SelectorOutput::Value), + selector_first(&DataType::Float64, SelectorOutput::Time), + "f64_value", + vec![ + "+--------------------------------------+-------------------------------------+", + "| selector_first_value(f64_value,time) | selector_first_time(f64_value,time) |", + "+--------------------------------------+-------------------------------------+", + "| 2 | 1000 |", + "+--------------------------------------+-------------------------------------+", + "" + ], + ), + ( + selector_first(&DataType::Int64, SelectorOutput::Value), + selector_first(&DataType::Int64, SelectorOutput::Time), + "i64_value", + vec![ + "+--------------------------------------+-------------------------------------+", + "| selector_first_value(i64_value,time) | selector_first_time(i64_value,time) |", + "+--------------------------------------+-------------------------------------+", + "| 20 | 1000 |", + "+--------------------------------------+-------------------------------------+", + "", + ], + ), + ( + selector_first(&DataType::Utf8, SelectorOutput::Value), + selector_first(&DataType::Utf8, SelectorOutput::Time), + "string_value", + vec![ + "+-----------------------------------------+----------------------------------------+", + "| selector_first_value(string_value,time) | selector_first_time(string_value,time) |", + "+-----------------------------------------+----------------------------------------+", + "| two | 1000 |", + "+-----------------------------------------+----------------------------------------+", + "", + ], + ), + ( + selector_first(&DataType::Boolean, SelectorOutput::Value), + selector_first(&DataType::Boolean, SelectorOutput::Time), + "bool_value", + vec![ + "+---------------------------------------+--------------------------------------+", + "| selector_first_value(bool_value,time) | selector_first_time(bool_value,time) |", + "+---------------------------------------+--------------------------------------+", + "| true | 1000 |", + "+---------------------------------------+--------------------------------------+", + "", + ], + ) + ]; + + for (val_func, time_func, val_column, expected) in cases.into_iter() { + let args = vec![col(val_column), col("time")]; + let aggs = vec![val_func.call(args.clone()), time_func.call(args)]; + let actual = run_plan(aggs).await; + + assert_eq!( + expected, actual, + "\n\nEXPECTED:\n{:#?}\nACTUAL:\n{:#?}\n", + expected, actual + ); + } + } + + #[tokio::test] + async fn test_selector_last() { + let cases = vec![ + ( + selector_last(&DataType::Float64, SelectorOutput::Value), + selector_last(&DataType::Float64, SelectorOutput::Time), + "f64_value", + vec![ + "+-------------------------------------+------------------------------------+", + "| selector_last_value(f64_value,time) | selector_last_time(f64_value,time) |", + "+-------------------------------------+------------------------------------+", + "| 3 | 6000 |", + "+-------------------------------------+------------------------------------+", + "", + ], + ), + ( + selector_last(&DataType::Int64, SelectorOutput::Value), + selector_last(&DataType::Int64, SelectorOutput::Time), + "i64_value", + vec![ + "+-------------------------------------+------------------------------------+", + "| selector_last_value(i64_value,time) | selector_last_time(i64_value,time) |", + "+-------------------------------------+------------------------------------+", + "| 30 | 6000 |", + "+-------------------------------------+------------------------------------+", + "", + ], + ), + ( + selector_last(&DataType::Utf8, SelectorOutput::Value), + selector_last(&DataType::Utf8, SelectorOutput::Time), + "string_value", + vec![ + "+----------------------------------------+---------------------------------------+", + "| selector_last_value(string_value,time) | selector_last_time(string_value,time) |", + "+----------------------------------------+---------------------------------------+", + "| three | 6000 |", + "+----------------------------------------+---------------------------------------+", + "", + ], + ), + ( + selector_last(&DataType::Boolean, SelectorOutput::Value), + selector_last(&DataType::Boolean, SelectorOutput::Time), + "bool_value", + vec![ + "+--------------------------------------+-------------------------------------+", + "| selector_last_value(bool_value,time) | selector_last_time(bool_value,time) |", + "+--------------------------------------+-------------------------------------+", + "| false | 6000 |", + "+--------------------------------------+-------------------------------------+", + "", + ], + ) + ]; + + for (val_func, time_func, val_column, expected) in cases.into_iter() { + let args = vec![col(val_column), col("time")]; + let aggs = vec![val_func.call(args.clone()), time_func.call(args)]; + let actual = run_plan(aggs).await; + + assert_eq!( + expected, actual, + "\n\nEXPECTED:\n{:#?}\nACTUAL:\n{:#?}\n", + expected, actual + ); + } + } + + #[tokio::test] + async fn test_selector_min() { + let cases = vec![ + ( + selector_min(&DataType::Float64, SelectorOutput::Value), + selector_min(&DataType::Float64, SelectorOutput::Time), + "f64_value", + vec![ + "+------------------------------------+-----------------------------------+", + "| selector_min_value(f64_value,time) | selector_min_time(f64_value,time) |", + "+------------------------------------+-----------------------------------+", + "| 1 | 4000 |", + "+------------------------------------+-----------------------------------+", + "", + ], + ), + ( + selector_min(&DataType::Int64, SelectorOutput::Value), + selector_min(&DataType::Int64, SelectorOutput::Time), + "i64_value", + vec![ + "+------------------------------------+-----------------------------------+", + "| selector_min_value(i64_value,time) | selector_min_time(i64_value,time) |", + "+------------------------------------+-----------------------------------+", + "| 10 | 4000 |", + "+------------------------------------+-----------------------------------+", + "", + ], + ), + ( + selector_min(&DataType::Utf8, SelectorOutput::Value), + selector_min(&DataType::Utf8, SelectorOutput::Time), + "string_value", + vec![ + "+---------------------------------------+--------------------------------------+", + "| selector_min_value(string_value,time) | selector_min_time(string_value,time) |", + "+---------------------------------------+--------------------------------------+", + "| a_one | 4000 |", + "+---------------------------------------+--------------------------------------+", + "", + ], + ), + ( + selector_min(&DataType::Boolean, SelectorOutput::Value), + selector_min(&DataType::Boolean, SelectorOutput::Time), + "bool_value", + vec![ + "+-------------------------------------+------------------------------------+", + "| selector_min_value(bool_value,time) | selector_min_time(bool_value,time) |", + "+-------------------------------------+------------------------------------+", + "| false | 2000 |", + "+-------------------------------------+------------------------------------+", + "", + ], + ) + ]; + + for (val_func, time_func, val_column, expected) in cases.into_iter() { + let args = vec![col(val_column), col("time")]; + let aggs = vec![val_func.call(args.clone()), time_func.call(args)]; + let actual = run_plan(aggs).await; + + assert_eq!( + expected, actual, + "\n\nEXPECTED:\n{:#?}\nACTUAL:\n{:#?}\n", + expected, actual + ); + } + } + + #[tokio::test] + async fn test_selector_max() { + let cases = vec![ + ( + selector_max(&DataType::Float64, SelectorOutput::Value), + selector_max(&DataType::Float64, SelectorOutput::Time), + "f64_value", + vec![ + "+------------------------------------+-----------------------------------+", + "| selector_max_value(f64_value,time) | selector_max_time(f64_value,time) |", + "+------------------------------------+-----------------------------------+", + "| 5 | 5000 |", + "+------------------------------------+-----------------------------------+", + "", + ], + ), + ( + selector_max(&DataType::Int64, SelectorOutput::Value), + selector_max(&DataType::Int64, SelectorOutput::Time), + "i64_value", + vec![ + "+------------------------------------+-----------------------------------+", + "| selector_max_value(i64_value,time) | selector_max_time(i64_value,time) |", + "+------------------------------------+-----------------------------------+", + "| 50 | 5000 |", + "+------------------------------------+-----------------------------------+", + "", + ], + ), + ( + selector_max(&DataType::Utf8, SelectorOutput::Value), + selector_max(&DataType::Utf8, SelectorOutput::Time), + "string_value", + vec![ + "+---------------------------------------+--------------------------------------+", + "| selector_max_value(string_value,time) | selector_max_time(string_value,time) |", + "+---------------------------------------+--------------------------------------+", + "| z_five | 5000 |", + "+---------------------------------------+--------------------------------------+", + "", + ], + ), + ( + selector_max(&DataType::Boolean, SelectorOutput::Value), + selector_max(&DataType::Boolean, SelectorOutput::Time), + "bool_value", + vec![ + "+-------------------------------------+------------------------------------+", + "| selector_max_value(bool_value,time) | selector_max_time(bool_value,time) |", + "+-------------------------------------+------------------------------------+", + "| true | 1000 |", + "+-------------------------------------+------------------------------------+", + "", + ], + ) + ]; + + for (val_func, time_func, val_column, expected) in cases.into_iter() { + let args = vec![col(val_column), col("time")]; + let aggs = vec![val_func.call(args.clone()), time_func.call(args)]; + let actual = run_plan(aggs).await; + + assert_eq!( + expected, actual, + "\n\nEXPECTED:\n{:#?}\nACTUAL:\n{:#?}\n", + expected, actual + ); + } + } + + /// Run a plan against the following input table as "t" + /// + /// +-----------+-----------+--------------+------------+------+ + /// | f64_value | i64_value | string_value | bool_value | time | + /// +-----------+-----------+--------------+------------+------+ + /// | 2 | 20 | two | true | 1000 | + /// | 4 | 40 | four | false | 2000 | + /// | | | | | 3000 | + /// | 1 | 10 | a_one | true | 4000 | + /// | 5 | 50 | z_five | false | 5000 | + /// | 3 | 30 | three | false | 6000 | + /// +-----------+-----------+--------------+------------+------+ + async fn run_plan(aggs: Vec) -> Vec { + // define a schema for input + // (value) and timestamp + let schema = Arc::new(Schema::new(vec![ + Field::new("f64_value", DataType::Float64, false), + Field::new("i64_value", DataType::Int64, false), + Field::new("string_value", DataType::Utf8, false), + Field::new("bool_value", DataType::Boolean, false), + Field::new("time", DataType::Int64, true), + ])); + + // define data in two partitions + let batch1 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Float64Array::from(vec![Some(2.0), Some(4.0), None])), + Arc::new(Int64Array::from(vec![Some(20), Some(40), None])), + Arc::new(StringArray::from(vec![Some("two"), Some("four"), None])), + Arc::new(BooleanArray::from(vec![Some(true), Some(false), None])), + Arc::new(Int64Array::from(vec![1000, 2000, 3000])), + ], + ) + .unwrap(); + + // No values in this batch + let batch2 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Float64Array::from(vec![] as Vec>)), + Arc::new(Int64Array::from(vec![] as Vec>)), + Arc::new(StringArray::from(vec![] as Vec>)), + Arc::new(BooleanArray::from(vec![] as Vec>)), + Arc::new(Int64Array::from(vec![] as Vec>)), + ], + ) + .unwrap(); + + let batch3 = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Float64Array::from(vec![Some(1.0), Some(5.0), Some(3.0)])), + Arc::new(Int64Array::from(vec![Some(10), Some(50), Some(30)])), + Arc::new(StringArray::from(vec![ + Some("a_one"), + Some("z_five"), + Some("three"), + ])), + Arc::new(BooleanArray::from(vec![ + Some(true), + Some(false), + Some(false), + ])), + Arc::new(Int64Array::from(vec![4000, 5000, 6000])), + ], + ) + .unwrap(); + + let provider = + MemTable::try_new(schema.clone(), vec![vec![batch1], vec![batch2, batch3]]).unwrap(); + let mut ctx = ExecutionContext::new(); + ctx.register_table("t", Box::new(provider)); + + let df = ctx.table("t").unwrap(); + let df = df.aggregate(vec![], aggs).unwrap(); + + // execute the query + let record_batches = df.collect().await.unwrap(); + + pretty_format_batches(&record_batches) + .unwrap() + .split('\n') + .map(|s| s.to_owned()) + .collect() + } +} diff --git a/query/src/func/selectors/internal.rs b/query/src/func/selectors/internal.rs new file mode 100644 index 0000000000..fc3484b0c5 --- /dev/null +++ b/query/src/func/selectors/internal.rs @@ -0,0 +1,623 @@ +//! Internal implementaton of InfluxDB "Selector" Functions +//! Tests are in selector module +//! +//! This module is implemented with macros rather than generic types; +//! I tried valiantly (at least in my mind) to use Generics , but I +//! couldn't get the traits to work out correctly (as Bool, I64/F64 +//! and Utf8 arrow types don't share enough in common). + +use std::fmt::Debug; + +use arrow_deps::{ + arrow::compute::kernels::aggregate::{ + max as array_max, max_string as array_max_string, min as array_min, + min_string as array_min_string, + }, + arrow::{ + array::{Array, ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray}, + datatypes::DataType, + }, + datafusion::{error::Result as DataFusionResult, scalar::ScalarValue}, +}; + +use super::{Selector, SelectorOutput}; + +/// Arrow aggregate kernels don't include a min or max for +/// bools... Which is a silly operation anyways, when you think about +/// it. However, we include it here for completeness. +/// +/// This is some version of `min_max_helper` from +/// aggregate.rs in arrow +/// +/// This code should be contribited upstream and then removed from +/// here when arrow gets this feature: +/// https://issues.apache.org/jira/browse/ARROW-10944 +fn min_max_helper(array: &BooleanArray, cmp: F) -> Option +where + F: Fn(bool, bool) -> bool, +{ + let null_count = array.null_count(); + + // Includes case array.len() == 0 + if null_count == array.len() { + return None; + } + + // optimized path for arrays without null values + let m0: Option = array.iter().next().unwrap(); + + array.iter().fold(m0, |max, item| match (max, item) { + (Some(max), Some(item)) => Some(if cmp(max, item) { item } else { max }), + (Some(max), None) => Some(max), + (None, Some(item)) => Some(item), + (None, None) => None, + }) +} + +fn array_min_bool(array: &BooleanArray) -> Option { + // a > b == a & !b + min_max_helper(array, |a, b| a & !b) +} + +fn array_max_bool(array: &BooleanArray) -> Option { + // a < b == !a & b + min_max_helper(array, |a, b| !a & b) +} + +/// Trait for comparing values in arrays with their native +/// representation. This so the same comparison expression can be used +/// in the macro definitions. +/// +/// Note the only one that is different String <--> &str +trait LtVal { + /// return true if v is less than self + fn lt_val(&self, v: &T) -> bool; +} + +impl LtVal for f64 { + fn lt_val(&self, v: &Self) -> bool { + self < v + } +} + +impl LtVal for i64 { + fn lt_val(&self, v: &Self) -> bool { + self < v + } +} + +impl LtVal for bool { + fn lt_val(&self, v: &Self) -> bool { + self < v + } +} + +impl LtVal for &str { + fn lt_val(&self, v: &String) -> bool { + *self < v.as_str() + } +} + +impl LtVal<&str> for String { + fn lt_val(&self, v: &&str) -> bool { + self.as_str() < *v + } +} + +/// Trait for comparing converting the result of aggregate kernels to their +/// native representation Note the only one that is different is &str --> String +trait ToState { + fn to_state(&self) -> T; +} + +impl ToState for f64 { + fn to_state(&self) -> Self { + *self + } +} + +impl ToState for i64 { + fn to_state(&self) -> Self { + *self + } +} + +impl ToState for bool { + fn to_state(&self) -> Self { + *self + } +} + +impl ToState for &str { + fn to_state(&self) -> String { + (*self).to_owned() + } +} + +macro_rules! make_first_selector { + ($STRUCTNAME:ident, $RUSTTYPE:ident, $ARROWTYPE:expr, $ARRTYPE:ident, $MINFUNC:ident, $TO_SCALARVALUE: expr) => { + #[derive(Debug)] + pub struct $STRUCTNAME { + value: Option<$RUSTTYPE>, + time: Option, + } + + impl Default for $STRUCTNAME { + fn default() -> Self { + Self { + value: None, + time: None, + } + } + } + + impl Selector for $STRUCTNAME { + fn value_data_type() -> DataType { + $ARROWTYPE + } + + fn datafusion_state(&self) -> DataFusionResult> { + Ok(vec![ + $TO_SCALARVALUE(self.value.clone()), + ScalarValue::Int64(self.time), + ]) + } + + fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult { + match output { + SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())), + SelectorOutput::Time => Ok(ScalarValue::Int64(self.time)), + } + } + + fn update_batch( + &mut self, + value_arr: &ArrayRef, + time_arr: &ArrayRef, + ) -> DataFusionResult<()> { + let value_arr = value_arr + .as_any() + .downcast_ref::<$ARRTYPE>() + // the input type arguments should be ensured by datafusion + .expect("First argument was value"); + + let time_arr = time_arr + .as_any() + .downcast_ref::() + // the input type arguments should be ensured by datafusion + .expect("Second argument was time"); + + let cur_min_time = $MINFUNC(&time_arr); + + let need_update = match (&self.time, &cur_min_time) { + (Some(time), Some(cur_min_time)) => cur_min_time < time, + // No existing minimum, so update needed + (None, Some(_)) => true, + // No actual minimum time found, so no update needed + (_, None) => false, + }; + + if need_update { + let index = time_arr + .iter() + // arrow doesn't tell us what index had the + // minimum, so need to find it ourselves + .enumerate() + .find(|(_, time)| cur_min_time == *time) + .map(|(idx, _)| idx) + .unwrap(); // value always exists + + self.time = cur_min_time; + self.value = if value_arr.is_null(index) { + None + } else { + Some(value_arr.value(index).to_owned()) + }; + } + + Ok(()) + } + } + }; +} + +macro_rules! make_last_selector { + ($STRUCTNAME:ident, $RUSTTYPE:ident, $ARROWTYPE:expr, $ARRTYPE:ident, $MAXFUNC:ident, $TO_SCALARVALUE: expr) => { + #[derive(Debug)] + pub struct $STRUCTNAME { + value: Option<$RUSTTYPE>, + time: Option, + } + + impl Default for $STRUCTNAME { + fn default() -> Self { + Self { + value: None, + time: None, + } + } + } + + impl Selector for $STRUCTNAME { + fn value_data_type() -> DataType { + $ARROWTYPE + } + + fn datafusion_state(&self) -> DataFusionResult> { + Ok(vec![ + $TO_SCALARVALUE(self.value.clone()), + ScalarValue::Int64(self.time), + ]) + } + + fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult { + match output { + SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())), + SelectorOutput::Time => Ok(ScalarValue::Int64(self.time)), + } + } + + fn update_batch( + &mut self, + value_arr: &ArrayRef, + time_arr: &ArrayRef, + ) -> DataFusionResult<()> { + let value_arr = value_arr + .as_any() + .downcast_ref::<$ARRTYPE>() + // the input type arguments should be ensured by datafusion + .expect("First argument was value"); + + let time_arr = time_arr + .as_any() + .downcast_ref::() + // the input type arguments should be ensured by datafusion + .expect("Second argument was time"); + + let cur_max_time = $MAXFUNC(&time_arr); + + let need_update = match (&self.time, &cur_max_time) { + (Some(time), Some(cur_max_time)) => time < cur_max_time, + // No existing maximum, so update needed + (None, Some(_)) => true, + // No actual maximum value found, so no update needed + (_, None) => false, + }; + + if need_update { + let index = time_arr + .iter() + // arrow doesn't tell us what index had the + // maximum, so need to find it ourselves + .enumerate() + .find(|(_, time)| cur_max_time == *time) + .map(|(idx, _)| idx) + .unwrap(); // value always exists + + self.time = cur_max_time; + self.value = if value_arr.is_null(index) { + None + } else { + Some(value_arr.value(index).to_owned()) + }; + } + + Ok(()) + } + } + }; +} + +macro_rules! make_min_selector { + ($STRUCTNAME:ident, $RUSTTYPE:ident, $ARROWTYPE:expr, $ARRTYPE:ident, $MINFUNC:ident, $TO_SCALARVALUE: expr) => { + #[derive(Debug)] + pub struct $STRUCTNAME { + value: Option<$RUSTTYPE>, + time: Option, + } + + impl Default for $STRUCTNAME { + fn default() -> Self { + Self { + value: None, + time: None, + } + } + } + + impl Selector for $STRUCTNAME { + fn value_data_type() -> DataType { + $ARROWTYPE + } + + fn datafusion_state(&self) -> DataFusionResult> { + Ok(vec![ + $TO_SCALARVALUE(self.value.clone()), + ScalarValue::Int64(self.time), + ]) + } + + fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult { + match output { + SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())), + SelectorOutput::Time => Ok(ScalarValue::Int64(self.time)), + } + } + + fn update_batch( + &mut self, + value_arr: &ArrayRef, + time_arr: &ArrayRef, + ) -> DataFusionResult<()> { + let value_arr = value_arr + .as_any() + .downcast_ref::<$ARRTYPE>() + // the input type arguments should be ensured by datafusion + .expect("First argument was value"); + + let time_arr = time_arr + .as_any() + .downcast_ref::() + // the input type arguments should be ensured by datafusion + .expect("Second argument was time"); + + let cur_min_value = $MINFUNC(&value_arr); + + let need_update = match (&self.value, cur_min_value) { + (Some(value), Some(cur_min_value)) => cur_min_value.lt_val(value), + // No existing minimum time, so update needed + (None, Some(_)) => true, + // No actual minimum time found, so no update needed + (_, None) => false, + }; + + if need_update { + let index = value_arr + .iter() + // arrow doesn't tell us what index had the + // minimum, so need to find it ourselves + .enumerate() + .find(|(_, value)| *value == cur_min_value) + .map(|(idx, _)| idx) + .unwrap(); // value always exists + + self.value = cur_min_value.map(|v| v.to_state()); + // Note: time should never be null but handle it anyways + self.time = if time_arr.is_null(index) { + None + } else { + Some(time_arr.value(index)) + }; + } + Ok(()) + } + } + }; +} + +macro_rules! make_max_selector { + ($STRUCTNAME:ident, $RUSTTYPE:ident, $ARROWTYPE:expr, $ARRTYPE:ident, $MAXFUNC:ident, $TO_SCALARVALUE: expr) => { + #[derive(Debug)] + pub struct $STRUCTNAME { + value: Option<$RUSTTYPE>, + time: Option, + } + + impl Default for $STRUCTNAME { + fn default() -> Self { + Self { + value: None, + time: None, + } + } + } + + impl Selector for $STRUCTNAME { + fn value_data_type() -> DataType { + $ARROWTYPE + } + + fn datafusion_state(&self) -> DataFusionResult> { + Ok(vec![ + $TO_SCALARVALUE(self.value.clone()), + ScalarValue::Int64(self.time), + ]) + } + + fn evaluate(&self, output: &SelectorOutput) -> DataFusionResult { + match output { + SelectorOutput::Value => Ok($TO_SCALARVALUE(self.value.clone())), + SelectorOutput::Time => Ok(ScalarValue::Int64(self.time)), + } + } + + fn update_batch( + &mut self, + value_arr: &ArrayRef, + time_arr: &ArrayRef, + ) -> DataFusionResult<()> { + let value_arr = value_arr + .as_any() + .downcast_ref::<$ARRTYPE>() + // the input type arguments should be ensured by datafusion + .expect("First argument was value"); + + let time_arr = time_arr + .as_any() + .downcast_ref::() + // the input type arguments should be ensured by datafusion + .expect("Second argument was time"); + + let cur_max_value = $MAXFUNC(&value_arr); + + let need_update = match (&self.value, &cur_max_value) { + (Some(value), Some(cur_max_value)) => value.lt_val(cur_max_value), + // No existing maxmimum value, so update needed + (None, Some(_)) => true, + // No actual maximum value found, so no update needed + (_, None) => false, + }; + + if need_update { + let index = value_arr + .iter() + // arrow doesn't tell us what index had the + // maximum, so need to find it ourselves + .enumerate() + .find(|(_, value)| cur_max_value == *value) + .map(|(idx, _)| idx) + .unwrap(); // value always exists + + self.value = cur_max_value.map(|v| v.to_state()); + // Note: time should never be null but handle it anyways + self.time = if time_arr.is_null(index) { + None + } else { + Some(time_arr.value(index)) + }; + } + Ok(()) + } + } + }; +} + +// FIRST + +make_first_selector!( + F64FirstSelector, + f64, + DataType::Float64, + Float64Array, + array_min, + ScalarValue::Float64 +); +make_first_selector!( + I64FirstSelector, + i64, + DataType::Int64, + Int64Array, + array_min, + ScalarValue::Int64 +); +make_first_selector!( + Utf8FirstSelector, + String, + DataType::Utf8, + StringArray, + array_min, + ScalarValue::Utf8 +); +make_first_selector!( + BooleanFirstSelector, + bool, + DataType::Boolean, + BooleanArray, + array_min, + ScalarValue::Boolean +); + +// LAST + +make_last_selector!( + F64LastSelector, + f64, + DataType::Float64, + Float64Array, + array_max, + ScalarValue::Float64 +); +make_last_selector!( + I64LastSelector, + i64, + DataType::Int64, + Int64Array, + array_max, + ScalarValue::Int64 +); +make_last_selector!( + Utf8LastSelector, + String, + DataType::Utf8, + StringArray, + array_max, + ScalarValue::Utf8 +); +make_last_selector!( + BooleanLastSelector, + bool, + DataType::Boolean, + BooleanArray, + array_max, + ScalarValue::Boolean +); + +// MIN + +make_min_selector!( + F64MinSelector, + f64, + DataType::Float64, + Float64Array, + array_min, + ScalarValue::Float64 +); +make_min_selector!( + I64MinSelector, + i64, + DataType::Int64, + Int64Array, + array_min, + ScalarValue::Int64 +); +make_min_selector!( + Utf8MinSelector, + String, + DataType::Utf8, + StringArray, + array_min_string, + ScalarValue::Utf8 +); +make_min_selector!( + BooleanMinSelector, + bool, + DataType::Boolean, + BooleanArray, + array_min_bool, + ScalarValue::Boolean +); + +// MAX + +make_max_selector!( + F64MaxSelector, + f64, + DataType::Float64, + Float64Array, + array_max, + ScalarValue::Float64 +); +make_max_selector!( + I64MaxSelector, + i64, + DataType::Int64, + Int64Array, + array_max, + ScalarValue::Int64 +); +make_max_selector!( + Utf8MaxSelector, + String, + DataType::Utf8, + StringArray, + array_max_string, + ScalarValue::Utf8 +); +make_max_selector!( + BooleanMaxSelector, + bool, + DataType::Boolean, + BooleanArray, + array_max_bool, + ScalarValue::Boolean +); diff --git a/query/src/lib.rs b/query/src/lib.rs index 5a0db9dce8..ceb3f98be1 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -14,6 +14,7 @@ use influxdb_line_protocol::ParsedLine; use std::{fmt::Debug, sync::Arc}; pub mod exec; +pub mod func; pub mod group_by; pub mod id; pub mod predicate;