Merge pull request #8600 from influxdata/dom/merkle-actor

perf: asynchronously maintain merkle search tree
pull/24376/head
Dom 2023-08-30 11:40:13 +01:00 committed by GitHub
commit 1ef6e79aed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 358 additions and 52 deletions

View File

@ -10,26 +10,31 @@ use data_types::{
};
use iox_catalog::{interface::Catalog, mem::MemCatalog};
use once_cell::sync::Lazy;
use router::namespace_cache::{
MemoryNamespaceCache, NamespaceCache, ReadThroughCache, ShardedCache,
use router::{
gossip::anti_entropy::{actor::AntiEntropyActor, merkle::MerkleTree},
namespace_cache::{MemoryNamespaceCache, NamespaceCache, ReadThroughCache, ShardedCache},
};
static ARBITRARY_NAMESPACE: Lazy<NamespaceName<'static>> =
Lazy::new(|| "bananas".try_into().unwrap());
fn init_ns_cache(
rt: &tokio::runtime::Runtime,
initial_schema: impl IntoIterator<Item = (NamespaceName<'static>, NamespaceSchema)>,
) -> impl NamespaceCache {
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let cache = Arc::new(ReadThroughCache::new(
Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
)),
Arc::clone(&catalog),
let cache = Arc::new(ShardedCache::new(
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
));
let (actor, handle) = AntiEntropyActor::new(Arc::clone(&cache));
rt.spawn(actor.run());
let cache = MerkleTree::new(cache, handle);
let cache = Arc::new(ReadThroughCache::new(cache, Arc::clone(&catalog)));
for (name, schema) in initial_schema {
cache.put_schema(name, schema);
}
@ -66,12 +71,17 @@ fn bench_add_new_tables_with_columns(
let initial_schema = generate_namespace_schema(INITIAL_TABLE_COUNT, columns_per_table);
let schema_update = generate_namespace_schema(INITIAL_TABLE_COUNT + tables, columns_per_table);
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.build()
.unwrap();
group.throughput(Throughput::Elements((tables * columns_per_table) as _));
group.bench_function(format!("{tables}x{columns_per_table}"), |b| {
b.iter_batched(
|| {
(
init_ns_cache([(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]),
init_ns_cache(&rt, [(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]),
ARBITRARY_NAMESPACE.clone(),
schema_update.clone(),
)
@ -92,12 +102,14 @@ fn bench_add_columns_to_existing_table(
let initial_schema = generate_namespace_schema(1, initial_column_count);
let schema_update = generate_namespace_schema(1, initial_column_count + add_new_columns);
let rt = tokio::runtime::Runtime::new().unwrap();
group.throughput(Throughput::Elements(add_new_columns as _));
group.bench_function(format!("{initial_column_count}+{add_new_columns}"), |b| {
b.iter_batched(
|| {
(
init_ns_cache([(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]),
init_ns_cache(&rt, [(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]),
ARBITRARY_NAMESPACE.clone(),
schema_update.clone(),
)

View File

@ -0,0 +1,165 @@
//! An actor task maintaining [`MerkleSearchTree`] state.
use std::sync::Arc;
use data_types::{NamespaceName, NamespaceSchema};
use merkle_search_tree::{digest::RootHash, MerkleSearchTree};
use observability_deps::tracing::{debug, info, trace};
use tokio::sync::{mpsc, oneshot};
use crate::namespace_cache::{CacheMissErr, NamespaceCache};
use super::handle::AntiEntropyHandle;
/// Requests sent from an [`AntiEntropyHandle`] to an [`AntiEntropyActor`].
#[derive(Debug)]
pub(super) enum Op {
/// Request the content / merkle tree root hash.
ContentHash(oneshot::Sender<RootHash>),
}
/// A [`NamespaceCache`] anti-entropy state tracking primitive.
///
/// This actor maintains a [`MerkleSearchTree`] covering the content of
/// [`NamespaceSchema`] provided to it.
///
/// [`NamespaceCache`]: crate::namespace_cache::NamespaceCache
#[derive(Debug)]
pub struct AntiEntropyActor<T> {
cache: T,
schema_rx: mpsc::Receiver<NamespaceName<'static>>,
op_rx: mpsc::Receiver<Op>,
mst: MerkleSearchTree<NamespaceName<'static>, Arc<NamespaceSchema>>,
}
impl<T> AntiEntropyActor<T>
where
// A cache lookup in the underlying impl MUST be infallible - it's asking
// for existing records, and the cache MUST always return them.
T: NamespaceCache<ReadError = CacheMissErr>,
{
/// Initialise a new [`AntiEntropyActor`], and return the
/// [`AntiEntropyHandle`] used to interact with it.
///
/// The provided cache is used to resolve the most up-to-date
/// [`NamespaceSchema`] for given a [`NamespaceName`], which SHOULD be as
/// fast as possible and MUST be infallible.
pub fn new(cache: T) -> (Self, AntiEntropyHandle) {
// Initialise the queue used to push schema updates.
//
// Filling this queue causes dropped updates to the MST, which in turn
// cause spurious convergence rounds which are expensive relative to a
// bit of worst-case RAM (network trips, CPU for hashing, gossip
// traffic).
//
// Each queue entry has a 64 byte upper limit (excluding queue & pointer
// overhead) because the namespace name has a 64 byte upper bound. This
// means there's an upper (worst case) RAM utilisation bound of 1 MiB
// for these queue values.
let (schema_tx, schema_rx) = mpsc::channel(16_000);
// A "command" channel for non-schema-update messages.
let (op_tx, op_rx) = mpsc::channel(50);
(
Self {
cache,
schema_rx,
op_rx,
mst: MerkleSearchTree::default(),
},
AntiEntropyHandle::new(op_tx, schema_tx),
)
}
/// Block and run the anti-entropy actor until all [`AntiEntropyHandle`]
/// instances for it have been dropped.
pub async fn run(mut self) {
info!("starting anti-entropy state actor");
loop {
tokio::select! {
// Bias towards processing schema updates.
//
// This presents the risk of starvation / high latency for
// "operation" commands (sent over op_rx) under heavy update
// load (which is unlikely) at the benefit of ensuring all
// operations occur against an up-to-date MST.
//
// This helps avoid spurious convergence rounds by ensuring all
// updates are always applied as soon as possible.
biased;
// Immediately apply the available MST update.
Some(name) = self.schema_rx.recv() => self.handle_upsert(name).await,
// Else process an "operation" command from the actor handle.
Some(op) = self.op_rx.recv() => self.handle_op(op),
// And stop if both channels have closed.
else => {
info!("stopping anti-entropy state actor");
return
},
}
}
}
async fn handle_upsert(&mut self, name: NamespaceName<'static>) {
let schema = match self.cache.get_schema(&name).await {
Ok(v) => v,
Err(CacheMissErr { .. }) => {
// This is far from ideal as it causes the MST to skip applying
// an update, effectively causing it to diverge from the actual
// cache state.
//
// This will cause spurious convergence runs between peers that
// have applied this update to their MST, in turn causing it to
// converge locally.
//
// If no node applied this update, then this value will not be
// converged between any peers until a subsequent update causes
// a successful MST update on a peer.
//
// Instead the bounds require the only allowable error to be a
// cache miss error (no I/O error or other problem) - this can't
// ever happen, because state updates are enqueued for existing
// schemas, so there MUST always be an entry returned.
panic!("cache miss for namespace schema {}", name);
}
};
trace!(%name, ?schema, "applying schema");
self.mst.upsert(name, &schema);
}
fn handle_op(&mut self, op: Op) {
match op {
Op::ContentHash(tx) => {
let root_hash = self.mst.root_hash().clone();
debug!(%root_hash, "generated content hash");
// The caller may have stopped listening, so ignore any errors.
let _ = tx.send(root_hash);
}
}
}
}
#[cfg(test)]
mod tests {
use crate::namespace_cache::MemoryNamespaceCache;
use super::*;
#[tokio::test]
async fn test_empty_content_hash_fixture() {
let (actor, handle) = AntiEntropyActor::new(Arc::new(MemoryNamespaceCache::default()));
tokio::spawn(actor.run());
let got = handle.content_hash().await;
assert_eq!(got.to_string(), "UEnXR4Cj4H1CAqtH1M7y9A==");
}
}

View File

@ -0,0 +1,114 @@
//! A handle to interact with a [`AntiEntropyActor`].
//!
//! [`AntiEntropyActor`]: super::actor::AntiEntropyActor
use data_types::NamespaceName;
use merkle_search_tree::digest::RootHash;
use observability_deps::tracing::error;
use tokio::sync::{mpsc, oneshot};
use super::actor::Op;
/// A handle to an [`AntiEntropyActor`].
///
/// [`AntiEntropyActor`]: super::actor::AntiEntropyActor
#[derive(Debug, Clone)]
pub struct AntiEntropyHandle {
// Non-schema actor requests.
op_tx: mpsc::Sender<Op>,
// Schema update notifications (prioritised by actor)
schema_tx: mpsc::Sender<NamespaceName<'static>>,
}
impl AntiEntropyHandle {
pub(super) fn new(
op_tx: mpsc::Sender<Op>,
schema_tx: mpsc::Sender<NamespaceName<'static>>,
) -> AntiEntropyHandle {
Self { op_tx, schema_tx }
}
/// Request the [`MerkleSearchTree`] observe a new update to `name`.
///
/// This call is cheap - it places `name` into a queue to be processed
/// asynchronously by a background worker. If the queue is saturated, an
/// error is logged and the update is dropped.
///
/// # Ordering
///
/// Calls to this method MUST only be made when a subsequent cache lookup
/// would yield a schema for `name`.
///
/// # Starvation
///
/// The [`AntiEntropyActor`] prioritises processing upsert requests over all
/// other operations - an extreme rate of calls to this method may adversely
/// affect the latency of other [`AntiEntropyHandle`] methods.
///
/// [`MerkleSearchTree`]: merkle_search_tree::MerkleSearchTree
/// [`AntiEntropyActor`]: super::actor::AntiEntropyActor
pub(crate) fn observe_update(&self, name: NamespaceName<'static>) {
// NOTE: this doesn't send the actual schema and it likely never should.
//
// If this method sent `(name, schema)` tuples, it would require the
// stream of calls to contain monotonic schemas - that means that
// `schema` must always "go forwards", with subsequent calls containing
// a (non-strict) superset of the content of prior calls.
//
// This invariant can't be preserved in a multi-threaded system where
// updates can be applied to the same cached entry concurrently:
//
// - T1: cache.upsert(name, schema(table_a)) -> schema(table_a)
// - T2: cache.upsert(name, schema(table_b)) -> schema(table_a, table_b)
// - T2: handle.upsert(name, schema(table_a, table_b))
// - T1: handle.upsert(name, schema(table_a))
//
// The last call violates the monotonicity requirement - T1 sets the
// anti-entropy state to the pre-merged value containing only table_a,
// overwriting the correct state that reflected both tables.
//
// The monotonic property is provided by the underlying cache
// implementation; namespace schemas are always merged. By providing the
// actor the name of the updated schema, it can read the merged and most
// up to date schema directly from the cache itself, ensuring
// monotonicity of the schemas regardless of call order.
if self.schema_tx.try_send(name.clone()).is_err() {
// If enqueuing this schema update fails, the MST will become
// out-of-sync w.r.t the content of the cache, and falsely believe
// the update applied to the cache in this schema (if any) has not
// been applied locally.
//
// If this happens, peers may start conflict resolution rounds to
// converge this difference, eventually causing the local node to
// perform a no-op update to the local key, and in turn causing
// another requeue attempt here.
//
// This is bad for efficiency because it causes spurious syncs, but
// does not effect correctness due to the monotonicity of updates.
//
// If every peer hits this same edge case, none of the MSTs will
// contain the updated schema, and no convergence will be attempted
// for this update until a subsequent enqueue for "name" succeeds.
// This does affect correctness, but is exceedingly unlikely, and
// logged for debugging purposes.
error!(%name, "error enqueuing schema update for anti-entropy");
}
}
/// Return the current content hash ([`RootHash`]) describing the set of
/// [`NamespaceSchema`] observed so far.
///
/// [`NamespaceSchema`]: data_types::NamespaceSchema
pub(crate) async fn content_hash(&self) -> RootHash {
let (tx, rx) = oneshot::channel();
self.op_tx
.send(Op::ContentHash(tx))
.await
.expect("anti-entropy actor has stopped");
rx.await.expect("anti-entropy actor has stopped")
}
}

View File

@ -7,17 +7,17 @@ use std::sync::Arc;
use async_trait::async_trait;
use data_types::{NamespaceName, NamespaceSchema};
use merkle_search_tree::MerkleSearchTree;
use parking_lot::Mutex;
use crate::namespace_cache::{ChangeStats, NamespaceCache};
use super::handle::AntiEntropyHandle;
/// A [`NamespaceCache`] decorator that maintains a content hash / consistency
/// proof.
///
/// This [`MerkleTree`] tracks the content of the underlying [`NamespaceCache`]
/// delegate, maintaining a compact, serialisable representation in a
/// [MerkleSearchTree] that can be used to perform efficient differential
/// [`MerkleSearchTree`] that can be used to perform efficient differential
/// convergence (anti-entropy) of peers to provide eventual consistency.
///
/// # Merge Correctness
@ -35,29 +35,31 @@ use crate::namespace_cache::{ChangeStats, NamespaceCache};
///
/// If two nodes are producing differing hashes for the same underlying content,
/// they will appear to never converge.
///
/// [`MerkleSearchTree`]: merkle_search_tree::MerkleSearchTree
#[derive(Debug)]
pub struct MerkleTree<T> {
inner: T,
mst: Mutex<MerkleSearchTree<NamespaceName<'static>, NamespaceSchema>>,
handle: AntiEntropyHandle,
}
impl<T> MerkleTree<T> {
impl<T> MerkleTree<T>
where
T: Send + Sync,
{
/// Initialise a new [`MerkleTree`] that generates a content hash covering
/// `inner`.
pub fn new(inner: T) -> Self {
Self {
inner,
mst: Mutex::new(MerkleSearchTree::default()),
}
pub fn new(inner: T, handle: AntiEntropyHandle) -> Self {
Self { inner, handle }
}
/// Return a 128-bit hash describing the content of the inner `T`.
///
/// This hash only covers a subset of schema fields (see
/// [`NamespaceContentHash`]).
pub fn content_hash(&self) -> merkle_search_tree::digest::RootHash {
self.mst.lock().root_hash().clone()
pub async fn content_hash(&self) -> merkle_search_tree::digest::RootHash {
self.handle.content_hash().await
}
}
@ -84,9 +86,8 @@ where
// return value (the new content of the cache).
let (schema, diff) = self.inner.put_schema(namespace.clone(), schema);
// Intercept the the resulting cache entry state and merge it into the
// merkle tree.
self.mst.lock().upsert(namespace, &schema);
// Attempt to asynchronously update the MST.
self.handle.observe_update(namespace.clone());
// And pass through the return value to the caller.
(schema, diff)

View File

@ -1,5 +1,7 @@
//! Anti-entropy primitives providing eventual consistency over gossip.
pub mod actor;
pub mod handle;
pub mod merkle;
#[cfg(test)]
@ -7,7 +9,7 @@ mod tests {
use std::{collections::BTreeMap, sync::Arc};
use crate::{
gossip::anti_entropy::merkle::MerkleTree,
gossip::anti_entropy::{actor::AntiEntropyActor, merkle::MerkleTree},
namespace_cache::{MemoryNamespaceCache, NamespaceCache},
};
@ -112,40 +114,52 @@ mod tests {
// An arbitrary namespace with an ID that lies outside of `updates`.
last_update in arbitrary_namespace_schema(42_i64..100),
) {
let ns_a = MerkleTree::new(Arc::new(MemoryNamespaceCache::default()));
let ns_b = MerkleTree::new(Arc::new(MemoryNamespaceCache::default()));
tokio::runtime::Runtime::new().unwrap().block_on(async move {
let cache_a = Arc::new(MemoryNamespaceCache::default());
let cache_b = Arc::new(MemoryNamespaceCache::default());
// Invariant: two empty namespace caches have the same content hash.
assert_eq!(ns_a.content_hash(), ns_b.content_hash());
let (actor_a, handle_a) = AntiEntropyActor::new(Arc::clone(&cache_a));
let (actor_b, handle_b) = AntiEntropyActor::new(Arc::clone(&cache_b));
for update in updates {
// Generate a unique, deterministic name for this namespace.
let name = name_for_schema(&update);
// Start the MST actors
tokio::spawn(actor_a.run());
tokio::spawn(actor_b.run());
// Apply the update (which may be a no-op) to both.
ns_a.put_schema(name.clone(), update.clone());
ns_b.put_schema(name, update);
let ns_a = MerkleTree::new(cache_a, handle_a.clone());
let ns_b = MerkleTree::new(cache_b, handle_b.clone());
// Invariant: after applying the same update, the content hashes
// MUST match (even if this update was a no-op / not an update)
assert_eq!(ns_a.content_hash(), ns_b.content_hash());
}
// Invariant: two empty namespace caches have the same content hash.
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
// At this point all updates have been applied to both caches.
//
// Add a new cache entry that doesn't yet exist, and assert this
// causes the caches to diverge, and then once again reconverge.
let name = name_for_schema(&last_update);
ns_a.put_schema(name.clone(), last_update.clone());
for update in updates {
// Generate a unique, deterministic name for this namespace.
let name = name_for_schema(&update);
// Invariant: last_update definitely added new cache content,
// therefore the cache content hashes MUST diverge.
assert_ne!(ns_a.content_hash(), ns_b.content_hash());
// Apply the update (which may be a no-op) to both.
ns_a.put_schema(name.clone(), update.clone());
ns_b.put_schema(name, update);
// Invariant: applying the update to the other cache converges their
// content hashes.
ns_b.put_schema(name, last_update);
assert_eq!(ns_a.content_hash(), ns_b.content_hash());
// Invariant: after applying the same update, the content hashes
// MUST match (even if this update was a no-op / not an update)
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
}
// At this point all updates have been applied to both caches.
//
// Add a new cache entry that doesn't yet exist, and assert this
// causes the caches to diverge, and then once again reconverge.
let name = name_for_schema(&last_update);
ns_a.put_schema(name.clone(), last_update.clone());
// Invariant: last_update definitely added new cache content,
// therefore the cache content hashes MUST diverge.
assert_ne!(handle_a.content_hash().await, handle_b.content_hash().await);
// Invariant: applying the update to the other cache converges their
// content hashes.
ns_b.put_schema(name, last_update);
assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await);
});
}
}
}