feat: handle graceful shutdown (#26197)

* feat: add influxdb3_shutdown crate

Provides basic wait methods for Unix and Windows operating systems.

* feat: graceful shutdown

* docs: add rust docs and test to influxdb3_shutdown

Added rustdoc comments to types and methods in the influxdb3_shutdown
crate as well as a test that shows the ordering of a shutdown.
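
At a glance, the shutdown ordering introduced here is: an OS signal (or an internal error) triggers backend shutdown, registered backend tasks finish their cleanup, and only then is the HTTP/gRPC frontend cancelled. A condensed sketch of that wiring, using the API added in `influxdb3_shutdown` (illustrative only, not a literal excerpt from the diff):

```rust
use influxdb3_shutdown::{ShutdownManager, wait_for_signal};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // Token used to stop the HTTP/gRPC frontend once the backend is done.
    let frontend_shutdown = CancellationToken::new();
    let shutdown_manager = ShutdownManager::new(frontend_shutdown.clone());

    // A backend component (e.g. the WAL flusher) registers for shutdown.
    let token = shutdown_manager.register();
    tokio::spawn(async move {
        token.wait_for_shutdown().await;
        // ... flush buffers, finish in-flight work ...
        token.complete();
    });

    // SIGINT/SIGTERM (ctrl+c on Windows) triggers backend shutdown.
    wait_for_signal().await;
    shutdown_manager.shutdown();

    // `join` resolves once every registered token has completed, then
    // cancels `frontend_shutdown` so the server can stop serving requests.
    shutdown_manager.join().await;
    assert!(frontend_shutdown.is_cancelled());
}
```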
Trevor Hilton 2025-03-31 09:58:40 -04:00 committed by GitHub
parent 87a54814ae
commit 9401137825
16 changed files with 397 additions and 58 deletions

Cargo.lock generated
View File

@ -2766,6 +2766,7 @@ dependencies = [
"influxdb3_process",
"influxdb3_processing_engine",
"influxdb3_server",
"influxdb3_shutdown",
"influxdb3_sys_events",
"influxdb3_telemetry",
"influxdb3_types",
@ -3021,6 +3022,7 @@ dependencies = [
"influxdb3_client",
"influxdb3_internal_api",
"influxdb3_py_api",
"influxdb3_shutdown",
"influxdb3_sys_events",
"influxdb3_types",
"influxdb3_wal",
@ -3104,6 +3106,7 @@ dependencies = [
"influxdb3_process",
"influxdb3_processing_engine",
"influxdb3_py_api",
"influxdb3_shutdown",
"influxdb3_sys_events",
"influxdb3_telemetry",
"influxdb3_types",
@ -3154,6 +3157,18 @@ dependencies = [
"urlencoding",
]
[[package]]
name = "influxdb3_shutdown"
version = "3.0.0-beta.2"
dependencies = [
"futures",
"futures-util",
"observability_deps",
"parking_lot",
"tokio",
"tokio-util",
]
[[package]]
name = "influxdb3_sys_events"
version = "3.0.0-beta.2"
@ -3239,6 +3254,7 @@ dependencies = [
"indexmap 2.7.0",
"influxdb-line-protocol",
"influxdb3_id",
"influxdb3_shutdown",
"iox_time",
"object_store",
"observability_deps",
@ -3281,6 +3297,7 @@ dependencies = [
"influxdb3_id",
"influxdb3_internal_api",
"influxdb3_py_api",
"influxdb3_shutdown",
"influxdb3_sys_events",
"influxdb3_telemetry",
"influxdb3_test_helpers",
@ -6550,6 +6567,8 @@ dependencies = [
"futures-core",
"futures-io",
"futures-sink",
"futures-util",
"hashbrown 0.14.5",
"pin-project-lite",
"tokio",
]

View File

@ -13,8 +13,10 @@ members = [
"influxdb3_processing_engine",
"influxdb3_py_api",
"influxdb3_server",
"influxdb3_shutdown",
"influxdb3_telemetry",
"influxdb3_test_helpers", "influxdb3_types",
"influxdb3_test_helpers",
"influxdb3_types",
"influxdb3_wal",
"influxdb3_write",
"iox_query_influxql_rewrite",
@ -120,7 +122,7 @@ tempfile = "3.14.0"
test-log = { version = "0.2.16", features = ["trace"] }
thiserror = "1.0"
tokio = { version = "1.43", features = ["full"] }
tokio-util = "0.7.13"
tokio-util = { version = "0.7.13", features = ["rt"] }
tonic = { version = "0.11.0", features = ["tls", "tls-roots"] }
tonic-build = "0.11.0"
tonic-health = "0.11.0"
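
The new `rt` feature on `tokio-util` is presumably needed because `influxdb3_shutdown` uses `tokio_util::task::TaskTracker` (see its `lib.rs` below), which is gated behind that feature. A minimal sketch of the primitive, under that assumption:

```rust
use tokio_util::task::TaskTracker;

#[tokio::main]
async fn main() {
    let tracker = TaskTracker::new();
    tracker.spawn(async {
        // background work
    });

    // After close(), no new tasks are accepted; wait() resolves once the
    // tracker is closed and all tracked tasks have finished.
    tracker.close();
    tracker.wait().await;
}
```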

View File

@ -30,11 +30,12 @@ influxdb3_clap_blocks = { path = "../influxdb3_clap_blocks" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_processing_engine = {path = "../influxdb3_processing_engine"}
influxdb3_server = { path = "../influxdb3_server" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
influxdb3_shutdown = { path = "../influxdb3_shutdown" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }
# Crates.io dependencies
anyhow.workspace = true
@ -42,6 +43,7 @@ backtrace.workspace = true
base64.workspace = true
clap.workspace = true
dotenvy.workspace = true
futures.workspace = true
hashbrown.workspace = true
hex.workspace = true
humantime.workspace = true

View File

@ -2,6 +2,7 @@
use anyhow::{Context, bail};
use datafusion_util::config::register_iox_object_store;
use futures::{FutureExt, future::FusedFuture, pin_mut};
use influxdb3_cache::{
distinct_cache::DistinctCacheProvider,
last_cache::{self, LastCacheProvider},
@ -32,6 +33,7 @@ use influxdb3_server::{
query_executor::{CreateQueryExecutorArgs, QueryExecutorImpl},
serve,
};
use influxdb3_shutdown::{ShutdownManager, wait_for_signal};
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
@ -105,6 +107,12 @@ pub enum Error {
#[error("failed to initialize distinct cache: {0:#}")]
InitializeDistinctCache(#[source] influxdb3_cache::distinct_cache::ProviderError),
#[error("lost backend")]
LostBackend,
#[error("lost HTTP/gRPC service")]
LostHttpGrpc,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -457,6 +465,7 @@ pub async fn command(config: Config) -> Result<()> {
// Construct a token to trigger clean shutdown
let frontend_shutdown = CancellationToken::new();
let shutdown_manager = ShutdownManager::new(frontend_shutdown.clone());
let time_provider = Arc::new(SystemProvider::new());
let sys_events_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider) as _));
@ -585,6 +594,7 @@ pub async fn command(config: Config) -> Result<()> {
metric_registry: Arc::clone(&metrics),
snapshotted_wal_files_to_keep: config.snapshotted_wal_files_to_keep,
query_file_limit: config.query_file_limit,
shutdown: shutdown_manager.register(),
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
@ -659,9 +669,85 @@ pub async fn command(config: Config) -> Result<()> {
} else {
builder.build().await
};
serve(server, frontend_shutdown, startup_timer).await?;
Ok(())
// There are two different select! macros - tokio::select and futures::select
//
// tokio::select takes ownership of the passed future "moving" it into the
// select block. This works well when not running select inside a loop, or
// when using a future that can be dropped and recreated, often the case
// with tokio's futures e.g. `channel.recv()`
//
// futures::select is more flexible as it doesn't take ownership of the provided
// future. However, to safely provide this it imposes some additional
// requirements
//
// All passed futures must implement FusedFuture - it is invalid to poll a future
// that has already returned Poll::Ready(_). A FusedFuture has an is_terminated()
// method that reports whether the future has completed and therefore must not be
// polled again. futures::select uses this to implement its
// functionality. futures::FutureExt adds a fuse() method that
// wraps an arbitrary future and makes it a FusedFuture
//
// The additional requirement of futures::select is that if the future passed
// outlives the select block, it must be Unpin or already Pinned
// Create the FusedFutures that will be waited on before exiting the process
let signal = wait_for_signal().fuse();
let frontend = serve(server, frontend_shutdown.clone(), startup_timer).fuse();
let backend = shutdown_manager.join().fuse();
// pin_mut constructs a Pin<&mut T> from a T by preventing moving the T
// from the current stack frame and constructing a Pin<&mut T> to it
pin_mut!(signal);
pin_mut!(frontend);
pin_mut!(backend);
let mut res = Ok(());
// Graceful shutdown can be triggered by sending SIGINT or SIGTERM to the
// process, or by a background task exiting - most likely with an error
while !frontend.is_terminated() {
futures::select! {
// External shutdown signal, e.g., `ctrl+c`
_ = signal => info!("shutdown requested"),
// `join` on the `ShutdownManager` has completed
_ = backend => {
// If something stops the backend, the frontend shutdown should already have
// been signaled, in which case we can break out of the loop here after
// checking that it has been cancelled.
//
// The select! could also pick this branch in the event that the frontend and
// backend stop at the same time. That shouldn't be an issue so long as the
// frontend has indeed stopped.
if frontend_shutdown.is_cancelled() {
break;
}
error!("backend shutdown before frontend");
res = res.and(Err(Error::LostBackend));
}
// HTTP/gRPC frontend has stopped
result = frontend => match result {
Ok(_) if frontend_shutdown.is_cancelled() => info!("HTTP/gRPC service shutdown"),
Ok(_) => {
error!("early HTTP/gRPC service exit");
res = res.and(Err(Error::LostHttpGrpc));
},
Err(error) => {
error!("HTTP/gRPC error");
res = res.and(Err(Error::Server(error)));
}
}
}
shutdown_manager.shutdown()
}
info!("frontend shutdown completed");
if !backend.is_terminated() {
backend.await;
}
info!("backend shutdown completed");
res
}
pub(crate) fn setup_processing_engine_env_manager(
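
As a standalone illustration of the `futures::select!` pattern described in the comment above: the futures are `fuse()`d so the loop can keep polling the branches that have not yet completed, and `pin_mut!` pins them on the stack because they outlive a single `select!` invocation. The futures here are hypothetical placeholders, not code from this change:

```rust
use futures::{FutureExt, future::FusedFuture, pin_mut};

#[tokio::main]
async fn main() {
    // Hypothetical stand-ins for the signal and frontend futures.
    let signal = async { /* e.g. wait for ctrl+c */ }.fuse();
    let frontend = async { /* e.g. serve requests */ }.fuse();

    // Pin on the stack so the same futures can be polled across iterations.
    pin_mut!(signal);
    pin_mut!(frontend);

    while !frontend.is_terminated() {
        futures::select! {
            _ = signal => println!("shutdown requested"),
            _ = frontend => println!("frontend exited"),
        }
    }
}
```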

View File

@ -22,6 +22,7 @@ influxdb3_client = { path = "../influxdb3_client" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_py_api = { path = "../influxdb3_py_api" }
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_shutdown = { path = "../influxdb3_shutdown"}
influxdb3_sys_events = { path = "../influxdb3_sys_events"}
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_write = { path = "../influxdb3_write" }

View File

@ -754,6 +754,7 @@ mod tests {
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::log::{TriggerSettings, TriggerSpecificationDefinition};
use influxdb3_internal_api::query_executor::UnimplementedQueryExecutor;
use influxdb3_shutdown::ShutdownManager;
use influxdb3_sys_events::SysEventStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::persister::Persister;
@ -993,6 +994,7 @@ mod tests {
)
.await
.unwrap();
let shutdown = ShutdownManager::new_testing();
let wbuf = WriteBufferImpl::new(WriteBufferImplArgs {
persister,
catalog: Arc::clone(&catalog),
@ -1005,6 +1007,7 @@ mod tests {
metric_registry: Arc::clone(&metric_registry),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: shutdown.register(),
})
.await
.unwrap();

View File

@ -41,13 +41,14 @@ influxdb3_id = { path = "../influxdb3_id" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_process = { path = "../influxdb3_process", default-features = false }
influxdb3_processing_engine = { path = "../influxdb3_processing_engine" }
influxdb3_types = { path = "../influxdb3_types"}
influxdb3_py_api = {path = "../influxdb3_py_api"}
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_shutdown = { path = "../influxdb3_shutdown" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_types = { path = "../influxdb3_types"}
influxdb3_wal = { path = "../influxdb3_wal"}
influxdb3_write = { path = "../influxdb3_write" }
iox_query_influxql_rewrite = { path = "../iox_query_influxql_rewrite" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
# crates.io Dependencies
anyhow.workspace = true

View File

@ -182,28 +182,6 @@ where
Ok(())
}
/// On unix platforms we want to intercept SIGINT and SIGTERM
/// This method returns if either are signalled
#[cfg(unix)]
pub async fn wait_for_signal() {
use observability_deps::tracing::info;
use tokio::signal::unix::{SignalKind, signal};
let mut term = signal(SignalKind::terminate()).expect("failed to register signal handler");
let mut int = signal(SignalKind::interrupt()).expect("failed to register signal handler");
tokio::select! {
_ = term.recv() => info!("Received SIGTERM"),
_ = int.recv() => info!("Received SIGINT"),
}
}
#[cfg(windows)]
/// ctrl_c is the cross-platform way to intercept the equivalent of SIGINT
/// This method returns if this occurs
pub async fn wait_for_signal() {
let _ = tokio::signal::ctrl_c().await;
}
#[cfg(test)]
mod tests {
use crate::auth::DefaultAuthorizer;
@ -219,6 +197,7 @@ mod tests {
use influxdb3_processing_engine::ProcessingEngineManagerImpl;
use influxdb3_processing_engine::environment::DisabledManager;
use influxdb3_processing_engine::plugins::ProcessingEngineEnvironmentManager;
use influxdb3_shutdown::ShutdownManager;
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::WalConfig;
@ -782,6 +761,8 @@ mod tests {
.await
.unwrap(),
);
let frontend_shutdown = CancellationToken::new();
let shutdown_manager = ShutdownManager::new(frontend_shutdown.clone());
let write_buffer_impl = influxdb3_write::write_buffer::WriteBufferImpl::new(
influxdb3_write::write_buffer::WriteBufferImplArgs {
persister: Arc::clone(&persister),
@ -802,6 +783,7 @@ mod tests {
metric_registry: Arc::clone(&metrics),
snapshotted_wal_files_to_keep: 100,
query_file_limit: None,
shutdown: shutdown_manager.register(),
},
)
.await
@ -863,7 +845,6 @@ mod tests {
.processing_engine(processing_engine)
.build()
.await;
let frontend_shutdown = CancellationToken::new();
let shutdown = frontend_shutdown.clone();
tokio::spawn(async move { serve(server, frontend_shutdown, server_start_time).await });

View File

@ -750,6 +750,7 @@ mod tests {
};
use influxdb3_catalog::catalog::Catalog;
use influxdb3_internal_api::query_executor::QueryExecutor;
use influxdb3_shutdown::ShutdownManager;
use influxdb3_sys_events::SysEventStore;
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
@ -822,6 +823,7 @@ mod tests {
.await
.unwrap(),
);
let shutdown = ShutdownManager::new_testing();
let write_buffer_impl = WriteBufferImpl::new(WriteBufferImplArgs {
persister,
catalog: Arc::clone(&catalog),
@ -846,6 +848,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 1,
query_file_limit,
shutdown: shutdown.register(),
})
.await
.unwrap();

View File

@ -0,0 +1,20 @@
[package]
name = "influxdb3_shutdown"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
# influxdb3_core dependencies
observability_deps.workspace = true
# crates.io dependencies
futures.workspace = true
futures-util.workspace = true
parking_lot.workspace = true
tokio.workspace = true
tokio-util.workspace = true
[lints]
workspace = true

View File

@ -0,0 +1,192 @@
//! Manage application shutdown
//!
//! This crate provides a set of types for managing graceful application shutdown.
//!
//! # Coordinate shutdown with the [`ShutdownManager`] type
//!
//! The [`ShutdownManager`] is used to coordinate shutdown of the process in an ordered fashion.
//! When a shutdown is signaled externally, e.g., `ctrl+c`, or internally by some error state, then
//! there may be processes running on the backend that need to be gracefully stopped before the
//! HTTP/gRPC frontend. For example, if the WAL has writes buffered, it needs to flush the buffer to
//! object store and respond to the write request before the HTTP/gRPC frontend is taken down.
//!
//! Components can [`register`][ShutdownManager::register] to receive a [`ShutdownToken`], which can
//! be used to [`wait_for_shutdown`][ShutdownToken::wait_for_shutdown] to trigger component-specific
//! cleanup logic before signaling back via [`complete`][ShutdownToken::complete] to indicate that
//! shutdown can proceed.
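//!
//! A minimal sketch of the component side (illustrative, not part of the original docs):
//!
//! ```ignore
//! let token = shutdown_manager.register();
//! tokio::spawn(async move {
//!     // Wake up when shutdown is triggered, clean up, then report back.
//!     token.wait_for_shutdown().await;
//!     // ... flush state, close connections ...
//!     token.complete();
//! });
//! ```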
use std::sync::Arc;
use observability_deps::tracing::info;
use parking_lot::Mutex;
use tokio::sync::oneshot;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
/// Wait for a `SIGTERM` or `SIGINT` to stop the process on UNIX systems
#[cfg(unix)]
pub async fn wait_for_signal() {
use tokio::signal::unix::{SignalKind, signal};
let mut term = signal(SignalKind::terminate()).expect("failed to register signal handler");
let mut int = signal(SignalKind::interrupt()).expect("failed to register signal handler");
tokio::select! {
_ = term.recv() => info!("Received SIGTERM"),
_ = int.recv() => info!("Received SIGINT"),
}
}
/// Wait for a `ctrl+c` to stop the process on Windows systems
#[cfg(windows)]
pub async fn wait_for_signal() {
let _ = tokio::signal::ctrl_c().await;
info!("Received SIGINT");
}
/// Manage application shutdown
#[derive(Debug)]
pub struct ShutdownManager {
frontend_shutdown: CancellationToken,
backend_shutdown: CancellationToken,
tasks: TaskTracker,
}
impl ShutdownManager {
/// Create a [`ShutdownManager`]
///
/// Accepts a [`CancellationToken`] which the `ShutdownManager` will signal cancellation to
/// after the backend has cleanly shut down.
pub fn new(frontend_shutdown: CancellationToken) -> Self {
Self {
frontend_shutdown,
backend_shutdown: CancellationToken::new(),
tasks: TaskTracker::new(),
}
}
/// Create a [`ShutdownManager`] for testing purposes
///
/// This handles creation of the frontend [`CancellationToken`] for tests where `tokio-util` is
/// not a dependency, or handling of the frontend shutdown is not necessary.
pub fn new_testing() -> Self {
Self {
frontend_shutdown: CancellationToken::new(),
backend_shutdown: CancellationToken::new(),
tasks: TaskTracker::new(),
}
}
/// Register a task that needs to perform work before the process may exit
///
/// Provides a [`ShutdownToken`] which the caller is responsible for handling. The caller must
/// invoke [`complete`][ShutdownToken::complete] in order for process shutdown to succeed.
pub fn register(&self) -> ShutdownToken {
let (tx, rx) = oneshot::channel();
self.tasks.spawn(rx);
ShutdownToken::new(self.backend_shutdown.clone(), tx)
}
/// Waits for registered tasks to complete before signaling shutdown to frontend
///
/// The future returned will complete when all registered tasks have signaled completion via
/// [`complete`][ShutdownToken::complete]
pub async fn join(&self) {
self.tasks.close();
self.tasks.wait().await;
self.frontend_shutdown.cancel();
}
/// Invoke application shutdown
///
/// This will signal backend shutdown and wake the
/// [`wait_for_shutdown`][ShutdownToken::wait_for_shutdown] future so that registered tasks can
/// clean up before indicating completion.
pub fn shutdown(&self) {
self.backend_shutdown.cancel();
}
}
/// A token that a component can obtain via [`register`][ShutdownManager::register]
///
/// This implements [`Clone`] so that a component that obtains it can make copies as needed for
/// sub-components or tasks that may be responsible for triggering a shutdown internally.
#[derive(Debug, Clone)]
pub struct ShutdownToken {
token: CancellationToken,
complete_tx: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
impl ShutdownToken {
/// Create a new [`ShutdownToken`]
fn new(token: CancellationToken, complete_tx: oneshot::Sender<()>) -> Self {
Self {
token,
complete_tx: Arc::new(Mutex::new(Some(complete_tx))),
}
}
/// Trigger application shutdown due to some unrecoverable state
pub fn trigger_shutdown(&self) {
self.token.cancel();
}
/// Future that completes when the [`ShutdownManager`] that issued this token is shut down
pub async fn wait_for_shutdown(&self) {
self.token.cancelled().await;
}
/// Signal back to the [`ShutdownManager`] that the component that owns this token is finished
/// cleaning up and it is safe for the process to exit
pub fn complete(&self) {
if let Some(s) = self.complete_tx.lock().take() {
let _ = s.send(());
}
}
}
#[cfg(test)]
mod tests {
use std::{
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use futures::FutureExt;
use tokio_util::sync::CancellationToken;
use crate::ShutdownManager;
#[tokio::test]
async fn test_shutdown_order() {
let frontend_token = CancellationToken::new();
let shutdown_manager = ShutdownManager::new(frontend_token.clone());
static CLEAN: AtomicBool = AtomicBool::new(false);
let token = shutdown_manager.register();
tokio::spawn(async move {
loop {
futures::select! {
_ = token.wait_for_shutdown().fuse() => {
CLEAN.store(true, Ordering::SeqCst);
token.complete();
break;
}
_ = tokio::time::sleep(Duration::from_millis(10)).fuse() => {
// sleeping... 😴
}
}
}
});
shutdown_manager.shutdown();
shutdown_manager.join().await;
assert!(
CLEAN.load(Ordering::SeqCst),
"backend shutdown did not complete"
);
assert!(
frontend_token.is_cancelled(),
"frontend shutdown was not triggered"
);
}
}

View File

@ -15,6 +15,7 @@ schema.workspace = true
# Local Crates
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_shutdown = { path = "../influxdb3_shutdown" }
# crates.io dependencies
async-trait.workspace = true

View File

@ -15,6 +15,7 @@ use indexmap::IndexMap;
use influxdb_line_protocol::FieldValue;
use influxdb_line_protocol::v3::SeriesValue;
use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId};
use influxdb3_shutdown::ShutdownToken;
use iox_time::Time;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@ -544,30 +545,38 @@ pub struct SnapshotDetails {
pub fn background_wal_flush<W: Wal>(
wal: Arc<W>,
flush_interval: Duration,
shutdown: ShutdownToken,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(flush_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
        loop {
-            interval.tick().await;
-            let cleanup_after_snapshot = wal.flush_buffer().await;
-            // handle snapshot cleanup outside of the flush loop
-            if let Some((snapshot_complete, snapshot_info, snapshot_permit)) =
-                cleanup_after_snapshot
-            {
-                let snapshot_wal = Arc::clone(&wal);
-                tokio::spawn(async move {
-                    let snapshot_details = snapshot_complete.await.expect("snapshot failed");
-                    assert_eq!(snapshot_info, snapshot_details);
-                    snapshot_wal
-                        .cleanup_snapshot(snapshot_info, snapshot_permit)
-                        .await;
-                });
-            }
+            tokio::select! {
+                _ = shutdown.wait_for_shutdown() => {
+                    wal.shutdown().await;
+                    break;
+                },
+                _ = interval.tick() => {
+                    let cleanup_after_snapshot = wal.flush_buffer().await;
+                    // handle snapshot cleanup outside of the flush loop
+                    if let Some((snapshot_complete, snapshot_info, snapshot_permit)) =
+                        cleanup_after_snapshot
+                    {
+                        let snapshot_wal = Arc::clone(&wal);
+                        tokio::spawn(async move {
+                            let snapshot_details = snapshot_complete.await.expect("snapshot failed");
+                            assert_eq!(snapshot_info, snapshot_details);
+                            snapshot_wal
+                                .cleanup_snapshot(snapshot_info, snapshot_permit)
+                                .await;
+                        });
+                    }
+                }
+            }
        }
+        shutdown.complete();
})
}

View File

@ -8,6 +8,7 @@ use bytes::Bytes;
use data_types::Timestamp;
use futures_util::stream::StreamExt;
use hashbrown::HashMap;
use influxdb3_shutdown::ShutdownToken;
use iox_time::TimeProvider;
use object_store::path::{Path, PathPart};
use object_store::{ObjectStore, PutPayload};
@ -43,13 +44,14 @@ impl WalObjectStore {
last_wal_sequence_number: Option<WalFileSequenceNumber>,
last_snapshot_sequence_number: Option<SnapshotSequenceNumber>,
snapshotted_wal_files_to_keep: u64,
shutdown: ShutdownToken,
) -> Result<Arc<Self>, crate::Error> {
let node_identifier = node_identifier_prefix.into();
let all_wal_file_paths =
load_all_wal_file_paths(Arc::clone(&object_store), node_identifier.clone()).await?;
let flush_interval = config.flush_interval;
let wal = Self::new_without_replay(
time_provider,
Arc::clone(&time_provider),
object_store,
node_identifier,
file_notifier,
@ -63,7 +65,7 @@ impl WalObjectStore {
wal.replay(last_wal_sequence_number, &all_wal_file_paths)
.await?;
let wal = Arc::new(wal);
background_wal_flush(Arc::clone(&wal), flush_interval);
background_wal_flush(Arc::clone(&wal), flush_interval, shutdown);
Ok(wal)
}
@ -212,7 +214,7 @@ impl WalObjectStore {
}
/// Stop accepting write operations, flush buffered writes to a WAL file, and return when done.
pub async fn shutdown(&mut self) {
pub async fn shutdown_inner(&self) {
// stop accepting writes
self.flush_buffer.lock().await.wal_buffer.is_shutdown = true;
@ -566,7 +568,7 @@ impl Wal for WalObjectStore {
}
async fn shutdown(&self) {
self.shutdown().await
self.shutdown_inner().await
}
fn add_file_notifier(&self, notifier: Arc<dyn WalFileNotifier>) {
@ -661,7 +663,7 @@ impl FlushBuffer {
// swap out the filled buffer with a new one
let mut new_buffer = WalBuffer {
time_provider: Arc::clone(&self.time_provider),
is_shutdown: false,
is_shutdown: self.wal_buffer.is_shutdown,
wal_file_sequence_number: self.wal_buffer.wal_file_sequence_number.next(),
op_limit: self.wal_buffer.op_limit,
op_count: 0,
@ -715,6 +717,9 @@ pub enum WriteResult {
impl WalBuffer {
fn write_ops_unconfirmed(&mut self, ops: Vec<WalOp>) -> crate::Result<(), crate::Error> {
if self.is_shutdown {
return Err(crate::Error::Shutdown);
}
if self.op_count >= self.op_limit {
return Err(crate::Error::BufferFull(self.op_count));
}
@ -754,6 +759,9 @@ impl WalBuffer {
ops: Vec<WalOp>,
response: oneshot::Sender<WriteResult>,
) -> crate::Result<(), crate::Error> {
if self.is_shutdown {
return Err(crate::Error::Shutdown);
}
self.write_op_responses.push(response);
self.write_ops_unconfirmed(ops)?;

View File

@ -29,12 +29,13 @@ influxdb3_cache = { path = "../influxdb3_cache" }
influxdb3_catalog = { path = "../influxdb3_catalog" }
influxdb3_id = { path = "../influxdb3_id" }
influxdb3_internal_api = { path = "../influxdb3_internal_api" }
influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
influxdb3_wal = { path = "../influxdb3_wal" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_py_api = {path = "../influxdb3_py_api"}
influxdb3_sys_events = { path = "../influxdb3_sys_events" }
influxdb3_shutdown = { path = "../influxdb3_shutdown" }
influxdb3_telemetry = { path = "../influxdb3_telemetry" }
influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
influxdb3_types = { path = "../influxdb3_types" }
influxdb3_wal = { path = "../influxdb3_wal" }
# crates.io dependencies
anyhow.workspace = true

View File

@ -4,6 +4,7 @@ mod metrics;
pub mod persisted_files;
pub mod queryable_buffer;
mod table_buffer;
use influxdb3_shutdown::ShutdownToken;
use tokio::sync::{oneshot, watch::Receiver};
use trace::span::{MetaValue, SpanRecorder};
pub mod validator;
@ -177,6 +178,7 @@ pub struct WriteBufferImplArgs {
pub metric_registry: Arc<Registry>,
pub snapshotted_wal_files_to_keep: u64,
pub query_file_limit: Option<usize>,
pub shutdown: ShutdownToken,
}
impl WriteBufferImpl {
@ -193,6 +195,7 @@ impl WriteBufferImpl {
metric_registry,
snapshotted_wal_files_to_keep,
query_file_limit,
shutdown,
}: WriteBufferImplArgs,
) -> Result<Arc<Self>> {
// load snapshots and replay the wal into the in memory buffer
@ -240,6 +243,7 @@ impl WriteBufferImpl {
last_wal_sequence_number,
last_snapshot_sequence_number,
snapshotted_wal_files_to_keep,
shutdown,
)
.await?;
@ -655,6 +659,7 @@ mod tests {
use influxdb3_catalog::catalog::CatalogSequenceNumber;
use influxdb3_catalog::log::FieldDataType;
use influxdb3_id::{ColumnId, DbId, ParquetFileId};
use influxdb3_shutdown::ShutdownManager;
use influxdb3_test_helpers::object_store::RequestCountedObjectStore;
use influxdb3_types::http::LastCacheSize;
use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
@ -761,6 +766,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
})
.await
.unwrap();
@ -867,6 +873,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
})
.await
.unwrap();
@ -957,6 +964,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
})
.await
.unwrap()
@ -1214,6 +1222,7 @@ mod tests {
metric_registry: Default::default(),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
})
.await
.unwrap();
@ -3090,6 +3099,7 @@ mod tests {
metric_registry: Arc::clone(&metric_registry),
snapshotted_wal_files_to_keep: 10,
query_file_limit: None,
shutdown: ShutdownManager::new_testing().register(),
})
.await
.unwrap();