Merge pull request #1240 from influxdata/jgm-tracing-logging

feat(tracing): improve logs and tracing
pull/24376/head
kodiakhq[bot] 2021-04-20 20:21:13 +00:00 committed by GitHub
commit 70a82cc038
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 643 additions and 364 deletions

55
Cargo.lock generated
View File

@ -2083,10 +2083,10 @@ dependencies = [
"env_logger",
"opentelemetry",
"opentelemetry-jaeger",
"opentelemetry-otlp",
"opentelemetry-prometheus",
"prometheus",
"tracing",
"tracing-futures",
"tracing-opentelemetry",
"tracing-subscriber",
]
@ -2147,9 +2147,9 @@ dependencies = [
[[package]]
name = "opentelemetry"
version = "0.12.0"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "514d24875c140ed269eecc2d1b56d7b71b573716922a763c317fb1b1b4b58f15"
checksum = "b91cea1dfd50064e52db033179952d18c770cbc5dfefc8eba45d619357ba3914"
dependencies = [
"async-trait",
"dashmap",
@ -2167,9 +2167,9 @@ dependencies = [
[[package]]
name = "opentelemetry-jaeger"
version = "0.11.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5677b3a361784aff6e2b1b30dbdb5f85f4ec57ff2ced41d9a481ad70a9d0b57"
checksum = "ddd4984441954f9ebbe3eebdfc6fd4fa95be6400d403171228779b949f3cd918"
dependencies = [
"async-trait",
"lazy_static",
@ -2180,10 +2180,26 @@ dependencies = [
]
[[package]]
name = "opentelemetry-prometheus"
version = "0.5.0"
name = "opentelemetry-otlp"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5407eac459699e03e83f55a557920c612d09f202b58f44212b5cdc4e8a7666e"
checksum = "09c19adec09e1d86bdc72cbc2dea6d7276d90d6d50ad430842446382a4ef440b"
dependencies = [
"async-trait",
"futures",
"opentelemetry",
"prost",
"thiserror",
"tokio",
"tonic",
"tonic-build",
]
[[package]]
name = "opentelemetry-prometheus"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7162eac03f0bf5edbe2f2d4f44279904f62c779cd1c8260456446db2e1bf200d"
dependencies = [
"opentelemetry",
"prometheus",
@ -2550,16 +2566,16 @@ dependencies = [
[[package]]
name = "prometheus"
version = "0.11.0"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8425533e7122f0c3cc7a37e6244b16ad3a2cc32ae7ac6276e2a75da0d9c200d"
checksum = "5986aa8d62380092d2f50f8b1cdba9cb9b6731ffd4b25b51fd126b6c3e05b99c"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"memchr",
"parking_lot",
"protobuf",
"regex",
"thiserror",
]
@ -3957,27 +3973,15 @@ dependencies = [
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-opentelemetry"
version = "0.11.0"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccdf13c28f1654fe806838f28c5b9cb23ca4c0eae71450daa489f50e523ceb1"
checksum = "99003208b647dae59dcefc49c98aecaa3512fbc29351685d4b9ef23a9218458e"
dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-log",
"tracing-subscriber",
]
@ -4010,7 +4014,6 @@ dependencies = [
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]

View File

@ -22,6 +22,7 @@ members = [
"mem_qe",
"mutable_buffer",
"object_store",
"observability_deps",
"packers",
"panic_logging",
"query",
@ -29,7 +30,6 @@ members = [
"server",
"test_helpers",
"tracker",
"observability_deps",
"wal",
]
@ -53,13 +53,13 @@ logfmt = { path = "logfmt" }
mem_qe = { path = "mem_qe" }
mutable_buffer = { path = "mutable_buffer" }
object_store = { path = "object_store" }
observability_deps = { path = "observability_deps" }
packers = { path = "packers" }
panic_logging = { path = "panic_logging" }
query = { path = "query" }
read_buffer = { path = "read_buffer" }
server = { path = "server" }
tracker = { path = "tracker" }
observability_deps = { path = "observability_deps" }
wal = { path = "wal" }
# Crates.io dependencies, in alphabetical order

View File

@ -1,11 +1,11 @@
use observability_deps::tracing::{
use observability_deps::{
tracing::{
self,
field::{Field, Visit},
subscriber::Interest,
Id, Level, Subscriber,
};
use observability_deps::tracing_subscriber::{
fmt::MakeWriter, layer::Context, registry::LookupSpan, Layer,
},
tracing_subscriber::{fmt::MakeWriter, layer::Context, registry::LookupSpan, Layer},
};
use std::borrow::Cow;
use std::{io::Write, time::SystemTime};
@ -64,19 +64,6 @@ where
Interest::always()
}
fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
let writer = self.writer.make_writer();
let mut p = FieldPrinter::new(writer, event.metadata().level());
// record fields
event.record(&mut p);
if let Some(span) = ctx.lookup_current() {
p.write_span_id(&span.id())
}
// record source information
p.write_source_info(event);
p.write_timestamp();
}
fn new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
let writer = self.writer.make_writer();
let metadata = ctx.metadata(id).expect("span should have metadata");
@ -90,6 +77,19 @@ where
fn max_level_hint(&self) -> Option<tracing::metadata::LevelFilter> {
None
}
fn on_event(&self, event: &tracing::Event<'_>, ctx: Context<'_, S>) {
let writer = self.writer.make_writer();
let mut p = FieldPrinter::new(writer, event.metadata().level());
// record fields
event.record(&mut p);
if let Some(span) = ctx.lookup_current() {
p.write_span_id(&span.id())
}
// record source information
p.write_source_info(event);
p.write_timestamp();
}
}
/// This thing is responsible for actually printing log information to
@ -158,18 +158,6 @@ impl<W: Write> Drop for FieldPrinter<W> {
}
impl<W: Write> Visit for FieldPrinter<W> {
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
// Note this appears to be invoked via `debug!` and `info! macros
let formatted_value = format!("{:?}", value);
write!(
self.writer,
" {}={}",
translate_field_name(field.name()),
quote_and_escape(&formatted_value)
)
.ok();
}
fn record_i64(&mut self, field: &Field, value: i64) {
write!(
self.writer,
@ -231,6 +219,18 @@ impl<W: Write> Visit for FieldPrinter<W> {
)
.ok();
}
fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
// Note this appears to be invoked via `debug!` and `info! macros
let formatted_value = format!("{:?}", value);
write!(
self.writer,
" {}={}",
translate_field_name(field.name()),
quote_and_escape(&formatted_value)
)
.ok();
}
}
/// return true if the string value already starts/ends with quotes and is
@ -241,7 +241,7 @@ fn needs_quotes_and_escaping(value: &str) -> bool {
return true;
}
// ignore begining/ending quotes, if any
// ignore beginning/ending quotes, if any
let pre_quoted = value.len() >= 2 && value.starts_with('"') && value.ends_with('"');
let value = if pre_quoted {

View File

@ -6,12 +6,12 @@ edition = "2018"
description = "Observability ecosystem dependencies for InfluxDB IOx, to ensure consistent versions and unified updates"
[dependencies] # In alphabetical order
env_logger = "0.8.3"
opentelemetry = { version = "0.12", default-features = false, features = ["trace", "metrics", "tokio-support"] }
opentelemetry-jaeger = { version = "0.11", features = ["tokio"] }
opentelemetry-prometheus = "0.5.0"
prometheus = "0.11"
tracing = { version = "0.1", features = ["release_max_level_debug"] }
tracing-futures = "0.2.4"
tracing-opentelemetry = "0.11.0"
tracing-subscriber = { version = "0.2.15", features = ["parking_lot"] }
env_logger = "0.8"
opentelemetry = { version = "0.13", default-features = false, features = ["trace", "metrics", "rt-tokio"] }
opentelemetry-jaeger = { version = "0.12", features = ["tokio"] }
opentelemetry-otlp = "0.6"
opentelemetry-prometheus = "0.6"
prometheus = "0.12"
tracing = { version = "0.1", features = ["max_level_trace", "release_max_level_debug"] }
tracing-opentelemetry = { version = "0.12", default-features = false }
tracing-subscriber = { version = "0.2", default-features = false, features = ["env-filter", "smallvec", "chrono", "parking_lot", "registry", "fmt", "ansi", "json"] }

View File

@ -6,10 +6,10 @@
pub use env_logger;
pub use opentelemetry;
pub use opentelemetry_jaeger;
pub use opentelemetry_otlp;
pub use opentelemetry_prometheus;
pub use prometheus;
pub use tracing;
pub use tracing::instrument;
pub use tracing_futures;
pub use tracing_opentelemetry;
pub use tracing_subscriber;

View File

@ -1,141 +0,0 @@
//! Logging initization and setup
use observability_deps::{
env_logger, opentelemetry_jaeger, tracing_opentelemetry,
tracing_subscriber::{self, prelude::*, EnvFilter},
};
use super::run::{Config, LogFormat};
/// Handles setting up logging levels
#[derive(Debug)]
pub enum LoggingLevel {
// Default log level is warn level for all components
Default,
// Verbose log level is info level for all components
Verbose,
// Debug log level is debug for everything except
// some especially noisy low level libraries
Debug,
}
impl LoggingLevel {
/// Creates a logging level usig the following rules.
///
/// 1. if `-vv` (multiple instances of verbose), use Debug
/// 2. if `-v` (single instances of verbose), use Verbose
/// 3. Otherwise use Default
pub fn new(num_verbose: u64) -> Self {
match num_verbose {
0 => Self::Default,
1 => Self::Verbose,
_ => Self::Debug,
}
}
/// Return a LoggingLevel that represents the most verbose logging
/// of `self` and `other`
pub fn combine(self, other: Self) -> Self {
Self::new(std::cmp::max(self as u64, other as u64))
}
/// set RUST_LOG to the level represented by self, unless RUST_LOG
/// is already set
fn set_rust_log_if_needed(&self, level: Option<String>) {
/// Default debug level is debug for everything except
/// some especially noisy low level libraries
const DEFAULT_DEBUG_LOG_LEVEL: &str = "debug,hyper::proto::h1=info,h2=info";
// Default verbose log level is info level for all components
const DEFAULT_VERBOSE_LOG_LEVEL: &str = "info";
// Default log level is warn level for all components
const DEFAULT_LOG_LEVEL: &str = "info";
match level {
Some(lvl) => {
if !matches!(self, Self::Default) {
eprintln!(
"WARNING: Using RUST_LOG='{}' environment, ignoring -v command line",
lvl
);
} else {
std::env::set_var("RUST_LOG", lvl);
}
}
None => {
match self {
Self::Default => std::env::set_var("RUST_LOG", DEFAULT_LOG_LEVEL),
Self::Verbose => std::env::set_var("RUST_LOG", DEFAULT_VERBOSE_LOG_LEVEL),
Self::Debug => std::env::set_var("RUST_LOG", DEFAULT_DEBUG_LOG_LEVEL),
};
}
}
}
/// Configures basic logging for 'simple' command line tools. Note
/// this does not setup tracing or open telemetry
pub fn setup_basic_logging(&self) {
self.set_rust_log_if_needed(std::env::var("RUST_LOG").ok());
env_logger::init();
}
/// Configures logging and tracing, based on the configuration
/// values, for the IOx server (the whole enchalada)
pub fn setup_logging(&self, config: &Config) -> Option<opentelemetry_jaeger::Uninstall> {
// Copy anything from the config to the rust log environment
self.set_rust_log_if_needed(config.rust_log.clone());
// Configure the OpenTelemetry tracer, if requested.
let (opentelemetry, drop_handle) =
if std::env::var("OTEL_EXPORTER_JAEGER_AGENT_HOST").is_ok() {
// For now, configure open telemetry directly from the
// environment. Eventually it would be cool to document
// all of the open telemetry options in IOx and pass them
// explicitly to opentelemetry for additional visibility
let (tracer, drop_handle) = opentelemetry_jaeger::new_pipeline()
.with_service_name("iox")
.from_env()
.install()
.expect("failed to initialise the Jaeger tracing sink");
// Initialise the opentelemetry tracing layer, giving it the jaeger emitter
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
(Some(opentelemetry), Some(drop_handle))
} else {
(None, None)
};
// Register the chain of event subscribers:
//
// - Jaeger tracing emitter
// - Env filter (using RUST_LOG as the filter env)
// - A stderr logger
//
let subscriber = tracing_subscriber::registry()
.with(opentelemetry)
// filter messages to only those specified by RUST_LOG environment
.with(EnvFilter::from_default_env());
// Configure the logger to write to stderr and install it
let output_stream = std::io::stderr;
let log_format = config.log_format.as_ref().cloned().unwrap_or_default();
match log_format {
LogFormat::Rust => {
let logger = tracing_subscriber::fmt::layer().with_writer(output_stream);
subscriber.with(logger).init();
}
LogFormat::LogFmt => {
let logger = logfmt::LogFmtLayer::new(output_stream);
subscriber.with(logger).init();
}
};
drop_handle
}
}

View File

@ -45,18 +45,19 @@ pub static IOXD_METRICS: Lazy<SystemMetrics> = Lazy::new(SystemMetrics::new);
impl SystemMetrics {
fn new() -> Self {
let meter = opentelemetry::global::meter("iox");
Self {
lp_lines_errors: meter()
lp_lines_errors: meter
.u64_counter("ingest.lp.lines.errors")
.with_description("line protocol formatted lines which were rejected")
.init(),
lp_lines_success: meter()
lp_lines_success: meter
.u64_counter("ingest.lp.lines.success")
.with_description("line protocol formatted lines which were successfully loaded")
.init(),
lp_bytes_success: meter()
lp_bytes_success: meter
.u64_counter("ingest.lp.bytes.success")
.with_description("line protocol formatted bytes which were successfully loaded")
.init(),
@ -92,30 +93,6 @@ pub fn init_metrics_internal(exporter: opentelemetry_prometheus::PrometheusExpor
*guard = Some(exporter);
}
/// Returns a global [`Meter`] for reporting metrics. See
/// [`IOXD_METRICS`] for specific crate wide counters.
///
/// # Example
///
/// ```
/// let meter = crate::influxdb_ioxd::metrics::meter();
///
/// let counter = meter
/// .u64_counter("a.counter")
/// .with_description("Counts things")
/// .init();
/// let recorder = meter
/// .i64_value_recorder("a.value_recorder")
/// .with_description("Records values")
/// .init();
///
/// counter.add(100, &[KeyValue::new("key", "value")]);
/// recorder.record(100, &[KeyValue::new("key", "value")]);
/// ```
fn meter() -> opentelemetry::metrics::Meter {
opentelemetry::global::meter("iox")
}
/// Gets current metrics state, in UTF-8 encoded Prometheus Exposition Format.
/// https://prometheus.io/docs/instrumenting/exposition_formats/
///

View File

@ -1,8 +1,9 @@
//! Implementation of command line option for running server
use crate::commands::logging::LoggingLevel;
use crate::commands::tracing;
use crate::influxdb_ioxd;
use clap::arg_enum;
use core::num::NonZeroU16;
use std::num::NonZeroU32;
use std::{net::SocketAddr, net::ToSocketAddrs, path::PathBuf};
use structopt::StructOpt;
@ -42,30 +43,32 @@ Configuration is loaded from the following sources (highest precedence first):
- pre-configured default values"
)]
pub struct Config {
/// This controls the IOx server logging level, as described in
/// https://crates.io/crates/env_logger.
/// Logs: filter directive
///
/// Levels for different modules can be specified as well. For example
/// Configures log severity level filter, by target.
///
/// Simplest options: error, warn, info, debug, trace
///
/// Levels for different modules can be specified. For example
/// `debug,hyper::proto::h1=info` specifies debug logging for all modules
/// except for the `hyper::proto::h1' module which will only display info
/// level logging.
#[structopt(long = "--log", env = "RUST_LOG")]
pub rust_log: Option<String>,
///
/// Extended syntax provided by `tracing-subscriber` includes span/field
/// filters. See <https://docs.rs/tracing-subscriber/0.2.17/tracing_subscriber/filter/struct.EnvFilter.html> for more details.
#[structopt(long = "--log-filter", env = "LOG_FILTER", default_value = "warn")]
pub log_filter: String,
/// Log message format. Can be one of:
/// Logs: filter short-hand
///
/// "rust" (default)
/// "logfmt" (logfmt/Heroku style - https://brandur.org/logfmt)
#[structopt(long = "--log_format", env = "INFLUXDB_IOX_LOG_FORMAT")]
pub log_format: Option<LogFormat>,
/// This sets logging up with a pre-configured set of convenient log levels.
/// Convenient way to set log severity level filter.
/// Overrides `--log-filter`.
///
/// -v means 'info' log levels
/// -vv means 'verbose' log level (with the exception of some particularly
/// low level libraries)
/// -v 'info'
///
/// This option is ignored if --log / RUST_LOG are set
/// -vv 'debug,hyper::proto::h1=info,h2=info'
///
/// -vvv 'trace,hyper::proto::h1=info,h2=info'
#[structopt(
short = "-v",
long = "--verbose",
@ -73,7 +76,174 @@ pub struct Config {
takes_value = false,
parse(from_occurrences)
)]
pub verbose_count: u64,
pub log_verbose_count: u8,
#[rustfmt::skip]
/// Logs: message format
///
/// Can be one of:
///
/// full: human-readable, single line
///
/// Oct 24 12:55:47.815 ERROR shaving_yaks{yaks=3}: fmt::yak_shave: failed to shave yak yak=3 error=missing yak
/// Oct 24 12:55:47.815 TRACE shaving_yaks{yaks=3}: fmt::yak_shave: yaks_shaved=2
/// Oct 24 12:55:47.815 INFO fmt: yak shaving completed all_yaks_shaved=false
///
/// pretty: human-readable, multi line
///
/// Oct 24 12:57:29.387 fmt_pretty::yak_shave: failed to shave yak, yak: 3, error: missing yak
/// at examples/examples/fmt/yak_shave.rs:48 on main
/// in fmt_pretty::yak_shave::shaving_yaks with yaks: 3
///
/// Oct 24 12:57:29.387 fmt_pretty::yak_shave: yaks_shaved: 2
/// at examples/examples/fmt/yak_shave.rs:52 on main
/// in fmt_pretty::yak_shave::shaving_yaks with yaks: 3
///
/// Oct 24 12:57:29.387 fmt_pretty: yak shaving completed, all_yaks_shaved: false
/// at examples/examples/fmt-pretty.rs:19 on main
///
/// json: machine-parseable
///
/// {"timestamp":"Oct 24 13:00:00.875","level":"ERROR","fields":{"message":"failed to shave yak","yak":3,"error":"missing yak"},"target":"fmt_json::yak_shave","spans":[{"yaks":3,"name":"shaving_yaks"}]}
/// {"timestamp":"Oct 24 13:00:00.875","level":"TRACE","fields":{"yaks_shaved":2},"target":"fmt_json::yak_shave","spans":[{"yaks":3,"name":"shaving_yaks"}]}
/// {"timestamp":"Oct 24 13:00:00.875","level":"INFO","fields":{"message":"yak shaving completed","all_yaks_shaved":false},"target":"fmt_json"}
///
/// logfmt: human-readable and machine-parseable
///
/// level=info msg="This is an info message" target="logging" location="logfmt/tests/logging.rs:36" time=1612181556329599000
/// level=debug msg="This is a debug message" target="logging" location="logfmt/tests/logging.rs:37" time=1612181556329618000
/// level=trace msg="This is a trace message" target="logging" location="logfmt/tests/logging.rs:38" time=1612181556329634000
#[structopt(long = "--log-format", env = "LOG_FORMAT", default_value = "full", verbatim_doc_comment)]
pub log_format: tracing::LogFormat,
/// Logs: destination
///
/// Can be one of: stdout, stderr
//
// TODO(jacobmarble): consider adding file path, file rotation, syslog, ?
#[structopt(
long = "--log-destination",
env = "LOG_DESTINATION",
default_value = "stdout",
verbatim_doc_comment
)]
pub log_destination: tracing::LogDestination,
/// Tracing: exporter type
///
/// Can be one of: none, jaeger, otlp
///
/// When enabled, additional flags are considered (see flags related to OTLP
/// and Jaeger), and log output is disabled.
//
// TODO(jacobmarble): allow logs and traces simultaneously
#[structopt(
long = "--traces-exporter",
env = "TRACES_EXPORTER",
default_value = "none"
)]
pub traces_exporter: tracing::TracesExporter,
/// Tracing: filter directive
///
/// Configures traces severity level filter, by target.
///
/// Simplest options: error, warn, info, debug, trace
///
/// Levels for different modules can be specified. For example
/// `debug,hyper::proto::h1=info` specifies debug tracing for all modules
/// except for the `hyper::proto::h1` module which will only display info
/// level tracing.
///
/// Extended syntax provided by `tracing-subscriber` includes span/field
/// filters. See <https://docs.rs/tracing-subscriber/0.2.17/tracing_subscriber/filter/struct.EnvFilter.html> for more details.
///
/// No filter by default.
#[structopt(long = "--traces-filter", env = "TRACES_FILTER")]
pub traces_filter: Option<String>,
/// Tracing: sampler type
///
/// Can be one of:
/// always_on, always_off, traceidratio,
/// parentbased_always_on, parentbased_always_off, parentbased_traceidratio
///
/// These alternatives are described in detail at <https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/sdk-environment-variables.md#general-sdk-configuration>.
#[structopt(
long = "--traces-sampler",
env = "TRACES_SAMPLER",
default_value = "parentbased_traceidratio"
)]
pub traces_sampler: tracing::TracesSampler,
#[rustfmt::skip]
/// Tracing: sampler argument
///
/// Valid range: [0.0, 1.0].
///
/// Only used if `--traces-sampler` is set to
/// parentbased_traceidratio (default) or traceidratio.
///
/// With sample parentbased_traceidratio, the following rules apply:
/// - if parent is sampled, then all of its children are sampled
/// - else sample this portion of traces (0.5 = 50%)
///
/// More details about this sampling argument at <https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/sdk-environment-variables.md#general-sdk-configuration>.
#[structopt(
long = "--traces-sampler-arg",
env = "TRACES_SAMPLER_ARG",
default_value = "1.0",
verbatim_doc_comment
)]
pub traces_sampler_arg: f64,
/// Tracing: OTLP (eg OpenTelemetry collector) network hostname
///
/// Only used if `--traces-exporter` is "otlp".
///
/// Protocol is gRPC. HTTP is not supported.
#[structopt(
long = "--traces-exporter-otlp-host",
env = "TRACES_EXPORTER_OTLP_HOST",
default_value = "localhost"
)]
pub traces_exporter_otlp_host: String,
/// Tracing: OTLP (eg OpenTelemetry collector) network port
///
/// Only used if `--traces-exporter` is "otlp".
///
/// Protocol is gRPC. HTTP is not supported.
#[structopt(
long = "--traces-exporter-otlp-port",
env = "TRACES_EXPORTER_OTLP_PORT",
default_value = "4317"
)]
pub traces_exporter_otlp_port: NonZeroU16,
/// Tracing: Jaeger agent network hostname
///
/// Protocol is Thrift/Compact over UDP.
///
/// Only used if `--traces-exporter` is "jaeger".
#[structopt(
long = "--traces-exporter-jaeger-agent-host",
env = "TRACES_EXPORTER_JAEGER_AGENT_HOST",
default_value = "0.0.0.0"
)]
pub traces_exporter_jaeger_agent_host: String,
/// Tracing: Jaeger agent network port
///
/// Protocol is Thrift/Compact over UDP.
///
/// Only used if `--traces-exporter` is "jaeger".
#[structopt(
long = "--traces-exporter-jaeger-agent-port",
env = "TRACES_EXPORTER_JAEGER_AGENT_PORT",
default_value = "6831"
)]
pub traces_exporter_jaeger_agent_port: NonZeroU16,
/// The identifier for the server.
///
@ -109,7 +279,7 @@ pub struct Config {
/// The number of threads to use for the query worker pool.
///
/// IOx uses `--num-threads` threads for handling API requests and
/// will use a dedicated thread pool woth `--num-worker-threads`
/// will use a dedicated thread pool with `--num-worker-threads`
/// for running queries.
///
/// If not specified, defaults to the number of cores on the system
@ -214,29 +384,10 @@ Possible values (case insensitive):
/// environments.
#[structopt(long = "--azure-storage-access-key", env = "AZURE_STORAGE_ACCESS_KEY")]
pub azure_storage_access_key: Option<String>,
/// If set, Jaeger traces are emitted to this host
/// using the OpenTelemetry tracer.
///
/// NOTE: The OpenTelemetry agent CAN ONLY be
/// configured using environment variables. It CAN NOT be configured
/// using the command line at this time. Some useful variables:
///
/// * OTEL_SERVICE_NAME: emitter service name (iox by default)
/// * OTEL_EXPORTER_JAEGER_AGENT_HOST: hostname/address of the collector
/// * OTEL_EXPORTER_JAEGER_AGENT_PORT: listening port of the collector.
///
/// The entire list of variables can be found in
/// https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/sdk-environment-variables.md#jaeger-exporter
#[structopt(
long = "--oetl_exporter_jaeger_agent",
env = "OTEL_EXPORTER_JAEGER_AGENT_HOST"
)]
pub jaeger_host: Option<String>,
}
pub async fn command(logging_level: LoggingLevel, config: Config) -> Result<()> {
Ok(influxdb_ioxd::main(logging_level, config).await?)
pub async fn command(config: Config) -> Result<()> {
Ok(influxdb_ioxd::main(config).await?)
}
fn parse_socket_addr(s: &str) -> std::io::Result<SocketAddr> {
@ -260,48 +411,6 @@ arg_enum! {
}
}
/// How to format output logging messages
#[derive(Debug, Clone, Copy)]
pub enum LogFormat {
/// Default formatted logging
///
/// Example:
/// ```
/// level=warn msg="NO PERSISTENCE: using memory for object storage" target="influxdb_iox::influxdb_ioxd"
/// ```
Rust,
/// Use the (somwhat pretentiously named) Heroku / logfmt formatted output
/// format
///
/// Example:
/// ```
/// Jan 31 13:19:39.059 WARN influxdb_iox::influxdb_ioxd: NO PERSISTENCE: using memory for object storage
/// ```
LogFmt,
}
impl Default for LogFormat {
fn default() -> Self {
Self::Rust
}
}
impl std::str::FromStr for LogFormat {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"rust" => Ok(Self::Rust),
"logfmt" => Ok(Self::LogFmt),
_ => Err(format!(
"Invalid log format '{}'. Valid options: rust, logfmt",
s
)),
}
}
}
#[cfg(test)]
mod tests {

332
src/commands/tracing.rs Normal file
View File

@ -0,0 +1,332 @@
//! Log and trace initialization and setup
use observability_deps::{
opentelemetry,
opentelemetry::sdk::trace,
opentelemetry::sdk::Resource,
opentelemetry::KeyValue,
opentelemetry_jaeger, opentelemetry_otlp, tracing, tracing_opentelemetry,
tracing_subscriber::{self, fmt, layer::SubscriberExt, EnvFilter},
};
/// Start simple logger. Panics on error.
pub fn init_simple_logs(log_verbose_count: u8) -> TracingGuard {
let log_layer_filter = match log_verbose_count {
0 => EnvFilter::try_new("warn").unwrap(),
1 => EnvFilter::try_new("info").unwrap(),
2 => EnvFilter::try_new("debug,hyper::proto::h1=info,h2=info").unwrap(),
_ => EnvFilter::try_new("trace,hyper::proto::h1=info,h2=info").unwrap(),
};
let subscriber = tracing_subscriber::Registry::default()
.with(log_layer_filter)
.with(fmt::layer());
let tracing_guard = tracing::subscriber::set_default(subscriber);
TracingGuard(tracing_guard)
}
/// Start log or trace emitter. Panics on error.
pub fn init_logs_and_tracing(
log_verbose_count: u8,
config: &crate::commands::run::Config,
) -> TracingGuard {
// Handle the case if -v/-vv is specified both before and after the server
// command
let log_verbose_count = if log_verbose_count > config.log_verbose_count {
log_verbose_count
} else {
config.log_verbose_count
};
let (traces_layer_filter, traces_layer_otel) = match construct_opentelemetry_tracer(config) {
None => (None, None),
Some(tracer) => {
let traces_layer_otel = Some(tracing_opentelemetry::OpenTelemetryLayer::new(tracer));
match &config.traces_filter {
None => (None, traces_layer_otel),
Some(traces_filter) => (
Some(EnvFilter::try_new(traces_filter).unwrap()),
traces_layer_otel,
),
}
}
};
let (
log_layer_filter,
log_layer_format_full,
log_layer_format_pretty,
log_layer_format_json,
log_layer_format_logfmt,
) = {
match traces_layer_otel {
Some(_) => (None, None, None, None, None),
None => {
let log_writer = match config.log_destination {
LogDestination::Stdout => fmt::writer::BoxMakeWriter::new(std::io::stdout),
LogDestination::Stderr => fmt::writer::BoxMakeWriter::new(std::io::stderr),
};
let (log_format_full, log_format_pretty, log_format_json, log_format_logfmt) =
match config.log_format {
LogFormat::Full => {
(Some(fmt::layer().with_writer(log_writer)), None, None, None)
}
LogFormat::Pretty => (
None,
Some(fmt::layer().pretty().with_writer(log_writer)),
None,
None,
),
LogFormat::Json => (
None,
None,
Some(fmt::layer().json().with_writer(log_writer)),
None,
),
LogFormat::Logfmt => {
(None, None, None, Some(logfmt::LogFmtLayer::new(log_writer)))
}
};
let log_layer_filter = match log_verbose_count {
0 => EnvFilter::try_new(&config.log_filter).unwrap(),
1 => EnvFilter::try_new("info").unwrap(),
2 => EnvFilter::try_new("debug,hyper::proto::h1=info,h2=info").unwrap(),
_ => EnvFilter::try_new("trace,hyper::proto::h1=info,h2=info").unwrap(),
};
(
Some(log_layer_filter),
log_format_full,
log_format_pretty,
log_format_json,
log_format_logfmt,
)
}
}
};
let subscriber = tracing_subscriber::Registry::default()
.with(log_layer_format_logfmt)
.with(log_layer_format_json)
.with(log_layer_format_pretty)
.with(log_layer_format_full)
.with(log_layer_filter)
.with(traces_layer_otel)
.with(traces_layer_filter);
let tracing_guard = tracing::subscriber::set_default(subscriber);
TracingGuard(tracing_guard)
}
fn construct_opentelemetry_tracer(config: &crate::commands::run::Config) -> Option<trace::Tracer> {
let trace_config = {
let sampler = match config.traces_sampler {
TracesSampler::AlwaysOn => trace::Sampler::AlwaysOn,
TracesSampler::AlwaysOff => {
return None;
}
TracesSampler::TraceIdRatio => {
trace::Sampler::TraceIdRatioBased(config.traces_sampler_arg)
}
TracesSampler::ParentBasedAlwaysOn => {
trace::Sampler::ParentBased(Box::new(trace::Sampler::AlwaysOn))
}
TracesSampler::ParentBasedAlwaysOff => {
trace::Sampler::ParentBased(Box::new(trace::Sampler::AlwaysOff))
}
TracesSampler::ParentBasedTraceIdRatio => trace::Sampler::ParentBased(Box::new(
trace::Sampler::TraceIdRatioBased(config.traces_sampler_arg),
)),
};
let resource = Resource::new(vec![KeyValue::new("service.name", "influxdb-iox")]);
trace::Config::default()
.with_sampler(sampler)
.with_resource(resource)
};
match config.traces_exporter {
TracesExporter::Jaeger => {
let agent_endpoint = format!(
"{}:{}",
config.traces_exporter_jaeger_agent_host.trim(),
config.traces_exporter_jaeger_agent_port
);
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
Some(
opentelemetry_jaeger::new_pipeline()
.with_trace_config(trace_config)
.with_agent_endpoint(agent_endpoint)
.install_batch(opentelemetry::runtime::Tokio)
.unwrap(),
)
}
TracesExporter::Otlp => {
let jaeger_endpoint = format!(
"{}:{}",
config.traces_exporter_otlp_host.trim(),
config.traces_exporter_otlp_port
);
Some(
opentelemetry_otlp::new_pipeline()
.with_trace_config(trace_config)
.with_endpoint(jaeger_endpoint)
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_tonic()
.install_batch(opentelemetry::runtime::Tokio)
.unwrap(),
)
}
TracesExporter::None => None,
}
}
/// An RAII guard. On Drop, tracing and OpenTelemetry are flushed and shut down.
pub struct TracingGuard(tracing::subscriber::DefaultGuard);
impl Drop for TracingGuard {
fn drop(&mut self) {
opentelemetry::global::shutdown_tracer_provider();
}
}
#[derive(Debug, Clone, Copy)]
pub enum LogFormat {
Full,
Pretty,
Json,
Logfmt,
}
impl std::str::FromStr for LogFormat {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"full" => Ok(Self::Full),
"pretty" => Ok(Self::Pretty),
"json" => Ok(Self::Json),
"logfmt" => Ok(Self::Logfmt),
_ => Err(format!(
"Invalid log format '{}'. Valid options: full, pretty, json, logfmt",
s
)),
}
}
}
impl std::fmt::Display for LogFormat {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Full => write!(f, "full"),
Self::Pretty => write!(f, "pretty"),
Self::Json => write!(f, "json"),
Self::Logfmt => write!(f, "logfmt"),
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum LogDestination {
Stdout,
Stderr,
}
impl std::str::FromStr for LogDestination {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"stdout" => Ok(Self::Stdout),
"stderr" => Ok(Self::Stderr),
_ => Err(format!(
"Invalid log destination '{}'. Valid options: stdout, stderr",
s
)),
}
}
}
impl std::fmt::Display for LogDestination {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Stdout => write!(f, "stdout"),
Self::Stderr => write!(f, "stderr"),
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum TracesExporter {
None,
Jaeger,
Otlp,
}
impl std::str::FromStr for TracesExporter {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"none" => Ok(Self::None),
"jaeger" => Ok(Self::Jaeger),
"otlp" => Ok(Self::Otlp),
_ => Err(format!(
"Invalid traces exporter '{}'. Valid options: none, jaeger, otlp",
s
)),
}
}
}
impl std::fmt::Display for TracesExporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::None => write!(f, "none"),
Self::Jaeger => write!(f, "jaeger"),
Self::Otlp => write!(f, "otlp"),
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum TracesSampler {
AlwaysOn,
AlwaysOff,
TraceIdRatio,
ParentBasedAlwaysOn,
ParentBasedAlwaysOff,
ParentBasedTraceIdRatio,
}
impl std::str::FromStr for TracesSampler {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"always_on" => Ok(Self::AlwaysOn),
"always_off" => Ok(Self::AlwaysOff),
"traceidratio" => Ok(Self::TraceIdRatio),
"parentbased_always_on" => Ok(Self::ParentBasedAlwaysOn),
"parentbased_always_off" => Ok(Self::ParentBasedAlwaysOff),
"parentbased_traceidratio" => Ok(Self::ParentBasedTraceIdRatio),
_ => Err(format!("Invalid traces sampler '{}'. Valid options: always_on, always_off, traceidratio, parentbased_always_on, parentbased_always_off, parentbased_traceidratio", s)),
}
}
}
impl std::fmt::Display for TracesSampler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AlwaysOn => write!(f, "always_on"),
Self::AlwaysOff => write!(f, "always_off"),
Self::TraceIdRatio => write!(f, "traceidratio"),
Self::ParentBasedAlwaysOn => write!(f, "parentbased_always_on"),
Self::ParentBasedAlwaysOff => write!(f, "parentbased_always_off"),
Self::ParentBasedTraceIdRatio => write!(f, "parentbased_traceidratio"),
}
}
}

View File

@ -1,5 +1,4 @@
use crate::commands::{
logging::LoggingLevel,
metrics,
run::{Config, ObjectStore as ObjStoreOpt},
};
@ -99,16 +98,8 @@ async fn wait_for_signal() {
}
/// This is the entry point for the IOx server. `config` represents
/// command line arguments, if any
///
/// The logging_level passed in is the global setting (e.g. if -v or
/// -vv was passed in before 'server')
pub async fn main(logging_level: LoggingLevel, config: Config) -> Result<()> {
// Handle the case if -v/-vv is specified both before and after the server
// command
let logging_level = logging_level.combine(LoggingLevel::new(config.verbose_count));
let _drop_handle = logging_level.setup_logging(&config);
/// command line arguments, if any.
pub async fn main(config: Config) -> Result<()> {
metrics::init_metrics(&config);
// Install custom panic handler and forget about it.

View File

@ -10,12 +10,12 @@
use std::str::FromStr;
use dotenv::dotenv;
use observability_deps::tracing::{debug, warn};
use structopt::StructOpt;
use tokio::runtime::Runtime;
use commands::logging::LoggingLevel;
use commands::tracing::{init_logs_and_tracing, init_simple_logs};
use ingest::parquet::writer::CompressionLevel;
use observability_deps::tracing::{debug, warn};
use tikv_jemallocator::Jemalloc;
@ -23,7 +23,6 @@ mod commands {
pub mod convert;
pub mod database;
mod input;
pub mod logging;
pub mod meta;
pub mod metrics;
pub mod operations;
@ -31,6 +30,7 @@ mod commands {
pub mod server;
pub mod server_remote;
pub mod stats;
pub mod tracing;
pub mod writer;
}
@ -81,8 +81,24 @@ For example, a command such as the following shows all actions
"#
)]
struct Config {
#[structopt(short, long, parse(from_occurrences))]
verbose: u64,
/// Log filter short-hand.
///
/// Convenient way to set log severity level filter.
/// Overrides --log-filter / LOG_FILTER.
///
/// -v 'info'
///
/// -vv 'debug,hyper::proto::h1=info,h2=info'
///
/// -vvv 'trace,hyper::proto::h1=info,h2=info'
#[structopt(
short = "-v",
long = "--verbose",
multiple = true,
takes_value = false,
parse(from_occurrences)
)]
pub log_verbose_count: u8,
/// gRPC address of IOx server to connect to
#[structopt(
@ -140,24 +156,17 @@ fn main() -> Result<(), std::io::Error> {
let config = Config::from_args();
// Logging level is determined via:
// 1. If RUST_LOG environment variable is set, use that value
// 2. if `-vv` (multiple instances of verbose), use DEFAULT_DEBUG_LOG_LEVEL
// 2. if `-v` (single instances of verbose), use DEFAULT_VERBOSE_LOG_LEVEL
// 3. Otherwise use DEFAULT_LOG_LEVEL
let logging_level = LoggingLevel::new(config.verbose);
let tokio_runtime = get_runtime(config.num_threads)?;
tokio_runtime.block_on(async move {
let host = config.host;
let log_verbose_count = config.log_verbose_count;
match config.command {
Command::Convert {
input,
output,
compression_level,
} => {
logging_level.setup_basic_logging();
let _tracing_guard = init_simple_logs(log_verbose_count);
let compression_level = CompressionLevel::from_str(&compression_level).unwrap();
match commands::convert::convert(&input, &output, compression_level) {
Ok(()) => debug!("Conversion completed successfully"),
@ -168,7 +177,7 @@ fn main() -> Result<(), std::io::Error> {
}
}
Command::Meta { input } => {
logging_level.setup_basic_logging();
let _tracing_guard = init_simple_logs(log_verbose_count);
match commands::meta::dump_meta(&input) {
Ok(()) => debug!("Metadata dump completed successfully"),
Err(e) => {
@ -178,7 +187,7 @@ fn main() -> Result<(), std::io::Error> {
}
}
Command::Stats(config) => {
logging_level.setup_basic_logging();
let _tracing_guard = init_simple_logs(log_verbose_count);
match commands::stats::stats(&config).await {
Ok(()) => debug!("Storage statistics dump completed successfully"),
Err(e) => {
@ -188,37 +197,36 @@ fn main() -> Result<(), std::io::Error> {
}
}
Command::Database(config) => {
logging_level.setup_basic_logging();
let _tracing_guard = init_simple_logs(log_verbose_count);
if let Err(e) = commands::database::command(host, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Writer(config) => {
logging_level.setup_basic_logging();
let _tracing_guard = init_simple_logs(log_verbose_count);
if let Err(e) = commands::writer::command(host, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Operation(config) => {
logging_level.setup_basic_logging();
let _tracing_guard = init_simple_logs(log_verbose_count);
if let Err(e) = commands::operations::command(host, config).await {
eprintln!("{}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Server(config) => {
logging_level.setup_basic_logging();
let _tracing_guard = init_simple_logs(log_verbose_count);
if let Err(e) = commands::server::command(host, config).await {
eprintln!("Server command failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}
}
Command::Run(config) => {
// Note don't set up basic logging here, different logging rules apply in server
// mode
if let Err(e) = commands::run::command(logging_level, *config).await {
let _tracing_guard = init_logs_and_tracing(log_verbose_count, &config);
if let Err(e) = commands::run::command(*config).await {
eprintln!("Server command failed: {}", e);
std::process::exit(ReturnCode::Failure as _)
}

View File

@ -91,11 +91,11 @@ fn convert_line_protocol_good_input_filename() {
assert
.success()
.stderr(predicate::str::contains("convert starting"))
.stderr(predicate::str::contains(
.stdout(predicate::str::contains("convert starting"))
.stdout(predicate::str::contains(
"Writing output for measurement h2o_temperature",
))
.stderr(predicate::str::contains(expected_success_string));
.stdout(predicate::str::contains(expected_success_string));
validate_parquet_file(&parquet_path);
}
@ -179,15 +179,15 @@ fn convert_multiple_measurements() {
assert
.success()
.stderr(predicate::str::contains("convert starting"))
.stderr(predicate::str::contains("Writing to output directory"))
.stderr(predicate::str::contains(
.stdout(predicate::str::contains("convert starting"))
.stdout(predicate::str::contains("Writing to output directory"))
.stdout(predicate::str::contains(
"Writing output for measurement h2o_temperature",
))
.stderr(predicate::str::contains(
.stdout(predicate::str::contains(
"Writing output for measurement air_temperature",
))
.stderr(predicate::str::contains(expected_success_string));
.stdout(predicate::str::contains(expected_success_string));
// check that the two files have been written successfully
let mut output_files: Vec<_> = fs::read_dir(parquet_output_path.path())