diff --git a/Cargo.lock b/Cargo.lock index e1365f2c91..14850b0c8e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2642,6 +2642,7 @@ dependencies = [ "test_helpers", "thiserror", "tokio", + "tokio-util", "tonic", "trace", "uuid", @@ -2895,6 +2896,7 @@ dependencies = [ "object_store", "parquet_file", "thiserror", + "tokio-util", "trace", "workspace-hack", ] @@ -2912,6 +2914,7 @@ dependencies = [ "observability_deps", "snafu", "tokio", + "tokio-util", "trace", "workspace-hack", ] @@ -2951,6 +2954,7 @@ dependencies = [ "metric", "object_store", "thiserror", + "tokio-util", "trace", "workspace-hack", "write_buffer", @@ -2962,6 +2966,7 @@ version = "0.1.0" dependencies = [ "async-trait", "clap_blocks", + "futures", "hyper", "ingester2", "iox_catalog", @@ -3001,6 +3006,7 @@ dependencies = [ "sharder", "thiserror", "tokio", + "tokio-util", "tonic", "trace", "workspace-hack", diff --git a/ingester2/Cargo.toml b/ingester2/Cargo.toml index 02f9156886..20417cd624 100644 --- a/ingester2/Cargo.toml +++ b/ingester2/Cargo.toml @@ -48,6 +48,7 @@ trace = { version = "0.1.0", path = "../trace" } uuid = "1.2.2" wal = { version = "0.1.0", path = "../wal" } workspace-hack = { path = "../workspace-hack"} +tokio-util = "0.7.4" [dev-dependencies] assert_matches = "1.5.0" diff --git a/ingester2/src/init.rs b/ingester2/src/init.rs index 7582d9d827..c5c07e83c7 100644 --- a/ingester2/src/init.rs +++ b/ingester2/src/init.rs @@ -20,6 +20,7 @@ use observability_deps::tracing::*; use parquet_file::storage::ParquetStorage; use thiserror::Error; use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use wal::Wal; use crate::{ @@ -198,7 +199,7 @@ pub async fn new( shutdown: F, ) -> Result, InitError> where - F: Future + Send + 'static, + F: Future + Send + 'static, { // Create the transition shard. let mut txn = catalog diff --git a/ingester2/src/init/graceful_shutdown.rs b/ingester2/src/init/graceful_shutdown.rs index 1f72456ce9..84b44df019 100644 --- a/ingester2/src/init/graceful_shutdown.rs +++ b/ingester2/src/init/graceful_shutdown.rs @@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration}; use futures::Future; use observability_deps::tracing::*; use tokio::sync::oneshot; +use tokio_util::sync::CancellationToken; use crate::{ ingest_state::{IngestState, IngestStateError}, @@ -40,11 +41,12 @@ pub(super) async fn graceful_shutdown_handler( persist: P, wal: Arc, ) where - F: Future + Send, + F: Future + Send, T: PartitionIter + Sync, P: PersistQueue + Clone, { - fut.await; + // Obtain the cancellation token that stops the RPC server. + let rpc_server_stop = fut.await; info!("gracefully stopping ingester"); // Reject RPC writes. @@ -118,6 +120,10 @@ pub(super) async fn graceful_shutdown_handler( } info!("persisted all data - stopping ingester"); + + // Stop the RPC server (and therefore stop accepting new queries) + rpc_server_stop.cancel(); + // And signal the ingester has stopped. let _ = complete.send(()); } @@ -197,9 +203,10 @@ mod tests { let (_tempdir, wal) = new_wal().await; let partition = new_partition(); + let rpc_stop = CancellationToken::new(); let (tx, rx) = oneshot::channel(); graceful_shutdown_handler( - ready(()), + ready(rpc_stop.clone()), tx, ingest_state, vec![Arc::clone(&partition)], @@ -213,6 +220,8 @@ mod tests { .await .expect("shutdown task panicked"); + assert!(rpc_stop.is_cancelled()); + // Assert the data was persisted let persist_calls = persist.calls(); assert_matches!(&*persist_calls, [p] => { @@ -238,9 +247,10 @@ mod tests { // Start the graceful shutdown job in another thread, as it SHOULD block // until the persist job is marked as complete. + let rpc_stop = CancellationToken::new(); let (tx, rx) = oneshot::channel(); let handle = tokio::spawn(graceful_shutdown_handler( - ready(()), + ready(rpc_stop.clone()), tx, ingest_state, vec![Arc::clone(&partition)], @@ -260,6 +270,10 @@ mod tests { let rx = rx.shared(); assert_matches!(futures::poll!(rx.clone()), Poll::Pending); + // And because the shutdown is still ongoing, the RPC server must not + // have been signalled to stop. + assert!(!rpc_stop.is_cancelled()); + // Mark the persist job as having completed, unblocking the shutdown // task. partition.lock().mark_persisted(persist_job); @@ -269,6 +283,8 @@ mod tests { .await .expect("shutdown task panicked"); + assert!(rpc_stop.is_cancelled()); + assert!(handle .with_timeout_panic(Duration::from_secs(1)) .await @@ -331,9 +347,10 @@ mod tests { // Start the graceful shutdown job in another thread, as it SHOULD block // until the persist job is marked as complete. + let rpc_stop = CancellationToken::new(); let (tx, rx) = oneshot::channel(); let handle = tokio::spawn(graceful_shutdown_handler( - ready(()), + ready(rpc_stop.clone()), tx, ingest_state, Arc::clone(&buffer), @@ -346,6 +363,8 @@ mod tests { .await .expect("shutdown task panicked"); + assert!(rpc_stop.is_cancelled()); + assert!(handle .with_timeout_panic(Duration::from_secs(1)) .await diff --git a/ioxd_common/src/lib.rs b/ioxd_common/src/lib.rs index 84b7c03fe7..3fa3701d0a 100644 --- a/ioxd_common/src/lib.rs +++ b/ioxd_common/src/lib.rs @@ -204,15 +204,24 @@ pub async fn serve( // registry, don't exit before HTTP and gRPC requests dependent on them while !grpc_server.is_terminated() && !http_server.is_terminated() { futures::select! { - _ = signal => info!(?server_type, "Shutdown requested"), + _ = signal => info!(?server_type, "shutdown requested"), _ = server_handle => { - error!(?server_type, "server worker shutdown prematurely"); + // If the frontend & backend stop together, the select! may + // choose to follow the "background has shutdown" signal instead + // of one of the frontend paths. + // + // This should not be a problem so long as the frontend has + // stopped. + if frontend_shutdown.is_cancelled() { + break; + } + error!(?server_type, "server worker shutdown before frontend"); res = res.and(Err(Error::LostServer)); }, result = grpc_server => match result { Ok(_) if frontend_shutdown.is_cancelled() => info!(?server_type, "gRPC server shutdown"), Ok(_) => { - error!(?server_type, "Early gRPC server exit"); + error!(?server_type, "early gRPC server exit"); res = res.and(Err(Error::LostRpc)); } Err(error) => { @@ -223,7 +232,7 @@ pub async fn serve( result = http_server => match result { Ok(_) if frontend_shutdown.is_cancelled() => info!(?server_type, "HTTP server shutdown"), Ok(_) => { - error!(?server_type, "Early HTTP server exit"); + error!(?server_type, "early HTTP server exit"); res = res.and(Err(Error::LostHttp)); } Err(error) => { @@ -233,11 +242,13 @@ pub async fn serve( }, } - frontend_shutdown.cancel() + // Delegate shutting down the frontend to the background shutdown + // handler, allowing it to sequence the stopping of the RPC/HTTP + // servers as needed. + server_type.shutdown(frontend_shutdown.clone()) } info!(?server_type, "frontend shutdown completed"); - server_type.shutdown(); if !server_handle.is_terminated() { server_handle.await; } diff --git a/ioxd_common/src/server_type.rs b/ioxd_common/src/server_type.rs index 7c4bd32077..ab479d98a5 100644 --- a/ioxd_common/src/server_type.rs +++ b/ioxd_common/src/server_type.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use hyper::{Body, Request, Response}; use metric::Registry; use snafu::Snafu; +use tokio_util::sync::CancellationToken; use trace::TraceCollector; pub use common_state::{CommonServerState, CommonServerStateError}; @@ -60,5 +61,9 @@ pub trait ServerType: std::fmt::Debug + Send + Sync + 'static { async fn join(self: Arc); /// Shutdown background worker. - fn shutdown(&self); + /// + /// The provided [`CancellationToken`] MUST be used by the background worker + /// to shutdown the "frontend" (HTTP & RPC servers) when appropriate - this + /// should happen before [`Self::join()`] returns. + fn shutdown(&self, frontend: CancellationToken); } diff --git a/ioxd_compactor/Cargo.toml b/ioxd_compactor/Cargo.toml index 0df3f5a6ac..75d60a4728 100644 --- a/ioxd_compactor/Cargo.toml +++ b/ioxd_compactor/Cargo.toml @@ -25,3 +25,4 @@ hyper = "0.14" thiserror = "1.0.38" workspace-hack = { path = "../workspace-hack"} parquet_file = { version = "0.1.0", path = "../parquet_file" } +tokio-util = "0.7.4" diff --git a/ioxd_compactor/src/lib.rs b/ioxd_compactor/src/lib.rs index ffa67aa945..84f512cd5f 100644 --- a/ioxd_compactor/src/lib.rs +++ b/ioxd_compactor/src/lib.rs @@ -24,6 +24,7 @@ use std::{ sync::Arc, }; use thiserror::Error; +use tokio_util::sync::CancellationToken; use trace::TraceCollector; #[derive(Debug, Error)] @@ -99,7 +100,8 @@ impl ServerType for CompactorSe self.server.join().await; } - fn shutdown(&self) { + fn shutdown(&self, frontend: CancellationToken) { + frontend.cancel(); self.server.shutdown(); } } diff --git a/ioxd_garbage_collector/Cargo.toml b/ioxd_garbage_collector/Cargo.toml index ac13716d3b..b0e4fbbf0a 100644 --- a/ioxd_garbage_collector/Cargo.toml +++ b/ioxd_garbage_collector/Cargo.toml @@ -17,3 +17,4 @@ snafu = "0.7" tokio = { version = "1", features = ["sync"] } trace = { path = "../trace" } workspace-hack = { path = "../workspace-hack"} +tokio-util = "0.7.4" diff --git a/ioxd_garbage_collector/src/lib.rs b/ioxd_garbage_collector/src/lib.rs index f009e2fad6..f64fac727e 100644 --- a/ioxd_garbage_collector/src/lib.rs +++ b/ioxd_garbage_collector/src/lib.rs @@ -35,6 +35,7 @@ use metric::Registry; use snafu::prelude::*; use std::{fmt::Debug, sync::Arc, time::Duration}; use tokio::{select, sync::broadcast, task::JoinError, time}; +use tokio_util::sync::CancellationToken; use trace::TraceCollector; pub use garbage_collector::{Config, SubConfig}; @@ -134,7 +135,8 @@ impl ServerType for Server { .unwrap_or_report(); } - fn shutdown(&self) { + fn shutdown(&self, frontend: CancellationToken) { + frontend.cancel(); self.shutdown_tx.send(()).ok(); } } diff --git a/ioxd_ingest_replica/Cargo.toml b/ioxd_ingest_replica/Cargo.toml index c8d621ad03..d36b66ba3d 100644 --- a/ioxd_ingest_replica/Cargo.toml +++ b/ioxd_ingest_replica/Cargo.toml @@ -17,6 +17,6 @@ metric = { path = "../metric" } parquet_file = { version = "0.1.0", path = "../parquet_file" } thiserror = "1.0.38" tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] } -tokio-util = { version = "0.7.4" } +tokio-util = "0.7.4" trace = { path = "../trace" } workspace-hack = { path = "../workspace-hack"} diff --git a/ioxd_ingest_replica/src/lib.rs b/ioxd_ingest_replica/src/lib.rs index 8b9e5b7465..ab643a08cc 100644 --- a/ioxd_ingest_replica/src/lib.rs +++ b/ioxd_ingest_replica/src/lib.rs @@ -101,7 +101,8 @@ impl ServerType self.shutdown.cancelled().await; } - fn shutdown(&self) { + fn shutdown(&self, frontend: CancellationToken) { + frontend.cancel(); self.shutdown.cancel(); } } diff --git a/ioxd_ingester/Cargo.toml b/ioxd_ingester/Cargo.toml index aaac4bba2b..c7fbfcf216 100644 --- a/ioxd_ingester/Cargo.toml +++ b/ioxd_ingester/Cargo.toml @@ -23,3 +23,4 @@ async-trait = "0.1" hyper = "0.14" thiserror = "1.0.38" workspace-hack = { path = "../workspace-hack"} +tokio-util = "0.7.4" diff --git a/ioxd_ingester/src/lib.rs b/ioxd_ingester/src/lib.rs index 6f9f91fb34..1b147cd13e 100644 --- a/ioxd_ingester/src/lib.rs +++ b/ioxd_ingester/src/lib.rs @@ -26,6 +26,7 @@ use std::{ time::Duration, }; use thiserror::Error; +use tokio_util::sync::CancellationToken; use trace::TraceCollector; #[derive(Debug, Error)] @@ -105,7 +106,8 @@ impl ServerType for IngesterSe self.server.join().await; } - fn shutdown(&self) { + fn shutdown(&self, frontend: CancellationToken) { + frontend.cancel(); self.server.shutdown(); } } diff --git a/ioxd_ingester2/Cargo.toml b/ioxd_ingester2/Cargo.toml index 573ba278ca..3d94bcc329 100644 --- a/ioxd_ingester2/Cargo.toml +++ b/ioxd_ingester2/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] # In alphabetical order async-trait = "0.1" clap_blocks = { path = "../clap_blocks" } +futures = "0.3.25" hyper = "0.14" ingester2 = { path = "../ingester2" } iox_catalog = { path = "../iox_catalog" } diff --git a/ioxd_ingester2/src/lib.rs b/ioxd_ingester2/src/lib.rs index 23584c2bfc..685c6cbe85 100644 --- a/ioxd_ingester2/src/lib.rs +++ b/ioxd_ingester2/src/lib.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use clap_blocks::ingester2::Ingester2Config; +use futures::FutureExt; use hyper::{Body, Request, Response}; use ingester2::{IngesterGuard, IngesterRpcInterface}; use iox_catalog::interface::Catalog; @@ -16,10 +17,11 @@ use metric::Registry; use parquet_file::storage::ParquetStorage; use std::{ fmt::{Debug, Display}, - sync::Arc, + sync::{Arc, Mutex}, time::Duration, }; use thiserror::Error; +use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use trace::TraceCollector; @@ -33,7 +35,7 @@ pub type Result = std::result::Result; struct IngesterServerType { server: IngesterGuard, - shutdown: CancellationToken, + shutdown: Mutex>>, metrics: Arc, trace_collector: Option>, max_simultaneous_queries: usize, @@ -45,11 +47,11 @@ impl IngesterServerType { metrics: Arc, common_state: &CommonServerState, max_simultaneous_queries: usize, - shutdown: CancellationToken, + shutdown: oneshot::Sender, ) -> Self { Self { server, - shutdown, + shutdown: Mutex::new(Some(shutdown)), metrics, trace_collector: common_state.trace_collector(), max_simultaneous_queries, @@ -105,8 +107,15 @@ impl ServerType for Ing self.server.join().await; } - fn shutdown(&self) { - self.shutdown.cancel(); + fn shutdown(&self, frontend: CancellationToken) { + if let Some(c) = self + .shutdown + .lock() + .expect("shutdown mutex poisoned") + .take() + { + let _ = c.send(frontend); + } } } @@ -149,7 +158,7 @@ pub async fn create_ingester_server_type( exec: Arc, object_store: ParquetStorage, ) -> Result> { - let shutdown = CancellationToken::new(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); let grpc = ingester2::new( catalog, @@ -162,10 +171,7 @@ pub async fn create_ingester_server_type( ingester_config.persist_queue_depth, ingester_config.persist_hot_partition_cost, object_store, - { - let shutdown = shutdown.clone(); - async move { shutdown.cancelled().await } - }, + shutdown_rx.map(|v| v.expect("shutdown sender dropped without calling shutdown")), ) .await?; @@ -174,6 +180,6 @@ pub async fn create_ingester_server_type( metrics, common_state, ingester_config.concurrent_query_limit, - shutdown, + shutdown_tx, ))) } diff --git a/ioxd_querier/Cargo.toml b/ioxd_querier/Cargo.toml index 80d6dd5757..bde8eeaa8d 100644 --- a/ioxd_querier/Cargo.toml +++ b/ioxd_querier/Cargo.toml @@ -32,6 +32,7 @@ tokio = { version = "1.24", features = ["macros", "net", "parking_lot", "rt-mult tonic = "0.8" workspace-hack = { path = "../workspace-hack"} parquet_file = { version = "0.1.0", path = "../parquet_file" } +tokio-util = "0.7.4" [dev-dependencies] # Workspace dependencies, in alphabetical order diff --git a/ioxd_querier/src/lib.rs b/ioxd_querier/src/lib.rs index bf5176fbc7..19a0add7b9 100644 --- a/ioxd_querier/src/lib.rs +++ b/ioxd_querier/src/lib.rs @@ -24,6 +24,7 @@ use std::{ }; use thiserror::Error; use tokio::runtime::Handle; +use tokio_util::sync::CancellationToken; use trace::TraceCollector; mod rpc; @@ -106,7 +107,8 @@ impl ServerType for QuerierServer self.server.join().await; } - fn shutdown(&self) { + fn shutdown(&self, frontend: CancellationToken) { + frontend.cancel(); self.server.shutdown(); } } diff --git a/ioxd_router/src/lib.rs b/ioxd_router/src/lib.rs index b5bbadbd9e..23eb27b113 100644 --- a/ioxd_router/src/lib.rs +++ b/ioxd_router/src/lib.rs @@ -141,7 +141,8 @@ where self.shutdown.cancelled().await; } - fn shutdown(&self) { + fn shutdown(&self, frontend: CancellationToken) { + frontend.cancel(); self.shutdown.cancel(); } } @@ -216,7 +217,8 @@ where self.shutdown.cancelled().await; } - fn shutdown(&self) { + fn shutdown(&self, frontend: CancellationToken) { + frontend.cancel(); self.shutdown.cancel(); } } diff --git a/ioxd_test/Cargo.toml b/ioxd_test/Cargo.toml index 2c9c8747b5..3e7ce2bc2c 100644 --- a/ioxd_test/Cargo.toml +++ b/ioxd_test/Cargo.toml @@ -16,5 +16,5 @@ async-trait = "0.1" clap = { version = "4", features = ["derive", "env"] } hyper = "0.14" snafu = "0.7" -tokio-util = { version = "0.7.4" } +tokio-util = "0.7.4" workspace-hack = { path = "../workspace-hack"} diff --git a/ioxd_test/src/lib.rs b/ioxd_test/src/lib.rs index 0d14a32f07..d15096c101 100644 --- a/ioxd_test/src/lib.rs +++ b/ioxd_test/src/lib.rs @@ -104,7 +104,8 @@ impl ServerType for TestServerType { self.shutdown.cancelled().await; } - fn shutdown(&self) { + fn shutdown(&self, frontend: CancellationToken) { + frontend.cancel(); self.shutdown.cancel(); } }