feat: add per-partition tracing (#3532)

* feat: add per-partition tracing

* chore: docs

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
Raphael Taylor-Davies 2022-01-26 10:39:21 +00:00 committed by GitHub
parent 2928254c0f
commit 1b6aed063d
2 changed files with 183 additions and 73 deletions


@@ -448,7 +448,7 @@ mod tests {
     #[tokio::test]
     async fn test_query_tracing() {
-        let collector = Arc::new(RingBufferTraceCollector::new(100));
+        let collector = Arc::new(RingBufferTraceCollector::new(1000));
         let (addr, server, join) = tracing_server(&collector).await;
         let conn = jaeger_client(addr, "34f8495:35e32:0:1").await;


@@ -1,7 +1,7 @@
 //! This module contains the code to map DataFusion metrics to `Span`s
 //! for use in distributed tracing (e.g. Jaeger)
-use std::{borrow::Cow, fmt, sync::Arc};
+use std::{fmt, sync::Arc};
 use arrow::record_batch::RecordBatch;
 use chrono::{DateTime, Utc};
@@ -10,6 +10,7 @@ use datafusion::physical_plan::{
     DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
 };
 use futures::StreamExt;
+use hashbrown::HashMap;
 use observability_deps::tracing::debug;
 use trace::span::{Span, SpanRecorder};
@@ -68,7 +69,11 @@ impl Drop for TracedStream {
 /// DataFusion metrics, so it should only be invoked *after* a plan is
 /// fully `collect`ed.
 ///
-/// Each `ExecutionPlan` in the plan gets its own new [`Span`]
+/// Each `ExecutionPlan` in the plan gets its own new [`Span`] that covers
+/// the time spent executing its partitions and its children
+///
+/// Each `ExecutionPlan` also has a new [`Span`] for each of its
+/// partitions that collected metrics
 ///
 /// The start and end time of the span are taken from the
 /// ExecutionPlan's metrics, falling back to the parent span's
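A minimal sketch of the resulting spans, using the TraceBuilder and TestExec helpers that appear later in this diff (the call below is illustrative only, not part of the change): every operator produces one aggregate span plus one span per partition that reported metrics.

    let traces = TraceBuilder::new();
    send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
    for span in traces.spans() {
        // e.g. "TestExec: exec", "TestExec: exec (1)", "TestExec: child1", ...
        println!("{}: {:?} -> {:?}", span.name, span.start, span.end);
    }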
@@ -89,66 +94,128 @@ fn send_metrics_to_tracing(
     // create a child span for this physical plan node. Truncate the
     // name first 20 characters of the display representation to avoid
     // making massive span names
-    let plan_name = one_line(physical_plan).to_string();
+    let mut plan_name = one_line(physical_plan).to_string();
+    plan_name.truncate(20);
-    let plan_name = if plan_name.len() > 20 {
-        Cow::Owned((&plan_name[0..20]).to_string())
-    } else {
-        Cow::Owned(plan_name)
+    // Get the timings of the parent operator
+    let parent_start_time = parent_span.start.unwrap_or(default_end_time);
+    let parent_end_time = parent_span.end.unwrap_or(default_end_time);
+    // A span for the operation, this is the aggregate of all the partition spans
+    let mut operator_span = parent_span.child(plan_name.clone());
+    let mut operator_metrics = SpanMetrics {
+        output_rows: None,
+        elapsed_compute_nanos: None,
     };
-    let mut span = parent_span.child(plan_name);
-    span.start = parent_span.start;
-    // parent span may not have completed yet
-    let span_end = parent_span.end.unwrap_or(default_end_time);
-    span.end = Some(span_end);
+    // The total duration for this span and all its children and partitions
+    let mut operator_start_time = chrono::MAX_DATETIME;
+    let mut operator_end_time = chrono::MIN_DATETIME;
     match physical_plan.metrics() {
         None => {
             // this DataFusion node had no metrics, so record that in
             // metadata and use the start/stop time of the parent span
-            span.metadata
+            operator_span
+                .metadata
                 .insert("missing_statistics".into(), "true".into());
         }
         Some(metrics) => {
-            // this DataFusion node had metrics, translate them into
-            // span information
+            // Create a separate span for each partition in the operator
+            for (partition, metrics) in partition_metrics(metrics) {
+                let (start_ts, end_ts) = get_timestamps(&metrics);
-            // Aggregate metrics from all DataFusion partitions
-            // together (maybe in the future it would be neat to
-            // expose per partition traces)
-            let metrics = metrics.aggregate_by_partition();
+                let partition_start_time = start_ts.unwrap_or(parent_start_time);
+                let partition_end_time = end_ts.unwrap_or(parent_end_time);
-            let (start_ts, end_ts) = get_timestamps(&metrics);
+                let partition_metrics = SpanMetrics {
+                    output_rows: metrics.output_rows(),
+                    elapsed_compute_nanos: metrics.elapsed_compute(),
+                };
-            if start_ts.is_some() {
-                span.start = start_ts
-            }
+                operator_start_time = operator_start_time.min(partition_start_time);
+                operator_end_time = operator_end_time.max(partition_end_time);
-            if end_ts.is_some() {
-                span.end = end_ts
-            }
+                // Update the aggregate totals in the operator span
+                operator_metrics.aggregate_child(&partition_metrics);
-            if let Some(output_rows) = metrics.output_rows() {
-                let output_rows = output_rows as i64;
-                span.metadata
-                    .insert("output_rows".into(), output_rows.into());
-            }
-            if let Some(elapsed_compute) = metrics.elapsed_compute() {
-                let elapsed_compute = elapsed_compute as i64;
-                span.metadata
-                    .insert("elapsed_compute_nanos".into(), elapsed_compute.into());
+                // Generate a span for the partition or skip if these metrics
+                // only correspond to the operator and not a specific partition
+                if let Some(partition) = partition {
+                    let mut partition_span =
+                        operator_span.child(format!("{} ({})", plan_name, partition));
+                    partition_span.start = Some(partition_start_time);
+                    partition_span.end = Some(partition_end_time);
+                    partition_metrics.add_to_span(&mut partition_span);
+                    partition_span.export();
+                }
             }
         }
     }
-    // recurse
-    for child in physical_plan.children() {
-        send_metrics_to_tracing(span_end, &span, child.as_ref())
+    // If we've not encountered any metrics to determine the operator's start
+    // and end time, use those of the parent
+    if operator_start_time == chrono::MAX_DATETIME {
+        operator_start_time = parent_span.start.unwrap_or(default_end_time);
     }
-    span.export()
+    if operator_end_time == chrono::MIN_DATETIME {
+        operator_end_time = parent_span.end.unwrap_or(default_end_time);
+    }
+    operator_span.start = Some(operator_start_time);
+    operator_span.end = Some(operator_end_time);
+    // recurse
+    for child in physical_plan.children() {
+        send_metrics_to_tracing(operator_end_time, &operator_span, child.as_ref());
+    }
+    operator_metrics.add_to_span(&mut operator_span);
+    operator_span.export();
 }
+#[derive(Debug)]
+struct SpanMetrics {
+    output_rows: Option<usize>,
+    elapsed_compute_nanos: Option<usize>,
+}
+impl SpanMetrics {
+    fn aggregate_child(&mut self, child: &Self) {
+        if let Some(rows) = child.output_rows {
+            *self.output_rows.get_or_insert(0) += rows;
+        }
+        if let Some(nanos) = child.elapsed_compute_nanos {
+            *self.elapsed_compute_nanos.get_or_insert(0) += nanos;
+        }
+    }
+    fn add_to_span(&self, span: &mut Span) {
+        if let Some(rows) = self.output_rows {
+            span.metadata
+                .insert("output_rows".into(), (rows as i64).into());
+        }
+        if let Some(nanos) = self.elapsed_compute_nanos {
+            span.metadata
+                .insert("elapsed_compute_nanos".into(), (nanos as i64).into());
+        }
+    }
+}
+fn partition_metrics(metrics: MetricsSet) -> HashMap<Option<usize>, MetricsSet> {
+    let mut hashmap = HashMap::<_, MetricsSet>::new();
+    for metric in metrics.iter() {
+        hashmap
+            .entry(*metric.partition())
+            .or_default()
+            .push(Arc::clone(metric))
+    }
+    hashmap
+}
 // todo contribute this back upstream to datafusion (add to `DisplayableExecutionPlan`)
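A short sketch of how the new helpers combine (the values mirror the aggregation test further down; nothing beyond this diff is assumed): `partition_metrics` groups a `MetricsSet` by the partition each metric was reported for, and `SpanMetrics::aggregate_child` sums the per-partition values into the operator totals.

    let mut totals = SpanMetrics {
        output_rows: None,
        elapsed_compute_nanos: None,
    };
    totals.aggregate_child(&SpanMetrics {
        output_rows: Some(100),
        elapsed_compute_nanos: Some(1_000),
    });
    totals.aggregate_child(&SpanMetrics {
        output_rows: Some(200),
        elapsed_compute_nanos: Some(2_000),
    });
    assert_eq!(totals.output_rows, Some(300));
    assert_eq!(totals.elapsed_compute_nanos, Some(3_000));

Note that a metric which is never reported stays `None` in the totals rather than being forced to zero.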
@@ -220,6 +287,7 @@ mod tests {
         },
     };
+    use std::collections::BTreeMap;
     use std::{sync::Arc, time::Duration};
     use trace::{ctx::SpanContext, span::MetaValue, RingBufferTraceCollector};
@@ -249,6 +317,11 @@ mod tests {
         let ts4 = Utc.timestamp(4, 0);
         let ts5 = Utc.timestamp(5, 0);
+        let mut many_partition = MetricsSet::new();
+        add_time_metrics(&mut many_partition, None, Some(ts2), Some(1));
+        add_time_metrics(&mut many_partition, Some(ts2), Some(ts3), Some(2));
+        add_time_metrics(&mut many_partition, Some(ts1), None, Some(3));
         // build this timestamp tree:
         //
         // exec: [ ts1 -------- ts4] <-- both start and end timestamps
@@ -256,30 +329,47 @@ mod tests {
         // child2: [ ts2 --- ts3] <-- both start and end timestamps
         // child3: [ --- ts3] <-- only end timestamps (e.g. bad data)
         // child4: [ ] <-- no timestamps
-        let mut exec = TestExec::new("exec", make_time_metricset(Some(ts1), Some(ts4)));
-        exec.new_child("child1", make_time_metricset(Some(ts2), None));
-        exec.new_child("child2", make_time_metricset(Some(ts2), Some(ts3)));
-        exec.new_child("child3", make_time_metricset(None, Some(ts3)));
-        exec.new_child("child4", make_time_metricset(None, None));
+        // child5 (1): [ --- ts2]
+        // child5 (2): [ ts2 --- ts3]
+        // child5 (3): [ ts1 --- ]
+        let mut exec = TestExec::new("exec", make_time_metric_set(Some(ts1), Some(ts4), Some(1)));
+        exec.new_child("child1", make_time_metric_set(Some(ts2), None, Some(1)));
+        exec.new_child("child2", make_time_metric_set(Some(ts2), Some(ts3), None));
+        exec.new_child("child3", make_time_metric_set(None, Some(ts3), Some(1)));
+        exec.new_child("child4", make_time_metric_set(None, None, Some(1)));
+        exec.new_child("child5", many_partition);
         let traces = TraceBuilder::new();
         send_metrics_to_tracing(ts5, &traces.make_span(), &exec);
         let spans = traces.spans();
-        println!("Spans: \n\n{:#?}", spans);
-        assert_eq!(spans.len(), 5);
+        let spans: BTreeMap<_, _> = spans.iter().map(|s| (s.name.as_ref(), s)).collect();
-        let check_span = |span: &Span, expected_name, expected_start, expected_end| {
-            assert_eq!(span.name, expected_name, "name; {:?}", span);
+        println!("Spans: \n\n{:#?}", spans);
+        assert_eq!(spans.len(), 12);
+        let check_span = |span: &Span, expected_start, expected_end| {
             assert_eq!(span.start, expected_start, "expected start; {:?}", span);
             assert_eq!(span.end, expected_end, "expected end; {:?}", span);
         };
-        check_span(&spans[0], "TestExec: child1", Some(ts2), Some(ts4));
-        check_span(&spans[1], "TestExec: child2", Some(ts2), Some(ts3));
-        check_span(&spans[2], "TestExec: child3", Some(ts1), Some(ts3));
-        check_span(&spans[3], "TestExec: child4", Some(ts1), Some(ts4));
-        check_span(&spans[4], "TestExec: exec", Some(ts1), Some(ts4));
+        check_span(spans["TestExec: exec"], Some(ts1), Some(ts4));
+        check_span(spans["TestExec: exec (1)"], Some(ts1), Some(ts4));
+        check_span(spans["TestExec: child1"], Some(ts2), Some(ts4));
+        check_span(spans["TestExec: child1 (1)"], Some(ts2), Some(ts4));
+        check_span(spans["TestExec: child2"], Some(ts2), Some(ts3));
+        check_span(spans["TestExec: child3"], Some(ts1), Some(ts3));
+        check_span(spans["TestExec: child3 (1)"], Some(ts1), Some(ts3));
+        check_span(spans["TestExec: child4"], Some(ts1), Some(ts4));
+        check_span(spans["TestExec: child5"], Some(ts1), Some(ts4));
+        check_span(spans["TestExec: child5 (1)"], Some(ts1), Some(ts2));
+        check_span(spans["TestExec: child5 (2)"], Some(ts2), Some(ts3));
+        check_span(spans["TestExec: child5 (3)"], Some(ts1), Some(ts4));
     }
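For reference, the child5 expectations above follow from the fallback rules in the new code: a partition with a missing start or end timestamp inherits the parent operator's bound (here ts1..ts4 from the exec span), and the operator span itself takes the min start and max end over its partitions.

    // child5 (1): missing start -> [ts1, ts2]  (start falls back to parent start ts1)
    // child5 (2): complete      -> [ts2, ts3]
    // child5 (3): missing end   -> [ts1, ts4]  (end falls back to parent end ts4)
    // child5 operator span      -> [ts1, ts4]  (min start / max end of its partitions)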
     #[test]
@@ -317,19 +407,29 @@ mod tests {
         // aggregated metrics should be reported
         let spans = traces.spans();
-        assert_eq!(spans.len(), 1);
-        assert_eq!(
-            spans[0].metadata.get("output_rows"),
-            Some(&MetaValue::Int(300)),
-            "spans: {:#?}",
-            spans
-        );
-        assert_eq!(
-            spans[0].metadata.get("elapsed_compute_nanos"),
-            Some(&MetaValue::Int(3000)),
-            "spans: {:#?}",
-            spans
-        );
+        let spans: BTreeMap<_, _> = spans.iter().map(|s| (s.name.as_ref(), s)).collect();
+        assert_eq!(spans.len(), 3);
+        let check_span = |span: &Span, output_row: i64, nanos: i64| {
+            assert_eq!(
+                span.metadata.get("output_rows"),
+                Some(&MetaValue::Int(output_row)),
+                "span: {:#?}",
+                span
+            );
+            assert_eq!(
+                span.metadata.get("elapsed_compute_nanos"),
+                Some(&MetaValue::Int(nanos)),
+                "span: {:#?}",
+                span
+            );
+        };
+        check_span(spans["TestExec: exec"], 300, 3000);
+        check_span(spans["TestExec: exec (1)"], 100, 1000);
+        check_span(spans["TestExec: exec (2)"], 200, 2000);
     }
     fn add_output_rows(metrics: &mut MetricsSet, output_rows: usize, partition: usize) {
@@ -354,11 +454,24 @@ mod tests {
         )));
     }
-    fn make_time_metricset(start: Option<DateTime<Utc>>, end: Option<DateTime<Utc>>) -> MetricsSet {
+    fn make_time_metric_set(
+        start: Option<DateTime<Utc>>,
+        end: Option<DateTime<Utc>>,
+        partition: Option<usize>,
+    ) -> MetricsSet {
         let mut metrics = MetricsSet::new();
+        add_time_metrics(&mut metrics, start, end, partition);
+        metrics
+    }
+    fn add_time_metrics(
+        metrics: &mut MetricsSet,
+        start: Option<DateTime<Utc>>,
+        end: Option<DateTime<Utc>>,
+        partition: Option<usize>,
+    ) {
         if let Some(start) = start {
             let value = make_metrics_timestamp(start);
-            let partition = None;
             metrics.push(Arc::new(Metric::new(
                 MetricValue::StartTimestamp(value),
                 partition,
@@ -367,14 +480,11 @@ mod tests {
         if let Some(end) = end {
             let value = make_metrics_timestamp(end);
-            let partition = None;
             metrics.push(Arc::new(Metric::new(
                 MetricValue::EndTimestamp(value),
                 partition,
             )));
         }
-        metrics
     }
     fn make_metrics_timestamp(t: DateTime<Utc>) -> Timestamp {