test: more compactor layout tests (#6988)
* test: more compactor layout tests * chore: address review commentspull/24376/head
parent
04bd47e64a
commit
0ffb211c54
|
@ -88,11 +88,11 @@ pub struct Compactor2Config {
|
||||||
|
|
||||||
/// Desired max size of compacted parquet files.
|
/// Desired max size of compacted parquet files.
|
||||||
/// It is a target desired value, rather than a guarantee.
|
/// It is a target desired value, rather than a guarantee.
|
||||||
/// 1024 * 1024 * 25 = 26,214,400 (25MB)
|
/// 1024 * 1024 * 100 = 104,857,600
|
||||||
#[clap(
|
#[clap(
|
||||||
long = "compaction-max-desired-size-bytes",
|
long = "compaction-max-desired-size-bytes",
|
||||||
env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES",
|
env = "INFLUXDB_IOX_COMPACTION_MAX_DESIRED_FILE_SIZE_BYTES",
|
||||||
default_value = "26214400",
|
default_value = "104857600",
|
||||||
action
|
action
|
||||||
)]
|
)]
|
||||||
pub max_desired_file_size_bytes: u64,
|
pub max_desired_file_size_bytes: u64,
|
||||||
|
|
|
@ -97,5 +97,12 @@ mod tests {
|
||||||
Arc::new(FalsePartitionFilter::new()),
|
Arc::new(FalsePartitionFilter::new()),
|
||||||
]);
|
]);
|
||||||
assert!(!filter.apply(&p_info, &[]).await.unwrap());
|
assert!(!filter.apply(&p_info, &[]).await.unwrap());
|
||||||
|
|
||||||
|
let filter = OrPartitionFilter::new(vec![
|
||||||
|
Arc::new(FalsePartitionFilter::new()),
|
||||||
|
Arc::new(FalsePartitionFilter::new()),
|
||||||
|
Arc::new(TruePartitionFilter::new()),
|
||||||
|
]);
|
||||||
|
assert!(filter.apply(&p_info, &[]).await.unwrap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ async fn layout_setup_builder() -> TestSetupBuilder<false> {
|
||||||
.with_max_input_files_per_partition(200)
|
.with_max_input_files_per_partition(200)
|
||||||
.with_max_input_parquet_bytes_per_partition(256 * ONE_MB as usize)
|
.with_max_input_parquet_bytes_per_partition(256 * ONE_MB as usize)
|
||||||
.with_min_num_l1_files_to_compact(10)
|
.with_min_num_l1_files_to_compact(10)
|
||||||
.with_max_desired_file_size_bytes(256 * ONE_MB)
|
.with_max_desired_file_size_bytes(100 * ONE_MB)
|
||||||
.simulate_without_object_store()
|
.simulate_without_object_store()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,7 +88,7 @@ async fn all_overlapping_l0() {
|
||||||
parquet_builder()
|
parquet_builder()
|
||||||
.with_min_time(100)
|
.with_min_time(100)
|
||||||
.with_max_time(200)
|
.with_max_time(200)
|
||||||
.with_file_size_bytes(10 * ONE_MB),
|
.with_file_size_bytes(9 * ONE_MB),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
@ -98,7 +98,7 @@ async fn all_overlapping_l0() {
|
||||||
@r###"
|
@r###"
|
||||||
---
|
---
|
||||||
- "**** Input Files "
|
- "**** Input Files "
|
||||||
- "L0, all files 10mb "
|
- "L0, all files 9mb "
|
||||||
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
|
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
|
||||||
- "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|"
|
- "L0.2[100,200] |-------------------------------------L0.2-------------------------------------|"
|
||||||
- "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|"
|
- "L0.3[100,200] |-------------------------------------L0.3-------------------------------------|"
|
||||||
|
@ -109,8 +109,8 @@ async fn all_overlapping_l0() {
|
||||||
- "L0.8[100,200] |-------------------------------------L0.8-------------------------------------|"
|
- "L0.8[100,200] |-------------------------------------L0.8-------------------------------------|"
|
||||||
- "L0.9[100,200] |-------------------------------------L0.9-------------------------------------|"
|
- "L0.9[100,200] |-------------------------------------L0.9-------------------------------------|"
|
||||||
- "L0.10[100,200] |------------------------------------L0.10-------------------------------------|"
|
- "L0.10[100,200] |------------------------------------L0.10-------------------------------------|"
|
||||||
- "**** Simulation run 0, type=split(split_times=[180]). 10 Input Files, 100mb total:"
|
- "**** Simulation run 0, type=split(split_times=[180]). 10 Input Files, 90mb total:"
|
||||||
- "L0, all files 10mb "
|
- "L0, all files 9mb "
|
||||||
- "L0.10[100,200] |------------------------------------L0.10-------------------------------------|"
|
- "L0.10[100,200] |------------------------------------L0.10-------------------------------------|"
|
||||||
- "L0.9[100,200] |-------------------------------------L0.9-------------------------------------|"
|
- "L0.9[100,200] |-------------------------------------L0.9-------------------------------------|"
|
||||||
- "L0.8[100,200] |-------------------------------------L0.8-------------------------------------|"
|
- "L0.8[100,200] |-------------------------------------L0.8-------------------------------------|"
|
||||||
|
@ -123,8 +123,8 @@ async fn all_overlapping_l0() {
|
||||||
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
|
- "L0.1[100,200] |-------------------------------------L0.1-------------------------------------|"
|
||||||
- "**** Final Output Files "
|
- "**** Final Output Files "
|
||||||
- "L1 "
|
- "L1 "
|
||||||
- "L1.11[100,180] 80mb |----------------------------L1.11-----------------------------| "
|
- "L1.11[100,180] 72mb |----------------------------L1.11-----------------------------| "
|
||||||
- "L1.12[180,200] 20mb |----L1.12-----|"
|
- "L1.12[180,200] 18mb |----L1.12-----|"
|
||||||
"###
|
"###
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -263,7 +263,7 @@ async fn l1_with_non_overlapping_l0() {
|
||||||
// Model several non overlapping L1 file and new L0 files written
|
// Model several non overlapping L1 file and new L0 files written
|
||||||
// that are not overlapping
|
// that are not overlapping
|
||||||
//
|
//
|
||||||
// L1: 100MB, 100MB
|
// L1: 10MB, 10MB
|
||||||
// L0: 5k, 5k, 5k, 5k, 5k (all non overlapping with the L1 files)
|
// L0: 5k, 5k, 5k, 5k, 5k (all non overlapping with the L1 files)
|
||||||
for i in 0..2 {
|
for i in 0..2 {
|
||||||
setup
|
setup
|
||||||
|
@ -273,7 +273,7 @@ async fn l1_with_non_overlapping_l0() {
|
||||||
.with_min_time(50 + i * 50)
|
.with_min_time(50 + i * 50)
|
||||||
.with_max_time(100 + i * 50)
|
.with_max_time(100 + i * 50)
|
||||||
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||||
.with_file_size_bytes(100 * ONE_MB),
|
.with_file_size_bytes(10 * ONE_MB),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
@ -301,8 +301,8 @@ async fn l1_with_non_overlapping_l0() {
|
||||||
- "L0.6[450,500] 5kb |-L0.6-| "
|
- "L0.6[450,500] 5kb |-L0.6-| "
|
||||||
- "L0.7[500,550] 5kb |-L0.7-|"
|
- "L0.7[500,550] 5kb |-L0.7-|"
|
||||||
- "L1 "
|
- "L1 "
|
||||||
- "L1.1[50,100] 100mb |-L1.1-| "
|
- "L1.1[50,100] 10mb |-L1.1-| "
|
||||||
- "L1.2[100,150] 100mb |-L1.2-| "
|
- "L1.2[100,150] 10mb |-L1.2-| "
|
||||||
- "**** Simulation run 0, type=compact. 5 Input Files, 25kb total:"
|
- "**** Simulation run 0, type=compact. 5 Input Files, 25kb total:"
|
||||||
- "L0, all files 5kb "
|
- "L0, all files 5kb "
|
||||||
- "L0.7[500,550] |-----L0.7-----|"
|
- "L0.7[500,550] |-----L0.7-----|"
|
||||||
|
@ -312,8 +312,8 @@ async fn l1_with_non_overlapping_l0() {
|
||||||
- "L0.3[300,350] |-----L0.3-----| "
|
- "L0.3[300,350] |-----L0.3-----| "
|
||||||
- "**** Final Output Files "
|
- "**** Final Output Files "
|
||||||
- "L1 "
|
- "L1 "
|
||||||
- "L1.1[50,100] 100mb |-L1.1-| "
|
- "L1.1[50,100] 10mb |-L1.1-| "
|
||||||
- "L1.2[100,150] 100mb |-L1.2-| "
|
- "L1.2[100,150] 10mb |-L1.2-| "
|
||||||
- "L1.8[300,550] 25kb |-----------------L1.8-----------------|"
|
- "L1.8[300,550] 25kb |-----------------L1.8-----------------|"
|
||||||
"###
|
"###
|
||||||
);
|
);
|
||||||
|
@ -374,13 +374,17 @@ async fn l1_with_non_overlapping_l0_larger() {
|
||||||
- "L0.7[400,450] |----------L0.7----------| "
|
- "L0.7[400,450] |----------L0.7----------| "
|
||||||
- "L0.6[350,400] |----------L0.6----------| "
|
- "L0.6[350,400] |----------L0.6----------| "
|
||||||
- "L0.5[300,350] |----------L0.5----------| "
|
- "L0.5[300,350] |----------L0.5----------| "
|
||||||
- "**** Final Output Files "
|
- "**** Simulation run 1, type=split(split_times=[370]). 5 Input Files, 108mb total:"
|
||||||
- "L1 "
|
- "L1 "
|
||||||
- "L1.1[50,100] 20mb |--L1.1--| "
|
|
||||||
- "L1.2[100,150] 50mb |--L1.2--| "
|
|
||||||
- "L1.3[150,200] 20mb |--L1.3--| "
|
|
||||||
- "L1.4[200,250] 3mb |--L1.4--| "
|
- "L1.4[200,250] 3mb |--L1.4--| "
|
||||||
|
- "L1.3[150,200] 20mb |--L1.3--| "
|
||||||
|
- "L1.2[100,150] 50mb |--L1.2--| "
|
||||||
|
- "L1.1[50,100] 20mb |--L1.1--| "
|
||||||
- "L1.8[300,450] 15mb |------------L1.8------------|"
|
- "L1.8[300,450] 15mb |------------L1.8------------|"
|
||||||
|
- "**** Final Output Files "
|
||||||
|
- "L2 "
|
||||||
|
- "L2.9[50,370] 86.4mb |-----------------------------L2.9-----------------------------| "
|
||||||
|
- "L2.10[370,450] 21.6mb |----L2.10-----|"
|
||||||
"###
|
"###
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -392,7 +396,7 @@ async fn l1_too_much_with_non_overlapping_l0() {
|
||||||
let setup = layout_setup_builder().await.build().await;
|
let setup = layout_setup_builder().await.build().await;
|
||||||
|
|
||||||
// If we wait until we have 10 L1 files each is not large
|
// If we wait until we have 10 L1 files each is not large
|
||||||
// enough to upgrade, the total size will be > 100MB and we will
|
// enough to upgrade, the total size will be > 256MB and we will
|
||||||
// skip the partition
|
// skip the partition
|
||||||
//
|
//
|
||||||
// L1: 90MB, 80MB, 70MB, ..., 70MB
|
// L1: 90MB, 80MB, 70MB, ..., 70MB
|
||||||
|
@ -467,6 +471,158 @@ async fn l1_too_much_with_non_overlapping_l0() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
// Test that compacts L1 files in second round if their number of files >= min_num_l1_files_to_compact
|
||||||
|
async fn man_l1_with_non_overlapping_l0() {
|
||||||
|
test_helpers::maybe_start_logging();
|
||||||
|
|
||||||
|
let setup = layout_setup_builder().await.build().await;
|
||||||
|
|
||||||
|
// Create 10 L1 files so they will be compacted to L2 because they exceed min_num_l1_files_to_compact
|
||||||
|
//
|
||||||
|
// L1: 9MB, 8MB, 7MB, ..., 7MB
|
||||||
|
// L0: ..
|
||||||
|
|
||||||
|
for (i, sz) in [9, 8, 7, 7, 7, 7, 7, 7, 7, 7].iter().enumerate() {
|
||||||
|
let i = i as i64;
|
||||||
|
setup
|
||||||
|
.partition
|
||||||
|
.create_parquet_file(
|
||||||
|
parquet_builder()
|
||||||
|
.with_min_time(50 + i * 50)
|
||||||
|
.with_max_time(100 + i * 50)
|
||||||
|
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||||
|
.with_file_size_bytes(sz * ONE_MB),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
// note these overlap with each other, but not the L1 files
|
||||||
|
for _ in 0..3 {
|
||||||
|
setup
|
||||||
|
.partition
|
||||||
|
.create_parquet_file(
|
||||||
|
parquet_builder()
|
||||||
|
.with_min_time(600)
|
||||||
|
.with_max_time(650)
|
||||||
|
.with_file_size_bytes(5 * ONE_MB),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
insta::assert_yaml_snapshot!(
|
||||||
|
run_layout_scenario(&setup).await,
|
||||||
|
@r###"
|
||||||
|
---
|
||||||
|
- "**** Input Files "
|
||||||
|
- "L0 "
|
||||||
|
- "L0.11[600,650] 5mb |L0.11|"
|
||||||
|
- "L0.12[600,650] 5mb |L0.12|"
|
||||||
|
- "L0.13[600,650] 5mb |L0.13|"
|
||||||
|
- "L1 "
|
||||||
|
- "L1.1[50,100] 9mb |L1.1| "
|
||||||
|
- "L1.2[100,150] 8mb |L1.2| "
|
||||||
|
- "L1.3[150,200] 7mb |L1.3| "
|
||||||
|
- "L1.4[200,250] 7mb |L1.4| "
|
||||||
|
- "L1.5[250,300] 7mb |L1.5| "
|
||||||
|
- "L1.6[300,350] 7mb |L1.6| "
|
||||||
|
- "L1.7[350,400] 7mb |L1.7| "
|
||||||
|
- "L1.8[400,450] 7mb |L1.8| "
|
||||||
|
- "L1.9[450,500] 7mb |L1.9| "
|
||||||
|
- "L1.10[500,550] 7mb |L1.10| "
|
||||||
|
- "**** Simulation run 0, type=compact. 3 Input Files, 15mb total:"
|
||||||
|
- "L0, all files 5mb "
|
||||||
|
- "L0.13[600,650] |------------------------------------L0.13-------------------------------------|"
|
||||||
|
- "L0.12[600,650] |------------------------------------L0.12-------------------------------------|"
|
||||||
|
- "L0.11[600,650] |------------------------------------L0.11-------------------------------------|"
|
||||||
|
- "**** Simulation run 1, type=split(split_times=[530]). 11 Input Files, 88mb total:"
|
||||||
|
- "L1 "
|
||||||
|
- "L1.10[500,550] 7mb |L1.10| "
|
||||||
|
- "L1.9[450,500] 7mb |L1.9| "
|
||||||
|
- "L1.8[400,450] 7mb |L1.8| "
|
||||||
|
- "L1.7[350,400] 7mb |L1.7| "
|
||||||
|
- "L1.6[300,350] 7mb |L1.6| "
|
||||||
|
- "L1.5[250,300] 7mb |L1.5| "
|
||||||
|
- "L1.4[200,250] 7mb |L1.4| "
|
||||||
|
- "L1.3[150,200] 7mb |L1.3| "
|
||||||
|
- "L1.2[100,150] 8mb |L1.2| "
|
||||||
|
- "L1.1[50,100] 9mb |L1.1| "
|
||||||
|
- "L1.14[600,650] 15mb |L1.14|"
|
||||||
|
- "**** Final Output Files "
|
||||||
|
- "L2 "
|
||||||
|
- "L2.15[50,530] 70.4mb|----------------------------L2.15-----------------------------| "
|
||||||
|
- "L2.16[530,650] 17.6mb |----L2.16-----|"
|
||||||
|
"###
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
// Test that compacts L1 files in second round if their total size > max_desired_file_size
|
||||||
|
async fn large_l1_with_non_overlapping_l0() {
|
||||||
|
test_helpers::maybe_start_logging();
|
||||||
|
|
||||||
|
let setup = layout_setup_builder().await.build().await;
|
||||||
|
|
||||||
|
// L1 files with total size > 100MB will get compacted after in round 2
|
||||||
|
// after the L0 files are compacted in round 1
|
||||||
|
// L1: 90MB, 80MB
|
||||||
|
// L0: ..
|
||||||
|
|
||||||
|
for (i, sz) in [90, 80].iter().enumerate() {
|
||||||
|
let i = i as i64;
|
||||||
|
setup
|
||||||
|
.partition
|
||||||
|
.create_parquet_file(
|
||||||
|
parquet_builder()
|
||||||
|
.with_min_time(50 + i * 50)
|
||||||
|
.with_max_time(100 + i * 50)
|
||||||
|
.with_compaction_level(CompactionLevel::FileNonOverlapped)
|
||||||
|
.with_file_size_bytes(sz * ONE_MB),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
// note these overlap with each other, but not the L1 files
|
||||||
|
for _ in 0..3 {
|
||||||
|
setup
|
||||||
|
.partition
|
||||||
|
.create_parquet_file(
|
||||||
|
parquet_builder()
|
||||||
|
.with_min_time(600)
|
||||||
|
.with_max_time(650)
|
||||||
|
.with_file_size_bytes(5 * ONE_MB),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
insta::assert_yaml_snapshot!(
|
||||||
|
run_layout_scenario(&setup).await,
|
||||||
|
@r###"
|
||||||
|
---
|
||||||
|
- "**** Input Files "
|
||||||
|
- "L0 "
|
||||||
|
- "L0.3[600,650] 5mb |L0.3| "
|
||||||
|
- "L0.4[600,650] 5mb |L0.4| "
|
||||||
|
- "L0.5[600,650] 5mb |L0.5| "
|
||||||
|
- "L1 "
|
||||||
|
- "L1.1[50,100] 90mb |L1.1| "
|
||||||
|
- "L1.2[100,150] 80mb |L1.2| "
|
||||||
|
- "**** Simulation run 0, type=compact. 3 Input Files, 15mb total:"
|
||||||
|
- "L0, all files 5mb "
|
||||||
|
- "L0.5[600,650] |-------------------------------------L0.5-------------------------------------|"
|
||||||
|
- "L0.4[600,650] |-------------------------------------L0.4-------------------------------------|"
|
||||||
|
- "L0.3[600,650] |-------------------------------------L0.3-------------------------------------|"
|
||||||
|
- "**** Simulation run 1, type=split(split_times=[375]). 3 Input Files, 185mb total:"
|
||||||
|
- "L1 "
|
||||||
|
- "L1.2[100,150] 80mb |L1.2| "
|
||||||
|
- "L1.1[50,100] 90mb |L1.1| "
|
||||||
|
- "L1.6[600,650] 15mb |L1.6| "
|
||||||
|
- "**** Final Output Files "
|
||||||
|
- "L2 "
|
||||||
|
- "L2.7[50,375] 100.21mb|------------------L2.7-------------------| "
|
||||||
|
- "L2.8[375,650] 84.79mb |---------------L2.8---------------| "
|
||||||
|
"###
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn many_tiny_l0_files() {
|
async fn many_tiny_l0_files() {
|
||||||
test_helpers::maybe_start_logging();
|
test_helpers::maybe_start_logging();
|
||||||
|
|
Loading…
Reference in New Issue