diff --git a/iox_query/src/physical_optimizer/projection_pushdown.rs b/iox_query/src/physical_optimizer/projection_pushdown.rs index 86da9bff2b..afa5a1e3c0 100644 --- a/iox_query/src/physical_optimizer/projection_pushdown.rs +++ b/iox_query/src/physical_optimizer/projection_pushdown.rs @@ -19,7 +19,7 @@ use datafusion::{ filter::FilterExec, projection::ProjectionExec, rewrite::TreeNodeRewritable, - sorts::sort::SortExec, + sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec}, union::UnionExec, ExecutionPlan, PhysicalExpr, }, @@ -32,10 +32,11 @@ use crate::provider::{DeduplicateExec, RecordBatchesExec}; pub struct ProjectionPushdown; impl PhysicalOptimizerRule for ProjectionPushdown { + #[allow(clippy::only_used_in_recursion)] fn optimize( &self, plan: Arc, - _config: &ConfigOptions, + config: &ConfigOptions, ) -> Result> { plan.transform_down(&|plan| { let plan_any = plan.as_any(); @@ -149,6 +150,47 @@ impl PhysicalOptimizerRule for ProjectionPushdown { }, )?; + return Ok(Some(plan)); + } else if let Some(child_sort) = child_any.downcast_ref::() + { + let sort_required_cols = child_sort + .expr() + .iter() + .map(|expr| collect_columns(&expr.expr)) + .collect::>(); + let sort_required_cols = sort_required_cols + .iter() + .flat_map(|cols| cols.iter()) + .map(|col| col.name()) + .collect::>(); + + let plan = wrap_user_into_projections( + &sort_required_cols, + &column_names, + Arc::clone(child_sort.input()), + |plan| { + Ok(Arc::new(SortPreservingMergeExec::new( + reassign_sort_exprs_columns(child_sort.expr(), &plan.schema())?, + plan, + ))) + }, + )?; + + return Ok(Some(plan)); + } else if let Some(child_proj) = child_any.downcast_ref::() { + let expr = column_indices + .iter() + .map(|idx| child_proj.expr()[*idx].clone()) + .collect(); + let plan = Arc::new(ProjectionExec::try_new( + expr, + Arc::clone(child_proj.input()), + )?); + + // need to call `optimize` directly on the plan, because otherwise we would continue with the child + // and miss the optimization of that particular new ProjectionExec + let plan = self.optimize(plan, config)?; + return Ok(Some(plan)); } else if let Some(child_dedup) = child_any.downcast_ref::() { let dedup_required_cols = child_dedup.sort_columns(); @@ -793,6 +835,135 @@ mod tests { ); } + // since `SortPreservingMergeExec` and `FilterExec` both use `wrap_user_into_projections`, we only test one variant for `SortPreservingMergeExec` + #[test] + fn test_sortpreservingmerge_projection_split() { + let schema = schema(); + let plan = Arc::new( + ProjectionExec::try_new( + vec![(expr_col("tag1", &schema), String::from("tag1"))], + Arc::new(SortPreservingMergeExec::new( + vec![PhysicalSortExpr { + expr: expr_col("tag2", &schema), + options: SortOptions { + descending: true, + ..Default::default() + }, + }], + Arc::new(TestExec::new(schema)), + )), + ) + .unwrap(), + ); + let opt = ProjectionPushdown::default(); + insta::assert_yaml_snapshot!( + OptimizationTest::new(plan, opt), + @r###" + --- + input: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " SortPreservingMergeExec: [tag2@1 DESC]" + - " Test" + output: + Ok: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " SortPreservingMergeExec: [tag2@1 DESC]" + - " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]" + - " Test" + "### + ); + } + + #[test] + fn test_nested_proj_inner_is_impure() { + let schema = schema(); + let plan = Arc::new(EmptyExec::new(false, schema)); + let plan = Arc::new( + ProjectionExec::try_new( + vec![ + ( + Arc::new(Literal::new(ScalarValue::from("foo"))), + String::from("tag1"), + ), + ( + Arc::new(Literal::new(ScalarValue::from("bar"))), + String::from("tag2"), + ), + ], + plan, + ) + .unwrap(), + ); + let plan = Arc::new( + ProjectionExec::try_new( + vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))], + plan, + ) + .unwrap(), + ); + let opt = ProjectionPushdown::default(); + insta::assert_yaml_snapshot!( + OptimizationTest::new(plan, opt), + @r###" + --- + input: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " ProjectionExec: expr=[foo as tag1, bar as tag2]" + - " EmptyExec: produce_one_row=false" + output: + Ok: + - " ProjectionExec: expr=[foo as tag1]" + - " EmptyExec: produce_one_row=false" + "### + ); + } + + #[test] + fn test_nested_proj_inner_is_pure() { + let schema = schema(); + let plan = Arc::new(EmptyExec::new(false, schema)); + let plan = Arc::new( + ProjectionExec::try_new( + vec![ + (expr_col("tag1", &plan.schema()), String::from("tag1")), + (expr_col("tag2", &plan.schema()), String::from("tag2")), + ], + plan, + ) + .unwrap(), + ); + let plan = Arc::new( + ProjectionExec::try_new( + vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))], + plan, + ) + .unwrap(), + ); + let opt = ProjectionPushdown::default(); + let test = OptimizationTest::new(plan, opt); + insta::assert_yaml_snapshot!( + test, + @r###" + --- + input: + - " ProjectionExec: expr=[tag1@0 as tag1]" + - " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]" + - " EmptyExec: produce_one_row=false" + output: + Ok: + - " EmptyExec: produce_one_row=false" + "### + ); + let empty_exec = test + .output_plan() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let expected_schema = Schema::new(vec![Field::new("tag1", DataType::Utf8, true)]); + assert_eq!(empty_exec.schema().as_ref(), &expected_schema); + } + // since `DeduplicateExec` and `FilterExec` both use `wrap_user_into_projections`, we only test a few variants for `DeduplicateExec` #[test] fn test_dedup_projection_split1() {