feat(compactor): add simulator output (#7021)
parent
bda2310ca1
commit
b785f751b3
|
@ -121,6 +121,10 @@ async fn all_overlapping_l0() {
|
|||
- "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|"
|
||||
- "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|"
|
||||
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
|
||||
- "**** 2 Output Files (parquet_file_id not yet assigned), 90mb total:"
|
||||
- "L1 "
|
||||
- "L1.0[100,180] 72mb |-----------------------------L1.0-----------------------------| "
|
||||
- "L1.0[180,200] 18mb |-----L1.0-----|"
|
||||
- "**** Final Output Files "
|
||||
- "L1 "
|
||||
- "L1.11[100,180] 72mb |----------------------------L1.11-----------------------------| "
|
||||
|
@ -176,6 +180,10 @@ async fn all_non_overlapping_l0() {
|
|||
- "L0.3[200,201] |L0.3| "
|
||||
- "L0.2[100,101] |L0.2| "
|
||||
- "L0.1[0,1] |L0.1| "
|
||||
- "**** 2 Output Files (parquet_file_id not yet assigned), 100mb total:"
|
||||
- "L1 "
|
||||
- "L1.0[0,720] 79.91mb |----------------------------L1.0-----------------------------| "
|
||||
- "L1.0[720,901] 20.09mb |-----L1.0-----| "
|
||||
- "**** Final Output Files "
|
||||
- "L1 "
|
||||
- "L1.11[0,720] 79.91mb|----------------------------L1.11----------------------------| "
|
||||
|
@ -245,6 +253,9 @@ async fn l1_with_overlapping_l0() {
|
|||
- "L0.3[140,190] 5kb |------L0.3-------| "
|
||||
- "L1 "
|
||||
- "L1.2[100,150] 10mb |------L1.2-------| "
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 10.02mb total:"
|
||||
- "L1, all files 10.02mb "
|
||||
- "L1.0[100,310] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Final Output Files "
|
||||
- "L1 "
|
||||
- "L1.1[50,100] 10mb |----L1.1-----| "
|
||||
|
@ -310,6 +321,9 @@ async fn l1_with_non_overlapping_l0() {
|
|||
- "L0.5[400,450] |-----L0.5-----| "
|
||||
- "L0.4[350,400] |-----L0.4-----| "
|
||||
- "L0.3[300,350] |-----L0.3-----| "
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 25kb total:"
|
||||
- "L1, all files 25kb "
|
||||
- "L1.0[300,550] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Final Output Files "
|
||||
- "L1 "
|
||||
- "L1.1[50,100] 10mb |-L1.1-| "
|
||||
|
@ -374,6 +388,9 @@ async fn l1_with_non_overlapping_l0_larger() {
|
|||
- "L0.7[400,450] |----------L0.7----------| "
|
||||
- "L0.6[350,400] |----------L0.6----------| "
|
||||
- "L0.5[300,350] |----------L0.5----------| "
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
|
||||
- "L1, all files 15mb "
|
||||
- "L1.0[300,450] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Simulation run 1, type=split(split_times=[370]). 5 Input Files, 108mb total:"
|
||||
- "L1 "
|
||||
- "L1.4[200,250] 3mb |--L1.4--| "
|
||||
|
@ -381,6 +398,10 @@ async fn l1_with_non_overlapping_l0_larger() {
|
|||
- "L1.2[100,150] 50mb |--L1.2--| "
|
||||
- "L1.1[50,100] 20mb |--L1.1--| "
|
||||
- "L1.8[300,450] 15mb |------------L1.8------------|"
|
||||
- "**** 2 Output Files (parquet_file_id not yet assigned), 108mb total:"
|
||||
- "L2 "
|
||||
- "L2.0[50,370] 86.4mb |-----------------------------L2.0-----------------------------| "
|
||||
- "L2.0[370,450] 21.6mb |-----L2.0-----|"
|
||||
- "**** Final Output Files "
|
||||
- "L2 "
|
||||
- "L2.9[50,370] 86.4mb |-----------------------------L2.9-----------------------------| "
|
||||
|
@ -453,6 +474,9 @@ async fn l1_too_much_with_non_overlapping_l0() {
|
|||
- "L0.13[600,650] |------------------------------------L0.13-------------------------------------|"
|
||||
- "L0.12[600,650] |------------------------------------L0.12-------------------------------------|"
|
||||
- "L0.11[600,650] |------------------------------------L0.11-------------------------------------|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
|
||||
- "L1, all files 15mb "
|
||||
- "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "SKIPPED COMPACTION for PartitionId(1): partition 1 has 781189120 parquet file bytes, limit is 268435456"
|
||||
- "**** Final Output Files "
|
||||
- "L1 "
|
||||
|
@ -534,6 +558,9 @@ async fn many_l1_with_non_overlapping_l0() {
|
|||
- "L0.13[600,650] |------------------------------------L0.13-------------------------------------|"
|
||||
- "L0.12[600,650] |------------------------------------L0.12-------------------------------------|"
|
||||
- "L0.11[600,650] |------------------------------------L0.11-------------------------------------|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
|
||||
- "L1, all files 15mb "
|
||||
- "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Simulation run 1, type=split(split_times=[530]). 11 Input Files, 88mb total:"
|
||||
- "L1 "
|
||||
- "L1.10[500,550] 7mb |L1.10| "
|
||||
|
@ -547,6 +574,10 @@ async fn many_l1_with_non_overlapping_l0() {
|
|||
- "L1.2[100,150] 8mb |L1.2| "
|
||||
- "L1.1[50,100] 9mb |L1.1| "
|
||||
- "L1.14[600,650] 15mb |L1.14|"
|
||||
- "**** 2 Output Files (parquet_file_id not yet assigned), 88mb total:"
|
||||
- "L2 "
|
||||
- "L2.0[50,530] 70.4mb |-----------------------------L2.0-----------------------------| "
|
||||
- "L2.0[530,650] 17.6mb |-----L2.0-----|"
|
||||
- "**** Final Output Files "
|
||||
- "L2 "
|
||||
- "L2.15[50,530] 70.4mb|----------------------------L2.15-----------------------------| "
|
||||
|
@ -610,11 +641,18 @@ async fn large_l1_with_non_overlapping_l0() {
|
|||
- "L0.5[600,650] |-------------------------------------L0.5-------------------------------------|"
|
||||
- "L0.4[600,650] |-------------------------------------L0.4-------------------------------------|"
|
||||
- "L0.3[600,650] |-------------------------------------L0.3-------------------------------------|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
|
||||
- "L1, all files 15mb "
|
||||
- "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Simulation run 1, type=split(split_times=[375]). 3 Input Files, 185mb total:"
|
||||
- "L1 "
|
||||
- "L1.2[100,150] 80mb |L1.2| "
|
||||
- "L1.1[50,100] 90mb |L1.1| "
|
||||
- "L1.6[600,650] 15mb |L1.6| "
|
||||
- "**** 2 Output Files (parquet_file_id not yet assigned), 185mb total:"
|
||||
- "L2 "
|
||||
- "L2.0[50,375] 100.21mb|------------------L2.0-------------------| "
|
||||
- "L2.0[375,650] 84.79mb |---------------L2.0---------------| "
|
||||
- "**** Final Output Files "
|
||||
- "L2 "
|
||||
- "L2.7[50,375] 100.21mb|------------------L2.7-------------------| "
|
||||
|
@ -690,6 +728,9 @@ async fn many_l1_files() {
|
|||
- "L0.23[24,25] |------------------------------------L0.23-------------------------------------|"
|
||||
- "L0.22[24,25] |------------------------------------L0.22-------------------------------------|"
|
||||
- "L0.21[24,25] |------------------------------------L0.21-------------------------------------|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 3mb total:"
|
||||
- "L1, all files 3mb "
|
||||
- "L1.0[24,25] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Simulation run 1, type=split(split_times=[13]). 21 Input Files, 203mb total:"
|
||||
- "L1 "
|
||||
- "L1.20[19,20] 10mb |L1.20| "
|
||||
|
@ -713,6 +754,10 @@ async fn many_l1_files() {
|
|||
- "L1.2[1,2] 10mb |L1.2| "
|
||||
- "L1.1[0,1] 10mb |L1.1| "
|
||||
- "L1.24[24,25] 3mb |L1.24|"
|
||||
- "**** 2 Output Files (parquet_file_id not yet assigned), 203mb total:"
|
||||
- "L2 "
|
||||
- "L2.0[0,13] 105.56mb |-----------------L2.0------------------| "
|
||||
- "L2.0[13,25] 97.44mb |----------------L2.0----------------| "
|
||||
- "**** Final Output Files "
|
||||
- "L2 "
|
||||
- "L2.25[0,13] 105.56mb|-----------------L2.25-----------------| "
|
||||
|
@ -1246,6 +1291,9 @@ async fn many_tiny_l0_files() {
|
|||
- "L0.198[197,198] |L0.198|"
|
||||
- "L0.199[198,199] |L0.199|"
|
||||
- "L0.200[199,200] |L0.200|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
|
||||
- "L0, all files 1.37mb "
|
||||
- "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|"
|
||||
- "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:"
|
||||
- "L0, all files 7kb "
|
||||
- "L0.201[200,201] |L0.201| "
|
||||
|
@ -1336,10 +1384,16 @@ async fn many_tiny_l0_files() {
|
|||
- "L0.286[285,286] |L0.286|"
|
||||
- "L0.287[286,287] |L0.287|"
|
||||
- "L0.288[287,288] |L0.288|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:"
|
||||
- "L0, all files 616kb "
|
||||
- "L0.0[200,288] |-------------------------------------L0.0-------------------------------------|"
|
||||
- "**** Simulation run 2, type=compact. 2 Input Files, 1.97mb total:"
|
||||
- "L0 "
|
||||
- "L0.290[200,288] 616kb |--------L0.290--------| "
|
||||
- "L0.289[0,200] 1.37mb|-----------------------L0.289------------------------| "
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.97mb total:"
|
||||
- "L1, all files 1.97mb "
|
||||
- "L1.0[0,288] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Final Output Files "
|
||||
- "L1, all files 1.97mb "
|
||||
- "L1.291[0,288] |------------------------------------L1.291------------------------------------|"
|
||||
|
@ -1992,6 +2046,9 @@ async fn over_two_times_max_files_per_plan() {
|
|||
- "L0.198[197,198] |L0.198|"
|
||||
- "L0.199[198,199] |L0.199|"
|
||||
- "L0.200[199,200] |L0.200|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
|
||||
- "L0, all files 1.37mb "
|
||||
- "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|"
|
||||
- "**** Simulation run 1, type=compact. 200 Input Files, 1.37mb total:"
|
||||
- "L0, all files 7kb "
|
||||
- "L0.201[200,201] |L0.201| "
|
||||
|
@ -2194,6 +2251,9 @@ async fn over_two_times_max_files_per_plan() {
|
|||
- "L0.398[397,398] |L0.398|"
|
||||
- "L0.399[398,399] |L0.399|"
|
||||
- "L0.400[399,400] |L0.400|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
|
||||
- "L0, all files 1.37mb "
|
||||
- "L0.0[200,400] |-------------------------------------L0.0-------------------------------------|"
|
||||
- "**** Simulation run 2, type=compact. 10 Input Files, 70kb total:"
|
||||
- "L0, all files 7kb "
|
||||
- "L0.401[400,401] |L0.401| "
|
||||
|
@ -2206,11 +2266,17 @@ async fn over_two_times_max_files_per_plan() {
|
|||
- "L0.408[407,408] |L0.408| "
|
||||
- "L0.409[408,409] |L0.409| "
|
||||
- "L0.410[409,410] |L0.410|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 70kb total:"
|
||||
- "L0, all files 70kb "
|
||||
- "L0.0[400,410] |-------------------------------------L0.0-------------------------------------|"
|
||||
- "**** Simulation run 3, type=compact. 3 Input Files, 2.8mb total:"
|
||||
- "L0 "
|
||||
- "L0.413[400,410] 70kb |L0.413|"
|
||||
- "L0.412[200,400] 1.37mb |---------------L0.412----------------| "
|
||||
- "L0.411[0,200] 1.37mb|---------------L0.411----------------| "
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 2.8mb total:"
|
||||
- "L1, all files 2.8mb "
|
||||
- "L1.0[0,410] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Final Output Files "
|
||||
- "L1, all files 2.8mb "
|
||||
- "L1.414[0,410] |------------------------------------L1.414------------------------------------|"
|
||||
|
@ -2743,6 +2809,9 @@ async fn many_tiny_l1_files() {
|
|||
- "L1.198[197,198] |L1.198|"
|
||||
- "L1.199[198,199] |L1.199|"
|
||||
- "L1.200[199,200] |L1.200|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
|
||||
- "L1, all files 1.37mb "
|
||||
- "L1.0[0,200] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:"
|
||||
- "L1, all files 7kb "
|
||||
- "L1.201[200,201] |L1.201| "
|
||||
|
@ -2833,6 +2902,9 @@ async fn many_tiny_l1_files() {
|
|||
- "L1.286[285,286] |L1.286|"
|
||||
- "L1.287[286,287] |L1.287|"
|
||||
- "L1.288[287,288] |L1.288|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:"
|
||||
- "L1, all files 616kb "
|
||||
- "L1.0[200,288] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Final Output Files "
|
||||
- "L1 "
|
||||
- "L1.289[0,200] 1.37mb|-----------------------L1.289------------------------| "
|
||||
|
@ -3299,6 +3371,9 @@ async fn many_l0_and_overlapped_l1_files() {
|
|||
- "L0.188[187,188] |L0.188|"
|
||||
- "L0.189[188,189] |L0.189|"
|
||||
- "L0.190[189,190] |L0.190|"
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.3mb total:"
|
||||
- "L0, all files 1.3mb "
|
||||
- "L0.0[0,190] |-------------------------------------L0.0-------------------------------------|"
|
||||
- "**** Simulation run 1, type=split(split_times=[159]). 21 Input Files, 21.3mb total:"
|
||||
- "L0 "
|
||||
- "L0.211[0,190] 1.3mb |----------------------------------L0.211----------------------------------| "
|
||||
|
@ -3323,6 +3398,10 @@ async fn many_l0_and_overlapped_l1_files() {
|
|||
- "L1.207[160,169] 1mb |L1.207| "
|
||||
- "L1.208[170,179] 1mb |L1.208| "
|
||||
- "L1.209[180,189] 1mb |L1.209|"
|
||||
- "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:"
|
||||
- "L1 "
|
||||
- "L1.0[0,159] 17.02mb |----------------------------L1.0-----------------------------| "
|
||||
- "L1.0[159,199] 4.28mb |-----L1.0-----| "
|
||||
- "**** Final Output Files "
|
||||
- "L1 "
|
||||
- "L1.212[0,159] 17.02mb|---------------------------L1.212----------------------------| "
|
||||
|
@ -3810,6 +3889,9 @@ async fn not_many_l0_and_overlapped_l1_files() {
|
|||
- "L1.192[10,19] 1mb |L1.192| "
|
||||
- "L1.193[20,29] 1mb |L1.193| "
|
||||
- "L1.194[30,39] 1mb |L1.194| "
|
||||
- "**** 1 Output Files (parquet_file_id not yet assigned), 6.3mb total:"
|
||||
- "L1, all files 6.3mb "
|
||||
- "L1.0[0,190] |-------------------------------------L1.0-------------------------------------|"
|
||||
- "**** Simulation run 1, type=split(split_times=[2407]). 16 Input Files, 21.3mb total:"
|
||||
- "L1 "
|
||||
- "L1.196[200,209] 1mb |L1.196| "
|
||||
|
@ -3828,6 +3910,10 @@ async fn not_many_l0_and_overlapped_l1_files() {
|
|||
- "L1.209[2800,2809] 1mb |L1.209|"
|
||||
- "L1.210[3000,3009] 1mb |L1.210|"
|
||||
- "L1.211[0,190] 6.3mb |L1.211| "
|
||||
- "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:"
|
||||
- "L2 "
|
||||
- "L2.0[0,2407] 17.04mb|----------------------------L2.0-----------------------------| "
|
||||
- "L2.0[2407,3009] 4.26mb |-----L2.0-----| "
|
||||
- "**** Final Output Files "
|
||||
- "L2 "
|
||||
- "L2.212[0,2407] 17.04mb|---------------------------L2.212----------------------------| "
|
||||
|
|
|
@ -37,7 +37,7 @@ const DEFAULT_HEADING_WIDTH: usize = 20;
|
|||
/// parquet files arranged so they are lined up horizontally based on
|
||||
/// their relative time range.
|
||||
///
|
||||
/// See docs on [`ParquetFileFormatter`]z for examples.
|
||||
/// See docs on [`ParquetFileFormatter`] for examples.
|
||||
fn readable_list_of_files<'a>(
|
||||
title: Option<String>,
|
||||
files: impl IntoIterator<Item = &'a ParquetFile>,
|
||||
|
|
|
@ -5,7 +5,8 @@ use std::{
|
|||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{
|
||||
ColumnSet, CompactionLevel, ParquetFile, ParquetFileParams, SequenceNumber, ShardId, Timestamp,
|
||||
ColumnSet, CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, SequenceNumber,
|
||||
ShardId, Timestamp,
|
||||
};
|
||||
use datafusion::physical_plan::SendableRecordBatchStream;
|
||||
use iox_time::Time;
|
||||
|
@ -69,24 +70,50 @@ impl ParquetFileSimulator {
|
|||
runs.into_iter()
|
||||
.enumerate()
|
||||
.flat_map(|(i, run)| {
|
||||
let total_input_size: i64 = run
|
||||
.input_parquet_files
|
||||
.iter()
|
||||
.map(|f| f.file_size_bytes)
|
||||
.sum();
|
||||
let title = format!(
|
||||
let SimulatedRun {
|
||||
plan_type,
|
||||
input_parquet_files,
|
||||
output_params,
|
||||
} = run;
|
||||
|
||||
let input_title = format!(
|
||||
"**** Simulation run {}, type={}. {} Input Files, {} total:",
|
||||
i,
|
||||
run.plan_type,
|
||||
run.input_parquet_files.len(),
|
||||
display_size(total_input_size)
|
||||
plan_type,
|
||||
input_parquet_files.len(),
|
||||
display_size(total_size(input_parquet_files.iter()))
|
||||
);
|
||||
format_files(title, &run.input_parquet_files)
|
||||
|
||||
// display the files created by this run
|
||||
let output_parquet_files: Vec<_> = output_params
|
||||
.into_iter()
|
||||
.map(|params| {
|
||||
// Use file id 0 as they haven't been
|
||||
// assigned an id in the catalog yet
|
||||
ParquetFile::from_params(params, ParquetFileId::new(0))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let output_title = format!(
|
||||
"**** {} Output Files (parquet_file_id not yet assigned), {} total:",
|
||||
output_parquet_files.len(),
|
||||
display_size(total_size(output_parquet_files.iter()))
|
||||
);
|
||||
|
||||
// hook up inputs and outputs
|
||||
format_files(input_title, &input_parquet_files)
|
||||
.into_iter()
|
||||
.chain(format_files(output_title, &output_parquet_files).into_iter())
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the total file size of all the given parquet files.
///
/// Sums the `file_size_bytes` field of every [`ParquetFile`] yielded by
/// `parquet_files`. An empty input yields `0`.
|
||||
fn total_size<'a>(parquet_files: impl IntoIterator<Item = &'a ParquetFile>) -> i64 {
|
||||
parquet_files.into_iter().map(|f| f.file_size_bytes).sum()
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ParquetFilesSink for ParquetFileSimulator {
|
||||
async fn stream_into_file_sink(
|
||||
|
@ -134,20 +161,21 @@ impl ParquetFilesSink for ParquetFileSimulator {
|
|||
let partition_info = partition_info.as_ref();
|
||||
|
||||
// Compute final output
|
||||
let output: Vec<_> = output_files
|
||||
let output_params: Vec<_> = output_files
|
||||
.into_iter()
|
||||
.map(|f| {
|
||||
f.into_parquet_file_params(max_l0_created_at, column_set.clone(), partition_info)
|
||||
})
|
||||
.collect();
|
||||
|
||||
// record what we did
|
||||
// record what the simulator did
|
||||
self.runs.lock().unwrap().push(SimulatedRun {
|
||||
plan_type,
|
||||
input_parquet_files,
|
||||
output_params: output_params.clone(),
|
||||
});
|
||||
|
||||
Ok(output)
|
||||
Ok(output_params)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
|
@ -229,6 +257,7 @@ pub struct SimulatedRun {
|
|||
// fields are used in testing
|
||||
plan_type: String,
|
||||
input_parquet_files: Vec<ParquetFile>,
|
||||
output_params: Vec<ParquetFileParams>,
|
||||
}
|
||||
|
||||
fn overall_column_set<'a>(files: impl IntoIterator<Item = &'a ParquetFile>) -> ColumnSet {
|
||||
|
|
Loading…
Reference in New Issue