Merge pull request #4569 from influxdata/dom/query-metrics

feat(ingester): emit query latency metrics
pull/24376/head
kodiakhq[bot] 2022-05-11 15:58:29 +00:00 committed by GitHub
commit 2a6b383dd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 54 additions and 4 deletions

View File

@ -21,7 +21,8 @@ use futures::{
}; };
use generated_types::ingester::IngesterQueryRequest; use generated_types::ingester::IngesterQueryRequest;
use iox_catalog::interface::Catalog; use iox_catalog::interface::Catalog;
use iox_time::SystemProvider; use iox_time::{SystemProvider, TimeProvider};
use metric::{Metric, U64Histogram, U64HistogramOptions};
use object_store::DynObjectStore; use object_store::DynObjectStore;
use observability_deps::tracing::*; use observability_deps::tracing::*;
use query::exec::Executor; use query::exec::Executor;
@ -89,7 +90,7 @@ fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle {
/// Implementation of the `IngestHandler` trait to ingest from kafka and manage /// Implementation of the `IngestHandler` trait to ingest from kafka and manage
/// persistence and answer queries /// persistence and answer queries
#[derive(Debug)] #[derive(Debug)]
pub struct IngestHandlerImpl { pub struct IngestHandlerImpl<T = SystemProvider> {
/// Kafka Topic assigned to this ingester /// Kafka Topic assigned to this ingester
#[allow(dead_code)] #[allow(dead_code)]
kafka_topic: KafkaTopic, kafka_topic: KafkaTopic,
@ -102,6 +103,12 @@ pub struct IngestHandlerImpl {
/// The cache and buffered data for the ingester /// The cache and buffered data for the ingester
data: Arc<IngesterData>, data: Arc<IngesterData>,
time_provider: T,
/// Query execution duration distribution (milliseconds).
query_duration_success_ms: U64Histogram,
query_duration_error_ms: U64Histogram,
} }
impl IngestHandlerImpl { impl IngestHandlerImpl {
@ -225,11 +232,40 @@ impl IngestHandlerImpl {
join_handles.push((worker_name, shared_handle(handle))); join_handles.push((worker_name, shared_handle(handle)));
} }
// Record query duration metrics, broken down by query execution result
let query_duration: Metric<U64Histogram> = metric_registry.register_metric_with_options(
"flight_query_duration_ms",
"flight request query execution duration in milliseconds",
|| {
U64HistogramOptions::new([
5,
10,
20,
40,
80,
160,
320,
640,
1280,
2560,
5120,
10240,
20480,
u64::MAX,
])
},
);
let query_duration_success_ms = query_duration.recorder(&[("result", "success")]);
let query_duration_error_ms = query_duration.recorder(&[("result", "error")]);
Ok(Self { Ok(Self {
data, data,
kafka_topic: topic, kafka_topic: topic,
join_handles, join_handles,
shutdown, shutdown,
query_duration_success_ms,
query_duration_error_ms,
time_provider: Default::default(),
}) })
} }
} }
@ -240,7 +276,21 @@ impl IngestHandler for IngestHandlerImpl {
&self, &self,
request: IngesterQueryRequest, request: IngesterQueryRequest,
) -> Result<IngesterQueryResponse, crate::querier_handler::Error> { ) -> Result<IngesterQueryResponse, crate::querier_handler::Error> {
prepare_data_to_querier(&self.data, &request).await // TODO(4567): move this into a instrumented query delegate
let t = self.time_provider.now();
let res = prepare_data_to_querier(&self.data, &request).await;
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Ok(_) => self
.query_duration_success_ms
.record(delta.as_millis() as _),
Err(_) => self.query_duration_error_ms.record(delta.as_millis() as _),
};
}
res
} }
async fn join(&self) { async fn join(&self) {
@ -277,7 +327,7 @@ impl IngestHandler for IngestHandlerImpl {
} }
} }
impl Drop for IngestHandlerImpl { impl<T> Drop for IngestHandlerImpl<T> {
fn drop(&mut self) { fn drop(&mut self) {
if !self.shutdown.is_cancelled() { if !self.shutdown.is_cancelled() {
warn!("IngestHandlerImpl dropped without calling shutdown()"); warn!("IngestHandlerImpl dropped without calling shutdown()");