refactor(tracker): Return disk usage watcher from `DiskUsageMetrics`
This allows the creator to pass around a handle to the latest observed disk usage statistics, allowing other threads to act upon changes.pull/24376/head
parent
9b52bfdeaa
commit
8e0cee8e73
|
@ -6265,6 +6265,7 @@ dependencies = [
|
|||
"pin-project",
|
||||
"sysinfo",
|
||||
"tempfile",
|
||||
"test_helpers",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"trace",
|
||||
|
|
|
@ -351,11 +351,9 @@ where
|
|||
|
||||
// Initialize disk metrics to emit disk capacity / free statistics for the
|
||||
// WAL directory.
|
||||
let disk_metric_task = tokio::task::spawn(
|
||||
DiskSpaceMetrics::new(wal_directory, &metrics)
|
||||
.expect("failed to resolve WAL directory to disk")
|
||||
.run(),
|
||||
);
|
||||
let (disk_metric_task, _snapshot_rx) = DiskSpaceMetrics::new(wal_directory, &metrics)
|
||||
.expect("failed to resolve WAL directory to disk");
|
||||
let disk_metric_task = tokio::task::spawn(disk_metric_task.run());
|
||||
|
||||
// Replay the WAL log files, if any.
|
||||
let max_sequence_number =
|
||||
|
|
|
@ -25,3 +25,4 @@ sysinfo = "0.29.7"
|
|||
tempfile = "3.7.0"
|
||||
# Need the multi-threaded executor for testing
|
||||
tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "time"] }
|
||||
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
use std::{borrow::Cow, path::PathBuf, time::Duration};
|
||||
use std::borrow::Cow;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use metric::{Attributes, U64Gauge};
|
||||
use sysinfo::{DiskExt, RefreshKind, System, SystemExt};
|
||||
use tokio::sync::watch;
|
||||
|
||||
/// The interval at which disk metrics are updated.
|
||||
///
|
||||
|
@ -9,6 +12,32 @@ use sysinfo::{DiskExt, RefreshKind, System, SystemExt};
|
|||
/// interval.
|
||||
const UPDATE_INTERVAL: Duration = Duration::from_secs(13);
|
||||
|
||||
/// An immutable snapshot of space and usage statistics for some disk.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub struct DiskSpaceSnapshot {
|
||||
available_disk_space: u64,
|
||||
total_disk_space: u64,
|
||||
}
|
||||
|
||||
impl DiskSpaceSnapshot {
|
||||
/// The available space in bytes on the disk.
|
||||
pub fn available_disk_space(&self) -> u64 {
|
||||
self.available_disk_space
|
||||
}
|
||||
|
||||
/// The maximum capacity in bytes of the disk.
|
||||
pub fn total_disk_space(&self) -> u64 {
|
||||
self.total_disk_space
|
||||
}
|
||||
|
||||
/// Overall usage of the disk, as a percentage [0.0, 1.0].
|
||||
#[inline]
|
||||
pub fn disk_usage_ratio(&self) -> f64 {
|
||||
debug_assert!(self.available_disk_space <= self.total_disk_space);
|
||||
1.0 - (self.available_disk_space as f64 / self.total_disk_space as f64)
|
||||
}
|
||||
}
|
||||
|
||||
/// A periodic reporter of disk capacity / free statistics for a given
|
||||
/// directory.
|
||||
#[derive(Debug)]
|
||||
|
@ -22,12 +51,19 @@ pub struct DiskSpaceMetrics {
|
|||
/// The index into [`System::disks()`] for the disk containing the observed
|
||||
/// directory.
|
||||
disk_idx: usize,
|
||||
|
||||
/// A stream of [`DiskSpaceSnapshot`] produced by the metric reporter for
|
||||
/// consumption by any listeners.
|
||||
snapshot_tx: watch::Sender<DiskSpaceSnapshot>,
|
||||
}
|
||||
|
||||
impl DiskSpaceMetrics {
|
||||
/// Create a new [`DiskSpaceMetrics`], returning [`None`] if no disk can be
|
||||
/// found for the specified `directory`.
|
||||
pub fn new(directory: PathBuf, registry: &metric::Registry) -> Option<Self> {
|
||||
pub fn new(
|
||||
directory: PathBuf,
|
||||
registry: &metric::Registry,
|
||||
) -> Option<(Self, watch::Receiver<DiskSpaceSnapshot>)> {
|
||||
let path: Cow<'static, str> = Cow::from(directory.display().to_string());
|
||||
let mut directory = directory.canonicalize().ok()?;
|
||||
|
||||
|
@ -52,14 +88,14 @@ impl DiskSpaceMetrics {
|
|||
|
||||
// Resolve the mount point once.
|
||||
// The directory path may be `/path/to/dir` and the mount point is `/`.
|
||||
let disk_idx = loop {
|
||||
if let Some((idx, _disk)) = system
|
||||
let (disk_idx, initial_disk) = loop {
|
||||
if let Some((idx, disk)) = system
|
||||
.disks()
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find(|(_idx, disk)| disk.mount_point() == directory)
|
||||
{
|
||||
break idx;
|
||||
break (idx, disk);
|
||||
}
|
||||
// The mount point for this directory could not be found.
|
||||
if !directory.pop() {
|
||||
|
@ -67,18 +103,26 @@ impl DiskSpaceMetrics {
|
|||
}
|
||||
};
|
||||
|
||||
Some(Self {
|
||||
available_disk_space,
|
||||
total_disk_space,
|
||||
system,
|
||||
disk_idx,
|
||||
})
|
||||
let (snapshot_tx, snapshot_rx) = watch::channel(DiskSpaceSnapshot {
|
||||
available_disk_space: initial_disk.available_space(),
|
||||
total_disk_space: initial_disk.total_space(),
|
||||
});
|
||||
|
||||
Some((
|
||||
Self {
|
||||
available_disk_space,
|
||||
total_disk_space,
|
||||
system,
|
||||
disk_idx,
|
||||
snapshot_tx,
|
||||
},
|
||||
snapshot_rx,
|
||||
))
|
||||
}
|
||||
|
||||
/// Start the [`DiskSpaceMetrics`] evaluation loop, blocking forever.
|
||||
pub async fn run(mut self) {
|
||||
let mut interval = tokio::time::interval(UPDATE_INTERVAL);
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
|
@ -93,6 +137,13 @@ impl DiskSpaceMetrics {
|
|||
|
||||
self.available_disk_space.set(disk.available_space());
|
||||
self.total_disk_space.set(disk.total_space());
|
||||
|
||||
// Produce and send a [`DiskSpaceSnapshot`] for any listeners
|
||||
// that might exist.
|
||||
_ = self.snapshot_tx.send(DiskSpaceSnapshot {
|
||||
available_disk_space: disk.available_space(),
|
||||
total_disk_space: disk.total_space(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -103,6 +154,7 @@ mod tests {
|
|||
|
||||
use metric::Metric;
|
||||
use tempfile::tempdir_in;
|
||||
use test_helpers::timeout::FutureTimeout;
|
||||
|
||||
use super::*;
|
||||
|
||||
|
@ -121,11 +173,9 @@ mod tests {
|
|||
|
||||
let registry = Arc::new(metric::Registry::new());
|
||||
|
||||
let _handle = tokio::spawn(
|
||||
DiskSpaceMetrics::new(pathbuf, ®istry)
|
||||
.expect("root always exists")
|
||||
.run(),
|
||||
);
|
||||
let (_handle, mut snapshot_rx) =
|
||||
DiskSpaceMetrics::new(pathbuf, ®istry).expect("root always exists");
|
||||
let _handle = tokio::spawn(_handle.run());
|
||||
|
||||
// Wait for the metric to be emitted and non-zero - this should be very
|
||||
// quick!
|
||||
|
@ -151,10 +201,45 @@ mod tests {
|
|||
.fetch();
|
||||
|
||||
if recorded_free_metric > 0 && recorded_total_metric > 0 {
|
||||
snapshot_rx
|
||||
.changed()
|
||||
.with_timeout_panic(Duration::from_secs(5))
|
||||
.await
|
||||
.expect("snapshot value should have changed");
|
||||
|
||||
let snapshot = *snapshot_rx.borrow();
|
||||
assert_eq!(snapshot.available_disk_space, recorded_free_metric);
|
||||
assert_eq!(snapshot.total_disk_space, recorded_total_metric);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Token test to assert disk usage ratio
|
||||
#[test]
|
||||
fn assert_disk_usage_ratio() {
|
||||
// 80% used
|
||||
let snapshot = DiskSpaceSnapshot {
|
||||
available_disk_space: 2000,
|
||||
total_disk_space: 10000,
|
||||
};
|
||||
assert_eq!(snapshot.disk_usage_ratio(), 0.8);
|
||||
|
||||
// 90% used
|
||||
let snapshot = DiskSpaceSnapshot {
|
||||
available_disk_space: 2000,
|
||||
total_disk_space: 20000,
|
||||
};
|
||||
assert_eq!(snapshot.disk_usage_ratio(), 0.9);
|
||||
|
||||
// Free!
|
||||
let snapshot = DiskSpaceSnapshot {
|
||||
available_disk_space: 42,
|
||||
total_disk_space: 42,
|
||||
};
|
||||
assert_eq!(snapshot.disk_usage_ratio(), 0.0);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue