From c1db76bf9ee1fac273e83b7ec266afe88ef9bf2e Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 19 Dec 2022 11:21:53 +0100 Subject: [PATCH] refactor: remove max seqnum in PartitionResponse Removes the redundant max_persisted_sequence_number in PartitionResponse, which was functionally replaced with completed_persistence_count for the Querier's parquet file discovery instead. --- ingester2/src/buffer_tree/table.rs | 4 ++-- ingester2/src/query/partition_response.rs | 19 ++----------------- ingester2/src/server/grpc/query.rs | 3 +-- 3 files changed, 5 insertions(+), 21 deletions(-) diff --git a/ingester2/src/buffer_tree/table.rs b/ingester2/src/buffer_tree/table.rs index e109807710..d691e93e0c 100644 --- a/ingester2/src/buffer_tree/table.rs +++ b/ingester2/src/buffer_tree/table.rs @@ -294,9 +294,9 @@ where let data = Box::pin(MemoryStream::new( data.project_selection(selection).into_iter().collect(), )); - PartitionResponse::new(data, id, None, completed_persistence_count) + PartitionResponse::new(data, id, completed_persistence_count) } - None => PartitionResponse::new_no_batches(id, None, completed_persistence_count), + None => PartitionResponse::new_no_batches(id, completed_persistence_count), }; span.ok("read partition data"); diff --git a/ingester2/src/query/partition_response.rs b/ingester2/src/query/partition_response.rs index 541cf8abcc..5221199666 100644 --- a/ingester2/src/query/partition_response.rs +++ b/ingester2/src/query/partition_response.rs @@ -2,7 +2,7 @@ //! //! [`QueryResponse`]: super::response::QueryResponse -use data_types::{PartitionId, SequenceNumber}; +use data_types::PartitionId; use datafusion::physical_plan::SendableRecordBatchStream; /// Response data for a single partition. @@ -13,9 +13,6 @@ pub(crate) struct PartitionResponse { /// Partition ID. id: PartitionId, - /// Max sequence number persisted - max_persisted_sequence_number: Option, - /// Count of persisted Parquet files for this partition by this ingester instance. completed_persistence_count: u64, } @@ -31,7 +28,6 @@ impl std::fmt::Debug for PartitionResponse { }, ) .field("partition_id", &self.id) - .field("max_persisted", &self.max_persisted_sequence_number) .field( "completed_persistence_count", &self.completed_persistence_count, @@ -44,26 +40,19 @@ impl PartitionResponse { pub(crate) fn new( batches: SendableRecordBatchStream, id: PartitionId, - max_persisted_sequence_number: Option, completed_persistence_count: u64, ) -> Self { Self { batches: Some(batches), id, - max_persisted_sequence_number, completed_persistence_count, } } - pub(crate) fn new_no_batches( - id: PartitionId, - max_persisted_sequence_number: Option, - completed_persistence_count: u64, - ) -> Self { + pub(crate) fn new_no_batches(id: PartitionId, completed_persistence_count: u64) -> Self { Self { batches: None, id, - max_persisted_sequence_number, completed_persistence_count, } } @@ -72,10 +61,6 @@ impl PartitionResponse { self.id } - pub(crate) fn max_persisted_sequence_number(&self) -> Option { - self.max_persisted_sequence_number - } - pub(crate) fn completed_persistence_count(&self) -> u64 { self.completed_persistence_count } diff --git a/ingester2/src/server/grpc/query.rs b/ingester2/src/server/grpc/query.rs index 3693a9ec06..af12138b72 100644 --- a/ingester2/src/server/grpc/query.rs +++ b/ingester2/src/server/grpc/query.rs @@ -311,13 +311,12 @@ impl From for FlatIngesterQueryResponseStream { v.into_partition_stream() .flat_map(|partition| { let partition_id = partition.id(); - let max_seq = partition.max_persisted_sequence_number().map(|v| v.get()); let completed_persistence_count = partition.completed_persistence_count(); let head = futures::stream::once(async move { Ok(FlatIngesterQueryResponse::StartPartition { partition_id, status: PartitionStatus { - parquet_max_sequence_number: max_seq, + parquet_max_sequence_number: None, }, completed_persistence_count, })