diff --git a/Cargo.lock b/Cargo.lock index d85d87965e..42f01ab82c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4721,6 +4721,7 @@ dependencies = [ "futures", "http", "http-body", + "itertools 0.10.1", "observability_deps", "parking_lot", "pin-project 1.0.8", diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 02d3908848..29344e5a22 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -548,7 +548,7 @@ mod tests { assert_eq!(trace_collector.spans().len(), 0); - let tracing_client = influxdb_iox_client::connection::Builder::default() + let b3_tracing_client = influxdb_iox_client::connection::Builder::default() .header( HeaderName::from_static("x-b3-sampled"), HeaderValue::from_static("1"), @@ -565,13 +565,27 @@ mod tests { .await .unwrap(); - let mut tracing_client = influxdb_iox_client::management::Client::new(tracing_client); + let mut b3_tracing_client = influxdb_iox_client::management::Client::new(b3_tracing_client); - tracing_client.list_databases().await.unwrap(); - tracing_client.get_server_status().await.unwrap(); + b3_tracing_client.list_databases().await.unwrap(); + b3_tracing_client.get_server_status().await.unwrap(); + + let jaeger_tracing_client = influxdb_iox_client::connection::Builder::default() + .header( + HeaderName::from_static("uber-trace-id"), + HeaderValue::from_static("3459495:30434:0:1"), + ) + .build(format!("http://{}", addr)) + .await + .unwrap(); + + influxdb_iox_client::management::Client::new(jaeger_tracing_client) + .list_databases() + .await + .unwrap(); let spans = trace_collector.spans(); - assert_eq!(spans.len(), 2); + assert_eq!(spans.len(), 3); let spans: Vec<trace::span::Span<'_>> = spans .iter() @@ -590,6 +604,12 @@ mod tests { assert!(spans[1].start.is_some()); assert!(spans[1].end.is_some()); + assert_eq!(spans[2].name, "IOx"); + assert_eq!(spans[2].ctx.parent_span_id.unwrap().0.get(), 30434); + assert_eq!(spans[2].ctx.trace_id.0.get(), 3459495); + assert!(spans[2].start.is_some()); + assert!(spans[2].end.is_some()); + assert_ne!(spans[0].ctx.span_id, spans[1].ctx.span_id); server.shutdown(); join.await.unwrap().unwrap(); diff --git a/src/main.rs b/src/main.rs index 32b8b281ab..5d3bca7e37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -354,7 +354,7 @@ where fn from_str(s: &str) -> Result<Self, Self::Err> { use itertools::Itertools; - match s.split(':').collect_tuple() { + match s.splitn(2, ':').collect_tuple() { Some((key, value)) => { let key = K::from_str(key).map_err(|e| e.to_string())?; let value = V::from_str(value).map_err(|e| e.to_string())?; diff --git a/trace/Cargo.toml b/trace/Cargo.toml index abff942b4f..bbfda8c558 100644 --- a/trace/Cargo.toml +++ b/trace/Cargo.toml @@ -11,6 +11,7 @@ chrono = { version = "0.4", features = ["serde"] } futures = "0.3" http = "0.2.0" http-body = "0.4" +itertools = "0.10" observability_deps = { path = "../observability_deps" } parking_lot = "0.11" pin-project = "1.0" diff --git a/trace/src/ctx.rs b/trace/src/ctx.rs index 16e4ff7bf9..54606b7c33 100644 --- a/trace/src/ctx.rs +++ b/trace/src/ctx.rs @@ -7,6 +7,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use snafu::Snafu; +use crate::ctx::ContextCodec::{Jaeger, B3}; use crate::{ span::{Span, SpanStatus}, TraceCollector, @@ -18,6 +19,8 @@ const B3_TRACE_ID_HEADER: &str = "X-B3-TraceId"; const B3_PARENT_SPAN_ID_HEADER: &str = "X-B3-ParentSpanId"; const B3_SPAN_ID_HEADER: &str = "X-B3-SpanId"; +const JAEGER_TRACE_HEADER: &str = "uber-trace-id"; + /// Error decoding SpanContext from transport representation #[derive(Debug, Snafu)] pub enum ContextError { @@ -43,10 +46,20 @@ pub enum DecodeError { #[snafu(display("value decode error: {}", source))] ValueDecodeError { source: ParseIntError }, + #[snafu(display("Expected \"trace-id:span-id:parent-span-id:flags\""))] + InvalidJaegerTrace, + #[snafu(display("value cannot be 0"))] ZeroError, } +impl From<ParseIntError> for DecodeError { + // Snafu doesn't allow both no context and a custom message + fn from(source: ParseIntError) -> Self { + Self::ValueDecodeError { source } + } +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct TraceId(pub NonZeroU128); @@ -55,11 +68,7 @@ impl<'a> FromStr for TraceId { fn from_str(s: &str) -> Result<Self, Self::Err> { Ok(Self( - NonZeroU128::new( - s.parse() - .map_err(|source| DecodeError::ValueDecodeError { source })?, - ) - .ok_or(DecodeError::ZeroError)?, + NonZeroU128::new(s.parse()?).ok_or(DecodeError::ZeroError)?, )) } } @@ -79,11 +88,7 @@ impl<'a> FromStr for SpanId { fn from_str(s: &str) -> Result<Self, Self::Err> { Ok(Self( - NonZeroU64::new( - s.parse() - .map_err(|source| DecodeError::ValueDecodeError { source })?, - ) - .ok_or(DecodeError::ZeroError)?, + NonZeroU64::new(s.parse()?).ok_or(DecodeError::ZeroError)?, )) } } @@ -130,31 +135,115 @@ impl SpanContext { collector: &Arc<dyn TraceCollector>, headers: &HeaderMap, ) -> Result<Option<Self>, ContextError> { - let debug = decoded_header(headers, B3_FLAGS)? - .map(|header| header == "1") - .unwrap_or(false); - - let sampled = match debug { - // Debug implies an accept decision - true => true, - false => decoded_header(headers, B3_SAMPLED_HEADER)? - .map(|value| value == "1" || value == "true") - .unwrap_or(false), - }; - - if !sampled { - return Ok(None); + match ContextCodec::detect(headers) { + None => Ok(None), + Some(ContextCodec::B3) => decode_b3(collector, headers), + Some(ContextCodec::Jaeger) => decode_jaeger(collector, headers), } - - Ok(Some(Self { - trace_id: required_header(headers, B3_TRACE_ID_HEADER)?, - parent_span_id: parsed_header(headers, B3_PARENT_SPAN_ID_HEADER)?, - span_id: required_header(headers, B3_SPAN_ID_HEADER)?, - collector: Some(Arc::clone(collector)), - })) } } +/// The codec used to encode trace context +enum ContextCodec { + /// <https://github.com/openzipkin/b3-propagation#multiple-headers> + B3, + /// <https://www.jaegertracing.io/docs/1.21/client-libraries/#propagation-format> + Jaeger, +} + +impl ContextCodec { + fn detect(headers: &HeaderMap) -> Option<Self> { + if headers.contains_key(B3_TRACE_ID_HEADER) { + Some(B3) + } else if headers.contains_key(JAEGER_TRACE_HEADER) { + Some(Jaeger) + } else { + None + } + } +} + +/// Decodes headers in the B3 format +fn decode_b3( + collector: &Arc<dyn TraceCollector>, + headers: &HeaderMap, +) -> Result<Option<SpanContext>, ContextError> { + let debug = decoded_header(headers, B3_FLAGS)? + .map(|header| header == "1") + .unwrap_or(false); + + let sampled = match debug { + // Debug implies an accept decision + true => true, + false => decoded_header(headers, B3_SAMPLED_HEADER)? + .map(|value| value == "1" || value == "true") + .unwrap_or(false), + }; + + if !sampled { + return Ok(None); + } + + Ok(Some(SpanContext { + trace_id: required_header(headers, B3_TRACE_ID_HEADER)?, + parent_span_id: parsed_header(headers, B3_PARENT_SPAN_ID_HEADER)?, + span_id: required_header(headers, B3_SPAN_ID_HEADER)?, + collector: Some(Arc::clone(collector)), + })) +} + +struct JaegerCtx { + trace_id: TraceId, + span_id: SpanId, + parent_span_id: Option<SpanId>, + flags: u8, +} + +impl FromStr for JaegerCtx { + type Err = DecodeError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + use itertools::Itertools; + let (trace_id, span_id, parent_span_id, flags) = s + .split(':') + .collect_tuple() + .ok_or(DecodeError::InvalidJaegerTrace)?; + + let trace_id = trace_id.parse()?; + let span_id = span_id.parse()?; + let parent_span_id = match parent_span_id { + "0" => None, + _ => Some(parent_span_id.parse()?), + }; + let flags = flags.parse()?; + + Ok(Self { + trace_id, + span_id, + parent_span_id, + flags, + }) + } +} + +/// Decodes headers in the Jaeger format +fn decode_jaeger( + collector: &Arc<dyn TraceCollector>, + headers: &HeaderMap, +) -> Result<Option<SpanContext>, ContextError> { + let decoded: JaegerCtx = required_header(headers, JAEGER_TRACE_HEADER)?; + if decoded.flags & 0x01 == 0 { + return Ok(None); + } + + Ok(Some(SpanContext { + trace_id: decoded.trace_id, + parent_span_id: decoded.parent_span_id, + span_id: decoded.span_id, + collector: Some(Arc::clone(collector)), + })) +} + /// Decodes a given header from the provided HeaderMap to a string /// /// - Returns Ok(None) if the header doesn't exist @@ -206,7 +295,7 @@ mod tests { use http::HeaderValue; #[test] - fn test_decode() { + fn test_decode_b3() { let collector: Arc<dyn TraceCollector> = Arc::new(crate::LogTraceCollector::new()); let mut headers = HeaderMap::new(); @@ -215,6 +304,7 @@ mod tests { .unwrap() .is_none()); + headers.insert(B3_TRACE_ID_HEADER, HeaderValue::from_static("99999999")); headers.insert(B3_SAMPLED_HEADER, HeaderValue::from_static("0")); // Not sampled @@ -229,10 +319,9 @@ mod tests { SpanContext::from_headers(&collector, &headers) .unwrap_err() .to_string(), - "header 'X-B3-TraceId' not found" + "header 'X-B3-SpanId' not found" ); - headers.insert(B3_TRACE_ID_HEADER, HeaderValue::from_static("99999999")); headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("69559")); let span = SpanContext::from_headers(&collector, &headers) @@ -274,4 +363,66 @@ mod tests { "error decoding header 'X-B3-SpanId': value cannot be 0" ); } + + #[test] + fn test_decode_jaeger() { + let collector: Arc<dyn TraceCollector> = Arc::new(crate::LogTraceCollector::new()); + let mut headers = HeaderMap::new(); + + // Invalid format + headers.insert(JAEGER_TRACE_HEADER, HeaderValue::from_static("invalid")); + assert_eq!( + SpanContext::from_headers(&collector, &headers) + .unwrap_err() + .to_string(), + "error decoding header 'uber-trace-id': Expected \"trace-id:span-id:parent-span-id:flags\"" + ); + + // Not sampled + headers.insert( + JAEGER_TRACE_HEADER, + HeaderValue::from_static("343:4325345:0:0"), + ); + assert!(SpanContext::from_headers(&collector, &headers) + .unwrap() + .is_none()); + + // Sampled + headers.insert( + JAEGER_TRACE_HEADER, + HeaderValue::from_static("343:4325345:0:1"), + ); + let span = SpanContext::from_headers(&collector, &headers) + .unwrap() + .unwrap(); + + assert_eq!(span.trace_id.0.get(), 343); + assert_eq!(span.span_id.0.get(), 4325345); + assert!(span.parent_span_id.is_none()); + + // Parent span + headers.insert( + JAEGER_TRACE_HEADER, + HeaderValue::from_static("343:4325345:3434:1"), + ); + let span = SpanContext::from_headers(&collector, &headers) + .unwrap() + .unwrap(); + + assert_eq!(span.trace_id.0.get(), 343); + assert_eq!(span.span_id.0.get(), 4325345); + assert_eq!(span.parent_span_id.unwrap().0.get(), 3434); + + // Invalid trace id + headers.insert( + JAEGER_TRACE_HEADER, + HeaderValue::from_static("0:4325345:3434:1"), + ); + assert_eq!( + SpanContext::from_headers(&collector, &headers) + .unwrap_err() + .to_string(), + "error decoding header 'uber-trace-id': value cannot be 0" + ); + } }