diff --git a/ingester2/src/persist/handle.rs b/ingester2/src/persist/handle.rs index 29580381e1..52a151da4c 100644 --- a/ingester2/src/persist/handle.rs +++ b/ingester2/src/persist/handle.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use iox_catalog::interface::Catalog; use iox_query::{exec::Executor, QueryChunkMeta}; -use metric::{DurationHistogram, U64Counter}; +use metric::{DurationHistogram, U64Counter, U64Gauge}; use observability_deps::tracing::*; use parking_lot::Mutex; use parquet_file::storage::ParquetStorage; @@ -207,6 +207,25 @@ impl PersistHandle { ) .recorder(&[]); + // Set the values of static metrics exporting the configured capacity + // of the persist system. + // + // This allows dashboards/alerts to calculate saturation. + metrics + .register_metric::( + "ingester_persist_max_parallelism", + "the maximum parallelism of persist tasks (number of workers)", + ) + .recorder(&[]) + .set(n_workers as _); + metrics + .register_metric::( + "ingester_persist_max_queue_depth", + "the maximum parallelism of persist tasks (number of workers)", + ) + .recorder(&[]) + .set(persist_queue_depth as _); + // Initialise the global queue. // // Persist tasks that do not require a sort key update are enqueued into @@ -459,7 +478,7 @@ mod tests { ingest_state::IngestStateError, persist::{ completion_observer::{mock::MockCompletionObserver, NopObserver}, - tests::assert_metric_counter, + tests::{assert_metric_counter, assert_metric_gauge}, }, test_util::make_write_op, }; @@ -931,4 +950,27 @@ mod tests { // And the counter shows two persist ops. assert_metric_counter(&metrics, "ingester_persist_enqueued_jobs", 2); } + + /// Export metrics showing the static config values. + #[tokio::test] + async fn test_static_config_metrics() { + let storage = ParquetStorage::new(Arc::new(InMemory::default()), StorageId::from("iox")); + let metrics = Arc::new(metric::Registry::default()); + let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics))); + let ingest_state = Arc::new(IngestState::default()); + + let _handle = PersistHandle::new( + 5, + 42, + Arc::clone(&ingest_state), + Arc::clone(&EXEC), + storage, + catalog, + NopObserver::default(), + &metrics, + ); + + assert_metric_gauge(&metrics, "ingester_persist_max_parallelism", 5); + assert_metric_gauge(&metrics, "ingester_persist_max_queue_depth", 42); + } } diff --git a/ingester2/src/persist/mod.rs b/ingester2/src/persist/mod.rs index c753d6514d..824e21fc30 100644 --- a/ingester2/src/persist/mod.rs +++ b/ingester2/src/persist/mod.rs @@ -23,7 +23,7 @@ mod tests { }; use iox_query::exec::Executor; use lazy_static::lazy_static; - use metric::{Attributes, DurationHistogram, Metric, U64Counter}; + use metric::{Attributes, DurationHistogram, Metric, U64Counter, U64Gauge}; use object_store::{memory::InMemory, ObjectMeta, ObjectStore}; use parking_lot::Mutex; use parquet_file::{ @@ -119,6 +119,18 @@ mod tests { buf.partitions().next().unwrap() } + #[track_caller] + pub(super) fn assert_metric_gauge(metrics: &metric::Registry, name: &'static str, value: u64) { + let v = metrics + .get_instrument::>(name) + .expect("failed to read metric") + .get_observer(&Attributes::from([])) + .expect("failed to get observer") + .fetch(); + + assert_eq!(v, value, "metric {name} had value {v} want {value}"); + } + #[track_caller] pub(super) fn assert_metric_counter( metrics: &metric::Registry,