feat: phys. pred. pushdown to parquet (#7159)

For #6098.
pull/24376/head
Marco Neumann 2023-03-08 17:36:27 +01:00 committed by GitHub
parent 3828d2a50e
commit 309177b750
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 66 additions and 1 deletions

View File

@ -13,6 +13,7 @@ use datafusion::{
physical_plan::{
empty::EmptyExec,
expressions::{BinaryExpr, Column},
file_format::ParquetExec,
filter::FilterExec,
rewrite::TreeNodeRewritable,
union::UnionExec,
@ -59,6 +60,27 @@ impl PhysicalOptimizerRule for PredicatePushdown {
.collect::<Result<Vec<_>>>()?;
let new_union = UnionExec::new(new_inputs);
return Ok(Some(Arc::new(new_union)));
} else if let Some(child_parquet) = child_any.downcast_ref::<ParquetExec>() {
let existing = child_parquet
.predicate()
.map(split_conjunction)
.unwrap_or_default();
let both = conjunction(
existing
.into_iter()
.chain(split_conjunction(filter_exec.predicate()))
.cloned(),
);
let new_node = Arc::new(FilterExec::try_new(
Arc::clone(filter_exec.predicate()),
Arc::new(ParquetExec::new(
child_parquet.base_config().clone(),
both,
None,
)),
)?);
return Ok(Some(new_node));
} else if let Some(child_dedup) = child_any.downcast_ref::<DeduplicateExec>() {
let dedup_cols = child_dedup.sort_columns();
let (pushdown, no_pushdown): (Vec<_>, Vec<_>) =
@ -143,11 +165,13 @@ fn conjunction(
mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::{
datasource::object_store::ObjectStoreUrl,
logical_expr::Operator,
physical_expr::PhysicalSortExpr,
physical_plan::{
expressions::{BinaryExpr, Column, Literal},
PhysicalExpr,
file_format::FileScanConfig,
PhysicalExpr, Statistics,
},
scalar::ScalarValue,
};
@ -280,6 +304,47 @@ mod tests {
);
}
#[test]
fn test_parquet() {
let schema = schema();
let base_config = FileScanConfig {
object_store_url: ObjectStoreUrl::parse("test://").unwrap(),
file_schema: Arc::clone(&schema),
file_groups: vec![],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
};
let plan = Arc::new(
FilterExec::try_new(
predicate_mixed(&schema),
Arc::new(ParquetExec::new(
base_config,
Some(predicate_tag(&schema)),
None,
)),
)
.unwrap(),
);
let opt = PredicatePushdown::default();
insta::assert_yaml_snapshot!(
OptimizationTest::new(plan, opt),
@r###"
---
input:
- " FilterExec: tag1@0 = field@2"
- " ParquetExec: limit=None, partitions={0 groups: []}, predicate=tag1@0 = foo, pruning_predicate=tag1_min@0 <= foo AND foo <= tag1_max@1, projection=[tag1, tag2, field]"
output:
Ok:
- " FilterExec: tag1@0 = field@2"
- " ParquetExec: limit=None, partitions={0 groups: []}, predicate=tag1@0 = foo AND tag1@0 = field@2, pruning_predicate=tag1_min@0 <= foo AND foo <= tag1_max@1, projection=[tag1, tag2, field]"
"###
);
}
#[test]
fn test_dedup_no_pushdown() {
let schema = schema();