From 7ba619a32b135d65b651705f17c90898f6b2e36d Mon Sep 17 00:00:00 2001 From: Stuart Carnie Date: Mon, 15 May 2023 15:31:20 +1000 Subject: [PATCH] feat: outer GROUP BY pushed down to subqueries; more Cloud 2 examples --- .../query_tests/cases/in/issue_6112.influxql | 51 ++++- .../cases/in/issue_6112.influxql.expected | 53 +++++- iox_query_influxql/src/plan/ir.rs | 7 +- iox_query_influxql/src/plan/planner.rs | 179 +++--------------- 4 files changed, 138 insertions(+), 152 deletions(-) diff --git a/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql b/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql index 955e790520..969518f3c3 100644 --- a/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql +++ b/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql @@ -548,7 +548,56 @@ SELECT * FROM (SELECT usage_idle FROM cpu GROUP BY cpu) GROUP BY cpu; -- GROUP BY should be pushed to subquery SELECT * FROM (SELECT usage_idle FROM cpu) GROUP BY cpu; +-- GROUP BY should be pushed through multiple levels of subqueries SELECT * FROM (SELECT MAX(value) FROM (SELECT DISTINCT(usage_idle) AS value FROM cpu)) GROUP BY cpu; -- GROUP BY time with default FILL(none) for subquery -SELECT value FROM (SELECT mean(usage_idle) AS value FROM cpu GROUP BY TIME(10s)); \ No newline at end of file +SELECT value FROM (SELECT mean(usage_idle) AS value FROM cpu GROUP BY TIME(10s)); + +-- Multiple subqueries from same measurement, rows with same timestamp are not combined +SELECT sum_idle, sum_system FROM (SELECT sum(usage_idle) AS sum_idle FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)), (SELECT sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)); + +-- Similar query as prior, rows with same timestamp are combined (as expected) +SELECT sum_idle, sum_system FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)); + +-- TODO(sgc): Incorrect output +-- Multiple 
subqueries from same measurement +-- * Outer query projects aggregates, values at same timestamp should be combined +-- Source: Cloud 2 +-- Expected output: +-- name: cpu +-- +---------------------+------+--------+ +-- | time | last | last_1 | +-- +---------------------+------+--------+ +-- | 1970-01-01T00:00:00 | 2.99 | 2.1 | +-- +---------------------+------+--------+ +-- SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)), (SELECT sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)); + +-- Similar query produces expected output of prior query +SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)); + + +-- TODO(sgc): Incorrect output +-- Multiple subqueries from same measurement, GROUP BY TIME +-- * Outer query projects aggregates, values at same timestamp should be combined +-- Source: Cloud 2 +-- Expected output: +-- name: cpu +-- +---------------------+------+--------+ +-- | time | last | last_1 | +-- +---------------------+------+--------+ +-- | 2022-10-31T02:00:00 | 2.98 | 2.2 | +-- | 2022-10-31T02:00:10 | 2.99 | 2.1 | +-- +---------------------+------+--------+ +-- SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)), (SELECT sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)) GROUP BY time(10s) FILL(none); + +-- Similar query produces expected output of prior query +SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)) GROUP BY time(10s) FILL(none); + +-- Conditions on fields projected by the subquery +-- Source: Cloud 2 +SELECT P / 99.99 FROM (SELECT max(usage_system) AS P, mean(usage_idle) AS S FROM cpu GROUP BY 
time(10s)) WHERE S > 0.50 AND S < 1.50 AND P > 0.0 AND P < 1.0 GROUP BY cpu FILL(null); + +-- DISTINCT tag value from subquery +-- Source: Cloud 2 +SELECT distinct(cpu) FROM (SELECT usage_idle, cpu FROM cpu); \ No newline at end of file diff --git a/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected b/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected index f8fb7ed329..7f297a417c 100644 --- a/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected +++ b/influxdb_iox/tests/query_tests/cases/in/issue_6112.influxql.expected @@ -2850,4 +2850,55 @@ name: cpu +---------------------+--------------------+ | 2022-10-31T02:00:00 | 1.9799999999999998 | | 2022-10-31T02:00:10 | 1.9900000000000002 | -+---------------------+--------------------+ \ No newline at end of file ++---------------------+--------------------+ +-- InfluxQL: SELECT sum_idle, sum_system FROM (SELECT sum(usage_idle) AS sum_idle FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)), (SELECT sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)); +name: cpu ++---------------------+----------+------------+ +| time | sum_idle | sum_system | ++---------------------+----------+------------+ +| 2022-10-31T02:00:00 | 2.98 | | +| 2022-10-31T02:00:00 | | 2.2 | +| 2022-10-31T02:00:10 | 2.99 | | +| 2022-10-31T02:00:10 | | 2.1 | ++---------------------+----------+------------+ +-- InfluxQL: SELECT sum_idle, sum_system FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)); +name: cpu ++---------------------+----------+------------+ +| time | sum_idle | sum_system | ++---------------------+----------+------------+ +| 2022-10-31T02:00:00 | 2.98 | 2.2 | +| 2022-10-31T02:00:10 | 2.99 | 2.1 | ++---------------------+----------+------------+ +-- InfluxQL: SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu 
WHERE cpu = 'cpu-total' GROUP BY time(10s)); +name: cpu ++---------------------+------+--------+ +| time | last | last_1 | ++---------------------+------+--------+ +| 1970-01-01T00:00:00 | 2.99 | 2.1 | ++---------------------+------+--------+ +-- InfluxQL: SELECT last(sum_idle), last(sum_system) FROM (SELECT sum(usage_idle) AS sum_idle, sum(usage_system) AS sum_system FROM cpu WHERE cpu = 'cpu-total' GROUP BY time(10s)) GROUP BY time(10s) FILL(none); +name: cpu ++---------------------+------+--------+ +| time | last | last_1 | ++---------------------+------+--------+ +| 2022-10-31T02:00:00 | 2.98 | 2.2 | +| 2022-10-31T02:00:10 | 2.99 | 2.1 | ++---------------------+------+--------+ +-- InfluxQL: SELECT P / 99.99 FROM (SELECT max(usage_system) AS P, mean(usage_idle) AS S FROM cpu GROUP BY time(10s)) WHERE S > 0.50 AND S < 1.50 AND P > 0.0 AND P < 1.0 GROUP BY cpu FILL(null); +name: cpu +tags: cpu=cpu0 ++---------------------+-------------------+ +| time | P | ++---------------------+-------------------+ +| 2022-10-31T02:00:00 | 0.002000200020002 | +| 2022-10-31T02:00:10 | 0.001000100010001 | ++---------------------+-------------------+ +-- InfluxQL: SELECT distinct(cpu) FROM (SELECT usage_idle, cpu FROM cpu); +name: cpu ++---------------------+-----------+ +| time | distinct | ++---------------------+-----------+ +| 1970-01-01T00:00:00 | cpu-total | +| 1970-01-01T00:00:00 | cpu0 | +| 1970-01-01T00:00:00 | cpu1 | ++---------------------+-----------+ \ No newline at end of file diff --git a/iox_query_influxql/src/plan/ir.rs b/iox_query_influxql/src/plan/ir.rs index 5244317725..ec85bb2ace 100644 --- a/iox_query_influxql/src/plan/ir.rs +++ b/iox_query_influxql/src/plan/ir.rs @@ -31,8 +31,11 @@ pub(super) struct SelectQuery { #[derive(Debug, Default, Clone)] pub(super) struct Select { - /// The schema of the selection. - // pub(super) schema: Todo, + /// The depth of the selection, where a value > 0 indicates + /// this is a subquery. 
+ pub(super) depth: u32, + + /// The projection type of the selection. pub(super) projection_type: ProjectionType, /// Projection clause of the selection. diff --git a/iox_query_influxql/src/plan/planner.rs b/iox_query_influxql/src/plan/planner.rs index f379235751..eadb2a9272 100644 --- a/iox_query_influxql/src/plan/planner.rs +++ b/iox_query_influxql/src/plan/planner.rs @@ -325,153 +325,10 @@ impl<'a> InfluxQLToLogicalPlan<'a> { .with_has_multiple_measurements(query.has_multiple_measurements) .with_root_group_by_tags(&group_by_tags); - // Skip the `time` column - let fields_no_time = &select.fields[1..]; - // always start with the time column - let mut fields = vec![select.fields.first().cloned().unwrap()]; - - // group_by_tag_set : a list of tag columns specified in the GROUP BY clause - // projection_tag_set : a list of tag columns specified exclusively in the SELECT projection - // is_projected : a list of booleans indicating whether matching elements in the - // group_by_tag_set are also projected in the query - let (group_by_tag_set, projection_tag_set, is_projected) = - if let Some(group_by) = &select.group_by { - let mut tag_columns = - find_tag_and_unknown_columns(fields_no_time).collect::>(); - - // Find the list of tag keys specified in the `GROUP BY` clause, and - // whether any of the tag keys are also projected in the SELECT list. - let (tag_set, is_projected): (Vec<_>, Vec<_>) = group_by - .tags() - .map(|t| t.deref().as_str()) - .map(|s| (s, tag_columns.contains(s))) - // We sort the tag set, to ensure correct ordering of the results. The tag columns - // referenced in the `tag_set` variable are added to the sort operator in - // lexicographically ascending order. - .sorted_by(|a, b| a.0.cmp(b.0)) - .unzip(); - - // Tags specified in the `GROUP BY` clause that are not already added to the - // projection must be projected, so they can be used in the group key. 
- // - // At the end of the loop, the `tag_columns` set will contain the tag columns that - // exist in the projection and not in the `GROUP BY`. - fields.extend( - tag_set - .iter() - .filter_map(|col| match tag_columns.remove(*col) { - true => None, - false => Some(Field { - expr: IQLExpr::VarRef(VarRef { - name: (*col).into(), - data_type: Some(VarRefDataType::Tag), - }), - name: col.to_string(), - data_type: Some(InfluxColumnType::Tag), - }), - }), - ); - - ( - tag_set, - tag_columns.into_iter().sorted().collect::>(), - is_projected, - ) - } else { - let tag_columns = find_tag_and_unknown_columns(fields_no_time) - .sorted() - .collect::>(); - (vec![], tag_columns, vec![]) - }; - - fields.extend(fields_no_time.iter().cloned()); - - let plan = { - let mut iter = select.from.iter(); - let plan = match iter.next() { - Some(ds) => self.project_select(&ctx, ds, select, &fields, &group_by_tag_set), - None => { - // empty result, but let's at least have all the strictly necessary metadata - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - INFLUXQL_MEASUREMENT_COLUMN_NAME, - (&InfluxColumnType::Tag).into(), - false, - )])); - let plan = LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema: schema.to_dfschema_ref()?, - }); - let plan = plan_with_metadata( - plan, - &InfluxQlMetadata { - measurement_column_index: MEASUREMENT_COLUMN_INDEX, - tag_key_columns: vec![], - }, - )?; - return Ok(plan); - } - }?; - - iter.try_fold(plan, |prev, ds| { - let next = self.project_select(&ctx, ds, select, &fields, &group_by_tag_set)?; - LogicalPlanBuilder::from(prev).union(next)?.build() - })? 
- }; - - let plan = plan_with_metadata( - plan, - &InfluxQlMetadata { - measurement_column_index: MEASUREMENT_COLUMN_INDEX, - tag_key_columns: make_tag_key_column_meta( - &fields, - &group_by_tag_set, - &is_projected, - ), - }, - )?; - - // the sort planner node must refer to the time column using - // the alias that was specified - let time_alias = fields[0].name.as_str(); - - let time_sort_expr = time_alias.as_expr().sort( - match select.order_by { - // Default behaviour is to sort by time in ascending order if there is no ORDER BY - None | Some(OrderByClause::Ascending) => true, - Some(OrderByClause::Descending) => false, - }, - false, - ); - - let plan = plan_with_sort( - plan, - vec![time_sort_expr.clone()], - ctx.has_multiple_measurements, - &group_by_tag_set, - &projection_tag_set, - )?; - - let plan = self.limit( - plan, - select.offset, - select.limit, - vec![time_sort_expr], - ctx.has_multiple_measurements, - &group_by_tag_set, - &projection_tag_set, - )?; - - Ok(plan) + self.select_to_plan(&ctx, select) } fn select_to_plan(&self, ctx: &Context<'_>, select: &Select) -> Result { - let ctx = Context::new() - .with_projection_type(select.projection_type) - .with_timezone(select.timezone) - .with_group_by_fill(select) - .with_has_multiple_measurements(ctx.has_multiple_measurements) - .with_root_group_by_tags(ctx.root_group_by_tags); - // Skip the `time` column let fields_no_time = &select.fields[1..]; // always start with the time column @@ -561,6 +418,23 @@ impl<'a> InfluxQLToLogicalPlan<'a> { })? }; + let plan = if select.depth == 0 { + // Add the metadata to the root SELECT query + plan_with_metadata( + plan, + &InfluxQlMetadata { + measurement_column_index: MEASUREMENT_COLUMN_INDEX, + tag_key_columns: make_tag_key_column_meta( + &fields, + &group_by_tag_set, + &is_projected, + ), + }, + )? 
+ } else { + plan + }; + // the sort planner node must refer to the time column using // the alias that was specified let time_alias = fields[0].name.as_str(); @@ -1369,10 +1243,19 @@ impl<'a> InfluxQLToLogicalPlan<'a> { vec![lit_dict(table_name).alias(INFLUXQL_MEASUREMENT_COLUMN_NAME)], )) } - DataSource::Subquery(select) => Ok(( - self.select_to_plan(ctx, select)?, - vec![INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr()], - )), + DataSource::Subquery(select) => { + let ctx = Context::new() + .with_projection_type(select.projection_type) + .with_timezone(select.timezone) + .with_group_by_fill(select) + .with_has_multiple_measurements(ctx.has_multiple_measurements) + .with_root_group_by_tags(ctx.root_group_by_tags); + + Ok(( + self.select_to_plan(&ctx, select)?, + vec![INFLUXQL_MEASUREMENT_COLUMN_NAME.as_expr()], + )) + } } }