diff --git a/compactor/src/parquet_file_combining.rs b/compactor/src/parquet_file_combining.rs index fcc05e353b..157bca1d61 100644 --- a/compactor/src/parquet_file_combining.rs +++ b/compactor/src/parquet_file_combining.rs @@ -3,6 +3,7 @@ use data_types::{ CompactionLevel, ParquetFile, ParquetFileId, ParquetFileParams, PartitionId, TableSchema, }; use datafusion::error::DataFusionError; +use futures::{stream::FuturesOrdered, StreamExt, TryStreamExt}; use iox_catalog::interface::Catalog; use iox_query::{ exec::{Executor, ExecutorType}, @@ -44,6 +45,9 @@ pub(crate) enum Error { #[snafu(display("Error executing compact plan {}", source))] ExecuteCompactPlan { source: DataFusionError }, + #[snafu(display("Error executing parquet write task {}", source))] + ExecuteParquetTask { source: tokio::task::JoinError }, + #[snafu(display("Could not serialize and persist record batches {}", source))] Persist { source: parquet_file::storage::UploadError, @@ -215,60 +219,89 @@ pub(crate) async fn compact_parquet_files( .await .context(CompactPhysicalPlanSnafu)?; + let partition = Arc::new(partition); + // Run to collect each stream of the plan let stream_count = physical_plan.output_partitioning().partition_count(); - let mut compacted_parquet_files = Vec::with_capacity(stream_count); debug!("running plan with {} streams", stream_count); - for i in 0..stream_count { - trace!(partition = i, "executing datafusion partition"); - let data = ctx - .execute_stream_partitioned(Arc::clone(&physical_plan), i) - .await - .context(ExecuteCompactPlanSnafu)?; - trace!(partition = i, "built result stream for partition"); - let meta = IoxMetadata { - object_store_id: Uuid::new_v4(), - creation_timestamp: time_provider.now(), - sequencer_id: partition.sequencer_id(), - namespace_id: partition.namespace_id(), - namespace_name: partition.namespace.name.clone().into(), - table_id: partition.table.id, - table_name: partition.table.name.clone().into(), - partition_id, - partition_key: 
partition.partition_key.clone(), - max_sequence_number, - compaction_level: CompactionLevel::FileNonOverlapped, - sort_key: Some(sort_key.clone()), - }; + // These streams *must* to run in parallel otherwise a deadlock + // can occur. Since there is a merge in the plan, in order to make + // progress on one stream there must be (potential space) on the + // other streams. + // + // https://github.com/influxdata/influxdb_iox/issues/4306 + // https://github.com/influxdata/influxdb_iox/issues/4324 + let compacted_parquet_files = (0..stream_count) + .map(|i| { + // Prepare variables to pass to the closure + let ctx = exec.new_context(ExecutorType::Reorg); + let physical_plan = Arc::clone(&physical_plan); + let store = store.clone(); + let time_provider = Arc::clone(&time_provider); + let sort_key = sort_key.clone(); + let partition = Arc::clone(&partition); + // run as a separate tokio task so files can be written + // concurrently. + tokio::task::spawn(async move { + trace!(partition = i, "executing datafusion partition"); + let data = ctx + .execute_stream_partitioned(physical_plan, i) + .await + .context(ExecuteCompactPlanSnafu)?; + trace!(partition = i, "built result stream for partition"); - debug!( - ?partition_id, - "executing and uploading compaction StreamSplitExec" - ); + let meta = IoxMetadata { + object_store_id: Uuid::new_v4(), + creation_timestamp: time_provider.now(), + sequencer_id: partition.sequencer_id(), + namespace_id: partition.namespace_id(), + namespace_name: partition.namespace.name.clone().into(), + table_id: partition.table.id, + table_name: partition.table.name.clone().into(), + partition_id, + partition_key: partition.partition_key.clone(), + max_sequence_number, + compaction_level: CompactionLevel::FileNonOverlapped, + sort_key: Some(sort_key.clone()), + }; - let object_store_id = meta.object_store_id; - info!(?partition_id, %object_store_id, "streaming exec to object store"); + debug!( + ?partition_id, + "executing and uploading 
compaction StreamSplitExec" + ); - let object_store_id = meta.object_store_id; - info!(?partition_id, %object_store_id, "streaming exec to object store"); + let object_store_id = meta.object_store_id; + info!(?partition_id, %object_store_id, "streaming exec to object store"); - // Stream the record batches from the compaction exec, serialize - // them, and directly upload the resulting Parquet files to - // object storage. - let (parquet_meta, file_size) = store.upload(data, &meta).await.context(PersistSnafu)?; + // Stream the record batches from the compaction exec, serialize + // them, and directly upload the resulting Parquet files to + // object storage. + let (parquet_meta, file_size) = + store.upload(data, &meta).await.context(PersistSnafu)?; - debug!(?partition_id, %object_store_id, "file uploaded to object store"); + debug!(?partition_id, %object_store_id, "file uploaded to object store"); - let parquet_file = meta.to_parquet_file(partition_id, file_size, &parquet_meta, |name| { - partition - .table_schema - .columns - .get(name) - .expect("unknown column") - .id - }); + let parquet_file = + meta.to_parquet_file(partition_id, file_size, &parquet_meta, |name| { + partition + .table_schema + .columns + .get(name) + .expect("unknown column") + .id + }); - compacted_parquet_files.push(parquet_file); - } + + Ok(parquet_file) + }) + }) + // NB: FuturesOrdered allows the futures to run in parallel + .collect::<FuturesOrdered<_>>() + // Check for errors in the task + .map(|t| t.context(ExecuteParquetTaskSnafu)?) + .try_collect::<Vec<_>>() + .await?; update_catalog( catalog,