From ab4f91011191968567a7dc08dd8243d85caf39e1 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 2 Dec 2022 11:25:30 +0000 Subject: [PATCH] refactor: improve DF error handling (#6311) This is required to extract "resource exhausted" errors in more cases. --- datafusion_util/src/watch.rs | 13 +++++++-- iox_query/src/exec/context.rs | 30 ++++++++++++++----- service_common/src/error.rs | 55 ++++++++++------------------------- 3 files changed, 47 insertions(+), 51 deletions(-) diff --git a/datafusion_util/src/watch.rs b/datafusion_util/src/watch.rs index e6a01d0e99..95dfc8ccc8 100644 --- a/datafusion_util/src/watch.rs +++ b/datafusion_util/src/watch.rs @@ -62,11 +62,17 @@ async fn watch_task( let msg = match task_result { Err(join_err) => { debug!(e=%join_err, %description, "Error joining"); - Some(format!("Join error for '{description}': {join_err}")) + Some(DataFusionError::Context( + format!("Join error for '{description}'"), + Box::new(DataFusionError::External(Box::new(join_err))), + )) } Ok(Err(e)) => { debug!(%e, %description, "Error in task itself"); - Some(format!("Execution error for '{description}': {e}")) + Some(DataFusionError::Context( + format!("Execution error for '{description}'"), + Box::new(DataFusionError::ArrowError(e)), + )) } Ok(Ok(())) => { // successful @@ -76,12 +82,13 @@ async fn watch_task( // If there is a message to send down the channel, try and do so if let Some(e) = msg { + let e = Arc::new(e); for tx in tx { // try and tell the receiver something went // wrong. 
Note we ignore errors sending this message // as that means the receiver has already been // shutdown and no one cares anymore - let err: ArrowError = DataFusionError::Execution(e.clone()).into(); + let err = ArrowError::ExternalError(Box::new(Arc::clone(&e))); if tx.send(Err(err)).await.is_err() { debug!(%description, "receiver hung up"); } diff --git a/iox_query/src/exec/context.rs b/iox_query/src/exec/context.rs index 294731821a..f8de905b78 100644 --- a/iox_query/src/exec/context.rs +++ b/iox_query/src/exec/context.rs @@ -504,7 +504,10 @@ impl IOxSessionContext { .await? .into_fieldlist() .map_err(|e| { - Error::Execution(format!("Error converting to field list: {}", e)) + Error::Context( + "Error converting to field list".to_string(), + Box::new(Error::External(Box::new(e))), + ) })?; Ok(field_list) @@ -527,9 +530,12 @@ impl IOxSessionContext { } // TODO: Stream this - results - .into_fieldlist() - .map_err(|e| Error::Execution(format!("Error converting to field list: {}", e))) + results.into_fieldlist().map_err(|e| { + Error::Context( + "Error converting to field list".to_string(), + Box::new(Error::External(Box::new(e))), + ) + }) } /// Executes this plan on the query pool, and returns the @@ -542,7 +548,12 @@ impl IOxSessionContext { .run_logical_plans(plans) .await? 
.into_stringset() - .map_err(|e| Error::Execution(format!("Error converting to stringset: {}", e))), + .map_err(|e| { + Error::Context( + "Error converting to stringset".to_string(), + Box::new(Error::External(Box::new(e))), + ) + }), } } @@ -590,9 +601,12 @@ impl IOxSessionContext { Fut: std::future::Future<Output = Result<T>> + Send + 'static, T: Send + 'static, { - exec.spawn(fut) - .await - .unwrap_or_else(|e| Err(Error::Execution(format!("Join Error: {}", e)))) + exec.spawn(fut).await.unwrap_or_else(|e| { + Err(Error::Context( + "Join Error".to_string(), + Box::new(Error::External(Box::new(e))), + )) + }) } /// Returns a IOxSessionContext with a SpanRecorder that is a child of the current diff --git a/service_common/src/error.rs b/service_common/src/error.rs index c662972d63..30b1ca1bee 100644 --- a/service_common/src/error.rs +++ b/service_common/src/error.rs @@ -1,6 +1,5 @@ //! Routines for error handling - -use datafusion::{arrow::error::ArrowError, error::DataFusionError}; +use datafusion::error::DataFusionError; /// Converts a [`DataFusionError`] into the appropriate [`tonic::Code`] /// @@ -21,27 +20,8 @@ use datafusion::{arrow::error::ArrowError, error::DataFusionError}; /// Basically because I wasn't sure they were all internal errors -- /// for example, you can get an Arrow error if you try and divide a /// column by zero, depending on the data. 
-pub fn datafusion_error_to_tonic_code(mut e: &DataFusionError) -> tonic::Code { - // traverse potential error chains - loop { - // traverse context chain without recursion - if let DataFusionError::Context(_msg, inner) = e { - e = inner; - continue; - } - - // The Arrow error may itself contain a datafusion error again - // See https://github.com/apache/arrow-datafusion/issues/4172 - if let DataFusionError::ArrowError(ArrowError::ExternalError(inner)) = e { - if let Some(inner) = inner.downcast_ref::<DataFusionError>() { - e = inner; - continue; - } - } - - // no more traversal - break; - } +pub fn datafusion_error_to_tonic_code(e: &DataFusionError) -> tonic::Code { + let e = e.find_root(); match e { DataFusionError::ResourcesExhausted(_) => tonic::Code::ResourceExhausted, @@ -56,10 +36,6 @@ pub fn datafusion_error_to_tonic_code(mut e: &DataFusionError) -> tonic::Code { // Since we are not sure they are all internal errors we // classify them as InvalidArgument so the user has a chance // to see them - // - // Potential future TODO: we could inspect the error and - // decide. e.g. For Box<dyn Error> we could downcast the type - // if IOx only puts a single concrete enum in there. 
| DataFusionError::Execution(_) | DataFusionError::ArrowError(_) | DataFusionError::ParquetError(_) @@ -104,37 +80,36 @@ mod test { let s = "foo".to_string(); // this is basically a second implementation of the translation table to help avoid mistakes - do_test( + do_transl_test( DataFusionError::ResourcesExhausted(s.clone()), tonic::Code::ResourceExhausted, ); let e = ParserError::ParserError(s.clone()); - do_test(DataFusionError::SQL(e), tonic::Code::InvalidArgument); + do_transl_test(DataFusionError::SQL(e), tonic::Code::InvalidArgument); - do_test( + do_transl_test( DataFusionError::NotImplemented(s.clone()), tonic::Code::InvalidArgument, ); - do_test( + do_transl_test( DataFusionError::Plan(s.clone()), tonic::Code::InvalidArgument, ); - do_test(DataFusionError::Internal(s), tonic::Code::Internal); - } + do_transl_test(DataFusionError::Internal(s), tonic::Code::Internal); - #[test] - fn test_error_context_traversal() { - let inner_error = DataFusionError::ResourcesExhausted("foo".to_string()); - - do_test( - DataFusionError::Context("it happened!".to_string(), Box::new(inner_error)), + // traversal + do_transl_test( + DataFusionError::Context( + "it happened!".to_string(), + Box::new(DataFusionError::ResourcesExhausted("foo".to_string())), + ), tonic::Code::ResourceExhausted, ); } - fn do_test(e: DataFusionError, code: tonic::Code) { + fn do_transl_test(e: DataFusionError, code: tonic::Code) { assert_eq!(datafusion_error_to_tonic_code(&e), code); } }