feat(compactor): Add more tests; show sizes in the simulator run display (#6981)

* refactor: Split layout tests into their own module

* feat: Add more tests; show sizes in the simulator run display

* fix: Apply suggestions from code review

Co-authored-by: Nga Tran <nga-tran@live.com>

* fix: fix comment wording

* fix: reporting order of skipped compactions

* chore: Run cargo hakari tasks

* fix: revert changes to Cargo.lock

* fix: revert workspace hack change

---------

Co-authored-by: Nga Tran <nga-tran@live.com>
Co-authored-by: CircleCI[bot] <circleci@influxdata.com>
Branch: pull/24376/head
Author: Andrew Lamb, 2023-02-14 20:34:19 +01:00 (committed by GitHub)
Commit: 04bd47e64a (parent: ed007cb71f)
5 changed files with 1178 additions and 239 deletions

@@ -1,11 +1,10 @@
use std::time::Duration;
use arrow_util::assert_batches_sorted_eq;
use data_types::{CompactionLevel, ParquetFile, PartitionId};
use iox_tests::TestParquetFileBuilder;
use compactor2::config::AlgoVersion;
-use compactor2_test_utils::{format_files, list_object_store, TestSetup, TestSetupBuilder};
+use compactor2_test_utils::{list_object_store, TestSetup};
+mod layouts;
#[tokio::test]
async fn test_compact_no_file() {
@@ -526,233 +525,3 @@ async fn assert_skipped_compactions<const N: usize>(
assert_eq!(actual, expected);
}
// ----------------------------
// ----- Begin Layout Tests ----
// (TODO move these to a separate module)
// ----------------------------
const ONE_MB: u64 = 1024 * 1024;
/// Creates a TestParquetFileBuilder configured for layout tests
fn parquet_builder() -> TestParquetFileBuilder {
TestParquetFileBuilder::default()
.with_compaction_level(CompactionLevel::Initial)
// need some LP to generate the schema
.with_line_protocol("table,tag1=A,tag2=B,tag3=C field_int=1i 100")
}
/// Creates the default TestSetupBuilder for layout tests
async fn layout_setup_builder() -> TestSetupBuilder<false> {
// Goal is to keep these as close to the compactor defaults (in
// clap_blocks) as possible so we can predict what the compactor
// will do in production with default settings
TestSetup::builder()
.await
.with_compact_version(AlgoVersion::TargetLevel)
.with_min_num_l1_files_to_compact(10)
.simulate_without_object_store()
}
/// Runs the scenario and returns string-based output for comparison
async fn run_layout_scenario(setup: &TestSetup) -> Vec<String> {
setup.catalog.time_provider.inc(Duration::from_nanos(200));
// run the actual compaction
let compact_result = setup.run_compact().await;
assert_skipped_compactions(setup, []).await;
// record what the compactor actually did
let mut output = compact_result.simulator_runs;
// record the final state of the catalog
let output_files = setup.list_by_table_not_to_delete().await;
output.extend(format_files("**** Final Output Files ", &output_files));
output
}
#[tokio::test]
async fn layout_all_overlapping() {
test_helpers::maybe_start_logging();
let setup = layout_setup_builder()
.await
.with_max_desired_file_size_bytes(20 * ONE_MB)
.build()
.await;
// Create 10 fully overlapping 1MB L0 files
for _ in 0..10 {
setup
.partition
.create_parquet_file(
parquet_builder()
.with_min_time(100)
.with_max_time(200)
.with_file_size_bytes(ONE_MB),
)
.await;
}
insta::assert_yaml_snapshot!(
run_layout_scenario(&setup).await,
@r###"
---
- "**** Simulation run 0, type=split(split_times=[180]). Input Files:"
- "L0, all files 1mb "
- "L0.10[100,200] |------------------------------------L0.10-------------------------------------|"
- "L0.9[100,200] |-------------------------------------L0.9-------------------------------------|"
- "L0.8[100,200] |-------------------------------------L0.8-------------------------------------|"
- "L0.7[100,200] |-------------------------------------L0.7-------------------------------------|"
- "L0.6[100,200] |-------------------------------------L0.6-------------------------------------|"
- "L0.5[100,200] |-------------------------------------L0.5-------------------------------------|"
- "L0.4[100,200] |-------------------------------------L0.4-------------------------------------|"
- "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|"
- "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|"
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
- "**** Final Output Files "
- "L1 "
- "L1.11[100,180] 8mb |----------------------------L1.11-----------------------------| "
- "L1.12[180,200] 2mb |----L1.12-----|"
"###
);
}
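A sanity check on these sizes (an inference from the snapshot, not documented behavior): the simulator appears to apportion output bytes proportionally to each output file's time range. The ten 1MB L0 files total 10MB over [100,200], so splitting at 180 gives L1.11 80% of the range (8MB) and L1.12 the remaining 20% (2MB).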
#[tokio::test]
async fn layout_l1_with_new_non_overlapping_l0() {
test_helpers::maybe_start_logging();
let five_kb = 5 * 1024;
let setup = layout_setup_builder()
.await
.with_max_desired_file_size_bytes(100 * ONE_MB)
.build()
.await;
// Model several non-overlapping L1 files plus newly written L0 files
// that do not overlap the L1 files
//
// L1: 100MB, 100MB, 100MB, 100MB
// L0: 5k, 5k, 5k, 5k, 5k (all non overlapping with the L1 files)
for i in 0..4 {
setup
.partition
.create_parquet_file(
parquet_builder()
.with_min_time(50 + i * 50)
.with_max_time(100 + i * 50)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(100 * ONE_MB),
)
.await;
}
for i in 0..5 {
setup
.partition
.create_parquet_file(
parquet_builder()
.with_min_time(300 + i * 50)
.with_max_time(350 + i * 50)
.with_file_size_bytes(five_kb),
)
.await;
}
setup.catalog.time_provider.inc(Duration::from_nanos(200));
insta::assert_yaml_snapshot!(
run_layout_scenario(&setup).await,
@r###"
---
- "**** Simulation run 0, type=compact. Input Files:"
- "L0, all files 5kb "
- "L0.9[500,550] |-----L0.9-----|"
- "L0.8[450,500] |-----L0.8-----| "
- "L0.7[400,450] |-----L0.7-----| "
- "L0.6[350,400] |-----L0.6-----| "
- "L0.5[300,350] |-----L0.5-----| "
- "**** Simulation run 1, type=split(split_times=[175, 300, 425]). Input Files:"
- "L1 "
- "L1.4[200,250] 100mb |-L1.4-| "
- "L1.3[150,200] 100mb |-L1.3-| "
- "L1.2[100,150] 100mb |-L1.2-| "
- "L1.1[50,100] 100mb |-L1.1-| "
- "L1.10[300,550] 25kb |----------------L1.10-----------------|"
- "**** Final Output Files "
- "L2, all files 100.01mb "
- "L2.11[50,175] |------L2.11-------| "
- "L2.12[175,300] |------L2.12-------| "
- "L2.13[300,425] |------L2.13-------| "
- "L2.14[425,550] |------L2.14-------|"
"###
);
}
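The same proportional model explains this snapshot: run 0 compacts five 5KB L0 files into L1.10 (25KB), and run 1 splits the 400MB of L1 plus that 25KB across four equal 125-unit time spans, giving roughly 100.01MB apiece.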
#[tokio::test]
async fn layout_l1_with_new_non_overlapping_l0_larger() {
test_helpers::maybe_start_logging();
let one_mb = 1024 * 1024;
let setup = layout_setup_builder()
.await
.with_max_desired_file_size_bytes(100 * ONE_MB)
.build()
.await;
// Model several non-overlapping L1 files plus newly written L0 files
// that also do not overlap the L1 files
//
// L1: 20MB, 50MB, 20MB, 3MB
// L0: 5MB, 5MB, 5MB
for (i, sz) in [20, 50, 20, 3].iter().enumerate() {
let i = i as i64;
setup
.partition
.create_parquet_file(
parquet_builder()
.with_min_time(50 + i * 50)
.with_max_time(100 + i * 50)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.with_file_size_bytes(sz * one_mb),
)
.await;
}
for i in 0..3 {
setup
.partition
.create_parquet_file(
parquet_builder()
.with_min_time(300 + i * 50)
.with_max_time(350 + i * 50)
.with_file_size_bytes(5 * one_mb),
)
.await;
}
setup.catalog.time_provider.inc(Duration::from_nanos(200));
insta::assert_yaml_snapshot!(
run_layout_scenario(&setup).await,
@r###"
---
- "**** Simulation run 0, type=split(split_times=[420]). Input Files:"
- "L0, all files 5mb "
- "L0.7[400,450] |----------L0.7----------| "
- "L0.6[350,400] |----------L0.6----------| "
- "L0.5[300,350] |----------L0.5----------| "
- "**** Simulation run 1, type=split(split_times=[421]). Input Files:"
- "L1 "
- "L1.4[200,250] 3mb |--L1.4--| "
- "L1.3[150,200] 20mb |--L1.3--| "
- "L1.2[100,150] 50mb |--L1.2--| "
- "L1.1[50,100] 20mb |--L1.1--| "
- "L1.9[420,450] 3mb |L1.9|"
- "L1.8[300,420] 12mb |---------L1.8---------| "
- "**** Final Output Files "
- "L2 "
- "L2.10[50,421] 100.17mb|---------------------------------L2.10----------------------------------| "
- "L2.11[421,450] 7.83mb |L2.11|"
"###
);
}
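Same pattern once more: run 0 splits 15MB of L0 spanning [300,450] at t=420 into 12MB and 3MB pieces, and the final split of the 108MB total over [50,450] at t=421 yields 371/400 × 108MB ≈ 100.17MB and 29/400 × 108MB ≈ 7.83MB, matching the snapshot.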

(File diff suppressed because it is too large)

@@ -260,7 +260,8 @@ fn display_file_id(file: &ParquetFile) -> String {
format!("{level}.{id}")
}
-fn display_size(sz: i64) -> String {
+/// Format a size for reasonable human reading
+pub fn display_size(sz: i64) -> String {
let kbyte = 1024.0;
let mbyte = 1024.0 * kbyte;
let gbyte = 1024.0 * mbyte;
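The rest of the function falls outside the hunk. As a hedged reconstruction, a formatter consistent with the constants above and with sizes seen in the snapshots ("5kb", "8mb", "100.01mb") could look like the sketch below; the unit thresholds and the round2 helper are assumptions, not the crate's actual code.

/// Hypothetical sketch of display_size; thresholds and two-decimal
/// rounding are inferred from snapshot output, not taken from the crate.
pub fn display_size(sz: i64) -> String {
    let kbyte = 1024.0;
    let mbyte = 1024.0 * kbyte;
    let gbyte = 1024.0 * mbyte;
    let sz = sz as f64;
    if sz < kbyte {
        format!("{sz}b")
    } else if sz < mbyte {
        format!("{}kb", round2(sz / kbyte))
    } else if sz < gbyte {
        format!("{}mb", round2(sz / mbyte))
    } else {
        format!("{}gb", round2(sz / gbyte))
    }
}

/// Round to two decimals; f64's Display drops trailing zeros, so
/// 8.0 prints as "8" and 100.0058 as "100.01".
fn round2(v: f64) -> f64 {
    (v * 100.0).round() / 100.0
}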

@@ -14,7 +14,7 @@
mod display;
mod simulator;
-pub use display::{format_files, format_files_split};
+pub use display::{display_size, format_files, format_files_split};
use iox_query::exec::ExecutorType;
use simulator::ParquetFileSimulator;
use tracker::AsyncSemaphoreMetrics;
@@ -319,6 +319,27 @@ impl<const WITH_FILES: bool> TestSetupBuilder<WITH_FILES> {
self
}
/// Set percentage_max_file_size
pub fn with_percentage_max_file_size(mut self, percentage_max_file_size: u16) -> Self {
self.config.percentage_max_file_size = percentage_max_file_size;
self
}
/// Set split_percentage
pub fn with_split_percentage(mut self, split_percentage: u16) -> Self {
self.config.split_percentage = split_percentage;
self
}
/// Set max_input_parquet_bytes_per_partition
pub fn with_max_input_parquet_bytes_per_partition(
mut self,
max_input_parquet_bytes_per_partition: usize,
) -> Self {
self.config.max_input_parquet_bytes_per_partition = max_input_parquet_bytes_per_partition;
self
}
/// Create a [`TestSetup`]
pub async fn build(self) -> TestSetup {
let candidate_partition = Arc::new(PartitionInfo {
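For orientation, a sketch of how a layout test could chain the new setters onto the layout_setup_builder() helper shown earlier; the test name and parameter values are illustrative, not compactor defaults.

#[tokio::test]
async fn layout_with_custom_limits() {
    // Illustrative only: exercises the three setters added in this hunk.
    let setup = layout_setup_builder()
        .await
        .with_percentage_max_file_size(30)
        .with_split_percentage(80)
        .with_max_input_parquet_bytes_per_partition(300 * 1024 * 1024)
        .build()
        .await;
    // ... create parquet files and call run_layout_scenario(&setup)
    // exactly as the layout tests above do ...
    let _ = setup;
}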

@@ -14,7 +14,7 @@ use uuid::Uuid;
use compactor2::{DynError, ParquetFilesSink, PartitionInfo, PlanIR};
-use crate::format_files;
+use crate::{display_size, format_files};
/// Simulates the result of running a compaction plan that
/// produces multiple parquet files.
@@ -69,9 +69,17 @@ impl ParquetFileSimulator {
runs.into_iter()
.enumerate()
.flat_map(|(i, run)| {
+let total_input_size: i64 = run
+    .input_parquet_files
+    .iter()
+    .map(|f| f.file_size_bytes)
+    .sum();
let title = format!(
-    "**** Simulation run {}, type={}. Input Files:",
-    i, run.plan_type
+    "**** Simulation run {}, type={}. {} Input Files, {} total:",
+    i,
+    run.plan_type,
+    run.input_parquet_files.len(),
+    display_size(total_input_size)
);
format_files(title, &run.input_parquet_files)
})