fix: Run compactor streams in parallel to avoid deadlock (#5212)
* fix: run compaction streams at once * fix: make it compile * fix: improve wording * fix: use task to write parquet files in parallel (pull/24376/head)
parent
7d16ac7de0
commit
bbcf4ec64e
|
|
@ -3,6 +3,7 @@ use data_types::{
|
|||
CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId, TableSchema,
|
||||
};
|
||||
use datafusion::error::DataFusionError;
|
||||
use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt};
|
||||
use iox_catalog::interface::Catalog;
|
||||
use iox_query::{
|
||||
exec::{Executor, ExecutorType},
|
||||
|
|
@ -44,6 +45,9 @@ pub(crate) enum Error {
|
|||
#[snafu(display("Error executing compact plan {}", source))]
|
||||
ExecuteCompactPlan { source: DataFusionError },
|
||||
|
||||
#[snafu(display("Error executing parquet write task {}", source))]
|
||||
ExecuteParquetTask { source: tokio::task::JoinError },
|
||||
|
||||
#[snafu(display("Could not serialize and persist record batches {}", source))]
|
||||
Persist {
|
||||
source: parquet_file::storage::UploadError,
|
||||
|
|
@ -215,60 +219,89 @@ pub(crate) async fn compact_parquet_files(
|
|||
.await
|
||||
.context(CompactPhysicalPlanSnafu)?;
|
||||
|
||||
let partition = Arc::new(partition);
|
||||
|
||||
// Run to collect each stream of the plan
|
||||
let stream_count = physical_plan.output_partitioning().partition_count();
|
||||
let mut compacted_parquet_files = Vec::with_capacity(stream_count);
|
||||
|
||||
debug!("running plan with {} streams", stream_count);
|
||||
for i in 0..stream_count {
|
||||
trace!(partition = i, "executing datafusion partition");
|
||||
let data = ctx
|
||||
.execute_stream_partitioned(Arc::clone(&physical_plan), i)
|
||||
.await
|
||||
.context(ExecuteCompactPlanSnafu)?;
|
||||
trace!(partition = i, "built result stream for partition");
|
||||
|
||||
let meta = IoxMetadata {
|
||||
object_store_id: Uuid::new_v4(),
|
||||
creation_timestamp: time_provider.now(),
|
||||
sequencer_id: partition.sequencer_id(),
|
||||
namespace_id: partition.namespace_id(),
|
||||
namespace_name: partition.namespace.name.clone().into(),
|
||||
table_id: partition.table.id,
|
||||
table_name: partition.table.name.clone().into(),
|
||||
partition_id,
|
||||
partition_key: partition.partition_key.clone(),
|
||||
max_sequence_number,
|
||||
compaction_level: CompactionLevel::FileNonOverlapped,
|
||||
sort_key: Some(sort_key.clone()),
|
||||
};
|
||||
// These streams *must* run in parallel, otherwise a deadlock
|
||||
// can occur. Since there is a merge in the plan, in order to make
|
||||
// progress on one stream there must be (potential space) on the
|
||||
// other streams.
|
||||
//
|
||||
// https://github.com/influxdata/influxdb_iox/issues/4306
|
||||
// https://github.com/influxdata/influxdb_iox/issues/4324
|
||||
let compacted_parquet_files = (0..stream_count)
|
||||
.map(|i| {
|
||||
// Prepare variables to pass to the closure
|
||||
let ctx = exec.new_context(ExecutorType::Reorg);
|
||||
let physical_plan = Arc::clone(&physical_plan);
|
||||
let store = store.clone();
|
||||
let time_provider = Arc::clone(&time_provider);
|
||||
let sort_key = sort_key.clone();
|
||||
let partition = Arc::clone(&partition);
|
||||
// run as a separate tokio task so files can be written
|
||||
// concurrently.
|
||||
tokio::task::spawn(async move {
|
||||
trace!(partition = i, "executing datafusion partition");
|
||||
let data = ctx
|
||||
.execute_stream_partitioned(physical_plan, i)
|
||||
.await
|
||||
.context(ExecuteCompactPlanSnafu)?;
|
||||
trace!(partition = i, "built result stream for partition");
|
||||
|
||||
debug!(
|
||||
?partition_id,
|
||||
"executing and uploading compaction StreamSplitExec"
|
||||
);
|
||||
let meta = IoxMetadata {
|
||||
object_store_id: Uuid::new_v4(),
|
||||
creation_timestamp: time_provider.now(),
|
||||
sequencer_id: partition.sequencer_id(),
|
||||
namespace_id: partition.namespace_id(),
|
||||
namespace_name: partition.namespace.name.clone().into(),
|
||||
table_id: partition.table.id,
|
||||
table_name: partition.table.name.clone().into(),
|
||||
partition_id,
|
||||
partition_key: partition.partition_key.clone(),
|
||||
max_sequence_number,
|
||||
compaction_level: CompactionLevel::FileNonOverlapped,
|
||||
sort_key: Some(sort_key.clone()),
|
||||
};
|
||||
|
||||
let object_store_id = meta.object_store_id;
|
||||
info!(?partition_id, %object_store_id, "streaming exec to object store");
|
||||
debug!(
|
||||
?partition_id,
|
||||
"executing and uploading compaction StreamSplitExec"
|
||||
);
|
||||
|
||||
// Stream the record batches from the compaction exec, serialize
|
||||
// them, and directly upload the resulting Parquet files to
|
||||
// object storage.
|
||||
let (parquet_meta, file_size) = store.upload(data, &meta).await.context(PersistSnafu)?;
|
||||
let object_store_id = meta.object_store_id;
|
||||
info!(?partition_id, %object_store_id, "streaming exec to object store");
|
||||
|
||||
debug!(?partition_id, %object_store_id, "file uploaded to object store");
|
||||
// Stream the record batches from the compaction exec, serialize
|
||||
// them, and directly upload the resulting Parquet files to
|
||||
// object storage.
|
||||
let (parquet_meta, file_size) =
|
||||
store.upload(data, &meta).await.context(PersistSnafu)?;
|
||||
|
||||
let parquet_file = meta.to_parquet_file(partition_id, file_size, &parquet_meta, |name| {
|
||||
partition
|
||||
.table_schema
|
||||
.columns
|
||||
.get(name)
|
||||
.expect("unknown column")
|
||||
.id
|
||||
});
|
||||
debug!(?partition_id, %object_store_id, "file uploaded to object store");
|
||||
|
||||
compacted_parquet_files.push(parquet_file);
|
||||
}
|
||||
let parquet_file =
|
||||
meta.to_parquet_file(partition_id, file_size, &parquet_meta, |name| {
|
||||
partition
|
||||
.table_schema
|
||||
.columns
|
||||
.get(name)
|
||||
.expect("unknown column")
|
||||
.id
|
||||
});
|
||||
|
||||
Ok(parquet_file)
|
||||
})
|
||||
})
|
||||
// NB: FuturesOrdered allows the futures to run in parallel
|
||||
.collect::<FuturesOrdered<_>>()
|
||||
// Check for errors in the task
|
||||
.map(|t| t.context(ExecuteParquetTaskSnafu)?)
|
||||
.try_collect::<Vec<_>>()
|
||||
.await?;
|
||||
|
||||
update_catalog(
|
||||
catalog,
|
||||
|
|
|
|||
Loading…
Reference in New Issue