diff --git a/Cargo.lock b/Cargo.lock index 7906c8bf5d..30f8070f9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1825,6 +1825,7 @@ dependencies = [ name = "metrics" version = "0.1.0" dependencies = [ + "dashmap", "observability_deps", "snafu", ] diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 2f1e124108..29a332b29b 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -4,18 +4,19 @@ version = "0.1.0" authors = ["Edd Robinson "] edition = "2018" -# This crate contains the application-specific abstraction of the metrics -# implementation for IOx. Any crate that wants to expose telemetry should use -# this crate. +# This crate contains the application-specific abstraction of the metrics +# implementation for IOx. Any crate that wants to expose telemetry should use +# this crate. # -# The crate exposes a special type of metrics registry which allows users to -# isolate their metrics from the rest of the system so they can be tested +# The crate exposes a special type of metrics registry which allows users to +# isolate their metrics from the rest of the system so they can be tested # in isolation. [dependencies] # In alphabetical order observability_deps = { path = "../observability_deps" } snafu = "0.6" +dashmap = { version = "4.0.1" } [dev-dependencies] # In alphabetical order diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index dc8d7b40e8..b673dc2039 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -8,6 +8,7 @@ mod metrics; mod tests; +use dashmap::{mapref::multiple::RefMulti, DashMap}; use observability_deps::{ opentelemetry::metrics::Meter as OTMeter, opentelemetry::{ @@ -21,6 +22,7 @@ use observability_deps::{ prometheus::{Encoder, Registry, TextEncoder}, tracing::*, }; +use std::sync::Arc; pub use crate::metrics::{Counter, Histogram, KeyValue, RedMetric}; pub use crate::tests::*; @@ -268,6 +270,72 @@ impl Domain { HistogramBuilder::new(histogram) } + + /// Registers a new gauge metric. + /// + /// A gauge is a metric that represents a single numerical value that can arbitrarily go up and down. + /// Gauges are typically used for measured values like temperatures or current memory usage, but also + /// "counts" that can go up and down, like the number of concurrent requests. + /// + /// `name` should be a noun that describes the thing being observed, e.g., + /// threads, buffers, ... + /// + /// `unit` is optional and will appear in the metric name before a final + /// `total` suffix. Consider reviewing + /// https://prometheus.io/docs/practices/naming/#base-units for appropriate + pub fn register_gauge_metric( + &self, + name: &str, + unit: Option<&str>, + description: impl Into, + ) -> metrics::Gauge { + self.register_gauge_metric_with_labels(name, unit, description, vec![]) + } + + /// Registers a new gauge metric with default labels. + pub fn register_gauge_metric_with_labels( + &self, + name: &str, + unit: Option<&str>, + description: impl Into, + default_labels: Vec, + ) -> metrics::Gauge { + // A gauge is technically a ValueRecorder for which we only care about the last value. + // Unfortunately, in opentelemetry it appears that such behavior is selected by crafting + // a custom selector that will then apply a LastValueAggregator. This is all very + // complicated and turned making gauges into such a wild yak shave, that I (mkm), opted for + // an alternative approach: + // + // A Gauge is backed by ValueObserver. A ValueObserver is an asynchronous instrument that invokes a + // callback at scrape time (i.e. when the prometheus exporter renders the metrics as text). + // The Gauge itself records the values in a map keyed by label sets and returns all the recorded + // values when the ValueObserver invokes the callback. + + let values = Arc::new(DashMap::new()); + let values_captured = Arc::clone(&values); + let gauge = self + .meter + .u64_value_observer( + match unit { + Some(unit) => { + format!("{}.{}", self.build_metric_prefix(name, None), unit) + } + None => self.build_metric_prefix(name, None), + }, + move |arg| { + for i in values_captured.iter() { + // currently rust type inference cannot deduct the type of i.value() + let i: RefMulti<'_, _, (u64, Vec), _> = i; + let &(value, ref labels) = i.value(); + arg.observe(value, labels); + } + }, + ) + .with_description(description) + .init(); + + metrics::Gauge::new(gauge, default_labels, values) + } } #[derive(Debug)] @@ -581,4 +649,58 @@ mod test { // .bucket_cumulative_count_eq(0.12, 0) // .unwrap(); } + + #[test] + fn gauge_metric() { + let reg = TestMetricRegistry::default(); + let domain = reg.registry().register_domain("http"); + + // create a gauge metric + let metric = domain.register_gauge_metric_with_labels( + "mem", + Some("bytes"), + "currently used bytes", + vec![KeyValue::new("tier", "a")], + ); + + metric.set(40); + metric.set_with_labels(41, &[KeyValue::new("tier", "b")]); + metric.set_with_labels( + 42, + &[KeyValue::new("tier", "c"), KeyValue::new("tier", "b")], + ); + metric.set_with_labels(43, &[KeyValue::new("beer", "c")]); + + reg.has_metric_family("http_mem_bytes") + .with_labels(&[("tier", "a")]) + .gauge() + .eq(40.0) + .unwrap(); + + reg.has_metric_family("http_mem_bytes") + .with_labels(&[("tier", "b")]) + .gauge() + .eq(42.0) + .unwrap(); + + reg.has_metric_family("http_mem_bytes") + .with_labels(&[("tier", "a"), ("beer", "c")]) + .gauge() + .eq(43.0) + .unwrap(); + + let rendered = reg.registry().metrics_as_str(); + assert_eq!( + rendered, + vec![ + r#"# HELP http_mem_bytes currently used bytes"#, + r#"# TYPE http_mem_bytes gauge"#, + r#"http_mem_bytes{tier="a"} 40"#, + r#"http_mem_bytes{tier="b"} 42"#, + r#"http_mem_bytes{beer="c",tier="a"} 43"#, + "", + ] + .join("\n") + ); + } } diff --git a/metrics/src/metrics.rs b/metrics/src/metrics.rs index a0be1cd37d..67e087128c 100644 --- a/metrics/src/metrics.rs +++ b/metrics/src/metrics.rs @@ -1,11 +1,14 @@ +use std::sync::Arc; use std::{ borrow::Cow, fmt::Display, time::{Duration, Instant}, }; -use observability_deps::opentelemetry::metrics::{ - Counter as OTCounter, ValueRecorder as OTHistogram, +use dashmap::DashMap; +use observability_deps::opentelemetry::{ + labels, + metrics::{Counter as OTCounter, ValueObserver as OTGauge, ValueRecorder as OTHistogram}, }; pub use observability_deps::opentelemetry::KeyValue; @@ -216,13 +219,14 @@ where self.observe(RedRequestStatus::Error, duration, labels); } } -#[derive(Debug)] + /// A Counter is a metric exposing a monotonically increasing counter. /// It is best used to track increases in something over time. /// /// If you want to track some notion of success, failure and latency consider /// using a `REDMetric` instead rather than expressing that with labels on a /// `Counter`. +#[derive(Debug)] pub struct Counter { counter: OTCounter, default_labels: Vec, @@ -274,8 +278,54 @@ impl Counter { } } +/// A Gauge is an asynchronous instrument that reports +/// the last value set +/// +/// (e.g. current memory usage) #[derive(Debug)] +pub struct Gauge { + gauge: OTGauge, + default_labels: Vec, + values: Arc)>>, +} + +impl Gauge { + pub(crate) fn new( + gauge: OTGauge, + default_labels: Vec, + values: Arc)>>, + ) -> Self { + Self { + gauge, + default_labels, + values, + } + } + + // Set the gauge's `value` + pub fn set(&self, value: u64) { + self.set_with_labels(value, &[]); + } + + /// Set the gauge's `value` and associate the observation with the + /// provided labels. + pub fn set_with_labels(&self, value: u64, labels: &[KeyValue]) { + // Note: provided labels need to go last so that they overwrite + // any default labels. + + let mut labels_new = self.default_labels.clone(); + labels_new.extend(labels.iter().cloned()); + + // this may seem inefficient but it's nothing compared to this and more that happens in the + // internals of opentelemetry. There's lots of room for improvement everywhere. + let label_set = labels::LabelSet::from_labels(labels_new.iter().cloned()); + let encoded_labels = label_set.encoded(Some(&labels::DefaultLabelEncoder)); + self.values.insert(encoded_labels, (value, labels_new)); + } +} + /// A Histogram is a metric exposing a distribution of observations. +#[derive(Debug)] pub struct Histogram { default_labels: Vec, histogram: OTHistogram, diff --git a/metrics/src/tests.rs b/metrics/src/tests.rs index 93012608b9..f0f7678801 100644 --- a/metrics/src/tests.rs +++ b/metrics/src/tests.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use snafu::{ensure, OptionExt, Snafu}; use observability_deps::prometheus::proto::{ - Counter as PromCounter, Histogram as PromHistogram, MetricFamily, + Counter as PromCounter, Gauge as PromGauge, Histogram as PromHistogram, MetricFamily, }; use crate::MetricRegistry; @@ -173,6 +173,62 @@ impl<'a> AssertionBuilder<'a> { } } + /// Returns the gauge metric, allowing assertions to be applied. + pub fn gauge(&mut self) -> Gauge<'_> { + // sort the assertion's labels + self.labels.sort_by(|a, b| a.0.cmp(&b.0)); + + let metric = self.family.get_metric().iter().find(|metric| { + if metric.get_label().len() != self.labels.len() { + return false; // this metric can't match + } + + // sort this metrics labels and compare to assertion labels + let mut metric_labels = metric.get_label().to_vec(); + metric_labels.sort_by(|a, b| a.get_name().cmp(b.get_name())); + + // metric only matches if all labels are identical. + metric_labels + .iter() + .zip(self.labels.iter()) + .all(|(a, b)| a.get_name() == b.0 && a.get_value() == b.1) + }); + + // Can't find metric matching labels + if metric.is_none() { + return Gauge { + c: NoMatchingLabelsError { + name: self.family.get_name().to_owned(), + labels: self.labels.clone(), + metrics: self.registry.metrics_as_str(), + } + .fail(), + family_name: "".to_string(), + metric_dump: "".to_string(), + }; + } + let metric = metric.unwrap(); + + if !metric.has_gauge() { + return Gauge { + c: FailedMetricAssertionError { + name: self.family.get_name().to_owned(), + msg: "metric not a gauge".to_owned(), + metrics: self.registry.metrics_as_str(), + } + .fail(), + family_name: "".to_string(), + metric_dump: "".to_string(), + }; + } + + Gauge { + c: Ok(metric.get_gauge()), + family_name: self.family.get_name().to_owned(), + metric_dump: self.registry.metrics_as_str(), + } + } + /// Returns the histogram metric, allowing assertions to be applied. pub fn histogram(&mut self) -> Histogram<'_> { // sort the assertion's labels @@ -314,6 +370,88 @@ impl<'a> Counter<'a> { } } +#[derive(Debug)] +pub struct Gauge<'a> { + // if there was a problem getting the gauge based on labels then the + // Error will contain details. + c: Result<&'a PromGauge, Error>, + + family_name: String, + metric_dump: String, +} + +impl<'a> Gauge<'a> { + pub fn eq(self, v: f64) -> Result<(), Error> { + let c = self.c?; // return previous errors + + ensure!( + v == c.get_value(), + FailedMetricAssertionError { + name: self.family_name, + msg: format!("{:?} == {:?} failed", c.get_value(), v), + metrics: self.metric_dump, + } + ); + Ok(()) + } + + pub fn gte(self, v: f64) -> Result<(), Error> { + let c = self.c?; // return previous errors + + ensure!( + c.get_value() >= v, + FailedMetricAssertionError { + name: self.family_name, + msg: format!("{:?} >= {:?} failed", c.get_value(), v), + metrics: self.metric_dump, + } + ); + Ok(()) + } + + pub fn gt(self, v: f64) -> Result<(), Error> { + let c = self.c?; // return previous errors + + ensure!( + c.get_value() > v, + FailedMetricAssertionError { + name: self.family_name, + msg: format!("{:?} > {:?} failed", c.get_value(), v), + metrics: self.metric_dump, + } + ); + Ok(()) + } + + pub fn lte(self, v: f64) -> Result<(), Error> { + let c = self.c?; // return previous errors + + ensure!( + c.get_value() <= v, + FailedMetricAssertionError { + name: self.family_name, + msg: format!("{:?} <= {:?} failed", c.get_value(), v), + metrics: self.metric_dump, + } + ); + Ok(()) + } + + pub fn lt(self, v: f64) -> Result<(), Error> { + let c = self.c?; // return previous errors + + ensure!( + c.get_value() < v, + FailedMetricAssertionError { + name: self.family_name, + msg: format!("{:?} < {:?} failed", c.get_value(), v), + metrics: self.metric_dump, + } + ); + Ok(()) + } +} + #[derive(Debug)] pub struct Histogram<'a> { // if there was a problem getting the counter based on labels then the