diff --git a/ingester/src/handler.rs b/ingester/src/handler.rs index 5fd342ff3d..0ee1199ddf 100644 --- a/ingester/src/handler.rs +++ b/ingester/src/handler.rs @@ -21,7 +21,8 @@ use futures::{ }; use generated_types::ingester::IngesterQueryRequest; use iox_catalog::interface::Catalog; -use iox_time::SystemProvider; +use iox_time::{SystemProvider, TimeProvider}; +use metric::{Metric, U64Histogram, U64HistogramOptions}; use object_store::DynObjectStore; use observability_deps::tracing::*; use query::exec::Executor; @@ -89,7 +90,7 @@ fn shared_handle(handle: JoinHandle<()>) -> SharedJoinHandle { /// Implementation of the `IngestHandler` trait to ingest from kafka and manage /// persistence and answer queries #[derive(Debug)] -pub struct IngestHandlerImpl { +pub struct IngestHandlerImpl { /// Kafka Topic assigned to this ingester #[allow(dead_code)] kafka_topic: KafkaTopic, @@ -102,6 +103,12 @@ pub struct IngestHandlerImpl { /// The cache and buffered data for the ingester data: Arc, + + time_provider: T, + + /// Query execution duration distribution (milliseconds). + query_duration_success_ms: U64Histogram, + query_duration_error_ms: U64Histogram, } impl IngestHandlerImpl { @@ -225,11 +232,40 @@ impl IngestHandlerImpl { join_handles.push((worker_name, shared_handle(handle))); } + // Record query duration metrics, broken down by query execution result + let query_duration: Metric = metric_registry.register_metric_with_options( + "flight_query_duration_ms", + "flight request query execution duration in milliseconds", + || { + U64HistogramOptions::new([ + 5, + 10, + 20, + 40, + 80, + 160, + 320, + 640, + 1280, + 2560, + 5120, + 10240, + 20480, + u64::MAX, + ]) + }, + ); + let query_duration_success_ms = query_duration.recorder(&[("result", "success")]); + let query_duration_error_ms = query_duration.recorder(&[("result", "error")]); + Ok(Self { data, kafka_topic: topic, join_handles, shutdown, + query_duration_success_ms, + query_duration_error_ms, + time_provider: Default::default(), }) } } @@ -240,7 +276,21 @@ impl IngestHandler for IngestHandlerImpl { &self, request: IngesterQueryRequest, ) -> Result { - prepare_data_to_querier(&self.data, &request).await + // TODO(4567): move this into a instrumented query delegate + + let t = self.time_provider.now(); + let res = prepare_data_to_querier(&self.data, &request).await; + + if let Some(delta) = self.time_provider.now().checked_duration_since(t) { + match &res { + Ok(_) => self + .query_duration_success_ms + .record(delta.as_millis() as _), + Err(_) => self.query_duration_error_ms.record(delta.as_millis() as _), + }; + } + + res } async fn join(&self) { @@ -277,7 +327,7 @@ impl IngestHandler for IngestHandlerImpl { } } -impl Drop for IngestHandlerImpl { +impl Drop for IngestHandlerImpl { fn drop(&mut self) { if !self.shutdown.is_cancelled() { warn!("IngestHandlerImpl dropped without calling shutdown()");