diff --git a/compactor/src/cold.rs b/compactor/src/cold.rs index 04ffbdd4e2..1eb3aad4ee 100644 --- a/compactor/src/cold.rs +++ b/compactor/src/cold.rs @@ -45,7 +45,7 @@ pub async fn compact(compactor: Arc, do_full_compact: bool) -> usize compaction_type, CompactionLevel::Initial, compact_in_parallel, - true, // split + false, // no split candidates.clone().into(), ) .await; @@ -108,7 +108,7 @@ mod tests { use backoff::BackoffConfig; use data_types::{ColumnType, CompactionLevel, ParquetFileId}; use iox_query::exec::Executor; - use iox_tests::util::{TestCatalog, TestParquetFileBuilder}; + use iox_tests::util::{TestCatalog, TestParquetFileBuilder, TestTable}; use iox_time::{SystemProvider, TimeProvider}; use std::collections::HashMap; @@ -700,166 +700,112 @@ mod tests { } } + #[tokio::test] + async fn cold_compaction_first_step_no_split() { + test_helpers::maybe_start_logging(); + let TestDb { + catalog, + compactor, + table, + } = make_db_with_4_l0s_2_l1s().await; + + // Let do cold compaction first step + // + // Select partition candidates. Must be one becasue all 6 files belong to the same partition + let candidates = compactor.cold_partitions_to_compact(10).await.unwrap(); + assert_eq!(candidates.len(), 1); + // + // Cold compaction first step + compact_candidates_with_memory_budget( + Arc::clone(&compactor), + "cold", + CompactionLevel::Initial, + compact_in_parallel, + false, // no split + candidates.clone().into(), + ) + .await; + + // Should have 2 non-soft-deleted L-1 files: + // . level-1 pf6 with is not compacted because it does not overlap with any L0s + // . new compacted level-1 file as a result of compacting 4 L0s (pf1, pf2, pf3, pf4) and one L1 (pf5) + let files = catalog.list_by_table_not_to_delete(table.table.id).await; + assert_eq!(files.len(), 2, "{files:?}"); + let files_and_levels: Vec<_> = files + .iter() + .map(|f| (f.id.get(), f.compaction_level)) + .collect(); + + // The initial files and their IDs are: L0 1-4, L1 5-6. The first step of cold compaction took files 1-5 + // and compacted them into level-1 with next ID 7 + assert_eq!( + files_and_levels, + vec![ + (6, CompactionLevel::FileNonOverlapped), + (7, CompactionLevel::FileNonOverlapped) + ] + ); + } + + #[tokio::test] + async fn cold_compaction_first_step_split() { + test_helpers::maybe_start_logging(); + + let TestDb { + catalog, + compactor, + table, + } = make_db_with_4_l0s_2_l1s().await; + + // Let do cold compaction first step + // + // Select partition candidates. Must be one becasue all 6 files belong to the same partition + let candidates = compactor.cold_partitions_to_compact(10).await.unwrap(); + assert_eq!(candidates.len(), 1); + // + // Cold compaction first step + compact_candidates_with_memory_budget( + Arc::clone(&compactor), + "cold", + CompactionLevel::Initial, + compact_in_parallel, + true, // split + candidates.clone().into(), + ) + .await; + + // Should have 3 non-soft-deleted L-1 files: + // . level-1 pf6 is not compacted because it does not overlap with any L0s + // . Two new compacted level-1 files as a result of compacting 4 L0s (pf1, pf2, pf3, pf4) and one L1 (pf5) + // and split the output into 2 files + let files = catalog.list_by_table_not_to_delete(table.table.id).await; + assert_eq!(files.len(), 3, "{files:?}"); + let files_and_levels: Vec<_> = files + .iter() + .map(|f| (f.id.get(), f.compaction_level)) + .collect(); + + // The initial files and their IDs are: L0 1-4, L1 5-6. The first step of cold compaction took files 1-5 + // and compacted them into 2 level-1 with IDs 7 and 8 + assert_eq!( + files_and_levels, + vec![ + (6, CompactionLevel::FileNonOverlapped), + (7, CompactionLevel::FileNonOverlapped), + (8, CompactionLevel::FileNonOverlapped) + ] + ); + } + #[tokio::test] async fn full_cold_compaction_many_files() { test_helpers::maybe_start_logging(); - let catalog = TestCatalog::new(); - // lp1 does not overlap with any other level 0 - let lp1 = vec![ - "table,tag1=WA field_int=1000i 10", - "table,tag1=VT field_int=10i 20", - ] - .join("\n"); - - // lp2 overlaps with lp3 - let lp2 = vec![ - "table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate - "table,tag1=VT field_int=10i 10000", - "table,tag1=UT field_int=70i 20000", - ] - .join("\n"); - - // lp3 overlaps with lp2 - let lp3 = vec![ - "table,tag1=WA field_int=1500i 8000", // latest duplicate and kept - "table,tag1=VT field_int=10i 6000", - "table,tag1=UT field_int=270i 25000", - ] - .join("\n"); - - // lp4 does not overlap with any - let lp4 = vec![ - "table,tag2=WA,tag3=10 field_int=1600i 28000", - "table,tag2=VT,tag3=20 field_int=20i 26000", - ] - .join("\n"); - - // lp5 overlaps with lp1 - let lp5 = vec![ - "table,tag2=PA,tag3=15 field_int=1601i 9", - "table,tag2=OH,tag3=21 field_int=21i 25", - ] - .join("\n"); - - // lp6 does not overlap with any - let lp6 = vec![ - "table,tag2=PA,tag3=15 field_int=81601i 90000", - "table,tag2=OH,tag3=21 field_int=421i 91000", - ] - .join("\n"); - - let ns = catalog.create_namespace("ns").await; - let shard = ns.create_shard(1).await; - let table = ns.create_table("table").await; - table.create_column("field_int", ColumnType::I64).await; - table.create_column("tag1", ColumnType::Tag).await; - table.create_column("tag2", ColumnType::Tag).await; - table.create_column("tag3", ColumnType::Tag).await; - table.create_column("time", ColumnType::Time).await; - - let partition = table.with_shard(&shard).create_partition("part").await; - let time = Arc::new(SystemProvider::new()); - let time_38_hour_ago = time.hours_ago(38); - let config = make_compactor_config(); - let metrics = Arc::new(metric::Registry::new()); - let compactor = Arc::new(Compactor::new( - vec![shard.shard.id], - Arc::clone(&catalog.catalog), - ParquetStorage::new(Arc::clone(&catalog.object_store)), - Arc::new(Executor::new(1)), - Arc::new(SystemProvider::new()), - BackoffConfig::default(), - config, - Arc::clone(&metrics), - )); - - // parquet files that are all in the same partition - let mut size_overrides = HashMap::::default(); - - // pf1 does not overlap with any other level 0 - let builder = TestParquetFileBuilder::default() - .with_line_protocol(&lp1) - .with_max_seq(3) - .with_min_time(10) - .with_max_time(20) - .with_creation_time(time_38_hour_ago); - let pf1 = partition.create_parquet_file(builder).await; - size_overrides.insert( - pf1.parquet_file.id, - compactor.config.max_desired_file_size_bytes as i64 + 10, - ); - - // pf2 overlaps with pf3 - let builder = TestParquetFileBuilder::default() - .with_line_protocol(&lp2) - .with_max_seq(5) - .with_min_time(8_000) - .with_max_time(20_000) - .with_creation_time(time_38_hour_ago); - let pf2 = partition.create_parquet_file(builder).await; - size_overrides.insert( - pf2.parquet_file.id, - 100, // small file - ); - - // pf3 overlaps with pf2 - let builder = TestParquetFileBuilder::default() - .with_line_protocol(&lp3) - .with_max_seq(10) - .with_min_time(6_000) - .with_max_time(25_000) - .with_creation_time(time_38_hour_ago); - let pf3 = partition.create_parquet_file(builder).await; - size_overrides.insert( - pf3.parquet_file.id, - 100, // small file - ); - - // pf4 does not overlap with any but is small - let builder = TestParquetFileBuilder::default() - .with_line_protocol(&lp4) - .with_max_seq(18) - .with_min_time(26_000) - .with_max_time(28_000) - .with_creation_time(time_38_hour_ago); - let pf4 = partition.create_parquet_file(builder).await; - size_overrides.insert( - pf4.parquet_file.id, - 100, // small file - ); - - // pf5 was created in a previous compaction cycle; overlaps with pf1 - let builder = TestParquetFileBuilder::default() - .with_line_protocol(&lp5) - .with_max_seq(1) - .with_min_time(9) - .with_max_time(25) - .with_creation_time(time_38_hour_ago) - .with_compaction_level(CompactionLevel::FileNonOverlapped); - let pf5 = partition.create_parquet_file(builder).await; - size_overrides.insert( - pf5.parquet_file.id, - 100, // small file - ); - - // pf6 was created in a previous compaction cycle; does not overlap with any - let builder = TestParquetFileBuilder::default() - .with_line_protocol(&lp6) - .with_max_seq(20) - .with_min_time(90000) - .with_max_time(91000) - .with_creation_time(time_38_hour_ago) - .with_compaction_level(CompactionLevel::FileNonOverlapped); - let pf6 = partition.create_parquet_file(builder).await; - size_overrides.insert( - pf6.parquet_file.id, - 100, // small file - ); - - // should have 4 level-0 files before compacting - let count = catalog.count_level_0_files(shard.shard.id).await; - assert_eq!(count, 4); + let TestDb { + catalog, + compactor, + table, + } = make_db_with_4_l0s_2_l1s().await; // ------------------------------------------------ // Compact @@ -878,9 +824,9 @@ mod tests { .collect(); // The initial files are: L0 1-4, L1 5-6. The first step of cold compaction took files 1-5 - // and compacted them and split them into files 7 and 8. The second step of cold compaction - // took 6, 7, and 8 and combined them all into file 9. - assert_eq!(files_and_levels, vec![(9, CompactionLevel::Final)]); + // and compacted them into a l-1 file 7. The second step of cold compaction + // took 6 and 7 and combined them all into file 8. + assert_eq!(files_and_levels, vec![(8, CompactionLevel::Final)]); // ------------------------------------------------ // Verify the parquet file content @@ -913,6 +859,7 @@ mod tests { #[tokio::test] async fn full_cold_compaction_new_level_1_overlapping_with_level_2() { test_helpers::maybe_start_logging(); + let catalog = TestCatalog::new(); // lp1 will be level 1 with min time 10, overlaps with lp5 (L2) @@ -1142,4 +1089,180 @@ mod tests { &batches ); } + + struct TestDb { + catalog: Arc, + compactor: Arc, + table: Arc, + } + + async fn make_db_with_4_l0s_2_l1s() -> TestDb { + let catalog = TestCatalog::new(); + + // lp1 does not overlap with any other level 0 + let lp1 = vec![ + "table,tag1=WA field_int=1000i 10", + "table,tag1=VT field_int=10i 20", + ] + .join("\n"); + + // lp2 overlaps with lp3 + let lp2 = vec![ + "table,tag1=WA field_int=1000i 8000", // will be eliminated due to duplicate + "table,tag1=VT field_int=10i 10000", + "table,tag1=UT field_int=70i 20000", + ] + .join("\n"); + + // lp3 overlaps with lp2 + let lp3 = vec![ + "table,tag1=WA field_int=1500i 8000", // latest duplicate and kept + "table,tag1=VT field_int=10i 6000", + "table,tag1=UT field_int=270i 25000", + ] + .join("\n"); + + // lp4 does not overlap with any + let lp4 = vec![ + "table,tag2=WA,tag3=10 field_int=1600i 28000", + "table,tag2=VT,tag3=20 field_int=20i 26000", + ] + .join("\n"); + + // lp5 overlaps with lp1 + let lp5 = vec![ + "table,tag2=PA,tag3=15 field_int=1601i 9", + "table,tag2=OH,tag3=21 field_int=21i 25", + ] + .join("\n"); + + // lp6 does not overlap with any + let lp6 = vec![ + "table,tag2=PA,tag3=15 field_int=81601i 90000", + "table,tag2=OH,tag3=21 field_int=421i 91000", + ] + .join("\n"); + + let ns = catalog.create_namespace("ns").await; + let shard = ns.create_shard(1).await; + let table = ns.create_table("table").await; + table.create_column("field_int", ColumnType::I64).await; + table.create_column("tag1", ColumnType::Tag).await; + table.create_column("tag2", ColumnType::Tag).await; + table.create_column("tag3", ColumnType::Tag).await; + table.create_column("time", ColumnType::Time).await; + + let partition = table.with_shard(&shard).create_partition("part").await; + let time = Arc::new(SystemProvider::new()); + let time_38_hour_ago = time.hours_ago(38); + let config = make_compactor_config(); + let metrics = Arc::new(metric::Registry::new()); + let compactor = Arc::new(Compactor::new( + vec![shard.shard.id], + Arc::clone(&catalog.catalog), + ParquetStorage::new(Arc::clone(&catalog.object_store)), + Arc::new(Executor::new(1)), + Arc::new(SystemProvider::new()), + BackoffConfig::default(), + config, + Arc::clone(&metrics), + )); + + // parquet files that are all in the same partition + let mut size_overrides = HashMap::::default(); + + // pf1 does not overlap with any other level 0 + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp1) + .with_max_seq(3) + .with_min_time(10) + .with_max_time(20) + .with_creation_time(time_38_hour_ago); + let pf1 = partition.create_parquet_file(builder).await; + size_overrides.insert( + pf1.parquet_file.id, + compactor.config.max_desired_file_size_bytes as i64 + 10, + ); + + // pf2 overlaps with pf3 + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp2) + .with_max_seq(5) + .with_min_time(8_000) + .with_max_time(20_000) + .with_creation_time(time_38_hour_ago); + let pf2 = partition.create_parquet_file(builder).await; + size_overrides.insert( + pf2.parquet_file.id, + 100, // small file + ); + + // pf3 overlaps with pf2 + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp3) + .with_max_seq(10) + .with_min_time(6_000) + .with_max_time(25_000) + .with_creation_time(time_38_hour_ago); + let pf3 = partition.create_parquet_file(builder).await; + size_overrides.insert( + pf3.parquet_file.id, + 100, // small file + ); + + // pf4 does not overlap with any but is small + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp4) + .with_max_seq(18) + .with_min_time(26_000) + .with_max_time(28_000) + .with_creation_time(time_38_hour_ago); + let pf4 = partition.create_parquet_file(builder).await; + size_overrides.insert( + pf4.parquet_file.id, + 100, // small file + ); + + // pf5 was created in a previous compaction cycle; overlaps with pf1 + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp5) + .with_max_seq(1) + .with_min_time(9) + .with_max_time(25) + .with_creation_time(time_38_hour_ago) + .with_compaction_level(CompactionLevel::FileNonOverlapped); + let pf5 = partition.create_parquet_file(builder).await; + size_overrides.insert( + pf5.parquet_file.id, + 100, // small file + ); + + // pf6 was created in a previous compaction cycle; does not overlap with any + let builder = TestParquetFileBuilder::default() + .with_line_protocol(&lp6) + .with_max_seq(20) + .with_min_time(90000) + .with_max_time(91000) + .with_creation_time(time_38_hour_ago) + .with_compaction_level(CompactionLevel::FileNonOverlapped); + let pf6 = partition.create_parquet_file(builder).await; + size_overrides.insert( + pf6.parquet_file.id, + 100, // small file + ); + + // should have 4 level-0 files before compacting + let count = catalog.count_level_0_files(shard.shard.id).await; + assert_eq!(count, 4); + + // should have two level-1 file. Thus total L-0 and l-1 will be six before compacting + let all_files = catalog.list_by_table_not_to_delete(table.table.id).await; + assert_eq!(all_files.len(), 6); + + TestDb { + catalog: Arc::clone(&catalog), + compactor: Arc::clone(&compactor), + table: Arc::clone(&table), + } + } }