feat: to prevent OOMs/crashes that would lead to skipped compaction, compact L1 files when they are large enough (#6877)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
f56023b222
commit
76172daf39
|
@ -52,6 +52,7 @@ use super::{
|
||||||
partition_files_source::catalog::CatalogPartitionFilesSource,
|
partition_files_source::catalog::CatalogPartitionFilesSource,
|
||||||
partition_filter::{
|
partition_filter::{
|
||||||
and::AndPartitionFilter, greater_matching_files::GreaterMatchingFilesPartitionFilter,
|
and::AndPartitionFilter, greater_matching_files::GreaterMatchingFilesPartitionFilter,
|
||||||
|
greater_size_matching_files::GreaterSizeMatchingFilesPartitionFilter,
|
||||||
has_files::HasFilesPartitionFilter, has_matching_file::HasMatchingFilePartitionFilter,
|
has_files::HasFilesPartitionFilter, has_matching_file::HasMatchingFilePartitionFilter,
|
||||||
logging::LoggingPartitionFilterWrapper, max_files::MaxFilesPartitionFilter,
|
logging::LoggingPartitionFilterWrapper, max_files::MaxFilesPartitionFilter,
|
||||||
max_parquet_bytes::MaxParquetBytesPartitionFilter, metrics::MetricsPartitionFilterWrapper,
|
max_parquet_bytes::MaxParquetBytesPartitionFilter, metrics::MetricsPartitionFilterWrapper,
|
||||||
|
@ -295,7 +296,9 @@ fn version_specific_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionF
|
||||||
)),
|
)),
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
// (Has-L0) OR (num(L1) > N)
|
// (Has-L0) OR -- to avoid overlapped files
|
||||||
|
// (num(L1) > N) OR -- to avoid many files
|
||||||
|
// (total_size(L1) >= max_desired_file_size_bytes) -- to avoid compacting and then splitting
|
||||||
AlgoVersion::TargetLevel => {
|
AlgoVersion::TargetLevel => {
|
||||||
vec![
|
vec![
|
||||||
Arc::new(OrPartitionFilter::new(vec![
|
Arc::new(OrPartitionFilter::new(vec![
|
||||||
|
@ -310,6 +313,12 @@ fn version_specific_partition_filters(config: &Config) -> Vec<Arc<dyn PartitionF
|
||||||
),
|
),
|
||||||
config.min_num_l1_files_to_compact,
|
config.min_num_l1_files_to_compact,
|
||||||
)),
|
)),
|
||||||
|
Arc::new(GreaterSizeMatchingFilesPartitionFilter::new(
|
||||||
|
LevelRangeFileFilter::new(
|
||||||
|
CompactionLevel::FileNonOverlapped..=CompactionLevel::FileNonOverlapped,
|
||||||
|
),
|
||||||
|
config.max_desired_file_size_bytes,
|
||||||
|
)),
|
||||||
])),
|
])),
|
||||||
Arc::new(MaxFilesPartitionFilter::new(
|
Arc::new(MaxFilesPartitionFilter::new(
|
||||||
config.max_input_files_per_partition,
|
config.max_input_files_per_partition,
|
||||||
|
|
|
@ -0,0 +1,132 @@
|
||||||
|
use std::fmt::Display;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use data_types::{ParquetFile, PartitionId};
|
||||||
|
|
||||||
|
use crate::{components::file_filter::FileFilter, error::DynError};
|
||||||
|
|
||||||
|
use super::PartitionFilter;
|
||||||
|
|
||||||
|
/// A partition filter that matches partitions that have files
|
||||||
|
/// matching the given file filter and their total size > max_desired_file_bytes
|
||||||
|
/// The idea for doing this:
|
||||||
|
/// 1. Not to compact large input size to avoid hitting OOM/crash.
|
||||||
|
/// 2. Not to compact too-large input size that lead to unecessary split into many files.
|
||||||
|
/// - Becasue we limit the size of a file. If the compacted result is too large, we will split them into many files.
|
||||||
|
/// - Becasue Level-1 files do not overlap, it is a waste to compact too-large size and then split.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct GreaterSizeMatchingFilesPartitionFilter<T>
|
||||||
|
where
|
||||||
|
T: FileFilter,
|
||||||
|
{
|
||||||
|
filter: T,
|
||||||
|
max_desired_file_bytes: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> GreaterSizeMatchingFilesPartitionFilter<T>
|
||||||
|
where
|
||||||
|
T: FileFilter,
|
||||||
|
{
|
||||||
|
pub fn new(filter: T, max_desired_file_bytes: u64) -> Self {
|
||||||
|
Self {
|
||||||
|
filter,
|
||||||
|
max_desired_file_bytes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Display for GreaterSizeMatchingFilesPartitionFilter<T>
|
||||||
|
where
|
||||||
|
T: FileFilter,
|
||||||
|
{
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"greater_size_matching_file({}, {})",
|
||||||
|
self.filter, self.max_desired_file_bytes
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T> PartitionFilter for GreaterSizeMatchingFilesPartitionFilter<T>
|
||||||
|
where
|
||||||
|
T: FileFilter,
|
||||||
|
{
|
||||||
|
async fn apply(
|
||||||
|
&self,
|
||||||
|
_partition_id: PartitionId,
|
||||||
|
files: &[ParquetFile],
|
||||||
|
) -> Result<bool, DynError> {
|
||||||
|
// Matching files
|
||||||
|
let matching_files: Vec<&ParquetFile> = files
|
||||||
|
.iter()
|
||||||
|
.filter(|file| self.filter.apply(file))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Sum of file_size_bytes matching files
|
||||||
|
let sum: i64 = matching_files.iter().map(|file| file.file_size_bytes).sum();
|
||||||
|
Ok(sum >= self.max_desired_file_bytes as i64)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use data_types::CompactionLevel;

    use crate::{
        components::file_filter::level_range::LevelRangeFileFilter, test_util::ParquetFileBuilder,
    };

    use super::*;

    /// Builds the filter under test: only Level-1 (non-overlapped) files
    /// count, compared against the given size threshold.
    fn l1_filter(threshold: u64) -> GreaterSizeMatchingFilesPartitionFilter<LevelRangeFileFilter> {
        let l1_only = LevelRangeFileFilter::new(
            CompactionLevel::FileNonOverlapped..=CompactionLevel::FileNonOverlapped,
        );
        GreaterSizeMatchingFilesPartitionFilter::new(l1_only, threshold)
    }

    #[test]
    fn test_display() {
        let rendered = l1_filter(1).to_string();
        assert_eq!(
            rendered,
            "greater_size_matching_file(level_range(1..=1), 1)"
        );
    }

    #[tokio::test]
    async fn test_apply() {
        let filter = l1_filter(15);

        // Three Level-1 files: two below the threshold, one exactly at it.
        let small = ParquetFileBuilder::new(0)
            .with_compaction_level(CompactionLevel::FileNonOverlapped)
            .with_file_size_bytes(10)
            .build();
        let medium = ParquetFileBuilder::new(1)
            .with_compaction_level(CompactionLevel::FileNonOverlapped)
            .with_file_size_bytes(14)
            .build();
        let at_threshold = ParquetFileBuilder::new(2)
            .with_compaction_level(CompactionLevel::FileNonOverlapped)
            .with_file_size_bytes(15)
            .build();

        let p_id = PartitionId::new(1);

        // (description, input files, expected filter result)
        let cases = vec![
            // empty, not large enough
            ("empty", vec![], false),
            // Not large enough
            ("single 10-byte file", vec![small.clone()], false),
            ("single 14-byte file", vec![medium.clone()], false),
            // large enough
            (
                "10-byte + 14-byte files",
                vec![small.clone(), medium.clone()],
                true,
            ),
            ("single 15-byte file", vec![at_threshold.clone()], true),
            ("all three files", vec![small, medium, at_threshold], true),
        ];

        for (description, input, expected) in cases {
            let passed = filter.apply(p_id, &input).await.unwrap();
            assert_eq!(passed, expected, "case: {}", description);
        }
    }
}
|
|
@ -7,6 +7,7 @@ use crate::error::DynError;
|
||||||
|
|
||||||
pub mod and;
|
pub mod and;
|
||||||
pub mod greater_matching_files;
|
pub mod greater_matching_files;
|
||||||
|
pub mod greater_size_matching_files;
|
||||||
pub mod has_files;
|
pub mod has_files;
|
||||||
pub mod has_matching_file;
|
pub mod has_matching_file;
|
||||||
pub mod logging;
|
pub mod logging;
|
||||||
|
|
Loading…
Reference in New Issue