diff --git a/compactor2/src/components/file_classifier/split_based.rs b/compactor2/src/components/file_classifier/split_based.rs index 57ba8d2dc5..aedd946be5 100644 --- a/compactor2/src/components/file_classifier/split_based.rs +++ b/compactor2/src/components/file_classifier/split_based.rs @@ -3,20 +3,18 @@ use std::fmt::Display; use data_types::ParquetFile; use crate::{ - components::{files_split::FilesSplit, target_level_chooser::TargetLevelChooser}, - file_classification::FileClassification, - partition_info::PartitionInfo, - RoundInfo, + components::files_split::FilesSplit, file_classification::FileClassification, + partition_info::PartitionInfo, RoundInfo, }; use super::FileClassifier; -/// Use a combination of [`TargetLevelChooser`] and [`FilesSplit`] to build a [`FileClassification`]. +/// Use [`FilesSplit`] to build a [`FileClassification`]. /// -/// This uses the following data flow: +/// Uses the target_level from the `round_info` in the following data flow: /// /// ```text -/// (files)--------------+->[target level chooser (T)]--->(target level) +/// (files+target_level)-+....................................... 
/// | : /// | : /// | +................................+ @@ -48,34 +46,25 @@ use super::FileClassifier; /// (file compact) (file upgrade) /// ``` #[derive(Debug)] -pub struct SplitBasedFileClassifier +pub struct SplitBasedFileClassifier where - T: TargetLevelChooser, FT: FilesSplit, FO: FilesSplit, FU: FilesSplit, { - target_level_chooser: T, target_level_split: FT, non_overlap_split: FO, upgrade_split: FU, } -impl SplitBasedFileClassifier +impl SplitBasedFileClassifier where - T: TargetLevelChooser, FT: FilesSplit, FO: FilesSplit, FU: FilesSplit, { - pub fn new( - target_level_chooser: T, - target_level_split: FT, - non_overlap_split: FO, - upgrade_split: FU, - ) -> Self { + pub fn new(target_level_split: FT, non_overlap_split: FO, upgrade_split: FU) -> Self { Self { - target_level_chooser, target_level_split, non_overlap_split, upgrade_split, @@ -83,9 +72,8 @@ where } } -impl Display for SplitBasedFileClassifier +impl Display for SplitBasedFileClassifier where - T: TargetLevelChooser, FT: FilesSplit, FO: FilesSplit, FU: FilesSplit, @@ -93,18 +81,14 @@ where fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "split_based(target_level_chooser={}, target_level_split={}, non_overlap_split={}, upgrade_split={})", - self.target_level_chooser, - self.target_level_split, - self.non_overlap_split, - self.upgrade_split, + "split_based(target_level_split={}, non_overlap_split={}, upgrade_split={})", + self.target_level_split, self.non_overlap_split, self.upgrade_split, ) } } -impl FileClassifier for SplitBasedFileClassifier +impl FileClassifier for SplitBasedFileClassifier where - T: TargetLevelChooser, FT: FilesSplit, FO: FilesSplit, FU: FilesSplit, @@ -112,13 +96,11 @@ where fn classify( &self, _partition_info: &PartitionInfo, - _round_info: &RoundInfo, + round_info: &RoundInfo, files: Vec, ) -> FileClassification { let files_to_compact = files; - - // Detect target level to compact to - let target_level = 
self.target_level_chooser.detect(&files_to_compact); + let target_level = round_info.target_level(); // Split files into files_to_compact, files_to_upgrade, and files_to_keep // diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 6e764bbbc3..f108718553 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -43,7 +43,6 @@ use super::{ and::AndIdOnlyPartitionFilter, shard::ShardPartitionFilter, IdOnlyPartitionFilter, }, ir_planner::{logging::LoggingIRPlannerWrapper, planner_v1::V1IRPlanner}, - level_exist::one_level::OneLevelExist, parquet_file_sink::{ dedicated::DedicatedExecParquetFileSinkWrapper, logging::LoggingParquetFileSinkWrapper, object_store::ObjectStoreParquetFileSink, @@ -84,7 +83,6 @@ use super::{ round_split::all_now::AllNowRoundSplit, scratchpad::{noop::NoopScratchpadGen, prod::ProdScratchpadGen, ScratchpadGen}, skipped_compactions_source::catalog::CatalogSkippedCompactionsSource, - target_level_chooser::target_level::TargetLevelTargetLevelChooser, Components, }; @@ -365,7 +363,6 @@ fn version_specific_file_classifier(config: &Config) -> Arc match config.compact_version { AlgoVersion::AllAtOnce => Arc::new(AllAtOnceFileClassifier::new()), AlgoVersion::TargetLevel => Arc::new(SplitBasedFileClassifier::new( - TargetLevelTargetLevelChooser::new(OneLevelExist::new()), TargetLevelTargetLevelSplit::new(), TargetLevelNonOverlapSplit::new(), TargetLevelUpgradeSplit::new(config.max_desired_file_size_bytes), diff --git a/compactor2/src/components/level_exist/mod.rs b/compactor2/src/components/level_exist/mod.rs deleted file mode 100644 index 51a5c68ec7..0000000000 --- a/compactor2/src/components/level_exist/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -use std::fmt::{Debug, Display}; - -use data_types::CompactionLevel; - -pub mod one_level; - -pub trait LevelExist: Debug + Display + Send + Sync { - /// return true if at least one file in the given level - fn apply(&self, files: 
&[data_types::ParquetFile], level: CompactionLevel) -> bool; -} diff --git a/compactor2/src/components/level_exist/one_level.rs b/compactor2/src/components/level_exist/one_level.rs deleted file mode 100644 index 21042fdfa7..0000000000 --- a/compactor2/src/components/level_exist/one_level.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::fmt::Display; - -use data_types::CompactionLevel; - -use super::LevelExist; - -#[derive(Debug)] -pub struct OneLevelExist {} - -impl OneLevelExist { - pub fn new() -> Self { - Self {} - } -} - -impl Display for OneLevelExist { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "one level",) - } -} - -impl LevelExist for OneLevelExist { - fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool { - files.iter().any(|f| f.compaction_level == level) - } -} - -#[cfg(test)] -mod tests { - use iox_tests::ParquetFileBuilder; - - use super::*; - - #[test] - fn test_display() { - assert_eq!(OneLevelExist::new().to_string(), "one level"); - } - - #[test] - fn test_apply() { - let filter = OneLevelExist::new(); - - let f0 = ParquetFileBuilder::new(0) - .with_compaction_level(CompactionLevel::Initial) - .build(); - let f1 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .build(); - let f2 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Final) - .build(); - - // empty list - assert!(!filter.apply(&[], CompactionLevel::Initial)); - - // list of one - assert!(filter.apply(&[f0.clone()], CompactionLevel::Initial)); - assert!(filter.apply(&[f1.clone()], CompactionLevel::FileNonOverlapped)); - assert!(filter.apply(&[f2.clone()], CompactionLevel::Final)); - assert!(!filter.apply(&[f0.clone()], CompactionLevel::FileNonOverlapped)); - assert!(!filter.apply(&[f1.clone()], CompactionLevel::Initial)); - assert!(!filter.apply(&[f2.clone()], CompactionLevel::Initial)); - // list of many - assert!(filter.apply(&[f2.clone(), f0.clone()], 
CompactionLevel::Initial)); - assert!(filter.apply( - &vec![f2.clone(), f0, f1.clone()], - CompactionLevel::FileNonOverlapped - )); - assert!(!filter.apply(&[f2, f1], CompactionLevel::Initial)); - } -} diff --git a/compactor2/src/components/mod.rs b/compactor2/src/components/mod.rs index 9377f361b7..19171b3c78 100644 --- a/compactor2/src/components/mod.rs +++ b/compactor2/src/components/mod.rs @@ -22,7 +22,6 @@ pub mod files_split; pub mod hardcoded; pub mod id_only_partition_filter; pub mod ir_planner; -pub mod level_exist; pub mod namespaces_source; pub mod parquet_file_sink; pub mod parquet_files_sink; @@ -39,7 +38,6 @@ pub mod round_split; pub mod scratchpad; pub mod skipped_compactions_source; pub mod tables_source; -pub mod target_level_chooser; /// Pluggable system to determine compactor behavior. Please see /// [Crate Level Documentation](crate) for more details on the diff --git a/compactor2/src/components/round_info_source/mod.rs b/compactor2/src/components/round_info_source/mod.rs index 4913f16dd1..29eda70215 100644 --- a/compactor2/src/components/round_info_source/mod.rs +++ b/compactor2/src/components/round_info_source/mod.rs @@ -4,7 +4,7 @@ use std::{ }; use async_trait::async_trait; -use data_types::ParquetFile; +use data_types::{CompactionLevel, ParquetFile}; use observability_deps::tracing::debug; use crate::{error::DynError, PartitionInfo, RoundInfo}; @@ -71,11 +71,25 @@ impl RoundInfoSource for LevelBasedRoundInfo { async fn calculate( &self, _partition_info: &PartitionInfo, - _files: &[ParquetFile], + files: &[ParquetFile], ) -> Result, DynError> { - // TODO: use this to calculate splits - Ok(Arc::new(RoundInfo::TargetLevel { - target_level: data_types::CompactionLevel::Initial, - })) + let target_level = pick_level(files); + Ok(Arc::new(RoundInfo::TargetLevel { target_level })) } } + +fn pick_level(files: &[data_types::ParquetFile]) -> CompactionLevel { + // Start with initial level + // If there are files in this level, the compaction's target 
level will be the next level. + // Otherwise repeat until reaching the final level. + let mut level = CompactionLevel::Initial; + while level != CompactionLevel::Final { + if files.iter().any(|f| f.compaction_level == level) { + return level.next(); + } + + level = level.next(); + } + + level +} diff --git a/compactor2/src/components/target_level_chooser/mod.rs b/compactor2/src/components/target_level_chooser/mod.rs deleted file mode 100644 index 92716a96f7..0000000000 --- a/compactor2/src/components/target_level_chooser/mod.rs +++ /dev/null @@ -1,10 +0,0 @@ -use std::fmt::{Debug, Display}; - -use data_types::CompactionLevel; - -pub mod target_level; - -pub trait TargetLevelChooser: Debug + Display + Send + Sync { - /// Return the compaction level the given files are suitable to get compacted to - fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel; -} diff --git a/compactor2/src/components/target_level_chooser/target_level.rs b/compactor2/src/components/target_level_chooser/target_level.rs deleted file mode 100644 index e0d2340d28..0000000000 --- a/compactor2/src/components/target_level_chooser/target_level.rs +++ /dev/null @@ -1,150 +0,0 @@ -use std::fmt::Display; - -use data_types::CompactionLevel; - -use crate::components::level_exist::LevelExist; - -use super::TargetLevelChooser; - -/// For TargetLevel version, we support compact (L0s + L1s) to L1s and (L1s + L2s) to L2s -/// Target is the next level of the lowest level that has files -#[derive(Debug)] -pub struct TargetLevelTargetLevelChooser -where - T: LevelExist, -{ - inner: T, -} - -impl TargetLevelTargetLevelChooser -where - T: LevelExist, -{ - pub fn new(inner: T) -> Self { - Self { inner } - } -} - -impl Display for TargetLevelTargetLevelChooser -where - T: LevelExist, -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Target level detection for TargetLevel version",) - } -} - -impl TargetLevelChooser for TargetLevelTargetLevelChooser -where - T: 
LevelExist, -{ - fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel { - // Start with initial level - // If there are files in this level, the compaction's target level will be the next level. - // Otherwise repeat until reaching the final level. - let mut level = CompactionLevel::Initial; - while level != CompactionLevel::Final { - if self.inner.apply(files, level) { - return level.next(); - } - - level = level.next(); - } - - // All files are in final level and should have been filtered out earlier - panic!("Neither level-0 nor level-1 found in target level detection"); - } -} - -#[cfg(test)] -mod tests { - use crate::components::level_exist::one_level::OneLevelExist; - use iox_tests::ParquetFileBuilder; - - use super::*; - - #[test] - fn test_display() { - assert_eq!( - TargetLevelTargetLevelChooser::new(OneLevelExist::new()).to_string(), - "Target level detection for TargetLevel version" - ); - } - - #[test] - #[should_panic(expected = "Neither level-0 nor level-1 found in target level detection")] - fn test_apply_empty() { - let target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new()); - - target_level_chooser.detect(&[]); - } - - #[test] - fn test_apply_only_l0() { - let target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new()); - - let f0 = ParquetFileBuilder::new(0) - .with_compaction_level(CompactionLevel::Initial) - .build(); - - assert_eq!( - target_level_chooser.detect(&[f0]), - CompactionLevel::FileNonOverlapped - ); - } - - #[test] - fn test_apply_only_l1() { - let target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new()); - - let f1 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .build(); - - assert_eq!(target_level_chooser.detect(&[f1]), CompactionLevel::Final); - } - - #[test] - #[should_panic(expected = "Neither level-0 nor level-1 found in target level detection")] - fn test_apply_only_l2() { - let 
target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new()); - - let f2 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Final) - .build(); - - target_level_chooser.detect(&[f2]); - } - - #[test] - fn test_apply_many_files() { - let target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new()); - - let f0 = ParquetFileBuilder::new(0) - .with_compaction_level(CompactionLevel::Initial) - .build(); - let f1 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .build(); - let f2 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Final) - .build(); - - assert_eq!( - target_level_chooser.detect(&[f1.clone(), f0.clone()]), - CompactionLevel::FileNonOverlapped - ); - assert_eq!( - target_level_chooser.detect(&[f2.clone(), f0.clone()]), - CompactionLevel::FileNonOverlapped - ); - assert_eq!( - target_level_chooser.detect(&[f2.clone(), f0, f1.clone()]), - CompactionLevel::FileNonOverlapped - ); - assert_eq!( - target_level_chooser.detect(&[f2, f1]), - CompactionLevel::Final - ); - } -} diff --git a/compactor2/src/round_info.rs b/compactor2/src/round_info.rs index c2dba72bd6..3d1b524f69 100644 --- a/compactor2/src/round_info.rs +++ b/compactor2/src/round_info.rs @@ -25,3 +25,13 @@ impl Display for RoundInfo { } } } + +impl RoundInfo { + /// Returns the compaction level that files in this round are compacted to (`Initial` for `ManySmallFiles`). + pub fn target_level(&self) -> CompactionLevel { + match self { + Self::TargetLevel { target_level } => *target_level, + Self::ManySmallFiles => CompactionLevel::Initial, + } + } +}