diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs index a0dbd14e59..97815e1939 100644 --- a/data_types/src/lib.rs +++ b/data_types/src/lib.rs @@ -386,7 +386,7 @@ pub struct Table { } /// Column definitions for a table -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct TableSchema { /// the table id pub id: TableId, diff --git a/router/src/namespace_cache.rs b/router/src/namespace_cache.rs index 7e5a20df60..d44c1ab526 100644 --- a/router/src/namespace_cache.rs +++ b/router/src/namespace_cache.rs @@ -11,10 +11,10 @@ pub mod metrics; mod read_through_cache; pub use read_through_cache::*; -use std::{error::Error, fmt::Debug, sync::Arc}; +use std::{collections::BTreeMap, error::Error, fmt::Debug, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceName, NamespaceSchema}; +use data_types::{ColumnsByName, NamespaceName, NamespaceSchema, TableSchema}; /// An abstract cache of [`NamespaceSchema`]. #[async_trait] @@ -50,11 +50,11 @@ pub trait NamespaceCache: Debug + Send + Sync { /// associated [`NamespaceCache::put_schema()`] call. #[derive(Debug, PartialEq, Eq)] pub struct ChangeStats { - /// The number of tables added to the cache. - pub(crate) new_tables: usize, + /// The new tables added to the cache. + pub(crate) new_tables: BTreeMap, - /// The number of columns added to the cache (across all tables). - pub(crate) new_columns: usize, + /// The new columns added to cache for all pre-existing tables. + pub(crate) new_columns_per_table: BTreeMap, /// Indicates whether the change took place when an entry already /// existed. diff --git a/router/src/namespace_cache/memory.rs b/router/src/namespace_cache/memory.rs index 67c080f592..fd778a0df0 100644 --- a/router/src/namespace_cache/memory.rs +++ b/router/src/namespace_cache/memory.rs @@ -1,7 +1,7 @@ -use std::sync::Arc; +use std::{collections::BTreeMap, sync::Arc}; use async_trait::async_trait; -use data_types::{NamespaceName, NamespaceSchema}; +use data_types::{ColumnsByName, NamespaceName, NamespaceSchema}; use hashbrown::HashMap; use parking_lot::RwLock; use thiserror::Error; @@ -56,12 +56,10 @@ impl NamespaceCache for Arc { Some(old) => merge_schema_additive(schema, old), None => { let change_stats = ChangeStats { - new_columns: schema - .tables - .values() - .map(|v| v.column_count()) - .sum::(), - new_tables: schema.tables.len(), + new_tables: schema.tables.clone(), + // There are no pre-existing tables for columns to be added + // to, so don't need to build another map. + new_columns_per_table: Default::default(), did_update: false, }; (schema, change_stats) @@ -86,11 +84,13 @@ fn merge_schema_additive( // invariant: Namespace partition template override should never change for a given name assert_eq!(old_ns.partition_template, new_ns.partition_template); - let old_table_count = old_ns.tables.len(); - let mut old_column_count = 0; + let mut new_columns: BTreeMap = Default::default(); + // Table schema missing from the new schema are added from the old. If the // table exists in both the new and the old namespace schema then any column - // schema missing from the new table schema are added from the old. + // schema missing from the new table schema are added from the old, while + // columns added that are not in the old schema get placed in the + // `new_columns` set to be included in the returned [`ChangeStats`]. // // This code performs get_mut() & insert() operations to populate `new_ns`, // instead of using the BTreeMap's entry() API. This allows this loop to @@ -102,14 +102,30 @@ fn merge_schema_additive( // to 0 as the schemas become fully populated, leaving the common path free // of overhead. for (old_table_name, old_table) in &old_ns.tables { - old_column_count += old_table.column_count(); match new_ns.tables.get_mut(old_table_name) { Some(new_table) => { - for (column_name, column) in old_table.columns.iter() { - if !new_table.contains_column_name(column_name) { - new_table.add_column_schema(column_name.to_string(), *column); + // Insert old columns missing from the new table schema + for (old_column_name, old_column) in old_table.columns.iter() { + if !new_table.contains_column_name(old_column_name) { + new_table.add_column_schema(old_column_name.clone(), *old_column); } } + // Then take note of any columns added to the new table schema + // that are not present in the previous + let new_columns_in_table = new_table + .columns + .iter() + .filter_map(|(new_column_name, new_column_schema)| { + if old_table.contains_column_name(new_column_name) { + None + } else { + Some((new_column_name.clone(), *new_column_schema)) + } + }) + .collect::>(); + if !new_columns_in_table.is_empty() { + new_columns.insert(old_table_name.clone(), new_columns_in_table.into()); + } } None => { new_ns @@ -119,17 +135,26 @@ fn merge_schema_additive( } } + // Work out the set of new tables added to the namespace schema and capture + // their schema in the [`ChangeStats`]. + let new_tables = new_ns + .tables + .iter() + .filter_map(|(new_table_name, new_table_schema)| { + if old_ns.tables.contains_key(new_table_name) { + None + } else { + Some((new_table_name.clone(), new_table_schema.clone())) + } + }) + .collect(); + // To compute the change stats for the merge it is still necessary to iterate // over the tables present in the new schema. The new schema may have // introduced additional tables that won't be visited by the merge logic's logic. let change_stats = ChangeStats { - new_tables: new_ns.tables.len() - old_table_count, - new_columns: new_ns - .tables - .values() - .map(|v| v.column_count()) - .sum::() - - old_column_count, + new_tables, + new_columns_per_table: new_columns, did_update: true, }; (new_ns, change_stats) @@ -172,7 +197,7 @@ mod tests { }; assert_matches!(cache.put_schema(ns.clone(), schema1.clone()), (new, s) => { assert_eq!(*new, schema1); - assert_eq!(s.new_tables, 0); + assert!(s.new_tables.is_empty()); }); assert_eq!( *cache.get_schema(&ns).await.expect("lookup failure"), @@ -190,7 +215,7 @@ mod tests { assert_matches!(cache.put_schema(ns.clone(), schema2.clone()), (new, s) => { assert_eq!(*new, schema2); - assert_eq!(s.new_tables, 0); + assert!(s.new_tables.is_empty()); }); assert_eq!( *cache.get_schema(&ns).await.expect("lookup failure"), @@ -239,21 +264,21 @@ mod tests { let schema_update_1 = NamespaceSchema { id: NamespaceId::new(42), - tables: BTreeMap::from([(String::from(table_name), first_write_table_schema)]), + tables: BTreeMap::from([(String::from(table_name), first_write_table_schema.clone())]), max_columns_per_table: 50, max_tables: 24, retention_period_ns: None, partition_template: Default::default(), }; let schema_update_2 = NamespaceSchema { - tables: BTreeMap::from([(String::from(table_name), second_write_table_schema)]), + tables: BTreeMap::from([(String::from(table_name), second_write_table_schema.clone())]), ..schema_update_1.clone() }; let want_namespace_schema = { let mut want_table_schema = empty_table_schema(table_id); - want_table_schema.add_column(column_1); - want_table_schema.add_column(column_2); + want_table_schema.add_column(column_1.clone()); + want_table_schema.add_column(column_2.clone()); NamespaceSchema { tables: BTreeMap::from([(String::from(table_name), want_table_schema)]), ..schema_update_1.clone() @@ -275,13 +300,21 @@ mod tests { assert_eq!(*new_schema, schema_update_1); assert_eq!( new_stats, - ChangeStats { new_tables: 1, new_columns: 1, did_update: false } + ChangeStats { new_tables: schema_update_1.tables, new_columns_per_table: Default::default(), did_update: false } ); } ); assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => { assert_eq!(*new_schema, want_namespace_schema); - assert_eq!(new_stats, ChangeStats{ new_tables: 0, new_columns: 1, did_update: true}); + let want_new_columns = [( + String::from(table_name), + [( + column_2.name.clone(), + *second_write_table_schema.columns.get(column_2.name.as_str()).expect("should have column 2") + )].into_iter().collect::>().into(), + )].into_iter().collect(); + + assert_eq!(new_stats, ChangeStats{ new_tables: Default::default(), new_columns_per_table: want_new_columns, did_update: true}); }); let got_namespace_schema = cache @@ -304,26 +337,29 @@ mod tests { // Each table has been given a column to assert the table merge logic // produces the correct metrics. let mut table_1 = empty_table_schema(TableId::new(1)); - table_1.add_column(Column { + let column_1 = Column { id: ColumnId::new(1), table_id: TableId::new(1), name: "column_a".to_string(), column_type: ColumnType::String, - }); + }; + table_1.add_column(column_1); let mut table_2 = empty_table_schema(TableId::new(2)); - table_2.add_column(Column { + let column_2 = Column { id: ColumnId::new(2), table_id: TableId::new(2), name: "column_b".to_string(), column_type: ColumnType::String, - }); + }; + table_2.add_column(column_2); let mut table_3 = empty_table_schema(TableId::new(3)); - table_3.add_column(Column { + let column_3 = Column { id: ColumnId::new(3), table_id: TableId::new(3), name: "column_c".to_string(), column_type: ColumnType::String, - }); + }; + table_3.add_column(column_3); let schema_update_1 = NamespaceSchema { id: NamespaceId::new(42), @@ -346,9 +382,9 @@ mod tests { let want_namespace_schema = NamespaceSchema { tables: BTreeMap::from([ - (String::from("table_1"), table_1), - (String::from("table_2"), table_2), - (String::from("table_3"), table_3), + (String::from("table_1"), table_1.clone()), + (String::from("table_2"), table_2.clone()), + (String::from("table_3"), table_3.clone()), ]), ..schema_update_1.clone() }; @@ -368,13 +404,14 @@ mod tests { assert_eq!(*new_schema, schema_update_1); assert_eq!( new_stats, - ChangeStats { new_tables: 2, new_columns: 2, did_update: false } + ChangeStats { new_tables: schema_update_1.tables.clone(), new_columns_per_table: Default::default(), did_update: false } ); } ); assert_matches!(cache.put_schema(ns.clone(), schema_update_2), (new_schema, new_stats) => { assert_eq!(*new_schema, want_namespace_schema); - assert_eq!(new_stats, ChangeStats{ new_tables: 1, new_columns: 1, did_update: true}); + let want_new_tables = [(String::from("table_3"), table_3)].into_iter().collect(); + assert_eq!(new_stats, ChangeStats{ new_tables: want_new_tables, new_columns_per_table: Default::default(), did_update: true}); }); let got_namespace_schema = cache @@ -443,10 +480,10 @@ mod tests { } } - /// Reduce `ns` into a set of `(table_name, column_name)` for all tables & + /// Reduce `ns_tables` into a set of `(table_name, column_name)` for all tables & /// columns. - fn into_set(ns: &NamespaceSchema) -> HashSet<(String, String)> { - ns.tables + fn into_set(ns_tables: &BTreeMap) -> HashSet<(String, String)> { + ns_tables .iter() .flat_map(|(table_name, col_set)| { // Build a set of tuples in the form (table_name, column_name) @@ -459,6 +496,28 @@ mod tests { .collect() } + /// Construct a set of `(table_name, column_name)` from a set of table schema and + /// table-associated column schema. + fn into_set_with_columns( + new_tables: &BTreeMap, + new_columns: &BTreeMap, + ) -> HashSet<(String, String)> { + let new_table_set = into_set(new_tables); + let new_column_set = new_columns + .iter() + .flat_map(|(table_name, col_set)| { + col_set + .names() + .into_iter() + .map(|col_name| (table_name.to_string(), col_name.to_string())) + }) + .collect(); + new_table_set + .union(&new_column_set) + .map(|v| v.to_owned()) + .collect::>() + } + proptest! { #[test] fn prop_schema_merge( @@ -466,8 +525,8 @@ mod tests { b in arbitrary_namespace_schema() ) { // Convert inputs into sets - let known_a = into_set(&a); - let known_b = into_set(&b); + let known_a = into_set(&a.tables); + let known_b = into_set(&b.tables); // Compute the union set of the input schema sets. // @@ -479,12 +538,18 @@ mod tests { let cache = Arc::new(MemoryNamespaceCache::default()); let (got, stats_1) = cache.put_schema(name.clone(), a.clone()); assert_eq!(*got, a); // The new namespace should be unchanged + assert_eq!(stats_1.new_tables, a.tables); // Drive the merging logic let (got, stats_2) = cache.put_schema(name, b.clone()); + // Check the change stats return the difference + let want_change_stat_set = known_b.difference(&known_a).map(|v| v.to_owned()).collect::>(); + let got_change_stat_set = into_set_with_columns(&stats_2.new_tables, &stats_2.new_columns_per_table); + assert_eq!(got_change_stat_set, want_change_stat_set); + // Reduce the merged schema into a comparable set. - let got_set = into_set(&got); + let got_set = into_set(&got.tables); // Assert the table/column sets merged by the known good hashset // union implementation, and the cache merging logic are the same. @@ -495,13 +560,6 @@ mod tests { assert_eq!(got.max_columns_per_table, b.max_columns_per_table); assert_eq!(got.max_tables, b.max_tables); assert_eq!(got.retention_period_ns, b.retention_period_ns); - - // Finally, assert the reported "newly added" statistics sum to the - // total of the inputs. - assert_eq!(stats_1.new_columns + stats_2.new_columns, want.len()); - - let tables = a.tables.keys().chain(b.tables.keys()).collect::>(); - assert_eq!(stats_1.new_tables + stats_2.new_tables, tables.len()); } } } diff --git a/router/src/namespace_cache/metrics.rs b/router/src/namespace_cache/metrics.rs index 4769ec8a5d..0719fa7e46 100644 --- a/router/src/namespace_cache/metrics.rs +++ b/router/src/namespace_cache/metrics.rs @@ -116,8 +116,19 @@ where // Figure out the difference between the new namespace and the // evicted old namespace // Adjust the metrics to reflect the change - self.table_count.inc(change_stats.new_tables as u64); - self.column_count.inc(change_stats.new_columns as u64); + self.table_count.inc(change_stats.new_tables.len() as u64); + let columns_for_new_tables = change_stats + .new_tables + .iter() + .fold(0, |acc, (_, table_schema)| { + acc + table_schema.column_count() as u64 + }); + let columns_for_existing_tables = change_stats + .new_columns_per_table + .iter() + .fold(0, |acc, (_, columns)| acc + columns.column_count() as u64); + self.column_count + .inc(columns_for_new_tables + columns_for_existing_tables); (result, change_stats) }