diff --git a/ingester2/src/persist/context.rs b/ingester2/src/persist/context.rs index d82a3a5784..32ef83f64a 100644 --- a/ingester2/src/persist/context.rs +++ b/ingester2/src/persist/context.rs @@ -11,7 +11,7 @@ use observability_deps::tracing::*; use parking_lot::Mutex; use parquet_file::metadata::IoxMetadata; use schema::sort::SortKey; -use tokio::sync::Notify; +use tokio::{sync::Notify, time::Instant}; use uuid::Uuid; use crate::{ @@ -35,14 +35,18 @@ pub(super) struct PersistRequest { complete: Arc, partition: Arc>, data: PersistingData, + enqueued_at: Instant, } impl PersistRequest { + /// Construct a [`PersistRequest`] for `data` from `partition`, recording + /// the current timestamp as the "enqueued at" point. pub(super) fn new(partition: Arc>, data: PersistingData) -> Self { Self { complete: Arc::new(Notify::default()), partition, data, + enqueued_at: Instant::now(), } } @@ -63,7 +67,6 @@ pub(super) struct Context { partition: Arc>, data: PersistingData, inner: Arc, - /// IDs loaded from the partition at construction time. namespace_id: NamespaceId, table_id: TableId, @@ -93,6 +96,13 @@ pub(super) struct Context { /// A notification signal to indicate to the caller that this partition has /// persisted. complete: Arc, + + /// Timing statistics tracking the timestamp this persist job was first + /// enqueued, and the timestamp this [`Context`] was constructed (signifying + /// the start of active persist work, as opposed to passive time spent in + /// the queue). + enqueued_at: Instant, + dequeued_at: Instant, } impl Context { @@ -128,6 +138,8 @@ impl Context { sort_key: guard.sort_key().clone(), complete, + enqueued_at: req.enqueued_at, + dequeued_at: Instant::now(), } }; @@ -364,6 +376,8 @@ impl Context { // the persisted data will be dropped "shortly". self.partition.lock().mark_persisted(self.data); + let now = Instant::now(); + info!( %object_store_id, namespace_id = %self.namespace_id, @@ -372,6 +386,9 @@ impl Context { table_name = %self.table_name, partition_id = %self.partition_id, partition_key = %self.partition_key, + total_persist_duration = ?now.duration_since(self.enqueued_at), + active_persist_duration = ?now.duration_since(self.dequeued_at), + queued_persist_duration = ?self.dequeued_at.duration_since(self.enqueued_at), "persisted partition" ); diff --git a/ingester2/src/persist/handle.rs b/ingester2/src/persist/handle.rs index 11f21814d4..d90ac5b871 100644 --- a/ingester2/src/persist/handle.rs +++ b/ingester2/src/persist/handle.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use iox_catalog::interface::Catalog; use iox_query::exec::Executor; -use observability_deps::tracing::info; +use observability_deps::tracing::{debug, info}; use parking_lot::Mutex; use parquet_file::storage::ParquetStorage; use thiserror::Error; @@ -169,6 +169,11 @@ impl PersistHandle { partition: Arc>, data: PersistingData, ) -> Arc { + debug!( + partition_id = data.partition_id().get(), + "enqueuing persistence task" + ); + // Build the persist task request let r = PersistRequest::new(partition, data); let notify = r.complete_notification(); diff --git a/ingester2/src/wal/rotate_task.rs b/ingester2/src/wal/rotate_task.rs index 81e983d923..c5e918ec2e 100644 --- a/ingester2/src/wal/rotate_task.rs +++ b/ingester2/src/wal/rotate_task.rs @@ -1,6 +1,7 @@ use futures::{stream, StreamExt}; use observability_deps::tracing::*; use std::{future, sync::Arc, time::Duration}; +use tokio::time::Instant; use crate::{buffer_tree::BufferTree, persist::handle::PersistHandle}; @@ -90,9 +91,17 @@ pub(crate) async fn periodic_rotation( let notifications = stream::iter(buffer.partitions()) .filter_map(|p| { async move { + let t = Instant::now(); + // Skip this partition if there is no data to persist let data = p.lock().mark_persisting()?; + debug!( + partition_id=data.partition_id().get(), + lock_wait=?Instant::now().duration_since(t), + "read data for persistence" + ); + // Enqueue the partition for persistence. // // The persist task will call mark_persisted() on the partition