refactor: simplify `QueryChunk` data access (#6015)

* refactor: simplify `QueryChunk` data access

We have only two types of chunks (now that the RUB is gone):

1. In-memory RecordBatches
2. Parquet files

A lot of logic is duplicated across the different `read_filter`
implementations. `read_filter` also hides a fair amount of logic from
DataFusion, which would prevent certain (future) optimizations. To enable #5897
and to simplify the interface, let the chunks return their data (record batches
or parquet file metadata) directly and let `iox_query` perform the actual
heavy lifting.
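
Roughly, the `QueryChunk` trait changes like this (a condensed sketch of the
diff below, not the exact code):

    // Before: each chunk implementation filtered and streamed its own data.
    fn read_filter(
        &self,
        ctx: IOxSessionContext,
        predicate: &Predicate,
        selection: Selection<'_>,
    ) -> Result<SendableRecordBatchStream, DataFusionError>;

    // After: chunks only expose their raw data; `iox_query` builds the plan.
    fn data(&self) -> QueryChunkData;

    // Raw data of a chunk.
    pub enum QueryChunkData {
        // In-memory record batches (ingester data, test chunks).
        RecordBatches(Vec<RecordBatch>),
        // Parquet file metadata, turned into a `ParquetExec` by `iox_query`.
        Parquet(ParquetExecInput),
    }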

* docs: improve

Co-authored-by: Andrew Lamb <alamb@influxdata.com>

* docs: improve

Co-authored-by: Andrew Lamb <alamb@influxdata.com>

Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Marco Neumann 2022-11-02 08:18:33 +00:00 committed by GitHub
parent 1eb0d64210
commit 45b3984aa3
13 changed files with 261 additions and 459 deletions


@ -4,40 +4,18 @@ use data_types::{
ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, PartitionId, SequenceNumber,
TableSummary, Timestamp, Tombstone,
};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use datafusion::error::DataFusionError;
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
util::create_basic_summary,
QueryChunk, QueryChunkMeta,
QueryChunk, QueryChunkData, QueryChunkMeta,
};
use observability_deps::tracing::trace;
use parquet_file::chunk::ParquetChunk;
use predicate::{delete_predicate::tombstones_to_delete_predicates, Predicate};
use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use std::{any::Any, sync::Arc};
use uuid::Uuid;
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("Failed to read parquet: {}", source))]
ReadParquet {
source: parquet_file::storage::ReadError,
},
#[snafu(display(
"Error reading IOx Metadata from Parquet IoxParquetMetadata: {}",
source
))]
ReadParquetMeta {
source: parquet_file::storage::ReadError,
},
}
/// A specialized `Error` for Compactor's query errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// QueryableParquetChunk that implements QueryChunk and QueryChunkMeta for building query plans
#[derive(Debug, Clone)]
pub struct QueryableParquetChunk {
@ -213,33 +191,8 @@ impl QueryChunk for QueryableParquetChunk {
Ok(None)
}
/// Provides access to raw `QueryChunk` data as an
/// asynchronous stream of `RecordBatch`es filtered by a *required*
/// predicate. Note that not all chunks can evaluate all types of
/// predicates and this function will return an error
/// if requested to evaluate with a predicate that is not supported
///
/// This is the analog of the `TableProvider` in DataFusion
///
/// The reason we can't simply use the `TableProvider` trait
/// directly is that the data for a particular Table lives in
/// several chunks within a partition, so there needs to be an
/// implementation of `TableProvider` that stitches together the
/// streams from several different `QueryChunk`s.
fn read_filter(
&self,
mut ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, DataFusionError> {
ctx.set_metadata("storage", "compactor");
ctx.set_metadata("projection", format!("{}", selection));
trace!(?selection, "selection");
self.data
.read_filter(predicate, selection, ctx.inner())
.context(ReadParquetSnafu)
.map_err(|e| DataFusionError::External(Box::new(e)))
fn data(&self) -> QueryChunkData {
QueryChunkData::Parquet(self.data.parquet_exec_input())
}
/// Returns chunk type


@ -6,24 +6,16 @@ use std::{any::Any, sync::Arc};
use arrow::record_batch::RecordBatch;
use arrow_util::util::ensure_schema;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
use datafusion::{
error::DataFusionError,
physical_plan::{
common::SizedRecordBatchStream,
metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics},
SendableRecordBatchStream,
},
};
use datafusion::error::DataFusionError;
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
util::{compute_timenanosecond_min_max, create_basic_summary},
QueryChunk, QueryChunkMeta,
QueryChunk, QueryChunkData, QueryChunkMeta,
};
use observability_deps::tracing::trace;
use once_cell::sync::OnceCell;
use predicate::Predicate;
use schema::{merge::merge_record_batch_schemas, selection::Selection, sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use snafu::Snafu;
use crate::data::table::TableName;
@ -230,42 +222,15 @@ impl QueryChunk for QueryAdaptor {
Ok(None)
}
/// Provides access to raw `QueryChunk` data as an
/// asynchronous stream of `RecordBatch`es
fn read_filter(
&self,
mut ctx: IOxSessionContext,
_predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, DataFusionError> {
ctx.set_metadata("storage", "ingester");
ctx.set_metadata("projection", format!("{}", selection));
trace!(?selection, "selection");
fn data(&self) -> QueryChunkData {
let schema = self.schema().as_arrow();
let schema = self
.schema()
.select(selection)
.context(SchemaSnafu)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
// Apply the projection over all the data in self, ensuring each batch
// has the specified schema.
let batches = self
.project_selection(selection)
.into_iter()
.map(|batch| {
ensure_schema(&schema.as_arrow(), &batch)
.context(ConcatBatchesSnafu {})
.map(Arc::new)
})
.collect::<Result<Vec<_>, _>>()
.map_err(|e| DataFusionError::External(Box::new(e)))?;
// Return stream of data
let dummy_metrics = ExecutionPlanMetricsSet::new();
let mem_metrics = MemTrackingMetrics::new(&dummy_metrics, 0);
let stream = SizedRecordBatchStream::new(schema.as_arrow(), batches, mem_metrics);
Ok(Box::pin(stream))
QueryChunkData::RecordBatches(
self.data
.iter()
.map(|b| ensure_schema(&schema, b).expect("schema handling broken"))
.collect(),
)
}
/// Returns chunk type


@ -10,12 +10,14 @@
clippy::dbg_macro
)]
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, InfluxDbType, PartitionId, TableSummary};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use datafusion::{error::DataFusionError, prelude::SessionContext};
use exec::{stringset::StringSet, IOxSessionContext};
use hashbrown::HashMap;
use observability_deps::tracing::{debug, trace};
use parquet_file::storage::ParquetExecInput;
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate, PredicateMatch};
use schema::{
selection::Selection,
@ -173,6 +175,37 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
fn as_meta(&self) -> &dyn QueryDatabaseMeta;
}
/// Raw data of a [`QueryChunk`].
#[derive(Debug)]
pub enum QueryChunkData {
/// In-memory record batches.
///
/// **IMPORTANT: All batches MUST have the schema that the [chunk reports](QueryChunkMeta::schema).**
RecordBatches(Vec<RecordBatch>),
/// Parquet file.
///
/// See [`ParquetExecInput`] for details.
Parquet(ParquetExecInput),
}
impl QueryChunkData {
/// Read data into [`RecordBatch`]es. This is mostly meant for testing!
pub async fn read_to_batches(
self,
schema: Arc<Schema>,
session_ctx: &SessionContext,
) -> Vec<RecordBatch> {
match self {
Self::RecordBatches(batches) => batches,
Self::Parquet(exec_input) => exec_input
.read_to_batches(schema.as_arrow(), Selection::All, session_ctx)
.await
.unwrap(),
}
}
}
/// Collection of data that shares the same partition key
pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
/// returns the Id of this chunk. Ids are unique within a
@ -222,25 +255,10 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
predicate: &Predicate,
) -> Result<Option<StringSet>, DataFusionError>;
/// Provides access to raw `QueryChunk` data as an
/// asynchronous stream of `RecordBatch`es filtered by a *required*
/// predicate. Note that not all chunks can evaluate all types of
/// predicates and this function will return an error
/// if requested to evaluate with a predicate that is not supported
/// Provides access to raw [`QueryChunk`] data.
///
/// This is the analog of the `TableProvider` in DataFusion
///
/// The reason we can't simply use the `TableProvider` trait
/// directly is that the data for a particular Table lives in
/// several chunks within a partition, so there needs to be an
/// implementation of `TableProvider` that stitches together the
/// streams from several different `QueryChunk`s.
fn read_filter(
&self,
ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, DataFusionError>;
/// The engine assumes that minimal work is performed to gather the `QueryChunkData`.
fn data(&self) -> QueryChunkData;
/// Returns chunk type. Useful in tests and debug logs.
fn chunk_type(&self) -> &str;
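
For illustration, a caller that only wants `RecordBatch`es out of a chunk can
combine the two new pieces like this (hypothetical helper, not part of this
change; the `raw_data` test utility further down in this diff does essentially
the same thing):

    // Hypothetical helper: collect a chunk's data into record batches via the
    // new accessor. Mostly useful in tests.
    async fn chunk_to_batches(chunk: &dyn QueryChunk, ctx: &IOxSessionContext) -> Vec<RecordBatch> {
        chunk
            .data()
            .read_to_batches(chunk.schema(), ctx.inner())
            .await
    }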


@ -1,22 +1,28 @@
//! Implementation of a DataFusion PhysicalPlan node across partition chunks
use super::adapter::SchemaAdapterStream;
use crate::{exec::IOxSessionContext, QueryChunk};
use crate::{exec::IOxSessionContext, QueryChunk, QueryChunkData};
use arrow::datatypes::SchemaRef;
use data_types::TableSummary;
use datafusion::{
datasource::listing::PartitionedFile,
error::DataFusionError,
execution::context::TaskContext,
physical_plan::{
execute_stream,
expressions::PhysicalSortExpr,
file_format::{FileScanConfig, ParquetExec},
memory::MemoryStream,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
stream::RecordBatchStreamAdapter,
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
},
};
use futures::TryStreamExt;
use observability_deps::tracing::trace;
use predicate::Predicate;
use schema::{selection::Selection, Schema};
use std::{fmt, sync::Arc};
use schema::Schema;
use std::{collections::HashSet, fmt, sync::Arc};
/// Implements the DataFusion physical plan interface
#[derive(Debug)]
@ -104,16 +110,13 @@ impl ExecutionPlan for IOxReadFilterNode {
fn execute(
&self,
partition: usize,
_context: Arc<TaskContext>,
context: Arc<TaskContext>,
) -> datafusion::error::Result<SendableRecordBatchStream> {
trace!(partition, "Start IOxReadFilterNode::execute");
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let timer = baseline_metrics.elapsed_compute().timer();
let schema = self.schema();
let fields = schema.fields();
let selection_cols = fields.iter().map(|f| f.name() as &str).collect::<Vec<_>>();
let chunk = Arc::clone(&self.chunks[partition]);
@ -125,32 +128,79 @@ impl ExecutionPlan for IOxReadFilterNode {
// restrict the requested selection to the actual columns
// available, and use SchemaAdapterStream to pad the rest of
// the columns with NULLs if necessary
let selection_cols = restrict_selection(selection_cols, &chunk_table_schema);
let selection = Selection::Some(&selection_cols);
let final_output_column_names: HashSet<_> =
schema.fields().iter().map(|f| f.name()).collect();
let projection: Vec<_> = chunk_table_schema
.iter()
.enumerate()
.filter(|(_idx, (_t, field))| final_output_column_names.contains(field.name()))
.map(|(idx, _)| idx)
.collect();
let projection = (!((projection.len() == chunk_table_schema.len())
&& (projection.iter().enumerate().all(|(a, b)| a == *b))))
.then_some(projection);
let incomplete_output_schema = projection
.as_ref()
.map(|projection| {
Arc::new(
chunk_table_schema
.as_arrow()
.project(projection)
.expect("projection broken"),
)
})
.unwrap_or_else(|| chunk_table_schema.as_arrow());
let stream = chunk
.read_filter(
self.ctx.child_ctx("chunk read_filter"),
&self.predicate,
selection,
)
.map_err(|e| {
DataFusionError::Execution(format!(
"Error creating scan for table {} chunk {}: {}",
self.table_name,
chunk.id(),
e
))
})?;
let stream = match chunk.data() {
QueryChunkData::RecordBatches(batches) => {
let stream = Box::pin(MemoryStream::try_new(
batches,
incomplete_output_schema,
projection,
)?);
let adapter = SchemaAdapterStream::try_new(stream, schema, baseline_metrics)
.map_err(|e| DataFusionError::Internal(e.to_string()))?;
Box::pin(adapter) as SendableRecordBatchStream
}
QueryChunkData::Parquet(exec_input) => {
let base_config = FileScanConfig {
object_store_url: exec_input.object_store_url,
file_schema: Arc::clone(&schema),
file_groups: vec![vec![PartitionedFile {
object_meta: exec_input.object_meta,
partition_values: vec![],
range: None,
extensions: None,
}]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
config_options: context.session_config().config_options(),
};
let delete_predicates: Vec<_> = chunk
.delete_predicates()
.iter()
.map(|pred| Arc::new(pred.as_ref().clone().into()))
.collect();
let predicate = self
.predicate
.clone()
.with_delete_predicates(&delete_predicates);
let exec = ParquetExec::new(base_config, predicate.filter_expr(), None);
let stream = RecordBatchStreamAdapter::new(
schema,
futures::stream::once(execute_stream(Arc::new(exec), context)).try_flatten(),
);
// all CPU time is now done, pass in baseline metrics to adapter
timer.done();
// Note: No SchemaAdapterStream required here because `ParquetExec` already creates NULL columns for us.
let adapter = SchemaAdapterStream::try_new(stream, schema, baseline_metrics)
.map_err(|e| DataFusionError::Internal(e.to_string()))?;
Box::pin(stream)
}
};
trace!(partition, "End IOxReadFilterNode::execute");
Ok(Box::pin(adapter))
Ok(stream)
}
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
@ -190,17 +240,3 @@ impl ExecutionPlan for IOxReadFilterNode {
.unwrap_or_default()
}
}
/// Removes any columns that are not present in schema, returning a possibly
/// restricted set of columns
fn restrict_selection<'a>(
selection_cols: Vec<&'a str>,
chunk_table_schema: &'a Schema,
) -> Vec<&'a str> {
let arrow_schema = chunk_table_schema.as_arrow();
selection_cols
.into_iter()
.filter(|col| arrow_schema.fields().iter().any(|f| f.name() == col))
.collect()
}
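
The projection handling in `execute` above boils down to: keep only the chunk
columns that also appear in the final output schema, and skip the projection
entirely when it would be the identity. A standalone sketch (hypothetical
helper working on column names rather than schema types):

    use std::collections::HashSet;

    // Sketch of the projection rule: indices of chunk columns that are part of
    // the output schema, or `None` if that selection is the identity.
    fn chunk_projection(chunk_cols: &[&str], output_cols: &HashSet<&str>) -> Option<Vec<usize>> {
        let projection: Vec<usize> = chunk_cols
            .iter()
            .copied()
            .enumerate()
            .filter(|(_, name)| output_cols.contains(*name))
            .map(|(idx, _)| idx)
            .collect();

        // `None` means "take the chunk columns as-is" (identity projection).
        let is_identity = projection.len() == chunk_cols.len()
            && projection.iter().enumerate().all(|(a, b)| a == *b);

        (!is_identity).then_some(projection)
    }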


@ -8,15 +8,14 @@ use crate::{
stringset::{StringSet, StringSetRef},
ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext,
},
Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryCompletedToken, QueryDatabase,
QueryText,
Predicate, PredicateMatch, QueryChunk, QueryChunkData, QueryChunkMeta, QueryCompletedToken,
QueryDatabase, QueryText,
};
use arrow::{
array::{
ArrayRef, DictionaryArray, Int64Array, StringArray, TimestampNanosecondArray, UInt64Array,
},
datatypes::{DataType, Int32Type, TimeUnit},
error::ArrowError,
record_batch::RecordBatch,
};
use async_trait::async_trait;
@ -24,9 +23,7 @@ use data_types::{
ChunkId, ChunkOrder, ColumnSummary, DeletePredicate, InfluxDbType, PartitionId, StatValues,
Statistics, TableSummary,
};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use datafusion_util::stream_from_batches;
use futures::StreamExt;
use datafusion::error::DataFusionError;
use hashbrown::HashSet;
use observability_deps::tracing::debug;
use parking_lot::Mutex;
@ -949,34 +946,8 @@ impl QueryChunk for TestChunk {
self.may_contain_pk_duplicates
}
fn read_filter(
&self,
_ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, DataFusionError> {
self.check_error()?;
// save the predicate
self.predicates.lock().push(predicate.clone());
let batches = match self
.schema
.df_projection(selection)
.map_err(|e| DataFusionError::External(Box::new(e)))?
{
None => self.table_data.clone(),
Some(projection) => self
.table_data
.iter()
.map(|batch| {
let batch = batch.project(&projection)?;
Ok(Arc::new(batch))
})
.collect::<std::result::Result<Vec<_>, ArrowError>>()?,
};
Ok(stream_from_batches(self.schema().as_arrow(), batches))
fn data(&self) -> QueryChunkData {
QueryChunkData::RecordBatches(self.table_data.iter().map(|b| b.as_ref().clone()).collect())
}
fn chunk_type(&self) -> &str {
@ -1071,17 +1042,10 @@ impl QueryChunkMeta for TestChunk {
/// Return the raw data from the list of chunks
pub async fn raw_data(chunks: &[Arc<dyn QueryChunk>]) -> Vec<RecordBatch> {
let ctx = IOxSessionContext::with_testing();
let mut batches = vec![];
for c in chunks {
let pred = Predicate::default();
let selection = Selection::All;
let mut stream = c
.read_filter(IOxSessionContext::with_testing(), &pred, selection)
.expect("Error in read_filter");
while let Some(b) = stream.next().await {
let b = b.expect("Error in stream");
batches.push(b)
}
batches.append(&mut c.data().read_to_batches(c.schema(), ctx.inner()).await);
}
batches
}


@ -30,7 +30,6 @@ use parquet_file::{
metadata::IoxMetadata,
storage::{ParquetStorage, StorageId},
};
use predicate::Predicate;
use schema::{
selection::Selection,
sort::{adjust_sort_key_columns, compute_sort_key, SortKey},
@ -389,14 +388,13 @@ impl TestTable {
Arc::new(schema),
self.catalog.parquet_store.clone(),
);
let rx = chunk
.read_filter(
&Predicate::default(),
chunk
.parquet_exec_input()
.read_to_batches(
chunk.schema().as_arrow(),
Selection::All,
&chunk.store().test_df_context(),
)
.unwrap();
datafusion::physical_plan::common::collect(rx)
.await
.unwrap()
}


@ -1,10 +1,11 @@
//! A metadata summary of a Parquet file in object storage, with the ability to
//! download & execute a scan.
use crate::{storage::ParquetStorage, ParquetFilePath};
use crate::{
storage::{ParquetExecInput, ParquetStorage},
ParquetFilePath,
};
use data_types::{ParquetFile, TimestampMinMax};
use datafusion::{physical_plan::SendableRecordBatchStream, prelude::SessionContext};
use predicate::Predicate;
use schema::{selection::Selection, Schema};
use std::{collections::BTreeSet, mem, sync::Arc};
use uuid::Uuid;
@ -78,21 +79,14 @@ impl ParquetChunk {
}
/// Return stream of data read from parquet file
pub fn read_filter(
&self,
predicate: &Predicate,
selection: Selection<'_>,
session_ctx: &SessionContext,
) -> Result<SendableRecordBatchStream, crate::storage::ReadError> {
/// Inputs for [`ParquetExec`].
///
/// See [`ParquetExecInput`] for more information.
///
/// [`ParquetExec`]: datafusion::physical_plan::file_format::ParquetExec
pub fn parquet_exec_input(&self) -> ParquetExecInput {
let path: ParquetFilePath = self.parquet_file.as_ref().into();
self.store.read_filter(
predicate,
selection,
Arc::clone(&self.schema.as_arrow()),
&path,
self.file_size_bytes(),
session_ctx,
)
self.store.parquet_exec_input(&path, self.file_size_bytes())
}
/// The total number of rows in all row groups in this chunk.


@ -6,27 +6,26 @@ use crate::{
serialize::{self, CodecError},
ParquetFilePath,
};
use arrow::datatypes::{Field, SchemaRef};
use arrow::{
datatypes::{Field, SchemaRef},
record_batch::RecordBatch,
};
use bytes::Bytes;
use datafusion::{
datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl},
error::DataFusionError,
execution::context::TaskContext,
physical_plan::{
execute_stream,
file_format::{FileScanConfig, ParquetExec},
stream::RecordBatchStreamAdapter,
SendableRecordBatchStream, Statistics,
ExecutionPlan, SendableRecordBatchStream, Statistics,
},
prelude::SessionContext,
};
use datafusion_util::config::iox_session_config;
use futures::TryStreamExt;
use object_store::{DynObjectStore, ObjectMeta};
use observability_deps::tracing::*;
use predicate::Predicate;
use schema::selection::{select_schema, Selection};
use std::{
num::TryFromIntError,
sync::Arc,
time::{Duration, Instant},
};
@ -53,38 +52,6 @@ pub enum UploadError {
Upload(#[from] object_store::Error),
}
/// Errors during Parquet file download & scan.
#[derive(Debug, Error)]
#[allow(clippy::large_enum_variant)]
pub enum ReadError {
/// Error writing the bytes fetched from object store to the temporary
/// parquet file on disk.
#[error("i/o error writing downloaded parquet: {0}")]
IO(#[from] std::io::Error),
/// An error fetching Parquet file bytes from object store.
#[error("failed to read data from object store: {0}")]
ObjectStore(#[from] object_store::Error),
/// An error reading the downloaded Parquet file.
#[error("invalid parquet file: {0}")]
Parquet(#[from] parquet::errors::ParquetError),
/// Schema mismatch
#[error("Schema mismatch (expected VS actual parquet file) for file '{path}': {source}")]
SchemaMismatch {
/// Path of the affected parquet file.
path: object_store::path::Path,
/// Source error
source: ProjectionError,
},
/// Malformed integer data for row count
#[error("Malformed row count integer")]
MalformedRowCount(#[from] TryFromIntError),
}
/// ID for an object store hooked up into DataFusion.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct StorageId(&'static str);
@ -107,6 +74,68 @@ impl std::fmt::Display for StorageId {
}
}
/// Inputs required to build a [`ParquetExec`] for one or multiple files.
///
/// The files shall be grouped by [`object_store_url`](Self::object_store_url). For each object store, you shall
/// create one [`ParquetExec`] and put each file into its own "file group".
///
/// [`ParquetExec`]: datafusion::physical_plan::file_format::ParquetExec
#[derive(Debug)]
pub struct ParquetExecInput {
/// Store where the file is located.
pub object_store_url: ObjectStoreUrl,
/// Object metadata.
pub object_meta: ObjectMeta,
}
impl ParquetExecInput {
/// Read parquet file into [`RecordBatch`]es.
///
/// This should only be used for testing purposes.
pub async fn read_to_batches(
&self,
schema: SchemaRef,
selection: Selection<'_>,
session_ctx: &SessionContext,
) -> Result<Vec<RecordBatch>, DataFusionError> {
// Compute final (output) schema after selection
let schema = Arc::new(
select_schema(selection, &schema)
.as_ref()
.clone()
.with_metadata(Default::default()),
);
let base_config = FileScanConfig {
object_store_url: self.object_store_url.clone(),
file_schema: schema,
file_groups: vec![vec![PartitionedFile {
object_meta: self.object_meta.clone(),
partition_values: vec![],
range: None,
extensions: None,
}]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
// TODO avoid this `copied_config` when config_options are directly available on context
config_options: session_ctx.copied_config().config_options(),
};
let exec = ParquetExec::new(base_config, None, None);
let exec_schema = exec.schema();
datafusion::physical_plan::collect(Arc::new(exec), session_ctx.task_ctx())
.await
.map(|batches| {
for batch in &batches {
assert_eq!(batch.schema(), exec_schema);
}
batches
})
}
}
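
As a sketch of the grouping rule described in the `ParquetExecInput` docs above
(hypothetical helper, not part of this change; assumes all inputs live in the
same object store), several files would each become their own file group within
a single `ParquetExec`:

    use arrow::datatypes::SchemaRef;
    use datafusion::{
        datasource::listing::PartitionedFile,
        physical_plan::{
            file_format::{FileScanConfig, ParquetExec},
            Statistics,
        },
        prelude::SessionContext,
    };
    use parquet_file::storage::ParquetExecInput;

    // Hypothetical: build one ParquetExec for several files from the same store,
    // putting each file into its own "file group".
    fn parquet_exec_for_files(
        inputs: &[ParquetExecInput],
        file_schema: SchemaRef,
        session_ctx: &SessionContext,
    ) -> ParquetExec {
        let base_config = FileScanConfig {
            // assumes `inputs` is non-empty and shares a single object store URL
            object_store_url: inputs[0].object_store_url.clone(),
            file_schema,
            file_groups: inputs
                .iter()
                .map(|input| {
                    vec![PartitionedFile {
                        object_meta: input.object_meta.clone(),
                        partition_values: vec![],
                        range: None,
                        extensions: None,
                    }]
                })
                .collect(),
            statistics: Statistics::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            config_options: session_ctx.copied_config().config_options(),
        };
        ParquetExec::new(base_config, None, None)
    }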
/// The [`ParquetStorage`] type encapsulates [`RecordBatch`] persistence to an
/// underlying [`ObjectStore`].
///
@ -220,72 +249,22 @@ impl ParquetStorage {
Ok((parquet_meta, file_size))
}
/// Pull the Parquet-encoded [`RecordBatch`] at the file path derived from
/// the provided [`ParquetFilePath`].
/// Inputs for [`ParquetExec`].
///
/// The `selection` projection is pushed down to the Parquet deserializer.
/// See [`ParquetExecInput`] for more information.
///
/// This impl fetches the associated Parquet file bytes from object storage,
/// temporarily persisting them to a local temp file to feed to the arrow
/// reader.
///
/// No caching is performed by `read_filter()`, and each call to
/// `read_filter()` will re-download the parquet file unless the underlying
/// object store impl caches the fetched bytes.
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
pub fn read_filter(
&self,
predicate: &Predicate,
selection: Selection<'_>,
schema: SchemaRef,
path: &ParquetFilePath,
file_size: usize,
session_ctx: &SessionContext,
) -> Result<SendableRecordBatchStream, ReadError> {
let path = path.object_store_path();
trace!(path=?path, "fetching parquet data for filtered read");
// Compute final (output) schema after selection
let schema = Arc::new(
select_schema(selection, &schema)
.as_ref()
.clone()
.with_metadata(Default::default()),
);
// create ParquetExec node
let object_meta = ObjectMeta {
location: path,
// we don't care about the "last modified" field
last_modified: Default::default(),
size: file_size,
};
let expr = predicate.filter_expr();
let base_config = FileScanConfig {
/// [`ParquetExec`]: datafusion::physical_plan::file_format::ParquetExec
pub fn parquet_exec_input(&self, path: &ParquetFilePath, file_size: usize) -> ParquetExecInput {
ParquetExecInput {
object_store_url: ObjectStoreUrl::parse(format!("iox://{}/", self.id))
.expect("valid object store URL"),
file_schema: Arc::clone(&schema),
file_groups: vec![vec![PartitionedFile {
object_meta,
partition_values: vec![],
range: None,
extensions: None,
}]],
statistics: Statistics::default(),
projection: None,
limit: None,
table_partition_cols: vec![],
// TODO avoid this `copied_config` when config_options are directly available on context
config_options: session_ctx.copied_config().config_options(),
};
let exec = ParquetExec::new(base_config, expr, None);
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&schema),
futures::stream::once(execute_stream(Arc::new(exec), session_ctx.task_ctx()))
.try_flatten(),
)))
object_meta: ObjectMeta {
location: path.object_store_path(),
// we don't care about the "last modified" field
last_modified: Default::default(),
size: file_size,
},
}
}
}
@ -598,24 +577,13 @@ mod tests {
file_size: usize,
) -> Result<RecordBatch, DataFusionError> {
let path: ParquetFilePath = meta.into();
let rx = store
.read_filter(
&Predicate::default(),
selection,
expected_schema,
&path,
file_size,
&store.test_df_context(),
)
.expect("should read record batches from object store");
let schema = rx.schema();
datafusion::physical_plan::common::collect(rx)
store
.parquet_exec_input(&path, file_size)
.read_to_batches(expected_schema, selection, &store.test_df_context())
.await
.map(|mut batches| {
assert_eq!(batches.len(), 1);
let batch = batches.remove(0);
assert_eq!(batch.schema(), schema);
batch
batches.remove(0)
})
}


@ -340,14 +340,13 @@ pub mod tests {
use arrow::{datatypes::DataType, record_batch::RecordBatch};
use arrow_util::assert_batches_eq;
use data_types::{ColumnType, NamespaceSchema};
use futures::StreamExt;
use iox_query::{
exec::{ExecutorType, IOxSessionContext},
QueryChunk, QueryChunkMeta,
};
use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFileBuilder};
use metric::{Attributes, Observation, RawReporter};
use schema::{builder::SchemaBuilder, selection::Selection, sort::SortKeyBuilder};
use schema::{builder::SchemaBuilder, sort::SortKeyBuilder};
use test_helpers::maybe_start_logging;
use tokio::runtime::Handle;
@ -397,13 +396,9 @@ pub mod tests {
ctx: IOxSessionContext,
) -> Vec<RecordBatch> {
chunk
.read_filter(ctx, &Default::default(), Selection::All)
.unwrap()
.collect::<Vec<_>>()
.data()
.read_to_batches(chunk.schema(), ctx.inner())
.await
.into_iter()
.map(Result::unwrap)
.collect()
}
struct TestData {


@ -1,35 +1,13 @@
use crate::chunk::QuerierChunk;
use data_types::{ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use datafusion::error::DataFusionError;
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
QueryChunk, QueryChunkMeta,
QueryChunk, QueryChunkData, QueryChunkMeta,
};
use observability_deps::tracing::debug;
use predicate::Predicate;
use schema::{selection::Selection, sort::SortKey, Schema};
use snafu::{ResultExt, Snafu};
use std::{any::Any, sync::Arc};
use trace::span::SpanRecorder;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Parquet File Error in chunk {}: {}", chunk_id, source))]
ParquetFileChunk {
source: Box<parquet_file::storage::ReadError>,
chunk_id: ChunkId,
},
#[snafu(display(
"Could not find column name '{}' in read buffer column_values results for chunk {}",
column_name,
chunk_id,
))]
ColumnNameNotFound {
column_name: String,
chunk_id: ChunkId,
},
}
impl QueryChunkMeta for QuerierChunk {
fn summary(&self) -> Arc<TableSummary> {
@ -103,42 +81,8 @@ impl QueryChunk for QuerierChunk {
Ok(None)
}
fn read_filter(
&self,
mut ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, DataFusionError> {
let span_recorder = SpanRecorder::new(
ctx.span()
.map(|span| span.child("QuerierChunk::read_filter")),
);
let delete_predicates: Vec<_> = self
.delete_predicates()
.iter()
.map(|pred| Arc::new(pred.as_ref().clone().into()))
.collect();
ctx.set_metadata("delete_predicates", delete_predicates.len() as i64);
// merge the negated delete predicates into the select predicate
let pred_with_deleted_exprs = predicate.clone().with_delete_predicates(&delete_predicates);
debug!(?pred_with_deleted_exprs, "Merged negated predicate");
ctx.set_metadata("predicate", format!("{}", &pred_with_deleted_exprs));
ctx.set_metadata("projection", format!("{}", selection));
ctx.set_metadata("storage", "parquet");
let chunk_id = self.id();
debug!(?predicate, "parquet read_filter");
// TODO(marco): propagate span all the way down to the object store cache access
let _span_recorder = span_recorder;
self.parquet_chunk
.read_filter(&pred_with_deleted_exprs, selection, ctx.inner())
.map_err(Box::new)
.context(ParquetFileChunkSnafu { chunk_id })
.map_err(|e| DataFusionError::External(Box::new(e)))
fn data(&self) -> QueryChunkData {
QueryChunkData::Parquet(self.parquet_chunk.parquet_exec_input())
}
fn chunk_type(&self) -> &str {


@ -12,7 +12,6 @@ use data_types::{
TableSummary, TimestampMinMax,
};
use datafusion::error::DataFusionError;
use datafusion_util::MemoryStream;
use futures::{stream::FuturesUnordered, TryStreamExt};
use generated_types::{
influxdata::iox::ingester::v1::GetWriteInfoResponse,
@ -25,7 +24,7 @@ use influxdb_iox_client::flight::{
use iox_query::{
exec::{stringset::StringSet, IOxSessionContext},
util::{compute_timenanosecond_min_max, create_basic_summary},
QueryChunk, QueryChunkMeta,
QueryChunk, QueryChunkData, QueryChunkMeta,
};
use iox_time::{Time, TimeProvider};
use metric::{DurationHistogram, Metric};
@ -1111,30 +1110,8 @@ impl QueryChunk for IngesterChunk {
Ok(None)
}
fn read_filter(
&self,
_ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, DataFusionError> {
trace!(?predicate, ?selection, input_batches=?self.batches, "Reading data");
// Apply selection to in-memory batch
let batches = match self
.schema
.df_projection(selection)
.map_err(|e| DataFusionError::External(Box::new(e)))?
{
None => self.batches.clone(),
Some(projection) => self
.batches
.iter()
.map(|batch| batch.project(&projection))
.collect::<std::result::Result<Vec<_>, ArrowError>>()?,
};
trace!(?predicate, ?selection, output_batches=?batches, input_batches=?self.batches, "Reading data");
Ok(Box::pin(MemoryStream::new(batches)))
fn data(&self) -> QueryChunkData {
QueryChunkData::RecordBatches(self.batches.clone())
}
fn chunk_type(&self) -> &str {


@ -1,10 +1,8 @@
use super::IngesterConnection;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use data_types::ShardIndex;
use futures::StreamExt;
use generated_types::influxdata::iox::ingester::v1::GetWriteInfoResponse;
use iox_query::{exec::IOxSessionContext, util::create_basic_summary, QueryChunk};
use iox_query::util::create_basic_summary;
use parking_lot::Mutex;
use schema::selection::Selection;
use schema::Schema as IOxSchema;
@ -38,7 +36,7 @@ impl IngesterConnection for MockIngesterConnection {
_namespace_name: Arc<str>,
_table_name: Arc<str>,
columns: Vec<String>,
predicate: &predicate::Predicate,
_predicate: &predicate::Predicate,
_expected_schema: Arc<schema::Schema>,
_span: Option<Span>,
) -> super::Result<Vec<super::IngesterPartition>> {
@ -77,14 +75,14 @@ impl IngesterConnection for MockIngesterConnection {
.chunks
.into_iter()
.map(|ic| async move {
let mut batches: Vec<RecordBatch> = vec![];
let mut stream = ic
.read_filter(IOxSessionContext::with_testing(), predicate, selection)
.expect("Error in read_filter");
while let Some(b) = stream.next().await {
let b = b.expect("Error in stream");
batches.push(b)
}
let batches: Vec<_> = ic
.batches
.iter()
.map(|batch| match ic.schema.df_projection(selection).unwrap() {
Some(projection) => batch.project(&projection).unwrap(),
None => batch.clone(),
})
.collect();
assert!(!batches.is_empty(), "Error: empty batches");
let new_schema = IOxSchema::try_from(batches[0].schema()).unwrap();


@ -520,7 +520,7 @@ mod tests {
use iox_query::exec::IOxSessionContext;
use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable};
use predicate::Predicate;
use schema::{builder::SchemaBuilder, selection::Selection, InfluxFieldType};
use schema::{builder::SchemaBuilder, InfluxFieldType};
use std::sync::Arc;
use test_helpers::maybe_start_logging;
use trace::{span::SpanStatus, RingBufferTraceCollector};
@ -711,8 +711,8 @@ mod tests {
.await
.unwrap();
assert_eq!(chunks.len(), 1);
let chunk = &chunks[0];
assert_eq!(chunk.chunk_type(), "IngesterPartition");
// verify chunk schema
let schema = chunk.schema();
@ -739,17 +739,9 @@ mod tests {
// verify chunk data
let batches = chunk
.read_filter(
IOxSessionContext::with_testing(),
&Default::default(),
Selection::All,
)
.unwrap()
.collect::<Vec<_>>()
.await
.into_iter()
.map(Result::unwrap)
.collect::<Vec<_>>();
.data()
.read_to_batches(chunk.schema(), IOxSessionContext::with_testing().inner())
.await;
let expected = vec![
"+-----+------+------+--------------------------------+",
"| foo | tag1 | tag2 | time |",