diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 666f3503d0..ffae2517d6 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -14,9 +14,7 @@ use crate::{ Components, }, error::{DynError, ErrorKind, SimpleError}, - file_classification::{ - FileClassification, FilesForProgress, FilesToSplitOrCompact, - }, + file_classification::{FileClassification, FilesForProgress, FilesToSplitOrCompact}, partition_info::PartitionInfo, PlanIR, }; @@ -324,52 +322,61 @@ async fn run_plans( }) .collect(); - match split_or_compact { - FilesToSplitOrCompact::Compact(_files) => { - let plan_ir = components.ir_planner.compact_plan( - files_inpad, - Arc::clone(partition_info), - target_level, - ); + let capacity = match split_or_compact { + FilesToSplitOrCompact::Compact(files) => files.len(), + FilesToSplitOrCompact::Split(files) => files.iter().map(|f| f.split_times.len() + 1).sum(), + FilesToSplitOrCompact::None => 0, + }; + let mut created_file_params = Vec::with_capacity(capacity); + let plans_and_target_levels = match split_or_compact { + FilesToSplitOrCompact::Compact(_files) => { + vec![( + components.ir_planner.compact_plan( + files_inpad, + Arc::clone(partition_info), + target_level, + ), + target_level, + )] + } + FilesToSplitOrCompact::Split(files) => { + files + .into_iter() + .zip(files_inpad.into_iter()) + .map(|(file_to_split, file_inpad)| { + // target level of a split file is the same as its level + let target_level = file_to_split.file.compaction_level; + + ( + components.ir_planner.split_plan( + file_inpad, + file_to_split.split_times.clone(), + Arc::clone(partition_info), + target_level, + ), + target_level, + ) + }) + .collect() + } + FilesToSplitOrCompact::None => vec![], // Nothing to do + }; + + for (plan_ir, target_level) in plans_and_target_levels { + created_file_params.extend( execute_plan( plan_ir, partition_info, components, target_level, - job_semaphore, + Arc::clone(&job_semaphore), ) - .await - } - FilesToSplitOrCompact::Split(files) => { - let mut created_file_params = - Vec::with_capacity(files.iter().map(|f| f.split_times.len() + 1).sum()); - for (file_to_split, file_inpad) in files.into_iter().zip(files_inpad.into_iter()) { - // target level of a split file is the same as its level - let target_level = file_to_split.file.compaction_level; - - let plan_ir = components.ir_planner.split_plan( - file_inpad, - file_to_split.split_times.clone(), - Arc::clone(partition_info), - target_level, - ); - - created_file_params.extend( - execute_plan( - plan_ir, - partition_info, - components, - target_level, - Arc::clone(&job_semaphore), - ) - .await?, - ); - } - Ok(created_file_params) - } - FilesToSplitOrCompact::None => Ok(vec![]), // Nothing to do + .await?, + ) } + + Ok(created_file_params) } async fn execute_plan(