From df9dd15ab8b31bc46ab66609801f765c6e184f8b Mon Sep 17 00:00:00 2001 From: Jacob Marble Date: Fri, 16 Apr 2021 08:59:29 -0700 Subject: [PATCH] feat(tracing): improve logs and tracing Logs and traces are emitted via one pipeline. For now, it is not possible to emit both, but it should be possible in a few weeks, as tokio/tracing/tracing-subscriber is going through some refactoring recently. All affected flags are well-documented, and I have tested all but the OTLP output flags. chore: clippy happy chore: revert instrumentation changes feat: add log format logfmt, log destinations stderr, stdout chore: clippy happy --- Cargo.lock | 61 ++++--- Cargo.toml | 4 +- logfmt/src/lib.rs | 68 +++---- observability_deps/Cargo.toml | 16 +- observability_deps/src/lib.rs | 2 +- src/commands/logging.rs | 141 --------------- src/commands/run.rs | 269 +++++++++++++++++++--------- src/commands/tracing.rs | 321 ++++++++++++++++++++++++++++++++++ src/influxdb_ioxd.rs | 8 +- src/main.rs | 54 +++--- 10 files changed, 625 insertions(+), 319 deletions(-) delete mode 100644 src/commands/logging.rs create mode 100644 src/commands/tracing.rs diff --git a/Cargo.lock b/Cargo.lock index 569114175b..71bd72f05c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2094,12 +2094,12 @@ name = "observability_deps" version = "0.1.0" dependencies = [ "env_logger", - "opentelemetry", + "opentelemetry 0.13.0", "opentelemetry-jaeger", + "opentelemetry-otlp", "opentelemetry-prometheus", "prometheus", "tracing", - "tracing-futures", "tracing-opentelemetry", "tracing-subscriber", ] @@ -2163,6 +2163,20 @@ name = "opentelemetry" version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "514d24875c140ed269eecc2d1b56d7b71b573716922a763c317fb1b1b4b58f15" +dependencies = [ + "dashmap", + "fnv", + "futures", + "js-sys", + "lazy_static", + "thiserror", +] + +[[package]] +name = "opentelemetry" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum 
= "b91cea1dfd50064e52db033179952d18c770cbc5dfefc8eba45d619357ba3914" dependencies = [ "async-trait", "dashmap", @@ -2180,25 +2194,41 @@ dependencies = [ [[package]] name = "opentelemetry-jaeger" -version = "0.11.0" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5677b3a361784aff6e2b1b30dbdb5f85f4ec57ff2ced41d9a481ad70a9d0b57" +checksum = "ddd4984441954f9ebbe3eebdfc6fd4fa95be6400d403171228779b949f3cd918" dependencies = [ "async-trait", "lazy_static", - "opentelemetry", + "opentelemetry 0.13.0", "thiserror", "thrift", "tokio", ] +[[package]] +name = "opentelemetry-otlp" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09c19adec09e1d86bdc72cbc2dea6d7276d90d6d50ad430842446382a4ef440b" +dependencies = [ + "async-trait", + "futures", + "opentelemetry 0.13.0", + "prost", + "thiserror", + "tokio", + "tonic", + "tonic-build", +] + [[package]] name = "opentelemetry-prometheus" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c5407eac459699e03e83f55a557920c612d09f202b58f44212b5cdc4e8a7666e" dependencies = [ - "opentelemetry", + "opentelemetry 0.12.0", "prometheus", "protobuf", ] @@ -3980,27 +4010,15 @@ dependencies = [ "tracing", ] -[[package]] -name = "tracing-log" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - [[package]] name = "tracing-opentelemetry" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccdf13c28f1654fe806838f28c5b9cb23ca4c0eae71450daa489f50e523ceb1" +checksum = "99003208b647dae59dcefc49c98aecaa3512fbc29351685d4b9ef23a9218458e" dependencies = [ - "opentelemetry", + "opentelemetry 0.13.0", "tracing", "tracing-core", - "tracing-log", "tracing-subscriber", ] @@ -4033,7 
+4051,6 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log", "tracing-serde", ] diff --git a/Cargo.toml b/Cargo.toml index d2056a3556..9d004494b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "mem_qe", "mutable_buffer", "object_store", + "observability_deps", "packers", "panic_logging", "query", @@ -29,7 +30,6 @@ members = [ "server", "test_helpers", "tracker", - "observability_deps", "wal", ] @@ -53,13 +53,13 @@ logfmt = { path = "logfmt" } mem_qe = { path = "mem_qe" } mutable_buffer = { path = "mutable_buffer" } object_store = { path = "object_store" } +observability_deps = { path = "observability_deps" } packers = { path = "packers" } panic_logging = { path = "panic_logging" } query = { path = "query" } read_buffer = { path = "read_buffer" } server = { path = "server" } tracker = { path = "tracker" } -observability_deps = { path = "observability_deps" } wal = { path = "wal" } # Crates.io dependencies, in alphabetical order diff --git a/logfmt/src/lib.rs b/logfmt/src/lib.rs index cda244768f..3adeb075fb 100644 --- a/logfmt/src/lib.rs +++ b/logfmt/src/lib.rs @@ -1,11 +1,11 @@ -use observability_deps::tracing::{ - self, - field::{Field, Visit}, - subscriber::Interest, - Id, Level, Subscriber, -}; -use observability_deps::tracing_subscriber::{ - fmt::MakeWriter, layer::Context, registry::LookupSpan, Layer, +use observability_deps::{ + tracing::{ + self, + field::{Field, Visit}, + subscriber::Interest, + Id, Level, Subscriber, + }, + tracing_subscriber::{fmt::MakeWriter, layer::Context, registry::LookupSpan, Layer}, }; use std::borrow::Cow; use std::{io::Write, time::SystemTime}; @@ -64,19 +64,6 @@ where Interest::always() } - fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { - let writer = self.writer.make_writer(); - let mut p = FieldPrinter::new(writer, event.metadata().level()); - // record fields - event.record(&mut p); - if let Some(span) = ctx.lookup_current() { - 
p.write_span_id(&span.id()) - } - // record source information - p.write_source_info(event); - p.write_timestamp(); - } - fn new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { let writer = self.writer.make_writer(); let metadata = ctx.metadata(id).expect("span should have metadata"); @@ -90,6 +77,19 @@ where fn max_level_hint(&self) -> Option { None } + + fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) { + let writer = self.writer.make_writer(); + let mut p = FieldPrinter::new(writer, event.metadata().level()); + // record fields + event.record(&mut p); + if let Some(span) = ctx.lookup_current() { + p.write_span_id(&span.id()) + } + // record source information + p.write_source_info(event); + p.write_timestamp(); + } } /// This thing is responsible for actually printing log information to @@ -158,18 +158,6 @@ impl Drop for FieldPrinter { } impl Visit for FieldPrinter { - fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { - // Note this appears to be invoked via `debug!` and `info! macros - let formatted_value = format!("{:?}", value); - write!( - self.writer, - " {}={}", - translate_field_name(field.name()), - quote_and_escape(&formatted_value) - ) - .ok(); - } - fn record_i64(&mut self, field: &Field, value: i64) { write!( self.writer, @@ -231,6 +219,18 @@ impl Visit for FieldPrinter { ) .ok(); } + + fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) { + // Note this appears to be invoked via `debug!` and `info! 
macros + let formatted_value = format!("{:?}", value); + write!( + self.writer, + " {}={}", + translate_field_name(field.name()), + quote_and_escape(&formatted_value) + ) + .ok(); + } } /// return true if the string value already starts/ends with quotes and is @@ -241,7 +241,7 @@ fn needs_quotes_and_escaping(value: &str) -> bool { return true; } - // ignore begining/ending quotes, if any + // ignore beginning/ending quotes, if any let pre_quoted = value.len() >= 2 && value.starts_with('"') && value.ends_with('"'); let value = if pre_quoted { diff --git a/observability_deps/Cargo.toml b/observability_deps/Cargo.toml index a852d0b6bf..db1d976c6e 100644 --- a/observability_deps/Cargo.toml +++ b/observability_deps/Cargo.toml @@ -6,12 +6,12 @@ edition = "2018" description = "Observability ecosystem dependencies for InfluxDB IOx, to ensure consistent versions and unified updates" [dependencies] # In alphabetical order -env_logger = "0.8.3" -opentelemetry = { version = "0.12", default-features = false, features = ["trace", "metrics", "tokio-support"] } -opentelemetry-jaeger = { version = "0.11", features = ["tokio"] } -opentelemetry-prometheus = "0.5.0" +env_logger = "0.8" +opentelemetry = { version = "0.13", default-features = false, features = ["trace", "metrics", "rt-tokio"] } +opentelemetry-jaeger = { version = "0.12", features = ["tokio"] } +opentelemetry-otlp = "0.6" +opentelemetry-prometheus = "0.5" prometheus = "0.11" -tracing = { version = "0.1", features = ["release_max_level_debug"] } -tracing-futures = "0.2.4" -tracing-opentelemetry = "0.11.0" -tracing-subscriber = { version = "0.2.15", features = ["parking_lot"] } +tracing = { version = "0.1", features = ["max_level_trace", "release_max_level_debug"] } +tracing-opentelemetry = { version = "0.12.0", default-features = false } +tracing-subscriber = { version = "0.2", default-features = false, features = ["env-filter", "smallvec", "chrono", "parking_lot", "registry", "fmt", "ansi", "json"] } diff --git 
a/observability_deps/src/lib.rs b/observability_deps/src/lib.rs index dd3eba8956..cec0960df5 100644 --- a/observability_deps/src/lib.rs +++ b/observability_deps/src/lib.rs @@ -6,10 +6,10 @@ pub use env_logger; pub use opentelemetry; pub use opentelemetry_jaeger; +pub use opentelemetry_otlp; pub use opentelemetry_prometheus; pub use prometheus; pub use tracing; pub use tracing::instrument; -pub use tracing_futures; pub use tracing_opentelemetry; pub use tracing_subscriber; diff --git a/src/commands/logging.rs b/src/commands/logging.rs deleted file mode 100644 index a500c98f17..0000000000 --- a/src/commands/logging.rs +++ /dev/null @@ -1,141 +0,0 @@ -//! Logging initization and setup - -use observability_deps::{ - env_logger, opentelemetry_jaeger, tracing_opentelemetry, - tracing_subscriber::{self, prelude::*, EnvFilter}, -}; - -use super::run::{Config, LogFormat}; - -/// Handles setting up logging levels -#[derive(Debug)] -pub enum LoggingLevel { - // Default log level is warn level for all components - Default, - - // Verbose log level is info level for all components - Verbose, - - // Debug log level is debug for everything except - // some especially noisy low level libraries - Debug, -} - -impl LoggingLevel { - /// Creates a logging level usig the following rules. - /// - /// 1. if `-vv` (multiple instances of verbose), use Debug - /// 2. if `-v` (single instances of verbose), use Verbose - /// 3. 
Otherwise use Default - pub fn new(num_verbose: u64) -> Self { - match num_verbose { - 0 => Self::Default, - 1 => Self::Verbose, - _ => Self::Debug, - } - } - - /// Return a LoggingLevel that represents the most verbose logging - /// of `self` and `other` - pub fn combine(self, other: Self) -> Self { - Self::new(std::cmp::max(self as u64, other as u64)) - } - - /// set RUST_LOG to the level represented by self, unless RUST_LOG - /// is already set - fn set_rust_log_if_needed(&self, level: Option) { - /// Default debug level is debug for everything except - /// some especially noisy low level libraries - const DEFAULT_DEBUG_LOG_LEVEL: &str = "debug,hyper::proto::h1=info,h2=info"; - - // Default verbose log level is info level for all components - const DEFAULT_VERBOSE_LOG_LEVEL: &str = "info"; - - // Default log level is warn level for all components - const DEFAULT_LOG_LEVEL: &str = "info"; - - match level { - Some(lvl) => { - if !matches!(self, Self::Default) { - eprintln!( - "WARNING: Using RUST_LOG='{}' environment, ignoring -v command line", - lvl - ); - } else { - std::env::set_var("RUST_LOG", lvl); - } - } - None => { - match self { - Self::Default => std::env::set_var("RUST_LOG", DEFAULT_LOG_LEVEL), - Self::Verbose => std::env::set_var("RUST_LOG", DEFAULT_VERBOSE_LOG_LEVEL), - Self::Debug => std::env::set_var("RUST_LOG", DEFAULT_DEBUG_LOG_LEVEL), - }; - } - } - } - - /// Configures basic logging for 'simple' command line tools. Note - /// this does not setup tracing or open telemetry - pub fn setup_basic_logging(&self) { - self.set_rust_log_if_needed(std::env::var("RUST_LOG").ok()); - env_logger::init(); - } - - /// Configures logging and tracing, based on the configuration - /// values, for the IOx server (the whole enchalada) - pub fn setup_logging(&self, config: &Config) -> Option { - // Copy anything from the config to the rust log environment - self.set_rust_log_if_needed(config.rust_log.clone()); - - // Configure the OpenTelemetry tracer, if requested. 
- let (opentelemetry, drop_handle) = - if std::env::var("OTEL_EXPORTER_JAEGER_AGENT_HOST").is_ok() { - // For now, configure open telemetry directly from the - // environment. Eventually it would be cool to document - // all of the open telemetry options in IOx and pass them - // explicitly to opentelemetry for additional visibility - let (tracer, drop_handle) = opentelemetry_jaeger::new_pipeline() - .with_service_name("iox") - .from_env() - .install() - .expect("failed to initialise the Jaeger tracing sink"); - - // Initialise the opentelemetry tracing layer, giving it the jaeger emitter - let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); - - (Some(opentelemetry), Some(drop_handle)) - } else { - (None, None) - }; - - // Register the chain of event subscribers: - // - // - Jaeger tracing emitter - // - Env filter (using RUST_LOG as the filter env) - // - A stderr logger - // - let subscriber = tracing_subscriber::registry() - .with(opentelemetry) - // filter messages to only those specified by RUST_LOG environment - .with(EnvFilter::from_default_env()); - - // Configure the logger to write to stderr and install it - let output_stream = std::io::stderr; - - let log_format = config.log_format.as_ref().cloned().unwrap_or_default(); - - match log_format { - LogFormat::Rust => { - let logger = tracing_subscriber::fmt::layer().with_writer(output_stream); - subscriber.with(logger).init(); - } - LogFormat::LogFmt => { - let logger = logfmt::LogFmtLayer::new(output_stream); - subscriber.with(logger).init(); - } - }; - - drop_handle - } -} diff --git a/src/commands/run.rs b/src/commands/run.rs index 3bc7efb002..908c7a8946 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -1,8 +1,9 @@ //! 
Implementation of command line option for running server -use crate::commands::logging::LoggingLevel; +use crate::commands::tracing; use crate::influxdb_ioxd; use clap::arg_enum; +use core::num::NonZeroU16; use std::num::NonZeroU32; use std::{net::SocketAddr, net::ToSocketAddrs, path::PathBuf}; use structopt::StructOpt; @@ -42,30 +43,34 @@ Configuration is loaded from the following sources (highest precedence first): - pre-configured default values" )] pub struct Config { - /// This controls the IOx server logging level, as described in - /// https://crates.io/crates/env_logger. + /// Logs: filter directive /// - /// Levels for different modules can be specified as well. For example + /// Configures log severity level filter, by target. + /// + /// Simplest options: error, warn, info, debug, trace + /// + /// Levels for different modules can be specified. For example /// `debug,hyper::proto::h1=info` specifies debug logging for all modules /// except for the `hyper::proto::h1' module which will only display info /// level logging. - #[structopt(long = "--log", env = "RUST_LOG")] - pub rust_log: Option, + /// + /// Extended syntax provided by `tracing-subscriber` includes span/field + /// filters. See the link below for more details. + /// + /// https://docs.rs/tracing-subscriber/0.2.17/tracing_subscriber/filter/struct.EnvFilter.html + #[structopt(long = "--log-filter", env = "LOG_FILTER", default_value = "warn")] + pub log_filter: String, - /// Log message format. Can be one of: + /// Logs: filter short-hand /// - /// "rust" (default) - /// "logfmt" (logfmt/Heroku style - https://brandur.org/logfmt) - #[structopt(long = "--log_format", env = "INFLUXDB_IOX_LOG_FORMAT")] - pub log_format: Option, - - /// This sets logging up with a pre-configured set of convenient log levels. + /// Convenient way to set log severity level filter. + /// Overrides --log-filter / LOG_FILTER. 
/// - /// -v means 'info' log levels - /// -vv means 'verbose' log level (with the exception of some particularly - /// low level libraries) + /// -v 'info' /// - /// This option is ignored if --log / RUST_LOG are set + /// -vv 'debug,hyper::proto::h1=info,h2=info' + /// + /// -vvv 'trace,hyper::proto::h1=info,h2=info' #[structopt( short = "-v", long = "--verbose", @@ -73,7 +78,170 @@ pub struct Config { takes_value = false, parse(from_occurrences) )] - pub verbose_count: u64, + pub log_verbose_count: u8, + + #[rustfmt::skip] + /// Logs: message format + /// + /// Can be one of: + /// + /// full: human-readable, single line + /// + /// Oct 24 12:55:47.815 ERROR shaving_yaks{yaks=3}: fmt::yak_shave: failed to shave yak yak=3 error=missing yak + /// Oct 24 12:55:47.815 TRACE shaving_yaks{yaks=3}: fmt::yak_shave: yaks_shaved=2 + /// Oct 24 12:55:47.815 INFO fmt: yak shaving completed all_yaks_shaved=false + /// + /// pretty: human-readable, multi line + /// + /// Oct 24 12:57:29.387 fmt_pretty::yak_shave: failed to shave yak, yak: 3, error: missing yak + /// at examples/examples/fmt/yak_shave.rs:48 on main + /// in fmt_pretty::yak_shave::shaving_yaks with yaks: 3 + /// + /// Oct 24 12:57:29.387 fmt_pretty::yak_shave: yaks_shaved: 2 + /// at examples/examples/fmt/yak_shave.rs:52 on main + /// in fmt_pretty::yak_shave::shaving_yaks with yaks: 3 + /// + /// Oct 24 12:57:29.387 fmt_pretty: yak shaving completed, all_yaks_shaved: false + /// at examples/examples/fmt-pretty.rs:19 on main + /// + /// json: machine-parseable + /// + /// {"timestamp":"Oct 24 13:00:00.875","level":"ERROR","fields":{"message":"failed to shave yak","yak":3,"error":"missing yak"},"target":"fmt_json::yak_shave","spans":[{"yaks":3,"name":"shaving_yaks"}]} + /// {"timestamp":"Oct 24 13:00:00.875","level":"TRACE","fields":{"yaks_shaved":2},"target":"fmt_json::yak_shave","spans":[{"yaks":3,"name":"shaving_yaks"}]} + /// {"timestamp":"Oct 24 13:00:00.875","level":"INFO","fields":{"message":"yak shaving 
completed","all_yaks_shaved":false},"target":"fmt_json"} + #[structopt(long = "--log-format", env = "LOG_FORMAT", default_value = "full", verbatim_doc_comment)] + pub log_format: tracing::LogFormat, + + /// Logs: destination + /// + /// Can be one of: stdout, stderr + /// + /// TODO(jacobmarble): consider adding file path, file rotation, syslog, ? + #[structopt( + long = "--log-destination", + env = "LOG_DESTINATION", + default_value = "stdout", + verbatim_doc_comment + )] + pub log_destination: tracing::LogDestination, + + /// Tracing: exporter type + /// + /// Can be one of: none, jaeger, otlp + /// + /// When enabled, additional flags are considered (see flags related to OTLP + /// and Jaeger), and log output is disabled. + /// + /// TODO(jacobmarble): allow logs and traces simultaneously + #[structopt( + long = "--traces-exporter", + env = "TRACES_EXPORTER", + default_value = "none" + )] + pub traces_exporter: tracing::TracesExporter, + + /// Tracing: filter directive + /// + /// Configures traces severity level filter, by target. + /// + /// Simplest options: error, warn, info, debug, trace + /// + /// Levels for different modules can be specified. For example + /// `debug,hyper::proto::h1=info` specifies debug logging for all modules + /// except for the `hyper::proto::h1' module which will only display info + /// level logging. + /// + /// Extended syntax provided by `tracing-subscriber` includes span/field + /// filters. See the link below for more details. + /// + /// No filter by default. 
+ /// + /// https://docs.rs/tracing-subscriber/0.2.17/tracing_subscriber/filter/struct.EnvFilter.html + #[structopt(long = "--traces-filter", env = "TRACES_FILTER")] + pub traces_filter: Option, + + /// Tracing: sampler type + /// + /// Can be one of: + /// always_on, always_off, traceidratio, + /// parentbased_always_on, parentbased_always_off, parentbased_traceidratio + /// + /// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/sdk-environment-variables.md#general-sdk-configuration + #[structopt( + long = "--traces-sampler", + env = "TRACES_SAMPLER", + default_value = "parentbased_traceidratio" + )] + pub traces_sampler: tracing::TracesSampler, + + #[rustfmt::skip] + /// Tracing: sampler argument + /// + /// Valid range: [0.0, 1.0]. + /// + /// Only used if TRACES_SAMPLER is set to + /// parentbased_traceidratio (default) or traceidratio. + /// + /// With sampler parentbased_traceidratio, the following rules apply: + /// - if parent is sampled, then all of its children are sampled + /// - else sample this portion of traces (0.5 = 50%) + /// + /// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/sdk-environment-variables.md#general-sdk-configuration + #[structopt( + long = "--traces-sampler-arg", + env = "TRACES_SAMPLER_ARG", + default_value = "1.0", + verbatim_doc_comment + )] + pub traces_sampler_arg: f64, + + /// Tracing: OTLP (eg OpenTelemetry collector) network hostname + /// + /// Only used if TRACES_EXPORTER is "otlp". + /// + /// Protocol is gRPC. HTTP is not supported. + #[structopt( + long = "--traces-exporter-otlp-host", + env = "TRACES_EXPORTER_OTLP_HOST", + default_value = "localhost" + )] + pub traces_exporter_otlp_host: String, + + /// Tracing: OTLP (eg OpenTelemetry collector) network port + /// + /// Only used if TRACES_EXPORTER is "otlp". + /// + /// Protocol is gRPC. HTTP is not supported. 
+ #[structopt( + long = "--traces-exporter-otlp-port", + env = "TRACES_EXPORTER_OTLP_PORT", + default_value = "4317" + )] + pub traces_exporter_otlp_port: NonZeroU16, + + /// Tracing: Jaeger agent network hostname + /// + /// Protocol is Thrift/Compact over UDP. + /// + /// Only used if TRACES_EXPORTER is "jaeger". + #[structopt( + long = "--traces-exporter-jaeger-agent-host", + env = "TRACES_EXPORTER_JAEGER_AGENT_HOST", + default_value = "0.0.0.0" + )] + pub traces_exporter_jaeger_agent_host: String, + + /// Tracing: Jaeger agent network port + /// + /// Protocol is Thrift/Compact over UDP. + /// + /// Only used if TRACES_EXPORTER is "jaeger". + #[structopt( + long = "--traces-exporter-jaeger-agent-port", + env = "TRACES_EXPORTER_JAEGER_AGENT_PORT", + default_value = "6831" + )] + pub traces_exporter_jaeger_agent_port: NonZeroU16, /// The identifier for the server. /// @@ -214,29 +382,10 @@ Possible values (case insensitive): /// environments. #[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")] pub azure_storage_access_key: Option, - - /// If set, Jaeger traces are emitted to this host - /// using the OpenTelemetry tracer. - /// - /// NOTE: The OpenTelemetry agent CAN ONLY be - /// configured using environment variables. It CAN NOT be configured - /// using the command line at this time. Some useful variables: - /// - /// * OTEL_SERVICE_NAME: emitter service name (iox by default) - /// * OTEL_EXPORTER_JAEGER_AGENT_HOST: hostname/address of the collector - /// * OTEL_EXPORTER_JAEGER_AGENT_PORT: listening port of the collector. 
- /// - /// The entire list of variables can be found in - /// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter - #[structopt( - long = "--oetl_exporter_jaeger_agent", - env = "OTEL_EXPORTER_JAEGER_AGENT_HOST" - )] - pub jaeger_host: Option, } -pub async fn command(logging_level: LoggingLevel, config: Config) -> Result<()> { - Ok(influxdb_ioxd::main(logging_level, config).await?) +pub async fn command(config: Config) -> Result<()> { + Ok(influxdb_ioxd::main(config).await?) } fn parse_socket_addr(s: &str) -> std::io::Result { @@ -260,48 +409,6 @@ arg_enum! { } } -/// How to format output logging messages -#[derive(Debug, Clone, Copy)] -pub enum LogFormat { - /// Default formatted logging - /// - /// Example: - /// ``` - /// level=warn msg="NO PERSISTENCE: using memory for object storage" target="influxdb_iox::influxdb_ioxd" - /// ``` - Rust, - - /// Use the (somwhat pretentiously named) Heroku / logfmt formatted output - /// format - /// - /// Example: - /// ``` - /// Jan 31 13:19:39.059 WARN influxdb_iox::influxdb_ioxd: NO PERSISTENCE: using memory for object storage - /// ``` - LogFmt, -} - -impl Default for LogFormat { - fn default() -> Self { - Self::Rust - } -} - -impl std::str::FromStr for LogFormat { - type Err = String; - - fn from_str(s: &str) -> Result { - match s.to_ascii_lowercase().as_str() { - "rust" => Ok(Self::Rust), - "logfmt" => Ok(Self::LogFmt), - _ => Err(format!( - "Invalid log format '{}'. Valid options: rust, logfmt", - s - )), - } - } -} - #[cfg(test)] mod tests { diff --git a/src/commands/tracing.rs b/src/commands/tracing.rs new file mode 100644 index 0000000000..7162292219 --- /dev/null +++ b/src/commands/tracing.rs @@ -0,0 +1,321 @@ +//! 
Log and trace initialization and setup + +use observability_deps::{ + opentelemetry, + opentelemetry::sdk::trace, + opentelemetry::sdk::Resource, + opentelemetry::KeyValue, + opentelemetry_jaeger, opentelemetry_otlp, tracing, tracing_opentelemetry, + tracing_subscriber::{self, fmt, layer::SubscriberExt, EnvFilter}, +}; + +/// Start simple logger. Panics on error. +pub fn init_simple_logs(log_verbose_count: u8) -> TracingGuard { + let log_layer_filter = match log_verbose_count { + 0 => EnvFilter::try_new("warn").unwrap(), + 1 => EnvFilter::try_new("info").unwrap(), + 2 => EnvFilter::try_new("debug,hyper::proto::h1=info,h2=info").unwrap(), + _ => EnvFilter::try_new("trace,hyper::proto::h1=info,h2=info").unwrap(), + }; + let subscriber = tracing_subscriber::Registry::default() + .with(log_layer_filter) + .with(fmt::layer()); + + let tracing_guard = tracing::subscriber::set_default(subscriber); + + TracingGuard(tracing_guard) +} + +/// Start log or trace emitter. Panics on error. +pub fn init_logs_and_tracing(config: &crate::commands::run::Config) -> TracingGuard { + let (traces_layer_filter, traces_layer_otel) = match construct_opentelemetry_tracer(config) { + None => (None, None), + Some(tracer) => { + let traces_layer_otel = Some(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)); + match &config.traces_filter { + None => (None, traces_layer_otel), + Some(traces_filter) => ( + Some(EnvFilter::try_new(traces_filter).unwrap()), + traces_layer_otel, + ), + } + } + }; + + let ( + log_layer_filter, + log_layer_format_full, + log_layer_format_pretty, + log_layer_format_json, + log_layer_format_logfmt, + ) = { + match traces_layer_otel { + Some(_) => (None, None, None, None, None), + None => { + let log_writer = match config.log_destination { + LogDestination::Stdout => fmt::writer::BoxMakeWriter::new(std::io::stdout), + LogDestination::Stderr => fmt::writer::BoxMakeWriter::new(std::io::stderr), + }; + let (log_format_full, log_format_pretty, log_format_json, 
log_format_logfmt) = + match config.log_format { + LogFormat::Full => { + (Some(fmt::layer().with_writer(log_writer)), None, None, None) + } + LogFormat::Pretty => ( + None, + Some(fmt::layer().pretty().with_writer(log_writer)), + None, + None, + ), + LogFormat::Json => ( + None, + None, + Some(fmt::layer().json().with_writer(log_writer)), + None, + ), + LogFormat::Logfmt => { + (None, None, None, Some(logfmt::LogFmtLayer::new(log_writer))) + } + }; + + let log_layer_filter = match config.log_verbose_count { + 0 => EnvFilter::try_new(&config.log_filter).unwrap(), + 1 => EnvFilter::try_new("info").unwrap(), + 2 => EnvFilter::try_new("debug,hyper::proto::h1=info,h2=info").unwrap(), + _ => EnvFilter::try_new("trace,hyper::proto::h1=info,h2=info").unwrap(), + }; + ( + Some(log_layer_filter), + log_format_full, + log_format_pretty, + log_format_json, + log_format_logfmt, + ) + } + } + }; + + let subscriber = tracing_subscriber::Registry::default() + .with(log_layer_format_logfmt) + .with(log_layer_format_json) + .with(log_layer_format_pretty) + .with(log_layer_format_full) + .with(log_layer_filter) + .with(traces_layer_otel) + .with(traces_layer_filter); + + let tracing_guard = tracing::subscriber::set_default(subscriber); + + TracingGuard(tracing_guard) +} + +fn construct_opentelemetry_tracer(config: &crate::commands::run::Config) -> Option { + let trace_config = { + let sampler = match config.traces_sampler { + TracesSampler::AlwaysOn => trace::Sampler::AlwaysOn, + TracesSampler::AlwaysOff => { + return None; + } + TracesSampler::TraceIdRatio => { + trace::Sampler::TraceIdRatioBased(config.traces_sampler_arg) + } + TracesSampler::ParentBasedAlwaysOn => { + trace::Sampler::ParentBased(Box::new(trace::Sampler::AlwaysOn)) + } + TracesSampler::ParentBasedAlwaysOff => { + trace::Sampler::ParentBased(Box::new(trace::Sampler::AlwaysOff)) + } + TracesSampler::ParentBasedTraceIdRatio => trace::Sampler::ParentBased(Box::new( + 
trace::Sampler::TraceIdRatioBased(config.traces_sampler_arg), + )), + }; + let resource = Resource::new(vec![KeyValue::new("service.name", "influxdb-iox")]); + trace::Config::default() + .with_sampler(sampler) + .with_resource(resource) + }; + + match config.traces_exporter { + TracesExporter::Jaeger => { + let agent_endpoint = format!( + "{}:{}", + config.traces_exporter_jaeger_agent_host.trim(), + config.traces_exporter_jaeger_agent_port + ); + opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new()); + Some( + opentelemetry_jaeger::new_pipeline() + .with_trace_config(trace_config) + .with_agent_endpoint(agent_endpoint) + .install_batch(opentelemetry::runtime::Tokio) + .unwrap(), + ) + } + + TracesExporter::Otlp => { + let jaeger_endpoint = format!( + "{}:{}", + config.traces_exporter_otlp_host.trim(), + config.traces_exporter_otlp_port + ); + Some( + opentelemetry_otlp::new_pipeline() + .with_trace_config(trace_config) + .with_endpoint(jaeger_endpoint) + .with_protocol(opentelemetry_otlp::Protocol::Grpc) + .with_tonic() + .install_batch(opentelemetry::runtime::Tokio) + .unwrap(), + ) + } + + TracesExporter::None => None, + } +} + +/// An RAII guard. On Drop, tracing and OpenTelemetry are flushed and shut down. +pub struct TracingGuard(tracing::subscriber::DefaultGuard); + +impl Drop for TracingGuard { + fn drop(&mut self) { + opentelemetry::global::shutdown_tracer_provider(); + } +} + +#[derive(Debug, Clone, Copy)] +pub enum LogFormat { + Full, + Pretty, + Json, + Logfmt, +} + +impl std::str::FromStr for LogFormat { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "full" => Ok(Self::Full), + "pretty" => Ok(Self::Pretty), + "json" => Ok(Self::Json), + "logfmt" => Ok(Self::Logfmt), + _ => Err(format!( + "Invalid log format '{}'. 
Valid options: full, pretty, json", + s + )), + } + } +} + +impl std::fmt::Display for LogFormat { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Full => write!(f, "full"), + Self::Pretty => write!(f, "pretty"), + Self::Json => write!(f, "json"), + Self::Logfmt => write!(f, "logfmt"), + } + } +} + +#[derive(Debug, Clone, Copy)] +pub enum LogDestination { + Stdout, + Stderr, +} + +impl std::str::FromStr for LogDestination { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "stdout" => Ok(Self::Stdout), + "stderr" => Ok(Self::Stderr), + _ => Err(format!( + "Invalid log destination '{}'. Valid options: stdout, stderr", + s + )), + } + } +} + +impl std::fmt::Display for LogDestination { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Stdout => write!(f, "stdout"), + Self::Stderr => write!(f, "stderr"), + } + } +} + +#[derive(Debug, Clone, Copy)] +pub enum TracesExporter { + None, + Jaeger, + Otlp, +} + +impl std::str::FromStr for TracesExporter { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "none" => Ok(Self::None), + "jaeger" => Ok(Self::Jaeger), + "otlp" => Ok(Self::Otlp), + _ => Err(format!( + "Invalid traces exporter '{}'. 
Valid options: none, jaeger, otlp", + s + )), + } + } +} + +impl std::fmt::Display for TracesExporter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::None => write!(f, "none"), + Self::Jaeger => write!(f, "jaeger"), + Self::Otlp => write!(f, "otlp"), + } + } +} + +#[derive(Debug, Clone, Copy)] +pub enum TracesSampler { + AlwaysOn, + AlwaysOff, + TraceIdRatio, + ParentBasedAlwaysOn, + ParentBasedAlwaysOff, + ParentBasedTraceIdRatio, +} + +impl std::str::FromStr for TracesSampler { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "always_on" => Ok(Self::AlwaysOn), + "always_off" => Ok(Self::AlwaysOff), + "traceidratio" => Ok(Self::TraceIdRatio), + "parentbased_always_on" => Ok(Self::ParentBasedAlwaysOn), + "parentbased_always_off" => Ok(Self::ParentBasedAlwaysOff), + "parentbased_traceidratio" => Ok(Self::ParentBasedTraceIdRatio), + _ => Err(format!("Invalid traces sampler '{}'. Valid options: always_on, always_off, traceidratio, parentbased_always_on, parentbased_always_off, parentbased_traceidratio", s)), + } + } +} + +impl std::fmt::Display for TracesSampler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::AlwaysOn => write!(f, "always_on"), + Self::AlwaysOff => write!(f, "always_off"), + Self::TraceIdRatio => write!(f, "traceidratio"), + Self::ParentBasedAlwaysOn => write!(f, "parentbased_always_on"), + Self::ParentBasedAlwaysOff => write!(f, "parentbased_always_off"), + Self::ParentBasedTraceIdRatio => write!(f, "parentbased_traceidratio"), + } + } +} diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 94bfa7362e..a02f3276f1 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -1,5 +1,4 @@ use crate::commands::{ - logging::LoggingLevel, metrics, run::{Config, ObjectStore as ObjStoreOpt}, }; @@ -103,12 +102,7 @@ async fn wait_for_signal() { /// /// The logging_level passed in is the global 
setting (e.g. if -v or /// -vv was passed in before 'server') -pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> { - // Handle the case if -v/-vv is specified both before and after the server - // command - let logging_level = logging_level.combine(LoggingLevel::new(config.verbose_count)); - - let _drop_handle = logging_level.setup_logging(&config); +pub async fn main(config: Config) -> Result<()> { metrics::init_metrics(&config); // Install custom panic handler and forget about it. diff --git a/src/main.rs b/src/main.rs index 8888e0472d..21f236353f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,12 +10,12 @@ use std::str::FromStr; use dotenv::dotenv; -use observability_deps::tracing::{debug, warn}; use structopt::StructOpt; use tokio::runtime::Runtime; -use commands::logging::LoggingLevel; +use commands::tracing::{init_logs_and_tracing, init_simple_logs}; use ingest::parquet::writer::CompressionLevel; +use observability_deps::tracing::{debug, warn}; use tikv_jemallocator::Jemalloc; @@ -23,7 +23,6 @@ mod commands { pub mod convert; pub mod database; mod input; - pub mod logging; pub mod meta; pub mod metrics; pub mod operations; @@ -31,6 +30,7 @@ mod commands { pub mod server; pub mod server_remote; pub mod stats; + pub mod tracing; pub mod writer; } @@ -81,8 +81,24 @@ For example, a command such as the following shows all actions "# )] struct Config { - #[structopt(short, long, parse(from_occurrences))] - verbose: u64, + /// Log filter short-hand. + /// + /// Convenient way to set log severity level filter. + /// Overrides --log-filter / LOG_FILTER. 
+ /// + /// -v 'info' + /// + /// -vv 'debug,hyper::proto::h1=info,h2=info' + /// + /// -vvv 'trace,hyper::proto::h1=info,h2=info' + #[structopt( + short = "-v", + long = "--verbose", + multiple = true, + takes_value = false, + parse(from_occurrences) + )] + pub log_verbose_count: u8, /// gRPC address of IOx server to connect to #[structopt( @@ -140,24 +156,17 @@ fn main() -> Result<(), std::io::Error> { let config = Config::from_args(); - // Logging level is determined via: - // 1. If RUST_LOG environment variable is set, use that value - // 2. if `-vv` (multiple instances of verbose), use DEFAULT_DEBUG_LOG_LEVEL - // 2. if `-v` (single instances of verbose), use DEFAULT_VERBOSE_LOG_LEVEL - // 3. Otherwise use DEFAULT_LOG_LEVEL - let logging_level = LoggingLevel::new(config.verbose); - let tokio_runtime = get_runtime(config.num_threads)?; tokio_runtime.block_on(async move { let host = config.host; + let log_verbose_count = config.log_verbose_count; match config.command { Command::Convert { input, output, compression_level, } => { - logging_level.setup_basic_logging(); - + let _tracing_guard = init_simple_logs(log_verbose_count); let compression_level = CompressionLevel::from_str(&compression_level).unwrap(); match commands::convert::convert(&input, &output, compression_level) { Ok(()) => debug!("Conversion completed successfully"), @@ -168,7 +177,7 @@ fn main() -> Result<(), std::io::Error> { } } Command::Meta { input } => { - logging_level.setup_basic_logging(); + let _tracing_guard = init_simple_logs(log_verbose_count); match commands::meta::dump_meta(&input) { Ok(()) => debug!("Metadata dump completed successfully"), Err(e) => { @@ -178,7 +187,7 @@ fn main() -> Result<(), std::io::Error> { } } Command::Stats(config) => { - logging_level.setup_basic_logging(); + let _tracing_guard = init_simple_logs(log_verbose_count); match commands::stats::stats(&config).await { Ok(()) => debug!("Storage statistics dump completed successfully"), Err(e) => { @@ -188,37 +197,36 
@@ fn main() -> Result<(), std::io::Error> { } } Command::Database(config) => { - logging_level.setup_basic_logging(); + let _tracing_guard = init_simple_logs(log_verbose_count); if let Err(e) = commands::database::command(host, config).await { eprintln!("{}", e); std::process::exit(ReturnCode::Failure as _) } } Command::Writer(config) => { - logging_level.setup_basic_logging(); + let _tracing_guard = init_simple_logs(log_verbose_count); if let Err(e) = commands::writer::command(host, config).await { eprintln!("{}", e); std::process::exit(ReturnCode::Failure as _) } } Command::Operation(config) => { - logging_level.setup_basic_logging(); + let _tracing_guard = init_simple_logs(log_verbose_count); if let Err(e) = commands::operations::command(host, config).await { eprintln!("{}", e); std::process::exit(ReturnCode::Failure as _) } } Command::Server(config) => { - logging_level.setup_basic_logging(); + let _tracing_guard = init_simple_logs(log_verbose_count); if let Err(e) = commands::server::command(host, config).await { eprintln!("Server command failed: {}", e); std::process::exit(ReturnCode::Failure as _) } } Command::Run(config) => { - // Note don't set up basic logging here, different logging rules apply in server - // mode - if let Err(e) = commands::run::command(logging_level, *config).await { + let _tracing_guard = init_logs_and_tracing(&config); + if let Err(e) = commands::run::command(*config).await { eprintln!("Server command failed: {}", e); std::process::exit(ReturnCode::Failure as _) }