refactor: make `extract_chunks` return the Arrow schema (#7231)

Similar to #7217, there is no need to convert the Arrow schema to an IOx
schema only to convert it back at every call site. This also makes it easier
to handle the chunk order column in #6098.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2023-03-16 15:19:52 +01:00 committed by GitHub
parent f76be3fa1b
commit 45d23f7652
7 changed files with 28 additions and 40 deletions
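
For context, here is a minimal, self-contained sketch of the shape of this change. The types below (`ArrowSchema`, `IoxSchema`, the toy `extract_chunks_*` functions) are stand-ins for `arrow::datatypes::SchemaRef`, `schema::Schema`, and the real `iox_query` items, not the actual implementations: previously callers received the IOx schema and immediately converted it back with `.as_arrow()`; now they receive the Arrow `SchemaRef` directly.

use std::sync::Arc;

// Toy stand-ins; only the shapes matter here.
#[derive(Debug, PartialEq)]
struct ArrowSchema;
type SchemaRef = Arc<ArrowSchema>;

struct IoxSchema(SchemaRef);

impl IoxSchema {
    fn as_arrow(&self) -> SchemaRef {
        Arc::clone(&self.0)
    }
}

trait QueryChunk {}

// Before: the visitor converted Arrow -> IOx schema (a fallible step), and
// every caller converted straight back with `.as_arrow()`.
fn extract_chunks_old() -> Option<(IoxSchema, Vec<Arc<dyn QueryChunk>>)> {
    Some((IoxSchema(Arc::new(ArrowSchema)), vec![]))
}

// After: hand the Arrow schema back as-is; no conversion, no error path.
fn extract_chunks_new() -> Option<(SchemaRef, Vec<Arc<dyn QueryChunk>>)> {
    Some((Arc::new(ArrowSchema), vec![]))
}

fn main() {
    // Old call site: convert before passing to chunks_to_physical_nodes.
    let (iox_schema, _chunks) = extract_chunks_old().unwrap();
    let _arrow = iox_schema.as_arrow();

    // New call site: the returned schema is already the Arrow one, so
    // `&schema` can be passed through directly.
    let (schema, _chunks) = extract_chunks_new().unwrap();
    let _arrow: SchemaRef = schema;
}

Dropping the round trip also removes the "Schema recovery" error path from the visitor, as the first hunk below shows.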


@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use arrow::datatypes::SchemaRef;
 use datafusion::{
     error::DataFusionError,
     physical_plan::{
@@ -8,7 +9,6 @@ use datafusion::{
     },
 };
 use observability_deps::tracing::debug;
-use schema::Schema;
 
 use crate::{
     provider::{PartitionedFileExt, RecordBatchesExec},
@@ -24,7 +24,7 @@ use crate::{
 /// additional nodes (like de-duplication, filtering, projection) then NO data will be returned.
 ///
 /// [`chunks_to_physical_nodes`]: crate::provider::chunks_to_physical_nodes
-pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec<Arc<dyn QueryChunk>>)> {
+pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(SchemaRef, Vec<Arc<dyn QueryChunk>>)> {
     let mut visitor = ExtractChunksVisitor::default();
     if let Err(e) = visit_execution_plan(plan, &mut visitor) {
         debug!(
@@ -39,7 +39,7 @@ pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec<Arc<dyn QueryChunk>>)> {
 #[derive(Debug, Default)]
 struct ExtractChunksVisitor {
     chunks: Vec<Arc<dyn QueryChunk>>,
-    schema: Option<Schema>,
+    schema: Option<SchemaRef>,
 }
 
 impl ExtractChunksVisitor {
@@ -48,12 +48,7 @@ impl ExtractChunksVisitor {
     }
 
     fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), DataFusionError> {
-        let schema = Schema::try_from(exec.schema()).map_err(|e| {
-            DataFusionError::Context(
-                "Schema recovery".to_owned(),
-                Box::new(DataFusionError::External(Box::new(e))),
-            )
-        })?;
+        let schema = exec.schema();
         if let Some(existing) = &self.schema {
             if existing != &schema {
                 return Err(DataFusionError::External(
@@ -146,20 +141,20 @@ mod tests {
     #[test]
     fn test_roundtrip_empty() {
-        let schema = chunk(1).schema().clone();
+        let schema = chunk(1).schema().as_arrow();
         assert_roundtrip(schema, vec![]);
     }
 
     #[test]
     fn test_roundtrip_single_record_batch() {
         let chunk1 = chunk(1);
-        assert_roundtrip(chunk1.schema().clone(), vec![Arc::new(chunk1)]);
+        assert_roundtrip(chunk1.schema().as_arrow(), vec![Arc::new(chunk1)]);
     }
 
     #[test]
     fn test_roundtrip_single_parquet() {
         let chunk1 = chunk(1).with_dummy_parquet_file();
-        assert_roundtrip(chunk1.schema().clone(), vec![Arc::new(chunk1)]);
+        assert_roundtrip(chunk1.schema().as_arrow(), vec![Arc::new(chunk1)]);
     }
@@ -170,7 +165,7 @@
         let chunk4 = chunk(4);
         let chunk5 = chunk(5);
         assert_roundtrip(
-            chunk1.schema().clone(),
+            chunk1.schema().as_arrow(),
             vec![
                 Arc::new(chunk1),
                 Arc::new(chunk2),
@@ -208,8 +203,10 @@
             DataType::Float64,
             true,
         )]));
-        let plan = EmptyExec::new(false, schema);
-        assert!(extract_chunks(&plan).is_none());
+        let plan = EmptyExec::new(false, Arc::clone(&schema));
+        let (schema2, chunks) = extract_chunks(&plan).unwrap();
+        assert_eq!(schema, schema2);
+        assert!(chunks.is_empty());
     }
 
     #[test]
@@ -240,7 +237,8 @@
             .unwrap()
             .merge(&schema_ext)
             .unwrap()
-            .build();
+            .build()
+            .as_arrow();
         assert_roundtrip(schema, vec![Arc::new(chunk)]);
     }
@@ -253,19 +251,14 @@
             .unwrap()
             .merge(&schema_ext)
             .unwrap()
-            .build();
+            .build()
+            .as_arrow();
         assert_roundtrip(schema, vec![Arc::new(chunk)]);
     }
 
     #[track_caller]
-    fn assert_roundtrip(schema: Schema, chunks: Vec<Arc<dyn QueryChunk>>) {
-        let plan = chunks_to_physical_nodes(
-            &schema.as_arrow(),
-            None,
-            chunks.clone(),
-            Predicate::default(),
-            2,
-        );
+    fn assert_roundtrip(schema: SchemaRef, chunks: Vec<Arc<dyn QueryChunk>>) {
+        let plan = chunks_to_physical_nodes(&schema, None, chunks.clone(), Predicate::default(), 2);
         let (schema2, chunks2) = extract_chunks(plan.as_ref()).expect("data found");
         assert_eq!(schema, schema2);
         assert_eq!(chunk_ids(&chunks), chunk_ids(&chunks2));


@@ -34,9 +34,9 @@ impl PhysicalOptimizerRule for CombineChunks {
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         plan.transform_up(&|plan| {
-            if let Some((iox_schema, chunks)) = extract_chunks(plan.as_ref()) {
+            if let Some((schema, chunks)) = extract_chunks(plan.as_ref()) {
                 return Ok(Some(chunks_to_physical_nodes(
-                    &iox_schema.as_arrow(),
+                    &schema,
                     None,
                     chunks,
                     Predicate::new(),


@@ -44,7 +44,7 @@ impl PhysicalOptimizerRule for DedupNullColumns {
                 return Ok(None);
             };
 
-            let pk_cols = schema.primary_key().into_iter().collect::<HashSet<_>>();
+            let pk_cols = dedup_exec.sort_columns();
             let mut used_pk_cols = HashSet::new();
             for chunk in &chunks {
@@ -64,16 +64,15 @@ impl PhysicalOptimizerRule for DedupNullColumns {
             }
 
             let sort_key = sort_key_builder.build();
-            let arrow_schema = schema.as_arrow();
             let child = chunks_to_physical_nodes(
-                &arrow_schema,
+                &schema,
                 (!sort_key.is_empty()).then_some(&sort_key),
                 chunks,
                 Predicate::new(),
                 config.execution.target_partitions,
             );
-            let sort_exprs = arrow_sort_key_exprs(&sort_key, &arrow_schema);
+            let sort_exprs = arrow_sort_key_exprs(&sort_key, &schema);
             return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
         }


@@ -124,16 +124,15 @@ impl PhysicalOptimizerRule for DedupSortOrder {
             }
 
             let quorum_sort_key = quorum_sort_key_builder.build();
-            let arrow_schema = schema.as_arrow();
             let child = chunks_to_physical_nodes(
-                &arrow_schema,
+                &schema,
                 (!quorum_sort_key.is_empty()).then_some(&quorum_sort_key),
                 chunks,
                 Predicate::new(),
                 config.execution.target_partitions,
             );
-            let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &arrow_schema);
+            let sort_exprs = arrow_sort_key_exprs(&quorum_sort_key, &schema);
             return Ok(Some(Arc::new(DeduplicateExec::new(child, sort_exprs))));
         }


@@ -76,14 +76,13 @@ impl PhysicalOptimizerRule for PartitionSplit {
             let mut chunks_by_partition = chunks_by_partition.into_iter().collect::<Vec<_>>();
             chunks_by_partition.sort_by_key(|(p_id, _chunks)| *p_id);
 
-            let arrow_schema = schema.as_arrow();
             let out = UnionExec::new(
                 chunks_by_partition
                     .into_iter()
                     .map(|(_p_id, chunks)| {
                         Arc::new(DeduplicateExec::new(
                             chunks_to_physical_nodes(
-                                &arrow_schema,
+                                &schema,
                                 None,
                                 chunks,
                                 Predicate::new(),


@@ -35,9 +35,8 @@ impl PhysicalOptimizerRule for RemoveDedup {
             };
 
             if (chunks.len() < 2) && chunks.iter().all(|c| !c.may_contain_pk_duplicates()) {
-                let arrow_schema = schema.as_arrow();
                 return Ok(Some(chunks_to_physical_nodes(
-                    &arrow_schema,
+                    &schema,
                     None,
                     chunks,
                     Predicate::new(),


@@ -63,14 +63,13 @@ impl PhysicalOptimizerRule for TimeSplit {
                 return Ok(None);
             }
 
-            let arrow_schema = schema.as_arrow();
             let out = UnionExec::new(
                 groups
                     .into_iter()
                     .map(|chunks| {
                         Arc::new(DeduplicateExec::new(
                             chunks_to_physical_nodes(
-                                &arrow_schema,
+                                &schema,
                                 None,
                                 chunks,
                                 Predicate::new(),