refactor(ingester): reduced internal visibility

Changes many pub fields / methods to be pub(super), or if necessary,
pub(crate).

This helps maintain an internal API boundary for code hygiene, and helps
identify functions that are unused / only used in tests (which I've
annotated with cfg(test) and intend to remove - we should be driving
code under test via the public API rather than using test-only state
mutation, otherwise we're just testing our tests!)
pull/24376/head
Dom Dwyer 2022-09-20 15:33:48 +01:00
parent 6d00d6b683
commit c6fe0dab3e
5 changed files with 30 additions and 28 deletions

View File

@ -16,10 +16,7 @@ use write_summary::ShardProgress;
#[cfg(test)]
use super::triggers::TestTriggers;
use super::{
partition::{PersistingBatch, SnapshotBatch},
table::TableData,
};
use super::{partition::PersistingBatch, table::TableData};
use crate::lifecycle::LifecycleHandle;
/// Data of a Namespace that belongs to a given Shard
@ -103,7 +100,7 @@ impl NamespaceData {
/// Buffer the operation in the cache, adding any new partitions or delete tombstones to the
/// catalog. Returns true if ingest should be paused due to memory limits set in the passed
/// lifecycle manager.
pub async fn buffer_operation(
pub(super) async fn buffer_operation(
&self,
dml_operation: DmlOperation,
catalog: &dyn Catalog,
@ -180,11 +177,15 @@ impl NamespaceData {
/// Snapshots the mutable buffer for the partition, which clears it out and moves it over to
/// snapshots. Then return a vec of the snapshots and the optional persisting batch.
pub async fn snapshot(
#[cfg(test)] // Only used in tests
pub(crate) async fn snapshot(
&self,
table_name: &str,
partition_key: &PartitionKey,
) -> Option<(Vec<Arc<SnapshotBatch>>, Option<Arc<PersistingBatch>>)> {
) -> Option<(
Vec<Arc<super::partition::SnapshotBatch>>,
Option<Arc<PersistingBatch>>,
)> {
if let Some(t) = self.table_data(table_name) {
let mut t = t.write().await;
@ -202,7 +203,7 @@ impl NamespaceData {
/// Snapshots the mutable buffer for the partition, which clears it out and then moves all
/// snapshots over to a persisting batch, which is returned. If there is no data to snapshot
/// or persist, None will be returned.
pub async fn snapshot_to_persisting(
pub(crate) async fn snapshot_to_persisting(
&self,
table_name: &str,
partition_key: &PartitionKey,
@ -298,12 +299,12 @@ impl NamespaceData {
}
/// Return the [`NamespaceId`] this [`NamespaceData`] belongs to.
pub fn namespace_id(&self) -> NamespaceId {
pub(super) fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
#[cfg(test)]
pub fn table_count(&self) -> &U64Counter {
pub(super) fn table_count(&self) -> &U64Counter {
&self.table_count
}
}

View File

@ -119,7 +119,7 @@ pub(crate) struct PartitionData {
/// The name of the table this partition is part of.
table_name: Arc<str>,
pub(crate) data: DataBuffer,
pub(super) data: DataBuffer,
/// The max_persisted_sequence number for any parquet_file in this
/// partition.
@ -146,15 +146,15 @@ impl PartitionData {
}
/// Snapshot anything in the buffer and move all snapshot data into a persisting batch
pub fn snapshot_to_persisting_batch(&mut self) -> Option<Arc<PersistingBatch>> {
pub(super) fn snapshot_to_persisting_batch(&mut self) -> Option<Arc<PersistingBatch>> {
self.data
.snapshot_to_persisting(self.shard_id, self.table_id, self.id, &self.table_name)
}
/// Snapshot whatever is in the buffer and return a new vec of the
/// arc cloned snapshots
#[allow(dead_code)] // Used in tests
pub fn snapshot(&mut self) -> Result<Vec<Arc<SnapshotBatch>>, super::Error> {
#[cfg(test)]
fn snapshot(&mut self) -> Result<Vec<Arc<SnapshotBatch>>, super::Error> {
self.data
.generate_snapshot()
.context(super::SnapshotSnafu)?;
@ -162,17 +162,17 @@ impl PartitionData {
}
/// Return non persisting data
pub fn get_non_persisting_data(&self) -> Result<Vec<Arc<SnapshotBatch>>, super::Error> {
pub(super) fn get_non_persisting_data(&self) -> Result<Vec<Arc<SnapshotBatch>>, super::Error> {
self.data.buffer_and_snapshots()
}
/// Return persisting data
pub fn get_persisting_data(&self) -> Option<QueryableBatch> {
pub(super) fn get_persisting_data(&self) -> Option<QueryableBatch> {
self.data.get_persisting_data()
}
/// Write the given mb in the buffer
pub(crate) fn buffer_write(
pub(super) fn buffer_write(
&mut self,
sequence_number: SequenceNumber,
mb: MutableBatch,
@ -199,7 +199,7 @@ impl PartitionData {
/// tombstone-applied snapshot
/// . The tombstone is only added in the `deletes_during_persisting` if the `persisting`
/// exists
pub(crate) async fn buffer_tombstone(&mut self, executor: &Executor, tombstone: Tombstone) {
pub(super) async fn buffer_tombstone(&mut self, executor: &Executor, tombstone: Tombstone) {
self.data.add_tombstone(tombstone.clone());
// ----------------------------------------------------------
@ -265,23 +265,23 @@ impl PartitionData {
}
/// Return the progress from this Partition
pub(crate) fn progress(&self) -> ShardProgress {
pub(super) fn progress(&self) -> ShardProgress {
self.data.progress()
}
pub(crate) fn id(&self) -> PartitionId {
pub(super) fn id(&self) -> PartitionId {
self.id
}
/// Return the [`SequenceNumber`] that forms the (inclusive) persistence
/// watermark for this partition.
pub(crate) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
pub(super) fn max_persisted_sequence_number(&self) -> Option<SequenceNumber> {
self.max_persisted_sequence_number
}
/// Mark this partition as having completed persistence up to, and
/// including, the specified [`SequenceNumber`].
pub(crate) fn mark_persisted(&mut self, sequence_number: SequenceNumber) {
pub(super) fn mark_persisted(&mut self, sequence_number: SequenceNumber) {
self.max_persisted_sequence_number = Some(sequence_number);
self.data.mark_persisted();
}

View File

@ -232,6 +232,7 @@ impl DataBuffer {
}
}
#[cfg(test)]
pub(super) fn get_snapshots(&self) -> &[Arc<SnapshotBatch>] {
self.snapshots.as_ref()
}

View File

@ -55,7 +55,7 @@ impl ShardData {
/// be written into the catalog before getting stored in the buffer.
/// Any writes that create new IOx partitions will have those records
/// created in the catalog before putting into the buffer.
pub async fn buffer_operation(
pub(super) async fn buffer_operation(
&self,
dml_operation: DmlOperation,
catalog: &dyn Catalog,
@ -76,7 +76,7 @@ impl ShardData {
}
/// Gets the namespace data out of the map
pub fn namespace(&self, namespace: &str) -> Option<Arc<NamespaceData>> {
pub(crate) fn namespace(&self, namespace: &str) -> Option<Arc<NamespaceData>> {
let n = self.namespaces.read();
n.get(namespace).cloned()
}
@ -127,7 +127,7 @@ impl ShardData {
}
/// Return the [`ShardIndex`] this [`ShardData`] is buffering for.
pub fn shard_index(&self) -> ShardIndex {
pub(super) fn shard_index(&self) -> ShardIndex {
self.shard_index
}
}

View File

@ -45,7 +45,7 @@ impl TableData {
}
/// Return parquet_max_sequence_number
pub fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
pub(super) fn parquet_max_sequence_number(&self) -> Option<SequenceNumber> {
self.partition_data
.values()
.map(|p| p.max_persisted_sequence_number())
@ -55,7 +55,7 @@ impl TableData {
/// Return tombstone_max_sequence_number
#[allow(dead_code)] // Used in tests
pub fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
pub(super) fn tombstone_max_sequence_number(&self) -> Option<SequenceNumber> {
self.tombstone_max_sequence_number
}
@ -132,7 +132,7 @@ impl TableData {
Ok(())
}
pub fn unpersisted_partition_data(&self) -> Vec<UnpersistedPartitionData> {
pub(crate) fn unpersisted_partition_data(&self) -> Vec<UnpersistedPartitionData> {
self.partition_data
.values()
.map(|p| UnpersistedPartitionData {