From 45d23f76523774b0b30f27e291ff6eef9c9a3f85 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 16 Mar 2023 15:19:52 +0100 Subject: [PATCH] refactor: `extract_chunks` return arrow schema (#7231) Similar to #7217 there is no need to convert the arrow schema to an IOx schema. This also makes it easier to handle the chunk order column in #6098. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../physical_optimizer/chunk_extraction.rs | 43 ++++++++----------- .../src/physical_optimizer/combine_chunks.rs | 4 +- .../dedup/dedup_null_columns.rs | 7 ++- .../dedup/dedup_sort_order.rs | 5 +-- .../dedup/partition_split.rs | 3 +- .../physical_optimizer/dedup/remove_dedup.rs | 3 +- .../physical_optimizer/dedup/time_split.rs | 3 +- 7 files changed, 28 insertions(+), 40 deletions(-) diff --git a/iox_query/src/physical_optimizer/chunk_extraction.rs b/iox_query/src/physical_optimizer/chunk_extraction.rs index f5f615bda2..87c72f7d01 100644 --- a/iox_query/src/physical_optimizer/chunk_extraction.rs +++ b/iox_query/src/physical_optimizer/chunk_extraction.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use arrow::datatypes::SchemaRef; use datafusion::{ error::DataFusionError, physical_plan::{ @@ -8,7 +9,6 @@ use datafusion::{ }, }; use observability_deps::tracing::debug; -use schema::Schema; use crate::{ provider::{PartitionedFileExt, RecordBatchesExec}, @@ -24,7 +24,7 @@ use crate::{ /// additional nodes (like de-duplication, filtering, projection) then NO data will be returned. 
/// /// [`chunks_to_physical_nodes`]: crate::provider::chunks_to_physical_nodes -pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec>)> { +pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(SchemaRef, Vec>)> { let mut visitor = ExtractChunksVisitor::default(); if let Err(e) = visit_execution_plan(plan, &mut visitor) { debug!( @@ -39,7 +39,7 @@ pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec>, - schema: Option, + schema: Option, } impl ExtractChunksVisitor { @@ -48,12 +48,7 @@ impl ExtractChunksVisitor { } fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), DataFusionError> { - let schema = Schema::try_from(exec.schema()).map_err(|e| { - DataFusionError::Context( - "Schema recovery".to_owned(), - Box::new(DataFusionError::External(Box::new(e))), - ) - })?; + let schema = exec.schema(); if let Some(existing) = &self.schema { if existing != &schema { return Err(DataFusionError::External( @@ -146,20 +141,20 @@ mod tests { #[test] fn test_roundtrip_empty() { - let schema = chunk(1).schema().clone(); + let schema = chunk(1).schema().as_arrow(); assert_roundtrip(schema, vec![]); } #[test] fn test_roundtrip_single_record_batch() { let chunk1 = chunk(1); - assert_roundtrip(chunk1.schema().clone(), vec![Arc::new(chunk1)]); + assert_roundtrip(chunk1.schema().as_arrow(), vec![Arc::new(chunk1)]); } #[test] fn test_roundtrip_single_parquet() { let chunk1 = chunk(1).with_dummy_parquet_file(); - assert_roundtrip(chunk1.schema().clone(), vec![Arc::new(chunk1)]); + assert_roundtrip(chunk1.schema().as_arrow(), vec![Arc::new(chunk1)]); } #[test] @@ -170,7 +165,7 @@ mod tests { let chunk4 = chunk(4); let chunk5 = chunk(5); assert_roundtrip( - chunk1.schema().clone(), + chunk1.schema().as_arrow(), vec![ Arc::new(chunk1), Arc::new(chunk2), @@ -208,8 +203,10 @@ mod tests { DataType::Float64, true, )])); - let plan = EmptyExec::new(false, schema); - assert!(extract_chunks(&plan).is_none()); + let plan = 
EmptyExec::new(false, Arc::clone(&schema)); + let (schema2, chunks) = extract_chunks(&plan).unwrap(); + assert_eq!(schema, schema2); + assert!(chunks.is_empty()); } #[test] @@ -240,7 +237,8 @@ mod tests { .unwrap() .merge(&schema_ext) .unwrap() - .build(); + .build() + .as_arrow(); assert_roundtrip(schema, vec![Arc::new(chunk)]); } @@ -253,19 +251,14 @@ mod tests { .unwrap() .merge(&schema_ext) .unwrap() - .build(); + .build() + .as_arrow(); assert_roundtrip(schema, vec![Arc::new(chunk)]); } #[track_caller] - fn assert_roundtrip(schema: Schema, chunks: Vec>) { - let plan = chunks_to_physical_nodes( - &schema.as_arrow(), - None, - chunks.clone(), - Predicate::default(), - 2, - ); + fn assert_roundtrip(schema: SchemaRef, chunks: Vec>) { + let plan = chunks_to_physical_nodes(&schema, None, chunks.clone(), Predicate::default(), 2); let (schema2, chunks2) = extract_chunks(plan.as_ref()).expect("data found"); assert_eq!(schema, schema2); assert_eq!(chunk_ids(&chunks), chunk_ids(&chunks2)); diff --git a/iox_query/src/physical_optimizer/combine_chunks.rs b/iox_query/src/physical_optimizer/combine_chunks.rs index 78cd14602a..9347c4834f 100644 --- a/iox_query/src/physical_optimizer/combine_chunks.rs +++ b/iox_query/src/physical_optimizer/combine_chunks.rs @@ -34,9 +34,9 @@ impl PhysicalOptimizerRule for CombineChunks { config: &ConfigOptions, ) -> Result> { plan.transform_up(&|plan| { - if let Some((iox_schema, chunks)) = extract_chunks(plan.as_ref()) { + if let Some((schema, chunks)) = extract_chunks(plan.as_ref()) { return Ok(Some(chunks_to_physical_nodes( - &iox_schema.as_arrow(), + &schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs index f9790cb22c..c5e808e52d 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_null_columns.rs @@ -44,7 +44,7 @@ impl PhysicalOptimizerRule for 
DedupNullColumns { return Ok(None); }; - let pk_cols = schema.primary_key().into_iter().collect::>(); + let pk_cols = dedup_exec.sort_columns(); let mut used_pk_cols = HashSet::new(); for chunk in &chunks { @@ -64,16 +64,15 @@ impl PhysicalOptimizerRule for DedupNullColumns { } let sort_key = sort_key_builder.build(); - let arrow_schema = schema.as_arrow(); let child = chunks_to_physical_nodes( - &arrow_schema, + &schema, (!sort_key.is_empty()).then_some(&sort_key), chunks, Predicate::new(), config.execution.target_partitions, ); - let sort_exprs = arrow_sort_key_exprs(&sort_key, &arrow_schema); + let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema); return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs)))); } diff --git a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs index 650f9dc141..de42c0c90f 100644 --- a/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs +++ b/iox_query/src/physical_optimizer/dedup/dedup_sort_order.rs @@ -124,16 +124,15 @@ impl PhysicalOptimizerRule for DedupSortOrder { } let quorum_sort_key = quorum_sort_key_builder.build(); - let arrow_schema = schema.as_arrow(); let child = chunks_to_physical_nodes( - &arrow_schema, + &schema, (!quorum_sort_key.is_empty()).then_some(&quorum_sort_key), chunks, Predicate::new(), config.execution.target_partitions, ); - let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &arrow_schema); + let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &schema); return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs)))); } diff --git a/iox_query/src/physical_optimizer/dedup/partition_split.rs b/iox_query/src/physical_optimizer/dedup/partition_split.rs index 066f97febb..0d21fc5113 100644 --- a/iox_query/src/physical_optimizer/dedup/partition_split.rs +++ b/iox_query/src/physical_optimizer/dedup/partition_split.rs @@ -76,14 +76,13 @@ impl PhysicalOptimizerRule for PartitionSplit { let mut chunks_by_partition = 
chunks_by_partition.into_iter().collect::<Vec<_>>(); chunks_by_partition.sort_by_key(|(p_id, _chunks)| *p_id); - let arrow_schema = schema.as_arrow(); let out = UnionExec::new( chunks_by_partition .into_iter() .map(|(_p_id, chunks)| { Arc::new(DeduplicateExec::new( chunks_to_physical_nodes( - &arrow_schema, + &schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs index 1aae525dcf..edda8ac6b9 100644 --- a/iox_query/src/physical_optimizer/dedup/remove_dedup.rs +++ b/iox_query/src/physical_optimizer/dedup/remove_dedup.rs @@ -35,9 +35,8 @@ impl PhysicalOptimizerRule for RemoveDedup { }; if (chunks.len() < 2) && chunks.iter().all(|c| !c.may_contain_pk_duplicates()) { - let arrow_schema = schema.as_arrow(); return Ok(Some(chunks_to_physical_nodes( - &arrow_schema, + &schema, None, chunks, Predicate::new(), diff --git a/iox_query/src/physical_optimizer/dedup/time_split.rs b/iox_query/src/physical_optimizer/dedup/time_split.rs index a54715f34e..bed6b825cd 100644 --- a/iox_query/src/physical_optimizer/dedup/time_split.rs +++ b/iox_query/src/physical_optimizer/dedup/time_split.rs @@ -63,14 +63,13 @@ impl PhysicalOptimizerRule for TimeSplit { return Ok(None); } - let arrow_schema = schema.as_arrow(); let out = UnionExec::new( groups .into_iter() .map(|chunks| { Arc::new(DeduplicateExec::new( chunks_to_physical_nodes( - &arrow_schema, + &schema, None, chunks, Predicate::new(),