fix: Always return a PartitionRecord which may stream record batches

Connects to #6421.

Even if the ingester doesn't have data in memory for a query, we need to
send back metadata about the ingester UUID and the number of files
persisted so that the querier can decide whether it needs to refresh the
cache.
pull/24376/head
Carol (Nichols || Goulding) 2022-12-16 16:32:30 -05:00
parent 473ce7a268
commit 07772e8d22
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
5 changed files with 69 additions and 45 deletions

View File

@ -1041,10 +1041,11 @@ mod tests {
let partition = partitions.pop().unwrap();
// Perform the partition read
let batches =
datafusion::physical_plan::common::collect(partition.into_record_batch_stream())
.await
.expect("failed to collate query results");
let batches = datafusion::physical_plan::common::collect(
partition.into_record_batch_stream().unwrap(),
)
.await
.expect("failed to collate query results");
// Assert the contents of p1 contains both the initial write, and the
// 3rd write in a single RecordBatch.

View File

@ -267,7 +267,7 @@ where
);
// Gather the partition data from all of the partitions in this table.
let partitions = self.partitions().into_iter().filter_map(move |p| {
let partitions = self.partitions().into_iter().map(move |p| {
let mut span = SpanRecorder::new(span.clone().map(|s| s.child("partition read")));
let (id, completed_persistence_count, data) = {
@ -275,30 +275,32 @@ where
(
p.partition_id(),
p.completed_persistence_count(),
p.get_query_data()?,
p.get_query_data(),
)
};
assert_eq!(id, data.partition_id());
// Project the data if necessary
let columns = columns.iter().map(String::as_str).collect::<Vec<_>>();
let selection = if columns.is_empty() {
Projection::All
} else {
Projection::Some(columns.as_ref())
let ret = match data {
Some(data) => {
assert_eq!(id, data.partition_id());
// Project the data if necessary
let columns = columns.iter().map(String::as_str).collect::<Vec<_>>();
let selection = if columns.is_empty() {
Projection::All
} else {
Projection::Some(columns.as_ref())
};
let data = Box::pin(MemoryStream::new(
data.project_selection(selection).into_iter().collect(),
));
PartitionResponse::new(data, id, None, completed_persistence_count)
}
None => PartitionResponse::new_no_batches(id, None, completed_persistence_count),
};
let ret = PartitionResponse::new(
Box::pin(MemoryStream::new(
data.project_selection(selection).into_iter().collect(),
)),
id,
None,
completed_persistence_count,
);
span.ok("read partition data");
Some(ret)
ret
});
Ok(PartitionStream::new(futures::stream::iter(partitions)))

View File

@ -8,7 +8,7 @@ use datafusion::physical_plan::SendableRecordBatchStream;
/// Response data for a single partition.
pub(crate) struct PartitionResponse {
/// Stream of snapshots.
batches: SendableRecordBatchStream,
batches: Option<SendableRecordBatchStream>,
/// Partition ID.
id: PartitionId,
@ -42,7 +42,20 @@ impl PartitionResponse {
completed_persistence_count: u64,
) -> Self {
Self {
batches,
batches: Some(batches),
id,
max_persisted_sequence_number,
completed_persistence_count,
}
}
pub(crate) fn new_no_batches(
id: PartitionId,
max_persisted_sequence_number: Option<SequenceNumber>,
completed_persistence_count: u64,
) -> Self {
Self {
batches: None,
id,
max_persisted_sequence_number,
completed_persistence_count,
@ -61,7 +74,7 @@ impl PartitionResponse {
self.completed_persistence_count
}
pub(crate) fn into_record_batch_stream(self) -> SendableRecordBatchStream {
pub(crate) fn into_record_batch_stream(self) -> Option<SendableRecordBatchStream> {
self.batches
}
}

View File

@ -51,6 +51,7 @@ impl QueryResponse {
/// Reduce the [`QueryResponse`] to a stream of [`RecordBatch`].
pub(crate) fn into_record_batches(self) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
self.into_partition_stream()
.flat_map(|partition| partition.into_record_batch_stream())
.flat_map(|partition| futures::stream::iter(partition.into_record_batch_stream()))
.flatten()
}
}

View File

@ -322,21 +322,25 @@ impl From<QueryResponse> for FlatIngesterQueryResponseStream {
completed_persistence_count,
})
});
let tail = partition
.into_record_batch_stream()
.flat_map(|snapshot_res| match snapshot_res {
Ok(snapshot) => {
let schema = Arc::new(prepare_schema_for_flight(&snapshot.schema()));
let schema_captured = Arc::clone(&schema);
let head = futures::stream::once(async {
Ok(FlatIngesterQueryResponse::StartSnapshot {
schema: schema_captured,
})
});
match partition.into_record_batch_stream() {
Some(stream) => {
let tail = stream.flat_map(|snapshot_res| match snapshot_res {
Ok(snapshot) => {
let schema =
Arc::new(prepare_schema_for_flight(&snapshot.schema()));
let tail =
match prepare_batch_for_flight(&snapshot, Arc::clone(&schema)) {
let schema_captured = Arc::clone(&schema);
let head = futures::stream::once(async {
Ok(FlatIngesterQueryResponse::StartSnapshot {
schema: schema_captured,
})
});
let tail = match prepare_batch_for_flight(
&snapshot,
Arc::clone(&schema),
) {
Ok(batch) => {
futures::stream::iter(split_batch_for_grpc_response(batch))
.map(|batch| {
@ -347,12 +351,15 @@ impl From<QueryResponse> for FlatIngesterQueryResponseStream {
Err(e) => futures::stream::once(async { Err(e) }).boxed(),
};
head.chain(tail).boxed()
}
Err(e) => futures::stream::once(async { Err(e) }).boxed(),
});
head.chain(tail).boxed()
}
Err(e) => futures::stream::once(async { Err(e) }).boxed(),
});
head.chain(tail).boxed()
head.chain(tail).boxed()
}
None => head.boxed(),
}
})
.boxed()
}