feat: sort-related phys. optimizers (#7095)

* feat: `SortPushdown` optimizer

* feat: `RedundantSort` optimizer

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2023-03-02 15:20:13 +01:00 committed by GitHub
parent 6b35af115e
commit 2c4da24f73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 274 additions and 0 deletions

View File

@ -8,6 +8,7 @@ mod chunk_extraction;
mod combine_chunks;
mod dedup;
mod predicate_pushdown;
mod sort;
mod union;
#[cfg(test)]

View File

@ -0,0 +1,6 @@
//! Rules specific to [`SortExec`].
//!
//! [`SortExec`]: datafusion::physical_plan::sorts::sort::SortExec
pub mod redundant_sort;
pub mod sort_pushdown;

View File

@ -0,0 +1,142 @@
use std::sync::Arc;
use datafusion::{
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{rewrite::TreeNodeRewritable, sorts::sort::SortExec, ExecutionPlan},
};
/// Removes [`SortExec`] if it is no longer needed.
///
/// A sort is a no-op when its input already reports exactly the requested
/// output ordering; dropping it avoids a redundant (and potentially
/// expensive) sort step.
#[derive(Debug, Default)]
pub struct RedundantSort;
impl PhysicalOptimizerRule for RedundantSort {
    /// Walk the plan top-down and splice out every [`SortExec`] whose input
    /// already produces the exact ordering the sort would establish.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        plan.transform_down(&|node| {
            match node.as_any().downcast_ref::<SortExec>() {
                Some(sort_exec) => {
                    let input = sort_exec.input();

                    // The sort is redundant when the input's advertised
                    // ordering matches the sort expressions exactly.
                    //
                    // NOTE(review): `SortExec::fetch` (a limit) is discarded
                    // together with the sort here — confirm no limiting
                    // semantics are lost for plans that rely on it.
                    if input.output_ordering() == Some(sort_exec.expr()) {
                        Ok(Some(Arc::clone(input)))
                    } else {
                        Ok(None)
                    }
                }
                // Not a sort node: leave it untouched.
                None => Ok(None),
            }
        })
    }

    fn name(&self) -> &str {
        "redundant_sort"
    }

    fn schema_check(&self) -> bool {
        true
    }
}
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion::{
        datasource::object_store::ObjectStoreUrl,
        physical_expr::PhysicalSortExpr,
        physical_plan::{
            expressions::Column,
            file_format::{FileScanConfig, ParquetExec},
            Statistics,
        },
    };

    use crate::physical_optimizer::test_util::OptimizationTest;

    use super::*;

    /// The parquet scan advertises NO output ordering
    /// (`output_ordering: None`), so the sort is not redundant and the
    /// optimizer must leave the plan unchanged (input == output snapshot).
    #[test]
    fn test_not_redundant() {
        let schema = schema();
        let input = Arc::new(ParquetExec::new(
            FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("test://").unwrap(),
                file_schema: Arc::clone(&schema),
                file_groups: vec![],
                statistics: Statistics::default(),
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                output_ordering: None,
                infinite_source: false,
            },
            None,
            None,
        ));
        let plan =
            Arc::new(SortExec::try_new(sort_expr(schema.as_ref()), input, Some(10)).unwrap());
        let opt = RedundantSort::default();
        insta::assert_yaml_snapshot!(
            OptimizationTest::new(plan, opt),
            @r###"
        ---
        input:
          - " SortExec: fetch=10, expr=[col@0 ASC]"
          - "   ParquetExec: limit=None, partitions={0 groups: []}, projection=[col]"
        output:
          Ok:
            - " SortExec: fetch=10, expr=[col@0 ASC]"
            - "   ParquetExec: limit=None, partitions={0 groups: []}, projection=[col]"
        "###
        );
    }

    /// The parquet scan already advertises the same ordering the sort would
    /// produce, so the optimizer removes the `SortExec` entirely.
    #[test]
    fn test_redundant() {
        let schema = schema();
        let sort_expr = sort_expr(schema.as_ref());
        let input = Arc::new(ParquetExec::new(
            FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("test://").unwrap(),
                file_schema: Arc::clone(&schema),
                file_groups: vec![],
                statistics: Statistics::default(),
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                // Declares the scan pre-sorted on the same key as the sort.
                output_ordering: Some(sort_expr.clone()),
                infinite_source: false,
            },
            None,
            None,
        ));
        let plan = Arc::new(SortExec::try_new(sort_expr, input, Some(10)).unwrap());
        let opt = RedundantSort::default();
        insta::assert_yaml_snapshot!(
            OptimizationTest::new(plan, opt),
            @r###"
        ---
        input:
          - " SortExec: fetch=10, expr=[col@0 ASC]"
          - "   ParquetExec: limit=None, partitions={0 groups: []}, output_ordering=[col@0 ASC], projection=[col]"
        output:
          Ok:
            - " ParquetExec: limit=None, partitions={0 groups: []}, output_ordering=[col@0 ASC], projection=[col]"
        "###
        );
    }

    /// Single ascending sort key on `col` with default sort options.
    fn sort_expr(schema: &Schema) -> Vec<PhysicalSortExpr> {
        vec![PhysicalSortExpr {
            expr: Arc::new(Column::new_with_schema("col", schema).unwrap()),
            options: Default::default(),
        }]
    }

    /// Minimal single-column (`col: Int64`, non-nullable) schema.
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)]))
    }
}

View File

@ -0,0 +1,125 @@
use std::sync::Arc;
use datafusion::{
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
rewrite::TreeNodeRewritable, sorts::sort::SortExec, union::UnionExec, ExecutionPlan,
},
};
/// Pushes [`SortExec`] closer to the data source.
///
/// This is especially useful when there are [`UnionExec`]s within the plan, since they determine a common sort key but
/// often some children may already be sorted.
///
/// Pushing the sort below the union lets later rules (e.g. a
/// redundant-sort removal pass) drop the per-child sorts that are no-ops.
#[derive(Debug, Default)]
pub struct SortPushdown;
impl PhysicalOptimizerRule for SortPushdown {
    /// Rewrite `Sort(Union(c1, c2, …))` into `Union(Sort(c1), Sort(c2), …)`.
    ///
    /// `transform_down` revisits the newly created sorts, and other rules
    /// (e.g. `RedundantSort`) may then remove those whose child is already
    /// ordered appropriately.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        plan.transform_down(&|plan| {
            let plan_any = plan.as_any();

            if let Some(sort_exec) = plan_any.downcast_ref::<SortExec>() {
                let child = sort_exec.input();
                let child_any = child.as_any();

                if let Some(child_union) = child_any.downcast_ref::<UnionExec>() {
                    let new_union = UnionExec::new(
                        child_union
                            .children()
                            .into_iter()
                            .map(|plan| {
                                // Preserve both the sort expressions and the
                                // optional fetch/limit on every pushed-down
                                // per-child sort.
                                let new_sort_exec = SortExec::try_new(
                                    sort_exec.expr().to_vec(),
                                    plan,
                                    sort_exec.fetch(),
                                )?;
                                Ok(Arc::new(new_sort_exec) as _)
                            })
                            .collect::<Result<Vec<_>>>()?,
                    );
                    return Ok(Some(Arc::new(new_union)));
                }
            }

            // Anything else is left untouched.
            Ok(None)
        })
    }

    fn name(&self) -> &str {
        // Bug fix: was misspelled as "sort_pushown".
        "sort_pushdown"
    }

    fn schema_check(&self) -> bool {
        true
    }
}
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion::{
        physical_expr::PhysicalSortExpr,
        physical_plan::{empty::EmptyExec, expressions::Column},
    };

    use crate::physical_optimizer::test_util::OptimizationTest;

    use super::*;

    /// A sort above a (nested) union is pushed through the OUTERMOST union
    /// only: each direct child of that union gets its own `SortExec` with the
    /// same expressions and fetch. The inner union is a direct child here, so
    /// the sort lands on top of it rather than on the `EmptyExec` leaves.
    #[test]
    fn test_pushdown() {
        let schema = schema();
        // Inner union with two trivial children.
        let input = Arc::new(UnionExec::new(
            (0..2)
                .map(|_| Arc::new(EmptyExec::new(true, Arc::clone(&schema))) as _)
                .collect(),
        ));
        // Sort(Union(Union(Empty, Empty))) — the outer union is the sort's child.
        let plan = Arc::new(
            SortExec::try_new(
                sort_expr(schema.as_ref()),
                Arc::new(UnionExec::new(vec![input])),
                Some(10),
            )
            .unwrap(),
        );
        let opt = SortPushdown::default();
        insta::assert_yaml_snapshot!(
            OptimizationTest::new(plan, opt),
            @r###"
        ---
        input:
          - " SortExec: fetch=10, expr=[col@0 ASC]"
          - "   UnionExec"
          - "     UnionExec"
          - "       EmptyExec: produce_one_row=true"
          - "       EmptyExec: produce_one_row=true"
        output:
          Ok:
            - " UnionExec"
            - "   UnionExec"
            - "     SortExec: fetch=10, expr=[col@0 ASC]"
            - "       EmptyExec: produce_one_row=true"
            - "     SortExec: fetch=10, expr=[col@0 ASC]"
            - "       EmptyExec: produce_one_row=true"
        "###
        );
    }

    /// Single ascending sort key on `col` with default sort options.
    fn sort_expr(schema: &Schema) -> Vec<PhysicalSortExpr> {
        vec![PhysicalSortExpr {
            expr: Arc::new(Column::new_with_schema("col", schema).unwrap()),
            options: Default::default(),
        }]
    }

    /// Minimal single-column (`col: Int64`, non-nullable) schema.
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)]))
    }
}