feat: workaround bug/limitation in OT handling of observers (#1457)

* feat: workardound bug/limitation in OT handling of observers

* chore: fix lints

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2021-05-07 19:53:28 +01:00 committed by GitHub
parent 60f4b927e8
commit e2e3c9f77c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 374 additions and 35 deletions

1
Cargo.lock generated
View File

@ -1863,6 +1863,7 @@ dependencies = [
"dashmap",
"observability_deps",
"opentelemetry-prometheus",
"parking_lot",
"prometheus",
"snafu",
]

View File

@ -15,6 +15,7 @@ edition = "2018"
[dependencies] # In alphabetical order
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11.1"
prometheus = "0.12"
opentelemetry-prometheus = "0.6"
snafu = "0.6"

View File

@ -6,6 +6,7 @@
clippy::clone_on_ref_ptr
)]
mod metrics;
mod observer;
mod tests;
use dashmap::{mapref::multiple::RefMulti, DashMap};
@ -28,6 +29,7 @@ use prometheus::{Encoder, Registry, TextEncoder};
use std::sync::Arc;
pub use crate::metrics::{Counter, Gauge, Histogram, KeyValue, RedMetric};
use crate::observer::ObserverCollection;
pub use crate::tests::*;
/// A registry responsible for initialising IOx metrics and exposing their
@ -48,6 +50,7 @@ pub use crate::tests::*;
pub struct MetricRegistry {
provider: RegistryMeterProvider,
exporter: PrometheusExporter,
observers: Arc<ObserverCollection>,
}
impl MetricRegistry {
@ -100,7 +103,7 @@ impl MetricRegistry {
/// sub-system, crate or similar.
pub fn register_domain(&self, name: &'static str) -> Domain {
let meter = self.provider.meter(name, None);
Domain::new(name, meter)
Domain::new(name, meter, Arc::clone(&self.observers))
}
}
@ -130,8 +133,13 @@ impl Default for MetricRegistry {
.unwrap();
let provider = exporter.provider().unwrap();
let observers = Arc::new(ObserverCollection::new(provider.meter("observers", None)));
Self { provider, exporter }
Self {
provider,
exporter,
observers,
}
}
}
@ -145,11 +153,20 @@ impl Default for MetricRegistry {
pub struct Domain {
name: &'static str,
meter: OTMeter,
observers: Arc<ObserverCollection>,
}
impl Domain {
pub(crate) fn new(name: &'static str, meter: OTMeter) -> Self {
Self { name, meter }
pub(crate) fn new(
name: &'static str,
meter: OTMeter,
observers: Arc<ObserverCollection>,
) -> Self {
Self {
name,
meter,
observers,
}
}
// Creates an appropriate metric name based on the Domain's name, and
@ -322,20 +339,20 @@ impl Domain {
let values = Arc::new(DashMap::new());
let values_captured = Arc::clone(&values);
let gauge = self
.meter
.f64_value_observer(self.build_metric_name(name, None, unit, None), move |arg| {
self.observers.f64_value_observer(
self.build_metric_name(name, None, unit, None),
description,
move |arg| {
for i in values_captured.iter() {
// currently rust type inference cannot deduct the type of i.value()
let i: RefMulti<'_, _, (f64, Vec<KeyValue>), _> = i;
let &(value, ref labels) = i.value();
arg.observe(value, labels);
}
})
.with_description(description)
.init();
},
);
metrics::Gauge::new(gauge, default_labels, values)
metrics::Gauge::new(default_labels, values)
}
pub fn register_observer(
@ -384,15 +401,30 @@ impl<'a> MetricObserverBuilder<'a> {
) where
F: Fn(TaggedObserverResult<'_, u64>) + Send + Sync + 'static,
{
self.domain
.meter
.u64_value_observer(
self.domain
.build_metric_name(name, self.subname, unit, None),
TaggedObserverResult::with_callback(self.labels.to_owned(), callback),
)
.with_description(description)
.init();
self.domain.observers.u64_value_observer(
self.domain
.build_metric_name(name, self.subname, unit, None),
description,
TaggedObserverResult::with_callback(self.labels.to_owned(), callback),
)
}
/// Register a f64 gauge
pub fn register_gauge_f64<F>(
&self,
name: &str,
unit: Option<&str>,
description: impl Into<String>,
callback: F,
) where
F: Fn(TaggedObserverResult<'_, f64>) + Send + Sync + 'static,
{
self.domain.observers.f64_value_observer(
self.domain
.build_metric_name(name, self.subname, unit, None),
description,
TaggedObserverResult::with_callback(self.labels.to_owned(), callback),
)
}
/// Register a u64 counter
@ -405,21 +437,36 @@ impl<'a> MetricObserverBuilder<'a> {
) where
F: Fn(TaggedObserverResult<'_, u64>) + Send + Sync + 'static,
{
self.domain
.meter
.u64_sum_observer(
self.domain
.build_metric_name(name, self.subname, unit, Some("total")),
TaggedObserverResult::with_callback(self.labels.to_owned(), callback),
)
.with_description(description)
.init();
self.domain.observers.u64_sum_observer(
self.domain
.build_metric_name(name, self.subname, unit, None),
description,
TaggedObserverResult::with_callback(self.labels.to_owned(), callback),
)
}
/// Register a f64 counter
pub fn register_counter_f64<F>(
&self,
name: &str,
unit: Option<&str>,
description: impl Into<String>,
callback: F,
) where
F: Fn(TaggedObserverResult<'_, f64>) + Send + Sync + 'static,
{
self.domain.observers.f64_sum_observer(
self.domain
.build_metric_name(name, self.subname, unit, None),
description,
TaggedObserverResult::with_callback(self.labels.to_owned(), callback),
)
}
}
#[derive(Debug)]
pub struct TaggedObserverResult<'a, T> {
observer: ObserverResult<T>,
observer: &'a ObserverResult<T>,
labels: &'a [KeyValue],
}
@ -430,7 +477,7 @@ where
fn with_callback<F>(
labels: Vec<KeyValue>,
callback: F,
) -> impl Fn(ObserverResult<T>) + Send + Sync + 'static
) -> impl Fn(&ObserverResult<T>) + Send + Sync + 'static
where
F: Fn(TaggedObserverResult<'_, T>) + Send + Sync + 'static,
{
@ -895,4 +942,124 @@ mod test {
.eq(43.0)
.unwrap();
}
#[test]
fn test_basic() {
let reg = TestMetricRegistry::default();
let d1 = reg.registry().register_domain("A");
let d2 = reg.registry().register_domain("A");
d1.meter
.u64_sum_observer("mem", |observer| {
observer.observe(23, &[KeyValue::new("a", "b")])
})
.init();
d2.meter
.u64_sum_observer("mem", |observer| {
observer.observe(232, &[KeyValue::new("c", "d")])
})
.init();
reg.has_metric_family("mem")
.with_labels(&[("a", "b")])
.counter()
.eq(23.)
.unwrap();
// If this passes it implies the upstream SDK has been fixed and the hacky
// observer shim can be removed
reg.has_metric_family("mem")
.with_labels(&[("c", "d")])
.counter()
.eq(232.)
.expect_err("expected default SDK to not handle correctly");
}
#[test]
fn test_duplicate() {
let reg = TestMetricRegistry::default();
let domain = reg.registry().register_domain("test");
domain.register_observer(None, &[], |builder: MetricObserverBuilder<'_>| {
builder.register_gauge_u64("mem", None, "", |observer| {
observer.observe(1, &[KeyValue::new("a", "b")]);
observer.observe(2, &[KeyValue::new("c", "d")]);
});
builder.register_gauge_u64("mem", None, "", |observer| {
observer.observe(3, &[KeyValue::new("e", "f")]);
});
builder.register_counter_u64("disk", None, "", |observer| {
observer.observe(1, &[KeyValue::new("a", "b")]);
});
builder.register_counter_f64("float", None, "", |observer| {
observer.observe(1., &[KeyValue::new("a", "b")]);
});
builder.register_gauge_f64("gauge", None, "", |observer| {
observer.observe(1., &[KeyValue::new("a", "b")]);
});
});
domain.register_observer(None, &[], |builder: MetricObserverBuilder<'_>| {
builder.register_gauge_u64("mem", None, "", |observer| {
observer.observe(4, &[KeyValue::new("g", "h")]);
});
builder.register_gauge_f64("gauge", None, "", |observer| {
observer.observe(2., &[KeyValue::new("c", "d")]);
});
});
reg.has_metric_family("test_mem")
.with_labels(&[("a", "b")])
.gauge()
.eq(1.)
.unwrap();
reg.has_metric_family("test_mem")
.with_labels(&[("c", "d")])
.gauge()
.eq(2.)
.unwrap();
reg.has_metric_family("test_mem")
.with_labels(&[("e", "f")])
.gauge()
.eq(3.)
.unwrap();
reg.has_metric_family("test_mem")
.with_labels(&[("g", "h")])
.gauge()
.eq(4.)
.unwrap();
reg.has_metric_family("test_disk")
.with_labels(&[("a", "b")])
.counter()
.eq(1.)
.unwrap();
reg.has_metric_family("test_float")
.with_labels(&[("a", "b")])
.counter()
.eq(1.)
.unwrap();
reg.has_metric_family("test_gauge")
.with_labels(&[("a", "b")])
.gauge()
.eq(1.)
.unwrap();
reg.has_metric_family("test_gauge")
.with_labels(&[("c", "d")])
.gauge()
.eq(2.)
.unwrap();
}
}

View File

@ -8,7 +8,7 @@ use std::{
use dashmap::DashMap;
use observability_deps::opentelemetry::{
labels,
metrics::{Counter as OTCounter, ValueObserver as OTGauge, ValueRecorder as OTHistogram},
metrics::{Counter as OTCounter, ValueRecorder as OTHistogram},
};
pub use observability_deps::opentelemetry::KeyValue;
@ -284,19 +284,16 @@ impl Counter {
/// (e.g. current memory usage)
#[derive(Debug)]
pub struct Gauge {
gauge: OTGauge<f64>,
default_labels: Vec<KeyValue>,
values: Arc<DashMap<String, (f64, Vec<KeyValue>)>>,
}
impl Gauge {
pub(crate) fn new(
gauge: OTGauge<f64>,
default_labels: Vec<KeyValue>,
values: Arc<DashMap<String, (f64, Vec<KeyValue>)>>,
) -> Self {
Self {
gauge,
default_labels,
values,
}

173
metrics/src/observer.rs Normal file
View File

@ -0,0 +1,173 @@
//! This module is a gnarly hack around the fact opentelemetry doesn't currently let you
//! register an observer multiple times with the same name
//!
//! See https://github.com/open-telemetry/opentelemetry-rust/issues/541
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;
use observability_deps::opentelemetry::metrics::{Meter, ObserverResult};
type CallbackFunc<T> = Box<dyn Fn(&ObserverResult<T>) + Send + Sync + 'static>;
#[derive(Clone)]
struct CallbackCollection<T>(Arc<Mutex<Vec<CallbackFunc<T>>>>);
impl<T> CallbackCollection<T> {
fn new<F>(f: F) -> Self
where
F: Fn(&ObserverResult<T>) + Send + Sync + 'static,
{
Self(Arc::new(Mutex::new(vec![Box::new(f)])))
}
fn push<F>(&self, f: F)
where
F: Fn(&ObserverResult<T>) + Send + Sync + 'static,
{
self.0.lock().push(Box::new(f))
}
fn invoke(&self, observer: ObserverResult<T>) {
let callbacks = self.0.lock();
for callback in callbacks.iter() {
callback(&observer)
}
}
}
enum Callbacks {
U64Value(CallbackCollection<u64>),
U64Sum(CallbackCollection<u64>),
F64Value(CallbackCollection<f64>),
F64Sum(CallbackCollection<f64>),
}
impl std::fmt::Debug for Callbacks {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self {
Self::U64Value(_) => write!(f, "U64Value"),
Self::U64Sum(_) => write!(f, "U64Sum"),
Self::F64Value(_) => write!(f, "F64Value"),
Self::F64Sum(_) => write!(f, "F64Sum"),
}
}
}
#[derive(Debug)]
pub struct ObserverCollection {
callbacks: Mutex<HashMap<String, Callbacks>>,
meter: Meter,
}
impl ObserverCollection {
pub fn new(meter: Meter) -> Self {
Self {
callbacks: Default::default(),
meter,
}
}
pub fn u64_value_observer<F>(
&self,
name: impl Into<String>,
description: impl Into<String>,
callback: F,
) where
F: Fn(&ObserverResult<u64>) + Send + Sync + 'static,
{
match self.callbacks.lock().entry(name.into()) {
Entry::Occupied(occupied) => match occupied.get() {
Callbacks::U64Value(callbacks) => callbacks.push(Box::new(callback)),
c => panic!("metric type mismatch, expected U64Value got {:?}", c),
},
Entry::Vacant(vacant) => {
let name = vacant.key().clone();
let callbacks = CallbackCollection::new(callback);
vacant.insert(Callbacks::U64Value(callbacks.clone()));
self.meter
.u64_value_observer(name, move |observer| callbacks.invoke(observer))
.with_description(description)
.init();
}
}
}
pub fn u64_sum_observer<F>(
&self,
name: impl Into<String>,
description: impl Into<String>,
callback: F,
) where
F: Fn(&ObserverResult<u64>) + Send + Sync + 'static,
{
match self.callbacks.lock().entry(name.into()) {
Entry::Occupied(occupied) => match occupied.get() {
Callbacks::U64Sum(callbacks) => callbacks.push(Box::new(callback)),
c => panic!("metric type mismatch, expected U64Sum got {:?}", c),
},
Entry::Vacant(vacant) => {
let name = vacant.key().clone();
let callbacks = CallbackCollection::new(callback);
vacant.insert(Callbacks::U64Sum(callbacks.clone()));
self.meter
.u64_sum_observer(name, move |observer| callbacks.invoke(observer))
.with_description(description)
.init();
}
}
}
pub fn f64_value_observer<F>(
&self,
name: impl Into<String>,
description: impl Into<String>,
callback: F,
) where
F: Fn(&ObserverResult<f64>) + Send + Sync + 'static,
{
match self.callbacks.lock().entry(name.into()) {
Entry::Occupied(occupied) => match occupied.get() {
Callbacks::F64Value(callbacks) => callbacks.push(Box::new(callback)),
c => panic!("metric type mismatch, expected F64Value got {:?}", c),
},
Entry::Vacant(vacant) => {
let name = vacant.key().clone();
let callbacks = CallbackCollection::new(callback);
vacant.insert(Callbacks::F64Value(callbacks.clone()));
self.meter
.f64_value_observer(name, move |observer| callbacks.invoke(observer))
.with_description(description)
.init();
}
}
}
pub fn f64_sum_observer<F>(
&self,
name: impl Into<String>,
description: impl Into<String>,
callback: F,
) where
F: Fn(&ObserverResult<f64>) + Send + Sync + 'static,
{
match self.callbacks.lock().entry(name.into()) {
Entry::Occupied(occupied) => match occupied.get() {
Callbacks::F64Sum(callbacks) => callbacks.push(Box::new(callback)),
c => panic!("metric type mismatch, expected F64Sum got {:?}", c),
},
Entry::Vacant(vacant) => {
let name = vacant.key().clone();
let callbacks = CallbackCollection::new(callback);
vacant.insert(Callbacks::F64Sum(callbacks.clone()));
self.meter
.f64_sum_observer(name, move |observer| callbacks.invoke(observer))
.with_description(description)
.init();
}
}
}
}