diff --git a/.circleci/config.yml b/.circleci/config.yml index 752bce57b8..4ca737bbc3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -214,7 +214,7 @@ jobs: command: cargo test --workspace --benches --no-run - run: name: Build with object store + exporter support + HEAP profiling - command: cargo build --no-default-features --features="aws,gcp,azure,jaeger,otlp,heappy,pprof" + command: cargo build --no-default-features --features="aws,gcp,azure,jaeger,heappy,pprof" - cache_save # Lint protobufs. @@ -273,10 +273,10 @@ jobs: - cache_restore - run: name: Print rustc target CPU options - command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger,otlp,heappy" --bin print_cpu + command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger,heappy" --bin print_cpu - run: name: Cargo release build with target arch set for CRoaring - command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger,otlp,heappy" + command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger,heappy" - run: | echo sha256sum after build is sha256sum target/release/influxdb_iox diff --git a/Cargo.lock b/Cargo.lock index ec3a32fe11..27f7294f77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -160,11 +160,11 @@ dependencies = [ "base64 0.13.0", "bytes", "proc-macro2", - "prost 0.8.0", - "prost-derive 0.8.0", + "prost", + "prost-derive", "tokio", - "tonic 0.5.2", - "tonic-build 0.5.2", + "tonic", + "tonic-build", ] [[package]] @@ -1327,15 +1327,15 @@ dependencies = [ "num_cpus", "observability_deps", "proc-macro2", - "prost 0.8.0", - "prost-build 0.8.0", - "prost-types 0.8.0", + "prost", + "prost-build", + "prost-types", "regex", "serde", "serde_json", "thiserror", - "tonic 0.5.2", - "tonic-build 0.5.2", + "tonic", + "tonic-build", ] [[package]] @@ -1397,8 +1397,8 @@ version = "0.1.0" dependencies = [ "bytes", "chrono", - "prost 0.8.0", - "prost-build 0.8.0", + "prost", + "prost-build", "serde", ] @@ -1412,15 +1412,15 @@ dependencies = [ "grpc-router-test-gen", "observability_deps", "paste", - "prost 0.8.0", - "prost-build 0.8.0", - "prost-types 0.8.0", + "prost", + "prost-build", + "prost-types", "thiserror", "tokio", "tokio-stream", "tokio-util", - "tonic 0.5.2", - "tonic-build 0.5.2", + "tonic", + "tonic-build", "tonic-reflection", ] @@ -1428,11 +1428,11 @@ dependencies = [ name = "grpc-router-test-gen" version = "0.1.0" dependencies = [ - "prost 0.8.0", - "prost-build 0.8.0", - "prost-types 0.8.0", - "tonic 0.5.2", - "tonic-build 0.5.2", + "prost", + "prost-build", + "prost-types", + "tonic", + "tonic-build", ] [[package]] @@ -1493,7 +1493,7 @@ dependencies = [ "lazy_static", "libc", "pprof", - "prost 0.8.0", + "prost", "spin 0.9.2", "thiserror", "tikv-jemalloc-sys", @@ -1748,7 +1748,7 @@ dependencies = [ "pprof", "predicates", "prettytable-rs", - "prost 0.8.0", + "prost", "query", "rand 0.8.4", "rdkafka", @@ -1771,7 +1771,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic 0.5.2", + "tonic", "tonic-health", "tonic-reflection", "trace", @@ -1793,13 +1793,13 @@ dependencies = [ "generated_types", "http", "hyper", - "prost 0.8.0", + "prost", "rand 0.8.4", "serde", "serde_json", "thiserror", "tokio", - "tonic 0.5.2", + "tonic", "tower", ] @@ -2613,8 +2613,6 @@ dependencies = [ "pin-project 1.0.8", "rand 0.8.4", "thiserror", - "tokio", - "tokio-stream", ] [[package]] @@ -2628,24 +2626,6 @@ dependencies = [ "opentelemetry", "thiserror", "thrift", - "tokio", -] - -[[package]] -name = "opentelemetry-otlp" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c42168ec1ee8fe85f36600c41196963eb075ccdd0aaac947119f1f562c0804dd" -dependencies = [ - "async-trait", - "futures", - "http", - "opentelemetry", - "prost 0.7.0", - "thiserror", - "tokio", - "tonic 0.4.3", - "tonic-build 0.4.2", ] [[package]] @@ -2802,7 +2782,7 @@ dependencies = [ "parquet", "parquet-format", "persistence_windows", - "prost 0.8.0", + "prost", "query", "snafu", "tempfile", @@ -3014,9 +2994,9 @@ dependencies = [ "log", "nix", "parking_lot", - "prost 0.8.0", - "prost-build 0.8.0", - "prost-derive 0.8.0", + "prost", + "prost-build", + "prost-derive", "symbolic-demangle", "tempfile", "thiserror", @@ -3142,16 +3122,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "prost" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e6984d2f1a23009bd270b8bb56d0926810a3d483f59c987d77969e9d8e840b2" -dependencies = [ - "bytes", - "prost-derive 0.7.0", -] - [[package]] name = "prost" version = "0.8.0" @@ -3159,25 +3129,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "de5e2533f59d08fcf364fd374ebda0692a70bd6d7e66ef97f306f45c6c5d8020" dependencies = [ "bytes", - "prost-derive 0.8.0", -] - -[[package]] -name = "prost-build" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32d3ebd75ac2679c2af3a92246639f9fcc8a442ee420719cc4fe195b98dd5fa3" -dependencies = [ - "bytes", - "heck", - "itertools 0.9.0", - "log", - "multimap", - "petgraph", - "prost 0.7.0", - "prost-types 0.7.0", - "tempfile", - "which 4.2.2", + "prost-derive", ] [[package]] @@ -3192,25 +3144,12 @@ dependencies = [ "log", "multimap", "petgraph", - "prost 0.8.0", - "prost-types 0.8.0", + "prost", + "prost-types", "tempfile", "which 4.2.2", ] -[[package]] -name = "prost-derive" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4" -dependencies = [ - "anyhow", - "itertools 0.9.0", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "prost-derive" version = "0.8.0" @@ -3224,16 +3163,6 @@ dependencies = [ "syn", ] -[[package]] -name = "prost-types" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b518d7cdd93dab1d1122cf07fa9a60771836c668dde9d9e2a139f957f0d9f1bb" -dependencies = [ - "bytes", - "prost 0.7.0", -] - [[package]] name = "prost-types" version = "0.8.0" @@ -3241,7 +3170,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "603bbd6394701d13f3f25aada59c7de9d35a6a5887cfc156181234a44002771b" dependencies = [ "bytes", - "prost 0.8.0", + "prost", ] [[package]] @@ -4704,35 +4633,6 @@ dependencies = [ "serde", ] -[[package]] -name = "tonic" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ac42cd97ac6bd2339af5bcabf105540e21e45636ec6fa6aae5e85d44db31be0" -dependencies = [ - "async-stream", - "async-trait", - "base64 0.13.0", - "bytes", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", - "percent-encoding", - "pin-project 1.0.8", - "prost 0.7.0", - "prost-derive 0.7.0", - "tokio", - "tokio-stream", - "tokio-util", - "tower", - "tower-service", - "tracing", - "tracing-futures", -] - [[package]] name = "tonic" version = "0.5.2" @@ -4752,8 +4652,8 @@ dependencies = [ "hyper-timeout", "percent-encoding", "pin-project 1.0.8", - "prost 0.8.0", - "prost-derive 0.8.0", + "prost", + "prost-derive", "tokio", "tokio-stream", "tokio-util", @@ -4764,18 +4664,6 @@ dependencies = [ "tracing-futures", ] -[[package]] -name = "tonic-build" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c695de27302f4697191dda1c7178131a8cb805463dda02864acb80fe1322fdcf" -dependencies = [ - "proc-macro2", - "prost-build 0.7.0", - "quote", - "syn", -] - [[package]] name = "tonic-build" version = "0.5.2" @@ -4783,7 +4671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12b52d07035516c2b74337d2ac7746075e7dcae7643816c1b12c5ff8a7484c08" dependencies = [ "proc-macro2", - "prost-build 0.8.0", + "prost-build", "quote", "syn", ] @@ -4796,11 +4684,11 @@ checksum = "493fcae35818dffa28437b210a615119d791116c1cac80716f571f35dd55b1b9" dependencies = [ "async-stream", "bytes", - "prost 0.8.0", + "prost", "tokio", "tokio-stream", - "tonic 0.5.2", - "tonic-build 0.5.2", + "tonic", + "tonic-build", ] [[package]] @@ -4810,12 +4698,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8249761575cfef2635c3df0aa9f09d8e42d53ff0f7c21739ff2b37dd3343d827" dependencies = [ "bytes", - "prost 0.8.0", - "prost-types 0.8.0", + "prost", + "prost-types", "tokio", "tokio-stream", - "tonic 0.5.2", - "tonic-build 0.5.2", + "tonic", + "tonic-build", ] [[package]] @@ -4940,18 +4828,6 @@ dependencies = [ "tracing-core", ] -[[package]] -name = "tracing-opentelemetry" -version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c47440f2979c4cd3138922840eec122e3c0ba2148bc290f756bd7fd60fc97fff" -dependencies = [ - "opentelemetry", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "tracing-serde" version = "0.1.2" @@ -5011,15 +4887,11 @@ version = "0.1.0" dependencies = [ "logfmt", "observability_deps", - "opentelemetry", - "opentelemetry-jaeger", - "opentelemetry-otlp", "regex", "structopt", "synchronized-writer", "thiserror", "tracing-log", - "tracing-opentelemetry", "tracing-subscriber", ] diff --git a/Cargo.toml b/Cargo.toml index baf66f0331..4b018fb36c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -156,8 +156,7 @@ default = [ "jemalloc_replacing_malloc" ] azure = ["object_store/azure"] # Optional Azure Object store support gcp = ["object_store/gcp"] # Optional GCP object store support aws = ["object_store/aws"] # Optional AWS / S3 object store support -jaeger = ["trogging/jaeger", "trace_exporters/jaeger"] # Enable optional jaeger tracing support -otlp = ["trogging/otlp"] # Enable optional open telemetry collector +jaeger = ["trace_exporters/jaeger"] # Enable optional jaeger tracing support # pprof is an optional feature for pprof support # heappy is an optional feature; Not on by default as it diff --git a/Dockerfile b/Dockerfile index 0a3022534d..1280a6fecc 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,7 +13,7 @@ RUN \ --mount=type=cache,id=influxdb_iox_git,sharing=locked,target=/usr/local/cargo/git \ --mount=type=cache,id=influxdb_iox_target,sharing=locked,target=/influxdb_iox/target \ du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target && \ - cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,jaeger,otlp,pprof && \ + cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,jaeger,pprof && \ cp /influxdb_iox/target/release/influxdb_iox /root/influxdb_iox && \ du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target diff --git a/src/commands/tracing.rs b/src/commands/tracing.rs index 90cf8fdb56..a40d727afa 100644 --- a/src/commands/tracing.rs +++ b/src/commands/tracing.rs @@ -3,10 +3,10 @@ use std::cmp::max; use trogging::cli::LoggingConfigBuilderExt; pub use trogging::config::*; -pub use trogging::TracingGuard; +pub use trogging::TroggingGuard; /// Start simple logger. Panics on error. -pub fn init_simple_logs(log_verbose_count: u8) -> Result { +pub fn init_simple_logs(log_verbose_count: u8) -> Result { trogging::Builder::new() .with_log_verbose_count(log_verbose_count) .install_global() @@ -16,7 +16,7 @@ pub fn init_simple_logs(log_verbose_count: u8) -> Result Result { +) -> Result { let mut logging_config = config.logging_config.clone(); // Handle the case if -v/-vv is specified both before and after the server diff --git a/src/main.rs b/src/main.rs index 056c41bdd1..955a144e4d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,7 @@ use tokio::runtime::Runtime; use commands::tracing::{init_logs_and_tracing, init_simple_logs}; use observability_deps::tracing::warn; -use crate::commands::tracing::TracingGuard; +use crate::commands::tracing::TroggingGuard; use influxdb_iox_client::connection::Builder; use std::str::FromStr; @@ -164,7 +164,7 @@ fn main() -> Result<(), std::io::Error> { } }; - fn handle_init_logs(r: Result) -> TracingGuard { + fn handle_init_logs(r: Result) -> TroggingGuard { match r { Ok(guard) => guard, Err(e) => { diff --git a/trogging/Cargo.toml b/trogging/Cargo.toml index 217c8e90f7..c048e607a9 100644 --- a/trogging/Cargo.toml +++ b/trogging/Cargo.toml @@ -3,26 +3,18 @@ name = "trogging" version = "0.1.0" authors = ["Marko Mikulicic "] edition = "2018" -description = "(TR)acing and l(OGGING) configuration" +description = "IOx logging pipeline built upon tokio-tracing" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] logfmt = { path = "../logfmt" } observability_deps = { path = "../observability_deps" } -opentelemetry = "0.15" -opentelemetry-jaeger = { version = "0.14", features = ["tokio"], optional = true } -opentelemetry-otlp = { version = "0.8", optional = true } thiserror = "1.0.23" tracing-log = "0.1" tracing-subscriber = "0.2" -tracing-opentelemetry = { version = "0.14", default-features = false } structopt = { version = "0.3.21", optional = true } [dev-dependencies] synchronized-writer = "1" regex = "1" - -[features] -jaeger = ["opentelemetry-jaeger", "opentelemetry/rt-tokio"] -otlp = ["opentelemetry-otlp", "opentelemetry/rt-tokio"] diff --git a/trogging/src/cli.rs b/trogging/src/cli.rs index c8e6f4f167..c0538d14f9 100644 --- a/trogging/src/cli.rs +++ b/trogging/src/cli.rs @@ -1,6 +1,5 @@ ///! Common CLI flags for logging and tracing -use crate::{config::*, Builder, Result, TracingGuard}; -use std::num::NonZeroU16; +use crate::{config::*, Builder, Result, TroggingGuard}; use structopt::StructOpt; use tracing_subscriber::fmt::{writer::BoxMakeWriter, MakeWriter}; @@ -117,7 +116,7 @@ impl LoggingConfig { .with_log_format(self.log_format) } - pub fn install_global_subscriber(&self) -> Result { + pub fn install_global_subscriber(&self) -> Result { self.to_builder().install_global() } } @@ -137,207 +136,12 @@ where } } -/// CLI config for the logging+tracing related subset of options. -#[derive(Debug, StructOpt, Clone)] -pub struct TracingConfig { - /// Tracing: exporter type - /// - /// Can be one of: none, jaeger, otlp - /// - /// When enabled, additional flags are considered (see flags related to OTLP - /// and Jaeger). - #[structopt( - long = "--traces-exporter", - env = "TRACES_EXPORTER", - default_value = "none" - )] - pub traces_exporter: TracesExporter, - - /// Tracing: filter directive - /// - /// Configures traces severity level filter, by target. - /// - /// Simplest options: error, warn, info, debug, trace - /// - /// Levels for different modules can be specified. For example - /// `debug,hyper::proto::h1=info` specifies debug tracing for all modules - /// except for the `hyper::proto::h1` module which will only display info - /// level tracing. - /// - /// Extended syntax provided by `tracing-subscriber` includes span/field - /// filters. See for more details. - /// - /// No filter by default. - #[structopt(long = "--traces-filter", env = "TRACES_FILTER")] - pub traces_filter: Option, - - /// Tracing: sampler type - /// - /// Can be one of: - /// always_on, always_off, traceidratio, - /// parentbased_always_on, parentbased_always_off, parentbased_traceidratio - /// - /// These alternatives are described in detail at . - #[structopt( - long = "--traces-sampler", - env = "TRACES_SAMPLER", - default_value = "parentbased_traceidratio" - )] - pub traces_sampler: TracesSampler, - - #[rustfmt::skip] - /// Tracing: sampler argument - /// - /// Valid range: [0.0, 1.0]. - /// - /// Only used if `--traces-sampler` is set to - /// parentbased_traceidratio (default) or traceidratio. - /// - /// With sample parentbased_traceidratio, the following rules apply: - /// - if parent is sampled, then all of its children are sampled - /// - else sample this portion of traces (0.5 = 50%) - /// - /// More details about this sampling argument at . - #[structopt( - long = "--traces-sampler-arg", - env = "TRACES_SAMPLER_ARG", - default_value = "1.0", - verbatim_doc_comment - )] - pub traces_sampler_arg: f64, - - /// Tracing: OTLP (eg OpenTelemetry collector) network hostname - /// - /// Only used if `--traces-exporter` is "otlp". - /// - /// Protocol is gRPC. HTTP is not supported. - #[structopt( - long = "--traces-exporter-otlp-host", - env = "TRACES_EXPORTER_OTLP_HOST", - default_value = "localhost" - )] - pub traces_exporter_otlp_host: String, - - /// Tracing: OTLP (eg OpenTelemetry collector) network port - /// - /// Only used if `--traces-exporter` is "otlp". - /// - /// Protocol is gRPC. HTTP is not supported. - #[structopt( - long = "--traces-exporter-otlp-port", - env = "TRACES_EXPORTER_OTLP_PORT", - default_value = "4317" - )] - pub traces_exporter_otlp_port: NonZeroU16, - - /// Tracing: Jaeger agent network hostname - /// - /// Protocol is Thrift/Compact over UDP. - /// - /// Only used if `--traces-exporter` is "jaeger". - #[structopt( - long = "--traces-exporter-jaeger-agent-host", - env = "TRACES_EXPORTER_JAEGER_AGENT_HOST", - default_value = "0.0.0.0" - )] - pub traces_exporter_jaeger_agent_host: String, - - /// Tracing: Jaeger agent network port - /// - /// Protocol is Thrift/Compact over UDP. - /// - /// Only used if `--traces-exporter` is "jaeger". - #[structopt( - long = "--traces-exporter-jaeger-agent-port", - env = "TRACES_EXPORTER_JAEGER_AGENT_PORT", - default_value = "6831" - )] - pub traces_exporter_jaeger_agent_port: NonZeroU16, - - /// Tracing: Jaeger service name. - /// - /// Only used if `--traces-exporter` is "jaeger". - #[structopt( - long = "--traces-exporter-jaeger-service-name", - env = "TRACES_EXPORTER_JAEGER_SERVICE_NAME", - default_value = "iox-conductor" - )] - pub traces_exporter_jaeger_service_name: String, - - /// Tracing: Jaeger max UDP packet size - /// - /// Default to 1300, which is a safe MTU. - /// - /// You can increase it to 65000 if the target is a jaeger collector - /// on localhost. If so, the batching exporter will be enabled for - /// extra efficiency. Otherwise an UDP packet will be sent for each exported span. - /// - /// Only used if `--traces-exporter` is "jaeger". - #[structopt( - long = "--traces-exporter-jaeger-max-packet-size", - env = "TRACES_EXPORTER_JAEGER_MAX_PACKET_SIZE", - default_value = "1300" - )] - pub traces_exporter_jaeger_max_packet_size: usize, -} - -impl TracingConfig { - pub fn to_builder(&self) -> Builder { - self.with_builder(Builder::new()) - } - - pub fn with_builder(&self, builder: Builder) -> Builder - where - W: MakeWriter + Send + Sync + Clone + 'static, - { - builder - .with_traces_filter(&self.traces_filter) - .with_traces_exporter(self.traces_exporter) - .with_traces_sampler(self.traces_sampler, self.traces_sampler_arg) - .with_jaeger_config(JaegerConfig { - agent_host: self.traces_exporter_jaeger_agent_host.clone(), - agent_port: self.traces_exporter_jaeger_agent_port, - service_name: self.traces_exporter_jaeger_service_name.clone(), - max_packet_size: self.traces_exporter_jaeger_max_packet_size, - }) - .with_oltp_config(OtlpConfig { - host: self.traces_exporter_otlp_host.clone(), - port: self.traces_exporter_otlp_port, - }) - } - - pub fn install_global_subscriber(&self) -> Result { - self.to_builder().install_global() - } -} - -/// Extends the trogging [`crate::Builder`] API. -pub trait TracingConfigBuilderExt { - /// Applies all config entries from a [`TracingConfig`] to a [`crate::Builder`]. - fn with_tracing_config(self, config: &TracingConfig) -> Builder; -} - -impl TracingConfigBuilderExt for Builder -where - W: MakeWriter + Send + Sync + Clone + 'static, -{ - fn with_tracing_config(self, config: &TracingConfig) -> Self { - config.with_builder(self) - } -} - impl From for Builder { fn from(config: LoggingConfig) -> Self { config.to_builder() } } -impl From for Builder { - fn from(config: TracingConfig) -> Self { - config.to_builder() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/trogging/src/config.rs b/trogging/src/config.rs index 9ab55cfe67..f4dd89c3f7 100644 --- a/trogging/src/config.rs +++ b/trogging/src/config.rs @@ -1,5 +1,3 @@ -use std::num::NonZeroU16; - #[derive(Debug, Clone, Copy, PartialEq)] pub enum LogFormat { Full, @@ -65,89 +63,3 @@ impl std::fmt::Display for LogDestination { } } } - -#[derive(Debug, Clone, Copy)] -pub enum TracesExporter { - None, - Jaeger, - Otlp, -} - -impl std::str::FromStr for TracesExporter { - type Err = String; - - fn from_str(s: &str) -> Result { - match s.to_ascii_lowercase().as_str() { - "none" => Ok(Self::None), - "jaeger" => Ok(Self::Jaeger), - "otlp" => Ok(Self::Otlp), - _ => Err(format!( - "Invalid traces exporter '{}'. Valid options: none, jaeger, otlp", - s - )), - } - } -} - -impl std::fmt::Display for TracesExporter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::None => write!(f, "none"), - Self::Jaeger => write!(f, "jaeger"), - Self::Otlp => write!(f, "otlp"), - } - } -} - -#[derive(Debug, Clone, Copy)] -pub enum TracesSampler { - AlwaysOn, - AlwaysOff, - TraceIdRatio, - ParentBasedAlwaysOn, - ParentBasedAlwaysOff, - ParentBasedTraceIdRatio, -} - -impl std::str::FromStr for TracesSampler { - type Err = String; - - fn from_str(s: &str) -> Result { - match s.to_ascii_lowercase().as_str() { - "always_on" => Ok(Self::AlwaysOn), - "always_off" => Ok(Self::AlwaysOff), - "traceidratio" => Ok(Self::TraceIdRatio), - "parentbased_always_on" => Ok(Self::ParentBasedAlwaysOn), - "parentbased_always_off" => Ok(Self::ParentBasedAlwaysOff), - "parentbased_traceidratio" => Ok(Self::ParentBasedTraceIdRatio), - _ => Err(format!("Invalid traces sampler '{}'. Valid options: always_on, always_off, traceidratio, parentbased_always_on, parentbased_always_off, parentbased_traceidratio", s)), - } - } -} - -impl std::fmt::Display for TracesSampler { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::AlwaysOn => write!(f, "always_on"), - Self::AlwaysOff => write!(f, "always_off"), - Self::TraceIdRatio => write!(f, "traceidratio"), - Self::ParentBasedAlwaysOn => write!(f, "parentbased_always_on"), - Self::ParentBasedAlwaysOff => write!(f, "parentbased_always_off"), - Self::ParentBasedTraceIdRatio => write!(f, "parentbased_traceidratio"), - } - } -} - -#[derive(Debug)] -pub struct JaegerConfig { - pub agent_host: String, - pub agent_port: NonZeroU16, - pub service_name: String, - pub max_packet_size: usize, -} - -#[derive(Debug)] -pub struct OtlpConfig { - pub host: String, - pub port: NonZeroU16, -} diff --git a/trogging/src/layered_tracing.rs b/trogging/src/layered_tracing.rs deleted file mode 100644 index 5056bc0df7..0000000000 --- a/trogging/src/layered_tracing.rs +++ /dev/null @@ -1,270 +0,0 @@ -use observability_deps::tracing::Metadata; -use observability_deps::tracing::{ - event::Event, - span::{Attributes, Id, Record}, - subscriber::Subscriber, -}; -use std::fmt::Formatter; -use std::marker::PhantomData; -use std::sync::Arc; -use tracing_subscriber::layer::{Context, Layer}; -use tracing_subscriber::EnvFilter; - -/// A FilteredLayer wraps a tracing subscriber Layer and passes events only -/// if the provided EnvFilter accepts the event. -/// -/// Tracing subscriber's layering makes it easy to split event filtering from event recording -/// and collection, but the filtering decisions apply to all layers, effectively producing -/// an intersection of all filtering decisions. -/// -/// A FilteredLayer on the other hand, allows to restrict the verbosity of one event sink -/// without throwing away events available to other layers. -#[derive(Debug)] -pub struct FilteredLayer -where - S: Subscriber, - L: Layer, -{ - inner: L, - _phantom_data: PhantomData, -} - -impl FilteredLayer -where - S: Subscriber, - L: Layer, -{ - pub fn new(inner: L) -> Self { - Self { - inner, - _phantom_data: Default::default(), - } - } -} - -impl Layer for FilteredLayer -where - S: Subscriber, - L: Layer, -{ - fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { - if self.inner.enabled(event.metadata(), ctx.clone()) { - self.inner.on_event(event, ctx); - } - } - fn new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) { - self.inner.new_span(attrs, id, ctx) - } - fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, S>) { - self.inner.on_record(span, values, ctx) - } - fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) { - self.inner.on_follows_from(span, follows, ctx) - } - fn on_enter(&self, id: &Id, ctx: Context<'_, S>) { - self.inner.on_enter(id, ctx) - } - fn on_exit(&self, id: &Id, ctx: Context<'_, S>) { - self.inner.on_exit(id, ctx) - } - fn on_close(&self, id: Id, ctx: Context<'_, S>) { - self.inner.on_close(id, ctx) - } - fn on_id_change(&self, old: &Id, new: &Id, ctx: Context<'_, S>) { - self.inner.on_id_change(old, new, ctx) - } -} - -/// A union filter is a filtering Layer that returns "enabled: true" -/// for events for which at least one of the inner filters returns "enabled: true". -/// -/// This is the opposite of what the main subscriber's layered chain does, where -/// if one filter says nope, the event is filtered out. -/// -/// Since there is a blanked `impl Layer for Option` that returns -/// "enabled: true" when the option is None, it would be very confusing if an -/// user accidentally passed such a none to the filter vector in the union filter. -/// However it's quite tempting to pass an optional layer around hoping it "does the right thing", -/// since it does indeed work in other places a layer is passed to a subscriber builder -/// (moreover, due to the static typing builder pattern used there, there is no way of -/// conditionally adding a filter other than passing an optional filter). Thus, it would be -/// (and it happened in practice) quite confusing for this API to technically accept optional -/// layers and silently do the wrong thing with them. -/// For this reason the [`UnionFilter::new`] method accepts a vector of [`Option`][option]s. -/// It's not perfect, since a user could pass an `Option>` but hopefully -/// -/// This filter is intended to be used together with the [`FilteredLayer`] layer -/// which will filter unwanted events for each of the. -/// -/// This [`UnionFilter`] and the [`FilteredLayer`] are likely to share filters. -/// Unfortunately the [`EnvFilter`][envfilter] doesn't implement [`Clone`]. -/// See [`CloneableEnvFilter`] for a workaround. -/// -/// [envfilter]: tracing_subscriber::EnvFilter -/// [option]: std::option::Option -pub struct UnionFilter -where - S: Subscriber, -{ - inner: Vec + Send + Sync + 'static>>, - _phantom_data: PhantomData, -} - -impl UnionFilter -where - S: Subscriber, -{ - pub fn new(inner: Vec + Send + Sync + 'static>>>) -> Self { - let inner = inner.into_iter().flatten().collect(); - Self { - inner, - _phantom_data: Default::default(), - } - } -} - -impl Layer for UnionFilter -where - S: Subscriber, -{ - /// Return the disjunction of all the enabled flags of all the inner filters which are not None. - /// - /// A None filter doesn't really mean "I want this event", nor it means "I want to filter this event out"; - /// it just means "please ignore me, I'm not really a filter". - /// - /// Yet, there is a blanked implementation of `Option` for all `L: Layer`, and its implementation - /// if the `enabled()` method returns true. This works because the normal subscriber layer chain - /// performs a conjunction of each filter decision. - /// - /// However, the [`UnionFilter`] here is the opposite, we want to enable processing of one event - /// as long as one of the event filters registers interest in it. - fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool { - self.inner.iter().any(|i| i.enabled(metadata, ctx.clone())) - } -} - -impl std::fmt::Debug for UnionFilter -where - S: Subscriber, -{ - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.write_str("UnionFilter(...)") - } -} - -#[derive(Clone, Debug)] -pub struct CloneableEnvFilter(Arc); - -impl CloneableEnvFilter { - pub fn new(inner: EnvFilter) -> Self { - Self(Arc::new(inner)) - } -} - -impl Layer for CloneableEnvFilter -where - S: Subscriber, -{ - fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool { - self.0.enabled(metadata, ctx) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use observability_deps::tracing::{self, debug, error, info}; - use std::sync::{Arc, Mutex}; - use synchronized_writer::SynchronizedWriter; - use tracing_subscriber::{self, fmt, layer::SubscriberExt, EnvFilter}; - - // capture_two_streams is a test helper that sets up two independent tracing subscribers, each with - // a different filtering level and returns a tuple of the emitted lines for the respective streams. - // - // The first stream uses a textual format, the second stream uses a json format (just to make it clear they are - // indeed completely different exporters). - fn capture_two_streams( - filter1: EnvFilter, - filter2: EnvFilter, - workload: impl FnOnce(), - ) -> (Vec, Vec) { - let writer1 = Arc::new(Mutex::new(Vec::new())); - let writer1_captured = Arc::clone(&writer1); - let writer2 = Arc::new(Mutex::new(Vec::new())); - let writer2_captured = Arc::clone(&writer2); - - let layer1 = fmt::layer() - .with_target(false) - .with_ansi(false) - .without_time() - .with_writer(move || SynchronizedWriter::new(Arc::clone(&writer1_captured))); - - let layer2 = fmt::layer() - .json() - .with_target(false) - .with_ansi(false) - .without_time() - .with_writer(move || SynchronizedWriter::new(Arc::clone(&writer2_captured))); - - let subscriber = tracing_subscriber::Registry::default() - .with(FilteredLayer::new(filter1.and_then(layer1))) - .with(FilteredLayer::new(filter2.and_then(layer2))); - - tracing::subscriber::with_default(subscriber, workload); - - let lines = |writer: Arc>>| { - std::str::from_utf8(&writer.lock().unwrap()) - .unwrap() - .to_string() - .trim_end() - .split('\n') - .into_iter() - .map(String::from) - .collect() - }; - - (lines(writer1), lines(writer2)) - } - - #[test] - fn test_independent() { - let workload = || { - error!("foo"); - info!("bar"); - debug!("baz"); - }; - - let (lines1, lines2) = - capture_two_streams(EnvFilter::new("info"), EnvFilter::new("debug"), workload); - assert_eq!( - lines1, - vec!["ERROR foo".to_string(), " INFO bar".to_string()], - ); - assert_eq!( - lines2, - vec![ - r#"{"timestamp":"","level":"ERROR","fields":{"message":"foo"}}"#.to_string(), - r#"{"timestamp":"","level":"INFO","fields":{"message":"bar"}}"#.to_string(), - r#"{"timestamp":"","level":"DEBUG","fields":{"message":"baz"}}"#.to_string() - ], - ); - - let (lines1, lines2) = - capture_two_streams(EnvFilter::new("debug"), EnvFilter::new("info"), workload); - assert_eq!( - lines1, - vec![ - "ERROR foo".to_string(), - " INFO bar".to_string(), - "DEBUG baz".to_string(), - ], - ); - assert_eq!( - lines2, - vec![ - r#"{"timestamp":"","level":"ERROR","fields":{"message":"foo"}}"#.to_string(), - r#"{"timestamp":"","level":"INFO","fields":{"message":"bar"}}"#.to_string(), - ], - ); - } -} diff --git a/trogging/src/lib.rs b/trogging/src/lib.rs index 4858d0da64..95f1f32abf 100644 --- a/trogging/src/lib.rs +++ b/trogging/src/lib.rs @@ -13,20 +13,13 @@ #[cfg(feature = "structopt")] pub mod cli; pub mod config; -pub mod layered_tracing; -use crate::layered_tracing::{CloneableEnvFilter, FilteredLayer, UnionFilter}; pub use config::*; // Re-export tracing_subscriber pub use tracing_subscriber; use observability_deps::tracing::{self, Subscriber}; -use opentelemetry::{ - self, - sdk::{trace, Resource}, - KeyValue, -}; use std::cmp::min; use std::io; use std::io::Write; @@ -49,18 +42,6 @@ const MAX_LINE_LENGTH: usize = 16 * 1024 - 1; #[derive(Debug, Error)] pub enum Error { - #[error("Jaeger exporter selected but jaeger config passed to builder")] - JaegerConfigMissing, - - #[error("'jaeger' not supported with this build. Hint: recompile with appropriate features")] - JaegerNotBuilt {}, - - #[error("OTLP exporter selected but OTLP config passed to builder")] - OtlpConfigMissing, - - #[error("'otlp' not supported with this build. Hint: recompile with appropriate features")] - OtlpNotBuilt {}, - #[error("Cannot set global tracing subscriber")] SetGlobalDefaultError(#[from] tracing::dispatcher::SetGlobalDefaultError), @@ -77,15 +58,9 @@ pub struct Builder io::Stdout> { log_filter: Option, // used when log_filter is none. default_log_filter: EnvFilter, - traces_filter: Option, - traces_exporter: TracesExporter, - traces_sampler: TracesSampler, - traces_sampler_arg: f64, make_writer: W, with_target: bool, with_ansi: bool, - jaeger_config: Option, - otlp_config: Option, } impl Default for Builder { @@ -94,15 +69,9 @@ impl Default for Builder { log_format: LogFormat::Full, log_filter: None, default_log_filter: EnvFilter::try_new(Self::DEFAULT_LOG_FILTER).unwrap(), - traces_filter: None, - traces_exporter: TracesExporter::None, - traces_sampler: TracesSampler::ParentBasedTraceIdRatio, - traces_sampler_arg: 1.0, make_writer: io::stdout, with_target: true, with_ansi: true, - jaeger_config: None, - otlp_config: None, } } } @@ -125,14 +94,8 @@ impl Builder { log_format: self.log_format, log_filter: self.log_filter, default_log_filter: self.default_log_filter, - traces_filter: self.traces_filter, - traces_exporter: self.traces_exporter, - traces_sampler: self.traces_sampler, - traces_sampler_arg: self.traces_sampler_arg, with_target: self.with_target, with_ansi: self.with_ansi, - jaeger_config: self.jaeger_config, - otlp_config: self.otlp_config, } } } @@ -204,14 +167,8 @@ where log_format: self.log_format, log_filter: self.log_filter, default_log_filter: self.default_log_filter, - traces_filter: self.traces_filter, - traces_exporter: self.traces_exporter, - traces_sampler: self.traces_sampler, - traces_sampler_arg: self.traces_sampler_arg, with_target: self.with_target, with_ansi: self.with_ansi, - jaeger_config: self.jaeger_config, - otlp_config: self.otlp_config, } } @@ -232,149 +189,7 @@ where Self { with_ansi, ..self } } - /// Sets an optional event filter for the tracing pipeline. - /// - /// The filter will be parsed with [tracing_subscriber::EnvFilter] - /// and applied to all events before they reach the tracing exporter. - pub fn with_traces_filter(self, traces_filter: &Option) -> Self { - if let Some(traces_filter) = traces_filter { - let traces_filter = EnvFilter::try_new(traces_filter).unwrap(); - Self { - traces_filter: Some(traces_filter), - ..self - } - } else { - self - } - } - - pub fn with_traces_exporter(self, traces_exporter: TracesExporter) -> Self { - Self { - traces_exporter, - ..self - } - } - - pub fn with_traces_sampler( - self, - traces_sampler: TracesSampler, - traces_sampler_arg: f64, - ) -> Self { - Self { - traces_sampler, - traces_sampler_arg, - ..self - } - } - - pub fn with_jaeger_config(self, config: JaegerConfig) -> Self { - Self { - jaeger_config: Some(config), - ..self - } - } - - pub fn with_oltp_config(self, config: OtlpConfig) -> Self { - Self { - otlp_config: Some(config), - ..self - } - } - - fn construct_opentelemetry_tracer(&self) -> Result> { - let trace_config = { - let sampler = match self.traces_sampler { - TracesSampler::AlwaysOn => trace::Sampler::AlwaysOn, - TracesSampler::AlwaysOff => { - return Ok(None); - } - TracesSampler::TraceIdRatio => { - trace::Sampler::TraceIdRatioBased(self.traces_sampler_arg) - } - TracesSampler::ParentBasedAlwaysOn => { - trace::Sampler::ParentBased(Box::new(trace::Sampler::AlwaysOn)) - } - TracesSampler::ParentBasedAlwaysOff => { - trace::Sampler::ParentBased(Box::new(trace::Sampler::AlwaysOff)) - } - TracesSampler::ParentBasedTraceIdRatio => trace::Sampler::ParentBased(Box::new( - trace::Sampler::TraceIdRatioBased(self.traces_sampler_arg), - )), - }; - let resource = Resource::new(vec![KeyValue::new("service.name", "influxdb-iox")]); - trace::Config::default() - .with_sampler(sampler) - .with_resource(resource) - }; - - Ok(match self.traces_exporter { - TracesExporter::Jaeger => Some(self.construct_jaeger_tracer(trace_config)?), - TracesExporter::Otlp => Some(self.construct_otlp_tracer(trace_config)?), - TracesExporter::None => None, - }) - } - - #[cfg(feature = "jaeger")] - fn construct_jaeger_tracer(&self, trace_config: trace::Config) -> Result { - let config = self - .jaeger_config - .as_ref() - .ok_or(Error::JaegerConfigMissing)?; - let agent_endpoint = format!("{}:{}", config.agent_host.trim(), config.agent_port); - opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); - let builder = opentelemetry_jaeger::new_pipeline() - .with_trace_config(trace_config) - .with_agent_endpoint(agent_endpoint) - .with_service_name(&config.service_name) - .with_max_packet_size(config.max_packet_size); - - // Batching is hard to tune because the max batch size - // is not currently exposed as a tunable from the trace config, and even then - // it's defined in terms of max number of spans, and not their size in bytes. - // Thus we enable batching only when the MTU size is 65000 which is the value suggested - // by jaeger when exporting to localhost. - let tracer = if config.max_packet_size >= 65_000 { - builder.install_batch(opentelemetry::runtime::Tokio) - } else { - builder.install_simple() - } - .unwrap(); - Ok(tracer) - } - - #[cfg(not(feature = "jaeger"))] - fn construct_jaeger_tracer(&self, _trace_config: trace::Config) -> Result { - Err(Error::JaegerNotBuilt {}) - } - - #[cfg(feature = "otlp")] - fn construct_otlp_tracer(&self, trace_config: trace::Config) -> Result { - let config = self.otlp_config.as_ref().ok_or(Error::OtlpConfigMissing)?; - let jaeger_endpoint = format!("{}:{}", config.host.trim(), config.port); - Ok(opentelemetry_otlp::new_pipeline() - .with_trace_config(trace_config) - .with_endpoint(jaeger_endpoint) - .with_protocol(opentelemetry_otlp::Protocol::Grpc) - .with_tonic() - .install_batch(opentelemetry::runtime::Tokio) - .unwrap()) - } - - #[cfg(not(feature = "otlp"))] - fn construct_otlp_tracer(&self, _trace_config: trace::Config) -> Result { - Err(Error::OtlpNotBuilt {}) - } - pub fn build(self) -> Result { - let (traces_layer_filter, traces_layer_otel) = - match self.construct_opentelemetry_tracer()? { - None => (None, None), - Some(tracer) => ( - self.traces_filter, - Some(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)), - ), - }; - let log_writer = self.make_writer; let log_format = self.log_format; let with_target = self.with_target; @@ -427,53 +242,37 @@ where let log_filter = self.log_filter.unwrap_or(self.default_log_filter); - // construct the union filter which allows us to skip evaluating the expensive field values unless - // at least one of the filters is interested in the events. - // e.g. consider: `debug!(foo=bar(), "baz");` - // `bar()` will only be called if either the log_filter or the traces_layer_filter is at debug level for that module. - let log_filter = CloneableEnvFilter::new(log_filter); - let traces_layer_filter = traces_layer_filter.map(CloneableEnvFilter::new); - let union_filter = UnionFilter::new(vec![ - Some(Box::new(log_filter.clone())), - traces_layer_filter.clone().map(|l| Box::new(l) as _), - ]); - - let subscriber = tracing_subscriber::Registry::default() - .with(union_filter) - .with(FilteredLayer::new( - log_filter - .and_then(log_format_full) - .and_then(log_format_pretty) - .and_then(log_format_json) - .and_then(log_format_logfmt), - )) - .with( - traces_layer_filter - .map(|filter| FilteredLayer::new(filter.and_then(traces_layer_otel))), - ); + let subscriber = tracing_subscriber::Registry::default().with( + log_filter + .and_then(log_format_full) + .and_then(log_format_pretty) + .and_then(log_format_json) + .and_then(log_format_logfmt), + ); Ok(subscriber) } /// Build a tracing subscriber and install it as a global default subscriber for all threads. /// - /// It returns a RAII guard that will ensure all events are flushed to the tracing exporter. - pub fn install_global(self) -> Result { + /// It returns a RAII guard that will ensure all events are flushed on drop + pub fn install_global(self) -> Result { let subscriber = self.build()?; tracing::subscriber::set_global_default(subscriber)?; tracing_log::LogTracer::init()?; - Ok(TracingGuard) + Ok(TroggingGuard) } } -/// A RAII guard. On Drop, tracing and OpenTelemetry are flushed and shut down. +/// A RAII guard. On Drop, ensures all events are flushed +/// +/// Note: This is currently unnecessary but has been kept in case we choose to +/// switch to using tracing-appender which writes logs in a background worker #[derive(Debug)] -pub struct TracingGuard; +pub struct TroggingGuard; -impl Drop for TracingGuard { - fn drop(&mut self) { - opentelemetry::global::shutdown_tracer_provider(); - } +impl Drop for TroggingGuard { + fn drop(&mut self) {} } fn make_writer(m: M) -> BoxMakeWriter