Merge pull request #1867 from influxdata/crepererum/rework_db_init_state_machine

refactor: rework DB init state machine
kodiakhq[bot] 2021-07-01 15:31:10 +00:00 committed by GitHub
commit 26167a9e70
3 changed files with 782 additions and 515 deletions
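The rework replaces the old nested-`match` init logic with an explicit state machine: each database is driven through the states Known -> RulesLoaded -> Initialized one step at a time, partial progress is committed instead of rolled back, and the wipe/recover path reuses the same driver. A minimal standalone sketch of that control flow (toy types only, not the real API; the actual DatabaseHandle and DatabaseStateCode live in the config module and also carry the object store, rules, and catalog):

    // Toy model of the reworked init state machine (sketch only; the real
    // DatabaseHandle additionally carries the object store, rules, preserved
    // catalog, and write buffer).
    #[derive(Clone, Copy, PartialEq, Eq, Debug)]
    enum DatabaseStateCode {
        Known,       // name reserved, nothing loaded yet
        RulesLoaded, // rules file read from object store
        Initialized, // preserved catalog and write buffer ready
    }

    enum InitProgress {
        Unfinished, // made one step, call again
        Done,       // fully initialized
        Forget,     // e.g. no rules file present: drop this DB
    }

    // One init step, mirroring try_advance_database_init_process.
    fn advance(state: &mut DatabaseStateCode) -> Result<InitProgress, String> {
        match *state {
            DatabaseStateCode::Known => {
                // real code: load the rules file; a missing file yields Forget
                *state = DatabaseStateCode::RulesLoaded;
                Ok(InitProgress::Unfinished)
            }
            DatabaseStateCode::RulesLoaded => {
                // real code: load or create the preserved catalog + write buffer
                *state = DatabaseStateCode::Initialized;
                Ok(InitProgress::Unfinished)
            }
            DatabaseStateCode::Initialized => Ok(InitProgress::Done),
        }
    }

    // The driver loop, mirroring try_advance_database_init_process_until_complete:
    // Ok(true) = keep the DB, Ok(false) = forget it, Err = partial progress remains.
    fn drive(state: &mut DatabaseStateCode) -> Result<bool, String> {
        loop {
            match advance(state)? {
                InitProgress::Unfinished => {}
                InitProgress::Done => return Ok(true),
                InitProgress::Forget => return Ok(false),
            }
        }
    }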

(Diff of the first changed file suppressed because it is too large.)

Second changed file:

@@ -22,8 +22,11 @@ use std::{
 use tokio::sync::Semaphore;
 
 use crate::{
-    config::{Config, DB_RULES_FILE_NAME},
-    db::{load::load_or_create_preserved_catalog, DatabaseToCommit},
+    config::{
+        object_store_path_for_database_config, Config, DatabaseHandle, DatabaseStateCode,
+        DB_RULES_FILE_NAME,
+    },
+    db::load::load_or_create_preserved_catalog,
     write_buffer, DatabaseError,
 };
@@ -54,14 +57,14 @@ pub enum Error {
     #[snafu(display("store error: {}", source))]
     StoreError { source: object_store::Error },
 
-    #[snafu(display("Cannot create DB: {}", source))]
-    CreateDbError { source: Box<crate::Error> },
-
     #[snafu(display("Cannot recover DB: {}", source))]
     RecoverDbError {
         source: Arc<dyn std::error::Error + Send + Sync>,
     },
 
+    #[snafu(display("Cannot init DB: {}", source))]
+    InitDbError { source: Box<crate::Error> },
+
     #[snafu(display("Cannot wipe preserved catalog DB: {}", source))]
     PreservedCatalogWipeError {
         source: Box<parquet_file::catalog::Error>,
@@ -74,6 +77,12 @@ pub enum Error {
     #[snafu(display("Cannot create write buffer for writing: {}", source))]
     CreateWriteBufferForWriting { source: DatabaseError },
+
+    #[snafu(display(
+        "Cannot wipe catalog because DB init progress has already read it: {}",
+        db_name
+    ))]
+    DbPartiallyInitialized { db_name: String },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -243,13 +252,12 @@ impl InitStatus {
         exec: Arc<Executor>,
         server_id: ServerId,
     ) -> Result<()> {
+        let root = self.root_path(&store)?;
+
         // get the database names from the object store prefixes
         // TODO: update object store to pull back all common prefixes by
         //       following the next tokens.
-        let list_result = store
-            .list_with_delimiter(&self.root_path(&store)?)
-            .await
-            .context(StoreError)?;
+        let list_result = store.list_with_delimiter(&root).await.context(StoreError)?;
 
         let handles: Vec<_> = list_result
             .common_prefixes
@@ -260,26 +268,32 @@ impl InitStatus {
                 let exec = Arc::clone(&exec);
                 let errors_databases = Arc::clone(&self.errors_databases);
                 let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed);
+                let root = root.clone();
 
                 path.set_file_name(DB_RULES_FILE_NAME);
 
                 match db_name_from_rules_path(&path) {
                     Ok(db_name) => {
                         let handle = tokio::task::spawn(async move {
-                            if let Err(e) = Self::initialize_database(
+                            match Self::initialize_database(
                                 server_id,
                                 store,
                                 config,
                                 exec,
-                                path,
+                                root,
                                 db_name.clone(),
                                 wipe_on_error,
                             )
                             .await
                             {
-                                error!(%e, "cannot load database");
-                                let mut guard = errors_databases.lock();
-                                guard.insert(db_name.to_string(), Arc::new(e));
+                                Ok(()) => {
+                                    info!(%db_name, "database initialized");
+                                }
+                                Err(e) => {
+                                    error!(%e, %db_name, "cannot load database");
+                                    let mut guard = errors_databases.lock();
+                                    guard.insert(db_name.to_string(), Arc::new(e));
+                                }
                             }
                         });
                         Some(handle)
@@ -302,71 +316,36 @@ impl InitStatus {
         store: Arc<ObjectStore>,
         config: Arc<Config>,
         exec: Arc<Executor>,
-        path: Path,
+        root: Path,
         db_name: DatabaseName<'static>,
         wipe_on_error: bool,
     ) -> Result<()> {
         // Reserve name before expensive IO (e.g. loading the preserved catalog)
-        let handle = config
-            .create_db(db_name)
+        let mut handle = config
+            .create_db(store, exec, server_id, db_name)
             .map_err(Box::new)
-            .context(CreateDbError)?;
-
-        let metrics_registry = config.metrics_registry();
-
-        match Self::load_database_rules(Arc::clone(&store), path).await {
-            Ok(Some(rules)) => {
-                // loaded rules, continue w/ preserved catalog
-                match load_or_create_preserved_catalog(
-                    rules.db_name(),
-                    Arc::clone(&store),
-                    server_id,
-                    metrics_registry,
-                    wipe_on_error,
-                )
-                .await
-                .map_err(|e| Box::new(e) as _)
-                .context(CatalogLoadError)
-                {
-                    Ok((preserved_catalog, catalog)) => {
-                        let write_buffer =
-                            write_buffer::new(&rules).context(CreateWriteBufferForWriting)?;
-
-                        let database_to_commit = DatabaseToCommit {
-                            server_id,
-                            object_store: store,
-                            exec,
-                            preserved_catalog,
-                            catalog,
-                            rules,
-                            write_buffer,
-                        };
-
-                        // everything is there, can create DB
-                        handle
-                            .commit_db(database_to_commit)
-                            .map_err(Box::new)
-                            .context(CreateDbError)?;
-                        Ok(())
-                    }
-                    Err(e) => {
-                        // catalog loading failed, at least remember the rules we have loaded
-                        handle
-                            .commit_rules_only(rules)
-                            .map_err(Box::new)
-                            .context(CreateDbError)?;
-                        Err(e)
-                    }
-                }
+            .context(InitDbError)?;
+
+        match Self::try_advance_database_init_process_until_complete(
+            &mut handle,
+            &root,
+            wipe_on_error,
+        )
+        .await
+        {
+            Ok(true) => {
+                // finished init and keep DB
+                handle.commit();
+                Ok(())
             }
-            Ok(None) => {
-                // no DB there, drop handle to initiate rollback
-                drop(handle);
+            Ok(false) => {
+                // finished but do not keep DB
+                handle.abort();
                 Ok(())
             }
             Err(e) => {
-                // abort transaction but keep DB registered
-                handle.commit_no_rules();
+                // encountered some error, still commit intermediate result
+                handle.commit();
                 Err(e)
             }
         }
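The notable behavioral change here is the Err arm: previously a failed init rolled the reservation back (or committed only the rules), whereas now the handle is committed in whatever state it reached, so a database that failed mid-init stays registered and can be resumed by the wipe/recover path below. In terms of the toy model above, the commit decision looks like this (a sketch, not the real API):

    // Whether the reservation should be committed; handle.commit() keeps the DB
    // registered in its current state, handle.abort() releases the reservation.
    fn should_commit(outcome: &Result<bool, String>) -> bool {
        match outcome {
            Ok(true) => true,   // fully initialized: keep the DB
            Ok(false) => false, // no rules file: forget the DB
            Err(_) => true,     // error: keep partial progress for later recovery
        }
    }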
@@ -403,67 +382,60 @@ impl InitStatus {
         &self,
         store: Arc<ObjectStore>,
         config: Arc<Config>,
-        exec: Arc<Executor>,
         server_id: ServerId,
         db_name: DatabaseName<'static>,
     ) -> Result<()> {
         if config.has_uninitialized_database(&db_name) {
-            let handle = config
+            let mut handle = config
                 .recover_db(db_name.clone())
                 .map_err(|e| Arc::new(e) as _)
                 .context(RecoverDbError)?;
 
-            PreservedCatalog::wipe(&store, server_id, &db_name)
+            if !((handle.state_code() == DatabaseStateCode::Known)
+                || (handle.state_code() == DatabaseStateCode::RulesLoaded))
+            {
+                // cannot wipe because init state is already too far
+                return Err(Error::DbPartiallyInitialized {
+                    db_name: db_name.to_string(),
+                });
+            }
+
+            // wipe while holding handle so no other init/wipe process can interact with the catalog
+            PreservedCatalog::wipe(&store, handle.server_id(), &db_name)
                 .await
                 .map_err(Box::new)
                 .context(PreservedCatalogWipeError)?;
 
-            if handle.has_rules() {
-                // can recover
-                let metrics_registry = config.metrics_registry();
-                let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed);
-
-                match load_or_create_preserved_catalog(
-                    &db_name,
-                    Arc::clone(&store),
-                    server_id,
-                    metrics_registry,
-                    wipe_on_error,
-                )
-                .await
-                .map_err(|e| Box::new(e) as _)
-                .context(CatalogLoadError)
-                {
-                    Ok((preserved_catalog, catalog)) => {
-                        handle
-                            .commit_db(server_id, store, exec, preserved_catalog, catalog, None)
-                            .map_err(|e| {
-                                warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover");
-                                Box::new(e)
-                            })
-                            .context(CreateDbError)?;
-
-                        let mut guard = self.errors_databases.lock();
-                        guard.remove(&db_name.to_string());
-
-                        info!(%db_name, "wiped preserved catalog of registered database and recovered");
-                        Ok(())
-                    }
-                    Err(e) => {
-                        let mut guard = self.errors_databases.lock();
-                        let e = Arc::new(e);
-                        guard.insert(db_name.to_string(), Arc::clone(&e));
-                        drop(handle);
-
-                        warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover");
-                        Err(Error::RecoverDbError { source: e })
-                    }
-                }
-            } else {
-                // cannot recover, don't have any rules (yet)
-                handle.abort();
-
-                info!(%db_name, "wiped preserved catalog of registered database but do not have rules ready to recover");
-                Ok(())
+            let root = self.root_path(&store)?;
+            let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed);
+            match Self::try_advance_database_init_process_until_complete(
+                &mut handle,
+                &root,
+                wipe_on_error,
+            )
+            .await
+            {
+                Ok(_) => {
+                    // yeah, recovered DB
+                    handle.commit();
+
+                    let mut guard = self.errors_databases.lock();
+                    guard.remove(&db_name.to_string());
+
+                    info!(%db_name, "wiped preserved catalog of registered database and recovered");
+                    Ok(())
+                }
+                Err(e) => {
+                    // could not recover, but still keep new result
+                    handle.commit();
+
+                    let mut guard = self.errors_databases.lock();
+                    let e = Arc::new(e);
+                    guard.insert(db_name.to_string(), Arc::clone(&e));
+
+                    warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover");
+                    Err(Error::RecoverDbError { source: e })
+                }
             }
         } else {
             let handle = config
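The new guard refuses a wipe once init has progressed past reading the preserved catalog: only the Known and RulesLoaded states are wipeable, otherwise DbPartiallyInitialized is returned. An equivalent formulation of the check against the toy enum above (the diff spells it out with == comparisons on state_code()):

    // Wiping is only safe before the preserved catalog has been read.
    fn may_wipe(state: DatabaseStateCode) -> bool {
        matches!(
            state,
            DatabaseStateCode::Known | DatabaseStateCode::RulesLoaded
        )
    }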
@@ -482,6 +454,95 @@ impl InitStatus {
         Ok(())
     }
+
+    /// Try to make as much progress as possible with DB init.
+    ///
+    /// Returns an error if there was an error along the way (in which case the handle should
+    /// still be committed to save the intermediate result). Returns `Ok(true)` if DB init is
+    /// finished and `Ok(false)` if the DB can be forgotten (e.g. because no rules file is
+    /// present).
+    async fn try_advance_database_init_process_until_complete(
+        handle: &mut DatabaseHandle<'_>,
+        root: &Path,
+        wipe_on_error: bool,
+    ) -> Result<bool> {
+        loop {
+            match Self::try_advance_database_init_process(handle, root, wipe_on_error).await? {
+                InitProgress::Unfinished => {}
+                InitProgress::Done => {
+                    return Ok(true);
+                }
+                InitProgress::Forget => {
+                    return Ok(false);
+                }
+            }
+        }
+    }
+
+    /// Try to make some progress in the DB init.
+    async fn try_advance_database_init_process(
+        handle: &mut DatabaseHandle<'_>,
+        root: &Path,
+        wipe_on_error: bool,
+    ) -> Result<InitProgress> {
+        match handle.state_code() {
+            DatabaseStateCode::Known => {
+                // known => load DB rules
+                let path = object_store_path_for_database_config(root, &handle.db_name());
+                match Self::load_database_rules(handle.object_store(), path).await? {
+                    Some(rules) => {
+                        handle
+                            .advance_rules_loaded(rules)
+                            .map_err(Box::new)
+                            .context(InitDbError)?;
+
+                        // there is still more work to do for this DB
+                        Ok(InitProgress::Unfinished)
+                    }
+                    None => {
+                        // no rules file present, advise to forget this DB
+                        Ok(InitProgress::Forget)
+                    }
+                }
+            }
+            DatabaseStateCode::RulesLoaded => {
+                // rules already loaded => continue with loading preserved catalog
+                let (preserved_catalog, catalog) = load_or_create_preserved_catalog(
+                    &handle.db_name(),
+                    handle.object_store(),
+                    handle.server_id(),
+                    handle.metrics_registry(),
+                    wipe_on_error,
+                )
+                .await
+                .map_err(|e| Box::new(e) as _)
+                .context(CatalogLoadError)?;
+
+                let rules = handle
+                    .rules()
+                    .expect("in this state rules should be loaded");
+                let write_buffer =
+                    write_buffer::new(&rules).context(CreateWriteBufferForWriting)?;
+
+                handle
+                    .advance_init(preserved_catalog, catalog, write_buffer)
+                    .map_err(Box::new)
+                    .context(InitDbError)?;
+
+                // there is still more work to do for this DB
+                Ok(InitProgress::Unfinished)
+            }
+            DatabaseStateCode::Initialized => {
+                // database fully initialized => nothing to do
+                Ok(InitProgress::Done)
+            }
+        }
+    }
 }
+
+enum InitProgress {
+    Unfinished,
+    Done,
+    Forget,
+}
 
 // get bytes from the location in object store
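Together with the toy model near the top of this page, the driver can be exercised end to end: a clean cold start walks Known -> RulesLoaded -> Initialized and reports that the database should be kept.

    // Usage of the toy model above (sketch only).
    fn main() {
        let mut state = DatabaseStateCode::Known;
        assert_eq!(drive(&mut state), Ok(true)); // fully initialized: keep the DB
        assert_eq!(state, DatabaseStateCode::Initialized);
    }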

Third changed file:

@@ -73,7 +73,8 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use bytes::BytesMut;
-use db::{load::create_preserved_catalog, DatabaseToCommit};
+use config::DatabaseStateCode;
+use db::load::create_preserved_catalog;
 use init::InitStatus;
 use observability_deps::tracing::{debug, info, warn};
 use parking_lot::Mutex;
@@ -161,9 +162,12 @@ pub enum Error {
     #[snafu(display("store error: {}", source))]
     StoreError { source: object_store::Error },
 
-    #[snafu(display("database already exists"))]
+    #[snafu(display("database already exists: {}", db_name))]
     DatabaseAlreadyExists { db_name: String },
 
+    #[snafu(display("database currently reserved: {}", db_name))]
+    DatabaseReserved { db_name: String },
+
     #[snafu(display("no rules loaded for database: {}", db_name))]
     NoRulesLoaded { db_name: String },
@@ -210,6 +214,19 @@ pub enum Error {
     #[snafu(display("cannot create write buffer for writing: {}", source))]
     CreatingWriteBufferForWriting { source: DatabaseError },
+
+    #[snafu(display(
+        "Invalid database state transition, expected {:?} but got {:?}",
+        expected,
+        actual
+    ))]
+    InvalidDatabaseStateTransition {
+        actual: DatabaseStateCode,
+        expected: DatabaseStateCode,
+    },
+
+    #[snafu(display("server is shutting down"))]
+    ServerShuttingDown,
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
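The new InvalidDatabaseStateTransition variant backs the advance_* methods on DatabaseHandle: each transition asserts the state it expects to start from. A sketch of the implied check, written against the toy enum above (assumed shape; the real check lives in the config module):

    // Refuse a transition unless the handle is in the expected source state.
    fn check_transition(
        actual: DatabaseStateCode,
        expected: DatabaseStateCode,
    ) -> Result<(), String> {
        if actual == expected {
            Ok(())
        } else {
            Err(format!(
                "Invalid database state transition, expected {:?} but got {:?}",
                expected, actual
            ))
        }
    }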
@@ -487,9 +504,17 @@ where
         let server_id = self.require_initialized()?;
 
         // Reserve name before expensive IO (e.g. loading the preserved catalog)
-        let db_reservation = self.config.create_db(rules.name.clone())?;
-        self.persist_database_rules(rules.clone()).await?;
+        let mut db_reservation = self.config.create_db(
+            Arc::clone(&self.store),
+            Arc::clone(&self.exec),
+            server_id,
+            rules.name.clone(),
+        )?;
+
+        // register rules
+        db_reservation.advance_rules_loaded(rules.clone())?;
 
+        // load preserved catalog
         let (preserved_catalog, catalog) = create_preserved_catalog(
             rules.db_name(),
             Arc::clone(&self.store),
@@ -499,21 +524,13 @@ where
         .await
         .map_err(|e| Box::new(e) as _)
         .context(CannotCreatePreservedCatalog)?;
 
         let write_buffer = write_buffer::new(&rules)
             .map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
+        db_reservation.advance_init(preserved_catalog, catalog, write_buffer)?;
 
-        let database_to_commit = DatabaseToCommit {
-            server_id,
-            object_store: Arc::clone(&self.store),
-            exec: Arc::clone(&self.exec),
-            preserved_catalog,
-            catalog,
-            rules,
-            write_buffer,
-        };
-
-        db_reservation.commit_db(database_to_commit)?;
+        // ready to commit
+        self.persist_database_rules(rules.clone()).await?;
+        db_reservation.commit();
 
         Ok(())
     }
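create_database now runs the same state machine linearly: reserve the name, advance to RulesLoaded, build the preserved catalog and write buffer, advance to Initialized, and only then persist the rules and commit, so a create that fails partway no longer leaves a rules file in the object store. A toy analogue of that sequence (not the real Server API):

    // Toy analogue of the new create_database happy path.
    struct Handle {
        state: DatabaseStateCode,
    }

    impl Handle {
        fn advance_rules_loaded(&mut self) {
            self.state = DatabaseStateCode::RulesLoaded; // Known -> RulesLoaded
        }
        fn advance_init(&mut self) {
            self.state = DatabaseStateCode::Initialized; // RulesLoaded -> Initialized
        }
        fn commit(self) -> DatabaseStateCode {
            self.state // only now does the DB become externally visible
        }
    }

    fn create_database_toy() -> DatabaseStateCode {
        let mut handle = Handle { state: DatabaseStateCode::Known }; // reserve name
        handle.advance_rules_loaded(); // register rules
        handle.advance_init();         // preserved catalog + write buffer ready
        handle.commit()                // publish
    }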
@@ -854,18 +871,11 @@ where
         });
         let object_store = Arc::clone(&self.store);
         let config = Arc::clone(&self.config);
-        let exec = Arc::clone(&self.exec);
         let server_id = self.require_id()?;
         let init_status = Arc::clone(&self.init_status);
         let task = async move {
             init_status
-                .wipe_preserved_catalog_and_maybe_recover(
-                    object_store,
-                    config,
-                    exec,
-                    server_id,
-                    db_name,
-                )
+                .wipe_preserved_catalog_and_maybe_recover(object_store, config, server_id, db_name)
                 .await
         };
         tokio::spawn(task.track(registration));
@@ -1918,7 +1928,7 @@ mod tests {
         // creating failed DBs does not work
         let err = create_simple_database(&server, "bar").await.unwrap_err();
-        assert_eq!(err.to_string(), "database already exists");
+        assert_eq!(err.to_string(), "database already exists: bar");
     }
 
     #[tokio::test]
@@ -2015,7 +2025,7 @@ mod tests {
             .wipe_preserved_catalog(db_name_existing.clone())
             .unwrap_err()
             .to_string(),
-            "database already exists"
+            "database already exists: db_existing"
         );
         assert!(PreservedCatalog::exists(
             &server.store,
@@ -2112,7 +2122,7 @@ mod tests {
             .wipe_preserved_catalog(db_name_created.clone())
             .unwrap_err()
             .to_string(),
-            "database already exists"
+            "database already exists: db_created"
         );
         assert!(PreservedCatalog::exists(
             &server.store,