feat: emit tracing span for op apply
This commit uses the tracing metadata within the DmlOperation to emit a tracing span from the ingester covering the DmlSink::apply() operation.pull/24376/head
parent
f6c65f52a3
commit
091640bb23
|
@ -1,3 +1,4 @@
|
|||
//! Instrumentation for [`DmlSink`] implementations.
|
||||
use std::fmt::Debug;
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
@ -5,6 +6,7 @@ use data_types2::KafkaPartition;
|
|||
use dml::DmlOperation;
|
||||
use metric::{Attributes, U64Counter, U64Gauge, U64Histogram, U64HistogramOptions};
|
||||
use time::{SystemProvider, TimeProvider};
|
||||
use trace::span::SpanRecorder;
|
||||
|
||||
use super::DmlSink;
|
||||
|
||||
|
@ -21,7 +23,7 @@ pub trait WatermarkFetcher: Debug + Send + Sync {
|
|||
|
||||
/// A [`SinkInstrumentation`] decorates a [`DmlSink`] implementation and records
|
||||
/// write buffer metrics and the latency of the decorated [`DmlSink::apply()`]
|
||||
/// call.
|
||||
/// call, and emits a tracing span covering the call duration.
|
||||
///
|
||||
/// # Panics
|
||||
///
|
||||
|
@ -216,6 +218,12 @@ where
|
|||
);
|
||||
}
|
||||
|
||||
// Create a tracing span covering the inner DmlSink call.
|
||||
let mut span_recorder = SpanRecorder::new(
|
||||
meta.span_context()
|
||||
.map(|parent| parent.child("DmlSink::apply()")),
|
||||
);
|
||||
|
||||
// Call into the inner handler to process the op and calculate the call
|
||||
// latency.
|
||||
let started_at = self.time_provider.now();
|
||||
|
@ -225,8 +233,14 @@ where
|
|||
// latency.
|
||||
if let Some(delta) = self.time_provider.now().checked_duration_since(started_at) {
|
||||
let metric = match &res {
|
||||
Ok(_) => &self.op_apply_success_ms,
|
||||
Err(_) => &self.op_apply_error_ms,
|
||||
Ok(_) => {
|
||||
span_recorder.ok("success");
|
||||
&self.op_apply_success_ms
|
||||
}
|
||||
Err(e) => {
|
||||
span_recorder.error(e.to_string());
|
||||
&self.op_apply_error_ms
|
||||
}
|
||||
};
|
||||
metric.record(delta.as_millis() as _);
|
||||
}
|
||||
|
@ -238,12 +252,15 @@ where
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use data_types2::Sequence;
|
||||
use dml::{DmlMeta, DmlWrite};
|
||||
use metric::{Metric, MetricObserver, Observation};
|
||||
use mutable_batch_lp::lines_to_batches;
|
||||
use time::Time;
|
||||
use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};
|
||||
|
||||
use crate::stream_handler::{
|
||||
mock_sink::MockDmlSink, mock_watermark_fetcher::MockWatermarkFetcher,
|
||||
|
@ -309,15 +326,36 @@ mod tests {
|
|||
instrumentation.apply(op).await
|
||||
}
|
||||
|
||||
fn assert_trace(traces: Arc<dyn TraceCollector>, status: SpanStatus) {
|
||||
let traces = traces
|
||||
.as_any()
|
||||
.downcast_ref::<RingBufferTraceCollector>()
|
||||
.expect("unexpected collector impl");
|
||||
|
||||
let span = traces
|
||||
.spans()
|
||||
.into_iter()
|
||||
.find(|s| s.name == "DmlSink::apply()")
|
||||
.expect("tracing span not found");
|
||||
|
||||
assert_eq!(
|
||||
span.status, status,
|
||||
"span status does not match expected value"
|
||||
);
|
||||
}
|
||||
|
||||
// This test asserts the various metrics are set in the happy path.
|
||||
#[tokio::test]
|
||||
async fn test_call_inner_ok() {
|
||||
let metrics = metric::Registry::default();
|
||||
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
|
||||
let span = SpanContext::new(Arc::clone(&traces));
|
||||
|
||||
let meta = DmlMeta::sequenced(
|
||||
// Op is offset 100 for sequencer 42
|
||||
Sequence::new(SEQUENCER_ID, 100),
|
||||
*TEST_TIME,
|
||||
None,
|
||||
Some(span),
|
||||
4242,
|
||||
);
|
||||
let op = make_write(meta);
|
||||
|
@ -365,6 +403,9 @@ mod tests {
|
|||
let hits: u64 = h.buckets.iter().map(|b| b.count).sum();
|
||||
assert_eq!(hits, 1);
|
||||
});
|
||||
|
||||
// Assert the trace span was recorded
|
||||
assert_trace(traces, SpanStatus::Ok);
|
||||
}
|
||||
|
||||
// This test asserts the various metrics are set when the inner handler
|
||||
|
@ -372,11 +413,14 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_call_inner_error() {
|
||||
let metrics = metric::Registry::default();
|
||||
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
|
||||
let span = SpanContext::new(Arc::clone(&traces));
|
||||
|
||||
let meta = DmlMeta::sequenced(
|
||||
// Op is offset 100 for sequencer 42
|
||||
Sequence::new(SEQUENCER_ID, 100),
|
||||
*TEST_TIME,
|
||||
None,
|
||||
Some(span),
|
||||
4242,
|
||||
);
|
||||
let op = make_write(meta);
|
||||
|
@ -430,17 +474,23 @@ mod tests {
|
|||
let hits: u64 = h.buckets.iter().map(|b| b.count).sum();
|
||||
assert_eq!(hits, 1);
|
||||
});
|
||||
|
||||
// Assert the trace span was recorded
|
||||
assert_trace(traces, SpanStatus::Err);
|
||||
}
|
||||
|
||||
// If there's no high watermark available, the write should still succeed.
|
||||
#[tokio::test]
|
||||
async fn test_no_high_watermark() {
|
||||
let metrics = metric::Registry::default();
|
||||
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
|
||||
let span = SpanContext::new(Arc::clone(&traces));
|
||||
|
||||
let meta = DmlMeta::sequenced(
|
||||
// Op is offset 100 for sequencer 42
|
||||
Sequence::new(SEQUENCER_ID, 100),
|
||||
*TEST_TIME,
|
||||
None,
|
||||
Some(span),
|
||||
4242,
|
||||
);
|
||||
let op = make_write(meta);
|
||||
|
@ -488,6 +538,9 @@ mod tests {
|
|||
let hits: u64 = h.buckets.iter().map(|b| b.count).sum();
|
||||
assert_eq!(hits, 1);
|
||||
});
|
||||
|
||||
// Assert the trace span was recorded
|
||||
assert_trace(traces, SpanStatus::Ok);
|
||||
}
|
||||
|
||||
// If the high watermark is less than the current sequence number (for
|
||||
|
@ -495,11 +548,14 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_high_watermark_less_than_current_op() {
|
||||
let metrics = metric::Registry::default();
|
||||
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
|
||||
let span = SpanContext::new(Arc::clone(&traces));
|
||||
|
||||
let meta = DmlMeta::sequenced(
|
||||
// Op is offset 100 for sequencer 42
|
||||
Sequence::new(SEQUENCER_ID, 100),
|
||||
*TEST_TIME,
|
||||
None,
|
||||
Some(span),
|
||||
4242,
|
||||
);
|
||||
let op = make_write(meta);
|
||||
|
@ -547,6 +603,9 @@ mod tests {
|
|||
let hits: u64 = h.buckets.iter().map(|b| b.count).sum();
|
||||
assert_eq!(hits, 1);
|
||||
});
|
||||
|
||||
// Assert the trace span was recorded
|
||||
assert_trace(traces, SpanStatus::Ok);
|
||||
}
|
||||
|
||||
// The missing metadata can cause various panics, but the bytes_read is the
|
||||
|
|
Loading…
Reference in New Issue