chore(compactor): move divide & branches to RoundInfo calculation (#8410)

pull/24376/head
Joe-Blount 2023-08-03 17:02:06 -05:00 committed by GitHub
parent 81b5d80a91
commit b7c0bcb61f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 29 deletions

View File

@ -4,7 +4,9 @@ use std::{
sync::Arc,
};
use crate::components::split_or_compact::start_level_files_to_split::split_into_chains;
use crate::components::{
split_or_compact::start_level_files_to_split::split_into_chains, Components,
};
use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFile, Timestamp};
use itertools::Itertools;
@ -17,9 +19,10 @@ use crate::{error::DynError, PartitionInfo, RoundInfo};
pub trait RoundInfoSource: Debug + Display + Send + Sync {
async fn calculate(
&self,
components: Arc<Components>,
partition_info: &PartitionInfo,
files: &[ParquetFile],
) -> Result<RoundInfo, DynError>;
files: Vec<ParquetFile>,
) -> Result<(RoundInfo, Vec<Vec<ParquetFile>>, Vec<ParquetFile>), DynError>;
}
#[derive(Debug)]
@ -43,12 +46,16 @@ impl Display for LoggingRoundInfoWrapper {
impl RoundInfoSource for LoggingRoundInfoWrapper {
async fn calculate(
&self,
components: Arc<Components>,
partition_info: &PartitionInfo,
files: &[ParquetFile],
) -> Result<RoundInfo, DynError> {
let res = self.inner.calculate(partition_info, files).await;
if let Ok(round_info) = &res {
debug!(round_info_source=%self.inner, %round_info, "running round");
files: Vec<ParquetFile>,
) -> Result<(RoundInfo, Vec<Vec<ParquetFile>>, Vec<ParquetFile>), DynError> {
let res = self
.inner
.calculate(components, partition_info, files)
.await;
if let Ok((round_info, branches, files_later)) = &res {
debug!(round_info_source=%self.inner, %round_info, branches=branches.len(), files_later=files_later.len(), "running round");
}
res
}
@ -169,25 +176,32 @@ impl LevelBasedRoundInfo {
impl RoundInfoSource for LevelBasedRoundInfo {
async fn calculate(
&self,
components: Arc<Components>,
_partition_info: &PartitionInfo,
files: &[ParquetFile],
) -> Result<RoundInfo, DynError> {
files: Vec<ParquetFile>,
) -> Result<(RoundInfo, Vec<Vec<ParquetFile>>, Vec<ParquetFile>), DynError> {
let start_level = get_start_level(
files,
&files,
self.max_num_files_per_plan,
self.max_total_file_size_per_plan,
);
if self.too_many_small_files_to_compact(files, start_level) {
return Ok(RoundInfo::ManySmallFiles {
let round_info = if self.too_many_small_files_to_compact(&files, start_level) {
RoundInfo::ManySmallFiles {
start_level,
max_num_files_to_group: self.max_num_files_per_plan,
max_total_file_size_to_group: self.max_total_file_size_per_plan,
});
}
}
} else {
let target_level = pick_level(&files);
RoundInfo::TargetLevel { target_level }
};
let target_level = pick_level(files);
Ok(RoundInfo::TargetLevel { target_level })
let (files_now, files_later) = components.round_split.split(files, round_info);
let branches = components.divide_initial.divide(files_now, round_info);
Ok((round_info, branches, files_later))
}
}

View File

@ -230,11 +230,6 @@ async fn try_compact_partition(
return Ok(());
}
let round_info = components
.round_info_source
.calculate(&partition_info, &files)
.await?;
// This is the stop condition, which differs between versions of compaction. It is
// described where the filter is created, in the version_specific_partition_filters function.
if !components
@ -245,13 +240,14 @@ async fn try_compact_partition(
return Ok(());
}
let (files_now, files_later) = components.round_split.split(files, round_info);
// Each branch must not overlap with each other
let branches = components
.divide_initial
.divide(files_now, round_info)
.into_iter();
let (round_info, branches, files_later) = components
.round_info_source
.calculate(
Arc::<Components>::clone(&components),
&partition_info,
files,
)
.await?;
files = files_later;