Merge pull request #6436 from influxdata/dom/query-types-refactor
refactor(ingester2): query typespull/24376/head
commit
f469854a0d
|
@ -294,9 +294,9 @@ where
|
|||
let data = Box::pin(MemoryStream::new(
|
||||
data.project_selection(selection).into_iter().collect(),
|
||||
));
|
||||
PartitionResponse::new(data, id, None, completed_persistence_count)
|
||||
PartitionResponse::new(Some(data), id, completed_persistence_count)
|
||||
}
|
||||
None => PartitionResponse::new_no_batches(id, None, completed_persistence_count),
|
||||
None => PartitionResponse::new(None, id, completed_persistence_count),
|
||||
};
|
||||
|
||||
span.ok("read partition data");
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
//!
|
||||
//! [`QueryResponse`]: super::response::QueryResponse
|
||||
|
||||
use data_types::{PartitionId, SequenceNumber};
|
||||
use data_types::PartitionId;
|
||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
|
||||
/// Response data for a single partition.
|
||||
|
@ -13,9 +13,6 @@ pub(crate) struct PartitionResponse {
|
|||
/// Partition ID.
|
||||
id: PartitionId,
|
||||
|
||||
/// Max sequence number persisted
|
||||
max_persisted_sequence_number: Option<SequenceNumber>,
|
||||
|
||||
/// Count of persisted Parquet files for this partition by this ingester instance.
|
||||
completed_persistence_count: u64,
|
||||
}
|
||||
|
@ -23,9 +20,14 @@ pub(crate) struct PartitionResponse {
|
|||
impl std::fmt::Debug for PartitionResponse {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("PartitionResponse")
|
||||
.field("batches", &"<SNAPSHOT STREAM>")
|
||||
.field(
|
||||
"batches",
|
||||
&match self.batches {
|
||||
Some(_) => "<SNAPSHOT STREAM>",
|
||||
None => "<NO DATA>,",
|
||||
},
|
||||
)
|
||||
.field("partition_id", &self.id)
|
||||
.field("max_persisted", &self.max_persisted_sequence_number)
|
||||
.field(
|
||||
"completed_persistence_count",
|
||||
&self.completed_persistence_count,
|
||||
|
@ -36,28 +38,13 @@ impl std::fmt::Debug for PartitionResponse {
|
|||
|
||||
impl PartitionResponse {
|
||||
pub(crate) fn new(
|
||||
batches: SendableRecordBatchStream,
|
||||
data: Option<SendableRecordBatchStream>,
|
||||
id: PartitionId,
|
||||
max_persisted_sequence_number: Option<SequenceNumber>,
|
||||
completed_persistence_count: u64,
|
||||
) -> Self {
|
||||
Self {
|
||||
batches: Some(batches),
|
||||
batches: data,
|
||||
id,
|
||||
max_persisted_sequence_number,
|
||||
completed_persistence_count,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_no_batches(
|
||||
id: PartitionId,
|
||||
max_persisted_sequence_number: Option<SequenceNumber>,
|
||||
completed_persistence_count: u64,
|
||||
) -> Self {
|
||||
Self {
|
||||
batches: None,
|
||||
id,
|
||||
max_persisted_sequence_number,
|
||||
completed_persistence_count,
|
||||
}
|
||||
}
|
||||
|
@ -66,10 +53,6 @@ impl PartitionResponse {
|
|||
self.id
|
||||
}
|
||||
|
||||
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
|
||||
self.max_persisted_sequence_number
|
||||
}
|
||||
|
||||
pub(crate) fn completed_persistence_count(&self) -> u64 {
|
||||
self.completed_persistence_count
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
//!
|
||||
//! [`QueryExec::query_exec()`]: super::QueryExec::query_exec()
|
||||
|
||||
use std::pin::Pin;
|
||||
use std::{future, pin::Pin};
|
||||
|
||||
use arrow::{error::ArrowError, record_batch::RecordBatch};
|
||||
use futures::{Stream, StreamExt};
|
||||
|
@ -51,7 +51,7 @@ impl QueryResponse {
|
|||
/// Reduce the [`QueryResponse`] to a stream of [`RecordBatch`].
|
||||
pub(crate) fn into_record_batches(self) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
|
||||
self.into_partition_stream()
|
||||
.flat_map(|partition| futures::stream::iter(partition.into_record_batch_stream()))
|
||||
.filter_map(|partition| future::ready(partition.into_record_batch_stream()))
|
||||
.flatten()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -311,13 +311,12 @@ impl From<QueryResponse> for FlatIngesterQueryResponseStream {
|
|||
v.into_partition_stream()
|
||||
.flat_map(|partition| {
|
||||
let partition_id = partition.id();
|
||||
let max_seq = partition.max_persisted_sequence_number().map(|v| v.get());
|
||||
let completed_persistence_count = partition.completed_persistence_count();
|
||||
let head = futures::stream::once(async move {
|
||||
Ok(FlatIngesterQueryResponse::StartPartition {
|
||||
partition_id,
|
||||
status: PartitionStatus {
|
||||
parquet_max_sequence_number: max_seq,
|
||||
parquet_max_sequence_number: None,
|
||||
},
|
||||
completed_persistence_count,
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue