From 4506bf3b8faa3d6ca78f300eebfbfb55b2634d16 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Fri, 6 May 2022 11:27:03 -0400
Subject: [PATCH] fix: Remove or rescope dead code in ingester

---
 ingester/src/data.rs        | 93 +++++++++----------------------
 ingester/src/handler.rs     |  6 +--
 ingester/src/job.rs         |  6 ---
 ingester/src/lib.rs         |  1 -
 ingester/src/poison.rs      |  3 ++
 ingester/src/server/grpc.rs | 13 ------
 6 files changed, 27 insertions(+), 95 deletions(-)

diff --git a/ingester/src/data.rs b/ingester/src/data.rs
index 8f4ca19198..b9129c6f77 100644
--- a/ingester/src/data.rs
+++ b/ingester/src/data.rs
@@ -79,7 +79,10 @@ pub enum Error {
     #[snafu(display("Nothing in the Persisting list to get removed"))]
     PersistingEmpty,
 
-    #[snafu(display("The given batch does not match any in the Persisting list. Nothing is removed from the Persisting list"))]
+    #[snafu(display(
+        "The given batch does not match any in the Persisting list. \
+        Nothing is removed from the Persisting list"
+    ))]
     PersistingNotMatch,
 
     #[snafu(display("Cannot partition data: {}", source))]
@@ -154,6 +157,7 @@ impl IngesterData {
     }
 
     /// Get sequencer data for specific sequencer.
+    #[allow(dead_code)] // Used in tests
     pub(crate) fn sequencer(&self, sequencer_id: SequencerId) -> Option<&SequencerData> {
         self.sequencers.get(&sequencer_id)
     }
@@ -191,21 +195,6 @@ impl IngesterData {
             .await
     }
 
-    /// Return table data of a given (sequencer id, namespace name, and table name)
-    pub(crate) fn table_data(
-        &self,
-        sequencer_id: SequencerId,
-        namespace_name: &str,
-        table_name: &str,
-    ) -> Option<Arc<tokio::sync::RwLock<TableData>>> {
-        let sequencer_data = self.sequencers.get(&sequencer_id)?;
-        let namespaces = sequencer_data.namespaces.read();
-        let namespace_data = namespaces.get(namespace_name)?;
-        let tables = namespace_data.tables.read();
-        let table_data = tables.get(table_name)?;
-        Some(Arc::clone(table_data))
-    }
-
     /// Return the ingestion progress for the specified kafka
     /// partitions. Returns an empty `SequencerProgress` for any kafka
     /// partitions that this ingester doesn't know about.
@@ -543,8 +532,9 @@ impl NamespaceData {
         }
     }
 
-    /// Buffer the operation in the cache, adding any new partitions or delete tombstones to the catalog.
-    /// Returns true if ingest should be paused due to memory limits set in the passed lifecycle manager.
+    /// Buffer the operation in the cache, adding any new partitions or delete tombstones to the
+    /// catalog. Returns true if ingest should be paused due to memory limits set in the passed
+    /// lifecycle manager.
     pub async fn buffer_operation(
         &self,
         dml_operation: DmlOperation,
@@ -615,8 +605,8 @@ impl NamespaceData {
         }
     }
 
-    /// Snapshots the mutable buffer for the partition, which clears it out and moves it over to snapshots. Then
-    /// return a vec of the snapshots and the optional persisting batch.
+    /// Snapshots the mutable buffer for the partition, which clears it out and moves it over to
+    /// snapshots. Then return a vec of the snapshots and the optional persisting batch.
     pub async fn snapshot(
         &self,
         table_name: &str,
@@ -782,6 +772,7 @@ impl TableData {
     }
 
     /// Return tombstone_max_sequence_number
+    #[allow(dead_code)] // Used in tests
     pub fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
         self.tombstone_max_sequence_number
     }
@@ -967,17 +958,9 @@ impl PartitionData {
             .snapshot_to_persisting(sequencer_id, table_id, partition_id, table_name)
     }
 
-    /// Clears the persisting batch, updates the max_persisted_sequence_number.
-    fn mark_persisted(&mut self) {
-        if let Some(persisting) = &self.data.persisting {
-            let (_, max) = persisting.data.min_max_sequence_numbers();
-            self.data.max_persisted_sequence_number = Some(max);
-        }
-        self.data.persisting = None;
-    }
-
     /// Snapshot whatever is in the buffer and return a new vec of the
     /// arc cloned snapshots
+    #[allow(dead_code)] // Used in tests
     pub fn snapshot(&mut self) -> Result<Vec<Arc<SnapshotBatch>>> {
         self.data.snapshot().context(SnapshotSnafu)?;
         Ok(self.data.snapshots.to_vec())
     }
@@ -1017,8 +1000,10 @@ impl PartitionData {
     }
 
     /// Buffers a new tombstone:
-    /// . All the data in the `buffer` and `snapshots` will be replaced with one tombstone-applied snapshot
-    /// . The tombstone is only added in the `deletes_during_persisting` if the `persisting` exists
+    /// . All the data in the `buffer` and `snapshots` will be replaced with one
+    ///   tombstone-applied snapshot
+    /// . The tombstone is only added in the `deletes_during_persisting` if the `persisting`
+    ///   exists
     pub(crate) async fn buffer_tombstone(
         &mut self,
         executor: &Executor,
@@ -1037,7 +1022,7 @@ impl PartitionData {
         {
             Some(query_batch) if !query_batch.is_empty() => query_batch,
             _ => {
-                // No need to procedd further
+                // No need to proceed further
                 return;
             }
         };
@@ -1197,11 +1182,6 @@ impl DataBuffer {
         Ok(None)
     }
 
-    /// Returns true if there are no batches in the buffer or snapshots or persisting data
-    fn is_empty(&self) -> bool {
-        self.snapshots.is_empty() && self.buffer.is_none() && self.persisting.is_none()
-    }
-
     /// Snapshots the buffer and make a QueryableBatch for all the snapshots
     /// Both buffer and snapshots will be empty after this
     pub fn snapshot_to_queryable_batch(
@@ -1246,6 +1226,7 @@ impl DataBuffer {
     /// Snapshots the buffer and moves snapshots over to the `PersistingBatch`.
     ///
     /// # Panic
+    ///
     /// Panics if there is already a persisting batch.
     pub fn snapshot_to_persisting(
         &mut self,
@@ -1275,19 +1256,6 @@ impl DataBuffer {
         }
     }
 
-    /// Add a persiting batch into the buffer persisting list
-    /// Note: For now, there is at most one persisting batch at a time but
-    /// the plan is to process several of them a time as needed
-    pub fn add_persisting_batch(&mut self, batch: Arc<PersistingBatch>) -> Result<()> {
-        if self.persisting.is_some() {
-            return Err(Error::PersistingNotEmpty);
-        } else {
-            self.persisting = Some(batch);
-        }
-
-        Ok(())
-    }
-
     /// Return a QueryableBatch of the persisting batch after applying new tombstones
     pub fn get_persisting_data(&self) -> Option<QueryableBatch> {
         let persisting = match &self.persisting {
@@ -1304,22 +1272,6 @@ impl DataBuffer {
         Some(queryable_batch)
     }
 
-    /// Remove the given PersistingBatch that was persisted
-    pub fn remove_persisting_batch(&mut self, batch: &Arc<PersistingBatch>) -> Result<()> {
-        if let Some(persisting_batch) = &self.persisting {
-            if persisting_batch == batch {
-                // found. Remove this batch from the memory
-                self.persisting = None;
-            } else {
-                return Err(Error::PersistingNotMatch);
-            }
-        } else {
-            return Err(Error::PersistingEmpty);
-        }
-
-        Ok(())
-    }
-
     /// Return the progress in this DataBuffer
     fn progress(&self) -> SequencerProgress {
         let progress = SequencerProgress::new();
@@ -1362,7 +1314,6 @@ pub struct BufferBatch {
 
 impl BufferBatch {
     /// Return the progress in this DataBuffer
-
     fn progress(&self) -> SequencerProgress {
         SequencerProgress::new()
             .with_buffered(self.min_sequence_number)
@@ -1452,7 +1403,8 @@ pub struct QueryableBatch {
 
 /// Status of a partition that has unpersisted data.
 ///
-/// Note that this structure is specific to a partition (which itself is bound to a table and sequencer)!
+/// Note that this structure is specific to a partition (which itself is bound to a table and
+/// sequencer)!
 #[derive(Debug, Clone)]
 #[allow(missing_copy_implementations)]
 pub struct PartitionStatus {
@@ -1473,8 +1425,9 @@ pub struct IngesterQueryResponse {
 
     /// Contains status for every partition that has unpersisted data.
     ///
-    /// If a partition does NOT appear within this map, then either all data was persisted or the ingester has never seen
-    /// data for this partition. In either case the querier may just read all parquet files for the missing partition.
+    /// If a partition does NOT appear within this map, then either all data was persisted or the
+    /// ingester has never seen data for this partition. In either case the querier may just read
+    /// all parquet files for the missing partition.
     pub unpersisted_partitions: BTreeMap<PartitionId, PartitionStatus>,
 
     /// Map each record batch to a partition ID.
diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs
index 24c58e36c3..c7179e14d2 100644
--- a/ingester/src/handler.rs
+++ b/ingester/src/handler.rs
@@ -51,11 +51,6 @@ pub enum Error {
     },
 }
 
-/// When the lifecycle manager indicates that ingest should be paused because of
-/// memory pressure, the sequencer will loop, sleeping this long before checking
-/// with the manager if it can resume ingest.
-const INGEST_PAUSE_DELAY: Duration = Duration::from_millis(100);
-
 /// A specialized `Error` for Catalog errors
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
@@ -478,6 +473,7 @@ mod tests {
             .unwrap();
     }
 
+    #[tokio::test]
     #[should_panic(expected = "Background worker 'bad_task' exited early!")]
     async fn test_join_task_early_shutdown() {
         let mut ingester = TestIngester::new().await.ingester;
diff --git a/ingester/src/job.rs b/ingester/src/job.rs
index 89aa2d3376..6b20226f23 100644
--- a/ingester/src/job.rs
+++ b/ingester/src/job.rs
@@ -20,12 +20,6 @@ impl Job {
             Self::Persist { .. } => "persist",
         }
     }
-
-    fn partition_id(&self) -> Option<PartitionId> {
-        match self {
-            Self::Persist { partition_id, .. } => Some(*partition_id),
-        }
-    }
 }
 
 /// The global job registry
diff --git a/ingester/src/lib.rs b/ingester/src/lib.rs
index 97824b36bd..a4534839d9 100644
--- a/ingester/src/lib.rs
+++ b/ingester/src/lib.rs
@@ -12,7 +12,6 @@
     clippy::use_self,
     clippy::clone_on_ref_ptr
 )]
-#![allow(dead_code)]
 
 pub mod compact;
 pub mod data;
diff --git a/ingester/src/poison.rs b/ingester/src/poison.rs
index 746cd05c97..c8a887deb4 100644
--- a/ingester/src/poison.rs
+++ b/ingester/src/poison.rs
@@ -8,6 +8,7 @@ use std::{
 };
 
 #[derive(Debug, Clone, PartialEq, Eq)]
+#[allow(dead_code)]
 pub enum PoisonPill {
     LifecyclePanic,
     LifecycleExit,
@@ -48,6 +49,7 @@ impl PoisonCabinet {
         }
     }
 
+    #[allow(dead_code)]
    pub fn add(&self, pill: PoisonPill) {
        let mut inner = self.inner.write();
        inner.pills.push(pill);
@@ -63,6 +65,7 @@ impl PoisonCabinet {
         inner.pills.contains(pill)
     }
 
+    #[allow(dead_code)]
     pub fn wait_for(&self, pill: PoisonPill) -> PoisonWait {
         PoisonWait {
             pill,
diff --git a/ingester/src/server/grpc.rs b/ingester/src/server/grpc.rs
index d80a7b21ee..b51c47ccd6 100644
--- a/ingester/src/server/grpc.rs
+++ b/ingester/src/server/grpc.rs
@@ -67,19 +67,6 @@ impl WriteInfoServiceImpl {
     }
 }
 
-fn write_summary_error_to_status(e: write_summary::Error) -> tonic::Status {
-    use write_summary::Error;
-
-    match e {
-        // treat "unknown partition error" as a failed precondition
-        // (so the client can distinguish between "write isn't
-        // readable" from "we can't tell if write is readable"
-        e @ Error::UnknownKafkaPartition { .. } => {
-            tonic::Status::failed_precondition(format!("Can not determine status of write: {}", e))
-        }
-    }
-}
-
 fn to_proto_status(status: KafkaPartitionWriteStatus) -> proto::KafkaPartitionStatus {
     match status {
         KafkaPartitionWriteStatus::KafkaPartitionUnknown => proto::KafkaPartitionStatus::Unknown,
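
Note on the pattern this patch applies: instead of silencing the lint crate-wide, it deletes the crate-level `#![allow(dead_code)]` from lib.rs, removes items that are genuinely dead, and re-adds the attribute only on the few items that are intentionally unused outside tests, each with a comment saying why. A minimal sketch of that item-level scoping follows; the item names are illustrative, not from this codebase:

    // src/main.rs -- sketch of item-level dead_code scoping.
    // With no crate-wide `#![allow(dead_code)]`, the compiler flags any
    // unused item; a targeted allow documents the one intentional case.

    // Only reachable from the #[cfg(test)] module below, so a non-test
    // build would warn about it without the attribute.
    #[allow(dead_code)] // Used in tests
    fn snapshot_count_for_tests() -> usize {
        0
    }

    fn main() {
        // Production code path; nothing test-only is touched here.
        println!("ingester up");
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn helper_is_visible_to_tests() {
            assert_eq!(snapshot_count_for_tests(), 0);
        }
    }

Keeping the lint active everywhere else means the next piece of unreachable code is reported in the commit that orphans it, instead of accumulating behind a blanket allow.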
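
The handler.rs hunk shows what the blanket allow had been hiding: `test_join_task_early_shutdown` lacked its `#[tokio::test]` attribute, so it compiled as an ordinary never-called async fn and silently never ran as a test; with the dead_code lint active, rustc would have flagged it. A sketch of that failure mode, assuming a `tokio` dev-dependency with the `macros` and `rt` features (the panicking body is a stand-in for the real test, not code from this repository):

    // src/main.rs -- sketch of the missing-test-attribute bug class.
    fn main() {}

    #[cfg(test)]
    mod tests {
        // BUG (hypothetical): without #[tokio::test] this is just an unused
        // async fn -- the test harness never sees it, and only the dead_code
        // lint would complain:
        //
        // async fn test_join_task_early_shutdown() { ... }

        // FIX: the attribute registers the fn as a test and supplies a
        // runtime; #[should_panic] then asserts on the expected panic.
        #[tokio::test]
        #[should_panic(expected = "Background worker 'bad_task' exited early!")]
        async fn test_join_task_early_shutdown() {
            panic!("Background worker 'bad_task' exited early!");
        }
    }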