diff --git a/compactor/src/components/round_info_source/mod.rs b/compactor/src/components/round_info_source/mod.rs index a1e5b96548..9260e1e4fc 100644 --- a/compactor/src/components/round_info_source/mod.rs +++ b/compactor/src/components/round_info_source/mod.rs @@ -4,7 +4,9 @@ use std::{ sync::Arc, }; -use crate::components::split_or_compact::start_level_files_to_split::split_into_chains; +use crate::components::{ + split_or_compact::start_level_files_to_split::split_into_chains, Components, +}; use async_trait::async_trait; use data_types::{CompactionLevel, ParquetFile, Timestamp}; use itertools::Itertools; @@ -17,9 +19,10 @@ use crate::{error::DynError, PartitionInfo, RoundInfo}; pub trait RoundInfoSource: Debug + Display + Send + Sync { async fn calculate( &self, + components: Arc, partition_info: &PartitionInfo, - files: &[ParquetFile], - ) -> Result; + files: Vec, + ) -> Result<(RoundInfo, Vec>, Vec), DynError>; } #[derive(Debug)] @@ -43,12 +46,16 @@ impl Display for LoggingRoundInfoWrapper { impl RoundInfoSource for LoggingRoundInfoWrapper { async fn calculate( &self, + components: Arc, partition_info: &PartitionInfo, - files: &[ParquetFile], - ) -> Result { - let res = self.inner.calculate(partition_info, files).await; - if let Ok(round_info) = &res { - debug!(round_info_source=%self.inner, %round_info, "running round"); + files: Vec, + ) -> Result<(RoundInfo, Vec>, Vec), DynError> { + let res = self + .inner + .calculate(components, partition_info, files) + .await; + if let Ok((round_info, branches, files_later)) = &res { + debug!(round_info_source=%self.inner, %round_info, branches=branches.len(), files_later=files_later.len(), "running round"); } res } @@ -169,25 +176,32 @@ impl LevelBasedRoundInfo { impl RoundInfoSource for LevelBasedRoundInfo { async fn calculate( &self, + components: Arc, _partition_info: &PartitionInfo, - files: &[ParquetFile], - ) -> Result { + files: Vec, + ) -> Result<(RoundInfo, Vec>, Vec), DynError> { let start_level = get_start_level( - files, + &files, self.max_num_files_per_plan, self.max_total_file_size_per_plan, ); - if self.too_many_small_files_to_compact(files, start_level) { - return Ok(RoundInfo::ManySmallFiles { + let round_info = if self.too_many_small_files_to_compact(&files, start_level) { + RoundInfo::ManySmallFiles { start_level, max_num_files_to_group: self.max_num_files_per_plan, max_total_file_size_to_group: self.max_total_file_size_per_plan, - }); - } + } + } else { + let target_level = pick_level(&files); + RoundInfo::TargetLevel { target_level } + }; - let target_level = pick_level(files); - Ok(RoundInfo::TargetLevel { target_level }) + let (files_now, files_later) = components.round_split.split(files, round_info); + + let branches = components.divide_initial.divide(files_now, round_info); + + Ok((round_info, branches, files_later)) } } diff --git a/compactor/src/driver.rs b/compactor/src/driver.rs index 0aa975c44e..2862e88e45 100644 --- a/compactor/src/driver.rs +++ b/compactor/src/driver.rs @@ -230,11 +230,6 @@ async fn try_compact_partition( return Ok(()); } - let round_info = components - .round_info_source - .calculate(&partition_info, &files) - .await?; - // This is the stop condition which will be different for different version of compaction // and describe where the filter is created at version_specific_partition_filters function if !components @@ -245,13 +240,14 @@ async fn try_compact_partition( return Ok(()); } - let (files_now, files_later) = components.round_split.split(files, round_info); - - // Each branch must not overlap with each other - let branches = components - .divide_initial - .divide(files_now, round_info) - .into_iter(); + let (round_info, branches, files_later) = components + .round_info_source + .calculate( + Arc::::clone(&components), + &partition_info, + files, + ) + .await?; files = files_later;