test: querier shutdown and background task handling (#3881)

This is similar to what we already have for the ingester.
pull/24376/head
Marco Neumann 2022-03-02 08:59:50 +00:00 committed by GitHub
parent 5f1095ef7d
commit 936f51013d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 186 additions and 3 deletions

1
Cargo.lock generated
View File

@ -3850,6 +3850,7 @@ dependencies = [
"observability_deps",
"parking_lot 0.12.0",
"parquet_file",
"pin-project",
"query",
"rand",
"schema",

View File

@ -20,6 +20,7 @@ object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
parquet_file = { path = "../parquet_file" }
pin-project = "1.0"
rand = "0.8.3"
thiserror = "1.0"
time = { path = "../time" }

View File

@ -7,12 +7,16 @@ use std::{
use backoff::{Backoff, BackoffConfig};
use iox_catalog::interface::{Catalog, NamespaceId};
use object_store::ObjectStore;
use observability_deps::tracing::info;
use observability_deps::tracing::{error, info};
use parking_lot::RwLock;
use time::TimeProvider;
use tokio_util::sync::CancellationToken;
use crate::{cache::CatalogCache, namespace::QuerierNamespace};
use crate::{
cache::CatalogCache,
namespace::QuerierNamespace,
poison::{PoisonCabinet, PoisonPill},
};
const SYNC_INTERVAL: Duration = Duration::from_secs(1);
@ -141,12 +145,20 @@ impl QuerierDatabase {
pub(crate) async fn database_sync_loop(
database: Arc<QuerierDatabase>,
shutdown: CancellationToken,
poison_cabinet: Arc<PoisonCabinet>,
) {
loop {
if shutdown.is_cancelled() {
info!("Database sync shutdown");
return;
}
if poison_cabinet.contains(&PoisonPill::DatabaseSyncPanic) {
panic!("Database sync poisened, panic");
}
if poison_cabinet.contains(&PoisonPill::DatabaseSyncExit) {
error!("Database sync poisened, exit early");
return;
}
database.sync().await;

View File

@ -15,7 +15,10 @@ use time::TimeProvider;
use tokio::task::{JoinError, JoinHandle};
use tokio_util::sync::CancellationToken;
use crate::database::{database_sync_loop, QuerierDatabase};
use crate::{
database::{database_sync_loop, QuerierDatabase},
poison::PoisonCabinet,
};
#[derive(Debug, Error)]
#[allow(missing_copy_implementations, missing_docs)]
@ -52,6 +55,9 @@ pub struct QuerierHandlerImpl {
/// A token that is used to trigger shutdown of the background worker
shutdown: CancellationToken,
/// Poison pills for testing.
poison_cabinet: Arc<PoisonCabinet>,
}
impl QuerierHandlerImpl {
@ -69,18 +75,21 @@ impl QuerierHandlerImpl {
time_provider,
));
let shutdown = CancellationToken::new();
let poison_cabinet = Arc::new(PoisonCabinet::new());
let join_handles = vec![(
String::from("database sync"),
shared_handle(tokio::spawn(database_sync_loop(
Arc::clone(&database),
shutdown.clone(),
Arc::clone(&poison_cabinet),
))),
)];
Self {
database,
join_handles,
shutdown,
poison_cabinet,
}
}
}
@ -118,3 +127,69 @@ impl Drop for QuerierHandlerImpl {
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use iox_catalog::mem::MemCatalog;
use time::{MockProvider, Time};
use crate::poison::PoisonPill;
use super::*;
#[tokio::test]
async fn test_shutdown() {
let querier = TestQuerier::new().querier;
// does not exit w/o shutdown
tokio::select! {
_ = querier.join() => panic!("querier finished w/o shutdown"),
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
};
querier.shutdown();
tokio::time::timeout(Duration::from_millis(1000), querier.join())
.await
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "Background worker 'database sync' exited early!")]
async fn test_supervise_database_sync_early_exit() {
let querier = TestQuerier::new().querier;
querier.poison_cabinet.add(PoisonPill::DatabaseSyncExit);
tokio::time::timeout(Duration::from_millis(2000), querier.join())
.await
.unwrap();
}
#[tokio::test]
#[should_panic(expected = "JoinError::Panic")]
async fn test_supervise_database_sync_panic() {
let querier = TestQuerier::new().querier;
querier.poison_cabinet.add(PoisonPill::DatabaseSyncPanic);
tokio::time::timeout(Duration::from_millis(2000), querier.join())
.await
.unwrap();
}
struct TestQuerier {
querier: QuerierHandlerImpl,
}
impl TestQuerier {
fn new() -> Self {
let metric_registry = Arc::new(metric::Registry::new());
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metric_registry)));
let object_store = Arc::new(ObjectStore::new_in_memory());
let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
let querier =
QuerierHandlerImpl::new(catalog, metric_registry, object_store, time_provider);
Self { querier }
}
}
}

View File

@ -20,6 +20,7 @@ mod database;
pub mod flight;
pub mod handler;
mod namespace;
mod poison;
pub mod server;
#[cfg(test)]

93
querier/src/poison.rs Normal file
View File

@ -0,0 +1,93 @@
use std::{
sync::Arc,
task::{Poll, Waker},
};
use futures::Future;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use pin_project::pin_project;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PoisonPill {
DatabaseSyncPanic,
DatabaseSyncExit,
}
#[derive(Debug)]
struct PoisonCabinetInner {
pills: Vec<PoisonPill>,
wait_list: Vec<Waker>,
}
impl PoisonCabinetInner {
/// Register a waker to be notified when a new pill is added
fn register_waker(&mut self, waker: &Waker) {
for wait_waker in &self.wait_list {
if wait_waker.will_wake(waker) {
return;
}
}
self.wait_list.push(waker.clone())
}
}
#[derive(Debug)]
pub struct PoisonCabinet {
inner: Arc<RwLock<PoisonCabinetInner>>,
}
impl PoisonCabinet {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(PoisonCabinetInner {
pills: Vec::with_capacity(0),
wait_list: Vec::with_capacity(0),
})),
}
}
pub fn add(&self, pill: PoisonPill) {
let mut inner = self.inner.write();
inner.pills.push(pill);
for waker in inner.wait_list.drain(..) {
waker.wake()
}
}
pub fn contains(&self, pill: &PoisonPill) -> bool {
let inner = self.inner.read();
inner.pills.contains(pill)
}
pub fn wait_for(&self, pill: PoisonPill) -> PoisonWait {
PoisonWait {
pill,
inner: Arc::clone(&self.inner),
}
}
}
#[pin_project]
pub struct PoisonWait {
pill: PoisonPill,
inner: Arc<RwLock<PoisonCabinetInner>>,
}
impl Future for PoisonWait {
type Output = ();
fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let inner = this.inner.upgradable_read();
if inner.pills.contains(this.pill) {
return Poll::Ready(());
}
let mut inner = RwLockUpgradableReadGuard::upgrade(inner);
inner.register_waker(cx.waker());
Poll::Pending
}
}