feat(metrics): cumulative persist job count

Tracks the cumulative number of persist jobs enqueued on a single
ingester (the total amount, so including now-completed jobs).
pull/24376/head
Dom Dwyer 2022-12-20 20:04:01 +01:00
parent c63790740b
commit 0637540aad
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
2 changed files with 126 additions and 2 deletions

View File

@ -3,6 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use iox_catalog::interface::Catalog;
use iox_query::{exec::Executor, QueryChunkMeta};
use metric::U64Counter;
use observability_deps::tracing::*;
use parking_lot::Mutex;
use parquet_file::storage::ParquetStorage;
@ -154,6 +155,9 @@ pub(crate) struct PersistHandle {
/// Marks and recovers the saturation state of the persist system.
persist_state: Arc<PersistState>,
/// A counter tracking the number of enqueued into the persist system.
enqueued_jobs: U64Counter,
}
impl PersistHandle {
@ -227,12 +231,24 @@ impl PersistHandle {
metrics,
));
// Initialise a metric tracking the number of jobs enqueued.
//
// When combined with the completion count metric, this allows us to
// derive the rate of enqueues and the number of outstanding jobs.
let enqueued_jobs = metrics
.register_metric::<U64Counter>(
"ingester_persist_enqueued_jobs",
"the number of partition persist tasks enqueued",
)
.recorder(&[]);
Self {
sem,
global_queue: global_tx,
worker_queues: JumpHash::new(tx_handles),
worker_tasks,
persist_state,
enqueued_jobs,
}
}
@ -286,7 +302,12 @@ impl PersistQueue for PersistHandle {
let partition_id = data.partition_id().get();
debug!(partition_id, "enqueuing persistence task");
// Record a starting timestamp, and increment the number of persist jobs
// before waiting on the semaphore - this ensures the difference between
// started and completed includes the full count of pending jobs (even
// those blocked waiting for queue capacity).
let enqueued_at = Instant::now();
self.enqueued_jobs.inc(1);
// Try and acquire the persist task permit immediately.
let permit = match Arc::clone(&self.sem).try_acquire_owned() {
@ -393,13 +414,15 @@ impl<T> Drop for AbortOnDrop<T> {
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use std::{sync::Arc, task::Poll, time::Duration};
use assert_matches::assert_matches;
use data_types::{NamespaceId, PartitionId, PartitionKey, ShardId, TableId};
use dml::DmlOperation;
use futures::Future;
use iox_catalog::mem::MemCatalog;
use lazy_static::lazy_static;
use metric::{Attributes, Metric};
use object_store::memory::InMemory;
use parquet_file::storage::StorageId;
use schema::sort::SortKey;
@ -417,7 +440,8 @@ mod tests {
},
deferred_load::DeferredLoad,
dml_sink::DmlSink,
persist::completion_observer::mock::MockCompletionObserver,
ingest_state::IngestStateError,
persist::completion_observer::{mock::MockCompletionObserver, NopObserver},
test_util::make_write_op,
};
@ -441,6 +465,18 @@ mod tests {
}));
}
#[track_caller]
fn assert_metric(metrics: &metric::Registry, name: &'static str, value: u64) {
let v = metrics
.get_instrument::<Metric<U64Counter>>(name)
.expect("failed to read metric")
.get_observer(&Attributes::from([]))
.expect("failed to get observer")
.fetch();
assert_eq!(v, value, "metric {name} had value {v} want {value}");
}
/// Construct a partition with the above constants, with the given sort key,
/// and containing a single write.
async fn new_partition(
@ -815,4 +851,77 @@ mod tests {
.expect("task should be in global queue");
assert_eq!(msg.partition_id(), PARTITION_ID);
}
/// A test that a ensures tasks waiting to be enqueued (waiting on the
/// semaphore) appear in the metrics.
#[tokio::test]
async fn test_persist_saturated_enqueue_counter() {
let storage = ParquetStorage::new(Arc::new(InMemory::default()), StorageId::from("iox"));
let metrics = Arc::new(metric::Registry::default());
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let ingest_state = Arc::new(IngestState::default());
let mut handle = PersistHandle::new(
1,
1,
Arc::clone(&ingest_state),
Arc::clone(&EXEC),
storage,
catalog,
NopObserver::default(),
&metrics,
);
assert!(ingest_state.read().is_ok());
// Kill the workers, and replace the queues so we can inspect the
// enqueue output.
handle.worker_tasks = vec![];
let (global_tx, _global_rx) = async_channel::unbounded();
handle.global_queue = global_tx;
let (worker1_tx, _worker1_rx) = mpsc::unbounded_channel();
let (worker2_tx, _worker2_rx) = mpsc::unbounded_channel();
handle.worker_queues = JumpHash::new([worker1_tx, worker2_tx]);
// Generate a partition
let p = new_partition(
PARTITION_ID,
SortKeyState::Deferred(Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
Some(SortKey::from_columns(["time", "good"]))
}))),
)
.await;
let data = p.lock().mark_persisting().unwrap();
// Enqueue it
let _notify1 = handle.enqueue(p, data).await;
// Generate a second partition
let p = new_partition(
PARTITION_ID,
SortKeyState::Deferred(Arc::new(DeferredLoad::new(Duration::from_secs(1), async {
Some(SortKey::from_columns(["time", "good"]))
}))),
)
.await;
let data = p.lock().mark_persisting().unwrap();
// Enqueue it
let fut = handle.enqueue(p, data);
// Poll it to the pending state
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
futures::pin_mut!(fut);
let poll = std::pin::Pin::new(&mut fut).poll(&mut cx);
assert_matches!(poll, Poll::Pending);
// The queue is now full, and the second enqueue above is blocked
assert_matches!(ingest_state.read(), Err(IngestStateError::PersistSaturated));
// And the counter shows two persist ops.
assert_metric(&metrics, "ingester_persist_enqueued_jobs", 2);
}
}

View File

@ -23,6 +23,7 @@ mod tests {
};
use iox_query::exec::Executor;
use lazy_static::lazy_static;
use metric::{Attributes, Metric, U64Counter};
use object_store::{memory::InMemory, ObjectMeta, ObjectStore};
use parking_lot::Mutex;
use parquet_file::{
@ -118,6 +119,18 @@ mod tests {
buf.partitions().next().unwrap()
}
#[track_caller]
fn assert_metric(metrics: &metric::Registry, name: &'static str, value: u64) {
let v = metrics
.get_instrument::<Metric<U64Counter>>(name)
.expect("failed to read metric")
.get_observer(&Attributes::from([]))
.expect("failed to get observer")
.fetch();
assert_eq!(v, value, "metric {name} had value {v} want {value}");
}
/// A complete integration test of the persistence system components.
#[tokio::test]
async fn test_persist_integration() {
@ -160,6 +173,8 @@ mod tests {
let notify = handle.enqueue(Arc::clone(&partition), data).await;
assert!(ingest_state.read().is_ok());
assert_metric(&metrics, "ingester_persist_enqueued_jobs", 1);
// Wait for the persist to complete.
notify
.with_timeout(Duration::from_secs(10))