refactor: wal file process completion notification

Allow a caller to await processing of a submitted WAL segment in the WAL
reference tracker.

This allows callers to (optionally) order operations that should happen
after the file has been processed.
pull/24376/head
Dom Dwyer 2023-03-17 14:10:57 +01:00
parent 1958541460
commit fba15cb521
No known key found for this signature in database
GPG Key ID: E4C40DBD9157879A
2 changed files with 70 additions and 18 deletions

View File

@ -9,7 +9,7 @@ use metric::U64Gauge;
use observability_deps::tracing::{debug, info}; use observability_deps::tracing::{debug, info};
use tokio::{ use tokio::{
select, select,
sync::{mpsc, Notify}, sync::{mpsc, oneshot, Notify},
}; };
use wal::SegmentId; use wal::SegmentId;
@ -55,7 +55,7 @@ pub(crate) struct WalReferenceActor<T = Arc<wal::Wal>> {
/// Channels for input from the [`WalReferenceHandle`]. /// Channels for input from the [`WalReferenceHandle`].
/// ///
/// [`WalReferenceHandle`]: super::WalReferenceHandle /// [`WalReferenceHandle`]: super::WalReferenceHandle
file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet, oneshot::Sender<()>)>,
persist_rx: mpsc::Receiver<Arc<CompletedPersist>>, persist_rx: mpsc::Receiver<Arc<CompletedPersist>>,
unbuffered_rx: mpsc::Receiver<SequenceNumber>, unbuffered_rx: mpsc::Receiver<SequenceNumber>,
@ -81,7 +81,7 @@ where
{ {
pub(super) fn new( pub(super) fn new(
wal: T, wal: T,
file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet, oneshot::Sender<()>)>,
persist_rx: mpsc::Receiver<Arc<CompletedPersist>>, persist_rx: mpsc::Receiver<Arc<CompletedPersist>>,
unbuffered_rx: mpsc::Receiver<SequenceNumber>, unbuffered_rx: mpsc::Receiver<SequenceNumber>,
empty_waker: Arc<Notify>, empty_waker: Arc<Notify>,
@ -137,7 +137,10 @@ where
// "persisted" set, reducing memory utilisation. // "persisted" set, reducing memory utilisation.
biased; biased;
Some((id, f)) = self.file_rx.recv() => self.handle_new_file(id, f).await, Some((id, f, done)) = self.file_rx.recv() => {
self.handle_new_file(id, f).await;
let _ = done.send(()); // There may be no listener.
},
Some(p) = self.persist_rx.recv() => self.handle_persisted(p).await, Some(p) = self.persist_rx.recv() => self.handle_persisted(p).await,
Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await, Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await,
else => break else => break

View File

@ -5,7 +5,7 @@ use futures::Future;
use observability_deps::tracing::warn; use observability_deps::tracing::warn;
use tokio::sync::{ use tokio::sync::{
mpsc::{self, error::TrySendError}, mpsc::{self, error::TrySendError},
Notify, oneshot, Notify,
}; };
use wal::SegmentId; use wal::SegmentId;
@ -73,7 +73,9 @@ use super::WalReferenceActor;
pub(crate) struct WalReferenceHandle { pub(crate) struct WalReferenceHandle {
/// A stream of newly rotated segment files and the set of /// A stream of newly rotated segment files and the set of
/// [`SequenceNumber`] within them. /// [`SequenceNumber`] within them.
file_tx: mpsc::Sender<(SegmentId, SequenceNumberSet)>, ///
/// The provided channel should be sent on once the file has been processed.
file_tx: mpsc::Sender<(SegmentId, SequenceNumberSet, oneshot::Sender<()>)>,
/// A steam of persist notifications - the [`SequenceNumberSet`] of the /// A steam of persist notifications - the [`SequenceNumberSet`] of the
/// persisted data that is now durable in object storage, and which no /// persisted data that is now durable in object storage, and which no
@ -130,8 +132,14 @@ impl WalReferenceHandle {
/// Enqueue a new file rotation event, providing the [`SegmentId`] of the /// Enqueue a new file rotation event, providing the [`SegmentId`] of the
/// WAL file and the [`SequenceNumberSet`] the WAL segment contains. /// WAL file and the [`SequenceNumberSet`] the WAL segment contains.
pub(crate) async fn enqueue_rotated_file(&self, segment_id: SegmentId, set: SequenceNumberSet) { pub(crate) async fn enqueue_rotated_file(
Self::send(&self.file_tx, (segment_id, set)).await &self,
segment_id: SegmentId,
set: SequenceNumberSet,
) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel();
Self::send(&self.file_tx, (segment_id, set, tx)).await;
rx
} }
/// Enqueue a persist completion notification for newly persisted data. /// Enqueue a persist completion notification for newly persisted data.
@ -163,11 +171,17 @@ impl WalReferenceHandle {
/// ///
/// # Ordering /// # Ordering
/// ///
/// This method establishes a happens-before ordering. /// Calling this method must happen-before the number of files reaches 0.
/// The number of files reaching zero happens-before the wakers are woken.
/// ///
/// A caller must call this method before the number of inactive files /// A caller must call this method before the number of inactive files
/// reaches 0 to be woken, otherwise the future will resolve the next time 0 /// reaches 0 to be woken, otherwise the future will resolve the next time 0
/// is reached. /// is reached.
///
/// Calls to [`WalReferenceHandle::enqueue_rotated_file()`] are executed
/// asynchronously; callers should use the returned completion handle when
/// enqueuing to order processing of the file before emptying of the
/// inactive file set (this notification future).
pub(crate) fn empty_inactive_notifier(&self) -> impl Future<Output = ()> + '_ { pub(crate) fn empty_inactive_notifier(&self) -> impl Future<Output = ()> + '_ {
self.empty_waker.notified() self.empty_waker.notified()
} }
@ -196,7 +210,7 @@ mod tests {
use assert_matches::assert_matches; use assert_matches::assert_matches;
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{NamespaceId, PartitionId, TableId}; use data_types::{NamespaceId, PartitionId, TableId};
use futures::{task::Context, Future}; use futures::{task::Context, Future, FutureExt};
use metric::{assert_counter, U64Gauge}; use metric::{assert_counter, U64Gauge};
use parking_lot::Mutex; use parking_lot::Mutex;
use test_helpers::timeout::FutureTimeout; use test_helpers::timeout::FutureTimeout;
@ -275,7 +289,10 @@ mod tests {
// Add a file with IDs 1 through 5 // Add a file with IDs 1 through 5
handle handle
.enqueue_rotated_file(SEGMENT_ID, new_set([1, 2, 3, 4, 5])) .enqueue_rotated_file(SEGMENT_ID, new_set([1, 2, 3, 4, 5]))
.await; .with_timeout_panic(Duration::from_secs(5))
.flatten()
.await
.expect("did not receive file processed notification");
// Submit a persist notification that removes refs 1 & 2. // Submit a persist notification that removes refs 1 & 2.
handle.enqueue_persist_notification(new_note([1, 2])).await; handle.enqueue_persist_notification(new_note([1, 2])).await;
@ -355,7 +372,10 @@ mod tests {
let deleted_file_waker = wal.deleted_file_waker(); let deleted_file_waker = wal.deleted_file_waker();
handle handle
.enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3])) .enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3]))
.await; .with_timeout_panic(Duration::from_secs(5))
.flatten()
.await
.expect("did not receive file processed notification");
// Wait for it to be processed // Wait for it to be processed
deleted_file_waker.await; deleted_file_waker.await;
@ -366,7 +386,10 @@ mod tests {
// Enqueue the second WAL, covering 4 // Enqueue the second WAL, covering 4
handle handle
.enqueue_rotated_file(SEGMENT_ID_2, new_set([4, 5, 6])) .enqueue_rotated_file(SEGMENT_ID_2, new_set([4, 5, 6]))
.await; .with_timeout_panic(Duration::from_secs(5))
.flatten()
.await
.expect("did not receive file processed notification");
// At this point, the second WAL still has references outstanding (5, 6) // At this point, the second WAL still has references outstanding (5, 6)
// and should not have been deleted. // and should not have been deleted.
@ -420,7 +443,10 @@ mod tests {
let deleted_file_waker = wal.deleted_file_waker(); let deleted_file_waker = wal.deleted_file_waker();
handle handle
.enqueue_rotated_file(SEGMENT_ID, SequenceNumberSet::default()) .enqueue_rotated_file(SEGMENT_ID, SequenceNumberSet::default())
.await; .with_timeout_panic(Duration::from_secs(5))
.flatten()
.await
.expect("did not receive file processed notification");
// Wait for the file deletion. // Wait for the file deletion.
deleted_file_waker.await; deleted_file_waker.await;
@ -470,6 +496,7 @@ mod tests {
async fn test_metrics() { async fn test_metrics() {
const SEGMENT_ID_1: SegmentId = SegmentId::new(42); const SEGMENT_ID_1: SegmentId = SegmentId::new(42);
const SEGMENT_ID_2: SegmentId = SegmentId::new(24); const SEGMENT_ID_2: SegmentId = SegmentId::new(24);
assert!(SEGMENT_ID_2 < SEGMENT_ID_1); // Test invariant
let metrics = metric::Registry::default(); let metrics = metric::Registry::default();
let wal = Arc::new(MockWalDeleter::default()); let wal = Arc::new(MockWalDeleter::default());
@ -480,7 +507,10 @@ mod tests {
// Add a file with 4 references // Add a file with 4 references
handle handle
.enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3, 4, 5])) .enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3, 4, 5]))
.await; .with_timeout_panic(Duration::from_secs(5))
.flatten()
.await
.expect("did not receive file processed notification");
// Reduce the reference count for file 1 (leaving 3 references) // Reduce the reference count for file 1 (leaving 3 references)
handle.enqueue_persist_notification(new_note([1, 2])).await; handle.enqueue_persist_notification(new_note([1, 2])).await;
@ -488,7 +518,20 @@ mod tests {
// Enqueue the second file. // Enqueue the second file.
handle handle
.enqueue_rotated_file(SEGMENT_ID_2, new_set([6])) .enqueue_rotated_file(SEGMENT_ID_2, new_set([6]))
.await; .with_timeout_panic(Duration::from_secs(5))
.flatten()
.await
.expect("did not receive file processed notification");
// The second file was completed processed, so the minimum segment ID
// MUST now be 24.
assert_counter!(
metrics,
U64Gauge,
"ingester_wal_inactive_min_id",
value = SEGMENT_ID_2.get(),
);
// Release the references to file 2 // Release the references to file 2
let deleted_file_waker = wal.deleted_file_waker(); let deleted_file_waker = wal.deleted_file_waker();
@ -557,7 +600,10 @@ mod tests {
// Add a file // Add a file
handle handle
.enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3])) .enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3]))
.await; .with_timeout_panic(Duration::from_secs(5))
.flatten()
.await
.expect("did not receive file processed notification");
// Remove some file references, leaving 1 reference // Remove some file references, leaving 1 reference
handle.enqueue_persist_notification(new_note([1, 2])).await; handle.enqueue_persist_notification(new_note([1, 2])).await;
@ -580,7 +626,10 @@ mod tests {
// Add a new file with two references, releasing id=5. // Add a new file with two references, releasing id=5.
handle handle
.enqueue_rotated_file(SEGMENT_ID_2, new_set([5, 6])) .enqueue_rotated_file(SEGMENT_ID_2, new_set([5, 6]))
.await; .with_timeout_panic(Duration::from_secs(5))
.flatten()
.await
.expect("did not receive file processed notification");
// The tracker is not empty, so the future must not resolve. // The tracker is not empty, so the future must not resolve.
assert_matches!(Pin::new(&mut empty_waker).poll(&mut cx), Poll::Pending); assert_matches!(Pin::new(&mut empty_waker).poll(&mut cx), Poll::Pending);