feat: debug-log errors during chunk extraction (#7223)

Helps debugging while working on #6098.
Marco Neumann 2023-03-15 19:55:33 +01:00 committed by GitHub
parent 2817e1adb4
commit 393de6980e
1 changed file with 52 additions and 18 deletions
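
For orientation, here is the pattern this diff introduces, reduced to a minimal sketch (not code from the commit): the visitor's unit errors become DataFusionError values with context attached, and extract_chunks debug-logs the failure instead of silently discarding it via .ok()?, while still returning None. The helper names fallible_step and extract_something are made up for illustration, and the imports assume the same datafusion and observability_deps (a re-export of tracing) dependencies as the changed file.

// Sketch only: mirrors the shape of the change, not the actual IOx visitor code.
use datafusion::error::DataFusionError;
use observability_deps::tracing::debug; // re-export of tracing::debug

/// Hypothetical stand-in for visit_execution_plan(plan, &mut visitor): a step
/// that now fails with a descriptive DataFusionError instead of ().
fn fallible_step() -> Result<(), DataFusionError> {
    Err(DataFusionError::Context(
        // The outer layer names the place that failed ...
        "add schema from RecordBatchesExec".to_owned(),
        // ... and wraps the underlying cause.
        Box::new(DataFusionError::External(
            String::from("Different schema").into(),
        )),
    ))
}

/// Mirrors the new extract_chunks control flow: the error is no longer swallowed
/// by .ok()?; it is logged at debug level first, and None is still returned.
fn extract_something() -> Option<()> {
    if let Err(e) = fallible_step() {
        debug!(%e, "cannot extract chunks");
        return None;
    }
    Some(())
}

Attaching DataFusionError::Context keeps the original cause chained to the node that failed, so the single debug line in extract_chunks reports both, which is what the hunks below add.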


@@ -1,9 +1,13 @@
 use std::sync::Arc;
 
-use datafusion::physical_plan::{
-    empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan,
-    ExecutionPlan, ExecutionPlanVisitor,
+use datafusion::{
+    error::DataFusionError,
+    physical_plan::{
+        empty::EmptyExec, file_format::ParquetExec, union::UnionExec, visit_execution_plan,
+        ExecutionPlan, ExecutionPlanVisitor,
+    },
 };
+use observability_deps::tracing::debug;
 use schema::Schema;
 
 use crate::{
@@ -22,7 +26,13 @@ use crate::{
 /// [`chunks_to_physical_nodes`]: crate::provider::chunks_to_physical_nodes
 pub fn extract_chunks(plan: &dyn ExecutionPlan) -> Option<(Schema, Vec<Arc<dyn QueryChunk>>)> {
     let mut visitor = ExtractChunksVisitor::default();
-    visit_execution_plan(plan, &mut visitor).ok()?;
+    if let Err(e) = visit_execution_plan(plan, &mut visitor) {
+        debug!(
+            %e,
+            "cannot extract chunks",
+        );
+        return None;
+    }
     visitor.schema.map(|schema| (schema, visitor.chunks))
 }
 
@@ -33,16 +43,22 @@ struct ExtractChunksVisitor {
 }
 
 impl ExtractChunksVisitor {
-    fn add_chunk(&mut self, chunk: Arc<dyn QueryChunk>) -> Result<(), ()> {
+    fn add_chunk(&mut self, chunk: Arc<dyn QueryChunk>) {
         self.chunks.push(chunk);
-        Ok(())
     }
 
-    fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), ()> {
-        let schema = Schema::try_from(exec.schema()).map_err(|_| ())?;
+    fn add_schema_from_exec(&mut self, exec: &dyn ExecutionPlan) -> Result<(), DataFusionError> {
+        let schema = Schema::try_from(exec.schema()).map_err(|e| {
+            DataFusionError::Context(
+                "Schema recovery".to_owned(),
+                Box::new(DataFusionError::External(Box::new(e))),
+            )
+        })?;
         if let Some(existing) = &self.schema {
             if existing != &schema {
-                return Err(());
+                return Err(DataFusionError::External(
+                    String::from("Different schema").into(),
+                ));
             }
         } else {
             self.schema = Some(schema);
@@ -52,19 +68,27 @@ impl ExtractChunksVisitor {
 }
 
 impl ExecutionPlanVisitor for ExtractChunksVisitor {
-    type Error = ();
+    type Error = DataFusionError;
 
     fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
         let plan_any = plan.as_any();
 
         if let Some(record_batches_exec) = plan_any.downcast_ref::<RecordBatchesExec>() {
-            self.add_schema_from_exec(record_batches_exec)?;
+            self.add_schema_from_exec(record_batches_exec)
+                .map_err(|e| {
+                    DataFusionError::Context(
+                        "add schema from RecordBatchesExec".to_owned(),
+                        Box::new(e),
+                    )
+                })?;
 
             for chunk in record_batches_exec.chunks() {
-                self.add_chunk(Arc::clone(chunk))?;
+                self.add_chunk(Arc::clone(chunk));
             }
         } else if let Some(parquet_exec) = plan_any.downcast_ref::<ParquetExec>() {
-            self.add_schema_from_exec(parquet_exec)?;
+            self.add_schema_from_exec(parquet_exec).map_err(|e| {
+                DataFusionError::Context("add schema from ParquetExec".to_owned(), Box::new(e))
+            })?;
 
             for group in &parquet_exec.base_config().file_groups {
                 for file in group {
@@ -72,22 +96,32 @@ impl ExecutionPlanVisitor for ExtractChunksVisitor {
                         .extensions
                         .as_ref()
                         .and_then(|any| any.downcast_ref::<PartitionedFileExt>())
-                        .ok_or(())?;
-                    self.add_chunk(Arc::clone(&ext.0))?;
+                        .ok_or_else(|| {
+                            DataFusionError::External(
+                                String::from("PartitionedFileExt not found").into(),
+                            )
+                        })?;
+                    self.add_chunk(Arc::clone(&ext.0));
                 }
             }
         } else if let Some(empty_exec) = plan_any.downcast_ref::<EmptyExec>() {
             // should not produce dummy data
             if empty_exec.produce_one_row() {
-                return Err(());
+                return Err(DataFusionError::External(
+                    String::from("EmptyExec produces row").into(),
+                ));
             }
 
-            self.add_schema_from_exec(empty_exec)?;
+            self.add_schema_from_exec(empty_exec).map_err(|e| {
+                DataFusionError::Context("add schema from EmptyExec".to_owned(), Box::new(e))
+            })?;
         } else if plan_any.downcast_ref::<UnionExec>().is_some() {
             // continue visiting
         } else {
             // unsupported node
-            return Err(());
+            return Err(DataFusionError::External(
+                String::from("Unsupported node").into(),
+            ));
         }
 
         Ok(true)