Merge pull request #8668 from influxdata/dom/memory-cache-arc

refactor(router): remove unnecessary Arc wrapper in NamespaceCache stack
pull/24376/head
Dom 2023-09-05 14:42:33 +01:00 committed by GitHub
commit 91939decbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 67 additions and 63 deletions

View File

@ -260,12 +260,10 @@ pub async fn create_router_server_type(
// Initialise an instrumented namespace cache to be shared with the schema
// validator, and namespace auto-creator that reports cache hit/miss/update
// metrics.
let ns_cache = Arc::new(InstrumentedCache::new(
Arc::new(ShardedCache::new(
std::iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
)),
let ns_cache = InstrumentedCache::new(
ShardedCache::new(std::iter::repeat_with(MemoryNamespaceCache::default).take(10)),
&metrics,
));
);
// Pre-warm the cache before adding the gossip layer to avoid broadcasting
// the full cache content at startup.
@ -312,6 +310,8 @@ pub async fn create_router_server_type(
// other peers, helping converge them.
let ns_cache = match gossip_config.gossip_bind_address {
Some(bind_addr) => {
let ns_cache = Arc::new(ns_cache);
// Initialise the NamespaceSchemaGossip responsible for applying the
// incoming gossip schema diffs.
let gossip_reader = Arc::new(NamespaceSchemaGossip::new(Arc::clone(&ns_cache)));
@ -513,7 +513,7 @@ mod tests {
drop(repos); // Or it'll deadlock.
let cache = Arc::new(MemoryNamespaceCache::default());
let cache = MemoryNamespaceCache::default();
pre_warm_schema_cache(&cache, &*catalog)
.await
.expect("pre-warming failed");

View File

@ -26,14 +26,14 @@ fn init_ns_cache(
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
iter::repeat_with(MemoryNamespaceCache::default).take(10),
));
let (actor, handle) = AntiEntropyActor::new(Arc::clone(&cache));
rt.spawn(actor.run());
let cache = MerkleTree::new(cache, handle);
let cache = Arc::new(ReadThroughCache::new(cache, Arc::clone(&catalog)));
let cache = ReadThroughCache::new(cache, Arc::clone(&catalog));
for (name, schema) in initial_schema {
cache.put_schema(name, schema);

View File

@ -43,9 +43,7 @@ fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table:
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let ns_cache = Arc::new(ReadThroughCache::new(
Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
)),
ShardedCache::new(iter::repeat_with(MemoryNamespaceCache::default).take(10)),
Arc::clone(&catalog),
));
let validator = SchemaValidator::new(catalog, Arc::clone(&ns_cache), &metrics);

View File

@ -541,13 +541,13 @@ mod tests {
// Table exists and is over the column limit because of the race condition,
{
// Make two schema validator instances each with their own cache
let cache1 = setup_test_cache(&catalog);
let cache1 = Arc::new(setup_test_cache(&catalog));
let handler1 = SchemaValidator::new(
catalog.catalog(),
Arc::clone(&cache1),
&catalog.metric_registry,
);
let cache2 = setup_test_cache(&catalog);
let cache2 = Arc::new(setup_test_cache(&catalog));
let handler2 = SchemaValidator::new(
catalog.catalog(),
Arc::clone(&cache2),
@ -753,11 +753,8 @@ mod tests {
(catalog, namespace)
}
fn setup_test_cache(catalog: &TestCatalog) -> Arc<ReadThroughCache<Arc<MemoryNamespaceCache>>> {
Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
catalog.catalog(),
))
fn setup_test_cache(catalog: &TestCatalog) -> ReadThroughCache<MemoryNamespaceCache> {
ReadThroughCache::new(MemoryNamespaceCache::default(), catalog.catalog())
}
async fn assert_cache<C>(handler: &SchemaValidator<C>, table: &str, col: &str, want: ColumnType)
@ -789,7 +786,7 @@ mod tests {
let want_id = namespace.create_table("bananas").await.table.id;
let metrics = Arc::new(metric::Registry::default());
let cache = setup_test_cache(&catalog);
let cache = Arc::new(setup_test_cache(&catalog));
let handler = SchemaValidator::new(catalog.catalog(), Arc::clone(&cache), &metrics);
let writes = lp_to_writes("bananas,tag1=A,tag2=B val=42i 123456");

View File

@ -156,7 +156,7 @@ mod tests {
#[tokio::test]
async fn test_empty_content_hash_fixture() {
let (actor, handle) = AntiEntropyActor::new(Arc::new(MemoryNamespaceCache::default()));
let (actor, handle) = AntiEntropyActor::new(MemoryNamespaceCache::default());
tokio::spawn(actor.run());
let got = handle.content_hash().await;

View File

@ -242,7 +242,7 @@ mod tests {
async fn [<test_observe_ $name>]() {
let gossip = Arc::new(MockSchemaBroadcast::default());
let observer = SchemaChangeObserver::new(
Arc::new(MemoryNamespaceCache::default()),
MemoryNamespaceCache::default(),
Arc::clone(&gossip)
);

View File

@ -43,6 +43,29 @@ pub trait NamespaceCache: Debug + Send + Sync {
) -> (Arc<NamespaceSchema>, ChangeStats);
}
#[async_trait]
impl<T> NamespaceCache for Arc<T>
where
T: NamespaceCache,
{
type ReadError = T::ReadError;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
T::get_schema(self, namespace).await
}
fn put_schema(
&self,
namespace: NamespaceName<'static>,
schema: NamespaceSchema,
) -> (Arc<NamespaceSchema>, ChangeStats) {
T::put_schema(self, namespace, schema)
}
}
/// Change statistics describing how the cache entry was modified by the
/// associated [`NamespaceCache::put_schema()`] call.
#[derive(Debug, PartialEq, Eq)]

View File

@ -23,7 +23,7 @@ pub struct MemoryNamespaceCache {
}
#[async_trait]
impl NamespaceCache for Arc<MemoryNamespaceCache> {
impl NamespaceCache for MemoryNamespaceCache {
type ReadError = CacheMissErr;
async fn get_schema(
@ -184,7 +184,7 @@ mod tests {
#[tokio::test]
async fn test_put_get() {
let ns = NamespaceName::new("test").expect("namespace name is valid");
let cache = Arc::new(MemoryNamespaceCache::default());
let cache = MemoryNamespaceCache::default();
assert_matches!(
cache.get_schema(&ns).await,
@ -292,7 +292,7 @@ mod tests {
};
// Set up the cache and ensure there are no entries for the namespace.
let cache = Arc::new(MemoryNamespaceCache::default());
let cache = MemoryNamespaceCache::default();
assert_matches!(
cache.get_schema(&ns).await,
Err(CacheMissErr { namespace: got_ns }) => {
@ -396,7 +396,7 @@ mod tests {
};
// Set up the cache and ensure there are no entries for the namespace.
let cache = Arc::new(MemoryNamespaceCache::default());
let cache = MemoryNamespaceCache::default();
assert_matches!(
cache.get_schema(&ns).await,
Err(CacheMissErr { namespace: got_ns }) => {
@ -551,7 +551,7 @@ mod tests {
// Merge the schemas using the cache merge logic.
let name = NamespaceName::try_from("bananas").unwrap();
let cache = Arc::new(MemoryNamespaceCache::default());
let cache = MemoryNamespaceCache::default();
let (got, stats_1) = cache.put_schema(name.clone(), a.clone());
assert_eq!(*got, a); // The new namespace should be unchanged
assert_eq!(stats_1.new_tables, a.tables);

View File

@ -71,7 +71,7 @@ impl<T> InstrumentedCache<T> {
}
#[async_trait]
impl<T, P> NamespaceCache for Arc<InstrumentedCache<T, P>>
impl<T, P> NamespaceCache for InstrumentedCache<T, P>
where
T: NamespaceCache,
P: TimeProvider,
@ -194,8 +194,8 @@ mod tests {
async fn test_put() {
let ns = NamespaceName::new("test").expect("namespace name is valid");
let registry = metric::Registry::default();
let cache = Arc::new(MemoryNamespaceCache::default());
let cache = Arc::new(InstrumentedCache::new(cache, &registry));
let cache = MemoryNamespaceCache::default();
let cache = InstrumentedCache::new(cache, &registry);
// No tables
let schema = new_schema(&[]);

View File

@ -39,7 +39,7 @@ impl<T> ReadThroughCache<T> {
}
#[async_trait]
impl<T> NamespaceCache for Arc<ReadThroughCache<T>>
impl<T> NamespaceCache for ReadThroughCache<T>
where
T: NamespaceCache<ReadError = CacheMissErr>,
{
@ -108,11 +108,11 @@ mod tests {
async fn test_put_get() {
let ns = NamespaceName::try_from("arán").expect("namespace name should be valid");
let inner = Arc::new(MemoryNamespaceCache::default());
let inner = MemoryNamespaceCache::default();
let metrics = Arc::new(metric::Registry::new());
let catalog = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(inner, catalog));
let cache = ReadThroughCache::new(inner, catalog);
// Pre-condition: Namespace not in cache or catalog.
assert_matches!(cache.get_schema(&ns).await, Err(_));
@ -144,11 +144,11 @@ mod tests {
async fn test_get_cache_miss_catalog_fetch_ok() {
let ns = NamespaceName::try_from("arán").expect("namespace name should be valid");
let inner = Arc::new(MemoryNamespaceCache::default());
let inner = MemoryNamespaceCache::default();
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(inner, Arc::clone(&catalog)));
let cache = ReadThroughCache::new(inner, Arc::clone(&catalog));
// Pre-condition: Namespace not in cache or catalog.
assert_matches!(cache.get_schema(&ns).await, Err(_));

View File

@ -23,7 +23,7 @@ impl<T> ShardedCache<T> {
}
#[async_trait]
impl<T> NamespaceCache for Arc<ShardedCache<T>>
impl<T> NamespaceCache for ShardedCache<T>
where
T: NamespaceCache,
{
@ -86,9 +86,8 @@ mod tests {
// The number of shards to hash into.
const SHARDS: usize = 10;
let cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(SHARDS),
));
let cache =
ShardedCache::new(iter::repeat_with(MemoryNamespaceCache::default).take(SHARDS));
// Build a set of namespace -> unique integer to validate the shard
// mapping later.

View File

@ -89,7 +89,7 @@ mod tests {
// Prep the cache before the test to cause a hit
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
MemoryNamespaceCache::default(),
Arc::clone(&catalog),
));
cache.put_schema(
@ -135,7 +135,7 @@ mod tests {
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
MemoryNamespaceCache::default(),
Arc::clone(&catalog),
));
@ -167,7 +167,7 @@ mod tests {
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
MemoryNamespaceCache::default(),
Arc::clone(&catalog),
));
@ -208,7 +208,7 @@ mod tests {
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
MemoryNamespaceCache::default(),
Arc::clone(&catalog),
));

View File

@ -159,10 +159,7 @@ mod tests {
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
// Prep the cache before the test to cause a hit
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
let cache = ReadThroughCache::new(MemoryNamespaceCache::default(), Arc::clone(&catalog));
cache.put_schema(
ns.clone(),
NamespaceSchema {
@ -210,10 +207,7 @@ mod tests {
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
let cache = ReadThroughCache::new(MemoryNamespaceCache::default(), Arc::clone(&catalog));
let creator = NamespaceAutocreation::new(
MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
@ -258,10 +252,7 @@ mod tests {
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
Arc::clone(&catalog),
));
let cache = ReadThroughCache::new(MemoryNamespaceCache::default(), Arc::clone(&catalog));
let creator = NamespaceAutocreation::new(
MockNamespaceResolver::default(),
@ -296,7 +287,7 @@ mod tests {
let metrics = Arc::new(metric::Registry::new());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(MemoryNamespaceCache::default()),
MemoryNamespaceCache::default(),
Arc::clone(&catalog),
));

View File

@ -114,9 +114,7 @@ type HttpDelegateStack = HttpDelegate<
Chain<
Chain<
RetentionValidator,
SchemaValidator<
Arc<ReadThroughCache<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>>,
>,
SchemaValidator<Arc<ReadThroughCache<Arc<ShardedCache<MemoryNamespaceCache>>>>>,
>,
Partitioner,
>,
@ -127,10 +125,8 @@ type HttpDelegateStack = HttpDelegate<
>,
>,
NamespaceAutocreation<
Arc<ReadThroughCache<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>>,
NamespaceSchemaResolver<
Arc<ReadThroughCache<Arc<ShardedCache<Arc<MemoryNamespaceCache>>>>>,
>,
Arc<ReadThroughCache<Arc<ShardedCache<MemoryNamespaceCache>>>>,
NamespaceSchemaResolver<Arc<ReadThroughCache<Arc<ShardedCache<MemoryNamespaceCache>>>>>,
>,
>;
@ -154,7 +150,7 @@ impl TestContext {
let ns_cache = Arc::new(ReadThroughCache::new(
Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
iter::repeat_with(MemoryNamespaceCache::default).take(10),
)),
Arc::clone(&catalog),
));