From 1bed99567c06d8bf8079f6fe13d4af01631c54c5 Mon Sep 17 00:00:00 2001 From: Joe-Blount <73478756+Joe-Blount@users.noreply.github.com> Date: Thu, 20 Jul 2023 10:00:22 -0500 Subject: [PATCH] chore: add DF metrics to compaction spans (#8270) * chore: add DF metrics to compaction spans * chore: update string for test verification * chore: update comment --------- Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 1 + compactor/Cargo.toml | 1 + compactor/src/driver.rs | 12 ++++++++++-- influxdb_iox/tests/end_to_end_cases/tracing.rs | 4 ++-- iox_query/src/exec.rs | 2 +- iox_query/src/exec/query_tracing.rs | 2 +- 6 files changed, 16 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 28b8648f03..dd0f47923b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -967,6 +967,7 @@ dependencies = [ "async-trait", "backoff", "bytes", + "chrono", "compactor_scheduler", "compactor_test_utils", "data_types", diff --git a/compactor/Cargo.toml b/compactor/Cargo.toml index 8a8619999f..43dffc5337 100644 --- a/compactor/Cargo.toml +++ b/compactor/Cargo.toml @@ -9,6 +9,7 @@ license.workspace = true async-trait = "0.1.71" backoff = { path = "../backoff" } bytes = "1.4" +chrono = { version = "0.4", default-features = false } compactor_scheduler = { path = "../compactor_scheduler" } datafusion = { workspace = true } data_types = { path = "../data_types" } diff --git a/compactor/src/driver.rs b/compactor/src/driver.rs index 2d0bd97619..0ea42ad576 100644 --- a/compactor/src/driver.rs +++ b/compactor/src/driver.rs @@ -1,7 +1,9 @@ use std::{num::NonZeroUsize, sync::Arc, time::Duration}; +use chrono::Utc; use data_types::{CompactionLevel, ParquetFile, ParquetFileParams, PartitionId}; use futures::{stream, StreamExt, TryStreamExt}; +use iox_query::exec::query_tracing::send_metrics_to_tracing; use observability_deps::tracing::info; use parquet_file::ParquetFilePath; use tokio::sync::watch::Sender; @@ -531,12 +533,14 @@ async fn execute_plan( "job semaphore acquired", ); - let df_span = span.child("data_fusion"); + let df_span = span.child_span("data_fusion"); let plan = components .df_planner .plan(&plan_ir, Arc::clone(partition_info)) .await?; - let streams = components.df_plan_exec.exec(plan); + let streams = components.df_plan_exec.exec(Arc::< + dyn datafusion::physical_plan::ExecutionPlan, + >::clone(&plan)); let job = components.parquet_files_sink.stream_into_file_sink( streams, Arc::clone(partition_info), @@ -547,6 +551,10 @@ async fn execute_plan( // TODO: react to OOM and try to divide branch let res = job.await; + if let Some(span) = &df_span { + send_metrics_to_tracing(Utc::now(), span, plan.as_ref(), true); + }; + drop(permit); drop(df_span); diff --git a/influxdb_iox/tests/end_to_end_cases/tracing.rs b/influxdb_iox/tests/end_to_end_cases/tracing.rs index 51a51b4109..cb2597a6d9 100644 --- a/influxdb_iox/tests/end_to_end_cases/tracing.rs +++ b/influxdb_iox/tests/end_to_end_cases/tracing.rs @@ -235,9 +235,9 @@ async fn test_tracing_create_compactor_trace() { // "shallow" packet inspection and verify the UDP server got omething that had some expected // results. We could look for any text of any of the compaction spans. The name of the span - // for data fusion execution is arbitrarily chosen. + // for acquiring permit is arbitrarily chosen. udp_capture - .wait_for(|m| m.to_string().contains("data_fusion")) + .wait_for(|m| m.to_string().contains("acquire_permit")) .await; // debugging assistance diff --git a/iox_query/src/exec.rs b/iox_query/src/exec.rs index eea940b118..03089c83f1 100644 --- a/iox_query/src/exec.rs +++ b/iox_query/src/exec.rs @@ -6,7 +6,7 @@ pub mod field; pub mod fieldlist; pub mod gapfill; mod non_null_checker; -mod query_tracing; +pub mod query_tracing; mod schema_pivot; pub mod seriesset; pub(crate) mod split; diff --git a/iox_query/src/exec/query_tracing.rs b/iox_query/src/exec/query_tracing.rs index b3d37b23b6..a4b81bd2c3 100644 --- a/iox_query/src/exec/query_tracing.rs +++ b/iox_query/src/exec/query_tracing.rs @@ -109,7 +109,7 @@ impl Drop for TracedStream { /// 1. If the ExecutionPlan had no metrics /// 2. The total number of rows produced by the ExecutionPlan (if available) /// 3. The elapsed compute time taken by the ExecutionPlan -fn send_metrics_to_tracing( +pub fn send_metrics_to_tracing( default_end_time: DateTime, parent_span: &Span, physical_plan: &dyn ExecutionPlan,