chore: add DF metrics to compaction spans (#8270)

* chore: add DF metrics to compaction spans

* chore: update string for test verification

* chore: update comment

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Joe-Blount 2023-07-20 10:00:22 -05:00 committed by GitHub
parent 668a1c3d8e
commit 1bed99567c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 16 additions and 6 deletions

1
Cargo.lock generated
View File

@ -967,6 +967,7 @@ dependencies = [
"async-trait",
"backoff",
"bytes",
"chrono",
"compactor_scheduler",
"compactor_test_utils",
"data_types",

View File

@ -9,6 +9,7 @@ license.workspace = true
async-trait = "0.1.71"
backoff = { path = "../backoff" }
bytes = "1.4"
chrono = { version = "0.4", default-features = false }
compactor_scheduler = { path = "../compactor_scheduler" }
datafusion = { workspace = true }
data_types = { path = "../data_types" }

View File

@ -1,7 +1,9 @@
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use chrono::Utc;
use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId};
use futures::{stream, StreamExt, TryStreamExt};
use iox_query::exec::query_tracing::send_metrics_to_tracing;
use observability_deps::tracing::info;
use parquet_file::ParquetFilePath;
use tokio::sync::watch::Sender;
@ -531,12 +533,14 @@ async fn execute_plan(
"job semaphore acquired",
);
let df_span = span.child("data_fusion");
let df_span = span.child_span("data_fusion");
let plan = components
.df_planner
.plan(&plan_ir, Arc::clone(partition_info))
.await?;
let streams = components.df_plan_exec.exec(plan);
let streams = components.df_plan_exec.exec(Arc::<
dyn datafusion::physical_plan::ExecutionPlan,
>::clone(&plan));
let job = components.parquet_files_sink.stream_into_file_sink(
streams,
Arc::clone(partition_info),
@ -547,6 +551,10 @@ async fn execute_plan(
// TODO: react to OOM and try to divide branch
let res = job.await;
if let Some(span) = &df_span {
send_metrics_to_tracing(Utc::now(), span, plan.as_ref(), true);
};
drop(permit);
drop(df_span);

View File

@ -235,9 +235,9 @@ async fn test_tracing_create_compactor_trace() {
// "shallow" packet inspection and verify the UDP server got something that had some expected
// results. We could look for any text of any of the compaction spans. The name of the span
// for data fusion execution is arbitrarily chosen.
// for acquiring permit is arbitrarily chosen.
udp_capture
.wait_for(|m| m.to_string().contains("data_fusion"))
.wait_for(|m| m.to_string().contains("acquire_permit"))
.await;
// debugging assistance

View File

@ -6,7 +6,7 @@ pub mod field;
pub mod fieldlist;
pub mod gapfill;
mod non_null_checker;
mod query_tracing;
pub mod query_tracing;
mod schema_pivot;
pub mod seriesset;
pub(crate) mod split;

View File

@ -109,7 +109,7 @@ impl Drop for TracedStream {
/// 1. If the ExecutionPlan had no metrics
/// 2. The total number of rows produced by the ExecutionPlan (if available)
/// 3. The elapsed compute time taken by the ExecutionPlan
fn send_metrics_to_tracing(
pub fn send_metrics_to_tracing(
default_end_time: DateTime<Utc>,
parent_span: &Span,
physical_plan: &dyn ExecutionPlan,