refactor: move the background task handler onto the parent IngesterGuard

* follow the pattern of the periodic wal rotation
* do NOT follow the pattern of the wal.flusher_task
pull/24376/head
wiedld 2023-07-05 13:13:13 -07:00
parent b4b89699cd
commit b961bc79c4
2 changed files with 21 additions and 34 deletions

View File

@ -99,6 +99,11 @@ pub struct IngesterGuard<T> {
/// Aborted on drop. /// Aborted on drop.
rotation_task: tokio::task::JoinHandle<()>, rotation_task: tokio::task::JoinHandle<()>,
/// The handle of the periodic disk protection task.
///
/// Aborted on drop.
disk_protection_task: tokio::task::JoinHandle<()>,
/// The task handle executing the graceful shutdown once triggered. /// The task handle executing the graceful shutdown once triggered.
graceful_shutdown_handler: tokio::task::JoinHandle<()>, graceful_shutdown_handler: tokio::task::JoinHandle<()>,
shutdown_complete: Shared<oneshot::Receiver<()>>, shutdown_complete: Shared<oneshot::Receiver<()>>,
@ -125,6 +130,7 @@ where
impl<T> Drop for IngesterGuard<T> { impl<T> Drop for IngesterGuard<T> {
fn drop(&mut self) { fn drop(&mut self) {
self.rotation_task.abort(); self.rotation_task.abort();
self.disk_protection_task.abort();
self.graceful_shutdown_handler.abort(); self.graceful_shutdown_handler.abort();
} }
} }
@ -332,7 +338,7 @@ where
.map_err(InitError::WalInit)?; .map_err(InitError::WalInit)?;
// Initialize the disk proetction after the WAL directory is initialized // Initialize the disk proetction after the WAL directory is initialized
let disk_protection = InstrumentedDiskProtection::new(wal_directory, &metrics); let disk_protection = InstrumentedDiskProtection::new(wal_directory, &metrics);
disk_protection.start().await; let disk_protection_task = disk_protection.start().await;
// Replay the WAL log files, if any. // Replay the WAL log files, if any.
let max_sequence_number = let max_sequence_number =
@ -411,6 +417,7 @@ where
persist_handle, persist_handle,
), ),
rotation_task, rotation_task,
disk_protection_task,
graceful_shutdown_handler: shutdown_task, graceful_shutdown_handler: shutdown_task,
shutdown_complete: shutdown_rx.shared(), shutdown_complete: shutdown_rx.shared(),
}) })

View File

@ -1,7 +1,6 @@
use std::{borrow::Cow, path::PathBuf, sync::Arc, time::Duration}; use std::{borrow::Cow, path::PathBuf, time::Duration};
use metric::{Attributes, U64Gauge}; use metric::{Attributes, U64Gauge};
use parking_lot::Mutex;
use sysinfo::{DiskExt, System, SystemExt}; use sysinfo::{DiskExt, System, SystemExt};
use tokio::{self, task::JoinHandle}; use tokio::{self, task::JoinHandle};
@ -66,8 +65,6 @@ impl DiskProtectionMetrics {
pub struct InstrumentedDiskProtection { pub struct InstrumentedDiskProtection {
/// The metrics that are reported to the registry. /// The metrics that are reported to the registry.
metrics: DiskProtectionMetrics, metrics: DiskProtectionMetrics,
/// The handle to terminate the background task.
background_task: Mutex<Option<JoinHandle<()>>>,
} }
impl std::fmt::Debug for InstrumentedDiskProtection { impl std::fmt::Debug for InstrumentedDiskProtection {
@ -81,27 +78,12 @@ impl InstrumentedDiskProtection {
pub fn new(directory_to_track: PathBuf, registry: &metric::Registry) -> Self { pub fn new(directory_to_track: PathBuf, registry: &metric::Registry) -> Self {
let metrics = DiskProtectionMetrics::new(directory_to_track, registry); let metrics = DiskProtectionMetrics::new(directory_to_track, registry);
Self { Self { metrics }
metrics,
background_task: Default::default(),
}
} }
/// Start the [`InstrumentedDiskProtection`] background task. /// Start the [`InstrumentedDiskProtection`] background task.
pub async fn start(self) { pub async fn start(self) -> JoinHandle<()> {
let rc_self = Arc::new(self); tokio::task::spawn(async move { self.background_task().await })
let rc_self_clone = Arc::clone(&rc_self);
*rc_self.background_task.lock() = Some(tokio::task::spawn(async move {
rc_self_clone.background_task().await
}));
}
/// Stop the [`InstrumentedDiskProtection`] background task.
pub fn stop(&mut self) {
if let Some(t) = self.background_task.lock().take() {
t.abort()
}
} }
/// The background task that periodically performs the disk protection check. /// The background task that periodically performs the disk protection check.
@ -118,15 +100,10 @@ impl InstrumentedDiskProtection {
} }
} }
impl Drop for InstrumentedDiskProtection {
fn drop(&mut self) {
// future-proof, such that stop does not need to be explicitly called.
self.stop();
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::sync::Arc;
use metric::Metric; use metric::Metric;
use super::*; use super::*;
@ -135,18 +112,20 @@ mod tests {
async fn test_metrics() { async fn test_metrics() {
let registry = Arc::new(metric::Registry::new()); let registry = Arc::new(metric::Registry::new());
struct MockAnyStruct; struct MockAnyStruct {
abort_handle: JoinHandle<()>,
}
impl MockAnyStruct { impl MockAnyStruct {
pub(crate) async fn new(registry: &metric::Registry) -> Self { pub(crate) async fn new(registry: &metric::Registry) -> Self {
let disk_protection = InstrumentedDiskProtection::new(PathBuf::from("/"), registry); let disk_protection = InstrumentedDiskProtection::new(PathBuf::from("/"), registry);
disk_protection.start().await; let abort_handle = disk_protection.start().await;
Self Self { abort_handle }
} }
} }
let _mock = MockAnyStruct::new(&registry).await; let mock = MockAnyStruct::new(&registry).await;
tokio::time::sleep(2 * Duration::from_secs(2)).await; tokio::time::sleep(2 * Duration::from_secs(2)).await;
@ -158,5 +137,6 @@ mod tests {
.fetch(); .fetch();
assert!(recorded_metric > 0_u64); assert!(recorded_metric > 0_u64);
mock.abort_handle.abort();
} }
} }