refactor: refactor `Schemas` type in InfluxQL planner (#8237)

* refactor: refactor `Schemas` type in InfluxQL planner

* chore: update after merging main

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Christopher M. Wolff 2023-07-18 10:14:39 -07:00 committed by GitHub
parent efae0f108a
commit b79009b128
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 174 additions and 146 deletions

View File

@ -14,7 +14,7 @@ use crate::plan::udf::{
cumulative_sum, derivative, difference, find_window_udfs, moving_average,
non_negative_derivative, non_negative_difference,
};
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, IQLSchema};
use crate::plan::var_ref::var_ref_data_type_to_data_type;
use crate::plan::{planner_rewrite_expression, udf, util_copy};
use crate::window::{CUMULATIVE_SUM, DIFFERENCE, NON_NEGATIVE_DIFFERENCE, PERCENT_ROW_NUMBER};
@ -91,7 +91,6 @@ use std::ops::{Bound, ControlFlow, Deref, Not, Range};
use std::str::FromStr;
use std::sync::Arc;
use super::ir::DataSourceSchema;
use super::parse_regex;
use super::util::contains_expr;
use super::util_copy::clone_with_replacement;
@ -709,16 +708,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
continue;
};
let schemas = Schemas::new(plan.schema())?;
let ds_schema = ds.schema(self.s)?;
let schema = IQLSchema::new_from_ds_schema(plan.schema(), ds.schema(self.s)?)?;
let plan = self.plan_condition_time_range(
ctx.condition,
ctx.extended_time_range(),
plan,
&schemas,
&ds_schema,
&schema,
)?;
plans.push((plan, ds_schema));
plans.push((plan, schema));
}
Ok(match plans.len() {
@ -794,10 +791,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
/// Plan "Raw" SELECT queries. These are queries that have no grouping
/// and call only scalar functions.
fn project_select_raw(&self, input: LogicalPlan, fields: &[Field]) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
let schema = IQLSchema::new_from_fields(input.schema(), fields)?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
let select_exprs = self.field_list_to_exprs(&input, fields, &schema)?;
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
project(input, select_exprs)
@ -810,10 +807,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
input: LogicalPlan,
fields: &[Field],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
let schema = IQLSchema::new_from_fields(input.schema(), fields)?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let mut select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
let mut select_exprs = self.field_list_to_exprs(&input, fields, &schema)?;
// This is a special case, where exactly one column can be projected with a `DISTINCT`
// clause or the `distinct` function.
@ -847,10 +844,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
let schema = IQLSchema::new_from_fields(input.schema(), fields)?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
let select_exprs = self.field_list_to_exprs(&input, fields, &schema)?;
let (plan, select_exprs) =
self.select_aggregate(ctx, input, fields, select_exprs, group_by_tag_set)?;
@ -868,10 +865,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
let schema = IQLSchema::new_from_fields(input.schema(), fields)?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
let select_exprs = self.field_list_to_exprs(&input, fields, &schema)?;
let (plan, select_exprs) =
self.select_window(ctx, input, select_exprs, group_by_tag_set)?;
@ -906,10 +903,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
let schema = IQLSchema::new_from_fields(input.schema(), fields)?;
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields, &schemas)?;
let select_exprs = self.field_list_to_exprs(&input, fields, &schema)?;
let (plan, select_exprs) =
self.select_aggregate(ctx, input, fields, select_exprs, group_by_tag_set)?;
@ -950,7 +947,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
let schema = IQLSchema::new_from_fields(input.schema(), fields)?;
let (selector_index, field_key, plan) = match Selector::find_enumerated(fields)? {
(_, Selector::First { .. })
@ -1024,7 +1021,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
});
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&plan, fields_vec.as_slice(), &schemas)?;
let select_exprs = self.field_list_to_exprs(&plan, fields_vec.as_slice(), &schema)?;
// Wrap the plan in a `LogicalPlan::Projection` from the select expressions
project(plan, select_exprs)
@ -1040,7 +1037,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fields: &[Field],
group_by_tag_set: &[&str],
) -> Result<LogicalPlan> {
let schemas = Schemas::new(input.schema())?;
let schema = IQLSchema::new_from_fields(input.schema(), fields)?;
let (selector_index, is_bottom, field_key, tag_keys, narg) =
match Selector::find_enumerated(fields)? {
@ -1095,7 +1092,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
// Transform InfluxQL AST field expressions to a list of DataFusion expressions.
let select_exprs = self.field_list_to_exprs(&input, fields_vec.as_slice(), &schemas)?;
let select_exprs = self.field_list_to_exprs(&input, fields_vec.as_slice(), &schema)?;
let plan = if !tag_keys.is_empty() {
self.select_first(ctx, input, order_by, internal_group_by.as_slice(), 1)?
@ -1695,7 +1692,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
&self,
plan: &LogicalPlan,
fields: &[Field],
schemas: &Schemas,
schema: &IQLSchema<'_>,
) -> Result<Vec<Expr>> {
let mut names: HashMap<&str, usize> = HashMap::new();
fields
@ -1715,7 +1712,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
};
new_field
})
.map(|field| self.field_to_df_expr(&field, plan, schemas))
.map(|field| self.field_to_df_expr(&field, plan, schema))
.collect()
}
@ -1726,10 +1723,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
&self,
field: &Field,
plan: &LogicalPlan,
schemas: &Schemas,
schema: &IQLSchema<'_>,
) -> Result<Expr> {
let expr = self.expr_to_df_expr(ExprScope::Projection, &field.expr, schemas)?;
let expr = planner_rewrite_expression::rewrite_field_expr(expr, schemas)?;
let expr = self.expr_to_df_expr(ExprScope::Projection, &field.expr, schema)?;
let expr = planner_rewrite_expression::rewrite_field_expr(expr, schema)?;
normalize_col(expr.alias(&field.name), plan)
}
@ -1737,16 +1734,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fn conditional_to_df_expr(
&self,
iql: &ConditionalExpression,
schemas: &Schemas,
schema: &IQLSchema<'_>,
) -> Result<Expr> {
match iql {
ConditionalExpression::Expr(expr) => {
self.expr_to_df_expr(ExprScope::Where, expr, schemas)
self.expr_to_df_expr(ExprScope::Where, expr, schema)
}
ConditionalExpression::Binary(expr) => {
self.binary_conditional_to_df_expr(expr, schemas)
}
ConditionalExpression::Grouped(e) => self.conditional_to_df_expr(e, schemas),
ConditionalExpression::Binary(expr) => self.binary_conditional_to_df_expr(expr, schema),
ConditionalExpression::Grouped(e) => self.conditional_to_df_expr(e, schema),
}
}
@ -1754,20 +1749,25 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
fn binary_conditional_to_df_expr(
&self,
expr: &ConditionalBinary,
schemas: &Schemas,
schema: &IQLSchema<'_>,
) -> Result<Expr> {
let ConditionalBinary { lhs, op, rhs } = expr;
Ok(binary_expr(
self.conditional_to_df_expr(lhs, schemas)?,
self.conditional_to_df_expr(lhs, schema)?,
conditional_op_to_operator(*op)?,
self.conditional_to_df_expr(rhs, schemas)?,
self.conditional_to_df_expr(rhs, schema)?,
))
}
/// Map an InfluxQL [`IQLExpr`] to a DataFusion [`Expr`].
fn expr_to_df_expr(&self, scope: ExprScope, iql: &IQLExpr, schemas: &Schemas) -> Result<Expr> {
let schema = &schemas.df_schema;
fn expr_to_df_expr(
&self,
scope: ExprScope,
iql: &IQLExpr,
schema: &IQLSchema<'_>,
) -> Result<Expr> {
let df_schema = &schema.df_schema;
match iql {
// rewriter is expected to expand wildcard expressions
IQLExpr::Wildcard(_) => error::internal("unexpected wildcard in projection"),
@ -1784,7 +1784,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
"time".as_expr()
}
(ExprScope::Projection, "time") => "time".as_expr(),
(_, name) => match schema
(_, name) => match df_schema
.fields_with_unqualified_name(name)
.first()
.map(|f| f.data_type().clone())
@ -1808,7 +1808,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// and it is safe to unconditionally unwrap, as the
// `is_numeric_type` call guarantees it can be mapped to
// an Arrow DataType
column.cast_to(&dst_type, &schemas.df_schema)?
column.cast_to(&dst_type, &schema.df_schema)?
} else {
// If the cast is incompatible, evaluates to NULL
Expr::Literal(ScalarValue::Null)
@ -1846,9 +1846,9 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
},
// A DISTINCT <ident> clause should have been replaced by `rewrite_statement`.
IQLExpr::Distinct(_) => error::internal("distinct expression"),
IQLExpr::Call(call) => self.call_to_df_expr(scope, call, schemas),
IQLExpr::Binary(expr) => self.arithmetic_expr_to_df_expr(scope, expr, schemas),
IQLExpr::Nested(e) => self.expr_to_df_expr(scope, e, schemas),
IQLExpr::Call(call) => self.call_to_df_expr(scope, call, schema),
IQLExpr::Binary(expr) => self.arithmetic_expr_to_df_expr(scope, expr, schema),
IQLExpr::Nested(e) => self.expr_to_df_expr(scope, e, schema),
}
}
@ -1868,9 +1868,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
/// > * <https://github.com/influxdata/influxdb_iox/issues/6939>
///
/// [docs]: https://docs.influxdata.com/influxdb/v1.8/query_language/functions/
fn call_to_df_expr(&self, scope: ExprScope, call: &Call, schemas: &Schemas) -> Result<Expr> {
fn call_to_df_expr(
&self,
scope: ExprScope,
call: &Call,
schema: &IQLSchema<'_>,
) -> Result<Expr> {
if is_scalar_math_function(call.name.as_str()) {
return self.scalar_math_func_to_df_expr(scope, call, schemas);
return self.scalar_math_func_to_df_expr(scope, call, schema);
}
match scope {
@ -1882,7 +1887,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
error::query(format!("invalid function call in condition: {name}"))
}
}
ExprScope::Projection => self.function_to_df_expr(scope, call, schemas),
ExprScope::Projection => self.function_to_df_expr(scope, call, schema),
}
}
@ -1890,7 +1895,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
&self,
scope: ExprScope,
call: &Call,
schemas: &Schemas,
schema: &IQLSchema<'_>,
) -> Result<Expr> {
fn check_arg_count(name: &str, args: &[IQLExpr], count: usize) -> Result<()> {
let got = args.len();
@ -1925,13 +1930,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// The DISTINCT function is handled as a `ProjectionType::RawDistinct`
// query, so the planner only needs to project the single column
// argument.
"distinct" => self.expr_to_df_expr(scope, &args[0], schemas),
"distinct" => self.expr_to_df_expr(scope, &args[0], schema),
"count" => {
let (expr, distinct) = match &args[0] {
IQLExpr::Call(c) if c.name == "distinct" => {
(self.expr_to_df_expr(scope, &c.args[0], schemas)?, true)
(self.expr_to_df_expr(scope, &c.args[0], schema)?, true)
}
expr => (self.expr_to_df_expr(scope, expr, schemas)?, false),
expr => (self.expr_to_df_expr(scope, expr, schema)?, false),
};
if let Expr::Literal(ScalarValue::Null) = expr {
return Ok(expr);
@ -1947,7 +1952,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
)))
}
"sum" | "stddev" | "mean" | "median" => {
let expr = self.expr_to_df_expr(scope, &args[0], schemas)?;
let expr = self.expr_to_df_expr(scope, &args[0], schema)?;
if let Expr::Literal(ScalarValue::Null) = expr {
return Ok(expr);
}
@ -1962,13 +1967,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
)))
}
"percentile" => {
let expr = self.expr_to_df_expr(scope, &args[0], schemas)?;
let expr = self.expr_to_df_expr(scope, &args[0], schema)?;
if let Expr::Literal(ScalarValue::Null) = expr {
return Ok(expr);
}
check_arg_count(name, args, 2)?;
let nexpr = self.expr_to_df_expr(scope, &args[1], schemas)?;
let nexpr = self.expr_to_df_expr(scope, &args[1], schema)?;
Ok(Expr::AggregateUDF(expr::AggregateUDF::new(
PERCENTILE.clone(),
vec![expr, nexpr],
@ -1977,7 +1982,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
)))
}
name @ ("first" | "last" | "min" | "max") => {
let expr = self.expr_to_df_expr(scope, &args[0], schemas)?;
let expr = self.expr_to_df_expr(scope, &args[0], schema)?;
if let Expr::Literal(ScalarValue::Null) = expr {
return Ok(expr);
}
@ -2000,7 +2005,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
check_arg_count(name, args, 1)?;
// arg0 should be a column or function
let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?;
let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?;
if let Expr::Literal(ScalarValue::Null) = arg0 {
return Ok(arg0);
}
@ -2011,7 +2016,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
check_arg_count(name, args, 1)?;
// arg0 should be a column or function
let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?;
let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?;
if let Expr::Literal(ScalarValue::Null) = arg0 {
return Ok(arg0);
}
@ -2022,14 +2027,14 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
check_arg_count(name, args, 2)?;
// arg0 should be a column or function
let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?;
let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?;
if let Expr::Literal(ScalarValue::Null) = arg0 {
return Ok(arg0);
}
// arg1 should be an integer.
let arg1 = ScalarValue::Int64(Some(
match self.expr_to_df_expr(scope, &args[1], schemas)? {
match self.expr_to_df_expr(scope, &args[1], schema)? {
Expr::Literal(ScalarValue::Int64(Some(v))) => v,
Expr::Literal(ScalarValue::UInt64(Some(v))) => v as i64,
_ => {
@ -2046,13 +2051,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
check_arg_count_range(name, args, 1, 2)?;
// arg0 should be a column or function
let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?;
let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?;
if let Expr::Literal(ScalarValue::Null) = arg0 {
return Ok(arg0);
}
let mut eargs = vec![arg0];
if args.len() > 1 {
let arg1 = self.expr_to_df_expr(scope, &args[1], schemas)?;
let arg1 = self.expr_to_df_expr(scope, &args[1], schema)?;
eargs.push(arg1);
}
@ -2062,13 +2067,13 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
check_arg_count_range(name, args, 1, 2)?;
// arg0 should be a column or function
let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?;
let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?;
if let Expr::Literal(ScalarValue::Null) = arg0 {
return Ok(arg0);
}
let mut eargs = vec![arg0];
if args.len() > 1 {
let arg1 = self.expr_to_df_expr(scope, &args[1], schemas)?;
let arg1 = self.expr_to_df_expr(scope, &args[1], schema)?;
eargs.push(arg1);
}
@ -2078,7 +2083,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
check_arg_count(name, args, 1)?;
// arg0 should be a column or function
let arg0 = self.expr_to_df_expr(scope, &args[0], schemas)?;
let arg0 = self.expr_to_df_expr(scope, &args[0], schema)?;
if let Expr::Literal(ScalarValue::Null) = arg0 {
return Ok(arg0);
}
@ -2088,7 +2093,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
// The TOP/BOTTOM function is handled as a `ProjectionType::TopBottomSelector`
// query, so the planner only needs to project the single column
// argument.
"top" | "bottom" => self.expr_to_df_expr(scope, &args[0], schemas),
"top" | "bottom" => self.expr_to_df_expr(scope, &args[0], schema),
_ => error::query(format!("Invalid function '{name}'")),
}
@ -2099,12 +2104,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
&self,
scope: ExprScope,
call: &Call,
schemas: &Schemas,
schema: &IQLSchema<'a>,
) -> Result<Expr> {
let args = call
.args
.iter()
.map(|e| self.expr_to_df_expr(scope, e, schemas))
.map(|e| self.expr_to_df_expr(scope, e, schema))
.collect::<Result<Vec<Expr>>>()?;
match BuiltinScalarFunction::from_str(call.name.as_str())? {
@ -2127,12 +2132,12 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
&self,
scope: ExprScope,
expr: &Binary,
schemas: &Schemas,
schema: &IQLSchema<'_>,
) -> Result<Expr> {
Ok(binary_expr(
self.expr_to_df_expr(scope, &expr.lhs, schemas)?,
self.expr_to_df_expr(scope, &expr.lhs, schema)?,
binary_operator_to_df_operator(expr.op),
self.expr_to_df_expr(scope, &expr.rhs, schemas)?,
self.expr_to_df_expr(scope, &expr.rhs, schema)?,
))
}
@ -2141,17 +2146,15 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
condition: Option<&ConditionalExpression>,
time_range: TimeRange,
plan: LogicalPlan,
schemas: &Schemas,
ds_schema: &DataSourceSchema<'_>,
schema: &IQLSchema<'a>,
) -> Result<LogicalPlan> {
let filter_expr = condition
.map(|condition| {
let filter_expr = self.conditional_to_df_expr(condition, schemas)?;
let filter_expr = self.conditional_to_df_expr(condition, schema)?;
planner_rewrite_expression::rewrite_conditional_expr(
self.s.execution_props(),
filter_expr,
schemas,
ds_schema,
schema,
)
})
.transpose()?;
@ -2174,8 +2177,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
plan: LogicalPlan,
condition: &Option<WhereClause>,
cutoff: MetadataCutoff,
schemas: &Schemas,
ds_schema: &DataSourceSchema<'_>,
schema: &IQLSchema<'_>,
) -> Result<LogicalPlan> {
let start_time = Timestamp::from(self.s.execution_props().query_execution_start_time);
@ -2207,7 +2209,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
time_range
};
self.plan_condition_time_range(cond.as_ref(), time_range, plan, schemas, ds_schema)
self.plan_condition_time_range(cond.as_ref(), time_range, plan, schema)
}
/// Generate a logical plan for the specified `DataSource`.
@ -2381,16 +2383,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let Some(table_schema) = self.s.table_schema(&table) else {continue};
let Some((plan, measurement_expr)) = self.create_table_ref(&table)? else {continue;};
let schemas = Schemas::new(plan.schema())?;
let ds = DataSource::Table(table.clone());
let ds_schema = ds.schema(self.s)?;
let plan = self.plan_where_clause(
plan,
&condition,
metadata_cutoff,
&schemas,
&ds_schema,
)?;
let schema = IQLSchema::new_from_ds_schema(plan.schema(), ds.schema(self.s)?)?;
let plan =
self.plan_where_clause(plan, &condition, metadata_cutoff, &schema)?;
let tags = table_schema
.iter()
@ -2634,16 +2630,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
let Some((plan, measurement_expr)) = self.create_table_ref(&table)? else {continue;};
let schemas = Schemas::new(plan.schema())?;
let ds = DataSource::Table(table.clone());
let ds_schema = ds.schema(self.s)?;
let plan = self.plan_where_clause(
plan,
&show_tag_values.condition,
metadata_cutoff,
&schemas,
&ds_schema,
)?;
let schema = IQLSchema::new_from_ds_schema(plan.schema(), ds.schema(self.s)?)?;
let plan =
self.plan_where_clause(plan, &show_tag_values.condition, metadata_cutoff, &schema)?;
for key in keys {
let idx = plan
@ -2740,16 +2730,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
for table in tables {
let Some((plan, _measurement_expr)) = self.create_table_ref(&table)? else {continue;};
let schemas = Schemas::new(plan.schema())?;
let ds = DataSource::Table(table.clone());
let ds_schema = ds.schema(self.s)?;
let plan = self.plan_where_clause(
plan,
&condition,
metadata_cutoff,
&schemas,
&ds_schema,
)?;
let schema = IQLSchema::new_from_ds_schema(plan.schema(), ds.schema(self.s)?)?;
let plan =
self.plan_where_clause(plan, &condition, metadata_cutoff, &schema)?;
let plan = LogicalPlanBuilder::from(plan)
.limit(0, Some(1))?

View File

@ -123,7 +123,7 @@
//! [`Eval`]: https://github.com/influxdata/influxql/blob/1ba470371ec093d57a726b143fe6ccbacf1b452b/ast.go#L4137
use std::sync::Arc;
use crate::plan::util::Schemas;
use crate::plan::util::IQLSchema;
use arrow::datatypes::DataType;
use datafusion::common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion::common::{Result, ScalarValue};
@ -136,31 +136,28 @@ use datafusion::prelude::when;
use observability_deps::tracing::trace;
use predicate::rpc_predicate::{iox_expr_rewrite, simplify_predicate};
use super::ir::DataSourceSchema;
/// Perform a series of passes to rewrite `expr` in compliance with InfluxQL behavior
/// in an effort to ensure the query executes without error.
pub(super) fn rewrite_conditional_expr(
exec_props: &ExecutionProps,
expr: Expr,
schemas: &Schemas,
ds_schema: &DataSourceSchema<'_>,
schema: &IQLSchema<'_>,
) -> Result<Expr> {
let simplify_context =
SimplifyContext::new(exec_props).with_schema(Arc::clone(&schemas.df_schema));
SimplifyContext::new(exec_props).with_schema(Arc::clone(&schema.df_schema));
let simplifier = ExprSimplifier::new(simplify_context);
Ok(expr)
.map(|expr| log_rewrite(expr, "original"))
// make regex matching with invalid types produce false
.and_then(|expr| expr.rewrite(&mut FixRegularExpressions { schemas }))
.and_then(|expr| expr.rewrite(&mut FixRegularExpressions { schema }))
.map(|expr| log_rewrite(expr, "after fix_regular_expressions"))
// rewrite exprs with incompatible operands to NULL or FALSE
// (seems like FixRegularExpressions could be combined into this pass)
.and_then(|expr| rewrite_expr(expr, schemas))
.and_then(|expr| rewrite_expr(expr, schema))
.map(|expr| log_rewrite(expr, "after rewrite_expr"))
// Convert tag column references to CASE WHEN <tag> IS NULL THEN '' ELSE <tag> END
.and_then(|expr| rewrite_tag_columns(expr, schemas, ds_schema))
.and_then(|expr| rewrite_tag_columns(expr, schema))
.map(|expr| log_rewrite(expr, "after rewrite_tag_columns"))
// Push comparison operators into CASE exprs:
// CASE WHEN tag0 IS NULL THEN '' ELSE tag0 END = 'foo'
@ -172,7 +169,7 @@ pub(super) fn rewrite_conditional_expr(
// - convert numeric types so that operands agree
// - convert Utf8 to Dictionary as needed
// The next step will fail with type errors if we don't do this.
.and_then(|expr| simplifier.coerce(expr, Arc::clone(&schemas.df_schema)))
.and_then(|expr| simplifier.coerce(expr, Arc::clone(&schema.df_schema)))
.map(|expr| log_rewrite(expr, "after coerce"))
// DataFusion expression simplification. This is important here because:
// CASE WHEN tag0 IS NULL THEN '' = 'foo' ELSE tag0 = 'foo' END
@ -206,8 +203,8 @@ fn log_rewrite(expr: Expr, description: &str) -> Expr {
/// Perform a series of passes to rewrite `expr`, used as a column projection,
/// to match the behavior of InfluxQL.
pub(super) fn rewrite_field_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
rewrite_expr(expr, schemas)
pub(super) fn rewrite_field_expr(expr: Expr, schema: &IQLSchema<'_>) -> Result<Expr> {
rewrite_expr(expr, schema)
}
/// The expression was rewritten
@ -225,7 +222,7 @@ fn no(expr: Expr) -> Result<Transformed<Expr>> {
///
/// Rewrite and coerce the expression tree to model the behavior
/// of an InfluxQL query.
fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
fn rewrite_expr(expr: Expr, schema: &IQLSchema<'_>) -> Result<Expr> {
expr.transform(&|expr| {
match expr {
Expr::BinaryExpr(BinaryExpr {
@ -233,8 +230,8 @@ fn rewrite_expr(expr: Expr, schemas: &Schemas) -> Result<Expr> {
op,
ref right,
}) => {
let lhs_type = left.get_type(&schemas.df_schema)?;
let rhs_type = right.get_type(&schemas.df_schema)?;
let lhs_type = left.get_type(&schema.df_schema)?;
let rhs_type = right.get_type(&schema.df_schema)?;
match (lhs_type, op, rhs_type) {
//
@ -467,7 +464,7 @@ fn rewrite_boolean(lhs: Expr, op: Operator, rhs: Expr) -> Expr {
/// Rewrite regex conditional expressions to match InfluxQL behaviour.
struct FixRegularExpressions<'a> {
schemas: &'a Schemas,
schema: &'a IQLSchema<'a>,
}
impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> {
@ -483,7 +480,7 @@ impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> {
right,
}) => {
Ok(if let Expr::Column(ref col) = *left {
match self.schemas.df_schema.field_from_column(col)?.data_type() {
match self.schema.df_schema.field_from_column(col)?.data_type() {
DataType::Dictionary(..) | DataType::Utf8 => {
Expr::BinaryExpr(BinaryExpr { left, op, right })
}
@ -517,13 +514,9 @@ impl<'a> TreeNodeRewriter for FixRegularExpressions<'a> {
/// case when tag0 is null then "" else tag0 end
/// ```
/// This ensures that we treat tags with the same semantics as OG InfluxQL.
fn rewrite_tag_columns(
expr: Expr,
_schemas: &Schemas,
ds_schema: &DataSourceSchema<'_>,
) -> Result<Expr> {
fn rewrite_tag_columns(expr: Expr, schema: &IQLSchema<'_>) -> Result<Expr> {
expr.transform(&|expr| match expr {
Expr::Column(ref c) if ds_schema.is_tag_field(&c.name) => {
Expr::Column(ref c) if schema.is_tag_field(&c.name) => {
yes(when(expr.clone().is_null(), lit("")).otherwise(expr)?)
}
e => no(e),
@ -532,6 +525,8 @@ fn rewrite_tag_columns(
#[cfg(test)]
mod test {
use crate::plan::ir::DataSourceSchema;
use super::*;
use datafusion::logical_expr::lit_timestamp_nano;
use datafusion::prelude::col;
@ -542,7 +537,7 @@ mod test {
use schema::{InfluxFieldType, SchemaBuilder};
use std::sync::Arc;
fn new_schemas() -> (Schemas, DataSourceSchema<'static>) {
fn new_schema() -> IQLSchema<'static> {
let iox_schema = SchemaBuilder::new()
.measurement("m0")
.timestamp()
@ -556,7 +551,8 @@ mod test {
.build()
.expect("schema failed");
let df_schema: DFSchemaRef = Arc::clone(iox_schema.inner()).to_dfschema_ref().unwrap();
(Schemas { df_schema }, DataSourceSchema::Table(iox_schema))
let ds_schema = DataSourceSchema::Table(iox_schema);
IQLSchema::new_from_ds_schema(&df_schema, ds_schema).unwrap()
}
/// Tests which validate that division is coalesced to `0`, to handle division by zero,
@ -566,7 +562,7 @@ mod test {
/// binary expression to a scalar value, `0`.
#[test]
fn test_division() {
let (schemas, _) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string();
// Float64
@ -627,7 +623,7 @@ mod test {
#[test]
fn test_pass_thru() {
test_helpers::maybe_start_logging();
let (schemas, _) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string();
let expr = lit(5.5).gt(lit(1_i64));
@ -664,9 +660,9 @@ mod test {
#[test]
fn test_string_operations() {
let props = execution_props();
let (schemas, ds_schema) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| {
rewrite_conditional_expr(&props, expr, &schemas, &ds_schema)
rewrite_conditional_expr(&props, expr, &schemas)
.unwrap()
.to_string()
};
@ -688,7 +684,7 @@ mod test {
/// to the supported bitwise operators.
#[test]
fn test_boolean_operations() {
let (schemas, _) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string();
let expr = "boolean_field".as_expr().and(lit(true));
@ -743,7 +739,7 @@ mod test {
/// Tests cases to validate Boolean and NULL data types
#[test]
fn test_rewrite_conditional_null() {
let (schemas, _) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string();
// NULL on either side and boolean on the other of a binary expression
@ -779,7 +775,7 @@ mod test {
#[test]
fn test_time_range() {
let (schemas, _) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string();
let expr = "time".as_expr().gt_eq(lit_timestamp_nano(1000));
@ -811,7 +807,7 @@ mod test {
/// valid operation for the given the operands. These are used when projecting columns.
#[test]
fn test_rewrite_expr_coercion_reduce_to_null() {
let (schemas, _) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| rewrite_expr(expr, &schemas).unwrap().to_string();
//
@ -851,9 +847,9 @@ mod test {
fn test_rewrite_tag_columns_eq() {
test_helpers::maybe_start_logging();
let props = execution_props();
let (schemas, ds_schema) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| {
rewrite_conditional_expr(&props, expr, &schemas, &ds_schema)
rewrite_conditional_expr(&props, expr, &schemas)
.unwrap()
.to_string()
};
@ -904,9 +900,9 @@ mod test {
fn test_rewrite_tag_columns_regex() {
let props = execution_props();
test_helpers::maybe_start_logging();
let (schemas, ds_schema) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| {
rewrite_conditional_expr(&props, expr, &schemas, &ds_schema)
rewrite_conditional_expr(&props, expr, &schemas)
.unwrap()
.to_string()
};
@ -931,9 +927,9 @@ mod test {
fn test_fields_pass_thru() {
test_helpers::maybe_start_logging();
let props = execution_props();
let (schemas, ds_schema) = new_schemas();
let schemas = new_schema();
let rewrite = |expr| {
rewrite_conditional_expr(&props, expr, &schemas, &ds_schema)
rewrite_conditional_expr(&props, expr, &schemas)
.unwrap()
.to_string()
};

View File

@ -11,8 +11,11 @@ use influxdb_influxql_parser::literal::Number;
use influxdb_influxql_parser::string::Regex;
use query_functions::clean_non_meta_escapes;
use query_functions::coalesce_struct::coalesce_struct;
use schema::InfluxColumnType;
use std::sync::Arc;
use super::ir::{DataSourceSchema, Field};
pub(in crate::plan) fn binary_operator_to_df_operator(op: BinaryOperator) -> Operator {
match op {
BinaryOperator::Add => Operator::Plus,
@ -26,17 +29,62 @@ pub(in crate::plan) fn binary_operator_to_df_operator(op: BinaryOperator) -> Ope
}
}
/// Container for both the DataFusion and equivalent IOx schema.
pub(in crate::plan) struct Schemas {
/// Container for the DataFusion schema as well as
/// info on which columns are tags.
pub(in crate::plan) struct IQLSchema<'a> {
    /// The DataFusion schema of the logical plan node this describes.
    pub(in crate::plan) df_schema: DFSchemaRef,
    /// Records which columns are InfluxQL tags, sourced from either the
    /// `FROM`-clause data source or a subquery's projected field list.
    tag_info: TagInfo<'a>,
}
impl Schemas {
pub(in crate::plan) fn new(df_schema: &DFSchemaRef) -> Result<Self> {
impl<'a> IQLSchema<'a> {
    /// Create a new IQLSchema from a [`DataSourceSchema`] from the
    /// FROM clause of a query or subquery.
    ///
    /// Currently infallible; the `Result` return keeps the signature
    /// uniform with other schema constructors.
    pub(in crate::plan) fn new_from_ds_schema(
        df_schema: &DFSchemaRef,
        ds_schema: DataSourceSchema<'a>,
    ) -> Result<Self> {
        Ok(Self {
            df_schema: Arc::clone(df_schema),
            tag_info: TagInfo::DataSourceSchema(ds_schema),
        })
    }

    /// Create a new IQLSchema from a list of [`Field`]s on the SELECT list
    /// of a subquery.
    ///
    /// Currently infallible; the `Result` return keeps the signature
    /// uniform with [`IQLSchema::new_from_ds_schema`].
    pub(in crate::plan) fn new_from_fields(
        df_schema: &DFSchemaRef,
        fields: &'a [Field],
    ) -> Result<Self> {
        Ok(Self {
            df_schema: Arc::clone(df_schema),
            tag_info: TagInfo::FieldList(fields),
        })
    }

    /// Returns `true` if the schema contains a tag column with the specified name.
    pub fn is_tag_field(&self, name: &str) -> bool {
        match self.tag_info {
            // Delegate to the underlying data-source schema.
            TagInfo::DataSourceSchema(ref ds_schema) => ds_schema.is_tag_field(name),
            // For a subquery SELECT list, a column is a tag only if a field
            // with that name is explicitly typed as `Tag`.
            TagInfo::FieldList(fields) => fields
                .iter()
                .any(|f| f.name == name && f.data_type == Some(InfluxColumnType::Tag)),
        }
    }

    /// Returns `true` if `name` refers to a tag column after projection.
    /// If the underlying data source is a subquery, it will apply any aliases in the
    /// projection that represents the SELECT list.
    pub fn is_projected_tag_field(&self, name: &str) -> bool {
        match self.tag_info {
            TagInfo::DataSourceSchema(ref ds_schema) => ds_schema.is_projected_tag_field(name),
            // A field list already reflects the projection, so the plain
            // tag check suffices.
            _ => self.is_tag_field(name),
        }
    }
}
/// Identifies where an [`IQLSchema`]'s tag-column information comes from.
pub(in crate::plan) enum TagInfo<'a> {
    /// Tags are determined by the schema of a `FROM`-clause data source.
    DataSourceSchema(DataSourceSchema<'a>),
    /// Tags are determined by the projected field list of a subquery.
    FieldList(&'a [Field]),
}
/// Sanitize an InfluxQL regular expression and create a compiled [`regex::Regex`].