From 7e64264eef9e4400b34fd8765c919c4887d1cee8 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Wed, 17 May 2023 11:30:04 +0200
Subject: [PATCH] refactor: remove `RedundantSort` optimizer pass (#7809)

* test: add dedup test for multiple partitions and ranges

* refactor: remove `RedundantSort` optimizer pass

Similar to #7807, this is now covered by DataFusion, as demonstrated by
the fact that all query tests (incl. explain tests) still pass.

The good thing is: passes that are no longer needed don't require any
upstreaming, so this also closes #7411.
---
 influxdb_iox/tests/query_tests/cases.rs       |  12 ++
 .../cases/in/duplicates_different_domains.sql |   7 +
 .../duplicates_different_domains.sql.expected |  35 +++++
 influxdb_iox/tests/query_tests/setups.rs      |  31 ++++
 iox_query/src/physical_optimizer/mod.rs       |   3 +-
 iox_query/src/physical_optimizer/sort/mod.rs  |   1 -
 .../physical_optimizer/sort/redundant_sort.rs | 142 ------------------
 7 files changed, 86 insertions(+), 145 deletions(-)
 create mode 100644 influxdb_iox/tests/query_tests/cases/in/duplicates_different_domains.sql
 create mode 100644 influxdb_iox/tests/query_tests/cases/in/duplicates_different_domains.sql.expected
 delete mode 100644 iox_query/src/physical_optimizer/sort/redundant_sort.rs

diff --git a/influxdb_iox/tests/query_tests/cases.rs b/influxdb_iox/tests/query_tests/cases.rs
index 8de8d94337..b7ca055e4b 100644
--- a/influxdb_iox/tests/query_tests/cases.rs
+++ b/influxdb_iox/tests/query_tests/cases.rs
@@ -145,6 +145,18 @@ async fn duplicates_parquet_50() {
         .await;
 }
 
+#[tokio::test]
+async fn duplicates_different_domains() {
+    test_helpers::maybe_start_logging();
+
+    TestCase {
+        input: "cases/in/duplicates_different_domains.sql",
+        chunk_stage: ChunkStage::Parquet,
+    }
+    .run()
+    .await;
+}
+
 #[tokio::test]
 async fn gapfill() {
     test_helpers::maybe_start_logging();
diff --git a/influxdb_iox/tests/query_tests/cases/in/duplicates_different_domains.sql b/influxdb_iox/tests/query_tests/cases/in/duplicates_different_domains.sql
new file mode 100644
index 0000000000..c11074c501
--- /dev/null
+++ b/influxdb_iox/tests/query_tests/cases/in/duplicates_different_domains.sql
@@ -0,0 +1,7 @@
+-- Test for dedup across different domains (like time ranges, partitions, etc.)
+-- IOX_SETUP: DuplicateDifferentDomains
+
+select * from m order by time;
+
+-- IOX_COMPARE: uuid
+explain select * from m order by time;
diff --git a/influxdb_iox/tests/query_tests/cases/in/duplicates_different_domains.sql.expected b/influxdb_iox/tests/query_tests/cases/in/duplicates_different_domains.sql.expected
new file mode 100644
index 0000000000..5afae58e71
--- /dev/null
+++ b/influxdb_iox/tests/query_tests/cases/in/duplicates_different_domains.sql.expected
@@ -0,0 +1,35 @@
+-- Test Setup: DuplicateDifferentDomains
+-- SQL: select * from m order by time;
++-----+-----+--------------------------------+
+| f   | tag | time                           |
++-----+-----+--------------------------------+
+| 1.0 | A   | 1970-01-01T00:00:00Z           |
+| 3.0 | A   | 1970-01-01T00:00:00.000000001Z |
+| 2.0 | A   | 1970-01-02T00:00:00Z           |
++-----+-----+--------------------------------+
+-- SQL: explain select * from m order by time;
+-- Results After Normalizing UUIDs
+----------
+| plan_type    | plan    |
+----------
+| logical_plan    | Sort: m.time ASC NULLS LAST    |
+|    |   TableScan: m projection=[f, tag, time]    |
+| physical_plan    | SortPreservingMergeExec: [time@2 ASC NULLS LAST]    |
+|    |   UnionExec    |
+|    |     SortExec: expr=[time@2 ASC NULLS LAST]    |
+|    |       ProjectionExec: expr=[f@1 as f, tag@2 as tag, time@3 as time]    |
+|    |         DeduplicateExec: [tag@2 ASC,time@3 ASC]    |
+|    |           SortPreservingMergeExec: [tag@2 ASC,time@3 ASC,__chunk_order@0 ASC]    |
+|    |             ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000000.parquet], [1/1/1/00000000-0000-0000-0000-000000000001.parquet]]}, projection=[__chunk_order, f, tag, time], output_ordering=[tag@2 ASC, time@3 ASC, __chunk_order@0 ASC]    |
+|    |     SortExec: expr=[time@2 ASC NULLS LAST]    |
+|    |       ProjectionExec: expr=[f@1 as f, tag@2 as tag, time@3 as time]    |
+|    |         DeduplicateExec: [tag@2 ASC,time@3 ASC]    |
+|    |           SortPreservingMergeExec: [tag@2 ASC,time@3 ASC,__chunk_order@0 ASC]    |
+|    |             ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000002.parquet], [1/1/1/00000000-0000-0000-0000-000000000003.parquet]]}, projection=[__chunk_order, f, tag, time], output_ordering=[tag@2 ASC, time@3 ASC, __chunk_order@0 ASC]    |
+|    |     SortExec: expr=[time@2 ASC NULLS LAST]    |
+|    |       ProjectionExec: expr=[f@1 as f, tag@2 as tag, time@3 as time]    |
+|    |         DeduplicateExec: [tag@2 ASC,time@3 ASC]    |
+|    |           SortPreservingMergeExec: [tag@2 ASC,time@3 ASC,__chunk_order@0 ASC]    |
+|    |             ParquetExec: file_groups={2 groups: [[1/1/1/00000000-0000-0000-0000-000000000004.parquet], [1/1/1/00000000-0000-0000-0000-000000000005.parquet]]}, projection=[__chunk_order, f, tag, time], output_ordering=[tag@2 ASC, time@3 ASC, __chunk_order@0 ASC]    |
+|    |    |
+----------
\ No newline at end of file
diff --git a/influxdb_iox/tests/query_tests/setups.rs b/influxdb_iox/tests/query_tests/setups.rs
index 0649fcb071..44946eed8c 100644
--- a/influxdb_iox/tests/query_tests/setups.rs
+++ b/influxdb_iox/tests/query_tests/setups.rs
@@ -1358,6 +1358,37 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
             },
         ],
     ),
+    (
+        "DuplicateDifferentDomains",
+        (0..2)
+            .flat_map(|_| {
+                [
+                    Step::RecordNumParquetFiles,
+                    Step::WriteLineProtocol(
+                        r#"
+m,tag=A f=1 0
+m,tag=A f=2 86400000000000
+                        "#
+                        .into(),
+                    ),
+                    Step::Persist,
+                    Step::WaitForPersisted2 {
+                        expected_increase: 2,
+                    },
+                    Step::RecordNumParquetFiles,
+                    Step::WriteLineProtocol(
+                        r#"
+m,tag=A f=3 1
+                        "#
+                        .into(),
+                    ),
+                    Step::Persist,
+                    Step::WaitForPersisted2 {
+                        expected_increase: 1,
+                    },
+                ]
+                .into_iter()
+            })
+            .collect::<Vec<_>>(),
+    ),
 ])
 });
diff --git a/iox_query/src/physical_optimizer/mod.rs b/iox_query/src/physical_optimizer/mod.rs
index 92198f6aeb..40800a2359 100644
--- a/iox_query/src/physical_optimizer/mod.rs
+++ b/iox_query/src/physical_optimizer/mod.rs
@@ -10,7 +10,7 @@ use self::{
     },
     predicate_pushdown::PredicatePushdown,
     projection_pushdown::ProjectionPushdown,
-    sort::{parquet_sortness::ParquetSortness, redundant_sort::RedundantSort},
+    sort::parquet_sortness::ParquetSortness,
     union::{nested_union::NestedUnion, one_union::OneUnion},
 };
 
@@ -43,7 +43,6 @@ pub fn register_iox_physical_optimizers(state: SessionState) -> SessionState {
         Arc::new(OneUnion::default()),
     ];
     optimizers.append(&mut state.physical_optimizers().to_vec());
-    optimizers.extend([Arc::new(RedundantSort::default()) as _]);
 
     state.with_physical_optimizer_rules(optimizers)
 }
diff --git a/iox_query/src/physical_optimizer/sort/mod.rs b/iox_query/src/physical_optimizer/sort/mod.rs
index 5e174342eb..f5f250636a 100644
--- a/iox_query/src/physical_optimizer/sort/mod.rs
+++ b/iox_query/src/physical_optimizer/sort/mod.rs
@@ -3,4 +3,3 @@
 //! [`SortExec`]: datafusion::physical_plan::sorts::sort::SortExec
 
 pub mod parquet_sortness;
-pub mod redundant_sort;
diff --git a/iox_query/src/physical_optimizer/sort/redundant_sort.rs b/iox_query/src/physical_optimizer/sort/redundant_sort.rs
deleted file mode 100644
index 9638748c00..0000000000
--- a/iox_query/src/physical_optimizer/sort/redundant_sort.rs
+++ /dev/null
@@ -1,142 +0,0 @@
-use std::sync::Arc;
-
-use datafusion::{
-    common::tree_node::{Transformed, TreeNode},
-    config::ConfigOptions,
-    error::Result,
-    physical_optimizer::PhysicalOptimizerRule,
-    physical_plan::{sorts::sort::SortExec, ExecutionPlan},
-};
-
-/// Removes [`SortExec`] if it is no longer needed.
-#[derive(Debug, Default)]
-pub struct RedundantSort;
-
-impl PhysicalOptimizerRule for RedundantSort {
-    fn optimize(
-        &self,
-        plan: Arc<dyn ExecutionPlan>,
-        _config: &ConfigOptions,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        plan.transform_down(&|plan| {
-            let plan_any = plan.as_any();
-
-            if let Some(sort_exec) = plan_any.downcast_ref::<SortExec>() {
-                let child = sort_exec.input();
-
-                if child.output_ordering() == Some(sort_exec.expr()) {
-                    return Ok(Transformed::Yes(Arc::clone(child)));
-                }
-            }
-
-            Ok(Transformed::No(plan))
-        })
-    }
-
-    fn name(&self) -> &str {
-        "redundant_sort"
-    }
-
-    fn schema_check(&self) -> bool {
-        true
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use datafusion::{
-        datasource::object_store::ObjectStoreUrl,
-        physical_expr::PhysicalSortExpr,
-        physical_plan::{
-            expressions::Column,
-            file_format::{FileScanConfig, ParquetExec},
-            Statistics,
-        },
-    };
-
-    use crate::physical_optimizer::test_util::OptimizationTest;
-
-    use super::*;
-
-    #[test]
-    fn test_not_redundant() {
-        let schema = schema();
-        let input = Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test://").unwrap(),
-                file_schema: Arc::clone(&schema),
-                file_groups: vec![],
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: None,
-                infinite_source: false,
-            },
-            None,
-            None,
-        ));
-        let plan = Arc::new(SortExec::new(sort_expr(schema.as_ref()), input).with_fetch(Some(10)));
-        let opt = RedundantSort::default();
-        insta::assert_yaml_snapshot!(
-            OptimizationTest::new(plan, opt),
-            @r###"
-        ---
-        input:
-          - " SortExec: fetch=10, expr=[col@0 ASC]"
-          - "   ParquetExec: file_groups={0 groups: []}, projection=[col]"
-        output:
-          Ok:
-            - " SortExec: fetch=10, expr=[col@0 ASC]"
-            - "   ParquetExec: file_groups={0 groups: []}, projection=[col]"
-        "###
-        );
-    }
-
-    #[test]
-    fn test_redundant() {
-        let schema = schema();
-        let sort_expr = sort_expr(schema.as_ref());
-        let input = Arc::new(ParquetExec::new(
-            FileScanConfig {
-                object_store_url: ObjectStoreUrl::parse("test://").unwrap(),
-                file_schema: Arc::clone(&schema),
-                file_groups: vec![],
-                statistics: Statistics::default(),
-                projection: None,
-                limit: None,
-                table_partition_cols: vec![],
-                output_ordering: Some(sort_expr.clone()),
-                infinite_source: false,
-            },
-            None,
-            None,
-        ));
-        let plan = Arc::new(SortExec::new(sort_expr, input).with_fetch(Some(10)));
-        let opt = RedundantSort::default();
-        insta::assert_yaml_snapshot!(
-            OptimizationTest::new(plan, opt),
-            @r###"
-        ---
-        input:
-          - " SortExec: fetch=10, expr=[col@0 ASC]"
-          - "   ParquetExec: file_groups={0 groups: []}, projection=[col], output_ordering=[col@0 ASC]"
-        output:
-          Ok:
-            - " ParquetExec: file_groups={0 groups: []}, projection=[col], output_ordering=[col@0 ASC]"
-        "###
-        );
-    }
-
-    fn sort_expr(schema: &Schema) -> Vec<PhysicalSortExpr> {
-        vec![PhysicalSortExpr {
-            expr: Arc::new(Column::new_with_schema("col", schema).unwrap()),
-            options: Default::default(),
-        }]
-    }
-
-    fn schema() -> SchemaRef {
-        Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)]))
-    }
-}
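
For context on the wiring this patch touches: `register_iox_physical_optimizers` prepends the IOx-specific passes to DataFusion's built-in ones, and after this change no IOx pass runs behind the built-in set anymore, because DataFusion's own sort-enforcement rules already drop a `SortExec` whose input satisfies the requested ordering. Below is a minimal sketch of how a caller might build a session with these rules. Only `register_iox_physical_optimizers` is taken from the diff above; the session construction and DataFusion module paths are assumptions for the DataFusion version in use around this commit (circa v24), not part of the patch.

    use std::sync::Arc;

    use datafusion::execution::context::{SessionContext, SessionState};
    use datafusion::execution::runtime_env::RuntimeEnv;
    use datafusion::prelude::SessionConfig;
    use iox_query::physical_optimizer::register_iox_physical_optimizers;

    /// Build a query context that runs the IOx physical-optimizer passes on
    /// top of DataFusion's defaults (sketch; assumes the module is public, as
    /// its `pub fn` suggests).
    fn iox_session_context() -> SessionContext {
        // Plain DataFusion state with default config and runtime.
        let state = SessionState::with_config_rt(
            SessionConfig::new(),
            Arc::new(RuntimeEnv::default()),
        );
        // IOx passes first, then DataFusion's defaults (see the
        // `optimizers.append(...)` line in the diff). Redundant-sort removal
        // now happens entirely in the built-in group.
        let state = register_iox_physical_optimizers(state);
        SessionContext::with_state(state)
    }

With this wiring, the removed pass simply disappears from the IOx list, and the plans in the `.expected` file above stay unchanged because DataFusion performs the equivalent rewrite.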