From 3220c6f88bbacc5e567bed4d9fc2b50f68ebb42e Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Tue, 23 Aug 2022 16:12:21 -0400 Subject: [PATCH] feat: add file_count_threshold for compacting cold partitions (#5456) * feat: add file_count_threshold for compacting cold partitions to make it consistent with the hot case and make it easier to configure the compactor to avoid OOM * chore: remove unnecessary comments --- clap_blocks/src/compactor.rs | 18 ++++++++-- compactor/src/compact.rs | 2 ++ compactor/src/handler.rs | 20 +++++++++++ compactor/src/lib.rs | 3 ++ compactor/src/parquet_file_filtering.rs | 38 +++++++++++++++++++++ influxdb_iox/src/commands/run/all_in_one.rs | 1 + ioxd_compactor/src/lib.rs | 1 + 7 files changed, 81 insertions(+), 2 deletions(-) diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs index 83a8ad0748..63f1350d96 100644 --- a/clap_blocks/src/compactor.rs +++ b/clap_blocks/src/compactor.rs @@ -173,8 +173,6 @@ macro_rules! gen_compactor_config { /// /// A compaction operation will be limited by this or by the input size threshold, /// whichever is hit first. - /// - /// Default is 50. #[clap( long = "--compaction-input-file-count-threshold", env = "INFLUXDB_IOX_COMPACTION_INPUT_FILE_COUNT_THRESHOLD", @@ -183,6 +181,21 @@ macro_rules! gen_compactor_config { )] pub input_file_count_threshold: usize, + /// A compaction operation for cold partitions will gather as many L0 files with their + /// overlapping L1 files to compact together until the total number of L0 + L1 files + /// crosses this threshold. + /// Later compactions will pick up the remaining L0 files. + /// + /// A compaction operation will be limited by this or by the cold input size threshold, + /// whichever is hit first. 
+ #[clap( + long = "--compaction-cold-input-file-count-threshold", + env = "INFLUXDB_IOX_COMPACTION_COLD_INPUT_FILE_COUNT_THRESHOLD", + default_value = "50", + action + )] + pub cold_input_file_count_threshold: usize, + /// The multiple of times that compacting hot partitions should run for every one time /// that compacting cold partitions runs. Set to 1 to compact hot partitions and cold /// partitions equally. @@ -223,6 +236,7 @@ impl CompactorOnceConfig { input_size_threshold_bytes: self.input_size_threshold_bytes, cold_input_size_threshold_bytes: self.cold_input_size_threshold_bytes, input_file_count_threshold: self.input_file_count_threshold, + cold_input_file_count_threshold: self.cold_input_file_count_threshold, hot_multiple: self.hot_multiple, } } diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index e897360922..7dd8c8637c 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -767,6 +767,7 @@ mod tests { let input_size_threshold_bytes = 300 * 1024 * 1024; let cold_input_size_threshold_bytes = 600 * 1024 * 1024; let input_file_count_threshold = 100; + let cold_input_file_count_threshold = 100; let hot_multiple = 4; CompactorConfig::new( max_desired_file_size_bytes, @@ -779,6 +780,7 @@ mod tests { input_size_threshold_bytes, cold_input_size_threshold_bytes, input_file_count_threshold, + cold_input_file_count_threshold, hot_multiple, ) } diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs index 02f7f55aaf..c758cc183a 100644 --- a/compactor/src/handler.rs +++ b/compactor/src/handler.rs @@ -144,6 +144,14 @@ pub struct CompactorConfig { /// hit first. input_file_count_threshold: usize, + /// A compaction operation for cold partitions will gather as many L0 files with their + /// overlapping L1 files to compact together until the total number of L0 + L1 files crosses this + /// threshold. Later compactions will pick up the remaining L0 files. 
+ /// + /// A compaction operation will be limited by this or by the cold input size threshold, whichever is + /// hit first. + cold_input_file_count_threshold: usize, + /// The multiple of times that compacting hot partitions should run for every one time that /// compacting cold partitions runs. Set to 1 to compact hot partitions and cold partitions /// equally. @@ -164,6 +172,7 @@ impl CompactorConfig { input_size_threshold_bytes: u64, cold_input_size_threshold_bytes: u64, input_file_count_threshold: usize, + cold_input_file_count_threshold: usize, hot_multiple: usize, ) -> Self { assert!(split_percentage > 0 && split_percentage <= 100); @@ -179,6 +188,7 @@ impl CompactorConfig { input_size_threshold_bytes, cold_input_size_threshold_bytes, input_file_count_threshold, + cold_input_file_count_threshold, hot_multiple, } } @@ -243,6 +253,16 @@ impl CompactorConfig { pub fn input_file_count_threshold(&self) -> usize { self.input_file_count_threshold } + + /// A compaction operation for cold partitions will gather as many L0 files with their overlapping L1 files to + /// compact together until the total number of L0 + L1 files crosses this threshold. Later + /// compactions will pick up the remaining L0 files. + /// + /// A compaction operation will be limited by this or by the cold input size threshold, whichever is + /// hit first. 
+ pub fn cold_input_file_count_threshold(&self) -> usize { + self.cold_input_file_count_threshold + } } /// How long to pause before checking for more work again if there was diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 117c7e6d4e..4f8eb90d38 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -119,6 +119,7 @@ pub(crate) async fn compact_cold_partition( let to_compact = parquet_file_filtering::filter_cold_parquet_files( parquet_files_for_compaction, compactor.config.cold_input_size_threshold_bytes(), + compactor.config.cold_input_file_count_threshold(), &compactor.parquet_file_candidate_gauge, &compactor.parquet_file_candidate_bytes, ); @@ -792,6 +793,7 @@ mod tests { let input_size_threshold_bytes = 300 * 1024 * 1024; let cold_input_size_threshold_bytes = 600 * 1024 * 1024; let input_file_count_threshold = 100; + let cold_input_file_count_threshold = 100; let hot_multiple = 4; CompactorConfig::new( @@ -805,6 +807,7 @@ mod tests { input_size_threshold_bytes, cold_input_size_threshold_bytes, input_file_count_threshold, + cold_input_file_count_threshold, hot_multiple, ) } diff --git a/compactor/src/parquet_file_filtering.rs b/compactor/src/parquet_file_filtering.rs index 85ff4ece84..8c9a24c448 100644 --- a/compactor/src/parquet_file_filtering.rs +++ b/compactor/src/parquet_file_filtering.rs @@ -152,6 +152,9 @@ pub(crate) fn filter_cold_parquet_files( // Stop considering level 0 files when the total size of all files selected for compaction so // far exceeds this value max_bytes: u64, + // Stop considering level 0 files when the count of L0 + L1 files selected for compaction so + // far exceeds this value + input_file_count_threshold: usize, // Gauge for the number of Parquet file candidates parquet_file_candidate_gauge: &Metric, // Histogram for the number of bytes of Parquet file candidates @@ -187,6 +190,12 @@ pub(crate) fn filter_cold_parquet_files( let mut total_level_1_bytes = 0; for level_0_file in level_0 { + // Check we haven't 
exceeded `input_file_count_threshold`, if we have, stop considering + // level 0 files + if (level_0_to_return.len() + files_to_return.len()) >= input_file_count_threshold { + break; + } + // Include at least one level 0 file without checking against `max_bytes` total_level_0_bytes += level_0_file.file_size_bytes as u64; @@ -820,6 +829,7 @@ mod tests { use super::*; const DEFAULT_MAX_FILE_SIZE: u64 = 1024 * 1024 * 10; + const DEFAULT_INPUT_FILE_COUNT: usize = 100; #[test] fn empty_in_empty_out() { @@ -832,6 +842,7 @@ mod tests { let files = filter_cold_parquet_files( parquet_files_for_compaction, DEFAULT_MAX_FILE_SIZE, + DEFAULT_INPUT_FILE_COUNT, &files_metric, &bytes_metric, ); @@ -850,6 +861,7 @@ mod tests { let files = filter_cold_parquet_files( parquet_files_for_compaction, 0, + DEFAULT_INPUT_FILE_COUNT, &files_metric, &bytes_metric, ); @@ -858,6 +870,25 @@ mod tests { assert_eq!(files[0].id.get(), 1); } + #[test] + fn max_file_count_0_returns_empty() { + let parquet_files_for_compaction = ParquetFilesForCompaction { + level_0: vec![ParquetFileBuilder::level_0().id(1).build()], + level_1: vec![], + }; + let (files_metric, bytes_metric) = metrics(); + + let files = filter_cold_parquet_files( + parquet_files_for_compaction, + DEFAULT_MAX_FILE_SIZE, + 0, + &files_metric, + &bytes_metric, + ); + + assert!(files.is_empty(), "Expected empty, got: {:#?}", files); + } + #[test] fn one_level_0_file_no_level_1_overlaps() { let parquet_files_for_compaction = ParquetFilesForCompaction { @@ -886,6 +917,7 @@ mod tests { let files = filter_cold_parquet_files( parquet_files_for_compaction, DEFAULT_MAX_FILE_SIZE, + DEFAULT_INPUT_FILE_COUNT, &files_metric, &bytes_metric, ); @@ -928,6 +960,7 @@ mod tests { let files = filter_cold_parquet_files( parquet_files_for_compaction, DEFAULT_MAX_FILE_SIZE, + DEFAULT_INPUT_FILE_COUNT, &files_metric, &bytes_metric, ); @@ -992,6 +1025,7 @@ mod tests { let files = filter_cold_parquet_files( parquet_files_for_compaction, DEFAULT_MAX_FILE_SIZE, 
+ DEFAULT_INPUT_FILE_COUNT, &files_metric, &bytes_metric, ); @@ -1102,6 +1136,7 @@ mod tests { let files = filter_cold_parquet_files( parquet_files_for_compaction.clone(), max_size, + DEFAULT_INPUT_FILE_COUNT, &files_metric, &bytes_metric, ); @@ -1133,6 +1168,7 @@ mod tests { let files = filter_cold_parquet_files( parquet_files_for_compaction.clone(), max_size, + DEFAULT_INPUT_FILE_COUNT, &files_metric, &bytes_metric, ); @@ -1164,6 +1200,7 @@ mod tests { let files = filter_cold_parquet_files( parquet_files_for_compaction.clone(), max_size, + DEFAULT_INPUT_FILE_COUNT, &files_metric, &bytes_metric, ); @@ -1193,6 +1230,7 @@ mod tests { let files = filter_cold_parquet_files( parquet_files_for_compaction, DEFAULT_MAX_FILE_SIZE, + DEFAULT_INPUT_FILE_COUNT, &files_metric, &bytes_metric, ); diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 7e5c778694..c153ef5717 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -426,6 +426,7 @@ impl Config { input_size_threshold_bytes: 314_572_800, cold_input_size_threshold_bytes: 629_145_600, input_file_count_threshold: 100, + cold_input_file_count_threshold: 100, hot_multiple: 4, }; diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index 31f8402ebd..42fb8d2f79 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -197,6 +197,7 @@ pub async fn build_compactor_from_config( compactor_config.input_size_threshold_bytes, compactor_config.cold_input_size_threshold_bytes, compactor_config.input_file_count_threshold, + compactor_config.cold_input_file_count_threshold, compactor_config.hot_multiple, );