From b79009b128ee52fd4b43be8120d05989c0a077aa Mon Sep 17 00:00:00 2001 From: "Christopher M. Wolff" Date: Tue, 18 Jul 2023 10:14:39 -0700 Subject: [PATCH] refactor: refactor `Schemas` type in InfluxQL planner (#8237) * refactor: refactor `Schemas` type in InfluxQL planner * chore: update after merging main --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- iox_query_influxql/src/plan/planner.rs | 190 ++++++++---------- .../src/plan/planner_rewrite_expression.rs | 74 ++++--- iox_query_influxql/src/plan/util.rs | 56 +++++- 3 files changed, 174 insertions(+), 146 deletions(-) diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index 07d996a457..8936106e62 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -14,7 +14,7 @@ use crate::plan::udf::{ cumulative_sum, derivative, difference, find_window_udfs, moving_average, non_negative_derivative, non_negative_difference, }; -use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas}; +use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, IQLSchema}; use crate::plan::var_ref::var_ref_data_type_to_data_type; use crate::plan::{planner_rewrite_expression, udf, util_copy}; use crate::window::{CUMULATIVE_SUM, DIFFERENCE, NON_NEGATIVE_DIFFERENCE, PERCENT_ROW_NUMBER}; @@ -91,7 +91,6 @@ use std::ops::{Bound, ControlFlow, Deref, Not, Range}; use std::str::FromStr; use std::sync::Arc; -use super::ir::DataSourceSchema; use super::parse_regex; use super::util::contains_expr; use super::util_copy::clone_with_replacement; @@ -709,16 +708,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> { continue; }; - let schemas = Schemas::new(plan.schema())?; - let ds_schema = ds.schema(self.s)?; + let schema = IQLSchema::new_from_ds_schema(plan.schema(), ds.schema(self.s)?)?; let plan = self.plan_condition_time_range( ctx.condition, ctx.extended_time_range(), plan, - &schemas, - &ds_schema, + &schema, )?; - plans.push((plan, ds_schema)); + plans.push((plan, schema)); } Ok(match plans.len() { @@ -794,10 +791,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { /// Plan "Raw" SELECT queriers, These are queries that have no grouping /// and call only scalar functions. fn project_select_raw(&self, input: LogicalPlan, fields: &[Field]) -> Result { - let schemas = Schemas::new(input.schema())?; + let schema = IQLSchema::new_from_fields(input.schema(), fields)?; // Transform InfluxQL AST field expressions to a list of DataFusion expressions. - let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; + let select_exprs = self.field_list_to_exprs(&input, fields, &schema)?; // Wrap the plan in a `LogicalPlan::Projection` from the select expressions project(input, select_exprs) @@ -810,10 +807,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { input: LogicalPlan, fields: &[Field], ) -> Result { - let schemas = Schemas::new(input.schema())?; + let schema = IQLSchema::new_from_fields(input.schema(), fields)?; // Transform InfluxQL AST field expressions to a list of DataFusion expressions. - let mut select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; + let mut select_exprs = self.field_list_to_exprs(&input, fields, &schema)?; // This is a special case, where exactly one column can be projected with a `DISTINCT` // clause or the `distinct` function. 
@@ -847,10 +844,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fields: &[Field], group_by_tag_set: &[&str], ) -> Result { - let schemas = Schemas::new(input.schema())?; + let schema = IQLSchema::new_from_fields(input.schema(), fields)?; // Transform InfluxQL AST field expressions to a list of DataFusion expressions. - let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; + let select_exprs = self.field_list_to_exprs(&input, fields, &schema)?; let (plan, select_exprs) = self.select_aggregate(ctx, input, fields, select_exprs, group_by_tag_set)?; @@ -868,10 +865,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fields: &[Field], group_by_tag_set: &[&str], ) -> Result { - let schemas = Schemas::new(input.schema())?; + let schema = IQLSchema::new_from_fields(input.schema(), fields)?; // Transform InfluxQL AST field expressions to a list of DataFusion expressions. - let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; + let select_exprs = self.field_list_to_exprs(&input, fields, &schema)?; let (plan, select_exprs) = self.select_window(ctx, input, select_exprs, group_by_tag_set)?; @@ -906,10 +903,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fields: &[Field], group_by_tag_set: &[&str], ) -> Result { - let schemas = Schemas::new(input.schema())?; + let schema = IQLSchema::new_from_fields(input.schema(), fields)?; // Transform InfluxQL AST field expressions to a list of DataFusion expressions. - let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?; + let select_exprs = self.field_list_to_exprs(&input, fields, &schema)?; let (plan, select_exprs) = self.select_aggregate(ctx, input, fields, select_exprs, group_by_tag_set)?; @@ -950,7 +947,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fields: &[Field], group_by_tag_set: &[&str], ) -> Result { - let schemas = Schemas::new(input.schema())?; + let schema = IQLSchema::new_from_fields(input.schema(), fields)?; let (selector_index, field_key, plan) = match Selector::find_enumerated(fields)? { (_, Selector::First { .. }) @@ -1024,7 +1021,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { }); // Transform InfluxQL AST field expressions to a list of DataFusion expressions. - let select_exprs = self.field_list_to_exprs(&plan, fields_vec.as_slice(), &schemas)?; + let select_exprs = self.field_list_to_exprs(&plan, fields_vec.as_slice(), &schema)?; // Wrap the plan in a `LogicalPlan::Projection` from the select expressions project(plan, select_exprs) @@ -1040,7 +1037,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fields: &[Field], group_by_tag_set: &[&str], ) -> Result { - let schemas = Schemas::new(input.schema())?; + let schema = IQLSchema::new_from_fields(input.schema(), fields)?; let (selector_index, is_bottom, field_key, tag_keys, narg) = match Selector::find_enumerated(fields)? { @@ -1095,7 +1092,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { } // Transform InfluxQL AST field expressions to a list of DataFusion expressions. - let select_exprs = self.field_list_to_exprs(&input, fields_vec.as_slice(), &schemas)?; + let select_exprs = self.field_list_to_exprs(&input, fields_vec.as_slice(), &schema)?; let plan = if !tag_keys.is_empty() { self.select_first(ctx, input, order_by, internal_group_by.as_slice(), 1)? 
@@ -1695,7 +1692,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { &self, plan: &LogicalPlan, fields: &[Field], - schemas: &Schemas, + schema: &IQLSchema<'_>, ) -> Result> { let mut names: HashMap<&str, usize> = HashMap::new(); fields @@ -1715,7 +1712,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { }; new_field }) - .map(|field| self.field_to_df_expr(&field, plan, schemas)) + .map(|field| self.field_to_df_expr(&field, plan, schema)) .collect() } @@ -1726,10 +1723,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { &self, field: &Field, plan: &LogicalPlan, - schemas: &Schemas, + schema: &IQLSchema<'_>, ) -> Result { - let expr = self.expr_to_df_expr(ExprScope::Projection, &field.expr, schemas)?; - let expr = planner_rewrite_expression::rewrite_field_expr(expr, schemas)?; + let expr = self.expr_to_df_expr(ExprScope::Projection, &field.expr, schema)?; + let expr = planner_rewrite_expression::rewrite_field_expr(expr, schema)?; normalize_col(expr.alias(&field.name), plan) } @@ -1737,16 +1734,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fn conditional_to_df_expr( &self, iql: &ConditionalExpression, - schemas: &Schemas, + schema: &IQLSchema<'_>, ) -> Result { match iql { ConditionalExpression::Expr(expr) => { - self.expr_to_df_expr(ExprScope::Where, expr, schemas) + self.expr_to_df_expr(ExprScope::Where, expr, schema) } - ConditionalExpression::Binary(expr) => { - self.binary_conditional_to_df_expr(expr, schemas) - } - ConditionalExpression::Grouped(e) => self.conditional_to_df_expr(e, schemas), + ConditionalExpression::Binary(expr) => self.binary_conditional_to_df_expr(expr, schema), + ConditionalExpression::Grouped(e) => self.conditional_to_df_expr(e, schema), } } @@ -1754,20 +1749,25 @@ impl<'a> InfluxQLToLogicalPlan<'a> { fn binary_conditional_to_df_expr( &self, expr: &ConditionalBinary, - schemas: &Schemas, + schema: &IQLSchema<'_>, ) -> Result { let ConditionalBinary { lhs, op, rhs } = expr; Ok(binary_expr( - self.conditional_to_df_expr(lhs, schemas)?, + self.conditional_to_df_expr(lhs, schema)?, conditional_op_to_operator(*op)?, - self.conditional_to_df_expr(rhs, schemas)?, + self.conditional_to_df_expr(rhs, schema)?, )) } /// Map an InfluxQL [`IQLExpr`] to a DataFusion [`Expr`]. - fn expr_to_df_expr(&self, scope: ExprScope, iql: &IQLExpr, schemas: &Schemas) -> Result { - let schema = &schemas.df_schema; + fn expr_to_df_expr( + &self, + scope: ExprScope, + iql: &IQLExpr, + schema: &IQLSchema<'_>, + ) -> Result { + let df_schema = &schema.df_schema; match iql { // rewriter is expected to expand wildcard expressions IQLExpr::Wildcard(_) => error::internal("unexpected wildcard in projection"), @@ -1784,7 +1784,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { "time".as_expr() } (ExprScope::Projection, "time") => "time".as_expr(), - (_, name) => match schema + (_, name) => match df_schema .fields_with_unqualified_name(name) .first() .map(|f| f.data_type().clone()) @@ -1808,7 +1808,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // and it is safe to unconditionally unwrap, as the // `is_numeric_type` call guarantees it can be mapped to // an Arrow DataType - column.cast_to(&dst_type, &schemas.df_schema)? + column.cast_to(&dst_type, &schema.df_schema)? } else { // If the cast is incompatible, evaluates to NULL Expr::Literal(ScalarValue::Null) @@ -1846,9 +1846,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> { }, // A DISTINCT clause should have been replaced by `rewrite_statement`. 
IQLExpr::Distinct(_) => error::internal("distinct expression"), - IQLExpr::Call(call) => self.call_to_df_expr(scope, call, schemas), - IQLExpr::Binary(expr) => self.arithmetic_expr_to_df_expr(scope, expr, schemas), - IQLExpr::Nested(e) => self.expr_to_df_expr(scope, e, schemas), + IQLExpr::Call(call) => self.call_to_df_expr(scope, call, schema), + IQLExpr::Binary(expr) => self.arithmetic_expr_to_df_expr(scope, expr, schema), + IQLExpr::Nested(e) => self.expr_to_df_expr(scope, e, schema), } } @@ -1868,9 +1868,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> { /// > * /// /// [docs]: https://docs.influxdata.com/influxdb/v1.8/query_language/functions/ - fn call_to_df_expr(&self, scope: ExprScope, call: &Call, schemas: &Schemas) -> Result { + fn call_to_df_expr( + &self, + scope: ExprScope, + call: &Call, + schema: &IQLSchema<'_>, + ) -> Result { if is_scalar_math_function(call.name.as_str()) { - return self.scalar_math_func_to_df_expr(scope, call, schemas); + return self.scalar_math_func_to_df_expr(scope, call, schema); } match scope { @@ -1882,7 +1887,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { error::query(format!("invalid function call in condition: {name}")) } } - ExprScope::Projection => self.function_to_df_expr(scope, call, schemas), + ExprScope::Projection => self.function_to_df_expr(scope, call, schema), } } @@ -1890,7 +1895,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { &self, scope: ExprScope, call: &Call, - schemas: &Schemas, + schema: &IQLSchema<'_>, ) -> Result { fn check_arg_count(name: &str, args: &[IQLExpr], count: usize) -> Result<()> { let got = args.len(); @@ -1925,13 +1930,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // The DISTINCT function is handled as a `ProjectionType::RawDistinct` // query, so the planner only needs to project the single column // argument. 
- "distinct" => self.expr_to_df_expr(scope, &args[0], schemas), + "distinct" => self.expr_to_df_expr(scope, &args[0], schema), "count" => { let (expr, distinct) = match &args[0] { IQLExpr::Call(c) if c.name == "distinct" => { - (self.expr_to_df_expr(scope, &c.args[0], schemas)?, true) + (self.expr_to_df_expr(scope, &c.args[0], schema)?, true) } - expr => (self.expr_to_df_expr(scope, expr, schemas)?, false), + expr => (self.expr_to_df_expr(scope, expr, schema)?, false), }; if let Expr::Literal(ScalarValue::Null) = expr { return Ok(expr); @@ -1947,7 +1952,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { ))) } "sum" | "stddev" | "mean" | "median" => { - let expr = self.expr_to_df_expr(scope, &args[0], schemas)?; + let expr = self.expr_to_df_expr(scope, &args[0], schema)?; if let Expr::Literal(ScalarValue::Null) = expr { return Ok(expr); } @@ -1962,13 +1967,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> { ))) } "percentile" => { - let expr = self.expr_to_df_expr(scope, &args[0], schemas)?; + let expr = self.expr_to_df_expr(scope, &args[0], schema)?; if let Expr::Literal(ScalarValue::Null) = expr { return Ok(expr); } check_arg_count(name, args, 2)?; - let nexpr = self.expr_to_df_expr(scope, &args[1], schemas)?; + let nexpr = self.expr_to_df_expr(scope, &args[1], schema)?; Ok(Expr::AggregateUDF(expr::AggregateUDF::new( PERCENTILE.clone(), vec![expr, nexpr], @@ -1977,7 +1982,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { ))) } name @ ("first" | "last" | "min" | "max") => { - let expr = self.expr_to_df_expr(scope, &args[0], schemas)?; + let expr = self.expr_to_df_expr(scope, &args[0], schema)?; if let Expr::Literal(ScalarValue::Null) = expr { return Ok(expr); } @@ -2000,7 +2005,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { check_arg_count(name, args, 1)?; // arg0 should be a column or function - let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?; + let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?; if let Expr::Literal(ScalarValue::Null) = arg0 { return Ok(arg0); } @@ -2011,7 +2016,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { check_arg_count(name, args, 1)?; // arg0 should be a column or function - let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?; + let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?; if let Expr::Literal(ScalarValue::Null) = arg0 { return Ok(arg0); } @@ -2022,14 +2027,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> { check_arg_count(name, args, 2)?; // arg0 should be a column or function - let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?; + let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?; if let Expr::Literal(ScalarValue::Null) = arg0 { return Ok(arg0); } // arg1 should be an integer. let arg1 = ScalarValue::Int64(Some( - match self.expr_to_df_expr(scope, &args[1], schemas)? { + match self.expr_to_df_expr(scope, &args[1], schema)? 
{ Expr::Literal(ScalarValue::Int64(Some(v))) => v, Expr::Literal(ScalarValue::UInt64(Some(v))) => v as i64, _ => { @@ -2046,13 +2051,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> { check_arg_count_range(name, args, 1, 2)?; // arg0 should be a column or function - let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?; + let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?; if let Expr::Literal(ScalarValue::Null) = arg0 { return Ok(arg0); } let mut eargs = vec![arg0]; if args.len() > 1 { - let arg1 = self.expr_to_df_expr(scope, &args[1], schemas)?; + let arg1 = self.expr_to_df_expr(scope, &args[1], schema)?; eargs.push(arg1); } @@ -2062,13 +2067,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> { check_arg_count_range(name, args, 1, 2)?; // arg0 should be a column or function - let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?; + let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?; if let Expr::Literal(ScalarValue::Null) = arg0 { return Ok(arg0); } let mut eargs = vec![arg0]; if args.len() > 1 { - let arg1 = self.expr_to_df_expr(scope, &args[1], schemas)?; + let arg1 = self.expr_to_df_expr(scope, &args[1], schema)?; eargs.push(arg1); } @@ -2078,7 +2083,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { check_arg_count(name, args, 1)?; // arg0 should be a column or function - let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?; + let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?; if let Expr::Literal(ScalarValue::Null) = arg0 { return Ok(arg0); } @@ -2088,7 +2093,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // The TOP/BOTTOM function is handled as a `ProjectionType::TopBottomSelector` // query, so the planner only needs to project the single column // argument. - "top" | "bottom" => self.expr_to_df_expr(scope, &args[0], schemas), + "top" | "bottom" => self.expr_to_df_expr(scope, &args[0], schema), _ => error::query(format!("Invalid function '{name}'")), } @@ -2099,12 +2104,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> { &self, scope: ExprScope, call: &Call, - schemas: &Schemas, + schema: &IQLSchema<'a>, ) -> Result { let args = call .args .iter() - .map(|e| self.expr_to_df_expr(scope, e, schemas)) + .map(|e| self.expr_to_df_expr(scope, e, schema)) .collect::>>()?; match BuiltinScalarFunction::from_str(call.name.as_str())? 
{ @@ -2127,12 +2132,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> { &self, scope: ExprScope, expr: &Binary, - schemas: &Schemas, + schema: &IQLSchema<'_>, ) -> Result { Ok(binary_expr( - self.expr_to_df_expr(scope, &expr.lhs, schemas)?, + self.expr_to_df_expr(scope, &expr.lhs, schema)?, binary_operator_to_df_operator(expr.op), - self.expr_to_df_expr(scope, &expr.rhs, schemas)?, + self.expr_to_df_expr(scope, &expr.rhs, schema)?, )) } @@ -2141,17 +2146,15 @@ impl<'a> InfluxQLToLogicalPlan<'a> { condition: Option<&ConditionalExpression>, time_range: TimeRange, plan: LogicalPlan, - schemas: &Schemas, - ds_schema: &DataSourceSchema<'_>, + schema: &IQLSchema<'a>, ) -> Result { let filter_expr = condition .map(|condition| { - let filter_expr = self.conditional_to_df_expr(condition, schemas)?; + let filter_expr = self.conditional_to_df_expr(condition, schema)?; planner_rewrite_expression::rewrite_conditional_expr( self.s.execution_props(), filter_expr, - schemas, - ds_schema, + schema, ) }) .transpose()?; @@ -2174,8 +2177,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { plan: LogicalPlan, condition: &Option, cutoff: MetadataCutoff, - schemas: &Schemas, - ds_schema: &DataSourceSchema<'_>, + schema: &IQLSchema<'_>, ) -> Result { let start_time = Timestamp::from(self.s.execution_props().query_execution_start_time); @@ -2207,7 +2209,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { time_range }; - self.plan_condition_time_range(cond.as_ref(), time_range, plan, schemas, ds_schema) + self.plan_condition_time_range(cond.as_ref(), time_range, plan, schema) } /// Generate a logical plan for the specified `DataSource`. @@ -2381,16 +2383,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let Some(table_schema) = self.s.table_schema(&table) else {continue}; let Some((plan, measurement_expr)) = self.create_table_ref(&table)? else {continue;}; - let schemas = Schemas::new(plan.schema())?; let ds = DataSource::Table(table.clone()); - let ds_schema = ds.schema(self.s)?; - let plan = self.plan_where_clause( - plan, - &condition, - metadata_cutoff, - &schemas, - &ds_schema, - )?; + let schema = IQLSchema::new_from_ds_schema(plan.schema(), ds.schema(self.s)?)?; + let plan = + self.plan_where_clause(plan, &condition, metadata_cutoff, &schema)?; let tags = table_schema .iter() @@ -2634,16 +2630,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let Some((plan, measurement_expr)) = self.create_table_ref(&table)? else {continue;}; - let schemas = Schemas::new(plan.schema())?; let ds = DataSource::Table(table.clone()); - let ds_schema = ds.schema(self.s)?; - let plan = self.plan_where_clause( - plan, - &show_tag_values.condition, - metadata_cutoff, - &schemas, - &ds_schema, - )?; + let schema = IQLSchema::new_from_ds_schema(plan.schema(), ds.schema(self.s)?)?; + let plan = + self.plan_where_clause(plan, &show_tag_values.condition, metadata_cutoff, &schema)?; for key in keys { let idx = plan @@ -2740,16 +2730,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { for table in tables { let Some((plan, _measurement_expr)) = self.create_table_ref(&table)? else {continue;}; - let schemas = Schemas::new(plan.schema())?; let ds = DataSource::Table(table.clone()); - let ds_schema = ds.schema(self.s)?; - let plan = self.plan_where_clause( - plan, - &condition, - metadata_cutoff, - &schemas, - &ds_schema, - )?; + let schema = IQLSchema::new_from_ds_schema(plan.schema(), ds.schema(self.s)?)?; + let plan = + self.plan_where_clause(plan, &condition, metadata_cutoff, &schema)?; let plan = LogicalPlanBuilder::from(plan) .limit(0, Some(1))? 
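The planner-side change above is mechanical but worth spelling out: every call site that previously built a `Schemas` from `plan.schema()` and separately fetched a `DataSourceSchema` via `ds.schema(self.s)?` now folds both into a single `IQLSchema`, and `plan_where_clause` / `plan_condition_time_range` accept that one handle. Below is a minimal standalone sketch of that call-site shape; `DfSchema`, `DsSchema` and `IqlSchema` are simplified stand-ins invented for illustration, not the IOx crate types, so only the "bundle two schemas into one handle" pattern carries over.

// Minimal standalone sketch of the call-site shape introduced in planner.rs.
// `DfSchema`, `DsSchema` and `IqlSchema` are simplified stand-ins invented for
// illustration (not the IOx `DFSchemaRef`, `DataSourceSchema`, `IQLSchema`).

use std::sync::Arc;

/// Stand-in for the DataFusion schema attached to a logical plan.
struct DfSchema {
    columns: Vec<String>,
}

/// Stand-in for the InfluxQL data-source schema, which knows the tag columns.
struct DsSchema {
    tags: Vec<String>,
}

/// Stand-in for the new `IQLSchema`: one handle carrying both pieces.
struct IqlSchema {
    df_schema: Arc<DfSchema>,
    ds_schema: DsSchema,
}

impl IqlSchema {
    /// Shaped like `IQLSchema::new_from_ds_schema(plan.schema(), ds.schema(self.s)?)`.
    fn new_from_ds_schema(df_schema: &Arc<DfSchema>, ds_schema: DsSchema) -> Self {
        Self {
            df_schema: Arc::clone(df_schema),
            ds_schema,
        }
    }
}

/// Before: WHERE-clause planning took the DataFusion schema and the
/// data-source schema as two separate arguments.
fn plan_where_clause_old(df_schema: &Arc<DfSchema>, ds_schema: &DsSchema, col: &str) -> bool {
    df_schema.columns.iter().any(|c| c == col) && ds_schema.tags.iter().any(|t| t == col)
}

/// After: the same information travels as a single schema argument.
fn plan_where_clause_new(schema: &IqlSchema, col: &str) -> bool {
    schema.df_schema.columns.iter().any(|c| c == col)
        && schema.ds_schema.tags.iter().any(|t| t == col)
}

fn main() {
    let df_schema = Arc::new(DfSchema {
        columns: vec!["time".to_string(), "tag0".to_string(), "f0".to_string()],
    });
    let ds_schema = DsSchema {
        tags: vec!["tag0".to_string()],
    };

    assert!(plan_where_clause_old(&df_schema, &ds_schema, "tag0"));

    let schema = IqlSchema::new_from_ds_schema(&df_schema, ds_schema);
    assert!(plan_where_clause_new(&schema, "tag0"));
    println!("one schema handle threads through the planner instead of two");
}

The same constructor pair covers subqueries: `new_from_fields` builds the handle from the subquery's SELECT list where no `DataSource` is available, which is why the `project_select_*` methods in the diff switch from `Schemas::new(input.schema())` to `IQLSchema::new_from_fields(input.schema(), fields)`.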
diff --git a/iox_query_influxql/src/plan/planner_rewrite_expression.rs b/iox_query_influxql/src/plan/planner_rewrite_expression.rs index 20385a1eb2..2d326d4d1e 100644 --- a/iox_query_influxql/src/plan/planner_rewrite_expression.rs +++ b/iox_query_influxql/src/plan/planner_rewrite_expression.rs @@ -123,7 +123,7 @@ //! [`Eval`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4137 use std::sync::Arc; -use crate::plan::util::Schemas; +use crate::plan::util::IQLSchema; use arrow::datatypes::DataType; use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion::common::{Result, ScalarValue}; @@ -136,31 +136,28 @@ use datafusion::prelude::when; use observability_deps::tracing::trace; use predicate::rpc_predicate::{iox_expr_rewrite, simplify_predicate}; -use super::ir::DataSourceSchema; - /// Perform a series of passes to rewrite `expr` in compliance with InfluxQL behavior /// in an effort to ensure the query executes without error. pub(super) fn rewrite_conditional_expr( exec_props: &ExecutionProps, expr: Expr, - schemas: &Schemas, - ds_schema: &DataSourceSchema<'_>, + schema: &IQLSchema<'_>, ) -> Result { let simplify_context = - SimplifyContext::new(exec_props).with_schema(Arc::clone(&schemas.df_schema)); + SimplifyContext::new(exec_props).with_schema(Arc::clone(&schema.df_schema)); let simplifier = ExprSimplifier::new(simplify_context); Ok(expr) .map(|expr| log_rewrite(expr, "original")) // make regex matching with invalid types produce false - .and_then(|expr| expr.rewrite(&mut FixRegularExpressions { schemas })) + .and_then(|expr| expr.rewrite(&mut FixRegularExpressions { schema })) .map(|expr| log_rewrite(expr, "after fix_regular_expressions")) // rewrite exprs with incompatible operands to NULL or FALSE // (seems like FixRegularExpressions could be combined into this pass) - .and_then(|expr| rewrite_expr(expr, schemas)) + .and_then(|expr| rewrite_expr(expr, schema)) .map(|expr| log_rewrite(expr, "after rewrite_expr")) // Convert tag column references to CASE WHEN IS NULL THEN '' ELSE END - .and_then(|expr| rewrite_tag_columns(expr, schemas, ds_schema)) + .and_then(|expr| rewrite_tag_columns(expr, schema)) .map(|expr| log_rewrite(expr, "after rewrite_tag_columns")) // Push comparison operators into CASE exprs: // CASE WHEN tag0 IS NULL THEN '' ELSE tag0 END = 'foo' @@ -172,7 +169,7 @@ pub(super) fn rewrite_conditional_expr( // - convert numeric types so that operands agree // - convert Utf8 to Dictionary as needed // The next step will fail with type errors if we don't do this. - .and_then(|expr| simplifier.coerce(expr, Arc::clone(&schemas.df_schema))) + .and_then(|expr| simplifier.coerce(expr, Arc::clone(&schema.df_schema))) .map(|expr| log_rewrite(expr, "after coerce")) // DataFusion expression simplification. This is important here because: // CASE WHEN tag0 IS NULL THEN '' = 'foo' ELSE tag0 = 'foo' END @@ -206,8 +203,8 @@ fn log_rewrite(expr: Expr, description: &str) -> Expr { /// Perform a series of passes to rewrite `expr`, used as a column projection, /// to match the behavior of InfluxQL. -pub(super) fn rewrite_field_expr(expr: Expr, schemas: &Schemas) -> Result { - rewrite_expr(expr, schemas) +pub(super) fn rewrite_field_expr(expr: Expr, schema: &IQLSchema<'_>) -> Result { + rewrite_expr(expr, schema) } /// The expression was rewritten @@ -225,7 +222,7 @@ fn no(expr: Expr) -> Result> { /// /// Rewrite and coerce the expression tree to model the behavior /// of an InfluxQL query. 
-fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result { +fn rewrite_expr(expr: Expr, schema: &IQLSchema<'_>) -> Result { expr.transform(&|expr| { match expr { Expr::BinaryExpr(BinaryExpr { @@ -233,8 +230,8 @@ fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result { op, ref right, }) => { - let lhs_type = left.get_type(&schemas.df_schema)?; - let rhs_type = right.get_type(&schemas.df_schema)?; + let lhs_type = left.get_type(&schema.df_schema)?; + let rhs_type = right.get_type(&schema.df_schema)?; match (lhs_type, op, rhs_type) { // @@ -467,7 +464,7 @@ fn rewrite_boolean(lhs: Expr, op: Operator, rhs: Expr) -> Expr { /// Rewrite regex conditional expressions to match InfluxQL behaviour. struct FixRegularExpressions<'a> { - schemas: &'a Schemas, + schema: &'a IQLSchema<'a>, } impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> { @@ -483,7 +480,7 @@ impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> { right, }) => { Ok(if let Expr::Column(ref col) = *left { - match self.schemas.df_schema.field_from_column(col)?.data_type() { + match self.schema.df_schema.field_from_column(col)?.data_type() { DataType::Dictionary(..) | DataType::Utf8 => { Expr::BinaryExpr(BinaryExpr { left, op, right }) } @@ -517,13 +514,9 @@ impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> { /// case when tag0 is null then "" else tag0 end /// ``` /// This ensures that we treat tags with the same semantics as OG InfluxQL. -fn rewrite_tag_columns( - expr: Expr, - _schemas: &Schemas, - ds_schema: &DataSourceSchema<'_>, -) -> Result { +fn rewrite_tag_columns(expr: Expr, schema: &IQLSchema<'_>) -> Result { expr.transform(&|expr| match expr { - Expr::Column(ref c) if ds_schema.is_tag_field(&c.name) => { + Expr::Column(ref c) if schema.is_tag_field(&c.name) => { yes(when(expr.clone().is_null(), lit("")).otherwise(expr)?) } e => no(e), @@ -532,6 +525,8 @@ fn rewrite_tag_columns( #[cfg(test)] mod test { + use crate::plan::ir::DataSourceSchema; + use super::*; use datafusion::logical_expr::lit_timestamp_nano; use datafusion::prelude::col; @@ -542,7 +537,7 @@ mod test { use schema::{InfluxFieldType, SchemaBuilder}; use std::sync::Arc; - fn new_schemas() -> (Schemas, DataSourceSchema<'static>) { + fn new_schema() -> IQLSchema<'static> { let iox_schema = SchemaBuilder::new() .measurement("m0") .timestamp() @@ -556,7 +551,8 @@ mod test { .build() .expect("schema failed"); let df_schema: DFSchemaRef = Arc::clone(iox_schema.inner()).to_dfschema_ref().unwrap(); - (Schemas { df_schema }, DataSourceSchema::Table(iox_schema)) + let ds_schema = DataSourceSchema::Table(iox_schema); + IQLSchema::new_from_ds_schema(&df_schema, ds_schema).unwrap() } /// Tests which validate that division is coalesced to `0`, to handle division by zero, @@ -566,7 +562,7 @@ mod test { /// binary expression to a scalar value, `0`. 
#[test] fn test_division() { - let (schemas, _) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string(); // Float64 @@ -627,7 +623,7 @@ mod test { #[test] fn test_pass_thru() { test_helpers::maybe_start_logging(); - let (schemas, _) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string(); let expr = lit(5.5).gt(lit(1_i64)); @@ -664,9 +660,9 @@ mod test { #[test] fn test_string_operations() { let props = execution_props(); - let (schemas, ds_schema) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| { - rewrite_conditional_expr(&props, expr, &schemas, &ds_schema) + rewrite_conditional_expr(&props, expr, &schemas) .unwrap() .to_string() }; @@ -688,7 +684,7 @@ mod test { /// to the supported bitwise operators. #[test] fn test_boolean_operations() { - let (schemas, _) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string(); let expr = "boolean_field".as_expr().and(lit(true)); @@ -743,7 +739,7 @@ mod test { /// Tests cases to validate Boolean and NULL data types #[test] fn test_rewrite_conditional_null() { - let (schemas, _) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string(); // NULL on either side and boolean on the other of a binary expression @@ -779,7 +775,7 @@ mod test { #[test] fn test_time_range() { - let (schemas, _) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string(); let expr = "time".as_expr().gt_eq(lit_timestamp_nano(1000)); @@ -811,7 +807,7 @@ mod test { /// valid operation for the given the operands. These are used when projecting columns. 
#[test] fn test_rewrite_expr_coercion_reduce_to_null() { - let (schemas, _) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string(); // @@ -851,9 +847,9 @@ mod test { fn test_rewrite_tag_columns_eq() { test_helpers::maybe_start_logging(); let props = execution_props(); - let (schemas, ds_schema) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| { - rewrite_conditional_expr(&props, expr, &schemas, &ds_schema) + rewrite_conditional_expr(&props, expr, &schemas) .unwrap() .to_string() }; @@ -904,9 +900,9 @@ mod test { fn test_rewrite_tag_columns_regex() { let props = execution_props(); test_helpers::maybe_start_logging(); - let (schemas, ds_schema) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| { - rewrite_conditional_expr(&props, expr, &schemas, &ds_schema) + rewrite_conditional_expr(&props, expr, &schemas) .unwrap() .to_string() }; @@ -931,9 +927,9 @@ mod test { fn test_fields_pass_thru() { test_helpers::maybe_start_logging(); let props = execution_props(); - let (schemas, ds_schema) = new_schemas(); + let schemas = new_schema(); let rewrite = |expr| { - rewrite_conditional_expr(&props, expr, &schemas, &ds_schema) + rewrite_conditional_expr(&props, expr, &schemas) .unwrap() .to_string() }; diff --git a/iox_query_influxql/src/plan/util.rs b/iox_query_influxql/src/plan/util.rs index 4d8eece367..dbbbf0fa12 100644 --- a/iox_query_influxql/src/plan/util.rs +++ b/iox_query_influxql/src/plan/util.rs @@ -11,8 +11,11 @@ use influxdb_influxql_parser::literal::Number; use influxdb_influxql_parser::string::Regex; use query_functions::clean_non_meta_escapes; use query_functions::coalesce_struct::coalesce_struct; +use schema::InfluxColumnType; use std::sync::Arc; +use super::ir::{DataSourceSchema, Field}; + pub(in crate::plan) fn binary_operator_to_df_operator(op: BinaryOperator) -> Operator { match op { BinaryOperator::Add => Operator::Plus, @@ -26,17 +29,62 @@ pub(in crate::plan) fn binary_operator_to_df_operator(op: BinaryOperator) -> Ope } } -/// Container for both the DataFusion and equivalent IOx schema. -pub(in crate::plan) struct Schemas { +/// Container for the DataFusion schema as well as +/// info on which columns are tags. +pub(in crate::plan) struct IQLSchema<'a> { pub(in crate::plan) df_schema: DFSchemaRef, + tag_info: TagInfo<'a>, } -impl Schemas { - pub(in crate::plan) fn new(df_schema: &DFSchemaRef) -> Result { +impl<'a> IQLSchema<'a> { + /// Create a new IQLSchema from a [`DataSourceSchema`] from the + /// FROM clause of a query or subquery. + pub(in crate::plan) fn new_from_ds_schema( + df_schema: &DFSchemaRef, + ds_schema: DataSourceSchema<'a>, + ) -> Result { Ok(Self { df_schema: Arc::clone(df_schema), + tag_info: TagInfo::DataSourceSchema(ds_schema), }) } + + /// Create a new IQLSchema from a list of [`Field`]s on the SELECT list + /// of a subquery. + pub(in crate::plan) fn new_from_fields( + df_schema: &DFSchemaRef, + fields: &'a [Field], + ) -> Result { + Ok(Self { + df_schema: Arc::clone(df_schema), + tag_info: TagInfo::FieldList(fields), + }) + } + + /// Returns `true` if the schema contains a tag column with the specified name. 
+ pub fn is_tag_field(&self, name: &str) -> bool { + match self.tag_info { + TagInfo::DataSourceSchema(ref ds_schema) => ds_schema.is_tag_field(name), + TagInfo::FieldList(fields) => fields + .iter() + .any(|f| f.name == name && f.data_type == Some(InfluxColumnType::Tag)), + } + } + + /// Returns `true` if the schema contains a tag column with the specified name. + /// If the underlying data source is a subquery, it will apply any aliases in the + /// projection that represents the SELECT list. + pub fn is_projected_tag_field(&self, name: &str) -> bool { + match self.tag_info { + TagInfo::DataSourceSchema(ref ds_schema) => ds_schema.is_projected_tag_field(name), + _ => self.is_tag_field(name), + } + } +} + +pub(in crate::plan) enum TagInfo<'a> { + DataSourceSchema(DataSourceSchema<'a>), + FieldList(&'a [Field]), } /// Sanitize an InfluxQL regular expression and create a compiled [`regex::Regex`].
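For context on how the new type answers tag questions from either source, here is a minimal standalone sketch of the `TagInfo` dispatch shown above. `DataSourceTags`, `ProjectedField` and `ColumnType` are simplified stand-ins invented for illustration (the real code uses `DataSourceSchema`, `plan::ir::Field` and `InfluxColumnType::Tag`); only the lookup logic mirrors the patch.

// Minimal standalone sketch of the `TagInfo` dispatch inside `IQLSchema`.
// All types here are simplified stand-ins invented for illustration.

/// Stand-in for a column type marker (the real code uses `InfluxColumnType::Tag`).
#[derive(PartialEq)]
enum ColumnType {
    Tag,
    Field,
}

/// Stand-in for a SELECT-list field of a subquery (real type: `plan::ir::Field`).
struct ProjectedField {
    name: String,
    data_type: Option<ColumnType>,
}

/// Stand-in for the FROM-clause data-source schema (real type: `DataSourceSchema`).
struct DataSourceTags {
    tag_columns: Vec<String>,
}

/// The two places tag information can come from, mirroring `TagInfo<'a>`.
enum TagInfo<'a> {
    DataSource(&'a DataSourceTags),
    FieldList(&'a [ProjectedField]),
}

struct IqlSchema<'a> {
    tag_info: TagInfo<'a>,
}

impl<'a> IqlSchema<'a> {
    /// Mirrors `IQLSchema::is_tag_field`: a FROM-clause source answers from its
    /// tag columns, a subquery answers from the fields it projects.
    fn is_tag_field(&self, name: &str) -> bool {
        match &self.tag_info {
            TagInfo::DataSource(ds) => ds.tag_columns.iter().any(|t| t == name),
            TagInfo::FieldList(fields) => fields
                .iter()
                .any(|f| f.name == name && f.data_type == Some(ColumnType::Tag)),
        }
    }
}

fn main() {
    let ds = DataSourceTags {
        tag_columns: vec!["tag0".to_string()],
    };
    let from_clause = IqlSchema {
        tag_info: TagInfo::DataSource(&ds),
    };
    assert!(from_clause.is_tag_field("tag0"));

    let fields = [
        ProjectedField { name: "t".to_string(), data_type: Some(ColumnType::Tag) },
        ProjectedField { name: "v".to_string(), data_type: Some(ColumnType::Field) },
    ];
    let subquery = IqlSchema {
        tag_info: TagInfo::FieldList(&fields),
    };
    assert!(subquery.is_tag_field("t"));
    assert!(!subquery.is_tag_field("v"));
    println!("tag lookups dispatch on where the schema came from");
}

Keeping the tag provenance inside the schema handle is what lets `rewrite_tag_columns` in planner_rewrite_expression.rs drop its extra `DataSourceSchema` parameter and call `schema.is_tag_field` directly.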