From 5453ad8ba4aa2f729b7ac0ceb829d5dbd343dcbc Mon Sep 17 00:00:00 2001
From: Fraser Savage
Date: Thu, 27 Jul 2023 13:34:47 +0100
Subject: [PATCH] feat(router): Include table/column diff for namespace schema cache update

This adds some computational overhead during the merging of a new namespace
schema with what's in the router's local cache, but will allow gossiping of
changes.
---
 data_types/src/lib.rs                 |   2 +-
 router/src/namespace_cache.rs         |  12 +-
 router/src/namespace_cache/memory.rs  | 166 +++++++++++++++++---
 router/src/namespace_cache/metrics.rs |  15 ++-
 4 files changed, 132 insertions(+), 63 deletions(-)

diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index a0dbd14e59..97815e1939 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -386,7 +386,7 @@ pub struct Table {
 }
 
 /// Column definitions for a table
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct TableSchema {
     /// the table id
     pub id: TableId,
diff --git a/router/src/namespace_cache.rs b/router/src/namespace_cache.rs
index 7e5a20df60..d44c1ab526 100644
--- a/router/src/namespace_cache.rs
+++ b/router/src/namespace_cache.rs
@@ -11,10 +11,10 @@ pub mod metrics;
 mod read_through_cache;
 pub use read_through_cache::*;
 
-use std::{error::Error, fmt::Debug, sync::Arc};
+use std::{collections::BTreeMap, error::Error, fmt::Debug, sync::Arc};
 
 use async_trait::async_trait;
-use data_types::{NamespaceName, NamespaceSchema};
+use data_types::{ColumnsByName, NamespaceName, NamespaceSchema, TableSchema};
 
 /// An abstract cache of [`NamespaceSchema`].
 #[async_trait]
@@ -50,11 +50,11 @@ pub trait NamespaceCache: Debug + Send + Sync {
 /// associated [`NamespaceCache::put_schema()`] call.
 #[derive(Debug, PartialEq, Eq)]
 pub struct ChangeStats {
-    /// The number of tables added to the cache.
-    pub(crate) new_tables: usize,
+    /// The new tables added to the cache.
+    pub(crate) new_tables: BTreeMap<String, TableSchema>,
 
-    /// The number of columns added to the cache (across all tables).
-    pub(crate) new_columns: usize,
+    /// The new columns added to the cache for all pre-existing tables.
+    pub(crate) new_columns_per_table: BTreeMap<String, ColumnsByName>,
 
     /// Indicates whether the change took place when an entry already
     /// existed.
diff --git a/router/src/namespace_cache/memory.rs b/router/src/namespace_cache/memory.rs
index 67c080f592..fd778a0df0 100644
--- a/router/src/namespace_cache/memory.rs
+++ b/router/src/namespace_cache/memory.rs
@@ -1,7 +1,7 @@
-use std::sync::Arc;
+use std::{collections::BTreeMap, sync::Arc};
 
 use async_trait::async_trait;
-use data_types::{NamespaceName, NamespaceSchema};
+use data_types::{ColumnsByName, NamespaceName, NamespaceSchema};
 use hashbrown::HashMap;
 use parking_lot::RwLock;
 use thiserror::Error;
@@ -56,12 +56,10 @@ impl NamespaceCache for Arc<MemoryNamespaceCache> {
             Some(old) => merge_schema_additive(schema, old),
             None => {
                 let change_stats = ChangeStats {
-                    new_columns: schema
-                        .tables
-                        .values()
-                        .map(|v| v.column_count())
-                        .sum::<usize>(),
-                    new_tables: schema.tables.len(),
+                    new_tables: schema.tables.clone(),
+                    // There are no pre-existing tables for columns to be added
+                    // to, so there's no need to build another map.
+                    new_columns_per_table: Default::default(),
                     did_update: false,
                 };
                 (schema, change_stats)
@@ -86,11 +84,13 @@ fn merge_schema_additive(
     // invariant: Namespace partition template override should never change for a given name
     assert_eq!(old_ns.partition_template, new_ns.partition_template);
 
-    let old_table_count = old_ns.tables.len();
-    let mut old_column_count = 0;
+    let mut new_columns: BTreeMap<String, ColumnsByName> = Default::default();
+
     // Table schema missing from the new schema are added from the old. If the
     // table exists in both the new and the old namespace schema then any column
-    // schema missing from the new table schema are added from the old.
+    // schema missing from the new table schema are added from the old, while
+    // any columns that are not in the old schema are placed in the
+    // `new_columns` set to be included in the returned [`ChangeStats`].
     //
     // This code performs get_mut() & insert() operations to populate `new_ns`,
     // instead of using the BTreeMap's entry() API. This allows this loop to
@@ -102,14 +102,30 @@ fn merge_schema_additive(
     // to 0 as the schemas become fully populated, leaving the common path free
     // of overhead.
     for (old_table_name, old_table) in &old_ns.tables {
-        old_column_count += old_table.column_count();
         match new_ns.tables.get_mut(old_table_name) {
             Some(new_table) => {
-                for (column_name, column) in old_table.columns.iter() {
-                    if !new_table.contains_column_name(column_name) {
-                        new_table.add_column_schema(column_name.to_string(), *column);
+                // Insert old columns missing from the new table schema
+                for (old_column_name, old_column) in old_table.columns.iter() {
+                    if !new_table.contains_column_name(old_column_name) {
+                        new_table.add_column_schema(old_column_name.clone(), *old_column);
                     }
                 }
+                // Then take note of any columns added to the new table schema
+                // that are not present in the previous table schema.
+                let new_columns_in_table = new_table
+                    .columns
+                    .iter()
+                    .filter_map(|(new_column_name, new_column_schema)| {
+                        if old_table.contains_column_name(new_column_name) {
+                            None
+                        } else {
+                            Some((new_column_name.clone(), *new_column_schema))
+                        }
+                    })
+                    .collect::<BTreeMap<_, _>>();
+                if !new_columns_in_table.is_empty() {
+                    new_columns.insert(old_table_name.clone(), new_columns_in_table.into());
+                }
             }
             None => {
                 new_ns
@@ -119,17 +135,26 @@ fn merge_schema_additive(
         }
     }
 
+    // Work out the set of new tables added to the namespace schema and capture
+    // their schemas in the [`ChangeStats`].
+    let new_tables = new_ns
+        .tables
+        .iter()
+        .filter_map(|(new_table_name, new_table_schema)| {
+            if old_ns.tables.contains_key(new_table_name) {
+                None
+            } else {
+                Some((new_table_name.clone(), new_table_schema.clone()))
+            }
+        })
+        .collect();
+
     // To compute the change stats for the merge it is still necessary to iterate
     // over the tables present in the new schema. The new schema may have
     // introduced additional tables that won't be visited by the merge logic above.
     let change_stats = ChangeStats {
-        new_tables: new_ns.tables.len() - old_table_count,
-        new_columns: new_ns
-            .tables
-            .values()
-            .map(|v| v.column_count())
-            .sum::<usize>()
-            - old_column_count,
+        new_tables,
+        new_columns_per_table: new_columns,
         did_update: true,
     };
     (new_ns, change_stats)
@@ -172,7 +197,7 @@ mod tests {
         };
         assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), (new, s) => {
             assert_eq!(*new, schema1);
-            assert_eq!(s.new_tables, 0);
+            assert!(s.new_tables.is_empty());
         });
         assert_eq!(
             *cache.get_schema(&ns).await.expect("lookup failure"),
@@ -190,7 +215,7 @@ mod tests {
 
         assert_matches!(cache.put_schema(ns.clone(), schema2.clone()), (new, s) => {
             assert_eq!(*new, schema2);
-            assert_eq!(s.new_tables, 0);
+            assert!(s.new_tables.is_empty());
         });
         assert_eq!(
             *cache.get_schema(&ns).await.expect("lookup failure"),
@@ -239,21 +264,21 @@ mod tests {
 
         let schema_update_1 = NamespaceSchema {
             id: NamespaceId::new(42),
-            tables: BTreeMap::from([(String::from(table_name), first_write_table_schema)]),
+            tables: BTreeMap::from([(String::from(table_name), first_write_table_schema.clone())]),
             max_columns_per_table: 50,
             max_tables: 24,
             retention_period_ns: None,
             partition_template: Default::default(),
         };
         let schema_update_2 = NamespaceSchema {
-            tables: BTreeMap::from([(String::from(table_name), second_write_table_schema)]),
+            tables: BTreeMap::from([(String::from(table_name), second_write_table_schema.clone())]),
             ..schema_update_1.clone()
         };
 
         let want_namespace_schema = {
             let mut want_table_schema = empty_table_schema(table_id);
-            want_table_schema.add_column(column_1);
-            want_table_schema.add_column(column_2);
+            want_table_schema.add_column(column_1.clone());
+            want_table_schema.add_column(column_2.clone());
             NamespaceSchema {
                 tables: BTreeMap::from([(String::from(table_name), want_table_schema)]),
                 ..schema_update_1.clone()
@@ -275,13 +300,21 @@ mod tests {
                 assert_eq!(*new_schema, schema_update_1);
                 assert_eq!(
                     new_stats,
-                    ChangeStats { new_tables: 1, new_columns: 1, did_update: false }
+                    ChangeStats { new_tables: schema_update_1.tables, new_columns_per_table: Default::default(), did_update: false }
                 );
             }
         );
         assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => {
             assert_eq!(*new_schema, want_namespace_schema);
-            assert_eq!(new_stats, ChangeStats{ new_tables: 0, new_columns: 1, did_update: true});
+            let want_new_columns = [(
+                String::from(table_name),
+                [(
+                    column_2.name.clone(),
+                    *second_write_table_schema.columns.get(column_2.name.as_str()).expect("should have column 2")
+                )].into_iter().collect::<BTreeMap<_, _>>().into(),
+            )].into_iter().collect();
+
+            assert_eq!(new_stats, ChangeStats{ new_tables: Default::default(), new_columns_per_table: want_new_columns, did_update: true});
         });
 
         let got_namespace_schema = cache
@@ -304,26 +337,29 @@ mod tests {
         // Each table has been given a column to assert the table merge logic
         // produces the correct metrics.
         let mut table_1 = empty_table_schema(TableId::new(1));
-        table_1.add_column(Column {
+        let column_1 = Column {
             id: ColumnId::new(1),
             table_id: TableId::new(1),
             name: "column_a".to_string(),
             column_type: ColumnType::String,
-        });
+        };
+        table_1.add_column(column_1);
         let mut table_2 = empty_table_schema(TableId::new(2));
-        table_2.add_column(Column {
+        let column_2 = Column {
             id: ColumnId::new(2),
             table_id: TableId::new(2),
             name: "column_b".to_string(),
             column_type: ColumnType::String,
-        });
+        };
+        table_2.add_column(column_2);
         let mut table_3 = empty_table_schema(TableId::new(3));
-        table_3.add_column(Column {
+        let column_3 = Column {
             id: ColumnId::new(3),
             table_id: TableId::new(3),
             name: "column_c".to_string(),
             column_type: ColumnType::String,
-        });
+        };
+        table_3.add_column(column_3);
 
         let schema_update_1 = NamespaceSchema {
             id: NamespaceId::new(42),
@@ -346,9 +382,9 @@ mod tests {
 
         let want_namespace_schema = NamespaceSchema {
             tables: BTreeMap::from([
-                (String::from("table_1"), table_1),
-                (String::from("table_2"), table_2),
-                (String::from("table_3"), table_3),
+                (String::from("table_1"), table_1.clone()),
+                (String::from("table_2"), table_2.clone()),
+                (String::from("table_3"), table_3.clone()),
             ]),
             ..schema_update_1.clone()
         };
@@ -368,13 +404,14 @@ mod tests {
                 assert_eq!(*new_schema, schema_update_1);
                 assert_eq!(
                     new_stats,
-                    ChangeStats { new_tables: 2, new_columns: 2, did_update: false }
+                    ChangeStats { new_tables: schema_update_1.tables.clone(), new_columns_per_table: Default::default(), did_update: false }
                 );
             }
         );
         assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => {
             assert_eq!(*new_schema, want_namespace_schema);
-            assert_eq!(new_stats, ChangeStats{ new_tables: 1, new_columns: 1, did_update: true});
+            let want_new_tables = [(String::from("table_3"), table_3)].into_iter().collect();
+            assert_eq!(new_stats, ChangeStats{ new_tables: want_new_tables, new_columns_per_table: Default::default(), did_update: true});
         });
 
         let got_namespace_schema = cache
@@ -443,10 +480,10 @@ mod tests {
         }
     }
 
-    /// Reduce `ns` into a set of `(table_name, column_name)` for all tables &
+    /// Reduce `ns_tables` into a set of `(table_name, column_name)` for all tables &
     /// columns.
-    fn into_set(ns: &NamespaceSchema) -> HashSet<(String, String)> {
-        ns.tables
+    fn into_set(ns_tables: &BTreeMap<String, TableSchema>) -> HashSet<(String, String)> {
+        ns_tables
             .iter()
             .flat_map(|(table_name, col_set)| {
                 // Build a set of tuples in the form (table_name, column_name)
                 .map(|col_name| (table_name.to_string(), col_name.to_string()))
             })
             .collect()
     }
 
+    /// Construct a set of `(table_name, column_name)` tuples from a set of table
+    /// schemas and table-associated column schemas.
+    fn into_set_with_columns(
+        new_tables: &BTreeMap<String, TableSchema>,
+        new_columns: &BTreeMap<String, ColumnsByName>,
+    ) -> HashSet<(String, String)> {
+        let new_table_set = into_set(new_tables);
+        let new_column_set = new_columns
+            .iter()
+            .flat_map(|(table_name, col_set)| {
+                col_set
+                    .names()
+                    .into_iter()
+                    .map(|col_name| (table_name.to_string(), col_name.to_string()))
+            })
+            .collect();
+        new_table_set
+            .union(&new_column_set)
+            .map(|v| v.to_owned())
+            .collect::<HashSet<_>>()
+    }
+
     proptest! {
         #[test]
         fn prop_schema_merge(
             a in arbitrary_namespace_schema(),
             b in arbitrary_namespace_schema()
         ) {
             // Convert inputs into sets
-            let known_a = into_set(&a);
-            let known_b = into_set(&b);
+            let known_a = into_set(&a.tables);
+            let known_b = into_set(&b.tables);
 
             // Compute the union set of the input schema sets.
             //
@@ -479,12 +538,18 @@ mod tests {
             let cache = Arc::new(MemoryNamespaceCache::default());
             let (got, stats_1) = cache.put_schema(name.clone(), a.clone());
             assert_eq!(*got, a); // The new namespace should be unchanged
+            assert_eq!(stats_1.new_tables, a.tables);
 
             // Drive the merging logic
             let (got, stats_2) = cache.put_schema(name, b.clone());
 
+            // Check that the change stats report the difference
+            let want_change_stat_set = known_b.difference(&known_a).map(|v| v.to_owned()).collect::<HashSet<_>>();
+            let got_change_stat_set = into_set_with_columns(&stats_2.new_tables, &stats_2.new_columns_per_table);
+            assert_eq!(got_change_stat_set, want_change_stat_set);
+
             // Reduce the merged schema into a comparable set.
-            let got_set = into_set(&got);
+            let got_set = into_set(&got.tables);
 
             // Assert the table/column sets merged by the known good hashset
             // union implementation, and the cache merging logic are the same.
@@ -495,13 +560,6 @@ mod tests {
             assert_eq!(got.max_columns_per_table, b.max_columns_per_table);
             assert_eq!(got.max_tables, b.max_tables);
             assert_eq!(got.retention_period_ns, b.retention_period_ns);
-
-            // Finally, assert the reported "newly added" statistics sum to the
-            // total of the inputs.
-            assert_eq!(stats_1.new_columns + stats_2.new_columns, want.len());
-
-            let tables = a.tables.keys().chain(b.tables.keys()).collect::<HashSet<_>>();
-            assert_eq!(stats_1.new_tables + stats_2.new_tables, tables.len());
         }
     }
 }
diff --git a/router/src/namespace_cache/metrics.rs b/router/src/namespace_cache/metrics.rs
index 4769ec8a5d..0719fa7e46 100644
--- a/router/src/namespace_cache/metrics.rs
+++ b/router/src/namespace_cache/metrics.rs
@@ -116,8 +116,19 @@ where
         // Figure out the difference between the new namespace and the
        // evicted old namespace
         // Adjust the metrics to reflect the change
-        self.table_count.inc(change_stats.new_tables as u64);
-        self.column_count.inc(change_stats.new_columns as u64);
+        self.table_count.inc(change_stats.new_tables.len() as u64);
+        let columns_for_new_tables = change_stats
+            .new_tables
+            .iter()
+            .fold(0, |acc, (_, table_schema)| {
+                acc + table_schema.column_count() as u64
+            });
+        let columns_for_existing_tables = change_stats
+            .new_columns_per_table
+            .iter()
+            .fold(0, |acc, (_, columns)| acc + columns.column_count() as u64);
+        self.column_count
+            .inc(columns_for_new_tables + columns_for_existing_tables);
 
         (result, change_stats)
     }
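
For context on how the richer `ChangeStats` shape could be consumed downstream (for
example by the gossip mechanism the commit message alludes to), here is a minimal,
hypothetical sketch. It is not part of the patch: the `new_columns_by_table` helper
and the stand-in `TableSchema`/`ColumnsByName` types are assumptions that only
mirror the shapes used above, not the real `data_types` definitions.

    // Hypothetical sketch: flatten a ChangeStats diff into per-table counts of
    // newly added columns (e.g. for a gossip message payload). Types are
    // minimal stand-ins for the real data_types definitions.
    use std::collections::BTreeMap;

    struct ColumnsByName(BTreeMap<String, ()>); // column name -> schema (elided)

    impl ColumnsByName {
        fn column_count(&self) -> usize {
            self.0.len()
        }
    }

    struct TableSchema {
        columns: ColumnsByName,
    }

    struct ChangeStats {
        new_tables: BTreeMap<String, TableSchema>,
        new_columns_per_table: BTreeMap<String, ColumnsByName>,
        did_update: bool,
    }

    // Every column of a brand-new table counts as new; pre-existing tables
    // contribute only the columns recorded in the diff.
    fn new_columns_by_table(stats: &ChangeStats) -> BTreeMap<String, usize> {
        let mut out = BTreeMap::new();
        for (name, table) in &stats.new_tables {
            out.insert(name.clone(), table.columns.column_count());
        }
        for (name, cols) in &stats.new_columns_per_table {
            *out.entry(name.clone()).or_insert(0) += cols.column_count();
        }
        out
    }

Summing the values of the returned map gives the same total that the metrics.rs
change above adds to its column counter.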