From 76d155fe89955fabb1f4311d5d4263b2d42d9d7f Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Wed, 5 Apr 2023 12:48:23 -0400
Subject: [PATCH] feat: Configuration for hot vs cold thresholds

This creates a separate option for the number of minutes *without* a write
that a partition must have before being considered for cold compaction. This
is a new CLI flag so that it can have a different default from hot
compaction's compaction_partition_minute_threshold. I didn't add "hot" to
compaction_partition_minute_threshold's name so that k8s-idpe doesn't have to
change to continue running hot compaction as it does today.

Then use the relevant threshold earlier, when creating the
PartitionsSourceConfig, to make it clearer which threshold is used where.

Right now, this will silently ignore any specified CLI flag that isn't
relevant to the current compaction mode. We might want to change that to warn
or error instead, to save debugging time in the future.
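For illustration, this is the intended relationship between the two flags and
the two modes, with their defaults of 10 and 60 minutes. The sketch below is
not code from this patch; it is a std-only stand-in with made-up names:

    use std::time::Duration;

    enum CompactionMode {
        Hot,  // stand-in for CompactionType::Hot
        Cold, // stand-in for CompactionType::Cold
    }

    /// Pick the lookback threshold that applies to the selected mode; the flag
    /// for the other mode is ignored, which is the "silently ignore" behavior
    /// described above.
    fn effective_threshold(
        mode: CompactionMode,
        hot_minutes: u64,  // --compaction_partition_minute_threshold, default 10
        cold_minutes: u64, // --compaction_cold_partition_minute_threshold, default 60
    ) -> Duration {
        match mode {
            CompactionMode::Hot => Duration::from_secs(hot_minutes * 60),
            CompactionMode::Cold => Duration::from_secs(cold_minutes * 60),
        }
    }

    fn main() {
        println!("{:?}", effective_threshold(CompactionMode::Hot, 10, 60));  // 600s
        println!("{:?}", effective_threshold(CompactionMode::Cold, 10, 60)); // 3600s
    }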
---
 clap_blocks/src/compactor2.rs               | 30 ++++++++++++++-------
 compactor2/src/components/hardcoded.rs      | 16 ++++++++---
 compactor2/src/components/report.rs         |  2 --
 compactor2/src/config.rs                    | 26 +++++++++++++-----
 compactor2_test_utils/src/lib.rs            |  5 ++--
 influxdb_iox/src/commands/run/all_in_one.rs |  3 ++-
 ioxd_compactor2/src/lib.rs                  | 23 ++++++++++------
 7 files changed, 73 insertions(+), 32 deletions(-)

diff --git a/clap_blocks/src/compactor2.rs b/clap_blocks/src/compactor2.rs
index 4226c224fb..09d5dd81cb 100644
--- a/clap_blocks/src/compactor2.rs
+++ b/clap_blocks/src/compactor2.rs
@@ -26,6 +26,26 @@ pub struct Compactor2Config {
     )]
     pub compaction_type: CompactionType,
 
+    /// When in "hot" compaction mode, the compactor will only consider compacting partitions that
+    /// have new Parquet files created within this many minutes.
+    #[clap(
+        long = "compaction_partition_minute_threshold",
+        env = "INFLUXDB_IOX_COMPACTION_PARTITION_MINUTE_THRESHOLD",
+        default_value = "10",
+        action
+    )]
+    pub compaction_partition_minute_threshold: u64,
+
+    /// When in "cold" compaction mode, the compactor will only consider compacting partitions that
+    /// have had no new Parquet files created in at least this many minutes.
+    #[clap(
+        long = "compaction_cold_partition_minute_threshold",
+        env = "INFLUXDB_IOX_COMPACTION_COLD_PARTITION_MINUTE_THRESHOLD",
+        default_value = "60",
+        action
+    )]
+    pub compaction_cold_partition_minute_threshold: u64,
+
     /// Number of partitions that should be compacted in parallel.
     ///
     /// This should usually be larger than the compaction job
@@ -61,16 +81,6 @@ pub struct Compactor2Config {
     )]
     pub compaction_partition_scratchpad_concurrency: NonZeroUsize,
 
-    /// The compactor will only consider compacting partitions that
-    /// have new parquet files created within this many minutes.
-    #[clap(
-        long = "compaction_partition_minute_threshold",
-        env = "INFLUXDB_IOX_COMPACTION_PARTITION_MINUTE_THRESHOLD",
-        default_value = "10",
-        action
-    )]
-    pub compaction_partition_minute_threshold: u64,
-
     /// Number of threads to use for the compactor query execution,
     /// compaction and persistence.
     /// If not specified, defaults to one less than the number of cores on the system
diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs
index 98c3501782..67fc4c2379 100644
--- a/compactor2/src/components/hardcoded.rs
+++ b/compactor2/src/components/hardcoded.rs
@@ -125,12 +125,22 @@ fn make_partitions_source_commit_partition_sink(
     Arc,
 ) {
     let partitions_source: Arc = match &config.partitions_source {
-        PartitionsSourceConfig::CatalogRecentWrites => {
+        PartitionsSourceConfig::CatalogRecentWrites { threshold } => {
             Arc::new(CatalogToCompactPartitionsSource::new(
                 config.backoff_config.clone(),
                 Arc::clone(&config.catalog),
-                config.partition_threshold,
-                None, // Recent writes is `partition_threshold` ago to now
+                *threshold,
+                None, // Recent writes is `threshold` ago to now
                 Arc::clone(&config.time_provider),
             ))
         }
+        PartitionsSourceConfig::CatalogColdForWrites { threshold } => {
+            Arc::new(CatalogToCompactPartitionsSource::new(
+                config.backoff_config.clone(),
+                Arc::clone(&config.catalog),
+                // Cold for writes is `threshold * 3` ago to `threshold` ago
+                *threshold * 3,
+                Some(*threshold),
+                Arc::clone(&config.time_provider),
+            ))
+        }
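To make the comments in the new match arm concrete: for a given threshold T,
the "hot" source scans partitions with Parquet files created between T ago and
now, while the "cold" source scans between three times T ago and T ago. The
following is a minimal std-only sketch of those two windows; it is illustrative
only and does not use the real CatalogToCompactPartitionsSource API:

    use std::time::{Duration, SystemTime};

    /// (start, optional end) of the file-creation window to scan;
    /// `None` for the end means "up to now", mirroring the `None` argument above.
    fn hot_window(now: SystemTime, threshold: Duration) -> (SystemTime, Option<SystemTime>) {
        // Recent writes: from `threshold` ago up to now.
        (now - threshold, None)
    }

    fn cold_window(now: SystemTime, threshold: Duration) -> (SystemTime, Option<SystemTime>) {
        // Cold for writes: from `threshold * 3` ago up to `threshold` ago.
        (now - threshold * 3, Some(now - threshold))
    }

    fn main() {
        let now = SystemTime::now();
        let threshold = Duration::from_secs(60 * 60); // the 60-minute cold default
        println!("hot:  {:?}", hot_window(now, threshold));
        println!("cold: {:?}", cold_window(now, threshold));
    }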
diff --git a/compactor2/src/components/report.rs b/compactor2/src/components/report.rs
index a9fcdcbfe3..550783153d 100644
--- a/compactor2/src/components/report.rs
+++ b/compactor2/src/components/report.rs
@@ -22,7 +22,6 @@ pub fn log_config(config: &Config) {
         partition_concurrency,
         job_concurrency,
         partition_scratchpad_concurrency,
-        partition_threshold,
         max_desired_file_size_bytes,
         percentage_max_file_size,
         split_percentage,
@@ -68,7 +67,6 @@ pub fn log_config(config: &Config) {
         partition_concurrency=partition_concurrency.get(),
         job_concurrency=job_concurrency.get(),
         partition_scratchpad_concurrency=partition_scratchpad_concurrency.get(),
-        partition_threshold_secs=partition_threshold.as_secs_f32(),
         max_desired_file_size_bytes,
         percentage_max_file_size,
         split_percentage,
diff --git a/compactor2/src/config.rs b/compactor2/src/config.rs
index 8afc8c4d13..6b91b1f672 100644
--- a/compactor2/src/config.rs
+++ b/compactor2/src/config.rs
@@ -58,9 +58,6 @@ pub struct Config {
     /// Number of jobs PER PARTITION that move files in and out of the scratchpad.
     pub partition_scratchpad_concurrency: NonZeroUsize,
 
-    /// Partitions with recent created files this recent duration are selected for compaction.
-    pub partition_threshold: Duration,
-
     /// Desired max size of compacted parquet files
     /// It is a target desired value than a guarantee
     pub max_desired_file_size_bytes: u64,
@@ -213,8 +210,20 @@ pub struct ShardConfig {
 /// Partitions source config.
 #[derive(Debug, Clone)]
 pub enum PartitionsSourceConfig {
-    /// Use the catalog to determine which partitions have recently received writes.
-    CatalogRecentWrites,
+    /// For "hot" compaction: use the catalog to determine which partitions have recently received
+    /// writes, defined as having a new Parquet file created within the last `threshold`.
+    CatalogRecentWrites {
+        /// The amount of time ago to look for Parquet file creations
+        threshold: Duration,
+    },
+
+    /// For "cold" compaction: use the catalog to determine which partitions have gone cold for
+    /// writing and should undergo final compaction, defined as having no new Parquet files created
+    /// in at least the last `threshold`.
+    CatalogColdForWrites {
+        /// The amount of time ago the last Parquet file creation must have happened
+        threshold: Duration,
+    },
 
     /// Use all partitions from the catalog.
     ///
@@ -230,7 +239,12 @@ pub enum PartitionsSourceConfig {
 impl Display for PartitionsSourceConfig {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
-            Self::CatalogRecentWrites => write!(f, "catalog_recent_writes"),
+            Self::CatalogRecentWrites { threshold } => {
+                write!(f, "catalog_recent_writes({threshold:?})")
+            }
+            Self::CatalogColdForWrites { threshold } => {
+                write!(f, "catalog_cold_for_writes({threshold:?})")
+            }
             Self::CatalogAll => write!(f, "catalog_all"),
             Self::Fixed(p_ids) => {
                 let mut p_ids = p_ids.iter().copied().collect::<Vec<_>>();
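Because the threshold now lives inside the enum variants, the Display impl
above includes it, so a rendered config value is self-describing. The
stripped-down, standalone sketch below uses simplified names (not the actual
IOx types) and shows the output for the default thresholds:

    use std::fmt::{self, Display};
    use std::time::Duration;

    #[derive(Debug)]
    enum SourceConfig {
        RecentWrites { threshold: Duration },
        ColdForWrites { threshold: Duration },
    }

    impl Display for SourceConfig {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            match self {
                Self::RecentWrites { threshold } => {
                    write!(f, "catalog_recent_writes({threshold:?})")
                }
                Self::ColdForWrites { threshold } => {
                    write!(f, "catalog_cold_for_writes({threshold:?})")
                }
            }
        }
    }

    fn main() {
        // Prints "catalog_recent_writes(600s)" and "catalog_cold_for_writes(3600s)".
        println!("{}", SourceConfig::RecentWrites { threshold: Duration::from_secs(10 * 60) });
        println!("{}", SourceConfig::ColdForWrites { threshold: Duration::from_secs(60 * 60) });
    }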
diff --git a/compactor2_test_utils/src/lib.rs b/compactor2_test_utils/src/lib.rs
index 664f3f5d07..f6855b9e46 100644
--- a/compactor2_test_utils/src/lib.rs
+++ b/compactor2_test_utils/src/lib.rs
@@ -133,12 +133,13 @@ impl TestSetupBuilder {
             partition_concurrency: NonZeroUsize::new(1).unwrap(),
             job_concurrency: NonZeroUsize::new(1).unwrap(),
             partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
-            partition_threshold: PARTITION_THRESHOLD,
             max_desired_file_size_bytes: MAX_DESIRE_FILE_SIZE,
             percentage_max_file_size: PERCENTAGE_MAX_FILE_SIZE,
             split_percentage: SPLIT_PERCENTAGE,
             partition_timeout: Duration::from_secs(3_600),
-            partitions_source: PartitionsSourceConfig::CatalogRecentWrites,
+            partitions_source: PartitionsSourceConfig::CatalogRecentWrites {
+                threshold: PARTITION_THRESHOLD,
+            },
             shadow_mode: false,
             ignore_partition_skip_marker: false,
             shard_config: None,
diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs
index 22dda0ab32..a5ab99b9d4 100644
--- a/influxdb_iox/src/commands/run/all_in_one.rs
+++ b/influxdb_iox/src/commands/run/all_in_one.rs
@@ -476,10 +476,11 @@ impl Config {
         // parameters are redundant with ingester's
         let compactor_config = Compactor2Config {
             compaction_type: Default::default(),
+            compaction_partition_minute_threshold: 10,
+            compaction_cold_partition_minute_threshold: 60,
             compaction_partition_concurrency: NonZeroUsize::new(1).unwrap(),
             compaction_job_concurrency: NonZeroUsize::new(1).unwrap(),
             compaction_partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
-            compaction_partition_minute_threshold: 10,
             query_exec_thread_count: Some(NonZeroUsize::new(1).unwrap()),
             exec_mem_pool_bytes,
             max_desired_file_size_bytes: 30_000,
diff --git a/ioxd_compactor2/src/lib.rs b/ioxd_compactor2/src/lib.rs
index 28f2a183e1..2fbed6dc77 100644
--- a/ioxd_compactor2/src/lib.rs
+++ b/ioxd_compactor2/src/lib.rs
@@ -1,6 +1,6 @@
 use async_trait::async_trait;
 use backoff::BackoffConfig;
-use clap_blocks::compactor2::Compactor2Config;
+use clap_blocks::compactor2::{CompactionType, Compactor2Config};
 use compactor2::{
     compactor::Compactor2,
     config::{Config, PartitionsSourceConfig, ShardConfig},
@@ -159,13 +159,23 @@ pub async fn create_compactor2_server_type(
     let partitions_source = match (
         compactor_config.partition_filter,
         compactor_config.process_all_partitions,
+        compactor_config.compaction_type,
     ) {
-        (None, false) => PartitionsSourceConfig::CatalogRecentWrites,
-        (None, true) => PartitionsSourceConfig::CatalogAll,
-        (Some(ids), false) => {
+        (None, false, CompactionType::Hot) => PartitionsSourceConfig::CatalogRecentWrites {
+            threshold: Duration::from_secs(
+                compactor_config.compaction_partition_minute_threshold * 60,
+            ),
+        },
+        (None, false, CompactionType::Cold) => PartitionsSourceConfig::CatalogColdForWrites {
+            threshold: Duration::from_secs(
+                compactor_config.compaction_cold_partition_minute_threshold * 60,
+            ),
+        },
+        (None, true, _) => PartitionsSourceConfig::CatalogAll,
+        (Some(ids), false, _) => {
             PartitionsSourceConfig::Fixed(ids.into_iter().map(PartitionId::new).collect())
         }
-        (Some(_), true) => panic!(
+        (Some(_), true, _) => panic!(
             "provided partition ID filter and specific 'process all', this does not make sense"
         ),
     };
@@ -190,9 +200,6 @@ pub async fn create_compactor2_server_type(
         job_concurrency: compactor_config.compaction_job_concurrency,
         partition_scratchpad_concurrency: compactor_config
             .compaction_partition_scratchpad_concurrency,
-        partition_threshold: Duration::from_secs(
-            compactor_config.compaction_partition_minute_threshold * 60,
-        ),
         max_desired_file_size_bytes: compactor_config.max_desired_file_size_bytes,
         percentage_max_file_size: compactor_config.percentage_max_file_size,
         split_percentage: compactor_config.split_percentage,
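The selection logic added in ioxd_compactor2 can be summarized on its own: the
CLI's partition filter and process-all flag still take precedence, and only
when neither is set does the compaction type choose between the hot and cold
catalog sources, using whichever threshold applies. The following is a
self-contained sketch with simplified stand-in types (not the IOx definitions),
including the invalid combination that panics:

    use std::time::Duration;

    enum Mode {
        Hot,
        Cold,
    }

    #[derive(Debug)]
    enum Source {
        RecentWrites { threshold: Duration },
        ColdForWrites { threshold: Duration },
        All,
        Fixed(Vec<i64>),
    }

    fn select_source(
        partition_filter: Option<Vec<i64>>,
        process_all: bool,
        mode: Mode,
        hot_minutes: u64,
        cold_minutes: u64,
    ) -> Source {
        match (partition_filter, process_all, mode) {
            // Hot mode: the cold threshold is ignored (silently, per the commit message).
            (None, false, Mode::Hot) => Source::RecentWrites {
                threshold: Duration::from_secs(hot_minutes * 60),
            },
            // Cold mode: the hot threshold is ignored.
            (None, false, Mode::Cold) => Source::ColdForWrites {
                threshold: Duration::from_secs(cold_minutes * 60),
            },
            // An explicit "process all" or partition list overrides either mode.
            (None, true, _) => Source::All,
            (Some(ids), false, _) => Source::Fixed(ids),
            (Some(_), true, _) => {
                panic!("partition filter and 'process all' are mutually exclusive")
            }
        }
    }

    fn main() {
        println!("{:?}", select_source(None, false, Mode::Hot, 10, 60));
        println!("{:?}", select_source(None, false, Mode::Cold, 10, 60));
    }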