From 77cbc880f636391b6e15762aa00a958a4790fa14 Mon Sep 17 00:00:00 2001 From: Nga Tran Date: Thu, 1 Dec 2022 16:23:44 -0500 Subject: [PATCH] feat: Add cap limit on number of partitions to be compacted in parallel (#6305) * feat: Add cap limit on number of partitions to be compacted in parallel * chore: cleanup * chore: clearer comments --- clap_blocks/src/compactor.rs | 13 + compactor/src/cold.rs | 2 + compactor/src/compact.rs | 2 + compactor/src/handler.rs | 6 + compactor/src/hot.rs | 2 + compactor/src/lib.rs | 445 ++++++++++++-------- influxdb_iox/src/commands/run/all_in_one.rs | 1 + ioxd_compactor/src/lib.rs | 2 + 8 files changed, 303 insertions(+), 170 deletions(-) diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs index 7732c6b785..78eed150c8 100644 --- a/clap_blocks/src/compactor.rs +++ b/clap_blocks/src/compactor.rs @@ -208,6 +208,18 @@ macro_rules! gen_compactor_config { action )] pub hot_compaction_hours_threshold_2: u64, + + /// Max number of partitions that can be compacted in parallel at once. + /// We use the memory budget to estimate how many partitions can be compacted in parallel at once. + /// However, we do not want that number to be too large: it would cause high CPU usage + /// and may also make the memory estimate less accurate. This setting caps that number. + #[clap( + long = "compaction-max-parallel-partitions", + env = "INFLUXDB_IOX_COMPACTION_MAX_PARALLEL_PARTITIONS", + default_value = "20", + action + )] + pub max_parallel_partitions: u64, } }; } @@ -239,6 +251,7 @@ impl CompactorOnceConfig { minutes_without_new_writes_to_be_cold: self.minutes_without_new_writes_to_be_cold, hot_compaction_hours_threshold_1: self.hot_compaction_hours_threshold_1, hot_compaction_hours_threshold_2: self.hot_compaction_hours_threshold_2, + max_parallel_partitions: self.max_parallel_partitions, } } } diff --git a/compactor/src/cold.rs b/compactor/src/cold.rs index 7191e08e6e..30b1a5e47e 100644 --- a/compactor/src/cold.rs +++ b/compactor/src/cold.rs @@ -114,6 +114,7 @@ mod tests { const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; + const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; #[tokio::test] async fn test_compact_remaining_level_0_files_many_files() { @@ -710,6 +711,7 @@ mod tests { minutes_without_new_writes_to_be_cold: 10, hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, + max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, } } diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 5fb9d49245..cdad6bbe78 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -562,6 +562,7 @@ pub mod tests { const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; + const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; impl PartitionCompactionCandidateWithInfo { pub(crate) async fn from_test_partition(test_partition: &TestPartition) -> Self { @@ -697,6 +698,7 @@ pub mod tests { minutes_without_new_writes_to_be_cold: 10, hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, + max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, } } diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs index 2d5b4f1cc3..e51940ee59 100644 --- a/compactor/src/handler.rs +++ b/compactor/src/handler.rs @@ -173,6 +173,12 @@ 
pub struct CompactorConfig { /// When querying for partitions with data for hot compaction, how many hours to look /// back for a second pass if we found nothing in the first pass. pub hot_compaction_hours_threshold_2: u64, + + /// Max number of partitions that can be compacted in parallel at once. + /// We use the memory budget to estimate how many partitions can be compacted in parallel at once. + /// However, we do not want that number to be too large: it would cause high CPU usage + /// and may also make the memory estimate less accurate. This setting caps that number. + pub max_parallel_partitions: u64, } /// How long to pause before checking for more work again if there was diff --git a/compactor/src/hot.rs b/compactor/src/hot.rs index af4cfbfd4a..447a841e58 100644 --- a/compactor/src/hot.rs +++ b/compactor/src/hot.rs @@ -224,6 +224,7 @@ mod tests { const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; + const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; struct TestSetup { catalog: Arc<TestCatalog>, @@ -544,6 +545,7 @@ mod tests { minutes_without_new_writes_to_be_cold: 10, hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, + max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, }; let compactor = Arc::new(Compactor::new( vec![shard1.shard.id, shard2.shard.id], diff --git a/compactor/src/lib.rs b/compactor/src/lib.rs index b867b3ce87..d44de50bf2 100644 --- a/compactor/src/lib.rs +++ b/compactor/src/lib.rs @@ -238,18 +238,22 @@ async fn compact_candidates_with_memory_budget( } // -------------------------------------------------------------------- - // 4. Almost hitting max budget (only 10% left) - // OR no more candidates - // OR already considered all remaining candidates. + // 4. Compact the candidates collected in parallel_compacting_candidates if one of these conditions is met: + // . the candidates in parallel_compacting_candidates consume almost all the budget + // . there are no more candidates + // . all remaining candidates have already been considered + // . 
hit the max number of partitions to compact in parallel if (!parallel_compacting_candidates.is_empty()) && ((remaining_budget_bytes <= (compactor.config.memory_budget_bytes / 10) as u64) || (candidates.is_empty()) - || (count == num_remaining_candidates)) + || (count == num_remaining_candidates) + || (count as u64 == compactor.config.max_parallel_partitions)) { debug!( num_parallel_compacting_candidates = parallel_compacting_candidates.len(), total_needed_memory_budget_bytes = compactor.config.memory_budget_bytes - remaining_budget_bytes, + config_max_parallel_partitions = compactor.config.max_parallel_partitions, compaction_type, "parallel compacting candidate" ); @@ -452,7 +456,9 @@ pub mod tests { use arrow_util::assert_batches_sorted_eq; use backoff::BackoffConfig; use data_types::{ColumnType, CompactionLevel, ParquetFileId}; - use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestShard, TestTable}; + use iox_tests::util::{ + TestCatalog, TestParquetFileBuilder, TestPartition, TestShard, TestTable, + }; use iox_time::{SystemProvider, TimeProvider}; use std::{ collections::{HashMap, VecDeque}, @@ -462,6 +468,7 @@ pub mod tests { const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1: u64 = 4; const DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2: u64 = 24; + const DEFAULT_MAX_PARALLEL_PARTITIONS: u64 = 20; // In tests that are verifying successful compaction not affected by the memory budget, this // converts a `parquet_file_filtering::FilteredFiles` that has a `filter_result` of @@ -500,7 +507,7 @@ pub mod tests { compactor, mock_compactor, .. - } = test_setup(14350).await; + } = test_setup(14350, 20).await; let sorted_candidates = VecDeque::new(); @@ -563,7 +570,7 @@ pub mod tests { } } - fn make_compactor_config(budget: u64) -> CompactorConfig { + fn make_compactor_config(budget: u64, max_parallel_jobs: u64) -> CompactorConfig { // All numbers in here are chosen carefully for many tests. // Change them will break the tests CompactorConfig { @@ -580,6 +587,7 @@ pub mod tests { minutes_without_new_writes_to_be_cold: 10, hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, + max_parallel_partitions: max_parallel_jobs, } } @@ -591,10 +599,10 @@ pub mod tests { } pub(crate) async fn test_setup_with_default_budget() -> TestSetup { - test_setup(14350).await + test_setup(14350, 20).await } - pub(crate) async fn test_setup(budget: u64) -> TestSetup { + pub(crate) async fn test_setup(budget: u64, max_parallel_jobs: u64) -> TestSetup { let catalog = TestCatalog::new(); let namespace = catalog .create_namespace_1hr_retention("namespace_hot_partitions_to_compact") @@ -617,7 +625,7 @@ pub mod tests { // Create a compactor let time_provider = Arc::new(SystemProvider::new()); - let config = make_compactor_config(budget); + let config = make_compactor_config(budget, max_parallel_jobs); let compactor = Arc::new(Compactor::new( vec![shard.shard.id], Arc::clone(&catalog.catalog), @@ -640,161 +648,11 @@ pub mod tests { } #[tokio::test] - async fn test_hot_compact_candidates_with_memory_budget() { + async fn test_hot_compact_candidates_with_limit_memory_budget() { test_helpers::maybe_start_logging(); - let TestSetup { - compactor, - mock_compactor, - shard, - table, - .. - } = test_setup(14350).await; - - // Some times in the past to set to created_at of the files - let hot_time_one_hour_ago = compactor.time_provider.hours_ago(1); - - // P1: - // L0 2 rows. bytes: 2,250 - // L1 2 rows. 
bytes: 2,250 - // total = 2,250 + 2,250 = 4,500 - let partition1 = table.with_shard(&shard).create_partition("one").await; - - // 2 files with IDs 1 and 2 - let pf1_1 = TestParquetFileBuilder::default() - .with_min_time(1) - .with_max_time(5) - .with_row_count(2) - .with_compaction_level(CompactionLevel::Initial) - .with_creation_time(hot_time_one_hour_ago); - partition1.create_parquet_file_catalog_record(pf1_1).await; - - let pf1_2 = TestParquetFileBuilder::default() - .with_min_time(4) // overlapped with pf1_1 - .with_max_time(6) - .with_row_count(2) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_creation_time(hot_time_one_hour_ago); - partition1.create_parquet_file_catalog_record(pf1_2).await; - - // P2: - // L0 2 rows. bytes: 2,250 - // L1 2 rows. bytes: 2,250 - // total = 2,250 + 2,250 = 4,500 - let partition2 = table.with_shard(&shard).create_partition("two").await; - - // 2 files with IDs 3 and 4 - let pf2_1 = TestParquetFileBuilder::default() - .with_min_time(1) - .with_max_time(5) - .with_row_count(2) - .with_compaction_level(CompactionLevel::Initial) - .with_creation_time(hot_time_one_hour_ago); - partition2.create_parquet_file_catalog_record(pf2_1).await; - - let pf2_2 = TestParquetFileBuilder::default() - .with_min_time(4) // overlapped with pf2_1 - .with_max_time(6) - .with_row_count(2) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_creation_time(hot_time_one_hour_ago); - partition2.create_parquet_file_catalog_record(pf2_2).await; - - // P3: bytes >= 90% of full budget = 90% * 14,350 = 12,915 - // L0 40 rows. bytes: 2,250 - // Five L1s. bytes: 2,250 each - // total = 2,250 * 6 = 13,500 - let partition3 = table.with_shard(&shard).create_partition("three").await; - - // 6 files with IDs 5, 6, 7, 8, 9, 10 - let pf3_1 = TestParquetFileBuilder::default() - .with_min_time(1) - .with_max_time(6) - .with_row_count(40) - .with_compaction_level(CompactionLevel::Initial) - .with_creation_time(hot_time_one_hour_ago); - partition3.create_parquet_file_catalog_record(pf3_1).await; - - // Five overlapped L1 files - for i in 1..6 { - let pf3_i = TestParquetFileBuilder::default() - .with_min_time(i) // overlapped with pf3_1 - .with_max_time(i) - .with_row_count(24) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_creation_time(hot_time_one_hour_ago); - partition3.create_parquet_file_catalog_record(pf3_i).await; - } - - // P4: Over the full budget - // L0 40 rows. bytes: 2,250 - // Six L1s. bytes: 2,250 each - // total = 2,250 * 7 = 15,750 > 14350 - let partition4 = table.with_shard(&shard).create_partition("four").await; - - // 7 files with IDs 11, 12, 13, 14, 15, 16, 17 - let pf4_1 = TestParquetFileBuilder::default() - .with_min_time(1) - .with_max_time(7) - .with_row_count(70) - .with_compaction_level(CompactionLevel::Initial) - .with_creation_time(hot_time_one_hour_ago); - partition4.create_parquet_file_catalog_record(pf4_1).await; - - // Six overlapped L1 files - for i in 1..7 { - let pf4_i = TestParquetFileBuilder::default() - .with_min_time(i) // overlapped with pf4_1 - .with_max_time(i) - .with_row_count(40) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_creation_time(hot_time_one_hour_ago); - partition4.create_parquet_file_catalog_record(pf4_i).await; - } - - // P5: - // L0 2 rows. bytes: 2,250 - // L1 2 rows. 
bytes: 2,250 - // total = 2,250 + 2,250 = 4,500 - let partition5 = table.with_shard(&shard).create_partition("five").await; - // 2 files with IDs 18, 19 - let pf5_1 = TestParquetFileBuilder::default() - .with_min_time(1) - .with_max_time(5) - .with_row_count(2) - .with_compaction_level(CompactionLevel::Initial) - .with_creation_time(hot_time_one_hour_ago); - partition5.create_parquet_file_catalog_record(pf5_1).await; - - let pf5_2 = TestParquetFileBuilder::default() - .with_min_time(4) // overlapped with pf5_1 - .with_max_time(6) - .with_row_count(2) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_creation_time(hot_time_one_hour_ago); - partition5.create_parquet_file_catalog_record(pf5_2).await; - - // P6: - // L0 2 rows. bytes: 2,250 - // L1 2 rows. bytes: 2,250 - // total = 2,250 + 2,250 = 4,500 - let partition6 = table.with_shard(&shard).create_partition("six").await; - // 2 files with IDs 20, 21 - let pf6_1 = TestParquetFileBuilder::default() - .with_min_time(1) - .with_max_time(5) - .with_row_count(2) - .with_compaction_level(CompactionLevel::Initial) - .with_creation_time(hot_time_one_hour_ago); - partition6.create_parquet_file_catalog_record(pf6_1).await; - - let pf6_2 = TestParquetFileBuilder::default() - .with_min_time(4) // overlapped with pf6_1 - .with_max_time(6) - .with_row_count(2) - .with_compaction_level(CompactionLevel::FileNonOverlapped) - .with_creation_time(hot_time_one_hour_ago); - partition6.create_parquet_file_catalog_record(pf6_2).await; + // test setup with limit memory budget, 14350, and very large (aka unlimited in this test) max_parallel_jobs, 200 + let (compactor, mock_compactor, partitions) = make_6_partitions(14350, 200).await; // partition candidates: partitions with L0 and overlapped L1 let mut candidates = hot::hot_partitions_to_compact(Arc::clone(&compactor)) @@ -838,19 +696,19 @@ pub mod tests { assert_eq!(group1.len(), 3); let g1_candidate1 = &group1[0]; - assert_eq!(g1_candidate1.partition.id(), partition1.partition.id); + assert_eq!(g1_candidate1.partition.id(), partitions[0].partition.id); let g1_candidate1_pf_ids: Vec<_> = g1_candidate1.files.iter().map(|pf| pf.id().get()).collect(); assert_eq!(g1_candidate1_pf_ids, vec![2, 1]); let g1_candidate2 = &group1[1]; - assert_eq!(g1_candidate2.partition.id(), partition2.partition.id); + assert_eq!(g1_candidate2.partition.id(), partitions[1].partition.id); let g1_candidate2_pf_ids: Vec<_> = g1_candidate2.files.iter().map(|pf| pf.id().get()).collect(); assert_eq!(g1_candidate2_pf_ids, vec![4, 3]); let g1_candidate3 = &group1[2]; - assert_eq!(g1_candidate3.partition.id(), partition5.partition.id); + assert_eq!(g1_candidate3.partition.id(), partitions[4].partition.id); let g1_candidate3_pf_ids: Vec<_> = g1_candidate3.files.iter().map(|pf| pf.id().get()).collect(); assert_eq!(g1_candidate3_pf_ids, vec![19, 18]); @@ -860,7 +718,7 @@ pub mod tests { assert_eq!(group2.len(), 1); let g2_candidate1 = &group2[0]; - assert_eq!(g2_candidate1.partition.id(), partition6.partition.id); + assert_eq!(g2_candidate1.partition.id(), partitions[5].partition.id); let g2_candidate1_pf_ids: Vec<_> = g2_candidate1.files.iter().map(|pf| pf.id().get()).collect(); assert_eq!(g2_candidate1_pf_ids, vec![21, 20]); @@ -870,21 +728,99 @@ pub mod tests { assert_eq!(group3.len(), 1); let g3_candidate1 = &group3[0]; - assert_eq!(g3_candidate1.partition.id(), partition3.partition.id); + assert_eq!(g3_candidate1.partition.id(), partitions[2].partition.id); let g3_candidate1_pf_ids: Vec<_> = 
g3_candidate1.files.iter().map(|pf| pf.id().get()).collect(); - // all IDs of level-1 firts then level-0 + // all IDs of level-1 first then level-0 assert_eq!(g3_candidate1_pf_ids, vec![6, 7, 8, 9, 10, 5]); { let mut repos = compactor.catalog.repositories().await; let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap(); assert_eq!(skipped_compactions.len(), 1); - assert_eq!(skipped_compactions[0].partition_id, partition4.partition.id); + assert_eq!( + skipped_compactions[0].partition_id, + partitions[3].partition.id + ); assert_eq!(skipped_compactions[0].reason, "over memory budget"); } } + #[tokio::test] + async fn test_hot_compact_candidates_with_limit_parallel_jobs() { + test_helpers::maybe_start_logging(); + + // test setup with plenty of memory budget, 1GB (aka unlimited), but a limit of 2 parallel jobs + let (compactor, mock_compactor, partitions) = + make_6_partitions(1024 * 1024 * 1024, 2).await; + + // partition candidates: partitions with L0 and overlapped L1 + let mut candidates = hot::hot_partitions_to_compact(Arc::clone(&compactor)) + .await + .unwrap(); + assert_eq!(candidates.len(), 6); + candidates.sort_by_key(|c| c.candidate.partition_id); + { + let mut repos = compactor.catalog.repositories().await; + let skipped_compactions = repos.partitions().list_skipped_compactions().await.unwrap(); + assert!( + skipped_compactions.is_empty(), + "Expected no skipped compactions, got: {skipped_compactions:?}" + ); + } + + // There are 3 rounds of parallel compaction: + // + // * Round 1: 2 candidates [P1, P2] + // * Round 2: 2 candidates [P3, P4] + // * Round 3: 2 candidates [P5, P6] + + compact_candidates_with_memory_budget( + Arc::clone(&compactor), + "hot", + CompactionLevel::Initial, + mock_compactor.compaction_function(), + true, + candidates.into(), + ) + .await; + + let compaction_groups = mock_compactor.results(); + + // 3 rounds of parallel compaction + assert_eq!(compaction_groups.len(), 3); + + // Round 1 + let group1 = &compaction_groups[0]; + assert_eq!(group1.len(), 2); + + let g1_candidate1 = &group1[0]; + assert_eq!(g1_candidate1.partition.id(), partitions[0].partition.id); + + let g1_candidate2 = &group1[1]; + assert_eq!(g1_candidate2.partition.id(), partitions[1].partition.id); + + // Round 2 + let group2 = &compaction_groups[1]; + assert_eq!(group2.len(), 2); + + let g2_candidate1 = &group2[0]; + assert_eq!(g2_candidate1.partition.id(), partitions[2].partition.id); + + let g2_candidate2 = &group2[1]; + assert_eq!(g2_candidate2.partition.id(), partitions[3].partition.id); + + // Round 3 + let group3 = &compaction_groups[2]; + assert_eq!(group3.len(), 2); + + let g3_candidate1 = &group3[0]; + assert_eq!(g3_candidate1.partition.id(), partitions[4].partition.id); + + let g3_candidate2 = &group3[1]; + assert_eq!(g3_candidate2.partition.id(), partitions[5].partition.id); + } + // A quite sophisticated integration test of compacting one hot partition // Beside lp data, every value min/max sequence numbers and min/max time are important // to have a combination of needed tests in this test function @@ -962,6 +898,7 @@ pub mod tests { minutes_without_new_writes_to_be_cold: 10, hot_compaction_hours_threshold_1: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_1, hot_compaction_hours_threshold_2: DEFAULT_HOT_COMPACTION_HOURS_THRESHOLD_2, + max_parallel_partitions: DEFAULT_MAX_PARALLEL_PARTITIONS, }; let metrics = Arc::new(metric::Registry::new()); @@ -1153,4 +1090,172 @@ pub mod tests { &batches ); } + + async fn make_6_partitions( + budget: u64, + 
max_parallel_jobs: u64, + ) -> (Arc<Compactor>, MockCompactor, Vec<Arc<TestPartition>>) { + let TestSetup { + compactor, + mock_compactor, + shard, + table, + .. + } = test_setup(budget, max_parallel_jobs).await; + + // Some times in the past to set to created_at of the files + let hot_time_one_hour_ago = compactor.time_provider.hours_ago(1); + + let mut partitions = Vec::with_capacity(6); + + // P1: + // L0 2 rows. bytes: 2,250 + // L1 2 rows. bytes: 2,250 + // total = 2,250 + 2,250 = 4,500 + let partition1 = table.with_shard(&shard).create_partition("one").await; + + // 2 files with IDs 1 and 2 + let pf1_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(2) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition1.create_parquet_file_catalog_record(pf1_1).await; + + let pf1_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf1_1 + .with_max_time(6) + .with_row_count(2) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition1.create_parquet_file_catalog_record(pf1_2).await; + partitions.push(partition1); + + // P2: + // L0 2 rows. bytes: 2,250 + // L1 2 rows. bytes: 2,250 + // total = 2,250 + 2,250 = 4,500 + let partition2 = table.with_shard(&shard).create_partition("two").await; + + // 2 files with IDs 3 and 4 + let pf2_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(2) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition2.create_parquet_file_catalog_record(pf2_1).await; + + let pf2_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf2_1 + .with_max_time(6) + .with_row_count(2) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition2.create_parquet_file_catalog_record(pf2_2).await; + partitions.push(partition2); + + // P3: bytes >= 90% of full budget = 90% * 14,350 = 12,915 + // L0 40 rows. bytes: 2,250 + // Five L1s. bytes: 2,250 each + // total = 2,250 * 6 = 13,500 + let partition3 = table.with_shard(&shard).create_partition("three").await; + + // 6 files with IDs 5, 6, 7, 8, 9, 10 + let pf3_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(6) + .with_row_count(40) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition3.create_parquet_file_catalog_record(pf3_1).await; + + // Five overlapped L1 files + for i in 1..6 { + let pf3_i = TestParquetFileBuilder::default() + .with_min_time(i) // overlapped with pf3_1 + .with_max_time(i) + .with_row_count(24) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition3.create_parquet_file_catalog_record(pf3_i).await; + } + partitions.push(partition3); + + // P4: Over the full budget + // L0 40 rows. bytes: 2,250 + // Six L1s. 
bytes: 2,250 each + // total = 2,250 * 7 = 15,750 > 14350 + let partition4 = table.with_shard(&shard).create_partition("four").await; + + // 7 files with IDs 11, 12, 13, 14, 15, 16, 17 + let pf4_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(7) + .with_row_count(70) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition4.create_parquet_file_catalog_record(pf4_1).await; + + // Six overlapped L1 files + for i in 1..7 { + let pf4_i = TestParquetFileBuilder::default() + .with_min_time(i) // overlapped with pf4_1 + .with_max_time(i) + .with_row_count(40) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition4.create_parquet_file_catalog_record(pf4_i).await; + } + partitions.push(partition4); + + // P5: + // L0 2 rows. bytes: 2,250 + // L1 2 rows. bytes: 2,250 + // total = 2,250 + 2,250 = 4,500 + let partition5 = table.with_shard(&shard).create_partition("five").await; + // 2 files with IDs 18, 19 + let pf5_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(2) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition5.create_parquet_file_catalog_record(pf5_1).await; + + let pf5_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf5_1 + .with_max_time(6) + .with_row_count(2) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition5.create_parquet_file_catalog_record(pf5_2).await; + partitions.push(partition5); + + // P6: + // L0 2 rows. bytes: 2,250 + // L1 2 rows. bytes: 2,250 + // total = 2,250 + 2,250 = 4,500 + let partition6 = table.with_shard(&shard).create_partition("six").await; + // 2 files with IDs 20, 21 + let pf6_1 = TestParquetFileBuilder::default() + .with_min_time(1) + .with_max_time(5) + .with_row_count(2) + .with_compaction_level(CompactionLevel::Initial) + .with_creation_time(hot_time_one_hour_ago); + partition6.create_parquet_file_catalog_record(pf6_1).await; + + let pf6_2 = TestParquetFileBuilder::default() + .with_min_time(4) // overlapped with pf6_1 + .with_max_time(6) + .with_row_count(2) + .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_creation_time(hot_time_one_hour_ago); + partition6.create_parquet_file_catalog_record(pf6_2).await; + partitions.push(partition6); + + (compactor, mock_compactor, partitions) + } } diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 00adedb793..a554b29400 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -439,6 +439,7 @@ impl Config { minutes_without_new_writes_to_be_cold: 10, hot_compaction_hours_threshold_1: 4, hot_compaction_hours_threshold_2: 24, + max_parallel_partitions: 20, }; let querier_config = QuerierConfig { diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index 7dd7525d4e..3601d3401f 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -214,6 +214,7 @@ pub async fn build_compactor_from_config( minutes_without_new_writes_to_be_cold, hot_compaction_hours_threshold_1, hot_compaction_hours_threshold_2, + max_parallel_partitions, .. 
} = compactor_config; @@ -231,6 +232,7 @@ pub async fn build_compactor_from_config( minutes_without_new_writes_to_be_cold, hot_compaction_hours_threshold_1, hot_compaction_hours_threshold_2, + max_parallel_partitions, }; Ok(compactor::compact::Compactor::new(
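To make the budget-plus-parallelism gating above easier to follow than it is inside the diff, here is a minimal, self-contained Rust sketch of the idea behind compact_candidates_with_memory_budget. It is a simplification, not the IOx implementation: Candidate, Config, and plan_rounds are hypothetical stand-ins, the per-partition memory estimate is a single number, and the "already considered all remaining candidates" bookkeeping is folded into a simple retry of candidates that do not fit the current round.

// Sketch: group compaction candidates into rounds that run in parallel.
// A round is flushed when the batch has consumed almost all (>= 90%) of the
// memory budget, when no candidates remain, or when the batch reaches the
// max_parallel_partitions cap -- the condition added by this patch.

struct Candidate {
    partition_id: i64,
    estimated_bytes: u64, // hypothetical per-partition memory estimate
}

struct Config {
    memory_budget_bytes: u64,
    max_parallel_partitions: u64,
}

fn plan_rounds(config: &Config, mut candidates: Vec<Candidate>) -> Vec<Vec<Candidate>> {
    let mut rounds = Vec::new();
    let mut batch: Vec<Candidate> = Vec::new();
    let mut remaining_budget = config.memory_budget_bytes;

    while let Some(candidate) = candidates.pop() {
        if candidate.estimated_bytes > config.memory_budget_bytes {
            // The real compactor records these as skipped: "over memory budget".
            continue;
        }
        if candidate.estimated_bytes > remaining_budget {
            // Does not fit into the current round: flush the batch and retry it.
            candidates.push(candidate);
            if !batch.is_empty() {
                rounds.push(std::mem::take(&mut batch));
            }
            remaining_budget = config.memory_budget_bytes;
            continue;
        }

        remaining_budget -= candidate.estimated_bytes;
        batch.push(candidate);

        let almost_out_of_budget = remaining_budget <= config.memory_budget_bytes / 10;
        let no_more_candidates = candidates.is_empty();
        let hit_parallel_cap = batch.len() as u64 >= config.max_parallel_partitions;

        if almost_out_of_budget || no_more_candidates || hit_parallel_cap {
            rounds.push(std::mem::take(&mut batch));
            remaining_budget = config.memory_budget_bytes;
        }
    }
    rounds
}

fn main() {
    let config = Config {
        memory_budget_bytes: 14_350,
        max_parallel_partitions: 2,
    };
    // Six equally sized candidates; they are popped from the back, so reverse
    // the range to process partitions 1..=6 in order.
    let candidates: Vec<Candidate> = (1..=6)
        .rev()
        .map(|id| Candidate { partition_id: id, estimated_bytes: 4_500 })
        .collect();

    for (i, round) in plan_rounds(&config, candidates).iter().enumerate() {
        let ids: Vec<i64> = round.iter().map(|c| c.partition_id).collect();
        println!("round {}: partitions {:?}", i + 1, ids);
    }
}

Under the assumed numbers above (memory_budget_bytes = 14_350, max_parallel_partitions = 2, six 4,500-byte candidates), the planner produces three rounds of two partitions each, which mirrors the expectation in test_hot_compact_candidates_with_limit_parallel_jobs.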