From c77c4b3d238b858c31733c717cf58632a71739bc Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Mon, 15 May 2023 09:31:06 +1000 Subject: [PATCH] feat: support nested aggregate subqueries --- .../cases/in/issue_6112.influxql.expected | 24 ++++++++++++- iox_query_influxql/src/plan/planner.rs | 36 ++----------------- iox_query_influxql/src/plan/planner/select.rs | 15 -------- 3 files changed, 26 insertions(+), 49 deletions(-) diff --git a/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected b/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected index d0a26535b2..6b9519d952 100644 --- a/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected @@ -2820,4 +2820,26 @@ tags: cpu=cpu1 +---------------------+------------+ | 2022-10-31T02:00:00 | 1.98 | | 2022-10-31T02:00:10 | 1.99 | -+---------------------+------------+ \ No newline at end of file ++---------------------+------------+ +-- InfluxQL: SELECT * FROM (SELECT MAX(value) FROM (SELECT DISTINCT(usage_idle) AS value FROM cpu)) GROUP BY cpu; +name: cpu +tags: cpu=cpu-total ++---------------------+------+ +| time | max | ++---------------------+------+ +| 1970-01-01T00:00:00 | 2.99 | ++---------------------+------+ +name: cpu +tags: cpu=cpu0 ++---------------------+------+ +| time | max | ++---------------------+------+ +| 1970-01-01T00:00:00 | 0.99 | ++---------------------+------+ +name: cpu +tags: cpu=cpu1 ++---------------------+------+ +| time | max | ++---------------------+------+ +| 1970-01-01T00:00:00 | 1.99 | ++---------------------+------+ \ No newline at end of file diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index e1953a13ea..9e1eb20760 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -2,7 +2,7 @@ mod select; use crate::plan::ir::{DataSource, Field, Select, SelectQuery}; use crate::plan::planner::select::{ - check_exprs_satisfy_columns, fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort, + fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort, }; use crate::plan::planner_time_range_expression::{ duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr, @@ -21,7 +21,7 @@ use datafusion::datasource::{provider_as_source, MemTable}; use datafusion::logical_expr::expr_rewriter::normalize_col; use datafusion::logical_expr::logical_plan::builder::project; use datafusion::logical_expr::logical_plan::Analyze; -use datafusion::logical_expr::utils::{expr_as_column_expr, find_aggregate_exprs}; +use datafusion::logical_expr::utils::find_aggregate_exprs; use datafusion::logical_expr::{ binary_expr, col, date_bin, expr, expr::WindowFunction, lit, lit_timestamp_nano, now, window_function, Aggregate, AggregateFunction, AggregateUDF, Between, BuiltInWindowFunction, @@ -754,7 +754,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> { let schema = ds.schema(self.s)?; group_by_exprs.extend(group_by_tag_set.iter().filter_map(|name| { - if let Some(InfluxColumnType::Tag) = schema.field_type_by_name(name) { + if schema.is_tag_field(name) { Some(name.as_expr()) } else { None @@ -818,13 +818,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> { // the final projection from the aggregate operator. let aggr_projection_exprs = [aggr_group_by_exprs, aggr_exprs].concat(); - // Replace any expressions that are not a column with a column referencing - // an output column from the aggregate schema. - let column_exprs_post_aggr = aggr_projection_exprs - .iter() - .map(|expr| expr_as_column_expr(expr, &plan)) - .collect::>>()?; - // Create a literal expression for `value` if the strategy // is `FILL()` let fill_if_null = match fill_option { @@ -850,29 +843,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> { }) .collect::>>()?; - // Strip the NULL columns, which are tags that do not exist in the aggregate - // table schema. The NULL columns are projected as scalar values in the final - // projection. - let select_exprs_post_aggr_no_nulls = select_exprs_post_aggr - .iter() - .filter(|expr| match expr { - Expr::Alias(expr, _) => !matches!(**expr, Expr::Literal(ScalarValue::Null)), - _ => true, - }) - .cloned() - .collect::>(); - - // Finally, we ensure that the re-written projection can be resolved - // from the aggregate output columns and that there are no - // column references that are not aggregates. - // - // This will identify issues such as: - // - // SELECT COUNT(field), field FROM foo - // - // where the field without the aggregate is not valid. - check_exprs_satisfy_columns(&column_exprs_post_aggr, &select_exprs_post_aggr_no_nulls)?; - Ok((plan, select_exprs_post_aggr)) } diff --git a/iox_query_influxql/src/plan/planner/select.rs b/iox_query_influxql/src/plan/planner/select.rs index 8d527416ba..2062bca0fe 100644 --- a/iox_query_influxql/src/plan/planner/select.rs +++ b/iox_query_influxql/src/plan/planner/select.rs @@ -1,8 +1,6 @@ -use crate::plan::error; use crate::plan::ir::Field; use arrow::datatypes::DataType; use datafusion::common::{DFSchemaRef, Result}; -use datafusion::logical_expr::utils::find_column_exprs; use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; use datafusion_util::AsExpr; use generated_types::influxdata::iox::querier::v1::influx_ql_metadata::TagKeyColumn; @@ -11,19 +9,6 @@ use schema::INFLUXQL_MEASUREMENT_COLUMN_NAME; use std::collections::HashMap; use std::ops::Deref; -/// Determines that all [`Expr::Column`] references in `exprs` refer to a -/// column in `columns`. -pub(crate) fn check_exprs_satisfy_columns(columns: &[Expr], exprs: &[Expr]) -> Result<()> { - if !columns.iter().all(|c| matches!(c, Expr::Column(_))) { - return error::internal("expected Expr::Column"); - } - let column_exprs = find_column_exprs(exprs); - if column_exprs.iter().any(|expr| !columns.contains(expr)) { - return error::query("mixing aggregate and non-aggregate columns is not supported"); - } - Ok(()) -} - pub(super) fn make_tag_key_column_meta( fields: &[Field], tag_set: &[&str],