fix: avoid combing groups that overlap with other groups even if they are small (#5052)
* fix: avoid combing groups that overlap with other groups even if they are small * chore: Apply suggestions from code review Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
e193913ed3
commit
425b8a63cf
|
@ -673,6 +673,48 @@ impl Compactor {
|
|||
}
|
||||
|
||||
// Group time-contiguous non-overlapped groups if their total size is smaller than a threshold
|
||||
// There are 2 types of input groups
|
||||
// 1. Type-1: Groups that inlcude overlapped files but the groups do not overlap with other groups
|
||||
// 2. Type-2: Groups that overlap with other group
|
||||
// We can only combine time-contiguous type-1 groups to avoid a buggy situation where the groups overlap
|
||||
// in both time and sequence numbers
|
||||
//
|
||||
// Example:
|
||||
// n_m: chunk with range of sequence numbers [n, m] where n <= m
|
||||
//
|
||||
// Input: 5 groups
|
||||
// . 3 type-1 groups: [1_1, 2_2], [4_4], [7_7],
|
||||
// . 2 type-2 groups: [5_5], [6_6]
|
||||
//
|
||||
// │ │ │ Time
|
||||
// ──────────────────────────────────────────────────────────────▶
|
||||
// ┌─────┐ │ ┌─────┐ │ ┌─────────────────┐ │ ┌─────┐
|
||||
// │ 1_1 │ │ 7_7 │ │ 5_5(*) │ │ 4_4 │
|
||||
// └─────┘ │ └─────┘ │ └─────────────────┘ │ └─────┘
|
||||
// ┌─────┐ ┌──────┐
|
||||
// │ 2_2 │ │ │ │ │6_6(*)│ │
|
||||
// └─────┘ └──────┘
|
||||
// │ │ │ │
|
||||
//
|
||||
// Output: 4 groups
|
||||
// . [1_1, 2_2, 7_7]
|
||||
// . [5_5]
|
||||
// . [6_6]
|
||||
// . [4_4]
|
||||
// │ │ Time
|
||||
// ──────────────────────────────────────────────────────────────▶
|
||||
// ┌─────┐ ┌─────┐ │ ┌─────────────────┐ │ ┌─────┐
|
||||
// │ 1_1 │ │ 7_7 │ │ 5_5(*) │ │ 4_4 │
|
||||
// └─────┘ └─────┘ │ └─────────────────┘ │ └─────┘
|
||||
// ┌─────┐ ┌──────┐
|
||||
// │ 2_2 │ │ │ │6_6(*)│ │
|
||||
// └─────┘ └──────┘
|
||||
// │ │ │
|
||||
//
|
||||
// Note that even if groups [6_6] and [4_4] are very small, they cannot be combined
|
||||
// because chunk 6_6 overlaps with chunk 5_5. Combining [6_6] and [4_4] will lead to
|
||||
// a bad comacting result of 2 chunks 5_5 and 4_6 that overlap in both time and sequence numbers
|
||||
|
||||
fn group_small_contiguous_groups(
|
||||
mut file_groups: Vec<GroupWithMinTimeAndSize>,
|
||||
compaction_max_size_bytes: i64,
|
||||
|
@ -700,13 +742,25 @@ impl Compactor {
|
|||
current_group.extend(g.parquet_files);
|
||||
current_size += g.total_file_size_bytes;
|
||||
} else {
|
||||
// Current group cannot combine with it next one
|
||||
// Current group cannot be combined with its next one
|
||||
if !current_group.is_empty() {
|
||||
groups.push(current_group);
|
||||
}
|
||||
|
||||
// Create new current group
|
||||
current_num_files = g.parquet_files.len();
|
||||
current_group = g.parquet_files;
|
||||
current_size = g.total_file_size_bytes;
|
||||
|
||||
// If this current group overlaps with other groups, it cannot
|
||||
// be combined with any groups
|
||||
if g.overlapped_with_other_groups {
|
||||
groups.push(current_group);
|
||||
|
||||
current_group = vec![];
|
||||
current_num_files = 0;
|
||||
current_size = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3239,6 +3293,133 @@ mod tests {
|
|||
assert!(groups[4].contains(&alone)); // min_time = 30
|
||||
}
|
||||
|
||||
// bug: https://github.com/influxdata/conductor/issues/1013
|
||||
//
|
||||
// n_m: chunks with sequence number range [n_m] where n <= m
|
||||
//
|
||||
// This is the scenario that leads to bug
|
||||
// Chunks 2_2 and 3_3 overlap but 2_2 is too large to compact with any other chunks and they were
|
||||
// split and marked with (*) meaning they should not group with any other groups. However, due to the bug,
|
||||
// chunk 3_3 (small) was then wrongly grouped with chunk 1_1 and lead to wrong result of two
|
||||
// chunks, 1_3 and 2_2, that overlap in both time and sequence number
|
||||
//
|
||||
// Input
|
||||
// Time
|
||||
// ────────────────────────────────────────────▶
|
||||
// ┌─────────────────┐ ┌────────┐
|
||||
// │ 2_2(*) │ │ 1_1 │
|
||||
// └─────────────────┘ └────────┘
|
||||
// ┌────────┐
|
||||
// │ 3_3(*) │
|
||||
// └────────┘
|
||||
//
|
||||
// Wrong output: [3_3(*)] is combined with [1_1]
|
||||
// Right output: Nothing gets combined and the output should look the same as the input
|
||||
#[test]
|
||||
fn test_group_small_contiguous_groups_103() {
|
||||
let compaction_max_size_bytes = 100000;
|
||||
|
||||
// Given 3 files as in the input diagram
|
||||
let pf1 = arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 1, 1, 100);
|
||||
let pf2 = arbitrary_parquet_file_with_size_and_sequence_number(
|
||||
1,
|
||||
5,
|
||||
2,
|
||||
2,
|
||||
compaction_max_size_bytes,
|
||||
); // too large to group
|
||||
let pf3 = arbitrary_parquet_file_with_size_and_sequence_number(3, 6, 3, 3, 200);
|
||||
|
||||
let g1 = GroupWithMinTimeAndSize::new(vec![pf1], false);
|
||||
let g2 = GroupWithMinTimeAndSize::new(vec![pf2], true);
|
||||
let g3 = GroupWithMinTimeAndSize::new(vec![pf3], true);
|
||||
|
||||
let overlapped_groups = vec![g1, g2, g3];
|
||||
|
||||
// None of the 3 groups should be further combined
|
||||
let groups = Compactor::group_small_contiguous_groups(
|
||||
overlapped_groups,
|
||||
compaction_max_size_bytes,
|
||||
TEST_MAX_FILE_COUNT,
|
||||
);
|
||||
assert_eq!(groups.len(), 3);
|
||||
}
|
||||
|
||||
// Thorough tests for group_small_contiguous_groups
|
||||
// Input: 5 groups
|
||||
// . 3 type-1 groups: [1_1, 2_2], [4_4], [7_7],
|
||||
// . 2 type-2 groups: [5_5], [6_6]
|
||||
//
|
||||
// │ │ │ Time
|
||||
// ──────────────────────────────────────────────────────────────▶
|
||||
// ┌─────┐ │ ┌─────┐ │ ┌─────────────────┐ │ ┌─────┐
|
||||
// │ 1_1 │ │ 7_7 │ │ 5_5(*) │ │ 4_4 │
|
||||
// └─────┘ │ └─────┘ │ └─────────────────┘ │ └─────┘
|
||||
// ┌─────┐ ┌──────┐
|
||||
// │ 2_2 │ │ │ │ │6_6(*)│ │
|
||||
// └─────┘ └──────┘
|
||||
// │ │ │ │
|
||||
//
|
||||
// Output: 4 groups
|
||||
// . [1_1, 2_2, 7_7]
|
||||
// . [5_5]
|
||||
// . [6_6]
|
||||
// . [4_4]
|
||||
// │ │ Time
|
||||
// ──────────────────────────────────────────────────────────────▶
|
||||
// ┌─────┐ ┌─────┐ │ ┌─────────────────┐ │ ┌─────┐
|
||||
// │ 1_1 │ │ 7_7 │ │ 5_5(*) │ │ 4_4 │
|
||||
// └─────┘ └─────┘ │ └─────────────────┘ │ └─────┘
|
||||
// ┌─────┐ ┌──────┐
|
||||
// │ 2_2 │ │ │ │6_6(*)│ │
|
||||
// └─────┘ └──────┘
|
||||
// │ │ │
|
||||
#[test]
|
||||
fn test_group_small_contiguous_groups() {
|
||||
let compaction_max_size_bytes = 100000;
|
||||
|
||||
// Given 6 files as in the input diagram
|
||||
let pf1 = arbitrary_parquet_file_with_size_and_sequence_number(1, 5, 1, 1, 100);
|
||||
let pf2 = arbitrary_parquet_file_with_size_and_sequence_number(3, 6, 2, 2, 100);
|
||||
let pf4 = arbitrary_parquet_file_with_size_and_sequence_number(100, 110, 4, 4, 100);
|
||||
let pf5 = arbitrary_parquet_file_with_size_and_sequence_number(
|
||||
50,
|
||||
60,
|
||||
5,
|
||||
5,
|
||||
compaction_max_size_bytes,
|
||||
); // too large to group
|
||||
let pf6 = arbitrary_parquet_file_with_size_and_sequence_number(55, 65, 6, 6, 200);
|
||||
let pf7 = arbitrary_parquet_file_with_size_and_sequence_number(10, 20, 7, 7, 200);
|
||||
|
||||
// 6 files in in 5 groups
|
||||
let g1 = GroupWithMinTimeAndSize::new(vec![pf1.clone(), pf2.clone()], false);
|
||||
let g2 = GroupWithMinTimeAndSize::new(vec![pf4.clone()], false);
|
||||
let g3 = GroupWithMinTimeAndSize::new(vec![pf5.clone()], true);
|
||||
let g4 = GroupWithMinTimeAndSize::new(vec![pf6.clone()], true);
|
||||
let g5 = GroupWithMinTimeAndSize::new(vec![pf7.clone()], false);
|
||||
|
||||
let overlapped_groups = vec![g1, g2, g3, g4, g5];
|
||||
|
||||
// g1 and g5 will be combined
|
||||
let groups = Compactor::group_small_contiguous_groups(
|
||||
overlapped_groups,
|
||||
compaction_max_size_bytes,
|
||||
TEST_MAX_FILE_COUNT,
|
||||
);
|
||||
assert_eq!(groups.len(), 4);
|
||||
assert_eq!(groups[0].len(), 3);
|
||||
assert!(groups[0].contains(&pf1));
|
||||
assert!(groups[0].contains(&pf2));
|
||||
assert!(groups[0].contains(&pf7));
|
||||
assert_eq!(groups[1].len(), 1);
|
||||
assert!(groups[1].contains(&pf5));
|
||||
assert_eq!(groups[2].len(), 1);
|
||||
assert!(groups[2].contains(&pf6));
|
||||
assert_eq!(groups[3].len(), 1);
|
||||
assert!(groups[3].contains(&pf4));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_group_small_contiguous_overlapped_groups_many_files_middle_too_large() {
|
||||
// oldest overlapped and very small
|
||||
|
@ -3252,7 +3433,7 @@ mod tests {
|
|||
// newest files and small
|
||||
let alone = arbitrary_parquet_file_with_size_and_sequence_number(30, 35, 30, 30, 200);
|
||||
|
||||
// large files in the middle
|
||||
// large files in the middle
|
||||
let another = arbitrary_parquet_file_with_size_and_sequence_number(
|
||||
13,
|
||||
15,
|
||||
|
@ -3275,18 +3456,20 @@ mod tests {
|
|||
];
|
||||
|
||||
// Group into overlapped groups
|
||||
// Becasue file `another` is too large, it was split from its actulal overlapped `partial_overlap`.
|
||||
// This leads to actual 3 overlapped groups become 4 groups
|
||||
let overlapped_groups =
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
|
||||
assert_eq!(overlapped_groups.len(), 4);
|
||||
|
||||
// 4 input groups but 3 output groups
|
||||
// The last 2 groups will be grouped together because they are small
|
||||
// 4 input groups and still 4 output groups. Although the sorted last 2 groups [partial_overlap] and
|
||||
// [alone] are small, they cannot be combined because [partial_overlap] overlaps with [another]
|
||||
let groups = Compactor::group_small_contiguous_groups(
|
||||
overlapped_groups,
|
||||
TEST_MAX_SIZE_BYTES,
|
||||
TEST_MAX_FILE_COUNT,
|
||||
);
|
||||
assert_eq!(groups.len(), 3);
|
||||
assert_eq!(groups.len(), 4);
|
||||
|
||||
// first group includes 4 oldest files
|
||||
assert_eq!(groups[0].len(), 4);
|
||||
|
@ -3294,13 +3477,15 @@ mod tests {
|
|||
assert!(groups[0].contains(&overlaps_many)); // min _time = 5
|
||||
assert!(groups[0].contains(&contained_completely_within)); // min _time = 6
|
||||
assert!(groups[0].contains(&min_equals_max)); // min _time = 10
|
||||
// second group
|
||||
// second group includes the large file `another` of the middle time
|
||||
assert_eq!(groups[1].len(), 1);
|
||||
assert!(groups[1].contains(&another)); // min _time = 13
|
||||
// third group
|
||||
assert_eq!(groups[2].len(), 2);
|
||||
assert!(groups[2].contains(&partial_overlap)); // min _time = 3
|
||||
assert!(groups[2].contains(&alone)); // min _time = 30
|
||||
// third group includes the second file `partial_overlap` of the middle time
|
||||
assert_eq!(groups[2].len(), 1);
|
||||
assert!(groups[2].contains(&partial_overlap)); // min _time = 14
|
||||
// fourth group includes the newes one
|
||||
assert_eq!(groups[3].len(), 1);
|
||||
assert!(groups[3].contains(&alone)); // min _time = 30
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
Loading…
Reference in New Issue