diff --git a/cache_system/src/cache/metrics.rs b/cache_system/src/cache/metrics.rs index 7a953bb163..7ebad842a1 100644 --- a/cache_system/src/cache/metrics.rs +++ b/cache_system/src/cache/metrics.rs @@ -4,6 +4,7 @@ use std::{fmt::Debug, sync::Arc}; use async_trait::async_trait; use iox_time::{Time, TimeProvider}; use metric::{Attributes, DurationHistogram, U64Counter}; +use observability_deps::tracing::warn; use trace::span::{Span, SpanRecorder}; use super::{Cache, CacheGetStatus, CachePeekStatus}; @@ -180,15 +181,22 @@ impl<'a> Drop for SetGetMetricOnDrop<'a> { fn drop(&mut self) { let t_end = self.metrics.time_provider.now(); - match self.status { - Some(CacheGetStatus::Hit) => &self.metrics.metric_get_hit, - Some(CacheGetStatus::Miss) => &self.metrics.metric_get_miss, - Some(CacheGetStatus::MissAlreadyLoading) => { - &self.metrics.metric_get_miss_already_loading + match t_end.checked_duration_since(self.t_start) { + Some(duration) => { + match self.status { + Some(CacheGetStatus::Hit) => &self.metrics.metric_get_hit, + Some(CacheGetStatus::Miss) => &self.metrics.metric_get_miss, + Some(CacheGetStatus::MissAlreadyLoading) => { + &self.metrics.metric_get_miss_already_loading + } + None => &self.metrics.metric_get_cancelled, + } + .record(duration); + } + None => { + warn!("Clock went backwards, not recording cache GET duration"); } - None => &self.metrics.metric_get_cancelled, } - .record(t_end - self.t_start); if let Some(status) = self.status { self.span_recorder.ok(status.name()); @@ -224,15 +232,22 @@ impl<'a> Drop for SetPeekMetricOnDrop<'a> { fn drop(&mut self) { let t_end = self.metrics.time_provider.now(); - match self.status { - Some(Some(CachePeekStatus::Hit)) => &self.metrics.metric_peek_hit, - Some(Some(CachePeekStatus::MissAlreadyLoading)) => { - &self.metrics.metric_peek_miss_already_loading + match t_end.checked_duration_since(self.t_start) { + Some(duration) => { + match self.status { + Some(Some(CachePeekStatus::Hit)) => &self.metrics.metric_peek_hit, + Some(Some(CachePeekStatus::MissAlreadyLoading)) => { + &self.metrics.metric_peek_miss_already_loading + } + Some(None) => &self.metrics.metric_peek_miss, + None => &self.metrics.metric_peek_cancelled, + } + .record(duration); + } + None => { + warn!("Clock went backwards, not recording cache PEEK duration"); } - Some(None) => &self.metrics.metric_peek_miss, - None => &self.metrics.metric_peek_cancelled, } - .record(t_end - self.t_start); if let Some(status) = self.status { self.span_recorder diff --git a/cache_system/src/loader/metrics.rs b/cache_system/src/loader/metrics.rs index c19f669fc7..72645b25da 100644 --- a/cache_system/src/loader/metrics.rs +++ b/cache_system/src/loader/metrics.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use async_trait::async_trait; use iox_time::TimeProvider; use metric::{DurationHistogram, U64Counter}; +use observability_deps::tracing::warn; use parking_lot::Mutex; use pdatastructs::filters::{bloomfilter::BloomFilter, Filter}; @@ -138,7 +139,14 @@ where let v = self.inner.load(k, extra).await; let t_end = self.time_provider.now(); - self.metric_duration.record(t_end - t_start); + match t_end.checked_duration_since(t_start) { + Some(duration) => { + self.metric_duration.record(duration); + } + None => { + warn!("Clock went backwards, not recording loader duration"); + } + } v } diff --git a/compactor_scheduler/src/local_scheduler/partitions_source/catalog_to_compact.rs b/compactor_scheduler/src/local_scheduler/partitions_source/catalog_to_compact.rs index 7a9ec49258..d6c71f424c 100644 --- a/compactor_scheduler/src/local_scheduler/partitions_source/catalog_to_compact.rs +++ b/compactor_scheduler/src/local_scheduler/partitions_source/catalog_to_compact.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, ops::Sub, sync::Arc, sync::Mutex, time::Duration}; +use std::{fmt::Display, sync::Arc, sync::Mutex, time::Duration}; use async_trait::async_trait; use backoff::{Backoff, BackoffConfig}; @@ -81,7 +81,12 @@ impl PartitionsSource for CatalogToCompactPartitionsSource { // that creates busy-work that will spam the catalog with more queries to determine no compaction // needed. But we also don't want to query so far back in time that we get all partitions, so the // lookback is limited to 3x the configured threshold. - if minimum_time < *last || minimum_time.sub(*last) < self.min_threshold * 3 { + if minimum_time < *last + || minimum_time + .checked_duration_since(*last) + .map(|duration| duration < self.min_threshold * 3) + .unwrap_or_default() + { // the end of the last query is less than 3x our configured lookback, so we can query everything // since the last query. minimum_time = *last; diff --git a/iox_time/src/lib.rs b/iox_time/src/lib.rs index f7e30b720c..3fd72e2cdf 100644 --- a/iox_time/src/lib.rs +++ b/iox_time/src/lib.rs @@ -52,22 +52,6 @@ impl Sub for Time { } } -impl Sub for Time { - type Output = Duration; - - /// Calculates difference in wall-clock time - /// - /// **Warning: Because monotonicity is not guaranteed, `t2 - t1` might be negative - /// even when `t2` was generated after `t1!** - /// - /// # Panic - /// - /// Panics if the result would be negative - fn sub(self, rhs: Self) -> Self::Output { - (self.0 - rhs.0).to_std().unwrap() - } -} - impl Debug for Time { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { std::fmt::Display::fmt(self, f) @@ -359,7 +343,7 @@ mod test { let b = provider.now(); let c = provider.now(); - let delta = b - a; + let delta = b.checked_duration_since(a).unwrap(); assert!(delta > Duration::from_millis(500)); assert!(delta < Duration::from_secs(5)); assert!(b <= c); @@ -373,7 +357,7 @@ mod test { provider.sleep(Duration::from_secs(1)).await; let b = provider.now(); - let delta = b - a; + let delta = b.checked_duration_since(a).unwrap(); assert!(delta > Duration::from_millis(500)); assert!(delta < Duration::from_secs(5)); } @@ -386,7 +370,7 @@ mod test { provider.sleep_until(a + Duration::from_secs(1)).await; let b = provider.now(); - let delta = b - a; + let delta = b.checked_duration_since(a).unwrap(); assert!(delta > Duration::from_millis(500)); assert!(delta < Duration::from_secs(5)); } diff --git a/querier/src/query_log.rs b/querier/src/query_log.rs index e183c25e7b..d95f4ebc67 100644 --- a/querier/src/query_log.rs +++ b/querier/src/query_log.rs @@ -3,6 +3,7 @@ use data_types::NamespaceId; use iox_query::QueryText; use iox_time::{Time, TimeProvider}; +use observability_deps::tracing::warn; use parking_lot::Mutex; use std::{ collections::VecDeque, @@ -91,9 +92,15 @@ impl QueryLogEntry { /// Mark this entry complete as of `now`. `success` records if the /// entry is successful or not. pub fn set_completed(&self, now: Time, success: bool) { - let dur = now - self.issue_time; - self.query_completed_duration - .store(dur.as_nanos() as i64, atomic::Ordering::Relaxed); + match now.checked_duration_since(self.issue_time) { + Some(dur) => { + self.query_completed_duration + .store(dur.as_nanos() as i64, atomic::Ordering::Relaxed); + } + None => { + warn!("Clock went backwards, not query duration") + } + } self.success.store(success, atomic::Ordering::SeqCst); } }