diff --git a/router/src/gossip/anti_entropy/merkle.rs b/router/src/gossip/anti_entropy/merkle.rs
index bfb240f6be..f2ec5147d2 100644
--- a/router/src/gossip/anti_entropy/merkle.rs
+++ b/router/src/gossip/anti_entropy/merkle.rs
@@ -111,3 +111,61 @@ impl<'a> std::hash::Hash for NamespaceContentHash<'a> {
         self.0.tables.hash(state);
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::hash_map::DefaultHasher, hash::Hasher};
+
+    use super::*;
+
+    use super::super::tests::arbitrary_namespace_schema;
+
+    use data_types::NamespaceId;
+    use proptest::prelude::*;
+
+    proptest! {
+        /// Assert the [`NamespaceContentHash`] decorator results in hashes that
+        /// are equal iff the tables and namespace ID match.
+        ///
+        /// All other fields may vary without affecting the hash.
+        #[test]
+        fn prop_content_hash_coverage(
+            mut a in arbitrary_namespace_schema(0..1),
+            b in arbitrary_namespace_schema(0..1)
+        ) {
+            assert_eq!(a.id, b.id); // Test invariant
+
+            let wrap_a = NamespaceContentHash(&a);
+            let wrap_b = NamespaceContentHash(&b);
+
+            // Invariant: if the schemas are equal, the content hashes match
+            if a == b {
+                assert_eq!(hash(&wrap_a), hash(&wrap_b));
+            }
+
+            // True if the content hashes of a and b are equal.
+            let is_hash_eq = hash(wrap_a) == hash(wrap_b);
+
+            // Invariant: if the tables and ID match, the content hashes match
+            assert_eq!(
+                ((a.tables == b.tables) && (a.id == b.id)),
+                is_hash_eq
+            );
+
+            // Invariant: the content hash changes if the ID is modified.
+            // Hash via the decorator (not the schema's own Hash impl) so the
+            // assertion exercises the implementation under test.
+            let new_id = NamespaceId::new(a.id.get().wrapping_add(1));
+            let hash_old_a = hash(NamespaceContentHash(&a));
+            a.id = new_id;
+            assert_ne!(hash_old_a, hash(NamespaceContentHash(&a)));
+        }
+    }
+
+    /// Return the 64-bit [`DefaultHasher`] digest of `v`.
+    fn hash(v: impl std::hash::Hash) -> u64 {
+        let mut hasher = DefaultHasher::default();
+        v.hash(&mut hasher);
+        hasher.finish()
+    }
+}
diff --git a/router/src/gossip/anti_entropy/mod.rs b/router/src/gossip/anti_entropy/mod.rs
index 8ec7e94688..be9e61d860 100644
--- a/router/src/gossip/anti_entropy/mod.rs
+++ b/router/src/gossip/anti_entropy/mod.rs
@@ -1,3 +1,150 @@
 //! Anti-entropy primitives providing eventual consistency over gossip.
 
 pub mod merkle;
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::BTreeMap, ops::Range, sync::Arc};
+
+    use crate::{
+        gossip::anti_entropy::merkle::MerkleTree,
+        namespace_cache::{MemoryNamespaceCache, NamespaceCache},
+    };
+
+    use data_types::{
+        ColumnId, ColumnSchema, ColumnType, ColumnsByName, NamespaceId, NamespaceName,
+        NamespaceSchema, TableId, TableSchema,
+    };
+    use proptest::prelude::*;
+
+    /// A set of table and column names from which arbitrary names are selected
+    /// in prop tests, instead of using random values that have a low
+    /// probability of overlap.
+    const TEST_TABLE_NAME_SET: &[&str] = &[
+        "bananas", "quiero", "un", "platano", "donkey", "goose", "egg", "mr_toro",
+    ];
+
+    prop_compose! {
+        /// Generate a series of ColumnSchema assigned randomised IDs with a
+        /// stable mapping of `id -> data type`.
+        ///
+        /// This generates at most 255 unique columns.
+        pub fn arbitrary_column_schema_stable()(id in 0_i16..255) -> ColumnSchema {
+            // Provide a stable mapping of ID to data type to avoid column type
+            // conflicts by reducing the ID to the data type discriminant range
+            // and using that to assign the data type.
+            let col_type = ColumnType::try_from((id % 7) + 1).expect("valid discriminator range");
+
+            ColumnSchema { id: ColumnId::new(id as _), column_type: col_type }
+        }
+    }
+
+    prop_compose! {
+        /// Generate an arbitrary TableSchema with up to 255 columns that
+        /// contain stable `column name -> data type` and `column name -> column
+        /// id` mappings.
+        pub fn arbitrary_table_schema()(
+            id in any::<i64>(),
+            columns in proptest::collection::hash_set(
+                arbitrary_column_schema_stable(),
+                (0, 255) // Set size range
+            ),
+        ) -> TableSchema {
+            // Map the column schemas into `name -> schema`, generating a
+            // column name derived from the column ID to ensure a consistent
+            // mapping of name -> id, and in turn, name -> data type.
+            let columns = columns.into_iter()
+                .map(|v| (format!("col-{}", v.id.get()), v))
+                .collect::<BTreeMap<_, _>>();
+
+            let columns = ColumnsByName::from(columns);
+            TableSchema {
+                id: TableId::new(id),
+                partition_template: Default::default(),
+                columns,
+            }
+        }
+    }
+
+    prop_compose! {
+        /// Generate an arbitrary NamespaceSchema that contains tables from
+        /// [`TEST_TABLE_NAME_SET`], containing up to 255 columns with stable
+        /// `name -> (id, data type)` mappings.
+        ///
+        /// Namespace IDs are allocated from the specified range.
+        pub fn arbitrary_namespace_schema(id_range: Range<i64>)(
+            namespace_id in id_range,
+            tables in proptest::collection::btree_map(
+                proptest::sample::select(TEST_TABLE_NAME_SET),
+                arbitrary_table_schema(),
+                (0, 10) // Set size range
+            ),
+            max_columns_per_table in any::<usize>(),
+            max_tables in any::<usize>(),
+            retention_period_ns in any::<Option<i64>>(),
+        ) -> NamespaceSchema {
+            let tables = tables.into_iter().map(|(k, v)| (k.to_string(), v)).collect();
+            NamespaceSchema {
+                id: NamespaceId::new(namespace_id),
+                tables,
+                max_columns_per_table,
+                max_tables,
+                retention_period_ns,
+                partition_template: Default::default(),
+            }
+        }
+    }
+
+    /// Derive a deterministic [`NamespaceName`] of the form `ns-<id>` from the
+    /// ID of `schema`.
+    fn name_for_schema(schema: &NamespaceSchema) -> NamespaceName<'static> {
+        NamespaceName::try_from(format!("ns-{}", schema.id)).unwrap()
+    }
+
+    proptest! {
+        /// Assert that two distinct namespace cache instances return identical
+        /// content hashes after applying a given set of cache updates.
+        #[test]
+        fn prop_content_hash_diverge_converge(
+            // A variable number of cache entry updates for 2 namespace IDs
+            updates in prop::collection::vec(arbitrary_namespace_schema(0..2), 0..10),
+            // An arbitrary namespace with an ID that lies outside of `updates`.
+            last_update in arbitrary_namespace_schema(42..100),
+        ) {
+            let ns_a = MerkleTree::new(Arc::new(MemoryNamespaceCache::default()));
+            let ns_b = MerkleTree::new(Arc::new(MemoryNamespaceCache::default()));
+
+            // Invariant: two empty namespace caches have the same content hash.
+            assert_eq!(ns_a.content_hash(), ns_b.content_hash());
+
+            for update in updates {
+                // Generate a unique, deterministic name for this namespace.
+                let name = name_for_schema(&update);
+
+                // Apply the update (which may be a no-op) to both.
+                ns_a.put_schema(name.clone(), update.clone());
+                ns_b.put_schema(name, update);
+
+                // Invariant: after applying the same update, the content hashes
+                // MUST match (even if this update was a no-op / not an update)
+                assert_eq!(ns_a.content_hash(), ns_b.content_hash());
+            }
+
+            // At this point all updates have been applied to both caches.
+            //
+            // Add a new cache entry that doesn't yet exist, and assert this
+            // causes the caches to diverge, and then once again reconverge.
+            let name = name_for_schema(&last_update);
+            ns_a.put_schema(name.clone(), last_update.clone());
+
+            // Invariant: last_update definitely added new cache content,
+            // therefore the cache content hashes MUST diverge.
+            assert_ne!(ns_a.content_hash(), ns_b.content_hash());
+
+            // Invariant: applying the update to the other cache converges their
+            // content hashes.
+            ns_b.put_schema(name, last_update);
+            assert_eq!(ns_a.content_hash(), ns_b.content_hash());
+        }
+    }
+}