refactor: Extract a shared function to retry fetching of compaction candidates

pull/24376/head
Carol (Nichols || Goulding) 2022-09-15 14:50:25 -04:00
parent 8e6d9f8af1
commit fa11031a36
3 changed files with 73 additions and 48 deletions

View File

@@ -3,9 +3,8 @@
 use crate::{
     compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel,
-    parquet_file_combining, parquet_file_lookup,
+    parquet_file_combining, parquet_file_lookup, utils::get_candidates_with_retry,
 };
-use backoff::Backoff;
 use data_types::CompactionLevel;
 use metric::Attributes;
 use observability_deps::tracing::*;
@@ -16,28 +15,19 @@ use std::sync::Arc;
 #[allow(dead_code)]
 pub async fn compact(compactor: Arc<Compactor>, do_full_compact: bool) -> usize {
     let compaction_type = "cold";
-    // Select cold partition candidates
-    debug!(compaction_type, "start collecting partitions to compact");
-    let attributes = Attributes::from(&[("partition_type", compaction_type)]);
-    let start_time = compactor.time_provider.now();
-    let candidates = Backoff::new(&compactor.backoff_config)
-        .retry_all_errors("cold_partitions_to_compact", || async {
-            compactor
-                .cold_partitions_to_compact(compactor.config.max_number_partitions_per_shard)
+
+    let candidates = get_candidates_with_retry(
+        Arc::clone(&compactor),
+        compaction_type,
+        |compactor_for_retry| async move {
+            compactor_for_retry
+                .cold_partitions_to_compact(
+                    compactor_for_retry.config.max_number_partitions_per_shard,
+                )
                 .await
-        })
-        .await
-        .expect("retry forever");
-    if let Some(delta) = compactor
-        .time_provider
-        .now()
-        .checked_duration_since(start_time)
-    {
-        let duration = compactor
-            .candidate_selection_duration
-            .recorder(attributes.clone());
-        duration.record(delta);
-    }
+        },
+    )
+    .await;

     let n_candidates = candidates.len();
     if n_candidates == 0 {
@@ -79,6 +69,7 @@ pub async fn compact(compactor: Arc<Compactor>, do_full_compact: bool) -> usize
         .now()
         .checked_duration_since(start_time)
     {
+        let attributes = Attributes::from(&[("partition_type", compaction_type)]);
         let duration = compactor.compaction_cycle_duration.recorder(attributes);
         duration.record(delta);
     }

View File

@@ -1,7 +1,9 @@
 //! Collect highest hot candidates and compact them

-use crate::{compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel};
-use backoff::Backoff;
+use crate::{
+    compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel,
+    utils::get_candidates_with_retry,
+};
 use data_types::CompactionLevel;
 use metric::Attributes;
 use observability_deps::tracing::*;
@@ -10,33 +12,22 @@ use std::sync::Arc;
 /// Hot compaction. Returns the number of compacted partitions.
 pub async fn compact(compactor: Arc<Compactor>) -> usize {
     let compaction_type = "hot";
-    // Select hot partition candidates
-    debug!(compaction_type, "start collecting partitions to compact");
-    let attributes = Attributes::from(&[("partition_type", compaction_type)]);
-    let start_time = compactor.time_provider.now();
-    let candidates = Backoff::new(&compactor.backoff_config)
-        .retry_all_errors("hot_partitions_to_compact", || async {
-            compactor
+
+    let candidates = get_candidates_with_retry(
+        Arc::clone(&compactor),
+        compaction_type,
+        |compactor_for_retry| async move {
+            compactor_for_retry
                 .hot_partitions_to_compact(
-                    compactor.config.max_number_partitions_per_shard,
-                    compactor
+                    compactor_for_retry.config.max_number_partitions_per_shard,
+                    compactor_for_retry
                         .config
                         .min_number_recent_ingested_files_per_partition,
                 )
                 .await
-        })
-        .await
-        .expect("retry forever");
-    if let Some(delta) = compactor
-        .time_provider
-        .now()
-        .checked_duration_since(start_time)
-    {
-        let duration = compactor
-            .candidate_selection_duration
-            .recorder(attributes.clone());
-        duration.record(delta);
-    }
+        },
+    )
+    .await;

     let n_candidates = candidates.len();
     if n_candidates == 0 {
@@ -64,6 +55,7 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize {
         .now()
         .checked_duration_since(start_time)
     {
+        let attributes = Attributes::from(&[("partition_type", compaction_type)]);
         let duration = compactor.compaction_cycle_duration.recorder(attributes);
         duration.record(delta);
     }

View File

@@ -1,10 +1,12 @@
 //! Helpers of the Compactor

-use crate::query::QueryableParquetChunk;
+use crate::{compact, query::QueryableParquetChunk, PartitionCompactionCandidateWithInfo};
+use backoff::Backoff;
 use data_types::{
     CompactionLevel, ParquetFile, ParquetFileId, TableSchema, Timestamp, TimestampMinMax,
     Tombstone, TombstoneId,
 };
+use metric::Attributes;
 use observability_deps::tracing::*;
 use parquet_file::{chunk::ParquetChunk, storage::ParquetStorage};
 use schema::{sort::SortKey, Schema};
@@ -13,6 +15,46 @@ use std::{
     sync::Arc,
 };

+/// Given a compactor, a string describing the compaction type for logging and metrics, and a
+/// fallible function that returns compaction candidates, retry with backoff and return the
+/// candidates. Record metrics on the compactor about how long it took to get the candidates.
+pub(crate) async fn get_candidates_with_retry<Q, Fut>(
+    compactor: Arc<compact::Compactor>,
+    compaction_type: &'static str,
+    query_function: Q,
+) -> Vec<Arc<PartitionCompactionCandidateWithInfo>>
+where
+    Q: Fn(Arc<compact::Compactor>) -> Fut + Send + Sync + 'static,
+    Fut: futures::Future<
+            Output = Result<Vec<Arc<PartitionCompactionCandidateWithInfo>>, compact::Error>,
+        > + Send,
+{
+    let backoff_task_name = format!("{compaction_type}_partitions_to_compact");
+
+    debug!(compaction_type, "start collecting partitions to compact");
+    let attributes = Attributes::from(&[("partition_type", compaction_type)]);
+    let start_time = compactor.time_provider.now();
+
+    let candidates = Backoff::new(&compactor.backoff_config)
+        .retry_all_errors(&backoff_task_name, || {
+            let compactor = Arc::clone(&compactor);
+            async { query_function(compactor).await }
+        })
+        .await
+        .expect("retry forever");
+
+    if let Some(delta) = compactor
+        .time_provider
+        .now()
+        .checked_duration_since(start_time)
+    {
+        let duration = compactor.candidate_selection_duration.recorder(attributes);
+        duration.record(delta);
+    }
+
+    candidates
+}
+
 /// Wrapper of group of parquet files with their min time and total size
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct GroupWithMinTimeAndSize {
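
A note on the closure shape the new helper requires: `retry_all_errors` needs a closure it can call once per attempt, each call producing a fresh future, so the helper clones the `Arc` inside the closure and hands the clone to `query_function` instead of letting call sites capture `compactor` by reference. The stand-alone sketch below illustrates that shape; it is a toy, not IOx code (`fetch_with_retry` is an invented name), and it retries immediately instead of using the backoff crate's exponential backoff.

use std::sync::Arc;

// Toy stand-in for `get_candidates_with_retry`: retry a fallible query until
// it succeeds, handing each attempt its own `Arc` clone so one closure can
// produce a new future per attempt.
async fn fetch_with_retry<T, Q, Fut, R, E>(shared: Arc<T>, query: Q) -> R
where
    Q: Fn(Arc<T>) -> Fut,
    Fut: std::future::Future<Output = Result<R, E>>,
{
    loop {
        // Mirrors `let compactor = Arc::clone(&compactor);` inside
        // `retry_all_errors` above; the real helper sleeps with exponential
        // backoff between failed attempts instead of retrying immediately.
        match query(Arc::clone(&shared)).await {
            Ok(value) => return value,
            Err(_) => continue,
        }
    }
}

fn main() {
    let numbers = Arc::new(vec![1, 2, 3]);
    // `futures` is already a dependency of the code in this commit.
    let sum = futures::executor::block_on(fetch_with_retry(
        Arc::clone(&numbers),
        |numbers| async move { Ok::<_, ()>(numbers.iter().sum::<i32>()) },
    ));
    assert_eq!(sum, 6);
}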