Merge pull request #8780 from influxdata/dom/enable-merkle-tracking

feat(router): init anti-entropy merkle search tree
Dom 2023-09-20 14:41:30 +01:00 committed by GitHub
commit 28c3637c01
4 changed files with 79 additions and 19 deletions

Cargo.lock generated

@@ -3242,6 +3242,7 @@ dependencies = [
"metric",
"mutable_batch",
"object_store",
"observability_deps",
"router",
"thiserror",
"tokio",


@@ -19,13 +19,9 @@ ioxd_common = { path = "../ioxd_common" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
object_store = { workspace = true }
observability_deps = { version = "0.1.0", path = "../observability_deps" }
router = { path = "../router" }
thiserror = "1.0.48"
tokio-util = { version = "0.7.8" }
trace = { path = "../trace" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies]
tokio = { version = "1.32", features = [
"macros",
"net",
@@ -35,3 +31,6 @@ tokio = { version = "1.32", features = [
"sync",
"time",
] }
tokio-util = { version = "0.7.8" }
trace = { path = "../trace" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }


@@ -14,6 +14,7 @@
use gossip::TopicInterests;
use gossip_schema::{dispatcher::SchemaRx, handle::SchemaTx};
use observability_deps::tracing::info;
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
@@ -54,7 +55,11 @@ use router::{
InstrumentationDecorator, Partitioner, RetentionValidator, RpcWrite, SchemaValidator,
},
gossip::{
namespace_cache::NamespaceSchemaGossip, schema_change_observer::SchemaChangeObserver,
anti_entropy::mst::{
actor::AntiEntropyActor, handle::AntiEntropyHandle, merkle::MerkleTree,
},
namespace_cache::NamespaceSchemaGossip,
schema_change_observer::SchemaChangeObserver,
},
namespace_cache::{
metrics::InstrumentedCache, MaybeLayer, MemoryNamespaceCache, NamespaceCache,
@@ -260,17 +265,32 @@ pub async fn create_router_server_type(
// Initialise an instrumented namespace cache to be shared with the schema
// validator, and namespace auto-creator that reports cache hit/miss/update
// metrics.
let ns_cache = InstrumentedCache::new(
let ns_cache = Arc::new(InstrumentedCache::new(
ShardedCache::new(std::iter::repeat_with(MemoryNamespaceCache::default).take(10)),
&metrics,
);
));
// Initialise the anti-entropy subsystem, responsible for maintaining a
// Merkle Search Tree that encodes the content of the namespace schema
// cache.
let (actor, mst) = AntiEntropyActor::new(Arc::clone(&ns_cache));
// Start the actor task, which exits when the `mst` handle to it drops.
tokio::spawn(actor.run());
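For readers unfamiliar with the actor/handle split used above: the mutable tree state is owned by a single spawned task, and the only way to reach it is through a cheaply cloneable handle that feeds it messages over a bounded channel. A rough, self-contained sketch of that pattern (hypothetical `Sketch*` names, with the Merkle Search Tree replaced by a plain set so the example compiles on its own):

```rust
use std::collections::BTreeSet;
use tokio::sync::mpsc;

/// Owns the mutable state; only this task ever touches it.
struct SketchActor {
    rx: mpsc::Receiver<String>,
    seen: BTreeSet<String>,
}

impl SketchActor {
    fn new() -> (Self, SketchHandle) {
        let (tx, rx) = mpsc::channel(128);
        (
            Self {
                rx,
                seen: BTreeSet::new(),
            },
            SketchHandle { tx },
        )
    }

    /// Runs until every handle has been dropped and the channel closes,
    /// mirroring "exits when the handle to it drops" above.
    async fn run(mut self) {
        while let Some(name) = self.rx.recv().await {
            self.seen.insert(name);
        }
    }
}

/// Cheaply cloneable; the only way other code reaches the actor's state.
#[derive(Clone)]
struct SketchHandle {
    tx: mpsc::Sender<String>,
}

impl SketchHandle {
    async fn observe_update(&self, name: String) {
        self.tx.send(name).await.expect("actor not running");
    }
}

#[tokio::main]
async fn main() {
    let (actor, handle) = SketchActor::new();
    tokio::spawn(actor.run());
    handle.observe_update("platanos".to_string()).await;
}
```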
// Pre-warm the cache before adding the gossip layer to avoid broadcasting
// the full cache content at startup.
pre_warm_schema_cache(&ns_cache, &*catalog)
//
// This method blocks until all schemas have been stored in the cache, and
// all schemas have been enqueued for the MST actor to process.
pre_warm_schema_cache(&ns_cache, &*catalog, &mst)
.await
.expect("namespace cache pre-warming failed");
// Now that the cache and anti-entropy merkle tree have been pre-populated,
// wrap the namespace cache in an observer that tracks any future changes to
// the cache content, ensuring the MST remains in sync.
let ns_cache = MerkleTree::new(ns_cache, mst);
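The MerkleTree layer is a decorator: it delegates every write to the inner cache and also notifies the MST handle, so anything put into the cache after this point is reflected in the tree. A hedged sketch of that shape (the trait and names below are invented for illustration and are not the router's actual NamespaceCache API):

```rust
/// Illustration only: a drastically simplified stand-in for the real
/// namespace cache trait.
trait PutSchema {
    fn put_schema(&self, name: String, schema: String);
}

/// Decorator that keeps an external observer informed of every write
/// made through it, while delegating storage to the wrapped cache.
struct ObservedCache<T> {
    inner: T,
    observer: std::sync::mpsc::Sender<String>,
}

impl<T: PutSchema> PutSchema for ObservedCache<T> {
    fn put_schema(&self, name: String, schema: String) {
        // Write through to the wrapped cache first...
        self.inner.put_schema(name.clone(), schema);
        // ...then record which namespace changed, so the tree can
        // re-hash the page containing it.
        let _ = self.observer.send(name);
    }
}
```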
// Optionally initialise the schema gossip subsystem.
//
// The schema gossip primitives sit in the stack of NamespaceCache layers:
@@ -472,24 +492,39 @@ pub async fn create_router_server_type(
async fn pre_warm_schema_cache<T>(
cache: &T,
catalog: &dyn Catalog,
mst: &AntiEntropyHandle,
) -> Result<(), iox_catalog::interface::Error>
where
T: NamespaceCache,
{
iox_catalog::interface::list_schemas(catalog)
.await?
.for_each(|(ns, schema)| {
let name = NamespaceName::try_from(ns.name)
.expect("cannot convert existing namespace string to a `NamespaceName` instance");
let mut n = 0;
for (ns, schema) in iox_catalog::interface::list_schemas(catalog).await? {
let name = NamespaceName::try_from(ns.name)
.expect("cannot convert existing namespace string to a `NamespaceName` instance");
cache.put_schema(name, schema);
});
cache.put_schema(name.clone(), schema);
mst.observe_update_blocking(name).await;
n += 1;
}
info!(n, "pre-warmed schema cache");
// Calculate the root hash after pre-warming.
//
// This causes the merkle tree pages to be pre-hashed ahead of first use,
// and is guaranteed to happen after the backlog of MST updates has
// completed (schema updates are processed before other operations).
let root_hash = mst.content_hash().await;
info!(%root_hash, "initialised anti-entropy merkle tree");
Ok(())
}
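The ordering guarantee relied on above (every queued schema update is folded into the tree before a content-hash request is answered) can be provided by having the actor drain its schema channel before servicing other operations. One hedged way to express that in an actor loop, not necessarily how the router's actor is written:

```rust
use tokio::sync::{mpsc, oneshot};

/// Simplified actor loop showing one way to prioritise pending schema
/// updates over read-style operations such as a content-hash request.
async fn run(
    mut schema_rx: mpsc::Receiver<String>,
    mut op_rx: mpsc::Receiver<oneshot::Sender<u64>>,
) {
    let mut observed: Vec<String> = Vec::new();
    loop {
        tokio::select! {
            // `biased` makes select! poll branches top-to-bottom, so a
            // queued schema update always wins over a queued op.
            biased;

            Some(name) = schema_rx.recv() => {
                observed.push(name);
            }
            Some(reply) = op_rx.recv() => {
                // Stand-in for computing the MST root hash.
                let _ = reply.send(observed.len() as u64);
            }
            else => break, // both channels closed
        }
    }
}
```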
#[cfg(test)]
mod tests {
use std::sync::Arc;
use data_types::ColumnType;
use iox_catalog::{
mem::MemCatalog,
@@ -513,8 +548,16 @@ mod tests {
drop(repos); // Or it'll deadlock.
let cache = MemoryNamespaceCache::default();
pre_warm_schema_cache(&cache, &*catalog)
let cache = Arc::new(MemoryNamespaceCache::default());
let (actor, mst) = AntiEntropyActor::new(Arc::clone(&cache));
tokio::spawn(actor.run());
// Remember the initial hash of the merkle tree covering the cache
// content.
let initial_hash = mst.content_hash().await;
pre_warm_schema_cache(&cache, &*catalog, &mst)
.await
.expect("pre-warming failed");
@@ -525,5 +568,10 @@ mod tests {
.expect("should contain a schema");
assert!(got.tables.get("name").is_some());
// Assert the MST has observed at least one cache update, causing the
// root hashes to diverge.
let now = mst.content_hash().await;
assert_ne!(initial_hash, now);
}
}


@@ -97,11 +97,23 @@ impl AntiEntropyHandle {
}
}
/// Send `name` to the MST actor to observe a new schema state.
///
/// This method is the blocking variant of the non-blocking
/// [`AntiEntropyHandle::observe_update()`] that waits for `name` to be
/// successfully enqueued (blocking if the queue is full).
pub async fn observe_update_blocking(&self, name: NamespaceName<'static>) {
self.schema_tx
.send(name)
.await
.expect("mst actor not running");
}
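The blocking/non-blocking distinction between the two observe methods comes down to how the bounded channel is used. Roughly, as sketched below with a plain mpsc::Sender<String> rather than the handle's real field types, and with the best-effort variant only illustrating one shape a non-blocking observe_update() can take:

```rust
use tokio::sync::mpsc;

/// Waits for queue capacity: the caller is back-pressured, but the
/// update is never dropped (the shape of observe_update_blocking()).
async fn enqueue_blocking(tx: &mpsc::Sender<String>, name: String) {
    tx.send(name).await.expect("actor not running");
}

/// Never waits: if the queue is full the update is discarded, trading
/// completeness for never stalling the caller.
fn enqueue_best_effort(tx: &mpsc::Sender<String>, name: String) {
    if let Err(e) = tx.try_send(name) {
        // TrySendError::Full(_) or Closed(_); a real caller would
        // probably log and/or count this.
        let _ = e;
    }
}
```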
/// Return the current content hash ([`RootHash`]) describing the set of
/// [`NamespaceSchema`] observed so far.
///
/// [`NamespaceSchema`]: data_types::NamespaceSchema
pub(crate) async fn content_hash(&self) -> RootHash {
pub async fn content_hash(&self) -> RootHash {
let (tx, rx) = oneshot::channel();
self.op_tx