From 6e9c7522304ed5a7147fbd1b0453c97b4e998ed3 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Wed, 3 Aug 2022 11:17:46 -0400
Subject: [PATCH] refactor: Extract current compaction into a fn for 'hot' partitions

---
 compactor/src/compact.rs |  14 +--
 compactor/src/handler.rs | 234 ++++++++++++++++++++-------------------
 compactor/src/lib.rs     |   2 +-
 3 files changed, 129 insertions(+), 121 deletions(-)
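Illustration (not part of the diff): the doc comment on
`hot_partitions_to_compact` below caps the candidate list at P = N * S,
where N is the per-sequencer partition limit and S is the number of
sequencers this compactor handles. A minimal sketch of that arithmetic,
with hypothetical names standing in for the configured values:

    fn max_candidates(n_per_sequencer: usize, n_sequencers: usize) -> usize {
        // P = N * S: at most N hot partitions from each of S sequencers
        n_per_sequencer * n_sequencers
    }

    fn main() {
        // Reading 1 partition per sequencer across 2 sequencers yields at
        // most 2 candidates, matching the final test case in this patch.
        assert_eq!(max_candidates(1, 2), 2);
    }
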
diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 0455de866f..fb1fe7b457 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -238,7 +238,7 @@ impl Compactor {
     /// * In all cases above, for each sequencer, N partitions with the most new ingested files
     ///   will be selected and the return list will include at most, P = N * S, partitions where S
     ///   is the number of sequencers this compactor handles.
-    pub async fn partitions_to_compact(
+    pub async fn hot_partitions_to_compact(
         &self,
         // Max number of the most recent highest ingested throughput partitions
         // per sequencer we want to read
@@ -582,7 +582,7 @@ mod tests {
         // --------------------------------------
         // Case 1: no files yet --> no partition candidates
         //
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         assert!(candidates.is_empty());
 
         // --------------------------------------
@@ -606,7 +606,7 @@
             .unwrap();
         txn.commit().await.unwrap();
         // No non-deleted level 0 files yet --> no candidates
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         assert!(candidates.is_empty());
 
         // --------------------------------------
@@ -624,7 +624,7 @@
         txn.commit().await.unwrap();
         //
         // Has at least one partition with a L0 file --> make it a candidate
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         assert_eq!(candidates.len(), 1);
         assert_eq!(candidates[0].partition_id, partition2.id);
 
@@ -643,7 +643,7 @@
         txn.commit().await.unwrap();
         //
         // Has at least one partition with a recent write --> make it a candidate
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         assert_eq!(candidates.len(), 1);
         assert_eq!(candidates[0].partition_id, partition4.id);
 
@@ -665,7 +665,7 @@
         txn.commit().await.unwrap();
         //
         // make partitions in the most recent group candidates
-        let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         assert_eq!(candidates.len(), 1);
         assert_eq!(candidates[0].partition_id, partition3.id);
 
@@ -686,7 +686,7 @@
         txn.commit().await.unwrap();
         //
         // Will have 2 candidates, one for each sequencer
-        let mut candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
+        let mut candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
         candidates.sort();
         assert_eq!(candidates.len(), 2);
         assert_eq!(candidates[0].partition_id, partition3.id);
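
Illustration (not part of the diff): the handler below pairs each catalog
call with a retry loop and a duration metric. `Backoff::retry_all_errors`
and the injected time provider come from the IOx codebase; this sketch
approximates the same select-then-record shape with std and tokio only,
using a hypothetical `select_candidates` in place of the retried
`hot_partitions_to_compact` call:

    use std::time::Instant;

    // Hypothetical stand-in for the catalog query that the handler wraps
    // in Backoff::retry_all_errors and retries forever on error.
    async fn select_candidates() -> Vec<u64> {
        vec![101, 102]
    }

    #[tokio::main]
    async fn main() {
        let start_time = Instant::now();
        let candidates = select_candidates().await;
        // The handler records this delta into its candidate_selection_duration
        // metric; printing stands in for the recorder here.
        println!(
            "selected {} candidates in {:?}",
            candidates.len(),
            start_time.elapsed()
        );
    }
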
diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs
index 8f7f34bb78..f236586bfd 100644
--- a/compactor/src/handler.rs
+++ b/compactor/src/handler.rs
@@ -248,122 +248,130 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
     while !shutdown.is_cancelled() {
         debug!("compactor main loop tick.");
 
-        // Select partition candidates
-        let start_time = compactor.time_provider.now();
-        let candidates = Backoff::new(&compactor.backoff_config)
-            .retry_all_errors("partitions_to_compact", || async {
-                compactor
-                    .partitions_to_compact(
-                        compactor.config.max_number_partitions_per_sequencer(),
-                        compactor
-                            .config
-                            .min_number_recent_ingested_files_per_partition(),
-                    )
-                    .await
-            })
-            .await
-            .expect("retry forever");
-        if let Some(delta) = compactor
-            .time_provider
-            .now()
-            .checked_duration_since(start_time)
-        {
-            let duration = compactor
-                .candidate_selection_duration
-                .recorder(Attributes::from([]));
-            duration.record(delta);
-        }
-
-        // Add other compaction-needed info into selected partitions
-        let start_time = compactor.time_provider.now();
-        let candidates = Backoff::new(&compactor.backoff_config)
-            .retry_all_errors("partitions_to_compact", || async {
-                compactor.add_info_to_partitions(&candidates).await
-            })
-            .await
-            .expect("retry forever");
-        if let Some(delta) = compactor
-            .time_provider
-            .now()
-            .checked_duration_since(start_time)
-        {
-            let duration = compactor
-                .partitions_extra_info_reading_duration
-                .recorder(Attributes::from([]));
-            duration.record(delta);
-        }
-
-        let n_candidates = candidates.len();
-        if n_candidates == 0 {
-            debug!("no compaction candidates found");
-            // sleep for a second to avoid a hot busy loop when the
-            // catalog is polled
-            tokio::time::sleep(PAUSE_BETWEEN_NO_WORK).await;
-            continue;
-        } else {
-            debug!(n_candidates, "found compaction candidates");
-        }
-
-        let start_time = compactor.time_provider.now();
-
-        // Repeat compacting n partitions in parallel until all candidates are compacted.
-        // Concurrency level calculation (this is estimated from previous experiments. The actual resource
-        // management will be more complicated and a future feature):
-        // . Each `compact partititon` takes max of this much memory input_size_threshold_bytes
-        // . We have this memory budget: max_concurrent_compaction_size_bytes
-        // --> num_parallel_partitions = max_concurrent_compaction_size_bytes/ input_size_threshold_bytes
-        let num_parallel_partitions = (compactor.config.max_concurrent_compaction_size_bytes
-            / compactor.config.input_size_threshold_bytes)
-            as usize;
-
-        futures::stream::iter(candidates)
-            .map(|p| {
-                // run compaction in its own task
-                let comp = Arc::clone(&compactor);
-                tokio::task::spawn(async move {
-                    let partition_id = p.candidate.partition_id;
-                    let compaction_result = crate::compact_partition(&comp, p).await;
-
-                    match compaction_result {
-                        Err(e) => {
-                            warn!(?e, ?partition_id, "compaction failed");
-                        }
-                        Ok(_) => {
-                            debug!(?partition_id, "compaction complete");
-                        }
-                    };
-                })
-            })
-            // Assume we have enough resources to run
-            // num_parallel_partitions compactions in parallel
-            .buffer_unordered(num_parallel_partitions)
-            // report any JoinErrors (aka task panics)
-            .map(|join_result| {
-                if let Err(e) = join_result {
-                    warn!(?e, "compaction task failed");
-                }
-                Ok(())
-            })
-            // Errors are reported during execution, so ignore results here
-            // https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
-            .forward(futures::sink::drain())
-            .await
-            .ok();
-
-        // Done compacting all candidates in the cycle, record its time
-        if let Some(delta) = compactor
-            .time_provider
-            .now()
-            .checked_duration_since(start_time)
-        {
-            let duration = compactor
-                .compaction_cycle_duration
-                .recorder(Attributes::from([]));
-            duration.record(delta);
-        }
+        compact_hot_partitions(Arc::clone(&compactor)).await;
+        compact_cold_partitions(Arc::clone(&compactor)).await;
     }
 }
 
+async fn compact_hot_partitions(compactor: Arc<Compactor>) {
+    // Select hot partition candidates
+    let start_time = compactor.time_provider.now();
+    let candidates = Backoff::new(&compactor.backoff_config)
+        .retry_all_errors("hot_partitions_to_compact", || async {
+            compactor
+                .hot_partitions_to_compact(
+                    compactor.config.max_number_partitions_per_sequencer(),
+                    compactor
+                        .config
+                        .min_number_recent_ingested_files_per_partition(),
+                )
+                .await
+        })
+        .await
+        .expect("retry forever");
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor
+            .candidate_selection_duration
+            .recorder(Attributes::from([]));
+        duration.record(delta);
+    }
+
+    // Add other compaction-needed info into selected partitions
+    let start_time = compactor.time_provider.now();
+    let candidates = Backoff::new(&compactor.backoff_config)
+        .retry_all_errors("add_info_to_partitions", || async {
+            compactor.add_info_to_partitions(&candidates).await
+        })
+        .await
+        .expect("retry forever");
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor
+            .partitions_extra_info_reading_duration
+            .recorder(Attributes::from([]));
+        duration.record(delta);
+    }
+
+    let n_candidates = candidates.len();
+    if n_candidates == 0 {
+        debug!("no hot compaction candidates found");
+        // sleep for a second to avoid a hot busy loop when the
+        // catalog is polled
+        tokio::time::sleep(PAUSE_BETWEEN_NO_WORK).await;
+        return;
+    } else {
+        debug!(n_candidates, "found hot compaction candidates");
+    }
+
+    let start_time = compactor.time_provider.now();
+
+    // Repeat compacting n partitions in parallel until all candidates are compacted.
+    // Concurrency level calculation (this is estimated from previous experiments. The actual
+    // resource management will be more complicated and a future feature):
+    // . Each `compact partition` takes at most this much memory: input_size_threshold_bytes
+    // . We have this memory budget: max_concurrent_compaction_size_bytes
+    // --> num_parallel_partitions = max_concurrent_compaction_size_bytes /
+    //     input_size_threshold_bytes
+    let num_parallel_partitions = (compactor.config.max_concurrent_compaction_size_bytes
+        / compactor.config.input_size_threshold_bytes)
+        as usize;
+
+    futures::stream::iter(candidates)
+        .map(|p| {
+            // run compaction in its own task
+            let comp = Arc::clone(&compactor);
+            tokio::task::spawn(async move {
+                let partition_id = p.candidate.partition_id;
+                let compaction_result = crate::compact_partition(&comp, p).await;
+
+                match compaction_result {
+                    Err(e) => {
+                        warn!(?e, ?partition_id, "hot compaction failed");
+                    }
+                    Ok(_) => {
+                        debug!(?partition_id, "hot compaction complete");
+                    }
+                };
+            })
+        })
+        // Assume we have enough resources to run
+        // num_parallel_partitions compactions in parallel
+        .buffer_unordered(num_parallel_partitions)
+        // report any JoinErrors (aka task panics)
+        .map(|join_result| {
+            if let Err(e) = join_result {
+                warn!(?e, "compaction task failed");
+            }
+            Ok(())
+        })
+        // Errors are reported during execution, so ignore results here
+        // https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
+        .forward(futures::sink::drain())
+        .await
+        .ok();
+
+    // Done compacting all candidates in the cycle, record its time
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor
+            .compaction_cycle_duration
+            .recorder(Attributes::from([]));
+        duration.record(delta);
+    }
+}
+
+async fn compact_cold_partitions(_compactor: Arc<Compactor>) {}
+
 #[async_trait]
 impl CompactorHandler for CompactorHandlerImpl {
     async fn join(&self) {
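
Illustration (not part of the diff): the concurrency level above is plain
integer division of the memory budget by the per-partition input threshold,
and the fan-out bounds in-flight tasks with `buffer_unordered`. A minimal,
self-contained sketch with made-up byte sizes; it uses `for_each` for
brevity where the patch forwards the stream into a drain sink:

    use futures::StreamExt;

    #[tokio::main]
    async fn main() {
        // Made-up values; the real ones are the compactor config's
        // max_concurrent_compaction_size_bytes and input_size_threshold_bytes.
        let memory_budget_bytes: u64 = 100 * 1024 * 1024;
        let input_threshold_bytes: u64 = 25 * 1024 * 1024;
        // e.g. a 100 MB budget over a 25 MB per-partition threshold allows
        // at most 4 partitions to compact in parallel.
        let num_parallel_partitions = (memory_budget_bytes / input_threshold_bytes) as usize;
        assert_eq!(num_parallel_partitions, 4);

        // One spawned task per candidate, at most num_parallel_partitions
        // running at once; JoinErrors surface task panics.
        futures::stream::iter(vec![1_u64, 2, 3, 4, 5, 6])
            .map(|partition_id| {
                tokio::task::spawn(async move {
                    // stand-in for crate::compact_partition(&comp, p)
                    println!("compacting partition {partition_id}");
                })
            })
            .buffer_unordered(num_parallel_partitions)
            .for_each(|join_result| async {
                if let Err(e) = join_result {
                    eprintln!("compaction task failed: {e}");
                }
            })
            .await;
    }
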
diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs
index 8e045881bd..6ae24d05f0 100644
--- a/compactor/src/lib.rs
+++ b/compactor/src/lib.rs
@@ -253,7 +253,7 @@ mod tests {
         // ------------------------------------------------
         // Compact
         let candidates = compactor
-            .partitions_to_compact(
+            .hot_partitions_to_compact(
                 compactor.config.max_number_partitions_per_sequencer(),
                 compactor
                     .config