From d8b74f6af8cb3304b39fbdbf54f217a743e65a97 Mon Sep 17 00:00:00 2001
From: Nga Tran
Date: Tue, 5 Jul 2022 14:53:52 -0400
Subject: [PATCH] refactor: convert a panic into an error and throw a warning
 if we choose non-actionable compacting candidates (#5041)

* refactor: convert a panic into an error and throw a warning if we choose
non-actionable candidates

* chore: Apply suggestions from code review

Co-authored-by: Andrew Lamb

* chore: run fmt

Co-authored-by: Andrew Lamb
---
 compactor/src/compact.rs | 110 +++++++++++++++++++++++++--------------
 compactor/src/handler.rs |   5 ++
 2 files changed, 77 insertions(+), 38 deletions(-)

diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 0e4ac5a513..f5426900ea 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -135,6 +135,28 @@ pub enum Error {
     Persist {
         source: parquet_file::storage::UploadError,
     },
+
+    #[snafu(display(
+        "Two time-overlapped files with overlapped range sequence numbers. \
+        File id 1: {}, file id 2: {}, sequence number range 1: [{}, {}], \
+        sequence number range 2: [{}, {}], partition id: {}",
+        file_id_1,
+        file_id_2,
+        min_seq_1,
+        max_seq_1,
+        min_seq_2,
+        max_seq_2,
+        partition_id,
+    ))]
+    OverlapTimeAndSequenceNumber {
+        file_id_1: ParquetFileId,
+        file_id_2: ParquetFileId,
+        min_seq_1: i64,
+        max_seq_1: i64,
+        min_seq_2: i64,
+        max_seq_2: i64,
+        partition_id: PartitionId,
+    },
 }
 
 /// A specialized `Error` for Compactor Data errors
@@ -467,7 +489,7 @@ impl Compactor {
             parquet_files,
             compaction_max_size_bytes,
             compaction_max_file_count,
-        );
+        )?;
 
         // Group time-contiguous non-overlapped groups if their total size is smaller than a threshold
         // If their
@@ -1004,7 +1026,7 @@ impl Compactor {
         groups: &mut Vec<Vec<ParquetFile>>,
         max_size_bytes: i64,
         max_file_count: i64,
-    ) -> OverlappedGroups {
+    ) -> Result<OverlappedGroups> {
         let mut overlapped_groups = OverlappedGroups::new(groups.len() * 2);
         let max_count = max_file_count.try_into().unwrap();
         for group in groups {
@@ -1027,7 +1049,7 @@
             // Points 1 and 2 together will lead to many non-time-overlapped files with same sequence number ranges
             // can end up in the same time-overlapped group
             for i in 1..group.len() {
-                Self::verify_contiguous_files(&group[i - 1], &group[i]);
+                Self::verify_contiguous_files(&group[i - 1], &group[i])?
             }
 
             while !group.is_empty() {
@@ -1059,13 +1081,13 @@
             }
         }
 
-        overlapped_groups
+        Ok(overlapped_groups)
     }
 
-    // Panic if the two given time are both time-overlapped and sequence-number-overlapped
+    // Return an error if the two given files are both time-overlapped and sequence-number-overlapped
     // . file_1 and file_2 must belong to the same partition
-    // . file_1 must have range sequence number <= file_s' range sequence number
-    fn verify_contiguous_files(file_1: &ParquetFile, file_2: &ParquetFile) {
+    // . file_1 must have range sequence number <= file_2's range sequence number
+    fn verify_contiguous_files(file_1: &ParquetFile, file_2: &ParquetFile) -> Result<()> {
         assert!(file_1.partition_id == file_2.partition_id);
         assert!(file_1.min_sequence_number <= file_2.min_sequence_number);
 
         if ((file_1.min_time >= file_2.min_time && file_1.min_time <= file_2.max_time)
             || (file_2.min_time >= file_1.min_time && file_2.min_time <= file_1.max_time))
             && (file_1.max_sequence_number >= file_2.min_sequence_number)
         {
-            panic!(
-                "Two time-overlapped files with overlapped range sequence numbers. \
-                File id 1: {}, file id 2: {}, sequence number range 1: [{}, {}], \
-                sequence number range 2: [{}, {}], partition id: {}",
-                file_1.id,
-                file_2.id,
-                file_1.min_sequence_number.get(),
-                file_1.max_sequence_number.get(),
-                file_2.min_sequence_number.get(),
-                file_2.max_sequence_number.get(),
-                file_1.partition_id,
-            )
+            return Err(crate::compact::Error::OverlapTimeAndSequenceNumber {
+                file_id_1: file_1.id,
+                file_id_2: file_2.id,
+                min_seq_1: file_1.min_sequence_number.get(),
+                max_seq_1: file_1.max_sequence_number.get(),
+                min_seq_2: file_2.min_sequence_number.get(),
+                max_seq_2: file_2.max_sequence_number.get(),
+                partition_id: file_1.partition_id,
+            });
         }
+
+        Ok(())
     }
 
     // Given a list of parquet files that come from the same Table Partition, group files together
     fn overlapped_groups(
         parquet_files: Vec<ParquetFile>,
         max_size_bytes: i64,
         max_file_count: i64,
-    ) -> Vec<GroupWithMinTimeAndSize> {
+    ) -> Result<Vec<GroupWithMinTimeAndSize>> {
         // group overlap files
         let mut overlapped_groups =
             group_potential_duplicates(parquet_files).expect("Error grouping overlapped chunks");
 
         // split overlapped groups into smaller groups if they include too many files
         let overlapped_groups =
-            Self::split_overlapped_groups(&mut overlapped_groups, max_size_bytes, max_file_count);
+            Self::split_overlapped_groups(&mut overlapped_groups, max_size_bytes, max_file_count)?;
 
-        overlapped_groups.groups_with_min_time_and_size()
+        Ok(overlapped_groups.groups_with_min_time_and_size())
     }
 
     // Compute time to split data
@@ -2564,7 +2585,8 @@ mod tests {
             vec![pf1.clone(), pf2.clone()],
             TEST_MAX_SIZE_BYTES,
             TEST_MAX_FILE_COUNT,
-        );
+        )
+        .unwrap();
 
         // They should be 2 groups
         assert_eq!(groups.len(), 2, "There should have been two group");
@@ -2583,7 +2605,8 @@ mod tests {
             vec![pf1.clone(), pf2.clone()],
             TEST_MAX_SIZE_BYTES,
             TEST_MAX_FILE_COUNT,
-        );
+        )
+        .unwrap();
 
         // They should be in one group (order not guaranteed)
         assert_eq!(groups.len(), 1, "There should have only been one group");
@@ -2622,7 +2645,7 @@ mod tests {
         ];
 
         let mut groups =
-            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
+            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
         dbg!(&groups);
 
         assert_eq!(groups.len(), 3);
@@ -2687,7 +2710,8 @@ mod tests {
             vec![pf1.clone(), pf2.clone()],
             TEST_MAX_SIZE_BYTES,
             TEST_MAX_FILE_COUNT,
-        );
+        )
+        .unwrap();
 
         // 2 overlapped groups
         assert_eq!(overlapped_groups.len(), 2);
         let g1 = GroupWithMinTimeAndSize {
@@ -2718,7 +2742,6 @@ mod tests {
     }
 
     #[test]
-    #[should_panic]
     fn test_split_overlapped_groups_panic() {
         let max_size_bytes = 1000;
         let max_file_count = 2;
 
         let f2 =
             arbitrary_parquet_file_with_size_and_sequence_number(7, 11, 7, 15, max_size_bytes - 10);
         let mut groups = vec![vec![f1, f2]];
 
-        // will panic becasue of
-        Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count);
+        // should error
+        let err = Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count)
+            .unwrap_err();
+        assert!(err
+            .to_string()
+            .contains("Two time-overlapped files with overlapped range sequence numbers"))
     }
 
     // test of `https://github.com/influxdata/conductor/issues/1009`
         let max_size_bytes = 1000;
         let max_file_count = 2;
 
         let mut groups = vec![vec![f1, f2, f3]];
 
         // should not panic
-        Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count);
+        Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count).unwrap();
     }
 
     // This is specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files
@@ -2807,7 +2834,8 @@ mod tests {
             &mut overlapped_groups,
             max_size_bytes,
             max_file_count,
-        );
+        )
+        .unwrap();
 
         // one intact group
         assert_eq!(groups.internal_overlapped_groups.len(), 1);
@@ -2890,7 +2918,8 @@ mod tests {
             &mut overlapped_groups,
             max_size_bytes,
             max_file_count,
-        );
+        )
+        .unwrap();
 
         // one intact group
         assert_eq!(groups.internal_overlapped_groups.len(), 1);
@@ -2965,7 +2994,8 @@ mod tests {
         // Group into overlapped groups
         let max_size_bytes = 1000;
         let max_file_count = 2;
-        let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count);
+        let overlapped_groups =
+            Compactor::overlapped_groups(all, max_size_bytes, max_file_count).unwrap();
 
         // Must be 5
         assert_eq!(overlapped_groups.len(), 5);
         // Non-overlapped with other groups always first
@@ -3035,7 +3065,8 @@ mod tests {
             vec![pf1.clone(), pf2.clone()],
             TEST_MAX_SIZE_BYTES,
             TEST_MAX_FILE_COUNT,
-        );
+        )
+        .unwrap();
 
         // 2 overlapped groups
         assert_eq!(overlapped_groups.len(), 2);
         let g1 = GroupWithMinTimeAndSize {
@@ -3099,7 +3130,7 @@ mod tests {
 
         // Group into overlapped groups
         let overlapped_groups =
-            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
+            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
 
         // There are 3 overlapped groups but each group does not overlap with the other
         assert_eq!(overlapped_groups.len(), 3);
@@ -3178,7 +3209,7 @@ mod tests {
 
         // Group into overlapped groups
         let overlapped_groups =
-            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
+            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
 
         assert_eq!(overlapped_groups.len(), 5);
 
         // 5 input groups and 5 output groups because they are too large to group further
@@ -3245,7 +3276,7 @@ mod tests {
 
         // Group into overlapped groups
         let overlapped_groups =
-            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
+            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
 
         assert_eq!(overlapped_groups.len(), 4);
 
         // 4 input groups but 3 output groups
@@ -3310,7 +3341,8 @@ mod tests {
         let max_file_count = 2;
 
         // Group into overlapped groups
-        let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count);
+        let overlapped_groups =
+            Compactor::overlapped_groups(all, max_size_bytes, max_file_count).unwrap();
 
         assert_eq!(overlapped_groups.len(), 4); // Must be 4
@@ -3550,7 +3582,8 @@ mod tests {
         // Given a bunch of files in a particular order to exercise the algorithm:
         let all = vec![one, two, three];
 
-        let groups = Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
+        let groups =
+            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
         dbg!(&groups);
 
         // All should be in the same group.
@@ -3564,7 +3597,8 @@ mod tests {
         // Given a bunch of files in a particular order to exercise the algorithm:
         let all = vec![one, two, three, four];
 
-        let groups = Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
+        let groups =
+            Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
         dbg!(&groups);
 
         // All should be in the same group.

diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs
index eb7e2d01d0..99aeb48b9c 100644
--- a/compactor/src/handler.rs
+++ b/compactor/src/handler.rs
@@ -243,6 +243,11 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
                     debug!(%max_size, %used_size, n_compactions=%handles.len(), "reached maximum concurrent compaction size limit");
                     break;
                 }
+            } else {
+                // All candidates should be compactable (have files to compact and/or upgrade).
+                // Reaching here means we did not choose the right candidates, and it would
+                // be a waste of time to repeat this cycle.
+                warn!("The candidate partition {} has no files to be either compacted or upgraded", c.candidate.partition_id);
             }
         }
     }
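
Editor's note: the heart of this patch is mechanical: replace a `panic!` with a typed
`snafu` error and let callers decide what to do (`?` in production code, `.unwrap()` or
`.unwrap_err()` in tests). Below is a minimal, self-contained sketch of that pattern,
not IOx code: `FileMeta` and the plain `i64` ids are stand-ins for the real
`ParquetFile`, `ParquetFileId`, and `PartitionId` types, and f1's values are made up
(only f2's sequence range [7, 15] comes from the updated test).

use snafu::Snafu;

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display(
        "Two time-overlapped files with overlapped range sequence numbers. \
        File id 1: {}, file id 2: {}, sequence number range 1: [{}, {}], \
        sequence number range 2: [{}, {}], partition id: {}",
        file_id_1, file_id_2, min_seq_1, max_seq_1, min_seq_2, max_seq_2, partition_id
    ))]
    OverlapTimeAndSequenceNumber {
        file_id_1: i64,
        file_id_2: i64,
        min_seq_1: i64,
        max_seq_1: i64,
        min_seq_2: i64,
        max_seq_2: i64,
        partition_id: i64,
    },
}

/// Stand-in for the fields of `ParquetFile` that the check actually reads.
#[derive(Clone, Copy)]
pub struct FileMeta {
    pub id: i64,
    pub partition_id: i64,
    pub min_time: i64,
    pub max_time: i64,
    pub min_seq: i64,
    pub max_seq: i64,
}

/// Same shape as `verify_contiguous_files`: callers must pass files from one
/// partition, ordered so that `file_1.min_seq <= file_2.min_seq`.
pub fn verify_contiguous(file_1: FileMeta, file_2: FileMeta) -> Result<(), Error> {
    assert!(file_1.partition_id == file_2.partition_id);
    assert!(file_1.min_seq <= file_2.min_seq);

    // Time ranges overlap if either file's min_time falls inside the other's range.
    let time_overlap = (file_1.min_time >= file_2.min_time && file_1.min_time <= file_2.max_time)
        || (file_2.min_time >= file_1.min_time && file_2.min_time <= file_1.max_time);

    // Overlapping in both time and sequence numbers is the non-recoverable case:
    // report it to the caller instead of tearing the whole compactor down.
    if time_overlap && file_1.max_seq >= file_2.min_seq {
        return Err(Error::OverlapTimeAndSequenceNumber {
            file_id_1: file_1.id,
            file_id_2: file_2.id,
            min_seq_1: file_1.min_seq,
            max_seq_1: file_1.max_seq,
            min_seq_2: file_2.min_seq,
            max_seq_2: file_2.max_seq,
            partition_id: file_1.partition_id,
        });
    }
    Ok(())
}

fn main() {
    // Two files whose time ranges and sequence-number ranges both overlap,
    // so the call must return the new error rather than panic.
    let f1 = FileMeta { id: 1, partition_id: 10, min_time: 0, max_time: 100, min_seq: 4, max_seq: 9 };
    let f2 = FileMeta { id: 2, partition_id: 10, min_time: 50, max_time: 150, min_seq: 7, max_seq: 15 };
    let err = verify_contiguous(f1, f2).unwrap_err();
    assert!(err
        .to_string()
        .contains("Two time-overlapped files with overlapped range sequence numbers"));
}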
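
The handler.rs hunk is the second half of the "non-actionable candidates" theme: rather
than assuming every selected partition has work, the loop now warns and moves on. A
hypothetical reduction of that control flow follows; `Candidate`, the two counts, and
`tracing::warn!` are stand-ins for the real compactor types and its logging macro, and
the real loop also tracks a concurrent-compaction budget.

use tracing::warn;

/// Hypothetical stand-in for the compactor's per-partition candidate info.
struct Candidate {
    partition_id: i64,
    files_to_compact: usize,
    files_to_upgrade: usize,
}

fn process_candidates(candidates: Vec<Candidate>) {
    for c in candidates {
        if c.files_to_compact > 0 || c.files_to_upgrade > 0 {
            // ... spawn the compaction job for this partition ...
        } else {
            // All candidates should be actionable; landing here means the
            // selection step picked a partition with nothing to do, so warn
            // instead of silently burning a cycle on it.
            warn!(
                "The candidate partition {} has no files to be either compacted or upgraded",
                c.partition_id
            );
        }
    }
}

fn main() {
    process_candidates(vec![Candidate { partition_id: 7, files_to_compact: 0, files_to_upgrade: 0 }]);
}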