refactor: Extract current compaction into a fn for 'hot' partitions

pull/24376/head
Carol (Nichols || Goulding) 2022-08-03 11:17:46 -04:00
parent e82214ed38
commit 6e9c752230
3 changed files with 129 additions and 121 deletions

@@ -238,7 +238,7 @@ impl Compactor {
/// * In all cases above, for each sequencer, the N partitions with the most newly ingested files
/// will be selected, and the returned list will include at most P = N * S partitions, where S
/// is the number of sequencers this compactor handles.
pub async fn partitions_to_compact(
pub async fn hot_partitions_to_compact(
&self,
// Max number of partitions with the highest recent ingest throughput,
// per sequencer, that we want to read
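The doc comment above caps the returned candidates at P = N * S. A minimal sketch of that bound, with made-up values for N and S (only the formula comes from the comment; the variable names are illustrative):

```rust
fn main() {
    // N: per-sequencer limit, i.e. the first argument passed to
    // hot_partitions_to_compact in the tests below (value here is made up).
    let n_per_sequencer: usize = 5;
    // S: number of sequencers this compactor handles (also made up).
    let s_sequencers: usize = 4;

    // The candidate list returned by one call is bounded by P = N * S.
    let p_max_candidates = n_per_sequencer * s_sequencers;
    assert_eq!(p_max_candidates, 20);
}
```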
@@ -582,7 +582,7 @@ mod tests {
// --------------------------------------
// Case 1: no files yet --> no partition candidates
//
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert!(candidates.is_empty());
// --------------------------------------
@@ -606,7 +606,7 @@ mod tests {
.unwrap();
txn.commit().await.unwrap();
// No non-deleted level 0 files yet --> no candidates
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert!(candidates.is_empty());
// --------------------------------------
@@ -624,7 +624,7 @@ mod tests {
txn.commit().await.unwrap();
//
// Has at least one partition with a L0 file --> make it a candidate
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition2.id);
@@ -643,7 +643,7 @@ mod tests {
txn.commit().await.unwrap();
//
// Has at least one partition with a recent write --> make it a candidate
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition4.id);
@@ -665,7 +665,7 @@ mod tests {
txn.commit().await.unwrap();
//
// make partitions in the most recent group candidates
let candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
assert_eq!(candidates.len(), 1);
assert_eq!(candidates[0].partition_id, partition3.id);
@@ -686,7 +686,7 @@ mod tests {
txn.commit().await.unwrap();
//
// Will have 2 candidates, one for each sequencer
let mut candidates = compactor.partitions_to_compact(1, 1).await.unwrap();
let mut candidates = compactor.hot_partitions_to_compact(1, 1).await.unwrap();
candidates.sort();
assert_eq!(candidates.len(), 2);
assert_eq!(candidates[0].partition_id, partition3.id);

@@ -248,122 +248,130 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
while !shutdown.is_cancelled() {
debug!("compactor main loop tick.");
// Select partition candidates
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("partitions_to_compact", || async {
compactor
.partitions_to_compact(
compactor.config.max_number_partitions_per_sequencer(),
compactor
.config
.min_number_recent_ingested_files_per_partition(),
)
.await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.candidate_selection_duration
.recorder(Attributes::from([]));
duration.record(delta);
}
// Add other compaction-needed info into selected partitions
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("partitions_to_compact", || async {
compactor.add_info_to_partitions(&candidates).await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.partitions_extra_info_reading_duration
.recorder(Attributes::from([]));
duration.record(delta);
}
let n_candidates = candidates.len();
if n_candidates == 0 {
debug!("no compaction candidates found");
// sleep for a second to avoid a hot busy loop when the
// catalog is polled
tokio::time::sleep(PAUSE_BETWEEN_NO_WORK).await;
continue;
} else {
debug!(n_candidates, "found compaction candidates");
}
let start_time = compactor.time_provider.now();
// Repeat compacting n partitions in parallel until all candidates are compacted.
// Concurrency level calculation (this is estimated from previous experiments. The actual resource
// management will be more complicated and a future feature):
// . Each `compact partition` takes at most this much memory: input_size_threshold_bytes
// . We have this memory budget: max_concurrent_compaction_size_bytes
// --> num_parallel_partitions = max_concurrent_compaction_size_bytes / input_size_threshold_bytes
let num_parallel_partitions = (compactor.config.max_concurrent_compaction_size_bytes
/ compactor.config.input_size_threshold_bytes)
as usize;
futures::stream::iter(candidates)
.map(|p| {
// run compaction in its own task
let comp = Arc::clone(&compactor);
tokio::task::spawn(async move {
let partition_id = p.candidate.partition_id;
let compaction_result = crate::compact_partition(&comp, p).await;
match compaction_result {
Err(e) => {
warn!(?e, ?partition_id, "compaction failed");
}
Ok(_) => {
debug!(?partition_id, "compaction complete");
}
};
})
})
// Assume we have enough resources to run
// num_parallel_partitions compactions in parallel
.buffer_unordered(num_parallel_partitions)
// report any JoinErrors (aka task panics)
.map(|join_result| {
if let Err(e) = join_result {
warn!(?e, "compaction task failed");
}
Ok(())
})
// Errors are reported during execution, so ignore results here
// https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
.forward(futures::sink::drain())
.await
.ok();
// Done compacting all candidates in the cycle, record its time
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.compaction_cycle_duration
.recorder(Attributes::from([]));
duration.record(delta);
}
compact_hot_partitions(Arc::clone(&compactor)).await;
compact_cold_partitions(Arc::clone(&compactor)).await;
}
}
async fn compact_hot_partitions(compactor: Arc<Compactor>) {
// Select hot partition candidates
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("hot_partitions_to_compact", || async {
compactor
.hot_partitions_to_compact(
compactor.config.max_number_partitions_per_sequencer(),
compactor
.config
.min_number_recent_ingested_files_per_partition(),
)
.await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.candidate_selection_duration
.recorder(Attributes::from([]));
duration.record(delta);
}
// Add other compaction-needed info into selected partitions
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("add_info_to_partitions", || async {
compactor.add_info_to_partitions(&candidates).await
})
.await
.expect("retry forever");
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.partitions_extra_info_reading_duration
.recorder(Attributes::from([]));
duration.record(delta);
}
let n_candidates = candidates.len();
if n_candidates == 0 {
debug!("no hot compaction candidates found");
// sleep for a second to avoid a hot busy loop when the
// catalog is polled
tokio::time::sleep(PAUSE_BETWEEN_NO_WORK).await;
return;
} else {
debug!(n_candidates, "found hot compaction candidates");
}
let start_time = compactor.time_provider.now();
// Repeat compacting n partitions in parallel until all candidates are compacted.
// Concurrency level calculation (this is estimated from previous experiments. The actual
// resource management will be more complicated and a future feature):
// . Each `compact partition` takes at most this much memory: input_size_threshold_bytes
// . We have this memory budget: max_concurrent_compaction_size_bytes
// --> num_parallel_partitions = max_concurrent_compaction_size_bytes /
// input_size_threshold_bytes
let num_parallel_partitions = (compactor.config.max_concurrent_compaction_size_bytes
/ compactor.config.input_size_threshold_bytes)
as usize;
futures::stream::iter(candidates)
.map(|p| {
// run compaction in its own task
let comp = Arc::clone(&compactor);
tokio::task::spawn(async move {
let partition_id = p.candidate.partition_id;
let compaction_result = crate::compact_partition(&comp, p).await;
match compaction_result {
Err(e) => {
warn!(?e, ?partition_id, "hot compaction failed");
}
Ok(_) => {
debug!(?partition_id, "hot compaction complete");
}
};
})
})
// Assume we have enough resources to run
// num_parallel_partitions compactions in parallel
.buffer_unordered(num_parallel_partitions)
// report any JoinErrors (aka task panics)
.map(|join_result| {
if let Err(e) = join_result {
warn!(?e, "compaction task failed");
}
Ok(())
})
// Errors are reported during execution, so ignore results here
// https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
.forward(futures::sink::drain())
.await
.ok();
// Done compacting all candidates in the cycle, record its time
if let Some(delta) = compactor
.time_provider
.now()
.checked_duration_since(start_time)
{
let duration = compactor
.compaction_cycle_duration
.recorder(Attributes::from([]));
duration.record(delta);
}
}
async fn compact_cold_partitions(_compactor: Arc<Compactor>) {}
#[async_trait]
impl CompactorHandler for CompactorHandlerImpl {
async fn join(&self) {

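The comments in compact_hot_partitions above describe the fan-out: divide the overall memory budget by the per-partition input threshold to get the concurrency level, spawn one task per candidate, bound how many run at once with buffer_unordered, and only log join errors. Below is a standalone sketch of that pattern, assuming the tokio and futures crates this handler already depends on; the budget numbers and the dummy "compaction" step are invented for illustration and are not the IOx implementation:

```rust
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Invented stand-ins for the compactor config values:
    // num_parallel_partitions = memory budget / per-partition input threshold.
    let max_concurrent_compaction_size_bytes: u64 = 1024 * 1024 * 1024; // 1 GiB budget
    let input_size_threshold_bytes: u64 = 100 * 1024 * 1024; // ~100 MiB per partition
    let num_parallel_partitions =
        (max_concurrent_compaction_size_bytes / input_size_threshold_bytes) as usize; // 10

    // Stand-in candidates; the real code streams partition candidates with extra info.
    let candidates: Vec<i64> = (1..=25).collect();

    futures::stream::iter(candidates)
        .map(|partition_id| {
            // Each "compaction" runs in its own spawned task.
            tokio::task::spawn(async move {
                // Dummy work; a real failure would be logged here, not returned.
                println!("compacted partition {partition_id}");
            })
        })
        // At most num_parallel_partitions tasks are polled to completion concurrently.
        .buffer_unordered(num_parallel_partitions)
        // A JoinError (i.e. a task panic) is only reported, never propagated.
        .map(|join_result| {
            if let Err(e) = join_result {
                eprintln!("compaction task failed: {e}");
            }
            Ok(())
        })
        // Drive the stream to completion, discarding the per-item results.
        .forward(futures::sink::drain())
        .await
        .ok();
}
```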
@@ -253,7 +253,7 @@ mod tests {
// ------------------------------------------------
// Compact
let candidates = compactor
.partitions_to_compact(
.hot_partitions_to_compact(
compactor.config.max_number_partitions_per_sequencer(),
compactor
.config