From 0cca97516782eff3935663276b59657573a434d3 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Wed, 29 Jun 2022 16:09:51 -0400 Subject: [PATCH] fix: Split overlapped files based on the order of sequence numbers and only group non-overlapped contigous small files (#4968) * fix: Split overlapped files based on the order of sequence numbers and only group non-overlapped contigous small files * test: add one more test for group contiguous files: * refactor: address review comments Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- compactor/src/compact.rs | 575 ++++++++++++++++++++++++++++++++------- compactor/src/utils.rs | 27 ++ 2 files changed, 509 insertions(+), 93 deletions(-) diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 250f5334e2..b5d08c4ca7 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -464,7 +464,7 @@ impl Compactor { compact_and_upgrade.sequencer_id = Some(parquet_files[0].sequencer_id); // Group overlapped files - // Each group will be limited by thier size and number of files + // Each group will be limited by their size and number of files let overlapped_file_groups = Self::overlapped_groups( parquet_files, compaction_max_size_bytes, @@ -669,7 +669,8 @@ impl Compactor { let mut current_size = 0; let mut current_num_files = 0; for g in file_groups { - if current_size + g.total_file_size_bytes < compaction_max_size_bytes + if !g.overlapped_with_other_groups // only group non-overlapped ones + && current_size + g.total_file_size_bytes < compaction_max_size_bytes && current_num_files + g.parquet_files.len() < compaction_max_file_count.try_into().unwrap() { @@ -935,21 +936,106 @@ impl Compactor { } // Split overlapped groups into smaller groups if there are so mnay files in each group or - // their size are too large. The files are sorted by their min time before splitting so files - // are guarannteed to be overlapped in each new group + // their total size is too large. 
+ // + // Invariants + // 1. Input `groups`, each contains overlapped files but the groups do not overlap with each other. + // As a result, a group can include files whose total size is larger than our maximum allocated memory. + // 2. Output will be in OverlappedGroups type that include 2 different groups of files: + // . internal_overlapped_groups: groups of the input that are neither contains too many files + // nor their total file size is too large. Each of these groups do not overlap with any other groups. + // . external_overlapped_groups: subgroups of splitting too-large-or-too-many-file input groups. + // Each of these subgroups will overlap with at least one of the other subgroups. However, to correclty + // deduplicate data, each subgroup only include contiguous sequence numbers. + // + // Example: + // The input `groups` consists of 4 groups of 9 chunks with time ranges as below. + // The chunks are named using their sequence number. `C4` means this chunk inlcudes + // data of sequence numner 4 (Note: a chunk can include data of a range of sequence numbers + // but to simplify the example, we make chunk contain only one sequence number) + // . G1: includes one chunk C4 + // . G2: inlcudes 4 overlapped C1, C2, C3, C6 + // . G3: inlcudes 3 overlapped C5, C6, C9 + // . 
G4: inlcudes C7 + // Time + // ────────────────────────────────────────────────────────────────────────────────────▶ + // + // ┌───────┐ ┌───────────┐ ┌───────┐ ┌────────┐ ┌──────────┐ + // │ C4 │ │ C1 │ │ C2 │ │ C5 │ │ C7 │ + // └───────┘ └───────────┘ └───────┘ └────────┘ └──────────┘ + // ┌───────┐ ┌────────────┐ + // │ C3 │ │ C8 │ + // └───────┘ └────────────┘ + // ┌─────────────┐ ┌───────────┐ + // │ C6 │ │ C9 │ + // └─────────────┘ └───────────┘ + // + // ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + // G1[C4] G2[C1, C2, C3, C6] G3[C5, C8, C9] G4[C7] + // + // + // Since total size of files in G2 and G3 is over the limit `max_size_bytes`, they are split further, + // each into 2 groups. The output will inlcude 6 groups as follows: + // Time + // ────────────────────────────────────────────────────────────────────────────────────▶ + // + // ┌───────┐ ┌───────────┐ ┌───────┐ ┌────────┐ ┌──────────┐ + // │ C4 │ │ C1 │ │ C2 │ │ C5 │ │ C7 │ + // └───────┘ └───────────┘ └───────┘ └────────┘ └──────────┘ + // ┌───────┐ ┌────────────┐ + // │ C3 │ │ C8 │ + // └───────┘ └────────────┘ + // ┌─────────────┐ ┌───────────┐ + // │ C6 │ │ C9 │ + // └─────────────┘ └───────────┘ + // + // ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ + // G1[C4] G2[C1, C2]* G4[C5, C8]* G6[C7] + // G3[C3, C6]* G5[C9]* + // + // The newly split group are G2, G3, G4 and G5 and marked with a star (*) at the end. + // Before splitting, the chunks are sorted by their sequence numbers to guarantee their data + // will be deduplicated correctly in this and future compaction cycles. As a consequence, + // new group may inlcude non-overlapped chunks, e.g. G2. But this is not an issue becasue: + // . The compaction plan will discover whether the chunks overlap or not to avoid the + // actual deduplication work + // . 
We do want to compact/concat small continuous non-overlapped chunks (in later steps) so + // this can be considered as a part of that optimization fn split_overlapped_groups( groups: &mut Vec>, max_size_bytes: i64, max_file_count: i64, - ) -> Vec> { - let mut overlapped_groups: Vec> = Vec::with_capacity(groups.len() * 2); + ) -> OverlappedGroups { + let mut overlapped_groups = OverlappedGroups::new(groups.len() * 2); let max_count = max_file_count.try_into().unwrap(); for group in groups { let total_size_bytes: i64 = group.iter().map(|f| f.file_size_bytes).sum(); - if group.len() <= max_count && total_size_bytes <= max_size_bytes { - overlapped_groups.push(group.to_vec()); + if group.len() == 1 || (group.len() <= max_count && total_size_bytes <= max_size_bytes) + { + overlapped_groups.add_internal_overalapped_group(group.to_vec()); } else { - group.sort_by_key(|f| f.min_time); + // Sort overlapped files on their min sequence number to ensure their split subgroups + // contain contiguous sequnce numbers + group.sort_by_key(|f| f.min_sequence_number); + + // Verify that the sorted ranges of [min_sequence_number, max_sequence_number] do not overlap + for i in 1..group.len() { + if group[i - 1].max_sequence_number >= group[i].min_sequence_number { + panic!( + "Two files with overlapped range sequence numbers. 
\ + File id 1: {}, file id 2: {}, sequence number range 1: [{}, {}], \ + sequence number range 2: [{}, {}], partition id: {}", + group[i - 1].id, + group[i].id, + group[i - 1].min_sequence_number.get(), + group[i - 1].max_sequence_number.get(), + group[i].min_sequence_number.get(), + group[i].max_sequence_number.get(), + group[i].partition_id, + ) + } + } + while !group.is_empty() { // limit file num let mut count = max_count; @@ -973,7 +1059,7 @@ impl Compactor { count = group.len(); } let group_new = group.split_off(count); - overlapped_groups.push(group.to_vec()); + overlapped_groups.add_external_overalapped_group(group.to_vec()); *group = group_new; } } @@ -984,7 +1070,7 @@ impl Compactor { // Given a list of parquet files that come from the same Table Partition, group files together // if their (min_time, max_time) ranges overlap. Does not preserve or guarantee any ordering. - // If there are so mnay files in an overlapped group, the group will be split to ensure each + // If there are so many files in an overlapped group, the group will be split to ensure each // group contains limited number of files fn overlapped_groups( parquet_files: Vec, @@ -999,26 +1085,7 @@ impl Compactor { let overlapped_groups = Self::split_overlapped_groups(&mut overlapped_groups, max_size_bytes, max_file_count); - // Compute min time and total size for each overlapped group - let mut groups_with_min_time_and_size = Vec::with_capacity(overlapped_groups.len()); - for group in overlapped_groups { - let mut group_with_min_time_and_size = GroupWithMinTimeAndSize { - parquet_files: Vec::with_capacity(group.len()), - min_time: Timestamp::new(i64::MAX), - total_file_size_bytes: 0, - }; - - for file in group { - group_with_min_time_and_size.min_time = - group_with_min_time_and_size.min_time.min(file.min_time); - group_with_min_time_and_size.total_file_size_bytes += file.file_size_bytes; - group_with_min_time_and_size.parquet_files.push(file); - } - - 
groups_with_min_time_and_size.push(group_with_min_time_and_size); - } - - groups_with_min_time_and_size + overlapped_groups.groups_with_min_time_and_size() } // Compute time to split data @@ -1233,6 +1300,48 @@ impl Compactor { } } +#[derive(Debug, Clone, PartialEq)] +struct OverlappedGroups { + // Groups that contain overlapped files but the groups do not overlapp + // with other groups + internal_overlapped_groups: Vec>, + + // Groups that overlap with at least one of other groups + external_overlapped_groups: Vec>, +} + +impl OverlappedGroups { + pub fn new(len: usize) -> Self { + Self { + internal_overlapped_groups: Vec::with_capacity(len), + external_overlapped_groups: Vec::with_capacity(len), + } + } + + pub fn add_internal_overalapped_group(&mut self, group: Vec) { + self.internal_overlapped_groups.push(group); + } + + pub fn add_external_overalapped_group(&mut self, group: Vec) { + self.external_overlapped_groups.push(group); + } + + pub fn groups_with_min_time_and_size(self) -> Vec { + let mut groups = Vec::with_capacity( + self.internal_overlapped_groups.len() + self.external_overlapped_groups.len(), + ); + + for group in self.internal_overlapped_groups { + groups.push(GroupWithMinTimeAndSize::new(group, false)); + } + for group in self.external_overlapped_groups { + groups.push(GroupWithMinTimeAndSize::new(group, true)); + } + + groups + } +} + /// Summary information for a partition that is a candidate for compaction. 
#[derive(Debug, Clone, Eq, PartialEq)] #[allow(missing_copy_implementations)] // shouldn't be silently cloned @@ -2242,9 +2351,11 @@ mod tests { arbitrary_parquet_file_with_size(min_time, max_time, 100) } - fn arbitrary_parquet_file_with_size( + fn arbitrary_parquet_file_with_size_and_sequence_number( min_time: i64, max_time: i64, + min_seq: i64, + max_seq: i64, file_size_bytes: i64, ) -> ParquetFile { let id = NEXT_ID.fetch_add(1, Ordering::SeqCst); @@ -2255,8 +2366,8 @@ mod tests { table_id: TableId::new(0), partition_id: PartitionId::new(0), object_store_id: Uuid::new_v4(), - min_sequence_number: SequenceNumber::new(0), - max_sequence_number: SequenceNumber::new(1), + min_sequence_number: SequenceNumber::new(min_seq), + max_sequence_number: SequenceNumber::new(max_seq), min_time: Timestamp::new(min_time), max_time: Timestamp::new(max_time), to_delete: None, @@ -2268,6 +2379,20 @@ mod tests { } } + fn arbitrary_parquet_file_with_size( + min_time: i64, + max_time: i64, + file_size_bytes: i64, + ) -> ParquetFile { + arbitrary_parquet_file_with_size_and_sequence_number( + min_time, + max_time, + 0, + 1, + file_size_bytes, + ) + } + #[tokio::test] async fn test_sort_queryable_parquet_chunk() { let catalog = TestCatalog::new(); @@ -2534,11 +2659,13 @@ mod tests { parquet_files: vec![pf1.clone()], min_time: Timestamp::new(1), total_file_size_bytes: 100, + overlapped_with_other_groups: false, }; let g2 = GroupWithMinTimeAndSize { parquet_files: vec![pf2.clone()], min_time: Timestamp::new(3), total_file_size_bytes: 200, + overlapped_with_other_groups: false, }; // They should each be in their own group assert_eq!(overlapped_groups, vec![g1, g2]); @@ -2555,17 +2682,37 @@ mod tests { assert_eq!(groups, vec![vec![pf1, pf2]]); } + #[test] + #[should_panic] + fn test_split_overlapped_groups_panic() { + let max_size_bytes = 1000; + let max_file_count = 2; + + // 2 files, each smaller than max_size_bytes but total is larger --> will be split + // files have overlapped range of 
sequence numbers: [5, 11] and [7, 15] + let f1 = + arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 11, max_size_bytes - 10); + let f2 = + arbitrary_parquet_file_with_size_and_sequence_number(7, 11, 7, 15, max_size_bytes - 10); + let mut groups = vec![vec![f1, f2]]; + + // will panic becasue of + Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count); + } + // This is specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files // Keep all the variables the same names in both tests for us to follow them easily #[test] - fn test_split_overlapped_groups() { + fn test_split_overlapped_groups_sequence_number_same_order_with_time() { let compaction_max_size_bytes = 100000; // oldest overlapped and very small - let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 400); - let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 500); - let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400); - let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500); + let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 400); + let contained_completely_within = + arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500); + let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); + let min_equals_max = + arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500); let oldest_overlapped_group = vec![ overlaps_many.clone(), contained_completely_within.clone(), @@ -2574,12 +2721,102 @@ mod tests { ]; // newest files and very large - let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group + let alone = arbitrary_parquet_file_with_size_and_sequence_number( + 30, + 35, + 30, + 30, + compaction_max_size_bytes + 200, + ); // too large to group let newest_overlapped_group = vec![alone.clone()]; // small files in the middle - let 
another = arbitrary_parquet_file_with_size(13, 15, 1000); - let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000); + let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 13, 13, 1000); + let partial_overlap = + arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 2000); + let middle_overlapped_group = vec![another.clone(), partial_overlap.clone()]; + + let mut overlapped_groups = vec![ + oldest_overlapped_group, + newest_overlapped_group, + middle_overlapped_group, + ]; + + let max_size_bytes = 1000; + let max_file_count = 2; + + // Three input groups but will produce 5 output ones becasue of limit in size and file count. + // Note that the 3 input groups each includes overlapped files but the groups do not overlap. + // The function split_overlapped_groups is to split each overlapped group. It does not merge any groups. + let groups = Compactor::split_overlapped_groups( + &mut overlapped_groups, + max_size_bytes, + max_file_count, + ); + + // one intact group + assert_eq!(groups.internal_overlapped_groups.len(), 1); + // four newly splitted groups from two large groups + assert_eq!(groups.external_overlapped_groups.len(), 4); + + // oldest_overlapped_group was split into external_overlapped_groups[0] and [1] due to file count limit + // the chunks split by their min_seq order + assert_eq!(groups.external_overlapped_groups[0].len(), 2); // reach limit file count + assert!(groups.external_overlapped_groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3 + assert!(groups.external_overlapped_groups[0].contains(&overlaps_many)); // min_time = 5, min_seq = 5 + assert_eq!(groups.external_overlapped_groups[1].len(), 2); // reach limit file count + assert!(groups.external_overlapped_groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6 + assert!(groups.external_overlapped_groups[1].contains(&min_equals_max)); // min_time = 10, min_seq = 10 + + // newest_overlapped_group stays the 
same length one file and the corresponding output is + // groups.internal_overlapped_groups[0] + assert_eq!(groups.internal_overlapped_groups[0].len(), 1); // reach limit file size + assert!(groups.internal_overlapped_groups[0].contains(&alone)); + + // middle_overlapped_group was split into groups.external_overlapped_groups[2] and [3] due to size limit + // the chunks split by their min_seq order + assert_eq!(groups.external_overlapped_groups[2].len(), 1); // reach limit file size + assert!(groups.external_overlapped_groups[2].contains(&another)); // min_time = 13, min_seq = 13 + assert_eq!(groups.external_overlapped_groups[3].len(), 1); // reach limit file size + assert!(groups.external_overlapped_groups[3].contains(&partial_overlap)); + // min_time = 14, min_seq = 14 + } + + // This is specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files + // Keep all the variables the same names in both tests for us to follow them easily + #[test] + fn test_split_overlapped_groups_sequence_number_different_order_from_time() { + let compaction_max_size_bytes = 100000; + + // oldest overlapped and very small + let overlaps_many = + arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 10, 10, 400); + let contained_completely_within = + arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500); + let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); + let min_equals_max = + arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 5, 5, 500); + let oldest_overlapped_group = vec![ + overlaps_many.clone(), + contained_completely_within.clone(), + max_equals_min.clone(), + min_equals_max.clone(), + ]; + + // newest files and very large + let alone = arbitrary_parquet_file_with_size_and_sequence_number( + 30, + 35, + 30, + 30, + compaction_max_size_bytes + 200, + ); // too large to group + let newest_overlapped_group = vec![alone.clone()]; + + // small files in the middle + let 
another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 14, 14, 1000); + let partial_overlap = + arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 13, 13, 2000); let middle_overlapped_group = vec![another.clone(), partial_overlap.clone()]; let mut overlapped_groups = vec![ @@ -2600,26 +2837,31 @@ mod tests { max_file_count, ); - // must be 5 - assert_eq!(groups.len(), 5); + // one intact group + assert_eq!(groups.internal_overlapped_groups.len(), 1); + // four newly splitted groups from two large groups + assert_eq!(groups.external_overlapped_groups.len(), 4); - // oldest_overlapped_group was split into groups[0] and groups[1] due to file count limit - assert_eq!(groups[0].len(), 2); // reach limit file count - assert!(groups[0].contains(&max_equals_min)); // min_time = 3 - assert!(groups[0].contains(&overlaps_many)); // min_time = 5 - assert_eq!(groups[1].len(), 2); // reach limit file count - assert!(groups[1].contains(&contained_completely_within)); // min_time = 6 - assert!(groups[1].contains(&min_equals_max)); // min_time = 10 + // oldest_overlapped_group was split into external_overlapped_groups[0] and [1] due to file count limit + // the chunks split by their min_seq order + assert_eq!(groups.external_overlapped_groups[0].len(), 2); // reach limit file count + assert!(groups.external_overlapped_groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3 + assert!(groups.external_overlapped_groups[0].contains(&min_equals_max)); // min_time = 10, min_seq = 5 + assert_eq!(groups.external_overlapped_groups[1].len(), 2); // reach limit file count + assert!(groups.external_overlapped_groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6 + assert!(groups.external_overlapped_groups[1].contains(&overlaps_many)); // min_time = 5, min_seq = 10 - // newest_overlapped_group stays the same length one file and the corresponding output is groups[2] - assert_eq!(groups[2].len(), 1); // reach limit file size - 
assert!(groups[2].contains(&alone)); + // newest_overlapped_group stays the same length one file and the corresponding output is + // groups.internal_overlapped_groups[0] + assert_eq!(groups.internal_overlapped_groups[0].len(), 1); // reach limit file size + assert!(groups.internal_overlapped_groups[0].contains(&alone)); - // middle_overlapped_group was split into groups[3] and groups[4] due to size limit - assert_eq!(groups[3].len(), 1); // reach limit file size - assert!(groups[3].contains(&another)); // min_time = 13 - assert_eq!(groups[4].len(), 1); // reach limit file size - assert!(groups[4].contains(&partial_overlap)); // min_time = 14 + // middle_overlapped_group was split into groups.external_overlapped_groups[2] and [3] due to size limit + // the chunks split by their min_seq order + assert_eq!(groups.external_overlapped_groups[2].len(), 1); // reach limit file size + assert!(groups.external_overlapped_groups[2].contains(&partial_overlap)); // min_time = 14, min_seq = 13 + assert_eq!(groups.external_overlapped_groups[3].len(), 1); // reach limit file size + assert!(groups.external_overlapped_groups[3].contains(&another)); // min_time = 13, min_seq = 14 } // This tests @@ -2632,17 +2874,27 @@ mod tests { let compaction_max_size_bytes = 100000; // oldest overlapped and very small - let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 400); - let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 500); - let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400); - let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500); + let overlaps_many = + arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 10, 10, 400); + let contained_completely_within = + arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500); + let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); + let min_equals_max = + arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 5, 5, 500); // 
newest files and very large - let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group + let alone = arbitrary_parquet_file_with_size_and_sequence_number( + 30, + 35, + 30, + 30, + compaction_max_size_bytes + 200, + ); // too large to group // small files in the middle - let another = arbitrary_parquet_file_with_size(13, 15, 1000); - let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000); + let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 14, 14, 1000); + let partial_overlap = + arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 13, 13, 2000); // Given a bunch of files in an arbitrary order, let all = vec![ @@ -2661,14 +2913,31 @@ mod tests { let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count); // Must be 5 assert_eq!(overlapped_groups.len(), 5); - assert_eq!(overlapped_groups[0].parquet_files.len(), 2); // reach limit file count + // Non-overlapped with other groups always first + // Group 0: newest files and very large + assert_eq!(overlapped_groups[0].parquet_files.len(), 1); // reach limit file size + assert!(overlapped_groups[0].parquet_files.contains(&alone)); // min_time = 30 + // The rest of the groups, each overlapp with another group + // Groups 1 & 2: split from `oldest overlapped and very small` ordering by sequence numbers assert_eq!(overlapped_groups[1].parquet_files.len(), 2); // reach limit file count - assert_eq!(overlapped_groups[2].parquet_files.len(), 1); // reach limit file size + assert!(overlapped_groups[1].parquet_files.contains(&max_equals_min)); // min_time = 3, min_seq = 3 + assert!(overlapped_groups[1].parquet_files.contains(&min_equals_max)); // min_time = 10, min_seq = 5 + assert_eq!(overlapped_groups[2].parquet_files.len(), 2); // reach limit file count + assert!(overlapped_groups[2] + .parquet_files + .contains(&contained_completely_within)); // min_time = 6, min_seq = 6 + 
assert!(overlapped_groups[2].parquet_files.contains(&overlaps_many)); // min_time = 5, min_seq = 10 + // Groups 3 & 4: split from `small files in the middle` ordering by sequence numbers assert_eq!(overlapped_groups[3].parquet_files.len(), 1); // reach limit file size + assert!(overlapped_groups[3] + .parquet_files + .contains(&partial_overlap)); // min_time = 14, min_seq = 13 assert_eq!(overlapped_groups[4].parquet_files.len(), 1); // reach limit file size + assert!(overlapped_groups[4].parquet_files.contains(&another)); // min_time = 13, min_seq = 14 // Group further into group by size and file count limit - // Due to the merge with correct time range, this function has to sort the groups hence output data will be in time order + // Due to the merge with correct time range, this function has to sort the groups hence output data will be + // in time order between each group and in sequence number order within a each group let groups = Compactor::group_small_contiguous_groups( overlapped_groups, compaction_max_size_bytes, @@ -2679,21 +2948,21 @@ mod tests { assert_eq!(groups.len(), 5); assert_eq!(groups[0].len(), 2); // reach file num limit - assert!(groups[0].contains(&max_equals_min)); // min_time = 3 - assert!(groups[0].contains(&overlaps_many)); // min_time = 5 + assert!(groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3 + assert!(groups[0].contains(&min_equals_max)); // min_time = 10, min_seq = 5 assert_eq!(groups[1].len(), 2); // reach file num limit - assert!(groups[1].contains(&contained_completely_within)); // min_time = 6 - assert!(groups[1].contains(&min_equals_max)); // min_time = 10 + assert!(groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6 + assert!(groups[1].contains(&overlaps_many)); // min_time = 5, min_seq = 10 assert_eq!(groups[2].len(), 1); // reach size limit - assert!(groups[2].contains(&another)); // min_time = 13 + assert!(groups[2].contains(&another)); // min_time = 13, min_seq = 14 
assert_eq!(groups[3].len(), 1); // reach size limit - assert!(groups[3].contains(&partial_overlap)); // min_time = 14 + assert!(groups[3].contains(&partial_overlap)); // min_time = 14, min_seq = 13 assert_eq!(groups[4].len(), 1); // reach size limit - assert!(groups[4].contains(&alone)); + assert!(groups[4].contains(&alone)); // min_time=30 } // This tests @@ -2718,11 +2987,13 @@ mod tests { parquet_files: vec![pf1.clone()], min_time: Timestamp::new(1), total_file_size_bytes: 100, + overlapped_with_other_groups: false, }; let g2 = GroupWithMinTimeAndSize { parquet_files: vec![pf2.clone()], min_time: Timestamp::new(3), total_file_size_bytes: compaction_max_size_bytes, + overlapped_with_other_groups: false, }; // They should each be in their own group assert_eq!(overlapped_groups, vec![g1, g2]); @@ -2774,6 +3045,7 @@ mod tests { // Group into overlapped groups let overlapped_groups = Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT); + // There are 3 overlapped groups but each group does not overlap with the other assert_eq!(overlapped_groups.len(), 3); // group further into group by size @@ -2804,18 +3076,39 @@ mod tests { // 2. 
group_small_contiguous_groups that merges non-overlapped group into a larger one if they meet size and file limit #[test] fn test_group_small_contiguous_overlapped_groups_many_files_too_large() { - // oldest overlapped and large - let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 200); - let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 300); - let max_equals_min = arbitrary_parquet_file_with_size(3, 5, TEST_MAX_SIZE_BYTES + 400); // too large to group - let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500); + // oldest overlapped and very small + let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 200); + let contained_completely_within = + arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 300); + let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number( + 3, + 5, + 3, + 3, + TEST_MAX_SIZE_BYTES + 400, + ); // too large to group + let min_equals_max = + arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500); - // newest files and large - let alone = arbitrary_parquet_file_with_size(30, 35, TEST_MAX_SIZE_BYTES); // too large to group + // newest files and small + let alone = arbitrary_parquet_file_with_size_and_sequence_number( + 30, + 35, + 30, + 30, + TEST_MAX_SIZE_BYTES, + ); // too large to group - // files in the middle and also large - let another = arbitrary_parquet_file_with_size(13, 15, TEST_MAX_SIZE_BYTES); // too large to group - let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000); + // large files in the middle + let another = arbitrary_parquet_file_with_size_and_sequence_number( + 13, + 15, + 13, + 13, + TEST_MAX_SIZE_BYTES, + ); // too large to group + let partial_overlap = + arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 2000); // Given a bunch of files in an arbitrary order let all = vec![ @@ -2863,17 +3156,26 @@ mod tests { #[test] fn 
test_group_small_contiguous_overlapped_groups_many_files_middle_too_large() { // oldest overlapped and very small - let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 200); - let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 300); - let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400); - let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500); + let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 200); + let contained_completely_within = + arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 300); + let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); + let min_equals_max = + arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500); // newest files and small - let alone = arbitrary_parquet_file_with_size(30, 35, 200); + let alone = arbitrary_parquet_file_with_size_and_sequence_number(30, 35, 30, 30, 200); // large files in the middle - let another = arbitrary_parquet_file_with_size(13, 15, TEST_MAX_SIZE_BYTES); // too large to group - let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000); + let another = arbitrary_parquet_file_with_size_and_sequence_number( + 13, + 15, + 13, + 13, + TEST_MAX_SIZE_BYTES, + ); // too large to group + let partial_overlap = + arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 2000); // Given a bunch of files in an arbitrary order let all = vec![ @@ -2915,6 +3217,93 @@ mod tests { assert!(groups[2].contains(&alone)); // min _time = 30 } + #[test] + fn test_group_small_contiguous_overlapped_groups_with_split_overlap_groups() { + // oldest overlapped + // These 4 overlapped files exceed max count --> will be split into 2 overlapped groups + let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 400); + let contained_completely_within = + arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500); + let 
max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); + let min_equals_max = + arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500); + + // newest files + // one and large file that does not overlapped with any other files + // --> will be in its own overlapped group after Compactor::overlapped_groups + // Since it is small --> it will be grouped with the middle_overlapped_group after Compactor::group_small_contiguous_groups + let alone = arbitrary_parquet_file_with_size_and_sequence_number(30, 35, 30, 30, 50); + + // very small files in the middle + // these 2 files overlap but small so no further split + let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 13, 13, 10); + let partial_overlap = + arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 20); + + // Given a bunch of files in an arbitrary order + let all = vec![ + min_equals_max.clone(), + overlaps_many.clone(), + alone.clone(), + another.clone(), + max_equals_min.clone(), + contained_completely_within.clone(), + partial_overlap.clone(), + ]; + + let max_size_bytes = 1000; + let max_file_count = 2; + + // Group into overlapped groups + let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count); + assert_eq!(overlapped_groups.len(), 4); + + // Must be 4 + assert_eq!(overlapped_groups.len(), 4); + // 2 groups that do not overlapped wih other groups are always first + // Group 0: `very small files in the middle` + assert_eq!(overlapped_groups[0].parquet_files.len(), 2); + assert!(overlapped_groups[0].parquet_files.contains(&another)); // min_time = 13, min_seq = 14 + assert!(overlapped_groups[0] + .parquet_files + .contains(&partial_overlap)); // min_time = 14, min_seq = 13 + // Group 1: `newest files` + assert_eq!(overlapped_groups[1].parquet_files.len(), 1); + assert!(overlapped_groups[1].parquet_files.contains(&alone)); // min_time = 30 + // Next 2 groups are split from `oldest 
overlapped` + assert_eq!(overlapped_groups[2].parquet_files.len(), 2); + assert!(overlapped_groups[2].parquet_files.contains(&max_equals_min)); // min_time = 3, min_seq = 3 + assert!(overlapped_groups[2].parquet_files.contains(&overlaps_many)); // min_time = 5, min_seq = 5 + assert_eq!(overlapped_groups[3].parquet_files.len(), 2); + assert!(overlapped_groups[3] + .parquet_files + .contains(&contained_completely_within)); // min_time = 6, min_seq = 6 + assert!(overlapped_groups[3].parquet_files.contains(&min_equals_max)); // min_time = 10, min_seq = 10 + + // 4 input groups but 3 output groups. Group 0 and Group 1 above are further grouped together + let groups = Compactor::group_small_contiguous_groups( + overlapped_groups, + max_size_bytes, + max_file_count + 5, // increase limit file count ot get them grouped + ); + assert_eq!(groups.len(), 3); + + // The output group is in time order + // groups[0] the same as overlapped_groups[2] + assert_eq!(groups[0].len(), 2); + assert!(groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3 + assert!(groups[0].contains(&overlaps_many)); // min_time = 5, min_seq = 5 + // groups[1] the same as overlapped_groups[3] + assert_eq!(groups[1].len(), 2); + assert!(groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6 + assert!(groups[1].contains(&min_equals_max)); // min_time = 10, min_seq = 10 + // group[2] is the result of grouping overlapped_groups[0] and overlapped_groups[1] + assert_eq!(groups[2].len(), 3); + assert!(groups[2].contains(&another)); // min_time = 13, min_seq = 1 + assert!(groups[2].contains(&partial_overlap)); // min_time = 14, min_seq = 13 + assert!(groups[2].contains(&alone)); // min_time = 30 + } + #[tokio::test] async fn add_tombstones_to_parquet_files_in_groups() { let catalog = TestCatalog::new(); diff --git a/compactor/src/utils.rs b/compactor/src/utils.rs index 2921f18435..db8132ee0f 100644 --- a/compactor/src/utils.rs +++ b/compactor/src/utils.rs @@ -45,6 +45,33 @@ 
pub struct GroupWithMinTimeAndSize {
     /// total size of all file
     pub(crate) total_file_size_bytes: i64,
+
+    /// true if this group was split from a group of many overlapped files
+    pub(crate) overlapped_with_other_groups: bool,
+}
+
+impl GroupWithMinTimeAndSize {
+    /// Make GroupWithMinTimeAndSize for a given set of parquet files
+    pub fn new(files: Vec<ParquetFile>, overlapped: bool) -> Self {
+        let mut group = Self {
+            parquet_files: files,
+            min_time: Timestamp::new(i64::MAX),
+            total_file_size_bytes: 0,
+            overlapped_with_other_groups: overlapped,
+        };
+
+        assert!(
+            !group.parquet_files.is_empty(),
+            "invalid empty group for computing min time and total size"
+        );
+
+        for file in &group.parquet_files {
+            group.min_time = group.min_time.min(file.min_time);
+            group.total_file_size_bytes += file.file_size_bytes;
+        }
+
+        group
+    }
 }
 
 /// Wrapper of a parquet file and its tombstones