diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index b441e07ec6..fb67253c27 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -61,6 +61,10 @@ use super::{ round_split::all_now::AllNowRoundSplit, scratchpad::{ignore_writes_object_store::IgnoreWrites, prod::ProdScratchpadGen}, skipped_compactions_source::catalog::CatalogSkippedCompactionsSource, + target_level::{ + hot_cold::HotColdTargetLevelDetection, naive::NaiveTargetLevelDetection, + TargetLevelDetection, + }, Components, }; @@ -235,6 +239,7 @@ pub fn hardcoded_components(config: &Config) -> Arc { Arc::clone(config.parquet_store_scratchpad.object_store()), scratchpad_store_output, )), + target_level_detection: version_specific_target_level_detection(config), }) } @@ -279,3 +284,10 @@ fn version_specific_partition_filters(config: &Config) -> Vec Arc { + match config.compact_version { + AlgoVersion::Naive => Arc::new(NaiveTargetLevelDetection::new()), + AlgoVersion::HotCold => Arc::new(HotColdTargetLevelDetection::new()), + } +} diff --git a/compactor2/src/components/level_filter/mod.rs b/compactor2/src/components/level_filter/mod.rs new file mode 100644 index 0000000000..1d80ff9ed5 --- /dev/null +++ b/compactor2/src/components/level_filter/mod.rs @@ -0,0 +1,10 @@ +use std::fmt::{Debug, Display}; + +use data_types::CompactionLevel; + +pub mod one_level; + +pub trait LevelFilter: Debug + Display + Send + Sync { + // return true if the given files satisfy this filter for the given level + fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool; +} diff --git a/compactor2/src/components/level_filter/one_level.rs b/compactor2/src/components/level_filter/one_level.rs new file mode 100644 index 0000000000..b2e94df8b1 --- /dev/null +++ b/compactor2/src/components/level_filter/one_level.rs @@ -0,0 +1,71 @@ +use std::fmt::Display; + +use data_types::CompactionLevel; + +use super::LevelFilter; + +#[derive(Debug)] +pub struct 
OneLevelFilter {} + +impl OneLevelFilter { + pub fn new() -> Self { + Self {} + } +} + +impl Display for OneLevelFilter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "one level",) + } +} + +impl LevelFilter for OneLevelFilter { + fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool { + files.iter().any(|f| f.compaction_level == level) + } +} + +#[cfg(test)] +mod tests { + use crate::test_util::ParquetFileBuilder; + + use super::*; + + #[test] + fn test_display() { + assert_eq!(OneLevelFilter::new().to_string(), "one level"); + } + + #[test] + fn test_apply() { + let filter = OneLevelFilter::new(); + + let f0 = ParquetFileBuilder::new(0) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let f1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + let f2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + // empty list + assert!(!filter.apply(&[], CompactionLevel::Initial)); + + // list of one + assert!(filter.apply(&[f0.clone()], CompactionLevel::Initial)); + assert!(filter.apply(&[f1.clone()], CompactionLevel::FileNonOverlapped)); + assert!(filter.apply(&[f2.clone()], CompactionLevel::Final)); + assert!(!filter.apply(&[f0.clone()], CompactionLevel::FileNonOverlapped)); + assert!(!filter.apply(&[f1.clone()], CompactionLevel::Initial)); + assert!(!filter.apply(&[f2.clone()], CompactionLevel::Initial)); + // list of many + assert!(filter.apply(&[f2.clone(), f0.clone()], CompactionLevel::Initial)); + assert!(filter.apply( + &vec![f2.clone(), f0, f1.clone()], + CompactionLevel::FileNonOverlapped + )); + assert!(!filter.apply(&[f2, f1], CompactionLevel::Initial)); + } +} diff --git a/compactor2/src/components/mod.rs b/compactor2/src/components/mod.rs index 34af944c21..426ebd33cb 100644 --- a/compactor2/src/components/mod.rs +++ b/compactor2/src/components/mod.rs @@ -7,6 +7,7 @@ use self::{ 
partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter, partition_source::PartitionSource, partitions_source::PartitionsSource, round_split::RoundSplit, scratchpad::ScratchpadGen, tables_source::TablesSource, + target_level::TargetLevelDetection, }; pub mod combos; @@ -18,6 +19,7 @@ pub mod file_filter; pub mod files_filter; pub mod hardcoded; pub mod id_only_partition_filter; +pub mod level_filter; pub mod namespaces_source; pub mod parquet_file_sink; pub mod partition_done_sink; @@ -30,6 +32,7 @@ pub mod round_split; pub mod scratchpad; pub mod skipped_compactions_source; pub mod tables_source; +pub mod target_level; #[derive(Debug, Clone)] pub struct Components { @@ -48,4 +51,5 @@ pub struct Components { pub round_split: Arc, pub divide_initial: Arc, pub scratchpad_gen: Arc, + pub target_level_detection: Arc, } diff --git a/compactor2/src/components/report.rs b/compactor2/src/components/report.rs index e00ad99061..e41ccd5a42 100644 --- a/compactor2/src/components/report.rs +++ b/compactor2/src/components/report.rs @@ -92,6 +92,7 @@ pub fn log_components(components: &Components) { round_split, divide_initial, scratchpad_gen, + target_level_detection, } = components; info!( @@ -110,6 +111,7 @@ pub fn log_components(components: &Components) { %round_split, %divide_initial, %scratchpad_gen, + %target_level_detection, "component setup", ); } diff --git a/compactor2/src/components/target_level/hot_cold.rs b/compactor2/src/components/target_level/hot_cold.rs new file mode 100644 index 0000000000..42012cf53b --- /dev/null +++ b/compactor2/src/components/target_level/hot_cold.rs @@ -0,0 +1,105 @@ +use std::fmt::Display; + +use data_types::CompactionLevel; + +use crate::components::level_filter::{one_level::OneLevelFilter, LevelFilter}; + +use super::TargetLevelDetection; + +#[derive(Debug)] +pub struct HotColdTargetLevelDetection { + level_filter: OneLevelFilter, +} + +impl HotColdTargetLevelDetection { + pub fn new() -> Self { + Self { + 
level_filter: OneLevelFilter::new(), + } + } +} + +impl Display for HotColdTargetLevelDetection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "hot cold target level detection",) + } +} + +impl TargetLevelDetection for HotColdTargetLevelDetection { + // For HotCold version, we support compact (L0s + L1s) to L1s and (L1s + L2s) to L2s + // Target is the next level of the lowest level that has files + fn detect(&self, files: &[data_types::ParquetFile]) -> Option { + // Start with initial level + // If there are files in this level, the compaction's target level will be the next level. + // Otherwise repeat until reaching the final level. + let mut level = CompactionLevel::Initial; + while level != CompactionLevel::Final { + if self.level_filter.apply(files, level) { + return Some(level.next()); + } + + level = level.next(); + } + + // All files are in final level, nothing to compact + None + } +} + +#[cfg(test)] +mod tests { + use crate::test_util::ParquetFileBuilder; + + use super::*; + + #[test] + fn test_display() { + assert_eq!( + HotColdTargetLevelDetection::new().to_string(), + "hot cold target level detection" + ); + } + + #[test] + fn test_apply() { + let target_level_detection = HotColdTargetLevelDetection::new(); + + let f0 = ParquetFileBuilder::new(0) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let f1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + let f2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + // list of one + assert_eq!( + target_level_detection.detect(&[f0.clone()]), + Some(CompactionLevel::FileNonOverlapped) + ); + assert_eq!( + target_level_detection.detect(&[f1.clone()]), + Some(CompactionLevel::Final) + ); + assert_eq!(target_level_detection.detect(&[f2.clone()]), None); + // list of many + assert_eq!( + target_level_detection.detect(&[f1.clone(), f0.clone()]), + 
Some(CompactionLevel::FileNonOverlapped) + ); + assert_eq!( + target_level_detection.detect(&[f2.clone(), f0.clone()]), + Some(CompactionLevel::FileNonOverlapped) + ); + assert_eq!( + target_level_detection.detect(&[f2.clone(), f0, f1.clone()]), + Some(CompactionLevel::FileNonOverlapped) + ); + assert_eq!( + target_level_detection.detect(&[f2, f1]), + Some(CompactionLevel::Final) + ); + } +} diff --git a/compactor2/src/components/target_level/mod.rs b/compactor2/src/components/target_level/mod.rs new file mode 100644 index 0000000000..01814d1095 --- /dev/null +++ b/compactor2/src/components/target_level/mod.rs @@ -0,0 +1,11 @@ +use std::fmt::{Debug, Display}; + +use data_types::CompactionLevel; + +pub mod hot_cold; +pub mod naive; + +pub trait TargetLevelDetection: Debug + Display + Send + Sync { + // return the compaction level the given files are suitable to get compacted to + fn detect(&self, files: &[data_types::ParquetFile]) -> Option; +} diff --git a/compactor2/src/components/target_level/naive.rs b/compactor2/src/components/target_level/naive.rs new file mode 100644 index 0000000000..e6118b40fd --- /dev/null +++ b/compactor2/src/components/target_level/naive.rs @@ -0,0 +1,88 @@ +use std::fmt::Display; + +use data_types::CompactionLevel; + +use super::TargetLevelDetection; + +#[derive(Debug)] +pub struct NaiveTargetLevelDetection {} + +impl NaiveTargetLevelDetection { + pub fn new() -> Self { + Self {} + } +} + +impl Display for NaiveTargetLevelDetection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "naive target level detection",) + } +} + +impl TargetLevelDetection for NaiveTargetLevelDetection { + // For naive version, we only compact (L0s + L1s) to L1s + // The target level is always 1 and there must be at least one file in L0 + fn detect(&self, files: &[data_types::ParquetFile]) -> Option { + // Check if there are files in CompactionLevel::Initial level + if files + .iter() + .any(|file| file.compaction_level == 
CompactionLevel::Initial) + { + return Some(CompactionLevel::FileNonOverlapped); + } + + None + } +} + +#[cfg(test)] +mod tests { + use crate::test_util::ParquetFileBuilder; + + use super::*; + + #[test] + fn test_display() { + assert_eq!( + NaiveTargetLevelDetection::new().to_string(), + "naive target level detection" + ); + } + + #[test] + fn test_apply() { + let target_level_detection = NaiveTargetLevelDetection::new(); + + let f0 = ParquetFileBuilder::new(0) + .with_compaction_level(CompactionLevel::Initial) + .build(); + let f1 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .build(); + let f2 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::Final) + .build(); + + // list of one + assert_eq!( + target_level_detection.detect(&[f0.clone()]), + Some(CompactionLevel::FileNonOverlapped) + ); + assert_eq!(target_level_detection.detect(&[f1.clone()]), None); + assert_eq!(target_level_detection.detect(&[f2.clone()]), None); + // list of many + assert_eq!( + target_level_detection.detect(&[f1.clone(), f0.clone()]), + Some(CompactionLevel::FileNonOverlapped) + ); + assert_eq!( + target_level_detection.detect(&[f2.clone(), f0.clone()]), + Some(CompactionLevel::FileNonOverlapped) + ); + assert_eq!( + target_level_detection.detect(&[f2.clone(), f0, f1.clone()]), + Some(CompactionLevel::FileNonOverlapped) + ); + assert_eq!(target_level_detection.detect(&[f2, f1]), None); + } +} diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 065c696163..b6c434c939 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -252,7 +252,7 @@ async fn try_compact_partition( scratchpad_ctx.clean_from_scratchpad(&input_paths).await; // Update the catalog to reflect the newly created files, soft delete the compacted files and - // update the the upgraded files + // update the upgraded files let (created_files, upgraded_files) = update_catalog( Arc::clone(&components), partition_id, @@ -304,13 
+304,12 @@ struct CompactionPlan { /// fn buil_compaction_plan( files: Vec, - _components: Arc, + components: Arc, ) -> Result { - // TODO : Detect target level to compact to - // target = 1 if there are L0 files - // target = 2 if there are L1 files but no L0 file - // let target_level = components.target_level_detection.detect(&branch); - let target_level = CompactionLevel::FileNonOverlapped; + // Detect target level to compact to + let target_level = components.target_level_detection.detect(&files); + assert!(target_level.is_some(), "No target level is detected"); + let target_level = target_level.unwrap(); // Split files into files_to_compact, files_to_upgrade, and files_to_keep //