refactor: Extract a function for repeated code to get and save parquet file state
parent
03e5b1ac6a
commit
a0890bf8d3
|
@ -222,8 +222,8 @@ async fn try_compact_partition(
|
|||
// Keep the current state as a check to make sure this is the only compactor modifying this partition's
|
||||
// files. Check that the catalog state matches this before committing and, if it doesn't match, throw away
|
||||
// the compaction work we've done.
|
||||
let catalog_files = components.partition_files_source.fetch(partition_id).await;
|
||||
let saved_parquet_file_state = SavedParquetFileState::from(catalog_files.iter());
|
||||
let saved_parquet_file_state =
|
||||
fetch_and_save_parquet_file_state(&components, partition_id).await;
|
||||
|
||||
let input_paths: Vec<ParquetFilePath> =
|
||||
branch.iter().map(ParquetFilePath::from).collect();
|
||||
|
@ -416,6 +416,14 @@ async fn upload_files_to_object_store(
|
|||
.collect()
|
||||
}
|
||||
|
||||
async fn fetch_and_save_parquet_file_state(
|
||||
components: &Components,
|
||||
partition_id: PartitionId,
|
||||
) -> SavedParquetFileState {
|
||||
let catalog_files = components.partition_files_source.fetch(partition_id).await;
|
||||
SavedParquetFileState::from(&catalog_files)
|
||||
}
|
||||
|
||||
/// Update the catalog to create, soft delete and upgrade corresponding given input
|
||||
/// to provided target level
|
||||
/// Return created and upgraded files
|
||||
|
@ -428,8 +436,8 @@ async fn update_catalog(
|
|||
file_params_to_create: Vec<ParquetFileParams>,
|
||||
target_level: CompactionLevel,
|
||||
) -> Result<(Vec<ParquetFile>, Vec<ParquetFile>), DynError> {
|
||||
let catalog_files = components.partition_files_source.fetch(partition_id).await;
|
||||
let current_parquet_file_state = SavedParquetFileState::from(&catalog_files);
|
||||
let current_parquet_file_state =
|
||||
fetch_and_save_parquet_file_state(&components, partition_id).await;
|
||||
|
||||
if saved_parquet_file_state != current_parquet_file_state {
|
||||
// Someone else has changed the files in the catalog since we started compacting; throw away our work and
|
||||
|
|
Loading…
Reference in New Issue