Merge pull request #7209 from influxdata/dom/wal-refs

feat(wal): segment file reference counting
pull/24376/head
kodiakhq[bot] 2023-03-15 14:40:40 +00:00 committed by GitHub
commit a01abb6f69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 708 additions and 2 deletions

View File

@ -35,7 +35,7 @@ pub struct CompletedPersist {
impl CompletedPersist {
/// Construct a new completion notification.
pub(super) fn new(
pub(crate) fn new(
namespace_id: NamespaceId,
table_id: TableId,
partition_id: PartitionId,
@ -68,6 +68,23 @@ impl CompletedPersist {
pub(crate) fn sequence_numbers(&self) -> &SequenceNumberSet {
&self.sequence_numbers
}
/// Consume `self`, returning ownership of the inner [`SequenceNumberSet`].
pub(crate) fn into_sequence_numbers(self) -> SequenceNumberSet {
self.sequence_numbers
}
/// Obtain an owned inner [`SequenceNumberSet`] from an [`Arc`] wrapped
/// [`CompletedPersist`] in the most memory-efficient way possible at call
/// time.
///
/// This method attempts to unwrap an [`Arc`]-wrapped [`CompletedPersist`]
/// if `self `is the only reference, otherwise the shared set is cloned.
pub(crate) fn owned_sequence_numbers(self: Arc<Self>) -> SequenceNumberSet {
Arc::try_unwrap(self)
.map(|v| v.into_sequence_numbers())
.unwrap_or_else(|v| v.sequence_numbers().clone())
}
}
/// A no-op implementation of the [`PersistCompletionObserver`] trait.
@ -118,3 +135,45 @@ pub(crate) mod mock {
}
}
}
#[cfg(test)]
mod tests {
use data_types::SequenceNumber;
use super::*;
#[test]
fn test_owned_sequence_numbers_only_ref() {
let orig_set = [SequenceNumber::new(42)]
.into_iter()
.collect::<SequenceNumberSet>();
let note = Arc::new(CompletedPersist::new(
NamespaceId::new(1),
TableId::new(2),
PartitionId::new(3),
orig_set.clone(),
));
assert_eq!(orig_set, note.owned_sequence_numbers())
}
#[test]
fn test_owned_sequence_numbers_many_ref() {
let orig_set = [SequenceNumber::new(42)]
.into_iter()
.collect::<SequenceNumberSet>();
let note = Arc::new(CompletedPersist::new(
NamespaceId::new(1),
TableId::new(2),
PartitionId::new(3),
orig_set.clone(),
));
let note2 = Arc::clone(&note);
assert_eq!(orig_set, note.owned_sequence_numbers());
assert_eq!(orig_set, note2.owned_sequence_numbers());
}
}

View File

@ -4,6 +4,7 @@
//! [`DmlSink`]: crate::dml_sink::DmlSink
//! [`DmlOperation`]: dml::DmlOperation
pub(crate) mod reference_tracker;
pub(crate) mod rotate_task;
mod traits;
pub(crate) mod wal_sink;

View File

@ -0,0 +1,646 @@
//! A WAL file reference tracker, responsible for deleting files that contain
//! entirely persisted data.
use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use data_types::{
sequence_number_set::{self, SequenceNumberSet},
SequenceNumber,
};
use hashbrown::HashMap;
use observability_deps::tracing::{debug, info, warn};
use tokio::{
select,
sync::mpsc::{self, error::TrySendError},
};
use wal::SegmentId;
use crate::persist::completion_observer::CompletedPersist;
/// An abstraction defining the ability of an implementer to delete WAL segment
/// files by ID.
#[async_trait]
pub(crate) trait WalFileDeleter: Debug + Send + Sync + 'static {
/// Delete the WAL segment with the specified [`SegmentId`], or panic if
/// deletion fails.
async fn delete_file(&self, id: SegmentId);
}
#[async_trait]
impl WalFileDeleter for Arc<wal::Wal> {
async fn delete_file(&self, id: SegmentId) {
self.delete(id).await.expect("failed to drop wal segment");
}
}
/// A WAL file reference-count tracker handle.
///
/// The [`WalReferenceHandle`] feeds three inputs to the [`WalReferenceActor`]:
///
/// * The [`SequenceNumberSet`] and ID of rotated out WAL segment files
/// * The [`SequenceNumberSet`] of each completed persistence task
/// * All [`SequenceNumber`] of writes that failed to buffer
///
/// ```text
/// ┌ Write Processing ─ ─ ─ ─ ─ ─ ─ ─ ─
/// │
/// │ ┌────────────┐ ┌─────────────┐
/// │ WAL Rotate │ │ WAL DmlSink │ │
/// │ └────────────┘ └─────────────┘
/// │ │ │
/// │ IDs in │
/// rotated Failed │
/// │ segment write IDs
/// file │ │
/// │ │ │
/// ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ┘
/// ▼ ▼
/// ┌────────────────────────────────────┐
/// │ │
/// │ WalReferenceActor │─ ─▶ Delete Files
/// │ │
/// └────────────────────────────────────┘
/// ▲
/// │
/// ┌ Persist System ─│─ ─ ─ ─ ─ ─ ─ ─ ─
/// │ │
/// │ ┌──────────────────┐
/// │ Completed │ │
/// │ │ Persistence │
/// │ Observer │ │
/// │ └──────────────────┘
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// ```
///
/// Using these three streams of information, the [`WalReferenceActor`] computes
/// the number of unpersisted operations referenced in each WAL segment file,
/// and updates this count as more persist operations complete.
///
/// Once all the operations in a given WAL file have been observed as persisted
/// (or failed to apply), the WAL file is no longer required (all data it
/// contains is durable in the object store) and it is deleted.
///
/// The [`WalReferenceActor`] is tolerant of out-of-order events - that is, a
/// "persisted" event can be received and processed before the WAL file the data
/// is in is known. This is necessary to handle "hot partition persistence"
/// where data is persisted before the WAL file is rotated.
///
/// The [`WalReferenceActor`] gracefully stops once all [`WalReferenceHandle`]
/// instances to it are dropped.
#[derive(Debug, Clone)]
pub(crate) struct WalReferenceHandle {
/// A stream of newly rotated segment files and the set of
/// [`SequenceNumber`] within them.
file_tx: mpsc::Sender<(SegmentId, SequenceNumberSet)>,
/// A steam of persist notifications - the [`SequenceNumberSet`] of the
/// persisted data that is now durable in object storage, and which no
/// longer requires WAL entries for.
persist_tx: mpsc::Sender<Arc<CompletedPersist>>,
/// A stream of [`SequenceNumber`] identifying operations that have been (or
/// will be) added to the WAL, but failed to buffer/complete. These should
/// be treated as if they were "persisted", as they will never be persisted,
/// and are not expected to remain durable (user did not get an ACK).
unbuffered_tx: mpsc::Sender<SequenceNumber>,
}
impl WalReferenceHandle {
/// Construct a new [`WalReferenceActor`] and [`WalReferenceHandle`] pair.
///
/// The returned [`WalReferenceActor`] SHOULD be
/// [`WalReferenceActor::run()`] before the handle is used to avoid
/// potential deadlocks.
pub(crate) fn new<T>(wal: T) -> (Self, WalReferenceActor<T>)
where
T: WalFileDeleter,
{
let (file_tx, file_rx) = mpsc::channel(5);
let (persist_tx, persist_rx) = mpsc::channel(50);
let (unbuffered_tx, unbuffered_rx) = mpsc::channel(50);
let actor = WalReferenceActor {
wal,
persisted: SequenceNumberSet::default(),
wal_files: HashMap::with_capacity(3),
file_rx,
persist_rx,
unbuffered_rx,
};
(
Self {
file_tx,
persist_tx,
unbuffered_tx,
},
actor,
)
}
/// Enqueue a new file rotation event, providing the [`SegmentId`] of the
/// WAL file and the [`SequenceNumberSet`] the WAL segment contains.
pub(crate) async fn enqueue_rotated_file(&self, segment_id: SegmentId, set: SequenceNumberSet) {
Self::send(&self.file_tx, (segment_id, set)).await
}
/// Enqueue a persist completion notification for newly persisted data.
pub(crate) async fn enqueue_persist_notification(&self, note: Arc<CompletedPersist>) {
Self::send(&self.persist_tx, note).await
}
/// Enqueue a notification that a write appearing in some WAL segment will
/// not be buffered/persisted (either the active, not-yet-rotated segment or
/// a prior, already-rotated segment).
///
/// This can happen when a write is added to the WAL segment and
/// subsequently fails to be applied to the in-memory buffer. It is
/// important to track these unusual cases to ensure the WAL file is not
/// kept forever due to an outstanding reference, waiting for the unbuffered
/// write to be persisted (which it never will).
pub(crate) async fn enqueue_unbuffered_write(&self, id: SequenceNumber) {
Self::send(&self.unbuffered_tx, id).await
}
/// Send `val` over `chan`, logging a warning if `chan` is at capacity.
async fn send<T>(chan: &mpsc::Sender<T>, val: T)
where
T: Debug + Send,
{
match chan.try_send(val) {
Ok(()) => {}
Err(TrySendError::Full(val)) => {
warn!(?val, "notification buffer is full");
chan.send(val).await.expect("wal reference actor stopped");
}
Err(TrySendError::Closed(_)) => panic!("wal reference actor stopped"),
}
}
}
/// A WAL file reference-count tracker.
///
/// See [`WalReferenceHandle`].
#[derive(Debug)]
pub(crate) struct WalReferenceActor<T = Arc<wal::Wal>> {
wal: T,
/// The set of IDs of persisted data that do not yet appear in
/// `wal_files`, the set of WAL files rotated out of active use. This is
/// an intermediate buffer necessary to tolerate out-of-order persist
/// notifications w.r.t file notifications.
///
/// IDs that appear in this set are most likely part of the active WAL
/// segment file and should be reconciled when it rotates.
persisted: SequenceNumberSet,
/// The set of closed WAL segment files, and the set of unpersisted
/// [`SequenceNumber`] they contain.
///
/// These [`SequenceNumberSet`] are slowly drained / have IDs removed in
/// response to persisted data notifications. Once the set is of length 0,
/// the file can be deleted as all the entries the file contains has been
/// persisted.
///
/// Invariant: sets in this map are always non-empty.
wal_files: HashMap<wal::SegmentId, SequenceNumberSet>,
file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>,
persist_rx: mpsc::Receiver<Arc<CompletedPersist>>,
unbuffered_rx: mpsc::Receiver<SequenceNumber>,
}
impl<T> WalReferenceActor<T>
where
T: WalFileDeleter,
{
/// Execute the actor task.
///
/// This task exits once the sender side of the input channels have been
/// dropped.
pub(crate) async fn run(mut self) {
loop {
select! {
// Prefer polling the channels in the specified order.
//
// By consuming file_rx first, there's a greater chance that
// subsequent persist/ignore events can be applied directly to
// the file sets, rather than having to wait in the intermediate
// "persisted" set, reducing memory utilisation.
biased;
Some((id, f)) = self.file_rx.recv() => self.handle_new_file(id, f).await,
Some(p) = self.persist_rx.recv() => self.handle_persisted(p).await,
Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await,
else => break
}
}
debug!("stopping wal reference counter task");
}
/// Track a newly rotated WAL segment, with the given [`SegmentId`] and
/// containing the operations specified in [`SequenceNumberSet`].
///
/// This method tolerates an empty `set`.
async fn handle_new_file(&mut self, segment_id: SegmentId, mut set: SequenceNumberSet) {
debug!(
%segment_id,
sequence_number_set = ?set,
"notified of new segment file"
);
// Clear the overlap between the "persisted" set, and this new file from
// both.
let n = clear_intersection(&mut self.persisted, &mut set);
if n > 0 {
debug!(n, "released previously persisted IDs");
}
// If the file set is now completely empty, it can be immediately
// deleted.
if set.is_empty() {
debug!(n, "immediately dropping empty segment file");
return delete_file(&self.wal, segment_id).await;
}
// Otherwise, retain this file for later persist notifications.
//
// Run-optimise the bitmap to minimise memory utilisation of this set.
// This is a relatively fast operation, and the file sets are expected
// to be highly suitable for RLE compression due to the monotonic
// sequence number assignments.
set.run_optimise();
// Insert the file set into the files being tracked
assert!(!set.is_empty()); // Invariant: sets in file map are never empty
assert!(
self.wal_files.insert(segment_id, set).is_none(),
"duplicate segment ID"
);
}
/// Process a persistence completion notification, decreasing the reference
/// counts against tracked WAL files, and holding any remaining IDs (in the
/// untracked active WAL segment) in a temporary "persisted" buffer.
async fn handle_persisted(&mut self, note: Arc<CompletedPersist>) {
debug!(
namespace_id = %note.namespace_id(),
table_id = %note.table_id(),
partition_id = %note.partition_id(),
sequence_number_set = ?note.sequence_numbers(),
"notified of persisted data"
);
self.remove(note.owned_sequence_numbers()).await;
}
/// Handle a write that has been added to the WAL, but that did not complete
/// / buffer.
///
/// Because the write was added to the WAL, its ID will be part of the WAL
/// file's [`SequenceNumberSet`], but because the write was not buffered, it
/// will never be persisted and therefore the WAL set will always have an
/// outstanding reference unless it is accounted for here.
async fn handle_unbuffered(&mut self, id: SequenceNumber) {
debug!(sequence_number = id.get(), "notified of unbuffered write");
// Delegate to the same code as persisted by presenting this ID as a set
// - the same behaviour is required.
let mut set = SequenceNumberSet::with_capacity(1);
set.add(id);
self.remove(set).await;
}
/// Remove the intersection of `set` from all the sets in `self` (file sets,
/// and the untracked / "persisted" buffer set).
///
/// Deletes all WAL files that are no longer referenced / have unpersisted
/// entries.
async fn remove(&mut self, mut set: SequenceNumberSet) {
// First remove this set from the "persisted" / file-less set.
let n = clear_intersection(&mut set, &mut self.persisted);
if n > 0 {
debug!(n, "released previously persisted IDs");
}
if set.is_empty() {
debug!(n, "fully matched previously persisted IDs");
return;
}
// And then walk the WAL file sets.
let mut remove_ids = Vec::with_capacity(0);
for (id, file_set) in self.wal_files.iter_mut() {
// Invariant: files in the file set always have at least 1 reference
assert!(!file_set.is_empty());
// Early exit the loop if possible.
if set.is_empty() {
break;
}
// Clear the intersection of both sets.
let n = clear_intersection(&mut set, file_set);
if n == 0 {
continue;
}
debug!(n, segment_id=%id, "matched file IDs");
// At least 1 element was removed from the file set, it may now be
// empty.
if file_set.is_empty() {
remove_ids.push(*id);
}
}
// Union whatever IDs remain with the file-less persisted set.
if !set.is_empty() {
debug!(n = set.len(), "retaining file-less IDs");
self.persisted.add_set(&set);
}
// And delete any newly empty files
for id in remove_ids {
let file_set = self
.wal_files
.remove(&id)
.expect("id was obtained during iter");
// Invariant: the file being removed always has no references.
assert!(file_set.is_empty());
delete_file(&self.wal, id).await
}
}
}
/// Remove the intersection of `a` and `b`, from both `a` and `b`, and return
/// the cardinality of the intersection.
fn clear_intersection(a: &mut SequenceNumberSet, b: &mut SequenceNumberSet) -> usize {
let intersection = sequence_number_set::intersect(a, b);
a.remove_set(&intersection);
b.remove_set(&intersection);
intersection.len() as _
}
/// Delete the specified WAL segment from `wal`, and log it at info.
async fn delete_file<T>(wal: &T, id: SegmentId)
where
T: WalFileDeleter,
{
info!(
%id,
"deleted fully-persisted wal segment"
);
wal.delete_file(id).await
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use data_types::{NamespaceId, PartitionId, TableId};
use futures::Future;
use parking_lot::Mutex;
use test_helpers::timeout::FutureTimeout;
use tokio::sync::Notify;
use super::*;
/// A mock file deleter that records the IDs it was asked to delete.
#[derive(Debug, Default)]
struct MockWalDeleter {
notify: Notify,
calls: Mutex<Vec<SegmentId>>,
}
impl MockWalDeleter {
/// Return the set of [`SegmentId`] that have been deleted.
fn calls(&self) -> Vec<SegmentId> {
self.calls.lock().clone()
}
/// Return a future that completes when a file is subsequently deleted,
/// or panics if no file is deleted within 5 seconds.
fn waker(&self) -> impl Future<Output = ()> + '_ {
self.notify
.notified()
.with_timeout_panic(Duration::from_secs(5))
}
}
#[async_trait]
impl WalFileDeleter for Arc<MockWalDeleter> {
async fn delete_file(&self, id: SegmentId) {
self.calls.lock().push(id);
self.notify.notify_waiters();
}
}
/// Return a [`SequenceNumberSet`] containing `vals`.
fn new_set<T>(vals: T) -> SequenceNumberSet
where
T: IntoIterator<Item = i64>,
{
vals.into_iter().map(SequenceNumber::new).collect()
}
/// Return a persist completion notification with the given
/// [`SequenceNumberSet`] values.
fn new_note<T>(vals: T) -> Arc<CompletedPersist>
where
T: IntoIterator<Item = i64>,
{
Arc::new(CompletedPersist::new(
NamespaceId::new(1),
TableId::new(2),
PartitionId::new(3),
new_set(vals),
))
}
/// Test in-order notifications:
///
/// * WAL file is rotated and the tracker notified
/// * Multiple persists complete, and an unbuffered notification, draining
/// the references to the file
/// * The file is deleted when refs == 0
/// * Dropping the handle stops the actor
#[tokio::test]
async fn test_rotate_persist_delete() {
const SEGMENT_ID: SegmentId = SegmentId::new(42);
let wal = Arc::new(MockWalDeleter::default());
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
let actor_task = tokio::spawn(actor.run());
// Add a file with IDs 1 through 5
handle
.enqueue_rotated_file(SEGMENT_ID, new_set([1, 2, 3, 4, 5]))
.await;
// Submit a persist notification that removes refs 1 & 2.
handle.enqueue_persist_notification(new_note([1, 2])).await;
// Ensure the file was not deleted
assert!(wal.calls().is_empty());
// Enqueue a unbuffered notification (out of order)
handle
.enqueue_unbuffered_write(SequenceNumber::new(5))
.await;
// Ensure the file was not deleted
assert!(wal.calls().is_empty());
// Finally release the last IDs
let waker = wal.waker();
handle.enqueue_persist_notification(new_note([3, 4])).await;
// Wait for it to be processed
waker.await;
// Validate the correct ID was deleted
assert_matches!(wal.calls().as_slice(), &[v] if v == SEGMENT_ID);
// Assert clean shutdown behaviour.
drop(handle);
actor_task
.with_timeout_panic(Duration::from_secs(5))
.await
.expect("actor task should stop cleanly")
}
/// Test in-order notifications:
///
/// * Multiple persists complete
/// * A WAL file notification is received containing a subset of the
/// already persisted IDs
/// * The file is deleted because refs == 0
/// * A WAL file notification for a superset of the remaining persisted
/// IDs
/// * The remaining references are persisted/unbuffered
/// * The second WAL file is deleted
/// * Dropping the handle stops the actor
#[tokio::test]
async fn test_persist_all_rotate_delete() {
const SEGMENT_ID_1: SegmentId = SegmentId::new(42);
const SEGMENT_ID_2: SegmentId = SegmentId::new(24);
let wal = Arc::new(MockWalDeleter::default());
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
let actor_task = tokio::spawn(actor.run());
// Submit a persist notification for the entire set of IDs [1,2,3,4] in
// the upcoming first WAL, and partially the second WAL
handle.enqueue_persist_notification(new_note([2])).await;
handle.enqueue_persist_notification(new_note([1])).await;
handle.enqueue_persist_notification(new_note([3, 4])).await;
// Add a file with IDs 1, 2, 3
let waker = wal.waker();
handle
.enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3]))
.await;
// Wait for it to be processed
waker.await;
// Validate the correct ID was deleted
assert_matches!(wal.calls().as_slice(), &[v] if v == SEGMENT_ID_1);
// Enqueue the second WAL, covering 4
handle
.enqueue_rotated_file(SEGMENT_ID_2, new_set([4, 5, 6]))
.await;
// At this point, the second WAL still has references outstanding (5, 6)
// and should not have been deleted.
assert_eq!(wal.calls().len(), 1);
// Release one of the remaining two refs
handle.enqueue_persist_notification(new_note([6])).await;
// Still no deletion
assert_eq!(wal.calls().len(), 1);
// And finally release the last ref via an unbuffered notification
let waker = wal.waker();
handle
.enqueue_unbuffered_write(SequenceNumber::new(5))
.await;
waker.await;
// Validate the correct ID was deleted
assert_matches!(wal.calls().as_slice(), &[a, b] => {
assert_eq!(a, SEGMENT_ID_1);
assert_eq!(b, SEGMENT_ID_2);
});
// Assert clean shutdown behaviour.
drop(handle);
actor_task
.with_timeout_panic(Duration::from_secs(5))
.await
.expect("actor task should stop cleanly")
}
#[tokio::test]
async fn test_empty_file_set() {
const SEGMENT_ID: SegmentId = SegmentId::new(42);
let wal = Arc::new(MockWalDeleter::default());
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
let actor_task = tokio::spawn(actor.run());
// Notifying the actor of a WAL file with no operations in it should not
// cause a panic, and should cause the file to be immediately deleted.
let waker = wal.waker();
handle
.enqueue_rotated_file(SEGMENT_ID, SequenceNumberSet::default())
.await;
// Wait for the file deletion.
waker.await;
assert_matches!(wal.calls().as_slice(), &[v] if v == SEGMENT_ID);
// Assert clean shutdown behaviour.
drop(handle);
actor_task
.with_timeout_panic(Duration::from_secs(5))
.await
.expect("actor task should stop cleanly")
}
#[tokio::test]
#[should_panic(expected = "duplicate segment ID")]
async fn test_duplicate_segment_ids() {
let wal = Arc::new(MockWalDeleter::default());
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
// Enqueuing a notification before the actor is running should succeed
// because of the channel buffer capacity.
handle
.enqueue_rotated_file(SegmentId::new(42), new_set([1, 2]))
.with_timeout_panic(Duration::from_secs(5))
.await;
handle
.enqueue_rotated_file(SegmentId::new(42), new_set([3, 4]))
.with_timeout_panic(Duration::from_secs(5))
.await;
// This should panic after processing the second file.
actor.run().with_timeout_panic(Duration::from_secs(5)).await;
}
}

View File

@ -143,7 +143,7 @@ pub type SegmentIdBytes = [u8; 8];
#[allow(missing_docs)]
impl SegmentId {
pub fn new(v: u64) -> Self {
pub const fn new(v: u64) -> Self {
Self(v)
}