diff --git a/ingester2/src/query/instrumentation.rs b/ingester2/src/query/instrumentation.rs index 907f6548ff..bb7eea293e 100644 --- a/ingester2/src/query/instrumentation.rs +++ b/ingester2/src/query/instrumentation.rs @@ -49,6 +49,7 @@ where T: QueryExec, P: TimeProvider, { + #[inline(always)] async fn query_exec( &self, namespace_id: NamespaceId, diff --git a/ingester2/src/query/mod.rs b/ingester2/src/query/mod.rs index ec414418c5..c5d8d2a9d5 100644 --- a/ingester2/src/query/mod.rs +++ b/ingester2/src/query/mod.rs @@ -9,6 +9,7 @@ pub(crate) mod response; pub(crate) mod exec; pub(crate) mod instrumentation; +pub(crate) mod tracing; #[cfg(test)] pub(crate) mod mock_query_exec; diff --git a/ingester2/src/query/tracing.rs b/ingester2/src/query/tracing.rs new file mode 100644 index 0000000000..7a323a2c0c --- /dev/null +++ b/ingester2/src/query/tracing.rs @@ -0,0 +1,143 @@ +use std::borrow::Cow; + +use async_trait::async_trait; +use data_types::{NamespaceId, TableId}; +use trace::span::{Span, SpanRecorder}; + +use super::{response::QueryResponse, QueryExec}; +use crate::query::QueryError; + +/// An tracing decorator over a [`QueryExec`] implementation. +/// +/// This wrapper emits child tracing spans covering the execution of the inner +/// [`QueryExec::query_exec()`] call. +/// +/// Constructing this decorator is cheap. +#[derive(Debug)] +pub(crate) struct QueryExecTracing { + inner: T, + name: Cow<'static, str>, +} + +impl QueryExecTracing { + pub(crate) fn new(inner: T, name: impl Into>) -> Self { + Self { + inner, + name: name.into(), + } + } +} + +#[async_trait] +impl QueryExec for QueryExecTracing +where + T: QueryExec, +{ + #[inline(always)] + async fn query_exec( + &self, + namespace_id: NamespaceId, + table_id: TableId, + columns: Vec, + span: Option, + ) -> Result { + let span = span.map(|s| s.child(self.name.clone())); + let mut recorder = SpanRecorder::new(span.clone()); + + match self + .inner + .query_exec(namespace_id, table_id, columns, span) + .await + { + Ok(v) => { + recorder.ok("query_exec complete"); + Ok(v) + } + Err(e) => { + recorder.error(e.to_string()); + Err(e) + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector}; + + use crate::query::{mock_query_exec::MockQueryExec, response::PartitionStream}; + + use super::*; + + #[track_caller] + fn assert_trace(name: impl Into, status: SpanStatus, traces: &dyn TraceCollector) { + let traces = traces + .as_any() + .downcast_ref::() + .expect("unexpected collector impl"); + + let name = name.into(); + let span = traces + .spans() + .into_iter() + .find(|s| s.name == name) + .unwrap_or_else(|| panic!("tracing span {name} not found")); + + assert_eq!( + span.status, status, + "span status does not match expected value" + ); + } + + #[tokio::test] + async fn test_ok() { + let stream: PartitionStream = Box::pin(Box::new(futures::stream::iter([]))); + let mock = MockQueryExec::default().with_result(Ok(QueryResponse::new(stream))); + + let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); + let span = SpanContext::new(Arc::clone(&traces)); + + // Drive the trace wrapper + let _ = QueryExecTracing::new(mock, "bananas") + .query_exec( + NamespaceId::new(42), + TableId::new(24), + vec![], + Some(span.child("root span")), + ) + .await + .expect("wrapper should not modify result"); + + // Assert the trace showed up. + assert_trace("bananas", SpanStatus::Ok, &*traces); + } + + #[tokio::test] + async fn test_err() { + let mock = MockQueryExec::default() + .with_result(Err(QueryError::NamespaceNotFound(NamespaceId::new(42)))); + + let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); + let span = SpanContext::new(Arc::clone(&traces)); + + // Drive the trace wrapper + let got = QueryExecTracing::new(mock, "bananas") + .query_exec( + NamespaceId::new(42), + TableId::new(24), + vec![], + Some(span.child("root span")), + ) + .await + .expect_err("wrapper should not modify result"); + assert_matches!(got, QueryError::NamespaceNotFound(ns) => { + assert_eq!(ns, NamespaceId::new(42)); + }); + + // Assert the trace showed up. + assert_trace("bananas", SpanStatus::Err, &*traces); + } +}