refactor(router): Surface stats for new namespace schema in cache

The previous behaviour of the router's NamespaceCache was to provide
put semantics where the entire schema in the cache is replaced. With
the addition of the additive merging side-effect, the metrics decorator
could not compute the correct statistics. This calculates them during
the merge and surfaces the result to the caller.
pull/24376/head
Fraser Savage 2023-04-13 16:33:41 +01:00
parent 96365fc1c6
commit 3425bc176e
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
5 changed files with 96 additions and 86 deletions

View File

@ -36,5 +36,26 @@ pub trait NamespaceCache: Debug + Send + Sync {
&self,
namespace: NamespaceName<'static>,
schema: impl Into<Arc<NamespaceSchema>>,
) -> Option<Arc<NamespaceSchema>>;
) -> (Option<Arc<NamespaceSchema>>, NamespaceStats);
}
#[derive(Debug, PartialEq, Eq)]
/// An encapsulation of statistics associated with a namespace schema.
pub struct NamespaceStats {
/// Number of tables within the namespace
pub table_count: u64,
/// Total number of columns across all tables within the namespace
pub column_count: u64,
}
impl NamespaceStats {
/// Derives a set of [`NamespaceStats`] from the given schema.
pub fn new(ns: &NamespaceSchema) -> Self {
let table_count = ns.tables.len() as _;
let column_count = ns.tables.values().fold(0, |acc, t| acc + t.columns.len()) as _;
Self {
table_count,
column_count,
}
}
}

View File

@ -6,7 +6,7 @@ use hashbrown::HashMap;
use parking_lot::RwLock;
use thiserror::Error;
use super::NamespaceCache;
use super::{NamespaceCache, NamespaceStats};
/// An error type indicating that `namespace` is not present in the cache.
#[derive(Debug, Error)]
@ -43,33 +43,45 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
&self,
namespace: NamespaceName<'static>,
schema: impl Into<Arc<NamespaceSchema>>,
) -> Option<Arc<NamespaceSchema>> {
) -> (Option<Arc<NamespaceSchema>>, NamespaceStats) {
let mut guard = self.cache.write();
let new_ns = schema.into();
let new_stats = NamespaceStats::new(&new_ns);
match guard.get(&namespace) {
Some(old_ns) => {
// If the previous tenant has a different ID then take the new
// schema. The old may have been replaced.
if old_ns.id != new_ns.id {
return guard.insert(namespace, new_ns);
return (guard.insert(namespace, new_ns), new_stats);
}
let mut new_ns = (*new_ns).clone();
// The column count can be computed as part of the merge process
// here to save on additional iteration.
let mut new_column_count: u64 = 0;
for (table_name, new_table) in &mut new_ns.tables {
new_column_count += new_table.columns.len() as u64;
let old_columns = match old_ns.tables.get(table_name) {
Some(v) => &v.columns,
None => continue,
};
for (column_name, column) in old_columns {
if !new_table.columns.contains_key(column_name) {
new_column_count += 1;
new_table.columns.insert(column_name.clone(), *column);
}
}
}
guard.insert(namespace, Arc::new(new_ns))
let new_stats = NamespaceStats {
table_count: new_ns.tables.len() as _,
column_count: new_column_count,
};
(guard.insert(namespace, Arc::new(new_ns)), new_stats)
}
None => guard.insert(namespace, new_ns),
None => (guard.insert(namespace, new_ns), new_stats),
}
}
}
@ -106,7 +118,7 @@ mod tests {
max_tables: 24,
retention_period_ns: Some(876),
};
assert!(cache.put_schema(ns.clone(), schema1.clone()).is_none());
assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), (None, _));
assert_eq!(
*cache.get_schema(&ns).await.expect("lookup failure"),
schema1
@ -122,11 +134,12 @@ mod tests {
retention_period_ns: Some(876),
};
assert_eq!(
*cache
.put_schema(ns.clone(), schema2.clone())
.expect("should have existing schema"),
schema1
assert_matches!(
cache
.put_schema(ns.clone(), schema2.clone()),
(Some(prev), _) => {
assert_eq!(*prev, schema1);
}
);
assert_eq!(
*cache.get_schema(&ns).await.expect("lookup failure"),
@ -198,8 +211,18 @@ mod tests {
let cache_clone = Arc::clone(&cache);
let ns_clone = ns.clone();
tokio::task::spawn(async move {
cache_clone.put_schema(ns_clone.clone(), schema_update_1);
cache_clone.put_schema(ns_clone.clone(), schema_update_2);
assert_matches!(cache_clone.put_schema(ns_clone.clone(), schema_update_1), (None, new_stats) => {
assert_eq!(new_stats, NamespaceStats{
table_count: 1,
column_count: 1,
});
});
assert_matches!(cache_clone.put_schema(ns_clone.clone(), schema_update_2), (Some(_), new_stats) => {
assert_eq!(new_stats, NamespaceStats{
table_count: 1,
column_count: 2,
});
});
})
.await
.unwrap();

View File

@ -7,7 +7,7 @@ use data_types::{NamespaceName, NamespaceSchema};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric, U64Gauge};
use super::NamespaceCache;
use super::{NamespaceCache, NamespaceStats};
/// An [`InstrumentedCache`] decorates a [`NamespaceCache`] with cache read
/// hit/miss and cache put insert/update metrics.
@ -101,14 +101,13 @@ where
&self,
namespace: NamespaceName<'static>,
schema: impl Into<Arc<NamespaceSchema>>,
) -> Option<Arc<NamespaceSchema>> {
) -> (Option<Arc<NamespaceSchema>>, NamespaceStats) {
let schema = schema.into();
let stats = NamespaceStats::new(&schema);
let t = self.time_provider.now();
let res = self.inner.put_schema(namespace, schema);
let (previous, new_stats) = self.inner.put_schema(namespace, schema);
match res {
match previous {
Some(v) => {
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
self.put_update.record(delta);
@ -117,14 +116,15 @@ where
// Figure out the difference between the new namespace and the
// evicted old namespace
let old_stats = NamespaceStats::new(&v);
let table_count_diff = stats.table_count as i64 - old_stats.table_count as i64;
let column_count_diff = stats.column_count as i64 - old_stats.column_count as i64;
let table_count_diff = new_stats.table_count as i64 - old_stats.table_count as i64;
let column_count_diff =
new_stats.column_count as i64 - old_stats.column_count as i64;
// Adjust the metrics to reflect the change
self.table_count.delta(table_count_diff);
self.column_count.delta(column_count_diff);
Some(v)
(Some(v), new_stats)
}
None => {
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
@ -132,36 +132,20 @@ where
}
// Add the new namespace stats to the counts.
self.table_count.inc(stats.table_count);
self.column_count.inc(stats.column_count);
self.table_count.inc(new_stats.table_count);
self.column_count.inc(new_stats.column_count);
None
(None, new_stats)
}
}
}
}
#[derive(Debug)]
struct NamespaceStats {
table_count: u64,
column_count: u64,
}
impl NamespaceStats {
fn new(ns: &NamespaceSchema) -> Self {
let table_count = ns.tables.len() as _;
let column_count = ns.tables.values().fold(0, |acc, t| acc + t.columns.len()) as _;
Self {
table_count,
column_count,
}
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use assert_matches::assert_matches;
use data_types::{
ColumnId, ColumnSchema, ColumnType, NamespaceId, QueryPoolId, TableId, TableSchema, TopicId,
};
@ -240,7 +224,7 @@ mod tests {
// No tables
let schema = new_schema(&[]);
assert!(cache.put_schema(ns.clone(), schema).is_none());
assert_matches!(cache.put_schema(ns.clone(), schema), (None, _));
assert_histogram_hit(
&registry,
"namespace_cache_put_duration",
@ -258,7 +242,7 @@ mod tests {
// Add a table with 1 column
let schema = new_schema(&[1]);
assert!(cache.put_schema(ns.clone(), schema).is_some());
assert_matches!(cache.put_schema(ns.clone(), schema), (Some(_), _));
assert_histogram_hit(
&registry,
"namespace_cache_put_duration",
@ -276,7 +260,7 @@ mod tests {
// Increase the number of columns in this one table
let schema = new_schema(&[5]);
assert!(cache.put_schema(ns.clone(), schema).is_some());
assert_matches!(cache.put_schema(ns.clone(), schema), (Some(_), _));
assert_histogram_hit(
&registry,
"namespace_cache_put_duration",
@ -292,9 +276,9 @@ mod tests {
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(1));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(5));
// Decrease the number of columns
let schema = new_schema(&[2]);
assert!(cache.put_schema(ns.clone(), schema).is_some());
// Add another table
let schema = new_schema(&[5, 5]);
assert_matches!(cache.put_schema(ns.clone(), schema), (Some(_), _));
assert_histogram_hit(
&registry,
"namespace_cache_put_duration",
@ -307,12 +291,12 @@ mod tests {
("op", "update"),
3,
);
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(1));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(2));
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(2));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(10));
// Add another table
let schema = new_schema(&[2, 5]);
assert!(cache.put_schema(ns.clone(), schema).is_some());
// Add another table and adjust an existing table (increased column count)
let schema = new_schema(&[5, 10, 4]);
assert_matches!(cache.put_schema(ns.clone(), schema), (Some(_), _));
assert_histogram_hit(
&registry,
"namespace_cache_put_duration",
@ -325,12 +309,12 @@ mod tests {
("op", "update"),
4,
);
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(2));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(7));
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(3));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(19));
// Add another table and adjust the existing tables (one up, one down)
let schema = new_schema(&[1, 10, 4]);
assert!(cache.put_schema(ns.clone(), schema).is_some());
// Remove a table
let schema = new_schema(&[5, 10]);
assert_matches!(cache.put_schema(ns, schema), (Some(_), _));
assert_histogram_hit(
&registry,
"namespace_cache_put_duration",
@ -343,31 +327,13 @@ mod tests {
("op", "update"),
5,
);
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(3));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(15));
// Remove a table
let schema = new_schema(&[1, 10]);
assert!(cache.put_schema(ns, schema).is_some());
assert_histogram_hit(
&registry,
"namespace_cache_put_duration",
("op", "insert"),
1,
); // Unchanged
assert_histogram_hit(
&registry,
"namespace_cache_put_duration",
("op", "update"),
6,
);
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(2));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(11));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(15));
// Add a new namespace
let ns = NamespaceName::new("another").expect("namespace name is valid");
let schema = new_schema(&[10, 12, 9]);
assert!(cache.put_schema(ns.clone(), schema).is_none());
assert_matches!(cache.put_schema(ns.clone(), schema), (None, _));
assert_histogram_hit(
&registry,
"namespace_cache_put_duration",
@ -378,10 +344,10 @@ mod tests {
&registry,
"namespace_cache_put_duration",
("op", "update"),
6,
5,
);
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(5));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(42));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(46)); // 15 + new columns (31)
let _got = cache.get_schema(&ns).await.expect("should exist");
assert_histogram_hit(

View File

@ -8,7 +8,7 @@ use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use observability_deps::tracing::*;
use super::memory::CacheMissErr;
use super::NamespaceCache;
use super::{NamespaceCache, NamespaceStats};
/// A [`ReadThroughCache`] decorates a [`NamespaceCache`] with read-through
/// caching behaviour on calls to `self.get_schema()` when contained in an
@ -90,7 +90,7 @@ where
&self,
namespace: NamespaceName<'static>,
schema: impl Into<Arc<NamespaceSchema>>,
) -> Option<Arc<NamespaceSchema>> {
) -> (Option<Arc<NamespaceSchema>>, NamespaceStats) {
self.inner_cache.put_schema(namespace, schema)
}
}
@ -126,7 +126,7 @@ mod tests {
iox_catalog::DEFAULT_MAX_TABLES,
iox_catalog::DEFAULT_RETENTION_PERIOD,
);
assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), None);
assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), (None, _));
// Ensure it is present
assert_eq!(

View File

@ -4,7 +4,7 @@ use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use sharder::JumpHash;
use super::NamespaceCache;
use super::{NamespaceCache, NamespaceStats};
/// A decorator sharding the [`NamespaceCache`] keyspace into a set of `T`.
#[derive(Debug)]
@ -40,7 +40,7 @@ where
&self,
namespace: NamespaceName<'static>,
schema: impl Into<Arc<NamespaceSchema>>,
) -> Option<Arc<NamespaceSchema>> {
) -> (Option<Arc<NamespaceSchema>>, NamespaceStats) {
self.shards.hash(&namespace).put_schema(namespace, schema)
}
}
@ -107,7 +107,7 @@ mod tests {
// Populate the cache
for (name, id) in &names {
let schema = schema_with_id(*id as _);
assert!(cache.put_schema(name.clone(), schema).is_none());
assert_matches!(cache.put_schema(name.clone(), schema), (None, _));
}
// The mapping should be stable