diff --git a/iox_catalog/src/interface.rs b/iox_catalog/src/interface.rs index a54c50cb6f..31850a9997 100644 --- a/iox_catalog/src/interface.rs +++ b/iox_catalog/src/interface.rs @@ -3090,7 +3090,7 @@ pub(crate) mod test_helpers { .expect("failed to get observer") .fetch(); - let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + let hit_count = histogram.sample_count(); assert!(hit_count > 1, "metric did not record any calls"); } } diff --git a/iox_catalog/src/postgres.rs b/iox_catalog/src/postgres.rs index cdc2edcbab..743a36f970 100644 --- a/iox_catalog/src/postgres.rs +++ b/iox_catalog/src/postgres.rs @@ -2011,7 +2011,7 @@ mod tests { .expect("failed to get observer") .fetch(); - let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + let hit_count = histogram.sample_count(); assert!(hit_count > 0, "metric did not record any calls"); } diff --git a/object_store_metrics/src/lib.rs b/object_store_metrics/src/lib.rs index 7b7d9d6843..d673bc77ae 100644 --- a/object_store_metrics/src/lib.rs +++ b/object_store_metrics/src/lib.rs @@ -516,7 +516,7 @@ mod tests { .expect("failed to get observer") .fetch(); - let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + let hit_count = histogram.sample_count(); assert!(hit_count > 0, "metric {} did not record any calls", name); } @@ -814,21 +814,18 @@ mod tests { // Now the stream is complete, the wall clock duration must have been // recorded. - let hit_count = success_hist - .fetch() - .buckets - .iter() - .fold(0, |acc, v| acc + v.count); + let hit_count = success_hist.fetch().sample_count(); assert_eq!(hit_count, 1, "wall clock duration recorded incorrectly"); assert_counter_value(&metrics, "object_store_transfer_bytes", [], 4); // And it must be in a SLEEP or higher bucket. - let hit_count = success_hist + let hit_count: u64 = success_hist .fetch() .buckets .iter() .skip_while(|b| b.le < SLEEP) // Skip buckets less than the sleep duration - .fold(0, |acc, v| acc + v.count); + .map(|v| v.count) + .sum(); assert_eq!( hit_count, 1, "wall clock duration not recorded in correct bucket" @@ -836,11 +833,7 @@ mod tests { // Metrics must not be duplicated when the decorator is dropped drop(stream); - let hit_count = success_hist - .fetch() - .buckets - .iter() - .fold(0, |acc, v| acc + v.count); + let hit_count = success_hist.fetch().sample_count(); assert_eq!(hit_count, 1, "wall clock duration duplicated"); assert_counter_value(&metrics, "object_store_transfer_bytes", [], 4); } @@ -901,9 +894,7 @@ mod tests { .get_observer(&metric::Attributes::from(&[("result", "success")])) .expect("failed to get observer") .fetch() - .buckets - .iter() - .fold(0, |acc, v| acc + v.count); + .sample_count(); assert_eq!(hit_count, 1, "wall clock duration recorded incorrectly"); // And the number of bytes read must match the pre-drop value. @@ -966,9 +957,7 @@ mod tests { .get_observer(&metric::Attributes::from(&[("result", "error")])) .expect("failed to get observer") .fetch() - .buckets - .iter() - .fold(0, |acc, v| acc + v.count); + .sample_count(); assert_eq!(hit_count, 1, "wall clock duration recorded incorrectly"); // And the number of bytes read must match @@ -1040,9 +1029,7 @@ mod tests { .get_observer(&metric::Attributes::from(&[("result", "success")])) .expect("failed to get observer") .fetch() - .buckets - .iter() - .fold(0, |acc, v| acc + v.count); + .sample_count(); assert_eq!(hit_count, 1, "wall clock duration recorded incorrectly"); // And the number of bytes read must match @@ -1087,9 +1074,7 @@ mod tests { .get_observer(&metric::Attributes::from(&[("result", "success")])) .expect("failed to get observer") .fetch() - .buckets - .iter() - .fold(0, |acc, v| acc + v.count); + .sample_count(); assert_eq!(hit_count, 1, "wall clock duration recorded incorrectly"); // And the number of bytes read must match @@ -1133,9 +1118,7 @@ mod tests { .get_observer(&metric::Attributes::from(&[("result", "success")])) .expect("failed to get observer") .fetch() - .buckets - .iter() - .fold(0, |acc, v| acc + v.count); + .sample_count(); assert_eq!(hit_count, 1, "wall clock duration recorded incorrectly"); // And the number of bytes read must match diff --git a/querier/src/cache/test_util.rs b/querier/src/cache/test_util.rs index ae4941e432..4ed6c98259 100644 --- a/querier/src/cache/test_util.rs +++ b/querier/src/cache/test_util.rs @@ -8,6 +8,6 @@ pub fn assert_histogram_metric_count(metrics: &metric::Registry, name: &'static .expect("failed to get observer") .fetch(); - let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + let hit_count = histogram.sample_count(); assert_eq!(hit_count, n); } diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index 5b6262c744..b2dc11af2d 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -287,6 +287,11 @@ impl ChunkAdapter { } } + /// Metric registry getter. + pub fn metric_registry(&self) -> Arc { + Arc::clone(&self.metric_registry) + } + /// Get underlying catalog cache. pub fn catalog_cache(&self) -> &Arc { &self.catalog_cache diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 7a87077c1c..88317d5218 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -8,7 +8,8 @@ use crate::{ use data_types::{PartitionId, TableId}; use futures::join; use iox_query::{provider::ChunkPruner, QueryChunk}; -use observability_deps::tracing::debug; +use metric::{DurationHistogram, Metric}; +use observability_deps::tracing::{debug, info}; use predicate::Predicate; use schema::Schema; use snafu::{ResultExt, Snafu}; @@ -72,6 +73,12 @@ pub struct QuerierTable { /// Handle reconciling ingester and catalog data reconciler: Reconciler, + + /// Time spent waiting for successful ingester queries + ingester_duration_success: DurationHistogram, + + /// Time spent waiting for unsuccessful ingester queries + ingester_duration_error: DurationHistogram, } impl QuerierTable { @@ -90,6 +97,15 @@ impl QuerierTable { Arc::clone(&chunk_adapter), ); + let metric_registry = chunk_adapter.metric_registry(); + + let ingester_duration: Metric = metric_registry.register_metric( + "ingester_duration", + "ingester request query execution duration", + ); + let ingester_duration_success = ingester_duration.recorder(&[("result", "success")]); + let ingester_duration_error = ingester_duration.recorder(&[("result", "error")]); + Self { namespace_name, table_name, @@ -98,6 +114,8 @@ impl QuerierTable { ingester_connection, chunk_adapter, reconciler, + ingester_duration_success, + ingester_duration_error, } } @@ -121,7 +139,12 @@ impl QuerierTable { /// /// This currently contains all parquet files linked to their unprocessed tombstones. pub async fn chunks(&self, predicate: &Predicate) -> Result>> { - debug!(?predicate, namespace=%self.namespace_name, table_name=%self.table_name(), "Fetching all chunks"); + debug!( + ?predicate, + namespace=%self.namespace_name, + table_name=%self.table_name(), + "Fetching all chunks" + ); let catalog_cache = self.chunk_adapter.catalog_cache(); @@ -178,8 +201,10 @@ impl QuerierTable { .map(|(_, f)| f.name().to_string()) .collect(); + let ingester_time_start = self.chunk_adapter.catalog_cache().time_provider().now(); + // get any chunks from the ingster - let partitions = self + let partitions_result = self .ingester_connection .partitions( Arc::clone(&self.namespace_name), @@ -189,7 +214,28 @@ impl QuerierTable { Arc::clone(&self.schema), ) .await - .context(GettingIngesterPartitionsSnafu)?; + .context(GettingIngesterPartitionsSnafu); + + if let Some(ingester_duration) = self + .chunk_adapter + .catalog_cache() + .time_provider() + .now() + .checked_duration_since(ingester_time_start) + { + match &partitions_result { + Ok(_) => self.ingester_duration_success.record(ingester_duration), + Err(_) => self.ingester_duration_error.record(ingester_duration), + } + info!( + ?predicate, + namespace=%self.namespace_name, + table_name=%self.table_name, + ?ingester_duration, + "Time spent in ingester" + ); + } + let partitions = partitions_result?; // check that partitions from ingesters don't overlap let mut seen = HashMap::with_capacity(partitions.len()); @@ -269,20 +315,19 @@ impl QuerierTable { #[cfg(test)] mod tests { - use std::sync::Arc; - - use assert_matches::assert_matches; - use data_types::{ChunkId, ColumnType, SequenceNumber}; - use iox_tests::util::{now, TestCatalog, TestTable}; - use predicate::Predicate; - use schema::{builder::SchemaBuilder, InfluxFieldType}; - use test_helpers::maybe_start_logging; - use super::*; use crate::{ ingester::{test_util::MockIngesterConnection, IngesterPartition}, table::test_util::{querier_table, IngesterPartitionBuilder}, }; + use assert_matches::assert_matches; + use data_types::{ChunkId, ColumnType, SequenceNumber}; + use iox_tests::util::{now, TestCatalog, TestTable}; + use metric::Attributes; + use predicate::Predicate; + use schema::{builder::SchemaBuilder, InfluxFieldType}; + use std::sync::Arc; + use test_helpers::maybe_start_logging; #[tokio::test] async fn test_parquet_chunks() { @@ -449,11 +494,39 @@ mod tests { assert_matches!(err, Error::StateFusion { .. }); } + #[tokio::test] + async fn test_ingester_metrics() { + maybe_start_logging(); + + let catalog = TestCatalog::new(); + let metrics = catalog.metric_registry(); + + let ns = catalog.create_namespace("ns").await; + let table = ns.create_table("table").await; + + let querier_table = TestQuerierTable::new(&catalog, &table).await; + + querier_table + .inner() + .ingester_partitions(&Predicate::default()) + .await + .unwrap(); + + let histogram = metrics + .get_instrument::>("ingester_duration") + .expect("failed to read metric") + .get_observer(&Attributes::from(&[("result", "success")])) + .expect("failed to get observer") + .fetch(); + + let hit_count = histogram.sample_count(); + assert!(hit_count > 0, "metric did not record any calls"); + } + #[tokio::test] async fn test_state_reconcile() { maybe_start_logging(); let catalog = TestCatalog::new(); - let ns = catalog.create_namespace("ns").await; let table = ns.create_table("table").await; let sequencer = ns.create_sequencer(1).await; diff --git a/router/src/dml_handlers/instrumentation.rs b/router/src/dml_handlers/instrumentation.rs index 6ca6cce047..a8ddfb57f5 100644 --- a/router/src/dml_handlers/instrumentation.rs +++ b/router/src/dml_handlers/instrumentation.rs @@ -159,7 +159,7 @@ mod tests { .expect("failed to get observer") .fetch(); - let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + let hit_count = histogram.sample_count(); assert!(hit_count > 0, "metric did not record any calls"); } diff --git a/router/src/namespace_cache/metrics.rs b/router/src/namespace_cache/metrics.rs index 1aa63ba942..7775d311d9 100644 --- a/router/src/namespace_cache/metrics.rs +++ b/router/src/namespace_cache/metrics.rs @@ -211,7 +211,7 @@ mod tests { .expect("failed to get observer") .fetch(); - let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + let hit_count = histogram.sample_count(); assert_eq!( hit_count, count, "metric did not record correct number of calls" diff --git a/router/tests/http.rs b/router/tests/http.rs index 0c21ec5be2..116c21c850 100644 --- a/router/tests/http.rs +++ b/router/tests/http.rs @@ -209,7 +209,7 @@ async fn test_write_ok() { ])) .expect("failed to get observer") .fetch(); - let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + let hit_count = histogram.sample_count(); assert_eq!(hit_count, 1); assert_eq!( @@ -232,7 +232,7 @@ async fn test_write_ok() { ])) .expect("failed to get observer") .fetch(); - let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + let hit_count = histogram.sample_count(); assert_eq!(hit_count, 1); }