chore: merge main to branch
commit
c5025d6271
|
@ -1003,6 +1003,7 @@ dependencies = [
|
|||
"data_types",
|
||||
"datafusion",
|
||||
"futures",
|
||||
"insta",
|
||||
"iox_catalog",
|
||||
"iox_query",
|
||||
"iox_tests",
|
||||
|
|
|
@ -34,3 +34,4 @@ workspace-hack = { path = "../workspace-hack"}
|
|||
arrow_util = { path = "../arrow_util" }
|
||||
iox_tests = { path = "../iox_tests" }
|
||||
test_helpers = { path = "../test_helpers"}
|
||||
insta = { version = "1.26.0", features = ["yaml"] }
|
||||
|
|
|
@ -130,7 +130,7 @@ mod tests {
|
|||
|
||||
use crate::test_util::{
|
||||
create_l1_files, create_overlapped_files, create_overlapped_files_2,
|
||||
create_overlapped_l0_l1_files, create_overlapped_l1_l2_files,
|
||||
create_overlapped_l0_l1_files, create_overlapped_l1_l2_files, format_files,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
@ -186,7 +186,17 @@ mod tests {
|
|||
#[test]
|
||||
fn test_apply_one_level_empty() {
|
||||
let files = create_l1_files(1);
|
||||
assert_eq!(files.len(), 3);
|
||||
insta::assert_yaml_snapshot!(
|
||||
format_files("initial", &files),
|
||||
@r###"
|
||||
---
|
||||
- initial
|
||||
- "L1 "
|
||||
- "L1.13[600,700] |-----L1.13-----| "
|
||||
- "L1.12[400,500] |-----L1.12-----| "
|
||||
- "L1.11[250,350] |-----L1.11-----| "
|
||||
"###
|
||||
);
|
||||
|
||||
let split = TargetLevelNonOverlapSplit::new();
|
||||
|
||||
|
@ -206,27 +216,48 @@ mod tests {
|
|||
let files = create_overlapped_l0_l1_files(1);
|
||||
assert_eq!(files.len(), 6);
|
||||
|
||||
// Input files:
|
||||
// |--L1.1--| |--L1.2--| |--L1.3--|
|
||||
// |--L0.1--| |--L0.2--| |--L0.3--|
|
||||
// Output files: (overlap, non_overlap) = ( [L0.1, L0.2, L0.3, L1.2, L1.3] , [L1.1] )
|
||||
insta::assert_yaml_snapshot!(
|
||||
format_files("initial", &files),
|
||||
@r###"
|
||||
---
|
||||
- initial
|
||||
- "L0 "
|
||||
- "L0.2[650,750] |---L0.2---| "
|
||||
- "L0.1[450,620] |-------L0.1-------| "
|
||||
- "L0.3[800,900] |---L0.3---| "
|
||||
- "L1 "
|
||||
- "L1.13[600,700] |--L1.13---| "
|
||||
- "L1.12[400,500] |--L1.12---| "
|
||||
- "L1.11[250,350] |--L1.11---| "
|
||||
"###
|
||||
);
|
||||
|
||||
let split = TargetLevelNonOverlapSplit::new();
|
||||
let (overlap, non_overlap) = split.apply(files, CompactionLevel::FileNonOverlapped);
|
||||
assert_eq!(overlap.len(), 5);
|
||||
assert_eq!(non_overlap.len(), 1);
|
||||
|
||||
// Verify overlapping files
|
||||
// sort by id
|
||||
let mut overlap = overlap;
|
||||
overlap.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
assert_eq!(overlap[0].id.get(), 1);
|
||||
assert_eq!(overlap[1].id.get(), 2);
|
||||
assert_eq!(overlap[2].id.get(), 3);
|
||||
assert_eq!(overlap[3].id.get(), 12);
|
||||
assert_eq!(overlap[4].id.get(), 13);
|
||||
insta::assert_yaml_snapshot!(
|
||||
format_files("overlap", &overlap),
|
||||
@r###"
|
||||
---
|
||||
- overlap
|
||||
- "L0 "
|
||||
- "L0.2[650,750] |-----L0.2-----| "
|
||||
- "L0.1[450,620] |----------L0.1-----------| "
|
||||
- "L0.3[800,900] |-----L0.3-----|"
|
||||
- "L1 "
|
||||
- "L1.12[400,500] |----L1.12-----| "
|
||||
- "L1.13[600,700] |----L1.13-----| "
|
||||
"###
|
||||
);
|
||||
// verify non-overlapping files
|
||||
assert_eq!(non_overlap[0].id.get(), 11);
|
||||
insta::assert_yaml_snapshot!(
|
||||
format_files("non_overlap", &non_overlap),
|
||||
@r###"
|
||||
---
|
||||
- non_overlap
|
||||
- "L1 "
|
||||
- "L1.11[250,350] |------------------------------------L1.11-------------------------------------|"
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
||||
// |--L2.1--| |--L2.2--|
|
||||
|
@ -245,19 +276,28 @@ mod tests {
|
|||
|
||||
let split = TargetLevelNonOverlapSplit::new();
|
||||
let (overlap, non_overlap) = split.apply(files, CompactionLevel::Final);
|
||||
assert_eq!(overlap.len(), 4);
|
||||
assert_eq!(non_overlap.len(), 1);
|
||||
|
||||
// Verify overlapping files
|
||||
// sort by id
|
||||
let mut overlap = overlap;
|
||||
overlap.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
assert_eq!(overlap[0].id.get(), 11);
|
||||
assert_eq!(overlap[1].id.get(), 12);
|
||||
assert_eq!(overlap[2].id.get(), 13);
|
||||
assert_eq!(overlap[3].id.get(), 22);
|
||||
// verify non-overlapping files
|
||||
assert_eq!(non_overlap[0].id.get(), 21);
|
||||
insta::assert_yaml_snapshot!(
|
||||
format_files("overlap", &overlap),
|
||||
@r###"
|
||||
---
|
||||
- overlap
|
||||
- "L1 "
|
||||
- "L1.13[600,700] |----L1.13-----|"
|
||||
- "L1.12[400,500] |----L1.12-----| "
|
||||
- "L1.11[250,350] |----L1.11-----| "
|
||||
- "L2 "
|
||||
- "L2.22[200,300] |----L2.22-----| "
|
||||
"###
|
||||
);
|
||||
insta::assert_yaml_snapshot!(
|
||||
format_files("non_overlap", &non_overlap),
|
||||
@r###"
|
||||
---
|
||||
- non_overlap
|
||||
- "L2 "
|
||||
- "L2.21[0,100] |------------------------------------L2.21-------------------------------------|"
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -273,22 +313,28 @@ mod tests {
|
|||
|
||||
let split = TargetLevelNonOverlapSplit::new();
|
||||
let (overlap, non_overlap) = split.apply(files, CompactionLevel::FileNonOverlapped);
|
||||
assert_eq!(overlap.len(), 4);
|
||||
assert_eq!(non_overlap.len(), 2);
|
||||
|
||||
// Verify overlapping files
|
||||
// sort by id
|
||||
let mut overlap = overlap;
|
||||
overlap.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
assert_eq!(overlap[0].id.get(), 1);
|
||||
assert_eq!(overlap[1].id.get(), 2);
|
||||
assert_eq!(overlap[2].id.get(), 12);
|
||||
assert_eq!(overlap[3].id.get(), 13);
|
||||
// verify non-overlapping files
|
||||
// sort by id
|
||||
let mut non_overlap = non_overlap;
|
||||
non_overlap.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
assert_eq!(non_overlap[0].id.get(), 11);
|
||||
assert_eq!(non_overlap[1].id.get(), 14);
|
||||
insta::assert_yaml_snapshot!(
|
||||
format_files("overlap", &overlap),
|
||||
@r###"
|
||||
---
|
||||
- overlap
|
||||
- "L0 "
|
||||
- "L0.2[520,550] |L0.2| "
|
||||
- "L0.1[250,350] |--------L0.1--------| "
|
||||
- "L1 "
|
||||
- "L1.12[200,300] |-------L1.12--------| "
|
||||
- "L1.13[400,500] |-------L1.13--------| "
|
||||
"###
|
||||
);
|
||||
insta::assert_yaml_snapshot!(
|
||||
format_files("non_overlap", &non_overlap),
|
||||
@r###"
|
||||
---
|
||||
- non_overlap
|
||||
- "L1 "
|
||||
- "L1.11[0,100] |--L1.11--| "
|
||||
- "L1.14[600,700] |--L1.14--| "
|
||||
"###
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ use super::{
|
|||
partition_files_source::catalog::CatalogPartitionFilesSource,
|
||||
partition_filter::{
|
||||
and::AndPartitionFilter, greater_matching_files::GreaterMatchingFilesPartitionFilter,
|
||||
greater_size_matching_files::GreaterSizeMatchingFilesPartitionFilter,
|
||||
has_files::HasFilesPartitionFilter, has_matching_file::HasMatchingFilePartitionFilter,
|
||||
logging::LoggingPartitionFilterWrapper, max_files::MaxFilesPartitionFilter,
|
||||
max_parquet_bytes::MaxParquetBytesPartitionFilter, metrics::MetricsPartitionFilterWrapper,
|
||||
|
@ -304,7 +305,9 @@ fn version_specific_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionF
|
|||
LevelRangeFileFilter::new(CompactionLevel::Initial..=CompactionLevel::Initial),
|
||||
))]
|
||||
}
|
||||
// (Has-L0) OR (num(L1) > N)
|
||||
// (Has-L0) OR -- to avoid overlapped files
|
||||
// (num(L1) > N) OR -- to avoid many files
|
||||
// (total_size(L1) >= max_desired_file_size) -- to avoid compacting and then splitting
|
||||
AlgoVersion::TargetLevel => {
|
||||
vec![Arc::new(OrPartitionFilter::new(vec![
|
||||
Arc::new(HasMatchingFilePartitionFilter::new(
|
||||
|
@ -316,6 +319,12 @@ fn version_specific_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionF
|
|||
),
|
||||
config.min_num_l1_files_to_compact,
|
||||
)),
|
||||
Arc::new(GreaterSizeMatchingFilesPartitionFilter::new(
|
||||
LevelRangeFileFilter::new(
|
||||
CompactionLevel::FileNonOverlapped..=CompactionLevel::FileNonOverlapped,
|
||||
),
|
||||
config.max_desired_file_size_bytes,
|
||||
)),
|
||||
]))]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,132 @@
|
|||
use std::fmt::Display;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{ParquetFile, PartitionId};
|
||||
|
||||
use crate::{components::file_filter::FileFilter, error::DynError};
|
||||
|
||||
use super::PartitionFilter;
|
||||
|
||||
/// A partition filter that matches partitions that have files
|
||||
/// matching the given file filter and whose total size >= max_desired_file_bytes.
|
||||
/// The idea for doing this:
|
||||
/// 1. Not to compact large input size to avoid hitting OOM/crash.
|
||||
/// 2. Not to compact too-large input size that leads to unnecessary splitting into many files.
|
||||
/// - Because we limit the size of a file. If the compacted result is too large, we will split it into many files.
|
||||
/// - Because Level-1 files do not overlap, it is a waste to compact too-large size and then split.
|
||||
#[derive(Debug)]
|
||||
pub struct GreaterSizeMatchingFilesPartitionFilter<T>
|
||||
where
|
||||
T: FileFilter,
|
||||
{
|
||||
filter: T,
|
||||
max_desired_file_bytes: u64,
|
||||
}
|
||||
|
||||
impl<T> GreaterSizeMatchingFilesPartitionFilter<T>
|
||||
where
|
||||
T: FileFilter,
|
||||
{
|
||||
pub fn new(filter: T, max_desired_file_bytes: u64) -> Self {
|
||||
Self {
|
||||
filter,
|
||||
max_desired_file_bytes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Display for GreaterSizeMatchingFilesPartitionFilter<T>
|
||||
where
|
||||
T: FileFilter,
|
||||
{
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"greater_size_matching_file({}, {})",
|
||||
self.filter, self.max_desired_file_bytes
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T> PartitionFilter for GreaterSizeMatchingFilesPartitionFilter<T>
|
||||
where
|
||||
T: FileFilter,
|
||||
{
|
||||
async fn apply(
|
||||
&self,
|
||||
_partition_id: PartitionId,
|
||||
files: &[ParquetFile],
|
||||
) -> Result<bool, DynError> {
|
||||
// Matching files
|
||||
let matching_files: Vec<&ParquetFile> = files
|
||||
.iter()
|
||||
.filter(|file| self.filter.apply(file))
|
||||
.collect();
|
||||
|
||||
// Sum of file_size_bytes matching files
|
||||
let sum: i64 = matching_files.iter().map(|file| file.file_size_bytes).sum();
|
||||
Ok(sum >= self.max_desired_file_bytes as i64)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use data_types::CompactionLevel;
|
||||
|
||||
use crate::{
|
||||
components::file_filter::level_range::LevelRangeFileFilter, test_util::ParquetFileBuilder,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
// The Display output names the wrapped file filter and the byte threshold.
#[test]
fn test_display() {
    let level_filter = LevelRangeFileFilter::new(
        CompactionLevel::FileNonOverlapped..=CompactionLevel::FileNonOverlapped,
    );
    let rendered = GreaterSizeMatchingFilesPartitionFilter::new(level_filter, 1).to_string();

    assert_eq!(
        rendered,
        "greater_size_matching_file(level_range(1..=1), 1)"
    );
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_apply() {
|
||||
let filter = GreaterSizeMatchingFilesPartitionFilter::new(
|
||||
LevelRangeFileFilter::new(
|
||||
CompactionLevel::FileNonOverlapped..=CompactionLevel::FileNonOverlapped,
|
||||
),
|
||||
15,
|
||||
);
|
||||
let f1 = ParquetFileBuilder::new(0)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||
.with_file_size_bytes(10)
|
||||
.build();
|
||||
let f2 = ParquetFileBuilder::new(1)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||
.with_file_size_bytes(14)
|
||||
.build();
|
||||
let f3 = ParquetFileBuilder::new(2)
|
||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||
.with_file_size_bytes(15)
|
||||
.build();
|
||||
|
||||
let p_id = PartitionId::new(1);
|
||||
|
||||
// empty, not large enough
|
||||
assert!(!filter.apply(p_id, &[]).await.unwrap());
|
||||
|
||||
// Not large enough
|
||||
assert!(!filter.apply(p_id, &[f1.clone()]).await.unwrap());
|
||||
assert!(!filter.apply(p_id, &[f2.clone()]).await.unwrap());
|
||||
|
||||
// large enough
|
||||
assert!(filter.apply(p_id, &[f1.clone(), f2.clone()]).await.unwrap());
|
||||
assert!(filter.apply(p_id, &[f3.clone()]).await.unwrap());
|
||||
assert!(filter.apply(p_id, &[f1, f2, f3]).await.unwrap());
|
||||
}
|
||||
}
|
|
@ -7,6 +7,7 @@ use crate::error::DynError;
|
|||
|
||||
pub mod and;
|
||||
pub mod greater_matching_files;
|
||||
pub mod greater_size_matching_files;
|
||||
pub mod has_files;
|
||||
pub mod has_matching_file;
|
||||
pub mod logging;
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
mod display;
|
||||
pub(crate) use display::{assert_parquet_files, assert_parquet_files_split};
|
||||
pub(crate) use display::{assert_parquet_files, assert_parquet_files_split, format_files};
|
||||
|
||||
use std::{
|
||||
collections::{BTreeMap, HashSet},
|
||||
|
|
|
@ -59,6 +59,13 @@ pub fn assert_parquet_files_split<'a>(
|
|||
);
|
||||
}
|
||||
|
||||
pub fn format_files<'a>(
|
||||
title: impl Into<String>,
|
||||
files: impl IntoIterator<Item = &'a ParquetFile>,
|
||||
) -> Vec<String> {
|
||||
readable_list_of_files(Some(title.into()), files)
|
||||
}
|
||||
|
||||
/// default width for printing
|
||||
const DEFAULT_WIDTH: usize = 80;
|
||||
|
||||
|
|
Loading…
Reference in New Issue