refactor: address review comment and rename to match comapct algo version names

pull/24376/head
NGA-TRAN 2023-02-03 10:49:45 -05:00
parent ff39917dcb
commit f02d0629ce
11 changed files with 313 additions and 222 deletions

View File

@ -31,6 +31,7 @@ use super::{
and::AndIdOnlyPartitionFilter, by_id::ByIdPartitionFilter, shard::ShardPartitionFilter,
IdOnlyPartitionFilter,
},
level_exist::one_level::OneLevelExist,
parquet_file_sink::{
dedicated::DedicatedExecParquetFileSinkWrapper, logging::LoggingParquetFileSinkWrapper,
object_store::ObjectStoreParquetFileSink,
@ -61,8 +62,8 @@ use super::{
round_split::all_now::AllNowRoundSplit,
scratchpad::{ignore_writes_object_store::IgnoreWrites, prod::ProdScratchpadGen},
skipped_compactions_source::catalog::CatalogSkippedCompactionsSource,
target_level::{
hot_cold::HotColdTargetLevelDetection, naive::NaiveTargetLevelDetection,
target_level_detection::{
all_at_once::AllAtOnceTargetLevelDetection, target_level::TargetLevelTargetLevelDetection,
TargetLevelDetection,
},
Components,
@ -287,7 +288,9 @@ fn version_specific_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionF
fn version_specific_target_level_detection(config: &Config) -> Arc<dyn TargetLevelDetection> {
match config.compact_version {
AlgoVersion::Naive => Arc::new(NaiveTargetLevelDetection::new()),
AlgoVersion::HotCold => Arc::new(HotColdTargetLevelDetection::new()),
AlgoVersion::Naive => Arc::new(AllAtOnceTargetLevelDetection::new()),
AlgoVersion::HotCold => {
Arc::new(TargetLevelTargetLevelDetection::new(OneLevelExist::new()))
}
}
}

View File

@ -4,7 +4,7 @@ use data_types::CompactionLevel;
pub mod one_level;
pub trait LevelFilter: Debug + Display + Send + Sync {
/// return true if this filter has the given level
pub trait LevelExist: Debug + Display + Send + Sync {
/// return true if at least one file in the given level
fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool;
}

View File

@ -2,24 +2,24 @@ use std::fmt::Display;
use data_types::CompactionLevel;
use super::LevelFilter;
use super::LevelExist;
#[derive(Debug)]
pub struct OneLevelFilter {}
pub struct OneLevelExist {}
impl OneLevelFilter {
impl OneLevelExist {
pub fn new() -> Self {
Self {}
}
}
impl Display for OneLevelFilter {
impl Display for OneLevelExist {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "one level",)
}
}
impl LevelFilter for OneLevelFilter {
impl LevelExist for OneLevelExist {
fn apply(&self, files: &[data_types::ParquetFile], level: CompactionLevel) -> bool {
files.iter().any(|f| f.compaction_level == level)
}
@ -33,12 +33,12 @@ mod tests {
#[test]
fn test_display() {
assert_eq!(OneLevelFilter::new().to_string(), "one level");
assert_eq!(OneLevelExist::new().to_string(), "one level");
}
#[test]
fn test_apply() {
let filter = OneLevelFilter::new();
let filter = OneLevelExist::new();
let f0 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Initial)

View File

@ -7,7 +7,7 @@ use self::{
partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter,
partition_source::PartitionSource, partitions_source::PartitionsSource,
round_split::RoundSplit, scratchpad::ScratchpadGen, tables_source::TablesSource,
target_level::TargetLevelDetection,
target_level_detection::TargetLevelDetection,
};
pub mod combos;
@ -19,7 +19,7 @@ pub mod file_filter;
pub mod files_filter;
pub mod hardcoded;
pub mod id_only_partition_filter;
pub mod level_filter;
pub mod level_exist;
pub mod namespaces_source;
pub mod parquet_file_sink;
pub mod partition_done_sink;
@ -32,7 +32,7 @@ pub mod round_split;
pub mod scratchpad;
pub mod skipped_compactions_source;
pub mod tables_source;
pub mod target_level;
pub mod target_level_detection;
#[derive(Debug, Clone)]
pub struct Components {

View File

@ -1,105 +0,0 @@
use std::fmt::Display;
use data_types::CompactionLevel;
use crate::components::level_filter::{one_level::OneLevelFilter, LevelFilter};
use super::TargetLevelDetection;
#[derive(Debug)]
pub struct HotColdTargetLevelDetection<T> where T: LevelFilter {
level_filter: T,
}
impl HotColdTargetLevelDetection {
pub fn new() -> Self {
Self {
level_filter: OneLevelFilter::new(),
}
}
}
impl Display for HotColdTargetLevelDetection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "hot cold target level detection",)
}
}
impl TargetLevelDetection for HotColdTargetLevelDetection {
// For HotCold version, we support compact (L0s + L1s) to L1s and (L1s + L2s) to L2s
// Target is the next level of the lowest level that has files
fn detect(&self, files: &[data_types::ParquetFile]) -> Option<CompactionLevel> {
// Start with initial level
// If there are files in this level, the compaction's target level will be the next level.
// Otherwise repeat until reaching the final level.
let mut level = CompactionLevel::Initial;
while level != CompactionLevel::Final {
if self.level_filter.apply(files, level) {
return Some(level.next());
}
level = level.next();
}
// All files are in final level, nothing to compact
None
}
}
#[cfg(test)]
mod tests {
use crate::test_util::ParquetFileBuilder;
use super::*;
#[test]
fn test_display() {
assert_eq!(
HotColdTargetLevelDetection::new().to_string(),
"hot cold target level detection"
);
}
#[test]
fn test_apply() {
let target_level_detection = HotColdTargetLevelDetection::new();
let f0 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Initial)
.build();
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
// list of one
assert_eq!(
target_level_detection.detect(&[f0.clone()]),
Some(CompactionLevel::FileNonOverlapped)
);
assert_eq!(
target_level_detection.detect(&[f1.clone()]),
Some(CompactionLevel::Final)
);
assert_eq!(target_level_detection.detect(&[f2.clone()]), None);
// list of many
assert_eq!(
target_level_detection.detect(&[f1.clone(), f0.clone()]),
Some(CompactionLevel::FileNonOverlapped)
);
assert_eq!(
target_level_detection.detect(&[f2.clone(), f0.clone()]),
Some(CompactionLevel::FileNonOverlapped)
);
assert_eq!(
target_level_detection.detect(&[f2.clone(), f0, f1.clone()]),
Some(CompactionLevel::FileNonOverlapped)
);
assert_eq!(
target_level_detection.detect(&[f2, f1]),
Some(CompactionLevel::Final)
);
}
}

View File

@ -1,11 +0,0 @@
use std::fmt::{Debug, Display};
use data_types::CompactionLevel;
pub mod hot_cold;
pub mod naive;
pub trait TargetLevelDetection: Debug + Display + Send + Sync {
/// return the compaction level the given files are suitable to get compacted to
fn detect(&self, files: &[data_types::ParquetFile]) -> Option<CompactionLevel>;
}

View File

@ -1,88 +0,0 @@
use std::fmt::Display;
use data_types::CompactionLevel;
use super::TargetLevelDetection;
#[derive(Debug)]
pub struct NaiveTargetLevelDetection {}
impl NaiveTargetLevelDetection {
pub fn new() -> Self {
Self {}
}
}
impl Display for NaiveTargetLevelDetection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "naive target level detection",)
}
}
impl TargetLevelDetection for NaiveTargetLevelDetection {
// For naive version, we only compact (L0s + L1s) to L1s
// The target level is always 1 and there must be at least one file in L0
fn detect(&self, files: &[data_types::ParquetFile]) -> Option<CompactionLevel> {
// Check if there are files in Compaction::Initial level
if files
.iter()
.any(|file| file.compaction_level == CompactionLevel::Initial)
{
return Some(CompactionLevel::FileNonOverlapped);
}
None
}
}
#[cfg(test)]
mod tests {
use crate::test_util::ParquetFileBuilder;
use super::*;
#[test]
fn test_display() {
assert_eq!(
NaiveTargetLevelDetection::new().to_string(),
"naive target level detection"
);
}
#[test]
fn test_apply() {
let target_level_detection = NaiveTargetLevelDetection::new();
let f0 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Initial)
.build();
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
// list of one
assert_eq!(
target_level_detection.detect(&[f0.clone()]),
Some(CompactionLevel::FileNonOverlapped)
);
assert_eq!(target_level_detection.detect(&[f1.clone()]), None);
assert_eq!(target_level_detection.detect(&[f2.clone()]), None);
// list of many
assert_eq!(
target_level_detection.detect(&[f1.clone(), f0.clone()]),
Some(CompactionLevel::FileNonOverlapped)
);
assert_eq!(
target_level_detection.detect(&[f2.clone(), f0.clone()]),
Some(CompactionLevel::FileNonOverlapped)
);
assert_eq!(
target_level_detection.detect(&[f2.clone(), f0, f1.clone()]),
Some(CompactionLevel::FileNonOverlapped)
);
assert_eq!(target_level_detection.detect(&[f2, f1]), None);
}
}

View File

@ -0,0 +1,134 @@
use std::fmt::Display;
use data_types::CompactionLevel;
use super::TargetLevelDetection;
#[derive(Debug)]
pub struct AllAtOnceTargetLevelDetection {}
impl AllAtOnceTargetLevelDetection {
pub fn new() -> Self {
Self {}
}
}
impl Display for AllAtOnceTargetLevelDetection {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Target level detection for AllAtOnce version",)
}
}
impl TargetLevelDetection for AllAtOnceTargetLevelDetection {
// For AllAtOnce version, we only compact (L0s + L1s) to L1s
// The target level is always 1 and there must be at least one file in L0
fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel {
// Check if there are files in Compaction::Initial level
if files
.iter()
.any(|file| file.compaction_level == CompactionLevel::Initial)
{
return CompactionLevel::FileNonOverlapped;
}
panic!("Level-0 file not found in target level detection");
}
}
#[cfg(test)]
mod tests {
use crate::test_util::ParquetFileBuilder;
use super::*;
#[test]
fn test_display() {
assert_eq!(
AllAtOnceTargetLevelDetection::new().to_string(),
"Target level detection for AllAtOnce version"
);
}
#[test]
#[should_panic(expected = "Level-0 file not found in target level detection")]
fn test_apply_empty() {
let target_level_detection = AllAtOnceTargetLevelDetection::new();
target_level_detection.detect(&[]);
}
#[test]
#[should_panic(expected = "Level-0 file not found in target level detection")]
fn test_only_l1() {
let target_level_detection = AllAtOnceTargetLevelDetection::new();
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
target_level_detection.detect(&[f1]);
}
#[test]
#[should_panic(expected = "Level-0 file not found in target level detection")]
fn test_only_l2() {
let target_level_detection = AllAtOnceTargetLevelDetection::new();
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
target_level_detection.detect(&[f2]);
}
#[test]
#[should_panic(expected = "Level-0 file not found in target level detection")]
fn test_only_l1_l2() {
let target_level_detection = AllAtOnceTargetLevelDetection::new();
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
target_level_detection.detect(&[f1, f2]);
}
#[test]
fn test_apply() {
let target_level_detection = AllAtOnceTargetLevelDetection::new();
let f0 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Initial)
.build();
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
// list of one
assert_eq!(
target_level_detection.detect(&[f0.clone()]),
CompactionLevel::FileNonOverlapped
);
// list of many
assert_eq!(
target_level_detection.detect(&[f1.clone(), f0.clone()]),
CompactionLevel::FileNonOverlapped
);
assert_eq!(
target_level_detection.detect(&[f2.clone(), f0.clone()]),
CompactionLevel::FileNonOverlapped
);
assert_eq!(
target_level_detection.detect(&[f2, f0, f1]),
CompactionLevel::FileNonOverlapped
);
}
}

View File

@ -0,0 +1,11 @@
use std::fmt::{Debug, Display};
use data_types::CompactionLevel;
pub mod all_at_once;
pub mod target_level;
pub trait TargetLevelDetection: Debug + Display + Send + Sync {
/// Return the compaction level the given files are suitable to get compacted to
fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel;
}

View File

@ -0,0 +1,149 @@
use std::fmt::Display;
use data_types::CompactionLevel;
use crate::components::level_exist::LevelExist;
use super::TargetLevelDetection;
/// For TargetLevel version, we support compact (L0s + L1s) to L1s and (L1s + L2s) to L2s
/// Target is the next level of the lowest level that has files
#[derive(Debug)]
pub struct TargetLevelTargetLevelDetection<T>
where
T: LevelExist,
{
inner: T,
}
impl<T> TargetLevelTargetLevelDetection<T>
where
T: LevelExist,
{
pub fn new(inner: T) -> Self {
Self { inner }
}
}
impl<T> Display for TargetLevelTargetLevelDetection<T>
where
T: LevelExist,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Target level detection for TargetLevel version",)
}
}
impl<T> TargetLevelDetection for TargetLevelTargetLevelDetection<T>
where
T: LevelExist,
{
fn detect(&self, files: &[data_types::ParquetFile]) -> CompactionLevel {
// Start with initial level
// If there are files in this level, the compaction's target level will be the next level.
// Otherwise repeat until reaching the final level.
let mut level = CompactionLevel::Initial;
while level != CompactionLevel::Final {
if self.inner.apply(files, level) {
return level.next();
}
level = level.next();
}
// All files are in final level and should have been filtered out earlier
panic!("Neither level-0 nor level-1 found in target level detection");
}
}
#[cfg(test)]
mod tests {
use crate::{components::level_exist::one_level::OneLevelExist, test_util::ParquetFileBuilder};
use super::*;
#[test]
fn test_display() {
assert_eq!(
TargetLevelTargetLevelDetection::new(OneLevelExist::new()).to_string(),
"Target level detection for TargetLevel version"
);
}
#[test]
#[should_panic(expected = "Neither level-0 nor level-1 found in target level detection")]
fn test_apply_empty() {
let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new());
target_level_detection.detect(&[]);
}
#[test]
fn test_apply_only_l0() {
let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new());
let f0 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Initial)
.build();
assert_eq!(
target_level_detection.detect(&[f0]),
CompactionLevel::FileNonOverlapped
);
}
#[test]
fn test_apply_only_l1() {
let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new());
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
assert_eq!(target_level_detection.detect(&[f1]), CompactionLevel::Final);
}
#[test]
#[should_panic(expected = "Neither level-0 nor level-1 found in target level detection")]
fn test_apply_only_l2() {
let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new());
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
target_level_detection.detect(&[f2]);
}
#[test]
fn test_apply_many_files() {
let target_level_detection = TargetLevelTargetLevelDetection::new(OneLevelExist::new());
let f0 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Initial)
.build();
let f1 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let f2 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::Final)
.build();
assert_eq!(
target_level_detection.detect(&[f1.clone(), f0.clone()]),
CompactionLevel::FileNonOverlapped
);
assert_eq!(
target_level_detection.detect(&[f2.clone(), f0.clone()]),
CompactionLevel::FileNonOverlapped
);
assert_eq!(
target_level_detection.detect(&[f2.clone(), f0, f1.clone()]),
CompactionLevel::FileNonOverlapped
);
assert_eq!(
target_level_detection.detect(&[f2, f1]),
CompactionLevel::Final
);
}
}

View File

@ -308,8 +308,6 @@ fn buil_compaction_plan(
) -> Result<CompactionPlan, DynError> {
// Detect target level to compact to
let target_level = components.target_level_detection.detect(&files);
assert!(target_level.is_some(), "No target level is detected");
let target_level = target_level.unwrap();
// Split files into files_to_compact, files_to_upgrade, and files_to_keep
//