feat: instrumented namespace cache
Decorates the NamespaceCache with a set of cache get hit/miss counters, and put insert/update counters to expose cache behaviour.pull/24376/head
parent
a3e15ade62
commit
92fe507e52
|
@ -18,7 +18,7 @@ use data_types::database_rules::{PartitionTemplate, TemplatePart};
|
|||
use observability_deps::tracing::*;
|
||||
use router2::{
|
||||
dml_handlers::{NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer},
|
||||
namespace_cache::{MemoryNamespaceCache, ShardedCache},
|
||||
namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, ShardedCache},
|
||||
sequencer::Sequencer,
|
||||
server::{http::HttpDelegate, RouterServer},
|
||||
sharder::JumpHash,
|
||||
|
@ -94,10 +94,14 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
)
|
||||
.await?;
|
||||
|
||||
// Initialise a namespace cache to be shared with the schema validator, and
|
||||
// namespace auto-creator.
|
||||
let ns_cache = Arc::new(ShardedCache::new(
|
||||
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
|
||||
// Initialise an instrumented namespace cache to be shared with the schema
|
||||
// validator, and namespace auto-creator that reports cache hit/miss/update
|
||||
// metrics.
|
||||
let ns_cache = Arc::new(InstrumentedCache::new(
|
||||
Arc::new(ShardedCache::new(
|
||||
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
|
||||
)),
|
||||
Arc::clone(&metrics),
|
||||
));
|
||||
// Add the schema validator layer.
|
||||
let handler_stack =
|
||||
|
|
|
@ -12,7 +12,7 @@ use observability_deps::tracing::*;
|
|||
use thiserror::Error;
|
||||
use trace::ctx::SpanContext;
|
||||
|
||||
use crate::namespace_cache::{MemoryNamespaceCache, NamespaceCache};
|
||||
use crate::namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache};
|
||||
|
||||
use super::{DmlError, DmlHandler, Partitioned};
|
||||
|
||||
|
@ -82,7 +82,7 @@ pub enum SchemaError {
|
|||
///
|
||||
/// [#3573]: https://github.com/influxdata/influxdb_iox/issues/3573
|
||||
#[derive(Debug)]
|
||||
pub struct SchemaValidator<D, C = Arc<MemoryNamespaceCache>> {
|
||||
pub struct SchemaValidator<D, C = Arc<InstrumentedCache<MemoryNamespaceCache>>> {
|
||||
inner: D,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
|
||||
|
@ -266,7 +266,10 @@ mod tests {
|
|||
catalog
|
||||
}
|
||||
|
||||
fn assert_cache<D>(handler: &SchemaValidator<D>, table: &str, col: &str, want: ColumnType) {
|
||||
fn assert_cache<D, C>(handler: &SchemaValidator<D, C>, table: &str, col: &str, want: ColumnType)
|
||||
where
|
||||
C: NamespaceCache,
|
||||
{
|
||||
// The cache should be populated.
|
||||
let ns = handler
|
||||
.cache
|
||||
|
|
|
@ -6,6 +6,8 @@ pub use memory::*;
|
|||
mod sharded_cache;
|
||||
pub use sharded_cache::*;
|
||||
|
||||
pub mod metrics;
|
||||
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use data_types::DatabaseName;
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
//! Metric instrumentation for a [`NamespaceCache`] implementation.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use data_types::DatabaseName;
|
||||
use iox_catalog::interface::NamespaceSchema;
|
||||
use metric::{Metric, U64Counter};
|
||||
|
||||
use super::NamespaceCache;
|
||||
|
||||
/// An [`InstrumentedCache`] decorates a [`NamespaceCache`] with cache read
|
||||
/// hit/miss and cache put insert/update metrics.
|
||||
#[derive(Debug)]
|
||||
pub struct InstrumentedCache<T> {
|
||||
inner: T,
|
||||
|
||||
/// A cache read hit
|
||||
get_hit_counter: U64Counter,
|
||||
/// A cache read miss
|
||||
get_miss_counter: U64Counter,
|
||||
|
||||
/// A cache put for a namespace that did not previously exist.
|
||||
put_insert_counter: U64Counter,
|
||||
/// A cache put replacing a namespace that previously had a cache entry.
|
||||
put_update_counter: U64Counter,
|
||||
}
|
||||
|
||||
impl<T> InstrumentedCache<T> {
|
||||
/// Instrument `T`, recording cache operations to `registry`.
|
||||
pub fn new(inner: T, registry: Arc<metric::Registry>) -> Self {
|
||||
let get_counter: Metric<U64Counter> =
|
||||
registry.register_metric("namespace_cache_get_count", "cache read requests");
|
||||
let get_hit_counter = get_counter.recorder(&[("result", "hit")]);
|
||||
let get_miss_counter = get_counter.recorder(&[("result", "miss")]);
|
||||
|
||||
let put_counter: Metric<U64Counter> =
|
||||
registry.register_metric("namespace_cache_put_count", "cache put requests");
|
||||
let put_insert_counter = put_counter.recorder(&[("op", "insert")]);
|
||||
let put_update_counter = put_counter.recorder(&[("op", "update")]);
|
||||
|
||||
Self {
|
||||
inner,
|
||||
get_hit_counter,
|
||||
get_miss_counter,
|
||||
put_insert_counter,
|
||||
put_update_counter,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> NamespaceCache for Arc<InstrumentedCache<T>>
|
||||
where
|
||||
T: NamespaceCache,
|
||||
{
|
||||
fn get_schema(&self, namespace: &DatabaseName<'_>) -> Option<Arc<NamespaceSchema>> {
|
||||
match self.inner.get_schema(namespace) {
|
||||
Some(v) => {
|
||||
self.get_hit_counter.inc(1);
|
||||
Some(v)
|
||||
}
|
||||
None => {
|
||||
self.get_miss_counter.inc(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn put_schema(
|
||||
&self,
|
||||
namespace: DatabaseName<'static>,
|
||||
schema: impl Into<Arc<NamespaceSchema>>,
|
||||
) -> Option<Arc<NamespaceSchema>> {
|
||||
match self.inner.put_schema(namespace, schema) {
|
||||
Some(v) => {
|
||||
self.put_update_counter.inc(1);
|
||||
Some(v)
|
||||
}
|
||||
None => {
|
||||
self.put_insert_counter.inc(1);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue