From 1cc0926a7f52c7138be991df6dd60d11c6549df6 Mon Sep 17 00:00:00 2001 From: Joe-Blount <73478756+Joe-Blount@users.noreply.github.com> Date: Wed, 16 Aug 2023 08:39:37 -0500 Subject: [PATCH] feat: track why bytes are written in compactor simulator (#8493) * feat: add tracking of why bytes are written in simulator * chore: enable breakdown of why bytes are written in a few larger tests * chore: enable writes breakdown in another test --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- compactor/tests/layouts/backfill.rs | 14 ++++++++ compactor/tests/layouts/mod.rs | 14 ++++++++ compactor/tests/layouts/stuck.rs | 46 ++++++++++++++++++++++++++- compactor_test_utils/src/lib.rs | 44 +++++++++++++++++++++++-- compactor_test_utils/src/simulator.rs | 18 +++++++++-- 5 files changed, 130 insertions(+), 6 deletions(-) diff --git a/compactor/tests/layouts/backfill.rs b/compactor/tests/layouts/backfill.rs index 6c044a7211..3cef1ee3b1 100644 --- a/compactor/tests/layouts/backfill.rs +++ b/compactor/tests/layouts/backfill.rs @@ -470,6 +470,7 @@ async fn random_backfill_over_l2s() { .with_max_num_files_per_plan(10) .with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE) .with_partition_timeout(Duration::from_secs(10)) + .with_writes_breakdown() .build() .await; @@ -1021,6 +1022,12 @@ async fn random_backfill_over_l2s() { - "L2.135[765,799] 1.05us 55mb |L2.135| " - "L2.136[800,864] 1.05us 102mb |L2.136| " - "L2.137[865,899] 1.05us 55mb |L2.137| " + - "**** Breakdown of where bytes were written" + - 1.84gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize)) + - 500mb written by compact(ManySmallFiles) + - 500mb written by split(HighL0OverlapSingleFile) + - 597mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize)) + - 665mb written by split(ReduceOverlap) "### ); } @@ -1040,6 +1047,7 @@ async fn actual_case_from_catalog_1() { .with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE) .with_max_num_files_per_plan(20) .with_suppress_run_output() + .with_writes_breakdown() .with_partition_timeout(Duration::from_secs(10)) .build() .await; @@ -3084,6 +3092,12 @@ async fn actual_case_from_catalog_1() { - "L2.664[182,182] 342ns 19mb |L2.664| " - "L2.665[288,293] 342ns 114mb |L2.665| " - "L2.666[294,296] 342ns 57mb |L2.666| " + - "**** Breakdown of where bytes were written" + - 2.95gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize)) + - 455mb written by split(ReduceOverlap) + - 456mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize)) + - 5.48gb written by compact(ManySmallFiles) + - 6.01gb written by split(HighL0OverlapTotalBacklog) - "WARNING: file L2.598[150,165] 342ns 224mb exceeds soft limit 100mb by more than 50%" - "WARNING: file L2.602[183,197] 342ns 267mb exceeds soft limit 100mb by more than 50%" - "WARNING: file L2.603[198,207] 342ns 157mb exceeds soft limit 100mb by more than 50%" diff --git a/compactor/tests/layouts/mod.rs b/compactor/tests/layouts/mod.rs index 5fd1817e08..a30eb9c391 100644 --- a/compactor/tests/layouts/mod.rs +++ b/compactor/tests/layouts/mod.rs @@ -148,6 +148,20 @@ pub(crate) async fn run_layout_scenario(setup: &TestSetup) -> Vec { &sort_files(output_files), )); + if !setup.suppress_writes_breakdown { + output.extend(vec![ + "**** Breakdown of where bytes were written".to_string() + ]); + + let mut breakdown = Vec::new(); + for (op, written) in setup.bytes_written_per_plan.lock().unwrap().iter() { + let written = *written as i64; + 
breakdown.push(format!("{} written by {}", display_size(written), op)); + } + breakdown.sort(); + output.extend(breakdown); + } + // verify that the output of the compactor was valid as well setup.verify_invariants().await; diff --git a/compactor/tests/layouts/stuck.rs b/compactor/tests/layouts/stuck.rs index 9781454d26..b78c7df9df 100644 --- a/compactor/tests/layouts/stuck.rs +++ b/compactor/tests/layouts/stuck.rs @@ -19,8 +19,9 @@ async fn stuck_l0() { .await .with_max_num_files_per_plan(20) .with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE) - .with_partition_timeout(Duration::from_millis(10000)) + .with_partition_timeout(Duration::from_millis(100000)) .with_suppress_run_output() // remove this to debug + .with_writes_breakdown() .build() .await; @@ -1189,6 +1190,13 @@ async fn stuck_l0() { - "L2.1790[1686872478395764410,1686872728189189161] 1686936871.55s 51mb |L2.1790|" - "L2.1791[1686872728189189162,1686873217700530422] 1686936871.55s 100mb |L2.1791|" - "L2.1792[1686873217700530423,1686873599000000000] 1686936871.55s 78mb |L2.1792|" + - "**** Breakdown of where bytes were written" + - 1.66gb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize)) + - 18.49gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize)) + - 5.22gb written by compact(ManySmallFiles) + - 524mb written by split(StartLevelOverlapsTooBig) + - 7.29gb written by split(ReduceOverlap) + - 8.26gb written by split(HighL0OverlapTotalBacklog) "### ); } @@ -1202,6 +1210,7 @@ async fn stuck_l1() { .with_max_num_files_per_plan(20) .with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE) .with_partition_timeout(Duration::from_millis(100)) + .with_writes_breakdown() .build() .await; @@ -1424,6 +1433,9 @@ async fn stuck_l1() { - "L2.23[1686873630000000000,1686879262359644750] 1686928116.94s 100mb|-L2.23-| " - "L2.24[1686879262359644751,1686884894719289500] 1686928116.94s 100mb |-L2.24-| " - "L2.25[1686884894719289501,1686888172000000000] 1686928116.94s 58mb |L2.25| " + - "**** Breakdown of where bytes were written" + - 258mb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize)) + - 71mb written by split(ReduceOverlap) "### ); // TODO(maybe): see matching comment in files_to_compact.rs/limit_files_to_compact @@ -1445,6 +1457,7 @@ async fn stuck_l0_large_l0s() { .with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE) .with_partition_timeout(Duration::from_millis(100000)) //.with_suppress_run_output() // remove this to debug + .with_writes_breakdown() .build() .await; @@ -11523,6 +11536,14 @@ async fn stuck_l0_large_l0s() { - "L2.3516[1682,723271] 199ns 100mb|-----------L2.3516------------| " - "L2.3517[723272,1444860] 199ns 100mb |-----------L2.3517------------| " - "L2.3518[1444861,1990000] 199ns 76mb |-------L2.3518--------| " + - "**** Breakdown of where bytes were written" + - 1.43gb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize)) + - 132b written by compact(FoundSubsetLessThanMaxCompactSize) + - 2.14gb written by compact(ManySmallFiles) + - 2kb written by compact(TotalSizeLessThanMaxCompactSize) + - 4.11gb written by split(HighL0OverlapTotalBacklog) + - 4.32gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize)) + - 503mb written by split(ReduceOverlap) "### ); } @@ -11541,6 +11562,7 @@ async fn single_file_compaction() { .with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE) .with_partition_timeout(Duration::from_millis(1000)) .with_suppress_run_output() // remove this to debug + .with_writes_breakdown() 
.build() .await; @@ -11934,6 +11956,8 @@ async fn single_file_compaction() { - "L1.30[1681776002714783000,1681862268794595000] 1683039505.9s 192kb|-----------------------------------------L1.30------------------------------------------|" - "L2 " - "L2.1[1681776057065884000,1681848094846357000] 1681848108.8s 145kb|----------------------------------L2.1-----------------------------------| " + - "**** Breakdown of where bytes were written" + - 192kb written by compact(TotalSizeLessThanMaxCompactSize) "### ); } @@ -11951,6 +11975,7 @@ async fn split_then_undo_it() { .with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE) .with_partition_timeout(Duration::from_millis(1000)) .with_suppress_run_output() // remove this to debug + .with_writes_breakdown() .build() .await; @@ -12207,6 +12232,11 @@ async fn split_then_undo_it() { - "L2.61[1680046929099006065,1680047338160274709] 1681420678.89s 71mb |L2.61|" - "L2.62[1680047338160274710,1680047867631254942] 1681420678.89s 93mb |L2.62|" - "L2.63[1680047867631254943,1680047999999000000] 1681420678.89s 23mb |L2.63|" + - "**** Breakdown of where bytes were written" + - 1.17gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize)) + - 11mb written by split(ReduceOverlap) + - 152mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize)) + - 185mb written by split(HighL0OverlapTotalBacklog) "### ); } @@ -12226,6 +12256,7 @@ async fn split_precent_loop() { .with_percentage_max_file_size(5) .with_split_percentage(80) .with_suppress_run_output() // remove this to debug + .with_writes_breakdown() .build() .await; @@ -12835,6 +12866,12 @@ async fn split_precent_loop() { - "L2.261[1676018715629485680,1676027900853158703] 1676066475.26s 100mb |-L2.261-| " - "L2.262[1676027900853158704,1676037086076831726] 1676066475.26s 100mb |-L2.262-| " - "L2.263[1676037086076831727,1676045833054395545] 1676066475.26s 95mb |L2.263-| " + - "**** Breakdown of where bytes were written" + - 1.46gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize)) + - 141mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize)) + - 176mb written by split(ReduceOverlap) + - 639mb written by compact(ManySmallFiles) + - 724mb written by split(HighL0OverlapTotalBacklog) "### ); } @@ -12858,6 +12895,7 @@ async fn very_big_overlapped_backlog() { .with_max_desired_file_size_bytes(MAX_DESIRED_FILE_SIZE) .with_partition_timeout(Duration::from_millis(100000)) .with_suppress_run_output() // remove this to debug + .with_writes_breakdown() .build() .await; @@ -13621,6 +13659,12 @@ async fn very_big_overlapped_backlog() { - "L2.1150[197961,198942] 299ns 100mb |L2.1150|" - "L2.1151[198943,199923] 299ns 100mb |L2.1151|" - "L2.1152[199924,200000] 299ns 8mb |L2.1152|" + - "**** Breakdown of where bytes were written" + - 1000mb written by split(ReduceOverlap) + - 38.21gb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize)) + - 400mb written by compact(ManySmallFiles) + - 780mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize)) + - 860mb written by split(HighL0OverlapTotalBacklog) "### ); } diff --git a/compactor_test_utils/src/lib.rs b/compactor_test_utils/src/lib.rs index 886553e38c..4e954bb24e 100644 --- a/compactor_test_utils/src/lib.rs +++ b/compactor_test_utils/src/lib.rs @@ -24,7 +24,7 @@ mod simulator; pub use display::{display_format, display_size, format_files, format_files_split}; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, future::Future, num::NonZeroUsize, 
sync::{atomic::AtomicUsize, Arc, Mutex}, @@ -83,6 +83,11 @@ pub struct TestSetupBuilder { invariant_check: Arc, /// A shared count of total bytes written during test bytes_written: Arc, + /// A shared count of the breakdown of where bytes were written + bytes_written_per_plan: Arc>>, + /// Suppresses showing detailed output of where bytes are written + suppress_writes_breakdown: bool, + /// Suppresses showing each 'run' (compact|split) output suppress_run_output: bool, } @@ -112,6 +117,7 @@ impl TestSetupBuilder { }); let suppress_run_output = false; + let suppress_writes_breakdown = true; // Intercept all catalog commit calls to record them in // `run_log` as well as ensuring the invariants still hold @@ -156,6 +162,8 @@ impl TestSetupBuilder { }; let bytes_written = Arc::new(AtomicUsize::new(0)); + let bytes_written_per_plan: Arc>> = + Arc::new(Mutex::new(HashMap::new())); Self { config, @@ -167,6 +175,8 @@ impl TestSetupBuilder { run_log, invariant_check, bytes_written, + bytes_written_per_plan, + suppress_writes_breakdown, suppress_run_output, } } @@ -292,6 +302,8 @@ impl TestSetupBuilder { invariant_check.check().await; let bytes_written = Arc::new(AtomicUsize::new(0)); + let bytes_written_per_plan: Arc>> = + Arc::new(Mutex::new(HashMap::new())); TestSetupBuilder:: { config: self.config, @@ -303,6 +315,8 @@ impl TestSetupBuilder { run_log: Arc::new(Mutex::new(vec![])), invariant_check, bytes_written, + bytes_written_per_plan, + suppress_writes_breakdown: true, suppress_run_output: false, } } @@ -325,6 +339,8 @@ impl TestSetupBuilder { invariant_check.check().await; let bytes_written = Arc::new(AtomicUsize::new(0)); + let bytes_written_per_plan: Arc>> = + Arc::new(Mutex::new(HashMap::new())); TestSetupBuilder:: { config: self.config.clone(), @@ -336,6 +352,8 @@ impl TestSetupBuilder { run_log: Arc::new(Mutex::new(vec![])), invariant_check, bytes_written, + bytes_written_per_plan, + suppress_writes_breakdown: true, suppress_run_output: false, } } @@ -359,6 +377,8 @@ impl TestSetupBuilder { invariant_check.check().await; let bytes_written = Arc::new(AtomicUsize::new(0)); + let bytes_written_per_plan: Arc>> = + Arc::new(Mutex::new(HashMap::new())); TestSetupBuilder:: { config: self.config.clone(), @@ -370,6 +390,8 @@ impl TestSetupBuilder { run_log: Arc::new(Mutex::new(vec![])), invariant_check, bytes_written, + bytes_written_per_plan, + suppress_writes_breakdown: true, suppress_run_output: false, } } @@ -530,14 +552,24 @@ impl TestSetupBuilder { self } + /// Set option to show detailed output of where bytes are written + pub fn with_writes_breakdown(mut self) -> Self { + self.suppress_writes_breakdown = false; + self + } + /// set simulate_without_object_store pub fn simulate_without_object_store(mut self) -> Self { let run_log = Arc::clone(&self.run_log); let bytes_written = Arc::clone(&self.bytes_written); + let bytes_written_per_plan = Arc::clone(&self.bytes_written_per_plan); self.config.simulate_without_object_store = true; - self.config.parquet_files_sink_override = - Some(Arc::new(ParquetFileSimulator::new(run_log, bytes_written))); + self.config.parquet_files_sink_override = Some(Arc::new(ParquetFileSimulator::new( + run_log, + bytes_written, + bytes_written_per_plan, + ))); self } @@ -587,7 +619,9 @@ impl TestSetupBuilder { config: Arc::new(self.config), run_log: self.run_log, bytes_written: self.bytes_written, + bytes_written_per_plan: self.bytes_written_per_plan, invariant_check: self.invariant_check, + suppress_writes_breakdown: self.suppress_writes_breakdown, 
suppress_run_output: self.suppress_run_output, } } @@ -608,12 +642,16 @@ pub struct TestSetup { pub partition: Arc, /// The compactor configuration pub config: Arc, + /// allows optionally suppressing detailed output of where bytes are written + pub suppress_writes_breakdown: bool, /// allows optionally suppressing output of running the test pub suppress_run_output: bool, /// a shared log of what happened during a simulated run run_log: Arc>>, /// A total of all bytes written during test. pub bytes_written: Arc, + /// A total of bytes written during test per operation. + pub bytes_written_per_plan: Arc>>, /// Checker that catalog invariant are not violated invariant_check: Arc, } diff --git a/compactor_test_utils/src/simulator.rs b/compactor_test_utils/src/simulator.rs index 5fc9536b7f..a39922ed47 100644 --- a/compactor_test_utils/src/simulator.rs +++ b/compactor_test_utils/src/simulator.rs @@ -1,5 +1,5 @@ use std::{ - collections::BTreeSet, + collections::{BTreeSet, HashMap}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, @@ -51,6 +51,8 @@ pub struct ParquetFileSimulator { run_id_generator: AtomicUsize, /// Used to track total bytes written (to help judge efficiency changes) bytes_written: Arc, + /// map of bytes written per plan type + bytes_written_per_plan: Arc>>, } impl std::fmt::Display for ParquetFileSimulator { @@ -62,11 +64,16 @@ impl std::fmt::Display for ParquetFileSimulator { impl ParquetFileSimulator { /// Create a new simulator for creating parquet files, which /// appends its output to `run_log` - pub fn new(run_log: Arc>>, bytes_written: Arc) -> Self { + pub fn new( + run_log: Arc>>, + bytes_written: Arc, + bytes_written_per_plan: Arc>>, + ) -> Self { Self { run_log, run_id_generator: AtomicUsize::new(0), bytes_written, + bytes_written_per_plan, } } @@ -143,6 +150,13 @@ impl ParquetFilesSink for ParquetFileSimulator { self.bytes_written .fetch_add(bytes_written as usize, Ordering::Relaxed); + self.bytes_written_per_plan + .lock() + .unwrap() + .entry(plan_ir.to_string()) + .and_modify(|e| *e += bytes_written as usize) + .or_insert(bytes_written as usize); + Ok(output_params) }
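
For readers skimming the patch, here is a minimal standalone sketch of the accounting pattern it introduces: a shared `HashMap` keyed by the plan description (`plan_ir.to_string()`) accumulates bytes per write inside the simulator, and the test harness later formats, sorts, and prints one line per plan. The names below (`WriteAccounting`, `record_write`, `breakdown_lines`) and the simplified `display_size` helper are illustrative only, not the crate's API; this is an assumption-laden sketch of the same `Arc<Mutex<HashMap<String, usize>>>` idiom the diff threads through `TestSetupBuilder` and `ParquetFileSimulator`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Shared accounting of bytes written, keyed by the plan/reason that wrote them.
/// Hypothetical stand-in for the `bytes_written_per_plan` field added by the patch.
#[derive(Clone, Default)]
struct WriteAccounting {
    bytes_per_plan: Arc<Mutex<HashMap<String, usize>>>,
}

impl WriteAccounting {
    /// Record `bytes` written on behalf of `plan`, e.g. "compact(ManySmallFiles)".
    fn record_write(&self, plan: &str, bytes: usize) {
        self.bytes_per_plan
            .lock()
            .unwrap()
            .entry(plan.to_string())
            .and_modify(|total| *total += bytes)
            .or_insert(bytes);
    }

    /// Render one line per plan, sorted lexically so test snapshots are stable.
    fn breakdown_lines(&self) -> Vec<String> {
        let mut lines: Vec<String> = self
            .bytes_per_plan
            .lock()
            .unwrap()
            .iter()
            .map(|(plan, bytes)| format!("{} written by {}", display_size(*bytes), plan))
            .collect();
        lines.sort();
        lines
    }
}

/// Simplified size formatter; the crate's `display_size` is more precise.
fn display_size(bytes: usize) -> String {
    const MB: usize = 1024 * 1024;
    if bytes >= MB {
        format!("{}mb", bytes / MB)
    } else if bytes >= 1024 {
        format!("{}kb", bytes / 1024)
    } else {
        format!("{}b", bytes)
    }
}

fn main() {
    let accounting = WriteAccounting::default();
    accounting.record_write("compact(ManySmallFiles)", 500 * 1024 * 1024);
    accounting.record_write("split(ReduceOverlap)", 665 * 1024 * 1024);
    accounting.record_write("compact(ManySmallFiles)", 12 * 1024 * 1024);

    println!("**** Breakdown of where bytes were written");
    for line in accounting.breakdown_lines() {
        println!("{line}");
    }
}
```

Note that the breakdown sorts the already formatted strings rather than the raw byte counts, which is why entries such as "18.49gb" appear before "5.22gb" in the expected snapshots above: the goal is deterministic test output, not a ranked report.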