fix: Restart database background worker when it's restored

pull/24376/head
Carol (Nichols || Goulding) 2021-09-13 14:16:22 -04:00
parent 7c81c280cf
commit 81feced9d6
1 changed files with 28 additions and 9 deletions

View File

@ -122,7 +122,7 @@ pub enum WriteError {
#[derive(Debug)]
pub struct Database {
/// Future that resolves when the background worker exits
join: Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>,
join: Mutex<Shared<BoxFuture<'static, Result<(), Arc<JoinError>>>>>,
/// The state shared with the background worker
shared: Arc<DatabaseShared>,
@ -162,7 +162,7 @@ impl Database {
});
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
let join = handle.map_err(Arc::new).boxed().shared();
let join = Mutex::new(handle.map_err(Arc::new).boxed().shared());
Self { join, shared }
}
@ -283,18 +283,26 @@ impl Database {
info!(%db_name, "set database state to object store found");
}
// Restart the background worker to re-initialize the database
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
let mut join = self.join.lock();
*join = handle.map_err(Arc::new).boxed().shared();
let mut shutdown = shared.shutdown.lock();
*shutdown = Default::default();
Ok(())
}
/// Triggers shutdown of this `Database`
pub fn shutdown(&self) {
info!(db_name=%self.shared.config.name, "database shutting down");
self.shared.shutdown.cancel()
let shutdown = self.shared.shutdown.lock();
shutdown.cancel()
}
/// Waits for the background worker of this `Database` to exit
pub fn join(&self) -> impl Future<Output = Result<(), Arc<JoinError>>> {
self.join.clone()
self.join.lock().clone()
}
/// Returns the config of this database
@ -575,12 +583,13 @@ impl Database {
impl Drop for Database {
fn drop(&mut self) {
let db_name = &self.shared.config.name;
if !self.shared.shutdown.is_cancelled() {
let shutdown = self.shared.shutdown.lock();
if !shutdown.is_cancelled() {
warn!(%db_name, "database dropped without calling shutdown()");
self.shared.shutdown.cancel();
shutdown.cancel();
}
if self.join.clone().now_or_never().is_none() {
if self.join.lock().clone().now_or_never().is_none() {
warn!(%db_name, "database dropped without waiting for worker termination");
}
}
@ -593,7 +602,7 @@ struct DatabaseShared {
config: DatabaseConfig,
/// A token that is used to trigger shutdown of the background worker
shutdown: CancellationToken,
shutdown: Mutex<CancellationToken>,
/// Application-global state
application: Arc<ApplicationState>,
@ -743,7 +752,12 @@ async fn initialize_database(shared: &DatabaseShared) {
let db_name = &shared.config.name;
info!(%db_name, "database initialization started");
while !shared.shutdown.is_cancelled() {
let mut shutdown_cancelled = {
let s = shared.shutdown.lock();
s.is_cancelled()
};
while !shutdown_cancelled {
// Acquire locks and determine if work to be done
let maybe_transaction = {
let state = shared.state.read();
@ -830,6 +844,11 @@ async fn initialize_database(shared: &DatabaseShared) {
*state.unfreeze(handle) = next_state;
shared.state_notify.notify_waiters();
}
shutdown_cancelled = {
let s = shared.shutdown.lock();
s.is_cancelled()
};
}
}