refactor: remove max seqnum in PartitionResponse
Removes the redundant max_persisted_sequence_number in PartitionResponse, which was functionally replaced with completed_persistence_count for the Querier's parquet file discovery instead.pull/24376/head
parent
13ed3f9acb
commit
c1db76bf9e
|
@ -294,9 +294,9 @@ where
|
||||||
let data = Box::pin(MemoryStream::new(
|
let data = Box::pin(MemoryStream::new(
|
||||||
data.project_selection(selection).into_iter().collect(),
|
data.project_selection(selection).into_iter().collect(),
|
||||||
));
|
));
|
||||||
PartitionResponse::new(data, id, None, completed_persistence_count)
|
PartitionResponse::new(data, id, completed_persistence_count)
|
||||||
}
|
}
|
||||||
None => PartitionResponse::new_no_batches(id, None, completed_persistence_count),
|
None => PartitionResponse::new_no_batches(id, completed_persistence_count),
|
||||||
};
|
};
|
||||||
|
|
||||||
span.ok("read partition data");
|
span.ok("read partition data");
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
//!
|
//!
|
||||||
//! [`QueryResponse`]: super::response::QueryResponse
|
//! [`QueryResponse`]: super::response::QueryResponse
|
||||||
|
|
||||||
use data_types::{PartitionId, SequenceNumber};
|
use data_types::PartitionId;
|
||||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||||
|
|
||||||
/// Response data for a single partition.
|
/// Response data for a single partition.
|
||||||
|
@ -13,9 +13,6 @@ pub(crate) struct PartitionResponse {
|
||||||
/// Partition ID.
|
/// Partition ID.
|
||||||
id: PartitionId,
|
id: PartitionId,
|
||||||
|
|
||||||
/// Max sequence number persisted
|
|
||||||
max_persisted_sequence_number: Option<SequenceNumber>,
|
|
||||||
|
|
||||||
/// Count of persisted Parquet files for this partition by this ingester instance.
|
/// Count of persisted Parquet files for this partition by this ingester instance.
|
||||||
completed_persistence_count: u64,
|
completed_persistence_count: u64,
|
||||||
}
|
}
|
||||||
|
@ -31,7 +28,6 @@ impl std::fmt::Debug for PartitionResponse {
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
.field("partition_id", &self.id)
|
.field("partition_id", &self.id)
|
||||||
.field("max_persisted", &self.max_persisted_sequence_number)
|
|
||||||
.field(
|
.field(
|
||||||
"completed_persistence_count",
|
"completed_persistence_count",
|
||||||
&self.completed_persistence_count,
|
&self.completed_persistence_count,
|
||||||
|
@ -44,26 +40,19 @@ impl PartitionResponse {
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
batches: SendableRecordBatchStream,
|
batches: SendableRecordBatchStream,
|
||||||
id: PartitionId,
|
id: PartitionId,
|
||||||
max_persisted_sequence_number: Option<SequenceNumber>,
|
|
||||||
completed_persistence_count: u64,
|
completed_persistence_count: u64,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
batches: Some(batches),
|
batches: Some(batches),
|
||||||
id,
|
id,
|
||||||
max_persisted_sequence_number,
|
|
||||||
completed_persistence_count,
|
completed_persistence_count,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn new_no_batches(
|
pub(crate) fn new_no_batches(id: PartitionId, completed_persistence_count: u64) -> Self {
|
||||||
id: PartitionId,
|
|
||||||
max_persisted_sequence_number: Option<SequenceNumber>,
|
|
||||||
completed_persistence_count: u64,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
batches: None,
|
batches: None,
|
||||||
id,
|
id,
|
||||||
max_persisted_sequence_number,
|
|
||||||
completed_persistence_count,
|
completed_persistence_count,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,10 +61,6 @@ impl PartitionResponse {
|
||||||
self.id
|
self.id
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
|
|
||||||
self.max_persisted_sequence_number
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn completed_persistence_count(&self) -> u64 {
|
pub(crate) fn completed_persistence_count(&self) -> u64 {
|
||||||
self.completed_persistence_count
|
self.completed_persistence_count
|
||||||
}
|
}
|
||||||
|
|
|
@ -311,13 +311,12 @@ impl From<QueryResponse> for FlatIngesterQueryResponseStream {
|
||||||
v.into_partition_stream()
|
v.into_partition_stream()
|
||||||
.flat_map(|partition| {
|
.flat_map(|partition| {
|
||||||
let partition_id = partition.id();
|
let partition_id = partition.id();
|
||||||
let max_seq = partition.max_persisted_sequence_number().map(|v| v.get());
|
|
||||||
let completed_persistence_count = partition.completed_persistence_count();
|
let completed_persistence_count = partition.completed_persistence_count();
|
||||||
let head = futures::stream::once(async move {
|
let head = futures::stream::once(async move {
|
||||||
Ok(FlatIngesterQueryResponse::StartPartition {
|
Ok(FlatIngesterQueryResponse::StartPartition {
|
||||||
partition_id,
|
partition_id,
|
||||||
status: PartitionStatus {
|
status: PartitionStatus {
|
||||||
parquet_max_sequence_number: max_seq,
|
parquet_max_sequence_number: None,
|
||||||
},
|
},
|
||||||
completed_persistence_count,
|
completed_persistence_count,
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue