diff --git a/compactor2/src/components/commit/mod.rs b/compactor2/src/components/commit/mod.rs index b72908b358..de777ccefd 100644 --- a/compactor2/src/components/commit/mod.rs +++ b/compactor2/src/components/commit/mod.rs @@ -30,6 +30,14 @@ pub trait Commit: Debug + Display + Send + Sync { ) -> Vec; } +/// Something that can wrap `Commit` instances +/// +/// Use to install test observation +pub trait CommitWrapper: Debug + Send + Sync { + /// a function to call that wraps a [`Commit`] instance + fn wrap(&self, commit: Arc) -> Arc; +} + #[async_trait] impl Commit for Arc where diff --git a/compactor2/src/components/hardcoded.rs b/compactor2/src/components/hardcoded.rs index 19be5858f9..6158fc9e00 100644 --- a/compactor2/src/components/hardcoded.rs +++ b/compactor2/src/components/hardcoded.rs @@ -156,6 +156,12 @@ pub fn hardcoded_components(config: &Config) -> Arc { )) }; + let commit = if let Some(commit_wrapper) = config.commit_wrapper.as_ref() { + commit_wrapper.wrap(commit) + } else { + commit + }; + let scratchpad_store_output = if config.shadow_mode { Arc::new(IgnoreWrites::new(Arc::new(InMemory::new()))) } else { diff --git a/compactor2/src/components/report.rs b/compactor2/src/components/report.rs index 27c714fd96..0828e046c9 100644 --- a/compactor2/src/components/report.rs +++ b/compactor2/src/components/report.rs @@ -34,7 +34,8 @@ pub fn log_config(config: &Config) { shard_config, min_num_l1_files_to_compact, process_once, - parquet_files_sink_override: parquet_files_sink, + parquet_files_sink_override, + commit_wrapper, simulate_without_object_store, all_errors_are_fatal, max_num_columns_per_table, @@ -50,10 +51,13 @@ pub fn log_config(config: &Config) { } }; - let parquet_files_sink = parquet_files_sink + let parquet_files_sink_override = parquet_files_sink_override .as_ref() .map(|_| "Some") .unwrap_or("None"); + + let commit_wrapper = commit_wrapper.as_ref().map(|_| "Some").unwrap_or("None"); + info!( shard_id=shard_id.get(), %catalog, @@ -79,7 +83,8 @@ pub fn log_config(config: &Config) { min_num_l1_files_to_compact, process_once, simulate_without_object_store, - %parquet_files_sink, + %parquet_files_sink_override, + %commit_wrapper, all_errors_are_fatal, max_num_columns_per_table, max_num_files_per_plan, diff --git a/compactor2/src/config.rs b/compactor2/src/config.rs index 0dbc19b10b..c50d616d79 100644 --- a/compactor2/src/config.rs +++ b/compactor2/src/config.rs @@ -8,7 +8,7 @@ use iox_query::exec::Executor; use iox_time::TimeProvider; use parquet_file::storage::ParquetStorage; -use crate::components::parquet_files_sink::ParquetFilesSink; +use crate::components::{commit::CommitWrapper, parquet_files_sink::ParquetFilesSink}; /// Config to set up a compactor. #[derive(Debug, Clone)] @@ -113,9 +113,15 @@ pub struct Config { /// This is useful for testing. pub simulate_without_object_store: bool, - /// Use the provided [`ParquetFilesSink`] to create parquet files (used for testing) + /// Use the provided [`ParquetFilesSink`] to create parquet files + /// (used for testing) pub parquet_files_sink_override: Option>, + /// Optionally wrap the `Commit` instance + /// + /// This is mostly used for testing + pub commit_wrapper: Option>, + /// Ensure that ALL errors (including object store errors) result in "skipped" partitions. /// /// This is mostly useful for testing. diff --git a/compactor2/src/lib.rs b/compactor2/src/lib.rs index 32e62be246..1b5bb9818f 100644 --- a/compactor2/src/lib.rs +++ b/compactor2/src/lib.rs @@ -179,8 +179,12 @@ mod round_info; // publically expose items needed for testing pub use components::{ - df_planner::panic::PanicDataFusionPlanner, hardcoded::hardcoded_components, - namespaces_source::mock::NamespaceWrapper, parquet_files_sink::ParquetFilesSink, Components, + commit::{Commit, CommitWrapper}, + df_planner::panic::PanicDataFusionPlanner, + hardcoded::hardcoded_components, + namespaces_source::mock::NamespaceWrapper, + parquet_files_sink::ParquetFilesSink, + Components, }; pub use driver::compact; pub use error::DynError; diff --git a/compactor2/tests/layouts/core.rs b/compactor2/tests/layouts/core.rs index 5f354f809a..5b0dcb6c4e 100644 --- a/compactor2/tests/layouts/core.rs +++ b/compactor2/tests/layouts/core.rs @@ -47,6 +47,9 @@ async fn all_overlapping_l0() { - "L1 " - "L1.?[100,180] 72mb |-----------------------------L1.?-----------------------------| " - "L1.?[180,200] 18mb |-----L1.?-----|" + - "Committing partition 1:" + - " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10" + - " Creating 2 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1 " - "L1.11[100,180] 72mb |----------------------------L1.11-----------------------------| " @@ -106,6 +109,9 @@ async fn all_non_overlapping_l0() { - "L1 " - "L1.?[0,720] 79.91mb |----------------------------L1.?-----------------------------| " - "L1.?[720,901] 20.09mb |-----L1.?-----| " + - "Committing partition 1:" + - " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10" + - " Creating 2 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1 " - "L1.11[0,720] 79.91mb|----------------------------L1.11----------------------------| " @@ -178,6 +184,9 @@ async fn l1_with_overlapping_l0() { - "**** 1 Output Files (parquet_file_id not yet assigned), 10.02mb total:" - "L1, all files 10.02mb " - "L1.?[100,310] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 6 files: L1.2, L0.3, L0.4, L0.5, L0.6, L0.7" + - " Creating 1 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1 " - "L1.1[50,100] 10mb |----L1.1-----| " @@ -246,6 +255,9 @@ async fn l1_with_non_overlapping_l0() { - "**** 1 Output Files (parquet_file_id not yet assigned), 25kb total:" - "L1, all files 25kb " - "L1.?[300,550] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 5 files: L0.3, L0.4, L0.5, L0.6, L0.7" + - " Creating 1 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1 " - "L1.1[50,100] 10mb |-L1.1-| " @@ -313,6 +325,9 @@ async fn l1_with_non_overlapping_l0_larger() { - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" - "L1, all files 15mb " - "L1.?[300,450] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 3 files: L0.5, L0.6, L0.7" + - " Creating 1 files at level CompactionLevel::L1" - "**** Simulation run 1, type=split(split_times=[370]). 5 Input Files, 108mb total:" - "L1 " - "L1.4[200,250] 3mb |--L1.4--| " @@ -324,6 +339,9 @@ async fn l1_with_non_overlapping_l0_larger() { - "L2 " - "L2.?[50,370] 86.4mb |-----------------------------L2.?-----------------------------| " - "L2.?[370,450] 21.6mb |-----L2.?-----|" + - "Committing partition 1:" + - " Soft Deleting 5 files: L1.1, L1.2, L1.3, L1.4, L1.8" + - " Creating 2 files at level CompactionLevel::L2" - "**** Final Output Files " - "L2 " - "L2.9[50,370] 86.4mb |-----------------------------L2.9-----------------------------| " @@ -399,6 +417,9 @@ async fn l1_too_much_with_non_overlapping_l0() { - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" - "L1, all files 15mb " - "L1.?[600,650] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 3 files: L0.11, L0.12, L0.13" + - " Creating 1 files at level CompactionLevel::L1" - "SKIPPED COMPACTION for PartitionId(1): partition 1 has 781189120 parquet file bytes, limit is 268435456" - "**** Final Output Files " - "L1 " @@ -483,6 +504,9 @@ async fn many_l1_with_non_overlapping_l0() { - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" - "L1, all files 15mb " - "L1.?[600,650] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 3 files: L0.11, L0.12, L0.13" + - " Creating 1 files at level CompactionLevel::L1" - "**** Simulation run 1, type=split(split_times=[530]). 11 Input Files, 88mb total:" - "L1 " - "L1.10[500,550] 7mb |L1.10| " @@ -500,6 +524,9 @@ async fn many_l1_with_non_overlapping_l0() { - "L2 " - "L2.?[50,530] 70.4mb |-----------------------------L2.?-----------------------------| " - "L2.?[530,650] 17.6mb |-----L2.?-----|" + - "Committing partition 1:" + - " Soft Deleting 11 files: L1.1, L1.2, L1.3, L1.4, L1.5, L1.6, L1.7, L1.8, L1.9, L1.10, L1.14" + - " Creating 2 files at level CompactionLevel::L2" - "**** Final Output Files " - "L2 " - "L2.15[50,530] 70.4mb|----------------------------L2.15-----------------------------| " @@ -566,6 +593,9 @@ async fn large_l1_with_non_overlapping_l0() { - "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:" - "L1, all files 15mb " - "L1.?[600,650] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 3 files: L0.3, L0.4, L0.5" + - " Creating 1 files at level CompactionLevel::L1" - "**** Simulation run 1, type=split(split_times=[375]). 3 Input Files, 185mb total:" - "L1 " - "L1.2[100,150] 80mb |L1.2| " @@ -575,6 +605,9 @@ async fn large_l1_with_non_overlapping_l0() { - "L2 " - "L2.?[50,375] 100.21mb|------------------L2.?-------------------| " - "L2.?[375,650] 84.79mb |---------------L2.?---------------| " + - "Committing partition 1:" + - " Soft Deleting 3 files: L1.1, L1.2, L1.6" + - " Creating 2 files at level CompactionLevel::L2" - "**** Final Output Files " - "L2 " - "L2.7[50,375] 100.21mb|------------------L2.7-------------------| " diff --git a/compactor2/tests/layouts/knobs.rs b/compactor2/tests/layouts/knobs.rs index 873ffcafe1..d30a50844d 100644 --- a/compactor2/tests/layouts/knobs.rs +++ b/compactor2/tests/layouts/knobs.rs @@ -57,6 +57,9 @@ async fn all_overlapping_l0_split_percentage() { - "L1 " - "L1.?[100,195] 85.5mb|-----------------------------------L1.?-----------------------------------| " - "L1.?[195,200] 4.5mb |L1.?|" + - "Committing partition 1:" + - " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10" + - " Creating 2 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1 " - "L1.11[100,195] 85.5mb|----------------------------------L1.11-----------------------------------| " @@ -118,6 +121,9 @@ async fn all_overlapping_l0_max_file_size() { - "**** 1 Output Files (parquet_file_id not yet assigned), 90mb total:" - "L1, all files 90mb " - "L1.?[100,200] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10" + - " Creating 1 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1, all files 90mb " - "L1.11[100,200] |------------------------------------L1.11-------------------------------------|" @@ -186,6 +192,9 @@ async fn all_overlapping_l0_split_percentage_and_max_file_size() { - "L1.?[172,184] 10.8mb |-L1.?--| " - "L1.?[184,196] 10.8mb |-L1.?--| " - "L1.?[196,200] 3.6mb |L1.?|" + - "Committing partition 1:" + - " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10" + - " Creating 9 files at level CompactionLevel::L1" - "**** Simulation run 1, type=split(split_times=[112, 124, 136, 148, 160, 172, 184, 196]). 9 Input Files, 90mb total:" - "L1 " - "L1.19[196,200] 3.6mb |L1.19|" @@ -208,6 +217,9 @@ async fn all_overlapping_l0_split_percentage_and_max_file_size() { - "L2.?[172,184] 10.8mb |-L2.?--| " - "L2.?[184,196] 10.8mb |-L2.?--| " - "L2.?[196,200] 3.6mb |L2.?|" + - "Committing partition 1:" + - " Soft Deleting 9 files: L1.11, L1.12, L1.13, L1.14, L1.15, L1.16, L1.17, L1.18, L1.19" + - " Creating 9 files at level CompactionLevel::L2" - "**** Final Output Files " - "L2 " - "L2.20[100,112] 10.8mb|-L2.20-| " diff --git a/compactor2/tests/layouts/many_files.rs b/compactor2/tests/layouts/many_files.rs index 15f409134e..1970297f40 100644 --- a/compactor2/tests/layouts/many_files.rs +++ b/compactor2/tests/layouts/many_files.rs @@ -76,6 +76,9 @@ async fn many_l1_files() { - "**** 1 Output Files (parquet_file_id not yet assigned), 3mb total:" - "L1, all files 3mb " - "L1.?[24,25] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 3 files: L0.21, L0.22, L0.23" + - " Creating 1 files at level CompactionLevel::L1" - "**** Simulation run 1, type=split(split_times=[13]). 21 Input Files, 203mb total:" - "L1 " - "L1.20[19,20] 10mb |L1.20| " @@ -103,6 +106,9 @@ async fn many_l1_files() { - "L2 " - "L2.?[0,13] 105.56mb |-----------------L2.?------------------| " - "L2.?[13,25] 97.44mb |----------------L2.?----------------| " + - "Committing partition 1:" + - " Soft Deleting 21 files: L1.1, L1.2, L1.3, L1.4, L1.5, L1.6, L1.7, L1.8, L1.9, L1.10, L1.11, L1.12, L1.13, L1.14, L1.15, L1.16, L1.17, L1.18, L1.19, L1.20, L1.24" + - " Creating 2 files at level CompactionLevel::L2" - "**** Final Output Files " - "L2 " - "L2.25[0,13] 105.56mb|-----------------L2.25-----------------| " @@ -639,6 +645,9 @@ async fn many_tiny_l0_files() { - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" - "L0, all files 1.37mb " - "L0.?[0,200] |-------------------------------------L0.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 200 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10, L0.11, L0.12, L0.13, L0.14, L0.15, L0.16, L0.17, L0.18, L0.19, L0.20, L0.21, L0.22, L0.23, L0.24, L0.25, L0.26, L0.27, L0.28, L0.29, L0.30, L0.31, L0.32, L0.33, L0.34, L0.35, L0.36, L0.37, L0.38, L0.39, L0.40, L0.41, L0.42, L0.43, L0.44, L0.45, L0.46, L0.47, L0.48, L0.49, L0.50, L0.51, L0.52, L0.53, L0.54, L0.55, L0.56, L0.57, L0.58, L0.59, L0.60, L0.61, L0.62, L0.63, L0.64, L0.65, L0.66, L0.67, L0.68, L0.69, L0.70, L0.71, L0.72, L0.73, L0.74, L0.75, L0.76, L0.77, L0.78, L0.79, L0.80, L0.81, L0.82, L0.83, L0.84, L0.85, L0.86, L0.87, L0.88, L0.89, L0.90, L0.91, L0.92, L0.93, L0.94, L0.95, L0.96, L0.97, L0.98, L0.99, L0.100, L0.101, L0.102, L0.103, L0.104, L0.105, L0.106, L0.107, L0.108, L0.109, L0.110, L0.111, L0.112, L0.113, L0.114, L0.115, L0.116, L0.117, L0.118, L0.119, L0.120, L0.121, L0.122, L0.123, L0.124, L0.125, L0.126, L0.127, L0.128, L0.129, L0.130, L0.131, L0.132, L0.133, L0.134, L0.135, L0.136, L0.137, L0.138, L0.139, L0.140, L0.141, L0.142, L0.143, L0.144, L0.145, L0.146, L0.147, L0.148, L0.149, L0.150, L0.151, L0.152, L0.153, L0.154, L0.155, L0.156, L0.157, L0.158, L0.159, L0.160, L0.161, L0.162, L0.163, L0.164, L0.165, L0.166, L0.167, L0.168, L0.169, L0.170, L0.171, L0.172, L0.173, L0.174, L0.175, L0.176, L0.177, L0.178, L0.179, L0.180, L0.181, L0.182, L0.183, L0.184, L0.185, L0.186, L0.187, L0.188, L0.189, L0.190, L0.191, L0.192, L0.193, L0.194, L0.195, L0.196, L0.197, L0.198, L0.199, L0.200" + - " Creating 1 files at level CompactionLevel::L0" - "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:" - "L0, all files 7kb " - "L0.201[200,201] |L0.201| " @@ -732,6 +741,9 @@ async fn many_tiny_l0_files() { - "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:" - "L0, all files 616kb " - "L0.?[200,288] |-------------------------------------L0.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 88 files: L0.201, L0.202, L0.203, L0.204, L0.205, L0.206, L0.207, L0.208, L0.209, L0.210, L0.211, L0.212, L0.213, L0.214, L0.215, L0.216, L0.217, L0.218, L0.219, L0.220, L0.221, L0.222, L0.223, L0.224, L0.225, L0.226, L0.227, L0.228, L0.229, L0.230, L0.231, L0.232, L0.233, L0.234, L0.235, L0.236, L0.237, L0.238, L0.239, L0.240, L0.241, L0.242, L0.243, L0.244, L0.245, L0.246, L0.247, L0.248, L0.249, L0.250, L0.251, L0.252, L0.253, L0.254, L0.255, L0.256, L0.257, L0.258, L0.259, L0.260, L0.261, L0.262, L0.263, L0.264, L0.265, L0.266, L0.267, L0.268, L0.269, L0.270, L0.271, L0.272, L0.273, L0.274, L0.275, L0.276, L0.277, L0.278, L0.279, L0.280, L0.281, L0.282, L0.283, L0.284, L0.285, L0.286, L0.287, L0.288" + - " Creating 1 files at level CompactionLevel::L0" - "**** Simulation run 2, type=compact. 2 Input Files, 1.97mb total:" - "L0 " - "L0.290[200,288] 616kb |--------L0.290--------| " @@ -739,6 +751,9 @@ async fn many_tiny_l0_files() { - "**** 1 Output Files (parquet_file_id not yet assigned), 1.97mb total:" - "L1, all files 1.97mb " - "L1.?[0,288] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 2 files: L0.289, L0.290" + - " Creating 1 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1, all files 1.97mb " - "L1.291[0,288] |------------------------------------L1.291------------------------------------|" @@ -1394,6 +1409,9 @@ async fn over_two_times_max_files_per_plan() { - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" - "L0, all files 1.37mb " - "L0.?[0,200] |-------------------------------------L0.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 200 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10, L0.11, L0.12, L0.13, L0.14, L0.15, L0.16, L0.17, L0.18, L0.19, L0.20, L0.21, L0.22, L0.23, L0.24, L0.25, L0.26, L0.27, L0.28, L0.29, L0.30, L0.31, L0.32, L0.33, L0.34, L0.35, L0.36, L0.37, L0.38, L0.39, L0.40, L0.41, L0.42, L0.43, L0.44, L0.45, L0.46, L0.47, L0.48, L0.49, L0.50, L0.51, L0.52, L0.53, L0.54, L0.55, L0.56, L0.57, L0.58, L0.59, L0.60, L0.61, L0.62, L0.63, L0.64, L0.65, L0.66, L0.67, L0.68, L0.69, L0.70, L0.71, L0.72, L0.73, L0.74, L0.75, L0.76, L0.77, L0.78, L0.79, L0.80, L0.81, L0.82, L0.83, L0.84, L0.85, L0.86, L0.87, L0.88, L0.89, L0.90, L0.91, L0.92, L0.93, L0.94, L0.95, L0.96, L0.97, L0.98, L0.99, L0.100, L0.101, L0.102, L0.103, L0.104, L0.105, L0.106, L0.107, L0.108, L0.109, L0.110, L0.111, L0.112, L0.113, L0.114, L0.115, L0.116, L0.117, L0.118, L0.119, L0.120, L0.121, L0.122, L0.123, L0.124, L0.125, L0.126, L0.127, L0.128, L0.129, L0.130, L0.131, L0.132, L0.133, L0.134, L0.135, L0.136, L0.137, L0.138, L0.139, L0.140, L0.141, L0.142, L0.143, L0.144, L0.145, L0.146, L0.147, L0.148, L0.149, L0.150, L0.151, L0.152, L0.153, L0.154, L0.155, L0.156, L0.157, L0.158, L0.159, L0.160, L0.161, L0.162, L0.163, L0.164, L0.165, L0.166, L0.167, L0.168, L0.169, L0.170, L0.171, L0.172, L0.173, L0.174, L0.175, L0.176, L0.177, L0.178, L0.179, L0.180, L0.181, L0.182, L0.183, L0.184, L0.185, L0.186, L0.187, L0.188, L0.189, L0.190, L0.191, L0.192, L0.193, L0.194, L0.195, L0.196, L0.197, L0.198, L0.199, L0.200" + - " Creating 1 files at level CompactionLevel::L0" - "**** Simulation run 1, type=compact. 200 Input Files, 1.37mb total:" - "L0, all files 7kb " - "L0.201[200,201] |L0.201| " @@ -1599,6 +1617,9 @@ async fn over_two_times_max_files_per_plan() { - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" - "L0, all files 1.37mb " - "L0.?[200,400] |-------------------------------------L0.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 200 files: L0.201, L0.202, L0.203, L0.204, L0.205, L0.206, L0.207, L0.208, L0.209, L0.210, L0.211, L0.212, L0.213, L0.214, L0.215, L0.216, L0.217, L0.218, L0.219, L0.220, L0.221, L0.222, L0.223, L0.224, L0.225, L0.226, L0.227, L0.228, L0.229, L0.230, L0.231, L0.232, L0.233, L0.234, L0.235, L0.236, L0.237, L0.238, L0.239, L0.240, L0.241, L0.242, L0.243, L0.244, L0.245, L0.246, L0.247, L0.248, L0.249, L0.250, L0.251, L0.252, L0.253, L0.254, L0.255, L0.256, L0.257, L0.258, L0.259, L0.260, L0.261, L0.262, L0.263, L0.264, L0.265, L0.266, L0.267, L0.268, L0.269, L0.270, L0.271, L0.272, L0.273, L0.274, L0.275, L0.276, L0.277, L0.278, L0.279, L0.280, L0.281, L0.282, L0.283, L0.284, L0.285, L0.286, L0.287, L0.288, L0.289, L0.290, L0.291, L0.292, L0.293, L0.294, L0.295, L0.296, L0.297, L0.298, L0.299, L0.300, L0.301, L0.302, L0.303, L0.304, L0.305, L0.306, L0.307, L0.308, L0.309, L0.310, L0.311, L0.312, L0.313, L0.314, L0.315, L0.316, L0.317, L0.318, L0.319, L0.320, L0.321, L0.322, L0.323, L0.324, L0.325, L0.326, L0.327, L0.328, L0.329, L0.330, L0.331, L0.332, L0.333, L0.334, L0.335, L0.336, L0.337, L0.338, L0.339, L0.340, L0.341, L0.342, L0.343, L0.344, L0.345, L0.346, L0.347, L0.348, L0.349, L0.350, L0.351, L0.352, L0.353, L0.354, L0.355, L0.356, L0.357, L0.358, L0.359, L0.360, L0.361, L0.362, L0.363, L0.364, L0.365, L0.366, L0.367, L0.368, L0.369, L0.370, L0.371, L0.372, L0.373, L0.374, L0.375, L0.376, L0.377, L0.378, L0.379, L0.380, L0.381, L0.382, L0.383, L0.384, L0.385, L0.386, L0.387, L0.388, L0.389, L0.390, L0.391, L0.392, L0.393, L0.394, L0.395, L0.396, L0.397, L0.398, L0.399, L0.400" + - " Creating 1 files at level CompactionLevel::L0" - "**** Simulation run 2, type=compact. 10 Input Files, 70kb total:" - "L0, all files 7kb " - "L0.401[400,401] |L0.401| " @@ -1614,6 +1635,9 @@ async fn over_two_times_max_files_per_plan() { - "**** 1 Output Files (parquet_file_id not yet assigned), 70kb total:" - "L0, all files 70kb " - "L0.?[400,410] |-------------------------------------L0.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 10 files: L0.401, L0.402, L0.403, L0.404, L0.405, L0.406, L0.407, L0.408, L0.409, L0.410" + - " Creating 1 files at level CompactionLevel::L0" - "**** Simulation run 3, type=compact. 3 Input Files, 2.8mb total:" - "L0 " - "L0.413[400,410] 70kb |L0.413|" @@ -1622,6 +1646,9 @@ async fn over_two_times_max_files_per_plan() { - "**** 1 Output Files (parquet_file_id not yet assigned), 2.8mb total:" - "L1, all files 2.8mb " - "L1.?[0,410] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 3 files: L0.411, L0.412, L0.413" + - " Creating 1 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1, all files 2.8mb " - "L1.414[0,410] |------------------------------------L1.414------------------------------------|" @@ -2157,6 +2184,9 @@ async fn many_tiny_l1_files() { - "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:" - "L1, all files 1.37mb " - "L1.?[0,200] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 200 files: L1.1, L1.2, L1.3, L1.4, L1.5, L1.6, L1.7, L1.8, L1.9, L1.10, L1.11, L1.12, L1.13, L1.14, L1.15, L1.16, L1.17, L1.18, L1.19, L1.20, L1.21, L1.22, L1.23, L1.24, L1.25, L1.26, L1.27, L1.28, L1.29, L1.30, L1.31, L1.32, L1.33, L1.34, L1.35, L1.36, L1.37, L1.38, L1.39, L1.40, L1.41, L1.42, L1.43, L1.44, L1.45, L1.46, L1.47, L1.48, L1.49, L1.50, L1.51, L1.52, L1.53, L1.54, L1.55, L1.56, L1.57, L1.58, L1.59, L1.60, L1.61, L1.62, L1.63, L1.64, L1.65, L1.66, L1.67, L1.68, L1.69, L1.70, L1.71, L1.72, L1.73, L1.74, L1.75, L1.76, L1.77, L1.78, L1.79, L1.80, L1.81, L1.82, L1.83, L1.84, L1.85, L1.86, L1.87, L1.88, L1.89, L1.90, L1.91, L1.92, L1.93, L1.94, L1.95, L1.96, L1.97, L1.98, L1.99, L1.100, L1.101, L1.102, L1.103, L1.104, L1.105, L1.106, L1.107, L1.108, L1.109, L1.110, L1.111, L1.112, L1.113, L1.114, L1.115, L1.116, L1.117, L1.118, L1.119, L1.120, L1.121, L1.122, L1.123, L1.124, L1.125, L1.126, L1.127, L1.128, L1.129, L1.130, L1.131, L1.132, L1.133, L1.134, L1.135, L1.136, L1.137, L1.138, L1.139, L1.140, L1.141, L1.142, L1.143, L1.144, L1.145, L1.146, L1.147, L1.148, L1.149, L1.150, L1.151, L1.152, L1.153, L1.154, L1.155, L1.156, L1.157, L1.158, L1.159, L1.160, L1.161, L1.162, L1.163, L1.164, L1.165, L1.166, L1.167, L1.168, L1.169, L1.170, L1.171, L1.172, L1.173, L1.174, L1.175, L1.176, L1.177, L1.178, L1.179, L1.180, L1.181, L1.182, L1.183, L1.184, L1.185, L1.186, L1.187, L1.188, L1.189, L1.190, L1.191, L1.192, L1.193, L1.194, L1.195, L1.196, L1.197, L1.198, L1.199, L1.200" + - " Creating 1 files at level CompactionLevel::L1" - "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:" - "L1, all files 7kb " - "L1.201[200,201] |L1.201| " @@ -2250,6 +2280,9 @@ async fn many_tiny_l1_files() { - "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:" - "L1, all files 616kb " - "L1.?[200,288] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 88 files: L1.201, L1.202, L1.203, L1.204, L1.205, L1.206, L1.207, L1.208, L1.209, L1.210, L1.211, L1.212, L1.213, L1.214, L1.215, L1.216, L1.217, L1.218, L1.219, L1.220, L1.221, L1.222, L1.223, L1.224, L1.225, L1.226, L1.227, L1.228, L1.229, L1.230, L1.231, L1.232, L1.233, L1.234, L1.235, L1.236, L1.237, L1.238, L1.239, L1.240, L1.241, L1.242, L1.243, L1.244, L1.245, L1.246, L1.247, L1.248, L1.249, L1.250, L1.251, L1.252, L1.253, L1.254, L1.255, L1.256, L1.257, L1.258, L1.259, L1.260, L1.261, L1.262, L1.263, L1.264, L1.265, L1.266, L1.267, L1.268, L1.269, L1.270, L1.271, L1.272, L1.273, L1.274, L1.275, L1.276, L1.277, L1.278, L1.279, L1.280, L1.281, L1.282, L1.283, L1.284, L1.285, L1.286, L1.287, L1.288" + - " Creating 1 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1 " - "L1.289[0,200] 1.37mb|-----------------------L1.289------------------------| " @@ -2719,6 +2752,9 @@ async fn many_l0_and_overlapped_l1_files() { - "**** 1 Output Files (parquet_file_id not yet assigned), 1.3mb total:" - "L0, all files 1.3mb " - "L0.?[0,190] |-------------------------------------L0.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 190 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10, L0.11, L0.12, L0.13, L0.14, L0.15, L0.16, L0.17, L0.18, L0.19, L0.20, L0.21, L0.22, L0.23, L0.24, L0.25, L0.26, L0.27, L0.28, L0.29, L0.30, L0.31, L0.32, L0.33, L0.34, L0.35, L0.36, L0.37, L0.38, L0.39, L0.40, L0.41, L0.42, L0.43, L0.44, L0.45, L0.46, L0.47, L0.48, L0.49, L0.50, L0.51, L0.52, L0.53, L0.54, L0.55, L0.56, L0.57, L0.58, L0.59, L0.60, L0.61, L0.62, L0.63, L0.64, L0.65, L0.66, L0.67, L0.68, L0.69, L0.70, L0.71, L0.72, L0.73, L0.74, L0.75, L0.76, L0.77, L0.78, L0.79, L0.80, L0.81, L0.82, L0.83, L0.84, L0.85, L0.86, L0.87, L0.88, L0.89, L0.90, L0.91, L0.92, L0.93, L0.94, L0.95, L0.96, L0.97, L0.98, L0.99, L0.100, L0.101, L0.102, L0.103, L0.104, L0.105, L0.106, L0.107, L0.108, L0.109, L0.110, L0.111, L0.112, L0.113, L0.114, L0.115, L0.116, L0.117, L0.118, L0.119, L0.120, L0.121, L0.122, L0.123, L0.124, L0.125, L0.126, L0.127, L0.128, L0.129, L0.130, L0.131, L0.132, L0.133, L0.134, L0.135, L0.136, L0.137, L0.138, L0.139, L0.140, L0.141, L0.142, L0.143, L0.144, L0.145, L0.146, L0.147, L0.148, L0.149, L0.150, L0.151, L0.152, L0.153, L0.154, L0.155, L0.156, L0.157, L0.158, L0.159, L0.160, L0.161, L0.162, L0.163, L0.164, L0.165, L0.166, L0.167, L0.168, L0.169, L0.170, L0.171, L0.172, L0.173, L0.174, L0.175, L0.176, L0.177, L0.178, L0.179, L0.180, L0.181, L0.182, L0.183, L0.184, L0.185, L0.186, L0.187, L0.188, L0.189, L0.190" + - " Creating 1 files at level CompactionLevel::L0" - "**** Simulation run 1, type=split(split_times=[159]). 21 Input Files, 21.3mb total:" - "L0 " - "L0.211[0,190] 1.3mb |----------------------------------L0.211----------------------------------| " @@ -2747,6 +2783,9 @@ async fn many_l0_and_overlapped_l1_files() { - "L1 " - "L1.?[0,159] 17.02mb |----------------------------L1.?-----------------------------| " - "L1.?[159,199] 4.28mb |-----L1.?-----| " + - "Committing partition 1:" + - " Soft Deleting 21 files: L1.191, L1.192, L1.193, L1.194, L1.195, L1.196, L1.197, L1.198, L1.199, L1.200, L1.201, L1.202, L1.203, L1.204, L1.205, L1.206, L1.207, L1.208, L1.209, L1.210, L0.211" + - " Creating 2 files at level CompactionLevel::L1" - "**** Final Output Files " - "L1 " - "L1.212[0,159] 17.02mb|---------------------------L1.212----------------------------| " @@ -3237,6 +3276,9 @@ async fn not_many_l0_and_overlapped_l1_files() { - "**** 1 Output Files (parquet_file_id not yet assigned), 6.3mb total:" - "L1, all files 6.3mb " - "L1.?[0,190] |-------------------------------------L1.?-------------------------------------|" + - "Committing partition 1:" + - " Soft Deleting 195 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10, L0.11, L0.12, L0.13, L0.14, L0.15, L0.16, L0.17, L0.18, L0.19, L0.20, L0.21, L0.22, L0.23, L0.24, L0.25, L0.26, L0.27, L0.28, L0.29, L0.30, L0.31, L0.32, L0.33, L0.34, L0.35, L0.36, L0.37, L0.38, L0.39, L0.40, L0.41, L0.42, L0.43, L0.44, L0.45, L0.46, L0.47, L0.48, L0.49, L0.50, L0.51, L0.52, L0.53, L0.54, L0.55, L0.56, L0.57, L0.58, L0.59, L0.60, L0.61, L0.62, L0.63, L0.64, L0.65, L0.66, L0.67, L0.68, L0.69, L0.70, L0.71, L0.72, L0.73, L0.74, L0.75, L0.76, L0.77, L0.78, L0.79, L0.80, L0.81, L0.82, L0.83, L0.84, L0.85, L0.86, L0.87, L0.88, L0.89, L0.90, L0.91, L0.92, L0.93, L0.94, L0.95, L0.96, L0.97, L0.98, L0.99, L0.100, L0.101, L0.102, L0.103, L0.104, L0.105, L0.106, L0.107, L0.108, L0.109, L0.110, L0.111, L0.112, L0.113, L0.114, L0.115, L0.116, L0.117, L0.118, L0.119, L0.120, L0.121, L0.122, L0.123, L0.124, L0.125, L0.126, L0.127, L0.128, L0.129, L0.130, L0.131, L0.132, L0.133, L0.134, L0.135, L0.136, L0.137, L0.138, L0.139, L0.140, L0.141, L0.142, L0.143, L0.144, L0.145, L0.146, L0.147, L0.148, L0.149, L0.150, L0.151, L0.152, L0.153, L0.154, L0.155, L0.156, L0.157, L0.158, L0.159, L0.160, L0.161, L0.162, L0.163, L0.164, L0.165, L0.166, L0.167, L0.168, L0.169, L0.170, L0.171, L0.172, L0.173, L0.174, L0.175, L0.176, L0.177, L0.178, L0.179, L0.180, L0.181, L0.182, L0.183, L0.184, L0.185, L0.186, L0.187, L0.188, L0.189, L0.190, L1.191, L1.192, L1.193, L1.194, L1.195" + - " Creating 1 files at level CompactionLevel::L1" - "**** Simulation run 1, type=split(split_times=[2407]). 16 Input Files, 21.3mb total:" - "L1 " - "L1.196[200,209] 1mb |L1.196| " @@ -3259,6 +3301,9 @@ async fn not_many_l0_and_overlapped_l1_files() { - "L2 " - "L2.?[0,2407] 17.04mb|----------------------------L2.?-----------------------------| " - "L2.?[2407,3009] 4.26mb |-----L2.?-----| " + - "Committing partition 1:" + - " Soft Deleting 16 files: L1.196, L1.197, L1.198, L1.199, L1.200, L1.201, L1.202, L1.203, L1.204, L1.205, L1.206, L1.207, L1.208, L1.209, L1.210, L1.211" + - " Creating 2 files at level CompactionLevel::L2" - "**** Final Output Files " - "L2 " - "L2.212[0,2407] 17.04mb|---------------------------L2.212----------------------------| " diff --git a/compactor2/tests/layouts/mod.rs b/compactor2/tests/layouts/mod.rs index 24ba602b96..63ad177464 100644 --- a/compactor2/tests/layouts/mod.rs +++ b/compactor2/tests/layouts/mod.rs @@ -113,7 +113,7 @@ pub(crate) async fn run_layout_scenario(setup: &TestSetup) -> Vec { let compact_result = setup.run_compact().await; // record what the compactor actually did - output.extend(compact_result.simulator_runs); + output.extend(compact_result.run_log); // Record any skipped compactions (is after what the compactor actually did) output.extend(get_skipped_compactions(setup).await); diff --git a/compactor2_test_utils/src/commit_wrapper.rs b/compactor2_test_utils/src/commit_wrapper.rs new file mode 100644 index 0000000000..ca7014ce85 --- /dev/null +++ b/compactor2_test_utils/src/commit_wrapper.rs @@ -0,0 +1,103 @@ +//! Handles recording commit information to the test run log + +use async_trait::async_trait; +use compactor2::{Commit, CommitWrapper}; +use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId}; +use std::{ + fmt::Display, + sync::{Arc, Mutex}, +}; + +use crate::display::ParquetFileInfo; + +/// Records catalog operations to a shared run_log during tests +#[derive(Debug)] +struct CommitRecorder { + /// The inner commit that does the work + inner: Arc<(dyn Commit)>, + + /// a log of what happened during this test + run_log: Arc>>, +} + +#[async_trait] +impl Commit for CommitRecorder { + async fn commit( + &self, + partition_id: PartitionId, + delete: &[ParquetFile], + upgrade: &[ParquetFile], + create: &[ParquetFileParams], + target_level: CompactionLevel, + ) -> Vec { + // lock scope + { + let mut run_log = self.run_log.lock().unwrap(); + + run_log.push(format!("Committing partition {partition_id}:")); + + if !delete.is_empty() { + run_log.push(format!( + " Soft Deleting {} files: {}", + delete.len(), + id_list(delete) + )); + } + + if !upgrade.is_empty() { + run_log.push(format!( + " Upgrading {} files level to {}: {}", + upgrade.len(), + target_level, + id_list(upgrade) + )); + } + + if !create.is_empty() { + run_log.push(format!( + " Creating {} files at level {}", + create.len(), + target_level + )); + } + } + self.inner + .commit(partition_id, delete, upgrade, create, target_level) + .await + } +} + +fn id_list(files: &[ParquetFile]) -> String { + // sort by id to get a more consistent display + let mut files: Vec<_> = files.iter().collect(); + files.sort_by_key(|f| f.id); + + let files: Vec<_> = files.iter().map(|f| f.display_id()).collect(); + files.join(", ") +} + +impl Display for CommitRecorder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "CommitRecorder") + } +} + +/// Wraps another commit reference in a CommitRecorder +#[derive(Debug)] +pub(crate) struct CommitRecorderBuilder { + /// a shared log of what happened during a simulated run + run_log: Arc>>, +} + +impl CommitRecorderBuilder { + pub fn new(run_log: Arc>>) -> Self { + Self { run_log } + } +} + +impl CommitWrapper for CommitRecorderBuilder { + fn wrap(&self, inner: Arc<(dyn Commit)>) -> Arc<(dyn Commit + 'static)> { + let run_log = Arc::clone(&self.run_log); + Arc::new(CommitRecorder { inner, run_log }) + } +} diff --git a/compactor2_test_utils/src/lib.rs b/compactor2_test_utils/src/lib.rs index f6830058c9..27ae510e14 100644 --- a/compactor2_test_utils/src/lib.rs +++ b/compactor2_test_utils/src/lib.rs @@ -12,14 +12,22 @@ clippy::dbg_macro )] +mod commit_wrapper; mod display; mod simulator; +use commit_wrapper::CommitRecorderBuilder; pub use display::{display_size, format_files, format_files_split}; use iox_query::exec::ExecutorType; use simulator::ParquetFileSimulator; use tracker::AsyncSemaphoreMetrics; -use std::{collections::HashSet, future::Future, num::NonZeroUsize, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + future::Future, + num::NonZeroUsize, + sync::{Arc, Mutex}, + time::Duration, +}; use async_trait::async_trait; use backoff::BackoffConfig; @@ -59,6 +67,8 @@ pub struct TestSetupBuilder { table: Arc, partition: Arc, files: Vec, + /// a shared log of what happened during the test + run_log: Arc>>, } impl TestSetupBuilder { @@ -84,6 +94,10 @@ impl TestSetupBuilder { let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]); let partition = partition.update_sort_key(sort_key.clone()).await; + // intercept commit calls and record them as well + let run_log = Arc::new(Mutex::new(vec![])); + let commit_wrapper = CommitRecorderBuilder::new(Arc::clone(&run_log)); + let config = Config { shard_id: shard.shard.id, metric_registry: catalog.metric_registry(), @@ -113,6 +127,7 @@ impl TestSetupBuilder { process_once: true, simulate_without_object_store: false, parquet_files_sink_override: None, + commit_wrapper: Some(Arc::new(commit_wrapper)), all_errors_are_fatal: true, max_num_columns_per_table: 200, max_num_files_per_plan: 200, @@ -126,6 +141,7 @@ impl TestSetupBuilder { table, partition, files: vec![], + run_log, } } @@ -245,6 +261,7 @@ impl TestSetupBuilder { table: self.table, partition: self.partition, files, + run_log: Arc::new(Mutex::new(vec![])), } } } @@ -287,8 +304,10 @@ impl TestSetupBuilder { /// set simulate_without_object_store pub fn simulate_without_object_store(mut self) -> Self { + let run_log = Arc::clone(&self.run_log); self.config.simulate_without_object_store = true; - self.config.parquet_files_sink_override = Some(Arc::new(ParquetFileSimulator::new())); + self.config.parquet_files_sink_override = + Some(Arc::new(ParquetFileSimulator::new(run_log))); self } @@ -338,6 +357,7 @@ impl TestSetupBuilder { table: self.table, partition: self.partition, config: Arc::new(self.config), + run_log: self.run_log, } } } @@ -357,6 +377,8 @@ pub struct TestSetup { pub partition: Arc, /// The compactor2 configuration pub config: Arc, + /// a shared log of what happened during a simulated run + run_log: Arc>>, } impl TestSetup { @@ -400,6 +422,9 @@ impl TestSetup { } async fn run_compact_impl(&self, components: Arc) -> CompactResult { + // clear any existing log entries, if any + self.run_log.lock().unwrap().clear(); + let config = Arc::clone(&self.config); let job_semaphore = Arc::new( Arc::new(AsyncSemaphoreMetrics::new(&config.metric_registry, [])).new_semaphore(10), @@ -426,24 +451,16 @@ impl TestSetup { .await; // get the results - let simulator_runs = if let Some(simulator) = components - .parquet_files_sink - .as_any() - .downcast_ref::() - { - simulator.runs() - } else { - vec![] - }; - - CompactResult { simulator_runs } + CompactResult { + run_log: self.run_log.lock().unwrap().clone(), + } } } /// Information about the compaction that was run pub struct CompactResult { /// [`ParquetFileSimulator`] output, if enabled - pub simulator_runs: Vec, + pub run_log: Vec, } /// A collection of nanosecond timestamps relative to now diff --git a/compactor2_test_utils/src/simulator.rs b/compactor2_test_utils/src/simulator.rs index e85df0a175..d66390eedc 100644 --- a/compactor2_test_utils/src/simulator.rs +++ b/compactor2_test_utils/src/simulator.rs @@ -1,6 +1,9 @@ use std::{ collections::BTreeSet, - sync::{Arc, Mutex}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, }; use async_trait::async_trait; @@ -44,7 +47,10 @@ use crate::{display::total_size, display_size, format_files}; /// uniform distribution. #[derive(Debug, Default)] pub struct ParquetFileSimulator { - runs: Arc>>, + /// entries that are added to while running + run_log: Arc>>, + /// Used to generate run ids for display + run_id_generator: AtomicUsize, } impl std::fmt::Display for ParquetFileSimulator { @@ -54,47 +60,17 @@ impl std::fmt::Display for ParquetFileSimulator { } impl ParquetFileSimulator { - /// Create a new simulator for creating parquet files - pub fn new() -> Self { + /// Create a new simulator for creating parquet files, which + /// appends its output to `run_log` + pub fn new(run_log: Arc>>) -> Self { Self { - runs: Arc::new(Mutex::new(vec![])), + run_log, + run_id_generator: AtomicUsize::new(0), } } - /// Get a visual display of the simulated runs this simulator - /// has performed, for use in tests - pub fn runs(&self) -> Vec { - let runs = (*self.runs.lock().unwrap()).clone(); - - runs.into_iter() - .enumerate() - .flat_map(|(i, run)| { - let SimulatedRun { - plan_type, - input_parquet_files, - output_params, - } = run; - - let input_title = format!( - "**** Simulation run {}, type={}. {} Input Files, {} total:", - i, - plan_type, - input_parquet_files.len(), - display_size(total_size(&input_parquet_files)) - ); - - let output_title = format!( - "**** {} Output Files (parquet_file_id not yet assigned), {} total:", - output_params.len(), - display_size(total_size(&output_params)) - ); - - // hook up inputs and outputs - format_files(input_title, &input_parquet_files) - .into_iter() - .chain(format_files(output_title, &output_params).into_iter()) - }) - .collect() + fn next_run_id(&self) -> usize { + self.run_id_generator.fetch_add(1, Ordering::SeqCst) } } @@ -153,11 +129,13 @@ impl ParquetFilesSink for ParquetFileSimulator { .collect(); // record what the simulator did - self.runs.lock().unwrap().push(SimulatedRun { + let run = SimulatedRun { + run_id: self.next_run_id(), plan_type, input_parquet_files, output_params: output_params.clone(), - }); + }; + self.run_log.lock().unwrap().extend(run.into_strings()); Ok(output_params) } @@ -238,12 +216,44 @@ impl SimulatedFile { /// purposes #[derive(Debug, Clone)] pub struct SimulatedRun { + run_id: usize, // fields are used in testing plan_type: String, input_parquet_files: Vec, output_params: Vec, } +impl SimulatedRun { + /// Convert this simulated run into a set of human readable strings + fn into_strings(self) -> impl Iterator { + let Self { + run_id, + plan_type, + input_parquet_files, + output_params, + } = self; + + let input_title = format!( + "**** Simulation run {}, type={}. {} Input Files, {} total:", + run_id, + plan_type, + input_parquet_files.len(), + display_size(total_size(&input_parquet_files)) + ); + + let output_title = format!( + "**** {} Output Files (parquet_file_id not yet assigned), {} total:", + output_params.len(), + display_size(total_size(&output_params)) + ); + + // hook up inputs and outputs + format_files(input_title, &input_parquet_files) + .into_iter() + .chain(format_files(output_title, &output_params).into_iter()) + } +} + fn overall_column_set<'a>(files: impl IntoIterator) -> ColumnSet { let all_columns = files .into_iter() diff --git a/ioxd_compactor2/src/lib.rs b/ioxd_compactor2/src/lib.rs index d4d2a7ffb3..01c1cb3e1e 100644 --- a/ioxd_compactor2/src/lib.rs +++ b/ioxd_compactor2/src/lib.rs @@ -202,6 +202,7 @@ pub async fn create_compactor2_server_type( process_once: compactor_config.process_once, simulate_without_object_store: false, parquet_files_sink_override: None, + commit_wrapper: None, all_errors_are_fatal: false, max_num_columns_per_table: compactor_config.max_num_columns_per_table, max_num_files_per_plan: compactor_config.max_num_files_per_plan,