diff --git a/compactor/src/compact.rs b/compactor/src/compact.rs index 47da302d96..685f84a5d3 100644 --- a/compactor/src/compact.rs +++ b/compactor/src/compact.rs @@ -23,7 +23,7 @@ use iox_query::{ QueryChunk, }; use iox_time::TimeProvider; -use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions}; +use metric::{Attributes, DurationHistogram, Metric, U64Counter, U64Gauge}; use observability_deps::tracing::{debug, info, trace, warn}; use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; @@ -191,7 +191,7 @@ pub struct Compactor { compaction_candidate_bytes_gauge: Metric, /// Histogram for tracking the time to compact a partition - compaction_duration_ms: Metric, + compaction_duration: Metric, } impl Compactor { @@ -230,14 +230,9 @@ impl Compactor { "Counter for level promotion from 0 to 1", ); - // buckets for timing compact partition - let compaction_duration_buckets_ms = - || U64HistogramOptions::new([100, 1000, 5000, 10000, 30000, 60000, 360000, u64::MAX]); - - let compaction_duration_ms: Metric = registry.register_metric_with_options( - "compactor_compact_partition_duration_ms", - "Compact partition duration in milliseconds", - compaction_duration_buckets_ms, + let compaction_duration: Metric = registry.register_metric( + "compactor_compact_partition_duration", + "Compact partition duration", ); Self { @@ -254,7 +249,7 @@ impl Compactor { level_promotion_counter, compaction_candidate_gauge, compaction_candidate_bytes_gauge, - compaction_duration_ms, + compaction_duration, } } @@ -540,8 +535,8 @@ impl Compactor { } if let Some(delta) = self.time_provider.now().checked_duration_since(start_time) { - let duration_ms = self.compaction_duration_ms.recorder(attributes.clone()); - duration_ms.record(delta.as_millis() as _); + let duration = self.compaction_duration.recorder(attributes.clone()); + duration.record(delta); } let compaction_counter = self.compaction_counter.recorder(attributes.clone()); diff --git a/influxdb_iox/tests/end_to_end_cases/metrics.rs b/influxdb_iox/tests/end_to_end_cases/metrics.rs index f24212e630..11dc75e67d 100644 --- a/influxdb_iox/tests/end_to_end_cases/metrics.rs +++ b/influxdb_iox/tests/end_to_end_cases/metrics.rs @@ -16,13 +16,17 @@ pub async fn test_metrics() { Step::WriteLineProtocol(lp), Step::WaitForReadable, Step::VerifiedMetrics(Box::new(|_state, metrics| { + let metrics_lines: Vec<_> = metrics.trim().split('\n').collect(); + let catalog_op_metrics_count = metrics_lines + .iter() + .filter(|x| x.starts_with("catalog_op_duration_seconds_bucket")) + .count(); + assert!( + catalog_op_metrics_count >= 180, + "Expected at least 180 catalog op metrics, got: {}\n\n{}", + catalog_op_metrics_count, metrics - .trim() - .split('\n') - .filter(|x| x.starts_with("catalog_op_duration_ms_bucket")) - .count() - >= 180 ); })), ], diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 27a51f21d1..75d998c4d3 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -23,7 +23,7 @@ use generated_types::ingester::IngesterQueryRequest; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; use iox_time::{SystemProvider, TimeProvider}; -use metric::{Metric, U64Counter, U64Histogram, U64HistogramOptions}; +use metric::{DurationHistogram, Metric, U64Counter}; use object_store::DynObjectStore; use observability_deps::tracing::*; use snafu::{ResultExt, Snafu}; @@ -109,9 +109,9 @@ pub struct IngestHandlerImpl { time_provider: T, - /// Query execution duration distribution (milliseconds). - query_duration_success_ms: U64Histogram, - query_duration_error_ms: U64Histogram, + /// Query execution duration distribution. + query_duration_success: DurationHistogram, + query_duration_error: DurationHistogram, /// Query request rejected due to concurrency limits query_request_limit_rejected: U64Counter, @@ -250,30 +250,12 @@ impl IngestHandlerImpl { } // Record query duration metrics, broken down by query execution result - let query_duration: Metric = metric_registry.register_metric_with_options( - "flight_query_duration_ms", - "flight request query execution duration in milliseconds", - || { - U64HistogramOptions::new([ - 5, - 10, - 20, - 40, - 80, - 160, - 320, - 640, - 1280, - 2560, - 5120, - 10240, - 20480, - u64::MAX, - ]) - }, + let query_duration: Metric = metric_registry.register_metric( + "flight_query_duration", + "flight request query execution duration", ); - let query_duration_success_ms = query_duration.recorder(&[("result", "success")]); - let query_duration_error_ms = query_duration.recorder(&[("result", "error")]); + let query_duration_success = query_duration.recorder(&[("result", "success")]); + let query_duration_error = query_duration.recorder(&[("result", "error")]); let query_request_limit_rejected = metric_registry .register_metric::( @@ -287,8 +269,8 @@ impl IngestHandlerImpl { kafka_topic: topic, join_handles, shutdown, - query_duration_success_ms, - query_duration_error_ms, + query_duration_success, + query_duration_error, query_request_limit_rejected, request_sem: Semaphore::new(max_requests), time_provider: Default::default(), @@ -325,10 +307,8 @@ impl IngestHandler for IngestHandlerImpl { if let Some(delta) = self.time_provider.now().checked_duration_since(t) { match &res { - Ok(_) => self - .query_duration_success_ms - .record(delta.as_millis() as _), - Err(_) => self.query_duration_error_ms.record(delta.as_millis() as _), + Ok(_) => self.query_duration_success.record(delta), + Err(_) => self.query_duration_error.record(delta), }; } @@ -397,7 +377,7 @@ mod tests { use dml::{DmlMeta, DmlWrite}; use iox_catalog::{mem::MemCatalog, validate_or_insert_schema}; use iox_time::Time; - use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram}; + use metric::{Attributes, Metric, U64Counter, U64Gauge}; use mutable_batch_lp::lines_to_batches; use object_store::memory::InMemory; use std::{num::NonZeroU32, ops::DerefMut}; @@ -504,7 +484,7 @@ mod tests { let observation = ingester .metrics - .get_instrument::>("ingester_op_apply_duration_ms") + .get_instrument::>("ingester_op_apply_duration") .unwrap() .get_observer(&Attributes::from(&[ ("kafka_topic", "whatevs"), diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 43920b6c26..a178977720 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -4,7 +4,7 @@ use data_types::{KafkaPartition, SequenceNumber}; use dml::DmlOperation; use futures::{pin_mut, FutureExt, StreamExt}; use iox_time::{SystemProvider, TimeProvider}; -use metric::{Attributes, U64Counter, U64Gauge}; +use metric::{Attributes, DurationCounter, DurationGauge, U64Counter}; use observability_deps::tracing::*; use std::{fmt::Debug, time::Duration}; use tokio_util::sync::CancellationToken; @@ -48,10 +48,10 @@ pub struct SequencedStreamHandler { // Metrics time_provider: T, - time_to_be_readable_ms: U64Gauge, + time_to_be_readable: DurationGauge, /// Duration of time ingest is paused at the request of the LifecycleManager - pause_duration_ms: U64Counter, + pause_duration: DurationCounter, /// Errors during op stream reading seq_unknown_sequence_number_count: U64Counter, @@ -86,17 +86,18 @@ impl SequencedStreamHandler { skip_to_oldest_available: bool, ) -> Self { // TTBR - let time_to_be_readable_ms = metrics.register_metric::( - "ingester_ttbr_ms", - "duration of time between producer writing to consumer putting into queryable cache in \ - milliseconds", + let time_to_be_readable = metrics.register_metric::( + "ingester_ttbr", + "duration of time between producer writing to consumer putting into queryable cache", ).recorder(metric_attrs(kafka_partition, &kafka_topic_name, None, false)); // Lifecycle-driven ingest pause duration - let pause_duration_ms = metrics.register_metric::( - "ingest_paused_duration_ms_total", - "duration of time ingestion has been paused by the lifecycle manager in milliseconds", - ).recorder(&[]); + let pause_duration = metrics + .register_metric::( + "ingest_paused_duration_total", + "duration of time ingestion has been paused by the lifecycle manager", + ) + .recorder(&[]); // Error count metrics let ingest_errors = metrics.register_metric::( @@ -140,8 +141,8 @@ impl SequencedStreamHandler { sink, lifecycle_handle, time_provider: SystemProvider::default(), - time_to_be_readable_ms, - pause_duration_ms, + time_to_be_readable, + pause_duration, seq_unknown_sequence_number_count, seq_invalid_data_count, seq_unknown_error_count, @@ -162,8 +163,8 @@ impl SequencedStreamHandler { sink: self.sink, lifecycle_handle: self.lifecycle_handle, time_provider: provider, - time_to_be_readable_ms: self.time_to_be_readable_ms, - pause_duration_ms: self.pause_duration_ms, + time_to_be_readable: self.time_to_be_readable, + pause_duration: self.pause_duration, seq_unknown_sequence_number_count: self.seq_unknown_sequence_number_count, seq_invalid_data_count: self.seq_invalid_data_count, seq_unknown_error_count: self.seq_unknown_error_count, @@ -356,7 +357,7 @@ where if let Some(delta) = produced_at.and_then(|ts| self.time_provider.now().checked_duration_since(ts)) { - self.time_to_be_readable_ms.set(delta.as_millis() as u64); + self.time_to_be_readable.set(delta); } if should_pause { @@ -384,8 +385,7 @@ where // While the actual sleep may be slightly longer than // INGEST_POLL_INTERVAL, it's not likely to be a useful // distinction in the metrics. - self.pause_duration_ms - .inc(INGEST_POLL_INTERVAL.as_millis() as _); + self.pause_duration.inc(INGEST_POLL_INTERVAL); tokio::time::sleep(INGEST_POLL_INTERVAL).await; } @@ -644,7 +644,7 @@ mod tests { // Assert the TTBR metric value let ttbr = metrics - .get_instrument::>("ingester_ttbr_ms") + .get_instrument::>("ingester_ttbr") .expect("did not find ttbr metric") .get_observer(&Attributes::from([ ("kafka_topic", TEST_KAFKA_TOPIC.into()), @@ -652,7 +652,7 @@ mod tests { ])) .expect("did not match metric attributes") .fetch(); - assert_eq!(ttbr, $want_ttbr); + assert_eq!(ttbr, Duration::from_millis($want_ttbr)); // Assert any error metrics in the macro call $( diff --git a/ingester/src/stream_handler/sink_instrumentation.rs b/ingester/src/stream_handler/sink_instrumentation.rs index fb65aa6a85..86ac0ebdb0 100644 --- a/ingester/src/stream_handler/sink_instrumentation.rs +++ b/ingester/src/stream_handler/sink_instrumentation.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use data_types::KafkaPartition; use dml::DmlOperation; use iox_time::{SystemProvider, TimeProvider}; -use metric::{Attributes, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions}; +use metric::{Attributes, DurationHistogram, U64Counter, U64Gauge}; use std::fmt::Debug; use trace::span::SpanRecorder; @@ -52,8 +52,8 @@ pub struct SinkInstrumentation { /// Op application success/failure call latency histograms (which include /// counters) - op_apply_success_ms: U64Histogram, - op_apply_error_ms: U64Histogram, + op_apply_success: DurationHistogram, + op_apply_error: DurationHistogram, /// Write buffer metrics write_buffer_bytes_read: U64Counter, @@ -111,37 +111,16 @@ where ) .recorder(attr.clone()); - // The buckets for the op apply histogram - let buckets = || { - U64HistogramOptions::new([ - 5, - 10, - 20, - 40, - 80, - 160, - 320, - 640, - 1280, - 2560, - 5120, - 10240, - 20480, - u64::MAX, - ]) - }; - - let op_apply = metrics.register_metric_with_options::( - "ingester_op_apply_duration_ms", + let op_apply = metrics.register_metric::( + "ingester_op_apply_duration", "The duration of time taken to process an operation read from the sequencer", - buckets, ); - let op_apply_success_ms = op_apply.recorder({ + let op_apply_success = op_apply.recorder({ let mut attr = attr.clone(); attr.insert("result", "success"); attr }); - let op_apply_error_ms = op_apply.recorder({ + let op_apply_error = op_apply.recorder({ let mut attr = attr; attr.insert("result", "error"); attr @@ -152,8 +131,8 @@ where watermark_fetcher, sequencer_id: kafka_partition.get(), - op_apply_success_ms, - op_apply_error_ms, + op_apply_success, + op_apply_error, write_buffer_bytes_read, write_buffer_last_sequence_number, @@ -235,14 +214,14 @@ where let metric = match &res { Ok(_) => { span_recorder.ok("success"); - &self.op_apply_success_ms + &self.op_apply_success } Err(e) => { span_recorder.error(e.to_string()); - &self.op_apply_error_ms + &self.op_apply_error } }; - metric.record(delta.as_millis() as _); + metric.record(delta); } // Return the result from the inner handler unmodified @@ -394,12 +373,12 @@ mod tests { ); // Validate the success histogram was hit - let hist = get_metric::(&metrics, "ingester_op_apply_duration_ms", &{ + let hist = get_metric::(&metrics, "ingester_op_apply_duration", &{ let mut attrs = DEFAULT_ATTRS.clone(); attrs.insert("result", "success"); attrs }); - assert_matches!(hist, Observation::U64Histogram(h) => { + assert_matches!(hist, Observation::DurationHistogram(h) => { let hits: u64 = h.buckets.iter().map(|b| b.count).sum(); assert_eq!(hits, 1); }); @@ -465,12 +444,12 @@ mod tests { ); // Validate the histogram was hit even on error - let hist = get_metric::(&metrics, "ingester_op_apply_duration_ms", &{ + let hist = get_metric::(&metrics, "ingester_op_apply_duration", &{ let mut attrs = DEFAULT_ATTRS.clone(); attrs.insert("result", "error"); attrs }); - assert_matches!(hist, Observation::U64Histogram(h) => { + assert_matches!(hist, Observation::DurationHistogram(h) => { let hits: u64 = h.buckets.iter().map(|b| b.count).sum(); assert_eq!(hits, 1); }); @@ -529,12 +508,12 @@ mod tests { ); // Validate the success histogram was hit - let hist = get_metric::(&metrics, "ingester_op_apply_duration_ms", &{ + let hist = get_metric::(&metrics, "ingester_op_apply_duration", &{ let mut attrs = DEFAULT_ATTRS.clone(); attrs.insert("result", "success"); attrs }); - assert_matches!(hist, Observation::U64Histogram(h) => { + assert_matches!(hist, Observation::DurationHistogram(h) => { let hits: u64 = h.buckets.iter().map(|b| b.count).sum(); assert_eq!(hits, 1); }); @@ -594,12 +573,12 @@ mod tests { ); // Validate the success histogram was hit - let hist = get_metric::(&metrics, "ingester_op_apply_duration_ms", &{ + let hist = get_metric::(&metrics, "ingester_op_apply_duration", &{ let mut attrs = DEFAULT_ATTRS.clone(); attrs.insert("result", "success"); attrs }); - assert_matches!(hist, Observation::U64Histogram(h) => { + assert_matches!(hist, Observation::DurationHistogram(h) => { let hits: u64 = h.buckets.iter().map(|b| b.count).sum(); assert_eq!(hits, 1); }); diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index d7bca02e4f..0c662427fe 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -774,7 +774,7 @@ pub(crate) mod test_helpers { use super::*; use ::test_helpers::{assert_contains, tracing::TracingCapture}; use data_types::ColumnId; - use metric::{Attributes, Metric, U64Histogram}; + use metric::{Attributes, DurationHistogram, Metric}; use std::{ ops::{Add, DerefMut}, sync::Arc, @@ -3084,7 +3084,7 @@ pub(crate) mod test_helpers { fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) { let histogram = metrics - .get_instrument::>("catalog_op_duration_ms") + .get_instrument::>("catalog_op_duration") .expect("failed to read metric") .get_observer(&Attributes::from(&[("op", name), ("result", "success")])) .expect("failed to get observer") diff --git a/iox_catalog/src/metrics.rs b/iox_catalog/src/metrics.rs index 72728d2f2a..fc475a251b 100644 --- a/iox_catalog/src/metrics.rs +++ b/iox_catalog/src/metrics.rs @@ -13,7 +13,7 @@ use data_types::{ SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId, }; use iox_time::{SystemProvider, TimeProvider}; -use metric::{Metric, U64Histogram, U64HistogramOptions}; +use metric::{DurationHistogram, Metric}; use std::{fmt::Debug, sync::Arc}; use uuid::Uuid; @@ -21,7 +21,7 @@ use uuid::Uuid; /// transactional variant) with instrumentation that emits latency histograms /// for each method. /// -/// Values are recorded under the `catalog_op_duration_ms` metric, labelled by +/// Values are recorded under the `catalog_op_duration` metric, labelled by /// operation name and result (success/error). #[derive(Debug)] pub struct MetricDecorator { @@ -149,14 +149,9 @@ macro_rules! decorate { $( async fn $method(&mut self, $($arg : $t),*) -> Result<$out> { - let buckets = || { - U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX]) - }; - - let observer: Metric = self.metrics.register_metric_with_options( - "catalog_op_duration_ms", - "catalog call duration in milliseconds", - buckets, + let observer: Metric = self.metrics.register_metric( + "catalog_op_duration", + "catalog call duration", ); let t = self.time_provider.now(); @@ -169,7 +164,7 @@ macro_rules! decorate { Ok(_) => "success", Err(_) => "error", }; - observer.recorder(&[("op", $metric), ("result", tag)]).record(delta.as_millis() as _); + observer.recorder(&[("op", $metric), ("result", tag)]).record(delta); } res diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index 04f679670f..4dea638c81 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -1950,7 +1950,7 @@ mod tests { use super::*; use crate::create_or_get_default_records; use assert_matches::assert_matches; - use metric::{Attributes, Metric, U64Histogram}; + use metric::{Attributes, DurationHistogram, Metric}; use rand::Rng; use sqlx::migrate::MigrateDatabase; use std::{env, io::Write, ops::DerefMut, sync::Arc, time::Instant}; @@ -2005,7 +2005,7 @@ mod tests { fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) { let histogram = metrics - .get_instrument::>("catalog_op_duration_ms") + .get_instrument::>("catalog_op_duration") .expect("failed to read metric") .get_observer(&Attributes::from(&[("op", name), ("result", "success")])) .expect("failed to get observer") diff --git a/object_store_metrics/src/lib.rs b/object_store_metrics/src/lib.rs index 8fbaa7e016..7b7d9d6843 100644 --- a/object_store_metrics/src/lib.rs +++ b/object_store_metrics/src/lib.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures::{stream::BoxStream, Stream, StreamExt}; use iox_time::{SystemProvider, Time, TimeProvider}; -use metric::{Metric, U64Counter, U64Histogram, U64HistogramOptions}; +use metric::{DurationHistogram, Metric, U64Counter}; use pin_project::{pin_project, pinned_drop}; use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result}; @@ -70,19 +70,19 @@ pub struct ObjectStoreMetrics { inner: Arc, time_provider: Arc, - put_success_duration_ms: U64Histogram, - put_error_duration_ms: U64Histogram, + put_success_duration: DurationHistogram, + put_error_duration: DurationHistogram, put_bytes: U64Counter, - get_success_duration_ms: U64Histogram, - get_error_duration_ms: U64Histogram, + get_success_duration: DurationHistogram, + get_error_duration: DurationHistogram, get_bytes: U64Counter, - delete_success_duration_ms: U64Histogram, - delete_error_duration_ms: U64Histogram, + delete_success_duration: DurationHistogram, + delete_error_duration: DurationHistogram, - list_success_duration_ms: U64Histogram, - list_error_duration_ms: U64Histogram, + list_success_duration: DurationHistogram, + list_error_duration: DurationHistogram, } impl ObjectStoreMetrics { @@ -92,25 +92,6 @@ impl ObjectStoreMetrics { time_provider: Arc, registry: &metric::Registry, ) -> Self { - let buckets = || { - U64HistogramOptions::new([ - 5, - 10, - 20, - 40, - 80, - 160, - 320, - 640, - 1280, - 2560, - 5120, - 10240, - 20480, - u64::MAX, - ]) - }; - // Byte counts up/down let bytes = registry.register_metric::( "object_store_transfer_bytes", @@ -120,10 +101,9 @@ impl ObjectStoreMetrics { let get_bytes = bytes.recorder(&[("op", "get")]); // Call durations broken down by op & result - let duration: Metric = registry.register_metric_with_options( - "object_store_op_duration_ms", - "object store operation duration in milliseconds", - buckets, + let duration: Metric = registry.register_metric( + "object_store_op_duration", + "object store operation duration", ); let put_success_duration = duration.recorder(&[("op", "put"), ("result", "success")]); let put_error_duration = duration.recorder(&[("op", "put"), ("result", "error")]); @@ -138,19 +118,19 @@ impl ObjectStoreMetrics { inner, time_provider, - put_success_duration_ms: put_success_duration, - put_error_duration_ms: put_error_duration, + put_success_duration, + put_error_duration, put_bytes, get_bytes, - get_success_duration_ms: get_success_duration, - get_error_duration_ms: get_error_duration, + get_success_duration, + get_error_duration, - delete_success_duration_ms: delete_success_duration, - delete_error_duration_ms: delete_error_duration, + delete_success_duration, + delete_error_duration, - list_success_duration_ms: list_success_duration, - list_error_duration_ms: list_error_duration, + list_success_duration, + list_error_duration, } } } @@ -174,8 +154,8 @@ impl ObjectStore for ObjectStoreMetrics { // if it happens. if let Some(delta) = self.time_provider.now().checked_duration_since(t) { match &res { - Ok(_) => self.put_success_duration_ms.record(delta.as_millis() as _), - Err(_) => self.put_error_duration_ms.record(delta.as_millis() as _), + Ok(_) => self.put_success_duration.record(delta), + Err(_) => self.put_error_duration.record(delta), }; } @@ -193,7 +173,7 @@ impl ObjectStore for ObjectStoreMetrics { if let Ok(m) = file.metadata().await { self.get_bytes.inc(m.len()); if let Some(d) = self.time_provider.now().checked_duration_since(started_at) { - self.get_success_duration_ms.record(d.as_millis() as _) + self.get_success_duration.record(d) } } Ok(GetResult::File(file, path)) @@ -205,8 +185,8 @@ impl ObjectStore for ObjectStoreMetrics { StreamMetricRecorder::new( s, started_at, - self.get_success_duration_ms.clone(), - self.get_error_duration_ms.clone(), + self.get_success_duration.clone(), + self.get_error_duration.clone(), BytesStreamDelegate(self.get_bytes.clone()), ) .fuse(), @@ -215,7 +195,7 @@ impl ObjectStore for ObjectStoreMetrics { Err(e) => { // Record the call duration in the error histogram. if let Some(delta) = self.time_provider.now().checked_duration_since(started_at) { - self.get_error_duration_ms.record(delta.as_millis() as _); + self.get_error_duration.record(delta); } Err(e) } @@ -236,10 +216,8 @@ impl ObjectStore for ObjectStoreMetrics { // if it happens. if let Some(delta) = self.time_provider.now().checked_duration_since(t) { match &res { - Ok(_) => self - .delete_success_duration_ms - .record(delta.as_millis() as _), - Err(_) => self.delete_error_duration_ms.record(delta.as_millis() as _), + Ok(_) => self.delete_success_duration.record(delta), + Err(_) => self.delete_error_duration.record(delta), }; } @@ -262,8 +240,8 @@ impl ObjectStore for ObjectStoreMetrics { StreamMetricRecorder::new( s, started_at, - self.list_success_duration_ms.clone(), - self.list_error_duration_ms.clone(), + self.list_success_duration.clone(), + self.list_error_duration.clone(), NopStreamDelegate::default(), ) .fuse(), @@ -272,7 +250,7 @@ impl ObjectStore for ObjectStoreMetrics { Err(e) => { // Record the call duration in the error histogram. if let Some(delta) = self.time_provider.now().checked_duration_since(started_at) { - self.list_error_duration_ms.record(delta.as_millis() as _); + self.list_error_duration.record(delta); } Err(e) } @@ -288,8 +266,8 @@ impl ObjectStore for ObjectStoreMetrics { // if it happens. if let Some(delta) = self.time_provider.now().checked_duration_since(t) { match &res { - Ok(_) => self.list_success_duration_ms.record(delta.as_millis() as _), - Err(_) => self.list_error_duration_ms.record(delta.as_millis() as _), + Ok(_) => self.list_success_duration.record(delta), + Err(_) => self.list_error_duration.record(delta), }; } @@ -389,8 +367,8 @@ where // the `T`. metric_delegate: D, - success_duration_ms: U64Histogram, - error_duration_ms: U64Histogram, + success_duration: DurationHistogram, + error_duration: DurationHistogram, } impl StreamMetricRecorder @@ -401,8 +379,8 @@ where fn new( stream: S, started_at: Time, - success_duration_ms: U64Histogram, - error_duration_ms: U64Histogram, + success_duration: DurationHistogram, + error_duration: DurationHistogram, metric_delegate: D, ) -> Self { let time_provider = SystemProvider::default(); @@ -420,8 +398,8 @@ where started_at, time_provider, - success_duration_ms, - error_duration_ms, + success_duration, + error_duration, metric_delegate, } } @@ -459,8 +437,8 @@ where // The stream has terminated - record the wall clock duration // immediately. let hist = match this.last_call_ok { - true => this.success_duration_ms, - false => this.error_duration_ms, + true => this.success_duration, + false => this.error_duration, }; // Take the last_yielded_at option, marking metrics as emitted @@ -471,7 +449,7 @@ where .expect("no last_yielded_at value for fused stream") .checked_duration_since(*this.started_at) { - hist.record(d.as_millis() as _) + hist.record(d) } Poll::Ready(None) @@ -498,12 +476,12 @@ where // therefore last_yielded_at is still Some). if let Some(last) = self.last_yielded_at { let hist = match self.last_call_ok { - true => &self.success_duration_ms, - false => &self.error_duration_ms, + true => &self.success_duration, + false => &self.error_duration, }; if let Some(d) = last.checked_duration_since(self.started_at) { - hist.record(d.as_millis() as _) + hist.record(d) } } } @@ -532,7 +510,7 @@ mod tests { attr: [(&'static str, &'static str); N], ) { let histogram = metrics - .get_instrument::>(name) + .get_instrument::>(name) .expect("failed to read histogram") .get_observer(&Attributes::from(&attr)) .expect("failed to get observer") @@ -575,7 +553,7 @@ mod tests { assert_counter_value(&metrics, "object_store_transfer_bytes", [("op", "put")], 5); assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "put"), ("result", "success")], ); } @@ -598,7 +576,7 @@ mod tests { assert_counter_value(&metrics, "object_store_transfer_bytes", [("op", "put")], 5); assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "put"), ("result", "error")], ); } @@ -614,7 +592,7 @@ mod tests { assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "list"), ("result", "success")], ); } @@ -630,7 +608,7 @@ mod tests { assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "list"), ("result", "error")], ); } @@ -649,7 +627,7 @@ mod tests { assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "list"), ("result", "success")], ); } @@ -671,7 +649,7 @@ mod tests { assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "list"), ("result", "error")], ); } @@ -690,7 +668,7 @@ mod tests { assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "get"), ("result", "error")], ); } @@ -724,7 +702,7 @@ mod tests { assert_counter_value(&metrics, "object_store_transfer_bytes", [("op", "get")], 5); assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "get"), ("result", "success")], ); @@ -735,7 +713,7 @@ mod tests { assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "delete"), ("result", "success")], ); } @@ -763,7 +741,7 @@ mod tests { assert_counter_value(&metrics, "object_store_transfer_bytes", [("op", "get")], 5); assert_histogram_hit( &metrics, - "object_store_op_duration_ms", + "object_store_op_duration", [("op", "get"), ("result", "success")], ); } @@ -785,10 +763,7 @@ mod tests { let time_provider = SystemProvider::default(); let metrics = Arc::new(metric::Registry::default()); - let hist: Metric = - metrics.register_metric_with_options("wall_clock", "", || { - U64HistogramOptions::new([1, 100, u64::MAX]) - }); + let hist: Metric = metrics.register_metric("wall_clock", ""); let bytes = metrics .register_metric::( @@ -852,7 +827,7 @@ mod tests { .fetch() .buckets .iter() - .skip_while(|b| b.le < SLEEP.as_millis() as _) // Skip buckets less than the sleep duration + .skip_while(|b| b.le < SLEEP) // Skip buckets less than the sleep duration .fold(0, |acc, v| acc + v.count); assert_eq!( hit_count, 1, @@ -887,10 +862,7 @@ mod tests { let time_provider = SystemProvider::default(); let metrics = Arc::new(metric::Registry::default()); - let hist: Metric = - metrics.register_metric_with_options("wall_clock", "", || { - U64HistogramOptions::new([1, 100, u64::MAX]) - }); + let hist: Metric = metrics.register_metric("wall_clock", ""); let bytes = metrics .register_metric::( @@ -955,10 +927,7 @@ mod tests { let time_provider = SystemProvider::default(); let metrics = Arc::new(metric::Registry::default()); - let hist: Metric = - metrics.register_metric_with_options("wall_clock", "", || { - U64HistogramOptions::new([1, 100, u64::MAX]) - }); + let hist: Metric = metrics.register_metric("wall_clock", ""); let bytes = metrics .register_metric::( @@ -1023,10 +992,7 @@ mod tests { let time_provider = SystemProvider::default(); let metrics = Arc::new(metric::Registry::default()); - let hist: Metric = - metrics.register_metric_with_options("wall_clock", "", || { - U64HistogramOptions::new([1, 100, u64::MAX]) - }); + let hist: Metric = metrics.register_metric("wall_clock", ""); let bytes = metrics .register_metric::( @@ -1096,10 +1062,7 @@ mod tests { let time_provider = SystemProvider::default(); let metrics = Arc::new(metric::Registry::default()); - let hist: Metric = - metrics.register_metric_with_options("wall_clock", "", || { - U64HistogramOptions::new([1, 100, u64::MAX]) - }); + let hist: Metric = metrics.register_metric("wall_clock", ""); let bytes = metrics .register_metric::( @@ -1145,10 +1108,7 @@ mod tests { let time_provider = SystemProvider::default(); let metrics = Arc::new(metric::Registry::default()); - let hist: Metric = - metrics.register_metric_with_options("wall_clock", "", || { - U64HistogramOptions::new([1, 100, u64::MAX]) - }); + let hist: Metric = metrics.register_metric("wall_clock", ""); let bytes = metrics .register_metric::( diff --git a/querier/src/cache/test_util.rs b/querier/src/cache/test_util.rs index 97e3efc294..ae4941e432 100644 --- a/querier/src/cache/test_util.rs +++ b/querier/src/cache/test_util.rs @@ -1,8 +1,8 @@ -use metric::{Attributes, Metric, U64Histogram}; +use metric::{Attributes, DurationHistogram, Metric}; pub fn assert_histogram_metric_count(metrics: &metric::Registry, name: &'static str, n: u64) { let histogram = metrics - .get_instrument::>("catalog_op_duration_ms") + .get_instrument::>("catalog_op_duration") .expect("failed to read metric") .get_observer(&Attributes::from(&[("op", name), ("result", "success")])) .expect("failed to get observer") diff --git a/router/src/dml_handlers/instrumentation.rs b/router/src/dml_handlers/instrumentation.rs index b07274884a..6ca6cce047 100644 --- a/router/src/dml_handlers/instrumentation.rs +++ b/router/src/dml_handlers/instrumentation.rs @@ -2,59 +2,34 @@ use super::DmlHandler; use async_trait::async_trait; use data_types::{DatabaseName, DeletePredicate}; use iox_time::{SystemProvider, TimeProvider}; -use metric::{Metric, U64Histogram, U64HistogramOptions}; +use metric::{DurationHistogram, Metric}; use trace::{ctx::SpanContext, span::SpanRecorder}; -/// An instrumentation decorator recording call latencies for [`DmlHandler`] -/// implementations. +/// An instrumentation decorator recording call latencies for [`DmlHandler`] implementations. /// -/// Metrics are broken down by operation (write/delete) and result -/// (success/error) with call latency reported in milliseconds. +/// Metrics are broken down by operation (write/delete) and result (success/error). #[derive(Debug)] pub struct InstrumentationDecorator { name: &'static str, inner: T, time_provider: P, - write_success: U64Histogram, - write_error: U64Histogram, + write_success: DurationHistogram, + write_error: DurationHistogram, - delete_success: U64Histogram, - delete_error: U64Histogram, + delete_success: DurationHistogram, + delete_error: DurationHistogram, } impl InstrumentationDecorator { /// Wrap a new [`InstrumentationDecorator`] over `T` exposing metrics /// labelled with `handler=name`. pub fn new(name: &'static str, registry: &metric::Registry, inner: T) -> Self { - let buckets = || { - U64HistogramOptions::new([ - 5, - 10, - 20, - 40, - 80, - 160, - 320, - 640, - 1280, - 2560, - 5120, - 10240, - 20480, - u64::MAX, - ]) - }; - - let write: Metric = registry.register_metric_with_options( - "dml_handler_write_duration_ms", - "write handler call duration in milliseconds", - buckets, - ); - let delete: Metric = registry.register_metric_with_options( - "dml_handler_delete_duration_ms", - "delete handler call duration in milliseconds", - buckets, + let write: Metric = + registry.register_metric("dml_handler_write_duration", "write handler call duration"); + let delete: Metric = registry.register_metric( + "dml_handler_delete_duration", + "delete handler call duration", ); let write_success = write.recorder(&[("handler", name), ("result", "success")]); @@ -106,11 +81,11 @@ where match &res { Ok(_) => { span_recorder.ok("success"); - self.write_success.record(delta.as_millis() as _) + self.write_success.record(delta) } Err(e) => { span_recorder.error(e.to_string()); - self.write_error.record(delta.as_millis() as _) + self.write_error.record(delta) } }; } @@ -143,11 +118,11 @@ where match &res { Ok(_) => { span_recorder.ok("success"); - self.delete_success.record(delta.as_millis() as _) + self.delete_success.record(delta) } Err(e) => { span_recorder.error(e.to_string()); - self.delete_error.record(delta.as_millis() as _) + self.delete_error.record(delta) } }; } @@ -175,7 +150,7 @@ mod tests { result: &'static str, ) { let histogram = metrics - .get_instrument::>(metric_name) + .get_instrument::>(metric_name) .expect("failed to read metric") .get_observer(&Attributes::from(&[ ("handler", HANDLER_NAME), @@ -226,7 +201,7 @@ mod tests { .await .expect("inner handler configured to succeed"); - assert_metric_hit(&*metrics, "dml_handler_write_duration_ms", "success"); + assert_metric_hit(&*metrics, "dml_handler_write_duration", "success"); assert_trace(traces, SpanStatus::Ok); } @@ -251,7 +226,7 @@ mod tests { assert_matches!(err, DmlError::DatabaseNotFound(_)); - assert_metric_hit(&*metrics, "dml_handler_write_duration_ms", "error"); + assert_metric_hit(&*metrics, "dml_handler_write_duration", "error"); assert_trace(traces, SpanStatus::Err); } @@ -276,7 +251,7 @@ mod tests { .await .expect("inner handler configured to succeed"); - assert_metric_hit(&*metrics, "dml_handler_delete_duration_ms", "success"); + assert_metric_hit(&*metrics, "dml_handler_delete_duration", "success"); assert_trace(traces, SpanStatus::Ok); } @@ -304,7 +279,7 @@ mod tests { .await .expect_err("inner handler configured to fail"); - assert_metric_hit(&*metrics, "dml_handler_delete_duration_ms", "error"); + assert_metric_hit(&*metrics, "dml_handler_delete_duration", "error"); assert_trace(traces, SpanStatus::Err); } } diff --git a/router/src/namespace_cache/metrics.rs b/router/src/namespace_cache/metrics.rs index 5a95ba105a..1aa63ba942 100644 --- a/router/src/namespace_cache/metrics.rs +++ b/router/src/namespace_cache/metrics.rs @@ -3,7 +3,7 @@ use super::NamespaceCache; use data_types::{DatabaseName, NamespaceSchema}; use iox_time::{SystemProvider, TimeProvider}; -use metric::{Metric, U64Gauge, U64Histogram, U64HistogramOptions}; +use metric::{DurationHistogram, Metric, U64Gauge}; use std::sync::Arc; /// An [`InstrumentedCache`] decorates a [`NamespaceCache`] with cache read @@ -18,36 +18,26 @@ pub struct InstrumentedCache { column_count: U64Gauge, /// A cache read hit - get_hit: U64Histogram, + get_hit: DurationHistogram, /// A cache read miss - get_miss: U64Histogram, + get_miss: DurationHistogram, /// A cache put for a namespace that did not previously exist. - put_insert: U64Histogram, + put_insert: DurationHistogram, /// A cache put replacing a namespace that previously had a cache entry. - put_update: U64Histogram, + put_update: DurationHistogram, } impl InstrumentedCache { /// Instrument `T`, recording cache operations to `registry`. pub fn new(inner: T, registry: &metric::Registry) -> Self { - let buckets = || { - U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX]) - }; - - let get_counter: Metric = registry.register_metric_with_options( - "namespace_cache_get_duration_ms", - "cache read call duration in milliseconds", - buckets, - ); + let get_counter: Metric = + registry.register_metric("namespace_cache_get_duration", "cache read call duration"); let get_hit = get_counter.recorder(&[("result", "hit")]); let get_miss = get_counter.recorder(&[("result", "miss")]); - let put_counter: Metric = registry.register_metric_with_options( - "namespace_cache_put_duration_ms", - "cache put call duration in milliseconds", - buckets, - ); + let put_counter: Metric = + registry.register_metric("namespace_cache_put_duration", "cache put call duration"); let put_insert = put_counter.recorder(&[("op", "insert")]); let put_update = put_counter.recorder(&[("op", "update")]); @@ -90,8 +80,8 @@ where // if it happens. if let Some(delta) = self.time_provider.now().checked_duration_since(t) { match &res { - Some(_) => self.get_hit.record(delta.as_millis() as _), - None => self.get_miss.record(delta.as_millis() as _), + Some(_) => self.get_hit.record(delta), + None => self.get_miss.record(delta), }; } @@ -112,7 +102,7 @@ where match res { Some(v) => { if let Some(delta) = self.time_provider.now().checked_duration_since(t) { - self.put_update.record(delta.as_millis() as _); + self.put_update.record(delta); } // Figure out the difference between the new namespace and the @@ -129,7 +119,7 @@ where } None => { if let Some(delta) = self.time_provider.now().checked_duration_since(t) { - self.put_insert.record(delta.as_millis() as _); + self.put_insert.record(delta); } // Add the new namespace stats to the counts. @@ -215,7 +205,7 @@ mod tests { count: u64, ) { let histogram = metrics - .get_instrument::>(metric_name) + .get_instrument::>(metric_name) .expect("failed to read metric") .get_observer(&Attributes::from(&[attr])) .expect("failed to get observer") @@ -240,13 +230,13 @@ mod tests { assert!(cache.put_schema(ns.clone(), schema).is_none()); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "insert"), 1, ); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "update"), 0, ); @@ -258,13 +248,13 @@ mod tests { assert!(cache.put_schema(ns.clone(), schema).is_some()); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "insert"), 1, ); // Unchanged assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "update"), 1, ); @@ -276,13 +266,13 @@ mod tests { assert!(cache.put_schema(ns.clone(), schema).is_some()); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "insert"), 1, ); // Unchanged assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "update"), 2, ); @@ -294,13 +284,13 @@ mod tests { assert!(cache.put_schema(ns.clone(), schema).is_some()); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "insert"), 1, ); // Unchanged assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "update"), 3, ); @@ -312,13 +302,13 @@ mod tests { assert!(cache.put_schema(ns.clone(), schema).is_some()); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "insert"), 1, ); // Unchanged assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "update"), 4, ); @@ -330,13 +320,13 @@ mod tests { assert!(cache.put_schema(ns.clone(), schema).is_some()); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "insert"), 1, ); // Unchanged assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "update"), 5, ); @@ -348,13 +338,13 @@ mod tests { assert!(cache.put_schema(ns, schema).is_some()); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "insert"), 1, ); // Unchanged assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "update"), 6, ); @@ -367,13 +357,13 @@ mod tests { assert!(cache.put_schema(ns.clone(), schema).is_none()); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "insert"), 2, ); assert_histogram_hit( ®istry, - "namespace_cache_put_duration_ms", + "namespace_cache_put_duration", ("op", "update"), 6, ); @@ -383,7 +373,7 @@ mod tests { let _got = cache.get_schema(&ns).expect("should exist"); assert_histogram_hit( ®istry, - "namespace_cache_get_duration_ms", + "namespace_cache_get_duration", ("result", "hit"), 1, ); diff --git a/router/src/sequencer.rs b/router/src/sequencer.rs index 19ce339f16..a0849c09fd 100644 --- a/router/src/sequencer.rs +++ b/router/src/sequencer.rs @@ -1,10 +1,9 @@ //! A representation of a single operation sequencer. -use std::{borrow::Cow, hash::Hash, sync::Arc}; - use dml::{DmlMeta, DmlOperation}; use iox_time::{SystemProvider, TimeProvider}; -use metric::{Metric, U64Histogram, U64HistogramOptions}; +use metric::{DurationHistogram, Metric}; +use std::{borrow::Cow, hash::Hash, sync::Arc}; use write_buffer::core::{WriteBufferError, WriteBufferWriting}; /// A sequencer tags an write buffer with a sequencer ID. @@ -14,8 +13,8 @@ pub struct Sequencer

{ inner: Arc, time_provider: P, - enqueue_success: U64Histogram, - enqueue_error: U64Histogram, + enqueue_success: DurationHistogram, + enqueue_error: DurationHistogram, } impl Eq for Sequencer {} @@ -35,13 +34,9 @@ impl Hash for Sequencer { impl Sequencer { /// Tag `inner` with the specified `id`. pub fn new(id: usize, inner: Arc, metrics: &metric::Registry) -> Self { - let buckets = || { - U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX]) - }; - let write: Metric = metrics.register_metric_with_options( - "sequencer_enqueue_duration_ms", - "sequencer enqueue call duration in milliseconds", - buckets, + let write: Metric = metrics.register_metric( + "sequencer_enqueue_duration", + "sequencer enqueue call duration", ); let enqueue_success = write.recorder([ @@ -79,8 +74,8 @@ impl Sequencer { if let Some(delta) = self.time_provider.now().checked_duration_since(t) { match &res { - Ok(_) => self.enqueue_success.record(delta.as_millis() as _), - Err(_) => self.enqueue_error.record(delta.as_millis() as _), + Ok(_) => self.enqueue_success.record(delta), + Err(_) => self.enqueue_error.record(delta), } } diff --git a/router/tests/http.rs b/router/tests/http.rs index 90c143d47b..0c21ec5be2 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -4,7 +4,7 @@ use dml::DmlOperation; use hashbrown::HashMap; use hyper::{Body, Request, StatusCode}; use iox_catalog::{interface::Catalog, mem::MemCatalog}; -use metric::{Attributes, Metric, Registry, U64Counter, U64Histogram}; +use metric::{Attributes, DurationHistogram, Metric, Registry, U64Counter}; use mutable_batch::MutableBatch; use router::{ dml_handlers::{ @@ -201,7 +201,7 @@ async fn test_write_ok() { // Ensure the metric instrumentation was hit let histogram = ctx .metrics() - .get_instrument::>("dml_handler_write_duration_ms") + .get_instrument::>("dml_handler_write_duration") .expect("failed to read metric") .get_observer(&Attributes::from(&[ ("handler", "request"), @@ -224,7 +224,7 @@ async fn test_write_ok() { let histogram = ctx .metrics() - .get_instrument::>("sequencer_enqueue_duration_ms") + .get_instrument::>("sequencer_enqueue_duration") .expect("failed to read metric") .get_observer(&Attributes::from(&[ ("kafka_partition", "0"),