From b785f751b354c65d2e0548972f9e94b50e118612 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 17 Feb 2023 16:04:26 +0100 Subject: [PATCH] feat(compactor): add simulator output (#7021) --- compactor2/tests/layouts/mod.rs | 86 ++++++++++++++++++++++++++ compactor2_test_utils/src/display.rs | 2 +- compactor2_test_utils/src/simulator.rs | 57 ++++++++++++----- 3 files changed, 130 insertions(+), 15 deletions(-) diff --git a/compactor2/tests/layouts/mod.rs b/compactor2/tests/layouts/mod.rs index b8b2af1cfd..be2a5c58c3 100644 --- a/compactor2/tests/layouts/mod.rs +++ b/compactor2/tests/layouts/mod.rs @@ -121,6 +121,10 @@ async fn all_overlapping_l0() { - "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|" - "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|" - "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 90mb total:" + - "L1 " + - "L1.0[100,180] 72mb |-----------------------------L1.0-----------------------------| " + - "L1.0[180,200] 18mb |-----L1.0-----|" - "**** Final Output Files " - "L1 " - "L1.11[100,180] 72mb |----------------------------L1.11-----------------------------| " @@ -176,6 +180,10 @@ async fn all_non_overlapping_l0() { - "L0.3[200,201] |L0.3| " - "L0.2[100,101] |L0.2| " - "L0.1[0,1] |L0.1| " + - "**** 2 Output Files (parquet_file_id not yet assigned), 100mb total:" + - "L1 " + - "L1.0[0,720] 79.91mb |----------------------------L1.0-----------------------------| " + - "L1.0[720,901] 20.09mb |-----L1.0-----| " - "**** Final Output Files " - "L1 " - "L1.11[0,720] 79.91mb|----------------------------L1.11----------------------------| " @@ -245,6 +253,9 @@ async fn l1_with_overlapping_l0() { - "L0.3[140,190] 5kb |------L0.3-------| " - "L1 " - "L1.2[100,150] 10mb |------L1.2-------| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 10.02mb total:" + - "L1, all files 10.02mb " + - "L1.0[100,310] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1 " - "L1.1[50,100] 10mb |----L1.1-----| " @@ -310,6 +321,9 @@ async fn l1_with_non_overlapping_l0() { - "L0.5[400,450] |-----L0.5-----| " - "L0.4[350,400] |-----L0.4-----| " - "L0.3[300,350] |-----L0.3-----| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 25kb total:" + - "L1, all files 25kb " + - "L1.0[300,550] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1 " - "L1.1[50,100] 10mb |-L1.1-| " @@ -374,6 +388,9 @@ async fn l1_with_non_overlapping_l0_larger() { - "L0.7[400,450] |----------L0.7----------| " - "L0.6[350,400] |----------L0.6----------| " - "L0.5[300,350] |----------L0.5----------| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" + - "L1, all files 15mb " + - "L1.0[300,450] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[370]). 5 Input Files, 108mb total:" - "L1 " - "L1.4[200,250] 3mb |--L1.4--| " @@ -381,6 +398,10 @@ async fn l1_with_non_overlapping_l0_larger() { - "L1.2[100,150] 50mb |--L1.2--| " - "L1.1[50,100] 20mb |--L1.1--| " - "L1.8[300,450] 15mb |------------L1.8------------|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 108mb total:" + - "L2 " + - "L2.0[50,370] 86.4mb |-----------------------------L2.0-----------------------------| " + - "L2.0[370,450] 21.6mb |-----L2.0-----|" - "**** Final Output Files " - "L2 " - "L2.9[50,370] 86.4mb |-----------------------------L2.9-----------------------------| " @@ -453,6 +474,9 @@ async fn l1_too_much_with_non_overlapping_l0() { - "L0.13[600,650] |------------------------------------L0.13-------------------------------------|" - "L0.12[600,650] |------------------------------------L0.12-------------------------------------|" - "L0.11[600,650] |------------------------------------L0.11-------------------------------------|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" + - "L1, all files 15mb " + - "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|" - "SKIPPED COMPACTION for PartitionId(1): partition 1 has 781189120 parquet file bytes, limit is 268435456" - "**** Final Output Files " - "L1 " @@ -534,6 +558,9 @@ async fn many_l1_with_non_overlapping_l0() { - "L0.13[600,650] |------------------------------------L0.13-------------------------------------|" - "L0.12[600,650] |------------------------------------L0.12-------------------------------------|" - "L0.11[600,650] |------------------------------------L0.11-------------------------------------|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" + - "L1, all files 15mb " + - "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[530]). 11 Input Files, 88mb total:" - "L1 " - "L1.10[500,550] 7mb |L1.10| " @@ -547,6 +574,10 @@ async fn many_l1_with_non_overlapping_l0() { - "L1.2[100,150] 8mb |L1.2| " - "L1.1[50,100] 9mb |L1.1| " - "L1.14[600,650] 15mb |L1.14|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 88mb total:" + - "L2 " + - "L2.0[50,530] 70.4mb |-----------------------------L2.0-----------------------------| " + - "L2.0[530,650] 17.6mb |-----L2.0-----|" - "**** Final Output Files " - "L2 " - "L2.15[50,530] 70.4mb|----------------------------L2.15-----------------------------| " @@ -610,11 +641,18 @@ async fn large_l1_with_non_overlapping_l0() { - "L0.5[600,650] |-------------------------------------L0.5-------------------------------------|" - "L0.4[600,650] |-------------------------------------L0.4-------------------------------------|" - "L0.3[600,650] |-------------------------------------L0.3-------------------------------------|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" + - "L1, all files 15mb " + - "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[375]). 3 Input Files, 185mb total:" - "L1 " - "L1.2[100,150] 80mb |L1.2| " - "L1.1[50,100] 90mb |L1.1| " - "L1.6[600,650] 15mb |L1.6| " + - "**** 2 Output Files (parquet_file_id not yet assigned), 185mb total:" + - "L2 " + - "L2.0[50,375] 100.21mb|------------------L2.0-------------------| " + - "L2.0[375,650] 84.79mb |---------------L2.0---------------| " - "**** Final Output Files " - "L2 " - "L2.7[50,375] 100.21mb|------------------L2.7-------------------| " @@ -690,6 +728,9 @@ async fn many_l1_files() { - "L0.23[24,25] |------------------------------------L0.23-------------------------------------|" - "L0.22[24,25] |------------------------------------L0.22-------------------------------------|" - "L0.21[24,25] |------------------------------------L0.21-------------------------------------|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 3mb total:" + - "L1, all files 3mb " + - "L1.0[24,25] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[13]). 21 Input Files, 203mb total:" - "L1 " - "L1.20[19,20] 10mb |L1.20| " @@ -713,6 +754,10 @@ async fn many_l1_files() { - "L1.2[1,2] 10mb |L1.2| " - "L1.1[0,1] 10mb |L1.1| " - "L1.24[24,25] 3mb |L1.24|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 203mb total:" + - "L2 " + - "L2.0[0,13] 105.56mb |-----------------L2.0------------------| " + - "L2.0[13,25] 97.44mb |----------------L2.0----------------| " - "**** Final Output Files " - "L2 " - "L2.25[0,13] 105.56mb|-----------------L2.25-----------------| " @@ -1246,6 +1291,9 @@ async fn many_tiny_l0_files() { - "L0.198[197,198] |L0.198|" - "L0.199[198,199] |L0.199|" - "L0.200[199,200] |L0.200|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" + - "L0, all files 1.37mb " + - "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:" - "L0, all files 7kb " - "L0.201[200,201] |L0.201| " @@ -1336,10 +1384,16 @@ async fn many_tiny_l0_files() { - "L0.286[285,286] |L0.286|" - "L0.287[286,287] |L0.287|" - "L0.288[287,288] |L0.288|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:" + - "L0, all files 616kb " + - "L0.0[200,288] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 2, type=compact. 2 Input Files, 1.97mb total:" - "L0 " - "L0.290[200,288] 616kb |--------L0.290--------| " - "L0.289[0,200] 1.37mb|-----------------------L0.289------------------------| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.97mb total:" + - "L1, all files 1.97mb " + - "L1.0[0,288] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1, all files 1.97mb " - "L1.291[0,288] |------------------------------------L1.291------------------------------------|" @@ -1992,6 +2046,9 @@ async fn over_two_times_max_files_per_plan() { - "L0.198[197,198] |L0.198|" - "L0.199[198,199] |L0.199|" - "L0.200[199,200] |L0.200|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" + - "L0, all files 1.37mb " + - "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 1, type=compact. 200 Input Files, 1.37mb total:" - "L0, all files 7kb " - "L0.201[200,201] |L0.201| " @@ -2194,6 +2251,9 @@ async fn over_two_times_max_files_per_plan() { - "L0.398[397,398] |L0.398|" - "L0.399[398,399] |L0.399|" - "L0.400[399,400] |L0.400|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" + - "L0, all files 1.37mb " + - "L0.0[200,400] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 2, type=compact. 10 Input Files, 70kb total:" - "L0, all files 7kb " - "L0.401[400,401] |L0.401| " @@ -2206,11 +2266,17 @@ async fn over_two_times_max_files_per_plan() { - "L0.408[407,408] |L0.408| " - "L0.409[408,409] |L0.409| " - "L0.410[409,410] |L0.410|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 70kb total:" + - "L0, all files 70kb " + - "L0.0[400,410] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 3, type=compact. 3 Input Files, 2.8mb total:" - "L0 " - "L0.413[400,410] 70kb |L0.413|" - "L0.412[200,400] 1.37mb |---------------L0.412----------------| " - "L0.411[0,200] 1.37mb|---------------L0.411----------------| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 2.8mb total:" + - "L1, all files 2.8mb " + - "L1.0[0,410] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1, all files 2.8mb " - "L1.414[0,410] |------------------------------------L1.414------------------------------------|" @@ -2743,6 +2809,9 @@ async fn many_tiny_l1_files() { - "L1.198[197,198] |L1.198|" - "L1.199[198,199] |L1.199|" - "L1.200[199,200] |L1.200|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" + - "L1, all files 1.37mb " + - "L1.0[0,200] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:" - "L1, all files 7kb " - "L1.201[200,201] |L1.201| " @@ -2833,6 +2902,9 @@ async fn many_tiny_l1_files() { - "L1.286[285,286] |L1.286|" - "L1.287[286,287] |L1.287|" - "L1.288[287,288] |L1.288|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:" + - "L1, all files 616kb " + - "L1.0[200,288] |-------------------------------------L1.0-------------------------------------|" - "**** Final Output Files " - "L1 " - "L1.289[0,200] 1.37mb|-----------------------L1.289------------------------| " @@ -3299,6 +3371,9 @@ async fn many_l0_and_overlapped_l1_files() { - "L0.188[187,188] |L0.188|" - "L0.189[188,189] |L0.189|" - "L0.190[189,190] |L0.190|" + - "**** 1 Output Files (parquet_file_id not yet assigned), 1.3mb total:" + - "L0, all files 1.3mb " + - "L0.0[0,190] |-------------------------------------L0.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[159]). 21 Input Files, 21.3mb total:" - "L0 " - "L0.211[0,190] 1.3mb |----------------------------------L0.211----------------------------------| " @@ -3323,6 +3398,10 @@ async fn many_l0_and_overlapped_l1_files() { - "L1.207[160,169] 1mb |L1.207| " - "L1.208[170,179] 1mb |L1.208| " - "L1.209[180,189] 1mb |L1.209|" + - "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:" + - "L1 " + - "L1.0[0,159] 17.02mb |----------------------------L1.0-----------------------------| " + - "L1.0[159,199] 4.28mb |-----L1.0-----| " - "**** Final Output Files " - "L1 " - "L1.212[0,159] 17.02mb|---------------------------L1.212----------------------------| " @@ -3810,6 +3889,9 @@ async fn not_many_l0_and_overlapped_l1_files() { - "L1.192[10,19] 1mb |L1.192| " - "L1.193[20,29] 1mb |L1.193| " - "L1.194[30,39] 1mb |L1.194| " + - "**** 1 Output Files (parquet_file_id not yet assigned), 6.3mb total:" + - "L1, all files 6.3mb " + - "L1.0[0,190] |-------------------------------------L1.0-------------------------------------|" - "**** Simulation run 1, type=split(split_times=[2407]). 16 Input Files, 21.3mb total:" - "L1 " - "L1.196[200,209] 1mb |L1.196| " @@ -3828,6 +3910,10 @@ async fn not_many_l0_and_overlapped_l1_files() { - "L1.209[2800,2809] 1mb |L1.209|" - "L1.210[3000,3009] 1mb |L1.210|" - "L1.211[0,190] 6.3mb |L1.211| " + - "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:" + - "L2 " + - "L2.0[0,2407] 17.04mb|----------------------------L2.0-----------------------------| " + - "L2.0[2407,3009] 4.26mb |-----L2.0-----| " - "**** Final Output Files " - "L2 " - "L2.212[0,2407] 17.04mb|---------------------------L2.212----------------------------| " diff --git a/compactor2_test_utils/src/display.rs b/compactor2_test_utils/src/display.rs index 7ff46eaf9b..0065454ed3 100644 --- a/compactor2_test_utils/src/display.rs +++ b/compactor2_test_utils/src/display.rs @@ -37,7 +37,7 @@ const DEFAULT_HEADING_WIDTH: usize = 20; /// parquet files arranged so they are lined up horizontally based on /// their relative time range. /// -/// See docs on [`ParquetFileFormatter`]z for examples. +/// See docs on [`ParquetFileFormatter`] for examples. fn readable_list_of_files<'a>( title: Option, files: impl IntoIterator, diff --git a/compactor2_test_utils/src/simulator.rs b/compactor2_test_utils/src/simulator.rs index 651e48bd87..1d4a299c9b 100644 --- a/compactor2_test_utils/src/simulator.rs +++ b/compactor2_test_utils/src/simulator.rs @@ -5,7 +5,8 @@ use std::{ use async_trait::async_trait; use data_types::{ - ColumnSet, CompactionLevel, ParquetFile, ParquetFileParams, SequenceNumber, ShardId, Timestamp, + ColumnSet, CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, SequenceNumber, + ShardId, Timestamp, }; use datafusion::physical_plan::SendableRecordBatchStream; use iox_time::Time; @@ -69,24 +70,50 @@ impl ParquetFileSimulator { runs.into_iter() .enumerate() .flat_map(|(i, run)| { - let total_input_size: i64 = run - .input_parquet_files - .iter() - .map(|f| f.file_size_bytes) - .sum(); - let title = format!( + let SimulatedRun { + plan_type, + input_parquet_files, + output_params, + } = run; + + let input_title = format!( "**** Simulation run {}, type={}. {} Input Files, {} total:", i, - run.plan_type, - run.input_parquet_files.len(), - display_size(total_input_size) + plan_type, + input_parquet_files.len(), + display_size(total_size(input_parquet_files.iter())) ); - format_files(title, &run.input_parquet_files) + + // display the files created by this run + let output_parquet_files: Vec<_> = output_params + .into_iter() + .map(|params| { + // Use file id 0 as they haven't been + // assigned an id in the catalog yet + ParquetFile::from_params(params, ParquetFileId::new(0)) + }) + .collect(); + + let output_title = format!( + "**** {} Output Files (parquet_file_id not yet assigned), {} total:", + output_parquet_files.len(), + display_size(total_size(output_parquet_files.iter())) + ); + + // hook up inputs and outputs + format_files(input_title, &input_parquet_files) + .into_iter() + .chain(format_files(output_title, &output_parquet_files).into_iter()) }) .collect() } } +/// return the total file size of all the parquet files +fn total_size<'a>(parquet_files: impl IntoIterator) -> i64 { + parquet_files.into_iter().map(|f| f.file_size_bytes).sum() +} + #[async_trait] impl ParquetFilesSink for ParquetFileSimulator { async fn stream_into_file_sink( @@ -134,20 +161,21 @@ impl ParquetFilesSink for ParquetFileSimulator { let partition_info = partition_info.as_ref(); // Compute final output - let output: Vec<_> = output_files + let output_params: Vec<_> = output_files .into_iter() .map(|f| { f.into_parquet_file_params(max_l0_created_at, column_set.clone(), partition_info) }) .collect(); - // record what we did + // record what the simulator did self.runs.lock().unwrap().push(SimulatedRun { plan_type, input_parquet_files, + output_params: output_params.clone(), }); - Ok(output) + Ok(output_params) } fn as_any(&self) -> &dyn std::any::Any { @@ -229,6 +257,7 @@ pub struct SimulatedRun { // fields are used in testing plan_type: String, input_parquet_files: Vec, + output_params: Vec, } fn overall_column_set<'a>(files: impl IntoIterator) -> ColumnSet {