From 10a51bac6153b2c3b7bda30908ed74f68a42ab0f Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 13 Dec 2021 12:20:34 +0100 Subject: [PATCH 01/10] refactor: allow database init to recover from errors Closes #3335. --- server/src/database.rs | 381 ++++++++++++++++++++++++++--------------- 1 file changed, 242 insertions(+), 139 deletions(-) diff --git a/server/src/database.rs b/server/src/database.rs index c69ced3fc7..9f69f30adb 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -23,6 +23,7 @@ use observability_deps::tracing::{error, info, warn}; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard}; use parquet_catalog::core::{PreservedCatalog, PreservedCatalogConfig}; use persistence_windows::checkpoint::ReplayPlan; +use rand::{thread_rng, Rng}; use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{future::Future, sync::Arc, time::Duration}; use time::Time; @@ -38,7 +39,7 @@ macro_rules! error_state { DatabaseState::$variant(state, _) => state.clone(), state => { return InvalidState { - db_name: &$s.shared.config.name, + db_name: &$s.shared.config.read().name, state: state.state_code(), transition: $transition, } @@ -49,6 +50,7 @@ macro_rules! error_state { } const INIT_BACKOFF: Duration = Duration::from_secs(1); +const MAX_BACKOFF: Duration = Duration::from_secs(500); #[derive(Debug, Snafu)] pub enum Error { @@ -176,7 +178,7 @@ impl Database { ); let shared = Arc::new(DatabaseShared { - config, + config: RwLock::new(config), application, shutdown: Default::default(), state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))), @@ -234,7 +236,7 @@ impl Database { /// Release this database from this server. pub async fn release(&self) -> Result { - let db_name = &self.shared.config.name; + let db_name = self.name(); let handle = self.shared.state.read().freeze(); let handle = handle.await; @@ -350,13 +352,14 @@ impl Database { /// Triggers shutdown of this `Database` pub fn shutdown(&self) { - info!(db_name=%self.shared.config.name, "database shutting down"); + let db_name = self.name(); + info!(%db_name, "database shutting down"); self.shared.shutdown.cancel() } /// Triggers a restart of this `Database` and wait for it to re-initialize pub async fn restart(&self) -> Result<(), Arc> { - let db_name = &self.shared.config.name; + let db_name = self.name(); info!(%db_name, "restarting database"); let handle = self.shared.state.read().freeze(); @@ -379,8 +382,8 @@ impl Database { } /// Returns the config of this database - pub fn config(&self) -> &DatabaseConfig { - &self.shared.config + pub fn config(&self) -> DatabaseConfig { + self.shared.config.read().clone() } /// Returns the initialization status of this database @@ -420,7 +423,12 @@ impl Database { /// Location in object store; may not actually exist yet pub fn location(&self) -> String { - self.shared.config.location.clone() + self.shared.config.read().location.clone() + } + + /// Database name + pub fn name(&self) -> DatabaseName<'static> { + self.shared.config.read().name.clone() } /// Update the database rules, panic'ing if the state is invalid @@ -538,7 +546,7 @@ impl Database { let handle = self.shared.state.read().freeze(); let handle = handle.await; - let db_name = &self.shared.config.name; + let db_name = self.name(); let current_state = error_state!(self, "WipePreservedCatalog", CatalogLoadError); let registry = self.shared.application.job_registry(); @@ -550,8 +558,6 @@ impl Database { tokio::spawn( async move { - let db_name = &shared.config.name; - PreservedCatalog::wipe(¤t_state.iox_object_store) .await .map_err(Box::new) @@ -577,7 +583,7 @@ impl Database { let handle = self.shared.state.read().freeze(); let handle = handle.await; - let db_name = &self.shared.config.name; + let db_name = self.name(); { // If the force flag is not specified, can only rebuild @@ -600,15 +606,13 @@ impl Database { let shared = Arc::clone(&self.shared); let iox_object_store = self.iox_object_store().context(InvalidStateForRebuild { - db_name, + db_name: &db_name, state: shared.state.read().state_code(), expected: "Object store initialized", })?; tokio::spawn( async move { - let db_name = &shared.config.name; - // shutdown / stop the DB if it is running so it can't // be read / written to, while also preventing // anything else from driving the state machine @@ -636,7 +640,7 @@ impl Database { ensure!( matches!(&**state, DatabaseState::Known(_)), UnexpectedTransitionForRebuild { - db_name, + db_name: &db_name, state: state.state_code() } ); @@ -648,7 +652,7 @@ impl Database { PreservedCatalog::wipe(iox_object_store.as_ref()) .await .map_err(Box::new) - .context(WipePreservedCatalog { db_name })?; + .context(WipePreservedCatalog { db_name: &db_name })?; let config = PreservedCatalogConfig::new( Arc::clone(&iox_object_store), @@ -658,7 +662,7 @@ impl Database { parquet_catalog::rebuild::rebuild_catalog(config, false) .await .map_err(Box::new) - .context(RebuildPreservedCatalog { db_name })?; + .context(RebuildPreservedCatalog { db_name: &db_name })?; // Double check the state hasn't changed (we hold the // freeze handle to make sure it does not) @@ -676,22 +680,32 @@ impl Database { /// Recover from a ReplayError by skipping replay pub async fn skip_replay(&self) -> Result<(), Error> { - let db_name = &self.shared.config.name; + self.shared.config.write().skip_replay = true; - let handle = self.shared.state.read().freeze(); - let handle = handle.await; + match &**self.shared.state.read() { + DatabaseState::ReplayError(_, _) | DatabaseState::RulesLoaded(_) => {} + state => { + return InvalidState { + db_name: &self.shared.config.read().name, + state: state.state_code(), + transition: "SkipReplay", + } + .fail() + } + } - let mut current_state = error_state!(self, "SkipReplay", ReplayError); + // wait for DB to leave a potential `ReplayError` state + loop { + // Register interest before checking to avoid race + let notify = self.shared.state_notify.notified(); - current_state.replay_plan = Arc::new(None); - let current_state = current_state - .advance(self.shared.as_ref()) - .await - .map_err(Box::new) - .context(SkipReplay { db_name })?; + match &**self.shared.state.read() { + DatabaseState::ReplayError(_, _) | DatabaseState::RulesLoaded(_) => {} + _ => break, + } - let mut state = self.shared.state.write(); - *state.unfreeze(handle) = DatabaseState::Initialized(current_state); + notify.await; + } Ok(()) } @@ -699,7 +713,7 @@ impl Database { impl Drop for Database { fn drop(&mut self) { - let db_name = &self.shared.config.name; + let db_name = self.name(); if !self.shared.shutdown.is_cancelled() { warn!(%db_name, "database dropped without calling shutdown()"); self.shared.shutdown.cancel(); @@ -715,7 +729,7 @@ impl Drop for Database { #[derive(Debug)] struct DatabaseShared { /// Configuration provided to the database at startup - config: DatabaseConfig, + config: RwLock, /// A token that is used to trigger shutdown of the background worker shutdown: CancellationToken, @@ -734,7 +748,8 @@ struct DatabaseShared { /// The background worker for `Database` - there should only ever be one async fn background_worker(shared: Arc) { - info!(db_name=%shared.config.name, "started database background worker"); + let db_name = shared.config.read().name.clone(); + info!(%db_name, "started database background worker"); // The background loop runs until `Database::shutdown` is called while !shared.shutdown.is_cancelled() { @@ -742,7 +757,7 @@ async fn background_worker(shared: Arc) { if shared.shutdown.is_cancelled() { // TODO: Shutdown intermediate workers (#2813) - info!(db_name=%shared.config.name, "database shutdown before finishing initialization"); + info!(%db_name, "database shutdown before finishing initialization"); break; } @@ -758,14 +773,9 @@ async fn background_worker(shared: Arc) { .expect("expected initialized") .clone(); - info!(db_name=%shared.config.name, "database finished initialization - starting Db worker"); + info!(%db_name, "database finished initialization - starting Db worker"); - crate::utils::panic_test(|| { - Some(format!( - "database background worker: {}", - shared.config.name - )) - }); + crate::utils::panic_test(|| Some(format!("database background worker: {}", db_name,))); let db_shutdown = CancellationToken::new(); let db_worker = db.background_worker(db_shutdown.clone()).fuse(); @@ -795,7 +805,7 @@ async fn background_worker(shared: Arc) { futures::pin_mut!(notify); if shared.state.read().get_initialized().is_none() { - info!(db_name=%shared.config.name, "database no longer initialized"); + info!(%db_name, "database no longer initialized"); break; } @@ -809,44 +819,44 @@ async fn background_worker(shared: Arc) { _ = shutdown => info!("database shutting down"), _ = notify => info!("notified of state change"), _ = consumer_join => { - error!(db_name=%shared.config.name, "unexpected shutdown of write buffer consumer - bailing out"); + error!(%db_name, "unexpected shutdown of write buffer consumer - bailing out"); shared.shutdown.cancel(); } _ = lifecycle_join => { - error!(db_name=%shared.config.name, "unexpected shutdown of lifecycle worker - bailing out"); + error!(%db_name, "unexpected shutdown of lifecycle worker - bailing out"); shared.shutdown.cancel(); } _ = db_worker => { - error!(db_name=%shared.config.name, "unexpected shutdown of db - bailing out"); + error!(%db_name, "unexpected shutdown of db - bailing out"); shared.shutdown.cancel(); } } } if let Some(consumer) = write_buffer_consumer { - info!(db_name=%shared.config.name, "shutting down write buffer consumer"); + info!(%db_name, "shutting down write buffer consumer"); consumer.shutdown(); if let Err(e) = consumer.join().await { - error!(db_name=%shared.config.name, %e, "error shutting down write buffer consumer") + error!(%db_name, %e, "error shutting down write buffer consumer") } } if !lifecycle_join.is_terminated() { - info!(db_name=%shared.config.name, "shutting down lifecycle worker"); + info!(%db_name, "shutting down lifecycle worker"); lifecycle_worker.shutdown(); if let Err(e) = lifecycle_worker.join().await { - error!(db_name=%shared.config.name, %e, "error shutting down lifecycle worker") + error!(%db_name, %e, "error shutting down lifecycle worker") } } if !db_worker.is_terminated() { - info!(db_name=%shared.config.name, "waiting for db worker shutdown"); + info!(%db_name, "waiting for db worker shutdown"); db_shutdown.cancel(); db_worker.await } } - info!(db_name=%shared.config.name, "draining tasks"); + info!(%db_name, "draining tasks"); // Loop in case tasks are spawned during shutdown loop { @@ -857,8 +867,8 @@ async fn background_worker(shared: Arc) { let mut futures: FuturesUnordered<_> = jobs .iter() .filter_map(|tracker| { - let db_name = tracker.metadata().db_name()?; - if db_name.as_ref() != shared.config.name.as_str() { + let db_name2 = tracker.metadata().db_name()?; + if db_name2.as_ref() != db_name.as_str() { return None; } Some(tracker.join()) @@ -869,20 +879,29 @@ async fn background_worker(shared: Arc) { break; } - info!(db_name=%shared.config.name, count=futures.len(), "waiting for jobs"); + info!(%db_name, count=futures.len(), "waiting for jobs"); while futures.next().await.is_some() {} } - info!(db_name=%shared.config.name, "database worker finished"); + info!(%db_name, "database worker finished"); +} + +enum TransactionOrWait { + Transaction(DatabaseState, internal_types::freezable::FreezeHandle), + Wait(Duration), } /// Try to drive the database to `DatabaseState::Initialized` returns when /// this is achieved or the shutdown signal is triggered async fn initialize_database(shared: &DatabaseShared) { - let db_name = &shared.config.name; + let db_name = shared.config.read().name.clone(); info!(%db_name, "database initialization started"); + // error throttle controshared.config.name.l + let mut throttled_error = false; + let mut sleep = INIT_BACKOFF; + while !shared.shutdown.is_cancelled() { // Acquire locks and determine if work to be done let maybe_transaction = { @@ -891,50 +910,61 @@ async fn initialize_database(shared: &DatabaseShared) { match &**state { // Already initialized DatabaseState::Initialized(_) => break, - // Can perform work - DatabaseState::Known(_) - | DatabaseState::DatabaseObjectStoreFound(_) - | DatabaseState::OwnerInfoLoaded(_) - | DatabaseState::RulesLoaded(_) - | DatabaseState::CatalogLoaded(_) - | DatabaseState::WriteBufferCreationError(_, _) => { - match state.try_freeze() { - Some(handle) => Some((DatabaseState::clone(&state), handle)), - None => { - // Backoff if there is already an in-progress initialization action (e.g. recovery) - info!(%db_name, %state, "init transaction already in progress"); - None - } - } - } + // No active database found, was probably deleted DatabaseState::NoActiveDatabase(_, _) => { info!(%db_name, "no active database found"); - None + + // no exponential / jitter sleep + TransactionOrWait::Wait(INIT_BACKOFF) } - // Operator intervention required - DatabaseState::DatabaseObjectStoreLookupError(_, e) - | DatabaseState::OwnerInfoLoadError(_, e) - | DatabaseState::RulesLoadError(_, e) - | DatabaseState::CatalogLoadError(_, e) - | DatabaseState::ReplayError(_, e) => { + + // Can perform work + _ if state.error().is_none() || (state.error().is_some() && throttled_error) => { + match state.try_freeze() { + Some(handle) => { + TransactionOrWait::Transaction(DatabaseState::clone(&state), handle) + } + None => { + // Backoff if there is already an in-progress initialization action (e.g. recovery) + info!(%db_name, %state, "init transaction already in progress"); + + // no exponential / jitter sleep + TransactionOrWait::Wait(INIT_BACKOFF) + } + } + } + + // Unthrottled error state, need to wait + _ => { + let e = state + .error() + .expect("How did we end up in a non-error state?"); error!( %db_name, %e, %state, - "database in error state - operator intervention required" + "database in error state - wait until retry" ); - None + throttled_error = true; + + // exponential backup w/ jitter, decorrelated + // see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ + let mut rng = thread_rng(); + sleep = Duration::from_secs_f64(MAX_BACKOFF.as_secs_f64().min( + rng.gen_range(INIT_BACKOFF.as_secs_f64()..(sleep.as_secs_f64() * 3.0)), + )); + TransactionOrWait::Wait(sleep) } } }; // Backoff if no work to be done let (state, handle) = match maybe_transaction { - Some((state, handle)) => (state, handle), - None => { + TransactionOrWait::Transaction(state, handle) => (state, handle), + TransactionOrWait::Wait(d) => { info!(%db_name, "backing off initialization"); - tokio::time::sleep(INIT_BACKOFF).await; + tokio::time::sleep(d).await; continue; } }; @@ -943,40 +973,61 @@ async fn initialize_database(shared: &DatabaseShared) { // Try to advance to the next state let next_state = match state { - DatabaseState::Known(state) => match state.advance(shared).await { - Ok(state) => DatabaseState::DatabaseObjectStoreFound(state), - Err(InitError::NoActiveDatabase) => { - DatabaseState::NoActiveDatabase(state, Arc::new(InitError::NoActiveDatabase)) + DatabaseState::Known(state) + | DatabaseState::DatabaseObjectStoreLookupError(state, _) => { + match state.advance(shared).await { + Ok(state) => DatabaseState::DatabaseObjectStoreFound(state), + Err(InitError::NoActiveDatabase) => DatabaseState::NoActiveDatabase( + state, + Arc::new(InitError::NoActiveDatabase), + ), + Err(e) => DatabaseState::DatabaseObjectStoreLookupError(state, Arc::new(e)), } - Err(e) => DatabaseState::DatabaseObjectStoreLookupError(state, Arc::new(e)), - }, - DatabaseState::DatabaseObjectStoreFound(state) => match state.advance(shared).await { + } + DatabaseState::DatabaseObjectStoreFound(state) + | DatabaseState::OwnerInfoLoadError(state, _) => match state.advance(shared).await { Ok(state) => DatabaseState::OwnerInfoLoaded(state), Err(e) => DatabaseState::OwnerInfoLoadError(state, Arc::new(e)), }, - DatabaseState::OwnerInfoLoaded(state) => match state.advance(shared).await { - Ok(state) => DatabaseState::RulesLoaded(state), - Err(e) => DatabaseState::RulesLoadError(state, Arc::new(e)), - }, - DatabaseState::RulesLoaded(state) => match state.advance(shared).await { - Ok(state) => DatabaseState::CatalogLoaded(state), - Err(e) => DatabaseState::CatalogLoadError(state, Arc::new(e)), - }, + DatabaseState::OwnerInfoLoaded(state) | DatabaseState::RulesLoadError(state, _) => { + match state.advance(shared).await { + Ok(state) => DatabaseState::RulesLoaded(state), + Err(e) => DatabaseState::RulesLoadError(state, Arc::new(e)), + } + } + DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => { + match state.advance(shared).await { + Ok(state) => DatabaseState::CatalogLoaded(state), + Err(e) => DatabaseState::CatalogLoadError(state, Arc::new(e)), + } + } DatabaseState::CatalogLoaded(state) | DatabaseState::WriteBufferCreationError(state, _) => { match state.advance(shared).await { Ok(state) => DatabaseState::Initialized(state), Err(e @ InitError::CreateWriteBuffer { .. }) => { - info!(%db_name, %e, "cannot create write buffer, wait a bit and try again"); - tokio::time::sleep(INIT_BACKOFF).await; DatabaseState::WriteBufferCreationError(state, Arc::new(e)) } Err(e) => DatabaseState::ReplayError(state, Arc::new(e)), } } - state => unreachable!("{:?}", state), + DatabaseState::ReplayError(state, _) => { + warn!(%db_name, "throwing away loaded catalog to recover from replay error"); + DatabaseState::RulesLoaded(state.rollback()) + } + DatabaseState::Initialized(_) | DatabaseState::NoActiveDatabase(_, _) => { + unreachable!("{:?}", state) + } }; + if next_state.error().is_some() { + // this is a new error that needs to be throttled + throttled_error = false; + } else { + // reset backoff + sleep = INIT_BACKOFF; + } + // Commit the next state { let mut state = shared.state.write(); @@ -1060,19 +1111,45 @@ pub enum InitError { /// The Database startup state machine /// -/// A Database starts in DatabaseState::Known and advances through the +/// ```text +/// (end) +/// ^ +/// | +/// [NoActiveDatabase] +/// ^ +/// | +/// (start)------------>[Known]<------------>[DatabaseObjectStoreLookupError] +/// | +/// V +/// [DatabaseObjectStoreFound]<----->[OwnerInfoLoadError] +/// | +/// V +/// [OwnerInfoLoaded]<---------->[RulesLoadError] +/// | +/// V +/// o--------->[RulesLoaded]<------------->[CatalogLoadError] +/// | | +/// | V +/// [ReplayError]<--[CatalogLoaded]<--------->[WriteBufferCreationError] +/// | +/// | +/// V +/// [Initialized] +/// | +/// V +/// (end) +/// ``` +/// +/// A Database starts in [`DatabaseState::Known`] and advances through the /// non error states in sequential order until either: /// -/// 1. It reaches `Initialized` -/// -/// 2. It is reset to `Known` and starts initialization again -/// +/// 1. It reaches [`DatabaseState::Initialized`]: Database is initialized +/// 2. It reaches [`DatabaseState::NoActiveDatabase`]: We cannot setup this database because we are not aware of any +/// active claim with this name. /// 3. An error is encountered, in which case it transitions to one of -/// the error states. Most are Terminal (and thus require operator -/// intervention) but some (such as `WriteBufferCreationError`) may -/// resolve after some time to the basic initialization sequence -/// (e.g. `Initialized`) -/// +/// the error states. We try to recover from all of them. For all except [`DatabaseState::ReplayError`] this is a +/// rather cheap operation since we can just retry the actual operation. For [`DatabaseState::ReplayError`] we need +/// to dump the potentially half-modified in-memory catalog before retrying. #[derive(Debug, Clone)] enum DatabaseState { // Basic initialization sequence states: @@ -1081,25 +1158,19 @@ enum DatabaseState { OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded), RulesLoaded(DatabaseStateRulesLoaded), CatalogLoaded(DatabaseStateCatalogLoaded), + + // Terminal state (positive) Initialized(DatabaseStateInitialized), - // Error states - /// Terminal State - DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc), - /// Terminal State + // Terminal state (negative) NoActiveDatabase(DatabaseStateKnown, Arc), - /// Terminal State + + // Error states, we'll try to recover from them + DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc), OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc), - /// Terminal State RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc), - /// Terminal State CatalogLoadError(DatabaseStateRulesLoaded, Arc), - /// Non Terminal State: There was an error creating a connction to - /// the WriteBuffer, but the connection will be retried. If a - /// connection is successfully created, the database will - /// transition to `Initialized` WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc), - /// Terminal State ReplayError(DatabaseStateCatalogLoaded, Arc), } @@ -1254,9 +1325,10 @@ impl DatabaseStateKnown { &self, shared: &DatabaseShared, ) -> Result { + let location = shared.config.read().location.clone(); let iox_object_store = IoxObjectStore::load_at_root_path( Arc::clone(shared.application.object_store()), - &shared.config.location, + &location, ) .await .context(DatabaseObjectStoreLookup)?; @@ -1282,10 +1354,11 @@ impl DatabaseStateDatabaseObjectStoreFound { .await .context(FetchingOwnerInfo)?; - if owner_info.id != shared.config.server_id.get_u32() { + let server_id = shared.config.read().server_id.get_u32(); + if owner_info.id != server_id { return DatabaseOwnerMismatch { actual: owner_info.id, - expected: shared.config.server_id.get_u32(), + expected: server_id, } .fail(); } @@ -1430,10 +1503,11 @@ impl DatabaseStateOwnerInfoLoaded { .await .context(LoadingRules)?; - if rules.db_name() != &shared.config.name { + let db_name = shared.config.read().name.clone(); + if rules.db_name() != &db_name { return RulesDatabaseNameMismatch { actual: rules.db_name(), - expected: shared.config.name.as_str(), + expected: db_name.as_str(), } .fail(); } @@ -1461,19 +1535,28 @@ impl DatabaseStateRulesLoaded { &self, shared: &DatabaseShared, ) -> Result { + let (db_name, wipe_catalog_on_error, skip_replay, server_id) = { + let config = shared.config.read(); + ( + config.name.clone(), + config.wipe_catalog_on_error, + config.skip_replay, + config.server_id, + ) + }; let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog( - shared.config.name.as_str(), + db_name.as_str(), Arc::clone(&self.iox_object_store), Arc::clone(shared.application.metric_registry()), Arc::clone(shared.application.time_provider()), - shared.config.wipe_catalog_on_error, - shared.config.skip_replay, + wipe_catalog_on_error, + skip_replay, ) .await .context(CatalogLoad)?; let database_to_commit = DatabaseToCommit { - server_id: shared.config.server_id, + server_id, iox_object_store: Arc::clone(&self.iox_object_store), exec: Arc::clone(shared.application.executor()), rules: Arc::clone(self.provided_rules.rules()), @@ -1522,19 +1605,29 @@ impl DatabaseStateCatalogLoaded { let rules = self.provided_rules.rules(); let trace_collector = shared.application.trace_collector(); let write_buffer_factory = shared.application.write_buffer_factory(); + let (db_name, server_id, skip_replay) = { + let config = shared.config.read(); + (config.name.clone(), config.server_id, config.skip_replay) + }; let write_buffer_consumer = match rules.write_buffer_connection.as_ref() { Some(connection) => { let mut consumer = write_buffer_factory .new_config_read( - shared.config.server_id, - shared.config.name.as_str(), + server_id, + db_name.as_str(), trace_collector.as_ref(), connection, ) .await .context(CreateWriteBuffer)?; - db.perform_replay(self.replay_plan.as_ref().as_ref(), consumer.as_mut()) + let replay_plan = if skip_replay { + None + } else { + self.replay_plan.as_ref().as_ref() + }; + + db.perform_replay(replay_plan, consumer.as_mut()) .await .context(Replay)?; @@ -1558,6 +1651,16 @@ impl DatabaseStateCatalogLoaded { owner_info: self.owner_info.clone(), }) } + + /// Rolls back state to an unloaded catalog. + fn rollback(&self) -> DatabaseStateRulesLoaded { + DatabaseStateRulesLoaded { + provided_rules: Arc::clone(&self.provided_rules), + uuid: self.uuid, + owner_info: self.owner_info.clone(), + iox_object_store: self.db.iox_object_store(), + } + } } #[derive(Debug, Clone)] @@ -1718,7 +1821,7 @@ mod tests { #[tokio::test] async fn database_release() { let (application, database) = initialized_database().await; - let server_id = database.shared.config.server_id; + let server_id = database.shared.config.read().server_id; let server_location = IoxObjectStore::server_config_path(application.object_store(), server_id).to_string(); let iox_object_store = database.iox_object_store().unwrap(); @@ -1744,8 +1847,8 @@ mod tests { #[tokio::test] async fn database_claim() { let (application, database) = initialized_database().await; - let db_name = &database.shared.config.name; - let server_id = database.shared.config.server_id; + let db_name = &database.shared.config.read().name.clone(); + let server_id = database.shared.config.read().server_id; let server_location = IoxObjectStore::server_config_path(application.object_store(), server_id).to_string(); let iox_object_store = database.iox_object_store().unwrap(); From 496043a767e4c16259404c4d8a095eb4a65f4fff Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 14 Dec 2021 09:58:32 +0000 Subject: [PATCH 02/10] fix: PR feedback Co-authored-by: Andrew Lamb --- server/src/database.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/database.rs b/server/src/database.rs index 9f69f30adb..7654169264 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -948,7 +948,7 @@ async fn initialize_database(shared: &DatabaseShared) { ); throttled_error = true; - // exponential backup w/ jitter, decorrelated + // exponential backoff w/ jitter, decorrelated // see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ let mut rng = thread_rng(); sleep = Duration::from_secs_f64(MAX_BACKOFF.as_secs_f64().min( @@ -1159,10 +1159,10 @@ enum DatabaseState { RulesLoaded(DatabaseStateRulesLoaded), CatalogLoaded(DatabaseStateCatalogLoaded), - // Terminal state (positive) + // Terminal state (success) Initialized(DatabaseStateInitialized), - // Terminal state (negative) + // Terminal state (failure) NoActiveDatabase(DatabaseStateKnown, Arc), // Error states, we'll try to recover from them From 11df3091a10899a68faba81cc13e8e3da1cc3d85 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 14 Dec 2021 10:59:09 +0100 Subject: [PATCH 03/10] fix: misleading comment --- server/src/database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/database.rs b/server/src/database.rs index 7654169264..522a2d2eac 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -898,7 +898,7 @@ async fn initialize_database(shared: &DatabaseShared) { let db_name = shared.config.read().name.clone(); info!(%db_name, "database initialization started"); - // error throttle controshared.config.name.l + // error throttle let mut throttled_error = false; let mut sleep = INIT_BACKOFF; From 7a1f70f9705a8563689464a77d222da00cafd21c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 14 Dec 2021 11:07:15 +0100 Subject: [PATCH 04/10] fix: move warning to better place --- server/src/database.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/database.rs b/server/src/database.rs index 522a2d2eac..8ada1bf423 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1011,10 +1011,7 @@ async fn initialize_database(shared: &DatabaseShared) { Err(e) => DatabaseState::ReplayError(state, Arc::new(e)), } } - DatabaseState::ReplayError(state, _) => { - warn!(%db_name, "throwing away loaded catalog to recover from replay error"); - DatabaseState::RulesLoaded(state.rollback()) - } + DatabaseState::ReplayError(state, _) => DatabaseState::RulesLoaded(state.rollback()), DatabaseState::Initialized(_) | DatabaseState::NoActiveDatabase(_, _) => { unreachable!("{:?}", state) } @@ -1654,6 +1651,7 @@ impl DatabaseStateCatalogLoaded { /// Rolls back state to an unloaded catalog. fn rollback(&self) -> DatabaseStateRulesLoaded { + warn!(db_name=%self.db.name(), "throwing away loaded catalog to recover from replay error"); DatabaseStateRulesLoaded { provided_rules: Arc::clone(&self.provided_rules), uuid: self.uuid, From e80ac94af52c04e25f1a7a47d8a980f049c251dc Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 14 Dec 2021 11:07:59 +0100 Subject: [PATCH 05/10] fix: database state diagram --- server/src/database.rs | 53 +++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/server/src/database.rs b/server/src/database.rs index 8ada1bf423..d18a42959c 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -1115,26 +1115,41 @@ pub enum InitError { /// [NoActiveDatabase] /// ^ /// | -/// (start)------------>[Known]<------------>[DatabaseObjectStoreLookupError] -/// | -/// V -/// [DatabaseObjectStoreFound]<----->[OwnerInfoLoadError] -/// | -/// V -/// [OwnerInfoLoaded]<---------->[RulesLoadError] -/// | -/// V -/// o--------->[RulesLoaded]<------------->[CatalogLoadError] -/// | | -/// | V -/// [ReplayError]<--[CatalogLoaded]<--------->[WriteBufferCreationError] -/// | -/// | -/// V +/// | +/// (start)------------>[Known]------------->[DatabaseObjectStoreLookupError] +/// | | +/// +---------------------------o +/// | +/// | +/// V +/// [DatabaseObjectStoreFound]------>[OwnerInfoLoadError] +/// | | +/// +---------------------------o +/// | +/// | +/// V +/// [OwnerInfoLoaded]----------->[RulesLoadError] +/// | | +/// +---------------------------o +/// | +/// | +/// V +/// o--------->[RulesLoaded]-------------->[CatalogLoadError] +/// | | | +/// | +---------------------------o +/// | | +/// | | +/// | V +/// [ReplayError]<--[CatalogLoaded]---------->[WriteBufferCreationError] +/// | | +/// +---------------------------o +/// | +/// | +/// V /// [Initialized] -/// | -/// V -/// (end) +/// | +/// V +/// (end) /// ``` /// /// A Database starts in [`DatabaseState::Known`] and advances through the From 0f3cfb024bff70cfe03f270c4a4286499e7ca12f Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 14 Dec 2021 11:26:36 +0100 Subject: [PATCH 06/10] test: add database startup recovery test --- iox_object_store/src/lib.rs | 7 +++++++ server/src/database.rs | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/iox_object_store/src/lib.rs b/iox_object_store/src/lib.rs index aea54b9ca8..5326eea651 100644 --- a/iox_object_store/src/lib.rs +++ b/iox_object_store/src/lib.rs @@ -193,6 +193,13 @@ impl IoxObjectStore { Ok(self.inner.get(&owner_path).await?.bytes().await?.into()) } + /// Delete owner file for testing + pub async fn delete_owner_file_for_testing(&self) -> Result<()> { + let owner_path = self.root_path.owner_path(); + + self.inner.delete(&owner_path).await + } + /// The location in object storage for all files for this database, suitable for logging or /// debugging purposes only. Do not parse this, as its format is subject to change! pub fn debug_database_path(&self) -> String { diff --git a/server/src/database.rs b/server/src/database.rs index d18a42959c..6e81b2bc8b 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -2101,6 +2101,41 @@ mod tests { .unwrap(); } + #[tokio::test] + async fn database_init_recovery() { + let (application, database) = initialized_database().await; + let iox_object_store = database.iox_object_store().unwrap(); + let config = database.shared.config.read().clone(); + + // shutdown first database + database.shutdown(); + database.join().await.unwrap(); + + // mess up owner file + let owner_backup = iox_object_store.get_owner_file().await.unwrap(); + iox_object_store + .delete_owner_file_for_testing() + .await + .unwrap(); + + // create second database + let database = Database::new(Arc::clone(&application), config); + database.wait_for_init().await.unwrap_err(); + + // recover database by fixing owner file + iox_object_store.put_owner_file(owner_backup).await.unwrap(); + tokio::time::timeout(Duration::from_secs(10), async move { + loop { + if database.wait_for_init().await.is_ok() { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap(); + } + /// Normally database rules are provided as grpc messages, but in /// tests they are constructed from database rules structures /// themselves. From 0682fc208f1f0d082c2ebc922e4b272f2c63463c Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 14 Dec 2021 12:38:14 +0100 Subject: [PATCH 07/10] feat: also recover from `NoActiveDatabase` --- server/src/database.rs | 153 ++++++++++++++++++++++------------------- 1 file changed, 84 insertions(+), 69 deletions(-) diff --git a/server/src/database.rs b/server/src/database.rs index 6e81b2bc8b..fdd2bd629c 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -911,14 +911,6 @@ async fn initialize_database(shared: &DatabaseShared) { // Already initialized DatabaseState::Initialized(_) => break, - // No active database found, was probably deleted - DatabaseState::NoActiveDatabase(_, _) => { - info!(%db_name, "no active database found"); - - // no exponential / jitter sleep - TransactionOrWait::Wait(INIT_BACKOFF) - } - // Can perform work _ if state.error().is_none() || (state.error().is_some() && throttled_error) => { match state.try_freeze() { @@ -974,16 +966,14 @@ async fn initialize_database(shared: &DatabaseShared) { // Try to advance to the next state let next_state = match state { DatabaseState::Known(state) - | DatabaseState::DatabaseObjectStoreLookupError(state, _) => { - match state.advance(shared).await { - Ok(state) => DatabaseState::DatabaseObjectStoreFound(state), - Err(InitError::NoActiveDatabase) => DatabaseState::NoActiveDatabase( - state, - Arc::new(InitError::NoActiveDatabase), - ), - Err(e) => DatabaseState::DatabaseObjectStoreLookupError(state, Arc::new(e)), + | DatabaseState::DatabaseObjectStoreLookupError(state, _) + | DatabaseState::NoActiveDatabase(state, _) => match state.advance(shared).await { + Ok(state) => DatabaseState::DatabaseObjectStoreFound(state), + Err(InitError::NoActiveDatabase) => { + DatabaseState::NoActiveDatabase(state, Arc::new(InitError::NoActiveDatabase)) } - } + Err(e) => DatabaseState::DatabaseObjectStoreLookupError(state, Arc::new(e)), + }, DatabaseState::DatabaseObjectStoreFound(state) | DatabaseState::OwnerInfoLoadError(state, _) => match state.advance(shared).await { Ok(state) => DatabaseState::OwnerInfoLoaded(state), @@ -1012,7 +1002,7 @@ async fn initialize_database(shared: &DatabaseShared) { } } DatabaseState::ReplayError(state, _) => DatabaseState::RulesLoaded(state.rollback()), - DatabaseState::Initialized(_) | DatabaseState::NoActiveDatabase(_, _) => { + DatabaseState::Initialized(_) => { unreachable!("{:?}", state) } }; @@ -1109,56 +1099,51 @@ pub enum InitError { /// The Database startup state machine /// /// ```text -/// (end) -/// ^ -/// | -/// [NoActiveDatabase] -/// ^ -/// | -/// | -/// (start)------------>[Known]------------->[DatabaseObjectStoreLookupError] -/// | | -/// +---------------------------o -/// | -/// | -/// V -/// [DatabaseObjectStoreFound]------>[OwnerInfoLoadError] -/// | | -/// +---------------------------o -/// | -/// | -/// V -/// [OwnerInfoLoaded]----------->[RulesLoadError] -/// | | -/// +---------------------------o -/// | -/// | -/// V -/// o--------->[RulesLoaded]-------------->[CatalogLoadError] -/// | | | -/// | +---------------------------o -/// | | -/// | | -/// | V -/// [ReplayError]<--[CatalogLoaded]---------->[WriteBufferCreationError] -/// | | -/// +---------------------------o -/// | -/// | -/// V -/// [Initialized] -/// | -/// V -/// (end) +/// (start) +/// | +/// o-o | o-o +/// | V V V | +/// [NoActiveDatabase]<--[Known]------------->[DatabaseObjectStoreLookupError] +/// | | | +/// o----------------+---------------------------o +/// | +/// | o-o +/// V V | +/// [DatabaseObjectStoreFound]------>[OwnerInfoLoadError] +/// | | +/// +---------------------------o +/// | +/// | o-o +/// V V | +/// [OwnerInfoLoaded]----------->[RulesLoadError] +/// | | +/// +---------------------------o +/// | +/// | o-o +/// V V | +/// o--------->[RulesLoaded]-------------->[CatalogLoadError] +/// | | | +/// | +---------------------------o +/// | | +/// | | o-o +/// | V V | +/// [ReplayError]<--[CatalogLoaded]---------->[WriteBufferCreationError] +/// | | +/// +---------------------------o +/// | +/// | +/// V +/// [Initialized] +/// | +/// V +/// (end) /// ``` /// /// A Database starts in [`DatabaseState::Known`] and advances through the /// non error states in sequential order until either: /// /// 1. It reaches [`DatabaseState::Initialized`]: Database is initialized -/// 2. It reaches [`DatabaseState::NoActiveDatabase`]: We cannot setup this database because we are not aware of any -/// active claim with this name. -/// 3. An error is encountered, in which case it transitions to one of +/// 2. An error is encountered, in which case it transitions to one of /// the error states. We try to recover from all of them. For all except [`DatabaseState::ReplayError`] this is a /// rather cheap operation since we can just retry the actual operation. For [`DatabaseState::ReplayError`] we need /// to dump the potentially half-modified in-memory catalog before retrying. @@ -1174,10 +1159,8 @@ enum DatabaseState { // Terminal state (success) Initialized(DatabaseStateInitialized), - // Terminal state (failure) - NoActiveDatabase(DatabaseStateKnown, Arc), - // Error states, we'll try to recover from them + NoActiveDatabase(DatabaseStateKnown, Arc), DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc), OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc), RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc), @@ -1871,16 +1854,23 @@ mod tests { .to_string(); let uuid = database.release().await.unwrap(); - Database::claim(application, db_name, uuid, new_server_id, false) - .await - .unwrap(); - + // database is in error state assert_eq!(database.state_code(), DatabaseStateCode::NoActiveDatabase); assert!(matches!( database.init_error().unwrap().as_ref(), InitError::NoActiveDatabase )); + Database::claim( + Arc::clone(&application), + db_name, + uuid, + new_server_id, + false, + ) + .await + .unwrap(); + let owner_info = fetch_owner_info(&iox_object_store).await.unwrap(); assert_eq!(owner_info.id, new_server_id.get_u32()); assert_eq!(owner_info.location, new_server_location); @@ -1893,6 +1883,31 @@ mod tests { let claim_transaction = &owner_info.transactions[1]; assert_eq!(claim_transaction.id, 0); assert_eq!(claim_transaction.location, ""); + + // put it back to first DB + let db_config = DatabaseConfig { + server_id: new_server_id, + ..database.shared.config.read().clone() + }; + let new_database = Database::new(Arc::clone(&application), db_config.clone()); + new_database.wait_for_init().await.unwrap(); + new_database.release().await.unwrap(); + Database::claim(application, db_name, uuid, server_id, false) + .await + .unwrap(); + + // database should recover + tokio::time::timeout(Duration::from_secs(10), async move { + loop { + if database.wait_for_init().await.is_ok() { + return; + } + + tokio::time::sleep(Duration::from_millis(100)).await; + } + }) + .await + .unwrap(); } #[tokio::test] From 8520b9b9db5b9693bd952c8f39546c16938a6e97 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 14 Dec 2021 12:38:24 +0100 Subject: [PATCH 08/10] fix: improve docs and naming --- server/src/database.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/server/src/database.rs b/server/src/database.rs index fdd2bd629c..5ed6bb157f 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -887,8 +887,16 @@ async fn background_worker(shared: Arc) { info!(%db_name, "database worker finished"); } +/// Determine what the init loop should do next. enum TransactionOrWait { + /// We can transition from one state into another. Transaction(DatabaseState, internal_types::freezable::FreezeHandle), + + /// We have to wait. + /// + /// This can have multiple reasons: + /// - there's another transaction in progress and we wait until we can try again + /// - error backoff Wait(Duration), } @@ -898,9 +906,11 @@ async fn initialize_database(shared: &DatabaseShared) { let db_name = shared.config.read().name.clone(); info!(%db_name, "database initialization started"); - // error throttle + // error throttling + // - checks if the current error was already throttled + // - keeps a backoff duration that will change over the curse of multiple errors let mut throttled_error = false; - let mut sleep = INIT_BACKOFF; + let mut backoff = INIT_BACKOFF; while !shared.shutdown.is_cancelled() { // Acquire locks and determine if work to be done @@ -943,10 +953,10 @@ async fn initialize_database(shared: &DatabaseShared) { // exponential backoff w/ jitter, decorrelated // see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ let mut rng = thread_rng(); - sleep = Duration::from_secs_f64(MAX_BACKOFF.as_secs_f64().min( - rng.gen_range(INIT_BACKOFF.as_secs_f64()..(sleep.as_secs_f64() * 3.0)), + backoff = Duration::from_secs_f64(MAX_BACKOFF.as_secs_f64().min( + rng.gen_range(INIT_BACKOFF.as_secs_f64()..(backoff.as_secs_f64() * 3.0)), )); - TransactionOrWait::Wait(sleep) + TransactionOrWait::Wait(backoff) } } }; @@ -1012,7 +1022,7 @@ async fn initialize_database(shared: &DatabaseShared) { throttled_error = false; } else { // reset backoff - sleep = INIT_BACKOFF; + backoff = INIT_BACKOFF; } // Commit the next state From 384159829663ff686f59400bde5e0da8707d112d Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 15 Dec 2021 11:40:39 +0000 Subject: [PATCH 09/10] fix: typo Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- server/src/database.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/database.rs b/server/src/database.rs index 5ed6bb157f..dac364aa46 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -908,7 +908,7 @@ async fn initialize_database(shared: &DatabaseShared) { // error throttling // - checks if the current error was already throttled - // - keeps a backoff duration that will change over the curse of multiple errors + // - keeps a backoff duration that will change over the course of multiple errors let mut throttled_error = false; let mut backoff = INIT_BACKOFF; From e9041e6da089aed8f1877815cc2850cd7cdcdeec Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 15 Dec 2021 12:43:33 +0100 Subject: [PATCH 10/10] refactor: improve database init routine 1. Simpler locking/freezing 2. Don't commit the rollback from `ReplayError` to `RulesLoaded` because it will confuse the heck out of our users/admins. --- server/src/database.rs | 71 ++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 40 deletions(-) diff --git a/server/src/database.rs b/server/src/database.rs index dac364aa46..021b1c1507 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -680,19 +680,9 @@ impl Database { /// Recover from a ReplayError by skipping replay pub async fn skip_replay(&self) -> Result<(), Error> { - self.shared.config.write().skip_replay = true; + error_state!(self, "SkipReplay", ReplayError); - match &**self.shared.state.read() { - DatabaseState::ReplayError(_, _) | DatabaseState::RulesLoaded(_) => {} - state => { - return InvalidState { - db_name: &self.shared.config.read().name, - state: state.state_code(), - transition: "SkipReplay", - } - .fail() - } - } + self.shared.config.write().skip_replay = true; // wait for DB to leave a potential `ReplayError` state loop { @@ -700,7 +690,7 @@ impl Database { let notify = self.shared.state_notify.notified(); match &**self.shared.state.read() { - DatabaseState::ReplayError(_, _) | DatabaseState::RulesLoaded(_) => {} + DatabaseState::ReplayError(_, _) => {} _ => break, } @@ -892,11 +882,7 @@ enum TransactionOrWait { /// We can transition from one state into another. Transaction(DatabaseState, internal_types::freezable::FreezeHandle), - /// We have to wait. - /// - /// This can have multiple reasons: - /// - there's another transaction in progress and we wait until we can try again - /// - error backoff + /// We have to wait to backoff from an error. Wait(Duration), } @@ -915,6 +901,9 @@ async fn initialize_database(shared: &DatabaseShared) { while !shared.shutdown.is_cancelled() { // Acquire locks and determine if work to be done let maybe_transaction = { + // lock-dance to make this future `Send` + let handle = shared.state.read().freeze(); + let handle = handle.await; let state = shared.state.read(); match &**state { @@ -923,18 +912,7 @@ async fn initialize_database(shared: &DatabaseShared) { // Can perform work _ if state.error().is_none() || (state.error().is_some() && throttled_error) => { - match state.try_freeze() { - Some(handle) => { - TransactionOrWait::Transaction(DatabaseState::clone(&state), handle) - } - None => { - // Backoff if there is already an in-progress initialization action (e.g. recovery) - info!(%db_name, %state, "init transaction already in progress"); - - // no exponential / jitter sleep - TransactionOrWait::Wait(INIT_BACKOFF) - } - } + TransactionOrWait::Transaction(DatabaseState::clone(&state), handle) } // Unthrottled error state, need to wait @@ -1011,7 +989,16 @@ async fn initialize_database(shared: &DatabaseShared) { Err(e) => DatabaseState::ReplayError(state, Arc::new(e)), } } - DatabaseState::ReplayError(state, _) => DatabaseState::RulesLoaded(state.rollback()), + DatabaseState::ReplayError(state, _) => { + let state2 = state.rollback(); + match state2.advance(shared).await { + Ok(state2) => match state2.advance(shared).await { + Ok(state2) => DatabaseState::Initialized(state2), + Err(e) => DatabaseState::ReplayError(state, Arc::new(e)), + }, + Err(e) => DatabaseState::ReplayError(state, Arc::new(e)), + } + } DatabaseState::Initialized(_) => { unreachable!("{:?}", state) } @@ -1111,8 +1098,8 @@ pub enum InitError { /// ```text /// (start) /// | -/// o-o | o-o -/// | V V V | +/// o-o o--------|-------------------o o-o +/// | V V V V V | /// [NoActiveDatabase]<--[Known]------------->[DatabaseObjectStoreLookupError] /// | | | /// o----------------+---------------------------o @@ -1131,16 +1118,20 @@ pub enum InitError { /// | /// | o-o /// V V | -/// o--------->[RulesLoaded]-------------->[CatalogLoadError] -/// | | | -/// | +---------------------------o -/// | | -/// | | o-o -/// | V V | -/// [ReplayError]<--[CatalogLoaded]---------->[WriteBufferCreationError] +/// [RulesLoaded]-------------->[CatalogLoadError] /// | | /// +---------------------------o /// | +/// | o-o +/// V V | +/// [CatalogLoaded]---------->[WriteBufferCreationError] +/// | | | | +/// | | | | o-o +/// | | | V V | +/// | o---------------|-->[ReplayError] +/// | | | +/// +--------------------+-------o +/// | /// | /// V /// [Initialized]