From 75065abfb6274633785aed455d672e1f9a227a5f Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Thu, 7 Jul 2022 13:52:33 -0400
Subject: [PATCH] fix: Compact all data for a partition to one file

---
 compactor/src/compact.rs | 2195 +++-----------------------------------
 compactor/src/handler.rs |   10 +-
 2 files changed, 152 insertions(+), 2053 deletions(-)

diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs
index 40b4c9af47..7ab0ecc065 100644
--- a/compactor/src/compact.rs
+++ b/compactor/src/compact.rs
@@ -3,10 +3,7 @@ use crate::{
     handler::CompactorConfig,
     query::QueryableParquetChunk,
-    utils::{
-        CatalogUpdate, CompactedData, GroupWithMinTimeAndSize, GroupWithTombstones,
-        ParquetFileWithTombstone,
-    },
+    utils::{CatalogUpdate, CompactedData, GroupWithTombstones, ParquetFileWithTombstone},
 };
 use backoff::BackoffConfig;
 use data_types::{
@@ -20,7 +17,6 @@ use iox_catalog::interface::{get_schema_by_id, Catalog, Transaction};
 use iox_query::{
     exec::{Executor, ExecutorType},
     frontend::reorg::ReorgPlanner,
-    provider::overlap::group_potential_duplicates,
     QueryChunk,
 };
 use iox_time::TimeProvider;
@@ -460,15 +456,10 @@ impl Compactor {
         Ok(partition)
     }
 
-    /// Group files to be compacted together and level-0 files that will get upgraded
-    /// for a given partition.
-    /// The number of compacting files per group will be limited by their total size and number
-    /// of files.
+    /// Find the level-0 files to be compacted together and upgraded for a given partition.
    pub async fn groups_to_compact_and_files_to_upgrade(
        &self,
        partition_id: PartitionId,
-        compaction_max_size_bytes: i64, // max size of files to get compacted
-        compaction_max_file_count: i64, // max number of files to get compacted
    ) -> Result<CompactAndUpgrade> {
        let mut compact_and_upgrade = CompactAndUpgrade::new(None);
@@ -487,38 +478,38 @@ impl Compactor {
 
        compact_and_upgrade.sequencer_id = Some(parquet_files[0].sequencer_id);
 
-        // Group overlapped files
-        // Each group will be limited by their size and number of files
-        let overlapped_file_groups = Self::overlapped_groups(
-            parquet_files,
-            compaction_max_size_bytes,
-            compaction_max_file_count,
-        )?;
-
-        // Group time-contiguous non-overlapped groups if their total size is smaller than a
-        // threshold
-        let compact_file_groups = Self::group_small_contiguous_groups(
-            overlapped_file_groups,
-            compaction_max_size_bytes,
-            compaction_max_file_count,
-        );
-
        // Attach appropriate tombstones to each file
-        let groups_with_tombstones = self.add_tombstones_to_groups(compact_file_groups).await?;
-        info!("compacting {} groups", groups_with_tombstones.len());
+        let files_with_tombstones = self.add_tombstones_to_files(parquet_files).await?;
+        if let Some(files_with_tombstones) = files_with_tombstones {
+            info!(
+                "compacting {} files",
+                files_with_tombstones.parquet_files.len()
+            );
 
-        // File groups to compact and files to upgrade
-        for group in groups_with_tombstones {
            // Only one file without tombstones, no need to compact.
- if group.parquet_files.len() == 1 && group.tombstones.is_empty() { - // If it is level 0, upgrade it since it is non-overlapping - if group.parquet_files[0].compaction_level == INITIAL_COMPACTION_LEVEL { + if files_with_tombstones.parquet_files.len() == 1 + && files_with_tombstones.tombstones.is_empty() + { + info!( + sequencer_id = files_with_tombstones.parquet_files[0].sequencer_id.get(), + partition_id = partition_id.get(), + parquet_file_id = files_with_tombstones.parquet_files[0] + .parquet_file_id() + .get(), + "no need to compact one file without tombstones" + ); + // If it is level 0, upgrade it + if files_with_tombstones.parquet_files[0].compaction_level + == INITIAL_COMPACTION_LEVEL + { compact_and_upgrade .files_to_upgrade - .push(group.parquet_files[0].parquet_file_id()) + .push(files_with_tombstones.parquet_files[0].parquet_file_id()) } } else { - compact_and_upgrade.groups_to_compact.push(group); + compact_and_upgrade + .groups_to_compact + .push(files_with_tombstones); } } @@ -541,7 +532,6 @@ impl Compactor { table_schema: &TableSchema, partition_id: PartitionId, compact_and_upgrade: CompactAndUpgrade, - max_desired_file_size: i64, ) -> Result<()> { if !compact_and_upgrade.compactable() { return Ok(()); @@ -595,7 +585,6 @@ impl Compactor { table, table_schema, &partition, - max_desired_file_size, ) .await? .into_iter() @@ -684,108 +673,6 @@ impl Compactor { Ok(()) } - // Group time-contiguous non-overlapped groups if their total size is smaller than a threshold - // There are 2 types of input groups - // - // 1. Type-1: Groups that inlcude overlapped files but the groups do not overlap with other - // groups - // 2. Type-2: Groups that overlap with other group - // - // We can only combine time-contiguous type-1 groups to avoid a buggy situation where the - // groups overlap in both time and sequence numbers - // - // Example: - // n_m: chunk with range of sequence numbers [n, m] where n <= m - // - // Input: 5 groups - // . 3 type-1 groups: [1_1, 2_2], [4_4], [7_7], - // . 2 type-2 groups: [5_5], [6_6] - // - // │ │ │ Time - // ──────────────────────────────────────────────────────────────▶ - // ┌─────┐ │ ┌─────┐ │ ┌─────────────────┐ │ ┌─────┐ - // │ 1_1 │ │ 7_7 │ │ 5_5(*) │ │ 4_4 │ - // └─────┘ │ └─────┘ │ └─────────────────┘ │ └─────┘ - // ┌─────┐ ┌──────┐ - // │ 2_2 │ │ │ │ │6_6(*)│ │ - // └─────┘ └──────┘ - // │ │ │ │ - // - // Output: 4 groups - // . [1_1, 2_2, 7_7] - // . [5_5] - // . [6_6] - // . [4_4] - // │ │ Time - // ──────────────────────────────────────────────────────────────▶ - // ┌─────┐ ┌─────┐ │ ┌─────────────────┐ │ ┌─────┐ - // │ 1_1 │ │ 7_7 │ │ 5_5(*) │ │ 4_4 │ - // └─────┘ └─────┘ │ └─────────────────┘ │ └─────┘ - // ┌─────┐ ┌──────┐ - // │ 2_2 │ │ │ │6_6(*)│ │ - // └─────┘ └──────┘ - // │ │ │ - // - // Note that even if groups [6_6] and [4_4] are very small, they cannot be combined - // because chunk 6_6 overlaps with chunk 5_5. 
Combining [6_6] and [4_4] will lead to
-    // a bad compacting result of 2 chunks 5_5 and 4_6 that overlap in both time and sequence
-    // numbers
-
-    fn group_small_contiguous_groups(
-        mut file_groups: Vec<GroupWithMinTimeAndSize>,
-        compaction_max_size_bytes: i64,
-        compaction_max_file_count: i64,
-    ) -> Vec<Vec<ParquetFile>> {
-        let mut groups = Vec::with_capacity(file_groups.len());
-        if file_groups.is_empty() {
-            return groups;
-        }
-
-        // Sort the groups by their min_time
-        file_groups.sort_by_key(|a| a.min_time);
-
-        let mut current_group = vec![];
-        let mut current_size = 0;
-        let mut current_num_files = 0;
-        for g in file_groups {
-            if !g.overlapped_with_other_groups // only group non-overlapped ones
-                && current_size + g.total_file_size_bytes < compaction_max_size_bytes
-                && current_num_files + g.parquet_files.len()
-                    < compaction_max_file_count.try_into().unwrap()
-            {
-                // Group this one with the current_group
-                current_num_files += g.parquet_files.len();
-                current_group.extend(g.parquet_files);
-                current_size += g.total_file_size_bytes;
-            } else {
-                // Current group cannot be combined with its next one
-                if !current_group.is_empty() {
-                    groups.push(current_group);
-                }
-
-                // Create new current group
-                current_num_files = g.parquet_files.len();
-                current_group = g.parquet_files;
-                current_size = g.total_file_size_bytes;
-
-                // If this current group overlaps with other groups, it cannot
-                // be combined with any groups
-                if g.overlapped_with_other_groups {
-                    groups.push(current_group);
-
-                    current_group = vec![];
-                    current_num_files = 0;
-                    current_size = 0;
-                }
-            }
-        }
-
-        // push the last one
-        groups.push(current_group);
-
-        groups
-    }
-
     fn union_tombstones(
         mut tombstones: BTreeMap<TombstoneId, Tombstone>,
         group_with_tombstones: &GroupWithTombstones,
@@ -796,11 +683,7 @@ impl Compactor {
         tombstones
     }
 
-    // Compact given files. The given files are either overlapped or contiguous in time.
-    // The output will include 2 CompactedData sets, one contains a large amount of data of
-    // least recent time and the other has a small amount of data of most recent time. Each
-    // will be persisted in its own file. The idea is when new writes come, they will
-    // mostly overlap with the most recent data only.
+    // Compact given files. The given files are all for the same partition.
    async fn compact(
        &self,
        overlapped_files: Vec<ParquetFileWithTombstone>,
@@ -808,7 +691,6 @@ impl Compactor {
        table: &Table,
        table_schema: &TableSchema,
        partition: &Partition,
-        max_desired_file_size: i64,
    ) -> Result<Vec<CompactedData>> {
        debug!(num_files = overlapped_files.len(), "compact files");
@@ -827,13 +709,7 @@ impl Compactor {
        let namespace_id = table.namespace_id;
        let table_id = table.id;
 
-        // Total size of all files
-        let total_size = overlapped_files
-            .iter()
-            .map(|f| f.file_size_bytes)
-            .sum::<i64>();
-
-        // Collect all unique tombstone
+        // Collect all unique tombstones
        let mut tombstone_map = overlapped_files[0].tombstone_map();
 
        // Verify if the given files belong to the same partition and collect their tombstones.
@@ -914,30 +790,10 @@ impl Compactor {
            .expect("no partition sort key in catalog")
            .filter_to(&merged_schema.primary_key());
 
-        // Identify split time
-        let split_times =
-            Self::compute_split_time(min_time, max_time, total_size, max_desired_file_size);
-
-        // Build compact logical plan
-        let plan = {
-            // split data to compact data into 2 files
-            if split_times.len() == 1 && split_times[0] == max_time {
-                // compact everything into one file
-                ReorgPlanner::new()
-                    .compact_plan(Arc::clone(&merged_schema), query_chunks, sort_key.clone())
-                    .context(CompactLogicalPlanSnafu)?
- } else { - // split compact query plan - ReorgPlanner::new() - .split_plan( - Arc::clone(&merged_schema), - query_chunks, - sort_key.clone(), - split_times, - ) - .context(CompactLogicalPlanSnafu)? - } - }; + // Build compact logical plan, compacting everything into one file + let plan = ReorgPlanner::new() + .compact_plan(Arc::clone(&merged_schema), query_chunks, sort_key.clone()) + .context(CompactLogicalPlanSnafu)?; let ctx = self.exec.new_context(ExecutorType::Reorg); let physical_plan = ctx @@ -947,6 +803,10 @@ impl Compactor { // Run to collect each stream of the plan let stream_count = physical_plan.output_partitioning().partition_count(); + + // Should be compacting to only one file + assert_eq!(stream_count, 1); + let mut compacted = Vec::with_capacity(stream_count); debug!("running plan with {} streams", stream_count); for i in 0..stream_count { @@ -1025,233 +885,6 @@ impl Compactor { Ok(()) } - // Split overlapped groups into smaller groups if there are so mnay files in each group or - // their total size is too large. - // - // Invariants - // - // 1. Input `groups`, each contains overlapped files but the groups do not overlap with each - // other. As a result, a group can include files whose total size is larger than our - // maximum allocated memory. - // 2. Output will be in OverlappedGroups type that include 2 different groups of files: - // - internal_overlapped_groups: groups of the input that are neither contains too many - // files nor their total file size is too large. Each of these groups do not overlap - // with any other groups. - // - external_overlapped_groups: subgroups of splitting too-large-or-too-many-file input - // groups. - // Each of these subgroups will overlap with at least one of the other subgroups. However, - // to correctly deduplicate data, each subgroup only includes contiguous sequence numbers. - // - // Example: - // The input `groups` consists of 4 groups of 9 chunks with time ranges as below. - // The chunks are named using their sequence number. `C4` means this chunk inlcudes - // data of sequence numner 4 (Note: a chunk can include data of a range of sequence numbers - // but to simplify the example, we make chunk contain only one sequence number) - // . G1: includes one chunk C4 - // . G2: inlcudes 4 overlapped C1, C2, C3, C6 - // . G3: inlcudes 3 overlapped C5, C6, C9 - // . G4: inlcudes C7 - // Time - // ────────────────────────────────────────────────────────────────────────────────────▶ - // - // ┌───────┐ ┌───────────┐ ┌───────┐ ┌────────┐ ┌──────────┐ - // │ C4 │ │ C1 │ │ C2 │ │ C5 │ │ C7 │ - // └───────┘ └───────────┘ └───────┘ └────────┘ └──────────┘ - // ┌───────┐ ┌────────────┐ - // │ C3 │ │ C8 │ - // └───────┘ └────────────┘ - // ┌─────────────┐ ┌───────────┐ - // │ C6 │ │ C9 │ - // └─────────────┘ └───────────┘ - // - // ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ - // G1[C4] G2[C1, C2, C3, C6] G3[C5, C8, C9] G4[C7] - // - // - // Since total size of files in G2 and G3 is over the limit `max_size_bytes`, they are split - // further, each into 2 groups. 
The output will inlcude 6 groups as follows: - // Time - // ────────────────────────────────────────────────────────────────────────────────────▶ - // - // ┌───────┐ ┌───────────┐ ┌───────┐ ┌────────┐ ┌──────────┐ - // │ C4 │ │ C1 │ │ C2 │ │ C5 │ │ C7 │ - // └───────┘ └───────────┘ └───────┘ └────────┘ └──────────┘ - // ┌───────┐ ┌────────────┐ - // │ C3 │ │ C8 │ - // └───────┘ └────────────┘ - // ┌─────────────┐ ┌───────────┐ - // │ C6 │ │ C9 │ - // └─────────────┘ └───────────┘ - // - // ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ - // G1[C4] G2[C1, C2]* G4[C5, C8]* G6[C7] - // G3[C3, C6]* G5[C9]* - // - // The newly split groups are G2, G3, G4 and G5 and marked with a star (*) at the end. - // Before splitting, the chunks are sorted by their sequence numbers to guarantee their data - // will be deduplicated correctly in this and future compaction cycles. As a consequence, - // the new group may inlcude non-overlapped chunks, e.g. G2. But this is not an issue becasue: - // . The compaction plan will discover whether the chunks overlap or not to avoid the - // actual deduplication work - // . We do want to compact/concat small continuous non-overlapped chunks (in later steps) so - // this can be considered as a part of that optimization - fn split_overlapped_groups( - groups: &mut Vec>, - max_size_bytes: i64, - max_file_count: i64, - ) -> Result { - let mut overlapped_groups = OverlappedGroups::new(groups.len() * 2); - let max_count = max_file_count.try_into().unwrap(); - for group in groups { - let total_size_bytes: i64 = group.iter().map(|f| f.file_size_bytes).sum(); - if group.len() == 1 || (group.len() <= max_count && total_size_bytes <= max_size_bytes) - { - overlapped_groups.add_internal_overalapped_group(group.to_vec()); - } else { - // Sort overlapped files on their min sequence number to ensure their split - // subgroups contain contiguous sequnce numbers - group.sort_by_key(|f| f.min_sequence_number); - - // Verify that the sorted ranges of [min_sequence_number, max_sequence_number] do - // not overlap if their time ranges overlap - // Note that: `https://github.com/influxdata/conductor/issues/1009` - // 1. the input groups includes time-overlaped files but 2 files in them may NOT - // overlap but both overlap with a third file - // 2. Since we split large compacted result into many files in previous cycles, - // files in this cycle can have exact same range of sequence numbers - // Points 1 and 2 together will lead to many non-time-overlapped files with the - // same sequence number ranges ending up in the same time-overlapped group - for i in 1..group.len() { - Self::verify_contiguous_files(&group[i - 1], &group[i])? - } - - while !group.is_empty() { - // limit file num - let mut count = max_count; - - // limit total file size - let mut size = 0; - for (i, item) in group.iter().enumerate() { - if i >= max_count { - count = max_count; - break; - } - - size += item.file_size_bytes; - if size >= max_size_bytes { - count = i + 1; - break; - } - } - - if count > group.len() { - count = group.len(); - } - let group_new = group.split_off(count); - overlapped_groups.add_external_overalapped_group(group.to_vec()); - *group = group_new; - } - } - } - - Ok(overlapped_groups) - } - - // Panic if the two given time are both time-overlapped and sequence-number-overlapped - // . file_1 and file_2 must belong to the same partition - // . 
file_1 must have range sequence number <= file_2's range sequence number
-    fn verify_contiguous_files(file_1: &ParquetFile, file_2: &ParquetFile) -> Result<()> {
-        assert!(file_1.partition_id == file_2.partition_id);
-        assert!(file_1.min_sequence_number <= file_2.min_sequence_number);
-
-        // Overlap on both time and sequence number
-        if ((file_1.min_time >= file_2.min_time && file_1.min_time <= file_2.max_time)
-            || (file_2.min_time >= file_1.min_time && file_2.min_time <= file_1.max_time))
-            && (file_1.max_sequence_number >= file_2.min_sequence_number)
-        {
-            return Err(crate::compact::Error::OverlapTimeAndSequenceNumber {
-                file_id_1: file_1.id,
-                file_id_2: file_2.id,
-                min_seq_1: file_1.min_sequence_number.get(),
-                max_seq_1: file_1.max_sequence_number.get(),
-                min_seq_2: file_2.min_sequence_number.get(),
-                max_seq_2: file_2.max_sequence_number.get(),
-                partition_id: file_1.partition_id,
-            });
-        }
-
-        Ok(())
-    }
-
-    // Given a list of parquet files that come from the same Table Partition, group files together
-    // if their (min_time, max_time) ranges overlap. Does not preserve or guarantee any ordering.
-    // If there are too many files in an overlapped group, the group will be split to ensure each
-    // group contains a limited number of files
-    fn overlapped_groups(
-        parquet_files: Vec<ParquetFile>,
-        max_size_bytes: i64,
-        max_file_count: i64,
-    ) -> Result<Vec<GroupWithMinTimeAndSize>> {
-        // group overlapped files
-        let mut overlapped_groups =
-            group_potential_duplicates(parquet_files).expect("Error grouping overlapped chunks");
-
-        // split overlapped groups into smaller groups if they include too many files
-        let overlapped_groups =
-            Self::split_overlapped_groups(&mut overlapped_groups, max_size_bytes, max_file_count)?;
-
-        Ok(overlapped_groups.groups_with_min_time_and_size())
-    }
-
-    // Compute times at which to split data.
-    // Returns a list of times at which we want data to be split. The times are computed
-    // based on the max_desired_file_size each file should not exceed and the total_size this
-    // input time range [min_time, max_time] contains.
-    // The split times assume that the data is evenly distributed in the time range; if that is
-    // not the case, the resulting files are not guaranteed to be below max_desired_file_size.
-    // Hence, the range between two contiguous returned times is the fraction
-    // max_desired_file_size/total_size of the time range.
-    // Example:
-    //  . Input
-    //      min_time = 1
-    //      max_time = 21
-    //      total_size = 100
-    //      max_desired_file_size = 30
-    //
-    //  . Percentage = 30/100 = 0.3
-    //  . Time range between 2 times = (21 - 1) * 0.3 = 6
-    //
-    //  . Output = [7, 13, 19] in which
-    //     7 = 1 (min_time) + 6 (time range)
-    //    13 = 7 (previous time) + 6 (time range)
-    //    19 = 13 (previous time) + 6 (time range)
-    fn compute_split_time(
-        min_time: i64,
-        max_time: i64,
-        total_size: i64,
-        max_desired_file_size: i64,
-    ) -> Vec<i64> {
-        // Too small to split
-        if total_size <= max_desired_file_size {
-            return vec![max_time];
-        }
-
-        let mut split_times = vec![];
-        let percentage = max_desired_file_size as f64 / total_size as f64;
-        let mut min = min_time;
-        loop {
-            let split_time = min + ((max_time - min_time) as f64 * percentage).floor() as i64;
-            if split_time < max_time {
-                split_times.push(split_time);
-                min = split_time;
-            } else {
-                break;
-            }
-        }
-
-        split_times
-    }
-
     // remove fully processed tombstones
     async fn remove_fully_processed_tombstones(
         &self,
@@ -1338,127 +971,78 @@ impl Compactor {
         count_pf == count_pt
     }
 
-    async fn add_tombstones_to_groups(
+    async fn add_tombstones_to_files(
        &self,
-        groups: Vec<Vec<ParquetFile>>,
-    ) -> Result<Vec<GroupWithTombstones>> {
+        parquet_files: Vec<ParquetFile>,
+    ) -> Result<Option<GroupWithTombstones>> {
        let mut repo = self.catalog.repositories().await;
        let tombstone_repo = repo.tombstones();
 
-        let mut overlapped_file_with_tombstones_groups = Vec::with_capacity(groups.len());
-
-        // For each group of overlapping parquet files,
-        for parquet_files in groups {
-            // Skip over any empty groups
-            if parquet_files.is_empty() {
-                continue;
-            }
-
-            // Find the time range of the group
-            let overall_min_time = parquet_files
-                .iter()
-                .map(|pf| pf.min_time)
-                .min()
-                .expect("The group was checked for emptiness above");
-            let overall_max_time = parquet_files
-                .iter()
-                .map(|pf| pf.max_time)
-                .max()
-                .expect("The group was checked for emptiness above");
-            // For a tombstone to be relevant to any parquet file, the tombstone must have a
-            // sequence number greater than the parquet file's max_sequence_number. If we query
-            // for all tombstones with a sequence number greater than the smallest parquet file
-            // max_sequence_number in the group, we'll get all tombstones that could possibly
-            // be relevant for this group.
-            let overall_min_max_sequence_number = parquet_files
-                .iter()
-                .map(|pf| pf.max_sequence_number)
-                .min()
-                .expect("The group was checked for emptiness above");
-
-            // Query the catalog for the tombstones that could be relevant to any parquet files
-            // in this group.
-            let tombstones = tombstone_repo
-                .list_tombstones_for_time_range(
-                    // We've previously grouped the parquet files by sequence and table IDs, so
-                    // these values will be the same for all parquet files in the group.
-                    parquet_files[0].sequencer_id,
-                    parquet_files[0].table_id,
-                    overall_min_max_sequence_number,
-                    overall_min_time,
-                    overall_max_time,
-                )
-                .await
-                .context(QueryingTombstonesSnafu)?;
-
-            let parquet_files = parquet_files
-                .into_iter()
-                .map(|data| {
-                    // Filter the set of tombstones relevant to any file in the group to just those
-                    // relevant to this particular parquet file.
- let relevant_tombstones = tombstones - .iter() - .cloned() - .filter(|t| { - t.sequence_number > data.max_sequence_number - && ((t.min_time <= data.min_time && t.max_time >= data.min_time) - || (t.min_time > data.min_time && t.min_time <= data.max_time)) - }) - .collect(); - - ParquetFileWithTombstone::new(Arc::new(data), relevant_tombstones) - }) - .collect(); - - overlapped_file_with_tombstones_groups.push(GroupWithTombstones { - parquet_files, - tombstones, - }); + // Skip over any empty groups + if parquet_files.is_empty() { + return Ok(None); } - Ok(overlapped_file_with_tombstones_groups) - } -} + // Find the time range of the group + let overall_min_time = parquet_files + .iter() + .map(|pf| pf.min_time) + .min() + .expect("The group was checked for emptiness above"); + let overall_max_time = parquet_files + .iter() + .map(|pf| pf.max_time) + .max() + .expect("The group was checked for emptiness above"); + // For a tombstone to be relevant to any parquet file, the tombstone must have a + // sequence number greater than the parquet file's max_sequence_number. If we query + // for all tombstones with a sequence number greater than the smallest parquet file + // max_sequence_number in the group, we'll get all tombstones that could possibly + // be relevant for this group. + let overall_min_max_sequence_number = parquet_files + .iter() + .map(|pf| pf.max_sequence_number) + .min() + .expect("The group was checked for emptiness above"); -#[derive(Debug, Clone, PartialEq)] -struct OverlappedGroups { - // Groups that contain overlapped files but the groups do not overlap - // with other groups - internal_overlapped_groups: Vec>, + // Query the catalog for the tombstones that could be relevant to any parquet files + // in this group. + let tombstones = tombstone_repo + .list_tombstones_for_time_range( + // We've previously grouped the parquet files by sequence and table IDs, so + // these values will be the same for all parquet files in the group. + parquet_files[0].sequencer_id, + parquet_files[0].table_id, + overall_min_max_sequence_number, + overall_min_time, + overall_max_time, + ) + .await + .context(QueryingTombstonesSnafu)?; - // Groups that overlap with at least one of other groups - external_overlapped_groups: Vec>, -} + let parquet_files = parquet_files + .into_iter() + .map(|data| { + // Filter the set of tombstones relevant to any file in the group to just those + // relevant to this particular parquet file. 
+ let relevant_tombstones = tombstones + .iter() + .cloned() + .filter(|t| { + t.sequence_number > data.max_sequence_number + && ((t.min_time <= data.min_time && t.max_time >= data.min_time) + || (t.min_time > data.min_time && t.min_time <= data.max_time)) + }) + .collect(); -impl OverlappedGroups { - pub fn new(len: usize) -> Self { - Self { - internal_overlapped_groups: Vec::with_capacity(len), - external_overlapped_groups: Vec::with_capacity(len), - } - } + ParquetFileWithTombstone::new(Arc::new(data), relevant_tombstones) + }) + .collect(); - pub fn add_internal_overalapped_group(&mut self, group: Vec) { - self.internal_overlapped_groups.push(group); - } - - pub fn add_external_overalapped_group(&mut self, group: Vec) { - self.external_overlapped_groups.push(group); - } - - pub fn groups_with_min_time_and_size(self) -> Vec { - let mut groups = Vec::with_capacity( - self.internal_overlapped_groups.len() + self.external_overlapped_groups.len(), - ); - - for group in self.internal_overlapped_groups { - groups.push(GroupWithMinTimeAndSize::new(group, false)); - } - for group in self.external_overlapped_groups { - groups.push(GroupWithMinTimeAndSize::new(group, true)); - } - - groups + Ok(Some(GroupWithTombstones { + parquet_files, + tombstones, + })) } } @@ -1512,48 +1096,11 @@ mod tests { use iox_time::SystemProvider; use parquet_file::ParquetFilePath; use schema::{selection::Selection, sort::SortKey}; - use std::sync::atomic::{AtomicI64, Ordering}; use test_helpers::maybe_start_logging; - // Simulate unique ID generation - static NEXT_ID: AtomicI64 = AtomicI64::new(0); - static TEST_MAX_SIZE_BYTES: i64 = 100000; - static TEST_MAX_FILE_COUNT: i64 = 10; - - #[tokio::test] - async fn test_compute_split_time() { - let min_time = 1; - let max_time = 11; - let total_size = 100; - let max_desired_file_size = 100; - - // no split - let result = - Compactor::compute_split_time(min_time, max_time, total_size, max_desired_file_size); - assert_eq!(result.len(), 1); - assert_eq!(result[0], max_time); - - // split 70% and 30% - let max_desired_file_size = 70; - let result = - Compactor::compute_split_time(min_time, max_time, total_size, max_desired_file_size); - // only need to store the last split time - assert_eq!(result.len(), 1); - assert_eq!(result[0], 8); // = 1 (min_time) + 7 - - // split 40%, 40%, 20% - let max_desired_file_size = 40; - let result = - Compactor::compute_split_time(min_time, max_time, total_size, max_desired_file_size); - // store first and second split time - assert_eq!(result.len(), 2); - assert_eq!(result[0], 5); // = 1 (min_time) + 4 - assert_eq!(result[1], 9); // = 5 (previous split_time) + 4 - } - #[tokio::test] // This is integration test to verify all pieces are put together correctly - async fn test_compact_partition() { + async fn test_compact_partition_one_file_one_tombstone() { test_helpers::maybe_start_logging(); let catalog = TestCatalog::new(); @@ -1596,7 +1143,7 @@ mod tests { .with_sequencer(&sequencer) .create_tombstone(20, 6000, 12000, "tag1=VT") .await; - // Should have 1 tomstone + // Should have 1 tombstone let count = catalog.count_tombstones_for_table(table.table.id).await; assert_eq!(count, 1); @@ -1625,11 +1172,7 @@ mod tests { ); let compact_and_upgrade = compactor - .groups_to_compact_and_files_to_upgrade( - partition.partition.id, - compactor.config.compaction_max_size_bytes(), - compactor.config.compaction_max_file_count(), - ) + .groups_to_compact_and_files_to_upgrade(partition.partition.id) .await .unwrap(); compactor @@ -1639,23 +1182,19 @@ mod 
tests { &table_schema, partition.partition.id, compact_and_upgrade, - compaction_max_size_bytes, ) .await .unwrap(); - // should have 2 non-deleted level_0 files. The original file was marked deleted and not counted + // should have 1 non-deleted level-1 file. The original file was marked deleted and not + // counted. let mut files = catalog.list_by_table_not_to_delete(table.table.id).await; - assert_eq!(files.len(), 2); - // 2 newly created level-1 files as the result of compaction + assert_eq!(files.len(), 1); + // 1 newly created level-1 file as the result of compaction assert_eq!( (files[0].id.get(), files[0].compaction_level), (2, FILE_NON_OVERLAPPED_COMAPCTION_LEVEL) ); - assert_eq!( - (files[1].id.get(), files[1].compaction_level), - (3, FILE_NON_OVERLAPPED_COMAPCTION_LEVEL) - ); // processed tombstones created and deleted inside compact_partition function let count = catalog @@ -1666,18 +1205,17 @@ mod tests { let count = catalog.count_tombstones_for_table(table.table.id).await; assert_eq!(count, 0); - // Verify the files were pushed to the object store + // Verify the file was pushed to the object store let object_store = catalog.object_store(); let list = object_store.list(None).await.unwrap(); let object_store_files: Vec<_> = list.try_collect().await.unwrap(); - // Original + 2 compacted - assert_eq!(object_store_files.len(), 3); + // Original + 1 compacted + assert_eq!(object_store_files.len(), 2); // ------------------------------------------------ // Verify the parquet file content // query the chunks - // most recent compacted second half (~10%) let files1 = files.pop().unwrap(); let batches = read_parquet_file(&table, files1).await; assert_batches_sorted_eq!( @@ -1686,18 +1224,6 @@ mod tests { "| field_int | tag1 | time |", "+-----------+------+-----------------------------+", "| 70 | UT | 1970-01-01T00:00:00.000020Z |", - "+-----------+------+-----------------------------+", - ], - &batches - ); - // least recent compacted first half (~90%) - let files2 = files.pop().unwrap(); - let batches = read_parquet_file(&table, files2).await; - assert_batches_sorted_eq!( - &[ - "+-----------+------+-----------------------------+", - "| field_int | tag1 | time |", - "+-----------+------+-----------------------------+", "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", "+-----------+------+-----------------------------+", ], @@ -1780,8 +1306,10 @@ mod tests { Arc::new(metric::Registry::new()), ); - // parquet files - // pf1 does not overlap with any and very large ==> will be upgraded to level 1 during compaction + // parquet files that are all in the same partition and should all end up in the same + // compacted file + + // pf1 does not overlap with any and is very large partition .create_parquet_file_with_min_max_size_and_creation_time( &lp1, @@ -1793,7 +1321,7 @@ mod tests { 20, ) .await; - // pf2 overlaps with pf3 ==> compacted and marked to_delete with a timestamp + // pf2 overlaps with pf3 partition .create_parquet_file_with_min_max_size_and_creation_time( &lp2, @@ -1801,11 +1329,11 @@ mod tests { 5, 8000, 20000, - 100, // smal file + 100, // small file time.now().timestamp_nanos(), ) .await; - // pf3 overlaps with pf2 ==> compacted and marked to_delete with a timestamp + // pf3 overlaps with pf2 partition .create_parquet_file_with_min_max_size_and_creation_time( &lp3, @@ -1817,7 +1345,8 @@ mod tests { time.now().timestamp_nanos(), ) .await; - // pf4 does not overlap with any but small => will also be compacted with pf2 and pf3 + + // pf4 does not overlap with any but is small 
partition
            .create_parquet_file_with_min_max_size_and_creation_time(
                &lp4,
                1,
                18,
                26000,
                28000,
                2500,
                time.now().timestamp_nanos(),
            )
            .await;
+
        // should have 4 level-0 files before compacting
        let count = catalog.count_level_0_files(sequencer.sequencer.id).await;
        assert_eq!(count, 4);
@@ -1849,7 +1379,7 @@ mod tests {
            .with_sequencer(&sequencer)
            .create_tombstone(22, 1000, 2000, "tag1=VT")
            .await;
-        // should have 3 tomstones
+        // should have 3 tombstones
        let count = catalog.count_tombstones_for_table(table.table.id).await;
        assert_eq!(count, 3);
        // should not have any processed tombstones for any tombstones
@@ -1863,11 +1393,7 @@ mod tests {
        // ------------------------------------------------
        // Compact
        let compact_and_upgrade = compactor
-            .groups_to_compact_and_files_to_upgrade(
-                partition.partition.id,
-                compactor.config.compaction_max_size_bytes(),
-                compactor.config.compaction_max_file_count(),
-            )
+            .groups_to_compact_and_files_to_upgrade(partition.partition.id)
            .await
            .unwrap();
        compactor
@@ -1877,23 +1403,18 @@ mod tests {
                &table_schema,
                partition.partition.id,
                compact_and_upgrade,
-                compactor.config.compaction_max_size_bytes(),
            )
            .await
            .unwrap();
 
-        // Should have 2 non-soft-deleted files: pf1 not compacted and stays, and 1 newly created
-        // after compacting pf2, pf3, pf4 all very small into one file
+        // Should have 1 non-soft-deleted file: the one newly created after compacting pf1, pf2,
+        // pf3, pf4 all into one file
        let mut files = catalog.list_by_table_not_to_delete(table.table.id).await;
-        assert_eq!(files.len(), 2);
+        assert_eq!(files.len(), 1);
-        // pf1 upgraded to level FILE_NON_OVERLAPPED_COMAPCTION_LEVEL
+        // 1 newly created level-FILE_NON_OVERLAPPED_COMAPCTION_LEVEL file as the result of
+        // compaction
        assert_eq!(
            (files[0].id.get(), files[0].compaction_level),
-            (1, FILE_NON_OVERLAPPED_COMAPCTION_LEVEL)
-        );
-        // 1 newly created level FILE_NON_OVERLAPPED_COMAPCTION_LEVEL file as the result of compaction
-        assert_eq!(
-            (files[1].id.get(), files[1].compaction_level),
            (5, FILE_NON_OVERLAPPED_COMAPCTION_LEVEL)
        );
@@ -1912,34 +1433,22 @@ mod tests {
        // Verify the parquet file content
        // Compacted file
-        let file2 = files.pop().unwrap();
-        let batches = read_parquet_file(&table, file2).await;
-        assert_batches_sorted_eq!(
-            &[
-                "+-----------+------+------+------+-----------------------------+",
-                "| field_int | tag1 | tag2 | tag3 | time                        |",
-                "+-----------+------+------+------+-----------------------------+",
-                "| 10        | VT   |      |      | 1970-01-01T00:00:00.000006Z |",
-                "| 1500      | WA   |      |      | 1970-01-01T00:00:00.000008Z |",
-                "| 270       | UT   |      |      | 1970-01-01T00:00:00.000025Z |",
-                "| 70        | UT   |      |      | 1970-01-01T00:00:00.000020Z |",
-                "| 1600      |      | WA   | 10   | 1970-01-01T00:00:00.000028Z |",
-                "| 20        |      | VT   | 20   | 1970-01-01T00:00:00.000026Z |",
-                "+-----------+------+------+------+-----------------------------+",
-            ],
-            &batches
-        );
-        // Non-compacted file
        let file1 = files.pop().unwrap();
        let batches = read_parquet_file(&table, file1).await;
        assert_batches_sorted_eq!(
            &[
-                "+-----------+------+--------------------------------+",
-                "| field_int | tag1 | time                           |",
-                "+-----------+------+--------------------------------+",
-                "| 10        | VT   | 1970-01-01T00:00:00.000000020Z |",
-                "| 1000      | WA   | 1970-01-01T00:00:00.000000010Z |",
-                "+-----------+------+--------------------------------+",
+                "+-----------+------+------+------+--------------------------------+",
+                "| field_int | tag1 | tag2 | tag3 | time                           |",
+                "+-----------+------+------+------+--------------------------------+",
+                "| 10        | VT   |      |      | 
1970-01-01T00:00:00.000000020Z |", + "| 10 | VT | | | 1970-01-01T00:00:00.000006Z |", + "| 1000 | WA | | | 1970-01-01T00:00:00.000000010Z |", + "| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |", + "| 270 | UT | | | 1970-01-01T00:00:00.000025Z |", + "| 70 | UT | | | 1970-01-01T00:00:00.000020Z |", + "| 1600 | | WA | 10 | 1970-01-01T00:00:00.000028Z |", + "| 20 | | VT | 20 | 1970-01-01T00:00:00.000026Z |", + "+-----------+------+------+------+--------------------------------+", ], &batches ); @@ -1948,7 +1457,7 @@ mod tests { } #[tokio::test] - async fn test_compact_one_file() { + async fn test_compact_one_file_no_tombstones_is_complete() { let catalog = TestCatalog::new(); let lp = vec![ @@ -2007,7 +1516,6 @@ mod tests { &table.table, &table_schema, &partition.partition, - compactor.config.compaction_max_size_bytes(), ) .await .unwrap(); @@ -2015,7 +1523,7 @@ mod tests { // ------------------------------------------------ // File without tombstones - let mut pf = ParquetFileWithTombstone::new(Arc::new(parquet_file), vec![]); + let pf = ParquetFileWithTombstone::new(Arc::new(parquet_file), vec![]); // Nothing compacted for one file without tombstones let result = compactor .compact( @@ -2024,164 +1532,10 @@ mod tests { &table.table, &table_schema, &partition.partition, - compactor.config.compaction_max_size_bytes(), ) .await .unwrap(); assert!(result.is_empty()); - - // ------------------------------------------------ - // Let add a tombstone - let tombstone = table - .with_sequencer(&sequencer) - .create_tombstone(20, 6000, 12000, "tag1=VT") - .await; - pf.add_tombstones(vec![tombstone.tombstone.clone()]); - - // should have compacted data - let batches = compactor - .compact( - vec![pf], - &ns.namespace, - &table.table, - &table_schema, - &partition.partition, - compactor.config.compaction_max_size_bytes(), - ) - .await - .unwrap(); - // One output batch because the input is too small to split - assert_eq!(batches.len(), 1); - - // Collect the results for inspection. 
- let batches = batches - .into_iter() - .map(|v| async { - datafusion::physical_plan::common::collect(v.data) - .await - .expect("failed to collect record batches") - }) - .collect::>() - .collect::>() - .await; - - // Data: row tag1=VT was removed - assert_batches_sorted_eq!( - &[ - "+-----------+------+-----------------------------+", - "| field_int | tag1 | time |", - "+-----------+------+-----------------------------+", - "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", - "| 70 | UT | 1970-01-01T00:00:00.000020Z |", - "+-----------+------+-----------------------------+", - ], - &batches[0] - ); - } - - #[tokio::test] - async fn test_compact_one_file_no_split() { - maybe_start_logging(); - let catalog = TestCatalog::new(); - - let lp = vec![ - "table,tag1=WA field_int=1000i 8000", - "table,tag1=VT field_int=10i 10000", - "table,tag1=UT field_int=70i 20000", - ] - .join("\n"); - let ns = catalog.create_namespace("ns").await; - let sequencer = ns.create_sequencer(1).await; - let table = ns.create_table("table").await; - table.create_column("tag1", ColumnType::Tag).await; - table.create_column("field_int", ColumnType::I64).await; - table.create_column("time", ColumnType::Time).await; - let table_schema = table.catalog_schema().await; - let partition = table - .with_sequencer(&sequencer) - .create_partition("part") - .await; - let parquet_file = partition - .create_parquet_file_with_min_max(&lp, 1, 1, 8000, 20000) - .await - .parquet_file; - - let split_percentage = 100; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; - let compactor = Compactor::new( - vec![sequencer.sequencer.id], - Arc::clone(&catalog.catalog), - ParquetStorage::new(Arc::clone(&catalog.object_store)), - Arc::new(Executor::new(1)), - Arc::new(SystemProvider::new()), - BackoffConfig::default(), - // split_percentage = 100 which means no split - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), - Arc::new(metric::Registry::new()), - ); - - let sort_key = SortKey::from_columns(["tag1", "time"]); - let partition = partition.update_sort_key(sort_key).await; - - // ------------------------------------------------ - // Let add a tombstone - let tombstone = table - .with_sequencer(&sequencer) - .create_tombstone(20, 6000, 12000, "tag1=VT") - .await; - let pf = ParquetFileWithTombstone::new( - Arc::new(parquet_file), - vec![tombstone.tombstone.clone()], - ); - - // should have compacted datas - let batches = compactor - .compact( - vec![pf], - &ns.namespace, - &table.table, - &table_schema, - &partition.partition, - compactor.config.compaction_max_size_bytes(), - ) - .await - .unwrap(); - // 1 output set becasue split rule = 100% - assert_eq!(batches.len(), 1); - - // Collect the results for inspection. 
- let batches = batches - .into_iter() - .map(|v| async { - datafusion::physical_plan::common::collect(v.data) - .await - .expect("failed to collect record batches") - }) - .collect::>() - .collect::>() - .await; - - // Data: row tag1=VT was removed - assert_batches_sorted_eq!( - &[ - "+-----------+------+-----------------------------+", - "| field_int | tag1 | time |", - "+-----------+------+-----------------------------+", - "| 1000 | WA | 1970-01-01T00:00:00.000008Z |", - "| 70 | UT | 1970-01-01T00:00:00.000020Z |", - "+-----------+------+-----------------------------+", - ], - &batches[0] - ); } #[tokio::test] @@ -2213,7 +1567,7 @@ mod tests { .with_sequencer(&sequencer) .create_partition("part") .await; - // Create 2 parquet files with total size = 140000 (file1) + 100000 (file2) = 240000 + // Create 2 parquet files in the same partition let parquet_file1 = partition .create_parquet_file_with_min_max_size(&lp1, 1, 5, 8000, 20000, 140000) .await @@ -2260,7 +1614,7 @@ mod tests { // File 2 without tombstones let pf2 = ParquetFileWithTombstone::new(Arc::new(parquet_file2), vec![]); - // Compact them + // Compact them into 1 batch/file let batches = compactor .compact( vec![pf1, pf2], @@ -2268,12 +1622,10 @@ mod tests { &table.table, &table_schema, &partition.partition, - compactor.config.compaction_max_size_bytes(), ) .await .unwrap(); - // 3 sets based on 42% split rule = 100000 (max_desired_file_size) / 240000 (total_size of 2 files) - assert_eq!(batches.len(), 3); + assert_eq!(batches.len(), 1); // Collect the results for inspection. let batches = batches @@ -2294,34 +1646,14 @@ mod tests { "+-----------+------+-----------------------------+", "| field_int | tag1 | time |", "+-----------+------+-----------------------------+", + "| 70 | UT | 1970-01-01T00:00:00.000020Z |", + "| 270 | UT | 1970-01-01T00:00:00.000025Z |", "| 10 | VT | 1970-01-01T00:00:00.000006Z |", "| 1500 | WA | 1970-01-01T00:00:00.000008Z |", "+-----------+------+-----------------------------+", ], &batches[0] ); - // second set contains the next least recent one row - assert_batches_sorted_eq!( - &[ - "+-----------+------+-----------------------------+", - "| field_int | tag1 | time |", - "+-----------+------+-----------------------------+", - "| 70 | UT | 1970-01-01T00:00:00.000020Z |", - "+-----------+------+-----------------------------+", - ], - &batches[1] - ); - // third set contains most recent one row - assert_batches_sorted_eq!( - &[ - "+-----------+------+-----------------------------+", - "| field_int | tag1 | time |", - "+-----------+------+-----------------------------+", - "| 270 | UT | 1970-01-01T00:00:00.000025Z |", - "+-----------+------+-----------------------------+", - ], - &batches[2] - ); } #[tokio::test] @@ -2418,7 +1750,7 @@ mod tests { // File 3 without tombstones let pf3 = ParquetFileWithTombstone::new(Arc::new(parquet_file3), vec![]); - // Compact them + // Compact them into 1 batch/file let batches = compactor .compact( vec![pf1.clone(), pf2.clone(), pf3.clone()], @@ -2426,17 +1758,13 @@ mod tests { &table.table, &table_schema, &partition.partition, - compactor.config.compaction_max_size_bytes(), ) .await .unwrap(); - - // 2 sets based on the 83% split rule: 100000 (max_desired_file_size) / 120000 (total fle size) - assert_eq!(batches.len(), 2); + assert_eq!(batches.len(), 1); // Sort keys should be the same as was passed in to compact assert_eq!(batches[0].meta.sort_key.as_ref().unwrap(), &sort_key); - assert_eq!(batches[1].meta.sort_key.as_ref().unwrap(), &sort_key); // Collect the 
results for inspection. let batches = batches @@ -2451,7 +1779,6 @@ mod tests { .await; // Data: Should have 6 rows left - // first set contains least recent 5 rows assert_batches_sorted_eq!( &[ "+-----------+------+------+------+-----------------------------+", @@ -2462,69 +1789,11 @@ mod tests { "| 1500 | | WA | 10 | 1970-01-01T00:00:00.000008Z |", "| 1500 | WA | | | 1970-01-01T00:00:00.000008Z |", "| 70 | UT | | | 1970-01-01T00:00:00.000020Z |", + "| 270 | UT | | | 1970-01-01T00:00:00.000025Z |", "+-----------+------+------+------+-----------------------------+", ], &batches[0] ); - // second set contains most recent one row - assert_batches_sorted_eq!( - &[ - "+-----------+------+------+------+-----------------------------+", - "| field_int | tag1 | tag2 | tag3 | time |", - "+-----------+------+------+------+-----------------------------+", - "| 270 | UT | | | 1970-01-01T00:00:00.000025Z |", - "+-----------+------+------+------+-----------------------------+", - ], - &batches[1] - ); - } - - /// A test utility function to make minimially-viable ParquetFile records with particular - /// min/max times. Does not involve the catalog at all. - fn arbitrary_parquet_file(min_time: i64, max_time: i64) -> ParquetFile { - arbitrary_parquet_file_with_size(min_time, max_time, 100) - } - - fn arbitrary_parquet_file_with_size_and_sequence_number( - min_time: i64, - max_time: i64, - min_seq: i64, - max_seq: i64, - file_size_bytes: i64, - ) -> ParquetFile { - let id = NEXT_ID.fetch_add(1, Ordering::SeqCst); - ParquetFile { - id: ParquetFileId::new(id), - sequencer_id: SequencerId::new(0), - namespace_id: NamespaceId::new(0), - table_id: TableId::new(0), - partition_id: PartitionId::new(0), - object_store_id: Uuid::new_v4(), - min_sequence_number: SequenceNumber::new(min_seq), - max_sequence_number: SequenceNumber::new(max_seq), - min_time: Timestamp::new(min_time), - max_time: Timestamp::new(max_time), - to_delete: None, - file_size_bytes, - row_count: 0, - compaction_level: INITIAL_COMPACTION_LEVEL, // level of file of new writes - created_at: Timestamp::new(1), - column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), - } - } - - fn arbitrary_parquet_file_with_size( - min_time: i64, - max_time: i64, - file_size_bytes: i64, - ) -> ParquetFile { - arbitrary_parquet_file_with_size_and_sequence_number( - min_time, - max_time, - 0, - 1, - file_size_bytes, - ) } #[tokio::test] @@ -2653,1167 +1922,6 @@ mod tests { ); } - #[test] - fn test_overlapped_groups_no_overlap() { - // Given two files that don't overlap, - let pf1 = arbitrary_parquet_file(1, 2); - let pf2 = arbitrary_parquet_file(3, 4); - - let groups = Compactor::overlapped_groups( - vec![pf1.clone(), pf2.clone()], - TEST_MAX_SIZE_BYTES, - TEST_MAX_FILE_COUNT, - ) - .unwrap(); - - // They should be 2 groups - assert_eq!(groups.len(), 2, "There should have been two group"); - - assert!(groups[0].parquet_files.contains(&pf1)); - assert!(groups[1].parquet_files.contains(&pf2)); - } - - #[test] - fn test_overlapped_groups_with_overlap() { - // Given two files that do overlap, - let pf1 = arbitrary_parquet_file(1, 3); - let pf2 = arbitrary_parquet_file(2, 4); - - let groups = Compactor::overlapped_groups( - vec![pf1.clone(), pf2.clone()], - TEST_MAX_SIZE_BYTES, - TEST_MAX_FILE_COUNT, - ) - .unwrap(); - - // They should be in one group (order not guaranteed) - assert_eq!(groups.len(), 1, "There should have only been one group"); - - let group = &groups[0]; - assert_eq!( - group.parquet_files.len(), - 2, - "The one group should have contained 2 
items" - ); - assert!(group.parquet_files.contains(&pf1)); - assert!(group.parquet_files.contains(&pf2)); - } - - #[test] - fn test_overlapped_groups_many_groups() { - let overlaps_many = arbitrary_parquet_file(5, 10); - let contained_completely_within = arbitrary_parquet_file(6, 7); - let max_equals_min = arbitrary_parquet_file(3, 5); - let min_equals_max = arbitrary_parquet_file(10, 12); - - let alone = arbitrary_parquet_file(30, 35); - - let another = arbitrary_parquet_file(13, 15); - let partial_overlap = arbitrary_parquet_file(14, 16); - - // Given a bunch of files in an arbitrary order, - let all = vec![ - min_equals_max.clone(), - overlaps_many.clone(), - alone.clone(), - another.clone(), - max_equals_min.clone(), - contained_completely_within.clone(), - partial_overlap.clone(), - ]; - - let mut groups = - Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap(); - dbg!(&groups); - - assert_eq!(groups.len(), 3); - - // Order of the groups is not guaranteed; sort by min_time of group so we can test membership - groups.sort_by_key(|g| g.min_time); - - let alone_group = &groups[2]; - assert_eq!(alone_group.min_time, Timestamp::new(30)); - assert!( - alone_group.parquet_files.contains(&alone), - "Actually contains: {:#?}", - alone_group - ); - - let another_group = &groups[1]; - assert_eq!(another_group.min_time, Timestamp::new(13)); - assert!( - another_group.parquet_files.contains(&another), - "Actually contains: {:#?}", - another_group - ); - assert!( - another_group.parquet_files.contains(&partial_overlap), - "Actually contains: {:#?}", - another_group - ); - - let many_group = &groups[0]; - assert_eq!(many_group.min_time, Timestamp::new(3)); - assert!( - many_group.parquet_files.contains(&overlaps_many), - "Actually contains: {:#?}", - many_group - ); - assert!( - many_group - .parquet_files - .contains(&contained_completely_within), - "Actually contains: {:#?}", - many_group - ); - assert!( - many_group.parquet_files.contains(&max_equals_min), - "Actually contains: {:#?}", - many_group - ); - assert!( - many_group.parquet_files.contains(&min_equals_max), - "Actually contains: {:#?}", - many_group - ); - } - - #[test] - fn test_group_small_contiguous_overlapped_groups() { - // Given two files that don't overlap, - let pf1 = arbitrary_parquet_file_with_size(1, 2, 100); - let pf2 = arbitrary_parquet_file_with_size(3, 4, 200); - - let overlapped_groups = Compactor::overlapped_groups( - vec![pf1.clone(), pf2.clone()], - TEST_MAX_SIZE_BYTES, - TEST_MAX_FILE_COUNT, - ) - .unwrap(); - // 2 overlapped groups - assert_eq!(overlapped_groups.len(), 2); - let g1 = GroupWithMinTimeAndSize { - parquet_files: vec![pf1.clone()], - min_time: Timestamp::new(1), - total_file_size_bytes: 100, - overlapped_with_other_groups: false, - }; - let g2 = GroupWithMinTimeAndSize { - parquet_files: vec![pf2.clone()], - min_time: Timestamp::new(3), - total_file_size_bytes: 200, - overlapped_with_other_groups: false, - }; - // They should each be in their own group - assert_eq!(overlapped_groups, vec![g1, g2]); - - // Group them by size - let compaction_max_size_bytes = 100000; - let groups = Compactor::group_small_contiguous_groups( - overlapped_groups, - compaction_max_size_bytes, - TEST_MAX_FILE_COUNT, - ); - // 2 small groups should be grouped in one - assert_eq!(groups.len(), 1); - assert_eq!(groups, vec![vec![pf1, pf2]]); - } - - #[test] - fn test_split_overlapped_groups_panic() { - let max_size_bytes = 1000; - let max_file_count = 2; - - // 2 files, each smaller than 
max_size_bytes but total is larger --> will be split - // files have overlapped range of sequence numbers: [5, 11] and [7, 15] - let f1 = - arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 11, max_size_bytes - 10); - let f2 = - arbitrary_parquet_file_with_size_and_sequence_number(7, 11, 7, 15, max_size_bytes - 10); - let mut groups = vec![vec![f1, f2]]; - - // should error - let err = Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count) - .unwrap_err(); - assert!(err - .to_string() - .contains("Two time-overlapped files with overlapped range sequence numbers")) - } - - // test of `https://github.com/influxdata/conductor/issues/1009` - #[test] - fn test_split_overlapped_same_sequence_for_non_time_overlapped() { - let max_size_bytes = 1000; - let max_file_count = 2; - - // f1 anf f2 are not time-overlapped but have same sequence number range [5, 6] - // f3 overlaps with both f1 and f2 but have higher range sequence number [7, 7] - let f1 = - arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 6, max_size_bytes - 10); - let f2 = - arbitrary_parquet_file_with_size_and_sequence_number(11, 20, 5, 6, max_size_bytes - 10); - let f3 = - arbitrary_parquet_file_with_size_and_sequence_number(8, 12, 7, 7, max_size_bytes - 10); - let mut groups = vec![vec![f1, f2, f3]]; - - // should not panic - Compactor::split_overlapped_groups(&mut groups, max_size_bytes, max_file_count).unwrap(); - } - - // This is specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files - // Keep all the variables the same names in both tests for us to follow them easily - #[test] - fn test_split_overlapped_groups_sequence_number_same_order_with_time() { - let compaction_max_size_bytes = 100000; - - // oldest overlapped and very small - let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 400); - let contained_completely_within = - arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500); - let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); - let min_equals_max = - arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500); - let oldest_overlapped_group = vec![ - overlaps_many.clone(), - contained_completely_within.clone(), - max_equals_min.clone(), - min_equals_max.clone(), - ]; - - // newest files and very large - let alone = arbitrary_parquet_file_with_size_and_sequence_number( - 30, - 35, - 30, - 30, - compaction_max_size_bytes + 200, - ); // too large to group - let newest_overlapped_group = vec![alone.clone()]; - - // small files in the middle - let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 13, 13, 1000); - let partial_overlap = - arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 2000); - let middle_overlapped_group = vec![another.clone(), partial_overlap.clone()]; - - let mut overlapped_groups = vec![ - oldest_overlapped_group, - newest_overlapped_group, - middle_overlapped_group, - ]; - - let max_size_bytes = 1000; - let max_file_count = 2; - - // Three input groups but will produce 5 output ones becasue of limit in size and file count. - // Note that the 3 input groups each includes overlapped files but the groups do not overlap. - // The function split_overlapped_groups is to split each overlapped group. It does not merge any groups. 
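An aside on the invariant these (now-removed) tests exercised: two files must not be separated into different compaction groups when they overlap in both time range and sequence-number range, because deduplication between them would then depend on an ordering that the split destroys. The following is a minimal, self-contained sketch of that check, not code from this patch; FileMeta and its plain i64 fields are hypothetical stand-ins for the crate's ParquetFile, Timestamp, and SequenceNumber types.

    // Hypothetical stand-in for the relevant fields of a ParquetFile.
    struct FileMeta {
        min_time: i64,
        max_time: i64,
        min_seq: i64,
        max_seq: i64,
    }

    // Sketch of the rejected condition: assuming `a.min_seq <= b.min_seq`
    // (files are sorted by min sequence number before splitting), the pair
    // is invalid when the time ranges intersect AND the sequence-number
    // ranges intersect.
    fn overlaps_time_and_sequence(a: &FileMeta, b: &FileMeta) -> bool {
        let time_overlap = a.min_time <= b.max_time && b.min_time <= a.max_time;
        let seq_overlap = a.max_seq >= b.min_seq;
        time_overlap && seq_overlap
    }

    fn main() {
        // The values from test_split_overlapped_groups_panic above:
        // times [5, 10] vs [7, 11], sequence numbers [5, 11] vs [7, 15].
        let f1 = FileMeta { min_time: 5, max_time: 10, min_seq: 5, max_seq: 11 };
        let f2 = FileMeta { min_time: 7, max_time: 11, min_seq: 7, max_seq: 15 };
        assert!(overlaps_time_and_sequence(&f1, &f2)); // the split would be rejected
    }
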
- let groups = Compactor::split_overlapped_groups( - &mut overlapped_groups, - max_size_bytes, - max_file_count, - ) - .unwrap(); - - // one intact group - assert_eq!(groups.internal_overlapped_groups.len(), 1); - // four newly splitted groups from two large groups - assert_eq!(groups.external_overlapped_groups.len(), 4); - - // oldest_overlapped_group was split into external_overlapped_groups[0] and [1] due to file count limit - // the chunks split by their min_seq order - assert_eq!(groups.external_overlapped_groups[0].len(), 2); // reach limit file count - assert!(groups.external_overlapped_groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3 - assert!(groups.external_overlapped_groups[0].contains(&overlaps_many)); // min_time = 5, min_seq = 5 - assert_eq!(groups.external_overlapped_groups[1].len(), 2); // reach limit file count - assert!(groups.external_overlapped_groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6 - assert!(groups.external_overlapped_groups[1].contains(&min_equals_max)); // min_time = 10, min_seq = 10 - - // newest_overlapped_group stays the same length one file and the corresponding output is - // groups.internal_overlapped_groups[0] - assert_eq!(groups.internal_overlapped_groups[0].len(), 1); // reach limit file size - assert!(groups.internal_overlapped_groups[0].contains(&alone)); - - // middle_overlapped_group was split into groups.external_overlapped_groups[2] and [3] due to size limit - // the chunks split by their min_seq order - assert_eq!(groups.external_overlapped_groups[2].len(), 1); // reach limit file size - assert!(groups.external_overlapped_groups[2].contains(&another)); // min_time = 13, min_seq = 13 - assert_eq!(groups.external_overlapped_groups[3].len(), 1); // reach limit file size - assert!(groups.external_overlapped_groups[3].contains(&partial_overlap)); - // min_time = 14, min_seq = 14 - } - - // This is specific unit test for split_overlapped_groups but it is a subtest of test_limit_size_and_num_files - // Keep all the variables the same names in both tests for us to follow them easily - #[test] - fn test_split_overlapped_groups_sequence_number_different_order_from_time() { - let compaction_max_size_bytes = 100000; - - // oldest overlapped and very small - let overlaps_many = - arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 10, 10, 400); - let contained_completely_within = - arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500); - let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); - let min_equals_max = - arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 5, 5, 500); - let oldest_overlapped_group = vec![ - overlaps_many.clone(), - contained_completely_within.clone(), - max_equals_min.clone(), - min_equals_max.clone(), - ]; - - // newest files and very large - let alone = arbitrary_parquet_file_with_size_and_sequence_number( - 30, - 35, - 30, - 30, - compaction_max_size_bytes + 200, - ); // too large to group - let newest_overlapped_group = vec![alone.clone()]; - - // small files in the middle - let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 14, 14, 1000); - let partial_overlap = - arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 13, 13, 2000); - let middle_overlapped_group = vec![another.clone(), partial_overlap.clone()]; - - let mut overlapped_groups = vec![ - oldest_overlapped_group, - newest_overlapped_group, - middle_overlapped_group, - ]; - - let max_size_bytes = 1000; - let 
max_file_count = 2; - - // Three input groups will produce 5 output groups because of the size and file count limits. - // Note that the 3 input groups each include overlapped files but the groups do not overlap. - // The function split_overlapped_groups only splits each overlapped group; it does not merge any groups. - let groups = Compactor::split_overlapped_groups( - &mut overlapped_groups, - max_size_bytes, - max_file_count, - ) - .unwrap(); - - // one intact group - assert_eq!(groups.internal_overlapped_groups.len(), 1); - // four newly split groups from two large groups - assert_eq!(groups.external_overlapped_groups.len(), 4); - - // oldest_overlapped_group was split into external_overlapped_groups[0] and [1] due to the file count limit - // the chunks are split in min_seq order - assert_eq!(groups.external_overlapped_groups[0].len(), 2); // reach file count limit - assert!(groups.external_overlapped_groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3 - assert!(groups.external_overlapped_groups[0].contains(&min_equals_max)); // min_time = 10, min_seq = 5 - assert_eq!(groups.external_overlapped_groups[1].len(), 2); // reach file count limit - assert!(groups.external_overlapped_groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6 - assert!(groups.external_overlapped_groups[1].contains(&overlaps_many)); // min_time = 5, min_seq = 10 - - // newest_overlapped_group stays the same: one file, and the corresponding output is - // groups.internal_overlapped_groups[0] - assert_eq!(groups.internal_overlapped_groups[0].len(), 1); // reach file size limit - assert!(groups.internal_overlapped_groups[0].contains(&alone)); - - // middle_overlapped_group was split into groups.external_overlapped_groups[2] and [3] due to the size limit - // the chunks are split in min_seq order - assert_eq!(groups.external_overlapped_groups[2].len(), 1); // reach file size limit - assert!(groups.external_overlapped_groups[2].contains(&partial_overlap)); // min_time = 14, min_seq = 13 - assert_eq!(groups.external_overlapped_groups[3].len(), 1); // reach file size limit - assert!(groups.external_overlapped_groups[3].contains(&another)); // min_time = 13, min_seq = 14 - } - - // This tests - // 1. overlapped_groups, which focuses on the detail of both its children: - // 1.a. group_potential_duplicates that groups files into overlapped groups - // 1.b. split_overlapped_groups that splits each overlapped group further to meet the size and/or file limit - // 2.
group_small_contiguous_groups, which merges non-overlapped groups into a larger one if they meet the size and file limits - #[test] - fn test_limit_size_and_num_files() { - let compaction_max_size_bytes = 100000; - - // oldest overlapped and very small - let overlaps_many = - arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 10, 10, 400); - let contained_completely_within = - arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500); - let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); - let min_equals_max = - arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 5, 5, 500); - - // newest files and very large - let alone = arbitrary_parquet_file_with_size_and_sequence_number( - 30, - 35, - 30, - 30, - compaction_max_size_bytes + 200, - ); // too large to group - - // small files in the middle - let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 14, 14, 1000); - let partial_overlap = - arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 13, 13, 2000); - - // Given a bunch of files in an arbitrary order, - let all = vec![ - min_equals_max.clone(), - overlaps_many.clone(), - alone.clone(), - another.clone(), - max_equals_min.clone(), - contained_completely_within.clone(), - partial_overlap.clone(), - ]; - - // Group into overlapped groups - let max_size_bytes = 1000; - let max_file_count = 2; - let overlapped_groups = - Compactor::overlapped_groups(all, max_size_bytes, max_file_count).unwrap(); - // Must be 5 - assert_eq!(overlapped_groups.len(), 5); - // Groups that do not overlap with other groups always come first - // Group 0: newest files and very large - assert_eq!(overlapped_groups[0].parquet_files.len(), 1); // reach file size limit - assert!(overlapped_groups[0].parquet_files.contains(&alone)); // min_time = 30 - // The rest of the groups each overlap with another group - // Groups 1 & 2: split from `oldest overlapped and very small`, ordered by sequence number - assert_eq!(overlapped_groups[1].parquet_files.len(), 2); // reach file count limit - assert!(overlapped_groups[1].parquet_files.contains(&max_equals_min)); // min_time = 3, min_seq = 3 - assert!(overlapped_groups[1].parquet_files.contains(&min_equals_max)); // min_time = 10, min_seq = 5 - assert_eq!(overlapped_groups[2].parquet_files.len(), 2); // reach file count limit - assert!(overlapped_groups[2] - .parquet_files - .contains(&contained_completely_within)); // min_time = 6, min_seq = 6 - assert!(overlapped_groups[2].parquet_files.contains(&overlaps_many)); // min_time = 5, min_seq = 10 - // Groups 3 & 4: split from `small files in the middle`, ordered by sequence number - assert_eq!(overlapped_groups[3].parquet_files.len(), 1); // reach file size limit - assert!(overlapped_groups[3] - .parquet_files - .contains(&partial_overlap)); // min_time = 14, min_seq = 13 - assert_eq!(overlapped_groups[4].parquet_files.len(), 1); // reach file size limit - assert!(overlapped_groups[4].parquet_files.contains(&another)); // min_time = 13, min_seq = 14 - - // Group further by the size and file count limits - // To merge groups with the correct time ranges, this function has to sort them, hence the output will be - // in time order between groups and in sequence number order within each group (see the sketch just below) - let groups = Compactor::group_small_contiguous_groups( - overlapped_groups, - compaction_max_size_bytes, - max_file_count, - );
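// A rough sketch of the contiguous-merge idea asserted below (assumed
// logic with hypothetical names, not the real group_small_contiguous_groups):
// sort the groups by min_time, then merge a group into its predecessor only
// while the combined size and file count stay under the limits and neither
// group overlaps other groups.
fn merge_contiguous(
    mut groups: Vec<(i64 /* min_time */, i64 /* total size */, bool /* overlaps other groups */, Vec<u64> /* file ids */)>,
    max_size_bytes: i64,
    max_file_count: usize,
) -> Vec<Vec<u64>> {
    groups.sort_by_key(|g| g.0); // time order between groups
    let mut merged: Vec<(i64, bool, Vec<u64>)> = Vec::new();
    for (_, size, overlaps, files) in groups {
        if let Some((last_size, last_overlaps, last_files)) = merged.last_mut() {
            // merge only when both groups are mergeable and both budgets hold
            if !overlaps
                && !*last_overlaps
                && *last_size + size <= max_size_bytes
                && last_files.len() + files.len() <= max_file_count
            {
                *last_size += size;
                last_files.extend(files);
                continue;
            }
        }
        merged.push((size, overlaps, files));
    }
    merged.into_iter().map(|(_, _, files)| files).collect()
}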
// Still 5 groups. Nothing is merged due to the size and file count limits - assert_eq!(groups.len(), 5); - - assert_eq!(groups[0].len(), 2); // reach file count limit - assert!(groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3 - assert!(groups[0].contains(&min_equals_max)); // min_time = 10, min_seq = 5 - - assert_eq!(groups[1].len(), 2); // reach file count limit - assert!(groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6 - assert!(groups[1].contains(&overlaps_many)); // min_time = 5, min_seq = 10 - - assert_eq!(groups[2].len(), 1); // reach size limit - assert!(groups[2].contains(&another)); // min_time = 13, min_seq = 14 - - assert_eq!(groups[3].len(), 1); // reach size limit - assert!(groups[3].contains(&partial_overlap)); // min_time = 14, min_seq = 13 - - assert_eq!(groups[4].len(), 1); // reach size limit - assert!(groups[4].contains(&alone)); // min_time = 30 - } - - // This tests - // 1. overlapped_groups - // 2. group_small_contiguous_groups, which merges non-overlapped groups into a larger one if they meet the size and file limits - #[test] - fn test_group_small_contiguous_overlapped_groups_no_group() { - let compaction_max_size_bytes = 100000; - - // Given two files that don't overlap, - let pf1 = arbitrary_parquet_file_with_size(1, 2, 100); - let pf2 = arbitrary_parquet_file_with_size(3, 4, compaction_max_size_bytes); // too large to group - - let overlapped_groups = Compactor::overlapped_groups( - vec![pf1.clone(), pf2.clone()], - TEST_MAX_SIZE_BYTES, - TEST_MAX_FILE_COUNT, - ) - .unwrap(); - // 2 overlapped groups - assert_eq!(overlapped_groups.len(), 2); - let g1 = GroupWithMinTimeAndSize { - parquet_files: vec![pf1.clone()], - min_time: Timestamp::new(1), - total_file_size_bytes: 100, - overlapped_with_other_groups: false, - }; - let g2 = GroupWithMinTimeAndSize { - parquet_files: vec![pf2.clone()], - min_time: Timestamp::new(3), - total_file_size_bytes: compaction_max_size_bytes, - overlapped_with_other_groups: false, - }; - // They should each be in their own group - assert_eq!(overlapped_groups, vec![g1, g2]); - - // Group them by size - let groups = Compactor::group_small_contiguous_groups( - overlapped_groups, - compaction_max_size_bytes, - TEST_MAX_FILE_COUNT, - ); - // Files too big to group further - assert_eq!(groups.len(), 2); - assert_eq!(groups, vec![vec![pf1], vec![pf2]]); - } - - // This tests - // 1. overlapped_groups, which focuses on the detail of both its children: - // 1.a. group_potential_duplicates that groups files into overlapped groups - // 1.b. split_overlapped_groups that splits each overlapped group further to meet the size and/or file limit - // 2.
group_small_contiguous_groups, which merges non-overlapped groups into a larger one if they meet the size and file limits - #[test] - fn test_group_small_contiguous_overlapped_groups_many_files() { - let compaction_max_size_bytes = 100000; - - // oldest overlapped and very small - let overlaps_many = arbitrary_parquet_file_with_size(5, 10, 200); - let contained_completely_within = arbitrary_parquet_file_with_size(6, 7, 300); - let max_equals_min = arbitrary_parquet_file_with_size(3, 5, 400); - let min_equals_max = arbitrary_parquet_file_with_size(10, 12, 500); - - // newest files and very large - let alone = arbitrary_parquet_file_with_size(30, 35, compaction_max_size_bytes + 200); // too large to group - - // small files in the middle - let another = arbitrary_parquet_file_with_size(13, 15, 1000); - let partial_overlap = arbitrary_parquet_file_with_size(14, 16, 2000); - - // Given a bunch of files in an arbitrary order, - let all = vec![ - min_equals_max.clone(), - overlaps_many.clone(), - alone.clone(), - another.clone(), - max_equals_min.clone(), - contained_completely_within.clone(), - partial_overlap.clone(), - ]; - - // Group into overlapped groups - let overlapped_groups = - Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap(); - // There are 3 overlapped groups, but the groups do not overlap with each other - assert_eq!(overlapped_groups.len(), 3); - - // Group further by size - let groups = Compactor::group_small_contiguous_groups( - overlapped_groups, - compaction_max_size_bytes, - TEST_MAX_FILE_COUNT, - ); - // should be 2 groups - assert_eq!(groups.len(), 2); - // The first group includes the 6 oldest files from 2 overlapped groups - assert_eq!(groups[0].len(), 6); - assert!(groups[0].contains(&overlaps_many)); - assert!(groups[0].contains(&contained_completely_within)); - assert!(groups[0].contains(&max_equals_min)); - assert!(groups[0].contains(&min_equals_max)); - assert!(groups[0].contains(&another)); - assert!(groups[0].contains(&partial_overlap)); - // The second group includes the single newest file - assert_eq!(groups[1].len(), 1); - assert!(groups[1].contains(&alone)); - } - - // This tests - // 1. overlapped_groups, which focuses on the detail of both its children: - // 1.a. group_potential_duplicates that groups files into overlapped groups - // 1.b. split_overlapped_groups that splits each overlapped group further to meet the size and/or file limit - // 2.
group_small_contiguous_groups, which merges non-overlapped groups into a larger one if they meet the size and file limits - #[test] - fn test_group_small_contiguous_overlapped_groups_many_files_too_large() { - // oldest overlapped and very small - let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 200); - let contained_completely_within = - arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 300); - let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number( - 3, - 5, - 3, - 3, - TEST_MAX_SIZE_BYTES + 400, - ); // too large to group - let min_equals_max = - arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500); - - // newest file, already at the size limit - let alone = arbitrary_parquet_file_with_size_and_sequence_number( - 30, - 35, - 30, - 30, - TEST_MAX_SIZE_BYTES, - ); // too large to group - - // large files in the middle - let another = arbitrary_parquet_file_with_size_and_sequence_number( - 13, - 15, - 13, - 13, - TEST_MAX_SIZE_BYTES, - ); // too large to group - let partial_overlap = - arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 2000); - - // Given a bunch of files in an arbitrary order - let all = vec![ - min_equals_max.clone(), - overlaps_many.clone(), - alone.clone(), - another.clone(), - max_equals_min.clone(), - contained_completely_within.clone(), - partial_overlap.clone(), - ]; - - // Group into overlapped groups - let overlapped_groups = - Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap(); - assert_eq!(overlapped_groups.len(), 5); - - // 5 input groups and 5 output groups because they are too large to group further - let groups = Compactor::group_small_contiguous_groups( - overlapped_groups, - TEST_MAX_SIZE_BYTES, - TEST_MAX_FILE_COUNT, - ); - assert_eq!(groups.len(), 5); - - // first group includes the oldest and large file - assert_eq!(groups[0].len(), 1); - assert!(groups[0].contains(&max_equals_min)); // min_time = 3 - // second group - assert_eq!(groups[1].len(), 3); - assert!(groups[1].contains(&overlaps_many)); // min_time = 5 - assert!(groups[1].contains(&contained_completely_within)); // min_time = 6 - assert!(groups[1].contains(&min_equals_max)); // min_time = 10 - // third group - assert_eq!(groups[2].len(), 1); - assert!(groups[2].contains(&another)); // min_time = 13 - // fourth group - assert_eq!(groups[3].len(), 1); // min_time = 14 - assert!(groups[3].contains(&partial_overlap)); - // fifth group - assert_eq!(groups[4].len(), 1); - assert!(groups[4].contains(&alone)); // min_time = 30 - } - - // bug: https://github.com/influxdata/conductor/issues/1013 - // - // n_m: chunks with sequence number range [n, m] where n <= m - // - // This is the scenario that leads to the bug - // Chunks 2_2 and 3_3 overlap, but 2_2 is too large to compact with any other chunks, so they were - // split and marked with (*), meaning they should not group with any other groups.
However, due to the bug, - // chunk 3_3 (small) was then wrongly grouped with chunk 1_1, leading to the wrong result of two - // chunks, 1_3 and 2_2, that overlap in both time and sequence number - // - // Input - // Time - // ────────────────────────────────────────────▶ - // ┌─────────────────┐ ┌────────┐ - // │ 2_2(*) │ │ 1_1 │ - // └─────────────────┘ └────────┘ - // ┌────────┐ - // │ 3_3(*) │ - // └────────┘ - // - // Wrong output: [3_3(*)] is combined with [1_1] - // Right output: Nothing gets combined and the output should look the same as the input - #[test] - fn test_group_small_contiguous_groups_103() { - let compaction_max_size_bytes = 100000; - - // Given 3 files as in the input diagram - let pf1 = arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 1, 1, 100); - let pf2 = arbitrary_parquet_file_with_size_and_sequence_number( - 1, - 5, - 2, - 2, - compaction_max_size_bytes, - ); // too large to group - let pf3 = arbitrary_parquet_file_with_size_and_sequence_number(3, 6, 3, 3, 200); - - let g1 = GroupWithMinTimeAndSize::new(vec![pf1], false); - let g2 = GroupWithMinTimeAndSize::new(vec![pf2], true); - let g3 = GroupWithMinTimeAndSize::new(vec![pf3], true); - - let overlapped_groups = vec![g1, g2, g3]; - - // None of the 3 groups should be further combined - let groups = Compactor::group_small_contiguous_groups( - overlapped_groups, - compaction_max_size_bytes, - TEST_MAX_FILE_COUNT, - ); - assert_eq!(groups.len(), 3); - } - - // Thorough tests for group_small_contiguous_groups - // Input: 5 groups - // . 3 type-1 groups: [1_1, 2_2], [4_4], [7_7], - // . 2 type-2 groups: [5_5], [6_6] - // - // │ │ │ Time - // ──────────────────────────────────────────────────────────────▶ - // ┌─────┐ │ ┌─────┐ │ ┌─────────────────┐ │ ┌─────┐ - // │ 1_1 │ │ 7_7 │ │ 5_5(*) │ │ 4_4 │ - // └─────┘ │ └─────┘ │ └─────────────────┘ │ └─────┘ - // ┌─────┐ ┌──────┐ - // │ 2_2 │ │ │ │ │6_6(*)│ │ - // └─────┘ └──────┘ - // │ │ │ │ - // - // Output: 4 groups - // . [1_1, 2_2, 7_7] - // . [5_5] - // . [6_6] - // .
[4_4] - // │ │ Time - // ──────────────────────────────────────────────────────────────▶ - // ┌─────┐ ┌─────┐ │ ┌─────────────────┐ │ ┌─────┐ - // │ 1_1 │ │ 7_7 │ │ 5_5(*) │ │ 4_4 │ - // └─────┘ └─────┘ │ └─────────────────┘ │ └─────┘ - // ┌─────┐ ┌──────┐ - // │ 2_2 │ │ │ │6_6(*)│ │ - // └─────┘ └──────┘ - // │ │ │ - #[test] - fn test_group_small_contiguous_groups() { - let compaction_max_size_bytes = 100000; - - // Given 6 files as in the input diagram - let pf1 = arbitrary_parquet_file_with_size_and_sequence_number(1, 5, 1, 1, 100); - let pf2 = arbitrary_parquet_file_with_size_and_sequence_number(3, 6, 2, 2, 100); - let pf4 = arbitrary_parquet_file_with_size_and_sequence_number(100, 110, 4, 4, 100); - let pf5 = arbitrary_parquet_file_with_size_and_sequence_number( - 50, - 60, - 5, - 5, - compaction_max_size_bytes, - ); // too large to group - let pf6 = arbitrary_parquet_file_with_size_and_sequence_number(55, 65, 6, 6, 200); - let pf7 = arbitrary_parquet_file_with_size_and_sequence_number(10, 20, 7, 7, 200); - - // 6 files in 5 groups - let g1 = GroupWithMinTimeAndSize::new(vec![pf1.clone(), pf2.clone()], false); - let g2 = GroupWithMinTimeAndSize::new(vec![pf4.clone()], false); - let g3 = GroupWithMinTimeAndSize::new(vec![pf5.clone()], true); - let g4 = GroupWithMinTimeAndSize::new(vec![pf6.clone()], true); - let g5 = GroupWithMinTimeAndSize::new(vec![pf7.clone()], false); - - let overlapped_groups = vec![g1, g2, g3, g4, g5]; - - // g1 and g5 will be combined - let groups = Compactor::group_small_contiguous_groups( - overlapped_groups, - compaction_max_size_bytes, - TEST_MAX_FILE_COUNT, - ); - assert_eq!(groups.len(), 4); - assert_eq!(groups[0].len(), 3); - assert!(groups[0].contains(&pf1)); - assert!(groups[0].contains(&pf2)); - assert!(groups[0].contains(&pf7)); - assert_eq!(groups[1].len(), 1); - assert!(groups[1].contains(&pf5)); - assert_eq!(groups[2].len(), 1); - assert!(groups[2].contains(&pf6)); - assert_eq!(groups[3].len(), 1); - assert!(groups[3].contains(&pf4)); - } - - #[test] - fn test_group_small_contiguous_overlapped_groups_many_files_middle_too_large() { - // oldest overlapped and very small - let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 200); - let contained_completely_within = - arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 300); - let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); - let min_equals_max = - arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500); - - // newest files and small - let alone = arbitrary_parquet_file_with_size_and_sequence_number(30, 35, 30, 30, 200); - - // large files in the middle - let another = arbitrary_parquet_file_with_size_and_sequence_number( - 13, - 15, - 13, - 13, - TEST_MAX_SIZE_BYTES, - ); // too large to group - let partial_overlap = - arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 2000); - - // Given a bunch of files in an arbitrary order - let all = vec![ - min_equals_max.clone(), - overlaps_many.clone(), - alone.clone(), - another.clone(), - max_equals_min.clone(), - contained_completely_within.clone(), - partial_overlap.clone(), - ];
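// A rough sketch of the time-overlap clustering that the grouping call just
// below builds on (an illustration with hypothetical names, not IOx's
// group_potential_duplicates): sort files by min_time and start a new
// cluster whenever a file begins after the running max_time of the
// current cluster.
fn cluster_by_time_overlap(mut files: Vec<(i64 /* min_time */, i64 /* max_time */)>) -> Vec<Vec<(i64, i64)>> {
    files.sort_by_key(|f| f.0);
    let mut clusters: Vec<Vec<(i64, i64)>> = Vec::new();
    let mut current_max = i64::MIN;
    for f in files {
        if clusters.is_empty() || f.0 > current_max {
            // no time overlap with the running cluster: start a new one
            clusters.push(Vec::new());
            current_max = f.1;
        } else {
            // extend the running cluster's covered time range
            current_max = current_max.max(f.1);
        }
        clusters.last_mut().unwrap().push(f);
    }
    clusters
}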
- // Group into overlapped groups - // Because file `another` is too large, it was split from the file it actually overlaps, `partial_overlap`. - // This leads to the actual 3 overlapped groups becoming 4 groups - let overlapped_groups = - Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap(); - assert_eq!(overlapped_groups.len(), 4); - - // 4 input groups and still 4 output groups. Although the sorted last 2 groups [partial_overlap] and - // [alone] are small, they cannot be combined because [partial_overlap] overlaps with [another] - let groups = Compactor::group_small_contiguous_groups( - overlapped_groups, - TEST_MAX_SIZE_BYTES, - TEST_MAX_FILE_COUNT, - ); - assert_eq!(groups.len(), 4); - - // first group includes the 4 oldest files - assert_eq!(groups[0].len(), 4); - assert!(groups[0].contains(&max_equals_min)); // min_time = 3 - assert!(groups[0].contains(&overlaps_many)); // min_time = 5 - assert!(groups[0].contains(&contained_completely_within)); // min_time = 6 - assert!(groups[0].contains(&min_equals_max)); // min_time = 10 - // second group includes the large middle-time file `another` - assert_eq!(groups[1].len(), 1); - assert!(groups[1].contains(&another)); // min_time = 13 - // third group includes the second middle-time file `partial_overlap` - assert_eq!(groups[2].len(), 1); - assert!(groups[2].contains(&partial_overlap)); // min_time = 14 - // fourth group includes the newest one - assert_eq!(groups[3].len(), 1); - assert!(groups[3].contains(&alone)); // min_time = 30 - } - - #[test] - fn test_group_small_contiguous_overlapped_groups_with_split_overlap_groups() { - // oldest overlapped - // These 4 overlapped files exceed max count --> will be split into 2 overlapped groups - let overlaps_many = arbitrary_parquet_file_with_size_and_sequence_number(5, 10, 5, 5, 400); - let contained_completely_within = - arbitrary_parquet_file_with_size_and_sequence_number(6, 7, 6, 6, 500); - let max_equals_min = arbitrary_parquet_file_with_size_and_sequence_number(3, 5, 3, 3, 400); - let min_equals_max = - arbitrary_parquet_file_with_size_and_sequence_number(10, 12, 10, 10, 500); - - // newest files - // one large file that does not overlap with any other files - // --> will be in its own overlapped group after Compactor::overlapped_groups - // Since it is small --> it will be grouped with the middle_overlapped_group after Compactor::group_small_contiguous_groups - let alone = arbitrary_parquet_file_with_size_and_sequence_number(30, 35, 30, 30, 50); - - // very small files in the middle - // these 2 files overlap but are small, so there is no further split - let another = arbitrary_parquet_file_with_size_and_sequence_number(13, 15, 13, 13, 10); - let partial_overlap = - arbitrary_parquet_file_with_size_and_sequence_number(14, 16, 14, 14, 20); - - // Given a bunch of files in an arbitrary order - let all = vec![ - min_equals_max.clone(), - overlaps_many.clone(), - alone.clone(), - another.clone(), - max_equals_min.clone(), - contained_completely_within.clone(), - partial_overlap.clone(), - ]; - - let max_size_bytes = 1000; - let max_file_count = 2; - - // Group into overlapped groups - // Must be 4 - let overlapped_groups = - Compactor::overlapped_groups(all, max_size_bytes, max_file_count).unwrap(); - assert_eq!(overlapped_groups.len(), 4); - - // The 2 groups that do not overlap with other groups are always first - // Group 0: `very small files in the middle` - assert_eq!(overlapped_groups[0].parquet_files.len(), 2); - assert!(overlapped_groups[0].parquet_files.contains(&another)); // min_time = 13, min_seq = 13 - assert!(overlapped_groups[0]
.parquet_files - .contains(&partial_overlap)); // min_time = 14, min_seq = 14 - // Group 1: `newest files` - assert_eq!(overlapped_groups[1].parquet_files.len(), 1); - assert!(overlapped_groups[1].parquet_files.contains(&alone)); // min_time = 30 - // Next 2 groups are split from `oldest overlapped` - assert_eq!(overlapped_groups[2].parquet_files.len(), 2); - assert!(overlapped_groups[2].parquet_files.contains(&max_equals_min)); // min_time = 3, min_seq = 3 - assert!(overlapped_groups[2].parquet_files.contains(&overlaps_many)); // min_time = 5, min_seq = 5 - assert_eq!(overlapped_groups[3].parquet_files.len(), 2); - assert!(overlapped_groups[3] - .parquet_files - .contains(&contained_completely_within)); // min_time = 6, min_seq = 6 - assert!(overlapped_groups[3].parquet_files.contains(&min_equals_max)); // min_time = 10, min_seq = 10 - - // 4 input groups but 3 output groups. Group 0 and Group 1 above are further grouped together - let groups = Compactor::group_small_contiguous_groups( - overlapped_groups, - max_size_bytes, - max_file_count + 5, // increase the file count limit to get them grouped - ); - assert_eq!(groups.len(), 3); - - // The output groups are in time order - // groups[0] is the same as overlapped_groups[2] - assert_eq!(groups[0].len(), 2); - assert!(groups[0].contains(&max_equals_min)); // min_time = 3, min_seq = 3 - assert!(groups[0].contains(&overlaps_many)); // min_time = 5, min_seq = 5 - // groups[1] is the same as overlapped_groups[3] - assert_eq!(groups[1].len(), 2); - assert!(groups[1].contains(&contained_completely_within)); // min_time = 6, min_seq = 6 - assert!(groups[1].contains(&min_equals_max)); // min_time = 10, min_seq = 10 - // groups[2] is the result of grouping overlapped_groups[0] and overlapped_groups[1] - assert_eq!(groups[2].len(), 3); - assert!(groups[2].contains(&another)); // min_time = 13, min_seq = 13 - assert!(groups[2].contains(&partial_overlap)); // min_time = 14, min_seq = 14 - assert!(groups[2].contains(&alone)); // min_time = 30 - } - - #[tokio::test] - async fn add_tombstones_to_parquet_files_in_groups() { - let catalog = TestCatalog::new(); - - let split_percentage = 90; - let max_concurrent_compaction_size_bytes = 100000; - let compaction_max_size_bytes = 100000; - let compaction_max_file_count = 10; - let compaction_max_desired_file_size_bytes = 30000; - let compactor = Compactor::new( - vec![], - Arc::clone(&catalog.catalog), - ParquetStorage::new(Arc::clone(&catalog.object_store)), - Arc::new(Executor::new(1)), - Arc::new(SystemProvider::new()), - BackoffConfig::default(), - CompactorConfig::new( - split_percentage, - max_concurrent_compaction_size_bytes, - compaction_max_size_bytes, - compaction_max_file_count, - compaction_max_desired_file_size_bytes, - ), - Arc::new(metric::Registry::new()), - ); - - let mut txn = catalog.catalog.start_transaction().await.unwrap(); - - let kafka = txn.kafka_topics().create_or_get("foo").await.unwrap(); - let pool = txn.query_pools().create_or_get("foo").await.unwrap(); - let namespace = txn - .namespaces() - .create( - "namespace_add_tombstones_to_parquet_files_in_groups", - "inf", - kafka.id, - pool.id, - ) - .await - .unwrap(); - let table = txn - .tables() - .create_or_get("test_table", namespace.id) - .await - .unwrap(); - let sequencer = txn - .sequencers() - .create_or_get(&kafka, KafkaPartition::new(1)) - .await - .unwrap(); - let partition = txn - .partitions() - .create_or_get("one".into(), sequencer.id, table.id) - .await - .unwrap(); - - let p1 = ParquetFileParams { - sequencer_id: sequencer.id,
namespace_id: namespace.id, - table_id: table.id, - partition_id: partition.id, - object_store_id: Uuid::new_v4(), - min_sequence_number: SequenceNumber::new(4), - max_sequence_number: SequenceNumber::new(100), - min_time: Timestamp::new(1), - max_time: Timestamp::new(5), - file_size_bytes: 1337, - row_count: 0, - created_at: Timestamp::new(1), - compaction_level: INITIAL_COMPACTION_LEVEL, - column_set: ColumnSet::new([ColumnId::new(1), ColumnId::new(2)]), - }; - - let p2 = ParquetFileParams { - object_store_id: Uuid::new_v4(), - max_sequence_number: SequenceNumber::new(200), - min_time: Timestamp::new(4), - max_time: Timestamp::new(7), - - ..p1.clone() - }; - let pf1 = txn.parquet_files().create(p1).await.unwrap(); - let pf2 = txn.parquet_files().create(p2).await.unwrap(); - - let parquet_files = vec![pf1.clone(), pf2.clone()]; - let groups = vec![ - vec![], // empty group should get filtered out - parquet_files, - ]; - - // Tombstone with a sequence number that's too low for both files, even though it overlaps - // with the time range. Shouldn't be included in the group - let _t1 = txn - .tombstones() - .create_or_get( - table.id, - sequencer.id, - SequenceNumber::new(1), - Timestamp::new(3), - Timestamp::new(6), - "whatevs", - ) - .await - .unwrap(); - - // Tombstone with a sequence number too low for one file but not the other, time range - // overlaps both files - let t2 = txn - .tombstones() - .create_or_get( - table.id, - sequencer.id, - SequenceNumber::new(150), - Timestamp::new(3), - Timestamp::new(6), - "whatevs", - ) - .await - .unwrap(); - - // Tombstone with a time range that only overlaps with one file - let t3 = txn - .tombstones() - .create_or_get( - table.id, - sequencer.id, - SequenceNumber::new(300), - Timestamp::new(6), - Timestamp::new(8), - "whatevs", - ) - .await - .unwrap(); - - // Tombstone with a time range that overlaps both files and has a sequence number large - // enough for both files - let t4 = txn - .tombstones() - .create_or_get( - table.id, - sequencer.id, - SequenceNumber::new(400), - Timestamp::new(1), - Timestamp::new(10), - "whatevs", - ) - .await - .unwrap(); - txn.commit().await.unwrap(); - - let groups_with_tombstones = compactor.add_tombstones_to_groups(groups).await.unwrap(); - - assert_eq!(groups_with_tombstones.len(), 1); - let group_with_tombstones = &groups_with_tombstones[0]; - - let actual_group_tombstone_ids = group_with_tombstones.tombstone_ids(); - assert_eq!( - actual_group_tombstone_ids, - HashSet::from([t2.id, t3.id, t4.id]) - ); - - let actual_pf1 = group_with_tombstones - .parquet_files - .iter() - .find(|pf| pf.parquet_file_id() == pf1.id) - .unwrap(); - let mut actual_pf1_tombstones: Vec<_> = - actual_pf1.tombstones().iter().map(|t| t.id).collect(); - actual_pf1_tombstones.sort(); - assert_eq!(actual_pf1_tombstones, &[t2.id, t4.id]); - - let actual_pf2 = group_with_tombstones - .parquet_files - .iter() - .find(|pf| pf.parquet_file_id() == pf2.id) - .unwrap(); - let mut actual_pf2_tombstones: Vec<_> = - actual_pf2.tombstones().iter().map(|t| t.id).collect(); - actual_pf2_tombstones.sort(); - assert_eq!(actual_pf2_tombstones, &[t3.id, t4.id]); - } - - #[tokio::test] - async fn test_overlap_group_edge_case() { - let one = arbitrary_parquet_file(0, 3); - let two = arbitrary_parquet_file(5, 10); - let three = arbitrary_parquet_file(2, 6); - - // Given a bunch of files in a particular order to exercise the algorithm: - let all = vec![one, two, three]; - - let groups = - Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, 
TEST_MAX_FILE_COUNT).unwrap(); - dbg!(&groups); - - // All should be in the same group. - assert_eq!(groups.len(), 1); - - let one = arbitrary_parquet_file(0, 3); - let two = arbitrary_parquet_file(5, 10); - let three = arbitrary_parquet_file(2, 6); - let four = arbitrary_parquet_file(8, 11); - - // Given a bunch of files in a particular order to exercise the algorithm: - let all = vec![one, two, three, four]; - - let groups = - Compactor::overlapped_groups(all, TEST_MAX_SIZE_BYTES, TEST_MAX_FILE_COUNT).unwrap(); - dbg!(&groups); - - // All should be in the same group. - assert_eq!(groups.len(), 1); - } - #[tokio::test] async fn test_add_parquet_file_with_tombstones() { let catalog = TestCatalog::new(); @@ -4286,13 +2394,11 @@ mod tests { &table.table, &table_schema, &partition.partition, - compactor.config.compaction_max_size_bytes(), ) .await .unwrap(); - // 2 sets based on the 83% split rule = 100000 (max file size) / 120000 (total file size) - assert_eq!(batches.len(), 2); + assert_eq!(batches.len(), 1); let batches = batches .into_iter() @@ -4307,8 +2413,7 @@ mod tests { // Verify number of output rows // There should be a total of 1499 output rows (999 from lp1 + 100 from lp2 - 500 duplicates) - let mut num_rows: usize = batches[0].iter().map(|rb| rb.num_rows()).sum(); - num_rows += batches[1].iter().map(|rb| rb.num_rows()).sum::<usize>(); + let num_rows: usize = batches[0].iter().map(|rb| rb.num_rows()).sum(); assert_eq!(num_rows, 1499); } diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs index 99aeb48b9c..1819fb013a 100644 --- a/compactor/src/handler.rs +++ b/compactor/src/handler.rs @@ -194,18 +194,13 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) { let mut used_size = 0; let max_size = compactor.config.max_concurrent_compaction_size_bytes(); - let max_desired_file_size = compactor.config.compaction_max_desired_file_size_bytes(); - let max_file_count = compactor.config.compaction_max_file_count(); + let mut handles = vec![]; for c in candidates { let compactor = Arc::clone(&compactor); let compact_and_upgrade = compactor - .groups_to_compact_and_files_to_upgrade( - c.candidate.partition_id, - max_desired_file_size, - max_file_count, - ) + .groups_to_compact_and_files_to_upgrade(c.candidate.partition_id) .await; match compact_and_upgrade { @@ -227,7 +222,6 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) { &c.table_schema, c.candidate.partition_id, compact_and_upgrade, - max_desired_file_size, ) .await; if let Err(e) = res {
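For context, the handler loop above budgets concurrent compactions by the size of the work it admits (the used_size/max_size/handles variables in the hunk). A rough, hypothetical sketch of that pattern follows; the function names, the tuple layout, and the exact drain-when-full policy are assumptions, not the real handler:

// Hypothetical stand-in for compacting one partition's files.
async fn run_one_compaction(_partition_id: i64) { /* compact one partition */ }

// Spawn work until the admitted input size reaches the budget, then drain
// the in-flight handles before admitting more.
async fn run_budgeted(candidates: Vec<(i64 /* partition id */, u64 /* input size */)>, max_size: u64) {
    let mut used_size = 0;
    let mut handles = vec![];
    for (partition_id, size) in candidates {
        handles.push(tokio::spawn(run_one_compaction(partition_id)));
        used_size += size;
        if used_size >= max_size {
            // budget reached: wait for the in-flight batch before continuing
            for h in handles.drain(..) {
                let _ = h.await;
            }
            used_size = 0;
        }
    }
    // wait for any remaining work
    for h in handles {
        let _ = h.await;
    }
}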