Merge pull request #3361 from influxdata/crepererum/issue3335

refactor: allow database init to recover from errors
pull/24376/head
kodiakhq[bot] 2021-12-15 17:00:20 +00:00 committed by GitHub
commit 9608bb1da2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 315 additions and 141 deletions

View File

@ -193,6 +193,13 @@ impl IoxObjectStore {
Ok(self.inner.get(&owner_path).await?.bytes().await?.into())
}
/// Remove this database's owner file from object storage.
///
/// Intended for test scenarios only, e.g. to simulate a corrupted or
/// missing owner file and exercise the init-recovery path.
pub async fn delete_owner_file_for_testing(&self) -> Result<()> {
self.inner.delete(&self.root_path.owner_path()).await
}
/// The location in object storage for all files for this database, suitable for logging or
/// debugging purposes only. Do not parse this, as its format is subject to change!
pub fn debug_database_path(&self) -> String {

View File

@ -23,6 +23,7 @@ use observability_deps::tracing::{error, info, warn};
use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard};
use parquet_catalog::core::{PreservedCatalog, PreservedCatalogConfig};
use persistence_windows::checkpoint::ReplayPlan;
use rand::{thread_rng, Rng};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{future::Future, sync::Arc, time::Duration};
use time::Time;
@ -38,7 +39,7 @@ macro_rules! error_state {
DatabaseState::$variant(state, _) => state.clone(),
state => {
return InvalidState {
db_name: &$s.shared.config.name,
db_name: &$s.shared.config.read().name,
state: state.state_code(),
transition: $transition,
}
@ -49,6 +50,7 @@ macro_rules! error_state {
}
const INIT_BACKOFF: Duration = Duration::from_secs(1);
const MAX_BACKOFF: Duration = Duration::from_secs(500);
#[derive(Debug, Snafu)]
pub enum Error {
@ -176,7 +178,7 @@ impl Database {
);
let shared = Arc::new(DatabaseShared {
config,
config: RwLock::new(config),
application,
shutdown: Default::default(),
state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))),
@ -234,7 +236,7 @@ impl Database {
/// Release this database from this server.
pub async fn release(&self) -> Result<Uuid, Error> {
let db_name = &self.shared.config.name;
let db_name = self.name();
let handle = self.shared.state.read().freeze();
let handle = handle.await;
@ -350,13 +352,14 @@ impl Database {
/// Triggers shutdown of this `Database`
pub fn shutdown(&self) {
info!(db_name=%self.shared.config.name, "database shutting down");
let db_name = self.name();
info!(%db_name, "database shutting down");
self.shared.shutdown.cancel()
}
/// Triggers a restart of this `Database` and wait for it to re-initialize
pub async fn restart(&self) -> Result<(), Arc<InitError>> {
let db_name = &self.shared.config.name;
let db_name = self.name();
info!(%db_name, "restarting database");
let handle = self.shared.state.read().freeze();
@ -379,8 +382,8 @@ impl Database {
}
/// Returns the config of this database
pub fn config(&self) -> &DatabaseConfig {
&self.shared.config
pub fn config(&self) -> DatabaseConfig {
self.shared.config.read().clone()
}
/// Returns the initialization status of this database
@ -420,7 +423,12 @@ impl Database {
/// Location in object store; may not actually exist yet
pub fn location(&self) -> String {
self.shared.config.location.clone()
self.shared.config.read().location.clone()
}
/// Database name
pub fn name(&self) -> DatabaseName<'static> {
self.shared.config.read().name.clone()
}
/// Update the database rules, panicking if the state is invalid
@ -538,7 +546,7 @@ impl Database {
let handle = self.shared.state.read().freeze();
let handle = handle.await;
let db_name = &self.shared.config.name;
let db_name = self.name();
let current_state = error_state!(self, "WipePreservedCatalog", CatalogLoadError);
let registry = self.shared.application.job_registry();
@ -550,8 +558,6 @@ impl Database {
tokio::spawn(
async move {
let db_name = &shared.config.name;
PreservedCatalog::wipe(&current_state.iox_object_store)
.await
.map_err(Box::new)
@ -577,7 +583,7 @@ impl Database {
let handle = self.shared.state.read().freeze();
let handle = handle.await;
let db_name = &self.shared.config.name;
let db_name = self.name();
{
// If the force flag is not specified, can only rebuild
@ -600,15 +606,13 @@ impl Database {
let shared = Arc::clone(&self.shared);
let iox_object_store = self.iox_object_store().context(InvalidStateForRebuild {
db_name,
db_name: &db_name,
state: shared.state.read().state_code(),
expected: "Object store initialized",
})?;
tokio::spawn(
async move {
let db_name = &shared.config.name;
// shutdown / stop the DB if it is running so it can't
// be read / written to, while also preventing
// anything else from driving the state machine
@ -636,7 +640,7 @@ impl Database {
ensure!(
matches!(&**state, DatabaseState::Known(_)),
UnexpectedTransitionForRebuild {
db_name,
db_name: &db_name,
state: state.state_code()
}
);
@ -648,7 +652,7 @@ impl Database {
PreservedCatalog::wipe(iox_object_store.as_ref())
.await
.map_err(Box::new)
.context(WipePreservedCatalog { db_name })?;
.context(WipePreservedCatalog { db_name: &db_name })?;
let config = PreservedCatalogConfig::new(
Arc::clone(&iox_object_store),
@ -658,7 +662,7 @@ impl Database {
parquet_catalog::rebuild::rebuild_catalog(config, false)
.await
.map_err(Box::new)
.context(RebuildPreservedCatalog { db_name })?;
.context(RebuildPreservedCatalog { db_name: &db_name })?;
// Double check the state hasn't changed (we hold the
// freeze handle to make sure it does not)
@ -676,22 +680,22 @@ impl Database {
/// Recover from a ReplayError by skipping replay
pub async fn skip_replay(&self) -> Result<(), Error> {
let db_name = &self.shared.config.name;
error_state!(self, "SkipReplay", ReplayError);
let handle = self.shared.state.read().freeze();
let handle = handle.await;
self.shared.config.write().skip_replay = true;
let mut current_state = error_state!(self, "SkipReplay", ReplayError);
// wait for DB to leave a potential `ReplayError` state
loop {
// Register interest before checking to avoid race
let notify = self.shared.state_notify.notified();
current_state.replay_plan = Arc::new(None);
let current_state = current_state
.advance(self.shared.as_ref())
.await
.map_err(Box::new)
.context(SkipReplay { db_name })?;
match &**self.shared.state.read() {
DatabaseState::ReplayError(_, _) => {}
_ => break,
}
let mut state = self.shared.state.write();
*state.unfreeze(handle) = DatabaseState::Initialized(current_state);
notify.await;
}
Ok(())
}
@ -699,7 +703,7 @@ impl Database {
impl Drop for Database {
fn drop(&mut self) {
let db_name = &self.shared.config.name;
let db_name = self.name();
if !self.shared.shutdown.is_cancelled() {
warn!(%db_name, "database dropped without calling shutdown()");
self.shared.shutdown.cancel();
@ -715,7 +719,7 @@ impl Drop for Database {
#[derive(Debug)]
struct DatabaseShared {
/// Configuration provided to the database at startup
config: DatabaseConfig,
config: RwLock<DatabaseConfig>,
/// A token that is used to trigger shutdown of the background worker
shutdown: CancellationToken,
@ -734,7 +738,8 @@ struct DatabaseShared {
/// The background worker for `Database` - there should only ever be one
async fn background_worker(shared: Arc<DatabaseShared>) {
info!(db_name=%shared.config.name, "started database background worker");
let db_name = shared.config.read().name.clone();
info!(%db_name, "started database background worker");
// The background loop runs until `Database::shutdown` is called
while !shared.shutdown.is_cancelled() {
@ -742,7 +747,7 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
if shared.shutdown.is_cancelled() {
// TODO: Shutdown intermediate workers (#2813)
info!(db_name=%shared.config.name, "database shutdown before finishing initialization");
info!(%db_name, "database shutdown before finishing initialization");
break;
}
@ -758,14 +763,9 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
.expect("expected initialized")
.clone();
info!(db_name=%shared.config.name, "database finished initialization - starting Db worker");
info!(%db_name, "database finished initialization - starting Db worker");
crate::utils::panic_test(|| {
Some(format!(
"database background worker: {}",
shared.config.name
))
});
crate::utils::panic_test(|| Some(format!("database background worker: {}", db_name,)));
let db_shutdown = CancellationToken::new();
let db_worker = db.background_worker(db_shutdown.clone()).fuse();
@ -795,7 +795,7 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
futures::pin_mut!(notify);
if shared.state.read().get_initialized().is_none() {
info!(db_name=%shared.config.name, "database no longer initialized");
info!(%db_name, "database no longer initialized");
break;
}
@ -809,44 +809,44 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
_ = shutdown => info!("database shutting down"),
_ = notify => info!("notified of state change"),
_ = consumer_join => {
error!(db_name=%shared.config.name, "unexpected shutdown of write buffer consumer - bailing out");
error!(%db_name, "unexpected shutdown of write buffer consumer - bailing out");
shared.shutdown.cancel();
}
_ = lifecycle_join => {
error!(db_name=%shared.config.name, "unexpected shutdown of lifecycle worker - bailing out");
error!(%db_name, "unexpected shutdown of lifecycle worker - bailing out");
shared.shutdown.cancel();
}
_ = db_worker => {
error!(db_name=%shared.config.name, "unexpected shutdown of db - bailing out");
error!(%db_name, "unexpected shutdown of db - bailing out");
shared.shutdown.cancel();
}
}
}
if let Some(consumer) = write_buffer_consumer {
info!(db_name=%shared.config.name, "shutting down write buffer consumer");
info!(%db_name, "shutting down write buffer consumer");
consumer.shutdown();
if let Err(e) = consumer.join().await {
error!(db_name=%shared.config.name, %e, "error shutting down write buffer consumer")
error!(%db_name, %e, "error shutting down write buffer consumer")
}
}
if !lifecycle_join.is_terminated() {
info!(db_name=%shared.config.name, "shutting down lifecycle worker");
info!(%db_name, "shutting down lifecycle worker");
lifecycle_worker.shutdown();
if let Err(e) = lifecycle_worker.join().await {
error!(db_name=%shared.config.name, %e, "error shutting down lifecycle worker")
error!(%db_name, %e, "error shutting down lifecycle worker")
}
}
if !db_worker.is_terminated() {
info!(db_name=%shared.config.name, "waiting for db worker shutdown");
info!(%db_name, "waiting for db worker shutdown");
db_shutdown.cancel();
db_worker.await
}
}
info!(db_name=%shared.config.name, "draining tasks");
info!(%db_name, "draining tasks");
// Loop in case tasks are spawned during shutdown
loop {
@ -857,8 +857,8 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
let mut futures: FuturesUnordered<_> = jobs
.iter()
.filter_map(|tracker| {
let db_name = tracker.metadata().db_name()?;
if db_name.as_ref() != shared.config.name.as_str() {
let db_name2 = tracker.metadata().db_name()?;
if db_name2.as_ref() != db_name.as_str() {
return None;
}
Some(tracker.join())
@ -869,72 +869,82 @@ async fn background_worker(shared: Arc<DatabaseShared>) {
break;
}
info!(db_name=%shared.config.name, count=futures.len(), "waiting for jobs");
info!(%db_name, count=futures.len(), "waiting for jobs");
while futures.next().await.is_some() {}
}
info!(db_name=%shared.config.name, "database worker finished");
info!(%db_name, "database worker finished");
}
/// Outcome of inspecting the current [`DatabaseState`] in the init loop:
/// either attempt the next state transition now, or back off before retrying.
enum TransactionOrWait {
/// We can transition from one state into another.
/// Carries a clone of the current state plus the freeze handle that gives
/// the init loop exclusive access to the state machine while advancing.
Transaction(DatabaseState, internal_types::freezable::FreezeHandle),
/// We have to wait to backoff from an error; the `Duration` is the
/// (jittered, exponentially growing) delay before the next attempt.
Wait(Duration),
}
/// Try to drive the database to `DatabaseState::Initialized` returns when
/// this is achieved or the shutdown signal is triggered
async fn initialize_database(shared: &DatabaseShared) {
let db_name = &shared.config.name;
let db_name = shared.config.read().name.clone();
info!(%db_name, "database initialization started");
// error throttling
// - checks if the current error was already throttled
// - keeps a backoff duration that will change over the course of multiple errors
let mut throttled_error = false;
let mut backoff = INIT_BACKOFF;
while !shared.shutdown.is_cancelled() {
// Acquire locks and determine if work to be done
let maybe_transaction = {
// lock-dance to make this future `Send`
let handle = shared.state.read().freeze();
let handle = handle.await;
let state = shared.state.read();
match &**state {
// Already initialized
DatabaseState::Initialized(_) => break,
// Can perform work
DatabaseState::Known(_)
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_)
| DatabaseState::WriteBufferCreationError(_, _) => {
match state.try_freeze() {
Some(handle) => Some((DatabaseState::clone(&state), handle)),
None => {
// Backoff if there is already an in-progress initialization action (e.g. recovery)
info!(%db_name, %state, "init transaction already in progress");
None
}
}
_ if state.error().is_none() || (state.error().is_some() && throttled_error) => {
TransactionOrWait::Transaction(DatabaseState::clone(&state), handle)
}
// No active database found, was probably deleted
DatabaseState::NoActiveDatabase(_, _) => {
info!(%db_name, "no active database found");
None
}
// Operator intervention required
DatabaseState::DatabaseObjectStoreLookupError(_, e)
| DatabaseState::OwnerInfoLoadError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::ReplayError(_, e) => {
// Unthrottled error state, need to wait
_ => {
let e = state
.error()
.expect("How did we end up in a non-error state?");
error!(
%db_name,
%e,
%state,
"database in error state - operator intervention required"
"database in error state - wait until retry"
);
None
throttled_error = true;
// exponential backoff w/ jitter, decorrelated
// see https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
let mut rng = thread_rng();
backoff = Duration::from_secs_f64(MAX_BACKOFF.as_secs_f64().min(
rng.gen_range(INIT_BACKOFF.as_secs_f64()..(backoff.as_secs_f64() * 3.0)),
));
TransactionOrWait::Wait(backoff)
}
}
};
// Backoff if no work to be done
let (state, handle) = match maybe_transaction {
Some((state, handle)) => (state, handle),
None => {
TransactionOrWait::Transaction(state, handle) => (state, handle),
TransactionOrWait::Wait(d) => {
info!(%db_name, "backing off initialization");
tokio::time::sleep(INIT_BACKOFF).await;
tokio::time::sleep(d).await;
continue;
}
};
@ -943,40 +953,65 @@ async fn initialize_database(shared: &DatabaseShared) {
// Try to advance to the next state
let next_state = match state {
DatabaseState::Known(state) => match state.advance(shared).await {
DatabaseState::Known(state)
| DatabaseState::DatabaseObjectStoreLookupError(state, _)
| DatabaseState::NoActiveDatabase(state, _) => match state.advance(shared).await {
Ok(state) => DatabaseState::DatabaseObjectStoreFound(state),
Err(InitError::NoActiveDatabase) => {
DatabaseState::NoActiveDatabase(state, Arc::new(InitError::NoActiveDatabase))
}
Err(e) => DatabaseState::DatabaseObjectStoreLookupError(state, Arc::new(e)),
},
DatabaseState::DatabaseObjectStoreFound(state) => match state.advance(shared).await {
DatabaseState::DatabaseObjectStoreFound(state)
| DatabaseState::OwnerInfoLoadError(state, _) => match state.advance(shared).await {
Ok(state) => DatabaseState::OwnerInfoLoaded(state),
Err(e) => DatabaseState::OwnerInfoLoadError(state, Arc::new(e)),
},
DatabaseState::OwnerInfoLoaded(state) => match state.advance(shared).await {
Ok(state) => DatabaseState::RulesLoaded(state),
Err(e) => DatabaseState::RulesLoadError(state, Arc::new(e)),
},
DatabaseState::RulesLoaded(state) => match state.advance(shared).await {
Ok(state) => DatabaseState::CatalogLoaded(state),
Err(e) => DatabaseState::CatalogLoadError(state, Arc::new(e)),
},
DatabaseState::OwnerInfoLoaded(state) | DatabaseState::RulesLoadError(state, _) => {
match state.advance(shared).await {
Ok(state) => DatabaseState::RulesLoaded(state),
Err(e) => DatabaseState::RulesLoadError(state, Arc::new(e)),
}
}
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
match state.advance(shared).await {
Ok(state) => DatabaseState::CatalogLoaded(state),
Err(e) => DatabaseState::CatalogLoadError(state, Arc::new(e)),
}
}
DatabaseState::CatalogLoaded(state)
| DatabaseState::WriteBufferCreationError(state, _) => {
match state.advance(shared).await {
Ok(state) => DatabaseState::Initialized(state),
Err(e @ InitError::CreateWriteBuffer { .. }) => {
info!(%db_name, %e, "cannot create write buffer, wait a bit and try again");
tokio::time::sleep(INIT_BACKOFF).await;
DatabaseState::WriteBufferCreationError(state, Arc::new(e))
}
Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
}
}
state => unreachable!("{:?}", state),
DatabaseState::ReplayError(state, _) => {
let state2 = state.rollback();
match state2.advance(shared).await {
Ok(state2) => match state2.advance(shared).await {
Ok(state2) => DatabaseState::Initialized(state2),
Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
},
Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
}
}
DatabaseState::Initialized(_) => {
unreachable!("{:?}", state)
}
};
if next_state.error().is_some() {
// this is a new error that needs to be throttled
throttled_error = false;
} else {
// reset backoff
backoff = INIT_BACKOFF;
}
// Commit the next state
{
let mut state = shared.state.write();
@ -1060,19 +1095,59 @@ pub enum InitError {
/// The Database startup state machine
///
/// A Database starts in DatabaseState::Known and advances through the
/// ```text
/// (start)
/// |
/// o-o o--------|-------------------o o-o
/// | V V V V V |
/// [NoActiveDatabase]<--[Known]------------->[DatabaseObjectStoreLookupError]
/// | | |
/// o----------------+---------------------------o
/// |
/// | o-o
/// V V |
/// [DatabaseObjectStoreFound]------>[OwnerInfoLoadError]
/// | |
/// +---------------------------o
/// |
/// | o-o
/// V V |
/// [OwnerInfoLoaded]----------->[RulesLoadError]
/// | |
/// +---------------------------o
/// |
/// | o-o
/// V V |
/// [RulesLoaded]-------------->[CatalogLoadError]
/// | |
/// +---------------------------o
/// |
/// | o-o
/// V V |
/// [CatalogLoaded]---------->[WriteBufferCreationError]
/// | | | |
/// | | | | o-o
/// | | | V V |
/// | o---------------|-->[ReplayError]
/// | | |
/// +--------------------+-------o
/// |
/// |
/// V
/// [Initialized]
/// |
/// V
/// (end)
/// ```
///
/// A Database starts in [`DatabaseState::Known`] and advances through the
/// non error states in sequential order until either:
///
/// 1. It reaches `Initialized`
///
/// 2. It is reset to `Known` and starts initialization again
///
/// 3. An error is encountered, in which case it transitions to one of
/// the error states. Most are Terminal (and thus require operator
/// intervention) but some (such as `WriteBufferCreationError`) may
/// resolve after some time to the basic initialization sequence
/// (e.g. `Initialized`)
///
/// 1. It reaches [`DatabaseState::Initialized`]: Database is initialized
/// 2. An error is encountered, in which case it transitions to one of
/// the error states. We try to recover from all of them. For all except [`DatabaseState::ReplayError`] this is a
/// rather cheap operation since we can just retry the actual operation. For [`DatabaseState::ReplayError`] we need
/// to dump the potentially half-modified in-memory catalog before retrying.
#[derive(Debug, Clone)]
enum DatabaseState {
// Basic initialization sequence states:
@ -1081,25 +1156,17 @@ enum DatabaseState {
OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded),
RulesLoaded(DatabaseStateRulesLoaded),
CatalogLoaded(DatabaseStateCatalogLoaded),
// Terminal state (success)
Initialized(DatabaseStateInitialized),
// Error states
/// Terminal State
DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>),
/// Terminal State
// Error states, we'll try to recover from them
NoActiveDatabase(DatabaseStateKnown, Arc<InitError>),
/// Terminal State
DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>),
OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc<InitError>),
/// Terminal State
RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc<InitError>),
/// Terminal State
CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>),
/// Non Terminal State: There was an error creating a connection to
/// the WriteBuffer, but the connection will be retried. If a
/// connection is successfully created, the database will
/// transition to `Initialized`
WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc<InitError>),
/// Terminal State
ReplayError(DatabaseStateCatalogLoaded, Arc<InitError>),
}
@ -1254,9 +1321,10 @@ impl DatabaseStateKnown {
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateDatabaseObjectStoreFound, InitError> {
let location = shared.config.read().location.clone();
let iox_object_store = IoxObjectStore::load_at_root_path(
Arc::clone(shared.application.object_store()),
&shared.config.location,
&location,
)
.await
.context(DatabaseObjectStoreLookup)?;
@ -1282,10 +1350,11 @@ impl DatabaseStateDatabaseObjectStoreFound {
.await
.context(FetchingOwnerInfo)?;
if owner_info.id != shared.config.server_id.get_u32() {
let server_id = shared.config.read().server_id.get_u32();
if owner_info.id != server_id {
return DatabaseOwnerMismatch {
actual: owner_info.id,
expected: shared.config.server_id.get_u32(),
expected: server_id,
}
.fail();
}
@ -1430,10 +1499,11 @@ impl DatabaseStateOwnerInfoLoaded {
.await
.context(LoadingRules)?;
if rules.db_name() != &shared.config.name {
let db_name = shared.config.read().name.clone();
if rules.db_name() != &db_name {
return RulesDatabaseNameMismatch {
actual: rules.db_name(),
expected: shared.config.name.as_str(),
expected: db_name.as_str(),
}
.fail();
}
@ -1461,19 +1531,28 @@ impl DatabaseStateRulesLoaded {
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateCatalogLoaded, InitError> {
let (db_name, wipe_catalog_on_error, skip_replay, server_id) = {
let config = shared.config.read();
(
config.name.clone(),
config.wipe_catalog_on_error,
config.skip_replay,
config.server_id,
)
};
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
shared.config.name.as_str(),
db_name.as_str(),
Arc::clone(&self.iox_object_store),
Arc::clone(shared.application.metric_registry()),
Arc::clone(shared.application.time_provider()),
shared.config.wipe_catalog_on_error,
shared.config.skip_replay,
wipe_catalog_on_error,
skip_replay,
)
.await
.context(CatalogLoad)?;
let database_to_commit = DatabaseToCommit {
server_id: shared.config.server_id,
server_id,
iox_object_store: Arc::clone(&self.iox_object_store),
exec: Arc::clone(shared.application.executor()),
rules: Arc::clone(self.provided_rules.rules()),
@ -1522,19 +1601,29 @@ impl DatabaseStateCatalogLoaded {
let rules = self.provided_rules.rules();
let trace_collector = shared.application.trace_collector();
let write_buffer_factory = shared.application.write_buffer_factory();
let (db_name, server_id, skip_replay) = {
let config = shared.config.read();
(config.name.clone(), config.server_id, config.skip_replay)
};
let write_buffer_consumer = match rules.write_buffer_connection.as_ref() {
Some(connection) => {
let mut consumer = write_buffer_factory
.new_config_read(
shared.config.server_id,
shared.config.name.as_str(),
server_id,
db_name.as_str(),
trace_collector.as_ref(),
connection,
)
.await
.context(CreateWriteBuffer)?;
db.perform_replay(self.replay_plan.as_ref().as_ref(), consumer.as_mut())
let replay_plan = if skip_replay {
None
} else {
self.replay_plan.as_ref().as_ref()
};
db.perform_replay(replay_plan, consumer.as_mut())
.await
.context(Replay)?;
@ -1558,6 +1647,17 @@ impl DatabaseStateCatalogLoaded {
owner_info: self.owner_info.clone(),
})
}
/// Discard the loaded (and potentially half-replayed) in-memory catalog and
/// return the preceding `RulesLoaded` state, so that catalog load and replay
/// can be attempted again from scratch.
fn rollback(&self) -> DatabaseStateRulesLoaded {
warn!(db_name=%self.db.name(), "throwing away loaded catalog to recover from replay error");
let iox_object_store = self.db.iox_object_store();
DatabaseStateRulesLoaded {
provided_rules: Arc::clone(&self.provided_rules),
uuid: self.uuid,
owner_info: self.owner_info.clone(),
iox_object_store,
}
}
}
#[derive(Debug, Clone)]
@ -1718,7 +1818,7 @@ mod tests {
#[tokio::test]
async fn database_release() {
let (application, database) = initialized_database().await;
let server_id = database.shared.config.server_id;
let server_id = database.shared.config.read().server_id;
let server_location =
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
let iox_object_store = database.iox_object_store().unwrap();
@ -1744,8 +1844,8 @@ mod tests {
#[tokio::test]
async fn database_claim() {
let (application, database) = initialized_database().await;
let db_name = &database.shared.config.name;
let server_id = database.shared.config.server_id;
let db_name = &database.shared.config.read().name.clone();
let server_id = database.shared.config.read().server_id;
let server_location =
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
let iox_object_store = database.iox_object_store().unwrap();
@ -1755,16 +1855,23 @@ mod tests {
.to_string();
let uuid = database.release().await.unwrap();
Database::claim(application, db_name, uuid, new_server_id, false)
.await
.unwrap();
// database is in error state
assert_eq!(database.state_code(), DatabaseStateCode::NoActiveDatabase);
assert!(matches!(
database.init_error().unwrap().as_ref(),
InitError::NoActiveDatabase
));
Database::claim(
Arc::clone(&application),
db_name,
uuid,
new_server_id,
false,
)
.await
.unwrap();
let owner_info = fetch_owner_info(&iox_object_store).await.unwrap();
assert_eq!(owner_info.id, new_server_id.get_u32());
assert_eq!(owner_info.location, new_server_location);
@ -1777,6 +1884,31 @@ mod tests {
let claim_transaction = &owner_info.transactions[1];
assert_eq!(claim_transaction.id, 0);
assert_eq!(claim_transaction.location, "");
// put it back to first DB
let db_config = DatabaseConfig {
server_id: new_server_id,
..database.shared.config.read().clone()
};
let new_database = Database::new(Arc::clone(&application), db_config.clone());
new_database.wait_for_init().await.unwrap();
new_database.release().await.unwrap();
Database::claim(application, db_name, uuid, server_id, false)
.await
.unwrap();
// database should recover
tokio::time::timeout(Duration::from_secs(10), async move {
loop {
if database.wait_for_init().await.is_ok() {
return;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
})
.await
.unwrap();
}
#[tokio::test]
@ -1985,6 +2117,41 @@ mod tests {
.unwrap();
}
// End-to-end check that database initialization recovers from a transient
// error state without a process restart: we break init by deleting the owner
// file, observe the failure, fix the file, and verify the retry loop
// eventually reaches the initialized state.
#[tokio::test]
async fn database_init_recovery() {
let (application, database) = initialized_database().await;
let iox_object_store = database.iox_object_store().unwrap();
let config = database.shared.config.read().clone();
// shut down the first database so it stops driving the state machine
database.shutdown();
database.join().await.unwrap();
// mess up the owner file: keep a backup so we can restore it later
let owner_backup = iox_object_store.get_owner_file().await.unwrap();
iox_object_store
.delete_owner_file_for_testing()
.await
.unwrap();
// create a second database against the same config; init must fail
// because the owner file is now missing
let database = Database::new(Arc::clone(&application), config);
database.wait_for_init().await.unwrap_err();
// recover: put the owner file back and poll until the background init
// loop retries and succeeds (bounded by a 10s timeout so the test
// cannot hang)
iox_object_store.put_owner_file(owner_backup).await.unwrap();
tokio::time::timeout(Duration::from_secs(10), async move {
loop {
if database.wait_for_init().await.is_ok() {
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.unwrap();
}
/// Normally database rules are provided as grpc messages, but in
/// tests they are constructed from database rules structures
/// themselves.