refactor: remove `RedundantSort` optimizer pass (#7809)

* test: add dedup test for multiple partitions and ranges

* refactor: remove `RedundantSort` optimizer pass

Similar to #7807 this is now covered by DataFusion, as demonstrated by
the fact that all query tests (incl. explain tests) still pass.

The good thing is: passes that are no longer required don't require any
upstreaming, so this also closes #7411.
pull/24376/head
Marco Neumann 2023-05-17 11:30:04 +02:00 committed by GitHub
parent 931b4488bd
commit 7e64264eef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 86 additions and 145 deletions

View File

@ -145,6 +145,18 @@ async fn duplicates_parquet_50() {
.await;
}
#[tokio::test]
async fn duplicates_different_domains() {
    // Exercises dedup when duplicates span different domains
    // (time ranges / partitions) across multiple parquet files.
    test_helpers::maybe_start_logging();

    let case = TestCase {
        input: "cases/in/duplicates_different_domains.sql",
        chunk_stage: ChunkStage::Parquet,
    };
    case.run().await;
}
#[tokio::test]
async fn gapfill() {
test_helpers::maybe_start_logging();

View File

@ -0,0 +1,7 @@
-- Test for dedup across different domains (like time ranges, partitions, etc.)
-- IOX_SETUP: DuplicateDifferentDomains
select * from m order by time;
-- IOX_COMPARE: uuid
explain select * from m order by time;

View File

@ -0,0 +1,35 @@
-- Test Setup: DuplicateDifferentDomains
-- SQL: select * from m order by time;
+-----+-----+--------------------------------+
| f | tag | time |
+-----+-----+--------------------------------+
| 1.0 | A | 1970-01-01T00:00:00Z |
| 3.0 | A | 1970-01-01T00:00:00.000000001Z |
| 2.0 | A | 1970-01-02T00:00:00Z |
+-----+-----+--------------------------------+
-- SQL: explain select * from m order by time;
-- Results After Normalizing UUIDs
----------
| plan_type | plan |
----------
| logical_plan | Sort: m.time ASC NULLS LAST |
| | TableScan: m projection=[f, tag, time] |
| physical_plan | SortPreservingMergeExec: [time@2 ASC NULLS LAST] |
| | UnionExec |
| | SortExec: expr=[time@2 ASC NULLS LAST] |
| | ProjectionExec: expr=[f@1 as f, tag@2 as tag, time@3 as time] |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC,__chunk_order@0 ASC] |
| | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[__chunk_order, f, tag, time], output_ordering=[tag@2 ASC, time@3 ASC, __chunk_order@0 ASC] |
| | SortExec: expr=[time@2 ASC NULLS LAST] |
| | ProjectionExec: expr=[f@1 as f, tag@2 as tag, time@3 as time] |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC,__chunk_order@0 ASC] |
| | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[__chunk_order, f, tag, time], output_ordering=[tag@2 ASC, time@3 ASC, __chunk_order@0 ASC] |
| | SortExec: expr=[time@2 ASC NULLS LAST] |
| | ProjectionExec: expr=[f@1 as f, tag@2 as tag, time@3 as time] |
| | DeduplicateExec: [tag@2 ASC,time@3 ASC] |
| | SortPreservingMergeExec: [tag@2 ASC,time@3 ASC,__chunk_order@0 ASC] |
| | ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000004.parquet], [1/1/1/00000000-0000-0000-0000-000000000005.parquet]]}, projection=[__chunk_order, f, tag, time], output_ordering=[tag@2 ASC, time@3 ASC, __chunk_order@0 ASC] |
| | |
----------

View File

@ -1358,6 +1358,37 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
},
],
),
(
"DuplicateDifferentDomains",
(0..2)
.flat_map(|_| {
[
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
r#"
m,tag=A f=1 0
m,tag=A f=2 86400000000000
"#.into(),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 2,
},
Step::RecordNumParquetFiles,
Step::WriteLineProtocol(
r#"
m,tag=A f=3 1
"#.into(),
),
Step::Persist,
Step::WaitForPersisted2 {
expected_increase: 1,
},
]
.into_iter()
})
.collect::<Vec<_>>(),
),
])
});

View File

@ -10,7 +10,7 @@ use self::{
},
predicate_pushdown::PredicatePushdown,
projection_pushdown::ProjectionPushdown,
sort::{parquet_sortness::ParquetSortness, redundant_sort::RedundantSort},
sort::parquet_sortness::ParquetSortness,
union::{nested_union::NestedUnion, one_union::OneUnion},
};
@ -43,7 +43,6 @@ pub fn register_iox_physical_optimizers(state: SessionState) -> SessionState {
Arc::new(OneUnion::default()),
];
optimizers.append(&mut state.physical_optimizers().to_vec());
optimizers.extend([Arc::new(RedundantSort::default()) as _]);
state.with_physical_optimizer_rules(optimizers)
}

View File

@ -3,4 +3,3 @@
//! [`SortExec`]: datafusion::physical_plan::sorts::sort::SortExec
pub mod parquet_sortness;
pub mod redundant_sort;

View File

@ -1,142 +0,0 @@
use std::sync::Arc;
use datafusion::{
common::tree_node::{Transformed, TreeNode},
config::ConfigOptions,
error::Result,
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{sorts::sort::SortExec, ExecutionPlan},
};
/// Removes [`SortExec`] if it is no longer needed.
///
/// A sort is considered redundant when its direct input already reports
/// (via `output_ordering`) exactly the sort expressions the [`SortExec`]
/// would establish; in that case the sort node is replaced by its input.
#[derive(Debug, Default)]
pub struct RedundantSort;
impl PhysicalOptimizerRule for RedundantSort {
    /// Walk the plan top-down and drop every [`SortExec`] whose input
    /// already provides exactly the requested ordering.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        plan.transform_down(&|node| {
            // A sort is redundant when its input's advertised output
            // ordering equals the sort's own expressions.
            //
            // NOTE(review): a redundant `SortExec` carrying a `fetch`
            // (top-k) limit is removed as well, which drops the limit —
            // confirm no caller relies on the fetch here.
            let replacement = node
                .as_any()
                .downcast_ref::<SortExec>()
                .and_then(|sort_exec| {
                    let input = sort_exec.input();
                    (input.output_ordering() == Some(sort_exec.expr()))
                        .then(|| Arc::clone(input))
                });

            Ok(match replacement {
                Some(input) => Transformed::Yes(input),
                None => Transformed::No(node),
            })
        })
    }

    fn name(&self) -> &str {
        "redundant_sort"
    }

    fn schema_check(&self) -> bool {
        true
    }
}
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion::{
        datasource::object_store::ObjectStoreUrl,
        physical_expr::PhysicalSortExpr,
        physical_plan::{
            expressions::Column,
            file_format::{FileScanConfig, ParquetExec},
            Statistics,
        },
    };

    use crate::physical_optimizer::test_util::OptimizationTest;

    use super::*;

    /// The sort must be KEPT when the scan does not advertise an output
    /// ordering: the optimizer cannot prove the input is already sorted.
    #[test]
    fn test_not_redundant() {
        let schema = schema();
        // ParquetExec with `output_ordering: None` — no ordering guarantee.
        let input = Arc::new(ParquetExec::new(
            FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("test://").unwrap(),
                file_schema: Arc::clone(&schema),
                file_groups: vec![],
                statistics: Statistics::default(),
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                output_ordering: None,
                infinite_source: false,
            },
            None,
            None,
        ));
        let plan = Arc::new(SortExec::new(sort_expr(schema.as_ref()), input).with_fetch(Some(10)));
        let opt = RedundantSort::default();
        // Input and output plans are expected to be identical.
        insta::assert_yaml_snapshot!(
            OptimizationTest::new(plan, opt),
            @r###"
        ---
        input:
          - " SortExec: fetch=10, expr=[col@0 ASC]"
          - " ParquetExec: file_groups={0 groups: []}, projection=[col]"
        output:
          Ok:
            - " SortExec: fetch=10, expr=[col@0 ASC]"
            - " ParquetExec: file_groups={0 groups: []}, projection=[col]"
        "###
        );
    }

    /// The sort must be REMOVED when the scan already advertises exactly
    /// the ordering the sort would establish.
    #[test]
    fn test_redundant() {
        let schema = schema();
        let sort_expr = sort_expr(schema.as_ref());
        // ParquetExec declares the same ordering the SortExec requests.
        let input = Arc::new(ParquetExec::new(
            FileScanConfig {
                object_store_url: ObjectStoreUrl::parse("test://").unwrap(),
                file_schema: Arc::clone(&schema),
                file_groups: vec![],
                statistics: Statistics::default(),
                projection: None,
                limit: None,
                table_partition_cols: vec![],
                output_ordering: Some(sort_expr.clone()),
                infinite_source: false,
            },
            None,
            None,
        ));
        let plan = Arc::new(SortExec::new(sort_expr, input).with_fetch(Some(10)));
        let opt = RedundantSort::default();
        // Only the ParquetExec remains in the optimized plan.
        insta::assert_yaml_snapshot!(
            OptimizationTest::new(plan, opt),
            @r###"
        ---
        input:
          - " SortExec: fetch=10, expr=[col@0 ASC]"
          - " ParquetExec: file_groups={0 groups: []}, projection=[col], output_ordering=[col@0 ASC]"
        output:
          Ok:
            - " ParquetExec: file_groups={0 groups: []}, projection=[col], output_ordering=[col@0 ASC]"
        "###
        );
    }

    /// Single ascending sort key on column `col` (default sort options).
    fn sort_expr(schema: &Schema) -> Vec<PhysicalSortExpr> {
        vec![PhysicalSortExpr {
            expr: Arc::new(Column::new_with_schema("col", schema).unwrap()),
            options: Default::default(),
        }]
    }

    /// One-column (`col: Int64`, non-nullable) test schema.
    fn schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)]))
    }
}