fix: avoid panic when clock goes backwards (#8322)
I've seen at least one case in prod where the UTC clock went backwards. The `TimeProvider` and `Time` interfaces even warn about that. However, there was a `Sub` impl that would panic if that happened, and even though this was documented, I think we can do better and simply not offer a panicking interface at all. So this removes the `Sub` impl and replaces all uses with `checked_duration_since`.
pull/24376/head
parent
fca624a039
commit
edf77c73d8
|
@ -4,6 +4,7 @@ use std::{fmt::Debug, sync::Arc};
|
|||
use async_trait::async_trait;
|
||||
use iox_time::{Time, TimeProvider};
|
||||
use metric::{Attributes, DurationHistogram, U64Counter};
|
||||
use observability_deps::tracing::warn;
|
||||
use trace::span::{Span, SpanRecorder};
|
||||
|
||||
use super::{Cache, CacheGetStatus, CachePeekStatus};
|
||||
|
@ -180,15 +181,22 @@ impl<'a> Drop for SetGetMetricOnDrop<'a> {
|
|||
fn drop(&mut self) {
|
||||
let t_end = self.metrics.time_provider.now();
|
||||
|
||||
match self.status {
|
||||
Some(CacheGetStatus::Hit) => &self.metrics.metric_get_hit,
|
||||
Some(CacheGetStatus::Miss) => &self.metrics.metric_get_miss,
|
||||
Some(CacheGetStatus::MissAlreadyLoading) => {
|
||||
&self.metrics.metric_get_miss_already_loading
|
||||
match t_end.checked_duration_since(self.t_start) {
|
||||
Some(duration) => {
|
||||
match self.status {
|
||||
Some(CacheGetStatus::Hit) => &self.metrics.metric_get_hit,
|
||||
Some(CacheGetStatus::Miss) => &self.metrics.metric_get_miss,
|
||||
Some(CacheGetStatus::MissAlreadyLoading) => {
|
||||
&self.metrics.metric_get_miss_already_loading
|
||||
}
|
||||
None => &self.metrics.metric_get_cancelled,
|
||||
}
|
||||
.record(duration);
|
||||
}
|
||||
None => {
|
||||
warn!("Clock went backwards, not recording cache GET duration");
|
||||
}
|
||||
None => &self.metrics.metric_get_cancelled,
|
||||
}
|
||||
.record(t_end - self.t_start);
|
||||
|
||||
if let Some(status) = self.status {
|
||||
self.span_recorder.ok(status.name());
|
||||
|
@ -224,15 +232,22 @@ impl<'a> Drop for SetPeekMetricOnDrop<'a> {
|
|||
fn drop(&mut self) {
|
||||
let t_end = self.metrics.time_provider.now();
|
||||
|
||||
match self.status {
|
||||
Some(Some(CachePeekStatus::Hit)) => &self.metrics.metric_peek_hit,
|
||||
Some(Some(CachePeekStatus::MissAlreadyLoading)) => {
|
||||
&self.metrics.metric_peek_miss_already_loading
|
||||
match t_end.checked_duration_since(self.t_start) {
|
||||
Some(duration) => {
|
||||
match self.status {
|
||||
Some(Some(CachePeekStatus::Hit)) => &self.metrics.metric_peek_hit,
|
||||
Some(Some(CachePeekStatus::MissAlreadyLoading)) => {
|
||||
&self.metrics.metric_peek_miss_already_loading
|
||||
}
|
||||
Some(None) => &self.metrics.metric_peek_miss,
|
||||
None => &self.metrics.metric_peek_cancelled,
|
||||
}
|
||||
.record(duration);
|
||||
}
|
||||
None => {
|
||||
warn!("Clock went backwards, not recording cache PEEK duration");
|
||||
}
|
||||
Some(None) => &self.metrics.metric_peek_miss,
|
||||
None => &self.metrics.metric_peek_cancelled,
|
||||
}
|
||||
.record(t_end - self.t_start);
|
||||
|
||||
if let Some(status) = self.status {
|
||||
self.span_recorder
|
||||
|
|
|
@ -5,6 +5,7 @@ use std::sync::Arc;
|
|||
use async_trait::async_trait;
|
||||
use iox_time::TimeProvider;
|
||||
use metric::{DurationHistogram, U64Counter};
|
||||
use observability_deps::tracing::warn;
|
||||
use parking_lot::Mutex;
|
||||
use pdatastructs::filters::{bloomfilter::BloomFilter, Filter};
|
||||
|
||||
|
@ -138,7 +139,14 @@ where
|
|||
let v = self.inner.load(k, extra).await;
|
||||
let t_end = self.time_provider.now();
|
||||
|
||||
self.metric_duration.record(t_end - t_start);
|
||||
match t_end.checked_duration_since(t_start) {
|
||||
Some(duration) => {
|
||||
self.metric_duration.record(duration);
|
||||
}
|
||||
None => {
|
||||
warn!("Clock went backwards, not recording loader duration");
|
||||
}
|
||||
}
|
||||
|
||||
v
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::{fmt::Display, ops::Sub, sync::Arc, sync::Mutex, time::Duration};
|
||||
use std::{fmt::Display, sync::Arc, sync::Mutex, time::Duration};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use backoff::{Backoff, BackoffConfig};
|
||||
|
@ -81,7 +81,12 @@ impl PartitionsSource for CatalogToCompactPartitionsSource {
|
|||
// that creates busy-work that will spam the catalog with more queries to determine no compaction
|
||||
// needed. But we also don't want to query so far back in time that we get all partitions, so the
|
||||
// lookback is limited to 3x the configured threshold.
|
||||
if minimum_time < *last || minimum_time.sub(*last) < self.min_threshold * 3 {
|
||||
if minimum_time < *last
|
||||
|| minimum_time
|
||||
.checked_duration_since(*last)
|
||||
.map(|duration| duration < self.min_threshold * 3)
|
||||
.unwrap_or_default()
|
||||
{
|
||||
// the end of the last query is less than 3x our configured lookback, so we can query everything
|
||||
// since the last query.
|
||||
minimum_time = *last;
|
||||
|
|
|
@ -52,22 +52,6 @@ impl Sub<Duration> for Time {
|
|||
}
|
||||
}
|
||||
|
||||
impl Sub<Self> for Time {
|
||||
type Output = Duration;
|
||||
|
||||
/// Calculates difference in wall-clock time
|
||||
///
|
||||
/// **Warning: Because monotonicity is not guaranteed, `t2 - t1` might be negative
|
||||
/// even when `t2` was generated after `t1!**
|
||||
///
|
||||
/// # Panic
|
||||
///
|
||||
/// Panics if the result would be negative
|
||||
fn sub(self, rhs: Self) -> Self::Output {
|
||||
(self.0 - rhs.0).to_std().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for Time {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
std::fmt::Display::fmt(self, f)
|
||||
|
@ -359,7 +343,7 @@ mod test {
|
|||
let b = provider.now();
|
||||
let c = provider.now();
|
||||
|
||||
let delta = b - a;
|
||||
let delta = b.checked_duration_since(a).unwrap();
|
||||
assert!(delta > Duration::from_millis(500));
|
||||
assert!(delta < Duration::from_secs(5));
|
||||
assert!(b <= c);
|
||||
|
@ -373,7 +357,7 @@ mod test {
|
|||
provider.sleep(Duration::from_secs(1)).await;
|
||||
let b = provider.now();
|
||||
|
||||
let delta = b - a;
|
||||
let delta = b.checked_duration_since(a).unwrap();
|
||||
assert!(delta > Duration::from_millis(500));
|
||||
assert!(delta < Duration::from_secs(5));
|
||||
}
|
||||
|
@ -386,7 +370,7 @@ mod test {
|
|||
provider.sleep_until(a + Duration::from_secs(1)).await;
|
||||
let b = provider.now();
|
||||
|
||||
let delta = b - a;
|
||||
let delta = b.checked_duration_since(a).unwrap();
|
||||
assert!(delta > Duration::from_millis(500));
|
||||
assert!(delta < Duration::from_secs(5));
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
use data_types::NamespaceId;
|
||||
use iox_query::QueryText;
|
||||
use iox_time::{Time, TimeProvider};
|
||||
use observability_deps::tracing::warn;
|
||||
use parking_lot::Mutex;
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
|
@ -91,9 +92,15 @@ impl QueryLogEntry {
|
|||
/// Mark this entry complete as of `now`. `success` records if the
|
||||
/// entry is successful or not.
|
||||
pub fn set_completed(&self, now: Time, success: bool) {
|
||||
let dur = now - self.issue_time;
|
||||
self.query_completed_duration
|
||||
.store(dur.as_nanos() as i64, atomic::Ordering::Relaxed);
|
||||
match now.checked_duration_since(self.issue_time) {
|
||||
Some(dur) => {
|
||||
self.query_completed_duration
|
||||
.store(dur.as_nanos() as i64, atomic::Ordering::Relaxed);
|
||||
}
|
||||
None => {
|
||||
warn!("Clock went backwards, not query duration")
|
||||
}
|
||||
}
|
||||
self.success.store(success, atomic::Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue