feat: RED metrics

pull/24376/head
Edd Robinson 2021-04-21 12:43:49 +01:00 committed by kodiakhq[bot]
parent 1741c063f6
commit ea909f45ad
5 changed files with 578 additions and 0 deletions

10
Cargo.lock generated

@@ -1467,6 +1467,7 @@ dependencies = [
"itertools 0.9.0",
"logfmt",
"mem_qe",
"metrics",
"mutable_buffer",
"object_store",
"observability_deps",
@@ -1801,6 +1802,15 @@ dependencies = [
"autocfg",
]
[[package]]
name = "metrics"
version = "0.1.0"
dependencies = [
"observability_deps",
"once_cell",
"parking_lot",
]
[[package]]
name = "mime"
version = "0.3.16"

Cargo.toml

@@ -51,6 +51,7 @@ internal_types = { path = "internal_types" }
ingest = { path = "ingest" }
logfmt = { path = "logfmt" }
mem_qe = { path = "mem_qe" }
metrics = { path = "metrics" }
mutable_buffer = { path = "mutable_buffer" }
object_store = { path = "object_store" }
observability_deps = { path = "observability_deps" }

18
metrics/Cargo.toml Normal file

@@ -0,0 +1,18 @@
[package]
name = "metrics"
version = "0.1.0"
authors = ["Edd Robinson <me@edd.io>"]
edition = "2018"
# This crate contains the application-specific abstraction of the metrics
# implementation for IOx. Any crate that wants to expose telemetry should use
# this crate.
[dependencies] # In alphabetical order
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.1"
[dev-dependencies] # In alphabetical order

348
metrics/src/lib.rs Normal file

@@ -0,0 +1,348 @@
#![deny(rust_2018_idioms)]
#![warn(
missing_debug_implementations,
clippy::explicit_iter_loop,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
use std::sync::Arc;
use observability_deps::{
opentelemetry::metrics::Meter as OTMeter,
opentelemetry::{self},
opentelemetry_prometheus::{self, ExporterBuilder},
prometheus::{Encoder, Registry, TextEncoder},
tracing::log::warn,
};
use once_cell::sync::Lazy;
use parking_lot::{const_rwlock, RwLock};
pub mod metrics;
pub use crate::metrics::*;
/// global metrics
pub static IOXD_METRICS_REGISTRY: Lazy<GlobalRegistry> = Lazy::new(GlobalRegistry::new);
// // TODO(jacobmarble): better way to write-once-read-many without a lock
// // TODO(jacobmarble): generic OTel exporter, rather than just prometheus
// static PROMETHEUS_EXPORTER: RwLock<Option<opentelemetry_prometheus::PrometheusExporter>> =
// const_rwlock(None);
// /// Returns the data in the Prom exposition format.
// pub fn metrics_as_text() -> Vec<u8> {
// let metric_families = PROMETHEUS_EXPORTER
// .read()
// .as_ref()
// .unwrap()
// .registry()
// .gather();
// let mut result = Vec::new();
// TextEncoder::new()
// .encode(&metric_families, &mut result)
// .unwrap();
// result
// }
/// Configuration options for the global registry.
///
/// TODO: add flags to configure the OpenTelemetry exporter (OTLP).
/// This sets the global meter provider for other code to use.
#[derive(Debug)]
pub struct Config {}
#[derive(Debug)]
pub struct GlobalRegistry {
meter: Arc<OTMeter>,
exporter: RwLock<opentelemetry_prometheus::PrometheusExporter>,
}
// TODO(jacobmarble): better way to write-once-read-many without a lock
// // TODO(jacobmarble): generic OTel exporter, rather than just prometheus
// static PROMETHEUS_EXPORTER: RwLock<Option<opentelemetry_prometheus::PrometheusExporter>> =
// const_rwlock(None);
impl GlobalRegistry {
// `new` is private because this is only initialised via
// `IOXD_METRICS_REGISTRY`.
fn new() -> Self {
// Self::init();
let prom_reg = Registry::new();
let exporter = RwLock::new(ExporterBuilder::default().with_registry(prom_reg).init());
// let exporter = RwLock::new(opentelemetry_prometheus::exporter().init());
Self {
meter: Arc::new(opentelemetry::global::meter("iox")),
exporter,
}
}
/// Returns the data in the Prometheus exposition format.
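///
/// A hedged sketch of using the output (the HTTP wiring is outside this
/// crate and purely illustrative):
///
/// ```ignore
/// let body = metrics::IOXD_METRICS_REGISTRY.metrics_as_text();
/// // serve `body` as the response to a /metrics scrape with a text/plain
/// // content type.
/// ```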
pub fn metrics_as_text(&self) -> Vec<u8> {
let metric_families = self.exporter.read().registry().gather();
let mut result = Vec::new();
TextEncoder::new()
.encode(&metric_families, &mut result)
.unwrap();
result
}
// /// Initializes global registry
// pub fn init() {
// let exporter = opentelemetry_prometheus::exporter().init();
// let mut guard = PROMETHEUS_EXPORTER.write();
// if guard.is_some() {
// warn!("metrics were already initialized, overwriting configuration");
// }
// *guard = Some(exporter);
// }
/// Initializes global registry with configuration options
// pub fn init_with_config(_config: &Config) {
// let exporter = opentelemetry_prometheus::exporter().init();
// let mut guard = PROMETHEUS_EXPORTER.write();
// if guard.is_some() {
// warn!("metrics were already initialized, overwriting configuration");
// }
// *guard = Some(exporter);
// }
/// This method should be used to register a new domain such as a
/// sub-system, crate or similar.
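///
/// A minimal usage sketch (the domain name is illustrative):
///
/// ```ignore
/// let domain = metrics::IOXD_METRICS_REGISTRY.register_domain("read_buffer");
/// let metric = domain.register_red_metric(None);
/// ```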
pub fn register_domain(&self, name: &'static str) -> Domain {
Domain::new(name, Arc::clone(&self.meter))
}
}
/// A `Domain` provides a namespace that describes a collection of related
/// metrics. Within the domain all metrics' names will be prefixed by the name
/// of the domain; for example "http", "read_buffer", "wal".
///
/// Domains should be registered on the global registry. The returned domain
/// allows individual metrics to then be registered and used.
#[derive(Debug)]
pub struct Domain {
name: &'static str,
meter: Arc<OTMeter>,
}
impl Domain {
pub(crate) fn new(name: &'static str, meter: Arc<OTMeter>) -> Self {
Self { name, meter }
}
// Creates an appropriate metric name prefix based on the Domain's name and
// an optional suffix.
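//
// For example, for a domain named "http" (values illustrative):
//
//   build_metric_prefix(None)                         -> "http"
//   build_metric_prefix(Some("somename".to_string())) -> "http.somename"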
fn build_metric_prefix(&self, suffix: Option<String>) -> String {
let suffix = match suffix {
Some(name) => format!(".{}", name),
None => "".to_string(),
};
format!("{}{}", self.name, suffix)
}
/// Registers a new metric following the RED methodology.
///
/// By default, two distinct metrics will be created. One will be a counter
/// that will track the number of total and failed requests. The second will
/// track a latency distribution of all requests in seconds.
///
/// If `name` is not provided then the metrics registered on the `mydomain`
/// domain will be named:
///
/// `mydomain.requests.total` (tracks total and failed requests)
/// `mydomain.request.duration.seconds` (tracks a distribution of
/// latencies)
///
/// If `name` is provided then the metric names become:
///
/// `mydomain.somename.requests.total`
/// `mydomain.somename.request.duration.seconds`
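///
/// Note that the Prometheus exporter renders the `.` separators as `_` in the
/// exposition output, e.g. `http_requests_total`, as the tests below show.
///
/// A minimal usage sketch (not compiled as a doc-test; `domain` is a `Domain`
/// registered on the global registry):
///
/// ```ignore
/// let metric = domain.register_red_metric(None);
/// let ob = metric.observation(); // starts the latency timer
/// // ... perform the request being measured ...
/// ob.ok();                       // records one successful request and its duration
/// ```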
pub fn register_red_metric(&self, name: Option<String>) -> metrics::RedMetric {
self.register_red_metric_with_labels(name, &[])
}
/// As `register_red_metric` but with a set of default labels. These labels
/// will be associated with each observation given to the metric.
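///
/// For example (the label key and value are purely illustrative):
///
/// ```ignore
/// let metric = domain.register_red_metric_with_labels(
///     None,
///     &[KeyValue::new("db", "mydb")],
/// );
/// ```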
pub fn register_red_metric_with_labels(
&self,
name: Option<String>,
default_labels: &[KeyValue],
) -> metrics::RedMetric {
let requests = self
.meter
.u64_counter(format!(
"{}.requests.total",
self.build_metric_prefix(name.clone())
))
.with_description("accumulated total requests")
.init();
let duration = self
.meter
.f64_value_recorder(format!(
"{}.request.duration.seconds",
self.build_metric_prefix(name)
))
.with_description("distribution of request latencies")
.init();
metrics::RedMetric::new(requests, duration, default_labels)
}
}
#[cfg(test)]
mod test {
use super::*;
use std::time::Duration;
#[test]
fn red_metric() {
let reg = GlobalRegistry::new();
let domain = reg.register_domain("http");
// create a RED metric
let metric = domain.register_red_metric(None);
// Get an observation to start measuring something.
let ob = metric.observation();
// do some "work"
let duration = Duration::from_millis(100);
// finish the observation with success, using an explicit duration to make
// testing easier.
//
// Usually a caller would call `ob.ok()`.
ob.observe(RedRequestStatus::Ok, duration, vec![]);
assert_eq!(
String::from_utf8(reg.metrics_as_text()).unwrap(),
vec![
"# HELP http_request_duration_seconds distribution of request latencies",
"# TYPE http_request_duration_seconds histogram",
r#"http_request_duration_seconds_bucket{status="ok",le="0.5"} 1"#,
r#"http_request_duration_seconds_bucket{status="ok",le="0.9"} 1"#,
r#"http_request_duration_seconds_bucket{status="ok",le="0.99"} 1"#,
r#"http_request_duration_seconds_bucket{status="ok",le="+Inf"} 1"#,
r#"http_request_duration_seconds_sum{status="ok"} 0.1"#,
r#"http_request_duration_seconds_count{status="ok"} 1"#,
"# HELP http_requests_total accumulated total requests",
"# TYPE http_requests_total counter",
r#"http_requests_total{status="ok"} 1"#,
""
]
.join("\n")
);
// report some other observations
let ob = metric.observation();
ob.observe(
RedRequestStatus::OkError,
Duration::from_millis(2000),
vec![],
);
let ob = metric.observation();
ob.observe(RedRequestStatus::Error, Duration::from_millis(350), vec![]);
assert_eq!(
String::from_utf8(reg.metrics_as_text()).unwrap(),
vec![
"# HELP http_request_duration_seconds distribution of request latencies",
"# TYPE http_request_duration_seconds histogram",
r#"http_request_duration_seconds_bucket{status="error",le="0.5"} 1"#,
r#"http_request_duration_seconds_bucket{status="error",le="0.9"} 1"#,
r#"http_request_duration_seconds_bucket{status="error",le="0.99"} 1"#,
r#"http_request_duration_seconds_bucket{status="error",le="+Inf"} 1"#,
r#"http_request_duration_seconds_sum{status="error"} 0.35"#,
r#"http_request_duration_seconds_count{status="error"} 1"#,
r#"http_request_duration_seconds_bucket{status="ok",le="0.5"} 1"#,
r#"http_request_duration_seconds_bucket{status="ok",le="0.9"} 1"#,
r#"http_request_duration_seconds_bucket{status="ok",le="0.99"} 1"#,
r#"http_request_duration_seconds_bucket{status="ok",le="+Inf"} 1"#,
r#"http_request_duration_seconds_sum{status="ok"} 0.1"#,
r#"http_request_duration_seconds_count{status="ok"} 1"#,
r#"http_request_duration_seconds_bucket{status="ok_error",le="0.5"} 0"#,
r#"http_request_duration_seconds_bucket{status="ok_error",le="0.9"} 0"#,
r#"http_request_duration_seconds_bucket{status="ok_error",le="0.99"} 0"#,
r#"http_request_duration_seconds_bucket{status="ok_error",le="+Inf"} 1"#,
r#"http_request_duration_seconds_sum{status="ok_error"} 2"#,
r#"http_request_duration_seconds_count{status="ok_error"} 1"#,
"# HELP http_requests_total accumulated total requests",
"# TYPE http_requests_total counter",
r#"http_requests_total{status="error"} 1"#,
r#"http_requests_total{status="ok"} 1"#,
r#"http_requests_total{status="ok_error"} 1"#,
"",
]
.join("\n")
);
}
#[test]
fn red_metric_labels() {
let reg = GlobalRegistry::new();
let domain = reg.register_domain("ftp");
// create a RED metric
let metric = domain.register_red_metric(None);
// Get an observation to start measuring something.
let ob = metric.observation();
// Usually a caller would use a convenience method such as `ob.ok()`; `observe`
// is used here so that labels can be supplied explicitly.
ob.observe(
RedRequestStatus::Ok,
Duration::from_millis(100),
vec![KeyValue::new("account", "abc123")],
);
metric.observation().observe(
RedRequestStatus::OkError,
Duration::from_millis(200),
vec![KeyValue::new("account", "other")],
);
metric.observation().observe(
RedRequestStatus::Error,
Duration::from_millis(203),
vec![KeyValue::new("account", "abc123")],
);
assert_eq!(
String::from_utf8(reg.metrics_as_text()).unwrap(),
vec![
"# HELP ftp_request_duration_seconds distribution of request latencies",
"# TYPE ftp_request_duration_seconds histogram",
r#"ftp_request_duration_seconds_bucket{account="abc123",status="error",le="0.5"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="abc123",status="error",le="0.9"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="abc123",status="error",le="0.99"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="abc123",status="error",le="+Inf"} 1"#,
r#"ftp_request_duration_seconds_sum{account="abc123",status="error"} 0.203"#,
r#"ftp_request_duration_seconds_count{account="abc123",status="error"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="abc123",status="ok",le="0.5"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="abc123",status="ok",le="0.9"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="abc123",status="ok",le="0.99"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="abc123",status="ok",le="+Inf"} 1"#,
r#"ftp_request_duration_seconds_sum{account="abc123",status="ok"} 0.1"#,
r#"ftp_request_duration_seconds_count{account="abc123",status="ok"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="other",status="ok_error",le="0.5"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="other",status="ok_error",le="0.9"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="other",status="ok_error",le="0.99"} 1"#,
r#"ftp_request_duration_seconds_bucket{account="other",status="ok_error",le="+Inf"} 1"#,
r#"ftp_request_duration_seconds_sum{account="other",status="ok_error"} 0.2"#,
r#"ftp_request_duration_seconds_count{account="other",status="ok_error"} 1"#,
r#"# HELP ftp_requests_total accumulated total requests"#,
r#"# TYPE ftp_requests_total counter"#,
r#"ftp_requests_total{account="abc123",status="error"} 1"#,
r#"ftp_requests_total{account="abc123",status="ok"} 1"#,
r#"ftp_requests_total{account="other",status="ok_error"} 1"#,
""
]
.join("\n")
);
}
}

201
metrics/src/metrics.rs Normal file

@@ -0,0 +1,201 @@
use std::{
borrow::Cow,
fmt::Display,
time::{Duration, Instant},
};
use observability_deps::opentelemetry::metrics::{
Counter as OTCounter, ValueRecorder as OTHistogram,
};
pub use observability_deps::opentelemetry::KeyValue;
const RED_REQUEST_STATUS_LABEL: &str = "status";
/// Possible types of RED metric observation status.
///
/// Ok - an observed request was successful.
/// OkError - an observed request was unsuccessful but it was not the fault of
/// the observed service.
/// Error - an observed request failed and it was the fault of the service.
///
/// What is the difference between OkError and Error? The difference comes down
/// to where the failure occurred. When thinking about measuring SLOs like
/// availability it's necessary to calculate things like:
///
/// Availability = 1 - (failed_requests / all_valid_requests)
///
/// `all_valid_requests` includes successful requests and any requests that
/// failed but not due to the fault of the service (e.g., client errors).
///
/// It is useful to track the components of `all_valid_requests` separately so
/// operators can also monitor external errors (`ok_error`) to help improve
/// their APIs or other systems.
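///
/// A hedged sketch of how a caller might map an HTTP status code onto these
/// variants (`status_code` is hypothetical and the mapping is illustrative,
/// not prescribed by this crate):
///
/// ```ignore
/// let status = match status_code {
///     200..=399 => RedRequestStatus::Ok,
///     400..=499 => RedRequestStatus::OkError, // the client's fault
///     _ => RedRequestStatus::Error,           // the service's fault
/// };
/// ```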
#[derive(Debug)]
pub enum RedRequestStatus {
Ok,
OkError,
Error,
}
impl Display for RedRequestStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Ok => write!(f, "ok"),
Self::OkError => write!(f, "ok_error"),
Self::Error => write!(f, "error"),
}
}
}
#[derive(Debug)]
/// A `RedMetric` is a metric that tracks requests to some resource.
///
/// The RED methodology stipulates you should track three key measures:
///
/// - Request Rate: (total number of requests / second);
/// - Error Rate: (total number of failed requests / second);
/// - Duration: (latency distributions for the various requests)
///
/// Using a `RedMetric` makes following this methodology easy because it handles
/// updating the three components for you.
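///
/// A minimal sketch, assuming a `Domain` registered via the parent crate
/// (`do_work` is a stand-in for the operation being measured):
///
/// ```ignore
/// let metric = domain.register_red_metric(None);
/// let ob = metric.observation();
/// match do_work() {
///     Ok(_) => ob.ok(),
///     Err(_) => ob.error(),
/// }
/// ```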
pub struct RedMetric {
default_labels: Vec<KeyValue>,
requests: OTCounter<u64>,
duration: OTHistogram<f64>,
}
impl RedMetric {
pub(crate) fn new(
requests: OTCounter<u64>,
duration: OTHistogram<f64>,
labels: &[KeyValue],
) -> Self {
let mut default_labels = vec![KeyValue::new(
RED_REQUEST_STATUS_LABEL,
RedRequestStatus::Ok.to_string(),
)];
// TODO(edd): decide what to do if `labels` contains
// RED_REQUEST_STATUS_LABEL.
default_labels.extend(labels.iter().cloned());
Self {
requests,
duration,
default_labels,
}
}
/// Returns a new observation that will handle timing and recording a single
/// observation of the metric.
pub fn observation(
&'_ self,
) -> RedObservation<impl Fn(RedRequestStatus, Duration, Vec<KeyValue>) + '_> {
// The recording call-back
let record =
move |status: RedRequestStatus, duration: Duration, mut labels: Vec<KeyValue>| {
let status_idx = labels.len(); // the status label will sit at this index once the defaults are appended.
let labels = match labels.is_empty() {
// If there are no labels specified just borrow defaults
true => Cow::Borrowed(&self.default_labels),
false => {
// Otherwise merge the provided labels and the defaults.
labels.extend(self.default_labels.iter().cloned());
Cow::Owned(labels)
}
};
match status {
RedRequestStatus::Ok => {
self.requests.add(1, &labels);
self.duration.record(duration.as_secs_f64(), &labels);
}
RedRequestStatus::OkError => {
let mut labels = labels.into_owned();
labels[status_idx] = KeyValue::new(
RED_REQUEST_STATUS_LABEL,
RedRequestStatus::OkError.to_string(),
);
self.requests.add(1, &labels);
self.duration.record(duration.as_secs_f64(), &labels);
}
RedRequestStatus::Error => {
let mut labels = labels.into_owned();
labels[status_idx] = KeyValue::new(
RED_REQUEST_STATUS_LABEL,
RedRequestStatus::Error.to_string(),
);
self.requests.add(1, &labels);
self.duration.record(duration.as_secs_f64(), &labels);
}
};
};
RedObservation::new(record)
}
}
#[derive(Debug)]
pub struct RedObservation<T>
where
T: Fn(RedRequestStatus, Duration, Vec<KeyValue>),
{
start: Instant,
record: T, // a call-back that records the observation on the metric.
}
impl<T> RedObservation<T>
where
T: Fn(RedRequestStatus, Duration, Vec<KeyValue>),
{
pub(crate) fn new(record: T) -> Self {
Self {
start: std::time::Instant::now(),
record,
}
}
/// Record the observation with the given status. The duration of the
/// observation must be provided by the caller. Callers might prefer `ok`,
/// `ok_error` or `error`, where the timing is handled for them.
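///
/// A hedged sketch (`elapsed` is a `Duration` measured by the caller):
///
/// ```ignore
/// ob.observe(RedRequestStatus::Ok, elapsed, vec![]);
/// ```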
pub fn observe(
self,
observation: RedRequestStatus,
duration: Duration,
labels_itr: Vec<KeyValue>,
) {
(self.record)(observation, duration, labels_itr);
}
/// Record that the observation was successful. Timing of observation is
/// handled automatically.
pub fn ok(self) {
let duration = self.start.elapsed();
// PERF: allocation here but not convinced worth worrying about yet.
self.observe(RedRequestStatus::Ok, duration, vec![]);
}
/// Record that the observation was not successful but was still valid.
/// `ok_error` is the right thing to choose when the request failed perhaps
/// due to client error. Timing of observation is handled automatically.
pub fn ok_error(self) {
let duration = self.start.elapsed();
// PERF: allocation here but not convinced worth worrying about yet.
self.observe(RedRequestStatus::OkError, duration, vec![]);
}
/// Record that the observation was not successful and results in an error
/// caused by the service under observation. Timing of observation is
/// handled automatically.
pub fn error(self) {
let duration = self.start.elapsed();
// PERF: allocation here but not convinced worth worrying about yet.
self.observe(RedRequestStatus::Error, duration, vec![]);
}
}