feat: expose job metrics

Closes #2416.
pull/24376/head
Marco Neumann 2021-08-31 15:14:06 +02:00
parent 04a301cc64
commit e6f12f965c
3 changed files with 125 additions and 5 deletions


@@ -6,7 +6,7 @@ use observability_deps::tracing::info;
 use query::exec::Executor;
 use write_buffer::config::WriteBufferConfigFactory;
 
-use crate::JobRegistry;
+use crate::{job::JobRegistryMetrics, JobRegistry};
 
 /// A container for application-global resources
 /// shared between server and all DatabaseInstances
@@ -28,13 +28,20 @@ impl ApplicationState {
         let num_threads = num_worker_threads.unwrap_or_else(num_cpus::get);
         info!(%num_threads, "using specified number of threads per thread pool");
 
+        let metric_registry_v2 = Arc::new(metric::Registry::new());
+        let job_registry = Arc::new(JobRegistry::new());
+        metric_registry_v2.register_instrument("job_registry_metrics", || {
+            JobRegistryMetrics::new(Arc::clone(&job_registry))
+        });
+
         Self {
             object_store,
             write_buffer_factory: Arc::new(Default::default()),
             executor: Arc::new(Executor::new(num_threads)),
-            job_registry: Arc::new(JobRegistry::new()),
+            job_registry,
             metric_registry: Arc::new(metrics::MetricRegistry::new()),
-            metric_registry_v2: Arc::new(Default::default()),
+            metric_registry_v2,
         }
     }
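Note on the wiring above: the instrument registered as "job_registry_metrics" holds only an Arc<JobRegistry>, so job counts are derived from the live registry each time the v2 metric registry is reported (as the new test at the end of this commit does), rather than being incremented when jobs are spawned. Below is a minimal pull-style gauge sketch of that design choice; the types are toy stand-ins, not the real metric crate API.

use std::sync::Arc;

// Toy stand-ins, for illustration only.
struct JobRegistry {
    jobs: Vec<&'static str>, // descriptions of currently tracked jobs
}

trait Instrument {
    fn report(&self, out: &mut Vec<(String, u64)>);
}

// Pull-based gauge: nothing is updated when jobs start or finish;
// the instrument re-derives counts from shared state on every report.
struct JobCountGauge {
    registry: Arc<JobRegistry>,
}

impl Instrument for JobCountGauge {
    fn report(&self, out: &mut Vec<(String, u64)>) {
        out.push(("job_count".to_string(), self.registry.jobs.len() as u64));
    }
}

fn main() {
    let registry = Arc::new(JobRegistry {
        jobs: vec!["compact chunk", "write chunk"],
    });
    let gauge = JobCountGauge {
        registry: Arc::clone(&registry),
    };

    let mut out = Vec::new();
    gauge.report(&mut out);
    assert_eq!(out, vec![("job_count".to_string(), 2)]);
}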


@@ -1,7 +1,12 @@
 use data_types::job::Job;
+use metric::{Attributes, MetricKind, Observation};
 use parking_lot::Mutex;
-use std::convert::Infallible;
-use std::sync::Arc;
+use std::{
+    any::Any,
+    collections::{BTreeMap, BTreeSet},
+    convert::Infallible,
+    sync::Arc,
+};
 use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, TrackedFutureExt};
 
 const JOB_HISTORY_SIZE: usize = 1000;
@@ -73,3 +78,71 @@ impl JobRegistry {
         lock.tracked_len()
     }
 }
+
+#[derive(Debug, Clone)]
+pub struct JobRegistryMetrics {
+    registry: Arc<JobRegistry>,
+    known: Arc<Mutex<BTreeSet<Attributes>>>,
+}
+
+impl JobRegistryMetrics {
+    pub fn new(registry: Arc<JobRegistry>) -> Self {
+        Self {
+            registry,
+            known: Default::default(),
+        }
+    }
+}
+
+impl metric::Instrument for JobRegistryMetrics {
+    fn report(&self, reporter: &mut dyn metric::Reporter) {
+        // get known attributes from last round
+        let mut accumulator: BTreeMap<Attributes, u64> = self
+            .known
+            .lock()
+            .iter()
+            .cloned()
+            .map(|attr| (attr, 0))
+            .collect();
+
+        // scan current jobs
+        for job in self.registry.tracked() {
+            let metadata = job.metadata();
+            let status = job.get_status();
+
+            let attr = Attributes::from(&[
+                ("description", metadata.description()),
+                (
+                    "status",
+                    status
+                        .result()
+                        .map(|result| result.name())
+                        .unwrap_or_else(|| status.name()),
+                ),
+            ]);
+            accumulator.entry(attr).and_modify(|x| *x += 1).or_insert(1);
+        }
+
+        // remember known attributes
+        {
+            let mut known = self.known.lock();
+            known.extend(accumulator.keys().cloned());
+        }
+
+        // report metrics
+        reporter.start_metric(
+            "influxdb_iox_job_count",
+            "Number of known jobs",
+            MetricKind::U64Gauge,
+        );
+        for (attr, count) in accumulator {
+            reporter.report_observation(&attr, Observation::U64Gauge(count));
+        }
+        reporter.finish_metric();
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
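A detail worth noting in report() above: the accumulator is pre-seeded from known, the set of attribute combinations seen in earlier reports. A combination such as status=Success therefore keeps being emitted, now as 0, once no tracked job matches it any longer, instead of its time series disappearing. Below is a small self-contained sketch of that pattern, using plain strings in place of Attributes; it is illustrative only, not IOx code.

use std::collections::{BTreeMap, BTreeSet};

// Seed the per-report accumulator with every attribute set seen before,
// so stale combinations are emitted as 0 instead of vanishing.
fn accumulate(
    known: &mut BTreeSet<String>,
    current: &[&str],
) -> BTreeMap<String, u64> {
    let mut acc: BTreeMap<String, u64> =
        known.iter().map(|attr| (attr.clone(), 0)).collect();
    for attr in current {
        *acc.entry(attr.to_string()).or_insert(0) += 1;
    }
    known.extend(acc.keys().cloned());
    acc
}

fn main() {
    let mut known = BTreeSet::new();

    // First report: one successful job is tracked.
    let first = accumulate(&mut known, &["status=Success"]);
    assert_eq!(first["status=Success"], 1);

    // Second report: that job is gone, a new one is running.
    // "status=Success" is still emitted, now as 0.
    let second = accumulate(&mut known, &["status=Running"]);
    assert_eq!(second["status=Success"], 0);
    assert_eq!(second["status=Running"], 1);
}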


@@ -2427,4 +2427,44 @@ mod tests {
         let provided_rules: ProvidedDatabaseRules = rules.try_into().unwrap();
         provided_rules
     }
+
+    #[tokio::test]
+    async fn job_metrics() {
+        let application = make_application();
+        let server = make_server(Arc::clone(&application));
+
+        let wait_nanos = 1000;
+        let job = application
+            .job_registry()
+            .spawn_dummy_job(vec![wait_nanos], None);
+        job.join().await;
+
+        let mut reporter = metric::RawReporter::default();
+        application.metric_registry_v2().report(&mut reporter);
+
+        let observations: Vec<_> = reporter
+            .observations()
+            .iter()
+            .filter(|observation| observation.metric_name == "influxdb_iox_job_count")
+            .collect();
+        assert_eq!(observations.len(), 1);
+
+        let gauge = observations[0];
+        assert_eq!(gauge.kind, metric::MetricKind::U64Gauge);
+        assert_eq!(gauge.observations.len(), 1);
+
+        let (attributes, observation) = &gauge.observations[0];
+        assert_eq!(
+            attributes,
+            &metric::Attributes::from(&[
+                ("description", "Dummy Job, for testing"),
+                ("status", "Success"),
+            ])
+        );
+        assert_eq!(observation, &metric::Observation::U64Gauge(1));
+
+        server.shutdown();
+        server.join().await.unwrap();
+    }
 }