chore: back-port changes to shutdown code from enterprise (#26206)

* refactor: make ShutdownManager Clone

ShutdownManager can be clone since its underlying types from tokio are
all shareable via clone.

* refactor: make ShutdownToken not Clone

Alters the API so that the ShutdownToken is not cloneable. This will help
ensure that the Drop implementation is invoked from the correct place.
pull/26210/head
Trevor Hilton 2025-04-01 11:32:23 -04:00 committed by GitHub
parent d0ae82c54b
commit c7854363c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 22 deletions

View File

@ -2886,7 +2886,7 @@ async fn test_wal_overwritten() {
assert_contains!( assert_contains!(
result.to_string(), result.to_string(),
"another process as written to the WAL ahead of this one" "another process has written to the WAL ahead of this one"
); );
// give p1 some time to shutdown: // give p1 some time to shutdown:

View File

@ -14,12 +14,12 @@
//! be used to [`wait_for_shutdown`][ShutdownToken::wait_for_shutdown] to trigger component-specific //! be used to [`wait_for_shutdown`][ShutdownToken::wait_for_shutdown] to trigger component-specific
//! cleanup logic before signaling back via [`complete`][ShutdownToken::complete] to indicate that //! cleanup logic before signaling back via [`complete`][ShutdownToken::complete] to indicate that
//! shutdown can proceed. //! shutdown can proceed.
use std::sync::Arc;
use observability_deps::tracing::info; use observability_deps::tracing::info;
use parking_lot::Mutex; use parking_lot::Mutex;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio_util::{sync::CancellationToken, task::TaskTracker}; pub use tokio_util::sync::CancellationToken;
use tokio_util::task::TaskTracker;
/// Wait for a `SIGTERM` or `SIGINT` to stop the process on UNIX systems /// Wait for a `SIGTERM` or `SIGINT` to stop the process on UNIX systems
#[cfg(unix)] #[cfg(unix)]
@ -43,7 +43,9 @@ pub async fn wait_for_signal() {
} }
/// Manage application shutdown /// Manage application shutdown
#[derive(Debug)] ///
/// This deries `Clone`, as the underlying `tokio` types can be shared via clone.
#[derive(Debug, Clone)]
pub struct ShutdownManager { pub struct ShutdownManager {
frontend_shutdown: CancellationToken, frontend_shutdown: CancellationToken,
backend_shutdown: CancellationToken, backend_shutdown: CancellationToken,
@ -107,12 +109,13 @@ impl ShutdownManager {
/// A token that a component can obtain via [`register`][ShutdownManager::register] /// A token that a component can obtain via [`register`][ShutdownManager::register]
/// ///
/// This implements [`Clone`] so that a component that obtains it can make copies as needed for /// This does not implement `Clone` because there should only be a single instance of a given
/// sub-components or tasks that may be responsible for triggering a shutdown internally. /// `ShutdownToken`. If you just need a copy of the `CancellationToken` for invoking shutdown, use
#[derive(Debug, Clone)] /// [`ShutdownToken::clone_cancellation_token`].
#[derive(Debug)]
pub struct ShutdownToken { pub struct ShutdownToken {
token: CancellationToken, token: CancellationToken,
complete_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>, complete_tx: Mutex<Option<oneshot::Sender<()>>>,
} }
impl ShutdownToken { impl ShutdownToken {
@ -120,7 +123,7 @@ impl ShutdownToken {
fn new(token: CancellationToken, complete_tx: oneshot::Sender<()>) -> Self { fn new(token: CancellationToken, complete_tx: oneshot::Sender<()>) -> Self {
Self { Self {
token, token,
complete_tx: Arc::new(Mutex::new(Some(complete_tx))), complete_tx: Mutex::new(Some(complete_tx)),
} }
} }
@ -129,6 +132,11 @@ impl ShutdownToken {
self.token.cancel(); self.token.cancel();
} }
/// Get a clone of the cancellation token for triggering shutdown
pub fn clone_cancellation_token(&self) -> CancellationToken {
self.token.clone()
}
/// Future that completes when [`ShutdownManager`] that issued this token is shutdown /// Future that completes when [`ShutdownManager`] that issued this token is shutdown
pub async fn wait_for_shutdown(&self) { pub async fn wait_for_shutdown(&self) {
self.token.cancelled().await; self.token.cancelled().await;

View File

@ -8,7 +8,7 @@ use bytes::Bytes;
use data_types::Timestamp; use data_types::Timestamp;
use futures_util::stream::StreamExt; use futures_util::stream::StreamExt;
use hashbrown::HashMap; use hashbrown::HashMap;
use influxdb3_shutdown::ShutdownToken; use influxdb3_shutdown::{CancellationToken, ShutdownToken};
use iox_time::TimeProvider; use iox_time::TimeProvider;
use object_store::path::{Path, PathPart}; use object_store::path::{Path, PathPart};
use object_store::{ObjectStore, PutMode, PutOptions, PutPayload}; use object_store::{ObjectStore, PutMode, PutOptions, PutPayload};
@ -29,7 +29,7 @@ pub struct WalObjectStore {
/// number of snapshotted wal files to retain in object store /// number of snapshotted wal files to retain in object store
snapshotted_wal_files_to_keep: u64, snapshotted_wal_files_to_keep: u64,
wal_remover: WalFileRemover, wal_remover: WalFileRemover,
shutdown_token: ShutdownToken, shutdown_token: CancellationToken,
} }
impl WalObjectStore { impl WalObjectStore {
@ -61,7 +61,7 @@ impl WalObjectStore {
last_snapshot_sequence_number, last_snapshot_sequence_number,
&all_wal_file_paths, &all_wal_file_paths,
snapshotted_wal_files_to_keep, snapshotted_wal_files_to_keep,
shutdown.clone(), shutdown.clone_cancellation_token(),
); );
wal.replay(last_wal_sequence_number, &all_wal_file_paths) wal.replay(last_wal_sequence_number, &all_wal_file_paths)
@ -83,7 +83,7 @@ impl WalObjectStore {
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>, last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
all_wal_file_paths: &[Path], all_wal_file_paths: &[Path],
num_wal_files_to_keep: u64, num_wal_files_to_keep: u64,
shutdown_token: ShutdownToken, shutdown_token: CancellationToken,
) -> Self { ) -> Self {
let wal_file_sequence_number = last_wal_sequence_number.unwrap_or_default().next(); let wal_file_sequence_number = last_wal_sequence_number.unwrap_or_default().next();
let oldest_wal_file_num = oldest_wal_file_num(all_wal_file_paths); let oldest_wal_file_num = oldest_wal_file_num(all_wal_file_paths);
@ -322,7 +322,7 @@ impl WalObjectStore {
} }
// trigger application shutdown // trigger application shutdown
self.shutdown_token.trigger_shutdown(); self.shutdown_token.cancel();
return None; return None;
} }
@ -765,7 +765,7 @@ enum WalBufferState {
#[derive(Debug, thiserror::Error, Copy, Clone)] #[derive(Debug, thiserror::Error, Copy, Clone)]
enum WalBufferErrorState { enum WalBufferErrorState {
#[error("another process as written to the WAL ahead of this one")] #[error("another process has written to the WAL ahead of this one")]
WalAlreadyWrittenTo, WalAlreadyWrittenTo,
} }
@ -954,7 +954,6 @@ mod tests {
use async_trait::async_trait; use async_trait::async_trait;
use indexmap::IndexMap; use indexmap::IndexMap;
use influxdb3_id::{ColumnId, DbId, TableId}; use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_shutdown::ShutdownManager;
use iox_time::{MockProvider, Time}; use iox_time::{MockProvider, Time};
use object_store::memory::InMemory; use object_store::memory::InMemory;
use std::any::Any; use std::any::Any;
@ -983,7 +982,7 @@ mod tests {
None, None,
&paths, &paths,
1, 1,
ShutdownManager::new_testing().register(), CancellationToken::new(),
); );
let db_name: Arc<str> = "db1".into(); let db_name: Arc<str> = "db1".into();
@ -1195,7 +1194,7 @@ mod tests {
None, None,
&paths, &paths,
1, 1,
ShutdownManager::new_testing().register(), CancellationToken::new(),
); );
assert_eq!( assert_eq!(
replay_wal.load_existing_wal_file_paths( replay_wal.load_existing_wal_file_paths(
@ -1357,7 +1356,7 @@ mod tests {
None, None,
&paths, &paths,
1, 1,
ShutdownManager::new_testing().register(), CancellationToken::new(),
); );
assert_eq!( assert_eq!(
replay_wal replay_wal
@ -1405,7 +1404,7 @@ mod tests {
None, None,
&paths, &paths,
10, 10,
ShutdownManager::new_testing().register(), CancellationToken::new(),
); );
assert!(wal.flush_buffer(false).await.is_none()); assert!(wal.flush_buffer(false).await.is_none());
@ -1623,7 +1622,7 @@ mod tests {
None, None,
&[], &[],
1, 1,
ShutdownManager::new_testing().register(), CancellationToken::new(),
); );
{} {}
@ -1766,7 +1765,7 @@ mod tests {
Some(SnapshotSequenceNumber::new(10)), Some(SnapshotSequenceNumber::new(10)),
&all_paths, &all_paths,
10, 10,
ShutdownManager::new_testing().register(), CancellationToken::new(),
); );
let snapshot_details = SnapshotDetails { let snapshot_details = SnapshotDetails {