From 07772e8d2254fb734e7f826298559658a4964015 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 16 Dec 2022 16:32:30 -0500 Subject: [PATCH] fix: Always return a PartitionRecord which maybe streams record batches Connects to #6421. Even if the ingester doesn't have data in memory for a query, we need to send back metadata about the ingester UUID and the number of files persisted so that the querier can decide whether it needs to refresh the cache. --- ingester2/src/buffer_tree/root.rs | 9 ++--- ingester2/src/buffer_tree/table.rs | 40 +++++++++++---------- ingester2/src/query/partition_response.rs | 19 ++++++++-- ingester2/src/query/response.rs | 3 +- ingester2/src/server/grpc/query.rs | 43 +++++++++++++---------- 5 files changed, 69 insertions(+), 45 deletions(-) diff --git a/ingester2/src/buffer_tree/root.rs b/ingester2/src/buffer_tree/root.rs index 76c27e9f12..b4f5177977 100644 --- a/ingester2/src/buffer_tree/root.rs +++ b/ingester2/src/buffer_tree/root.rs @@ -1041,10 +1041,11 @@ mod tests { let partition = partitions.pop().unwrap(); // Perform the partition read - let batches = - datafusion::physical_plan::common::collect(partition.into_record_batch_stream()) - .await - .expect("failed to collate query results"); + let batches = datafusion::physical_plan::common::collect( + partition.into_record_batch_stream().unwrap(), + ) + .await + .expect("failed to collate query results"); // Assert the contents of p1 contains both the initial write, and the // 3rd write in a single RecordBatch. diff --git a/ingester2/src/buffer_tree/table.rs b/ingester2/src/buffer_tree/table.rs index 287bc9653f..e109807710 100644 --- a/ingester2/src/buffer_tree/table.rs +++ b/ingester2/src/buffer_tree/table.rs @@ -267,7 +267,7 @@ where ); // Gather the partition data from all of the partitions in this table. - let partitions = self.partitions().into_iter().filter_map(move |p| { + let partitions = self.partitions().into_iter().map(move |p| { let mut span = SpanRecorder::new(span.clone().map(|s| s.child("partition read"))); let (id, completed_persistence_count, data) = { @@ -275,30 +275,32 @@ where ( p.partition_id(), p.completed_persistence_count(), - p.get_query_data()?, + p.get_query_data(), ) }; - assert_eq!(id, data.partition_id()); - // Project the data if necessary - let columns = columns.iter().map(String::as_str).collect::>(); - let selection = if columns.is_empty() { - Projection::All - } else { - Projection::Some(columns.as_ref()) + let ret = match data { + Some(data) => { + assert_eq!(id, data.partition_id()); + + // Project the data if necessary + let columns = columns.iter().map(String::as_str).collect::>(); + let selection = if columns.is_empty() { + Projection::All + } else { + Projection::Some(columns.as_ref()) + }; + + let data = Box::pin(MemoryStream::new( + data.project_selection(selection).into_iter().collect(), + )); + PartitionResponse::new(data, id, None, completed_persistence_count) + } + None => PartitionResponse::new_no_batches(id, None, completed_persistence_count), }; - let ret = PartitionResponse::new( - Box::pin(MemoryStream::new( - data.project_selection(selection).into_iter().collect(), - )), - id, - None, - completed_persistence_count, - ); - span.ok("read partition data"); - Some(ret) + ret }); Ok(PartitionStream::new(futures::stream::iter(partitions))) diff --git a/ingester2/src/query/partition_response.rs b/ingester2/src/query/partition_response.rs index cd642c9bb4..150f0f6f40 100644 --- a/ingester2/src/query/partition_response.rs +++ b/ingester2/src/query/partition_response.rs @@ -8,7 +8,7 @@ use datafusion::physical_plan::SendableRecordBatchStream; /// Response data for a single partition. pub(crate) struct PartitionResponse { /// Stream of snapshots. - batches: SendableRecordBatchStream, + batches: Option, /// Partition ID. id: PartitionId, @@ -42,7 +42,20 @@ impl PartitionResponse { completed_persistence_count: u64, ) -> Self { Self { - batches, + batches: Some(batches), + id, + max_persisted_sequence_number, + completed_persistence_count, + } + } + + pub(crate) fn new_no_batches( + id: PartitionId, + max_persisted_sequence_number: Option, + completed_persistence_count: u64, + ) -> Self { + Self { + batches: None, id, max_persisted_sequence_number, completed_persistence_count, @@ -61,7 +74,7 @@ impl PartitionResponse { self.completed_persistence_count } - pub(crate) fn into_record_batch_stream(self) -> SendableRecordBatchStream { + pub(crate) fn into_record_batch_stream(self) -> Option { self.batches } } diff --git a/ingester2/src/query/response.rs b/ingester2/src/query/response.rs index 5e9533c65b..b1bf77ff33 100644 --- a/ingester2/src/query/response.rs +++ b/ingester2/src/query/response.rs @@ -51,6 +51,7 @@ impl QueryResponse { /// Reduce the [`QueryResponse`] to a stream of [`RecordBatch`]. pub(crate) fn into_record_batches(self) -> impl Stream> { self.into_partition_stream() - .flat_map(|partition| partition.into_record_batch_stream()) + .flat_map(|partition| futures::stream::iter(partition.into_record_batch_stream())) + .flatten() } } diff --git a/ingester2/src/server/grpc/query.rs b/ingester2/src/server/grpc/query.rs index 3f1de02781..3693a9ec06 100644 --- a/ingester2/src/server/grpc/query.rs +++ b/ingester2/src/server/grpc/query.rs @@ -322,21 +322,25 @@ impl From for FlatIngesterQueryResponseStream { completed_persistence_count, }) }); - let tail = partition - .into_record_batch_stream() - .flat_map(|snapshot_res| match snapshot_res { - Ok(snapshot) => { - let schema = Arc::new(prepare_schema_for_flight(&snapshot.schema())); - let schema_captured = Arc::clone(&schema); - let head = futures::stream::once(async { - Ok(FlatIngesterQueryResponse::StartSnapshot { - schema: schema_captured, - }) - }); + match partition.into_record_batch_stream() { + Some(stream) => { + let tail = stream.flat_map(|snapshot_res| match snapshot_res { + Ok(snapshot) => { + let schema = + Arc::new(prepare_schema_for_flight(&snapshot.schema())); - let tail = - match prepare_batch_for_flight(&snapshot, Arc::clone(&schema)) { + let schema_captured = Arc::clone(&schema); + let head = futures::stream::once(async { + Ok(FlatIngesterQueryResponse::StartSnapshot { + schema: schema_captured, + }) + }); + + let tail = match prepare_batch_for_flight( + &snapshot, + Arc::clone(&schema), + ) { Ok(batch) => { futures::stream::iter(split_batch_for_grpc_response(batch)) .map(|batch| { @@ -347,12 +351,15 @@ impl From for FlatIngesterQueryResponseStream { Err(e) => futures::stream::once(async { Err(e) }).boxed(), }; - head.chain(tail).boxed() - } - Err(e) => futures::stream::once(async { Err(e) }).boxed(), - }); + head.chain(tail).boxed() + } + Err(e) => futures::stream::once(async { Err(e) }).boxed(), + }); - head.chain(tail).boxed() + head.chain(tail).boxed() + } + None => head.boxed(), + } }) .boxed() }