diff --git a/tracker/src/async_semaphore.rs b/tracker/src/async_semaphore.rs index d2b1571f6d..e7dd859c97 100644 --- a/tracker/src/async_semaphore.rs +++ b/tracker/src/async_semaphore.rs @@ -2,7 +2,7 @@ use std::{future::Future, marker::PhantomData, sync::Arc, task::Poll, time::Instant}; use futures::{future::BoxFuture, FutureExt}; -use metric::{Attributes, DurationHistogram, MakeMetricObserver, U64Gauge}; +use metric::{Attributes, DurationHistogram, MakeMetricObserver, U64Counter, U64Gauge}; use pin_project::{pin_project, pinned_drop}; use tokio::sync::{OwnedSemaphorePermit, Semaphore}; @@ -14,8 +14,10 @@ pub struct AsyncSemaphoreMetrics { permits_acquired: U64Gauge, permits_total: U64Gauge, permits_pending: U64Gauge, + permits_cancelled_while_pending: U64Counter, holders_acquired: U64Gauge, holders_pending: U64Gauge, + holders_cancelled_while_pending: U64Counter, acquire_duration: DurationHistogram, } @@ -42,6 +44,12 @@ impl AsyncSemaphoreMetrics { "Number of pending permits", ) .recorder(attributes.clone()); + let permits_cancelled_while_pending = registry + .register_metric::( + "iox_async_semaphore_permits_cancelled_while_pending", + "Counter for permits that were cancelled while they were waiting for the semaphore.", + ) + .recorder(attributes.clone()); let holders_acquired = registry .register_metric::( "iox_async_semaphore_holders_acquired", @@ -54,6 +62,12 @@ impl AsyncSemaphoreMetrics { "Number of pending semaphore holders. Each holder might have multiple permits", ) .recorder(attributes.clone()); + let holders_cancelled_while_pending = registry + .register_metric::( + "iox_async_semaphore_holders_cancelled_while_pending", + "Number of pending semaphore holders that were cancelled while they were waiting for the semaphore. Each holder might have multiple permits", + ) + .recorder(attributes.clone()); let acquire_duration = registry .register_metric::( "iox_async_semaphore_acquire_duration", @@ -65,8 +79,10 @@ impl AsyncSemaphoreMetrics { permits_acquired, permits_total, permits_pending, + permits_cancelled_while_pending, holders_acquired, holders_pending, + holders_cancelled_while_pending, acquire_duration, } } @@ -77,8 +93,10 @@ impl AsyncSemaphoreMetrics { permits_acquired: Default::default(), permits_total: Default::default(), permits_pending: Default::default(), + permits_cancelled_while_pending: Default::default(), holders_acquired: Default::default(), holders_pending: Default::default(), + holders_cancelled_while_pending: Default::default(), acquire_duration: DurationHistogram::create(&Default::default()), } } @@ -267,6 +285,11 @@ impl<'a> PinnedDrop for InstrumentedAsyncSemaphoreAcquire<'a> { if *this.reported_pending { this.metrics.permits_pending.dec(*this.n as u64); this.metrics.holders_pending.dec(1); + + this.metrics + .permits_cancelled_while_pending + .inc(*this.n as u64); + this.metrics.holders_cancelled_while_pending.inc(1); } } } @@ -449,11 +472,15 @@ mod tests { assert_eq!(metrics.holders_pending.fetch(), 0); assert_eq!(metrics.permits_pending.fetch(), 0); + assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 0); + assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 0); let p1 = semaphore.acquire_many(5).await.unwrap(); assert_eq!(metrics.holders_pending.fetch(), 0); assert_eq!(metrics.permits_pending.fetch(), 0); + assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 0); + assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 0); { let fut = semaphore.acquire_many(6); @@ -462,12 +489,16 @@ mod tests { assert_eq!(metrics.holders_pending.fetch(), 1); assert_eq!(metrics.permits_pending.fetch(), 6); + assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 0); + assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 0); // `fut` is dropped here } assert_eq!(metrics.holders_pending.fetch(), 0); assert_eq!(metrics.permits_pending.fetch(), 0); + assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 1); + assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 6); { let fut = semaphore.acquire_many(6); @@ -476,6 +507,8 @@ mod tests { assert_eq!(metrics.holders_pending.fetch(), 1); assert_eq!(metrics.permits_pending.fetch(), 6); + assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 1); + assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 6); drop(p1); @@ -483,13 +516,17 @@ mod tests { assert_eq!(metrics.holders_pending.fetch(), 0); assert_eq!(metrics.permits_pending.fetch(), 0); + assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 1); + assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 6); // `fut` is finally dropped here } - // dropping the future should not decrease the counter a 2nd time + // dropping the future should NOT decrease the pending counter a 2nd time and should NOT be considered as "cancelled" assert_eq!(metrics.holders_pending.fetch(), 0); assert_eq!(metrics.permits_pending.fetch(), 0); + assert_eq!(metrics.holders_cancelled_while_pending.fetch(), 1); + assert_eq!(metrics.permits_cancelled_while_pending.fetch(), 6); } #[tokio::test]