feat(metrics): persist duration histograms

Adds metrics to track the distribution of the duration spent actively
persisting a batch of partition data (compacting, generating parquet,
uploading, DB entries, etc) and another tracking the time an entry
spent waiting in the persist queue.

Together these provide a measurement of the latency of persist requests,
and as they contain event counters, they also provide the throughput and
number of outstanding jobs.
pull/24376/head
Dom Dwyer 2022-12-21 14:13:21 +01:00
parent 0637540aad
commit 3541243fcb
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
4 changed files with 77 additions and 22 deletions

View File

@ -271,6 +271,10 @@ impl Context {
let _ = self.complete.send(());
}
/// Returns the [`Instant`] at which this persist request was placed into
/// the queue, used to measure time spent waiting before processing.
pub(super) fn enqueued_at(&self) -> Instant {
    self.enqueued_at
}
/// Returns a reference to the [`SortKeyState`] captured for this persist
/// request.
pub(super) fn sort_key(&self) -> &SortKeyState {
    &self.sort_key
}

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use iox_catalog::interface::Catalog;
use iox_query::{exec::Executor, QueryChunkMeta};
use metric::U64Counter;
use metric::{DurationHistogram, U64Counter};
use observability_deps::tracing::*;
use parking_lot::Mutex;
use parquet_file::storage::ParquetStorage;
@ -192,6 +192,21 @@ impl PersistHandle {
completion_observer,
});
// Initialise a histogram to capture persist job duration & time spent
// in the queue.
let persist_duration = metrics
.register_metric::<DurationHistogram>(
"ingester_persist_active_duration",
"the distribution of persist job processing duration in nanoseconds",
)
.recorder(&[]);
let queue_duration = metrics
.register_metric::<DurationHistogram>(
"ingester_persist_enqueue_duration",
"the distribution of duration a persist job spent enqueued, waiting to be processed in nanoseconds",
)
.recorder(&[]);
// Initialise the global queue.
//
// Persist tasks that do not require a sort key update are enqueued into
@ -211,6 +226,8 @@ impl PersistHandle {
worker_state,
global_rx.clone(),
rx,
queue_duration.clone(),
persist_duration.clone(),
))),
)
})
@ -422,7 +439,6 @@ mod tests {
use futures::Future;
use iox_catalog::mem::MemCatalog;
use lazy_static::lazy_static;
use metric::{Attributes, Metric};
use object_store::memory::InMemory;
use parquet_file::storage::StorageId;
use schema::sort::SortKey;
@ -441,7 +457,10 @@ mod tests {
deferred_load::DeferredLoad,
dml_sink::DmlSink,
ingest_state::IngestStateError,
persist::completion_observer::{mock::MockCompletionObserver, NopObserver},
persist::{
completion_observer::{mock::MockCompletionObserver, NopObserver},
tests::assert_metric_counter,
},
test_util::make_write_op,
};
@ -465,18 +484,6 @@ mod tests {
}));
}
/// Assert that the no-attribute `U64Counter` metric named `name` in
/// `metrics` has recorded exactly `value`.
///
/// Panics (attributed to the caller via `#[track_caller]`) if the metric
/// or its attribute-less observer cannot be found, or the count differs.
#[track_caller]
fn assert_metric(metrics: &metric::Registry, name: &'static str, value: u64) {
    // Look up the counter instrument by name.
    let instrument = metrics
        .get_instrument::<Metric<U64Counter>>(name)
        .expect("failed to read metric");

    // Resolve the observer for the empty attribute set and read its value.
    let observer = instrument
        .get_observer(&Attributes::from([]))
        .expect("failed to get observer");
    let v = observer.fetch();

    assert_eq!(v, value, "metric {name} had value {v} want {value}");
}
/// Construct a partition with the above constants, with the given sort key,
/// and containing a single write.
async fn new_partition(
@ -922,6 +929,6 @@ mod tests {
assert_matches!(ingest_state.read(), Err(IngestStateError::PersistSaturated));
// And the counter shows two persist ops.
assert_metric(&metrics, "ingester_persist_enqueued_jobs", 2);
assert_metric_counter(&metrics, "ingester_persist_enqueued_jobs", 2);
}
}

View File

@ -23,7 +23,7 @@ mod tests {
};
use iox_query::exec::Executor;
use lazy_static::lazy_static;
use metric::{Attributes, Metric, U64Counter};
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
use object_store::{memory::InMemory, ObjectMeta, ObjectStore};
use parking_lot::Mutex;
use parquet_file::{
@ -120,7 +120,11 @@ mod tests {
}
#[track_caller]
fn assert_metric(metrics: &metric::Registry, name: &'static str, value: u64) {
pub(super) fn assert_metric_counter(
metrics: &metric::Registry,
name: &'static str,
value: u64,
) {
let v = metrics
.get_instrument::<Metric<U64Counter>>(name)
.expect("failed to read metric")
@ -131,6 +135,23 @@ mod tests {
assert_eq!(v, value, "metric {name} had value {v} want {value}");
}
/// Assert that the no-attribute `DurationHistogram` metric named `name` in
/// `metrics` has observed exactly `hits` samples.
///
/// Panics (attributed to the caller via `#[track_caller]`) if the metric
/// or its attribute-less observer cannot be found, or the sample count
/// differs.
#[track_caller]
pub(super) fn assert_metric_histogram(
    metrics: &metric::Registry,
    name: &'static str,
    hits: u64,
) {
    // Look up the histogram instrument by name.
    let instrument = metrics
        .get_instrument::<Metric<DurationHistogram>>(name)
        .expect("failed to read metric");

    // Resolve the observer for the empty attribute set and read how many
    // samples the histogram has recorded.
    let observer = instrument
        .get_observer(&Attributes::from([]))
        .expect("failed to get observer");
    let v = observer.fetch().sample_count();

    assert_eq!(v, hits, "metric {name} had {v} samples want {hits}");
}
/// A complete integration test of the persistence system components.
#[tokio::test]
async fn test_persist_integration() {
@ -169,11 +190,15 @@ mod tests {
.mark_persisting()
.expect("partition with write should transition to persisting");
// Assert the starting metric values.
assert_metric_histogram(&metrics, "ingester_persist_active_duration", 0);
assert_metric_histogram(&metrics, "ingester_persist_enqueue_duration", 0);
// Enqueue the persist job
let notify = handle.enqueue(Arc::clone(&partition), data).await;
assert!(ingest_state.read().is_ok());
assert_metric(&metrics, "ingester_persist_enqueued_jobs", 1);
assert_metric_counter(&metrics, "ingester_persist_enqueued_jobs", 1);
// Wait for the persist to complete.
notify
@ -190,6 +215,10 @@ mod tests {
assert_eq!(n.sequence_numbers().len(), 1);
});
// And that metrics recorded the enqueue & completion
assert_metric_histogram(&metrics, "ingester_persist_active_duration", 1);
assert_metric_histogram(&metrics, "ingester_persist_enqueue_duration", 1);
// Assert the partition persistence count increased, an indication that
// mark_persisted() was called.
assert_eq!(partition.lock().completed_persistence_count(), 1);
@ -330,6 +359,12 @@ mod tests {
assert_eq!(n.sequence_numbers().len(), 1);
});
// And that despite the persist job effectively running twice (to handle
// the sort key conflict) the metrics should record 1 persist job start
// & completion
assert_metric_histogram(&metrics, "ingester_persist_active_duration", 1);
assert_metric_histogram(&metrics, "ingester_persist_enqueue_duration", 1);
// Assert the partition persistence count increased, an indication that
// mark_persisted() was called.
assert_eq!(partition.lock().completed_persistence_count(), 1);

View File

@ -5,13 +5,12 @@ use backoff::Backoff;
use data_types::{CompactionLevel, ParquetFileParams, SequenceNumber};
use iox_catalog::interface::{get_table_schema_by_id, CasFailure, Catalog};
use iox_query::exec::Executor;
use iox_time::{SystemProvider, TimeProvider};
use metric::DurationHistogram;
use observability_deps::tracing::{debug, info, warn};
use parquet_file::{metadata::IoxMetadata, storage::ParquetStorage};
use schema::sort::SortKey;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time::Instant};
use uuid::Uuid;
use crate::persist::compact::compact_persisting_batch;
@ -76,6 +75,8 @@ pub(super) async fn run_task<O>(
worker_state: Arc<SharedWorkerState<O>>,
global_queue: async_channel::Receiver<PersistRequest>,
mut rx: mpsc::UnboundedReceiver<PersistRequest>,
queue_duration: DurationHistogram,
persist_duration: DurationHistogram,
) where
O: PersistCompletionObserver,
{
@ -114,6 +115,10 @@ pub(super) async fn run_task<O>(
let mut ctx = Context::new(req);
// Capture the time spent in the queue.
let started_at = Instant::now();
queue_duration.record(started_at.duration_since(ctx.enqueued_at()));
// Compact the data, generate the parquet file from the result, and
// upload it to object storage.
//
@ -136,6 +141,10 @@ pub(super) async fn run_task<O>(
// observers.
ctx.mark_complete(object_store_id, &worker_state.completion_observer)
.await;
// Capture the time spent actively persisting.
let now = Instant::now();
persist_duration.record(now.duration_since(started_at));
}
}