refactor: Pull out read_group order creation (#2832)

pull/24376/head
Andrew Lamb 2021-10-13 15:05:15 -04:00 committed by GitHub
parent e07461b250
commit e0929f20ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 61 additions and 24 deletions

View File

@ -8,7 +8,7 @@ use arrow::datatypes::{DataType, Field};
use data_types::chunk_metadata::ChunkId; use data_types::chunk_metadata::ChunkId;
use datafusion::{ use datafusion::{
error::{DataFusionError, Result as DatafusionResult}, error::{DataFusionError, Result as DatafusionResult},
logical_plan::{lit, Column, Expr, ExprRewriter, LogicalPlan, LogicalPlanBuilder}, logical_plan::{lit, Column, DFSchemaRef, Expr, ExprRewriter, LogicalPlan, LogicalPlanBuilder},
optimizer::utils::expr_to_columns, optimizer::utils::expr_to_columns,
prelude::col, prelude::col,
}; };
@ -1004,7 +1004,7 @@ impl InfluxRpcPlanner {
/// agg_function(_valN) as _valueN /// agg_function(_valN) as _valueN
/// agg_function(time) as time /// agg_function(time) as time
/// GROUP BY /// GROUP BY
/// group_key1, group_key2, remaining tags, /// tags,
/// ORDER BY /// ORDER BY
/// group_key1, group_key2, remaining tags /// group_key1, group_key2, remaining tags
/// ///
@ -1022,7 +1022,7 @@ impl InfluxRpcPlanner {
/// agg_function(_valN) as _valueN /// agg_function(_valN) as _valueN
/// agg_function(time) as timeN /// agg_function(time) as timeN
/// GROUP BY /// GROUP BY
/// group_key1, group_key2, remaining tags, /// tags
/// ORDER BY /// ORDER BY
/// group_key1, group_key2, remaining tags /// group_key1, group_key2, remaining tags
/// ///
@ -1061,11 +1061,6 @@ impl InfluxRpcPlanner {
// order in the same order) // order in the same order)
let tag_columns: Vec<_> = schema.tags_iter().map(|f| f.name() as &str).collect(); let tag_columns: Vec<_> = schema.tags_iter().map(|f| f.name() as &str).collect();
let tag_columns: Vec<Arc<str>> = reorder_prefix(group_columns, tag_columns)?
.into_iter()
.map(Arc::from)
.collect();
// Group by all tag columns // Group by all tag columns
let group_exprs = tag_columns let group_exprs = tag_columns
.iter() .iter()
@ -1077,23 +1072,56 @@ impl InfluxRpcPlanner {
field_columns, field_columns,
} = AggExprs::try_new(agg, &schema, &predicate)?; } = AggExprs::try_new(agg, &schema, &predicate)?;
let sort_exprs = group_exprs
.iter()
.map(|expr| expr.as_sort_expr())
.collect::<Vec<_>>();
let plan_builder = plan_builder let plan_builder = plan_builder
.aggregate(group_exprs, agg_exprs) .aggregate(group_exprs, agg_exprs)
.context(BuildingPlan)?; .context(BuildingPlan)?;
// Reorganize the output so the group columns are first and
// the output is sorted first on the group columns and then
// any remaining tags
//
// This ensures that the `group_columns` are next to each
// other in the output in the order expected by flux
let reordered_tag_columns: Vec<Arc<str>> = reorder_prefix(group_columns, tag_columns)?
.into_iter()
.map(Arc::from)
.collect();
// no columns if there are no tags in the input and no group columns in the query
let plan_builder = if !reordered_tag_columns.is_empty() {
// reorder columns
let reorder_exprs = reordered_tag_columns
.iter()
.map(|tag_name| tag_name.as_expr())
.collect::<Vec<_>>();
let sort_exprs = reorder_exprs
.iter()
.map(|expr| expr.as_sort_expr())
.collect::<Vec<_>>();
let project_exprs =
project_exprs_in_schema(&reordered_tag_columns, plan_builder.schema());
plan_builder
.project(project_exprs)
.context(BuildingPlan)?
.sort(sort_exprs)
.context(BuildingPlan)?
} else {
plan_builder
};
let plan_builder = cast_aggregates(plan_builder, agg, &field_columns)?; let plan_builder = cast_aggregates(plan_builder, agg, &field_columns)?;
let plan_builder = add_sort(plan_builder, sort_exprs)?;
// and finally create the plan
let plan = plan_builder.build().context(BuildingPlan)?; let plan = plan_builder.build().context(BuildingPlan)?;
let ss_plan = SeriesSetPlan::new(Arc::from(table_name), plan, tag_columns, field_columns); let ss_plan = SeriesSetPlan::new(
Arc::from(table_name),
plan,
reordered_tag_columns,
field_columns,
);
Ok(Some(ss_plan)) Ok(Some(ss_plan))
} }
@ -1287,13 +1315,22 @@ impl InfluxRpcPlanner {
} }
} }
/// Adds a sort to the plan_builder if there are any sort exprs /// Return a `Vec` of `Exprs` such that it starts with `prefix` cols and
fn add_sort(plan_builder: LogicalPlanBuilder, sort_exprs: Vec<Expr>) -> Result<LogicalPlanBuilder> { /// then has all columns in `schema` that are not already in the prefix.
if sort_exprs.is_empty() { fn project_exprs_in_schema(prefix: &[Arc<str>], schema: &DFSchemaRef) -> Vec<Expr> {
Ok(plan_builder) let seen: HashSet<_> = prefix.iter().map(|s| s.as_ref()).collect();
let prefix_exprs = prefix.iter().map(|name| col(name));
let new_exprs = schema.fields().iter().filter_map(|f| {
let name = f.name().as_str();
if !seen.contains(name) {
Some(col(name))
} else { } else {
plan_builder.sort(sort_exprs).context(BuildingPlan) None
} }
});
prefix_exprs.chain(new_exprs).collect()
} }
/// casts aggregates (fields named in field_columns) to the types /// casts aggregates (fields named in field_columns) to the types