feat: support jaeger context propagation format (#2293)

pull/24376/head
Raphael Taylor-Davies 2021-08-16 11:55:40 +01:00 committed by GitHub
parent 756f5c6699
commit 302e2b5353
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 213 additions and 40 deletions

1
Cargo.lock generated
View File

@ -4721,6 +4721,7 @@ dependencies = [
"futures",
"http",
"http-body",
"itertools 0.10.1",
"observability_deps",
"parking_lot",
"pin-project 1.0.8",

View File

@ -548,7 +548,7 @@ mod tests {
assert_eq!(trace_collector.spans().len(), 0);
let tracing_client = influxdb_iox_client::connection::Builder::default()
let b3_tracing_client = influxdb_iox_client::connection::Builder::default()
.header(
HeaderName::from_static("x-b3-sampled"),
HeaderValue::from_static("1"),
@ -565,13 +565,27 @@ mod tests {
.await
.unwrap();
let mut tracing_client = influxdb_iox_client::management::Client::new(tracing_client);
let mut b3_tracing_client = influxdb_iox_client::management::Client::new(b3_tracing_client);
tracing_client.list_databases().await.unwrap();
tracing_client.get_server_status().await.unwrap();
b3_tracing_client.list_databases().await.unwrap();
b3_tracing_client.get_server_status().await.unwrap();
let jaeger_tracing_client = influxdb_iox_client::connection::Builder::default()
.header(
HeaderName::from_static("uber-trace-id"),
HeaderValue::from_static("3459495:30434:0:1"),
)
.build(format!("http://{}", addr))
.await
.unwrap();
influxdb_iox_client::management::Client::new(jaeger_tracing_client)
.list_databases()
.await
.unwrap();
let spans = trace_collector.spans();
assert_eq!(spans.len(), 2);
assert_eq!(spans.len(), 3);
let spans: Vec<trace::span::Span<'_>> = spans
.iter()
@ -590,6 +604,12 @@ mod tests {
assert!(spans[1].start.is_some());
assert!(spans[1].end.is_some());
assert_eq!(spans[2].name, "IOx");
assert_eq!(spans[2].ctx.parent_span_id.unwrap().0.get(), 30434);
assert_eq!(spans[2].ctx.trace_id.0.get(), 3459495);
assert!(spans[2].start.is_some());
assert!(spans[2].end.is_some());
assert_ne!(spans[0].ctx.span_id, spans[1].ctx.span_id);
server.shutdown();
join.await.unwrap().unwrap();

View File

@ -354,7 +354,7 @@ where
fn from_str(s: &str) -> Result<Self, Self::Err> {
use itertools::Itertools;
match s.split(':').collect_tuple() {
match s.splitn(2, ':').collect_tuple() {
Some((key, value)) => {
let key = K::from_str(key).map_err(|e| e.to_string())?;
let value = V::from_str(value).map_err(|e| e.to_string())?;

View File

@ -11,6 +11,7 @@ chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
http = "0.2.0"
http-body = "0.4"
itertools = "0.10"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11"
pin-project = "1.0"

View File

@ -7,6 +7,7 @@ use rand::Rng;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use crate::ctx::ContextCodec::{Jaeger, B3};
use crate::{
span::{Span, SpanStatus},
TraceCollector,
@ -18,6 +19,8 @@ const B3_TRACE_ID_HEADER: &str = "X-B3-TraceId";
const B3_PARENT_SPAN_ID_HEADER: &str = "X-B3-ParentSpanId";
const B3_SPAN_ID_HEADER: &str = "X-B3-SpanId";
const JAEGER_TRACE_HEADER: &str = "uber-trace-id";
/// Error decoding SpanContext from transport representation
#[derive(Debug, Snafu)]
pub enum ContextError {
@ -43,10 +46,20 @@ pub enum DecodeError {
#[snafu(display("value decode error: {}", source))]
ValueDecodeError { source: ParseIntError },
#[snafu(display("Expected \"trace-id:span-id:parent-span-id:flags\""))]
InvalidJaegerTrace,
#[snafu(display("value cannot be 0"))]
ZeroError,
}
impl From<ParseIntError> for DecodeError {
// Snafu doesn't allow both no context and a custom message
fn from(source: ParseIntError) -> Self {
Self::ValueDecodeError { source }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TraceId(pub NonZeroU128);
@ -55,11 +68,7 @@ impl<'a> FromStr for TraceId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(
NonZeroU128::new(
s.parse()
.map_err(|source| DecodeError::ValueDecodeError { source })?,
)
.ok_or(DecodeError::ZeroError)?,
NonZeroU128::new(s.parse()?).ok_or(DecodeError::ZeroError)?,
))
}
}
@ -79,11 +88,7 @@ impl<'a> FromStr for SpanId {
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(
NonZeroU64::new(
s.parse()
.map_err(|source| DecodeError::ValueDecodeError { source })?,
)
.ok_or(DecodeError::ZeroError)?,
NonZeroU64::new(s.parse()?).ok_or(DecodeError::ZeroError)?,
))
}
}
@ -130,31 +135,115 @@ impl SpanContext {
collector: &Arc<dyn TraceCollector>,
headers: &HeaderMap,
) -> Result<Option<Self>, ContextError> {
let debug = decoded_header(headers, B3_FLAGS)?
.map(|header| header == "1")
.unwrap_or(false);
let sampled = match debug {
// Debug implies an accept decision
true => true,
false => decoded_header(headers, B3_SAMPLED_HEADER)?
.map(|value| value == "1" || value == "true")
.unwrap_or(false),
};
if !sampled {
return Ok(None);
match ContextCodec::detect(headers) {
None => Ok(None),
Some(ContextCodec::B3) => decode_b3(collector, headers),
Some(ContextCodec::Jaeger) => decode_jaeger(collector, headers),
}
Ok(Some(Self {
trace_id: required_header(headers, B3_TRACE_ID_HEADER)?,
parent_span_id: parsed_header(headers, B3_PARENT_SPAN_ID_HEADER)?,
span_id: required_header(headers, B3_SPAN_ID_HEADER)?,
collector: Some(Arc::clone(collector)),
}))
}
}
/// The codec used to encode trace context
enum ContextCodec {
/// <https://github.com/openzipkin/b3-propagation#multiple-headers>
B3,
/// <https://www.jaegertracing.io/docs/1.21/client-libraries/#propagation-format>
Jaeger,
}
impl ContextCodec {
fn detect(headers: &HeaderMap) -> Option<Self> {
if headers.contains_key(B3_TRACE_ID_HEADER) {
Some(B3)
} else if headers.contains_key(JAEGER_TRACE_HEADER) {
Some(Jaeger)
} else {
None
}
}
}
/// Decodes headers in the B3 format
fn decode_b3(
collector: &Arc<dyn TraceCollector>,
headers: &HeaderMap,
) -> Result<Option<SpanContext>, ContextError> {
let debug = decoded_header(headers, B3_FLAGS)?
.map(|header| header == "1")
.unwrap_or(false);
let sampled = match debug {
// Debug implies an accept decision
true => true,
false => decoded_header(headers, B3_SAMPLED_HEADER)?
.map(|value| value == "1" || value == "true")
.unwrap_or(false),
};
if !sampled {
return Ok(None);
}
Ok(Some(SpanContext {
trace_id: required_header(headers, B3_TRACE_ID_HEADER)?,
parent_span_id: parsed_header(headers, B3_PARENT_SPAN_ID_HEADER)?,
span_id: required_header(headers, B3_SPAN_ID_HEADER)?,
collector: Some(Arc::clone(collector)),
}))
}
struct JaegerCtx {
trace_id: TraceId,
span_id: SpanId,
parent_span_id: Option<SpanId>,
flags: u8,
}
impl FromStr for JaegerCtx {
type Err = DecodeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
use itertools::Itertools;
let (trace_id, span_id, parent_span_id, flags) = s
.split(':')
.collect_tuple()
.ok_or(DecodeError::InvalidJaegerTrace)?;
let trace_id = trace_id.parse()?;
let span_id = span_id.parse()?;
let parent_span_id = match parent_span_id {
"0" => None,
_ => Some(parent_span_id.parse()?),
};
let flags = flags.parse()?;
Ok(Self {
trace_id,
span_id,
parent_span_id,
flags,
})
}
}
/// Decodes headers in the Jaeger format
fn decode_jaeger(
collector: &Arc<dyn TraceCollector>,
headers: &HeaderMap,
) -> Result<Option<SpanContext>, ContextError> {
let decoded: JaegerCtx = required_header(headers, JAEGER_TRACE_HEADER)?;
if decoded.flags & 0x01 == 0 {
return Ok(None);
}
Ok(Some(SpanContext {
trace_id: decoded.trace_id,
parent_span_id: decoded.parent_span_id,
span_id: decoded.span_id,
collector: Some(Arc::clone(collector)),
}))
}
/// Decodes a given header from the provided HeaderMap to a string
///
/// - Returns Ok(None) if the header doesn't exist
@ -206,7 +295,7 @@ mod tests {
use http::HeaderValue;
#[test]
fn test_decode() {
fn test_decode_b3() {
let collector: Arc<dyn TraceCollector> = Arc::new(crate::LogTraceCollector::new());
let mut headers = HeaderMap::new();
@ -215,6 +304,7 @@ mod tests {
.unwrap()
.is_none());
headers.insert(B3_TRACE_ID_HEADER, HeaderValue::from_static("99999999"));
headers.insert(B3_SAMPLED_HEADER, HeaderValue::from_static("0"));
// Not sampled
@ -229,10 +319,9 @@ mod tests {
SpanContext::from_headers(&collector, &headers)
.unwrap_err()
.to_string(),
"header 'X-B3-TraceId' not found"
"header 'X-B3-SpanId' not found"
);
headers.insert(B3_TRACE_ID_HEADER, HeaderValue::from_static("99999999"));
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("69559"));
let span = SpanContext::from_headers(&collector, &headers)
@ -274,4 +363,66 @@ mod tests {
"error decoding header 'X-B3-SpanId': value cannot be 0"
);
}
#[test]
fn test_decode_jaeger() {
let collector: Arc<dyn TraceCollector> = Arc::new(crate::LogTraceCollector::new());
let mut headers = HeaderMap::new();
// Invalid format
headers.insert(JAEGER_TRACE_HEADER, HeaderValue::from_static("invalid"));
assert_eq!(
SpanContext::from_headers(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'uber-trace-id': Expected \"trace-id:span-id:parent-span-id:flags\""
);
// Not sampled
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("343:4325345:0:0"),
);
assert!(SpanContext::from_headers(&collector, &headers)
.unwrap()
.is_none());
// Sampled
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("343:4325345:0:1"),
);
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.trace_id.0.get(), 343);
assert_eq!(span.span_id.0.get(), 4325345);
assert!(span.parent_span_id.is_none());
// Parent span
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("343:4325345:3434:1"),
);
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.trace_id.0.get(), 343);
assert_eq!(span.span_id.0.get(), 4325345);
assert_eq!(span.parent_span_id.unwrap().0.get(), 3434);
// Invalid trace id
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("0:4325345:3434:1"),
);
assert_eq!(
SpanContext::from_headers(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'uber-trace-id': value cannot be 0"
);
}
}