From a0890bf8d33a989378d98477b1bcb5317221ff74 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Thu, 30 Mar 2023 10:58:27 -0400 Subject: [PATCH] refactor: Extract a function for repeated code to get and save parquet file state --- compactor2/src/driver.rs | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index cf264cbb86..8ea00f9242 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -222,8 +222,8 @@ async fn try_compact_partition( // Keep the current state as a check to make sure this is the only compactor modifying this partition's // files. Check that the catalog state matches this before committing and, if it doesn't match, throw away // the compaction work we've done. - let catalog_files = components.partition_files_source.fetch(partition_id).await; - let saved_parquet_file_state = SavedParquetFileState::from(catalog_files.iter()); + let saved_parquet_file_state = + fetch_and_save_parquet_file_state(&components, partition_id).await; let input_paths: Vec = branch.iter().map(ParquetFilePath::from).collect(); @@ -416,6 +416,14 @@ async fn upload_files_to_object_store( .collect() } +async fn fetch_and_save_parquet_file_state( + components: &Components, + partition_id: PartitionId, +) -> SavedParquetFileState { + let catalog_files = components.partition_files_source.fetch(partition_id).await; + SavedParquetFileState::from(&catalog_files) +} + /// Update the catalog to create, soft delete and upgrade corresponding given input /// to provided target level /// Return created and upgraded files @@ -428,8 +436,8 @@ async fn update_catalog( file_params_to_create: Vec, target_level: CompactionLevel, ) -> Result<(Vec, Vec), DynError> { - let catalog_files = components.partition_files_source.fetch(partition_id).await; - let current_parquet_file_state = SavedParquetFileState::from(&catalog_files); + let current_parquet_file_state = + fetch_and_save_parquet_file_state(&components, partition_id).await; if saved_parquet_file_state != current_parquet_file_state { // Someone else has changed the files in the catalog since we started compacting; throw away our work and