From 30d1c77d363e5d0b730800aa113f3defbed6e9c6 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Thu, 10 Mar 2022 11:27:24 +0000
Subject: [PATCH] feat: querier test system, ground work (#3991)

* feat: querier test system, ground work

See #3985 for the motivation.

This introduces a cache system for the querier which can later be
extended to support the remaining features listed in #3985 (e.g.
metrics, LRU/TTL).

All current caches are wired up to go through the new cache system.
Once we move away from (ab)using `db`, the set of caches will be
different, but the system will remain.

* test: explain it

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

* refactor: simplify cache result broadcast

* refactor: introduce `Loader` crate

* fix: docs

* docs: explain why we manually drop removed hashmap entries

* docs: fix intra-doc link

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
---
 querier/src/cache.rs               | 341 ------------------------
 querier/src/cache/mod.rs           |  66 +++++
 querier/src/cache/namespace.rs     |  88 +++++++
 querier/src/cache/partition.rs     | 125 +++++++++
 querier/src/cache/table.rs         | 145 +++++++++++
 querier/src/cache/test_util.rs     |  13 +
 querier/src/cache_system/driver.rs | 397 +++++++++++++++++++++++++++++
 querier/src/cache_system/loader.rs |  53 ++++
 querier/src/cache_system/mod.rs    |   2 +
 querier/src/chunk.rs               |  12 +-
 querier/src/lib.rs                 |   1 +
 querier/src/namespace/mod.rs       |  12 +-
 12 files changed, 906 insertions(+), 349 deletions(-)
 delete mode 100644 querier/src/cache.rs
 create mode 100644 querier/src/cache/mod.rs
 create mode 100644 querier/src/cache/namespace.rs
 create mode 100644 querier/src/cache/partition.rs
 create mode 100644 querier/src/cache/table.rs
 create mode 100644 querier/src/cache/test_util.rs
 create mode 100644 querier/src/cache_system/driver.rs
 create mode 100644 querier/src/cache_system/loader.rs
 create mode 100644 querier/src/cache_system/mod.rs

diff --git a/querier/src/cache.rs b/querier/src/cache.rs
deleted file mode 100644
index 271cc5f223..0000000000
--- a/querier/src/cache.rs
+++ /dev/null
@@ -1,341 +0,0 @@
-use backoff::{Backoff, BackoffConfig};
-use data_types2::{NamespaceId, PartitionId, SequencerId, TableId};
-use iox_catalog::interface::Catalog;
-use parking_lot::RwLock;
-use std::{collections::HashMap, sync::Arc};
-
-/// Caches requests to the [`Catalog`].
-#[derive(Debug)]
-pub struct CatalogCache {
-    /// Backoff config for IO operations.
-    backoff_config: BackoffConfig,
-
-    /// Catalog.
-    catalog: Arc<dyn Catalog>,
-
-    /// Partition keys cache for old gen.
-    old_gen_partition_key_cache: RwLock<HashMap<PartitionId, Arc<str>>>,
-
-    /// Partition key and sequencer ID cache.
-    partition_cache: RwLock<HashMap<PartitionId, (Arc<str>, SequencerId)>>,
-
-    /// Table name and namespace cache.
-    table_cache: RwLock<HashMap<TableId, (Arc<str>, NamespaceId)>>,
-
-    /// Namespace name cache.
-    namespace_cache: RwLock<HashMap<NamespaceId, Arc<str>>>,
-}
-
-impl CatalogCache {
-    /// Create empty cache.
-    pub fn new(catalog: Arc<dyn Catalog>) -> Self {
-        Self {
-            backoff_config: BackoffConfig::default(),
-            catalog,
-            old_gen_partition_key_cache: RwLock::new(HashMap::default()),
-            partition_cache: RwLock::new(HashMap::default()),
-            table_cache: RwLock::new(HashMap::default()),
-            namespace_cache: RwLock::new(HashMap::default()),
-        }
-    }
-
-    /// Get underlying catalog
-    pub fn catalog(&self) -> Arc<dyn Catalog> {
-        Arc::clone(&self.catalog)
-    }
-
-    /// Get partition key for old gen.
-    ///
-    /// This either uses a cached value or -- if required -- creates a fresh string.
- pub async fn old_gen_partition_key(&self, partition_id: PartitionId) -> Arc { - if let Some(key) = self.old_gen_partition_key_cache.read().get(&partition_id) { - return Arc::clone(key); - } - - let (partition_key, sequencer_id) = self.cached_partition(partition_id).await; - let og_partition_key = Arc::from(format!("{}-{}", sequencer_id.get(), partition_key)); - - Arc::clone( - self.old_gen_partition_key_cache - .write() - .entry(partition_id) - .or_insert(og_partition_key), - ) - } - - /// Get the partition key and sequencer ID for the given partition ID. - /// - /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. - async fn cached_partition(&self, partition_id: PartitionId) -> (Arc, SequencerId) { - if let Some((key, id)) = self.partition_cache.read().get(&partition_id) { - return (Arc::clone(key), *id); - } - - let partition = Backoff::new(&self.backoff_config) - .retry_all_errors("get partition_key", || async { - self.catalog - .repositories() - .await - .partitions() - .get_by_id(partition_id) - .await - }) - .await - .expect("retry forever") - .expect("partition gone from catalog?!"); - - let key = Arc::from(partition.partition_key); - - let mut partition_cache = self.partition_cache.write(); - let (key, id) = partition_cache - .entry(partition_id) - .or_insert((key, partition.sequencer_id)); - (Arc::clone(key), *id) - } - - /// Get the table name and namespace ID for the given table ID. - /// - /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. - async fn cached_table(&self, table_id: TableId) -> (Arc, NamespaceId) { - if let Some((name, ns)) = self.table_cache.read().get(&table_id) { - return (Arc::clone(name), *ns); - } - - let table = Backoff::new(&self.backoff_config) - .retry_all_errors("get table_name", || async { - self.catalog - .repositories() - .await - .tables() - .get_by_id(table_id) - .await - }) - .await - .expect("retry forever") - .expect("table gone from catalog?!"); - - let name = Arc::from(table.name); - - let mut table_cache = self.table_cache.write(); - let (name, ns) = table_cache - .entry(table_id) - .or_insert((name, table.namespace_id)); - (Arc::clone(name), *ns) - } - - /// Get the table name for the given table ID. - /// - /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. - pub async fn table_name(&self, table_id: TableId) -> Arc { - self.cached_table(table_id).await.0 - } - - /// Get the table namespace ID for the given table ID. - /// - /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. - pub async fn table_namespace_id(&self, table_id: TableId) -> NamespaceId { - self.cached_table(table_id).await.1 - } - - /// Get the namespace name for the given namespace ID. - /// - /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. 
- pub async fn namespace_name(&self, namespace_id: NamespaceId) -> Arc { - if let Some(name) = self.namespace_cache.read().get(&namespace_id) { - return Arc::clone(name); - } - - let namespace = Backoff::new(&self.backoff_config) - .retry_all_errors("get namespace_name", || async { - self.catalog - .repositories() - .await - .namespaces() - .get_by_id(namespace_id) - .await - }) - .await - .expect("retry forever") - .expect("namespace gone from catalog?!"); - - let name = Arc::from(namespace.name); - - Arc::clone( - self.namespace_cache - .write() - .entry(namespace_id) - .or_insert(name), - ) - } -} - -#[cfg(test)] -mod tests { - use metric::{Attributes, Metric, U64Histogram}; - - use crate::test_util::TestCatalog; - - use super::*; - - #[tokio::test] - async fn test_namespace_name() { - let catalog = TestCatalog::new(); - - let ns1 = catalog.create_namespace("ns1").await.namespace.clone(); - let ns2 = catalog.create_namespace("ns2").await.namespace.clone(); - assert_ne!(ns1.id, ns2.id); - - let cache = CatalogCache::new(catalog.catalog()); - - let name1_a = cache.namespace_name(ns1.id).await; - assert_eq!(name1_a.as_ref(), ns1.name.as_str()); - assert_metric(&catalog.metric_registry, "namespace_get_by_id", 1); - - let name2 = cache.namespace_name(ns2.id).await; - assert_eq!(name2.as_ref(), ns2.name.as_str()); - assert_metric(&catalog.metric_registry, "namespace_get_by_id", 2); - - let name1_b = cache.namespace_name(ns1.id).await; - assert!(Arc::ptr_eq(&name1_a, &name1_b)); - assert_metric(&catalog.metric_registry, "namespace_get_by_id", 2); - } - - #[tokio::test] - async fn test_table_name() { - let catalog = TestCatalog::new(); - - let ns = catalog.create_namespace("ns").await; - let t1 = ns.create_table("table1").await.table.clone(); - let t2 = ns.create_table("table2").await.table.clone(); - assert_ne!(t1.id, t2.id); - - let cache = CatalogCache::new(catalog.catalog()); - - let name1_a = cache.table_name(t1.id).await; - assert_eq!(name1_a.as_ref(), t1.name.as_str()); - assert_metric(&catalog.metric_registry, "table_get_by_id", 1); - - let name2 = cache.table_name(t2.id).await; - assert_eq!(name2.as_ref(), t2.name.as_str()); - assert_metric(&catalog.metric_registry, "table_get_by_id", 2); - - let name1_b = cache.table_name(t1.id).await; - assert!(Arc::ptr_eq(&name1_a, &name1_b)); - assert_metric(&catalog.metric_registry, "table_get_by_id", 2); - } - - #[tokio::test] - async fn test_table_namespace_id() { - let catalog = TestCatalog::new(); - - let ns2 = catalog.create_namespace("ns1").await; - let ns1 = catalog.create_namespace("ns2").await; - let t1 = ns1.create_table("table1").await.table.clone(); - let t2 = ns2.create_table("table2").await.table.clone(); - assert_ne!(t1.id, t2.id); - assert_ne!(t1.namespace_id, t2.namespace_id); - - let cache = CatalogCache::new(catalog.catalog()); - - let id1_a = cache.table_namespace_id(t1.id).await; - assert_eq!(id1_a, t1.namespace_id); - assert_metric(&catalog.metric_registry, "table_get_by_id", 1); - - let id2 = cache.table_namespace_id(t2.id).await; - assert_eq!(id2, t2.namespace_id); - assert_metric(&catalog.metric_registry, "table_get_by_id", 2); - - let id1_b = cache.table_namespace_id(t1.id).await; - assert_eq!(id1_b, t1.namespace_id); - assert_metric(&catalog.metric_registry, "table_get_by_id", 2); - } - - #[tokio::test] - async fn test_table_shared_cache() { - let catalog = TestCatalog::new(); - - let ns = catalog.create_namespace("ns").await; - let t1 = ns.create_table("table1").await.table.clone(); - let t2 = 
ns.create_table("table2").await.table.clone(); - assert_ne!(t1.id, t2.id); - - let cache = CatalogCache::new(catalog.catalog()); - - cache.table_name(t1.id).await; - cache.table_namespace_id(t2.id).await; - assert_metric(&catalog.metric_registry, "table_get_by_id", 2); - - // `table_name` and `table_namespace_id` use the same underlying cache - cache.table_namespace_id(t1.id).await; - cache.table_name(t2.id).await; - assert_metric(&catalog.metric_registry, "table_get_by_id", 2); - } - - #[tokio::test] - async fn test_old_partition_key() { - let catalog = TestCatalog::new(); - - let ns = catalog.create_namespace("ns").await; - let t = ns.create_table("table").await; - let s1 = ns.create_sequencer(1).await; - let s2 = ns.create_sequencer(2).await; - let p11 = t - .with_sequencer(&s1) - .create_partition("k1") - .await - .partition - .clone(); - let p12 = t - .with_sequencer(&s2) - .create_partition("k1") - .await - .partition - .clone(); - let p21 = t - .with_sequencer(&s1) - .create_partition("k2") - .await - .partition - .clone(); - let p22 = t - .with_sequencer(&s2) - .create_partition("k2") - .await - .partition - .clone(); - - let cache = CatalogCache::new(catalog.catalog()); - - let name11_a = cache.old_gen_partition_key(p11.id).await; - assert_eq!(name11_a.as_ref(), "1-k1"); - assert_metric(&catalog.metric_registry, "partition_get_by_id", 1); - - let name12 = cache.old_gen_partition_key(p12.id).await; - assert_eq!(name12.as_ref(), "2-k1"); - assert_metric(&catalog.metric_registry, "partition_get_by_id", 2); - - let name21 = cache.old_gen_partition_key(p21.id).await; - assert_eq!(name21.as_ref(), "1-k2"); - assert_metric(&catalog.metric_registry, "partition_get_by_id", 3); - - let name22 = cache.old_gen_partition_key(p22.id).await; - assert_eq!(name22.as_ref(), "2-k2"); - assert_metric(&catalog.metric_registry, "partition_get_by_id", 4); - - let name11_b = cache.old_gen_partition_key(p11.id).await; - assert!(Arc::ptr_eq(&name11_a, &name11_b)); - assert_metric(&catalog.metric_registry, "partition_get_by_id", 4); - } - - fn assert_metric(metrics: &metric::Registry, name: &'static str, n: u64) { - let histogram = metrics - .get_instrument::>("catalog_op_duration_ms") - .expect("failed to read metric") - .get_observer(&Attributes::from(&[("op", name), ("result", "success")])) - .expect("failed to get observer") - .fetch(); - - let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); - assert_eq!(hit_count, n); - } -} diff --git a/querier/src/cache/mod.rs b/querier/src/cache/mod.rs new file mode 100644 index 0000000000..2b045e34ab --- /dev/null +++ b/querier/src/cache/mod.rs @@ -0,0 +1,66 @@ +use backoff::BackoffConfig; +use iox_catalog::interface::Catalog; +use std::sync::Arc; + +use self::{namespace::NamespaceCache, partition::PartitionCache, table::TableCache}; + +pub mod namespace; +pub mod partition; +pub mod table; + +#[cfg(test)] +mod test_util; + +/// Caches request to the [`Catalog`]. +#[derive(Debug)] +pub struct CatalogCache { + /// Catalog. + catalog: Arc, + + /// Partition cache. + partition_cache: PartitionCache, + + /// Table cache. + table_cache: TableCache, + + /// Namespace cache. + namespace_cache: NamespaceCache, +} + +impl CatalogCache { + /// Create empty cache. 
+ pub fn new(catalog: Arc) -> Self { + let backoff_config = BackoffConfig::default(); + + let namespace_cache = NamespaceCache::new(Arc::clone(&catalog), backoff_config.clone()); + let table_cache = TableCache::new(Arc::clone(&catalog), backoff_config.clone()); + let partition_cache = PartitionCache::new(Arc::clone(&catalog), backoff_config); + + Self { + catalog, + partition_cache, + table_cache, + namespace_cache, + } + } + + /// Get underlying catalog + pub fn catalog(&self) -> Arc { + Arc::clone(&self.catalog) + } + + /// Namespace cache + pub fn namespace(&self) -> &NamespaceCache { + &self.namespace_cache + } + + /// Table cache + pub fn table(&self) -> &TableCache { + &self.table_cache + } + + /// Partition cache + pub fn partition(&self) -> &PartitionCache { + &self.partition_cache + } +} diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs new file mode 100644 index 0000000000..1db83c3a95 --- /dev/null +++ b/querier/src/cache/namespace.rs @@ -0,0 +1,88 @@ +use std::sync::Arc; + +use backoff::{Backoff, BackoffConfig}; +use data_types2::NamespaceId; +use iox_catalog::interface::Catalog; + +use crate::cache_system::{driver::Cache, loader::FunctionLoader}; + +/// Cache for namespace-related attributes. +#[derive(Debug)] +pub struct NamespaceCache { + cache: Cache, +} + +impl NamespaceCache { + /// Create new empty cache. + pub fn new(catalog: Arc, backoff_config: BackoffConfig) -> Self { + let loader = Arc::new(FunctionLoader::new(move |namespace_id| { + let catalog = Arc::clone(&catalog); + let backoff_config = backoff_config.clone(); + + async move { + let namespace = Backoff::new(&backoff_config) + .retry_all_errors("get namespace_name", || async { + catalog + .repositories() + .await + .namespaces() + .get_by_id(namespace_id) + .await + }) + .await + .expect("retry forever") + .expect("namespace gone from catalog?!"); + + CachedNamespace { + name: Arc::from(namespace.name), + } + } + })); + + Self { + cache: Cache::new(loader), + } + } + + /// Get the namespace name for the given namespace ID. + /// + /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. 
+ pub async fn name(&self, id: NamespaceId) -> Arc { + self.cache.get(id).await.name + } +} + +#[derive(Debug, Clone)] +struct CachedNamespace { + name: Arc, +} + +#[cfg(test)] +mod tests { + use crate::{cache::test_util::assert_histogram_metric_count, test_util::TestCatalog}; + + use super::*; + + #[tokio::test] + async fn test_name() { + let catalog = TestCatalog::new(); + + let ns1 = catalog.create_namespace("ns1").await.namespace.clone(); + let ns2 = catalog.create_namespace("ns2").await.namespace.clone(); + assert_ne!(ns1.id, ns2.id); + + let cache = NamespaceCache::new(catalog.catalog(), BackoffConfig::default()); + + let name1_a = cache.name(ns1.id).await; + assert_eq!(name1_a.as_ref(), ns1.name.as_str()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 1); + + let name2 = cache.name(ns2.id).await; + assert_eq!(name2.as_ref(), ns2.name.as_str()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 2); + + let name1_b = cache.name(ns1.id).await; + assert!(Arc::ptr_eq(&name1_a, &name1_b)); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 2); + } +} diff --git a/querier/src/cache/partition.rs b/querier/src/cache/partition.rs new file mode 100644 index 0000000000..10929c3574 --- /dev/null +++ b/querier/src/cache/partition.rs @@ -0,0 +1,125 @@ +use std::sync::Arc; + +use backoff::{Backoff, BackoffConfig}; +use data_types2::PartitionId; +use iox_catalog::interface::Catalog; + +use crate::cache_system::{driver::Cache, loader::FunctionLoader}; + +/// Cache for partition-related attributes. +#[derive(Debug)] +pub struct PartitionCache { + cache: Cache, +} + +impl PartitionCache { + /// Create new empty cache. + pub fn new(catalog: Arc, backoff_config: BackoffConfig) -> Self { + let loader = Arc::new(FunctionLoader::new(move |partition_id| { + let catalog = Arc::clone(&catalog); + let backoff_config = backoff_config.clone(); + + async move { + let partition = Backoff::new(&backoff_config) + .retry_all_errors("get partition_key", || async { + catalog + .repositories() + .await + .partitions() + .get_by_id(partition_id) + .await + }) + .await + .expect("retry forever") + .expect("partition gone from catalog?!"); + + CachedPartition { + old_gen_partition_key: Arc::from(format!( + "{}-{}", + partition.sequencer_id.get(), + partition.partition_key + )), + } + } + })); + + Self { + cache: Cache::new(loader), + } + } + + /// Get partition key for old gen. + /// + /// This either uses a cached value or -- if required -- creates a fresh string. 
+ pub async fn old_gen_partition_key(&self, partition_id: PartitionId) -> Arc { + self.cache.get(partition_id).await.old_gen_partition_key + } +} + +#[derive(Debug, Clone)] +struct CachedPartition { + old_gen_partition_key: Arc, +} + +#[cfg(test)] +mod tests { + use crate::{cache::test_util::assert_histogram_metric_count, test_util::TestCatalog}; + + use super::*; + + #[tokio::test] + async fn test_old_partition_key() { + let catalog = TestCatalog::new(); + + let ns = catalog.create_namespace("ns").await; + let t = ns.create_table("table").await; + let s1 = ns.create_sequencer(1).await; + let s2 = ns.create_sequencer(2).await; + let p11 = t + .with_sequencer(&s1) + .create_partition("k1") + .await + .partition + .clone(); + let p12 = t + .with_sequencer(&s2) + .create_partition("k1") + .await + .partition + .clone(); + let p21 = t + .with_sequencer(&s1) + .create_partition("k2") + .await + .partition + .clone(); + let p22 = t + .with_sequencer(&s2) + .create_partition("k2") + .await + .partition + .clone(); + + let cache = PartitionCache::new(catalog.catalog(), BackoffConfig::default()); + + let name11_a = cache.old_gen_partition_key(p11.id).await; + assert_eq!(name11_a.as_ref(), "1-k1"); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 1); + + let name12 = cache.old_gen_partition_key(p12.id).await; + assert_eq!(name12.as_ref(), "2-k1"); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 2); + + let name21 = cache.old_gen_partition_key(p21.id).await; + assert_eq!(name21.as_ref(), "1-k2"); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 3); + + let name22 = cache.old_gen_partition_key(p22.id).await; + assert_eq!(name22.as_ref(), "2-k2"); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); + + let name11_b = cache.old_gen_partition_key(p11.id).await; + assert!(Arc::ptr_eq(&name11_a, &name11_b)); + assert_histogram_metric_count(&catalog.metric_registry, "partition_get_by_id", 4); + } +} diff --git a/querier/src/cache/table.rs b/querier/src/cache/table.rs new file mode 100644 index 0000000000..fad6bfa755 --- /dev/null +++ b/querier/src/cache/table.rs @@ -0,0 +1,145 @@ +use std::sync::Arc; + +use backoff::{Backoff, BackoffConfig}; +use data_types2::{NamespaceId, TableId}; +use iox_catalog::interface::Catalog; + +use crate::cache_system::{driver::Cache, loader::FunctionLoader}; + +/// Cache for table-related queries. +#[derive(Debug)] +pub struct TableCache { + cache: Cache, +} + +impl TableCache { + /// Create new empty cache. + pub fn new(catalog: Arc, backoff_config: BackoffConfig) -> Self { + let loader = Arc::new(FunctionLoader::new(move |table_id| { + let catalog = Arc::clone(&catalog); + let backoff_config = backoff_config.clone(); + + async move { + let table = Backoff::new(&backoff_config) + .retry_all_errors("get table_name", || async { + catalog + .repositories() + .await + .tables() + .get_by_id(table_id) + .await + }) + .await + .expect("retry forever") + .expect("table gone from catalog?!"); + + CachedTable { + name: Arc::from(table.name), + namespace_id: table.namespace_id, + } + } + })); + + Self { + cache: Cache::new(loader), + } + } + + /// Get the table name for the given table ID. + /// + /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. + pub async fn name(&self, table_id: TableId) -> Arc { + self.cache.get(table_id).await.name + } + + /// Get the table namespace ID for the given table ID. 
+ /// + /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. + pub async fn namespace_id(&self, table_id: TableId) -> NamespaceId { + self.cache.get(table_id).await.namespace_id + } +} + +#[derive(Debug, Clone)] +struct CachedTable { + name: Arc, + namespace_id: NamespaceId, +} + +#[cfg(test)] +mod tests { + use crate::{cache::test_util::assert_histogram_metric_count, test_util::TestCatalog}; + + use super::*; + + #[tokio::test] + async fn test_table_name() { + let catalog = TestCatalog::new(); + + let ns = catalog.create_namespace("ns").await; + let t1 = ns.create_table("table1").await.table.clone(); + let t2 = ns.create_table("table2").await.table.clone(); + assert_ne!(t1.id, t2.id); + + let cache = TableCache::new(catalog.catalog(), BackoffConfig::default()); + + let name1_a = cache.name(t1.id).await; + assert_eq!(name1_a.as_ref(), t1.name.as_str()); + assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 1); + + let name2 = cache.name(t2.id).await; + assert_eq!(name2.as_ref(), t2.name.as_str()); + assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2); + + let name1_b = cache.name(t1.id).await; + assert!(Arc::ptr_eq(&name1_a, &name1_b)); + assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2); + } + + #[tokio::test] + async fn test_table_namespace_id() { + let catalog = TestCatalog::new(); + + let ns2 = catalog.create_namespace("ns1").await; + let ns1 = catalog.create_namespace("ns2").await; + let t1 = ns1.create_table("table1").await.table.clone(); + let t2 = ns2.create_table("table2").await.table.clone(); + assert_ne!(t1.id, t2.id); + assert_ne!(t1.namespace_id, t2.namespace_id); + + let cache = TableCache::new(catalog.catalog(), BackoffConfig::default()); + + let id1_a = cache.namespace_id(t1.id).await; + assert_eq!(id1_a, t1.namespace_id); + assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 1); + + let id2 = cache.namespace_id(t2.id).await; + assert_eq!(id2, t2.namespace_id); + assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2); + + let id1_b = cache.namespace_id(t1.id).await; + assert_eq!(id1_b, t1.namespace_id); + assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2); + } + + #[tokio::test] + async fn test_table_shared_cache() { + let catalog = TestCatalog::new(); + + let ns = catalog.create_namespace("ns").await; + let t1 = ns.create_table("table1").await.table.clone(); + let t2 = ns.create_table("table2").await.table.clone(); + assert_ne!(t1.id, t2.id); + + let cache = TableCache::new(catalog.catalog(), BackoffConfig::default()); + + cache.name(t1.id).await; + cache.namespace_id(t2.id).await; + assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2); + + // `name` and `namespace_id` use the same underlying cache + cache.namespace_id(t1.id).await; + cache.name(t2.id).await; + assert_histogram_metric_count(&catalog.metric_registry, "table_get_by_id", 2); + } +} diff --git a/querier/src/cache/test_util.rs b/querier/src/cache/test_util.rs new file mode 100644 index 0000000000..97e3efc294 --- /dev/null +++ b/querier/src/cache/test_util.rs @@ -0,0 +1,13 @@ +use metric::{Attributes, Metric, U64Histogram}; + +pub fn assert_histogram_metric_count(metrics: &metric::Registry, name: &'static str, n: u64) { + let histogram = metrics + .get_instrument::>("catalog_op_duration_ms") + .expect("failed to read metric") + .get_observer(&Attributes::from(&[("op", name), ("result", "success")])) 
+        .expect("failed to get observer")
+        .fetch();
+
+    let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
+    assert_eq!(hit_count, n);
+}
diff --git a/querier/src/cache_system/driver.rs b/querier/src/cache_system/driver.rs
new file mode 100644
index 0000000000..94e9bbece2
--- /dev/null
+++ b/querier/src/cache_system/driver.rs
@@ -0,0 +1,397 @@
+use std::{collections::HashMap, hash::Hash, sync::Arc};
+
+use futures::{
+    future::{BoxFuture, Shared},
+    FutureExt, TryFutureExt,
+};
+use parking_lot::Mutex;
+use tokio::{sync::oneshot::error::RecvError, task::JoinHandle};
+
+use super::loader::Loader;
+
+/// High-level cache implementation.
+///
+/// # Concurrency
+/// Multiple cache requests for different keys can run at the same time. When data is requested for
+/// the same key, the underlying loader will only be polled once, even when the requests are made
+/// while the loader is still running.
+///
+/// # Cancellation
+/// Canceling a [`get`](Self::get) request will NOT cancel the underlying loader. The data will
+/// still be cached.
+///
+/// # Panic
+/// If the underlying loader panics, all currently running [`get`](Self::get) requests will panic.
+/// The data will NOT be cached.
+#[derive(Debug)]
+pub struct Cache<K, V>
+where
+    K: Clone + Eq + Hash + std::fmt::Debug + Send + 'static,
+    V: Clone + std::fmt::Debug + Send + 'static,
+{
+    state: Arc<Mutex<CacheState<K, V>>>,
+    loader: Arc<dyn Loader<K = K, V = V>>,
+}
+
+impl<K, V> Cache<K, V>
+where
+    K: Clone + Eq + Hash + std::fmt::Debug + Send + 'static,
+    V: Clone + std::fmt::Debug + Send + 'static,
+{
+    /// Create new, empty cache with given loader function.
+    pub fn new(loader: Arc<dyn Loader<K = K, V = V>>) -> Self {
+        Self {
+            state: Arc::new(Mutex::new(CacheState {
+                cached_entries: HashMap::new(),
+                running_queries: HashMap::new(),
+            })),
+            loader,
+        }
+    }
+
+    /// Get value from cache.
+    pub async fn get(&self, k: K) -> V {
+        // place state locking into its own scope so it doesn't leak into the generator (async
+        // function)
+        let receiver = {
+            let mut state = self.state.lock();
+
+            // check if we already cached this entry
+            if let Some(v) = state.cached_entries.get(&k) {
+                return v.clone();
+            }
+
+            // check if there is already a query for this key running
+            if let Some((receiver, _handle)) = state.running_queries.get(&k) {
+                receiver.clone()
+            } else {
+                // requires new query
+                let (tx, rx) = tokio::sync::oneshot::channel();
+                let receiver = rx
+                    .map_ok(|v| Arc::new(Mutex::new(v)))
+                    .map_err(Arc::new)
+                    .boxed()
+                    .shared();
+
+                // need to wrap the query into a tokio task so that it doesn't get cancelled when
+                // this very request is canceled
+                let state_captured = Arc::clone(&self.state);
+                let loader = Arc::clone(&self.loader);
+                let k_captured = k.clone();
+                let handle = tokio::spawn(async move {
+                    // need to clone K and bind it so rustc doesn't require `K: Sync`
+                    let k_for_loader = k_captured.clone();
+
+                    // execute the loader
+                    // If we panic here then `tx` will be dropped and the receivers will be
+                    // notified.
+                    let v = loader.load(k_for_loader).await;
+
+                    // broadcast result
+                    // It's OK if the receiver side is gone. This might happen during shutdown
+                    tx.send(v.clone()).ok();
+
+                    // remove "running" state and store result
+                    //
+                    // Note: we need to manually drop the result of `.remove(...).expect(...)`
+                    // here to convince rustc that we don't need the shared future within the
+                    // resulting tuple. The warning we would get is:
+                    //
+                    //     warning: unused `futures::future::Shared` in tuple element 0 that
+                    //     must be used
+                    let mut state = state_captured.lock();
+                    drop(
+                        state
+                            .running_queries
+                            .remove(&k_captured)
+                            .expect("query should be running"),
+                    );
+                    state.cached_entries.insert(k_captured, v);
+                });
+
+                state.running_queries.insert(k, (receiver.clone(), handle));
+                receiver
+            }
+        };
+
+        receiver
+            .await
+            .expect("cache loader panicked, see logs")
+            .lock()
+            .clone()
+    }
+}
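The `# Concurrency` contract above is easiest to see from the caller's side. Here is a minimal sketch (a hypothetical in-crate test, not part of this patch; it uses the `FunctionLoader` defined in `loader.rs` below) of how concurrent `get` calls for one key share a single loader run:

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use crate::cache_system::{driver::Cache, loader::FunctionLoader};

#[tokio::test]
async fn test_coalescing_sketch() {
    // Toy loader that counts how often it actually runs.
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_captured = Arc::clone(&calls);
    let loader = Arc::new(FunctionLoader::new(move |k: u8| {
        let calls = Arc::clone(&calls_captured);
        async move {
            calls.fetch_add(1, Ordering::SeqCst);
            k.to_string()
        }
    }));
    let cache = Cache::new(loader as _);

    // Two concurrent requests for the same key are served by one loader run ...
    let (v1, v2) = tokio::join!(cache.get(1), cache.get(1));
    assert_eq!(v1, "1");
    assert_eq!(v2, "1");

    // ... and a later request is a pure cache hit.
    assert_eq!(cache.get(1).await, "1");
    assert_eq!(calls.load(Ordering::SeqCst), 1);
}
```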
+
+impl<K, V> Drop for Cache<K, V>
+where
+    K: Clone + Eq + Hash + std::fmt::Debug + Send + 'static,
+    V: Clone + std::fmt::Debug + Send + 'static,
+{
+    fn drop(&mut self) {
+        for (_k, (_receiver, handle)) in self.state.lock().running_queries.drain() {
+            // It's unlikely that anyone is still using the shared receiver at this point,
+            // because `Cache::get` borrows `self`. If it is still in use, aborting the task will
+            // cancel the contained future which in turn will drop the sender of the oneshot
+            // channel. The receivers will be notified.
+            handle.abort();
+        }
+    }
+}
+
+/// A [`tokio::sync::oneshot::Receiver`] that can be cloned.
+///
+/// The types are:
+/// - `Arc<Mutex<V>>`: Ensures that we can clone `V` without requiring `V: Sync`. At the same time
+///   the reference to `V` (i.e. the `Arc`) must be cloneable for `Shared`
+/// - `Arc<RecvError>`: Is required because `RecvError` is not `Clone` but `Shared` requires that.
+/// - `BoxFuture`: The transformation from `Result<V, RecvError>` to
+///   `Result<Arc<Mutex<V>>, Arc<RecvError>>` results in a kinda messy type and we wanna erase
+///   that.
+/// - `Shared`: Allow the receiver to be cloned and be awaited from multiple places.
+type SharedReceiver<V> = Shared<BoxFuture<'static, Result<Arc<Mutex<V>>, Arc<RecvError>>>>;
+
+/// Inner cache state that is usually guarded by a lock.
+///
+/// The state parts must be updated in a consistent manner, i.e. while using the same lock guard.
+#[derive(Debug)]
+struct CacheState<K, V> {
+    /// Cached entries (i.e. queries completed).
+    cached_entries: HashMap<K, V>,
+
+    /// Currently running queries indexed by cache key.
+    ///
+    /// For each query we have a receiver that can await the result as well as a handle for the
+    /// task that is currently executing the query. The handle can be used to abort the running
+    /// query, e.g. when dropping the cache.
+    running_queries: HashMap<K, (SharedReceiver<V>, JoinHandle<()>)>,
+}
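The `SharedReceiver` alias packs several wrappers into one type. The same chain in isolation, as a self-contained sketch (standalone binary, assuming `tokio`, `futures`, and `parking_lot` as dependencies), shows how a single oneshot result is broadcast to every clone:

```rust
use std::sync::Arc;

use futures::{FutureExt, TryFutureExt};
use parking_lot::Mutex;

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::oneshot::channel::<String>();

    // Same transformation as in `Cache::get`: `Arc<Mutex<_>>` makes the value
    // cloneable without `V: Sync`, `Arc` makes `RecvError` cloneable, `boxed`
    // erases the combinator type, and `shared` makes the future itself cloneable.
    let shared = rx
        .map_ok(|v| Arc::new(Mutex::new(v)))
        .map_err(Arc::new)
        .boxed()
        .shared();

    let waiter_a = shared.clone();
    let waiter_b = shared;

    tx.send(String::from("loaded")).ok();

    // Every clone resolves to the same broadcast value.
    assert_eq!(*waiter_a.await.unwrap().lock(), "loaded");
    assert_eq!(*waiter_b.await.unwrap().lock(), "loaded");
}
```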
+
+#[cfg(test)]
+mod tests {
+    use std::{collections::HashSet, sync::Arc, time::Duration};
+
+    use async_trait::async_trait;
+    use tokio::sync::Notify;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_answers_are_correct() {
+        let (cache, _loader) = setup();
+
+        assert_eq!(cache.get(1).await, String::from("1"));
+        assert_eq!(cache.get(2).await, String::from("2"));
+    }
+
+    #[tokio::test]
+    async fn test_linear_memory() {
+        let (cache, loader) = setup();
+
+        assert_eq!(cache.get(1).await, String::from("1"));
+        assert_eq!(cache.get(1).await, String::from("1"));
+        assert_eq!(cache.get(2).await, String::from("2"));
+        assert_eq!(cache.get(2).await, String::from("2"));
+        assert_eq!(cache.get(1).await, String::from("1"));
+
+        assert_eq!(loader.loaded(), vec![1, 2]);
+    }
+
+    #[tokio::test]
+    async fn test_concurrent_query_loads_once() {
+        let (cache, loader) = setup();
+
+        loader.block();
+
+        let cache_captured = Arc::clone(&cache);
+        let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
+        let handle_2 = tokio::spawn(async move { cache.get(1).await });
+
+        tokio::time::sleep(Duration::from_millis(10)).await;
+        // Shouldn't issue concurrent load requests for the same key
+        let n_blocked = loader.unblock();
+        assert_eq!(n_blocked, 1);
+
+        assert_eq!(handle_1.await.unwrap(), String::from("1"));
+        assert_eq!(handle_2.await.unwrap(), String::from("1"));
+
+        assert_eq!(loader.loaded(), vec![1]);
+    }
+
+    #[tokio::test]
+    async fn test_queries_are_parallelized() {
+        let (cache, loader) = setup();
+
+        loader.block();
+
+        let cache_captured = Arc::clone(&cache);
+        let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
+        let cache_captured = Arc::clone(&cache);
+        let handle_2 = tokio::spawn(async move { cache_captured.get(1).await });
+        let handle_3 = tokio::spawn(async move { cache.get(2).await });
+
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        let n_blocked = loader.unblock();
+        assert_eq!(n_blocked, 2);
+
+        assert_eq!(handle_1.await.unwrap(), String::from("1"));
+        assert_eq!(handle_2.await.unwrap(), String::from("1"));
+        assert_eq!(handle_3.await.unwrap(), String::from("2"));
+
+        assert_eq!(loader.loaded(), vec![1, 2]);
+    }
+
+    #[tokio::test]
+    async fn test_cancel_request() {
+        let (cache, loader) = setup();
+
+        loader.block();
+
+        let cache_captured = Arc::clone(&cache);
+        let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
+        tokio::time::sleep(Duration::from_millis(10)).await;
+        let handle_2 = tokio::spawn(async move { cache.get(1).await });
+
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        // abort first handle
+        handle_1.abort();
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        let n_blocked = loader.unblock();
+        assert_eq!(n_blocked, 1);
+
+        assert_eq!(handle_2.await.unwrap(), String::from("1"));
+
+        assert_eq!(loader.loaded(), vec![1]);
+    }
+
+    #[tokio::test]
+    async fn test_panic_request() {
+        let (cache, loader) = setup();
+
+        loader.panic_once(1);
+        loader.block();
+
+        let cache_captured = Arc::clone(&cache);
+        let handle_1 = tokio::spawn(async move { cache_captured.get(1).await });
+        tokio::time::sleep(Duration::from_millis(10)).await;
+        let cache_captured = Arc::clone(&cache);
+        let handle_2 = tokio::spawn(async move { cache_captured.get(1).await });
+        let handle_3 = tokio::spawn(async move { cache.get(2).await });
+
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        let n_blocked = loader.unblock();
+        assert_eq!(n_blocked, 2);
+
+        // first handle should panic
+        handle_1.await.unwrap_err();
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        // second handle should also panic
+        handle_2.await.unwrap_err();
+
+        // third handle should just work
+        assert_eq!(handle_3.await.unwrap(), String::from("2"));
+
+        assert_eq!(loader.loaded(), vec![1, 2]);
+    }
+
+    #[tokio::test]
+    async fn test_drop_cancels_loader() {
+        let (cache, loader) = setup();
+
+        loader.block();
+
+        let handle = tokio::spawn(async move { cache.get(1).await });
+
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        handle.abort();
+
+        tokio::time::sleep(Duration::from_millis(10)).await;
+
+        assert_eq!(Arc::strong_count(&loader), 1);
+    }
+
+    fn setup() -> (Arc<Cache<u8, String>>, Arc<TestLoader>) {
+        let loader = Arc::new(TestLoader::default());
+        let cache = Arc::new(Cache::new(Arc::clone(&loader) as _));
+
+        (cache, loader)
+    }
+
+    /// Flexible loader function for testing.
+    #[derive(Debug, Default)]
+    struct TestLoader {
+        loaded: Mutex<Vec<u8>>,
+        blocked: Mutex<Option<Arc<Notify>>>,
+        panic: Mutex<HashSet<u8>>,
+    }
+
+    impl TestLoader {
+        /// Panic when loading value for `k`.
+        ///
+        /// If this is used together with [`block`](Self::block), the panic will occur AFTER
+        /// blocking.
+        fn panic_once(&self, k: u8) {
+            self.panic.lock().insert(k);
+        }
+
+        /// Block all [`load`](Self::load) requests until [`unblock`](Self::unblock) is called.
+        ///
+        /// If this is used together with [`panic_once`](Self::panic_once), the panic will occur
+        /// AFTER blocking.
+        fn block(&self) {
+            let mut blocked = self.blocked.lock();
+            assert!(blocked.is_none());
+            *blocked = Some(Arc::new(Notify::new()));
+        }
+
+        /// Unblock all requests.
+        ///
+        /// Returns number of requests that were blocked.
+        fn unblock(&self) -> usize {
+            let handle = self.blocked.lock().take().unwrap();
+            let blocked_count = Arc::strong_count(&handle) - 1;
+            handle.notify_waiters();
+            blocked_count
+        }
+
+        /// List all keys that were loaded.
+        ///
+        /// Contains duplicates if keys were loaded multiple times.
+        fn loaded(&self) -> Vec<u8> {
+            self.loaded.lock().clone()
+        }
+    }
+
+    #[async_trait]
+    impl Loader for TestLoader {
+        type K = u8;
+        type V = String;
+
+        async fn load(&self, k: u8) -> String {
+            self.loaded.lock().push(k);
+
+            // need to capture the cloned notify handle, otherwise the lock guard leaks into the
+            // generator
+            let maybe_block = self.blocked.lock().clone();
+            if let Some(block) = maybe_block {
+                block.notified().await;
+            }
+
+            // maybe panic
+            if self.panic.lock().remove(&k) {
+                panic!("test");
+            }
+
+            k.to_string()
+        }
+    }
+
+    fn assert_send<T>()
+    where
+        T: Send,
+    {
+    }
+    fn assert_sync<T>()
+    where
+        T: Sync,
+    {
+    }
+
+    #[test]
+    fn test_bounds() {
+        assert_send::<Cache<u8, String>>();
+        assert_sync::<Cache<u8, String>>();
+    }
+}
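`Cache` depends only on the `Loader` trait that the next file defines, so callers are not tied to `FunctionLoader`. A hypothetical direct implementation (illustration only; `SquareLoader` is not part of this patch):

```rust
use async_trait::async_trait;

use crate::cache_system::loader::Loader;

/// Hypothetical loader that computes values instead of fetching them.
#[derive(Debug)]
struct SquareLoader;

#[async_trait]
impl Loader for SquareLoader {
    type K = u32;
    type V = u64;

    async fn load(&self, k: u32) -> u64 {
        u64::from(k) * u64::from(k)
    }
}
```

An `Arc::new(SquareLoader)` can then be handed to `Cache::new` exactly like the `TestLoader` in the tests above.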
diff --git a/querier/src/cache_system/loader.rs b/querier/src/cache_system/loader.rs
new file mode 100644
index 0000000000..46305ee6cf
--- /dev/null
+++ b/querier/src/cache_system/loader.rs
@@ -0,0 +1,53 @@
+use async_trait::async_trait;
+use futures::{future::BoxFuture, FutureExt};
+use std::future::Future;
+
+/// Loader for missing [`Cache`](crate::cache_system::driver::Cache) entries.
+#[async_trait]
+pub trait Loader: std::fmt::Debug + Send + Sync + 'static {
+    /// Cache key.
+    type K: Send + 'static;
+
+    /// Cache value.
+    type V: Send + 'static;
+
+    /// Load value for given key.
+    async fn load(&self, k: Self::K) -> Self::V;
+}
+
+/// Simple-to-use wrapper for async functions to act as a [`Loader`].
+pub struct FunctionLoader<K, V> {
+    loader: Box<dyn (Fn(K) -> BoxFuture<'static, V>) + Send + Sync>,
+}
+
+impl<K, V> FunctionLoader<K, V> {
+    /// Create loader from function.
+    pub fn new<T, F>(loader: T) -> Self
+    where
+        T: Fn(K) -> F + Send + Sync + 'static,
+        F: Future<Output = V> + Send + 'static,
+    {
+        let loader = Box::new(move |k| loader(k).boxed());
+        Self { loader }
+    }
+}
+
+impl<K, V> std::fmt::Debug for FunctionLoader<K, V> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("FunctionLoader").finish_non_exhaustive()
+    }
+}
+
+#[async_trait]
+impl<K, V> Loader for FunctionLoader<K, V>
+where
+    K: Send + 'static,
+    V: Send + 'static,
+{
+    type K = K;
+    type V = V;
+
+    async fn load(&self, k: Self::K) -> Self::V {
+        (self.loader)(k).await
+    }
+}
diff --git a/querier/src/cache_system/mod.rs b/querier/src/cache_system/mod.rs
new file mode 100644
index 0000000000..2359c61c1f
--- /dev/null
+++ b/querier/src/cache_system/mod.rs
@@ -0,0 +1,2 @@
+pub mod driver;
+pub mod loader;
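The remaining hunks are mechanical call-site updates: the old flat `CatalogCache` methods become per-entity sub-caches reached through accessors. Schematically (receiver expressions shortened; method names as introduced by this patch):

```rust
// before: one flat method per lookup
let table_name = catalog_cache.table_name(table_id).await;
let partition_key = catalog_cache.old_gen_partition_key(partition_id).await;

// after: select the sub-cache, then ask it
let table_name = catalog_cache.table().name(table_id).await;
let partition_key = catalog_cache
    .partition()
    .old_gen_partition_key(partition_id)
    .await;
```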
diff --git a/querier/src/chunk.rs b/querier/src/chunk.rs
index 4fdb2f91ce..591b66d225 100644
--- a/querier/src/chunk.rs
+++ b/querier/src/chunk.rs
@@ -51,9 +51,10 @@ impl ParquetChunkAdapter {
     /// Create parquet chunk.
     async fn new_parquet_chunk(&self, decoded_parquet_file: &DecodedParquetFile) -> ParquetChunk {
         let parquet_file = &decoded_parquet_file.parquet_file;
-        let table_name = self.catalog_cache.table_name(parquet_file.table_id).await;
+        let table_name = self.catalog_cache.table().name(parquet_file.table_id).await;
         let partition_key = self
             .catalog_cache
+            .partition()
             .old_gen_partition_key(parquet_file.partition_id)
             .await;
         let metrics = ParquetChunkMetrics::new(self.metric_registry.as_ref());
@@ -129,15 +130,18 @@ impl ParquetChunkAdapter {
         ChunkAddr {
             db_name: self
                 .catalog_cache
-                .namespace_name(
+                .namespace()
+                .name(
                     self.catalog_cache
-                        .table_namespace_id(parquet_file.table_id)
+                        .table()
+                        .namespace_id(parquet_file.table_id)
                         .await,
                 )
                 .await,
-            table_name: self.catalog_cache.table_name(parquet_file.table_id).await,
+            table_name: self.catalog_cache.table().name(parquet_file.table_id).await,
             partition_key: self
                 .catalog_cache
+                .partition()
                 .old_gen_partition_key(parquet_file.partition_id)
                 .await,
             chunk_id: ChunkId::from(Uuid::from_u128(parquet_file.id.get() as _)),
diff --git a/querier/src/lib.rs b/querier/src/lib.rs
index 53539cedb0..21ce7f2ee9 100644
--- a/querier/src/lib.rs
+++ b/querier/src/lib.rs
@@ -14,6 +14,7 @@ pub use client_util::connection;
 mod cache;
+mod cache_system;
 mod chunk;
 pub mod database;
 /// Flight client to the ingester to request in-memory data.
diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs
index 3515ba617c..d96edccecb 100644
--- a/querier/src/namespace/mod.rs
+++ b/querier/src/namespace/mod.rs
@@ -31,7 +31,7 @@ mod test_util;
 /// in-memory data structure are synced regularly via [`sync`](Self::sync).
 ///
 /// To speed up the sync process and reduce the load on the [IOx Catalog](Catalog) we try to use rather large-scoped
-/// queries as well as a [`CatalogCache`].
+/// queries as well as a `CatalogCache`.
 #[derive(Debug)]
 pub struct QuerierNamespace {
     /// Backoff config for IO operations.
@@ -240,8 +240,12 @@ impl QuerierNamespace { let mut desired_partitions = HashSet::with_capacity(partitions.len()); for partition in partitions { - let table = self.catalog_cache.table_name(partition.table_id).await; - let key = self.catalog_cache.old_gen_partition_key(partition.id).await; + let table = self.catalog_cache.table().name(partition.table_id).await; + let key = self + .catalog_cache + .partition() + .old_gen_partition_key(partition.id) + .await; desired_partitions.insert((table, key)); } @@ -475,7 +479,7 @@ impl QuerierNamespace { let mut predicates_by_table_and_sequencer: HashMap<_, HashMap<_, Vec<_>>> = HashMap::with_capacity(tombstones_by_table_and_sequencer.len()); for (table_id, tombstones_by_sequencer) in tombstones_by_table_and_sequencer { - let table_name = self.catalog_cache.table_name(table_id).await; + let table_name = self.catalog_cache.table().name(table_id).await; let mut predicates_by_sequencer = HashMap::with_capacity(tombstones_by_sequencer.len()); for (sequencer_id, mut tombstones) in tombstones_by_sequencer { // sort tombstones by ID so that predicate lists are stable
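Taken together, the new layering is: `CatalogCache` owns one `cache_system::driver::Cache` per entity, each fed by a `FunctionLoader` that wraps the backoff-retried catalog query. A sketch of the resulting call pattern (hypothetical in-crate helper, not part of this patch; IDs are passed in rather than constructed):

```rust
use std::sync::Arc;

use data_types2::{NamespaceId, PartitionId, TableId};
use iox_catalog::interface::Catalog;

use crate::cache::CatalogCache;

async fn lookup_example(
    catalog: Arc<dyn Catalog>,
    namespace_id: NamespaceId,
    table_id: TableId,
    partition_id: PartitionId,
) {
    // One `CatalogCache` bundles the namespace, table, and partition caches.
    let cache = CatalogCache::new(catalog);

    // Repeated lookups for the same ID are served from memory; concurrent
    // lookups for the same ID are coalesced into a single catalog query.
    let _namespace_name = cache.namespace().name(namespace_id).await;
    let _table_name = cache.table().name(table_id).await;
    let _old_gen_key = cache
        .partition()
        .old_gen_partition_key(partition_id)
        .await;
}
```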