Merge pull request #7503 from influxdata/savage/router-read-through-namespace-cache

feat(router): Use a read-through decorator for `NamespaceCache` usage
kodiakhq[bot] 2023-04-12 13:50:17 +00:00 committed by GitHub
commit 1c24440305
13 changed files with 466 additions and 282 deletions
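The shape of the change is easier to follow with the decorator spelled out in isolation. The sketch below is a minimal, self-contained model of the read-through pattern this PR applies: an inner cache whose only read error is a miss, wrapped by a decorator that resolves misses against a backing store and exposes the store's error type to callers. Every name in it (`Schema`, `Miss`, `ReadThrough`, `StoreError`, …) is an illustrative stand-in rather than one of the crate's real types, and it is written synchronously for brevity; the trait actually introduced in the diff below is async.

```rust
use std::{collections::HashMap, fmt::Debug, sync::{Arc, RwLock}};

use thiserror::Error;

/// Illustrative stand-in for `NamespaceSchema`.
#[derive(Debug, Clone)]
struct Schema;

/// Cache abstraction: reads are fallible, writes overwrite unconditionally.
trait Cache: Debug {
    type ReadError: std::error::Error;

    fn get(&self, namespace: &str) -> Result<Arc<Schema>, Self::ReadError>;
    fn put(&self, namespace: &str, schema: Arc<Schema>);
}

/// The only way an in-memory cache read can fail is a miss.
#[derive(Debug, Error)]
#[error("namespace {0} not found in cache")]
struct Miss(String);

#[derive(Debug, Default)]
struct MemoryCache {
    entries: RwLock<HashMap<String, Arc<Schema>>>,
}

impl Cache for MemoryCache {
    type ReadError = Miss;

    fn get(&self, namespace: &str) -> Result<Arc<Schema>, Miss> {
        self.entries
            .read()
            .expect("lock poisoned")
            .get(namespace)
            .cloned()
            .ok_or_else(|| Miss(namespace.to_owned()))
    }

    fn put(&self, namespace: &str, schema: Arc<Schema>) {
        self.entries
            .write()
            .expect("lock poisoned")
            .insert(namespace.to_owned(), schema);
    }
}

/// Error exposed by the decorator; callers never see a bare cache miss.
#[derive(Debug, Error)]
#[error("store lookup failed: {0}")]
struct StoreError(String);

/// Read-through decorator: a miss falls through to the backing store
/// (the catalog in the PR) and populates the inner cache on the way out.
#[derive(Debug)]
struct ReadThrough<T> {
    inner: T,
    store: HashMap<String, Schema>, // stand-in for the catalog
}

impl<T> Cache for ReadThrough<T>
where
    T: Cache<ReadError = Miss>,
{
    type ReadError = StoreError;

    fn get(&self, namespace: &str) -> Result<Arc<Schema>, StoreError> {
        match self.inner.get(namespace) {
            Ok(v) => Ok(v),
            Err(Miss(_)) => {
                let schema = self
                    .store
                    .get(namespace)
                    .cloned()
                    .map(Arc::new)
                    .ok_or_else(|| StoreError(format!("{namespace} does not exist")))?;
                self.inner.put(namespace, Arc::clone(&schema));
                Ok(schema)
            }
        }
    }

    fn put(&self, namespace: &str, schema: Arc<Schema>) {
        self.inner.put(namespace, schema)
    }
}

fn main() {
    let store = HashMap::from([("bananas".to_owned(), Schema)]);
    let cache = ReadThrough { inner: MemoryCache::default(), store };

    // A miss is resolved from the store and populates the inner cache...
    assert!(cache.get("bananas").is_ok());
    assert!(cache.inner.get("bananas").is_ok());
    // ...while an unknown namespace surfaces the store error to the caller.
    assert!(cache.get("platanos").is_err());
}
```

The same error-type layering appears in the diff: `ReadThroughCache` only accepts inner caches whose `ReadError` is `CacheMissErr`, and the DML handlers and namespace resolvers then bound their caches on `ReadError = iox_catalog::interface::Error`.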

View File

@ -31,7 +31,8 @@ use router::{
ShardedWriteBuffer, WriteSummaryAdapter,
},
namespace_cache::{
metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache,
metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ReadThroughCache,
ShardedCache,
},
namespace_resolver::{
MissingNamespaceAction, NamespaceAutocreation, NamespaceResolver, NamespaceSchemaResolver,
@ -322,11 +323,14 @@ pub async fn create_router2_server_type(
// Initialise an instrumented namespace cache to be shared with the schema
// validator, and namespace auto-creator that reports cache hit/miss/update
// metrics.
let ns_cache = Arc::new(InstrumentedCache::new(
Arc::new(ShardedCache::new(
std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
let ns_cache = Arc::new(ReadThroughCache::new(
Arc::new(InstrumentedCache::new(
Arc::new(ShardedCache::new(
std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
)),
&metrics,
)),
&metrics,
Arc::clone(&catalog),
));
pre_warm_schema_cache(&ns_cache, &*catalog)
@ -342,7 +346,7 @@ pub async fn create_router2_server_type(
// c. Retention validator
// Add a retention validator into handler stack to reject data outside the retention period
let retention_validator = RetentionValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let retention_validator = RetentionValidator::new(Arc::clone(&ns_cache));
let retention_validator =
InstrumentationDecorator::new("retention_validator", &metrics, retention_validator);
@ -358,8 +362,7 @@ pub async fn create_router2_server_type(
// e. Namespace resolver
// Initialise the Namespace ID lookup + cache
let namespace_resolver =
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache));
////////////////////////////////////////////////////////////////////////////
//
@ -513,11 +516,14 @@ pub async fn create_router_server_type<'a>(
// Initialise an instrumented namespace cache to be shared with the schema
// validator, and namespace auto-creator that reports cache hit/miss/update
// metrics.
let ns_cache = Arc::new(InstrumentedCache::new(
Arc::new(ShardedCache::new(
std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
let ns_cache = Arc::new(ReadThroughCache::new(
Arc::new(InstrumentedCache::new(
Arc::new(ShardedCache::new(
std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
)),
&metrics,
)),
&metrics,
Arc::clone(&catalog),
));
pre_warm_schema_cache(&ns_cache, &*catalog)
@ -533,7 +539,7 @@ pub async fn create_router_server_type<'a>(
// c. Retention validator
// Add a retention validator into handler stack to reject data outside the retention period
let retention_validator = RetentionValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let retention_validator = RetentionValidator::new(Arc::clone(&ns_cache));
let retention_validator =
InstrumentationDecorator::new("retention_validator", &metrics, retention_validator);
@ -547,8 +553,7 @@ pub async fn create_router_server_type<'a>(
// e. Namespace resolver
// Initialise the Namespace ID lookup + cache
let namespace_resolver =
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache));
////////////////////////////////////////////////////////////////////////////
//
@ -799,7 +804,10 @@ mod tests {
.expect("pre-warming failed");
let name = NamespaceName::new("test_ns").unwrap();
let got = cache.get_schema(&name).expect("should contain a schema");
let got = cache
.get_schema(&name)
.await
.expect("should contain a schema");
assert!(got.tables.get("name").is_some());
}

View File

@ -9,7 +9,7 @@ use router::{
DmlHandlerChainExt, FanOutAdaptor, Partitioner, SchemaValidator, ShardedWriteBuffer,
WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
namespace_cache::{MemoryNamespaceCache, ReadThroughCache, ShardedCache},
namespace_resolver::mock::MockNamespaceResolver,
server::http::{
write::{multi_tenant::MultiTenantRequestParser, WriteParamExtractor},
@ -64,8 +64,11 @@ fn e2e_benchmarks(c: &mut Criterion) {
let delegate = {
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let ns_cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
let ns_cache = Arc::new(ReadThroughCache::new(
Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
)),
Arc::clone(&catalog),
));
let write_buffer = init_write_buffer(1);

View File

@ -6,12 +6,12 @@ use criterion::{
};
use data_types::{NamespaceId, NamespaceName};
use hashbrown::HashMap;
use iox_catalog::mem::MemCatalog;
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use mutable_batch::MutableBatch;
use once_cell::sync::Lazy;
use router::{
dml_handlers::{DmlHandler, SchemaValidator},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
namespace_cache::{MemoryNamespaceCache, ReadThroughCache, ShardedCache},
};
use schema::Projection;
use tokio::runtime::Runtime;
@ -41,9 +41,12 @@ fn sharder_benchmarks(c: &mut Criterion) {
fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table: usize) {
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let ns_cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let ns_cache = Arc::new(ReadThroughCache::new(
Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
)),
Arc::clone(&catalog),
));
let validator = SchemaValidator::new(catalog, ns_cache, &metrics);

View File

@ -1,9 +1,6 @@
use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName};
use hashbrown::HashMap;
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use iox_time::{SystemProvider, TimeProvider};
use mutable_batch::MutableBatch;
use observability_deps::tracing::*;
@ -11,7 +8,7 @@ use thiserror::Error;
use trace::ctx::SpanContext;
use super::DmlHandler;
use crate::namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache};
use crate::namespace_cache::NamespaceCache;
/// Errors emitted during retention validation.
#[derive(Debug, Error)]
@ -33,21 +30,17 @@ pub enum RetentionError {
/// entire write is rejected.
///
/// Namespace retention periods are loaded from the provided [`NamespaceCache`]
/// implementation. If a cache miss occurs, the [`Catalog`] is queried and the
/// cache is populated.
/// implementation.
#[derive(Debug)]
pub struct RetentionValidator<C = Arc<InstrumentedCache<MemoryNamespaceCache>>, P = SystemProvider>
{
catalog: Arc<dyn Catalog>,
pub struct RetentionValidator<C, P = SystemProvider> {
cache: C,
time_provider: P,
}
impl<C> RetentionValidator<C> {
/// Initialise a new [`RetentionValidator`], rejecting time outside retention period
pub fn new(catalog: Arc<dyn Catalog>, cache: C) -> Self {
pub fn new(cache: C) -> Self {
Self {
catalog,
cache,
time_provider: Default::default(),
}
@ -57,7 +50,7 @@ impl<C> RetentionValidator<C> {
#[async_trait]
impl<C> DmlHandler for RetentionValidator<C>
where
C: NamespaceCache,
C: NamespaceCache<ReadError = iox_catalog::interface::Error>, // The handler expects the cache to read from the catalog if necessary.
{
type WriteError = RetentionError;
type DeleteError = RetentionError;
@ -69,43 +62,14 @@ where
async fn write(
&self,
namespace: &NamespaceName<'static>,
namespace_id: NamespaceId,
_namespace_id: NamespaceId,
batch: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
let mut repos = self.catalog.repositories().await;
// Load the namespace schema from the cache, falling back to pulling it
// from the global catalog (if it exists).
let schema = self.cache.get_schema(namespace);
let schema = match schema {
Some(v) => v,
None => {
// Pull the schema from the global catalog or error if it does
// not exist.
let schema = get_schema_by_name(
namespace,
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.map_err(|e| {
warn!(
error=%e,
%namespace,
%namespace_id,
"failed to retrieve namespace schema"
);
RetentionError::NamespaceLookup(e)
})
.map(Arc::new)?;
self.cache
.put_schema(namespace.clone(), Arc::clone(&schema));
trace!(%namespace, "schema cache populated");
schema
}
// Try to fetch the namespace schema through the cache.
let schema = match self.cache.get_schema(namespace).await {
Ok(v) => v,
Err(e) => return Err(RetentionError::NamespaceLookup(e)),
};
// retention is not infinite, validate all lines of a write are within the retention period
@ -139,14 +103,25 @@ where
#[cfg(test)]
mod tests {
use iox_tests::{TestCatalog, TestNamespace};
use once_cell::sync::Lazy;
use std::sync::Arc;
use iox_tests::{TestCatalog, TestNamespace};
use once_cell::sync::Lazy;
use super::*;
use crate::namespace_cache::{MemoryNamespaceCache, ReadThroughCache};
static NAMESPACE: Lazy<NamespaceName<'static>> = Lazy::new(|| "bananas".try_into().unwrap());
fn setup_test_cache(
catalog: Arc<TestCatalog>,
) -> Arc<ReadThroughCache<Arc<MemoryNamespaceCache>>> {
Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
catalog.catalog(),
))
}
#[tokio::test]
async fn test_time_inside_retention_period() {
let (catalog, namespace) = test_setup().await;
@ -154,9 +129,9 @@ mod tests {
// Create the table so that there is a known ID that must be returned.
let _want_id = namespace.create_table("bananas").await.table.id;
// Create the validator whse retention period is 1 hour
let handler =
RetentionValidator::new(catalog.catalog(), Arc::new(MemoryNamespaceCache::default()));
// Create the validator whose retention period is 1 hour
let cache = setup_test_cache(catalog);
let handler = RetentionValidator::new(cache);
// Make time now to be inside the retention period
let now = SystemProvider::default()
@ -182,8 +157,8 @@ mod tests {
let _want_id = namespace.create_table("bananas").await.table.id;
// Create the validator whose retention period is 1 hour
let handler =
RetentionValidator::new(catalog.catalog(), Arc::new(MemoryNamespaceCache::default()));
let cache = setup_test_cache(catalog);
let handler = RetentionValidator::new(cache);
// Make time outside the retention period
let two_hours_ago = (SystemProvider::default().now().timestamp_nanos()
@ -209,9 +184,9 @@ mod tests {
// Create the table so that there is a known ID that must be returned.
let _want_id = namespace.create_table("bananas").await.table.id;
// Create the validator whse retention period is 1 hour
let handler =
RetentionValidator::new(catalog.catalog(), Arc::new(MemoryNamespaceCache::default()));
// Create the validator whose retention period is 1 hour
let cache = setup_test_cache(catalog);
let handler = RetentionValidator::new(cache);
// Make time now to be inside the retention period
let now = SystemProvider::default()
@ -246,8 +221,8 @@ mod tests {
let _want_id = namespace.create_table("bananas").await.table.id;
// Create the validator whose retention period is 1 hour
let handler =
RetentionValidator::new(catalog.catalog(), Arc::new(MemoryNamespaceCache::default()));
let cache = setup_test_cache(catalog);
let handler = RetentionValidator::new(cache);
// Make time now to be inside the retention period
let now = SystemProvider::default()

View File

@ -4,7 +4,7 @@ use async_trait::async_trait;
use data_types::{DeletePredicate, NamespaceId, NamespaceName, NamespaceSchema, TableId};
use hashbrown::HashMap;
use iox_catalog::{
interface::{get_schema_by_name, Catalog, Error as CatalogError, SoftDeletedRows},
interface::{Catalog, Error as CatalogError},
validate_or_insert_schema,
};
use metric::U64Counter;
@ -14,7 +14,7 @@ use thiserror::Error;
use trace::ctx::SpanContext;
use super::DmlHandler;
use crate::namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache};
use crate::namespace_cache::NamespaceCache;
/// Errors emitted during schema validation.
#[derive(Debug, Error)]
@ -102,7 +102,7 @@ pub enum SchemaError {
///
/// [#3573]: https://github.com/influxdata/influxdb_iox/issues/3573
#[derive(Debug)]
pub struct SchemaValidator<C = Arc<InstrumentedCache<MemoryNamespaceCache>>> {
pub struct SchemaValidator<C> {
catalog: Arc<dyn Catalog>,
cache: C,
@ -113,9 +113,7 @@ pub struct SchemaValidator<C = Arc<InstrumentedCache<MemoryNamespaceCache>>> {
impl<C> SchemaValidator<C> {
/// Initialise a new [`SchemaValidator`] decorator, loading schemas from
/// `catalog`.
///
/// Schemas are cached in `ns_cache`.
/// `catalog` and the provided `ns_cache`.
pub fn new(catalog: Arc<dyn Catalog>, ns_cache: C, metrics: &metric::Registry) -> Self {
let service_limit_hit = metrics.register_metric::<U64Counter>(
"schema_validation_service_limit_reached",
@ -144,7 +142,7 @@ impl<C> SchemaValidator<C> {
#[async_trait]
impl<C> DmlHandler for SchemaValidator<C>
where
C: NamespaceCache,
C: NamespaceCache<ReadError = iox_catalog::interface::Error>, // The handler expects the cache to read from the catalog if necessary.
{
type WriteError = SchemaError;
type DeleteError = SchemaError;
@ -176,39 +174,10 @@ where
batches: Self::WriteInput,
_span_ctx: Option<SpanContext>,
) -> Result<Self::WriteOutput, Self::WriteError> {
let mut repos = self.catalog.repositories().await;
// Load the namespace schema from the cache, falling back to pulling it
// from the global catalog (if it exists).
let schema = self.cache.get_schema(namespace);
let schema = match schema {
Some(v) => v,
None => {
// Pull the schema from the global catalog or error if it does
// not exist.
let schema = get_schema_by_name(
namespace,
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.map_err(|e| {
warn!(
error=%e,
%namespace,
%namespace_id,
"failed to retrieve namespace schema"
);
SchemaError::NamespaceLookup(e)
})
.map(Arc::new)?;
self.cache
.put_schema(namespace.clone(), Arc::clone(&schema));
trace!(%namespace, "schema cache populated");
schema
}
// Try to fetch the namespace schema through the cache.
let schema = match self.cache.get_schema(namespace).await {
Ok(v) => v,
Err(e) => return Err(SchemaError::NamespaceLookup(e)),
};
validate_schema_limits(&batches, &schema).map_err(|e| {
@ -249,6 +218,8 @@ where
SchemaError::ServiceLimit(Box::new(e))
})?;
let mut repos = self.catalog.repositories().await;
let maybe_new_schema = validate_or_insert_schema(
batches.iter().map(|(k, v)| (k.as_str(), v)),
&schema,
@ -495,6 +466,7 @@ mod tests {
use once_cell::sync::Lazy;
use super::*;
use crate::namespace_cache::{MemoryNamespaceCache, ReadThroughCache};
static NAMESPACE: Lazy<NamespaceName<'static>> = Lazy::new(|| "bananas".try_into().unwrap());
@ -597,12 +569,12 @@ mod tests {
// Make two schema validator instances each with their own cache
let handler1 = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
setup_test_cache(&catalog),
&catalog.metric_registry,
);
let handler2 = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
setup_test_cache(&catalog),
&catalog.metric_registry,
);
@ -785,7 +757,14 @@ mod tests {
(catalog, namespace)
}
fn assert_cache<C>(handler: &SchemaValidator<C>, table: &str, col: &str, want: ColumnType)
fn setup_test_cache(catalog: &TestCatalog) -> Arc<ReadThroughCache<Arc<MemoryNamespaceCache>>> {
Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
catalog.catalog(),
))
}
async fn assert_cache<C>(handler: &SchemaValidator<C>, table: &str, col: &str, want: ColumnType)
where
C: NamespaceCache,
{
@ -793,6 +772,7 @@ mod tests {
let ns = handler
.cache
.get_schema(&NAMESPACE)
.await
.expect("cache should be populated");
let table = ns.tables.get(table).expect("table should exist in cache");
assert_eq!(
@ -813,11 +793,7 @@ mod tests {
let want_id = namespace.create_table("bananas").await.table.id;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&metrics,
);
let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics);
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
let got = handler
@ -826,10 +802,10 @@ mod tests {
.expect("request should succeed");
// The cache should be populated.
assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
assert_cache(&handler, "bananas", "val", ColumnType::I64);
assert_cache(&handler, "bananas", "time", ColumnType::Time);
assert_cache(&handler, "bananas", "tag1", ColumnType::Tag).await;
assert_cache(&handler, "bananas", "tag2", ColumnType::Tag).await;
assert_cache(&handler, "bananas", "val", ColumnType::I64).await;
assert_cache(&handler, "bananas", "time", ColumnType::Time).await;
// Validate the table ID mapping.
let (name, _data) = got.get(&want_id).expect("table not in output");
@ -840,11 +816,7 @@ mod tests {
async fn test_write_schema_not_found() {
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&metrics,
);
let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics);
let ns = NamespaceName::try_from("A_DIFFERENT_NAMESPACE").unwrap();
@ -857,18 +829,14 @@ mod tests {
assert_matches!(err, SchemaError::NamespaceLookup(_));
// The cache should not have retained the schema.
assert!(handler.cache.get_schema(&ns).is_none());
assert!(handler.cache.get_schema(&ns).await.is_err());
}
#[tokio::test]
async fn test_write_validation_failure() {
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&metrics,
);
let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics);
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64
@ -890,10 +858,10 @@ mod tests {
});
// The cache should retain the original schema.
assert_cache(&handler, "bananas", "tag1", ColumnType::Tag);
assert_cache(&handler, "bananas", "tag2", ColumnType::Tag);
assert_cache(&handler, "bananas", "val", ColumnType::I64); // original type
assert_cache(&handler, "bananas", "time", ColumnType::Time);
assert_cache(&handler, "bananas", "tag1", ColumnType::Tag).await;
assert_cache(&handler, "bananas", "tag2", ColumnType::Tag).await;
assert_cache(&handler, "bananas", "val", ColumnType::I64).await; // original type
assert_cache(&handler, "bananas", "time", ColumnType::Time).await;
assert_eq!(1, handler.schema_conflict.fetch());
}
@ -902,11 +870,7 @@ mod tests {
async fn test_write_table_service_limit() {
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&metrics,
);
let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics);
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
@ -941,11 +905,7 @@ mod tests {
async fn test_write_column_service_limit() {
let (catalog, namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&metrics,
);
let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics);
// First write sets the schema
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");
@ -957,11 +917,7 @@ mod tests {
// Configure the service limit to be hit next request
namespace.update_column_limit(1).await;
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&metrics,
);
let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics);
// Second write attempts to violate limits, causing an error
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456");
@ -978,11 +934,7 @@ mod tests {
async fn test_first_write_many_columns_service_limit() {
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&metrics,
);
let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics);
// Configure the service limit to be hit next request
catalog
@ -1012,11 +964,7 @@ mod tests {
let (catalog, _namespace) = test_setup().await;
let metrics = Arc::new(metric::Registry::default());
let handler = SchemaValidator::new(
catalog.catalog(),
Arc::new(MemoryNamespaceCache::default()),
&metrics,
);
let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics);
let predicate = DeletePredicate {
range: TimestampRange::new(1, 2),
@ -1031,6 +979,6 @@ mod tests {
.expect("request should succeed");
// Deletes have no effect on the cache.
assert!(handler.cache.get_schema(&ns).is_none());
assert!(handler.cache.get_schema(&ns).await.is_err());
}
}

View File

@ -8,14 +8,27 @@ pub use sharded_cache::*;
pub mod metrics;
use std::{fmt::Debug, sync::Arc};
mod read_through_cache;
pub use read_through_cache::*;
use std::{error::Error, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
/// An abstract cache of [`NamespaceSchema`].
#[async_trait]
pub trait NamespaceCache: Debug + Send + Sync {
/// The type of error a [`NamespaceCache`] implementation produces
/// when unable to read the [`NamespaceSchema`] requested from the
/// cache.
type ReadError: Error + Send;
/// Return the [`NamespaceSchema`] for `namespace`.
fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option<Arc<NamespaceSchema>>;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError>;
/// Place `schema` in the cache, unconditionally overwriting any existing
/// [`NamespaceSchema`] mapped to `namespace`, returning

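The core API change in this trait is that `get_schema()` is now asynchronous and fallible, with the error type chosen by each implementation through the `ReadError` associated type. The earlier sketch was synchronous; the toy below shows the async form of that shape end to end (stand-in types only, assuming the `async-trait`, `thiserror`, and `tokio` crates the repository already uses; it is not the crate's real definition).

```rust
use std::{collections::HashMap, fmt::Debug, sync::{Arc, RwLock}};

use async_trait::async_trait;
use thiserror::Error;

/// Toy stand-in for `NamespaceSchema`.
#[derive(Debug)]
struct Schema;

#[derive(Debug, Error)]
#[error("namespace {0} not found in cache")]
struct CacheMiss(String);

/// Same shape as the trait in the hunk above: an async, fallible read
/// whose error type is picked by the implementation, plus an infallible write.
#[async_trait]
trait NamespaceCache: Debug + Send + Sync {
    type ReadError: std::error::Error + Send;

    async fn get_schema(&self, namespace: &str) -> Result<Arc<Schema>, Self::ReadError>;

    fn put_schema(&self, namespace: String, schema: Arc<Schema>) -> Option<Arc<Schema>>;
}

#[derive(Debug, Default)]
struct MemoryCache {
    entries: RwLock<HashMap<String, Arc<Schema>>>,
}

#[async_trait]
impl NamespaceCache for MemoryCache {
    type ReadError = CacheMiss;

    async fn get_schema(&self, namespace: &str) -> Result<Arc<Schema>, Self::ReadError> {
        self.entries
            .read()
            .expect("lock poisoned")
            .get(namespace)
            .cloned()
            .ok_or_else(|| CacheMiss(namespace.to_owned()))
    }

    fn put_schema(&self, namespace: String, schema: Arc<Schema>) -> Option<Arc<Schema>> {
        self.entries
            .write()
            .expect("lock poisoned")
            .insert(namespace, schema)
    }
}

#[tokio::main]
async fn main() {
    let cache = MemoryCache::default();

    // Before population the read fails with the implementation's error type...
    assert!(cache.get_schema("bananas").await.is_err());
    assert!(cache.put_schema("bananas".to_owned(), Arc::new(Schema)).is_none());
    // ...and afterwards callers simply await the fallible lookup.
    assert!(cache.get_schema("bananas").await.is_ok());
}
```

In the diff, the miss error for `MemoryNamespaceCache` is the new `CacheMissErr` type, which is exactly what the `ReadThroughCache` decorator matches on to decide when to fall back to the catalog.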
View File

@ -1,11 +1,20 @@
use std::sync::Arc;
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use hashbrown::HashMap;
use parking_lot::RwLock;
use thiserror::Error;
use super::NamespaceCache;
/// An error type indicating that `namespace` is not present in the cache.
#[derive(Debug, Error)]
#[error("namespace {namespace} not found in cache")]
pub struct CacheMissErr {
pub(super) namespace: NamespaceName<'static>,
}
/// An in-memory cache of [`NamespaceSchema`] backed by a hashmap protected with
/// a read-write mutex.
#[derive(Debug, Default)]
@ -13,9 +22,21 @@ pub struct MemoryNamespaceCache {
cache: RwLock<HashMap<NamespaceName<'static>, Arc<NamespaceSchema>>>,
}
#[async_trait]
impl NamespaceCache for Arc<MemoryNamespaceCache> {
fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option<Arc<NamespaceSchema>> {
self.cache.read().get(namespace).map(Arc::clone)
type ReadError = CacheMissErr;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
self.cache
.read()
.get(namespace)
.ok_or(CacheMissErr {
namespace: namespace.clone(),
})
.map(Arc::clone)
}
fn put_schema(
@ -29,16 +50,22 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use data_types::{NamespaceId, QueryPoolId, TopicId};
use super::*;
#[test]
fn test_put_get() {
#[tokio::test]
async fn test_put_get() {
let ns = NamespaceName::new("test").expect("namespace name is valid");
let cache = Arc::new(MemoryNamespaceCache::default());
assert!(cache.get_schema(&ns).is_none());
assert_matches!(
cache.get_schema(&ns).await,
Err(CacheMissErr { namespace: got_ns }) => {
assert_eq!(got_ns, ns);
}
);
let schema1 = NamespaceSchema {
id: NamespaceId::new(42),
@ -50,7 +77,10 @@ mod tests {
retention_period_ns: Some(876),
};
assert!(cache.put_schema(ns.clone(), schema1.clone()).is_none());
assert_eq!(*cache.get_schema(&ns).expect("lookup failure"), schema1);
assert_eq!(
*cache.get_schema(&ns).await.expect("lookup failure"),
schema1
);
let schema2 = NamespaceSchema {
id: NamespaceId::new(2),
@ -68,6 +98,9 @@ mod tests {
.expect("should have existing schema"),
schema1
);
assert_eq!(*cache.get_schema(&ns).expect("lookup failure"), schema2);
assert_eq!(
*cache.get_schema(&ns).await.expect("lookup failure"),
schema2
);
}
}

View File

@ -2,6 +2,7 @@
use std::sync::Arc;
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use iox_time::{SystemProvider, TimeProvider};
use metric::{DurationHistogram, Metric, U64Gauge};
@ -69,21 +70,27 @@ impl<T> InstrumentedCache<T> {
}
}
#[async_trait]
impl<T, P> NamespaceCache for Arc<InstrumentedCache<T, P>>
where
T: NamespaceCache,
P: TimeProvider,
{
fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option<Arc<NamespaceSchema>> {
type ReadError = T::ReadError;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
let t = self.time_provider.now();
let res = self.inner.get_schema(namespace);
let res = self.inner.get_schema(namespace).await;
// Avoid exploding if time goes backwards - simply drop the measurement
// if it happens.
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Some(_) => self.get_hit.record(delta),
None => self.get_miss.record(delta),
Ok(_) => self.get_hit.record(delta),
Err(_) => self.get_miss.record(delta),
};
}
@ -224,8 +231,8 @@ mod tests {
);
}
#[test]
fn test_put() {
#[tokio::test]
async fn test_put() {
let ns = NamespaceName::new("test").expect("namespace name is valid");
let registry = metric::Registry::default();
let cache = Arc::new(MemoryNamespaceCache::default());
@ -376,7 +383,7 @@ mod tests {
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(5));
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(42));
let _got = cache.get_schema(&ns).expect("should exist");
let _got = cache.get_schema(&ns).await.expect("should exist");
assert_histogram_hit(
&registry,
"namespace_cache_get_duration",

View File

@ -0,0 +1,183 @@
//! Read-through caching behaviour for a [`NamespaceCache`] implementation
use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use observability_deps::tracing::*;
use super::memory::CacheMissErr;
use super::NamespaceCache;
/// A [`ReadThroughCache`] decorates a [`NamespaceCache`] with read-through
/// caching behaviour on calls to `self.get_schema()` when contained in an
/// [`Arc`], resolving cache misses with the provided [`Catalog`].
///
/// Filters out all soft-deleted namespaces when resolving.
///
/// No attempt to serialise cache misses for a particular namespace is made -
/// `N` concurrent calls for a missing namespace will cause `N` concurrent
/// catalog queries, and `N` [`NamespaceSchema`] instances replacing each other
/// in the cache before converging on a single instance (last resolved wins).
/// Subsequent queries will return the currently cached instance.
#[derive(Debug)]
pub struct ReadThroughCache<T> {
inner_cache: T,
catalog: Arc<dyn Catalog>,
}
impl<T> ReadThroughCache<T> {
/// Decorates `inner_cache` with read-through caching behaviour, looking
/// up schema from `catalog` when not present in the underlying cache.
pub fn new(inner_cache: T, catalog: Arc<dyn Catalog>) -> Self {
Self {
inner_cache,
catalog,
}
}
}
#[async_trait]
impl<T> NamespaceCache for Arc<ReadThroughCache<T>>
where
T: NamespaceCache<ReadError = CacheMissErr>,
{
type ReadError = iox_catalog::interface::Error;
/// Fetch the schema for `namespace` directly from the inner cache if
/// present, pulling from the catalog if not.
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
match self.inner_cache.get_schema(namespace).await {
Ok(v) => Ok(v),
Err(CacheMissErr {
namespace: cache_ns,
}) => {
// Invariant: the cache should not return misses for a different
// namespace name.
assert_eq!(cache_ns, *namespace);
let mut repos = self.catalog.repositories().await;
let schema = match get_schema_by_name(
namespace,
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
{
Ok(v) => Arc::new(v),
Err(e) => {
warn!(
error = %e,
%namespace,
"failed to retrieve namespace schema"
);
return Err(e);
}
};
self.put_schema(namespace.clone(), Arc::clone(&schema));
trace!(%namespace, "schema cache populated");
Ok(schema)
}
}
}
fn put_schema(
&self,
namespace: NamespaceName<'static>,
schema: impl Into<Arc<NamespaceSchema>>,
) -> Option<Arc<NamespaceSchema>> {
self.inner_cache.put_schema(namespace, schema)
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use data_types::{NamespaceId, QueryPoolId, TopicId};
use iox_catalog::mem::MemCatalog;
use super::*;
use crate::namespace_cache::memory::MemoryNamespaceCache;
#[tokio::test]
async fn test_put_get() {
let ns = NamespaceName::try_from("arán").expect("namespace name should be valid");
let inner = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(inner, catalog));
// Pre-condition: Namespace not in cache or catalog.
assert_matches!(cache.get_schema(&ns).await, Err(_));
// Place a schema in the cache for that name
let schema1 = NamespaceSchema::new(
NamespaceId::new(1),
TopicId::new(2),
QueryPoolId::new(3),
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
iox_catalog::DEFAULT_MAX_TABLES,
iox_catalog::DEFAULT_RETENTION_PERIOD,
);
assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), None);
// Ensure it is present
assert_eq!(
*cache
.get_schema(&ns)
.await
.expect("schema should be present in cache"),
schema1
);
}
#[tokio::test]
async fn test_get_cache_miss_catalog_fetch_ok() {
let ns = NamespaceName::try_from("arán").expect("namespace name should be valid");
let inner = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(inner, Arc::clone(&catalog)));
// Pre-condition: Namespace not in cache or catalog.
assert_matches!(cache.get_schema(&ns).await, Err(_));
// Place a schema in the catalog for that name
let schema1 = NamespaceSchema::new(
NamespaceId::new(1),
TopicId::new(2),
QueryPoolId::new(3),
iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE,
iox_catalog::DEFAULT_MAX_TABLES,
iox_catalog::DEFAULT_RETENTION_PERIOD,
);
assert_matches!(
catalog
.repositories()
.await
.namespaces()
.create(
&ns,
iox_catalog::DEFAULT_RETENTION_PERIOD,
schema1.topic_id,
schema1.query_pool_id,
)
.await,
Ok(_)
);
// Query the cache again, should return the above schema after missing the cache.
assert_matches!(cache.get_schema(&ns).await, Ok(v) => {
assert_eq!(*v, schema1);
})
}
}

View File

@ -1,5 +1,6 @@
use std::sync::Arc;
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use sharder::JumpHash;
@ -21,12 +22,18 @@ impl<T> ShardedCache<T> {
}
}
#[async_trait]
impl<T> NamespaceCache for Arc<ShardedCache<T>>
where
T: NamespaceCache,
{
fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option<Arc<NamespaceSchema>> {
self.shards.hash(namespace).get_schema(namespace)
type ReadError = T::ReadError;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
self.shards.hash(namespace).get_schema(namespace).await
}
fn put_schema(
@ -40,8 +47,10 @@ where
#[cfg(test)]
mod tests {
use std::{collections::HashMap, iter};
use assert_matches::assert_matches;
use data_types::{NamespaceId, QueryPoolId, TopicId};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
@ -70,8 +79,8 @@ mod tests {
}
}
#[test]
fn test_stable_cache_sharding() {
#[tokio::test]
async fn test_stable_cache_sharding() {
// The number of namespaces to test with.
const N: usize = 100;
@ -92,7 +101,7 @@ mod tests {
// The cache should be empty.
for name in names.keys() {
assert!(cache.get_schema(name).is_none());
assert_matches!(cache.get_schema(name).await, Err(_));
}
// Populate the cache
@ -104,7 +113,9 @@ mod tests {
// The mapping should be stable
for (name, id) in names {
let want = schema_with_id(id as _);
assert_eq!(cache.get_schema(&name), Some(Arc::new(want)));
assert_matches!(cache.get_schema(&name).await, Ok(got) => {
assert_eq!(got, Arc::new(want));
});
}
}
}

View File

@ -1,11 +1,7 @@
//! A trait to abstract resolving a [`NamespaceName`] to [`NamespaceId`], and a
//! collection of composable implementations.
use std::{ops::DerefMut, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceId, NamespaceName};
use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows};
use observability_deps::tracing::*;
use thiserror::Error;
@ -37,27 +33,25 @@ pub trait NamespaceResolver: std::fmt::Debug + Send + Sync {
) -> Result<NamespaceId, Error>;
}
/// An implementation of [`NamespaceResolver`] that queries the [`Catalog`] to
/// resolve a [`NamespaceId`], and populates the [`NamespaceCache`] as a side
/// effect.
/// An implementation of [`NamespaceResolver`] that resolves the [`NamespaceId`]
/// for a given name through a [`NamespaceCache`].
#[derive(Debug)]
pub struct NamespaceSchemaResolver<C> {
catalog: Arc<dyn Catalog>,
cache: C,
}
impl<C> NamespaceSchemaResolver<C> {
/// Construct a new [`NamespaceSchemaResolver`] that fetches schemas from
/// `catalog` and caches them in `cache`.
pub fn new(catalog: Arc<dyn Catalog>, cache: C) -> Self {
Self { catalog, cache }
/// Construct a new [`NamespaceSchemaResolver`] that resolves namespace IDs
/// using `cache`.
pub fn new(cache: C) -> Self {
Self { cache }
}
}
#[async_trait]
impl<C> NamespaceResolver for NamespaceSchemaResolver<C>
where
C: NamespaceCache,
C: NamespaceCache<ReadError = iox_catalog::interface::Error>,
{
async fn get_namespace_id(
&self,
@ -65,38 +59,9 @@ where
) -> Result<NamespaceId, Error> {
// Load the namespace schema from the cache, falling back to pulling it
// from the global catalog (if it exists).
match self.cache.get_schema(namespace) {
Some(v) => Ok(v.id),
None => {
let mut repos = self.catalog.repositories().await;
// Pull the schema from the global catalog or error if it does
// not exist.
let schema = get_schema_by_name(
namespace,
repos.deref_mut(),
SoftDeletedRows::ExcludeDeleted,
)
.await
.map_err(|e| {
warn!(
error=%e,
%namespace,
"failed to retrieve namespace schema"
);
Error::Lookup(e)
})
.map(Arc::new)?;
// Cache population MAY race with other threads and lead to
// overwrites, but an entry will always exist once inserted, and
// the schemas will eventually converge.
self.cache
.put_schema(namespace.clone(), Arc::clone(&schema));
trace!(%namespace, "schema cache populated");
Ok(schema.id)
}
match self.cache.get_schema(namespace).await {
Ok(v) => Ok(v.id),
Err(e) => return Err(Error::Lookup(e)),
}
}
}
@ -107,17 +72,26 @@ mod tests {
use assert_matches::assert_matches;
use data_types::{NamespaceId, NamespaceSchema, QueryPoolId, TopicId};
use iox_catalog::mem::MemCatalog;
use iox_catalog::{
interface::{Catalog, SoftDeletedRows},
mem::MemCatalog,
};
use super::*;
use crate::namespace_cache::MemoryNamespaceCache;
use crate::namespace_cache::{MemoryNamespaceCache, ReadThroughCache};
#[tokio::test]
async fn test_cache_hit() {
let ns = NamespaceName::try_from("bananas").unwrap();
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
// Prep the cache before the test to cause a hit
let cache = Arc::new(MemoryNamespaceCache::default());
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
cache.put_schema(
ns.clone(),
NamespaceSchema {
@ -131,10 +105,7 @@ mod tests {
},
);
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
// Drive the code under test
resolver
@ -142,7 +113,7 @@ mod tests {
.await
.expect("lookup should succeed");
assert!(cache.get_schema(&ns).is_some());
assert!(cache.get_schema(&ns).await.is_ok());
// The cache hit should mean the catalog SHOULD NOT see a create request
// for the namespace.
@ -162,9 +133,12 @@ mod tests {
async fn test_cache_miss() {
let ns = NamespaceName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
// Create the namespace in the catalog
{
@ -178,7 +152,7 @@ mod tests {
.expect("failed to setup catalog state");
}
let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
resolver
.get_namespace_id(&ns)
@ -186,16 +160,19 @@ mod tests {
.expect("lookup should succeed");
// The cache should be populated as a result of the lookup.
assert!(cache.get_schema(&ns).is_some());
assert!(cache.get_schema(&ns).await.is_ok());
}
#[tokio::test]
async fn test_cache_miss_soft_deleted() {
let ns = NamespaceName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
// Create the namespace in the catalog and mark it as deleted
{
@ -214,7 +191,7 @@ mod tests {
.expect("failed to setup catalog state");
}
let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
let err = resolver
.get_namespace_id(&ns)
@ -226,18 +203,21 @@ mod tests {
);
// The cache should NOT be populated as a result of the lookup.
assert!(cache.get_schema(&ns).is_none());
assert!(cache.get_schema(&ns).await.is_err());
}
#[tokio::test]
async fn test_cache_miss_does_not_exist() {
let ns = NamespaceName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache));
let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache));
let err = resolver
.get_namespace_id(&ns)
@ -245,6 +225,6 @@ mod tests {
.expect_err("lookup should error");
assert_matches!(err, Error::Lookup(_));
assert!(cache.get_schema(&ns).is_none());
assert!(cache.get_schema(&ns).await.is_err());
}
}

View File

@ -79,7 +79,7 @@ impl<C, T> NamespaceAutocreation<C, T> {
#[async_trait]
impl<C, T> NamespaceResolver for NamespaceAutocreation<C, T>
where
C: NamespaceCache,
C: NamespaceCache<ReadError = iox_catalog::interface::Error>, // The resolver relies on the cache for read-through cache behaviour
T: NamespaceResolver,
{
/// Force the creation of `namespace` if it does not already exist in the
@ -88,7 +88,7 @@ where
&self,
namespace: &NamespaceName<'static>,
) -> Result<NamespaceId, super::Error> {
if self.cache.get_schema(namespace).is_none() {
if self.cache.get_schema(namespace).await.is_err() {
trace!(%namespace, "namespace not found in cache");
match self.action {
@ -153,7 +153,7 @@ mod tests {
use super::*;
use crate::{
namespace_cache::MemoryNamespaceCache,
namespace_cache::{MemoryNamespaceCache, ReadThroughCache},
namespace_resolver::{mock::MockNamespaceResolver, NamespaceSchemaResolver},
};
@ -166,8 +166,14 @@ mod tests {
let ns = NamespaceName::try_from("bananas").unwrap();
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
// Prep the cache before the test to cause a hit
let cache = Arc::new(MemoryNamespaceCache::default());
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
cache.put_schema(
ns.clone(),
NamespaceSchema {
@ -181,9 +187,6 @@ mod tests {
},
);
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let creator = NamespaceAutocreation::new(
MockNamespaceResolver::default().with_mapping(ns.clone(), NAMESPACE_ID),
cache,
@ -218,10 +221,14 @@ mod tests {
async fn test_cache_miss() {
let ns = NamespaceName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
let creator = NamespaceAutocreation::new(
MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
cache,
@ -266,9 +273,12 @@ mod tests {
async fn test_reject() {
let ns = NamespaceName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
let creator = NamespaceAutocreation::new(
MockNamespaceResolver::default(),
@ -302,13 +312,16 @@ mod tests {
async fn test_reject_exists_in_catalog() {
let ns = NamespaceName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
// First drive the population of the catalog
let creator = NamespaceAutocreation::new(
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)),
NamespaceSchemaResolver::new(Arc::clone(&cache)),
Arc::clone(&cache),
Arc::clone(&catalog),
TopicId::new(42),
@ -323,7 +336,7 @@ mod tests {
// Now try in "reject" mode.
let creator = NamespaceAutocreation::new(
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)),
NamespaceSchemaResolver::new(Arc::clone(&cache)),
cache,
Arc::clone(&catalog),
TopicId::new(42),

View File

@ -14,7 +14,7 @@ use router::{
InstrumentationDecorator, Partitioned, Partitioner, RetentionValidator, RpcWrite,
SchemaValidator, WriteSummaryAdapter,
},
namespace_cache::{MemoryNamespaceCache, ShardedCache},
namespace_cache::{MemoryNamespaceCache, ReadThroughCache, ShardedCache},
namespace_resolver::{MissingNamespaceAction, NamespaceAutocreation, NamespaceSchemaResolver},
server::{
grpc::RpcWriteGrpcDelegate,
@ -118,8 +118,12 @@ type HttpDelegateStack = HttpDelegate<
Chain<
Chain<
Chain<
RetentionValidator<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
SchemaValidator<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
RetentionValidator<
Arc<ReadThroughCache<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>>,
>,
SchemaValidator<
Arc<ReadThroughCache<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>>,
>,
>,
Partitioner,
>,
@ -132,8 +136,10 @@ type HttpDelegateStack = HttpDelegate<
>,
>,
NamespaceAutocreation<
Arc<ShardedCache<Arc<MemoryNamespaceCache>>>,
NamespaceSchemaResolver<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>,
Arc<ReadThroughCache<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>>,
NamespaceSchemaResolver<
Arc<ReadThroughCache<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>>,
>,
>,
>;
@ -148,22 +154,23 @@ impl TestContext {
let client = Arc::new(MockWriteClient::default());
let rpc_writer = RpcWrite::new([(Arc::clone(&client), "mock client")], None, &metrics);
let ns_cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
let ns_cache = Arc::new(ReadThroughCache::new(
Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
)),
Arc::clone(&catalog),
));
let schema_validator =
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &metrics);
let retention_validator =
RetentionValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let retention_validator = RetentionValidator::new(Arc::clone(&ns_cache));
let partitioner = Partitioner::new(PartitionTemplate {
parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())],
});
let namespace_resolver =
NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache));
let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache));
let namespace_resolver = NamespaceAutocreation::new(
namespace_resolver,
Arc::clone(&ns_cache),