diff --git a/ingester2/src/wal/reference_tracker/actor.rs b/ingester2/src/wal/reference_tracker/actor.rs index 7ad62400fd..633609b06f 100644 --- a/ingester2/src/wal/reference_tracker/actor.rs +++ b/ingester2/src/wal/reference_tracker/actor.rs @@ -9,7 +9,7 @@ use metric::U64Gauge; use observability_deps::tracing::{debug, info}; use tokio::{ select, - sync::{mpsc, Notify}, + sync::{mpsc, oneshot, Notify}, }; use wal::SegmentId; @@ -55,7 +55,7 @@ pub(crate) struct WalReferenceActor> { /// Channels for input from the [`WalReferenceHandle`]. /// /// [`WalReferenceHandle`]: super::WalReferenceHandle - file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, + file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet, oneshot::Sender<()>)>, persist_rx: mpsc::Receiver>, unbuffered_rx: mpsc::Receiver, @@ -81,7 +81,7 @@ where { pub(super) fn new( wal: T, - file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>, + file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet, oneshot::Sender<()>)>, persist_rx: mpsc::Receiver>, unbuffered_rx: mpsc::Receiver, empty_waker: Arc, @@ -137,7 +137,10 @@ where // "persisted" set, reducing memory utilisation. biased; - Some((id, f)) = self.file_rx.recv() => self.handle_new_file(id, f).await, + Some((id, f, done)) = self.file_rx.recv() => { + self.handle_new_file(id, f).await; + let _ = done.send(()); // There may be no listener. + }, Some(p) = self.persist_rx.recv() => self.handle_persisted(p).await, Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await, else => break diff --git a/ingester2/src/wal/reference_tracker/handle.rs b/ingester2/src/wal/reference_tracker/handle.rs index 19969c9e82..0e9d33badd 100644 --- a/ingester2/src/wal/reference_tracker/handle.rs +++ b/ingester2/src/wal/reference_tracker/handle.rs @@ -5,7 +5,7 @@ use futures::Future; use observability_deps::tracing::warn; use tokio::sync::{ mpsc::{self, error::TrySendError}, - Notify, + oneshot, Notify, }; use wal::SegmentId; @@ -73,7 +73,9 @@ use super::WalReferenceActor; pub(crate) struct WalReferenceHandle { /// A stream of newly rotated segment files and the set of /// [`SequenceNumber`] within them. - file_tx: mpsc::Sender<(SegmentId, SequenceNumberSet)>, + /// + /// The provided channel should be sent on once the file has been processed. + file_tx: mpsc::Sender<(SegmentId, SequenceNumberSet, oneshot::Sender<()>)>, /// A steam of persist notifications - the [`SequenceNumberSet`] of the /// persisted data that is now durable in object storage, and which no @@ -130,8 +132,14 @@ impl WalReferenceHandle { /// Enqueue a new file rotation event, providing the [`SegmentId`] of the /// WAL file and the [`SequenceNumberSet`] the WAL segment contains. - pub(crate) async fn enqueue_rotated_file(&self, segment_id: SegmentId, set: SequenceNumberSet) { - Self::send(&self.file_tx, (segment_id, set)).await + pub(crate) async fn enqueue_rotated_file( + &self, + segment_id: SegmentId, + set: SequenceNumberSet, + ) -> oneshot::Receiver<()> { + let (tx, rx) = oneshot::channel(); + Self::send(&self.file_tx, (segment_id, set, tx)).await; + rx } /// Enqueue a persist completion notification for newly persisted data. @@ -163,11 +171,17 @@ impl WalReferenceHandle { /// /// # Ordering /// - /// This method establishes a happens-before ordering. + /// Calling this method must happen-before the number of files reaches 0. + /// The number of files reaching zero happens-before the wakers are woken. /// /// A caller must call this method before the number of inactive files /// reaches 0 to be woken, otherwise the future will resolve the next time 0 /// is reached. + /// + /// Calls to [`WalReferenceHandle::enqueue_rotated_file()`] are executed + /// asynchronously; callers should use the returned completion handle when + /// enqueuing to order processing of the file before emptying of the + /// inactive file set (this notification future). pub(crate) fn empty_inactive_notifier(&self) -> impl Future + '_ { self.empty_waker.notified() } @@ -196,7 +210,7 @@ mod tests { use assert_matches::assert_matches; use async_trait::async_trait; use data_types::{NamespaceId, PartitionId, TableId}; - use futures::{task::Context, Future}; + use futures::{task::Context, Future, FutureExt}; use metric::{assert_counter, U64Gauge}; use parking_lot::Mutex; use test_helpers::timeout::FutureTimeout; @@ -275,7 +289,10 @@ mod tests { // Add a file with IDs 1 through 5 handle .enqueue_rotated_file(SEGMENT_ID, new_set([1, 2, 3, 4, 5])) - .await; + .with_timeout_panic(Duration::from_secs(5)) + .flatten() + .await + .expect("did not receive file processed notification"); // Submit a persist notification that removes refs 1 & 2. handle.enqueue_persist_notification(new_note([1, 2])).await; @@ -355,7 +372,10 @@ mod tests { let deleted_file_waker = wal.deleted_file_waker(); handle .enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3])) - .await; + .with_timeout_panic(Duration::from_secs(5)) + .flatten() + .await + .expect("did not receive file processed notification"); // Wait for it to be processed deleted_file_waker.await; @@ -366,7 +386,10 @@ mod tests { // Enqueue the second WAL, covering 4 handle .enqueue_rotated_file(SEGMENT_ID_2, new_set([4, 5, 6])) - .await; + .with_timeout_panic(Duration::from_secs(5)) + .flatten() + .await + .expect("did not receive file processed notification"); // At this point, the second WAL still has references outstanding (5, 6) // and should not have been deleted. @@ -420,7 +443,10 @@ mod tests { let deleted_file_waker = wal.deleted_file_waker(); handle .enqueue_rotated_file(SEGMENT_ID, SequenceNumberSet::default()) - .await; + .with_timeout_panic(Duration::from_secs(5)) + .flatten() + .await + .expect("did not receive file processed notification"); // Wait for the file deletion. deleted_file_waker.await; @@ -470,6 +496,7 @@ mod tests { async fn test_metrics() { const SEGMENT_ID_1: SegmentId = SegmentId::new(42); const SEGMENT_ID_2: SegmentId = SegmentId::new(24); + assert!(SEGMENT_ID_2 < SEGMENT_ID_1); // Test invariant let metrics = metric::Registry::default(); let wal = Arc::new(MockWalDeleter::default()); @@ -480,7 +507,10 @@ mod tests { // Add a file with 4 references handle .enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3, 4, 5])) - .await; + .with_timeout_panic(Duration::from_secs(5)) + .flatten() + .await + .expect("did not receive file processed notification"); // Reduce the reference count for file 1 (leaving 3 references) handle.enqueue_persist_notification(new_note([1, 2])).await; @@ -488,7 +518,20 @@ mod tests { // Enqueue the second file. handle .enqueue_rotated_file(SEGMENT_ID_2, new_set([6])) - .await; + .with_timeout_panic(Duration::from_secs(5)) + .flatten() + .await + .expect("did not receive file processed notification"); + + // The second file was completed processed, so the minimum segment ID + // MUST now be 24. + + assert_counter!( + metrics, + U64Gauge, + "ingester_wal_inactive_min_id", + value = SEGMENT_ID_2.get(), + ); // Release the references to file 2 let deleted_file_waker = wal.deleted_file_waker(); @@ -557,7 +600,10 @@ mod tests { // Add a file handle .enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3])) - .await; + .with_timeout_panic(Duration::from_secs(5)) + .flatten() + .await + .expect("did not receive file processed notification"); // Remove some file references, leaving 1 reference handle.enqueue_persist_notification(new_note([1, 2])).await; @@ -580,7 +626,10 @@ mod tests { // Add a new file with two references, releasing id=5. handle .enqueue_rotated_file(SEGMENT_ID_2, new_set([5, 6])) - .await; + .with_timeout_panic(Duration::from_secs(5)) + .flatten() + .await + .expect("did not receive file processed notification"); // The tracker is not empty, so the future must not resolve. assert_matches!(Pin::new(&mut empty_waker).poll(&mut cx), Poll::Pending);