diff --git a/server/src/application.rs b/server/src/application.rs index 9516624991..44cd004061 100644 --- a/server/src/application.rs +++ b/server/src/application.rs @@ -6,7 +6,7 @@ use observability_deps::tracing::info; use query::exec::Executor; use write_buffer::config::WriteBufferConfigFactory; -use crate::JobRegistry; +use crate::{job::JobRegistryMetrics, JobRegistry}; /// A container for application-global resources /// shared between server and all DatabaseInstances @@ -28,13 +28,20 @@ impl ApplicationState { let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get); info!(%num_threads, "using specified number of threads per thread pool"); + let metric_registry_v2 = Arc::new(metric::Registry::new()); + + let job_registry = Arc::new(JobRegistry::new()); + metric_registry_v2.register_instrument("job_registry_metrics", || { + JobRegistryMetrics::new(Arc::clone(&job_registry)) + }); + Self { object_store, write_buffer_factory: Arc::new(Default::default()), executor: Arc::new(Executor::new(num_threads)), - job_registry: Arc::new(JobRegistry::new()), + job_registry, metric_registry: Arc::new(metrics::MetricRegistry::new()), - metric_registry_v2: Arc::new(Default::default()), + metric_registry_v2, } } diff --git a/server/src/job.rs b/server/src/job.rs index b5ebb6a59b..747951c3ee 100644 --- a/server/src/job.rs +++ b/server/src/job.rs @@ -1,7 +1,12 @@ use data_types::job::Job; +use metric::{Attributes, MetricKind, Observation}; use parking_lot::Mutex; -use std::convert::Infallible; -use std::sync::Arc; +use std::{ + any::Any, + collections::{BTreeMap, BTreeSet}, + convert::Infallible, + sync::Arc, +}; use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt}; const JOB_HISTORY_SIZE: usize = 1000; @@ -73,3 +78,71 @@ impl JobRegistry { lock.tracked_len() } } + +#[derive(Debug, Clone)] +pub struct JobRegistryMetrics { + registry: Arc, + known: Arc>>, +} + +impl JobRegistryMetrics { + pub fn new(registry: Arc) -> Self { + Self { + registry, + known: Default::default(), + } + } +} + +impl metric::Instrument for JobRegistryMetrics { + fn report(&self, reporter: &mut dyn metric::Reporter) { + // get known attributes from last round + let mut accumulator: BTreeMap = self + .known + .lock() + .iter() + .cloned() + .map(|attr| (attr, 0)) + .collect(); + + // scan current jobs + for job in self.registry.tracked() { + let metadata = job.metadata(); + let status = job.get_status(); + + let attr = Attributes::from(&[ + ("description", metadata.description()), + ( + "status", + status + .result() + .map(|result| result.name()) + .unwrap_or_else(|| status.name()), + ), + ]); + + accumulator.entry(attr).and_modify(|x| *x += 1).or_insert(1); + } + + // remember known attributes + { + let mut known = self.known.lock(); + known.extend(accumulator.keys().cloned()); + } + + // report metrics + reporter.start_metric( + "influxdb_iox_job_count", + "Number of known jobs", + MetricKind::U64Gauge, + ); + for (attr, count) in accumulator { + reporter.report_observation(&attr, Observation::U64Gauge(count)); + } + reporter.finish_metric(); + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/server/src/lib.rs b/server/src/lib.rs index f19fb46f29..ea9291c659 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -2427,4 +2427,44 @@ mod tests { let provided_rules: ProvidedDatabaseRules = rules.try_into().unwrap(); provided_rules } + + #[tokio::test] + async fn job_metrics() { + let application = make_application(); + let server = make_server(Arc::clone(&application)); + + let wait_nanos = 1000; + let job = application + .job_registry() + .spawn_dummy_job(vec![wait_nanos], None); + + job.join().await; + + let mut reporter = metric::RawReporter::default(); + application.metric_registry_v2().report(&mut reporter); + + let observations: Vec<_> = reporter + .observations() + .iter() + .filter(|observation| observation.metric_name == "influxdb_iox_job_count") + .collect(); + assert_eq!(observations.len(), 1); + + let gauge = observations[0]; + assert_eq!(gauge.kind, metric::MetricKind::U64Gauge); + assert_eq!(gauge.observations.len(), 1); + + let (attributes, observation) = &gauge.observations[0]; + assert_eq!( + attributes, + &metric::Attributes::from(&[ + ("description", "Dummy Job, for testing"), + ("status", "Success"), + ]) + ); + assert_eq!(observation, &metric::Observation::U64Gauge(1)); + + server.shutdown(); + server.join().await.unwrap(); + } }