perf(router): Perform NamespaceCache schema merge out of lock

This re-introduces the potential racy conflicting schema updates, to
optimise for the expected read-heavy workload. This limits the point at
which write requests may race with schema updates to overlapping calls
to put, rather than the write call-path as a whole.
pull/24376/head
Fraser Savage 2023-04-24 15:33:19 +01:00
parent 065018be11
commit 5a9c68e428
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
2 changed files with 23 additions and 39 deletions

View File

@ -36,6 +36,9 @@ pub trait NamespaceCache: Debug + Send + Sync {
/// All data except the set of tables/columns have "last writer wins"
/// semantics. The resulting merged schema is returned, along with a set
/// of change statistics.
///
/// Concurrent calls to this method will race and may result in a schema
/// change being lost.
fn put_schema(
&self,
namespace: NamespaceName<'static>,

View File

@ -44,9 +44,15 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
namespace: NamespaceName<'static>,
schema: NamespaceSchema,
) -> (Arc<NamespaceSchema>, ChangeStats) {
let mut guard = self.cache.write();
let old = self
.cache
.read()
.get(&namespace)
// The existing Arc is cloned to allow the merge to be performed without holding
// the read-lock on the cache
.map(Arc::clone);
let (merged_schema, change_stats) = match guard.remove(&namespace) {
let (merged_schema, change_stats) = match old {
Some(old) => merge_schema_additive(schema, old),
None => {
let change_stats = ChangeStats {
@ -62,7 +68,7 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
};
let ret = Arc::new(merged_schema);
guard.insert(namespace, Arc::clone(&ret));
self.cache.write().insert(namespace, Arc::clone(&ret));
(ret, change_stats)
}
}
@ -79,49 +85,24 @@ fn merge_schema_additive(
let old_table_count = old_ns.tables.len();
let mut old_column_count = 0;
// In order to avoid un-necessary copies, the merge attempts to own the
// value for the old namespace.
//
// Table schema missing from the new schema are added from the old. If
// the table exists in both the new and the old namespace schema then
// any column schema missing from the new table schema are added from
// the old.
//
// The two match arms below must merge in the same way, aside from memory
// semantics.
match Arc::try_unwrap(old_ns) {
Ok(owned_old_ns) => {
for (old_table_name, old_table) in owned_old_ns.tables {
match new_ns.tables.get_mut(&old_table_name) {
Some(new_table) => {
old_column_count += old_table.columns.len();
for (column_name, column) in old_table.columns {
new_table.columns.entry(column_name).or_insert(column);
}
}
None => {
new_ns.tables.insert(old_table_name, old_table);
for (old_table_name, old_table) in &old_ns.tables {
match new_ns.tables.get_mut(old_table_name) {
Some(new_table) => {
old_column_count += old_table.columns.len();
for (column_name, column) in &old_table.columns {
if !new_table.columns.contains_key(column_name) {
new_table.columns.insert(column_name.to_owned(), *column);
}
}
}
}
Err(old_ns) => {
for (old_table_name, old_table) in &old_ns.tables {
match new_ns.tables.get_mut(old_table_name) {
Some(new_table) => {
old_column_count += old_table.columns.len();
for (column_name, column) in &old_table.columns {
if !new_table.columns.contains_key(column_name) {
new_table.columns.insert(column_name.to_owned(), *column);
}
}
}
None => {
new_ns
.tables
.insert(old_table_name.to_owned(), old_table.to_owned());
}
}
None => {
new_ns
.tables
.insert(old_table_name.to_owned(), old_table.to_owned());
}
}
}