feat: more projection pushdown (#7218)

* feat: proj->proj pushdown

For #6098.

* feat: proj->SortPreservingMergeExec pushdown

For #6098.
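
As a quick illustration of the first change: a minimal, self-contained sketch of proj->proj fusion, where `Expr` and `fuse` are illustrative stand-ins rather than DataFusion types. The outer projection only re-selects columns of the inner one by index, so it can be replaced by picking the inner expressions at those indices.

// Hypothetical model, mirroring `child_proj.expr()[*idx].clone()` in the diff below.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(usize),          // plain column reference by input index
    Literal(&'static str),  // a computed ("impure") value
}

fn fuse(outer_indices: &[usize], inner: &[Expr]) -> Vec<Expr> {
    outer_indices.iter().map(|idx| inner[*idx].clone()).collect()
}

fn main() {
    // inner: SELECT 'foo' AS tag1, 'bar' AS tag2; outer: SELECT tag1
    let inner = vec![Expr::Literal("foo"), Expr::Literal("bar")];
    assert_eq!(fuse(&[0], &inner), vec![Expr::Literal("foo")]);
}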

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-03-16 09:59:48 +01:00 committed by GitHub
parent 2dde0658c6
commit f128539f98
1 changed file with 173 additions and 2 deletions


@@ -19,7 +19,7 @@ use datafusion::{
         filter::FilterExec,
         projection::ProjectionExec,
         rewrite::TreeNodeRewritable,
-        sorts::sort::SortExec,
+        sorts::{sort::SortExec, sort_preserving_merge::SortPreservingMergeExec},
         union::UnionExec,
         ExecutionPlan, PhysicalExpr,
     },
@@ -32,10 +32,11 @@ use crate::provider::{DeduplicateExec, RecordBatchesExec};
 pub struct ProjectionPushdown;
 
 impl PhysicalOptimizerRule for ProjectionPushdown {
+    #[allow(clippy::only_used_in_recursion)]
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         plan.transform_down(&|plan| {
             let plan_any = plan.as_any();
@@ -149,6 +150,47 @@ impl PhysicalOptimizerRule for ProjectionPushdown {
},
)?;
return Ok(Some(plan));
} else if let Some(child_sort) = child_any.downcast_ref::<SortPreservingMergeExec>()
{
let sort_required_cols = child_sort
.expr()
.iter()
.map(|expr| collect_columns(&expr.expr))
.collect::<Vec<_>>();
let sort_required_cols = sort_required_cols
.iter()
.flat_map(|cols| cols.iter())
.map(|col| col.name())
.collect::<HashSet<_>>();
let plan = wrap_user_into_projections(
&sort_required_cols,
&column_names,
Arc::clone(child_sort.input()),
|plan| {
Ok(Arc::new(SortPreservingMergeExec::new(
reassign_sort_exprs_columns(child_sort.expr(), &plan.schema())?,
plan,
)))
},
)?;
return Ok(Some(plan));
} else if let Some(child_proj) = child_any.downcast_ref::<ProjectionExec>() {
let expr = column_indices
.iter()
.map(|idx| child_proj.expr()[*idx].clone())
.collect();
let plan = Arc::new(ProjectionExec::try_new(
expr,
Arc::clone(child_proj.input()),
)?);
// need to call `optimize` directly on the plan, because otherwise we would continue with the child
// and miss the optimization of that particular new ProjectionExec
let plan = self.optimize(plan, config)?;
return Ok(Some(plan));
} else if let Some(child_dedup) = child_any.downcast_ref::<DeduplicateExec>() {
let dedup_required_cols = child_dedup.sort_columns();
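
Both the `FilterExec` branch and the new `SortPreservingMergeExec` branch above delegate to `wrap_user_into_projections`. A hedged sketch of the column bookkeeping this implies (helper name and signature are illustrative, not the IOx code): the projection pushed below the merge must keep the user's columns plus every column the sort expressions reference, and the outer projection then trims the extras away again.

use std::collections::HashSet;

fn pushdown_columns<'a>(user_cols: &[&'a str], sort_cols: &[&'a str]) -> Vec<&'a str> {
    let have: HashSet<&str> = user_cols.iter().copied().collect();
    let mut keep: Vec<&'a str> = user_cols.to_vec();
    // sort columns the user did not request must still survive below the merge
    keep.extend(sort_cols.iter().copied().filter(|c| !have.contains(*c)));
    keep
}

fn main() {
    // matches the snapshot test below: the user wants tag1, the merge sorts by tag2
    assert_eq!(pushdown_columns(&["tag1"], &["tag2"]), vec!["tag1", "tag2"]);
}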
@@ -793,6 +835,135 @@ mod tests {
);
}
// since `SortPreservingMergeExec` and `FilterExec` both use `wrap_user_into_projections`, we only test one variant for `SortPreservingMergeExec`
#[test]
fn test_sortpreservingmerge_projection_split() {
let schema = schema();
let plan = Arc::new(
ProjectionExec::try_new(
vec![(expr_col("tag1", &schema), String::from("tag1"))],
Arc::new(SortPreservingMergeExec::new(
vec![PhysicalSortExpr {
expr: expr_col("tag2", &schema),
options: SortOptions {
descending: true,
..Default::default()
},
}],
Arc::new(TestExec::new(schema)),
)),
)
.unwrap(),
);
let opt = ProjectionPushdown::default();
insta::assert_yaml_snapshot!(
OptimizationTest::new(plan, opt),
@r###"
---
input:
- " ProjectionExec: expr=[tag1@0 as tag1]"
- " SortPreservingMergeExec: [tag2@1 DESC]"
- " Test"
output:
Ok:
- " ProjectionExec: expr=[tag1@0 as tag1]"
- " SortPreservingMergeExec: [tag2@1 DESC]"
- " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]"
- " Test"
"###
);
}
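// Hedged sketch of the rebinding that `reassign_sort_exprs_columns` presumably
// performs (this helper and test are illustrative additions, not part of the
// diff): after the pushed-down projection changes the child schema, each sort
// column must be looked up by name and rebound to its new index.
#[test]
fn sketch_rebind_sort_columns_by_name() {
    fn rebind(sort_cols: &[&str], new_schema: &[&str]) -> Option<Vec<usize>> {
        sort_cols
            .iter()
            .map(|name| new_schema.iter().position(|field| field == name))
            .collect()
    }
    // tag2 sat at index 1 of the full schema and lands at index 1 of the
    // pushed-down projection [tag1, tag2] as well
    assert_eq!(rebind(&["tag2"], &["tag1", "tag2"]), Some(vec![1]));
    // a column absent from the new schema cannot be rebound
    assert_eq!(rebind(&["time"], &["tag1", "tag2"]), None);
}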
#[test]
fn test_nested_proj_inner_is_impure() {
let schema = schema();
let plan = Arc::new(EmptyExec::new(false, schema));
let plan = Arc::new(
ProjectionExec::try_new(
vec![
(
Arc::new(Literal::new(ScalarValue::from("foo"))),
String::from("tag1"),
),
(
Arc::new(Literal::new(ScalarValue::from("bar"))),
String::from("tag2"),
),
],
plan,
)
.unwrap(),
);
let plan = Arc::new(
ProjectionExec::try_new(
vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))],
plan,
)
.unwrap(),
);
let opt = ProjectionPushdown::default();
insta::assert_yaml_snapshot!(
OptimizationTest::new(plan, opt),
@r###"
---
input:
- " ProjectionExec: expr=[tag1@0 as tag1]"
- " ProjectionExec: expr=[foo as tag1, bar as tag2]"
- " EmptyExec: produce_one_row=false"
output:
Ok:
- " ProjectionExec: expr=[foo as tag1]"
- " EmptyExec: produce_one_row=false"
"###
);
}
#[test]
fn test_nested_proj_inner_is_pure() {
let schema = schema();
let plan = Arc::new(EmptyExec::new(false, schema));
let plan = Arc::new(
ProjectionExec::try_new(
vec![
(expr_col("tag1", &plan.schema()), String::from("tag1")),
(expr_col("tag2", &plan.schema()), String::from("tag2")),
],
plan,
)
.unwrap(),
);
let plan = Arc::new(
ProjectionExec::try_new(
vec![(expr_col("tag1", &plan.schema()), String::from("tag1"))],
plan,
)
.unwrap(),
);
let opt = ProjectionPushdown::default();
let test = OptimizationTest::new(plan, opt);
insta::assert_yaml_snapshot!(
test,
@r###"
---
input:
- " ProjectionExec: expr=[tag1@0 as tag1]"
- " ProjectionExec: expr=[tag1@0 as tag1, tag2@1 as tag2]"
- " EmptyExec: produce_one_row=false"
output:
Ok:
- " EmptyExec: produce_one_row=false"
"###
);
let empty_exec = test
.output_plan()
.unwrap()
.as_any()
.downcast_ref::<EmptyExec>()
.unwrap();
let expected_schema = Schema::new(vec![Field::new("tag1", DataType::Utf8, true)]);
assert_eq!(empty_exec.schema().as_ref(), &expected_schema);
}
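// Hedged sketch (illustrative types, not the IOx code) of the pure/impure
// distinction the two tests above probe: a projection made only of plain
// column references can be dropped entirely once fused, while one that
// computes values (the literals above) can only be narrowed.
#[test]
fn sketch_pure_vs_impure_projection() {
    enum SketchExpr {
        Column(usize),
        Literal(&'static str),
    }
    fn is_pure(exprs: &[SketchExpr]) -> bool {
        exprs.iter().all(|e| matches!(e, SketchExpr::Column(_)))
    }
    assert!(is_pure(&[SketchExpr::Column(0), SketchExpr::Column(1)]));
    assert!(!is_pure(&[
        SketchExpr::Literal("foo"),
        SketchExpr::Literal("bar"),
    ]));
}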
// since `DeduplicateExec` and `FilterExec` both use `wrap_user_into_projections`, we only test a few variants for `DeduplicateExec`
#[test]
fn test_dedup_projection_split1() {