From 8e0cee8e73cf3df7d266e20c1d3469928e7e866e Mon Sep 17 00:00:00 2001 From: Fraser Savage Date: Mon, 31 Jul 2023 12:13:42 +0100 Subject: [PATCH] refactor(tracker): Return disk usage watcher from `DiskUsageMetrics` This allows the creator to pass around a handle to the latest observed disk usage statistics, allowing other threads to act upon changes. --- Cargo.lock | 1 + ingester/src/init.rs | 8 +-- tracker/Cargo.toml | 1 + tracker/src/disk_metric.rs | 119 +++++++++++++++++++++++++++++++------ 4 files changed, 107 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 67f9bef536..b46fa0962e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6265,6 +6265,7 @@ dependencies = [ "pin-project", "sysinfo", "tempfile", + "test_helpers", "tokio", "tokio-util", "trace", diff --git a/ingester/src/init.rs b/ingester/src/init.rs index 28798a9dce..8467eb07b4 100644 --- a/ingester/src/init.rs +++ b/ingester/src/init.rs @@ -351,11 +351,9 @@ where // Initialize disk metrics to emit disk capacity / free statistics for the // WAL directory. - let disk_metric_task = tokio::task::spawn( - DiskSpaceMetrics::new(wal_directory, &metrics) - .expect("failed to resolve WAL directory to disk") - .run(), - ); + let (disk_metric_task, _snapshot_rx) = DiskSpaceMetrics::new(wal_directory, &metrics) + .expect("failed to resolve WAL directory to disk"); + let disk_metric_task = tokio::task::spawn(disk_metric_task.run()); // Replay the WAL log files, if any. let max_sequence_number = diff --git a/tracker/Cargo.toml b/tracker/Cargo.toml index 83f584d6e7..4f8c2917cd 100644 --- a/tracker/Cargo.toml +++ b/tracker/Cargo.toml @@ -25,3 +25,4 @@ sysinfo = "0.29.7" tempfile = "3.7.0" # Need the multi-threaded executor for testing tokio = { version = "1.29", features = ["macros", "parking_lot", "rt-multi-thread", "time"] } +test_helpers = { path = "../test_helpers", features = ["future_timeout"] } diff --git a/tracker/src/disk_metric.rs b/tracker/src/disk_metric.rs index c2772a1237..8a39a767e6 100644 --- a/tracker/src/disk_metric.rs +++ b/tracker/src/disk_metric.rs @@ -1,7 +1,10 @@ -use std::{borrow::Cow, path::PathBuf, time::Duration}; +use std::borrow::Cow; +use std::path::PathBuf; +use std::time::Duration; use metric::{Attributes, U64Gauge}; use sysinfo::{DiskExt, RefreshKind, System, SystemExt}; +use tokio::sync::watch; /// The interval at which disk metrics are updated. /// @@ -9,6 +12,32 @@ use sysinfo::{DiskExt, RefreshKind, System, SystemExt}; /// interval. const UPDATE_INTERVAL: Duration = Duration::from_secs(13); +/// An immutable snapshot of space and usage statistics for some disk. +#[derive(Clone, Copy, Debug)] +pub struct DiskSpaceSnapshot { + available_disk_space: u64, + total_disk_space: u64, +} + +impl DiskSpaceSnapshot { + /// The available space in bytes on the disk. + pub fn available_disk_space(&self) -> u64 { + self.available_disk_space + } + + /// The maximum capacity in bytes of the disk. + pub fn total_disk_space(&self) -> u64 { + self.total_disk_space + } + + /// Overall usage of the disk, as a percentage [0.0, 1.0]. + #[inline] + pub fn disk_usage_ratio(&self) -> f64 { + debug_assert!(self.available_disk_space <= self.total_disk_space); + 1.0 - (self.available_disk_space as f64 / self.total_disk_space as f64) + } +} + /// A periodic reporter of disk capacity / free statistics for a given /// directory. #[derive(Debug)] @@ -22,12 +51,19 @@ pub struct DiskSpaceMetrics { /// The index into [`System::disks()`] for the disk containing the observed /// directory. disk_idx: usize, + + /// A stream of [`DiskSpaceSnapshot`] produced by the metric reporter for + /// consumption by any listeners. + snapshot_tx: watch::Sender, } impl DiskSpaceMetrics { /// Create a new [`DiskSpaceMetrics`], returning [`None`] if no disk can be /// found for the specified `directory`. - pub fn new(directory: PathBuf, registry: &metric::Registry) -> Option { + pub fn new( + directory: PathBuf, + registry: &metric::Registry, + ) -> Option<(Self, watch::Receiver)> { let path: Cow<'static, str> = Cow::from(directory.display().to_string()); let mut directory = directory.canonicalize().ok()?; @@ -52,14 +88,14 @@ impl DiskSpaceMetrics { // Resolve the mount point once. // The directory path may be `/path/to/dir` and the mount point is `/`. - let disk_idx = loop { - if let Some((idx, _disk)) = system + let (disk_idx, initial_disk) = loop { + if let Some((idx, disk)) = system .disks() .iter() .enumerate() .find(|(_idx, disk)| disk.mount_point() == directory) { - break idx; + break (idx, disk); } // The mount point for this directory could not be found. if !directory.pop() { @@ -67,18 +103,26 @@ impl DiskSpaceMetrics { } }; - Some(Self { - available_disk_space, - total_disk_space, - system, - disk_idx, - }) + let (snapshot_tx, snapshot_rx) = watch::channel(DiskSpaceSnapshot { + available_disk_space: initial_disk.available_space(), + total_disk_space: initial_disk.total_space(), + }); + + Some(( + Self { + available_disk_space, + total_disk_space, + system, + disk_idx, + snapshot_tx, + }, + snapshot_rx, + )) } /// Start the [`DiskSpaceMetrics`] evaluation loop, blocking forever. pub async fn run(mut self) { let mut interval = tokio::time::interval(UPDATE_INTERVAL); - loop { interval.tick().await; @@ -93,6 +137,13 @@ impl DiskSpaceMetrics { self.available_disk_space.set(disk.available_space()); self.total_disk_space.set(disk.total_space()); + + // Produce and send a [`DiskSpaceSnapshot`] for any listeners + // that might exist. + _ = self.snapshot_tx.send(DiskSpaceSnapshot { + available_disk_space: disk.available_space(), + total_disk_space: disk.total_space(), + }); } } } @@ -103,6 +154,7 @@ mod tests { use metric::Metric; use tempfile::tempdir_in; + use test_helpers::timeout::FutureTimeout; use super::*; @@ -121,11 +173,9 @@ mod tests { let registry = Arc::new(metric::Registry::new()); - let _handle = tokio::spawn( - DiskSpaceMetrics::new(pathbuf, ®istry) - .expect("root always exists") - .run(), - ); + let (_handle, mut snapshot_rx) = + DiskSpaceMetrics::new(pathbuf, ®istry).expect("root always exists"); + let _handle = tokio::spawn(_handle.run()); // Wait for the metric to be emitted and non-zero - this should be very // quick! @@ -151,10 +201,45 @@ mod tests { .fetch(); if recorded_free_metric > 0 && recorded_total_metric > 0 { + snapshot_rx + .changed() + .with_timeout_panic(Duration::from_secs(5)) + .await + .expect("snapshot value should have changed"); + + let snapshot = *snapshot_rx.borrow(); + assert_eq!(snapshot.available_disk_space, recorded_free_metric); + assert_eq!(snapshot.total_disk_space, recorded_total_metric); + return; } tokio::time::sleep(Duration::from_millis(50)).await; } } + + // Token test to assert disk usage ratio + #[test] + fn assert_disk_usage_ratio() { + // 80% used + let snapshot = DiskSpaceSnapshot { + available_disk_space: 2000, + total_disk_space: 10000, + }; + assert_eq!(snapshot.disk_usage_ratio(), 0.8); + + // 90% used + let snapshot = DiskSpaceSnapshot { + available_disk_space: 2000, + total_disk_space: 20000, + }; + assert_eq!(snapshot.disk_usage_ratio(), 0.9); + + // Free! + let snapshot = DiskSpaceSnapshot { + available_disk_space: 42, + total_disk_space: 42, + }; + assert_eq!(snapshot.disk_usage_ratio(), 0.0); + } }