diff --git a/compactor/src/components/partition_stream/endless.rs b/compactor/src/components/partition_stream/endless.rs index f2dfb8b25b..4a2c0318aa 100644 --- a/compactor/src/components/partition_stream/endless.rs +++ b/compactor/src/components/partition_stream/endless.rs @@ -82,11 +82,11 @@ where mod tests { use data_types::PartitionId; - use super::{super::super::partitions_source::mock::MockPartitionsSource, *}; + use super::{super::super::partitions_source::mock::MockCompactionJobsSource, *}; #[test] fn test_display() { - let stream = EndlessPartititionStream::new(MockPartitionsSource::new(vec![])); + let stream = EndlessPartititionStream::new(MockCompactionJobsSource::new(vec![])); assert_eq!(stream.to_string(), "endless(mock)"); } @@ -97,7 +97,7 @@ mod tests { CompactionJob::new(PartitionId::new(3)), CompactionJob::new(PartitionId::new(2)), ]; - let stream = EndlessPartititionStream::new(MockPartitionsSource::new(ids.clone())); + let stream = EndlessPartititionStream::new(MockCompactionJobsSource::new(ids.clone())); // stream is stateless for _ in 0..2 { diff --git a/compactor/src/components/partition_stream/once.rs b/compactor/src/components/partition_stream/once.rs index cd0f9f46fa..a4179a59a6 100644 --- a/compactor/src/components/partition_stream/once.rs +++ b/compactor/src/components/partition_stream/once.rs @@ -49,11 +49,11 @@ where mod tests { use data_types::PartitionId; - use super::{super::super::partitions_source::mock::MockPartitionsSource, *}; + use super::{super::super::partitions_source::mock::MockCompactionJobsSource, *}; #[test] fn test_display() { - let stream = OncePartititionStream::new(MockPartitionsSource::new(vec![])); + let stream = OncePartititionStream::new(MockCompactionJobsSource::new(vec![])); assert_eq!(stream.to_string(), "once(mock)"); } @@ -64,7 +64,7 @@ mod tests { CompactionJob::new(PartitionId::new(3)), CompactionJob::new(PartitionId::new(2)), ]; - let stream = OncePartititionStream::new(MockPartitionsSource::new(ids.clone())); + let stream = OncePartititionStream::new(MockCompactionJobsSource::new(ids.clone())); // stream is stateless for _ in 0..2 { diff --git a/compactor/src/components/partitions_source/logging.rs b/compactor/src/components/partitions_source/logging.rs index 1b2ef0d18f..67ad441aa5 100644 --- a/compactor/src/components/partitions_source/logging.rs +++ b/compactor/src/components/partitions_source/logging.rs @@ -52,17 +52,17 @@ mod tests { use data_types::PartitionId; use test_helpers::tracing::TracingCapture; - use super::{super::mock::MockPartitionsSource, *}; + use super::{super::mock::MockCompactionJobsSource, *}; #[test] fn test_display() { - let source = LoggingCompactionJobsWrapper::new(MockPartitionsSource::new(vec![])); + let source = LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(vec![])); assert_eq!(source.to_string(), "logging(mock)",); } #[tokio::test] async fn test_fetch_empty() { - let source = LoggingCompactionJobsWrapper::new(MockPartitionsSource::new(vec![])); + let source = LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(vec![])); let capture = TracingCapture::new(); assert_eq!(source.fetch().await, vec![],); // logs normal log message (so it's easy search for every single call) but also an extra warning @@ -81,7 +81,7 @@ mod tests { let partitions = vec![p_1, p_2, p_3]; let source = - LoggingCompactionJobsWrapper::new(MockPartitionsSource::new(partitions.clone())); + LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(partitions.clone())); let capture = TracingCapture::new(); assert_eq!(source.fetch().await, partitions,); // just the ordinary log message, no warning diff --git a/compactor/src/components/partitions_source/metrics.rs b/compactor/src/components/partitions_source/metrics.rs index 2be3fdeccd..d617216c15 100644 --- a/compactor/src/components/partitions_source/metrics.rs +++ b/compactor/src/components/partitions_source/metrics.rs @@ -72,13 +72,15 @@ mod tests { use data_types::PartitionId; use metric::assert_counter; - use super::{super::mock::MockPartitionsSource, *}; + use super::{super::mock::MockCompactionJobsSource, *}; #[test] fn test_display() { let registry = Registry::new(); - let source = - MetricsCompactionJobsSourceWrapper::new(MockPartitionsSource::new(vec![]), ®istry); + let source = MetricsCompactionJobsSourceWrapper::new( + MockCompactionJobsSource::new(vec![]), + ®istry, + ); assert_eq!(source.to_string(), "metrics(mock)",); } @@ -91,7 +93,7 @@ mod tests { CompactionJob::new(PartitionId::new(12)), ]; let source = MetricsCompactionJobsSourceWrapper::new( - MockPartitionsSource::new(partitions.clone()), + MockCompactionJobsSource::new(partitions.clone()), ®istry, ); diff --git a/compactor/src/components/partitions_source/mock.rs b/compactor/src/components/partitions_source/mock.rs index 312e74b76f..0248caec1f 100644 --- a/compactor/src/components/partitions_source/mock.rs +++ b/compactor/src/components/partitions_source/mock.rs @@ -4,36 +4,36 @@ use parking_lot::Mutex; use super::CompactionJobsSource; -/// A mock structure for providing [partitions](CompactionJob). +/// A mock structure for providing [compaction jobs](CompactionJob). #[derive(Debug)] -pub struct MockPartitionsSource { +pub struct MockCompactionJobsSource { partitions: Mutex>, } -impl MockPartitionsSource { +impl MockCompactionJobsSource { #[allow(dead_code)] - /// Create a new MockPartitionsSource. + /// Create a new MockCompactionJobsSource. pub fn new(partitions: Vec) -> Self { Self { partitions: Mutex::new(partitions), } } - /// Set CompactionJobs for MockPartitionsSource. + /// Set CompactionJobs for MockCompactionJobsSource. #[allow(dead_code)] // not used anywhere pub fn set(&self, partitions: Vec) { *self.partitions.lock() = partitions; } } -impl std::fmt::Display for MockPartitionsSource { +impl std::fmt::Display for MockCompactionJobsSource { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "mock") } } #[async_trait] -impl CompactionJobsSource for MockPartitionsSource { +impl CompactionJobsSource for MockCompactionJobsSource { async fn fetch(&self) -> Vec { self.partitions.lock().clone() } @@ -47,12 +47,12 @@ mod tests { #[test] fn test_display() { - assert_eq!(MockPartitionsSource::new(vec![]).to_string(), "mock",); + assert_eq!(MockCompactionJobsSource::new(vec![]).to_string(), "mock",); } #[tokio::test] async fn test_fetch() { - let source = MockPartitionsSource::new(vec![]); + let source = MockCompactionJobsSource::new(vec![]); assert_eq!(source.fetch().await, vec![],); let p_1 = CompactionJob::new(PartitionId::new(5)); diff --git a/compactor/src/components/partitions_source/not_empty.rs b/compactor/src/components/partitions_source/not_empty.rs index ef066e52fa..c15ad151b9 100644 --- a/compactor/src/components/partitions_source/not_empty.rs +++ b/compactor/src/components/partitions_source/not_empty.rs @@ -60,12 +60,12 @@ mod tests { use data_types::PartitionId; use iox_time::{MockProvider, Time}; - use super::{super::mock::MockPartitionsSource, *}; + use super::{super::mock::MockCompactionJobsSource, *}; #[test] fn test_display() { let source = NotEmptyPartitionsSourceWrapper::new( - MockPartitionsSource::new(vec![]), + MockCompactionJobsSource::new(vec![]), Duration::from_secs(1), Arc::new(MockProvider::new(Time::MIN)), ); @@ -74,7 +74,7 @@ mod tests { #[tokio::test] async fn test_fetch() { - let inner = Arc::new(MockPartitionsSource::new(vec![])); + let inner = Arc::new(MockCompactionJobsSource::new(vec![])); let time_provider = Arc::new(MockProvider::new(Time::MIN)); let source = NotEmptyPartitionsSourceWrapper::new( Arc::clone(&inner), diff --git a/compactor/src/components/partitions_source/randomize_order.rs b/compactor/src/components/partitions_source/randomize_order.rs index 5e2f9957cb..b7587dbe93 100644 --- a/compactor/src/components/partitions_source/randomize_order.rs +++ b/compactor/src/components/partitions_source/randomize_order.rs @@ -50,19 +50,19 @@ where mod tests { use data_types::PartitionId; - use super::{super::mock::MockPartitionsSource, *}; + use super::{super::mock::MockCompactionJobsSource, *}; #[test] fn test_display() { let source = - RandomizeOrderPartitionsSourcesWrapper::new(MockPartitionsSource::new(vec![]), 123); + RandomizeOrderPartitionsSourcesWrapper::new(MockCompactionJobsSource::new(vec![]), 123); assert_eq!(source.to_string(), "randomize_order(mock)",); } #[tokio::test] async fn test_fetch_empty() { let source = - RandomizeOrderPartitionsSourcesWrapper::new(MockPartitionsSource::new(vec![]), 123); + RandomizeOrderPartitionsSourcesWrapper::new(MockCompactionJobsSource::new(vec![]), 123); assert_eq!(source.fetch().await, vec![],); } @@ -75,7 +75,7 @@ mod tests { // shuffles let source = RandomizeOrderPartitionsSourcesWrapper::new( - MockPartitionsSource::new(partitions.clone()), + MockCompactionJobsSource::new(partitions.clone()), 123, ); assert_eq!( @@ -94,7 +94,7 @@ mod tests { // is deterministic with new source for _ in 0..100 { let source = RandomizeOrderPartitionsSourcesWrapper::new( - MockPartitionsSource::new(partitions.clone()), + MockCompactionJobsSource::new(partitions.clone()), 123, ); assert_eq!( @@ -105,7 +105,7 @@ mod tests { // different seed => different output let source = RandomizeOrderPartitionsSourcesWrapper::new( - MockPartitionsSource::new(partitions.clone()), + MockCompactionJobsSource::new(partitions.clone()), 1234, ); assert_eq!(source.fetch().await, vec![p_2, p_3, p_1,],);