chore: enable more ManySmallFiles compactions (#8603)
* chore: enable more ManySmallFiles compactions
* chore: insta churn
pull/24376/head
parent
72c48d34f8
commit
0996a95630
|
@ -11,9 +11,9 @@ use crate::components::{
|
||||||
Components,
|
Components,
|
||||||
};
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{CompactionLevel, FileRange, ParquetFile, Timestamp};
|
use data_types::{CompactionLevel, FileRange, ParquetFile, Timestamp, TransitionPartitionId};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use observability_deps::tracing::debug;
|
use observability_deps::tracing::{debug, info};
|
||||||
|
|
||||||
use crate::{error::DynError, PartitionInfo, RoundInfo};
|
use crate::{error::DynError, PartitionInfo, RoundInfo};
|
||||||
|
|
||||||
|
@ -218,6 +218,7 @@ impl LevelBasedRoundInfo {
|
||||||
/// If neither is returned, the caller will identify another type of RoundInfo for this round of compaction.
|
/// If neither is returned, the caller will identify another type of RoundInfo for this round of compaction.
|
||||||
pub fn vertical_split_handling(
|
pub fn vertical_split_handling(
|
||||||
&self,
|
&self,
|
||||||
|
partition_id: TransitionPartitionId,
|
||||||
files: Vec<ParquetFile>,
|
files: Vec<ParquetFile>,
|
||||||
max_compact_size: usize,
|
max_compact_size: usize,
|
||||||
) -> (Vec<i64>, Vec<FileRange>) {
|
) -> (Vec<i64>, Vec<FileRange>) {
|
||||||
|
@ -239,6 +240,12 @@ impl LevelBasedRoundInfo {
|
||||||
for chain in &chains {
|
for chain in &chains {
|
||||||
let chain_cap: usize = chain.iter().map(|f| f.file_size_bytes as usize).sum();
|
let chain_cap: usize = chain.iter().map(|f| f.file_size_bytes as usize).sum();
|
||||||
|
|
||||||
|
if chain.len() > 300 && chain_cap / chain.len() < max_compact_size / 10 {
|
||||||
|
info!("skipping vertical splitting on partition_id {} for now, due to excessive file count. chain length: {}, cap: {} MB",
|
||||||
|
partition_id, chain.len(), chain_cap/1024/1024);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// A single file over max size can just get upgraded to L1, then L2, unless it overlaps other L0s.
|
// A single file over max size can just get upgraded to L1, then L2, unless it overlaps other L0s.
|
||||||
// So multi file chains over the max compact size may need split
|
// So multi file chains over the max compact size may need split
|
||||||
if chain.len() > 1 && chain_cap > max_compact_size {
|
if chain.len() > 1 && chain_cap > max_compact_size {
|
||||||
|
@ -424,8 +431,11 @@ impl RoundInfoSource for LevelBasedRoundInfo {
|
||||||
max_total_file_size_to_group: self.max_total_file_size_per_plan,
|
max_total_file_size_to_group: self.max_total_file_size_per_plan,
|
||||||
}
|
}
|
||||||
} else if start_level == CompactionLevel::Initial {
|
} else if start_level == CompactionLevel::Initial {
|
||||||
let (split_times, ranges) = self
|
let (split_times, ranges) = self.vertical_split_handling(
|
||||||
.vertical_split_handling(files.clone().to_vec(), self.max_total_file_size_per_plan);
|
partition_info.partition_id(),
|
||||||
|
files.clone().to_vec(),
|
||||||
|
self.max_total_file_size_per_plan,
|
||||||
|
);
|
||||||
|
|
||||||
if !split_times.is_empty() {
|
if !split_times.is_empty() {
|
||||||
RoundInfo::VerticalSplit { split_times }
|
RoundInfo::VerticalSplit { split_times }
|
||||||
|
|
|
@ -5725,21 +5725,21 @@ async fn l0s_needing_vertical_split() {
|
||||||
- "L0.998[24,100] 1.02us |-----------------------------------------L0.998-----------------------------------------|"
|
- "L0.998[24,100] 1.02us |-----------------------------------------L0.998-----------------------------------------|"
|
||||||
- "L0.999[24,100] 1.02us |-----------------------------------------L0.999-----------------------------------------|"
|
- "L0.999[24,100] 1.02us |-----------------------------------------L0.999-----------------------------------------|"
|
||||||
- "L0.1000[24,100] 1.02us |----------------------------------------L0.1000-----------------------------------------|"
|
- "L0.1000[24,100] 1.02us |----------------------------------------L0.1000-----------------------------------------|"
|
||||||
- "**** Final Output Files (2.63gb written)"
|
- "**** Final Output Files (2.62gb written)"
|
||||||
- "L2 "
|
- "L2 "
|
||||||
- "L2.6026[24,34] 1.02us 107mb|-L2.6026-| "
|
- "L2.1018[24,34] 1.02us 107mb|-L2.1018-| "
|
||||||
- "L2.6034[81,91] 1.02us 107mb |-L2.6034-| "
|
- "L2.1019[35,44] 1.02us 97mb |L2.1019-| "
|
||||||
- "L2.6035[92,100] 1.02us 88mb |L2.6035| "
|
- "L2.1020[45,50] 1.02us 58mb |L2.1020| "
|
||||||
- "L2.6036[35,45] 1.02us 107mb |-L2.6036-| "
|
- "L2.1021[51,61] 1.02us 107mb |-L2.1021-| "
|
||||||
- "L2.6037[46,55] 1.02us 97mb |L2.6037-| "
|
- "L2.1022[62,71] 1.02us 97mb |L2.1022-| "
|
||||||
- "L2.6038[56,63] 1.02us 78mb |L2.6038| "
|
- "L2.1027[72,82] 1.02us 107mb |-L2.1027-| "
|
||||||
- "L2.6039[64,74] 1.02us 107mb |-L2.6039-| "
|
- "L2.1028[83,92] 1.02us 97mb |L2.1028-| "
|
||||||
- "L2.6040[75,80] 1.02us 58mb |L2.6040| "
|
- "L2.1029[93,100] 1.02us 78mb |L2.1029|"
|
||||||
- "**** Breakdown of where bytes were written"
|
- "**** Breakdown of where bytes were written"
|
||||||
- 282mb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
- 1.01gb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize))
|
||||||
- 750mb written by compact(ManySmallFiles)
|
- 300mb written by split(CompactAndSplitOutput(FoundSubsetLessThanMaxCompactSize))
|
||||||
- 750mb written by split(VerticalSplit)
|
- 450mb written by split(VerticalSplit)
|
||||||
- 916mb written by split(CompactAndSplitOutput(TotalSizeLessThanMaxCompactSize))
|
- 899mb written by compact(ManySmallFiles)
|
||||||
"###
|
"###
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue