feat(compactor2): add catalog upgrade information to tests (#7075)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2023-02-28 20:28:42 +01:00 committed by GitHub
parent 6d8fd37e26
commit f3a16a1221
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 313 additions and 63 deletions

View File

@ -30,6 +30,14 @@ pub trait Commit: Debug + Display + Send + Sync {
) -> Vec<ParquetFileId>;
}
/// Something that can wrap `Commit` instances
///
/// Use to install test observation
pub trait CommitWrapper: Debug + Send + Sync {
/// a function to call that wraps a [`Commit`] instance
fn wrap(&self, commit: Arc<dyn Commit>) -> Arc<dyn Commit>;
}
#[async_trait]
impl<T> Commit for Arc<T>
where

View File

@ -156,6 +156,12 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
))
};
let commit = if let Some(commit_wrapper) = config.commit_wrapper.as_ref() {
commit_wrapper.wrap(commit)
} else {
commit
};
let scratchpad_store_output = if config.shadow_mode {
Arc::new(IgnoreWrites::new(Arc::new(InMemory::new())))
} else {

View File

@ -34,7 +34,8 @@ pub fn log_config(config: &Config) {
shard_config,
min_num_l1_files_to_compact,
process_once,
parquet_files_sink_override: parquet_files_sink,
parquet_files_sink_override,
commit_wrapper,
simulate_without_object_store,
all_errors_are_fatal,
max_num_columns_per_table,
@ -50,10 +51,13 @@ pub fn log_config(config: &Config) {
}
};
let parquet_files_sink = parquet_files_sink
let parquet_files_sink_override = parquet_files_sink_override
.as_ref()
.map(|_| "Some")
.unwrap_or("None");
let commit_wrapper = commit_wrapper.as_ref().map(|_| "Some").unwrap_or("None");
info!(
shard_id=shard_id.get(),
%catalog,
@ -79,7 +83,8 @@ pub fn log_config(config: &Config) {
min_num_l1_files_to_compact,
process_once,
simulate_without_object_store,
%parquet_files_sink,
%parquet_files_sink_override,
%commit_wrapper,
all_errors_are_fatal,
max_num_columns_per_table,
max_num_files_per_plan,

View File

@ -8,7 +8,7 @@ use iox_query::exec::Executor;
use iox_time::TimeProvider;
use parquet_file::storage::ParquetStorage;
use crate::components::parquet_files_sink::ParquetFilesSink;
use crate::components::{commit::CommitWrapper, parquet_files_sink::ParquetFilesSink};
/// Config to set up a compactor.
#[derive(Debug, Clone)]
@ -113,9 +113,15 @@ pub struct Config {
/// This is useful for testing.
pub simulate_without_object_store: bool,
/// Use the provided [`ParquetFilesSink`] to create parquet files (used for testing)
/// Use the provided [`ParquetFilesSink`] to create parquet files
/// (used for testing)
pub parquet_files_sink_override: Option<Arc<dyn ParquetFilesSink>>,
/// Optionally wrap the `Commit` instance
///
/// This is mostly used for testing
pub commit_wrapper: Option<Arc<dyn CommitWrapper>>,
/// Ensure that ALL errors (including object store errors) result in "skipped" partitions.
///
/// This is mostly useful for testing.

View File

@ -179,8 +179,12 @@ mod round_info;
// publically expose items needed for testing
pub use components::{
df_planner::panic::PanicDataFusionPlanner, hardcoded::hardcoded_components,
namespaces_source::mock::NamespaceWrapper, parquet_files_sink::ParquetFilesSink, Components,
commit::{Commit, CommitWrapper},
df_planner::panic::PanicDataFusionPlanner,
hardcoded::hardcoded_components,
namespaces_source::mock::NamespaceWrapper,
parquet_files_sink::ParquetFilesSink,
Components,
};
pub use driver::compact;
pub use error::DynError;

View File

@ -47,6 +47,9 @@ async fn all_overlapping_l0() {
- "L1 "
- "L1.?[100,180] 72mb |-----------------------------L1.?-----------------------------| "
- "L1.?[180,200] 18mb |-----L1.?-----|"
- "Committing partition 1:"
- " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10"
- " Creating 2 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1 "
- "L1.11[100,180] 72mb |----------------------------L1.11-----------------------------| "
@ -106,6 +109,9 @@ async fn all_non_overlapping_l0() {
- "L1 "
- "L1.?[0,720] 79.91mb |----------------------------L1.?-----------------------------| "
- "L1.?[720,901] 20.09mb |-----L1.?-----| "
- "Committing partition 1:"
- " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10"
- " Creating 2 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1 "
- "L1.11[0,720] 79.91mb|----------------------------L1.11----------------------------| "
@ -178,6 +184,9 @@ async fn l1_with_overlapping_l0() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 10.02mb total:"
- "L1, all files 10.02mb "
- "L1.?[100,310] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 6 files: L1.2, L0.3, L0.4, L0.5, L0.6, L0.7"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1 "
- "L1.1[50,100] 10mb |----L1.1-----| "
@ -246,6 +255,9 @@ async fn l1_with_non_overlapping_l0() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 25kb total:"
- "L1, all files 25kb "
- "L1.?[300,550] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 5 files: L0.3, L0.4, L0.5, L0.6, L0.7"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1 "
- "L1.1[50,100] 10mb |-L1.1-| "
@ -313,6 +325,9 @@ async fn l1_with_non_overlapping_l0_larger() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.?[300,450] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 3 files: L0.5, L0.6, L0.7"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Simulation run 1, type=split(split_times=[370]). 5 Input Files, 108mb total:"
- "L1 "
- "L1.4[200,250] 3mb |--L1.4--| "
@ -324,6 +339,9 @@ async fn l1_with_non_overlapping_l0_larger() {
- "L2 "
- "L2.?[50,370] 86.4mb |-----------------------------L2.?-----------------------------| "
- "L2.?[370,450] 21.6mb |-----L2.?-----|"
- "Committing partition 1:"
- " Soft Deleting 5 files: L1.1, L1.2, L1.3, L1.4, L1.8"
- " Creating 2 files at level CompactionLevel::L2"
- "**** Final Output Files "
- "L2 "
- "L2.9[50,370] 86.4mb |-----------------------------L2.9-----------------------------| "
@ -399,6 +417,9 @@ async fn l1_too_much_with_non_overlapping_l0() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.?[600,650] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 3 files: L0.11, L0.12, L0.13"
- " Creating 1 files at level CompactionLevel::L1"
- "SKIPPED COMPACTION for PartitionId(1): partition 1 has 781189120 parquet file bytes, limit is 268435456"
- "**** Final Output Files "
- "L1 "
@ -483,6 +504,9 @@ async fn many_l1_with_non_overlapping_l0() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.?[600,650] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 3 files: L0.11, L0.12, L0.13"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Simulation run 1, type=split(split_times=[530]). 11 Input Files, 88mb total:"
- "L1 "
- "L1.10[500,550] 7mb |L1.10| "
@ -500,6 +524,9 @@ async fn many_l1_with_non_overlapping_l0() {
- "L2 "
- "L2.?[50,530] 70.4mb |-----------------------------L2.?-----------------------------| "
- "L2.?[530,650] 17.6mb |-----L2.?-----|"
- "Committing partition 1:"
- " Soft Deleting 11 files: L1.1, L1.2, L1.3, L1.4, L1.5, L1.6, L1.7, L1.8, L1.9, L1.10, L1.14"
- " Creating 2 files at level CompactionLevel::L2"
- "**** Final Output Files "
- "L2 "
- "L2.15[50,530] 70.4mb|----------------------------L2.15-----------------------------| "
@ -566,6 +593,9 @@ async fn large_l1_with_non_overlapping_l0() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 15mb total:"
- "L1, all files 15mb "
- "L1.?[600,650] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 3 files: L0.3, L0.4, L0.5"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Simulation run 1, type=split(split_times=[375]). 3 Input Files, 185mb total:"
- "L1 "
- "L1.2[100,150] 80mb |L1.2| "
@ -575,6 +605,9 @@ async fn large_l1_with_non_overlapping_l0() {
- "L2 "
- "L2.?[50,375] 100.21mb|------------------L2.?-------------------| "
- "L2.?[375,650] 84.79mb |---------------L2.?---------------| "
- "Committing partition 1:"
- " Soft Deleting 3 files: L1.1, L1.2, L1.6"
- " Creating 2 files at level CompactionLevel::L2"
- "**** Final Output Files "
- "L2 "
- "L2.7[50,375] 100.21mb|------------------L2.7-------------------| "

View File

@ -57,6 +57,9 @@ async fn all_overlapping_l0_split_percentage() {
- "L1 "
- "L1.?[100,195] 85.5mb|-----------------------------------L1.?-----------------------------------| "
- "L1.?[195,200] 4.5mb |L1.?|"
- "Committing partition 1:"
- " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10"
- " Creating 2 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1 "
- "L1.11[100,195] 85.5mb|----------------------------------L1.11-----------------------------------| "
@ -118,6 +121,9 @@ async fn all_overlapping_l0_max_file_size() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 90mb total:"
- "L1, all files 90mb "
- "L1.?[100,200] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1, all files 90mb "
- "L1.11[100,200] |------------------------------------L1.11-------------------------------------|"
@ -186,6 +192,9 @@ async fn all_overlapping_l0_split_percentage_and_max_file_size() {
- "L1.?[172,184] 10.8mb |-L1.?--| "
- "L1.?[184,196] 10.8mb |-L1.?--| "
- "L1.?[196,200] 3.6mb |L1.?|"
- "Committing partition 1:"
- " Soft Deleting 10 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10"
- " Creating 9 files at level CompactionLevel::L1"
- "**** Simulation run 1, type=split(split_times=[112, 124, 136, 148, 160, 172, 184, 196]). 9 Input Files, 90mb total:"
- "L1 "
- "L1.19[196,200] 3.6mb |L1.19|"
@ -208,6 +217,9 @@ async fn all_overlapping_l0_split_percentage_and_max_file_size() {
- "L2.?[172,184] 10.8mb |-L2.?--| "
- "L2.?[184,196] 10.8mb |-L2.?--| "
- "L2.?[196,200] 3.6mb |L2.?|"
- "Committing partition 1:"
- " Soft Deleting 9 files: L1.11, L1.12, L1.13, L1.14, L1.15, L1.16, L1.17, L1.18, L1.19"
- " Creating 9 files at level CompactionLevel::L2"
- "**** Final Output Files "
- "L2 "
- "L2.20[100,112] 10.8mb|-L2.20-| "

View File

@ -76,6 +76,9 @@ async fn many_l1_files() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 3mb total:"
- "L1, all files 3mb "
- "L1.?[24,25] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 3 files: L0.21, L0.22, L0.23"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Simulation run 1, type=split(split_times=[13]). 21 Input Files, 203mb total:"
- "L1 "
- "L1.20[19,20] 10mb |L1.20| "
@ -103,6 +106,9 @@ async fn many_l1_files() {
- "L2 "
- "L2.?[0,13] 105.56mb |-----------------L2.?------------------| "
- "L2.?[13,25] 97.44mb |----------------L2.?----------------| "
- "Committing partition 1:"
- " Soft Deleting 21 files: L1.1, L1.2, L1.3, L1.4, L1.5, L1.6, L1.7, L1.8, L1.9, L1.10, L1.11, L1.12, L1.13, L1.14, L1.15, L1.16, L1.17, L1.18, L1.19, L1.20, L1.24"
- " Creating 2 files at level CompactionLevel::L2"
- "**** Final Output Files "
- "L2 "
- "L2.25[0,13] 105.56mb|-----------------L2.25-----------------| "
@ -639,6 +645,9 @@ async fn many_tiny_l0_files() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L0, all files 1.37mb "
- "L0.?[0,200] |-------------------------------------L0.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 200 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10, L0.11, L0.12, L0.13, L0.14, L0.15, L0.16, L0.17, L0.18, L0.19, L0.20, L0.21, L0.22, L0.23, L0.24, L0.25, L0.26, L0.27, L0.28, L0.29, L0.30, L0.31, L0.32, L0.33, L0.34, L0.35, L0.36, L0.37, L0.38, L0.39, L0.40, L0.41, L0.42, L0.43, L0.44, L0.45, L0.46, L0.47, L0.48, L0.49, L0.50, L0.51, L0.52, L0.53, L0.54, L0.55, L0.56, L0.57, L0.58, L0.59, L0.60, L0.61, L0.62, L0.63, L0.64, L0.65, L0.66, L0.67, L0.68, L0.69, L0.70, L0.71, L0.72, L0.73, L0.74, L0.75, L0.76, L0.77, L0.78, L0.79, L0.80, L0.81, L0.82, L0.83, L0.84, L0.85, L0.86, L0.87, L0.88, L0.89, L0.90, L0.91, L0.92, L0.93, L0.94, L0.95, L0.96, L0.97, L0.98, L0.99, L0.100, L0.101, L0.102, L0.103, L0.104, L0.105, L0.106, L0.107, L0.108, L0.109, L0.110, L0.111, L0.112, L0.113, L0.114, L0.115, L0.116, L0.117, L0.118, L0.119, L0.120, L0.121, L0.122, L0.123, L0.124, L0.125, L0.126, L0.127, L0.128, L0.129, L0.130, L0.131, L0.132, L0.133, L0.134, L0.135, L0.136, L0.137, L0.138, L0.139, L0.140, L0.141, L0.142, L0.143, L0.144, L0.145, L0.146, L0.147, L0.148, L0.149, L0.150, L0.151, L0.152, L0.153, L0.154, L0.155, L0.156, L0.157, L0.158, L0.159, L0.160, L0.161, L0.162, L0.163, L0.164, L0.165, L0.166, L0.167, L0.168, L0.169, L0.170, L0.171, L0.172, L0.173, L0.174, L0.175, L0.176, L0.177, L0.178, L0.179, L0.180, L0.181, L0.182, L0.183, L0.184, L0.185, L0.186, L0.187, L0.188, L0.189, L0.190, L0.191, L0.192, L0.193, L0.194, L0.195, L0.196, L0.197, L0.198, L0.199, L0.200"
- " Creating 1 files at level CompactionLevel::L0"
- "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:"
- "L0, all files 7kb "
- "L0.201[200,201] |L0.201| "
@ -732,6 +741,9 @@ async fn many_tiny_l0_files() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:"
- "L0, all files 616kb "
- "L0.?[200,288] |-------------------------------------L0.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 88 files: L0.201, L0.202, L0.203, L0.204, L0.205, L0.206, L0.207, L0.208, L0.209, L0.210, L0.211, L0.212, L0.213, L0.214, L0.215, L0.216, L0.217, L0.218, L0.219, L0.220, L0.221, L0.222, L0.223, L0.224, L0.225, L0.226, L0.227, L0.228, L0.229, L0.230, L0.231, L0.232, L0.233, L0.234, L0.235, L0.236, L0.237, L0.238, L0.239, L0.240, L0.241, L0.242, L0.243, L0.244, L0.245, L0.246, L0.247, L0.248, L0.249, L0.250, L0.251, L0.252, L0.253, L0.254, L0.255, L0.256, L0.257, L0.258, L0.259, L0.260, L0.261, L0.262, L0.263, L0.264, L0.265, L0.266, L0.267, L0.268, L0.269, L0.270, L0.271, L0.272, L0.273, L0.274, L0.275, L0.276, L0.277, L0.278, L0.279, L0.280, L0.281, L0.282, L0.283, L0.284, L0.285, L0.286, L0.287, L0.288"
- " Creating 1 files at level CompactionLevel::L0"
- "**** Simulation run 2, type=compact. 2 Input Files, 1.97mb total:"
- "L0 "
- "L0.290[200,288] 616kb |--------L0.290--------| "
@ -739,6 +751,9 @@ async fn many_tiny_l0_files() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.97mb total:"
- "L1, all files 1.97mb "
- "L1.?[0,288] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 2 files: L0.289, L0.290"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1, all files 1.97mb "
- "L1.291[0,288] |------------------------------------L1.291------------------------------------|"
@ -1394,6 +1409,9 @@ async fn over_two_times_max_files_per_plan() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L0, all files 1.37mb "
- "L0.?[0,200] |-------------------------------------L0.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 200 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10, L0.11, L0.12, L0.13, L0.14, L0.15, L0.16, L0.17, L0.18, L0.19, L0.20, L0.21, L0.22, L0.23, L0.24, L0.25, L0.26, L0.27, L0.28, L0.29, L0.30, L0.31, L0.32, L0.33, L0.34, L0.35, L0.36, L0.37, L0.38, L0.39, L0.40, L0.41, L0.42, L0.43, L0.44, L0.45, L0.46, L0.47, L0.48, L0.49, L0.50, L0.51, L0.52, L0.53, L0.54, L0.55, L0.56, L0.57, L0.58, L0.59, L0.60, L0.61, L0.62, L0.63, L0.64, L0.65, L0.66, L0.67, L0.68, L0.69, L0.70, L0.71, L0.72, L0.73, L0.74, L0.75, L0.76, L0.77, L0.78, L0.79, L0.80, L0.81, L0.82, L0.83, L0.84, L0.85, L0.86, L0.87, L0.88, L0.89, L0.90, L0.91, L0.92, L0.93, L0.94, L0.95, L0.96, L0.97, L0.98, L0.99, L0.100, L0.101, L0.102, L0.103, L0.104, L0.105, L0.106, L0.107, L0.108, L0.109, L0.110, L0.111, L0.112, L0.113, L0.114, L0.115, L0.116, L0.117, L0.118, L0.119, L0.120, L0.121, L0.122, L0.123, L0.124, L0.125, L0.126, L0.127, L0.128, L0.129, L0.130, L0.131, L0.132, L0.133, L0.134, L0.135, L0.136, L0.137, L0.138, L0.139, L0.140, L0.141, L0.142, L0.143, L0.144, L0.145, L0.146, L0.147, L0.148, L0.149, L0.150, L0.151, L0.152, L0.153, L0.154, L0.155, L0.156, L0.157, L0.158, L0.159, L0.160, L0.161, L0.162, L0.163, L0.164, L0.165, L0.166, L0.167, L0.168, L0.169, L0.170, L0.171, L0.172, L0.173, L0.174, L0.175, L0.176, L0.177, L0.178, L0.179, L0.180, L0.181, L0.182, L0.183, L0.184, L0.185, L0.186, L0.187, L0.188, L0.189, L0.190, L0.191, L0.192, L0.193, L0.194, L0.195, L0.196, L0.197, L0.198, L0.199, L0.200"
- " Creating 1 files at level CompactionLevel::L0"
- "**** Simulation run 1, type=compact. 200 Input Files, 1.37mb total:"
- "L0, all files 7kb "
- "L0.201[200,201] |L0.201| "
@ -1599,6 +1617,9 @@ async fn over_two_times_max_files_per_plan() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L0, all files 1.37mb "
- "L0.?[200,400] |-------------------------------------L0.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 200 files: L0.201, L0.202, L0.203, L0.204, L0.205, L0.206, L0.207, L0.208, L0.209, L0.210, L0.211, L0.212, L0.213, L0.214, L0.215, L0.216, L0.217, L0.218, L0.219, L0.220, L0.221, L0.222, L0.223, L0.224, L0.225, L0.226, L0.227, L0.228, L0.229, L0.230, L0.231, L0.232, L0.233, L0.234, L0.235, L0.236, L0.237, L0.238, L0.239, L0.240, L0.241, L0.242, L0.243, L0.244, L0.245, L0.246, L0.247, L0.248, L0.249, L0.250, L0.251, L0.252, L0.253, L0.254, L0.255, L0.256, L0.257, L0.258, L0.259, L0.260, L0.261, L0.262, L0.263, L0.264, L0.265, L0.266, L0.267, L0.268, L0.269, L0.270, L0.271, L0.272, L0.273, L0.274, L0.275, L0.276, L0.277, L0.278, L0.279, L0.280, L0.281, L0.282, L0.283, L0.284, L0.285, L0.286, L0.287, L0.288, L0.289, L0.290, L0.291, L0.292, L0.293, L0.294, L0.295, L0.296, L0.297, L0.298, L0.299, L0.300, L0.301, L0.302, L0.303, L0.304, L0.305, L0.306, L0.307, L0.308, L0.309, L0.310, L0.311, L0.312, L0.313, L0.314, L0.315, L0.316, L0.317, L0.318, L0.319, L0.320, L0.321, L0.322, L0.323, L0.324, L0.325, L0.326, L0.327, L0.328, L0.329, L0.330, L0.331, L0.332, L0.333, L0.334, L0.335, L0.336, L0.337, L0.338, L0.339, L0.340, L0.341, L0.342, L0.343, L0.344, L0.345, L0.346, L0.347, L0.348, L0.349, L0.350, L0.351, L0.352, L0.353, L0.354, L0.355, L0.356, L0.357, L0.358, L0.359, L0.360, L0.361, L0.362, L0.363, L0.364, L0.365, L0.366, L0.367, L0.368, L0.369, L0.370, L0.371, L0.372, L0.373, L0.374, L0.375, L0.376, L0.377, L0.378, L0.379, L0.380, L0.381, L0.382, L0.383, L0.384, L0.385, L0.386, L0.387, L0.388, L0.389, L0.390, L0.391, L0.392, L0.393, L0.394, L0.395, L0.396, L0.397, L0.398, L0.399, L0.400"
- " Creating 1 files at level CompactionLevel::L0"
- "**** Simulation run 2, type=compact. 10 Input Files, 70kb total:"
- "L0, all files 7kb "
- "L0.401[400,401] |L0.401| "
@ -1614,6 +1635,9 @@ async fn over_two_times_max_files_per_plan() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 70kb total:"
- "L0, all files 70kb "
- "L0.?[400,410] |-------------------------------------L0.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 10 files: L0.401, L0.402, L0.403, L0.404, L0.405, L0.406, L0.407, L0.408, L0.409, L0.410"
- " Creating 1 files at level CompactionLevel::L0"
- "**** Simulation run 3, type=compact. 3 Input Files, 2.8mb total:"
- "L0 "
- "L0.413[400,410] 70kb |L0.413|"
@ -1622,6 +1646,9 @@ async fn over_two_times_max_files_per_plan() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 2.8mb total:"
- "L1, all files 2.8mb "
- "L1.?[0,410] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 3 files: L0.411, L0.412, L0.413"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1, all files 2.8mb "
- "L1.414[0,410] |------------------------------------L1.414------------------------------------|"
@ -2157,6 +2184,9 @@ async fn many_tiny_l1_files() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.37mb total:"
- "L1, all files 1.37mb "
- "L1.?[0,200] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 200 files: L1.1, L1.2, L1.3, L1.4, L1.5, L1.6, L1.7, L1.8, L1.9, L1.10, L1.11, L1.12, L1.13, L1.14, L1.15, L1.16, L1.17, L1.18, L1.19, L1.20, L1.21, L1.22, L1.23, L1.24, L1.25, L1.26, L1.27, L1.28, L1.29, L1.30, L1.31, L1.32, L1.33, L1.34, L1.35, L1.36, L1.37, L1.38, L1.39, L1.40, L1.41, L1.42, L1.43, L1.44, L1.45, L1.46, L1.47, L1.48, L1.49, L1.50, L1.51, L1.52, L1.53, L1.54, L1.55, L1.56, L1.57, L1.58, L1.59, L1.60, L1.61, L1.62, L1.63, L1.64, L1.65, L1.66, L1.67, L1.68, L1.69, L1.70, L1.71, L1.72, L1.73, L1.74, L1.75, L1.76, L1.77, L1.78, L1.79, L1.80, L1.81, L1.82, L1.83, L1.84, L1.85, L1.86, L1.87, L1.88, L1.89, L1.90, L1.91, L1.92, L1.93, L1.94, L1.95, L1.96, L1.97, L1.98, L1.99, L1.100, L1.101, L1.102, L1.103, L1.104, L1.105, L1.106, L1.107, L1.108, L1.109, L1.110, L1.111, L1.112, L1.113, L1.114, L1.115, L1.116, L1.117, L1.118, L1.119, L1.120, L1.121, L1.122, L1.123, L1.124, L1.125, L1.126, L1.127, L1.128, L1.129, L1.130, L1.131, L1.132, L1.133, L1.134, L1.135, L1.136, L1.137, L1.138, L1.139, L1.140, L1.141, L1.142, L1.143, L1.144, L1.145, L1.146, L1.147, L1.148, L1.149, L1.150, L1.151, L1.152, L1.153, L1.154, L1.155, L1.156, L1.157, L1.158, L1.159, L1.160, L1.161, L1.162, L1.163, L1.164, L1.165, L1.166, L1.167, L1.168, L1.169, L1.170, L1.171, L1.172, L1.173, L1.174, L1.175, L1.176, L1.177, L1.178, L1.179, L1.180, L1.181, L1.182, L1.183, L1.184, L1.185, L1.186, L1.187, L1.188, L1.189, L1.190, L1.191, L1.192, L1.193, L1.194, L1.195, L1.196, L1.197, L1.198, L1.199, L1.200"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Simulation run 1, type=compact. 88 Input Files, 616kb total:"
- "L1, all files 7kb "
- "L1.201[200,201] |L1.201| "
@ -2250,6 +2280,9 @@ async fn many_tiny_l1_files() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 616kb total:"
- "L1, all files 616kb "
- "L1.?[200,288] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 88 files: L1.201, L1.202, L1.203, L1.204, L1.205, L1.206, L1.207, L1.208, L1.209, L1.210, L1.211, L1.212, L1.213, L1.214, L1.215, L1.216, L1.217, L1.218, L1.219, L1.220, L1.221, L1.222, L1.223, L1.224, L1.225, L1.226, L1.227, L1.228, L1.229, L1.230, L1.231, L1.232, L1.233, L1.234, L1.235, L1.236, L1.237, L1.238, L1.239, L1.240, L1.241, L1.242, L1.243, L1.244, L1.245, L1.246, L1.247, L1.248, L1.249, L1.250, L1.251, L1.252, L1.253, L1.254, L1.255, L1.256, L1.257, L1.258, L1.259, L1.260, L1.261, L1.262, L1.263, L1.264, L1.265, L1.266, L1.267, L1.268, L1.269, L1.270, L1.271, L1.272, L1.273, L1.274, L1.275, L1.276, L1.277, L1.278, L1.279, L1.280, L1.281, L1.282, L1.283, L1.284, L1.285, L1.286, L1.287, L1.288"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1 "
- "L1.289[0,200] 1.37mb|-----------------------L1.289------------------------| "
@ -2719,6 +2752,9 @@ async fn many_l0_and_overlapped_l1_files() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 1.3mb total:"
- "L0, all files 1.3mb "
- "L0.?[0,190] |-------------------------------------L0.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 190 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10, L0.11, L0.12, L0.13, L0.14, L0.15, L0.16, L0.17, L0.18, L0.19, L0.20, L0.21, L0.22, L0.23, L0.24, L0.25, L0.26, L0.27, L0.28, L0.29, L0.30, L0.31, L0.32, L0.33, L0.34, L0.35, L0.36, L0.37, L0.38, L0.39, L0.40, L0.41, L0.42, L0.43, L0.44, L0.45, L0.46, L0.47, L0.48, L0.49, L0.50, L0.51, L0.52, L0.53, L0.54, L0.55, L0.56, L0.57, L0.58, L0.59, L0.60, L0.61, L0.62, L0.63, L0.64, L0.65, L0.66, L0.67, L0.68, L0.69, L0.70, L0.71, L0.72, L0.73, L0.74, L0.75, L0.76, L0.77, L0.78, L0.79, L0.80, L0.81, L0.82, L0.83, L0.84, L0.85, L0.86, L0.87, L0.88, L0.89, L0.90, L0.91, L0.92, L0.93, L0.94, L0.95, L0.96, L0.97, L0.98, L0.99, L0.100, L0.101, L0.102, L0.103, L0.104, L0.105, L0.106, L0.107, L0.108, L0.109, L0.110, L0.111, L0.112, L0.113, L0.114, L0.115, L0.116, L0.117, L0.118, L0.119, L0.120, L0.121, L0.122, L0.123, L0.124, L0.125, L0.126, L0.127, L0.128, L0.129, L0.130, L0.131, L0.132, L0.133, L0.134, L0.135, L0.136, L0.137, L0.138, L0.139, L0.140, L0.141, L0.142, L0.143, L0.144, L0.145, L0.146, L0.147, L0.148, L0.149, L0.150, L0.151, L0.152, L0.153, L0.154, L0.155, L0.156, L0.157, L0.158, L0.159, L0.160, L0.161, L0.162, L0.163, L0.164, L0.165, L0.166, L0.167, L0.168, L0.169, L0.170, L0.171, L0.172, L0.173, L0.174, L0.175, L0.176, L0.177, L0.178, L0.179, L0.180, L0.181, L0.182, L0.183, L0.184, L0.185, L0.186, L0.187, L0.188, L0.189, L0.190"
- " Creating 1 files at level CompactionLevel::L0"
- "**** Simulation run 1, type=split(split_times=[159]). 21 Input Files, 21.3mb total:"
- "L0 "
- "L0.211[0,190] 1.3mb |----------------------------------L0.211----------------------------------| "
@ -2747,6 +2783,9 @@ async fn many_l0_and_overlapped_l1_files() {
- "L1 "
- "L1.?[0,159] 17.02mb |----------------------------L1.?-----------------------------| "
- "L1.?[159,199] 4.28mb |-----L1.?-----| "
- "Committing partition 1:"
- " Soft Deleting 21 files: L1.191, L1.192, L1.193, L1.194, L1.195, L1.196, L1.197, L1.198, L1.199, L1.200, L1.201, L1.202, L1.203, L1.204, L1.205, L1.206, L1.207, L1.208, L1.209, L1.210, L0.211"
- " Creating 2 files at level CompactionLevel::L1"
- "**** Final Output Files "
- "L1 "
- "L1.212[0,159] 17.02mb|---------------------------L1.212----------------------------| "
@ -3237,6 +3276,9 @@ async fn not_many_l0_and_overlapped_l1_files() {
- "**** 1 Output Files (parquet_file_id not yet assigned), 6.3mb total:"
- "L1, all files 6.3mb "
- "L1.?[0,190] |-------------------------------------L1.?-------------------------------------|"
- "Committing partition 1:"
- " Soft Deleting 195 files: L0.1, L0.2, L0.3, L0.4, L0.5, L0.6, L0.7, L0.8, L0.9, L0.10, L0.11, L0.12, L0.13, L0.14, L0.15, L0.16, L0.17, L0.18, L0.19, L0.20, L0.21, L0.22, L0.23, L0.24, L0.25, L0.26, L0.27, L0.28, L0.29, L0.30, L0.31, L0.32, L0.33, L0.34, L0.35, L0.36, L0.37, L0.38, L0.39, L0.40, L0.41, L0.42, L0.43, L0.44, L0.45, L0.46, L0.47, L0.48, L0.49, L0.50, L0.51, L0.52, L0.53, L0.54, L0.55, L0.56, L0.57, L0.58, L0.59, L0.60, L0.61, L0.62, L0.63, L0.64, L0.65, L0.66, L0.67, L0.68, L0.69, L0.70, L0.71, L0.72, L0.73, L0.74, L0.75, L0.76, L0.77, L0.78, L0.79, L0.80, L0.81, L0.82, L0.83, L0.84, L0.85, L0.86, L0.87, L0.88, L0.89, L0.90, L0.91, L0.92, L0.93, L0.94, L0.95, L0.96, L0.97, L0.98, L0.99, L0.100, L0.101, L0.102, L0.103, L0.104, L0.105, L0.106, L0.107, L0.108, L0.109, L0.110, L0.111, L0.112, L0.113, L0.114, L0.115, L0.116, L0.117, L0.118, L0.119, L0.120, L0.121, L0.122, L0.123, L0.124, L0.125, L0.126, L0.127, L0.128, L0.129, L0.130, L0.131, L0.132, L0.133, L0.134, L0.135, L0.136, L0.137, L0.138, L0.139, L0.140, L0.141, L0.142, L0.143, L0.144, L0.145, L0.146, L0.147, L0.148, L0.149, L0.150, L0.151, L0.152, L0.153, L0.154, L0.155, L0.156, L0.157, L0.158, L0.159, L0.160, L0.161, L0.162, L0.163, L0.164, L0.165, L0.166, L0.167, L0.168, L0.169, L0.170, L0.171, L0.172, L0.173, L0.174, L0.175, L0.176, L0.177, L0.178, L0.179, L0.180, L0.181, L0.182, L0.183, L0.184, L0.185, L0.186, L0.187, L0.188, L0.189, L0.190, L1.191, L1.192, L1.193, L1.194, L1.195"
- " Creating 1 files at level CompactionLevel::L1"
- "**** Simulation run 1, type=split(split_times=[2407]). 16 Input Files, 21.3mb total:"
- "L1 "
- "L1.196[200,209] 1mb |L1.196| "
@ -3259,6 +3301,9 @@ async fn not_many_l0_and_overlapped_l1_files() {
- "L2 "
- "L2.?[0,2407] 17.04mb|----------------------------L2.?-----------------------------| "
- "L2.?[2407,3009] 4.26mb |-----L2.?-----| "
- "Committing partition 1:"
- " Soft Deleting 16 files: L1.196, L1.197, L1.198, L1.199, L1.200, L1.201, L1.202, L1.203, L1.204, L1.205, L1.206, L1.207, L1.208, L1.209, L1.210, L1.211"
- " Creating 2 files at level CompactionLevel::L2"
- "**** Final Output Files "
- "L2 "
- "L2.212[0,2407] 17.04mb|---------------------------L2.212----------------------------| "

View File

@ -113,7 +113,7 @@ pub(crate) async fn run_layout_scenario(setup: &TestSetup) -> Vec<String> {
let compact_result = setup.run_compact().await;
// record what the compactor actually did
output.extend(compact_result.simulator_runs);
output.extend(compact_result.run_log);
// Record any skipped compactions (is after what the compactor actually did)
output.extend(get_skipped_compactions(setup).await);

View File

@ -0,0 +1,103 @@
//! Handles recording commit information to the test run log
use async_trait::async_trait;
use compactor2::{Commit, CommitWrapper};
use data_types::{CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId};
use std::{
fmt::Display,
sync::{Arc, Mutex},
};
use crate::display::ParquetFileInfo;
/// Records catalog operations to a shared run_log during tests
#[derive(Debug)]
struct CommitRecorder {
/// The inner commit that does the work
inner: Arc<(dyn Commit)>,
/// a log of what happened during this test
run_log: Arc<Mutex<Vec<String>>>,
}
#[async_trait]
impl Commit for CommitRecorder {
async fn commit(
&self,
partition_id: PartitionId,
delete: &[ParquetFile],
upgrade: &[ParquetFile],
create: &[ParquetFileParams],
target_level: CompactionLevel,
) -> Vec<ParquetFileId> {
// lock scope
{
let mut run_log = self.run_log.lock().unwrap();
run_log.push(format!("Committing partition {partition_id}:"));
if !delete.is_empty() {
run_log.push(format!(
" Soft Deleting {} files: {}",
delete.len(),
id_list(delete)
));
}
if !upgrade.is_empty() {
run_log.push(format!(
" Upgrading {} files level to {}: {}",
upgrade.len(),
target_level,
id_list(upgrade)
));
}
if !create.is_empty() {
run_log.push(format!(
" Creating {} files at level {}",
create.len(),
target_level
));
}
}
self.inner
.commit(partition_id, delete, upgrade, create, target_level)
.await
}
}
fn id_list(files: &[ParquetFile]) -> String {
// sort by id to get a more consistent display
let mut files: Vec<_> = files.iter().collect();
files.sort_by_key(|f| f.id);
let files: Vec<_> = files.iter().map(|f| f.display_id()).collect();
files.join(", ")
}
impl Display for CommitRecorder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "CommitRecorder")
}
}
/// Wraps another commit reference in a CommitRecorder
#[derive(Debug)]
pub(crate) struct CommitRecorderBuilder {
/// a shared log of what happened during a simulated run
run_log: Arc<Mutex<Vec<String>>>,
}
impl CommitRecorderBuilder {
pub fn new(run_log: Arc<Mutex<Vec<String>>>) -> Self {
Self { run_log }
}
}
impl CommitWrapper for CommitRecorderBuilder {
fn wrap(&self, inner: Arc<(dyn Commit)>) -> Arc<(dyn Commit + 'static)> {
let run_log = Arc::clone(&self.run_log);
Arc::new(CommitRecorder { inner, run_log })
}
}

View File

@ -12,14 +12,22 @@
clippy::dbg_macro
)]
mod commit_wrapper;
mod display;
mod simulator;
use commit_wrapper::CommitRecorderBuilder;
pub use display::{display_size, format_files, format_files_split};
use iox_query::exec::ExecutorType;
use simulator::ParquetFileSimulator;
use tracker::AsyncSemaphoreMetrics;
use std::{collections::HashSet, future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
use std::{
collections::HashSet,
future::Future,
num::NonZeroUsize,
sync::{Arc, Mutex},
time::Duration,
};
use async_trait::async_trait;
use backoff::BackoffConfig;
@ -59,6 +67,8 @@ pub struct TestSetupBuilder<const WITH_FILES: bool> {
table: Arc<TestTable>,
partition: Arc<TestPartition>,
files: Vec<ParquetFile>,
/// a shared log of what happened during the test
run_log: Arc<Mutex<Vec<String>>>,
}
impl TestSetupBuilder<false> {
@ -84,6 +94,10 @@ impl TestSetupBuilder<false> {
let sort_key = SortKey::from_columns(["tag1", "tag2", "tag3", "time"]);
let partition = partition.update_sort_key(sort_key.clone()).await;
// intercept commit calls and record them as well
let run_log = Arc::new(Mutex::new(vec![]));
let commit_wrapper = CommitRecorderBuilder::new(Arc::clone(&run_log));
let config = Config {
shard_id: shard.shard.id,
metric_registry: catalog.metric_registry(),
@ -113,6 +127,7 @@ impl TestSetupBuilder<false> {
process_once: true,
simulate_without_object_store: false,
parquet_files_sink_override: None,
commit_wrapper: Some(Arc::new(commit_wrapper)),
all_errors_are_fatal: true,
max_num_columns_per_table: 200,
max_num_files_per_plan: 200,
@ -126,6 +141,7 @@ impl TestSetupBuilder<false> {
table,
partition,
files: vec![],
run_log,
}
}
@ -245,6 +261,7 @@ impl TestSetupBuilder<false> {
table: self.table,
partition: self.partition,
files,
run_log: Arc::new(Mutex::new(vec![])),
}
}
}
@ -287,8 +304,10 @@ impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
/// set simulate_without_object_store
pub fn simulate_without_object_store(mut self) -> Self {
let run_log = Arc::clone(&self.run_log);
self.config.simulate_without_object_store = true;
self.config.parquet_files_sink_override = Some(Arc::new(ParquetFileSimulator::new()));
self.config.parquet_files_sink_override =
Some(Arc::new(ParquetFileSimulator::new(run_log)));
self
}
@ -338,6 +357,7 @@ impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
table: self.table,
partition: self.partition,
config: Arc::new(self.config),
run_log: self.run_log,
}
}
}
@ -357,6 +377,8 @@ pub struct TestSetup {
pub partition: Arc<TestPartition>,
/// The compactor2 configuration
pub config: Arc<Config>,
/// a shared log of what happened during a simulated run
run_log: Arc<Mutex<Vec<String>>>,
}
impl TestSetup {
@ -400,6 +422,9 @@ impl TestSetup {
}
async fn run_compact_impl(&self, components: Arc<Components>) -> CompactResult {
// clear any existing log entries, if any
self.run_log.lock().unwrap().clear();
let config = Arc::clone(&self.config);
let job_semaphore = Arc::new(
Arc::new(AsyncSemaphoreMetrics::new(&config.metric_registry, [])).new_semaphore(10),
@ -426,24 +451,16 @@ impl TestSetup {
.await;
// get the results
let simulator_runs = if let Some(simulator) = components
.parquet_files_sink
.as_any()
.downcast_ref::<ParquetFileSimulator>()
{
simulator.runs()
} else {
vec![]
};
CompactResult { simulator_runs }
CompactResult {
run_log: self.run_log.lock().unwrap().clone(),
}
}
}
/// Information about the compaction that was run
pub struct CompactResult {
/// [`ParquetFileSimulator`] output, if enabled
pub simulator_runs: Vec<String>,
pub run_log: Vec<String>,
}
/// A collection of nanosecond timestamps relative to now

View File

@ -1,6 +1,9 @@
use std::{
collections::BTreeSet,
sync::{Arc, Mutex},
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
};
use async_trait::async_trait;
@ -44,7 +47,10 @@ use crate::{display::total_size, display_size, format_files};
/// uniform distribution.
#[derive(Debug, Default)]
pub struct ParquetFileSimulator {
runs: Arc<Mutex<Vec<SimulatedRun>>>,
/// entries that are added to while running
run_log: Arc<Mutex<Vec<String>>>,
/// Used to generate run ids for display
run_id_generator: AtomicUsize,
}
impl std::fmt::Display for ParquetFileSimulator {
@ -54,47 +60,17 @@ impl std::fmt::Display for ParquetFileSimulator {
}
impl ParquetFileSimulator {
/// Create a new simulator for creating parquet files
pub fn new() -> Self {
/// Create a new simulator for creating parquet files, which
/// appends its output to `run_log`
pub fn new(run_log: Arc<Mutex<Vec<String>>>) -> Self {
Self {
runs: Arc::new(Mutex::new(vec![])),
run_log,
run_id_generator: AtomicUsize::new(0),
}
}
/// Get a visual display of the simulated runs this simulator
/// has performed, for use in tests
pub fn runs(&self) -> Vec<String> {
let runs = (*self.runs.lock().unwrap()).clone();
runs.into_iter()
.enumerate()
.flat_map(|(i, run)| {
let SimulatedRun {
plan_type,
input_parquet_files,
output_params,
} = run;
let input_title = format!(
"**** Simulation run {}, type={}. {} Input Files, {} total:",
i,
plan_type,
input_parquet_files.len(),
display_size(total_size(&input_parquet_files))
);
let output_title = format!(
"**** {} Output Files (parquet_file_id not yet assigned), {} total:",
output_params.len(),
display_size(total_size(&output_params))
);
// hook up inputs and outputs
format_files(input_title, &input_parquet_files)
.into_iter()
.chain(format_files(output_title, &output_params).into_iter())
})
.collect()
fn next_run_id(&self) -> usize {
self.run_id_generator.fetch_add(1, Ordering::SeqCst)
}
}
@ -153,11 +129,13 @@ impl ParquetFilesSink for ParquetFileSimulator {
.collect();
// record what the simulator did
self.runs.lock().unwrap().push(SimulatedRun {
let run = SimulatedRun {
run_id: self.next_run_id(),
plan_type,
input_parquet_files,
output_params: output_params.clone(),
});
};
self.run_log.lock().unwrap().extend(run.into_strings());
Ok(output_params)
}
@ -238,12 +216,44 @@ impl SimulatedFile {
/// purposes
#[derive(Debug, Clone)]
pub struct SimulatedRun {
run_id: usize,
// fields are used in testing
plan_type: String,
input_parquet_files: Vec<ParquetFile>,
output_params: Vec<ParquetFileParams>,
}
impl SimulatedRun {
/// Convert this simulated run into a set of human readable strings
fn into_strings(self) -> impl Iterator<Item = String> {
let Self {
run_id,
plan_type,
input_parquet_files,
output_params,
} = self;
let input_title = format!(
"**** Simulation run {}, type={}. {} Input Files, {} total:",
run_id,
plan_type,
input_parquet_files.len(),
display_size(total_size(&input_parquet_files))
);
let output_title = format!(
"**** {} Output Files (parquet_file_id not yet assigned), {} total:",
output_params.len(),
display_size(total_size(&output_params))
);
// hook up inputs and outputs
format_files(input_title, &input_parquet_files)
.into_iter()
.chain(format_files(output_title, &output_params).into_iter())
}
}
fn overall_column_set<'a>(files: impl IntoIterator<Item = &'a ParquetFile>) -> ColumnSet {
let all_columns = files
.into_iter()

View File

@ -202,6 +202,7 @@ pub async fn create_compactor2_server_type(
process_once: compactor_config.process_once,
simulate_without_object_store: false,
parquet_files_sink_override: None,
commit_wrapper: None,
all_errors_are_fatal: false,
max_num_columns_per_table: compactor_config.max_num_columns_per_table,
max_num_files_per_plan: compactor_config.max_num_files_per_plan,