refactor(router): Return Result from NamespaceCache, use GAT for Error

This commit refactors the NamespaceCache trait to return a result
instead of an option for calls to `get_schema()`, allowing callers and
decorators to differentiate between cache misses, namespaces not
existing and transient I/O errors. This allows implementations to
interact with backend catalog storage.
pull/24376/head
Fraser Savage 2023-04-06 10:37:37 +01:00
parent 082e8db9ef
commit 0bb88dcd4f
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
8 changed files with 71 additions and 26 deletions

View File

@ -77,10 +77,9 @@ where
// Load the namespace schema from the cache, falling back to pulling it
// from the global catalog (if it exists).
let schema = self.cache.get_schema(namespace).await;
let schema = match schema {
Some(v) => v,
None => {
let schema = match self.cache.get_schema(namespace).await {
Ok(v) => v,
Err(_) => {
// Pull the schema from the global catalog or error if it does
// not exist.
let schema = get_schema_by_name(

View File

@ -182,8 +182,8 @@ where
// from the global catalog (if it exists).
let schema = self.cache.get_schema(namespace).await;
let schema = match schema {
Some(v) => v,
None => {
Ok(v) => v,
Err(_) => {
// Pull the schema from the global catalog or error if it does
// not exist.
let schema = get_schema_by_name(
@ -858,7 +858,7 @@ mod tests {
assert_matches!(err, SchemaError::NamespaceLookup(_));
// The cache should not have retained the schema.
assert!(handler.cache.get_schema(&ns).await.is_none());
assert!(handler.cache.get_schema(&ns).await.is_err());
}
#[tokio::test]
@ -1032,6 +1032,6 @@ mod tests {
.expect("request should succeed");
// Deletes have no effect on the cache.
assert!(handler.cache.get_schema(&ns).await.is_none());
assert!(handler.cache.get_schema(&ns).await.is_err());
}
}

View File

@ -8,7 +8,7 @@ pub use sharded_cache::*;
pub mod metrics;
use std::{fmt::Debug, sync::Arc};
use std::{error::Error, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
@ -16,8 +16,16 @@ use data_types::{NamespaceName, NamespaceSchema};
/// An abstract cache of [`NamespaceSchema`].
#[async_trait]
pub trait NamespaceCache: Debug + Send + Sync {
/// The type of error a [`NamespaceCache`] implementation produces
/// when unable to read the [`NamespaceSchema`] requested from the
/// cache.
type ReadError: Error + Send;
/// Return the [`NamespaceSchema`] for `namespace`.
async fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option<Arc<NamespaceSchema>>;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError>;
/// Place `schema` in the cache, unconditionally overwriting any existing
/// [`NamespaceSchema`] mapped to `namespace`, returning

View File

@ -4,9 +4,17 @@ use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use hashbrown::HashMap;
use parking_lot::RwLock;
use thiserror::Error;
use super::NamespaceCache;
/// An error type indicating that `namespace` is not present in the cache.
#[derive(Debug, Error)]
#[error("namespace {namespace} not found in cache")]
pub struct CacheMissErr {
namespace: NamespaceName<'static>,
}
/// An in-memory cache of [`NamespaceSchema`] backed by a hashmap protected with
/// a read-write mutex.
#[derive(Debug, Default)]
@ -16,8 +24,18 @@ pub struct MemoryNamespaceCache {
#[async_trait]
impl NamespaceCache for Arc<MemoryNamespaceCache> {
async fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option<Arc<NamespaceSchema>> {
self.cache.read().get(namespace).map(Arc::clone)
type ReadError = CacheMissErr;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
match self.cache.read().get(namespace) {
Some(s) => Ok(Arc::clone(s)),
None => Err(CacheMissErr {
namespace: namespace.clone(),
}),
}
}
fn put_schema(
@ -31,6 +49,7 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use data_types::{NamespaceId, QueryPoolId, TopicId};
use super::*;
@ -40,7 +59,12 @@ mod tests {
let ns = NamespaceName::new("test").expect("namespace name is valid");
let cache = Arc::new(MemoryNamespaceCache::default());
assert!(cache.get_schema(&ns).await.is_none());
assert_matches!(
cache.get_schema(&ns).await,
Err(CacheMissErr { namespace: got_ns }) => {
assert_eq!(got_ns, ns);
}
);
let schema1 = NamespaceSchema {
id: NamespaceId::new(42),

View File

@ -76,7 +76,12 @@ where
T: NamespaceCache,
P: TimeProvider,
{
async fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option<Arc<NamespaceSchema>> {
type ReadError = T::ReadError;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
let t = self.time_provider.now();
let res = self.inner.get_schema(namespace).await;
@ -84,8 +89,8 @@ where
// if it happens.
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Some(_) => self.get_hit.record(delta),
None => self.get_miss.record(delta),
Ok(_) => self.get_hit.record(delta),
Err(_) => self.get_miss.record(delta),
};
}

View File

@ -27,7 +27,12 @@ impl<T> NamespaceCache for Arc<ShardedCache<T>>
where
T: NamespaceCache,
{
async fn get_schema(&self, namespace: &NamespaceName<'_>) -> Option<Arc<NamespaceSchema>> {
type ReadError = T::ReadError;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
self.shards.hash(namespace).get_schema(namespace).await
}
@ -42,6 +47,8 @@ where
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use std::{collections::HashMap, iter};
use data_types::{NamespaceId, QueryPoolId, TopicId};
@ -94,7 +101,7 @@ mod tests {
// The cache should be empty.
for name in names.keys() {
assert!(cache.get_schema(name).await.is_none());
assert_matches!(cache.get_schema(name).await, Err(_));
}
// Populate the cache
@ -106,7 +113,9 @@ mod tests {
// The mapping should be stable
for (name, id) in names {
let want = schema_with_id(id as _);
assert_eq!(cache.get_schema(&name).await, Some(Arc::new(want)));
assert_matches!(cache.get_schema(&name).await, Ok(got) => {
assert_eq!(got, Arc::new(want));
});
}
}
}

View File

@ -66,8 +66,8 @@ where
// Load the namespace schema from the cache, falling back to pulling it
// from the global catalog (if it exists).
match self.cache.get_schema(namespace).await {
Some(v) => Ok(v.id),
None => {
Ok(v) => Ok(v.id),
Err(_) => {
let mut repos = self.catalog.repositories().await;
// Pull the schema from the global catalog or error if it does
@ -142,7 +142,7 @@ mod tests {
.await
.expect("lookup should succeed");
assert!(cache.get_schema(&ns).await.is_some());
assert!(cache.get_schema(&ns).await.is_ok());
// The cache hit should mean the catalog SHOULD NOT see a create request
// for the namespace.
@ -186,7 +186,7 @@ mod tests {
.expect("lookup should succeed");
// The cache should be populated as a result of the lookup.
assert!(cache.get_schema(&ns).await.is_some());
assert!(cache.get_schema(&ns).await.is_ok());
}
#[tokio::test]
@ -226,7 +226,7 @@ mod tests {
);
// The cache should NOT be populated as a result of the lookup.
assert!(cache.get_schema(&ns).await.is_none());
assert!(cache.get_schema(&ns).await.is_err());
}
#[tokio::test]
@ -245,6 +245,6 @@ mod tests {
.expect_err("lookup should error");
assert_matches!(err, Error::Lookup(_));
assert!(cache.get_schema(&ns).await.is_none());
assert!(cache.get_schema(&ns).await.is_err());
}
}

View File

@ -88,7 +88,7 @@ where
&self,
namespace: &NamespaceName<'static>,
) -> Result<NamespaceId, super::Error> {
if self.cache.get_schema(namespace).await.is_none() {
if self.cache.get_schema(namespace).await.is_err() {
trace!(%namespace, "namespace not found in cache");
match self.action {