fix: Return better errors instead of unwrapping

pull/24376/head
Carol (Nichols || Goulding) 2021-02-08 16:36:09 -05:00
parent 261a05b55b
commit afdb0c8274
6 changed files with 108 additions and 26 deletions

View File

@ -136,7 +136,7 @@ impl FlightClientBuilder {
T: std::convert::TryInto<tonic::transport::Endpoint>, T: std::convert::TryInto<tonic::transport::Endpoint>,
T::Error: Into<tonic::codegen::StdError>, T::Error: Into<tonic::codegen::StdError>,
{ {
Ok(FlightClient::connect(flight_url).await?) Ok(FlightClient::connect(flight_url).await.map_err(Box::new)?)
} }
} }

View File

@ -15,6 +15,8 @@ use serde::Serialize;
use std::{convert::TryFrom, sync::Arc}; use std::{convert::TryFrom, sync::Arc};
use tonic::Streaming; use tonic::Streaming;
use crate::errors::{GrpcError, GrpcQueryError};
/// An IOx Arrow Flight gRPC API client. /// An IOx Arrow Flight gRPC API client.
/// ///
/// ```rust /// ```rust
@ -44,7 +46,7 @@ pub struct FlightClient {
} }
impl FlightClient { impl FlightClient {
pub(crate) async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error> pub(crate) async fn connect<D>(dst: D) -> Result<Self, GrpcError>
where where
D: std::convert::TryInto<tonic::transport::Endpoint>, D: std::convert::TryInto<tonic::transport::Endpoint>,
D::Error: Into<tonic::codegen::StdError>, D::Error: Into<tonic::codegen::StdError>,
@ -60,7 +62,7 @@ impl FlightClient {
&mut self, &mut self,
database_name: impl Into<String>, database_name: impl Into<String>,
sql_query: impl Into<String>, sql_query: impl Into<String>,
) -> PerformQuery { ) -> Result<PerformQuery, GrpcQueryError> {
PerformQuery::new(self, database_name.into(), sql_query.into()).await PerformQuery::new(self, database_name.into(), sql_query.into()).await
} }
} }
@ -87,62 +89,69 @@ impl PerformQuery {
flight: &mut FlightClient, flight: &mut FlightClient,
database_name: String, database_name: String,
sql_query: String, sql_query: String,
) -> Self { ) -> Result<Self, GrpcQueryError> {
let query = ReadInfo { let query = ReadInfo {
database_name, database_name,
sql_query, sql_query,
}; };
let t = Ticket { let t = Ticket {
ticket: serde_json::to_string(&query).unwrap().into(), ticket: serde_json::to_string(&query)?.into(),
}; };
let mut response = flight.inner.do_get(t).await.unwrap().into_inner(); let mut response = flight.inner.do_get(t).await?.into_inner();
let flight_data_schema = response.next().await.unwrap().unwrap(); let flight_data_schema = response.next().await.ok_or(GrpcQueryError::NoSchema)??;
let schema = Arc::new(Schema::try_from(&flight_data_schema).unwrap()); let schema = Arc::new(Schema::try_from(&flight_data_schema)?);
let dictionaries_by_field = vec![None; schema.fields().len()]; let dictionaries_by_field = vec![None; schema.fields().len()];
Self { Ok(Self {
schema, schema,
dictionaries_by_field, dictionaries_by_field,
response, response,
} })
} }
/// Returns the next `RecordBatch` available for this query, or `None` if /// Returns the next `RecordBatch` available for this query, or `None` if
/// there are no further results available. /// there are no further results available.
pub async fn next(&mut self) -> Option<RecordBatch> { pub async fn next(&mut self) -> Result<Option<RecordBatch>, GrpcQueryError> {
let Self { let Self {
schema, schema,
dictionaries_by_field, dictionaries_by_field,
response, response,
} = self; } = self;
let data = response.next().await?; let mut data = match response.next().await {
Some(d) => d?,
None => return Ok(None),
};
let mut data = data.unwrap(); let mut message = ipc::root_as_message(&data.data_header[..])
let mut message = .map_err(|e| GrpcQueryError::InvalidFlatbuffer(e.to_string()))?;
ipc::root_as_message(&data.data_header[..]).expect("Error parsing first message");
while message.header_type() == ipc::MessageHeader::DictionaryBatch { while message.header_type() == ipc::MessageHeader::DictionaryBatch {
reader::read_dictionary( reader::read_dictionary(
&data.data_body, &data.data_body,
message message
.header_as_dictionary_batch() .header_as_dictionary_batch()
.expect("Error parsing dictionary"), .ok_or(GrpcQueryError::CouldNotGetDictionaryBatch)?,
&schema, &schema,
dictionaries_by_field, dictionaries_by_field,
) )?;
.expect("Error reading dictionary");
data = response.next().await.unwrap().ok().unwrap(); data = match response.next().await {
message = ipc::root_as_message(&data.data_header[..]).expect("Error parsing message"); Some(d) => d?,
None => return Ok(None),
};
message = ipc::root_as_message(&data.data_header[..])
.map_err(|e| GrpcQueryError::InvalidFlatbuffer(e.to_string()))?;
} }
Some( Ok(Some(flight_data_to_arrow_batch(
flight_data_to_arrow_batch(&data, schema.clone(), &dictionaries_by_field) &data,
.expect("Unable to convert flight data to Arrow batch"), schema.clone(),
) &dictionaries_by_field,
)?))
} }
} }

View File

@ -0,0 +1,25 @@
/// A gRPC request error.
///
/// This is a non-application level error returned when a gRPC request to the
/// IOx server has failed.
#[derive(Debug)]
pub struct GrpcError(tonic::transport::Error);
impl std::fmt::Display for GrpcError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::error::Error for GrpcError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.0)
}
}
/// Convert errors from the underlying gRPC client into `GrpcError` instances.
impl From<tonic::transport::Error> for GrpcError {
fn from(v: tonic::transport::Error) -> Self {
Self(v)
}
}

View File

@ -0,0 +1,34 @@
use thiserror::Error;
/// Error responses when querying an IOx database using the Arrow Flight gRPC
/// API.
#[derive(Debug, Error)]
pub enum GrpcQueryError {
/// An error occurred while serializing the query.
#[error(transparent)]
QuerySerializeError(#[from] serde_json::Error),
/// There were no FlightData messages returned when we expected to get one
/// containing a Schema.
#[error("no FlightData containing a Schema returned")]
NoSchema,
/// An error involving an Arrow operation occurred.
#[error(transparent)]
ArrowError(#[from] arrow_deps::arrow::error::ArrowError),
/// The data contained invalid Flatbuffers.
#[error("Invalid Flatbuffer: `{0}`")]
InvalidFlatbuffer(String),
/// The message header said it was a dictionary batch, but interpreting the
/// message as a dictionary batch returned `None`. Indicates malformed
/// Flight data from the server.
#[error("Message with header of type dictionary batch could not return a dictionary batch")]
CouldNotGetDictionaryBatch,
/// An unknown server error occurred. Contains the `tonic::Status` returned
/// from the server.
#[error(transparent)]
GrpcError(#[from] tonic::Status),
}

View File

@ -17,6 +17,9 @@
//! The last case is a generic error returned by the IOx server. These become //! The last case is a generic error returned by the IOx server. These become
//! [`ServerErrorResponse`] instances and contain the error string, optional //! [`ServerErrorResponse`] instances and contain the error string, optional
//! error code and HTTP status code sent by the server. //! error code and HTTP status code sent by the server.
//!
//! If using the Arrow Flight API, errors from gRPC requests will be converted
//! into a [`GrpcError`] containing details of the failed request.
use thiserror::Error; use thiserror::Error;
@ -32,6 +35,16 @@ pub use server_error_response::*;
mod create_database; mod create_database;
pub use create_database::*; pub use create_database::*;
#[cfg(feature = "flight")]
mod grpc_error;
#[cfg(feature = "flight")]
pub use grpc_error::*;
#[cfg(feature = "flight")]
mod grpc_query_error;
#[cfg(feature = "flight")]
pub use grpc_query_error::*;
/// Constants used in API error codes. /// Constants used in API error codes.
/// ///
/// Expressing this as a enum prevents reuse of discriminants, and as they're /// Expressing this as a enum prevents reuse of discriminants, and as they're

View File

@ -9,11 +9,12 @@ pub async fn test(scenario: &Scenario, sql_query: &str, expected_read_data: &[St
.unwrap(); .unwrap();
let mut query_results = client let mut query_results = client
.perform_query(scenario.database_name(), sql_query) .perform_query(scenario.database_name(), sql_query)
.await; .await
.unwrap();
let mut batches = vec![]; let mut batches = vec![];
while let Some(data) = query_results.next().await { while let Some(data) = query_results.next().await.unwrap() {
batches.push(data); batches.push(data);
} }