diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs
index 38fdc826d3..27cc3a7ddf 100644
--- a/clap_blocks/src/compactor.rs
+++ b/clap_blocks/src/compactor.rs
@@ -72,7 +72,7 @@ pub struct CompactorConfig {
     /// The compactor will limit the number of simultaneous compaction jobs based on the
     /// size of the input files to be compacted. This number should be less than 1/10th
     /// of the available memory to ensure compactions have
-    /// enough space to run. Default is 1,073,741,824 bytes (1GB ).
+    /// enough space to run. Default is 1,073,741,824 bytes (1GB).
     #[clap(
         long = "--compaction-concurrent-size-bytes",
         env = "INFLUXDB_IOX_COMPACTION_CONCURRENT_SIZE_BYTES",
@@ -101,12 +101,14 @@ pub struct CompactorConfig {
     )]
     pub min_number_recent_ingested_files_per_partition: usize,
 
-    /// TODO: Describe this. Multiplier?
+    /// A compaction operation will gather as many L0 files with their overlapping L1 files to
+    /// compact together until the total size of input files crosses this threshold. Later
+    /// compactions will pick up the remaining L0 files. Default is 314,572,800 bytes (300MB).
     #[clap(
-        long = "--new-param",
-        env = "INFLUXDB_IOX_COMPACTION_NEW_PARAM",
-        default_value = "3",
+        long = "--compaction-input-size-threshold-bytes",
+        env = "INFLUXDB_IOX_COMPACTION_INPUT_SIZE_THRESHOLD_BYTES",
+        default_value = "314572800",
         action
     )]
-    pub new_param: u64,
+    pub input_size_threshold_bytes: u64,
 }
diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index a8d042806c..b875390e42 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -2613,7 +2613,7 @@ mod tests {
         let max_concurrent_size_bytes = 100_000;
         let max_number_partitions_per_sequencer = 1;
         let min_number_recent_ingested_per_partition = 1;
-        let new_param = 3;
+        let input_size_threshold_bytes = 300 * 1024 * 1024;
         CompactorConfig::new(
             max_desired_file_size_bytes,
             percentage_max_file_size,
@@ -2621,7 +2621,7 @@
             max_concurrent_size_bytes,
             max_number_partitions_per_sequencer,
             min_number_recent_ingested_per_partition,
-            new_param,
+            input_size_threshold_bytes,
         )
     }
 }
diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs
index 146bf78001..63eb8c6be1 100644
--- a/compactor/src/handler.rs
+++ b/compactor/src/handler.rs
@@ -130,8 +130,10 @@ pub struct CompactorConfig {
     /// Min number of recent ingested files a partition needs to be considered for compacting
     min_number_recent_ingested_files_per_partition: usize,
 
-    /// TODO: describe this. Multiplier?
-    new_param: u64,
+    /// A compaction operation will gather as many L0 files with their overlapping L1 files to
+    /// compact together until the total size of input files crosses this threshold. Later
+    /// compactions will pick up the remaining L0 files.
+    input_size_threshold_bytes: u64,
 }
 
 impl CompactorConfig {
@@ -143,7 +145,7 @@
         max_concurrent_compaction_size_bytes: u64,
         max_number_partitions_per_sequencer: usize,
         min_number_recent_ingested_files_per_partition: usize,
-        new_param: u64,
+        input_size_threshold_bytes: u64,
     ) -> Self {
         assert!(split_percentage > 0 && split_percentage <= 100);
 
@@ -154,7 +156,7 @@
             max_concurrent_compaction_size_bytes,
             max_number_partitions_per_sequencer,
             min_number_recent_ingested_files_per_partition,
-            new_param,
+            input_size_threshold_bytes,
         }
     }
 
@@ -192,9 +194,11 @@
         self.min_number_recent_ingested_files_per_partition
     }
 
-    /// TODO: Describe this. Multiplier?
-    pub fn new_param(&self) -> u64 {
-        self.new_param
+    /// A compaction operation will gather as many L0 files with their overlapping L1 files to
+    /// compact together until the total size of input files crosses this threshold. Later
+    /// compactions will pick up the remaining L0 files.
+    pub fn input_size_threshold_bytes(&self) -> u64 {
+        self.input_size_threshold_bytes
     }
 }
 
diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs
index c8723b1c06..0321f7ee3c 100644
--- a/compactor/src/lib.rs
+++ b/compactor/src/lib.rs
@@ -47,11 +47,9 @@ pub(crate) async fn compact_partition(
         .await
         .context(ParquetFileLookupSnafu)?;
 
-    let max_bytes = compactor.config.max_desired_file_size_bytes() * compactor.config.new_param();
-
     let _to_compact = parquet_file_filtering::filter_parquet_files(
         parquet_files_for_compaction,
-        max_bytes,
+        compactor.config.input_size_threshold_bytes(),
         &compactor.parquet_file_candidate_gauge,
         &compactor.parquet_file_candidate_bytes_gauge,
     );
diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs
index 6b86ea64c3..448cf70b03 100644
--- a/influxdb_iox/src/commands/run/all_in_one.rs
+++ b/influxdb_iox/src/commands/run/all_in_one.rs
@@ -392,13 +392,13 @@ impl Config {
             topic: QUERY_POOL_NAME.to_string(),
             write_buffer_partition_range_start,
             write_buffer_partition_range_end,
-            max_desired_file_size_bytes: 30000,
+            max_desired_file_size_bytes: 30_000,
             percentage_max_file_size: 30,
             split_percentage: 80,
-            max_concurrent_size_bytes: 100000,
+            max_concurrent_size_bytes: 100_000,
             max_number_partitions_per_sequencer: 1,
             min_number_recent_ingested_files_per_partition: 1,
-            new_param: 3,
+            input_size_threshold_bytes: 314_572_800,
         };
 
         let querier_config = QuerierConfig {
diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs
index 552e831c84..d1d8f26aec 100644
--- a/ioxd_compactor/src/lib.rs
+++ b/ioxd_compactor/src/lib.rs
@@ -170,7 +170,7 @@ pub async fn create_compactor_server_type(
         compactor_config.max_concurrent_size_bytes,
         compactor_config.max_number_partitions_per_sequencer,
         compactor_config.min_number_recent_ingested_files_per_partition,
-        compactor_config.new_param,
+        compactor_config.input_size_threshold_bytes,
     );
     let compactor_handler = Arc::new(CompactorHandlerImpl::new(
         sequencers,