feat(ingester2): persist active & queue timings
Adds more debug logging to the persist code paths, as well as capturing & logging (at INFO) timing information tracking the time a persist task spends in the queue, the active time spent actually persisting the data, and the total duration of time since the request was created (sum of both durations).pull/24376/head
parent
e108a8b6c9
commit
5fa4e49098
|
@ -11,7 +11,7 @@ use observability_deps::tracing::*;
|
|||
use parking_lot::Mutex;
|
||||
use parquet_file::metadata::IoxMetadata;
|
||||
use schema::sort::SortKey;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::{sync::Notify, time::Instant};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
|
@ -35,14 +35,18 @@ pub(super) struct PersistRequest {
|
|||
complete: Arc<Notify>,
|
||||
partition: Arc<Mutex<PartitionData>>,
|
||||
data: PersistingData,
|
||||
enqueued_at: Instant,
|
||||
}
|
||||
|
||||
impl PersistRequest {
|
||||
/// Construct a [`PersistRequest`] for `data` from `partition`, recording
|
||||
/// the current timestamp as the "enqueued at" point.
|
||||
pub(super) fn new(partition: Arc<Mutex<PartitionData>>, data: PersistingData) -> Self {
|
||||
Self {
|
||||
complete: Arc::new(Notify::default()),
|
||||
partition,
|
||||
data,
|
||||
enqueued_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,7 +67,6 @@ pub(super) struct Context {
|
|||
partition: Arc<Mutex<PartitionData>>,
|
||||
data: PersistingData,
|
||||
inner: Arc<Inner>,
|
||||
|
||||
/// IDs loaded from the partition at construction time.
|
||||
namespace_id: NamespaceId,
|
||||
table_id: TableId,
|
||||
|
@ -93,6 +96,13 @@ pub(super) struct Context {
|
|||
/// A notification signal to indicate to the caller that this partition has
|
||||
/// persisted.
|
||||
complete: Arc<Notify>,
|
||||
|
||||
/// Timing statistics tracking the timestamp this persist job was first
|
||||
/// enqueued, and the timestamp this [`Context`] was constructed (signifying
|
||||
/// the start of active persist work, as opposed to passive time spent in
|
||||
/// the queue).
|
||||
enqueued_at: Instant,
|
||||
dequeued_at: Instant,
|
||||
}
|
||||
|
||||
impl Context {
|
||||
|
@ -128,6 +138,8 @@ impl Context {
|
|||
sort_key: guard.sort_key().clone(),
|
||||
|
||||
complete,
|
||||
enqueued_at: req.enqueued_at,
|
||||
dequeued_at: Instant::now(),
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -364,6 +376,8 @@ impl Context {
|
|||
// the persisted data will be dropped "shortly".
|
||||
self.partition.lock().mark_persisted(self.data);
|
||||
|
||||
let now = Instant::now();
|
||||
|
||||
info!(
|
||||
%object_store_id,
|
||||
namespace_id = %self.namespace_id,
|
||||
|
@ -372,6 +386,9 @@ impl Context {
|
|||
table_name = %self.table_name,
|
||||
partition_id = %self.partition_id,
|
||||
partition_key = %self.partition_key,
|
||||
total_persist_duration = ?now.duration_since(self.enqueued_at),
|
||||
active_persist_duration = ?now.duration_since(self.dequeued_at),
|
||||
queued_persist_duration = ?self.dequeued_at.duration_since(self.enqueued_at),
|
||||
"persisted partition"
|
||||
);
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::sync::Arc;
|
|||
|
||||
use iox_catalog::interface::Catalog;
|
||||
use iox_query::exec::Executor;
|
||||
use observability_deps::tracing::info;
|
||||
use observability_deps::tracing::{debug, info};
|
||||
use parking_lot::Mutex;
|
||||
use parquet_file::storage::ParquetStorage;
|
||||
use thiserror::Error;
|
||||
|
@ -169,6 +169,11 @@ impl PersistHandle {
|
|||
partition: Arc<Mutex<PartitionData>>,
|
||||
data: PersistingData,
|
||||
) -> Arc<Notify> {
|
||||
debug!(
|
||||
partition_id = data.partition_id().get(),
|
||||
"enqueuing persistence task"
|
||||
);
|
||||
|
||||
// Build the persist task request
|
||||
let r = PersistRequest::new(partition, data);
|
||||
let notify = r.complete_notification();
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use futures::{stream, StreamExt};
|
||||
use observability_deps::tracing::*;
|
||||
use std::{future, sync::Arc, time::Duration};
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::{buffer_tree::BufferTree, persist::handle::PersistHandle};
|
||||
|
||||
|
@ -90,9 +91,17 @@ pub(crate) async fn periodic_rotation(
|
|||
let notifications = stream::iter(buffer.partitions())
|
||||
.filter_map(|p| {
|
||||
async move {
|
||||
let t = Instant::now();
|
||||
|
||||
// Skip this partition if there is no data to persist
|
||||
let data = p.lock().mark_persisting()?;
|
||||
|
||||
debug!(
|
||||
partition_id=data.partition_id().get(),
|
||||
lock_wait=?Instant::now().duration_since(t),
|
||||
"read data for persistence"
|
||||
);
|
||||
|
||||
// Enqueue the partition for persistence.
|
||||
//
|
||||
// The persist task will call mark_persisted() on the partition
|
||||
|
|
Loading…
Reference in New Issue