diff --git a/compactor2/src/components/divide_initial/mod.rs b/compactor2/src/components/divide_initial/mod.rs index 237c000dab..65832c016c 100644 --- a/compactor2/src/components/divide_initial/mod.rs +++ b/compactor2/src/components/divide_initial/mod.rs @@ -2,6 +2,8 @@ use std::fmt::{Debug, Display}; use data_types::ParquetFile; +use crate::RoundInfo; + pub mod single_branch; pub trait DivideInitial: Debug + Display + Send + Sync { @@ -10,5 +12,5 @@ pub trait DivideInitial: Debug + Display + Send + Sync { /// /// Each branch is compacted together in a single plan, and each /// compact plan may produce one or more parquet files. - fn divide(&self, files: Vec) -> Vec>; + fn divide(&self, files: Vec, round_info: &RoundInfo) -> Vec>; } diff --git a/compactor2/src/components/divide_initial/single_branch.rs b/compactor2/src/components/divide_initial/single_branch.rs index 1a420ffd40..ef386c8672 100644 --- a/compactor2/src/components/divide_initial/single_branch.rs +++ b/compactor2/src/components/divide_initial/single_branch.rs @@ -2,6 +2,8 @@ use std::fmt::Display; use data_types::ParquetFile; +use crate::RoundInfo; + use super::DivideInitial; #[derive(Debug, Default)] @@ -20,7 +22,7 @@ impl Display for SingleBranchDivideInitial { } impl DivideInitial for SingleBranchDivideInitial { - fn divide(&self, files: Vec) -> Vec> { + fn divide(&self, files: Vec, _round_info: &RoundInfo) -> Vec> { if files.is_empty() { vec![] } else { @@ -45,16 +47,17 @@ mod tests { #[test] fn test_divide() { + let round_info = RoundInfo::ManySmallFiles {}; let divide = SingleBranchDivideInitial::new(); // empty input - assert_eq!(divide.divide(vec![]), Vec::>::new()); + assert_eq!(divide.divide(vec![], &round_info), Vec::>::new()); // not empty let f1 = ParquetFileBuilder::new(1).build(); let f2 = ParquetFileBuilder::new(2).build(); assert_eq!( - divide.divide(vec![f1.clone(), f2.clone()]), + divide.divide(vec![f1.clone(), f2.clone()], &round_info), vec![vec![f1, f2]] ); } diff --git 
a/compactor2/src/components/file_classifier/all_at_once.rs b/compactor2/src/components/file_classifier/all_at_once.rs index ac50cd886a..d258988521 100644 --- a/compactor2/src/components/file_classifier/all_at_once.rs +++ b/compactor2/src/components/file_classifier/all_at_once.rs @@ -2,7 +2,7 @@ use std::fmt::Display; use data_types::{CompactionLevel, ParquetFile}; -use crate::{file_classification::FileClassification, partition_info::PartitionInfo}; +use crate::{file_classification::FileClassification, partition_info::PartitionInfo, RoundInfo}; use super::FileClassifier; @@ -26,6 +26,7 @@ impl FileClassifier for AllAtOnceFileClassifier { fn classify( &self, _partition_info: &PartitionInfo, + _round_info: &RoundInfo, files: Vec, ) -> FileClassification { // Check if there are files in Compaction::Initial level @@ -67,7 +68,7 @@ mod tests { let classifier = AllAtOnceFileClassifier::new(); let partition_info = Arc::new(PartitionInfoBuilder::new().build()); - classifier.classify(&partition_info, vec![]); + classifier.classify(&partition_info, &round_info(), vec![]); } #[test] @@ -80,7 +81,7 @@ mod tests { .build(); let partition_info = Arc::new(PartitionInfoBuilder::new().build()); - classifier.classify(&partition_info, vec![f1]); + classifier.classify(&partition_info, &round_info(), vec![f1]); } #[test] @@ -93,7 +94,7 @@ mod tests { .build(); let partition_info = Arc::new(PartitionInfoBuilder::new().build()); - classifier.classify(&partition_info, vec![f2]); + classifier.classify(&partition_info, &round_info(), vec![f2]); } #[test] @@ -110,7 +111,7 @@ mod tests { .build(); let partition_info = Arc::new(PartitionInfoBuilder::new().build()); - classifier.classify(&partition_info, vec![f1, f2]); + classifier.classify(&partition_info, &round_info(), vec![f1, f2]); } #[test] @@ -118,7 +119,7 @@ mod tests { let classifier = AllAtOnceFileClassifier::new(); let files = create_overlapped_files(); let partition_info = Arc::new(PartitionInfoBuilder::new().build()); - let 
classification = classifier.classify(&partition_info, files.clone()); + let classification = classifier.classify(&partition_info, &round_info(), files.clone()); assert_eq!( classification, FileClassification { @@ -129,4 +130,8 @@ mod tests { } ); } + + fn round_info() -> RoundInfo { + RoundInfo::ManySmallFiles {} + } } diff --git a/compactor2/src/components/file_classifier/logging.rs b/compactor2/src/components/file_classifier/logging.rs index d374ccbcc9..729303ad58 100644 --- a/compactor2/src/components/file_classifier/logging.rs +++ b/compactor2/src/components/file_classifier/logging.rs @@ -3,7 +3,7 @@ use std::fmt::Display; use data_types::ParquetFile; use observability_deps::tracing::info; -use crate::{file_classification::FileClassification, partition_info::PartitionInfo}; +use crate::{file_classification::FileClassification, partition_info::PartitionInfo, RoundInfo}; use super::FileClassifier; @@ -40,13 +40,15 @@ where fn classify( &self, partition_info: &PartitionInfo, + round_info: &RoundInfo, files: Vec, ) -> FileClassification { - let classification = self.inner.classify(partition_info, files); + let classification = self.inner.classify(partition_info, round_info, files); info!( partition_id = partition_info.partition_id.get(), target_level = %classification.target_level, + round_info = %round_info, files_to_compacts = classification.files_to_compact.len(), files_to_upgrade = classification.files_to_upgrade.len(), files_to_keep = classification.files_to_keep.len(), diff --git a/compactor2/src/components/file_classifier/mod.rs b/compactor2/src/components/file_classifier/mod.rs index de8d49ffed..f1d879ae35 100644 --- a/compactor2/src/components/file_classifier/mod.rs +++ b/compactor2/src/components/file_classifier/mod.rs @@ -5,7 +5,7 @@ use std::{ use data_types::ParquetFile; -use crate::{file_classification::FileClassification, partition_info::PartitionInfo}; +use crate::{file_classification::FileClassification, partition_info::PartitionInfo, RoundInfo}; 
pub mod all_at_once; pub mod logging; @@ -15,6 +15,7 @@ pub trait FileClassifier: Debug + Display + Send + Sync { fn classify( &self, partition_info: &PartitionInfo, + round_info: &RoundInfo, files: Vec, ) -> FileClassification; } @@ -26,8 +27,9 @@ where fn classify( &self, partition_info: &PartitionInfo, + round_info: &RoundInfo, files: Vec, ) -> FileClassification { - self.as_ref().classify(partition_info, files) + self.as_ref().classify(partition_info, round_info, files) } } diff --git a/compactor2/src/components/file_classifier/split_based.rs b/compactor2/src/components/file_classifier/split_based.rs index 567ec64b46..57ba8d2dc5 100644 --- a/compactor2/src/components/file_classifier/split_based.rs +++ b/compactor2/src/components/file_classifier/split_based.rs @@ -6,6 +6,7 @@ use crate::{ components::{files_split::FilesSplit, target_level_chooser::TargetLevelChooser}, file_classification::FileClassification, partition_info::PartitionInfo, + RoundInfo, }; use super::FileClassifier; @@ -111,6 +112,7 @@ where fn classify( &self, _partition_info: &PartitionInfo, + _round_info: &RoundInfo, files: Vec, ) -> FileClassification { let files_to_compact = files; diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 8a525ed785..6e764bbbc3 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -80,6 +80,7 @@ use super::{ not_empty::NotEmptyPartitionsSourceWrapper, randomize_order::RandomizeOrderPartitionsSourcesWrapper, PartitionsSource, }, + round_info_source::{LevelBasedRoundInfo, LoggingRoundInfoWrapper}, round_split::all_now::AllNowRoundSplit, scratchpad::{noop::NoopScratchpadGen, prod::ProdScratchpadGen, ScratchpadGen}, skipped_compactions_source::catalog::CatalogSkippedCompactionsSource, @@ -279,6 +280,9 @@ pub fn hardcoded_components(config: &Config) -> Arc { config.backoff_config.clone(), Arc::clone(&config.catalog), )), + round_info_source: 
Arc::new(LoggingRoundInfoWrapper::new(Arc::new( + LevelBasedRoundInfo::new(), + ))), files_filter: version_specific_files_filter(config), partition_filter: Arc::new(LoggingPartitionFilterWrapper::new( MetricsPartitionFilterWrapper::new( diff --git a/compactor2/src/components/mod.rs b/compactor2/src/components/mod.rs index a4d2c16057..9377f361b7 100644 --- a/compactor2/src/components/mod.rs +++ b/compactor2/src/components/mod.rs @@ -6,7 +6,8 @@ use self::{ ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink, partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter, partition_info_source::PartitionInfoSource, - partition_stream::PartitionStream, round_split::RoundSplit, scratchpad::ScratchpadGen, + partition_stream::PartitionStream, round_info_source::RoundInfoSource, round_split::RoundSplit, + scratchpad::ScratchpadGen, }; pub mod combos; @@ -33,6 +34,7 @@ pub mod partition_source; pub mod partition_stream; pub mod partitions_source; pub mod report; +pub mod round_info_source; pub mod round_split; pub mod scratchpad; pub mod skipped_compactions_source; @@ -50,6 +52,8 @@ pub struct Components { pub partition_info_source: Arc, /// Source of files in a partition for compaction pub partition_files_source: Arc, + /// Determines what type of compaction round the compactor will be doing + pub round_info_source: Arc, /// filter files for each round of compaction pub files_filter: Arc, /// stop condition for completing a partition compaction diff --git a/compactor2/src/components/report.rs b/compactor2/src/components/report.rs index 7777d63fbb..8b74d5dba0 100644 --- a/compactor2/src/components/report.rs +++ b/compactor2/src/components/report.rs @@ -96,6 +96,7 @@ pub fn log_components(components: &Components) { partition_stream, partition_info_source, partition_files_source, + round_info_source, files_filter, partition_filter, partition_resource_limit_filter, @@ -116,6 +117,7 @@ pub fn 
log_components(components: &Components) { %partition_info_source, %partition_files_source, %files_filter, + %round_info_source, %partition_filter, %partition_resource_limit_filter, %partition_done_sink, diff --git a/compactor2/src/components/round_info_source/mod.rs b/compactor2/src/components/round_info_source/mod.rs new file mode 100644 index 0000000000..4913f16dd1 --- /dev/null +++ b/compactor2/src/components/round_info_source/mod.rs @@ -0,0 +1,81 @@ +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; + +use async_trait::async_trait; +use data_types::ParquetFile; +use observability_deps::tracing::debug; + +use crate::{error::DynError, PartitionInfo, RoundInfo}; + +/// Calculates information about what this compaction round does +#[async_trait] +pub trait RoundInfoSource: Debug + Display + Send + Sync { + async fn calculate( + &self, + partition_info: &PartitionInfo, + files: &[ParquetFile], + ) -> Result, DynError>; +} + +#[derive(Debug)] +pub struct LoggingRoundInfoWrapper { + inner: Arc, +} + +impl LoggingRoundInfoWrapper { + pub fn new(inner: Arc) -> Self { + Self { inner } + } +} + +impl Display for LoggingRoundInfoWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "LoggingRoundInfoWrapper({})", self.inner) + } +} + +#[async_trait] +impl RoundInfoSource for LoggingRoundInfoWrapper { + async fn calculate( + &self, + partition_info: &PartitionInfo, + files: &[ParquetFile], + ) -> Result, DynError> { + let res = self.inner.calculate(partition_info, files).await; + if let Ok(round_info) = &res { + debug!(round_info_source=%self.inner, %round_info, "running round"); + } + res + } +} + +/// Computes the type of round based on the levels of the input files +#[derive(Debug)] +pub struct LevelBasedRoundInfo {} + +impl Display for LevelBasedRoundInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "LevelBasedRoundInfo") + } +} +impl LevelBasedRoundInfo { + pub fn new() -> Self { + Self {} 
+ } +} + +#[async_trait] +impl RoundInfoSource for LevelBasedRoundInfo { + async fn calculate( + &self, + _partition_info: &PartitionInfo, + _files: &[ParquetFile], + ) -> Result, DynError> { + // TODO: use this to calculate splits + Ok(Arc::new(RoundInfo::TargetLevel { + target_level: data_types::CompactionLevel::Initial, + })) + } +} diff --git a/compactor2/src/components/round_split/all_now.rs b/compactor2/src/components/round_split/all_now.rs index bf1d401a3a..32b5c06d1e 100644 --- a/compactor2/src/components/round_split/all_now.rs +++ b/compactor2/src/components/round_split/all_now.rs @@ -2,6 +2,8 @@ use std::fmt::Display; use data_types::ParquetFile; +use crate::RoundInfo; + use super::RoundSplit; #[derive(Debug, Default)] @@ -20,7 +22,11 @@ impl Display for AllNowRoundSplit { } impl RoundSplit for AllNowRoundSplit { - fn split(&self, files: Vec) -> (Vec, Vec) { + fn split( + &self, + files: Vec, + _round_info: &RoundInfo, + ) -> (Vec, Vec) { (files, vec![]) } } @@ -29,6 +35,8 @@ impl RoundSplit for AllNowRoundSplit { mod tests { use iox_tests::ParquetFileBuilder; + use crate::RoundInfo; + use super::*; #[test] @@ -38,16 +46,17 @@ mod tests { #[test] fn test_split() { + let round_info = RoundInfo::ManySmallFiles; let split = AllNowRoundSplit::new(); // empty input - assert_eq!(split.split(vec![]), (vec![], vec![])); + assert_eq!(split.split(vec![], &round_info), (vec![], vec![])); // not empty let f1 = ParquetFileBuilder::new(1).build(); let f2 = ParquetFileBuilder::new(2).build(); assert_eq!( - split.split(vec![f1.clone(), f2.clone()]), + split.split(vec![f1.clone(), f2.clone()], &round_info), (vec![f1, f2], vec![]) ); } diff --git a/compactor2/src/components/round_split/mod.rs b/compactor2/src/components/round_split/mod.rs index 02809d115f..ab2a9a3335 100644 --- a/compactor2/src/components/round_split/mod.rs +++ b/compactor2/src/components/round_split/mod.rs @@ -2,6 +2,8 @@ use std::fmt::{Debug, Display}; use data_types::ParquetFile; +use crate::RoundInfo; 
+ pub mod all_now; pub trait RoundSplit: Debug + Display + Send + Sync { @@ -11,5 +13,9 @@ pub trait RoundSplit: Debug + Display + Send + Sync { /// /// - **now:** will be processed in this round /// - **later:** will be processed in the next round - fn split(&self, files: Vec) -> (Vec, Vec); + fn split( + &self, + files: Vec, + round_info: &RoundInfo, + ) -> (Vec, Vec); } diff --git a/compactor2/src/driver.rs b/compactor2/src/driver.rs index 1a905ae856..656c3957f2 100644 --- a/compactor2/src/driver.rs +++ b/compactor2/src/driver.rs @@ -184,6 +184,11 @@ async fn try_compact_partition( // loop for each "Round", consider each file in the partition loop { + let round_info = components + .round_info_source + .calculate(&partition_info, &files) + .await?; + files = components.files_filter.apply(files); // This is the stop condition which will be different for different version of compaction @@ -196,10 +201,12 @@ async fn try_compact_partition( return Ok(()); } - let (files_now, files_later) = components.round_split.split(files); + let (files_now, files_later) = components.round_split.split(files, round_info.as_ref()); // Each branch must not overlap with each other - let mut branches = components.divide_initial.divide(files_now); + let mut branches = components + .divide_initial + .divide(files_now, round_info.as_ref()); let mut files_next = files_later; // loop for each "Branch" @@ -210,7 +217,10 @@ async fn try_compact_partition( // Identify the target level and files that should be // compacted together, upgraded, and kept for next round of // compaction - let file_classification = components.file_classifier.classify(&partition_info, branch); + let file_classification = + components + .file_classifier + .classify(&partition_info, &round_info, branch); // Cannot run this plan and skip this partition because of over limit of input num_files or size. 
// The partition_resource_limit_filter will throw an error if one of the limits hit and will lead diff --git a/compactor2/src/lib.rs b/compactor2/src/lib.rs index db9414db33..9a5b9f758b 100644 --- a/compactor2/src/lib.rs +++ b/compactor2/src/lib.rs @@ -179,6 +179,7 @@ mod file_classification; pub mod object_store; mod partition_info; mod plan_ir; +mod round_info; // publically expose items needed for testing pub use components::{ @@ -189,6 +190,7 @@ pub use driver::compact; pub use error::DynError; pub use partition_info::PartitionInfo; pub use plan_ir::PlanIR; +pub use round_info::RoundInfo; #[cfg(test)] mod test_utils; diff --git a/compactor2/src/partition_info.rs b/compactor2/src/partition_info.rs index ed247bb382..ada8b2da31 100644 --- a/compactor2/src/partition_info.rs +++ b/compactor2/src/partition_info.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableSchema}; use schema::sort::SortKey; -/// Information of a partition for compaction +/// Information about the Partition being compacted #[derive(Debug, PartialEq, Eq)] pub struct PartitionInfo { /// the partition diff --git a/compactor2/src/round_info.rs b/compactor2/src/round_info.rs new file mode 100644 index 0000000000..c2dba72bd6 --- /dev/null +++ b/compactor2/src/round_info.rs @@ -0,0 +1,27 @@ +//! 
Information about the current compaction round + +use std::fmt::Display; + +use data_types::CompactionLevel; + +/// Information about the current compaction round (see driver.rs for +/// more details about a round) +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum RoundInfo { + /// compacting to target level + TargetLevel { + /// compaction level of target files + target_level: CompactionLevel, + }, + /// In many small files mode + ManySmallFiles, +} + +impl Display for RoundInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::TargetLevel { target_level } => write!(f, "TargetLevel: {target_level}"), + Self::ManySmallFiles => write!(f, "ManySmallFiles"), + } + } +}