diff --git a/Cargo.lock b/Cargo.lock index f63e68dc73..4b63c5dbdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1467,6 +1467,7 @@ dependencies = [ "itertools 0.9.0", "logfmt", "mem_qe", + "metrics", "mutable_buffer", "object_store", "observability_deps", @@ -1801,6 +1802,15 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.1.0" +dependencies = [ + "observability_deps", + "once_cell", + "parking_lot", +] + [[package]] name = "mime" version = "0.3.16" diff --git a/Cargo.toml b/Cargo.toml index 1d8803f4ae..55ff7896d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ internal_types = { path = "internal_types" } ingest = { path = "ingest" } logfmt = { path = "logfmt" } mem_qe = { path = "mem_qe" } +metrics = { path = "metrics" } mutable_buffer = { path = "mutable_buffer" } object_store = { path = "object_store" } observability_deps = { path = "observability_deps" } diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml new file mode 100644 index 0000000000..ede2e59f52 --- /dev/null +++ b/metrics/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "metrics" +version = "0.1.0" +authors = ["Edd Robinson "] +edition = "2018" + +# This crate contains the application-specific abstraction of the metrics +# implementation for IOx. Any crate that wants to expose telemetry should use +# this crate. + + +[dependencies] # In alphabetical order +observability_deps = { path = "../observability_deps" } +once_cell = { version = "1.4.0", features = ["parking_lot"] } +parking_lot = "0.11.1" + +[dev-dependencies] # In alphabetical order + diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs new file mode 100644 index 0000000000..0b9d574d4e --- /dev/null +++ b/metrics/src/lib.rs @@ -0,0 +1,348 @@ +#![deny(rust_2018_idioms)] +#![warn( + missing_debug_implementations, + clippy::explicit_iter_loop, + clippy::use_self, + clippy::clone_on_ref_ptr +)] +use std::sync::Arc; + +use observability_deps::{ + opentelemetry::metrics::Meter as OTMeter, + opentelemetry::{self}, + opentelemetry_prometheus::{self, ExporterBuilder}, + prometheus::{Encoder, Registry, TextEncoder}, + tracing::log::warn, +}; +use once_cell::sync::Lazy; +use parking_lot::{const_rwlock, RwLock}; + +pub mod metrics; +pub use crate::metrics::*; + +/// global metrics +pub static IOXD_METRICS_REGISTRY: Lazy = Lazy::new(GlobalRegistry::new); + +// // TODO(jacobmarble): better way to write-once-read-many without a lock +// // TODO(jacobmarble): generic OTel exporter, rather than just prometheus +// static PROMETHEUS_EXPORTER: RwLock> = +// const_rwlock(None); + +// /// Returns the data in the Prom exposition format. +// pub fn metrics_as_text() -> Vec { +// let metric_families = PROMETHEUS_EXPORTER +// .read() +// .as_ref() +// .unwrap() +// .registry() +// .gather(); +// let mut result = Vec::new(); +// TextEncoder::new() +// .encode(&metric_families, &mut result) +// .unwrap(); +// result +// } + +/// Configuration options for the global registry. 
+///
+/// TODO: add flags to `Config` to configure the OpenTelemetry exporter (OTLP).
+/// This sets the global meter provider, for other code to use.
+#[derive(Debug)]
+pub struct Config {}
+
+#[derive(Debug)]
+pub struct GlobalRegistry {
+    meter: Arc<OTMeter>,
+    exporter: RwLock<opentelemetry_prometheus::PrometheusExporter>,
+}
+
+// TODO(jacobmarble): better way to write-once-read-many without a lock
+// // TODO(jacobmarble): generic OTel exporter, rather than just prometheus
+// static PROMETHEUS_EXPORTER: RwLock<Option<opentelemetry_prometheus::PrometheusExporter>> =
+//     const_rwlock(None);
+
+impl GlobalRegistry {
+    // `new` is private because this is only initialised via
+    // `IOXD_METRICS_REGISTRY`.
+    fn new() -> Self {
+        // Self::init();
+        let prom_reg = Registry::new();
+        let exporter = RwLock::new(ExporterBuilder::default().with_registry(prom_reg).init());
+
+        // let exporter = RwLock::new(opentelemetry_prometheus::exporter().init());
+        Self {
+            meter: Arc::new(opentelemetry::global::meter("iox")),
+            exporter,
+        }
+    }
+
+    /// Returns the data in the Prom exposition format.
+    pub fn metrics_as_text(&self) -> Vec<u8> {
+        let metric_families = self.exporter.read().registry().gather();
+        let mut result = Vec::new();
+        TextEncoder::new()
+            .encode(&metric_families, &mut result)
+            .unwrap();
+        result
+    }
+
+    // /// Initializes global registry
+    // pub fn init() {
+    //     let exporter = opentelemetry_prometheus::exporter().init();
+    //     let mut guard = PROMETHEUS_EXPORTER.write();
+    //     if guard.is_some() {
+    //         warn!("metrics were already initialized, overwriting configuration");
+    //     }
+    //     *guard = Some(exporter);
+    // }
+
+    // /// Initializes global registry with configuration options
+    // pub fn init_with_config(_config: &Config) {
+    //     let exporter = opentelemetry_prometheus::exporter().init();
+    //     let mut guard = PROMETHEUS_EXPORTER.write();
+    //     if guard.is_some() {
+    //         warn!("metrics were already initialized, overwriting configuration");
+    //     }
+    //     *guard = Some(exporter);
+    // }
+
+    /// This method should be used to register a new domain such as a
+    /// sub-system, crate or similar.
+    pub fn register_domain(&self, name: &'static str) -> Domain {
+        Domain::new(name, Arc::clone(&self.meter))
+    }
+}
+
+/// A `Domain` provides a namespace that describes a collection of related
+/// metrics. Within the domain all metrics' names will be prefixed by the name
+/// of the domain; for example "http", "read_buffer", "wal".
+///
+/// Domains should be registered on the global registry. The returned domain
+/// allows individual metrics to then be registered and used.
+#[derive(Debug)]
+pub struct Domain {
+    name: &'static str,
+    meter: Arc<OTMeter>,
+}
+
+impl Domain {
+    pub(crate) fn new(name: &'static str, meter: Arc<OTMeter>) -> Self {
+        Self { name, meter }
+    }
+
+    // Creates an appropriate metric name prefix based on the Domain's name and
+    // an optional suffix.
+    fn build_metric_prefix(&self, suffix: Option<String>) -> String {
+        let suffix = match suffix {
+            Some(name) => format!(".{}", name),
+            None => "".to_string(),
+        };
+
+        format!("{}{}", self.name, suffix)
+    }
+
+    /// Registers a new metric following the RED methodology.
+    ///
+    /// By default, two distinct metrics will be created. One will be a counter
+    /// that will track the number of total requests and failed requests. The
+    /// second will be a metric that tracks latency distributions of all
+    /// requests in seconds.
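+    ///
+    /// A rough usage sketch (`domain` is assumed to have already been created
+    /// via `GlobalRegistry::register_domain`):
+    ///
+    /// ```ignore
+    /// let metric = domain.register_red_metric(None);
+    ///
+    /// let ob = metric.observation(); // starts the request timer
+    /// // ... handle the request ...
+    /// ob.ok(); // records the request count and its latency
+    /// ```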
+    ///
+    /// If `name` is not provided then the metrics registered on the `mydomain`
+    /// domain will be named:
+    ///
+    /// `mydomain.requests.total` (tracks total and failed requests)
+    /// `mydomain.request.duration.seconds` (tracks a distribution of
+    /// latencies)
+    ///
+    /// If `name` is provided then the metric names become:
+    ///
+    /// `mydomain.somename.requests.total`
+    /// `mydomain.somename.request.duration.seconds`
+    pub fn register_red_metric(&self, name: Option<String>) -> metrics::RedMetric {
+        self.register_red_metric_with_labels(name, &[])
+    }
+
+    /// As `register_red_metric` but with a set of default labels. These labels
+    /// will be associated with each observation given to the metric.
+    pub fn register_red_metric_with_labels(
+        &self,
+        name: Option<String>,
+        default_labels: &[KeyValue],
+    ) -> metrics::RedMetric {
+        let requests = self
+            .meter
+            .u64_counter(format!(
+                "{}.requests.total",
+                self.build_metric_prefix(name.clone())
+            ))
+            .with_description("accumulated total requests")
+            .init();
+
+        let duration = self
+            .meter
+            .f64_value_recorder(format!(
+                "{}.request.duration.seconds",
+                self.build_metric_prefix(name)
+            ))
+            .with_description("distribution of request latencies")
+            .init();
+
+        metrics::RedMetric::new(requests, duration, &default_labels)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use std::time::Duration;
+
+    #[test]
+    fn red_metric() {
+        let reg = GlobalRegistry::new();
+        let domain = reg.register_domain("http");
+
+        // create a RED metric
+        let metric = domain.register_red_metric(None);
+
+        // Get an observation to start measuring something.
+        let ob = metric.observation();
+
+        // do some "work"
+        let duration = Duration::from_millis(100);
+
+        // finish observation with success using explicit duration for easier
+        // testing.
+        //
+        // Usually caller would call `ob.ok()`.
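+        //
+        // `observe` takes the request status, the measured duration and any
+        // extra per-observation labels; the empty vec means only the metric's
+        // default labels are attached.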
+ ob.observe(RedRequestStatus::Ok, duration, vec![]); + + assert_eq!( + String::from_utf8(reg.metrics_as_text()).unwrap(), + vec![ + "# HELP http_request_duration_seconds distribution of request latencies", + "# TYPE http_request_duration_seconds histogram", + r#"http_request_duration_seconds_bucket{status="ok",le="0.5"} 1"#, + r#"http_request_duration_seconds_bucket{status="ok",le="0.9"} 1"#, + r#"http_request_duration_seconds_bucket{status="ok",le="0.99"} 1"#, + r#"http_request_duration_seconds_bucket{status="ok",le="+Inf"} 1"#, + r#"http_request_duration_seconds_sum{status="ok"} 0.1"#, + r#"http_request_duration_seconds_count{status="ok"} 1"#, + "# HELP http_requests_total accumulated total requests", + "# TYPE http_requests_total counter", + r#"http_requests_total{status="ok"} 1"#, + "" + ] + .join("\n") + ); + + // report some other observations + let ob = metric.observation(); + ob.observe( + RedRequestStatus::OkError, + Duration::from_millis(2000), + vec![], + ); + + let ob = metric.observation(); + ob.observe(RedRequestStatus::Error, Duration::from_millis(350), vec![]); + + assert_eq!( + String::from_utf8(reg.metrics_as_text()).unwrap(), + vec![ + "# HELP http_request_duration_seconds distribution of request latencies", + "# TYPE http_request_duration_seconds histogram", + r#"http_request_duration_seconds_bucket{status="error",le="0.5"} 1"#, + r#"http_request_duration_seconds_bucket{status="error",le="0.9"} 1"#, + r#"http_request_duration_seconds_bucket{status="error",le="0.99"} 1"#, + r#"http_request_duration_seconds_bucket{status="error",le="+Inf"} 1"#, + r#"http_request_duration_seconds_sum{status="error"} 0.35"#, + r#"http_request_duration_seconds_count{status="error"} 1"#, + r#"http_request_duration_seconds_bucket{status="ok",le="0.5"} 1"#, + r#"http_request_duration_seconds_bucket{status="ok",le="0.9"} 1"#, + r#"http_request_duration_seconds_bucket{status="ok",le="0.99"} 1"#, + r#"http_request_duration_seconds_bucket{status="ok",le="+Inf"} 1"#, + r#"http_request_duration_seconds_sum{status="ok"} 0.1"#, + r#"http_request_duration_seconds_count{status="ok"} 1"#, + r#"http_request_duration_seconds_bucket{status="ok_error",le="0.5"} 0"#, + r#"http_request_duration_seconds_bucket{status="ok_error",le="0.9"} 0"#, + r#"http_request_duration_seconds_bucket{status="ok_error",le="0.99"} 0"#, + r#"http_request_duration_seconds_bucket{status="ok_error",le="+Inf"} 1"#, + r#"http_request_duration_seconds_sum{status="ok_error"} 2"#, + r#"http_request_duration_seconds_count{status="ok_error"} 1"#, + "# HELP http_requests_total accumulated total requests", + "# TYPE http_requests_total counter", + r#"http_requests_total{status="error"} 1"#, + r#"http_requests_total{status="ok"} 1"#, + r#"http_requests_total{status="ok_error"} 1"#, + "", + ] + .join("\n") + ); + } + + #[test] + fn red_metric_labels() { + let reg = GlobalRegistry::new(); + let domain = reg.register_domain("ftp"); + + // create a RED metrics + let metric = domain.register_red_metric(None); + + // Get an observation to start measuring something. 
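+        //
+        // Labels handed to `observe` below are merged with the metric's
+        // default labels, so every exported series carries an `account` label
+        // in addition to the `status` label added by the `RedMetric` itself.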
+ let ob = metric.observation(); + + // Usually a caller would use `ob.ok_with_labels(labels)`; + ob.observe( + RedRequestStatus::Ok, + Duration::from_millis(100), + vec![KeyValue::new("account", "abc123")], + ); + + metric.observation().observe( + RedRequestStatus::OkError, + Duration::from_millis(200), + vec![KeyValue::new("account", "other")], + ); + + metric.observation().observe( + RedRequestStatus::Error, + Duration::from_millis(203), + vec![KeyValue::new("account", "abc123")], + ); + + assert_eq!( + String::from_utf8(reg.metrics_as_text()).unwrap(), + vec![ + "# HELP ftp_request_duration_seconds distribution of request latencies", +"# TYPE ftp_request_duration_seconds histogram", +r#"ftp_request_duration_seconds_bucket{account="abc123",status="error",le="0.5"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="abc123",status="error",le="0.9"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="abc123",status="error",le="0.99"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="abc123",status="error",le="+Inf"} 1"#, +r#"ftp_request_duration_seconds_sum{account="abc123",status="error"} 0.203"#, +r#"ftp_request_duration_seconds_count{account="abc123",status="error"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="abc123",status="ok",le="0.5"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="abc123",status="ok",le="0.9"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="abc123",status="ok",le="0.99"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="abc123",status="ok",le="+Inf"} 1"#, +r#"ftp_request_duration_seconds_sum{account="abc123",status="ok"} 0.1"#, +r#"ftp_request_duration_seconds_count{account="abc123",status="ok"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="other",status="ok_error",le="0.5"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="other",status="ok_error",le="0.9"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="other",status="ok_error",le="0.99"} 1"#, +r#"ftp_request_duration_seconds_bucket{account="other",status="ok_error",le="+Inf"} 1"#, +r#"ftp_request_duration_seconds_sum{account="other",status="ok_error"} 0.2"#, +r#"ftp_request_duration_seconds_count{account="other",status="ok_error"} 1"#, +r#"# HELP ftp_requests_total accumulated total requests"#, +r#"# TYPE ftp_requests_total counter"#, +r#"ftp_requests_total{account="abc123",status="error"} 1"#, +r#"ftp_requests_total{account="abc123",status="ok"} 1"#, +r#"ftp_requests_total{account="other",status="ok_error"} 1"#, +"" + ] + .join("\n") + ); + } +} diff --git a/metrics/src/metrics.rs b/metrics/src/metrics.rs new file mode 100644 index 0000000000..92908fcf0e --- /dev/null +++ b/metrics/src/metrics.rs @@ -0,0 +1,201 @@ +use std::{ + borrow::Cow, + fmt::Display, + time::{Duration, Instant}, +}; + +use observability_deps::opentelemetry::metrics::{ + Counter as OTCounter, ValueRecorder as OTHistorgram, +}; + +pub use observability_deps::opentelemetry::KeyValue; + +const RED_REQUEST_STATUS_LABEL: &str = "status"; + +/// Possible types of RED metric observation status. +/// +/// Ok - an observed request was successful. +/// OkError - an observed request was unsuccessful but it was not the fault of +/// the observed service. +/// Error - an observed request failed and it was the fault of the service. +/// +/// What is the difference between OkError and Error? The difference is to do +/// where the failure occurred. 
When thinking about measuring SLOs like
+/// availability it's necessary to calculate things like:
+///
+/// Availability = 1 - (failed_requests / all_valid_requests)
+///
+/// `all_valid_requests` includes successful requests and any requests that
+/// failed but not due to the fault of the service (e.g., client errors).
+///
+/// It is useful to track the components of `all_valid_requests` separately so
+/// operators can also monitor external (`ok_error`) errors to help improve
+/// their APIs or other systems.
+#[derive(Debug)]
+pub enum RedRequestStatus {
+    Ok,
+    OkError,
+    Error,
+}
+
+impl Display for RedRequestStatus {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Ok => write!(f, "ok"),
+            Self::OkError => write!(f, "ok_error"),
+            Self::Error => write!(f, "error"),
+        }
+    }
+}
+
+#[derive(Debug)]
+/// A `RedMetric` is a metric that tracks requests to some resource.
+///
+/// The RED methodology stipulates you should track three key measures:
+///
+/// - Request Rate: (total number of requests / second);
+/// - Error Rate: (total number of failed requests / second);
+/// - Duration: (latency distributions for the various requests)
+///
+/// Using a `RedMetric` makes following this methodology easy because it
+/// handles updating the three components for you.
+pub struct RedMetric {
+    default_labels: Vec<KeyValue>,
+    requests: OTCounter<u64>,
+    duration: OTHistorgram<f64>,
+}
+
+impl RedMetric {
+    pub(crate) fn new(
+        requests: OTCounter<u64>,
+        duration: OTHistorgram<f64>,
+        labels: &[KeyValue],
+    ) -> Self {
+        let mut default_labels = vec![KeyValue::new(
+            RED_REQUEST_STATUS_LABEL,
+            RedRequestStatus::Ok.to_string(),
+        )];
+
+        // TODO(edd): decide what to do if `labels` contains
+        // RED_REQUEST_STATUS_LABEL.
+        default_labels.extend(labels.iter().cloned());
+
+        Self {
+            requests,
+            duration,
+            default_labels,
+        }
+    }
+
+    /// Returns a new observation that will handle timing and recording an
+    /// observation the metric is tracking.
+    pub fn observation(
+        &'_ self,
+    ) -> RedObservation<impl Fn(RedRequestStatus, Duration, Vec<KeyValue>) + '_> {
+        // The recording call-back
+        let record =
+            move |status: RedRequestStatus, duration: Duration, mut labels: Vec<KeyValue>| {
+                let status_idx = labels.len(); // status label will be located at end of labels vec.
+                let labels = match labels.is_empty() {
+                    // If there are no labels specified just borrow defaults
+                    true => Cow::Borrowed(&self.default_labels),
+                    false => {
+                        // Otherwise merge the provided labels and the defaults.
+                        labels.extend(self.default_labels.iter().cloned());
+                        Cow::Owned(labels)
+                    }
+                };
+
+                match status {
+                    RedRequestStatus::Ok => {
+                        self.requests.add(1, &labels);
+                        self.duration.record(duration.as_secs_f64(), &labels);
+                    }
+                    RedRequestStatus::OkError => {
+                        let mut labels = labels.into_owned();
+                        labels[status_idx] = KeyValue::new(
+                            RED_REQUEST_STATUS_LABEL,
+                            RedRequestStatus::OkError.to_string(),
+                        );
+
+                        self.requests.add(1, &labels);
+                        self.duration.record(duration.as_secs_f64(), &labels);
+                    }
+                    RedRequestStatus::Error => {
+                        let mut labels = labels.into_owned();
+                        labels[status_idx] = KeyValue::new(
+                            RED_REQUEST_STATUS_LABEL,
+                            RedRequestStatus::Error.to_string(),
+                        );
+
+                        self.requests.add(1, &labels);
+                        self.duration.record(duration.as_secs_f64(), &labels);
+                    }
+                };
+            };
+
+        RedObservation::new(record)
+    }
+}
+
+#[derive(Debug)]
+pub struct RedObservation<T>
+where
+    T: Fn(RedRequestStatus, Duration, Vec<KeyValue>),
+{
+    start: Instant,
+    record: T, // a call-back that records the observation on the metric.
+}
+
+impl<T> RedObservation<T>
+where
+    T: Fn(RedRequestStatus, Duration, Vec<KeyValue>),
+{
+    pub(crate) fn new(record: T) -> Self {
+        Self {
+            start: std::time::Instant::now(),
+            record,
+        }
+    }
+
+    /// Record the observation with the provided status and an explicit
+    /// duration. Callers might prefer `ok`, `ok_error` or `error`, where the
+    /// timing will be handled for them.
+    pub fn observe(
+        self,
+        observation: RedRequestStatus,
+        duration: Duration,
+        labels_itr: Vec<KeyValue>,
+    ) {
+        (self.record)(observation, duration, labels_itr);
+    }
+
+    /// Record that the observation was successful. Timing of observation is
+    /// handled automatically.
+    pub fn ok(self) {
+        let duration = self.start.elapsed();
+
+        // PERF: allocation here but not convinced worth worrying about yet.
+        self.observe(RedRequestStatus::Ok, duration, vec![]);
+    }
+
+    /// Record that the observation was not successful but was still valid.
+    /// `ok_error` is the right thing to choose when the request failed perhaps
+    /// due to client error. Timing of observation is handled automatically.
+    pub fn ok_error(self) {
+        let duration = self.start.elapsed();
+
+        // PERF: allocation here but not convinced worth worrying about yet.
+        self.observe(RedRequestStatus::OkError, duration, vec![]);
+    }
+
+    /// Record that the observation was not successful and results in an error
+    /// caused by the service under observation. Timing of observation is
+    /// handled automatically.
+    pub fn error(self) {
+        let duration = self.start.elapsed();
+
+        // PERF: allocation here but not convinced worth worrying about yet.
+        self.observe(RedRequestStatus::Error, duration, vec![]);
+    }
+}
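+
+// A minimal end-to-end sketch of how a consumer crate might use this
+// (illustrative only; the "http" domain name and the `handle_request` function
+// are assumptions, not part of this crate). `IOXD_METRICS_REGISTRY` lives in
+// the crate root:
+//
+//     let domain = metrics::IOXD_METRICS_REGISTRY.register_domain("http");
+//     let metric = domain.register_red_metric(None);
+//
+//     let ob = metric.observation(); // timing starts here
+//     match handle_request() {
+//         // client mistakes would be recorded with `ob.ok_error()` instead
+//         Ok(_) => ob.ok(),     // success: bumps the counter, records latency
+//         Err(_) => ob.error(), // fault of the service
+//     }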