feat(router): Include table/column diff for namespace schema cache update

This adds some computational overhead during the merging of new
namespace schema with what's in the router's local cache, but will allow
gossiping of changes.
pull/24376/head
Fraser Savage 2023-07-27 13:34:47 +01:00
parent 73339cfc57
commit 5453ad8ba4
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
4 changed files with 132 additions and 63 deletions

View File

@ -386,7 +386,7 @@ pub struct Table {
}
/// Column definitions for a table
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TableSchema {
/// the table id
pub id: TableId,

View File

@ -11,10 +11,10 @@ pub mod metrics;
mod read_through_cache;
pub use read_through_cache::*;
use std::{error::Error, fmt::Debug, sync::Arc};
use std::{collections::BTreeMap, error::Error, fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use data_types::{ColumnsByName, NamespaceName, NamespaceSchema, TableSchema};
/// An abstract cache of [`NamespaceSchema`].
#[async_trait]
@ -50,11 +50,11 @@ pub trait NamespaceCache: Debug + Send + Sync {
/// associated [`NamespaceCache::put_schema()`] call.
#[derive(Debug, PartialEq, Eq)]
pub struct ChangeStats {
/// The number of tables added to the cache.
pub(crate) new_tables: usize,
/// The new tables added to the cache.
pub(crate) new_tables: BTreeMap<String, TableSchema>,
/// The number of columns added to the cache (across all tables).
pub(crate) new_columns: usize,
/// The new columns added to cache for all pre-existing tables.
pub(crate) new_columns_per_table: BTreeMap<String, ColumnsByName>,
/// Indicates whether the change took place when an entry already
/// existed.

View File

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use data_types::{ColumnsByName, NamespaceName, NamespaceSchema};
use hashbrown::HashMap;
use parking_lot::RwLock;
use thiserror::Error;
@ -56,12 +56,10 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
Some(old) => merge_schema_additive(schema, old),
None => {
let change_stats = ChangeStats {
new_columns: schema
.tables
.values()
.map(|v| v.column_count())
.sum::<usize>(),
new_tables: schema.tables.len(),
new_tables: schema.tables.clone(),
// There are no pre-existing tables for columns to be added
// to, so don't need to build another map.
new_columns_per_table: Default::default(),
did_update: false,
};
(schema, change_stats)
@ -86,11 +84,13 @@ fn merge_schema_additive(
// invariant: Namespace partition template override should never change for a given name
assert_eq!(old_ns.partition_template, new_ns.partition_template);
let old_table_count = old_ns.tables.len();
let mut old_column_count = 0;
let mut new_columns: BTreeMap<String, ColumnsByName> = Default::default();
// Table schema missing from the new schema are added from the old. If the
// table exists in both the new and the old namespace schema then any column
// schema missing from the new table schema are added from the old.
// schema missing from the new table schema are added from the old, while
// columns added that are not in the old schema get placed in the
// `new_columns` set to be included in the returned [`ChangeStats`].
//
// This code performs get_mut() & insert() operations to populate `new_ns`,
// instead of using the BTreeMap's entry() API. This allows this loop to
@ -102,14 +102,30 @@ fn merge_schema_additive(
// to 0 as the schemas become fully populated, leaving the common path free
// of overhead.
for (old_table_name, old_table) in &old_ns.tables {
old_column_count += old_table.column_count();
match new_ns.tables.get_mut(old_table_name) {
Some(new_table) => {
for (column_name, column) in old_table.columns.iter() {
if !new_table.contains_column_name(column_name) {
new_table.add_column_schema(column_name.to_string(), *column);
// Insert old columns missing from the new table schema
for (old_column_name, old_column) in old_table.columns.iter() {
if !new_table.contains_column_name(old_column_name) {
new_table.add_column_schema(old_column_name.clone(), *old_column);
}
}
// Then take note of any columns added to the new table schema
// that are not present in the previous
let new_columns_in_table = new_table
.columns
.iter()
.filter_map(|(new_column_name, new_column_schema)| {
if old_table.contains_column_name(new_column_name) {
None
} else {
Some((new_column_name.clone(), *new_column_schema))
}
})
.collect::<BTreeMap<_, _>>();
if !new_columns_in_table.is_empty() {
new_columns.insert(old_table_name.clone(), new_columns_in_table.into());
}
}
None => {
new_ns
@ -119,17 +135,26 @@ fn merge_schema_additive(
}
}
// Work out the set of new tables added to the namespace schema and capture
// their schema in the [`ChangeStats`].
let new_tables = new_ns
.tables
.iter()
.filter_map(|(new_table_name, new_table_schema)| {
if old_ns.tables.contains_key(new_table_name) {
None
} else {
Some((new_table_name.clone(), new_table_schema.clone()))
}
})
.collect();
// To compute the change stats for the merge it is still necessary to iterate
// over the tables present in the new schema. The new schema may have
// introduced additional tables that won't be visited by the merge logic's logic.
let change_stats = ChangeStats {
new_tables: new_ns.tables.len() - old_table_count,
new_columns: new_ns
.tables
.values()
.map(|v| v.column_count())
.sum::<usize>()
- old_column_count,
new_tables,
new_columns_per_table: new_columns,
did_update: true,
};
(new_ns, change_stats)
@ -172,7 +197,7 @@ mod tests {
};
assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), (new, s) => {
assert_eq!(*new, schema1);
assert_eq!(s.new_tables, 0);
assert!(s.new_tables.is_empty());
});
assert_eq!(
*cache.get_schema(&ns).await.expect("lookup failure"),
@ -190,7 +215,7 @@ mod tests {
assert_matches!(cache.put_schema(ns.clone(), schema2.clone()), (new, s) => {
assert_eq!(*new, schema2);
assert_eq!(s.new_tables, 0);
assert!(s.new_tables.is_empty());
});
assert_eq!(
*cache.get_schema(&ns).await.expect("lookup failure"),
@ -239,21 +264,21 @@ mod tests {
let schema_update_1 = NamespaceSchema {
id: NamespaceId::new(42),
tables: BTreeMap::from([(String::from(table_name), first_write_table_schema)]),
tables: BTreeMap::from([(String::from(table_name), first_write_table_schema.clone())]),
max_columns_per_table: 50,
max_tables: 24,
retention_period_ns: None,
partition_template: Default::default(),
};
let schema_update_2 = NamespaceSchema {
tables: BTreeMap::from([(String::from(table_name), second_write_table_schema)]),
tables: BTreeMap::from([(String::from(table_name), second_write_table_schema.clone())]),
..schema_update_1.clone()
};
let want_namespace_schema = {
let mut want_table_schema = empty_table_schema(table_id);
want_table_schema.add_column(column_1);
want_table_schema.add_column(column_2);
want_table_schema.add_column(column_1.clone());
want_table_schema.add_column(column_2.clone());
NamespaceSchema {
tables: BTreeMap::from([(String::from(table_name), want_table_schema)]),
..schema_update_1.clone()
@ -275,13 +300,21 @@ mod tests {
assert_eq!(*new_schema, schema_update_1);
assert_eq!(
new_stats,
ChangeStats { new_tables: 1, new_columns: 1, did_update: false }
ChangeStats { new_tables: schema_update_1.tables, new_columns_per_table: Default::default(), did_update: false }
);
}
);
assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => {
assert_eq!(*new_schema, want_namespace_schema);
assert_eq!(new_stats, ChangeStats{ new_tables: 0, new_columns: 1, did_update: true});
let want_new_columns = [(
String::from(table_name),
[(
column_2.name.clone(),
*second_write_table_schema.columns.get(column_2.name.as_str()).expect("should have column 2")
)].into_iter().collect::<BTreeMap<_,_>>().into(),
)].into_iter().collect();
assert_eq!(new_stats, ChangeStats{ new_tables: Default::default(), new_columns_per_table: want_new_columns, did_update: true});
});
let got_namespace_schema = cache
@ -304,26 +337,29 @@ mod tests {
// Each table has been given a column to assert the table merge logic
// produces the correct metrics.
let mut table_1 = empty_table_schema(TableId::new(1));
table_1.add_column(Column {
let column_1 = Column {
id: ColumnId::new(1),
table_id: TableId::new(1),
name: "column_a".to_string(),
column_type: ColumnType::String,
});
};
table_1.add_column(column_1);
let mut table_2 = empty_table_schema(TableId::new(2));
table_2.add_column(Column {
let column_2 = Column {
id: ColumnId::new(2),
table_id: TableId::new(2),
name: "column_b".to_string(),
column_type: ColumnType::String,
});
};
table_2.add_column(column_2);
let mut table_3 = empty_table_schema(TableId::new(3));
table_3.add_column(Column {
let column_3 = Column {
id: ColumnId::new(3),
table_id: TableId::new(3),
name: "column_c".to_string(),
column_type: ColumnType::String,
});
};
table_3.add_column(column_3);
let schema_update_1 = NamespaceSchema {
id: NamespaceId::new(42),
@ -346,9 +382,9 @@ mod tests {
let want_namespace_schema = NamespaceSchema {
tables: BTreeMap::from([
(String::from("table_1"), table_1),
(String::from("table_2"), table_2),
(String::from("table_3"), table_3),
(String::from("table_1"), table_1.clone()),
(String::from("table_2"), table_2.clone()),
(String::from("table_3"), table_3.clone()),
]),
..schema_update_1.clone()
};
@ -368,13 +404,14 @@ mod tests {
assert_eq!(*new_schema, schema_update_1);
assert_eq!(
new_stats,
ChangeStats { new_tables: 2, new_columns: 2, did_update: false }
ChangeStats { new_tables: schema_update_1.tables.clone(), new_columns_per_table: Default::default(), did_update: false }
);
}
);
assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => {
assert_eq!(*new_schema, want_namespace_schema);
assert_eq!(new_stats, ChangeStats{ new_tables: 1, new_columns: 1, did_update: true});
let want_new_tables = [(String::from("table_3"), table_3)].into_iter().collect();
assert_eq!(new_stats, ChangeStats{ new_tables: want_new_tables, new_columns_per_table: Default::default(), did_update: true});
});
let got_namespace_schema = cache
@ -443,10 +480,10 @@ mod tests {
}
}
/// Reduce `ns` into a set of `(table_name, column_name)` for all tables &
/// Reduce `ns_tables` into a set of `(table_name, column_name)` for all tables &
/// columns.
fn into_set(ns: &NamespaceSchema) -> HashSet<(String, String)> {
ns.tables
fn into_set(ns_tables: &BTreeMap<String, TableSchema>) -> HashSet<(String, String)> {
ns_tables
.iter()
.flat_map(|(table_name, col_set)| {
// Build a set of tuples in the form (table_name, column_name)
@ -459,6 +496,28 @@ mod tests {
.collect()
}
/// Construct a set of `(table_name, column_name)` from a set of table schema and
/// table-associated column schema.
fn into_set_with_columns(
new_tables: &BTreeMap<String, TableSchema>,
new_columns: &BTreeMap<String, ColumnsByName>,
) -> HashSet<(String, String)> {
let new_table_set = into_set(new_tables);
let new_column_set = new_columns
.iter()
.flat_map(|(table_name, col_set)| {
col_set
.names()
.into_iter()
.map(|col_name| (table_name.to_string(), col_name.to_string()))
})
.collect();
new_table_set
.union(&new_column_set)
.map(|v| v.to_owned())
.collect::<HashSet<_>>()
}
proptest! {
#[test]
fn prop_schema_merge(
@ -466,8 +525,8 @@ mod tests {
b in arbitrary_namespace_schema()
) {
// Convert inputs into sets
let known_a = into_set(&a);
let known_b = into_set(&b);
let known_a = into_set(&a.tables);
let known_b = into_set(&b.tables);
// Compute the union set of the input schema sets.
//
@ -479,12 +538,18 @@ mod tests {
let cache = Arc::new(MemoryNamespaceCache::default());
let (got, stats_1) = cache.put_schema(name.clone(), a.clone());
assert_eq!(*got, a); // The new namespace should be unchanged
assert_eq!(stats_1.new_tables, a.tables);
// Drive the merging logic
let (got, stats_2) = cache.put_schema(name, b.clone());
// Check the change stats return the difference
let want_change_stat_set = known_b.difference(&known_a).map(|v| v.to_owned()).collect::<HashSet<_>>();
let got_change_stat_set = into_set_with_columns(&stats_2.new_tables, &stats_2.new_columns_per_table);
assert_eq!(got_change_stat_set, want_change_stat_set);
// Reduce the merged schema into a comparable set.
let got_set = into_set(&got);
let got_set = into_set(&got.tables);
// Assert the table/column sets merged by the known good hashset
// union implementation, and the cache merging logic are the same.
@ -495,13 +560,6 @@ mod tests {
assert_eq!(got.max_columns_per_table, b.max_columns_per_table);
assert_eq!(got.max_tables, b.max_tables);
assert_eq!(got.retention_period_ns, b.retention_period_ns);
// Finally, assert the reported "newly added" statistics sum to the
// total of the inputs.
assert_eq!(stats_1.new_columns + stats_2.new_columns, want.len());
let tables = a.tables.keys().chain(b.tables.keys()).collect::<HashSet<_>>();
assert_eq!(stats_1.new_tables + stats_2.new_tables, tables.len());
}
}
}

View File

@ -116,8 +116,19 @@ where
// Figure out the difference between the new namespace and the
// evicted old namespace
// Adjust the metrics to reflect the change
self.table_count.inc(change_stats.new_tables as u64);
self.column_count.inc(change_stats.new_columns as u64);
self.table_count.inc(change_stats.new_tables.len() as u64);
let columns_for_new_tables = change_stats
.new_tables
.iter()
.fold(0, |acc, (_, table_schema)| {
acc + table_schema.column_count() as u64
});
let columns_for_existing_tables = change_stats
.new_columns_per_table
.iter()
.fold(0, |acc, (_, columns)| acc + columns.column_count() as u64);
self.column_count
.inc(columns_for_new_tables + columns_for_existing_tables);
(result, change_stats)
}