refactor(router): Use by-ref lookup API for NamespaceCache

Assert namespace ID does not change for the same named cache entry,
as it is an invariant.
pull/24376/head
Fraser Savage 2023-04-18 10:14:56 +01:00
parent b99141f880
commit dae06b4587
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
1 changed files with 33 additions and 43 deletions

View File

@ -42,42 +42,37 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
fn put_schema(
&self,
namespace: NamespaceName<'static>,
mut schema: NamespaceSchema,
schema: NamespaceSchema,
) -> (Option<Arc<NamespaceSchema>>, Arc<NamespaceSchema>) {
let mut guard = self.cache.write();
match guard.get(&namespace) {
Some(old_ns) => {
// If the previous tenant has a different ID then take the new
// schema. The old may have been replaced.
if old_ns.id != schema.id {
let new_ns = Arc::new(schema);
return (guard.insert(namespace, Arc::clone(&new_ns)), new_ns);
}
let merged_schema = match guard.get(&namespace) {
Some(old) => merge_schema(old, schema),
None => schema,
};
for (table_name, new_table) in &mut schema.tables {
let old_columns = match old_ns.tables.get(table_name) {
Some(v) => &v.columns,
None => continue,
};
let ret = Arc::new(merged_schema);
(guard.insert(namespace, Arc::clone(&ret)), ret)
}
}
for (column_name, column) in old_columns {
new_table
.columns
.entry(column_name.clone())
.or_insert(*column);
}
}
fn merge_schema(old_ns: &Arc<NamespaceSchema>, mut new_ns: NamespaceSchema) -> NamespaceSchema {
// invariant: Namespace ID should never change for a given name
assert_eq!(old_ns.id, new_ns.id);
let ret = Arc::new(schema);
(guard.insert(namespace, Arc::clone(&ret)), ret)
}
None => {
let new_ns = Arc::new(schema);
(guard.insert(namespace, Arc::clone(&new_ns)), new_ns)
for (table_name, new_table) in &mut new_ns.tables {
let old_columns = match old_ns.tables.get(table_name) {
Some(v) => &v.columns,
None => continue,
};
for (column_name, column) in old_columns {
if !new_table.columns.contains_key(column_name) {
new_table.columns.insert(column_name.to_owned(), *column);
}
}
}
new_ns
}
#[cfg(test)]
@ -91,6 +86,8 @@ mod tests {
use super::*;
const TEST_NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
#[tokio::test]
async fn test_put_get() {
let ns = NamespaceName::new("test").expect("namespace name is valid");
@ -104,7 +101,7 @@ mod tests {
);
let schema1 = NamespaceSchema {
id: NamespaceId::new(42),
id: TEST_NAMESPACE_ID,
topic_id: TopicId::new(24),
query_pool_id: QueryPoolId::new(1234),
tables: Default::default(),
@ -119,7 +116,7 @@ mod tests {
);
let schema2 = NamespaceSchema {
id: NamespaceId::new(2),
id: TEST_NAMESPACE_ID,
topic_id: TopicId::new(2),
query_pool_id: QueryPoolId::new(2),
tables: Default::default(),
@ -142,7 +139,7 @@ mod tests {
}
#[tokio::test]
async fn test_put_racy_additive_merge() {
async fn test_put_additive_merge() {
let ns = NamespaceName::new("arán").expect("namespace name is valid");
let table_name = "arán";
let table_id = TableId::new(1);
@ -202,19 +199,12 @@ mod tests {
}
);
let cache_clone = Arc::clone(&cache);
let ns_clone = ns.clone();
let final_schema_clone = want_namespace_schema.clone();
tokio::task::spawn(async move {
assert_matches!(cache_clone.put_schema(ns_clone.clone(), schema_update_1.clone()), (None, new_schema) => {
assert_eq!(*new_schema, schema_update_1);
});
assert_matches!(cache_clone.put_schema(ns_clone.clone(), schema_update_2), (Some(_), new_schema) => {
assert_eq!(*new_schema, final_schema_clone);
});
})
.await
.unwrap();
assert_matches!(cache.put_schema(ns.clone(), schema_update_1.clone()), (None, new_schema) => {
assert_eq!(*new_schema, schema_update_1);
});
assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (Some(_), new_schema) => {
assert_eq!(*new_schema, want_namespace_schema);
});
let got_namespace_schema = cache
.get_schema(&ns)