diff --git a/metric/src/duration.rs b/metric/src/duration.rs index e25d408852..3de3cdd329 100644 --- a/metric/src/duration.rs +++ b/metric/src/duration.rs @@ -98,11 +98,16 @@ pub struct DurationHistogram { impl DurationHistogram { pub fn record(&self, value: Duration) { - self.inner.record( + self.record_multiple(value, 1) + } + + pub fn record_multiple(&self, value: Duration, count: u64) { + self.inner.record_multiple( value .as_nanos() .try_into() .expect("cannot fit duration into u64"), + count, ) } } diff --git a/metric/src/histogram.rs b/metric/src/histogram.rs index 98ce164e1e..4d033ee1ca 100644 --- a/metric/src/histogram.rs +++ b/metric/src/histogram.rs @@ -50,6 +50,10 @@ impl U64Histogram { } pub fn record(&self, value: u64) { + self.record_multiple(value, 1) + } + + pub fn record_multiple(&self, value: u64, count: u64) { let mut state = self.shared.lock(); if let Some(bucket) = state .buckets @@ -57,8 +61,8 @@ impl U64Histogram { .find(|bucket| value <= bucket.le) .as_mut() { - bucket.count = bucket.count.wrapping_add(1); - state.total = state.total.wrapping_add(value); + bucket.count = bucket.count.wrapping_add(count); + state.total = state.total.wrapping_add(value * count); } } } diff --git a/server/src/db.rs b/server/src/db.rs index b23cf70944..2a6851dac7 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -2319,20 +2319,27 @@ mod tests { summary.record(Utc.timestamp_nanos(650000000000)); summary.record(Utc.timestamp_nanos(650000000010)); - for (minute, count) in summary.cumulative_counts() { - let minute = (minute * 60).to_string(); - test_db - .metric_registry - .has_metric_family("catalog_row_time_seconds_bucket") - .with_attributes(&[ - ("svr_id", "1"), - ("db_name", "placeholder"), - ("table", "write_metrics_test"), - ("le", minute.as_str()), - ]) - .counter() - .eq(count as _) - .unwrap(); + let mut reporter = metric::RawReporter::default(); + test_db.metrics_registry_v2.report(&mut reporter); + + let observation = reporter + .metric("catalog_row_time") + .unwrap() + .observation(&[("db_name", "placeholder"), ("table", "write_metrics_test")]) + .unwrap(); + + let histogram = match observation { + Observation::DurationHistogram(histogram) => histogram, + _ => unreachable!(), + }; + assert_eq!(histogram.buckets.len(), 60); + + for ((minute, count), observation) in + summary.counts.iter().enumerate().zip(&histogram.buckets) + { + let minute = Duration::from_secs((minute * 60) as u64); + assert_eq!(observation.le, minute); + assert_eq!(*count as u64, observation.count) } } diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 5d6d4fb17e..fa15edf000 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -242,7 +242,7 @@ pub struct ChunkMetrics { pub(super) memory_metrics: StorageGauge, /// Track ingested timestamps - pub(super) timestamp_histogram: TimestampHistogram, + pub(super) timestamp_histogram: Option, } impl ChunkMetrics { @@ -497,7 +497,9 @@ impl CatalogChunk { /// `time_of_write` is the wall clock time of the write /// `timestamps` is a summary of the row timestamps contained in the write pub fn record_write(&mut self, time_of_write: DateTime, timestamps: &TimestampSummary) { - self.metrics.timestamp_histogram.add(timestamps); + if let Some(timestamp_histogram) = self.metrics.timestamp_histogram.as_ref() { + timestamp_histogram.add(timestamps) + } self.access_recorder.record_access_now(); self.time_of_first_write = self.time_of_first_write.min(time_of_write); diff --git a/server/src/db/catalog/metrics.rs b/server/src/db/catalog/metrics.rs index a53b522010..0e9384dd8e 100644 --- a/server/src/db/catalog/metrics.rs +++ b/server/src/db/catalog/metrics.rs @@ -1,10 +1,15 @@ -use crate::db::catalog::chunk::ChunkMetrics; -use data_types::write_summary::TimestampSummary; -use metrics::{Counter, Gauge, GaugeValue, Histogram, KeyValue, MetricObserverBuilder}; -use parking_lot::Mutex; use std::sync::Arc; +use std::time::Duration; + +use parking_lot::Mutex; + +use data_types::write_summary::TimestampSummary; +use metric::{Attributes, DurationHistogram, DurationHistogramOptions}; +use metrics::{Counter, Gauge, GaugeValue, Histogram, KeyValue}; use tracker::{LockMetrics, RwLock}; +use crate::db::catalog::chunk::ChunkMetrics; + const TIMESTAMP_METRICS_ENABLE_ENV: &str = "INFLUXDB_IOX_ROW_TIMESTAMP_METRICS"; fn report_timestamp_metrics(table_name: &str) -> bool { std::env::var(TIMESTAMP_METRICS_ENABLE_ENV) @@ -53,26 +58,29 @@ impl CatalogMetrics { } pub(super) fn new_table_metrics(&self, table_name: &str) -> TableMetrics { - let mut attributes = metric::Attributes::from([ + let base_attributes = metric::Attributes::from([ ("db_name", self.db_name.to_string().into()), ("table", table_name.to_string().into()), ]); - attributes.insert("lock", "table"); + let mut lock_attributes = base_attributes.clone(); + lock_attributes.insert("lock", "table"); let table_lock_metrics = Arc::new(LockMetrics::new( self.metrics_registry.as_ref(), - attributes.clone(), + lock_attributes.clone(), )); - attributes.insert("lock", "partition"); + lock_attributes.insert("lock", "partition"); let partition_lock_metrics = Arc::new(LockMetrics::new( self.metrics_registry.as_ref(), - attributes.clone(), + lock_attributes.clone(), )); - attributes.insert("lock", "chunk"); - let chunk_lock_metrics = - Arc::new(LockMetrics::new(self.metrics_registry.as_ref(), attributes)); + lock_attributes.insert("lock", "chunk"); + let chunk_lock_metrics = Arc::new(LockMetrics::new( + self.metrics_registry.as_ref(), + lock_attributes, + )); let storage_gauge = self.metrics_domain.register_gauge_metric_with_attributes( "loaded", @@ -88,14 +96,8 @@ impl CatalogMetrics { &[KeyValue::new("table", table_name.to_string())], ); - let timestamp_histogram = Default::default(); - if report_timestamp_metrics(table_name) { - self.metrics_domain.register_observer( - None, - &[KeyValue::new("table", table_name.to_string())], - ×tamp_histogram, - ); - } + let timestamp_histogram = report_timestamp_metrics(table_name) + .then(|| TimestampHistogram::new(self.metrics_registry.as_ref(), base_attributes)); TableMetrics { metrics_domain: Arc::clone(&self.metrics_domain), @@ -134,7 +136,7 @@ pub struct TableMetrics { chunk_lock_metrics: Arc, /// Track ingested timestamps - timestamp_histogram: TimestampHistogram, + timestamp_histogram: Option, } impl TableMetrics { @@ -192,7 +194,7 @@ pub struct PartitionMetrics { chunk_lock_metrics: Arc, /// Track ingested timestamps - timestamp_histogram: TimestampHistogram, + timestamp_histogram: Option, } impl PartitionMetrics { @@ -337,35 +339,34 @@ impl StorageGauge { /// A Histogram-inspired metric for reporting `TimestampSummary` /// -/// This is partly to workaround limitations defining custom Histogram bucketing in OTEL -/// and also because it can be implemented more efficiently as the set of values is fixed -/// /// Like `TimestampSummary`, this is bucketed based on minute within the hour /// It will therefore wrap around on the hour -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub(super) struct TimestampHistogram { - inner: Arc>, + inner: metric::DurationHistogram, } impl TimestampHistogram { - pub(super) fn add(&self, summary: &TimestampSummary) { - self.inner.lock().merge(summary) - } -} - -impl metrics::MetricObserver for &TimestampHistogram { - fn register(self, builder: MetricObserverBuilder<'_>) { - let inner = Arc::clone(&self.inner); - builder.register_histogram_bucket( - "row_time", - Some("seconds"), + fn new(registry: &metric::Registry, attributes: impl Into) -> Self { + let histogram: metric::Metric = registry.register_metric_with_options( + "catalog_row_time", "The cumulative distribution of the ingested row timestamps", - move |result| { - let inner = inner.lock(); - for (min, total) in inner.cumulative_counts() { - result.observe(total, &[KeyValue::new("le", (min * 60).to_string())]) - } + || { + DurationHistogramOptions::new( + (0..60).map(|minute| Duration::from_secs(minute * 60)), + ) }, - ) + ); + + Self { + inner: histogram.recorder(attributes), + } + } + + pub(super) fn add(&self, summary: &TimestampSummary) { + for (minute, count) in summary.counts.iter().enumerate() { + self.inner + .record_multiple(Duration::from_secs(minute as u64 * 60), *count as u64) + } } } diff --git a/tests/end_to_end_cases/metrics.rs b/tests/end_to_end_cases/metrics.rs index 1ff2417b94..9417a24e62 100644 --- a/tests/end_to_end_cases/metrics.rs +++ b/tests/end_to_end_cases/metrics.rs @@ -33,7 +33,7 @@ pub async fn test_row_timestamp() { let db_name_attribute = format!("db_name=\"{}\"", db_name); // Should only be enabled for the system table - assert_eq!(lines.len(), 60); + assert_eq!(lines.len(), 61); assert!(lines .iter() .all(|x| x.contains("table=\"system\"") && x.contains(&db_name_attribute)));