diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs index 5c23f98709..3370d0d2d0 100644 --- a/clap_blocks/src/compactor.rs +++ b/clap_blocks/src/compactor.rs @@ -245,6 +245,17 @@ macro_rules! gen_compactor_config { )] pub max_parallel_partitions: u64, + /// When selecting warm partition candidates, partitions with files (of any level) created + /// within this many hours are considered candidates. However, only partitions with enough + /// contiguous small L1 files will actually be warm compacted. + #[clap( + long = "compaction-warm-partition-candidates-hours-threshold", + env = "INFLUXDB_IOX_COMPACTION_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD", + default_value = "24", + action + )] + pub warm_partition_candidates_hours_threshold: u64, + /// When querying for partitions suitable for warm compaction, this is the /// upper bound on file size to be counted as "small". /// Default is half of max_desired_file_size_bytes's default (see above). @@ -300,6 +311,7 @@ impl CompactorOnceConfig { hot_compaction_hours_threshold_1: self.hot_compaction_hours_threshold_1, hot_compaction_hours_threshold_2: self.hot_compaction_hours_threshold_2, max_parallel_partitions: self.max_parallel_partitions, + warm_partition_candidates_hours_threshold: self.warm_partition_candidates_hours_threshold, warm_compaction_small_size_threshold_bytes: self.warm_compaction_small_size_threshold_bytes, warm_compaction_min_small_file_count: self.warm_compaction_min_small_file_count, } diff --git a/clap_blocks/src/compactor2.rs b/clap_blocks/src/compactor2.rs index 9cef1899dd..ba28a16d98 100644 --- a/clap_blocks/src/compactor2.rs +++ b/clap_blocks/src/compactor2.rs @@ -223,6 +223,17 @@ macro_rules! gen_compactor_config { )] pub max_parallel_partitions: u64, + /// When selecting warm partition candidates, partitions with files (of any level) created + /// within this many hours are considered candidates. However, only partitions with enough + /// contiguous small L1 files will actually be warm compacted. + #[clap( + long = "compaction-warm-partition-candidates-hours-threshold", + env = "INFLUXDB_IOX_COMPACTION_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD", + default_value = "24", + action + )] + pub warm_partition_candidates_hours_threshold: u64, + /// When querying for partitions suitable for warm compaction, this is the /// upper bound on file size to be counted as "small". /// Default is half of max_desired_file_size_bytes's default (see above). @@ -275,6 +286,7 @@ impl Compactor2OnceConfig { hot_compaction_hours_threshold_1: self.hot_compaction_hours_threshold_1, hot_compaction_hours_threshold_2: self.hot_compaction_hours_threshold_2, max_parallel_partitions: self.max_parallel_partitions, + warm_partition_candidates_hours_threshold: self.warm_partition_candidates_hours_threshold, warm_compaction_small_size_threshold_bytes: self.warm_compaction_small_size_threshold_bytes, warm_compaction_min_small_file_count: self.warm_compaction_min_small_file_count, } diff --git a/compactor/src/cold.rs b/compactor/src/cold.rs index 70f61831bd..ada321affb 100644 --- a/compactor/src/cold.rs +++ b/compactor/src/cold.rs @@ -2,7 +2,7 @@ //! fully compacted.
use crate::{ - compact::{Compactor, ShardAssignment}, + compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel, parquet_file_combining, parquet_file_lookup::{self, CompactionType}, utils::get_candidates_with_retry, @@ -20,25 +20,8 @@ pub async fn compact(compactor: Arc<Compactor>, do_full_compact: bool) -> usize // https://github.com/influxdata/influxdb_iox/issues/6518 to remove the use of shard_id and // simplify this - let max_num_partitions = match &compactor.shards { - ShardAssignment::All => { - debug!( - %compaction_type, - max_num_partitions = compactor.config.max_number_partitions_per_shard, - "Compactor2" - ); - compactor.config.max_number_partitions_per_shard - } - ShardAssignment::Only(shards) => { - debug!( - %compaction_type, - num_shards = shards.len(), - max_number_partitions_per_shard = compactor.config.max_number_partitions_per_shard, - "Compactor1" - ); - compactor.config.max_number_partitions_per_shard * shards.len() - } - }; + let max_num_partitions = + compactor.shards.len() * compactor.config.max_number_partitions_per_shard; let cold_partition_candidates_hours_threshold = compactor.config.cold_partition_candidates_hours_threshold; @@ -150,6 +133,7 @@ mod tests { const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; + const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; const MINUTE_WITHOUT_NEW_WRITE_TO_BE_COLD: u64 = 10; const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; @@ -759,6 +743,8 @@ mod tests { hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, + warm_partition_candidates_hours_threshold: + DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD, warm_compaction_small_size_threshold_bytes: 5_000, warm_compaction_min_small_file_count: 10, } diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index d87698e7ee..0ca6d09b49 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -756,6 +756,8 @@ pub mod tests { hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, + warm_partition_candidates_hours_threshold: + DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD, warm_compaction_small_size_threshold_bytes: 5_000, warm_compaction_min_small_file_count: 10, } diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs index cadbbff486..1ff58c59af 100644 --- a/compactor/src/handler.rs +++ b/compactor/src/handler.rs @@ -190,6 +190,11 @@ pub struct CompactorConfig { /// and may also lead to inaccuracy of memory estimation. This number is to cap that. pub max_parallel_partitions: u64, + /// When selecting warm partition candidates, partitions with files (of any level) created + /// within this many hours are considered candidates. However, only partitions with enough + /// contiguous small L1 files will actually be warm compacted. + pub warm_partition_candidates_hours_threshold: u64, + /// Upper bound on file size to be counted as "small" for warm compaction. pub warm_compaction_small_size_threshold_bytes: i64, diff --git a/compactor/src/hot.rs b/compactor/src/hot.rs index 1118eb8f56..15e0f6de14 100644 --- a/compactor/src/hot.rs +++ b/compactor/src/hot.rs @@ -1,10 +1,8 @@ //!
Collect highest hot candidates and compact them use crate::{ - compact::{Compactor, ShardAssignment}, - compact_candidates_with_memory_budget, compact_in_parallel, - parquet_file_lookup::CompactionType, - utils::get_candidates_with_retry, + compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel, + parquet_file_lookup::CompactionType, utils::get_candidates_with_retry, }; use data_types::CompactionLevel; use metric::Attributes; @@ -17,25 +15,8 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize { // https://github.com/influxdata/influxdb_iox/issues/6518 to remove the use of shard_id and // simplify this - let max_num_partitions = match &compactor.shards { - ShardAssignment::All => { - debug!( - %compaction_type, - max_num_partitions = compactor.config.max_number_partitions_per_shard, - "Compactor2" - ); - compactor.config.max_number_partitions_per_shard - } - ShardAssignment::Only(shards) => { - debug!( - %compaction_type, - num_shards = shards.len(), - max_number_partitions_per_shard = compactor.config.max_number_partitions_per_shard, - "Compactor1" - ); - compactor.config.max_number_partitions_per_shard * shards.len() - } - }; + let max_num_partitions = + compactor.shards.len() * compactor.config.max_number_partitions_per_shard; let hour_threshold_1 = compactor.config.hot_compaction_hours_threshold_1; let hour_threshold_2 = compactor.config.hot_compaction_hours_threshold_2; @@ -106,6 +87,7 @@ mod tests { const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; + const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; const DEFAULT_MAX_NUM_PARTITION_CANDIDATES: usize = 10; @@ -462,6 +444,8 @@ mod tests { hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, + warm_partition_candidates_hours_threshold: + DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD, warm_compaction_small_size_threshold_bytes: 5_000, warm_compaction_min_small_file_count: 10, }; diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index 33c86445af..9a3b693172 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -489,6 +489,7 @@ pub mod tests { const DEFAULT_MAX_NUM_PARTITION_CANDIDATES: usize = 100; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; + const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; @@ -613,6 +614,8 @@ pub mod tests { hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, max_parallel_partitions: max_parallel_jobs, + warm_partition_candidates_hours_threshold: + DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD, warm_compaction_small_size_threshold_bytes: 50_000_000, warm_compaction_min_small_file_count: 10, } @@ -945,6 +948,8 @@ pub mod tests { hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, + warm_partition_candidates_hours_threshold: + DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD,
warm_compaction_small_size_threshold_bytes: 5_000, warm_compaction_min_small_file_count: 10, }; diff --git a/compactor/src/parquet_file_lookup.rs b/compactor/src/parquet_file_lookup.rs index 8ab71138bc..6136d253af 100644 --- a/compactor/src/parquet_file_lookup.rs +++ b/compactor/src/parquet_file_lookup.rs @@ -116,19 +116,33 @@ impl ParquetFilesForCompaction { return Ok(None); } + let mut count_small_l1 = 0; for parquet_file in parquet_files { - // For cold compaction, won't proceed if at least one L0 file created after - // minutes_without_new_writes_to_be_cold - if parquet_file.compaction_level == CompactionLevel::Initial - && compaction_type == CompactionType::Cold - && parquet_file.created_at - > Timestamp::from( - compactor - .time_provider - .minutes_ago(minutes_without_new_writes_to_be_cold), - ) - { - return Ok(None); + match compaction_type { + CompactionType::Warm => { + // Count the number of small L1 files + if parquet_file.compaction_level == CompactionLevel::FileNonOverlapped + && parquet_file.file_size_bytes + < compactor.config.warm_compaction_small_size_threshold_bytes + { + count_small_l1 += 1; + } + } + CompactionType::Cold => { + // Won't proceed if at least one L0 file was created within the last + // minutes_without_new_writes_to_be_cold minutes + if parquet_file.compaction_level == CompactionLevel::Initial + && parquet_file.created_at + > Timestamp::from( + compactor + .time_provider + .minutes_ago(minutes_without_new_writes_to_be_cold), + ) + { + return Ok(None); + } + } + CompactionType::Hot => {} } // Estimate the bytes DataFusion needs when scan this file @@ -162,6 +176,22 @@ impl ParquetFilesForCompaction { return Ok(None); } + match compaction_type { + CompactionType::Hot => { + // Only do hot compaction if there are L0 files + if level_0.is_empty() { + return Ok(None); + } + } + CompactionType::Warm => { + // Only do warm compaction if there are enough small L1 files + if count_small_l1 < compactor.config.warm_compaction_min_small_file_count { + return Ok(None); + } + } + CompactionType::Cold => {} + } + level_0.sort_by_key(|pf| pf.created_at()); level_1.sort_by_key(|pf| pf.min_time()); @@ -186,6 +216,7 @@ mod tests { const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; + const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; const DEFAULT_MINUTES_WITHOUT_NEW_WRITES: u64 = 8 * 60; @@ -271,6 +302,8 @@ mod tests { hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, + warm_partition_candidates_hours_threshold: + DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD, warm_compaction_small_size_threshold_bytes: 5_000, warm_compaction_min_small_file_count: 10, } @@ -389,7 +422,7 @@ mod tests { let parquet_files_for_compaction = ParquetFilesForCompaction::for_partition( compactor, partition_with_info, - CompactionType::Hot, + CompactionType::Cold, ) .await .unwrap() diff --git a/compactor/src/warm.rs b/compactor/src/warm.rs index 559ecf1d50..5b0ae5c17b 100644 --- a/compactor/src/warm.rs +++ b/compactor/src/warm.rs @@ -1,14 +1,10 @@ //!
Select partitions with small, adjacent L1 files and compact them use crate::{ - compact::{self, Compactor, ShardAssignment}, - compact_candidates_with_memory_budget, compact_in_parallel, - parquet_file_lookup::CompactionType, - utils::get_candidates_with_retry, - PartitionCompactionCandidateWithInfo, + compact::Compactor, compact_candidates_with_memory_budget, compact_in_parallel, + parquet_file_lookup::CompactionType, utils::get_candidates_with_retry, }; -use data_types::{CompactionLevel, PartitionParam, ShardId}; -use iox_catalog::interface::Catalog; +use data_types::CompactionLevel; use metric::Attributes; use observability_deps::tracing::*; use std::sync::Arc; @@ -17,10 +13,20 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize { let compaction_type = CompactionType::Warm; + // https://github.com/influxdata/influxdb_iox/issues/6518 to remove the use of shard_id and + // simplify this + let max_num_partitions = + compactor.shards.len() * compactor.config.max_number_partitions_per_shard; + + let hours_threshold = compactor.config.warm_partition_candidates_hours_threshold; let candidates = get_candidates_with_retry( Arc::clone(&compactor), compaction_type, - |compactor_for_retry| async move { warm_partitions_to_compact(compactor_for_retry).await }, + move |compactor_for_retry| async move { + compactor_for_retry + .partitions_to_compact(compaction_type, vec![hours_threshold], max_num_partitions) + .await + }, ) .await; @@ -60,402 +66,30 @@ pub async fn compact(compactor: Arc<Compactor>) -> usize { n_candidates } -/// Return a list of partitions that have enough small L1 files that make warm compaction -/// worthwhile. This works by counting the number of L1 files that are smaller than -/// `max_desired_file_size_bytes / 2` -pub(crate) async fn warm_partitions_to_compact( - compactor: Arc<Compactor>, -) -> Result<Vec<PartitionCompactionCandidateWithInfo>, compact::Error> { - let compaction_type = "warm"; - - let max_number_partitions_per_shard = compactor.config.max_number_partitions_per_shard; - let mut candidates = - Vec::with_capacity(compactor.shards.len() * max_number_partitions_per_shard); - - match &compactor.shards { - ShardAssignment::All => { - let mut partitions = warm_partitions_for_shard( - Arc::clone(&compactor.catalog), - None, - compactor.config.warm_compaction_small_size_threshold_bytes, - compactor.config.warm_compaction_min_small_file_count, - max_number_partitions_per_shard, - ) - .await?; - - // Record metric for candidates per shard - let num_partitions = partitions.len(); - debug!(n = num_partitions, compaction_type, "compaction candidates",); - let attributes = Attributes::from([("partition_type", compaction_type.into())]); - let number_gauge = compactor.compaction_candidate_gauge.recorder(attributes); - number_gauge.set(num_partitions as u64); - - candidates.append(&mut partitions); - } - ShardAssignment::Only(shards) => { - for &shard_id in shards { - let mut partitions = warm_partitions_for_shard( - Arc::clone(&compactor.catalog), - Some(shard_id), - compactor.config.warm_compaction_small_size_threshold_bytes, - compactor.config.warm_compaction_min_small_file_count, - max_number_partitions_per_shard, - ) - .await?; - - // Record metric for candidates per shard - let num_partitions = partitions.len(); - debug!( - shard_id = shard_id.get(), - n = num_partitions, - compaction_type, - "compaction candidates", - ); - let attributes = Attributes::from([ - ("shard_id", format!("{}", shard_id).into()), - ("partition_type", compaction_type.into()), - ]); - let number_gauge =
compactor.compaction_candidate_gauge.recorder(attributes); - number_gauge.set(num_partitions as u64); - - candidates.append(&mut partitions); - } - } - } - - // Get extra needed information for selected partitions - let start_time = compactor.time_provider.now(); - - // Column types and their counts of the tables of the partition candidates - debug!( - num_candidates=?candidates.len(), - compaction_type, - "start getting column types for the partition candidates" - ); - let table_columns = compactor.table_columns(&candidates).await?; - - // Add other compaction-needed info into selected partitions - debug!( - num_candidates=?candidates.len(), - compaction_type, - "start getting additional info for the partition candidates" - ); - let candidates = compactor - .add_info_to_partitions(&candidates, &table_columns) - .await?; - - if let Some(delta) = compactor - .time_provider - .now() - .checked_duration_since(start_time) - { - let attributes = Attributes::from(&[("partition_type", compaction_type)]); - let duration = compactor - .partitions_extra_info_reading_duration - .recorder(attributes); - duration.record(delta); - } - - Ok(candidates) -} - -async fn warm_partitions_for_shard( - catalog: Arc<dyn Catalog>, - shard_id: Option<ShardId>, - // Upper bound on file size to be counted as "small" for warm compaction. - small_size_threshold_bytes: i64, - // Minimum number of small files a partition must have to be selected as a candidate for warm - // compaction. - min_small_file_count: usize, - // Max number of partitions per shard we want to read - max_number_partitions_per_shard: usize, -) -> Result<Vec<PartitionParam>, compact::Error> { - let mut repos = catalog.repositories().await; - - let partitions = repos - .parquet_files() - .partitions_with_small_l1_file_count( - shard_id, - small_size_threshold_bytes, - min_small_file_count, - max_number_partitions_per_shard, - ) - .await - .map_err(|e| compact::Error::PartitionsWithSmallL1Files { - shard_id, - source: e, - })?; - if !partitions.is_empty() { - debug!( - ?shard_id, - small_size_threshold_bytes, - min_small_file_count, - n = partitions.len(), - "found some partitions with small L1 files" - ); - return Ok(partitions); - } - - Ok(Vec::new()) -} - #[cfg(test)] mod tests { use super::*; - use crate::{compact::Compactor, handler::CompactorConfig}; + use crate::{ + compact::{Compactor, ShardAssignment}, + handler::CompactorConfig, + }; use arrow_util::assert_batches_sorted_eq; use backoff::BackoffConfig; use data_types::{ColumnType, CompactionLevel, TablePartition, Timestamp}; use futures::{stream::FuturesUnordered, StreamExt}; - use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable}; + use iox_tests::util::{TestCatalog, TestParquetFileBuilder}; use iox_time::{SystemProvider, Time, TimeProvider}; use parquet_file::storage::{ParquetStorage, StorageId}; use std::sync::Arc; - struct TestSetup { - catalog: Arc<TestCatalog>, - shard1: Arc<TestShard>, - table1: Arc<TestTable>, - shard2: Arc<TestShard>, - } - - async fn test_setup() -> TestSetup { - let catalog = TestCatalog::new(); - let namespace = catalog - .create_namespace_1hr_retention("namespace_warm_compaction") - .await; - let shard1 = namespace.create_shard(1).await; - let table1 = namespace.create_table("test_table1").await; - let shard2 = namespace.create_shard(2).await; - - TestSetup { - catalog, - shard1, - table1, - shard2, - } - } - - #[tokio::test] - async fn no_partitions_no_candidates() { - let TestSetup { - catalog, shard1, ..
- } = test_setup().await; - let candidates = warm_partitions_for_shard( - Arc::clone(&catalog.catalog), - Some(shard1.shard.id), - 100, - 10, - 1, - ) - .await - .unwrap(); - assert!(candidates.is_empty()); - } - - #[tokio::test] - async fn no_files_no_candidates() { - let TestSetup { - catalog, - shard1, - table1, - .. - } = test_setup().await; - table1.with_shard(&shard1).create_partition("one").await; - let candidates = warm_partitions_for_shard( - Arc::clone(&catalog.catalog), - Some(shard1.shard.id), - 100, - 10, - 1, - ) - .await - .unwrap(); - assert!(candidates.is_empty()); - } - - #[tokio::test] - async fn l0_not_returned() { - let TestSetup { - catalog, - shard1, - table1, - .. - } = test_setup().await; - let partition1 = table1.with_shard(&shard1).create_partition("one").await; - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::Initial) - .with_file_size_bytes(100); - partition1.create_parquet_file_catalog_record(builder).await; - assert_eq!(catalog.count_level_0_files(shard1.shard.id).await, 1); - let candidates = warm_partitions_for_shard( - Arc::clone(&catalog.catalog), - Some(shard1.shard.id), - 200, // anything bigger than our file will work - 1, - 1, - ) - .await - .unwrap(); - assert!(candidates.is_empty()); - } - - #[tokio::test] - async fn l2_not_returned() { - let TestSetup { - catalog, - shard1, - table1, - .. - } = test_setup().await; - let partition1 = table1.with_shard(&shard1).create_partition("one").await; - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::Final) - .with_file_size_bytes(100); - partition1.create_parquet_file_catalog_record(builder).await; - let candidates = warm_partitions_for_shard( - Arc::clone(&catalog.catalog), - Some(shard1.shard.id), - 200, // anything bigger than our file will work - 1, - 1, - ) - .await - .unwrap(); - assert!(candidates.is_empty()); - } - - #[tokio::test] - async fn only_l1_returned() { - let TestSetup { - catalog, - shard1, - table1, - .. - } = test_setup().await; - let partition1 = table1.with_shard(&shard1).create_partition("one").await; - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - partition1.create_parquet_file_catalog_record(builder).await; - let candidates = warm_partitions_for_shard( - Arc::clone(&catalog.catalog), - Some(shard1.shard.id), - 200, // anything bigger than our file will work - 1, - 1, - ) - .await - .unwrap(); - assert!(!candidates.is_empty()); - } - - #[tokio::test] - async fn below_min_count_not_returned() { - let TestSetup { - catalog, - shard1, - table1, - .. - } = test_setup().await; - let partition1 = table1.with_shard(&shard1).create_partition("one").await; - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - partition1.create_parquet_file_catalog_record(builder).await; - let candidates = warm_partitions_for_shard( - Arc::clone(&catalog.catalog), - Some(shard1.shard.id), - 200, - 2, // min limit is more than we have - 1, - ) - .await - .unwrap(); - assert!(candidates.is_empty()); - } - - #[tokio::test] - async fn over_size_threshold_not_returned() { - let TestSetup { - catalog, - shard1, - table1, - .. 
- } = test_setup().await; - let partition1 = table1.with_shard(&shard1).create_partition("one").await; - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(200); - partition1.create_parquet_file_catalog_record(builder).await; - let candidates = warm_partitions_for_shard( - Arc::clone(&catalog.catalog), - Some(shard1.shard.id), - 100, // note, smaller than our file - 1, - 1, - ) - .await - .unwrap(); - assert!(candidates.is_empty()); - } - - #[tokio::test] - async fn mixed_partition_but_warm_and_returned() { - let TestSetup { - catalog, - shard1, - table1, - .. - } = test_setup().await; - - // create a partition that meets the criteria for being warm - let partition1 = table1.with_shard(&shard1).create_partition("one").await; - // create three small files - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - partition1 - .create_parquet_file_catalog_record(builder.clone()) - .await; - partition1 - .create_parquet_file_catalog_record(builder.clone()) - .await; - partition1.create_parquet_file_catalog_record(builder).await; - // create one file that's too big - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(1000); - partition1.create_parquet_file_catalog_record(builder).await; - // create two files that aren't L1 - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::Initial) - .with_file_size_bytes(200); - partition1.create_parquet_file_catalog_record(builder).await; - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::Final) - .with_file_size_bytes(200); - partition1.create_parquet_file_catalog_record(builder).await; - // create two more small files, for a total of five now - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - partition1 - .create_parquet_file_catalog_record(builder.clone()) - .await; - partition1.create_parquet_file_catalog_record(builder).await; - let candidates = warm_partitions_for_shard( - Arc::clone(&catalog.catalog), - Some(shard1.shard.id), - 500, - // recall that we created 5 small files above - 5, - 10, - ) - .await - .unwrap(); - assert_eq!(candidates.len(), 1); - } + const DEFAULT_MAX_NUM_PARTITION_CANDIDATES: usize = 100; + const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; + const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; + const DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; + const DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD: u64 = 24; + const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; + const DEFAULT_MINUTES_WITHOUT_NEW_WRITES: u64 = 8 * 60; + const DEFAULT_MIN_ROWS_ALLOCATED: u64 = 100; fn make_compactor_config( max_desired_file_size_bytes: u64, @@ -467,185 +101,27 @@ mod tests { max_desired_file_size_bytes, percentage_max_file_size, split_percentage: 80, - max_number_partitions_per_shard: 1, + max_number_partitions_per_shard: DEFAULT_MAX_NUM_PARTITION_CANDIDATES, min_number_recent_ingested_files_per_partition: 1, hot_multiple: 4, warm_multiple: 1, memory_budget_bytes: 10 * 1024 * 1024, - min_num_rows_allocated_per_record_batch_to_datafusion_plan: 100, + min_num_rows_allocated_per_record_batch_to_datafusion_plan: DEFAULT_MIN_ROWS_ALLOCATED, max_num_compacting_files: 20, 
max_num_compacting_files_first_in_partition: 40, - minutes_without_new_writes_to_be_cold: 10, - cold_partition_candidates_hours_threshold: 24, - hot_compaction_hours_threshold_1: 4, - hot_compaction_hours_threshold_2: 24, - max_parallel_partitions: 20, + minutes_without_new_writes_to_be_cold: DEFAULT_MINUTES_WITHOUT_NEW_WRITES, + cold_partition_candidates_hours_threshold: + DEFAULT_COLD_PARTITION_CANDIDATES_HOURS_THRESHOLD, + hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, + hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, + max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, + warm_partition_candidates_hours_threshold: + DEFAULT_WARM_PARTITION_CANDIDATES_HOURS_THRESHOLD, warm_compaction_small_size_threshold_bytes, warm_compaction_min_small_file_count, } } - #[tokio::test] - async fn test_warm_partitions_to_compact() { - let TestSetup { - catalog, - shard1, - table1, - shard2, - } = test_setup().await; - - // Shard 1: 7 initally empty partitions - let partition1 = table1.with_shard(&shard1).create_partition("waḥid").await; - let partition2 = table1.with_shard(&shard1).create_partition("ʾiṯnān").await; - let partition3 = table1.with_shard(&shard1).create_partition("ṯalāṯah").await; - let partition4 = table1 - .with_shard(&shard1) - .create_partition("ʾarbaʿah") - .await; - let partition5 = table1.with_shard(&shard1).create_partition("ḫamsah").await; - let partition6 = table1.with_shard(&shard1).create_partition("sittah").await; - let partition7 = table1 - .with_shard(&shard2) - .create_partition_with_sort_key("sabʿah", &["tag1", "time"]) - .await; - - // Create a compactor - let time_provider = Arc::clone(&catalog.time_provider); - let config = make_compactor_config(10_000, 5_000, 10, 30); - let compactor = Arc::new(Compactor::new( - ShardAssignment::Only(vec![shard1.shard.id, shard2.shard.id]), - Arc::clone(&catalog.catalog), - ParquetStorage::new(Arc::clone(&catalog.object_store), StorageId::from("iox")), - catalog.exec(), - time_provider, - BackoffConfig::default(), - config, - Arc::new(metric::Registry::new()), - )); - - // This test is an integration test that covers the priority of the candidate selection - // algorithm when there are many files of different kinds across many partitions. - // Warm compaction candidate selection doesn't care about creation time. - - // partition1 has a deleted L1, isn't returned - let builder = TestParquetFileBuilder::default() - .with_to_delete(true) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - let _pf1 = partition1.create_parquet_file_catalog_record(builder).await; - - // partition2 has a non-L1 file, isn't returned - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::Initial) - .with_file_size_bytes(100); - let _pf2 = partition2.create_parquet_file_catalog_record(builder).await; - - // partition3 has too few small L1 files (min. 
is 10), isn't returned - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - for _n in 1..=9 { - let _pf3 = partition3 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - - // partition4 has plenty of files but not enough small ones, isn't returned - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - for _n in 1..=9 { - let _pf4 = partition4 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(10000); // over the threshold of 5000 - for _n in 1..=2 { - let _pf4 = partition4 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - - // partition5 has plenty of files but not enough small L1s, isn't returned - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - for _n in 1..=9 { - let _pf5 = partition5 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::Initial) - .with_file_size_bytes(100); - for _n in 1..=2 { - let _pf5 = partition5 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - - // partition6 has many small L1 files, is returned - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - for _n in 1..=10 { - let _pf6 = partition6 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - - // partition7 has many small L1 files, plus some other things, is returned - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_file_size_bytes(100); - for _n in 1..=10 { - let _pf7 = partition7 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::Initial) - .with_file_size_bytes(100); - for _n in 1..=2 { - let _pf7 = partition7 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::Final) - .with_file_size_bytes(100); - for _n in 1..=2 { - let _pf7 = partition7 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - let builder = TestParquetFileBuilder::default() - .with_compaction_level(CompactionLevel::Final) - .with_file_size_bytes(10000); - for _n in 1..=2 { - let _pf7 = partition7 - .create_parquet_file_catalog_record(builder.clone()) - .await; - } - - // Will have 2 candidates, one for each shard - let mut candidates = warm_partitions_to_compact(Arc::clone(&compactor)) - .await - .unwrap(); - candidates.sort_by_key(|c| c.candidate); - assert_eq!(candidates.len(), 2); - - assert_eq!(candidates[0].id(), partition6.partition.id); - // this sort key is None - assert_eq!(candidates[0].sort_key, partition6.partition.sort_key()); - - assert_eq!(candidates[1].id(), partition7.partition.id); - // this sort key is Some(tag1, time) - assert_eq!(candidates[1].sort_key, partition7.partition.sort_key()); - } - struct TestFileToCreate { lp: String, max_seq: i64, @@ -769,7 +245,11 @@ mod tests { // there will only be 1 because all files in the same 
partition let compactor = Arc::new(compactor); - let partition_candidates = warm_partitions_to_compact(Arc::clone(&compactor)) + let compaction_type = CompactionType::Warm; + let hour_threshold = compactor.config.warm_partition_candidates_hours_threshold; + let max_num_partitions = compactor.config.max_number_partitions_per_shard; + let partition_candidates = compactor + .partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions) .await .unwrap(); assert_eq!(partition_candidates.len(), 1); @@ -967,7 +447,11 @@ mod tests { // there will only be 1 because all files in the same partition let compactor = Arc::new(compactor); - let partition_candidates = warm_partitions_to_compact(Arc::clone(&compactor)) + let compaction_type = CompactionType::Warm; + let hour_threshold = compactor.config.warm_partition_candidates_hours_threshold; + let max_num_partitions = compactor.config.max_number_partitions_per_shard; + let partition_candidates = compactor + .partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions) .await .unwrap(); assert_eq!(partition_candidates.len(), 1); @@ -1160,7 +644,11 @@ mod tests { // there will only be 1 because all files in the same partition let compactor = Arc::new(compactor); - let partition_candidates = warm_partitions_to_compact(Arc::clone(&compactor)) + let compaction_type = CompactionType::Warm; + let hour_threshold = compactor.config.warm_partition_candidates_hours_threshold; + let max_num_partitions = compactor.config.max_number_partitions_per_shard; + let partition_candidates = compactor + .partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions) .await .unwrap(); assert_eq!(partition_candidates.len(), 1); @@ -1340,7 +828,11 @@ mod tests { // there will only be 1 because all files in the same partition let compactor = Arc::new(compactor); - let partition_candidates = warm_partitions_to_compact(Arc::clone(&compactor)) + let compaction_type = CompactionType::Warm; + let hour_threshold = compactor.config.warm_partition_candidates_hours_threshold; + let max_num_partitions = compactor.config.max_number_partitions_per_shard; + let partition_candidates = compactor + .partitions_to_compact(compaction_type, vec![hour_threshold], max_num_partitions) .await .unwrap(); assert_eq!(partition_candidates.len(), 1); diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 13c98ef80f..f71f9649e1 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -461,6 +461,7 @@ impl Config { hot_compaction_hours_threshold_1: 4, hot_compaction_hours_threshold_2: 24, max_parallel_partitions: 20, + warm_partition_candidates_hours_threshold: 24, warm_compaction_small_size_threshold_bytes: 15_000, warm_compaction_min_small_file_count: 10, }; diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index 6983ae2bdf..ffa67aa945 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -213,6 +213,7 @@ pub async fn build_compactor2_from_config( hot_compaction_hours_threshold_1, hot_compaction_hours_threshold_2, max_parallel_partitions, + warm_partition_candidates_hours_threshold, warm_compaction_small_size_threshold_bytes, warm_compaction_min_small_file_count, } = compactor_config; @@ -234,6 +235,7 @@ pub async fn build_compactor2_from_config( hot_compaction_hours_threshold_1, hot_compaction_hours_threshold_2, max_parallel_partitions, + warm_partition_candidates_hours_threshold, 
warm_compaction_small_size_threshold_bytes, warm_compaction_min_small_file_count, }; @@ -358,6 +360,7 @@ pub async fn build_compactor_from_config( hot_compaction_hours_threshold_1, hot_compaction_hours_threshold_2, max_parallel_partitions, + warm_partition_candidates_hours_threshold, warm_compaction_small_size_threshold_bytes, warm_compaction_min_small_file_count, .. @@ -380,6 +383,7 @@ pub async fn build_compactor_from_config( hot_compaction_hours_threshold_1, hot_compaction_hours_threshold_2, max_parallel_partitions, + warm_partition_candidates_hours_threshold, warm_compaction_small_size_threshold_bytes, warm_compaction_min_small_file_count, };
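For reviewers: the heart of this change is the warm gate that `ParquetFilesForCompaction::for_partition` now applies inline — count the partition's L1 files smaller than `warm_compaction_small_size_threshold_bytes`, and bail out unless at least `warm_compaction_min_small_file_count` of them exist. A minimal, self-contained sketch of that rule, using hypothetical stand-in types (`Level`, `File`, `warm_eligible`) rather than the crate's `CompactionLevel`/`ParquetFile`:

```rust
// Stand-in for CompactionLevel; only L1 (FileNonOverlapped) matters here.
#[derive(Clone, Copy, PartialEq)]
#[allow(dead_code)]
enum Level {
    L0,
    L1,
    L2,
}

struct File {
    level: Level,
    size_bytes: i64,
}

/// Warm eligibility: at least `min_small_file_count` L1 files smaller than
/// `small_size_threshold_bytes`.
fn warm_eligible(
    files: &[File],
    small_size_threshold_bytes: i64,
    min_small_file_count: usize,
) -> bool {
    let count_small_l1 = files
        .iter()
        .filter(|f| f.level == Level::L1 && f.size_bytes < small_size_threshold_bytes)
        .count();
    count_small_l1 >= min_small_file_count
}

fn main() {
    let files = vec![
        File { level: Level::L1, size_bytes: 1_000 },
        File { level: Level::L1, size_bytes: 2_000 },
        File { level: Level::L0, size_bytes: 1_000 }, // L0s never count toward the minimum
        File { level: Level::L1, size_bytes: 9_000 }, // an L1, but too large to be "small"
    ];
    // Two small L1 files: eligible when the minimum is 2, not when it is 3.
    assert!(warm_eligible(&files, 5_000, 2));
    assert!(!warm_eligible(&files, 5_000, 3));
}
```

The asserts mirror the updated tests above: non-L1 files and oversized L1 files are ignored, so a partition full of activity can still be skipped by the warm path.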
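Candidate selection itself now flows through the same `partitions_to_compact` path as hot and cold, parameterized by the new `warm_partition_candidates_hours_threshold`: per the config docs, a partition qualifies when any of its files (at any level) was created within that many hours, capped at `shards.len() * max_number_partitions_per_shard`. A sketch of that filter under assumed, simplified types — `PartitionCandidate` and `recent_candidates` are illustrative only; the real implementation is a catalog query:

```rust
use std::time::{Duration, SystemTime};

struct PartitionCandidate {
    id: i64,
    // Creation time of the partition's most recently created file.
    most_recent_file_created_at: SystemTime,
}

/// Keep partitions with file activity within the last `hours_threshold`
/// hours, up to `max_num_partitions` of them.
fn recent_candidates(
    partitions: &[PartitionCandidate],
    hours_threshold: u64,
    max_num_partitions: usize,
) -> Vec<i64> {
    let cutoff = SystemTime::now() - Duration::from_secs(hours_threshold * 60 * 60);
    partitions
        .iter()
        .filter(|p| p.most_recent_file_created_at > cutoff)
        .take(max_num_partitions)
        .map(|p| p.id)
        .collect()
}

fn main() {
    let now = SystemTime::now();
    let partitions = vec![
        PartitionCandidate { id: 1, most_recent_file_created_at: now - Duration::from_secs(60 * 60) },
        PartitionCandidate { id: 2, most_recent_file_created_at: now - Duration::from_secs(48 * 60 * 60) },
    ];
    // With the default 24h threshold, only partition 1 has recent-enough files.
    assert_eq!(recent_candidates(&partitions, 24, 100), vec![1]);
}
```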