feat: Configuration for hot vs cold thresholds

This adds a separate option for the number of minutes a partition must
go *without* a write before being considered for cold compaction.

This is a new CLI flag so that it can have a different default from hot
compaction's compaction_partition_minute_threshold.

I didn't add "hot" to compaction_partition_minute_threshold's name so
that k8s-idpe doesn't have to change to continue running hot compaction
as it is today.

Then use the relevant threshold earlier, when creating the
PartitionsSourceConfig, to make it clearer which threshold is used
where.

Right now, any specified CLI flag that isn't relevant to the current
compaction mode is silently ignored. We might want to change that to a
warning or an error in the future to save debugging time.
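
For illustration only, here is a minimal sketch of the "warn instead of
silently ignore" idea. The helper name is made up; the field and variant
names come from the diffs below, and it assumes `CompactionType` has only
the `Hot` and `Cold` variants used there. With plain `u64` fields,
comparing against the defaults is only an approximation of "the user
passed this flag".

```rust
use clap_blocks::compactor2::{CompactionType, Compactor2Config};

/// Hypothetical helper, not part of this change: warn when a threshold flag
/// appears to be set even though the selected compaction mode ignores it.
fn warn_on_ignored_thresholds(config: &Compactor2Config) {
    match config.compaction_type {
        CompactionType::Hot => {
            // Hot compaction never reads the cold threshold.
            if config.compaction_cold_partition_minute_threshold != 60 {
                eprintln!(
                    "compaction_cold_partition_minute_threshold is ignored \
                     in hot compaction mode"
                );
            }
        }
        CompactionType::Cold => {
            // Cold compaction never reads the hot threshold.
            if config.compaction_partition_minute_threshold != 10 {
                eprintln!(
                    "compaction_partition_minute_threshold is ignored \
                     in cold compaction mode"
                );
            }
        }
    }
}
```
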
pull/24376/head
Carol (Nichols || Goulding) 2023-04-05 12:48:23 -04:00
parent 15a7c527b4
commit 76d155fe89
7 changed files with 73 additions and 32 deletions

View File

@@ -26,6 +26,26 @@ pub struct Compactor2Config {
)]
pub compaction_type: CompactionType,
/// When in "hot" compaction mode, the compactor will only consider compacting partitions that
/// have new Parquet files created within this many minutes.
#[clap(
long = "compaction_partition_minute_threshold",
env = "INFLUXDB_IOX_COMPACTION_PARTITION_MINUTE_THRESHOLD",
default_value = "10",
action
)]
pub compaction_partition_minute_threshold: u64,
/// When in "cold" compaction mode, the compactor will only consider compacting partitions that
/// have had no new Parquet files created in at least this many minutes.
#[clap(
long = "compaction_cold_partition_minute_threshold",
env = "INFLUXDB_IOX_COMPACTION_COLD_PARTITION_MINUTE_THRESHOLD",
default_value = "60",
action
)]
pub compaction_cold_partition_minute_threshold: u64,
/// Number of partitions that should be compacted in parallel.
///
/// This should usually be larger than the compaction job
@@ -61,16 +81,6 @@ pub struct Compactor2Config {
)]
pub compaction_partition_scratchpad_concurrency: NonZeroUsize,
/// The compactor will only consider compacting partitions that
/// have new parquet files created within this many minutes.
#[clap(
long = "compaction_partition_minute_threshold",
env = "INFLUXDB_IOX_COMPACTION_PARTITION_MINUTE_THRESHOLD",
default_value = "10",
action
)]
pub compaction_partition_minute_threshold: u64,
/// Number of threads to use for the compactor query execution,
/// compaction and persistence.
/// If not specified, defaults to one less than the number of cores on the system
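
As a small usage sketch, assuming `Compactor2Config` derives `clap::Parser`
like other `clap_blocks` configs and that no `INFLUXDB_IOX_*` environment
variables are set, parsing an empty command line picks up the defaults
above: 10 minutes for hot, 60 minutes for the new cold threshold.

```rust
use clap::Parser;
use clap_blocks::compactor2::Compactor2Config;

fn main() {
    // No flags passed, so both thresholds fall back to their `default_value`s.
    let config = Compactor2Config::parse_from(["compactor2"]);
    println!(
        "hot: {} min, cold: {} min",
        config.compaction_partition_minute_threshold,
        config.compaction_cold_partition_minute_threshold,
    );
}
```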

View File

@@ -125,12 +125,22 @@ fn make_partitions_source_commit_partition_sink(
Arc<dyn PartitionDoneSink>,
) {
let partitions_source: Arc<dyn PartitionsSource> = match &config.partitions_source {
PartitionsSourceConfig::CatalogRecentWrites => {
PartitionsSourceConfig::CatalogRecentWrites { threshold } => {
Arc::new(CatalogToCompactPartitionsSource::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
config.partition_threshold,
None, // Recent writes is `partition_threshold` ago to now
*threshold,
None, // Recent writes is `threshold` ago to now
Arc::clone(&config.time_provider),
))
}
PartitionsSourceConfig::CatalogColdForWrites { threshold } => {
Arc::new(CatalogToCompactPartitionsSource::new(
config.backoff_config.clone(),
Arc::clone(&config.catalog),
// Cold for writes is `threshold * 3` ago to `threshold` ago
*threshold * 3,
Some(*threshold),
Arc::clone(&config.time_provider),
))
}
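
To make the two windows concrete, here is a sketch (illustrative names, not
from the source) of the lookback bounds each branch above passes to
`CatalogToCompactPartitionsSource::new`, using the CLI defaults of 10 and
60 minutes:

```rust
use std::time::Duration;

/// Returns (how far back to look, optional minimum age) relative to "now".
fn lookback_window(threshold: Duration, cold: bool) -> (Duration, Option<Duration>) {
    if cold {
        // Cold for writes: files created between `threshold * 3` ago and `threshold` ago.
        (threshold * 3, Some(threshold))
    } else {
        // Recent writes: files created between `threshold` ago and now.
        (threshold, None)
    }
}

fn main() {
    // Hot default: last 10 minutes -> (600s, None)
    println!("{:?}", lookback_window(Duration::from_secs(10 * 60), false));
    // Cold default: between 3 hours ago and 1 hour ago -> (10800s, Some(3600s))
    println!("{:?}", lookback_window(Duration::from_secs(60 * 60), true));
}
```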

View File

@@ -22,7 +22,6 @@ pub fn log_config(config: &Config) {
partition_concurrency,
job_concurrency,
partition_scratchpad_concurrency,
partition_threshold,
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,
@@ -68,7 +67,6 @@ pub fn log_config(config: &Config) {
partition_concurrency=partition_concurrency.get(),
job_concurrency=job_concurrency.get(),
partition_scratchpad_concurrency=partition_scratchpad_concurrency.get(),
partition_threshold_secs=partition_threshold.as_secs_f32(),
max_desired_file_size_bytes,
percentage_max_file_size,
split_percentage,

View File

@@ -58,9 +58,6 @@ pub struct Config {
/// Number of jobs PER PARTITION that move files in and out of the scratchpad.
pub partition_scratchpad_concurrency: NonZeroUsize,
/// Partitions with recent created files this recent duration are selected for compaction.
pub partition_threshold: Duration,
/// Desired max size of compacted parquet files
/// It is a target desired value than a guarantee
pub max_desired_file_size_bytes: u64,
@@ -213,8 +210,20 @@ pub struct ShardConfig {
/// Partitions source config.
#[derive(Debug, Clone)]
pub enum PartitionsSourceConfig {
/// Use the catalog to determine which partitions have recently received writes.
CatalogRecentWrites,
/// For "hot" compaction: use the catalog to determine which partitions have recently received
/// writes, defined as having a new Parquet file created within the last `threshold`.
CatalogRecentWrites {
/// The amount of time ago to look for Parquet file creations
threshold: Duration,
},
/// For "cold" compaction: use the catalog to determine which partitions have gone cold for
/// writing and should undergo final compaction, defined as having no new Parquet files created
/// in at least the last `threshold`.
CatalogColdForWrites {
/// The amount of time ago the last Parquet file creation must have happened
threshold: Duration,
},
/// Use all partitions from the catalog.
///
@@ -230,7 +239,12 @@ impl Display for PartitionsSourceConfig {
impl Display for PartitionsSourceConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::CatalogRecentWrites => write!(f, "catalog_recent_writes"),
Self::CatalogRecentWrites { threshold } => {
write!(f, "catalog_recent_writes({threshold:?})")
}
Self::CatalogColdForWrites { threshold } => {
write!(f, "catalog_cold_for_writes({threshold:?})")
}
Self::CatalogAll => write!(f, "catalog_all"),
Self::Fixed(p_ids) => {
let mut p_ids = p_ids.iter().copied().collect::<Vec<_>>();
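
A quick sketch of how the new variants render through the `Display` impl
above (the threshold appears in `Duration`'s `Debug` form inside the
parentheses):

```rust
use std::time::Duration;

use compactor2::config::PartitionsSourceConfig;

fn main() {
    let hot = PartitionsSourceConfig::CatalogRecentWrites {
        threshold: Duration::from_secs(10 * 60),
    };
    let cold = PartitionsSourceConfig::CatalogColdForWrites {
        threshold: Duration::from_secs(60 * 60),
    };
    // Prints "catalog_recent_writes(600s)" and "catalog_cold_for_writes(3600s)".
    println!("{hot}");
    println!("{cold}");
}
```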

View File

@@ -133,12 +133,13 @@ impl TestSetupBuilder<false> {
partition_concurrency: NonZeroUsize::new(1).unwrap(),
job_concurrency: NonZeroUsize::new(1).unwrap(),
partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
partition_threshold: PARTITION_THRESHOLD,
max_desired_file_size_bytes: MAX_DESIRE_FILE_SIZE,
percentage_max_file_size: PERCENTAGE_MAX_FILE_SIZE,
split_percentage: SPLIT_PERCENTAGE,
partition_timeout: Duration::from_secs(3_600),
partitions_source: PartitionsSourceConfig::CatalogRecentWrites,
partitions_source: PartitionsSourceConfig::CatalogRecentWrites {
threshold: PARTITION_THRESHOLD,
},
shadow_mode: false,
ignore_partition_skip_marker: false,
shard_config: None,

View File

@@ -476,10 +476,11 @@ impl Config {
// parameters are redundant with ingester's
let compactor_config = Compactor2Config {
compaction_type: Default::default(),
compaction_partition_minute_threshold: 10,
compaction_cold_partition_minute_threshold: 60,
compaction_partition_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_job_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_partition_scratchpad_concurrency: NonZeroUsize::new(1).unwrap(),
compaction_partition_minute_threshold: 10,
query_exec_thread_count: Some(NonZeroUsize::new(1).unwrap()),
exec_mem_pool_bytes,
max_desired_file_size_bytes: 30_000,

View File

@@ -1,6 +1,6 @@
use async_trait::async_trait;
use backoff::BackoffConfig;
use clap_blocks::compactor2::Compactor2Config;
use clap_blocks::compactor2::{CompactionType, Compactor2Config};
use compactor2::{
compactor::Compactor2,
config::{Config, PartitionsSourceConfig, ShardConfig},
@@ -159,13 +159,23 @@ pub async fn create_compactor2_server_type(
let partitions_source = match (
compactor_config.partition_filter,
compactor_config.process_all_partitions,
compactor_config.compaction_type,
) {
(None, false) => PartitionsSourceConfig::CatalogRecentWrites,
(None, true) => PartitionsSourceConfig::CatalogAll,
(Some(ids), false) => {
(None, false, CompactionType::Hot) => PartitionsSourceConfig::CatalogRecentWrites {
threshold: Duration::from_secs(
compactor_config.compaction_partition_minute_threshold * 60,
),
},
(None, false, CompactionType::Cold) => PartitionsSourceConfig::CatalogColdForWrites {
threshold: Duration::from_secs(
compactor_config.compaction_cold_partition_minute_threshold * 60,
),
},
(None, true, _) => PartitionsSourceConfig::CatalogAll,
(Some(ids), false, _) => {
PartitionsSourceConfig::Fixed(ids.into_iter().map(PartitionId::new).collect())
}
(Some(_), true) => panic!(
(Some(_), true, _) => panic!(
"provided partition ID filter and specific 'process all', this does not make sense"
),
};
@@ -190,9 +200,6 @@ pub async fn create_compactor2_server_type(
job_concurrency: compactor_config.compaction_job_concurrency,
partition_scratchpad_concurrency: compactor_config
.compaction_partition_scratchpad_concurrency,
partition_threshold: Duration::from_secs(
compactor_config.compaction_partition_minute_threshold * 60,
),
max_desired_file_size_bytes: compactor_config.max_desired_file_size_bytes,
percentage_max_file_size: compactor_config.percentage_max_file_size,
split_percentage: compactor_config.split_percentage,