chore: remove orphaned code from trogging (#2371)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-08-24 11:12:59 +01:00 committed by GitHub
parent a6c9cc2bf2
commit 3fdc0e9a6a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 74 additions and 966 deletions

View File

@ -214,7 +214,7 @@ jobs:
command: cargo test --workspace --benches --no-run
- run:
name: Build with object store + exporter support + HEAP profiling
command: cargo build --no-default-features --features="aws,gcp,azure,jaeger,otlp,heappy,pprof"
command: cargo build --no-default-features --features="aws,gcp,azure,jaeger,heappy,pprof"
- cache_save
# Lint protobufs.
@ -273,10 +273,10 @@ jobs:
- cache_restore
- run:
name: Print rustc target CPU options
command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger,otlp,heappy" --bin print_cpu
command: cargo run --release --no-default-features --features="aws,gcp,azure,jaeger,heappy" --bin print_cpu
- run:
name: Cargo release build with target arch set for CRoaring
command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger,otlp,heappy"
command: cargo build --release --no-default-features --features="aws,gcp,azure,jaeger,heappy"
- run: |
echo sha256sum after build is
sha256sum target/release/influxdb_iox

216
Cargo.lock generated
View File

@ -160,11 +160,11 @@ dependencies = [
"base64 0.13.0",
"bytes",
"proc-macro2",
"prost 0.8.0",
"prost-derive 0.8.0",
"prost",
"prost-derive",
"tokio",
"tonic 0.5.2",
"tonic-build 0.5.2",
"tonic",
"tonic-build",
]
[[package]]
@ -1327,15 +1327,15 @@ dependencies = [
"num_cpus",
"observability_deps",
"proc-macro2",
"prost 0.8.0",
"prost-build 0.8.0",
"prost-types 0.8.0",
"prost",
"prost-build",
"prost-types",
"regex",
"serde",
"serde_json",
"thiserror",
"tonic 0.5.2",
"tonic-build 0.5.2",
"tonic",
"tonic-build",
]
[[package]]
@ -1397,8 +1397,8 @@ version = "0.1.0"
dependencies = [
"bytes",
"chrono",
"prost 0.8.0",
"prost-build 0.8.0",
"prost",
"prost-build",
"serde",
]
@ -1412,15 +1412,15 @@ dependencies = [
"grpc-router-test-gen",
"observability_deps",
"paste",
"prost 0.8.0",
"prost-build 0.8.0",
"prost-types 0.8.0",
"prost",
"prost-build",
"prost-types",
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.5.2",
"tonic-build 0.5.2",
"tonic",
"tonic-build",
"tonic-reflection",
]
@ -1428,11 +1428,11 @@ dependencies = [
name = "grpc-router-test-gen"
version = "0.1.0"
dependencies = [
"prost 0.8.0",
"prost-build 0.8.0",
"prost-types 0.8.0",
"tonic 0.5.2",
"tonic-build 0.5.2",
"prost",
"prost-build",
"prost-types",
"tonic",
"tonic-build",
]
[[package]]
@ -1493,7 +1493,7 @@ dependencies = [
"lazy_static",
"libc",
"pprof",
"prost 0.8.0",
"prost",
"spin 0.9.2",
"thiserror",
"tikv-jemalloc-sys",
@ -1748,7 +1748,7 @@ dependencies = [
"pprof",
"predicates",
"prettytable-rs",
"prost 0.8.0",
"prost",
"query",
"rand 0.8.4",
"rdkafka",
@ -1771,7 +1771,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.5.2",
"tonic",
"tonic-health",
"tonic-reflection",
"trace",
@ -1793,13 +1793,13 @@ dependencies = [
"generated_types",
"http",
"hyper",
"prost 0.8.0",
"prost",
"rand 0.8.4",
"serde",
"serde_json",
"thiserror",
"tokio",
"tonic 0.5.2",
"tonic",
"tower",
]
@ -2613,8 +2613,6 @@ dependencies = [
"pin-project 1.0.8",
"rand 0.8.4",
"thiserror",
"tokio",
"tokio-stream",
]
[[package]]
@ -2628,24 +2626,6 @@ dependencies = [
"opentelemetry",
"thiserror",
"thrift",
"tokio",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42168ec1ee8fe85f36600c41196963eb075ccdd0aaac947119f1f562c0804dd"
dependencies = [
"async-trait",
"futures",
"http",
"opentelemetry",
"prost 0.7.0",
"thiserror",
"tokio",
"tonic 0.4.3",
"tonic-build 0.4.2",
]
[[package]]
@ -2802,7 +2782,7 @@ dependencies = [
"parquet",
"parquet-format",
"persistence_windows",
"prost 0.8.0",
"prost",
"query",
"snafu",
"tempfile",
@ -3014,9 +2994,9 @@ dependencies = [
"log",
"nix",
"parking_lot",
"prost 0.8.0",
"prost-build 0.8.0",
"prost-derive 0.8.0",
"prost",
"prost-build",
"prost-derive",
"symbolic-demangle",
"tempfile",
"thiserror",
@ -3142,16 +3122,6 @@ dependencies = [
"thiserror",
]
[[package]]
name = "prost"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e6984d2f1a23009bd270b8bb56d0926810a3d483f59c987d77969e9d8e840b2"
dependencies = [
"bytes",
"prost-derive 0.7.0",
]
[[package]]
name = "prost"
version = "0.8.0"
@ -3159,25 +3129,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de5e2533f59d08fcf364fd374ebda0692a70bd6d7e66ef97f306f45c6c5d8020"
dependencies = [
"bytes",
"prost-derive 0.8.0",
]
[[package]]
name = "prost-build"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32d3ebd75ac2679c2af3a92246639f9fcc8a442ee420719cc4fe195b98dd5fa3"
dependencies = [
"bytes",
"heck",
"itertools 0.9.0",
"log",
"multimap",
"petgraph",
"prost 0.7.0",
"prost-types 0.7.0",
"tempfile",
"which 4.2.2",
"prost-derive",
]
[[package]]
@ -3192,25 +3144,12 @@ dependencies = [
"log",
"multimap",
"petgraph",
"prost 0.8.0",
"prost-types 0.8.0",
"prost",
"prost-types",
"tempfile",
"which 4.2.2",
]
[[package]]
name = "prost-derive"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "169a15f3008ecb5160cba7d37bcd690a7601b6d30cfb87a117d45e59d52af5d4"
dependencies = [
"anyhow",
"itertools 0.9.0",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-derive"
version = "0.8.0"
@ -3224,16 +3163,6 @@ dependencies = [
"syn",
]
[[package]]
name = "prost-types"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b518d7cdd93dab1d1122cf07fa9a60771836c668dde9d9e2a139f957f0d9f1bb"
dependencies = [
"bytes",
"prost 0.7.0",
]
[[package]]
name = "prost-types"
version = "0.8.0"
@ -3241,7 +3170,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "603bbd6394701d13f3f25aada59c7de9d35a6a5887cfc156181234a44002771b"
dependencies = [
"bytes",
"prost 0.8.0",
"prost",
]
[[package]]
@ -4704,35 +4633,6 @@ dependencies = [
"serde",
]
[[package]]
name = "tonic"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ac42cd97ac6bd2339af5bcabf105540e21e45636ec6fa6aae5e85d44db31be0"
dependencies = [
"async-stream",
"async-trait",
"base64 0.13.0",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"percent-encoding",
"pin-project 1.0.8",
"prost 0.7.0",
"prost-derive 0.7.0",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic"
version = "0.5.2"
@ -4752,8 +4652,8 @@ dependencies = [
"hyper-timeout",
"percent-encoding",
"pin-project 1.0.8",
"prost 0.8.0",
"prost-derive 0.8.0",
"prost",
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util",
@ -4764,18 +4664,6 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "tonic-build"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c695de27302f4697191dda1c7178131a8cb805463dda02864acb80fe1322fdcf"
dependencies = [
"proc-macro2",
"prost-build 0.7.0",
"quote",
"syn",
]
[[package]]
name = "tonic-build"
version = "0.5.2"
@ -4783,7 +4671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12b52d07035516c2b74337d2ac7746075e7dcae7643816c1b12c5ff8a7484c08"
dependencies = [
"proc-macro2",
"prost-build 0.8.0",
"prost-build",
"quote",
"syn",
]
@ -4796,11 +4684,11 @@ checksum = "493fcae35818dffa28437b210a615119d791116c1cac80716f571f35dd55b1b9"
dependencies = [
"async-stream",
"bytes",
"prost 0.8.0",
"prost",
"tokio",
"tokio-stream",
"tonic 0.5.2",
"tonic-build 0.5.2",
"tonic",
"tonic-build",
]
[[package]]
@ -4810,12 +4698,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8249761575cfef2635c3df0aa9f09d8e42d53ff0f7c21739ff2b37dd3343d827"
dependencies = [
"bytes",
"prost 0.8.0",
"prost-types 0.8.0",
"prost",
"prost-types",
"tokio",
"tokio-stream",
"tonic 0.5.2",
"tonic-build 0.5.2",
"tonic",
"tonic-build",
]
[[package]]
@ -4940,18 +4828,6 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-opentelemetry"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c47440f2979c4cd3138922840eec122e3c0ba2148bc290f756bd7fd60fc97fff"
dependencies = [
"opentelemetry",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "tracing-serde"
version = "0.1.2"
@ -5011,15 +4887,11 @@ version = "0.1.0"
dependencies = [
"logfmt",
"observability_deps",
"opentelemetry",
"opentelemetry-jaeger",
"opentelemetry-otlp",
"regex",
"structopt",
"synchronized-writer",
"thiserror",
"tracing-log",
"tracing-opentelemetry",
"tracing-subscriber",
]

View File

@ -156,8 +156,7 @@ default = [ "jemalloc_replacing_malloc" ]
azure = ["object_store/azure"] # Optional Azure Object store support
gcp = ["object_store/gcp"] # Optional GCP object store support
aws = ["object_store/aws"] # Optional AWS / S3 object store support
jaeger = ["trogging/jaeger", "trace_exporters/jaeger"] # Enable optional jaeger tracing support
otlp = ["trogging/otlp"] # Enable optional open telemetry collector
jaeger = ["trace_exporters/jaeger"] # Enable optional jaeger tracing support
# pprof is an optional feature for pprof support
# heappy is an optional feature; Not on by default as it

View File

@ -13,7 +13,7 @@ RUN \
--mount=type=cache,id=influxdb_iox_git,sharing=locked,target=/usr/local/cargo/git \
--mount=type=cache,id=influxdb_iox_target,sharing=locked,target=/influxdb_iox/target \
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target && \
cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,jaeger,otlp,pprof && \
cargo build --target-dir /influxdb_iox/target --release --features azure,gcp,aws,jaeger,pprof && \
cp /influxdb_iox/target/release/influxdb_iox /root/influxdb_iox && \
du -cshx /usr/local/cargo/registry /usr/local/cargo/git /influxdb_iox/target

View File

@ -3,10 +3,10 @@
use std::cmp::max;
use trogging::cli::LoggingConfigBuilderExt;
pub use trogging::config::*;
pub use trogging::TracingGuard;
pub use trogging::TroggingGuard;
/// Start simple logger. Panics on error.
pub fn init_simple_logs(log_verbose_count: u8) -> Result<TracingGuard, trogging::Error> {
pub fn init_simple_logs(log_verbose_count: u8) -> Result<TroggingGuard, trogging::Error> {
trogging::Builder::new()
.with_log_verbose_count(log_verbose_count)
.install_global()
@ -16,7 +16,7 @@ pub fn init_simple_logs(log_verbose_count: u8) -> Result<TracingGuard, trogging:
pub fn init_logs_and_tracing(
log_verbose_count: u8,
config: &crate::commands::run::Config,
) -> Result<TracingGuard, trogging::Error> {
) -> Result<TroggingGuard, trogging::Error> {
let mut logging_config = config.logging_config.clone();
// Handle the case if -v/-vv is specified both before and after the server

View File

@ -16,7 +16,7 @@ use tokio::runtime::Runtime;
use commands::tracing::{init_logs_and_tracing, init_simple_logs};
use observability_deps::tracing::warn;
use crate::commands::tracing::TracingGuard;
use crate::commands::tracing::TroggingGuard;
use influxdb_iox_client::connection::Builder;
use std::str::FromStr;
@ -164,7 +164,7 @@ fn main() -> Result<(), std::io::Error> {
}
};
fn handle_init_logs(r: Result<TracingGuard, trogging::Error>) -> TracingGuard {
fn handle_init_logs(r: Result<TroggingGuard, trogging::Error>) -> TroggingGuard {
match r {
Ok(guard) => guard,
Err(e) => {

View File

@ -3,26 +3,18 @@ name = "trogging"
version = "0.1.0"
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
edition = "2018"
description = "(TR)acing and l(OGGING) configuration"
description = "IOx logging pipeline built upon tokio-tracing"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
logfmt = { path = "../logfmt" }
observability_deps = { path = "../observability_deps" }
opentelemetry = "0.15"
opentelemetry-jaeger = { version = "0.14", features = ["tokio"], optional = true }
opentelemetry-otlp = { version = "0.8", optional = true }
thiserror = "1.0.23"
tracing-log = "0.1"
tracing-subscriber = "0.2"
tracing-opentelemetry = { version = "0.14", default-features = false }
structopt = { version = "0.3.21", optional = true }
[dev-dependencies]
synchronized-writer = "1"
regex = "1"
[features]
jaeger = ["opentelemetry-jaeger", "opentelemetry/rt-tokio"]
otlp = ["opentelemetry-otlp", "opentelemetry/rt-tokio"]

View File

@ -1,6 +1,5 @@
///! Common CLI flags for logging and tracing
use crate::{config::*, Builder, Result, TracingGuard};
use std::num::NonZeroU16;
use crate::{config::*, Builder, Result, TroggingGuard};
use structopt::StructOpt;
use tracing_subscriber::fmt::{writer::BoxMakeWriter, MakeWriter};
@ -117,7 +116,7 @@ impl LoggingConfig {
.with_log_format(self.log_format)
}
pub fn install_global_subscriber(&self) -> Result<TracingGuard> {
pub fn install_global_subscriber(&self) -> Result<TroggingGuard> {
self.to_builder().install_global()
}
}
@ -137,207 +136,12 @@ where
}
}
/// CLI config for the logging+tracing related subset of options.
#[derive(Debug, StructOpt, Clone)]
pub struct TracingConfig {
/// Tracing: exporter type
///
/// Can be one of: none, jaeger, otlp
///
/// When enabled, additional flags are considered (see flags related to OTLP
/// and Jaeger).
#[structopt(
long = "--traces-exporter",
env = "TRACES_EXPORTER",
default_value = "none"
)]
pub traces_exporter: TracesExporter,
/// Tracing: filter directive
///
/// Configures traces severity level filter, by target.
///
/// Simplest options: error, warn, info, debug, trace
///
/// Levels for different modules can be specified. For example
/// `debug,hyper::proto::h1=info` specifies debug tracing for all modules
/// except for the `hyper::proto::h1` module which will only display info
/// level tracing.
///
/// Extended syntax provided by `tracing-subscriber` includes span/field
/// filters. See <https://docs.rs/tracing-subscriber/0.2.17/tracing_subscriber/filter/struct.EnvFilter.html> for more details.
///
/// No filter by default.
#[structopt(long = "--traces-filter", env = "TRACES_FILTER")]
pub traces_filter: Option<String>,
/// Tracing: sampler type
///
/// Can be one of:
/// always_on, always_off, traceidratio,
/// parentbased_always_on, parentbased_always_off, parentbased_traceidratio
///
/// These alternatives are described in detail at <https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/sdk-environment-variables.md#general-sdk-configuration>.
#[structopt(
long = "--traces-sampler",
env = "TRACES_SAMPLER",
default_value = "parentbased_traceidratio"
)]
pub traces_sampler: TracesSampler,
#[rustfmt::skip]
/// Tracing: sampler argument
///
/// Valid range: [0.0, 1.0].
///
/// Only used if `--traces-sampler` is set to
/// parentbased_traceidratio (default) or traceidratio.
///
/// With sample parentbased_traceidratio, the following rules apply:
/// - if parent is sampled, then all of its children are sampled
/// - else sample this portion of traces (0.5 = 50%)
///
/// More details about this sampling argument at <https://github.com/open-telemetry/opentelemetry-specification/blob/v1.1.0/specification/sdk-environment-variables.md#general-sdk-configuration>.
#[structopt(
long = "--traces-sampler-arg",
env = "TRACES_SAMPLER_ARG",
default_value = "1.0",
verbatim_doc_comment
)]
pub traces_sampler_arg: f64,
/// Tracing: OTLP (eg OpenTelemetry collector) network hostname
///
/// Only used if `--traces-exporter` is "otlp".
///
/// Protocol is gRPC. HTTP is not supported.
#[structopt(
long = "--traces-exporter-otlp-host",
env = "TRACES_EXPORTER_OTLP_HOST",
default_value = "localhost"
)]
pub traces_exporter_otlp_host: String,
/// Tracing: OTLP (eg OpenTelemetry collector) network port
///
/// Only used if `--traces-exporter` is "otlp".
///
/// Protocol is gRPC. HTTP is not supported.
#[structopt(
long = "--traces-exporter-otlp-port",
env = "TRACES_EXPORTER_OTLP_PORT",
default_value = "4317"
)]
pub traces_exporter_otlp_port: NonZeroU16,
/// Tracing: Jaeger agent network hostname
///
/// Protocol is Thrift/Compact over UDP.
///
/// Only used if `--traces-exporter` is "jaeger".
#[structopt(
long = "--traces-exporter-jaeger-agent-host",
env = "TRACES_EXPORTER_JAEGER_AGENT_HOST",
default_value = "0.0.0.0"
)]
pub traces_exporter_jaeger_agent_host: String,
/// Tracing: Jaeger agent network port
///
/// Protocol is Thrift/Compact over UDP.
///
/// Only used if `--traces-exporter` is "jaeger".
#[structopt(
long = "--traces-exporter-jaeger-agent-port",
env = "TRACES_EXPORTER_JAEGER_AGENT_PORT",
default_value = "6831"
)]
pub traces_exporter_jaeger_agent_port: NonZeroU16,
/// Tracing: Jaeger service name.
///
/// Only used if `--traces-exporter` is "jaeger".
#[structopt(
long = "--traces-exporter-jaeger-service-name",
env = "TRACES_EXPORTER_JAEGER_SERVICE_NAME",
default_value = "iox-conductor"
)]
pub traces_exporter_jaeger_service_name: String,
/// Tracing: Jaeger max UDP packet size
///
/// Default to 1300, which is a safe MTU.
///
/// You can increase it to 65000 if the target is a jaeger collector
/// on localhost. If so, the batching exporter will be enabled for
/// extra efficiency. Otherwise an UDP packet will be sent for each exported span.
///
/// Only used if `--traces-exporter` is "jaeger".
#[structopt(
long = "--traces-exporter-jaeger-max-packet-size",
env = "TRACES_EXPORTER_JAEGER_MAX_PACKET_SIZE",
default_value = "1300"
)]
pub traces_exporter_jaeger_max_packet_size: usize,
}
impl TracingConfig {
pub fn to_builder(&self) -> Builder {
self.with_builder(Builder::new())
}
pub fn with_builder<W>(&self, builder: Builder<W>) -> Builder<W>
where
W: MakeWriter + Send + Sync + Clone + 'static,
{
builder
.with_traces_filter(&self.traces_filter)
.with_traces_exporter(self.traces_exporter)
.with_traces_sampler(self.traces_sampler, self.traces_sampler_arg)
.with_jaeger_config(JaegerConfig {
agent_host: self.traces_exporter_jaeger_agent_host.clone(),
agent_port: self.traces_exporter_jaeger_agent_port,
service_name: self.traces_exporter_jaeger_service_name.clone(),
max_packet_size: self.traces_exporter_jaeger_max_packet_size,
})
.with_oltp_config(OtlpConfig {
host: self.traces_exporter_otlp_host.clone(),
port: self.traces_exporter_otlp_port,
})
}
pub fn install_global_subscriber(&self) -> Result<TracingGuard> {
self.to_builder().install_global()
}
}
/// Extends the trogging [`crate::Builder`] API.
pub trait TracingConfigBuilderExt<W> {
/// Applies all config entries from a [`TracingConfig`] to a [`crate::Builder`].
fn with_tracing_config(self, config: &TracingConfig) -> Builder<W>;
}
impl<W> TracingConfigBuilderExt<W> for Builder<W>
where
W: MakeWriter + Send + Sync + Clone + 'static,
{
fn with_tracing_config(self, config: &TracingConfig) -> Self {
config.with_builder(self)
}
}
impl From<LoggingConfig> for Builder<BoxMakeWriter> {
fn from(config: LoggingConfig) -> Self {
config.to_builder()
}
}
impl From<TracingConfig> for Builder {
fn from(config: TracingConfig) -> Self {
config.to_builder()
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -1,5 +1,3 @@
use std::num::NonZeroU16;
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum LogFormat {
Full,
@ -65,89 +63,3 @@ impl std::fmt::Display for LogDestination {
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum TracesExporter {
None,
Jaeger,
Otlp,
}
impl std::str::FromStr for TracesExporter {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"none" => Ok(Self::None),
"jaeger" => Ok(Self::Jaeger),
"otlp" => Ok(Self::Otlp),
_ => Err(format!(
"Invalid traces exporter '{}'. Valid options: none, jaeger, otlp",
s
)),
}
}
}
impl std::fmt::Display for TracesExporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::None => write!(f, "none"),
Self::Jaeger => write!(f, "jaeger"),
Self::Otlp => write!(f, "otlp"),
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum TracesSampler {
AlwaysOn,
AlwaysOff,
TraceIdRatio,
ParentBasedAlwaysOn,
ParentBasedAlwaysOff,
ParentBasedTraceIdRatio,
}
impl std::str::FromStr for TracesSampler {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"always_on" => Ok(Self::AlwaysOn),
"always_off" => Ok(Self::AlwaysOff),
"traceidratio" => Ok(Self::TraceIdRatio),
"parentbased_always_on" => Ok(Self::ParentBasedAlwaysOn),
"parentbased_always_off" => Ok(Self::ParentBasedAlwaysOff),
"parentbased_traceidratio" => Ok(Self::ParentBasedTraceIdRatio),
_ => Err(format!("Invalid traces sampler '{}'. Valid options: always_on, always_off, traceidratio, parentbased_always_on, parentbased_always_off, parentbased_traceidratio", s)),
}
}
}
impl std::fmt::Display for TracesSampler {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AlwaysOn => write!(f, "always_on"),
Self::AlwaysOff => write!(f, "always_off"),
Self::TraceIdRatio => write!(f, "traceidratio"),
Self::ParentBasedAlwaysOn => write!(f, "parentbased_always_on"),
Self::ParentBasedAlwaysOff => write!(f, "parentbased_always_off"),
Self::ParentBasedTraceIdRatio => write!(f, "parentbased_traceidratio"),
}
}
}
#[derive(Debug)]
pub struct JaegerConfig {
pub agent_host: String,
pub agent_port: NonZeroU16,
pub service_name: String,
pub max_packet_size: usize,
}
#[derive(Debug)]
pub struct OtlpConfig {
pub host: String,
pub port: NonZeroU16,
}

View File

@ -1,270 +0,0 @@
use observability_deps::tracing::Metadata;
use observability_deps::tracing::{
event::Event,
span::{Attributes, Id, Record},
subscriber::Subscriber,
};
use std::fmt::Formatter;
use std::marker::PhantomData;
use std::sync::Arc;
use tracing_subscriber::layer::{Context, Layer};
use tracing_subscriber::EnvFilter;
/// A FilteredLayer wraps a tracing subscriber Layer and passes events only
/// if the provided EnvFilter accepts the event.
///
/// Tracing subscriber's layering makes it easy to split event filtering from event recording
/// and collection, but the filtering decisions apply to all layers, effectively producing
/// an intersection of all filtering decisions.
///
/// A FilteredLayer on the other hand, allows to restrict the verbosity of one event sink
/// without throwing away events available to other layers.
#[derive(Debug)]
pub struct FilteredLayer<S, L>
where
S: Subscriber,
L: Layer<S>,
{
inner: L,
_phantom_data: PhantomData<S>,
}
impl<S, L> FilteredLayer<S, L>
where
S: Subscriber,
L: Layer<S>,
{
pub fn new(inner: L) -> Self {
Self {
inner,
_phantom_data: Default::default(),
}
}
}
impl<S, L> Layer<S> for FilteredLayer<S, L>
where
S: Subscriber,
L: Layer<S>,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
if self.inner.enabled(event.metadata(), ctx.clone()) {
self.inner.on_event(event, ctx);
}
}
fn new_span(&self, attrs: &Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
self.inner.new_span(attrs, id, ctx)
}
fn on_record(&self, span: &Id, values: &Record<'_>, ctx: Context<'_, S>) {
self.inner.on_record(span, values, ctx)
}
fn on_follows_from(&self, span: &Id, follows: &Id, ctx: Context<'_, S>) {
self.inner.on_follows_from(span, follows, ctx)
}
fn on_enter(&self, id: &Id, ctx: Context<'_, S>) {
self.inner.on_enter(id, ctx)
}
fn on_exit(&self, id: &Id, ctx: Context<'_, S>) {
self.inner.on_exit(id, ctx)
}
fn on_close(&self, id: Id, ctx: Context<'_, S>) {
self.inner.on_close(id, ctx)
}
fn on_id_change(&self, old: &Id, new: &Id, ctx: Context<'_, S>) {
self.inner.on_id_change(old, new, ctx)
}
}
/// A union filter is a filtering Layer that returns "enabled: true"
/// for events for which at least one of the inner filters returns "enabled: true".
///
/// This is the opposite of what the main subscriber's layered chain does, where
/// if one filter says nope, the event is filtered out.
///
/// Since there is a blanked `impl<L: Layer> Layer for Option<L>` that returns
/// "enabled: true" when the option is None, it would be very confusing if an
/// user accidentally passed such a none to the filter vector in the union filter.
/// However it's quite tempting to pass an optional layer around hoping it "does the right thing",
/// since it does indeed work in other places a layer is passed to a subscriber builder
/// (moreover, due to the static typing builder pattern used there, there is no way of
/// conditionally adding a filter other than passing an optional filter). Thus, it would be
/// (and it happened in practice) quite confusing for this API to technically accept optional
/// layers and silently do the wrong thing with them.
/// For this reason the [`UnionFilter::new`] method accepts a vector of [`Option`][option]s.
/// It's not perfect, since a user could pass an `Option<Option<L>>` but hopefully
///
/// This filter is intended to be used together with the [`FilteredLayer`] layer
/// which will filter unwanted events for each of the.
///
/// This [`UnionFilter`] and the [`FilteredLayer`] are likely to share filters.
/// Unfortunately the [`EnvFilter`][envfilter] doesn't implement [`Clone`].
/// See [`CloneableEnvFilter`] for a workaround.
///
/// [envfilter]: tracing_subscriber::EnvFilter
/// [option]: std::option::Option
pub struct UnionFilter<S>
where
S: Subscriber,
{
inner: Vec<Box<dyn Layer<S> + Send + Sync + 'static>>,
_phantom_data: PhantomData<S>,
}
impl<S> UnionFilter<S>
where
S: Subscriber,
{
pub fn new(inner: Vec<Option<Box<dyn Layer<S> + Send + Sync + 'static>>>) -> Self {
let inner = inner.into_iter().flatten().collect();
Self {
inner,
_phantom_data: Default::default(),
}
}
}
impl<S> Layer<S> for UnionFilter<S>
where
S: Subscriber,
{
/// Return the disjunction of all the enabled flags of all the inner filters which are not None.
///
/// A None filter doesn't really mean "I want this event", nor it means "I want to filter this event out";
/// it just means "please ignore me, I'm not really a filter".
///
/// Yet, there is a blanked implementation of `Option<L>` for all `L: Layer`, and its implementation
/// if the `enabled()` method returns true. This works because the normal subscriber layer chain
/// performs a conjunction of each filter decision.
///
/// However, the [`UnionFilter`] here is the opposite, we want to enable processing of one event
/// as long as one of the event filters registers interest in it.
fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
self.inner.iter().any(|i| i.enabled(metadata, ctx.clone()))
}
}
impl<S> std::fmt::Debug for UnionFilter<S>
where
S: Subscriber,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("UnionFilter(...)")
}
}
#[derive(Clone, Debug)]
pub struct CloneableEnvFilter(Arc<EnvFilter>);
impl CloneableEnvFilter {
pub fn new(inner: EnvFilter) -> Self {
Self(Arc::new(inner))
}
}
impl<S> Layer<S> for CloneableEnvFilter
where
S: Subscriber,
{
fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
self.0.enabled(metadata, ctx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use observability_deps::tracing::{self, debug, error, info};
use std::sync::{Arc, Mutex};
use synchronized_writer::SynchronizedWriter;
use tracing_subscriber::{self, fmt, layer::SubscriberExt, EnvFilter};
// capture_two_streams is a test helper that sets up two independent tracing subscribers, each with
// a different filtering level and returns a tuple of the emitted lines for the respective streams.
//
// The first stream uses a textual format, the second stream uses a json format (just to make it clear they are
// indeed completely different exporters).
fn capture_two_streams(
filter1: EnvFilter,
filter2: EnvFilter,
workload: impl FnOnce(),
) -> (Vec<String>, Vec<String>) {
let writer1 = Arc::new(Mutex::new(Vec::new()));
let writer1_captured = Arc::clone(&writer1);
let writer2 = Arc::new(Mutex::new(Vec::new()));
let writer2_captured = Arc::clone(&writer2);
let layer1 = fmt::layer()
.with_target(false)
.with_ansi(false)
.without_time()
.with_writer(move || SynchronizedWriter::new(Arc::clone(&writer1_captured)));
let layer2 = fmt::layer()
.json()
.with_target(false)
.with_ansi(false)
.without_time()
.with_writer(move || SynchronizedWriter::new(Arc::clone(&writer2_captured)));
let subscriber = tracing_subscriber::Registry::default()
.with(FilteredLayer::new(filter1.and_then(layer1)))
.with(FilteredLayer::new(filter2.and_then(layer2)));
tracing::subscriber::with_default(subscriber, workload);
let lines = |writer: Arc<Mutex<Vec<_>>>| {
std::str::from_utf8(&writer.lock().unwrap())
.unwrap()
.to_string()
.trim_end()
.split('\n')
.into_iter()
.map(String::from)
.collect()
};
(lines(writer1), lines(writer2))
}
#[test]
fn test_independent() {
let workload = || {
error!("foo");
info!("bar");
debug!("baz");
};
let (lines1, lines2) =
capture_two_streams(EnvFilter::new("info"), EnvFilter::new("debug"), workload);
assert_eq!(
lines1,
vec!["ERROR foo".to_string(), " INFO bar".to_string()],
);
assert_eq!(
lines2,
vec![
r#"{"timestamp":"","level":"ERROR","fields":{"message":"foo"}}"#.to_string(),
r#"{"timestamp":"","level":"INFO","fields":{"message":"bar"}}"#.to_string(),
r#"{"timestamp":"","level":"DEBUG","fields":{"message":"baz"}}"#.to_string()
],
);
let (lines1, lines2) =
capture_two_streams(EnvFilter::new("debug"), EnvFilter::new("info"), workload);
assert_eq!(
lines1,
vec![
"ERROR foo".to_string(),
" INFO bar".to_string(),
"DEBUG baz".to_string(),
],
);
assert_eq!(
lines2,
vec![
r#"{"timestamp":"","level":"ERROR","fields":{"message":"foo"}}"#.to_string(),
r#"{"timestamp":"","level":"INFO","fields":{"message":"bar"}}"#.to_string(),
],
);
}
}

View File

@ -13,20 +13,13 @@
#[cfg(feature = "structopt")]
pub mod cli;
pub mod config;
pub mod layered_tracing;
use crate::layered_tracing::{CloneableEnvFilter, FilteredLayer, UnionFilter};
pub use config::*;
// Re-export tracing_subscriber
pub use tracing_subscriber;
use observability_deps::tracing::{self, Subscriber};
use opentelemetry::{
self,
sdk::{trace, Resource},
KeyValue,
};
use std::cmp::min;
use std::io;
use std::io::Write;
@ -49,18 +42,6 @@ const MAX_LINE_LENGTH: usize = 16 * 1024 - 1;
#[derive(Debug, Error)]
pub enum Error {
#[error("Jaeger exporter selected but jaeger config passed to builder")]
JaegerConfigMissing,
#[error("'jaeger' not supported with this build. Hint: recompile with appropriate features")]
JaegerNotBuilt {},
#[error("OTLP exporter selected but OTLP config passed to builder")]
OtlpConfigMissing,
#[error("'otlp' not supported with this build. Hint: recompile with appropriate features")]
OtlpNotBuilt {},
#[error("Cannot set global tracing subscriber")]
SetGlobalDefaultError(#[from] tracing::dispatcher::SetGlobalDefaultError),
@ -77,15 +58,9 @@ pub struct Builder<W = fn() -> io::Stdout> {
log_filter: Option<EnvFilter>,
// used when log_filter is none.
default_log_filter: EnvFilter,
traces_filter: Option<EnvFilter>,
traces_exporter: TracesExporter,
traces_sampler: TracesSampler,
traces_sampler_arg: f64,
make_writer: W,
with_target: bool,
with_ansi: bool,
jaeger_config: Option<JaegerConfig>,
otlp_config: Option<OtlpConfig>,
}
impl Default for Builder {
@ -94,15 +69,9 @@ impl Default for Builder {
log_format: LogFormat::Full,
log_filter: None,
default_log_filter: EnvFilter::try_new(Self::DEFAULT_LOG_FILTER).unwrap(),
traces_filter: None,
traces_exporter: TracesExporter::None,
traces_sampler: TracesSampler::ParentBasedTraceIdRatio,
traces_sampler_arg: 1.0,
make_writer: io::stdout,
with_target: true,
with_ansi: true,
jaeger_config: None,
otlp_config: None,
}
}
}
@ -125,14 +94,8 @@ impl<W> Builder<W> {
log_format: self.log_format,
log_filter: self.log_filter,
default_log_filter: self.default_log_filter,
traces_filter: self.traces_filter,
traces_exporter: self.traces_exporter,
traces_sampler: self.traces_sampler,
traces_sampler_arg: self.traces_sampler_arg,
with_target: self.with_target,
with_ansi: self.with_ansi,
jaeger_config: self.jaeger_config,
otlp_config: self.otlp_config,
}
}
}
@ -204,14 +167,8 @@ where
log_format: self.log_format,
log_filter: self.log_filter,
default_log_filter: self.default_log_filter,
traces_filter: self.traces_filter,
traces_exporter: self.traces_exporter,
traces_sampler: self.traces_sampler,
traces_sampler_arg: self.traces_sampler_arg,
with_target: self.with_target,
with_ansi: self.with_ansi,
jaeger_config: self.jaeger_config,
otlp_config: self.otlp_config,
}
}
@ -232,149 +189,7 @@ where
Self { with_ansi, ..self }
}
/// Sets an optional event filter for the tracing pipeline.
///
/// The filter will be parsed with [tracing_subscriber::EnvFilter]
/// and applied to all events before they reach the tracing exporter.
pub fn with_traces_filter(self, traces_filter: &Option<String>) -> Self {
if let Some(traces_filter) = traces_filter {
let traces_filter = EnvFilter::try_new(traces_filter).unwrap();
Self {
traces_filter: Some(traces_filter),
..self
}
} else {
self
}
}
pub fn with_traces_exporter(self, traces_exporter: TracesExporter) -> Self {
Self {
traces_exporter,
..self
}
}
pub fn with_traces_sampler(
self,
traces_sampler: TracesSampler,
traces_sampler_arg: f64,
) -> Self {
Self {
traces_sampler,
traces_sampler_arg,
..self
}
}
pub fn with_jaeger_config(self, config: JaegerConfig) -> Self {
Self {
jaeger_config: Some(config),
..self
}
}
pub fn with_oltp_config(self, config: OtlpConfig) -> Self {
Self {
otlp_config: Some(config),
..self
}
}
fn construct_opentelemetry_tracer(&self) -> Result<Option<trace::Tracer>> {
let trace_config = {
let sampler = match self.traces_sampler {
TracesSampler::AlwaysOn => trace::Sampler::AlwaysOn,
TracesSampler::AlwaysOff => {
return Ok(None);
}
TracesSampler::TraceIdRatio => {
trace::Sampler::TraceIdRatioBased(self.traces_sampler_arg)
}
TracesSampler::ParentBasedAlwaysOn => {
trace::Sampler::ParentBased(Box::new(trace::Sampler::AlwaysOn))
}
TracesSampler::ParentBasedAlwaysOff => {
trace::Sampler::ParentBased(Box::new(trace::Sampler::AlwaysOff))
}
TracesSampler::ParentBasedTraceIdRatio => trace::Sampler::ParentBased(Box::new(
trace::Sampler::TraceIdRatioBased(self.traces_sampler_arg),
)),
};
let resource = Resource::new(vec![KeyValue::new("service.name", "influxdb-iox")]);
trace::Config::default()
.with_sampler(sampler)
.with_resource(resource)
};
Ok(match self.traces_exporter {
TracesExporter::Jaeger => Some(self.construct_jaeger_tracer(trace_config)?),
TracesExporter::Otlp => Some(self.construct_otlp_tracer(trace_config)?),
TracesExporter::None => None,
})
}
#[cfg(feature = "jaeger")]
fn construct_jaeger_tracer(&self, trace_config: trace::Config) -> Result<trace::Tracer> {
let config = self
.jaeger_config
.as_ref()
.ok_or(Error::JaegerConfigMissing)?;
let agent_endpoint = format!("{}:{}", config.agent_host.trim(), config.agent_port);
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let builder = opentelemetry_jaeger::new_pipeline()
.with_trace_config(trace_config)
.with_agent_endpoint(agent_endpoint)
.with_service_name(&config.service_name)
.with_max_packet_size(config.max_packet_size);
// Batching is hard to tune because the max batch size
// is not currently exposed as a tunable from the trace config, and even then
// it's defined in terms of max number of spans, and not their size in bytes.
// Thus we enable batching only when the MTU size is 65000 which is the value suggested
// by jaeger when exporting to localhost.
let tracer = if config.max_packet_size >= 65_000 {
builder.install_batch(opentelemetry::runtime::Tokio)
} else {
builder.install_simple()
}
.unwrap();
Ok(tracer)
}
#[cfg(not(feature = "jaeger"))]
fn construct_jaeger_tracer(&self, _trace_config: trace::Config) -> Result<trace::Tracer> {
Err(Error::JaegerNotBuilt {})
}
#[cfg(feature = "otlp")]
fn construct_otlp_tracer(&self, trace_config: trace::Config) -> Result<trace::Tracer> {
let config = self.otlp_config.as_ref().ok_or(Error::OtlpConfigMissing)?;
let jaeger_endpoint = format!("{}:{}", config.host.trim(), config.port);
Ok(opentelemetry_otlp::new_pipeline()
.with_trace_config(trace_config)
.with_endpoint(jaeger_endpoint)
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_tonic()
.install_batch(opentelemetry::runtime::Tokio)
.unwrap())
}
#[cfg(not(feature = "otlp"))]
fn construct_otlp_tracer(&self, _trace_config: trace::Config) -> Result<trace::Tracer> {
Err(Error::OtlpNotBuilt {})
}
pub fn build(self) -> Result<impl Subscriber> {
let (traces_layer_filter, traces_layer_otel) =
match self.construct_opentelemetry_tracer()? {
None => (None, None),
Some(tracer) => (
self.traces_filter,
Some(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)),
),
};
let log_writer = self.make_writer;
let log_format = self.log_format;
let with_target = self.with_target;
@ -427,29 +242,12 @@ where
let log_filter = self.log_filter.unwrap_or(self.default_log_filter);
// construct the union filter which allows us to skip evaluating the expensive field values unless
// at least one of the filters is interested in the events.
// e.g. consider: `debug!(foo=bar(), "baz");`
// `bar()` will only be called if either the log_filter or the traces_layer_filter is at debug level for that module.
let log_filter = CloneableEnvFilter::new(log_filter);
let traces_layer_filter = traces_layer_filter.map(CloneableEnvFilter::new);
let union_filter = UnionFilter::new(vec![
Some(Box::new(log_filter.clone())),
traces_layer_filter.clone().map(|l| Box::new(l) as _),
]);
let subscriber = tracing_subscriber::Registry::default()
.with(union_filter)
.with(FilteredLayer::new(
let subscriber = tracing_subscriber::Registry::default().with(
log_filter
.and_then(log_format_full)
.and_then(log_format_pretty)
.and_then(log_format_json)
.and_then(log_format_logfmt),
))
.with(
traces_layer_filter
.map(|filter| FilteredLayer::new(filter.and_then(traces_layer_otel))),
);
Ok(subscriber)
@ -457,23 +255,24 @@ where
/// Build a tracing subscriber and install it as a global default subscriber for all threads.
///
/// It returns a RAII guard that will ensure all events are flushed to the tracing exporter.
pub fn install_global(self) -> Result<TracingGuard> {
/// It returns a RAII guard that will ensure all events are flushed on drop
pub fn install_global(self) -> Result<TroggingGuard> {
let subscriber = self.build()?;
tracing::subscriber::set_global_default(subscriber)?;
tracing_log::LogTracer::init()?;
Ok(TracingGuard)
Ok(TroggingGuard)
}
}
/// A RAII guard. On Drop, tracing and OpenTelemetry are flushed and shut down.
/// A RAII guard. On Drop, ensures all events are flushed
///
/// Note: This is currently unnecessary but has been kept in case we choose to
/// switch to using tracing-appender which writes logs in a background worker
#[derive(Debug)]
pub struct TracingGuard;
pub struct TroggingGuard;
impl Drop for TracingGuard {
fn drop(&mut self) {
opentelemetry::global::shutdown_tracer_provider();
}
impl Drop for TroggingGuard {
fn drop(&mut self) {}
}
fn make_writer<M>(m: M) -> BoxMakeWriter