Merge pull request #4284 from influxdata/dom/jaeger-tags
feat: static jaeger span tagspull/24376/head
commit
3af4726708
|
@ -1,8 +1,11 @@
|
||||||
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
|
use std::{
|
||||||
|
net::{SocketAddr, ToSocketAddrs, UdpSocket},
|
||||||
|
str::FromStr,
|
||||||
|
};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
use observability_deps::tracing::{error, info};
|
use observability_deps::tracing::*;
|
||||||
use trace::span::Span;
|
use trace::span::Span;
|
||||||
|
|
||||||
use crate::export::AsyncExport;
|
use crate::export::AsyncExport;
|
||||||
|
@ -12,6 +15,49 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
|
||||||
|
|
||||||
mod span;
|
mod span;
|
||||||
|
|
||||||
|
/// A key=value pair for span annotations.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct JaegerTag {
|
||||||
|
key: String,
|
||||||
|
value: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JaegerTag {
|
||||||
|
/// Create a new static tag for all jaeger spans.
|
||||||
|
pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
|
||||||
|
Self {
|
||||||
|
key: key.into(),
|
||||||
|
value: value.into(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<JaegerTag> for jaeger::Tag {
|
||||||
|
fn from(t: JaegerTag) -> Self {
|
||||||
|
Self::new(
|
||||||
|
t.key,
|
||||||
|
jaeger::TagType::String,
|
||||||
|
Some(t.value),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FromStr for JaegerTag {
|
||||||
|
type Err = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||||
|
|
||||||
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||||
|
let parts = s.split('=').collect::<Vec<_>>();
|
||||||
|
match *parts {
|
||||||
|
[key, value] if !key.is_empty() && !value.is_empty() => Ok(Self::new(key, value)),
|
||||||
|
_ => Err(format!("invalid key=value pair ({})", s).into()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// `JaegerAgentExporter` receives span data and writes it over UDP to a local jaeger agent
|
/// `JaegerAgentExporter` receives span data and writes it over UDP to a local jaeger agent
|
||||||
///
|
///
|
||||||
/// Note: will drop data if the UDP socket would block
|
/// Note: will drop data if the UDP socket would block
|
||||||
|
@ -26,6 +72,9 @@ pub struct JaegerAgentExporter {
|
||||||
/// Spans should be assigned a sequential sequence number
|
/// Spans should be assigned a sequential sequence number
|
||||||
/// to allow jaeger to better detect dropped spans
|
/// to allow jaeger to better detect dropped spans
|
||||||
next_sequence: i64,
|
next_sequence: i64,
|
||||||
|
|
||||||
|
/// Optional static tags to annotate every span with.
|
||||||
|
tags: Option<Vec<jaeger::Tag>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl JaegerAgentExporter {
|
impl JaegerAgentExporter {
|
||||||
|
@ -61,16 +110,25 @@ impl JaegerAgentExporter {
|
||||||
service_name,
|
service_name,
|
||||||
client,
|
client,
|
||||||
next_sequence: 0,
|
next_sequence: 0,
|
||||||
|
tags: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Annotate all spans emitted by this exporter with the specified static
|
||||||
|
/// tags.
|
||||||
|
pub fn with_tags(self, tags: &[JaegerTag]) -> Self {
|
||||||
|
debug!(?tags, "setting static jaeger span tags");
|
||||||
|
let tags = Some(tags.iter().cloned().map(Into::into).collect());
|
||||||
|
Self { tags, ..self }
|
||||||
|
}
|
||||||
|
|
||||||
fn make_batch(&mut self, spans: Vec<Span>) -> jaeger::Batch {
|
fn make_batch(&mut self, spans: Vec<Span>) -> jaeger::Batch {
|
||||||
let seq_no = Some(self.next_sequence);
|
let seq_no = Some(self.next_sequence);
|
||||||
self.next_sequence += 1;
|
self.next_sequence += 1;
|
||||||
jaeger::Batch {
|
jaeger::Batch {
|
||||||
process: jaeger::Process {
|
process: jaeger::Process {
|
||||||
service_name: self.service_name.clone(),
|
service_name: self.service_name.clone(),
|
||||||
tags: None,
|
tags: self.tags.clone(),
|
||||||
},
|
},
|
||||||
spans: spans.into_iter().map(Into::into).collect(),
|
spans: spans.into_iter().map(Into::into).collect(),
|
||||||
seq_no,
|
seq_no,
|
||||||
|
@ -214,6 +272,34 @@ mod tests {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_jaeger_tag_from_str() {
|
||||||
|
"".parse::<JaegerTag>().expect_err("empty tag should fail");
|
||||||
|
"key"
|
||||||
|
.parse::<JaegerTag>()
|
||||||
|
.expect_err("no value should fail");
|
||||||
|
"key="
|
||||||
|
.parse::<JaegerTag>()
|
||||||
|
.expect_err("no value should fail");
|
||||||
|
"key=="
|
||||||
|
.parse::<JaegerTag>()
|
||||||
|
.expect_err("no value should fail");
|
||||||
|
"=value"
|
||||||
|
.parse::<JaegerTag>()
|
||||||
|
.expect_err("no key should fail");
|
||||||
|
"==value"
|
||||||
|
.parse::<JaegerTag>()
|
||||||
|
.expect_err("empty key should fail");
|
||||||
|
"key==value"
|
||||||
|
.parse::<JaegerTag>()
|
||||||
|
.expect_err("too many = should fail");
|
||||||
|
"=".parse::<JaegerTag>()
|
||||||
|
.expect_err("empty key value should fail");
|
||||||
|
"key=value"
|
||||||
|
.parse::<JaegerTag>()
|
||||||
|
.expect("valid form should succeed");
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_jaeger() {
|
async fn test_jaeger() {
|
||||||
let server = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let server = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
@ -221,8 +307,22 @@ mod tests {
|
||||||
.set_read_timeout(Some(std::time::Duration::from_secs(1)))
|
.set_read_timeout(Some(std::time::Duration::from_secs(1)))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
let tags = [JaegerTag::new("bananas", "great")];
|
||||||
let address = server.local_addr().unwrap();
|
let address = server.local_addr().unwrap();
|
||||||
let mut exporter = JaegerAgentExporter::new("service_name".to_string(), address).unwrap();
|
let mut exporter = JaegerAgentExporter::new("service_name".to_string(), address)
|
||||||
|
.unwrap()
|
||||||
|
.with_tags(&tags);
|
||||||
|
|
||||||
|
// Encoded form of tags.
|
||||||
|
let want_tags = [jaeger::Tag {
|
||||||
|
key: "bananas".into(),
|
||||||
|
v_str: Some("great".into()),
|
||||||
|
v_type: jaeger::TagType::String,
|
||||||
|
v_double: None,
|
||||||
|
v_bool: None,
|
||||||
|
v_long: None,
|
||||||
|
v_binary: None,
|
||||||
|
}];
|
||||||
|
|
||||||
let batches = Arc::new(Mutex::new(vec![]));
|
let batches = Arc::new(Mutex::new(vec![]));
|
||||||
|
|
||||||
|
@ -271,11 +371,26 @@ mod tests {
|
||||||
assert_eq!(b1.spans.len(), 2);
|
assert_eq!(b1.spans.len(), 2);
|
||||||
assert_eq!(b1.process.service_name.as_str(), "service_name");
|
assert_eq!(b1.process.service_name.as_str(), "service_name");
|
||||||
assert_eq!(b1.seq_no.unwrap(), 0);
|
assert_eq!(b1.seq_no.unwrap(), 0);
|
||||||
|
let got_tags = b1
|
||||||
|
.process
|
||||||
|
.tags
|
||||||
|
.as_ref()
|
||||||
|
.expect("expected static process tags");
|
||||||
|
assert_eq!(got_tags, &want_tags);
|
||||||
|
|
||||||
let b2 = &batches[1];
|
let b2 = &batches[1];
|
||||||
assert_eq!(b2.spans.len(), 1);
|
assert_eq!(b2.spans.len(), 1);
|
||||||
assert_eq!(b2.process.service_name.as_str(), "service_name");
|
assert_eq!(b2.process.service_name.as_str(), "service_name");
|
||||||
assert_eq!(b2.seq_no.unwrap(), 1);
|
assert_eq!(b2.seq_no.unwrap(), 1);
|
||||||
|
let got_tags = b2
|
||||||
|
.process
|
||||||
|
.tags
|
||||||
|
.as_ref()
|
||||||
|
.expect("expected static process tags");
|
||||||
|
assert_eq!(got_tags, &want_tags);
|
||||||
|
|
||||||
|
// Span tags should be constant
|
||||||
|
assert_eq!(b1.process.tags, b2.process.tags);
|
||||||
|
|
||||||
let b1_s0 = &b1.spans[0];
|
let b1_s0 = &b1.spans[0];
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
|
|
||||||
use crate::export::AsyncExporter;
|
use crate::export::AsyncExporter;
|
||||||
use crate::jaeger::JaegerAgentExporter;
|
use crate::jaeger::JaegerAgentExporter;
|
||||||
|
use jaeger::JaegerTag;
|
||||||
use snafu::Snafu;
|
use snafu::Snafu;
|
||||||
use std::num::NonZeroU16;
|
use std::num::NonZeroU16;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -103,6 +104,19 @@ pub struct TracingConfig {
|
||||||
default_value = "jaeger-debug-id"
|
default_value = "jaeger-debug-id"
|
||||||
)]
|
)]
|
||||||
pub traces_jaeger_debug_name: String,
|
pub traces_jaeger_debug_name: String,
|
||||||
|
|
||||||
|
/// Tracing: set of key=value pairs to annotate tracing spans with.
|
||||||
|
///
|
||||||
|
/// Use a comma-delimited string to set multiple pairs: env=prod,region=eu-1
|
||||||
|
///
|
||||||
|
/// Only used if `--traces-exporter` is "jaeger".
|
||||||
|
#[clap(
|
||||||
|
long = "--traces-jaeger-tags",
|
||||||
|
env = "TRACES_EXPORTER_JAEGER_TAGS",
|
||||||
|
value_delimiter = ',',
|
||||||
|
parse(try_from_str)
|
||||||
|
)]
|
||||||
|
pub traces_jaeger_tags: Option<Vec<JaegerTag>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TracingConfig {
|
impl TracingConfig {
|
||||||
|
@ -154,7 +168,12 @@ fn jaeger_exporter(config: &TracingConfig) -> Result<Arc<AsyncExporter>> {
|
||||||
);
|
);
|
||||||
|
|
||||||
let service_name = &config.traces_exporter_jaeger_service_name;
|
let service_name = &config.traces_exporter_jaeger_service_name;
|
||||||
let jaeger = JaegerAgentExporter::new(service_name.clone(), agent_endpoint)?;
|
let mut jaeger = JaegerAgentExporter::new(service_name.clone(), agent_endpoint)?;
|
||||||
|
|
||||||
|
// Use any specified static span tags.
|
||||||
|
if let Some(tags) = &config.traces_jaeger_tags {
|
||||||
|
jaeger = jaeger.with_tags(tags);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(Arc::new(AsyncExporter::new(jaeger)))
|
Ok(Arc::new(AsyncExporter::new(jaeger)))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue