feat: support sinking traces to an OTEL SpanExporter (#2319)

* feat: support sinking traces to an OTEL SpanExporter

* chore: consistent logging

Co-authored-by: Andrew Lamb <alamb@influxdata.com>

* chore: review feedback

Co-authored-by: Andrew Lamb <alamb@influxdata.com>

Co-authored-by: Andrew Lamb <alamb@influxdata.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-08-18 09:05:21 +01:00 committed by GitHub
parent f93be2bae4
commit 1d6a8703af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 525 additions and 68 deletions

3
Cargo.lock generated
View File

@ -4710,6 +4710,7 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
name = "trace"
version = "0.1.0"
dependencies = [
"async-trait",
"chrono",
"futures",
"http",
@ -4722,6 +4723,8 @@ dependencies = [
"serde",
"serde_json",
"snafu",
"tokio",
"tokio-util",
"tower",
]

View File

@ -41,6 +41,7 @@ members = [
"server",
"server_benchmarks",
"test_helpers",
"trace",
"tracker",
"trogging",
"grpc-router",
@ -60,7 +61,7 @@ datafusion = { path = "datafusion" }
data_types = { path = "data_types" }
entry = { path = "entry" }
generated_types = { path = "generated_types" }
heappy = { git = "https://github.com/mkmik/heappy", rev = "aed37ab50a70c3f0a7a8bd1c51d28d3d8050461f", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true}
heappy = { git = "https://github.com/mkmik/heappy", rev = "aed37ab50a70c3f0a7a8bd1c51d28d3d8050461f", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] }
influxdb_line_protocol = { path = "influxdb_line_protocol" }
@ -114,7 +115,7 @@ snafu = "0.6.9"
structopt = "0.3.21"
thiserror = "1.0.23"
# remember to put "unprefixed_malloc_on_supported_platforms" if you disable heappy
tikv-jemallocator = {version = "0.4.0" }
tikv-jemallocator = { version = "0.4.0" }
tikv-jemalloc-ctl = "0.4.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] }
tokio-stream = { version = "0.1.2", features = ["net"] }

View File

@ -323,8 +323,10 @@ mod tests {
use super::*;
use ::http::{header::HeaderName, HeaderValue};
use data_types::{database_rules::DatabaseRules, DatabaseName};
use influxdb_iox_client::connection::Connection;
use std::convert::TryInto;
use structopt::StructOpt;
use trace::otel::{OtelExporter, TestOtelExporter};
use trace::RingBufferTraceCollector;
fn test_config(server_id: Option<u32>) -> Config {
@ -513,6 +515,17 @@ mod tests {
)
}
async fn jaeger_client(addr: SocketAddr, trace: &'static str) -> Connection {
influxdb_iox_client::connection::Builder::default()
.header(
HeaderName::from_static("uber-trace-id"),
HeaderValue::from_static(trace),
)
.build(format!("http://{}", addr))
.await
.unwrap()
}
#[tokio::test]
async fn test_tracing() {
let config = test_config(Some(23));
@ -571,16 +584,8 @@ mod tests {
b3_tracing_client.list_databases().await.unwrap();
b3_tracing_client.get_server_status().await.unwrap();
let jaeger_tracing_client = influxdb_iox_client::connection::Builder::default()
.header(
HeaderName::from_static("uber-trace-id"),
HeaderValue::from_static("34f9495:30e34:0:1"),
)
.build(format!("http://{}", addr))
.await
.unwrap();
influxdb_iox_client::management::Client::new(jaeger_tracing_client)
let conn = jaeger_client(addr, "34f9495:30e34:0:1").await;
influxdb_iox_client::management::Client::new(conn)
.list_databases()
.await
.unwrap();
@ -588,7 +593,7 @@ mod tests {
let spans = trace_collector.spans();
assert_eq!(spans.len(), 3);
let spans: Vec<trace::span::Span<'_>> = spans
let spans: Vec<trace::span::Span> = spans
.iter()
.map(|x| serde_json::from_str(x.as_str()).unwrap())
.collect();
@ -615,4 +620,50 @@ mod tests {
server.shutdown();
join.await.unwrap().unwrap();
}
#[tokio::test]
async fn test_otel_exporter() {
let config = test_config(Some(23));
let application = make_application(&config).await.unwrap();
let server = make_server(Arc::clone(&application), &config);
server.wait_for_init().await.unwrap();
let (sender, mut receiver) = tokio::sync::mpsc::channel(20);
let collector = Arc::new(trace::otel::OtelExporter::new(TestOtelExporter::new(
sender,
)));
let grpc_listener = grpc_listener(config.grpc_bind_address).await.unwrap();
let http_listener = http_listener(config.grpc_bind_address).await.unwrap();
let addr = grpc_listener.local_addr().unwrap();
let fut = serve(
config,
application,
grpc_listener,
http_listener,
Arc::<OtelExporter>::clone(&collector),
Arc::clone(&server),
);
let join = tokio::spawn(fut);
let conn = jaeger_client(addr, "34f8495:30e34:0:1").await;
influxdb_iox_client::management::Client::new(conn)
.list_databases()
.await
.unwrap();
collector.shutdown();
collector.join().await.unwrap();
server.shutdown();
join.await.unwrap().unwrap();
let span = receiver.recv().await.unwrap();
assert_eq!(span.span_context.trace_id().to_u128(), 0x34f8495);
assert_eq!(span.parent_span_id.to_u64(), 0x30e34);
}
}

View File

@ -7,6 +7,7 @@ description = "Distributed tracing support within IOx"
[dependencies]
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
futures = "0.3"
http = "0.2"
@ -20,5 +21,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
snafu = "0.6"
tower = "0.4"
tokio = { version = "1.0", features = ["macros", "time", "sync"] }
tokio-util = { version = "0.6.3" }
[dev-dependencies]

View File

@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::num::{NonZeroU128, NonZeroU64, ParseIntError};
use std::str::FromStr;
use std::sync::Arc;
@ -8,8 +9,8 @@ use rand::Rng;
use serde::{Deserialize, Serialize};
use snafu::Snafu;
use crate::ctx::ContextCodec::{Jaeger, B3};
use crate::{
ctx::ContextCodec::{Jaeger, B3},
span::{Span, SpanStatus},
TraceCollector,
};
@ -64,6 +65,16 @@ impl From<ParseIntError> for DecodeError {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct TraceId(pub NonZeroU128);
impl TraceId {
pub fn new(val: u128) -> Option<Self> {
Some(Self(NonZeroU128::new(val)?))
}
pub fn get(self) -> u128 {
self.0.get()
}
}
impl<'a> FromStr for TraceId {
type Err = DecodeError;
@ -78,10 +89,18 @@ impl<'a> FromStr for TraceId {
pub struct SpanId(pub NonZeroU64);
impl SpanId {
pub fn new(val: u64) -> Option<Self> {
Some(Self(NonZeroU64::new(val)?))
}
pub fn gen() -> Self {
// Should this be a UUID?
Self(rand::thread_rng().gen())
}
pub fn get(self) -> u64 {
self.0.get()
}
}
impl<'a> FromStr for SpanId {
@ -111,9 +130,9 @@ pub struct SpanContext {
impl SpanContext {
/// Creates a new child of the Span described by this TraceContext
pub fn child<'a>(&self, name: &'a str) -> Span<'a> {
pub fn child(&self, name: impl Into<Cow<'static, str>>) -> Span {
Span {
name,
name: name.into(),
ctx: Self {
trace_id: self.trace_id,
span_id: SpanId::gen(),
@ -296,9 +315,10 @@ fn required_header<T: FromStr<Err = DecodeError>>(
#[cfg(test)]
mod tests {
use super::*;
use http::HeaderValue;
use super::*;
#[test]
fn test_decode_b3() {
let collector: Arc<dyn TraceCollector> = Arc::new(crate::LogTraceCollector::new());

View File

@ -16,12 +16,13 @@ use observability_deps::tracing::info;
use crate::span::Span;
pub mod ctx;
pub mod otel;
pub mod span;
pub mod tower;
/// A TraceCollector is a sink for completed `Span`
pub trait TraceCollector: std::fmt::Debug + Send + Sync {
fn export(&self, span: &span::Span<'_>);
fn export(&self, span: Span);
}
/// A basic trace collector that prints to stdout
@ -41,7 +42,7 @@ impl Default for LogTraceCollector {
}
impl TraceCollector for LogTraceCollector {
fn export(&self, span: &Span<'_>) {
fn export(&self, span: Span) {
info!("completed span {}", span.json())
}
}
@ -65,7 +66,7 @@ impl RingBufferTraceCollector {
}
impl TraceCollector for RingBufferTraceCollector {
fn export(&self, span: &Span<'_>) {
fn export(&self, span: Span) {
let serialized = span.json();
let mut buffer = self.buffer.lock();
if buffer.len() == buffer.capacity() {

369
trace/src/otel.rs Normal file
View File

@ -0,0 +1,369 @@
use std::borrow::Cow;
use std::future::Future;
use std::sync::Arc;
use async_trait::async_trait;
use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
};
use tokio::sync::mpsc;
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use observability_deps::{
opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter},
tracing::{error, info, warn},
};
use crate::ctx::{SpanContext, SpanId, TraceId};
use crate::span::{MetaValue, SpanEvent, SpanStatus};
use crate::{span::Span, TraceCollector};
/// Size of the exporter buffer
const CHANNEL_SIZE: usize = 1000;
/// Maximum number of events that can be associated with a span
const MAX_EVENTS: u32 = 10;
/// Maximum number of attributes that can be associated with a span
const MAX_ATTRIBUTES: u32 = 100;
/// `OtelExporter` wraps a opentelemetry SpanExporter and sinks spans to it
///
/// In order to do this it spawns a background worker that pulls messages
/// of a queue and writes them to opentelemetry. If this worker cannot keep
/// up, and this queue fills up, spans will be dropped and warnings logged
#[derive(Debug)]
pub struct OtelExporter {
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
sender: tokio::sync::mpsc::Sender<SpanData>,
shutdown: CancellationToken,
}
impl OtelExporter {
/// Creates a new `OtelExporter`
pub fn new<T: SpanExporter + 'static>(exporter: T) -> Self {
let shutdown = CancellationToken::new();
let (sender, receiver) = mpsc::channel(CHANNEL_SIZE);
let handle = tokio::spawn(background_worker(shutdown.clone(), exporter, receiver));
let join = handle.map_err(Arc::new).boxed().shared();
Self {
join,
shutdown,
sender,
}
}
/// Triggers shutdown of this `OtelExporter`
pub fn shutdown(&self) {
info!("otel exporter shutting down");
self.shutdown.cancel()
}
/// Waits for the background worker of OtelExporter to finish
pub fn join(&self) -> impl Future<Output = Result<(), Arc<JoinError>>> {
self.join.clone()
}
}
impl TraceCollector for OtelExporter {
fn export(&self, span: Span) {
use mpsc::error::TrySendError;
match self.sender.try_send(span.into()) {
Ok(_) => {
//TODO: Increment some metric
}
Err(TrySendError::Full(_)) => {
warn!("exporter cannot keep up, dropping spans")
}
Err(TrySendError::Closed(_)) => {
warn!("background worker shutdown")
}
}
}
}
async fn background_worker<T: SpanExporter + 'static>(
shutdown: CancellationToken,
exporter: T,
receiver: mpsc::Receiver<SpanData>,
) {
tokio::select! {
_ = exporter_loop(exporter, receiver) => {
// Don't expect this future to complete
error!("otel exporter loop completed")
}
_ = shutdown.cancelled() => {}
}
info!("otel exporter shut down")
}
/// An opentelemetry::SpanExporter that sinks writes to a tokio mpsc channel.
///
/// Intended for testing ONLY
///
/// Note: There is a similar construct in opentelemetry behind the testing feature
/// flag, but enabling this brings in a large number of additional dependencies and
/// so we just implement our own version
#[derive(Debug)]
pub struct TestOtelExporter {
channel: mpsc::Sender<SpanData>,
}
impl TestOtelExporter {
pub fn new(channel: mpsc::Sender<SpanData>) -> Self {
Self { channel }
}
}
#[async_trait]
impl SpanExporter for TestOtelExporter {
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
for span in batch {
self.channel.send(span).await.expect("channel closed")
}
Ok(())
}
}
async fn exporter_loop<T: SpanExporter + 'static>(
mut exporter: T,
mut receiver: tokio::sync::mpsc::Receiver<SpanData>,
) {
while let Some(span) = receiver.recv().await {
// TODO: Batch export spans
if let Err(e) = exporter.export(vec![span]).await {
error!(%e, "error exporting span")
}
}
warn!("sender-side of jaeger exporter dropped without waiting for shut down")
}
impl From<Span> for SpanData {
fn from(span: Span) -> Self {
use observability_deps::opentelemetry::sdk::trace::{EvictedHashMap, EvictedQueue};
use observability_deps::opentelemetry::sdk::InstrumentationLibrary;
use observability_deps::opentelemetry::trace::{SpanId, SpanKind};
use observability_deps::opentelemetry::{Key, KeyValue};
let parent_span_id = match span.ctx.parent_span_id {
Some(id) => id.into(),
None => SpanId::invalid(),
};
let mut ret = Self {
span_context: (&span.ctx).into(),
parent_span_id,
span_kind: SpanKind::Server,
name: span.name,
start_time: span.start.map(Into::into).unwrap_or(std::time::UNIX_EPOCH),
end_time: span.end.map(Into::into).unwrap_or(std::time::UNIX_EPOCH),
attributes: EvictedHashMap::new(MAX_ATTRIBUTES, 0),
events: EvictedQueue::new(MAX_EVENTS),
links: EvictedQueue::new(0),
status_code: span.status.into(),
status_message: Default::default(),
resource: None,
instrumentation_lib: InstrumentationLibrary::new("iox-trace", None),
};
ret.events.extend(span.events.into_iter().map(Into::into));
for (key, value) in span.metadata {
let key = match key {
Cow::Owned(key) => Key::new(key),
Cow::Borrowed(key) => Key::new(key),
};
ret.attributes.insert(KeyValue::new(key, value))
}
ret
}
}
impl<'a> From<&'a SpanContext> for observability_deps::opentelemetry::trace::SpanContext {
fn from(ctx: &'a SpanContext) -> Self {
Self::new(
ctx.trace_id.into(),
ctx.span_id.into(),
Default::default(),
false,
Default::default(),
)
}
}
impl From<SpanEvent> for observability_deps::opentelemetry::trace::Event {
fn from(event: SpanEvent) -> Self {
Self {
name: event.msg,
timestamp: event.time.into(),
attributes: vec![],
dropped_attributes_count: 0,
}
}
}
impl From<SpanStatus> for observability_deps::opentelemetry::trace::StatusCode {
fn from(status: SpanStatus) -> Self {
match status {
SpanStatus::Unknown => Self::Unset,
SpanStatus::Ok => Self::Ok,
SpanStatus::Err => Self::Error,
}
}
}
impl From<SpanId> for observability_deps::opentelemetry::trace::SpanId {
fn from(id: SpanId) -> Self {
Self::from_u64(id.0.get())
}
}
impl From<TraceId> for observability_deps::opentelemetry::trace::TraceId {
fn from(id: TraceId) -> Self {
Self::from_u128(id.0.get())
}
}
impl From<MetaValue> for observability_deps::opentelemetry::Value {
fn from(v: MetaValue) -> Self {
match v {
MetaValue::String(v) => Self::String(v),
MetaValue::Float(v) => Self::F64(v),
MetaValue::Int(v) => Self::I64(v),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{TimeZone, Utc};
use observability_deps::opentelemetry::{Key, Value};
use std::time::{Duration, UNIX_EPOCH};
#[test]
fn test_conversion() {
let root = SpanContext {
trace_id: TraceId::new(232345).unwrap(),
parent_span_id: Some(SpanId::new(2484).unwrap()),
span_id: SpanId::new(2343).unwrap(),
collector: None,
};
let mut span = root.child("foo");
span.metadata.insert("string".into(), "bar".into());
span.metadata.insert("float".into(), 3.32.into());
span.metadata.insert("int".into(), 5.into());
span.events.push(SpanEvent {
time: Utc.timestamp_nanos(1230),
msg: "event".into(),
});
span.status = SpanStatus::Ok;
span.start = Some(Utc.timestamp_nanos(1000));
span.end = Some(Utc.timestamp_nanos(2000));
let span_data: SpanData = span.clone().into();
assert_eq!(
span_data.span_context.span_id().to_u64(),
span.ctx.span_id.get()
);
assert_eq!(
span_data.span_context.trace_id().to_u128(),
span.ctx.trace_id.get()
);
assert_eq!(
span_data.parent_span_id.to_u64(),
span.ctx.parent_span_id.unwrap().get()
);
assert_eq!(
span_data.start_time,
UNIX_EPOCH + Duration::from_nanos(1000)
);
assert_eq!(span_data.end_time, UNIX_EPOCH + Duration::from_nanos(2000));
let events: Vec<_> = span_data.events.iter().collect();
assert_eq!(events.len(), 1);
assert_eq!(events[0].name.as_ref(), "event");
assert_eq!(events[0].timestamp, UNIX_EPOCH + Duration::from_nanos(1230));
assert_eq!(events[0].attributes.len(), 0);
assert_eq!(
span_data
.attributes
.get(&Key::from_static_str("string"))
.unwrap()
.clone(),
Value::String("bar".into())
);
assert_eq!(
span_data
.attributes
.get(&Key::from_static_str("float"))
.unwrap()
.clone(),
Value::F64(3.32)
);
assert_eq!(
span_data
.attributes
.get(&Key::from_static_str("int"))
.unwrap()
.clone(),
Value::I64(5)
);
}
#[tokio::test]
async fn test_exporter() {
let (sender, mut receiver) = mpsc::channel(10);
let exporter = OtelExporter::new(TestOtelExporter::new(sender));
assert!(exporter.join().now_or_never().is_none());
let root = SpanContext {
trace_id: TraceId::new(232345).unwrap(),
parent_span_id: None,
span_id: SpanId::new(2343).unwrap(),
collector: None,
};
let s1 = root.child("foo");
let s2 = root.child("bar");
exporter.export(s1.clone());
exporter.export(s2.clone());
exporter.export(s2.clone());
let r1 = receiver.recv().await.unwrap();
let r2 = receiver.recv().await.unwrap();
let r3 = receiver.recv().await.unwrap();
exporter.shutdown();
exporter.join().await.unwrap();
// Should not be fatal despite exporter having been shutdown
exporter.export(s2.clone());
assert_eq!(root.span_id.get(), r1.parent_span_id.to_u64());
assert_eq!(s1.ctx.span_id.get(), r1.span_context.span_id().to_u64());
assert_eq!(s1.ctx.trace_id.get(), r1.span_context.trace_id().to_u128());
assert_eq!(root.span_id.get(), r2.parent_span_id.to_u64());
assert_eq!(s2.ctx.span_id.get(), r2.span_context.span_id().to_u64());
assert_eq!(s2.ctx.trace_id.get(), r2.span_context.trace_id().to_u128());
assert_eq!(root.span_id.get(), r3.parent_span_id.to_u64());
assert_eq!(s2.ctx.span_id.get(), r3.span_context.span_id().to_u64());
assert_eq!(s2.ctx.trace_id.get(), r3.span_context.trace_id().to_u128());
}
}

View File

@ -1,3 +1,4 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
@ -20,11 +21,10 @@ pub enum SpanStatus {
/// A `Span` has a name, metadata, a start and end time and a unique ID. Additionally they
/// have relationships with other Spans that together comprise a Trace
///
/// On Drop a `Span` is published to the registered collector
///
#[derive(Debug, Serialize, Deserialize)]
pub struct Span<'a> {
pub name: &'a str,
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Span {
pub name: Cow<'static, str>,
//#[serde(flatten)] - https://github.com/serde-rs/json/issues/505
pub ctx: SpanContext,
@ -35,15 +35,14 @@ pub struct Span<'a> {
pub status: SpanStatus,
#[serde(borrow)]
pub metadata: HashMap<&'a str, MetaValue<'a>>,
pub metadata: HashMap<Cow<'static, str>, MetaValue>,
#[serde(borrow)]
pub events: Vec<SpanEvent<'a>>,
pub events: Vec<SpanEvent>,
}
impl<'a> Span<'a> {
pub fn event(&mut self, meta: impl Into<MetaValue<'a>>) {
impl Span {
/// Record an event on this `Span`
pub fn event(&mut self, meta: impl Into<Cow<'static, str>>) {
let event = SpanEvent {
time: Utc::now(),
msg: meta.into(),
@ -51,11 +50,13 @@ impl<'a> Span<'a> {
self.events.push(event)
}
pub fn error(&mut self, meta: impl Into<MetaValue<'a>>) {
/// Record an error on this `Span`
pub fn error(&mut self, meta: impl Into<Cow<'static, str>>) {
self.event(meta);
self.status = SpanStatus::Err;
}
/// Returns a JSON representation of this `Span`
pub fn json(&self) -> String {
match serde_json::to_string(self) {
Ok(serialized) => serialized,
@ -65,46 +66,50 @@ impl<'a> Span<'a> {
}
}
}
}
impl<'a> Drop for Span<'a> {
fn drop(&mut self) {
if let Some(collector) = &self.ctx.collector {
/// Exports this `Span` to its registered collector if any
pub fn export(mut self) {
if let Some(collector) = self.ctx.collector.take() {
collector.export(self)
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanEvent<'a> {
pub struct SpanEvent {
pub time: DateTime<Utc>,
#[serde(borrow)]
pub msg: MetaValue<'a>,
pub msg: Cow<'static, str>,
}
/// Values that can be stored in a Span's metadata and events
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum MetaValue<'a> {
String(&'a str),
pub enum MetaValue {
String(Cow<'static, str>),
Float(f64),
Int(i64),
}
impl<'a> From<&'a str> for MetaValue<'a> {
fn from(v: &'a str) -> Self {
Self::String(v)
impl From<&'static str> for MetaValue {
fn from(v: &'static str) -> Self {
Self::String(Cow::Borrowed(v))
}
}
impl<'a> From<f64> for MetaValue<'a> {
impl From<String> for MetaValue {
fn from(v: String) -> Self {
Self::String(Cow::Owned(v))
}
}
impl From<f64> for MetaValue {
fn from(v: f64) -> Self {
Self::Float(v)
}
}
impl<'a> From<i64> for MetaValue<'a> {
impl From<i64> for MetaValue {
fn from(v: i64) -> Self {
Self::Int(v)
}
@ -112,46 +117,50 @@ impl<'a> From<i64> for MetaValue<'a> {
/// Updates the start and end times on the provided Span
#[derive(Debug)]
pub struct EnteredSpan<'a> {
span: Span<'a>,
pub struct EnteredSpan {
/// Option so we can take out of it on drop / publish
span: Option<Span>,
}
impl<'a> Deref for EnteredSpan<'a> {
type Target = Span<'a>;
impl<'a> Deref for EnteredSpan {
type Target = Span;
fn deref(&self) -> &Self::Target {
&self.span
self.span.as_ref().expect("dropped")
}
}
impl<'a> DerefMut for EnteredSpan<'a> {
impl<'a> DerefMut for EnteredSpan {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.span
self.span.as_mut().expect("dropped")
}
}
impl<'a> EnteredSpan<'a> {
pub fn new(mut span: Span<'a>) -> Self {
impl<'a> EnteredSpan {
pub fn new(mut span: Span) -> Self {
span.start = Some(Utc::now());
Self { span }
Self { span: Some(span) }
}
}
impl<'a> Drop for EnteredSpan<'a> {
impl<'a> Drop for EnteredSpan {
fn drop(&mut self) {
let now = Utc::now();
// SystemTime is not monotonic so must also check min
let mut span = self.span.take().expect("dropped");
self.span.start = Some(match self.span.start {
// SystemTime is not monotonic so must also check min
span.start = Some(match span.start {
Some(a) => a.min(now),
None => now,
});
self.span.end = Some(match self.span.end {
span.end = Some(match span.end {
Some(a) => a.max(now),
None => now,
});
span.export()
}
}
@ -167,9 +176,9 @@ mod tests {
use super::*;
fn make_span(collector: Arc<dyn TraceCollector>) -> Span<'static> {
fn make_span(collector: Arc<dyn TraceCollector>) -> Span {
Span {
name: "foo",
name: "foo".into(),
ctx: SpanContext {
trace_id: TraceId(NonZeroU128::new(23948923).unwrap()),
parent_span_id: None,
@ -197,7 +206,7 @@ mod tests {
span.events.push(SpanEvent {
time: Utc.timestamp_nanos(1000),
msg: MetaValue::String("this is a test event"),
msg: "this is a test event".into(),
});
assert_eq!(
@ -205,7 +214,7 @@ mod tests {
r#"{"name":"foo","ctx":{"trace_id":23948923,"parent_span_id":null,"span_id":3498394},"start":null,"end":null,"status":"Unknown","metadata":{},"events":[{"time":"1970-01-01T00:00:00.000001Z","msg":"this is a test event"}]}"#
);
span.metadata.insert("foo", MetaValue::String("bar"));
span.metadata.insert("foo".into(), "bar".into());
span.start = Some(Utc.timestamp_nanos(100));
assert_eq!(
@ -219,7 +228,7 @@ mod tests {
let expected = r#"{"name":"foo","ctx":{"trace_id":23948923,"parent_span_id":23493,"span_id":3498394},"start":"1970-01-01T00:00:00.000000100Z","end":null,"status":"Ok","metadata":{"foo":"bar"},"events":[{"time":"1970-01-01T00:00:00.000001Z","msg":"this is a test event"}]}"#;
assert_eq!(span.json(), expected);
std::mem::drop(span);
span.export();
// Should publish span
let spans = collector.spans();
@ -243,7 +252,7 @@ mod tests {
let spans = collector.spans();
assert_eq!(spans.len(), 1);
let span: Span<'_> = serde_json::from_str(spans[0].as_str()).unwrap();
let span: Span = serde_json::from_str(spans[0].as_str()).unwrap();
assert!(span.start.is_some());
assert!(span.end.is_some());

View File

@ -98,7 +98,7 @@ where
#[pin_project]
#[derive(Debug)]
pub struct TracedFuture<F> {
span: Option<EnteredSpan<'static>>,
span: Option<EnteredSpan>,
#[pin]
inner: F,
}
@ -133,7 +133,7 @@ where
#[pin_project]
#[derive(Debug)]
pub struct TracedBody<B> {
span: Option<EnteredSpan<'static>>,
span: Option<EnteredSpan>,
#[pin]
inner: B,
}