From d9ca28e83aad6252bc7689b12c8671ba9e9d62d4 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 22 Jul 2022 09:17:54 -0400 Subject: [PATCH] fix: Compact one level 0 file to avoid getting stuck on it If compaction is called on one level 0 file, the work of compaction doesn't really need to be done, but for simplicity's sake for now, run it through the compaction query/write process rather than special casing it. This will keep us from getting stuck in the unlikely event that a partition with one level 0 file gets selected as a top candidate for compaction. --- compactor/src/parquet_file_combining.rs | 37 +++++++++++++++++++++---- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index 7172c5a822..bbbe8a3370 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -95,7 +95,7 @@ pub(crate) async fn compact_parquet_files( let num_files = files.len(); ensure!( - num_files >= 2, + num_files > 0, NotEnoughParquetFilesSnafu { num_files, partition_id @@ -459,7 +459,7 @@ mod tests { } #[tokio::test] - async fn one_input_file_is_an_error() { + async fn one_input_file_gets_compacted() { test_helpers::maybe_start_logging(); let TestSetup { @@ -469,16 +469,41 @@ mod tests { .. } = test_setup().await; - let result = compact_parquet_files( - vec![parquet_files.pop().unwrap()], + let parquet_file = parquet_files.remove(0); + compact_parquet_files( + vec![parquet_file], &candidate_partition, Arc::clone(&catalog.catalog), ParquetStorage::new(Arc::clone(&catalog.object_store)), Arc::clone(&catalog.exec), Arc::clone(&catalog.time_provider) as Arc, ) - .await; - assert_error!(result, Error::NotEnoughParquetFiles { num_files: 1, .. }); + .await + .unwrap(); + + // Should have 5 non-soft-deleted files: + // + // - 2 initial level 0 files not compacted + // - 2 initial level 1 files not compacted + // - the 1 initial level 0 file that was "compacted" into 1 level 1 file + let files = catalog + .list_by_table_not_to_delete(candidate_partition.table.id) + .await; + assert_eq!(files.len(), 5); + let files_and_levels: Vec<_> = files + .into_iter() + .map(|f| (f.id.get(), f.compaction_level)) + .collect(); + assert_eq!( + files_and_levels, + vec![ + (1, CompactionLevel::FileNonOverlapped), + (2, CompactionLevel::Initial), + (4, CompactionLevel::FileNonOverlapped), + (5, CompactionLevel::Initial), + (6, CompactionLevel::FileNonOverlapped), + ] + ); } #[tokio::test]