From a54326d1ae76c8afaa8eec1e6cd71899d78e54fb Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 25 Nov 2022 15:40:52 +0100 Subject: [PATCH 1/2] refactor: rename Response -> QueryResponse Both more descriptive, and less conflict-y! This seems like a more sensible name for a system with many Response's. --- ingester2/src/query/exec.rs | 4 ++-- ingester2/src/query/instrumentation.rs | 6 +++--- ingester2/src/query/mock_query_exec.rs | 8 ++++---- ingester2/src/query/partition_response.rs | 4 ++-- ingester2/src/query/response.rs | 8 ++++---- ingester2/src/query/trait.rs | 6 +++--- ingester2/src/server/grpc/query.rs | 2 +- 7 files changed, 19 insertions(+), 19 deletions(-) diff --git a/ingester2/src/query/exec.rs b/ingester2/src/query/exec.rs index b37692d7ee..e0e3f3c5df 100644 --- a/ingester2/src/query/exec.rs +++ b/ingester2/src/query/exec.rs @@ -4,7 +4,7 @@ use observability_deps::tracing::*; use trace::span::{Span, SpanRecorder}; use super::{QueryError, QueryExec}; -use crate::query::response::Response; +use crate::query::response::QueryResponse; #[derive(Debug)] pub(crate) struct QueryRunner; @@ -23,7 +23,7 @@ impl QueryExec for QueryRunner { table_id: TableId, columns: Vec, span: Option, - ) -> Result { + ) -> Result { let mut _span_recorder = SpanRecorder::new(span); info!( diff --git a/ingester2/src/query/instrumentation.rs b/ingester2/src/query/instrumentation.rs index 56c6b3eaf7..907f6548ff 100644 --- a/ingester2/src/query/instrumentation.rs +++ b/ingester2/src/query/instrumentation.rs @@ -4,7 +4,7 @@ use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric}; use trace::span::Span; -use super::{response::Response, QueryExec}; +use super::{response::QueryResponse, QueryExec}; use crate::query::QueryError; /// An instrumentation decorator over a [`QueryExec`] implementation. @@ -55,7 +55,7 @@ where table_id: TableId, columns: Vec, span: Option, - ) -> Result { + ) -> Result { let t = self.time_provider.now(); let res = self @@ -121,7 +121,7 @@ mod tests { ok, inner = { let stream: PartitionStream = Box::pin(Box::new(futures::stream::iter([]))); - MockQueryExec::default().with_result(Ok(Response::new(stream))) + MockQueryExec::default().with_result(Ok(QueryResponse::new(stream))) }, want_metric_attr = [("result", "success")], want_ret = Ok(_) diff --git a/ingester2/src/query/mock_query_exec.rs b/ingester2/src/query/mock_query_exec.rs index 75c0625b7b..c4e9602eb2 100644 --- a/ingester2/src/query/mock_query_exec.rs +++ b/ingester2/src/query/mock_query_exec.rs @@ -3,15 +3,15 @@ use data_types::{NamespaceId, TableId}; use parking_lot::Mutex; use trace::span::Span; -use super::{response::Response, QueryError, QueryExec}; +use super::{response::QueryResponse, QueryError, QueryExec}; #[derive(Debug, Default)] pub(crate) struct MockQueryExec { - response: Mutex>>, + response: Mutex>>, } impl MockQueryExec { - pub(crate) fn with_result(self, r: Result) -> Self { + pub(crate) fn with_result(self, r: Result) -> Self { *self.response.lock() = Some(r); self } @@ -25,7 +25,7 @@ impl QueryExec for MockQueryExec { _table_id: TableId, _columns: Vec, _span: Option, - ) -> Result { + ) -> Result { self.response .lock() .take() diff --git a/ingester2/src/query/partition_response.rs b/ingester2/src/query/partition_response.rs index 01fa0b9322..fcee6b7d77 100644 --- a/ingester2/src/query/partition_response.rs +++ b/ingester2/src/query/partition_response.rs @@ -1,6 +1,6 @@ -//! The per-partition data nested in a query [`Response`]. +//! The per-partition data nested in a query [`QueryResponse`]. //! -//! [`Response`]: super::response::Response +//! [`QueryResponse`]: super::response::QueryResponse use std::pin::Pin; diff --git a/ingester2/src/query/response.rs b/ingester2/src/query/response.rs index f298139dfd..08e47e7117 100644 --- a/ingester2/src/query/response.rs +++ b/ingester2/src/query/response.rs @@ -18,12 +18,12 @@ pub(crate) type PartitionStream = /// /// The data structure is constructed to allow lazy/streaming/pull-based data /// sourcing.. -pub(crate) struct Response { +pub(crate) struct QueryResponse { /// Stream of partitions. partitions: PartitionStream, } -impl std::fmt::Debug for Response { +impl std::fmt::Debug for QueryResponse { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Response") .field("partitions", &"") @@ -31,7 +31,7 @@ impl std::fmt::Debug for Response { } } -impl Response { +impl QueryResponse { /// Make a response pub(crate) fn new(partitions: PartitionStream) -> Self { Self { partitions } @@ -42,7 +42,7 @@ impl Response { self.partitions } - /// Reduce the [`Response`] to a set of [`RecordBatch`]. + /// Reduce the [`QueryResponse`] to a set of [`RecordBatch`]. pub(crate) async fn into_record_batches(self) -> Result, ArrowError> { self.into_partition_stream() .map_ok(|partition| { diff --git a/ingester2/src/query/trait.rs b/ingester2/src/query/trait.rs index a5dd02ef34..aede8f3595 100644 --- a/ingester2/src/query/trait.rs +++ b/ingester2/src/query/trait.rs @@ -5,7 +5,7 @@ use data_types::{NamespaceId, TableId}; use thiserror::Error; use trace::span::Span; -use super::response::Response; +use super::response::QueryResponse; #[derive(Debug, Error)] #[allow(missing_copy_implementations)] @@ -25,7 +25,7 @@ pub(crate) trait QueryExec: Send + Sync + Debug { table_id: TableId, columns: Vec, span: Option, - ) -> Result; + ) -> Result; } #[async_trait] @@ -39,7 +39,7 @@ where table_id: TableId, columns: Vec, span: Option, - ) -> Result { + ) -> Result { self.deref() .query_exec(namespace_id, table_id, columns, span) .await diff --git a/ingester2/src/server/grpc/query.rs b/ingester2/src/server/grpc/query.rs index f80b7c115e..7ed2d0fc5d 100644 --- a/ingester2/src/server/grpc/query.rs +++ b/ingester2/src/server/grpc/query.rs @@ -20,7 +20,7 @@ use tokio::sync::{Semaphore, TryAcquireError}; use tonic::{Request, Response, Streaming}; use trace::{ctx::SpanContext, span::SpanExt}; -use crate::query::{response::Response as QueryResponse, QueryError, QueryExec}; +use crate::query::{response::QueryResponse, QueryError, QueryExec}; /// Error states for the query RPC handler. /// From 443ec49f24fecd379c2e18f4ff943ce3291ae42e Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 25 Nov 2022 16:21:17 +0100 Subject: [PATCH 2/2] feat: query exec tracing spans Implement a QueryExec decorator that emits named tracing spans covering the inner delegate's query_exec() execution. Captures the result, emitting the error string in the span on failure. --- ingester2/src/query/instrumentation.rs | 1 + ingester2/src/query/mod.rs | 1 + ingester2/src/query/tracing.rs | 143 +++++++++++++++++++++++++ 3 files changed, 145 insertions(+) create mode 100644 ingester2/src/query/tracing.rs diff --git a/ingester2/src/query/instrumentation.rs b/ingester2/src/query/instrumentation.rs index 907f6548ff..bb7eea293e 100644 --- a/ingester2/src/query/instrumentation.rs +++ b/ingester2/src/query/instrumentation.rs @@ -49,6 +49,7 @@ where T: QueryExec, P: TimeProvider, { + #[inline(always)] async fn query_exec( &self, namespace_id: NamespaceId, diff --git a/ingester2/src/query/mod.rs b/ingester2/src/query/mod.rs index ec414418c5..c5d8d2a9d5 100644 --- a/ingester2/src/query/mod.rs +++ b/ingester2/src/query/mod.rs @@ -9,6 +9,7 @@ pub(crate) mod response; pub(crate) mod exec; pub(crate) mod instrumentation; +pub(crate) mod tracing; #[cfg(test)] pub(crate) mod mock_query_exec; diff --git a/ingester2/src/query/tracing.rs b/ingester2/src/query/tracing.rs new file mode 100644 index 0000000000..7a323a2c0c --- /dev/null +++ b/ingester2/src/query/tracing.rs @@ -0,0 +1,143 @@ +use std::borrow::Cow; + +use async_trait::async_trait; +use data_types::{NamespaceId, TableId}; +use trace::span::{Span, SpanRecorder}; + +use super::{response::QueryResponse, QueryExec}; +use crate::query::QueryError; + +/// An tracing decorator over a [`QueryExec`] implementation. +/// +/// This wrapper emits child tracing spans covering the execution of the inner +/// [`QueryExec::query_exec()`] call. +/// +/// Constructing this decorator is cheap. +#[derive(Debug)] +pub(crate) struct QueryExecTracing { + inner: T, + name: Cow<'static, str>, +} + +impl QueryExecTracing { + pub(crate) fn new(inner: T, name: impl Into>) -> Self { + Self { + inner, + name: name.into(), + } + } +} + +#[async_trait] +impl QueryExec for QueryExecTracing +where + T: QueryExec, +{ + #[inline(always)] + async fn query_exec( + &self, + namespace_id: NamespaceId, + table_id: TableId, + columns: Vec, + span: Option, + ) -> Result { + let span = span.map(|s| s.child(self.name.clone())); + let mut recorder = SpanRecorder::new(span.clone()); + + match self + .inner + .query_exec(namespace_id, table_id, columns, span) + .await + { + Ok(v) => { + recorder.ok("query_exec complete"); + Ok(v) + } + Err(e) => { + recorder.error(e.to_string()); + Err(e) + } + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector}; + + use crate::query::{mock_query_exec::MockQueryExec, response::PartitionStream}; + + use super::*; + + #[track_caller] + fn assert_trace(name: impl Into, status: SpanStatus, traces: &dyn TraceCollector) { + let traces = traces + .as_any() + .downcast_ref::() + .expect("unexpected collector impl"); + + let name = name.into(); + let span = traces + .spans() + .into_iter() + .find(|s| s.name == name) + .unwrap_or_else(|| panic!("tracing span {name} not found")); + + assert_eq!( + span.status, status, + "span status does not match expected value" + ); + } + + #[tokio::test] + async fn test_ok() { + let stream: PartitionStream = Box::pin(Box::new(futures::stream::iter([]))); + let mock = MockQueryExec::default().with_result(Ok(QueryResponse::new(stream))); + + let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); + let span = SpanContext::new(Arc::clone(&traces)); + + // Drive the trace wrapper + let _ = QueryExecTracing::new(mock, "bananas") + .query_exec( + NamespaceId::new(42), + TableId::new(24), + vec![], + Some(span.child("root span")), + ) + .await + .expect("wrapper should not modify result"); + + // Assert the trace showed up. + assert_trace("bananas", SpanStatus::Ok, &*traces); + } + + #[tokio::test] + async fn test_err() { + let mock = MockQueryExec::default() + .with_result(Err(QueryError::NamespaceNotFound(NamespaceId::new(42)))); + + let traces: Arc = Arc::new(RingBufferTraceCollector::new(5)); + let span = SpanContext::new(Arc::clone(&traces)); + + // Drive the trace wrapper + let got = QueryExecTracing::new(mock, "bananas") + .query_exec( + NamespaceId::new(42), + TableId::new(24), + vec![], + Some(span.child("root span")), + ) + .await + .expect_err("wrapper should not modify result"); + assert_matches!(got, QueryError::NamespaceNotFound(ns) => { + assert_eq!(ns, NamespaceId::new(42)); + }); + + // Assert the trace showed up. + assert_trace("bananas", SpanStatus::Err, &*traces); + } +}