feat(router2): pre-warm schema cache

Fetch all NamespaceSchema from the catalog and pre-cache them during
router initialisation.

This pre-warming happens in 3 queries, and should significantly reduce
the load on the Postgres instance backing the catalog during
deployments, after router crashes, and in similar restart scenarios.

Pre-warming occurs during init, before the HTTP endpoints are bound, and
therefore before the /health HTTP endpoint can return OK. This ensures
the router pre-warms the cache before it begins receiving traffic.
pull/24376/head
Dom Dwyer 2022-04-27 17:17:50 +01:00
parent bb8a19b571
commit d7d4de5144
1 changed files with 73 additions and 2 deletions

View File

@ -6,7 +6,7 @@ use std::{
use async_trait::async_trait;
use clap_blocks::write_buffer::WriteBufferConfig;
use data_types2::{PartitionTemplate, TemplatePart};
use data_types2::{DatabaseName, PartitionTemplate, TemplatePart};
use hashbrown::HashMap;
use hyper::{Body, Request, Response};
use iox_catalog::interface::Catalog;
@ -19,7 +19,9 @@ use router2::{
NamespaceAutocreation, Partitioner, SchemaValidator, ShardedWriteBuffer,
WriteSummaryAdapter,
},
namespace_cache::{metrics::InstrumentedCache, MemoryNamespaceCache, ShardedCache},
namespace_cache::{
metrics::InstrumentedCache, MemoryNamespaceCache, NamespaceCache, ShardedCache,
},
sequencer::Sequencer,
server::{grpc::GrpcDelegate, http::HttpDelegate, RouterServer},
sharder::JumpHash,
@ -174,6 +176,10 @@ pub async fn create_router2_server_type(
&*metrics,
));
pre_warm_schema_cache(&ns_cache, &*catalog)
.await
.expect("namespace cache pre-warming failed");
// Initialise and instrument the schema validator
let schema_validator =
SchemaValidator::new(Arc::clone(&catalog), Arc::clone(&ns_cache), &*metrics);
@ -318,3 +324,68 @@ async fn init_write_buffer(
.collect::<JumpHash<_>>(),
))
}
/// Warm `cache` by inserting every namespace schema currently listed by
/// `catalog`.
///
/// # Errors
///
/// Returns the catalog error if listing the schemas fails; in that case
/// nothing is written to `cache`.
///
/// # Panics
///
/// Panics if a namespace name returned by the catalog cannot be converted
/// into a [`DatabaseName`].
async fn pre_warm_schema_cache<T>(
    cache: &T,
    catalog: &dyn Catalog,
) -> Result<(), iox_catalog::interface::Error>
where
    T: NamespaceCache,
{
    let all_schemas = iox_catalog::interface::list_schemas(catalog).await?;

    for (namespace, ns_schema) in all_schemas {
        // The cache is keyed by database name, so convert the namespace name
        // first; a name that fails conversion is treated as a fatal error.
        let db_name = DatabaseName::try_from(namespace.name)
            .expect("cannot convert existing namespace name to database name");
        cache.put_schema(db_name, ns_schema);
    }

    Ok(())
}
#[cfg(test)]
mod tests {
    use data_types2::ColumnType;
    use iox_catalog::mem::MemCatalog;

    use super::*;

    /// Seeds an in-memory catalog with a single namespace containing one
    /// table and one column, pre-warms an empty cache from it, and asserts
    /// the cache then returns a schema for that namespace which includes the
    /// table.
    #[tokio::test]
    async fn test_pre_warm_cache() {
        let catalog = Arc::new(MemCatalog::new(Default::default()));

        // Populate the catalog inside its own scope so the repository handle
        // is released before pre-warming starts; keeping it alive would
        // deadlock.
        {
            let mut repos = catalog.repositories().await;

            let topic = repos.kafka_topics().create_or_get("foo").await.unwrap();
            let query_pool = repos.query_pools().create_or_get("foo").await.unwrap();

            let ns = repos
                .namespaces()
                .create("test_ns", "inf", topic.id, query_pool.id)
                .await
                .unwrap();

            let tbl = repos
                .tables()
                .create_or_get("name", ns.id)
                .await
                .unwrap();

            let _col = repos
                .columns()
                .create_or_get("name", tbl.id, ColumnType::U64)
                .await
                .unwrap();
        }

        let cache = Arc::new(MemoryNamespaceCache::default());
        pre_warm_schema_cache(&cache, &*catalog)
            .await
            .expect("pre-warming failed");

        // The namespace created above must now be served from the cache.
        let lookup = DatabaseName::new("test_ns").unwrap();
        let cached = cache.get_schema(&lookup).expect("should contain a schema");
        assert!(cached.tables.get("name").is_some());
    }
}