From 1d6a8703afc6f59f7f9e9f055fb18676712578c2 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 18 Aug 2021 09:05:21 +0100 Subject: [PATCH] feat: support sinking traces to an OTEL SpanExporter (#2319) * feat: support sinking traces to an OTEL SpanExporter * chore: consistent logging Co-authored-by: Andrew Lamb * chore: review feedback Co-authored-by: Andrew Lamb Co-authored-by: Andrew Lamb Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- Cargo.lock | 3 + Cargo.toml | 5 +- src/influxdb_ioxd.rs | 73 +++++++-- trace/Cargo.toml | 3 + trace/src/ctx.rs | 28 +++- trace/src/lib.rs | 7 +- trace/src/otel.rs | 369 +++++++++++++++++++++++++++++++++++++++++++ trace/src/span.rs | 101 ++++++------ trace/src/tower.rs | 4 +- 9 files changed, 525 insertions(+), 68 deletions(-) create mode 100644 trace/src/otel.rs diff --git a/Cargo.lock b/Cargo.lock index c309776eba..8c2d501b3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4710,6 +4710,7 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" name = "trace" version = "0.1.0" dependencies = [ + "async-trait", "chrono", "futures", "http", @@ -4722,6 +4723,8 @@ dependencies = [ "serde", "serde_json", "snafu", + "tokio", + "tokio-util", "tower", ] diff --git a/Cargo.toml b/Cargo.toml index ba2e708cb2..7647173251 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,7 @@ members = [ "server", "server_benchmarks", "test_helpers", + "trace", "tracker", "trogging", "grpc-router", @@ -60,7 +61,7 @@ datafusion = { path = "datafusion" } data_types = { path = "data_types" } entry = { path = "entry" } generated_types = { path = "generated_types" } -heappy = { git = "https://github.com/mkmik/heappy", rev = "aed37ab50a70c3f0a7a8bd1c51d28d3d8050461f", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true} +heappy = { git = "https://github.com/mkmik/heappy", rev = "aed37ab50a70c3f0a7a8bd1c51d28d3d8050461f", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true } influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] } influxdb_line_protocol = { path = "influxdb_line_protocol" } @@ -114,7 +115,7 @@ snafu = "0.6.9" structopt = "0.3.21" thiserror = "1.0.23" # remember to put "unprefixed_malloc_on_supported_platforms" if you disable heappy -tikv-jemallocator = {version = "0.4.0" } +tikv-jemallocator = { version = "0.4.0" } tikv-jemalloc-ctl = "0.4.0" tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] } tokio-stream = { version = "0.1.2", features = ["net"] } diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index d122a975dd..999589d6de 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -323,8 +323,10 @@ mod tests { use super::*; use ::http::{header::HeaderName, HeaderValue}; use data_types::{database_rules::DatabaseRules, DatabaseName}; + use influxdb_iox_client::connection::Connection; use std::convert::TryInto; use structopt::StructOpt; + use trace::otel::{OtelExporter, TestOtelExporter}; use trace::RingBufferTraceCollector; fn test_config(server_id: Option) -> Config { @@ -513,6 +515,17 @@ mod tests { ) } + async fn jaeger_client(addr: SocketAddr, trace: &'static str) -> Connection { + influxdb_iox_client::connection::Builder::default() + .header( + HeaderName::from_static("uber-trace-id"), + HeaderValue::from_static(trace), + ) + .build(format!("http://{}", addr)) + .await + .unwrap() + } + #[tokio::test] async fn test_tracing() { let config = test_config(Some(23)); @@ -571,16 +584,8 @@ mod tests { b3_tracing_client.list_databases().await.unwrap(); b3_tracing_client.get_server_status().await.unwrap(); - let jaeger_tracing_client = influxdb_iox_client::connection::Builder::default() - .header( - HeaderName::from_static("uber-trace-id"), - HeaderValue::from_static("34f9495:30e34:0:1"), - ) - .build(format!("http://{}", addr)) - .await - .unwrap(); - - influxdb_iox_client::management::Client::new(jaeger_tracing_client) + let conn = jaeger_client(addr, "34f9495:30e34:0:1").await; + influxdb_iox_client::management::Client::new(conn) .list_databases() .await .unwrap(); @@ -588,7 +593,7 @@ mod tests { let spans = trace_collector.spans(); assert_eq!(spans.len(), 3); - let spans: Vec> = spans + let spans: Vec = spans .iter() .map(|x| serde_json::from_str(x.as_str()).unwrap()) .collect(); @@ -615,4 +620,50 @@ mod tests { server.shutdown(); join.await.unwrap().unwrap(); } + + #[tokio::test] + async fn test_otel_exporter() { + let config = test_config(Some(23)); + let application = make_application(&config).await.unwrap(); + let server = make_server(Arc::clone(&application), &config); + server.wait_for_init().await.unwrap(); + + let (sender, mut receiver) = tokio::sync::mpsc::channel(20); + + let collector = Arc::new(trace::otel::OtelExporter::new(TestOtelExporter::new( + sender, + ))); + + let grpc_listener = grpc_listener(config.grpc_bind_address).await.unwrap(); + let http_listener = http_listener(config.grpc_bind_address).await.unwrap(); + + let addr = grpc_listener.local_addr().unwrap(); + + let fut = serve( + config, + application, + grpc_listener, + http_listener, + Arc::::clone(&collector), + Arc::clone(&server), + ); + + let join = tokio::spawn(fut); + + let conn = jaeger_client(addr, "34f8495:30e34:0:1").await; + influxdb_iox_client::management::Client::new(conn) + .list_databases() + .await + .unwrap(); + + collector.shutdown(); + collector.join().await.unwrap(); + + server.shutdown(); + join.await.unwrap().unwrap(); + + let span = receiver.recv().await.unwrap(); + assert_eq!(span.span_context.trace_id().to_u128(), 0x34f8495); + assert_eq!(span.parent_span_id.to_u64(), 0x30e34); + } } diff --git a/trace/Cargo.toml b/trace/Cargo.toml index ca8d8d0a32..6fd4ab092a 100644 --- a/trace/Cargo.toml +++ b/trace/Cargo.toml @@ -7,6 +7,7 @@ description = "Distributed tracing support within IOx" [dependencies] +async-trait = "0.1" chrono = { version = "0.4", features = ["serde"] } futures = "0.3" http = "0.2" @@ -20,5 +21,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" snafu = "0.6" tower = "0.4" +tokio = { version = "1.0", features = ["macros", "time", "sync"] } +tokio-util = { version = "0.6.3" } [dev-dependencies] diff --git a/trace/src/ctx.rs b/trace/src/ctx.rs index 56e1314412..4f18abafec 100644 --- a/trace/src/ctx.rs +++ b/trace/src/ctx.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::num::{NonZeroU128, NonZeroU64, ParseIntError}; use std::str::FromStr; use std::sync::Arc; @@ -8,8 +9,8 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use snafu::Snafu; -use crate::ctx::ContextCodec::{Jaeger, B3}; use crate::{ + ctx::ContextCodec::{Jaeger, B3}, span::{Span, SpanStatus}, TraceCollector, }; @@ -64,6 +65,16 @@ impl From for DecodeError { #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub struct TraceId(pub NonZeroU128); +impl TraceId { + pub fn new(val: u128) -> Option { + Some(Self(NonZeroU128::new(val)?)) + } + + pub fn get(self) -> u128 { + self.0.get() + } +} + impl<'a> FromStr for TraceId { type Err = DecodeError; @@ -78,10 +89,18 @@ impl<'a> FromStr for TraceId { pub struct SpanId(pub NonZeroU64); impl SpanId { + pub fn new(val: u64) -> Option { + Some(Self(NonZeroU64::new(val)?)) + } + pub fn gen() -> Self { // Should this be a UUID? Self(rand::thread_rng().gen()) } + + pub fn get(self) -> u64 { + self.0.get() + } } impl<'a> FromStr for SpanId { @@ -111,9 +130,9 @@ pub struct SpanContext { impl SpanContext { /// Creates a new child of the Span described by this TraceContext - pub fn child<'a>(&self, name: &'a str) -> Span<'a> { + pub fn child(&self, name: impl Into>) -> Span { Span { - name, + name: name.into(), ctx: Self { trace_id: self.trace_id, span_id: SpanId::gen(), @@ -296,9 +315,10 @@ fn required_header>( #[cfg(test)] mod tests { - use super::*; use http::HeaderValue; + use super::*; + #[test] fn test_decode_b3() { let collector: Arc = Arc::new(crate::LogTraceCollector::new()); diff --git a/trace/src/lib.rs b/trace/src/lib.rs index d46ec1a878..0eee25616f 100644 --- a/trace/src/lib.rs +++ b/trace/src/lib.rs @@ -16,12 +16,13 @@ use observability_deps::tracing::info; use crate::span::Span; pub mod ctx; +pub mod otel; pub mod span; pub mod tower; /// A TraceCollector is a sink for completed `Span` pub trait TraceCollector: std::fmt::Debug + Send + Sync { - fn export(&self, span: &span::Span<'_>); + fn export(&self, span: Span); } /// A basic trace collector that prints to stdout @@ -41,7 +42,7 @@ impl Default for LogTraceCollector { } impl TraceCollector for LogTraceCollector { - fn export(&self, span: &Span<'_>) { + fn export(&self, span: Span) { info!("completed span {}", span.json()) } } @@ -65,7 +66,7 @@ impl RingBufferTraceCollector { } impl TraceCollector for RingBufferTraceCollector { - fn export(&self, span: &Span<'_>) { + fn export(&self, span: Span) { let serialized = span.json(); let mut buffer = self.buffer.lock(); if buffer.len() == buffer.capacity() { diff --git a/trace/src/otel.rs b/trace/src/otel.rs new file mode 100644 index 0000000000..6ee3e9e727 --- /dev/null +++ b/trace/src/otel.rs @@ -0,0 +1,369 @@ +use std::borrow::Cow; +use std::future::Future; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{ + future::{BoxFuture, Shared}, + FutureExt, TryFutureExt, +}; +use tokio::sync::mpsc; +use tokio::task::JoinError; +use tokio_util::sync::CancellationToken; + +use observability_deps::{ + opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter}, + tracing::{error, info, warn}, +}; + +use crate::ctx::{SpanContext, SpanId, TraceId}; +use crate::span::{MetaValue, SpanEvent, SpanStatus}; +use crate::{span::Span, TraceCollector}; + +/// Size of the exporter buffer +const CHANNEL_SIZE: usize = 1000; + +/// Maximum number of events that can be associated with a span +const MAX_EVENTS: u32 = 10; + +/// Maximum number of attributes that can be associated with a span +const MAX_ATTRIBUTES: u32 = 100; + +/// `OtelExporter` wraps a opentelemetry SpanExporter and sinks spans to it +/// +/// In order to do this it spawns a background worker that pulls messages +/// of a queue and writes them to opentelemetry. If this worker cannot keep +/// up, and this queue fills up, spans will be dropped and warnings logged +#[derive(Debug)] +pub struct OtelExporter { + join: Shared>>>, + + sender: tokio::sync::mpsc::Sender, + + shutdown: CancellationToken, +} + +impl OtelExporter { + /// Creates a new `OtelExporter` + pub fn new(exporter: T) -> Self { + let shutdown = CancellationToken::new(); + let (sender, receiver) = mpsc::channel(CHANNEL_SIZE); + + let handle = tokio::spawn(background_worker(shutdown.clone(), exporter, receiver)); + let join = handle.map_err(Arc::new).boxed().shared(); + + Self { + join, + shutdown, + sender, + } + } + + /// Triggers shutdown of this `OtelExporter` + pub fn shutdown(&self) { + info!("otel exporter shutting down"); + self.shutdown.cancel() + } + + /// Waits for the background worker of OtelExporter to finish + pub fn join(&self) -> impl Future>> { + self.join.clone() + } +} + +impl TraceCollector for OtelExporter { + fn export(&self, span: Span) { + use mpsc::error::TrySendError; + + match self.sender.try_send(span.into()) { + Ok(_) => { + //TODO: Increment some metric + } + Err(TrySendError::Full(_)) => { + warn!("exporter cannot keep up, dropping spans") + } + Err(TrySendError::Closed(_)) => { + warn!("background worker shutdown") + } + } + } +} + +async fn background_worker( + shutdown: CancellationToken, + exporter: T, + receiver: mpsc::Receiver, +) { + tokio::select! { + _ = exporter_loop(exporter, receiver) => { + // Don't expect this future to complete + error!("otel exporter loop completed") + } + _ = shutdown.cancelled() => {} + } + info!("otel exporter shut down") +} + +/// An opentelemetry::SpanExporter that sinks writes to a tokio mpsc channel. +/// +/// Intended for testing ONLY +/// +/// Note: There is a similar construct in opentelemetry behind the testing feature +/// flag, but enabling this brings in a large number of additional dependencies and +/// so we just implement our own version +#[derive(Debug)] +pub struct TestOtelExporter { + channel: mpsc::Sender, +} + +impl TestOtelExporter { + pub fn new(channel: mpsc::Sender) -> Self { + Self { channel } + } +} + +#[async_trait] +impl SpanExporter for TestOtelExporter { + async fn export(&mut self, batch: Vec) -> ExportResult { + for span in batch { + self.channel.send(span).await.expect("channel closed") + } + Ok(()) + } +} + +async fn exporter_loop( + mut exporter: T, + mut receiver: tokio::sync::mpsc::Receiver, +) { + while let Some(span) = receiver.recv().await { + // TODO: Batch export spans + if let Err(e) = exporter.export(vec![span]).await { + error!(%e, "error exporting span") + } + } + + warn!("sender-side of jaeger exporter dropped without waiting for shut down") +} + +impl From for SpanData { + fn from(span: Span) -> Self { + use observability_deps::opentelemetry::sdk::trace::{EvictedHashMap, EvictedQueue}; + use observability_deps::opentelemetry::sdk::InstrumentationLibrary; + use observability_deps::opentelemetry::trace::{SpanId, SpanKind}; + use observability_deps::opentelemetry::{Key, KeyValue}; + + let parent_span_id = match span.ctx.parent_span_id { + Some(id) => id.into(), + None => SpanId::invalid(), + }; + + let mut ret = Self { + span_context: (&span.ctx).into(), + parent_span_id, + span_kind: SpanKind::Server, + name: span.name, + start_time: span.start.map(Into::into).unwrap_or(std::time::UNIX_EPOCH), + end_time: span.end.map(Into::into).unwrap_or(std::time::UNIX_EPOCH), + attributes: EvictedHashMap::new(MAX_ATTRIBUTES, 0), + events: EvictedQueue::new(MAX_EVENTS), + links: EvictedQueue::new(0), + status_code: span.status.into(), + status_message: Default::default(), + resource: None, + instrumentation_lib: InstrumentationLibrary::new("iox-trace", None), + }; + + ret.events.extend(span.events.into_iter().map(Into::into)); + for (key, value) in span.metadata { + let key = match key { + Cow::Owned(key) => Key::new(key), + Cow::Borrowed(key) => Key::new(key), + }; + + ret.attributes.insert(KeyValue::new(key, value)) + } + ret + } +} + +impl<'a> From<&'a SpanContext> for observability_deps::opentelemetry::trace::SpanContext { + fn from(ctx: &'a SpanContext) -> Self { + Self::new( + ctx.trace_id.into(), + ctx.span_id.into(), + Default::default(), + false, + Default::default(), + ) + } +} + +impl From for observability_deps::opentelemetry::trace::Event { + fn from(event: SpanEvent) -> Self { + Self { + name: event.msg, + timestamp: event.time.into(), + attributes: vec![], + dropped_attributes_count: 0, + } + } +} + +impl From for observability_deps::opentelemetry::trace::StatusCode { + fn from(status: SpanStatus) -> Self { + match status { + SpanStatus::Unknown => Self::Unset, + SpanStatus::Ok => Self::Ok, + SpanStatus::Err => Self::Error, + } + } +} + +impl From for observability_deps::opentelemetry::trace::SpanId { + fn from(id: SpanId) -> Self { + Self::from_u64(id.0.get()) + } +} + +impl From for observability_deps::opentelemetry::trace::TraceId { + fn from(id: TraceId) -> Self { + Self::from_u128(id.0.get()) + } +} + +impl From for observability_deps::opentelemetry::Value { + fn from(v: MetaValue) -> Self { + match v { + MetaValue::String(v) => Self::String(v), + MetaValue::Float(v) => Self::F64(v), + MetaValue::Int(v) => Self::I64(v), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{TimeZone, Utc}; + use observability_deps::opentelemetry::{Key, Value}; + use std::time::{Duration, UNIX_EPOCH}; + + #[test] + fn test_conversion() { + let root = SpanContext { + trace_id: TraceId::new(232345).unwrap(), + parent_span_id: Some(SpanId::new(2484).unwrap()), + span_id: SpanId::new(2343).unwrap(), + collector: None, + }; + + let mut span = root.child("foo"); + span.metadata.insert("string".into(), "bar".into()); + span.metadata.insert("float".into(), 3.32.into()); + span.metadata.insert("int".into(), 5.into()); + + span.events.push(SpanEvent { + time: Utc.timestamp_nanos(1230), + msg: "event".into(), + }); + span.status = SpanStatus::Ok; + + span.start = Some(Utc.timestamp_nanos(1000)); + span.end = Some(Utc.timestamp_nanos(2000)); + + let span_data: SpanData = span.clone().into(); + + assert_eq!( + span_data.span_context.span_id().to_u64(), + span.ctx.span_id.get() + ); + assert_eq!( + span_data.span_context.trace_id().to_u128(), + span.ctx.trace_id.get() + ); + assert_eq!( + span_data.parent_span_id.to_u64(), + span.ctx.parent_span_id.unwrap().get() + ); + assert_eq!( + span_data.start_time, + UNIX_EPOCH + Duration::from_nanos(1000) + ); + assert_eq!(span_data.end_time, UNIX_EPOCH + Duration::from_nanos(2000)); + + let events: Vec<_> = span_data.events.iter().collect(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].name.as_ref(), "event"); + assert_eq!(events[0].timestamp, UNIX_EPOCH + Duration::from_nanos(1230)); + assert_eq!(events[0].attributes.len(), 0); + + assert_eq!( + span_data + .attributes + .get(&Key::from_static_str("string")) + .unwrap() + .clone(), + Value::String("bar".into()) + ); + assert_eq!( + span_data + .attributes + .get(&Key::from_static_str("float")) + .unwrap() + .clone(), + Value::F64(3.32) + ); + assert_eq!( + span_data + .attributes + .get(&Key::from_static_str("int")) + .unwrap() + .clone(), + Value::I64(5) + ); + } + + #[tokio::test] + async fn test_exporter() { + let (sender, mut receiver) = mpsc::channel(10); + let exporter = OtelExporter::new(TestOtelExporter::new(sender)); + + assert!(exporter.join().now_or_never().is_none()); + + let root = SpanContext { + trace_id: TraceId::new(232345).unwrap(), + parent_span_id: None, + span_id: SpanId::new(2343).unwrap(), + collector: None, + }; + let s1 = root.child("foo"); + let s2 = root.child("bar"); + + exporter.export(s1.clone()); + exporter.export(s2.clone()); + exporter.export(s2.clone()); + + let r1 = receiver.recv().await.unwrap(); + let r2 = receiver.recv().await.unwrap(); + let r3 = receiver.recv().await.unwrap(); + + exporter.shutdown(); + exporter.join().await.unwrap(); + + // Should not be fatal despite exporter having been shutdown + exporter.export(s2.clone()); + + assert_eq!(root.span_id.get(), r1.parent_span_id.to_u64()); + assert_eq!(s1.ctx.span_id.get(), r1.span_context.span_id().to_u64()); + assert_eq!(s1.ctx.trace_id.get(), r1.span_context.trace_id().to_u128()); + + assert_eq!(root.span_id.get(), r2.parent_span_id.to_u64()); + assert_eq!(s2.ctx.span_id.get(), r2.span_context.span_id().to_u64()); + assert_eq!(s2.ctx.trace_id.get(), r2.span_context.trace_id().to_u128()); + + assert_eq!(root.span_id.get(), r3.parent_span_id.to_u64()); + assert_eq!(s2.ctx.span_id.get(), r3.span_context.span_id().to_u64()); + assert_eq!(s2.ctx.trace_id.get(), r3.span_context.trace_id().to_u128()); + } +} diff --git a/trace/src/span.rs b/trace/src/span.rs index bde4fc4154..089ef69e3e 100644 --- a/trace/src/span.rs +++ b/trace/src/span.rs @@ -1,3 +1,4 @@ +use std::borrow::Cow; use std::collections::HashMap; use std::ops::{Deref, DerefMut}; @@ -20,11 +21,10 @@ pub enum SpanStatus { /// A `Span` has a name, metadata, a start and end time and a unique ID. Additionally they /// have relationships with other Spans that together comprise a Trace /// -/// On Drop a `Span` is published to the registered collector /// -#[derive(Debug, Serialize, Deserialize)] -pub struct Span<'a> { - pub name: &'a str, +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Span { + pub name: Cow<'static, str>, //#[serde(flatten)] - https://github.com/serde-rs/json/issues/505 pub ctx: SpanContext, @@ -35,15 +35,14 @@ pub struct Span<'a> { pub status: SpanStatus, - #[serde(borrow)] - pub metadata: HashMap<&'a str, MetaValue<'a>>, + pub metadata: HashMap, MetaValue>, - #[serde(borrow)] - pub events: Vec>, + pub events: Vec, } -impl<'a> Span<'a> { - pub fn event(&mut self, meta: impl Into>) { +impl Span { + /// Record an event on this `Span` + pub fn event(&mut self, meta: impl Into>) { let event = SpanEvent { time: Utc::now(), msg: meta.into(), @@ -51,11 +50,13 @@ impl<'a> Span<'a> { self.events.push(event) } - pub fn error(&mut self, meta: impl Into>) { + /// Record an error on this `Span` + pub fn error(&mut self, meta: impl Into>) { self.event(meta); self.status = SpanStatus::Err; } + /// Returns a JSON representation of this `Span` pub fn json(&self) -> String { match serde_json::to_string(self) { Ok(serialized) => serialized, @@ -65,46 +66,50 @@ impl<'a> Span<'a> { } } } -} -impl<'a> Drop for Span<'a> { - fn drop(&mut self) { - if let Some(collector) = &self.ctx.collector { + /// Exports this `Span` to its registered collector if any + pub fn export(mut self) { + if let Some(collector) = self.ctx.collector.take() { collector.export(self) } } } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SpanEvent<'a> { +pub struct SpanEvent { pub time: DateTime, - #[serde(borrow)] - pub msg: MetaValue<'a>, + pub msg: Cow<'static, str>, } /// Values that can be stored in a Span's metadata and events -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(untagged)] -pub enum MetaValue<'a> { - String(&'a str), +pub enum MetaValue { + String(Cow<'static, str>), Float(f64), Int(i64), } -impl<'a> From<&'a str> for MetaValue<'a> { - fn from(v: &'a str) -> Self { - Self::String(v) +impl From<&'static str> for MetaValue { + fn from(v: &'static str) -> Self { + Self::String(Cow::Borrowed(v)) } } -impl<'a> From for MetaValue<'a> { +impl From for MetaValue { + fn from(v: String) -> Self { + Self::String(Cow::Owned(v)) + } +} + +impl From for MetaValue { fn from(v: f64) -> Self { Self::Float(v) } } -impl<'a> From for MetaValue<'a> { +impl From for MetaValue { fn from(v: i64) -> Self { Self::Int(v) } @@ -112,46 +117,50 @@ impl<'a> From for MetaValue<'a> { /// Updates the start and end times on the provided Span #[derive(Debug)] -pub struct EnteredSpan<'a> { - span: Span<'a>, +pub struct EnteredSpan { + /// Option so we can take out of it on drop / publish + span: Option, } -impl<'a> Deref for EnteredSpan<'a> { - type Target = Span<'a>; +impl<'a> Deref for EnteredSpan { + type Target = Span; fn deref(&self) -> &Self::Target { - &self.span + self.span.as_ref().expect("dropped") } } -impl<'a> DerefMut for EnteredSpan<'a> { +impl<'a> DerefMut for EnteredSpan { fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.span + self.span.as_mut().expect("dropped") } } -impl<'a> EnteredSpan<'a> { - pub fn new(mut span: Span<'a>) -> Self { +impl<'a> EnteredSpan { + pub fn new(mut span: Span) -> Self { span.start = Some(Utc::now()); - Self { span } + Self { span: Some(span) } } } -impl<'a> Drop for EnteredSpan<'a> { +impl<'a> Drop for EnteredSpan { fn drop(&mut self) { let now = Utc::now(); - // SystemTime is not monotonic so must also check min + let mut span = self.span.take().expect("dropped"); - self.span.start = Some(match self.span.start { + // SystemTime is not monotonic so must also check min + span.start = Some(match span.start { Some(a) => a.min(now), None => now, }); - self.span.end = Some(match self.span.end { + span.end = Some(match span.end { Some(a) => a.max(now), None => now, }); + + span.export() } } @@ -167,9 +176,9 @@ mod tests { use super::*; - fn make_span(collector: Arc) -> Span<'static> { + fn make_span(collector: Arc) -> Span { Span { - name: "foo", + name: "foo".into(), ctx: SpanContext { trace_id: TraceId(NonZeroU128::new(23948923).unwrap()), parent_span_id: None, @@ -197,7 +206,7 @@ mod tests { span.events.push(SpanEvent { time: Utc.timestamp_nanos(1000), - msg: MetaValue::String("this is a test event"), + msg: "this is a test event".into(), }); assert_eq!( @@ -205,7 +214,7 @@ mod tests { r#"{"name":"foo","ctx":{"trace_id":23948923,"parent_span_id":null,"span_id":3498394},"start":null,"end":null,"status":"Unknown","metadata":{},"events":[{"time":"1970-01-01T00:00:00.000001Z","msg":"this is a test event"}]}"# ); - span.metadata.insert("foo", MetaValue::String("bar")); + span.metadata.insert("foo".into(), "bar".into()); span.start = Some(Utc.timestamp_nanos(100)); assert_eq!( @@ -219,7 +228,7 @@ mod tests { let expected = r#"{"name":"foo","ctx":{"trace_id":23948923,"parent_span_id":23493,"span_id":3498394},"start":"1970-01-01T00:00:00.000000100Z","end":null,"status":"Ok","metadata":{"foo":"bar"},"events":[{"time":"1970-01-01T00:00:00.000001Z","msg":"this is a test event"}]}"#; assert_eq!(span.json(), expected); - std::mem::drop(span); + span.export(); // Should publish span let spans = collector.spans(); @@ -243,7 +252,7 @@ mod tests { let spans = collector.spans(); assert_eq!(spans.len(), 1); - let span: Span<'_> = serde_json::from_str(spans[0].as_str()).unwrap(); + let span: Span = serde_json::from_str(spans[0].as_str()).unwrap(); assert!(span.start.is_some()); assert!(span.end.is_some()); diff --git a/trace/src/tower.rs b/trace/src/tower.rs index 313f173735..2e0581242f 100644 --- a/trace/src/tower.rs +++ b/trace/src/tower.rs @@ -98,7 +98,7 @@ where #[pin_project] #[derive(Debug)] pub struct TracedFuture { - span: Option>, + span: Option, #[pin] inner: F, } @@ -133,7 +133,7 @@ where #[pin_project] #[derive(Debug)] pub struct TracedBody { - span: Option>, + span: Option, #[pin] inner: B, }