feat: querier test system, ground work (#3991)

* feat: querier test system, ground work

See #3985 for the motivation.

This introduces a cache system for the querier which can later be
extended to support the remaining features listed in #3985 (e.g.
metrics, LRU/TTL).

All current caches are wired up to go through the new cache system. Once
we move away from (ab)using `db`, the set of caches will be different
but the system will remain.

* test: explain it

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* refactor: simplify cache result broadcast

* refactor: introduce `Loader` crate

* fix: docs

* docs: explain why we manually drop removed hashmap entries

* docs: fix intra-doc link

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Marco Neumann 2022-03-10 11:27:24 +00:00 committed by GitHub
parent 59d6bee531
commit 30d1c77d36
12 changed files with 906 additions and 349 deletions

@@ -1,341 +0,0 @@
use backoff::{Backoff, BackoffConfig};
use data_types2::{NamespaceId, PartitionId, SequencerId, TableId};
use iox_catalog::interface::Catalog;
use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};
/// Caches requests to the [`Catalog`].
#[derive(Debug)]
pub struct CatalogCache {
/// Backoff config for IO operations.
backoff_config: BackoffConfig,
/// Catalog.
catalog: Arc<dyn Catalog>,
/// Partition keys cache for old gen.
old_gen_partition_key_cache: RwLock<HashMap<PartitionId, Arc<str>>>,
/// Partition key and sequencer ID cache.
partition_cache: RwLock<HashMap<PartitionId, (Arc<str>, SequencerId)>>,
/// Table name and namespace cache.
table_cache: RwLock<HashMap<TableId, (Arc<str>, NamespaceId)>>,
/// Namespace name cache.
namespace_cache: RwLock<HashMap<NamespaceId, Arc<str>>>,
}
impl CatalogCache {
/// Create empty cache.
pub fn new(catalog: Arc<dyn Catalog>) -> Self {
Self {
backoff_config: BackoffConfig::default(),
catalog,
old_gen_partition_key_cache: RwLock::new(HashMap::default()),
partition_cache: RwLock::new(HashMap::default()),
table_cache: RwLock::new(HashMap::default()),
namespace_cache: RwLock::new(HashMap::default()),
}
}
/// Get underlying catalog
pub fn catalog(&self) -> Arc<dyn Catalog> {
Arc::clone(&self.catalog)
}
/// Get partition key for old gen.
///
/// This either uses a cached value or -- if required -- creates a fresh string.
pub async fn old_gen_partition_key(&self, partition_id: PartitionId) -> Arc<str> {
if let Some(key) = self.old_gen_partition_key_cache.read().get(&partition_id) {
return Arc::clone(key);
}
let (partition_key, sequencer_id) = self.cached_partition(partition_id).await;
let og_partition_key = Arc::from(format!("{}-{}", sequencer_id.get(), partition_key));
Arc::clone(
self.old_gen_partition_key_cache
.write()
.entry(partition_id)
.or_insert(og_partition_key),
)
}
/// Get the partition key and sequencer ID for the given partition ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
async fn cached_partition(&self, partition_id: PartitionId) -> (Arc<str>, SequencerId) {
if let Some((key, id)) = self.partition_cache.read().get(&partition_id) {
return (Arc::clone(key), *id);
}
let partition = Backoff::new(&self.backoff_config)
.retry_all_errors("get partition_key", || async {
self.catalog
.repositories()
.await
.partitions()
.get_by_id(partition_id)
.await
})
.await
.expect("retry forever")
.expect("partition gone from catalog?!");
let key = Arc::from(partition.partition_key);
let mut partition_cache = self.partition_cache.write();
let (key, id) = partition_cache
.entry(partition_id)
.or_insert((key, partition.sequencer_id));
(Arc::clone(key), *id)
}
/// Get the table name and namespace ID for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
async fn cached_table(&self, table_id: TableId) -> (Arc<str>, NamespaceId) {
if let Some((name, ns)) = self.table_cache.read().get(&table_id) {
return (Arc::clone(name), *ns);
}
let table = Backoff::new(&self.backoff_config)
.retry_all_errors("get table_name", || async {
self.catalog
.repositories()
.await
.tables()
.get_by_id(table_id)
.await
})
.await
.expect("retry forever")
.expect("table gone from catalog?!");
let name = Arc::from(table.name);
let mut table_cache = self.table_cache.write();
let (name, ns) = table_cache
.entry(table_id)
.or_insert((name, table.namespace_id));
(Arc::clone(name), *ns)
}
/// Get the table name for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
pub async fn table_name(&self, table_id: TableId) -> Arc<str> {
self.cached_table(table_id).await.0
}
/// Get the table namespace ID for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
pub async fn table_namespace_id(&self, table_id: TableId) -> NamespaceId {
self.cached_table(table_id).await.1
}
/// Get the namespace name for the given namespace ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
pub async fn namespace_name(&self, namespace_id: NamespaceId) -> Arc<str> {
if let Some(name) = self.namespace_cache.read().get(&namespace_id) {
return Arc::clone(name);
}
let namespace = Backoff::new(&self.backoff_config)
.retry_all_errors("get namespace_name", || async {
self.catalog
.repositories()
.await
.namespaces()
.get_by_id(namespace_id)
.await
})
.await
.expect("retry forever")
.expect("namespace gone from catalog?!");
let name = Arc::from(namespace.name);
Arc::clone(
self.namespace_cache
.write()
.entry(namespace_id)
.or_insert(name),
)
}
}
#[cfg(test)]
mod tests {
use metric::{Attributes, Metric, U64Histogram};
use crate::test_util::TestCatalog;
use super::*;
#[tokio::test]
async fn test_namespace_name() {
let catalog = TestCatalog::new();
let ns1 = catalog.create_namespace("ns1").await.namespace.clone();
let ns2 = catalog.create_namespace("ns2").await.namespace.clone();
assert_ne!(ns1.id, ns2.id);
let cache = CatalogCache::new(catalog.catalog());
let name1_a = cache.namespace_name(ns1.id).await;
assert_eq!(name1_a.as_ref(), ns1.name.as_str());
assert_metric(&catalog.metric_registry, "namespace_get_by_id", 1);
let name2 = cache.namespace_name(ns2.id).await;
assert_eq!(name2.as_ref(), ns2.name.as_str());
assert_metric(&catalog.metric_registry, "namespace_get_by_id", 2);
let name1_b = cache.namespace_name(ns1.id).await;
assert!(Arc::ptr_eq(&name1_a, &name1_b));
assert_metric(&catalog.metric_registry, "namespace_get_by_id", 2);
}
#[tokio::test]
async fn test_table_name() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t1 = ns.create_table("table1").await.table.clone();
let t2 = ns.create_table("table2").await.table.clone();
assert_ne!(t1.id, t2.id);
let cache = CatalogCache::new(catalog.catalog());
let name1_a = cache.table_name(t1.id).await;
assert_eq!(name1_a.as_ref(), t1.name.as_str());
assert_metric(&catalog.metric_registry, "table_get_by_id", 1);
let name2 = cache.table_name(t2.id).await;
assert_eq!(name2.as_ref(), t2.name.as_str());
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
let name1_b = cache.table_name(t1.id).await;
assert!(Arc::ptr_eq(&name1_a, &name1_b));
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
}
#[tokio::test]
async fn test_table_namespace_id() {
let catalog = TestCatalog::new();
let ns2 = catalog.create_namespace("ns1").await;
let ns1 = catalog.create_namespace("ns2").await;
let t1 = ns1.create_table("table1").await.table.clone();
let t2 = ns2.create_table("table2").await.table.clone();
assert_ne!(t1.id, t2.id);
assert_ne!(t1.namespace_id, t2.namespace_id);
let cache = CatalogCache::new(catalog.catalog());
let id1_a = cache.table_namespace_id(t1.id).await;
assert_eq!(id1_a, t1.namespace_id);
assert_metric(&catalog.metric_registry, "table_get_by_id", 1);
let id2 = cache.table_namespace_id(t2.id).await;
assert_eq!(id2, t2.namespace_id);
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
let id1_b = cache.table_namespace_id(t1.id).await;
assert_eq!(id1_b, t1.namespace_id);
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
}
#[tokio::test]
async fn test_table_shared_cache() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t1 = ns.create_table("table1").await.table.clone();
let t2 = ns.create_table("table2").await.table.clone();
assert_ne!(t1.id, t2.id);
let cache = CatalogCache::new(catalog.catalog());
cache.table_name(t1.id).await;
cache.table_namespace_id(t2.id).await;
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
// `table_name` and `table_namespace_id` use the same underlying cache
cache.table_namespace_id(t1.id).await;
cache.table_name(t2.id).await;
assert_metric(&catalog.metric_registry, "table_get_by_id", 2);
}
#[tokio::test]
async fn test_old_partition_key() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t = ns.create_table("table").await;
let s1 = ns.create_sequencer(1).await;
let s2 = ns.create_sequencer(2).await;
let p11 = t
.with_sequencer(&s1)
.create_partition("k1")
.await
.partition
.clone();
let p12 = t
.with_sequencer(&s2)
.create_partition("k1")
.await
.partition
.clone();
let p21 = t
.with_sequencer(&s1)
.create_partition("k2")
.await
.partition
.clone();
let p22 = t
.with_sequencer(&s2)
.create_partition("k2")
.await
.partition
.clone();
let cache = CatalogCache::new(catalog.catalog());
let name11_a = cache.old_gen_partition_key(p11.id).await;
assert_eq!(name11_a.as_ref(), "1-k1");
assert_metric(&catalog.metric_registry, "partition_get_by_id", 1);
let name12 = cache.old_gen_partition_key(p12.id).await;
assert_eq!(name12.as_ref(), "2-k1");
assert_metric(&catalog.metric_registry, "partition_get_by_id", 2);
let name21 = cache.old_gen_partition_key(p21.id).await;
assert_eq!(name21.as_ref(), "1-k2");
assert_metric(&catalog.metric_registry, "partition_get_by_id", 3);
let name22 = cache.old_gen_partition_key(p22.id).await;
assert_eq!(name22.as_ref(), "2-k2");
assert_metric(&catalog.metric_registry, "partition_get_by_id", 4);
let name11_b = cache.old_gen_partition_key(p11.id).await;
assert!(Arc::ptr_eq(&name11_a, &name11_b));
assert_metric(&catalog.metric_registry, "partition_get_by_id", 4);
}
fn assert_metric(metrics: &metric::Registry, name: &'static str, n: u64) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
.expect("failed to get observer")
.fetch();
let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
assert_eq!(hit_count, n);
}
}

querier/src/cache/mod.rs

@@ -0,0 +1,66 @@
use backoff::BackoffConfig;
use iox_catalog::interface::Catalog;
use std::sync::Arc;
use self::{namespace::NamespaceCache, partition::PartitionCache, table::TableCache};
pub mod namespace;
pub mod partition;
pub mod table;
#[cfg(test)]
mod test_util;
/// Caches requests to the [`Catalog`].
#[derive(Debug)]
pub struct CatalogCache {
/// Catalog.
catalog: Arc<dyn Catalog>,
/// Partition cache.
partition_cache: PartitionCache,
/// Table cache.
table_cache: TableCache,
/// Namespace cache.
namespace_cache: NamespaceCache,
}
impl CatalogCache {
/// Create empty cache.
pub fn new(catalog: Arc<dyn Catalog>) -> Self {
let backoff_config = BackoffConfig::default();
let namespace_cache = NamespaceCache::new(Arc::clone(&catalog), backoff_config.clone());
let table_cache = TableCache::new(Arc::clone(&catalog), backoff_config.clone());
let partition_cache = PartitionCache::new(Arc::clone(&catalog), backoff_config);
Self {
catalog,
partition_cache,
table_cache,
namespace_cache,
}
}
/// Get underlying catalog
pub fn catalog(&self) -> Arc<dyn Catalog> {
Arc::clone(&self.catalog)
}
/// Namespace cache
pub fn namespace(&self) -> &NamespaceCache {
&self.namespace_cache
}
/// Table cache
pub fn table(&self) -> &TableCache {
&self.table_cache
}
/// Partition cache
pub fn partition(&self) -> &PartitionCache {
&self.partition_cache
}
}
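
For orientation (not part of the diff): a minimal sketch of how a caller might use the new facade. The `lookup` function and its `catalog`, `table_id`, and `partition_id` arguments are made up for illustration; the accessors themselves are the ones added above.

use std::sync::Arc;

use data_types2::{PartitionId, TableId};
use iox_catalog::interface::Catalog;

use crate::cache::CatalogCache;

async fn lookup(catalog: Arc<dyn Catalog>, table_id: TableId, partition_id: PartitionId) {
    let cache = CatalogCache::new(catalog);

    // Each accessor hands out a sub-cache; repeated calls for the same ID hit the cached entry.
    let table_name = cache.table().name(table_id).await;
    let namespace_id = cache.table().namespace_id(table_id).await;
    let namespace_name = cache.namespace().name(namespace_id).await;
    let og_key = cache.partition().old_gen_partition_key(partition_id).await;

    println!("{} in namespace {} (old-gen partition key {})", table_name, namespace_name, og_key);
}

Each sub-cache owns its own map and loader, which should make it easier to bolt on the per-cache metrics and LRU/TTL policies mentioned in the commit message.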

querier/src/cache/namespace.rs

@@ -0,0 +1,88 @@
use std::sync::Arc;
use backoff::{Backoff, BackoffConfig};
use data_types2::NamespaceId;
use iox_catalog::interface::Catalog;
use crate::cache_system::{driver::Cache, loader::FunctionLoader};
/// Cache for namespace-related attributes.
#[derive(Debug)]
pub struct NamespaceCache {
cache: Cache<NamespaceId, CachedNamespace>,
}
impl NamespaceCache {
/// Create new empty cache.
pub fn new(catalog: Arc<dyn Catalog>, backoff_config: BackoffConfig) -> Self {
let loader = Arc::new(FunctionLoader::new(move |namespace_id| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
async move {
let namespace = Backoff::new(&backoff_config)
.retry_all_errors("get namespace_name", || async {
catalog
.repositories()
.await
.namespaces()
.get_by_id(namespace_id)
.await
})
.await
.expect("retry forever")
.expect("namespace gone from catalog?!");
CachedNamespace {
name: Arc::from(namespace.name),
}
}
}));
Self {
cache: Cache::new(loader),
}
}
/// Get the namespace name for the given namespace ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
pub async fn name(&self, id: NamespaceId) -> Arc<str> {
self.cache.get(id).await.name
}
}
#[derive(Debug, Clone)]
struct CachedNamespace {
name: Arc<str>,
}
#[cfg(test)]
mod tests {
use crate::{cache::test_util::assert_histogram_metric_count, test_util::TestCatalog};
use super::*;
#[tokio::test]
async fn test_name() {
let catalog = TestCatalog::new();
let ns1 = catalog.create_namespace("ns1").await.namespace.clone();
let ns2 = catalog.create_namespace("ns2").await.namespace.clone();
assert_ne!(ns1.id, ns2.id);
let cache = NamespaceCache::new(catalog.catalog(), BackoffConfig::default());
let name1_a = cache.name(ns1.id).await;
assert_eq!(name1_a.as_ref(), ns1.name.as_str());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 1);
let name2 = cache.name(ns2.id).await;
assert_eq!(name2.as_ref(), ns2.name.as_str());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 2);
let name1_b = cache.name(ns1.id).await;
assert!(Arc::ptr_eq(&name1_a, &name1_b));
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 2);
}
}

querier/src/cache/partition.rs

@@ -0,0 +1,125 @@
use std::sync::Arc;
use backoff::{Backoff, BackoffConfig};
use data_types2::PartitionId;
use iox_catalog::interface::Catalog;
use crate::cache_system::{driver::Cache, loader::FunctionLoader};
/// Cache for partition-related attributes.
#[derive(Debug)]
pub struct PartitionCache {
cache: Cache<PartitionId, CachedPartition>,
}
impl PartitionCache {
/// Create new empty cache.
pub fn new(catalog: Arc<dyn Catalog>, backoff_config: BackoffConfig) -> Self {
let loader = Arc::new(FunctionLoader::new(move |partition_id| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
async move {
let partition = Backoff::new(&backoff_config)
.retry_all_errors("get partition_key", || async {
catalog
.repositories()
.await
.partitions()
.get_by_id(partition_id)
.await
})
.await
.expect("retry forever")
.expect("partition gone from catalog?!");
CachedPartition {
old_gen_partition_key: Arc::from(format!(
"{}-{}",
partition.sequencer_id.get(),
partition.partition_key
)),
}
}
}));
Self {
cache: Cache::new(loader),
}
}
/// Get partition key for old gen.
///
/// This either uses a cached value or -- if required -- creates a fresh string.
pub async fn old_gen_partition_key(&self, partition_id: PartitionId) -> Arc<str> {
self.cache.get(partition_id).await.old_gen_partition_key
}
}
#[derive(Debug, Clone)]
struct CachedPartition {
old_gen_partition_key: Arc<str>,
}
#[cfg(test)]
mod tests {
use crate::{cache::test_util::assert_histogram_metric_count, test_util::TestCatalog};
use super::*;
#[tokio::test]
async fn test_old_partition_key() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t = ns.create_table("table").await;
let s1 = ns.create_sequencer(1).await;
let s2 = ns.create_sequencer(2).await;
let p11 = t
.with_sequencer(&s1)
.create_partition("k1")
.await
.partition
.clone();
let p12 = t
.with_sequencer(&s2)
.create_partition("k1")
.await
.partition
.clone();
let p21 = t
.with_sequencer(&s1)
.create_partition("k2")
.await
.partition
.clone();
let p22 = t
.with_sequencer(&s2)
.create_partition("k2")
.await
.partition
.clone();
let cache = PartitionCache::new(catalog.catalog(), BackoffConfig::default());
let name11_a = cache.old_gen_partition_key(p11.id).await;
assert_eq!(name11_a.as_ref(), "1-k1");
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1);
let name12 = cache.old_gen_partition_key(p12.id).await;
assert_eq!(name12.as_ref(), "2-k1");
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2);
let name21 = cache.old_gen_partition_key(p21.id).await;
assert_eq!(name21.as_ref(), "1-k2");
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3);
let name22 = cache.old_gen_partition_key(p22.id).await;
assert_eq!(name22.as_ref(), "2-k2");
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
let name11_b = cache.old_gen_partition_key(p11.id).await;
assert!(Arc::ptr_eq(&name11_a, &name11_b));
assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4);
}
}

querier/src/cache/table.rs

@@ -0,0 +1,145 @@
use std::sync::Arc;
use backoff::{Backoff, BackoffConfig};
use data_types2::{NamespaceId, TableId};
use iox_catalog::interface::Catalog;
use crate::cache_system::{driver::Cache, loader::FunctionLoader};
/// Cache for table-related queries.
#[derive(Debug)]
pub struct TableCache {
cache: Cache<TableId, CachedTable>,
}
impl TableCache {
/// Create new empty cache.
pub fn new(catalog: Arc<dyn Catalog>, backoff_config: BackoffConfig) -> Self {
let loader = Arc::new(FunctionLoader::new(move |table_id| {
let catalog = Arc::clone(&catalog);
let backoff_config = backoff_config.clone();
async move {
let table = Backoff::new(&backoff_config)
.retry_all_errors("get table_name", || async {
catalog
.repositories()
.await
.tables()
.get_by_id(table_id)
.await
})
.await
.expect("retry forever")
.expect("table gone from catalog?!");
CachedTable {
name: Arc::from(table.name),
namespace_id: table.namespace_id,
}
}
}));
Self {
cache: Cache::new(loader),
}
}
/// Get the table name for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
pub async fn name(&self, table_id: TableId) -> Arc<str> {
self.cache.get(table_id).await.name
}
/// Get the table namespace ID for the given table ID.
///
/// This either uses a cached value or -- if required -- fetches the mapping from the catalog.
pub async fn namespace_id(&self, table_id: TableId) -> NamespaceId {
self.cache.get(table_id).await.namespace_id
}
}
#[derive(Debug, Clone)]
struct CachedTable {
name: Arc<str>,
namespace_id: NamespaceId,
}
#[cfg(test)]
mod tests {
use crate::{cache::test_util::assert_histogram_metric_count, test_util::TestCatalog};
use super::*;
#[tokio::test]
async fn test_table_name() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t1 = ns.create_table("table1").await.table.clone();
let t2 = ns.create_table("table2").await.table.clone();
assert_ne!(t1.id, t2.id);
let cache = TableCache::new(catalog.catalog(), BackoffConfig::default());
let name1_a = cache.name(t1.id).await;
assert_eq!(name1_a.as_ref(), t1.name.as_str());
assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 1);
let name2 = cache.name(t2.id).await;
assert_eq!(name2.as_ref(), t2.name.as_str());
assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2);
let name1_b = cache.name(t1.id).await;
assert!(Arc::ptr_eq(&name1_a, &name1_b));
assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2);
}
#[tokio::test]
async fn test_table_namespace_id() {
let catalog = TestCatalog::new();
let ns2 = catalog.create_namespace("ns1").await;
let ns1 = catalog.create_namespace("ns2").await;
let t1 = ns1.create_table("table1").await.table.clone();
let t2 = ns2.create_table("table2").await.table.clone();
assert_ne!(t1.id, t2.id);
assert_ne!(t1.namespace_id, t2.namespace_id);
let cache = TableCache::new(catalog.catalog(), BackoffConfig::default());
let id1_a = cache.namespace_id(t1.id).await;
assert_eq!(id1_a, t1.namespace_id);
assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 1);
let id2 = cache.namespace_id(t2.id).await;
assert_eq!(id2, t2.namespace_id);
assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2);
let id1_b = cache.namespace_id(t1.id).await;
assert_eq!(id1_b, t1.namespace_id);
assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2);
}
#[tokio::test]
async fn test_table_shared_cache() {
let catalog = TestCatalog::new();
let ns = catalog.create_namespace("ns").await;
let t1 = ns.create_table("table1").await.table.clone();
let t2 = ns.create_table("table2").await.table.clone();
assert_ne!(t1.id, t2.id);
let cache = TableCache::new(catalog.catalog(), BackoffConfig::default());
cache.name(t1.id).await;
cache.namespace_id(t2.id).await;
assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2);
// `name` and `namespace_id` use the same underlying cache
cache.namespace_id(t1.id).await;
cache.name(t2.id).await;
assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2);
}
}

querier/src/cache/test_util.rs

@@ -0,0 +1,13 @@
use metric::{Attributes, Metric, U64Histogram};
pub fn assert_histogram_metric_count(metrics: &metric::Registry, name: &'static str, n: u64) {
let histogram = metrics
.get_instrument::<Metric<U64Histogram>>("catalog_op_duration_ms")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[("op", name), ("result", "success")]))
.expect("failed to get observer")
.fetch();
let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
assert_eq!(hit_count, n);
}

@@ -0,0 +1,397 @@
use std::{collections::HashMap, hash::Hash, sync::Arc};
use futures::{
future::{BoxFuture, Shared},
FutureExt, TryFutureExt,
};
use parking_lot::Mutex;
use tokio::{sync::oneshot::error::RecvError, task::JoinHandle};
use super::loader::Loader;
/// High-level cache implementation.
///
/// # Concurrency
/// Multiple cache requests for different keys can run at the same time. When data is requested for the same key the
/// underlying loader will only be polled once, even when the requests are made while the loader is still running.
///
/// # Cancellation
/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will still be cached.
///
/// # Panic
/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic. The data will NOT be cached.
#[derive(Debug)]
pub struct Cache<K, V>
where
K: Clone + Eq + Hash + std::fmt::Debug + Send + 'static,
V: Clone + std::fmt::Debug + Send + 'static,
{
state: Arc<Mutex<CacheState<K, V>>>,
loader: Arc<dyn Loader<K = K, V = V>>,
}
impl<K, V> Cache<K, V>
where
K: Clone + Eq + Hash + std::fmt::Debug + Send + 'static,
V: Clone + std::fmt::Debug + Send + 'static,
{
/// Create new, empty cache with given loader function.
pub fn new(loader: Arc<dyn Loader<K = K, V = V>>) -> Self {
Self {
state: Arc::new(Mutex::new(CacheState {
cached_entries: HashMap::new(),
running_queries: HashMap::new(),
})),
loader,
}
}
/// Get value from cache.
pub async fn get(&self, k: K) -> V {
// place state locking into its own scope so it doesn't leak into the generator (async function)
let receiver = {
let mut state = self.state.lock();
// check if we already cached this entry
if let Some(v) = state.cached_entries.get(&k) {
return v.clone();
}
// check if there is already a query for this key running
if let Some((receiver, _handle)) = state.running_queries.get(&k) {
receiver.clone()
} else {
// requires new query
let (tx, rx) = tokio::sync::oneshot::channel();
let receiver = rx
.map_ok(|v| Arc::new(Mutex::new(v)))
.map_err(Arc::new)
.boxed()
.shared();
// need to wrap the query in a tokio task so that it doesn't get canceled when this very request is canceled
let state_captured = Arc::clone(&self.state);
let loader = Arc::clone(&self.loader);
let k_captured = k.clone();
let handle = tokio::spawn(async move {
// need to clone K and bind it so rustc doesn't require `K: Sync`
let k_for_loader = k_captured.clone();
// execute the loader
// If we panic here then `tx` will be dropped and the receivers will be notified.
let v = loader.load(k_for_loader).await;
// broadcast result
// It's OK if the receiver side is gone. This might happen during shutdown
tx.send(v.clone()).ok();
// remove "running" state and store result
//
// Note: we need to manually drop the result of `.remove(...).expect(...)` here to convince rustc
// that we don't need the shared future within the resulting tuple. The warning we would get
// is:
//
// warning: unused `futures::future::Shared` in tuple element 0 that must be used
let mut state = state_captured.lock();
drop(
state
.running_queries
.remove(&k_captured)
.expect("query should be running"),
);
state.cached_entries.insert(k_captured, v);
});
state.running_queries.insert(k, (receiver.clone(), handle));
receiver
}
};
receiver
.await
.expect("cache loader panicked, see logs")
.lock()
.clone()
}
}
impl<K, V> Drop for Cache<K, V>
where
K: Clone + Eq + Hash + std::fmt::Debug + Send + 'static,
V: Clone + std::fmt::Debug + Send + 'static,
{
fn drop(&mut self) {
for (_k, (_receiver, handle)) in self.state.lock().running_queries.drain() {
// It's unlikely that anyone is still using the shared receiver at this point, because `Cache::get` borrows
// `self`. If it is still in use, aborting the task will cancel the contained future, which in turn will
// drop the sender of the oneshot channel. The receivers will be notified.
handle.abort();
}
}
}
/// A [`tokio::sync::oneshot::Receiver`] that can be cloned.
///
/// The types are:
/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time the reference to `V`
/// (i.e. the `Arc`) must be cloneable for `Shared`
/// - `Arc<RecvError>`: Is required because `RecvError` is not `Clone` but `Shared` requires that.
/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to `Result<Arc<Mutex<V>>, Arc<RecvError>>` results in
///   a rather messy type that we want to erase.
/// - `Shared`: Allow the receiver to be cloned and be awaited from multiple places.
type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>;
/// Inner cache state that is usually guarded by a lock.
///
/// The state parts must be updated in a consistent manner, i.e. while using the same lock guard.
#[derive(Debug)]
struct CacheState<K, V> {
/// Cached entries (i.e. completed queries).
cached_entries: HashMap<K, V>,
/// Currently running queries indexed by cache key.
///
/// For each query we have a receiver that can await the result as well as a handle for the task that is currently
/// executing the query. The handle can be used to abort the running query, e.g. when dropping the cache.
running_queries: HashMap<K, (SharedReceiver<V>, JoinHandle<()>)>,
}
#[cfg(test)]
mod tests {
use std::{collections::HashSet, sync::Arc, time::Duration};
use async_trait::async_trait;
use tokio::sync::Notify;
use super::*;
#[tokio::test]
async fn test_answers_are_correct() {
let (cache, _loader) = setup();
assert_eq!(cache.get(1).await, String::from("1"));
assert_eq!(cache.get(2).await, String::from("2"));
}
#[tokio::test]
async fn test_linear_memory() {
let (cache, loader) = setup();
assert_eq!(cache.get(1).await, String::from("1"));
assert_eq!(cache.get(1).await, String::from("1"));
assert_eq!(cache.get(2).await, String::from("2"));
assert_eq!(cache.get(2).await, String::from("2"));
assert_eq!(cache.get(1).await, String::from("1"));
assert_eq!(loader.loaded(), vec![1, 2]);
}
#[tokio::test]
async fn test_concurrent_query_loads_once() {
let (cache, loader) = setup();
loader.block();
let cache_captured = Arc::clone(&cache);
let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
let handle_2 = tokio::spawn(async move { cache.get(1).await });
tokio::time::sleep(Duration::from_millis(10)).await;
// Shouldn't issue concurrent load requests for the same key
let n_blocked = loader.unblock();
assert_eq!(n_blocked, 1);
assert_eq!(handle_1.await.unwrap(), String::from("1"));
assert_eq!(handle_2.await.unwrap(), String::from("1"));
assert_eq!(loader.loaded(), vec![1]);
}
#[tokio::test]
async fn test_queries_are_parallelized() {
let (cache, loader) = setup();
loader.block();
let cache_captured = Arc::clone(&cache);
let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
let cache_captured = Arc::clone(&cache);
let handle_2 = tokio::spawn(async move { cache_captured.get(1).await });
let handle_3 = tokio::spawn(async move { cache.get(2).await });
tokio::time::sleep(Duration::from_millis(10)).await;
let n_blocked = loader.unblock();
assert_eq!(n_blocked, 2);
assert_eq!(handle_1.await.unwrap(), String::from("1"));
assert_eq!(handle_2.await.unwrap(), String::from("1"));
assert_eq!(handle_3.await.unwrap(), String::from("2"));
assert_eq!(loader.loaded(), vec![1, 2]);
}
#[tokio::test]
async fn test_cancel_request() {
let (cache, loader) = setup();
loader.block();
let cache_captured = Arc::clone(&cache);
let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
tokio::time::sleep(Duration::from_millis(10)).await;
let handle_2 = tokio::spawn(async move { cache.get(1).await });
tokio::time::sleep(Duration::from_millis(10)).await;
// abort first handle
handle_1.abort();
tokio::time::sleep(Duration::from_millis(10)).await;
let n_blocked = loader.unblock();
assert_eq!(n_blocked, 1);
assert_eq!(handle_2.await.unwrap(), String::from("1"));
assert_eq!(loader.loaded(), vec![1]);
}
#[tokio::test]
async fn test_panic_request() {
let (cache, loader) = setup();
loader.panic_once(1);
loader.block();
let cache_captured = Arc::clone(&cache);
let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
tokio::time::sleep(Duration::from_millis(10)).await;
let cache_captured = Arc::clone(&cache);
let handle_2 = tokio::spawn(async move { cache_captured.get(1).await });
let handle_3 = tokio::spawn(async move { cache.get(2).await });
tokio::time::sleep(Duration::from_millis(10)).await;
let n_blocked = loader.unblock();
assert_eq!(n_blocked, 2);
// panic first handle
handle_1.await.unwrap_err();
tokio::time::sleep(Duration::from_millis(10)).await;
// second handle should also panic
handle_2.await.unwrap_err();
// third handle should just work
assert_eq!(handle_3.await.unwrap(), String::from("2"));
assert_eq!(loader.loaded(), vec![1, 2]);
}
#[tokio::test]
async fn test_drop_cancels_loader() {
let (cache, loader) = setup();
loader.block();
let handle = tokio::spawn(async move { cache.get(1).await });
tokio::time::sleep(Duration::from_millis(10)).await;
handle.abort();
tokio::time::sleep(Duration::from_millis(10)).await;
assert_eq!(Arc::strong_count(&loader), 1);
}
fn setup() -> (Arc<Cache<u8, String>>, Arc<TestLoader>) {
let loader = Arc::new(TestLoader::default());
let cache = Arc::new(Cache::new(Arc::clone(&loader) as _));
(cache, loader)
}
/// Flexible loader function for testing.
#[derive(Debug, Default)]
struct TestLoader {
loaded: Mutex<Vec<u8>>,
blocked: Mutex<Option<Arc<Notify>>>,
panic: Mutex<HashSet<u8>>,
}
impl TestLoader {
/// Panic when loading value for `k`.
///
/// If this is used together with [`block`](Self::block), the panic will occur AFTER blocking.
fn panic_once(&self, k: u8) {
self.panic.lock().insert(k);
}
/// Block all [`load`](Self::load) requests until [`unblock`](Self::unblock) is called.
///
/// If this is used together with [`panic_once`](Self::panic_once), the panic will occur AFTER blocking.
fn block(&self) {
let mut blocked = self.blocked.lock();
assert!(blocked.is_none());
*blocked = Some(Arc::new(Notify::new()));
}
/// Unblock all requests.
///
/// Returns number of requests that were blocked.
fn unblock(&self) -> usize {
let handle = self.blocked.lock().take().unwrap();
let blocked_count = Arc::strong_count(&handle) - 1;
handle.notify_waiters();
blocked_count
}
/// List all keys that were loaded.
///
/// Contains duplicates if keys were loaded multiple times.
fn loaded(&self) -> Vec<u8> {
self.loaded.lock().clone()
}
}
#[async_trait]
impl Loader for TestLoader {
type K = u8;
type V = String;
async fn load(&self, k: u8) -> String {
self.loaded.lock().push(k);
// need to capture the cloned notify handle, otherwise the lock guard leaks into the generator
let maybe_block = self.blocked.lock().clone();
if let Some(block) = maybe_block {
block.notified().await;
}
// maybe panic
if self.panic.lock().remove(&k) {
panic!("test");
}
k.to_string()
}
}
fn assert_send<T>()
where
T: Send,
{
}
fn assert_sync<T>()
where
T: Sync,
{
}
#[test]
fn test_bounds() {
assert_send::<Cache<u8, u8>>();
assert_sync::<Cache<u8, u8>>();
}
}
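
For illustration (not part of the diff): a rough sketch of the request-coalescing behaviour documented above, using the `FunctionLoader` from the sibling `loader` module. The `demo` function and the key/value types are invented for the example; it mirrors `test_concurrent_query_loads_once`.

use std::sync::Arc;

use crate::cache_system::{driver::Cache, loader::FunctionLoader};

async fn demo() {
    // Loader that is only polled once per key, even under concurrent `get`s for that key.
    let loader = Arc::new(FunctionLoader::new(|k: u64| async move {
        format!("value for {}", k)
    }));
    let cache = Arc::new(Cache::new(loader as _));

    let c1 = Arc::clone(&cache);
    let c2 = Arc::clone(&cache);
    let (a, b) = tokio::join!(
        tokio::spawn(async move { c1.get(42).await }),
        tokio::spawn(async move { c2.get(42).await }),
    );

    // Both tasks observe the same value; the loader ran at most once for key 42.
    assert_eq!(a.unwrap(), b.unwrap());
}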

@@ -0,0 +1,53 @@
use async_trait::async_trait;
use futures::{future::BoxFuture, FutureExt};
use std::future::Future;
/// Loader for missing [`Cache`](crate::cache_system::driver::Cache) entries.
#[async_trait]
pub trait Loader: std::fmt::Debug + Send + Sync + 'static {
/// Cache key.
type K: Send + 'static;
/// Cache value.
type V: Send + 'static;
/// Load value for given key.
async fn load(&self, k: Self::K) -> Self::V;
}
/// Simple-to-use wrapper for async functions to act as a [`Loader`].
pub struct FunctionLoader<K, V> {
loader: Box<dyn (Fn(K) -> BoxFuture<'static, V>) + Send + Sync>,
}
impl<K, V> FunctionLoader<K, V> {
/// Create loader from function.
pub fn new<T, F>(loader: T) -> Self
where
T: Fn(K) -> F + Send + Sync + 'static,
F: Future<Output = V> + Send + 'static,
{
let loader = Box::new(move |k| loader(k).boxed());
Self { loader }
}
}
impl<K, V> std::fmt::Debug for FunctionLoader<K, V> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FunctionLoader").finish_non_exhaustive()
}
}
#[async_trait]
impl<K, V> Loader for FunctionLoader<K, V>
where
K: Send + 'static,
V: Send + 'static,
{
type K = K;
type V = V;
async fn load(&self, k: Self::K) -> Self::V {
(self.loader)(k).await
}
}
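
For illustration (not part of the diff): besides wrapping an async closure in `FunctionLoader`, the `Loader` trait can also be implemented directly. `SquareLoader` below is a made-up example; a real implementation would await I/O, as the catalog caches above do.

use async_trait::async_trait;

use crate::cache_system::loader::Loader;

/// Made-up loader that computes the value directly from the key.
#[derive(Debug)]
struct SquareLoader;

#[async_trait]
impl Loader for SquareLoader {
    type K = u32;
    type V = u64;

    async fn load(&self, k: u32) -> u64 {
        // A real loader would typically await a catalog query with retries here.
        u64::from(k) * u64::from(k)
    }
}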

@@ -0,0 +1,2 @@
pub mod driver;
pub mod loader;

@@ -51,9 +51,10 @@ impl ParquetChunkAdapter
/// Create parquet chunk.
async fn new_parquet_chunk(&self, decoded_parquet_file: &DecodedParquetFile) -> ParquetChunk {
let parquet_file = &decoded_parquet_file.parquet_file;
let table_name = self.catalog_cache.table_name(parquet_file.table_id).await;
let table_name = self.catalog_cache.table().name(parquet_file.table_id).await;
let partition_key = self
.catalog_cache
.partition()
.old_gen_partition_key(parquet_file.partition_id)
.await;
let metrics = ParquetChunkMetrics::new(self.metric_registry.as_ref());
@@ -129,15 +130,18 @@ impl ParquetChunkAdapter
ChunkAddr {
db_name: self
.catalog_cache
.namespace_name(
.namespace()
.name(
self.catalog_cache
.table_namespace_id(parquet_file.table_id)
.table()
.namespace_id(parquet_file.table_id)
.await,
)
.await,
table_name: self.catalog_cache.table_name(parquet_file.table_id).await,
table_name: self.catalog_cache.table().name(parquet_file.table_id).await,
partition_key: self
.catalog_cache
.partition()
.old_gen_partition_key(parquet_file.partition_id)
.await,
chunk_id: ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _)),

@@ -14,6 +14,7 @@
pub use client_util::connection;
mod cache;
mod cache_system;
mod chunk;
pub mod database;
/// Flight client to the ingester to request in-memory data.

@@ -31,7 +31,7 @@ mod test_util;
/// in-memory data structure are synced regularly via [`sync`](Self::sync).
///
/// To speed up the sync process and reduce the load on the [IOx Catalog](Catalog) we try to use rather large-scoped
/// queries as well as a [`CatalogCache`].
/// queries as well as a `CatalogCache`.
#[derive(Debug)]
pub struct QuerierNamespace {
/// Backoff config for IO operations.
@@ -240,8 +240,12 @@ impl QuerierNamespace {
let mut desired_partitions = HashSet::with_capacity(partitions.len());
for partition in partitions {
let table = self.catalog_cache.table_name(partition.table_id).await;
let key = self.catalog_cache.old_gen_partition_key(partition.id).await;
let table = self.catalog_cache.table().name(partition.table_id).await;
let key = self
.catalog_cache
.partition()
.old_gen_partition_key(partition.id)
.await;
desired_partitions.insert((table, key));
}
@@ -475,7 +479,7 @@ impl QuerierNamespace {
let mut predicates_by_table_and_sequencer: HashMap<_, HashMap<_, Vec<_>>> =
HashMap::with_capacity(tombstones_by_table_and_sequencer.len());
for (table_id, tombstones_by_sequencer) in tombstones_by_table_and_sequencer {
let table_name = self.catalog_cache.table_name(table_id).await;
let table_name = self.catalog_cache.table().name(table_id).await;
let mut predicates_by_sequencer = HashMap::with_capacity(tombstones_by_sequencer.len());
for (sequencer_id, mut tombstones) in tombstones_by_sequencer {
// sort tombstones by ID so that predicate lists are stable