refactor: use struct-style selectors in InfluxRPC (#7742)

Some clean up before I implement the core logic for #7533.
pull/24376/head
Marco Neumann 2023-05-04 11:08:46 +02:00 committed by GitHub
parent 90186e1937
commit e1825ec45b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 31 deletions

View File

@ -5,8 +5,11 @@ use data_types::ChunkId;
use datafusion::{ use datafusion::{
common::DFSchemaRef, common::DFSchemaRef,
error::DataFusionError, error::DataFusionError,
logical_expr::{utils::exprlist_to_columns, ExprSchemable, LogicalPlan, LogicalPlanBuilder}, logical_expr::{
utils::exprlist_to_columns, ExprSchemable, GetIndexedField, LogicalPlan, LogicalPlanBuilder,
},
prelude::{when, Column, Expr}, prelude::{when, Column, Expr},
scalar::ScalarValue,
}; };
use datafusion_util::AsExpr; use datafusion_util::AsExpr;
use futures::{Stream, StreamExt, TryStreamExt}; use futures::{Stream, StreamExt, TryStreamExt};
@ -35,7 +38,9 @@ use predicate::{
use query_functions::{ use query_functions::{
group_by::{Aggregate, WindowDuration}, group_by::{Aggregate, WindowDuration},
make_window_bound_expr, make_window_bound_expr,
selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput}, selectors::{
struct_selector_first, struct_selector_last, struct_selector_max, struct_selector_min,
},
}; };
use schema::{InfluxColumnType, Projection, Schema, TIME_COLUMN_NAME}; use schema::{InfluxColumnType, Projection, Schema, TIME_COLUMN_NAME};
use snafu::{ensure, OptionExt, ResultExt, Snafu}; use snafu::{ensure, OptionExt, ResultExt, Snafu};
@ -1636,7 +1641,6 @@ pub(crate) struct AggExprs {
struct FieldExpr<'a> { struct FieldExpr<'a> {
expr: Expr, expr: Expr,
name: &'a str, name: &'a str,
datatype: &'a DataType,
} }
// Returns an iterator of fields from schema that pass the predicate. If there // Returns an iterator of fields from schema that pass the predicate. If there
@ -1674,7 +1678,6 @@ fn filtered_fields_iter<'a>(
Some(FieldExpr { Some(FieldExpr {
expr: expr.alias(f.name()), expr: expr.alias(f.name()),
name: f.name(), name: f.name(),
datatype: f.data_type(),
}) })
}) })
} }
@ -1735,22 +1738,25 @@ impl AggExprs {
let mut field_list = Vec::new(); let mut field_list = Vec::new();
for field in filtered_fields_iter(schema, predicate) { for field in filtered_fields_iter(schema, predicate) {
let selector = make_selector_expr(agg, field.clone())?;
let field_name = field.name; let field_name = field.name;
agg_exprs.push(make_selector_expr( agg_exprs.push(
agg, Expr::GetIndexedField(GetIndexedField {
SelectorOutput::Value, expr: Box::new(selector.clone()),
field.clone(), key: ScalarValue::from("value"),
field_name, })
)?); .alias(field_name),
);
let time_column_name = format!("{TIME_COLUMN_NAME}_{field_name}"); let time_column_name = format!("{TIME_COLUMN_NAME}_{field_name}");
agg_exprs.push(
agg_exprs.push(make_selector_expr( Expr::GetIndexedField(GetIndexedField {
agg, expr: Box::new(selector.clone()),
SelectorOutput::Time, key: ScalarValue::from("time"),
field, })
&time_column_name, .alias(&time_column_name),
)?); );
field_list.push(( field_list.push((
Arc::from(field_name), // value name Arc::from(field_name), // value name
@ -1782,7 +1788,6 @@ impl AggExprs {
agg, agg,
FieldExpr { FieldExpr {
expr: field.name().as_expr(), expr: field.name().as_expr(),
datatype: field.data_type(),
name: field.name(), name: field.name(),
}, },
) )
@ -1866,23 +1871,16 @@ fn make_agg_expr(agg: Aggregate, field_expr: FieldExpr<'_>) -> Result<Expr> {
/// ELSE NULL /// ELSE NULL
/// END) as col_name /// END) as col_name
/// ///
fn make_selector_expr<'a>( fn make_selector_expr(agg: Aggregate, field: FieldExpr<'_>) -> Result<Expr> {
agg: Aggregate,
output: SelectorOutput,
field: FieldExpr<'a>,
col_name: &'a str,
) -> Result<Expr> {
let uda = match agg { let uda = match agg {
Aggregate::First => selector_first(field.datatype, output), Aggregate::First => struct_selector_first(),
Aggregate::Last => selector_last(field.datatype, output), Aggregate::Last => struct_selector_last(),
Aggregate::Min => selector_min(field.datatype, output), Aggregate::Min => struct_selector_min(),
Aggregate::Max => selector_max(field.datatype, output), Aggregate::Max => struct_selector_max(),
_ => return InternalAggregateNotSelectorSnafu { agg }.fail(), _ => return InternalAggregateNotSelectorSnafu { agg }.fail(),
}; };
Ok(uda Ok(uda.call(vec![field.expr, TIME_COLUMN_NAME.as_expr()]))
.call(vec![field.expr, TIME_COLUMN_NAME.as_expr()])
.alias(col_name))
} }
/// Orders chunks so it is likely that the ones that already have cached data are pulled first. /// Orders chunks so it is likely that the ones that already have cached data are pulled first.