refactor: convert a panic into an error and throw a warning if we choose non-actionable compacting candidates (#5041)
* refactor: convert a panic into an error and throw a warning if we choose non-actionable candidates * chore: Apply suggestions from code review Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org> * chore: run fmt Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>pull/24376/head
parent
1de022136c
commit
d8b74f6af8
|
@ -135,6 +135,28 @@ pub enum Error {
|
|||
Persist {
|
||||
source: parquet_file::storage::UploadError,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Two time-overlapped files with overlapped range sequence numbers. \
|
||||
File id 1: {}, file id 2: {}, sequence number range 1: [{}, {}], \
|
||||
sequence number range 2: [{}, {}], partition id: {}",
|
||||
file_id_1,
|
||||
file_id_2,
|
||||
min_seq_1,
|
||||
max_seq_1,
|
||||
min_seq_2,
|
||||
max_seq_2,
|
||||
partition_id,
|
||||
))]
|
||||
OverlapTimeAndSequenceNumber {
|
||||
file_id_1: ParquetFileId,
|
||||
file_id_2: ParquetFileId,
|
||||
min_seq_1: i64,
|
||||
max_seq_1: i64,
|
||||
min_seq_2: i64,
|
||||
max_seq_2: i64,
|
||||
partition_id: PartitionId,
|
||||
},
|
||||
}
|
||||
|
||||
/// A specialized `Error` for Compactor Data errors
|
||||
|
@ -467,7 +489,7 @@ impl Compactor {
|
|||
parquet_files,
|
||||
compaction_max_size_bytes,
|
||||
compaction_max_file_count,
|
||||
);
|
||||
)?;
|
||||
|
||||
// Group time-contiguous non-overlapped groups if their total size is smaller than a threshold
|
||||
// If their
|
||||
|
@ -1004,7 +1026,7 @@ impl Compactor {
|
|||
groups: &mut Vec<Vec<ParquetFile>>,
|
||||
max_size_bytes: i64,
|
||||
max_file_count: i64,
|
||||
) -> OverlappedGroups {
|
||||
) -> Result<OverlappedGroups> {
|
||||
let mut overlapped_groups = OverlappedGroups::new(groups.len() * 2);
|
||||
let max_count = max_file_count.try_into().unwrap();
|
||||
for group in groups {
|
||||
|
@ -1027,7 +1049,7 @@ impl Compactor {
|
|||
// Points 1 and 2 together will lead to many non-time-overlapped files with same sequence number ranges
|
||||
// can end up in the same time-overlapped group
|
||||
for i in 1..group.len() {
|
||||
Self::verify_contiguous_files(&group[i - 1], &group[i]);
|
||||
Self::verify_contiguous_files(&group[i - 1], &group[i])?
|
||||
}
|
||||
|
||||
while !group.is_empty() {
|
||||
|
@ -1059,13 +1081,13 @@ impl Compactor {
|
|||
}
|
||||
}
|
||||
|
||||
overlapped_groups
|
||||
Ok(overlapped_groups)
|
||||
}
|
||||
|
||||
// Panic if the two given time are both time-overlapped and sequence-number-overlapped
|
||||
// . file_1 and file_2 must belong to the same partition
|
||||
// . file_1 must have range sequence number <= file_s' range sequence number
|
||||
fn verify_contiguous_files(file_1: &ParquetFile, file_2: &ParquetFile) {
|
||||
fn verify_contiguous_files(file_1: &ParquetFile, file_2: &ParquetFile) -> Result<()> {
|
||||
assert!(file_1.partition_id == file_2.partition_id);
|
||||
assert!(file_1.min_sequence_number <= file_2.min_sequence_number);
|
||||
|
||||
|
@ -1074,19 +1096,18 @@ impl Compactor {
|
|||
|| (file_2.min_time >= file_1.min_time && file_2.min_time <= file_1.max_time))
|
||||
&& (file_1.max_sequence_number >= file_2.min_sequence_number)
|
||||
{
|
||||
panic!(
|
||||
"Two time-overlapped files with overlapped range sequence numbers. \
|
||||
File id 1: {}, file id 2: {}, sequence number range 1: [{}, {}], \
|
||||
sequence number range 2: [{}, {}], partition id: {}",
|
||||
file_1.id,
|
||||
file_2.id,
|
||||
file_1.min_sequence_number.get(),
|
||||
file_1.max_sequence_number.get(),
|
||||
file_2.min_sequence_number.get(),
|
||||
file_2.max_sequence_number.get(),
|
||||
file_1.partition_id,
|
||||
)
|
||||
return Err(crate::compact::Error::OverlapTimeAndSequenceNumber {
|
||||
file_id_1: file_1.id,
|
||||
file_id_2: file_2.id,
|
||||
min_seq_1: file_1.min_sequence_number.get(),
|
||||
max_seq_1: file_1.max_sequence_number.get(),
|
||||
min_seq_2: file_2.min_sequence_number.get(),
|
||||
max_seq_2: file_2.max_sequence_number.get(),
|
||||
partition_id: file_1.partition_id,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Given a list of parquet files that come from the same Table Partition, group files together
|
||||
|
@ -1097,16 +1118,16 @@ impl Compactor {
|
|||
parquet_files: Vec<ParquetFile>,
|
||||
max_size_bytes: i64,
|
||||
max_file_count: i64,
|
||||
) -> Vec<GroupWithMinTimeAndSize> {
|
||||
) -> Result<Vec<GroupWithMinTimeAndSize>> {
|
||||
// group overlap files
|
||||
let mut overlapped_groups =
|
||||
group_potential_duplicates(parquet_files).expect("Error grouping overlapped chunks");
|
||||
|
||||
// split overlapped groups into smaller groups if they include so many files
|
||||
let overlapped_groups =
|
||||
Self::split_overlapped_groups(&mut overlapped_groups, max_size_bytes, max_file_count);
|
||||
Self::split_overlapped_groups(&mut overlapped_groups, max_size_bytes, max_file_count)?;
|
||||
|
||||
overlapped_groups.groups_with_min_time_and_size()
|
||||
Ok(overlapped_groups.groups_with_min_time_and_size())
|
||||
}
|
||||
|
||||
// Compute time to split data
|
||||
|
@ -2564,7 +2585,8 @@ mod tests {
|
|||
vec![pf1.clone(), pf2.clone()],
|
||||
TEST_MAX_SIZE_BYTES,
|
||||
TEST_MAX_FILE_COUNT,
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// They should be 2 groups
|
||||
assert_eq!(groups.len(), 2, "There should have been two group");
|
||||
|
@ -2583,7 +2605,8 @@ mod tests {
|
|||
vec![pf1.clone(), pf2.clone()],
|
||||
TEST_MAX_SIZE_BYTES,
|
||||
TEST_MAX_FILE_COUNT,
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// They should be in one group (order not guaranteed)
|
||||
assert_eq!(groups.len(), 1, "There should have only been one group");
|
||||
|
@ -2622,7 +2645,7 @@ mod tests {
|
|||
];
|
||||
|
||||
let mut groups =
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
|
||||
dbg!(&groups);
|
||||
|
||||
assert_eq!(groups.len(), 3);
|
||||
|
@ -2687,7 +2710,8 @@ mod tests {
|
|||
vec![pf1.clone(), pf2.clone()],
|
||||
TEST_MAX_SIZE_BYTES,
|
||||
TEST_MAX_FILE_COUNT,
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
// 2 overlapped groups
|
||||
assert_eq!(overlapped_groups.len(), 2);
|
||||
let g1 = GroupWithMinTimeAndSize {
|
||||
|
@ -2718,7 +2742,6 @@ mod tests {
|
|||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_split_overlapped_groups_panic() {
|
||||
let max_size_bytes = 1000;
|
||||
let max_file_count = 2;
|
||||
|
@ -2731,8 +2754,12 @@ mod tests {
|
|||
arbitrary_parquet_file_with_size_and_sequence_number(7, 11, 7, 15, max_size_bytes - 10);
|
||||
let mut groups = vec![vec![f1, f2]];
|
||||
|
||||
// will panic becasue of
|
||||
Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count);
|
||||
// should error
|
||||
let err = Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count)
|
||||
.unwrap_err();
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("Two time-overlapped files with overlapped range sequence numbers"))
|
||||
}
|
||||
|
||||
// test of `https://github.com/influxdata/conductor/issues/1009`
|
||||
|
@ -2752,7 +2779,7 @@ mod tests {
|
|||
let mut groups = vec![vec![f1, f2, f3]];
|
||||
|
||||
// should not panic
|
||||
Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count);
|
||||
Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count).unwrap();
|
||||
}
|
||||
|
||||
// This is specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files
|
||||
|
@ -2807,7 +2834,8 @@ mod tests {
|
|||
&mut overlapped_groups,
|
||||
max_size_bytes,
|
||||
max_file_count,
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// one intact group
|
||||
assert_eq!(groups.internal_overlapped_groups.len(), 1);
|
||||
|
@ -2890,7 +2918,8 @@ mod tests {
|
|||
&mut overlapped_groups,
|
||||
max_size_bytes,
|
||||
max_file_count,
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// one intact group
|
||||
assert_eq!(groups.internal_overlapped_groups.len(), 1);
|
||||
|
@ -2965,7 +2994,8 @@ mod tests {
|
|||
// Group into overlapped groups
|
||||
let max_size_bytes = 1000;
|
||||
let max_file_count = 2;
|
||||
let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count);
|
||||
let overlapped_groups =
|
||||
Compactor::overlapped_groups(all, max_size_bytes, max_file_count).unwrap();
|
||||
// Must be 5
|
||||
assert_eq!(overlapped_groups.len(), 5);
|
||||
// Non-overlapped with other groups always first
|
||||
|
@ -3035,7 +3065,8 @@ mod tests {
|
|||
vec![pf1.clone(), pf2.clone()],
|
||||
TEST_MAX_SIZE_BYTES,
|
||||
TEST_MAX_FILE_COUNT,
|
||||
);
|
||||
)
|
||||
.unwrap();
|
||||
// 2 overlapped groups
|
||||
assert_eq!(overlapped_groups.len(), 2);
|
||||
let g1 = GroupWithMinTimeAndSize {
|
||||
|
@ -3099,7 +3130,7 @@ mod tests {
|
|||
|
||||
// Group into overlapped groups
|
||||
let overlapped_groups =
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
|
||||
// There are 3 overlapped groups but each group does not overlap with the other
|
||||
assert_eq!(overlapped_groups.len(), 3);
|
||||
|
||||
|
@ -3178,7 +3209,7 @@ mod tests {
|
|||
|
||||
// Group into overlapped groups
|
||||
let overlapped_groups =
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
|
||||
assert_eq!(overlapped_groups.len(), 5);
|
||||
|
||||
// 5 input groups and 5 output groups because they are too large to group further
|
||||
|
@ -3245,7 +3276,7 @@ mod tests {
|
|||
|
||||
// Group into overlapped groups
|
||||
let overlapped_groups =
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
|
||||
assert_eq!(overlapped_groups.len(), 4);
|
||||
|
||||
// 4 input groups but 3 output groups
|
||||
|
@ -3310,7 +3341,8 @@ mod tests {
|
|||
let max_file_count = 2;
|
||||
|
||||
// Group into overlapped groups
|
||||
let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count);
|
||||
let overlapped_groups =
|
||||
Compactor::overlapped_groups(all, max_size_bytes, max_file_count).unwrap();
|
||||
assert_eq!(overlapped_groups.len(), 4);
|
||||
|
||||
// Must be 4
|
||||
|
@ -3550,7 +3582,8 @@ mod tests {
|
|||
// Given a bunch of files in a particular order to exercise the algorithm:
|
||||
let all = vec![one, two, three];
|
||||
|
||||
let groups = Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
|
||||
let groups =
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
|
||||
dbg!(&groups);
|
||||
|
||||
// All should be in the same group.
|
||||
|
@ -3564,7 +3597,8 @@ mod tests {
|
|||
// Given a bunch of files in a particular order to exercise the algorithm:
|
||||
let all = vec![one, two, three, four];
|
||||
|
||||
let groups = Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT);
|
||||
let groups =
|
||||
Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap();
|
||||
dbg!(&groups);
|
||||
|
||||
// All should be in the same group.
|
||||
|
|
|
@ -243,6 +243,11 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
|
|||
debug!(%max_size, %used_size, n_compactions=%handles.len(), "reached maximum concurrent compaction size limit");
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// All candidates should be compactable (have files to compact and/or upgrade)
|
||||
// Reaching here means we do not choose the right candidates and
|
||||
// it would be a waste of time to repeat this cycle
|
||||
warn!("The candidate partition {} has no files to be either compacted or upgraded", c.candidate.partition_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue