feat(compactor): Add RoundInfo structure

pull/24376/head
Andrew Lamb 2023-02-15 12:07:19 -05:00
parent 4adbc9f764
commit 999aba9d56
16 changed files with 184 additions and 23 deletions

View File

@ -2,6 +2,8 @@ use std::fmt::{Debug, Display};
use data_types::ParquetFile;
use crate::RoundInfo;
pub mod single_branch;
pub trait DivideInitial: Debug + Display + Send + Sync {
@ -10,5 +12,5 @@ pub trait DivideInitial: Debug + Display + Send + Sync {
///
/// Each branch is compacted together in a single plan, and each
/// compact plan may produce one or more parquet files.
fn divide(&self, files: Vec<ParquetFile>) -> Vec<Vec<ParquetFile>>;
fn divide(&self, files: Vec<ParquetFile>, round_info: &RoundInfo) -> Vec<Vec<ParquetFile>>;
}

View File

@ -2,6 +2,8 @@ use std::fmt::Display;
use data_types::ParquetFile;
use crate::RoundInfo;
use super::DivideInitial;
#[derive(Debug, Default)]
@ -20,7 +22,7 @@ impl Display for SingleBranchDivideInitial {
}
impl DivideInitial for SingleBranchDivideInitial {
fn divide(&self, files: Vec<ParquetFile>) -> Vec<Vec<ParquetFile>> {
fn divide(&self, files: Vec<ParquetFile>, _round_info: &RoundInfo) -> Vec<Vec<ParquetFile>> {
if files.is_empty() {
vec![]
} else {
@ -45,16 +47,17 @@ mod tests {
#[test]
fn test_divide() {
let round_info = RoundInfo::ManySmallFiles {};
let divide = SingleBranchDivideInitial::new();
// empty input
assert_eq!(divide.divide(vec![]), Vec::<Vec<_>>::new());
assert_eq!(divide.divide(vec![], &round_info), Vec::<Vec<_>>::new());
// not empty
let f1 = ParquetFileBuilder::new(1).build();
let f2 = ParquetFileBuilder::new(2).build();
assert_eq!(
divide.divide(vec![f1.clone(), f2.clone()]),
divide.divide(vec![f1.clone(), f2.clone()], &round_info),
vec![vec![f1, f2]]
);
}

View File

@ -2,7 +2,7 @@ use std::fmt::Display;
use data_types::{CompactionLevel, ParquetFile};
use crate::{file_classification::FileClassification, partition_info::PartitionInfo};
use crate::{file_classification::FileClassification, partition_info::PartitionInfo, RoundInfo};
use super::FileClassifier;
@ -26,6 +26,7 @@ impl FileClassifier for AllAtOnceFileClassifier {
fn classify(
&self,
_partition_info: &PartitionInfo,
_round_info: &RoundInfo,
files: Vec<ParquetFile>,
) -> FileClassification {
// Check if there are files in Compaction::Initial level
@ -67,7 +68,7 @@ mod tests {
let classifier = AllAtOnceFileClassifier::new();
let partition_info = Arc::new(PartitionInfoBuilder::new().build());
classifier.classify(&partition_info, vec![]);
classifier.classify(&partition_info, &round_info(), vec![]);
}
#[test]
@ -80,7 +81,7 @@ mod tests {
.build();
let partition_info = Arc::new(PartitionInfoBuilder::new().build());
classifier.classify(&partition_info, vec![f1]);
classifier.classify(&partition_info, &round_info(), vec![f1]);
}
#[test]
@ -93,7 +94,7 @@ mod tests {
.build();
let partition_info = Arc::new(PartitionInfoBuilder::new().build());
classifier.classify(&partition_info, vec![f2]);
classifier.classify(&partition_info, &round_info(), vec![f2]);
}
#[test]
@ -110,7 +111,7 @@ mod tests {
.build();
let partition_info = Arc::new(PartitionInfoBuilder::new().build());
classifier.classify(&partition_info, vec![f1, f2]);
classifier.classify(&partition_info, &round_info(), vec![f1, f2]);
}
#[test]
@ -118,7 +119,7 @@ mod tests {
let classifier = AllAtOnceFileClassifier::new();
let files = create_overlapped_files();
let partition_info = Arc::new(PartitionInfoBuilder::new().build());
let classification = classifier.classify(&partition_info, files.clone());
let classification = classifier.classify(&partition_info, &round_info(), files.clone());
assert_eq!(
classification,
FileClassification {
@ -129,4 +130,8 @@ mod tests {
}
);
}
fn round_info() -> RoundInfo {
RoundInfo::ManySmallFiles {}
}
}

View File

@ -3,7 +3,7 @@ use std::fmt::Display;
use data_types::ParquetFile;
use observability_deps::tracing::info;
use crate::{file_classification::FileClassification, partition_info::PartitionInfo};
use crate::{file_classification::FileClassification, partition_info::PartitionInfo, RoundInfo};
use super::FileClassifier;
@ -40,13 +40,15 @@ where
fn classify(
&self,
partition_info: &PartitionInfo,
round_info: &RoundInfo,
files: Vec<ParquetFile>,
) -> FileClassification {
let classification = self.inner.classify(partition_info, files);
let classification = self.inner.classify(partition_info, round_info, files);
info!(
partition_id = partition_info.partition_id.get(),
target_level = %classification.target_level,
round_info = %round_info,
files_to_compacts = classification.files_to_compact.len(),
files_to_upgrade = classification.files_to_upgrade.len(),
files_to_keep = classification.files_to_keep.len(),

View File

@ -5,7 +5,7 @@ use std::{
use data_types::ParquetFile;
use crate::{file_classification::FileClassification, partition_info::PartitionInfo};
use crate::{file_classification::FileClassification, partition_info::PartitionInfo, RoundInfo};
pub mod all_at_once;
pub mod logging;
@ -15,6 +15,7 @@ pub trait FileClassifier: Debug + Display + Send + Sync {
fn classify(
&self,
partition_info: &PartitionInfo,
round_info: &RoundInfo,
files: Vec<ParquetFile>,
) -> FileClassification;
}
@ -26,8 +27,9 @@ where
fn classify(
&self,
partition_info: &PartitionInfo,
round_info: &RoundInfo,
files: Vec<ParquetFile>,
) -> FileClassification {
self.as_ref().classify(partition_info, files)
self.as_ref().classify(partition_info, round_info, files)
}
}

View File

@ -6,6 +6,7 @@ use crate::{
components::{files_split::FilesSplit, target_level_chooser::TargetLevelChooser},
file_classification::FileClassification,
partition_info::PartitionInfo,
RoundInfo,
};
use super::FileClassifier;
@ -111,6 +112,7 @@ where
fn classify(
&self,
_partition_info: &PartitionInfo,
_round_info: &RoundInfo,
files: Vec<ParquetFile>,
) -> FileClassification {
let files_to_compact = files;

View File

@ -80,6 +80,7 @@ use super::{
not_empty::NotEmptyPartitionsSourceWrapper,
randomize_order::RandomizeOrderPartitionsSourcesWrapper, PartitionsSource,
},
round_info_source::{LevelBasedRoundInfo, LoggingRoundInfoWrapper},
round_split::all_now::AllNowRoundSplit,
scratchpad::{noop::NoopScratchpadGen, prod::ProdScratchpadGen, ScratchpadGen},
skipped_compactions_source::catalog::CatalogSkippedCompactionsSource,
@ -279,6 +280,9 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
config.backoff_config.clone(),
Arc::clone(&config.catalog),
)),
round_info_source: Arc::new(LoggingRoundInfoWrapper::new(Arc::new(
LevelBasedRoundInfo::new(),
))),
files_filter: version_specific_files_filter(config),
partition_filter: Arc::new(LoggingPartitionFilterWrapper::new(
MetricsPartitionFilterWrapper::new(

View File

@ -6,7 +6,8 @@ use self::{
ir_planner::IRPlanner, parquet_files_sink::ParquetFilesSink,
partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource,
partition_filter::PartitionFilter, partition_info_source::PartitionInfoSource,
partition_stream::PartitionStream, round_split::RoundSplit, scratchpad::ScratchpadGen,
partition_stream::PartitionStream, round_info_source::RoundInfoSource, round_split::RoundSplit,
scratchpad::ScratchpadGen,
};
pub mod combos;
@ -33,6 +34,7 @@ pub mod partition_source;
pub mod partition_stream;
pub mod partitions_source;
pub mod report;
pub mod round_info_source;
pub mod round_split;
pub mod scratchpad;
pub mod skipped_compactions_source;
@ -50,6 +52,8 @@ pub struct Components {
pub partition_info_source: Arc<dyn PartitionInfoSource>,
/// Source of files in a partition for compaction
pub partition_files_source: Arc<dyn PartitionFilesSource>,
/// Determines what type of compaction round the compactor will be doing
pub round_info_source: Arc<dyn RoundInfoSource>,
/// filter files for each round of compaction
pub files_filter: Arc<dyn FilesFilter>,
/// stop condition for completing a partition compaction

View File

@ -96,6 +96,7 @@ pub fn log_components(components: &Components) {
partition_stream,
partition_info_source,
partition_files_source,
round_info_source,
files_filter,
partition_filter,
partition_resource_limit_filter,
@ -116,6 +117,7 @@ pub fn log_components(components: &Components) {
%partition_info_source,
%partition_files_source,
%files_filter,
%round_info_source,
%partition_filter,
%partition_resource_limit_filter,
%partition_done_sink,

View File

@ -0,0 +1,81 @@
use std::{
fmt::{Debug, Display},
sync::Arc,
};
use async_trait::async_trait;
use data_types::ParquetFile;
use observability_deps::tracing::debug;
use crate::{error::DynError, PartitionInfo, RoundInfo};
/// Calculates information about what this compaction round does
#[async_trait]
pub trait RoundInfoSource: Debug + Display + Send + Sync {
async fn calculate(
&self,
partition_info: &PartitionInfo,
files: &[ParquetFile],
) -> Result<Arc<RoundInfo>, DynError>;
}
#[derive(Debug)]
pub struct LoggingRoundInfoWrapper {
inner: Arc<dyn RoundInfoSource>,
}
impl LoggingRoundInfoWrapper {
pub fn new(inner: Arc<dyn RoundInfoSource>) -> Self {
Self { inner }
}
}
impl Display for LoggingRoundInfoWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "LoggingRoundInfoWrapper({})", self.inner)
}
}
#[async_trait]
impl RoundInfoSource for LoggingRoundInfoWrapper {
async fn calculate(
&self,
partition_info: &PartitionInfo,
files: &[ParquetFile],
) -> Result<Arc<RoundInfo>, DynError> {
let res = self.inner.calculate(partition_info, files).await;
if let Ok(round_info) = &res {
debug!(round_info_source=%self.inner, %round_info, "running round");
}
res
}
}
/// Computes the type of round based on the levels of the input files
#[derive(Debug)]
pub struct LevelBasedRoundInfo {}
impl Display for LevelBasedRoundInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "LevelBasedRoundInfo")
}
}
impl LevelBasedRoundInfo {
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
impl RoundInfoSource for LevelBasedRoundInfo {
async fn calculate(
&self,
_partition_info: &PartitionInfo,
_files: &[ParquetFile],
) -> Result<Arc<RoundInfo>, DynError> {
// TODO: use this to calculate splits
Ok(Arc::new(RoundInfo::TargetLevel {
target_level: data_types::CompactionLevel::Initial,
}))
}
}

View File

@ -2,6 +2,8 @@ use std::fmt::Display;
use data_types::ParquetFile;
use crate::RoundInfo;
use super::RoundSplit;
#[derive(Debug, Default)]
@ -20,7 +22,11 @@ impl Display for AllNowRoundSplit {
}
impl RoundSplit for AllNowRoundSplit {
fn split(&self, files: Vec<ParquetFile>) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
fn split(
&self,
files: Vec<ParquetFile>,
_round_info: &RoundInfo,
) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
(files, vec![])
}
}
@ -29,6 +35,8 @@ impl RoundSplit for AllNowRoundSplit {
mod tests {
use iox_tests::ParquetFileBuilder;
use crate::RoundInfo;
use super::*;
#[test]
@ -38,16 +46,17 @@ mod tests {
#[test]
fn test_split() {
let round_info = RoundInfo::ManySmallFiles;
let split = AllNowRoundSplit::new();
// empty input
assert_eq!(split.split(vec![]), (vec![], vec![]));
assert_eq!(split.split(vec![], &round_info), (vec![], vec![]));
// not empty
let f1 = ParquetFileBuilder::new(1).build();
let f2 = ParquetFileBuilder::new(2).build();
assert_eq!(
split.split(vec![f1.clone(), f2.clone()]),
split.split(vec![f1.clone(), f2.clone()], &round_info),
(vec![f1, f2], vec![])
);
}

View File

@ -2,6 +2,8 @@ use std::fmt::{Debug, Display};
use data_types::ParquetFile;
use crate::RoundInfo;
pub mod all_now;
pub trait RoundSplit: Debug + Display + Send + Sync {
@ -11,5 +13,9 @@ pub trait RoundSplit: Debug + Display + Send + Sync {
///
/// - **now:** will be processed in this round
/// - **later:** will be processed in the next round
fn split(&self, files: Vec<ParquetFile>) -> (Vec<ParquetFile>, Vec<ParquetFile>);
fn split(
&self,
files: Vec<ParquetFile>,
round_info: &RoundInfo,
) -> (Vec<ParquetFile>, Vec<ParquetFile>);
}

View File

@ -184,6 +184,11 @@ async fn try_compact_partition(
// loop for each "Round", consider each file in the partition
loop {
let round_info = components
.round_info_source
.calculate(&partition_info, &files)
.await?;
files = components.files_filter.apply(files);
// This is the stop condition which will be different for different version of compaction
@ -196,10 +201,12 @@ async fn try_compact_partition(
return Ok(());
}
let (files_now, files_later) = components.round_split.split(files);
let (files_now, files_later) = components.round_split.split(files, round_info.as_ref());
// Each branch must not overlap with each other
let mut branches = components.divide_initial.divide(files_now);
let mut branches = components
.divide_initial
.divide(files_now, round_info.as_ref());
let mut files_next = files_later;
// loop for each "Branch"
@ -210,7 +217,10 @@ async fn try_compact_partition(
// Identify the target level and files that should be
// compacted together, upgraded, and kept for next round of
// compaction
let file_classification = components.file_classifier.classify(&partition_info, branch);
let file_classification =
components
.file_classifier
.classify(&partition_info, &round_info, branch);
// Cannot run this plan and skip this partition because of over limit of input num_files or size.
// The partition_resource_limit_filter will throw an error if one of the limits hit and will lead

View File

@ -179,6 +179,7 @@ mod file_classification;
pub mod object_store;
mod partition_info;
mod plan_ir;
mod round_info;
// publically expose items needed for testing
pub use components::{
@ -189,6 +190,7 @@ pub use driver::compact;
pub use error::DynError;
pub use partition_info::PartitionInfo;
pub use plan_ir::PlanIR;
pub use round_info::RoundInfo;
#[cfg(test)]
mod test_utils;

View File

@ -5,7 +5,7 @@ use std::sync::Arc;
use data_types::{NamespaceId, PartitionId, PartitionKey, Table, TableSchema};
use schema::sort::SortKey;
/// Information of a partition for compaction
/// Information about the Partition being compacted
#[derive(Debug, PartialEq, Eq)]
pub struct PartitionInfo {
/// the partition

View File

@ -0,0 +1,27 @@
//! Information about the current compaction round
use std::fmt::Display;
use data_types::CompactionLevel;
/// Information about the current compaction round (see driver.rs for
/// more details about a round)
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum RoundInfo {
/// compacting to target level
TargetLevel {
/// compaction level of target fles
target_level: CompactionLevel,
},
/// In many small files mode
ManySmallFiles,
}
impl Display for RoundInfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::TargetLevel { target_level } => write!(f, "TargetLevel: {target_level}"),
Self::ManySmallFiles => write!(f, "ManySmallFiles"),
}
}
}