From b694b9f494a4c04aad7011bbc8bd8e333c105e0a Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 28 Aug 2023 18:00:21 +0200 Subject: [PATCH] feat(router): Merkle tree content hash for cache Adds a (currently unused) NamespaceCache decorator that observes the post-merge content of the cache to maintain a content hash. This makes use of a Merkle Search Tree (https://inria.hal.science/hal-02303490) to track the CRDT content of the cache in a deterministic structure that allows for synchronisation between peers (itself a CRDT). The hash of two routers' NamespaceCache will be equal iff their cache contents are equal - this can be used to (very cheaply) identify out-of-sync routers, and trigger convergence. The MST structure used here provides functionality to compare two compact MST representations, and identify subsets of the cache that are out-of-sync, allowing for cheap convergence. Note this content hash only covers the tables, their set of columns, and those column schemas - this reflects the fact that only these values may currently be converged by gossip. Future work will enable full convergence of all fields. --- Cargo.lock | 12 +++ router/Cargo.toml | 1 + router/src/gossip/anti_entropy/merkle.rs | 113 +++++++++++++++++++++++ router/src/gossip/anti_entropy/mod.rs | 3 + router/src/gossip/mod.rs | 1 + 5 files changed, 130 insertions(+) create mode 100644 router/src/gossip/anti_entropy/merkle.rs create mode 100644 router/src/gossip/anti_entropy/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c3fcbe4fe2..73d30440bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3460,6 +3460,17 @@ dependencies = [ "autocfg", ] +[[package]] +name = "merkle-search-tree" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6f5f8e25bd481489f8e0389fd4ced21ec6cbaf1a30e3204102df45dc6b04ce5" +dependencies = [ + "base64 0.21.3", + "siphasher 0.3.11", + "tracing", +] + [[package]] name = "metric" version = "0.1.0" @@ -4787,6 +4798,7 @@ dependencies = [ "iox_catalog", "iox_tests", "iox_time", + "merkle-search-tree", "metric", "mutable_batch", "mutable_batch_lp", diff --git a/router/Cargo.toml b/router/Cargo.toml index 59dd66b668..e45ad05c8a 100644 --- a/router/Cargo.toml +++ b/router/Cargo.toml @@ -20,6 +20,7 @@ hashbrown = { workspace = true } hyper = "0.14" iox_catalog = { path = "../iox_catalog" } iox_time = { path = "../iox_time" } +merkle-search-tree = { version = "0.6.0", features = ["tracing"] } metric = { path = "../metric" } mutable_batch = { path = "../mutable_batch" } mutable_batch_lp = { path = "../mutable_batch_lp" } diff --git a/router/src/gossip/anti_entropy/merkle.rs b/router/src/gossip/anti_entropy/merkle.rs new file mode 100644 index 0000000000..bfb240f6be --- /dev/null +++ b/router/src/gossip/anti_entropy/merkle.rs @@ -0,0 +1,113 @@ +//! Maintain a [Merkle Search Tree] covering the content of a +//! [`NamespaceCache`]. +//! +//! [Merkle Search Tree]: https://inria.hal.science/hal-02303490 + +use std::sync::Arc; + +use async_trait::async_trait; +use data_types::{NamespaceName, NamespaceSchema}; +use merkle_search_tree::MerkleSearchTree; +use parking_lot::Mutex; + +use crate::namespace_cache::{ChangeStats, NamespaceCache}; + +/// A [`NamespaceCache`] decorator that maintains a content hash / consistency +/// proof. +/// +/// This [`MerkleTree`] tracks the content of the underlying [`NamespaceCache`] +/// delegate, maintaining a compact, serialisable representation in a +/// [MerkleSearchTree] that can be used to perform efficient differential +/// convergence (anti-entropy) of peers to provide eventual consistency. +/// +/// # Merge Correctness +/// +/// The inner [`NamespaceCache`] implementation MUST commutatively & +/// deterministically merge two [`NamespaceSchema`] to converge (monotonically) +/// towards the same result (gossip payloads are CmRDTs). +/// +/// # Portability +/// +/// This implementation relies on the rust [`Hash`] implementation, which is +/// specifically defined as being allowed to differ across platforms (for +/// example, with differing endianness) and across different Rust complier +/// versions. +/// +/// If two nodes are producing differing hashes for the same underlying content, +/// they will appear to never converge. +#[derive(Debug)] +pub struct MerkleTree { + inner: T, + + mst: Mutex, NamespaceSchema>>, +} + +impl MerkleTree { + /// Initialise a new [`MerkleTree`] that generates a content hash covering + /// `inner`. + pub fn new(inner: T) -> Self { + Self { + inner, + mst: Mutex::new(MerkleSearchTree::default()), + } + } + + /// Return a 128-bit hash describing the content of the inner `T`. + /// + /// This hash only covers a subset of schema fields (see + /// [`NamespaceContentHash`]). + pub fn content_hash(&self) -> merkle_search_tree::digest::RootHash { + self.mst.lock().root_hash().clone() + } +} + +#[async_trait] +impl NamespaceCache for MerkleTree +where + T: NamespaceCache, +{ + type ReadError = T::ReadError; + + async fn get_schema( + &self, + namespace: &NamespaceName<'static>, + ) -> Result, Self::ReadError> { + self.inner.get_schema(namespace).await + } + + fn put_schema( + &self, + namespace: NamespaceName<'static>, + schema: NamespaceSchema, + ) -> (Arc, ChangeStats) { + // Pass the namespace into the inner storage, and evaluate the merged + // return value (the new content of the cache). + let (schema, diff) = self.inner.put_schema(namespace.clone(), schema); + + // Intercept the the resulting cache entry state and merge it into the + // merkle tree. + self.mst.lock().upsert(namespace, &schema); + + // And pass through the return value to the caller. + (schema, diff) + } +} + +/// A [`NamespaceSchema`] decorator that produces a content hash covering fields +/// that SHOULD be converged across gossip peers. +#[derive(Debug)] +struct NamespaceContentHash<'a>(&'a NamespaceSchema); + +impl<'a> std::hash::Hash for NamespaceContentHash<'a> { + fn hash(&self, state: &mut H) { + // Technically the ID does not need to be covered by the content hash + // (the namespace name -> namespace ID is immutable and asserted + // elsewhere) but it's not harmful to include it, and would drive + // detection of a broken mapping invariant. + self.0.id.hash(state); + + // The set of tables, and their schemas MUST form part of the content + // hash as they are part of the content that must be converged. + self.0.tables.hash(state); + } +} diff --git a/router/src/gossip/anti_entropy/mod.rs b/router/src/gossip/anti_entropy/mod.rs new file mode 100644 index 0000000000..8ec7e94688 --- /dev/null +++ b/router/src/gossip/anti_entropy/mod.rs @@ -0,0 +1,3 @@ +//! Anti-entropy primitives providing eventual consistency over gossip. + +pub mod merkle; diff --git a/router/src/gossip/mod.rs b/router/src/gossip/mod.rs index 1a988f083c..96dd72602f 100644 --- a/router/src/gossip/mod.rs +++ b/router/src/gossip/mod.rs @@ -47,6 +47,7 @@ //! [`NamespaceCache`]: crate::namespace_cache::NamespaceCache //! [`SchemaTx`]: gossip_schema::handle::SchemaTx +pub mod anti_entropy; pub mod namespace_cache; pub mod schema_change_observer; pub mod traits;