diff --git a/iox_tests/src/util.rs b/iox_tests/src/util.rs index c90ff37a72..56f2459f8e 100644 --- a/iox_tests/src/util.rs +++ b/iox_tests/src/util.rs @@ -6,9 +6,9 @@ use arrow::{ }; use bytes::Bytes; use data_types2::{ - ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileParams, Partition, - QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp, Tombstone, - TombstoneId, + Column, ColumnType, KafkaPartition, KafkaTopic, Namespace, ParquetFile, ParquetFileParams, + Partition, QueryPool, SequenceNumber, Sequencer, SequencerId, Table, TableId, Timestamp, + Tombstone, TombstoneId, }; use iox_catalog::{interface::Catalog, mem::MemCatalog}; use iox_object_store::{IoxObjectStore, ParquetFilePath}; @@ -267,17 +267,37 @@ impl TestTable { } /// Create a column for the table - pub async fn create_column(self: &Arc, name: &str, column_type: ColumnType) { + pub async fn create_column( + self: &Arc, + name: &str, + column_type: ColumnType, + ) -> Arc { let mut repos = self.catalog.catalog.repositories().await; - repos + let column = repos .columns() .create_or_get(name, self.table.id, column_type) .await .unwrap(); + + Arc::new(TestColumn { + catalog: Arc::clone(&self.catalog), + namespace: Arc::clone(&self.namespace), + table: Arc::clone(self), + column, + }) } } +/// A test column. +#[allow(missing_docs)] +pub struct TestColumn { + pub catalog: Arc, + pub namespace: Arc, + pub table: Arc, + pub column: Column, +} + /// A test catalog with specified namespace, sequencer, and table #[allow(missing_docs)] pub struct TestTableBoundSequencer { diff --git a/querier/src/cache/mod.rs b/querier/src/cache/mod.rs index abbc8dc915..7dd9d8bd71 100644 --- a/querier/src/cache/mod.rs +++ b/querier/src/cache/mod.rs @@ -41,7 +41,11 @@ impl CatalogCache { pub fn new(catalog: Arc, time_provider: Arc) -> Self { let backoff_config = BackoffConfig::default(); - let namespace_cache = NamespaceCache::new(Arc::clone(&catalog), backoff_config.clone()); + let namespace_cache = NamespaceCache::new( + Arc::clone(&catalog), + backoff_config.clone(), + Arc::clone(&time_provider), + ); let table_cache = TableCache::new( Arc::clone(&catalog), backoff_config.clone(), diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs index c6756b5116..ab77c6c7ab 100644 --- a/querier/src/cache/namespace.rs +++ b/querier/src/cache/namespace.rs @@ -1,28 +1,52 @@ //! Namespace cache. -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use backoff::{Backoff, BackoffConfig}; -use data_types2::NamespaceId; -use iox_catalog::interface::Catalog; +use data_types2::{NamespaceId, NamespaceSchema}; +use iox_catalog::interface::{get_schema_by_name, Catalog}; +use time::TimeProvider; -use crate::cache_system::{driver::Cache, loader::FunctionLoader}; +use crate::cache_system::{ + backend::{ + dual::dual_backends, + ttl::{OptionalValueTtlProvider, TtlBackend}, + }, + driver::Cache, + loader::FunctionLoader, +}; + +/// Duration to keep existing namespaces. +pub const TTL_EXISTING: Duration = Duration::from_secs(10); + +/// Duration to keep non-existing namespaces. +pub const TTL_NON_EXISTING: Duration = Duration::from_secs(60); + +type CacheFromId = Cache>>; +type CacheFromName = Cache, Option>>; /// Cache for namespace-related attributes. #[derive(Debug)] pub struct NamespaceCache { - cache: Cache, + cache_from_id: CacheFromId, + cache_from_name: CacheFromName, } impl NamespaceCache { /// Create new empty cache. - pub fn new(catalog: Arc, backoff_config: BackoffConfig) -> Self { - let loader = Arc::new(FunctionLoader::new(move |namespace_id| { - let catalog = Arc::clone(&catalog); - let backoff_config = backoff_config.clone(); + pub fn new( + catalog: Arc, + backoff_config: BackoffConfig, + time_provider: Arc, + ) -> Self { + let catalog_captured = Arc::clone(&catalog); + let backoff_config_captured = backoff_config.clone(); + let loader_from_id = Arc::new(FunctionLoader::new(move |namespace_id| { + let catalog = Arc::clone(&catalog_captured); + let backoff_config = backoff_config_captured.clone(); async move { let namespace = Backoff::new(&backoff_config) - .retry_all_errors("get namespace_name", || async { + .retry_all_errors("get namespace name by ID", || async { catalog .repositories() .await @@ -31,37 +55,125 @@ impl NamespaceCache { .await }) .await - .expect("retry forever") - .expect("namespace gone from catalog?!"); + .expect("retry forever")?; - CachedNamespace { + let schema = Backoff::new(&backoff_config) + .retry_all_errors("get namespace schema", || async { + let mut repos = catalog.repositories().await; + match get_schema_by_name(&namespace.name, repos.as_mut()).await { + Ok(schema) => Ok(Some(schema)), + Err(iox_catalog::interface::Error::NamespaceNotFound { .. }) => { + Ok(None) + } + Err(e) => Err(e), + } + }) + .await + .expect("retry forever")?; + + Some(Arc::new(CachedNamespace { name: Arc::from(namespace.name), - } + schema: Arc::new(schema), + })) } })); - let backend = Box::new(HashMap::new()); + let backend_from_id = Box::new(TtlBackend::new( + Box::new(HashMap::new()), + Arc::new(OptionalValueTtlProvider::new( + Some(TTL_NON_EXISTING), + Some(TTL_EXISTING), + )), + Arc::clone(&time_provider), + )); + let mapper_from_id = |_k: &_, maybe_table: &Option>| { + maybe_table.as_ref().map(|n| Arc::clone(&n.name)) + }; + + let loader_from_name = Arc::new(FunctionLoader::new(move |namespace_name: Arc| { + let catalog = Arc::clone(&catalog); + let backoff_config = backoff_config.clone(); + + async move { + let schema = Backoff::new(&backoff_config) + .retry_all_errors("get namespace schema", || async { + let mut repos = catalog.repositories().await; + match get_schema_by_name(&namespace_name, repos.as_mut()).await { + Ok(schema) => Ok(Some(schema)), + Err(iox_catalog::interface::Error::NamespaceNotFound { .. }) => { + Ok(None) + } + Err(e) => Err(e), + } + }) + .await + .expect("retry forever")?; + + Some(Arc::new(CachedNamespace { + name: namespace_name, + schema: Arc::new(schema), + })) + } + })); + let backend_from_name = Box::new(TtlBackend::new( + Box::new(HashMap::new()), + Arc::new(OptionalValueTtlProvider::new( + Some(TTL_NON_EXISTING), + Some(TTL_EXISTING), + )), + Arc::clone(&time_provider), + )); + let mapper_from_name = |_k: &_, maybe_table: &Option>| { + maybe_table.as_ref().map(|n| n.schema.id) + }; + + // cross backends + let (backend_from_id, backend_from_name) = dual_backends( + backend_from_id, + mapper_from_id, + backend_from_name, + mapper_from_name, + ); + + let cache_from_id = Cache::new(loader_from_id, Box::new(backend_from_id)); + let cache_from_name = Cache::new(loader_from_name, Box::new(backend_from_name)); Self { - cache: Cache::new(loader, backend), + cache_from_id, + cache_from_name, } } /// Get the namespace name for the given namespace ID. /// /// This either uses a cached value or -- if required -- fetches the mapping from the catalog. - pub async fn name(&self, id: NamespaceId) -> Arc { - self.cache.get(id).await.name + pub async fn name(&self, id: NamespaceId) -> Option> { + self.cache_from_id + .get(id) + .await + .map(|n| Arc::clone(&n.name)) + } + + /// Get namespace schema by name. + pub async fn schema(&self, name: Arc) -> Option> { + self.cache_from_name + .get(name) + .await + .map(|n| Arc::clone(&n.schema)) } } #[derive(Debug, Clone)] struct CachedNamespace { name: Arc, + schema: Arc, } #[cfg(test)] mod tests { + use std::collections::BTreeMap; + use crate::cache::test_util::assert_histogram_metric_count; + use data_types2::{ColumnSchema, ColumnType, TableSchema}; use iox_tests::util::TestCatalog; use super::*; @@ -74,18 +186,234 @@ mod tests { let ns2 = catalog.create_namespace("ns2").await.namespace.clone(); assert_ne!(ns1.id, ns2.id); - let cache = NamespaceCache::new(catalog.catalog(), BackoffConfig::default()); + let cache = NamespaceCache::new( + catalog.catalog(), + BackoffConfig::default(), + catalog.time_provider(), + ); - let name1_a = cache.name(ns1.id).await; + let name1_a = cache.name(ns1.id).await.unwrap(); assert_eq!(name1_a.as_ref(), ns1.name.as_str()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 1); - let name2 = cache.name(ns2.id).await; + let name2 = cache.name(ns2.id).await.unwrap(); assert_eq!(name2.as_ref(), ns2.name.as_str()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 2); - let name1_b = cache.name(ns1.id).await; + let name1_b = cache.name(ns1.id).await.unwrap(); assert!(Arc::ptr_eq(&name1_a, &name1_b)); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 2); + + // cache timeout + catalog.mock_time_provider().inc(TTL_EXISTING); + + let name1_c = cache.name(ns1.id).await.unwrap(); + assert_eq!(name1_c.as_ref(), ns1.name.as_str()); + assert!(!Arc::ptr_eq(&name1_a, &name1_c)); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 3); + } + + #[tokio::test] + async fn test_name_non_existing() { + let catalog = TestCatalog::new(); + + let cache = NamespaceCache::new( + catalog.catalog(), + BackoffConfig::default(), + catalog.time_provider(), + ); + + let none = cache.name(NamespaceId::new(i32::MAX)).await; + assert!(none.is_none()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 1); + + let none = cache.name(NamespaceId::new(i32::MAX)).await; + assert!(none.is_none()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 1); + + // cache timeout + catalog.mock_time_provider().inc(TTL_NON_EXISTING); + + let none = cache.name(NamespaceId::new(i32::MAX)).await; + assert!(none.is_none()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 2); + } + + #[tokio::test] + async fn test_schema() { + let catalog = TestCatalog::new(); + + let ns1 = catalog.create_namespace("ns1").await; + let ns2 = catalog.create_namespace("ns2").await; + assert_ne!(ns1.namespace.id, ns2.namespace.id); + + let table11 = ns1.create_table("table1").await; + let table12 = ns1.create_table("table2").await; + let table21 = ns2.create_table("table1").await; + + let col111 = table11.create_column("col1", ColumnType::I64).await; + let col112 = table11.create_column("col2", ColumnType::Tag).await; + let col113 = table11.create_column("col3", ColumnType::Time).await; + let col121 = table12.create_column("col1", ColumnType::F64).await; + let col122 = table12.create_column("col2", ColumnType::Time).await; + let col211 = table21.create_column("col1", ColumnType::Time).await; + + let cache = NamespaceCache::new( + catalog.catalog(), + BackoffConfig::default(), + catalog.time_provider(), + ); + + let schema1_a = cache.schema(Arc::from(String::from("ns1"))).await.unwrap(); + let expected_schema_1 = NamespaceSchema { + id: ns1.namespace.id, + kafka_topic_id: ns1.namespace.kafka_topic_id, + query_pool_id: ns1.namespace.query_pool_id, + tables: BTreeMap::from([ + ( + String::from("table1"), + TableSchema { + id: table11.table.id, + columns: BTreeMap::from([ + ( + String::from("col1"), + ColumnSchema { + id: col111.column.id, + column_type: ColumnType::I64, + }, + ), + ( + String::from("col2"), + ColumnSchema { + id: col112.column.id, + column_type: ColumnType::Tag, + }, + ), + ( + String::from("col3"), + ColumnSchema { + id: col113.column.id, + column_type: ColumnType::Time, + }, + ), + ]), + }, + ), + ( + String::from("table2"), + TableSchema { + id: table12.table.id, + columns: BTreeMap::from([ + ( + String::from("col1"), + ColumnSchema { + id: col121.column.id, + column_type: ColumnType::F64, + }, + ), + ( + String::from("col2"), + ColumnSchema { + id: col122.column.id, + column_type: ColumnType::Time, + }, + ), + ]), + }, + ), + ]), + }; + assert_eq!(schema1_a.as_ref(), &expected_schema_1); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); + + let schema2 = cache.schema(Arc::from(String::from("ns2"))).await.unwrap(); + let expected_schema_2 = NamespaceSchema { + id: ns2.namespace.id, + kafka_topic_id: ns2.namespace.kafka_topic_id, + query_pool_id: ns2.namespace.query_pool_id, + tables: BTreeMap::from([( + String::from("table1"), + TableSchema { + id: table21.table.id, + columns: BTreeMap::from([( + String::from("col1"), + ColumnSchema { + id: col211.column.id, + column_type: ColumnType::Time, + }, + )]), + }, + )]), + }; + assert_eq!(schema2.as_ref(), &expected_schema_2); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); + + let schema1_b = cache.schema(Arc::from(String::from("ns1"))).await.unwrap(); + assert!(Arc::ptr_eq(&schema1_a, &schema1_b)); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); + + // cache timeout + catalog.mock_time_provider().inc(TTL_EXISTING); + + let schema1_c = cache.schema(Arc::from(String::from("ns1"))).await.unwrap(); + assert_eq!(schema1_c.as_ref(), schema1_a.as_ref()); + assert!(!Arc::ptr_eq(&schema1_a, &schema1_c)); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 3); + } + + #[tokio::test] + async fn test_schema_non_existing() { + let catalog = TestCatalog::new(); + + let cache = NamespaceCache::new( + catalog.catalog(), + BackoffConfig::default(), + catalog.time_provider(), + ); + + let none = cache.schema(Arc::from(String::from("foo"))).await; + assert!(none.is_none()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); + + let none = cache.schema(Arc::from(String::from("foo"))).await; + assert!(none.is_none()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); + + // cache timeout + catalog.mock_time_provider().inc(TTL_NON_EXISTING); + + let none = cache.schema(Arc::from(String::from("foo"))).await; + assert!(none.is_none()); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); + } + + #[tokio::test] + async fn test_shared_cache() { + let catalog = TestCatalog::new(); + + let ns1 = catalog.create_namespace("ns1").await.namespace.clone(); + let ns2 = catalog.create_namespace("ns2").await.namespace.clone(); + assert_ne!(ns1.id, ns2.id); + + let cache = NamespaceCache::new( + catalog.catalog(), + BackoffConfig::default(), + catalog.time_provider(), + ); + + cache.name(ns1.id).await.unwrap(); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 1); + + // the schema gathering queries the ID via the name again + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); + + cache.schema(Arc::from(String::from("ns2"))).await.unwrap(); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); + + // `name` and `schema` share the same entries + cache.name(ns2.id).await.unwrap(); + cache.schema(Arc::from(String::from("ns1"))).await.unwrap(); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_id", 1); + assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); } } diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index fd90379f65..8715704636 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -276,7 +276,7 @@ impl ParquetChunkAdapter { .namespace_id(parquet_file.table_id) .await?, ) - .await, + .await?, table_name: self .catalog_cache .table() diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index 292eb8a718..fe9700e109 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -1,8 +1,8 @@ //! Namespace within the whole database. use crate::{cache::CatalogCache, chunk::ParquetChunkAdapter, table::QuerierTable}; -use backoff::{Backoff, BackoffConfig}; +use backoff::BackoffConfig; use data_types2::NamespaceId; -use iox_catalog::interface::{get_schema_by_name, Catalog}; +use iox_catalog::interface::Catalog; use object_store::DynObjectStore; use observability_deps::tracing::warn; use parking_lot::RwLock; @@ -112,18 +112,12 @@ impl QuerierNamespace { /// /// Should be called regularly. pub async fn sync(&self) { - let catalog_schema_desired = Backoff::new(&self.backoff_config) - .retry_all_errors("get schema", || async { - let mut repos = self.catalog.repositories().await; - match get_schema_by_name(&self.name, repos.as_mut()).await { - Ok(schema) => Ok(Some(schema)), - Err(iox_catalog::interface::Error::NamespaceNotFound { .. }) => Ok(None), - Err(e) => Err(e), - } - }) + let catalog_schema_desired = match self + .catalog_cache + .namespace() + .schema(Arc::clone(&self.name)) .await - .expect("retry forever"); - let catalog_schema_desired = match catalog_schema_desired { + { Some(schema) => schema, None => { warn!( @@ -136,11 +130,11 @@ impl QuerierNamespace { let tables: HashMap<_, _> = catalog_schema_desired .tables - .into_iter() + .iter() .map(|(name, table_schema)| { - let name = Arc::from(name); + let name = Arc::from(name.clone()); let id = table_schema.id; - let schema = Schema::try_from(table_schema).expect("cannot build schema"); + let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema"); let table = Arc::new(QuerierTable::new( self.backoff_config.clone(), @@ -161,7 +155,7 @@ impl QuerierNamespace { #[cfg(test)] mod tests { use super::*; - use crate::namespace::test_util::querier_namespace; + use crate::{cache::namespace::TTL_EXISTING, namespace::test_util::querier_namespace}; use data_types2::ColumnType; use iox_tests::util::TestCatalog; use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType}; @@ -206,6 +200,7 @@ mod tests { ns.create_table("table1").await; ns.create_table("table2").await; + catalog.mock_time_provider().inc(TTL_EXISTING); querier_namespace.sync().await; assert_eq!( tables(&querier_namespace), @@ -213,6 +208,7 @@ mod tests { ); ns.create_table("table3").await; + catalog.mock_time_provider().inc(TTL_EXISTING); querier_namespace.sync().await; assert_eq!( tables(&querier_namespace), @@ -241,6 +237,7 @@ mod tests { table.create_column("col1", ColumnType::I64).await; table.create_column("col2", ColumnType::Bool).await; table.create_column("col3", ColumnType::Tag).await; + catalog.mock_time_provider().inc(TTL_EXISTING); querier_namespace.sync().await; let expected_schema = SchemaBuilder::new() .influx_column("col1", InfluxColumnType::Field(InfluxFieldType::Integer)) @@ -253,6 +250,7 @@ mod tests { table.create_column("col4", ColumnType::Tag).await; table.create_column("col5", ColumnType::Time).await; + catalog.mock_time_provider().inc(TTL_EXISTING); querier_namespace.sync().await; let expected_schema = SchemaBuilder::new() .influx_column("col1", InfluxColumnType::Field(InfluxFieldType::Integer))