diff --git a/compactor2/src/components/files_split/target_level_non_overlap_split.rs b/compactor2/src/components/files_split/target_level_non_overlap_split.rs index 35f50de992..d42797e786 100644 --- a/compactor2/src/components/files_split/target_level_non_overlap_split.rs +++ b/compactor2/src/components/files_split/target_level_non_overlap_split.rs @@ -2,7 +2,7 @@ use std::{collections::VecDeque, fmt::Display}; use data_types::{CompactionLevel, ParquetFile}; -use crate::file_group::FilesTimeRange; +use crate::file_group::{split_by_level, FilesTimeRange}; use super::FilesSplit; @@ -67,18 +67,9 @@ impl FilesSplit for TargetLevelNonOverlapSplit { let num_files = files.len(); // Split files into levels - let mut target_level_files = Vec::with_capacity(num_files); - let mut prev_level_files = Vec::with_capacity(num_files); let prev_level = target_level.prev(); - for file in files { - if file.compaction_level == target_level { - target_level_files.push(file); - } else if file.compaction_level == prev_level { - prev_level_files.push(file); - } else { - panic!("Unexpected compaction level: {}", file.compaction_level); - } - } + let (mut target_level_files, prev_level_files) = + split_by_level(files, target_level, prev_level); // compute time range of prev_level_files let prev_level_range = if let Some(r) = FilesTimeRange::try_new(&prev_level_files) { @@ -161,7 +152,9 @@ mod tests { } #[test] - #[should_panic(expected = "Unexpected compaction level: CompactionLevel::L2")] + #[should_panic( + expected = "Unexpected compaction level. Expected CompactionLevel::L1 or CompactionLevel::L0 but got CompactionLevel::L2." + )] fn test_unexpected_compaction_level_2() { let files = create_overlapped_files(); let split = TargetLevelNonOverlapSplit::new(); @@ -170,7 +163,9 @@ mod tests { } #[test] - #[should_panic(expected = "Unexpected compaction level: CompactionLevel::L0")] + #[should_panic( + expected = "Unexpected compaction level. 
Expected CompactionLevel::L2 or CompactionLevel::L1 but got CompactionLevel::L0." + )] fn test_unexpected_compaction_level_0() { let files = create_overlapped_files(); let split = TargetLevelNonOverlapSplit::new(); diff --git a/compactor2/src/components/files_split/target_level_upgrade_split.rs b/compactor2/src/components/files_split/target_level_upgrade_split.rs index 5844fa5434..b36f833d23 100644 --- a/compactor2/src/components/files_split/target_level_upgrade_split.rs +++ b/compactor2/src/components/files_split/target_level_upgrade_split.rs @@ -3,7 +3,7 @@ use std::fmt::Display; use data_types::{CompactionLevel, ParquetFile}; use super::FilesSplit; -use crate::file_group::{overlaps_in_time, FilesTimeRange}; +use crate::file_group::{overlaps_in_time, split_by_level, FilesTimeRange}; #[derive(Debug)] /// Split files into `[files_to_compact]` and `[files_to_upgrade]` @@ -72,18 +72,9 @@ impl FilesSplit for TargetLevelUpgradeSplit { let mut files_to_compact = Vec::with_capacity(files.len()); // Split files into levels - let mut target_level_files = Vec::with_capacity(files.len()); - let mut prev_level_files = Vec::with_capacity(files.len()); let prev_level = target_level.prev(); - for file in files { - if file.compaction_level == target_level { - target_level_files.push(file); - } else if file.compaction_level == prev_level { - prev_level_files.push(file); - } else { - panic!("Unexpected compaction level: {}", file.compaction_level); - } - } + let (target_level_files, mut prev_level_files) = + split_by_level(files, target_level, prev_level); // compute time range of target_level_files, if any let target_time_range = FilesTimeRange::try_new(&target_level_files); @@ -178,7 +169,9 @@ mod tests { } #[test] - #[should_panic(expected = "Unexpected compaction level: CompactionLevel::L2")] + #[should_panic( + expected = "Unexpected compaction level. Expected CompactionLevel::L1 or CompactionLevel::L0 but got CompactionLevel::L2." 
+ )] fn test_unexpected_compaction_level_2() { let files = create_overlapped_files(); let split = TargetLevelUpgradeSplit::new(MAX_SIZE); @@ -187,7 +180,9 @@ mod tests { } #[test] - #[should_panic(expected = "Unexpected compaction level: CompactionLevel::L0")] + #[should_panic( + expected = "Unexpected compaction level. Expected CompactionLevel::L2 or CompactionLevel::L1 but got CompactionLevel::L0." + )] fn test_unexpected_compaction_level_0() { let files = create_overlapped_files(); let split = TargetLevelUpgradeSplit::new(MAX_SIZE); diff --git a/compactor2/src/file_group.rs b/compactor2/src/file_group.rs index af03d4cf15..8418035df0 100644 --- a/compactor2/src/file_group.rs +++ b/compactor2/src/file_group.rs @@ -1,6 +1,6 @@ //! Utilities for working with groups of [`ParquetFile`] -use data_types::{ParquetFile, Timestamp}; +use data_types::{CompactionLevel, ParquetFile, Timestamp}; use std::cmp::{max, min}; /// Represent the min/max time range for a group of [`ParquetFile`]s @@ -43,6 +43,31 @@ pub fn overlaps_in_time<'a>( files.into_iter().any(|f| f.overlaps(file)) } +/// Divide a list of [`ParquetFile`]s into two lists based on the two levels +/// +/// Panics if any level other than the two levels is present +pub fn split_by_level( + files: impl IntoIterator, + first_level: CompactionLevel, + second_level: CompactionLevel, +) -> (Vec, Vec) { + // Split files into levels + files + .into_iter() + .partition(|file| { + let file_level = file.compaction_level; + if file_level == first_level { + true + } else if file_level == second_level { + false + } else { + panic!( + "Unexpected compaction level. Expected {first_level} or {second_level} but got {file_level}." + ); + } + }) +} + #[cfg(test)] mod test { use super::*;