diff --git a/compactor/src/query.rs b/compactor/src/query.rs index ea6e219d4e..20a8d068cc 100644 --- a/compactor/src/query.rs +++ b/compactor/src/query.rs @@ -4,10 +4,10 @@ use data_types::{ ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId, SequenceNumber, TableSummary, Timestamp, TimestampMinMax, Tombstone, }; -use datafusion::physical_plan::SendableRecordBatchStream; +use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; use iox_query::{ exec::{stringset::StringSet, IOxSessionContext}, - QueryChunk, QueryChunkError, QueryChunkMeta, + QueryChunk, QueryChunkMeta, }; use observability_deps::tracing::trace; use parquet_file::chunk::ParquetChunk; @@ -194,7 +194,7 @@ impl QueryChunk for QueryableParquetChunk { _ctx: IOxSessionContext, _predicate: &Predicate, _columns: Selection<'_>, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { Ok(None) } @@ -208,7 +208,7 @@ impl QueryChunk for QueryableParquetChunk { _ctx: IOxSessionContext, _column_name: &str, _predicate: &Predicate, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { Ok(None) } @@ -230,7 +230,7 @@ impl QueryChunk for QueryableParquetChunk { mut ctx: IOxSessionContext, predicate: &Predicate, selection: Selection<'_>, - ) -> Result { + ) -> Result { ctx.set_metadata("storage", "compactor"); ctx.set_metadata("projection", format!("{}", selection)); trace!(?selection, "selection"); @@ -238,7 +238,7 @@ impl QueryChunk for QueryableParquetChunk { self.data .read_filter(predicate, selection) .context(ReadParquetSnafu) - .map_err(|e| Box::new(e) as _) + .map_err(|e| DataFusionError::External(Box::new(e))) } /// Returns chunk type diff --git a/influxdb_iox/tests/end_to_end_cases/querier.rs b/influxdb_iox/tests/end_to_end_cases/querier.rs index d5f1cfbe0e..b189098a64 100644 --- a/influxdb_iox/tests/end_to_end_cases/querier.rs +++ b/influxdb_iox/tests/end_to_end_cases/querier.rs @@ -7,7 +7,8 @@ use futures::FutureExt; use predicates::prelude::*; use test_helpers::assert_contains; use test_helpers_end_to_end::{ - maybe_skip_integration, run_query, MiniCluster, Step, StepTest, StepTestState, TestConfig, + maybe_skip_integration, run_query, try_run_query, GrpcRequestBuilder, MiniCluster, Step, + StepTest, StepTestState, TestConfig, }; #[tokio::test] @@ -454,6 +455,87 @@ async fn issue_4631_b() { .await } +#[tokio::test] +async fn oom_protection() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let table_name = "the_table"; + + // Set up the cluster ==================================== + let router_config = TestConfig::new_router(&database_url); + let ingester_config = TestConfig::new_ingester(&router_config); + let querier_config = + TestConfig::new_querier(&ingester_config).with_querier_max_table_query_bytes(1); + let mut cluster = MiniCluster::new() + .with_router(router_config) + .await + .with_ingester(ingester_config) + .await + .with_querier(querier_config) + .await; + + StepTest::new( + &mut cluster, + vec![ + Step::WriteLineProtocol(format!("{},tag1=A,tag2=B val=42i 123457", table_name)), + Step::WaitForReadable, + Step::AssertNotPersisted, + // SQL query + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let sql = format!("select * from {}", table_name); + let err = try_run_query( + sql, + state.cluster().namespace(), + state.cluster().querier().querier_grpc_connection(), + ) + .await + .unwrap_err(); + + if let influxdb_iox_client::flight::Error::GrpcError(status) = err { + assert_eq!( + status.code(), + tonic::Code::ResourceExhausted, + "Wrong status code: {}\n\nStatus:\n{}", + status.code(), + status, + ); + } else { + panic!("Not a gRPC error: {err}"); + } + } + .boxed() + })), + // InfluxRPC/storage query + Step::Custom(Box::new(move |state: &mut StepTestState| { + async move { + let mut storage_client = state.cluster().querier_storage_client(); + + let read_filter_request = GrpcRequestBuilder::new() + .source(state.cluster()) + .build_read_filter(); + + let status = storage_client + .read_filter(read_filter_request) + .await + .unwrap_err(); + assert_eq!( + status.code(), + tonic::Code::ResourceExhausted, + "Wrong status code: {}\n\nStatus:\n{}", + status.code(), + status, + ); + } + .boxed() + })), + ], + ) + .run() + .await +} + /// This structure holds information for tests that need to force a parquet file to be persisted struct ForcePersistenceSetup { // Set up a cluster that will will persist quickly diff --git a/ingester/src/query.rs b/ingester/src/query.rs index 747ff4666c..1829ecf4ae 100644 --- a/ingester/src/query.rs +++ b/ingester/src/query.rs @@ -8,14 +8,17 @@ use data_types::{ ChunkId, ChunkOrder, DeletePredicate, PartitionId, SequenceNumber, TableSummary, TimestampMinMax, Tombstone, }; -use datafusion::physical_plan::{ - common::SizedRecordBatchStream, - metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}, - SendableRecordBatchStream, +use datafusion::{ + error::DataFusionError, + physical_plan::{ + common::SizedRecordBatchStream, + metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}, + SendableRecordBatchStream, + }, }; use iox_query::{ exec::{stringset::StringSet, IOxSessionContext}, - QueryChunk, QueryChunkError, QueryChunkMeta, + QueryChunk, QueryChunkMeta, }; use observability_deps::tracing::trace; use predicate::{ @@ -185,7 +188,7 @@ impl QueryChunk for QueryableBatch { _ctx: IOxSessionContext, _predicate: &Predicate, _columns: Selection<'_>, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { Ok(None) } @@ -199,7 +202,7 @@ impl QueryChunk for QueryableBatch { _ctx: IOxSessionContext, _column_name: &str, _predicate: &Predicate, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { Ok(None) } @@ -210,12 +213,16 @@ impl QueryChunk for QueryableBatch { mut ctx: IOxSessionContext, _predicate: &Predicate, selection: Selection<'_>, - ) -> Result { + ) -> Result { ctx.set_metadata("storage", "ingester"); ctx.set_metadata("projection", format!("{}", selection)); trace!(?selection, "selection"); - let schema = self.schema().select(selection).context(SchemaSnafu)?; + let schema = self + .schema() + .select(selection) + .context(SchemaSnafu) + .map_err(|e| DataFusionError::External(Box::new(e)))?; // Get all record batches from their snapshots let batches = self @@ -234,7 +241,8 @@ impl QueryChunk for QueryableBatch { .map(Arc::new); Some(batch) }) - .collect::, _>>()?; + .collect::, _>>() + .map_err(|e| DataFusionError::External(Box::new(e)))?; // Return stream of data let dummy_metrics = ExecutionPlanMetricsSet::new(); diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index 0940aff71b..1018eed91e 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -37,33 +37,11 @@ const CONCURRENT_TABLE_JOBS: usize = 10; #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("gRPC planner got error making table_name plan for chunk: {}", source))] - TableNamePlan { - source: Box, - }, - - #[snafu(display("gRPC planner got error listing partition keys: {}", source))] - ListingPartitions { - source: Box, - }, - #[snafu(display("gRPC planner got error finding column names: {}", source))] - FindingColumnNames { - source: Box, - }, + FindingColumnNames { source: DataFusionError }, #[snafu(display("gRPC planner got error finding column values: {}", source))] - FindingColumnValues { - source: Box, - }, - - #[snafu(display( - "gRPC planner got internal error making table_name with default predicate: {}", - source - ))] - InternalTableNamePlanForDefault { - source: Box, - }, + FindingColumnValues { source: DataFusionError }, #[snafu(display( "gRPC planner got error fetching chunks for table '{}': {}", @@ -72,7 +50,7 @@ pub enum Error { ))] GettingChunks { table_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display( @@ -82,21 +60,17 @@ pub enum Error { ))] CheckingChunkPredicate { chunk_id: ChunkId, - source: Box, + source: DataFusionError, }, #[snafu(display("gRPC planner got error creating string set plan: {}", source))] CreatingStringSet { source: StringSetError }, #[snafu(display("gRPC planner got error creating predicates: {}", source))] - CreatingPredicates { - source: datafusion::error::DataFusionError, - }, + CreatingPredicates { source: DataFusionError }, #[snafu(display("gRPC planner got error building plan: {}", source))] - BuildingPlan { - source: datafusion::error::DataFusionError, - }, + BuildingPlan { source: DataFusionError }, #[snafu(display( "gRPC planner error: column '{}' is not a tag, it is {:?}", @@ -148,7 +122,7 @@ pub enum Error { CastingAggregates { agg: Aggregate, field_name: String, - source: datafusion::error::DataFusionError, + source: DataFusionError, }, #[snafu(display("Internal error: unexpected aggregate request for None aggregate",))] @@ -163,6 +137,34 @@ pub enum Error { pub type Result = std::result::Result; +impl Error { + pub fn to_df_error(self, method: &'static str) -> DataFusionError { + let msg = self.to_string(); + + match self { + Self::GettingChunks { source, .. } + | Self::CreatingPredicates { source, .. } + | Self::BuildingPlan { source, .. } + | Self::CheckingChunkPredicate { source, .. } + | Self::FindingColumnNames { source, .. } + | Self::FindingColumnValues { source, .. } + | Self::CastingAggregates { source, .. } => { + DataFusionError::Context(format!("{method}: {msg}"), Box::new(source)) + } + e @ (Self::CreatingStringSet { .. } + | Self::TableRemoved { .. } + | Self::InvalidTagColumn { .. } + | Self::InternalInvalidTagType { .. } + | Self::DuplicateGroupColumn { .. } + | Self::GroupColumnNotFound { .. } + | Self::CreatingAggregates { .. } + | Self::CreatingScan { .. } + | Self::InternalUnexpectedNoneAggregate {} + | Self::InternalAggregateNotSelector { .. }) => DataFusionError::External(Box::new(e)), + } + } +} + impl From for Error { fn from(source: super::common::Error) -> Self { Self::CreatingScan { source } diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index a0bd37a68b..b9d09544a3 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -14,7 +14,7 @@ use async_trait::async_trait; use data_types::{ ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary, TimestampMinMax, }; -use datafusion::physical_plan::SendableRecordBatchStream; +use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; use exec::{stringset::StringSet, IOxSessionContext}; use hashbrown::HashMap; use observability_deps::tracing::{debug, trace}; @@ -141,9 +141,6 @@ impl Drop for QueryCompletedToken { /// This avoids storing potentially large strings pub type QueryText = Box; -/// Error type for [`QueryDatabase`] operations. -pub type QueryDatabaseError = Box; - /// A `Database` is the main trait implemented by the IOx subsystems /// that store actual data. /// @@ -159,7 +156,7 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync { table_name: &str, predicate: &Predicate, ctx: IOxSessionContext, - ) -> Result>, QueryDatabaseError>; + ) -> Result>, DataFusionError>; /// Record that particular type of query was run / planned fn record_query( @@ -175,9 +172,6 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync { fn as_meta(&self) -> &dyn QueryDatabaseMeta; } -/// Error type for [`QueryChunk`] operations. -pub type QueryChunkError = Box; - /// Collection of data that shares the same partition key pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static { /// returns the Id of this chunk. Ids are unique within a @@ -200,7 +194,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static { fn apply_predicate_to_metadata( &self, predicate: &Predicate, - ) -> Result { + ) -> Result { Ok(self .summary() .map(|summary| predicate.apply_to_table_summary(&summary, self.schema().as_arrow())) @@ -216,7 +210,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static { ctx: IOxSessionContext, predicate: &Predicate, columns: Selection<'_>, - ) -> Result, QueryChunkError>; + ) -> Result, DataFusionError>; /// Return a set of Strings containing the distinct values in the /// specified columns. If the predicate can be evaluated entirely @@ -228,7 +222,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static { ctx: IOxSessionContext, column_name: &str, predicate: &Predicate, - ) -> Result, QueryChunkError>; + ) -> Result, DataFusionError>; /// Provides access to raw `QueryChunk` data as an /// asynchronous stream of `RecordBatch`es filtered by a *required* @@ -248,7 +242,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static { ctx: IOxSessionContext, predicate: &Predicate, selection: Selection<'_>, - ) -> Result; + ) -> Result; /// Returns chunk type. Useful in tests and debug logs. fn chunk_type(&self) -> &str; diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index dee2d1120b..256cd069ea 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -8,8 +8,8 @@ use crate::{ stringset::{StringSet, StringSetRef}, ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext, }, - Predicate, PredicateMatch, QueryChunk, QueryChunkError, QueryChunkMeta, QueryCompletedToken, - QueryDatabase, QueryDatabaseError, QueryText, + Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryCompletedToken, QueryDatabase, + QueryText, }; use arrow::{ array::{ @@ -24,7 +24,7 @@ use data_types::{ ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId, StatValues, Statistics, TableSummary, TimestampMinMax, }; -use datafusion::physical_plan::SendableRecordBatchStream; +use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream}; use datafusion_util::stream_from_batches; use futures::StreamExt; use hashbrown::HashSet; @@ -109,7 +109,7 @@ impl QueryDatabase for TestDatabase { table_name: &str, predicate: &Predicate, _ctx: IOxSessionContext, - ) -> Result>, QueryDatabaseError> { + ) -> Result>, DataFusionError> { // save last predicate *self.chunks_predicate.lock() = predicate.clone(); @@ -327,9 +327,9 @@ impl TestChunk { } /// Checks the saved error, and returns it if any, otherwise returns OK - fn check_error(&self) -> Result<(), QueryChunkError> { + fn check_error(&self) -> Result<(), DataFusionError> { if let Some(message) = self.saved_error.as_ref() { - Err(message.clone().into()) + Err(DataFusionError::External(message.clone().into())) } else { Ok(()) } @@ -921,13 +921,17 @@ impl QueryChunk for TestChunk { _ctx: IOxSessionContext, predicate: &Predicate, selection: Selection<'_>, - ) -> Result { + ) -> Result { self.check_error()?; // save the predicate self.predicates.lock().push(predicate.clone()); - let batches = match self.schema.df_projection(selection)? { + let batches = match self + .schema + .df_projection(selection) + .map_err(|e| DataFusionError::External(Box::new(e)))? + { None => self.table_data.clone(), Some(projection) => self .table_data @@ -948,7 +952,7 @@ impl QueryChunk for TestChunk { fn apply_predicate_to_metadata( &self, predicate: &Predicate, - ) -> Result { + ) -> Result { self.check_error()?; // save the predicate @@ -967,7 +971,7 @@ impl QueryChunk for TestChunk { _ctx: IOxSessionContext, _column_name: &str, _predicate: &Predicate, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { // Model not being able to get column values from metadata Ok(None) } @@ -977,7 +981,7 @@ impl QueryChunk for TestChunk { _ctx: IOxSessionContext, predicate: &Predicate, selection: Selection<'_>, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { self.check_error()?; // save the predicate diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs index 0edf477ec7..dc94a55b69 100644 --- a/querier/src/chunk/query_access.rs +++ b/querier/src/chunk/query_access.rs @@ -7,13 +7,16 @@ use arrow::{ use data_types::{ ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax, }; -use datafusion::physical_plan::{ - stream::RecordBatchStreamAdapter, RecordBatchStream, SendableRecordBatchStream, +use datafusion::{ + error::DataFusionError, + physical_plan::{ + stream::RecordBatchStreamAdapter, RecordBatchStream, SendableRecordBatchStream, + }, }; use futures::{Stream, TryStreamExt}; use iox_query::{ exec::{stringset::StringSet, IOxSessionContext}, - QueryChunk, QueryChunkError, QueryChunkMeta, + QueryChunk, QueryChunkMeta, }; use observability_deps::tracing::debug; use predicate::Predicate; @@ -114,7 +117,7 @@ impl QueryChunk for QuerierChunk { mut ctx: IOxSessionContext, predicate: &Predicate, columns: Selection<'_>, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { ctx.set_metadata("projection", format!("{}", columns)); ctx.set_metadata("predicate", format!("{}", &predicate)); @@ -161,10 +164,10 @@ impl QueryChunk for QuerierChunk { None } Err(other) => { - return Err(Box::new(Error::RBChunk { + return Err(DataFusionError::External(Box::new(Error::RBChunk { source: other, chunk_id: self.id(), - })) + }))) } }; @@ -178,7 +181,7 @@ impl QueryChunk for QuerierChunk { mut ctx: IOxSessionContext, column_name: &str, predicate: &Predicate, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { ctx.set_metadata("column_name", column_name.to_string()); ctx.set_metadata("predicate", format!("{}", &predicate)); @@ -205,11 +208,13 @@ impl QueryChunk for QuerierChunk { }; ctx.set_metadata("rb_predicate", format!("{}", &rb_predicate)); - let mut values = rb_chunk.column_values( - rb_predicate, - Selection::Some(&[column_name]), - BTreeMap::new(), - )?; + let mut values = rb_chunk + .column_values( + rb_predicate, + Selection::Some(&[column_name]), + BTreeMap::new(), + ) + .map_err(|e| DataFusionError::External(Box::new(e)))?; // The InfluxRPC frontend only supports getting column values // for one column at a time (this is a restriction on the Influx @@ -221,7 +226,8 @@ impl QueryChunk for QuerierChunk { .context(ColumnNameNotFoundSnafu { chunk_id: self.id(), column_name, - })?; + }) + .map_err(|e| DataFusionError::External(Box::new(e)))?; ctx.set_metadata("output_values", values.len() as i64); Ok(Some(values)) @@ -234,7 +240,7 @@ impl QueryChunk for QuerierChunk { mut ctx: IOxSessionContext, predicate: &Predicate, selection: Selection<'_>, - ) -> Result { + ) -> Result { let span_recorder = SpanRecorder::new( ctx.span() .map(|span| span.child("QuerierChunk::read_filter")), diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index aac2635c29..86946d2c54 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -11,6 +11,7 @@ use data_types::{ ChunkId, ChunkOrder, IngesterMapping, PartitionId, SequenceNumber, ShardId, ShardIndex, TableSummary, TimestampMinMax, }; +use datafusion::error::DataFusionError; use datafusion_util::MemoryStream; use futures::{stream::FuturesUnordered, TryStreamExt}; use generated_types::{ @@ -24,7 +25,7 @@ use influxdb_iox_client::flight::{ use iox_query::{ exec::{stringset::StringSet, IOxSessionContext}, util::compute_timenanosecond_min_max, - QueryChunk, QueryChunkError, QueryChunkMeta, + QueryChunk, QueryChunkMeta, }; use iox_time::{Time, TimeProvider}; use metric::{DurationHistogram, Metric}; @@ -1097,7 +1098,7 @@ impl QueryChunk for IngesterChunk { _ctx: IOxSessionContext, _predicate: &Predicate, _columns: Selection<'_>, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { // TODO maybe some special handling? Ok(None) } @@ -1107,7 +1108,7 @@ impl QueryChunk for IngesterChunk { _ctx: IOxSessionContext, _column_name: &str, _predicate: &Predicate, - ) -> Result, QueryChunkError> { + ) -> Result, DataFusionError> { // TODO maybe some special handling? Ok(None) } @@ -1117,11 +1118,15 @@ impl QueryChunk for IngesterChunk { _ctx: IOxSessionContext, predicate: &Predicate, selection: Selection<'_>, - ) -> Result { + ) -> Result { trace!(?predicate, ?selection, input_batches=?self.batches, "Reading data"); // Apply selection to in-memory batch - let batches = match self.schema.df_projection(selection)? { + let batches = match self + .schema + .df_projection(selection) + .map_err(|e| DataFusionError::External(Box::new(e)))? + { None => self.batches.clone(), Some(projection) => self .batches diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index b7451000b3..e7a3856554 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -11,10 +11,11 @@ use data_types::NamespaceId; use datafusion::{ catalog::{catalog::CatalogProvider, schema::SchemaProvider}, datasource::TableProvider, + error::DataFusionError, }; use iox_query::{ exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext}, - QueryChunk, QueryCompletedToken, QueryDatabase, QueryDatabaseError, QueryText, DEFAULT_SCHEMA, + QueryChunk, QueryCompletedToken, QueryDatabase, QueryText, DEFAULT_SCHEMA, }; use observability_deps::tracing::{debug, trace}; use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate}; @@ -41,7 +42,7 @@ impl QueryDatabase for QuerierNamespace { table_name: &str, predicate: &Predicate, ctx: IOxSessionContext, - ) -> Result>, QueryDatabaseError> { + ) -> Result>, DataFusionError> { debug!(%table_name, %predicate, "Finding chunks for table"); // get table metadata let table = match self.tables.get(table_name).map(Arc::clone) { @@ -627,7 +628,7 @@ mod tests { .unwrap_err(); assert_eq!( err.to_string(), - format!("Cannot build plan: External error: Chunk pruning failed: Query would scan at least {total_size} bytes, more than configured maximum {limit} bytes. Try adjusting your compactor settings or increasing the per query memory limit."), + format!("Cannot build plan: Resources exhausted: Query would scan at least {total_size} bytes, more than configured maximum {limit} bytes. Try adjusting your compactor settings or increasing the per query memory limit."), ); } diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 19835fde6f..767fa6c83a 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -8,6 +8,7 @@ use crate::{ IngesterConnection, }; use data_types::{ColumnId, PartitionId, ShardIndex, TableId, TimestampMinMax}; +use datafusion::error::DataFusionError; use futures::{join, StreamExt}; use iox_query::pruning::prune_summaries; use iox_query::{exec::Executor, provider, provider::ChunkPruner, QueryChunk}; @@ -65,6 +66,17 @@ pub enum Error { pub type Result = std::result::Result; +impl From for DataFusionError { + fn from(err: Error) -> Self { + match err { + Error::ChunkPruning { + source: err @ provider::Error::TooMuchData { .. }, + } => Self::ResourcesExhausted(err.to_string()), + _ => Self::External(Box::new(err) as _), + } + } +} + /// Args to create a [`QuerierTable`]. pub struct QuerierTableArgs { pub sharder: Arc>>, diff --git a/querier/src/table/query_access/mod.rs b/querier/src/table/query_access/mod.rs index 5665f79171..e16830577b 100644 --- a/querier/src/table/query_access/mod.rs +++ b/querier/src/table/query_access/mod.rs @@ -66,8 +66,7 @@ impl TableProvider for QuerierTable { ctx.child_span("querier table chunks"), projection, ) - .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; + .await?; for chunk in chunks { builder = builder.add_chunk(chunk); diff --git a/querier/src/table/state_reconciler.rs b/querier/src/table/state_reconciler.rs index baa2935911..d5fe4cced6 100644 --- a/querier/src/table/state_reconciler.rs +++ b/querier/src/table/state_reconciler.rs @@ -23,6 +23,7 @@ use crate::{ use self::interface::{IngesterPartitionInfo, ParquetFileInfo, TombstoneInfo}; #[derive(Snafu, Debug)] +#[allow(missing_copy_implementations)] pub enum ReconcileError { #[snafu(display("Compactor processed file that the querier would need to split apart which is not yet implemented"))] CompactorConflict, diff --git a/service_common/src/planner.rs b/service_common/src/planner.rs index 6431963aad..e1bc5adf71 100644 --- a/service_common/src/planner.rs +++ b/service_common/src/planner.rs @@ -60,7 +60,7 @@ impl Planner { planner .table_names(database, predicate) .await - .map_err(|e| Error::Plan(format!("table_names error: {}", e))) + .map_err(|e| e.to_df_error("table_names")) }) .await } @@ -82,7 +82,7 @@ impl Planner { planner .tag_keys(database, predicate) .await - .map_err(|e| Error::Plan(format!("tag_keys error: {}", e))) + .map_err(|e| e.to_df_error("tag_keys")) }) .await } @@ -106,7 +106,7 @@ impl Planner { planner .tag_values(database, &tag_name, predicate) .await - .map_err(|e| Error::Plan(format!("tag_values error: {}", e))) + .map_err(|e| e.to_df_error("tag_values")) }) .await } @@ -128,7 +128,7 @@ impl Planner { planner .field_columns(database, predicate) .await - .map_err(|e| Error::Plan(format!("field_columns error: {}", e))) + .map_err(|e| e.to_df_error("field_columns")) }) .await } @@ -150,7 +150,7 @@ impl Planner { planner .read_filter(database, predicate) .await - .map_err(|e| Error::Plan(format!("read_filter error: {}", e))) + .map_err(|e| e.to_df_error("read_filter")) }) .await } @@ -174,7 +174,7 @@ impl Planner { planner .read_group(database, predicate, agg, &group_columns) .await - .map_err(|e| Error::Plan(format!("read_group error: {}", e))) + .map_err(|e| e.to_df_error("read_group")) }) .await } @@ -199,7 +199,7 @@ impl Planner { planner .read_window_aggregate(database, predicate, agg, every, offset) .await - .map_err(|e| Error::Plan(format!("read_window_aggregate error: {}", e))) + .map_err(|e| e.to_df_error("read_window_aggregate")) }) .await } diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index f88ce0d184..f4d84266e6 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -9,7 +9,7 @@ use arrow_flight::{ use arrow_util::optimize::{optimize_record_batch, optimize_schema}; use bytes::{Bytes, BytesMut}; use data_types::{DatabaseName, DatabaseNameError}; -use datafusion::physical_plan::ExecutionPlan; +use datafusion::{error::DataFusionError, physical_plan::ExecutionPlan}; use futures::{SinkExt, Stream, StreamExt}; use generated_types::influxdata::iox::querier::v1 as proto; use iox_query::{ @@ -54,7 +54,7 @@ pub enum Error { ))] Query { database_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display("Invalid database name: {}", source))] @@ -91,29 +91,40 @@ impl From for tonic::Status { Error::Optimize { .. } | Error::Planning { .. } | Error::Serialization { .. } => warn!(?err, msg), } - err.to_status() + err.into_status() } } impl Error { /// Converts a result from the business logic into the appropriate tonic /// status - fn to_status(&self) -> tonic::Status { - use tonic::Status; - match &self { - Self::InvalidTicket { .. } => Status::invalid_argument(self.to_string()), - Self::InvalidTicketLegacy { .. } => Status::invalid_argument(self.to_string()), - Self::InvalidQuery { .. } => Status::invalid_argument(self.to_string()), - Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()), - Self::Query { .. } => Status::internal(self.to_string()), - Self::InvalidDatabaseName { .. } => Status::invalid_argument(self.to_string()), - Self::Planning { - source: service_common::planner::Error::External(_), - } => Status::internal(self.to_string()), - Self::Planning { .. } => Status::invalid_argument(self.to_string()), - Self::Optimize { .. } => Status::internal(self.to_string()), - Self::Serialization { .. } => Status::internal(self.to_string()), - } + fn into_status(self) -> tonic::Status { + let msg = self.to_string(); + + let code = match self { + Self::DatabaseNotFound { .. } => tonic::Code::NotFound, + Self::InvalidTicket { .. } + | Self::InvalidTicketLegacy { .. } + | Self::InvalidQuery { .. } + | Self::InvalidDatabaseName { .. } => tonic::Code::InvalidArgument, + Self::Planning { source, .. } | Self::Query { source, .. } => { + // traverse context chain + let mut source = source; + while let DataFusionError::Context(_msg, inner) = source { + source = *inner; + } + + match source { + DataFusionError::ResourcesExhausted(_) => tonic::Code::ResourceExhausted, + DataFusionError::Plan(_) => tonic::Code::InvalidArgument, + DataFusionError::NotImplemented(_) => tonic::Code::Unimplemented, + _ => tonic::Code::Internal, + } + } + Self::Optimize { .. } | Self::Serialization { .. } => tonic::Code::Internal, + }; + + tonic::Status::new(code, msg) } } @@ -334,7 +345,6 @@ impl GetStream { let mut stream_record_batches = ctx .execute_stream(Arc::clone(&physical_plan)) .await - .map_err(|e| Box::new(e) as _) .context(QuerySnafu { database_name: &database_name, })?; @@ -382,7 +392,7 @@ impl GetStream { // failure sending here is OK because we're cutting the stream anyways tx.send(Err(Error::Query { database_name: database_name.clone(), - source: Box::new(e), + source: DataFusionError::ArrowError(e), } .into())) .await diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index f8d54e2d05..8ad7bbbbfb 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -12,6 +12,7 @@ use crate::{ StorageService, }; use data_types::{org_and_bucket_to_database, DatabaseName}; +use datafusion::error::DataFusionError; use futures::Stream; use generated_types::{ google::protobuf::Empty, literal_or_regex::Value as RegexOrLiteralValue, @@ -54,43 +55,43 @@ pub enum Error { #[snafu(display("Error listing tables in database '{}': {}", db_name, source))] ListingTables { db_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display("Error listing columns in database '{}': {}", db_name, source))] ListingColumns { db_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display("Error listing fields in database '{}': {}", db_name, source))] ListingFields { db_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display("Error creating series plans for database '{}': {}", db_name, source))] PlanningFilteringSeries { db_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display("Error creating group plans for database '{}': {}", db_name, source))] PlanningGroupSeries { db_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display("Error running series plans for database '{}': {}", db_name, source))] FilteringSeries { db_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display("Error running grouping plans for database '{}': {}", db_name, source))] GroupingSeries { db_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display( @@ -102,7 +103,7 @@ pub enum Error { ListingTagValues { db_name: String, tag_name: String, - source: Box, + source: DataFusionError, }, #[snafu(display("Error converting Predicate '{}: {}", rpc_predicate_string, source))] @@ -177,44 +178,56 @@ impl From for tonic::Status { /// status fn from(err: Error) -> Self { error!("Error handling gRPC request: {}", err); - err.to_status() + err.into_status() } } impl Error { /// Converts a result from the business logic into the appropriate tonic /// status - fn to_status(&self) -> tonic::Status { - match &self { - Self::DatabaseNotFound { .. } => Status::not_found(self.to_string()), - Self::ListingTables { .. } => Status::internal(self.to_string()), - Self::ListingColumns { .. } => { - // TODO: distinguish between input errors and internal errors - Status::invalid_argument(self.to_string()) + fn into_status(self) -> tonic::Status { + let msg = self.to_string(); + + let code = match self { + Self::DatabaseNotFound { .. } => tonic::Code::NotFound, + Self::ListingTables { source, .. } + | Self::ListingColumns { source, .. } + | Self::ListingFields { source, .. } + | Self::PlanningFilteringSeries { source, .. } + | Self::PlanningGroupSeries { source, .. } + | Self::FilteringSeries { source, .. } + | Self::GroupingSeries { source, .. } + | Self::ListingTagValues { source, .. } => { + // traverse context chain + let mut source = source; + while let DataFusionError::Context(_msg, inner) = source { + source = *inner; + } + + match source { + DataFusionError::ResourcesExhausted(_) => tonic::Code::ResourceExhausted, + DataFusionError::Plan(_) => tonic::Code::InvalidArgument, + DataFusionError::NotImplemented(_) => tonic::Code::Unimplemented, + _ => tonic::Code::Internal, + } } - Self::ListingFields { .. } => { - // TODO: distinguish between input errors and internal errors - Status::invalid_argument(self.to_string()) + Self::ConvertingPredicate { .. } + | Self::ConvertingReadGroupAggregate { .. } + | Self::ConvertingReadGroupType { .. } + | Self::ConvertingWindowAggregate { .. } + | Self::ConvertingTagKeyInTagValues { .. } + | Self::ComputingGroupedSeriesSet { .. } + | Self::ConvertingFieldList { .. } + | Self::MeasurementLiteralOrRegex { .. } + | Self::MissingTagKeyPredicate {} + | Self::InvalidTagKeyRegex { .. } => tonic::Code::InvalidArgument, + Self::SendingResults { .. } | Self::InternalHintsFieldNotSupported { .. } => { + tonic::Code::Internal } - Self::PlanningFilteringSeries { .. } => Status::invalid_argument(self.to_string()), - Self::PlanningGroupSeries { .. } => Status::invalid_argument(self.to_string()), - Self::FilteringSeries { .. } => Status::invalid_argument(self.to_string()), - Self::GroupingSeries { .. } => Status::invalid_argument(self.to_string()), - Self::ListingTagValues { .. } => Status::invalid_argument(self.to_string()), - Self::ConvertingPredicate { .. } => Status::invalid_argument(self.to_string()), - Self::ConvertingReadGroupAggregate { .. } => Status::invalid_argument(self.to_string()), - Self::ConvertingReadGroupType { .. } => Status::invalid_argument(self.to_string()), - Self::ConvertingWindowAggregate { .. } => Status::invalid_argument(self.to_string()), - Self::ConvertingTagKeyInTagValues { .. } => Status::invalid_argument(self.to_string()), - Self::ComputingGroupedSeriesSet { .. } => Status::invalid_argument(self.to_string()), - Self::ConvertingFieldList { .. } => Status::invalid_argument(self.to_string()), - Self::SendingResults { .. } => Status::internal(self.to_string()), - Self::InternalHintsFieldNotSupported { .. } => Status::internal(self.to_string()), - Self::NotYetImplemented { .. } => Status::internal(self.to_string()), - Self::MeasurementLiteralOrRegex { .. } => Status::invalid_argument(self.to_string()), - Self::MissingTagKeyPredicate {} => Status::invalid_argument(self.to_string()), - Self::InvalidTagKeyRegex { .. } => Status::invalid_argument(self.to_string()), - } + Self::NotYetImplemented { .. } => tonic::Code::Unimplemented, + }; + + tonic::Status::new(code, msg) } } @@ -341,7 +354,7 @@ where &ctx, ) .await - .map_err(|e| e.to_status())? + .map_err(|e| e.into_status())? .into_iter() .map(Ok) .collect::>(); @@ -423,7 +436,7 @@ where &ctx, ) .await - .map_err(|e| e.to_status())? + .map_err(|e| e.into_status())? .into_iter() .map(Ok) .collect::>(); @@ -489,7 +502,7 @@ where &ctx, ) .await - .map_err(|e| e.to_status()); + .map_err(|e| e.into_status()); if response.is_ok() { query_completed_token.set_success(); @@ -560,7 +573,7 @@ where operation: "tag_value for a measurement, with general predicate" .to_string(), } - .to_status()); + .into_status()); } measurement_name_impl(Arc::clone(&db), db_name, range, predicate, &ctx).await @@ -593,7 +606,7 @@ where } }; - let response = response.map_err(|e| e.to_status()); + let response = response.map_err(|e| e.into_status()); if response.is_ok() { query_completed_token.set_success(); @@ -652,7 +665,7 @@ where let results = tag_values_grouped_by_measurement_and_tag_key_impl(Arc::clone(&db), db_name, req, &ctx) .await - .map_err(|e| e.to_status())? + .map_err(|e| e.into_status())? .into_iter() .map(Ok) .collect::>(); @@ -762,7 +775,7 @@ where let response = measurement_name_impl(Arc::clone(&db), db_name, range, predicate, &ctx) .await - .map_err(|e| e.to_status()); + .map_err(|e| e.into_status()); if response.is_ok() { query_completed_token.set_success(); @@ -833,7 +846,7 @@ where &ctx, ) .await - .map_err(|e| e.to_status()); + .map_err(|e| e.into_status()); if response.is_ok() { query_completed_token.set_success(); @@ -907,7 +920,7 @@ where &ctx, ) .await - .map_err(|e| e.to_status()); + .map_err(|e| e.into_status()); if response.is_ok() { query_completed_token.set_success(); @@ -981,9 +994,9 @@ where .map(|fieldlist| { fieldlist_to_measurement_fields_response(fieldlist) .context(ConvertingFieldListSnafu) - .map_err(|e| e.to_status()) + .map_err(|e| e.into_status()) }) - .map_err(|e| e.to_status())?; + .map_err(|e| e.into_status())?; if response.is_ok() { query_completed_token.set_success(); @@ -1048,13 +1061,11 @@ where let plan = Planner::new(ctx) .table_names(db, predicate) .await - .map_err(|e| Box::new(e) as _) .context(ListingTablesSnafu { db_name })?; let table_names = ctx .to_string_set(plan) .await - .map_err(|e| Box::new(e) as _) .context(ListingTablesSnafu { db_name })?; // Map the resulting collection of Strings into a Vec>for return @@ -1095,13 +1106,11 @@ where let tag_key_plan = Planner::new(ctx) .tag_keys(db, predicate) .await - .map_err(|e| Box::new(e) as _) .context(ListingColumnsSnafu { db_name })?; let tag_keys = ctx .to_string_set(tag_key_plan) .await - .map_err(|e| Box::new(e) as _) .context(ListingColumnsSnafu { db_name })?; // Map the resulting collection of Strings into a Vec>for return @@ -1142,13 +1151,11 @@ where let tag_value_plan = Planner::new(ctx) .tag_values(db, tag_name, predicate) .await - .map_err(|e| Box::new(e) as _) .context(ListingTagValuesSnafu { db_name, tag_name })?; let tag_values = ctx .to_string_set(tag_value_plan) .await - .map_err(|e| Box::new(e) as _) .context(ListingTagValuesSnafu { db_name, tag_name })?; // Map the resulting collection of Strings into a Vec>for return @@ -1266,14 +1273,12 @@ where let series_plan = Planner::new(ctx) .read_filter(db, predicate) .await - .map_err(|e| Box::new(e) as _) .context(PlanningFilteringSeriesSnafu { db_name })?; // Execute the plans. let series_or_groups = ctx .to_series_and_groups(series_plan) .await - .map_err(|e| Box::new(e) as _) .context(FilteringSeriesSnafu { db_name }) .log_if_error("Running series set plan")?; @@ -1319,9 +1324,8 @@ where .await } }; - let grouped_series_set_plan = grouped_series_set_plan - .map_err(|e| Box::new(e) as _) - .context(PlanningGroupSeriesSnafu { db_name })?; + let grouped_series_set_plan = + grouped_series_set_plan.context(PlanningGroupSeriesSnafu { db_name })?; // PERF - This used to send responses to the client before execution had // completed, but now it doesn't. We may need to revisit this in the future @@ -1331,7 +1335,6 @@ where let series_or_groups = ctx .to_series_and_groups(grouped_series_set_plan) .await - .map_err(|e| Box::new(e) as _) .context(GroupingSeriesSnafu { db_name }) .log_if_error("Running Grouped SeriesSet Plan")?; @@ -1370,13 +1373,11 @@ where let field_list_plan = Planner::new(ctx) .field_columns(db, predicate) .await - .map_err(|e| Box::new(e) as _) .context(ListingFieldsSnafu { db_name })?; let field_list = ctx .to_field_list(field_list_plan) .await - .map_err(|e| Box::new(e) as _) .context(ListingFieldsSnafu { db_name })?; trace!(field_names=?field_list, "Field names response"); @@ -1878,7 +1879,7 @@ mod tests { let response = fixture.storage_client.tag_keys(request).await; assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down"); - grpc_request_metric_has_count(&fixture, "TagKeys", "client_error", 1); + grpc_request_metric_has_count(&fixture, "TagKeys", "server_error", 1); } /// test the plumbing of the RPC layer for measurement_tag_keys-- @@ -1984,7 +1985,7 @@ mod tests { let response = fixture.storage_client.measurement_tag_keys(request).await; assert_contains!(response.unwrap_err().to_string(), "This is an error"); - grpc_request_metric_has_count(&fixture, "MeasurementTagKeys", "client_error", 1); + grpc_request_metric_has_count(&fixture, "MeasurementTagKeys", "server_error", 1); } /// test the plumbing of the RPC layer for tag_values -- specifically that @@ -2173,7 +2174,8 @@ mod tests { "Error converting tag_key to UTF-8 in tag_values request" ); - grpc_request_metric_has_count(&fixture, "TagValues", "client_error", 2); + grpc_request_metric_has_count(&fixture, "TagValues", "client_error", 1); + grpc_request_metric_has_count(&fixture, "TagValues", "server_error", 1); } #[tokio::test] @@ -2524,7 +2526,7 @@ mod tests { assert_contains!(response_string, "Sugar we are going down"); - grpc_request_metric_has_count(&fixture, "MeasurementTagValues", "client_error", 1); + grpc_request_metric_has_count(&fixture, "MeasurementTagValues", "server_error", 1); } #[tokio::test] @@ -2730,7 +2732,7 @@ mod tests { let response = fixture.storage_client.read_filter(request).await; assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down"); - grpc_request_metric_has_count(&fixture, "ReadFilter", "client_error", 1); + grpc_request_metric_has_count(&fixture, "ReadFilter", "server_error", 1); } #[tokio::test] @@ -2822,7 +2824,7 @@ mod tests { .to_string(); assert_contains!(response_string, "Sugar we are going down"); - grpc_request_metric_has_count(&fixture, "ReadGroup", "client_error", 1); + grpc_request_metric_has_count(&fixture, "ReadGroup", "server_error", 1); } #[tokio::test] @@ -2988,7 +2990,7 @@ mod tests { assert_contains!(response_string, "Sugar we are going down"); - grpc_request_metric_has_count(&fixture, "ReadWindowAggregate", "client_error", 1); + grpc_request_metric_has_count(&fixture, "ReadWindowAggregate", "server_error", 1); } #[tokio::test] diff --git a/test_helpers_end_to_end/src/config.rs b/test_helpers_end_to_end/src/config.rs index b3dc091a93..d4597e8584 100644 --- a/test_helpers_end_to_end/src/config.rs +++ b/test_helpers_end_to_end/src/config.rs @@ -290,6 +290,11 @@ impl TestConfig { self.with_env("INFLUXDB_IOX_FLIGHT_DO_GET_PANIC", times.to_string()) } + /// Configure maximum per-table query bytes for the querier. + pub fn with_querier_max_table_query_bytes(self, bytes: usize) -> Self { + self.with_env("INFLUXDB_IOX_MAX_TABLE_QUERY_BYTES", bytes.to_string()) + } + /// Changes the log to JSON for easier parsing. pub fn with_json_logs(self) -> Self { self.with_env("LOG_FORMAT", "json")