diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index 7951372234..22e78264c0 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -260,12 +260,10 @@ pub async fn create_router_server_type( // Initialise an instrumented namespace cache to be shared with the schema // validator, and namespace auto-creator that reports cache hit/miss/update // metrics. - let ns_cache = Arc::new(InstrumentedCache::new( - Arc::new(ShardedCache::new( - std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), - )), + let ns_cache = InstrumentedCache::new( + ShardedCache::new(std::iter::repeat_with(MemoryNamespaceCache::default).take(10)), &metrics, - )); + ); // Pre-warm the cache before adding the gossip layer to avoid broadcasting // the full cache content at startup. @@ -312,6 +310,8 @@ pub async fn create_router_server_type( // other peers, helping converge them. let ns_cache = match gossip_config.gossip_bind_address { Some(bind_addr) => { + let ns_cache = Arc::new(ns_cache); + // Initialise the NamespaceSchemaGossip responsible for applying the // incoming gossip schema diffs. let gossip_reader = Arc::new(NamespaceSchemaGossip::new(Arc::clone(&ns_cache))); @@ -513,7 +513,7 @@ mod tests { drop(repos); // Or it'll deadlock. - let cache = Arc::new(MemoryNamespaceCache::default()); + let cache = MemoryNamespaceCache::default(); pre_warm_schema_cache(&cache, &*catalog) .await .expect("pre-warming failed"); diff --git a/router/benches/namespace_schema_cache.rs b/router/benches/namespace_schema_cache.rs index dc8fb215e9..f4be57c003 100644 --- a/router/benches/namespace_schema_cache.rs +++ b/router/benches/namespace_schema_cache.rs @@ -26,14 +26,14 @@ fn init_ns_cache( let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); let cache = Arc::new(ShardedCache::new( - iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + iter::repeat_with(MemoryNamespaceCache::default).take(10), )); let (actor, handle) = AntiEntropyActor::new(Arc::clone(&cache)); rt.spawn(actor.run()); let cache = MerkleTree::new(cache, handle); - let cache = Arc::new(ReadThroughCache::new(cache, Arc::clone(&catalog))); + let cache = ReadThroughCache::new(cache, Arc::clone(&catalog)); for (name, schema) in initial_schema { cache.put_schema(name, schema); diff --git a/router/benches/schema_validator.rs b/router/benches/schema_validator.rs index c1a402004e..49bbe49fd1 100644 --- a/router/benches/schema_validator.rs +++ b/router/benches/schema_validator.rs @@ -43,9 +43,7 @@ fn bench(group: &mut BenchmarkGroup, tables: usize, columns_per_table: let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); let ns_cache = Arc::new(ReadThroughCache::new( - Arc::new(ShardedCache::new( - iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), - )), + ShardedCache::new(iter::repeat_with(MemoryNamespaceCache::default).take(10)), Arc::clone(&catalog), )); let validator = SchemaValidator::new(catalog, Arc::clone(&ns_cache), &metrics); diff --git a/router/src/dml_handlers/schema_validation.rs b/router/src/dml_handlers/schema_validation.rs index c10b94cacd..8a0655c2f0 100644 --- a/router/src/dml_handlers/schema_validation.rs +++ b/router/src/dml_handlers/schema_validation.rs @@ -541,13 +541,13 @@ mod tests { // Table exists and is over the column limit because of the race condition, { // Make two schema validator instances each with their own cache - let cache1 = setup_test_cache(&catalog); + let cache1 = Arc::new(setup_test_cache(&catalog)); let handler1 = SchemaValidator::new( catalog.catalog(), Arc::clone(&cache1), &catalog.metric_registry, ); - let cache2 = setup_test_cache(&catalog); + let cache2 = Arc::new(setup_test_cache(&catalog)); let handler2 = SchemaValidator::new( catalog.catalog(), Arc::clone(&cache2), @@ -753,11 +753,8 @@ mod tests { (catalog, namespace) } - fn setup_test_cache(catalog: &TestCatalog) -> Arc>> { - Arc::new(ReadThroughCache::new( - Arc::new(MemoryNamespaceCache::default()), - catalog.catalog(), - )) + fn setup_test_cache(catalog: &TestCatalog) -> ReadThroughCache { + ReadThroughCache::new(MemoryNamespaceCache::default(), catalog.catalog()) } async fn assert_cache(handler: &SchemaValidator, table: &str, col: &str, want: ColumnType) @@ -789,7 +786,7 @@ mod tests { let want_id = namespace.create_table("bananas").await.table.id; let metrics = Arc::new(metric::Registry::default()); - let cache = setup_test_cache(&catalog); + let cache = Arc::new(setup_test_cache(&catalog)); let handler = SchemaValidator::new(catalog.catalog(), Arc::clone(&cache), &metrics); let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456"); diff --git a/router/src/gossip/anti_entropy/mst/actor.rs b/router/src/gossip/anti_entropy/mst/actor.rs index 6690193836..8ce2c3cb43 100644 --- a/router/src/gossip/anti_entropy/mst/actor.rs +++ b/router/src/gossip/anti_entropy/mst/actor.rs @@ -156,7 +156,7 @@ mod tests { #[tokio::test] async fn test_empty_content_hash_fixture() { - let (actor, handle) = AntiEntropyActor::new(Arc::new(MemoryNamespaceCache::default())); + let (actor, handle) = AntiEntropyActor::new(MemoryNamespaceCache::default()); tokio::spawn(actor.run()); let got = handle.content_hash().await; diff --git a/router/src/gossip/schema_change_observer.rs b/router/src/gossip/schema_change_observer.rs index f73e2e7eb7..fd4e60ef9b 100644 --- a/router/src/gossip/schema_change_observer.rs +++ b/router/src/gossip/schema_change_observer.rs @@ -242,7 +242,7 @@ mod tests { async fn []() { let gossip = Arc::new(MockSchemaBroadcast::default()); let observer = SchemaChangeObserver::new( - Arc::new(MemoryNamespaceCache::default()), + MemoryNamespaceCache::default(), Arc::clone(&gossip) ); diff --git a/router/src/namespace_cache.rs b/router/src/namespace_cache.rs index ccce4b1a34..ebae54e5d6 100644 --- a/router/src/namespace_cache.rs +++ b/router/src/namespace_cache.rs @@ -43,6 +43,29 @@ pub trait NamespaceCache: Debug + Send + Sync { ) -> (Arc, ChangeStats); } +#[async_trait] +impl NamespaceCache for Arc +where + T: NamespaceCache, +{ + type ReadError = T::ReadError; + + async fn get_schema( + &self, + namespace: &NamespaceName<'static>, + ) -> Result, Self::ReadError> { + T::get_schema(self, namespace).await + } + + fn put_schema( + &self, + namespace: NamespaceName<'static>, + schema: NamespaceSchema, + ) -> (Arc, ChangeStats) { + T::put_schema(self, namespace, schema) + } +} + /// Change statistics describing how the cache entry was modified by the /// associated [`NamespaceCache::put_schema()`] call. #[derive(Debug, PartialEq, Eq)] diff --git a/router/src/namespace_cache/memory.rs b/router/src/namespace_cache/memory.rs index b278d06b21..6fcf6bc134 100644 --- a/router/src/namespace_cache/memory.rs +++ b/router/src/namespace_cache/memory.rs @@ -23,7 +23,7 @@ pub struct MemoryNamespaceCache { } #[async_trait] -impl NamespaceCache for Arc { +impl NamespaceCache for MemoryNamespaceCache { type ReadError = CacheMissErr; async fn get_schema( @@ -184,7 +184,7 @@ mod tests { #[tokio::test] async fn test_put_get() { let ns = NamespaceName::new("test").expect("namespace name is valid"); - let cache = Arc::new(MemoryNamespaceCache::default()); + let cache = MemoryNamespaceCache::default(); assert_matches!( cache.get_schema(&ns).await, @@ -292,7 +292,7 @@ mod tests { }; // Set up the cache and ensure there are no entries for the namespace. - let cache = Arc::new(MemoryNamespaceCache::default()); + let cache = MemoryNamespaceCache::default(); assert_matches!( cache.get_schema(&ns).await, Err(CacheMissErr { namespace: got_ns }) => { @@ -396,7 +396,7 @@ mod tests { }; // Set up the cache and ensure there are no entries for the namespace. - let cache = Arc::new(MemoryNamespaceCache::default()); + let cache = MemoryNamespaceCache::default(); assert_matches!( cache.get_schema(&ns).await, Err(CacheMissErr { namespace: got_ns }) => { @@ -551,7 +551,7 @@ mod tests { // Merge the schemas using the cache merge logic. let name = NamespaceName::try_from("bananas").unwrap(); - let cache = Arc::new(MemoryNamespaceCache::default()); + let cache = MemoryNamespaceCache::default(); let (got, stats_1) = cache.put_schema(name.clone(), a.clone()); assert_eq!(*got, a); // The new namespace should be unchanged assert_eq!(stats_1.new_tables, a.tables); diff --git a/router/src/namespace_cache/metrics.rs b/router/src/namespace_cache/metrics.rs index 020dce95bd..b0ad316017 100644 --- a/router/src/namespace_cache/metrics.rs +++ b/router/src/namespace_cache/metrics.rs @@ -71,7 +71,7 @@ impl InstrumentedCache { } #[async_trait] -impl NamespaceCache for Arc> +impl NamespaceCache for InstrumentedCache where T: NamespaceCache, P: TimeProvider, @@ -194,8 +194,8 @@ mod tests { async fn test_put() { let ns = NamespaceName::new("test").expect("namespace name is valid"); let registry = metric::Registry::default(); - let cache = Arc::new(MemoryNamespaceCache::default()); - let cache = Arc::new(InstrumentedCache::new(cache, ®istry)); + let cache = MemoryNamespaceCache::default(); + let cache = InstrumentedCache::new(cache, ®istry); // No tables let schema = new_schema(&[]); diff --git a/router/src/namespace_cache/read_through_cache.rs b/router/src/namespace_cache/read_through_cache.rs index 9d41efa4c1..dc41f993e8 100644 --- a/router/src/namespace_cache/read_through_cache.rs +++ b/router/src/namespace_cache/read_through_cache.rs @@ -39,7 +39,7 @@ impl ReadThroughCache { } #[async_trait] -impl NamespaceCache for Arc> +impl NamespaceCache for ReadThroughCache where T: NamespaceCache, { @@ -108,11 +108,11 @@ mod tests { async fn test_put_get() { let ns = NamespaceName::try_from("arán").expect("namespace name should be valid"); - let inner = Arc::new(MemoryNamespaceCache::default()); + let inner = MemoryNamespaceCache::default(); let metrics = Arc::new(metric::Registry::new()); let catalog = Arc::new(MemCatalog::new(metrics)); - let cache = Arc::new(ReadThroughCache::new(inner, catalog)); + let cache = ReadThroughCache::new(inner, catalog); // Pre-condition: Namespace not in cache or catalog. assert_matches!(cache.get_schema(&ns).await, Err(_)); @@ -144,11 +144,11 @@ mod tests { async fn test_get_cache_miss_catalog_fetch_ok() { let ns = NamespaceName::try_from("arán").expect("namespace name should be valid"); - let inner = Arc::new(MemoryNamespaceCache::default()); + let inner = MemoryNamespaceCache::default(); let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); - let cache = Arc::new(ReadThroughCache::new(inner, Arc::clone(&catalog))); + let cache = ReadThroughCache::new(inner, Arc::clone(&catalog)); // Pre-condition: Namespace not in cache or catalog. assert_matches!(cache.get_schema(&ns).await, Err(_)); diff --git a/router/src/namespace_cache/sharded_cache.rs b/router/src/namespace_cache/sharded_cache.rs index cef698b5ff..d08db6a1ee 100644 --- a/router/src/namespace_cache/sharded_cache.rs +++ b/router/src/namespace_cache/sharded_cache.rs @@ -23,7 +23,7 @@ impl ShardedCache { } #[async_trait] -impl NamespaceCache for Arc> +impl NamespaceCache for ShardedCache where T: NamespaceCache, { @@ -86,9 +86,8 @@ mod tests { // The number of shards to hash into. const SHARDS: usize = 10; - let cache = Arc::new(ShardedCache::new( - iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(SHARDS), - )); + let cache = + ShardedCache::new(iter::repeat_with(MemoryNamespaceCache::default).take(SHARDS)); // Build a set of namespace -> unique integer to validate the shard // mapping later. diff --git a/router/src/namespace_resolver.rs b/router/src/namespace_resolver.rs index 2e8184c390..9bd0a40586 100644 --- a/router/src/namespace_resolver.rs +++ b/router/src/namespace_resolver.rs @@ -89,7 +89,7 @@ mod tests { // Prep the cache before the test to cause a hit let cache = Arc::new(ReadThroughCache::new( - Arc::new(MemoryNamespaceCache::default()), + MemoryNamespaceCache::default(), Arc::clone(&catalog), )); cache.put_schema( @@ -135,7 +135,7 @@ mod tests { let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); let cache = Arc::new(ReadThroughCache::new( - Arc::new(MemoryNamespaceCache::default()), + MemoryNamespaceCache::default(), Arc::clone(&catalog), )); @@ -167,7 +167,7 @@ mod tests { let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); let cache = Arc::new(ReadThroughCache::new( - Arc::new(MemoryNamespaceCache::default()), + MemoryNamespaceCache::default(), Arc::clone(&catalog), )); @@ -208,7 +208,7 @@ mod tests { let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); let cache = Arc::new(ReadThroughCache::new( - Arc::new(MemoryNamespaceCache::default()), + MemoryNamespaceCache::default(), Arc::clone(&catalog), )); diff --git a/router/src/namespace_resolver/ns_autocreation.rs b/router/src/namespace_resolver/ns_autocreation.rs index 11e0d5bf00..7110dbf526 100644 --- a/router/src/namespace_resolver/ns_autocreation.rs +++ b/router/src/namespace_resolver/ns_autocreation.rs @@ -159,10 +159,7 @@ mod tests { let catalog: Arc = Arc::new(MemCatalog::new(metrics)); // Prep the cache before the test to cause a hit - let cache = Arc::new(ReadThroughCache::new( - Arc::new(MemoryNamespaceCache::default()), - Arc::clone(&catalog), - )); + let cache = ReadThroughCache::new(MemoryNamespaceCache::default(), Arc::clone(&catalog)); cache.put_schema( ns.clone(), NamespaceSchema { @@ -210,10 +207,7 @@ mod tests { let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); - let cache = Arc::new(ReadThroughCache::new( - Arc::new(MemoryNamespaceCache::default()), - Arc::clone(&catalog), - )); + let cache = ReadThroughCache::new(MemoryNamespaceCache::default(), Arc::clone(&catalog)); let creator = NamespaceAutocreation::new( MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)), @@ -258,10 +252,7 @@ mod tests { let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); - let cache = Arc::new(ReadThroughCache::new( - Arc::new(MemoryNamespaceCache::default()), - Arc::clone(&catalog), - )); + let cache = ReadThroughCache::new(MemoryNamespaceCache::default(), Arc::clone(&catalog)); let creator = NamespaceAutocreation::new( MockNamespaceResolver::default(), @@ -296,7 +287,7 @@ mod tests { let metrics = Arc::new(metric::Registry::new()); let catalog: Arc = Arc::new(MemCatalog::new(metrics)); let cache = Arc::new(ReadThroughCache::new( - Arc::new(MemoryNamespaceCache::default()), + MemoryNamespaceCache::default(), Arc::clone(&catalog), )); diff --git a/router/tests/common/mod.rs b/router/tests/common/mod.rs index a85a5c3f57..668cb46948 100644 --- a/router/tests/common/mod.rs +++ b/router/tests/common/mod.rs @@ -114,9 +114,7 @@ type HttpDelegateStack = HttpDelegate< Chain< Chain< RetentionValidator, - SchemaValidator< - Arc>>>>, - >, + SchemaValidator>>>>, >, Partitioner, >, @@ -127,10 +125,8 @@ type HttpDelegateStack = HttpDelegate< >, >, NamespaceAutocreation< - Arc>>>>, - NamespaceSchemaResolver< - Arc>>>>, - >, + Arc>>>, + NamespaceSchemaResolver>>>>, >, >; @@ -154,7 +150,7 @@ impl TestContext { let ns_cache = Arc::new(ReadThroughCache::new( Arc::new(ShardedCache::new( - iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + iter::repeat_with(MemoryNamespaceCache::default).take(10), )), Arc::clone(&catalog), ));