refactor: Extract function for creating partition files source

pull/24376/head
Carol (Nichols || Goulding) 2023-03-29 17:22:31 -04:00
parent 338ca030ab
commit 3ce062fd2e
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 8 additions and 5 deletions

View File

@ -51,7 +51,7 @@ use super::{
logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper,
mock::MockPartitionDoneSink, PartitionDoneSink,
},
partition_files_source::catalog::CatalogPartitionFilesSource,
partition_files_source::{catalog::CatalogPartitionFilesSource, PartitionFilesSource},
partition_filter::{
and::AndPartitionFilter, greater_matching_files::GreaterMatchingFilesPartitionFilter,
greater_size_matching_files::GreaterSizeMatchingFilesPartitionFilter,
@ -140,10 +140,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
Arc::new(Components {
partition_stream: make_partition_stream(config, partitions_source),
partition_info_source: make_partition_info_source(config),
partition_files_source: Arc::new(CatalogPartitionFilesSource::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
)),
partition_files_source: make_partition_files_source(config),
round_info_source: Arc::new(LoggingRoundInfoWrapper::new(Arc::new(
LevelBasedRoundInfo::new(
config.max_num_files_per_plan,
@ -336,6 +333,12 @@ fn make_partition_info_source(config: &Config) -> Arc<dyn PartitionInfoSource> {
))
}
fn make_partition_files_source(config: &Config) -> Arc<dyn PartitionFilesSource> {
Arc::new(CatalogPartitionFilesSource::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
))
}
// Conditions to compact this partittion
fn make_partition_filter(config: &Config) -> Arc<dyn PartitionFilter> {
let mut partition_filters: Vec<Arc<dyn PartitionFilter>> = Vec::with_capacity(8);