feat: split higher target level files from compact ones (#6829)

* feat: split upgrade & compact

* chore: Apply suggestions from code review

Co-authored-by: Marco Neumann <marco@crepererum.net>

* chore: merge main to branch

* refactor: address review comments

* fix: doc comment fo []

---------

Co-authored-by: Marco Neumann <marco@crepererum.net>
Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Nga Tran 2023-02-03 12:57:38 -05:00 committed by GitHub
parent a9433302dd
commit 0ed9754c4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 389 additions and 4 deletions

View File

@ -0,0 +1,78 @@
use std::fmt::Display;
use data_types::{CompactionLevel, ParquetFile};
use super::FilesSplit;
#[derive(Debug)]
/// In AllAtOnce version, we will compact all files at once and do not split anything
pub struct AllAtOnceTargetLevelSplit {}
impl AllAtOnceTargetLevelSplit {
pub fn new() -> Self {
Self {}
}
}
impl Display for AllAtOnceTargetLevelSplit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Target level split for AllAtOnce version")
}
}
impl FilesSplit for AllAtOnceTargetLevelSplit {
fn apply(
&self,
files: Vec<data_types::ParquetFile>,
_target_level: CompactionLevel,
) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
(files, vec![])
}
}
#[cfg(test)]
mod tests {
use crate::test_util::create_overlapped_files;
use super::*;
#[test]
fn test_display() {
assert_eq!(
AllAtOnceTargetLevelSplit::new().to_string(),
"Target level split for AllAtOnce version"
);
}
#[test]
fn test_apply_empty_files() {
let files = vec![];
let split = AllAtOnceTargetLevelSplit::new();
let (lower, higher) = split.apply(files, CompactionLevel::FileNonOverlapped);
assert_eq!(lower.len(), 0);
assert_eq!(higher.len(), 0);
}
#[test]
fn test_apply() {
// ------------------------------
// Create 8 files with all levels
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
let split = AllAtOnceTargetLevelSplit::new();
let (lower, higher) = split.apply(files.clone(), CompactionLevel::Initial);
assert_eq!(lower.len(), 8);
assert_eq!(higher.len(), 0);
let (lower, higher) = split.apply(files.clone(), CompactionLevel::FileNonOverlapped);
assert_eq!(lower.len(), 8);
assert_eq!(higher.len(), 0);
let (lower, higher) = split.apply(files, CompactionLevel::Final);
assert_eq!(lower.len(), 8);
assert_eq!(higher.len(), 0);
}
}

View File

@ -0,0 +1,21 @@
use std::fmt::{Debug, Display};
use data_types::{CompactionLevel, ParquetFile};
pub mod all_at_once_target_level_split;
pub mod target_level_target_level_split;
pub trait FilesSplit: Debug + Display + Send + Sync {
/// Split provided files into 2 groups of files. There will be different split needs:
/// . `[files <= target_level]` and `[files > target_level]`
/// . `[files_to_upgrade]` and `[files_to_compact]`
///
/// Note that for AllAtOnce version, we do not split anything and compact all files at once
/// This split is mainly for version after the naive AllAtOnce. For the AllAtOnce, we will
/// create dummy modules to return all files
fn apply(
&self,
files: Vec<data_types::ParquetFile>,
target_level: CompactionLevel,
) -> (Vec<ParquetFile>, Vec<ParquetFile>);
}

View File

@ -0,0 +1,206 @@
use std::fmt::Display;
use data_types::{CompactionLevel, ParquetFile};
use super::FilesSplit;
/// Split given files into 2 groups of files: `[<= target_level]` and `[> target_level]`
#[derive(Debug)]
pub struct TargetLevelTargetLevelSplit {}
impl TargetLevelTargetLevelSplit {
pub fn new() -> Self {
Self {}
}
}
impl Display for TargetLevelTargetLevelSplit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Target level split for TargetLevel version")
}
}
impl FilesSplit for TargetLevelTargetLevelSplit {
fn apply(
&self,
files: Vec<data_types::ParquetFile>,
target_level: CompactionLevel,
) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
files
.into_iter()
.partition(|f| f.compaction_level <= target_level)
}
}
#[cfg(test)]
mod tests {
use crate::test_util::create_overlapped_files;
use super::*;
#[test]
fn test_display() {
assert_eq!(
TargetLevelTargetLevelSplit::new().to_string(),
"Target level split for TargetLevel version"
);
}
#[test]
fn test_apply_empty_files() {
let files = vec![];
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files, CompactionLevel::FileNonOverlapped);
assert_eq!(lower.len(), 0);
assert_eq!(higher.len(), 0);
}
#[test]
fn test_apply_partial_empty_files_l0() {
// ------------------------------
// Create 8 files with all levels
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
//
// Only keep files of CompactionLevel::Initial
let files = files
.into_iter()
.filter(|f| f.compaction_level == CompactionLevel::Initial)
.collect::<Vec<_>>();
assert_eq!(files.len(), 3);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files.clone(), CompactionLevel::Initial);
assert_eq!(lower.len(), 3);
assert_eq!(higher.len(), 0);
let (lower, higher) = split.apply(files.clone(), CompactionLevel::FileNonOverlapped);
assert_eq!(lower.len(), 3);
assert_eq!(higher.len(), 0);
let (lower, higher) = split.apply(files, CompactionLevel::Final);
assert_eq!(lower.len(), 3);
assert_eq!(higher.len(), 0);
}
#[test]
fn test_apply_partial_empty_files_l1() {
// ------------------------------
// Create 8 files with all levels
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
//
// Only keep files of CompactionLevel::FileNonOverlapped
let files = files
.into_iter()
.filter(|f| f.compaction_level == CompactionLevel::FileNonOverlapped)
.collect::<Vec<_>>();
assert_eq!(files.len(), 3);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files.clone(), CompactionLevel::Initial);
assert_eq!(lower.len(), 0);
assert_eq!(higher.len(), 3);
//
let (lower, higher) = split.apply(files.clone(), CompactionLevel::FileNonOverlapped);
assert_eq!(lower.len(), 3);
assert_eq!(higher.len(), 0);
//
let (lower, higher) = split.apply(files, CompactionLevel::Final);
assert_eq!(lower.len(), 3);
assert_eq!(higher.len(), 0);
}
#[test]
fn test_apply_partial_empty_files_l2() {
// ------------------------------
// Create 8 files with all levels
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
//
// Only keep files of CompactionLevel::Final
let files = files
.into_iter()
.filter(|f| f.compaction_level == CompactionLevel::Final)
.collect::<Vec<_>>();
assert_eq!(files.len(), 2);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files.clone(), CompactionLevel::Initial);
assert_eq!(lower.len(), 0);
assert_eq!(higher.len(), 2);
let (lower, higher) = split.apply(files.clone(), CompactionLevel::FileNonOverlapped);
assert_eq!(lower.len(), 0);
assert_eq!(higher.len(), 2);
let (lower, higher) = split.apply(files, CompactionLevel::Final);
assert_eq!(lower.len(), 2);
assert_eq!(higher.len(), 0);
}
#[test]
fn test_apply_target_level_0() {
// Test target level Initial
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files, CompactionLevel::Initial);
// verify number of files
assert_eq!(lower.len(), 3);
assert_eq!(higher.len(), 5);
// verify compaction level of files
assert!(lower
.iter()
.all(|f| f.compaction_level == CompactionLevel::Initial));
assert!(higher
.iter()
.all(|f| f.compaction_level == CompactionLevel::FileNonOverlapped
|| f.compaction_level == CompactionLevel::Final));
}
#[test]
fn test_apply_terget_level_l1() {
// ------------------------------
// Test target level is FileNonOverlapped
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files, CompactionLevel::FileNonOverlapped);
// verify number of files
assert_eq!(lower.len(), 6);
assert_eq!(higher.len(), 2);
// verify compaction level of files
assert!(lower
.iter()
.all(|f| f.compaction_level == CompactionLevel::Initial
|| f.compaction_level == CompactionLevel::FileNonOverlapped));
assert!(higher
.iter()
.all(|f| f.compaction_level == CompactionLevel::Final));
}
#[test]
fn test_apply_taget_level_l2() {
// ------------------------------
// Test target level is Final
let files = create_overlapped_files();
assert_eq!(files.len(), 8);
let split = TargetLevelTargetLevelSplit::new();
let (lower, higher) = split.apply(files, CompactionLevel::Final);
// verify number of files
assert_eq!(lower.len(), 8);
assert_eq!(higher.len(), 0);
// verify compaction level of files
assert!(lower
.iter()
.all(|f| f.compaction_level == CompactionLevel::Initial
|| f.compaction_level == CompactionLevel::FileNonOverlapped
|| f.compaction_level == CompactionLevel::Final));
}
}

View File

@ -27,6 +27,10 @@ use super::{
divide_initial::single_branch::SingleBranchDivideInitial,
file_filter::{and::AndFileFilter, level_range::LevelRangeFileFilter},
files_filter::{chain::FilesFilterChain, per_file::PerFileFilesFilter},
files_split::{
all_at_once_target_level_split::AllAtOnceTargetLevelSplit,
target_level_target_level_split::TargetLevelTargetLevelSplit, FilesSplit,
},
id_only_partition_filter::{
and::AndIdOnlyPartitionFilter, by_id::ByIdPartitionFilter, shard::ShardPartitionFilter,
IdOnlyPartitionFilter,
@ -235,6 +239,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
Arc::clone(config.parquet_store_scratchpad.object_store()),
scratchpad_store_output,
)),
target_level_split: version_specific_target_level_split(config),
})
}
@ -279,3 +284,10 @@ fn version_specific_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionF
}
}
}
fn version_specific_target_level_split(config: &Config) -> Arc<dyn FilesSplit> {
match config.compact_version {
AlgoVersion::AllAtOnce => Arc::new(AllAtOnceTargetLevelSplit::new()),
AlgoVersion::TargetLevel => Arc::new(TargetLevelTargetLevelSplit::new()),
}
}

View File

@ -16,6 +16,7 @@ pub mod df_planner;
pub mod divide_initial;
pub mod file_filter;
pub mod files_filter;
pub mod files_split;
pub mod hardcoded;
pub mod id_only_partition_filter;
pub mod namespaces_source;
@ -48,4 +49,5 @@ pub struct Components {
pub round_split: Arc<dyn RoundSplit>,
pub divide_initial: Arc<dyn DivideInitial>,
pub scratchpad_gen: Arc<dyn ScratchpadGen>,
pub target_level_split: Arc<dyn files_split::FilesSplit>,
}

View File

@ -92,6 +92,7 @@ pub fn log_components(components: &Components) {
round_split,
divide_initial,
scratchpad_gen,
target_level_split,
} = components;
info!(
@ -110,6 +111,7 @@ pub fn log_components(components: &Components) {
%round_split,
%divide_initial,
%scratchpad_gen,
%target_level_split,
"component setup",
);
}

View File

@ -304,8 +304,10 @@ struct CompactionPlan {
///
fn buil_compaction_plan(
files: Vec<ParquetFile>,
_components: Arc<Components>,
components: Arc<Components>,
) -> Result<CompactionPlan, DynError> {
let files_to_compact = files;
// TODO : Detect target level to compact to
// target = 1 if there are L0 files
// target = 2 if there are L1 files but no L0 file
@ -317,10 +319,11 @@ fn buil_compaction_plan(
// Since output of one compaction is used as input of next compaction, all files that are not
// compacted or upgraded are still kept to consider in next round of compaction
// TODO: Split atctual files to compact from its higher-target-level files
// Split atctual files to compact from its higher-target-level files
// The higher-target-level files are kept for next round of compaction
// let (compacted_files, higher_level_files) = components.target_level_split.apply(branch, target_level);
let (files_to_compact, mut files_to_keep) = (files, vec![]);
let (files_to_compact, mut files_to_keep) = components
.target_level_split
.apply(files_to_compact, target_level);
// To have efficient compaction performance, we do not need to compact eligible non-overlapped file
// Example:

View File

@ -85,6 +85,16 @@ impl ParquetFileBuilder {
}
}
pub fn with_time_range(self, min_time: i64, max_time: i64) -> Self {
Self {
file: ParquetFile {
min_time: Timestamp::new(min_time),
max_time: Timestamp::new(max_time),
..self.file
},
}
}
pub fn build(self) -> ParquetFile {
self.file
}
@ -581,3 +591,54 @@ where
.expect("timeout")
}
}
// This setup will return files with ranges as follows:
// |--L2.1--| |--L2.2--|
// |--L1.1--| |--L1.2--| |--L1.3--|
// |--L0.1--| |--L0.2--| |--L0.3--|
// Sizes of L1.3 and L0.3 are set large (100), the rest is default (1)
pub fn create_overlapped_files() -> Vec<ParquetFile> {
let l2_1 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::Final)
.with_time_range(0, 100)
.build();
let l2_2 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Final)
.with_time_range(200, 300)
.build();
// L1_1 overlaps with L2_1
let l1_1 = ParquetFileBuilder::new(2)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_time_range(250, 350)
.build();
let l1_2 = ParquetFileBuilder::new(3)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_time_range(400, 500)
.build();
let l1_3 = ParquetFileBuilder::new(4)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_time_range(600, 700)
.with_file_size_bytes(100)
.build();
// L0_1 overlaps with L1_2 and L1_3
let l0_1 = ParquetFileBuilder::new(5)
.with_compaction_level(CompactionLevel::Initial)
.with_time_range(450, 620)
.build();
// L0_2 overlaps with L1_3
let l0_2 = ParquetFileBuilder::new(6)
.with_compaction_level(CompactionLevel::Initial)
.with_time_range(650, 750)
.build();
// L0_3 overlaps with nothing
let l0_3 = ParquetFileBuilder::new(7)
.with_compaction_level(CompactionLevel::Initial)
.with_time_range(800, 900)
.with_file_size_bytes(100)
.build();
// Put the files in random order
vec![l1_3, l1_2, l2_1, l2_2, l0_2, l1_1, l0_1, l0_3]
}