feat(router): Merkle tree content hash for cache

Adds a (currently unused) NamespaceCache decorator that observes the
post-merge content of the cache to maintain a content hash.

This makes use of a Merkle Search Tree
(https://inria.hal.science/hal-02303490) to track the CRDT content of
the cache in a deterministic structure that allows for synchronisation
between peers (itself a CRDT).

The hash of two routers' NamespaceCache will be equal iff their cache
contents are equal - this can be used to (very cheaply) identify
out-of-sync routers, and trigger convergence. The MST structure used
here provides functionality to compare two compact MST representations,
and identify subsets of the cache that are out-of-sync, allowing for
cheap convergence.

Note this content hash only covers the tables, their set of columns, and
those column schemas - this reflects the fact that only these values may
currently be converged by gossip. Future work will enable full
convergence of all fields.
pull/24376/head
Dom Dwyer 2023-08-28 18:00:21 +02:00
parent bd4a3fbbb8
commit b694b9f494
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
5 changed files with 130 additions and 0 deletions

12
Cargo.lock generated
View File

@ -3460,6 +3460,17 @@ dependencies = [
"autocfg",
]
[[package]]
name = "merkle-search-tree"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6f5f8e25bd481489f8e0389fd4ced21ec6cbaf1a30e3204102df45dc6b04ce5"
dependencies = [
"base64 0.21.3",
"siphasher 0.3.11",
"tracing",
]
[[package]]
name = "metric"
version = "0.1.0"
@ -4787,6 +4798,7 @@ dependencies = [
"iox_catalog",
"iox_tests",
"iox_time",
"merkle-search-tree",
"metric",
"mutable_batch",
"mutable_batch_lp",

View File

@ -20,6 +20,7 @@ hashbrown = { workspace = true }
hyper = "0.14"
iox_catalog = { path = "../iox_catalog" }
iox_time = { path = "../iox_time" }
merkle-search-tree = { version = "0.6.0", features = ["tracing"] }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_lp = { path = "../mutable_batch_lp" }

View File

@ -0,0 +1,113 @@
//! Maintain a [Merkle Search Tree] covering the content of a
//! [`NamespaceCache`].
//!
//! [Merkle Search Tree]: https://inria.hal.science/hal-02303490
use std::sync::Arc;
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use merkle_search_tree::MerkleSearchTree;
use parking_lot::Mutex;
use crate::namespace_cache::{ChangeStats, NamespaceCache};
/// A [`NamespaceCache`] decorator that maintains a content hash / consistency
/// proof.
///
/// This [`MerkleTree`] tracks the content of the underlying [`NamespaceCache`]
/// delegate, maintaining a compact, serialisable representation in a
/// [MerkleSearchTree] that can be used to perform efficient differential
/// convergence (anti-entropy) of peers to provide eventual consistency.
///
/// # Merge Correctness
///
/// The inner [`NamespaceCache`] implementation MUST commutatively &
/// deterministically merge two [`NamespaceSchema`] to converge (monotonically)
/// towards the same result (gossip payloads are CmRDTs).
///
/// # Portability
///
/// This implementation relies on the rust [`Hash`] implementation, which is
/// specifically defined as being allowed to differ across platforms (for
/// example, with differing endianness) and across different Rust complier
/// versions.
///
/// If two nodes are producing differing hashes for the same underlying content,
/// they will appear to never converge.
#[derive(Debug)]
pub struct MerkleTree<T> {
inner: T,
mst: Mutex<MerkleSearchTree<NamespaceName<'static>, NamespaceSchema>>,
}
impl<T> MerkleTree<T> {
/// Initialise a new [`MerkleTree`] that generates a content hash covering
/// `inner`.
pub fn new(inner: T) -> Self {
Self {
inner,
mst: Mutex::new(MerkleSearchTree::default()),
}
}
/// Return a 128-bit hash describing the content of the inner `T`.
///
/// This hash only covers a subset of schema fields (see
/// [`NamespaceContentHash`]).
pub fn content_hash(&self) -> merkle_search_tree::digest::RootHash {
self.mst.lock().root_hash().clone()
}
}
#[async_trait]
impl<T> NamespaceCache for MerkleTree<T>
where
T: NamespaceCache,
{
type ReadError = T::ReadError;
async fn get_schema(
&self,
namespace: &NamespaceName<'static>,
) -> Result<Arc<NamespaceSchema>, Self::ReadError> {
self.inner.get_schema(namespace).await
}
fn put_schema(
&self,
namespace: NamespaceName<'static>,
schema: NamespaceSchema,
) -> (Arc<NamespaceSchema>, ChangeStats) {
// Pass the namespace into the inner storage, and evaluate the merged
// return value (the new content of the cache).
let (schema, diff) = self.inner.put_schema(namespace.clone(), schema);
// Intercept the the resulting cache entry state and merge it into the
// merkle tree.
self.mst.lock().upsert(namespace, &schema);
// And pass through the return value to the caller.
(schema, diff)
}
}
/// A [`NamespaceSchema`] decorator that produces a content hash covering fields
/// that SHOULD be converged across gossip peers.
#[derive(Debug)]
struct NamespaceContentHash<'a>(&'a NamespaceSchema);
impl<'a> std::hash::Hash for NamespaceContentHash<'a> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
// Technically the ID does not need to be covered by the content hash
// (the namespace name -> namespace ID is immutable and asserted
// elsewhere) but it's not harmful to include it, and would drive
// detection of a broken mapping invariant.
self.0.id.hash(state);
// The set of tables, and their schemas MUST form part of the content
// hash as they are part of the content that must be converged.
self.0.tables.hash(state);
}
}

View File

@ -0,0 +1,3 @@
//! Anti-entropy primitives providing eventual consistency over gossip.
pub mod merkle;

View File

@ -47,6 +47,7 @@
//! [`NamespaceCache`]: crate::namespace_cache::NamespaceCache
//! [`SchemaTx`]: gossip_schema::handle::SchemaTx
pub mod anti_entropy;
pub mod namespace_cache;
pub mod schema_change_observer;
pub mod traits;