Merge pull request #1796 from influxdata/layered-tracing

fix: Use layered tracing
pull/24376/head
kodiakhq[bot] 2021-06-28 16:45:55 +00:00 committed by GitHub
commit 6aa5cfa937
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 140 additions and 23 deletions

View File

@ -20,7 +20,6 @@ use tikv_jemallocator::Jemalloc;
mod commands {
pub mod database;
pub mod layered_tracing;
pub mod operations;
pub mod run;
pub mod server;

View File

@ -1,11 +1,16 @@
use observability_deps::tracing::Metadata;
use observability_deps::tracing_subscriber::EnvFilter;
use observability_deps::{
tracing::{
event::Event,
span::{Attributes, Id, Record},
subscriber::Subscriber,
},
tracing_subscriber::layer::{Context, Layer, Layered},
tracing_subscriber::layer::{Context, Layer},
};
use std::fmt::Formatter;
use std::marker::PhantomData;
use std::sync::Arc;
/// A FilteredLayer wraps a tracing subscriber Layer and passes events only
/// if the provided EnvFilter accepts the event.
@ -16,34 +21,32 @@ use observability_deps::{
///
/// A FilteredLayer on the other hand, allows to restrict the verbosity of one event sink
/// without throwing away events available to other layers.
pub struct FilteredLayer<S, F, L>
#[derive(Debug)]
pub struct FilteredLayer<S, L>
where
S: Subscriber,
F: Layer<S>,
L: Layer<S>,
{
inner: Layered<F, L, S>,
inner: L,
_phantom_data: PhantomData<S>,
}
impl<S, F, L> FilteredLayer<S, F, L>
impl<S, L> FilteredLayer<S, L>
where
S: Subscriber,
F: Layer<S>,
L: Layer<S>,
{
// TODO remove when we'll use this
#[allow(dead_code)]
pub fn new(filter: F, delegate: L) -> Self {
pub fn new(inner: L) -> Self {
Self {
inner: delegate.and_then(filter),
inner,
_phantom_data: Default::default(),
}
}
}
impl<S, F, L> Layer<S> for FilteredLayer<S, F, L>
impl<S, L> Layer<S> for FilteredLayer<S, L>
where
S: Subscriber,
F: Layer<S>,
L: Layer<S>,
{
fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
@ -74,6 +77,101 @@ where
}
}
/// A union filter is a filtering Layer that returns "enabled: true"
/// for events for which at least one of the inner filters returns "enabled: true".
///
/// This is the opposite of what the main subscriber's layered chain does, where
/// if one filter says nope, the event is filtered out.
///
/// Since there is a blanked `impl<L: Layer> Layer for Option<L>` that returns
/// "enabled: true" when the option is None, it would be very confusing if an
/// user accidentally passed such a none to the filter vector in the union filter.
/// However it's quite tempting to pass an optional layer around hoping it "does the right thing",
/// since it does indeed work in other places a layer is passed to a subscriber builder
/// (moreover, due to the static typing builder pattern used there, there is no way of
/// conditionally adding a filter other than passing an optional filter). Thus, it would be
/// (and it happened in practice) quite confusing for this API to technically accept optional
/// layers and silently do the wrong thing with them.
/// For this reason the [`UnionFilter::new`] method accepts a vector of [`Option`][option]s.
/// It's not perfect, since a user could pass an `Option<Option<L>>` but hopefully
///
/// This filter is intended to be used together with the [`FilteredLayer`] layer
/// which will filter unwanted events for each of the.
///
/// This [`UnionFilter`] and the [`FilteredLayer`] are likely to share filters.
/// Unfortunately the [`EnvFilter`][envfilter] doesn't implement [`Clone`].
/// See [`CloneableEnvFilter`] for a workaround.
///
/// [envfilter]: observability_deps::tracing_subscriber::EnvFilter
/// [option]: std::option::Option
pub struct UnionFilter<S>
where
S: Subscriber,
{
inner: Vec<Box<dyn Layer<S> + Send + Sync + 'static>>,
_phantom_data: PhantomData<S>,
}
impl<S> UnionFilter<S>
where
S: Subscriber,
{
pub fn new(inner: Vec<Option<Box<dyn Layer<S> + Send + Sync + 'static>>>) -> Self {
let inner = inner.into_iter().flatten().collect();
Self {
inner,
_phantom_data: Default::default(),
}
}
}
impl<S> Layer<S> for UnionFilter<S>
where
S: Subscriber,
{
/// Return the disjunction of all the enabled flags of all the inner filters which are not None.
///
/// A None filter doesn't really mean "I want this event", nor it means "I want to filter this event out";
/// it just means "please ignore me, I'm not really a filter".
///
/// Yet, there is a blanked implementation of `Option<L>` for all `L: Layer`, and its implementation
/// if the `enabled()` method returns true. This works because the normal subscriber layer chain
/// performs a conjunction of each filter decision.
///
/// However, the [`UnionFilter`] here is the opposite, we want to enable processing of one event
/// as long as one of the event filters registers interest in it.
fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
self.inner.iter().any(|i| i.enabled(metadata, ctx.clone()))
}
}
impl<S> std::fmt::Debug for UnionFilter<S>
where
S: Subscriber,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.write_str("UnionFilter(...)")
}
}
#[derive(Clone, Debug)]
pub struct CloneableEnvFilter(Arc<EnvFilter>);
impl CloneableEnvFilter {
pub fn new(inner: EnvFilter) -> Self {
Self(Arc::new(inner))
}
}
impl<S> Layer<S> for CloneableEnvFilter
where
S: Subscriber,
{
fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
self.0.enabled(metadata, ctx)
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -113,8 +211,8 @@ mod tests {
.with_writer(move || SynchronizedWriter::new(Arc::clone(&writer2_captured)));
let subscriber = tracing_subscriber::Registry::default()
.with(FilteredLayer::new(filter1, layer1))
.with(FilteredLayer::new(filter2, layer2));
.with(FilteredLayer::new(filter1.and_then(layer1)))
.with(FilteredLayer::new(filter2.and_then(layer2)));
tracing::subscriber::with_default(subscriber, workload);

View File

@ -13,7 +13,9 @@
#[cfg(feature = "structopt")]
pub mod cli;
pub mod config;
pub mod layered_tracing;
use crate::layered_tracing::{CloneableEnvFilter, FilteredLayer, UnionFilter};
pub use config::*;
use observability_deps::{
@ -26,7 +28,7 @@ use observability_deps::{
self,
fmt::{self, writer::BoxMakeWriter, MakeWriter},
layer::SubscriberExt,
EnvFilter,
EnvFilter, Layer,
},
};
use std::cmp::min;
@ -404,14 +406,32 @@ where
),
};
let log_filter = self.log_filter.unwrap_or(self.default_log_filter);
// construct the union filter which allows us to skip evaluating the expensive field values unless
// at least one of the filters is interested in the events.
// e.g. consider: `debug!(foo=bar(), "baz");`
// `bar()` will only be called if either the log_filter or the traces_layer_filter is at debug level for that module.
let log_filter = CloneableEnvFilter::new(log_filter);
let traces_layer_filter = traces_layer_filter.map(CloneableEnvFilter::new);
let union_filter = UnionFilter::new(vec![
Some(Box::new(log_filter.clone())),
traces_layer_filter.clone().map(|l| Box::new(l) as _),
]);
let subscriber = tracing_subscriber::Registry::default()
.with(self.log_filter.unwrap_or(self.default_log_filter))
.with(log_format_full)
.with(log_format_pretty)
.with(log_format_json)
.with(log_format_logfmt)
.with(traces_layer_otel)
.with(traces_layer_filter);
.with(union_filter)
.with(FilteredLayer::new(
log_filter
.and_then(log_format_full)
.and_then(log_format_pretty)
.and_then(log_format_json)
.and_then(log_format_logfmt),
))
.with(
traces_layer_filter
.map(|filter| FilteredLayer::new(filter.and_then(traces_layer_otel))),
);
Ok(subscriber)
}