fix: Capture query execution traces for storage gRPC queries as well (#2553)
* fix: Capture query execution traces for storage gRPC queries as well * refactor: remove debugging droppings * refactor: do not Box::pin within TracedStream * refactor: Use Futures::TryStreamExt rather than custom collect function * fix: remove wild println Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
							parent
							
								
									ad4b8532bf
								
							
						
					
					
						commit
						ce224bd37f
					
				| 
						 | 
				
			
			@ -11,18 +11,19 @@ use datafusion::{
 | 
			
		|||
    logical_plan::{LogicalPlan, UserDefinedLogicalNode},
 | 
			
		||||
    physical_plan::{
 | 
			
		||||
        coalesce_partitions::CoalescePartitionsExec,
 | 
			
		||||
        collect, displayable,
 | 
			
		||||
        displayable,
 | 
			
		||||
        planner::{DefaultPhysicalPlanner, ExtensionPlanner},
 | 
			
		||||
        ExecutionPlan, PhysicalPlanner, SendableRecordBatchStream,
 | 
			
		||||
    },
 | 
			
		||||
    prelude::*,
 | 
			
		||||
};
 | 
			
		||||
use futures::TryStreamExt;
 | 
			
		||||
use observability_deps::tracing::{debug, trace};
 | 
			
		||||
use trace::{ctx::SpanContext, span::SpanRecorder};
 | 
			
		||||
 | 
			
		||||
use crate::exec::{
 | 
			
		||||
    fieldlist::{FieldList, IntoFieldList},
 | 
			
		||||
    query_tracing::send_metrics_to_tracing,
 | 
			
		||||
    query_tracing::TracedStream,
 | 
			
		||||
    schema_pivot::{SchemaPivotExec, SchemaPivotNode},
 | 
			
		||||
    seriesset::{SeriesSetConverter, SeriesSetItem},
 | 
			
		||||
    split::StreamSplitExec,
 | 
			
		||||
| 
						 | 
				
			
			@ -272,45 +273,63 @@ impl IOxExecutionContext {
 | 
			
		|||
    /// Executes the logical plan using DataFusion on a separate
 | 
			
		||||
    /// thread pool and produces RecordBatches
 | 
			
		||||
    pub async fn collect(&self, physical_plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
 | 
			
		||||
        let ctx = self.child_ctx("collect");
 | 
			
		||||
        debug!(
 | 
			
		||||
            "Running plan, physical:\n{}",
 | 
			
		||||
            displayable(physical_plan.as_ref()).indent()
 | 
			
		||||
        );
 | 
			
		||||
        let ctx = self.child_ctx("collect");
 | 
			
		||||
        let stream = ctx.execute_stream(physical_plan).await?;
 | 
			
		||||
 | 
			
		||||
        let res = ctx.run(collect(Arc::clone(&physical_plan))).await;
 | 
			
		||||
 | 
			
		||||
        // send metrics to tracing, even on error
 | 
			
		||||
        ctx.save_metrics(physical_plan);
 | 
			
		||||
        res
 | 
			
		||||
        ctx.run(
 | 
			
		||||
            stream
 | 
			
		||||
                .err_into() // convert to DataFusionError
 | 
			
		||||
                .try_collect(),
 | 
			
		||||
        )
 | 
			
		||||
        .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Executes the physical plan and produces a RecordBatchStream to stream
 | 
			
		||||
    /// over the result that iterates over the results.
 | 
			
		||||
    pub async fn execute(
 | 
			
		||||
    /// Executes the physical plan and produces a
 | 
			
		||||
    /// `SendableRecordBatchStream` to stream over the result that
 | 
			
		||||
    /// iterates over the results. The creation of the stream is
 | 
			
		||||
    /// performed in a separate thread pool.
 | 
			
		||||
    pub async fn execute_stream(
 | 
			
		||||
        &self,
 | 
			
		||||
        physical_plan: Arc<dyn ExecutionPlan>,
 | 
			
		||||
    ) -> Result<SendableRecordBatchStream> {
 | 
			
		||||
        match physical_plan.output_partitioning().partition_count() {
 | 
			
		||||
            0 => unreachable!(),
 | 
			
		||||
            1 => self.execute_partition(physical_plan, 0).await,
 | 
			
		||||
            1 => self.execute_stream_partitioned(physical_plan, 0).await,
 | 
			
		||||
            _ => {
 | 
			
		||||
                // Merge into a single partition
 | 
			
		||||
                self.execute_partition(Arc::new(CoalescePartitionsExec::new(physical_plan)), 0)
 | 
			
		||||
                    .await
 | 
			
		||||
                self.execute_stream_partitioned(
 | 
			
		||||
                    Arc::new(CoalescePartitionsExec::new(physical_plan)),
 | 
			
		||||
                    0,
 | 
			
		||||
                )
 | 
			
		||||
                .await
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Executes a single partition of a physical plan and produces a RecordBatchStream to stream
 | 
			
		||||
    /// over the result that iterates over the results.
 | 
			
		||||
    pub async fn execute_partition(
 | 
			
		||||
    /// Executes a single partition of a physical plan and produces a
 | 
			
		||||
    /// `SendableRecordBatchStream` to stream over the result that
 | 
			
		||||
    /// iterates over the results. The creation of the stream is
 | 
			
		||||
    /// performed in a separate thread pool.
 | 
			
		||||
    pub async fn execute_stream_partitioned(
 | 
			
		||||
        &self,
 | 
			
		||||
        physical_plan: Arc<dyn ExecutionPlan>,
 | 
			
		||||
        partition: usize,
 | 
			
		||||
    ) -> Result<SendableRecordBatchStream> {
 | 
			
		||||
        self.run(async move { physical_plan.execute(partition).await })
 | 
			
		||||
            .await
 | 
			
		||||
        let span = self
 | 
			
		||||
            .recorder
 | 
			
		||||
            .span()
 | 
			
		||||
            .map(|span| span.child("execute_stream_partitioned"));
 | 
			
		||||
 | 
			
		||||
        self.run(async move {
 | 
			
		||||
            let stream = physical_plan.execute(partition).await?;
 | 
			
		||||
            let stream = TracedStream::new(stream, span, physical_plan);
 | 
			
		||||
            Ok(Box::pin(stream) as _)
 | 
			
		||||
        })
 | 
			
		||||
        .await
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Executes the SeriesSetPlans on the query executor, in
 | 
			
		||||
| 
						 | 
				
			
			@ -349,7 +368,7 @@ impl IOxExecutionContext {
 | 
			
		|||
 | 
			
		||||
                    let physical_plan = ctx.prepare_plan(&plan)?;
 | 
			
		||||
 | 
			
		||||
                    let it = ctx.execute(physical_plan).await?;
 | 
			
		||||
                    let it = ctx.execute_stream(physical_plan).await?;
 | 
			
		||||
 | 
			
		||||
                    SeriesSetConverter::default()
 | 
			
		||||
                        .convert(
 | 
			
		||||
| 
						 | 
				
			
			@ -486,19 +505,4 @@ impl IOxExecutionContext {
 | 
			
		|||
            recorder: self.recorder.child(name),
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /// Saves any DataFusion metrics that are currently present in
 | 
			
		||||
    /// `physical_plan` to the span recorder so they show up in
 | 
			
		||||
    /// distributed traces (e.g. Jaeger)
 | 
			
		||||
    ///
 | 
			
		||||
    /// This function should be invoked after `physical_plan` has
 | 
			
		||||
    /// fully `collect`ed, meaning that `PhysicalPlan::execute()` has
 | 
			
		||||
    /// been invoked and the resulting streams have been completely
 | 
			
		||||
    /// consumed. Calling `save_metrics` metrics prior to this point
 | 
			
		||||
    /// may result in saving incomplete information.
 | 
			
		||||
    pub fn save_metrics(&self, physical_plan: Arc<dyn ExecutionPlan>) {
 | 
			
		||||
        if let Some(span) = self.recorder.span() {
 | 
			
		||||
            send_metrics_to_tracing(span, physical_plan.as_ref())
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,15 +1,67 @@
 | 
			
		|||
//! This module contains the code to map DataFusion metrics to `Span`s
 | 
			
		||||
//! for use in distributed tracing (e.g. Jaeger)
 | 
			
		||||
 | 
			
		||||
use std::{borrow::Cow, fmt};
 | 
			
		||||
use std::{borrow::Cow, fmt, sync::Arc};
 | 
			
		||||
 | 
			
		||||
use arrow::record_batch::RecordBatch;
 | 
			
		||||
use chrono::{DateTime, Utc};
 | 
			
		||||
use datafusion::physical_plan::{
 | 
			
		||||
    metrics::{MetricValue, MetricsSet},
 | 
			
		||||
    DisplayFormatType, ExecutionPlan,
 | 
			
		||||
    DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
 | 
			
		||||
};
 | 
			
		||||
use futures::StreamExt;
 | 
			
		||||
use observability_deps::tracing::debug;
 | 
			
		||||
use trace::span::Span;
 | 
			
		||||
use trace::span::{Span, SpanRecorder};
 | 
			
		||||
 | 
			
		||||
/// Stream wrapper that records DataFusion `MetricSets` into IOx
 | 
			
		||||
/// [`Span`]s when it is dropped.
 | 
			
		||||
pub(crate) struct TracedStream {
 | 
			
		||||
    inner: SendableRecordBatchStream,
 | 
			
		||||
    span_recorder: SpanRecorder,
 | 
			
		||||
    physical_plan: Arc<dyn ExecutionPlan>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl TracedStream {
 | 
			
		||||
    /// Return a stream that records DataFusion `MetricSets` from
 | 
			
		||||
    /// `physical_plan` into `span` when dropped.
 | 
			
		||||
    pub(crate) fn new(
 | 
			
		||||
        inner: SendableRecordBatchStream,
 | 
			
		||||
        span: Option<trace::span::Span>,
 | 
			
		||||
        physical_plan: Arc<dyn ExecutionPlan>,
 | 
			
		||||
    ) -> Self {
 | 
			
		||||
        Self {
 | 
			
		||||
            inner,
 | 
			
		||||
            span_recorder: SpanRecorder::new(span),
 | 
			
		||||
            physical_plan,
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl RecordBatchStream for TracedStream {
 | 
			
		||||
    fn schema(&self) -> arrow::datatypes::SchemaRef {
 | 
			
		||||
        self.inner.schema()
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl futures::Stream for TracedStream {
 | 
			
		||||
    type Item = arrow::error::Result<RecordBatch>;
 | 
			
		||||
 | 
			
		||||
    fn poll_next(
 | 
			
		||||
        mut self: std::pin::Pin<&mut Self>,
 | 
			
		||||
        cx: &mut std::task::Context<'_>,
 | 
			
		||||
    ) -> std::task::Poll<Option<Self::Item>> {
 | 
			
		||||
        self.inner.poll_next_unpin(cx)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl Drop for TracedStream {
 | 
			
		||||
    fn drop(&mut self) {
 | 
			
		||||
        if let Some(span) = self.span_recorder.span() {
 | 
			
		||||
            let default_end_time = Utc::now();
 | 
			
		||||
            send_metrics_to_tracing(default_end_time, span, self.physical_plan.as_ref());
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// This function translates data in DataFusion `MetricSets` into IOx
 | 
			
		||||
/// [`Span`]s. It records a snapshot of the current state of the
 | 
			
		||||
| 
						 | 
				
			
			@ -26,15 +78,7 @@ use trace::span::Span;
 | 
			
		|||
/// 1. If the ExecutionPlan had no metrics
 | 
			
		||||
/// 2. The total number of rows produced by the ExecutionPlan (if available)
 | 
			
		||||
/// 3. The elapsed compute time taken by the ExecutionPlan
 | 
			
		||||
pub(crate) fn send_metrics_to_tracing(parent_span: &Span, physical_plan: &dyn ExecutionPlan) {
 | 
			
		||||
    // The parent span may be open, but since the physical_plan is
 | 
			
		||||
    // assumed to be fully collected, using `now()` is a conservative
 | 
			
		||||
    // estimate of the end time
 | 
			
		||||
    let default_end_time = Utc::now();
 | 
			
		||||
    send_metrics_to_tracing_inner(default_end_time, parent_span, physical_plan)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
fn send_metrics_to_tracing_inner(
 | 
			
		||||
fn send_metrics_to_tracing(
 | 
			
		||||
    default_end_time: DateTime<Utc>,
 | 
			
		||||
    parent_span: &Span,
 | 
			
		||||
    physical_plan: &dyn ExecutionPlan,
 | 
			
		||||
| 
						 | 
				
			
			@ -101,7 +145,7 @@ fn send_metrics_to_tracing_inner(
 | 
			
		|||
 | 
			
		||||
    // recurse
 | 
			
		||||
    for child in physical_plan.children() {
 | 
			
		||||
        send_metrics_to_tracing_inner(span_end, &span, child.as_ref())
 | 
			
		||||
        send_metrics_to_tracing(span_end, &span, child.as_ref())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    span.export()
 | 
			
		||||
| 
						 | 
				
			
			@ -185,7 +229,7 @@ mod tests {
 | 
			
		|||
        let exec = TestExec::new(name, Default::default());
 | 
			
		||||
 | 
			
		||||
        let traces = TraceBuilder::new();
 | 
			
		||||
        send_metrics_to_tracing(&traces.make_span(), &exec);
 | 
			
		||||
        send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
 | 
			
		||||
 | 
			
		||||
        let spans = traces.spans();
 | 
			
		||||
        assert_eq!(spans.len(), 1);
 | 
			
		||||
| 
						 | 
				
			
			@ -216,7 +260,7 @@ mod tests {
 | 
			
		|||
        exec.new_child("child4", make_time_metricset(None, None));
 | 
			
		||||
 | 
			
		||||
        let traces = TraceBuilder::new();
 | 
			
		||||
        send_metrics_to_tracing_inner(ts5, &traces.make_span(), &exec);
 | 
			
		||||
        send_metrics_to_tracing(ts5, &traces.make_span(), &exec);
 | 
			
		||||
 | 
			
		||||
        let spans = traces.spans();
 | 
			
		||||
        println!("Spans: \n\n{:#?}", spans);
 | 
			
		||||
| 
						 | 
				
			
			@ -242,7 +286,7 @@ mod tests {
 | 
			
		|||
        exec.metrics = None;
 | 
			
		||||
 | 
			
		||||
        let traces = TraceBuilder::new();
 | 
			
		||||
        send_metrics_to_tracing(&traces.make_span(), &exec);
 | 
			
		||||
        send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
 | 
			
		||||
 | 
			
		||||
        let spans = traces.spans();
 | 
			
		||||
        assert_eq!(spans.len(), 1);
 | 
			
		||||
| 
						 | 
				
			
			@ -266,7 +310,7 @@ mod tests {
 | 
			
		|||
        add_elapsed_compute(exec.metrics_mut(), 2000, 2);
 | 
			
		||||
 | 
			
		||||
        let traces = TraceBuilder::new();
 | 
			
		||||
        send_metrics_to_tracing(&traces.make_span(), &exec);
 | 
			
		||||
        send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
 | 
			
		||||
 | 
			
		||||
        // aggregated metrics should be reported
 | 
			
		||||
        let spans = traces.spans();
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -103,7 +103,7 @@ pub(crate) fn compact_chunks(
 | 
			
		|||
            ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
 | 
			
		||||
 | 
			
		||||
        let physical_plan = ctx.prepare_plan(&plan)?;
 | 
			
		||||
        let stream = ctx.execute(physical_plan).await?;
 | 
			
		||||
        let stream = ctx.execute_stream(physical_plan).await?;
 | 
			
		||||
        let rb_chunk = collect_rub(stream, &addr, metric_registry.as_ref())
 | 
			
		||||
            .await?
 | 
			
		||||
            .expect("chunk has zero rows");
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -54,7 +54,7 @@ pub fn move_chunk_to_read_buffer(
 | 
			
		|||
            ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
 | 
			
		||||
 | 
			
		||||
        let physical_plan = ctx.prepare_plan(&plan)?;
 | 
			
		||||
        let stream = ctx.execute(physical_plan).await?;
 | 
			
		||||
        let stream = ctx.execute_stream(physical_plan).await?;
 | 
			
		||||
        let rb_chunk = collect_rub(
 | 
			
		||||
            stream,
 | 
			
		||||
            &addr.clone().into_partition(),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -112,8 +112,10 @@ pub fn persist_chunks(
 | 
			
		|||
            "Expected split plan to produce exactly 2 partitions"
 | 
			
		||||
        );
 | 
			
		||||
 | 
			
		||||
        let to_persist_stream = ctx.execute_partition(Arc::clone(&physical_plan), 0).await?;
 | 
			
		||||
        let remainder_stream = ctx.execute_partition(physical_plan, 1).await?;
 | 
			
		||||
        let to_persist_stream = ctx
 | 
			
		||||
            .execute_stream_partitioned(Arc::clone(&physical_plan), 0)
 | 
			
		||||
            .await?;
 | 
			
		||||
        let remainder_stream = ctx.execute_stream_partitioned(physical_plan, 1).await?;
 | 
			
		||||
 | 
			
		||||
        let (to_persist, remainder) = futures::future::try_join(
 | 
			
		||||
            collect_rub(to_persist_stream, &addr, metric_registry.as_ref()),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -753,9 +753,11 @@ mod tests {
 | 
			
		|||
        child(prepare_sql_span, "prepare_plan").unwrap();
 | 
			
		||||
 | 
			
		||||
        let collect_span = child(ctx_span, "collect").unwrap();
 | 
			
		||||
        let execute_span = child(collect_span, "execute_stream_partitioned").unwrap();
 | 
			
		||||
        let coalesce_span = child(execute_span, "CoalescePartitionsEx").unwrap();
 | 
			
		||||
 | 
			
		||||
        // validate spans from DataFusion ExecutionPlan are present
 | 
			
		||||
        child(collect_span, "ProjectionExec: expr").unwrap();
 | 
			
		||||
        child(coalesce_span, "ProjectionExec: expr").unwrap();
 | 
			
		||||
 | 
			
		||||
        let database_not_found = root_spans[3];
 | 
			
		||||
        assert_eq!(database_not_found.status, SpanStatus::Err);
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue