diff --git a/influxdb3/tests/cli/mod.rs b/influxdb3/tests/cli/mod.rs index 4d2cc393c1..46d57ae9dc 100644 --- a/influxdb3/tests/cli/mod.rs +++ b/influxdb3/tests/cli/mod.rs @@ -2886,7 +2886,7 @@ async fn test_wal_overwritten() { assert_contains!( result.to_string(), - "another process as written to the WAL ahead of this one" + "another process has written to the WAL ahead of this one" ); // give p1 some time to shutdown: diff --git a/influxdb3_shutdown/src/lib.rs b/influxdb3_shutdown/src/lib.rs index bb0c9743dd..705d57abc0 100644 --- a/influxdb3_shutdown/src/lib.rs +++ b/influxdb3_shutdown/src/lib.rs @@ -14,12 +14,12 @@ //! be used to [`wait_for_shutdown`][ShutdownToken::wait_for_shutdown] to trigger component-specific //! cleanup logic before signaling back via [`complete`][ShutdownToken::complete] to indicate that //! shutdown can proceed. -use std::sync::Arc; use observability_deps::tracing::info; use parking_lot::Mutex; use tokio::sync::oneshot; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; +pub use tokio_util::sync::CancellationToken; +use tokio_util::task::TaskTracker; /// Wait for a `SIGTERM` or `SIGINT` to stop the process on UNIX systems #[cfg(unix)] @@ -43,7 +43,9 @@ pub async fn wait_for_signal() { } /// Manage application shutdown -#[derive(Debug)] +/// +/// This deries `Clone`, as the underlying `tokio` types can be shared via clone. +#[derive(Debug, Clone)] pub struct ShutdownManager { frontend_shutdown: CancellationToken, backend_shutdown: CancellationToken, @@ -107,12 +109,13 @@ impl ShutdownManager { /// A token that a component can obtain via [`register`][ShutdownManager::register] /// -/// This implements [`Clone`] so that a component that obtains it can make copies as needed for -/// sub-components or tasks that may be responsible for triggering a shutdown internally. -#[derive(Debug, Clone)] +/// This does not implement `Clone` because there should only be a single instance of a given +/// `ShutdownToken`. If you just need a copy of the `CancellationToken` for invoking shutdown, use +/// [`ShutdownToken::clone_cancellation_token`]. +#[derive(Debug)] pub struct ShutdownToken { token: CancellationToken, - complete_tx: Arc>>>, + complete_tx: Mutex>>, } impl ShutdownToken { @@ -120,7 +123,7 @@ impl ShutdownToken { fn new(token: CancellationToken, complete_tx: oneshot::Sender<()>) -> Self { Self { token, - complete_tx: Arc::new(Mutex::new(Some(complete_tx))), + complete_tx: Mutex::new(Some(complete_tx)), } } @@ -129,6 +132,11 @@ impl ShutdownToken { self.token.cancel(); } + /// Get a clone of the cancellation token for triggering shutdown + pub fn clone_cancellation_token(&self) -> CancellationToken { + self.token.clone() + } + /// Future that completes when [`ShutdownManager`] that issued this token is shutdown pub async fn wait_for_shutdown(&self) { self.token.cancelled().await; diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index df5f489f7a..2c878caa60 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -8,7 +8,7 @@ use bytes::Bytes; use data_types::Timestamp; use futures_util::stream::StreamExt; use hashbrown::HashMap; -use influxdb3_shutdown::ShutdownToken; +use influxdb3_shutdown::{CancellationToken, ShutdownToken}; use iox_time::TimeProvider; use object_store::path::{Path, PathPart}; use object_store::{ObjectStore, PutMode, PutOptions, PutPayload}; @@ -29,7 +29,7 @@ pub struct WalObjectStore { /// number of snapshotted wal files to retain in object store snapshotted_wal_files_to_keep: u64, wal_remover: WalFileRemover, - shutdown_token: ShutdownToken, + shutdown_token: CancellationToken, } impl WalObjectStore { @@ -61,7 +61,7 @@ impl WalObjectStore { last_snapshot_sequence_number, &all_wal_file_paths, snapshotted_wal_files_to_keep, - shutdown.clone(), + shutdown.clone_cancellation_token(), ); wal.replay(last_wal_sequence_number, &all_wal_file_paths) @@ -83,7 +83,7 @@ impl WalObjectStore { last_snapshot_sequence_number: Option, all_wal_file_paths: &[Path], num_wal_files_to_keep: u64, - shutdown_token: ShutdownToken, + shutdown_token: CancellationToken, ) -> Self { let wal_file_sequence_number = last_wal_sequence_number.unwrap_or_default().next(); let oldest_wal_file_num = oldest_wal_file_num(all_wal_file_paths); @@ -322,7 +322,7 @@ impl WalObjectStore { } // trigger application shutdown - self.shutdown_token.trigger_shutdown(); + self.shutdown_token.cancel(); return None; } @@ -765,7 +765,7 @@ enum WalBufferState { #[derive(Debug, thiserror::Error, Copy, Clone)] enum WalBufferErrorState { - #[error("another process as written to the WAL ahead of this one")] + #[error("another process has written to the WAL ahead of this one")] WalAlreadyWrittenTo, } @@ -954,7 +954,6 @@ mod tests { use async_trait::async_trait; use indexmap::IndexMap; use influxdb3_id::{ColumnId, DbId, TableId}; - use influxdb3_shutdown::ShutdownManager; use iox_time::{MockProvider, Time}; use object_store::memory::InMemory; use std::any::Any; @@ -983,7 +982,7 @@ mod tests { None, &paths, 1, - ShutdownManager::new_testing().register(), + CancellationToken::new(), ); let db_name: Arc = "db1".into(); @@ -1195,7 +1194,7 @@ mod tests { None, &paths, 1, - ShutdownManager::new_testing().register(), + CancellationToken::new(), ); assert_eq!( replay_wal.load_existing_wal_file_paths( @@ -1357,7 +1356,7 @@ mod tests { None, &paths, 1, - ShutdownManager::new_testing().register(), + CancellationToken::new(), ); assert_eq!( replay_wal @@ -1405,7 +1404,7 @@ mod tests { None, &paths, 10, - ShutdownManager::new_testing().register(), + CancellationToken::new(), ); assert!(wal.flush_buffer(false).await.is_none()); @@ -1623,7 +1622,7 @@ mod tests { None, &[], 1, - ShutdownManager::new_testing().register(), + CancellationToken::new(), ); {} @@ -1766,7 +1765,7 @@ mod tests { Some(SnapshotSequenceNumber::new(10)), &all_paths, 10, - ShutdownManager::new_testing().register(), + CancellationToken::new(), ); let snapshot_details = SnapshotDetails {