Merge pull request #4044 from influxdata/dom/compactor-shutdown
fix: propagate shutdown into CompactorHandlerpull/24376/head
commit
943cc3731c
|
@ -15,7 +15,6 @@ use metric::Registry;
|
||||||
use object_store::DynObjectStore;
|
use object_store::DynObjectStore;
|
||||||
use query::exec::Executor;
|
use query::exec::Executor;
|
||||||
use time::TimeProvider;
|
use time::TimeProvider;
|
||||||
use tokio_util::sync::CancellationToken;
|
|
||||||
use trace::TraceCollector;
|
use trace::TraceCollector;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -27,7 +26,6 @@ use crate::{
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct CompactorServerType<C: CompactorHandler> {
|
pub struct CompactorServerType<C: CompactorHandler> {
|
||||||
server: CompactorServer<C>,
|
server: CompactorServer<C>,
|
||||||
shutdown: CancellationToken,
|
|
||||||
trace_collector: Option<Arc<dyn TraceCollector>>,
|
trace_collector: Option<Arc<dyn TraceCollector>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,7 +33,6 @@ impl<C: CompactorHandler> CompactorServerType<C> {
|
||||||
pub fn new(server: CompactorServer<C>, common_state: &CommonServerState) -> Self {
|
pub fn new(server: CompactorServer<C>, common_state: &CommonServerState) -> Self {
|
||||||
Self {
|
Self {
|
||||||
server,
|
server,
|
||||||
shutdown: CancellationToken::new(),
|
|
||||||
trace_collector: common_state.trace_collector(),
|
trace_collector: common_state.trace_collector(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -71,11 +68,11 @@ impl<C: CompactorHandler + std::fmt::Debug + 'static> ServerType for CompactorSe
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn join(self: Arc<Self>) {
|
async fn join(self: Arc<Self>) {
|
||||||
self.shutdown.cancelled().await;
|
self.server.join().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn shutdown(&self) {
|
fn shutdown(&self) {
|
||||||
self.shutdown.cancel();
|
self.server.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue