fix: Split overlapped files based on the order of sequence numbers and only group non-overlapped contigous small files (#4968)

* fix: Split overlapped files based on the order of sequence numbers and only group non-overlapped contigous small files

* test: add one more test for group contiguous files:

* refactor: address review comments

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Nga Tran 2022-06-29 16:09:51 -04:00 committed by GitHub
parent bacd2ea470
commit 0cca975167
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 509 additions and 93 deletions

View File

@ -464,7 +464,7 @@ impl Compactor {
compact_and_upgrade.sequencer_id = Some(parquet_files[0].sequencer_id);
// Group overlapped files
// Each group will be limited by thier size and number of files
// Each group will be limited by their size and number of files
let overlapped_file_groups = Self::overlapped_groups(
parquet_files,
compaction_max_size_bytes,
@ -669,7 +669,8 @@ impl Compactor {
let mut current_size = 0;
let mut current_num_files = 0;
for g in file_groups {
if current_size + g.total_file_size_bytes < compaction_max_size_bytes
if !g.overlapped_with_other_groups // only group non-overlapped ones
&& current_size + g.total_file_size_bytes < compaction_max_size_bytes
&& current_num_files + g.parquet_files.len()
< compaction_max_file_count.try_into().unwrap()
{
@ -935,21 +936,106 @@ impl Compactor {
}
// Split overlapped groups into smaller groups if there are so mnay files in each group or
// their size are too large. The files are sorted by their min time before splitting so files
// are guarannteed to be overlapped in each new group
// their total size is too large.
//
// Invariants
// 1. Input `groups`, each contains overlapped files but the groups do not overlap with each other.
// As a result, a group can include files whose total size is larger than our maximum allocated memory.
// 2. Output will be in the OverlappedGroups type that includes 2 different kinds of file groups:
//    . internal_overlapped_groups: groups of the input that neither contain too many files
//       nor have a too-large total file size. Each of these groups does not overlap with any other group.
//    . external_overlapped_groups: subgroups produced by splitting input groups that are too large
//       or contain too many files. Each of these subgroups will overlap with at least one of the
//       other subgroups. However, to correctly deduplicate data, each subgroup only includes
//       contiguous sequence numbers.
//
// Example:
// The input `groups` consists of 4 groups covering 9 chunks with time ranges as below.
// The chunks are named using their sequence number. `C4` means this chunk includes
// data of sequence number 4 (Note: a chunk can include data of a range of sequence numbers
// but to simplify the example, we make each chunk contain only one sequence number)
// . G1: includes one chunk C4
// . G2: includes 4 overlapped chunks C1, C2, C3, C6
// . G3: includes 3 overlapped chunks C5, C8, C9
// . G4: includes C7
// Time
// ────────────────────────────────────────────────────────────────────────────────────▶
//
// ┌───────┐ ┌───────────┐ ┌───────┐ ┌────────┐ ┌──────────┐
// │ C4 │ │ C1 │ │ C2 │ │ C5 │ │ C7 │
// └───────┘ └───────────┘ └───────┘ └────────┘ └──────────┘
// ┌───────┐ ┌────────────┐
// │ C3 │ │ C8 │
// └───────┘ └────────────┘
// ┌─────────────┐ ┌───────────┐
// │ C6 │ │ C9 │
// └─────────────┘ └───────────┘
//
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
// G1[C4] G2[C1, C2, C3, C6] G3[C5, C8, C9] G4[C7]
//
//
// Since the total size of files in G2 and G3 is over the limit `max_size_bytes`, they are split
// further, each into 2 groups. The output will include 6 groups as follows:
// Time
// ────────────────────────────────────────────────────────────────────────────────────▶
//
// ┌───────┐ ┌───────────┐ ┌───────┐ ┌────────┐ ┌──────────┐
// │ C4 │ │ C1 │ │ C2 │ │ C5 │ │ C7 │
// └───────┘ └───────────┘ └───────┘ └────────┘ └──────────┘
// ┌───────┐ ┌────────────┐
// │ C3 │ │ C8 │
// └───────┘ └────────────┘
// ┌─────────────┐ ┌───────────┐
// │ C6 │ │ C9 │
// └─────────────┘ └───────────┘
//
// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
// G1[C4] G2[C1, C2]* G4[C5, C8]* G6[C7]
// G3[C3, C6]* G5[C9]*
//
// The newly split groups are G2, G3, G4 and G5, each marked with a star (*) at the end.
// Before splitting, the chunks are sorted by their sequence numbers to guarantee their data
// will be deduplicated correctly in this and future compaction cycles. As a consequence,
// a new group may include non-overlapped chunks, e.g. G2. But this is not an issue because:
//   . The compaction plan will discover whether the chunks overlap or not to avoid the
//      actual deduplication work
//   . We do want to compact/concat small contiguous non-overlapped chunks (in later steps) so
//      this can be considered as a part of that optimization
fn split_overlapped_groups(
groups: &mut Vec<Vec<ParquetFile>>,
max_size_bytes: i64,
max_file_count: i64,
) -> Vec<Vec<ParquetFile>> {
let mut overlapped_groups: Vec<Vec<ParquetFile>> = Vec::with_capacity(groups.len() * 2);
) -> OverlappedGroups {
let mut overlapped_groups = OverlappedGroups::new(groups.len() * 2);
let max_count = max_file_count.try_into().unwrap();
for group in groups {
let total_size_bytes: i64 = group.iter().map(|f| f.file_size_bytes).sum();
if group.len() <= max_count && total_size_bytes <= max_size_bytes {
overlapped_groups.push(group.to_vec());
if group.len() == 1 || (group.len() <= max_count && total_size_bytes <= max_size_bytes)
{
overlapped_groups.add_internal_overalapped_group(group.to_vec());
} else {
group.sort_by_key(|f| f.min_time);
// Sort overlapped files on their min sequence number to ensure their split subgroups
// contain contiguous sequence numbers
group.sort_by_key(|f| f.min_sequence_number);
// Verify that the sorted ranges of [min_sequence_number, max_sequence_number] do not overlap
for i in 1..group.len() {
if group[i - 1].max_sequence_number >= group[i].min_sequence_number {
panic!(
"Two files with overlapped range sequence numbers. \
File id 1: {}, file id 2: {}, sequence number range 1: [{}, {}], \
sequence number range 2: [{}, {}], partition id: {}",
group[i - 1].id,
group[i].id,
group[i - 1].min_sequence_number.get(),
group[i - 1].max_sequence_number.get(),
group[i].min_sequence_number.get(),
group[i].max_sequence_number.get(),
group[i].partition_id,
)
}
}
while !group.is_empty() {
// limit file num
let mut count = max_count;
@ -973,7 +1059,7 @@ impl Compactor {
count = group.len();
}
let group_new = group.split_off(count);
overlapped_groups.push(group.to_vec());
overlapped_groups.add_external_overalapped_group(group.to_vec());
*group = group_new;
}
}
@ -984,7 +1070,7 @@ impl Compactor {
// Given a list of parquet files that come from the same Table Partition, group files together
// if their (min_time, max_time) ranges overlap. Does not preserve or guarantee any ordering.
// If there are so mnay files in an overlapped group, the group will be split to ensure each
// If there are so many files in an overlapped group, the group will be split to ensure each
// group contains limited number of files
fn overlapped_groups(
parquet_files: Vec<ParquetFile>,
@ -999,26 +1085,7 @@ impl Compactor {
let overlapped_groups =
Self::split_overlapped_groups(&mut overlapped_groups, max_size_bytes, max_file_count);
// Compute min time and total size for each overlapped group
let mut groups_with_min_time_and_size = Vec::with_capacity(overlapped_groups.len());
for group in overlapped_groups {
let mut group_with_min_time_and_size = GroupWithMinTimeAndSize {
parquet_files: Vec::with_capacity(group.len()),
min_time: Timestamp::new(i64::MAX),
total_file_size_bytes: 0,
};
for file in group {
group_with_min_time_and_size.min_time =
group_with_min_time_and_size.min_time.min(file.min_time);
group_with_min_time_and_size.total_file_size_bytes += file.file_size_bytes;
group_with_min_time_and_size.parquet_files.push(file);
}
groups_with_min_time_and_size.push(group_with_min_time_and_size);
}
groups_with_min_time_and_size
overlapped_groups.groups_with_min_time_and_size()
}
// Compute time to split data
@ -1233,6 +1300,48 @@ impl Compactor {
}
}
/// Result of splitting overlapped file groups: separates groups that remain
/// self-contained from subgroups that overlap their sibling subgroups.
#[derive(Debug, Clone, PartialEq)]
struct OverlappedGroups {
    // Groups whose files overlap each other, but which do not overlap
    // with any other group
    internal_overlapped_groups: Vec<Vec<ParquetFile>>,
    // Groups that overlap with at least one other group
    external_overlapped_groups: Vec<Vec<ParquetFile>>,
}

impl OverlappedGroups {
    /// Create empty containers, each pre-sized to hold `len` groups.
    pub fn new(len: usize) -> Self {
        Self {
            external_overlapped_groups: Vec::with_capacity(len),
            internal_overlapped_groups: Vec::with_capacity(len),
        }
    }

    /// Record a group that does not overlap with any other group.
    pub fn add_internal_overalapped_group(&mut self, group: Vec<ParquetFile>) {
        self.internal_overlapped_groups.push(group);
    }

    /// Record a subgroup that overlaps with at least one sibling subgroup.
    pub fn add_external_overalapped_group(&mut self, group: Vec<ParquetFile>) {
        self.external_overlapped_groups.push(group);
    }

    /// Consume both collections and convert every group into a
    /// `GroupWithMinTimeAndSize`, tagging it with whether it overlaps other
    /// groups. Internal (non-overlapping) groups come first in the output.
    pub fn groups_with_min_time_and_size(self) -> Vec<GroupWithMinTimeAndSize> {
        let internal = self
            .internal_overlapped_groups
            .into_iter()
            .map(|group| GroupWithMinTimeAndSize::new(group, false));
        let external = self
            .external_overlapped_groups
            .into_iter()
            .map(|group| GroupWithMinTimeAndSize::new(group, true));
        internal.chain(external).collect()
    }
}
/// Summary information for a partition that is a candidate for compaction.
#[derive(Debug, Clone, Eq, PartialEq)]
#[allow(missing_copy_implementations)] // shouldn't be silently cloned
@ -2242,9 +2351,11 @@ mod tests {
arbitrary_parquet_file_with_size(min_time, max_time, 100)
}
fn arbitrary_parquet_file_with_size(
fn arbitrary_parquet_file_with_size_and_sequence_number(
min_time: i64,
max_time: i64,
min_seq: i64,
max_seq: i64,
file_size_bytes: i64,
) -> ParquetFile {
let id = NEXT_ID.fetch_add(1, Ordering::SeqCst);
@ -2255,8 +2366,8 @@ mod tests {
table_id: TableId::new(0),
partition_id: PartitionId::new(0),
object_store_id: Uuid::new_v4(),
min_sequence_number: SequenceNumber::new(0),
max_sequence_number: SequenceNumber::new(1),
min_sequence_number: SequenceNumber::new(min_seq),
max_sequence_number: SequenceNumber::new(max_seq),
min_time: Timestamp::new(min_time),
max_time: Timestamp::new(max_time),
to_delete: None,
@ -2268,6 +2379,20 @@ mod tests {
}
}
/// Build an arbitrary test parquet file covering `[min_time, max_time]` with
/// the given file size, using a fixed default sequence number range of [0, 1].
fn arbitrary_parquet_file_with_size(
    min_time: i64,
    max_time: i64,
    file_size_bytes: i64,
) -> ParquetFile {
    // Tests that care about sequence numbers call the *_and_sequence_number
    // variant directly; everything else gets this default range.
    const DEFAULT_MIN_SEQ: i64 = 0;
    const DEFAULT_MAX_SEQ: i64 = 1;

    arbitrary_parquet_file_with_size_and_sequence_number(
        min_time,
        max_time,
        DEFAULT_MIN_SEQ,
        DEFAULT_MAX_SEQ,
        file_size_bytes,
    )
}
#[tokio::test]
async fn test_sort_queryable_parquet_chunk() {
let catalog = TestCatalog::new();
@ -2534,11 +2659,13 @@ mod tests {
parquet_files: vec![pf1.clone()],
min_time: Timestamp::new(1),
total_file_size_bytes: 100,
overlapped_with_other_groups: false,
};
let g2 = GroupWithMinTimeAndSize {
parquet_files: vec![pf2.clone()],
min_time: Timestamp::new(3),
total_file_size_bytes: 200,
overlapped_with_other_groups: false,
};
// They should each be in their own group
assert_eq!(overlapped_groups, vec![g1, g2]);
@ -2555,17 +2682,37 @@ mod tests {
assert_eq!(groups, vec![vec![pf1, pf2]]);
}
// split_overlapped_groups panics when two files within one group have
// overlapping sequence number ranges: such files cannot be split into
// subgroups of contiguous sequence numbers, so data could not be
// deduplicated correctly.
#[test]
#[should_panic]
fn test_split_overlapped_groups_panic() {
    let max_size_bytes = 1000;
    let max_file_count = 2;
    // 2 files, each smaller than max_size_bytes but total is larger --> will be split
    // files have overlapped range of sequence numbers: [5, 11] and [7, 15]
    let f1 =
        arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 11, max_size_bytes - 10);
    let f2 =
        arbitrary_parquet_file_with_size_and_sequence_number(7, 11, 7, 15, max_size_bytes - 10);
    let mut groups = vec![vec![f1, f2]];
    // will panic because the sequence number ranges [5, 11] and [7, 15] overlap
    Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count);
}
// This is specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files
// Keep all the variables the same names in both tests for us to follow them easily
#[test]
fn test_split_overlapped_groups() {
fn test_split_overlapped_groups_sequence_number_same_order_with_time() {
let compaction_max_size_bytes = 100000;
// oldest overlapped and very small
let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 400);
let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 500);
let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400);
let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500);
let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 400);
let contained_completely_within =
arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500);
let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400);
let min_equals_max =
arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500);
let oldest_overlapped_group = vec![
overlaps_many.clone(),
contained_completely_within.clone(),
@ -2574,12 +2721,102 @@ mod tests {
];
// newest files and very large
let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group
let alone = arbitrary_parquet_file_with_size_and_sequence_number(
30,
35,
30,
30,
compaction_max_size_bytes + 200,
); // too large to group
let newest_overlapped_group = vec![alone.clone()];
// small files in the middle
let another = arbitrary_parquet_file_with_size(13, 15, 1000);
let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000);
let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 13, 13, 1000);
let partial_overlap =
arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 2000);
let middle_overlapped_group = vec![another.clone(), partial_overlap.clone()];
let mut overlapped_groups = vec![
oldest_overlapped_group,
newest_overlapped_group,
middle_overlapped_group,
];
let max_size_bytes = 1000;
let max_file_count = 2;
// Three input groups but will produce 5 output ones because of the limits on size and file count.
// Note that the 3 input groups each include overlapped files but the groups do not overlap.
// The function split_overlapped_groups splits each overlapped group. It does not merge any groups.
let groups = Compactor::split_overlapped_groups(
&mut overlapped_groups,
max_size_bytes,
max_file_count,
);
// one intact group
assert_eq!(groups.internal_overlapped_groups.len(), 1);
// four newly splitted groups from two large groups
assert_eq!(groups.external_overlapped_groups.len(), 4);
// oldest_overlapped_group was split into external_overlapped_groups[0] and [1] due to file count limit
// the chunks split by their min_seq order
assert_eq!(groups.external_overlapped_groups[0].len(), 2); // reach limit file count
assert!(groups.external_overlapped_groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3
assert!(groups.external_overlapped_groups[0].contains(&overlaps_many)); // min_time = 5, min_seq = 5
assert_eq!(groups.external_overlapped_groups[1].len(), 2); // reach limit file count
assert!(groups.external_overlapped_groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6
assert!(groups.external_overlapped_groups[1].contains(&min_equals_max)); // min_time = 10, min_seq = 10
// newest_overlapped_group stays the same length one file and the corresponding output is
// groups.internal_overlapped_groups[0]
assert_eq!(groups.internal_overlapped_groups[0].len(), 1); // reach limit file size
assert!(groups.internal_overlapped_groups[0].contains(&alone));
// middle_overlapped_group was split into groups.external_overlapped_groups[2] and [3] due to size limit
// the chunks split by their min_seq order
assert_eq!(groups.external_overlapped_groups[2].len(), 1); // reach limit file size
assert!(groups.external_overlapped_groups[2].contains(&another)); // min_time = 13, min_seq = 13
assert_eq!(groups.external_overlapped_groups[3].len(), 1); // reach limit file size
assert!(groups.external_overlapped_groups[3].contains(&partial_overlap));
// min_time = 14, min_seq = 14
}
// This is specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files
// Keep all the variables the same names in both tests for us to follow them easily
#[test]
fn test_split_overlapped_groups_sequence_number_different_order_from_time() {
let compaction_max_size_bytes = 100000;
// oldest overlapped and very small
let overlaps_many =
arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 10, 10, 400);
let contained_completely_within =
arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500);
let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400);
let min_equals_max =
arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 5, 5, 500);
let oldest_overlapped_group = vec![
overlaps_many.clone(),
contained_completely_within.clone(),
max_equals_min.clone(),
min_equals_max.clone(),
];
// newest files and very large
let alone = arbitrary_parquet_file_with_size_and_sequence_number(
30,
35,
30,
30,
compaction_max_size_bytes + 200,
); // too large to group
let newest_overlapped_group = vec![alone.clone()];
// small files in the middle
let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 14, 14, 1000);
let partial_overlap =
arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 13, 13, 2000);
let middle_overlapped_group = vec![another.clone(), partial_overlap.clone()];
let mut overlapped_groups = vec![
@ -2600,26 +2837,31 @@ mod tests {
max_file_count,
);
// must be 5
assert_eq!(groups.len(), 5);
// one intact group
assert_eq!(groups.internal_overlapped_groups.len(), 1);
// four newly splitted groups from two large groups
assert_eq!(groups.external_overlapped_groups.len(), 4);
// oldest_overlapped_group was split into groups[0] and groups[1] due to file count limit
assert_eq!(groups[0].len(), 2); // reach limit file count
assert!(groups[0].contains(&max_equals_min)); // min_time = 3
assert!(groups[0].contains(&overlaps_many)); // min_time = 5
assert_eq!(groups[1].len(), 2); // reach limit file count
assert!(groups[1].contains(&contained_completely_within)); // min_time = 6
assert!(groups[1].contains(&min_equals_max)); // min_time = 10
// oldest_overlapped_group was split into external_overlapped_groups[0] and [1] due to file count limit
// the chunks split by their min_seq order
assert_eq!(groups.external_overlapped_groups[0].len(), 2); // reach limit file count
assert!(groups.external_overlapped_groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3
assert!(groups.external_overlapped_groups[0].contains(&min_equals_max)); // min_time = 10, min_seq = 5
assert_eq!(groups.external_overlapped_groups[1].len(), 2); // reach limit file count
assert!(groups.external_overlapped_groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6
assert!(groups.external_overlapped_groups[1].contains(&overlaps_many)); // min_time = 5, min_seq = 10
// newest_overlapped_group stays the same length one file and the corresponding output is groups[2]
assert_eq!(groups[2].len(), 1); // reach limit file size
assert!(groups[2].contains(&alone));
// newest_overlapped_group stays the same length one file and the corresponding output is
// groups.internal_overlapped_groups[0]
assert_eq!(groups.internal_overlapped_groups[0].len(), 1); // reach limit file size
assert!(groups.internal_overlapped_groups[0].contains(&alone));
// middle_overlapped_group was split into groups[3] and groups[4] due to size limit
assert_eq!(groups[3].len(), 1); // reach limit file size
assert!(groups[3].contains(&another)); // min_time = 13
assert_eq!(groups[4].len(), 1); // reach limit file size
assert!(groups[4].contains(&partial_overlap)); // min_time = 14
// middle_overlapped_group was split into groups.external_overlapped_groups[2] and [3] due to size limit
// the chunks split by their min_seq order
assert_eq!(groups.external_overlapped_groups[2].len(), 1); // reach limit file size
assert!(groups.external_overlapped_groups[2].contains(&partial_overlap)); // min_time = 14, min_seq = 13
assert_eq!(groups.external_overlapped_groups[3].len(), 1); // reach limit file size
assert!(groups.external_overlapped_groups[3].contains(&another)); // min_time = 13, min_seq = 14
}
// This tests
@ -2632,17 +2874,27 @@ mod tests {
let compaction_max_size_bytes = 100000;
// oldest overlapped and very small
let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 400);
let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 500);
let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400);
let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500);
let overlaps_many =
arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 10, 10, 400);
let contained_completely_within =
arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500);
let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400);
let min_equals_max =
arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 5, 5, 500);
// newest files and very large
let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group
let alone = arbitrary_parquet_file_with_size_and_sequence_number(
30,
35,
30,
30,
compaction_max_size_bytes + 200,
); // too large to group
// small files in the middle
let another = arbitrary_parquet_file_with_size(13, 15, 1000);
let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000);
let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 14, 14, 1000);
let partial_overlap =
arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 13, 13, 2000);
// Given a bunch of files in an arbitrary order,
let all = vec![
@ -2661,14 +2913,31 @@ mod tests {
let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count);
// Must be 5
assert_eq!(overlapped_groups.len(), 5);
assert_eq!(overlapped_groups[0].parquet_files.len(), 2); // reach limit file count
// Non-overlapped with other groups always first
// Group 0: newest files and very large
assert_eq!(overlapped_groups[0].parquet_files.len(), 1); // reach limit file size
assert!(overlapped_groups[0].parquet_files.contains(&alone)); // min_time = 30
// The rest of the groups, each overlapp with another group
// Groups 1 & 2: split from `oldest overlapped and very small` ordering by sequence numbers
assert_eq!(overlapped_groups[1].parquet_files.len(), 2); // reach limit file count
assert_eq!(overlapped_groups[2].parquet_files.len(), 1); // reach limit file size
assert!(overlapped_groups[1].parquet_files.contains(&max_equals_min)); // min_time = 3, min_seq = 3
assert!(overlapped_groups[1].parquet_files.contains(&min_equals_max)); // min_time = 10, min_seq = 5
assert_eq!(overlapped_groups[2].parquet_files.len(), 2); // reach limit file count
assert!(overlapped_groups[2]
.parquet_files
.contains(&contained_completely_within)); // min_time = 6, min_seq = 6
assert!(overlapped_groups[2].parquet_files.contains(&overlaps_many)); // min_time = 5, min_seq = 10
// Groups 3 & 4: split from `small files in the middle` ordering by sequence numbers
assert_eq!(overlapped_groups[3].parquet_files.len(), 1); // reach limit file size
assert!(overlapped_groups[3]
.parquet_files
.contains(&partial_overlap)); // min_time = 14, min_seq = 13
assert_eq!(overlapped_groups[4].parquet_files.len(), 1); // reach limit file size
assert!(overlapped_groups[4].parquet_files.contains(&another)); // min_time = 13, min_seq = 14
// Group further into group by size and file count limit
// Due to the merge with correct time range, this function has to sort the groups hence output data will be in time order
// Due to the merge with correct time range, this function has to sort the groups hence output data will be
// in time order between each group and in sequence number order within a each group
let groups = Compactor::group_small_contiguous_groups(
overlapped_groups,
compaction_max_size_bytes,
@ -2679,21 +2948,21 @@ mod tests {
assert_eq!(groups.len(), 5);
assert_eq!(groups[0].len(), 2); // reach file num limit
assert!(groups[0].contains(&max_equals_min)); // min_time = 3
assert!(groups[0].contains(&overlaps_many)); // min_time = 5
assert!(groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3
assert!(groups[0].contains(&min_equals_max)); // min_time = 10, min_seq = 5
assert_eq!(groups[1].len(), 2); // reach file num limit
assert!(groups[1].contains(&contained_completely_within)); // min_time = 6
assert!(groups[1].contains(&min_equals_max)); // min_time = 10
assert!(groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6
assert!(groups[1].contains(&overlaps_many)); // min_time = 5, min_seq = 10
assert_eq!(groups[2].len(), 1); // reach size limit
assert!(groups[2].contains(&another)); // min_time = 13
assert!(groups[2].contains(&another)); // min_time = 13, min_seq = 14
assert_eq!(groups[3].len(), 1); // reach size limit
assert!(groups[3].contains(&partial_overlap)); // min_time = 14
assert!(groups[3].contains(&partial_overlap)); // min_time = 14, min_seq = 13
assert_eq!(groups[4].len(), 1); // reach size limit
assert!(groups[4].contains(&alone));
assert!(groups[4].contains(&alone)); // min_time=30
}
// This tests
@ -2718,11 +2987,13 @@ mod tests {
parquet_files: vec![pf1.clone()],
min_time: Timestamp::new(1),
total_file_size_bytes: 100,
overlapped_with_other_groups: false,
};
let g2 = GroupWithMinTimeAndSize {
parquet_files: vec![pf2.clone()],
min_time: Timestamp::new(3),
total_file_size_bytes: compaction_max_size_bytes,
overlapped_with_other_groups: false,
};
// They should each be in their own group
assert_eq!(overlapped_groups, vec![g1, g2]);
@ -2774,6 +3045,7 @@ mod tests {
// Group into overlapped groups
let overlapped_groups =
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
// There are 3 overlapped groups but each group does not overlap with the other
assert_eq!(overlapped_groups.len(), 3);
// group further into group by size
@ -2804,18 +3076,39 @@ mod tests {
// 2. group_small_contiguous_groups that merges non-overlapped group into a larger one if they meet size and file limit
#[test]
fn test_group_small_contiguous_overlapped_groups_many_files_too_large() {
// oldest overlapped and large
let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 200);
let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 300);
let max_equals_min = arbitrary_parquet_file_with_size(3, 5, TEST_MAX_SIZE_BYTES + 400); // too large to group
let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500);
// oldest overlapped and very small
let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 200);
let contained_completely_within =
arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 300);
let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(
3,
5,
3,
3,
TEST_MAX_SIZE_BYTES + 400,
); // too large to group
let min_equals_max =
arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500);
// newest files and large
let alone = arbitrary_parquet_file_with_size(30, 35, TEST_MAX_SIZE_BYTES); // too large to group
// newest file and large
let alone = arbitrary_parquet_file_with_size_and_sequence_number(
30,
35,
30,
30,
TEST_MAX_SIZE_BYTES,
); // too large to group
// files in the middle and also large
let another = arbitrary_parquet_file_with_size(13, 15, TEST_MAX_SIZE_BYTES); // too large to group
let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000);
// large files in the middle
let another = arbitrary_parquet_file_with_size_and_sequence_number(
13,
15,
13,
13,
TEST_MAX_SIZE_BYTES,
); // too large to group
let partial_overlap =
arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 2000);
// Given a bunch of files in an arbitrary order
let all = vec![
@ -2863,17 +3156,26 @@ mod tests {
#[test]
fn test_group_small_contiguous_overlapped_groups_many_files_middle_too_large() {
// oldest overlapped and very small
let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 200);
let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 300);
let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400);
let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500);
let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 200);
let contained_completely_within =
arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 300);
let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400);
let min_equals_max =
arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500);
// newest files and small
let alone = arbitrary_parquet_file_with_size(30, 35, 200);
let alone = arbitrary_parquet_file_with_size_and_sequence_number(30, 35, 30, 30, 200);
// large files in the middle
let another = arbitrary_parquet_file_with_size(13, 15, TEST_MAX_SIZE_BYTES); // too large to group
let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000);
let another = arbitrary_parquet_file_with_size_and_sequence_number(
13,
15,
13,
13,
TEST_MAX_SIZE_BYTES,
); // too large to group
let partial_overlap =
arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 2000);
// Given a bunch of files in an arbitrary order
let all = vec![
@ -2915,6 +3217,93 @@ mod tests {
assert!(groups[2].contains(&alone)); // min _time = 30
}
// Tests the two-step pipeline: Compactor::overlapped_groups splits an
// over-the-file-count-limit overlapped group, then
// Compactor::group_small_contiguous_groups merges small contiguous
// non-overlapped groups back together.
#[test]
fn test_group_small_contiguous_overlapped_groups_with_split_overlap_groups() {
    // oldest overlapped
    // These 4 overlapped files exceed max count --> will be split into 2 overlapped groups
    let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 400);
    let contained_completely_within =
        arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500);
    let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400);
    let min_equals_max =
        arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500);
    // newest files
    // one lone file that does not overlap with any other files
    // --> will be in its own overlapped group after Compactor::overlapped_groups
    // Since it is small --> it will be grouped with the middle_overlapped_group after Compactor::group_small_contiguous_groups
    let alone = arbitrary_parquet_file_with_size_and_sequence_number(30, 35, 30, 30, 50);
    // very small files in the middle
    // these 2 files overlap but are small so no further split
    let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 13, 13, 10);
    let partial_overlap =
        arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 20);
    // Given a bunch of files in an arbitrary order
    let all = vec![
        min_equals_max.clone(),
        overlaps_many.clone(),
        alone.clone(),
        another.clone(),
        max_equals_min.clone(),
        contained_completely_within.clone(),
        partial_overlap.clone(),
    ];
    let max_size_bytes = 1000;
    let max_file_count = 2;
    // Group into overlapped groups
    let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count);
    assert_eq!(overlapped_groups.len(), 4);
    // Must be 4
    // NOTE(review): this assertion duplicates the one directly above
    assert_eq!(overlapped_groups.len(), 4);
    // 2 groups that do not overlap with other groups are always first
    // Group 0: `very small files in the middle`
    assert_eq!(overlapped_groups[0].parquet_files.len(), 2);
    assert!(overlapped_groups[0].parquet_files.contains(&another)); // min_time = 13, min_seq = 13
    assert!(overlapped_groups[0]
        .parquet_files
        .contains(&partial_overlap)); // min_time = 14, min_seq = 14
    // Group 1: `newest files`
    assert_eq!(overlapped_groups[1].parquet_files.len(), 1);
    assert!(overlapped_groups[1].parquet_files.contains(&alone)); // min_time = 30
    // Next 2 groups are split from `oldest overlapped`
    assert_eq!(overlapped_groups[2].parquet_files.len(), 2);
    assert!(overlapped_groups[2].parquet_files.contains(&max_equals_min)); // min_time = 3, min_seq = 3
    assert!(overlapped_groups[2].parquet_files.contains(&overlaps_many)); // min_time = 5, min_seq = 5
    assert_eq!(overlapped_groups[3].parquet_files.len(), 2);
    assert!(overlapped_groups[3]
        .parquet_files
        .contains(&contained_completely_within)); // min_time = 6, min_seq = 6
    assert!(overlapped_groups[3].parquet_files.contains(&min_equals_max)); // min_time = 10, min_seq = 10
    // 4 input groups but 3 output groups. Group 0 and Group 1 above are further grouped together
    let groups = Compactor::group_small_contiguous_groups(
        overlapped_groups,
        max_size_bytes,
        max_file_count + 5, // increase limit file count to get them grouped
    );
    assert_eq!(groups.len(), 3);
    // The output group is in time order
    // groups[0] the same as overlapped_groups[2]
    assert_eq!(groups[0].len(), 2);
    assert!(groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3
    assert!(groups[0].contains(&overlaps_many)); // min_time = 5, min_seq = 5
    // groups[1] the same as overlapped_groups[3]
    assert_eq!(groups[1].len(), 2);
    assert!(groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6
    assert!(groups[1].contains(&min_equals_max)); // min_time = 10, min_seq = 10
    // group[2] is the result of grouping overlapped_groups[0] and overlapped_groups[1]
    assert_eq!(groups[2].len(), 3);
    assert!(groups[2].contains(&another)); // min_time = 13, min_seq = 13
    assert!(groups[2].contains(&partial_overlap)); // min_time = 14, min_seq = 14
    assert!(groups[2].contains(&alone)); // min_time = 30
}
#[tokio::test]
async fn add_tombstones_to_parquet_files_in_groups() {
let catalog = TestCatalog::new();

View File

@ -45,6 +45,33 @@ pub struct GroupWithMinTimeAndSize {
/// total size of all file
pub(crate) total_file_size_bytes: i64,
/// true if this group was split from a group of many overlapped files
pub(crate) overlapped_with_other_groups: bool,
}
impl GroupWithMinTimeAndSize {
    /// Make GroupWithMinTimeAndSize for a given set of parquet files.
    ///
    /// Computes the group's minimum `min_time` across all files and the sum of
    /// all file sizes. `overlaped` marks whether this group overlaps with
    /// other groups.
    ///
    /// # Panics
    ///
    /// Panics if `files` is empty.
    pub fn new(files: Vec<ParquetFile>, overlaped: bool) -> Self {
        assert!(
            !files.is_empty(),
            "invalid empty group for computing min time and total size"
        );

        // Single pass over the files accumulating both statistics at once.
        let (min_time, total_file_size_bytes) = files.iter().fold(
            (Timestamp::new(i64::MAX), 0),
            |(min_time, total_size), file| {
                (min_time.min(file.min_time), total_size + file.file_size_bytes)
            },
        );

        Self {
            parquet_files: files,
            min_time,
            total_file_size_bytes,
            overlapped_with_other_groups: overlaped,
        }
    }
}
/// Wrapper of a parquet file and its tombstones