feat: instrument semaphore "cancelled while pending" requests (#4876)

This is useful to see how many requests were cancelled (e.g. because they
timed out) while waiting for a semaphore.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-06-16 14:33:39 +02:00 committed by GitHub
parent 4b945493be
commit 827d869658
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 39 additions and 2 deletions

View File

@ -2,7 +2,7 @@
use std::{future::Future, marker::PhantomData, sync::Arc, task::Poll, time::Instant};
use futures::{future::BoxFuture, FutureExt};
use metric::{Attributes, DurationHistogram, MakeMetricObserver, U64Gauge};
use metric::{Attributes, DurationHistogram, MakeMetricObserver, U64Counter, U64Gauge};
use pin_project::{pin_project, pinned_drop};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
@ -14,8 +14,10 @@ pub struct AsyncSemaphoreMetrics {
permits_acquired: U64Gauge,
permits_total: U64Gauge,
permits_pending: U64Gauge,
permits_cancelled_while_pending: U64Counter,
holders_acquired: U64Gauge,
holders_pending: U64Gauge,
holders_cancelled_while_pending: U64Counter,
acquire_duration: DurationHistogram,
}
@ -42,6 +44,12 @@ impl AsyncSemaphoreMetrics {
"Number of pending permits",
)
.recorder(attributes.clone());
let permits_cancelled_while_pending = registry
.register_metric::<U64Counter>(
"iox_async_semaphore_permits_cancelled_while_pending",
"Counter for permits that were cancelled while they were waiting for the semaphore.",
)
.recorder(attributes.clone());
let holders_acquired = registry
.register_metric::<U64Gauge>(
"iox_async_semaphore_holders_acquired",
@ -54,6 +62,12 @@ impl AsyncSemaphoreMetrics {
"Number of pending semaphore holders. Each holder might have multiple permits",
)
.recorder(attributes.clone());
let holders_cancelled_while_pending = registry
.register_metric::<U64Counter>(
"iox_async_semaphore_holders_cancelled_while_pending",
"Number of pending semaphore holders that were cancelled while they were waiting for the semaphore. Each holder might have multiple permits",
)
.recorder(attributes.clone());
let acquire_duration = registry
.register_metric::<DurationHistogram>(
"iox_async_semaphore_acquire_duration",
@ -65,8 +79,10 @@ impl AsyncSemaphoreMetrics {
permits_acquired,
permits_total,
permits_pending,
permits_cancelled_while_pending,
holders_acquired,
holders_pending,
holders_cancelled_while_pending,
acquire_duration,
}
}
@ -77,8 +93,10 @@ impl AsyncSemaphoreMetrics {
permits_acquired: Default::default(),
permits_total: Default::default(),
permits_pending: Default::default(),
permits_cancelled_while_pending: Default::default(),
holders_acquired: Default::default(),
holders_pending: Default::default(),
holders_cancelled_while_pending: Default::default(),
acquire_duration: DurationHistogram::create(&Default::default()),
}
}
@ -267,6 +285,11 @@ impl<'a> PinnedDrop for InstrumentedAsyncSemaphoreAcquire<'a> {
if *this.reported_pending {
this.metrics.permits_pending.dec(*this.n as u64);
this.metrics.holders_pending.dec(1);
this.metrics
.permits_cancelled_while_pending
.inc(*this.n as u64);
this.metrics.holders_cancelled_while_pending.inc(1);
}
}
}
@ -449,11 +472,15 @@ mod tests {
assert_eq!(metrics.holders_pending.fetch(), 0);
assert_eq!(metrics.permits_pending.fetch(), 0);
assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 0);
assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 0);
let p1 = semaphore.acquire_many(5).await.unwrap();
assert_eq!(metrics.holders_pending.fetch(), 0);
assert_eq!(metrics.permits_pending.fetch(), 0);
assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 0);
assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 0);
{
let fut = semaphore.acquire_many(6);
@ -462,12 +489,16 @@ mod tests {
assert_eq!(metrics.holders_pending.fetch(), 1);
assert_eq!(metrics.permits_pending.fetch(), 6);
assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 0);
assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 0);
// `fut` is dropped here
}
assert_eq!(metrics.holders_pending.fetch(), 0);
assert_eq!(metrics.permits_pending.fetch(), 0);
assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 1);
assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 6);
{
let fut = semaphore.acquire_many(6);
@ -476,6 +507,8 @@ mod tests {
assert_eq!(metrics.holders_pending.fetch(), 1);
assert_eq!(metrics.permits_pending.fetch(), 6);
assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 1);
assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 6);
drop(p1);
@ -483,13 +516,17 @@ mod tests {
assert_eq!(metrics.holders_pending.fetch(), 0);
assert_eq!(metrics.permits_pending.fetch(), 0);
assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 1);
assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 6);
// `fut` is finally dropped here
}
// dropping the future should not decrease the counter a 2nd time
// dropping the future should NOT decrease the pending counter a 2nd time and should NOT be considered as "cancelled"
assert_eq!(metrics.holders_pending.fetch(), 0);
assert_eq!(metrics.permits_pending.fetch(), 0);
assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 1);
assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 6);
}
#[tokio::test]