feat: export persist config metrics
Export the configured maximum persist parallelism, and the maximum queue depth, so they can be used to compute % saturation in alerts / dashboards.pull/24376/head
parent
c928eddaab
commit
7b69c84ceb
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
|||
use async_trait::async_trait;
|
||||
use iox_catalog::interface::Catalog;
|
||||
use iox_query::{exec::Executor, QueryChunkMeta};
|
||||
use metric::{DurationHistogram, U64Counter};
|
||||
use metric::{DurationHistogram, U64Counter, U64Gauge};
|
||||
use observability_deps::tracing::*;
|
||||
use parking_lot::Mutex;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
|
@ -207,6 +207,25 @@ impl PersistHandle {
|
|||
)
|
||||
.recorder(&[]);
|
||||
|
||||
// Set the values of static metrics exporting the configured capacity
|
||||
// of the persist system.
|
||||
//
|
||||
// This allows dashboards/alerts to calculate saturation.
|
||||
metrics
|
||||
.register_metric::<U64Gauge>(
|
||||
"ingester_persist_max_parallelism",
|
||||
"the maximum parallelism of persist tasks (number of workers)",
|
||||
)
|
||||
.recorder(&[])
|
||||
.set(n_workers as _);
|
||||
metrics
|
||||
.register_metric::<U64Gauge>(
|
||||
"ingester_persist_max_queue_depth",
|
||||
"the maximum parallelism of persist tasks (number of workers)",
|
||||
)
|
||||
.recorder(&[])
|
||||
.set(persist_queue_depth as _);
|
||||
|
||||
// Initialise the global queue.
|
||||
//
|
||||
// Persist tasks that do not require a sort key update are enqueued into
|
||||
|
@ -459,7 +478,7 @@ mod tests {
|
|||
ingest_state::IngestStateError,
|
||||
persist::{
|
||||
completion_observer::{mock::MockCompletionObserver, NopObserver},
|
||||
tests::assert_metric_counter,
|
||||
tests::{assert_metric_counter, assert_metric_gauge},
|
||||
},
|
||||
test_util::make_write_op,
|
||||
};
|
||||
|
@ -931,4 +950,27 @@ mod tests {
|
|||
// And the counter shows two persist ops.
|
||||
assert_metric_counter(&metrics, "ingester_persist_enqueued_jobs", 2);
|
||||
}
|
||||
|
||||
/// Export metrics showing the static config values.
|
||||
#[tokio::test]
|
||||
async fn test_static_config_metrics() {
|
||||
let storage = ParquetStorage::new(Arc::new(InMemory::default()), StorageId::from("iox"));
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
|
||||
let ingest_state = Arc::new(IngestState::default());
|
||||
|
||||
let _handle = PersistHandle::new(
|
||||
5,
|
||||
42,
|
||||
Arc::clone(&ingest_state),
|
||||
Arc::clone(&EXEC),
|
||||
storage,
|
||||
catalog,
|
||||
NopObserver::default(),
|
||||
&metrics,
|
||||
);
|
||||
|
||||
assert_metric_gauge(&metrics, "ingester_persist_max_parallelism", 5);
|
||||
assert_metric_gauge(&metrics, "ingester_persist_max_queue_depth", 42);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ mod tests {
|
|||
};
|
||||
use iox_query::exec::Executor;
|
||||
use lazy_static::lazy_static;
|
||||
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
|
||||
use metric::{Attributes, DurationHistogram, Metric, U64Counter, U64Gauge};
|
||||
use object_store::{memory::InMemory, ObjectMeta, ObjectStore};
|
||||
use parking_lot::Mutex;
|
||||
use parquet_file::{
|
||||
|
@ -119,6 +119,18 @@ mod tests {
|
|||
buf.partitions().next().unwrap()
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub(super) fn assert_metric_gauge(metrics: &metric::Registry, name: &'static str, value: u64) {
|
||||
let v = metrics
|
||||
.get_instrument::<Metric<U64Gauge>>(name)
|
||||
.expect("failed to read metric")
|
||||
.get_observer(&Attributes::from([]))
|
||||
.expect("failed to get observer")
|
||||
.fetch();
|
||||
|
||||
assert_eq!(v, value, "metric {name} had value {v} want {value}");
|
||||
}
|
||||
|
||||
#[track_caller]
|
||||
pub(super) fn assert_metric_counter(
|
||||
metrics: &metric::Registry,
|
||||
|
|
Loading…
Reference in New Issue