fix: explicit namespace creation w/ existing ns
Prior to this commit, namespaces that had been created on one router could not be used on another router until the latter was restarted. Effectively, newly created namespaces couldn't be used. After this commit, the catalog is also checked when a cache miss occurs, ensuring the router discovers new, not-yet-cached namespaces.pull/24376/head
parent
105e354299
commit
7eaa8f59b0
|
@ -36,10 +36,10 @@ impl NamespaceResolver for MockNamespaceResolver {
|
|||
&self,
|
||||
namespace: &NamespaceName<'static>,
|
||||
) -> Result<NamespaceId, super::Error> {
|
||||
Ok(*self
|
||||
.map
|
||||
.lock()
|
||||
.get(namespace)
|
||||
.expect("mock namespace resolver does not have ID"))
|
||||
Ok(*self.map.lock().get(namespace).ok_or(super::Error::Lookup(
|
||||
iox_catalog::interface::Error::NamespaceNotFoundByName {
|
||||
name: namespace.to_string(),
|
||||
},
|
||||
))?)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -91,15 +91,27 @@ where
|
|||
if self.cache.get_schema(namespace).is_none() {
|
||||
trace!(%namespace, "namespace not found in cache");
|
||||
|
||||
let mut repos = self.catalog.repositories().await;
|
||||
|
||||
match self.action {
|
||||
MissingNamespaceAction::Reject => {
|
||||
debug!(%namespace, "namespace not in catalog and autocreation disabled");
|
||||
return Err(NamespaceCreationError::Reject(namespace.into()).into());
|
||||
// The namespace is not cached, but may exist in the
|
||||
// catalog. Delegate discovery down to the inner handler,
|
||||
// and map the lookup error to a reject error.
|
||||
match self.inner.get_namespace_id(namespace).await {
|
||||
Ok(v) => return Ok(v),
|
||||
Err(super::Error::Lookup(
|
||||
iox_catalog::interface::Error::NamespaceNotFoundByName { .. },
|
||||
)) => {
|
||||
warn!(%namespace, "namespace not in catalog and auto-creation disabled");
|
||||
return Err(NamespaceCreationError::Reject(namespace.into()).into());
|
||||
}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
MissingNamespaceAction::AutoCreate(retention_period_ns) => {
|
||||
match repos
|
||||
match self
|
||||
.catalog
|
||||
.repositories()
|
||||
.await
|
||||
.namespaces()
|
||||
.create(
|
||||
namespace.as_str(),
|
||||
|
@ -256,7 +268,7 @@ mod tests {
|
|||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||
|
||||
let creator = NamespaceAutocreation::new(
|
||||
MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
|
||||
MockNamespaceResolver::default(),
|
||||
cache,
|
||||
Arc::clone(&catalog),
|
||||
TopicId::new(42),
|
||||
|
@ -276,4 +288,45 @@ mod tests {
|
|||
let mut repos = catalog.repositories().await;
|
||||
assert_matches!(repos.namespaces().get_by_name(ns.as_str()).await, Ok(None));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_reject_exists_in_catalog() {
|
||||
let ns = NamespaceName::try_from("bananas").unwrap();
|
||||
|
||||
let cache = Arc::new(MemoryNamespaceCache::default());
|
||||
let metrics = Arc::new(metric::Registry::new());
|
||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||
|
||||
// First drive the population of the catalog
|
||||
let creator = NamespaceAutocreation::new(
|
||||
MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
|
||||
Arc::clone(&cache),
|
||||
Arc::clone(&catalog),
|
||||
TopicId::new(42),
|
||||
QueryPoolId::new(42),
|
||||
MissingNamespaceAction::AutoCreate(TEST_RETENTION_PERIOD_NS),
|
||||
);
|
||||
|
||||
let created_id = creator
|
||||
.get_namespace_id(&ns)
|
||||
.await
|
||||
.expect("handler should succeed");
|
||||
|
||||
// Now try in "reject" mode.
|
||||
let creator = NamespaceAutocreation::new(
|
||||
MockNamespaceResolver::default().with_mapping(ns.clone(), NamespaceId::new(1)),
|
||||
cache,
|
||||
Arc::clone(&catalog),
|
||||
TopicId::new(42),
|
||||
QueryPoolId::new(42),
|
||||
MissingNamespaceAction::Reject,
|
||||
);
|
||||
|
||||
// It should not autocreate because we specified "rejection" behaviour, above
|
||||
let id = creator
|
||||
.get_namespace_id(&ns)
|
||||
.await
|
||||
.expect("should allow existing namespace from catalog");
|
||||
assert_eq!(created_id, id);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue