docs: hot partition persistence

Document the hot partition persistence configuration values.
pull/24376/head
Dom Dwyer 2023-03-01 14:13:43 +01:00
parent cfd377d12a
commit bbd471718d
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
3 changed files with 37 additions and 9 deletions

View File

@ -146,6 +146,8 @@ impl PartitionData {
Ok(())
}
/// Return an estimated cost of persisting the data buffered in this
/// [`PartitionData`].
pub(crate) fn persist_cost_estimate(&self) -> usize {
self.buffer.persist_cost_estimate()
}

View File

@ -159,6 +159,19 @@ pub enum InitError {
///
/// Any error during replay is fatal.
///
/// ## Graceful Shutdown
///
/// When `shutdown` completes, the ingester blocks ingest (returning an error to
/// all new write requests) while still executing query requests. The ingester
/// then persists all data currently buffered.
///
/// Callers can wait for this buffer persist to complete by awaiting
/// [`IngesterGuard::join()`], which will resolve once all data has been flushed
/// to object storage.
///
/// The ingester will continue answering queries until the gRPC server is
/// stopped by the caller (managed outside of this crate).
///
/// ## Deferred Loading for Persist Operations
///
/// Several items within the ingester's internal state are loaded only when
@ -184,18 +197,25 @@ pub enum InitError {
/// operations, but not so long that it causes catalog load spikes at persist
/// time (which can be observed by the catalog instrumentation metrics).
///
/// ## Graceful Shutdown
/// ## Hot Persistence
///
/// When `shutdown` completes, the ingester blocks ingest (returning an error to
/// all new write requests) while still executing query requests. The ingester
/// then persists all data currently buffered.
/// Partitions have a opaque estimate of the "cost" (in terms of time/space) to
/// persist the data within them. The cost calculation is made in
/// [`MutableBatch::size_data()`].
///
/// Callers can wait for this buffer persist to complete by awaiting
/// [`IngesterGuard::join()`], which will resolve once all data has been flushed
/// to object storage.
/// Once this cost estimation exceeds the `persist_hot_partition_cost` the
/// partition is immediately enqueued for persistence, and subsequent writes are
/// applied to a new buffer.
///
/// The ingester will continue answering queries until the gRPC server is
/// stopped by the caller (managed outside of this crate).
/// Increasing this value reduces the frequency of hot partition persistence,
/// but may also increase the total amount of data that needs persisting for a
/// single partition. In practice, this increases the memory utilisation of
/// datafusion during the persist compaction step.
///
/// Decreasing this value increases the frequency of persist operations, and
/// usually decreases the size of the resulting parquet files.
///
/// [`MutableBatch::size_data()`]: mutable_batch::MutableBatch::size_data
#[allow(clippy::too_many_arguments)]
pub async fn new<F>(
catalog: Arc<dyn Catalog>,

View File

@ -7,6 +7,8 @@ use crate::buffer_tree::{partition::PartitionData, post_write::PostWriteObserver
use super::queue::PersistQueue;
/// A [`PostWriteObserver`] that triggers persistence of a partition when the
/// estimated persistence cost exceeds a pre-configured limit.
#[derive(Debug)]
pub(crate) struct HotPartitionPersister<P> {
persist_handle: P,
@ -57,6 +59,10 @@ where
#[inline(always)]
fn observe(&self, partition: Arc<Mutex<PartitionData>>, guard: MutexGuard<'_, PartitionData>) {
// Without releasing the lock, obtain the new persist cost estimate.
//
// By holding the write lock, concurrent writes are blocked while the
// cost is evaluated. This prevents "overrun" where parallel writes are
// applied while the cost is evaluated concurrently in this thread.
let cost_estimate = guard.persist_cost_estimate();
// This observer is called after a successful write, therefore