refactor: `ParquetFilesSink` (#6928)

* refactor: pass `PlanIR` by ref

* refactor: `ParquetFilesSink`
Marco Neumann 2023-02-09 16:53:19 +01:00 committed by GitHub
parent 0e5f31c576
commit 4a97620664
12 changed files with 156 additions and 58 deletions


@@ -16,7 +16,7 @@ use crate::{partition_info::PartitionInfo, plan_ir::PlanIR};
 pub trait DataFusionPlanner: Debug + Display + Send + Sync {
     async fn plan(
         &self,
-        ir: PlanIR,
+        ir: &PlanIR,
         partition: Arc<PartitionInfo>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError>;
 }


@@ -37,7 +37,7 @@ impl Display for PanicDataFusionPlanner {
 impl DataFusionPlanner for PanicDataFusionPlanner {
     async fn plan(
         &self,
-        _ir: PlanIR,
+        _ir: &PlanIR,
         _partition: Arc<PartitionInfo>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         Ok(Arc::new(PanicPlan))
@@ -111,7 +111,7 @@ mod tests {
         let planner = PanicDataFusionPlanner::new();
         let partition = partition_info();
         let plan = planner
-            .plan(PlanIR::Compact { files: vec![] }, partition)
+            .plan(&PlanIR::Compact { files: vec![] }, partition)
             .await
             .unwrap();


@@ -43,7 +43,7 @@ impl Display for V1DataFusionPlanner {
 impl DataFusionPlanner for V1DataFusionPlanner {
     async fn plan(
         &self,
-        ir: PlanIR,
+        ir: &PlanIR,
         partition: Arc<PartitionInfo>,
     ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
         let ctx = self.exec.new_context(ExecutorType::Reorg);
@@ -87,7 +87,7 @@ impl DataFusionPlanner for V1DataFusionPlanner {
                     &merged_schema,
                     query_chunks,
                     sort_key,
-                    split_times,
+                    split_times.clone(),
                 )
                 .map_err(|e| {
                     DataFusionError::Context(
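
Note: the `split_times.clone()` above (and the `file.file.clone()` in the next file) is the direct cost of the first commit: `plan` now only borrows the `PlanIR`, so owned values can no longer be moved out of it. A minimal, self-contained sketch of that trade-off, using simplified stand-in types rather than the real planner API:

```rust
// Illustrative only: `PlanIr` and `split_plan` are simplified stand-ins,
// not the actual compactor types.
struct PlanIr {
    split_times: Vec<i64>,
}

// The "planner" takes ownership of the split times.
fn split_plan(split_times: Vec<i64>) -> usize {
    split_times.len() + 1
}

// The IR is only borrowed, so owned arguments must be cloned out of it.
fn plan(ir: &PlanIr) -> usize {
    split_plan(ir.split_times.clone())
}

fn main() {
    let ir = PlanIr {
        split_times: vec![10, 20],
    };
    assert_eq!(plan(&ir), 3);
    // The caller still owns the IR afterwards and can reuse it.
    assert_eq!(ir.split_times.len(), 2);
}
```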


@@ -163,12 +163,12 @@ impl QueryChunk for QueryableParquetChunk {
 }
 
 pub fn to_query_chunks(
-    files: Vec<FileIR>,
+    files: &[FileIR],
     partition_info: &PartitionInfo,
     store: ParquetStorage,
 ) -> Vec<Arc<dyn QueryChunk>> {
     files
-        .into_iter()
+        .iter()
         .map(|file| {
             Arc::new(to_queryable_parquet_chunk(
                 file,
@@ -181,7 +181,7 @@ pub fn to_query_chunks(
 
 /// Convert to a QueryableParquetChunk
 fn to_queryable_parquet_chunk(
-    file: FileIR,
+    file: &FileIR,
     partition_info: &PartitionInfo,
     store: ParquetStorage,
 ) -> QueryableParquetChunk {
@@ -220,7 +220,7 @@ fn to_queryable_parquet_chunk(
         "built parquet chunk from metadata"
     );
 
-    let parquet_chunk = ParquetChunk::new(Arc::new(file.file), schema, store);
+    let parquet_chunk = ParquetChunk::new(Arc::new(file.file.clone()), schema, store);
     QueryableParquetChunk::new(
         partition_id,
         Arc::new(parquet_chunk),


@@ -47,6 +47,7 @@ use super::{
         dedicated::DedicatedExecParquetFileSinkWrapper, logging::LoggingParquetFileSinkWrapper,
         mock::MockParquetFileSink, object_store::ObjectStoreParquetFileSink, ParquetFileSink,
     },
+    parquet_files_sink::dispatch::DispatchParquetFilesSink,
     partition_done_sink::{
         catalog::CatalogPartitionDoneSink, error_kind::ErrorKindPartitionDoneSinkWrapper,
         logging::LoggingPartitionDoneSinkWrapper, metrics::MetricsPartitionDoneSinkWrapper,
@@ -302,7 +303,7 @@ pub fn hardcoded_components(config: &Config) -> Arc<Components> {
             Arc::clone(&config.exec),
         )),
         df_plan_exec,
-        parquet_file_sink,
+        parquet_files_sink: Arc::new(DispatchParquetFilesSink::new(parquet_file_sink)),
         round_split: Arc::new(AllNowRoundSplit::new()),
         divide_initial: Arc::new(SingleBranchDivideInitial::new()),
         scratchpad_gen,


@@ -3,7 +3,7 @@ use std::sync::Arc;
 use self::{
     commit::Commit, df_plan_exec::DataFusionPlanExec, df_planner::DataFusionPlanner,
     divide_initial::DivideInitial, files_filter::FilesFilter, ir_planner::IRPlanner,
-    namespaces_source::NamespacesSource, parquet_file_sink::ParquetFileSink,
+    namespaces_source::NamespacesSource, parquet_files_sink::ParquetFilesSink,
     partition_done_sink::PartitionDoneSink, partition_files_source::PartitionFilesSource,
     partition_filter::PartitionFilter, partition_source::PartitionSource,
     partition_stream::PartitionStream, round_split::RoundSplit, scratchpad::ScratchpadGen,
@@ -24,6 +24,7 @@ pub mod ir_planner;
 pub mod level_exist;
 pub mod namespaces_source;
 pub mod parquet_file_sink;
+pub mod parquet_files_sink;
 pub mod partition_done_sink;
 pub mod partition_files_source;
 pub mod partition_filter;
@@ -52,7 +53,7 @@ pub struct Components {
     pub ir_planner: Arc<dyn IRPlanner>,
     pub df_planner: Arc<dyn DataFusionPlanner>,
     pub df_plan_exec: Arc<dyn DataFusionPlanExec>,
-    pub parquet_file_sink: Arc<dyn ParquetFileSink>,
+    pub parquet_files_sink: Arc<dyn ParquetFilesSink>,
     pub round_split: Arc<dyn RoundSplit>,
     pub divide_initial: Arc<dyn DivideInitial>,
     pub scratchpad_gen: Arc<dyn ScratchpadGen>,


@@ -25,3 +25,21 @@ pub trait ParquetFileSink: Debug + Display + Send + Sync {
         max_l0_created_at: Time,
     ) -> Result<Option<ParquetFileParams>, DataFusionError>;
 }
+
+#[async_trait]
+impl<T> ParquetFileSink for Arc<T>
+where
+    T: ParquetFileSink + ?Sized,
+{
+    async fn store(
+        &self,
+        stream: SendableRecordBatchStream,
+        partition: Arc<PartitionInfo>,
+        level: CompactionLevel,
+        max_l0_created_at: Time,
+    ) -> Result<Option<ParquetFileParams>, DataFusionError> {
+        self.as_ref()
+            .store(stream, partition, level, max_l0_created_at)
+            .await
+    }
+}
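
Note: this blanket impl makes `Arc<dyn ParquetFileSink>` itself usable wherever a `T: ParquetFileSink` is required, which is what lets `hardcoded_components` above wrap the existing `parquet_file_sink` in `DispatchParquetFilesSink::new` without unwrapping it. A self-contained sketch of the same pattern, with a simplified stand-in trait rather than the real crate types:

```rust
use std::sync::Arc;

// Simplified stand-in trait; it only illustrates the blanket-impl pattern.
trait Sink {
    fn store(&self, batch: &str);
}

// Blanket impl: any Arc<T> (including Arc<dyn Sink>) is itself a Sink.
impl<T: Sink + ?Sized> Sink for Arc<T> {
    fn store(&self, batch: &str) {
        self.as_ref().store(batch)
    }
}

struct Logging;

impl Sink for Logging {
    fn store(&self, batch: &str) {
        println!("stored {batch}");
    }
}

// A generic wrapper that wants to own some `T: Sink`, as DispatchParquetFilesSink does.
struct Dispatch<T: Sink> {
    inner: T,
}

fn main() {
    let shared: Arc<dyn Sink> = Arc::new(Logging);
    // Compiles only because of the blanket impl: Arc<dyn Sink> satisfies `T: Sink`.
    let dispatch = Dispatch { inner: shared };
    dispatch.inner.store("batch-1");
}
```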


@@ -0,0 +1,87 @@
use std::{fmt::Display, sync::Arc};

use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileParams};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{stream::FuturesOrdered, TryFutureExt, TryStreamExt};
use iox_time::Time;

use crate::{
    components::parquet_file_sink::ParquetFileSink, error::DynError, partition_info::PartitionInfo,
    plan_ir::PlanIR,
};

use super::ParquetFilesSink;

#[derive(Debug)]
pub struct DispatchParquetFilesSink<T>
where
    T: ParquetFileSink,
{
    inner: Arc<T>,
}

impl<T> DispatchParquetFilesSink<T>
where
    T: ParquetFileSink,
{
    pub fn new(inner: T) -> Self {
        Self {
            inner: Arc::new(inner),
        }
    }
}

impl<T> Display for DispatchParquetFilesSink<T>
where
    T: ParquetFileSink,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "dispatch({})", self.inner)
    }
}

#[async_trait]
impl<T> ParquetFilesSink for DispatchParquetFilesSink<T>
where
    T: ParquetFileSink,
{
    async fn stream_into_file_sink(
        &self,
        streams: Vec<SendableRecordBatchStream>,
        partition_info: Arc<PartitionInfo>,
        target_level: CompactionLevel,
        plan_ir: &PlanIR,
    ) -> Result<Vec<ParquetFileParams>, DynError> {
        // compute max_l0_created_at
        let max_l0_created_at: Time = plan_ir
            .input_files()
            .iter()
            .map(|f| f.file.max_l0_created_at)
            .max()
            .expect("max_l0_created_at should have value")
            .into();

        let inner = Arc::clone(&self.inner);
        streams
            .into_iter()
            .map(move |stream| {
                let inner = Arc::clone(&inner);
                let partition_info = Arc::clone(&partition_info);
                async move {
                    inner
                        .store(stream, partition_info, target_level, max_l0_created_at)
                        .await
                }
            })
            // NB: FuturesOrdered allows the futures to run in parallel
            .collect::<FuturesOrdered<_>>()
            // Discard the streams that resulted in empty output / no file uploaded
            // to the object store.
            .try_filter_map(|v| futures::future::ready(Ok(v)))
            // Collect all the persisted parquet files together.
            .try_collect::<Vec<_>>()
            .map_err(|e| Box::new(e) as _)
            .await
    }
}
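
Note: the combinator chain at the end of `stream_into_file_sink` is the heart of the dispatch: `FuturesOrdered` drives all the per-stream `store` calls concurrently while yielding results in input order, `try_filter_map` drops the `Ok(None)` cases (streams that produced no parquet file), and `try_collect` either gathers everything or aborts on the first error. A small self-contained sketch of the same pattern, assuming the `futures` and `tokio` crates; the `store` function and the error type are made up for illustration:

```rust
use futures::{stream::FuturesOrdered, TryStreamExt};

// Stand-in for the real sink: even inputs "persist a file", odd inputs produce nothing.
async fn store(i: u32) -> Result<Option<u32>, String> {
    Ok((i % 2 == 0).then_some(i * 100))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let persisted: Vec<u32> = (0..5u32)
        .map(store)
        // FuturesOrdered polls the futures concurrently but yields results in input order.
        .collect::<FuturesOrdered<_>>()
        // Drop the Ok(None) results, keep only the "persisted file" values.
        .try_filter_map(|v| futures::future::ready(Ok(v)))
        // Fail fast on the first Err, otherwise collect everything.
        .try_collect()
        .await?;

    assert_eq!(persisted, vec![0, 200, 400]);
    Ok(())
}
```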


@@ -0,0 +1,23 @@
use std::{
    fmt::{Debug, Display},
    sync::Arc,
};

use async_trait::async_trait;
use data_types::{CompactionLevel, ParquetFileParams};
use datafusion::physical_plan::SendableRecordBatchStream;

use crate::{error::DynError, partition_info::PartitionInfo, plan_ir::PlanIR};

pub mod dispatch;

#[async_trait]
pub trait ParquetFilesSink: Debug + Display + Send + Sync {
    async fn stream_into_file_sink(
        &self,
        streams: Vec<SendableRecordBatchStream>,
        partition_info: Arc<PartitionInfo>,
        target_level: CompactionLevel,
        plan_ir: &PlanIR,
    ) -> Result<Vec<ParquetFileParams>, DynError>;
}


@@ -98,7 +98,7 @@ pub fn log_components(components: &Components) {
         ir_planner,
         df_planner,
         df_plan_exec,
-        parquet_file_sink,
+        parquet_files_sink,
         round_split,
         divide_initial,
         scratchpad_gen,
@@ -122,7 +122,7 @@ pub fn log_components(components: &Components) {
         %ir_planner,
         %df_planner,
         %df_plan_exec,
-        %parquet_file_sink,
+        %parquet_files_sink,
         %round_split,
         %divide_initial,
         %scratchpad_gen,


@@ -1,9 +1,7 @@
-use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
+use std::{num::NonZeroUsize, sync::Arc, time::Duration};
 
 use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
-use datafusion::physical_plan::SendableRecordBatchStream;
-use futures::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
-use iox_time::Time;
+use futures::StreamExt;
 use observability_deps::tracing::info;
 use parquet_file::ParquetFilePath;
 use tracker::InstrumentedAsyncSemaphore;
@@ -383,13 +381,6 @@ async fn run_compaction_plan(
         return Ok(vec![]);
     }
 
-    // compute max_l0_created_at
-    let max_l0_created_at = files
-        .iter()
-        .map(|f| f.max_l0_created_at)
-        .max()
-        .expect("max_l0_created_at should have value");
-
     // stage files
     let input_paths: Vec<ParquetFilePath> = files.iter().map(|f| f.into()).collect();
     let input_uuids_inpad = scratchpad_ctx.load_to_scratchpad(&input_paths).await;
@@ -425,15 +416,14 @@
     let plan = components
         .df_planner
-        .plan(plan_ir, Arc::clone(partition_info))
+        .plan(&plan_ir, Arc::clone(partition_info))
         .await?;
     let streams = components.df_plan_exec.exec(plan);
 
-    let job = stream_into_file_sink(
+    let job = components.parquet_files_sink.stream_into_file_sink(
         streams,
         Arc::clone(partition_info),
         target_level,
-        max_l0_created_at.into(),
-        Arc::clone(components),
+        &plan_ir,
     );
 
     // TODO: react to OOM and try to divide branch
@@ -558,32 +548,3 @@ async fn fetch_partition_info(
         partition_key: partition.partition_key,
     }))
 }
-
-fn stream_into_file_sink(
-    streams: Vec<SendableRecordBatchStream>,
-    partition_info: Arc<PartitionInfo>,
-    target_level: CompactionLevel,
-    max_l0_created_at: Time,
-    components: Arc<Components>,
-) -> impl Future<Output = Result<Vec<ParquetFileParams>, DynError>> {
-    streams
-        .into_iter()
-        .map(move |stream| {
-            let components = Arc::clone(&components);
-            let partition_info = Arc::clone(&partition_info);
-            async move {
-                components
-                    .parquet_file_sink
-                    .store(stream, partition_info, target_level, max_l0_created_at)
-                    .await
-            }
-        })
-        // NB: FuturesOrdered allows the futures to run in parallel
-        .collect::<FuturesOrdered<_>>()
-        // Discard the streams that resulted in empty output / no file uploaded
-        // to the object store.
-        .try_filter_map(|v| futures::future::ready(Ok(v)))
-        // Collect all the persisted parquet files together.
-        .try_collect::<Vec<_>>()
-        .map_err(|e| Box::new(e) as _)
-}


@@ -26,6 +26,13 @@ impl PlanIR {
             Self::Split { split_times, .. } => split_times.len() + 1,
         }
     }
+
+    pub fn input_files(&self) -> &[FileIR] {
+        match self {
+            Self::Compact { files } => files,
+            Self::Split { files, .. } => files,
+        }
+    }
 }
 
 impl Display for PlanIR {