From 40e5b193013ffb4445eed7b964bd2b7773f40f43 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 9 Feb 2022 17:26:57 +0000 Subject: [PATCH] feat: metric instrumentation for DML handlers Adds a decorator type over a DmlHandler implementation that records call latency for writes & deletes, broken down by result (success/error). --- router2/src/dml_handlers/instrumentation.rs | 137 ++++++++++++++++++++ router2/src/dml_handlers/mod.rs | 3 + 2 files changed, 140 insertions(+) create mode 100644 router2/src/dml_handlers/instrumentation.rs diff --git a/router2/src/dml_handlers/instrumentation.rs b/router2/src/dml_handlers/instrumentation.rs new file mode 100644 index 0000000000..e05727966f --- /dev/null +++ b/router2/src/dml_handlers/instrumentation.rs @@ -0,0 +1,137 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use data_types::{delete_predicate::DeletePredicate, DatabaseName}; +use metric::{Metric, U64Histogram, U64HistogramOptions}; +use time::{SystemProvider, TimeProvider}; +use trace::ctx::SpanContext; + +use super::DmlHandler; + +/// An instrumentation decorator recording call latencies for [`DmlHandler`] +/// implementations. +/// +/// Metrics are broken down by operation (write/delete) and result +/// (success/error) with call latency reported in milliseconds. +/// +/// # Chained / Nested Handlers +/// +/// Because [`DmlHandler`] implementations are constructed as a chain of +/// decorators to build up a full request handling pipeline, the reported call +/// latency of a given handler is a cumulative measure of the execution time for +/// handler and all of its children. +#[derive(Debug)] +pub struct InstrumentationDecorator { + inner: T, + time_provider: P, + + write_success: U64Histogram, + write_error: U64Histogram, + + delete_success: U64Histogram, + delete_error: U64Histogram, +} + +impl InstrumentationDecorator { + /// Wrap a new [`InstrumentationDecorator`] over `T` exposing metrics + /// prefixed by `name`. + /// + /// # Memory leak + /// + /// Calling this method constructs a set of metric names derived from `name` + /// and leaks them once to acquire a static str for each. + pub fn new(name: &'static str, registry: Arc, inner: T) -> Self { + let buckets = || { + U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX]) + }; + + let write: Metric = registry.register_metric_with_options( + leak_concat_name(name, "_write_duration_ms"), + "write handler call duration in milliseconds", + buckets, + ); + let delete: Metric = registry.register_metric_with_options( + leak_concat_name(name, "_delete_duration_ms"), + "write handler call duration in milliseconds", + buckets, + ); + + let write_success = write.recorder(&[("result", "success")]); + let write_error = write.recorder(&[("result", "error")]); + + let delete_success = delete.recorder(&[("result", "success")]); + let delete_error = delete.recorder(&[("result", "error")]); + + Self { + inner, + time_provider: Default::default(), + write_success, + write_error, + delete_success, + delete_error, + } + } +} + +#[async_trait] +impl DmlHandler for InstrumentationDecorator +where + T: DmlHandler, +{ + type WriteInput = T::WriteInput; + type WriteError = T::WriteError; + type DeleteError = T::DeleteError; + + /// Call the inner `write` method and record the call latency. + async fn write( + &self, + namespace: DatabaseName<'static>, + input: Self::WriteInput, + span_ctx: Option, + ) -> Result<(), Self::WriteError> { + let t = self.time_provider.now(); + let res = self.inner.write(namespace, input, span_ctx).await; + + // Avoid exploding if time goes backwards - simply drop the measurement + // if it happens. + if let Some(delta) = self.time_provider.now().checked_duration_since(t) { + match &res { + Ok(_) => self.write_success.record(delta.as_millis() as _), + Err(_) => self.write_error.record(delta.as_millis() as _), + }; + } + + res + } + + /// Call the inner `delete` method and record the call latency. + async fn delete<'a>( + &self, + namespace: DatabaseName<'static>, + table_name: impl Into + Send + Sync + 'a, + predicate: DeletePredicate, + span_ctx: Option, + ) -> Result<(), Self::DeleteError> { + let t = self.time_provider.now(); + let res = self + .inner + .delete(namespace, table_name, predicate, span_ctx) + .await; + + // Avoid exploding if time goes backwards - simply drop the measurement + // if it happens. + if let Some(delta) = self.time_provider.now().checked_duration_since(t) { + match &res { + Ok(_) => self.delete_success.record(delta.as_millis() as _), + Err(_) => self.delete_error.record(delta.as_millis() as _), + }; + } + + res + } +} + +fn leak_concat_name(prefix: &str, suffix: &str) -> &'static str { + let s = format!("{}{}", prefix, suffix); + Box::leak(s.into_boxed_str()) +} diff --git a/router2/src/dml_handlers/mod.rs b/router2/src/dml_handlers/mod.rs index ae498bd9f3..eb109e5d5d 100644 --- a/router2/src/dml_handlers/mod.rs +++ b/router2/src/dml_handlers/mod.rs @@ -90,5 +90,8 @@ pub use ns_autocreation::*; mod partitioner; pub use partitioner::*; +mod instrumentation; +pub use instrumentation::*; + #[cfg(test)] pub mod mock;