refactor: Share execute_plan calls between compact and split paths

pull/24376/head
Carol (Nichols || Goulding) 2023-03-22 15:17:40 -04:00
parent 3235be572d
commit e2dcf0b5e9
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 48 additions and 41 deletions

View File

@ -14,9 +14,7 @@ use crate::{
Components,
},
error::{DynError, ErrorKind, SimpleError},
file_classification::{
FileClassification, FilesForProgress, FilesToSplitOrCompact,
},
file_classification::{FileClassification, FilesForProgress, FilesToSplitOrCompact},
partition_info::PartitionInfo,
PlanIR,
};
@ -324,52 +322,61 @@ async fn run_plans(
})
.collect();
match split_or_compact {
FilesToSplitOrCompact::Compact(_files) => {
let plan_ir = components.ir_planner.compact_plan(
files_inpad,
Arc::clone(partition_info),
target_level,
);
let capacity = match split_or_compact {
FilesToSplitOrCompact::Compact(files) => files.len(),
FilesToSplitOrCompact::Split(files) => files.iter().map(|f| f.split_times.len() + 1).sum(),
FilesToSplitOrCompact::None => 0,
};
let mut created_file_params = Vec::with_capacity(capacity);
let plans_and_target_levels = match split_or_compact {
FilesToSplitOrCompact::Compact(_files) => {
vec![(
components.ir_planner.compact_plan(
files_inpad,
Arc::clone(partition_info),
target_level,
),
target_level,
)]
}
FilesToSplitOrCompact::Split(files) => {
files
.into_iter()
.zip(files_inpad.into_iter())
.map(|(file_to_split, file_inpad)| {
// target level of a split file is the same as its level
let target_level = file_to_split.file.compaction_level;
(
components.ir_planner.split_plan(
file_inpad,
file_to_split.split_times.clone(),
Arc::clone(partition_info),
target_level,
),
target_level,
)
})
.collect()
}
FilesToSplitOrCompact::None => vec![], // Nothing to do
};
for (plan_ir, target_level) in plans_and_target_levels {
created_file_params.extend(
execute_plan(
plan_ir,
partition_info,
components,
target_level,
job_semaphore,
Arc::clone(&job_semaphore),
)
.await
}
FilesToSplitOrCompact::Split(files) => {
let mut created_file_params =
Vec::with_capacity(files.iter().map(|f| f.split_times.len() + 1).sum());
for (file_to_split, file_inpad) in files.into_iter().zip(files_inpad.into_iter()) {
// target level of a split file is the same as its level
let target_level = file_to_split.file.compaction_level;
let plan_ir = components.ir_planner.split_plan(
file_inpad,
file_to_split.split_times.clone(),
Arc::clone(partition_info),
target_level,
);
created_file_params.extend(
execute_plan(
plan_ir,
partition_info,
components,
target_level,
Arc::clone(&job_semaphore),
)
.await?,
);
}
Ok(created_file_params)
}
FilesToSplitOrCompact::None => Ok(vec![]), // Nothing to do
.await?,
)
}
Ok(created_file_params)
}
async fn execute_plan(