refactor: allow streaming record batches into query

For #8350, we won't have all the record batches from the ingester during
planning; instead they will be streamed during execution. Technically the
DF plan is already based on streams, it was only `QueryChunkData` that
required a materialized `Vec<RecordBatch>`. This change moves the stream
creation up so a chunk can either use `QueryChunkData::in_mem`
(which conveniently creates the stream for it) or provide its own
stream.
pull/24376/head
Marco Neumann 2023-09-19 13:53:30 +02:00
parent ca791386eb
commit 74b1a5e368
6 changed files with 68 additions and 102 deletions
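
To make the new contract concrete, here is a minimal sketch (not part of the diff below) of the two construction paths a `QueryChunk::data` implementation can now take. The free functions and the `iox_query` import path are illustrative assumptions; `QueryChunkData::in_mem` and the stream-holding `RecordBatches` variant are taken from the change itself.

```rust
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_query::QueryChunkData; // assumed import path for the enum changed below

// Chunk already holds materialized batches: `in_mem` wraps them in a
// memory-backed stream tagged with the chunk's schema.
fn data_from_batches(batches: Vec<RecordBatch>, schema: SchemaRef) -> QueryChunkData {
    QueryChunkData::in_mem(batches, schema)
}

// Chunk only receives batches while the plan runs (e.g. streamed from the
// ingester): hand the stream over directly.
fn data_from_stream(stream: SendableRecordBatchStream) -> QueryChunkData {
    QueryChunkData::RecordBatches(stream)
}
```

Either way, `RecordBatchesExec` now just forwards the stream into `SchemaAdapterStream` instead of rebuilding a `MemoryStream` itself, which is what the larger hunk in the execution plan below removes.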


@@ -139,11 +139,12 @@ impl QueryChunk for QueryAdaptor {
     fn data(&self) -> QueryChunkData {
         let schema = self.schema().as_arrow();
 
-        QueryChunkData::RecordBatches(
+        QueryChunkData::in_mem(
             self.data
                 .iter()
                 .map(|b| ensure_schema(&schema, b).expect("schema handling broken"))
                 .collect(),
+            Arc::clone(self.schema.inner()),
         )
     }


@@ -12,18 +12,20 @@
     unused_crate_dependencies
 )]
 
+use datafusion_util::MemoryStream;
+use futures::TryStreamExt;
 // Workaround for "unused crate" lint false positives.
 use workspace_hack as _;
 
 use arrow::{
-    datatypes::{DataType, Field},
+    datatypes::{DataType, Field, SchemaRef},
     record_batch::RecordBatch,
 };
 use async_trait::async_trait;
 use data_types::{ChunkId, ChunkOrder, TransitionPartitionId};
 use datafusion::{
     error::DataFusionError,
-    physical_plan::Statistics,
+    physical_plan::{SendableRecordBatchStream, Statistics},
     prelude::{Expr, SessionContext},
 };
 use exec::IOxSessionContext;
@@ -189,12 +191,9 @@ pub trait QueryNamespace: Debug + Send + Sync {
 }
 
 /// Raw data of a [`QueryChunk`].
-#[derive(Debug, Clone)]
 pub enum QueryChunkData {
-    /// In-memory record batches.
-    ///
-    /// **IMPORTANT: All batches MUST have the schema that the [chunk reports](QueryChunk::schema).**
-    RecordBatches(Vec<RecordBatch>),
+    /// Record batches.
+    RecordBatches(SendableRecordBatchStream),
 
     /// Parquet file.
     ///
@@ -210,7 +209,7 @@ impl QueryChunkData {
         session_ctx: &SessionContext,
     ) -> Vec<RecordBatch> {
         match self {
-            Self::RecordBatches(batches) => batches,
+            Self::RecordBatches(batches) => batches.try_collect::<Vec<_>>().await.unwrap(),
             Self::Parquet(exec_input) => exec_input
                 .read_to_batches(schema.as_arrow(), Projection::All, session_ctx)
                 .await
@@ -218,11 +217,19 @@
         }
     }
 
-    /// Extract [record batches](Self::RecordBatches) variant.
-    pub fn into_record_batches(self) -> Option<Vec<RecordBatch>> {
+    /// Create data based on batches and schema.
+    pub fn in_mem(batches: Vec<RecordBatch>, schema: SchemaRef) -> Self {
+        let s = MemoryStream::new_with_schema(batches, schema);
+        let s: SendableRecordBatchStream = Box::pin(s);
+        Self::RecordBatches(s)
+    }
+}
+
+impl std::fmt::Debug for QueryChunkData {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::RecordBatches(batches) => Some(batches),
-            Self::Parquet(_) => None,
+            Self::RecordBatches(_) => f.debug_tuple("RecordBatches").field(&"<stream>").finish(),
+            Self::Parquet(input) => f.debug_tuple("Parquet").field(input).finish(),
         }
     }
 }


@@ -19,16 +19,6 @@ use futures::Stream;
 #[allow(clippy::enum_variant_names)]
 #[derive(Debug, Snafu)]
 pub enum Error {
-    #[snafu(display(
-        "Internal error creating SchemaAdapterStream: field '{}' does not appear in the output schema, known fields are: {:?}",
-        field_name,
-        known_fields,
-    ))]
-    InternalLostInputField {
-        field_name: String,
-        known_fields: Vec<String>,
-    },
-
     #[snafu(display("Internal error creating SchemaAdapterStream: input field '{}' had type '{:?}' which is different than output field '{}' which had type '{:?}'",
     input_field_name, input_field_type, output_field_name, output_field_type,))]
     InternalDataTypeMismatch {
@@ -152,29 +142,6 @@ impl SchemaAdapterStream {
             })
             .collect::<Vec<_>>();
 
-        // sanity logic checks
-        for input_field in input_schema.fields() {
-            // that there are no fields in the input schema that are
-            // not present in the desired output schema (otherwise we
-            // are dropping fields -- theys should have been selected
-            // out with projection push down)
-            if !output_schema
-                .fields()
-                .iter()
-                .any(|output_field| input_field.name() == output_field.name())
-            {
-                return InternalLostInputFieldSnafu {
-                    field_name: input_field.name(),
-                    known_fields: output_schema
-                        .fields()
-                        .iter()
-                        .map(|f| f.name().clone())
-                        .collect::<Vec<String>>(),
-                }
-                .fail();
-            }
-        }
-
         // Verify the mappings match the output type
         for (output_index, mapping) in mappings.iter().enumerate() {
             let output_field = output_schema.field(output_index);
@@ -417,17 +384,27 @@ mod tests {
             Field::new("a", DataType::Int32, false),
         ]));
         let input_stream = stream_from_batch(batch.schema(), batch);
-        let res = SchemaAdapterStream::try_new(
+        let adapter_stream = SchemaAdapterStream::try_new(
             input_stream,
             output_schema,
             &Default::default(),
             baseline_metrics(),
-        );
+        )
+        .unwrap();
 
-        assert_contains!(
-            res.unwrap_err().to_string(),
-            "field 'b' does not appear in the output schema"
-        );
+        let output = collect(Box::pin(adapter_stream))
+            .await
+            .expect("Running plan");
+        let expected = vec![
+            "+-----+---+",
+            "| c   | a |",
+            "+-----+---+",
+            "| foo | 1 |",
+            "| bar | 2 |",
+            "| baz | 3 |",
+            "+-----+---+",
+        ];
+        assert_batches_eq!(&expected, &output);
     }
 
     #[tokio::test]


@@ -9,7 +9,6 @@ use datafusion::{
     execution::context::TaskContext,
     physical_plan::{
         expressions::{Column, PhysicalSortExpr},
-        memory::MemoryStream,
         metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
         ColumnStatistics, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
         SendableRecordBatchStream, Statistics,
@@ -18,11 +17,7 @@
 };
 use observability_deps::tracing::trace;
 use schema::sort::SortKey;
-use std::{
-    collections::{HashMap, HashSet},
-    fmt,
-    sync::Arc,
-};
+use std::{collections::HashMap, fmt, sync::Arc};
 
 /// Implements the DataFusion physical plan interface for [`RecordBatch`]es with automatic projection and NULL-column creation.
 ///
@@ -178,47 +173,22 @@ impl ExecutionPlan for RecordBatchesExec {
         let schema = self.schema();
         let chunk = &self.chunks[partition];
 
-        let part_schema = chunk.schema().as_arrow();
-
-        // The output selection is all the columns in the schema.
-        //
-        // However, this chunk may not have all those columns. Thus we
-        // restrict the requested selection to the actual columns
-        // available, and use SchemaAdapterStream to pad the rest of
-        // the columns with NULLs if necessary
-        let final_output_column_names: HashSet<_> =
-            schema.fields().iter().map(|f| f.name()).collect();
-        let projection: Vec<_> = part_schema
-            .fields()
-            .iter()
-            .enumerate()
-            .filter(|(_idx, field)| final_output_column_names.contains(field.name()))
-            .map(|(idx, _)| idx)
-            .collect();
-        let projection = (!((projection.len() == part_schema.fields().len())
-            && (projection.iter().enumerate().all(|(a, b)| a == *b))))
-        .then_some(projection);
-        let incomplete_output_schema = projection
-            .as_ref()
-            .map(|projection| Arc::new(part_schema.project(projection).expect("projection broken")))
-            .unwrap_or(part_schema);
-
-        let batches = chunk.data().into_record_batches().ok_or_else(|| {
-            DataFusionError::Execution(String::from("chunk must contain record batches"))
-        })?;
-        let stream = Box::pin(MemoryStream::try_new(
-            batches.clone(),
-            incomplete_output_schema,
-            projection,
-        )?);
+        let stream = match chunk.data() {
+            crate::QueryChunkData::RecordBatches(stream) => stream,
+            crate::QueryChunkData::Parquet(_) => {
+                return Err(DataFusionError::Execution(String::from(
+                    "chunk must contain record batches",
+                )));
+            }
+        };
 
         let virtual_columns = HashMap::from([(
             CHUNK_ORDER_COLUMN_NAME,
             ScalarValue::from(chunk.order().get()),
         )]);
 
         let adapter = Box::pin(
             SchemaAdapterStream::try_new(stream, schema, &virtual_columns, baseline_metrics)
-                .map_err(|e| DataFusionError::Internal(e.to_string()))?,
+                .map_err(|e| DataFusionError::External(Box::new(e)))?,
         );
         trace!(partition, "End RecordBatchesExec::execute");


@@ -282,6 +282,12 @@ impl TableProvider for TestDatabaseTableProvider {
     }
 }
 
+#[derive(Debug)]
+enum TestChunkData {
+    RecordBatches(Vec<RecordBatch>),
+    Parquet(ParquetExecInput),
+}
+
 #[derive(Debug)]
 pub struct TestChunk {
     /// Table name
@@ -302,7 +308,7 @@ pub struct TestChunk {
     may_contain_pk_duplicates: bool,
 
     /// Data in this chunk.
-    table_data: QueryChunkData,
+    table_data: TestChunkData,
 
     /// A saved error that is returned instead of actual results
     saved_error: Option<String>,
@@ -372,7 +378,7 @@ impl TestChunk {
             num_rows: None,
             id: ChunkId::new_test(0),
             may_contain_pk_duplicates: Default::default(),
-            table_data: QueryChunkData::RecordBatches(vec![]),
+            table_data: TestChunkData::RecordBatches(vec![]),
             saved_error: Default::default(),
             order: ChunkOrder::MIN,
             sort_key: None,
@@ -383,10 +389,10 @@ impl TestChunk {
    fn push_record_batch(&mut self, batch: RecordBatch) {
        match &mut self.table_data {
-            QueryChunkData::RecordBatches(batches) => {
+            TestChunkData::RecordBatches(batches) => {
                batches.push(batch);
            }
-            QueryChunkData::Parquet(_) => panic!("chunk is parquet-based"),
+            TestChunkData::Parquet(_) => panic!("chunk is parquet-based"),
        }
    }
@@ -403,14 +409,14 @@ impl TestChunk {
     pub fn with_dummy_parquet_file_and_store(self, store: &str) -> Self {
         match self.table_data {
-            QueryChunkData::RecordBatches(batches) => {
+            TestChunkData::RecordBatches(batches) => {
                 assert!(batches.is_empty(), "chunk already has record batches");
             }
-            QueryChunkData::Parquet(_) => panic!("chunk already has a file"),
+            TestChunkData::Parquet(_) => panic!("chunk already has a file"),
         }
 
         Self {
-            table_data: QueryChunkData::Parquet(ParquetExecInput {
+            table_data: TestChunkData::Parquet(ParquetExecInput {
                 object_store_url: ObjectStoreUrl::parse(store).unwrap(),
                 object_meta: ObjectMeta {
                     location: Self::parquet_location(self.id),
@@ -436,7 +442,7 @@ impl TestChunk {
     pub fn with_id(mut self, id: u128) -> Self {
         self.id = ChunkId::new_test(id);
 
-        if let QueryChunkData::Parquet(parquet_input) = &mut self.table_data {
+        if let TestChunkData::Parquet(parquet_input) = &mut self.table_data {
             parquet_input.object_meta.location = Self::parquet_location(self.id);
         }
@@ -1078,7 +1084,12 @@ impl QueryChunk for TestChunk {
     fn data(&self) -> QueryChunkData {
         self.check_error().unwrap();
 
-        self.table_data.clone()
+        match &self.table_data {
+            TestChunkData::RecordBatches(batches) => {
+                QueryChunkData::in_mem(batches.clone(), Arc::clone(self.schema.inner()))
+            }
+            TestChunkData::Parquet(input) => QueryChunkData::Parquet(input.clone()),
+        }
     }
 
     fn chunk_type(&self) -> &str {


@@ -954,7 +954,7 @@ impl QueryChunk for IngesterChunk {
     }
 
     fn data(&self) -> QueryChunkData {
-        QueryChunkData::RecordBatches(self.batches.clone())
+        QueryChunkData::in_mem(self.batches.clone(), Arc::clone(self.schema.inner()))
     }
 
     fn chunk_type(&self) -> &str {