From c63312ce125d39415e78c9322f721ea322cf2869 Mon Sep 17 00:00:00 2001
From: Dom Dwyer <dom@itsallbroken.com>
Date: Tue, 18 Oct 2022 16:29:09 +0200
Subject: [PATCH] refactor: use histogram to record TTBR

Changes the TTBR metric from a gauge to a histogram so that observations
maintain a time dimension.
---
 ingester/src/stream_handler/handler.rs | 60 +++++++++++++++++++++-----
 1 file changed, 50 insertions(+), 10 deletions(-)

diff --git a/ingester/src/stream_handler/handler.rs b/ingester/src/stream_handler/handler.rs
index f1d31b08d5..1b674fd2ea 100644
--- a/ingester/src/stream_handler/handler.rs
+++ b/ingester/src/stream_handler/handler.rs
@@ -6,7 +6,7 @@ use data_types::{SequenceNumber, ShardId, ShardIndex};
 use dml::DmlOperation;
 use futures::{pin_mut, FutureExt, StreamExt};
 use iox_time::{SystemProvider, TimeProvider};
-use metric::{Attributes, DurationCounter, DurationGauge, U64Counter};
+use metric::{Attributes, DurationCounter, DurationHistogram, U64Counter};
 use observability_deps::tracing::*;
 use tokio_util::sync::CancellationToken;
 use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};
@@ -55,7 +55,7 @@ pub(crate) struct SequencedStreamHandler<I, O, T = SystemProvider> {
 
     // Metrics
     time_provider: T,
-    time_to_be_readable: DurationGauge,
+    time_to_be_readable: DurationHistogram,
 
     /// Duration of time ingest is paused at the request of the LifecycleManager
     pause_duration: DurationCounter,
@@ -99,10 +99,10 @@ impl<I, O> SequencedStreamHandler<I, O> {
     ) -> Self {
         // TTBR
         let time_to_be_readable = metrics
-            .register_metric::<DurationGauge>(
+            .register_metric::<DurationHistogram>(
                 "ingester_ttbr",
-                "duration of time between producer writing to consumer putting into queryable \
-                 cache",
+                "distribution of duration between producer writing \
+                to consumer putting into queryable cache",
             )
             .recorder(metric_attrs(shard_index, &topic_name, None, false));
 
@@ -399,7 +399,7 @@ something clever.",
                     // we only want to report the TTBR if anything was applied
                     if let Some(delta) = duration_since_production {
                         // Update the TTBR metric before potentially sleeping.
-                        self.time_to_be_readable.set(delta);
+                        self.time_to_be_readable.record(delta);
                         trace!(
                             kafka_topic=%self.topic_name,
                             shard_index=%self.shard_index,
@@ -516,7 +516,7 @@ mod tests {
     use dml::{DmlDelete, DmlMeta, DmlWrite};
     use futures::stream::{self, BoxStream};
     use iox_time::{SystemProvider, Time};
-    use metric::Metric;
+    use metric::{HistogramObservation, Metric};
     use mutable_batch_lp::lines_to_batches;
     use once_cell::sync::Lazy;
     use test_helpers::timeout::FutureTimeout;
@@ -646,7 +646,7 @@ mod tests {
             skip_to_oldest_available = $skip_to_oldest_available:expr,
             stream_ops = $stream_ops:expr,  // Ordered set of stream items to feed to the handler
             sink_rets = $sink_ret:expr,     // Ordered set of values to return from the mock op sink
-            want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds
+            want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds (inexact - mapped to a histogram bucket)
             want_reset = $want_reset:literal,  // Desired reset counter value
             // Optional set of ingest error metric label / values to assert
             want_err_metrics = [$($metric_name:literal => $metric_count:literal),*],
@@ -725,7 +725,7 @@ mod tests {
 
                     // Assert the TTBR metric value
                     let ttbr = metrics
-                        .get_instrument::<Metric<DurationGauge>>("ingester_ttbr")
+                        .get_instrument::<Metric<DurationHistogram>>("ingester_ttbr")
                         .expect("did not find ttbr metric")
                         .get_observer(&Attributes::from([
                             ("kafka_topic", TEST_TOPIC_NAME.into()),
@@ -733,7 +733,15 @@ mod tests {
                         ]))
                         .expect("did not match metric attributes")
                         .fetch();
-                    assert_eq!(ttbr, Duration::from_millis($want_ttbr));
+                    // Assert an observation exists in the specified bucket if a
+                    // non-zero TTBR value was given to check.
+                    if $want_ttbr != 0_u64 {
+                        assert_histogram_bucket_count(
+                            &ttbr,
+                            Duration::from_millis($want_ttbr),
+                            1
+                        );
+                    }
 
                     // assert reset counter
                     let reset = metrics
@@ -768,6 +776,21 @@ mod tests {
         };
     }
 
+    /// Assert the bucket in `hist` for `want_value` has exactly `hits` number
+    /// of observations.
+    fn assert_histogram_bucket_count(
+        hist: &HistogramObservation<Duration>,
+        want_value: Duration,
+        hits: u64,
+    ) {
+        let bucket = hist
+            .buckets
+            .iter()
+            .find(|bucket| want_value <= bucket.le)
+            .unwrap_or_else(|| panic!("no bucket found for metric value {:?}", want_value));
+        assert_eq!(bucket.count, hits);
+    }
+
     test_stream_handler!(
         immediate_shutdown,
         skip_to_oldest_available = false,
@@ -795,6 +818,23 @@ mod tests {
         }
     );
 
+    // Single write op applies OK with a large TTBR, ensuring the TTBR
+    // observation is in a high bucket, then shutdown.
+    test_stream_handler!(
+        write_ok_large_ttbr,
+        skip_to_oldest_available = false,
+        stream_ops = vec![
+            vec![Ok(DmlOperation::Write(make_write("bananas", 4242424242)))]
+        ],
+        sink_rets = [Ok(DmlApplyAction::Applied(true))],
+        want_ttbr = 4242424242,
+        want_reset = 0,
+        want_err_metrics = [],
+        want_sink = [DmlOperation::Write(op)] => {
+            assert_eq!(op.namespace(), "bananas");
+        }
+    );
+
     // Single delete op applies OK, then shutdown.
     test_stream_handler!(
         delete_ok,