feat: run many compact partitions in parallel (#5230)

* feat: run many compact partitions in parallel

* refactor: Use rust futures fu to run compactor jobs in parallel

* chore: Apply suggestions from code review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
pull/24376/head
Nga Tran 2022-07-27 16:55:45 -04:00 committed by GitHub
parent 7eebe061a6
commit fcce00bf09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 45 additions and 15 deletions

View File

@ -5,7 +5,7 @@ use backoff::{Backoff, BackoffConfig};
use data_types::SequencerId;
use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
FutureExt, StreamExt, TryFutureExt,
};
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
@ -296,23 +296,53 @@ async fn run_compactor(compactor: Arc<Compactor>, shutdown: CancellationToken) {
debug!(n_candidates, "found compaction candidates");
}
// Serially compact all candidates
// TODO: we will parallelize this when everything runs smoothly in serial
let start_time = compactor.time_provider.now();
for c in candidates {
let partition_id = c.candidate.partition_id;
debug!(?partition_id, "compaction starting");
let compaction_result = crate::compact_partition(&compactor, c).await;
// Repeat compacting n partitions in parallel until all candidates are compacted.
// Concurrency level calculation (this is estimated from previous experiments. The actual resource
// management will be more complicated and a future feature):
// . Each `compact partititon` takes max of this much memory input_size_threshold_bytes
// . We have this memory budget: max_concurrent_compaction_size_bytes
// --> num_parallel_partitions = max_concurrent_compaction_size_bytes/ input_size_threshold_bytes
let num_parallel_partitions = (compactor.config.max_concurrent_compaction_size_bytes
/ compactor.config.input_size_threshold_bytes)
as usize;
futures::stream::iter(candidates)
.map(|p| {
// run compaction in its own task
let comp = Arc::clone(&compactor);
tokio::task::spawn(async move {
let partition_id = p.candidate.partition_id;
let compaction_result = crate::compact_partition(&comp, p).await;
match compaction_result {
Err(e) => {
warn!(?partition_id, "compaction failed: {:?}", e);
warn!(?e, ?partition_id, "compaction failed");
}
Ok(_) => {
debug!(?partition_id, "compaction complete");
}
};
})
})
// Assume we have enough resources to run
// num_parallel_partitions compactions in parallel
.buffer_unordered(num_parallel_partitions)
// report any JoinErrors (aka task panics)
.map(|join_result| {
if let Err(e) = join_result {
warn!(?e, "compaction task failed");
}
}
Ok(())
})
// Errors are reported during execution, so ignore results here
// https://stackoverflow.com/questions/64443085/how-to-run-stream-to-completion-in-rust-using-combinators-other-than-for-each
.forward(futures::sink::drain())
.await
.ok();
// Done compacting all candidates in the cycle, record its time
if let Some(delta) = compactor
.time_provider
.now()