chore: add APIs to find selector user-defined aggregate functions

This will be used to complete queries that have selector semantics,
meaning they project a single selector function and therefore
use the timestamp for the time column.
pull/24376/head
Stuart Carnie 2023-04-15 13:50:23 +10:00
parent 42074e7a9d
commit d11097cf18
No known key found for this signature in database
GPG Key ID: 848D9C9718D78B4F
1 changed files with 48 additions and 0 deletions

View File

@ -1,11 +1,13 @@
use crate::plan::util_copy;
use arrow::datatypes::DataType;
use datafusion::common::tree_node::{TreeNode, VisitRecursion};
use datafusion::common::{DFSchema, DFSchemaRef, DataFusionError, Result};
use datafusion::logical_expr::utils::expr_as_column_expr;
use datafusion::logical_expr::{coalesce, lit, Expr, ExprSchemable, LogicalPlan, Operator};
use influxdb_influxql_parser::expression::BinaryOperator;
use influxdb_influxql_parser::literal::Number;
use influxdb_influxql_parser::string::Regex;
use once_cell::sync::Lazy;
use query_functions::clean_non_meta_escapes;
use schema::Schema;
use std::sync::Arc;
@ -117,3 +119,49 @@ pub(crate) fn rebase_expr(
})
}
}
/// Returns `true` if `expr` is an [`Expr::AggregateUDF`] for one of
/// the selector functions.
pub(crate) fn is_selector_aggregate_udf(expr: &Expr) -> bool {
static FUNCTIONS: Lazy<Vec<&'static str>> = Lazy::new(|| {
vec![
"selector_first",
"selector_last",
"selector_max",
"selector_min",
]
});
matches!(expr, Expr::AggregateUDF { fun, ..} if FUNCTIONS.contains(&fun.name.as_str()))
}
/// Collect all the references to selector functions, such as `selector_last`.
/// They are returned in order of occurrence (depth first), with duplicates omitted.
pub(crate) fn find_aggregate_selector_exprs(exprs: &[Expr]) -> Vec<Expr> {
exprs
.iter()
.flat_map(|expr| {
// Contains a list of unique selector UDAFs
let mut exprs = vec![];
expr.apply(&mut |expr| {
if is_selector_aggregate_udf(expr) {
if !exprs.contains(expr) {
exprs.push(expr.clone());
}
Ok(VisitRecursion::Skip)
} else {
Ok(VisitRecursion::Continue)
}
})
.unwrap();
exprs
})
.fold(vec![], |mut acc, expr| {
if !acc.contains(&expr) {
acc.push(expr)
}
acc
})
}