refactor: Extract function for creating the round info source

pull/24376/head
Carol (Nichols || Goulding) 2023-03-29 17:24:02 -04:00
parent 3ce062fd2e
commit 682ed14b9e
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 12 additions and 7 deletions

View File

@ -80,7 +80,7 @@ use super::{
logging::LoggingPostClassificationFilterWrapper,
metrics::MetricsPostClassificationFilterWrapper, possible_progress::PossibleProgressFilter,
},
round_info_source::{LevelBasedRoundInfo, LoggingRoundInfoWrapper},
round_info_source::{LevelBasedRoundInfo, LoggingRoundInfoWrapper, RoundInfoSource},
round_split::many_files::ManyFilesRoundSplit,
scratchpad::{noop::NoopScratchpadGen, prod::ProdScratchpadGen, ScratchpadGen},
skipped_compactions_source::catalog::CatalogSkippedCompactionsSource,
@ -141,12 +141,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
partition_stream: make_partition_stream(config, partitions_source),
partition_info_source: make_partition_info_source(config),
partition_files_source: make_partition_files_source(config),
round_info_source: Arc::new(LoggingRoundInfoWrapper::new(Arc::new(
LevelBasedRoundInfo::new(
config.max_num_files_per_plan,
config.max_compact_size_bytes(),
),
))),
round_info_source: make_round_info_source(config),
partition_filter: make_partition_filter(config),
partition_done_sink: Arc::new(LoggingPartitionDoneSinkWrapper::new(
MetricsPartitionDoneSinkWrapper::new(partition_done_sink, &config.metric_registry),
@ -339,6 +334,16 @@ fn make_partition_files_source(config: &Config) -> Arc<dyn PartitionFilesSource>
Arc::clone(&config.catalog),
))
}
fn make_round_info_source(config: &Config) -> Arc<dyn RoundInfoSource> {
Arc::new(LoggingRoundInfoWrapper::new(Arc::new(
LevelBasedRoundInfo::new(
config.max_num_files_per_plan,
config.max_compact_size_bytes(),
),
)))
}
// Conditions to compact this partittion
fn make_partition_filter(config: &Config) -> Arc<dyn PartitionFilter> {
let mut partition_filters: Vec<Arc<dyn PartitionFilter>> = Vec::with_capacity(8);