From 36d50d083f3f5425acf79a19a6d2b42316be120f Mon Sep 17 00:00:00 2001 From: Dom Date: Tue, 18 Jan 2022 18:46:58 +0000 Subject: [PATCH] feat: sharder trait & impl This commit defines the Sharder trait that should allow us to implement multiple sharding strategies over a defined set of input types (such as a MutableBatch for writes, DeletePredicate for deletes, etc). This commit also includes a jump hash implementation that consistently shards (table name, namespace) tuples to a given shard for all input types. --- Cargo.lock | 1 + router2/Cargo.toml | 1 + router2/src/lib.rs | 1 + router2/src/sharder/mod.rs | 7 + .../src/sharder/table_namespace_sharder.rs | 208 ++++++++++++++++++ router2/src/sharder/trait.rs | 24 ++ 6 files changed, 242 insertions(+) create mode 100644 router2/src/sharder/mod.rs create mode 100644 router2/src/sharder/table_namespace_sharder.rs create mode 100644 router2/src/sharder/trait.rs diff --git a/Cargo.lock b/Cargo.lock index 04a96ff666..eeaa54af3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3744,6 +3744,7 @@ dependencies = [ "predicate", "serde", "serde_urlencoded", + "siphasher", "thiserror", "time 0.1.0", "tokio", diff --git a/router2/Cargo.toml b/router2/Cargo.toml index d40ab9a51c..b3021bebe6 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -23,6 +23,7 @@ parking_lot = "0.11" predicate = { path = "../predicate" } serde = "1.0" serde_urlencoded = "0.7" +siphasher = "0.3" thiserror = "1.0" time = { path = "../time" } tokio = { version = "1", features = ["rt-multi-thread", "macros"] } diff --git a/router2/src/lib.rs b/router2/src/lib.rs index 52c529394c..054869cbe0 100644 --- a/router2/src/lib.rs +++ b/router2/src/lib.rs @@ -27,3 +27,4 @@ pub mod dml_handler; pub mod server; +pub mod sharder; diff --git a/router2/src/sharder/mod.rs b/router2/src/sharder/mod.rs new file mode 100644 index 0000000000..74b4ee156f --- /dev/null +++ b/router2/src/sharder/mod.rs @@ -0,0 +1,7 @@ +//! 
Sharder logic to consistently map operations to a specific sequencer. + +mod r#trait; +pub use r#trait::*; + +mod table_namespace_sharder; +pub use table_namespace_sharder::*; diff --git a/router2/src/sharder/table_namespace_sharder.rs b/router2/src/sharder/table_namespace_sharder.rs new file mode 100644 index 0000000000..11f8eaaa21 --- /dev/null +++ b/router2/src/sharder/table_namespace_sharder.rs @@ -0,0 +1,208 @@ +use std::{ + fmt::Debug, + hash::{Hash, Hasher}, +}; + +use data_types::DatabaseName; +use siphasher::sip::SipHasher13; + +use super::Sharder; + +/// A [`TableNamespaceSharder`] maps operations for a given table in a given +/// namespace consistently to the same shard, irrespective of the operation +/// itself with near perfect distribution. +/// +/// Different instances of a [`TableNamespaceSharder`] using the same seed key, +/// and the same set of shards (in the same order) will always map the same +/// input table & namespace to the same shard `T`. +/// +/// For `N` shards, this type uses `O(N)` memory and `O(ln N)` lookup, utilising +/// Google's [jump hash] internally. Adding 1 additional shard causes +/// approximately `1/N` keys to be remapped. +/// +/// [jump hash]: https://arxiv.org/ftp/arxiv/papers/1406/1406.2294.pdf +#[derive(Debug)] +pub struct TableNamespaceSharder { + hasher: SipHasher13, + shards: Vec, +} + +impl TableNamespaceSharder +where + T: Ord, +{ + /// Initialise a [`TableNamespaceSharder`] that consistently maps keys to + /// one of `shards`. + /// + /// # Correctness + /// + /// Changing the number of, or order of, the elements in `shards` when + /// constructing two instances changes the mapping produced. + /// + /// # Panics + /// + /// This constructor panics if the number of elements in `shards` is 0. + pub fn new(shards: impl IntoIterator) -> Self { + // A randomly generated static siphash key to ensure all router + // instances hash the same input to the same u64 sharding key. 
+ // + // Generated with: xxd -i -l 16 /dev/urandom + let key = [ + 0x6d, 0x83, 0x93, 0x52, 0xa3, 0x7c, 0xe6, 0x02, 0xac, 0x01, 0x11, 0x94, 0x79, 0x0c, + 0x64, 0x42, + ]; + + let shards = shards.into_iter().collect::>(); + assert!( + !shards.is_empty(), + "cannot initialise sharder with no shards" + ); + + Self { + hasher: SipHasher13::new_with_key(&key), + shards, + } + } +} + +impl TableNamespaceSharder { + /// Reinitialise [`Self`] with a new key. + /// + /// Re-keying [`Self`] will change the mapping of inputs to output instances + /// of `T`. + pub fn with_seed_key(self, key: &[u8; 16]) -> Self { + let hasher = SipHasher13::new_with_key(key); + Self { hasher, ..self } + } + + /// Consistently hash `key` to a `T`. + fn hash(&self, key: H) -> &T + where + H: Hash, + { + let mut state = self.hasher; + key.hash(&mut state); + let mut key = state.finish(); + + let mut b = -1; + let mut j = 0; + while j < self.shards.len() as i64 { + b = j; + key = key.wrapping_mul(2862933555777941757).wrapping_add(1); + j = ((b.wrapping_add(1) as f64) * (((1u64 << 31) as f64) / (((key >> 33) + 1) as f64))) + as i64 + } + + assert!(b >= 0); + self.shards + .get(b as usize) + .expect("sharder mapped input to non-existant bucket") + } +} + +impl FromIterator for TableNamespaceSharder +where + T: Ord, +{ + fn from_iter>(iter: U) -> Self { + Self::new(iter) + } +} + +#[derive(Hash)] +struct HashKey<'a> { + table: &'a str, + namespace: &'a str, +} + +/// A [`TableNamespaceSharder`] is generic over `P`, the payload type, enabling +/// it to map any type of payload to a shard as it only considers the table name +/// and namespace when making a sharding decision. +impl Sharder

for TableNamespaceSharder +where + T: Debug + Send + Sync, +{ + type Item = T; + + fn shard(&self, table: &str, namespace: &DatabaseName<'_>, _payload: &P) -> &Self::Item { + // The derived hash impl for HashKey is hardened against prefix + // collisions when combining the two fields. + self.hash(&HashKey { + table, + namespace: namespace.as_ref(), + }) + } +} + +#[cfg(test)] +mod tests { + use hashbrown::HashMap; + + use super::*; + + #[test] + fn test_consistent_hashing() { + const NUM_TESTS: usize = 10_000; + const NUM_SHARDS: usize = 10; + + let hasher = TableNamespaceSharder::new(0..NUM_SHARDS); + + // Create a HashMap to verify against. + let mappings = (0..NUM_TESTS) + .map(|v| { + let shard = hasher.hash(v); + (v, shard) + }) + .collect::>(); + + // Rehash all the same keys and validate they map to the same shard. + // + // The random iteration order of the hashmap asserts the shard output is + // not a function of the order of the keys hashed. + assert!(mappings + .iter() + .all(|(&key, &value)| hasher.hash(key) == value)); + + // Reinitialise the hasher with the same (default) key + let hasher = TableNamespaceSharder::new(0..NUM_SHARDS); + + // And assert the mappings are the same + assert!(mappings + .iter() + .all(|(&key, &value)| hasher.hash(key) == value)); + + // Reinitialise the hasher with a different key + let hasher = TableNamespaceSharder::new(0..NUM_SHARDS).with_seed_key(&[42; 16]); + + // And assert the mappings are NOT all the same (some may be the same) + assert!(!mappings + .iter() + .all(|(&key, &value)| hasher.hash(key) == value)); + } + + #[test] + fn test_sharder_impl() { + let hasher = TableNamespaceSharder::new(0..10_000); + + let a = hasher.shard("table", &DatabaseName::try_from("namespace").unwrap(), &0); + let b = hasher.shard("table", &DatabaseName::try_from("namespace2").unwrap(), &0); + assert_ne!(a, b); + + let a = hasher.shard("table", &DatabaseName::try_from("namespace").unwrap(), &0); + let b = hasher.shard("table2", 
&DatabaseName::try_from("namespace").unwrap(), &0); + assert_ne!(a, b); + + // Assert payloads are ignored for this sharder + let a = hasher.shard("table", &DatabaseName::try_from("namespace").unwrap(), &0); + let b = hasher.shard("table", &DatabaseName::try_from("namespace").unwrap(), &42); + assert_eq!(a, b); + } + + #[test] + fn test_sharder_prefix_collision() { + let hasher = TableNamespaceSharder::new(0..10_000); + let a = hasher.shard("a", &DatabaseName::try_from("bc").unwrap(), &0); + let b = hasher.shard("ab", &DatabaseName::try_from("c").unwrap(), &0); + assert_ne!(a, b); + } +} diff --git a/router2/src/sharder/trait.rs b/router2/src/sharder/trait.rs new file mode 100644 index 0000000000..29d8bd08f4 --- /dev/null +++ b/router2/src/sharder/trait.rs @@ -0,0 +1,24 @@ +use std::fmt::Debug; + +use data_types::DatabaseName; + +/// A [`Sharder`] implementation is responsible for mapping an opaque payload +/// for a given table name & namespace to an output type. +/// +/// [`Sharder`] instances can be generic over any payload type (in which case, +/// the implementation operates exclusively on the table name and/or namespace) +/// or they can be implemented for, and inspect, a specific payload type while +/// sharding. +/// +/// NOTE: It is a system invariant that deletes are routed to (all of) the same +/// sequencers as a write for the same table. +pub trait Sharder

: Debug + Send + Sync { + /// The type returned by a sharder. + /// + /// This could be a shard ID, a sequencer, an array of multiple sequencers, + /// etc. + type Item: Debug + Send + Sync; + + /// Map the specified `payload` to a shard. + fn shard(&self, table: &str, namespace: &DatabaseName<'_>, payload: &P) -> &Self::Item; +}