feat: add log around job semaphore and per-partition (#6927)

May help with debugging OOMs.
Marco Neumann 2023-02-09 16:02:34 +01:00 committed by GitHub
parent 00fd545a18
commit 0e5f31c576
1 changed file with 16 additions and 2 deletions

@@ -56,6 +56,7 @@ async fn compact_partition(
     job_semaphore: Arc<InstrumentedAsyncSemaphore>,
     components: Arc<Components>,
 ) {
+    info!(partition_id = partition_id.get(), "compact partition",);
     let mut scratchpad = components.scratchpad_gen.pad();

     let res = tokio::time::timeout(
@@ -78,6 +79,7 @@ async fn compact_partition(
     .await;

     scratchpad.clean().await;
+    info!(partition_id = partition_id.get(), "compacted partition",);
 }

 /// Main function to compact files of a single partition.
@@ -412,10 +414,14 @@ async fn run_compaction_plan(
         // We guard the DataFusion planning (that doesn't perform any IO) via the semaphore as well in case
         // DataFusion ever starts to pre-allocate buffers during the physical planning. To the best of our
         // knowledge, this is currently (2023-01-25) not the case but if this ever changes, then we are prepared.
-        let _permit = job_semaphore
+        let permit = job_semaphore
             .acquire(None)
             .await
             .expect("semaphore not closed");
+        info!(
+            partition_id = partition_info.partition_id.get(),
+            "job semaphore acquired",
+        );

         let plan = components
             .df_planner
@@ -431,7 +437,15 @@ async fn run_compaction_plan(
         );

         // TODO: react to OOM and try to divide branch
-        job.await?
+        let res = job.await;
+
+        drop(permit);
+        info!(
+            partition_id = partition_info.partition_id.get(),
+            "job semaphore released",
+        );
+
+        res?
     };

     Ok(create)
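
For context outside the IOx codebase, below is a minimal sketch of the pattern this commit instruments: acquire a semaphore permit, log the acquisition, run the job, drop the permit explicitly, then log the release, so the two log lines bracket the memory-heavy phase. It is an assumption-laden illustration, not the project's code: it uses tokio::sync::Semaphore and the tracing crate's info! macro as stand-ins for IOx's InstrumentedAsyncSemaphore and logging setup, and run_job, run_with_permit, and the partition id are hypothetical names. It assumes tokio, tracing, and tracing-subscriber as dependencies.

use std::sync::Arc;

use tokio::sync::Semaphore;
use tracing::info;

// Hypothetical stand-in for the real compaction job (in IOx this would be the
// DataFusion plan execution).
async fn run_job(partition_id: i64) -> Result<(), String> {
    info!(partition_id, "running job");
    Ok(())
}

async fn run_with_permit(semaphore: Arc<Semaphore>, partition_id: i64) -> Result<(), String> {
    // Acquire a permit before doing any memory-heavy work, mirroring the diff,
    // which acquires the job semaphore before DataFusion planning.
    let permit = semaphore
        .acquire_owned()
        .await
        .expect("semaphore not closed");
    info!(partition_id, "job semaphore acquired");

    // Hold the job result in a local instead of `?`-ing immediately so the
    // permit can be dropped and the release logged even when the job fails.
    let res = run_job(partition_id).await;

    drop(permit);
    info!(partition_id, "job semaphore released");

    res
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    // Two permits: at most two jobs run their memory-heavy phase concurrently.
    let semaphore = Arc::new(Semaphore::new(2));
    run_with_permit(semaphore, 42).await.expect("job failed");
}

The explicit drop(permit) pins down exactly when the permit is released, so the "released" log line can be correlated with memory usage; with the previous job.await? the permit was only dropped when the enclosing scope ended, leaving no marker in the logs.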