From c6fe0dab3e608b8a9cabb9b180aa5f4dcbe66ff6 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 20 Sep 2022 15:33:48 +0100 Subject: [PATCH] refactor(ingester): reduced internal visibility Changes many pub fields / methods to be pub(super), or if necessary, pub(crate). This helps maintain an internal API boundary for code hygiene, and helps identify functions that are unused / only used in tests (which I've annotated with cfg(test) and intend to remove - we should be driving code under test via the public API rather than using test-only state mutation, otherwise we're just testing our tests!) --- ingester/src/data/namespace.rs | 21 +++++++++++---------- ingester/src/data/partition.rs | 24 ++++++++++++------------ ingester/src/data/partition/buffer.rs | 1 + ingester/src/data/shard.rs | 6 +++--- ingester/src/data/table.rs | 6 +++--- 5 files changed, 30 insertions(+), 28 deletions(-) diff --git a/ingester/src/data/namespace.rs b/ingester/src/data/namespace.rs index 02a2ee7d48..73662cfffb 100644 --- a/ingester/src/data/namespace.rs +++ b/ingester/src/data/namespace.rs @@ -16,10 +16,7 @@ use write_summary::ShardProgress; #[cfg(test)] use super::triggers::TestTriggers; -use super::{ - partition::{PersistingBatch, SnapshotBatch}, - table::TableData, -}; +use super::{partition::PersistingBatch, table::TableData}; use crate::lifecycle::LifecycleHandle; /// Data of a Namespace that belongs to a given Shard @@ -103,7 +100,7 @@ impl NamespaceData { /// Buffer the operation in the cache, adding any new partitions or delete tombstones to the /// catalog. Returns true if ingest should be paused due to memory limits set in the passed /// lifecycle manager. - pub async fn buffer_operation( + pub(super) async fn buffer_operation( &self, dml_operation: DmlOperation, catalog: &dyn Catalog, @@ -180,11 +177,15 @@ impl NamespaceData { /// Snapshots the mutable buffer for the partition, which clears it out and moves it over to /// snapshots. Then return a vec of the snapshots and the optional persisting batch. - pub async fn snapshot( + #[cfg(test)] // Only used in tests + pub(crate) async fn snapshot( &self, table_name: &str, partition_key: &PartitionKey, - ) -> Option<(Vec>, Option>)> { + ) -> Option<( + Vec>, + Option>, + )> { if let Some(t) = self.table_data(table_name) { let mut t = t.write().await; @@ -202,7 +203,7 @@ impl NamespaceData { /// Snapshots the mutable buffer for the partition, which clears it out and then moves all /// snapshots over to a persisting batch, which is returned. If there is no data to snapshot /// or persist, None will be returned. - pub async fn snapshot_to_persisting( + pub(crate) async fn snapshot_to_persisting( &self, table_name: &str, partition_key: &PartitionKey, @@ -298,12 +299,12 @@ impl NamespaceData { } /// Return the [`NamespaceId`] this [`NamespaceData`] belongs to. - pub fn namespace_id(&self) -> NamespaceId { + pub(super) fn namespace_id(&self) -> NamespaceId { self.namespace_id } #[cfg(test)] - pub fn table_count(&self) -> &U64Counter { + pub(super) fn table_count(&self) -> &U64Counter { &self.table_count } } diff --git a/ingester/src/data/partition.rs b/ingester/src/data/partition.rs index 8fe2adcb9a..f31aae1807 100644 --- a/ingester/src/data/partition.rs +++ b/ingester/src/data/partition.rs @@ -119,7 +119,7 @@ pub(crate) struct PartitionData { /// The name of the table this partition is part of. table_name: Arc, - pub(crate) data: DataBuffer, + pub(super) data: DataBuffer, /// The max_persisted_sequence number for any parquet_file in this /// partition. @@ -146,15 +146,15 @@ impl PartitionData { } /// Snapshot anything in the buffer and move all snapshot data into a persisting batch - pub fn snapshot_to_persisting_batch(&mut self) -> Option> { + pub(super) fn snapshot_to_persisting_batch(&mut self) -> Option> { self.data .snapshot_to_persisting(self.shard_id, self.table_id, self.id, &self.table_name) } /// Snapshot whatever is in the buffer and return a new vec of the /// arc cloned snapshots - #[allow(dead_code)] // Used in tests - pub fn snapshot(&mut self) -> Result>, super::Error> { + #[cfg(test)] + fn snapshot(&mut self) -> Result>, super::Error> { self.data .generate_snapshot() .context(super::SnapshotSnafu)?; @@ -162,17 +162,17 @@ impl PartitionData { } /// Return non persisting data - pub fn get_non_persisting_data(&self) -> Result>, super::Error> { + pub(super) fn get_non_persisting_data(&self) -> Result>, super::Error> { self.data.buffer_and_snapshots() } /// Return persisting data - pub fn get_persisting_data(&self) -> Option { + pub(super) fn get_persisting_data(&self) -> Option { self.data.get_persisting_data() } /// Write the given mb in the buffer - pub(crate) fn buffer_write( + pub(super) fn buffer_write( &mut self, sequence_number: SequenceNumber, mb: MutableBatch, @@ -199,7 +199,7 @@ impl PartitionData { /// tombstone-applied snapshot /// . The tombstone is only added in the `deletes_during_persisting` if the `persisting` /// exists - pub(crate) async fn buffer_tombstone(&mut self, executor: &Executor, tombstone: Tombstone) { + pub(super) async fn buffer_tombstone(&mut self, executor: &Executor, tombstone: Tombstone) { self.data.add_tombstone(tombstone.clone()); // ---------------------------------------------------------- @@ -265,23 +265,23 @@ impl PartitionData { } /// Return the progress from this Partition - pub(crate) fn progress(&self) -> ShardProgress { + pub(super) fn progress(&self) -> ShardProgress { self.data.progress() } - pub(crate) fn id(&self) -> PartitionId { + pub(super) fn id(&self) -> PartitionId { self.id } /// Return the [`SequenceNumber`] that forms the (inclusive) persistence /// watermark for this partition. - pub(crate) fn max_persisted_sequence_number(&self) -> Option { + pub(super) fn max_persisted_sequence_number(&self) -> Option { self.max_persisted_sequence_number } /// Mark this partition as having completed persistence up to, and /// including, the specified [`SequenceNumber`]. - pub(crate) fn mark_persisted(&mut self, sequence_number: SequenceNumber) { + pub(super) fn mark_persisted(&mut self, sequence_number: SequenceNumber) { self.max_persisted_sequence_number = Some(sequence_number); self.data.mark_persisted(); } diff --git a/ingester/src/data/partition/buffer.rs b/ingester/src/data/partition/buffer.rs index 48552ad550..97c115a5e1 100644 --- a/ingester/src/data/partition/buffer.rs +++ b/ingester/src/data/partition/buffer.rs @@ -232,6 +232,7 @@ impl DataBuffer { } } + #[cfg(test)] pub(super) fn get_snapshots(&self) -> &[Arc] { self.snapshots.as_ref() } diff --git a/ingester/src/data/shard.rs b/ingester/src/data/shard.rs index 0f5257a93c..79f17963a2 100644 --- a/ingester/src/data/shard.rs +++ b/ingester/src/data/shard.rs @@ -55,7 +55,7 @@ impl ShardData { /// be written into the catalog before getting stored in the buffer. /// Any writes that create new IOx partitions will have those records /// created in the catalog before putting into the buffer. - pub async fn buffer_operation( + pub(super) async fn buffer_operation( &self, dml_operation: DmlOperation, catalog: &dyn Catalog, @@ -76,7 +76,7 @@ impl ShardData { } /// Gets the namespace data out of the map - pub fn namespace(&self, namespace: &str) -> Option> { + pub(crate) fn namespace(&self, namespace: &str) -> Option> { let n = self.namespaces.read(); n.get(namespace).cloned() } @@ -127,7 +127,7 @@ impl ShardData { } /// Return the [`ShardIndex`] this [`ShardData`] is buffering for. - pub fn shard_index(&self) -> ShardIndex { + pub(super) fn shard_index(&self) -> ShardIndex { self.shard_index } } diff --git a/ingester/src/data/table.rs b/ingester/src/data/table.rs index e53d3d815f..142a5684be 100644 --- a/ingester/src/data/table.rs +++ b/ingester/src/data/table.rs @@ -45,7 +45,7 @@ impl TableData { } /// Return parquet_max_sequence_number - pub fn parquet_max_sequence_number(&self) -> Option { + pub(super) fn parquet_max_sequence_number(&self) -> Option { self.partition_data .values() .map(|p| p.max_persisted_sequence_number()) @@ -55,7 +55,7 @@ impl TableData { /// Return tombstone_max_sequence_number #[allow(dead_code)] // Used in tests - pub fn tombstone_max_sequence_number(&self) -> Option { + pub(super) fn tombstone_max_sequence_number(&self) -> Option { self.tombstone_max_sequence_number } @@ -132,7 +132,7 @@ impl TableData { Ok(()) } - pub fn unpersisted_partition_data(&self) -> Vec { + pub(crate) fn unpersisted_partition_data(&self) -> Vec { self.partition_data .values() .map(|p| UnpersistedPartitionData {