diff --git a/router/src/gossip/anti_entropy/mst/actor.rs b/router/src/gossip/anti_entropy/mst/actor.rs index 8ce2c3cb43..89b1dab4bc 100644 --- a/router/src/gossip/anti_entropy/mst/actor.rs +++ b/router/src/gossip/anti_entropy/mst/actor.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use data_types::{NamespaceName, NamespaceSchema}; -use merkle_search_tree::{digest::RootHash, MerkleSearchTree}; +use merkle_search_tree::{diff::PageRangeSnapshot, digest::RootHash, MerkleSearchTree}; use observability_deps::tracing::{debug, info, trace}; use tokio::sync::{mpsc, oneshot}; @@ -10,11 +10,17 @@ use crate::namespace_cache::{CacheMissErr, NamespaceCache}; use super::handle::AntiEntropyHandle; +/// An alias for a [`PageRangeSnapshot`] for [`NamespaceName`] keys. +pub(crate) type MerkleSnapshot = PageRangeSnapshot>; + /// Requests sent from an [`AntiEntropyHandle`] to an [`AntiEntropyActor`]. #[derive(Debug)] pub(super) enum Op { /// Request the content / merkle tree root hash. ContentHash(oneshot::Sender), + + /// Request a [`MerkleSnapshot`] of the current MST state. + Snapshot(oneshot::Sender), } /// A [`NamespaceCache`] anti-entropy state tracking primitive. @@ -137,15 +143,31 @@ where fn handle_op(&mut self, op: Op) { match op { Op::ContentHash(tx) => { - let root_hash = self.mst.root_hash().clone(); - - debug!(%root_hash, "generated content hash"); - // The caller may have stopped listening, so ignore any errors. - let _ = tx.send(root_hash); + let _ = tx.send(self.generate_root().clone()); + } + Op::Snapshot(tx) => { + // The root hash must be generated before serialising the page + // ranges. + self.generate_root(); + + let snap = PageRangeSnapshot::from( + self.mst + .serialise_page_ranges() + .expect("root hash generated"), + ); + + let _ = tx.send(snap); } } } + + /// Generate the MST root hash and log it for debugging purposes. + fn generate_root(&mut self) -> &RootHash { + let root_hash = self.mst.root_hash(); + debug!(%root_hash, "generated content hash"); + root_hash + } } #[cfg(test)] diff --git a/router/src/gossip/anti_entropy/mst/handle.rs b/router/src/gossip/anti_entropy/mst/handle.rs index 50bca1ef8e..010afccf13 100644 --- a/router/src/gossip/anti_entropy/mst/handle.rs +++ b/router/src/gossip/anti_entropy/mst/handle.rs @@ -7,7 +7,7 @@ use merkle_search_tree::digest::RootHash; use observability_deps::tracing::error; use tokio::sync::{mpsc, oneshot}; -use super::actor::Op; +use super::actor::{MerkleSnapshot, Op}; /// A handle to an [`AntiEntropyActor`]. /// @@ -111,4 +111,20 @@ impl AntiEntropyHandle { rx.await.expect("anti-entropy actor has stopped") } + + /// Obtain a [`MerkleSnapshot`] for the current Merkle Search Tree state. + /// + /// A [`MerkleSnapshot`] is a compact serialised representation of the MST + /// state. + #[allow(dead_code)] + pub(crate) async fn snapshot(&self) -> MerkleSnapshot { + let (tx, rx) = oneshot::channel(); + + self.op_tx + .send(Op::Snapshot(tx)) + .await + .expect("anti-entropy actor has stopped"); + + rx.await.expect("anti-entropy actor has stopped") + } } diff --git a/router/src/gossip/anti_entropy/mst/mod.rs b/router/src/gossip/anti_entropy/mst/mod.rs index 5a89ddbc1b..53f513a8b5 100644 --- a/router/src/gossip/anti_entropy/mst/mod.rs +++ b/router/src/gossip/anti_entropy/mst/mod.rs @@ -105,7 +105,8 @@ mod tests { proptest! { /// Assert that two distinct namespace cache instances return identical - /// content hashes after applying a given set of cache updates. + /// content hashes and snapshots after applying a given set of cache + /// updates. #[test] fn prop_content_hash_diverge_converge( // A variable number of cache entry updates for 2 namespace IDs @@ -132,6 +133,8 @@ mod tests { // Invariant: two empty namespace caches have the same content hash. assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await); + // Invariant: and the same serialised snapshot content + assert_eq!(handle_a.snapshot().await, handle_b.snapshot().await); for update in updates { // Generate a unique, deterministic name for this namespace. @@ -144,6 +147,8 @@ mod tests { // Invariant: after applying the same update, the content hashes // MUST match (even if this update was a no-op / not an update) assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await); + // Invariant: and the same serialised snapshot content + assert_eq!(handle_a.snapshot().await, handle_b.snapshot().await); } // At this point all updates have been applied to both caches. @@ -156,11 +161,15 @@ mod tests { // Invariant: last_update definitely added new cache content, // therefore the cache content hashes MUST diverge. assert_ne!(handle_a.content_hash().await, handle_b.content_hash().await); + // Invariant: the serialised snapshot content must have diverged + assert_ne!(handle_a.snapshot().await, handle_b.snapshot().await); // Invariant: applying the update to the other cache converges their // content hashes. ns_b.put_schema(name, last_update); assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await); + // Invariant: and the serialised snapshot content converges + assert_eq!(handle_a.snapshot().await, handle_b.snapshot().await); }); } }