diff --git a/Cargo.lock b/Cargo.lock index 082eaf3a2f..7e3faf6f26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3969,6 +3969,7 @@ dependencies = [ "tokio-util 0.7.3", "tonic", "trace", + "tracker", "uuid 0.8.2", "workspace-hack", ] @@ -4757,7 +4758,7 @@ dependencies = [ "metric", "parking_lot 0.12.1", "predicate", - "tokio", + "tracker", "workspace-hack", ] @@ -4789,6 +4790,7 @@ dependencies = [ "futures", "generated_types", "iox_query", + "metric", "observability_deps", "pin-project", "prost", @@ -4798,6 +4800,7 @@ dependencies = [ "snafu", "tokio", "tonic", + "tracker", "workspace-hack", ] @@ -4834,6 +4837,7 @@ dependencies = [ "tokio-stream", "tonic", "trace_http", + "tracker", "workspace-hack", ] diff --git a/querier/Cargo.toml b/querier/Cargo.toml index 31dbff3048..4140ee57ec 100644 --- a/querier/Cargo.toml +++ b/querier/Cargo.toml @@ -36,6 +36,7 @@ tokio = { version = "1.19", features = ["macros", "parking_lot", "rt-multi-threa tokio-util = { version = "0.7.3" } tonic = { version = "0.7" } trace = { path = "../trace" } +tracker = { path = "../tracker" } uuid = { version = "0.8", features = ["v4"] } workspace-hack = { path = "../workspace-hack"} diff --git a/querier/src/database.rs b/querier/src/database.rs index 470cf4afff..98fef73b8f 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -11,7 +11,9 @@ use iox_query::exec::Executor; use parquet_file::storage::ParquetStorage; use service_common::QueryDatabaseProvider; use std::sync::Arc; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tracker::{ + AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore, +}; /// The number of entries to store in the circular query buffer log. /// @@ -50,7 +52,7 @@ pub struct QuerierDatabase { /// This should be a 1-to-1 relation to the number of active queries. /// /// If the same database is requested twice for different queries, it is counted twice. - query_execution_semaphore: Arc, + query_execution_semaphore: Arc, } #[async_trait] @@ -61,7 +63,7 @@ impl QueryDatabaseProvider for QuerierDatabase { self.namespace(name).await } - async fn acquire_semaphore(&self) -> OwnedSemaphorePermit { + async fn acquire_semaphore(&self) -> InstrumentedAsyncOwnedSemaphorePermit { Arc::clone(&self.query_execution_semaphore) .acquire_owned() .await @@ -99,7 +101,12 @@ impl QuerierDatabase { catalog_cache.time_provider(), )); let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, catalog_cache.time_provider())); - let query_execution_semaphore = Arc::new(Semaphore::new(max_concurrent_queries)); + let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new( + &metric_registry, + &[("semaphore", "query_execution")], + )); + let query_execution_semaphore = + Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries)); Self { backoff_config: BackoffConfig::default(), diff --git a/service_common/Cargo.toml b/service_common/Cargo.toml index 9a439a5d34..bf05199a56 100644 --- a/service_common/Cargo.toml +++ b/service_common/Cargo.toml @@ -12,7 +12,7 @@ predicate = { path = "../predicate" } iox_query = { path = "../iox_query" } metric = { path = "../metric" } parking_lot = "0.12" -tokio = { version = "1.19", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] } +tracker = { path = "../tracker" } workspace-hack = { path = "../workspace-hack"} # Crates.io dependencies, in alphabetical order diff --git a/service_common/src/lib.rs b/service_common/src/lib.rs index f941c5de79..b08fd05d39 100644 --- a/service_common/src/lib.rs +++ b/service_common/src/lib.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use async_trait::async_trait; use iox_query::{exec::ExecutionContextProvider, QueryDatabase}; -use tokio::sync::OwnedSemaphorePermit; +use tracker::InstrumentedAsyncOwnedSemaphorePermit; /// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a virtual set of /// databases. @@ -22,5 +22,5 @@ pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static { async fn db(&self, name: &str) -> Option>; /// Acquire concurrency-limiting sempahore - async fn acquire_semaphore(&self) -> OwnedSemaphorePermit; + async fn acquire_semaphore(&self) -> InstrumentedAsyncOwnedSemaphorePermit; } diff --git a/service_common/src/test_util.rs b/service_common/src/test_util.rs index 46b99f6534..c3b8940c99 100644 --- a/service_common/src/test_util.rs +++ b/service_common/src/test_util.rs @@ -3,7 +3,9 @@ use std::{collections::BTreeMap, sync::Arc}; use async_trait::async_trait; use iox_query::{exec::Executor, test::TestDatabase}; use parking_lot::Mutex; -use tokio::sync::{OwnedSemaphorePermit, Semaphore}; +use tracker::{ + AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore, +}; use crate::QueryDatabaseProvider; @@ -12,7 +14,7 @@ pub struct TestDatabaseStore { databases: Mutex>>, executor: Arc, pub metric_registry: Arc, - pub query_semaphore: Arc, + pub query_semaphore: Arc, } impl TestDatabaseStore { @@ -21,9 +23,16 @@ impl TestDatabaseStore { } pub fn new_with_semaphore_size(semaphore_size: usize) -> Self { + let metric_registry = Arc::new(metric::Registry::default()); + let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new( + &metric_registry, + &[("semaphore", "query_execution")], + )); Self { - query_semaphore: Arc::new(Semaphore::new(semaphore_size)), - ..Default::default() + databases: Mutex::new(BTreeMap::new()), + executor: Arc::new(Executor::new(1)), + metric_registry, + query_semaphore: Arc::new(semaphore_metrics.new_semaphore(semaphore_size)), } } @@ -42,12 +51,7 @@ impl TestDatabaseStore { impl Default for TestDatabaseStore { fn default() -> Self { - Self { - databases: Mutex::new(BTreeMap::new()), - executor: Arc::new(Executor::new(1)), - metric_registry: Default::default(), - query_semaphore: Arc::new(Semaphore::new(u16::MAX as usize)), - } + Self::new_with_semaphore_size(u16::MAX as usize) } } @@ -62,7 +66,7 @@ impl QueryDatabaseProvider for TestDatabaseStore { databases.get(name).cloned() } - async fn acquire_semaphore(&self) -> OwnedSemaphorePermit { + async fn acquire_semaphore(&self) -> InstrumentedAsyncOwnedSemaphorePermit { Arc::clone(&self.query_semaphore) .acquire_owned() .await diff --git a/service_grpc_flight/Cargo.toml b/service_grpc_flight/Cargo.toml index f5fdba0baa..f0b8b90cd2 100644 --- a/service_grpc_flight/Cargo.toml +++ b/service_grpc_flight/Cargo.toml @@ -14,6 +14,7 @@ generated_types = { path = "../generated_types" } observability_deps = { path = "../observability_deps" } iox_query = { path = "../iox_query" } service_common = { path = "../service_common" } +tracker = { path = "../tracker" } # Crates.io dependencies, in alphabetical order arrow = { version = "15.0.0", features = ["prettyprint"] } @@ -28,3 +29,6 @@ snafu = "0.7" tokio = { version = "1.19", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } tonic = "0.7" workspace-hack = { path = "../workspace-hack"} + +[dev-dependencies] +metric = { path = "../metric" } diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index 8cbaeaa8df..72377d6971 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -23,8 +23,9 @@ use serde::Deserialize; use service_common::{planner::Planner, QueryDatabaseProvider}; use snafu::{ResultExt, Snafu}; use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll}; -use tokio::{sync::OwnedSemaphorePermit, task::JoinHandle}; +use tokio::task::JoinHandle; use tonic::{Request, Response, Streaming}; +use tracker::InstrumentedAsyncOwnedSemaphorePermit; #[allow(clippy::enum_variant_names)] #[derive(Debug, Snafu)] @@ -290,7 +291,7 @@ struct GetStream { join_handle: JoinHandle<()>, done: bool, #[allow(dead_code)] - permit: OwnedSemaphorePermit, + permit: InstrumentedAsyncOwnedSemaphorePermit, } impl GetStream { @@ -299,7 +300,7 @@ impl GetStream { physical_plan: Arc, database_name: String, mut query_completed_token: QueryCompletedToken, - permit: OwnedSemaphorePermit, + permit: InstrumentedAsyncOwnedSemaphorePermit, ) -> Result { // setup channel let (mut tx, rx) = futures::channel::mpsc::channel::>(1); @@ -429,6 +430,7 @@ impl Stream for GetStream { #[cfg(test)] mod tests { use futures::Future; + use metric::{Attributes, Metric, U64Gauge}; use service_common::test_util::TestDatabaseStore; use tokio::pin; @@ -439,6 +441,22 @@ mod tests { let semaphore_size = 2; let test_storage = Arc::new(TestDatabaseStore::new_with_semaphore_size(semaphore_size)); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 0, + ); + // add some data test_storage.db_or_create("my_db").await; @@ -452,19 +470,103 @@ mod tests { .do_get(tonic::Request::new(ticket.clone())) .await .unwrap(); - let _streaming_resp2 = service + + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 1, + ); + + let streaming_resp2 = service .do_get(tonic::Request::new(ticket.clone())) .await .unwrap(); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 2, + ); + // 3rd request is pending let fut = service.do_get(tonic::Request::new(ticket.clone())); pin!(fut); assert_fut_pending(&mut fut).await; + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 1, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 2, + ); + // free permit drop(streaming_resp1); - let _streaming_resp3 = fut.await; + let streaming_resp3 = fut.await; + + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 2, + ); + + drop(streaming_resp2); + drop(streaming_resp3); + + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 0, + ); } /// Assert that given future is pending. @@ -479,4 +581,14 @@ mod tests { _ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {}, }; } + + fn assert_semaphore_metric(registry: &metric::Registry, name: &'static str, expected: u64) { + let actual = registry + .get_instrument::>(name) + .expect("failed to read metric") + .get_observer(&Attributes::from(&[("semaphore", "query_execution")])) + .expect("failed to get observer") + .fetch(); + assert_eq!(actual, expected); + } } diff --git a/service_grpc_influxrpc/Cargo.toml b/service_grpc_influxrpc/Cargo.toml index 2ecd36c4c1..62ae0b0954 100644 --- a/service_grpc_influxrpc/Cargo.toml +++ b/service_grpc_influxrpc/Cargo.toml @@ -14,6 +14,7 @@ iox_query = { path = "../iox_query" } query_functions = { path = "../query_functions"} schema = { path = "../schema" } service_common = { path = "../service_common" } +tracker = { path = "../tracker" } # Crates.io dependencies, in alphabetical order arrow = { version = "15.0.0", features = ["prettyprint"] } diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index 52f13514a9..99332d1296 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -39,9 +39,10 @@ use std::{ collections::{BTreeSet, HashMap}, sync::Arc, }; -use tokio::sync::{mpsc, OwnedSemaphorePermit}; +use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; +use tracker::InstrumentedAsyncOwnedSemaphorePermit; #[derive(Debug, Snafu)] pub enum Error { @@ -1441,11 +1442,11 @@ pub struct StreamWithPermit { #[pin] stream: S, #[allow(dead_code)] - permit: OwnedSemaphorePermit, + permit: InstrumentedAsyncOwnedSemaphorePermit, } impl StreamWithPermit { - fn new(stream: S, permit: OwnedSemaphorePermit) -> Self { + fn new(stream: S, permit: InstrumentedAsyncOwnedSemaphorePermit) -> Self { Self { stream, permit } } } @@ -1478,7 +1479,7 @@ mod tests { Client as StorageClient, OrgAndBucket, }; use iox_query::test::TestChunk; - use metric::{Attributes, Metric, U64Counter}; + use metric::{Attributes, Metric, U64Counter, U64Gauge}; use panic_logging::SendPanicsToTracing; use predicate::{Predicate, PredicateMatch}; use service_common::test_util::TestDatabaseStore; @@ -3131,17 +3132,118 @@ mod tests { let service = StorageService { db_store: Arc::clone(&test_storage), }; + + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 0, + ); + let streaming_resp1 = t.request(&service).await; - let _streaming_resp2 = t.request(&service).await; + + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 1, + ); + + let streaming_resp2 = t.request(&service).await; + + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 2, + ); // 3rd request is pending let fut = t.request(&service); pin!(fut); assert_fut_pending(&mut fut).await; + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 1, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 2, + ); + // free permit drop(streaming_resp1); - let _streaming_resp3 = fut.await; + let streaming_resp3 = fut.await; + + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 2, + ); + + drop(streaming_resp2); + drop(streaming_resp3); + + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_total", + 2, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_pending", + 0, + ); + assert_semaphore_metric( + &test_storage.metric_registry, + "iox_async_semaphore_permits_acquired", + 0, + ); } } @@ -3339,4 +3441,14 @@ mod tests { _ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {}, }; } + + fn assert_semaphore_metric(registry: &metric::Registry, name: &'static str, expected: u64) { + let actual = registry + .get_instrument::>(name) + .expect("failed to read metric") + .get_observer(&Attributes::from(&[("semaphore", "query_execution")])) + .expect("failed to get observer") + .fetch(); + assert_eq!(actual, expected); + } }