refactor: Move all partition filter creation into the function for that purpose
parent
b9727d2e17
commit
b5f233f037
|
@ -93,28 +93,12 @@ use super::{
|
|||
|
||||
/// Get hardcoded components.
|
||||
pub fn hardcoded_components(config: &Config) -> Arc<Components> {
|
||||
let mut partition_filters: Vec<Arc<dyn PartitionFilter>> = vec![];
|
||||
partition_filters.push(Arc::new(HasFilesPartitionFilter::new()));
|
||||
if !config.ignore_partition_skip_marker {
|
||||
partition_filters.push(Arc::new(NeverSkippedPartitionFilter::new(
|
||||
CatalogSkippedCompactionsSource::new(
|
||||
config.backoff_config.clone(),
|
||||
Arc::clone(&config.catalog),
|
||||
),
|
||||
)));
|
||||
}
|
||||
partition_filters.push(Arc::new(MaxNumColumnsPartitionFilter::new(
|
||||
config.max_num_columns_per_table,
|
||||
)));
|
||||
partition_filters.append(&mut make_partition_filters(config));
|
||||
|
||||
let scratchpad_store_output = if config.shadow_mode {
|
||||
Arc::new(IgnoreWrites::new(Arc::new(InMemory::new())))
|
||||
} else {
|
||||
Arc::clone(config.parquet_store_real.object_store())
|
||||
};
|
||||
|
||||
let partition_continue_conditions = "continue_conditions";
|
||||
let partition_resource_limit_conditions = "resource_limit_conditions";
|
||||
|
||||
let scratchpad_gen: Arc<dyn ScratchpadGen> = if config.simulate_without_object_store {
|
||||
|
@ -179,14 +163,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
|
|||
config.max_compact_size_bytes(),
|
||||
),
|
||||
))),
|
||||
partition_filter: Arc::new(LoggingPartitionFilterWrapper::new(
|
||||
MetricsPartitionFilterWrapper::new(
|
||||
AndPartitionFilter::new(partition_filters),
|
||||
&config.metric_registry,
|
||||
partition_continue_conditions,
|
||||
),
|
||||
partition_continue_conditions,
|
||||
)),
|
||||
partition_filter: make_partition_filter(config),
|
||||
partition_done_sink: Arc::new(LoggingPartitionDoneSinkWrapper::new(
|
||||
MetricsPartitionDoneSinkWrapper::new(partition_done_sink, &config.metric_registry),
|
||||
)),
|
||||
|
@ -362,11 +339,27 @@ fn make_partition_stream(
|
|||
}
|
||||
|
||||
// Conditions to compact this partittion
|
||||
fn make_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionFilter>> {
|
||||
// (Has-L0) OR -- to avoid overlaped files
|
||||
fn make_partition_filter(config: &Config) -> Arc<dyn PartitionFilter> {
|
||||
let mut partition_filters: Vec<Arc<dyn PartitionFilter>> = Vec::with_capacity(8);
|
||||
partition_filters.push(Arc::new(HasFilesPartitionFilter::new()));
|
||||
|
||||
if !config.ignore_partition_skip_marker {
|
||||
partition_filters.push(Arc::new(NeverSkippedPartitionFilter::new(
|
||||
CatalogSkippedCompactionsSource::new(
|
||||
config.backoff_config.clone(),
|
||||
Arc::clone(&config.catalog),
|
||||
),
|
||||
)));
|
||||
}
|
||||
|
||||
partition_filters.push(Arc::new(MaxNumColumnsPartitionFilter::new(
|
||||
config.max_num_columns_per_table,
|
||||
)));
|
||||
|
||||
// (Has-L0) OR -- to avoid overlapped files
|
||||
// (num(L1) > N) OR -- to avoid many files
|
||||
// (total_size(L1) > max_desired_file_size) -- to avoid compact and than split
|
||||
vec![Arc::new(OrPartitionFilter::new(vec![
|
||||
partition_filters.push(Arc::new(OrPartitionFilter::new(vec![
|
||||
Arc::new(HasMatchingFilePartitionFilter::new(
|
||||
LevelRangeFileFilter::new(CompactionLevel::Initial..=CompactionLevel::Initial),
|
||||
)),
|
||||
|
@ -382,5 +375,15 @@ fn make_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionFilter>> {
|
|||
),
|
||||
config.max_desired_file_size_bytes,
|
||||
)),
|
||||
]))]
|
||||
])));
|
||||
|
||||
let partition_continue_conditions = "continue_conditions";
|
||||
Arc::new(LoggingPartitionFilterWrapper::new(
|
||||
MetricsPartitionFilterWrapper::new(
|
||||
AndPartitionFilter::new(partition_filters),
|
||||
&config.metric_registry,
|
||||
partition_continue_conditions,
|
||||
),
|
||||
partition_continue_conditions,
|
||||
))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue