diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index fb67253c27..d693fd8c9e 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -31,6 +31,7 @@ use super::{ and::AndIdOnlyPartitionFilter, by_id::ByIdPartitionFilter, shard::ShardPartitionFilter, IdOnlyPartitionFilter, }, + level_exist::one_level::OneLevelExist, parquet_file_sink::{ dedicated::DedicatedExecParquetFileSinkWrapper, logging::LoggingParquetFileSinkWrapper, object_store::ObjectStoreParquetFileSink, @@ -61,8 +62,8 @@ use super::{ round_split::all_now::AllNowRoundSplit, scratchpad::{ignore_writes_object_store::IgnoreWrites, prod::ProdScratchpadGen}, skipped_compactions_source::catalog::CatalogSkippedCompactionsSource, - target_level::{ - hot_cold::HotColdTargetLevelDetection, naive::NaiveTargetLevelDetection, + target_level_detection::{ + all_at_once::AllAtOnceTargetLevelDetection, target_level::TargetLevelTargetLevelDetection, TargetLevelDetection, }, Components, @@ -287,7 +288,9 @@ fn version_specific_partition_filters(config: &Config) -> Vec Arc { match config.compact_version { - AlgoVersion::Naive => Arc::new(NaiveTargetLevelDetection::new()), - AlgoVersion::HotCold => Arc::new(HotColdTargetLevelDetection::new()), + AlgoVersion::Naive => Arc::new(AllAtOnceTargetLevelDetection::new()), + AlgoVersion::HotCold => { + Arc::new(TargetLevelTargetLevelDetection::new(OneLevelExist::new())) + } } } diff --git a/compactor2/src/components/level_filter/mod.rs b/compactor2/src/components/level_exist/mod.rs similarity index 60% rename from compactor2/src/components/level_filter/mod.rs rename to compactor2/src/components/level_exist/mod.rs index cc8c747040..51a5c68ec7 100644 --- a/compactor2/src/components/level_filter/mod.rs +++ b/compactor2/src/components/level_exist/mod.rs @@ -4,7 +4,7 @@ use data_types::CompactionLevel; pub mod one_level; -pub trait LevelFilter: Debug + Display + Send + Sync { - /// return true if this filter has the given level +pub trait LevelExist: Debug + Display + Send + Sync { + /// return true if at least one file in the given level fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool; } diff --git a/compactor2/src/components/level_filter/one_level.rs b/compactor2/src/components/level_exist/one_level.rs similarity index 87% rename from compactor2/src/components/level_filter/one_level.rs rename to compactor2/src/components/level_exist/one_level.rs index b2e94df8b1..e4c6087513 100644 --- a/compactor2/src/components/level_filter/one_level.rs +++ b/compactor2/src/components/level_exist/one_level.rs @@ -2,24 +2,24 @@ use std::fmt::Display; use data_types::CompactionLevel; -use super::LevelFilter; +use super::LevelExist; #[derive(Debug)] -pub struct OneLevelFilter {} +pub struct OneLevelExist {} -impl OneLevelFilter { +impl OneLevelExist { pub fn new() -> Self { Self {} } } -impl Display for OneLevelFilter { +impl Display for OneLevelExist { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "one level",) } } -impl LevelFilter for OneLevelFilter { +impl LevelExist for OneLevelExist { fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool { files.iter().any(|f| f.compaction_level == level) } @@ -33,12 +33,12 @@ mod tests { #[test] fn test_display() { - assert_eq!(OneLevelFilter::new().to_string(), "one level"); + assert_eq!(OneLevelExist::new().to_string(), "one level"); } #[test] fn test_apply() { - let filter = OneLevelFilter::new(); + let filter = OneLevelExist::new(); let f0 = ParquetFileBuilder::new(0) .with_compaction_level(CompactionLevel::Initial) diff --git a/compactor2/src/components/mod.rs b/compactor2/src/components/mod.rs index 426ebd33cb..a69d10deed 100644 --- a/compactor2/src/components/mod.rs +++ b/compactor2/src/components/mod.rs @@ -7,7 +7,7 @@ use self::{ partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter, partition_source::PartitionSource, partitions_source::PartitionsSource, round_split::RoundSplit, scratchpad::ScratchpadGen, tables_source::TablesSource, - target_level::TargetLevelDetection, + target_level_detection::TargetLevelDetection, }; pub mod combos; @@ -19,7 +19,7 @@ pub mod file_filter; pub mod files_filter; pub mod hardcoded; pub mod id_only_partition_filter; -pub mod level_filter; +pub mod level_exist; pub mod namespaces_source; pub mod parquet_file_sink; pub mod partition_done_sink; @@ -32,7 +32,7 @@ pub mod round_split; pub mod scratchpad; pub mod skipped_compactions_source; pub mod tables_source; -pub mod target_level; +pub mod target_level_detection; #[derive(Debug, Clone)] pub struct Components { diff --git a/compactor2/src/components/target_level/hot_cold.rs b/compactor2/src/components/target_level/hot_cold.rs deleted file mode 100644 index c6045f3166..0000000000 --- a/compactor2/src/components/target_level/hot_cold.rs +++ /dev/null @@ -1,105 +0,0 @@ -use std::fmt::Display; - -use data_types::CompactionLevel; - -use crate::components::level_filter::{one_level::OneLevelFilter, LevelFilter}; - -use super::TargetLevelDetection; - -#[derive(Debug)] -pub struct HotColdTargetLevelDetection where T: LevelFilter { - level_filter: T, -} - -impl HotColdTargetLevelDetection { - pub fn new() -> Self { - Self { - level_filter: OneLevelFilter::new(), - } - } -} - -impl Display for HotColdTargetLevelDetection { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "hot cold target level detection",) - } -} - -impl TargetLevelDetection for HotColdTargetLevelDetection { - // For HotCold version, we support compact (L0s + L1s) to L1s and (L1s + L2s) to L2s - // Target is the next level of the lowest level that has files - fn detect(&self, files: &[data_types::ParquetFile]) -> Option { - // Start with initial level - // If there are files in this level, the compaction's target level will be the next level. - // Otherwise repeat until reaching the final level. - let mut level = CompactionLevel::Initial; - while level != CompactionLevel::Final { - if self.level_filter.apply(files, level) { - return Some(level.next()); - } - - level = level.next(); - } - - // All files are in final level, nothing to compact - None - } -} - -#[cfg(test)] -mod tests { - use crate::test_util::ParquetFileBuilder; - - use super::*; - - #[test] - fn test_display() { - assert_eq!( - HotColdTargetLevelDetection::new().to_string(), - "hot cold target level detection" - ); - } - - #[test] - fn test_apply() { - let target_level_detection = HotColdTargetLevelDetection::new(); - - let f0 = ParquetFileBuilder::new(0) - .with_compaction_level(CompactionLevel::Initial) - .build(); - let f1 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .build(); - let f2 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Final) - .build(); - - // list of one - assert_eq!( - target_level_detection.detect(&[f0.clone()]), - Some(CompactionLevel::FileNonOverlapped) - ); - assert_eq!( - target_level_detection.detect(&[f1.clone()]), - Some(CompactionLevel::Final) - ); - assert_eq!(target_level_detection.detect(&[f2.clone()]), None); - // list of many - assert_eq!( - target_level_detection.detect(&[f1.clone(), f0.clone()]), - Some(CompactionLevel::FileNonOverlapped) - ); - assert_eq!( - target_level_detection.detect(&[f2.clone(), f0.clone()]), - Some(CompactionLevel::FileNonOverlapped) - ); - assert_eq!( - target_level_detection.detect(&[f2.clone(), f0, f1.clone()]), - Some(CompactionLevel::FileNonOverlapped) - ); - assert_eq!( - target_level_detection.detect(&[f2, f1]), - Some(CompactionLevel::Final) - ); - } -} diff --git a/compactor2/src/components/target_level/mod.rs b/compactor2/src/components/target_level/mod.rs deleted file mode 100644 index 1475063cf2..0000000000 --- a/compactor2/src/components/target_level/mod.rs +++ /dev/null @@ -1,11 +0,0 @@ -use std::fmt::{Debug, Display}; - -use data_types::CompactionLevel; - -pub mod hot_cold; -pub mod naive; - -pub trait TargetLevelDetection: Debug + Display + Send + Sync { - /// return the compaction level the given files are suitable to get compacted to - fn detect(&self, files: &[data_types::ParquetFile]) -> Option; -} diff --git a/compactor2/src/components/target_level/naive.rs b/compactor2/src/components/target_level/naive.rs deleted file mode 100644 index e6118b40fd..0000000000 --- a/compactor2/src/components/target_level/naive.rs +++ /dev/null @@ -1,88 +0,0 @@ -use std::fmt::Display; - -use data_types::CompactionLevel; - -use super::TargetLevelDetection; - -#[derive(Debug)] -pub struct NaiveTargetLevelDetection {} - -impl NaiveTargetLevelDetection { - pub fn new() -> Self { - Self {} - } -} - -impl Display for NaiveTargetLevelDetection { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "naive target level detection",) - } -} - -impl TargetLevelDetection for NaiveTargetLevelDetection { - // For naive version, we only compact (L0s + L1s) to L1s - // The target level is always 1 and there must be at least one file in L0 - fn detect(&self, files: &[data_types::ParquetFile]) -> Option { - // Check if there are files in Compaction::Initial level - if files - .iter() - .any(|file| file.compaction_level == CompactionLevel::Initial) - { - return Some(CompactionLevel::FileNonOverlapped); - } - - None - } -} - -#[cfg(test)] -mod tests { - use crate::test_util::ParquetFileBuilder; - - use super::*; - - #[test] - fn test_display() { - assert_eq!( - NaiveTargetLevelDetection::new().to_string(), - "naive target level detection" - ); - } - - #[test] - fn test_apply() { - let target_level_detection = NaiveTargetLevelDetection::new(); - - let f0 = ParquetFileBuilder::new(0) - .with_compaction_level(CompactionLevel::Initial) - .build(); - let f1 = ParquetFileBuilder::new(1) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .build(); - let f2 = ParquetFileBuilder::new(2) - .with_compaction_level(CompactionLevel::Final) - .build(); - - // list of one - assert_eq!( - target_level_detection.detect(&[f0.clone()]), - Some(CompactionLevel::FileNonOverlapped) - ); - assert_eq!(target_level_detection.detect(&[f1.clone()]), None); - assert_eq!(target_level_detection.detect(&[f2.clone()]), None); - // list of many - assert_eq!( - target_level_detection.detect(&[f1.clone(), f0.clone()]), - Some(CompactionLevel::FileNonOverlapped) - ); - assert_eq!( - target_level_detection.detect(&[f2.clone(), f0.clone()]), - Some(CompactionLevel::FileNonOverlapped) - ); - assert_eq!( - target_level_detection.detect(&[f2.clone(), f0, f1.clone()]), - Some(CompactionLevel::FileNonOverlapped) - ); - assert_eq!(target_level_detection.detect(&[f2, f1]), None); - } -} diff --git a/compactor2/src/components/target_level_detection/all_at_once.rs b/compactor2/src/components/target_level_detection/all_at_once.rs new file mode 100644 index 0000000000..94762e42a9 --- /dev/null +++ b/compactor2/src/components/target_level_detection/all_at_once.rs @@ -0,0 +1,134 @@ +use std::fmt::Display; + +use data_types::CompactionLevel; + +use super::TargetLevelDetection; + +#[derive(Debug)] +pub struct AllAtOnceTargetLevelDetection {} + +impl AllAtOnceTargetLevelDetection { + pub fn new() -> Self { + Self {} + } +} + +impl Display for AllAtOnceTargetLevelDetection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Target level detection for AllAtOnce version",) + } +} + +impl TargetLevelDetection for AllAtOnceTargetLevelDetection { + // For AllAtOnce version, we only compact (L0s + L1s) to L1s + // The target level is always 1 and there must be at least one file in L0 + fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel { + // Check if there are files in Compaction::Initial level + if files + .iter() + .any(|file| file.compaction_level == CompactionLevel::Initial) + { + return CompactionLevel::FileNonOverlapped; + } + + panic!("Level-0 file not found in target level detection"); + } +} + +#[cfg(test)] +mod tests { + use crate::test_util::ParquetFileBuilder; + + use super::*; + + #[test] + fn test_display() { + assert_eq!( + AllAtOnceTargetLevelDetection::new().to_string(), + "Target level detection for AllAtOnce version" + ); + } + + #[test] + #[should_panic(expected = "Level-0 file not found in target level detection")] + fn test_apply_empty() { + let target_level_detection = AllAtOnceTargetLevelDetection::new(); + + target_level_detection.detect(&[]); + } + + #[test] + #[should_panic(expected = "Level-0 file not found in target level detection")] + fn test_only_l1() { + let target_level_detection = AllAtOnceTargetLevelDetection::new(); + + let f1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + target_level_detection.detect(&[f1]); + } + + #[test] + #[should_panic(expected = "Level-0 file not found in target level detection")] + fn test_only_l2() { + let target_level_detection = AllAtOnceTargetLevelDetection::new(); + + let f2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + target_level_detection.detect(&[f2]); + } + + #[test] + #[should_panic(expected = "Level-0 file not found in target level detection")] + fn test_only_l1_l2() { + let target_level_detection = AllAtOnceTargetLevelDetection::new(); + + let f1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + let f2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + target_level_detection.detect(&[f1, f2]); + } + + #[test] + fn test_apply() { + let target_level_detection = AllAtOnceTargetLevelDetection::new(); + + let f0 = ParquetFileBuilder::new(0) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let f1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + let f2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + // list of one + assert_eq!( + target_level_detection.detect(&[f0.clone()]), + CompactionLevel::FileNonOverlapped + ); + + // list of many + assert_eq!( + target_level_detection.detect(&[f1.clone(), f0.clone()]), + CompactionLevel::FileNonOverlapped + ); + assert_eq!( + target_level_detection.detect(&[f2.clone(), f0.clone()]), + CompactionLevel::FileNonOverlapped + ); + assert_eq!( + target_level_detection.detect(&[f2, f0, f1]), + CompactionLevel::FileNonOverlapped + ); + } +} diff --git a/compactor2/src/components/target_level_detection/mod.rs b/compactor2/src/components/target_level_detection/mod.rs new file mode 100644 index 0000000000..3062007a54 --- /dev/null +++ b/compactor2/src/components/target_level_detection/mod.rs @@ -0,0 +1,11 @@ +use std::fmt::{Debug, Display}; + +use data_types::CompactionLevel; + +pub mod all_at_once; +pub mod target_level; + +pub trait TargetLevelDetection: Debug + Display + Send + Sync { + /// Return the compaction level the given files are suitable to get compacted to + fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel; +} diff --git a/compactor2/src/components/target_level_detection/target_level.rs b/compactor2/src/components/target_level_detection/target_level.rs new file mode 100644 index 0000000000..fc71e697da --- /dev/null +++ b/compactor2/src/components/target_level_detection/target_level.rs @@ -0,0 +1,149 @@ +use std::fmt::Display; + +use data_types::CompactionLevel; + +use crate::components::level_exist::LevelExist; + +use super::TargetLevelDetection; + +/// For TargetLevel version, we support compact (L0s + L1s) to L1s and (L1s + L2s) to L2s +/// Target is the next level of the lowest level that has files +#[derive(Debug)] +pub struct TargetLevelTargetLevelDetection +where + T: LevelExist, +{ + inner: T, +} + +impl TargetLevelTargetLevelDetection +where + T: LevelExist, +{ + pub fn new(inner: T) -> Self { + Self { inner } + } +} + +impl Display for TargetLevelTargetLevelDetection +where + T: LevelExist, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Target level detection for TargetLevel version",) + } +} + +impl TargetLevelDetection for TargetLevelTargetLevelDetection +where + T: LevelExist, +{ + fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel { + // Start with initial level + // If there are files in this level, the compaction's target level will be the next level. + // Otherwise repeat until reaching the final level. + let mut level = CompactionLevel::Initial; + while level != CompactionLevel::Final { + if self.inner.apply(files, level) { + return level.next(); + } + + level = level.next(); + } + + // All files are in final level and should have been filtered out earlier + panic!("Neither level-0 nor level-1 found in target level detection"); + } +} + +#[cfg(test)] +mod tests { + use crate::{components::level_exist::one_level::OneLevelExist, test_util::ParquetFileBuilder}; + + use super::*; + + #[test] + fn test_display() { + assert_eq!( + TargetLevelTargetLevelDetection::new(OneLevelExist::new()).to_string(), + "Target level detection for TargetLevel version" + ); + } + + #[test] + #[should_panic(expected = "Neither level-0 nor level-1 found in target level detection")] + fn test_apply_empty() { + let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new()); + + target_level_detection.detect(&[]); + } + + #[test] + fn test_apply_only_l0() { + let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new()); + + let f0 = ParquetFileBuilder::new(0) + .with_compaction_level(CompactionLevel::Initial) + .build(); + + assert_eq!( + target_level_detection.detect(&[f0]), + CompactionLevel::FileNonOverlapped + ); + } + + #[test] + fn test_apply_only_l1() { + let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new()); + + let f1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + + assert_eq!(target_level_detection.detect(&[f1]), CompactionLevel::Final); + } + + #[test] + #[should_panic(expected = "Neither level-0 nor level-1 found in target level detection")] + fn test_apply_only_l2() { + let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new()); + + let f2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + target_level_detection.detect(&[f2]); + } + + #[test] + fn test_apply_many_files() { + let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new()); + + let f0 = ParquetFileBuilder::new(0) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let f1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + let f2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + assert_eq!( + target_level_detection.detect(&[f1.clone(), f0.clone()]), + CompactionLevel::FileNonOverlapped + ); + assert_eq!( + target_level_detection.detect(&[f2.clone(), f0.clone()]), + CompactionLevel::FileNonOverlapped + ); + assert_eq!( + target_level_detection.detect(&[f2.clone(), f0, f1.clone()]), + CompactionLevel::FileNonOverlapped + ); + assert_eq!( + target_level_detection.detect(&[f2, f1]), + CompactionLevel::Final + ); + } +} diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index b6c434c939..fa00b0dcdd 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -308,8 +308,6 @@ fn buil_compaction_plan( ) -> Result { // Detect target level to compact to let target_level = components.target_level_detection.detect(&files); - assert!(target_level.is_some(), "No target level is detected"); - let target_level = target_level.unwrap(); // Split files into files_to_compact, files_to_upgrade, and files_to_keep //