diff --git a/router/benches/namespace_schema_cache.rs b/router/benches/namespace_schema_cache.rs index c9014d111f..80a014a57c 100644 --- a/router/benches/namespace_schema_cache.rs +++ b/router/benches/namespace_schema_cache.rs @@ -10,26 +10,31 @@ use data_types::{ }; use iox_catalog::{interface::Catalog, mem::MemCatalog}; use once_cell::sync::Lazy; -use router::namespace_cache::{ - MemoryNamespaceCache, NamespaceCache, ReadThroughCache, ShardedCache, +use router::{ + gossip::anti_entropy::{actor::AntiEntropyActor, merkle::MerkleTree}, + namespace_cache::{MemoryNamespaceCache, NamespaceCache, ReadThroughCache, ShardedCache}, }; static ARBITRARY_NAMESPACE: Lazy> = Lazy::new(|| "bananas".try_into().unwrap()); fn init_ns_cache( + rt: &tokio::runtime::Runtime, initial_schema: impl IntoIterator, NamespaceSchema)>, ) -> impl NamespaceCache { let metrics = Arc::new(metric::Registry::default()); let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); - let cache = Arc::new(ReadThroughCache::new( - Arc::new(ShardedCache::new( - iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), - )), - Arc::clone(&catalog), + let cache = Arc::new(ShardedCache::new( + iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), )); + let (actor, handle) = AntiEntropyActor::new(Arc::clone(&cache)); + rt.spawn(actor.run()); + + let cache = MerkleTree::new(cache, handle); + let cache = Arc::new(ReadThroughCache::new(cache, Arc::clone(&catalog))); + for (name, schema) in initial_schema { cache.put_schema(name, schema); } @@ -66,12 +71,17 @@ fn bench_add_new_tables_with_columns( let initial_schema = generate_namespace_schema(INITIAL_TABLE_COUNT, columns_per_table); let schema_update = generate_namespace_schema(INITIAL_TABLE_COUNT + tables, columns_per_table); + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(4) + .build() + .unwrap(); + group.throughput(Throughput::Elements((tables * columns_per_table) as _)); group.bench_function(format!("{tables}x{columns_per_table}"), |b| { b.iter_batched( || { ( - init_ns_cache([(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]), + init_ns_cache(&rt, [(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]), ARBITRARY_NAMESPACE.clone(), schema_update.clone(), ) @@ -92,12 +102,14 @@ fn bench_add_columns_to_existing_table( let initial_schema = generate_namespace_schema(1, initial_column_count); let schema_update = generate_namespace_schema(1, initial_column_count + add_new_columns); + let rt = tokio::runtime::Runtime::new().unwrap(); + group.throughput(Throughput::Elements(add_new_columns as _)); group.bench_function(format!("{initial_column_count}+{add_new_columns}"), |b| { b.iter_batched( || { ( - init_ns_cache([(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]), + init_ns_cache(&rt, [(ARBITRARY_NAMESPACE.clone(), initial_schema.clone())]), ARBITRARY_NAMESPACE.clone(), schema_update.clone(), ) diff --git a/router/src/gossip/anti_entropy/actor.rs b/router/src/gossip/anti_entropy/actor.rs new file mode 100644 index 0000000000..6690193836 --- /dev/null +++ b/router/src/gossip/anti_entropy/actor.rs @@ -0,0 +1,165 @@ +//! An actor task maintaining [`MerkleSearchTree`] state. +use std::sync::Arc; + +use data_types::{NamespaceName, NamespaceSchema}; +use merkle_search_tree::{digest::RootHash, MerkleSearchTree}; +use observability_deps::tracing::{debug, info, trace}; +use tokio::sync::{mpsc, oneshot}; + +use crate::namespace_cache::{CacheMissErr, NamespaceCache}; + +use super::handle::AntiEntropyHandle; + +/// Requests sent from an [`AntiEntropyHandle`] to an [`AntiEntropyActor`]. +#[derive(Debug)] +pub(super) enum Op { + /// Request the content / merkle tree root hash. + ContentHash(oneshot::Sender), +} + +/// A [`NamespaceCache`] anti-entropy state tracking primitive. +/// +/// This actor maintains a [`MerkleSearchTree`] covering the content of +/// [`NamespaceSchema`] provided to it. +/// +/// [`NamespaceCache`]: crate::namespace_cache::NamespaceCache +#[derive(Debug)] +pub struct AntiEntropyActor { + cache: T, + schema_rx: mpsc::Receiver>, + op_rx: mpsc::Receiver, + + mst: MerkleSearchTree, Arc>, +} + +impl AntiEntropyActor +where + // A cache lookup in the underlying impl MUST be infallible - it's asking + // for existing records, and the cache MUST always return them. + T: NamespaceCache, +{ + /// Initialise a new [`AntiEntropyActor`], and return the + /// [`AntiEntropyHandle`] used to interact with it. + /// + /// The provided cache is used to resolve the most up-to-date + /// [`NamespaceSchema`] for given a [`NamespaceName`], which SHOULD be as + /// fast as possible and MUST be infallible. + pub fn new(cache: T) -> (Self, AntiEntropyHandle) { + // Initialise the queue used to push schema updates. + // + // Filling this queue causes dropped updates to the MST, which in turn + // cause spurious convergence rounds which are expensive relative to a + // bit of worst-case RAM (network trips, CPU for hashing, gossip + // traffic). + // + // Each queue entry has a 64 byte upper limit (excluding queue & pointer + // overhead) because the namespace name has a 64 byte upper bound. This + // means there's an upper (worst case) RAM utilisation bound of 1 MiB + // for these queue values. + let (schema_tx, schema_rx) = mpsc::channel(16_000); + + // A "command" channel for non-schema-update messages. + let (op_tx, op_rx) = mpsc::channel(50); + + ( + Self { + cache, + schema_rx, + op_rx, + mst: MerkleSearchTree::default(), + }, + AntiEntropyHandle::new(op_tx, schema_tx), + ) + } + + /// Block and run the anti-entropy actor until all [`AntiEntropyHandle`] + /// instances for it have been dropped. + pub async fn run(mut self) { + info!("starting anti-entropy state actor"); + + loop { + tokio::select! { + // Bias towards processing schema updates. + // + // This presents the risk of starvation / high latency for + // "operation" commands (sent over op_rx) under heavy update + // load (which is unlikely) at the benefit of ensuring all + // operations occur against an up-to-date MST. + // + // This helps avoid spurious convergence rounds by ensuring all + // updates are always applied as soon as possible. + biased; + + // Immediately apply the available MST update. + Some(name) = self.schema_rx.recv() => self.handle_upsert(name).await, + + // Else process an "operation" command from the actor handle. + Some(op) = self.op_rx.recv() => self.handle_op(op), + + // And stop if both channels have closed. + else => { + info!("stopping anti-entropy state actor"); + return + }, + } + } + } + + async fn handle_upsert(&mut self, name: NamespaceName<'static>) { + let schema = match self.cache.get_schema(&name).await { + Ok(v) => v, + Err(CacheMissErr { .. }) => { + // This is far from ideal as it causes the MST to skip applying + // an update, effectively causing it to diverge from the actual + // cache state. + // + // This will cause spurious convergence runs between peers that + // have applied this update to their MST, in turn causing it to + // converge locally. + // + // If no node applied this update, then this value will not be + // converged between any peers until a subsequent update causes + // a successful MST update on a peer. + // + // Instead the bounds require the only allowable error to be a + // cache miss error (no I/O error or other problem) - this can't + // ever happen, because state updates are enqueued for existing + // schemas, so there MUST always be an entry returned. + panic!("cache miss for namespace schema {}", name); + } + }; + + trace!(%name, ?schema, "applying schema"); + + self.mst.upsert(name, &schema); + } + + fn handle_op(&mut self, op: Op) { + match op { + Op::ContentHash(tx) => { + let root_hash = self.mst.root_hash().clone(); + + debug!(%root_hash, "generated content hash"); + + // The caller may have stopped listening, so ignore any errors. + let _ = tx.send(root_hash); + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::namespace_cache::MemoryNamespaceCache; + + use super::*; + + #[tokio::test] + async fn test_empty_content_hash_fixture() { + let (actor, handle) = AntiEntropyActor::new(Arc::new(MemoryNamespaceCache::default())); + tokio::spawn(actor.run()); + + let got = handle.content_hash().await; + assert_eq!(got.to_string(), "UEnXR4Cj4H1CAqtH1M7y9A=="); + } +} diff --git a/router/src/gossip/anti_entropy/handle.rs b/router/src/gossip/anti_entropy/handle.rs new file mode 100644 index 0000000000..50bca1ef8e --- /dev/null +++ b/router/src/gossip/anti_entropy/handle.rs @@ -0,0 +1,114 @@ +//! A handle to interact with a [`AntiEntropyActor`]. +//! +//! [`AntiEntropyActor`]: super::actor::AntiEntropyActor + +use data_types::NamespaceName; +use merkle_search_tree::digest::RootHash; +use observability_deps::tracing::error; +use tokio::sync::{mpsc, oneshot}; + +use super::actor::Op; + +/// A handle to an [`AntiEntropyActor`]. +/// +/// [`AntiEntropyActor`]: super::actor::AntiEntropyActor +#[derive(Debug, Clone)] +pub struct AntiEntropyHandle { + // Non-schema actor requests. + op_tx: mpsc::Sender, + + // Schema update notifications (prioritised by actor) + schema_tx: mpsc::Sender>, +} + +impl AntiEntropyHandle { + pub(super) fn new( + op_tx: mpsc::Sender, + schema_tx: mpsc::Sender>, + ) -> AntiEntropyHandle { + Self { op_tx, schema_tx } + } + + /// Request the [`MerkleSearchTree`] observe a new update to `name`. + /// + /// This call is cheap - it places `name` into a queue to be processed + /// asynchronously by a background worker. If the queue is saturated, an + /// error is logged and the update is dropped. + /// + /// # Ordering + /// + /// Calls to this method MUST only be made when a subsequent cache lookup + /// would yield a schema for `name`. + /// + /// # Starvation + /// + /// The [`AntiEntropyActor`] prioritises processing upsert requests over all + /// other operations - an extreme rate of calls to this method may adversely + /// affect the latency of other [`AntiEntropyHandle`] methods. + /// + /// [`MerkleSearchTree`]: merkle_search_tree::MerkleSearchTree + /// [`AntiEntropyActor`]: super::actor::AntiEntropyActor + pub(crate) fn observe_update(&self, name: NamespaceName<'static>) { + // NOTE: this doesn't send the actual schema and it likely never should. + // + // If this method sent `(name, schema)` tuples, it would require the + // stream of calls to contain monotonic schemas - that means that + // `schema` must always "go forwards", with subsequent calls containing + // a (non-strict) superset of the content of prior calls. + // + // This invariant can't be preserved in a multi-threaded system where + // updates can be applied to the same cached entry concurrently: + // + // - T1: cache.upsert(name, schema(table_a)) -> schema(table_a) + // - T2: cache.upsert(name, schema(table_b)) -> schema(table_a, table_b) + // - T2: handle.upsert(name, schema(table_a, table_b)) + // - T1: handle.upsert(name, schema(table_a)) + // + // The last call violates the monotonicity requirement - T1 sets the + // anti-entropy state to the pre-merged value containing only table_a, + // overwriting the correct state that reflected both tables. + // + // The monotonic property is provided by the underlying cache + // implementation; namespace schemas are always merged. By providing the + // actor the name of the updated schema, it can read the merged and most + // up to date schema directly from the cache itself, ensuring + // monotonicity of the schemas regardless of call order. + + if self.schema_tx.try_send(name.clone()).is_err() { + // If enqueuing this schema update fails, the MST will become + // out-of-sync w.r.t the content of the cache, and falsely believe + // the update applied to the cache in this schema (if any) has not + // been applied locally. + // + // If this happens, peers may start conflict resolution rounds to + // converge this difference, eventually causing the local node to + // perform a no-op update to the local key, and in turn causing + // another requeue attempt here. + // + // This is bad for efficiency because it causes spurious syncs, but + // does not effect correctness due to the monotonicity of updates. + // + // If every peer hits this same edge case, none of the MSTs will + // contain the updated schema, and no convergence will be attempted + // for this update until a subsequent enqueue for "name" succeeds. + // This does affect correctness, but is exceedingly unlikely, and + // logged for debugging purposes. + error!(%name, "error enqueuing schema update for anti-entropy"); + } + } + + /// Return the current content hash ([`RootHash`]) describing the set of + /// [`NamespaceSchema`] observed so far. + /// + /// [`NamespaceSchema`]: data_types::NamespaceSchema + pub(crate) async fn content_hash(&self) -> RootHash { + let (tx, rx) = oneshot::channel(); + + self.op_tx + .send(Op::ContentHash(tx)) + .await + .expect("anti-entropy actor has stopped"); + + rx.await.expect("anti-entropy actor has stopped") + } +} diff --git a/router/src/gossip/anti_entropy/merkle.rs b/router/src/gossip/anti_entropy/merkle.rs index c2041cfb82..cf6a80cabb 100644 --- a/router/src/gossip/anti_entropy/merkle.rs +++ b/router/src/gossip/anti_entropy/merkle.rs @@ -7,17 +7,17 @@ use std::sync::Arc; use async_trait::async_trait; use data_types::{NamespaceName, NamespaceSchema}; -use merkle_search_tree::MerkleSearchTree; -use parking_lot::Mutex; use crate::namespace_cache::{ChangeStats, NamespaceCache}; +use super::handle::AntiEntropyHandle; + /// A [`NamespaceCache`] decorator that maintains a content hash / consistency /// proof. /// /// This [`MerkleTree`] tracks the content of the underlying [`NamespaceCache`] /// delegate, maintaining a compact, serialisable representation in a -/// [MerkleSearchTree] that can be used to perform efficient differential +/// [`MerkleSearchTree`] that can be used to perform efficient differential /// convergence (anti-entropy) of peers to provide eventual consistency. /// /// # Merge Correctness @@ -35,29 +35,31 @@ use crate::namespace_cache::{ChangeStats, NamespaceCache}; /// /// If two nodes are producing differing hashes for the same underlying content, /// they will appear to never converge. +/// +/// [`MerkleSearchTree`]: merkle_search_tree::MerkleSearchTree #[derive(Debug)] pub struct MerkleTree { inner: T, - mst: Mutex, NamespaceSchema>>, + handle: AntiEntropyHandle, } -impl MerkleTree { +impl MerkleTree +where + T: Send + Sync, +{ /// Initialise a new [`MerkleTree`] that generates a content hash covering /// `inner`. - pub fn new(inner: T) -> Self { - Self { - inner, - mst: Mutex::new(MerkleSearchTree::default()), - } + pub fn new(inner: T, handle: AntiEntropyHandle) -> Self { + Self { inner, handle } } /// Return a 128-bit hash describing the content of the inner `T`. /// /// This hash only covers a subset of schema fields (see /// [`NamespaceContentHash`]). - pub fn content_hash(&self) -> merkle_search_tree::digest::RootHash { - self.mst.lock().root_hash().clone() + pub async fn content_hash(&self) -> merkle_search_tree::digest::RootHash { + self.handle.content_hash().await } } @@ -84,9 +86,8 @@ where // return value (the new content of the cache). let (schema, diff) = self.inner.put_schema(namespace.clone(), schema); - // Intercept the the resulting cache entry state and merge it into the - // merkle tree. - self.mst.lock().upsert(namespace, &schema); + // Attempt to asynchronously update the MST. + self.handle.observe_update(namespace.clone()); // And pass through the return value to the caller. (schema, diff) diff --git a/router/src/gossip/anti_entropy/mod.rs b/router/src/gossip/anti_entropy/mod.rs index 43cb853d03..59586bc632 100644 --- a/router/src/gossip/anti_entropy/mod.rs +++ b/router/src/gossip/anti_entropy/mod.rs @@ -1,5 +1,7 @@ //! Anti-entropy primitives providing eventual consistency over gossip. +pub mod actor; +pub mod handle; pub mod merkle; #[cfg(test)] @@ -7,7 +9,7 @@ mod tests { use std::{collections::BTreeMap, sync::Arc}; use crate::{ - gossip::anti_entropy::merkle::MerkleTree, + gossip::anti_entropy::{actor::AntiEntropyActor, merkle::MerkleTree}, namespace_cache::{MemoryNamespaceCache, NamespaceCache}, }; @@ -112,40 +114,52 @@ mod tests { // An arbitrary namespace with an ID that lies outside of `updates`. last_update in arbitrary_namespace_schema(42_i64..100), ) { - let ns_a = MerkleTree::new(Arc::new(MemoryNamespaceCache::default())); - let ns_b = MerkleTree::new(Arc::new(MemoryNamespaceCache::default())); + tokio::runtime::Runtime::new().unwrap().block_on(async move { + let cache_a = Arc::new(MemoryNamespaceCache::default()); + let cache_b = Arc::new(MemoryNamespaceCache::default()); - // Invariant: two empty namespace caches have the same content hash. - assert_eq!(ns_a.content_hash(), ns_b.content_hash()); + let (actor_a, handle_a) = AntiEntropyActor::new(Arc::clone(&cache_a)); + let (actor_b, handle_b) = AntiEntropyActor::new(Arc::clone(&cache_b)); - for update in updates { - // Generate a unique, deterministic name for this namespace. - let name = name_for_schema(&update); + // Start the MST actors + tokio::spawn(actor_a.run()); + tokio::spawn(actor_b.run()); - // Apply the update (which may be a no-op) to both. - ns_a.put_schema(name.clone(), update.clone()); - ns_b.put_schema(name, update); + let ns_a = MerkleTree::new(cache_a, handle_a.clone()); + let ns_b = MerkleTree::new(cache_b, handle_b.clone()); - // Invariant: after applying the same update, the content hashes - // MUST match (even if this update was a no-op / not an update) - assert_eq!(ns_a.content_hash(), ns_b.content_hash()); - } + // Invariant: two empty namespace caches have the same content hash. + assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await); - // At this point all updates have been applied to both caches. - // - // Add a new cache entry that doesn't yet exist, and assert this - // causes the caches to diverge, and then once again reconverge. - let name = name_for_schema(&last_update); - ns_a.put_schema(name.clone(), last_update.clone()); + for update in updates { + // Generate a unique, deterministic name for this namespace. + let name = name_for_schema(&update); - // Invariant: last_update definitely added new cache content, - // therefore the cache content hashes MUST diverge. - assert_ne!(ns_a.content_hash(), ns_b.content_hash()); + // Apply the update (which may be a no-op) to both. + ns_a.put_schema(name.clone(), update.clone()); + ns_b.put_schema(name, update); - // Invariant: applying the update to the other cache converges their - // content hashes. - ns_b.put_schema(name, last_update); - assert_eq!(ns_a.content_hash(), ns_b.content_hash()); + // Invariant: after applying the same update, the content hashes + // MUST match (even if this update was a no-op / not an update) + assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await); + } + + // At this point all updates have been applied to both caches. + // + // Add a new cache entry that doesn't yet exist, and assert this + // causes the caches to diverge, and then once again reconverge. + let name = name_for_schema(&last_update); + ns_a.put_schema(name.clone(), last_update.clone()); + + // Invariant: last_update definitely added new cache content, + // therefore the cache content hashes MUST diverge. + assert_ne!(handle_a.content_hash().await, handle_b.content_hash().await); + + // Invariant: applying the update to the other cache converges their + // content hashes. + ns_b.put_schema(name, last_update); + assert_eq!(handle_a.content_hash().await, handle_b.content_hash().await); + }); } } }