feat: add file_count_threshold for comapcting cold partitions (#5456)
* feat: file file_count_threshold for comapcting cold partitions to make it consistent with the hot case and help set up to avoid oom easier * chore: remove unecessary commmentspull/24376/head
parent
e1cbc23a0f
commit
3220c6f88b
|
@ -173,8 +173,6 @@ macro_rules! gen_compactor_config {
|
|||
///
|
||||
/// A compaction operation will be limited by this or by the input size threshold,
|
||||
/// whichever is hit first.
|
||||
///
|
||||
/// Default is 50.
|
||||
#[clap(
|
||||
long = "--compaction-input-file-count-threshold",
|
||||
env = "INFLUXDB_IOX_COMPACTION_INPUT_FILE_COUNT_THRESHOLD",
|
||||
|
@ -183,6 +181,21 @@ macro_rules! gen_compactor_config {
|
|||
)]
|
||||
pub input_file_count_threshold: usize,
|
||||
|
||||
/// A compaction operation for cold partitions will gather as many L0 files with their
|
||||
/// overlapping L1 files to compact together until the total number of L0 + L1 files
|
||||
/// crosses this threshold.
|
||||
/// Later compactions will pick up the remaining L0 files.
|
||||
///
|
||||
/// A compaction operation will be limited by this or by the cold input size threshold,
|
||||
/// whichever is hit first.
|
||||
#[clap(
|
||||
long = "--compaction-cold-input-file-count-threshold",
|
||||
env = "INFLUXDB_IOX_COMPACTION_COLD_INPUT_FILE_COUNT_THRESHOLD",
|
||||
default_value = "50",
|
||||
action
|
||||
)]
|
||||
pub cold_input_file_count_threshold: usize,
|
||||
|
||||
/// The multiple of times that compacting hot partitions should run for every one time
|
||||
/// that compacting cold partitions runs. Set to 1 to compact hot partitions and cold
|
||||
/// partitions equally.
|
||||
|
@ -223,6 +236,7 @@ impl CompactorOnceConfig {
|
|||
input_size_threshold_bytes: self.input_size_threshold_bytes,
|
||||
cold_input_size_threshold_bytes: self.cold_input_size_threshold_bytes,
|
||||
input_file_count_threshold: self.input_file_count_threshold,
|
||||
cold_input_file_count_threshold: self.cold_input_file_count_threshold,
|
||||
hot_multiple: self.hot_multiple,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -767,6 +767,7 @@ mod tests {
|
|||
let input_size_threshold_bytes = 300 * 1024 * 1024;
|
||||
let cold_input_size_threshold_bytes = 600 * 1024 * 1024;
|
||||
let input_file_count_threshold = 100;
|
||||
let cold_input_file_count_threshold = 100;
|
||||
let hot_multiple = 4;
|
||||
CompactorConfig::new(
|
||||
max_desired_file_size_bytes,
|
||||
|
@ -779,6 +780,7 @@ mod tests {
|
|||
input_size_threshold_bytes,
|
||||
cold_input_size_threshold_bytes,
|
||||
input_file_count_threshold,
|
||||
cold_input_file_count_threshold,
|
||||
hot_multiple,
|
||||
)
|
||||
}
|
||||
|
|
|
@ -144,6 +144,14 @@ pub struct CompactorConfig {
|
|||
/// hit first.
|
||||
input_file_count_threshold: usize,
|
||||
|
||||
/// A compaction operation or cold partitions will gather as many L0 files with their
|
||||
/// overlapping L1 files to compact together until the total number of L0 + L1 files crosses this
|
||||
/// threshold. Later compactions will pick up the remaining L0 files.
|
||||
///
|
||||
/// A compaction operation will be limited by this or by the input size threshold, whichever is
|
||||
/// hit first.
|
||||
cold_input_file_count_threshold: usize,
|
||||
|
||||
/// The multiple of times that compacting hot partitions should run for every one time that
|
||||
/// compacting cold partitions runs. Set to 1 to compact hot partitions and cold partitions
|
||||
/// equally.
|
||||
|
@ -164,6 +172,7 @@ impl CompactorConfig {
|
|||
input_size_threshold_bytes: u64,
|
||||
cold_input_size_threshold_bytes: u64,
|
||||
input_file_count_threshold: usize,
|
||||
cold_input_file_count_threshold: usize,
|
||||
hot_multiple: usize,
|
||||
) -> Self {
|
||||
assert!(split_percentage > 0 && split_percentage <= 100);
|
||||
|
@ -179,6 +188,7 @@ impl CompactorConfig {
|
|||
input_size_threshold_bytes,
|
||||
cold_input_size_threshold_bytes,
|
||||
input_file_count_threshold,
|
||||
cold_input_file_count_threshold,
|
||||
hot_multiple,
|
||||
}
|
||||
}
|
||||
|
@ -243,6 +253,16 @@ impl CompactorConfig {
|
|||
pub fn input_file_count_threshold(&self) -> usize {
|
||||
self.input_file_count_threshold
|
||||
}
|
||||
|
||||
/// A compaction operation for cold partitions will gather as many L0 files with their overlapping L1 files to
|
||||
/// compact together until the total number of L0 + L1 files crosses this threshold. Later
|
||||
/// compactions will pick up the remaining L0 files.
|
||||
///
|
||||
/// A compaction operation will be limited by this or by the input size threshold, whichever is
|
||||
/// hit first.
|
||||
pub fn cold_input_file_count_threshold(&self) -> usize {
|
||||
self.cold_input_file_count_threshold
|
||||
}
|
||||
}
|
||||
|
||||
/// How long to pause before checking for more work again if there was
|
||||
|
|
|
@ -119,6 +119,7 @@ pub(crate) async fn compact_cold_partition(
|
|||
let to_compact = parquet_file_filtering::filter_cold_parquet_files(
|
||||
parquet_files_for_compaction,
|
||||
compactor.config.cold_input_size_threshold_bytes(),
|
||||
compactor.config.cold_input_file_count_threshold(),
|
||||
&compactor.parquet_file_candidate_gauge,
|
||||
&compactor.parquet_file_candidate_bytes,
|
||||
);
|
||||
|
@ -792,6 +793,7 @@ mod tests {
|
|||
let input_size_threshold_bytes = 300 * 1024 * 1024;
|
||||
let cold_input_size_threshold_bytes = 600 * 1024 * 1024;
|
||||
let input_file_count_threshold = 100;
|
||||
let cold_input_file_count_threshold = 100;
|
||||
let hot_multiple = 4;
|
||||
|
||||
CompactorConfig::new(
|
||||
|
@ -805,6 +807,7 @@ mod tests {
|
|||
input_size_threshold_bytes,
|
||||
cold_input_size_threshold_bytes,
|
||||
input_file_count_threshold,
|
||||
cold_input_file_count_threshold,
|
||||
hot_multiple,
|
||||
)
|
||||
}
|
||||
|
|
|
@ -152,6 +152,9 @@ pub(crate) fn filter_cold_parquet_files(
|
|||
// Stop considering level 0 files when the total size of all files selected for compaction so
|
||||
// far exceeds this value
|
||||
max_bytes: u64,
|
||||
// Stop considering level 0 files when the count of L0 + L1 files selected for compaction so
|
||||
// far exceeds this value
|
||||
input_file_count_threshold: usize,
|
||||
// Gauge for the number of Parquet file candidates
|
||||
parquet_file_candidate_gauge: &Metric<U64Gauge>,
|
||||
// Histogram for the number of bytes of Parquet file candidates
|
||||
|
@ -187,6 +190,12 @@ pub(crate) fn filter_cold_parquet_files(
|
|||
let mut total_level_1_bytes = 0;
|
||||
|
||||
for level_0_file in level_0 {
|
||||
// Check we haven't exceeded `input_file_count_threshold`, if we have, stop considering
|
||||
// level 0 files
|
||||
if (level_0_to_return.len() + files_to_return.len()) >= input_file_count_threshold {
|
||||
break;
|
||||
}
|
||||
|
||||
// Include at least one level 0 file without checking against `max_bytes`
|
||||
total_level_0_bytes += level_0_file.file_size_bytes as u64;
|
||||
|
||||
|
@ -820,6 +829,7 @@ mod tests {
|
|||
use super::*;
|
||||
|
||||
const DEFAULT_MAX_FILE_SIZE: u64 = 1024 * 1024 * 10;
|
||||
const DEFAULT_INPUT_FILE_COUNT: usize = 100;
|
||||
|
||||
#[test]
|
||||
fn empty_in_empty_out() {
|
||||
|
@ -832,6 +842,7 @@ mod tests {
|
|||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction,
|
||||
DEFAULT_MAX_FILE_SIZE,
|
||||
DEFAULT_INPUT_FILE_COUNT,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
@ -850,6 +861,7 @@ mod tests {
|
|||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction,
|
||||
0,
|
||||
DEFAULT_INPUT_FILE_COUNT,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
@ -858,6 +870,25 @@ mod tests {
|
|||
assert_eq!(files[0].id.get(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn max_file_count_0_returns_empty() {
|
||||
let parquet_files_for_compaction = ParquetFilesForCompaction {
|
||||
level_0: vec![ParquetFileBuilder::level_0().id(1).build()],
|
||||
level_1: vec![],
|
||||
};
|
||||
let (files_metric, bytes_metric) = metrics();
|
||||
|
||||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction,
|
||||
DEFAULT_MAX_FILE_SIZE,
|
||||
0,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
||||
assert!(files.is_empty(), "Expected empty, got: {:#?}", files);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn one_level_0_file_no_level_1_overlaps() {
|
||||
let parquet_files_for_compaction = ParquetFilesForCompaction {
|
||||
|
@ -886,6 +917,7 @@ mod tests {
|
|||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction,
|
||||
DEFAULT_MAX_FILE_SIZE,
|
||||
DEFAULT_INPUT_FILE_COUNT,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
@ -928,6 +960,7 @@ mod tests {
|
|||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction,
|
||||
DEFAULT_MAX_FILE_SIZE,
|
||||
DEFAULT_INPUT_FILE_COUNT,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
@ -992,6 +1025,7 @@ mod tests {
|
|||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction,
|
||||
DEFAULT_MAX_FILE_SIZE,
|
||||
DEFAULT_INPUT_FILE_COUNT,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
@ -1102,6 +1136,7 @@ mod tests {
|
|||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction.clone(),
|
||||
max_size,
|
||||
DEFAULT_INPUT_FILE_COUNT,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
@ -1133,6 +1168,7 @@ mod tests {
|
|||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction.clone(),
|
||||
max_size,
|
||||
DEFAULT_INPUT_FILE_COUNT,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
@ -1164,6 +1200,7 @@ mod tests {
|
|||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction.clone(),
|
||||
max_size,
|
||||
DEFAULT_INPUT_FILE_COUNT,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
@ -1193,6 +1230,7 @@ mod tests {
|
|||
let files = filter_cold_parquet_files(
|
||||
parquet_files_for_compaction,
|
||||
DEFAULT_MAX_FILE_SIZE,
|
||||
DEFAULT_INPUT_FILE_COUNT,
|
||||
&files_metric,
|
||||
&bytes_metric,
|
||||
);
|
||||
|
|
|
@ -426,6 +426,7 @@ impl Config {
|
|||
input_size_threshold_bytes: 314_572_800,
|
||||
cold_input_size_threshold_bytes: 629_145_600,
|
||||
input_file_count_threshold: 100,
|
||||
cold_input_file_count_threshold: 100,
|
||||
hot_multiple: 4,
|
||||
};
|
||||
|
||||
|
|
|
@ -197,6 +197,7 @@ pub async fn build_compactor_from_config(
|
|||
compactor_config.input_size_threshold_bytes,
|
||||
compactor_config.cold_input_size_threshold_bytes,
|
||||
compactor_config.input_file_count_threshold,
|
||||
compactor_config.cold_input_file_count_threshold,
|
||||
compactor_config.hot_multiple,
|
||||
);
|
||||
|
||||
|
|
Loading…
Reference in New Issue