diff --git a/compactor/src/handler.rs b/compactor/src/handler.rs index 24ead85527..f48e3928df 100644 --- a/compactor/src/handler.rs +++ b/compactor/src/handler.rs @@ -5,7 +5,7 @@ use backoff::{Backoff, BackoffConfig}; use data_types::SequencerId; use futures::{ future::{BoxFuture, Shared}, - FutureExt, TryFutureExt, + FutureExt, StreamExt, TryFutureExt, }; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; @@ -296,23 +296,53 @@ async fn run_compactor(compactor: Arc, shutdown: CancellationToken) { debug!(n_candidates, "found compaction candidates"); } - // Serially compact all candidates - // TODO: we will parallelize this when everything runs smoothly in serial let start_time = compactor.time_provider.now(); - for c in candidates { - let partition_id = c.candidate.partition_id; - debug!(?partition_id, "compaction starting"); - let compaction_result = crate::compact_partition(&compactor, c).await; - match compaction_result { - Err(e) => { - warn!(?partition_id, "compaction failed: {:?}", e); + // Repeat compacting n partitions in parallel until all candidates are compacted. + // Concurrency level calculation (this is estimated from previous experiments. The actual resource + // management will be more complicated and a future feature): + // . Each `compact partititon` takes max of this much memory input_size_threshold_bytes + // . We have this memory budget: max_concurrent_compaction_size_bytes + // --> num_parallel_partitions = max_concurrent_compaction_size_bytes/ input_size_threshold_bytes + let num_parallel_partitions = (compactor.config.max_concurrent_compaction_size_bytes + / compactor.config.input_size_threshold_bytes) + as usize; + + futures::stream::iter(candidates) + .map(|p| { + // run compaction in its own task + let comp = Arc::clone(&compactor); + tokio::task::spawn(async move { + let partition_id = p.candidate.partition_id; + let compaction_result = crate::compact_partition(&comp, p).await; + + match compaction_result { + Err(e) => { + warn!(?e, ?partition_id, "compaction failed"); + } + Ok(_) => { + debug!(?partition_id, "compaction complete"); + } + }; + }) + }) + // Assume we have enough resources to run + // num_parallel_partitions compactions in parallel + .buffer_unordered(num_parallel_partitions) + // report any JoinErrors (aka task panics) + .map(|join_result| { + if let Err(e) = join_result { + warn!(?e, "compaction task failed"); } - Ok(_) => { - debug!(?partition_id, "compaction complete"); - } - } - } + Ok(()) + }) + // Errors are reported during execution, so ignore results here + // https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each + .forward(futures::sink::drain()) + .await + .ok(); + + // Done compacting all candidates in the cycle, record its time if let Some(delta) = compactor .time_provider .now()