diff --git a/compactor/src/components/compaction_job_stream/mod.rs b/compactor/src/components/compaction_job_stream/mod.rs
index 0a1a162a48..2062251712 100644
--- a/compactor/src/components/compaction_job_stream/mod.rs
+++ b/compactor/src/components/compaction_job_stream/mod.rs
@@ -6,7 +6,7 @@ use futures::stream::BoxStream;
 pub mod endless;
 pub mod once;
 
-/// Source for partitions.
+/// Source for compaction jobs.
 pub trait CompactionJobStream: Debug + Display + Send + Sync {
     /// Create new source stream of compaction jobs.
     ///
diff --git a/compactor/src/components/compaction_jobs_source/logging.rs b/compactor/src/components/compaction_jobs_source/logging.rs
index 67ad441aa5..8aed030350 100644
--- a/compactor/src/components/compaction_jobs_source/logging.rs
+++ b/compactor/src/components/compaction_jobs_source/logging.rs
@@ -38,12 +38,12 @@ where
     T: CompactionJobsSource,
 {
     async fn fetch(&self) -> Vec<CompactionJob> {
-        let partitions = self.inner.fetch().await;
-        info!(n_partitions = partitions.len(), "Fetch partitions",);
-        if partitions.is_empty() {
-            warn!("No partition found");
+        let jobs = self.inner.fetch().await;
+        info!(n_jobs = jobs.len(), "Fetch jobs",);
+        if jobs.is_empty() {
+            warn!("No compaction job found");
         }
-        partitions
+        jobs
     }
 }
 
@@ -68,26 +68,25 @@ mod tests {
         // logs normal log message (so it's easy search for every single call) but also an extra warning
         assert_eq!(
             capture.to_string(),
-            "level = INFO; message = Fetch partitions; n_partitions = 0; \
-            \nlevel = WARN; message = No partition found; ",
+            "level = INFO; message = Fetch jobs; n_jobs = 0; \
+            \nlevel = WARN; message = No compaction job found; ",
         );
     }
 
     #[tokio::test]
     async fn test_fetch_some() {
-        let p_1 = CompactionJob::new(PartitionId::new(5));
-        let p_2 = CompactionJob::new(PartitionId::new(1));
-        let p_3 = CompactionJob::new(PartitionId::new(12));
-        let partitions = vec![p_1, p_2, p_3];
+        let cj_1 = CompactionJob::new(PartitionId::new(5));
+        let cj_2 = CompactionJob::new(PartitionId::new(1));
+        let cj_3 = CompactionJob::new(PartitionId::new(12));
+        let jobs = vec![cj_1, cj_2, cj_3];
-        let source =
-            LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(partitions.clone()));
+        let source = LoggingCompactionJobsWrapper::new(MockCompactionJobsSource::new(jobs.clone()));
         let capture = TracingCapture::new();
-        assert_eq!(source.fetch().await, partitions,);
+        assert_eq!(source.fetch().await, jobs,);
 
         // just the ordinary log message, no warning
         assert_eq!(
             capture.to_string(),
-            "level = INFO; message = Fetch partitions; n_partitions = 3; ",
+            "level = INFO; message = Fetch jobs; n_jobs = 3; ",
         );
     }
 }
diff --git a/compactor/src/components/compaction_jobs_source/metrics.rs b/compactor/src/components/compaction_jobs_source/metrics.rs
index d617216c15..d42e005fcd 100644
--- a/compactor/src/components/compaction_jobs_source/metrics.rs
+++ b/compactor/src/components/compaction_jobs_source/metrics.rs
@@ -60,10 +60,10 @@ where
     T: CompactionJobsSource,
 {
     async fn fetch(&self) -> Vec<CompactionJob> {
-        let partitions = self.inner.fetch().await;
+        let jobs = self.inner.fetch().await;
         self.partitions_fetch_counter.inc(1);
-        self.partitions_counter.inc(partitions.len() as u64);
-        partitions
+        self.partitions_counter.inc(jobs.len() as u64);
+        jobs
     }
 }
 
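The logging and metrics sources above are thin decorators over the same trait: hold an inner `CompactionJobsSource`, delegate `fetch`, observe the returned jobs, and pass them through unchanged. A minimal sketch of that pattern, illustrative only and not part of this change (the `CountingCompactionJobsWrapper` name and its counter are invented; the trait, `CompactionJob`, and `async_trait` are the ones these modules already use):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

use async_trait::async_trait;
use compactor_scheduler::CompactionJob;

use super::CompactionJobsSource;

/// Hypothetical wrapper: counts how many jobs the inner source has handed out.
#[derive(Debug)]
struct CountingCompactionJobsWrapper<T>
where
    T: CompactionJobsSource,
{
    inner: T,
    jobs_seen: AtomicU64,
}

impl<T> CountingCompactionJobsWrapper<T>
where
    T: CompactionJobsSource,
{
    fn new(inner: T) -> Self {
        Self {
            inner,
            jobs_seen: AtomicU64::new(0),
        }
    }
}

impl<T> std::fmt::Display for CountingCompactionJobsWrapper<T>
where
    T: CompactionJobsSource,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "counting({})", self.inner)
    }
}

#[async_trait]
impl<T> CompactionJobsSource for CountingCompactionJobsWrapper<T>
where
    T: CompactionJobsSource,
{
    async fn fetch(&self) -> Vec<CompactionJob> {
        // Delegate to the wrapped source, observe the result, pass it through.
        let jobs = self.inner.fetch().await;
        self.jobs_seen.fetch_add(jobs.len() as u64, Ordering::Relaxed);
        jobs
    }
}
```

The real wrappers differ only in what they do with the fetched jobs (log lines vs. metric counters).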
diff --git a/compactor/src/components/compaction_jobs_source/mock.rs b/compactor/src/components/compaction_jobs_source/mock.rs
index 0248caec1f..78e92f2974 100644
--- a/compactor/src/components/compaction_jobs_source/mock.rs
+++ b/compactor/src/components/compaction_jobs_source/mock.rs
@@ -7,22 +7,22 @@ use super::CompactionJobsSource;
 
 /// A mock structure for providing [compaction jobs](CompactionJob).
 #[derive(Debug)]
 pub struct MockCompactionJobsSource {
-    partitions: Mutex<Vec<CompactionJob>>,
+    compaction_jobs: Mutex<Vec<CompactionJob>>,
 }
 
 impl MockCompactionJobsSource {
     #[allow(dead_code)]
     /// Create a new MockCompactionJobsSource.
-    pub fn new(partitions: Vec<CompactionJob>) -> Self {
+    pub fn new(jobs: Vec<CompactionJob>) -> Self {
         Self {
-            partitions: Mutex::new(partitions),
+            compaction_jobs: Mutex::new(jobs),
         }
     }
 
     /// Set CompactionJobs for MockCompactionJobsSource.
     #[allow(dead_code)] // not used anywhere
-    pub fn set(&self, partitions: Vec<CompactionJob>) {
-        *self.partitions.lock() = partitions;
+    pub fn set(&self, jobs: Vec<CompactionJob>) {
+        *self.compaction_jobs.lock() = jobs;
     }
 }
@@ -35,7 +35,7 @@ impl std::fmt::Display for MockCompactionJobsSource {
 
 #[async_trait]
 impl CompactionJobsSource for MockCompactionJobsSource {
     async fn fetch(&self) -> Vec<CompactionJob> {
-        self.partitions.lock().clone()
+        self.compaction_jobs.lock().clone()
     }
 }
@@ -55,10 +55,10 @@ mod tests {
         let source = MockCompactionJobsSource::new(vec![]);
         assert_eq!(source.fetch().await, vec![],);
 
-        let p_1 = CompactionJob::new(PartitionId::new(5));
-        let p_2 = CompactionJob::new(PartitionId::new(1));
-        let p_3 = CompactionJob::new(PartitionId::new(12));
-        let parts = vec![p_1, p_2, p_3];
+        let cj_1 = CompactionJob::new(PartitionId::new(5));
+        let cj_2 = CompactionJob::new(PartitionId::new(1));
+        let cj_3 = CompactionJob::new(PartitionId::new(12));
+        let parts = vec![cj_1, cj_2, cj_3];
         source.set(parts.clone());
         assert_eq!(source.fetch().await, parts,);
     }
diff --git a/compactor/src/components/compaction_jobs_source/mod.rs b/compactor/src/components/compaction_jobs_source/mod.rs
index 3caf735f37..f88d5fa55b 100644
--- a/compactor/src/components/compaction_jobs_source/mod.rs
+++ b/compactor/src/components/compaction_jobs_source/mod.rs
@@ -19,7 +19,7 @@ use compactor_scheduler::CompactionJob;
 /// A source of partitions, noted by [`CompactionJob`](compactor_scheduler::CompactionJob), that may potentially need compacting.
 #[async_trait]
 pub trait CompactionJobsSource: Debug + Display + Send + Sync {
-    /// Get partition IDs.
+    /// Get compaction jobs. (For now, 1 job equals 1 partition ID).
     ///
     /// This method performs retries.
     ///
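To make the "1 job equals 1 partition ID" note concrete, a hypothetical test-style sketch, not part of the diff (the `data_types::PartitionId` import path and the `super::...` paths are assumptions based on how these modules appear to be laid out):

```rust
use compactor_scheduler::CompactionJob;
// Assumed import paths for this sketch only:
use data_types::PartitionId;
use super::{mock::MockCompactionJobsSource, CompactionJobsSource};

#[tokio::test]
async fn compaction_job_wraps_one_partition_id() {
    // Today a compaction job is constructed from exactly one partition ID.
    let job = CompactionJob::new(PartitionId::new(5));

    // A source therefore yields one job per partition that may need compacting.
    let source = MockCompactionJobsSource::new(vec![job.clone()]);
    assert_eq!(source.fetch().await, vec![job]);
}
```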
diff --git a/compactor/src/components/compaction_jobs_source/randomize_order.rs b/compactor/src/components/compaction_jobs_source/randomize_order.rs
index 25696f98b6..da23d4fe9a 100644
--- a/compactor/src/components/compaction_jobs_source/randomize_order.rs
+++ b/compactor/src/components/compaction_jobs_source/randomize_order.rs
@@ -39,10 +39,10 @@ where
     T: CompactionJobsSource,
 {
     async fn fetch(&self) -> Vec<CompactionJob> {
-        let mut partitions = self.inner.fetch().await;
+        let mut compaction_jobs = self.inner.fetch().await;
         let mut rng = StdRng::seed_from_u64(self.seed);
-        partitions.shuffle(&mut rng);
-        partitions
+        compaction_jobs.shuffle(&mut rng);
+        compaction_jobs
     }
 }
 
@@ -72,46 +72,46 @@ mod tests {
 
     #[tokio::test]
     async fn test_fetch_some() {
-        let p_1 = CompactionJob::new(PartitionId::new(5));
-        let p_2 = CompactionJob::new(PartitionId::new(1));
-        let p_3 = CompactionJob::new(PartitionId::new(12));
-        let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()];
+        let cj_1 = CompactionJob::new(PartitionId::new(5));
+        let cj_2 = CompactionJob::new(PartitionId::new(1));
+        let cj_3 = CompactionJob::new(PartitionId::new(12));
+        let compaction_jobs = vec![cj_1.clone(), cj_2.clone(), cj_3.clone()];
 
         // shuffles
         let source = RandomizeOrderCompactionJobsSourcesWrapper::new(
-            MockCompactionJobsSource::new(partitions.clone()),
+            MockCompactionJobsSource::new(compaction_jobs.clone()),
             123,
         );
         assert_eq!(
             source.fetch().await,
-            vec![p_3.clone(), p_2.clone(), p_1.clone(),],
+            vec![cj_3.clone(), cj_2.clone(), cj_1.clone(),],
         );
 
         // is deterministic in same source
         for _ in 0..100 {
             assert_eq!(
                 source.fetch().await,
-                vec![p_3.clone(), p_2.clone(), p_1.clone(),],
+                vec![cj_3.clone(), cj_2.clone(), cj_1.clone(),],
             );
         }
 
         // is deterministic with new source
         for _ in 0..100 {
             let source = RandomizeOrderCompactionJobsSourcesWrapper::new(
-                MockCompactionJobsSource::new(partitions.clone()),
+                MockCompactionJobsSource::new(compaction_jobs.clone()),
                 123,
             );
             assert_eq!(
                 source.fetch().await,
-                vec![p_3.clone(), p_2.clone(), p_1.clone(),],
+                vec![cj_3.clone(), cj_2.clone(), cj_1.clone(),],
             );
         }
 
         // different seed => different output
         let source = RandomizeOrderCompactionJobsSourcesWrapper::new(
-            MockCompactionJobsSource::new(partitions.clone()),
+            MockCompactionJobsSource::new(compaction_jobs.clone()),
             1234,
         );
-        assert_eq!(source.fetch().await, vec![p_2, p_3, p_1,],);
+        assert_eq!(source.fetch().await, vec![cj_2, cj_3, cj_1,],);
     }
 }
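The exact orderings asserted in these tests fall out of the seeded RNG: `StdRng::seed_from_u64` is deterministic, so the same seed always yields the same permutation, while a different seed generally yields a different one. A standalone sketch of just that behaviour (plain integers instead of `CompactionJob`s; the function name is invented):

```rust
use rand::{rngs::StdRng, seq::SliceRandom, SeedableRng};

/// Shuffle a copy of `items` with a fixed seed; same seed => same order.
fn shuffle_with_seed(mut items: Vec<u64>, seed: u64) -> Vec<u64> {
    let mut rng = StdRng::seed_from_u64(seed);
    items.shuffle(&mut rng);
    items
}

fn main() {
    let ids = vec![5, 1, 12];
    // Deterministic for a given seed...
    assert_eq!(
        shuffle_with_seed(ids.clone(), 123),
        shuffle_with_seed(ids.clone(), 123)
    );
    // ...but a different seed will usually produce a different order.
    println!("{:?}", shuffle_with_seed(ids, 1234));
}
```

This is why a wrapper constructed with seed 123 can assert a fixed output order across 100 consecutive fetches.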
diff --git a/compactor/src/components/partition_source/mock.rs b/compactor/src/components/partition_source/mock.rs
index d1aa3d5272..27511a5743 100644
--- a/compactor/src/components/partition_source/mock.rs
+++ b/compactor/src/components/partition_source/mock.rs
@@ -46,25 +46,25 @@ mod tests {
 
     #[tokio::test]
     async fn test_fetch_by_id() {
-        let p_1 = PartitionBuilder::new(5).build();
-        let p_2 = PartitionBuilder::new(1).build();
-        let p_3 = PartitionBuilder::new(12).build();
-        let partitions = vec![p_1.clone(), p_2.clone(), p_3.clone()];
-        let source = MockPartitionSource::new(partitions);
+        let cj_1 = PartitionBuilder::new(5).build();
+        let cj_2 = PartitionBuilder::new(1).build();
+        let cj_3 = PartitionBuilder::new(12).build();
+        let compaction_jobs = vec![cj_1.clone(), cj_2.clone(), cj_3.clone()];
+        let source = MockPartitionSource::new(compaction_jobs);
 
         assert_eq!(
             source.fetch_by_id(PartitionId::new(5)).await,
-            Some(p_1.clone())
+            Some(cj_1.clone())
         );
         assert_eq!(
             source.fetch_by_id(PartitionId::new(1)).await,
-            Some(p_2.clone())
+            Some(cj_2.clone())
         );
 
         // fetching does not drain
         assert_eq!(
             source.fetch_by_id(PartitionId::new(5)).await,
-            Some(p_1.clone())
+            Some(cj_1.clone())
         );
 
         // unknown table => None result
diff --git a/compactor/src/driver.rs b/compactor/src/driver.rs
index faef17fccd..5c73969226 100644
--- a/compactor/src/driver.rs
+++ b/compactor/src/driver.rs
@@ -40,7 +40,8 @@ pub async fn compact(
         .map(|job| {
             let components = Arc::clone(components);
 
-            // A root span is created for each partition. Later this can be linked to the
+            // A root span is created for each compaction job (a.k.a. partition).
+            // Later this can be linked to the
             // scheduler's span via something passed through compaction_job_stream.
             let root_span: Option<Span> = trace_collector
                 .as_ref()
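Since every wrapper implements `CompactionJobsSource` over an inner `CompactionJobsSource`, the sources compose by nesting. A hypothetical wiring, not taken from the production setup, using only the constructors that appear in the tests above (module and import paths are assumed from the file layout):

```rust
use compactor_scheduler::CompactionJob;
// Assumed import path:
use data_types::PartitionId;
// Assumed module paths, mirroring compactor/src/components/compaction_jobs_source/:
use crate::components::compaction_jobs_source::{
    logging::LoggingCompactionJobsWrapper, mock::MockCompactionJobsSource,
    randomize_order::RandomizeOrderCompactionJobsSourcesWrapper, CompactionJobsSource,
};

#[tokio::test]
async fn nested_wrappers_sketch() {
    let jobs = vec![
        CompactionJob::new(PartitionId::new(5)),
        CompactionJob::new(PartitionId::new(1)),
        CompactionJob::new(PartitionId::new(12)),
    ];

    // Innermost: the mock source. Middle: deterministic shuffle (seed 123).
    // Outermost: logging of how many jobs each fetch returned.
    let source = LoggingCompactionJobsWrapper::new(RandomizeOrderCompactionJobsSourcesWrapper::new(
        MockCompactionJobsSource::new(jobs.clone()),
        123,
    ));

    // Same jobs, possibly in a different order, plus an INFO log line per fetch.
    assert_eq!(source.fetch().await.len(), jobs.len());
}
```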