From f56023b222f5eda2ae0023b081960b1a5eb91ff5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 7 Feb 2023 16:08:47 +0100 Subject: [PATCH 1/2] feat: use insta_snapshots (#6884) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 1 + compactor2/Cargo.toml | 1 + .../target_level_non_overlap_split.rs | 144 ++++++++++++------ compactor2/src/test_util.rs | 2 +- compactor2/src/test_util/display.rs | 7 + 5 files changed, 105 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33b41a73c8..a9c7b4a765 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1003,6 +1003,7 @@ dependencies = [ "data_types", "datafusion", "futures", + "insta", "iox_catalog", "iox_query", "iox_tests", diff --git a/compactor2/Cargo.toml b/compactor2/Cargo.toml index f18f9b5e2c..2b378dc54c 100644 --- a/compactor2/Cargo.toml +++ b/compactor2/Cargo.toml @@ -34,3 +34,4 @@ workspace-hack = { path = "../workspace-hack"} arrow_util = { path = "../arrow_util" } iox_tests = { path = "../iox_tests" } test_helpers = { path = "../test_helpers"} +insta = { version = "1.26.0", features = ["yaml"] } diff --git a/compactor2/src/components/files_split/target_level_non_overlap_split.rs b/compactor2/src/components/files_split/target_level_non_overlap_split.rs index d42797e786..78c82cbec1 100644 --- a/compactor2/src/components/files_split/target_level_non_overlap_split.rs +++ b/compactor2/src/components/files_split/target_level_non_overlap_split.rs @@ -130,7 +130,7 @@ mod tests { use crate::test_util::{ create_l1_files, create_overlapped_files, create_overlapped_files_2, - create_overlapped_l0_l1_files, create_overlapped_l1_l2_files, + create_overlapped_l0_l1_files, create_overlapped_l1_l2_files, format_files, }; use super::*; @@ -186,7 +186,17 @@ mod tests { #[test] fn test_apply_one_level_empty() { let files = create_l1_files(1); - assert_eq!(files.len(), 3); + insta::assert_yaml_snapshot!( + format_files("initial", &files), + @r###" + --- + - initial + - 
"L1 " + - "L1.13[600,700] |-----L1.13-----| " + - "L1.12[400,500] |-----L1.12-----| " + - "L1.11[250,350] |-----L1.11-----| " + "### + ); let split = TargetLevelNonOverlapSplit::new(); @@ -206,27 +216,48 @@ mod tests { let files = create_overlapped_l0_l1_files(1); assert_eq!(files.len(), 6); - // Input files: - // |--L1.1--| |--L1.2--| |--L1.3--| - // |--L0.1--| |--L0.2--| |--L0.3--| - // Output files: (overlap, non_overlap) = ( [L0.1, L0.2, L0.3, L1.2, L1.3] , L1.1] ) + insta::assert_yaml_snapshot!( + format_files("initial", &files), + @r###" + --- + - initial + - "L0 " + - "L0.2[650,750] |---L0.2---| " + - "L0.1[450,620] |-------L0.1-------| " + - "L0.3[800,900] |---L0.3---| " + - "L1 " + - "L1.13[600,700] |--L1.13---| " + - "L1.12[400,500] |--L1.12---| " + - "L1.11[250,350] |--L1.11---| " + "### + ); let split = TargetLevelNonOverlapSplit::new(); let (overlap, non_overlap) = split.apply(files, CompactionLevel::FileNonOverlapped); - assert_eq!(overlap.len(), 5); - assert_eq!(non_overlap.len(), 1); - - // Verify overlapping files - // sort by id - let mut overlap = overlap; - overlap.sort_by(|a, b| a.id.cmp(&b.id)); - assert_eq!(overlap[0].id.get(), 1); - assert_eq!(overlap[1].id.get(), 2); - assert_eq!(overlap[2].id.get(), 3); - assert_eq!(overlap[3].id.get(), 12); - assert_eq!(overlap[4].id.get(), 13); + insta::assert_yaml_snapshot!( + format_files("overlap", &overlap), + @r###" + --- + - overlap + - "L0 " + - "L0.2[650,750] |-----L0.2-----| " + - "L0.1[450,620] |----------L0.1-----------| " + - "L0.3[800,900] |-----L0.3-----|" + - "L1 " + - "L1.12[400,500] |----L1.12-----| " + - "L1.13[600,700] |----L1.13-----| " + "### + ); // verify non-overlapping files - assert_eq!(non_overlap[0].id.get(), 11); + insta::assert_yaml_snapshot!( + format_files("non_overlap", &non_overlap), + @r###" + --- + - non_overlap + - "L1 " + - "L1.11[250,350] |------------------------------------L1.11-------------------------------------|" + "### + ); } // |--L2.1--| |--L2.2--| @@ 
-245,19 +276,28 @@ mod tests { let split = TargetLevelNonOverlapSplit::new(); let (overlap, non_overlap) = split.apply(files, CompactionLevel::Final); - assert_eq!(overlap.len(), 4); - assert_eq!(non_overlap.len(), 1); - - // Verify overlapping files - // sort by id - let mut overlap = overlap; - overlap.sort_by(|a, b| a.id.cmp(&b.id)); - assert_eq!(overlap[0].id.get(), 11); - assert_eq!(overlap[1].id.get(), 12); - assert_eq!(overlap[2].id.get(), 13); - assert_eq!(overlap[3].id.get(), 22); - // verify non-overlapping files - assert_eq!(non_overlap[0].id.get(), 21); + insta::assert_yaml_snapshot!( + format_files("overlap", &overlap), + @r###" + --- + - overlap + - "L1 " + - "L1.13[600,700] |----L1.13-----|" + - "L1.12[400,500] |----L1.12-----| " + - "L1.11[250,350] |----L1.11-----| " + - "L2 " + - "L2.22[200,300] |----L2.22-----| " + "### + ); + insta::assert_yaml_snapshot!( + format_files("non_overlap", &non_overlap), + @r###" + --- + - non_overlap + - "L2 " + - "L2.21[0,100] |------------------------------------L2.21-------------------------------------|" + "### + ); } #[test] @@ -273,22 +313,28 @@ mod tests { let split = TargetLevelNonOverlapSplit::new(); let (overlap, non_overlap) = split.apply(files, CompactionLevel::FileNonOverlapped); - assert_eq!(overlap.len(), 4); - assert_eq!(non_overlap.len(), 2); - - // Verify overlapping files - // sort by id - let mut overlap = overlap; - overlap.sort_by(|a, b| a.id.cmp(&b.id)); - assert_eq!(overlap[0].id.get(), 1); - assert_eq!(overlap[1].id.get(), 2); - assert_eq!(overlap[2].id.get(), 12); - assert_eq!(overlap[3].id.get(), 13); - // verify non-overlapping files - // sort by id - let mut non_overlap = non_overlap; - non_overlap.sort_by(|a, b| a.id.cmp(&b.id)); - assert_eq!(non_overlap[0].id.get(), 11); - assert_eq!(non_overlap[1].id.get(), 14); + insta::assert_yaml_snapshot!( + format_files("overlap", &overlap), + @r###" + --- + - overlap + - "L0 " + - "L0.2[520,550] |L0.2| " + - "L0.1[250,350] |--------L0.1--------| 
" + - "L1 " + - "L1.12[200,300] |-------L1.12--------| " + - "L1.13[400,500] |-------L1.13--------| " + "### + ); + insta::assert_yaml_snapshot!( + format_files("non_overlap", &non_overlap), + @r###" + --- + - non_overlap + - "L1 " + - "L1.11[0,100] |--L1.11--| " + - "L1.14[600,700] |--L1.14--| " + "### + ); } } diff --git a/compactor2/src/test_util.rs b/compactor2/src/test_util.rs index c52a15f401..2a4131feab 100644 --- a/compactor2/src/test_util.rs +++ b/compactor2/src/test_util.rs @@ -1,5 +1,5 @@ mod display; -pub(crate) use display::{assert_parquet_files, assert_parquet_files_split}; +pub(crate) use display::{assert_parquet_files, assert_parquet_files_split, format_files}; use std::{ collections::{BTreeMap, HashSet}, diff --git a/compactor2/src/test_util/display.rs b/compactor2/src/test_util/display.rs index cb1d859e21..bc74aa397d 100644 --- a/compactor2/src/test_util/display.rs +++ b/compactor2/src/test_util/display.rs @@ -59,6 +59,13 @@ pub fn assert_parquet_files_split<'a>( ); } +pub fn format_files<'a>( + title: impl Into, + files: impl IntoIterator, +) -> Vec { + readable_list_of_files(Some(title.into()), files) +} + /// default width for printing const DEFAULT_WIDTH: usize = 80; From 76172daf39f3ce12f8547eb02c65d216cdfebb2a Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Tue, 7 Feb 2023 12:46:44 -0500 Subject: [PATCH 2/2] feat: to prevent OOMs/crrash that will lead to skip compaction, let us compact L1 files when they are large enough (#6877) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- compactor2/src/components/hardcoded.rs | 11 +- .../greater_size_matching_files.rs | 132 ++++++++++++++++++ .../src/components/partition_filter/mod.rs | 1 + 3 files changed, 143 insertions(+), 1 deletion(-) create mode 100644 compactor2/src/components/partition_filter/greater_size_matching_files.rs diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index a1f30c8d6c..1d9fb76272 100644 --- 
a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -52,6 +52,7 @@ use super::{ partition_files_source::catalog::CatalogPartitionFilesSource, partition_filter::{ and::AndPartitionFilter, greater_matching_files::GreaterMatchingFilesPartitionFilter, + greater_size_matching_files::GreaterSizeMatchingFilesPartitionFilter, has_files::HasFilesPartitionFilter, has_matching_file::HasMatchingFilePartitionFilter, logging::LoggingPartitionFilterWrapper, max_files::MaxFilesPartitionFilter, max_parquet_bytes::MaxParquetBytesPartitionFilter, metrics::MetricsPartitionFilterWrapper, @@ -295,7 +296,9 @@ fn version_specific_partition_filters(config: &Config) -> Vec N) + // (Has-L0) OR -- to avoid overlapped files + // (num(L1) > N) OR -- to avoid many files + // (total_size(L1) > max_desired_file_size) -- to avoid compacting and then splitting AlgoVersion::TargetLevel => { vec![ Arc::new(OrPartitionFilter::new(vec![ @@ -310,6 +313,12 @@ fn version_specific_partition_filters(config: &Config) -> Vec max_desired_file_bytes +/// The idea for doing this: +/// 1. Not to compact large input size to avoid hitting OOM/crash. +/// 2. Not to compact too-large input size that leads to an unnecessary split into many files. +/// - Because we limit the size of a file. If the compacted result is too large, we will split it into many files. +/// - Because Level-1 files do not overlap, it is a waste to compact too-large size and then split. 
+#[derive(Debug)] +pub struct GreaterSizeMatchingFilesPartitionFilter +where + T: FileFilter, +{ + filter: T, + max_desired_file_bytes: u64, +} + +impl GreaterSizeMatchingFilesPartitionFilter +where + T: FileFilter, +{ + pub fn new(filter: T, max_desired_file_bytes: u64) -> Self { + Self { + filter, + max_desired_file_bytes, + } + } +} + +impl Display for GreaterSizeMatchingFilesPartitionFilter +where + T: FileFilter, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "greater_size_matching_file({}, {})", + self.filter, self.max_desired_file_bytes + ) + } +} + +#[async_trait] +impl PartitionFilter for GreaterSizeMatchingFilesPartitionFilter +where + T: FileFilter, +{ + async fn apply( + &self, + _partition_id: PartitionId, + files: &[ParquetFile], + ) -> Result { + // Matching files + let matching_files: Vec<&ParquetFile> = files + .iter() + .filter(|file| self.filter.apply(file)) + .collect(); + + // Sum of file_size_bytes matching files + let sum: i64 = matching_files.iter().map(|file| file.file_size_bytes).sum(); + Ok(sum >= self.max_desired_file_bytes as i64) + } +} + +#[cfg(test)] +mod tests { + use data_types::CompactionLevel; + + use crate::{ + components::file_filter::level_range::LevelRangeFileFilter, test_util::ParquetFileBuilder, + }; + + use super::*; + + #[test] + fn test_display() { + let filter = GreaterSizeMatchingFilesPartitionFilter::new( + LevelRangeFileFilter::new( + CompactionLevel::FileNonOverlapped..=CompactionLevel::FileNonOverlapped, + ), + 1, + ); + assert_eq!( + filter.to_string(), + "greater_size_matching_file(level_range(1..=1), 1)" + ); + } + + #[tokio::test] + async fn test_apply() { + let filter = GreaterSizeMatchingFilesPartitionFilter::new( + LevelRangeFileFilter::new( + CompactionLevel::FileNonOverlapped..=CompactionLevel::FileNonOverlapped, + ), + 15, + ); + let f1 = ParquetFileBuilder::new(0) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_file_size_bytes(10) + 
.build(); + let f2 = ParquetFileBuilder::new(1) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_file_size_bytes(14) + .build(); + let f3 = ParquetFileBuilder::new(2) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_file_size_bytes(15) + .build(); + + let p_id = PartitionId::new(1); + + // empty, not large enough + assert!(!filter.apply(p_id, &[]).await.unwrap()); + + // Not large enough + assert!(!filter.apply(p_id, &[f1.clone()]).await.unwrap()); + assert!(!filter.apply(p_id, &[f2.clone()]).await.unwrap()); + + // large enough + assert!(filter.apply(p_id, &[f1.clone(), f2.clone()]).await.unwrap()); + assert!(filter.apply(p_id, &[f3.clone()]).await.unwrap()); + assert!(filter.apply(p_id, &[f1, f2, f3]).await.unwrap()); + } +} diff --git a/compactor2/src/components/partition_filter/mod.rs b/compactor2/src/components/partition_filter/mod.rs index e9afa9a25d..cd7fb40a34 100644 --- a/compactor2/src/components/partition_filter/mod.rs +++ b/compactor2/src/components/partition_filter/mod.rs @@ -7,6 +7,7 @@ use crate::error::DynError; pub mod and; pub mod greater_matching_files; +pub mod greater_size_matching_files; pub mod has_files; pub mod has_matching_file; pub mod logging;