feat: track why bytes are written in compactor simulator (#8493)
* feat: add tracking of why bytes are written in simulator * chore: enable breakdown of why bytes are written in a few larger tests * chore: enable writes breakdown in another test --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
0d009e3467
commit
1cc0926a7f
|
@ -470,6 +470,7 @@ async fn random_backfill_over_l2s() {
|
|||
.with_max_num_files_per_plan(10)
|
||||
.with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE)
|
||||
.with_partition_timeout(Duration::from_secs(10))
|
||||
.with_writes_breakdown()
|
||||
.build()
|
||||
.await;
|
||||
|
||||
|
@ -1021,6 +1022,12 @@ async fn random_backfill_over_l2s() {
|
|||
- "L2.135[765,799] 1.05us 55mb |L2.135| "
|
||||
- "L2.136[800,864] 1.05us 102mb |L2.136| "
|
||||
- "L2.137[865,899] 1.05us 55mb |L2.137| "
|
||||
- "**** Breakdown of where bytes were written"
|
||||
- 1.84gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
||||
- 500mb written by compact(ManySmallFiles)
|
||||
- 500mb written by split(HighL0OverlapSingleFile)
|
||||
- 597mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize))
|
||||
- 665mb written by split(ReduceOverlap)
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
@ -1040,6 +1047,7 @@ async fn actual_case_from_catalog_1() {
|
|||
.with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE)
|
||||
.with_max_num_files_per_plan(20)
|
||||
.with_suppress_run_output()
|
||||
.with_writes_breakdown()
|
||||
.with_partition_timeout(Duration::from_secs(10))
|
||||
.build()
|
||||
.await;
|
||||
|
@ -3084,6 +3092,12 @@ async fn actual_case_from_catalog_1() {
|
|||
- "L2.664[182,182] 342ns 19mb |L2.664| "
|
||||
- "L2.665[288,293] 342ns 114mb |L2.665| "
|
||||
- "L2.666[294,296] 342ns 57mb |L2.666| "
|
||||
- "**** Breakdown of where bytes were written"
|
||||
- 2.95gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
||||
- 455mb written by split(ReduceOverlap)
|
||||
- 456mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize))
|
||||
- 5.48gb written by compact(ManySmallFiles)
|
||||
- 6.01gb written by split(HighL0OverlapTotalBacklog)
|
||||
- "WARNING: file L2.598[150,165] 342ns 224mb exceeds soft limit 100mb by more than 50%"
|
||||
- "WARNING: file L2.602[183,197] 342ns 267mb exceeds soft limit 100mb by more than 50%"
|
||||
- "WARNING: file L2.603[198,207] 342ns 157mb exceeds soft limit 100mb by more than 50%"
|
||||
|
|
|
@ -148,6 +148,20 @@ pub(crate) async fn run_layout_scenario(setup: &TestSetup) -> Vec<String> {
|
|||
&sort_files(output_files),
|
||||
));
|
||||
|
||||
if !setup.suppress_writes_breakdown {
|
||||
output.extend(vec![
|
||||
"**** Breakdown of where bytes were written".to_string()
|
||||
]);
|
||||
|
||||
let mut breakdown = Vec::new();
|
||||
for (op, written) in setup.bytes_written_per_plan.lock().unwrap().iter() {
|
||||
let written = *written as i64;
|
||||
breakdown.push(format!("{} written by {}", display_size(written), op));
|
||||
}
|
||||
breakdown.sort();
|
||||
output.extend(breakdown);
|
||||
}
|
||||
|
||||
// verify that the output of the compactor was valid as well
|
||||
setup.verify_invariants().await;
|
||||
|
||||
|
|
|
@ -19,8 +19,9 @@ async fn stuck_l0() {
|
|||
.await
|
||||
.with_max_num_files_per_plan(20)
|
||||
.with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE)
|
||||
.with_partition_timeout(Duration::from_millis(10000))
|
||||
.with_partition_timeout(Duration::from_millis(100000))
|
||||
.with_suppress_run_output() // remove this to debug
|
||||
.with_writes_breakdown()
|
||||
.build()
|
||||
.await;
|
||||
|
||||
|
@ -1189,6 +1190,13 @@ async fn stuck_l0() {
|
|||
- "L2.1790[1686872478395764410,1686872728189189161] 1686936871.55s 51mb |L2.1790|"
|
||||
- "L2.1791[1686872728189189162,1686873217700530422] 1686936871.55s 100mb |L2.1791|"
|
||||
- "L2.1792[1686873217700530423,1686873599000000000] 1686936871.55s 78mb |L2.1792|"
|
||||
- "**** Breakdown of where bytes were written"
|
||||
- 1.66gb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize))
|
||||
- 18.49gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
||||
- 5.22gb written by compact(ManySmallFiles)
|
||||
- 524mb written by split(StartLevelOverlapsTooBig)
|
||||
- 7.29gb written by split(ReduceOverlap)
|
||||
- 8.26gb written by split(HighL0OverlapTotalBacklog)
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
@ -1202,6 +1210,7 @@ async fn stuck_l1() {
|
|||
.with_max_num_files_per_plan(20)
|
||||
.with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE)
|
||||
.with_partition_timeout(Duration::from_millis(100))
|
||||
.with_writes_breakdown()
|
||||
.build()
|
||||
.await;
|
||||
|
||||
|
@ -1424,6 +1433,9 @@ async fn stuck_l1() {
|
|||
- "L2.23[1686873630000000000,1686879262359644750] 1686928116.94s 100mb|-L2.23-| "
|
||||
- "L2.24[1686879262359644751,1686884894719289500] 1686928116.94s 100mb |-L2.24-| "
|
||||
- "L2.25[1686884894719289501,1686888172000000000] 1686928116.94s 58mb |L2.25| "
|
||||
- "**** Breakdown of where bytes were written"
|
||||
- 258mb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
||||
- 71mb written by split(ReduceOverlap)
|
||||
"###
|
||||
);
|
||||
// TODO(maybe): see matching comment in files_to_compact.rs/limit_files_to_compact
|
||||
|
@ -1445,6 +1457,7 @@ async fn stuck_l0_large_l0s() {
|
|||
.with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE)
|
||||
.with_partition_timeout(Duration::from_millis(100000))
|
||||
//.with_suppress_run_output() // remove this to debug
|
||||
.with_writes_breakdown()
|
||||
.build()
|
||||
.await;
|
||||
|
||||
|
@ -11523,6 +11536,14 @@ async fn stuck_l0_large_l0s() {
|
|||
- "L2.3516[1682,723271] 199ns 100mb|-----------L2.3516------------| "
|
||||
- "L2.3517[723272,1444860] 199ns 100mb |-----------L2.3517------------| "
|
||||
- "L2.3518[1444861,1990000] 199ns 76mb |-------L2.3518--------| "
|
||||
- "**** Breakdown of where bytes were written"
|
||||
- 1.43gb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize))
|
||||
- 132b written by compact(FoundSubsetLessThanMaxCompactSize)
|
||||
- 2.14gb written by compact(ManySmallFiles)
|
||||
- 2kb written by compact(TotalSizeLessThanMaxCompactSize)
|
||||
- 4.11gb written by split(HighL0OverlapTotalBacklog)
|
||||
- 4.32gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
||||
- 503mb written by split(ReduceOverlap)
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
@ -11541,6 +11562,7 @@ async fn single_file_compaction() {
|
|||
.with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE)
|
||||
.with_partition_timeout(Duration::from_millis(1000))
|
||||
.with_suppress_run_output() // remove this to debug
|
||||
.with_writes_breakdown()
|
||||
.build()
|
||||
.await;
|
||||
|
||||
|
@ -11934,6 +11956,8 @@ async fn single_file_compaction() {
|
|||
- "L1.30[1681776002714783000,1681862268794595000] 1683039505.9s 192kb|-----------------------------------------L1.30------------------------------------------|"
|
||||
- "L2 "
|
||||
- "L2.1[1681776057065884000,1681848094846357000] 1681848108.8s 145kb|----------------------------------L2.1-----------------------------------| "
|
||||
- "**** Breakdown of where bytes were written"
|
||||
- 192kb written by compact(TotalSizeLessThanMaxCompactSize)
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
@ -11951,6 +11975,7 @@ async fn split_then_undo_it() {
|
|||
.with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE)
|
||||
.with_partition_timeout(Duration::from_millis(1000))
|
||||
.with_suppress_run_output() // remove this to debug
|
||||
.with_writes_breakdown()
|
||||
.build()
|
||||
.await;
|
||||
|
||||
|
@ -12207,6 +12232,11 @@ async fn split_then_undo_it() {
|
|||
- "L2.61[1680046929099006065,1680047338160274709] 1681420678.89s 71mb |L2.61|"
|
||||
- "L2.62[1680047338160274710,1680047867631254942] 1681420678.89s 93mb |L2.62|"
|
||||
- "L2.63[1680047867631254943,1680047999999000000] 1681420678.89s 23mb |L2.63|"
|
||||
- "**** Breakdown of where bytes were written"
|
||||
- 1.17gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
||||
- 11mb written by split(ReduceOverlap)
|
||||
- 152mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize))
|
||||
- 185mb written by split(HighL0OverlapTotalBacklog)
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
@ -12226,6 +12256,7 @@ async fn split_precent_loop() {
|
|||
.with_percentage_max_file_size(5)
|
||||
.with_split_percentage(80)
|
||||
.with_suppress_run_output() // remove this to debug
|
||||
.with_writes_breakdown()
|
||||
.build()
|
||||
.await;
|
||||
|
||||
|
@ -12835,6 +12866,12 @@ async fn split_precent_loop() {
|
|||
- "L2.261[1676018715629485680,1676027900853158703] 1676066475.26s 100mb |-L2.261-| "
|
||||
- "L2.262[1676027900853158704,1676037086076831726] 1676066475.26s 100mb |-L2.262-| "
|
||||
- "L2.263[1676037086076831727,1676045833054395545] 1676066475.26s 95mb |L2.263-| "
|
||||
- "**** Breakdown of where bytes were written"
|
||||
- 1.46gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
||||
- 141mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize))
|
||||
- 176mb written by split(ReduceOverlap)
|
||||
- 639mb written by compact(ManySmallFiles)
|
||||
- 724mb written by split(HighL0OverlapTotalBacklog)
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
@ -12858,6 +12895,7 @@ async fn very_big_overlapped_backlog() {
|
|||
.with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE)
|
||||
.with_partition_timeout(Duration::from_millis(100000))
|
||||
.with_suppress_run_output() // remove this to debug
|
||||
.with_writes_breakdown()
|
||||
.build()
|
||||
.await;
|
||||
|
||||
|
@ -13621,6 +13659,12 @@ async fn very_big_overlapped_backlog() {
|
|||
- "L2.1150[197961,198942] 299ns 100mb |L2.1150|"
|
||||
- "L2.1151[198943,199923] 299ns 100mb |L2.1151|"
|
||||
- "L2.1152[199924,200000] 299ns 8mb |L2.1152|"
|
||||
- "**** Breakdown of where bytes were written"
|
||||
- 1000mb written by split(ReduceOverlap)
|
||||
- 38.21gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
||||
- 400mb written by compact(ManySmallFiles)
|
||||
- 780mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize))
|
||||
- 860mb written by split(HighL0OverlapTotalBacklog)
|
||||
"###
|
||||
);
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ mod simulator;
|
|||
pub use display::{display_format, display_size, format_files, format_files_split};
|
||||
|
||||
use std::{
|
||||
collections::HashSet,
|
||||
collections::{HashMap, HashSet},
|
||||
future::Future,
|
||||
num::NonZeroUsize,
|
||||
sync::{atomic::AtomicUsize, Arc, Mutex},
|
||||
|
@ -83,6 +83,11 @@ pub struct TestSetupBuilder<const WITH_FILES: bool> {
|
|||
invariant_check: Arc<dyn InvariantCheck>,
|
||||
/// A shared count of total bytes written during test
|
||||
bytes_written: Arc<AtomicUsize>,
|
||||
/// A shared count of the breakdown of where bytes were written
|
||||
bytes_written_per_plan: Arc<Mutex<HashMap<String, usize>>>,
|
||||
/// Suppresses showing detailed output of where bytes are written
|
||||
suppress_writes_breakdown: bool,
|
||||
/// Suppresses showing each 'run' (compact|split) output
|
||||
suppress_run_output: bool,
|
||||
}
|
||||
|
||||
|
@ -112,6 +117,7 @@ impl TestSetupBuilder<false> {
|
|||
});
|
||||
|
||||
let suppress_run_output = false;
|
||||
let suppress_writes_breakdown = true;
|
||||
|
||||
// Intercept all catalog commit calls to record them in
|
||||
// `run_log` as well as ensuring the invariants still hold
|
||||
|
@ -156,6 +162,8 @@ impl TestSetupBuilder<false> {
|
|||
};
|
||||
|
||||
let bytes_written = Arc::new(AtomicUsize::new(0));
|
||||
let bytes_written_per_plan: Arc<Mutex<HashMap<String, usize>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
Self {
|
||||
config,
|
||||
|
@ -167,6 +175,8 @@ impl TestSetupBuilder<false> {
|
|||
run_log,
|
||||
invariant_check,
|
||||
bytes_written,
|
||||
bytes_written_per_plan,
|
||||
suppress_writes_breakdown,
|
||||
suppress_run_output,
|
||||
}
|
||||
}
|
||||
|
@ -292,6 +302,8 @@ impl TestSetupBuilder<false> {
|
|||
invariant_check.check().await;
|
||||
|
||||
let bytes_written = Arc::new(AtomicUsize::new(0));
|
||||
let bytes_written_per_plan: Arc<Mutex<HashMap<String, usize>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
TestSetupBuilder::<true> {
|
||||
config: self.config,
|
||||
|
@ -303,6 +315,8 @@ impl TestSetupBuilder<false> {
|
|||
run_log: Arc::new(Mutex::new(vec![])),
|
||||
invariant_check,
|
||||
bytes_written,
|
||||
bytes_written_per_plan,
|
||||
suppress_writes_breakdown: true,
|
||||
suppress_run_output: false,
|
||||
}
|
||||
}
|
||||
|
@ -325,6 +339,8 @@ impl TestSetupBuilder<false> {
|
|||
invariant_check.check().await;
|
||||
|
||||
let bytes_written = Arc::new(AtomicUsize::new(0));
|
||||
let bytes_written_per_plan: Arc<Mutex<HashMap<String, usize>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
TestSetupBuilder::<true> {
|
||||
config: self.config.clone(),
|
||||
|
@ -336,6 +352,8 @@ impl TestSetupBuilder<false> {
|
|||
run_log: Arc::new(Mutex::new(vec![])),
|
||||
invariant_check,
|
||||
bytes_written,
|
||||
bytes_written_per_plan,
|
||||
suppress_writes_breakdown: true,
|
||||
suppress_run_output: false,
|
||||
}
|
||||
}
|
||||
|
@ -359,6 +377,8 @@ impl TestSetupBuilder<false> {
|
|||
invariant_check.check().await;
|
||||
|
||||
let bytes_written = Arc::new(AtomicUsize::new(0));
|
||||
let bytes_written_per_plan: Arc<Mutex<HashMap<String, usize>>> =
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
|
||||
TestSetupBuilder::<true> {
|
||||
config: self.config.clone(),
|
||||
|
@ -370,6 +390,8 @@ impl TestSetupBuilder<false> {
|
|||
run_log: Arc::new(Mutex::new(vec![])),
|
||||
invariant_check,
|
||||
bytes_written,
|
||||
bytes_written_per_plan,
|
||||
suppress_writes_breakdown: true,
|
||||
suppress_run_output: false,
|
||||
}
|
||||
}
|
||||
|
@ -530,14 +552,24 @@ impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
|
|||
self
|
||||
}
|
||||
|
||||
/// Set option to show detailed output of where bytes are written
|
||||
pub fn with_writes_breakdown(mut self) -> Self {
|
||||
self.suppress_writes_breakdown = false;
|
||||
self
|
||||
}
|
||||
|
||||
/// set simulate_without_object_store
|
||||
pub fn simulate_without_object_store(mut self) -> Self {
|
||||
let run_log = Arc::clone(&self.run_log);
|
||||
let bytes_written = Arc::clone(&self.bytes_written);
|
||||
let bytes_written_per_plan = Arc::clone(&self.bytes_written_per_plan);
|
||||
|
||||
self.config.simulate_without_object_store = true;
|
||||
self.config.parquet_files_sink_override =
|
||||
Some(Arc::new(ParquetFileSimulator::new(run_log, bytes_written)));
|
||||
self.config.parquet_files_sink_override = Some(Arc::new(ParquetFileSimulator::new(
|
||||
run_log,
|
||||
bytes_written,
|
||||
bytes_written_per_plan,
|
||||
)));
|
||||
self
|
||||
}
|
||||
|
||||
|
@ -587,7 +619,9 @@ impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
|
|||
config: Arc::new(self.config),
|
||||
run_log: self.run_log,
|
||||
bytes_written: self.bytes_written,
|
||||
bytes_written_per_plan: self.bytes_written_per_plan,
|
||||
invariant_check: self.invariant_check,
|
||||
suppress_writes_breakdown: self.suppress_writes_breakdown,
|
||||
suppress_run_output: self.suppress_run_output,
|
||||
}
|
||||
}
|
||||
|
@ -608,12 +642,16 @@ pub struct TestSetup {
|
|||
pub partition: Arc<TestPartition>,
|
||||
/// The compactor configuration
|
||||
pub config: Arc<Config>,
|
||||
/// allows optionally suppressing detailed output of where bytes are written
|
||||
pub suppress_writes_breakdown: bool,
|
||||
/// allows optionally suppressing output of running the test
|
||||
pub suppress_run_output: bool,
|
||||
/// a shared log of what happened during a simulated run
|
||||
run_log: Arc<Mutex<Vec<String>>>,
|
||||
/// A total of all bytes written during test.
|
||||
pub bytes_written: Arc<AtomicUsize>,
|
||||
/// A total of bytes written during test per operation.
|
||||
pub bytes_written_per_plan: Arc<Mutex<HashMap<String, usize>>>,
|
||||
/// Checker that catalog invariant are not violated
|
||||
invariant_check: Arc<dyn InvariantCheck>,
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use std::{
|
||||
collections::BTreeSet,
|
||||
collections::{BTreeSet, HashMap},
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc, Mutex,
|
||||
|
@ -51,6 +51,8 @@ pub struct ParquetFileSimulator {
|
|||
run_id_generator: AtomicUsize,
|
||||
/// Used to track total bytes written (to help judge efficiency changes)
|
||||
bytes_written: Arc<AtomicUsize>,
|
||||
/// map of bytes written per plan type
|
||||
bytes_written_per_plan: Arc<Mutex<HashMap<String, usize>>>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for ParquetFileSimulator {
|
||||
|
@ -62,11 +64,16 @@ impl std::fmt::Display for ParquetFileSimulator {
|
|||
impl ParquetFileSimulator {
|
||||
/// Create a new simulator for creating parquet files, which
|
||||
/// appends its output to `run_log`
|
||||
pub fn new(run_log: Arc<Mutex<Vec<String>>>, bytes_written: Arc<AtomicUsize>) -> Self {
|
||||
pub fn new(
|
||||
run_log: Arc<Mutex<Vec<String>>>,
|
||||
bytes_written: Arc<AtomicUsize>,
|
||||
bytes_written_per_plan: Arc<Mutex<HashMap<String, usize>>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
run_log,
|
||||
run_id_generator: AtomicUsize::new(0),
|
||||
bytes_written,
|
||||
bytes_written_per_plan,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -143,6 +150,13 @@ impl ParquetFilesSink for ParquetFileSimulator {
|
|||
self.bytes_written
|
||||
.fetch_add(bytes_written as usize, Ordering::Relaxed);
|
||||
|
||||
self.bytes_written_per_plan
|
||||
.lock()
|
||||
.unwrap()
|
||||
.entry(plan_ir.to_string())
|
||||
.and_modify(|e| *e += bytes_written as usize)
|
||||
.or_insert(bytes_written as usize);
|
||||
|
||||
Ok(output_params)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue