refactor: split out trace http (#2388)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-08-24 14:37:20 +01:00 committed by GitHub
parent 375347478a
commit 2c9c191b17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 442 additions and 483 deletions

26
Cargo.lock generated
View File

@ -1776,6 +1776,7 @@ dependencies = [
"tonic-reflection",
"trace",
"trace_exporters",
"trace_http",
"tracker",
"trogging",
"uuid",
@ -4743,18 +4744,9 @@ name = "trace"
version = "0.1.0"
dependencies = [
"chrono",
"futures",
"http",
"http-body",
"itertools 0.10.1",
"observability_deps",
"parking_lot",
"pin-project 1.0.8",
"rand 0.8.4",
"serde",
"serde_json",
"snafu",
"tower",
]
[[package]]
@ -4774,6 +4766,22 @@ dependencies = [
"trace",
]
[[package]]
name = "trace_http"
version = "0.1.0"
dependencies = [
"futures",
"http",
"http-body",
"itertools 0.10.1",
"observability_deps",
"parking_lot",
"pin-project 1.0.8",
"snafu",
"tower",
"trace",
]
[[package]]
name = "tracing"
version = "0.1.26"

View File

@ -43,6 +43,7 @@ members = [
"test_helpers",
"trace",
"trace_exporters",
"trace_http",
"tracker",
"trogging",
"grpc-router",
@ -78,6 +79,7 @@ read_buffer = { path = "read_buffer" }
server = { path = "server" }
trace = { path = "trace" }
trace_exporters = { path = "trace_exporters" }
trace_http = { path = "trace_http" }
tracker = { path = "tracker" }
trogging = { path = "trogging", default-features = false, features = ["structopt"] }
@ -125,7 +127,7 @@ tonic-reflection = "0.2.0"
uuid = { version = "0.8", features = ["v4"] }
# jemalloc-sys with unprefixed_malloc_on_supported_platforms feature and heappy are mutually exclusive
tikv-jemalloc-sys = { version = "0.4.0", optional = true, features = ["unprefixed_malloc_on_supported_platforms"]}
tikv-jemalloc-sys = { version = "0.4.0", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] }
heappy = { git = "https://github.com/mkmik/heappy", rev = "20aa466524ac9ce34a4bae29f27ec11869b50e21", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
@ -151,7 +153,7 @@ reqwest = "0.11"
tempfile = "3.1.0"
[features]
default = [ "jemalloc_replacing_malloc" ]
default = ["jemalloc_replacing_malloc"]
azure = ["object_store/azure"] # Optional Azure Object store support
gcp = ["object_store/gcp"] # Optional GCP object store support

View File

@ -619,11 +619,6 @@ mod tests {
let spans = trace_collector.spans();
assert_eq!(spans.len(), 3);
let spans: Vec<trace::span::Span> = spans
.iter()
.map(|x| serde_json::from_str(x.as_str()).unwrap())
.collect();
assert_eq!(spans[0].name, "IOx");
assert_eq!(spans[0].ctx.parent_span_id.unwrap().0.get(), 0xab3409);
assert_eq!(spans[0].ctx.trace_id.0.get(), 0xfea24902);

View File

@ -104,7 +104,7 @@ where
.context(ReflectionError)?;
let builder = tonic::transport::Server::builder();
let mut builder = builder.layer(trace::tower::TraceLayer::new(trace_collector));
let mut builder = builder.layer(trace_http::tower::TraceLayer::new(trace_collector));
// important that this one is NOT gated so that it can answer health requests
add_service!(builder, health_reporter, health_service);

View File

@ -7,18 +7,9 @@ description = "Distributed tracing support within IOx"
[dependencies]
chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
http = "0.2"
http-body = "0.4"
itertools = "0.10"
chrono = "0.4"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11"
pin-project = "1.0"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = "0.6"
tower = "0.4"
[dev-dependencies]

View File

@ -1,68 +1,15 @@
use std::borrow::Cow;
use std::num::{NonZeroU128, NonZeroU64, ParseIntError};
use std::str::FromStr;
use std::num::{NonZeroU128, NonZeroU64};
use std::sync::Arc;
use http::HeaderMap;
use observability_deps::tracing::info;
use rand::Rng;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use crate::{
ctx::ContextCodec::{Jaeger, B3},
span::{Span, SpanStatus},
TraceCollector,
};
const B3_FLAGS: &str = "X-B3-Flags";
const B3_SAMPLED_HEADER: &str = "X-B3-Sampled";
const B3_TRACE_ID_HEADER: &str = "X-B3-TraceId";
const B3_PARENT_SPAN_ID_HEADER: &str = "X-B3-ParentSpanId";
const B3_SPAN_ID_HEADER: &str = "X-B3-SpanId";
const JAEGER_TRACE_HEADER: &str = "uber-trace-id";
/// Error decoding SpanContext from transport representation
#[derive(Debug, Snafu)]
pub enum ContextError {
#[snafu(display("header '{}' not found", header))]
Missing { header: &'static str },
#[snafu(display("header '{}' has non-UTF8 content: {}", header, source))]
InvalidUtf8 {
header: &'static str,
source: http::header::ToStrError,
},
#[snafu(display("error decoding header '{}': {}", header, source))]
HeaderDecodeError {
header: &'static str,
source: DecodeError,
},
}
/// Error decoding a specific header value
#[derive(Debug, Snafu)]
pub enum DecodeError {
#[snafu(display("value decode error: {}", source))]
ValueDecodeError { source: ParseIntError },
#[snafu(display("Expected \"trace-id:span-id:parent-span-id:flags\""))]
InvalidJaegerTrace,
#[snafu(display("value cannot be 0"))]
ZeroError,
}
impl From<ParseIntError> for DecodeError {
// Snafu doesn't allow both no context and a custom message
fn from(source: ParseIntError) -> Self {
Self::ValueDecodeError { source }
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TraceId(pub NonZeroU128);
impl TraceId {
@ -75,17 +22,7 @@ impl TraceId {
}
}
impl<'a> FromStr for TraceId {
type Err = DecodeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(
NonZeroU128::new(u128::from_str_radix(s, 16)?).ok_or(DecodeError::ZeroError)?,
))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SpanId(pub NonZeroU64);
impl SpanId {
@ -103,20 +40,10 @@ impl SpanId {
}
}
impl<'a> FromStr for SpanId {
type Err = DecodeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Self(
NonZeroU64::new(u64::from_str_radix(s, 16)?).ok_or(DecodeError::ZeroError)?,
))
}
}
/// The immutable context of a `Span`
///
/// Importantly this contains all the information necessary to create a child `Span`
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct SpanContext {
pub trace_id: TraceId,
@ -124,7 +51,6 @@ pub struct SpanContext {
pub span_id: SpanId,
#[serde(skip)]
pub collector: Option<Arc<dyn TraceCollector>>,
}
@ -146,321 +72,4 @@ impl SpanContext {
events: Default::default(),
}
}
/// Create a SpanContext for the trace described in the request's headers
///
/// Follows the B3 multiple header encoding defined here
/// - <https://github.com/openzipkin/b3-propagation#multiple-headers>
pub fn from_headers(
collector: &Arc<dyn TraceCollector>,
headers: &HeaderMap,
) -> Result<Option<Self>, ContextError> {
match ContextCodec::detect(headers) {
None => Ok(None),
Some(ContextCodec::B3) => decode_b3(collector, headers),
Some(ContextCodec::Jaeger) => decode_jaeger(collector, headers),
}
}
}
/// The codec used to encode trace context
enum ContextCodec {
/// <https://github.com/openzipkin/b3-propagation#multiple-headers>
B3,
/// <https://www.jaegertracing.io/docs/1.21/client-libraries/#propagation-format>
Jaeger,
}
impl ContextCodec {
fn detect(headers: &HeaderMap) -> Option<Self> {
if headers.contains_key(JAEGER_TRACE_HEADER) {
Some(Jaeger)
} else if headers.contains_key(B3_TRACE_ID_HEADER) {
Some(B3)
} else {
None
}
}
}
/// Decodes headers in the B3 format
fn decode_b3(
collector: &Arc<dyn TraceCollector>,
headers: &HeaderMap,
) -> Result<Option<SpanContext>, ContextError> {
let debug = decoded_header(headers, B3_FLAGS)?
.map(|header| header == "1")
.unwrap_or(false);
let sampled = match debug {
// Debug implies an accept decision
true => true,
false => decoded_header(headers, B3_SAMPLED_HEADER)?
.map(|value| value == "1" || value == "true")
.unwrap_or(false),
};
if !sampled {
return Ok(None);
}
Ok(Some(SpanContext {
trace_id: required_header(headers, B3_TRACE_ID_HEADER)?,
parent_span_id: parsed_header(headers, B3_PARENT_SPAN_ID_HEADER)?,
span_id: required_header(headers, B3_SPAN_ID_HEADER)?,
collector: Some(Arc::clone(collector)),
}))
}
struct JaegerCtx {
trace_id: TraceId,
span_id: SpanId,
parent_span_id: Option<SpanId>,
flags: u8,
}
impl FromStr for JaegerCtx {
type Err = DecodeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
use itertools::Itertools;
// TEMPORARY (#2297)
info!("traced request {}", s);
let (trace_id, span_id, parent_span_id, flags) = s
.split(':')
.collect_tuple()
.ok_or(DecodeError::InvalidJaegerTrace)?;
let trace_id = trace_id.parse()?;
let span_id = span_id.parse()?;
let parent_span_id = match parent_span_id.parse() {
Ok(span_id) => Some(span_id),
Err(DecodeError::ZeroError) => None,
Err(e) => return Err(e),
};
let flags = u8::from_str_radix(flags, 16)?;
Ok(Self {
trace_id,
span_id,
parent_span_id,
flags,
})
}
}
/// Decodes headers in the Jaeger format
fn decode_jaeger(
collector: &Arc<dyn TraceCollector>,
headers: &HeaderMap,
) -> Result<Option<SpanContext>, ContextError> {
let decoded: JaegerCtx = required_header(headers, JAEGER_TRACE_HEADER)?;
if decoded.flags & 0x01 == 0 {
return Ok(None);
}
Ok(Some(SpanContext {
trace_id: decoded.trace_id,
parent_span_id: decoded.parent_span_id,
span_id: decoded.span_id,
collector: Some(Arc::clone(collector)),
}))
}
/// Decodes a given header from the provided HeaderMap to a string
///
/// - Returns Ok(None) if the header doesn't exist
/// - Returns Err if the header fails to decode to a string
/// - Returns Ok(Some(_)) otherwise
fn decoded_header<'a>(
headers: &'a HeaderMap,
header: &'static str,
) -> Result<Option<&'a str>, ContextError> {
headers
.get(header)
.map(|value| {
value
.to_str()
.map_err(|source| ContextError::InvalidUtf8 { header, source })
})
.transpose()
}
/// Decodes and parses a given header from the provided HeaderMap
///
/// - Returns Ok(None) if the header doesn't exist
/// - Returns Err if the header fails to decode to a string or fails to parse
/// - Returns Ok(Some(_)) otherwise
fn parsed_header<T: FromStr<Err = DecodeError>>(
headers: &HeaderMap,
header: &'static str,
) -> Result<Option<T>, ContextError> {
decoded_header(headers, header)?
.map(FromStr::from_str)
.transpose()
.map_err(|source| ContextError::HeaderDecodeError { source, header })
}
/// Decodes and parses a given required header from the provided HeaderMap
///
/// - Returns Err if the header fails to decode to a string, fails to parse, or doesn't exist
/// - Returns Ok(str) otherwise
fn required_header<T: FromStr<Err = DecodeError>>(
headers: &HeaderMap,
header: &'static str,
) -> Result<T, ContextError> {
parsed_header(headers, header)?.ok_or(ContextError::Missing { header })
}
#[cfg(test)]
mod tests {
use http::HeaderValue;
use super::*;
#[test]
fn test_decode_b3() {
let collector: Arc<dyn TraceCollector> = Arc::new(crate::LogTraceCollector::new());
let mut headers = HeaderMap::new();
// No headers should be None
assert!(SpanContext::from_headers(&collector, &headers)
.unwrap()
.is_none());
headers.insert(B3_TRACE_ID_HEADER, HeaderValue::from_static("ee25f"));
headers.insert(B3_SAMPLED_HEADER, HeaderValue::from_static("0"));
// Not sampled
assert!(SpanContext::from_headers(&collector, &headers)
.unwrap()
.is_none());
headers.insert(B3_SAMPLED_HEADER, HeaderValue::from_static("1"));
// Missing required headers
assert_eq!(
SpanContext::from_headers(&collector, &headers)
.unwrap_err()
.to_string(),
"header 'X-B3-SpanId' not found"
);
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("34e"));
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.span_id.0.get(), 0x34e);
assert_eq!(span.trace_id.0.get(), 0xee25f);
assert!(span.parent_span_id.is_none());
headers.insert(
B3_PARENT_SPAN_ID_HEADER,
HeaderValue::from_static("4595945"),
);
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.span_id.0.get(), 0x34e);
assert_eq!(span.trace_id.0.get(), 0xee25f);
assert_eq!(span.parent_span_id.unwrap().0.get(), 0x4595945);
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("not a number"));
assert_eq!(
SpanContext::from_headers(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'X-B3-SpanId': value decode error: invalid digit found in string"
);
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("0"));
assert_eq!(
SpanContext::from_headers(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'X-B3-SpanId': value cannot be 0"
);
}
#[test]
fn test_decode_jaeger() {
let collector: Arc<dyn TraceCollector> = Arc::new(crate::LogTraceCollector::new());
let mut headers = HeaderMap::new();
// Invalid format
headers.insert(JAEGER_TRACE_HEADER, HeaderValue::from_static("invalid"));
assert_eq!(
SpanContext::from_headers(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'uber-trace-id': Expected \"trace-id:span-id:parent-span-id:flags\""
);
// Not sampled
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("343:4325345:0:0"),
);
assert!(SpanContext::from_headers(&collector, &headers)
.unwrap()
.is_none());
// Sampled
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("3a43:432e345:0:1"),
);
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.trace_id.0.get(), 0x3a43);
assert_eq!(span.span_id.0.get(), 0x432e345);
assert!(span.parent_span_id.is_none());
// Parent span
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("343:4325345:3434:F"),
);
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.trace_id.0.get(), 0x343);
assert_eq!(span.span_id.0.get(), 0x4325345);
assert_eq!(span.parent_span_id.unwrap().0.get(), 0x3434);
// Invalid trace id
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("0:4325345:3434:1"),
);
assert_eq!(
SpanContext::from_headers(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'uber-trace-id': value cannot be 0"
);
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("008e813572f53b3a:008e813572f53b3a:0000000000000000:1"),
);
let span = SpanContext::from_headers(&collector, &headers)
.unwrap()
.unwrap();
assert_eq!(span.trace_id.0.get(), 0x008e813572f53b3a);
assert_eq!(span.span_id.0.get(), 0x008e813572f53b3a);
assert!(span.parent_span_id.is_none());
}
}

View File

@ -17,7 +17,6 @@ use crate::span::Span;
pub mod ctx;
pub mod span;
pub mod tower;
/// A TraceCollector is a sink for completed `Span`
pub trait TraceCollector: std::fmt::Debug + Send + Sync {
@ -42,14 +41,14 @@ impl Default for LogTraceCollector {
impl TraceCollector for LogTraceCollector {
fn export(&self, span: Span) {
info!("completed span {}", span.json())
info!("completed span {:?}", span)
}
}
/// A trace collector that maintains a ring buffer of spans
#[derive(Debug)]
pub struct RingBufferTraceCollector {
buffer: Mutex<VecDeque<String>>,
buffer: Mutex<VecDeque<Span>>,
}
impl RingBufferTraceCollector {
@ -59,18 +58,17 @@ impl RingBufferTraceCollector {
}
}
pub fn spans(&self) -> Vec<String> {
pub fn spans(&self) -> Vec<Span> {
self.buffer.lock().iter().cloned().collect()
}
}
impl TraceCollector for RingBufferTraceCollector {
fn export(&self, span: Span) {
let serialized = span.json();
let mut buffer = self.buffer.lock();
if buffer.len() == buffer.capacity() {
buffer.pop_front();
}
buffer.push_back(serialized);
buffer.push_back(span);
}
}

View File

@ -3,13 +3,10 @@ use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use observability_deps::tracing::error;
use crate::ctx::SpanContext;
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[derive(Debug, Copy, Clone)]
pub enum SpanStatus {
Unknown,
Ok,
@ -22,11 +19,10 @@ pub enum SpanStatus {
/// have relationships with other Spans that together comprise a Trace
///
///
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct Span {
pub name: Cow<'static, str>,
//#[serde(flatten)] - https://github.com/serde-rs/json/issues/505
pub ctx: SpanContext,
pub start: Option<DateTime<Utc>>,
@ -56,17 +52,6 @@ impl Span {
self.status = SpanStatus::Err;
}
/// Returns a JSON representation of this `Span`
pub fn json(&self) -> String {
match serde_json::to_string(self) {
Ok(serialized) => serialized,
Err(e) => {
error!(%e, "error serializing span to JSON");
format!("\"Invalid span: {}\"", e)
}
}
}
/// Exports this `Span` to its registered collector if any
pub fn export(mut self) {
if let Some(collector) = self.ctx.collector.take() {
@ -75,7 +60,7 @@ impl Span {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct SpanEvent {
pub time: DateTime<Utc>,
@ -83,8 +68,7 @@ pub struct SpanEvent {
}
/// Values that can be stored in a Span's metadata and events
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
#[derive(Debug, Clone)]
pub enum MetaValue {
String(Cow<'static, str>),
Float(f64),
@ -169,8 +153,6 @@ mod tests {
use std::num::{NonZeroU128, NonZeroU64};
use std::sync::Arc;
use chrono::TimeZone;
use crate::ctx::{SpanId, TraceId};
use crate::{RingBufferTraceCollector, TraceCollector};
@ -197,43 +179,15 @@ mod tests {
fn test_span() {
let collector = Arc::new(RingBufferTraceCollector::new(5));
let mut span = make_span(Arc::<RingBufferTraceCollector>::clone(&collector));
let span = make_span(Arc::<RingBufferTraceCollector>::clone(&collector));
assert_eq!(
span.json(),
r#"{"name":"foo","ctx":{"trace_id":23948923,"parent_span_id":null,"span_id":3498394},"start":null,"end":null,"status":"Unknown","metadata":{},"events":[]}"#
);
span.events.push(SpanEvent {
time: Utc.timestamp_nanos(1000),
msg: "this is a test event".into(),
});
assert_eq!(
span.json(),
r#"{"name":"foo","ctx":{"trace_id":23948923,"parent_span_id":null,"span_id":3498394},"start":null,"end":null,"status":"Unknown","metadata":{},"events":[{"time":"1970-01-01T00:00:00.000001Z","msg":"this is a test event"}]}"#
);
span.metadata.insert("foo".into(), "bar".into());
span.start = Some(Utc.timestamp_nanos(100));
assert_eq!(
span.json(),
r#"{"name":"foo","ctx":{"trace_id":23948923,"parent_span_id":null,"span_id":3498394},"start":"1970-01-01T00:00:00.000000100Z","end":null,"status":"Unknown","metadata":{"foo":"bar"},"events":[{"time":"1970-01-01T00:00:00.000001Z","msg":"this is a test event"}]}"#
);
span.status = SpanStatus::Ok;
span.ctx.parent_span_id = Some(SpanId(NonZeroU64::new(23493).unwrap()));
let expected = r#"{"name":"foo","ctx":{"trace_id":23948923,"parent_span_id":23493,"span_id":3498394},"start":"1970-01-01T00:00:00.000000100Z","end":null,"status":"Ok","metadata":{"foo":"bar"},"events":[{"time":"1970-01-01T00:00:00.000001Z","msg":"this is a test event"}]}"#;
assert_eq!(span.json(), expected);
assert_eq!(collector.spans().len(), 0);
span.export();
// Should publish span
let spans = collector.spans();
assert_eq!(spans.len(), 1);
assert_eq!(spans[0], expected)
}
#[test]
@ -252,7 +206,7 @@ mod tests {
let spans = collector.spans();
assert_eq!(spans.len(), 1);
let span: Span = serde_json::from_str(spans[0].as_str()).unwrap();
let span = &spans[0];
assert!(span.start.is_some());
assert!(span.end.is_some());

21
trace_http/Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "trace_http"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
edition = "2018"
description = "Distributed tracing support for HTTP services"
[dependencies]
trace = { path = "../trace" }
futures = "0.3"
http = "0.2"
http-body = "0.4"
itertools = "0.10"
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11"
pin-project = "1.0"
snafu = "0.6"
tower = "0.4"
[dev-dependencies]

369
trace_http/src/ctx.rs Normal file
View File

@ -0,0 +1,369 @@
use std::num::{NonZeroU128, NonZeroU64, ParseIntError};
use std::str::FromStr;
use std::sync::Arc;
use http::HeaderMap;
use observability_deps::tracing::info;
use snafu::Snafu;
use trace::ctx::{SpanContext, SpanId, TraceId};
use trace::TraceCollector;
const B3_FLAGS: &str = "X-B3-Flags";
const B3_SAMPLED_HEADER: &str = "X-B3-Sampled";
const B3_TRACE_ID_HEADER: &str = "X-B3-TraceId";
const B3_PARENT_SPAN_ID_HEADER: &str = "X-B3-ParentSpanId";
const B3_SPAN_ID_HEADER: &str = "X-B3-SpanId";
const JAEGER_TRACE_HEADER: &str = "uber-trace-id";
/// Error decoding SpanContext from transport representation
#[derive(Debug, Snafu)]
pub enum ContextError {
#[snafu(display("header '{}' not found", header))]
Missing { header: &'static str },
#[snafu(display("header '{}' has non-UTF8 content: {}", header, source))]
InvalidUtf8 {
header: &'static str,
source: http::header::ToStrError,
},
#[snafu(display("error decoding header '{}': {}", header, source))]
HeaderDecodeError {
header: &'static str,
source: DecodeError,
},
}
/// Error decoding a specific header value
#[derive(Debug, Snafu)]
pub enum DecodeError {
#[snafu(display("value decode error: {}", source))]
ValueDecodeError { source: ParseIntError },
#[snafu(display("Expected \"trace-id:span-id:parent-span-id:flags\""))]
InvalidJaegerTrace,
#[snafu(display("value cannot be 0"))]
ZeroError,
}
impl From<ParseIntError> for DecodeError {
// Snafu doesn't allow both no context and a custom message
fn from(source: ParseIntError) -> Self {
Self::ValueDecodeError { source }
}
}
fn parse_trace(s: &str) -> Result<TraceId, DecodeError> {
Ok(TraceId(
NonZeroU128::new(u128::from_str_radix(s, 16)?).ok_or(DecodeError::ZeroError)?,
))
}
fn parse_span(s: &str) -> Result<SpanId, DecodeError> {
Ok(SpanId(
NonZeroU64::new(u64::from_str_radix(s, 16)?).ok_or(DecodeError::ZeroError)?,
))
}
/// Create a SpanContext for the trace described in the request's headers
pub fn parse_span_ctx(
collector: &Arc<dyn TraceCollector>,
headers: &HeaderMap,
) -> Result<Option<SpanContext>, ContextError> {
match ContextCodec::detect(headers) {
None => Ok(None),
Some(ContextCodec::B3) => decode_b3(collector, headers),
Some(ContextCodec::Jaeger) => decode_jaeger(collector, headers),
}
}
/// The codec used to encode trace context
enum ContextCodec {
/// <https://github.com/openzipkin/b3-propagation#multiple-headers>
B3,
/// <https://www.jaegertracing.io/docs/1.21/client-libraries/#propagation-format>
Jaeger,
}
impl ContextCodec {
fn detect(headers: &HeaderMap) -> Option<Self> {
if headers.contains_key(JAEGER_TRACE_HEADER) {
Some(Self::Jaeger)
} else if headers.contains_key(B3_TRACE_ID_HEADER) {
Some(Self::B3)
} else {
None
}
}
}
/// Decodes headers in the B3 format
fn decode_b3(
collector: &Arc<dyn TraceCollector>,
headers: &HeaderMap,
) -> Result<Option<SpanContext>, ContextError> {
let debug = decoded_header(headers, B3_FLAGS)?
.map(|header| header == "1")
.unwrap_or(false);
let sampled = match debug {
// Debug implies an accept decision
true => true,
false => decoded_header(headers, B3_SAMPLED_HEADER)?
.map(|value| value == "1" || value == "true")
.unwrap_or(false),
};
if !sampled {
return Ok(None);
}
Ok(Some(SpanContext {
trace_id: required_header(headers, B3_TRACE_ID_HEADER, parse_trace)?,
parent_span_id: parsed_header(headers, B3_PARENT_SPAN_ID_HEADER, parse_span)?,
span_id: required_header(headers, B3_SPAN_ID_HEADER, parse_span)?,
collector: Some(Arc::clone(collector)),
}))
}
struct JaegerCtx {
trace_id: TraceId,
span_id: SpanId,
parent_span_id: Option<SpanId>,
flags: u8,
}
impl FromStr for JaegerCtx {
type Err = DecodeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
use itertools::Itertools;
// TEMPORARY (#2297)
info!("traced request {}", s);
let (trace_id, span_id, parent_span_id, flags) = s
.split(':')
.collect_tuple()
.ok_or(DecodeError::InvalidJaegerTrace)?;
let trace_id = parse_trace(trace_id)?;
let span_id = parse_span(span_id)?;
let parent_span_id = match parse_span(parent_span_id) {
Ok(span_id) => Some(span_id),
Err(DecodeError::ZeroError) => None,
Err(e) => return Err(e),
};
let flags = u8::from_str_radix(flags, 16)?;
Ok(Self {
trace_id,
span_id,
parent_span_id,
flags,
})
}
}
/// Decodes headers in the Jaeger format
fn decode_jaeger(
collector: &Arc<dyn TraceCollector>,
headers: &HeaderMap,
) -> Result<Option<SpanContext>, ContextError> {
let decoded: JaegerCtx = required_header(headers, JAEGER_TRACE_HEADER, FromStr::from_str)?;
if decoded.flags & 0x01 == 0 {
return Ok(None);
}
Ok(Some(SpanContext {
trace_id: decoded.trace_id,
parent_span_id: decoded.parent_span_id,
span_id: decoded.span_id,
collector: Some(Arc::clone(collector)),
}))
}
/// Decodes a given header from the provided HeaderMap to a string
///
/// - Returns Ok(None) if the header doesn't exist
/// - Returns Err if the header fails to decode to a string
/// - Returns Ok(Some(_)) otherwise
fn decoded_header<'a>(
headers: &'a HeaderMap,
header: &'static str,
) -> Result<Option<&'a str>, ContextError> {
headers
.get(header)
.map(|value| {
value
.to_str()
.map_err(|source| ContextError::InvalidUtf8 { header, source })
})
.transpose()
}
/// Decodes and parses a given header from the provided HeaderMap
///
/// - Returns Ok(None) if the header doesn't exist
/// - Returns Err if the header fails to decode to a string or fails to parse
/// - Returns Ok(Some(_)) otherwise
fn parsed_header<T, F: FnOnce(&str) -> Result<T, DecodeError>>(
headers: &HeaderMap,
header: &'static str,
parse: F,
) -> Result<Option<T>, ContextError> {
decoded_header(headers, header)?
.map(parse)
.transpose()
.map_err(|source| ContextError::HeaderDecodeError { source, header })
}
/// Decodes and parses a given required header from the provided HeaderMap
///
/// - Returns Err if the header fails to decode to a string, fails to parse, or doesn't exist
/// - Returns Ok(str) otherwise
fn required_header<T, F: FnOnce(&str) -> Result<T, DecodeError>>(
headers: &HeaderMap,
header: &'static str,
parse: F,
) -> Result<T, ContextError> {
parsed_header(headers, header, parse)?.ok_or(ContextError::Missing { header })
}
#[cfg(test)]
mod tests {
use http::HeaderValue;
use super::*;
#[test]
fn test_decode_b3() {
let collector: Arc<dyn TraceCollector> = Arc::new(trace::LogTraceCollector::new());
let mut headers = HeaderMap::new();
// No headers should be None
assert!(parse_span_ctx(&collector, &headers).unwrap().is_none());
headers.insert(B3_TRACE_ID_HEADER, HeaderValue::from_static("ee25f"));
headers.insert(B3_SAMPLED_HEADER, HeaderValue::from_static("0"));
// Not sampled
assert!(parse_span_ctx(&collector, &headers).unwrap().is_none());
headers.insert(B3_SAMPLED_HEADER, HeaderValue::from_static("1"));
// Missing required headers
assert_eq!(
parse_span_ctx(&collector, &headers)
.unwrap_err()
.to_string(),
"header 'X-B3-SpanId' not found"
);
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("34e"));
let span = parse_span_ctx(&collector, &headers).unwrap().unwrap();
assert_eq!(span.span_id.0.get(), 0x34e);
assert_eq!(span.trace_id.0.get(), 0xee25f);
assert!(span.parent_span_id.is_none());
headers.insert(
B3_PARENT_SPAN_ID_HEADER,
HeaderValue::from_static("4595945"),
);
let span = parse_span_ctx(&collector, &headers).unwrap().unwrap();
assert_eq!(span.span_id.0.get(), 0x34e);
assert_eq!(span.trace_id.0.get(), 0xee25f);
assert_eq!(span.parent_span_id.unwrap().0.get(), 0x4595945);
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("not a number"));
assert_eq!(
parse_span_ctx(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'X-B3-SpanId': value decode error: invalid digit found in string"
);
headers.insert(B3_SPAN_ID_HEADER, HeaderValue::from_static("0"));
assert_eq!(
parse_span_ctx(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'X-B3-SpanId': value cannot be 0"
);
}
#[test]
fn test_decode_jaeger() {
let collector: Arc<dyn TraceCollector> = Arc::new(trace::LogTraceCollector::new());
let mut headers = HeaderMap::new();
// Invalid format
headers.insert(JAEGER_TRACE_HEADER, HeaderValue::from_static("invalid"));
assert_eq!(
parse_span_ctx(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'uber-trace-id': Expected \"trace-id:span-id:parent-span-id:flags\""
);
// Not sampled
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("343:4325345:0:0"),
);
assert!(parse_span_ctx(&collector, &headers).unwrap().is_none());
// Sampled
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("3a43:432e345:0:1"),
);
let span = parse_span_ctx(&collector, &headers).unwrap().unwrap();
assert_eq!(span.trace_id.0.get(), 0x3a43);
assert_eq!(span.span_id.0.get(), 0x432e345);
assert!(span.parent_span_id.is_none());
// Parent span
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("343:4325345:3434:F"),
);
let span = parse_span_ctx(&collector, &headers).unwrap().unwrap();
assert_eq!(span.trace_id.0.get(), 0x343);
assert_eq!(span.span_id.0.get(), 0x4325345);
assert_eq!(span.parent_span_id.unwrap().0.get(), 0x3434);
// Invalid trace id
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("0:4325345:3434:1"),
);
assert_eq!(
parse_span_ctx(&collector, &headers)
.unwrap_err()
.to_string(),
"error decoding header 'uber-trace-id': value cannot be 0"
);
headers.insert(
JAEGER_TRACE_HEADER,
HeaderValue::from_static("008e813572f53b3a:008e813572f53b3a:0000000000000000:1"),
);
let span = parse_span_ctx(&collector, &headers).unwrap().unwrap();
assert_eq!(span.trace_id.0.get(), 0x008e813572f53b3a);
assert_eq!(span.span_id.0.get(), 0x008e813572f53b3a);
assert!(span.parent_span_id.is_none());
}
}

11
trace_http/src/lib.rs Normal file
View File

@ -0,0 +1,11 @@
#![deny(broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self,
clippy::clone_on_ref_ptr,
clippy::future_not_send
)]
pub mod ctx;
pub mod tower;

View File

@ -12,7 +12,7 @@
//! - This Body contains the data payload (potentially streamed)
//!
use crate::{ctx::SpanContext, span::EnteredSpan, TraceCollector};
use crate::ctx::parse_span_ctx;
use futures::ready;
use http::{Request, Response};
use http_body::SizeHint;
@ -23,6 +23,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tower::{Layer, Service};
use trace::{span::EnteredSpan, TraceCollector};
/// `TraceLayer` implements `tower::Layer` and can be used to decorate a
/// `tower::Service` to collect information about requests flowing through it
@ -79,7 +80,7 @@ where
}
};
let span = match SpanContext::from_headers(collector, request.headers()) {
let span = match parse_span_ctx(collector, request.headers()) {
Ok(Some(ctx)) => {
let span = ctx.child("IOx");