From bbd471718dc1612dd9fe05e5de3d62559a2ccb84 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Wed, 1 Mar 2023 14:13:43 +0100 Subject: [PATCH] docs: hot partition persistence Document the hot partition persistence configuration values. --- ingester2/src/buffer_tree/partition.rs | 2 ++ ingester2/src/init.rs | 38 +++++++++++++++++++------ ingester2/src/persist/hot_partitions.rs | 6 ++++ 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/ingester2/src/buffer_tree/partition.rs b/ingester2/src/buffer_tree/partition.rs index 79abda50c2..cde6235c8d 100644 --- a/ingester2/src/buffer_tree/partition.rs +++ b/ingester2/src/buffer_tree/partition.rs @@ -146,6 +146,8 @@ impl PartitionData { Ok(()) } + /// Return an estimated cost of persisting the data buffered in this + /// [`PartitionData`]. pub(crate) fn persist_cost_estimate(&self) -> usize { self.buffer.persist_cost_estimate() } diff --git a/ingester2/src/init.rs b/ingester2/src/init.rs index 2286d3ba4e..ed42942e36 100644 --- a/ingester2/src/init.rs +++ b/ingester2/src/init.rs @@ -159,6 +159,19 @@ pub enum InitError { /// /// Any error during replay is fatal. /// +/// ## Graceful Shutdown +/// +/// When `shutdown` completes, the ingester blocks ingest (returning an error to +/// all new write requests) while still executing query requests. The ingester +/// then persists all data currently buffered. +/// +/// Callers can wait for this buffer persist to complete by awaiting +/// [`IngesterGuard::join()`], which will resolve once all data has been flushed +/// to object storage. +/// +/// The ingester will continue answering queries until the gRPC server is +/// stopped by the caller (managed outside of this crate). 
+/// /// ## Deferred Loading for Persist Operations /// /// Several items within the ingester's internal state are loaded only when @@ -184,18 +197,25 @@ pub enum InitError { /// operations, but not so long that it causes catalog load spikes at persist /// time (which can be observed by the catalog instrumentation metrics). /// -/// ## Graceful Shutdown +/// ## Hot Persistence /// -/// When `shutdown` completes, the ingester blocks ingest (returning an error to -/// all new write requests) while still executing query requests. The ingester -/// then persists all data currently buffered. +/// Partitions have an opaque estimate of the "cost" (in terms of time/space) to +/// persist the data within them. The cost calculation is made in +/// [`MutableBatch::size_data()`]. /// -/// Callers can wait for this buffer persist to complete by awaiting -/// [`IngesterGuard::join()`], which will resolve once all data has been flushed -/// to object storage. +/// Once this cost estimation exceeds the `persist_hot_partition_cost`, the +/// partition is immediately enqueued for persistence, and subsequent writes are +/// applied to a new buffer. /// -/// The ingester will continue answering queries until the gRPC server is -/// stopped by the caller (managed outside of this crate). +/// Increasing this value reduces the frequency of hot partition persistence, +/// but may also increase the total amount of data that needs persisting for a +/// single partition. In practice, this increases the memory utilisation of +/// datafusion during the persist compaction step. +/// +/// Decreasing this value increases the frequency of persist operations, and +/// usually decreases the size of the resulting parquet files. 
+/// +/// [`MutableBatch::size_data()`]: mutable_batch::MutableBatch::size_data #[allow(clippy::too_many_arguments)] pub async fn new( catalog: Arc, diff --git a/ingester2/src/persist/hot_partitions.rs b/ingester2/src/persist/hot_partitions.rs index 8c260f58e9..938f5f1a27 100644 --- a/ingester2/src/persist/hot_partitions.rs +++ b/ingester2/src/persist/hot_partitions.rs @@ -7,6 +7,8 @@ use crate::buffer_tree::{partition::PartitionData, post_write::PostWriteObserver use super::queue::PersistQueue; +/// A [`PostWriteObserver`] that triggers persistence of a partition when the +/// estimated persistence cost exceeds a pre-configured limit. #[derive(Debug)] pub(crate) struct HotPartitionPersister

{ persist_handle: P, @@ -57,6 +59,10 @@ where #[inline(always)] fn observe(&self, partition: Arc>, guard: MutexGuard<'_, PartitionData>) { // Without releasing the lock, obtain the new persist cost estimate. + // + // By holding the write lock, concurrent writes are blocked while the + // cost is evaluated. This prevents "overrun" where parallel writes are + // applied while the cost is evaluated concurrently in this thread. let cost_estimate = guard.persist_cost_estimate(); // This observer is called after a successful write, therefore