Merge pull request #8776 from influxdata/dom/merkle-snap
feat: merkle search tree snapshot generation
pull/24376/head
commit
485dbe9a82
|
@ -2,7 +2,7 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use data_types::{NamespaceName, NamespaceSchema};
|
use data_types::{NamespaceName, NamespaceSchema};
|
||||||
use merkle_search_tree::{digest::RootHash, MerkleSearchTree};
|
use merkle_search_tree::{diff::PageRangeSnapshot, digest::RootHash, MerkleSearchTree};
|
||||||
use observability_deps::tracing::{debug, info, trace};
|
use observability_deps::tracing::{debug, info, trace};
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
|
@ -10,11 +10,17 @@ use crate::namespace_cache::{CacheMissErr, NamespaceCache};
|
||||||
|
|
||||||
use super::handle::AntiEntropyHandle;
|
use super::handle::AntiEntropyHandle;
|
||||||
|
|
||||||
|
/// An alias for a [`PageRangeSnapshot`] for [`NamespaceName`] keys.
///
/// This is the payload the actor sends back in reply to an `Op::Snapshot`
/// request, built from the serialised page ranges of the MST.
|
||||||
|
pub(crate) type MerkleSnapshot = PageRangeSnapshot<NamespaceName<'static>>;
|
||||||
|
|
||||||
/// Requests sent from an [`AntiEntropyHandle`] to an [`AntiEntropyActor`].
|
/// Requests sent from an [`AntiEntropyHandle`] to an [`AntiEntropyActor`].
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(super) enum Op {
|
pub(super) enum Op {
|
||||||
/// Request the content / merkle tree root hash.
|
/// Request the content / merkle tree root hash.
|
||||||
ContentHash(oneshot::Sender<RootHash>),
|
ContentHash(oneshot::Sender<RootHash>),
|
||||||
|
|
||||||
|
/// Request a [`MerkleSnapshot`] of the current MST state.
|
||||||
|
Snapshot(oneshot::Sender<MerkleSnapshot>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A [`NamespaceCache`] anti-entropy state tracking primitive.
|
/// A [`NamespaceCache`] anti-entropy state tracking primitive.
|
||||||
|
@ -137,15 +143,31 @@ where
|
||||||
fn handle_op(&mut self, op: Op) {
|
fn handle_op(&mut self, op: Op) {
|
||||||
match op {
|
match op {
|
||||||
Op::ContentHash(tx) => {
|
Op::ContentHash(tx) => {
|
||||||
let root_hash = self.mst.root_hash().clone();
|
|
||||||
|
|
||||||
debug!(%root_hash, "generated content hash");
|
|
||||||
|
|
||||||
// The caller may have stopped listening, so ignore any errors.
|
// The caller may have stopped listening, so ignore any errors.
|
||||||
let _ = tx.send(root_hash);
|
let _ = tx.send(self.generate_root().clone());
|
||||||
|
}
|
||||||
|
Op::Snapshot(tx) => {
|
||||||
|
// The root hash must be generated before serialising the page
|
||||||
|
// ranges.
|
||||||
|
self.generate_root();
|
||||||
|
|
||||||
|
let snap = PageRangeSnapshot::from(
|
||||||
|
self.mst
|
||||||
|
.serialise_page_ranges()
|
||||||
|
.expect("root hash generated"),
|
||||||
|
);
|
||||||
|
|
||||||
|
let _ = tx.send(snap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generate the MST root hash and log it for debugging purposes.
|
||||||
|
fn generate_root(&mut self) -> &RootHash {
|
||||||
|
let root_hash = self.mst.root_hash();
|
||||||
|
debug!(%root_hash, "generated content hash");
|
||||||
|
root_hash
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|
|
@ -7,7 +7,7 @@ use merkle_search_tree::digest::RootHash;
|
||||||
use observability_deps::tracing::error;
|
use observability_deps::tracing::error;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use super::actor::Op;
|
use super::actor::{MerkleSnapshot, Op};
|
||||||
|
|
||||||
/// A handle to an [`AntiEntropyActor`].
|
/// A handle to an [`AntiEntropyActor`].
|
||||||
///
|
///
|
||||||
|
@ -111,4 +111,20 @@ impl AntiEntropyHandle {
|
||||||
|
|
||||||
rx.await.expect("anti-entropy actor has stopped")
|
rx.await.expect("anti-entropy actor has stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Obtain a [`MerkleSnapshot`] for the current Merkle Search Tree state.
|
||||||
|
///
|
||||||
|
/// A [`MerkleSnapshot`] is a compact serialised representation of the MST
|
||||||
|
/// state.
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub(crate) async fn snapshot(&self) -> MerkleSnapshot {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
|
||||||
|
self.op_tx
|
||||||
|
.send(Op::Snapshot(tx))
|
||||||
|
.await
|
||||||
|
.expect("anti-entropy actor has stopped");
|
||||||
|
|
||||||
|
rx.await.expect("anti-entropy actor has stopped")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,7 +105,8 @@ mod tests {
|
||||||
|
|
||||||
proptest! {
|
proptest! {
|
||||||
/// Assert that two distinct namespace cache instances return identical
|
/// Assert that two distinct namespace cache instances return identical
|
||||||
/// content hashes after applying a given set of cache updates.
|
/// content hashes and snapshots after applying a given set of cache
|
||||||
|
/// updates.
|
||||||
#[test]
|
#[test]
|
||||||
fn prop_content_hash_diverge_converge(
|
fn prop_content_hash_diverge_converge(
|
||||||
// A variable number of cache entry updates for 2 namespace IDs
|
// A variable number of cache entry updates for 2 namespace IDs
|
||||||
|
@ -132,6 +133,8 @@ mod tests {
|
||||||
|
|
||||||
// Invariant: two empty namespace caches have the same content hash.
|
// Invariant: two empty namespace caches have the same content hash.
|
||||||
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
|
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
|
||||||
|
// Invariant: and the same serialised snapshot content
|
||||||
|
assert_eq!(handle_a.snapshot().await, handle_b.snapshot().await);
|
||||||
|
|
||||||
for update in updates {
|
for update in updates {
|
||||||
// Generate a unique, deterministic name for this namespace.
|
// Generate a unique, deterministic name for this namespace.
|
||||||
|
@ -144,6 +147,8 @@ mod tests {
|
||||||
// Invariant: after applying the same update, the content hashes
|
// Invariant: after applying the same update, the content hashes
|
||||||
// MUST match (even if this update was a no-op / not an update)
|
// MUST match (even if this update was a no-op / not an update)
|
||||||
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
|
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
|
||||||
|
// Invariant: and the same serialised snapshot content
|
||||||
|
assert_eq!(handle_a.snapshot().await, handle_b.snapshot().await);
|
||||||
}
|
}
|
||||||
|
|
||||||
// At this point all updates have been applied to both caches.
|
// At this point all updates have been applied to both caches.
|
||||||
|
@ -156,11 +161,15 @@ mod tests {
|
||||||
// Invariant: last_update definitely added new cache content,
|
// Invariant: last_update definitely added new cache content,
|
||||||
// therefore the cache content hashes MUST diverge.
|
// therefore the cache content hashes MUST diverge.
|
||||||
assert_ne!(handle_a.content_hash().await, handle_b.content_hash().await);
|
assert_ne!(handle_a.content_hash().await, handle_b.content_hash().await);
|
||||||
|
// Invariant: the serialised snapshot content must have diverged
|
||||||
|
assert_ne!(handle_a.snapshot().await, handle_b.snapshot().await);
|
||||||
|
|
||||||
// Invariant: applying the update to the other cache converges their
|
// Invariant: applying the update to the other cache converges their
|
||||||
// content hashes.
|
// content hashes.
|
||||||
ns_b.put_schema(name, last_update);
|
ns_b.put_schema(name, last_update);
|
||||||
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
|
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
|
||||||
|
// Invariant: and the serialised snapshot content converges
|
||||||
|
assert_eq!(handle_a.snapshot().await, handle_b.snapshot().await);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue