feat: support static tracing tags
Adds support for annotating all Jaeger tracing spans with a set of static process tags.pull/24376/head
parent
581d277f75
commit
edc52181cf
|
@ -1,8 +1,11 @@
|
|||
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
|
||||
use std::{
|
||||
net::{SocketAddr, ToSocketAddrs, UdpSocket},
|
||||
str::FromStr,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
use observability_deps::tracing::{error, info};
|
||||
use observability_deps::tracing::*;
|
||||
use trace::span::Span;
|
||||
|
||||
use crate::export::AsyncExport;
|
||||
|
@ -12,6 +15,49 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
|
|||
|
||||
mod span;
|
||||
|
||||
/// A key=value pair for span annotations.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct JaegerTag {
|
||||
key: String,
|
||||
value: String,
|
||||
}
|
||||
|
||||
impl JaegerTag {
|
||||
/// Create a new static tag for all jaeger spans.
|
||||
pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||
Self {
|
||||
key: key.into(),
|
||||
value: value.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JaegerTag> for jaeger::Tag {
|
||||
fn from(t: JaegerTag) -> Self {
|
||||
Self::new(
|
||||
t.key,
|
||||
jaeger::TagType::String,
|
||||
Some(t.value),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for JaegerTag {
|
||||
type Err = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
let parts = s.split('=').collect::<Vec<_>>();
|
||||
match *parts {
|
||||
[key, value] if !key.is_empty() && !value.is_empty() => Ok(Self::new(key, value)),
|
||||
_ => Err(format!("invalid key=value pair ({})", s).into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `JaegerAgentExporter` receives span data and writes it over UDP to a local jaeger agent
|
||||
///
|
||||
/// Note: will drop data if the UDP socket would block
|
||||
|
@ -26,6 +72,9 @@ pub struct JaegerAgentExporter {
|
|||
/// Spans should be assigned a sequential sequence number
|
||||
/// to allow jaeger to better detect dropped spans
|
||||
next_sequence: i64,
|
||||
|
||||
/// Optional static tags to annotate every span with.
|
||||
tags: Option<Vec<jaeger::Tag>>,
|
||||
}
|
||||
|
||||
impl JaegerAgentExporter {
|
||||
|
@ -61,16 +110,25 @@ impl JaegerAgentExporter {
|
|||
service_name,
|
||||
client,
|
||||
next_sequence: 0,
|
||||
tags: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Annotate all spans emitted by this exporter with the specified static
|
||||
/// tags.
|
||||
pub fn with_tags(self, tags: &[JaegerTag]) -> Self {
|
||||
debug!(?tags, "setting static jaeger span tags");
|
||||
let tags = Some(tags.iter().cloned().map(Into::into).collect());
|
||||
Self { tags, ..self }
|
||||
}
|
||||
|
||||
fn make_batch(&mut self, spans: Vec<Span>) -> jaeger::Batch {
|
||||
let seq_no = Some(self.next_sequence);
|
||||
self.next_sequence += 1;
|
||||
jaeger::Batch {
|
||||
process: jaeger::Process {
|
||||
service_name: self.service_name.clone(),
|
||||
tags: None,
|
||||
tags: self.tags.clone(),
|
||||
},
|
||||
spans: spans.into_iter().map(Into::into).collect(),
|
||||
seq_no,
|
||||
|
@ -214,6 +272,34 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_jaeger_tag_from_str() {
|
||||
"".parse::<JaegerTag>().expect_err("empty tag should fail");
|
||||
"key"
|
||||
.parse::<JaegerTag>()
|
||||
.expect_err("no value should fail");
|
||||
"key="
|
||||
.parse::<JaegerTag>()
|
||||
.expect_err("no value should fail");
|
||||
"key=="
|
||||
.parse::<JaegerTag>()
|
||||
.expect_err("no value should fail");
|
||||
"=value"
|
||||
.parse::<JaegerTag>()
|
||||
.expect_err("no key should fail");
|
||||
"==value"
|
||||
.parse::<JaegerTag>()
|
||||
.expect_err("empty key should fail");
|
||||
"key==value"
|
||||
.parse::<JaegerTag>()
|
||||
.expect_err("too many = should fail");
|
||||
"=".parse::<JaegerTag>()
|
||||
.expect_err("empty key value should fail");
|
||||
"key=value"
|
||||
.parse::<JaegerTag>()
|
||||
.expect("valid form should succeed");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_jaeger() {
|
||||
let server = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||
|
@ -221,8 +307,22 @@ mod tests {
|
|||
.set_read_timeout(Some(std::time::Duration::from_secs(1)))
|
||||
.unwrap();
|
||||
|
||||
let tags = [JaegerTag::new("bananas", "great")];
|
||||
let address = server.local_addr().unwrap();
|
||||
let mut exporter = JaegerAgentExporter::new("service_name".to_string(), address).unwrap();
|
||||
let mut exporter = JaegerAgentExporter::new("service_name".to_string(), address)
|
||||
.unwrap()
|
||||
.with_tags(&tags);
|
||||
|
||||
// Encoded form of tags.
|
||||
let want_tags = [jaeger::Tag {
|
||||
key: "bananas".into(),
|
||||
v_str: Some("great".into()),
|
||||
v_type: jaeger::TagType::String,
|
||||
v_double: None,
|
||||
v_bool: None,
|
||||
v_long: None,
|
||||
v_binary: None,
|
||||
}];
|
||||
|
||||
let batches = Arc::new(Mutex::new(vec![]));
|
||||
|
||||
|
@ -271,11 +371,26 @@ mod tests {
|
|||
assert_eq!(b1.spans.len(), 2);
|
||||
assert_eq!(b1.process.service_name.as_str(), "service_name");
|
||||
assert_eq!(b1.seq_no.unwrap(), 0);
|
||||
let got_tags = b1
|
||||
.process
|
||||
.tags
|
||||
.as_ref()
|
||||
.expect("expected static process tags");
|
||||
assert_eq!(got_tags, &want_tags);
|
||||
|
||||
let b2 = &batches[1];
|
||||
assert_eq!(b2.spans.len(), 1);
|
||||
assert_eq!(b2.process.service_name.as_str(), "service_name");
|
||||
assert_eq!(b2.seq_no.unwrap(), 1);
|
||||
let got_tags = b2
|
||||
.process
|
||||
.tags
|
||||
.as_ref()
|
||||
.expect("expected static process tags");
|
||||
assert_eq!(got_tags, &want_tags);
|
||||
|
||||
// Span tags should be constant
|
||||
assert_eq!(b1.process.tags, b2.process.tags);
|
||||
|
||||
let b1_s0 = &b1.spans[0];
|
||||
|
||||
|
|
Loading…
Reference in New Issue