feat(ingester2): persist saturation metric
Expose a metric ("ingester_persist_saturated_duration_ns") that records the cumulative duration of time the persist system has spent in the "saturated" state.pull/24376/head
parent
ed17069087
commit
679c6a7896
|
@ -223,6 +223,7 @@ pub async fn new(
|
|||
persist_executor,
|
||||
object_store,
|
||||
Arc::clone(&catalog),
|
||||
&metrics,
|
||||
);
|
||||
|
||||
// Instantiate a post-write observer for hot partition persistence.
|
||||
|
|
|
@ -7,12 +7,13 @@ use std::{
|
|||
};
|
||||
|
||||
use crossbeam_utils::CachePadded;
|
||||
use metric::DurationCounter;
|
||||
use observability_deps::tracing::*;
|
||||
use parking_lot::Mutex;
|
||||
use tokio::{
|
||||
sync::Semaphore,
|
||||
task::JoinHandle,
|
||||
time::{Interval, MissedTickBehavior},
|
||||
time::{Instant, Interval, MissedTickBehavior},
|
||||
};
|
||||
|
||||
/// The interval of time between evaluations of the state of the persist system
|
||||
|
@ -76,13 +77,21 @@ pub(crate) struct PersistState {
|
|||
|
||||
/// The handle to the current saturation evaluation/recovery task, if any.
|
||||
recovery_handle: Mutex<Option<JoinHandle<()>>>,
|
||||
|
||||
/// A counter tracking the number of nanoseconds the state value is set to
|
||||
/// [`CurrentState::Saturated`].
|
||||
saturated_duration_ms: DurationCounter,
|
||||
}
|
||||
|
||||
impl PersistState {
|
||||
/// Initialise a [`PersistState`] with [`CurrentState::Ok`], with a total
|
||||
/// number of tasks bounded to `persist_queue_depth` and permits issued from
|
||||
/// `sem`.
|
||||
pub(crate) fn new(persist_queue_depth: usize, sem: Arc<Semaphore>) -> Self {
|
||||
pub(crate) fn new(
|
||||
persist_queue_depth: usize,
|
||||
sem: Arc<Semaphore>,
|
||||
metrics: &metric::Registry,
|
||||
) -> Self {
|
||||
// The persist_queue_depth should be the maximum number of permits
|
||||
// available in the semaphore.
|
||||
assert!(persist_queue_depth >= sem.available_permits());
|
||||
|
@ -92,12 +101,20 @@ impl PersistState {
|
|||
"persist queue depth must be non-zero"
|
||||
);
|
||||
|
||||
let saturated_duration_ms = metrics
|
||||
.register_metric::<DurationCounter>(
|
||||
"ingester_persist_saturated_duration_ns",
|
||||
"the duration of time the persist system was marked as saturated",
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
let s = Self {
|
||||
state: Default::default(),
|
||||
waiting_to_enqueue: Arc::new(AtomicUsize::new(0)),
|
||||
recovery_handle: Default::default(),
|
||||
persist_queue_depth,
|
||||
sem,
|
||||
saturated_duration_ms,
|
||||
};
|
||||
s.set(CurrentState::Ok);
|
||||
s
|
||||
|
@ -127,6 +144,7 @@ impl PersistState {
|
|||
///
|
||||
/// This value is eventually consistent, with a presumption of being visible
|
||||
/// in a reasonable amount of time.
|
||||
#[inline(always)]
|
||||
pub(crate) fn get(&self) -> CurrentState {
|
||||
// Correctness: relaxed as reading the current state is allowed to be
|
||||
// racy for performance reasons; this call should be as cheap as
|
||||
|
@ -146,6 +164,7 @@ impl PersistState {
|
|||
|
||||
/// A convenience method that returns true if `self` is
|
||||
/// [`CurrentState::Saturated`].
|
||||
#[inline(always)]
|
||||
pub(crate) fn is_saturated(&self) -> bool {
|
||||
self.get() == CurrentState::Saturated
|
||||
}
|
||||
|
@ -234,10 +253,19 @@ async fn saturation_monitor_task(
|
|||
persist_queue_depth: usize,
|
||||
sem: Arc<Semaphore>,
|
||||
) {
|
||||
let mut last = Instant::now();
|
||||
loop {
|
||||
// Wait before evaluating the state of the system.
|
||||
interval.tick().await;
|
||||
|
||||
// Update the saturation metric after the tick.
|
||||
//
|
||||
// For the first tick, this covers the tick wait itself. For subsequent
|
||||
// ticks, this duration covers the evaluation time + tick wait.
|
||||
let now = Instant::now();
|
||||
state.saturated_duration_ms.inc(now.duration_since(last));
|
||||
last = now;
|
||||
|
||||
// INVARIANT: this task only ever runs when the system is saturated.
|
||||
assert!(state.is_saturated());
|
||||
|
||||
|
@ -306,6 +334,7 @@ fn has_sufficient_capacity(capacity: usize, max_capacity: usize) -> bool {
|
|||
mod tests {
|
||||
use std::sync::Arc;
|
||||
|
||||
use metric::Metric;
|
||||
use test_helpers::timeout::FutureTimeout;
|
||||
|
||||
use super::*;
|
||||
|
@ -313,6 +342,24 @@ mod tests {
|
|||
const QUEUE_DEPTH: usize = 42;
|
||||
const POLL_INTERVAL: Duration = Duration::from_millis(5);
|
||||
|
||||
/// Execute `f` with the current value of the
|
||||
/// "ingester_persist_saturated_duration_ns" metric.
|
||||
#[track_caller]
|
||||
fn assert_saturation_time<F>(metrics: &metric::Registry, f: F)
|
||||
where
|
||||
F: FnOnce(Duration) -> bool,
|
||||
{
|
||||
// Get the saturated duration counter that tracks the time spent in the
|
||||
// "saturated" state.
|
||||
let duration_counter = metrics
|
||||
.get_instrument::<Metric<DurationCounter>>("ingester_persist_saturated_duration_ns")
|
||||
.expect("constructor did not create required duration metric")
|
||||
.recorder(&[]);
|
||||
|
||||
// Call the assert closure
|
||||
assert!(f(duration_counter.fetch()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_has_sufficient_capacity() {
|
||||
// A queue of minimal depth (1).
|
||||
|
@ -337,8 +384,9 @@ mod tests {
|
|||
/// first thread that changes the state observes the "first=true" response.
|
||||
#[test]
|
||||
fn test_state_transitions() {
|
||||
let metrics = metric::Registry::default();
|
||||
let sem = Arc::new(Semaphore::new(QUEUE_DEPTH));
|
||||
let s = PersistState::new(QUEUE_DEPTH, sem);
|
||||
let s = PersistState::new(QUEUE_DEPTH, sem, &metrics);
|
||||
assert_eq!(s.get(), CurrentState::Ok);
|
||||
assert!(!s.is_saturated());
|
||||
|
||||
|
@ -370,12 +418,18 @@ mod tests {
|
|||
/// waiters (as tracked by the [`WaitGuard`]).
|
||||
#[tokio::test]
|
||||
async fn test_saturation_recovery_enqueue_waiters() {
|
||||
let metrics = metric::Registry::default();
|
||||
let sem = Arc::new(Semaphore::new(QUEUE_DEPTH));
|
||||
let s = Arc::new(PersistState::new(QUEUE_DEPTH, Arc::clone(&sem)));
|
||||
let s = Arc::new(PersistState::new(QUEUE_DEPTH, Arc::clone(&sem), &metrics));
|
||||
|
||||
// Use no queues to ensure only the waiters are blocking recovery.
|
||||
|
||||
assert!(!s.is_saturated());
|
||||
assert_saturation_time(&metrics, |d| d == Duration::ZERO);
|
||||
|
||||
// Obtain the current timestamp, and use it as an upper-bound on the
|
||||
// duration of saturation.
|
||||
let duration_upper_bound = Instant::now();
|
||||
|
||||
let w1 = PersistState::set_saturated(Arc::clone(&s));
|
||||
let w2 = PersistState::set_saturated(Arc::clone(&s));
|
||||
|
@ -406,6 +460,7 @@ mod tests {
|
|||
// from ever transitioning to a healthy state.
|
||||
tokio::time::sleep(POLL_INTERVAL * 4).await;
|
||||
assert!(s.is_saturated());
|
||||
assert_saturation_time(&metrics, |d| d > Duration::ZERO);
|
||||
|
||||
// Drop the other waiter.
|
||||
drop(w2);
|
||||
|
@ -422,6 +477,15 @@ mod tests {
|
|||
.with_timeout_panic(Duration::from_secs(5))
|
||||
.await;
|
||||
|
||||
// Assert the saturation metric reports a duration of at least 1 poll
|
||||
// interval (the lower bound necessary for the above recovery to occur)
|
||||
// and the maximum bound (the time since the system entered the
|
||||
// saturated state).
|
||||
assert_saturation_time(&metrics, |d| d >= POLL_INTERVAL);
|
||||
assert_saturation_time(&metrics, |d| {
|
||||
d < Instant::now().duration_since(duration_upper_bound)
|
||||
});
|
||||
|
||||
// Wait up to 60 seconds to observe the recovery task finish.
|
||||
//
|
||||
// The recovery task sets the system state as healthy, and THEN exits,
|
||||
|
@ -450,11 +514,19 @@ mod tests {
|
|||
/// marking the system as healthy.
|
||||
#[tokio::test]
|
||||
async fn test_saturation_recovery_queue_capacity() {
|
||||
let metrics = metric::Registry::default();
|
||||
let sem = Arc::new(Semaphore::new(QUEUE_DEPTH));
|
||||
let s = Arc::new(PersistState::new(QUEUE_DEPTH, Arc::clone(&sem)));
|
||||
let s = Arc::new(PersistState::new(QUEUE_DEPTH, Arc::clone(&sem), &metrics));
|
||||
|
||||
// Use no waiters to ensure only the queue slots are blocking recovery.
|
||||
|
||||
assert!(!s.is_saturated());
|
||||
assert_saturation_time(&metrics, |d| d == Duration::ZERO);
|
||||
|
||||
// Obtain the current timestamp, and use it as an upper-bound on the
|
||||
// duration of saturation.
|
||||
let duration_upper_bound = Instant::now();
|
||||
|
||||
// Take half the permits. Holding this number of permits should allow
|
||||
// the state to transition to healthy.
|
||||
let _half_the_permits = sem.acquire_many(QUEUE_DEPTH as u32 / 2).await.unwrap();
|
||||
|
@ -498,6 +570,15 @@ mod tests {
|
|||
.with_timeout_panic(Duration::from_secs(5))
|
||||
.await;
|
||||
|
||||
// Assert the saturation metric reports a duration of at least 1 poll
|
||||
// interval (the lower bound necessary for the above recovery to occur)
|
||||
// and the maximum bound (the time since the system entered the
|
||||
// saturated state).
|
||||
assert_saturation_time(&metrics, |d| d >= POLL_INTERVAL);
|
||||
assert_saturation_time(&metrics, |d| {
|
||||
d < Instant::now().duration_since(duration_upper_bound)
|
||||
});
|
||||
|
||||
// Wait up to 60 seconds to observe the recovery task finish.
|
||||
//
|
||||
// The recovery task sets the system state as healthy, and THEN exits,
|
||||
|
|
|
@ -12,13 +12,12 @@ use tokio::{
|
|||
time::Instant,
|
||||
};
|
||||
|
||||
use super::{backpressure::PersistState, context::PersistRequest, worker::SharedWorkerState};
|
||||
use crate::{
|
||||
buffer_tree::partition::{persisting::PersistingData, PartitionData, SortKeyState},
|
||||
persist::worker,
|
||||
};
|
||||
|
||||
use super::{backpressure::PersistState, context::PersistRequest, worker::SharedWorkerState};
|
||||
|
||||
/// A persistence task submission handle.
|
||||
///
|
||||
/// This type is cheap to clone to share across threads.
|
||||
|
@ -160,6 +159,7 @@ impl PersistHandle {
|
|||
exec: Arc<Executor>,
|
||||
store: ParquetStorage,
|
||||
catalog: Arc<dyn Catalog>,
|
||||
metrics: &metric::Registry,
|
||||
) -> (Self, Arc<PersistState>) {
|
||||
assert_ne!(n_workers, 0, "must run at least 1 persist worker");
|
||||
assert_ne!(
|
||||
|
@ -208,7 +208,11 @@ impl PersistHandle {
|
|||
|
||||
// Initialise the saturation state as "not saturated" and provide it
|
||||
// with the task semaphore and total permit count.
|
||||
let persist_state = Arc::new(PersistState::new(persist_queue_depth, Arc::clone(&sem)));
|
||||
let persist_state = Arc::new(PersistState::new(
|
||||
persist_queue_depth,
|
||||
Arc::clone(&sem),
|
||||
metrics,
|
||||
));
|
||||
|
||||
(
|
||||
Self {
|
||||
|
@ -389,6 +393,7 @@ mod tests {
|
|||
use test_helpers::timeout::FutureTimeout;
|
||||
use tokio::sync::mpsc::error::TryRecvError;
|
||||
|
||||
use super::*;
|
||||
use crate::{
|
||||
buffer_tree::{
|
||||
namespace::{name_resolver::mock::MockNamespaceNameProvider, NamespaceName},
|
||||
|
@ -402,8 +407,6 @@ mod tests {
|
|||
test_util::make_write_op,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
const PARTITION_ID: PartitionId = PartitionId::new(42);
|
||||
const NAMESPACE_ID: NamespaceId = NamespaceId::new(24);
|
||||
const TABLE_ID: TableId = TableId::new(2442);
|
||||
|
@ -472,9 +475,10 @@ mod tests {
|
|||
async fn test_persist_sort_key_provided_none() {
|
||||
let storage = ParquetStorage::new(Arc::new(InMemory::default()), StorageId::from("iox"));
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let catalog = Arc::new(MemCatalog::new(metrics));
|
||||
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
|
||||
|
||||
let (mut handle, state) = PersistHandle::new(1, 2, Arc::clone(&EXEC), storage, catalog);
|
||||
let (mut handle, state) =
|
||||
PersistHandle::new(1, 2, Arc::clone(&EXEC), storage, catalog, &metrics);
|
||||
assert!(!state.is_saturated());
|
||||
|
||||
// Kill the workers, and replace the queues so we can inspect the
|
||||
|
@ -540,9 +544,10 @@ mod tests {
|
|||
async fn test_persist_sort_key_deferred_resolved_none_update_necessary() {
|
||||
let storage = ParquetStorage::new(Arc::new(InMemory::default()), StorageId::from("iox"));
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let catalog = Arc::new(MemCatalog::new(metrics));
|
||||
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
|
||||
|
||||
let (mut handle, state) = PersistHandle::new(1, 2, Arc::clone(&EXEC), storage, catalog);
|
||||
let (mut handle, state) =
|
||||
PersistHandle::new(1, 2, Arc::clone(&EXEC), storage, catalog, &metrics);
|
||||
assert!(!state.is_saturated());
|
||||
|
||||
// Kill the workers, and replace the queues so we can inspect the
|
||||
|
@ -620,9 +625,10 @@ mod tests {
|
|||
async fn test_persist_sort_key_deferred_resolved_some_update_necessary() {
|
||||
let storage = ParquetStorage::new(Arc::new(InMemory::default()), StorageId::from("iox"));
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let catalog = Arc::new(MemCatalog::new(metrics));
|
||||
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
|
||||
|
||||
let (mut handle, state) = PersistHandle::new(1, 2, Arc::clone(&EXEC), storage, catalog);
|
||||
let (mut handle, state) =
|
||||
PersistHandle::new(1, 2, Arc::clone(&EXEC), storage, catalog, &metrics);
|
||||
assert!(!state.is_saturated());
|
||||
|
||||
// Kill the workers, and replace the queues so we can inspect the
|
||||
|
@ -700,9 +706,10 @@ mod tests {
|
|||
async fn test_persist_sort_key_no_update_necessary() {
|
||||
let storage = ParquetStorage::new(Arc::new(InMemory::default()), StorageId::from("iox"));
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let catalog = Arc::new(MemCatalog::new(metrics));
|
||||
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
|
||||
|
||||
let (mut handle, state) = PersistHandle::new(1, 2, Arc::clone(&EXEC), storage, catalog);
|
||||
let (mut handle, state) =
|
||||
PersistHandle::new(1, 2, Arc::clone(&EXEC), storage, catalog, &metrics);
|
||||
assert!(!state.is_saturated());
|
||||
|
||||
// Kill the workers, and replace the queues so we can inspect the
|
||||
|
|
|
@ -251,8 +251,9 @@ mod tests {
|
|||
);
|
||||
let timestamp = Arc::new(TimestampOracle::new(0));
|
||||
|
||||
let metrics = metric::Registry::default();
|
||||
let sem = Arc::new(Semaphore::new(PERSIST_QUEUE_DEPTH));
|
||||
let persist_state = Arc::new(PersistState::new(PERSIST_QUEUE_DEPTH, sem));
|
||||
let persist_state = Arc::new(PersistState::new(PERSIST_QUEUE_DEPTH, sem, &metrics));
|
||||
|
||||
let handler = RpcWrite::new(Arc::clone(&mock), timestamp, persist_state);
|
||||
|
||||
|
@ -366,8 +367,9 @@ mod tests {
|
|||
let mock = Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(()), Ok(())]));
|
||||
let timestamp = Arc::new(TimestampOracle::new(0));
|
||||
|
||||
let metrics = metric::Registry::default();
|
||||
let sem = Arc::new(Semaphore::new(PERSIST_QUEUE_DEPTH));
|
||||
let persist_state = Arc::new(PersistState::new(PERSIST_QUEUE_DEPTH, sem));
|
||||
let persist_state = Arc::new(PersistState::new(PERSIST_QUEUE_DEPTH, sem, &metrics));
|
||||
|
||||
let handler = RpcWrite::new(Arc::clone(&mock), timestamp, persist_state);
|
||||
|
||||
|
@ -424,8 +426,9 @@ mod tests {
|
|||
let mock = Arc::new(MockDmlSink::default().with_apply_return(vec![Ok(()), Ok(())]));
|
||||
let timestamp = Arc::new(TimestampOracle::new(0));
|
||||
|
||||
let metrics = metric::Registry::default();
|
||||
let sem = Arc::new(Semaphore::new(PERSIST_QUEUE_DEPTH));
|
||||
let persist_state = Arc::new(PersistState::new(PERSIST_QUEUE_DEPTH, sem));
|
||||
let persist_state = Arc::new(PersistState::new(PERSIST_QUEUE_DEPTH, sem, &metrics));
|
||||
|
||||
let handler = RpcWrite::new(Arc::clone(&mock), timestamp, Arc::clone(&persist_state));
|
||||
|
||||
|
|
Loading…
Reference in New Issue