From 3541243fcb692e17c70fe8823756f48bd6112916 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 21 Dec 2022 14:13:21 +0100 Subject: [PATCH] feat(metrics): persist duration histograms Adds metrics to track the distribution duration spent actively persisting a batch of partition data (compacting, generating parquet, uploading, DB entries, etc) and another tracking the duration of time an entry spent in the persist queue. Together these provide a measurement of the latency of persist requests, and as they contain event counters, they also provide the throughput and number of outstanding jobs. --- ingester2/src/persist/context.rs | 4 ++++ ingester2/src/persist/handle.rs | 39 +++++++++++++++++------------- ingester2/src/persist/mod.rs | 41 +++++++++++++++++++++++++++++--- ingester2/src/persist/worker.rs | 15 +++++++++--- 4 files changed, 77 insertions(+), 22 deletions(-) diff --git a/ingester2/src/persist/context.rs b/ingester2/src/persist/context.rs index 9145e8ac15..4ef0515f59 100644 --- a/ingester2/src/persist/context.rs +++ b/ingester2/src/persist/context.rs @@ -271,6 +271,10 @@ impl Context { let _ = self.complete.send(()); } + pub(super) fn enqueued_at(&self) -> Instant { + self.enqueued_at + } + pub(super) fn sort_key(&self) -> &SortKeyState { &self.sort_key } diff --git a/ingester2/src/persist/handle.rs b/ingester2/src/persist/handle.rs index 709957dcb2..41e0cfbfeb 100644 --- a/ingester2/src/persist/handle.rs +++ b/ingester2/src/persist/handle.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use iox_catalog::interface::Catalog; use iox_query::{exec::Executor, QueryChunkMeta}; -use metric::U64Counter; +use metric::{DurationHistogram, U64Counter}; use observability_deps::tracing::*; use parking_lot::Mutex; use parquet_file::storage::ParquetStorage; @@ -192,6 +192,21 @@ impl PersistHandle { completion_observer, }); + // Initialise a histogram to capture persist job duration & time spent + // in the queue. + let persist_duration = metrics + .register_metric::( + "ingester_persist_active_duration", + "the distribution of persist job processing duration in nanoseconds", + ) + .recorder(&[]); + let queue_duration = metrics + .register_metric::( + "ingester_persist_enqueue_duration", + "the distribution of duration a persist job spent enqueued, waiting to be processed in nanoseconds", + ) + .recorder(&[]); + // Initialise the global queue. // // Persist tasks that do not require a sort key update are enqueued into @@ -211,6 +226,8 @@ impl PersistHandle { worker_state, global_rx.clone(), rx, + queue_duration.clone(), + persist_duration.clone(), ))), ) }) @@ -422,7 +439,6 @@ mod tests { use futures::Future; use iox_catalog::mem::MemCatalog; use lazy_static::lazy_static; - use metric::{Attributes, Metric}; use object_store::memory::InMemory; use parquet_file::storage::StorageId; use schema::sort::SortKey; @@ -441,7 +457,10 @@ mod tests { deferred_load::DeferredLoad, dml_sink::DmlSink, ingest_state::IngestStateError, - persist::completion_observer::{mock::MockCompletionObserver, NopObserver}, + persist::{ + completion_observer::{mock::MockCompletionObserver, NopObserver}, + tests::assert_metric_counter, + }, test_util::make_write_op, }; @@ -465,18 +484,6 @@ mod tests { })); } - #[track_caller] - fn assert_metric(metrics: &metric::Registry, name: &'static str, value: u64) { - let v = metrics - .get_instrument::>(name) - .expect("failed to read metric") - .get_observer(&Attributes::from([])) - .expect("failed to get observer") - .fetch(); - - assert_eq!(v, value, "metric {name} had value {v} want {value}"); - } - /// Construct a partition with the above constants, with the given sort key, /// and containing a single write. async fn new_partition( @@ -922,6 +929,6 @@ mod tests { assert_matches!(ingest_state.read(), Err(IngestStateError::PersistSaturated)); // And the counter shows two persist ops. - assert_metric(&metrics, "ingester_persist_enqueued_jobs", 2); + assert_metric_counter(&metrics, "ingester_persist_enqueued_jobs", 2); } } diff --git a/ingester2/src/persist/mod.rs b/ingester2/src/persist/mod.rs index 8057aa5d56..c753d6514d 100644 --- a/ingester2/src/persist/mod.rs +++ b/ingester2/src/persist/mod.rs @@ -23,7 +23,7 @@ mod tests { }; use iox_query::exec::Executor; use lazy_static::lazy_static; - use metric::{Attributes, Metric, U64Counter}; + use metric::{Attributes, DurationHistogram, Metric, U64Counter}; use object_store::{memory::InMemory, ObjectMeta, ObjectStore}; use parking_lot::Mutex; use parquet_file::{ @@ -120,7 +120,11 @@ mod tests { } #[track_caller] - fn assert_metric(metrics: &metric::Registry, name: &'static str, value: u64) { + pub(super) fn assert_metric_counter( + metrics: &metric::Registry, + name: &'static str, + value: u64, + ) { let v = metrics .get_instrument::>(name) .expect("failed to read metric") @@ -131,6 +135,23 @@ mod tests { assert_eq!(v, value, "metric {name} had value {v} want {value}"); } + #[track_caller] + pub(super) fn assert_metric_histogram( + metrics: &metric::Registry, + name: &'static str, + hits: u64, + ) { + let v = metrics + .get_instrument::>(name) + .expect("failed to read metric") + .get_observer(&Attributes::from([])) + .expect("failed to get observer") + .fetch() + .sample_count(); + + assert_eq!(v, hits, "metric {name} had {v} samples want {hits}"); + } + /// A complete integration test of the persistence system components. #[tokio::test] async fn test_persist_integration() { @@ -169,11 +190,15 @@ mod tests { .mark_persisting() .expect("partition with write should transition to persisting"); + // Assert the starting metric values. + assert_metric_histogram(&metrics, "ingester_persist_active_duration", 0); + assert_metric_histogram(&metrics, "ingester_persist_enqueue_duration", 0); + // Enqueue the persist job let notify = handle.enqueue(Arc::clone(&partition), data).await; assert!(ingest_state.read().is_ok()); - assert_metric(&metrics, "ingester_persist_enqueued_jobs", 1); + assert_metric_counter(&metrics, "ingester_persist_enqueued_jobs", 1); // Wait for the persist to complete. notify @@ -190,6 +215,10 @@ mod tests { assert_eq!(n.sequence_numbers().len(), 1); }); + // And that metrics recorded the enqueue & completion + assert_metric_histogram(&metrics, "ingester_persist_active_duration", 1); + assert_metric_histogram(&metrics, "ingester_persist_enqueue_duration", 1); + // Assert the partition persistence count increased, an indication that // mark_persisted() was called. assert_eq!(partition.lock().completed_persistence_count(), 1); @@ -330,6 +359,12 @@ mod tests { assert_eq!(n.sequence_numbers().len(), 1); }); + // And that despite the persist job effectively running twice (to handle + // the sort key conflict) the metrics should record 1 persist job start + // & completion + assert_metric_histogram(&metrics, "ingester_persist_active_duration", 1); + assert_metric_histogram(&metrics, "ingester_persist_enqueue_duration", 1); + // Assert the partition persistence count increased, an indication that // mark_persisted() was called. assert_eq!(partition.lock().completed_persistence_count(), 1); diff --git a/ingester2/src/persist/worker.rs b/ingester2/src/persist/worker.rs index 5f9303a1cc..c32d2dcc8d 100644 --- a/ingester2/src/persist/worker.rs +++ b/ingester2/src/persist/worker.rs @@ -5,13 +5,12 @@ use backoff::Backoff; use data_types::{CompactionLevel, ParquetFileParams, SequenceNumber}; use iox_catalog::interface::{get_table_schema_by_id, CasFailure, Catalog}; use iox_query::exec::Executor; - use iox_time::{SystemProvider, TimeProvider}; +use metric::DurationHistogram; use observability_deps::tracing::{debug, info, warn}; use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage}; - use schema::sort::SortKey; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, time::Instant}; use uuid::Uuid; use crate::persist::compact::compact_persisting_batch; @@ -76,6 +75,8 @@ pub(super) async fn run_task( worker_state: Arc>, global_queue: async_channel::Receiver, mut rx: mpsc::UnboundedReceiver, + queue_duration: DurationHistogram, + persist_duration: DurationHistogram, ) where O: PersistCompletionObserver, { @@ -114,6 +115,10 @@ pub(super) async fn run_task( let mut ctx = Context::new(req); + // Capture the time spent in the queue. + let started_at = Instant::now(); + queue_duration.record(started_at.duration_since(ctx.enqueued_at())); + // Compact the data, generate the parquet file from the result, and // upload it to object storage. // @@ -136,6 +141,10 @@ pub(super) async fn run_task( // observers. ctx.mark_complete(object_store_id, &worker_state.completion_observer) .await; + + // Capture the time spent actively persisting. + let now = Instant::now(); + persist_duration.record(now.duration_since(started_at)); } }