refactor: rework DB init state machine

Since adding new features like "sequencer replay" or init retries would
make the current code too complex, a refactor is required:

Config:
The config struct now holds a `DatabaseState`, which is a simple linear
state machine representing the different stages of the database init.
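
The state codes that show up in this diff are `Known`, `RulesLoaded` and
`Initialized`. As a rough sketch (the real definition lives in the config
module), the linear progression looks like:

    // Sketch only; the actual type is defined in the config module.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum DatabaseStateCode {
        /// Name is reserved, nothing has been loaded yet.
        Known,
        /// Database rules were read from object storage.
        RulesLoaded,
        /// Preserved catalog and write buffer are set up; the DB is usable.
        Initialized,
    }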

Init:
The init module now has a fixpoint loop which looks at the current
state, decides what to do based on it, and repeats until either the DB
is initialized or an error occurred. This also makes it easier to
continue the init process "in the middle", e.g. when the preserved
catalog is broken or the sequencer (e.g. Kafka) could not be reached.
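
The shape of that driver loop, simplified from
`try_advance_database_init_process_until_complete` in the diff below
(`try_advance` stands in for the longer method name):

    // Simplified sketch of the fixpoint loop: one init step per iteration.
    loop {
        match try_advance(handle).await? {
            // made one step of progress, state advanced, go around again
            InitProgress::Unfinished => {}
            // DB is fully initialized, keep it
            InitProgress::Done => return Ok(true),
            // DB can be forgotten, e.g. because no rules file was found
            InitProgress::Forget => return Ok(false),
        }
    }
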
Marco Neumann 2021-07-01 13:47:51 +02:00
parent 8174af9137
commit e1e3163752
3 changed files with 782 additions and 515 deletions

File diff suppressed because it is too large.


@@ -22,8 +22,11 @@ use std::{
use tokio::sync::Semaphore;
use crate::{
config::{Config, DB_RULES_FILE_NAME},
db::{load::load_or_create_preserved_catalog, DatabaseToCommit},
config::{
object_store_path_for_database_config, Config, DatabaseHandle, DatabaseStateCode,
DB_RULES_FILE_NAME,
},
db::load::load_or_create_preserved_catalog,
write_buffer, DatabaseError,
};
@@ -54,14 +57,14 @@ pub enum Error {
#[snafu(display("store error: {}", source))]
StoreError { source: object_store::Error },
#[snafu(display("Cannot create DB: {}", source))]
CreateDbError { source: Box<crate::Error> },
#[snafu(display("Cannot recover DB: {}", source))]
RecoverDbError {
source: Arc<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("Cannot init DB: {}", source))]
InitDbError { source: Box<crate::Error> },
#[snafu(display("Cannot wipe preserved catalog DB: {}", source))]
PreservedCatalogWipeError {
source: Box<parquet_file::catalog::Error>,
@@ -74,6 +77,12 @@ pub enum Error {
#[snafu(display("Cannot create write buffer for writing: {}", source))]
CreateWriteBufferForWriting { source: DatabaseError },
#[snafu(display(
"Cannot wipe catalog because DB init progress has already read it: {}",
db_name
))]
DbPartiallyInitialized { db_name: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -243,13 +252,12 @@ impl InitStatus {
exec: Arc<Executor>,
server_id: ServerId,
) -> Result<()> {
let root = self.root_path(&store)?;
// get the database names from the object store prefixes
// TODO: update object store to pull back all common prefixes by
// following the next tokens.
let list_result = store
.list_with_delimiter(&self.root_path(&store)?)
.await
.context(StoreError)?;
let list_result = store.list_with_delimiter(&root).await.context(StoreError)?;
let handles: Vec<_> = list_result
.common_prefixes
@@ -260,26 +268,32 @@ impl InitStatus {
let exec = Arc::clone(&exec);
let errors_databases = Arc::clone(&self.errors_databases);
let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed);
let root = root.clone();
path.set_file_name(DB_RULES_FILE_NAME);
match db_name_from_rules_path(&path) {
Ok(db_name) => {
let handle = tokio::task::spawn(async move {
if let Err(e) = Self::initialize_database(
match Self::initialize_database(
server_id,
store,
config,
exec,
path,
root,
db_name.clone(),
wipe_on_error,
)
.await
{
error!(%e, "cannot load database");
let mut guard = errors_databases.lock();
guard.insert(db_name.to_string(), Arc::new(e));
Ok(()) => {
info!(%db_name, "database initialized");
}
Err(e) => {
error!(%e, %db_name, "cannot load database");
let mut guard = errors_databases.lock();
guard.insert(db_name.to_string(), Arc::new(e));
}
}
});
Some(handle)
@@ -302,71 +316,36 @@ impl InitStatus {
store: Arc<ObjectStore>,
config: Arc<Config>,
exec: Arc<Executor>,
path: Path,
root: Path,
db_name: DatabaseName<'static>,
wipe_on_error: bool,
) -> Result<()> {
// Reserve name before expensive IO (e.g. loading the preserved catalog)
let handle = config
.create_db(db_name)
let mut handle = config
.create_db(store, exec, server_id, db_name)
.map_err(Box::new)
.context(CreateDbError)?;
.context(InitDbError)?;
let metrics_registry = config.metrics_registry();
match Self::load_database_rules(Arc::clone(&store), path).await {
Ok(Some(rules)) => {
// loaded rules, continue w/ preserved catalog
match load_or_create_preserved_catalog(
rules.db_name(),
Arc::clone(&store),
server_id,
metrics_registry,
wipe_on_error,
)
.await
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)
{
Ok((preserved_catalog, catalog)) => {
let write_buffer =
write_buffer::new(&rules).context(CreateWriteBufferForWriting)?;
let database_to_commit = DatabaseToCommit {
server_id,
object_store: store,
exec,
preserved_catalog,
catalog,
rules,
write_buffer,
};
// everything is there, can create DB
handle
.commit_db(database_to_commit)
.map_err(Box::new)
.context(CreateDbError)?;
Ok(())
}
Err(e) => {
// catalog loading failed, at least remember the rules we have loaded
handle
.commit_rules_only(rules)
.map_err(Box::new)
.context(CreateDbError)?;
Err(e)
}
}
match Self::try_advance_database_init_process_until_complete(
&mut handle,
&root,
wipe_on_error,
)
.await
{
Ok(true) => {
// finished init and keep DB
handle.commit();
Ok(())
}
Ok(None) => {
// no DB there, drop handle to initiate rollback
drop(handle);
Ok(false) => {
// finished but do not keep DB
handle.abort();
Ok(())
}
Err(e) => {
// abort transaction but keep DB registered
handle.commit_no_rules();
// encountered some error, still commit intermediate result
handle.commit();
Err(e)
}
}
@@ -403,67 +382,60 @@ impl InitStatus {
&self,
store: Arc<ObjectStore>,
config: Arc<Config>,
exec: Arc<Executor>,
server_id: ServerId,
db_name: DatabaseName<'static>,
) -> Result<()> {
if config.has_uninitialized_database(&db_name) {
let handle = config
let mut handle = config
.recover_db(db_name.clone())
.map_err(|e| Arc::new(e) as _)
.context(RecoverDbError)?;
PreservedCatalog::wipe(&store, server_id, &db_name)
if !((handle.state_code() == DatabaseStateCode::Known)
|| (handle.state_code() == DatabaseStateCode::RulesLoaded))
{
// cannot wipe because init state is already too far
return Err(Error::DbPartiallyInitialized {
db_name: db_name.to_string(),
});
}
// wipe while holding handle so no other init/wipe process can interact with the catalog
PreservedCatalog::wipe(&store, handle.server_id(), &db_name)
.await
.map_err(Box::new)
.context(PreservedCatalogWipeError)?;
if handle.has_rules() {
// can recover
let metrics_registry = config.metrics_registry();
let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed);
let root = self.root_path(&store)?;
let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed);
match Self::try_advance_database_init_process_until_complete(
&mut handle,
&root,
wipe_on_error,
)
.await
{
Ok(_) => {
// yeah, recovered DB
handle.commit();
match load_or_create_preserved_catalog(
&db_name,
Arc::clone(&store),
server_id,
metrics_registry,
wipe_on_error,
)
.await
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)
{
Ok((preserved_catalog, catalog)) => {
handle
.commit_db(server_id, store, exec, preserved_catalog, catalog, None)
.map_err(|e | {
warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover");
Box::new(e)
})
.context(CreateDbError)?;
let mut guard = self.errors_databases.lock();
guard.remove(&db_name.to_string());
let mut guard = self.errors_databases.lock();
guard.remove(&db_name.to_string());
info!(%db_name, "wiped preserved catalog of registered database and recovered");
Ok(())
}
Err(e) => {
let mut guard = self.errors_databases.lock();
let e = Arc::new(e);
guard.insert(db_name.to_string(), Arc::clone(&e));
drop(handle);
warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover");
Err(Error::RecoverDbError { source: e })
}
info!(%db_name, "wiped preserved catalog of registered database and recovered");
Ok(())
}
} else {
// cannot recover, don't have any rules (yet)
handle.abort();
Err(e) => {
// could not recover, but still keep new result
handle.commit();
info!(%db_name, "wiped preserved catalog of registered database but do not have rules ready to recover");
Ok(())
let mut guard = self.errors_databases.lock();
let e = Arc::new(e);
guard.insert(db_name.to_string(), Arc::clone(&e));
warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover");
Err(Error::RecoverDbError { source: e })
}
}
} else {
let handle = config
@@ -482,6 +454,95 @@ impl InitStatus {
Ok(())
}
}
/// Try to make as much progress as possible with DB init.
///
/// Returns an error if there was an error along the way (in which case the handle should still be committed to save
/// the intermediate result). Returns `Ok(true)` if DB init is finished and `Ok(false)` if the DB can be forgotten
/// (e.g. because no rules file is present).
async fn try_advance_database_init_process_until_complete(
handle: &mut DatabaseHandle<'_>,
root: &Path,
wipe_on_error: bool,
) -> Result<bool> {
loop {
match Self::try_advance_database_init_process(handle, root, wipe_on_error).await? {
InitProgress::Unfinished => {}
InitProgress::Done => {
return Ok(true);
}
InitProgress::Forget => {
return Ok(false);
}
}
}
}
/// Try to make some progress in the DB init.
async fn try_advance_database_init_process(
handle: &mut DatabaseHandle<'_>,
root: &Path,
wipe_on_error: bool,
) -> Result<InitProgress> {
match handle.state_code() {
DatabaseStateCode::Known => {
// known => load DB rules
let path = object_store_path_for_database_config(root, &handle.db_name());
match Self::load_database_rules(handle.object_store(), path).await? {
Some(rules) => {
handle
.advance_rules_loaded(rules)
.map_err(Box::new)
.context(InitDbError)?;
// there is still more work to do for this DB
Ok(InitProgress::Unfinished)
}
None => {
// no rules file present, advise to forget this DB
Ok(InitProgress::Forget)
}
}
}
DatabaseStateCode::RulesLoaded => {
// rules already loaded => continue with loading preserved catalog
let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
&handle.db_name(),
handle.object_store(),
handle.server_id(),
handle.metrics_registry(),
wipe_on_error,
)
.await
.map_err(|e| Box::new(e) as _)
.context(CatalogLoadError)?;
let rules = handle
.rules()
.expect("in this state rules should be loaded");
let write_buffer =
write_buffer::new(&rules).context(CreateWriteBufferForWriting)?;
handle
.advance_init(preserved_catalog, catalog, write_buffer)
.map_err(Box::new)
.context(InitDbError)?;
// there is still more work to do for this DB
Ok(InitProgress::Unfinished)
}
DatabaseStateCode::Initialized => {
// database fully initialized => nothing to do
Ok(InitProgress::Done)
}
}
}
}
enum InitProgress {
Unfinished,
Done,
Forget,
}
// get bytes from the location in object store


@@ -73,7 +73,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use db::{load::create_preserved_catalog, DatabaseToCommit};
use config::DatabaseStateCode;
use db::load::create_preserved_catalog;
use init::InitStatus;
use observability_deps::tracing::{debug, info, warn};
use parking_lot::Mutex;
@@ -161,9 +162,12 @@ pub enum Error {
#[snafu(display("store error: {}", source))]
StoreError { source: object_store::Error },
#[snafu(display("database already exists"))]
#[snafu(display("database already exists: {}", db_name))]
DatabaseAlreadyExists { db_name: String },
#[snafu(display("database currently reserved: {}", db_name))]
DatabaseReserved { db_name: String },
#[snafu(display("no rules loaded for database: {}", db_name))]
NoRulesLoaded { db_name: String },
@@ -210,6 +214,19 @@ pub enum Error {
#[snafu(display("cannot create write buffer for writing: {}", source))]
CreatingWriteBufferForWriting { source: DatabaseError },
#[snafu(display(
"Invalid database state transition, expected {:?} but got {:?}",
expected,
actual
))]
InvalidDatabaseStateTransition {
actual: DatabaseStateCode,
expected: DatabaseStateCode,
},
#[snafu(display("server is shutting down"))]
ServerShuttingDown,
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -487,9 +504,17 @@ where
let server_id = self.require_initialized()?;
// Reserve name before expensive IO (e.g. loading the preserved catalog)
let db_reservation = self.config.create_db(rules.name.clone())?;
self.persist_database_rules(rules.clone()).await?;
let mut db_reservation = self.config.create_db(
Arc::clone(&self.store),
Arc::clone(&self.exec),
server_id,
rules.name.clone(),
)?;
// register rules
db_reservation.advance_rules_loaded(rules.clone())?;
// load preserved catalog
let (preserved_catalog, catalog) = create_preserved_catalog(
rules.db_name(),
Arc::clone(&self.store),
@@ -499,21 +524,13 @@ where
.await
.map_err(|e| Box::new(e) as _)
.context(CannotCreatePreservedCatalog)?;
let write_buffer = write_buffer::new(&rules)
.map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
db_reservation.advance_init(preserved_catalog, catalog, write_buffer)?;
let database_to_commit = DatabaseToCommit {
server_id,
object_store: Arc::clone(&self.store),
exec: Arc::clone(&self.exec),
preserved_catalog,
catalog,
rules,
write_buffer,
};
db_reservation.commit_db(database_to_commit)?;
// ready to commit
self.persist_database_rules(rules.clone()).await?;
db_reservation.commit();
Ok(())
}
@@ -854,18 +871,11 @@ where
});
let object_store = Arc::clone(&self.store);
let config = Arc::clone(&self.config);
let exec = Arc::clone(&self.exec);
let server_id = self.require_id()?;
let init_status = Arc::clone(&self.init_status);
let task = async move {
init_status
.wipe_preserved_catalog_and_maybe_recover(
object_store,
config,
exec,
server_id,
db_name,
)
.wipe_preserved_catalog_and_maybe_recover(object_store, config, server_id, db_name)
.await
};
tokio::spawn(task.track(registration));
@@ -1918,7 +1928,7 @@ mod tests {
// creating failed DBs does not work
let err = create_simple_database(&server, "bar").await.unwrap_err();
assert_eq!(err.to_string(), "database already exists");
assert_eq!(err.to_string(), "database already exists: bar");
}
#[tokio::test]
@@ -2015,7 +2025,7 @@ mod tests {
.wipe_preserved_catalog(db_name_existing.clone())
.unwrap_err()
.to_string(),
"database already exists"
"database already exists: db_existing"
);
assert!(PreservedCatalog::exists(
&server.store,
@@ -2112,7 +2122,7 @@ mod tests {
.wipe_preserved_catalog(db_name_created.clone())
.unwrap_err()
.to_string(),
"database already exists"
"database already exists: db_created"
);
assert!(PreservedCatalog::exists(
&server.store,