diff --git a/clap_blocks/src/compactor.rs b/clap_blocks/src/compactor.rs index 3a54c4c459..fef3c80ea6 100644 --- a/clap_blocks/src/compactor.rs +++ b/clap_blocks/src/compactor.rs @@ -36,22 +36,31 @@ pub struct CompactorConfig { /// The compactor will limit the number of simultaneous compaction jobs based on the /// size of the input files to be compacted. This number should be less than 1/10th /// of the available memory to ensure compactions have - /// enough space to run. Default is 100,000,000 (100MB). + /// enough space to run. Default is 10,000,000 (10MB). #[clap( long = "--compaction-concurrent-size-bytes", env = "INFLUXDB_IOX_COMPACTION_CONCURRENT_SIZE_BYTES", - default_value = "100000000" + default_value = "10000000" )] pub max_concurrent_compaction_size_bytes: i64, /// The compactor will compact overlapped files no matter how much large they are. /// For non-overlapped and contiguous files, compactor will also compact them into /// a larger file of max size defined by the config value. - /// Default is 100,000,000 (100MB). 
+ /// Default is 5,000,000 (5MB) #[clap( long = "--compaction-max-size-bytes", env = "INFLUXDB_IOX_COMPACTION_MAX_SIZE_BYTES", - default_value = "100000000" + default_value = "5000000" )] pub compaction_max_size_bytes: i64, + + /// Limit the number of files per compaction + /// Default is 10 + #[clap( + long = "--compaction-max-file-count", + env = "INFLUXDB_IOX_COMPACTION_MAX_FILE_COUNT", + default_value = "10" + )] + pub compaction_max_file_count: i64, } diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 8891028edb..d84e9fa145 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -124,6 +124,36 @@ pub enum Error { /// A specialized `Error` for Compactor Data errors pub type Result = std::result::Result; +/// Data of parquet files to compact and upgrade +pub struct CompactAndUpgrade { + // sequencer ID of all files in this struct + sequencer_id: Option, + // Each group will be compacted into one file + groups_to_compact: Vec, + // level-0 files to be upgraded to level 1 + files_to_upgrade: Vec, +} + +impl CompactAndUpgrade { + fn new(sequencer_id: Option) -> Self { + Self { + sequencer_id, + groups_to_compact: vec![], + files_to_upgrade: vec![], + } + } + + /// Return true if there are files to compact and/or upgrade + pub fn compactable(&self) -> bool { + !self.groups_to_compact.is_empty() || !self.files_to_upgrade.is_empty() + } + + /// Return sequncer ID + pub fn sequencer_id(&self) -> Option { + self.sequencer_id + } +} + /// Data points need to run a compactor #[derive(Debug)] pub struct Compactor { @@ -148,10 +178,6 @@ pub struct Compactor { /// Counter for the number of files compacted compaction_counter: Metric, - /// Counter for the actual number of files compacted - /// compaction_actual_counter = compaction_counter - some large-enough non-overlapped files - compaction_actual_counter: Metric, - /// Counter for level promotion from level 0 to 1 /// These are large enough and non-overlapped file 
level_promotion_counter: Metric, @@ -187,11 +213,6 @@ impl Compactor { "counter for the number of files compacted", ); - let compaction_actual_counter = registry.register_metric( - "compactor_actual_compacted_files_total", - "counter for the number of actual files compacted", - ); - let compaction_output_counter = registry.register_metric( "compactor_output_compacted_files_total", "counter for the number of output compacted files", @@ -229,7 +250,7 @@ impl Compactor { backoff_config, config, compaction_counter, - compaction_actual_counter, + // compaction_actual_counter, compaction_output_counter, level_promotion_counter, compaction_candidate_gauge, @@ -339,16 +360,18 @@ impl Compactor { Ok(partition.sort_key()) } - /// Runs compaction in a partition resolving any tombstones and compacting data so that parquet - /// files will be non-overlapping in time. - pub async fn compact_partition( + /// Group files to be compacted together and level-0 files that will get upgraded + /// for a given partition. 
+ /// The number of compacting files per group will be limited by their total size and number of files pub async fn groups_to_compact_and_files_to_upgrade( &self, partition_id: PartitionId, - compaction_max_size_bytes: i64, - ) -> Result<()> { - info!("compacting partition {}", partition_id); - let start_time = self.time_provider.now(); + compaction_max_size_bytes: i64, // max size of files to get compacted + compaction_max_file_count: i64, // max number of files to get compacted + ) -> Result { + let mut compact_and_upgrade = CompactAndUpgrade::new(None); + // List all valid (not soft deleted) files of the partition let parquet_files = self .catalog .repositories() .await .context(ListParquetFilesSnafu)?; if parquet_files.is_empty() { + return Ok(compact_and_upgrade); + } + + compact_and_upgrade.sequencer_id = Some(parquet_files[0].sequencer_id); + + // Group overlapped files + // Each group will be limited by their size and number of files + let overlapped_file_groups = Self::overlapped_groups( + parquet_files, + compaction_max_size_bytes, + compaction_max_file_count, + ); + + // Group time-contiguous non-overlapped groups if their total size is smaller than a threshold + // If their total size or file count exceeds the limits, they will not be grouped further + let compact_file_groups = Self::group_small_contiguous_groups( + overlapped_file_groups, + compaction_max_size_bytes, + compaction_max_file_count, + ); + + // Attach appropriate tombstones to each file + let groups_with_tombstones = self.add_tombstones_to_groups(compact_file_groups).await?; + info!("compacting {} groups", groups_with_tombstones.len()); + + // File groups to compact and files to upgrade + for group in groups_with_tombstones { + // Only one file without tombstones, no need to compact.
+ if group.parquet_files.len() == 1 && group.tombstones.is_empty() { + // If it is level 0, upgrade it since it is non-overlapping + if group.parquet_files[0].data.compaction_level == INITIAL_COMPACTION_LEVEL { + compact_and_upgrade + .files_to_upgrade + .push(group.parquet_files[0].parquet_file_id()) + } + } else { + compact_and_upgrade.groups_to_compact.push(group); + } + } + + Ok(compact_and_upgrade) + } + + /// Runs compaction in a partition resolving any tombstones and compacting data so that parquet + /// files will be non-overlapping in time. + pub async fn compact_partition( + &self, + partition_id: PartitionId, + compact_and_upgrade: CompactAndUpgrade, + ) -> Result<()> { + if !compact_and_upgrade.compactable() { return Ok(()); } + info!("compacting partition {}", partition_id); + let start_time = self.time_provider.now(); + let sort_key_from_catalog = self .sort_key_from_catalog(partition_id) .await? @@ -369,39 +446,20 @@ impl Compactor { // old catalogs) .expect("Partition sort key should have been available in the catalog"); - let sequencer_id = parquet_files[0].sequencer_id; - let file_count = parquet_files.len(); - let mut actual_compacted_file_count = file_count; - - // Group overlapped files - let overlapped_file_groups = Self::overlapped_groups(parquet_files); - - // Group time-contiguous non-overlapped groups if their total size is smaller than a threshold - let compact_file_groups = - Self::group_small_contiguous_groups(overlapped_file_groups, compaction_max_size_bytes); - - // Attach appropriate tombstones to each file - let groups_with_tombstones = self.add_tombstones_to_groups(compact_file_groups).await?; - info!("compacting {} groups", groups_with_tombstones.len()); + let mut file_count = 0; // Compact, persist,and update catalog accordingly for each overlaped file let mut tombstones = BTreeMap::new(); - let mut upgrade_level_list: Vec = vec![]; let mut output_file_count = 0; - for group in groups_with_tombstones { + let sequencer_id = 
compact_and_upgrade + .sequencer_id() + .expect("Should have sequencer ID"); + for group in compact_and_upgrade.groups_to_compact { + file_count += group.parquet_files.len(); + // keep tombstone ids tombstones = Self::union_tombstones(tombstones, &group); - // Only one file without tombstones, no need to compact. - if group.parquet_files.len() == 1 && group.tombstones.is_empty() { - actual_compacted_file_count -= 1; - // If it is level 0, upgrade it since it is non-overlapping - if group.parquet_files[0].data.compaction_level == INITIAL_COMPACTION_LEVEL { - upgrade_level_list.push(group.parquet_files[0].parquet_file_id()); - } - continue; - } - // Collect all the parquet file IDs, to be able to set their catalog records to be // deleted. These should already be unique, no need to dedupe. let original_parquet_file_ids: Vec<_> = @@ -478,12 +536,13 @@ impl Compactor { self.remove_fully_processed_tombstones(tombstones).await?; // Upgrade old level-0 to level 1 - self.update_to_level_1(&upgrade_level_list).await?; + self.update_to_level_1(&compact_and_upgrade.files_to_upgrade) + .await?; let attributes = Attributes::from([("sequencer_id", format!("{}", sequencer_id).into())]); - if !upgrade_level_list.is_empty() { + if !compact_and_upgrade.files_to_upgrade.is_empty() { let promotion_counter = self.level_promotion_counter.recorder(attributes.clone()); - promotion_counter.inc(upgrade_level_list.len() as u64); + promotion_counter.inc(compact_and_upgrade.files_to_upgrade.len() as u64); } if let Some(delta) = self.time_provider.now().checked_duration_since(start_time) { @@ -494,9 +553,6 @@ impl Compactor { let compaction_counter = self.compaction_counter.recorder(attributes.clone()); compaction_counter.inc(file_count as u64); - let compaction_actual_counter = self.compaction_actual_counter.recorder(attributes.clone()); - compaction_actual_counter.inc(actual_compacted_file_count as u64); - let compaction_output_counter = self.compaction_output_counter.recorder(attributes); 
compaction_output_counter.inc(output_file_count as u64); @@ -507,6 +563,7 @@ impl Compactor { fn group_small_contiguous_groups( mut file_groups: Vec, compaction_max_size_bytes: i64, + compaction_max_file_count: i64, ) -> Vec> { let mut groups = Vec::with_capacity(file_groups.len()); if file_groups.is_empty() { @@ -518,9 +575,14 @@ impl Compactor { let mut current_group = vec![]; let mut current_size = 0; + let mut current_num_files = 0; for g in file_groups { - if current_size + g.total_file_size_bytes < compaction_max_size_bytes { + if current_size + g.total_file_size_bytes < compaction_max_size_bytes + && current_num_files + g.parquet_files.len() + < compaction_max_file_count.try_into().unwrap() + { // Group this one with the current_group + current_num_files += g.parquet_files.len(); current_group.extend(g.parquet_files); current_size += g.total_file_size_bytes; } else { @@ -528,7 +590,7 @@ impl Compactor { if !current_group.is_empty() { groups.push(current_group); } - + current_num_files = g.parquet_files.len(); current_group = g.parquet_files; current_size = g.total_file_size_bytes; } @@ -748,15 +810,72 @@ impl Compactor { Ok(()) } + // Split overlapped groups into smaller groups if there are so many files in each group or + // their sizes are too large.
The files are sorted by their min time before splitting so files + // are guaranteed to be overlapped in each new group + fn split_overlapped_groups( + groups: &mut Vec>, + max_size_bytes: i64, + max_file_count: i64, + ) -> Vec> { + let mut overlapped_groups: Vec> = + Vec::with_capacity(groups.len() * 2); + let max_count = max_file_count.try_into().unwrap(); + for group in groups { + let total_size_bytes: i64 = group.iter().map(|f| f.file_size_bytes).sum(); + if group.len() <= max_count && total_size_bytes <= max_size_bytes { + overlapped_groups.push(group.to_vec()); + } else { + group.sort_by_key(|f| f.min_time); + while !group.is_empty() { + // limit file num + let mut count = max_count; + + // limit total file size + let mut size = 0; + for (i, item) in group.iter().enumerate() { + if i >= max_count { + count = max_count; + break; + } + + size += item.file_size_bytes; + if size >= max_size_bytes { + count = i + 1; + break; + } + } + + if count > group.len() { + count = group.len(); + } + let group_new = group.split_off(count); + overlapped_groups.push(group.to_vec()); + *group = group_new; + } + } + } + + overlapped_groups + } + // Given a list of parquet files that come from the same Table Partition, group files together // if their (min_time, max_time) ranges overlap. Does not preserve or guarantee any ordering.
+ // If there are so many files in an overlapped group, the group will be split to ensure each + // group contains limited number of files fn overlapped_groups( parquet_files: Vec, + max_size_bytes: i64, + max_file_count: i64, ) -> Vec { // group overlap files - let overlapped_groups = + let mut overlapped_groups = group_potential_duplicates(parquet_files).expect("Error grouping overlapped chunks"); + // split overlapped groups into smaller groups if they include so many files + let overlapped_groups = + Self::split_overlapped_groups(&mut overlapped_groups, max_size_bytes, max_file_count); + // Compute min time and total size for each overlapped group let mut groups_with_min_time_and_size = Vec::with_capacity(overlapped_groups.len()); for group in overlapped_groups { @@ -984,6 +1103,8 @@ mod tests { // Simulate unique ID generation static NEXT_ID: AtomicI64 = AtomicI64::new(0); + static TEST_MAX_SIZE_BYTES: i64 = 100000; + static TEST_MAX_FILE_COUNT: i64 = 10; #[tokio::test] // This is integration test to verify all pieces are put together correctly @@ -1038,17 +1159,23 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new(90, 100000, 100000), + CompactorConfig::new(90, 100000, 100000, 10), Arc::new(metric::Registry::new()), ); - compactor - .compact_partition( + let compact_and_upgrade = compactor + .groups_to_compact_and_files_to_upgrade( partition.partition.id, compactor.config.compaction_max_size_bytes(), + compactor.config.compaction_max_file_count(), ) .await .unwrap(); + compactor + .compact_partition(partition.partition.id, compact_and_upgrade) + .await + .unwrap(); + // should have 2 non-deleted level_0 files.
The original file was marked deleted and not counted let mut files = catalog .list_by_table_not_to_delete_with_metadata(table.table.id) @@ -1177,7 +1304,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new(90, 100000, 100000), + CompactorConfig::new(90, 100000, 100000, 10), Arc::new(metric::Registry::new()), ); @@ -1260,13 +1387,18 @@ mod tests { // ------------------------------------------------ // Compact - compactor - .compact_partition( + let compact_and_upgrade = compactor + .groups_to_compact_and_files_to_upgrade( partition.partition.id, compactor.config.compaction_max_size_bytes(), + compactor.config.compaction_max_file_count(), ) .await .unwrap(); + compactor + .compact_partition(partition.partition.id, compact_and_upgrade) + .await + .unwrap(); // Should have 3 non-soft-deleted files: pf1 not compacted and stay, and 2 newly created after compacting pf2, pf3, pf4 let mut files = catalog @@ -1373,7 +1505,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new(90, 100000, 100000), + CompactorConfig::new(90, 100000, 100000, 10), Arc::new(metric::Registry::new()), ); @@ -1488,7 +1620,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new(90, 100000, 100000), + CompactorConfig::new(90, 100000, 100000, 10), Arc::new(metric::Registry::new()), ); @@ -1609,7 +1741,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new(90, 100000, 100000), + CompactorConfig::new(90, 100000, 100000, 10), Arc::new(metric::Registry::new()), ); @@ -1804,7 +1936,11 @@ mod tests { let pf1 = arbitrary_parquet_file(1, 2); let pf2 = arbitrary_parquet_file(3, 4); - let groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]); + let groups = Compactor::overlapped_groups( + vec![pf1.clone(), 
pf2.clone()], + TEST_MAX_SIZE_BYTES, + TEST_MAX_FILE_COUNT, + ); // They should be 2 groups assert_eq!(groups.len(), 2, "There should have been two group"); @@ -1819,7 +1955,11 @@ mod tests { let pf1 = arbitrary_parquet_file(1, 3); let pf2 = arbitrary_parquet_file(2, 4); - let groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]); + let groups = Compactor::overlapped_groups( + vec![pf1.clone(), pf2.clone()], + TEST_MAX_SIZE_BYTES, + TEST_MAX_FILE_COUNT, + ); // They should be in one group (order not guaranteed) assert_eq!(groups.len(), 1, "There should have only been one group"); @@ -1857,7 +1997,8 @@ mod tests { partial_overlap.clone(), ]; - let mut groups = Compactor::overlapped_groups(all); + let mut groups = + Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT); dbg!(&groups); assert_eq!(groups.len(), 3); @@ -1918,7 +2059,11 @@ mod tests { let pf1 = arbitrary_parquet_file_with_size(1, 2, 100); let pf2 = arbitrary_parquet_file_with_size(3, 4, 200); - let overlapped_groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]); + let overlapped_groups = Compactor::overlapped_groups( + vec![pf1.clone(), pf2.clone()], + TEST_MAX_SIZE_BYTES, + TEST_MAX_FILE_COUNT, + ); // 2 overlapped groups assert_eq!(overlapped_groups.len(), 2); let g1 = GroupWithMinTimeAndSize { @@ -1936,13 +2081,162 @@ mod tests { // Group them by size let compaction_max_size_bytes = 100000; - let groups = - Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes); + let groups = Compactor::group_small_contiguous_groups( + overlapped_groups, + compaction_max_size_bytes, + TEST_MAX_FILE_COUNT, + ); // 2 small groups should be grouped in one assert_eq!(groups.len(), 1); assert_eq!(groups, vec![vec![pf1, pf2]]); } + // This is specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files + // Keep all the variables the same names in both tests for us to follow them easily + #[test] + 
fn test_split_overlapped_groups() { + let compaction_max_size_bytes = 100000; + + // oldest overlapped and very small + let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 400); + let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 500); + let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400); + let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500); + let oldest_overlapped_group = vec![ + overlaps_many.clone(), + contained_completely_within.clone(), + max_equals_min.clone(), + min_equals_max.clone(), + ]; + + // newest files and very large + let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group + let newest_overlapped_group = vec![alone.clone()]; + + // small files in the middle + let another = arbitrary_parquet_file_with_size(13, 15, 1000); + let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000); + let middle_overlapped_group = vec![another.clone(), partial_overlap.clone()]; + + let mut overlapped_groups = vec![ + oldest_overlapped_group, + newest_overlapped_group, + middle_overlapped_group, + ]; + + let max_size_bytes = 1000; + let max_file_count = 2; + + // Three input groups but will produce 5 output ones because of limit in size and file count. + // Note that the 3 input groups each include overlapped files but the groups do not overlap. + // The function split_overlapped_groups is to split each overlapped group. It does not merge any groups.
+ let groups = Compactor::split_overlapped_groups( + &mut overlapped_groups, + max_size_bytes, + max_file_count, + ); + + // must be 5 + assert_eq!(groups.len(), 5); + + // oldest_overlapped_group was split into groups[0] and groups[1] due to file count limit + assert_eq!(groups[0].len(), 2); // reach limit file count + assert!(groups[0].contains(&max_equals_min)); // min_time = 3 + assert!(groups[0].contains(&overlaps_many)); // min_time = 5 + assert_eq!(groups[1].len(), 2); // reach limit file count + assert!(groups[1].contains(&contained_completely_within)); // min_time = 6 + assert!(groups[1].contains(&min_equals_max)); // min_time = 10 + + // newest_overlapped_group stays the same length one file and the corresponding output is groups[2] + assert_eq!(groups[2].len(), 1); // reach limit file size + assert!(groups[2].contains(&alone)); + + // middle_overlapped_group was split into groups[3] and groups[4] due to size limit + assert_eq!(groups[3].len(), 1); // reach limit file size + assert!(groups[3].contains(&another)); // min_time = 13 + assert_eq!(groups[4].len(), 1); // reach limit file size + assert!(groups[4].contains(&partial_overlap)); // min_time = 14 + } + + // This tests + // 1. overlapped_groups which focuses on the detail of both its children: + // 1.a. group_potential_duplicates that groups files into overlapped groups + // 1.b. split_overlapped_groups that splits each overlapped group further to meet size and/or file limit + // 2. 
group_small_contiguous_groups that merges non-overlapped group into a larger one if they meet size and file limit + #[test] + fn test_limit_size_and_num_files() { + let compaction_max_size_bytes = 100000; + + // oldest overlapped and very small + let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 400); + let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 500); + let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400); + let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500); + + // newest files and very large + let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group + + // small files in the middle + let another = arbitrary_parquet_file_with_size(13, 15, 1000); + let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000); + + // Given a bunch of files in an arbitrary order, + let all = vec![ + min_equals_max.clone(), + overlaps_many.clone(), + alone.clone(), + another.clone(), + max_equals_min.clone(), + contained_completely_within.clone(), + partial_overlap.clone(), + ]; + + // Group into overlapped groups + let max_size_bytes = 1000; + let max_file_count = 2; + let overlapped_groups = Compactor::overlapped_groups(all, max_size_bytes, max_file_count); + // Must be 5 + assert_eq!(overlapped_groups.len(), 5); + assert_eq!(overlapped_groups[0].parquet_files.len(), 2); // reach limit file count + assert_eq!(overlapped_groups[1].parquet_files.len(), 2); // reach limit file count + assert_eq!(overlapped_groups[2].parquet_files.len(), 1); // reach limit file size + assert_eq!(overlapped_groups[3].parquet_files.len(), 1); // reach limit file size + assert_eq!(overlapped_groups[4].parquet_files.len(), 1); // reach limit file size + + // Group further into group by size and file count limit + // Due to the merge with correct time range, this function has to sort the groups hence output data will be in time order + let groups = 
Compactor::group_small_contiguous_groups( + overlapped_groups, + compaction_max_size_bytes, + max_file_count, + ); + + // Still 5 groups. Nothing is merged due to the limit of size and file num + assert_eq!(groups.len(), 5); + + assert_eq!(groups[0].len(), 2); // reach file num limit + assert!(groups[0].contains(&max_equals_min)); // min_time = 3 + assert!(groups[0].contains(&overlaps_many)); // min_time = 5 + + assert_eq!(groups[1].len(), 2); // reach file num limit + assert!(groups[1].contains(&contained_completely_within)); // min_time = 6 + assert!(groups[1].contains(&min_equals_max)); // min_time = 10 + + assert_eq!(groups[2].len(), 1); // reach size limit + assert!(groups[2].contains(&another)); // min_time = 13 + + assert_eq!(groups[3].len(), 1); // reach size limit + assert!(groups[3].contains(&partial_overlap)); // min_time = 14 + + assert_eq!(groups[4].len(), 1); // reach size limit + assert!(groups[4].contains(&alone)); + } + + // This tests + // 1. overlapped_groups which focuses on the detail of both its children: + // 1.a. group_potential_duplicates that groups files into overlapped groups + // 1.b. split_overlapped_groups that splits each overlapped group further to meet size and/or file limit + // 2. 
group_small_contiguous_groups that merges non-overlapped group into a larger one if they meet size and file limit #[test] fn test_group_small_contiguous_overlapped_groups_no_group() { let compaction_max_size_bytes = 100000; @@ -1951,7 +2245,11 @@ mod tests { let pf1 = arbitrary_parquet_file_with_size(1, 2, 100); let pf2 = arbitrary_parquet_file_with_size(3, 4, compaction_max_size_bytes); // too large to group - let overlapped_groups = Compactor::overlapped_groups(vec![pf1.clone(), pf2.clone()]); + let overlapped_groups = Compactor::overlapped_groups( + vec![pf1.clone(), pf2.clone()], + TEST_MAX_SIZE_BYTES, + TEST_MAX_FILE_COUNT, + ); // 2 overlapped groups assert_eq!(overlapped_groups.len(), 2); let g1 = GroupWithMinTimeAndSize { @@ -1968,13 +2266,21 @@ mod tests { assert_eq!(overlapped_groups, vec![g1, g2]); // Group them by size - let groups = - Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes); + let groups = Compactor::group_small_contiguous_groups( + overlapped_groups, + compaction_max_size_bytes, + TEST_MAX_FILE_COUNT, + ); // Files too big to group further assert_eq!(groups.len(), 2); assert_eq!(groups, vec![vec![pf1], vec![pf2]]); } + // This tests + // 1. overlapped_groups which focuses on the detail of both its children: + // 1.a. group_potential_duplicates that groups files into overlapped groups + // 1.b. split_overlapped_groups that splits each overlapped group further to meet size and/or file limit + // 2. 
group_small_contiguous_groups that merges non-overlapped group into a larger one if they meet size and file limit #[test] fn test_group_small_contiguous_overlapped_groups_many_files() { let compaction_max_size_bytes = 100000; @@ -2004,12 +2310,16 @@ mod tests { ]; // Group into overlapped groups - let overlapped_groups = Compactor::overlapped_groups(all); + let overlapped_groups = + Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT); assert_eq!(overlapped_groups.len(), 3); // group further into group by size - let groups = - Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes); + let groups = Compactor::group_small_contiguous_groups( + overlapped_groups, + compaction_max_size_bytes, + TEST_MAX_FILE_COUNT, + ); // should be 2 groups assert_eq!(groups.len(), 2); // first group includes 6 oldest files in 2 overlapped groups @@ -2025,22 +2335,24 @@ mod tests { assert!(groups[1].contains(&alone)); } + // This tests + // 1. overlapped_groups which focuses on the detail of both its children: + // 1.a. group_potential_duplicates that groups files into overlapped groups + // 1.b. split_overlapped_groups that splits each overlapped group further to meet size and/or file limit + // 2. 
group_small_contiguous_groups that merges non-overlapped group into a larger one if they meet size and file limit #[test] fn test_group_small_contiguous_overlapped_groups_many_files_too_large() { - let compaction_max_size_bytes = 100000; - // oldest overlapped and large let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 200); let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 300); - let max_equals_min = - arbitrary_parquet_file_with_size(3, 5, compaction_max_size_bytes + 400); // too large to group + let max_equals_min = arbitrary_parquet_file_with_size(3, 5, TEST_MAX_SIZE_BYTES + 400); // too large to group let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500); // newest files and large - let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group + let alone = arbitrary_parquet_file_with_size(30, 35, TEST_MAX_SIZE_BYTES); // too large to group // files in the middle and also large - let another = arbitrary_parquet_file_with_size(13, 15, compaction_max_size_bytes + 100); // too large to group + let another = arbitrary_parquet_file_with_size(13, 15, TEST_MAX_SIZE_BYTES); // too large to group let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000); // Given a bunch of files in an arbitrary order @@ -2055,33 +2367,39 @@ mod tests { ]; // Group into overlapped groups - let overlapped_groups = Compactor::overlapped_groups(all); - assert_eq!(overlapped_groups.len(), 3); + let overlapped_groups = + Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT); + assert_eq!(overlapped_groups.len(), 5); - // group further into group by size - let groups = - Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes); - // Still 3 groups because they are all large - assert_eq!(groups.len(), 3); - // first group includes 4 oldest files - assert_eq!(groups[0].len(), 4); - assert!(groups[0].contains(&overlaps_many)); - 
assert!(groups[0].contains(&contained_completely_within)); - assert!(groups[0].contains(&max_equals_min)); - assert!(groups[0].contains(&min_equals_max)); - // second group - assert_eq!(groups[1].len(), 2); - assert!(groups[1].contains(&another)); - assert!(groups[1].contains(&partial_overlap)); - // third group + // 5 input groups and 5 output groups because they are too large to group further + let groups = Compactor::group_small_contiguous_groups( + overlapped_groups, + TEST_MAX_SIZE_BYTES, + TEST_MAX_FILE_COUNT, + ); + assert_eq!(groups.len(), 5); + + // first group includes oldest and large file + assert_eq!(groups[0].len(), 1); + assert!(groups[0].contains(&max_equals_min)); // min_time = 3 + // second group + assert_eq!(groups[1].len(), 3); + assert!(groups[1].contains(&overlaps_many)); // min_time = 5 + assert!(groups[1].contains(&contained_completely_within)); // min_time = 6 + assert!(groups[1].contains(&min_equals_max)); // min_time = 10 + // third group assert_eq!(groups[2].len(), 1); - assert!(groups[2].contains(&alone)); + assert!(groups[2].contains(&another)); // min_time = 13 + // forth group + assert_eq!(groups[3].len(), 1); // min_time = 14 + assert!(groups[3].contains(&partial_overlap)); + // fifth group + assert_eq!(groups[4].len(), 1); + assert!(groups[4].contains(&alone)); // min_time = 30 } #[test] fn test_group_small_contiguous_overlapped_groups_many_files_middle_too_large() { - let compaction_max_size_bytes = 100000; - // oldest overlapped and very small let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 200); let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 300); @@ -2092,7 +2410,7 @@ mod tests { let alone = arbitrary_parquet_file_with_size(30, 35, 200); // large files in the middle - let another = arbitrary_parquet_file_with_size(13, 15, compaction_max_size_bytes + 100); // too large to group + let another = arbitrary_parquet_file_with_size(13, 15, TEST_MAX_SIZE_BYTES); // too large to group let 
partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000); // Given a bunch of files in an arbitrary order @@ -2107,27 +2425,32 @@ mod tests { ]; // Group into overlapped groups - let overlapped_groups = Compactor::overlapped_groups(all); - assert_eq!(overlapped_groups.len(), 3); + let overlapped_groups = + Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT); + assert_eq!(overlapped_groups.len(), 4); - // group further into group by size - let groups = - Compactor::group_small_contiguous_groups(overlapped_groups, compaction_max_size_bytes); - // Nothing will be group further because the middle one is large + // 4 input groups but 3 output groups + // The last 2 groups will be grouped together because they are small + let groups = Compactor::group_small_contiguous_groups( + overlapped_groups, + TEST_MAX_SIZE_BYTES, + TEST_MAX_FILE_COUNT, + ); assert_eq!(groups.len(), 3); + // first group includes 4 oldest files assert_eq!(groups[0].len(), 4); - assert!(groups[0].contains(&overlaps_many)); - assert!(groups[0].contains(&contained_completely_within)); - assert!(groups[0].contains(&max_equals_min)); - assert!(groups[0].contains(&min_equals_max)); - // second group - assert_eq!(groups[1].len(), 2); - assert!(groups[1].contains(&another)); - assert!(groups[1].contains(&partial_overlap)); - // third group - assert_eq!(groups[2].len(), 1); - assert!(groups[2].contains(&alone)); + assert!(groups[0].contains(&max_equals_min)); // min _time = 3 + assert!(groups[0].contains(&overlaps_many)); // min _time = 5 + assert!(groups[0].contains(&contained_completely_within)); // min _time = 6 + assert!(groups[0].contains(&min_equals_max)); // min _time = 10 + // second group + assert_eq!(groups[1].len(), 1); + assert!(groups[1].contains(&another)); // min _time = 13 + // third group + assert_eq!(groups[2].len(), 2); + assert!(groups[2].contains(&partial_overlap)); // min _time = 3 + assert!(groups[2].contains(&alone)); // min _time = 30 } #[tokio::test] 
@@ -2141,7 +2464,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new(90, 100000, 100000), + CompactorConfig::new(90, 100000, 100000, 10), Arc::new(metric::Registry::new()), ); @@ -2314,7 +2637,7 @@ mod tests { // Given a bunch of files in a particular order to exercise the algorithm: let all = vec![one, two, three]; - let groups = Compactor::overlapped_groups(all); + let groups = Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT); dbg!(&groups); // All should be in the same group. @@ -2328,7 +2651,7 @@ mod tests { // Given a bunch of files in a particular order to exercise the algorithm: let all = vec![one, two, three, four]; - let groups = Compactor::overlapped_groups(all); + let groups = Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT); dbg!(&groups); // All should be in the same group. @@ -2346,7 +2669,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new(90, 100000, 100000), + CompactorConfig::new(90, 100000, 100000, 10), Arc::new(metric::Registry::new()), ); @@ -2673,7 +2996,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new(90, 100000, 100000), + CompactorConfig::new(90, 100000, 100000, 10), Arc::new(metric::Registry::new()), ); @@ -2747,7 +3070,7 @@ mod tests { Arc::new(Executor::new(1)), Arc::new(SystemProvider::new()), BackoffConfig::default(), - CompactorConfig::new(90, 100000, 100000), + CompactorConfig::new(90, 100000, 100000, 10), Arc::new(metric::Registry::new()), ); diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs index acde6daf4f..2ad880893c 100644 --- a/compactor/src/handler.rs +++ b/compactor/src/handler.rs @@ -112,6 +112,8 @@ pub struct CompactorConfig { /// For non-overlapped and contiguous files, compactor will also compact them into /// a larger file of max 
size defined by this config value. compaction_max_size_bytes: i64, + /// Limit the number of files to compact into one file + compaction_max_file_count: i64, } impl CompactorConfig { @@ -120,6 +122,7 @@ impl CompactorConfig { split_percentage: i64, max_concurrent_compaction_size_bytes: i64, compaction_max_size_bytes: i64, + compaction_max_file_count: i64, ) -> Self { assert!(split_percentage > 0 && split_percentage <= 100); @@ -127,6 +130,7 @@ impl CompactorConfig { split_percentage, max_concurrent_compaction_size_bytes, compaction_max_size_bytes, + compaction_max_file_count, } } @@ -150,6 +154,11 @@ impl CompactorConfig { pub fn compaction_max_size_bytes(&self) -> i64 { self.compaction_max_size_bytes } + + /// Max number of files to compact at a time + pub fn compaction_max_file_count(&self) -> i64 { + self.compaction_max_file_count + } } /// Checks for candidate partitions to compact and spawns tokio tasks to compact as many @@ -170,28 +179,50 @@ async fn run_compactor(compactor: Arc, shutdown: CancellationToken) { let mut used_size = 0; let max_size = compactor.config.max_concurrent_compaction_size_bytes(); let max_file_size = compactor.config.compaction_max_size_bytes(); + let max_file_count = compactor.config.compaction_max_file_count(); let mut handles = vec![]; for c in candidates { let compactor = Arc::clone(&compactor); - let handle = tokio::task::spawn(async move { - debug!(candidate=?c, "compacting candidate"); - if let Err(e) = compactor - .compact_partition(c.partition_id, max_file_size) - .await - { + let compact_and_upgrade = compactor + .groups_to_compact_and_files_to_upgrade( + c.partition_id, + max_file_size, + max_file_count, + ) + .await; + + match compact_and_upgrade { + Err(e) => { warn!( - "compaction on partition {} failed with: {:?}", + "groups file to compact and upgrade on partition {} failed with: {:?}", c.partition_id, e ); } - debug!(candidate=?c, "compaction complete"); - }); - used_size += c.file_size_bytes; - handles.push(handle); - 
if used_size > max_size { - debug!(%max_size, %used_size, n_compactions=%handles.len(), "reached maximum concurrent compaction size limit"); - break; + Ok(compact_and_upgrade) => { + if compact_and_upgrade.compactable() { + let handle = tokio::task::spawn(async move { + debug!(candidate=?c, "compacting candidate"); + let res = compactor + .compact_partition(c.partition_id, compact_and_upgrade) + .await; + if let Err(e) = res { + warn!( + "compaction on partition {} failed with: {:?}", + c.partition_id, e + ); + } + debug!(candidate=?c, "compaction complete"); + }); + + used_size += c.file_size_bytes; + handles.push(handle); + if used_size > max_size { + debug!(%max_size, %used_size, n_compactions=%handles.len(), "reached maximum concurrent compaction size limit"); + break; + } + } + } } } diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index 51d8d377fd..235df8726d 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -360,6 +360,7 @@ impl Config { split_percentage: 90, max_concurrent_compaction_size_bytes: 100000, compaction_max_size_bytes: 100000, + compaction_max_file_count: 10, }; let querier_config = QuerierConfig { diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index f58cb432cf..aeb5968a25 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -167,6 +167,7 @@ pub async fn create_compactor_server_type( compactor_config.split_percentage, compactor_config.max_concurrent_compaction_size_bytes, compactor_config.compaction_max_size_bytes, + compactor_config.compaction_max_file_count, ); let compactor_handler = Arc::new(CompactorHandlerImpl::new( sequencers,