From f2c74542744b1ffa359501faaea68cf7a7bb6dfc Mon Sep 17 00:00:00 2001 From: Marko Mikulicic Date: Thu, 24 Jun 2021 12:58:27 +0200 Subject: [PATCH] fix: Use layered tracing --- src/main.rs | 1 - .../src}/layered_tracing.rs | 126 ++++++++++++++++-- trogging/src/lib.rs | 36 +++-- 3 files changed, 140 insertions(+), 23 deletions(-) rename {src/commands => trogging/src}/layered_tracing.rs (54%) diff --git a/src/main.rs b/src/main.rs index e1c2a94c1e..b0bd18e35e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,6 @@ use tikv_jemallocator::Jemalloc; mod commands { pub mod database; - pub mod layered_tracing; pub mod operations; pub mod run; pub mod server; diff --git a/src/commands/layered_tracing.rs b/trogging/src/layered_tracing.rs similarity index 54% rename from src/commands/layered_tracing.rs rename to trogging/src/layered_tracing.rs index 2a6e0557ae..78c836748c 100644 --- a/src/commands/layered_tracing.rs +++ b/trogging/src/layered_tracing.rs @@ -1,11 +1,16 @@ +use observability_deps::tracing::Metadata; +use observability_deps::tracing_subscriber::EnvFilter; use observability_deps::{ tracing::{ event::Event, span::{Attributes, Id, Record}, subscriber::Subscriber, }, - tracing_subscriber::layer::{Context, Layer, Layered}, + tracing_subscriber::layer::{Context, Layer}, }; +use std::fmt::Formatter; +use std::marker::PhantomData; +use std::sync::Arc; /// A FilteredLayer wraps a tracing subscriber Layer and passes events only /// if the provided EnvFilter accepts the event. @@ -16,34 +21,32 @@ use observability_deps::{ /// /// A FilteredLayer on the other hand, allows to restrict the verbosity of one event sink /// without throwing away events available to other layers. -pub struct FilteredLayer +#[derive(Debug)] +pub struct FilteredLayer where S: Subscriber, - F: Layer, L: Layer, { - inner: Layered, + inner: L, + _phantom_data: PhantomData, } -impl FilteredLayer +impl FilteredLayer where S: Subscriber, - F: Layer, L: Layer, { - // TODO remove when we'll use this - #[allow(dead_code)] - pub fn new(filter: F, delegate: L) -> Self { + pub fn new(inner: L) -> Self { Self { - inner: delegate.and_then(filter), + inner, + _phantom_data: Default::default(), } } } -impl Layer for FilteredLayer +impl Layer for FilteredLayer where S: Subscriber, - F: Layer, L: Layer, { fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) { @@ -74,6 +77,101 @@ where } } +/// A union filter is a filtering Layer that returns "enabled: true" +/// for events for which at least one of the inner filters returns "enabled: true". +/// +/// This is the opposite of what the main subscriber's layered chain does, where +/// if one filter says nope, the event is filtered out. +/// +/// Since there is a blanked `impl Layer for Option` that returns +/// "enabled: true" when the option is None, it would be very confusing if an +/// user accidentally passed such a none to the filter vector in the union filter. +/// However it's quite tempting to pass an optional layer around hoping it "does the right thing", +/// since it does indeed work in other places a layer is passed to a subscriber builder +/// (moreover, due to the static typing builder pattern used there, there is no way of +/// conditionally adding a filter other than passing an optional filter). Thus, it would be +/// (and it happened in practice) quite confusing for this API to technically accept optional +/// layers and silently do the wrong thing with them. +/// For this reason the [`UnionFilter::new`] method accepts a vector of [`Option`][option]s. +/// It's not perfect, since a user could pass an `Option>` but hopefully +/// +/// This filter is intended to be used together with the [`FilteredLayer`] layer +/// which will filter unwanted events for each of the. +/// +/// This [`UnionFilter`] and the [`FilteredLayer`] are likely to share filters. +/// Unfortunately the [`EnvFilter`][envfilter] doesn't implement [`Clone`]. +/// See [`CloneableEnvFilter`] for a workaround. +/// +/// [envfilter]: observability_deps::tracing_subscriber::EnvFilter +/// [option]: std::option::Option +pub struct UnionFilter +where + S: Subscriber, +{ + inner: Vec + Send + Sync + 'static>>, + _phantom_data: PhantomData, +} + +impl UnionFilter +where + S: Subscriber, +{ + pub fn new(inner: Vec + Send + Sync + 'static>>>) -> Self { + let inner = inner.into_iter().flatten().collect(); + Self { + inner, + _phantom_data: Default::default(), + } + } +} + +impl Layer for UnionFilter +where + S: Subscriber, +{ + /// Return the disjunction of all the enabled flags of all the inner filters which are not None. + /// + /// A None filter doesn't really mean "I want this event", nor it means "I want to filter this event out"; + /// it just means "please ignore me, I'm not really a filter". + /// + /// Yet, there is a blanked implementation of `Option` for all `L: Layer`, and its implementation + /// if the `enabled()` method returns true. This works because the normal subscriber layer chain + /// performs a conjunction of each filter decision. + /// + /// However, the [`UnionFilter`] here is the opposite, we want to enable processing of one event + /// as long as one of the event filters registers interest in it. + fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool { + self.inner.iter().any(|i| i.enabled(metadata, ctx.clone())) + } +} + +impl std::fmt::Debug for UnionFilter +where + S: Subscriber, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("UnionFilter(...)") + } +} + +#[derive(Clone, Debug)] +pub struct CloneableEnvFilter(Arc); + +impl CloneableEnvFilter { + pub fn new(inner: EnvFilter) -> Self { + Self(Arc::new(inner)) + } +} + +impl Layer for CloneableEnvFilter +where + S: Subscriber, +{ + fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool { + self.0.enabled(metadata, ctx) + } +} + #[cfg(test)] mod tests { use super::*; @@ -113,8 +211,8 @@ mod tests { .with_writer(move || SynchronizedWriter::new(Arc::clone(&writer2_captured))); let subscriber = tracing_subscriber::Registry::default() - .with(FilteredLayer::new(filter1, layer1)) - .with(FilteredLayer::new(filter2, layer2)); + .with(FilteredLayer::new(filter1.and_then(layer1))) + .with(FilteredLayer::new(filter2.and_then(layer2))); tracing::subscriber::with_default(subscriber, workload); diff --git a/trogging/src/lib.rs b/trogging/src/lib.rs index a10cee4e4e..743469a16c 100644 --- a/trogging/src/lib.rs +++ b/trogging/src/lib.rs @@ -13,7 +13,9 @@ #[cfg(feature = "structopt")] pub mod cli; pub mod config; +pub mod layered_tracing; +use crate::layered_tracing::{CloneableEnvFilter, FilteredLayer, UnionFilter}; pub use config::*; use observability_deps::{ @@ -26,7 +28,7 @@ use observability_deps::{ self, fmt::{self, writer::BoxMakeWriter, MakeWriter}, layer::SubscriberExt, - EnvFilter, + EnvFilter, Layer, }, }; use std::cmp::min; @@ -404,14 +406,32 @@ where ), }; + let log_filter = self.log_filter.unwrap_or(self.default_log_filter); + + // construct the union filter which allows us to skip evaluating the expensive field values unless + // at least one of the filters is interested in the events. + // e.g. consider: `debug!(foo=bar(), "baz");` + // `bar()` will only be called if either the log_filter or the traces_layer_filter is at debug level for that module. + let log_filter = CloneableEnvFilter::new(log_filter); + let traces_layer_filter = traces_layer_filter.map(CloneableEnvFilter::new); + let union_filter = UnionFilter::new(vec![ + Some(Box::new(log_filter.clone())), + traces_layer_filter.clone().map(|l| Box::new(l) as _), + ]); + let subscriber = tracing_subscriber::Registry::default() - .with(self.log_filter.unwrap_or(self.default_log_filter)) - .with(log_format_full) - .with(log_format_pretty) - .with(log_format_json) - .with(log_format_logfmt) - .with(traces_layer_otel) - .with(traces_layer_filter); + .with(union_filter) + .with(FilteredLayer::new( + log_filter + .and_then(log_format_full) + .and_then(log_format_pretty) + .and_then(log_format_json) + .and_then(log_format_logfmt), + )) + .with( + traces_layer_filter + .map(|filter| FilteredLayer::new(filter.and_then(traces_layer_otel))), + ); Ok(subscriber) }