Merge pull request #7220 from influxdata/dom/wal-ref-metrics
feat(wal): reference tracker metrics
pull/24376/head
commit
ece8641767
|
@ -9,6 +9,7 @@ use data_types::{
|
|||
SequenceNumber,
|
||||
};
|
||||
use hashbrown::HashMap;
|
||||
use metric::U64Gauge;
|
||||
use observability_deps::tracing::{debug, info, warn};
|
||||
use tokio::{
|
||||
select,
|
||||
|
@ -112,7 +113,7 @@ impl WalReferenceHandle {
|
|||
/// The returned [`WalReferenceActor`] SHOULD be
|
||||
/// [`WalReferenceActor::run()`] before the handle is used to avoid
|
||||
/// potential deadlocks.
|
||||
pub(crate) fn new<T>(wal: T) -> (Self, WalReferenceActor<T>)
|
||||
pub(crate) fn new<T>(wal: T, metrics: &metric::Registry) -> (Self, WalReferenceActor<T>)
|
||||
where
|
||||
T: WalFileDeleter,
|
||||
{
|
||||
|
@ -120,14 +121,7 @@ impl WalReferenceHandle {
|
|||
let (persist_tx, persist_rx) = mpsc::channel(50);
|
||||
let (unbuffered_tx, unbuffered_rx) = mpsc::channel(50);
|
||||
|
||||
let actor = WalReferenceActor {
|
||||
wal,
|
||||
persisted: SequenceNumberSet::default(),
|
||||
wal_files: HashMap::with_capacity(3),
|
||||
file_rx,
|
||||
persist_rx,
|
||||
unbuffered_rx,
|
||||
};
|
||||
let actor = WalReferenceActor::new(wal, file_rx, persist_rx, unbuffered_rx, metrics);
|
||||
|
||||
(
|
||||
Self {
|
||||
|
@ -206,15 +200,72 @@ pub(crate) struct WalReferenceActor<T = Arc<wal::Wal>> {
|
|||
/// Invariant: sets in this map are always non-empty.
|
||||
wal_files: HashMap<wal::SegmentId, SequenceNumberSet>,
|
||||
|
||||
/// Channels for input from the [`WalReferenceHandle`].
|
||||
file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>,
|
||||
persist_rx: mpsc::Receiver<Arc<CompletedPersist>>,
|
||||
unbuffered_rx: mpsc::Receiver<SequenceNumber>,
|
||||
|
||||
/// A metric tracking the number of rotated WAL files being reference
|
||||
/// tracked.
|
||||
num_files: U64Gauge,
|
||||
/// The minimum [`SegmentId`] in `wal_files`, the set of old (rotated out)
|
||||
/// files that will eventually be deleted.
|
||||
///
|
||||
/// If this value never changes over the lifetime of an ingester, it is an
|
||||
/// indication of a reference leak bug, causing a WAL file to never be
|
||||
/// deleted.
|
||||
min_id: U64Gauge,
|
||||
/// The number of references to unpersisted operations remaining in the old
|
||||
/// (rotated out) WAL files, decreasing as persistence completes, and
|
||||
/// increasing as non-empty WAL files are rotated into `wal_files`.
|
||||
referenced_ops: U64Gauge,
|
||||
}
|
||||
|
||||
impl<T> WalReferenceActor<T>
|
||||
where
|
||||
T: WalFileDeleter,
|
||||
{
|
||||
fn new(
|
||||
wal: T,
|
||||
file_rx: mpsc::Receiver<(SegmentId, SequenceNumberSet)>,
|
||||
persist_rx: mpsc::Receiver<Arc<CompletedPersist>>,
|
||||
unbuffered_rx: mpsc::Receiver<SequenceNumber>,
|
||||
metrics: &metric::Registry,
|
||||
) -> Self {
|
||||
let num_files = metrics
|
||||
.register_metric::<U64Gauge>(
|
||||
"ingester_wal_inactive_file_count",
|
||||
"number of WAL files that are not being actively wrote to, but contain unpersisted data"
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
let min_id = metrics
|
||||
.register_metric::<U64Gauge>(
|
||||
"ingester_wal_inactive_min_id",
|
||||
"the segment ID of the oldest inactive wal file",
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
let referenced_ops = metrics
|
||||
.register_metric::<U64Gauge>(
|
||||
"ingester_wal_inactive_file_op_reference_count",
|
||||
"the number of unpersisted operations referenced in inactive WAL files",
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
Self {
|
||||
wal,
|
||||
persisted: SequenceNumberSet::default(),
|
||||
wal_files: HashMap::with_capacity(3),
|
||||
file_rx,
|
||||
persist_rx,
|
||||
unbuffered_rx,
|
||||
num_files,
|
||||
min_id,
|
||||
referenced_ops,
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the actor task.
|
||||
///
|
||||
/// This task exits once the sender side of the input channels have been
|
||||
|
@ -235,11 +286,50 @@ where
|
|||
Some(i) = self.unbuffered_rx.recv() => self.handle_unbuffered(i).await,
|
||||
else => break
|
||||
}
|
||||
|
||||
// After each action is processed, update the metrics.
|
||||
self.update_metrics();
|
||||
}
|
||||
|
||||
debug!("stopping wal reference counter task");
|
||||
}
|
||||
|
||||
/// Update the metrics to match the internal state.
|
||||
fn update_metrics(&self) {
|
||||
let num_files = self.wal_files.len();
|
||||
|
||||
// Build a set of (id, set_len) tuples for debug logging.
|
||||
let id_lens = self
|
||||
.wal_files
|
||||
.iter()
|
||||
.map(|(id, set)| (*id, set.len()))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// Emit a log for debugging purposes, showing the current state.
|
||||
debug!(
|
||||
num_files,
|
||||
files=?id_lens,
|
||||
persisted_set_len=self.persisted.len(),
|
||||
"updated reference state"
|
||||
);
|
||||
|
||||
// Reduce (id, set_len) tuples to the min ID and sum of the set lengths,
|
||||
// defaulting to 0 for the length and u64::MAX for the ID if the file
|
||||
// set is empty.
|
||||
let (min_id, referenced_ops) =
|
||||
id_lens
|
||||
.into_iter()
|
||||
.fold((u64::MAX, 0), |(id_min, len_sum), e| {
|
||||
assert!(e.1 > 0); // Invariant: sets in file map are never empty
|
||||
(id_min.min(e.0.get()), len_sum + e.1)
|
||||
});
|
||||
|
||||
// And update the various exported metrics.
|
||||
self.num_files.set(num_files as _);
|
||||
self.min_id.set(min_id);
|
||||
self.referenced_ops.set(referenced_ops);
|
||||
}
|
||||
|
||||
/// Track a newly rotated WAL segment, with the given [`SegmentId`] and
|
||||
/// containing the operations specified in [`SequenceNumberSet`].
|
||||
///
|
||||
|
@ -409,6 +499,7 @@ mod tests {
|
|||
use assert_matches::assert_matches;
|
||||
use data_types::{NamespaceId, PartitionId, TableId};
|
||||
use futures::Future;
|
||||
use metric::assert_counter;
|
||||
use parking_lot::Mutex;
|
||||
use test_helpers::timeout::FutureTimeout;
|
||||
use tokio::sync::Notify;
|
||||
|
@ -477,8 +568,9 @@ mod tests {
|
|||
async fn test_rotate_persist_delete() {
|
||||
const SEGMENT_ID: SegmentId = SegmentId::new(42);
|
||||
|
||||
let metrics = metric::Registry::default();
|
||||
let wal = Arc::new(MockWalDeleter::default());
|
||||
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
|
||||
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);
|
||||
|
||||
let actor_task = tokio::spawn(actor.run());
|
||||
|
||||
|
@ -511,6 +603,20 @@ mod tests {
|
|||
// Validate the correct ID was deleted
|
||||
assert_matches!(wal.calls().as_slice(), &[v] if v == SEGMENT_ID);
|
||||
|
||||
assert_counter!(
|
||||
metrics,
|
||||
U64Gauge,
|
||||
"ingester_wal_inactive_file_count",
|
||||
value = 0,
|
||||
);
|
||||
|
||||
assert_counter!(
|
||||
metrics,
|
||||
U64Gauge,
|
||||
"ingester_wal_inactive_file_op_reference_count",
|
||||
value = 0,
|
||||
);
|
||||
|
||||
// Assert clean shutdown behaviour.
|
||||
drop(handle);
|
||||
actor_task
|
||||
|
@ -535,8 +641,9 @@ mod tests {
|
|||
const SEGMENT_ID_1: SegmentId = SegmentId::new(42);
|
||||
const SEGMENT_ID_2: SegmentId = SegmentId::new(24);
|
||||
|
||||
let metrics = metric::Registry::default();
|
||||
let wal = Arc::new(MockWalDeleter::default());
|
||||
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
|
||||
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);
|
||||
|
||||
let actor_task = tokio::spawn(actor.run());
|
||||
|
||||
|
@ -598,8 +705,9 @@ mod tests {
|
|||
async fn test_empty_file_set() {
|
||||
const SEGMENT_ID: SegmentId = SegmentId::new(42);
|
||||
|
||||
let metrics = metric::Registry::default();
|
||||
let wal = Arc::new(MockWalDeleter::default());
|
||||
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
|
||||
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);
|
||||
|
||||
let actor_task = tokio::spawn(actor.run());
|
||||
|
||||
|
@ -625,8 +733,9 @@ mod tests {
|
|||
#[tokio::test]
|
||||
#[should_panic(expected = "duplicate segment ID")]
|
||||
async fn test_duplicate_segment_ids() {
|
||||
let metrics = metric::Registry::default();
|
||||
let wal = Arc::new(MockWalDeleter::default());
|
||||
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal));
|
||||
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);
|
||||
|
||||
// Enqueuing a notification before the actor is running should succeed
|
||||
// because of the channel buffer capacity.
|
||||
|
@ -643,4 +752,76 @@ mod tests {
|
|||
// This should panic after processing the second file.
|
||||
actor.run().with_timeout_panic(Duration::from_secs(5)).await;
|
||||
}
|
||||
|
||||
/// Enqueue two segment files, enqueue persist notifications for the second
|
||||
/// file and wait for it to be deleted to synchronise the state (so it's not
|
||||
/// a racy test).
|
||||
///
|
||||
/// Then assert the metric values for the known state.
|
||||
#[tokio::test]
|
||||
async fn test_metrics() {
|
||||
const SEGMENT_ID_1: SegmentId = SegmentId::new(42);
|
||||
const SEGMENT_ID_2: SegmentId = SegmentId::new(24);
|
||||
|
||||
let metrics = metric::Registry::default();
|
||||
let wal = Arc::new(MockWalDeleter::default());
|
||||
let (handle, actor) = WalReferenceHandle::new(Arc::clone(&wal), &metrics);
|
||||
|
||||
let actor_task = tokio::spawn(actor.run());
|
||||
|
||||
// Add a file with 4 references
|
||||
handle
|
||||
.enqueue_rotated_file(SEGMENT_ID_1, new_set([1, 2, 3, 4, 5]))
|
||||
.await;
|
||||
|
||||
// Reduce the reference count for file 1 (leaving 3 references)
|
||||
handle.enqueue_persist_notification(new_note([1, 2])).await;
|
||||
|
||||
// Enqueue the second file.
|
||||
handle
|
||||
.enqueue_rotated_file(SEGMENT_ID_2, new_set([6]))
|
||||
.await;
|
||||
|
||||
// Release the references to file 2
|
||||
let waker = wal.waker();
|
||||
handle.enqueue_persist_notification(new_note([6])).await;
|
||||
|
||||
waker.await;
|
||||
|
||||
//
|
||||
// At this point, the actor has deleted the second file, which means it
|
||||
// has already processed the first enqueued file, and the first persist
|
||||
// notification that relates to the first file.
|
||||
//
|
||||
// A non-racy assert can now be made against this known state.
|
||||
//
|
||||
|
||||
assert_counter!(
|
||||
metrics,
|
||||
U64Gauge,
|
||||
"ingester_wal_inactive_file_count",
|
||||
value = 1,
|
||||
);
|
||||
|
||||
assert_counter!(
|
||||
metrics,
|
||||
U64Gauge,
|
||||
"ingester_wal_inactive_file_op_reference_count",
|
||||
value = 3, // 5 initial, reduced by 2 via persist notification
|
||||
);
|
||||
|
||||
assert_counter!(
|
||||
metrics,
|
||||
U64Gauge,
|
||||
"ingester_wal_inactive_min_id",
|
||||
value = SEGMENT_ID_1.get(),
|
||||
);
|
||||
|
||||
// Assert clean shutdown behaviour.
|
||||
drop(handle);
|
||||
actor_task
|
||||
.with_timeout_panic(Duration::from_secs(5))
|
||||
.await
|
||||
.expect("actor task should stop cleanly")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue