Merge branch 'main' into dom/no-openssl
commit
de3ae7d6f8
|
@ -1,7 +1,6 @@
|
|||
//! Building blocks for [`clap`]-driven configs.
|
||||
//!
|
||||
//! They can easily be re-used using `#[clap(flatten)]`.
|
||||
pub mod boolean_flag;
|
||||
pub mod catalog_dsn;
|
||||
pub mod object_store;
|
||||
pub mod run_config;
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
/// Boolean flag that works with environment variables.
|
||||
///
|
||||
/// Workaround for <https://github.com/TeXitoi/structopt/issues/428>
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum BooleanFlag {
|
||||
True,
|
|
@ -10,6 +10,7 @@
|
|||
clippy::clone_on_ref_ptr
|
||||
)]
|
||||
|
||||
pub mod boolean_flag;
|
||||
pub mod chunk_metadata;
|
||||
pub mod consistent_hasher;
|
||||
mod database_name;
|
||||
|
|
|
@ -85,6 +85,14 @@ For example, a command such as this should do the trick:
|
|||
TRACES_EXPORTER=jaeger TRACES_EXPORTER_JAEGER_AGENT_HOST=localhost TRACES_EXPORTER_JAEGER_AGENT_PORT=6831 cargo run -- run -v --server-id=42
|
||||
```
|
||||
|
||||
Additional trace granularity, in particular traces with spans for each DataFusion partition, can be enabled with
|
||||
|
||||
```
|
||||
INFLUXDB_IOX_PER_PARTITION_TRACING=1
|
||||
```
|
||||
|
||||
_Some tracing setups may struggle with the size of the generated traces with this setting enabled_
|
||||
|
||||
### Step 3: Send a request with trace context
|
||||
|
||||
For IOx to emit traces, the request must have a span context set. You can use the `--header` flag on the IOx CLI to do
|
||||
|
|
|
@ -2,7 +2,8 @@
|
|||
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap_blocks::{boolean_flag::BooleanFlag, run_config::RunConfig};
|
||||
use clap_blocks::run_config::RunConfig;
|
||||
use data_types::boolean_flag::BooleanFlag;
|
||||
use influxdb_ioxd::{
|
||||
self,
|
||||
server_type::{
|
||||
|
|
|
@ -5,6 +5,7 @@ use std::{fmt, sync::Arc};
|
|||
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use chrono::{DateTime, Utc};
|
||||
use data_types::boolean_flag::BooleanFlag;
|
||||
use datafusion::physical_plan::{
|
||||
metrics::{MetricValue, MetricsSet},
|
||||
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
|
||||
|
@ -14,6 +15,26 @@ use hashbrown::HashMap;
|
|||
use observability_deps::tracing::debug;
|
||||
use trace::span::{Span, SpanRecorder};
|
||||
|
||||
const PER_PARTITION_TRACING_ENABLE_ENV: &str = "INFLUXDB_IOX_PER_PARTITION_TRACING";
|
||||
fn per_partition_tracing() -> bool {
|
||||
use std::sync::atomic::{AtomicU8, Ordering};
|
||||
static TRACING_ENABLED: AtomicU8 = AtomicU8::new(u8::MAX);
|
||||
|
||||
match TRACING_ENABLED.load(Ordering::Relaxed) {
|
||||
u8::MAX => {
|
||||
let val = std::env::var(PER_PARTITION_TRACING_ENABLE_ENV)
|
||||
.ok()
|
||||
.and_then(|x| x.parse::<BooleanFlag>().ok())
|
||||
.map(Into::into)
|
||||
.unwrap_or(false);
|
||||
|
||||
TRACING_ENABLED.store(val as u8, Ordering::Relaxed);
|
||||
val
|
||||
}
|
||||
x => x != 0,
|
||||
}
|
||||
}
|
||||
|
||||
/// Stream wrapper that records DataFusion `MetricSets` into IOx
|
||||
/// [`Span`]s when it is dropped.
|
||||
pub(crate) struct TracedStream {
|
||||
|
@ -59,7 +80,13 @@ impl Drop for TracedStream {
|
|||
fn drop(&mut self) {
|
||||
if let Some(span) = self.span_recorder.span() {
|
||||
let default_end_time = Utc::now();
|
||||
send_metrics_to_tracing(default_end_time, span, self.physical_plan.as_ref());
|
||||
let per_partition_tracing = per_partition_tracing();
|
||||
send_metrics_to_tracing(
|
||||
default_end_time,
|
||||
span,
|
||||
self.physical_plan.as_ref(),
|
||||
per_partition_tracing,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -87,6 +114,7 @@ fn send_metrics_to_tracing(
|
|||
default_end_time: DateTime<Utc>,
|
||||
parent_span: &Span,
|
||||
physical_plan: &dyn ExecutionPlan,
|
||||
per_partition_tracing: bool,
|
||||
) {
|
||||
// Something like this when one_line is contributed back upstream
|
||||
//let plan_name = physical_plan.displayable().one_line().to_string();
|
||||
|
@ -137,17 +165,21 @@ fn send_metrics_to_tracing(
|
|||
// Update the aggregate totals in the operator span
|
||||
operator_metrics.aggregate_child(&partition_metrics);
|
||||
|
||||
// Generate a span for the partition or skip if these metrics
|
||||
// only correspond to the operator and not a specific partition
|
||||
if let Some(partition) = partition {
|
||||
let mut partition_span =
|
||||
operator_span.child(format!("{} ({})", operator_name, partition));
|
||||
// Generate a span for the partition if
|
||||
// - these metrics correspond to a partition
|
||||
// - per partition tracing is enabled
|
||||
if per_partition_tracing {
|
||||
if let Some(partition) = partition {
|
||||
let mut partition_span =
|
||||
operator_span.child(format!("{} ({})", operator_name, partition));
|
||||
|
||||
partition_span.start = Some(partition_start_time);
|
||||
partition_span.end = Some(partition_end_time);
|
||||
partition_span.start = Some(partition_start_time);
|
||||
partition_span.end = Some(partition_end_time);
|
||||
|
||||
partition_metrics.add_to_span(&mut partition_span);
|
||||
partition_span.export();
|
||||
partition_metrics.add_to_span(&mut partition_span);
|
||||
|
||||
partition_span.export();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -168,7 +200,12 @@ fn send_metrics_to_tracing(
|
|||
|
||||
// recurse
|
||||
for child in physical_plan.children() {
|
||||
send_metrics_to_tracing(operator_end_time, &operator_span, child.as_ref());
|
||||
send_metrics_to_tracing(
|
||||
operator_end_time,
|
||||
&operator_span,
|
||||
child.as_ref(),
|
||||
per_partition_tracing,
|
||||
);
|
||||
}
|
||||
|
||||
operator_metrics.add_to_span(&mut operator_span);
|
||||
|
@ -299,7 +336,7 @@ mod tests {
|
|||
let exec = TestExec::new(name, Default::default());
|
||||
|
||||
let traces = TraceBuilder::new();
|
||||
send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
|
||||
send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec, true);
|
||||
|
||||
let spans = traces.spans();
|
||||
assert_eq!(spans.len(), 1);
|
||||
|
@ -349,7 +386,7 @@ mod tests {
|
|||
exec.new_child("child5: bongo", many_partition);
|
||||
|
||||
let traces = TraceBuilder::new();
|
||||
send_metrics_to_tracing(ts5, &traces.make_span(), &exec);
|
||||
send_metrics_to_tracing(ts5, &traces.make_span(), &exec, true);
|
||||
|
||||
let spans = traces.spans();
|
||||
let spans: BTreeMap<_, _> = spans.iter().map(|s| (s.name.as_ref(), s)).collect();
|
||||
|
@ -419,7 +456,7 @@ mod tests {
|
|||
exec.metrics = None;
|
||||
|
||||
let traces = TraceBuilder::new();
|
||||
send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
|
||||
send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec, true);
|
||||
|
||||
let spans = traces.spans();
|
||||
assert_eq!(spans.len(), 1);
|
||||
|
@ -443,7 +480,7 @@ mod tests {
|
|||
add_elapsed_compute(exec.metrics_mut(), 2000, 2);
|
||||
|
||||
let traces = TraceBuilder::new();
|
||||
send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec);
|
||||
send_metrics_to_tracing(Utc::now(), &traces.make_span(), &exec, true);
|
||||
|
||||
// aggregated metrics should be reported
|
||||
let spans = traces.spans();
|
||||
|
|
Loading…
Reference in New Issue