refactor: remove any callback or Action handling from InstrumentedDiskProtection.
Will be implemented later in stages.pull/24376/head
parent
09654542ec
commit
c2de92afd2
|
@ -45,40 +45,6 @@ impl DiskProtectionMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
/// Protective action taken, per each cycle of background task
|
||||
struct DiskProtectionAction {
|
||||
/// Function used to check if action should be triggered.
|
||||
trigger: Box<dyn FnMut(u64, DiskProtectionState) -> bool + Send + Sync>,
|
||||
/// Callback action taken.
|
||||
callback: Option<Box<dyn FnMut() + Send + Sync>>,
|
||||
/// Next state.
|
||||
next_state: DiskProtectionState,
|
||||
}
|
||||
|
||||
impl DiskProtectionAction {
|
||||
/// Perform the protection action as per the [`DiskProtectionAction`] contract.
|
||||
pub(crate) fn check_trigger(&mut self, measured: u64, curr_state: &Mutex<DiskProtectionState>) {
|
||||
let mut curr_state = curr_state.lock();
|
||||
match (&mut self.callback, (self.trigger)(measured, *curr_state)) {
|
||||
(None, _) => {}
|
||||
(Some(_), false) => {}
|
||||
(Some(callback), true) => {
|
||||
*curr_state = self.next_state;
|
||||
callback();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, PartialEq)]
|
||||
/// Current state of disk protection.
|
||||
enum DiskProtectionState {
|
||||
/// DiskProtection has activated by triggering the appropriate calllback.
|
||||
Activated,
|
||||
/// DiskProtection is not activated, but is still watching (and checking) the metrics.
|
||||
Watching,
|
||||
}
|
||||
|
||||
/// Disk Protection instrument.
|
||||
pub struct InstrumentedDiskProtection {
|
||||
/// How often to perform the disk protection check.
|
||||
|
@ -87,12 +53,6 @@ pub struct InstrumentedDiskProtection {
|
|||
metrics: DiskProtectionMetrics,
|
||||
/// The handle to terminate the background task.
|
||||
background_task: Mutex<Option<JoinHandle<()>>>,
|
||||
/// Current state of disk protection.
|
||||
state: Mutex<DiskProtectionState>,
|
||||
/// Callback triggered when disk protection is enacted.
|
||||
callback_on_protection_begin: tokio::sync::Mutex<DiskProtectionAction>,
|
||||
/// Callback triggered when disk protection has ended.
|
||||
callback_on_protection_end: tokio::sync::Mutex<DiskProtectionAction>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for InstrumentedDiskProtection {
|
||||
|
@ -107,9 +67,6 @@ impl InstrumentedDiskProtection {
|
|||
registry: &metric::Registry,
|
||||
attributes: impl Into<Attributes> + Send,
|
||||
interval_duration: Duration,
|
||||
disk_threshold: u64,
|
||||
callback_on_protection_begin: Option<Box<dyn FnMut() + Send + Sync>>,
|
||||
callback_on_protection_end: Option<Box<dyn FnMut() + Send + Sync>>,
|
||||
) -> Self {
|
||||
let metrics = DiskProtectionMetrics::new(registry, attributes);
|
||||
|
||||
|
@ -117,21 +74,6 @@ impl InstrumentedDiskProtection {
|
|||
interval_duration,
|
||||
metrics,
|
||||
background_task: Default::default(),
|
||||
state: Mutex::new(DiskProtectionState::Watching),
|
||||
callback_on_protection_begin: tokio::sync::Mutex::new(DiskProtectionAction {
|
||||
trigger: Box::new(move |curr_metric: u64, curr_state| {
|
||||
curr_metric <= disk_threshold && curr_state == DiskProtectionState::Watching
|
||||
}),
|
||||
callback: callback_on_protection_begin,
|
||||
next_state: DiskProtectionState::Activated,
|
||||
}),
|
||||
callback_on_protection_end: tokio::sync::Mutex::new(DiskProtectionAction {
|
||||
trigger: Box::new(move |curr_metric: u64, curr_state| {
|
||||
curr_metric > disk_threshold && curr_state == DiskProtectionState::Activated
|
||||
}),
|
||||
callback: callback_on_protection_end,
|
||||
next_state: DiskProtectionState::Watching,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -162,17 +104,7 @@ impl InstrumentedDiskProtection {
|
|||
|
||||
system.refresh_all();
|
||||
|
||||
// Protective actions based upon available_disk_percentage.
|
||||
let available_disk_percentage =
|
||||
self.metrics.measure_available_disk_space_percent(&system);
|
||||
self.callback_on_protection_begin
|
||||
.lock()
|
||||
.await
|
||||
.check_trigger(available_disk_percentage, &self.state);
|
||||
self.callback_on_protection_end
|
||||
.lock()
|
||||
.await
|
||||
.check_trigger(available_disk_percentage, &self.state);
|
||||
self.metrics.measure_available_disk_space_percent(&system);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -186,8 +118,6 @@ impl Drop for InstrumentedDiskProtection {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use metric::Metric;
|
||||
|
||||
use super::*;
|
||||
|
@ -201,14 +131,8 @@ mod tests {
|
|||
|
||||
impl MockAnyStruct {
|
||||
pub(crate) async fn new(registry: &metric::Registry, duration: Duration) -> Self {
|
||||
let disk_protection = InstrumentedDiskProtection::new(
|
||||
registry,
|
||||
&[("test", "mock")],
|
||||
duration,
|
||||
10_u64,
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let disk_protection =
|
||||
InstrumentedDiskProtection::new(registry, &[("test", "mock")], duration);
|
||||
disk_protection.start().await;
|
||||
|
||||
Self
|
||||
|
@ -228,55 +152,4 @@ mod tests {
|
|||
|
||||
assert!(recorded_metric > 0_u64);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_callback_is_triggered() {
|
||||
let registry = Arc::new(metric::Registry::new());
|
||||
let duration = Duration::from_secs(1);
|
||||
|
||||
struct MockStructTrackCallback {
|
||||
callback_triggered: AtomicBool,
|
||||
}
|
||||
|
||||
impl MockStructTrackCallback {
|
||||
pub(crate) async fn new(registry: &metric::Registry, duration: Duration) -> Arc<Self> {
|
||||
let mock = Arc::new(Self {
|
||||
callback_triggered: AtomicBool::new(false),
|
||||
});
|
||||
let mock_for_cb = Arc::clone(&mock);
|
||||
|
||||
let disk_protection = InstrumentedDiskProtection::new(
|
||||
registry,
|
||||
&[("test", "mock")],
|
||||
duration,
|
||||
100_u64,
|
||||
Some(Box::new(move || {
|
||||
mock_for_cb.callback();
|
||||
})),
|
||||
None,
|
||||
);
|
||||
disk_protection.start().await;
|
||||
|
||||
mock
|
||||
}
|
||||
|
||||
fn callback(&self) {
|
||||
self.callback_triggered.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
let mock = MockStructTrackCallback::new(®istry, duration).await;
|
||||
|
||||
tokio::time::sleep(2 * duration).await;
|
||||
|
||||
let recorded_metric = registry
|
||||
.get_instrument::<Metric<U64Gauge>>("disk_protection_free_disk_space")
|
||||
.expect("metric should exist")
|
||||
.get_observer(&Attributes::from(&[("test", "mock")]))
|
||||
.expect("metric should have labels")
|
||||
.fetch();
|
||||
|
||||
assert!(recorded_metric > 0_u64);
|
||||
assert!(mock.callback_triggered.load(Ordering::SeqCst));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue