chore: Move target level choice into round info

pull/24376/head
Andrew Lamb 2023-02-15 13:50:26 -05:00
parent 063ac9f2cb
commit 63877ab314
9 changed files with 44 additions and 284 deletions

View File

@ -3,20 +3,18 @@ use std::fmt::Display;
use data_types::ParquetFile;
use crate::{
components::{files_split::FilesSplit, target_level_chooser::TargetLevelChooser},
file_classification::FileClassification,
partition_info::PartitionInfo,
RoundInfo,
components::files_split::FilesSplit, file_classification::FileClassification,
partition_info::PartitionInfo, RoundInfo,
};
use super::FileClassifier;
/// Use a combination of [`TargetLevelChooser`] and [`FilesSplit`] to build a [`FileClassification`].
/// Use [`FilesSplit`] to build a [`FileClassification`].
///
/// This uses the following data flow:
/// Uses the target_level from the `round_info` in the following data flow:
///
/// ```text
/// (files)--------------+->[target level chooser (T)]--->(target level)
/// (files+target_level)-+.......................................
/// | :
/// | :
/// | +................................+
@ -48,34 +46,25 @@ use super::FileClassifier;
/// (file compact) (file upgrade)
/// ```
#[derive(Debug)]
pub struct SplitBasedFileClassifier<T, FT, FO, FU>
pub struct SplitBasedFileClassifier<FT, FO, FU>
where
T: TargetLevelChooser,
FT: FilesSplit,
FO: FilesSplit,
FU: FilesSplit,
{
target_level_chooser: T,
target_level_split: FT,
non_overlap_split: FO,
upgrade_split: FU,
}
impl<T, FT, FO, FU> SplitBasedFileClassifier<T, FT, FO, FU>
impl<FT, FO, FU> SplitBasedFileClassifier<FT, FO, FU>
where
T: TargetLevelChooser,
FT: FilesSplit,
FO: FilesSplit,
FU: FilesSplit,
{
pub fn new(
target_level_chooser: T,
target_level_split: FT,
non_overlap_split: FO,
upgrade_split: FU,
) -> Self {
pub fn new(target_level_split: FT, non_overlap_split: FO, upgrade_split: FU) -> Self {
Self {
target_level_chooser,
target_level_split,
non_overlap_split,
upgrade_split,
@ -83,9 +72,8 @@ where
}
}
impl<T, FT, FO, FU> Display for SplitBasedFileClassifier<T, FT, FO, FU>
impl<FT, FO, FU> Display for SplitBasedFileClassifier<FT, FO, FU>
where
T: TargetLevelChooser,
FT: FilesSplit,
FO: FilesSplit,
FU: FilesSplit,
@ -93,18 +81,14 @@ where
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"split_based(target_level_chooser={}, target_level_split={}, non_overlap_split={}, upgrade_split={})",
self.target_level_chooser,
self.target_level_split,
self.non_overlap_split,
self.upgrade_split,
"split_based(target_level_split={}, non_overlap_split={}, upgrade_split={})",
self.target_level_split, self.non_overlap_split, self.upgrade_split,
)
}
}
impl<T, FT, FO, FU> FileClassifier for SplitBasedFileClassifier<T, FT, FO, FU>
impl<FT, FO, FU> FileClassifier for SplitBasedFileClassifier<FT, FO, FU>
where
T: TargetLevelChooser,
FT: FilesSplit,
FO: FilesSplit,
FU: FilesSplit,
@ -112,13 +96,11 @@ where
fn classify(
&self,
_partition_info: &PartitionInfo,
_round_info: &RoundInfo,
round_info: &RoundInfo,
files: Vec<ParquetFile>,
) -> FileClassification {
let files_to_compact = files;
// Detect target level to compact to
let target_level = self.target_level_chooser.detect(&files_to_compact);
let target_level = round_info.target_level();
// Split files into files_to_compact, files_to_upgrade, and files_to_keep
//

View File

@ -43,7 +43,6 @@ use super::{
and::AndIdOnlyPartitionFilter, shard::ShardPartitionFilter, IdOnlyPartitionFilter,
},
ir_planner::{logging::LoggingIRPlannerWrapper, planner_v1::V1IRPlanner},
level_exist::one_level::OneLevelExist,
parquet_file_sink::{
dedicated::DedicatedExecParquetFileSinkWrapper, logging::LoggingParquetFileSinkWrapper,
object_store::ObjectStoreParquetFileSink,
@ -84,7 +83,6 @@ use super::{
round_split::all_now::AllNowRoundSplit,
scratchpad::{noop::NoopScratchpadGen, prod::ProdScratchpadGen, ScratchpadGen},
skipped_compactions_source::catalog::CatalogSkippedCompactionsSource,
target_level_chooser::target_level::TargetLevelTargetLevelChooser,
Components,
};
@ -365,7 +363,6 @@ fn version_specific_file_classifier(config: &Config) -> Arc<dyn FileClassifier>
match config.compact_version {
AlgoVersion::AllAtOnce => Arc::new(AllAtOnceFileClassifier::new()),
AlgoVersion::TargetLevel => Arc::new(SplitBasedFileClassifier::new(
TargetLevelTargetLevelChooser::new(OneLevelExist::new()),
TargetLevelTargetLevelSplit::new(),
TargetLevelNonOverlapSplit::new(),
TargetLevelUpgradeSplit::new(config.max_desired_file_size_bytes),

View File

@ -1,10 +0,0 @@
use std::fmt::{Debug, Display};
use data_types::CompactionLevel;
pub mod one_level;
pub trait LevelExist: Debug + Display + Send + Sync {
/// return true if at least one file in the given level
fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool;
}

View File

@ -1,71 +0,0 @@
use std::fmt::Display;
use data_types::CompactionLevel;
use super::LevelExist;
#[derive(Debug)]
pub struct OneLevelExist {}
impl OneLevelExist {
pub fn new() -> Self {
Self {}
}
}
impl Display for OneLevelExist {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "one level",)
}
}
impl LevelExist for OneLevelExist {
fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool {
files.iter().any(|f| f.compaction_level == level)
}
}
#[cfg(test)]
mod tests {
use iox_tests::ParquetFileBuilder;
use super::*;
#[test]
fn test_display() {
assert_eq!(OneLevelExist::new().to_string(), "one level");
}
#[test]
fn test_apply() {
let filter = OneLevelExist::new();
let f0 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Initial)
.build();
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
// empty list
assert!(!filter.apply(&[], CompactionLevel::Initial));
// list of one
assert!(filter.apply(&[f0.clone()], CompactionLevel::Initial));
assert!(filter.apply(&[f1.clone()], CompactionLevel::FileNonOverlapped));
assert!(filter.apply(&[f2.clone()], CompactionLevel::Final));
assert!(!filter.apply(&[f0.clone()], CompactionLevel::FileNonOverlapped));
assert!(!filter.apply(&[f1.clone()], CompactionLevel::Initial));
assert!(!filter.apply(&[f2.clone()], CompactionLevel::Initial));
// list of many
assert!(filter.apply(&[f2.clone(), f0.clone()], CompactionLevel::Initial));
assert!(filter.apply(
&vec![f2.clone(), f0, f1.clone()],
CompactionLevel::FileNonOverlapped
));
assert!(!filter.apply(&[f2, f1], CompactionLevel::Initial));
}
}

View File

@ -22,7 +22,6 @@ pub mod files_split;
pub mod hardcoded;
pub mod id_only_partition_filter;
pub mod ir_planner;
pub mod level_exist;
pub mod namespaces_source;
pub mod parquet_file_sink;
pub mod parquet_files_sink;
@ -39,7 +38,6 @@ pub mod round_split;
pub mod scratchpad;
pub mod skipped_compactions_source;
pub mod tables_source;
pub mod target_level_chooser;
/// Pluggable system to determine compactor behavior. Please see
/// [Crate Level Documentation](crate) for more details on the

View File

@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use data_types::ParquetFile;
use data_types::{CompactionLevel, ParquetFile};
use observability_deps::tracing::debug;
use crate::{error::DynError, PartitionInfo, RoundInfo};
@ -71,11 +71,25 @@ impl RoundInfoSource for LevelBasedRoundInfo {
async fn calculate(
&self,
_partition_info: &PartitionInfo,
_files: &[ParquetFile],
files: &[ParquetFile],
) -> Result<Arc<RoundInfo>, DynError> {
// TODO: use this to calculate splits
Ok(Arc::new(RoundInfo::TargetLevel {
target_level: data_types::CompactionLevel::Initial,
}))
let target_level = pick_level(files);
Ok(Arc::new(RoundInfo::TargetLevel { target_level }))
}
}
fn pick_level(files: &[data_types::ParquetFile]) -> CompactionLevel {
// Start with initial level
// If there are files in this level, the compaction's target level will be the next level.
// Otherwise repeat until reaching the final level.
let mut level = CompactionLevel::Initial;
while level != CompactionLevel::Final {
if files.iter().any(|f| f.compaction_level == level) {
return level.next();
}
level = level.next();
}
level
}

View File

@ -1,10 +0,0 @@
use std::fmt::{Debug, Display};
use data_types::CompactionLevel;
pub mod target_level;
pub trait TargetLevelChooser: Debug + Display + Send + Sync {
/// Return the compaction level the given files are suitable to get compacted to
fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel;
}

View File

@ -1,150 +0,0 @@
use std::fmt::Display;
use data_types::CompactionLevel;
use crate::components::level_exist::LevelExist;
use super::TargetLevelChooser;
/// For TargetLevel version, we support compact (L0s + L1s) to L1s and (L1s + L2s) to L2s
/// Target is the next level of the lowest level that has files
#[derive(Debug)]
pub struct TargetLevelTargetLevelChooser<T>
where
T: LevelExist,
{
inner: T,
}
impl<T> TargetLevelTargetLevelChooser<T>
where
T: LevelExist,
{
pub fn new(inner: T) -> Self {
Self { inner }
}
}
impl<T> Display for TargetLevelTargetLevelChooser<T>
where
T: LevelExist,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Target level detection for TargetLevel version",)
}
}
impl<T> TargetLevelChooser for TargetLevelTargetLevelChooser<T>
where
T: LevelExist,
{
fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel {
// Start with initial level
// If there are files in this level, the compaction's target level will be the next level.
// Otherwise repeat until reaching the final level.
let mut level = CompactionLevel::Initial;
while level != CompactionLevel::Final {
if self.inner.apply(files, level) {
return level.next();
}
level = level.next();
}
// All files are in final level and should have been filtered out earlier
panic!("Neither level-0 nor level-1 found in target level detection");
}
}
#[cfg(test)]
mod tests {
use crate::components::level_exist::one_level::OneLevelExist;
use iox_tests::ParquetFileBuilder;
use super::*;
#[test]
fn test_display() {
assert_eq!(
TargetLevelTargetLevelChooser::new(OneLevelExist::new()).to_string(),
"Target level detection for TargetLevel version"
);
}
#[test]
#[should_panic(expected = "Neither level-0 nor level-1 found in target level detection")]
fn test_apply_empty() {
let target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new());
target_level_chooser.detect(&[]);
}
#[test]
fn test_apply_only_l0() {
let target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new());
let f0 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Initial)
.build();
assert_eq!(
target_level_chooser.detect(&[f0]),
CompactionLevel::FileNonOverlapped
);
}
#[test]
fn test_apply_only_l1() {
let target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new());
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
assert_eq!(target_level_chooser.detect(&[f1]), CompactionLevel::Final);
}
#[test]
#[should_panic(expected = "Neither level-0 nor level-1 found in target level detection")]
fn test_apply_only_l2() {
let target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new());
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
target_level_chooser.detect(&[f2]);
}
#[test]
fn test_apply_many_files() {
let target_level_chooser = TargetLevelTargetLevelChooser::new(OneLevelExist::new());
let f0 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Initial)
.build();
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
assert_eq!(
target_level_chooser.detect(&[f1.clone(), f0.clone()]),
CompactionLevel::FileNonOverlapped
);
assert_eq!(
target_level_chooser.detect(&[f2.clone(), f0.clone()]),
CompactionLevel::FileNonOverlapped
);
assert_eq!(
target_level_chooser.detect(&[f2.clone(), f0, f1.clone()]),
CompactionLevel::FileNonOverlapped
);
assert_eq!(
target_level_chooser.detect(&[f2, f1]),
CompactionLevel::Final
);
}
}

View File

@ -25,3 +25,13 @@ impl Display for RoundInfo {
}
}
}
impl RoundInfo {
/// what levels should the files in this round be?
pub fn target_level(&self) -> CompactionLevel {
match self {
Self::TargetLevel { target_level } => *target_level,
Self::ManySmallFiles => CompactionLevel::Initial,
}
}
}