diff --git a/Cargo.lock b/Cargo.lock index f5f5a7b60a..5da801b9d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4262,6 +4262,7 @@ dependencies = [ "data_types", "datafusion 0.1.0", "db", + "iox_catalog", "iox_tests", "itertools", "metric", diff --git a/querier/src/database.rs b/querier/src/database.rs index 919ee3db78..2f7c825639 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -1,29 +1,15 @@ //! Database for the querier that contains all namespaces. -use crate::{ - cache::CatalogCache, - chunk::ParquetChunkAdapter, - namespace::QuerierNamespace, - poison::{PoisonCabinet, PoisonPill}, -}; +use crate::{cache::CatalogCache, chunk::ParquetChunkAdapter, namespace::QuerierNamespace}; use async_trait::async_trait; -use backoff::{Backoff, BackoffConfig}; -use data_types2::NamespaceId; +use backoff::BackoffConfig; use iox_catalog::interface::Catalog; use object_store::DynObjectStore; -use observability_deps::tracing::{error, info}; use parking_lot::RwLock; use query::exec::Executor; use service_common::QueryDatabaseProvider; -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, - time::Duration, -}; +use std::{collections::HashMap, sync::Arc}; use time::TimeProvider; -use tokio_util::sync::CancellationToken; - -const SYNC_INTERVAL: Duration = Duration::from_secs(1); /// Database for the querier. /// @@ -63,14 +49,12 @@ impl QueryDatabaseProvider for QuerierDatabase { type Db = QuerierNamespace; async fn db(&self, name: &str) -> Option> { - self.namespace(name) + self.namespace(name).await } } impl QuerierDatabase { - /// Create new, empty database. - /// - /// You may call [`sync`](Self::sync) to fill this database with content. + /// Create new database. pub fn new( catalog: Arc, metric_registry: Arc, @@ -102,108 +86,21 @@ impl QuerierDatabase { } } - /// List of namespaces. - pub fn namespaces(&self) -> Vec> { - self.namespaces.read().values().cloned().collect() - } - /// Get namespace if it exists - pub fn namespace(&self, name: &str) -> Option> { - self.namespaces.read().get(name).cloned() - } - - /// Sync set of namespaces and the data of the namespaces themselves. - /// - /// Should be called regularly. - pub async fn sync(&self) { - let namespaces = Backoff::new(&self.backoff_config) - .retry_all_errors("get namespaces", || async { - self.catalog.repositories().await.namespaces().list().await - }) - .await - .expect("unlimited retry"); - let namespaces: BTreeMap, NamespaceId> = namespaces - .into_iter() - .map(|ns| (ns.name.into(), ns.id)) - .collect(); - - // lock namespaces AFTER IO - let querier_namespaces: Vec<_> = { - let mut namespaces_guard = self.namespaces.write(); - - // calculate set differences - let to_add: Vec<(Arc, NamespaceId)> = namespaces - .iter() - .filter_map(|(name, id)| { - (!namespaces_guard.contains_key(name)).then(|| (Arc::clone(name), *id)) - }) - .collect(); - let to_delete: Vec> = namespaces_guard - .keys() - .filter_map(|name| (!namespaces.contains_key(name)).then(|| Arc::clone(name))) - .collect(); - info!( - add = to_add.len(), - delete = to_delete.len(), - actual = namespaces_guard.len(), - desired = namespaces.len(), - "Syncing namespaces", - ); - - // perform modification - for name in to_delete { - // TODO(marco): this is currently untested because `iox_catalog` doesn't implement namespace deletion - namespaces_guard.remove(&name); - } - for (name, id) in to_add { - namespaces_guard.insert( - Arc::clone(&name), - Arc::new(QuerierNamespace::new( - self.backoff_config.clone(), - Arc::clone(&self.chunk_adapter), - name, - id, - Arc::clone(&self.exec), - )), - ); - } - - // get a clone of namespace Arcs so that we can run an async operation - namespaces_guard.values().cloned().collect() - }; - - // downgrade guard so that other readers are allowed again and sync namespace states. - for namespace in querier_namespaces { - namespace.sync().await; - } - } -} - -/// Run regular [`sync`](QuerierDatabase::sync) until shutdown token is canceled. -pub(crate) async fn database_sync_loop( - database: Arc, - shutdown: CancellationToken, - poison_cabinet: Arc, -) { - loop { - if shutdown.is_cancelled() { - info!("Database sync shutdown"); - return; - } - if poison_cabinet.contains(&PoisonPill::DatabaseSyncPanic) { - panic!("Database sync poisened, panic"); - } - if poison_cabinet.contains(&PoisonPill::DatabaseSyncExit) { - error!("Database sync poisened, exit early"); - return; - } - - database.sync().await; - - tokio::select!( - _ = tokio::time::sleep(SYNC_INTERVAL) => {}, - _ = shutdown.cancelled() => {}, - ); + pub async fn namespace(&self, name: &str) -> Option> { + let name = Arc::from(name.to_owned()); + let schema = self + .catalog_cache + .namespace() + .schema(Arc::clone(&name)) + .await?; + Some(Arc::new(QuerierNamespace::new( + self.backoff_config.clone(), + Arc::clone(&self.chunk_adapter), + schema, + name, + Arc::clone(&self.exec), + ))) } } @@ -214,7 +111,7 @@ mod tests { use super::*; #[tokio::test] - async fn test_sync() { + async fn test_namespace() { let catalog = TestCatalog::new(); let db = QuerierDatabase::new( @@ -224,36 +121,10 @@ mod tests { catalog.time_provider(), catalog.exec(), ); - assert_eq!(ns_names(&db), vec![]); - - db.sync().await; - assert_eq!(ns_names(&db), vec![]); catalog.create_namespace("ns1").await; - catalog.create_namespace("ns2").await; - db.sync().await; - assert_eq!(ns_names(&db), vec![Arc::from("ns1"), Arc::from("ns2")]); - let ns1_a = db.namespace("ns1").unwrap(); - let ns2_a = db.namespace("ns2").unwrap(); - assert_eq!(ns1_a.name().as_ref(), "ns1"); - assert_eq!(ns2_a.name().as_ref(), "ns2"); - assert!(db.namespace("ns3").is_none()); - catalog.create_namespace("ns3").await; - db.sync().await; - assert_eq!( - ns_names(&db), - vec![Arc::from("ns1"), Arc::from("ns2"), Arc::from("ns3")] - ); - let ns1_b = db.namespace("ns1").unwrap(); - let ns2_b = db.namespace("ns2").unwrap(); - assert!(Arc::ptr_eq(&ns1_a, &ns1_b)); - assert!(Arc::ptr_eq(&ns2_a, &ns2_b)); - } - - fn ns_names(db: &QuerierDatabase) -> Vec> { - let mut names: Vec> = db.namespaces().iter().map(|ns| ns.name()).collect(); - names.sort(); - names + assert!(db.namespace("ns1").await.is_some()); + assert!(db.namespace("ns2").await.is_none()); } } diff --git a/querier/src/handler.rs b/querier/src/handler.rs index 6ba2402f50..2937e7d029 100644 --- a/querier/src/handler.rs +++ b/querier/src/handler.rs @@ -12,10 +12,7 @@ use thiserror::Error; use tokio::task::{JoinError, JoinHandle}; use tokio_util::sync::CancellationToken; -use crate::{ - database::{database_sync_loop, QuerierDatabase}, - poison::PoisonCabinet, -}; +use crate::{database::QuerierDatabase, poison::PoisonCabinet}; #[derive(Debug, Error)] #[allow(missing_copy_implementations, missing_docs)] @@ -63,14 +60,7 @@ impl QuerierHandlerImpl { let shutdown = CancellationToken::new(); let poison_cabinet = Arc::new(PoisonCabinet::new()); - let join_handles = vec![( - String::from("database sync"), - shared_handle(tokio::spawn(database_sync_loop( - Arc::clone(&database), - shutdown.clone(), - Arc::clone(&poison_cabinet), - ))), - )]; + let join_handles = vec![]; Self { database, join_handles, @@ -98,6 +88,8 @@ impl QuerierHandler for QuerierHandlerImpl { panic!("Background worker '{name}' exited early!"); } } + + self.shutdown.cancelled().await; } fn shutdown(&self) { @@ -123,8 +115,6 @@ mod tests { use query::exec::Executor; use time::{MockProvider, Time}; - use crate::poison::PoisonPill; - use super::*; #[tokio::test] @@ -144,26 +134,6 @@ mod tests { .unwrap(); } - #[tokio::test] - #[should_panic(expected = "Background worker 'database sync' exited early!")] - async fn test_supervise_database_sync_early_exit() { - let querier = TestQuerier::new().querier; - querier.poison_cabinet.add(PoisonPill::DatabaseSyncExit); - tokio::time::timeout(Duration::from_millis(2000), querier.join()) - .await - .unwrap(); - } - - #[tokio::test] - #[should_panic(expected = "JoinError::Panic")] - async fn test_supervise_database_sync_panic() { - let querier = TestQuerier::new().querier; - querier.poison_cabinet.add(PoisonPill::DatabaseSyncPanic); - tokio::time::timeout(Duration::from_millis(2000), querier.join()) - .await - .unwrap(); - } - struct TestQuerier { querier: QuerierHandlerImpl, } diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index fe9700e109..0b7422eb3d 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -1,11 +1,9 @@ //! Namespace within the whole database. use crate::{cache::CatalogCache, chunk::ParquetChunkAdapter, table::QuerierTable}; use backoff::BackoffConfig; -use data_types2::NamespaceId; +use data_types2::{NamespaceId, NamespaceSchema}; use iox_catalog::interface::Catalog; use object_store::DynObjectStore; -use observability_deps::tracing::warn; -use parking_lot::RwLock; use query::exec::Executor; use schema::Schema; use std::{collections::HashMap, sync::Arc}; @@ -19,71 +17,72 @@ mod test_util; /// Maps a catalog namespace to all the in-memory resources and sync-state that the querier needs. /// /// # Data Structures & Sync -/// Tables and schemas are synced ahead of usage (via [`sync`](Self::sync)) because DataFusion does not implement async +/// Tables and schemas are created when [`QuerierNamespace`] is created because DataFusion does not implement async /// schema inspection. The actual payload (chunks and tombstones) are only queried on demand. /// /// Most access to the [IOx Catalog](Catalog) are cached. #[derive(Debug)] pub struct QuerierNamespace { - /// Backoff config for IO operations. - backoff_config: BackoffConfig, - - /// The catalog. - catalog: Arc, - - /// Catalog IO cache. - catalog_cache: Arc, - - /// Tables in this namespace. - tables: RwLock, Arc>>>, - - /// Adapter to create chunks. - chunk_adapter: Arc, - /// ID of this namespace. id: NamespaceId, /// Name of this namespace. name: Arc, + /// Tables in this namespace. + tables: Arc, Arc>>, + /// Executor for queries. exec: Arc, } impl QuerierNamespace { - /// Create new, empty namespace. - /// - /// You may call [`sync`](Self::sync) to fill the namespace with chunks. + /// Create new namespace for given schema. pub fn new( backoff_config: BackoffConfig, chunk_adapter: Arc, + schema: Arc, name: Arc, - id: NamespaceId, exec: Arc, ) -> Self { - let catalog_cache = Arc::clone(chunk_adapter.catalog_cache()); - let catalog = catalog_cache.catalog(); + let tables: HashMap<_, _> = schema + .tables + .iter() + .map(|(name, table_schema)| { + let name = Arc::from(name.clone()); + let id = table_schema.id; + let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema"); + + let table = Arc::new(QuerierTable::new( + backoff_config.clone(), + id, + Arc::clone(&name), + Arc::new(schema), + Arc::clone(&chunk_adapter), + )); + + (name, table) + }) + .collect(); + + let id = schema.id; Self { - backoff_config, - catalog, - catalog_cache: Arc::clone(&catalog_cache), - tables: RwLock::new(Arc::new(HashMap::new())), - chunk_adapter, id, name, + tables: Arc::new(tables), exec, } } - /// Create new empty namespace for testing. + /// Create new namespace for given schema, for testing. pub fn new_testing( catalog: Arc, object_store: Arc, metric_registry: Arc, time_provider: Arc, name: Arc, - id: NamespaceId, + schema: Arc, exec: Arc, ) -> Self { let catalog_cache = Arc::new(CatalogCache::new(catalog, Arc::clone(&time_provider))); @@ -94,124 +93,44 @@ impl QuerierNamespace { time_provider, )); - Self::new(BackoffConfig::default(), chunk_adapter, name, id, exec) + Self::new(BackoffConfig::default(), chunk_adapter, schema, name, exec) } /// Namespace name. pub fn name(&self) -> Arc { Arc::clone(&self.name) } - - /// Sync partial namespace state. - /// - /// This includes: - /// - tables - /// - schemas - /// - /// Chunks and tombstones are queried on-demand. - /// - /// Should be called regularly. - pub async fn sync(&self) { - let catalog_schema_desired = match self - .catalog_cache - .namespace() - .schema(Arc::clone(&self.name)) - .await - { - Some(schema) => schema, - None => { - warn!( - namespace = self.name.as_ref(), - "Cannot sync namespace because it is gone", - ); - return; - } - }; - - let tables: HashMap<_, _> = catalog_schema_desired - .tables - .iter() - .map(|(name, table_schema)| { - let name = Arc::from(name.clone()); - let id = table_schema.id; - let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema"); - - let table = Arc::new(QuerierTable::new( - self.backoff_config.clone(), - id, - Arc::clone(&name), - Arc::new(schema), - Arc::clone(&self.chunk_adapter), - )); - - (name, table) - }) - .collect(); - - *self.tables.write() = Arc::new(tables); - } } #[cfg(test)] mod tests { use super::*; - use crate::{cache::namespace::TTL_EXISTING, namespace::test_util::querier_namespace}; + use crate::namespace::test_util::querier_namespace; use data_types2::ColumnType; use iox_tests::util::TestCatalog; use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType}; - #[tokio::test] - async fn test_sync_namespace_gone() { - let catalog = TestCatalog::new(); - - let catalog_cache = Arc::new(CatalogCache::new( - catalog.catalog(), - catalog.time_provider(), - )); - let chunk_adapter = Arc::new(ParquetChunkAdapter::new( - catalog_cache, - catalog.object_store(), - catalog.metric_registry(), - catalog.time_provider(), - )); - let querier_namespace = QuerierNamespace::new( - BackoffConfig::default(), - chunk_adapter, - "ns".into(), - NamespaceId::new(1), - catalog.exec(), - ); - - // The container (`QuerierDatabase`) should prune the namespace if it's gone, however the `sync` might still be - // in-progress and must not block or panic. - querier_namespace.sync().await; - } - #[tokio::test] async fn test_sync_tables() { let catalog = TestCatalog::new(); let ns = catalog.create_namespace("ns").await; - let querier_namespace = querier_namespace(&ns); - - querier_namespace.sync().await; - assert_eq!(tables(&querier_namespace), Vec::::new()); + let qns = querier_namespace(&ns).await; + assert_eq!(tables(&qns), Vec::::new()); ns.create_table("table1").await; ns.create_table("table2").await; - catalog.mock_time_provider().inc(TTL_EXISTING); - querier_namespace.sync().await; + let qns = querier_namespace(&ns).await; assert_eq!( - tables(&querier_namespace), + tables(&qns), vec![String::from("table1"), String::from("table2")] ); ns.create_table("table3").await; - catalog.mock_time_provider().inc(TTL_EXISTING); - querier_namespace.sync().await; + let qns = querier_namespace(&ns).await; assert_eq!( - tables(&querier_namespace), + tables(&qns), vec![ String::from("table1"), String::from("table2"), @@ -227,31 +146,27 @@ mod tests { let ns = catalog.create_namespace("ns").await; let table = ns.create_table("table").await; - let querier_namespace = querier_namespace(&ns); - - querier_namespace.sync().await; + let qns = querier_namespace(&ns).await; let expected_schema = SchemaBuilder::new().build().unwrap(); - let actual_schema = schema(&querier_namespace, "table"); + let actual_schema = schema(&qns, "table"); assert_eq!(actual_schema.as_ref(), &expected_schema,); table.create_column("col1", ColumnType::I64).await; table.create_column("col2", ColumnType::Bool).await; table.create_column("col3", ColumnType::Tag).await; - catalog.mock_time_provider().inc(TTL_EXISTING); - querier_namespace.sync().await; + let qns = querier_namespace(&ns).await; let expected_schema = SchemaBuilder::new() .influx_column("col1", InfluxColumnType::Field(InfluxFieldType::Integer)) .influx_column("col2", InfluxColumnType::Field(InfluxFieldType::Boolean)) .influx_column("col3", InfluxColumnType::Tag) .build() .unwrap(); - let actual_schema = schema(&querier_namespace, "table"); + let actual_schema = schema(&qns, "table"); assert_eq!(actual_schema.as_ref(), &expected_schema,); table.create_column("col4", ColumnType::Tag).await; table.create_column("col5", ColumnType::Time).await; - catalog.mock_time_provider().inc(TTL_EXISTING); - querier_namespace.sync().await; + let qns = querier_namespace(&ns).await; let expected_schema = SchemaBuilder::new() .influx_column("col1", InfluxColumnType::Field(InfluxFieldType::Integer)) .influx_column("col2", InfluxColumnType::Field(InfluxFieldType::Boolean)) @@ -260,7 +175,7 @@ mod tests { .influx_column("col5", InfluxColumnType::Timestamp) .build() .unwrap(); - let actual_schema = schema(&querier_namespace, "table"); + let actual_schema = schema(&qns, "table"); assert_eq!(actual_schema.as_ref(), &expected_schema,); } @@ -276,7 +191,6 @@ mod tests { sorted( querier_namespace .tables - .read() .keys() .map(|s| s.to_string()) .collect(), @@ -284,6 +198,6 @@ mod tests { } fn schema(querier_namespace: &QuerierNamespace, table: &str) -> Arc { - Arc::clone(querier_namespace.tables.read().get(table).unwrap().schema()) + Arc::clone(querier_namespace.tables.get(table).unwrap().schema()) } } diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index d7d789bad4..a700af227d 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -17,16 +17,13 @@ use crate::{namespace::QuerierNamespace, table::QuerierTable}; impl QueryDatabaseMeta for QuerierNamespace { fn table_names(&self) -> Vec { - let mut names: Vec<_> = self.tables.read().keys().map(|s| s.to_string()).collect(); + let mut names: Vec<_> = self.tables.keys().map(|s| s.to_string()).collect(); names.sort(); names } fn table_schema(&self, table_name: &str) -> Option> { - self.tables - .read() - .get(table_name) - .map(|t| Arc::clone(t.schema())) + self.tables.get(table_name).map(|t| Arc::clone(t.schema())) } } @@ -34,7 +31,7 @@ impl QueryDatabaseMeta for QuerierNamespace { impl QueryDatabase for QuerierNamespace { async fn chunks(&self, table_name: &str, predicate: &Predicate) -> Vec> { // get table metadata - let table = match self.tables.read().get(table_name).map(Arc::clone) { + let table = match self.tables.get(table_name).map(Arc::clone) { Some(table) => table, None => { // table gone @@ -85,7 +82,7 @@ pub struct QuerierCatalogProvider { impl QuerierCatalogProvider { fn from_namespace(namespace: &QuerierNamespace) -> Self { Self { - tables: Arc::clone(&namespace.tables.read()), + tables: Arc::clone(&namespace.tables), } } } @@ -249,8 +246,7 @@ mod tests { .create_tombstone(1, 1, 13, "host=d") .await; - let querier_namespace = Arc::new(querier_namespace(&ns)); - querier_namespace.sync().await; + let querier_namespace = Arc::new(querier_namespace(&ns).await); assert_query( &querier_namespace, diff --git a/querier/src/namespace/test_util.rs b/querier/src/namespace/test_util.rs index 5e84589f0b..0a817c858c 100644 --- a/querier/src/namespace/test_util.rs +++ b/querier/src/namespace/test_util.rs @@ -1,18 +1,26 @@ use std::sync::Arc; +use iox_catalog::interface::get_schema_by_name; use iox_tests::util::TestNamespace; use super::QuerierNamespace; /// Create [`QuerierNamespace`] for testing. -pub fn querier_namespace(ns: &Arc) -> QuerierNamespace { +pub async fn querier_namespace(ns: &Arc) -> QuerierNamespace { + let mut repos = ns.catalog.catalog.repositories().await; + let schema = Arc::new( + get_schema_by_name(&ns.namespace.name, repos.as_mut()) + .await + .unwrap(), + ); + QuerierNamespace::new_testing( ns.catalog.catalog(), ns.catalog.object_store(), ns.catalog.metric_registry(), ns.catalog.time_provider(), ns.namespace.name.clone().into(), - ns.namespace.id, + schema, ns.catalog.exec(), ) } diff --git a/querier/src/poison.rs b/querier/src/poison.rs index 5b94194eb5..d9c037603a 100644 --- a/querier/src/poison.rs +++ b/querier/src/poison.rs @@ -8,10 +8,7 @@ use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use pin_project::pin_project; #[derive(Debug, Clone, PartialEq, Eq)] -pub enum PoisonPill { - DatabaseSyncPanic, - DatabaseSyncExit, -} +pub enum PoisonPill {} #[derive(Debug)] struct PoisonCabinetInner { diff --git a/query_tests/Cargo.toml b/query_tests/Cargo.toml index 87cb8b6ce2..62b57cb7cf 100644 --- a/query_tests/Cargo.toml +++ b/query_tests/Cargo.toml @@ -12,6 +12,7 @@ async-trait = "0.1" data_types = { path = "../data_types" } datafusion = { path = "../datafusion" } db = { path = "../db" } +iox_catalog = { path = "../iox_catalog" } iox_tests = { path = "../iox_tests" } itertools = "0.10" mutable_batch_lp = { path = "../mutable_batch_lp" } diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index f91cf7f73f..7840b7ad9c 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -7,6 +7,7 @@ use db::{ utils::{count_mub_table_chunks, count_os_table_chunks, count_rub_table_chunks, make_db}, Db, }; +use iox_catalog::interface::get_schema_by_name; use iox_tests::util::{TestCatalog, TestNamespace}; use itertools::Itertools; use querier::QuerierNamespace; @@ -1227,16 +1228,20 @@ async fn make_ng_chunk(ns: Arc, chunk: ChunkDataNew<'_, '_>) -> S } async fn make_querier_namespace(ns: Arc) -> Arc { - let db = Arc::new(QuerierNamespace::new_testing( + let mut repos = ns.catalog.catalog.repositories().await; + let schema = Arc::new( + get_schema_by_name(&ns.namespace.name, repos.as_mut()) + .await + .unwrap(), + ); + + Arc::new(QuerierNamespace::new_testing( ns.catalog.catalog(), ns.catalog.object_store(), ns.catalog.metric_registry(), ns.catalog.time_provider(), ns.namespace.name.clone().into(), - ns.namespace.id, + schema, ns.catalog.exec(), - )); - db.sync().await; - - db + )) }