fix: Make some hot compaction code more general/parameterized

pull/24376/head
Carol (Nichols || Goulding) 2022-09-08 11:21:14 -04:00
parent 2a5ef3058c
commit 608290b83d
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 15 additions and 9 deletions

View File

@ -15,9 +15,13 @@ use std::sync::Arc;
/// Hot compaction. Returns the number of compacted partitions.
pub async fn compact(compactor: Arc<Compactor>) -> usize {
let compaction_type = "hot";
// Select hot partition candidates
debug!("start collecting hot partitions to compact");
let hot_attributes = Attributes::from(&[("partition_type", "hot")]);
debug!(
compaction_type,
"start collecting partitions to compact"
);
let attributes = Attributes::from(&[("partition_type", compaction_type)]);
let start_time = compactor.time_provider.now();
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("hot_partitions_to_compact", || async {
@ -39,7 +43,7 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
{
let duration = compactor
.candidate_selection_duration
.recorder(hot_attributes.clone());
.recorder(attributes.clone());
duration.record(delta);
}
@ -49,6 +53,7 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
// Column types and their counts of the tables of the partition candidates
debug!(
num_candidates=?candidates.len(),
compaction_type,
"start getting column types for the partition candidates"
);
let table_columns = Backoff::new(&compactor.backoff_config)
@ -61,7 +66,8 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
// Add other compaction-needed info into selected partitions
debug!(
num_candidates=?candidates.len(),
"start getting column types for the partition candidates"
compaction_type,
"start getting additional info for the partition candidates"
);
let candidates = Backoff::new(&compactor.backoff_config)
.retry_all_errors("add_info_to_partitions", || async {
@ -77,23 +83,23 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
{
let duration = compactor
.partitions_extra_info_reading_duration
.recorder(hot_attributes.clone());
.recorder(attributes.clone());
duration.record(delta);
}
let n_candidates = candidates.len();
if n_candidates == 0 {
debug!("no hot compaction candidates found");
debug!(compaction_type, "no compaction candidates found");
return 0;
} else {
debug!(n_candidates, "found hot compaction candidates");
debug!(n_candidates, compaction_type, "found compaction candidates");
}
let start_time = compactor.time_provider.now();
compact_candidates_with_memory_budget(
Arc::clone(&compactor),
"hot",
compaction_type,
|compactor: Arc<Compactor>,
partition: PartitionCompactionCandidateWithInfo,
parquet_files_for_compaction: ParquetFilesForCompaction,
@ -120,7 +126,7 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
.now()
.checked_duration_since(start_time)
{
let duration = compactor.compaction_cycle_duration.recorder(hot_attributes);
let duration = compactor.compaction_cycle_duration.recorder(attributes);
duration.record(delta);
}