feat: implement jaeger-agent protocol directly (#2607)
* feat: implement jaeger-agent protocol directly * chore: review feedback * fix: remove jaeger feature flagpull/24376/head
parent
011dba8f51
commit
92e6173f49
|
@ -148,7 +148,7 @@ jobs:
|
||||||
- cache_restore
|
- cache_restore
|
||||||
- run:
|
- run:
|
||||||
name: Cargo test
|
name: Cargo test
|
||||||
command: cargo test --features=jaeger --workspace
|
command: cargo test --workspace
|
||||||
- cache_save
|
- cache_save
|
||||||
|
|
||||||
# end to end tests with Heappy (heap profiling enabled)
|
# end to end tests with Heappy (heap profiling enabled)
|
||||||
|
@ -275,7 +275,7 @@ jobs:
|
||||||
command: cargo test --workspace --benches --no-run
|
command: cargo test --workspace --benches --no-run
|
||||||
- run:
|
- run:
|
||||||
name: Build with object store + exporter support + HEAP profiling
|
name: Build with object store + exporter support + HEAP profiling
|
||||||
command: cargo build --no-default-features --features="aws,gcp,azure,jaeger,heappy,pprof"
|
command: cargo build --no-default-features --features="aws,gcp,azure,heappy,pprof"
|
||||||
- cache_save
|
- cache_save
|
||||||
|
|
||||||
# Lint protobufs.
|
# Lint protobufs.
|
||||||
|
@ -334,10 +334,10 @@ jobs:
|
||||||
- cache_restore
|
- cache_restore
|
||||||
- run:
|
- run:
|
||||||
name: Print rustc target CPU options
|
name: Print rustc target CPU options
|
||||||
command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger,heappy" --bin print_cpu
|
command: cargo run --release --no-default-features --features="aws,gcp,azure,heappy" --bin print_cpu
|
||||||
- run:
|
- run:
|
||||||
name: Cargo release build with target arch set for CRoaring
|
name: Cargo release build with target arch set for CRoaring
|
||||||
command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger,heappy"
|
command: cargo build --release --no-default-features --features="aws,gcp,azure,heappy"
|
||||||
- run: |
|
- run: |
|
||||||
echo sha256sum after build is
|
echo sha256sum after build is
|
||||||
sha256sum target/release/influxdb_iox
|
sha256sum target/release/influxdb_iox
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
generated_types/protos/google/ linguist-generated=true
|
generated_types/protos/google/ linguist-generated=true
|
||||||
generated_types/protos/grpc/ linguist-generated=true
|
generated_types/protos/grpc/ linguist-generated=true
|
||||||
generated_types/src/wal_generated.rs linguist-generated=true
|
generated_types/src/wal_generated.rs linguist-generated=true
|
||||||
|
trace_exporters/src/thrift/ linguist-generated=true
|
||||||
|
|
|
@ -265,7 +265,7 @@ docker run -d --name jaeger \
|
||||||
|
|
||||||
### Step 2: Run IOx configured to send traces to the local Jaeger instance
|
### Step 2: Run IOx configured to send traces to the local Jaeger instance
|
||||||
|
|
||||||
Build IOx with `--features=jaeger` and run with the following environment variables set:
|
Build IOx and run with the following environment variable set:
|
||||||
```
|
```
|
||||||
TRACES_EXPORTER=jaeger
|
TRACES_EXPORTER=jaeger
|
||||||
TRACES_EXPORTER_JAEGER_AGENT_HOST=localhost
|
TRACES_EXPORTER_JAEGER_AGENT_HOST=localhost
|
||||||
|
@ -274,7 +274,7 @@ TRACES_EXPORTER_JAEGER_AGENT_PORT=6831
|
||||||
|
|
||||||
For example, a command such as this should do the trick:
|
For example, a command such as this should do the trick:
|
||||||
```shell
|
```shell
|
||||||
TRACES_EXPORTER=jaeger TRACES_EXPORTER_JAEGER_AGENT_HOST=localhost TRACES_EXPORTER_JAEGER_AGENT_PORT=6831 cargo run --features=jaeger -- run -v --object-store=file --data-dir=$HOME/.influxdb_iox --server-id=42
|
TRACES_EXPORTER=jaeger TRACES_EXPORTER_JAEGER_AGENT_HOST=localhost TRACES_EXPORTER_JAEGER_AGENT_PORT=6831 cargo run -- run -v --server-id=42
|
||||||
```
|
```
|
||||||
|
|
||||||
### Step 3: Send a request with trace context
|
### Step 3: Send a request with trace context
|
||||||
|
|
|
@ -2588,46 +2588,6 @@ dependencies = [
|
||||||
"vcpkg",
|
"vcpkg",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "opentelemetry"
|
|
||||||
version = "0.16.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "e1cf9b1c4e9a6c4de793c632496fa490bdc0e1eea73f0c91394f7b6990935d22"
|
|
||||||
dependencies = [
|
|
||||||
"async-trait",
|
|
||||||
"crossbeam-channel",
|
|
||||||
"futures",
|
|
||||||
"js-sys",
|
|
||||||
"lazy_static",
|
|
||||||
"percent-encoding",
|
|
||||||
"pin-project",
|
|
||||||
"rand",
|
|
||||||
"thiserror",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "opentelemetry-jaeger"
|
|
||||||
version = "0.15.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "db22f492873ea037bc267b35a0e8e4fb846340058cb7c864efe3d0bf23684593"
|
|
||||||
dependencies = [
|
|
||||||
"async-trait",
|
|
||||||
"lazy_static",
|
|
||||||
"opentelemetry",
|
|
||||||
"opentelemetry-semantic-conventions",
|
|
||||||
"thiserror",
|
|
||||||
"thrift",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "opentelemetry-semantic-conventions"
|
|
||||||
version = "0.8.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "ffeac823339e8b0f27b961f4385057bf9f97f2863bc745bd015fd6091f2270e9"
|
|
||||||
dependencies = [
|
|
||||||
"opentelemetry",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ordered-float"
|
name = "ordered-float"
|
||||||
version = "1.1.1"
|
version = "1.1.1"
|
||||||
|
@ -4568,12 +4528,10 @@ dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"futures",
|
"futures",
|
||||||
"observability_deps",
|
"observability_deps",
|
||||||
"opentelemetry",
|
|
||||||
"opentelemetry-jaeger",
|
|
||||||
"snafu",
|
"snafu",
|
||||||
"structopt",
|
"structopt",
|
||||||
|
"thrift",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
|
||||||
"trace",
|
"trace",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -191,7 +191,6 @@ default = ["jemalloc_replacing_malloc"]
|
||||||
azure = ["object_store/azure"] # Optional Azure Object store support
|
azure = ["object_store/azure"] # Optional Azure Object store support
|
||||||
gcp = ["object_store/gcp"] # Optional GCP object store support
|
gcp = ["object_store/gcp"] # Optional GCP object store support
|
||||||
aws = ["object_store/aws"] # Optional AWS / S3 object store support
|
aws = ["object_store/aws"] # Optional AWS / S3 object store support
|
||||||
jaeger = ["trace_exporters/jaeger"] # Enable optional jaeger tracing support
|
|
||||||
# pprof is an optional feature for pprof support
|
# pprof is an optional feature for pprof support
|
||||||
|
|
||||||
# heappy is an optional feature; Not on by default as it
|
# heappy is an optional feature; Not on by default as it
|
||||||
|
|
|
@ -13,7 +13,7 @@ RUN \
|
||||||
--mount=type=cache,id=influxdb_iox_git,sharing=locked,target=/usr/local/cargo/git \
|
--mount=type=cache,id=influxdb_iox_git,sharing=locked,target=/usr/local/cargo/git \
|
||||||
--mount=type=cache,id=influxdb_iox_target,sharing=locked,target=/influxdb_iox/target \
|
--mount=type=cache,id=influxdb_iox_target,sharing=locked,target=/influxdb_iox/target \
|
||||||
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target && \
|
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target && \
|
||||||
cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,jaeger,pprof && \
|
cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,pprof && \
|
||||||
cp /influxdb_iox/target/release/influxdb_iox /root/influxdb_iox && \
|
cp /influxdb_iox/target/release/influxdb_iox /root/influxdb_iox && \
|
||||||
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target
|
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target
|
||||||
|
|
||||||
|
|
|
@ -79,8 +79,7 @@ def main():
|
||||||
try:
|
try:
|
||||||
if not args.skip_build:
|
if not args.skip_build:
|
||||||
build_with_aws = args.object_store == 's3'
|
build_with_aws = args.object_store == 's3'
|
||||||
build_with_jaeger = do_trace
|
cargo_build_iox(args.debug, build_with_aws)
|
||||||
cargo_build_iox(args.debug, build_with_aws, build_with_jaeger)
|
|
||||||
|
|
||||||
docker_create_network(dc)
|
docker_create_network(dc)
|
||||||
if args.kafka_zookeeper:
|
if args.kafka_zookeeper:
|
||||||
|
@ -382,15 +381,13 @@ def docker_run_jaeger(dc):
|
||||||
return container
|
return container
|
||||||
|
|
||||||
|
|
||||||
def cargo_build_iox(debug=False, build_with_aws=True, build_with_jaeger=True):
|
def cargo_build_iox(debug=False, build_with_aws=True):
|
||||||
t = time.time()
|
t = time.time()
|
||||||
print('building IOx')
|
print('building IOx')
|
||||||
|
|
||||||
features = []
|
features = []
|
||||||
if build_with_aws:
|
if build_with_aws:
|
||||||
features.append('aws')
|
features.append('aws')
|
||||||
if build_with_jaeger:
|
|
||||||
features.append('jaeger')
|
|
||||||
features = ','.join(features)
|
features = ','.join(features)
|
||||||
|
|
||||||
env = os.environ.copy()
|
env = os.environ.copy()
|
||||||
|
|
|
@ -189,9 +189,12 @@ pub async fn main(config: Config) -> Result<()> {
|
||||||
|
|
||||||
let grpc_listener = grpc_listener(config.grpc_bind_address).await?;
|
let grpc_listener = grpc_listener(config.grpc_bind_address).await?;
|
||||||
let http_listener = http_listener(config.http_bind_address).await?;
|
let http_listener = http_listener(config.http_bind_address).await?;
|
||||||
let trace_collector = config.tracing_config.build().context(Tracing)?;
|
let async_exporter = config.tracing_config.build().context(Tracing)?;
|
||||||
|
let trace_collector = async_exporter
|
||||||
|
.clone()
|
||||||
|
.map(|x| -> Arc<dyn TraceCollector> { x });
|
||||||
|
|
||||||
serve(
|
let r = serve(
|
||||||
config,
|
config,
|
||||||
application,
|
application,
|
||||||
grpc_listener,
|
grpc_listener,
|
||||||
|
@ -199,7 +202,14 @@ pub async fn main(config: Config) -> Result<()> {
|
||||||
trace_collector,
|
trace_collector,
|
||||||
app_server,
|
app_server,
|
||||||
)
|
)
|
||||||
.await
|
.await;
|
||||||
|
|
||||||
|
if let Some(async_exporter) = async_exporter {
|
||||||
|
if let Err(e) = async_exporter.drain().await {
|
||||||
|
error!(%e, "error draining trace exporter");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn grpc_listener(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
|
async fn grpc_listener(addr: SocketAddr) -> Result<tokio::net::TcpListener> {
|
||||||
|
@ -379,7 +389,7 @@ mod tests {
|
||||||
use tokio::task::JoinHandle;
|
use tokio::task::JoinHandle;
|
||||||
use trace::span::{Span, SpanStatus};
|
use trace::span::{Span, SpanStatus};
|
||||||
use trace::RingBufferTraceCollector;
|
use trace::RingBufferTraceCollector;
|
||||||
use trace_exporters::otel::{OtelExporter, TestOtelExporter};
|
use trace_exporters::export::{AsyncExporter, TestAsyncExporter};
|
||||||
|
|
||||||
fn test_config(server_id: Option<u32>) -> Config {
|
fn test_config(server_id: Option<u32>) -> Config {
|
||||||
let mut config = Config::from_iter(&[
|
let mut config = Config::from_iter(&[
|
||||||
|
@ -794,9 +804,9 @@ mod tests {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_otel_exporter() {
|
async fn test_async_exporter() {
|
||||||
let (sender, mut receiver) = tokio::sync::mpsc::channel(20);
|
let (sender, mut receiver) = tokio::sync::mpsc::channel(20);
|
||||||
let collector = Arc::new(OtelExporter::new(TestOtelExporter::new(sender)));
|
let collector = Arc::new(AsyncExporter::new(TestAsyncExporter::new(sender)));
|
||||||
|
|
||||||
let (addr, server, join) = tracing_server(&collector).await;
|
let (addr, server, join) = tracing_server(&collector).await;
|
||||||
let conn = jaeger_client(addr, "34f8495:30e34:0:1").await;
|
let conn = jaeger_client(addr, "34f8495:30e34:0:1").await;
|
||||||
|
@ -805,15 +815,14 @@ mod tests {
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
collector.shutdown();
|
collector.drain().await.unwrap();
|
||||||
collector.join().await.unwrap();
|
|
||||||
|
|
||||||
server.shutdown();
|
server.shutdown();
|
||||||
join.await.unwrap().unwrap();
|
join.await.unwrap().unwrap();
|
||||||
|
|
||||||
let span = receiver.recv().await.unwrap();
|
let span = receiver.recv().await.unwrap();
|
||||||
assert_eq!(span.span_context.trace_id().to_u128(), 0x34f8495);
|
assert_eq!(span.ctx.trace_id.get(), 0x34f8495);
|
||||||
assert_eq!(span.parent_span_id.to_u64(), 0x30e34);
|
assert_eq!(span.ctx.parent_span_id.unwrap().get(), 0x30e34);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn make_rules(db_name: impl Into<String>) -> ProvidedDatabaseRules {
|
fn make_rules(db_name: impl Into<String>) -> ProvidedDatabaseRules {
|
||||||
|
|
|
@ -6,18 +6,6 @@ use crate::common::{
|
||||||
use futures::TryStreamExt;
|
use futures::TryStreamExt;
|
||||||
use generated_types::{storage_client::StorageClient, ReadFilterRequest};
|
use generated_types::{storage_client::StorageClient, ReadFilterRequest};
|
||||||
|
|
||||||
// cfg at this level so IDE can resolve code even when jaeger feature is not active
|
|
||||||
#[cfg(feature = "jaeger")]
|
|
||||||
fn run_test() -> bool {
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(not(feature = "jaeger"))]
|
|
||||||
fn run_test() -> bool {
|
|
||||||
println!("Skipping test because jaeger feature not enabled");
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn setup() -> (UdpCapture, ServerFixture) {
|
async fn setup() -> (UdpCapture, ServerFixture) {
|
||||||
let udp_capture = UdpCapture::new().await;
|
let udp_capture = UdpCapture::new().await;
|
||||||
|
|
||||||
|
@ -59,10 +47,6 @@ async fn run_sql_query(server_fixture: &ServerFixture) {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
pub async fn test_tracing_sql() {
|
pub async fn test_tracing_sql() {
|
||||||
if !run_test() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let (udp_capture, server_fixture) = setup().await;
|
let (udp_capture, server_fixture) = setup().await;
|
||||||
run_sql_query(&server_fixture).await;
|
run_sql_query(&server_fixture).await;
|
||||||
|
|
||||||
|
@ -80,10 +64,6 @@ pub async fn test_tracing_sql() {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
pub async fn test_tracing_storage_api() {
|
pub async fn test_tracing_storage_api() {
|
||||||
if !run_test() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let (udp_capture, server_fixture) = setup().await;
|
let (udp_capture, server_fixture) = setup().await;
|
||||||
|
|
||||||
let scenario = Scenario::new();
|
let scenario = Scenario::new();
|
||||||
|
@ -127,9 +107,6 @@ pub async fn test_tracing_storage_api() {
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
pub async fn test_tracing_create_trace() {
|
pub async fn test_tracing_create_trace() {
|
||||||
if !run_test() {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let udp_capture = UdpCapture::new().await;
|
let udp_capture = UdpCapture::new().await;
|
||||||
|
|
||||||
let test_config = TestConfig::new()
|
let test_config = TestConfig::new()
|
||||||
|
|
|
@ -87,6 +87,7 @@ pub enum MetaValue {
|
||||||
String(Cow<'static, str>),
|
String(Cow<'static, str>),
|
||||||
Float(f64),
|
Float(f64),
|
||||||
Int(i64),
|
Int(i64),
|
||||||
|
Bool(bool),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<&'static str> for MetaValue {
|
impl From<&'static str> for MetaValue {
|
||||||
|
|
|
@ -11,16 +11,10 @@ async-trait = "0.1"
|
||||||
chrono = { version = "0.4" }
|
chrono = { version = "0.4" }
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
observability_deps = { path = "../observability_deps" }
|
observability_deps = { path = "../observability_deps" }
|
||||||
opentelemetry = { version = "0.16" }
|
|
||||||
opentelemetry-jaeger = { version = "0.15", optional = true }
|
|
||||||
snafu = "0.6"
|
snafu = "0.6"
|
||||||
structopt = { version = "0.3.23" }
|
structopt = { version = "0.3.23" }
|
||||||
|
thrift = { version = "0.13.0" }
|
||||||
tokio = { version = "1.11", features = ["macros", "time", "sync", "rt"] }
|
tokio = { version = "1.11", features = ["macros", "time", "sync", "rt"] }
|
||||||
tokio-util = { version = "0.6.3" }
|
|
||||||
trace = { path = "../trace" }
|
trace = { path = "../trace" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|
||||||
[features]
|
|
||||||
default = []
|
|
||||||
jaeger = ["opentelemetry-jaeger"]
|
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
# Trace Exporters
|
||||||
|
|
||||||
|
## Regenerating Jaeger Thrift
|
||||||
|
|
||||||
|
_The instructions below use docker, but this is optional._
|
||||||
|
|
||||||
|
_Depending on your setup there may be permissions complications that require using`-u`_
|
||||||
|
|
||||||
|
Startup a Debian bullseye image
|
||||||
|
|
||||||
|
```
|
||||||
|
docker run -it -v $PWD:/out debian:bullseye-slim
|
||||||
|
```
|
||||||
|
|
||||||
|
Install the thrift-compiler
|
||||||
|
|
||||||
|
```
|
||||||
|
$ apt-get update
|
||||||
|
$ apt-get install thrift-compiler wget
|
||||||
|
```
|
||||||
|
|
||||||
|
Verify the version of the compiler matches the version of `thrift` in [Cargo.toml](./Cargo.toml)
|
||||||
|
|
||||||
|
```
|
||||||
|
$ thrift --version
|
||||||
|
Thrift version 0.13.0
|
||||||
|
```
|
||||||
|
|
||||||
|
Get the IDL definition
|
||||||
|
|
||||||
|
```
|
||||||
|
$ wget https://raw.githubusercontent.com/jaegertracing/jaeger-idl/master/thrift/jaeger.thrift https://raw.githubusercontent.com/jaegertracing/jaeger-idl/master/thrift/zipkincore.thrift https://raw.githubusercontent.com/jaegertracing/jaeger-idl/master/thrift/agent.thrift
|
||||||
|
```
|
||||||
|
|
||||||
|
Generate the code
|
||||||
|
|
||||||
|
```
|
||||||
|
$ thrift --out /out/src/thrift --gen rs agent.thrift
|
||||||
|
$ thrift --out /out/src/thrift --gen rs jaeger.thrift
|
||||||
|
$ thrift --out /out/src/thrift --gen rs zipkincore.thrift
|
||||||
|
```
|
||||||
|
|
||||||
|
Patch up imports
|
||||||
|
|
||||||
|
```
|
||||||
|
sed -i 's/use jaeger;/use super::jaeger;/g' /out/src/thrift/agent.rs
|
||||||
|
sed -i 's/use zipkincore;/use super::zipkincore;/g' /out/src/thrift/agent.rs
|
||||||
|
```
|
||||||
|
|
||||||
|
Remove the clippy line
|
||||||
|
|
||||||
|
```
|
||||||
|
#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments, type_complexity))]
|
||||||
|
```
|
|
@ -0,0 +1,162 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use futures::{
|
||||||
|
future::{BoxFuture, Shared},
|
||||||
|
FutureExt, TryFutureExt,
|
||||||
|
};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::task::JoinError;
|
||||||
|
|
||||||
|
use observability_deps::tracing::{error, info, warn};
|
||||||
|
use trace::{span::Span, TraceCollector};
|
||||||
|
|
||||||
|
/// Size of the exporter buffer
|
||||||
|
const CHANNEL_SIZE: usize = 1000;
|
||||||
|
|
||||||
|
/// An `AsyncExport` is a batched async version of `trace::TraceCollector`
|
||||||
|
#[async_trait]
|
||||||
|
pub trait AsyncExport: Send + 'static {
|
||||||
|
async fn export(&mut self, span: Vec<Span>);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `AsyncExporter` wraps a `AsyncExport` and sinks spans to it
|
||||||
|
///
|
||||||
|
/// In order to do this it spawns a background worker that pulls messages
|
||||||
|
/// off a queue and writes them to the `AsyncExport`.
|
||||||
|
///
|
||||||
|
/// If this worker cannot keep up, and this queue fills up, spans will
|
||||||
|
/// be dropped and warnings logged
|
||||||
|
///
|
||||||
|
/// Note: Currently this does not batch spans (#2392)
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct AsyncExporter {
|
||||||
|
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
|
||||||
|
|
||||||
|
/// Communication queue with the background worker
|
||||||
|
///
|
||||||
|
/// Sending None triggers termination
|
||||||
|
sender: tokio::sync::mpsc::Sender<Option<Span>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncExporter {
|
||||||
|
/// Creates a new `AsyncExporter`
|
||||||
|
pub fn new<T: AsyncExport>(collector: T) -> Self {
|
||||||
|
let (sender, receiver) = mpsc::channel(CHANNEL_SIZE);
|
||||||
|
|
||||||
|
let handle = tokio::spawn(background_worker(collector, receiver));
|
||||||
|
let join = handle.map_err(Arc::new).boxed().shared();
|
||||||
|
|
||||||
|
Self { join, sender }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Triggers shutdown of this `AsyncExporter` and waits until all in-flight
|
||||||
|
/// spans have been published to the `AsyncExport`
|
||||||
|
pub async fn drain(&self) -> Result<(), Arc<JoinError>> {
|
||||||
|
info!("batched exporter shutting down");
|
||||||
|
let _ = self.sender.send(None).await;
|
||||||
|
self.join.clone().await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TraceCollector for AsyncExporter {
|
||||||
|
fn export(&self, span: Span) {
|
||||||
|
use mpsc::error::TrySendError;
|
||||||
|
match self.sender.try_send(Some(span)) {
|
||||||
|
Ok(_) => {
|
||||||
|
//TODO: Increment some metric (#2613)
|
||||||
|
}
|
||||||
|
Err(TrySendError::Full(_)) => {
|
||||||
|
warn!("exporter cannot keep up, dropping spans")
|
||||||
|
}
|
||||||
|
Err(TrySendError::Closed(_)) => {
|
||||||
|
warn!("background worker shutdown")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn background_worker<T: AsyncExport>(
|
||||||
|
mut exporter: T,
|
||||||
|
mut receiver: mpsc::Receiver<Option<Span>>,
|
||||||
|
) {
|
||||||
|
loop {
|
||||||
|
match receiver.recv().await {
|
||||||
|
Some(Some(span)) => exporter.export(vec![span]).await,
|
||||||
|
Some(None) => {
|
||||||
|
info!("async exporter shut down");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
error!("sender-side of async exporter dropped without waiting for shut down");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An `AsyncExporter` that sinks writes to a tokio mpsc channel.
|
||||||
|
///
|
||||||
|
/// Intended for testing ONLY
|
||||||
|
///
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct TestAsyncExporter {
|
||||||
|
channel: mpsc::Sender<Span>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TestAsyncExporter {
|
||||||
|
pub fn new(channel: mpsc::Sender<Span>) -> Self {
|
||||||
|
Self { channel }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl AsyncExport for TestAsyncExporter {
|
||||||
|
async fn export(&mut self, batch: Vec<Span>) {
|
||||||
|
for span in batch {
|
||||||
|
self.channel.send(span).await.expect("channel closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use trace::ctx::SpanContext;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_exporter() {
|
||||||
|
let (sender, mut receiver) = mpsc::channel(10);
|
||||||
|
let exporter = AsyncExporter::new(TestAsyncExporter::new(sender));
|
||||||
|
|
||||||
|
let root = SpanContext::new(Arc::new(trace::LogTraceCollector::new()));
|
||||||
|
let s1 = root.child("foo");
|
||||||
|
let s2 = root.child("bar");
|
||||||
|
|
||||||
|
exporter.export(s1.clone());
|
||||||
|
exporter.export(s2.clone());
|
||||||
|
exporter.export(s2.clone());
|
||||||
|
|
||||||
|
// Drain should wait for all published spans to be flushed
|
||||||
|
exporter.drain().await.unwrap();
|
||||||
|
|
||||||
|
let r1 = receiver.recv().await.unwrap();
|
||||||
|
let r2 = receiver.recv().await.unwrap();
|
||||||
|
let r3 = receiver.recv().await.unwrap();
|
||||||
|
|
||||||
|
// Should not be fatal despite exporter having been shutdown
|
||||||
|
exporter.export(s2.clone());
|
||||||
|
|
||||||
|
assert_eq!(root.span_id.get(), r1.ctx.parent_span_id.unwrap().get());
|
||||||
|
assert_eq!(s1.ctx.span_id.get(), r1.ctx.span_id.get());
|
||||||
|
assert_eq!(s1.ctx.trace_id.get(), r1.ctx.trace_id.get());
|
||||||
|
|
||||||
|
assert_eq!(root.span_id.get(), r2.ctx.parent_span_id.unwrap().get());
|
||||||
|
assert_eq!(s2.ctx.span_id.get(), r2.ctx.span_id.get());
|
||||||
|
assert_eq!(s2.ctx.trace_id.get(), r2.ctx.trace_id.get());
|
||||||
|
|
||||||
|
assert_eq!(root.span_id.get(), r3.ctx.parent_span_id.unwrap().get());
|
||||||
|
assert_eq!(s2.ctx.span_id.get(), r3.ctx.span_id.get());
|
||||||
|
assert_eq!(s2.ctx.trace_id.get(), r3.ctx.trace_id.get());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,307 @@
|
||||||
|
use std::net::{SocketAddr, ToSocketAddrs, UdpSocket};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
use observability_deps::tracing::{error, info};
|
||||||
|
use trace::span::Span;
|
||||||
|
|
||||||
|
use crate::export::AsyncExport;
|
||||||
|
use crate::thrift::agent::{AgentSyncClient, TAgentSyncClient};
|
||||||
|
use crate::thrift::jaeger;
|
||||||
|
use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
|
||||||
|
|
||||||
|
mod span;
|
||||||
|
|
||||||
|
/// `JaegerAgentExporter` receives span data and writes it over UDP to a local jaeger agent
|
||||||
|
///
|
||||||
|
/// Note: will drop data if the UDP socket would block
|
||||||
|
pub struct JaegerAgentExporter {
|
||||||
|
/// The name of the service
|
||||||
|
service_name: String,
|
||||||
|
|
||||||
|
/// The agent client that encodes messages
|
||||||
|
client:
|
||||||
|
AgentSyncClient<TCompactInputProtocol<NoopReader>, TCompactOutputProtocol<MessageWriter>>,
|
||||||
|
|
||||||
|
/// Spans should be assigned a sequential sequence number
|
||||||
|
/// to allow jaeger to better detect dropped spans
|
||||||
|
next_sequence: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JaegerAgentExporter {
|
||||||
|
pub fn new<E: ToSocketAddrs + std::fmt::Display>(
|
||||||
|
service_name: String,
|
||||||
|
agent_endpoint: E,
|
||||||
|
) -> super::Result<Self> {
|
||||||
|
info!(%agent_endpoint, %service_name, "Creating jaeger tracing exporter");
|
||||||
|
let remote_addr = agent_endpoint.to_socket_addrs()?.next().ok_or_else(|| {
|
||||||
|
super::Error::ResolutionError {
|
||||||
|
address: agent_endpoint.to_string(),
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let local_addr: SocketAddr = if remote_addr.is_ipv4() {
|
||||||
|
"0.0.0.0:0"
|
||||||
|
} else {
|
||||||
|
"[::]:0"
|
||||||
|
}
|
||||||
|
.parse()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let socket = UdpSocket::bind(local_addr)?;
|
||||||
|
socket.set_nonblocking(true)?;
|
||||||
|
socket.connect(remote_addr)?;
|
||||||
|
|
||||||
|
let client = AgentSyncClient::new(
|
||||||
|
TCompactInputProtocol::new(NoopReader::default()),
|
||||||
|
TCompactOutputProtocol::new(MessageWriter::new(socket)),
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
service_name,
|
||||||
|
client,
|
||||||
|
next_sequence: 0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_batch(&mut self, spans: Vec<Span>) -> jaeger::Batch {
|
||||||
|
let seq_no = Some(self.next_sequence);
|
||||||
|
self.next_sequence += 1;
|
||||||
|
jaeger::Batch {
|
||||||
|
process: jaeger::Process {
|
||||||
|
service_name: self.service_name.clone(),
|
||||||
|
tags: None,
|
||||||
|
},
|
||||||
|
spans: spans.into_iter().map(Into::into).collect(),
|
||||||
|
seq_no,
|
||||||
|
stats: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl AsyncExport for JaegerAgentExporter {
|
||||||
|
async fn export(&mut self, spans: Vec<Span>) {
|
||||||
|
let batch = self.make_batch(spans);
|
||||||
|
if let Err(e) = self.client.emit_batch(batch) {
|
||||||
|
error!(%e, "error writing batch to jaeger agent")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `NoopReader` is a `std::io::Read` that never returns any data
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
struct NoopReader {}
|
||||||
|
|
||||||
|
impl std::io::Read for NoopReader {
|
||||||
|
fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
Ok(0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A `MessageWriter` only writes entire message payloads to the provided UDP socket
|
||||||
|
///
|
||||||
|
/// If the UDP socket would block, drops the packet
|
||||||
|
struct MessageWriter {
|
||||||
|
buf: Vec<u8>,
|
||||||
|
socket: UdpSocket,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageWriter {
|
||||||
|
fn new(socket: UdpSocket) -> Self {
|
||||||
|
Self {
|
||||||
|
buf: vec![],
|
||||||
|
socket,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::io::Write for MessageWriter {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
|
self.buf.extend_from_slice(buf);
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> std::io::Result<()> {
|
||||||
|
let message_len = self.buf.len();
|
||||||
|
let r = self.socket.send(&self.buf);
|
||||||
|
self.buf.clear();
|
||||||
|
match r {
|
||||||
|
Ok(written) => {
|
||||||
|
if written != message_len {
|
||||||
|
// In the event a message is truncated, there isn't an obvious way to recover
|
||||||
|
//
|
||||||
|
// The Thrift protocol is normally used on top of a reliable stream,
|
||||||
|
// e.g. TCP, and it is a bit of a hack to send it over UDP
|
||||||
|
//
|
||||||
|
// Jaeger requires that each thrift Message is encoded in exactly one UDP
|
||||||
|
// packet, as this ensures it either arrives in its entirety or not at all
|
||||||
|
//
|
||||||
|
// If for whatever reason the packet is truncated, the agent will fail to
|
||||||
|
// to decode it, likely due to a missing stop-field, and discard it
|
||||||
|
error!(%written, %message_len, "jaeger agent exporter failed to write message as single UDP packet");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
||||||
|
error!("jaeger agent exporter would have blocked - dropping message");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::thrift::agent::{AgentSyncHandler, AgentSyncProcessor};
|
||||||
|
use chrono::{TimeZone, Utc};
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use thrift::server::TProcessor;
|
||||||
|
use thrift::transport::TBufferChannel;
|
||||||
|
use trace::ctx::{SpanContext, SpanId, TraceId};
|
||||||
|
use trace::span::{SpanEvent, SpanStatus};
|
||||||
|
|
||||||
|
struct TestHandler {
|
||||||
|
batches: Arc<Mutex<Vec<jaeger::Batch>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AgentSyncHandler for TestHandler {
|
||||||
|
fn handle_emit_zipkin_batch(
|
||||||
|
&self,
|
||||||
|
_spans: Vec<crate::thrift::zipkincore::Span>,
|
||||||
|
) -> thrift::Result<()> {
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_emit_batch(&self, batch: jaeger::Batch) -> thrift::Result<()> {
|
||||||
|
self.batches.lock().unwrap().push(batch);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wraps a UdpSocket and a buffer the size of the max UDP datagram and provides
|
||||||
|
/// `std::io::Read` on this buffer's contents, ensuring that reads are not truncated
|
||||||
|
struct Reader {
|
||||||
|
socket: UdpSocket,
|
||||||
|
buffer: Box<[u8; 65535]>,
|
||||||
|
idx: usize,
|
||||||
|
len: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Reader {
|
||||||
|
pub fn new(socket: UdpSocket) -> Self {
|
||||||
|
Self {
|
||||||
|
socket,
|
||||||
|
buffer: Box::new([0; 65535]),
|
||||||
|
idx: 0,
|
||||||
|
len: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::io::Read for Reader {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
if self.idx == self.len {
|
||||||
|
self.idx = 0;
|
||||||
|
self.len = self.socket.recv(self.buffer.as_mut())?;
|
||||||
|
}
|
||||||
|
let to_read = buf.len().min(self.len - self.idx);
|
||||||
|
buf.copy_from_slice(&self.buffer[self.idx..(self.idx + to_read)]);
|
||||||
|
self.idx += to_read;
|
||||||
|
Ok(to_read)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_jaeger() {
|
||||||
|
let server = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
|
server
|
||||||
|
.set_read_timeout(Some(std::time::Duration::from_secs(1)))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let address = server.local_addr().unwrap();
|
||||||
|
let mut exporter = JaegerAgentExporter::new("service_name".to_string(), address).unwrap();
|
||||||
|
|
||||||
|
let batches = Arc::new(Mutex::new(vec![]));
|
||||||
|
|
||||||
|
let mut processor_input = TCompactInputProtocol::new(Reader::new(server));
|
||||||
|
let mut processor_output = TCompactOutputProtocol::new(TBufferChannel::with_capacity(0, 0));
|
||||||
|
let processor = AgentSyncProcessor::new(TestHandler {
|
||||||
|
batches: Arc::clone(&batches),
|
||||||
|
});
|
||||||
|
|
||||||
|
let ctx = SpanContext {
|
||||||
|
trace_id: TraceId::new(43434).unwrap(),
|
||||||
|
parent_span_id: None,
|
||||||
|
span_id: SpanId::new(3495993).unwrap(),
|
||||||
|
collector: None,
|
||||||
|
};
|
||||||
|
let mut span = ctx.child("foo");
|
||||||
|
span.status = SpanStatus::Ok;
|
||||||
|
span.events = vec![SpanEvent {
|
||||||
|
time: Utc.timestamp_nanos(200000),
|
||||||
|
msg: "hello".into(),
|
||||||
|
}];
|
||||||
|
span.start = Some(Utc.timestamp_nanos(100000));
|
||||||
|
span.end = Some(Utc.timestamp_nanos(300000));
|
||||||
|
|
||||||
|
exporter.export(vec![span.clone(), span.clone()]).await;
|
||||||
|
exporter.export(vec![span.clone()]).await;
|
||||||
|
|
||||||
|
processor
|
||||||
|
.process(&mut processor_input, &mut processor_output)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
processor
|
||||||
|
.process(&mut processor_input, &mut processor_output)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let batches = batches.lock().unwrap();
|
||||||
|
assert_eq!(batches.len(), 2);
|
||||||
|
|
||||||
|
let b1 = &batches[0];
|
||||||
|
|
||||||
|
assert_eq!(b1.spans.len(), 2);
|
||||||
|
assert_eq!(b1.process.service_name.as_str(), "service_name");
|
||||||
|
assert_eq!(b1.seq_no.unwrap(), 0);
|
||||||
|
|
||||||
|
let b2 = &batches[1];
|
||||||
|
assert_eq!(b2.spans.len(), 1);
|
||||||
|
assert_eq!(b2.process.service_name.as_str(), "service_name");
|
||||||
|
assert_eq!(b2.seq_no.unwrap(), 1);
|
||||||
|
|
||||||
|
let b1_s0 = &b1.spans[0];
|
||||||
|
|
||||||
|
assert_eq!(b1_s0, &b1.spans[1]);
|
||||||
|
assert_eq!(b1_s0, &b2.spans[0]);
|
||||||
|
|
||||||
|
assert_eq!(b1_s0.span_id, span.ctx.span_id.get() as i64);
|
||||||
|
assert_eq!(
|
||||||
|
b1_s0.parent_span_id,
|
||||||
|
span.ctx.parent_span_id.unwrap().get() as i64
|
||||||
|
);
|
||||||
|
|
||||||
|
// microseconds not nanoseconds
|
||||||
|
assert_eq!(b1_s0.start_time, 100);
|
||||||
|
assert_eq!(b1_s0.duration, 200);
|
||||||
|
|
||||||
|
let logs = b1_s0.logs.as_ref().unwrap();
|
||||||
|
assert_eq!(logs.len(), 1);
|
||||||
|
assert_eq!(logs[0].timestamp, 200);
|
||||||
|
assert_eq!(logs[0].fields.len(), 1);
|
||||||
|
assert_eq!(logs[0].fields[0].key.as_str(), "event");
|
||||||
|
assert_eq!(logs[0].fields[0].v_str.as_ref().unwrap().as_str(), "hello");
|
||||||
|
|
||||||
|
let tags = b1_s0.tags.as_ref().unwrap();
|
||||||
|
assert_eq!(tags.len(), 1);
|
||||||
|
assert_eq!(tags[0].key.as_str(), "ok");
|
||||||
|
assert!(tags[0].v_bool.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_resolve() {
|
||||||
|
JaegerAgentExporter::new("service_name".to_string(), "localhost:8082").unwrap();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,117 @@
|
||||||
|
/// Contains the conversion logic from a `trace::span::Span` to `thrift::jaeger::Span`
|
||||||
|
use crate::thrift::jaeger;
|
||||||
|
use trace::span::{MetaValue, Span, SpanEvent, SpanStatus};
|
||||||
|
|
||||||
|
impl From<Span> for jaeger::Span {
|
||||||
|
fn from(mut s: Span) -> Self {
|
||||||
|
let trace_id = s.ctx.trace_id.get();
|
||||||
|
let trace_id_high = (trace_id >> 64) as i64;
|
||||||
|
let trace_id_low = trace_id as i64;
|
||||||
|
|
||||||
|
// A parent span id of 0 indicates no parent span ID (span IDs are non-zero)
|
||||||
|
let parent_span_id = s.ctx.parent_span_id.map(|id| id.get()).unwrap_or_default() as i64;
|
||||||
|
|
||||||
|
let (start_time, duration) = match (s.start, s.end) {
|
||||||
|
(Some(start), Some(end)) => (
|
||||||
|
start.timestamp_nanos() / 1000,
|
||||||
|
(end - start).num_microseconds().expect("no overflow"),
|
||||||
|
),
|
||||||
|
(Some(start), _) => (start.timestamp_nanos() / 1000, 0),
|
||||||
|
_ => (0, 0),
|
||||||
|
};
|
||||||
|
|
||||||
|
// These don't appear to be standardised, however, the jaeger UI treats
|
||||||
|
// the presence of an "error" tag as indicating an error
|
||||||
|
match s.status {
|
||||||
|
SpanStatus::Ok => {
|
||||||
|
s.metadata
|
||||||
|
.entry("ok".into())
|
||||||
|
.or_insert(MetaValue::Bool(true));
|
||||||
|
}
|
||||||
|
SpanStatus::Err => {
|
||||||
|
s.metadata
|
||||||
|
.entry("error".into())
|
||||||
|
.or_insert(MetaValue::Bool(true));
|
||||||
|
}
|
||||||
|
SpanStatus::Unknown => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
let tags = match s.metadata.is_empty() {
|
||||||
|
true => None,
|
||||||
|
false => Some(
|
||||||
|
s.metadata
|
||||||
|
.into_iter()
|
||||||
|
.map(|(name, value)| tag_from_meta(name.to_string(), value))
|
||||||
|
.collect(),
|
||||||
|
),
|
||||||
|
};
|
||||||
|
|
||||||
|
let logs = match s.events.is_empty() {
|
||||||
|
true => None,
|
||||||
|
false => Some(s.events.into_iter().map(Into::into).collect()),
|
||||||
|
};
|
||||||
|
|
||||||
|
Self {
|
||||||
|
trace_id_low,
|
||||||
|
trace_id_high,
|
||||||
|
span_id: s.ctx.span_id.get() as i64,
|
||||||
|
parent_span_id,
|
||||||
|
operation_name: s.name.to_string(),
|
||||||
|
references: None,
|
||||||
|
flags: 0,
|
||||||
|
start_time,
|
||||||
|
duration,
|
||||||
|
tags,
|
||||||
|
logs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SpanEvent> for jaeger::Log {
|
||||||
|
fn from(event: SpanEvent) -> Self {
|
||||||
|
Self {
|
||||||
|
timestamp: event.time.timestamp_nanos() / 1000,
|
||||||
|
fields: vec![jaeger::Tag {
|
||||||
|
key: "event".to_string(),
|
||||||
|
v_type: jaeger::TagType::String,
|
||||||
|
v_str: Some(event.msg.to_string()),
|
||||||
|
v_double: None,
|
||||||
|
v_bool: None,
|
||||||
|
v_long: None,
|
||||||
|
v_binary: None,
|
||||||
|
}],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn tag_from_meta(key: String, value: MetaValue) -> jaeger::Tag {
|
||||||
|
let mut tag = jaeger::Tag {
|
||||||
|
key,
|
||||||
|
v_type: jaeger::TagType::String,
|
||||||
|
v_str: None,
|
||||||
|
v_double: None,
|
||||||
|
v_bool: None,
|
||||||
|
v_long: None,
|
||||||
|
v_binary: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
match value {
|
||||||
|
MetaValue::String(v) => {
|
||||||
|
tag.v_type = jaeger::TagType::String;
|
||||||
|
tag.v_str = Some(v.to_string())
|
||||||
|
}
|
||||||
|
MetaValue::Float(v) => {
|
||||||
|
tag.v_type = jaeger::TagType::Double;
|
||||||
|
tag.v_double = Some(v.into())
|
||||||
|
}
|
||||||
|
MetaValue::Int(v) => {
|
||||||
|
tag.v_type = jaeger::TagType::Long;
|
||||||
|
tag.v_long = Some(v)
|
||||||
|
}
|
||||||
|
MetaValue::Bool(v) => {
|
||||||
|
tag.v_type = jaeger::TagType::Bool;
|
||||||
|
tag.v_bool = Some(v)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
tag
|
||||||
|
}
|
|
@ -7,13 +7,34 @@
|
||||||
clippy::future_not_send
|
clippy::future_not_send
|
||||||
)]
|
)]
|
||||||
|
|
||||||
|
use crate::export::AsyncExporter;
|
||||||
|
use crate::jaeger::JaegerAgentExporter;
|
||||||
use snafu::Snafu;
|
use snafu::Snafu;
|
||||||
use std::num::NonZeroU16;
|
use std::num::NonZeroU16;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use structopt::StructOpt;
|
use structopt::StructOpt;
|
||||||
use trace::TraceCollector;
|
|
||||||
|
|
||||||
pub mod otel;
|
pub mod export;
|
||||||
|
|
||||||
|
mod jaeger;
|
||||||
|
|
||||||
|
/// Auto-generated thrift code
|
||||||
|
#[allow(
|
||||||
|
dead_code,
|
||||||
|
deprecated,
|
||||||
|
clippy::redundant_field_names,
|
||||||
|
clippy::unused_unit,
|
||||||
|
clippy::use_self,
|
||||||
|
clippy::too_many_arguments,
|
||||||
|
clippy::type_complexity
|
||||||
|
)]
|
||||||
|
mod thrift {
|
||||||
|
pub mod agent;
|
||||||
|
|
||||||
|
pub mod zipkincore;
|
||||||
|
|
||||||
|
pub mod jaeger;
|
||||||
|
}
|
||||||
|
|
||||||
/// CLI config for distributed tracing options
|
/// CLI config for distributed tracing options
|
||||||
#[derive(Debug, StructOpt, Clone)]
|
#[derive(Debug, StructOpt, Clone)]
|
||||||
|
@ -84,7 +105,7 @@ pub struct TracingConfig {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TracingConfig {
|
impl TracingConfig {
|
||||||
pub fn build(&self) -> Result<Option<Arc<dyn TraceCollector>>> {
|
pub fn build(&self) -> Result<Option<Arc<AsyncExporter>>> {
|
||||||
match self.traces_exporter {
|
match self.traces_exporter {
|
||||||
TracesExporter::None => Ok(None),
|
TracesExporter::None => Ok(None),
|
||||||
TracesExporter::Jaeger => Ok(Some(jaeger_exporter(self)?)),
|
TracesExporter::Jaeger => Ok(Some(jaeger_exporter(self)?)),
|
||||||
|
@ -115,23 +136,16 @@ impl std::str::FromStr for TracesExporter {
|
||||||
|
|
||||||
#[derive(Debug, Snafu)]
|
#[derive(Debug, Snafu)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[snafu(display("failed to construct trace exporter: {}", source))]
|
#[snafu(display("Failed to resolve address: {}", address))]
|
||||||
TraceExporter {
|
ResolutionError { address: String },
|
||||||
source: opentelemetry::trace::TraceError,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display(
|
#[snafu(context(false))]
|
||||||
"'jaeger' not supported with this build. Hint: recompile with appropriate features"
|
IOError { source: std::io::Error },
|
||||||
))]
|
|
||||||
JaegerNotBuilt {},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
|
||||||
#[cfg(feature = "jaeger")]
|
fn jaeger_exporter(config: &TracingConfig) -> Result<Arc<AsyncExporter>> {
|
||||||
fn jaeger_exporter(config: &TracingConfig) -> Result<Arc<dyn TraceCollector>> {
|
|
||||||
use observability_deps::tracing::info;
|
|
||||||
|
|
||||||
let agent_endpoint = format!(
|
let agent_endpoint = format!(
|
||||||
"{}:{}",
|
"{}:{}",
|
||||||
config.traces_exporter_jaeger_agent_host.trim(),
|
config.traces_exporter_jaeger_agent_host.trim(),
|
||||||
|
@ -139,18 +153,7 @@ fn jaeger_exporter(config: &TracingConfig) -> Result<Arc<dyn TraceCollector>> {
|
||||||
);
|
);
|
||||||
|
|
||||||
let service_name = &config.traces_exporter_jaeger_service_name;
|
let service_name = &config.traces_exporter_jaeger_service_name;
|
||||||
info!(%agent_endpoint, %service_name, "Creating jaeger tracing exporter");
|
let jaeger = JaegerAgentExporter::new(service_name.clone(), agent_endpoint)?;
|
||||||
|
|
||||||
let exporter = opentelemetry_jaeger::new_pipeline()
|
Ok(Arc::new(AsyncExporter::new(jaeger)))
|
||||||
.with_agent_endpoint(agent_endpoint)
|
|
||||||
.with_service_name(&config.traces_exporter_jaeger_service_name)
|
|
||||||
.init_sync_exporter()
|
|
||||||
.map_err(|source| Error::TraceExporter { source })?;
|
|
||||||
|
|
||||||
Ok(Arc::new(otel::OtelExporter::new(exporter)))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(not(feature = "jaeger"))]
|
|
||||||
fn jaeger_exporter(_config: &TracingConfig) -> Result<Arc<dyn TraceCollector>> {
|
|
||||||
Err(Error::JaegerNotBuilt {})
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,356 +0,0 @@
|
||||||
use std::borrow::Cow;
|
|
||||||
use std::future::Future;
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use futures::{
|
|
||||||
future::{BoxFuture, Shared},
|
|
||||||
FutureExt, TryFutureExt,
|
|
||||||
};
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tokio::task::JoinError;
|
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
|
|
||||||
use observability_deps::tracing::{error, info, warn};
|
|
||||||
use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
|
|
||||||
|
|
||||||
use trace::ctx::{SpanContext, SpanId, TraceId};
|
|
||||||
use trace::span::{MetaValue, SpanEvent, SpanStatus};
|
|
||||||
use trace::{span::Span, TraceCollector};
|
|
||||||
|
|
||||||
/// Size of the exporter buffer
|
|
||||||
const CHANNEL_SIZE: usize = 1000;
|
|
||||||
|
|
||||||
/// Maximum number of events that can be associated with a span
|
|
||||||
const MAX_EVENTS: u32 = 100;
|
|
||||||
|
|
||||||
/// Maximum number of attributes that can be associated with a span
|
|
||||||
const MAX_ATTRIBUTES: u32 = 100;
|
|
||||||
|
|
||||||
/// `OtelExporter` wraps a opentelemetry SpanExporter and sinks spans to it
|
|
||||||
///
|
|
||||||
/// In order to do this it spawns a background worker that pulls messages
|
|
||||||
/// of a queue and writes them to opentelemetry. If this worker cannot keep
|
|
||||||
/// up, and this queue fills up, spans will be dropped and warnings logged
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct OtelExporter {
|
|
||||||
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
|
|
||||||
|
|
||||||
sender: tokio::sync::mpsc::Sender<SpanData>,
|
|
||||||
|
|
||||||
shutdown: CancellationToken,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl OtelExporter {
|
|
||||||
/// Creates a new `OtelExporter`
|
|
||||||
pub fn new<T: SpanExporter + 'static>(exporter: T) -> Self {
|
|
||||||
let shutdown = CancellationToken::new();
|
|
||||||
let (sender, receiver) = mpsc::channel(CHANNEL_SIZE);
|
|
||||||
|
|
||||||
let handle = tokio::spawn(background_worker(shutdown.clone(), exporter, receiver));
|
|
||||||
let join = handle.map_err(Arc::new).boxed().shared();
|
|
||||||
|
|
||||||
Self {
|
|
||||||
join,
|
|
||||||
shutdown,
|
|
||||||
sender,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Triggers shutdown of this `OtelExporter`
|
|
||||||
pub fn shutdown(&self) {
|
|
||||||
info!("otel exporter shutting down");
|
|
||||||
self.shutdown.cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Waits for the background worker of OtelExporter to finish
|
|
||||||
pub fn join(&self) -> impl Future<Output = Result<(), Arc<JoinError>>> {
|
|
||||||
self.join.clone()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TraceCollector for OtelExporter {
|
|
||||||
fn export(&self, span: Span) {
|
|
||||||
use mpsc::error::TrySendError;
|
|
||||||
|
|
||||||
match self.sender.try_send(convert_span(span)) {
|
|
||||||
Ok(_) => {
|
|
||||||
//TODO: Increment some metric
|
|
||||||
}
|
|
||||||
Err(TrySendError::Full(_)) => {
|
|
||||||
warn!("exporter cannot keep up, dropping spans")
|
|
||||||
}
|
|
||||||
Err(TrySendError::Closed(_)) => {
|
|
||||||
warn!("background worker shutdown")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn background_worker<T: SpanExporter + 'static>(
|
|
||||||
shutdown: CancellationToken,
|
|
||||||
exporter: T,
|
|
||||||
receiver: mpsc::Receiver<SpanData>,
|
|
||||||
) {
|
|
||||||
tokio::select! {
|
|
||||||
_ = exporter_loop(exporter, receiver) => {
|
|
||||||
// Don't expect this future to complete
|
|
||||||
error!("otel exporter loop completed")
|
|
||||||
}
|
|
||||||
_ = shutdown.cancelled() => {}
|
|
||||||
}
|
|
||||||
info!("otel exporter shut down")
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An opentelemetry::SpanExporter that sinks writes to a tokio mpsc channel.
|
|
||||||
///
|
|
||||||
/// Intended for testing ONLY
|
|
||||||
///
|
|
||||||
/// Note: There is a similar construct in opentelemetry behind the testing feature
|
|
||||||
/// flag, but enabling this brings in a large number of additional dependencies and
|
|
||||||
/// so we just implement our own version
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct TestOtelExporter {
|
|
||||||
channel: mpsc::Sender<SpanData>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TestOtelExporter {
|
|
||||||
pub fn new(channel: mpsc::Sender<SpanData>) -> Self {
|
|
||||||
Self { channel }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl SpanExporter for TestOtelExporter {
|
|
||||||
async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult {
|
|
||||||
for span in batch {
|
|
||||||
self.channel.send(span).await.expect("channel closed")
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn exporter_loop<T: SpanExporter + 'static>(
|
|
||||||
mut exporter: T,
|
|
||||||
mut receiver: tokio::sync::mpsc::Receiver<SpanData>,
|
|
||||||
) {
|
|
||||||
while let Some(span) = receiver.recv().await {
|
|
||||||
// TODO: Batch export spans
|
|
||||||
if let Err(e) = exporter.export(vec![span]).await {
|
|
||||||
error!(%e, "error exporting span")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
warn!("sender-side of jaeger exporter dropped without waiting for shut down")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn convert_span(span: Span) -> SpanData {
|
|
||||||
use opentelemetry::sdk::trace::{EvictedHashMap, EvictedQueue};
|
|
||||||
use opentelemetry::sdk::InstrumentationLibrary;
|
|
||||||
use opentelemetry::trace::{SpanId, SpanKind};
|
|
||||||
use opentelemetry::{Key, KeyValue};
|
|
||||||
|
|
||||||
let parent_span_id = match span.ctx.parent_span_id {
|
|
||||||
Some(id) => convert_span_id(id),
|
|
||||||
None => SpanId::invalid(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut ret = SpanData {
|
|
||||||
span_context: convert_ctx(&span.ctx),
|
|
||||||
parent_span_id,
|
|
||||||
span_kind: SpanKind::Server,
|
|
||||||
name: span.name,
|
|
||||||
start_time: span.start.map(Into::into).unwrap_or(std::time::UNIX_EPOCH),
|
|
||||||
end_time: span.end.map(Into::into).unwrap_or(std::time::UNIX_EPOCH),
|
|
||||||
attributes: EvictedHashMap::new(MAX_ATTRIBUTES, 0),
|
|
||||||
events: EvictedQueue::new(MAX_EVENTS),
|
|
||||||
links: EvictedQueue::new(0),
|
|
||||||
status_code: convert_status(span.status),
|
|
||||||
status_message: Default::default(),
|
|
||||||
resource: None,
|
|
||||||
instrumentation_lib: InstrumentationLibrary::new("iox-trace", None),
|
|
||||||
};
|
|
||||||
|
|
||||||
ret.events
|
|
||||||
.extend(span.events.into_iter().map(convert_event));
|
|
||||||
for (key, value) in span.metadata {
|
|
||||||
let key = match key {
|
|
||||||
Cow::Owned(key) => Key::new(key),
|
|
||||||
Cow::Borrowed(key) => Key::new(key),
|
|
||||||
};
|
|
||||||
|
|
||||||
let value = convert_meta_value(value);
|
|
||||||
ret.attributes.insert(KeyValue::new(key, value))
|
|
||||||
}
|
|
||||||
ret
|
|
||||||
}
|
|
||||||
|
|
||||||
fn convert_ctx(ctx: &SpanContext) -> opentelemetry::trace::SpanContext {
|
|
||||||
opentelemetry::trace::SpanContext::new(
|
|
||||||
convert_trace_id(ctx.trace_id),
|
|
||||||
convert_span_id(ctx.span_id),
|
|
||||||
Default::default(),
|
|
||||||
false,
|
|
||||||
Default::default(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn convert_event(event: SpanEvent) -> opentelemetry::trace::Event {
|
|
||||||
opentelemetry::trace::Event {
|
|
||||||
name: event.msg,
|
|
||||||
timestamp: event.time.into(),
|
|
||||||
attributes: vec![],
|
|
||||||
dropped_attributes_count: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn convert_status(status: SpanStatus) -> opentelemetry::trace::StatusCode {
|
|
||||||
use opentelemetry::trace::StatusCode;
|
|
||||||
match status {
|
|
||||||
SpanStatus::Unknown => StatusCode::Unset,
|
|
||||||
SpanStatus::Ok => StatusCode::Ok,
|
|
||||||
SpanStatus::Err => StatusCode::Error,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn convert_span_id(id: SpanId) -> opentelemetry::trace::SpanId {
|
|
||||||
opentelemetry::trace::SpanId::from_u64(id.0.get())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn convert_trace_id(id: TraceId) -> opentelemetry::trace::TraceId {
|
|
||||||
opentelemetry::trace::TraceId::from_u128(id.0.get())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn convert_meta_value(v: MetaValue) -> opentelemetry::Value {
|
|
||||||
match v {
|
|
||||||
MetaValue::String(v) => opentelemetry::Value::String(v),
|
|
||||||
MetaValue::Float(v) => opentelemetry::Value::F64(v),
|
|
||||||
MetaValue::Int(v) => opentelemetry::Value::I64(v),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use chrono::{TimeZone, Utc};
|
|
||||||
use opentelemetry::{Key, Value};
|
|
||||||
use std::time::{Duration, UNIX_EPOCH};
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_conversion() {
|
|
||||||
let root = SpanContext {
|
|
||||||
trace_id: TraceId::new(232345).unwrap(),
|
|
||||||
parent_span_id: Some(SpanId::new(2484).unwrap()),
|
|
||||||
span_id: SpanId::new(2343).unwrap(),
|
|
||||||
collector: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut span = root.child("foo");
|
|
||||||
span.metadata.insert("string".into(), "bar".into());
|
|
||||||
span.metadata.insert("float".into(), 3.32.into());
|
|
||||||
span.metadata.insert("int".into(), 5.into());
|
|
||||||
|
|
||||||
span.events.push(SpanEvent {
|
|
||||||
time: Utc.timestamp_nanos(1230),
|
|
||||||
msg: "event".into(),
|
|
||||||
});
|
|
||||||
span.status = SpanStatus::Ok;
|
|
||||||
|
|
||||||
span.start = Some(Utc.timestamp_nanos(1000));
|
|
||||||
span.end = Some(Utc.timestamp_nanos(2000));
|
|
||||||
|
|
||||||
let span_data: SpanData = convert_span(span.clone());
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
span_data.span_context.span_id().to_u64(),
|
|
||||||
span.ctx.span_id.get()
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
span_data.span_context.trace_id().to_u128(),
|
|
||||||
span.ctx.trace_id.get()
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
span_data.parent_span_id.to_u64(),
|
|
||||||
span.ctx.parent_span_id.unwrap().get()
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
span_data.start_time,
|
|
||||||
UNIX_EPOCH + Duration::from_nanos(1000)
|
|
||||||
);
|
|
||||||
assert_eq!(span_data.end_time, UNIX_EPOCH + Duration::from_nanos(2000));
|
|
||||||
|
|
||||||
let events: Vec<_> = span_data.events.iter().collect();
|
|
||||||
assert_eq!(events.len(), 1);
|
|
||||||
assert_eq!(events[0].name.as_ref(), "event");
|
|
||||||
assert_eq!(events[0].timestamp, UNIX_EPOCH + Duration::from_nanos(1230));
|
|
||||||
assert_eq!(events[0].attributes.len(), 0);
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
span_data
|
|
||||||
.attributes
|
|
||||||
.get(&Key::from_static_str("string"))
|
|
||||||
.unwrap()
|
|
||||||
.clone(),
|
|
||||||
Value::String("bar".into())
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
span_data
|
|
||||||
.attributes
|
|
||||||
.get(&Key::from_static_str("float"))
|
|
||||||
.unwrap()
|
|
||||||
.clone(),
|
|
||||||
Value::F64(3.32)
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
span_data
|
|
||||||
.attributes
|
|
||||||
.get(&Key::from_static_str("int"))
|
|
||||||
.unwrap()
|
|
||||||
.clone(),
|
|
||||||
Value::I64(5)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_exporter() {
|
|
||||||
let (sender, mut receiver) = mpsc::channel(10);
|
|
||||||
let exporter = OtelExporter::new(TestOtelExporter::new(sender));
|
|
||||||
|
|
||||||
assert!(exporter.join().now_or_never().is_none());
|
|
||||||
|
|
||||||
let root = SpanContext {
|
|
||||||
trace_id: TraceId::new(232345).unwrap(),
|
|
||||||
parent_span_id: None,
|
|
||||||
span_id: SpanId::new(2343).unwrap(),
|
|
||||||
collector: None,
|
|
||||||
};
|
|
||||||
let s1 = root.child("foo");
|
|
||||||
let s2 = root.child("bar");
|
|
||||||
|
|
||||||
exporter.export(s1.clone());
|
|
||||||
exporter.export(s2.clone());
|
|
||||||
exporter.export(s2.clone());
|
|
||||||
|
|
||||||
let r1 = receiver.recv().await.unwrap();
|
|
||||||
let r2 = receiver.recv().await.unwrap();
|
|
||||||
let r3 = receiver.recv().await.unwrap();
|
|
||||||
|
|
||||||
exporter.shutdown();
|
|
||||||
exporter.join().await.unwrap();
|
|
||||||
|
|
||||||
// Should not be fatal despite exporter having been shutdown
|
|
||||||
exporter.export(s2.clone());
|
|
||||||
|
|
||||||
assert_eq!(root.span_id.get(), r1.parent_span_id.to_u64());
|
|
||||||
assert_eq!(s1.ctx.span_id.get(), r1.span_context.span_id().to_u64());
|
|
||||||
assert_eq!(s1.ctx.trace_id.get(), r1.span_context.trace_id().to_u128());
|
|
||||||
|
|
||||||
assert_eq!(root.span_id.get(), r2.parent_span_id.to_u64());
|
|
||||||
assert_eq!(s2.ctx.span_id.get(), r2.span_context.span_id().to_u64());
|
|
||||||
assert_eq!(s2.ctx.trace_id.get(), r2.span_context.trace_id().to_u128());
|
|
||||||
|
|
||||||
assert_eq!(root.span_id.get(), r3.parent_span_id.to_u64());
|
|
||||||
assert_eq!(s2.ctx.span_id.get(), r3.span_context.span_id().to_u64());
|
|
||||||
assert_eq!(s2.ctx.trace_id.get(), r3.span_context.trace_id().to_u128());
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,305 @@
|
||||||
|
// Autogenerated by Thrift Compiler (0.13.0)
|
||||||
|
// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||||
|
|
||||||
|
#![allow(unused_imports)]
|
||||||
|
#![allow(unused_extern_crates)]
|
||||||
|
#![cfg_attr(rustfmt, rustfmt_skip)]
|
||||||
|
|
||||||
|
extern crate thrift;
|
||||||
|
|
||||||
|
use thrift::OrderedFloat;
|
||||||
|
use std::cell::RefCell;
|
||||||
|
use std::collections::{BTreeMap, BTreeSet};
|
||||||
|
use std::convert::{From, TryFrom};
|
||||||
|
use std::default::Default;
|
||||||
|
use std::error::Error;
|
||||||
|
use std::fmt;
|
||||||
|
use std::fmt::{Display, Formatter};
|
||||||
|
use std::rc::Rc;
|
||||||
|
|
||||||
|
use thrift::{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient};
|
||||||
|
use thrift::protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSetIdentifier, TStructIdentifier, TType};
|
||||||
|
use thrift::protocol::field_id;
|
||||||
|
use thrift::protocol::verify_expected_message_type;
|
||||||
|
use thrift::protocol::verify_expected_sequence_number;
|
||||||
|
use thrift::protocol::verify_expected_service_call;
|
||||||
|
use thrift::protocol::verify_required_field_exists;
|
||||||
|
use thrift::server::TProcessor;
|
||||||
|
|
||||||
|
use super::jaeger;
|
||||||
|
use super::zipkincore;
|
||||||
|
|
||||||
|
//
|
||||||
|
// Agent service client
|
||||||
|
//
|
||||||
|
|
||||||
|
pub trait TAgentSyncClient {
|
||||||
|
fn emit_zipkin_batch(&mut self, spans: Vec<zipkincore::Span>) -> thrift::Result<()>;
|
||||||
|
fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait TAgentSyncClientMarker {}
|
||||||
|
|
||||||
|
pub struct AgentSyncClient<IP, OP> where IP: TInputProtocol, OP: TOutputProtocol {
|
||||||
|
_i_prot: IP,
|
||||||
|
_o_prot: OP,
|
||||||
|
_sequence_number: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <IP, OP> AgentSyncClient<IP, OP> where IP: TInputProtocol, OP: TOutputProtocol {
|
||||||
|
pub fn new(input_protocol: IP, output_protocol: OP) -> AgentSyncClient<IP, OP> {
|
||||||
|
AgentSyncClient { _i_prot: input_protocol, _o_prot: output_protocol, _sequence_number: 0 }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <IP, OP> TThriftClient for AgentSyncClient<IP, OP> where IP: TInputProtocol, OP: TOutputProtocol {
|
||||||
|
fn i_prot_mut(&mut self) -> &mut dyn TInputProtocol { &mut self._i_prot }
|
||||||
|
fn o_prot_mut(&mut self) -> &mut dyn TOutputProtocol { &mut self._o_prot }
|
||||||
|
fn sequence_number(&self) -> i32 { self._sequence_number }
|
||||||
|
fn increment_sequence_number(&mut self) -> i32 { self._sequence_number += 1; self._sequence_number }
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <IP, OP> TAgentSyncClientMarker for AgentSyncClient<IP, OP> where IP: TInputProtocol, OP: TOutputProtocol {}
|
||||||
|
|
||||||
|
impl <C: TThriftClient + TAgentSyncClientMarker> TAgentSyncClient for C {
|
||||||
|
fn emit_zipkin_batch(&mut self, spans: Vec<zipkincore::Span>) -> thrift::Result<()> {
|
||||||
|
(
|
||||||
|
{
|
||||||
|
self.increment_sequence_number();
|
||||||
|
let message_ident = TMessageIdentifier::new("emitZipkinBatch", TMessageType::OneWay, self.sequence_number());
|
||||||
|
let call_args = AgentEmitZipkinBatchArgs { spans: spans };
|
||||||
|
self.o_prot_mut().write_message_begin(&message_ident)?;
|
||||||
|
call_args.write_to_out_protocol(self.o_prot_mut())?;
|
||||||
|
self.o_prot_mut().write_message_end()?;
|
||||||
|
self.o_prot_mut().flush()
|
||||||
|
}
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> {
|
||||||
|
(
|
||||||
|
{
|
||||||
|
self.increment_sequence_number();
|
||||||
|
let message_ident = TMessageIdentifier::new("emitBatch", TMessageType::OneWay, self.sequence_number());
|
||||||
|
let call_args = AgentEmitBatchArgs { batch: batch };
|
||||||
|
self.o_prot_mut().write_message_begin(&message_ident)?;
|
||||||
|
call_args.write_to_out_protocol(self.o_prot_mut())?;
|
||||||
|
self.o_prot_mut().write_message_end()?;
|
||||||
|
self.o_prot_mut().flush()
|
||||||
|
}
|
||||||
|
)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// Agent service processor
|
||||||
|
//
|
||||||
|
|
||||||
|
pub trait AgentSyncHandler {
|
||||||
|
fn handle_emit_zipkin_batch(&self, spans: Vec<zipkincore::Span>) -> thrift::Result<()>;
|
||||||
|
fn handle_emit_batch(&self, batch: jaeger::Batch) -> thrift::Result<()>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct AgentSyncProcessor<H: AgentSyncHandler> {
|
||||||
|
handler: H,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <H: AgentSyncHandler> AgentSyncProcessor<H> {
|
||||||
|
pub fn new(handler: H) -> AgentSyncProcessor<H> {
|
||||||
|
AgentSyncProcessor {
|
||||||
|
handler,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn process_emit_zipkin_batch(&self, incoming_sequence_number: i32, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> {
|
||||||
|
TAgentProcessFunctions::process_emit_zipkin_batch(&self.handler, incoming_sequence_number, i_prot, o_prot)
|
||||||
|
}
|
||||||
|
fn process_emit_batch(&self, incoming_sequence_number: i32, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> {
|
||||||
|
TAgentProcessFunctions::process_emit_batch(&self.handler, incoming_sequence_number, i_prot, o_prot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct TAgentProcessFunctions;
|
||||||
|
|
||||||
|
impl TAgentProcessFunctions {
|
||||||
|
pub fn process_emit_zipkin_batch<H: AgentSyncHandler>(handler: &H, _: i32, i_prot: &mut dyn TInputProtocol, _: &mut dyn TOutputProtocol) -> thrift::Result<()> {
|
||||||
|
let args = AgentEmitZipkinBatchArgs::read_from_in_protocol(i_prot)?;
|
||||||
|
match handler.handle_emit_zipkin_batch(args.spans) {
|
||||||
|
Ok(_) => {
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
match e {
|
||||||
|
thrift::Error::Application(app_err) => {
|
||||||
|
Err(thrift::Error::Application(app_err))
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
let ret_err = {
|
||||||
|
ApplicationError::new(
|
||||||
|
ApplicationErrorKind::Unknown,
|
||||||
|
e.description()
|
||||||
|
)
|
||||||
|
};
|
||||||
|
Err(thrift::Error::Application(ret_err))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn process_emit_batch<H: AgentSyncHandler>(handler: &H, _: i32, i_prot: &mut dyn TInputProtocol, _: &mut dyn TOutputProtocol) -> thrift::Result<()> {
|
||||||
|
let args = AgentEmitBatchArgs::read_from_in_protocol(i_prot)?;
|
||||||
|
match handler.handle_emit_batch(args.batch) {
|
||||||
|
Ok(_) => {
|
||||||
|
Ok(())
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
match e {
|
||||||
|
thrift::Error::Application(app_err) => {
|
||||||
|
Err(thrift::Error::Application(app_err))
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
let ret_err = {
|
||||||
|
ApplicationError::new(
|
||||||
|
ApplicationErrorKind::Unknown,
|
||||||
|
e.description()
|
||||||
|
)
|
||||||
|
};
|
||||||
|
Err(thrift::Error::Application(ret_err))
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl <H: AgentSyncHandler> TProcessor for AgentSyncProcessor<H> {
|
||||||
|
fn process(&self, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> {
|
||||||
|
let message_ident = i_prot.read_message_begin()?;
|
||||||
|
let res = match &*message_ident.name {
|
||||||
|
"emitZipkinBatch" => {
|
||||||
|
self.process_emit_zipkin_batch(message_ident.sequence_number, i_prot, o_prot)
|
||||||
|
},
|
||||||
|
"emitBatch" => {
|
||||||
|
self.process_emit_batch(message_ident.sequence_number, i_prot, o_prot)
|
||||||
|
},
|
||||||
|
method => {
|
||||||
|
Err(
|
||||||
|
thrift::Error::Application(
|
||||||
|
ApplicationError::new(
|
||||||
|
ApplicationErrorKind::UnknownMethod,
|
||||||
|
format!("unknown method {}", method)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
},
|
||||||
|
};
|
||||||
|
thrift::server::handle_process_result(&message_ident, res, o_prot)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// AgentEmitZipkinBatchArgs
|
||||||
|
//
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||||
|
struct AgentEmitZipkinBatchArgs {
|
||||||
|
spans: Vec<zipkincore::Span>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AgentEmitZipkinBatchArgs {
|
||||||
|
fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result<AgentEmitZipkinBatchArgs> {
|
||||||
|
i_prot.read_struct_begin()?;
|
||||||
|
let mut f_1: Option<Vec<zipkincore::Span>> = None;
|
||||||
|
loop {
|
||||||
|
let field_ident = i_prot.read_field_begin()?;
|
||||||
|
if field_ident.field_type == TType::Stop {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let field_id = field_id(&field_ident)?;
|
||||||
|
match field_id {
|
||||||
|
1 => {
|
||||||
|
let list_ident = i_prot.read_list_begin()?;
|
||||||
|
let mut val: Vec<zipkincore::Span> = Vec::with_capacity(list_ident.size as usize);
|
||||||
|
for _ in 0..list_ident.size {
|
||||||
|
let list_elem_0 = zipkincore::Span::read_from_in_protocol(i_prot)?;
|
||||||
|
val.push(list_elem_0);
|
||||||
|
}
|
||||||
|
i_prot.read_list_end()?;
|
||||||
|
f_1 = Some(val);
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
i_prot.skip(field_ident.field_type)?;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
i_prot.read_field_end()?;
|
||||||
|
}
|
||||||
|
i_prot.read_struct_end()?;
|
||||||
|
verify_required_field_exists("AgentEmitZipkinBatchArgs.spans", &f_1)?;
|
||||||
|
let ret = AgentEmitZipkinBatchArgs {
|
||||||
|
spans: f_1.expect("auto-generated code should have checked for presence of required fields"),
|
||||||
|
};
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> {
|
||||||
|
let struct_ident = TStructIdentifier::new("emitZipkinBatch_args");
|
||||||
|
o_prot.write_struct_begin(&struct_ident)?;
|
||||||
|
o_prot.write_field_begin(&TFieldIdentifier::new("spans", TType::List, 1))?;
|
||||||
|
o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, self.spans.len() as i32))?;
|
||||||
|
for e in &self.spans {
|
||||||
|
e.write_to_out_protocol(o_prot)?;
|
||||||
|
o_prot.write_list_end()?;
|
||||||
|
}
|
||||||
|
o_prot.write_field_end()?;
|
||||||
|
o_prot.write_field_stop()?;
|
||||||
|
o_prot.write_struct_end()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
// AgentEmitBatchArgs
|
||||||
|
//
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
|
||||||
|
struct AgentEmitBatchArgs {
|
||||||
|
batch: jaeger::Batch,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AgentEmitBatchArgs {
|
||||||
|
fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result<AgentEmitBatchArgs> {
|
||||||
|
i_prot.read_struct_begin()?;
|
||||||
|
let mut f_1: Option<jaeger::Batch> = None;
|
||||||
|
loop {
|
||||||
|
let field_ident = i_prot.read_field_begin()?;
|
||||||
|
if field_ident.field_type == TType::Stop {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let field_id = field_id(&field_ident)?;
|
||||||
|
match field_id {
|
||||||
|
1 => {
|
||||||
|
let val = jaeger::Batch::read_from_in_protocol(i_prot)?;
|
||||||
|
f_1 = Some(val);
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
i_prot.skip(field_ident.field_type)?;
|
||||||
|
},
|
||||||
|
};
|
||||||
|
i_prot.read_field_end()?;
|
||||||
|
}
|
||||||
|
i_prot.read_struct_end()?;
|
||||||
|
verify_required_field_exists("AgentEmitBatchArgs.batch", &f_1)?;
|
||||||
|
let ret = AgentEmitBatchArgs {
|
||||||
|
batch: f_1.expect("auto-generated code should have checked for presence of required fields"),
|
||||||
|
};
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> {
|
||||||
|
let struct_ident = TStructIdentifier::new("emitBatch_args");
|
||||||
|
o_prot.write_struct_begin(&struct_ident)?;
|
||||||
|
o_prot.write_field_begin(&TFieldIdentifier::new("batch", TType::Struct, 1))?;
|
||||||
|
self.batch.write_to_out_protocol(o_prot)?;
|
||||||
|
o_prot.write_field_end()?;
|
||||||
|
o_prot.write_field_stop()?;
|
||||||
|
o_prot.write_struct_end()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue