Merge pull request #4820 from influxdata/cn/duration

refactor: Use DurationHistogram in more places
kodiakhq[bot] 2022-06-10 14:29:26 +00:00 committed by GitHub
commit e437dff4c1
14 changed files with 211 additions and 338 deletions
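Every file in this diff applies the same substitution: a millisecond `U64Histogram`, registered with hand-picked buckets and fed `delta.as_millis()`, becomes a `DurationHistogram` that is registered without bucket options and records the `Duration` itself; the `_ms` suffix also drops out of the metric names. A minimal before/after sketch of the pattern, assuming the `metric` and `iox_time` APIs used throughout the hunks below; the `example_*` names, bucket values, and attributes are illustrative:

use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric, Registry, U64Histogram, U64HistogramOptions};

// Illustrative sketch, not part of this commit.

// Before: hand-picked millisecond buckets, integer samples.
fn record_example_before(registry: &Registry, time_provider: &SystemProvider) {
    let start = time_provider.now();
    // ... the work being timed ...
    let hist: Metric<U64Histogram> = registry.register_metric_with_options(
        "example_duration_ms",
        "example call duration in milliseconds",
        || U64HistogramOptions::new([5, 10, 20, 40, u64::MAX]),
    );
    let recorder = hist.recorder(&[("result", "success")]);
    if let Some(delta) = time_provider.now().checked_duration_since(start) {
        recorder.record(delta.as_millis() as _);
    }
}

// After: a DurationHistogram registered without bucket options; the recorder
// takes the Duration itself and the `_ms` suffix drops out of the metric name.
fn record_example_after(registry: &Registry, time_provider: &SystemProvider) {
    let start = time_provider.now();
    // ... the work being timed ...
    let hist: Metric<DurationHistogram> = registry.register_metric(
        "example_duration",
        "example call duration",
    );
    let recorder = hist.recorder(&[("result", "success")]);
    if let Some(delta) = time_provider.now().checked_duration_since(start) {
        recorder.record(delta);
    }
}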

View File

@ -23,7 +23,7 @@ use iox_query::{
QueryChunk,
};
use iox_time::TimeProvider;
use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
use metric::{Attributes, DurationHistogram, Metric, U64Counter, U64Gauge};
use observability_deps::tracing::{debug, info, trace, warn};
use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
@ -191,7 +191,7 @@ pub struct Compactor {
compaction_candidate_bytes_gauge: Metric<U64Gauge>,
/// Histogram for tracking the time to compact a partition
compaction_duration_ms: Metric<U64Histogram>,
compaction_duration: Metric<DurationHistogram>,
}
impl Compactor {
@ -230,14 +230,9 @@ impl Compactor {
"Counter for level promotion from 0 to 1",
);
// buckets for timing compact partition
let compaction_duration_buckets_ms =
|| U64HistogramOptions::new([100, 1000, 5000, 10000, 30000, 60000, 360000, u64::MAX]);
let compaction_duration_ms: Metric<U64Histogram> = registry.register_metric_with_options(
"compactor_compact_partition_duration_ms",
"Compact partition duration in milliseconds",
compaction_duration_buckets_ms,
let compaction_duration: Metric<DurationHistogram> = registry.register_metric(
"compactor_compact_partition_duration",
"Compact partition duration",
);
Self {
@ -254,7 +249,7 @@ impl Compactor {
level_promotion_counter,
compaction_candidate_gauge,
compaction_candidate_bytes_gauge,
compaction_duration_ms,
compaction_duration,
}
}
@ -540,8 +535,8 @@ impl Compactor {
}
if let Some(delta) = self.time_provider.now().checked_duration_since(start_time) {
let duration_ms = self.compaction_duration_ms.recorder(attributes.clone());
duration_ms.record(delta.as_millis() as _);
let duration = self.compaction_duration.recorder(attributes.clone());
duration.record(delta);
}
let compaction_counter = self.compaction_counter.recorder(attributes.clone());

View File

@ -16,13 +16,17 @@ pub async fn test_metrics() {
Step::WriteLineProtocol(lp),
Step::WaitForReadable,
Step::VerifiedMetrics(Box::new(|_state, metrics| {
let metrics_lines: Vec<_> = metrics.trim().split('\n').collect();
let catalog_op_metrics_count = metrics_lines
.iter()
.filter(|x| x.starts_with("catalog_op_duration_seconds_bucket"))
.count();
assert!(
catalog_op_metrics_count >= 180,
"Expected at least 180 catalog op metrics, got: {}\n\n{}",
catalog_op_metrics_count,
metrics
.trim()
.split('\n')
.filter(|x| x.starts_with("catalog_op_duration_ms_bucket"))
.count()
>= 180
);
})),
],

View File

@ -23,7 +23,7 @@ use generated_types::ingester::IngesterQueryRequest;
use iox_catalog::interface::Catalog;
use iox_query::exec::Executor;
use iox_time::{SystemProvider, TimeProvider};
use metric::{Metric, U64Counter, U64Histogram, U64HistogramOptions};
use metric::{DurationHistogram, Metric, U64Counter};
use object_store::DynObjectStore;
use observability_deps::tracing::*;
use snafu::{ResultExt, Snafu};
@ -109,9 +109,9 @@ pub struct IngestHandlerImpl<T = SystemProvider> {
time_provider: T,
/// Query execution duration distribution (milliseconds).
query_duration_success_ms: U64Histogram,
query_duration_error_ms: U64Histogram,
/// Query execution duration distribution.
query_duration_success: DurationHistogram,
query_duration_error: DurationHistogram,
/// Query request rejected due to concurrency limits
query_request_limit_rejected: U64Counter,
@ -250,30 +250,12 @@ impl IngestHandlerImpl {
}
// Record query duration metrics, broken down by query execution result
let query_duration: Metric<U64Histogram> = metric_registry.register_metric_with_options(
"flight_query_duration_ms",
"flight request query execution duration in milliseconds",
|| {
U64HistogramOptions::new([
5,
10,
20,
40,
80,
160,
320,
640,
1280,
2560,
5120,
10240,
20480,
u64::MAX,
])
},
let query_duration: Metric<DurationHistogram> = metric_registry.register_metric(
"flight_query_duration",
"flight request query execution duration",
);
let query_duration_success_ms = query_duration.recorder(&[("result", "success")]);
let query_duration_error_ms = query_duration.recorder(&[("result", "error")]);
let query_duration_success = query_duration.recorder(&[("result", "success")]);
let query_duration_error = query_duration.recorder(&[("result", "error")]);
let query_request_limit_rejected = metric_registry
.register_metric::<U64Counter>(
@ -287,8 +269,8 @@ impl IngestHandlerImpl {
kafka_topic: topic,
join_handles,
shutdown,
query_duration_success_ms,
query_duration_error_ms,
query_duration_success,
query_duration_error,
query_request_limit_rejected,
request_sem: Semaphore::new(max_requests),
time_provider: Default::default(),
@ -325,10 +307,8 @@ impl IngestHandler for IngestHandlerImpl {
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Ok(_) => self
.query_duration_success_ms
.record(delta.as_millis() as _),
Err(_) => self.query_duration_error_ms.record(delta.as_millis() as _),
Ok(_) => self.query_duration_success.record(delta),
Err(_) => self.query_duration_error.record(delta),
};
}
@ -397,7 +377,7 @@ mod tests {
use dml::{DmlMeta, DmlWrite};
use iox_catalog::{mem::MemCatalog, validate_or_insert_schema};
use iox_time::Time;
use metric::{Attributes, Metric, U64Counter, U64Gauge, U64Histogram};
use metric::{Attributes, Metric, U64Counter, U64Gauge};
use mutable_batch_lp::lines_to_batches;
use object_store::memory::InMemory;
use std::{num::NonZeroU32, ops::DerefMut};
@ -504,7 +484,7 @@ mod tests {
let observation = ingester
.metrics
.get_instrument::<Metric<U64Histogram>>("ingester_op_apply_duration_ms")
.get_instrument::<Metric<DurationHistogram>>("ingester_op_apply_duration")
.unwrap()
.get_observer(&Attributes::from(&[
("kafka_topic", "whatevs"),

View File

@ -4,7 +4,7 @@ use data_types::{KafkaPartition, SequenceNumber};
use dml::DmlOperation;
use futures::{pin_mut, FutureExt, StreamExt};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, U64Counter, U64Gauge};
use metric::{Attributes, DurationCounter, DurationGauge, U64Counter};
use observability_deps::tracing::*;
use std::{fmt::Debug, time::Duration};
use tokio_util::sync::CancellationToken;
@ -48,10 +48,10 @@ pub struct SequencedStreamHandler<I, O, T = SystemProvider> {
// Metrics
time_provider: T,
time_to_be_readable_ms: U64Gauge,
time_to_be_readable: DurationGauge,
/// Duration of time ingest is paused at the request of the LifecycleManager
pause_duration_ms: U64Counter,
pause_duration: DurationCounter,
/// Errors during op stream reading
seq_unknown_sequence_number_count: U64Counter,
@ -86,17 +86,18 @@ impl<I, O> SequencedStreamHandler<I, O> {
skip_to_oldest_available: bool,
) -> Self {
// TTBR
let time_to_be_readable_ms = metrics.register_metric::<U64Gauge>(
"ingester_ttbr_ms",
"duration of time between producer writing to consumer putting into queryable cache in \
milliseconds",
let time_to_be_readable = metrics.register_metric::<DurationGauge>(
"ingester_ttbr",
"duration of time between producer writing to consumer putting into queryable cache",
).recorder(metric_attrs(kafka_partition, &kafka_topic_name, None, false));
// Lifecycle-driven ingest pause duration
let pause_duration_ms = metrics.register_metric::<U64Counter>(
"ingest_paused_duration_ms_total",
"duration of time ingestion has been paused by the lifecycle manager in milliseconds",
).recorder(&[]);
let pause_duration = metrics
.register_metric::<DurationCounter>(
"ingest_paused_duration_total",
"duration of time ingestion has been paused by the lifecycle manager",
)
.recorder(&[]);
// Error count metrics
let ingest_errors = metrics.register_metric::<U64Counter>(
@ -140,8 +141,8 @@ impl<I, O> SequencedStreamHandler<I, O> {
sink,
lifecycle_handle,
time_provider: SystemProvider::default(),
time_to_be_readable_ms,
pause_duration_ms,
time_to_be_readable,
pause_duration,
seq_unknown_sequence_number_count,
seq_invalid_data_count,
seq_unknown_error_count,
@ -162,8 +163,8 @@ impl<I, O> SequencedStreamHandler<I, O> {
sink: self.sink,
lifecycle_handle: self.lifecycle_handle,
time_provider: provider,
time_to_be_readable_ms: self.time_to_be_readable_ms,
pause_duration_ms: self.pause_duration_ms,
time_to_be_readable: self.time_to_be_readable,
pause_duration: self.pause_duration,
seq_unknown_sequence_number_count: self.seq_unknown_sequence_number_count,
seq_invalid_data_count: self.seq_invalid_data_count,
seq_unknown_error_count: self.seq_unknown_error_count,
@ -356,7 +357,7 @@ where
if let Some(delta) =
produced_at.and_then(|ts| self.time_provider.now().checked_duration_since(ts))
{
self.time_to_be_readable_ms.set(delta.as_millis() as u64);
self.time_to_be_readable.set(delta);
}
if should_pause {
@ -384,8 +385,7 @@ where
// While the actual sleep may be slightly longer than
// INGEST_POLL_INTERVAL, it's not likely to be a useful
// distinction in the metrics.
self.pause_duration_ms
.inc(INGEST_POLL_INTERVAL.as_millis() as _);
self.pause_duration.inc(INGEST_POLL_INTERVAL);
tokio::time::sleep(INGEST_POLL_INTERVAL).await;
}
@ -644,7 +644,7 @@ mod tests {
// Assert the TTBR metric value
let ttbr = metrics
.get_instrument::<Metric<U64Gauge>>("ingester_ttbr_ms")
.get_instrument::<Metric<DurationGauge>>("ingester_ttbr")
.expect("did not find ttbr metric")
.get_observer(&Attributes::from([
("kafka_topic", TEST_KAFKA_TOPIC.into()),
@ -652,7 +652,7 @@ mod tests {
]))
.expect("did not match metric attributes")
.fetch();
assert_eq!(ttbr, $want_ttbr);
assert_eq!(ttbr, Duration::from_millis($want_ttbr));
// Assert any error metrics in the macro call
$(
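This file also converts the two non-histogram instruments: the TTBR gauge becomes a DurationGauge whose set() takes a Duration, and the lifecycle pause counter becomes a DurationCounter whose inc() adds a Duration. A small sketch of both call sites, pieced together from the hunks above; `attrs`, the attribute values, and the INGEST_POLL_INTERVAL value are illustrative stand-ins for what the file builds via metric_attrs and its own constant:

use std::time::Duration;
use metric::{DurationCounter, DurationGauge, Registry};

// Illustrative sketch, not part of this commit.
fn ttbr_and_pause_example(metrics: &Registry, delta: Duration) {
    const INGEST_POLL_INTERVAL: Duration = Duration::from_millis(100); // placeholder value
    let attrs = &[("kafka_topic", "test_topic")]; // placeholder attributes

    let time_to_be_readable = metrics
        .register_metric::<DurationGauge>(
            "ingester_ttbr",
            "duration of time between producer writing to consumer putting into queryable cache",
        )
        .recorder(attrs);
    time_to_be_readable.set(delta); // the gauge now holds a Duration, not a millisecond count

    let pause_duration = metrics
        .register_metric::<DurationCounter>(
            "ingest_paused_duration_total",
            "duration of time ingestion has been paused by the lifecycle manager",
        )
        .recorder(&[]);
    pause_duration.inc(INGEST_POLL_INTERVAL); // accumulates paused wall-clock time
}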

View File

@ -5,7 +5,7 @@ use async_trait::async_trait;
use data_types::KafkaPartition;
use dml::DmlOperation;
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
use metric::{Attributes, DurationHistogram, U64Counter, U64Gauge};
use std::fmt::Debug;
use trace::span::SpanRecorder;
@ -52,8 +52,8 @@ pub struct SinkInstrumentation<F, T, P = SystemProvider> {
/// Op application success/failure call latency histograms (which include
/// counters)
op_apply_success_ms: U64Histogram,
op_apply_error_ms: U64Histogram,
op_apply_success: DurationHistogram,
op_apply_error: DurationHistogram,
/// Write buffer metrics
write_buffer_bytes_read: U64Counter,
@ -111,37 +111,16 @@ where
)
.recorder(attr.clone());
// The buckets for the op apply histogram
let buckets = || {
U64HistogramOptions::new([
5,
10,
20,
40,
80,
160,
320,
640,
1280,
2560,
5120,
10240,
20480,
u64::MAX,
])
};
let op_apply = metrics.register_metric_with_options::<U64Histogram, _>(
"ingester_op_apply_duration_ms",
let op_apply = metrics.register_metric::<DurationHistogram>(
"ingester_op_apply_duration",
"The duration of time taken to process an operation read from the sequencer",
buckets,
);
let op_apply_success_ms = op_apply.recorder({
let op_apply_success = op_apply.recorder({
let mut attr = attr.clone();
attr.insert("result", "success");
attr
});
let op_apply_error_ms = op_apply.recorder({
let op_apply_error = op_apply.recorder({
let mut attr = attr;
attr.insert("result", "error");
attr
@ -152,8 +131,8 @@ where
watermark_fetcher,
sequencer_id: kafka_partition.get(),
op_apply_success_ms,
op_apply_error_ms,
op_apply_success,
op_apply_error,
write_buffer_bytes_read,
write_buffer_last_sequence_number,
@ -235,14 +214,14 @@ where
let metric = match &res {
Ok(_) => {
span_recorder.ok("success");
&self.op_apply_success_ms
&self.op_apply_success
}
Err(e) => {
span_recorder.error(e.to_string());
&self.op_apply_error_ms
&self.op_apply_error
}
};
metric.record(delta.as_millis() as _);
metric.record(delta);
}
// Return the result from the inner handler unmodified
@ -394,12 +373,12 @@ mod tests {
);
// Validate the success histogram was hit
let hist = get_metric::<U64Histogram>(&metrics, "ingester_op_apply_duration_ms", &{
let hist = get_metric::<DurationHistogram>(&metrics, "ingester_op_apply_duration", &{
let mut attrs = DEFAULT_ATTRS.clone();
attrs.insert("result", "success");
attrs
});
assert_matches!(hist, Observation::U64Histogram(h) => {
assert_matches!(hist, Observation::DurationHistogram(h) => {
let hits: u64 = h.buckets.iter().map(|b| b.count).sum();
assert_eq!(hits, 1);
});
@ -465,12 +444,12 @@ mod tests {
);
// Validate the histogram was hit even on error
let hist = get_metric::<U64Histogram>(&metrics, "ingester_op_apply_duration_ms", &{
let hist = get_metric::<DurationHistogram>(&metrics, "ingester_op_apply_duration", &{
let mut attrs = DEFAULT_ATTRS.clone();
attrs.insert("result", "error");
attrs
});
assert_matches!(hist, Observation::U64Histogram(h) => {
assert_matches!(hist, Observation::DurationHistogram(h) => {
let hits: u64 = h.buckets.iter().map(|b| b.count).sum();
assert_eq!(hits, 1);
});
@ -529,12 +508,12 @@ mod tests {
);
// Validate the success histogram was hit
let hist = get_metric::<U64Histogram>(&metrics, "ingester_op_apply_duration_ms", &{
let hist = get_metric::<DurationHistogram>(&metrics, "ingester_op_apply_duration", &{
let mut attrs = DEFAULT_ATTRS.clone();
attrs.insert("result", "success");
attrs
});
assert_matches!(hist, Observation::U64Histogram(h) => {
assert_matches!(hist, Observation::DurationHistogram(h) => {
let hits: u64 = h.buckets.iter().map(|b| b.count).sum();
assert_eq!(hits, 1);
});
@ -594,12 +573,12 @@ mod tests {
);
// Validate the success histogram was hit
let hist = get_metric::<U64Histogram>(&metrics, "ingester_op_apply_duration_ms", &{
let hist = get_metric::<DurationHistogram>(&metrics, "ingester_op_apply_duration", &{
let mut attrs = DEFAULT_ATTRS.clone();
attrs.insert("result", "success");
attrs
});
assert_matches!(hist, Observation::U64Histogram(h) => {
assert_matches!(hist, Observation::DurationHistogram(h) => {
let hits: u64 = h.buckets.iter().map(|b| b.count).sum();
assert_eq!(hits, 1);
});

View File

@ -774,7 +774,7 @@ pub(crate) mod test_helpers {
use super::*;
use ::test_helpers::{assert_contains, tracing::TracingCapture};
use data_types::ColumnId;
use metric::{Attributes, Metric, U64Histogram};
use metric::{Attributes, DurationHistogram, Metric};
use std::{
ops::{Add, DerefMut},
sync::Arc,
@ -3084,7 +3084,7 @@ pub(crate) mod test_helpers {
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
.get_instrument::<Metric<DurationHistogram>>("catalog_op_duration")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
.expect("failed to get observer")

View File

@ -13,7 +13,7 @@ use data_types::{
SequencerId, Table, TableId, TablePartition, Timestamp, Tombstone, TombstoneId,
};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Metric, U64Histogram, U64HistogramOptions};
use metric::{DurationHistogram, Metric};
use std::{fmt::Debug, sync::Arc};
use uuid::Uuid;
@ -21,7 +21,7 @@ use uuid::Uuid;
/// transactional variant) with instrumentation that emits latency histograms
/// for each method.
///
/// Values are recorded under the `catalog_op_duration_ms` metric, labelled by
/// Values are recorded under the `catalog_op_duration` metric, labelled by
/// operation name and result (success/error).
#[derive(Debug)]
pub struct MetricDecorator<T, P = SystemProvider> {
@ -149,14 +149,9 @@ macro_rules! decorate {
$(
async fn $method(&mut self, $($arg : $t),*) -> Result<$out> {
let buckets = || {
U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX])
};
let observer: Metric<U64Histogram> = self.metrics.register_metric_with_options(
"catalog_op_duration_ms",
"catalog call duration in milliseconds",
buckets,
let observer: Metric<DurationHistogram> = self.metrics.register_metric(
"catalog_op_duration",
"catalog call duration",
);
let t = self.time_provider.now();
@ -169,7 +164,7 @@ macro_rules! decorate {
Ok(_) => "success",
Err(_) => "error",
};
observer.recorder(&[("op", $metric), ("result", tag)]).record(delta.as_millis() as _);
observer.recorder(&[("op", $metric), ("result", tag)]).record(delta);
}
res
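Written out for a single hypothetical operation, the timing wrapper that decorate! now generates looks roughly like the following; the generic future argument and the "op_example" label are illustrative (the macro substitutes the real inner method call and its $metric name), and the macro body above remains the authoritative version:

use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric, Registry};

// Illustrative sketch, not part of this commit.
async fn timed_catalog_op<T, E>(
    metrics: &Registry,
    time_provider: &SystemProvider,
    inner: impl std::future::Future<Output = Result<T, E>>,
) -> Result<T, E> {
    let observer: Metric<DurationHistogram> = metrics.register_metric(
        "catalog_op_duration",
        "catalog call duration",
    );
    let t = time_provider.now();
    let res = inner.await;
    if let Some(delta) = time_provider.now().checked_duration_since(t) {
        let tag = match &res {
            Ok(_) => "success",
            Err(_) => "error",
        };
        observer
            .recorder(&[("op", "op_example"), ("result", tag)])
            .record(delta);
    }
    res
}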

View File

@ -1950,7 +1950,7 @@ mod tests {
use super::*;
use crate::create_or_get_default_records;
use assert_matches::assert_matches;
use metric::{Attributes, Metric, U64Histogram};
use metric::{Attributes, DurationHistogram, Metric};
use rand::Rng;
use sqlx::migrate::MigrateDatabase;
use std::{env, io::Write, ops::DerefMut, sync::Arc, time::Instant};
@ -2005,7 +2005,7 @@ mod tests {
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
.get_instrument::<Metric<DurationHistogram>>("catalog_op_duration")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
.expect("failed to get observer")

View File

@ -11,7 +11,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use futures::{stream::BoxStream, Stream, StreamExt};
use iox_time::{SystemProvider, Time, TimeProvider};
use metric::{Metric, U64Counter, U64Histogram, U64HistogramOptions};
use metric::{DurationHistogram, Metric, U64Counter};
use pin_project::{pin_project, pinned_drop};
use object_store::{path::Path, GetResult, ListResult, ObjectMeta, ObjectStore, Result};
@ -70,19 +70,19 @@ pub struct ObjectStoreMetrics {
inner: Arc<dyn ObjectStore>,
time_provider: Arc<dyn TimeProvider>,
put_success_duration_ms: U64Histogram,
put_error_duration_ms: U64Histogram,
put_success_duration: DurationHistogram,
put_error_duration: DurationHistogram,
put_bytes: U64Counter,
get_success_duration_ms: U64Histogram,
get_error_duration_ms: U64Histogram,
get_success_duration: DurationHistogram,
get_error_duration: DurationHistogram,
get_bytes: U64Counter,
delete_success_duration_ms: U64Histogram,
delete_error_duration_ms: U64Histogram,
delete_success_duration: DurationHistogram,
delete_error_duration: DurationHistogram,
list_success_duration_ms: U64Histogram,
list_error_duration_ms: U64Histogram,
list_success_duration: DurationHistogram,
list_error_duration: DurationHistogram,
}
impl ObjectStoreMetrics {
@ -92,25 +92,6 @@ impl ObjectStoreMetrics {
time_provider: Arc<dyn TimeProvider>,
registry: &metric::Registry,
) -> Self {
let buckets = || {
U64HistogramOptions::new([
5,
10,
20,
40,
80,
160,
320,
640,
1280,
2560,
5120,
10240,
20480,
u64::MAX,
])
};
// Byte counts up/down
let bytes = registry.register_metric::<U64Counter>(
"object_store_transfer_bytes",
@ -120,10 +101,9 @@ impl ObjectStoreMetrics {
let get_bytes = bytes.recorder(&[("op", "get")]);
// Call durations broken down by op & result
let duration: Metric<U64Histogram> = registry.register_metric_with_options(
"object_store_op_duration_ms",
"object store operation duration in milliseconds",
buckets,
let duration: Metric<DurationHistogram> = registry.register_metric(
"object_store_op_duration",
"object store operation duration",
);
let put_success_duration = duration.recorder(&[("op", "put"), ("result", "success")]);
let put_error_duration = duration.recorder(&[("op", "put"), ("result", "error")]);
@ -138,19 +118,19 @@ impl ObjectStoreMetrics {
inner,
time_provider,
put_success_duration_ms: put_success_duration,
put_error_duration_ms: put_error_duration,
put_success_duration,
put_error_duration,
put_bytes,
get_bytes,
get_success_duration_ms: get_success_duration,
get_error_duration_ms: get_error_duration,
get_success_duration,
get_error_duration,
delete_success_duration_ms: delete_success_duration,
delete_error_duration_ms: delete_error_duration,
delete_success_duration,
delete_error_duration,
list_success_duration_ms: list_success_duration,
list_error_duration_ms: list_error_duration,
list_success_duration,
list_error_duration,
}
}
}
@ -174,8 +154,8 @@ impl ObjectStore for ObjectStoreMetrics {
// if it happens.
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Ok(_) => self.put_success_duration_ms.record(delta.as_millis() as _),
Err(_) => self.put_error_duration_ms.record(delta.as_millis() as _),
Ok(_) => self.put_success_duration.record(delta),
Err(_) => self.put_error_duration.record(delta),
};
}
@ -193,7 +173,7 @@ impl ObjectStore for ObjectStoreMetrics {
if let Ok(m) = file.metadata().await {
self.get_bytes.inc(m.len());
if let Some(d) = self.time_provider.now().checked_duration_since(started_at) {
self.get_success_duration_ms.record(d.as_millis() as _)
self.get_success_duration.record(d)
}
}
Ok(GetResult::File(file, path))
@ -205,8 +185,8 @@ impl ObjectStore for ObjectStoreMetrics {
StreamMetricRecorder::new(
s,
started_at,
self.get_success_duration_ms.clone(),
self.get_error_duration_ms.clone(),
self.get_success_duration.clone(),
self.get_error_duration.clone(),
BytesStreamDelegate(self.get_bytes.clone()),
)
.fuse(),
@ -215,7 +195,7 @@ impl ObjectStore for ObjectStoreMetrics {
Err(e) => {
// Record the call duration in the error histogram.
if let Some(delta) = self.time_provider.now().checked_duration_since(started_at) {
self.get_error_duration_ms.record(delta.as_millis() as _);
self.get_error_duration.record(delta);
}
Err(e)
}
@ -236,10 +216,8 @@ impl ObjectStore for ObjectStoreMetrics {
// if it happens.
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Ok(_) => self
.delete_success_duration_ms
.record(delta.as_millis() as _),
Err(_) => self.delete_error_duration_ms.record(delta.as_millis() as _),
Ok(_) => self.delete_success_duration.record(delta),
Err(_) => self.delete_error_duration.record(delta),
};
}
@ -262,8 +240,8 @@ impl ObjectStore for ObjectStoreMetrics {
StreamMetricRecorder::new(
s,
started_at,
self.list_success_duration_ms.clone(),
self.list_error_duration_ms.clone(),
self.list_success_duration.clone(),
self.list_error_duration.clone(),
NopStreamDelegate::default(),
)
.fuse(),
@ -272,7 +250,7 @@ impl ObjectStore for ObjectStoreMetrics {
Err(e) => {
// Record the call duration in the error histogram.
if let Some(delta) = self.time_provider.now().checked_duration_since(started_at) {
self.list_error_duration_ms.record(delta.as_millis() as _);
self.list_error_duration.record(delta);
}
Err(e)
}
@ -288,8 +266,8 @@ impl ObjectStore for ObjectStoreMetrics {
// if it happens.
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Ok(_) => self.list_success_duration_ms.record(delta.as_millis() as _),
Err(_) => self.list_error_duration_ms.record(delta.as_millis() as _),
Ok(_) => self.list_success_duration.record(delta),
Err(_) => self.list_error_duration.record(delta),
};
}
@ -389,8 +367,8 @@ where
// the `T`.
metric_delegate: D,
success_duration_ms: U64Histogram,
error_duration_ms: U64Histogram,
success_duration: DurationHistogram,
error_duration: DurationHistogram,
}
impl<S, D> StreamMetricRecorder<S, D>
@ -401,8 +379,8 @@ where
fn new(
stream: S,
started_at: Time,
success_duration_ms: U64Histogram,
error_duration_ms: U64Histogram,
success_duration: DurationHistogram,
error_duration: DurationHistogram,
metric_delegate: D,
) -> Self {
let time_provider = SystemProvider::default();
@ -420,8 +398,8 @@ where
started_at,
time_provider,
success_duration_ms,
error_duration_ms,
success_duration,
error_duration,
metric_delegate,
}
}
@ -459,8 +437,8 @@ where
// The stream has terminated - record the wall clock duration
// immediately.
let hist = match this.last_call_ok {
true => this.success_duration_ms,
false => this.error_duration_ms,
true => this.success_duration,
false => this.error_duration,
};
// Take the last_yielded_at option, marking metrics as emitted
@ -471,7 +449,7 @@ where
.expect("no last_yielded_at value for fused stream")
.checked_duration_since(*this.started_at)
{
hist.record(d.as_millis() as _)
hist.record(d)
}
Poll::Ready(None)
@ -498,12 +476,12 @@ where
// therefore last_yielded_at is still Some).
if let Some(last) = self.last_yielded_at {
let hist = match self.last_call_ok {
true => &self.success_duration_ms,
false => &self.error_duration_ms,
true => &self.success_duration,
false => &self.error_duration,
};
if let Some(d) = last.checked_duration_since(self.started_at) {
hist.record(d.as_millis() as _)
hist.record(d)
}
}
}
@ -532,7 +510,7 @@ mod tests {
attr: [(&'static str, &'static str); N],
) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>(name)
.get_instrument::<Metric<DurationHistogram>>(name)
.expect("failed to read histogram")
.get_observer(&Attributes::from(&attr))
.expect("failed to get observer")
@ -575,7 +553,7 @@ mod tests {
assert_counter_value(&metrics, "object_store_transfer_bytes", [("op", "put")], 5);
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "put"), ("result", "success")],
);
}
@ -598,7 +576,7 @@ mod tests {
assert_counter_value(&metrics, "object_store_transfer_bytes", [("op", "put")], 5);
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "put"), ("result", "error")],
);
}
@ -614,7 +592,7 @@ mod tests {
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "list"), ("result", "success")],
);
}
@ -630,7 +608,7 @@ mod tests {
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "list"), ("result", "error")],
);
}
@ -649,7 +627,7 @@ mod tests {
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "list"), ("result", "success")],
);
}
@ -671,7 +649,7 @@ mod tests {
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "list"), ("result", "error")],
);
}
@ -690,7 +668,7 @@ mod tests {
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "get"), ("result", "error")],
);
}
@ -724,7 +702,7 @@ mod tests {
assert_counter_value(&metrics, "object_store_transfer_bytes", [("op", "get")], 5);
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "get"), ("result", "success")],
);
@ -735,7 +713,7 @@ mod tests {
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "delete"), ("result", "success")],
);
}
@ -763,7 +741,7 @@ mod tests {
assert_counter_value(&metrics, "object_store_transfer_bytes", [("op", "get")], 5);
assert_histogram_hit(
&metrics,
"object_store_op_duration_ms",
"object_store_op_duration",
[("op", "get"), ("result", "success")],
);
}
@ -785,10 +763,7 @@ mod tests {
let time_provider = SystemProvider::default();
let metrics = Arc::new(metric::Registry::default());
let hist: Metric<U64Histogram> =
metrics.register_metric_with_options("wall_clock", "", || {
U64HistogramOptions::new([1, 100, u64::MAX])
});
let hist: Metric<DurationHistogram> = metrics.register_metric("wall_clock", "");
let bytes = metrics
.register_metric::<U64Counter>(
@ -852,7 +827,7 @@ mod tests {
.fetch()
.buckets
.iter()
.skip_while(|b| b.le < SLEEP.as_millis() as _) // Skip buckets less than the sleep duration
.skip_while(|b| b.le < SLEEP) // Skip buckets less than the sleep duration
.fold(0, |acc, v| acc + v.count);
assert_eq!(
hit_count, 1,
@ -887,10 +862,7 @@ mod tests {
let time_provider = SystemProvider::default();
let metrics = Arc::new(metric::Registry::default());
let hist: Metric<U64Histogram> =
metrics.register_metric_with_options("wall_clock", "", || {
U64HistogramOptions::new([1, 100, u64::MAX])
});
let hist: Metric<DurationHistogram> = metrics.register_metric("wall_clock", "");
let bytes = metrics
.register_metric::<U64Counter>(
@ -955,10 +927,7 @@ mod tests {
let time_provider = SystemProvider::default();
let metrics = Arc::new(metric::Registry::default());
let hist: Metric<U64Histogram> =
metrics.register_metric_with_options("wall_clock", "", || {
U64HistogramOptions::new([1, 100, u64::MAX])
});
let hist: Metric<DurationHistogram> = metrics.register_metric("wall_clock", "");
let bytes = metrics
.register_metric::<U64Counter>(
@ -1023,10 +992,7 @@ mod tests {
let time_provider = SystemProvider::default();
let metrics = Arc::new(metric::Registry::default());
let hist: Metric<U64Histogram> =
metrics.register_metric_with_options("wall_clock", "", || {
U64HistogramOptions::new([1, 100, u64::MAX])
});
let hist: Metric<DurationHistogram> = metrics.register_metric("wall_clock", "");
let bytes = metrics
.register_metric::<U64Counter>(
@ -1096,10 +1062,7 @@ mod tests {
let time_provider = SystemProvider::default();
let metrics = Arc::new(metric::Registry::default());
let hist: Metric<U64Histogram> =
metrics.register_metric_with_options("wall_clock", "", || {
U64HistogramOptions::new([1, 100, u64::MAX])
});
let hist: Metric<DurationHistogram> = metrics.register_metric("wall_clock", "");
let bytes = metrics
.register_metric::<U64Counter>(
@ -1145,10 +1108,7 @@ mod tests {
let time_provider = SystemProvider::default();
let metrics = Arc::new(metric::Registry::default());
let hist: Metric<U64Histogram> =
metrics.register_metric_with_options("wall_clock", "", || {
U64HistogramOptions::new([1, 100, u64::MAX])
});
let hist: Metric<DurationHistogram> = metrics.register_metric("wall_clock", "");
let bytes = metrics
.register_metric::<U64Counter>(
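Two details from this file's tests are worth calling out: the histogram is looked up under the new object_store_op_duration name with op/result attributes, and bucket upper bounds are now Durations, so the wall-clock test compares b.le against the injected SLEEP directly instead of its millisecond count. A sketch combining both, assuming the fetch()/buckets shape the tests above rely on; the attribute pair and the threshold are illustrative:

use std::time::Duration;
use metric::{Attributes, DurationHistogram, Metric, Registry};

// Illustrative sketch, not part of this commit.
fn op_duration_hits_at_or_above(registry: &Registry, threshold: Duration) -> u64 {
    registry
        .get_instrument::<Metric<DurationHistogram>>("object_store_op_duration")
        .expect("failed to read histogram")
        .get_observer(&Attributes::from(&[("op", "get"), ("result", "success")]))
        .expect("failed to get observer")
        .fetch()
        .buckets
        .iter()
        .skip_while(|b| b.le < threshold) // `le` is a Duration, not a millisecond count
        .fold(0, |acc, b| acc + b.count)
}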

View File

@ -1,8 +1,8 @@
use metric::{Attributes, Metric, U64Histogram};
use metric::{Attributes, DurationHistogram, Metric};
pub fn assert_histogram_metric_count(metrics: &metric::Registry, name: &'static str, n: u64) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
.get_instrument::<Metric<DurationHistogram>>("catalog_op_duration")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
.expect("failed to get observer")

View File

@ -2,59 +2,34 @@ use super::DmlHandler;
use async_trait::async_trait;
use data_types::{DatabaseName, DeletePredicate};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Metric, U64Histogram, U64HistogramOptions};
use metric::{DurationHistogram, Metric};
use trace::{ctx::SpanContext, span::SpanRecorder};
/// An instrumentation decorator recording call latencies for [`DmlHandler`]
/// implementations.
/// An instrumentation decorator recording call latencies for [`DmlHandler`] implementations.
///
/// Metrics are broken down by operation (write/delete) and result
/// (success/error) with call latency reported in milliseconds.
/// Metrics are broken down by operation (write/delete) and result (success/error).
#[derive(Debug)]
pub struct InstrumentationDecorator<T, P = SystemProvider> {
name: &'static str,
inner: T,
time_provider: P,
write_success: U64Histogram,
write_error: U64Histogram,
write_success: DurationHistogram,
write_error: DurationHistogram,
delete_success: U64Histogram,
delete_error: U64Histogram,
delete_success: DurationHistogram,
delete_error: DurationHistogram,
}
impl<T> InstrumentationDecorator<T> {
/// Wrap a new [`InstrumentationDecorator`] over `T` exposing metrics
/// labelled with `handler=name`.
pub fn new(name: &'static str, registry: &metric::Registry, inner: T) -> Self {
let buckets = || {
U64HistogramOptions::new([
5,
10,
20,
40,
80,
160,
320,
640,
1280,
2560,
5120,
10240,
20480,
u64::MAX,
])
};
let write: Metric<U64Histogram> = registry.register_metric_with_options(
"dml_handler_write_duration_ms",
"write handler call duration in milliseconds",
buckets,
);
let delete: Metric<U64Histogram> = registry.register_metric_with_options(
"dml_handler_delete_duration_ms",
"delete handler call duration in milliseconds",
buckets,
let write: Metric<DurationHistogram> =
registry.register_metric("dml_handler_write_duration", "write handler call duration");
let delete: Metric<DurationHistogram> = registry.register_metric(
"dml_handler_delete_duration",
"delete handler call duration",
);
let write_success = write.recorder(&[("handler", name), ("result", "success")]);
@ -106,11 +81,11 @@ where
match &res {
Ok(_) => {
span_recorder.ok("success");
self.write_success.record(delta.as_millis() as _)
self.write_success.record(delta)
}
Err(e) => {
span_recorder.error(e.to_string());
self.write_error.record(delta.as_millis() as _)
self.write_error.record(delta)
}
};
}
@ -143,11 +118,11 @@ where
match &res {
Ok(_) => {
span_recorder.ok("success");
self.delete_success.record(delta.as_millis() as _)
self.delete_success.record(delta)
}
Err(e) => {
span_recorder.error(e.to_string());
self.delete_error.record(delta.as_millis() as _)
self.delete_error.record(delta)
}
};
}
@ -175,7 +150,7 @@ mod tests {
result: &'static str,
) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>(metric_name)
.get_instrument::<Metric<DurationHistogram>>(metric_name)
.expect("failed to read metric")
.get_observer(&Attributes::from(&[
("handler", HANDLER_NAME),
@ -226,7 +201,7 @@ mod tests {
.await
.expect("inner handler configured to succeed");
assert_metric_hit(&*metrics, "dml_handler_write_duration_ms", "success");
assert_metric_hit(&*metrics, "dml_handler_write_duration", "success");
assert_trace(traces, SpanStatus::Ok);
}
@ -251,7 +226,7 @@ mod tests {
assert_matches!(err, DmlError::DatabaseNotFound(_));
assert_metric_hit(&*metrics, "dml_handler_write_duration_ms", "error");
assert_metric_hit(&*metrics, "dml_handler_write_duration", "error");
assert_trace(traces, SpanStatus::Err);
}
@ -276,7 +251,7 @@ mod tests {
.await
.expect("inner handler configured to succeed");
assert_metric_hit(&*metrics, "dml_handler_delete_duration_ms", "success");
assert_metric_hit(&*metrics, "dml_handler_delete_duration", "success");
assert_trace(traces, SpanStatus::Ok);
}
@ -304,7 +279,7 @@ mod tests {
.await
.expect_err("inner handler configured to fail");
assert_metric_hit(&*metrics, "dml_handler_delete_duration_ms", "error");
assert_metric_hit(&*metrics, "dml_handler_delete_duration", "error");
assert_trace(traces, SpanStatus::Err);
}
}

View File

@ -3,7 +3,7 @@
use super::NamespaceCache;
use data_types::{DatabaseName, NamespaceSchema};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Metric, U64Gauge, U64Histogram, U64HistogramOptions};
use metric::{DurationHistogram, Metric, U64Gauge};
use std::sync::Arc;
/// An [`InstrumentedCache`] decorates a [`NamespaceCache`] with cache read
@ -18,36 +18,26 @@ pub struct InstrumentedCache<T, P = SystemProvider> {
column_count: U64Gauge,
/// A cache read hit
get_hit: U64Histogram,
get_hit: DurationHistogram,
/// A cache read miss
get_miss: U64Histogram,
get_miss: DurationHistogram,
/// A cache put for a namespace that did not previously exist.
put_insert: U64Histogram,
put_insert: DurationHistogram,
/// A cache put replacing a namespace that previously had a cache entry.
put_update: U64Histogram,
put_update: DurationHistogram,
}
impl<T> InstrumentedCache<T> {
/// Instrument `T`, recording cache operations to `registry`.
pub fn new(inner: T, registry: &metric::Registry) -> Self {
let buckets = || {
U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX])
};
let get_counter: Metric<U64Histogram> = registry.register_metric_with_options(
"namespace_cache_get_duration_ms",
"cache read call duration in milliseconds",
buckets,
);
let get_counter: Metric<DurationHistogram> =
registry.register_metric("namespace_cache_get_duration", "cache read call duration");
let get_hit = get_counter.recorder(&[("result", "hit")]);
let get_miss = get_counter.recorder(&[("result", "miss")]);
let put_counter: Metric<U64Histogram> = registry.register_metric_with_options(
"namespace_cache_put_duration_ms",
"cache put call duration in milliseconds",
buckets,
);
let put_counter: Metric<DurationHistogram> =
registry.register_metric("namespace_cache_put_duration", "cache put call duration");
let put_insert = put_counter.recorder(&[("op", "insert")]);
let put_update = put_counter.recorder(&[("op", "update")]);
@ -90,8 +80,8 @@ where
// if it happens.
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Some(_) => self.get_hit.record(delta.as_millis() as _),
None => self.get_miss.record(delta.as_millis() as _),
Some(_) => self.get_hit.record(delta),
None => self.get_miss.record(delta),
};
}
@ -112,7 +102,7 @@ where
match res {
Some(v) => {
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
self.put_update.record(delta.as_millis() as _);
self.put_update.record(delta);
}
// Figure out the difference between the new namespace and the
@ -129,7 +119,7 @@ where
}
None => {
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
self.put_insert.record(delta.as_millis() as _);
self.put_insert.record(delta);
}
// Add the new namespace stats to the counts.
@ -215,7 +205,7 @@ mod tests {
count: u64,
) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>(metric_name)
.get_instrument::<Metric<DurationHistogram>>(metric_name)
.expect("failed to read metric")
.get_observer(&Attributes::from(&[attr]))
.expect("failed to get observer")
@ -240,13 +230,13 @@ mod tests {
assert!(cache.put_schema(ns.clone(), schema).is_none());
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "insert"),
1,
);
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "update"),
0,
);
@ -258,13 +248,13 @@ mod tests {
assert!(cache.put_schema(ns.clone(), schema).is_some());
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "insert"),
1,
); // Unchanged
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "update"),
1,
);
@ -276,13 +266,13 @@ mod tests {
assert!(cache.put_schema(ns.clone(), schema).is_some());
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "insert"),
1,
); // Unchanged
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "update"),
2,
);
@ -294,13 +284,13 @@ mod tests {
assert!(cache.put_schema(ns.clone(), schema).is_some());
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "insert"),
1,
); // Unchanged
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "update"),
3,
);
@ -312,13 +302,13 @@ mod tests {
assert!(cache.put_schema(ns.clone(), schema).is_some());
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "insert"),
1,
); // Unchanged
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "update"),
4,
);
@ -330,13 +320,13 @@ mod tests {
assert!(cache.put_schema(ns.clone(), schema).is_some());
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "insert"),
1,
); // Unchanged
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "update"),
5,
);
@ -348,13 +338,13 @@ mod tests {
assert!(cache.put_schema(ns, schema).is_some());
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "insert"),
1,
); // Unchanged
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "update"),
6,
);
@ -367,13 +357,13 @@ mod tests {
assert!(cache.put_schema(ns.clone(), schema).is_none());
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "insert"),
2,
);
assert_histogram_hit(
&registry,
"namespace_cache_put_duration_ms",
"namespace_cache_put_duration",
("op", "update"),
6,
);
@ -383,7 +373,7 @@ mod tests {
let _got = cache.get_schema(&ns).expect("should exist");
assert_histogram_hit(
&registry,
"namespace_cache_get_duration_ms",
"namespace_cache_get_duration",
("result", "hit"),
1,
);

View File

@ -1,10 +1,9 @@
//! A representation of a single operation sequencer.
use std::{borrow::Cow, hash::Hash, sync::Arc};
use dml::{DmlMeta, DmlOperation};
use iox_time::{SystemProvider, TimeProvider};
use metric::{Metric, U64Histogram, U64HistogramOptions};
use metric::{DurationHistogram, Metric};
use std::{borrow::Cow, hash::Hash, sync::Arc};
use write_buffer::core::{WriteBufferError, WriteBufferWriting};
/// A sequencer tags a write buffer with a sequencer ID.
@ -14,8 +13,8 @@ pub struct Sequencer<P = SystemProvider> {
inner: Arc<dyn WriteBufferWriting>,
time_provider: P,
enqueue_success: U64Histogram,
enqueue_error: U64Histogram,
enqueue_success: DurationHistogram,
enqueue_error: DurationHistogram,
}
impl Eq for Sequencer {}
@ -35,13 +34,9 @@ impl Hash for Sequencer {
impl Sequencer {
/// Tag `inner` with the specified `id`.
pub fn new(id: usize, inner: Arc<dyn WriteBufferWriting>, metrics: &metric::Registry) -> Self {
let buckets = || {
U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX])
};
let write: Metric<U64Histogram> = metrics.register_metric_with_options(
"sequencer_enqueue_duration_ms",
"sequencer enqueue call duration in milliseconds",
buckets,
let write: Metric<DurationHistogram> = metrics.register_metric(
"sequencer_enqueue_duration",
"sequencer enqueue call duration",
);
let enqueue_success = write.recorder([
@ -79,8 +74,8 @@ impl Sequencer {
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Ok(_) => self.enqueue_success.record(delta.as_millis() as _),
Err(_) => self.enqueue_error.record(delta.as_millis() as _),
Ok(_) => self.enqueue_success.record(delta),
Err(_) => self.enqueue_error.record(delta),
}
}

View File

@ -4,7 +4,7 @@ use dml::DmlOperation;
use hashbrown::HashMap;
use hyper::{Body, Request, StatusCode};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use metric::{Attributes, Metric, Registry, U64Counter, U64Histogram};
use metric::{Attributes, DurationHistogram, Metric, Registry, U64Counter};
use mutable_batch::MutableBatch;
use router::{
dml_handlers::{
@ -201,7 +201,7 @@ async fn test_write_ok() {
// Ensure the metric instrumentation was hit
let histogram = ctx
.metrics()
.get_instrument::<Metric<U64Histogram>>("dml_handler_write_duration_ms")
.get_instrument::<Metric<DurationHistogram>>("dml_handler_write_duration")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[
("handler", "request"),
@ -224,7 +224,7 @@ async fn test_write_ok() {
let histogram = ctx
.metrics()
.get_instrument::<Metric<U64Histogram>>("sequencer_enqueue_duration_ms")
.get_instrument::<Metric<DurationHistogram>>("sequencer_enqueue_duration")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[
("kafka_partition", "0"),