From 74b1a5e368abe756c114fe4dbfd081c5257aa38e Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 19 Sep 2023 13:53:30 +0200 Subject: [PATCH] refactor: allow streaming record batches into query For #8350, we won't have all the record batches from the ingester during planning but we'll stream them during the execution. Technically the DF plan is already based on streams, it's just `QueryChunkData` that required a materialized `Vec`. This change moves the stream creation up so a chunk can decide to either use `QueryChunkData::in_mem` (which conveniently creates the stream) or it can provide its own stream. --- ingester/src/query_adaptor.rs | 3 +- iox_query/src/lib.rs | 31 +++++++----- iox_query/src/provider/adapter.rs | 55 ++++++--------------- iox_query/src/provider/record_batch_exec.rs | 50 ++++--------------- iox_query/src/test.rs | 29 +++++++---- querier/src/ingester/mod.rs | 2 +- 6 files changed, 68 insertions(+), 102 deletions(-) diff --git a/ingester/src/query_adaptor.rs b/ingester/src/query_adaptor.rs index 0bbfd55651..4d10cb7b10 100644 --- a/ingester/src/query_adaptor.rs +++ b/ingester/src/query_adaptor.rs @@ -139,11 +139,12 @@ impl QueryChunk for QueryAdaptor { fn data(&self) -> QueryChunkData { let schema = self.schema().as_arrow(); - QueryChunkData::RecordBatches( + QueryChunkData::in_mem( self.data .iter() .map(|b| ensure_schema(&schema, b).expect("schema handling broken")) .collect(), + Arc::clone(self.schema.inner()), ) } diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index 258dc77d39..4ef0eebe2b 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -12,18 +12,20 @@ unused_crate_dependencies )] +use datafusion_util::MemoryStream; +use futures::TryStreamExt; // Workaround for "unused crate" lint false positives. use workspace_hack as _; use arrow::{ - datatypes::{DataType, Field}, + datatypes::{DataType, Field, SchemaRef}, record_batch::RecordBatch, }; use async_trait::async_trait; use data_types::{ChunkId, ChunkOrder, TransitionPartitionId}; use datafusion::{ error::DataFusionError, - physical_plan::Statistics, + physical_plan::{SendableRecordBatchStream, Statistics}, prelude::{Expr, SessionContext}, }; use exec::IOxSessionContext; @@ -189,12 +191,9 @@ pub trait QueryNamespace: Debug + Send + Sync { } /// Raw data of a [`QueryChunk`]. -#[derive(Debug, Clone)] pub enum QueryChunkData { - /// In-memory record batches. - /// - /// **IMPORTANT: All batches MUST have the schema that the [chunk reports](QueryChunk::schema).** - RecordBatches(Vec), + /// Record batches. + RecordBatches(SendableRecordBatchStream), /// Parquet file. /// @@ -210,7 +209,7 @@ impl QueryChunkData { session_ctx: &SessionContext, ) -> Vec { match self { - Self::RecordBatches(batches) => batches, + Self::RecordBatches(batches) => batches.try_collect::>().await.unwrap(), Self::Parquet(exec_input) => exec_input .read_to_batches(schema.as_arrow(), Projection::All, session_ctx) .await @@ -218,11 +217,19 @@ impl QueryChunkData { } } - /// Extract [record batches](Self::RecordBatches) variant. - pub fn into_record_batches(self) -> Option> { + /// Create data based on batches and schema. 
+ pub fn in_mem(batches: Vec, schema: SchemaRef) -> Self { + let s = MemoryStream::new_with_schema(batches, schema); + let s: SendableRecordBatchStream = Box::pin(s); + Self::RecordBatches(s) + } +} + +impl std::fmt::Debug for QueryChunkData { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::RecordBatches(batches) => Some(batches), - Self::Parquet(_) => None, + Self::RecordBatches(_) => f.debug_tuple("RecordBatches").field(&"").finish(), + Self::Parquet(input) => f.debug_tuple("Parquet").field(input).finish(), } } } diff --git a/iox_query/src/provider/adapter.rs b/iox_query/src/provider/adapter.rs index f7ec713ea9..5d928852d4 100644 --- a/iox_query/src/provider/adapter.rs +++ b/iox_query/src/provider/adapter.rs @@ -19,16 +19,6 @@ use futures::Stream; #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display( - "Internal error creating SchemaAdapterStream: field '{}' does not appear in the output schema, known fields are: {:?}", - field_name, - known_fields, - ))] - InternalLostInputField { - field_name: String, - known_fields: Vec, - }, - #[snafu(display("Internal error creating SchemaAdapterStream: input field '{}' had type '{:?}' which is different than output field '{}' which had type '{:?}'", input_field_name, input_field_type, output_field_name, output_field_type,))] InternalDataTypeMismatch { @@ -152,29 +142,6 @@ impl SchemaAdapterStream { }) .collect::>(); - // sanity logic checks - for input_field in input_schema.fields() { - // that there are no fields in the input schema that are - // not present in the desired output schema (otherwise we - // are dropping fields -- theys should have been selected - // out with projection push down) - if !output_schema - .fields() - .iter() - .any(|output_field| input_field.name() == output_field.name()) - { - return InternalLostInputFieldSnafu { - field_name: input_field.name(), - known_fields: output_schema - .fields() - .iter() - .map(|f| f.name().clone()) - .collect::>(), - } - .fail(); - } - } - // Verify the mappings match the output type for (output_index, mapping) in mappings.iter().enumerate() { let output_field = output_schema.field(output_index); @@ -417,17 +384,27 @@ mod tests { Field::new("a", DataType::Int32, false), ])); let input_stream = stream_from_batch(batch.schema(), batch); - let res = SchemaAdapterStream::try_new( + let adapter_stream = SchemaAdapterStream::try_new( input_stream, output_schema, &Default::default(), baseline_metrics(), - ); + ) + .unwrap(); - assert_contains!( - res.unwrap_err().to_string(), - "field 'b' does not appear in the output schema" - ); + let output = collect(Box::pin(adapter_stream)) + .await + .expect("Running plan"); + let expected = vec![ + "+-----+---+", + "| c | a |", + "+-----+---+", + "| foo | 1 |", + "| bar | 2 |", + "| baz | 3 |", + "+-----+---+", + ]; + assert_batches_eq!(&expected, &output); } #[tokio::test] diff --git a/iox_query/src/provider/record_batch_exec.rs b/iox_query/src/provider/record_batch_exec.rs index a1007fb45b..9b3c591eda 100644 --- a/iox_query/src/provider/record_batch_exec.rs +++ b/iox_query/src/provider/record_batch_exec.rs @@ -9,7 +9,6 @@ use datafusion::{ execution::context::TaskContext, physical_plan::{ expressions::{Column, PhysicalSortExpr}, - memory::MemoryStream, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, @@ -18,11 +17,7 @@ use datafusion::{ }; 
use observability_deps::tracing::trace; use schema::sort::SortKey; -use std::{ - collections::{HashMap, HashSet}, - fmt, - sync::Arc, -}; +use std::{collections::HashMap, fmt, sync::Arc}; /// Implements the DataFusion physical plan interface for [`RecordBatch`]es with automatic projection and NULL-column creation. /// @@ -178,47 +173,22 @@ impl ExecutionPlan for RecordBatchesExec { let schema = self.schema(); let chunk = &self.chunks[partition]; - let part_schema = chunk.schema().as_arrow(); - // The output selection is all the columns in the schema. - // - // However, this chunk may not have all those columns. Thus we - // restrict the requested selection to the actual columns - // available, and use SchemaAdapterStream to pad the rest of - // the columns with NULLs if necessary - let final_output_column_names: HashSet<_> = - schema.fields().iter().map(|f| f.name()).collect(); - let projection: Vec<_> = part_schema - .fields() - .iter() - .enumerate() - .filter(|(_idx, field)| final_output_column_names.contains(field.name())) - .map(|(idx, _)| idx) - .collect(); - let projection = (!((projection.len() == part_schema.fields().len()) - && (projection.iter().enumerate().all(|(a, b)| a == *b)))) - .then_some(projection); - let incomplete_output_schema = projection - .as_ref() - .map(|projection| Arc::new(part_schema.project(projection).expect("projection broken"))) - .unwrap_or(part_schema); - - let batches = chunk.data().into_record_batches().ok_or_else(|| { - DataFusionError::Execution(String::from("chunk must contain record batches")) - })?; - - let stream = Box::pin(MemoryStream::try_new( - batches.clone(), - incomplete_output_schema, - projection, - )?); + let stream = match chunk.data() { + crate::QueryChunkData::RecordBatches(stream) => stream, + crate::QueryChunkData::Parquet(_) => { + return Err(DataFusionError::Execution(String::from( + "chunk must contain record batches", + ))); + } + }; let virtual_columns = HashMap::from([( CHUNK_ORDER_COLUMN_NAME, ScalarValue::from(chunk.order().get()), )]); let adapter = Box::pin( SchemaAdapterStream::try_new(stream, schema, &virtual_columns, baseline_metrics) - .map_err(|e| DataFusionError::Internal(e.to_string()))?, + .map_err(|e| DataFusionError::External(Box::new(e)))?, ); trace!(partition, "End RecordBatchesExec::execute"); diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index d6e07b523b..05ffd796da 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -282,6 +282,12 @@ impl TableProvider for TestDatabaseTableProvider { } } +#[derive(Debug)] +enum TestChunkData { + RecordBatches(Vec), + Parquet(ParquetExecInput), +} + #[derive(Debug)] pub struct TestChunk { /// Table name @@ -302,7 +308,7 @@ pub struct TestChunk { may_contain_pk_duplicates: bool, /// Data in this chunk. 
- table_data: QueryChunkData, + table_data: TestChunkData, /// A saved error that is returned instead of actual results saved_error: Option, @@ -372,7 +378,7 @@ impl TestChunk { num_rows: None, id: ChunkId::new_test(0), may_contain_pk_duplicates: Default::default(), - table_data: QueryChunkData::RecordBatches(vec![]), + table_data: TestChunkData::RecordBatches(vec![]), saved_error: Default::default(), order: ChunkOrder::MIN, sort_key: None, @@ -383,10 +389,10 @@ impl TestChunk { fn push_record_batch(&mut self, batch: RecordBatch) { match &mut self.table_data { - QueryChunkData::RecordBatches(batches) => { + TestChunkData::RecordBatches(batches) => { batches.push(batch); } - QueryChunkData::Parquet(_) => panic!("chunk is parquet-based"), + TestChunkData::Parquet(_) => panic!("chunk is parquet-based"), } } @@ -403,14 +409,14 @@ impl TestChunk { pub fn with_dummy_parquet_file_and_store(self, store: &str) -> Self { match self.table_data { - QueryChunkData::RecordBatches(batches) => { + TestChunkData::RecordBatches(batches) => { assert!(batches.is_empty(), "chunk already has record batches"); } - QueryChunkData::Parquet(_) => panic!("chunk already has a file"), + TestChunkData::Parquet(_) => panic!("chunk already has a file"), } Self { - table_data: QueryChunkData::Parquet(ParquetExecInput { + table_data: TestChunkData::Parquet(ParquetExecInput { object_store_url: ObjectStoreUrl::parse(store).unwrap(), object_meta: ObjectMeta { location: Self::parquet_location(self.id), @@ -436,7 +442,7 @@ impl TestChunk { pub fn with_id(mut self, id: u128) -> Self { self.id = ChunkId::new_test(id); - if let QueryChunkData::Parquet(parquet_input) = &mut self.table_data { + if let TestChunkData::Parquet(parquet_input) = &mut self.table_data { parquet_input.object_meta.location = Self::parquet_location(self.id); } @@ -1078,7 +1084,12 @@ impl QueryChunk for TestChunk { fn data(&self) -> QueryChunkData { self.check_error().unwrap(); - self.table_data.clone() + match &self.table_data { + TestChunkData::RecordBatches(batches) => { + QueryChunkData::in_mem(batches.clone(), Arc::clone(self.schema.inner())) + } + TestChunkData::Parquet(input) => QueryChunkData::Parquet(input.clone()), + } } fn chunk_type(&self) -> &str { diff --git a/querier/src/ingester/mod.rs b/querier/src/ingester/mod.rs index a62d598150..36db5d34f5 100644 --- a/querier/src/ingester/mod.rs +++ b/querier/src/ingester/mod.rs @@ -954,7 +954,7 @@ impl QueryChunk for IngesterChunk { } fn data(&self) -> QueryChunkData { - QueryChunkData::RecordBatches(self.batches.clone()) + QueryChunkData::in_mem(self.batches.clone(), Arc::clone(self.schema.inner())) } fn chunk_type(&self) -> &str {
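
Below is a minimal sketch of the two ways a chunk can now hand its data to the planner, as described in the commit message: buffered batches go through the new `QueryChunkData::in_mem` convenience constructor, while an already-existing stream is passed straight into `QueryChunkData::RecordBatches`. The `StreamingChunk` and `ChunkState` types are hypothetical and only for illustration; also note that the real `QueryChunk::data` takes `&self`, so an actual implementation would need to clone its batches (as `TestChunk` and `IngesterChunk` do above) or move a stored stream out via some form of interior mutability, which this sketch sidesteps by consuming `self`.

```rust
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_query::QueryChunkData;

/// Hypothetical chunk contents: either fully buffered batches or a live
/// stream that is still being filled (e.g. by an ingester response).
enum ChunkState {
    Buffered(Vec<RecordBatch>),
    Streaming(SendableRecordBatchStream),
}

/// Hypothetical chunk type; only `QueryChunkData`, its `in_mem` constructor,
/// and `SendableRecordBatchStream` come from the patch above.
struct StreamingChunk {
    schema: SchemaRef,
    state: ChunkState,
}

impl StreamingChunk {
    fn data(self) -> QueryChunkData {
        match self.state {
            // Buffered batches: `in_mem` wraps them in a `MemoryStream`
            // with the chunk's reported schema.
            ChunkState::Buffered(batches) => QueryChunkData::in_mem(batches, self.schema),
            // A stream the chunk already owns is passed through untouched.
            ChunkState::Streaming(stream) => QueryChunkData::RecordBatches(stream),
        }
    }
}
```

Either way, the consumer side stays the same: per the `record_batch_exec.rs` change above, `RecordBatchesExec::execute` no longer materializes a `Vec<RecordBatch>` but simply wraps whatever stream the chunk returns in a `SchemaAdapterStream`.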