diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs index ba266f0f2e..5dac779d25 100644 --- a/clap_blocks/src/compactor.rs +++ b/clap_blocks/src/compactor.rs @@ -27,15 +27,58 @@ pub struct CompactorConfig { )] pub write_buffer_partition_range_end: i32, - /// Percentage of least recent data we want to split to reduce compacting non-overlapped data - /// Must be between 0 and 100. Default is 100, which won't split the resulting file. + /// Max number of level-0 files (written by ingester) we want to compact with level-1 each time + /// Default: 3 + #[clap( + // TODO: verify the theory that if the streaming works as expected, + // we do not need to limit this number + long = "--compaction-max-number-level-0-files", + env = "INFLUXDB_IOX_COMPACTION_MAX_NUMBER_LEVEL_0_FILES", + default_value = "3", + action + )] + pub compaction_max_number_level_0_files: i32, + + /// Desired max size of compacted parquet files + /// It is a target desired value rather than a guarantee + /// Default is 100,000,000 (100MB) + #[clap( + long = "--compaction-max-desired-size-bytes", + env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES", + default_value = "100000000", + action + )] + pub compaction_max_desired_file_size_bytes: i64, + + /// Percentage of desired max file size. + /// If the estimated compacted result is too small, no need to split it. + /// This percentage is to determine how small it is: + /// < compaction_percentage_max_file_size * compaction_max_desired_file_size_bytes: + /// This value must be between (0, 100) + /// Default is 30 + #[clap( + long = "--compaction-percentage-max-file-size", + env = "INFLUXDB_IOX_COMPACTION_PERCENTAGE_MAX_FILE_SIZE", + default_value = "30", + action + )] + pub compaction_percentage_max_file_size: i16, + + /// Split file percentage + /// If the estimated compacted result is neither too small nor too large, it will be split + /// into 2 files determined by this percentage. + /// . 
Too small means: < compaction_percentage_max_file_size * compaction_max_desired_file_size_bytes + /// . Too large means: > compaction_max_desired_file_size_bytes + /// . Any size in the middle will be considered neither too small nor too large + /// This value must be between (0, 100) + /// Default is 80 #[clap( long = "--compaction-split-percentage", env = "INFLUXDB_IOX_COMPACTION_SPLIT_PERCENTAGE", - default_value = "100", + default_value = "80", action )] - pub split_percentage: i64, + pub compaction_split_percentage: i16, /// The compactor will limit the number of simultaneous compaction jobs based on the /// size of the input files to be compacted. This number should be less than 1/10th @@ -44,40 +87,8 @@ pub struct CompactorConfig { #[clap( long = "--compaction-concurrent-size-bytes", env = "INFLUXDB_IOX_COMPACTION_CONCURRENT_SIZE_BYTES", - default_value = "1000000000", - action - )] - pub max_concurrent_compaction_size_bytes: i64, - - /// The compactor will compact overlapped files with non-overlapped and contiguous files - /// a larger file of max size defined by the config value. 
- /// Default is 100,000,000 (100MB) - #[clap( - long = "--compaction-max-size-bytes", - env = "INFLUXDB_IOX_COMPACTION_MAX_SIZE_BYTES", default_value = "100000000", action )] - pub compaction_max_size_bytes: i64, - - /// Limit the number of files per compaction - /// Default is 100 - #[clap( - long = "--compaction-max-file-count", - env = "INFLUXDB_IOX_COMPACTION_MAX_FILE_COUNT", - default_value = "100", - action - )] - pub compaction_max_file_count: i64, - - /// If the compacted result is too large, the compactor will split it into many files each - /// estimated to be smaller than this value - /// Default is 30,000,000 (30MB) - #[clap( - long = "--compaction-max-desired-size-bytes", - env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES", - default_value = "30000000", - action - )] - pub compaction_max_desired_file_size_bytes: i64, + pub max_concurrent_compaction_size_bytes: i64, } diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 552f79ce5d..ccad4c6c2f 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -1157,11 +1157,7 @@ mod tests { // ------------------------------------------------ // Compact - let split_percentage = 90; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; + let config = make_compactor_config(); let compactor = Compactor::new( vec![sequencer.sequencer.id], Arc::clone(&catalog.catalog), @@ -1169,13 +1165,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), + config, Arc::new(metric::Registry::new()), ); @@ -1296,11 +1286,7 @@ mod tests { .create_partition("part") .await; let time = Arc::new(SystemProvider::new()); - let 
split_percentage = 90; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; + let config = make_compactor_config(); let compactor = Compactor::new( vec![sequencer.sequencer.id], Arc::clone(&catalog.catalog), @@ -1308,13 +1294,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), + config, Arc::new(metric::Registry::new()), ); @@ -1329,7 +1309,7 @@ mod tests { 1, 10, 20, - compactor.config.compaction_max_size_bytes() + 10, + config.compaction_max_desired_file_size_bytes() + 10, 20, ) .await; @@ -1498,11 +1478,7 @@ mod tests { .await .parquet_file; - let split_percentage = 90; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; + let config = make_compactor_config(); let compactor = Compactor::new( vec![sequencer.sequencer.id], Arc::clone(&catalog.catalog), @@ -1510,13 +1486,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), + config, Arc::new(metric::Registry::new()), ); @@ -1593,11 +1563,7 @@ mod tests { .await .parquet_file; - let split_percentage = 90; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; + let config = make_compactor_config(); let compactor = Compactor::new( 
vec![sequencer.sequencer.id], Arc::clone(&catalog.catalog), @@ -1605,13 +1571,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), + config, Arc::new(metric::Registry::new()), ); @@ -1724,11 +1684,7 @@ mod tests { .await .parquet_file; - let split_percentage = 90; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; + let config = make_compactor_config(); let compactor = Compactor::new( vec![sequencer.sequencer.id], Arc::clone(&catalog.catalog), @@ -1736,13 +1692,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), + config, Arc::new(metric::Registry::new()), ); @@ -1941,11 +1891,7 @@ mod tests { async fn test_add_parquet_file_with_tombstones() { let catalog = TestCatalog::new(); - let split_percentage = 90; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; + let config = make_compactor_config(); let compactor = Compactor::new( vec![], Arc::clone(&catalog.catalog), @@ -1953,13 +1899,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), + config, Arc::new(metric::Registry::new()), ); @@ -2279,11 
+2219,7 @@ mod tests { .unwrap(); txn.commit().await.unwrap(); - let split_percentage = 90; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; + let config = make_compactor_config(); let compactor = Compactor::new( vec![sequencer.id], Arc::clone(&catalog.catalog), @@ -2291,13 +2227,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), + config, Arc::new(metric::Registry::new()), ); @@ -2372,11 +2302,7 @@ mod tests { .await .parquet_file; - let split_percentage = 90; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; + let config = make_compactor_config(); let compactor = Compactor::new( vec![sequencer.sequencer.id], Arc::clone(&catalog.catalog), @@ -2384,13 +2310,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), + config, Arc::new(metric::Registry::new()), ); @@ -2452,4 +2372,19 @@ mod tests { .await .unwrap() } + + fn make_compactor_config() -> CompactorConfig { + let compaction_max_number_level_0_files = 3; + let compaction_max_desired_file_size_bytes = 10_000; + let compaction_percentage_max_file_size = 30; + let compaction_split_percentage = 80; + let max_concurrent_compaction_size_bytes = 100_000; + CompactorConfig::new( + compaction_max_number_level_0_files, + compaction_max_desired_file_size_bytes, + 
compaction_percentage_max_file_size, + compaction_split_percentage, + max_concurrent_compaction_size_bytes, + ) + } } diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs index 2df6ad2fc8..1a4d7ed89b 100644 --- a/compactor/src/handler.rs +++ b/compactor/src/handler.rs @@ -99,49 +99,74 @@ impl CompactorHandlerImpl { /// The configuration options for the compactor. #[derive(Debug, Clone, Copy)] pub struct CompactorConfig { - /// Percentage of least recent data we want to split to reduce compacting non-overlapped data - /// Must be between 0 and 100. - split_percentage: i64, + /// Max number of level-0 files (written by ingester) we want to compact with level-1 each time + compaction_max_number_level_0_files: i32, + + /// Desired max size of compacted parquet files + /// It is a target desired value rather than a guarantee + compaction_max_desired_file_size_bytes: i64, + + /// Percentage of desired max file size. + /// If the estimated compacted result is too small, no need to split it. + /// This percentage is to determine how small it is: + /// < compaction_percentage_max_file_size * compaction_max_desired_file_size_bytes: + /// This value must be between (0, 100) + compaction_percentage_max_file_size: i16, + + /// Split file percentage + /// If the estimated compacted result is neither too small nor too large, it will be split + /// into 2 files determined by this percentage. + /// . Too small means: < compaction_percentage_max_file_size * compaction_max_desired_file_size_bytes + /// . Too large means: > compaction_max_desired_file_size_bytes + /// . Any size in the middle will be considered neither too small nor too large + /// This value must be between (0, 100) + compaction_split_percentage: i16, + /// The compactor will limit the number of simultaneous compaction jobs based on the - /// size of the input files to be compacted. 
Currently this only takes into account the - /// level 0 and 1 files, but should later also consider the level 2 files to be compacted. This - /// number should be less than 1/10th of the available memory to ensure compactions have + /// size of the input files to be compacted. This number should be less than 1/10th + /// of the available memory to ensure compactions have /// enough space to run. max_concurrent_compaction_size_bytes: i64, - /// The compactor will compact overlapped files but if the total size of the files - /// exceeds this value, the content of each files will be split in the current compaction cycle - /// so in the next cycle, they will be small enough to get compacted - compaction_max_size_bytes: i64, - /// Limit the number of files to compact into one file - compaction_max_file_count: i64, // TODO: remove this value - /// If the compacted result is larger than this value, it will be persisted into - /// many files, each is estimated smaller than this value - compaction_max_desired_file_size_bytes: i64, } impl CompactorConfig { /// Initialize a valid config pub fn new( - split_percentage: i64, - max_concurrent_compaction_size_bytes: i64, - compaction_max_size_bytes: i64, - compaction_max_file_count: i64, + compaction_max_number_level_0_files: i32, compaction_max_desired_file_size_bytes: i64, + compaction_percentage_max_file_size: i16, + compaction_split_percentage: i16, + max_concurrent_compaction_size_bytes: i64, ) -> Self { - assert!(split_percentage > 0 && split_percentage <= 100); + assert!(compaction_split_percentage > 0 && compaction_split_percentage <= 100); Self { - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, + compaction_max_number_level_0_files, compaction_max_desired_file_size_bytes, + compaction_percentage_max_file_size, + compaction_split_percentage, + max_concurrent_compaction_size_bytes, } } + /// Max number of level-0 files we want to compact with level-1 
each time + pub fn compaction_max_number_level_0_files(&self) -> i32 { + self.compaction_max_number_level_0_files + } + + /// Desired max size of a compacted file + pub fn compaction_max_desired_file_size_bytes(&self) -> i64 { + self.compaction_max_desired_file_size_bytes + } + + /// Percentage of desired max file size to determine a size is too small + pub fn compaction_percentage_max_file_size(&self) -> i16 { + self.compaction_percentage_max_file_size + } + /// Percentage of least recent data we want to split to reduce compacting non-overlapped data - pub fn split_percentage(&self) -> i64 { - self.split_percentage + pub fn compaction_split_percentage(&self) -> i16 { + self.compaction_split_percentage } /// The compactor will limit the number of simultaneous compaction jobs based on the @@ -152,23 +177,6 @@ impl CompactorConfig { pub fn max_concurrent_compaction_size_bytes(&self) -> i64 { self.max_concurrent_compaction_size_bytes } - - /// The compactor will compact overlapped files no matter how much large they are. - /// For non-overlapped and contiguous files, compactor will also compact them into - /// a larger file of max size defined by this config value. 
- pub fn compaction_max_size_bytes(&self) -> i64 { - self.compaction_max_size_bytes - } - - /// Max number of files to compact at a time - pub fn compaction_max_file_count(&self) -> i64 { - self.compaction_max_file_count - } - - /// max desired persisted file size by the compactor (estimate) - pub fn compaction_max_desired_file_size_bytes(&self) -> i64 { - self.compaction_max_desired_file_size_bytes - } } /// Checks for candidate partitions to compact and spawns tokio tasks to compact as many diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index fb22f1322a..8f379db559 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -392,11 +392,11 @@ impl Config { topic: QUERY_POOL_NAME.to_string(), write_buffer_partition_range_start, write_buffer_partition_range_end, - split_percentage: 90, - max_concurrent_compaction_size_bytes: 100000, - compaction_max_size_bytes: 100000, - compaction_max_file_count: 10, + compaction_max_number_level_0_files: 3, compaction_max_desired_file_size_bytes: 30000, + compaction_percentage_max_file_size: 30, + compaction_split_percentage: 80, + max_concurrent_compaction_size_bytes: 100000, }; let querier_config = QuerierConfig { diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index bdb2722297..b1b0f6ad2d 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -164,11 +164,11 @@ pub async fn create_compactor_server_type( let parquet_store = ParquetStorage::new(object_store); let compactor_config = compactor::handler::CompactorConfig::new( - compactor_config.split_percentage, - compactor_config.max_concurrent_compaction_size_bytes, - compactor_config.compaction_max_size_bytes, - compactor_config.compaction_max_file_count, + compactor_config.compaction_max_number_level_0_files, compactor_config.compaction_max_desired_file_size_bytes, + compactor_config.compaction_percentage_max_file_size, + 
compactor_config.compaction_split_percentage, + compactor_config.max_concurrent_compaction_size_bytes, ); let compactor_handler = Arc::new(CompactorHandlerImpl::new( sequencers,