From e0929f20ae7a41b970212b20f58eb35232e26621 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 13 Oct 2021 15:05:15 -0400 Subject: [PATCH] refactor: Pull out read_group order creation (#2832) --- query/src/frontend/influxrpc.rs | 85 +++++++++++++++++++++++---------- 1 file changed, 61 insertions(+), 24 deletions(-) diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index 814cd5f8f9..b059fd0dba 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -8,7 +8,7 @@ use arrow::datatypes::{DataType, Field}; use data_types::chunk_metadata::ChunkId; use datafusion::{ error::{DataFusionError, Result as DatafusionResult}, - logical_plan::{lit, Column, Expr, ExprRewriter, LogicalPlan, LogicalPlanBuilder}, + logical_plan::{lit, Column, DFSchemaRef, Expr, ExprRewriter, LogicalPlan, LogicalPlanBuilder}, optimizer::utils::expr_to_columns, prelude::col, }; @@ -1004,7 +1004,7 @@ impl InfluxRpcPlanner { /// agg_function(_valN) as _valueN /// agg_function(time) as time /// GROUP BY - /// group_key1, group_key2, remaining tags, + /// tags, /// ORDER BY /// group_key1, group_key2, remaining tags /// @@ -1022,7 +1022,7 @@ impl InfluxRpcPlanner { /// agg_function(_valN) as _valueN /// agg_function(time) as timeN /// GROUP BY - /// group_key1, group_key2, remaining tags, + /// tags /// ORDER BY /// group_key1, group_key2, remaining tags /// @@ -1061,11 +1061,6 @@ impl InfluxRpcPlanner { // order in the same order) let tag_columns: Vec<_> = schema.tags_iter().map(|f| f.name() as &str).collect(); - let tag_columns: Vec<Arc<str>> = reorder_prefix(group_columns, tag_columns)? 
- .into_iter() - .map(Arc::from) - .collect(); - // Group by all tag columns let group_exprs = tag_columns .iter() @@ -1077,23 +1072,56 @@ impl InfluxRpcPlanner { field_columns, } = AggExprs::try_new(agg, &schema, &predicate)?; - let sort_exprs = group_exprs - .iter() - .map(|expr| expr.as_sort_expr()) - .collect::<Vec<_>>(); - let plan_builder = plan_builder .aggregate(group_exprs, agg_exprs) .context(BuildingPlan)?; + // Reorganize the output so the group columns are first and + // the output is sorted first on the group columns and then + // any remaining tags + // + // This ensures that the `group_columns` are next to each + // other in the output in the order expected by flux + let reordered_tag_columns: Vec<Arc<str>> = reorder_prefix(group_columns, tag_columns)? + .into_iter() + .map(Arc::from) + .collect(); + + // no columns if there are no tags in the input and no group columns in the query + let plan_builder = if !reordered_tag_columns.is_empty() { + // reorder columns + let reorder_exprs = reordered_tag_columns + .iter() + .map(|tag_name| tag_name.as_expr()) + .collect::<Vec<_>>(); + + let sort_exprs = reorder_exprs + .iter() + .map(|expr| expr.as_sort_expr()) + .collect::<Vec<_>>(); + + let project_exprs = + project_exprs_in_schema(&reordered_tag_columns, plan_builder.schema()); + + plan_builder + .project(project_exprs) + .context(BuildingPlan)? + .sort(sort_exprs) + .context(BuildingPlan)? 
+ } else { + plan_builder + }; + let plan_builder = cast_aggregates(plan_builder, agg, &field_columns)?; - let plan_builder = add_sort(plan_builder, sort_exprs)?; - - // and finally create the plan let plan = plan_builder.build().context(BuildingPlan)?; - let ss_plan = SeriesSetPlan::new(Arc::from(table_name), plan, tag_columns, field_columns); + let ss_plan = SeriesSetPlan::new( + Arc::from(table_name), + plan, + reordered_tag_columns, + field_columns, + ); Ok(Some(ss_plan)) } @@ -1287,13 +1315,22 @@ impl InfluxRpcPlanner { } } -/// Adds a sort to the plan_builder if there are any sort exprs -fn add_sort(plan_builder: LogicalPlanBuilder, sort_exprs: Vec<Expr>) -> Result<LogicalPlanBuilder> { - if sort_exprs.is_empty() { - Ok(plan_builder) - } else { - plan_builder.sort(sort_exprs).context(BuildingPlan) - } +/// Return a `Vec` of `Exprs` such that it starts with `prefix` cols and +/// then has all columns in `schema` that are not already in the prefix. +fn project_exprs_in_schema(prefix: &[Arc<str>], schema: &DFSchemaRef) -> Vec<Expr> { + let seen: HashSet<_> = prefix.iter().map(|s| s.as_ref()).collect(); + + let prefix_exprs = prefix.iter().map(|name| col(name)); + let new_exprs = schema.fields().iter().filter_map(|f| { + let name = f.name().as_str(); + if !seen.contains(name) { + Some(col(name)) + } else { + None + } + }); + + prefix_exprs.chain(new_exprs).collect() } /// casts aggregates (fields named in field_columns) to the types