feat: support nested aggregate subqueries
parent
0dd1826e3c
commit
c77c4b3d23
|
@ -2821,3 +2821,25 @@ tags: cpu=cpu1
|
||||||
| 2022-10-31T02:00:00 | 1.98 |
|
| 2022-10-31T02:00:00 | 1.98 |
|
||||||
| 2022-10-31T02:00:10 | 1.99 |
|
| 2022-10-31T02:00:10 | 1.99 |
|
||||||
+---------------------+------------+
|
+---------------------+------------+
|
||||||
|
-- InfluxQL: SELECT * FROM (SELECT MAX(value) FROM (SELECT DISTINCT(usage_idle) AS value FROM cpu)) GROUP BY cpu;
|
||||||
|
name: cpu
|
||||||
|
tags: cpu=cpu-total
|
||||||
|
+---------------------+------+
|
||||||
|
| time | max |
|
||||||
|
+---------------------+------+
|
||||||
|
| 1970-01-01T00:00:00 | 2.99 |
|
||||||
|
+---------------------+------+
|
||||||
|
name: cpu
|
||||||
|
tags: cpu=cpu0
|
||||||
|
+---------------------+------+
|
||||||
|
| time | max |
|
||||||
|
+---------------------+------+
|
||||||
|
| 1970-01-01T00:00:00 | 0.99 |
|
||||||
|
+---------------------+------+
|
||||||
|
name: cpu
|
||||||
|
tags: cpu=cpu1
|
||||||
|
+---------------------+------+
|
||||||
|
| time | max |
|
||||||
|
+---------------------+------+
|
||||||
|
| 1970-01-01T00:00:00 | 1.99 |
|
||||||
|
+---------------------+------+
|
|
@ -2,7 +2,7 @@ mod select;
|
||||||
|
|
||||||
use crate::plan::ir::{DataSource, Field, Select, SelectQuery};
|
use crate::plan::ir::{DataSource, Field, Select, SelectQuery};
|
||||||
use crate::plan::planner::select::{
|
use crate::plan::planner::select::{
|
||||||
check_exprs_satisfy_columns, fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort,
|
fields_to_exprs_no_nulls, make_tag_key_column_meta, plan_with_sort,
|
||||||
};
|
};
|
||||||
use crate::plan::planner_time_range_expression::{
|
use crate::plan::planner_time_range_expression::{
|
||||||
duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr,
|
duration_expr_to_nanoseconds, expr_to_df_interval_dt, time_range_to_df_expr,
|
||||||
|
@ -21,7 +21,7 @@ use datafusion::datasource::{provider_as_source, MemTable};
|
||||||
use datafusion::logical_expr::expr_rewriter::normalize_col;
|
use datafusion::logical_expr::expr_rewriter::normalize_col;
|
||||||
use datafusion::logical_expr::logical_plan::builder::project;
|
use datafusion::logical_expr::logical_plan::builder::project;
|
||||||
use datafusion::logical_expr::logical_plan::Analyze;
|
use datafusion::logical_expr::logical_plan::Analyze;
|
||||||
use datafusion::logical_expr::utils::{expr_as_column_expr, find_aggregate_exprs};
|
use datafusion::logical_expr::utils::find_aggregate_exprs;
|
||||||
use datafusion::logical_expr::{
|
use datafusion::logical_expr::{
|
||||||
binary_expr, col, date_bin, expr, expr::WindowFunction, lit, lit_timestamp_nano, now,
|
binary_expr, col, date_bin, expr, expr::WindowFunction, lit, lit_timestamp_nano, now,
|
||||||
window_function, Aggregate, AggregateFunction, AggregateUDF, Between, BuiltInWindowFunction,
|
window_function, Aggregate, AggregateFunction, AggregateUDF, Between, BuiltInWindowFunction,
|
||||||
|
@ -754,7 +754,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
||||||
|
|
||||||
let schema = ds.schema(self.s)?;
|
let schema = ds.schema(self.s)?;
|
||||||
group_by_exprs.extend(group_by_tag_set.iter().filter_map(|name| {
|
group_by_exprs.extend(group_by_tag_set.iter().filter_map(|name| {
|
||||||
if let Some(InfluxColumnType::Tag) = schema.field_type_by_name(name) {
|
if schema.is_tag_field(name) {
|
||||||
Some(name.as_expr())
|
Some(name.as_expr())
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
@ -818,13 +818,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
||||||
// the final projection from the aggregate operator.
|
// the final projection from the aggregate operator.
|
||||||
let aggr_projection_exprs = [aggr_group_by_exprs, aggr_exprs].concat();
|
let aggr_projection_exprs = [aggr_group_by_exprs, aggr_exprs].concat();
|
||||||
|
|
||||||
// Replace any expressions that are not a column with a column referencing
|
|
||||||
// an output column from the aggregate schema.
|
|
||||||
let column_exprs_post_aggr = aggr_projection_exprs
|
|
||||||
.iter()
|
|
||||||
.map(|expr| expr_as_column_expr(expr, &plan))
|
|
||||||
.collect::<Result<Vec<Expr>>>()?;
|
|
||||||
|
|
||||||
// Create a literal expression for `value` if the strategy
|
// Create a literal expression for `value` if the strategy
|
||||||
// is `FILL(<value>)`
|
// is `FILL(<value>)`
|
||||||
let fill_if_null = match fill_option {
|
let fill_if_null = match fill_option {
|
||||||
|
@ -850,29 +843,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
|
||||||
})
|
})
|
||||||
.collect::<Result<Vec<Expr>>>()?;
|
.collect::<Result<Vec<Expr>>>()?;
|
||||||
|
|
||||||
// Strip the NULL columns, which are tags that do not exist in the aggregate
|
|
||||||
// table schema. The NULL columns are projected as scalar values in the final
|
|
||||||
// projection.
|
|
||||||
let select_exprs_post_aggr_no_nulls = select_exprs_post_aggr
|
|
||||||
.iter()
|
|
||||||
.filter(|expr| match expr {
|
|
||||||
Expr::Alias(expr, _) => !matches!(**expr, Expr::Literal(ScalarValue::Null)),
|
|
||||||
_ => true,
|
|
||||||
})
|
|
||||||
.cloned()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
// Finally, we ensure that the re-written projection can be resolved
|
|
||||||
// from the aggregate output columns and that there are no
|
|
||||||
// column references that are not aggregates.
|
|
||||||
//
|
|
||||||
// This will identify issues such as:
|
|
||||||
//
|
|
||||||
// SELECT COUNT(field), field FROM foo
|
|
||||||
//
|
|
||||||
// where the field without the aggregate is not valid.
|
|
||||||
check_exprs_satisfy_columns(&column_exprs_post_aggr, &select_exprs_post_aggr_no_nulls)?;
|
|
||||||
|
|
||||||
Ok((plan, select_exprs_post_aggr))
|
Ok((plan, select_exprs_post_aggr))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
use crate::plan::error;
|
|
||||||
use crate::plan::ir::Field;
|
use crate::plan::ir::Field;
|
||||||
use arrow::datatypes::DataType;
|
use arrow::datatypes::DataType;
|
||||||
use datafusion::common::{DFSchemaRef, Result};
|
use datafusion::common::{DFSchemaRef, Result};
|
||||||
use datafusion::logical_expr::utils::find_column_exprs;
|
|
||||||
use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
|
use datafusion::logical_expr::{Expr, LogicalPlan, LogicalPlanBuilder};
|
||||||
use datafusion_util::AsExpr;
|
use datafusion_util::AsExpr;
|
||||||
use generated_types::influxdata::iox::querier::v1::influx_ql_metadata::TagKeyColumn;
|
use generated_types::influxdata::iox::querier::v1::influx_ql_metadata::TagKeyColumn;
|
||||||
|
@ -11,19 +9,6 @@ use schema::INFLUXQL_MEASUREMENT_COLUMN_NAME;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
|
||||||
/// Determines that all [`Expr::Column`] references in `exprs` refer to a
|
|
||||||
/// column in `columns`.
|
|
||||||
pub(crate) fn check_exprs_satisfy_columns(columns: &[Expr], exprs: &[Expr]) -> Result<()> {
|
|
||||||
if !columns.iter().all(|c| matches!(c, Expr::Column(_))) {
|
|
||||||
return error::internal("expected Expr::Column");
|
|
||||||
}
|
|
||||||
let column_exprs = find_column_exprs(exprs);
|
|
||||||
if column_exprs.iter().any(|expr| !columns.contains(expr)) {
|
|
||||||
return error::query("mixing aggregate and non-aggregate columns is not supported");
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) fn make_tag_key_column_meta(
|
pub(super) fn make_tag_key_column_meta(
|
||||||
fields: &[Field],
|
fields: &[Field],
|
||||||
tag_set: &[&str],
|
tag_set: &[&str],
|
||||||
|
|
Loading…
Reference in New Issue