feat: compaction target level detection
parent
5a896764cc
commit
fc539cfe33
|
@ -61,6 +61,10 @@ use super::{
|
|||
round_split::all_now::AllNowRoundSplit,
|
||||
scratchpad::{ignore_writes_object_store::IgnoreWrites, prod::ProdScratchpadGen},
|
||||
skipped_compactions_source::catalog::CatalogSkippedCompactionsSource,
|
||||
target_level::{
|
||||
hot_cold::HotColdTargetLevelDetection, naive::NaiveTargetLevelDetection,
|
||||
TargetLevelDetection,
|
||||
},
|
||||
Components,
|
||||
};
|
||||
|
||||
|
@ -235,6 +239,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
|
|||
Arc::clone(config.parquet_store_scratchpad.object_store()),
|
||||
scratchpad_store_output,
|
||||
)),
|
||||
target_level_detection: version_specific_target_level_detection(config),
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -279,3 +284,10 @@ fn version_specific_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionF
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn version_specific_target_level_detection(config: &Config) -> Arc<dyn TargetLevelDetection> {
|
||||
match config.compact_version {
|
||||
AlgoVersion::Naive => Arc::new(NaiveTargetLevelDetection::new()),
|
||||
AlgoVersion::HotCold => Arc::new(HotColdTargetLevelDetection::new()),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
use std::fmt::{Debug, Display};
|
||||
|
||||
use data_types::CompactionLevel;
|
||||
|
||||
pub mod one_level;
|
||||
|
||||
pub trait LevelFilter: Debug + Display + Send + Sync {
|
||||
// return true if this filter has the given level
|
||||
fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool;
|
||||
}
|
|
@ -0,0 +1,71 @@
|
|||
use std::fmt::Display;
|
||||
|
||||
use data_types::CompactionLevel;
|
||||
|
||||
use super::LevelFilter;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct OneLevelFilter {}
|
||||
|
||||
impl OneLevelFilter {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for OneLevelFilter {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "one level",)
|
||||
}
|
||||
}
|
||||
|
||||
impl LevelFilter for OneLevelFilter {
|
||||
fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool {
|
||||
files.iter().any(|f| f.compaction_level == level)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::test_util::ParquetFileBuilder;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_display() {
|
||||
assert_eq!(OneLevelFilter::new().to_string(), "one level");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_apply() {
|
||||
let filter = OneLevelFilter::new();
|
||||
|
||||
let f0 = ParquetFileBuilder::new(0)
|
||||
.with_compaction_level(CompactionLevel::Initial)
|
||||
.build();
|
||||
let f1 = ParquetFileBuilder::new(1)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||
.build();
|
||||
let f2 = ParquetFileBuilder::new(2)
|
||||
.with_compaction_level(CompactionLevel::Final)
|
||||
.build();
|
||||
|
||||
// empty list
|
||||
assert!(!filter.apply(&[], CompactionLevel::Initial));
|
||||
|
||||
// list of one
|
||||
assert!(filter.apply(&[f0.clone()], CompactionLevel::Initial));
|
||||
assert!(filter.apply(&[f1.clone()], CompactionLevel::FileNonOverlapped));
|
||||
assert!(filter.apply(&[f2.clone()], CompactionLevel::Final));
|
||||
assert!(!filter.apply(&[f0.clone()], CompactionLevel::FileNonOverlapped));
|
||||
assert!(!filter.apply(&[f1.clone()], CompactionLevel::Initial));
|
||||
assert!(!filter.apply(&[f2.clone()], CompactionLevel::Initial));
|
||||
// list of many
|
||||
assert!(filter.apply(&[f2.clone(), f0.clone()], CompactionLevel::Initial));
|
||||
assert!(filter.apply(
|
||||
&vec![f2.clone(), f0, f1.clone()],
|
||||
CompactionLevel::FileNonOverlapped
|
||||
));
|
||||
assert!(!filter.apply(&[f2, f1], CompactionLevel::Initial));
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ use self::{
|
|||
partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter,
|
||||
partition_source::PartitionSource, partitions_source::PartitionsSource,
|
||||
round_split::RoundSplit, scratchpad::ScratchpadGen, tables_source::TablesSource,
|
||||
target_level::TargetLevelDetection,
|
||||
};
|
||||
|
||||
pub mod combos;
|
||||
|
@ -18,6 +19,7 @@ pub mod file_filter;
|
|||
pub mod files_filter;
|
||||
pub mod hardcoded;
|
||||
pub mod id_only_partition_filter;
|
||||
pub mod level_filter;
|
||||
pub mod namespaces_source;
|
||||
pub mod parquet_file_sink;
|
||||
pub mod partition_done_sink;
|
||||
|
@ -30,6 +32,7 @@ pub mod round_split;
|
|||
pub mod scratchpad;
|
||||
pub mod skipped_compactions_source;
|
||||
pub mod tables_source;
|
||||
pub mod target_level;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Components {
|
||||
|
@ -48,4 +51,5 @@ pub struct Components {
|
|||
pub round_split: Arc<dyn RoundSplit>,
|
||||
pub divide_initial: Arc<dyn DivideInitial>,
|
||||
pub scratchpad_gen: Arc<dyn ScratchpadGen>,
|
||||
pub target_level_detection: Arc<dyn TargetLevelDetection>,
|
||||
}
|
||||
|
|
|
@ -92,6 +92,7 @@ pub fn log_components(components: &Components) {
|
|||
round_split,
|
||||
divide_initial,
|
||||
scratchpad_gen,
|
||||
target_level_detection,
|
||||
} = components;
|
||||
|
||||
info!(
|
||||
|
@ -110,6 +111,7 @@ pub fn log_components(components: &Components) {
|
|||
%round_split,
|
||||
%divide_initial,
|
||||
%scratchpad_gen,
|
||||
%target_level_detection,
|
||||
"component setup",
|
||||
);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
use std::fmt::Display;
|
||||
|
||||
use data_types::CompactionLevel;
|
||||
|
||||
use crate::components::level_filter::{one_level::OneLevelFilter, LevelFilter};
|
||||
|
||||
use super::TargetLevelDetection;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct HotColdTargetLevelDetection {
|
||||
level_filter: OneLevelFilter,
|
||||
}
|
||||
|
||||
impl HotColdTargetLevelDetection {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
level_filter: OneLevelFilter::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for HotColdTargetLevelDetection {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "hot cold target level detection",)
|
||||
}
|
||||
}
|
||||
|
||||
impl TargetLevelDetection for HotColdTargetLevelDetection {
|
||||
// For HotCold version, we support compact (L0s + L1s) to L1s and (L1s + L2s) to L2s
|
||||
// Target is the next level of the lowest level that has files
|
||||
fn detect(&self, files: &[data_types::ParquetFile]) -> Option<CompactionLevel> {
|
||||
// Start with initial level
|
||||
// If there are files in this level, the compaction's target level will be the next level.
|
||||
// Otherwiuse repeat until reaching the final level.
|
||||
let mut level = CompactionLevel::Initial;
|
||||
while level != CompactionLevel::Final {
|
||||
if self.level_filter.apply(files, level) {
|
||||
return Some(level.next());
|
||||
}
|
||||
|
||||
level = level.next();
|
||||
}
|
||||
|
||||
// All files are in final level, nothing to compact
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::test_util::ParquetFileBuilder;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_display() {
|
||||
assert_eq!(
|
||||
HotColdTargetLevelDetection::new().to_string(),
|
||||
"hot cold target level detection"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_apply() {
|
||||
let target_level_detection = HotColdTargetLevelDetection::new();
|
||||
|
||||
let f0 = ParquetFileBuilder::new(0)
|
||||
.with_compaction_level(CompactionLevel::Initial)
|
||||
.build();
|
||||
let f1 = ParquetFileBuilder::new(1)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||
.build();
|
||||
let f2 = ParquetFileBuilder::new(2)
|
||||
.with_compaction_level(CompactionLevel::Final)
|
||||
.build();
|
||||
|
||||
// list of one
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f0.clone()]),
|
||||
Some(CompactionLevel::FileNonOverlapped)
|
||||
);
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f1.clone()]),
|
||||
Some(CompactionLevel::Final)
|
||||
);
|
||||
assert_eq!(target_level_detection.detect(&[f2.clone()]), None);
|
||||
// list of many
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f1.clone(), f0.clone()]),
|
||||
Some(CompactionLevel::FileNonOverlapped)
|
||||
);
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f2.clone(), f0.clone()]),
|
||||
Some(CompactionLevel::FileNonOverlapped)
|
||||
);
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f2.clone(), f0, f1.clone()]),
|
||||
Some(CompactionLevel::FileNonOverlapped)
|
||||
);
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f2, f1]),
|
||||
Some(CompactionLevel::Final)
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
use std::fmt::{Debug, Display};
|
||||
|
||||
use data_types::CompactionLevel;
|
||||
|
||||
pub mod hot_cold;
|
||||
pub mod naive;
|
||||
|
||||
pub trait TargetLevelDetection: Debug + Display + Send + Sync {
|
||||
// return the compaction level the given files are suitable to get compacted to
|
||||
fn detect(&self, files: &[data_types::ParquetFile]) -> Option<CompactionLevel>;
|
||||
}
|
|
@ -0,0 +1,88 @@
|
|||
use std::fmt::Display;
|
||||
|
||||
use data_types::CompactionLevel;
|
||||
|
||||
use super::TargetLevelDetection;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NaiveTargetLevelDetection {}
|
||||
|
||||
impl NaiveTargetLevelDetection {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for NaiveTargetLevelDetection {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "naive target level detection",)
|
||||
}
|
||||
}
|
||||
|
||||
impl TargetLevelDetection for NaiveTargetLevelDetection {
|
||||
// For naive version, we only compact (L0s + L1s) to L1s
|
||||
// The target level is always 1 and there must be at least one file in L0
|
||||
fn detect(&self, files: &[data_types::ParquetFile]) -> Option<CompactionLevel> {
|
||||
// Check if there are files in Compaction::Initial level
|
||||
if files
|
||||
.iter()
|
||||
.any(|file| file.compaction_level == CompactionLevel::Initial)
|
||||
{
|
||||
return Some(CompactionLevel::FileNonOverlapped);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::test_util::ParquetFileBuilder;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_display() {
|
||||
assert_eq!(
|
||||
NaiveTargetLevelDetection::new().to_string(),
|
||||
"naive target level detection"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_apply() {
|
||||
let target_level_detection = NaiveTargetLevelDetection::new();
|
||||
|
||||
let f0 = ParquetFileBuilder::new(0)
|
||||
.with_compaction_level(CompactionLevel::Initial)
|
||||
.build();
|
||||
let f1 = ParquetFileBuilder::new(1)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||
.build();
|
||||
let f2 = ParquetFileBuilder::new(2)
|
||||
.with_compaction_level(CompactionLevel::Final)
|
||||
.build();
|
||||
|
||||
// list of one
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f0.clone()]),
|
||||
Some(CompactionLevel::FileNonOverlapped)
|
||||
);
|
||||
assert_eq!(target_level_detection.detect(&[f1.clone()]), None);
|
||||
assert_eq!(target_level_detection.detect(&[f2.clone()]), None);
|
||||
// list of many
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f1.clone(), f0.clone()]),
|
||||
Some(CompactionLevel::FileNonOverlapped)
|
||||
);
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f2.clone(), f0.clone()]),
|
||||
Some(CompactionLevel::FileNonOverlapped)
|
||||
);
|
||||
assert_eq!(
|
||||
target_level_detection.detect(&[f2.clone(), f0, f1.clone()]),
|
||||
Some(CompactionLevel::FileNonOverlapped)
|
||||
);
|
||||
assert_eq!(target_level_detection.detect(&[f2, f1]), None);
|
||||
}
|
||||
}
|
|
@ -252,7 +252,7 @@ async fn try_compact_partition(
|
|||
scratchpad_ctx.clean_from_scratchpad(&input_paths).await;
|
||||
|
||||
// Update the catalog to reflect the newly created files, soft delete the compacted files and
|
||||
// update the the upgraded files
|
||||
// update the upgraded files
|
||||
let (created_files, upgraded_files) = update_catalog(
|
||||
Arc::clone(&components),
|
||||
partition_id,
|
||||
|
@ -304,13 +304,12 @@ struct CompactionPlan {
|
|||
///
|
||||
fn buil_compaction_plan(
|
||||
files: Vec<ParquetFile>,
|
||||
_components: Arc<Components>,
|
||||
components: Arc<Components>,
|
||||
) -> Result<CompactionPlan, DynError> {
|
||||
// TODO : Detect target level to compact to
|
||||
// target = 1 if there are L0 files
|
||||
// target = 2 if there are L1 files but no L0 file
|
||||
// let target_level = components.target_level_detection.detect(&branch);
|
||||
let target_level = CompactionLevel::FileNonOverlapped;
|
||||
// Detect target level to compact to
|
||||
let target_level = components.target_level_detection.detect(&files);
|
||||
assert!(target_level.is_some(), "No target level is detected");
|
||||
let target_level = target_level.unwrap();
|
||||
|
||||
// Split files into files_to_compact, files_to_upgrade, and files_to_keep
|
||||
//
|
||||
|
|
Loading…
Reference in New Issue