fix: query error propagation (#5801)

- treat OOM protection as "resource exhausted"
- use `DataFusionError` in more places instead of opaque `Box<dyn Error>`
- improve conversion from/into `DataFusionError` to preserve more
  semantics

Overall, this improves our error handling. DF can now return errors like
"resource exhausted" and gRPC should now automatically generate a
sensible status code for it.

Fixes #5799.
pull/24376/head
Marco Neumann 2022-10-06 10:54:01 +02:00 committed by GitHub
parent e81dad972f
commit c4c83e0840
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 326 additions and 195 deletions

View File

@ -4,10 +4,10 @@ use data_types::{
ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId, SequenceNumber,
TableSummary, Timestamp, TimestampMinMax, Tombstone,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
QueryChunk, QueryChunkError, QueryChunkMeta,
QueryChunk, QueryChunkMeta,
};
use observability_deps::tracing::trace;
use parquet_file::chunk::ParquetChunk;
@ -194,7 +194,7 @@ impl QueryChunk for QueryableParquetChunk {
_ctx: IOxSessionContext,
_predicate: &Predicate,
_columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
Ok(None)
}
@ -208,7 +208,7 @@ impl QueryChunk for QueryableParquetChunk {
_ctx: IOxSessionContext,
_column_name: &str,
_predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
Ok(None)
}
@ -230,7 +230,7 @@ impl QueryChunk for QueryableParquetChunk {
mut ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {
) -> Result<SendableRecordBatchStream, DataFusionError> {
ctx.set_metadata("storage", "compactor");
ctx.set_metadata("projection", format!("{}", selection));
trace!(?selection, "selection");
@ -238,7 +238,7 @@ impl QueryChunk for QueryableParquetChunk {
self.data
.read_filter(predicate, selection)
.context(ReadParquetSnafu)
.map_err(|e| Box::new(e) as _)
.map_err(|e| DataFusionError::External(Box::new(e)))
}
/// Returns chunk type

View File

@ -7,7 +7,8 @@ use futures::FutureExt;
use predicates::prelude::*;
use test_helpers::assert_contains;
use test_helpers_end_to_end::{
maybe_skip_integration, run_query, MiniCluster, Step, StepTest, StepTestState, TestConfig,
maybe_skip_integration, run_query, try_run_query, GrpcRequestBuilder, MiniCluster, Step,
StepTest, StepTestState, TestConfig,
};
#[tokio::test]
@ -454,6 +455,87 @@ async fn issue_4631_b() {
.await
}
#[tokio::test]
async fn oom_protection() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let table_name = "the_table";
// Set up the cluster ====================================
let router_config = TestConfig::new_router(&database_url);
let ingester_config = TestConfig::new_ingester(&router_config);
let querier_config =
TestConfig::new_querier(&ingester_config).with_querier_max_table_query_bytes(1);
let mut cluster = MiniCluster::new()
.with_router(router_config)
.await
.with_ingester(ingester_config)
.await
.with_querier(querier_config)
.await;
StepTest::new(
&mut cluster,
vec![
Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123457", table_name)),
Step::WaitForReadable,
Step::AssertNotPersisted,
// SQL query
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let sql = format!("select * from {}", table_name);
let err = try_run_query(
sql,
state.cluster().namespace(),
state.cluster().querier().querier_grpc_connection(),
)
.await
.unwrap_err();
if let influxdb_iox_client::flight::Error::GrpcError(status) = err {
assert_eq!(
status.code(),
tonic::Code::ResourceExhausted,
"Wrong status code: {}\n\nStatus:\n{}",
status.code(),
status,
);
} else {
panic!("Not a gRPC error: {err}");
}
}
.boxed()
})),
// InfluxRPC/storage query
Step::Custom(Box::new(move |state: &mut StepTestState| {
async move {
let mut storage_client = state.cluster().querier_storage_client();
let read_filter_request = GrpcRequestBuilder::new()
.source(state.cluster())
.build_read_filter();
let status = storage_client
.read_filter(read_filter_request)
.await
.unwrap_err();
assert_eq!(
status.code(),
tonic::Code::ResourceExhausted,
"Wrong status code: {}\n\nStatus:\n{}",
status.code(),
status,
);
}
.boxed()
})),
],
)
.run()
.await
}
/// This structure holds information for tests that need to force a parquet file to be persisted
struct ForcePersistenceSetup {
// Set up a cluster that will persist quickly

View File

@ -8,14 +8,17 @@ use data_types::{
ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary,
TimestampMinMax, Tombstone,
};
use datafusion::physical_plan::{
common::SizedRecordBatchStream,
metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics},
SendableRecordBatchStream,
use datafusion::{
error::DataFusionError,
physical_plan::{
common::SizedRecordBatchStream,
metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics},
SendableRecordBatchStream,
},
};
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
QueryChunk, QueryChunkError, QueryChunkMeta,
QueryChunk, QueryChunkMeta,
};
use observability_deps::tracing::trace;
use predicate::{
@ -185,7 +188,7 @@ impl QueryChunk for QueryableBatch {
_ctx: IOxSessionContext,
_predicate: &Predicate,
_columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
Ok(None)
}
@ -199,7 +202,7 @@ impl QueryChunk for QueryableBatch {
_ctx: IOxSessionContext,
_column_name: &str,
_predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
Ok(None)
}
@ -210,12 +213,16 @@ impl QueryChunk for QueryableBatch {
mut ctx: IOxSessionContext,
_predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {
) -> Result<SendableRecordBatchStream, DataFusionError> {
ctx.set_metadata("storage", "ingester");
ctx.set_metadata("projection", format!("{}", selection));
trace!(?selection, "selection");
let schema = self.schema().select(selection).context(SchemaSnafu)?;
let schema = self
.schema()
.select(selection)
.context(SchemaSnafu)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
// Get all record batches from their snapshots
let batches = self
@ -234,7 +241,8 @@ impl QueryChunk for QueryableBatch {
.map(Arc::new);
Some(batch)
})
.collect::<Result<Vec<_>, _>>()?;
.collect::<Result<Vec<_>, _>>()
.map_err(|e| DataFusionError::External(Box::new(e)))?;
// Return stream of data
let dummy_metrics = ExecutionPlanMetricsSet::new();

View File

@ -37,33 +37,11 @@ const CONCURRENT_TABLE_JOBS: usize = 10;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("gRPC planner got error making table_name plan for chunk: {}", source))]
TableNamePlan {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("gRPC planner got error listing partition keys: {}", source))]
ListingPartitions {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("gRPC planner got error finding column names: {}", source))]
FindingColumnNames {
source: Box<dyn std::error::Error + Send + Sync>,
},
FindingColumnNames { source: DataFusionError },
#[snafu(display("gRPC planner got error finding column values: {}", source))]
FindingColumnValues {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display(
"gRPC planner got internal error making table_name with default predicate: {}",
source
))]
InternalTableNamePlanForDefault {
source: Box<dyn std::error::Error + Send + Sync>,
},
FindingColumnValues { source: DataFusionError },
#[snafu(display(
"gRPC planner got error fetching chunks for table '{}': {}",
@ -72,7 +50,7 @@ pub enum Error {
))]
GettingChunks {
table_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display(
@ -82,21 +60,17 @@ pub enum Error {
))]
CheckingChunkPredicate {
chunk_id: ChunkId,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display("gRPC planner got error creating string set plan: {}", source))]
CreatingStringSet { source: StringSetError },
#[snafu(display("gRPC planner got error creating predicates: {}", source))]
CreatingPredicates {
source: datafusion::error::DataFusionError,
},
CreatingPredicates { source: DataFusionError },
#[snafu(display("gRPC planner got error building plan: {}", source))]
BuildingPlan {
source: datafusion::error::DataFusionError,
},
BuildingPlan { source: DataFusionError },
#[snafu(display(
"gRPC planner error: column '{}' is not a tag, it is {:?}",
@ -148,7 +122,7 @@ pub enum Error {
CastingAggregates {
agg: Aggregate,
field_name: String,
source: datafusion::error::DataFusionError,
source: DataFusionError,
},
#[snafu(display("Internal error: unexpected aggregate request for None aggregate",))]
@ -163,6 +137,34 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl Error {
pub fn to_df_error(self, method: &'static str) -> DataFusionError {
let msg = self.to_string();
match self {
Self::GettingChunks { source, .. }
| Self::CreatingPredicates { source, .. }
| Self::BuildingPlan { source, .. }
| Self::CheckingChunkPredicate { source, .. }
| Self::FindingColumnNames { source, .. }
| Self::FindingColumnValues { source, .. }
| Self::CastingAggregates { source, .. } => {
DataFusionError::Context(format!("{method}: {msg}"), Box::new(source))
}
e @ (Self::CreatingStringSet { .. }
| Self::TableRemoved { .. }
| Self::InvalidTagColumn { .. }
| Self::InternalInvalidTagType { .. }
| Self::DuplicateGroupColumn { .. }
| Self::GroupColumnNotFound { .. }
| Self::CreatingAggregates { .. }
| Self::CreatingScan { .. }
| Self::InternalUnexpectedNoneAggregate {}
| Self::InternalAggregateNotSelector { .. }) => DataFusionError::External(Box::new(e)),
}
}
}
impl From<super::common::Error> for Error {
fn from(source: super::common::Error) -> Self {
Self::CreatingScan { source }

View File

@ -14,7 +14,7 @@ use async_trait::async_trait;
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary, TimestampMinMax,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use exec::{stringset::StringSet, IOxSessionContext};
use hashbrown::HashMap;
use observability_deps::tracing::{debug, trace};
@ -141,9 +141,6 @@ impl Drop for QueryCompletedToken {
/// This avoids storing potentially large strings
pub type QueryText = Box<dyn std::fmt::Display + Send + Sync>;
/// Error type for [`QueryDatabase`] operations.
pub type QueryDatabaseError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// A `Database` is the main trait implemented by the IOx subsystems
/// that store actual data.
///
@ -159,7 +156,7 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
table_name: &str,
predicate: &Predicate,
ctx: IOxSessionContext,
) -> Result<Vec<Arc<dyn QueryChunk>>, QueryDatabaseError>;
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError>;
/// Record that particular type of query was run / planned
fn record_query(
@ -175,9 +172,6 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
fn as_meta(&self) -> &dyn QueryDatabaseMeta;
}
/// Error type for [`QueryChunk`] operations.
pub type QueryChunkError = Box<dyn std::error::Error + Send + Sync + 'static>;
/// Collection of data that shares the same partition key
pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
/// returns the Id of this chunk. Ids are unique within a
@ -200,7 +194,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
fn apply_predicate_to_metadata(
&self,
predicate: &Predicate,
) -> Result<PredicateMatch, QueryChunkError> {
) -> Result<PredicateMatch, DataFusionError> {
Ok(self
.summary()
.map(|summary| predicate.apply_to_table_summary(&summary, self.schema().as_arrow()))
@ -216,7 +210,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
ctx: IOxSessionContext,
predicate: &Predicate,
columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError>;
) -> Result<Option<StringSet>, DataFusionError>;
/// Return a set of Strings containing the distinct values in the
/// specified columns. If the predicate can be evaluated entirely
@ -228,7 +222,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
ctx: IOxSessionContext,
column_name: &str,
predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError>;
) -> Result<Option<StringSet>, DataFusionError>;
/// Provides access to raw `QueryChunk` data as an
/// asynchronous stream of `RecordBatch`es filtered by a *required*
@ -248,7 +242,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError>;
) -> Result<SendableRecordBatchStream, DataFusionError>;
/// Returns chunk type. Useful in tests and debug logs.
fn chunk_type(&self) -> &str;

View File

@ -8,8 +8,8 @@ use crate::{
stringset::{StringSet, StringSetRef},
ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext,
},
Predicate, PredicateMatch, QueryChunk, QueryChunkError, QueryChunkMeta, QueryCompletedToken,
QueryDatabase, QueryDatabaseError, QueryText,
Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryCompletedToken, QueryDatabase,
QueryText,
};
use arrow::{
array::{
@ -24,7 +24,7 @@ use data_types::{
ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId, StatValues,
Statistics, TableSummary, TimestampMinMax,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use datafusion_util::stream_from_batches;
use futures::StreamExt;
use hashbrown::HashSet;
@ -109,7 +109,7 @@ impl QueryDatabase for TestDatabase {
table_name: &str,
predicate: &Predicate,
_ctx: IOxSessionContext,
) -> Result<Vec<Arc<dyn QueryChunk>>, QueryDatabaseError> {
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
// save last predicate
*self.chunks_predicate.lock() = predicate.clone();
@ -327,9 +327,9 @@ impl TestChunk {
}
/// Checks the saved error, and returns it if any, otherwise returns OK
fn check_error(&self) -> Result<(), QueryChunkError> {
fn check_error(&self) -> Result<(), DataFusionError> {
if let Some(message) = self.saved_error.as_ref() {
Err(message.clone().into())
Err(DataFusionError::External(message.clone().into()))
} else {
Ok(())
}
@ -921,13 +921,17 @@ impl QueryChunk for TestChunk {
_ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {
) -> Result<SendableRecordBatchStream, DataFusionError> {
self.check_error()?;
// save the predicate
self.predicates.lock().push(predicate.clone());
let batches = match self.schema.df_projection(selection)? {
let batches = match self
.schema
.df_projection(selection)
.map_err(|e| DataFusionError::External(Box::new(e)))?
{
None => self.table_data.clone(),
Some(projection) => self
.table_data
@ -948,7 +952,7 @@ impl QueryChunk for TestChunk {
fn apply_predicate_to_metadata(
&self,
predicate: &Predicate,
) -> Result<PredicateMatch, QueryChunkError> {
) -> Result<PredicateMatch, DataFusionError> {
self.check_error()?;
// save the predicate
@ -967,7 +971,7 @@ impl QueryChunk for TestChunk {
_ctx: IOxSessionContext,
_column_name: &str,
_predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
// Model not being able to get column values from metadata
Ok(None)
}
@ -977,7 +981,7 @@ impl QueryChunk for TestChunk {
_ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
self.check_error()?;
// save the predicate

View File

@ -7,13 +7,16 @@ use arrow::{
use data_types::{
ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
};
use datafusion::physical_plan::{
stream::RecordBatchStreamAdapter, RecordBatchStream, SendableRecordBatchStream,
use datafusion::{
error::DataFusionError,
physical_plan::{
stream::RecordBatchStreamAdapter, RecordBatchStream, SendableRecordBatchStream,
},
};
use futures::{Stream, TryStreamExt};
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
QueryChunk, QueryChunkError, QueryChunkMeta,
QueryChunk, QueryChunkMeta,
};
use observability_deps::tracing::debug;
use predicate::Predicate;
@ -114,7 +117,7 @@ impl QueryChunk for QuerierChunk {
mut ctx: IOxSessionContext,
predicate: &Predicate,
columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
ctx.set_metadata("projection", format!("{}", columns));
ctx.set_metadata("predicate", format!("{}", &predicate));
@ -161,10 +164,10 @@ impl QueryChunk for QuerierChunk {
None
}
Err(other) => {
return Err(Box::new(Error::RBChunk {
return Err(DataFusionError::External(Box::new(Error::RBChunk {
source: other,
chunk_id: self.id(),
}))
})))
}
};
@ -178,7 +181,7 @@ impl QueryChunk for QuerierChunk {
mut ctx: IOxSessionContext,
column_name: &str,
predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
ctx.set_metadata("column_name", column_name.to_string());
ctx.set_metadata("predicate", format!("{}", &predicate));
@ -205,11 +208,13 @@ impl QueryChunk for QuerierChunk {
};
ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate));
let mut values = rb_chunk.column_values(
rb_predicate,
Selection::Some(&[column_name]),
BTreeMap::new(),
)?;
let mut values = rb_chunk
.column_values(
rb_predicate,
Selection::Some(&[column_name]),
BTreeMap::new(),
)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
// The InfluxRPC frontend only supports getting column values
// for one column at a time (this is a restriction on the Influx
@ -221,7 +226,8 @@ impl QueryChunk for QuerierChunk {
.context(ColumnNameNotFoundSnafu {
chunk_id: self.id(),
column_name,
})?;
})
.map_err(|e| DataFusionError::External(Box::new(e)))?;
ctx.set_metadata("output_values", values.len() as i64);
Ok(Some(values))
@ -234,7 +240,7 @@ impl QueryChunk for QuerierChunk {
mut ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {
) -> Result<SendableRecordBatchStream, DataFusionError> {
let span_recorder = SpanRecorder::new(
ctx.span()
.map(|span| span.child("QuerierChunk::read_filter")),

View File

@ -11,6 +11,7 @@ use data_types::{
ChunkId, ChunkOrder, IngesterMapping, PartitionId, SequenceNumber, ShardId, ShardIndex,
TableSummary, TimestampMinMax,
};
use datafusion::error::DataFusionError;
use datafusion_util::MemoryStream;
use futures::{stream::FuturesUnordered, TryStreamExt};
use generated_types::{
@ -24,7 +25,7 @@ use influxdb_iox_client::flight::{
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
util::compute_timenanosecond_min_max,
QueryChunk, QueryChunkError, QueryChunkMeta,
QueryChunk, QueryChunkMeta,
};
use iox_time::{Time, TimeProvider};
use metric::{DurationHistogram, Metric};
@ -1097,7 +1098,7 @@ impl QueryChunk for IngesterChunk {
_ctx: IOxSessionContext,
_predicate: &Predicate,
_columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
// TODO maybe some special handling?
Ok(None)
}
@ -1107,7 +1108,7 @@ impl QueryChunk for IngesterChunk {
_ctx: IOxSessionContext,
_column_name: &str,
_predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
) -> Result<Option<StringSet>, DataFusionError> {
// TODO maybe some special handling?
Ok(None)
}
@ -1117,11 +1118,15 @@ impl QueryChunk for IngesterChunk {
_ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, QueryChunkError> {
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, DataFusionError> {
trace!(?predicate, ?selection, input_batches=?self.batches, "Reading data");
// Apply selection to in-memory batch
let batches = match self.schema.df_projection(selection)? {
let batches = match self
.schema
.df_projection(selection)
.map_err(|e| DataFusionError::External(Box::new(e)))?
{
None => self.batches.clone(),
Some(projection) => self
.batches

View File

@ -11,10 +11,11 @@ use data_types::NamespaceId;
use datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
datasource::TableProvider,
error::DataFusionError,
};
use iox_query::{
exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext},
QueryChunk, QueryCompletedToken, QueryDatabase, QueryDatabaseError, QueryText, DEFAULT_SCHEMA,
QueryChunk, QueryCompletedToken, QueryDatabase, QueryText, DEFAULT_SCHEMA,
};
use observability_deps::tracing::{debug, trace};
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
@ -41,7 +42,7 @@ impl QueryDatabase for QuerierNamespace {
table_name: &str,
predicate: &Predicate,
ctx: IOxSessionContext,
) -> Result<Vec<Arc<dyn QueryChunk>>, QueryDatabaseError> {
) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
debug!(%table_name, %predicate, "Finding chunks for table");
// get table metadata
let table = match self.tables.get(table_name).map(Arc::clone) {
@ -627,7 +628,7 @@ mod tests {
.unwrap_err();
assert_eq!(
err.to_string(),
format!("Cannot build plan: External error: Chunk pruning failed: Query would scan at least {total_size} bytes, more than configured maximum {limit} bytes. Try adjusting your compactor settings or increasing the per query memory limit."),
format!("Cannot build plan: Resources exhausted: Query would scan at least {total_size} bytes, more than configured maximum {limit} bytes. Try adjusting your compactor settings or increasing the per query memory limit."),
);
}

View File

@ -8,6 +8,7 @@ use crate::{
IngesterConnection,
};
use data_types::{ColumnId, PartitionId, ShardIndex, TableId, TimestampMinMax};
use datafusion::error::DataFusionError;
use futures::{join, StreamExt};
use iox_query::pruning::prune_summaries;
use iox_query::{exec::Executor, provider, provider::ChunkPruner, QueryChunk};
@ -65,6 +66,17 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl From<Error> for DataFusionError {
fn from(err: Error) -> Self {
match err {
Error::ChunkPruning {
source: err @ provider::Error::TooMuchData { .. },
} => Self::ResourcesExhausted(err.to_string()),
_ => Self::External(Box::new(err) as _),
}
}
}
/// Args to create a [`QuerierTable`].
pub struct QuerierTableArgs {
pub sharder: Arc<JumpHash<Arc<ShardIndex>>>,

View File

@ -66,8 +66,7 @@ impl TableProvider for QuerierTable {
ctx.child_span("querier table chunks"),
projection,
)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
.await?;
for chunk in chunks {
builder = builder.add_chunk(chunk);

View File

@ -23,6 +23,7 @@ use crate::{
use self::interface::{IngesterPartitionInfo, ParquetFileInfo, TombstoneInfo};
#[derive(Snafu, Debug)]
#[allow(missing_copy_implementations)]
pub enum ReconcileError {
#[snafu(display("Compactor processed file that the querier would need to split apart which is not yet implemented"))]
CompactorConflict,

View File

@ -60,7 +60,7 @@ impl Planner {
planner
.table_names(database, predicate)
.await
.map_err(|e| Error::Plan(format!("table_names error: {}", e)))
.map_err(|e| e.to_df_error("table_names"))
})
.await
}
@ -82,7 +82,7 @@ impl Planner {
planner
.tag_keys(database, predicate)
.await
.map_err(|e| Error::Plan(format!("tag_keys error: {}", e)))
.map_err(|e| e.to_df_error("tag_keys"))
})
.await
}
@ -106,7 +106,7 @@ impl Planner {
planner
.tag_values(database, &tag_name, predicate)
.await
.map_err(|e| Error::Plan(format!("tag_values error: {}", e)))
.map_err(|e| e.to_df_error("tag_values"))
})
.await
}
@ -128,7 +128,7 @@ impl Planner {
planner
.field_columns(database, predicate)
.await
.map_err(|e| Error::Plan(format!("field_columns error: {}", e)))
.map_err(|e| e.to_df_error("field_columns"))
})
.await
}
@ -150,7 +150,7 @@ impl Planner {
planner
.read_filter(database, predicate)
.await
.map_err(|e| Error::Plan(format!("read_filter error: {}", e)))
.map_err(|e| e.to_df_error("read_filter"))
})
.await
}
@ -174,7 +174,7 @@ impl Planner {
planner
.read_group(database, predicate, agg, &group_columns)
.await
.map_err(|e| Error::Plan(format!("read_group error: {}", e)))
.map_err(|e| e.to_df_error("read_group"))
})
.await
}
@ -199,7 +199,7 @@ impl Planner {
planner
.read_window_aggregate(database, predicate, agg, every, offset)
.await
.map_err(|e| Error::Plan(format!("read_window_aggregate error: {}", e)))
.map_err(|e| e.to_df_error("read_window_aggregate"))
})
.await
}

View File

@ -9,7 +9,7 @@ use arrow_flight::{
use arrow_util::optimize::{optimize_record_batch, optimize_schema};
use bytes::{Bytes, BytesMut};
use data_types::{DatabaseName, DatabaseNameError};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan};
use futures::{SinkExt, Stream, StreamExt};
use generated_types::influxdata::iox::querier::v1 as proto;
use iox_query::{
@ -54,7 +54,7 @@ pub enum Error {
))]
Query {
database_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display("Invalid database name: {}", source))]
@ -91,29 +91,40 @@ impl From<Error> for tonic::Status {
Error::Optimize { .. }
| Error::Planning { .. } | Error::Serialization { .. } => warn!(?err, msg),
}
err.to_status()
err.into_status()
}
}
impl Error {
/// Converts a result from the business logic into the appropriate tonic
/// status
fn to_status(&self) -> tonic::Status {
use tonic::Status;
match &self {
Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()),
Self::InvalidTicketLegacy { .. } => Status::invalid_argument(self.to_string()),
Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()),
Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()),
Self::Query { .. } => Status::internal(self.to_string()),
Self::InvalidDatabaseName { .. } => Status::invalid_argument(self.to_string()),
Self::Planning {
source: service_common::planner::Error::External(_),
} => Status::internal(self.to_string()),
Self::Planning { .. } => Status::invalid_argument(self.to_string()),
Self::Optimize { .. } => Status::internal(self.to_string()),
Self::Serialization { .. } => Status::internal(self.to_string()),
}
fn into_status(self) -> tonic::Status {
let msg = self.to_string();
let code = match self {
Self::DatabaseNotFound { .. } => tonic::Code::NotFound,
Self::InvalidTicket { .. }
| Self::InvalidTicketLegacy { .. }
| Self::InvalidQuery { .. }
| Self::InvalidDatabaseName { .. } => tonic::Code::InvalidArgument,
Self::Planning { source, .. } | Self::Query { source, .. } => {
// traverse context chain
let mut source = source;
while let DataFusionError::Context(_msg, inner) = source {
source = *inner;
}
match source {
DataFusionError::ResourcesExhausted(_) => tonic::Code::ResourceExhausted,
DataFusionError::Plan(_) => tonic::Code::InvalidArgument,
DataFusionError::NotImplemented(_) => tonic::Code::Unimplemented,
_ => tonic::Code::Internal,
}
}
Self::Optimize { .. } | Self::Serialization { .. } => tonic::Code::Internal,
};
tonic::Status::new(code, msg)
}
}
@ -334,7 +345,6 @@ impl GetStream {
let mut stream_record_batches = ctx
.execute_stream(Arc::clone(&physical_plan))
.await
.map_err(|e| Box::new(e) as _)
.context(QuerySnafu {
database_name: &database_name,
})?;
@ -382,7 +392,7 @@ impl GetStream {
// failure sending here is OK because we're cutting the stream anyways
tx.send(Err(Error::Query {
database_name: database_name.clone(),
source: Box::new(e),
source: DataFusionError::ArrowError(e),
}
.into()))
.await

View File

@ -12,6 +12,7 @@ use crate::{
StorageService,
};
use data_types::{org_and_bucket_to_database, DatabaseName};
use datafusion::error::DataFusionError;
use futures::Stream;
use generated_types::{
google::protobuf::Empty, literal_or_regex::Value as RegexOrLiteralValue,
@ -54,43 +55,43 @@ pub enum Error {
#[snafu(display("Error listing tables in database '{}': {}", db_name, source))]
ListingTables {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display("Error listing columns in database '{}': {}", db_name, source))]
ListingColumns {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display("Error listing fields in database '{}': {}", db_name, source))]
ListingFields {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display("Error creating series plans for database '{}': {}", db_name, source))]
PlanningFilteringSeries {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display("Error creating group plans for database '{}': {}", db_name, source))]
PlanningGroupSeries {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display("Error running series plans for database '{}': {}", db_name, source))]
FilteringSeries {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display("Error running grouping plans for database '{}': {}", db_name, source))]
GroupingSeries {
db_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display(
@ -102,7 +103,7 @@ pub enum Error {
ListingTagValues {
db_name: String,
tag_name: String,
source: Box<dyn std::error::Error + Send + Sync>,
source: DataFusionError,
},
#[snafu(display("Error converting Predicate '{}: {}", rpc_predicate_string, source))]
@ -177,44 +178,56 @@ impl From<Error> for tonic::Status {
/// status
fn from(err: Error) -> Self {
error!("Error handling gRPC request: {}", err);
err.to_status()
err.into_status()
}
}
impl Error {
/// Converts a result from the business logic into the appropriate tonic
/// status
fn to_status(&self) -> tonic::Status {
match &self {
Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()),
Self::ListingTables { .. } => Status::internal(self.to_string()),
Self::ListingColumns { .. } => {
// TODO: distinguish between input errors and internal errors
Status::invalid_argument(self.to_string())
fn into_status(self) -> tonic::Status {
let msg = self.to_string();
let code = match self {
Self::DatabaseNotFound { .. } => tonic::Code::NotFound,
Self::ListingTables { source, .. }
| Self::ListingColumns { source, .. }
| Self::ListingFields { source, .. }
| Self::PlanningFilteringSeries { source, .. }
| Self::PlanningGroupSeries { source, .. }
| Self::FilteringSeries { source, .. }
| Self::GroupingSeries { source, .. }
| Self::ListingTagValues { source, .. } => {
// traverse context chain
let mut source = source;
while let DataFusionError::Context(_msg, inner) = source {
source = *inner;
}
match source {
DataFusionError::ResourcesExhausted(_) => tonic::Code::ResourceExhausted,
DataFusionError::Plan(_) => tonic::Code::InvalidArgument,
DataFusionError::NotImplemented(_) => tonic::Code::Unimplemented,
_ => tonic::Code::Internal,
}
}
Self::ListingFields { .. } => {
// TODO: distinguish between input errors and internal errors
Status::invalid_argument(self.to_string())
Self::ConvertingPredicate { .. }
| Self::ConvertingReadGroupAggregate { .. }
| Self::ConvertingReadGroupType { .. }
| Self::ConvertingWindowAggregate { .. }
| Self::ConvertingTagKeyInTagValues { .. }
| Self::ComputingGroupedSeriesSet { .. }
| Self::ConvertingFieldList { .. }
| Self::MeasurementLiteralOrRegex { .. }
| Self::MissingTagKeyPredicate {}
| Self::InvalidTagKeyRegex { .. } => tonic::Code::InvalidArgument,
Self::SendingResults { .. } | Self::InternalHintsFieldNotSupported { .. } => {
tonic::Code::Internal
}
Self::PlanningFilteringSeries { .. } => Status::invalid_argument(self.to_string()),
Self::PlanningGroupSeries { .. } => Status::invalid_argument(self.to_string()),
Self::FilteringSeries { .. } => Status::invalid_argument(self.to_string()),
Self::GroupingSeries { .. } => Status::invalid_argument(self.to_string()),
Self::ListingTagValues { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingPredicate { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingReadGroupAggregate { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingReadGroupType { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingWindowAggregate { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingTagKeyInTagValues { .. } => Status::invalid_argument(self.to_string()),
Self::ComputingGroupedSeriesSet { .. } => Status::invalid_argument(self.to_string()),
Self::ConvertingFieldList { .. } => Status::invalid_argument(self.to_string()),
Self::SendingResults { .. } => Status::internal(self.to_string()),
Self::InternalHintsFieldNotSupported { .. } => Status::internal(self.to_string()),
Self::NotYetImplemented { .. } => Status::internal(self.to_string()),
Self::MeasurementLiteralOrRegex { .. } => Status::invalid_argument(self.to_string()),
Self::MissingTagKeyPredicate {} => Status::invalid_argument(self.to_string()),
Self::InvalidTagKeyRegex { .. } => Status::invalid_argument(self.to_string()),
}
Self::NotYetImplemented { .. } => tonic::Code::Unimplemented,
};
tonic::Status::new(code, msg)
}
}
@ -341,7 +354,7 @@ where
&ctx,
)
.await
.map_err(|e| e.to_status())?
.map_err(|e| e.into_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
@ -423,7 +436,7 @@ where
&ctx,
)
.await
.map_err(|e| e.to_status())?
.map_err(|e| e.into_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
@ -489,7 +502,7 @@ where
&ctx,
)
.await
.map_err(|e| e.to_status());
.map_err(|e| e.into_status());
if response.is_ok() {
query_completed_token.set_success();
@ -560,7 +573,7 @@ where
operation: "tag_value for a measurement, with general predicate"
.to_string(),
}
.to_status());
.into_status());
}
measurement_name_impl(Arc::clone(&db), db_name, range, predicate, &ctx).await
@ -593,7 +606,7 @@ where
}
};
let response = response.map_err(|e| e.to_status());
let response = response.map_err(|e| e.into_status());
if response.is_ok() {
query_completed_token.set_success();
@ -652,7 +665,7 @@ where
let results =
tag_values_grouped_by_measurement_and_tag_key_impl(Arc::clone(&db), db_name, req, &ctx)
.await
.map_err(|e| e.to_status())?
.map_err(|e| e.into_status())?
.into_iter()
.map(Ok)
.collect::<Vec<_>>();
@ -762,7 +775,7 @@ where
let response = measurement_name_impl(Arc::clone(&db), db_name, range, predicate, &ctx)
.await
.map_err(|e| e.to_status());
.map_err(|e| e.into_status());
if response.is_ok() {
query_completed_token.set_success();
@ -833,7 +846,7 @@ where
&ctx,
)
.await
.map_err(|e| e.to_status());
.map_err(|e| e.into_status());
if response.is_ok() {
query_completed_token.set_success();
@ -907,7 +920,7 @@ where
&ctx,
)
.await
.map_err(|e| e.to_status());
.map_err(|e| e.into_status());
if response.is_ok() {
query_completed_token.set_success();
@ -981,9 +994,9 @@ where
.map(|fieldlist| {
fieldlist_to_measurement_fields_response(fieldlist)
.context(ConvertingFieldListSnafu)
.map_err(|e| e.to_status())
.map_err(|e| e.into_status())
})
.map_err(|e| e.to_status())?;
.map_err(|e| e.into_status())?;
if response.is_ok() {
query_completed_token.set_success();
@ -1048,13 +1061,11 @@ where
let plan = Planner::new(ctx)
.table_names(db, predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTablesSnafu { db_name })?;
let table_names = ctx
.to_string_set(plan)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTablesSnafu { db_name })?;
// Map the resulting collection of Strings into a Vec<Vec<u8>>for return
@ -1095,13 +1106,11 @@ where
let tag_key_plan = Planner::new(ctx)
.tag_keys(db, predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingColumnsSnafu { db_name })?;
let tag_keys = ctx
.to_string_set(tag_key_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingColumnsSnafu { db_name })?;
// Map the resulting collection of Strings into a Vec<Vec<u8>>for return
@ -1142,13 +1151,11 @@ where
let tag_value_plan = Planner::new(ctx)
.tag_values(db, tag_name, predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTagValuesSnafu { db_name, tag_name })?;
let tag_values = ctx
.to_string_set(tag_value_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTagValuesSnafu { db_name, tag_name })?;
// Map the resulting collection of Strings into a Vec<Vec<u8>>for return
@ -1266,14 +1273,12 @@ where
let series_plan = Planner::new(ctx)
.read_filter(db, predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(PlanningFilteringSeriesSnafu { db_name })?;
// Execute the plans.
let series_or_groups = ctx
.to_series_and_groups(series_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(FilteringSeriesSnafu { db_name })
.log_if_error("Running series set plan")?;
@ -1319,9 +1324,8 @@ where
.await
}
};
let grouped_series_set_plan = grouped_series_set_plan
.map_err(|e| Box::new(e) as _)
.context(PlanningGroupSeriesSnafu { db_name })?;
let grouped_series_set_plan =
grouped_series_set_plan.context(PlanningGroupSeriesSnafu { db_name })?;
// PERF - This used to send responses to the client before execution had
// completed, but now it doesn't. We may need to revisit this in the future
@ -1331,7 +1335,6 @@ where
let series_or_groups = ctx
.to_series_and_groups(grouped_series_set_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(GroupingSeriesSnafu { db_name })
.log_if_error("Running Grouped SeriesSet Plan")?;
@ -1370,13 +1373,11 @@ where
let field_list_plan = Planner::new(ctx)
.field_columns(db, predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingFieldsSnafu { db_name })?;
let field_list = ctx
.to_field_list(field_list_plan)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingFieldsSnafu { db_name })?;
trace!(field_names=?field_list, "Field names response");
@ -1878,7 +1879,7 @@ mod tests {
let response = fixture.storage_client.tag_keys(request).await;
assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down");
grpc_request_metric_has_count(&fixture, "TagKeys", "client_error", 1);
grpc_request_metric_has_count(&fixture, "TagKeys", "server_error", 1);
}
/// test the plumbing of the RPC layer for measurement_tag_keys--
@ -1984,7 +1985,7 @@ mod tests {
let response = fixture.storage_client.measurement_tag_keys(request).await;
assert_contains!(response.unwrap_err().to_string(), "This is an error");
grpc_request_metric_has_count(&fixture, "MeasurementTagKeys", "client_error", 1);
grpc_request_metric_has_count(&fixture, "MeasurementTagKeys", "server_error", 1);
}
/// test the plumbing of the RPC layer for tag_values -- specifically that
@ -2173,7 +2174,8 @@ mod tests {
"Error converting tag_key to UTF-8 in tag_values request"
);
grpc_request_metric_has_count(&fixture, "TagValues", "client_error", 2);
grpc_request_metric_has_count(&fixture, "TagValues", "client_error", 1);
grpc_request_metric_has_count(&fixture, "TagValues", "server_error", 1);
}
#[tokio::test]
@ -2524,7 +2526,7 @@ mod tests {
assert_contains!(response_string, "Sugar we are going down");
grpc_request_metric_has_count(&fixture, "MeasurementTagValues", "client_error", 1);
grpc_request_metric_has_count(&fixture, "MeasurementTagValues", "server_error", 1);
}
#[tokio::test]
@ -2730,7 +2732,7 @@ mod tests {
let response = fixture.storage_client.read_filter(request).await;
assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down");
grpc_request_metric_has_count(&fixture, "ReadFilter", "client_error", 1);
grpc_request_metric_has_count(&fixture, "ReadFilter", "server_error", 1);
}
#[tokio::test]
@ -2822,7 +2824,7 @@ mod tests {
.to_string();
assert_contains!(response_string, "Sugar we are going down");
grpc_request_metric_has_count(&fixture, "ReadGroup", "client_error", 1);
grpc_request_metric_has_count(&fixture, "ReadGroup", "server_error", 1);
}
#[tokio::test]
@ -2988,7 +2990,7 @@ mod tests {
assert_contains!(response_string, "Sugar we are going down");
grpc_request_metric_has_count(&fixture, "ReadWindowAggregate", "client_error", 1);
grpc_request_metric_has_count(&fixture, "ReadWindowAggregate", "server_error", 1);
}
#[tokio::test]

View File

@ -290,6 +290,11 @@ impl TestConfig {
self.with_env("INFLUXDB_IOX_FLIGHT_DO_GET_PANIC", times.to_string())
}
/// Set the per-table query memory limit (in bytes) enforced by the querier,
/// by exporting the corresponding environment variable for the test config.
pub fn with_querier_max_table_query_bytes(self, bytes: usize) -> Self {
    // `with_env` expects the value as a string, so render the byte count first.
    let limit = bytes.to_string();
    self.with_env("INFLUXDB_IOX_MAX_TABLE_QUERY_BYTES", limit)
}
/// Changes the log to JSON for easier parsing.
pub fn with_json_logs(self) -> Self {
self.with_env("LOG_FORMAT", "json")