diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs index a609f6a8c5..8ba517b337 100644 --- a/influxdb_iox/src/commands/run/router2.rs +++ b/influxdb_iox/src/commands/run/router2.rs @@ -18,7 +18,7 @@ use data_types::database_rules::{PartitionTemplate, TemplatePart}; use observability_deps::tracing::*; use router2::{ dml_handlers::{NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer}, - namespace_cache::{MemoryNamespaceCache, ShardedCache}, + namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, ShardedCache}, sequencer::Sequencer, server::{http::HttpDelegate, RouterServer}, sharder::JumpHash, @@ -94,10 +94,14 @@ pub async fn command(config: Config) -> Result<()> { ) .await?; - // Initialise a namespace cache to be shared with the schema validator, and - // namespace auto-creator. - let ns_cache = Arc::new(ShardedCache::new( - iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + // Initialise an instrumented namespace cache to be shared with the schema + // validator, and namespace auto-creator that reports cache hit/miss/update + // metrics. + let ns_cache = Arc::new(InstrumentedCache::new( + Arc::new(ShardedCache::new( + iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + )), + Arc::clone(&metrics), )); // Add the schema validator layer. let handler_stack = diff --git a/router2/src/dml_handlers/schema_validation.rs b/router2/src/dml_handlers/schema_validation.rs index b700841bc4..d7beafb035 100644 --- a/router2/src/dml_handlers/schema_validation.rs +++ b/router2/src/dml_handlers/schema_validation.rs @@ -12,7 +12,7 @@ use observability_deps::tracing::*; use thiserror::Error; use trace::ctx::SpanContext; -use crate::namespace_cache::{MemoryNamespaceCache, NamespaceCache}; +use crate::namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache}; use super::{DmlError, DmlHandler, Partitioned}; @@ -82,7 +82,7 @@ pub enum SchemaError { /// /// [#3573]: https://github.com/influxdata/influxdb_iox/issues/3573 #[derive(Debug)] -pub struct SchemaValidator> { +pub struct SchemaValidator>> { inner: D, catalog: Arc, @@ -266,7 +266,10 @@ mod tests { catalog } - fn assert_cache(handler: &SchemaValidator, table: &str, col: &str, want: ColumnType) { + fn assert_cache(handler: &SchemaValidator, table: &str, col: &str, want: ColumnType) + where + C: NamespaceCache, + { // The cache should be populated. let ns = handler .cache diff --git a/router2/src/namespace_cache.rs b/router2/src/namespace_cache.rs index 1d61ad24af..6082682f14 100644 --- a/router2/src/namespace_cache.rs +++ b/router2/src/namespace_cache.rs @@ -6,6 +6,8 @@ pub use memory::*; mod sharded_cache; pub use sharded_cache::*; +pub mod metrics; + use std::{fmt::Debug, sync::Arc}; use data_types::DatabaseName; diff --git a/router2/src/namespace_cache/metrics.rs b/router2/src/namespace_cache/metrics.rs new file mode 100644 index 0000000000..0cddf389d8 --- /dev/null +++ b/router2/src/namespace_cache/metrics.rs @@ -0,0 +1,84 @@ +//! Metric instrumentation for a [`NamespaceCache`] implementation. + +use std::sync::Arc; + +use data_types::DatabaseName; +use iox_catalog::interface::NamespaceSchema; +use metric::{Metric, U64Counter}; + +use super::NamespaceCache; + +/// An [`InstrumentedCache`] decorates a [`NamespaceCache`] with cache read +/// hit/miss and cache put insert/update metrics. +#[derive(Debug)] +pub struct InstrumentedCache { + inner: T, + + /// A cache read hit + get_hit_counter: U64Counter, + /// A cache read miss + get_miss_counter: U64Counter, + + /// A cache put for a namespace that did not previously exist. + put_insert_counter: U64Counter, + /// A cache put replacing a namespace that previously had a cache entry. + put_update_counter: U64Counter, +} + +impl InstrumentedCache { + /// Instrument `T`, recording cache operations to `registry`. + pub fn new(inner: T, registry: Arc) -> Self { + let get_counter: Metric = + registry.register_metric("namespace_cache_get_count", "cache read requests"); + let get_hit_counter = get_counter.recorder(&[("result", "hit")]); + let get_miss_counter = get_counter.recorder(&[("result", "miss")]); + + let put_counter: Metric = + registry.register_metric("namespace_cache_put_count", "cache put requests"); + let put_insert_counter = put_counter.recorder(&[("op", "insert")]); + let put_update_counter = put_counter.recorder(&[("op", "update")]); + + Self { + inner, + get_hit_counter, + get_miss_counter, + put_insert_counter, + put_update_counter, + } + } +} + +impl NamespaceCache for Arc> +where + T: NamespaceCache, +{ + fn get_schema(&self, namespace: &DatabaseName<'_>) -> Option> { + match self.inner.get_schema(namespace) { + Some(v) => { + self.get_hit_counter.inc(1); + Some(v) + } + None => { + self.get_miss_counter.inc(1); + None + } + } + } + + fn put_schema( + &self, + namespace: DatabaseName<'static>, + schema: impl Into>, + ) -> Option> { + match self.inner.put_schema(namespace, schema) { + Some(v) => { + self.put_update_counter.inc(1); + Some(v) + } + None => { + self.put_insert_counter.inc(1); + None + } + } + } +}