From b5f233f03752a4f68d09650e5a14430ba4ec69b6 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Wed, 29 Mar 2023 17:16:06 -0400
Subject: [PATCH] refactor: Move all partition filter creation into the function for that purpose

---
 compactor2/src/components/hardcoded.rs | 59 ++++++++++++++------------
 1 file changed, 31 insertions(+), 28 deletions(-)

diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs
index dff53b6d99..913a5985fa 100644
--- a/compactor2/src/components/hardcoded.rs
+++ b/compactor2/src/components/hardcoded.rs
@@ -93,28 +93,12 @@ use super::{
 
 /// Get hardcoded components.
 pub fn hardcoded_components(config: &Config) -> Arc<Components> {
-    let mut partition_filters: Vec<Arc<dyn PartitionFilter>> = vec![];
-    partition_filters.push(Arc::new(HasFilesPartitionFilter::new()));
-    if !config.ignore_partition_skip_marker {
-        partition_filters.push(Arc::new(NeverSkippedPartitionFilter::new(
-            CatalogSkippedCompactionsSource::new(
-                config.backoff_config.clone(),
-                Arc::clone(&config.catalog),
-            ),
-        )));
-    }
-    partition_filters.push(Arc::new(MaxNumColumnsPartitionFilter::new(
-        config.max_num_columns_per_table,
-    )));
-    partition_filters.append(&mut make_partition_filters(config));
-
     let scratchpad_store_output = if config.shadow_mode {
         Arc::new(IgnoreWrites::new(Arc::new(InMemory::new())))
     } else {
         Arc::clone(config.parquet_store_real.object_store())
     };
 
-    let partition_continue_conditions = "continue_conditions";
     let partition_resource_limit_conditions = "resource_limit_conditions";
 
     let scratchpad_gen: Arc<dyn ScratchpadGen> = if config.simulate_without_object_store {
@@ -179,14 +163,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
                 config.max_compact_size_bytes(),
             ),
         ))),
-        partition_filter: Arc::new(LoggingPartitionFilterWrapper::new(
-            MetricsPartitionFilterWrapper::new(
-                AndPartitionFilter::new(partition_filters),
-                &config.metric_registry,
-                partition_continue_conditions,
-            ),
-            partition_continue_conditions,
-        )),
+        partition_filter: make_partition_filter(config),
         partition_done_sink: Arc::new(LoggingPartitionDoneSinkWrapper::new(
             MetricsPartitionDoneSinkWrapper::new(partition_done_sink, &config.metric_registry),
         )),
@@ -362,11 +339,27 @@ fn make_partition_stream(
 }
 
 // Conditions to compact this partittion
-fn make_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionFilter>> {
-    // (Has-L0) OR -- to avoid overlaped files
+fn make_partition_filter(config: &Config) -> Arc<dyn PartitionFilter> {
+    let mut partition_filters: Vec<Arc<dyn PartitionFilter>> = Vec::with_capacity(8);
+    partition_filters.push(Arc::new(HasFilesPartitionFilter::new()));
+
+    if !config.ignore_partition_skip_marker {
+        partition_filters.push(Arc::new(NeverSkippedPartitionFilter::new(
+            CatalogSkippedCompactionsSource::new(
+                config.backoff_config.clone(),
+                Arc::clone(&config.catalog),
+            ),
+        )));
+    }
+
+    partition_filters.push(Arc::new(MaxNumColumnsPartitionFilter::new(
+        config.max_num_columns_per_table,
+    )));
+
+    // (Has-L0) OR -- to avoid overlapped files
     // (num(L1) > N) OR -- to avoid many files
     // (total_size(L1) > max_desired_file_size) -- to avoid compact and than split
-    vec![Arc::new(OrPartitionFilter::new(vec![
+    partition_filters.push(Arc::new(OrPartitionFilter::new(vec![
         Arc::new(HasMatchingFilePartitionFilter::new(
             LevelRangeFileFilter::new(CompactionLevel::Initial..=CompactionLevel::Initial),
         )),
@@ -382,5 +375,15 @@ fn make_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionFilter>> {
             ),
             config.max_desired_file_size_bytes,
         )),
-    ]))]
+    ])));
+
+    let partition_continue_conditions = "continue_conditions";
+    Arc::new(LoggingPartitionFilterWrapper::new(
+        MetricsPartitionFilterWrapper::new(
+            AndPartitionFilter::new(partition_filters),
+            &config.metric_registry,
+            partition_continue_conditions,
+        ),
+        partition_continue_conditions,
+    ))
 }