diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs index 3933acfd6a..8f9aec897b 100644 --- a/ingester/src/stream_handler/handler.rs +++ b/ingester/src/stream_handler/handler.rs @@ -6,7 +6,7 @@ use data_types::{SequenceNumber, ShardId, ShardIndex}; use dml::DmlOperation; use futures::{pin_mut, FutureExt, StreamExt}; use iox_time::{SystemProvider, TimeProvider}; -use metric::{Attributes, DurationCounter, DurationHistogram, U64Counter}; +use metric::{Attributes, DurationCounter, DurationGauge, U64Counter}; use observability_deps::tracing::*; use tokio_util::sync::CancellationToken; use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler}; @@ -55,7 +55,7 @@ pub(crate) struct SequencedStreamHandler { // Metrics time_provider: T, - time_to_be_readable: DurationHistogram, + time_to_be_readable: DurationGauge, /// Duration of time ingest is paused at the request of the LifecycleManager pause_duration: DurationCounter, @@ -99,11 +99,10 @@ impl SequencedStreamHandler { ) -> Self { // TTBR let time_to_be_readable = metrics - .register_metric::( + .register_metric::( "ingester_ttbr", - "distribution of duration of Time To Become Readable (TTBR), the \ - time between write to a router and becoming readable \ - for query in the ingester", + "Time To Become Readable (TTBR), the time between \ + a write to a router and becoming readable for query in the ingester", ) .recorder(metric_attrs(shard_index, &topic_name, None, false)); @@ -402,7 +401,7 @@ something clever.", // we only want to report the TTBR if anything was applied if let Some(delta) = duration_since_production { // Update the TTBR metric before potentially sleeping. - self.time_to_be_readable.record(delta); + self.time_to_be_readable.set(delta); trace!( kafka_topic=%self.topic_name, shard_index=%self.shard_index, @@ -519,7 +518,7 @@ mod tests { use dml::{DmlDelete, DmlMeta, DmlWrite}; use futures::stream::{self, BoxStream}; use iox_time::{SystemProvider, Time}; - use metric::{HistogramObservation, Metric}; + use metric::Metric; use mutable_batch_lp::lines_to_batches; use once_cell::sync::Lazy; use test_helpers::timeout::FutureTimeout; @@ -649,7 +648,7 @@ mod tests { skip_to_oldest_available = $skip_to_oldest_available:expr, stream_ops = $stream_ops:expr, // Ordered set of stream items to feed to the handler sink_rets = $sink_ret:expr, // Ordered set of values to return from the mock op sink - want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds (inexact - mapped to a histogram bucket) + want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds want_reset = $want_reset:literal, // Desired reset counter value // Optional set of ingest error metric label / values to assert want_err_metrics = [$($metric_name:literal => $metric_count:literal),*], @@ -728,7 +727,7 @@ mod tests { // Assert the TTBR metric value let ttbr = metrics - .get_instrument::>("ingester_ttbr") + .get_instrument::>("ingester_ttbr") .expect("did not find ttbr metric") .get_observer(&Attributes::from([ ("kafka_topic", TEST_TOPIC_NAME.into()), @@ -736,15 +735,7 @@ mod tests { ])) .expect("did not match metric attributes") .fetch(); - // Assert an observation exists in the specified bucket if a - // non-zero TTBR value was given to check. - if $want_ttbr != 0_u64 { - assert_histogram_bucket_count( - &ttbr, - Duration::from_millis($want_ttbr), - 1 - ); - } + assert_eq!(ttbr, Duration::from_millis($want_ttbr)); // assert reset counter let reset = metrics @@ -779,21 +770,6 @@ mod tests { }; } - /// Assert the bucket in `hist` for `want_value` has exactly `hits` number - /// of observations. - fn assert_histogram_bucket_count( - hist: &HistogramObservation, - want_value: Duration, - hits: u64, - ) { - let bucket = hist - .buckets - .iter() - .find(|bucket| want_value <= bucket.le) - .unwrap_or_else(|| panic!("no bucket found for metric value {:?}", want_value)); - assert_eq!(bucket.count, hits); - } - test_stream_handler!( immediate_shutdown, skip_to_oldest_available = false, @@ -821,23 +797,6 @@ mod tests { } ); - // Single write op applies OK with a large TTBR, ensuring the TTBR - // observation is in a high bucket, then shutdown. - test_stream_handler!( - write_ok_large_ttbr, - skip_to_oldest_available = false, - stream_ops = vec![ - vec![Ok(DmlOperation::Write(make_write("bananas", 4242424242)))] - ], - sink_rets = [Ok(DmlApplyAction::Applied(true))], - want_ttbr = 4242424242, - want_reset = 0, - want_err_metrics = [], - want_sink = [DmlOperation::Write(op)] => { - assert_eq!(op.namespace(), "bananas"); - } - ); - // Single delete op applies OK, then shutdown. test_stream_handler!( delete_ok,