refactor: improve DF error handling (#6311)

This is required to extract "resource exhausted" errors in more cases.
pull/24376/head
Marco Neumann 2022-12-02 11:25:30 +00:00 committed by GitHub
parent 8742ed6c67
commit ab4f910111
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 51 deletions

View File

@ -62,11 +62,17 @@ async fn watch_task<S>(
let msg = match task_result {
Err(join_err) => {
debug!(e=%join_err, %description, "Error joining");
Some(format!("Join error for '{description}': {join_err}"))
Some(DataFusionError::Context(
format!("Join error for '{description}'"),
Box::new(DataFusionError::External(Box::new(join_err))),
))
}
Ok(Err(e)) => {
debug!(%e, %description, "Error in task itself");
Some(format!("Execution error for '{description}': {e}"))
Some(DataFusionError::Context(
format!("Execution error for '{description}'"),
Box::new(DataFusionError::ArrowError(e)),
))
}
Ok(Ok(())) => {
// successful
@ -76,12 +82,13 @@ async fn watch_task<S>(
// If there is a message to send down the channel, try and do so
if let Some(e) = msg {
let e = Arc::new(e);
for tx in tx {
// try and tell the receiver something went
// wrong. Note we ignore errors sending this message
// as that means the receiver has already been
// shutdown and no one cares anymore lol
let err: ArrowError = DataFusionError::Execution(e.clone()).into();
let err = ArrowError::ExternalError(Box::new(Arc::clone(&e)));
if tx.send(Err(err)).await.is_err() {
debug!(%description, "receiver hung up");
}

View File

@ -504,7 +504,10 @@ impl IOxSessionContext {
.await?
.into_fieldlist()
.map_err(|e| {
Error::Execution(format!("Error converting to field list: {}", e))
Error::Context(
"Error converting to field list".to_string(),
Box::new(Error::External(Box::new(e))),
)
})?;
Ok(field_list)
@ -527,9 +530,12 @@ impl IOxSessionContext {
}
// TODO: Stream this
results
.into_fieldlist()
.map_err(|e| Error::Execution(format!("Error converting to field list: {}", e)))
results.into_fieldlist().map_err(|e| {
Error::Context(
"Error converting to field list".to_string(),
Box::new(Error::External(Box::new(e))),
)
})
}
/// Executes this plan on the query pool, and returns the
@ -542,7 +548,12 @@ impl IOxSessionContext {
.run_logical_plans(plans)
.await?
.into_stringset()
.map_err(|e| Error::Execution(format!("Error converting to stringset: {}", e))),
.map_err(|e| {
Error::Context(
"Error converting to stringset".to_string(),
Box::new(Error::External(Box::new(e))),
)
}),
}
}
@ -590,9 +601,12 @@ impl IOxSessionContext {
Fut: std::future::Future<Output = Result<T>> + Send + 'static,
T: Send + 'static,
{
exec.spawn(fut)
.await
.unwrap_or_else(|e| Err(Error::Execution(format!("Join Error: {}", e))))
exec.spawn(fut).await.unwrap_or_else(|e| {
Err(Error::Context(
"Join Error".to_string(),
Box::new(Error::External(Box::new(e))),
))
})
}
/// Returns a IOxSessionContext with a SpanRecorder that is a child of the current

View File

@ -1,6 +1,5 @@
//! Routines for error handling
use datafusion::{arrow::error::ArrowError, error::DataFusionError};
use datafusion::error::DataFusionError;
/// Converts a [`DataFusionError`] into the appropriate [`tonic::Code`]
///
@ -21,27 +20,8 @@ use datafusion::{arrow::error::ArrowError, error::DataFusionError};
/// Basically because I wasn't sure they were all internal errors --
/// for example, you can get an Arrow error if you try and divide a
/// column by zero, depending on the data.
pub fn datafusion_error_to_tonic_code(mut e: &DataFusionError) -> tonic::Code {
// traverse potential error chains
loop {
// traverse context chain without recursion
if let DataFusionError::Context(_msg, inner) = e {
e = inner;
continue;
}
// The Arrow error may itself contain a datafusion error again
// See https://github.com/apache/arrow-datafusion/issues/4172
if let DataFusionError::ArrowError(ArrowError::ExternalError(inner)) = e {
if let Some(inner) = inner.downcast_ref::<DataFusionError>() {
e = inner;
continue;
}
}
// no more traversal
break;
}
pub fn datafusion_error_to_tonic_code(e: &DataFusionError) -> tonic::Code {
let e = e.find_root();
match e {
DataFusionError::ResourcesExhausted(_) => tonic::Code::ResourceExhausted,
@ -56,10 +36,6 @@ pub fn datafusion_error_to_tonic_code(mut e: &DataFusionError) -> tonic::Code {
// Since we are not sure they are all internal errors we
// classify them as InvalidArgument so the user has a chance
// to see them
//
// Potential future TODO: we could inspect the error and
// decide. e.g. For Box<dyn ...> we could downcast the type
// if IOx only puts a single concrete enum in there.
| DataFusionError::Execution(_)
| DataFusionError::ArrowError(_)
| DataFusionError::ParquetError(_)
@ -104,37 +80,36 @@ mod test {
let s = "foo".to_string();
// this is basically a second implementation of the translation table to help avoid mistakes
do_test(
do_transl_test(
DataFusionError::ResourcesExhausted(s.clone()),
tonic::Code::ResourceExhausted,
);
let e = ParserError::ParserError(s.clone());
do_test(DataFusionError::SQL(e), tonic::Code::InvalidArgument);
do_transl_test(DataFusionError::SQL(e), tonic::Code::InvalidArgument);
do_test(
do_transl_test(
DataFusionError::NotImplemented(s.clone()),
tonic::Code::InvalidArgument,
);
do_test(
do_transl_test(
DataFusionError::Plan(s.clone()),
tonic::Code::InvalidArgument,
);
do_test(DataFusionError::Internal(s), tonic::Code::Internal);
}
do_transl_test(DataFusionError::Internal(s), tonic::Code::Internal);
#[test]
fn test_error_context_traversal() {
let inner_error = DataFusionError::ResourcesExhausted("foo".to_string());
do_test(
DataFusionError::Context("it happened!".to_string(), Box::new(inner_error)),
// traversal
do_transl_test(
DataFusionError::Context(
"it happened!".to_string(),
Box::new(DataFusionError::ResourcesExhausted("foo".to_string())),
),
tonic::Code::ResourceExhausted,
);
}
fn do_test(e: DataFusionError, code: tonic::Code) {
fn do_transl_test(e: DataFusionError, code: tonic::Code) {
assert_eq!(datafusion_error_to_tonic_code(&e), code);
}
}