From e1825ec45b8ee6fa1fe3286d2c53e068eac68099 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 4 May 2023 11:08:46 +0200 Subject: [PATCH] refactor: use struct-style selectors in InfluxRPC (#7742) Some clean up before I implement the core logic for #7533. --- iox_query_influxrpc/src/lib.rs | 60 ++++++++++++++++------------------ 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/iox_query_influxrpc/src/lib.rs b/iox_query_influxrpc/src/lib.rs index 1532e5e039..23fb1bfec7 100644 --- a/iox_query_influxrpc/src/lib.rs +++ b/iox_query_influxrpc/src/lib.rs @@ -5,8 +5,11 @@ use data_types::ChunkId; use datafusion::{ common::DFSchemaRef, error::DataFusionError, - logical_expr::{utils::exprlist_to_columns, ExprSchemable, LogicalPlan, LogicalPlanBuilder}, + logical_expr::{ + utils::exprlist_to_columns, ExprSchemable, GetIndexedField, LogicalPlan, LogicalPlanBuilder, + }, prelude::{when, Column, Expr}, + scalar::ScalarValue, }; use datafusion_util::AsExpr; use futures::{Stream, StreamExt, TryStreamExt}; @@ -35,7 +38,9 @@ use predicate::{ use query_functions::{ group_by::{Aggregate, WindowDuration}, make_window_bound_expr, - selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput}, + selectors::{ + struct_selector_first, struct_selector_last, struct_selector_max, struct_selector_min, + }, }; use schema::{InfluxColumnType, Projection, Schema, TIME_COLUMN_NAME}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; @@ -1636,7 +1641,6 @@ pub(crate) struct AggExprs { struct FieldExpr<'a> { expr: Expr, name: &'a str, - datatype: &'a DataType, } // Returns an iterator of fields from schema that pass the predicate. If there @@ -1674,7 +1678,6 @@ fn filtered_fields_iter<'a>( Some(FieldExpr { expr: expr.alias(f.name()), name: f.name(), - datatype: f.data_type(), }) }) } @@ -1735,22 +1738,25 @@ impl AggExprs { let mut field_list = Vec::new(); for field in filtered_fields_iter(schema, predicate) { + let selector = make_selector_expr(agg, field.clone())?; + let field_name = field.name; - agg_exprs.push(make_selector_expr( - agg, - SelectorOutput::Value, - field.clone(), - field_name, - )?); + agg_exprs.push( + Expr::GetIndexedField(GetIndexedField { + expr: Box::new(selector.clone()), + key: ScalarValue::from("value"), + }) + .alias(field_name), + ); let time_column_name = format!("{TIME_COLUMN_NAME}_{field_name}"); - - agg_exprs.push(make_selector_expr( - agg, - SelectorOutput::Time, - field, - &time_column_name, - )?); + agg_exprs.push( + Expr::GetIndexedField(GetIndexedField { + expr: Box::new(selector.clone()), + key: ScalarValue::from("time"), + }) + .alias(&time_column_name), + ); field_list.push(( Arc::from(field_name), // value name @@ -1782,7 +1788,6 @@ impl AggExprs { agg, FieldExpr { expr: field.name().as_expr(), - datatype: field.data_type(), name: field.name(), }, ) @@ -1866,23 +1871,16 @@ fn make_agg_expr(agg: Aggregate, field_expr: FieldExpr<'_>) -> Result { /// ELSE NULL /// END) as col_name /// -fn make_selector_expr<'a>( - agg: Aggregate, - output: SelectorOutput, - field: FieldExpr<'a>, - col_name: &'a str, -) -> Result { +fn make_selector_expr(agg: Aggregate, field: FieldExpr<'_>) -> Result { let uda = match agg { - Aggregate::First => selector_first(field.datatype, output), - Aggregate::Last => selector_last(field.datatype, output), - Aggregate::Min => selector_min(field.datatype, output), - Aggregate::Max => selector_max(field.datatype, output), + Aggregate::First => struct_selector_first(), + Aggregate::Last => struct_selector_last(), + Aggregate::Min => struct_selector_min(), + Aggregate::Max => struct_selector_max(), _ => return InternalAggregateNotSelectorSnafu { agg }.fail(), }; - Ok(uda - .call(vec![field.expr, TIME_COLUMN_NAME.as_expr()]) - .alias(col_name)) + Ok(uda.call(vec![field.expr, TIME_COLUMN_NAME.as_expr()])) } /// Orders chunks so it is likely that the ones that already have cached data are pulled first.