refactor: delegate frontend shutdown to backend
Prior to this commit, the (happy path) shutdown sequence of an IOx process was hard-coded to:

    1. Stop the gRPC & HTTP servers
    2. Stop the backend server (i.e. ingester2)

After this commit, the execution of step 1 is delegated to the handler for step 2; the server implementation (router / ingester / querier / etc.) now chooses when to shut down the RPC & HTTP servers.

This allows the server shutdown delegate to correctly sequence the shutdown of all components of the IOx server. In particular, ingester2 can now correctly sequence the shutdown of the query RPC server w.r.t. the graceful stop & persist, ensuring queries continue to be serviced while buffered data is persisted.
parent 7e3bb25815
commit 0d111c4672
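In essence, the change replaces an unconditional "stop frontend, then stop backend" sequence with a contract in which the backend is handed the frontend's stop token. Below is a minimal, hypothetical sketch of that contract (simplified names, assuming only the tokio and tokio-util crates; the real trait is `ServerType`, changed in the diff below):

```rust
use tokio_util::sync::CancellationToken;

/// Stand-in for the `ServerType` trait: the backend MUST cancel
/// `frontend` (stopping the HTTP & gRPC servers) at whatever point in
/// its own shutdown sequence it deems appropriate.
trait Backend {
    fn shutdown(&self, frontend: CancellationToken);
}

/// Stand-in for ingester2: persist buffered data while queries are
/// still served, and only then stop the query RPC server.
struct Ingester;

impl Backend for Ingester {
    fn shutdown(&self, frontend: CancellationToken) {
        // ... graceful stop & persist would run here ...
        frontend.cancel(); // now stop the RPC/HTTP frontend
    }
}

#[tokio::main]
async fn main() {
    let frontend = CancellationToken::new();
    // The runner no longer cancels the token itself; it hands it to the
    // backend and waits for the backend to do so.
    Ingester.shutdown(frontend.clone());
    frontend.cancelled().await;
    println!("frontend stopped by the backend");
}
```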

@@ -2642,6 +2642,7 @@ dependencies = [
  "test_helpers",
  "thiserror",
  "tokio",
+ "tokio-util",
  "tonic",
  "trace",
  "uuid",
@@ -2895,6 +2896,7 @@ dependencies = [
  "object_store",
  "parquet_file",
  "thiserror",
+ "tokio-util",
  "trace",
  "workspace-hack",
 ]
@@ -2912,6 +2914,7 @@ dependencies = [
  "observability_deps",
  "snafu",
  "tokio",
+ "tokio-util",
  "trace",
  "workspace-hack",
 ]
@@ -2951,6 +2954,7 @@ dependencies = [
  "metric",
  "object_store",
  "thiserror",
+ "tokio-util",
  "trace",
  "workspace-hack",
  "write_buffer",
@@ -2962,6 +2966,7 @@ version = "0.1.0"
 dependencies = [
  "async-trait",
  "clap_blocks",
+ "futures",
  "hyper",
  "ingester2",
  "iox_catalog",
@@ -3001,6 +3006,7 @@ dependencies = [
  "sharder",
  "thiserror",
  "tokio",
+ "tokio-util",
  "tonic",
  "trace",
  "workspace-hack",

@@ -48,6 +48,7 @@ trace = { version = "0.1.0", path = "../trace" }
 uuid = "1.2.2"
 wal = { version = "0.1.0", path = "../wal" }
 workspace-hack = { path = "../workspace-hack"}
+tokio-util = "0.7.4"
 
 [dev-dependencies]
 assert_matches = "1.5.0"

@@ -20,6 +20,7 @@ use observability_deps::tracing::*;
 use parquet_file::storage::ParquetStorage;
 use thiserror::Error;
 use tokio::sync::oneshot;
+use tokio_util::sync::CancellationToken;
 use wal::Wal;
 
 use crate::{
@@ -198,7 +199,7 @@ pub async fn new<F>(
     shutdown: F,
 ) -> Result<IngesterGuard<impl IngesterRpcInterface>, InitError>
 where
-    F: Future<Output = ()> + Send + 'static,
+    F: Future<Output = CancellationToken> + Send + 'static,
 {
     // Create the transition shard.
     let mut txn = catalog

@@ -3,6 +3,7 @@ use std::{sync::Arc, time::Duration};
 use futures::Future;
 use observability_deps::tracing::*;
 use tokio::sync::oneshot;
+use tokio_util::sync::CancellationToken;
 
 use crate::{
     ingest_state::{IngestState, IngestStateError},
@@ -40,11 +41,12 @@ pub(super) async fn graceful_shutdown_handler<F, T, P>(
     persist: P,
     wal: Arc<wal::Wal>,
 ) where
-    F: Future<Output = ()> + Send,
+    F: Future<Output = CancellationToken> + Send,
     T: PartitionIter + Sync,
     P: PersistQueue + Clone,
 {
-    fut.await;
+    // Obtain the cancellation token that stops the RPC server.
+    let rpc_server_stop = fut.await;
     info!("gracefully stopping ingester");
 
     // Reject RPC writes.
@@ -118,6 +120,10 @@ pub(super) async fn graceful_shutdown_handler<F, T, P>(
     }
 
     info!("persisted all data - stopping ingester");
 
+    // Stop the RPC server (and therefore stop accepting new queries)
+    rpc_server_stop.cancel();
+
+    // And signal the ingester has stopped.
     let _ = complete.send(());
 }
 
@@ -197,9 +203,10 @@ mod tests {
         let (_tempdir, wal) = new_wal().await;
         let partition = new_partition();
 
+        let rpc_stop = CancellationToken::new();
         let (tx, rx) = oneshot::channel();
         graceful_shutdown_handler(
-            ready(()),
+            ready(rpc_stop.clone()),
             tx,
             ingest_state,
             vec![Arc::clone(&partition)],
@@ -213,6 +220,8 @@ mod tests {
             .await
             .expect("shutdown task panicked");
 
+        assert!(rpc_stop.is_cancelled());
+
         // Assert the data was persisted
         let persist_calls = persist.calls();
         assert_matches!(&*persist_calls, [p] => {
@@ -238,9 +247,10 @@ mod tests {
 
         // Start the graceful shutdown job in another thread, as it SHOULD block
        // until the persist job is marked as complete.
+        let rpc_stop = CancellationToken::new();
         let (tx, rx) = oneshot::channel();
         let handle = tokio::spawn(graceful_shutdown_handler(
-            ready(()),
+            ready(rpc_stop.clone()),
             tx,
             ingest_state,
             vec![Arc::clone(&partition)],
@@ -260,6 +270,10 @@ mod tests {
         let rx = rx.shared();
         assert_matches!(futures::poll!(rx.clone()), Poll::Pending);
 
+        // And because the shutdown is still ongoing, the RPC server must not
+        // have been signalled to stop.
+        assert!(!rpc_stop.is_cancelled());
+
         // Mark the persist job as having completed, unblocking the shutdown
         // task.
         partition.lock().mark_persisted(persist_job);
@@ -269,6 +283,8 @@ mod tests {
             .await
             .expect("shutdown task panicked");
 
+        assert!(rpc_stop.is_cancelled());
+
         assert!(handle
             .with_timeout_panic(Duration::from_secs(1))
             .await
@@ -331,9 +347,10 @@ mod tests {
 
         // Start the graceful shutdown job in another thread, as it SHOULD block
         // until the persist job is marked as complete.
+        let rpc_stop = CancellationToken::new();
         let (tx, rx) = oneshot::channel();
         let handle = tokio::spawn(graceful_shutdown_handler(
-            ready(()),
+            ready(rpc_stop.clone()),
             tx,
             ingest_state,
             Arc::clone(&buffer),
@@ -346,6 +363,8 @@ mod tests {
             .await
             .expect("shutdown task panicked");
 
+        assert!(rpc_stop.is_cancelled());
+
         assert!(handle
             .with_timeout_panic(Duration::from_secs(1))
             .await

@@ -204,15 +204,24 @@ pub async fn serve(
     // registry, don't exit before HTTP and gRPC requests dependent on them
     while !grpc_server.is_terminated() && !http_server.is_terminated() {
         futures::select! {
-            _ = signal => info!(?server_type, "Shutdown requested"),
+            _ = signal => info!(?server_type, "shutdown requested"),
             _ = server_handle => {
-                error!(?server_type, "server worker shutdown prematurely");
+                // If the frontend & backend stop together, the select! may
+                // choose to follow the "background has shutdown" signal instead
+                // of one of the frontend paths.
+                //
+                // This should not be a problem so long as the frontend has
+                // stopped.
+                if frontend_shutdown.is_cancelled() {
+                    break;
+                }
+                error!(?server_type, "server worker shutdown before frontend");
                 res = res.and(Err(Error::LostServer));
             },
             result = grpc_server => match result {
                 Ok(_) if frontend_shutdown.is_cancelled() => info!(?server_type, "gRPC server shutdown"),
                 Ok(_) => {
-                    error!(?server_type, "Early gRPC server exit");
+                    error!(?server_type, "early gRPC server exit");
                     res = res.and(Err(Error::LostRpc));
                 }
                 Err(error) => {
@@ -223,7 +232,7 @@ pub async fn serve(
             result = http_server => match result {
                 Ok(_) if frontend_shutdown.is_cancelled() => info!(?server_type, "HTTP server shutdown"),
                 Ok(_) => {
-                    error!(?server_type, "Early HTTP server exit");
+                    error!(?server_type, "early HTTP server exit");
                     res = res.and(Err(Error::LostHttp));
                 }
                 Err(error) => {
@@ -233,11 +242,13 @@ pub async fn serve(
             },
         }
 
-        frontend_shutdown.cancel()
+        // Delegate shutting down the frontend to the background shutdown
+        // handler, allowing it to sequence the stopping of the RPC/HTTP
+        // servers as needed.
+        server_type.shutdown(frontend_shutdown.clone())
     }
     info!(?server_type, "frontend shutdown completed");
 
-    server_type.shutdown();
     if !server_handle.is_terminated() {
         server_handle.await;
     }

@@ -6,6 +6,7 @@ use async_trait::async_trait;
 use hyper::{Body, Request, Response};
 use metric::Registry;
 use snafu::Snafu;
+use tokio_util::sync::CancellationToken;
 use trace::TraceCollector;
 
 pub use common_state::{CommonServerState, CommonServerStateError};
@@ -60,5 +61,9 @@ pub trait ServerType: std::fmt::Debug + Send + Sync + 'static {
     async fn join(self: Arc<Self>);
 
     /// Shutdown background worker.
-    fn shutdown(&self);
+    ///
+    /// The provided [`CancellationToken`] MUST be used by the background worker
+    /// to shutdown the "frontend" (HTTP & RPC servers) when appropriate - this
+    /// should happen before [`Self::join()`] returns.
+    fn shutdown(&self, frontend: CancellationToken);
 }

@@ -25,3 +25,4 @@ hyper = "0.14"
 thiserror = "1.0.38"
 workspace-hack = { path = "../workspace-hack"}
 parquet_file = { version = "0.1.0", path = "../parquet_file" }
+tokio-util = "0.7.4"

@@ -24,6 +24,7 @@ use std::{
     sync::Arc,
 };
 use thiserror::Error;
+use tokio_util::sync::CancellationToken;
 use trace::TraceCollector;
 
 #[derive(Debug, Error)]
@@ -99,7 +100,8 @@ impl<C: CompactorHandler + std::fmt::Debug + 'static> ServerType for CompactorSe
         self.server.join().await;
     }
 
-    fn shutdown(&self) {
+    fn shutdown(&self, frontend: CancellationToken) {
+        frontend.cancel();
         self.server.shutdown();
     }
 }

@@ -17,3 +17,4 @@ snafu = "0.7"
 tokio = { version = "1", features = ["sync"] }
 trace = { path = "../trace" }
 workspace-hack = { path = "../workspace-hack"}
+tokio-util = "0.7.4"

@@ -35,6 +35,7 @@ use metric::Registry;
 use snafu::prelude::*;
 use std::{fmt::Debug, sync::Arc, time::Duration};
 use tokio::{select, sync::broadcast, task::JoinError, time};
+use tokio_util::sync::CancellationToken;
 use trace::TraceCollector;
 
 pub use garbage_collector::{Config, SubConfig};
@@ -134,7 +135,8 @@ impl ServerType for Server {
             .unwrap_or_report();
     }
 
-    fn shutdown(&self) {
+    fn shutdown(&self, frontend: CancellationToken) {
+        frontend.cancel();
         self.shutdown_tx.send(()).ok();
     }
 }

@@ -17,6 +17,6 @@ metric = { path = "../metric" }
 parquet_file = { version = "0.1.0", path = "../parquet_file" }
 thiserror = "1.0.38"
 tokio = { version = "1.22", features = ["macros", "net", "parking_lot", "rt-multi-thread", "signal", "sync", "time"] }
-tokio-util = { version = "0.7.4" }
+tokio-util = "0.7.4"
 trace = { path = "../trace" }
 workspace-hack = { path = "../workspace-hack"}

@@ -101,7 +101,8 @@ impl<I: IngestReplicaRpcInterface + Sync + Send + Debug + 'static> ServerType
         self.shutdown.cancelled().await;
     }
 
-    fn shutdown(&self) {
+    fn shutdown(&self, frontend: CancellationToken) {
+        frontend.cancel();
         self.shutdown.cancel();
     }
 }

@@ -23,3 +23,4 @@ async-trait = "0.1"
 hyper = "0.14"
 thiserror = "1.0.38"
 workspace-hack = { path = "../workspace-hack"}
+tokio-util = "0.7.4"

@@ -26,6 +26,7 @@ use std::{
     time::Duration,
 };
 use thiserror::Error;
+use tokio_util::sync::CancellationToken;
 use trace::TraceCollector;
 
 #[derive(Debug, Error)]
@@ -105,7 +106,8 @@ impl<I: IngestHandler + Sync + Send + Debug + 'static> ServerType for IngesterSe
         self.server.join().await;
     }
 
-    fn shutdown(&self) {
+    fn shutdown(&self, frontend: CancellationToken) {
+        frontend.cancel();
         self.server.shutdown();
     }
 }

@@ -8,6 +8,7 @@ license.workspace = true
 [dependencies] # In alphabetical order
 async-trait = "0.1"
 clap_blocks = { path = "../clap_blocks" }
+futures = "0.3.25"
 hyper = "0.14"
 ingester2 = { path = "../ingester2" }
 iox_catalog = { path = "../iox_catalog" }

@@ -1,5 +1,6 @@
 use async_trait::async_trait;
 use clap_blocks::ingester2::Ingester2Config;
+use futures::FutureExt;
 use hyper::{Body, Request, Response};
 use ingester2::{IngesterGuard, IngesterRpcInterface};
 use iox_catalog::interface::Catalog;
@@ -16,10 +17,11 @@ use metric::Registry;
 use parquet_file::storage::ParquetStorage;
 use std::{
     fmt::{Debug, Display},
-    sync::Arc,
+    sync::{Arc, Mutex},
     time::Duration,
 };
 use thiserror::Error;
+use tokio::sync::oneshot;
 use tokio_util::sync::CancellationToken;
 use trace::TraceCollector;
 
@@ -33,7 +35,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 struct IngesterServerType<I: IngesterRpcInterface> {
     server: IngesterGuard<I>,
-    shutdown: CancellationToken,
+    shutdown: Mutex<Option<oneshot::Sender<CancellationToken>>>,
     metrics: Arc<Registry>,
     trace_collector: Option<Arc<dyn TraceCollector>>,
     max_simultaneous_queries: usize,
@@ -45,11 +47,11 @@ impl<I: IngesterRpcInterface> IngesterServerType<I> {
         metrics: Arc<Registry>,
         common_state: &CommonServerState,
         max_simultaneous_queries: usize,
-        shutdown: CancellationToken,
+        shutdown: oneshot::Sender<CancellationToken>,
     ) -> Self {
         Self {
             server,
-            shutdown,
+            shutdown: Mutex::new(Some(shutdown)),
             metrics,
             trace_collector: common_state.trace_collector(),
             max_simultaneous_queries,
@@ -105,8 +107,15 @@ impl<I: IngesterRpcInterface + Sync + Send + Debug + 'static> ServerType for Ing
         self.server.join().await;
     }
 
-    fn shutdown(&self) {
-        self.shutdown.cancel();
+    fn shutdown(&self, frontend: CancellationToken) {
+        if let Some(c) = self
+            .shutdown
+            .lock()
+            .expect("shutdown mutex poisoned")
+            .take()
+        {
+            let _ = c.send(frontend);
+        }
     }
 }
 
@@ -149,7 +158,7 @@ pub async fn create_ingester_server_type(
     exec: Arc<Executor>,
     object_store: ParquetStorage,
 ) -> Result<Arc<dyn ServerType>> {
-    let shutdown = CancellationToken::new();
+    let (shutdown_tx, shutdown_rx) = oneshot::channel();
 
     let grpc = ingester2::new(
         catalog,
@@ -162,10 +171,7 @@ pub async fn create_ingester_server_type(
         ingester_config.persist_queue_depth,
         ingester_config.persist_hot_partition_cost,
         object_store,
-        {
-            let shutdown = shutdown.clone();
-            async move { shutdown.cancelled().await }
-        },
+        shutdown_rx.map(|v| v.expect("shutdown sender dropped without calling shutdown")),
     )
     .await?;
 
@@ -174,6 +180,6 @@ pub async fn create_ingester_server_type(
         metrics,
         common_state,
         ingester_config.concurrent_query_limit,
-        shutdown,
+        shutdown_tx,
     )))
 }

@@ -32,6 +32,7 @@ tokio = { version = "1.24", features = ["macros", "net", "parking_lot", "rt-mult
 tonic = "0.8"
 workspace-hack = { path = "../workspace-hack"}
 parquet_file = { version = "0.1.0", path = "../parquet_file" }
+tokio-util = "0.7.4"
 
 [dev-dependencies]
 # Workspace dependencies, in alphabetical order

@@ -24,6 +24,7 @@ use std::{
 };
 use thiserror::Error;
 use tokio::runtime::Handle;
+use tokio_util::sync::CancellationToken;
 use trace::TraceCollector;
 
 mod rpc;
@@ -106,7 +107,8 @@ impl<C: QuerierHandler + std::fmt::Debug + 'static> ServerType for QuerierServer
         self.server.join().await;
     }
 
-    fn shutdown(&self) {
+    fn shutdown(&self, frontend: CancellationToken) {
+        frontend.cancel();
         self.server.shutdown();
     }
 }

@@ -141,7 +141,8 @@ where
         self.shutdown.cancelled().await;
     }
 
-    fn shutdown(&self) {
+    fn shutdown(&self, frontend: CancellationToken) {
+        frontend.cancel();
         self.shutdown.cancel();
     }
 }
@@ -216,7 +217,8 @@ where
         self.shutdown.cancelled().await;
     }
 
-    fn shutdown(&self) {
+    fn shutdown(&self, frontend: CancellationToken) {
+        frontend.cancel();
         self.shutdown.cancel();
     }
 }

@@ -16,5 +16,5 @@ async-trait = "0.1"
 clap = { version = "4", features = ["derive", "env"] }
 hyper = "0.14"
 snafu = "0.7"
-tokio-util = { version = "0.7.4" }
+tokio-util = "0.7.4"
 workspace-hack = { path = "../workspace-hack"}

@@ -104,7 +104,8 @@ impl ServerType for TestServerType {
         self.shutdown.cancelled().await;
     }
 
-    fn shutdown(&self) {
+    fn shutdown(&self, frontend: CancellationToken) {
+        frontend.cancel();
        self.shutdown.cancel();
     }
 }
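
For the ingester2 wiring in the diff above, the following is a minimal, hypothetical reconstruction of the mechanism (simplified names; assumes only the tokio, tokio-util, and futures crates, as added in this commit): `shutdown()` forwards the frontend token through a oneshot channel, and the graceful-shutdown future resolves to that token once shutdown is requested.

```rust
use std::future::Future;

use futures::FutureExt;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

/// Stand-in for ingester2's graceful shutdown handler: it resolves the
/// provided future to obtain the frontend stop token, persists, and
/// only then cancels the token.
async fn graceful_shutdown_handler<F>(fut: F)
where
    F: Future<Output = CancellationToken> + Send,
{
    let rpc_server_stop = fut.await;
    // ... persist all buffered data here; queries are still served ...
    rpc_server_stop.cancel(); // only now stop the query RPC server
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<CancellationToken>();

    // The handler receives a future resolving to the token, mirroring
    // the `shutdown_rx.map(...)` expression passed to `ingester2::new`.
    let handler = tokio::spawn(graceful_shutdown_handler(
        rx.map(|v| v.expect("shutdown sender dropped without calling shutdown")),
    ));

    // Later, `ServerType::shutdown(frontend)` sends the token through
    // the channel, as `IngesterServerType::shutdown` does above.
    let frontend = CancellationToken::new();
    tx.send(frontend.clone()).expect("handler dropped");

    handler.await.unwrap();
    assert!(frontend.is_cancelled());
}
```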