diff --git a/.circleci/config.yml b/.circleci/config.yml index c6b8140f30..7c4a3a0feb 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -148,7 +148,7 @@ jobs: - cache_restore - run: name: Cargo test - command: cargo test --features=jaeger --workspace + command: cargo test --workspace - cache_save # end to end tests with Heappy (heap profiling enabled) @@ -275,7 +275,7 @@ jobs: command: cargo test --workspace --benches --no-run - run: name: Build with object store + exporter support + HEAP profiling - command: cargo build --no-default-features --features="aws,gcp,azure,jaeger,heappy,pprof" + command: cargo build --no-default-features --features="aws,gcp,azure,heappy,pprof" - cache_save # Lint protobufs. @@ -334,10 +334,10 @@ jobs: - cache_restore - run: name: Print rustc target CPU options - command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger,heappy" --bin print_cpu + command: cargo run --release --no-default-features --features="aws,gcp,azure,heappy" --bin print_cpu - run: name: Cargo release build with target arch set for CRoaring - command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger,heappy" + command: cargo build --release --no-default-features --features="aws,gcp,azure,heappy" - run: | echo sha256sum after build is sha256sum target/release/influxdb_iox diff --git a/.gitattributes b/.gitattributes index 05de81c3b6..0248cc6d2d 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1,3 +1,4 @@ generated_types/protos/google/ linguist-generated=true generated_types/protos/grpc/ linguist-generated=true generated_types/src/wal_generated.rs linguist-generated=true +trace_exporters/src/thrift/ linguist-generated=true diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8b938f92e9..a17f072231 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -265,7 +265,7 @@ docker run -d --name jaeger \ ### Step 2: Run IOx configured to send traces to the local Jaeger instance -Build IOx with `--features=jaeger` and run with the following environment variables set: +Build IOx and run with the following environment variable set: ``` TRACES_EXPORTER=jaeger TRACES_EXPORTER_JAEGER_AGENT_HOST=localhost @@ -274,7 +274,7 @@ TRACES_EXPORTER_JAEGER_AGENT_PORT=6831 For example, a command such as this should do the trick: ```shell -TRACES_EXPORTER=jaeger TRACES_EXPORTER_JAEGER_AGENT_HOST=localhost TRACES_EXPORTER_JAEGER_AGENT_PORT=6831 cargo run --features=jaeger -- run -v --object-store=file --data-dir=$HOME/.influxdb_iox --server-id=42 +TRACES_EXPORTER=jaeger TRACES_EXPORTER_JAEGER_AGENT_HOST=localhost TRACES_EXPORTER_JAEGER_AGENT_PORT=6831 cargo run -- run -v --server-id=42 ``` ### Step 3: Send a request with trace context diff --git a/Cargo.lock b/Cargo.lock index 3d2aefbcd7..cd62588c8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2588,46 +2588,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "opentelemetry" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf9b1c4e9a6c4de793c632496fa490bdc0e1eea73f0c91394f7b6990935d22" -dependencies = [ - "async-trait", - "crossbeam-channel", - "futures", - "js-sys", - "lazy_static", - "percent-encoding", - "pin-project", - "rand", - "thiserror", -] - -[[package]] -name = "opentelemetry-jaeger" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db22f492873ea037bc267b35a0e8e4fb846340058cb7c864efe3d0bf23684593" -dependencies = [ - "async-trait", - "lazy_static", - "opentelemetry", - "opentelemetry-semantic-conventions", - "thiserror", - "thrift", -] - -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffeac823339e8b0f27b961f4385057bf9f97f2863bc745bd015fd6091f2270e9" -dependencies = [ - "opentelemetry", -] - [[package]] name = "ordered-float" version = "1.1.1" @@ -4568,12 +4528,10 @@ dependencies = [ "chrono", "futures", "observability_deps", - "opentelemetry", - "opentelemetry-jaeger", "snafu", "structopt", + "thrift", "tokio", - "tokio-util", "trace", ] diff --git a/Cargo.toml b/Cargo.toml index 5e74ed7e38..b8a27779f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -191,7 +191,6 @@ default = ["jemalloc_replacing_malloc"] azure = ["object_store/azure"] # Optional Azure Object store support gcp = ["object_store/gcp"] # Optional GCP object store support aws = ["object_store/aws"] # Optional AWS / S3 object store support -jaeger = ["trace_exporters/jaeger"] # Enable optional jaeger tracing support # pprof is an optional feature for pprof support # heappy is an optional feature; Not on by default as it diff --git a/Dockerfile b/Dockerfile index 1280a6fecc..afbd1fbf82 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,7 +13,7 @@ RUN \ --mount=type=cache,id=influxdb_iox_git,sharing=locked,target=/usr/local/cargo/git \ --mount=type=cache,id=influxdb_iox_target,sharing=locked,target=/influxdb_iox/target \ du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target && \ - cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,jaeger,pprof && \ + cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,pprof && \ cp /influxdb_iox/target/release/influxdb_iox /root/influxdb_iox && \ du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target diff --git a/perf/perf.py b/perf/perf.py index 32618b8c86..2d7d7e32e0 100755 --- a/perf/perf.py +++ b/perf/perf.py @@ -79,8 +79,7 @@ def main(): try: if not args.skip_build: build_with_aws = args.object_store == 's3' - build_with_jaeger = do_trace - cargo_build_iox(args.debug, build_with_aws, build_with_jaeger) + cargo_build_iox(args.debug, build_with_aws) docker_create_network(dc) if args.kafka_zookeeper: @@ -382,15 +381,13 @@ def docker_run_jaeger(dc): return container -def cargo_build_iox(debug=False, build_with_aws=True, build_with_jaeger=True): +def cargo_build_iox(debug=False, build_with_aws=True): t = time.time() print('building IOx') features = [] if build_with_aws: features.append('aws') - if build_with_jaeger: - features.append('jaeger') features = ','.join(features) env = os.environ.copy() diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 5c4ddaf1b9..42a0bf2231 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -189,9 +189,12 @@ pub async fn main(config: Config) -> Result<()> { let grpc_listener = grpc_listener(config.grpc_bind_address).await?; let http_listener = http_listener(config.http_bind_address).await?; - let trace_collector = config.tracing_config.build().context(Tracing)?; + let async_exporter = config.tracing_config.build().context(Tracing)?; + let trace_collector = async_exporter + .clone() + .map(|x| -> Arc { x }); - serve( + let r = serve( config, application, grpc_listener, @@ -199,7 +202,14 @@ pub async fn main(config: Config) -> Result<()> { trace_collector, app_server, ) - .await + .await; + + if let Some(async_exporter) = async_exporter { + if let Err(e) = async_exporter.drain().await { + error!(%e, "error draining trace exporter"); + } + } + r } async fn grpc_listener(addr: SocketAddr) -> Result { @@ -379,7 +389,7 @@ mod tests { use tokio::task::JoinHandle; use trace::span::{Span, SpanStatus}; use trace::RingBufferTraceCollector; - use trace_exporters::otel::{OtelExporter, TestOtelExporter}; + use trace_exporters::export::{AsyncExporter, TestAsyncExporter}; fn test_config(server_id: Option) -> Config { let mut config = Config::from_iter(&[ @@ -794,9 +804,9 @@ mod tests { } #[tokio::test] - async fn test_otel_exporter() { + async fn test_async_exporter() { let (sender, mut receiver) = tokio::sync::mpsc::channel(20); - let collector = Arc::new(OtelExporter::new(TestOtelExporter::new(sender))); + let collector = Arc::new(AsyncExporter::new(TestAsyncExporter::new(sender))); let (addr, server, join) = tracing_server(&collector).await; let conn = jaeger_client(addr, "34f8495:30e34:0:1").await; @@ -805,15 +815,14 @@ mod tests { .await .unwrap(); - collector.shutdown(); - collector.join().await.unwrap(); + collector.drain().await.unwrap(); server.shutdown(); join.await.unwrap().unwrap(); let span = receiver.recv().await.unwrap(); - assert_eq!(span.span_context.trace_id().to_u128(), 0x34f8495); - assert_eq!(span.parent_span_id.to_u64(), 0x30e34); + assert_eq!(span.ctx.trace_id.get(), 0x34f8495); + assert_eq!(span.ctx.parent_span_id.unwrap().get(), 0x30e34); } fn make_rules(db_name: impl Into) -> ProvidedDatabaseRules { diff --git a/tests/end_to_end_cases/tracing.rs b/tests/end_to_end_cases/tracing.rs index 5d2576746e..942a2423af 100644 --- a/tests/end_to_end_cases/tracing.rs +++ b/tests/end_to_end_cases/tracing.rs @@ -6,18 +6,6 @@ use crate::common::{ use futures::TryStreamExt; use generated_types::{storage_client::StorageClient, ReadFilterRequest}; -// cfg at this level so IDE can resolve code even when jaeger feature is not active -#[cfg(feature = "jaeger")] -fn run_test() -> bool { - true -} - -#[cfg(not(feature = "jaeger"))] -fn run_test() -> bool { - println!("Skipping test because jaeger feature not enabled"); - false -} - async fn setup() -> (UdpCapture, ServerFixture) { let udp_capture = UdpCapture::new().await; @@ -59,10 +47,6 @@ async fn run_sql_query(server_fixture: &ServerFixture) { #[tokio::test] pub async fn test_tracing_sql() { - if !run_test() { - return; - } - let (udp_capture, server_fixture) = setup().await; run_sql_query(&server_fixture).await; @@ -80,10 +64,6 @@ pub async fn test_tracing_sql() { #[tokio::test] pub async fn test_tracing_storage_api() { - if !run_test() { - return; - } - let (udp_capture, server_fixture) = setup().await; let scenario = Scenario::new(); @@ -127,9 +107,6 @@ pub async fn test_tracing_storage_api() { #[tokio::test] pub async fn test_tracing_create_trace() { - if !run_test() { - return; - } let udp_capture = UdpCapture::new().await; let test_config = TestConfig::new() diff --git a/trace/src/span.rs b/trace/src/span.rs index 953cc7b99e..249af64ee9 100644 --- a/trace/src/span.rs +++ b/trace/src/span.rs @@ -87,6 +87,7 @@ pub enum MetaValue { String(Cow<'static, str>), Float(f64), Int(i64), + Bool(bool), } impl From<&'static str> for MetaValue { diff --git a/trace_exporters/Cargo.toml b/trace_exporters/Cargo.toml index f2aaeea58b..5bb519f68c 100644 --- a/trace_exporters/Cargo.toml +++ b/trace_exporters/Cargo.toml @@ -11,16 +11,10 @@ async-trait = "0.1" chrono = { version = "0.4" } futures = "0.3" observability_deps = { path = "../observability_deps" } -opentelemetry = { version = "0.16" } -opentelemetry-jaeger = { version = "0.15", optional = true } snafu = "0.6" structopt = { version = "0.3.23" } +thrift = { version = "0.13.0" } tokio = { version = "1.11", features = ["macros", "time", "sync", "rt"] } -tokio-util = { version = "0.6.3" } trace = { path = "../trace" } [dev-dependencies] - -[features] -default = [] -jaeger = ["opentelemetry-jaeger"] diff --git a/trace_exporters/README.md b/trace_exporters/README.md new file mode 100644 index 0000000000..0bec84bb9c --- /dev/null +++ b/trace_exporters/README.md @@ -0,0 +1,54 @@ +# Trace Exporters + +## Regenerating Jaeger Thrift + +_The instructions below use docker, but this is optional._ + +_Depending on your setup there may be permissions complications that require using`-u`_ + +Startup a Debian bullseye image + +``` +docker run -it -v $PWD:/out debian:bullseye-slim +``` + +Install the thrift-compiler + +``` +$ apt-get update +$ apt-get install thrift-compiler wget +``` + +Verify the version of the compiler matches the version of `thrift` in [Cargo.toml](./Cargo.toml) + +``` +$ thrift --version +Thrift version 0.13.0 +``` + +Get the IDL definition + +``` +$ wget https://raw.githubusercontent.com/jaegertracing/jaeger-idl/master/thrift/jaeger.thrift https://raw.githubusercontent.com/jaegertracing/jaeger-idl/master/thrift/zipkincore.thrift https://raw.githubusercontent.com/jaegertracing/jaeger-idl/master/thrift/agent.thrift +``` + +Generate the code + +``` +$ thrift --out /out/src/thrift --gen rs agent.thrift +$ thrift --out /out/src/thrift --gen rs jaeger.thrift +$ thrift --out /out/src/thrift --gen rs zipkincore.thrift +``` + +Patch up imports + +``` +sed -i 's/use jaeger;/use super::jaeger;/g' /out/src/thrift/agent.rs +sed -i 's/use zipkincore;/use super::zipkincore;/g' /out/src/thrift/agent.rs +``` + +Remove the clippy line + +``` +#![cfg_attr(feature = "cargo-clippy", allow(too_many_arguments, type_complexity))] +``` \ No newline at end of file diff --git a/trace_exporters/src/export.rs b/trace_exporters/src/export.rs new file mode 100644 index 0000000000..74ff4d78cc --- /dev/null +++ b/trace_exporters/src/export.rs @@ -0,0 +1,162 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use futures::{ + future::{BoxFuture, Shared}, + FutureExt, TryFutureExt, +}; +use tokio::sync::mpsc; +use tokio::task::JoinError; + +use observability_deps::tracing::{error, info, warn}; +use trace::{span::Span, TraceCollector}; + +/// Size of the exporter buffer +const CHANNEL_SIZE: usize = 1000; + +/// An `AsyncExport` is a batched async version of `trace::TraceCollector` +#[async_trait] +pub trait AsyncExport: Send + 'static { + async fn export(&mut self, span: Vec); +} + +/// `AsyncExporter` wraps a `AsyncExport` and sinks spans to it +/// +/// In order to do this it spawns a background worker that pulls messages +/// off a queue and writes them to the `AsyncExport`. +/// +/// If this worker cannot keep up, and this queue fills up, spans will +/// be dropped and warnings logged +/// +/// Note: Currently this does not batch spans (#2392) +#[derive(Debug)] +pub struct AsyncExporter { + join: Shared>>>, + + /// Communication queue with the background worker + /// + /// Sending None triggers termination + sender: tokio::sync::mpsc::Sender>, +} + +impl AsyncExporter { + /// Creates a new `AsyncExporter` + pub fn new(collector: T) -> Self { + let (sender, receiver) = mpsc::channel(CHANNEL_SIZE); + + let handle = tokio::spawn(background_worker(collector, receiver)); + let join = handle.map_err(Arc::new).boxed().shared(); + + Self { join, sender } + } + + /// Triggers shutdown of this `AsyncExporter` and waits until all in-flight + /// spans have been published to the `AsyncExport` + pub async fn drain(&self) -> Result<(), Arc> { + info!("batched exporter shutting down"); + let _ = self.sender.send(None).await; + self.join.clone().await + } +} + +impl TraceCollector for AsyncExporter { + fn export(&self, span: Span) { + use mpsc::error::TrySendError; + match self.sender.try_send(Some(span)) { + Ok(_) => { + //TODO: Increment some metric (#2613) + } + Err(TrySendError::Full(_)) => { + warn!("exporter cannot keep up, dropping spans") + } + Err(TrySendError::Closed(_)) => { + warn!("background worker shutdown") + } + } + } +} + +async fn background_worker( + mut exporter: T, + mut receiver: mpsc::Receiver>, +) { + loop { + match receiver.recv().await { + Some(Some(span)) => exporter.export(vec![span]).await, + Some(None) => { + info!("async exporter shut down"); + break; + } + None => { + error!("sender-side of async exporter dropped without waiting for shut down"); + break; + } + } + } +} + +/// An `AsyncExporter` that sinks writes to a tokio mpsc channel. +/// +/// Intended for testing ONLY +/// +#[derive(Debug)] +pub struct TestAsyncExporter { + channel: mpsc::Sender, +} + +impl TestAsyncExporter { + pub fn new(channel: mpsc::Sender) -> Self { + Self { channel } + } +} + +#[async_trait] +impl AsyncExport for TestAsyncExporter { + async fn export(&mut self, batch: Vec) { + for span in batch { + self.channel.send(span).await.expect("channel closed") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use trace::ctx::SpanContext; + + #[tokio::test] + async fn test_exporter() { + let (sender, mut receiver) = mpsc::channel(10); + let exporter = AsyncExporter::new(TestAsyncExporter::new(sender)); + + let root = SpanContext::new(Arc::new(trace::LogTraceCollector::new())); + let s1 = root.child("foo"); + let s2 = root.child("bar"); + + exporter.export(s1.clone()); + exporter.export(s2.clone()); + exporter.export(s2.clone()); + + // Drain should wait for all published spans to be flushed + exporter.drain().await.unwrap(); + + let r1 = receiver.recv().await.unwrap(); + let r2 = receiver.recv().await.unwrap(); + let r3 = receiver.recv().await.unwrap(); + + // Should not be fatal despite exporter having been shutdown + exporter.export(s2.clone()); + + assert_eq!(root.span_id.get(), r1.ctx.parent_span_id.unwrap().get()); + assert_eq!(s1.ctx.span_id.get(), r1.ctx.span_id.get()); + assert_eq!(s1.ctx.trace_id.get(), r1.ctx.trace_id.get()); + + assert_eq!(root.span_id.get(), r2.ctx.parent_span_id.unwrap().get()); + assert_eq!(s2.ctx.span_id.get(), r2.ctx.span_id.get()); + assert_eq!(s2.ctx.trace_id.get(), r2.ctx.trace_id.get()); + + assert_eq!(root.span_id.get(), r3.ctx.parent_span_id.unwrap().get()); + assert_eq!(s2.ctx.span_id.get(), r3.ctx.span_id.get()); + assert_eq!(s2.ctx.trace_id.get(), r3.ctx.trace_id.get()); + } +} diff --git a/trace_exporters/src/jaeger.rs b/trace_exporters/src/jaeger.rs new file mode 100644 index 0000000000..6ae8427909 --- /dev/null +++ b/trace_exporters/src/jaeger.rs @@ -0,0 +1,307 @@ +use std::net::{SocketAddr, ToSocketAddrs, UdpSocket}; + +use async_trait::async_trait; + +use observability_deps::tracing::{error, info}; +use trace::span::Span; + +use crate::export::AsyncExport; +use crate::thrift::agent::{AgentSyncClient, TAgentSyncClient}; +use crate::thrift::jaeger; +use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol}; + +mod span; + +/// `JaegerAgentExporter` receives span data and writes it over UDP to a local jaeger agent +/// +/// Note: will drop data if the UDP socket would block +pub struct JaegerAgentExporter { + /// The name of the service + service_name: String, + + /// The agent client that encodes messages + client: + AgentSyncClient, TCompactOutputProtocol>, + + /// Spans should be assigned a sequential sequence number + /// to allow jaeger to better detect dropped spans + next_sequence: i64, +} + +impl JaegerAgentExporter { + pub fn new( + service_name: String, + agent_endpoint: E, + ) -> super::Result { + info!(%agent_endpoint, %service_name, "Creating jaeger tracing exporter"); + let remote_addr = agent_endpoint.to_socket_addrs()?.next().ok_or_else(|| { + super::Error::ResolutionError { + address: agent_endpoint.to_string(), + } + })?; + + let local_addr: SocketAddr = if remote_addr.is_ipv4() { + "0.0.0.0:0" + } else { + "[::]:0" + } + .parse() + .unwrap(); + + let socket = UdpSocket::bind(local_addr)?; + socket.set_nonblocking(true)?; + socket.connect(remote_addr)?; + + let client = AgentSyncClient::new( + TCompactInputProtocol::new(NoopReader::default()), + TCompactOutputProtocol::new(MessageWriter::new(socket)), + ); + + Ok(Self { + service_name, + client, + next_sequence: 0, + }) + } + + fn make_batch(&mut self, spans: Vec) -> jaeger::Batch { + let seq_no = Some(self.next_sequence); + self.next_sequence += 1; + jaeger::Batch { + process: jaeger::Process { + service_name: self.service_name.clone(), + tags: None, + }, + spans: spans.into_iter().map(Into::into).collect(), + seq_no, + stats: None, + } + } +} + +#[async_trait] +impl AsyncExport for JaegerAgentExporter { + async fn export(&mut self, spans: Vec) { + let batch = self.make_batch(spans); + if let Err(e) = self.client.emit_batch(batch) { + error!(%e, "error writing batch to jaeger agent") + } + } +} + +/// `NoopReader` is a `std::io::Read` that never returns any data +#[derive(Debug, Default)] +struct NoopReader {} + +impl std::io::Read for NoopReader { + fn read(&mut self, _buf: &mut [u8]) -> std::io::Result { + Ok(0) + } +} + +/// A `MessageWriter` only writes entire message payloads to the provided UDP socket +/// +/// If the UDP socket would block, drops the packet +struct MessageWriter { + buf: Vec, + socket: UdpSocket, +} + +impl MessageWriter { + fn new(socket: UdpSocket) -> Self { + Self { + buf: vec![], + socket, + } + } +} + +impl std::io::Write for MessageWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.buf.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + let message_len = self.buf.len(); + let r = self.socket.send(&self.buf); + self.buf.clear(); + match r { + Ok(written) => { + if written != message_len { + // In the event a message is truncated, there isn't an obvious way to recover + // + // The Thrift protocol is normally used on top of a reliable stream, + // e.g. TCP, and it is a bit of a hack to send it over UDP + // + // Jaeger requires that each thrift Message is encoded in exactly one UDP + // packet, as this ensures it either arrives in its entirety or not at all + // + // If for whatever reason the packet is truncated, the agent will fail to + // to decode it, likely due to a missing stop-field, and discard it + error!(%written, %message_len, "jaeger agent exporter failed to write message as single UDP packet"); + } + Ok(()) + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + error!("jaeger agent exporter would have blocked - dropping message"); + Ok(()) + } + Err(e) => Err(e), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::thrift::agent::{AgentSyncHandler, AgentSyncProcessor}; + use chrono::{TimeZone, Utc}; + use std::sync::{Arc, Mutex}; + use thrift::server::TProcessor; + use thrift::transport::TBufferChannel; + use trace::ctx::{SpanContext, SpanId, TraceId}; + use trace::span::{SpanEvent, SpanStatus}; + + struct TestHandler { + batches: Arc>>, + } + + impl AgentSyncHandler for TestHandler { + fn handle_emit_zipkin_batch( + &self, + _spans: Vec, + ) -> thrift::Result<()> { + unimplemented!() + } + + fn handle_emit_batch(&self, batch: jaeger::Batch) -> thrift::Result<()> { + self.batches.lock().unwrap().push(batch); + Ok(()) + } + } + + /// Wraps a UdpSocket and a buffer the size of the max UDP datagram and provides + /// `std::io::Read` on this buffer's contents, ensuring that reads are not truncated + struct Reader { + socket: UdpSocket, + buffer: Box<[u8; 65535]>, + idx: usize, + len: usize, + } + + impl Reader { + pub fn new(socket: UdpSocket) -> Self { + Self { + socket, + buffer: Box::new([0; 65535]), + idx: 0, + len: 0, + } + } + } + + impl std::io::Read for Reader { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + if self.idx == self.len { + self.idx = 0; + self.len = self.socket.recv(self.buffer.as_mut())?; + } + let to_read = buf.len().min(self.len - self.idx); + buf.copy_from_slice(&self.buffer[self.idx..(self.idx + to_read)]); + self.idx += to_read; + Ok(to_read) + } + } + + #[tokio::test] + async fn test_jaeger() { + let server = UdpSocket::bind("0.0.0.0:0").unwrap(); + server + .set_read_timeout(Some(std::time::Duration::from_secs(1))) + .unwrap(); + + let address = server.local_addr().unwrap(); + let mut exporter = JaegerAgentExporter::new("service_name".to_string(), address).unwrap(); + + let batches = Arc::new(Mutex::new(vec![])); + + let mut processor_input = TCompactInputProtocol::new(Reader::new(server)); + let mut processor_output = TCompactOutputProtocol::new(TBufferChannel::with_capacity(0, 0)); + let processor = AgentSyncProcessor::new(TestHandler { + batches: Arc::clone(&batches), + }); + + let ctx = SpanContext { + trace_id: TraceId::new(43434).unwrap(), + parent_span_id: None, + span_id: SpanId::new(3495993).unwrap(), + collector: None, + }; + let mut span = ctx.child("foo"); + span.status = SpanStatus::Ok; + span.events = vec![SpanEvent { + time: Utc.timestamp_nanos(200000), + msg: "hello".into(), + }]; + span.start = Some(Utc.timestamp_nanos(100000)); + span.end = Some(Utc.timestamp_nanos(300000)); + + exporter.export(vec![span.clone(), span.clone()]).await; + exporter.export(vec![span.clone()]).await; + + processor + .process(&mut processor_input, &mut processor_output) + .unwrap(); + + processor + .process(&mut processor_input, &mut processor_output) + .unwrap(); + + let batches = batches.lock().unwrap(); + assert_eq!(batches.len(), 2); + + let b1 = &batches[0]; + + assert_eq!(b1.spans.len(), 2); + assert_eq!(b1.process.service_name.as_str(), "service_name"); + assert_eq!(b1.seq_no.unwrap(), 0); + + let b2 = &batches[1]; + assert_eq!(b2.spans.len(), 1); + assert_eq!(b2.process.service_name.as_str(), "service_name"); + assert_eq!(b2.seq_no.unwrap(), 1); + + let b1_s0 = &b1.spans[0]; + + assert_eq!(b1_s0, &b1.spans[1]); + assert_eq!(b1_s0, &b2.spans[0]); + + assert_eq!(b1_s0.span_id, span.ctx.span_id.get() as i64); + assert_eq!( + b1_s0.parent_span_id, + span.ctx.parent_span_id.unwrap().get() as i64 + ); + + // microseconds not nanoseconds + assert_eq!(b1_s0.start_time, 100); + assert_eq!(b1_s0.duration, 200); + + let logs = b1_s0.logs.as_ref().unwrap(); + assert_eq!(logs.len(), 1); + assert_eq!(logs[0].timestamp, 200); + assert_eq!(logs[0].fields.len(), 1); + assert_eq!(logs[0].fields[0].key.as_str(), "event"); + assert_eq!(logs[0].fields[0].v_str.as_ref().unwrap().as_str(), "hello"); + + let tags = b1_s0.tags.as_ref().unwrap(); + assert_eq!(tags.len(), 1); + assert_eq!(tags[0].key.as_str(), "ok"); + assert!(tags[0].v_bool.unwrap()); + } + + #[test] + fn test_resolve() { + JaegerAgentExporter::new("service_name".to_string(), "localhost:8082").unwrap(); + } +} diff --git a/trace_exporters/src/jaeger/span.rs b/trace_exporters/src/jaeger/span.rs new file mode 100644 index 0000000000..e3af3e45d3 --- /dev/null +++ b/trace_exporters/src/jaeger/span.rs @@ -0,0 +1,117 @@ +/// Contains the conversion logic from a `trace::span::Span` to `thrift::jaeger::Span` +use crate::thrift::jaeger; +use trace::span::{MetaValue, Span, SpanEvent, SpanStatus}; + +impl From for jaeger::Span { + fn from(mut s: Span) -> Self { + let trace_id = s.ctx.trace_id.get(); + let trace_id_high = (trace_id >> 64) as i64; + let trace_id_low = trace_id as i64; + + // A parent span id of 0 indicates no parent span ID (span IDs are non-zero) + let parent_span_id = s.ctx.parent_span_id.map(|id| id.get()).unwrap_or_default() as i64; + + let (start_time, duration) = match (s.start, s.end) { + (Some(start), Some(end)) => ( + start.timestamp_nanos() / 1000, + (end - start).num_microseconds().expect("no overflow"), + ), + (Some(start), _) => (start.timestamp_nanos() / 1000, 0), + _ => (0, 0), + }; + + // These don't appear to be standardised, however, the jaeger UI treats + // the presence of an "error" tag as indicating an error + match s.status { + SpanStatus::Ok => { + s.metadata + .entry("ok".into()) + .or_insert(MetaValue::Bool(true)); + } + SpanStatus::Err => { + s.metadata + .entry("error".into()) + .or_insert(MetaValue::Bool(true)); + } + SpanStatus::Unknown => {} + } + + let tags = match s.metadata.is_empty() { + true => None, + false => Some( + s.metadata + .into_iter() + .map(|(name, value)| tag_from_meta(name.to_string(), value)) + .collect(), + ), + }; + + let logs = match s.events.is_empty() { + true => None, + false => Some(s.events.into_iter().map(Into::into).collect()), + }; + + Self { + trace_id_low, + trace_id_high, + span_id: s.ctx.span_id.get() as i64, + parent_span_id, + operation_name: s.name.to_string(), + references: None, + flags: 0, + start_time, + duration, + tags, + logs, + } + } +} + +impl From for jaeger::Log { + fn from(event: SpanEvent) -> Self { + Self { + timestamp: event.time.timestamp_nanos() / 1000, + fields: vec![jaeger::Tag { + key: "event".to_string(), + v_type: jaeger::TagType::String, + v_str: Some(event.msg.to_string()), + v_double: None, + v_bool: None, + v_long: None, + v_binary: None, + }], + } + } +} + +fn tag_from_meta(key: String, value: MetaValue) -> jaeger::Tag { + let mut tag = jaeger::Tag { + key, + v_type: jaeger::TagType::String, + v_str: None, + v_double: None, + v_bool: None, + v_long: None, + v_binary: None, + }; + + match value { + MetaValue::String(v) => { + tag.v_type = jaeger::TagType::String; + tag.v_str = Some(v.to_string()) + } + MetaValue::Float(v) => { + tag.v_type = jaeger::TagType::Double; + tag.v_double = Some(v.into()) + } + MetaValue::Int(v) => { + tag.v_type = jaeger::TagType::Long; + tag.v_long = Some(v) + } + MetaValue::Bool(v) => { + tag.v_type = jaeger::TagType::Bool; + tag.v_bool = Some(v) + } + }; + tag +} diff --git a/trace_exporters/src/lib.rs b/trace_exporters/src/lib.rs index 7a41dc6794..352e17845b 100644 --- a/trace_exporters/src/lib.rs +++ b/trace_exporters/src/lib.rs @@ -7,13 +7,34 @@ clippy::future_not_send )] +use crate::export::AsyncExporter; +use crate::jaeger::JaegerAgentExporter; use snafu::Snafu; use std::num::NonZeroU16; use std::sync::Arc; use structopt::StructOpt; -use trace::TraceCollector; -pub mod otel; +pub mod export; + +mod jaeger; + +/// Auto-generated thrift code +#[allow( + dead_code, + deprecated, + clippy::redundant_field_names, + clippy::unused_unit, + clippy::use_self, + clippy::too_many_arguments, + clippy::type_complexity +)] +mod thrift { + pub mod agent; + + pub mod zipkincore; + + pub mod jaeger; +} /// CLI config for distributed tracing options #[derive(Debug, StructOpt, Clone)] @@ -84,7 +105,7 @@ pub struct TracingConfig { } impl TracingConfig { - pub fn build(&self) -> Result>> { + pub fn build(&self) -> Result>> { match self.traces_exporter { TracesExporter::None => Ok(None), TracesExporter::Jaeger => Ok(Some(jaeger_exporter(self)?)), @@ -115,23 +136,16 @@ impl std::str::FromStr for TracesExporter { #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("failed to construct trace exporter: {}", source))] - TraceExporter { - source: opentelemetry::trace::TraceError, - }, + #[snafu(display("Failed to resolve address: {}", address))] + ResolutionError { address: String }, - #[snafu(display( - "'jaeger' not supported with this build. Hint: recompile with appropriate features" - ))] - JaegerNotBuilt {}, + #[snafu(context(false))] + IOError { source: std::io::Error }, } pub type Result = std::result::Result; -#[cfg(feature = "jaeger")] -fn jaeger_exporter(config: &TracingConfig) -> Result> { - use observability_deps::tracing::info; - +fn jaeger_exporter(config: &TracingConfig) -> Result> { let agent_endpoint = format!( "{}:{}", config.traces_exporter_jaeger_agent_host.trim(), @@ -139,18 +153,7 @@ fn jaeger_exporter(config: &TracingConfig) -> Result> { ); let service_name = &config.traces_exporter_jaeger_service_name; - info!(%agent_endpoint, %service_name, "Creating jaeger tracing exporter"); + let jaeger = JaegerAgentExporter::new(service_name.clone(), agent_endpoint)?; - let exporter = opentelemetry_jaeger::new_pipeline() - .with_agent_endpoint(agent_endpoint) - .with_service_name(&config.traces_exporter_jaeger_service_name) - .init_sync_exporter() - .map_err(|source| Error::TraceExporter { source })?; - - Ok(Arc::new(otel::OtelExporter::new(exporter))) -} - -#[cfg(not(feature = "jaeger"))] -fn jaeger_exporter(_config: &TracingConfig) -> Result> { - Err(Error::JaegerNotBuilt {}) + Ok(Arc::new(AsyncExporter::new(jaeger))) } diff --git a/trace_exporters/src/otel.rs b/trace_exporters/src/otel.rs deleted file mode 100644 index 78dcd838c8..0000000000 --- a/trace_exporters/src/otel.rs +++ /dev/null @@ -1,356 +0,0 @@ -use std::borrow::Cow; -use std::future::Future; -use std::sync::Arc; - -use async_trait::async_trait; -use futures::{ - future::{BoxFuture, Shared}, - FutureExt, TryFutureExt, -}; -use tokio::sync::mpsc; -use tokio::task::JoinError; -use tokio_util::sync::CancellationToken; - -use observability_deps::tracing::{error, info, warn}; -use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter}; - -use trace::ctx::{SpanContext, SpanId, TraceId}; -use trace::span::{MetaValue, SpanEvent, SpanStatus}; -use trace::{span::Span, TraceCollector}; - -/// Size of the exporter buffer -const CHANNEL_SIZE: usize = 1000; - -/// Maximum number of events that can be associated with a span -const MAX_EVENTS: u32 = 100; - -/// Maximum number of attributes that can be associated with a span -const MAX_ATTRIBUTES: u32 = 100; - -/// `OtelExporter` wraps a opentelemetry SpanExporter and sinks spans to it -/// -/// In order to do this it spawns a background worker that pulls messages -/// of a queue and writes them to opentelemetry. If this worker cannot keep -/// up, and this queue fills up, spans will be dropped and warnings logged -#[derive(Debug)] -pub struct OtelExporter { - join: Shared>>>, - - sender: tokio::sync::mpsc::Sender, - - shutdown: CancellationToken, -} - -impl OtelExporter { - /// Creates a new `OtelExporter` - pub fn new(exporter: T) -> Self { - let shutdown = CancellationToken::new(); - let (sender, receiver) = mpsc::channel(CHANNEL_SIZE); - - let handle = tokio::spawn(background_worker(shutdown.clone(), exporter, receiver)); - let join = handle.map_err(Arc::new).boxed().shared(); - - Self { - join, - shutdown, - sender, - } - } - - /// Triggers shutdown of this `OtelExporter` - pub fn shutdown(&self) { - info!("otel exporter shutting down"); - self.shutdown.cancel() - } - - /// Waits for the background worker of OtelExporter to finish - pub fn join(&self) -> impl Future>> { - self.join.clone() - } -} - -impl TraceCollector for OtelExporter { - fn export(&self, span: Span) { - use mpsc::error::TrySendError; - - match self.sender.try_send(convert_span(span)) { - Ok(_) => { - //TODO: Increment some metric - } - Err(TrySendError::Full(_)) => { - warn!("exporter cannot keep up, dropping spans") - } - Err(TrySendError::Closed(_)) => { - warn!("background worker shutdown") - } - } - } -} - -async fn background_worker( - shutdown: CancellationToken, - exporter: T, - receiver: mpsc::Receiver, -) { - tokio::select! { - _ = exporter_loop(exporter, receiver) => { - // Don't expect this future to complete - error!("otel exporter loop completed") - } - _ = shutdown.cancelled() => {} - } - info!("otel exporter shut down") -} - -/// An opentelemetry::SpanExporter that sinks writes to a tokio mpsc channel. -/// -/// Intended for testing ONLY -/// -/// Note: There is a similar construct in opentelemetry behind the testing feature -/// flag, but enabling this brings in a large number of additional dependencies and -/// so we just implement our own version -#[derive(Debug)] -pub struct TestOtelExporter { - channel: mpsc::Sender, -} - -impl TestOtelExporter { - pub fn new(channel: mpsc::Sender) -> Self { - Self { channel } - } -} - -#[async_trait] -impl SpanExporter for TestOtelExporter { - async fn export(&mut self, batch: Vec) -> ExportResult { - for span in batch { - self.channel.send(span).await.expect("channel closed") - } - Ok(()) - } -} - -async fn exporter_loop( - mut exporter: T, - mut receiver: tokio::sync::mpsc::Receiver, -) { - while let Some(span) = receiver.recv().await { - // TODO: Batch export spans - if let Err(e) = exporter.export(vec![span]).await { - error!(%e, "error exporting span") - } - } - - warn!("sender-side of jaeger exporter dropped without waiting for shut down") -} - -fn convert_span(span: Span) -> SpanData { - use opentelemetry::sdk::trace::{EvictedHashMap, EvictedQueue}; - use opentelemetry::sdk::InstrumentationLibrary; - use opentelemetry::trace::{SpanId, SpanKind}; - use opentelemetry::{Key, KeyValue}; - - let parent_span_id = match span.ctx.parent_span_id { - Some(id) => convert_span_id(id), - None => SpanId::invalid(), - }; - - let mut ret = SpanData { - span_context: convert_ctx(&span.ctx), - parent_span_id, - span_kind: SpanKind::Server, - name: span.name, - start_time: span.start.map(Into::into).unwrap_or(std::time::UNIX_EPOCH), - end_time: span.end.map(Into::into).unwrap_or(std::time::UNIX_EPOCH), - attributes: EvictedHashMap::new(MAX_ATTRIBUTES, 0), - events: EvictedQueue::new(MAX_EVENTS), - links: EvictedQueue::new(0), - status_code: convert_status(span.status), - status_message: Default::default(), - resource: None, - instrumentation_lib: InstrumentationLibrary::new("iox-trace", None), - }; - - ret.events - .extend(span.events.into_iter().map(convert_event)); - for (key, value) in span.metadata { - let key = match key { - Cow::Owned(key) => Key::new(key), - Cow::Borrowed(key) => Key::new(key), - }; - - let value = convert_meta_value(value); - ret.attributes.insert(KeyValue::new(key, value)) - } - ret -} - -fn convert_ctx(ctx: &SpanContext) -> opentelemetry::trace::SpanContext { - opentelemetry::trace::SpanContext::new( - convert_trace_id(ctx.trace_id), - convert_span_id(ctx.span_id), - Default::default(), - false, - Default::default(), - ) -} - -fn convert_event(event: SpanEvent) -> opentelemetry::trace::Event { - opentelemetry::trace::Event { - name: event.msg, - timestamp: event.time.into(), - attributes: vec![], - dropped_attributes_count: 0, - } -} - -fn convert_status(status: SpanStatus) -> opentelemetry::trace::StatusCode { - use opentelemetry::trace::StatusCode; - match status { - SpanStatus::Unknown => StatusCode::Unset, - SpanStatus::Ok => StatusCode::Ok, - SpanStatus::Err => StatusCode::Error, - } -} - -fn convert_span_id(id: SpanId) -> opentelemetry::trace::SpanId { - opentelemetry::trace::SpanId::from_u64(id.0.get()) -} - -fn convert_trace_id(id: TraceId) -> opentelemetry::trace::TraceId { - opentelemetry::trace::TraceId::from_u128(id.0.get()) -} - -fn convert_meta_value(v: MetaValue) -> opentelemetry::Value { - match v { - MetaValue::String(v) => opentelemetry::Value::String(v), - MetaValue::Float(v) => opentelemetry::Value::F64(v), - MetaValue::Int(v) => opentelemetry::Value::I64(v), - } -} - -#[cfg(test)] -mod tests { - use super::*; - use chrono::{TimeZone, Utc}; - use opentelemetry::{Key, Value}; - use std::time::{Duration, UNIX_EPOCH}; - - #[test] - fn test_conversion() { - let root = SpanContext { - trace_id: TraceId::new(232345).unwrap(), - parent_span_id: Some(SpanId::new(2484).unwrap()), - span_id: SpanId::new(2343).unwrap(), - collector: None, - }; - - let mut span = root.child("foo"); - span.metadata.insert("string".into(), "bar".into()); - span.metadata.insert("float".into(), 3.32.into()); - span.metadata.insert("int".into(), 5.into()); - - span.events.push(SpanEvent { - time: Utc.timestamp_nanos(1230), - msg: "event".into(), - }); - span.status = SpanStatus::Ok; - - span.start = Some(Utc.timestamp_nanos(1000)); - span.end = Some(Utc.timestamp_nanos(2000)); - - let span_data: SpanData = convert_span(span.clone()); - - assert_eq!( - span_data.span_context.span_id().to_u64(), - span.ctx.span_id.get() - ); - assert_eq!( - span_data.span_context.trace_id().to_u128(), - span.ctx.trace_id.get() - ); - assert_eq!( - span_data.parent_span_id.to_u64(), - span.ctx.parent_span_id.unwrap().get() - ); - assert_eq!( - span_data.start_time, - UNIX_EPOCH + Duration::from_nanos(1000) - ); - assert_eq!(span_data.end_time, UNIX_EPOCH + Duration::from_nanos(2000)); - - let events: Vec<_> = span_data.events.iter().collect(); - assert_eq!(events.len(), 1); - assert_eq!(events[0].name.as_ref(), "event"); - assert_eq!(events[0].timestamp, UNIX_EPOCH + Duration::from_nanos(1230)); - assert_eq!(events[0].attributes.len(), 0); - - assert_eq!( - span_data - .attributes - .get(&Key::from_static_str("string")) - .unwrap() - .clone(), - Value::String("bar".into()) - ); - assert_eq!( - span_data - .attributes - .get(&Key::from_static_str("float")) - .unwrap() - .clone(), - Value::F64(3.32) - ); - assert_eq!( - span_data - .attributes - .get(&Key::from_static_str("int")) - .unwrap() - .clone(), - Value::I64(5) - ); - } - - #[tokio::test] - async fn test_exporter() { - let (sender, mut receiver) = mpsc::channel(10); - let exporter = OtelExporter::new(TestOtelExporter::new(sender)); - - assert!(exporter.join().now_or_never().is_none()); - - let root = SpanContext { - trace_id: TraceId::new(232345).unwrap(), - parent_span_id: None, - span_id: SpanId::new(2343).unwrap(), - collector: None, - }; - let s1 = root.child("foo"); - let s2 = root.child("bar"); - - exporter.export(s1.clone()); - exporter.export(s2.clone()); - exporter.export(s2.clone()); - - let r1 = receiver.recv().await.unwrap(); - let r2 = receiver.recv().await.unwrap(); - let r3 = receiver.recv().await.unwrap(); - - exporter.shutdown(); - exporter.join().await.unwrap(); - - // Should not be fatal despite exporter having been shutdown - exporter.export(s2.clone()); - - assert_eq!(root.span_id.get(), r1.parent_span_id.to_u64()); - assert_eq!(s1.ctx.span_id.get(), r1.span_context.span_id().to_u64()); - assert_eq!(s1.ctx.trace_id.get(), r1.span_context.trace_id().to_u128()); - - assert_eq!(root.span_id.get(), r2.parent_span_id.to_u64()); - assert_eq!(s2.ctx.span_id.get(), r2.span_context.span_id().to_u64()); - assert_eq!(s2.ctx.trace_id.get(), r2.span_context.trace_id().to_u128()); - - assert_eq!(root.span_id.get(), r3.parent_span_id.to_u64()); - assert_eq!(s2.ctx.span_id.get(), r3.span_context.span_id().to_u64()); - assert_eq!(s2.ctx.trace_id.get(), r3.span_context.trace_id().to_u128()); - } -} diff --git a/trace_exporters/src/thrift/agent.rs b/trace_exporters/src/thrift/agent.rs new file mode 100644 index 0000000000..454bf3f74a --- /dev/null +++ b/trace_exporters/src/thrift/agent.rs @@ -0,0 +1,305 @@ +// Autogenerated by Thrift Compiler (0.13.0) +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +#![allow(unused_imports)] +#![allow(unused_extern_crates)] +#![cfg_attr(rustfmt, rustfmt_skip)] + +extern crate thrift; + +use thrift::OrderedFloat; +use std::cell::RefCell; +use std::collections::{BTreeMap, BTreeSet}; +use std::convert::{From, TryFrom}; +use std::default::Default; +use std::error::Error; +use std::fmt; +use std::fmt::{Display, Formatter}; +use std::rc::Rc; + +use thrift::{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient}; +use thrift::protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSetIdentifier, TStructIdentifier, TType}; +use thrift::protocol::field_id; +use thrift::protocol::verify_expected_message_type; +use thrift::protocol::verify_expected_sequence_number; +use thrift::protocol::verify_expected_service_call; +use thrift::protocol::verify_required_field_exists; +use thrift::server::TProcessor; + +use super::jaeger; +use super::zipkincore; + +// +// Agent service client +// + +pub trait TAgentSyncClient { + fn emit_zipkin_batch(&mut self, spans: Vec) -> thrift::Result<()>; + fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()>; +} + +pub trait TAgentSyncClientMarker {} + +pub struct AgentSyncClient where IP: TInputProtocol, OP: TOutputProtocol { + _i_prot: IP, + _o_prot: OP, + _sequence_number: i32, +} + +impl AgentSyncClient where IP: TInputProtocol, OP: TOutputProtocol { + pub fn new(input_protocol: IP, output_protocol: OP) -> AgentSyncClient { + AgentSyncClient { _i_prot: input_protocol, _o_prot: output_protocol, _sequence_number: 0 } + } +} + +impl TThriftClient for AgentSyncClient where IP: TInputProtocol, OP: TOutputProtocol { + fn i_prot_mut(&mut self) -> &mut dyn TInputProtocol { &mut self._i_prot } + fn o_prot_mut(&mut self) -> &mut dyn TOutputProtocol { &mut self._o_prot } + fn sequence_number(&self) -> i32 { self._sequence_number } + fn increment_sequence_number(&mut self) -> i32 { self._sequence_number += 1; self._sequence_number } +} + +impl TAgentSyncClientMarker for AgentSyncClient where IP: TInputProtocol, OP: TOutputProtocol {} + +impl TAgentSyncClient for C { + fn emit_zipkin_batch(&mut self, spans: Vec) -> thrift::Result<()> { + ( + { + self.increment_sequence_number(); + let message_ident = TMessageIdentifier::new("emitZipkinBatch", TMessageType::OneWay, self.sequence_number()); + let call_args = AgentEmitZipkinBatchArgs { spans: spans }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + } + )?; + Ok(()) + } + fn emit_batch(&mut self, batch: jaeger::Batch) -> thrift::Result<()> { + ( + { + self.increment_sequence_number(); + let message_ident = TMessageIdentifier::new("emitBatch", TMessageType::OneWay, self.sequence_number()); + let call_args = AgentEmitBatchArgs { batch: batch }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + } + )?; + Ok(()) + } +} + +// +// Agent service processor +// + +pub trait AgentSyncHandler { + fn handle_emit_zipkin_batch(&self, spans: Vec) -> thrift::Result<()>; + fn handle_emit_batch(&self, batch: jaeger::Batch) -> thrift::Result<()>; +} + +pub struct AgentSyncProcessor { + handler: H, +} + +impl AgentSyncProcessor { + pub fn new(handler: H) -> AgentSyncProcessor { + AgentSyncProcessor { + handler, + } + } + fn process_emit_zipkin_batch(&self, incoming_sequence_number: i32, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + TAgentProcessFunctions::process_emit_zipkin_batch(&self.handler, incoming_sequence_number, i_prot, o_prot) + } + fn process_emit_batch(&self, incoming_sequence_number: i32, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + TAgentProcessFunctions::process_emit_batch(&self.handler, incoming_sequence_number, i_prot, o_prot) + } +} + +pub struct TAgentProcessFunctions; + +impl TAgentProcessFunctions { + pub fn process_emit_zipkin_batch(handler: &H, _: i32, i_prot: &mut dyn TInputProtocol, _: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let args = AgentEmitZipkinBatchArgs::read_from_in_protocol(i_prot)?; + match handler.handle_emit_zipkin_batch(args.spans) { + Ok(_) => { + Ok(()) + }, + Err(e) => { + match e { + thrift::Error::Application(app_err) => { + Err(thrift::Error::Application(app_err)) + }, + _ => { + let ret_err = { + ApplicationError::new( + ApplicationErrorKind::Unknown, + e.description() + ) + }; + Err(thrift::Error::Application(ret_err)) + }, + } + }, + } + } + pub fn process_emit_batch(handler: &H, _: i32, i_prot: &mut dyn TInputProtocol, _: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let args = AgentEmitBatchArgs::read_from_in_protocol(i_prot)?; + match handler.handle_emit_batch(args.batch) { + Ok(_) => { + Ok(()) + }, + Err(e) => { + match e { + thrift::Error::Application(app_err) => { + Err(thrift::Error::Application(app_err)) + }, + _ => { + let ret_err = { + ApplicationError::new( + ApplicationErrorKind::Unknown, + e.description() + ) + }; + Err(thrift::Error::Application(ret_err)) + }, + } + }, + } + } +} + +impl TProcessor for AgentSyncProcessor { + fn process(&self, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let message_ident = i_prot.read_message_begin()?; + let res = match &*message_ident.name { + "emitZipkinBatch" => { + self.process_emit_zipkin_batch(message_ident.sequence_number, i_prot, o_prot) + }, + "emitBatch" => { + self.process_emit_batch(message_ident.sequence_number, i_prot, o_prot) + }, + method => { + Err( + thrift::Error::Application( + ApplicationError::new( + ApplicationErrorKind::UnknownMethod, + format!("unknown method {}", method) + ) + ) + ) + }, + }; + thrift::server::handle_process_result(&message_ident, res, o_prot) + } +} + +// +// AgentEmitZipkinBatchArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct AgentEmitZipkinBatchArgs { + spans: Vec, +} + +impl AgentEmitZipkinBatchArgs { + fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_0 = zipkincore::Span::read_from_in_protocol(i_prot)?; + val.push(list_elem_0); + } + i_prot.read_list_end()?; + f_1 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("AgentEmitZipkinBatchArgs.spans", &f_1)?; + let ret = AgentEmitZipkinBatchArgs { + spans: f_1.expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("emitZipkinBatch_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("spans", TType::List, 1))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, self.spans.len() as i32))?; + for e in &self.spans { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// AgentEmitBatchArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct AgentEmitBatchArgs { + batch: jaeger::Batch, +} + +impl AgentEmitBatchArgs { + fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = jaeger::Batch::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("AgentEmitBatchArgs.batch", &f_1)?; + let ret = AgentEmitBatchArgs { + batch: f_1.expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("emitBatch_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("batch", TType::Struct, 1))?; + self.batch.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + diff --git a/trace_exporters/src/thrift/jaeger.rs b/trace_exporters/src/thrift/jaeger.rs new file mode 100644 index 0000000000..d15b7e713d --- /dev/null +++ b/trace_exporters/src/thrift/jaeger.rs @@ -0,0 +1,1224 @@ +// Autogenerated by Thrift Compiler (0.13.0) +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +#![allow(unused_imports)] +#![allow(unused_extern_crates)] +#![cfg_attr(rustfmt, rustfmt_skip)] + +extern crate thrift; + +use thrift::OrderedFloat; +use std::cell::RefCell; +use std::collections::{BTreeMap, BTreeSet}; +use std::convert::{From, TryFrom}; +use std::default::Default; +use std::error::Error; +use std::fmt; +use std::fmt::{Display, Formatter}; +use std::rc::Rc; + +use thrift::{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient}; +use thrift::protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSetIdentifier, TStructIdentifier, TType}; +use thrift::protocol::field_id; +use thrift::protocol::verify_expected_message_type; +use thrift::protocol::verify_expected_sequence_number; +use thrift::protocol::verify_expected_service_call; +use thrift::protocol::verify_required_field_exists; +use thrift::server::TProcessor; + +#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum TagType { + String = 0, + Double = 1, + Bool = 2, + Long = 3, + Binary = 4, +} + +impl TagType { + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + o_prot.write_i32(*self as i32) + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + let enum_value = i_prot.read_i32()?; + TagType::try_from(enum_value) } +} + +impl TryFrom for TagType { + type Error = thrift::Error; fn try_from(i: i32) -> Result { + match i { + 0 => Ok(TagType::String), + 1 => Ok(TagType::Double), + 2 => Ok(TagType::Bool), + 3 => Ok(TagType::Long), + 4 => Ok(TagType::Binary), + _ => { + Err( + thrift::Error::Protocol( + ProtocolError::new( + ProtocolErrorKind::InvalidData, + format!("cannot convert enum constant {} to TagType", i) + ) + ) + ) + }, + } + } +} + +#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum SpanRefType { + ChildOf = 0, + FollowsFrom = 1, +} + +impl SpanRefType { + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + o_prot.write_i32(*self as i32) + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + let enum_value = i_prot.read_i32()?; + SpanRefType::try_from(enum_value) } +} + +impl TryFrom for SpanRefType { + type Error = thrift::Error; fn try_from(i: i32) -> Result { + match i { + 0 => Ok(SpanRefType::ChildOf), + 1 => Ok(SpanRefType::FollowsFrom), + _ => { + Err( + thrift::Error::Protocol( + ProtocolError::new( + ProtocolErrorKind::InvalidData, + format!("cannot convert enum constant {} to SpanRefType", i) + ) + ) + ) + }, + } + } +} + +// +// Tag +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Tag { + pub key: String, + pub v_type: TagType, + pub v_str: Option, + pub v_double: Option>, + pub v_bool: Option, + pub v_long: Option, + pub v_binary: Option>, +} + +impl Tag { + pub fn new(key: String, v_type: TagType, v_str: F3, v_double: F4, v_bool: F5, v_long: F6, v_binary: F7) -> Tag where F3: Into>, F4: Into>>, F5: Into>, F6: Into>, F7: Into>> { + Tag { + key: key, + v_type: v_type, + v_str: v_str.into(), + v_double: v_double.into(), + v_bool: v_bool.into(), + v_long: v_long.into(), + v_binary: v_binary.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + let mut f_3: Option = None; + let mut f_4: Option> = None; + let mut f_5: Option = None; + let mut f_6: Option = None; + let mut f_7: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_string()?; + f_1 = Some(val); + }, + 2 => { + let val = TagType::read_from_in_protocol(i_prot)?; + f_2 = Some(val); + }, + 3 => { + let val = i_prot.read_string()?; + f_3 = Some(val); + }, + 4 => { + let val = OrderedFloat::from(i_prot.read_double()?); + f_4 = Some(val); + }, + 5 => { + let val = i_prot.read_bool()?; + f_5 = Some(val); + }, + 6 => { + let val = i_prot.read_i64()?; + f_6 = Some(val); + }, + 7 => { + let val = i_prot.read_bytes()?; + f_7 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("Tag.key", &f_1)?; + verify_required_field_exists("Tag.v_type", &f_2)?; + let ret = Tag { + key: f_1.expect("auto-generated code should have checked for presence of required fields"), + v_type: f_2.expect("auto-generated code should have checked for presence of required fields"), + v_str: f_3, + v_double: f_4, + v_bool: f_5, + v_long: f_6, + v_binary: f_7, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("Tag"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("key", TType::String, 1))?; + o_prot.write_string(&self.key)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("vType", TType::I32, 2))?; + self.v_type.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + if let Some(ref fld_var) = self.v_str { + o_prot.write_field_begin(&TFieldIdentifier::new("vStr", TType::String, 3))?; + o_prot.write_string(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.v_double { + o_prot.write_field_begin(&TFieldIdentifier::new("vDouble", TType::Double, 4))?; + o_prot.write_double(fld_var.into())?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.v_bool { + o_prot.write_field_begin(&TFieldIdentifier::new("vBool", TType::Bool, 5))?; + o_prot.write_bool(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.v_long { + o_prot.write_field_begin(&TFieldIdentifier::new("vLong", TType::I64, 6))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.v_binary { + o_prot.write_field_begin(&TFieldIdentifier::new("vBinary", TType::String, 7))?; + o_prot.write_bytes(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// Log +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Log { + pub timestamp: i64, + pub fields: Vec, +} + +impl Log { + pub fn new(timestamp: i64, fields: Vec) -> Log { + Log { + timestamp: timestamp, + fields: fields, + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_i64()?; + f_1 = Some(val); + }, + 2 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_0 = Tag::read_from_in_protocol(i_prot)?; + val.push(list_elem_0); + } + i_prot.read_list_end()?; + f_2 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("Log.timestamp", &f_1)?; + verify_required_field_exists("Log.fields", &f_2)?; + let ret = Log { + timestamp: f_1.expect("auto-generated code should have checked for presence of required fields"), + fields: f_2.expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("Log"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("timestamp", TType::I64, 1))?; + o_prot.write_i64(self.timestamp)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("fields", TType::List, 2))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, self.fields.len() as i32))?; + for e in &self.fields { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// SpanRef +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct SpanRef { + pub ref_type: SpanRefType, + pub trace_id_low: i64, + pub trace_id_high: i64, + pub span_id: i64, +} + +impl SpanRef { + pub fn new(ref_type: SpanRefType, trace_id_low: i64, trace_id_high: i64, span_id: i64) -> SpanRef { + SpanRef { + ref_type: ref_type, + trace_id_low: trace_id_low, + trace_id_high: trace_id_high, + span_id: span_id, + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + let mut f_3: Option = None; + let mut f_4: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = SpanRefType::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + }, + 2 => { + let val = i_prot.read_i64()?; + f_2 = Some(val); + }, + 3 => { + let val = i_prot.read_i64()?; + f_3 = Some(val); + }, + 4 => { + let val = i_prot.read_i64()?; + f_4 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("SpanRef.ref_type", &f_1)?; + verify_required_field_exists("SpanRef.trace_id_low", &f_2)?; + verify_required_field_exists("SpanRef.trace_id_high", &f_3)?; + verify_required_field_exists("SpanRef.span_id", &f_4)?; + let ret = SpanRef { + ref_type: f_1.expect("auto-generated code should have checked for presence of required fields"), + trace_id_low: f_2.expect("auto-generated code should have checked for presence of required fields"), + trace_id_high: f_3.expect("auto-generated code should have checked for presence of required fields"), + span_id: f_4.expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("SpanRef"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("refType", TType::I32, 1))?; + self.ref_type.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("traceIdLow", TType::I64, 2))?; + o_prot.write_i64(self.trace_id_low)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("traceIdHigh", TType::I64, 3))?; + o_prot.write_i64(self.trace_id_high)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("spanId", TType::I64, 4))?; + o_prot.write_i64(self.span_id)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// Span +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Span { + pub trace_id_low: i64, + pub trace_id_high: i64, + pub span_id: i64, + pub parent_span_id: i64, + pub operation_name: String, + pub references: Option>, + pub flags: i32, + pub start_time: i64, + pub duration: i64, + pub tags: Option>, + pub logs: Option>, +} + +impl Span { + pub fn new(trace_id_low: i64, trace_id_high: i64, span_id: i64, parent_span_id: i64, operation_name: String, references: F6, flags: i32, start_time: i64, duration: i64, tags: F10, logs: F11) -> Span where F6: Into>>, F10: Into>>, F11: Into>> { + Span { + trace_id_low: trace_id_low, + trace_id_high: trace_id_high, + span_id: span_id, + parent_span_id: parent_span_id, + operation_name: operation_name, + references: references.into(), + flags: flags, + start_time: start_time, + duration: duration, + tags: tags.into(), + logs: logs.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + let mut f_3: Option = None; + let mut f_4: Option = None; + let mut f_5: Option = None; + let mut f_6: Option> = None; + let mut f_7: Option = None; + let mut f_8: Option = None; + let mut f_9: Option = None; + let mut f_10: Option> = None; + let mut f_11: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_i64()?; + f_1 = Some(val); + }, + 2 => { + let val = i_prot.read_i64()?; + f_2 = Some(val); + }, + 3 => { + let val = i_prot.read_i64()?; + f_3 = Some(val); + }, + 4 => { + let val = i_prot.read_i64()?; + f_4 = Some(val); + }, + 5 => { + let val = i_prot.read_string()?; + f_5 = Some(val); + }, + 6 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_1 = SpanRef::read_from_in_protocol(i_prot)?; + val.push(list_elem_1); + } + i_prot.read_list_end()?; + f_6 = Some(val); + }, + 7 => { + let val = i_prot.read_i32()?; + f_7 = Some(val); + }, + 8 => { + let val = i_prot.read_i64()?; + f_8 = Some(val); + }, + 9 => { + let val = i_prot.read_i64()?; + f_9 = Some(val); + }, + 10 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_2 = Tag::read_from_in_protocol(i_prot)?; + val.push(list_elem_2); + } + i_prot.read_list_end()?; + f_10 = Some(val); + }, + 11 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_3 = Log::read_from_in_protocol(i_prot)?; + val.push(list_elem_3); + } + i_prot.read_list_end()?; + f_11 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("Span.trace_id_low", &f_1)?; + verify_required_field_exists("Span.trace_id_high", &f_2)?; + verify_required_field_exists("Span.span_id", &f_3)?; + verify_required_field_exists("Span.parent_span_id", &f_4)?; + verify_required_field_exists("Span.operation_name", &f_5)?; + verify_required_field_exists("Span.flags", &f_7)?; + verify_required_field_exists("Span.start_time", &f_8)?; + verify_required_field_exists("Span.duration", &f_9)?; + let ret = Span { + trace_id_low: f_1.expect("auto-generated code should have checked for presence of required fields"), + trace_id_high: f_2.expect("auto-generated code should have checked for presence of required fields"), + span_id: f_3.expect("auto-generated code should have checked for presence of required fields"), + parent_span_id: f_4.expect("auto-generated code should have checked for presence of required fields"), + operation_name: f_5.expect("auto-generated code should have checked for presence of required fields"), + references: f_6, + flags: f_7.expect("auto-generated code should have checked for presence of required fields"), + start_time: f_8.expect("auto-generated code should have checked for presence of required fields"), + duration: f_9.expect("auto-generated code should have checked for presence of required fields"), + tags: f_10, + logs: f_11, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("Span"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("traceIdLow", TType::I64, 1))?; + o_prot.write_i64(self.trace_id_low)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("traceIdHigh", TType::I64, 2))?; + o_prot.write_i64(self.trace_id_high)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("spanId", TType::I64, 3))?; + o_prot.write_i64(self.span_id)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("parentSpanId", TType::I64, 4))?; + o_prot.write_i64(self.parent_span_id)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("operationName", TType::String, 5))?; + o_prot.write_string(&self.operation_name)?; + o_prot.write_field_end()?; + if let Some(ref fld_var) = self.references { + o_prot.write_field_begin(&TFieldIdentifier::new("references", TType::List, 6))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, fld_var.len() as i32))?; + for e in fld_var { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_begin(&TFieldIdentifier::new("flags", TType::I32, 7))?; + o_prot.write_i32(self.flags)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("startTime", TType::I64, 8))?; + o_prot.write_i64(self.start_time)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("duration", TType::I64, 9))?; + o_prot.write_i64(self.duration)?; + o_prot.write_field_end()?; + if let Some(ref fld_var) = self.tags { + o_prot.write_field_begin(&TFieldIdentifier::new("tags", TType::List, 10))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, fld_var.len() as i32))?; + for e in fld_var { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.logs { + o_prot.write_field_begin(&TFieldIdentifier::new("logs", TType::List, 11))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, fld_var.len() as i32))?; + for e in fld_var { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// Process +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Process { + pub service_name: String, + pub tags: Option>, +} + +impl Process { + pub fn new(service_name: String, tags: F2) -> Process where F2: Into>> { + Process { + service_name: service_name, + tags: tags.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_string()?; + f_1 = Some(val); + }, + 2 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_4 = Tag::read_from_in_protocol(i_prot)?; + val.push(list_elem_4); + } + i_prot.read_list_end()?; + f_2 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("Process.service_name", &f_1)?; + let ret = Process { + service_name: f_1.expect("auto-generated code should have checked for presence of required fields"), + tags: f_2, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("Process"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("serviceName", TType::String, 1))?; + o_prot.write_string(&self.service_name)?; + o_prot.write_field_end()?; + if let Some(ref fld_var) = self.tags { + o_prot.write_field_begin(&TFieldIdentifier::new("tags", TType::List, 2))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, fld_var.len() as i32))?; + for e in fld_var { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// ClientStats +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct ClientStats { + pub full_queue_dropped_spans: i64, + pub too_large_dropped_spans: i64, + pub failed_to_emit_spans: i64, +} + +impl ClientStats { + pub fn new(full_queue_dropped_spans: i64, too_large_dropped_spans: i64, failed_to_emit_spans: i64) -> ClientStats { + ClientStats { + full_queue_dropped_spans: full_queue_dropped_spans, + too_large_dropped_spans: too_large_dropped_spans, + failed_to_emit_spans: failed_to_emit_spans, + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option = None; + let mut f_3: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_i64()?; + f_1 = Some(val); + }, + 2 => { + let val = i_prot.read_i64()?; + f_2 = Some(val); + }, + 3 => { + let val = i_prot.read_i64()?; + f_3 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("ClientStats.full_queue_dropped_spans", &f_1)?; + verify_required_field_exists("ClientStats.too_large_dropped_spans", &f_2)?; + verify_required_field_exists("ClientStats.failed_to_emit_spans", &f_3)?; + let ret = ClientStats { + full_queue_dropped_spans: f_1.expect("auto-generated code should have checked for presence of required fields"), + too_large_dropped_spans: f_2.expect("auto-generated code should have checked for presence of required fields"), + failed_to_emit_spans: f_3.expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("ClientStats"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("fullQueueDroppedSpans", TType::I64, 1))?; + o_prot.write_i64(self.full_queue_dropped_spans)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("tooLargeDroppedSpans", TType::I64, 2))?; + o_prot.write_i64(self.too_large_dropped_spans)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("failedToEmitSpans", TType::I64, 3))?; + o_prot.write_i64(self.failed_to_emit_spans)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// Batch +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Batch { + pub process: Process, + pub spans: Vec, + pub seq_no: Option, + pub stats: Option, +} + +impl Batch { + pub fn new(process: Process, spans: Vec, seq_no: F3, stats: F4) -> Batch where F3: Into>, F4: Into> { + Batch { + process: process, + spans: spans, + seq_no: seq_no.into(), + stats: stats.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + let mut f_2: Option> = None; + let mut f_3: Option = None; + let mut f_4: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = Process::read_from_in_protocol(i_prot)?; + f_1 = Some(val); + }, + 2 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_5 = Span::read_from_in_protocol(i_prot)?; + val.push(list_elem_5); + } + i_prot.read_list_end()?; + f_2 = Some(val); + }, + 3 => { + let val = i_prot.read_i64()?; + f_3 = Some(val); + }, + 4 => { + let val = ClientStats::read_from_in_protocol(i_prot)?; + f_4 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("Batch.process", &f_1)?; + verify_required_field_exists("Batch.spans", &f_2)?; + let ret = Batch { + process: f_1.expect("auto-generated code should have checked for presence of required fields"), + spans: f_2.expect("auto-generated code should have checked for presence of required fields"), + seq_no: f_3, + stats: f_4, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("Batch"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("process", TType::Struct, 1))?; + self.process.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + o_prot.write_field_begin(&TFieldIdentifier::new("spans", TType::List, 2))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, self.spans.len() as i32))?; + for e in &self.spans { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + if let Some(fld_var) = self.seq_no { + o_prot.write_field_begin(&TFieldIdentifier::new("seqNo", TType::I64, 3))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.stats { + o_prot.write_field_begin(&TFieldIdentifier::new("stats", TType::Struct, 4))?; + fld_var.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// BatchSubmitResponse +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct BatchSubmitResponse { + pub ok: bool, +} + +impl BatchSubmitResponse { + pub fn new(ok: bool) -> BatchSubmitResponse { + BatchSubmitResponse { + ok: ok, + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_bool()?; + f_1 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("BatchSubmitResponse.ok", &f_1)?; + let ret = BatchSubmitResponse { + ok: f_1.expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("BatchSubmitResponse"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("ok", TType::Bool, 1))?; + o_prot.write_bool(self.ok)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// Collector service client +// + +pub trait TCollectorSyncClient { + fn submit_batches(&mut self, batches: Vec) -> thrift::Result>; +} + +pub trait TCollectorSyncClientMarker {} + +pub struct CollectorSyncClient where IP: TInputProtocol, OP: TOutputProtocol { + _i_prot: IP, + _o_prot: OP, + _sequence_number: i32, +} + +impl CollectorSyncClient where IP: TInputProtocol, OP: TOutputProtocol { + pub fn new(input_protocol: IP, output_protocol: OP) -> CollectorSyncClient { + CollectorSyncClient { _i_prot: input_protocol, _o_prot: output_protocol, _sequence_number: 0 } + } +} + +impl TThriftClient for CollectorSyncClient where IP: TInputProtocol, OP: TOutputProtocol { + fn i_prot_mut(&mut self) -> &mut dyn TInputProtocol { &mut self._i_prot } + fn o_prot_mut(&mut self) -> &mut dyn TOutputProtocol { &mut self._o_prot } + fn sequence_number(&self) -> i32 { self._sequence_number } + fn increment_sequence_number(&mut self) -> i32 { self._sequence_number += 1; self._sequence_number } +} + +impl TCollectorSyncClientMarker for CollectorSyncClient where IP: TInputProtocol, OP: TOutputProtocol {} + +impl TCollectorSyncClient for C { + fn submit_batches(&mut self, batches: Vec) -> thrift::Result> { + ( + { + self.increment_sequence_number(); + let message_ident = TMessageIdentifier::new("submitBatches", TMessageType::Call, self.sequence_number()); + let call_args = CollectorSubmitBatchesArgs { batches: batches }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + } + )?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("submitBatches", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)) + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = CollectorSubmitBatchesResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } +} + +// +// Collector service processor +// + +pub trait CollectorSyncHandler { + fn handle_submit_batches(&self, batches: Vec) -> thrift::Result>; +} + +pub struct CollectorSyncProcessor { + handler: H, +} + +impl CollectorSyncProcessor { + pub fn new(handler: H) -> CollectorSyncProcessor { + CollectorSyncProcessor { + handler, + } + } + fn process_submit_batches(&self, incoming_sequence_number: i32, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + TCollectorProcessFunctions::process_submit_batches(&self.handler, incoming_sequence_number, i_prot, o_prot) + } +} + +pub struct TCollectorProcessFunctions; + +impl TCollectorProcessFunctions { + pub fn process_submit_batches(handler: &H, incoming_sequence_number: i32, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let args = CollectorSubmitBatchesArgs::read_from_in_protocol(i_prot)?; + match handler.handle_submit_batches(args.batches) { + Ok(handler_return) => { + let message_ident = TMessageIdentifier::new("submitBatches", TMessageType::Reply, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + let ret = CollectorSubmitBatchesResult { result_value: Some(handler_return) }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + }, + Err(e) => { + match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new("submitBatches", TMessageType::Exception, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + }, + _ => { + let ret_err = { + ApplicationError::new( + ApplicationErrorKind::Unknown, + e.description() + ) + }; + let message_ident = TMessageIdentifier::new("submitBatches", TMessageType::Exception, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + }, + } + }, + } + } +} + +impl TProcessor for CollectorSyncProcessor { + fn process(&self, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let message_ident = i_prot.read_message_begin()?; + let res = match &*message_ident.name { + "submitBatches" => { + self.process_submit_batches(message_ident.sequence_number, i_prot, o_prot) + }, + method => { + Err( + thrift::Error::Application( + ApplicationError::new( + ApplicationErrorKind::UnknownMethod, + format!("unknown method {}", method) + ) + ) + ) + }, + }; + thrift::server::handle_process_result(&message_ident, res, o_prot) + } +} + +// +// CollectorSubmitBatchesArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CollectorSubmitBatchesArgs { + batches: Vec, +} + +impl CollectorSubmitBatchesArgs { + fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_6 = Batch::read_from_in_protocol(i_prot)?; + val.push(list_elem_6); + } + i_prot.read_list_end()?; + f_1 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("CollectorSubmitBatchesArgs.batches", &f_1)?; + let ret = CollectorSubmitBatchesArgs { + batches: f_1.expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("submitBatches_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("batches", TType::List, 1))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, self.batches.len() as i32))?; + for e in &self.batches { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// CollectorSubmitBatchesResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct CollectorSubmitBatchesResult { + result_value: Option>, +} + +impl CollectorSubmitBatchesResult { + fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_7 = BatchSubmitResponse::read_from_in_protocol(i_prot)?; + val.push(list_elem_7); + } + i_prot.read_list_end()?; + f_0 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = CollectorSubmitBatchesResult { + result_value: f_0, + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("CollectorSubmitBatchesResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(ref fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::List, 0))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, fld_var.len() as i32))?; + for e in fld_var { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result> { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err( + thrift::Error::Application( + ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for CollectorSubmitBatches" + ) + ) + ) + } + } +} + diff --git a/trace_exporters/src/thrift/zipkincore.rs b/trace_exporters/src/thrift/zipkincore.rs new file mode 100644 index 0000000000..6a1117e924 --- /dev/null +++ b/trace_exporters/src/thrift/zipkincore.rs @@ -0,0 +1,1091 @@ +// Autogenerated by Thrift Compiler (0.13.0) +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +#![allow(unused_imports)] +#![allow(unused_extern_crates)] +#![cfg_attr(rustfmt, rustfmt_skip)] + +extern crate thrift; + +use thrift::OrderedFloat; +use std::cell::RefCell; +use std::collections::{BTreeMap, BTreeSet}; +use std::convert::{From, TryFrom}; +use std::default::Default; +use std::error::Error; +use std::fmt; +use std::fmt::{Display, Formatter}; +use std::rc::Rc; + +use thrift::{ApplicationError, ApplicationErrorKind, ProtocolError, ProtocolErrorKind, TThriftClient}; +use thrift::protocol::{TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType, TInputProtocol, TOutputProtocol, TSetIdentifier, TStructIdentifier, TType}; +use thrift::protocol::field_id; +use thrift::protocol::verify_expected_message_type; +use thrift::protocol::verify_expected_sequence_number; +use thrift::protocol::verify_expected_service_call; +use thrift::protocol::verify_required_field_exists; +use thrift::server::TProcessor; + +#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub enum AnnotationType { + Bool = 0, + Bytes = 1, + I16 = 2, + I32 = 3, + I64 = 4, + Double = 5, + String = 6, +} + +impl AnnotationType { + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + o_prot.write_i32(*self as i32) + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + let enum_value = i_prot.read_i32()?; + AnnotationType::try_from(enum_value) } +} + +impl TryFrom for AnnotationType { + type Error = thrift::Error; fn try_from(i: i32) -> Result { + match i { + 0 => Ok(AnnotationType::Bool), + 1 => Ok(AnnotationType::Bytes), + 2 => Ok(AnnotationType::I16), + 3 => Ok(AnnotationType::I32), + 4 => Ok(AnnotationType::I64), + 5 => Ok(AnnotationType::Double), + 6 => Ok(AnnotationType::String), + _ => { + Err( + thrift::Error::Protocol( + ProtocolError::new( + ProtocolErrorKind::InvalidData, + format!("cannot convert enum constant {} to AnnotationType", i) + ) + ) + ) + }, + } + } +} + +// +// Endpoint +// + +/// Indicates the network context of a service recording an annotation with two +/// exceptions. +/// +/// When a BinaryAnnotation, and key is CLIENT_ADDR or SERVER_ADDR, +/// the endpoint indicates the source or destination of an RPC. This exception +/// allows zipkin to display network context of uninstrumented services, or +/// clients such as web browsers. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Endpoint { + /// IPv4 host address packed into 4 bytes. + /// + /// Ex for the ip 1.2.3.4, it would be (1 << 24) | (2 << 16) | (3 << 8) | 4 + pub ipv4: Option, + /// IPv4 port + /// + /// Note: this is to be treated as an unsigned integer, so watch for negatives. + /// + /// Conventionally, when the port isn't known, port = 0. + pub port: Option, + /// Service name in lowercase, such as "memcache" or "zipkin-web" + /// + /// Conventionally, when the service name isn't known, service_name = "unknown". + pub service_name: Option, + /// IPv6 host address packed into 16 bytes. Ex Inet6Address.getBytes() + pub ipv6: Option>, +} + +impl Endpoint { + pub fn new(ipv4: F1, port: F2, service_name: F3, ipv6: F4) -> Endpoint where F1: Into>, F2: Into>, F3: Into>, F4: Into>> { + Endpoint { + ipv4: ipv4.into(), + port: port.into(), + service_name: service_name.into(), + ipv6: ipv6.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = Some(0); + let mut f_2: Option = Some(0); + let mut f_3: Option = Some("".to_owned()); + let mut f_4: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_i32()?; + f_1 = Some(val); + }, + 2 => { + let val = i_prot.read_i16()?; + f_2 = Some(val); + }, + 3 => { + let val = i_prot.read_string()?; + f_3 = Some(val); + }, + 4 => { + let val = i_prot.read_bytes()?; + f_4 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = Endpoint { + ipv4: f_1, + port: f_2, + service_name: f_3, + ipv6: f_4, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("Endpoint"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.ipv4 { + o_prot.write_field_begin(&TFieldIdentifier::new("ipv4", TType::I32, 1))?; + o_prot.write_i32(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.port { + o_prot.write_field_begin(&TFieldIdentifier::new("port", TType::I16, 2))?; + o_prot.write_i16(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.service_name { + o_prot.write_field_begin(&TFieldIdentifier::new("service_name", TType::String, 3))?; + o_prot.write_string(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.ipv6 { + o_prot.write_field_begin(&TFieldIdentifier::new("ipv6", TType::String, 4))?; + o_prot.write_bytes(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +impl Default for Endpoint { + fn default() -> Self { + Endpoint{ + ipv4: Some(0), + port: Some(0), + service_name: Some("".to_owned()), + ipv6: Some(Vec::new()), + } + } +} + +// +// Annotation +// + +/// An annotation is similar to a log statement. It includes a host field which +/// allows these events to be attributed properly, and also aggregatable. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Annotation { + /// Microseconds from epoch. + /// + /// This value should use the most precise value possible. For example, + /// gettimeofday or syncing nanoTime against a tick of currentTimeMillis. + pub timestamp: Option, + pub value: Option, + /// Always the host that recorded the event. By specifying the host you allow + /// rollup of all events (such as client requests to a service) by IP address. + pub host: Option, +} + +impl Annotation { + pub fn new(timestamp: F1, value: F2, host: F3) -> Annotation where F1: Into>, F2: Into>, F3: Into> { + Annotation { + timestamp: timestamp.into(), + value: value.into(), + host: host.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = Some(0); + let mut f_2: Option = Some("".to_owned()); + let mut f_3: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_i64()?; + f_1 = Some(val); + }, + 2 => { + let val = i_prot.read_string()?; + f_2 = Some(val); + }, + 3 => { + let val = Endpoint::read_from_in_protocol(i_prot)?; + f_3 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = Annotation { + timestamp: f_1, + value: f_2, + host: f_3, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("Annotation"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.timestamp { + o_prot.write_field_begin(&TFieldIdentifier::new("timestamp", TType::I64, 1))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.value { + o_prot.write_field_begin(&TFieldIdentifier::new("value", TType::String, 2))?; + o_prot.write_string(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.host { + o_prot.write_field_begin(&TFieldIdentifier::new("host", TType::Struct, 3))?; + fld_var.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +impl Default for Annotation { + fn default() -> Self { + Annotation{ + timestamp: Some(0), + value: Some("".to_owned()), + host: None, + } + } +} + +// +// BinaryAnnotation +// + +/// Binary annotations are tags applied to a Span to give it context. For +/// example, a binary annotation of "http.uri" could the path to a resource in a +/// RPC call. +/// +/// Binary annotations of type STRING are always queryable, though more a +/// historical implementation detail than a structural concern. +/// +/// Binary annotations can repeat, and vary on the host. Similar to Annotation, +/// the host indicates who logged the event. This allows you to tell the +/// difference between the client and server side of the same key. For example, +/// the key "http.uri" might be different on the client and server side due to +/// rewriting, like "/api/v1/myresource" vs "/myresource. Via the host field, +/// you can see the different points of view, which often help in debugging. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct BinaryAnnotation { + pub key: Option, + pub value: Option>, + pub annotation_type: Option, + /// The host that recorded tag, which allows you to differentiate between + /// multiple tags with the same key. There are two exceptions to this. + /// + /// When the key is CLIENT_ADDR or SERVER_ADDR, host indicates the source or + /// destination of an RPC. This exception allows zipkin to display network + /// context of uninstrumented services, or clients such as web browsers. + pub host: Option, +} + +impl BinaryAnnotation { + pub fn new(key: F1, value: F2, annotation_type: F3, host: F4) -> BinaryAnnotation where F1: Into>, F2: Into>>, F3: Into>, F4: Into> { + BinaryAnnotation { + key: key.into(), + value: value.into(), + annotation_type: annotation_type.into(), + host: host.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = Some("".to_owned()); + let mut f_2: Option> = Some(Vec::new()); + let mut f_3: Option = None; + let mut f_4: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_string()?; + f_1 = Some(val); + }, + 2 => { + let val = i_prot.read_bytes()?; + f_2 = Some(val); + }, + 3 => { + let val = AnnotationType::read_from_in_protocol(i_prot)?; + f_3 = Some(val); + }, + 4 => { + let val = Endpoint::read_from_in_protocol(i_prot)?; + f_4 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = BinaryAnnotation { + key: f_1, + value: f_2, + annotation_type: f_3, + host: f_4, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("BinaryAnnotation"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(ref fld_var) = self.key { + o_prot.write_field_begin(&TFieldIdentifier::new("key", TType::String, 1))?; + o_prot.write_string(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.value { + o_prot.write_field_begin(&TFieldIdentifier::new("value", TType::String, 2))?; + o_prot.write_bytes(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.annotation_type { + o_prot.write_field_begin(&TFieldIdentifier::new("annotation_type", TType::I32, 3))?; + fld_var.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.host { + o_prot.write_field_begin(&TFieldIdentifier::new("host", TType::Struct, 4))?; + fld_var.write_to_out_protocol(o_prot)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +impl Default for BinaryAnnotation { + fn default() -> Self { + BinaryAnnotation{ + key: Some("".to_owned()), + value: Some(Vec::new()), + annotation_type: None, + host: None, + } + } +} + +// +// Span +// + +/// A trace is a series of spans (often RPC calls) which form a latency tree. +/// +/// The root span is where trace_id = id and parent_id = Nil. The root span is +/// usually the longest interval in the trace, starting with a SERVER_RECV +/// annotation and ending with a SERVER_SEND. +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Span { + pub trace_id: Option, + /// Span name in lowercase, rpc method for example + /// + /// Conventionally, when the span name isn't known, name = "unknown". + pub name: Option, + pub id: Option, + pub parent_id: Option, + pub annotations: Option>, + pub binary_annotations: Option>, + pub debug: Option, + /// Microseconds from epoch of the creation of this span. + /// + /// This value should be set directly by instrumentation, using the most + /// precise value possible. For example, gettimeofday or syncing nanoTime + /// against a tick of currentTimeMillis. + /// + /// For compatibility with instrumentation that precede this field, collectors + /// or span stores can derive this via Annotation.timestamp. + /// For example, SERVER_RECV.timestamp or CLIENT_SEND.timestamp. + /// + /// This field is optional for compatibility with old data: first-party span + /// stores are expected to support this at time of introduction. + pub timestamp: Option, + /// Measurement of duration in microseconds, used to support queries. + /// + /// This value should be set directly, where possible. Doing so encourages + /// precise measurement decoupled from problems of clocks, such as skew or NTP + /// updates causing time to move backwards. + /// + /// For compatibility with instrumentation that precede this field, collectors + /// or span stores can derive this by subtracting Annotation.timestamp. + /// For example, SERVER_SEND.timestamp - SERVER_RECV.timestamp. + /// + /// If this field is persisted as unset, zipkin will continue to work, except + /// duration query support will be implementation-specific. Similarly, setting + /// this field non-atomically is implementation-specific. + /// + /// This field is i64 vs i32 to support spans longer than 35 minutes. + pub duration: Option, + /// Optional unique 8-byte additional identifier for a trace. If non zero, this + /// means the trace uses 128 bit traceIds instead of 64 bit. + pub trace_id_high: Option, +} + +impl Span { + pub fn new(trace_id: F1, name: F3, id: F4, parent_id: F5, annotations: F6, binary_annotations: F8, debug: F9, timestamp: F10, duration: F11, trace_id_high: F12) -> Span where F1: Into>, F3: Into>, F4: Into>, F5: Into>, F6: Into>>, F8: Into>>, F9: Into>, F10: Into>, F11: Into>, F12: Into> { + Span { + trace_id: trace_id.into(), + name: name.into(), + id: id.into(), + parent_id: parent_id.into(), + annotations: annotations.into(), + binary_annotations: binary_annotations.into(), + debug: debug.into(), + timestamp: timestamp.into(), + duration: duration.into(), + trace_id_high: trace_id_high.into(), + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = Some(0); + let mut f_3: Option = Some("".to_owned()); + let mut f_4: Option = Some(0); + let mut f_5: Option = None; + let mut f_6: Option> = Some(Vec::new()); + let mut f_8: Option> = Some(Vec::new()); + let mut f_9: Option = None; + let mut f_10: Option = None; + let mut f_11: Option = None; + let mut f_12: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_i64()?; + f_1 = Some(val); + }, + 3 => { + let val = i_prot.read_string()?; + f_3 = Some(val); + }, + 4 => { + let val = i_prot.read_i64()?; + f_4 = Some(val); + }, + 5 => { + let val = i_prot.read_i64()?; + f_5 = Some(val); + }, + 6 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_0 = Annotation::read_from_in_protocol(i_prot)?; + val.push(list_elem_0); + } + i_prot.read_list_end()?; + f_6 = Some(val); + }, + 8 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_1 = BinaryAnnotation::read_from_in_protocol(i_prot)?; + val.push(list_elem_1); + } + i_prot.read_list_end()?; + f_8 = Some(val); + }, + 9 => { + let val = i_prot.read_bool()?; + f_9 = Some(val); + }, + 10 => { + let val = i_prot.read_i64()?; + f_10 = Some(val); + }, + 11 => { + let val = i_prot.read_i64()?; + f_11 = Some(val); + }, + 12 => { + let val = i_prot.read_i64()?; + f_12 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = Span { + trace_id: f_1, + name: f_3, + id: f_4, + parent_id: f_5, + annotations: f_6, + binary_annotations: f_8, + debug: f_9, + timestamp: f_10, + duration: f_11, + trace_id_high: f_12, + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("Span"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(fld_var) = self.trace_id { + o_prot.write_field_begin(&TFieldIdentifier::new("trace_id", TType::I64, 1))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.name { + o_prot.write_field_begin(&TFieldIdentifier::new("name", TType::String, 3))?; + o_prot.write_string(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.id { + o_prot.write_field_begin(&TFieldIdentifier::new("id", TType::I64, 4))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.parent_id { + o_prot.write_field_begin(&TFieldIdentifier::new("parent_id", TType::I64, 5))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.annotations { + o_prot.write_field_begin(&TFieldIdentifier::new("annotations", TType::List, 6))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, fld_var.len() as i32))?; + for e in fld_var { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(ref fld_var) = self.binary_annotations { + o_prot.write_field_begin(&TFieldIdentifier::new("binary_annotations", TType::List, 8))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, fld_var.len() as i32))?; + for e in fld_var { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.debug { + o_prot.write_field_begin(&TFieldIdentifier::new("debug", TType::Bool, 9))?; + o_prot.write_bool(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.timestamp { + o_prot.write_field_begin(&TFieldIdentifier::new("timestamp", TType::I64, 10))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.duration { + o_prot.write_field_begin(&TFieldIdentifier::new("duration", TType::I64, 11))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + if let Some(fld_var) = self.trace_id_high { + o_prot.write_field_begin(&TFieldIdentifier::new("trace_id_high", TType::I64, 12))?; + o_prot.write_i64(fld_var)?; + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +impl Default for Span { + fn default() -> Self { + Span{ + trace_id: Some(0), + name: Some("".to_owned()), + id: Some(0), + parent_id: Some(0), + annotations: Some(Vec::new()), + binary_annotations: Some(Vec::new()), + debug: Some(false), + timestamp: Some(0), + duration: Some(0), + trace_id_high: Some(0), + } + } +} + +// +// Response +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +pub struct Response { + pub ok: bool, +} + +impl Response { + pub fn new(ok: bool) -> Response { + Response { + ok: ok, + } + } + pub fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let val = i_prot.read_bool()?; + f_1 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("Response.ok", &f_1)?; + let ret = Response { + ok: f_1.expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + pub fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("Response"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("ok", TType::Bool, 1))?; + o_prot.write_bool(self.ok)?; + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +pub const C_L_I_E_N_T_S_E_N_D: &str = "cs"; + +pub const C_L_I_E_N_T_R_E_C_V: &str = "cr"; + +pub const S_E_R_V_E_R_S_E_N_D: &str = "ss"; + +pub const S_E_R_V_E_R_R_E_C_V: &str = "sr"; + +pub const M_E_S_S_A_G_E_S_E_N_D: &str = "ms"; + +pub const M_E_S_S_A_G_E_R_E_C_V: &str = "mr"; + +pub const W_I_R_E_S_E_N_D: &str = "ws"; + +pub const W_I_R_E_R_E_C_V: &str = "wr"; + +pub const C_L_I_E_N_T_S_E_N_D_F_R_A_G_M_E_N_T: &str = "csf"; + +pub const C_L_I_E_N_T_R_E_C_V_F_R_A_G_M_E_N_T: &str = "crf"; + +pub const S_E_R_V_E_R_S_E_N_D_F_R_A_G_M_E_N_T: &str = "ssf"; + +pub const S_E_R_V_E_R_R_E_C_V_F_R_A_G_M_E_N_T: &str = "srf"; + +pub const L_O_C_A_L_C_O_M_P_O_N_E_N_T: &str = "lc"; + +pub const C_L_I_E_N_T_A_D_D_R: &str = "ca"; + +pub const S_E_R_V_E_R_A_D_D_R: &str = "sa"; + +pub const M_E_S_S_A_G_E_A_D_D_R: &str = "ma"; + +// +// ZipkinCollector service client +// + +pub trait TZipkinCollectorSyncClient { + fn submit_zipkin_batch(&mut self, spans: Vec) -> thrift::Result>; +} + +pub trait TZipkinCollectorSyncClientMarker {} + +pub struct ZipkinCollectorSyncClient where IP: TInputProtocol, OP: TOutputProtocol { + _i_prot: IP, + _o_prot: OP, + _sequence_number: i32, +} + +impl ZipkinCollectorSyncClient where IP: TInputProtocol, OP: TOutputProtocol { + pub fn new(input_protocol: IP, output_protocol: OP) -> ZipkinCollectorSyncClient { + ZipkinCollectorSyncClient { _i_prot: input_protocol, _o_prot: output_protocol, _sequence_number: 0 } + } +} + +impl TThriftClient for ZipkinCollectorSyncClient where IP: TInputProtocol, OP: TOutputProtocol { + fn i_prot_mut(&mut self) -> &mut dyn TInputProtocol { &mut self._i_prot } + fn o_prot_mut(&mut self) -> &mut dyn TOutputProtocol { &mut self._o_prot } + fn sequence_number(&self) -> i32 { self._sequence_number } + fn increment_sequence_number(&mut self) -> i32 { self._sequence_number += 1; self._sequence_number } +} + +impl TZipkinCollectorSyncClientMarker for ZipkinCollectorSyncClient where IP: TInputProtocol, OP: TOutputProtocol {} + +impl TZipkinCollectorSyncClient for C { + fn submit_zipkin_batch(&mut self, spans: Vec) -> thrift::Result> { + ( + { + self.increment_sequence_number(); + let message_ident = TMessageIdentifier::new("submitZipkinBatch", TMessageType::Call, self.sequence_number()); + let call_args = ZipkinCollectorSubmitZipkinBatchArgs { spans: spans }; + self.o_prot_mut().write_message_begin(&message_ident)?; + call_args.write_to_out_protocol(self.o_prot_mut())?; + self.o_prot_mut().write_message_end()?; + self.o_prot_mut().flush() + } + )?; + { + let message_ident = self.i_prot_mut().read_message_begin()?; + verify_expected_sequence_number(self.sequence_number(), message_ident.sequence_number)?; + verify_expected_service_call("submitZipkinBatch", &message_ident.name)?; + if message_ident.message_type == TMessageType::Exception { + let remote_error = thrift::Error::read_application_error_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + return Err(thrift::Error::Application(remote_error)) + } + verify_expected_message_type(TMessageType::Reply, message_ident.message_type)?; + let result = ZipkinCollectorSubmitZipkinBatchResult::read_from_in_protocol(self.i_prot_mut())?; + self.i_prot_mut().read_message_end()?; + result.ok_or() + } + } +} + +// +// ZipkinCollector service processor +// + +pub trait ZipkinCollectorSyncHandler { + fn handle_submit_zipkin_batch(&self, spans: Vec) -> thrift::Result>; +} + +pub struct ZipkinCollectorSyncProcessor { + handler: H, +} + +impl ZipkinCollectorSyncProcessor { + pub fn new(handler: H) -> ZipkinCollectorSyncProcessor { + ZipkinCollectorSyncProcessor { + handler, + } + } + fn process_submit_zipkin_batch(&self, incoming_sequence_number: i32, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + TZipkinCollectorProcessFunctions::process_submit_zipkin_batch(&self.handler, incoming_sequence_number, i_prot, o_prot) + } +} + +pub struct TZipkinCollectorProcessFunctions; + +impl TZipkinCollectorProcessFunctions { + pub fn process_submit_zipkin_batch(handler: &H, incoming_sequence_number: i32, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let args = ZipkinCollectorSubmitZipkinBatchArgs::read_from_in_protocol(i_prot)?; + match handler.handle_submit_zipkin_batch(args.spans) { + Ok(handler_return) => { + let message_ident = TMessageIdentifier::new("submitZipkinBatch", TMessageType::Reply, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + let ret = ZipkinCollectorSubmitZipkinBatchResult { result_value: Some(handler_return) }; + ret.write_to_out_protocol(o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + }, + Err(e) => { + match e { + thrift::Error::Application(app_err) => { + let message_ident = TMessageIdentifier::new("submitZipkinBatch", TMessageType::Exception, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&app_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + }, + _ => { + let ret_err = { + ApplicationError::new( + ApplicationErrorKind::Unknown, + e.description() + ) + }; + let message_ident = TMessageIdentifier::new("submitZipkinBatch", TMessageType::Exception, incoming_sequence_number); + o_prot.write_message_begin(&message_ident)?; + thrift::Error::write_application_error_to_out_protocol(&ret_err, o_prot)?; + o_prot.write_message_end()?; + o_prot.flush() + }, + } + }, + } + } +} + +impl TProcessor for ZipkinCollectorSyncProcessor { + fn process(&self, i_prot: &mut dyn TInputProtocol, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let message_ident = i_prot.read_message_begin()?; + let res = match &*message_ident.name { + "submitZipkinBatch" => { + self.process_submit_zipkin_batch(message_ident.sequence_number, i_prot, o_prot) + }, + method => { + Err( + thrift::Error::Application( + ApplicationError::new( + ApplicationErrorKind::UnknownMethod, + format!("unknown method {}", method) + ) + ) + ) + }, + }; + thrift::server::handle_process_result(&message_ident, res, o_prot) + } +} + +// +// ZipkinCollectorSubmitZipkinBatchArgs +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct ZipkinCollectorSubmitZipkinBatchArgs { + spans: Vec, +} + +impl ZipkinCollectorSubmitZipkinBatchArgs { + fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_1: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 1 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_2 = Span::read_from_in_protocol(i_prot)?; + val.push(list_elem_2); + } + i_prot.read_list_end()?; + f_1 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + verify_required_field_exists("ZipkinCollectorSubmitZipkinBatchArgs.spans", &f_1)?; + let ret = ZipkinCollectorSubmitZipkinBatchArgs { + spans: f_1.expect("auto-generated code should have checked for presence of required fields"), + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("submitZipkinBatch_args"); + o_prot.write_struct_begin(&struct_ident)?; + o_prot.write_field_begin(&TFieldIdentifier::new("spans", TType::List, 1))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, self.spans.len() as i32))?; + for e in &self.spans { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } +} + +// +// ZipkinCollectorSubmitZipkinBatchResult +// + +#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] +struct ZipkinCollectorSubmitZipkinBatchResult { + result_value: Option>, +} + +impl ZipkinCollectorSubmitZipkinBatchResult { + fn read_from_in_protocol(i_prot: &mut dyn TInputProtocol) -> thrift::Result { + i_prot.read_struct_begin()?; + let mut f_0: Option> = None; + loop { + let field_ident = i_prot.read_field_begin()?; + if field_ident.field_type == TType::Stop { + break; + } + let field_id = field_id(&field_ident)?; + match field_id { + 0 => { + let list_ident = i_prot.read_list_begin()?; + let mut val: Vec = Vec::with_capacity(list_ident.size as usize); + for _ in 0..list_ident.size { + let list_elem_3 = Response::read_from_in_protocol(i_prot)?; + val.push(list_elem_3); + } + i_prot.read_list_end()?; + f_0 = Some(val); + }, + _ => { + i_prot.skip(field_ident.field_type)?; + }, + }; + i_prot.read_field_end()?; + } + i_prot.read_struct_end()?; + let ret = ZipkinCollectorSubmitZipkinBatchResult { + result_value: f_0, + }; + Ok(ret) + } + fn write_to_out_protocol(&self, o_prot: &mut dyn TOutputProtocol) -> thrift::Result<()> { + let struct_ident = TStructIdentifier::new("ZipkinCollectorSubmitZipkinBatchResult"); + o_prot.write_struct_begin(&struct_ident)?; + if let Some(ref fld_var) = self.result_value { + o_prot.write_field_begin(&TFieldIdentifier::new("result_value", TType::List, 0))?; + o_prot.write_list_begin(&TListIdentifier::new(TType::Struct, fld_var.len() as i32))?; + for e in fld_var { + e.write_to_out_protocol(o_prot)?; + o_prot.write_list_end()?; + } + o_prot.write_field_end()?; + () + } else { + () + } + o_prot.write_field_stop()?; + o_prot.write_struct_end() + } + fn ok_or(self) -> thrift::Result> { + if self.result_value.is_some() { + Ok(self.result_value.unwrap()) + } else { + Err( + thrift::Error::Application( + ApplicationError::new( + ApplicationErrorKind::MissingResult, + "no result received for ZipkinCollectorSubmitZipkinBatch" + ) + ) + ) + } + } +} +