feat: Use ? for id of uncreated parquet files (#7066)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-02-27 17:35:51 +01:00 committed by GitHub
parent 63bf663b94
commit 5194999d62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 147 additions and 113 deletions

View File

@ -126,8 +126,8 @@ async fn all_overlapping_l0() {
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 90mb total:"
- "L1 "
- "L1.0[100,180] 72mb |-----------------------------L1.0-----------------------------| "
- "L1.0[180,200] 18mb |-----L1.0-----|"
- "L1.?[100,180] 72mb |-----------------------------L1.?-----------------------------| "
- "L1.?[180,200] 18mb |-----L1.?-----|"
- "**** Final Output Files "
- "L1 "
- "L1.11[100,180] 72mb |----------------------------L1.11-----------------------------| "
@ -186,8 +186,8 @@ async fn all_overlapping_l0_split_percentage() {
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 90mb total:"
- "L1 "
- "L1.0[100,195] 85.5mb|-----------------------------------L1.0-----------------------------------| "
- "L1.0[195,200] 4.5mb |L1.0|"
- "L1.?[100,195] 85.5mb|-----------------------------------L1.?-----------------------------------| "
- "L1.?[195,200] 4.5mb |L1.?|"
- "**** Final Output Files "
- "L1 "
- "L1.11[100,195] 85.5mb|----------------------------------L1.11-----------------------------------| "
@ -248,7 +248,7 @@ async fn all_overlapping_l0_max_file_size() {
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 90mb total:"
- "L1, all files 90mb "
- "L1.0[100,200] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[100,200] |-------------------------------------L1.?-------------------------------------|"
- "**** Final Output Files "
- "L1, all files 90mb "
- "L1.11[100,200] |------------------------------------L1.11-------------------------------------|"
@ -308,15 +308,15 @@ async fn all_overlapping_l0_split_percentage_and_max_file_size() {
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
- "**** 9 Output Files (parquet_file_id not yet assigned), 90mb total:"
- "L1 "
- "L1.0[100,112] 10.8mb|-L1.0--| "
- "L1.0[112,124] 10.8mb |-L1.0--| "
- "L1.0[124,136] 10.8mb |-L1.0--| "
- "L1.0[136,148] 10.8mb |-L1.0--| "
- "L1.0[148,160] 10.8mb |-L1.0--| "
- "L1.0[160,172] 10.8mb |-L1.0--| "
- "L1.0[172,184] 10.8mb |-L1.0--| "
- "L1.0[184,196] 10.8mb |-L1.0--| "
- "L1.0[196,200] 3.6mb |L1.0|"
- "L1.?[100,112] 10.8mb|-L1.?--| "
- "L1.?[112,124] 10.8mb |-L1.?--| "
- "L1.?[124,136] 10.8mb |-L1.?--| "
- "L1.?[136,148] 10.8mb |-L1.?--| "
- "L1.?[148,160] 10.8mb |-L1.?--| "
- "L1.?[160,172] 10.8mb |-L1.?--| "
- "L1.?[172,184] 10.8mb |-L1.?--| "
- "L1.?[184,196] 10.8mb |-L1.?--| "
- "L1.?[196,200] 3.6mb |L1.?|"
- "**** Simulation run 1, type=split(split_times=[112, 124, 136, 148, 160, 172, 184, 196]). 9 Input Files, 90mb total:"
- "L1 "
- "L1.19[196,200] 3.6mb |L1.19|"
@ -330,15 +330,15 @@ async fn all_overlapping_l0_split_percentage_and_max_file_size() {
- "L1.11[100,112] 10.8mb|-L1.11-| "
- "**** 9 Output Files (parquet_file_id not yet assigned), 90mb total:"
- "L2 "
- "L2.0[100,112] 10.8mb|-L2.0--| "
- "L2.0[112,124] 10.8mb |-L2.0--| "
- "L2.0[124,136] 10.8mb |-L2.0--| "
- "L2.0[136,148] 10.8mb |-L2.0--| "
- "L2.0[148,160] 10.8mb |-L2.0--| "
- "L2.0[160,172] 10.8mb |-L2.0--| "
- "L2.0[172,184] 10.8mb |-L2.0--| "
- "L2.0[184,196] 10.8mb |-L2.0--| "
- "L2.0[196,200] 3.6mb |L2.0|"
- "L2.?[100,112] 10.8mb|-L2.?--| "
- "L2.?[112,124] 10.8mb |-L2.?--| "
- "L2.?[124,136] 10.8mb |-L2.?--| "
- "L2.?[136,148] 10.8mb |-L2.?--| "
- "L2.?[148,160] 10.8mb |-L2.?--| "
- "L2.?[160,172] 10.8mb |-L2.?--| "
- "L2.?[172,184] 10.8mb |-L2.?--| "
- "L2.?[184,196] 10.8mb |-L2.?--| "
- "L2.?[196,200] 3.6mb |L2.?|"
- "**** Final Output Files "
- "L2 "
- "L2.20[100,112] 10.8mb|-L2.20-| "
@ -403,8 +403,8 @@ async fn all_non_overlapping_l0() {
- "L0.1[0,1] |L0.1| "
- "**** 2 Output Files (parquet_file_id not yet assigned), 100mb total:"
- "L1 "
- "L1.0[0,720] 79.91mb |----------------------------L1.0-----------------------------| "
- "L1.0[720,901] 20.09mb |-----L1.0-----| "
- "L1.?[0,720] 79.91mb |----------------------------L1.?-----------------------------| "
- "L1.?[720,901] 20.09mb |-----L1.?-----| "
- "**** Final Output Files "
- "L1 "
- "L1.11[0,720] 79.91mb|----------------------------L1.11----------------------------| "
@ -476,7 +476,7 @@ async fn l1_with_overlapping_l0() {
- "L1.2[100,150] 10mb |------L1.2-------| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 10.02mb total:"
- "L1, all files 10.02mb "
- "L1.0[100,310] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[100,310] |-------------------------------------L1.?-------------------------------------|"
- "**** Final Output Files "
- "L1 "
- "L1.1[50,100] 10mb |----L1.1-----| "
@ -544,7 +544,7 @@ async fn l1_with_non_overlapping_l0() {
- "L0.3[300,350] |-----L0.3-----| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 25kb total:"
- "L1, all files 25kb "
- "L1.0[300,550] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[300,550] |-------------------------------------L1.?-------------------------------------|"
- "**** Final Output Files "
- "L1 "
- "L1.1[50,100] 10mb |-L1.1-| "
@ -611,7 +611,7 @@ async fn l1_with_non_overlapping_l0_larger() {
- "L0.5[300,350] |----------L0.5----------| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.0[300,450] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[300,450] |-------------------------------------L1.?-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[370]). 5 Input Files, 108mb total:"
- "L1 "
- "L1.4[200,250] 3mb |--L1.4--| "
@ -621,8 +621,8 @@ async fn l1_with_non_overlapping_l0_larger() {
- "L1.8[300,450] 15mb |------------L1.8------------|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 108mb total:"
- "L2 "
- "L2.0[50,370] 86.4mb |-----------------------------L2.0-----------------------------| "
- "L2.0[370,450] 21.6mb |-----L2.0-----|"
- "L2.?[50,370] 86.4mb |-----------------------------L2.?-----------------------------| "
- "L2.?[370,450] 21.6mb |-----L2.?-----|"
- "**** Final Output Files "
- "L2 "
- "L2.9[50,370] 86.4mb |-----------------------------L2.9-----------------------------| "
@ -697,7 +697,7 @@ async fn l1_too_much_with_non_overlapping_l0() {
- "L0.11[600,650] |------------------------------------L0.11-------------------------------------|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[600,650] |-------------------------------------L1.?-------------------------------------|"
- "SKIPPED COMPACTION for PartitionId(1): partition 1 has 781189120 parquet file bytes, limit is 268435456"
- "**** Final Output Files "
- "L1 "
@ -781,7 +781,7 @@ async fn many_l1_with_non_overlapping_l0() {
- "L0.11[600,650] |------------------------------------L0.11-------------------------------------|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[600,650] |-------------------------------------L1.?-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[530]). 11 Input Files, 88mb total:"
- "L1 "
- "L1.10[500,550] 7mb |L1.10| "
@ -797,8 +797,8 @@ async fn many_l1_with_non_overlapping_l0() {
- "L1.14[600,650] 15mb |L1.14|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 88mb total:"
- "L2 "
- "L2.0[50,530] 70.4mb |-----------------------------L2.0-----------------------------| "
- "L2.0[530,650] 17.6mb |-----L2.0-----|"
- "L2.?[50,530] 70.4mb |-----------------------------L2.?-----------------------------| "
- "L2.?[530,650] 17.6mb |-----L2.?-----|"
- "**** Final Output Files "
- "L2 "
- "L2.15[50,530] 70.4mb|----------------------------L2.15-----------------------------| "
@ -864,7 +864,7 @@ async fn large_l1_with_non_overlapping_l0() {
- "L0.3[600,650] |-------------------------------------L0.3-------------------------------------|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.0[600,650] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[600,650] |-------------------------------------L1.?-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[375]). 3 Input Files, 185mb total:"
- "L1 "
- "L1.2[100,150] 80mb |L1.2| "
@ -872,8 +872,8 @@ async fn large_l1_with_non_overlapping_l0() {
- "L1.6[600,650] 15mb |L1.6| "
- "**** 2 Output Files (parquet_file_id not yet assigned), 185mb total:"
- "L2 "
- "L2.0[50,375] 100.21mb|------------------L2.0-------------------| "
- "L2.0[375,650] 84.79mb |---------------L2.0---------------| "
- "L2.?[50,375] 100.21mb|------------------L2.?-------------------| "
- "L2.?[375,650] 84.79mb |---------------L2.?---------------| "
- "**** Final Output Files "
- "L2 "
- "L2.7[50,375] 100.21mb|------------------L2.7-------------------| "
@ -951,7 +951,7 @@ async fn many_l1_files() {
- "L0.21[24,25] |------------------------------------L0.21-------------------------------------|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 3mb total:"
- "L1, all files 3mb "
- "L1.0[24,25] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[24,25] |-------------------------------------L1.?-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[13]). 21 Input Files, 203mb total:"
- "L1 "
- "L1.20[19,20] 10mb |L1.20| "
@ -977,8 +977,8 @@ async fn many_l1_files() {
- "L1.24[24,25] 3mb |L1.24|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 203mb total:"
- "L2 "
- "L2.0[0,13] 105.56mb |-----------------L2.0------------------| "
- "L2.0[13,25] 97.44mb |----------------L2.0----------------| "
- "L2.?[0,13] 105.56mb |-----------------L2.?------------------| "
- "L2.?[13,25] 97.44mb |----------------L2.?----------------| "
- "**** Final Output Files "
- "L2 "
- "L2.25[0,13] 105.56mb|-----------------L2.25-----------------| "
@ -1514,7 +1514,7 @@ async fn many_tiny_l0_files() {
- "L0.200[199,200] |L0.200|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L0, all files 1.37mb "
- "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|"
- "L0.?[0,200] |-------------------------------------L0.?-------------------------------------|"
- "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:"
- "L0, all files 7kb "
- "L0.201[200,201] |L0.201| "
@ -1607,14 +1607,14 @@ async fn many_tiny_l0_files() {
- "L0.288[287,288] |L0.288|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:"
- "L0, all files 616kb "
- "L0.0[200,288] |-------------------------------------L0.0-------------------------------------|"
- "L0.?[200,288] |-------------------------------------L0.?-------------------------------------|"
- "**** Simulation run 2, type=compact. 2 Input Files, 1.97mb total:"
- "L0 "
- "L0.290[200,288] 616kb |--------L0.290--------| "
- "L0.289[0,200] 1.37mb|-----------------------L0.289------------------------| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.97mb total:"
- "L1, all files 1.97mb "
- "L1.0[0,288] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[0,288] |-------------------------------------L1.?-------------------------------------|"
- "**** Final Output Files "
- "L1, all files 1.97mb "
- "L1.291[0,288] |------------------------------------L1.291------------------------------------|"
@ -2269,7 +2269,7 @@ async fn over_two_times_max_files_per_plan() {
- "L0.200[199,200] |L0.200|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L0, all files 1.37mb "
- "L0.0[0,200] |-------------------------------------L0.0-------------------------------------|"
- "L0.?[0,200] |-------------------------------------L0.?-------------------------------------|"
- "**** Simulation run 1, type=compact. 200 Input Files, 1.37mb total:"
- "L0, all files 7kb "
- "L0.201[200,201] |L0.201| "
@ -2474,7 +2474,7 @@ async fn over_two_times_max_files_per_plan() {
- "L0.400[399,400] |L0.400|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L0, all files 1.37mb "
- "L0.0[200,400] |-------------------------------------L0.0-------------------------------------|"
- "L0.?[200,400] |-------------------------------------L0.?-------------------------------------|"
- "**** Simulation run 2, type=compact. 10 Input Files, 70kb total:"
- "L0, all files 7kb "
- "L0.401[400,401] |L0.401| "
@ -2489,7 +2489,7 @@ async fn over_two_times_max_files_per_plan() {
- "L0.410[409,410] |L0.410|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 70kb total:"
- "L0, all files 70kb "
- "L0.0[400,410] |-------------------------------------L0.0-------------------------------------|"
- "L0.?[400,410] |-------------------------------------L0.?-------------------------------------|"
- "**** Simulation run 3, type=compact. 3 Input Files, 2.8mb total:"
- "L0 "
- "L0.413[400,410] 70kb |L0.413|"
@ -2497,7 +2497,7 @@ async fn over_two_times_max_files_per_plan() {
- "L0.411[0,200] 1.37mb|---------------L0.411----------------| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 2.8mb total:"
- "L1, all files 2.8mb "
- "L1.0[0,410] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[0,410] |-------------------------------------L1.?-------------------------------------|"
- "**** Final Output Files "
- "L1, all files 2.8mb "
- "L1.414[0,410] |------------------------------------L1.414------------------------------------|"
@ -3032,7 +3032,7 @@ async fn many_tiny_l1_files() {
- "L1.200[199,200] |L1.200|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L1, all files 1.37mb "
- "L1.0[0,200] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[0,200] |-------------------------------------L1.?-------------------------------------|"
- "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:"
- "L1, all files 7kb "
- "L1.201[200,201] |L1.201| "
@ -3125,7 +3125,7 @@ async fn many_tiny_l1_files() {
- "L1.288[287,288] |L1.288|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:"
- "L1, all files 616kb "
- "L1.0[200,288] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[200,288] |-------------------------------------L1.?-------------------------------------|"
- "**** Final Output Files "
- "L1 "
- "L1.289[0,200] 1.37mb|-----------------------L1.289------------------------| "
@ -3594,7 +3594,7 @@ async fn many_l0_and_overlapped_l1_files() {
- "L0.190[189,190] |L0.190|"
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.3mb total:"
- "L0, all files 1.3mb "
- "L0.0[0,190] |-------------------------------------L0.0-------------------------------------|"
- "L0.?[0,190] |-------------------------------------L0.?-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[159]). 21 Input Files, 21.3mb total:"
- "L0 "
- "L0.211[0,190] 1.3mb |----------------------------------L0.211----------------------------------| "
@ -3621,8 +3621,8 @@ async fn many_l0_and_overlapped_l1_files() {
- "L1.209[180,189] 1mb |L1.209|"
- "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:"
- "L1 "
- "L1.0[0,159] 17.02mb |----------------------------L1.0-----------------------------| "
- "L1.0[159,199] 4.28mb |-----L1.0-----| "
- "L1.?[0,159] 17.02mb |----------------------------L1.?-----------------------------| "
- "L1.?[159,199] 4.28mb |-----L1.?-----| "
- "**** Final Output Files "
- "L1 "
- "L1.212[0,159] 17.02mb|---------------------------L1.212----------------------------| "
@ -4112,7 +4112,7 @@ async fn not_many_l0_and_overlapped_l1_files() {
- "L1.194[30,39] 1mb |L1.194| "
- "**** 1 Output Files (parquet_file_id not yet assigned), 6.3mb total:"
- "L1, all files 6.3mb "
- "L1.0[0,190] |-------------------------------------L1.0-------------------------------------|"
- "L1.?[0,190] |-------------------------------------L1.?-------------------------------------|"
- "**** Simulation run 1, type=split(split_times=[2407]). 16 Input Files, 21.3mb total:"
- "L1 "
- "L1.196[200,209] 1mb |L1.196| "
@ -4133,8 +4133,8 @@ async fn not_many_l0_and_overlapped_l1_files() {
- "L1.211[0,190] 6.3mb |L1.211| "
- "**** 2 Output Files (parquet_file_id not yet assigned), 21.3mb total:"
- "L2 "
- "L2.0[0,2407] 17.04mb|----------------------------L2.0-----------------------------| "
- "L2.0[2407,3009] 4.26mb |-----L2.0-----| "
- "L2.?[0,2407] 17.04mb|----------------------------L2.?-----------------------------| "
- "L2.?[2407,3009] 4.26mb |-----L2.?-----| "
- "**** Final Output Files "
- "L2 "
- "L2.212[0,2407] 17.04mb|---------------------------L2.212----------------------------| "

View File

@ -1,25 +1,81 @@
use std::collections::BTreeMap;
use data_types::{CompactionLevel, ParquetFile};
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams};
/// Trait for ParquetFiles and ParquetFileParams (which are not yet
/// inserted into the catalog and thus are not assigned an ID)
pub trait ParquetFileInfo {
fn min_time(&self) -> i64;
fn max_time(&self) -> i64;
fn file_size_bytes(&self) -> i64;
fn compaction_level(&self) -> CompactionLevel;
/// returns a value like `L0.<id>` to identify this ParquetFile
fn display_id(&self) -> String;
}
impl ParquetFileInfo for ParquetFile {
fn min_time(&self) -> i64 {
self.min_time.get()
}
fn max_time(&self) -> i64 {
self.max_time.get()
}
fn file_size_bytes(&self) -> i64 {
self.file_size_bytes
}
fn display_id(&self) -> String {
let level = display_level(&self.compaction_level());
let id = self.id;
format!("{level}.{id}")
}
fn compaction_level(&self) -> CompactionLevel {
self.compaction_level
}
}
impl ParquetFileInfo for ParquetFileParams {
fn min_time(&self) -> i64 {
self.min_time.get()
}
fn max_time(&self) -> i64 {
self.max_time.get()
}
fn file_size_bytes(&self) -> i64 {
self.file_size_bytes
}
fn compaction_level(&self) -> CompactionLevel {
self.compaction_level
}
fn display_id(&self) -> String {
let level = display_level(&self.compaction_level());
// ID is not assigned, so use '?' in place of id
format!("{level}.?")
}
}
/// Formats the list of files in the manner described on
/// [`ParquetFileFormatter`] into strings suitable for comparison with
/// `insta`.
pub fn format_files<'a>(
title: impl Into<String>,
files: impl IntoIterator<Item = &'a ParquetFile>,
) -> Vec<String> {
pub fn format_files<P: ParquetFileInfo>(title: impl Into<String>, files: &[P]) -> Vec<String> {
readable_list_of_files(Some(title.into()), files)
}
/// Formats two lists of files in the manner described on
/// [`ParquetFileFormatter`] into strings suitable for comparison with
/// `insta`.
pub fn format_files_split<'a>(
pub fn format_files_split<P: ParquetFileInfo>(
title1: impl Into<String>,
files1: impl IntoIterator<Item = &'a ParquetFile>,
files1: &[P],
title2: impl Into<String>,
files2: impl IntoIterator<Item = &'a ParquetFile>,
files2: &[P],
) -> Vec<String> {
let strings1 = readable_list_of_files(Some(title1.into()), files1);
let strings2 = readable_list_of_files(Some(title2.into()), files2);
@ -38,27 +94,23 @@ const DEFAULT_HEADING_WIDTH: usize = 20;
/// their relative time range.
///
/// See docs on [`ParquetFileFormatter`] for examples.
fn readable_list_of_files<'a>(
title: Option<String>,
files: impl IntoIterator<Item = &'a ParquetFile>,
) -> Vec<String> {
fn readable_list_of_files<P: ParquetFileInfo>(title: Option<String>, files: &[P]) -> Vec<String> {
let mut output = vec![];
if let Some(title) = title {
output.push(title);
}
let files: Vec<_> = files.into_iter().collect();
if files.is_empty() {
return output;
}
let formatter = ParquetFileFormatter::new(&files);
let formatter = ParquetFileFormatter::new(files);
// split up the files into groups by levels (compaction levels)
let mut files_by_level = BTreeMap::new();
for file in &files {
for file in files {
let existing_files = files_by_level
.entry(file.compaction_level)
.entry(file.compaction_level())
.or_insert_with(Vec::new);
existing_files.push(file);
}
@ -130,24 +182,24 @@ impl FileSizeSeen {
impl ParquetFileFormatter {
/// calculates display parameters for formatting a set of files
fn new(files: &[&ParquetFile]) -> Self {
fn new<P: ParquetFileInfo>(files: &[P]) -> Self {
let row_heading_chars = DEFAULT_HEADING_WIDTH;
let width_chars = DEFAULT_WIDTH;
let min_time = files
.iter()
.map(|f| f.min_time.get())
.map(|f| f.min_time())
.min()
.expect("at least one file");
let max_time = files
.iter()
.map(|f| f.max_time.get())
.map(|f| f.max_time())
.max()
.expect("at least one file");
let file_size_seen = files
.iter()
.fold(FileSizeSeen::None, |file_size_seen, file| {
file_size_seen.observe(file.file_size_bytes)
file_size_seen.observe(file.file_size_bytes())
});
let time_range = max_time - min_time;
@ -196,9 +248,9 @@ impl ParquetFileFormatter {
/// characters, which tries to visually depict the timge range of
/// the file using the width. See docs on [`ParquetFileFormatter`]
/// for examples.
fn format_file(&self, file: &ParquetFile) -> String {
fn format_file<P: ParquetFileInfo>(&self, file: &P) -> String {
// use try_into to force conversion to usize
let time_width = (file.max_time - file.min_time).get();
let time_width = file.max_time() - file.min_time();
// special case "zero" width times
let field_width = if self.min_time == self.max_time {
@ -211,7 +263,7 @@ impl ParquetFileFormatter {
// Get compact display of the file, like 'L0.1'
// add |--- ---| formatting (based on field width)
let file_string = format!("|{:-^width$}|", display_file_id(file), width = field_width);
let file_string = format!("|{:-^width$}|", file.display_id(), width = field_width);
// show indvidual file sizes if they are different
let show_size = matches!(self.file_size_seen, FileSizeSeen::Many);
let row_heading = display_format(file, show_size);
@ -228,7 +280,7 @@ impl ParquetFileFormatter {
// otherwise, figure out whitespace padding at start and back
// based on the relative start time of the file
// assume time from 0
let prefix_time_range = file.min_time.get().saturating_sub(self.min_time);
let prefix_time_range = file.min_time().saturating_sub(self.min_time);
let prefix_padding = " ".repeat(self.time_range_to_chars(prefix_time_range));
// pad the rest with whitespace
@ -253,13 +305,6 @@ fn display_level(compaction_level: &CompactionLevel) -> &'static str {
}
}
/// Display like 'L0.1' with file level and id
fn display_file_id(file: &ParquetFile) -> String {
let level = display_level(&file.compaction_level);
let id = file.id;
format!("{level}.{id}")
}
/// Format a size for reasonable human reading
pub fn display_size(sz: i64) -> String {
let kbyte = 1024.0;
@ -281,6 +326,11 @@ pub fn display_size(sz: i64) -> String {
}
}
/// return the total file size of all the parquet files
pub fn total_size<P: ParquetFileInfo>(parquet_files: &[P]) -> i64 {
parquet_files.iter().map(|f| f.file_size_bytes()).sum()
}
// https://stackoverflow.com/questions/28655362/how-does-one-round-a-floating-point-number-to-a-specified-number-of-digits
fn round(x: f64, decimals: u32) -> f64 {
let y = 10i32.pow(decimals) as f64;
@ -294,11 +344,11 @@ fn round(x: f64, decimals: u32) -> f64 {
/// ```text
/// L0.1[100,200]@1
/// ```
fn display_format(file: &ParquetFile, show_size: bool) -> String {
let file_id = display_file_id(file);
let min_time = file.min_time.get(); // display as i64
let max_time = file.max_time.get(); // display as i64
let sz = file.file_size_bytes;
fn display_format<P: ParquetFileInfo>(file: &P, show_size: bool) -> String {
let file_id = file.display_id();
let min_time = file.min_time(); // display as i64
let max_time = file.max_time(); // display as i64
let sz = file.file_size_bytes();
if show_size {
let sz = display_size(sz);
format!("{file_id}[{min_time},{max_time}] {sz}")

View File

@ -5,8 +5,7 @@ use std::{
use async_trait::async_trait;
use data_types::{
ColumnSet, CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, SequenceNumber,
ShardId, Timestamp,
ColumnSet, CompactionLevel, ParquetFile, ParquetFileParams, SequenceNumber, ShardId, Timestamp,
};
use datafusion::physical_plan::SendableRecordBatchStream;
use iox_time::Time;
@ -15,7 +14,7 @@ use uuid::Uuid;
use compactor2::{DynError, ParquetFilesSink, PartitionInfo, PlanIR};
use crate::{display_size, format_files};
use crate::{display::total_size, display_size, format_files};
/// Simulates the result of running a compaction plan that
/// produces multiple parquet files.
@ -81,39 +80,24 @@ impl ParquetFileSimulator {
i,
plan_type,
input_parquet_files.len(),
display_size(total_size(input_parquet_files.iter()))
display_size(total_size(&input_parquet_files))
);
// display the files created by this run
let output_parquet_files: Vec<_> = output_params
.into_iter()
.map(|params| {
// Use file id 0 as they haven't been
// assigned an id in the catalog yet
ParquetFile::from_params(params, ParquetFileId::new(0))
})
.collect();
let output_title = format!(
"**** {} Output Files (parquet_file_id not yet assigned), {} total:",
output_parquet_files.len(),
display_size(total_size(output_parquet_files.iter()))
output_params.len(),
display_size(total_size(&output_params))
);
// hook up inputs and outputs
format_files(input_title, &input_parquet_files)
.into_iter()
.chain(format_files(output_title, &output_parquet_files).into_iter())
.chain(format_files(output_title, &output_params).into_iter())
})
.collect()
}
}
/// return the total file size of all the parquet files
fn total_size<'a>(parquet_files: impl IntoIterator<Item = &'a ParquetFile>) -> i64 {
parquet_files.into_iter().map(|f| f.file_size_bytes).sum()
}
#[async_trait]
impl ParquetFilesSink for ParquetFileSimulator {
async fn stream_into_file_sink(