feat(router): Use read-through namespace cache for NamespaceResolver
The NamespaceResolver was using its own very similar look-aside caching to the DML handlers, this commit leverages the read-through cache implementation to deduplicate more code and makes the read through behavioural expectation explicit for namespace autocreation.pull/24376/head
parent
d590d19e3b
commit
728b7293b9
|
@ -362,8 +362,7 @@ pub async fn create_router2_server_type(
|
||||||
|
|
||||||
// e. Namespace resolver
|
// e. Namespace resolver
|
||||||
// Initialise the Namespace ID lookup + cache
|
// Initialise the Namespace ID lookup + cache
|
||||||
let namespace_resolver =
|
let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache));
|
||||||
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////
|
||||||
//
|
//
|
||||||
|
@ -554,8 +553,7 @@ pub async fn create_router_server_type<'a>(
|
||||||
|
|
||||||
// e. Namespace resolver
|
// e. Namespace resolver
|
||||||
// Initialise the Namespace ID lookup + cache
|
// Initialise the Namespace ID lookup + cache
|
||||||
let namespace_resolver =
|
let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache));
|
||||||
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////
|
||||||
//
|
//
|
||||||
|
|
|
@ -41,11 +41,11 @@ where
|
||||||
&self,
|
&self,
|
||||||
namespace: &NamespaceName<'static>,
|
namespace: &NamespaceName<'static>,
|
||||||
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
|
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
|
||||||
let mut repos = self.catalog.repositories().await;
|
|
||||||
|
|
||||||
match self.inner_cache.get_schema(namespace).await {
|
match self.inner_cache.get_schema(namespace).await {
|
||||||
Ok(v) => Ok(v),
|
Ok(v) => Ok(v),
|
||||||
Err(CacheMissErr { .. }) => {
|
Err(CacheMissErr { .. }) => {
|
||||||
|
let mut repos = self.catalog.repositories().await;
|
||||||
|
|
||||||
let schema = match get_schema_by_name(
|
let schema = match get_schema_by_name(
|
||||||
namespace,
|
namespace,
|
||||||
repos.deref_mut(),
|
repos.deref_mut(),
|
||||||
|
|
|
@ -1,11 +1,7 @@
|
||||||
//! An trait to abstract resolving a[`NamespaceName`] to [`NamespaceId`], and a
|
//! An trait to abstract resolving a[`NamespaceName`] to [`NamespaceId`], and a
|
||||||
//! collection of composable implementations.
|
//! collection of composable implementations.
|
||||||
|
|
||||||
use std::{ops::DerefMut, sync::Arc};
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{NamespaceId, NamespaceName};
|
use data_types::{NamespaceId, NamespaceName};
|
||||||
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
|
|
||||||
use observability_deps::tracing::*;
|
use observability_deps::tracing::*;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
|
@ -37,27 +33,25 @@ pub trait NamespaceResolver: std::fmt::Debug + Send + Sync {
|
||||||
) -> Result<NamespaceId, Error>;
|
) -> Result<NamespaceId, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// An implementation of [`NamespaceResolver`] that queries the [`Catalog`] to
|
/// An implementation of [`NamespaceResolver`] that resolves the [`NamespaceId`]
|
||||||
/// resolve a [`NamespaceId`], and populates the [`NamespaceCache`] as a side
|
/// for a given name through a [`NamespaceCache`].
|
||||||
/// effect.
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct NamespaceSchemaResolver<C> {
|
pub struct NamespaceSchemaResolver<C> {
|
||||||
catalog: Arc<dyn Catalog>,
|
|
||||||
cache: C,
|
cache: C,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<C> NamespaceSchemaResolver<C> {
|
impl<C> NamespaceSchemaResolver<C> {
|
||||||
/// Construct a new [`NamespaceSchemaResolver`] that fetches schemas from
|
/// Construct a new [`NamespaceSchemaResolver`] that resolves namespace IDs
|
||||||
/// `catalog` and caches them in `cache`.
|
/// using `cache`.
|
||||||
pub fn new(catalog: Arc<dyn Catalog>, cache: C) -> Self {
|
pub fn new(cache: C) -> Self {
|
||||||
Self { catalog, cache }
|
Self { cache }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<C> NamespaceResolver for NamespaceSchemaResolver<C>
|
impl<C> NamespaceResolver for NamespaceSchemaResolver<C>
|
||||||
where
|
where
|
||||||
C: NamespaceCache,
|
C: NamespaceCache<ReadError = iox_catalog::interface::Error>,
|
||||||
{
|
{
|
||||||
async fn get_namespace_id(
|
async fn get_namespace_id(
|
||||||
&self,
|
&self,
|
||||||
|
@ -67,36 +61,7 @@ where
|
||||||
// from the global catalog (if it exists).
|
// from the global catalog (if it exists).
|
||||||
match self.cache.get_schema(namespace).await {
|
match self.cache.get_schema(namespace).await {
|
||||||
Ok(v) => Ok(v.id),
|
Ok(v) => Ok(v.id),
|
||||||
Err(_) => {
|
Err(e) => return Err(Error::Lookup(e)),
|
||||||
let mut repos = self.catalog.repositories().await;
|
|
||||||
|
|
||||||
// Pull the schema from the global catalog or error if it does
|
|
||||||
// not exist.
|
|
||||||
let schema = get_schema_by_name(
|
|
||||||
namespace,
|
|
||||||
repos.deref_mut(),
|
|
||||||
SoftDeletedRows::ExcludeDeleted,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
warn!(
|
|
||||||
error=%e,
|
|
||||||
%namespace,
|
|
||||||
"failed to retrieve namespace schema"
|
|
||||||
);
|
|
||||||
Error::Lookup(e)
|
|
||||||
})
|
|
||||||
.map(Arc::new)?;
|
|
||||||
|
|
||||||
// Cache population MAY race with other threads and lead to
|
|
||||||
// overwrites, but an entry will always exist once inserted, and
|
|
||||||
// the schemas will eventually converge.
|
|
||||||
self.cache
|
|
||||||
.put_schema(namespace.clone(), Arc::clone(&schema));
|
|
||||||
|
|
||||||
trace!(%namespace, "schema cache populated");
|
|
||||||
Ok(schema.id)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -107,17 +72,26 @@ mod tests {
|
||||||
|
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use data_types::{NamespaceId, NamespaceSchema, QueryPoolId, TopicId};
|
use data_types::{NamespaceId, NamespaceSchema, QueryPoolId, TopicId};
|
||||||
use iox_catalog::mem::MemCatalog;
|
use iox_catalog::{
|
||||||
|
interface::{Catalog, SoftDeletedRows},
|
||||||
|
mem::MemCatalog,
|
||||||
|
};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::namespace_cache::MemoryNamespaceCache;
|
use crate::namespace_cache::{MemoryNamespaceCache, ReadThroughCache};
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_cache_hit() {
|
async fn test_cache_hit() {
|
||||||
let ns = NamespaceName::try_from("bananas").unwrap();
|
let ns = NamespaceName::try_from("bananas").unwrap();
|
||||||
|
|
||||||
|
let metrics = Arc::new(metric::Registry::new());
|
||||||
|
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||||
|
|
||||||
// Prep the cache before the test to cause a hit
|
// Prep the cache before the test to cause a hit
|
||||||
let cache = Arc::new(MemoryNamespaceCache::default());
|
let cache = Arc::new(ReadThroughCache::new(
|
||||||
|
Arc::new(MemoryNamespaceCache::default()),
|
||||||
|
Arc::clone(&catalog),
|
||||||
|
));
|
||||||
cache.put_schema(
|
cache.put_schema(
|
||||||
ns.clone(),
|
ns.clone(),
|
||||||
NamespaceSchema {
|
NamespaceSchema {
|
||||||
|
@ -131,10 +105,7 @@ mod tests {
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let metrics = Arc::new(metric::Registry::new());
|
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
|
||||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
|
||||||
|
|
||||||
let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));
|
|
||||||
|
|
||||||
// Drive the code under test
|
// Drive the code under test
|
||||||
resolver
|
resolver
|
||||||
|
@ -162,9 +133,12 @@ mod tests {
|
||||||
async fn test_cache_miss() {
|
async fn test_cache_miss() {
|
||||||
let ns = NamespaceName::try_from("bananas").unwrap();
|
let ns = NamespaceName::try_from("bananas").unwrap();
|
||||||
|
|
||||||
let cache = Arc::new(MemoryNamespaceCache::default());
|
|
||||||
let metrics = Arc::new(metric::Registry::new());
|
let metrics = Arc::new(metric::Registry::new());
|
||||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||||
|
let cache = Arc::new(ReadThroughCache::new(
|
||||||
|
Arc::new(MemoryNamespaceCache::default()),
|
||||||
|
Arc::clone(&catalog),
|
||||||
|
));
|
||||||
|
|
||||||
// Create the namespace in the catalog
|
// Create the namespace in the catalog
|
||||||
{
|
{
|
||||||
|
@ -178,7 +152,7 @@ mod tests {
|
||||||
.expect("failed to setup catalog state");
|
.expect("failed to setup catalog state");
|
||||||
}
|
}
|
||||||
|
|
||||||
let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));
|
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
|
||||||
|
|
||||||
resolver
|
resolver
|
||||||
.get_namespace_id(&ns)
|
.get_namespace_id(&ns)
|
||||||
|
@ -193,9 +167,12 @@ mod tests {
|
||||||
async fn test_cache_miss_soft_deleted() {
|
async fn test_cache_miss_soft_deleted() {
|
||||||
let ns = NamespaceName::try_from("bananas").unwrap();
|
let ns = NamespaceName::try_from("bananas").unwrap();
|
||||||
|
|
||||||
let cache = Arc::new(MemoryNamespaceCache::default());
|
|
||||||
let metrics = Arc::new(metric::Registry::new());
|
let metrics = Arc::new(metric::Registry::new());
|
||||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||||
|
let cache = Arc::new(ReadThroughCache::new(
|
||||||
|
Arc::new(MemoryNamespaceCache::default()),
|
||||||
|
Arc::clone(&catalog),
|
||||||
|
));
|
||||||
|
|
||||||
// Create the namespace in the catalog and mark it as deleted
|
// Create the namespace in the catalog and mark it as deleted
|
||||||
{
|
{
|
||||||
|
@ -214,7 +191,7 @@ mod tests {
|
||||||
.expect("failed to setup catalog state");
|
.expect("failed to setup catalog state");
|
||||||
}
|
}
|
||||||
|
|
||||||
let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));
|
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
|
||||||
|
|
||||||
let err = resolver
|
let err = resolver
|
||||||
.get_namespace_id(&ns)
|
.get_namespace_id(&ns)
|
||||||
|
@ -233,11 +210,14 @@ mod tests {
|
||||||
async fn test_cache_miss_does_not_exist() {
|
async fn test_cache_miss_does_not_exist() {
|
||||||
let ns = NamespaceName::try_from("bananas").unwrap();
|
let ns = NamespaceName::try_from("bananas").unwrap();
|
||||||
|
|
||||||
let cache = Arc::new(MemoryNamespaceCache::default());
|
|
||||||
let metrics = Arc::new(metric::Registry::new());
|
let metrics = Arc::new(metric::Registry::new());
|
||||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||||
|
let cache = Arc::new(ReadThroughCache::new(
|
||||||
|
Arc::new(MemoryNamespaceCache::default()),
|
||||||
|
Arc::clone(&catalog),
|
||||||
|
));
|
||||||
|
|
||||||
let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));
|
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
|
||||||
|
|
||||||
let err = resolver
|
let err = resolver
|
||||||
.get_namespace_id(&ns)
|
.get_namespace_id(&ns)
|
||||||
|
|
|
@ -79,7 +79,7 @@ impl<C, T> NamespaceAutocreation<C, T> {
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<C, T> NamespaceResolver for NamespaceAutocreation<C, T>
|
impl<C, T> NamespaceResolver for NamespaceAutocreation<C, T>
|
||||||
where
|
where
|
||||||
C: NamespaceCache,
|
C: NamespaceCache<ReadError = iox_catalog::interface::Error>, // The resolver relies on the cache for read-through cache behaviour
|
||||||
T: NamespaceResolver,
|
T: NamespaceResolver,
|
||||||
{
|
{
|
||||||
/// Force the creation of `namespace` if it does not already exist in the
|
/// Force the creation of `namespace` if it does not already exist in the
|
||||||
|
@ -153,7 +153,7 @@ mod tests {
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{
|
use crate::{
|
||||||
namespace_cache::MemoryNamespaceCache,
|
namespace_cache::{MemoryNamespaceCache, ReadThroughCache},
|
||||||
namespace_resolver::{mock::MockNamespaceResolver, NamespaceSchemaResolver},
|
namespace_resolver::{mock::MockNamespaceResolver, NamespaceSchemaResolver},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -166,8 +166,14 @@ mod tests {
|
||||||
|
|
||||||
let ns = NamespaceName::try_from("bananas").unwrap();
|
let ns = NamespaceName::try_from("bananas").unwrap();
|
||||||
|
|
||||||
|
let metrics = Arc::new(metric::Registry::new());
|
||||||
|
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||||
|
|
||||||
// Prep the cache before the test to cause a hit
|
// Prep the cache before the test to cause a hit
|
||||||
let cache = Arc::new(MemoryNamespaceCache::default());
|
let cache = Arc::new(ReadThroughCache::new(
|
||||||
|
Arc::new(MemoryNamespaceCache::default()),
|
||||||
|
Arc::clone(&catalog),
|
||||||
|
));
|
||||||
cache.put_schema(
|
cache.put_schema(
|
||||||
ns.clone(),
|
ns.clone(),
|
||||||
NamespaceSchema {
|
NamespaceSchema {
|
||||||
|
@ -181,9 +187,6 @@ mod tests {
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
let metrics = Arc::new(metric::Registry::new());
|
|
||||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
|
||||||
|
|
||||||
let creator = NamespaceAutocreation::new(
|
let creator = NamespaceAutocreation::new(
|
||||||
MockNamespaceResolver::default().with_mapping(ns.clone(), NAMESPACE_ID),
|
MockNamespaceResolver::default().with_mapping(ns.clone(), NAMESPACE_ID),
|
||||||
cache,
|
cache,
|
||||||
|
@ -218,10 +221,14 @@ mod tests {
|
||||||
async fn test_cache_miss() {
|
async fn test_cache_miss() {
|
||||||
let ns = NamespaceName::try_from("bananas").unwrap();
|
let ns = NamespaceName::try_from("bananas").unwrap();
|
||||||
|
|
||||||
let cache = Arc::new(MemoryNamespaceCache::default());
|
|
||||||
let metrics = Arc::new(metric::Registry::new());
|
let metrics = Arc::new(metric::Registry::new());
|
||||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||||
|
|
||||||
|
let cache = Arc::new(ReadThroughCache::new(
|
||||||
|
Arc::new(MemoryNamespaceCache::default()),
|
||||||
|
Arc::clone(&catalog),
|
||||||
|
));
|
||||||
|
|
||||||
let creator = NamespaceAutocreation::new(
|
let creator = NamespaceAutocreation::new(
|
||||||
MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
|
MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
|
||||||
cache,
|
cache,
|
||||||
|
@ -266,9 +273,12 @@ mod tests {
|
||||||
async fn test_reject() {
|
async fn test_reject() {
|
||||||
let ns = NamespaceName::try_from("bananas").unwrap();
|
let ns = NamespaceName::try_from("bananas").unwrap();
|
||||||
|
|
||||||
let cache = Arc::new(MemoryNamespaceCache::default());
|
|
||||||
let metrics = Arc::new(metric::Registry::new());
|
let metrics = Arc::new(metric::Registry::new());
|
||||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||||
|
let cache = Arc::new(ReadThroughCache::new(
|
||||||
|
Arc::new(MemoryNamespaceCache::default()),
|
||||||
|
Arc::clone(&catalog),
|
||||||
|
));
|
||||||
|
|
||||||
let creator = NamespaceAutocreation::new(
|
let creator = NamespaceAutocreation::new(
|
||||||
MockNamespaceResolver::default(),
|
MockNamespaceResolver::default(),
|
||||||
|
@ -302,13 +312,16 @@ mod tests {
|
||||||
async fn test_reject_exists_in_catalog() {
|
async fn test_reject_exists_in_catalog() {
|
||||||
let ns = NamespaceName::try_from("bananas").unwrap();
|
let ns = NamespaceName::try_from("bananas").unwrap();
|
||||||
|
|
||||||
let cache = Arc::new(MemoryNamespaceCache::default());
|
|
||||||
let metrics = Arc::new(metric::Registry::new());
|
let metrics = Arc::new(metric::Registry::new());
|
||||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||||
|
let cache = Arc::new(ReadThroughCache::new(
|
||||||
|
Arc::new(MemoryNamespaceCache::default()),
|
||||||
|
Arc::clone(&catalog),
|
||||||
|
));
|
||||||
|
|
||||||
// First drive the population of the catalog
|
// First drive the population of the catalog
|
||||||
let creator = NamespaceAutocreation::new(
|
let creator = NamespaceAutocreation::new(
|
||||||
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)),
|
NamespaceSchemaResolver::new(Arc::clone(&cache)),
|
||||||
Arc::clone(&cache),
|
Arc::clone(&cache),
|
||||||
Arc::clone(&catalog),
|
Arc::clone(&catalog),
|
||||||
TopicId::new(42),
|
TopicId::new(42),
|
||||||
|
@ -323,7 +336,7 @@ mod tests {
|
||||||
|
|
||||||
// Now try in "reject" mode.
|
// Now try in "reject" mode.
|
||||||
let creator = NamespaceAutocreation::new(
|
let creator = NamespaceAutocreation::new(
|
||||||
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)),
|
NamespaceSchemaResolver::new(Arc::clone(&cache)),
|
||||||
cache,
|
cache,
|
||||||
Arc::clone(&catalog),
|
Arc::clone(&catalog),
|
||||||
TopicId::new(42),
|
TopicId::new(42),
|
||||||
|
|
|
@ -170,8 +170,7 @@ impl TestContext {
|
||||||
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
|
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
|
||||||
});
|
});
|
||||||
|
|
||||||
let namespace_resolver =
|
let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache));
|
||||||
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
|
|
||||||
let namespace_resolver = NamespaceAutocreation::new(
|
let namespace_resolver = NamespaceAutocreation::new(
|
||||||
namespace_resolver,
|
namespace_resolver,
|
||||||
Arc::clone(&ns_cache),
|
Arc::clone(&ns_cache),
|
||||||
|
|
Loading…
Reference in New Issue