feat: divide&conquer framework (#6697)

Allows compactor2 to run a fixed-point loop (until all work is done) and
in every loop in can run mulitiple jobs.

The jobs are currently organized by "branches". This is because our
upcoming OOM handling may split a branch further if it doesn't complete.

Also note that the current config resembles the state prior to this PR.
So the FP-loop will only iterate ONCE and then runs out of L0 files. A
more advanced setup can be built using the framework though.
pull/24376/head
Marco Neumann 2023-01-25 15:45:20 +01:00 committed by GitHub
parent c928eddaab
commit 7306ea9424
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 351 additions and 52 deletions

View File

@ -0,0 +1,9 @@
use std::fmt::{Debug, Display};
use data_types::ParquetFile;
pub mod single_branch;
pub trait DivideInitial: Debug + Display + Send + Sync {
fn divide(&self, files: Vec<ParquetFile>) -> Vec<Vec<ParquetFile>>;
}

View File

@ -0,0 +1,61 @@
use std::fmt::Display;
use data_types::ParquetFile;
use super::DivideInitial;
#[derive(Debug, Default)]
pub struct SingleBranchDivideInitial;
impl SingleBranchDivideInitial {
pub fn new() -> Self {
Self::default()
}
}
impl Display for SingleBranchDivideInitial {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "single_branch")
}
}
impl DivideInitial for SingleBranchDivideInitial {
fn divide(&self, files: Vec<ParquetFile>) -> Vec<Vec<ParquetFile>> {
if files.is_empty() {
vec![]
} else {
vec![files]
}
}
}
#[cfg(test)]
mod tests {
use crate::test_util::ParquetFileBuilder;
use super::*;
#[test]
fn test_display() {
assert_eq!(
SingleBranchDivideInitial::new().to_string(),
"single_branch"
);
}
#[test]
fn test_divide() {
let divide = SingleBranchDivideInitial::new();
// empty input
assert_eq!(divide.divide(vec![]), Vec::<Vec<_>>::new());
// not empty
let f1 = ParquetFileBuilder::new(1).build();
let f2 = ParquetFileBuilder::new(2).build();
assert_eq!(
divide.divide(vec![f1.clone(), f2.clone()]),
vec![vec![f1, f2]]
);
}
}

View File

@ -21,6 +21,7 @@ use super::{
},
df_plan_exec::dedicated::DedicatedDataFusionPlanExec,
df_planner::planner_v1::V1DataFusionPlanner,
divide_initial::single_branch::SingleBranchDivideInitial,
file_filter::{and::AndFileFilter, level_range::LevelRangeFileFilter},
files_filter::{chain::FilesFilterChain, per_file::PerFileFilesFilter},
parquet_file_sink::{
@ -34,14 +35,15 @@ use super::{
partition_files_source::catalog::CatalogPartitionFilesSource,
partition_filter::{
and::AndPartitionFilter, has_files::HasFilesPartitionFilter,
logging::LoggingPartitionFilterWrapper, metrics::MetricsPartitionFilterWrapper,
never_skipped::NeverSkippedPartitionFilter,
has_matching_file::HasMatchingFilePartitionFilter, logging::LoggingPartitionFilterWrapper,
metrics::MetricsPartitionFilterWrapper, never_skipped::NeverSkippedPartitionFilter,
},
partitions_source::{
catalog::CatalogPartitionsSource, logging::LoggingPartitionsSourceWrapper,
metrics::MetricsPartitionsSourceWrapper,
randomize_order::RandomizeOrderPartitionsSourcesWrapper,
},
round_split::all_now::AllNowRoundSplit,
skipped_compactions_source::catalog::CatalogSkippedCompactionsSource,
Components,
};
@ -86,6 +88,11 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
Arc::clone(&config.catalog),
),
)),
Arc::new(HasMatchingFilePartitionFilter::new(
LevelRangeFileFilter::new(
CompactionLevel::Initial..=CompactionLevel::Initial,
),
)),
Arc::new(HasFilesPartitionFilter::new()),
]),
&config.metric_registry,
@ -133,5 +140,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
Arc::clone(&config.exec),
),
)),
round_split: Arc::new(AllNowRoundSplit::new()),
divide_initial: Arc::new(SingleBranchDivideInitial::new()),
})
}

View File

@ -2,15 +2,16 @@ use std::sync::Arc;
use self::{
commit::Commit, df_plan_exec::DataFusionPlanExec, df_planner::DataFusionPlanner,
files_filter::FilesFilter, namespaces_source::NamespacesSource,
divide_initial::DivideInitial, files_filter::FilesFilter, namespaces_source::NamespacesSource,
parquet_file_sink::ParquetFileSink, partition_done_sink::PartitionDoneSink,
partition_files_source::PartitionFilesSource, partition_filter::PartitionFilter,
partitions_source::PartitionsSource, tables_source::TablesSource,
partitions_source::PartitionsSource, round_split::RoundSplit, tables_source::TablesSource,
};
pub mod commit;
pub mod df_plan_exec;
pub mod df_planner;
pub mod divide_initial;
pub mod file_filter;
pub mod files_filter;
pub mod hardcoded;
@ -21,6 +22,7 @@ pub mod partition_files_source;
pub mod partition_filter;
pub mod partitions_source;
pub mod report;
pub mod round_split;
pub mod skipped_compactions_source;
pub mod tables_source;
@ -37,4 +39,6 @@ pub struct Components {
pub df_planner: Arc<dyn DataFusionPlanner>,
pub df_plan_exec: Arc<dyn DataFusionPlanExec>,
pub parquet_file_sink: Arc<dyn ParquetFileSink>,
pub round_split: Arc<dyn RoundSplit>,
pub divide_initial: Arc<dyn DivideInitial>,
}

View File

@ -0,0 +1,88 @@
use std::fmt::Display;
use async_trait::async_trait;
use data_types::{ParquetFile, PartitionId};
use crate::components::file_filter::FileFilter;
use super::PartitionFilter;
#[derive(Debug)]
pub struct HasMatchingFilePartitionFilter<T>
where
T: FileFilter,
{
filter: T,
}
impl<T> HasMatchingFilePartitionFilter<T>
where
T: FileFilter,
{
pub fn new(filter: T) -> Self {
Self { filter }
}
}
impl<T> Display for HasMatchingFilePartitionFilter<T>
where
T: FileFilter,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "has_matching_file({})", self.filter)
}
}
#[async_trait]
impl<T> PartitionFilter for HasMatchingFilePartitionFilter<T>
where
T: FileFilter,
{
async fn apply(&self, _partition_id: PartitionId, files: &[ParquetFile]) -> bool {
files.iter().any(|file| self.filter.apply(file))
}
}
#[cfg(test)]
mod tests {
use data_types::CompactionLevel;
use crate::{
components::file_filter::level_range::LevelRangeFileFilter, test_util::ParquetFileBuilder,
};
use super::*;
#[test]
fn test_display() {
let filter = HasMatchingFilePartitionFilter::new(LevelRangeFileFilter::new(
CompactionLevel::Initial..=CompactionLevel::FileNonOverlapped,
));
assert_eq!(filter.to_string(), "has_matching_file(level_range(0..=1))");
}
#[tokio::test]
async fn test_apply() {
let filter = HasMatchingFilePartitionFilter::new(LevelRangeFileFilter::new(
CompactionLevel::Initial..=CompactionLevel::FileNonOverlapped,
));
let f1 = ParquetFileBuilder::new(0)
.with_compaction_level(CompactionLevel::FileNonOverlapped)
.build();
let f2 = ParquetFileBuilder::new(1)
.with_compaction_level(CompactionLevel::Final)
.build();
// empty
assert!(!filter.apply(PartitionId::new(1), &[]).await);
// all matching
assert!(filter.apply(PartitionId::new(1), &[f1.clone()]).await);
// none matching
assert!(!filter.apply(PartitionId::new(1), &[f2.clone()]).await);
// some matching
assert!(filter.apply(PartitionId::new(1), &[f1, f2]).await);
}
}

View File

@ -5,6 +5,7 @@ use data_types::{ParquetFile, PartitionId};
pub mod and;
pub mod has_files;
pub mod has_matching_file;
pub mod logging;
pub mod metrics;
pub mod never_skipped;

View File

@ -19,6 +19,8 @@ pub fn log_components(components: &Components) {
df_planner,
df_plan_exec,
parquet_file_sink,
round_split,
divide_initial,
} = components;
info!(
@ -33,6 +35,8 @@ pub fn log_components(components: &Components) {
%df_planner,
%df_plan_exec,
%parquet_file_sink,
%round_split,
%divide_initial,
"component setup",
);
}

View File

@ -0,0 +1,54 @@
use std::fmt::Display;
use data_types::ParquetFile;
use super::RoundSplit;
#[derive(Debug, Default)]
pub struct AllNowRoundSplit;
impl AllNowRoundSplit {
pub fn new() -> Self {
Self::default()
}
}
impl Display for AllNowRoundSplit {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "now")
}
}
impl RoundSplit for AllNowRoundSplit {
fn split(&self, files: Vec<ParquetFile>) -> (Vec<ParquetFile>, Vec<ParquetFile>) {
(files, vec![])
}
}
#[cfg(test)]
mod tests {
use crate::test_util::ParquetFileBuilder;
use super::*;
#[test]
fn test_display() {
assert_eq!(AllNowRoundSplit::new().to_string(), "now");
}
#[test]
fn test_split() {
let split = AllNowRoundSplit::new();
// empty input
assert_eq!(split.split(vec![]), (vec![], vec![]));
// not empty
let f1 = ParquetFileBuilder::new(1).build();
let f2 = ParquetFileBuilder::new(2).build();
assert_eq!(
split.split(vec![f1.clone(), f2.clone()]),
(vec![f1, f2], vec![])
);
}
}

View File

@ -0,0 +1,15 @@
use std::fmt::{Debug, Display};
use data_types::ParquetFile;
pub mod all_now;
pub trait RoundSplit: Debug + Display + Send + Sync {
/// Split files into two buckets "now" and "later".
///
/// All files belong to the same partition.
///
/// - **now:** will be processed in this round
/// - **later:** will be processed in the next round
fn split(&self, files: Vec<ParquetFile>) -> (Vec<ParquetFile>, Vec<ParquetFile>);
}

View File

@ -1,7 +1,8 @@
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
use data_types::{CompactionLevel, PartitionId};
use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt};
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
use tracker::InstrumentedAsyncSemaphore;
use crate::{components::Components, partition_info::PartitionInfo};
@ -70,57 +71,82 @@ async fn try_compact_partition(
job_semaphore: Arc<InstrumentedAsyncSemaphore>,
components: Arc<Components>,
) -> Result<(), Error> {
let files = components.partition_files_source.fetch(partition_id).await;
let files = components.files_filter.apply(files);
let delete_ids = files.iter().map(|f| f.id).collect::<Vec<_>>();
let mut files = components.partition_files_source.fetch(partition_id).await;
if !components
.partition_filter
.apply(partition_id, &files)
.await
{
return Ok(());
}
// fetch partition info only if we need it
let mut lazy_partition_info = None;
let partition_info = fetch_partition_info(partition_id, &components).await?;
loop {
files = components.files_filter.apply(files);
// TODO: Need a wraper funtion to:
// . split files into L0, L1 and L2
// . identify right files for hot/cold compaction
// . filter right amount of files
// . compact many steps hot/cold (need more thinking)
let target_level = CompactionLevel::FileNonOverlapped;
let plan = components
.df_planner
.plan(files, Arc::clone(&partition_info), target_level)
.await?;
let streams = components.df_plan_exec.exec(plan);
let job = streams
.into_iter()
.map(|stream| {
components
.parquet_file_sink
.store(stream, Arc::clone(&partition_info), target_level)
})
// NB: FuturesOrdered allows the futures to run in parallel
.collect::<FuturesOrdered<_>>()
// Discard the streams that resulted in empty output / no file uploaded
// to the object store.
.try_filter_map(|v| futures::future::ready(Ok(v)))
// Collect all the persisted parquet files together.
.try_collect::<Vec<_>>();
let create = {
let _permit = job_semaphore
.acquire(None)
if !components
.partition_filter
.apply(partition_id, &files)
.await
.expect("semaphore not closed");
job.await?
};
{
return Ok(());
}
components.commit.commit(&delete_ids, &create).await;
// fetch partition info
if lazy_partition_info.is_none() {
lazy_partition_info = Some(fetch_partition_info(partition_id, &components).await?);
}
let partition_info = lazy_partition_info.as_ref().expect("just fetched");
Ok(())
let (files_now, files_later) = components.round_split.split(files);
let mut branches = components.divide_initial.divide(files_now);
let mut files_next = files_later;
while let Some(branch) = branches.pop() {
let delete_ids = branch.iter().map(|f| f.id).collect::<Vec<_>>();
let create = {
// draw semaphore BEFORE creating the DataFusion plan and drop it directly AFTER finishing the
// DataFusion computation (but BEFORE doing any additional external IO).
//
// We guard the DataFusion planning (that doesn't perform any IO) via the semaphore as well in case
// DataFusion ever starts to pre-allocate buffers during the physical planning. To the best of our
// knowledge, this is currently (2023-01-25) not the case but if this ever changes, then we are prepared.
let _permit = job_semaphore
.acquire(None)
.await
.expect("semaphore not closed");
// TODO: Need a wraper funtion to:
// . split files into L0, L1 and L2
// . identify right files for hot/cold compaction
// . filter right amount of files
// . compact many steps hot/cold (need more thinking)
let target_level = CompactionLevel::FileNonOverlapped;
let plan = components
.df_planner
.plan(branch, Arc::clone(partition_info), target_level)
.await?;
let streams = components.df_plan_exec.exec(plan);
let job = stream_into_file_sink(
streams,
Arc::clone(partition_info),
target_level,
Arc::clone(&components),
);
// TODO: react to OOM and try to divide branch
job.await?
};
let ids = components.commit.commit(&delete_ids, &create).await;
files_next.extend(
create
.into_iter()
.zip(ids)
.map(|(params, id)| ParquetFile::from_params(params, id)),
);
}
files = files_next;
}
}
async fn fetch_partition_info(
@ -170,3 +196,31 @@ async fn fetch_partition_info(
partition_key: partition.partition_key,
}))
}
fn stream_into_file_sink(
streams: Vec<SendableRecordBatchStream>,
partition_info: Arc<PartitionInfo>,
target_level: CompactionLevel,
components: Arc<Components>,
) -> impl Future<Output = Result<Vec<ParquetFileParams>, Error>> {
streams
.into_iter()
.map(move |stream| {
let components = Arc::clone(&components);
let partition_info = Arc::clone(&partition_info);
async move {
components
.parquet_file_sink
.store(stream, partition_info, target_level)
.await
}
})
// NB: FuturesOrdered allows the futures to run in parallel
.collect::<FuturesOrdered<_>>()
// Discard the streams that resulted in empty output / no file uploaded
// to the object store.
.try_filter_map(|v| futures::future::ready(Ok(v)))
// Collect all the persisted parquet files together.
.try_collect::<Vec<_>>()
.map_err(|e| Box::new(e) as _)
}