diff --git a/compactor2/src/components/split_or_compact/split_compact.rs b/compactor2/src/components/split_or_compact/split_compact.rs index cb096af1b7..21853191dc 100644 --- a/compactor2/src/components/split_or_compact/split_compact.rs +++ b/compactor2/src/components/split_or_compact/split_compact.rs @@ -45,7 +45,7 @@ impl SplitOrCompact for SplitCompact { /// (2).If split is not needed which also means the split was needed and done in previous round, /// pick files to compact that under max_compact_size limit. Mostly after the split above /// done in previous round, we will be able to do this because start level and - /// target level time ranges are aligned + /// target level time ranges are aligned /// (3).If the smallest possible set to compact is still over size limit, split over-size files. /// This will be any large files of start-level or target-level. We expect this split is very rare /// and the goal is to reduce the size for us to move forward, hence the split time will make e diff --git a/compactor2/tests/layouts/large_files.rs b/compactor2/tests/layouts/large_files.rs index 0fa310f12d..9c19124625 100644 --- a/compactor2/tests/layouts/large_files.rs +++ b/compactor2/tests/layouts/large_files.rs @@ -3,6 +3,7 @@ //! See [crate::layout] module for detailed documentation use data_types::CompactionLevel; +use iox_time::Time; use crate::layouts::{layout_setup_builder, parquet_builder, run_layout_scenario, ONE_MB}; @@ -111,6 +112,7 @@ async fn one_larger_max_compact_size() { .with_min_time(1) .with_max_time(1000) .with_compaction_level(CompactionLevel::FileNonOverlapped) + .with_max_l0_created_at(Time::from_timestamp_nanos(5)) // file > max_desired_file_size_bytes .with_file_size_bytes((max_compact_size + 1) as u64), ) @@ -123,14 +125,14 @@ async fn one_larger_max_compact_size() { --- - "**** Input Files " - "L1, all files 300mb " - - "L1.1[1,1000] 1ns |------------------------------------------L1.1------------------------------------------|" - - "WARNING: file L1.1[1,1000] 1ns 300mb exceeds soft limit 100mb by more than 50%" + - "L1.1[1,1000] 5ns |------------------------------------------L1.1------------------------------------------|" + - "WARNING: file L1.1[1,1000] 5ns 300mb exceeds soft limit 100mb by more than 50%" - "Committing partition 1:" - " Upgrading 1 files level to CompactionLevel::L2: L1.1" - "**** Final Output Files " - "L2, all files 300mb " - - "L2.1[1,1000] 1ns |------------------------------------------L2.1------------------------------------------|" - - "WARNING: file L2.1[1,1000] 1ns 300mb exceeds soft limit 100mb by more than 50%" + - "L2.1[1,1000] 5ns |------------------------------------------L2.1------------------------------------------|" + - "WARNING: file L2.1[1,1000] 5ns 300mb exceeds soft limit 100mb by more than 50%" "### ); } @@ -155,6 +157,7 @@ async fn one_l0_larger_max_compact_size() { .with_min_time(1) .with_max_time(1000) .with_compaction_level(CompactionLevel::Initial) + .with_max_l0_created_at(Time::from_timestamp_nanos(5)) // file > max_desired_file_size_bytes .with_file_size_bytes((max_compact_size + 1) as u64), ) @@ -167,16 +170,16 @@ async fn one_l0_larger_max_compact_size() { --- - "**** Input Files " - "L0, all files 300mb " - - "L0.1[1,1000] 1ns |------------------------------------------L0.1------------------------------------------|" - - "WARNING: file L0.1[1,1000] 1ns 300mb exceeds soft limit 100mb by more than 50%" + - "L0.1[1,1000] 5ns |------------------------------------------L0.1------------------------------------------|" + - "WARNING: file L0.1[1,1000] 5ns 300mb exceeds soft limit 100mb by more than 50%" - "Committing partition 1:" - " Upgrading 1 files level to CompactionLevel::L1: L0.1" - "Committing partition 1:" - " Upgrading 1 files level to CompactionLevel::L2: L1.1" - "**** Final Output Files " - "L2, all files 300mb " - - "L2.1[1,1000] 1ns |------------------------------------------L2.1------------------------------------------|" - - "WARNING: file L2.1[1,1000] 1ns 300mb exceeds soft limit 100mb by more than 50%" + - "L2.1[1,1000] 5ns |------------------------------------------L2.1------------------------------------------|" + - "WARNING: file L2.1[1,1000] 5ns 300mb exceeds soft limit 100mb by more than 50%" "### ); } @@ -203,6 +206,8 @@ async fn two_large_files_total_under_max_compact_size() { .with_min_time(i) .with_max_time(1000) .with_compaction_level(CompactionLevel::try_from(i as i32).unwrap()) + // max_l0_created_at of larger level is set to be smaller + .with_max_l0_created_at(Time::from_timestamp_nanos(10 - i)) .with_file_size_bytes(size), ) .await; @@ -214,25 +219,25 @@ async fn two_large_files_total_under_max_compact_size() { --- - "**** Input Files " - "L1, all files 100mb " - - "L1.1[1,1000] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[1,1000] 9ns |------------------------------------------L1.1------------------------------------------|" - "L2, all files 100mb " - - "L2.2[2,1000] 1ns |-----------------------------------------L2.2------------------------------------------| " + - "L2.2[2,1000] 8ns |-----------------------------------------L2.2------------------------------------------| " - "**** Simulation run 0, type=split(split_times=[501]). 2 Input Files, 200mb total:" - "L1, all files 100mb " - - "L1.1[1,1000] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[1,1000] 9ns |------------------------------------------L1.1------------------------------------------|" - "L2, all files 100mb " - - "L2.2[2,1000] 1ns |-----------------------------------------L2.2------------------------------------------| " + - "L2.2[2,1000] 8ns |-----------------------------------------L2.2------------------------------------------| " - "**** 2 Output Files (parquet_file_id not yet assigned), 200mb total:" - "L2 " - - "L2.?[1,501] 1ns 100.1mb |-------------------L2.?--------------------| " - - "L2.?[502,1000] 1ns 99.9mb |-------------------L2.?-------------------| " + - "L2.?[1,501] 9ns 100.1mb |-------------------L2.?--------------------| " + - "L2.?[502,1000] 9ns 99.9mb |-------------------L2.?-------------------| " - "Committing partition 1:" - " Soft Deleting 2 files: L1.1, L2.2" - " Creating 2 files" - "**** Final Output Files " - "L2 " - - "L2.3[1,501] 1ns 100.1mb |-------------------L2.3--------------------| " - - "L2.4[502,1000] 1ns 99.9mb |-------------------L2.4-------------------| " + - "L2.3[1,501] 9ns 100.1mb |-------------------L2.3--------------------| " + - "L2.4[502,1000] 9ns 99.9mb |-------------------L2.4-------------------| " "### ); } @@ -260,6 +265,8 @@ async fn two_large_files_total_over_max_compact_size() { .with_max_time(1000) // L1.1 or L2.2 .with_compaction_level(CompactionLevel::try_from(i as i32).unwrap()) + // max_l0_created_at of larger level is set to be smaller + .with_max_l0_created_at(Time::from_timestamp_nanos(10 - i)) .with_file_size_bytes(size as u64), ) .await; @@ -271,58 +278,58 @@ async fn two_large_files_total_over_max_compact_size() { --- - "**** Input Files " - "L1, all files 150mb " - - "L1.1[1,1000] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[1,1000] 9ns |------------------------------------------L1.1------------------------------------------|" - "L2, all files 150mb " - - "L2.2[2,1000] 1ns |-----------------------------------------L2.2------------------------------------------| " - - "WARNING: file L1.1[1,1000] 1ns 150mb exceeds soft limit 100mb by more than 50%" - - "WARNING: file L2.2[2,1000] 1ns 150mb exceeds soft limit 100mb by more than 50%" + - "L2.2[2,1000] 8ns |-----------------------------------------L2.2------------------------------------------| " + - "WARNING: file L1.1[1,1000] 9ns 150mb exceeds soft limit 100mb by more than 50%" + - "WARNING: file L2.2[2,1000] 8ns 150mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[667]). 1 Input Files, 150mb total:" - "L1, all files 150mb " - - "L1.1[1,1000] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[1,1000] 9ns |------------------------------------------L1.1------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L1 " - - "L1.?[1,667] 1ns 100mb |---------------------------L1.?---------------------------| " - - "L1.?[668,1000] 1ns 50mb |-----------L1.?------------| " + - "L1.?[1,667] 9ns 100mb |---------------------------L1.?---------------------------| " + - "L1.?[668,1000] 9ns 50mb |-----------L1.?------------| " - "**** Simulation run 1, type=split(split_times=[668]). 1 Input Files, 150mb total:" - "L2, all files 150mb " - - "L2.2[2,1000] 1ns |------------------------------------------L2.2------------------------------------------|" + - "L2.2[2,1000] 8ns |------------------------------------------L2.2------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L2 " - - "L2.?[2,668] 1ns 100.1mb |---------------------------L2.?---------------------------| " - - "L2.?[669,1000] 1ns 49.9mb |-----------L2.?------------| " + - "L2.?[2,668] 8ns 100.1mb |---------------------------L2.?---------------------------| " + - "L2.?[669,1000] 8ns 49.9mb |-----------L2.?------------| " - "Committing partition 1:" - " Soft Deleting 2 files: L1.1, L2.2" - " Creating 4 files" - "**** Simulation run 2, type=split(split_times=[668]). 1 Input Files, 50mb total:" - "L1, all files 50mb " - - "L1.4[668,1000] 1ns |------------------------------------------L1.4------------------------------------------|" + - "L1.4[668,1000] 9ns |------------------------------------------L1.4------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 50mb total:" - "L1 " - - "L1.?[668,668] 1ns 0b |L1.?| " - - "L1.?[669,1000] 1ns 50mb |-----------------------------------------L1.?------------------------------------------| " + - "L1.?[668,668] 9ns 0b |L1.?| " + - "L1.?[669,1000] 9ns 50mb |-----------------------------------------L1.?------------------------------------------| " - "Committing partition 1:" - " Soft Deleting 1 files: L1.4" - " Creating 2 files" - "**** Simulation run 3, type=split(split_times=[335]). 3 Input Files, 200.1mb total:" - "L1 " - - "L1.3[1,667] 1ns 100mb |-----------------------------------------L1.3------------------------------------------| " - - "L1.7[668,668] 1ns 0b |L1.7|" + - "L1.3[1,667] 9ns 100mb |-----------------------------------------L1.3------------------------------------------| " + - "L1.7[668,668] 9ns 0b |L1.7|" - "L2 " - - "L2.5[2,668] 1ns 100.1mb |-----------------------------------------L2.5------------------------------------------| " + - "L2.5[2,668] 8ns 100.1mb |-----------------------------------------L2.5------------------------------------------| " - "**** 2 Output Files (parquet_file_id not yet assigned), 200.1mb total:" - "L2 " - - "L2.?[1,335] 1ns 100.2mb |-------------------L2.?--------------------| " - - "L2.?[336,668] 1ns 99.9mb |-------------------L2.?-------------------| " + - "L2.?[1,335] 9ns 100.2mb |-------------------L2.?--------------------| " + - "L2.?[336,668] 9ns 99.9mb |-------------------L2.?-------------------| " - "Committing partition 1:" - " Soft Deleting 3 files: L1.3, L2.5, L1.7" - " Creating 2 files" - "**** Final Output Files " - "L1 " - - "L1.8[669,1000] 1ns 50mb |-----------L1.8------------| " + - "L1.8[669,1000] 9ns 50mb |-----------L1.8------------| " - "L2 " - - "L2.6[669,1000] 1ns 49.9mb |-----------L2.6------------| " - - "L2.9[1,335] 1ns 100.2mb |------------L2.9------------| " - - "L2.10[336,668] 1ns 99.9mb |-----------L2.10-----------| " + - "L2.6[669,1000] 8ns 49.9mb |-----------L2.6------------| " + - "L2.9[1,335] 9ns 100.2mb |------------L2.9------------| " + - "L2.10[336,668] 9ns 99.9mb |-----------L2.10-----------| " "### ); } @@ -351,6 +358,8 @@ async fn two_large_files_total_over_max_compact_size_small_overlap_range() { .with_max_time(1000) // L1.1 or L2.2 .with_compaction_level(CompactionLevel::try_from(i as i32).unwrap()) + // max_l0_created_at of larger level is set to be smaller + .with_max_l0_created_at(Time::from_timestamp_nanos(10 - i)) .with_file_size_bytes(size as u64), ) .await; @@ -362,47 +371,47 @@ async fn two_large_files_total_over_max_compact_size_small_overlap_range() { --- - "**** Input Files " - "L1, all files 150mb " - - "L1.1[0,1000] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[0,1000] 9ns |------------------------------------------L1.1------------------------------------------|" - "L2, all files 150mb " - - "L2.2[800,1000] 1ns |------L2.2------|" - - "WARNING: file L1.1[0,1000] 1ns 150mb exceeds soft limit 100mb by more than 50%" - - "WARNING: file L2.2[800,1000] 1ns 150mb exceeds soft limit 100mb by more than 50%" + - "L2.2[800,1000] 8ns |------L2.2------|" + - "WARNING: file L1.1[0,1000] 9ns 150mb exceeds soft limit 100mb by more than 50%" + - "WARNING: file L2.2[800,1000] 8ns 150mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[667]). 1 Input Files, 150mb total:" - "L1, all files 150mb " - - "L1.1[0,1000] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[0,1000] 9ns |------------------------------------------L1.1------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L1 " - - "L1.?[0,667] 1ns 100.05mb |---------------------------L1.?---------------------------| " - - "L1.?[668,1000] 1ns 49.95mb |-----------L1.?------------| " + - "L1.?[0,667] 9ns 100.05mb |---------------------------L1.?---------------------------| " + - "L1.?[668,1000] 9ns 49.95mb |-----------L1.?------------| " - "**** Simulation run 1, type=split(split_times=[934]). 1 Input Files, 150mb total:" - "L2, all files 150mb " - - "L2.2[800,1000] 1ns |------------------------------------------L2.2------------------------------------------|" + - "L2.2[800,1000] 8ns |------------------------------------------L2.2------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L2 " - - "L2.?[800,934] 1ns 100.5mb|---------------------------L2.?---------------------------| " - - "L2.?[935,1000] 1ns 49.5mb |-----------L2.?------------| " + - "L2.?[800,934] 8ns 100.5mb|---------------------------L2.?---------------------------| " + - "L2.?[935,1000] 8ns 49.5mb |-----------L2.?------------| " - "Committing partition 1:" - " Soft Deleting 2 files: L1.1, L2.2" - " Creating 4 files" - "**** Simulation run 2, type=split(split_times=[835]). 3 Input Files, 199.95mb total:" - "L1 " - - "L1.4[668,1000] 1ns 49.95mb|------------------------------------------L1.4------------------------------------------|" + - "L1.4[668,1000] 9ns 49.95mb|------------------------------------------L1.4------------------------------------------|" - "L2 " - - "L2.5[800,934] 1ns 100.5mb |---------------L2.5---------------| " - - "L2.6[935,1000] 1ns 49.5mb |-----L2.6------| " + - "L2.5[800,934] 8ns 100.5mb |---------------L2.5---------------| " + - "L2.6[935,1000] 8ns 49.5mb |-----L2.6------| " - "**** 2 Output Files (parquet_file_id not yet assigned), 199.95mb total:" - "L2 " - - "L2.?[668,835] 1ns 100.58mb|-------------------L2.?--------------------| " - - "L2.?[836,1000] 1ns 99.37mb |-------------------L2.?-------------------| " + - "L2.?[668,835] 9ns 100.58mb|-------------------L2.?--------------------| " + - "L2.?[836,1000] 9ns 99.37mb |-------------------L2.?-------------------| " - "Committing partition 1:" - " Soft Deleting 3 files: L1.4, L2.5, L2.6" - " Upgrading 1 files level to CompactionLevel::L2: L1.3" - " Creating 2 files" - "**** Final Output Files " - "L2 " - - "L2.3[0,667] 1ns 100.05mb |---------------------------L2.3---------------------------| " - - "L2.7[668,835] 1ns 100.58mb |----L2.7-----| " - - "L2.8[836,1000] 1ns 99.37mb |----L2.8----| " + - "L2.3[0,667] 9ns 100.05mb |---------------------------L2.3---------------------------| " + - "L2.7[668,835] 9ns 100.58mb |----L2.7-----| " + - "L2.8[836,1000] 9ns 99.37mb |----L2.8----| " "### ); } @@ -432,6 +441,8 @@ async fn two_large_files_total_over_max_compact_size_small_overlap_range_2() { .with_max_time((i + 1) * 1000) // L1.1 or L2.2 .with_compaction_level(CompactionLevel::try_from(i as i32).unwrap()) + // max_l0_created_at of larger level is set to be smaller + .with_max_l0_created_at(Time::from_timestamp_nanos(10 - i)) .with_file_size_bytes(size as u64), ) .await; @@ -443,48 +454,48 @@ async fn two_large_files_total_over_max_compact_size_small_overlap_range_2() { --- - "**** Input Files " - "L1, all files 150mb " - - "L1.1[800,2000] 1ns |---------------------L1.1----------------------| " + - "L1.1[800,2000] 9ns |---------------------L1.1----------------------| " - "L2, all files 150mb " - - "L2.2[1600,3000] 1ns |-------------------------L2.2--------------------------| " - - "WARNING: file L1.1[800,2000] 1ns 150mb exceeds soft limit 100mb by more than 50%" - - "WARNING: file L2.2[1600,3000] 1ns 150mb exceeds soft limit 100mb by more than 50%" + - "L2.2[1600,3000] 8ns |-------------------------L2.2--------------------------| " + - "WARNING: file L1.1[800,2000] 9ns 150mb exceeds soft limit 100mb by more than 50%" + - "WARNING: file L2.2[1600,3000] 8ns 150mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[1600]). 1 Input Files, 150mb total:" - "L1, all files 150mb " - - "L1.1[800,2000] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[800,2000] 9ns |------------------------------------------L1.1------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L1 " - - "L1.?[800,1600] 1ns 100mb |---------------------------L1.?---------------------------| " - - "L1.?[1601,2000] 1ns 50mb |-----------L1.?------------| " + - "L1.?[800,1600] 9ns 100mb |---------------------------L1.?---------------------------| " + - "L1.?[1601,2000] 9ns 50mb |-----------L1.?------------| " - "**** Simulation run 1, type=split(split_times=[2534]). 1 Input Files, 150mb total:" - "L2, all files 150mb " - - "L2.2[1600,3000] 1ns |------------------------------------------L2.2------------------------------------------|" + - "L2.2[1600,3000] 8ns |------------------------------------------L2.2------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L2 " - - "L2.?[1600,2534] 1ns 100.07mb|---------------------------L2.?---------------------------| " - - "L2.?[2535,3000] 1ns 49.93mb |-----------L2.?------------| " + - "L2.?[1600,2534] 8ns 100.07mb|---------------------------L2.?---------------------------| " + - "L2.?[2535,3000] 8ns 49.93mb |-----------L2.?------------| " - "Committing partition 1:" - " Soft Deleting 2 files: L1.1, L2.2" - " Creating 4 files" - "**** Simulation run 2, type=split(split_times=[1494, 2188]). 3 Input Files, 250.07mb total:" - "L1 " - - "L1.4[1601,2000] 1ns 50mb |-------L1.4-------| " - - "L1.3[800,1600] 1ns 100mb |-----------------L1.3------------------| " + - "L1.4[1601,2000] 9ns 50mb |-------L1.4-------| " + - "L1.3[800,1600] 9ns 100mb |-----------------L1.3------------------| " - "L2 " - - "L2.5[1600,2534] 1ns 100.07mb |---------------------L2.5---------------------| " + - "L2.5[1600,2534] 8ns 100.07mb |---------------------L2.5---------------------| " - "**** 3 Output Files (parquet_file_id not yet assigned), 250.07mb total:" - "L2 " - - "L2.?[800,1494] 1ns 100.09mb|---------------L2.?---------------| " - - "L2.?[1495,2188] 1ns 99.94mb |--------------L2.?---------------| " - - "L2.?[2189,2534] 1ns 50.04mb |-----L2.?------| " + - "L2.?[800,1494] 9ns 100.09mb|---------------L2.?---------------| " + - "L2.?[1495,2188] 9ns 99.94mb |--------------L2.?---------------| " + - "L2.?[2189,2534] 9ns 50.04mb |-----L2.?------| " - "Committing partition 1:" - " Soft Deleting 3 files: L1.3, L1.4, L2.5" - " Creating 3 files" - "**** Final Output Files " - "L2 " - - "L2.6[2535,3000] 1ns 49.93mb |------L2.6-------| " - - "L2.7[800,1494] 1ns 100.09mb|-----------L2.7-----------| " - - "L2.8[1495,2188] 1ns 99.94mb |-----------L2.8-----------| " - - "L2.9[2189,2534] 1ns 50.04mb |----L2.9----| " + - "L2.6[2535,3000] 8ns 49.93mb |------L2.6-------| " + - "L2.7[800,1494] 9ns 100.09mb|-----------L2.7-----------| " + - "L2.8[1495,2188] 9ns 99.94mb |-----------L2.8-----------| " + - "L2.9[2189,2534] 9ns 50.04mb |----L2.9----| " "### ); } @@ -514,6 +525,8 @@ async fn two_large_files_total_over_max_compact_size_small_overlap_range_3() { .with_max_time((i - 1) * 1000 + 300) // L1.1 or L2.2 .with_compaction_level(CompactionLevel::try_from(i as i32).unwrap()) + // max_l0_created_at of larger level is set to be smaller + .with_max_l0_created_at(Time::from_timestamp_nanos(10 - i)) .with_file_size_bytes(size as u64), ) .await; @@ -525,48 +538,48 @@ async fn two_large_files_total_over_max_compact_size_small_overlap_range_3() { --- - "**** Input Files " - "L1, all files 150mb " - - "L1.1[0,300] 1ns |-------L1.1-------| " + - "L1.1[0,300] 9ns |-------L1.1-------| " - "L2, all files 150mb " - - "L2.2[200,1300] 1ns |-----------------------------------L2.2-----------------------------------| " - - "WARNING: file L1.1[0,300] 1ns 150mb exceeds soft limit 100mb by more than 50%" - - "WARNING: file L2.2[200,1300] 1ns 150mb exceeds soft limit 100mb by more than 50%" + - "L2.2[200,1300] 8ns |-----------------------------------L2.2-----------------------------------| " + - "WARNING: file L1.1[0,300] 9ns 150mb exceeds soft limit 100mb by more than 50%" + - "WARNING: file L2.2[200,1300] 8ns 150mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[200]). 1 Input Files, 150mb total:" - "L1, all files 150mb " - - "L1.1[0,300] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[0,300] 9ns |------------------------------------------L1.1------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L1 " - - "L1.?[0,200] 1ns 100mb |---------------------------L1.?---------------------------| " - - "L1.?[201,300] 1ns 50mb |-----------L1.?------------| " + - "L1.?[0,200] 9ns 100mb |---------------------------L1.?---------------------------| " + - "L1.?[201,300] 9ns 50mb |-----------L1.?------------| " - "**** Simulation run 1, type=split(split_times=[934]). 1 Input Files, 150mb total:" - "L2, all files 150mb " - - "L2.2[200,1300] 1ns |------------------------------------------L2.2------------------------------------------|" + - "L2.2[200,1300] 8ns |------------------------------------------L2.2------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L2 " - - "L2.?[200,934] 1ns 100.09mb|---------------------------L2.?---------------------------| " - - "L2.?[935,1300] 1ns 49.91mb |-----------L2.?------------| " + - "L2.?[200,934] 8ns 100.09mb|---------------------------L2.?---------------------------| " + - "L2.?[935,1300] 8ns 49.91mb |-----------L2.?------------| " - "Committing partition 1:" - " Soft Deleting 2 files: L1.1, L2.2" - " Creating 4 files" - "**** Simulation run 2, type=split(split_times=[374, 748]). 3 Input Files, 250.09mb total:" - "L1 " - - "L1.4[201,300] 1ns 50mb |-L1.4--| " - - "L1.3[0,200] 1ns 100mb |------L1.3-------| " + - "L1.4[201,300] 9ns 50mb |-L1.4--| " + - "L1.3[0,200] 9ns 100mb |------L1.3-------| " - "L2 " - - "L2.5[200,934] 1ns 100.09mb |--------------------------------L2.5--------------------------------| " + - "L2.5[200,934] 8ns 100.09mb |--------------------------------L2.5--------------------------------| " - "**** 3 Output Files (parquet_file_id not yet assigned), 250.09mb total:" - "L2 " - - "L2.?[0,374] 1ns 100.14mb |---------------L2.?---------------| " - - "L2.?[375,748] 1ns 99.88mb |--------------L2.?---------------| " - - "L2.?[749,934] 1ns 50.07mb |-----L2.?------| " + - "L2.?[0,374] 9ns 100.14mb |---------------L2.?---------------| " + - "L2.?[375,748] 9ns 99.88mb |--------------L2.?---------------| " + - "L2.?[749,934] 9ns 50.07mb |-----L2.?------| " - "Committing partition 1:" - " Soft Deleting 3 files: L1.3, L1.4, L2.5" - " Creating 3 files" - "**** Final Output Files " - "L2 " - - "L2.6[935,1300] 1ns 49.91mb |---------L2.6----------| " - - "L2.7[0,374] 1ns 100.14mb |---------L2.7----------| " - - "L2.8[375,748] 1ns 99.88mb |---------L2.8----------| " - - "L2.9[749,934] 1ns 50.07mb |---L2.9---| " + - "L2.6[935,1300] 8ns 49.91mb |---------L2.6----------| " + - "L2.7[0,374] 9ns 100.14mb |---------L2.7----------| " + - "L2.8[375,748] 9ns 99.88mb |---------L2.8----------| " + - "L2.9[749,934] 9ns 50.07mb |---L2.9---| " "### ); } @@ -593,6 +606,8 @@ async fn two_large_files_total_over_max_compact_size_start_l0() { parquet_builder() .with_min_time(i) .with_max_time(1000) + // time of L0 larger than time of L1 + .with_max_l0_created_at(Time::from_timestamp_nanos(10 - i)) // L0.1 or L1.2 .with_compaction_level(CompactionLevel::try_from(i as i32).unwrap()) .with_file_size_bytes(size as u64), @@ -606,72 +621,88 @@ async fn two_large_files_total_over_max_compact_size_start_l0() { --- - "**** Input Files " - "L0, all files 150mb " - - "L0.1[0,1000] 1ns |------------------------------------------L0.1------------------------------------------|" + - "L0.1[0,1000] 10ns |------------------------------------------L0.1------------------------------------------|" - "L1, all files 150mb " - - "L1.2[1,1000] 1ns |-----------------------------------------L1.2------------------------------------------| " - - "WARNING: file L0.1[0,1000] 1ns 150mb exceeds soft limit 100mb by more than 50%" - - "WARNING: file L1.2[1,1000] 1ns 150mb exceeds soft limit 100mb by more than 50%" + - "L1.2[1,1000] 9ns |-----------------------------------------L1.2------------------------------------------| " + - "WARNING: file L0.1[0,1000] 10ns 150mb exceeds soft limit 100mb by more than 50%" + - "WARNING: file L1.2[1,1000] 9ns 150mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[667]). 1 Input Files, 150mb total:" - "L0, all files 150mb " - - "L0.1[0,1000] 1ns |------------------------------------------L0.1------------------------------------------|" + - "L0.1[0,1000] 10ns |------------------------------------------L0.1------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L0 " - - "L0.?[0,667] 1ns 100.05mb |---------------------------L0.?---------------------------| " - - "L0.?[668,1000] 1ns 49.95mb |-----------L0.?------------| " + - "L0.?[0,667] 10ns 100.05mb|---------------------------L0.?---------------------------| " + - "L0.?[668,1000] 10ns 49.95mb |-----------L0.?------------| " - "**** Simulation run 1, type=split(split_times=[667]). 1 Input Files, 150mb total:" - "L1, all files 150mb " - - "L1.2[1,1000] 1ns |------------------------------------------L1.2------------------------------------------|" + - "L1.2[1,1000] 9ns |------------------------------------------L1.2------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 150mb total:" - "L1 " - - "L1.?[1,667] 1ns 100mb |---------------------------L1.?---------------------------| " - - "L1.?[668,1000] 1ns 50mb |-----------L1.?------------| " + - "L1.?[1,667] 9ns 100mb |---------------------------L1.?---------------------------| " + - "L1.?[668,1000] 9ns 50mb |-----------L1.?------------| " - "Committing partition 1:" - " Soft Deleting 2 files: L0.1, L1.2" - " Creating 4 files" - "**** Simulation run 2, type=split(split_times=[933]). 2 Input Files, 99.95mb total:" - "L0 " - - "L0.4[668,1000] 1ns 49.95mb|------------------------------------------L0.4------------------------------------------|" + - "L0.4[668,1000] 10ns 49.95mb|------------------------------------------L0.4------------------------------------------|" - "L1 " - - "L1.6[668,1000] 1ns 50mb |------------------------------------------L1.6------------------------------------------|" + - "L1.6[668,1000] 9ns 50mb |------------------------------------------L1.6------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 99.95mb total:" - "L1 " - - "L1.?[668,933] 1ns 79.78mb|--------------------------------L1.?---------------------------------| " - - "L1.?[934,1000] 1ns 20.17mb |-----L1.?------| " + - "L1.?[668,933] 10ns 79.78mb|--------------------------------L1.?---------------------------------| " + - "L1.?[934,1000] 10ns 20.17mb |-----L1.?------| " - "Committing partition 1:" - " Soft Deleting 2 files: L0.4, L1.6" - " Creating 2 files" - "**** Simulation run 3, type=split(split_times=[334]). 2 Input Files, 200.05mb total:" - "L0 " - - "L0.3[0,667] 1ns 100.05mb |------------------------------------------L0.3------------------------------------------|" + - "L0.3[0,667] 10ns 100.05mb|------------------------------------------L0.3------------------------------------------|" - "L1 " - - "L1.5[1,667] 1ns 100mb |-----------------------------------------L1.5------------------------------------------| " + - "L1.5[1,667] 9ns 100mb |-----------------------------------------L1.5------------------------------------------| " - "**** 2 Output Files (parquet_file_id not yet assigned), 200.05mb total:" - "L1 " - - "L1.?[0,334] 1ns 100.17mb |-------------------L1.?--------------------| " - - "L1.?[335,667] 1ns 99.88mb |-------------------L1.?-------------------| " + - "L1.?[0,334] 10ns 100.17mb|-------------------L1.?--------------------| " + - "L1.?[335,667] 10ns 99.88mb |-------------------L1.?-------------------| " - "Committing partition 1:" - " Soft Deleting 2 files: L0.3, L1.5" - " Creating 2 files" - "**** Simulation run 4, type=split(split_times=[668]). 3 Input Files, 199.83mb total:" - "L1 " - - "L1.7[668,933] 1ns 79.78mb |--------------L1.7---------------| " - - "L1.8[934,1000] 1ns 20.17mb |-L1.8-| " - - "L1.10[335,667] 1ns 99.88mb|------------------L1.10-------------------| " + - "L1.7[668,933] 10ns 79.78mb |--------------L1.7---------------| " + - "L1.8[934,1000] 10ns 20.17mb |-L1.8-| " + - "L1.10[335,667] 10ns 99.88mb|------------------L1.10-------------------| " - "**** 2 Output Files (parquet_file_id not yet assigned), 199.83mb total:" - "L2 " - - "L2.?[335,668] 1ns 100.06mb|-------------------L2.?--------------------| " - - "L2.?[669,1000] 1ns 99.76mb |-------------------L2.?-------------------| " + - "L2.?[335,668] 10ns 100.06mb|-------------------L2.?--------------------| " + - "L2.?[669,1000] 10ns 99.76mb |-------------------L2.?-------------------| " - "Committing partition 1:" - " Soft Deleting 3 files: L1.7, L1.8, L1.10" - " Upgrading 1 files level to CompactionLevel::L2: L1.9" - " Creating 2 files" - "**** Final Output Files " - "L2 " - - "L2.9[0,334] 1ns 100.17mb |------------L2.9------------| " - - "L2.11[335,668] 1ns 100.06mb |-----------L2.11-----------| " - - "L2.12[669,1000] 1ns 99.76mb |-----------L2.12-----------| " + - "L2.9[0,334] 10ns 100.17mb|------------L2.9------------| " + - "L2.11[335,668] 10ns 100.06mb |-----------L2.11-----------| " + - "L2.12[669,1000] 10ns 99.76mb |-----------L2.12-----------| " "### ); + + // Read all 12 files including the soft deleted ones + let output_files = setup.list_by_table().await; + assert_eq!(output_files.len(), 12); + + // Sort the files by id + let mut output_files = output_files; + output_files.sort_by(|a, b| a.id.cmp(&b.id)); + + // Verify all L0 files created by splitting must have value of max_l0_created_at 10 which is the value of the riginal L0 + // Note: this test make created_test deterministic and 1 which we do not care much about + for file in &output_files { + if file.compaction_level == CompactionLevel::Initial { + assert_eq!(file.max_l0_created_at.get(), 10); + } + } } // Real-life case with three good size L1s and one very large L2 @@ -701,6 +732,8 @@ async fn target_too_large_1() { .with_min_time(1) .with_max_time(1000) .with_compaction_level(CompactionLevel::Final) + // level-2 file has small max_l0_created_at, 5 + .with_max_l0_created_at(Time::from_timestamp_nanos(5)) .with_file_size_bytes(l2_size), ) .await; @@ -714,6 +747,9 @@ async fn target_too_large_1() { .with_min_time(i * 1000 + 1) .with_max_time(i * 1000 + 1000) .with_compaction_level(CompactionLevel::FileNonOverlapped) + // level-1 files, each has different max_l0_created_at and >= 10 + // simulate to have L1 with larger time range having smaller max_l0_created_at which is a rare use case + .with_max_l0_created_at(Time::from_timestamp_nanos(10 + (10 - i))) .with_file_size_bytes(l1_sizes[i as usize]), ) .await; @@ -725,59 +761,59 @@ async fn target_too_large_1() { --- - "**** Input Files " - "L1 " - - "L1.2[1,1000] 1ns 53mb |-----------L1.2------------| " - - "L1.3[1001,2000] 1ns 45mb |-----------L1.3------------| " - - "L1.4[2001,3000] 1ns 5mb |-----------L1.4------------| " + - "L1.2[1,1000] 20ns 53mb |-----------L1.2------------| " + - "L1.3[1001,2000] 19ns 45mb |-----------L1.3------------| " + - "L1.4[2001,3000] 18ns 5mb |-----------L1.4------------| " - "L2 " - - "L2.1[1,1000] 1ns 253mb |-----------L2.1------------| " - - "WARNING: file L2.1[1,1000] 1ns 253mb exceeds soft limit 100mb by more than 50%" + - "L2.1[1,1000] 5ns 253mb |-----------L2.1------------| " + - "WARNING: file L2.1[1,1000] 5ns 253mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[396, 791]). 1 Input Files, 253mb total:" - "L2, all files 253mb " - - "L2.1[1,1000] 1ns |------------------------------------------L2.1------------------------------------------|" + - "L2.1[1,1000] 5ns |------------------------------------------L2.1------------------------------------------|" - "**** 3 Output Files (parquet_file_id not yet assigned), 253mb total:" - "L2 " - - "L2.?[1,396] 1ns 100.04mb |--------------L2.?---------------| " - - "L2.?[397,791] 1ns 99.78mb |--------------L2.?---------------| " - - "L2.?[792,1000] 1ns 53.18mb |------L2.?------| " + - "L2.?[1,396] 5ns 100.04mb |--------------L2.?---------------| " + - "L2.?[397,791] 5ns 99.78mb |--------------L2.?---------------| " + - "L2.?[792,1000] 5ns 53.18mb |------L2.?------| " - "Committing partition 1:" - " Soft Deleting 1 files: L2.1" - " Creating 3 files" - "**** Simulation run 1, type=split(split_times=[396, 791]). 1 Input Files, 53mb total:" - "L1, all files 53mb " - - "L1.2[1,1000] 1ns |------------------------------------------L1.2------------------------------------------|" + - "L1.2[1,1000] 20ns |------------------------------------------L1.2------------------------------------------|" - "**** 3 Output Files (parquet_file_id not yet assigned), 53mb total:" - "L1 " - - "L1.?[1,396] 1ns 20.96mb |--------------L1.?---------------| " - - "L1.?[397,791] 1ns 20.9mb |--------------L1.?---------------| " - - "L1.?[792,1000] 1ns 11.14mb |------L1.?------| " + - "L1.?[1,396] 20ns 20.96mb |--------------L1.?---------------| " + - "L1.?[397,791] 20ns 20.9mb |--------------L1.?---------------| " + - "L1.?[792,1000] 20ns 11.14mb |------L1.?------| " - "Committing partition 1:" - " Soft Deleting 1 files: L1.2" - " Creating 3 files" - "**** Simulation run 2, type=split(split_times=[328, 655]). 4 Input Files, 241.68mb total:" - "L1 " - - "L1.8[1,396] 1ns 20.96mb |-------------------L1.8-------------------| " - - "L1.9[397,791] 1ns 20.9mb |-------------------L1.9-------------------| " + - "L1.8[1,396] 20ns 20.96mb |-------------------L1.8-------------------| " + - "L1.9[397,791] 20ns 20.9mb |-------------------L1.9-------------------| " - "L2 " - - "L2.5[1,396] 1ns 100.04mb |-------------------L2.5-------------------| " - - "L2.6[397,791] 1ns 99.78mb |-------------------L2.6-------------------| " + - "L2.5[1,396] 5ns 100.04mb |-------------------L2.5-------------------| " + - "L2.6[397,791] 5ns 99.78mb |-------------------L2.6-------------------| " - "**** 3 Output Files (parquet_file_id not yet assigned), 241.68mb total:" - "L2 " - - "L2.?[1,328] 1ns 100.04mb |---------------L2.?----------------| " - - "L2.?[329,655] 1ns 99.73mb |---------------L2.?----------------| " - - "L2.?[656,791] 1ns 41.91mb |----L2.?-----| " + - "L2.?[1,328] 20ns 100.04mb|---------------L2.?----------------| " + - "L2.?[329,655] 20ns 99.73mb |---------------L2.?----------------| " + - "L2.?[656,791] 20ns 41.91mb |----L2.?-----| " - "Committing partition 1:" - " Soft Deleting 4 files: L2.5, L2.6, L1.8, L1.9" - " Creating 3 files" - "**** Final Output Files " - "L1 " - - "L1.3[1001,2000] 1ns 45mb |-----------L1.3------------| " - - "L1.4[2001,3000] 1ns 5mb |-----------L1.4------------| " - - "L1.10[792,1000] 1ns 11.14mb |L1.10| " + - "L1.3[1001,2000] 19ns 45mb |-----------L1.3------------| " + - "L1.4[2001,3000] 18ns 5mb |-----------L1.4------------| " + - "L1.10[792,1000] 20ns 11.14mb |L1.10| " - "L2 " - - "L2.7[792,1000] 1ns 53.18mb |L2.7| " - - "L2.11[1,328] 1ns 100.04mb|-L2.11-| " - - "L2.12[329,655] 1ns 99.73mb |-L2.12-| " - - "L2.13[656,791] 1ns 41.91mb |L2.13| " + - "L2.7[792,1000] 5ns 53.18mb |L2.7| " + - "L2.11[1,328] 20ns 100.04mb|-L2.11-| " + - "L2.12[329,655] 20ns 99.73mb |-L2.12-| " + - "L2.13[656,791] 20ns 41.91mb |L2.13| " "### ); } @@ -809,6 +845,8 @@ async fn target_too_large_2() { .with_min_time(1) .with_max_time(3000) .with_compaction_level(CompactionLevel::Final) + // level-2 file has small max_l0_created_at + .with_max_l0_created_at(Time::from_timestamp_nanos(5)) .with_file_size_bytes(l2_size), ) .await; @@ -822,6 +860,9 @@ async fn target_too_large_2() { .with_min_time(i * 1000 + 1) .with_max_time(i * 1000 + 1000) .with_compaction_level(CompactionLevel::FileNonOverlapped) + // level-1 each has different max_l0_created_at and larger than level-2 one + // set smaller time range wiht smaller max_l0_created_at which is the common use case + .with_max_l0_created_at(Time::from_timestamp_nanos(10 + i)) .with_file_size_bytes(l1_sizes[i as usize]), ) .await; @@ -833,53 +874,53 @@ async fn target_too_large_2() { --- - "**** Input Files " - "L1 " - - "L1.2[1,1000] 1ns 69mb |-----------L1.2------------| " - - "L1.3[1001,2000] 1ns 50mb |-----------L1.3------------| " + - "L1.2[1,1000] 10ns 69mb |-----------L1.2------------| " + - "L1.3[1001,2000] 11ns 50mb |-----------L1.3------------| " - "L2 " - - "L2.1[1,3000] 1ns 232mb |------------------------------------------L2.1------------------------------------------|" - - "WARNING: file L2.1[1,3000] 1ns 232mb exceeds soft limit 100mb by more than 50%" + - "L2.1[1,3000] 5ns 232mb |------------------------------------------L2.1------------------------------------------|" + - "WARNING: file L2.1[1,3000] 5ns 232mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[1294, 2587]). 1 Input Files, 232mb total:" - "L2, all files 232mb " - - "L2.1[1,3000] 1ns |------------------------------------------L2.1------------------------------------------|" + - "L2.1[1,3000] 5ns |------------------------------------------L2.1------------------------------------------|" - "**** 3 Output Files (parquet_file_id not yet assigned), 232mb total:" - "L2 " - - "L2.?[1,1294] 1ns 100.03mb|----------------L2.?----------------| " - - "L2.?[1295,2587] 1ns 99.95mb |----------------L2.?----------------| " - - "L2.?[2588,3000] 1ns 32.03mb |---L2.?---| " + - "L2.?[1,1294] 5ns 100.03mb|----------------L2.?----------------| " + - "L2.?[1295,2587] 5ns 99.95mb |----------------L2.?----------------| " + - "L2.?[2588,3000] 5ns 32.03mb |---L2.?---| " - "Committing partition 1:" - " Soft Deleting 1 files: L2.1" - " Creating 3 files" - "**** Simulation run 1, type=split(split_times=[1294]). 1 Input Files, 50mb total:" - "L1, all files 50mb " - - "L1.3[1001,2000] 1ns |------------------------------------------L1.3------------------------------------------|" + - "L1.3[1001,2000] 11ns |------------------------------------------L1.3------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 50mb total:" - "L1 " - - "L1.?[1001,1294] 1ns 14.66mb|----------L1.?----------| " - - "L1.?[1295,2000] 1ns 35.34mb |----------------------------L1.?-----------------------------| " + - "L1.?[1001,1294] 11ns 14.66mb|----------L1.?----------| " + - "L1.?[1295,2000] 11ns 35.34mb |----------------------------L1.?-----------------------------| " - "Committing partition 1:" - " Soft Deleting 1 files: L1.3" - " Creating 2 files" - "**** Simulation run 2, type=split(split_times=[705]). 3 Input Files, 183.69mb total:" - "L1 " - - "L1.2[1,1000] 1ns 69mb |-------------------------------L1.2--------------------------------| " - - "L1.7[1001,1294] 1ns 14.66mb |-------L1.7-------| " + - "L1.2[1,1000] 10ns 69mb |-------------------------------L1.2--------------------------------| " + - "L1.7[1001,1294] 11ns 14.66mb |-------L1.7-------| " - "L2 " - - "L2.4[1,1294] 1ns 100.03mb|------------------------------------------L2.4------------------------------------------|" + - "L2.4[1,1294] 5ns 100.03mb|------------------------------------------L2.4------------------------------------------|" - "**** 2 Output Files (parquet_file_id not yet assigned), 183.69mb total:" - "L2 " - - "L2.?[1,705] 1ns 100.01mb |---------------------L2.?----------------------| " - - "L2.?[706,1294] 1ns 83.68mb |-----------------L2.?-----------------| " + - "L2.?[1,705] 11ns 100.01mb|---------------------L2.?----------------------| " + - "L2.?[706,1294] 11ns 83.68mb |-----------------L2.?-----------------| " - "Committing partition 1:" - " Soft Deleting 3 files: L1.2, L2.4, L1.7" - " Creating 2 files" - "**** Final Output Files " - "L1 " - - "L1.8[1295,2000] 1ns 35.34mb |-------L1.8--------| " + - "L1.8[1295,2000] 11ns 35.34mb |-------L1.8--------| " - "L2 " - - "L2.5[1295,2587] 1ns 99.95mb |----------------L2.5----------------| " - - "L2.6[2588,3000] 1ns 32.03mb |---L2.6---| " - - "L2.9[1,705] 1ns 100.01mb |-------L2.9--------| " - - "L2.10[706,1294] 1ns 83.68mb |-----L2.10-----| " + - "L2.5[1295,2587] 5ns 99.95mb |----------------L2.5----------------| " + - "L2.6[2588,3000] 5ns 32.03mb |---L2.6---| " + - "L2.9[1,705] 11ns 100.01mb|-------L2.9--------| " + - "L2.10[706,1294] 11ns 83.68mb |-----L2.10-----| " "### ); } @@ -912,6 +953,8 @@ async fn start_too_large_similar_time_range() { .with_max_time(1000) // L1.1 or L2.2 .with_compaction_level(CompactionLevel::try_from(i as i32).unwrap()) + // max_l0_created_at of larger level is set to be smaller + .with_max_l0_created_at(Time::from_timestamp_nanos(10 - i)) .with_file_size_bytes(sizes[(i - 1) as usize]), ) .await; @@ -923,42 +966,42 @@ async fn start_too_large_similar_time_range() { --- - "**** Input Files " - "L1 " - - "L1.1[1,1000] 1ns 250mb |------------------------------------------L1.1------------------------------------------|" + - "L1.1[1,1000] 9ns 250mb |------------------------------------------L1.1------------------------------------------|" - "L2 " - - "L2.2[2,1000] 1ns 52mb |-----------------------------------------L2.2------------------------------------------| " - - "WARNING: file L1.1[1,1000] 1ns 250mb exceeds soft limit 100mb by more than 50%" + - "L2.2[2,1000] 8ns 52mb |-----------------------------------------L2.2------------------------------------------| " + - "WARNING: file L1.1[1,1000] 9ns 250mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[401, 801]). 1 Input Files, 250mb total:" - "L1, all files 250mb " - - "L1.1[1,1000] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[1,1000] 9ns |------------------------------------------L1.1------------------------------------------|" - "**** 3 Output Files (parquet_file_id not yet assigned), 250mb total:" - "L1 " - - "L1.?[1,401] 1ns 100.1mb |---------------L1.?---------------| " - - "L1.?[402,801] 1ns 99.85mb |--------------L1.?---------------| " - - "L1.?[802,1000] 1ns 50.05mb |-----L1.?------| " + - "L1.?[1,401] 9ns 100.1mb |---------------L1.?---------------| " + - "L1.?[402,801] 9ns 99.85mb |--------------L1.?---------------| " + - "L1.?[802,1000] 9ns 50.05mb |-----L1.?------| " - "Committing partition 1:" - " Soft Deleting 1 files: L1.1" - " Creating 3 files" - "**** Simulation run 1, type=split(split_times=[398, 795]). 3 Input Files, 251.95mb total:" - "L1 " - - "L1.3[1,401] 1ns 100.1mb |---------------L1.3---------------| " - - "L1.4[402,801] 1ns 99.85mb |--------------L1.4---------------| " + - "L1.3[1,401] 9ns 100.1mb |---------------L1.3---------------| " + - "L1.4[402,801] 9ns 99.85mb |--------------L1.4---------------| " - "L2 " - - "L2.2[2,1000] 1ns 52mb |-----------------------------------------L2.2------------------------------------------| " + - "L2.2[2,1000] 8ns 52mb |-----------------------------------------L2.2------------------------------------------| " - "**** 3 Output Files (parquet_file_id not yet assigned), 251.95mb total:" - "L2 " - - "L2.?[1,398] 1ns 100.12mb |--------------L2.?---------------| " - - "L2.?[399,795] 1ns 99.87mb |--------------L2.?---------------| " - - "L2.?[796,1000] 1ns 51.95mb |------L2.?------| " + - "L2.?[1,398] 9ns 100.12mb |--------------L2.?---------------| " + - "L2.?[399,795] 9ns 99.87mb |--------------L2.?---------------| " + - "L2.?[796,1000] 9ns 51.95mb |------L2.?------| " - "Committing partition 1:" - " Soft Deleting 3 files: L2.2, L1.3, L1.4" - " Creating 3 files" - "**** Final Output Files " - "L1 " - - "L1.5[802,1000] 1ns 50.05mb |-----L1.5------| " + - "L1.5[802,1000] 9ns 50.05mb |-----L1.5------| " - "L2 " - - "L2.6[1,398] 1ns 100.12mb |--------------L2.6---------------| " - - "L2.7[399,795] 1ns 99.87mb |--------------L2.7---------------| " - - "L2.8[796,1000] 1ns 51.95mb |------L2.8------| " + - "L2.6[1,398] 9ns 100.12mb |--------------L2.6---------------| " + - "L2.7[399,795] 9ns 99.87mb |--------------L2.7---------------| " + - "L2.8[796,1000] 9ns 51.95mb |------L2.8------| " "### ); } @@ -992,6 +1035,8 @@ async fn start_too_large_small_time_range() { .with_max_time(1000) // L1.1 or L2.2 .with_compaction_level(CompactionLevel::try_from(i as i32).unwrap()) + // max_l0_created_at of larger level is set to be smaller + .with_max_l0_created_at(Time::from_timestamp_nanos(10 - i)) .with_file_size_bytes(sizes[(i - 1) as usize]), ) .await; @@ -1003,42 +1048,42 @@ async fn start_too_large_small_time_range() { --- - "**** Input Files " - "L1 " - - "L1.1[0,1000] 1ns 250mb |------------------------------------------L1.1------------------------------------------|" + - "L1.1[0,1000] 9ns 250mb |------------------------------------------L1.1------------------------------------------|" - "L2 " - - "L2.2[800,1000] 1ns 52mb |------L2.2------|" - - "WARNING: file L1.1[0,1000] 1ns 250mb exceeds soft limit 100mb by more than 50%" + - "L2.2[800,1000] 8ns 52mb |------L2.2------|" + - "WARNING: file L1.1[0,1000] 9ns 250mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[400, 800]). 1 Input Files, 250mb total:" - "L1, all files 250mb " - - "L1.1[0,1000] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[0,1000] 9ns |------------------------------------------L1.1------------------------------------------|" - "**** 3 Output Files (parquet_file_id not yet assigned), 250mb total:" - "L1 " - - "L1.?[0,400] 1ns 100mb |---------------L1.?---------------| " - - "L1.?[401,800] 1ns 99.75mb |--------------L1.?---------------| " - - "L1.?[801,1000] 1ns 50.25mb |-----L1.?------| " + - "L1.?[0,400] 9ns 100mb |---------------L1.?---------------| " + - "L1.?[401,800] 9ns 99.75mb |--------------L1.?---------------| " + - "L1.?[801,1000] 9ns 50.25mb |-----L1.?------| " - "Committing partition 1:" - " Soft Deleting 1 files: L1.1" - " Creating 3 files" - "**** Simulation run 1, type=split(split_times=[698, 995]). 3 Input Files, 202mb total:" - "L1 " - - "L1.5[801,1000] 1ns 50.25mb |-----------L1.5------------| " - - "L1.4[401,800] 1ns 99.75mb|--------------------------L1.4---------------------------| " + - "L1.5[801,1000] 9ns 50.25mb |-----------L1.5------------| " + - "L1.4[401,800] 9ns 99.75mb|--------------------------L1.4---------------------------| " - "L2 " - - "L2.2[800,1000] 1ns 52mb |------------L2.2------------| " + - "L2.2[800,1000] 8ns 52mb |------------L2.2------------| " - "**** 3 Output Files (parquet_file_id not yet assigned), 202mb total:" - "L2 " - - "L2.?[401,698] 1ns 100.16mb|-------------------L2.?-------------------| " - - "L2.?[699,995] 1ns 99.82mb |-------------------L2.?-------------------| " - - "L2.?[996,1000] 1ns 2.02mb |L2.?|" + - "L2.?[401,698] 9ns 100.16mb|-------------------L2.?-------------------| " + - "L2.?[699,995] 9ns 99.82mb |-------------------L2.?-------------------| " + - "L2.?[996,1000] 9ns 2.02mb |L2.?|" - "Committing partition 1:" - " Soft Deleting 3 files: L2.2, L1.4, L1.5" - " Upgrading 1 files level to CompactionLevel::L2: L1.3" - " Creating 3 files" - "**** Final Output Files " - "L2 " - - "L2.3[0,400] 1ns 100mb |---------------L2.3---------------| " - - "L2.6[401,698] 1ns 100.16mb |----------L2.6----------| " - - "L2.7[699,995] 1ns 99.82mb |----------L2.7----------| " - - "L2.8[996,1000] 1ns 2.02mb |L2.8|" + - "L2.3[0,400] 9ns 100mb |---------------L2.3---------------| " + - "L2.6[401,698] 9ns 100.16mb |----------L2.6----------| " + - "L2.7[699,995] 9ns 99.82mb |----------L2.7----------| " + - "L2.8[996,1000] 9ns 2.02mb |L2.8|" "### ); } @@ -1153,6 +1198,8 @@ async fn start_too_large_small_time_range_3() { .with_max_time((i - 1) * 1000 + 300) // L1.1 or L2.2 .with_compaction_level(CompactionLevel::try_from(i as i32).unwrap()) + // max_l0_created_at of larger level is set to be smaller + .with_max_l0_created_at(Time::from_timestamp_nanos(10 - i)) .with_file_size_bytes(sizes[(i - 1) as usize]), ) .await; @@ -1164,42 +1211,42 @@ async fn start_too_large_small_time_range_3() { --- - "**** Input Files " - "L1 " - - "L1.1[0,300] 1ns 250mb |-------L1.1-------| " + - "L1.1[0,300] 9ns 250mb |-------L1.1-------| " - "L2 " - - "L2.2[200,1300] 1ns 52mb |-----------------------------------L2.2-----------------------------------| " - - "WARNING: file L1.1[0,300] 1ns 250mb exceeds soft limit 100mb by more than 50%" + - "L2.2[200,1300] 8ns 52mb |-----------------------------------L2.2-----------------------------------| " + - "WARNING: file L1.1[0,300] 9ns 250mb exceeds soft limit 100mb by more than 50%" - "**** Simulation run 0, type=split(split_times=[120, 240]). 1 Input Files, 250mb total:" - "L1, all files 250mb " - - "L1.1[0,300] 1ns |------------------------------------------L1.1------------------------------------------|" + - "L1.1[0,300] 9ns |------------------------------------------L1.1------------------------------------------|" - "**** 3 Output Files (parquet_file_id not yet assigned), 250mb total:" - "L1 " - - "L1.?[0,120] 1ns 100mb |---------------L1.?---------------| " - - "L1.?[121,240] 1ns 99.17mb |--------------L1.?---------------| " - - "L1.?[241,300] 1ns 50.83mb |-----L1.?------| " + - "L1.?[0,120] 9ns 100mb |---------------L1.?---------------| " + - "L1.?[121,240] 9ns 99.17mb |--------------L1.?---------------| " + - "L1.?[241,300] 9ns 50.83mb |-----L1.?------| " - "Committing partition 1:" - " Soft Deleting 1 files: L1.1" - " Creating 3 files" - "**** Simulation run 1, type=split(split_times=[705, 1289]). 3 Input Files, 202mb total:" - "L1 " - - "L1.5[241,300] 1ns 50.83mb |L1.5| " - - "L1.4[121,240] 1ns 99.17mb|-L1.4--| " + - "L1.5[241,300] 9ns 50.83mb |L1.5| " + - "L1.4[121,240] 9ns 99.17mb|-L1.4--| " - "L2 " - - "L2.2[200,1300] 1ns 52mb |--------------------------------------L2.2---------------------------------------| " + - "L2.2[200,1300] 8ns 52mb |--------------------------------------L2.2---------------------------------------| " - "**** 3 Output Files (parquet_file_id not yet assigned), 202mb total:" - "L2 " - - "L2.?[121,705] 1ns 100.06mb|-------------------L2.?-------------------| " - - "L2.?[706,1289] 1ns 99.89mb |-------------------L2.?-------------------| " - - "L2.?[1290,1300] 1ns 2.06mb |L2.?|" + - "L2.?[121,705] 9ns 100.06mb|-------------------L2.?-------------------| " + - "L2.?[706,1289] 9ns 99.89mb |-------------------L2.?-------------------| " + - "L2.?[1290,1300] 9ns 2.06mb |L2.?|" - "Committing partition 1:" - " Soft Deleting 3 files: L2.2, L1.4, L1.5" - " Upgrading 1 files level to CompactionLevel::L2: L1.3" - " Creating 3 files" - "**** Final Output Files " - "L2 " - - "L2.3[0,120] 1ns 100mb |-L2.3-| " - - "L2.6[121,705] 1ns 100.06mb |-----------------L2.6-----------------| " - - "L2.7[706,1289] 1ns 99.89mb |-----------------L2.7-----------------| " - - "L2.8[1290,1300] 1ns 2.06mb |L2.8|" + - "L2.3[0,120] 9ns 100mb |-L2.3-| " + - "L2.6[121,705] 9ns 100.06mb |-----------------L2.6-----------------| " + - "L2.7[706,1289] 9ns 99.89mb |-----------------L2.7-----------------| " + - "L2.8[1290,1300] 9ns 2.06mb |L2.8|" "### ); } @@ -1232,6 +1279,8 @@ async fn tiny_time_range() { .with_min_time(1) .with_max_time(2) .with_compaction_level(CompactionLevel::FileNonOverlapped) + // L1 file with larger max_l0_created_at + .with_max_l0_created_at(Time::from_timestamp_nanos(10)) .with_file_size_bytes(l1_size), ) .await; @@ -1244,6 +1293,8 @@ async fn tiny_time_range() { .with_min_time(1) .with_max_time(1000) .with_compaction_level(CompactionLevel::Final) + // L2 file with smaller max_l0_created_at + .with_max_l0_created_at(Time::from_timestamp_nanos(5)) .with_file_size_bytes(l2_size), ) .await; @@ -1255,17 +1306,17 @@ async fn tiny_time_range() { --- - "**** Input Files " - "L1 " - - "L1.1[1,2] 1ns 250mb |L1.1| " + - "L1.1[1,2] 10ns 250mb |L1.1| " - "L2 " - - "L2.2[1,1000] 1ns 52mb |------------------------------------------L2.2------------------------------------------|" - - "WARNING: file L1.1[1,2] 1ns 250mb exceeds soft limit 100mb by more than 50%" + - "L2.2[1,1000] 5ns 52mb |------------------------------------------L2.2------------------------------------------|" + - "WARNING: file L1.1[1,2] 10ns 250mb exceeds soft limit 100mb by more than 50%" - "SKIPPED COMPACTION for PartitionId(1): partition 1 has overlapped files that exceed max compact size limit 314572800. This may happen if a large amount of data has the same timestamp" - "**** Final Output Files " - "L1 " - - "L1.1[1,2] 1ns 250mb |L1.1| " + - "L1.1[1,2] 10ns 250mb |L1.1| " - "L2 " - - "L2.2[1,1000] 1ns 52mb |------------------------------------------L2.2------------------------------------------|" - - "WARNING: file L1.1[1,2] 1ns 250mb exceeds soft limit 100mb by more than 50%" + - "L2.2[1,1000] 5ns 52mb |------------------------------------------L2.2------------------------------------------|" + - "WARNING: file L1.1[1,2] 10ns 250mb exceeds soft limit 100mb by more than 50%" "### ); } diff --git a/compactor2_test_utils/src/lib.rs b/compactor2_test_utils/src/lib.rs index e23956f8f4..43049ef8dc 100644 --- a/compactor2_test_utils/src/lib.rs +++ b/compactor2_test_utils/src/lib.rs @@ -595,6 +595,11 @@ impl TestSetup { .await } + /// Get the parquet files including the soft-deleted stored in the catalog + pub async fn list_by_table(&self) -> Vec { + self.catalog.list_by_table(self.table.table.id).await + } + /// Reads the specified parquet file out of object store pub async fn read_parquet_file(&self, file: ParquetFile) -> Vec { assert_eq!(file.table_id, self.table.table.id); diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index 6bf8a165fa..ce032d4226 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -679,6 +679,10 @@ pub trait ParquetFileRepo: Send + Sync { /// [`to_delete`](ParquetFile::to_delete). async fn list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result>; + /// List all parquet files within a given table including those marked as [`to_delete`](ParquetFile::to_delete). + /// This is for debug purpose + async fn list_by_table(&mut self, table_id: TableId) -> Result>; + /// Delete all parquet files that were marked to be deleted earlier than the specified time. /// Returns the deleted records. async fn delete_old(&mut self, older_than: Timestamp) -> Result>; @@ -2743,6 +2747,15 @@ pub(crate) mod test_helpers { assert_matches!(deleted_files.as_slice(), []); assert!(repos.parquet_files().exist(parquet_file.id).await.unwrap()); + // test list_by_table that includes soft-deleted file + // at this time the file is not soft-deleted yet and will be included in the returned list + let files = repos + .parquet_files() + .list_by_table(parquet_file.table_id) + .await + .unwrap(); + assert_eq!(files.len(), 1); + // verify to_delete can be updated to a timestamp repos .parquet_files() @@ -2757,6 +2770,15 @@ pub(crate) mod test_helpers { let marked_deleted = files.first().unwrap(); assert!(marked_deleted.to_delete.is_some()); + // test list_by_table that includes soft-deleted file + // at this time the file is soft-deleted and will be included in the returned list + let files = repos + .parquet_files() + .list_by_table(parquet_file.table_id) + .await + .unwrap(); + assert_eq!(files.len(), 1); + // File is not deleted if it was marked to be deleted after the specified time let before_deleted = Timestamp::new( (catalog.time_provider().now() - Duration::from_secs(100)).timestamp_nanos(), @@ -2769,12 +2791,31 @@ pub(crate) mod test_helpers { assert!(deleted_files.is_empty()); assert!(repos.parquet_files().exist(parquet_file.id).await.unwrap()); + // test list_by_table that includes soft-deleted file + // at this time the file is not actually hard deleted yet and stay as soft deleted + // and will be returned in the list + let files = repos + .parquet_files() + .list_by_table(parquet_file.table_id) + .await + .unwrap(); + assert_eq!(files.len(), 1); + // File is deleted if it was marked to be deleted before the specified time let deleted_files = repos.parquet_files().delete_old(older_than).await.unwrap(); assert_eq!(deleted_files.len(), 1); assert_eq!(marked_deleted, &deleted_files[0]); assert!(!repos.parquet_files().exist(parquet_file.id).await.unwrap()); + // test list_by_table that includes soft-deleted file + // at this time the file is hard deleted -> the returned list is empty + let files = repos + .parquet_files() + .list_by_table(parquet_file.table_id) + .await + .unwrap(); + assert_eq!(files.len(), 0); + // test list_by_table_not_to_delete let files = repos .parquet_files() @@ -2789,6 +2830,16 @@ pub(crate) mod test_helpers { .unwrap(); assert_eq!(files, vec![other_file.clone()]); + // test list_by_table + println!("parquet_file.table_id = {}", parquet_file.table_id); + let files = repos + .parquet_files() + // .list_by_table(parquet_file.table_id) // todo: tables of deleted files + .list_by_table(other_file.table_id) + .await + .unwrap(); + assert_eq!(files.len(), 1); + // test list_by_namespace_not_to_delete let namespace2 = repos .namespaces() diff --git a/iox_catalog/src/mem.rs b/iox_catalog/src/mem.rs index 4e196960e3..7915eed462 100644 --- a/iox_catalog/src/mem.rs +++ b/iox_catalog/src/mem.rs @@ -1291,6 +1291,18 @@ impl ParquetFileRepo for MemTxn { Ok(parquet_files) } + async fn list_by_table(&mut self, table_id: TableId) -> Result> { + let stage = self.stage(); + + let parquet_files: Vec<_> = stage + .parquet_files + .iter() + .filter(|f| table_id == f.table_id) + .cloned() + .collect(); + Ok(parquet_files) + } + async fn delete_old(&mut self, older_than: Timestamp) -> Result> { let stage = self.stage(); diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 60d2a91214..20f213884c 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -281,6 +281,7 @@ decorate!( "parquet_list_by_shard_greater_than" = list_by_shard_greater_than(&mut self, shard_id: ShardId, sequence_number: SequenceNumber) -> Result>; "parquet_list_by_namespace_not_to_delete" = list_by_namespace_not_to_delete(&mut self, namespace_id: NamespaceId) -> Result>; "parquet_list_by_table_not_to_delete" = list_by_table_not_to_delete(&mut self, table_id: TableId) -> Result>; + "parquet_list_by_table" = list_by_table(&mut self, table_id: TableId) -> Result>; "parquet_delete_old" = delete_old(&mut self, older_than: Timestamp) -> Result>; "parquet_delete_old_ids_only" = delete_old_ids_only(&mut self, older_than: Timestamp) -> Result>; "parquet_list_by_partition_not_to_delete" = list_by_partition_not_to_delete(&mut self, partition_id: PartitionId) -> Result>; diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 0548fd2be3..c59c8886ba 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1914,6 +1914,24 @@ WHERE table_id = $1 AND to_delete IS NULL; .map_err(|e| Error::SqlxError { source: e }) } + async fn list_by_table(&mut self, table_id: TableId) -> Result> { + // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large + // `parquet_metadata` column!! + sqlx::query_as::<_, ParquetFile>( + r#" +SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, + max_sequence_number, min_time, max_time, to_delete, file_size_bytes, + row_count, compaction_level, created_at, column_set, max_l0_created_at +FROM parquet_file +WHERE table_id = $1; + "#, + ) + .bind(table_id) // $1 + .fetch_all(&mut self.inner) + .await + .map_err(|e| Error::SqlxError { source: e }) + } + async fn delete_old(&mut self, older_than: Timestamp) -> Result> { sqlx::query_as::<_, ParquetFile>( r#" diff --git a/iox_catalog/src/sqlite.rs b/iox_catalog/src/sqlite.rs index 8928d0c8e8..38e4149860 100644 --- a/iox_catalog/src/sqlite.rs +++ b/iox_catalog/src/sqlite.rs @@ -1797,6 +1797,27 @@ WHERE table_id = $1 AND to_delete IS NULL; .collect()) } + async fn list_by_table(&mut self, table_id: TableId) -> Result> { + // Deliberately doesn't use `SELECT *` to avoid the performance hit of fetching the large + // `parquet_metadata` column!! + Ok(sqlx::query_as::<_, ParquetFilePod>( + r#" +SELECT id, shard_id, namespace_id, table_id, partition_id, object_store_id, + max_sequence_number, min_time, max_time, to_delete, file_size_bytes, + row_count, compaction_level, created_at, column_set, max_l0_created_at +FROM parquet_file +WHERE table_id = $1; + "#, + ) + .bind(table_id) // $1 + .fetch_all(self.inner.get_mut()) + .await + .map_err(|e| Error::SqlxError { source: e })? + .into_iter() + .map(Into::into) + .collect()) + } + async fn delete_old(&mut self, older_than: Timestamp) -> Result> { Ok(sqlx::query_as::<_, ParquetFilePod>( r#" diff --git a/iox_tests/src/catalog.rs b/iox_tests/src/catalog.rs index 7e6c660f4f..e7b488911d 100644 --- a/iox_tests/src/catalog.rs +++ b/iox_tests/src/catalog.rs @@ -277,6 +277,17 @@ impl TestCatalog { .unwrap() } + /// List all files including the soft deleted ones + pub async fn list_by_table(self: &Arc, table_id: TableId) -> Vec { + self.catalog + .repositories() + .await + .parquet_files() + .list_by_table(table_id) + .await + .unwrap() + } + /// Add a partition into skipped compaction pub async fn add_to_skipped_compaction( self: &Arc,