diff --git a/compactor/src/query.rs b/compactor/src/query.rs
index 37bfc9d435..21e6f5a2ea 100644
--- a/compactor/src/query.rs
+++ b/compactor/src/query.rs
@@ -4,40 +4,18 @@ use data_types::{
     ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId, SequenceNumber,
     TableSummary, Timestamp, Tombstone,
 };
-use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
+use datafusion::error::DataFusionError;
 use iox_query::{
     exec::{stringset::StringSet, IOxSessionContext},
     util::create_basic_summary,
-    QueryChunk, QueryChunkMeta,
+    QueryChunk, QueryChunkData, QueryChunkMeta,
 };
-use observability_deps::tracing::trace;
 use parquet_file::chunk::ParquetChunk;
 use predicate::{delete_predicate::tombstones_to_delete_predicates, Predicate};
 use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema};
-use snafu::{ResultExt, Snafu};
 use std::{any::Any, sync::Arc};
 use uuid::Uuid;
 
-#[derive(Debug, Snafu)]
-#[allow(missing_copy_implementations, missing_docs)]
-pub enum Error {
-    #[snafu(display("Failed to read parquet: {}", source))]
-    ReadParquet {
-        source: parquet_file::storage::ReadError,
-    },
-
-    #[snafu(display(
-        "Error reading IOx Metadata from Parquet IoxParquetMetadata: {}",
-        source
-    ))]
-    ReadParquetMeta {
-        source: parquet_file::storage::ReadError,
-    },
-}
-
-/// A specialized `Error` for Compactor's query errors
-pub type Result<T, E = Error> = std::result::Result<T, E>;
-
 /// QueryableParquetChunk that implements QueryChunk and QueryMetaChunk for building query plan
 #[derive(Debug, Clone)]
 pub struct QueryableParquetChunk {
@@ -213,33 +191,8 @@ impl QueryChunk for QueryableParquetChunk {
         Ok(None)
     }
 
-    /// Provides access to raw `QueryChunk` data as an
-    /// asynchronous stream of `RecordBatch`es filtered by a *required*
-    /// predicate. Note that not all chunks can evaluate all types of
-    /// predicates and this function will return an error
-    /// if requested to evaluate with a predicate that is not supported
-    ///
-    /// This is the analog of the `TableProvider` in DataFusion
-    ///
-    /// The reason we can't simply use the `TableProvider` trait
-    /// directly is that the data for a particular Table lives in
-    /// several chunks within a partition, so there needs to be an
-    /// implementation of `TableProvider` that stitches together the
-    /// streams from several different `QueryChunk`s.
-    fn read_filter(
-        &self,
-        mut ctx: IOxSessionContext,
-        predicate: &Predicate,
-        selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, DataFusionError> {
-        ctx.set_metadata("storage", "compactor");
-        ctx.set_metadata("projection", format!("{}", selection));
-        trace!(?selection, "selection");
-
-        self.data
-            .read_filter(predicate, selection, ctx.inner())
-            .context(ReadParquetSnafu)
-            .map_err(|e| DataFusionError::External(Box::new(e)))
+    fn data(&self) -> QueryChunkData {
+        QueryChunkData::Parquet(self.data.parquet_exec_input())
     }
 
     /// Returns chunk type
diff --git a/ingester/src/query_adaptor.rs b/ingester/src/query_adaptor.rs
index be16efaa78..5b5df0f256 100644
--- a/ingester/src/query_adaptor.rs
+++ b/ingester/src/query_adaptor.rs
@@ -6,24 +6,16 @@ use std::{any::Any, sync::Arc};
 use arrow::record_batch::RecordBatch;
 use arrow_util::util::ensure_schema;
 use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
-use datafusion::{
-    error::DataFusionError,
-    physical_plan::{
-        common::SizedRecordBatchStream,
-        metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics},
-        SendableRecordBatchStream,
-    },
-};
+use datafusion::error::DataFusionError;
 use iox_query::{
     exec::{stringset::StringSet, IOxSessionContext},
     util::{compute_timenanosecond_min_max, create_basic_summary},
-    QueryChunk, QueryChunkMeta,
+    QueryChunk, QueryChunkData, QueryChunkMeta,
 };
-use observability_deps::tracing::trace;
 use once_cell::sync::OnceCell;
 use predicate::Predicate;
 use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
-use snafu::{ResultExt, Snafu};
+use snafu::Snafu;
 
 use crate::data::table::TableName;
 
@@ -230,42 +222,15 @@ impl QueryChunk for QueryAdaptor {
         Ok(None)
     }
 
-    /// Provides access to raw `QueryChunk` data as an
-    /// asynchronous stream of `RecordBatch`es
-    fn read_filter(
-        &self,
-        mut ctx: IOxSessionContext,
-        _predicate: &Predicate,
-        selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, DataFusionError> {
-        ctx.set_metadata("storage", "ingester");
-        ctx.set_metadata("projection", format!("{}", selection));
-        trace!(?selection, "selection");
+    fn data(&self) -> QueryChunkData {
+        let schema = self.schema().as_arrow();
 
-        let schema = self
-            .schema()
-            .select(selection)
-            .context(SchemaSnafu)
-            .map_err(|e| DataFusionError::External(Box::new(e)))?;
-
-        // Apply the projection over all the data in self, ensuring each batch
-        // has the specified schema.
-        let batches = self
-            .project_selection(selection)
-            .into_iter()
-            .map(|batch| {
-                ensure_schema(&schema.as_arrow(), &batch)
-                    .context(ConcatBatchesSnafu {})
-                    .map(Arc::new)
-            })
-            .collect::<Result<Vec<_>, _>>()
-            .map_err(|e| DataFusionError::External(Box::new(e)))?;
-
-        // Return stream of data
-        let dummy_metrics = ExecutionPlanMetricsSet::new();
-        let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, 0);
-        let stream = SizedRecordBatchStream::new(schema.as_arrow(), batches, mem_metrics);
-        Ok(Box::pin(stream))
+        QueryChunkData::RecordBatches(
+            self.data
+                .iter()
+                .map(|b| ensure_schema(&schema, b).expect("schema handling broken"))
+                .collect(),
+        )
     }
 
     /// Returns chunk type
diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs
index 94db6974ae..08cd02adb0 100644
--- a/iox_query/src/lib.rs
+++ b/iox_query/src/lib.rs
@@ -10,12 +10,14 @@
     clippy::dbg_macro
 )]
 
+use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use data_types::{ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary};
-use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
+use datafusion::{error::DataFusionError, prelude::SessionContext};
 use exec::{stringset::StringSet, IOxSessionContext};
 use hashbrown::HashMap;
 use observability_deps::tracing::{debug, trace};
+use parquet_file::storage::ParquetExecInput;
 use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate, PredicateMatch};
 use schema::{
     selection::Selection,
@@ -173,6 +175,37 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
     fn as_meta(&self) -> &dyn QueryDatabaseMeta;
 }
 
+/// Raw data of a [`QueryChunk`].
+#[derive(Debug)]
+pub enum QueryChunkData {
+    /// In-memory record batches.
+    ///
+    /// **IMPORTANT: All batches MUST have the schema that the [chunk reports](QueryChunkMeta::schema).**
+    RecordBatches(Vec<RecordBatch>),
+
+    /// Parquet file.
+    ///
+    /// See [`ParquetExecInput`] for details.
+    Parquet(ParquetExecInput),
+}
+
+impl QueryChunkData {
+    /// Read data into [`RecordBatch`]es. This is mostly meant for testing!
+    pub async fn read_to_batches(
+        self,
+        schema: Arc<Schema>,
+        session_ctx: &SessionContext,
+    ) -> Vec<RecordBatch> {
+        match self {
+            Self::RecordBatches(batches) => batches,
+            Self::Parquet(exec_input) => exec_input
+                .read_to_batches(schema.as_arrow(), Selection::All, session_ctx)
+                .await
+                .unwrap(),
+        }
+    }
+}
+
 /// Collection of data that shares the same partition key
 pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
     /// returns the Id of this chunk. Ids are unique within a
@@ -222,25 +255,10 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
         predicate: &Predicate,
     ) -> Result<Option<StringSet>, DataFusionError>;
 
-    /// Provides access to raw `QueryChunk` data as an
-    /// asynchronous stream of `RecordBatch`es filtered by a *required*
-    /// predicate. Note that not all chunks can evaluate all types of
-    /// predicates and this function will return an error
-    /// if requested to evaluate with a predicate that is not supported
+    /// Provides access to raw [`QueryChunk`] data.
     ///
-    /// This is the analog of the `TableProvider` in DataFusion
-    ///
-    /// The reason we can't simply use the `TableProvider` trait
-    /// directly is that the data for a particular Table lives in
-    /// several chunks within a partition, so there needs to be an
-    /// implementation of `TableProvider` that stitches together the
-    /// streams from several different `QueryChunk`s.
-    fn read_filter(
-        &self,
-        ctx: IOxSessionContext,
-        predicate: &Predicate,
-        selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, DataFusionError>;
+    /// The engine assumes that minimal work is performed to gather the `QueryChunkData`.
+    fn data(&self) -> QueryChunkData;
 
     /// Returns chunk type. Useful in tests and debug logs.
     fn chunk_type(&self) -> &str;
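
The following sketch is not itself part of the patch; it illustrates how the new trait surface is meant to be used. The helper below is an assumed example rather than existing IOx code: it materializes any chunk's raw data for a test, mirroring what `raw_data` in `iox_query/src/test.rs` does further down in this patch.

    use arrow::record_batch::RecordBatch;
    use iox_query::{exec::IOxSessionContext, QueryChunk};

    /// Collect a chunk's raw data for assertions in tests (sketch).
    async fn chunk_to_batches(chunk: &dyn QueryChunk) -> Vec<RecordBatch> {
        let ctx = IOxSessionContext::with_testing();
        // `data()` is synchronous and cheap; any actual I/O happens here:
        // a `RecordBatches` variant is returned as-is, a `Parquet` variant is
        // read back through a single-file `ParquetExec`.
        chunk.data().read_to_batches(chunk.schema(), ctx.inner()).await
    }

On the implementor side, `data()` reduces to a one-liner such as `QueryChunkData::RecordBatches(self.batches.clone())` or `QueryChunkData::Parquet(self.parquet_chunk.parquet_exec_input())`, as the chunk implementations elsewhere in this patch show.
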
diff --git a/iox_query/src/provider/physical.rs b/iox_query/src/provider/physical.rs
index e677fabcb5..384583ae47 100644
--- a/iox_query/src/provider/physical.rs
+++ b/iox_query/src/provider/physical.rs
@@ -1,22 +1,28 @@
 //! Implementation of a DataFusion PhysicalPlan node across partition chunks
 
 use super::adapter::SchemaAdapterStream;
-use crate::{exec::IOxSessionContext, QueryChunk};
+use crate::{exec::IOxSessionContext, QueryChunk, QueryChunkData};
 use arrow::datatypes::SchemaRef;
 use data_types::TableSummary;
 use datafusion::{
+    datasource::listing::PartitionedFile,
     error::DataFusionError,
     execution::context::TaskContext,
     physical_plan::{
+        execute_stream,
         expressions::PhysicalSortExpr,
+        file_format::{FileScanConfig, ParquetExec},
+        memory::MemoryStream,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
+        stream::RecordBatchStreamAdapter,
         DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
     },
 };
+use futures::TryStreamExt;
 use observability_deps::tracing::trace;
 use predicate::Predicate;
-use schema::{selection::Selection, Schema};
-use std::{fmt, sync::Arc};
+use schema::Schema;
+use std::{collections::HashSet, fmt, sync::Arc};
 
 /// Implements the DataFusion physical plan interface
 #[derive(Debug)]
@@ -104,16 +110,13 @@ impl ExecutionPlan for IOxReadFilterNode {
     fn execute(
         &self,
         partition: usize,
-        _context: Arc<TaskContext>,
+        context: Arc<TaskContext>,
     ) -> datafusion::error::Result<SendableRecordBatchStream> {
         trace!(partition, "Start IOxReadFilterNode::execute");
 
         let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
-        let timer = baseline_metrics.elapsed_compute().timer();
 
         let schema = self.schema();
-        let fields = schema.fields();
-        let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>();
 
         let chunk = Arc::clone(&self.chunks[partition]);
 
@@ -125,32 +128,79 @@ impl ExecutionPlan for IOxReadFilterNode {
         // restrict the requested selection to the actual columns
         // available, and use SchemaAdapterStream to pad the rest of
        // the columns with NULLs if necessary
-        let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
-        let selection = Selection::Some(&selection_cols);
+        let final_output_column_names: HashSet<_> =
+            schema.fields().iter().map(|f| f.name()).collect();
+        let projection: Vec<_> = chunk_table_schema
+            .iter()
+            .enumerate()
+            .filter(|(_idx, (_t, field))| final_output_column_names.contains(field.name()))
+            .map(|(idx, _)| idx)
+            .collect();
+        let projection = (!((projection.len() == chunk_table_schema.len())
+            && (projection.iter().enumerate().all(|(a, b)| a == *b))))
+        .then_some(projection);
+        let incomplete_output_schema = projection
+            .as_ref()
+            .map(|projection| {
+                Arc::new(
+                    chunk_table_schema
+                        .as_arrow()
+                        .project(projection)
+                        .expect("projection broken"),
+                )
+            })
+            .unwrap_or_else(|| chunk_table_schema.as_arrow());
 
-        let stream = chunk
-            .read_filter(
-                self.ctx.child_ctx("chunk read_filter"),
-                &self.predicate,
-                selection,
-            )
-            .map_err(|e| {
-                DataFusionError::Execution(format!(
-                    "Error creating scan for table {} chunk {}: {}",
-                    self.table_name,
-                    chunk.id(),
-                    e
-                ))
-            })?;
+        let stream = match chunk.data() {
+            QueryChunkData::RecordBatches(batches) => {
+                let stream = Box::pin(MemoryStream::try_new(
+                    batches,
+                    incomplete_output_schema,
+                    projection,
+                )?);
+                let adapter = SchemaAdapterStream::try_new(stream, schema, baseline_metrics)
+                    .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+                Box::pin(adapter) as SendableRecordBatchStream
+            }
+            QueryChunkData::Parquet(exec_input) => {
+                let base_config = FileScanConfig {
+                    object_store_url: exec_input.object_store_url,
+                    file_schema: Arc::clone(&schema),
+                    file_groups: vec![vec![PartitionedFile {
+                        object_meta: exec_input.object_meta,
+                        partition_values: vec![],
+                        range: None,
+                        extensions: None,
+                    }]],
+                    statistics: Statistics::default(),
+                    projection: None,
+                    limit: None,
+                    table_partition_cols: vec![],
+                    config_options: context.session_config().config_options(),
+                };
+                let delete_predicates: Vec<_> = chunk
+                    .delete_predicates()
+                    .iter()
+                    .map(|pred| Arc::new(pred.as_ref().clone().into()))
+                    .collect();
+                let predicate = self
+                    .predicate
+                    .clone()
+                    .with_delete_predicates(&delete_predicates);
+                let exec = ParquetExec::new(base_config, predicate.filter_expr(), None);
+                let stream = RecordBatchStreamAdapter::new(
+                    schema,
+                    futures::stream::once(execute_stream(Arc::new(exec), context)).try_flatten(),
+                );
 
-        // all CPU time is now done, pass in baseline metrics to adapter
-        timer.done();
+                // Note: No SchemaAdapterStream required here because `ParquetExec` already creates NULL columns for us.
 
-        let adapter = SchemaAdapterStream::try_new(stream, schema, baseline_metrics)
-            .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+                Box::pin(stream)
+            }
+        };
 
         trace!(partition, "End IOxReadFilterNode::execute");
-        Ok(Box::pin(adapter))
+        Ok(stream)
     }
 
     fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -190,17 +240,3 @@ impl ExecutionPlan for IOxReadFilterNode {
             .unwrap_or_default()
     }
 }
-
-/// Removes any columns that are not present in schema, returning a possibly
-/// restricted set of columns
-fn restrict_selection<'a>(
-    selection_cols: Vec<&'a str>,
-    chunk_table_schema: &'a Schema,
-) -> Vec<&'a str> {
-    let arrow_schema = chunk_table_schema.as_arrow();
-
-    selection_cols
-        .into_iter()
-        .filter(|col| arrow_schema.fields().iter().any(|f| f.name() == col))
-        .collect()
-}
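
The subtle part of the new `execute()` body is the projection handling: the node computes the positional indices of the chunk's columns that also appear in the final output schema, and passes `None` when that projection would be the identity, so `MemoryStream` does not copy or reorder anything; `SchemaAdapterStream` then pads missing columns with NULLs. A self-contained sketch of the same index computation, using made-up column names:

    use std::collections::HashSet;

    /// Indices of `chunk_cols` that also occur in `output_cols`; `None` if that is the identity.
    fn projection(chunk_cols: &[&str], output_cols: &[&str]) -> Option<Vec<usize>> {
        let wanted: HashSet<&str> = output_cols.iter().copied().collect();
        let idx: Vec<usize> = chunk_cols
            .iter()
            .enumerate()
            .filter(|&(_, name)| wanted.contains(name))
            .map(|(i, _)| i)
            .collect();
        let identity =
            idx.len() == chunk_cols.len() && idx.iter().enumerate().all(|(a, b)| a == *b);
        (!identity).then_some(idx)
    }

    fn main() {
        // The chunk has a column ("tag2") that the query does not ask for: keep positions 0 and 2.
        assert_eq!(projection(&["tag1", "tag2", "time"], &["tag1", "time"]), Some(vec![0, 2]));
        // The chunk's columns already match the output: no projection needed.
        assert_eq!(projection(&["tag1", "time"], &["tag1", "time"]), None);
    }
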
diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs
index 1a5bf26681..07cf132608 100644
--- a/iox_query/src/test.rs
+++ b/iox_query/src/test.rs
@@ -8,15 +8,14 @@ use crate::{
         stringset::{StringSet, StringSetRef},
         ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext,
     },
-    Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryCompletedToken, QueryDatabase,
-    QueryText,
+    Predicate, PredicateMatch, QueryChunk, QueryChunkData, QueryChunkMeta, QueryCompletedToken,
+    QueryDatabase, QueryText,
 };
 use arrow::{
     array::{
         ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray, UInt64Array,
     },
     datatypes::{DataType, Int32Type, TimeUnit},
-    error::ArrowError,
     record_batch::RecordBatch,
 };
 use async_trait::async_trait;
@@ -24,9 +23,7 @@ use data_types::{
     ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId, StatValues,
     Statistics, TableSummary,
 };
-use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
-use datafusion_util::stream_from_batches;
-use futures::StreamExt;
+use datafusion::error::DataFusionError;
 use hashbrown::HashSet;
 use observability_deps::tracing::debug;
 use parking_lot::Mutex;
@@ -949,34 +946,8 @@ impl QueryChunk for TestChunk {
         self.may_contain_pk_duplicates
     }
 
-    fn read_filter(
-        &self,
-        _ctx: IOxSessionContext,
-        predicate: &Predicate,
-        selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, DataFusionError> {
-        self.check_error()?;
-
-        // save the predicate
-        self.predicates.lock().push(predicate.clone());
-
-        let batches = match self
-            .schema
-            .df_projection(selection)
-            .map_err(|e| DataFusionError::External(Box::new(e)))?
-        {
-            None => self.table_data.clone(),
-            Some(projection) => self
-                .table_data
-                .iter()
-                .map(|batch| {
-                    let batch = batch.project(&projection)?;
-                    Ok(Arc::new(batch))
-                })
-                .collect::<Result<Vec<_>, ArrowError>>()?,
-        };
-
-        Ok(stream_from_batches(self.schema().as_arrow(), batches))
+    fn data(&self) -> QueryChunkData {
+        QueryChunkData::RecordBatches(self.table_data.iter().map(|b| b.as_ref().clone()).collect())
     }
 
     fn chunk_type(&self) -> &str {
@@ -1071,17 +1042,10 @@ impl QueryChunkMeta for TestChunk {
 
 /// Return the raw data from the list of chunks
 pub async fn raw_data(chunks: &[Arc<dyn QueryChunk>]) -> Vec<RecordBatch> {
+    let ctx = IOxSessionContext::with_testing();
     let mut batches = vec![];
     for c in chunks {
-        let pred = Predicate::default();
-        let selection = Selection::All;
-        let mut stream = c
-            .read_filter(IOxSessionContext::with_testing(), &pred, selection)
-            .expect("Error in read_filter");
-        while let Some(b) = stream.next().await {
-            let b = b.expect("Error in stream");
-            batches.push(b)
-        }
+        batches.append(&mut c.data().read_to_batches(c.schema(), ctx.inner()).await);
    }
    batches
 }
diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs
index d6958cbe79..2d26075c1b 100644
--- a/iox_tests/src/util.rs
+++ b/iox_tests/src/util.rs
@@ -30,7 +30,6 @@ use parquet_file::{
     metadata::IoxMetadata,
     storage::{ParquetStorage, StorageId},
 };
-use predicate::Predicate;
 use schema::{
     selection::Selection,
     sort::{adjust_sort_key_columns, compute_sort_key, SortKey},
@@ -389,14 +388,13 @@ impl TestTable {
             Arc::new(schema),
             self.catalog.parquet_store.clone(),
         );
-        let rx = chunk
-            .read_filter(
-                &Predicate::default(),
+        chunk
+            .parquet_exec_input()
+            .read_to_batches(
+                chunk.schema().as_arrow(),
                 Selection::All,
                 &chunk.store().test_df_context(),
             )
-            .unwrap();
-        datafusion::physical_plan::common::collect(rx)
            .await
            .unwrap()
     }
diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index cf3b8488fc..22e04ef780 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -1,10 +1,11 @@
 //! A metadata summary of a Parquet file in object storage, with the ability to
 //! download & execute a scan.
 
-use crate::{storage::ParquetStorage, ParquetFilePath};
+use crate::{
+    storage::{ParquetExecInput, ParquetStorage},
+    ParquetFilePath,
+};
 use data_types::{ParquetFile, TimestampMinMax};
-use datafusion::{physical_plan::SendableRecordBatchStream, prelude::SessionContext};
-use predicate::Predicate;
 use schema::{selection::Selection, Schema};
 use std::{collections::BTreeSet, mem, sync::Arc};
 use uuid::Uuid;
@@ -78,21 +79,14 @@ impl ParquetChunk {
     }
 
     /// Return stream of data read from parquet file
-    pub fn read_filter(
-        &self,
-        predicate: &Predicate,
-        selection: Selection<'_>,
-        session_ctx: &SessionContext,
-    ) -> Result<SendableRecordBatchStream, crate::storage::ReadError> {
+    /// Inputs for [`ParquetExec`].
+    ///
+    /// See [`ParquetExecInput`] for more information.
+    ///
+    /// [`ParquetExec`]: datafusion::physical_plan::file_format::ParquetExec
+    pub fn parquet_exec_input(&self) -> ParquetExecInput {
         let path: ParquetFilePath = self.parquet_file.as_ref().into();
-        self.store.read_filter(
-            predicate,
-            selection,
-            Arc::clone(&self.schema.as_arrow()),
-            &path,
-            self.file_size_bytes(),
-            session_ctx,
-        )
+        self.store.parquet_exec_input(&path, self.file_size_bytes())
     }
 
     /// The total number of rows in all row groups in this chunk.
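
Call sites that previously went through `ParquetChunk::read_filter` in tests now chain through the new accessor instead; roughly (a sketch, assuming a `ParquetChunk` named `chunk` and a DataFusion `SessionContext` named `session_ctx`, mirroring the `iox_tests` change above):

    let batches = chunk
        .parquet_exec_input()
        .read_to_batches(chunk.schema().as_arrow(), Selection::All, &session_ctx)
        .await
        .unwrap();
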
diff --git a/parquet_file/src/storage.rs b/parquet_file/src/storage.rs
index f4f3b7c6b9..0f245101f0 100644
--- a/parquet_file/src/storage.rs
+++ b/parquet_file/src/storage.rs
@@ -6,27 +6,26 @@ use crate::{
     serialize::{self, CodecError},
     ParquetFilePath,
 };
-use arrow::datatypes::{Field, SchemaRef};
+use arrow::{
+    datatypes::{Field, SchemaRef},
+    record_batch::RecordBatch,
+};
 use bytes::Bytes;
 use datafusion::{
     datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
+    error::DataFusionError,
     execution::context::TaskContext,
     physical_plan::{
-        execute_stream,
         file_format::{FileScanConfig, ParquetExec},
-        stream::RecordBatchStreamAdapter,
-        SendableRecordBatchStream, Statistics,
+        ExecutionPlan, SendableRecordBatchStream, Statistics,
     },
     prelude::SessionContext,
 };
 use datafusion_util::config::iox_session_config;
-use futures::TryStreamExt;
 use object_store::{DynObjectStore, ObjectMeta};
 use observability_deps::tracing::*;
-use predicate::Predicate;
 use schema::selection::{select_schema, Selection};
 use std::{
-    num::TryFromIntError,
     sync::Arc,
     time::{Duration, Instant},
 };
 
@@ -53,38 +52,6 @@ pub enum UploadError {
     Upload(#[from] object_store::Error),
 }
 
-/// Errors during Parquet file download & scan.
-#[derive(Debug, Error)]
-#[allow(clippy::large_enum_variant)]
-pub enum ReadError {
-    /// Error writing the bytes fetched from object store to the temporary
-    /// parquet file on disk.
-    #[error("i/o error writing downloaded parquet: {0}")]
-    IO(#[from] std::io::Error),
-
-    /// An error fetching Parquet file bytes from object store.
-    #[error("failed to read data from object store: {0}")]
-    ObjectStore(#[from] object_store::Error),
-
-    /// An error reading the downloaded Parquet file.
-    #[error("invalid parquet file: {0}")]
-    Parquet(#[from] parquet::errors::ParquetError),
-
-    /// Schema mismatch
-    #[error("Schema mismatch (expected VS actual parquet file) for file '{path}': {source}")]
-    SchemaMismatch {
-        /// Path of the affected parquet file.
-        path: object_store::path::Path,
-
-        /// Source error
-        source: ProjectionError,
-    },
-
-    /// Malformed integer data for row count
-    #[error("Malformed row count integer")]
-    MalformedRowCount(#[from] TryFromIntError),
-}
-
 /// ID for an object store hooked up into DataFusion.
 #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
 pub struct StorageId(&'static str);
@@ -107,6 +74,68 @@ impl std::fmt::Display for StorageId {
     }
 }
 
+/// Inputs required to build a [`ParquetExec`] for one or multiple files.
+///
+/// The files shall be grouped by [`object_store_url`](Self::object_store_url). For each object store, you shall
+/// create one [`ParquetExec`] and put each file into its own "file group".
+///
+/// [`ParquetExec`]: datafusion::physical_plan::file_format::ParquetExec
+#[derive(Debug)]
+pub struct ParquetExecInput {
+    /// Store where the file is located.
+    pub object_store_url: ObjectStoreUrl,
+
+    /// Object metadata.
+    pub object_meta: ObjectMeta,
+}
+
+impl ParquetExecInput {
+    /// Read parquet file into [`RecordBatch`]es.
+    ///
+    /// This should only be used for testing purposes.
+    pub async fn read_to_batches(
+        &self,
+        schema: SchemaRef,
+        selection: Selection<'_>,
+        session_ctx: &SessionContext,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        // Compute final (output) schema after selection
+        let schema = Arc::new(
+            select_schema(selection, &schema)
+                .as_ref()
+                .clone()
+                .with_metadata(Default::default()),
+        );
+
+        let base_config = FileScanConfig {
+            object_store_url: self.object_store_url.clone(),
+            file_schema: schema,
+            file_groups: vec![vec![PartitionedFile {
+                object_meta: self.object_meta.clone(),
+                partition_values: vec![],
+                range: None,
+                extensions: None,
+            }]],
+            statistics: Statistics::default(),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            // TODO avoid this `copied_config` when config_options are directly available on context
+            config_options: session_ctx.copied_config().config_options(),
+        };
+        let exec = ParquetExec::new(base_config, None, None);
+        let exec_schema = exec.schema();
+        datafusion::physical_plan::collect(Arc::new(exec), session_ctx.task_ctx())
+            .await
+            .map(|batches| {
+                for batch in &batches {
+                    assert_eq!(batch.schema(), exec_schema);
+                }
+                batches
+            })
+    }
+}
+
 /// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an
 /// underlying [`ObjectStore`].
 ///
@@ -220,72 +249,22 @@ impl ParquetStorage {
         Ok((parquet_meta, file_size))
     }
 
-    /// Pull the Parquet-encoded [`RecordBatch`] at the file path derived from
-    /// the provided [`ParquetFilePath`].
+    /// Inputs for [`ParquetExec`].
     ///
-    /// The `selection` projection is pushed down to the Parquet deserializer.
+    /// See [`ParquetExecInput`] for more information.
     ///
-    /// This impl fetches the associated Parquet file bytes from object storage,
-    /// temporarily persisting them to a local temp file to feed to the arrow
-    /// reader.
-    ///
-    /// No caching is performed by `read_filter()`, and each call to
-    /// `read_filter()` will re-download the parquet file unless the underlying
-    /// object store impl caches the fetched bytes.
-    ///
-    /// [`RecordBatch`]: arrow::record_batch::RecordBatch
-    pub fn read_filter(
-        &self,
-        predicate: &Predicate,
-        selection: Selection<'_>,
-        schema: SchemaRef,
-        path: &ParquetFilePath,
-        file_size: usize,
-        session_ctx: &SessionContext,
-    ) -> Result<SendableRecordBatchStream, ReadError> {
-        let path = path.object_store_path();
-        trace!(path=?path, "fetching parquet data for filtered read");
-
-        // Compute final (output) schema after selection
-        let schema = Arc::new(
-            select_schema(selection, &schema)
-                .as_ref()
-                .clone()
-                .with_metadata(Default::default()),
-        );
-
-        // create ParquetExec node
-        let object_meta = ObjectMeta {
-            location: path,
-            // we don't care about the "last modified" field
-            last_modified: Default::default(),
-            size: file_size,
-        };
-        let expr = predicate.filter_expr();
-        let base_config = FileScanConfig {
+    /// [`ParquetExec`]: datafusion::physical_plan::file_format::ParquetExec
+    pub fn parquet_exec_input(&self, path: &ParquetFilePath, file_size: usize) -> ParquetExecInput {
+        ParquetExecInput {
             object_store_url: ObjectStoreUrl::parse(format!("iox://{}/", self.id))
                 .expect("valid object store URL"),
-            file_schema: Arc::clone(&schema),
-            file_groups: vec![vec![PartitionedFile {
-                object_meta,
-                partition_values: vec![],
-                range: None,
-                extensions: None,
-            }]],
-            statistics: Statistics::default(),
-            projection: None,
-            limit: None,
-            table_partition_cols: vec![],
-            // TODO avoid this `copied_config` when config_options are directly available on context
-            config_options: session_ctx.copied_config().config_options(),
-        };
-        let exec = ParquetExec::new(base_config, expr, None);
-
-        Ok(Box::pin(RecordBatchStreamAdapter::new(
-            Arc::clone(&schema),
-            futures::stream::once(execute_stream(Arc::new(exec), session_ctx.task_ctx()))
-                .try_flatten(),
-        )))
+            object_meta: ObjectMeta {
+                location: path.object_store_path(),
+                // we don't care about the "last modified" field
+                last_modified: Default::default(),
+                size: file_size,
+            },
+        }
     }
 }
 
@@ -598,24 +577,13 @@ mod tests {
         file_size: usize,
     ) -> Result<RecordBatch, DataFusionError> {
         let path: ParquetFilePath = meta.into();
-        let rx = store
-            .read_filter(
-                &Predicate::default(),
-                selection,
-                expected_schema,
-                &path,
-                file_size,
-                &store.test_df_context(),
-            )
-            .expect("should read record batches from object store");
-        let schema = rx.schema();
-        datafusion::physical_plan::common::collect(rx)
+        store
+            .parquet_exec_input(&path, file_size)
+            .read_to_batches(expected_schema, selection, &store.test_df_context())
             .await
            .map(|mut batches| {
                assert_eq!(batches.len(), 1);
-                let batch = batches.remove(0);
-                assert_eq!(batch.schema(), schema);
-                batch
+                batches.remove(0)
            })
    }
 
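
The `ParquetExecInput` docs above spell out the intended grouping: one `ParquetExec` per object store, and one "file group" per file. Built only from the fields and calls visible in this patch, a scan over several inputs that share a single store could be assembled roughly like this (the function name and the single-store assumption are illustrative, not existing IOx code):

    use arrow::datatypes::SchemaRef;
    use datafusion::{
        datasource::listing::PartitionedFile,
        physical_plan::{
            file_format::{FileScanConfig, ParquetExec},
            Statistics,
        },
        prelude::SessionContext,
    };
    use parquet_file::storage::ParquetExecInput;

    /// Build one ParquetExec for files that all live in the same object store (sketch).
    fn parquet_exec_for_store(
        inputs: Vec<ParquetExecInput>,
        schema: SchemaRef,
        session_ctx: &SessionContext,
    ) -> ParquetExec {
        assert!(!inputs.is_empty(), "need at least one file");
        let object_store_url = inputs[0].object_store_url.clone();
        let base_config = FileScanConfig {
            object_store_url,
            file_schema: schema,
            // One "file group" per file, as the ParquetExecInput docs require.
            file_groups: inputs
                .into_iter()
                .map(|input| {
                    vec![PartitionedFile {
                        object_meta: input.object_meta,
                        partition_values: vec![],
                        range: None,
                        extensions: None,
                    }]
                })
                .collect(),
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            config_options: session_ctx.copied_config().config_options(),
        };
        ParquetExec::new(base_config, None, None)
    }
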
diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs
index 8b8e73272e..03a2e89082 100644
--- a/querier/src/chunk/mod.rs
+++ b/querier/src/chunk/mod.rs
@@ -340,14 +340,13 @@ pub mod tests {
     use arrow::{datatypes::DataType, record_batch::RecordBatch};
     use arrow_util::assert_batches_eq;
     use data_types::{ColumnType, NamespaceSchema};
-    use futures::StreamExt;
     use iox_query::{
         exec::{ExecutorType, IOxSessionContext},
         QueryChunk, QueryChunkMeta,
     };
     use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFileBuilder};
     use metric::{Attributes, Observation, RawReporter};
-    use schema::{builder::SchemaBuilder, selection::Selection, sort::SortKeyBuilder};
+    use schema::{builder::SchemaBuilder, sort::SortKeyBuilder};
     use test_helpers::maybe_start_logging;
     use tokio::runtime::Handle;
 
@@ -397,13 +396,9 @@ pub mod tests {
         ctx: IOxSessionContext,
     ) -> Vec<RecordBatch> {
         chunk
-            .read_filter(ctx, &Default::default(), Selection::All)
-            .unwrap()
-            .collect::<Vec<_>>()
+            .data()
+            .read_to_batches(chunk.schema(), ctx.inner())
             .await
-            .into_iter()
-            .map(Result::unwrap)
-            .collect()
     }
 
     struct TestData {
diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs
index bcc9f29b8d..3b2765f929 100644
--- a/querier/src/chunk/query_access.rs
+++ b/querier/src/chunk/query_access.rs
@@ -1,35 +1,13 @@
 use crate::chunk::QuerierChunk;
 use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
-use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
+use datafusion::error::DataFusionError;
 use iox_query::{
     exec::{stringset::StringSet, IOxSessionContext},
-    QueryChunk, QueryChunkMeta,
+    QueryChunk, QueryChunkData, QueryChunkMeta,
 };
-use observability_deps::tracing::debug;
 use predicate::Predicate;
 use schema::{selection::Selection, sort::SortKey, Schema};
-use snafu::{ResultExt, Snafu};
 use std::{any::Any, sync::Arc};
-use trace::span::SpanRecorder;
-
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))]
-    ParquetFileChunk {
-        source: Box<parquet_file::storage::ReadError>,
-        chunk_id: ChunkId,
-    },
-
-    #[snafu(display(
-        "Could not find column name '{}' in read buffer column_values results for chunk {}",
-        column_name,
-        chunk_id,
-    ))]
-    ColumnNameNotFound {
-        column_name: String,
-        chunk_id: ChunkId,
-    },
-}
 
 impl QueryChunkMeta for QuerierChunk {
     fn summary(&self) -> Arc<TableSummary> {
@@ -103,42 +81,8 @@ impl QueryChunk for QuerierChunk {
         Ok(None)
     }
 
-    fn read_filter(
-        &self,
-        mut ctx: IOxSessionContext,
-        predicate: &Predicate,
-        selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, DataFusionError> {
-        let span_recorder = SpanRecorder::new(
-            ctx.span()
-                .map(|span| span.child("QuerierChunk::read_filter")),
-        );
-        let delete_predicates: Vec<_> = self
-            .delete_predicates()
-            .iter()
-            .map(|pred| Arc::new(pred.as_ref().clone().into()))
-            .collect();
-        ctx.set_metadata("delete_predicates", delete_predicates.len() as i64);
-
-        // merge the negated delete predicates into the select predicate
-        let pred_with_deleted_exprs = predicate.clone().with_delete_predicates(&delete_predicates);
-        debug!(?pred_with_deleted_exprs, "Merged negated predicate");
-
-        ctx.set_metadata("predicate", format!("{}", &pred_with_deleted_exprs));
-        ctx.set_metadata("projection", format!("{}", selection));
-        ctx.set_metadata("storage", "parquet");
-
-        let chunk_id = self.id();
-        debug!(?predicate, "parquet read_filter");
-
-        // TODO(marco): propagate span all the way down to the object store cache access
-        let _span_recorder = span_recorder;
-
-        self.parquet_chunk
-            .read_filter(&pred_with_deleted_exprs, selection, ctx.inner())
-            .map_err(Box::new)
-            .context(ParquetFileChunkSnafu { chunk_id })
-            .map_err(|e| DataFusionError::External(Box::new(e)))
+    fn data(&self) -> QueryChunkData {
+        QueryChunkData::Parquet(self.parquet_chunk.parquet_exec_input())
     }
 
     fn chunk_type(&self) -> &str {
diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs
index e655d6436f..0ab1cd04cf 100644
--- a/querier/src/ingester/mod.rs
+++ b/querier/src/ingester/mod.rs
@@ -12,7 +12,6 @@ use data_types::{
     TableSummary, TimestampMinMax,
 };
 use datafusion::error::DataFusionError;
-use datafusion_util::MemoryStream;
 use futures::{stream::FuturesUnordered, TryStreamExt};
 use generated_types::{
     influxdata::iox::ingester::v1::GetWriteInfoResponse,
@@ -25,7 +24,7 @@ use influxdb_iox_client::flight::{
 use iox_query::{
     exec::{stringset::StringSet, IOxSessionContext},
     util::{compute_timenanosecond_min_max, create_basic_summary},
-    QueryChunk, QueryChunkMeta,
+    QueryChunk, QueryChunkData, QueryChunkMeta,
 };
 use iox_time::{Time, TimeProvider};
 use metric::{DurationHistogram, Metric};
@@ -1111,30 +1110,8 @@ impl QueryChunk for IngesterChunk {
         Ok(None)
     }
 
-    fn read_filter(
-        &self,
-        _ctx: IOxSessionContext,
-        predicate: &Predicate,
-        selection: Selection<'_>,
-    ) -> Result<SendableRecordBatchStream, DataFusionError> {
-        trace!(?predicate, ?selection, input_batches=?self.batches, "Reading data");
-
-        // Apply selection to in-memory batch
-        let batches = match self
-            .schema
-            .df_projection(selection)
-            .map_err(|e| DataFusionError::External(Box::new(e)))?
-        {
-            None => self.batches.clone(),
-            Some(projection) => self
-                .batches
-                .iter()
-                .map(|batch| batch.project(&projection))
-                .collect::<Result<Vec<_>, ArrowError>>()?,
-        };
-        trace!(?predicate, ?selection, output_batches=?batches, input_batches=?self.batches, "Reading data");
-
-        Ok(Box::pin(MemoryStream::new(batches)))
+    fn data(&self) -> QueryChunkData {
+        QueryChunkData::RecordBatches(self.batches.clone())
     }
 
     fn chunk_type(&self) -> &str {
diff --git a/querier/src/ingester/test_util.rs b/querier/src/ingester/test_util.rs
index 39ae18531d..8328ef2c79 100644
--- a/querier/src/ingester/test_util.rs
+++ b/querier/src/ingester/test_util.rs
@@ -1,10 +1,8 @@
 use super::IngesterConnection;
-use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
 use data_types::ShardIndex;
-use futures::StreamExt;
 use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse;
-use iox_query::{exec::IOxSessionContext, util::create_basic_summary, QueryChunk};
+use iox_query::util::create_basic_summary;
 use parking_lot::Mutex;
 use schema::selection::Selection;
 use schema::Schema as IOxSchema;
@@ -38,7 +36,7 @@ impl IngesterConnection for MockIngesterConnection {
         _namespace_name: Arc<str>,
         _table_name: Arc<str>,
         columns: Vec<String>,
-        predicate: &predicate::Predicate,
+        _predicate: &predicate::Predicate,
         _expected_schema: Arc<IOxSchema>,
         _span: Option<Span>,
     ) -> super::Result<Vec<IngesterPartition>> {
@@ -77,14 +75,14 @@ impl IngesterConnection for MockIngesterConnection {
                 .chunks
                 .into_iter()
                 .map(|ic| async move {
-                    let mut batches: Vec<RecordBatch> = vec![];
-                    let mut stream = ic
-                        .read_filter(IOxSessionContext::with_testing(), predicate, selection)
-                        .expect("Error in read_filter");
-                    while let Some(b) = stream.next().await {
-                        let b = b.expect("Error in stream");
-                        batches.push(b)
-                    }
+                    let batches: Vec<_> = ic
+                        .batches
+                        .iter()
+                        .map(|batch| match ic.schema.df_projection(selection).unwrap() {
+                            Some(projection) => batch.project(&projection).unwrap(),
+                            None => batch.clone(),
+                        })
+                        .collect();
 
                     assert!(!batches.is_empty(), "Error: empty batches");
                     let new_schema = IOxSchema::try_from(batches[0].schema()).unwrap();
diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs
index 6d7d6b4335..e049335eaa 100644
--- a/querier/src/table/mod.rs
+++ b/querier/src/table/mod.rs
@@ -520,7 +520,7 @@ mod tests {
     use iox_query::exec::IOxSessionContext;
     use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
     use predicate::Predicate;
-    use schema::{builder::SchemaBuilder, selection::Selection, InfluxFieldType};
+    use schema::{builder::SchemaBuilder, InfluxFieldType};
     use std::sync::Arc;
     use test_helpers::maybe_start_logging;
     use trace::{span::SpanStatus, RingBufferTraceCollector};
@@ -711,8 +711,8 @@ mod tests {
             .await
             .unwrap();
         assert_eq!(chunks.len(), 1);
-
         let chunk = &chunks[0];
+        assert_eq!(chunk.chunk_type(), "IngesterPartition");
 
         // verify chunk schema
         let schema = chunk.schema();
@@ -739,17 +739,9 @@ mod tests {
 
         // verify chunk data
         let batches = chunk
-            .read_filter(
-                IOxSessionContext::with_testing(),
-                &Default::default(),
-                Selection::All,
-            )
-            .unwrap()
-            .collect::<Vec<_>>()
-            .await
-            .into_iter()
-            .map(Result::unwrap)
-            .collect::<Vec<_>>();
+            .data()
+            .read_to_batches(chunk.schema(), IOxSessionContext::with_testing().inner())
+            .await;
         let expected = vec![
             "+-----+------+------+--------------------------------+",
             "| foo | tag1 | tag2 | time                           |",