feat: expose query semaphore metrics (#4836)
The groundwork for that was already done, just needed a bit of wiring. This might help us to judge timeouts.pull/24376/head
parent
ddf61c5e98
commit
66623fe0cd
|
@ -3969,6 +3969,7 @@ dependencies = [
|
||||||
"tokio-util 0.7.3",
|
"tokio-util 0.7.3",
|
||||||
"tonic",
|
"tonic",
|
||||||
"trace",
|
"trace",
|
||||||
|
"tracker",
|
||||||
"uuid 0.8.2",
|
"uuid 0.8.2",
|
||||||
"workspace-hack",
|
"workspace-hack",
|
||||||
]
|
]
|
||||||
|
@ -4757,7 +4758,7 @@ dependencies = [
|
||||||
"metric",
|
"metric",
|
||||||
"parking_lot 0.12.1",
|
"parking_lot 0.12.1",
|
||||||
"predicate",
|
"predicate",
|
||||||
"tokio",
|
"tracker",
|
||||||
"workspace-hack",
|
"workspace-hack",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -4789,6 +4790,7 @@ dependencies = [
|
||||||
"futures",
|
"futures",
|
||||||
"generated_types",
|
"generated_types",
|
||||||
"iox_query",
|
"iox_query",
|
||||||
|
"metric",
|
||||||
"observability_deps",
|
"observability_deps",
|
||||||
"pin-project",
|
"pin-project",
|
||||||
"prost",
|
"prost",
|
||||||
|
@ -4798,6 +4800,7 @@ dependencies = [
|
||||||
"snafu",
|
"snafu",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tonic",
|
"tonic",
|
||||||
|
"tracker",
|
||||||
"workspace-hack",
|
"workspace-hack",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -4834,6 +4837,7 @@ dependencies = [
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"tonic",
|
"tonic",
|
||||||
"trace_http",
|
"trace_http",
|
||||||
|
"tracker",
|
||||||
"workspace-hack",
|
"workspace-hack",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,7 @@ tokio = { version = "1.19", features = ["macros", "parking_lot", "rt-multi-threa
|
||||||
tokio-util = { version = "0.7.3" }
|
tokio-util = { version = "0.7.3" }
|
||||||
tonic = { version = "0.7" }
|
tonic = { version = "0.7" }
|
||||||
trace = { path = "../trace" }
|
trace = { path = "../trace" }
|
||||||
|
tracker = { path = "../tracker" }
|
||||||
uuid = { version = "0.8", features = ["v4"] }
|
uuid = { version = "0.8", features = ["v4"] }
|
||||||
workspace-hack = { path = "../workspace-hack"}
|
workspace-hack = { path = "../workspace-hack"}
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,9 @@ use iox_query::exec::Executor;
|
||||||
use parquet_file::storage::ParquetStorage;
|
use parquet_file::storage::ParquetStorage;
|
||||||
use service_common::QueryDatabaseProvider;
|
use service_common::QueryDatabaseProvider;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
use tracker::{
|
||||||
|
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
|
||||||
|
};
|
||||||
|
|
||||||
/// The number of entries to store in the circular query buffer log.
|
/// The number of entries to store in the circular query buffer log.
|
||||||
///
|
///
|
||||||
|
@ -50,7 +52,7 @@ pub struct QuerierDatabase {
|
||||||
/// This should be a 1-to-1 relation to the number of active queries.
|
/// This should be a 1-to-1 relation to the number of active queries.
|
||||||
///
|
///
|
||||||
/// If the same database is requested twice for different queries, it is counted twice.
|
/// If the same database is requested twice for different queries, it is counted twice.
|
||||||
query_execution_semaphore: Arc<Semaphore>,
|
query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
@ -61,7 +63,7 @@ impl QueryDatabaseProvider for QuerierDatabase {
|
||||||
self.namespace(name).await
|
self.namespace(name).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn acquire_semaphore(&self) -> OwnedSemaphorePermit {
|
async fn acquire_semaphore(&self) -> InstrumentedAsyncOwnedSemaphorePermit {
|
||||||
Arc::clone(&self.query_execution_semaphore)
|
Arc::clone(&self.query_execution_semaphore)
|
||||||
.acquire_owned()
|
.acquire_owned()
|
||||||
.await
|
.await
|
||||||
|
@ -99,7 +101,12 @@ impl QuerierDatabase {
|
||||||
catalog_cache.time_provider(),
|
catalog_cache.time_provider(),
|
||||||
));
|
));
|
||||||
let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, catalog_cache.time_provider()));
|
let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, catalog_cache.time_provider()));
|
||||||
let query_execution_semaphore = Arc::new(Semaphore::new(max_concurrent_queries));
|
let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
|
||||||
|
&metric_registry,
|
||||||
|
&[("semaphore", "query_execution")],
|
||||||
|
));
|
||||||
|
let query_execution_semaphore =
|
||||||
|
Arc::new(semaphore_metrics.new_semaphore(max_concurrent_queries));
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
backoff_config: BackoffConfig::default(),
|
backoff_config: BackoffConfig::default(),
|
||||||
|
|
|
@ -12,7 +12,7 @@ predicate = { path = "../predicate" }
|
||||||
iox_query = { path = "../iox_query" }
|
iox_query = { path = "../iox_query" }
|
||||||
metric = { path = "../metric" }
|
metric = { path = "../metric" }
|
||||||
parking_lot = "0.12"
|
parking_lot = "0.12"
|
||||||
tokio = { version = "1.19", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }
|
tracker = { path = "../tracker" }
|
||||||
workspace-hack = { path = "../workspace-hack"}
|
workspace-hack = { path = "../workspace-hack"}
|
||||||
|
|
||||||
# Crates.io dependencies, in alphabetical order
|
# Crates.io dependencies, in alphabetical order
|
||||||
|
|
|
@ -7,7 +7,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use iox_query::{exec::ExecutionContextProvider, QueryDatabase};
|
use iox_query::{exec::ExecutionContextProvider, QueryDatabase};
|
||||||
use tokio::sync::OwnedSemaphorePermit;
|
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
|
||||||
|
|
||||||
/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a virtual set of
|
/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a virtual set of
|
||||||
/// databases.
|
/// databases.
|
||||||
|
@ -22,5 +22,5 @@ pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static {
|
||||||
async fn db(&self, name: &str) -> Option<Arc<Self::Db>>;
|
async fn db(&self, name: &str) -> Option<Arc<Self::Db>>;
|
||||||
|
|
||||||
/// Acquire concurrency-limiting sempahore
|
/// Acquire concurrency-limiting sempahore
|
||||||
async fn acquire_semaphore(&self) -> OwnedSemaphorePermit;
|
async fn acquire_semaphore(&self) -> InstrumentedAsyncOwnedSemaphorePermit;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,7 +3,9 @@ use std::{collections::BTreeMap, sync::Arc};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use iox_query::{exec::Executor, test::TestDatabase};
|
use iox_query::{exec::Executor, test::TestDatabase};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
use tracker::{
|
||||||
|
AsyncSemaphoreMetrics, InstrumentedAsyncOwnedSemaphorePermit, InstrumentedAsyncSemaphore,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::QueryDatabaseProvider;
|
use crate::QueryDatabaseProvider;
|
||||||
|
|
||||||
|
@ -12,7 +14,7 @@ pub struct TestDatabaseStore {
|
||||||
databases: Mutex<BTreeMap<String, Arc<TestDatabase>>>,
|
databases: Mutex<BTreeMap<String, Arc<TestDatabase>>>,
|
||||||
executor: Arc<Executor>,
|
executor: Arc<Executor>,
|
||||||
pub metric_registry: Arc<metric::Registry>,
|
pub metric_registry: Arc<metric::Registry>,
|
||||||
pub query_semaphore: Arc<Semaphore>,
|
pub query_semaphore: Arc<InstrumentedAsyncSemaphore>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TestDatabaseStore {
|
impl TestDatabaseStore {
|
||||||
|
@ -21,9 +23,16 @@ impl TestDatabaseStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_with_semaphore_size(semaphore_size: usize) -> Self {
|
pub fn new_with_semaphore_size(semaphore_size: usize) -> Self {
|
||||||
|
let metric_registry = Arc::new(metric::Registry::default());
|
||||||
|
let semaphore_metrics = Arc::new(AsyncSemaphoreMetrics::new(
|
||||||
|
&metric_registry,
|
||||||
|
&[("semaphore", "query_execution")],
|
||||||
|
));
|
||||||
Self {
|
Self {
|
||||||
query_semaphore: Arc::new(Semaphore::new(semaphore_size)),
|
databases: Mutex::new(BTreeMap::new()),
|
||||||
..Default::default()
|
executor: Arc::new(Executor::new(1)),
|
||||||
|
metric_registry,
|
||||||
|
query_semaphore: Arc::new(semaphore_metrics.new_semaphore(semaphore_size)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -42,12 +51,7 @@ impl TestDatabaseStore {
|
||||||
|
|
||||||
impl Default for TestDatabaseStore {
|
impl Default for TestDatabaseStore {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self {
|
Self::new_with_semaphore_size(u16::MAX as usize)
|
||||||
databases: Mutex::new(BTreeMap::new()),
|
|
||||||
executor: Arc::new(Executor::new(1)),
|
|
||||||
metric_registry: Default::default(),
|
|
||||||
query_semaphore: Arc::new(Semaphore::new(u16::MAX as usize)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,7 +66,7 @@ impl QueryDatabaseProvider for TestDatabaseStore {
|
||||||
databases.get(name).cloned()
|
databases.get(name).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn acquire_semaphore(&self) -> OwnedSemaphorePermit {
|
async fn acquire_semaphore(&self) -> InstrumentedAsyncOwnedSemaphorePermit {
|
||||||
Arc::clone(&self.query_semaphore)
|
Arc::clone(&self.query_semaphore)
|
||||||
.acquire_owned()
|
.acquire_owned()
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -14,6 +14,7 @@ generated_types = { path = "../generated_types" }
|
||||||
observability_deps = { path = "../observability_deps" }
|
observability_deps = { path = "../observability_deps" }
|
||||||
iox_query = { path = "../iox_query" }
|
iox_query = { path = "../iox_query" }
|
||||||
service_common = { path = "../service_common" }
|
service_common = { path = "../service_common" }
|
||||||
|
tracker = { path = "../tracker" }
|
||||||
|
|
||||||
# Crates.io dependencies, in alphabetical order
|
# Crates.io dependencies, in alphabetical order
|
||||||
arrow = { version = "15.0.0", features = ["prettyprint"] }
|
arrow = { version = "15.0.0", features = ["prettyprint"] }
|
||||||
|
@ -28,3 +29,6 @@ snafu = "0.7"
|
||||||
tokio = { version = "1.19", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
|
tokio = { version = "1.19", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
|
||||||
tonic = "0.7"
|
tonic = "0.7"
|
||||||
workspace-hack = { path = "../workspace-hack"}
|
workspace-hack = { path = "../workspace-hack"}
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
metric = { path = "../metric" }
|
||||||
|
|
|
@ -23,8 +23,9 @@ use serde::Deserialize;
|
||||||
use service_common::{planner::Planner, QueryDatabaseProvider};
|
use service_common::{planner::Planner, QueryDatabaseProvider};
|
||||||
use snafu::{ResultExt, Snafu};
|
use snafu::{ResultExt, Snafu};
|
||||||
use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll};
|
use std::{fmt::Debug, pin::Pin, sync::Arc, task::Poll};
|
||||||
use tokio::{sync::OwnedSemaphorePermit, task::JoinHandle};
|
use tokio::task::JoinHandle;
|
||||||
use tonic::{Request, Response, Streaming};
|
use tonic::{Request, Response, Streaming};
|
||||||
|
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
|
||||||
|
|
||||||
#[allow(clippy::enum_variant_names)]
|
#[allow(clippy::enum_variant_names)]
|
||||||
#[derive(Debug, Snafu)]
|
#[derive(Debug, Snafu)]
|
||||||
|
@ -290,7 +291,7 @@ struct GetStream {
|
||||||
join_handle: JoinHandle<()>,
|
join_handle: JoinHandle<()>,
|
||||||
done: bool,
|
done: bool,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
permit: OwnedSemaphorePermit,
|
permit: InstrumentedAsyncOwnedSemaphorePermit,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GetStream {
|
impl GetStream {
|
||||||
|
@ -299,7 +300,7 @@ impl GetStream {
|
||||||
physical_plan: Arc<dyn ExecutionPlan>,
|
physical_plan: Arc<dyn ExecutionPlan>,
|
||||||
database_name: String,
|
database_name: String,
|
||||||
mut query_completed_token: QueryCompletedToken,
|
mut query_completed_token: QueryCompletedToken,
|
||||||
permit: OwnedSemaphorePermit,
|
permit: InstrumentedAsyncOwnedSemaphorePermit,
|
||||||
) -> Result<Self, tonic::Status> {
|
) -> Result<Self, tonic::Status> {
|
||||||
// setup channel
|
// setup channel
|
||||||
let (mut tx, rx) = futures::channel::mpsc::channel::<Result<FlightData, tonic::Status>>(1);
|
let (mut tx, rx) = futures::channel::mpsc::channel::<Result<FlightData, tonic::Status>>(1);
|
||||||
|
@ -429,6 +430,7 @@ impl Stream for GetStream {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use futures::Future;
|
use futures::Future;
|
||||||
|
use metric::{Attributes, Metric, U64Gauge};
|
||||||
use service_common::test_util::TestDatabaseStore;
|
use service_common::test_util::TestDatabaseStore;
|
||||||
use tokio::pin;
|
use tokio::pin;
|
||||||
|
|
||||||
|
@ -439,6 +441,22 @@ mod tests {
|
||||||
let semaphore_size = 2;
|
let semaphore_size = 2;
|
||||||
let test_storage = Arc::new(TestDatabaseStore::new_with_semaphore_size(semaphore_size));
|
let test_storage = Arc::new(TestDatabaseStore::new_with_semaphore_size(semaphore_size));
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
|
||||||
// add some data
|
// add some data
|
||||||
test_storage.db_or_create("my_db").await;
|
test_storage.db_or_create("my_db").await;
|
||||||
|
|
||||||
|
@ -452,19 +470,103 @@ mod tests {
|
||||||
.do_get(tonic::Request::new(ticket.clone()))
|
.do_get(tonic::Request::new(ticket.clone()))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let _streaming_resp2 = service
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
1,
|
||||||
|
);
|
||||||
|
|
||||||
|
let streaming_resp2 = service
|
||||||
.do_get(tonic::Request::new(ticket.clone()))
|
.do_get(tonic::Request::new(ticket.clone()))
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
|
||||||
// 3rd request is pending
|
// 3rd request is pending
|
||||||
let fut = service.do_get(tonic::Request::new(ticket.clone()));
|
let fut = service.do_get(tonic::Request::new(ticket.clone()));
|
||||||
pin!(fut);
|
pin!(fut);
|
||||||
assert_fut_pending(&mut fut).await;
|
assert_fut_pending(&mut fut).await;
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
1,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
|
||||||
// free permit
|
// free permit
|
||||||
drop(streaming_resp1);
|
drop(streaming_resp1);
|
||||||
let _streaming_resp3 = fut.await;
|
let streaming_resp3 = fut.await;
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
|
||||||
|
drop(streaming_resp2);
|
||||||
|
drop(streaming_resp3);
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
0,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Assert that given future is pending.
|
/// Assert that given future is pending.
|
||||||
|
@ -479,4 +581,14 @@ mod tests {
|
||||||
_ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {},
|
_ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn assert_semaphore_metric(registry: &metric::Registry, name: &'static str, expected: u64) {
|
||||||
|
let actual = registry
|
||||||
|
.get_instrument::<Metric<U64Gauge>>(name)
|
||||||
|
.expect("failed to read metric")
|
||||||
|
.get_observer(&Attributes::from(&[("semaphore", "query_execution")]))
|
||||||
|
.expect("failed to get observer")
|
||||||
|
.fetch();
|
||||||
|
assert_eq!(actual, expected);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ iox_query = { path = "../iox_query" }
|
||||||
query_functions = { path = "../query_functions"}
|
query_functions = { path = "../query_functions"}
|
||||||
schema = { path = "../schema" }
|
schema = { path = "../schema" }
|
||||||
service_common = { path = "../service_common" }
|
service_common = { path = "../service_common" }
|
||||||
|
tracker = { path = "../tracker" }
|
||||||
|
|
||||||
# Crates.io dependencies, in alphabetical order
|
# Crates.io dependencies, in alphabetical order
|
||||||
arrow = { version = "15.0.0", features = ["prettyprint"] }
|
arrow = { version = "15.0.0", features = ["prettyprint"] }
|
||||||
|
|
|
@ -39,9 +39,10 @@ use std::{
|
||||||
collections::{BTreeSet, HashMap},
|
collections::{BTreeSet, HashMap},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
use tokio::sync::{mpsc, OwnedSemaphorePermit};
|
use tokio::sync::mpsc;
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
use tonic::Status;
|
use tonic::Status;
|
||||||
|
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
|
||||||
|
|
||||||
#[derive(Debug, Snafu)]
|
#[derive(Debug, Snafu)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
@ -1441,11 +1442,11 @@ pub struct StreamWithPermit<S> {
|
||||||
#[pin]
|
#[pin]
|
||||||
stream: S,
|
stream: S,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
permit: OwnedSemaphorePermit,
|
permit: InstrumentedAsyncOwnedSemaphorePermit,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S> StreamWithPermit<S> {
|
impl<S> StreamWithPermit<S> {
|
||||||
fn new(stream: S, permit: OwnedSemaphorePermit) -> Self {
|
fn new(stream: S, permit: InstrumentedAsyncOwnedSemaphorePermit) -> Self {
|
||||||
Self { stream, permit }
|
Self { stream, permit }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1478,7 +1479,7 @@ mod tests {
|
||||||
Client as StorageClient, OrgAndBucket,
|
Client as StorageClient, OrgAndBucket,
|
||||||
};
|
};
|
||||||
use iox_query::test::TestChunk;
|
use iox_query::test::TestChunk;
|
||||||
use metric::{Attributes, Metric, U64Counter};
|
use metric::{Attributes, Metric, U64Counter, U64Gauge};
|
||||||
use panic_logging::SendPanicsToTracing;
|
use panic_logging::SendPanicsToTracing;
|
||||||
use predicate::{Predicate, PredicateMatch};
|
use predicate::{Predicate, PredicateMatch};
|
||||||
use service_common::test_util::TestDatabaseStore;
|
use service_common::test_util::TestDatabaseStore;
|
||||||
|
@ -3131,17 +3132,118 @@ mod tests {
|
||||||
let service = StorageService {
|
let service = StorageService {
|
||||||
db_store: Arc::clone(&test_storage),
|
db_store: Arc::clone(&test_storage),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
|
||||||
let streaming_resp1 = t.request(&service).await;
|
let streaming_resp1 = t.request(&service).await;
|
||||||
let _streaming_resp2 = t.request(&service).await;
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
1,
|
||||||
|
);
|
||||||
|
|
||||||
|
let streaming_resp2 = t.request(&service).await;
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
|
||||||
// 3rd request is pending
|
// 3rd request is pending
|
||||||
let fut = t.request(&service);
|
let fut = t.request(&service);
|
||||||
pin!(fut);
|
pin!(fut);
|
||||||
assert_fut_pending(&mut fut).await;
|
assert_fut_pending(&mut fut).await;
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
1,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
|
||||||
// free permit
|
// free permit
|
||||||
drop(streaming_resp1);
|
drop(streaming_resp1);
|
||||||
let _streaming_resp3 = fut.await;
|
let streaming_resp3 = fut.await;
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
|
||||||
|
drop(streaming_resp2);
|
||||||
|
drop(streaming_resp3);
|
||||||
|
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_total",
|
||||||
|
2,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_pending",
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
assert_semaphore_metric(
|
||||||
|
&test_storage.metric_registry,
|
||||||
|
"iox_async_semaphore_permits_acquired",
|
||||||
|
0,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3339,4 +3441,14 @@ mod tests {
|
||||||
_ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {},
|
_ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn assert_semaphore_metric(registry: &metric::Registry, name: &'static str, expected: u64) {
|
||||||
|
let actual = registry
|
||||||
|
.get_instrument::<Metric<U64Gauge>>(name)
|
||||||
|
.expect("failed to read metric")
|
||||||
|
.get_observer(&Attributes::from(&[("semaphore", "query_execution")]))
|
||||||
|
.expect("failed to get observer")
|
||||||
|
.fetch();
|
||||||
|
assert_eq!(actual, expected);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue