diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index 68049799f7..e3a8e7bdbf 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -31,7 +31,8 @@ use router::{ ShardedWriteBuffer, WriteSummaryAdapter, }, namespace_cache::{ - metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache, + metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ReadThroughCache, + ShardedCache, }, namespace_resolver::{ MissingNamespaceAction, NamespaceAutocreation, NamespaceResolver, NamespaceSchemaResolver, @@ -322,11 +323,14 @@ pub async fn create_router2_server_type( // Initialise an instrumented namespace cache to be shared with the schema // validator, and namespace auto-creator that reports cache hit/miss/update // metrics. - let ns_cache = Arc::new(InstrumentedCache::new( - Arc::new(ShardedCache::new( - std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + let ns_cache = Arc::new(ReadThroughCache::new( + Arc::new(InstrumentedCache::new( + Arc::new(ShardedCache::new( + std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + )), + &metrics, )), - &metrics, + Arc::clone(&catalog), )); pre_warm_schema_cache(&ns_cache, &*catalog) @@ -342,7 +346,7 @@ pub async fn create_router2_server_type( // c. Retention validator // Add a retention validator into handler stack to reject data outside the retention period - let retention_validator = RetentionValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + let retention_validator = RetentionValidator::new(Arc::clone(&ns_cache)); let retention_validator = InstrumentationDecorator::new("retention_validator", &metrics, retention_validator); @@ -358,8 +362,7 @@ pub async fn create_router2_server_type( // e. 
Namespace resolver // Initialise the Namespace ID lookup + cache - let namespace_resolver = - NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache)); //////////////////////////////////////////////////////////////////////////// // @@ -513,11 +516,14 @@ pub async fn create_router_server_type<'a>( // Initialise an instrumented namespace cache to be shared with the schema // validator, and namespace auto-creator that reports cache hit/miss/update // metrics. - let ns_cache = Arc::new(InstrumentedCache::new( - Arc::new(ShardedCache::new( - std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + let ns_cache = Arc::new(ReadThroughCache::new( + Arc::new(InstrumentedCache::new( + Arc::new(ShardedCache::new( + std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + )), + &metrics, )), - &metrics, + Arc::clone(&catalog), )); pre_warm_schema_cache(&ns_cache, &*catalog) @@ -533,7 +539,7 @@ pub async fn create_router_server_type<'a>( // c. Retention validator // Add a retention validator into handler stack to reject data outside the retention period - let retention_validator = RetentionValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + let retention_validator = RetentionValidator::new(Arc::clone(&ns_cache)); let retention_validator = InstrumentationDecorator::new("retention_validator", &metrics, retention_validator); @@ -547,8 +553,7 @@ pub async fn create_router_server_type<'a>( // e. 
Namespace resolver // Initialise the Namespace ID lookup + cache - let namespace_resolver = - NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache)); //////////////////////////////////////////////////////////////////////////// // @@ -799,7 +804,10 @@ mod tests { .expect("pre-warming failed"); let name = NamespaceName::new("test_ns").unwrap(); - let got = cache.get_schema(&name).expect("should contain a schema"); + let got = cache + .get_schema(&name) + .await + .expect("should contain a schema"); assert!(got.tables.get("name").is_some()); } diff --git a/router/benches/e2e.rs b/router/benches/e2e.rs index 3830424f1e..2533d09b23 100644 --- a/router/benches/e2e.rs +++ b/router/benches/e2e.rs @@ -9,7 +9,7 @@ use router::{ DmlHandlerChainExt, FanOutAdaptor, Partitioner, SchemaValidator, ShardedWriteBuffer, WriteSummaryAdapter, }, - namespace_cache::{MemoryNamespaceCache, ShardedCache}, + namespace_cache::{MemoryNamespaceCache, ReadThroughCache, ShardedCache}, namespace_resolver::mock::MockNamespaceResolver, server::http::{ write::{multi_tenant::MultiTenantRequestParser, WriteParamExtractor}, @@ -64,8 +64,11 @@ fn e2e_benchmarks(c: &mut Criterion) { let delegate = { let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); - let ns_cache = Arc::new(ShardedCache::new( - iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + let ns_cache = Arc::new(ReadThroughCache::new( + Arc::new(ShardedCache::new( + iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + )), + Arc::clone(&catalog), )); let write_buffer = init_write_buffer(1); diff --git a/router/benches/schema_validator.rs b/router/benches/schema_validator.rs index 9cd7a99fd0..96d42a1c9b 100644 --- a/router/benches/schema_validator.rs +++ b/router/benches/schema_validator.rs @@ -6,12 +6,12 @@ use criterion::{ }; use 
data_types::{NamespaceId, NamespaceName}; use hashbrown::HashMap; -use iox_catalog::mem::MemCatalog; +use iox_catalog::{interface::Catalog, mem::MemCatalog}; use mutable_batch::MutableBatch; use once_cell::sync::Lazy; use router::{ dml_handlers::{DmlHandler, SchemaValidator}, - namespace_cache::{MemoryNamespaceCache, ShardedCache}, + namespace_cache::{MemoryNamespaceCache, ReadThroughCache, ShardedCache}, }; use schema::Projection; use tokio::runtime::Runtime; @@ -41,9 +41,12 @@ fn sharder_benchmarks(c: &mut Criterion) { fn bench(group: &mut BenchmarkGroup, tables: usize, columns_per_table: usize) { let metrics = Arc::new(metric::Registry::default()); - let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics))); - let ns_cache = Arc::new(ShardedCache::new( - iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); + let ns_cache = Arc::new(ReadThroughCache::new( + Arc::new(ShardedCache::new( + iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + )), + Arc::clone(&catalog), )); let validator = SchemaValidator::new(catalog, ns_cache, &metrics); diff --git a/router/src/dml_handlers/retention_validation.rs b/router/src/dml_handlers/retention_validation.rs index 50f19abeca..0e426be1c9 100644 --- a/router/src/dml_handlers/retention_validation.rs +++ b/router/src/dml_handlers/retention_validation.rs @@ -1,9 +1,6 @@ -use std::{ops::DerefMut, sync::Arc}; - use async_trait::async_trait; use data_types::{DeletePredicate, NamespaceId, NamespaceName}; use hashbrown::HashMap; -use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows}; use iox_time::{SystemProvider, TimeProvider}; use mutable_batch::MutableBatch; use observability_deps::tracing::*; @@ -11,7 +8,7 @@ use thiserror::Error; use trace::ctx::SpanContext; use super::DmlHandler; -use crate::namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache}; +use 
crate::namespace_cache::NamespaceCache; /// Errors emitted during retention validation. #[derive(Debug, Error)] @@ -33,21 +30,17 @@ pub enum RetentionError { /// entire write is rejected. /// /// Namespace retention periods are loaded from the provided [`NamespaceCache`] -/// implementation. If a cache miss occurs, the [`Catalog`] is queried and the -/// cache is populated. +/// implementation. #[derive(Debug)] -pub struct RetentionValidator>, P = SystemProvider> -{ - catalog: Arc, +pub struct RetentionValidator { cache: C, time_provider: P, } impl RetentionValidator { /// Initialise a new [`RetentionValidator`], rejecting time outside retention period - pub fn new(catalog: Arc, cache: C) -> Self { + pub fn new(cache: C) -> Self { Self { - catalog, cache, time_provider: Default::default(), } @@ -57,7 +50,7 @@ impl RetentionValidator { #[async_trait] impl DmlHandler for RetentionValidator where - C: NamespaceCache, + C: NamespaceCache, // The handler expects the cache to read from the catalog if necessary. { type WriteError = RetentionError; type DeleteError = RetentionError; @@ -69,43 +62,14 @@ where async fn write( &self, namespace: &NamespaceName<'static>, - namespace_id: NamespaceId, + _namespace_id: NamespaceId, batch: Self::WriteInput, _span_ctx: Option, ) -> Result { - let mut repos = self.catalog.repositories().await; - - // Load the namespace schema from the cache, falling back to pulling it - // from the global catalog (if it exists). - let schema = self.cache.get_schema(namespace); - let schema = match schema { - Some(v) => v, - None => { - // Pull the schema from the global catalog or error if it does - // not exist. 
- let schema = get_schema_by_name( - namespace, - repos.deref_mut(), - SoftDeletedRows::ExcludeDeleted, - ) - .await - .map_err(|e| { - warn!( - error=%e, - %namespace, - %namespace_id, - "failed to retrieve namespace schema" - ); - RetentionError::NamespaceLookup(e) - }) - .map(Arc::new)?; - - self.cache - .put_schema(namespace.clone(), Arc::clone(&schema)); - - trace!(%namespace, "schema cache populated"); - schema - } + // Try to fetch the namespace schema through the cache. + let schema = match self.cache.get_schema(namespace).await { + Ok(v) => v, + Err(e) => return Err(RetentionError::NamespaceLookup(e)), }; // retention is not infinte, validate all lines of a write are within the retention period @@ -139,14 +103,25 @@ where #[cfg(test)] mod tests { - use iox_tests::{TestCatalog, TestNamespace}; - use once_cell::sync::Lazy; use std::sync::Arc; + use iox_tests::{TestCatalog, TestNamespace}; + use once_cell::sync::Lazy; + use super::*; + use crate::namespace_cache::{MemoryNamespaceCache, ReadThroughCache}; static NAMESPACE: Lazy> = Lazy::new(|| "bananas".try_into().unwrap()); + fn setup_test_cache( + catalog: Arc, + ) -> Arc>> { + Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + catalog.catalog(), + )) + } + #[tokio::test] async fn test_time_inside_retention_period() { let (catalog, namespace) = test_setup().await; @@ -154,9 +129,9 @@ mod tests { // Create the table so that there is a known ID that must be returned. 
let _want_id = namespace.create_table("bananas").await.table.id; - // Create the validator whse retention period is 1 hour - let handler = - RetentionValidator::new(catalog.catalog(), Arc::new(MemoryNamespaceCache::default())); + // Create the validator whose retention period is 1 hour + let cache = setup_test_cache(catalog); + let handler = RetentionValidator::new(cache); // Make time now to be inside the retention period let now = SystemProvider::default() @@ -182,8 +157,8 @@ mod tests { let _want_id = namespace.create_table("bananas").await.table.id; // Create the validator whose retention period is 1 hour - let handler = - RetentionValidator::new(catalog.catalog(), Arc::new(MemoryNamespaceCache::default())); + let cache = setup_test_cache(catalog); + let handler = RetentionValidator::new(cache); // Make time outside the retention period let two_hours_ago = (SystemProvider::default().now().timestamp_nanos() @@ -209,9 +184,9 @@ mod tests { // Create the table so that there is a known ID that must be returned. 
let _want_id = namespace.create_table("bananas").await.table.id; - // Create the validator whse retention period is 1 hour - let handler = - RetentionValidator::new(catalog.catalog(), Arc::new(MemoryNamespaceCache::default())); + // Create the validator whose retention period is 1 hour + let cache = setup_test_cache(catalog); + let handler = RetentionValidator::new(cache); // Make time now to be inside the retention period let now = SystemProvider::default() @@ -246,8 +221,8 @@ mod tests { let _want_id = namespace.create_table("bananas").await.table.id; // Create the validator whse retention period is 1 hour - let handler = - RetentionValidator::new(catalog.catalog(), Arc::new(MemoryNamespaceCache::default())); + let cache = setup_test_cache(catalog); + let handler = RetentionValidator::new(cache); // Make time now to be inside the retention period let now = SystemProvider::default() diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs index 41b72603f4..86736a7692 100644 --- a/router/src/dml_handlers/schema_validation.rs +++ b/router/src/dml_handlers/schema_validation.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use data_types::{DeletePredicate, NamespaceId, NamespaceName, NamespaceSchema, TableId}; use hashbrown::HashMap; use iox_catalog::{ - interface::{get_schema_by_name, Catalog, Error as CatalogError, SoftDeletedRows}, + interface::{Catalog, Error as CatalogError}, validate_or_insert_schema, }; use metric::U64Counter; @@ -14,7 +14,7 @@ use thiserror::Error; use trace::ctx::SpanContext; use super::DmlHandler; -use crate::namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache}; +use crate::namespace_cache::NamespaceCache; /// Errors emitted during schema validation. 
#[derive(Debug, Error)] @@ -102,7 +102,7 @@ pub enum SchemaError { /// /// [#3573]: https://github.com/influxdata/influxdb_iox/issues/3573 #[derive(Debug)] -pub struct SchemaValidator>> { +pub struct SchemaValidator { catalog: Arc, cache: C, @@ -113,9 +113,7 @@ pub struct SchemaValidator>> { impl SchemaValidator { /// Initialise a new [`SchemaValidator`] decorator, loading schemas from - /// `catalog`. - /// - /// Schemas are cached in `ns_cache`. + /// `catalog` and the provided `ns_cache`. pub fn new(catalog: Arc, ns_cache: C, metrics: &metric::Registry) -> Self { let service_limit_hit = metrics.register_metric::( "schema_validation_service_limit_reached", @@ -144,7 +142,7 @@ impl SchemaValidator { #[async_trait] impl DmlHandler for SchemaValidator where - C: NamespaceCache, + C: NamespaceCache, // The handler expects the cache to read from the catalog if necessary. { type WriteError = SchemaError; type DeleteError = SchemaError; @@ -176,39 +174,10 @@ where batches: Self::WriteInput, _span_ctx: Option, ) -> Result { - let mut repos = self.catalog.repositories().await; - - // Load the namespace schema from the cache, falling back to pulling it - // from the global catalog (if it exists). - let schema = self.cache.get_schema(namespace); - let schema = match schema { - Some(v) => v, - None => { - // Pull the schema from the global catalog or error if it does - // not exist. - let schema = get_schema_by_name( - namespace, - repos.deref_mut(), - SoftDeletedRows::ExcludeDeleted, - ) - .await - .map_err(|e| { - warn!( - error=%e, - %namespace, - %namespace_id, - "failed to retrieve namespace schema" - ); - SchemaError::NamespaceLookup(e) - }) - .map(Arc::new)?; - - self.cache - .put_schema(namespace.clone(), Arc::clone(&schema)); - - trace!(%namespace, "schema cache populated"); - schema - } + // Try to fetch the namespace schema through the cache. 
+ let schema = match self.cache.get_schema(namespace).await { + Ok(v) => v, + Err(e) => return Err(SchemaError::NamespaceLookup(e)), }; validate_schema_limits(&batches, &schema).map_err(|e| { @@ -249,6 +218,8 @@ where SchemaError::ServiceLimit(Box::new(e)) })?; + let mut repos = self.catalog.repositories().await; + let maybe_new_schema = validate_or_insert_schema( batches.iter().map(|(k, v)| (k.as_str(), v)), &schema, @@ -495,6 +466,7 @@ mod tests { use once_cell::sync::Lazy; use super::*; + use crate::namespace_cache::{MemoryNamespaceCache, ReadThroughCache}; static NAMESPACE: Lazy> = Lazy::new(|| "bananas".try_into().unwrap()); @@ -597,12 +569,12 @@ mod tests { // Make two schema validator instances each with their own cache let handler1 = SchemaValidator::new( catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), + setup_test_cache(&catalog), &catalog.metric_registry, ); let handler2 = SchemaValidator::new( catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), + setup_test_cache(&catalog), &catalog.metric_registry, ); @@ -785,7 +757,14 @@ mod tests { (catalog, namespace) } - fn assert_cache(handler: &SchemaValidator, table: &str, col: &str, want: ColumnType) + fn setup_test_cache(catalog: &TestCatalog) -> Arc>> { + Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + catalog.catalog(), + )) + } + + async fn assert_cache(handler: &SchemaValidator, table: &str, col: &str, want: ColumnType) where C: NamespaceCache, { @@ -793,6 +772,7 @@ mod tests { let ns = handler .cache .get_schema(&NAMESPACE) + .await .expect("cache should be populated"); let table = ns.tables.get(table).expect("table should exist in cache"); assert_eq!( @@ -813,11 +793,7 @@ mod tests { let want_id = namespace.create_table("bananas").await.table.id; let metrics = Arc::new(metric::Registry::default()); - let handler = SchemaValidator::new( - catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), - &metrics, - ); + let handler = 
SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics); let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); let got = handler @@ -826,10 +802,10 @@ mod tests { .expect("request should succeed"); // The cache should be populated. - assert_cache(&handler, "bananas", "tag1", ColumnType::Tag); - assert_cache(&handler, "bananas", "tag2", ColumnType::Tag); - assert_cache(&handler, "bananas", "val", ColumnType::I64); - assert_cache(&handler, "bananas", "time", ColumnType::Time); + assert_cache(&handler, "bananas", "tag1", ColumnType::Tag).await; + assert_cache(&handler, "bananas", "tag2", ColumnType::Tag).await; + assert_cache(&handler, "bananas", "val", ColumnType::I64).await; + assert_cache(&handler, "bananas", "time", ColumnType::Time).await; // Validate the table ID mapping. let (name, _data) = got.get(&want_id).expect("table not in output"); @@ -840,11 +816,7 @@ mod tests { async fn test_write_schema_not_found() { let (catalog, _namespace) = test_setup().await; let metrics = Arc::new(metric::Registry::default()); - let handler = SchemaValidator::new( - catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), - &metrics, - ); + let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics); let ns = NamespaceName::try_from("A_DIFFERENT_NAMESPACE").unwrap(); @@ -857,18 +829,14 @@ mod tests { assert_matches!(err, SchemaError::NamespaceLookup(_)); // The cache should not have retained the schema. 
- assert!(handler.cache.get_schema(&ns).is_none()); + assert!(handler.cache.get_schema(&ns).await.is_err()); } #[tokio::test] async fn test_write_validation_failure() { let (catalog, _namespace) = test_setup().await; let metrics = Arc::new(metric::Registry::default()); - let handler = SchemaValidator::new( - catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), - &metrics, - ); + let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics); // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); // val=i64 @@ -890,10 +858,10 @@ mod tests { }); // The cache should retain the original schema. - assert_cache(&handler, "bananas", "tag1", ColumnType::Tag); - assert_cache(&handler, "bananas", "tag2", ColumnType::Tag); - assert_cache(&handler, "bananas", "val", ColumnType::I64); // original type - assert_cache(&handler, "bananas", "time", ColumnType::Time); + assert_cache(&handler, "bananas", "tag1", ColumnType::Tag).await; + assert_cache(&handler, "bananas", "tag2", ColumnType::Tag).await; + assert_cache(&handler, "bananas", "val", ColumnType::I64).await; // original type + assert_cache(&handler, "bananas", "time", ColumnType::Time).await; assert_eq!(1, handler.schema_conflict.fetch()); } @@ -902,11 +870,7 @@ mod tests { async fn test_write_table_service_limit() { let (catalog, _namespace) = test_setup().await; let metrics = Arc::new(metric::Registry::default()); - let handler = SchemaValidator::new( - catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), - &metrics, - ); + let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics); // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); @@ -941,11 +905,7 @@ mod tests { async fn test_write_column_service_limit() { let (catalog, namespace) = test_setup().await; let metrics = Arc::new(metric::Registry::default()); - let handler = SchemaValidator::new( - 
catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), - &metrics, - ); + let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics); // First write sets the schema let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); @@ -957,11 +917,7 @@ mod tests { // Configure the service limit to be hit next request namespace.update_column_limit(1).await; - let handler = SchemaValidator::new( - catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), - &metrics, - ); + let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics); // Second write attempts to violate limits, causing an error let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i,val2=42i 123456"); @@ -978,11 +934,7 @@ mod tests { async fn test_first_write_many_columns_service_limit() { let (catalog, _namespace) = test_setup().await; let metrics = Arc::new(metric::Registry::default()); - let handler = SchemaValidator::new( - catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), - &metrics, - ); + let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics); // Configure the service limit to be hit next request catalog @@ -1012,11 +964,7 @@ mod tests { let (catalog, _namespace) = test_setup().await; let metrics = Arc::new(metric::Registry::default()); - let handler = SchemaValidator::new( - catalog.catalog(), - Arc::new(MemoryNamespaceCache::default()), - &metrics, - ); + let handler = SchemaValidator::new(catalog.catalog(), setup_test_cache(&catalog), &metrics); let predicate = DeletePredicate { range: TimestampRange::new(1, 2), @@ -1031,6 +979,6 @@ mod tests { .expect("request should succeed"); // Deletes have no effect on the cache. 
- assert!(handler.cache.get_schema(&ns).is_none()); + assert!(handler.cache.get_schema(&ns).await.is_err()); } } diff --git a/router/src/namespace_cache.rs b/router/src/namespace_cache.rs index bcee97db63..ec6389856f 100644 --- a/router/src/namespace_cache.rs +++ b/router/src/namespace_cache.rs @@ -8,14 +8,27 @@ pub use sharded_cache::*; pub mod metrics; -use std::{fmt::Debug, sync::Arc}; +mod read_through_cache; +pub use read_through_cache::*; +use std::{error::Error, fmt::Debug, sync::Arc}; + +use async_trait::async_trait; use data_types::{NamespaceName, NamespaceSchema}; /// An abstract cache of [`NamespaceSchema`]. +#[async_trait] pub trait NamespaceCache: Debug + Send + Sync { + /// The type of error a [`NamespaceCache`] implementation produces + /// when unable to read the [`NamespaceSchema`] requested from the + /// cache. + type ReadError: Error + Send; + /// Return the [`NamespaceSchema`] for `namespace`. - fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option>; + async fn get_schema( + &self, + namespace: &NamespaceName<'static>, + ) -> Result, Self::ReadError>; /// Place `schema` in the cache, unconditionally overwriting any existing /// [`NamespaceSchema`] mapped to `namespace`, returning diff --git a/router/src/namespace_cache/memory.rs b/router/src/namespace_cache/memory.rs index 1baeed6a4d..8b73f2a1d3 100644 --- a/router/src/namespace_cache/memory.rs +++ b/router/src/namespace_cache/memory.rs @@ -1,11 +1,20 @@ use std::sync::Arc; +use async_trait::async_trait; use data_types::{NamespaceName, NamespaceSchema}; use hashbrown::HashMap; use parking_lot::RwLock; +use thiserror::Error; use super::NamespaceCache; +/// An error type indicating that `namespace` is not present in the cache. +#[derive(Debug, Error)] +#[error("namespace {namespace} not found in cache")] +pub struct CacheMissErr { + pub(super) namespace: NamespaceName<'static>, +} + /// An in-memory cache of [`NamespaceSchema`] backed by a hashmap protected with /// a read-write mutex. 
#[derive(Debug, Default)] @@ -13,9 +22,21 @@ pub struct MemoryNamespaceCache { cache: RwLock, Arc>>, } +#[async_trait] impl NamespaceCache for Arc { - fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option> { - self.cache.read().get(namespace).map(Arc::clone) + type ReadError = CacheMissErr; + + async fn get_schema( + &self, + namespace: &NamespaceName<'static>, + ) -> Result, Self::ReadError> { + self.cache + .read() + .get(namespace) + .ok_or(CacheMissErr { + namespace: namespace.clone(), + }) + .map(Arc::clone) } fn put_schema( @@ -29,16 +50,22 @@ impl NamespaceCache for Arc { #[cfg(test)] mod tests { + use assert_matches::assert_matches; use data_types::{NamespaceId, QueryPoolId, TopicId}; use super::*; - #[test] - fn test_put_get() { + #[tokio::test] + async fn test_put_get() { let ns = NamespaceName::new("test").expect("namespace name is valid"); let cache = Arc::new(MemoryNamespaceCache::default()); - assert!(cache.get_schema(&ns).is_none()); + assert_matches!( + cache.get_schema(&ns).await, + Err(CacheMissErr { namespace: got_ns }) => { + assert_eq!(got_ns, ns); + } + ); let schema1 = NamespaceSchema { id: NamespaceId::new(42), @@ -50,7 +77,10 @@ mod tests { retention_period_ns: Some(876), }; assert!(cache.put_schema(ns.clone(), schema1.clone()).is_none()); - assert_eq!(*cache.get_schema(&ns).expect("lookup failure"), schema1); + assert_eq!( + *cache.get_schema(&ns).await.expect("lookup failure"), + schema1 + ); let schema2 = NamespaceSchema { id: NamespaceId::new(2), @@ -68,6 +98,9 @@ mod tests { .expect("should have existing schema"), schema1 ); - assert_eq!(*cache.get_schema(&ns).expect("lookup failure"), schema2); + assert_eq!( + *cache.get_schema(&ns).await.expect("lookup failure"), + schema2 + ); } } diff --git a/router/src/namespace_cache/metrics.rs b/router/src/namespace_cache/metrics.rs index 015a8a34ec..5df5d1752d 100644 --- a/router/src/namespace_cache/metrics.rs +++ b/router/src/namespace_cache/metrics.rs @@ -2,6 +2,7 @@ use 
std::sync::Arc; +use async_trait::async_trait; use data_types::{NamespaceName, NamespaceSchema}; use iox_time::{SystemProvider, TimeProvider}; use metric::{DurationHistogram, Metric, U64Gauge}; @@ -69,21 +70,27 @@ impl InstrumentedCache { } } +#[async_trait] impl NamespaceCache for Arc> where T: NamespaceCache, P: TimeProvider, { - fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option> { + type ReadError = T::ReadError; + + async fn get_schema( + &self, + namespace: &NamespaceName<'static>, + ) -> Result, Self::ReadError> { let t = self.time_provider.now(); - let res = self.inner.get_schema(namespace); + let res = self.inner.get_schema(namespace).await; // Avoid exploding if time goes backwards - simply drop the measurement // if it happens. if let Some(delta) = self.time_provider.now().checked_duration_since(t) { match &res { - Some(_) => self.get_hit.record(delta), - None => self.get_miss.record(delta), + Ok(_) => self.get_hit.record(delta), + Err(_) => self.get_miss.record(delta), }; } @@ -224,8 +231,8 @@ mod tests { ); } - #[test] - fn test_put() { + #[tokio::test] + async fn test_put() { let ns = NamespaceName::new("test").expect("namespace name is valid"); let registry = metric::Registry::default(); let cache = Arc::new(MemoryNamespaceCache::default()); @@ -376,7 +383,7 @@ mod tests { assert_eq!(cache.table_count.observe(), Observation::U64Gauge(5)); assert_eq!(cache.column_count.observe(), Observation::U64Gauge(42)); - let _got = cache.get_schema(&ns).expect("should exist"); + let _got = cache.get_schema(&ns).await.expect("should exist"); assert_histogram_hit( ®istry, "namespace_cache_get_duration", diff --git a/router/src/namespace_cache/read_through_cache.rs b/router/src/namespace_cache/read_through_cache.rs new file mode 100644 index 0000000000..697ab93690 --- /dev/null +++ b/router/src/namespace_cache/read_through_cache.rs @@ -0,0 +1,183 @@ +//! 
Read-through caching behaviour for a [`NamespaceCache`] implementation + +use std::{ops::DerefMut, sync::Arc}; + +use async_trait::async_trait; +use data_types::{NamespaceName, NamespaceSchema}; +use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows}; +use observability_deps::tracing::*; + +use super::memory::CacheMissErr; +use super::NamespaceCache; + +/// A [`ReadThroughCache`] decorates a [`NamespaceCache`] with read-through +/// caching behaviour on calls to `self.get_schema()` when contained in an +/// [`Arc`], resolving cache misses with the provided [`Catalog`]. +/// +/// Filters out all soft-deleted namespaces when resolving. +/// +/// No attempt to serialise cache misses for a particular namespace is made - +/// `N` concurrent calls for a missing namespace will cause `N` concurrent +/// catalog queries, and `N` [`NamespaceSchema`] instances replacing each other +/// in the cache before converging on a single instance (last resolved wins). +/// Subsequent queries will return the currently cached instance. +#[derive(Debug)] +pub struct ReadThroughCache { + inner_cache: T, + catalog: Arc, +} + +impl ReadThroughCache { + /// Decorates `inner_cache` with read-through caching behaviour, looking + /// up schema from `catalog` when not present in the underlying cache. + pub fn new(inner_cache: T, catalog: Arc) -> Self { + Self { + inner_cache, + catalog, + } + } +} + +#[async_trait] +impl NamespaceCache for Arc> +where + T: NamespaceCache, +{ + type ReadError = iox_catalog::interface::Error; + /// Fetch the schema for `namespace` directly from the inner cache if + /// present, pulling from the catalog if not. + async fn get_schema( + &self, + namespace: &NamespaceName<'static>, + ) -> Result, Self::ReadError> { + match self.inner_cache.get_schema(namespace).await { + Ok(v) => Ok(v), + Err(CacheMissErr { + namespace: cache_ns, + }) => { + // Invariant: the cache should not return misses for a different + // namespace name. 
+ assert_eq!(cache_ns, *namespace); + let mut repos = self.catalog.repositories().await; + + let schema = match get_schema_by_name( + namespace, + repos.deref_mut(), + SoftDeletedRows::ExcludeDeleted, + ) + .await + { + Ok(v) => Arc::new(v), + Err(e) => { + warn!( + error = %e, + %namespace, + "failed to retrieve namespace schema" + ); + return Err(e); + } + }; + + self.put_schema(namespace.clone(), Arc::clone(&schema)); + + trace!(%namespace, "schema cache populated"); + Ok(schema) + } + } + } + + fn put_schema( + &self, + namespace: NamespaceName<'static>, + schema: impl Into>, + ) -> Option> { + self.inner_cache.put_schema(namespace, schema) + } +} + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use data_types::{NamespaceId, QueryPoolId, TopicId}; + use iox_catalog::mem::MemCatalog; + + use super::*; + use crate::namespace_cache::memory::MemoryNamespaceCache; + + #[tokio::test] + async fn test_put_get() { + let ns = NamespaceName::try_from("arán").expect("namespace name should be valid"); + + let inner = Arc::new(MemoryNamespaceCache::default()); + let metrics = Arc::new(metric::Registry::new()); + let catalog = Arc::new(MemCatalog::new(metrics)); + + let cache = Arc::new(ReadThroughCache::new(inner, catalog)); + + // Pre-condition: Namespace not in cache or catalog. 
+ assert_matches!(cache.get_schema(&ns).await, Err(_)); + + // Place a schema in the cache for that name + let schema1 = NamespaceSchema::new( + NamespaceId::new(1), + TopicId::new(2), + QueryPoolId::new(3), + iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, + iox_catalog::DEFAULT_MAX_TABLES, + iox_catalog::DEFAULT_RETENTION_PERIOD, + ); + assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), None); + + // Ensure it is present + assert_eq!( + *cache + .get_schema(&ns) + .await + .expect("schema should be present in cache"), + schema1 + ); + } + + #[tokio::test] + async fn test_get_cache_miss_catalog_fetch_ok() { + let ns = NamespaceName::try_from("arán").expect("namespace name should be valid"); + + let inner = Arc::new(MemoryNamespaceCache::default()); + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + + let cache = Arc::new(ReadThroughCache::new(inner, Arc::clone(&catalog))); + + // Pre-condition: Namespace not in cache or catalog. + assert_matches!(cache.get_schema(&ns).await, Err(_)); + + // Place a schema in the catalog for that name + let schema1 = NamespaceSchema::new( + NamespaceId::new(1), + TopicId::new(2), + QueryPoolId::new(3), + iox_catalog::DEFAULT_MAX_COLUMNS_PER_TABLE, + iox_catalog::DEFAULT_MAX_TABLES, + iox_catalog::DEFAULT_RETENTION_PERIOD, + ); + assert_matches!( + catalog + .repositories() + .await + .namespaces() + .create( + &ns, + iox_catalog::DEFAULT_RETENTION_PERIOD, + schema1.topic_id, + schema1.query_pool_id, + ) + .await, + Ok(_) + ); + + // Query the cache again, should return the above schema after missing the cache. 
+ assert_matches!(cache.get_schema(&ns).await, Ok(v) => { + assert_eq!(*v, schema1); + }) + } +} diff --git a/router/src/namespace_cache/sharded_cache.rs b/router/src/namespace_cache/sharded_cache.rs index 8125e7aa92..288f9a45b6 100644 --- a/router/src/namespace_cache/sharded_cache.rs +++ b/router/src/namespace_cache/sharded_cache.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use async_trait::async_trait; use data_types::{NamespaceName, NamespaceSchema}; use sharder::JumpHash; @@ -21,12 +22,18 @@ impl ShardedCache { } } +#[async_trait] impl NamespaceCache for Arc> where T: NamespaceCache, { - fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option> { - self.shards.hash(namespace).get_schema(namespace) + type ReadError = T::ReadError; + + async fn get_schema( + &self, + namespace: &NamespaceName<'static>, + ) -> Result, Self::ReadError> { + self.shards.hash(namespace).get_schema(namespace).await } fn put_schema( @@ -40,8 +47,10 @@ where #[cfg(test)] mod tests { + use std::{collections::HashMap, iter}; + use assert_matches::assert_matches; use data_types::{NamespaceId, QueryPoolId, TopicId}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; @@ -70,8 +79,8 @@ mod tests { } } - #[test] - fn test_stable_cache_sharding() { + #[tokio::test] + async fn test_stable_cache_sharding() { // The number of namespaces to test with. const N: usize = 100; @@ -92,7 +101,7 @@ mod tests { // The cache should be empty. 
for name in names.keys() { - assert!(cache.get_schema(name).is_none()); + assert_matches!(cache.get_schema(name).await, Err(_)); } // Populate the cache @@ -104,7 +113,9 @@ mod tests { // The mapping should be stable for (name, id) in names { let want = schema_with_id(id as _); - assert_eq!(cache.get_schema(&name), Some(Arc::new(want))); + assert_matches!(cache.get_schema(&name).await, Ok(got) => { + assert_eq!(got, Arc::new(want)); + }); } } } diff --git a/router/src/namespace_resolver.rs b/router/src/namespace_resolver.rs index ebab033ec6..b0caacb2a9 100644 --- a/router/src/namespace_resolver.rs +++ b/router/src/namespace_resolver.rs @@ -1,11 +1,7 @@ //! An trait to abstract resolving a[`NamespaceName`] to [`NamespaceId`], and a //! collection of composable implementations. - -use std::{ops::DerefMut, sync::Arc}; - use async_trait::async_trait; use data_types::{NamespaceId, NamespaceName}; -use iox_catalog::interface::{get_schema_by_name, Catalog, SoftDeletedRows}; use observability_deps::tracing::*; use thiserror::Error; @@ -37,27 +33,25 @@ pub trait NamespaceResolver: std::fmt::Debug + Send + Sync { ) -> Result; } -/// An implementation of [`NamespaceResolver`] that queries the [`Catalog`] to -/// resolve a [`NamespaceId`], and populates the [`NamespaceCache`] as a side -/// effect. +/// An implementation of [`NamespaceResolver`] that resolves the [`NamespaceId`] +/// for a given name through a [`NamespaceCache`]. #[derive(Debug)] pub struct NamespaceSchemaResolver { - catalog: Arc, cache: C, } impl NamespaceSchemaResolver { - /// Construct a new [`NamespaceSchemaResolver`] that fetches schemas from - /// `catalog` and caches them in `cache`. - pub fn new(catalog: Arc, cache: C) -> Self { - Self { catalog, cache } + /// Construct a new [`NamespaceSchemaResolver`] that resolves namespace IDs + /// using `cache`. 
+ pub fn new(cache: C) -> Self { + Self { cache } } } #[async_trait] impl NamespaceResolver for NamespaceSchemaResolver where - C: NamespaceCache, + C: NamespaceCache, { async fn get_namespace_id( &self, @@ -65,38 +59,9 @@ where ) -> Result { // Load the namespace schema from the cache, falling back to pulling it // from the global catalog (if it exists). - match self.cache.get_schema(namespace) { - Some(v) => Ok(v.id), - None => { - let mut repos = self.catalog.repositories().await; - - // Pull the schema from the global catalog or error if it does - // not exist. - let schema = get_schema_by_name( - namespace, - repos.deref_mut(), - SoftDeletedRows::ExcludeDeleted, - ) - .await - .map_err(|e| { - warn!( - error=%e, - %namespace, - "failed to retrieve namespace schema" - ); - Error::Lookup(e) - }) - .map(Arc::new)?; - - // Cache population MAY race with other threads and lead to - // overwrites, but an entry will always exist once inserted, and - // the schemas will eventually converge. 
- self.cache - .put_schema(namespace.clone(), Arc::clone(&schema)); - - trace!(%namespace, "schema cache populated"); - Ok(schema.id) - } + match self.cache.get_schema(namespace).await { + Ok(v) => Ok(v.id), + Err(e) => return Err(Error::Lookup(e)), } } } @@ -107,17 +72,26 @@ mod tests { use assert_matches::assert_matches; use data_types::{NamespaceId, NamespaceSchema, QueryPoolId, TopicId}; - use iox_catalog::mem::MemCatalog; + use iox_catalog::{ + interface::{Catalog, SoftDeletedRows}, + mem::MemCatalog, + }; use super::*; - use crate::namespace_cache::MemoryNamespaceCache; + use crate::namespace_cache::{MemoryNamespaceCache, ReadThroughCache}; #[tokio::test] async fn test_cache_hit() { let ns = NamespaceName::try_from("bananas").unwrap(); + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + // Prep the cache before the test to cause a hit - let cache = Arc::new(MemoryNamespaceCache::default()); + let cache = Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + Arc::clone(&catalog), + )); cache.put_schema( ns.clone(), NamespaceSchema { @@ -131,10 +105,7 @@ mod tests { }, ); - let metrics = Arc::new(metric::Registry::new()); - let catalog: Arc = Arc::new(MemCatalog::new(metrics)); - - let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache)); // Drive the code under test resolver @@ -142,7 +113,7 @@ mod tests { .await .expect("lookup should succeed"); - assert!(cache.get_schema(&ns).is_some()); + assert!(cache.get_schema(&ns).await.is_ok()); // The cache hit should mean the catalog SHOULD NOT see a create request // for the namespace. 
@@ -162,9 +133,12 @@ mod tests { async fn test_cache_miss() { let ns = NamespaceName::try_from("bananas").unwrap(); - let cache = Arc::new(MemoryNamespaceCache::default()); let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + let cache = Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + Arc::clone(&catalog), + )); // Create the namespace in the catalog { @@ -178,7 +152,7 @@ mod tests { .expect("failed to setup catalog state"); } - let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache)); resolver .get_namespace_id(&ns) @@ -186,16 +160,19 @@ mod tests { .expect("lookup should succeed"); // The cache should be populated as a result of the lookup. - assert!(cache.get_schema(&ns).is_some()); + assert!(cache.get_schema(&ns).await.is_ok()); } #[tokio::test] async fn test_cache_miss_soft_deleted() { let ns = NamespaceName::try_from("bananas").unwrap(); - let cache = Arc::new(MemoryNamespaceCache::default()); let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + let cache = Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + Arc::clone(&catalog), + )); // Create the namespace in the catalog and mark it as deleted { @@ -214,7 +191,7 @@ mod tests { .expect("failed to setup catalog state"); } - let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache)); let err = resolver .get_namespace_id(&ns) @@ -226,18 +203,21 @@ mod tests { ); // The cache should NOT be populated as a result of the lookup. 
- assert!(cache.get_schema(&ns).is_none()); + assert!(cache.get_schema(&ns).await.is_err()); } #[tokio::test] async fn test_cache_miss_does_not_exist() { let ns = NamespaceName::try_from("bananas").unwrap(); - let cache = Arc::new(MemoryNamespaceCache::default()); let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + let cache = Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + Arc::clone(&catalog), + )); - let resolver = NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)); + let resolver = NamespaceSchemaResolver::new(Arc::clone(&cache)); let err = resolver .get_namespace_id(&ns) @@ -245,6 +225,6 @@ mod tests { .expect_err("lookup should error"); assert_matches!(err, Error::Lookup(_)); - assert!(cache.get_schema(&ns).is_none()); + assert!(cache.get_schema(&ns).await.is_err()); } } diff --git a/router/src/namespace_resolver/ns_autocreation.rs b/router/src/namespace_resolver/ns_autocreation.rs index 4ab0e40e15..a0514ee098 100644 --- a/router/src/namespace_resolver/ns_autocreation.rs +++ b/router/src/namespace_resolver/ns_autocreation.rs @@ -79,7 +79,7 @@ impl NamespaceAutocreation { #[async_trait] impl NamespaceResolver for NamespaceAutocreation where - C: NamespaceCache, + C: NamespaceCache, // The resolver relies on the cache for read-through cache behaviour T: NamespaceResolver, { /// Force the creation of `namespace` if it does not already exist in the @@ -88,7 +88,7 @@ where &self, namespace: &NamespaceName<'static>, ) -> Result { - if self.cache.get_schema(namespace).is_none() { + if self.cache.get_schema(namespace).await.is_err() { trace!(%namespace, "namespace not found in cache"); match self.action { @@ -153,7 +153,7 @@ mod tests { use super::*; use crate::{ - namespace_cache::MemoryNamespaceCache, + namespace_cache::{MemoryNamespaceCache, ReadThroughCache}, namespace_resolver::{mock::MockNamespaceResolver, NamespaceSchemaResolver}, }; @@ -166,8 +166,14 @@ 
mod tests { let ns = NamespaceName::try_from("bananas").unwrap(); + let metrics = Arc::new(metric::Registry::new()); + let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + // Prep the cache before the test to cause a hit - let cache = Arc::new(MemoryNamespaceCache::default()); + let cache = Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + Arc::clone(&catalog), + )); cache.put_schema( ns.clone(), NamespaceSchema { @@ -181,9 +187,6 @@ mod tests { }, ); - let metrics = Arc::new(metric::Registry::new()); - let catalog: Arc = Arc::new(MemCatalog::new(metrics)); - let creator = NamespaceAutocreation::new( MockNamespaceResolver::default().with_mapping(ns.clone(), NAMESPACE_ID), cache, @@ -218,10 +221,14 @@ mod tests { async fn test_cache_miss() { let ns = NamespaceName::try_from("bananas").unwrap(); - let cache = Arc::new(MemoryNamespaceCache::default()); let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + let cache = Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + Arc::clone(&catalog), + )); + let creator = NamespaceAutocreation::new( MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)), cache, @@ -266,9 +273,12 @@ mod tests { async fn test_reject() { let ns = NamespaceName::try_from("bananas").unwrap(); - let cache = Arc::new(MemoryNamespaceCache::default()); let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + let cache = Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + Arc::clone(&catalog), + )); let creator = NamespaceAutocreation::new( MockNamespaceResolver::default(), @@ -302,13 +312,16 @@ mod tests { async fn test_reject_exists_in_catalog() { let ns = NamespaceName::try_from("bananas").unwrap(); - let cache = Arc::new(MemoryNamespaceCache::default()); let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = 
Arc::new(MemCatalog::new(metrics)); + let cache = Arc::new(ReadThroughCache::new( + Arc::new(MemoryNamespaceCache::default()), + Arc::clone(&catalog), + )); // First drive the population of the catalog let creator = NamespaceAutocreation::new( - NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)), + NamespaceSchemaResolver::new(Arc::clone(&cache)), Arc::clone(&cache), Arc::clone(&catalog), TopicId::new(42), @@ -323,7 +336,7 @@ mod tests { // Now try in "reject" mode. let creator = NamespaceAutocreation::new( - NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&cache)), + NamespaceSchemaResolver::new(Arc::clone(&cache)), cache, Arc::clone(&catalog), TopicId::new(42), diff --git a/router/tests/common/mod.rs b/router/tests/common/mod.rs index bfaad5c149..0f5f642c52 100644 --- a/router/tests/common/mod.rs +++ b/router/tests/common/mod.rs @@ -14,7 +14,7 @@ use router::{ InstrumentationDecorator, Partitioned, Partitioner, RetentionValidator, RpcWrite, SchemaValidator, WriteSummaryAdapter, }, - namespace_cache::{MemoryNamespaceCache, ShardedCache}, + namespace_cache::{MemoryNamespaceCache, ReadThroughCache, ShardedCache}, namespace_resolver::{MissingNamespaceAction, NamespaceAutocreation, NamespaceSchemaResolver}, server::{ grpc::RpcWriteGrpcDelegate, @@ -118,8 +118,12 @@ type HttpDelegateStack = HttpDelegate< Chain< Chain< Chain< - RetentionValidator>>>, - SchemaValidator>>>, + RetentionValidator< + Arc>>>>, + >, + SchemaValidator< + Arc>>>>, + >, >, Partitioner, >, @@ -132,8 +136,10 @@ type HttpDelegateStack = HttpDelegate< >, >, NamespaceAutocreation< - Arc>>, - NamespaceSchemaResolver>>>, + Arc>>>>, + NamespaceSchemaResolver< + Arc>>>>, + >, >, >; @@ -148,22 +154,23 @@ impl TestContext { let client = Arc::new(MockWriteClient::default()); let rpc_writer = RpcWrite::new([(Arc::clone(&client), "mock client")], None, &metrics); - let ns_cache = Arc::new(ShardedCache::new( - iter::repeat_with(|| 
Arc::new(MemoryNamespaceCache::default())).take(10), + let ns_cache = Arc::new(ReadThroughCache::new( + Arc::new(ShardedCache::new( + iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + )), + Arc::clone(&catalog), )); let schema_validator = SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &metrics); - let retention_validator = - RetentionValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + let retention_validator = RetentionValidator::new(Arc::clone(&ns_cache)); let partitioner = Partitioner::new(PartitionTemplate { parts: vec![TemplatePart::TimeFormat("%Y-%m-%d".to_owned())], }); - let namespace_resolver = - NamespaceSchemaResolver::new(Arc::clone(&catalog), Arc::clone(&ns_cache)); + let namespace_resolver = NamespaceSchemaResolver::new(Arc::clone(&ns_cache)); let namespace_resolver = NamespaceAutocreation::new( namespace_resolver, Arc::clone(&ns_cache),