refactor(persist): pluggable completion observer

Changes the persist system to call into an abstract
PersistCompletionObserver after the persist task has completed, but
before releasing the job permit / notifying the enqueuer.

The call is made synchronously and driven to completion by the persist
worker. A synchronous construct can easily be made asynchronous (by
enqueuing work into a channel), but not the other way around, so this
gives the most flexibility.
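
As an illustration of that flexibility (a sketch only, not part of this
change; the `ChannelObserver` type and its channel wiring are
hypothetical, while the trait and `CompletedPersist` are introduced
below), an implementer could bridge the synchronous call into
asynchronous processing by enqueuing each notification into a channel
and draining it in a background task:

use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::mpsc;

use crate::persist::completion_observer::{CompletedPersist, PersistCompletionObserver};

/// Hypothetical observer that forwards each completion notification into a
/// channel, returning to the persist worker immediately.
#[derive(Debug)]
struct ChannelObserver {
    tx: mpsc::UnboundedSender<Arc<CompletedPersist>>,
}

#[async_trait]
impl PersistCompletionObserver for ChannelObserver {
    async fn persist_complete(&self, note: Arc<CompletedPersist>) {
        // Enqueue the notification and return; the task owning the receiver
        // performs any slow downstream work off the persist worker's path.
        let _ = self.tx.send(note);
    }
}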

This trait allows pluggable logic to be inserted into the persist
system without tightly coupling it to the implementer's logic (for
example, replication). One or more observers may be chained together to
construct an arbitrary sequence of actors.
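
As a sketch of such chaining (hypothetical; no `ChainedObserver` type
exists in this commit), an observer can run its own logic and then
delegate to an inner observer, with the chain terminated by the no-op
implementation:

use std::sync::Arc;

use async_trait::async_trait;

use crate::persist::completion_observer::{CompletedPersist, PersistCompletionObserver};

/// Hypothetical observer that performs its own work, then forwards the
/// notification to the next observer in the chain.
#[derive(Debug)]
struct ChainedObserver<T> {
    next: T,
}

#[async_trait]
impl<T> PersistCompletionObserver for ChainedObserver<T>
where
    T: PersistCompletionObserver,
{
    async fn persist_complete(&self, note: Arc<CompletedPersist>) {
        // This observer's own logic (e.g. replication bookkeeping) would run
        // here, before handing the same notification to the next link.
        self.next.persist_complete(note).await;
    }
}

A chain such as `ChainedObserver { next: NopObserver::default() }` could
then be passed to `PersistHandle::new()` in place of the plain
`NopObserver` used by this commit.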

This commit uses a no-op observer, causing no functional change to the
system.
pull/24376/head
Dom Dwyer 2023-01-17 00:06:06 +01:00
parent c4bf4bad91
commit 4b3a5c0c2b
6 changed files with 221 additions and 30 deletions

@@ -35,7 +35,10 @@ use crate::{
},
ingest_state::IngestState,
ingester_id::IngesterId,
persist::{handle::PersistHandle, hot_partitions::HotPartitionPersister},
persist::{
completion_observer::NopObserver, handle::PersistHandle,
hot_partitions::HotPartitionPersister,
},
server::grpc::GrpcDelegate,
timestamp_oracle::TimestampOracle,
wal::{rotate_task::periodic_rotation, wal_sink::WalSink},
@@ -285,6 +288,7 @@ where
persist_executor,
object_store,
Arc::clone(&catalog),
NopObserver::default(),
&metrics,
);
let persist_handle = Arc::new(persist_handle);

@@ -0,0 +1,120 @@
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{sequence_number_set::SequenceNumberSet, NamespaceId, PartitionId, TableId};
/// An abstract observer of persistence completion events.
///
/// This call is made synchronously by the persist worker, after
/// [`PartitionData::mark_persisted()`] has been called, but before the persist
/// job is logged as complete / waiter notification fired / job permit dropped.
///
/// Implementers SHOULD be relatively quick to return to the caller, avoiding
/// unnecessary slowdown of the persist system.
///
/// [`PartitionData::mark_persisted()`]:
/// crate::buffer_tree::partition::PartitionData::mark_persisted()
#[async_trait]
pub(crate) trait PersistCompletionObserver: Send + Sync + Debug {
/// Observe the [`CompletedPersist`] notification for the newly persisted
/// data.
async fn persist_complete(&self, note: Arc<CompletedPersist>);
}
/// A set of details describing the persisted data.
#[derive(Debug, Clone)]
pub struct CompletedPersist {
/// The catalog identifiers for the persisted partition.
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
/// The [`SequenceNumberSet`] of the persisted data.
sequence_numbers: SequenceNumberSet,
}
impl CompletedPersist {
/// Construct a new completion notification.
pub(super) fn new(
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
sequence_numbers: SequenceNumberSet,
) -> Self {
Self {
namespace_id,
table_id,
partition_id,
sequence_numbers,
}
}
/// Returns the [`NamespaceId`] of the persisted data.
pub(crate) fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
/// Returns the [`TableId`] of the persisted data.
pub(crate) fn table_id(&self) -> TableId {
self.table_id
}
/// Returns the [`PartitionId`] of the persisted data.
pub(crate) fn partition_id(&self) -> PartitionId {
self.partition_id
}
/// Returns the [`SequenceNumberSet`] of the persisted data.
pub(crate) fn sequence_numbers(&self) -> &SequenceNumberSet {
&self.sequence_numbers
}
}
/// A no-op implementation of the [`PersistCompletionObserver`] trait.
#[derive(Debug, Default)]
pub(crate) struct NopObserver;
#[async_trait]
impl PersistCompletionObserver for NopObserver {
async fn persist_complete(&self, _note: Arc<CompletedPersist>) {
// the goggles do nothing!
}
}
#[async_trait]
impl<T> PersistCompletionObserver for Arc<T>
where
T: PersistCompletionObserver,
{
async fn persist_complete(&self, note: Arc<CompletedPersist>) {
(**self).persist_complete(note).await
}
}
#[cfg(test)]
pub(crate) mod mock {
use std::sync::Arc;
use parking_lot::Mutex;
use super::*;
/// A mock observer that captures the calls it receives.
#[derive(Debug, Default)]
pub(crate) struct MockCompletionObserver {
calls: Mutex<Vec<Arc<CompletedPersist>>>,
}
impl MockCompletionObserver {
pub(crate) fn calls(&self) -> Vec<Arc<CompletedPersist>> {
self.calls.lock().clone()
}
}
#[async_trait]
impl PersistCompletionObserver for MockCompletionObserver {
async fn persist_complete(&self, note: Arc<CompletedPersist>) {
self.calls.lock().push(Arc::clone(&note));
}
}
}

@@ -18,8 +18,11 @@ use crate::{
table::TableName,
},
deferred_load::DeferredLoad,
persist::completion_observer::CompletedPersist,
};
use super::completion_observer::PersistCompletionObserver;
/// Errors a persist can experience.
#[derive(Debug, Error)]
pub(super) enum PersistError {
@@ -217,7 +220,10 @@ impl Context {
// Call [`PartitionData::mark_persisted()`] to finalise the persistence job,
// emit a log for the user, and notify the observer of this persistence
// task, if any.
pub(super) fn mark_complete(self, object_store_id: Uuid) {
pub(super) async fn mark_complete<O>(self, object_store_id: Uuid, completion_observer: &O)
where
O: PersistCompletionObserver,
{
// Mark the partition as having completed persistence, causing it to
// release the reference to the in-flight persistence data it is
// holding.
@@ -226,6 +232,18 @@ impl Context {
// queries that currently hold a reference to the data. In either case,
// the persisted data will be dropped "shortly".
let sequence_numbers = self.partition.lock().mark_persisted(self.data);
let n_writes = sequence_numbers.len();
// Dispatch the completion notification into the observer chain before
// completing the persist operation.
completion_observer
.persist_complete(Arc::new(CompletedPersist::new(
self.namespace_id,
self.table_id,
self.partition_id,
sequence_numbers,
)))
.await;
let now = Instant::now();
@@ -240,7 +258,7 @@ impl Context {
total_persist_duration = ?now.duration_since(self.enqueued_at),
active_persist_duration = ?now.duration_since(self.dequeued_at),
queued_persist_duration = ?self.dequeued_at.duration_since(self.enqueued_at),
n_writes = sequence_numbers.len(),
n_writes,
"persisted partition"
);

@@ -14,8 +14,8 @@ use tokio::{
};
use super::{
backpressure::PersistState, context::PersistRequest, queue::PersistQueue,
worker::SharedWorkerState,
backpressure::PersistState, completion_observer::PersistCompletionObserver,
context::PersistRequest, queue::PersistQueue, worker::SharedWorkerState,
};
use crate::{
buffer_tree::partition::{persisting::PersistingData, PartitionData, SortKeyState},
@@ -119,9 +119,6 @@ use crate::{
/// crate::ingest_state::IngestStateError::PersistSaturated
#[derive(Debug)]
pub(crate) struct PersistHandle {
/// The state/dependencies shared across all worker tasks.
worker_state: Arc<SharedWorkerState>,
/// Task handles for the worker tasks, aborted on drop of all
/// [`PersistHandle`] instances.
worker_tasks: Vec<AbortOnDrop<()>>,
@@ -161,15 +158,20 @@ pub(crate) struct PersistHandle {
impl PersistHandle {
/// Initialise a new persist actor & obtain the first handle.
pub(crate) fn new(
#[allow(clippy::too_many_arguments)]
pub(crate) fn new<O>(
n_workers: usize,
persist_queue_depth: usize,
ingest_state: Arc<IngestState>,
exec: Arc<Executor>,
store: ParquetStorage,
catalog: Arc<dyn Catalog>,
completion_observer: O,
metrics: &metric::Registry,
) -> Self {
) -> Self
where
O: PersistCompletionObserver + 'static,
{
assert_ne!(n_workers, 0, "must run at least 1 persist worker");
assert_ne!(
persist_queue_depth, 0,
@@ -183,6 +185,7 @@ impl PersistHandle {
exec,
store,
catalog,
completion_observer,
});
// Initialise the global queue.
@@ -225,7 +228,6 @@ impl PersistHandle {
));
Self {
worker_state,
sem,
global_queue: global_tx,
worker_queues: JumpHash::new(tx_handles),
@@ -415,6 +417,7 @@ mod tests {
},
deferred_load::DeferredLoad,
dml_sink::DmlSink,
persist::completion_observer::mock::MockCompletionObserver,
test_util::make_write_op,
};
@@ -495,6 +498,7 @@ mod tests {
Arc::clone(&EXEC),
storage,
catalog,
Arc::new(MockCompletionObserver::default()),
&metrics,
);
@@ -570,6 +574,7 @@ mod tests {
Arc::clone(&EXEC),
storage,
catalog,
Arc::new(MockCompletionObserver::default()),
&metrics,
);
@@ -657,6 +662,7 @@ mod tests {
Arc::clone(&EXEC),
storage,
catalog,
Arc::new(MockCompletionObserver::default()),
&metrics,
);
@@ -744,6 +750,7 @@ mod tests {
Arc::clone(&EXEC),
storage,
catalog,
Arc::new(MockCompletionObserver::default()),
&metrics,
);

@@ -1,5 +1,6 @@
pub(crate) mod backpressure;
pub(super) mod compact;
pub(crate) mod completion_observer;
mod context;
pub(crate) mod drain_buffer;
pub(crate) mod handle;
@@ -40,7 +41,7 @@ mod tests {
},
dml_sink::DmlSink,
ingest_state::IngestState,
persist::queue::PersistQueue,
persist::{completion_observer::mock::MockCompletionObserver, queue::PersistQueue},
test_util::{make_write_op, populate_catalog},
TRANSITION_SHARD_INDEX,
};
@@ -127,6 +128,7 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let ingest_state = Arc::new(IngestState::default());
let completion_observer = Arc::new(MockCompletionObserver::default());
// Initialise the persist system.
let handle = PersistHandle::new(
@@ -136,6 +138,7 @@ mod tests {
Arc::clone(&EXEC),
storage,
Arc::clone(&catalog),
Arc::clone(&completion_observer),
&metrics,
);
assert!(ingest_state.read().is_ok());
@@ -164,6 +167,14 @@ mod tests {
.expect("timeout waiting for completion notification")
.expect("worker task failed");
// Assert the notification observer saw this persist operation finish.
assert_matches!(&completion_observer.calls().as_slice(), &[n] => {
assert_eq!(n.namespace_id(), namespace_id);
assert_eq!(n.table_id(), table_id);
assert_eq!(n.partition_id(), partition_id);
assert_eq!(n.sequence_numbers().len(), 1);
});
// Assert the partition persistence count increased, an indication that
// mark_persisted() was called.
assert_eq!(partition.lock().completed_persistence_count(), 1);
@@ -242,6 +253,7 @@ mod tests {
let metrics = Arc::new(metric::Registry::default());
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
let ingest_state = Arc::new(IngestState::default());
let completion_observer = Arc::new(MockCompletionObserver::default());
// Initialise the persist system.
let handle = PersistHandle::new(
@@ -251,6 +263,7 @@ mod tests {
Arc::clone(&EXEC),
storage,
Arc::clone(&catalog),
Arc::clone(&completion_observer),
&metrics,
);
assert!(ingest_state.read().is_ok());
@@ -293,6 +306,15 @@ mod tests {
.expect("timeout waiting for completion notification")
.expect("worker task failed");
// Assert the notification observer was invoked exactly once, with the
// successful persist output.
assert_matches!(&completion_observer.calls().as_slice(), &[n] => {
assert_eq!(n.namespace_id(), namespace_id);
assert_eq!(n.table_id(), table_id);
assert_eq!(n.partition_id(), partition_id);
assert_eq!(n.sequence_numbers().len(), 1);
});
// Assert the partition persistence count increased, an indication that
// mark_persisted() was called.
assert_eq!(partition.lock().completed_persistence_count(), 1);

@@ -18,15 +18,17 @@ use crate::persist::compact::compact_persisting_batch;
use super::{
compact::CompactedStream,
completion_observer::PersistCompletionObserver,
context::{Context, PersistError, PersistRequest},
};
/// State shared across workers.
#[derive(Debug)]
pub(super) struct SharedWorkerState {
pub(super) struct SharedWorkerState<O> {
pub(super) exec: Arc<Executor>,
pub(super) store: ParquetStorage,
pub(super) catalog: Arc<dyn Catalog>,
pub(super) completion_observer: O,
}
/// The worker routine that drives a [`PersistRequest`] to completion,
@@ -70,11 +72,13 @@ pub(super) struct SharedWorkerState {
/// [`PersistingData`]:
/// crate::buffer_tree::partition::persisting::PersistingData
/// [`PartitionData`]: crate::buffer_tree::partition::PartitionData
pub(super) async fn run_task(
worker_state: Arc<SharedWorkerState>,
pub(super) async fn run_task<O>(
worker_state: Arc<SharedWorkerState<O>>,
global_queue: async_channel::Receiver<PersistRequest>,
mut rx: mpsc::UnboundedReceiver<PersistRequest>,
) {
) where
O: PersistCompletionObserver,
{
loop {
let req = tokio::select! {
// Bias the channel polling to prioritise work in the
@@ -130,7 +134,8 @@ pub(super) async fn run_task(
// And finally mark the persist job as complete and notify any
// observers.
ctx.mark_complete(object_store_id);
ctx.mark_complete(object_store_id, &worker_state.completion_observer)
.await;
}
}
@@ -148,10 +153,13 @@ pub(super) async fn run_task(
///
/// [`PersistingData`]:
/// crate::buffer_tree::partition::persisting::PersistingData
async fn compact_and_upload(
async fn compact_and_upload<O>(
ctx: &mut Context,
worker_state: &SharedWorkerState,
) -> Result<ParquetFileParams, PersistError> {
worker_state: &SharedWorkerState<O>,
) -> Result<ParquetFileParams, PersistError>
where
O: Send + Sync,
{
let compacted = compact(ctx, worker_state).await;
let (sort_key_update, parquet_table_data) = upload(ctx, worker_state, compacted).await;
@@ -170,7 +178,10 @@ async fn compact_and_upload(
/// Compact the data in `ctx` using sorted by the sort key returned from
/// [`Context::sort_key()`].
async fn compact(ctx: &Context, worker_state: &SharedWorkerState) -> CompactedStream {
async fn compact<O>(ctx: &Context, worker_state: &SharedWorkerState<O>) -> CompactedStream
where
O: Send + Sync,
{
let sort_key = ctx.sort_key().get().await;
debug!(
@@ -202,11 +213,14 @@ async fn compact(ctx: &Context, worker_state: &SharedWorkerState) -> CompactedStream
/// Upload the compacted data in `compacted`, returning the new sort key value
/// and parquet metadata to be upserted into the catalog.
async fn upload(
async fn upload<O>(
ctx: &Context,
worker_state: &SharedWorkerState,
worker_state: &SharedWorkerState<O>,
compacted: CompactedStream,
) -> (Option<SortKey>, ParquetFileParams) {
) -> (Option<SortKey>, ParquetFileParams)
where
O: Send + Sync,
{
let CompactedStream {
stream: record_stream,
catalog_sort_key_update,
@@ -293,12 +307,15 @@ async fn upload(
/// If a concurrent sort key change is detected (issued by another node) then
/// this method updates the sort key in `ctx` to reflect the newly observed
/// value and returns [`PersistError::ConcurrentSortKeyUpdate`] to the caller.
async fn update_catalog_sort_key(
async fn update_catalog_sort_key<O>(
ctx: &mut Context,
worker_state: &SharedWorkerState,
worker_state: &SharedWorkerState<O>,
new_sort_key: SortKey,
object_store_id: Uuid,
) -> Result<(), PersistError> {
) -> Result<(), PersistError>
where
O: Send + Sync,
{
let old_sort_key = ctx
.sort_key()
.get()
@@ -423,11 +440,14 @@ async fn update_catalog_sort_key(
Ok(())
}
async fn update_catalog_parquet(
async fn update_catalog_parquet<O>(
ctx: &Context,
worker_state: &SharedWorkerState,
worker_state: &SharedWorkerState<O>,
parquet_table_data: ParquetFileParams,
) -> Uuid {
) -> Uuid
where
O: Send + Sync,
{
// Extract the object store ID to the local scope so that it can easily
// be referenced in debug logging to aid correlation of persist events
// for a specific file.