Merge pull request #6237 from influxdata/dom/query-tracing-decorator

feat(ingester2): query tracing decorator
pull/24376/head
kodiakhq[bot] 2022-11-28 15:26:50 +00:00 committed by GitHub
commit 77bc0a1042
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 164 additions and 19 deletions

View File

@ -4,7 +4,7 @@ use observability_deps::tracing::*;
use trace::span::{Span, SpanRecorder};
use super::{QueryError, QueryExec};
use crate::query::response::Response;
use crate::query::response::QueryResponse;
#[derive(Debug)]
pub(crate) struct QueryRunner;
@ -23,7 +23,7 @@ impl QueryExec for QueryRunner {
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<Response, QueryError> {
) -> Result<QueryResponse, QueryError> {
let mut _span_recorder = SpanRecorder::new(span);
info!(

View File

@ -4,7 +4,7 @@ use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric};
use trace::span::Span;
use super::{response::Response, QueryExec};
use super::{response::QueryResponse, QueryExec};
use crate::query::QueryError;
/// An instrumentation decorator over a [`QueryExec`] implementation.
@ -49,13 +49,14 @@ where
T: QueryExec,
P: TimeProvider,
{
#[inline(always)]
async fn query_exec(
&self,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<Response, QueryError> {
) -> Result<QueryResponse, QueryError> {
let t = self.time_provider.now();
let res = self
@ -121,7 +122,7 @@ mod tests {
ok,
inner = {
let stream: PartitionStream = Box::pin(Box::new(futures::stream::iter([])));
MockQueryExec::default().with_result(Ok(Response::new(stream)))
MockQueryExec::default().with_result(Ok(QueryResponse::new(stream)))
},
want_metric_attr = [("result", "success")],
want_ret = Ok(_)

View File

@ -3,15 +3,15 @@ use data_types::{NamespaceId, TableId};
use parking_lot::Mutex;
use trace::span::Span;
use super::{response::Response, QueryError, QueryExec};
use super::{response::QueryResponse, QueryError, QueryExec};
#[derive(Debug, Default)]
pub(crate) struct MockQueryExec {
response: Mutex<Option<Result<Response, QueryError>>>,
response: Mutex<Option<Result<QueryResponse, QueryError>>>,
}
impl MockQueryExec {
pub(crate) fn with_result(self, r: Result<Response, QueryError>) -> Self {
pub(crate) fn with_result(self, r: Result<QueryResponse, QueryError>) -> Self {
*self.response.lock() = Some(r);
self
}
@ -25,7 +25,7 @@ impl QueryExec for MockQueryExec {
_table_id: TableId,
_columns: Vec<String>,
_span: Option<Span>,
) -> Result<Response, QueryError> {
) -> Result<QueryResponse, QueryError> {
self.response
.lock()
.take()

View File

@ -9,6 +9,7 @@ pub(crate) mod response;
pub(crate) mod exec;
pub(crate) mod instrumentation;
pub(crate) mod tracing;
#[cfg(test)]
pub(crate) mod mock_query_exec;

View File

@ -1,6 +1,6 @@
//! The per-partition data nested in a query [`Response`].
//! The per-partition data nested in a query [`QueryResponse`].
//!
//! [`Response`]: super::response::Response
//! [`QueryResponse`]: super::response::QueryResponse
use std::pin::Pin;

View File

@ -18,12 +18,12 @@ pub(crate) type PartitionStream =
///
/// The data structure is constructed to allow lazy/streaming/pull-based data
/// sourcing..
pub(crate) struct Response {
pub(crate) struct QueryResponse {
/// Stream of partitions.
partitions: PartitionStream,
}
impl std::fmt::Debug for Response {
impl std::fmt::Debug for QueryResponse {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Response")
.field("partitions", &"<PARTITION STREAM>")
@ -31,7 +31,7 @@ impl std::fmt::Debug for Response {
}
}
impl Response {
impl QueryResponse {
/// Make a response
pub(crate) fn new(partitions: PartitionStream) -> Self {
Self { partitions }
@ -42,7 +42,7 @@ impl Response {
self.partitions
}
/// Reduce the [`Response`] to a set of [`RecordBatch`].
/// Reduce the [`QueryResponse`] to a set of [`RecordBatch`].
pub(crate) async fn into_record_batches(self) -> Result<Vec<RecordBatch>, ArrowError> {
self.into_partition_stream()
.map_ok(|partition| {

View File

@ -0,0 +1,143 @@
use std::borrow::Cow;
use async_trait::async_trait;
use data_types::{NamespaceId, TableId};
use trace::span::{Span, SpanRecorder};
use super::{response::QueryResponse, QueryExec};
use crate::query::QueryError;
/// An tracing decorator over a [`QueryExec`] implementation.
///
/// This wrapper emits child tracing spans covering the execution of the inner
/// [`QueryExec::query_exec()`] call.
///
/// Constructing this decorator is cheap.
#[derive(Debug)]
pub(crate) struct QueryExecTracing<T> {
inner: T,
name: Cow<'static, str>,
}
impl<T> QueryExecTracing<T> {
pub(crate) fn new(inner: T, name: impl Into<Cow<'static, str>>) -> Self {
Self {
inner,
name: name.into(),
}
}
}
#[async_trait]
impl<T> QueryExec for QueryExecTracing<T>
where
T: QueryExec,
{
#[inline(always)]
async fn query_exec(
&self,
namespace_id: NamespaceId,
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<QueryResponse, QueryError> {
let span = span.map(|s| s.child(self.name.clone()));
let mut recorder = SpanRecorder::new(span.clone());
match self
.inner
.query_exec(namespace_id, table_id, columns, span)
.await
{
Ok(v) => {
recorder.ok("query_exec complete");
Ok(v)
}
Err(e) => {
recorder.error(e.to_string());
Err(e)
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use assert_matches::assert_matches;
use trace::{ctx::SpanContext, span::SpanStatus, RingBufferTraceCollector, TraceCollector};
use crate::query::{mock_query_exec::MockQueryExec, response::PartitionStream};
use super::*;
#[track_caller]
fn assert_trace(name: impl Into<String>, status: SpanStatus, traces: &dyn TraceCollector) {
let traces = traces
.as_any()
.downcast_ref::<RingBufferTraceCollector>()
.expect("unexpected collector impl");
let name = name.into();
let span = traces
.spans()
.into_iter()
.find(|s| s.name == name)
.unwrap_or_else(|| panic!("tracing span {name} not found"));
assert_eq!(
span.status, status,
"span status does not match expected value"
);
}
#[tokio::test]
async fn test_ok() {
let stream: PartitionStream = Box::pin(Box::new(futures::stream::iter([])));
let mock = MockQueryExec::default().with_result(Ok(QueryResponse::new(stream)));
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
// Drive the trace wrapper
let _ = QueryExecTracing::new(mock, "bananas")
.query_exec(
NamespaceId::new(42),
TableId::new(24),
vec![],
Some(span.child("root span")),
)
.await
.expect("wrapper should not modify result");
// Assert the trace showed up.
assert_trace("bananas", SpanStatus::Ok, &*traces);
}
#[tokio::test]
async fn test_err() {
let mock = MockQueryExec::default()
.with_result(Err(QueryError::NamespaceNotFound(NamespaceId::new(42))));
let traces: Arc<dyn TraceCollector> = Arc::new(RingBufferTraceCollector::new(5));
let span = SpanContext::new(Arc::clone(&traces));
// Drive the trace wrapper
let got = QueryExecTracing::new(mock, "bananas")
.query_exec(
NamespaceId::new(42),
TableId::new(24),
vec![],
Some(span.child("root span")),
)
.await
.expect_err("wrapper should not modify result");
assert_matches!(got, QueryError::NamespaceNotFound(ns) => {
assert_eq!(ns, NamespaceId::new(42));
});
// Assert the trace showed up.
assert_trace("bananas", SpanStatus::Err, &*traces);
}
}

View File

@ -5,7 +5,7 @@ use data_types::{NamespaceId, TableId};
use thiserror::Error;
use trace::span::Span;
use super::response::Response;
use super::response::QueryResponse;
#[derive(Debug, Error)]
#[allow(missing_copy_implementations)]
@ -25,7 +25,7 @@ pub(crate) trait QueryExec: Send + Sync + Debug {
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<Response, QueryError>;
) -> Result<QueryResponse, QueryError>;
}
#[async_trait]
@ -39,7 +39,7 @@ where
table_id: TableId,
columns: Vec<String>,
span: Option<Span>,
) -> Result<Response, QueryError> {
) -> Result<QueryResponse, QueryError> {
self.deref()
.query_exec(namespace_id, table_id, columns, span)
.await

View File

@ -20,7 +20,7 @@ use tokio::sync::{Semaphore, TryAcquireError};
use tonic::{Request, Response, Streaming};
use trace::{ctx::SpanContext, span::SpanExt};
use crate::query::{response::Response as QueryResponse, QueryError, QueryExec};
use crate::query::{response::QueryResponse, QueryError, QueryExec};
/// Error states for the query RPC handler.
///