refactor: Inline run_split_plan in anticipation of combining more code

pull/24376/head
Carol (Nichols || Goulding) 2023-03-22 14:50:28 -04:00
parent 91ac9627fa
commit 3235be572d
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 15 additions and 41 deletions

View File

@ -15,7 +15,7 @@ use crate::{
}, },
error::{DynError, ErrorKind, SimpleError}, error::{DynError, ErrorKind, SimpleError},
file_classification::{ file_classification::{
FileClassification, FileToSplit, FilesForProgress, FilesToSplitOrCompact, FileClassification, FilesForProgress, FilesToSplitOrCompact,
}, },
partition_info::PartitionInfo, partition_info::PartitionInfo,
PlanIR, PlanIR,
@ -344,40 +344,7 @@ async fn run_plans(
FilesToSplitOrCompact::Split(files) => { FilesToSplitOrCompact::Split(files) => {
let mut created_file_params = let mut created_file_params =
Vec::with_capacity(files.iter().map(|f| f.split_times.len() + 1).sum()); Vec::with_capacity(files.iter().map(|f| f.split_times.len() + 1).sum());
for file in files { for (file_to_split, file_inpad) in files.into_iter().zip(files_inpad.into_iter()) {
created_file_params.extend(
run_split_plan(
file,
partition_info,
components,
Arc::clone(&job_semaphore),
scratchpad_ctx,
)
.await?,
);
}
Ok(created_file_params)
}
FilesToSplitOrCompact::None => Ok(vec![]), // Nothing to do
}
}
// Split a given file into multiple files
async fn run_split_plan(
file_to_split: &FileToSplit,
partition_info: &Arc<PartitionInfo>,
components: &Arc<Components>,
job_semaphore: Arc<InstrumentedAsyncSemaphore>,
scratchpad_ctx: &mut dyn Scratchpad,
) -> Result<Vec<ParquetFileParams>, DynError> {
// stage files
let input_path = (&file_to_split.file).into();
let input_uuids_inpad = scratchpad_ctx.load_to_scratchpad(&[input_path]).await;
let file_inpad = ParquetFile {
object_store_id: input_uuids_inpad[0],
..file_to_split.file.clone()
};
// target level of a split file is the same as its level // target level of a split file is the same as its level
let target_level = file_to_split.file.compaction_level; let target_level = file_to_split.file.compaction_level;
@ -388,14 +355,21 @@ async fn run_split_plan(
target_level, target_level,
); );
created_file_params.extend(
execute_plan( execute_plan(
plan_ir, plan_ir,
partition_info, partition_info,
components, components,
target_level, target_level,
job_semaphore, Arc::clone(&job_semaphore),
) )
.await .await?,
);
}
Ok(created_file_params)
}
FilesToSplitOrCompact::None => Ok(vec![]), // Nothing to do
}
} }
async fn execute_plan( async fn execute_plan(