diff --git a/clap_blocks/src/querier.rs b/clap_blocks/src/querier.rs index 78c07b540b..cfb5c37493 100644 --- a/clap_blocks/src/querier.rs +++ b/clap_blocks/src/querier.rs @@ -45,6 +45,14 @@ pub struct QuerierConfig { default_value = "1073741824" )] pub ram_pool_bytes: usize, + + /// Limit the number of concurrent queries. + #[clap( + long = "--max-concurrent-queries", + env = "INFLUXDB_IOX_MAX_CONCURRENT_QUERIES", + default_value = "10" + )] + pub max_concurrent_queries: usize, } impl QuerierConfig { @@ -75,6 +83,11 @@ impl QuerierConfig { pub fn ram_pool_bytes(&self) -> usize { self.ram_pool_bytes } + + /// Number of namespaces that can be used at once. + pub fn max_concurrent_queries(&self) -> usize { + self.max_concurrent_queries + } } #[cfg(test)] diff --git a/influxdb_iox/src/commands/run/all_in_one.rs b/influxdb_iox/src/commands/run/all_in_one.rs index fc23a9b32b..4f2445ed55 100644 --- a/influxdb_iox/src/commands/run/all_in_one.rs +++ b/influxdb_iox/src/commands/run/all_in_one.rs @@ -278,6 +278,14 @@ pub struct Config { default_value = "1073741824" )] pub querier_ram_pool_bytes: usize, + + /// Limit the number of concurrent queries. 
+ #[clap( + long = "--querier-max-concurrent-queries", + env = "INFLUXDB_IOX_QUERIER_MAX_CONCURRENT_QUERIES", + default_value = "10" + )] + pub querier_max_concurrent_queries: usize, } impl Config { @@ -301,6 +309,7 @@ impl Config { ingester_grpc_bind_address, compactor_grpc_bind_address, querier_ram_pool_bytes, + querier_max_concurrent_queries, } = self; let object_store_config = ObjectStoreConfig::new(database_directory.clone()); @@ -368,6 +377,7 @@ impl Config { num_query_threads: None, // will be ignored ingester_addresses: vec![], // will be ignored ram_pool_bytes: querier_ram_pool_bytes, + max_concurrent_queries: querier_max_concurrent_queries, }; SpecializedConfig { @@ -495,6 +505,7 @@ pub async fn command(config: Config) -> Result<()> { exec, ingester_addresses, ram_pool_bytes: querier_config.ram_pool_bytes(), + max_concurrent_queries: querier_config.max_concurrent_queries(), }) .await; diff --git a/influxdb_iox/src/commands/run/querier.rs b/influxdb_iox/src/commands/run/querier.rs index 37c5ade89e..6eab40bc3b 100644 --- a/influxdb_iox/src/commands/run/querier.rs +++ b/influxdb_iox/src/commands/run/querier.rs @@ -95,6 +95,7 @@ pub async fn command(config: Config) -> Result<(), Error> { let exec = Arc::new(Executor::new(num_threads)); let ram_pool_bytes = config.querier_config.ram_pool_bytes(); + let max_concurrent_queries = config.querier_config.max_concurrent_queries(); let server_type = create_querier_server_type(QuerierServerTypeArgs { common_state: &common_state, metric_registry: Arc::clone(&metric_registry), @@ -104,6 +105,7 @@ pub async fn command(config: Config) -> Result<(), Error> { exec, ingester_addresses, ram_pool_bytes, + max_concurrent_queries, }) .await; diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index 05b88f9373..76059ca146 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -147,6 +147,7 @@ pub struct QuerierServerTypeArgs<'a> { pub exec: Arc, pub ingester_addresses: Vec, pub ram_pool_bytes: usize, + 
pub max_concurrent_queries: usize, } /// Instantiate a querier server @@ -165,6 +166,7 @@ pub async fn create_querier_server_type(args: QuerierServerTypeArgs<'_>) -> Arc< ParquetStorage::new(args.object_store), args.exec, ingester_connection, + args.max_concurrent_queries, )); let querier_handler = Arc::new(QuerierHandlerImpl::new(args.catalog, Arc::clone(&database))); diff --git a/ioxd_querier/src/rpc/namespace.rs b/ioxd_querier/src/rpc/namespace.rs index 0649ccbff2..089418ce6d 100644 --- a/ioxd_querier/src/rpc/namespace.rs +++ b/ioxd_querier/src/rpc/namespace.rs @@ -76,6 +76,7 @@ mod tests { ParquetStorage::new(catalog.object_store()), catalog.exec(), create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, )); let service = NamespaceServiceImpl::new(db); @@ -103,6 +104,7 @@ mod tests { ParquetStorage::new(catalog.object_store()), catalog.exec(), create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, )); let service = NamespaceServiceImpl::new(db); diff --git a/querier/src/database.rs b/querier/src/database.rs index 08bb75b39d..b8e5ecbb4f 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -11,6 +11,7 @@ use iox_query::exec::Executor; use parquet_file::storage::ParquetStorage; use service_common::QueryDatabaseProvider; use std::sync::Arc; +use tokio::sync::Semaphore; /// The number of entries to store in the circular query buffer log. /// @@ -43,6 +44,13 @@ pub struct QuerierDatabase { /// Query log. query_log: Arc, + + /// Semaphore that limits the number of namespaces in use at a time by the query subsystem. + /// + /// This should be a 1-to-1 relation to the number of active queries. + /// + /// If the same database is requested twice for different queries, it is counted twice. 
+ query_execution_semaphore: Arc, } #[async_trait] @@ -55,6 +63,12 @@ impl QueryDatabaseProvider for QuerierDatabase { } impl QuerierDatabase { + /// The maximum value for `max_concurrent_queries` that is allowed. + /// + /// This limit exists because [`tokio::sync::Semaphore`] has an internal limit and semaphore creation beyond that + /// will panic. The tokio limit is not exposed though so we pick a reasonable but smaller number. + pub const MAX_CONCURRENT_QUERIES_MAX: usize = u16::MAX as usize; + /// Create new database. pub fn new( catalog_cache: Arc, @@ -62,7 +76,15 @@ impl QuerierDatabase { store: ParquetStorage, exec: Arc, ingester_connection: Arc, + max_concurrent_queries: usize, ) -> Self { + assert!( + max_concurrent_queries <= Self::MAX_CONCURRENT_QUERIES_MAX, + "`max_concurrent_queries` ({}) > `max_concurrent_queries_MAX` ({})", + max_concurrent_queries, + Self::MAX_CONCURRENT_QUERIES_MAX, + ); + let chunk_adapter = Arc::new(ChunkAdapter::new( Arc::clone(&catalog_cache), store, @@ -70,6 +92,7 @@ impl QuerierDatabase { catalog_cache.time_provider(), )); let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, catalog_cache.time_provider())); + let namespace_semaphore = Arc::new(Semaphore::new(max_concurrent_queries)); Self { backoff_config: BackoffConfig::default(), @@ -79,11 +102,21 @@ impl QuerierDatabase { exec, ingester_connection, query_log, + query_execution_semaphore: namespace_semaphore, } } - /// Get namespace if it exists + /// Get namespace if it exists. + /// + /// This will await the internal namespace semaphore. Existence of namespaces is checked AFTER a semaphore permit + /// was acquired since this lowers the chance that we obtain stale data. 
pub async fn namespace(&self, name: &str) -> Option> { + // get the permit first + let permit = Arc::clone(&self.query_execution_semaphore) + .acquire_owned() + .await + .expect("Semaphore should NOT be closed by now"); + let name = Arc::from(name.to_owned()); let schema = self .catalog_cache @@ -97,6 +130,7 @@ impl QuerierDatabase { Arc::clone(&self.exec), Arc::clone(&self.ingester_connection), Arc::clone(&self.query_log), + permit, ))) } @@ -124,12 +158,38 @@ impl QuerierDatabase { #[cfg(test)] mod tests { + use std::{future::Future, time::Duration}; + use iox_tests::util::TestCatalog; + use tokio::pin; use crate::create_ingester_connection_for_testing; use super::*; + #[test] + #[should_panic( + expected = "`max_concurrent_queries` (65536) > `max_concurrent_queries_MAX` (65535)" + )] + fn test_semaphore_limit_is_checked() { + let catalog = TestCatalog::new(); + + let catalog_cache = Arc::new(CatalogCache::new( + catalog.catalog(), + catalog.time_provider(), + catalog.metric_registry(), + usize::MAX, + )); + QuerierDatabase::new( + catalog_cache, + catalog.metric_registry(), + ParquetStorage::new(catalog.object_store()), + catalog.exec(), + create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX.saturating_add(1), + ); + } + #[tokio::test] async fn test_namespace() { let catalog = TestCatalog::new(); @@ -146,6 +206,7 @@ mod tests { ParquetStorage::new(catalog.object_store()), catalog.exec(), create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, ); catalog.create_namespace("ns1").await; @@ -153,4 +214,108 @@ mod tests { assert!(db.namespace("ns1").await.is_some()); assert!(db.namespace("ns2").await.is_none()); } + + #[tokio::test] + async fn test_namespaces() { + let catalog = TestCatalog::new(); + + let catalog_cache = Arc::new(CatalogCache::new( + catalog.catalog(), + catalog.time_provider(), + catalog.metric_registry(), + usize::MAX, + )); + let db = QuerierDatabase::new( + catalog_cache, + 
catalog.metric_registry(), + ParquetStorage::new(catalog.object_store()), + catalog.exec(), + create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, + ); + + catalog.create_namespace("ns1").await; + catalog.create_namespace("ns2").await; + + let mut namespaces = db.namespaces().await; + namespaces.sort_by_key(|ns| ns.name.clone()); + assert_eq!(namespaces.len(), 2); + assert_eq!(namespaces[0].name, "ns1"); + assert_eq!(namespaces[1].name, "ns2"); + } + + #[tokio::test] + async fn test_namespace_semaphore() { + let catalog = TestCatalog::new(); + + let catalog_cache = Arc::new(CatalogCache::new( + catalog.catalog(), + catalog.time_provider(), + catalog.metric_registry(), + usize::MAX, + )); + let db = QuerierDatabase::new( + catalog_cache, + catalog.metric_registry(), + ParquetStorage::new(catalog.object_store()), + catalog.exec(), + create_ingester_connection_for_testing(), + 2, + ); + + catalog.create_namespace("ns1").await; + catalog.create_namespace("ns2").await; + catalog.create_namespace("ns3").await; + + // consume all semaphore permits + let ns1 = db.namespace("ns1").await.unwrap(); + let ns2 = db.namespace("ns2").await.unwrap(); + + // cannot get any new namespace, even when we already have a namespace for the same name + let fut3 = db.namespace("ns3"); + let fut1 = db.namespace("ns1"); + let fut9 = db.namespace("ns9"); + let fut2 = db.namespace("ns2"); + pin!(fut3); + pin!(fut1); + pin!(fut9); + pin!(fut2); + assert_fut_pending(&mut fut3).await; + assert_fut_pending(&mut fut1).await; + assert_fut_pending(&mut fut9).await; + assert_fut_pending(&mut fut2).await; + + // dropping the newest namespace frees a permit + drop(ns2); + let ns3 = fut3.await.unwrap(); + assert_fut_pending(&mut fut1).await; + assert_fut_pending(&mut fut9).await; + assert_fut_pending(&mut fut2).await; + + // dropping the newest namespace frees a permit + drop(ns3); + let _ns1b = fut1.await.unwrap(); + assert_fut_pending(&mut fut9).await; + 
assert_fut_pending(&mut fut2).await; + + // dropping the oldest namespace frees a permit + drop(ns1); + assert!(fut9.await.is_none()); + // because "ns9" did not exist, we immediately get a new permit + fut2.await.unwrap(); + } + + /// Assert that given future is pending. + /// + /// This will try to poll the future a bit to ensure that it is not stuck in tokios task preemption. + async fn assert_fut_pending(fut: &mut F) + where + F: Future + Send + Unpin, + F::Output: std::fmt::Debug, + { + tokio::select! { + x = fut => panic!("future is not pending, yielded: {x:?}"), + _ = tokio::time::sleep(Duration::from_millis(10)) => {}, + }; + } } diff --git a/querier/src/handler.rs b/querier/src/handler.rs index 83f552a03f..12670d4e24 100644 --- a/querier/src/handler.rs +++ b/querier/src/handler.rs @@ -180,6 +180,7 @@ mod tests { ParquetStorage::new(object_store), exec, create_ingester_connection_for_testing(), + QuerierDatabase::MAX_CONCURRENT_QUERIES_MAX, )); let querier = QuerierHandlerImpl::new(catalog, database); diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index d0b2bfa672..baa27b188d 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -9,6 +9,7 @@ use iox_query::exec::Executor; use parquet_file::storage::ParquetStorage; use schema::Schema; use std::{collections::HashMap, sync::Arc}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; mod query_access; @@ -41,6 +42,10 @@ pub struct QuerierNamespace { /// Query log. query_log: Arc, + + /// Permit that limits the number of active namespaces at the same time. 
+ #[allow(dead_code)] + permit: OwnedSemaphorePermit, } impl QuerierNamespace { @@ -52,6 +57,7 @@ impl QuerierNamespace { exec: Arc, ingester_connection: Arc, query_log: Arc, + permit: OwnedSemaphorePermit, ) -> Self { let tables: HashMap<_, _> = schema .tables @@ -83,6 +89,7 @@ impl QuerierNamespace { exec, catalog_cache: Arc::clone(chunk_adapter.catalog_cache()), query_log, + permit, } } @@ -106,6 +113,9 @@ impl QuerierNamespace { )); let query_log = Arc::new(QueryLog::new(10, time_provider)); + let semaphore = Arc::new(Semaphore::new(1)); + let permit = semaphore.try_acquire_owned().unwrap(); + Self::new( chunk_adapter, schema, @@ -113,6 +123,7 @@ impl QuerierNamespace { exec, ingester_connection, query_log, + permit, ) }