From 9d7389dec26e4bb0ba6887642b17ddf618d29feb Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 9 Dec 2020 23:01:40 +0000 Subject: [PATCH] feat(tracing): add Jaeger tracing sink Adds telemetry / tracing with support for a Jaeger backend, and changes the logger from env_logger to a tracing subscriber to collect the log entries. Events are batched and then emitted asynchronously via UDP to the Jaeger collector using the tokio runtime. There's a bunch of settings (env vars) related to batch sizes and flush frequency etc - they're all using their default values at the moment (if it ain't broke...) See the docs for more info: https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#opentelemetry-environment-variable-specification This is only part 1 of telemetry - it does NOT propagate traces across RPC boundaries as we're still defining how all this should work. I've created #541 to track this. Closes #202 and closes #203. --- Cargo.lock | 145 +++++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 6 +++ src/main.rs | 75 ++++++++++++++++++++++----- 3 files changed, 213 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index af049d28a8..5dcec6f635 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -55,6 +55,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "anyhow" version = "1.0.34" @@ -384,7 +393,7 @@ version = "2.33.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" dependencies = [ - "ansi_term", + "ansi_term 0.11.0", "atty", "bitflags", "strsim", @@ -1080,6 +1089,19 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "generator" +version = 
"0.6.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cdc09201b2e8ca1b19290cf7e65de2246b8e91fb6874279722189c4de7b94dc" +dependencies = [ + "cc", + "libc", + "log", + "rustc_version", + "winapi 0.3.9", +] + [[package]] name = "generic-array" version = "0.12.3" @@ -1333,6 +1355,8 @@ dependencies = [ "ingest", "mem_qe", "object_store", + "opentelemetry", + "opentelemetry-jaeger", "packers", "predicates", "prost", @@ -1352,6 +1376,8 @@ dependencies = [ "tonic", "tracing", "tracing-futures", + "tracing-opentelemetry", + "tracing-subscriber", "wal", "write_buffer", ] @@ -1547,6 +1573,19 @@ dependencies = [ "cfg-if 0.1.10", ] +[[package]] +name = "loom" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed" +dependencies = [ + "cfg-if 0.1.10", + "generator", + "scoped-tls", + "serde", + "serde_json", +] + [[package]] name = "lz4" version = "1.23.2" @@ -1567,6 +1606,15 @@ dependencies = [ "libc", ] +[[package]] +name = "matchers" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.8" @@ -1963,6 +2011,34 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e260f0ef133e1043b95143f83cfc8f6be023cf4b46a526da0212039b45392679" +dependencies = [ + "async-trait", + "futures", + "lazy_static", + "percent-encoding", + "pin-project 0.4.27", + "rand", + "regex", + "tokio", +] + +[[package]] +name = "opentelemetry-jaeger" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1434625fc67118fa79088f260051cf9149ca0d8737aefb7e4b5c6a861c765ca" +dependencies = [ + "async-trait", + "opentelemetry", + 
"thrift", + "tokio", +] + [[package]] name = "ordered-float" version = "1.1.0" @@ -2439,6 +2515,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" dependencies = [ "byteorder", + "regex-syntax", ] [[package]] @@ -2838,6 +2915,16 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sharded-slab" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4921be914e16899a80adefb821f8ddb7974e3f1250223575a44ed994882127" +dependencies = [ + "lazy_static", + "loom", +] + [[package]] name = "shlex" version = "0.1.1" @@ -3510,6 +3597,62 @@ dependencies = [ "tracing", ] +[[package]] +name = "tracing-log" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08457d22f95dc05ff929afcdc3d57f3a2437f9377765bb6e20ffeb67d72b1719" +dependencies = [ + "opentelemetry", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", +] + +[[package]] +name = "tracing-serde" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1fa8f0c8f4c594e4fc9debc1990deab13238077271ba84dd853d54902ee3401" +dependencies = [ + "ansi_term 0.12.1", + "chrono", + "lazy_static", + "matchers", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", + "tracing-serde", 
+] + [[package]] name = "treeline" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index b32aed06d5..b2ba61e475 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,12 @@ prost-types = "0.6.1" tracing = "0.1" tracing-futures="0.2.4" +# OpenTelemetry sinks for tracing +tracing-subscriber = "0.2.15" +tracing-opentelemetry = "0.9.0" +opentelemetry = { version = "0.10", default-features = false, features = ["trace", "tokio"] } +opentelemetry-jaeger = { version = "0.9", features = ["tokio"] } + http = "0.2.0" snafu = "0.6.9" flate2 = "1.0" diff --git a/src/main.rs b/src/main.rs index 60d63c6e0a..008c5e2c88 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,6 +24,7 @@ mod commands { } use panic::SendPanicsToTracing; +use tracing_subscriber::{prelude::*, EnvFilter}; enum ReturnCode { ConversionFailed = 1, @@ -129,7 +130,15 @@ Examples: )) .get_matches(); - setup_logging(matches.occurrences_of("verbose")); + let mut tokio_runtime = get_runtime(matches.value_of("num-threads"))?; + tokio_runtime.block_on(dispatch_args(matches)); + + info!("InfluxDB IOx server shutting down"); + Ok(()) +} + +async fn dispatch_args(matches: ArgMatches<'_>) { + let _drop_handle = setup_logging(matches.occurrences_of("verbose")); // Install custom panic handler and forget about it. // @@ -140,14 +149,6 @@ Examples: let f = SendPanicsToTracing::new(); std::mem::forget(f); - let mut tokio_runtime = get_runtime(matches.value_of("num-threads"))?; - tokio_runtime.block_on(dispatch_args(matches)); - - info!("InfluxDB IOx server shutting down"); - Ok(()) -} - -async fn dispatch_args(matches: ArgMatches<'_>) { match matches.subcommand() { ("convert", Some(sub_matches)) => { let input_path = sub_matches.value_of("INPUT").unwrap(); @@ -216,7 +217,7 @@ const DEFAULT_LOG_LEVEL: &str = "warn"; /// 2. if `-vv` (multiple instances of verbose), use DEFAULT_DEBUG_LOG_LEVEL /// 2. if `-v` (single instances of verbose), use DEFAULT_VERBOSE_LOG_LEVEL /// 3. 
Otherwise use DEFAULT_LOG_LEVEL -fn setup_logging(num_verbose: u64) { +fn setup_logging(num_verbose: u64) -> Option { let rust_log_env = std::env::var("RUST_LOG"); match rust_log_env { @@ -235,7 +236,52 @@ fn setup_logging(num_verbose: u64) { }, } - env_logger::init(); + // Configure the OpenTelemetry tracer, if requested. + // + // To enable the tracing functionality, set OTEL_EXPORTER_JAEGER_AGENT_HOST + // env to some suitable value (see below). + // + // The Jaeger layer emits traces under the service name of "iox" unless + // overridden by the user by setting the env below. All configuration is + // sourced from the environment: + // + // - OTEL_SERVICE_NAME: emitter service name (iox by default) + // - OTEL_EXPORTER_JAEGER_AGENT_HOST: hostname/address of the collector + // - OTEL_EXPORTER_JAEGER_AGENT_PORT: listening port of the collector + // + let (opentelemetry, drop_handle) = match std::env::var("OTEL_EXPORTER_JAEGER_AGENT_HOST") { + Ok(_) => { + // Initialise the jaeger event emitter + let (tracer, drop_handle) = opentelemetry_jaeger::new_pipeline() + .with_service_name("iox") + .from_env() + .install() + .unwrap(); + + // Initialise the opentelemetry tracing layer, giving it the jaeger emitter + let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + (Some(opentelemetry), Some(drop_handle)) + } + Err(_) => (None, None), + }; + + // Configure the logger to write to stderr + let logger = tracing_subscriber::fmt::layer().with_writer(std::io::stderr); + + // Register the chain of event subscribers: + // + // - Jaeger tracing emitter + // - Env filter (using RUST_LOG as the filter env) + // - A stderr logger + // + tracing_subscriber::registry() + .with(opentelemetry) + .with(EnvFilter::from_default_env()) + .with(logger) + .init(); + + drop_handle } /// Creates the tokio runtime for executing IOx @@ -243,12 +289,17 @@ fn setup_logging(num_verbose: u64) { /// if nthreads is none, uses the default scheduler /// otherwise, creates a 
scheduler with the number of threads fn get_runtime(num_threads: Option<&str>) -> Result { + // NOTE: no log macros will work here! + // + // That means use eprintln!() instead of error!() and so on. The log emitter + // requires a running tokio runtime and is initialised after this function. + use tokio::runtime::Builder; let kind = std::io::ErrorKind::Other; match num_threads { None => Runtime::new(), Some(num_threads) => { - info!( + println!( "Setting number of threads to '{}' per command line request", num_threads );