Merge pull request #5895 from influxdata/dom/ttbr-histogram
refactor: use histogram to record TTBR
pull/24376/head
commit
e6700fc249
|
@ -6,7 +6,7 @@ use data_types::{SequenceNumber, ShardId, ShardIndex};
|
|||
use dml::DmlOperation;
|
||||
use futures::{pin_mut, FutureExt, StreamExt};
|
||||
use iox_time::{SystemProvider, TimeProvider};
|
||||
use metric::{Attributes, DurationCounter, DurationGauge, U64Counter};
|
||||
use metric::{Attributes, DurationCounter, DurationHistogram, U64Counter};
|
||||
use observability_deps::tracing::*;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use write_buffer::core::{WriteBufferErrorKind, WriteBufferStreamHandler};
|
||||
|
@ -55,7 +55,7 @@ pub(crate) struct SequencedStreamHandler<I, O, T = SystemProvider> {
|
|||
|
||||
// Metrics
|
||||
time_provider: T,
|
||||
time_to_be_readable: DurationGauge,
|
||||
time_to_be_readable: DurationHistogram,
|
||||
|
||||
/// Duration of time ingest is paused at the request of the LifecycleManager
|
||||
pause_duration: DurationCounter,
|
||||
|
@ -99,10 +99,10 @@ impl<I, O> SequencedStreamHandler<I, O> {
|
|||
) -> Self {
|
||||
// TTBR
|
||||
let time_to_be_readable = metrics
|
||||
.register_metric::<DurationGauge>(
|
||||
.register_metric::<DurationHistogram>(
|
||||
"ingester_ttbr",
|
||||
"duration of time between producer writing to consumer putting into queryable \
|
||||
cache",
|
||||
"distribution of duration between producer writing \
|
||||
to consumer putting into queryable cache",
|
||||
)
|
||||
.recorder(metric_attrs(shard_index, &topic_name, None, false));
|
||||
|
||||
|
@ -399,7 +399,7 @@ something clever.",
|
|||
// we only want to report the TTBR if anything was applied
|
||||
if let Some(delta) = duration_since_production {
|
||||
// Update the TTBR metric before potentially sleeping.
|
||||
self.time_to_be_readable.set(delta);
|
||||
self.time_to_be_readable.record(delta);
|
||||
trace!(
|
||||
kafka_topic=%self.topic_name,
|
||||
shard_index=%self.shard_index,
|
||||
|
@ -516,7 +516,7 @@ mod tests {
|
|||
use dml::{DmlDelete, DmlMeta, DmlWrite};
|
||||
use futures::stream::{self, BoxStream};
|
||||
use iox_time::{SystemProvider, Time};
|
||||
use metric::Metric;
|
||||
use metric::{HistogramObservation, Metric};
|
||||
use mutable_batch_lp::lines_to_batches;
|
||||
use once_cell::sync::Lazy;
|
||||
use test_helpers::timeout::FutureTimeout;
|
||||
|
@ -646,7 +646,7 @@ mod tests {
|
|||
skip_to_oldest_available = $skip_to_oldest_available:expr,
|
||||
stream_ops = $stream_ops:expr, // Ordered set of stream items to feed to the handler
|
||||
sink_rets = $sink_ret:expr, // Ordered set of values to return from the mock op sink
|
||||
want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds
|
||||
want_ttbr = $want_ttbr:literal, // Desired TTBR value in milliseconds (inexact - mapped to a histogram bucket)
|
||||
want_reset = $want_reset:literal, // Desired reset counter value
|
||||
// Optional set of ingest error metric label / values to assert
|
||||
want_err_metrics = [$($metric_name:literal => $metric_count:literal),*],
|
||||
|
@ -725,7 +725,7 @@ mod tests {
|
|||
|
||||
// Assert the TTBR metric value
|
||||
let ttbr = metrics
|
||||
.get_instrument::<Metric<DurationGauge>>("ingester_ttbr")
|
||||
.get_instrument::<Metric<DurationHistogram>>("ingester_ttbr")
|
||||
.expect("did not find ttbr metric")
|
||||
.get_observer(&Attributes::from([
|
||||
("kafka_topic", TEST_TOPIC_NAME.into()),
|
||||
|
@ -733,7 +733,15 @@ mod tests {
|
|||
]))
|
||||
.expect("did not match metric attributes")
|
||||
.fetch();
|
||||
assert_eq!(ttbr, Duration::from_millis($want_ttbr));
|
||||
// Assert an observation exists in the specified bucket if a
|
||||
// non-zero TTBR value was given to check.
|
||||
if $want_ttbr != 0_u64 {
|
||||
assert_histogram_bucket_count(
|
||||
&ttbr,
|
||||
Duration::from_millis($want_ttbr),
|
||||
1
|
||||
);
|
||||
}
|
||||
|
||||
// assert reset counter
|
||||
let reset = metrics
|
||||
|
@ -768,6 +776,21 @@ mod tests {
|
|||
};
|
||||
}
|
||||
|
||||
/// Assert the bucket in `hist` for `want_value` has exactly `hits` number
|
||||
/// of observations.
|
||||
fn assert_histogram_bucket_count(
|
||||
hist: &HistogramObservation<Duration>,
|
||||
want_value: Duration,
|
||||
hits: u64,
|
||||
) {
|
||||
let bucket = hist
|
||||
.buckets
|
||||
.iter()
|
||||
.find(|bucket| want_value <= bucket.le)
|
||||
.unwrap_or_else(|| panic!("no bucket found for metric value {:?}", want_value));
|
||||
assert_eq!(bucket.count, hits);
|
||||
}
|
||||
|
||||
test_stream_handler!(
|
||||
immediate_shutdown,
|
||||
skip_to_oldest_available = false,
|
||||
|
@ -795,6 +818,23 @@ mod tests {
|
|||
}
|
||||
);
|
||||
|
||||
// Single write op applies OK with a large TTBR, ensuring the TTBR
|
||||
// observation is in a high bucket, then shutdown.
|
||||
test_stream_handler!(
|
||||
write_ok_large_ttbr,
|
||||
skip_to_oldest_available = false,
|
||||
stream_ops = vec![
|
||||
vec![Ok(DmlOperation::Write(make_write("bananas", 4242424242)))]
|
||||
],
|
||||
sink_rets = [Ok(DmlApplyAction::Applied(true))],
|
||||
want_ttbr = 4242424242,
|
||||
want_reset = 0,
|
||||
want_err_metrics = [],
|
||||
want_sink = [DmlOperation::Write(op)] => {
|
||||
assert_eq!(op.namespace(), "bananas");
|
||||
}
|
||||
);
|
||||
|
||||
// Single delete op applies OK, then shutdown.
|
||||
test_stream_handler!(
|
||||
delete_ok,
|
||||
|
|
Loading…
Reference in New Issue