feat: add `Replay` state to DB init

pull/24376/head
Marco Neumann 2021-07-06 14:24:39 +02:00
parent 246a07f884
commit 3d644b63a1
7 changed files with 133 additions and 39 deletions

View File

@ -3,7 +3,7 @@
/// The state machine is a simple linear state machine:
///
/// ```text
/// Known -> RulesLoaded -> Initialized
/// Known -> RulesLoaded -> Replay -> Initialized
/// ```
#[derive(Debug, PartialEq, Eq)]
pub enum DatabaseStateCode {
@ -13,6 +13,9 @@ pub enum DatabaseStateCode {
/// Rules are loaded
RulesLoaded,
/// Catalog is loaded but data from sequencers / write buffers is not yet replayed.
Replay,
/// Fully initialized database.
Initialized,
}

View File

@ -277,8 +277,8 @@ message DatabaseStatus {
// Rules are loaded
DATABASE_STATE_RULES_LOADED = 2;
// Here will be the "preserved catalog loaded but replay is pending" state
reserved 3;
// Catalog is loaded but data from sequencers / write buffers is not yet replayed.
DATABASE_STATE_REPLAY = 3;
// Fully initialized database.
DATABASE_STATE_INITIALIZED = 4;

View File

@ -6,6 +6,7 @@ impl From<DatabaseStateCode> for management::database_status::DatabaseState {
match state_code {
DatabaseStateCode::Known => Self::Known,
DatabaseStateCode::RulesLoaded => Self::RulesLoaded,
DatabaseStateCode::Replay => Self::Replay,
DatabaseStateCode::Initialized => Self::Initialized,
}
}

View File

@ -178,12 +178,12 @@ impl Config {
}
/// Get database, if registered and fully initialized.
pub(crate) fn db(&self, name: &DatabaseName<'_>) -> Option<Arc<Db>> {
pub(crate) fn db_initialized(&self, name: &DatabaseName<'_>) -> Option<Arc<Db>> {
let state = self.state.read().expect("mutex poisoned");
state
.databases
.get(name)
.map(|db_state| db_state.db())
.map(|db_state| db_state.db_initialized())
.flatten()
}
@ -225,9 +225,12 @@ impl Config {
where
F: FnOnce(DatabaseRules) -> std::result::Result<DatabaseRules, E>,
{
let db = self.db(db_name).ok_or_else(|| Error::DatabaseNotFound {
db_name: db_name.to_string(),
})?;
// TODO: implement for non-initialized databases
let db = self
.db_initialized(db_name)
.ok_or_else(|| Error::DatabaseNotFound {
db_name: db_name.to_string(),
})?;
let mut rules = db.rules.write();
*rules = update(rules.clone()).map_err(UpdateError::Closure)?;
@ -371,6 +374,24 @@ impl RemoteTemplate {
}
}
/// Internal representation of the different database states.
///
/// # Shared Data During Transitions
/// The following elements can safely be shared between states because they won't be poissened by any half-done
/// transition (e.g. starting a transition and then failing due to an IO error):
/// - `object_store`
/// - `exec`
///
/// The following elements can trivially be copied from one state to the next:
/// - `server_id`
/// - `db_name`
///
/// The following elements MUST be copied from one state to the next because partial modifications are not allowed:
/// - `rules`
///
/// Exceptions to the above rules are the following states:
/// - [`Replay`](Self::Replay): replaying twice should (apart from some performance penalties) not do much harm
/// - [`Initialized`](Self::Initialized): the final state is not advanced to anything else
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
enum DatabaseState {
@ -390,6 +411,9 @@ enum DatabaseState {
rules: DatabaseRules,
},
/// Catalog is loaded but data from sequencers / write buffers is not yet replayed.
Replay { db: Arc<Db> },
/// Fully initialized database.
Initialized {
db: Arc<Db>,
@ -410,6 +434,7 @@ impl DatabaseState {
match self {
DatabaseState::Known { .. } => DatabaseStateCode::Known,
DatabaseState::RulesLoaded { .. } => DatabaseStateCode::RulesLoaded,
DatabaseState::Replay { .. } => DatabaseStateCode::Replay,
DatabaseState::Initialized { .. } => DatabaseStateCode::Initialized,
}
}
@ -418,7 +443,15 @@ impl DatabaseState {
matches!(self, DatabaseState::Initialized { .. })
}
fn db(&self) -> Option<Arc<Db>> {
fn db_any_state(&self) -> Option<Arc<Db>> {
match self {
DatabaseState::Replay { db, .. } => Some(Arc::clone(&db)),
DatabaseState::Initialized { db, .. } => Some(Arc::clone(&db)),
_ => None,
}
}
fn db_initialized(&self) -> Option<Arc<Db>> {
match self {
DatabaseState::Initialized { db, .. } => Some(Arc::clone(&db)),
_ => None,
@ -429,6 +462,7 @@ impl DatabaseState {
match self {
DatabaseState::Known { db_name, .. } => db_name.clone(),
DatabaseState::RulesLoaded { rules, .. } => rules.name.clone(),
DatabaseState::Replay { db, .. } => db.rules.read().name.clone(),
DatabaseState::Initialized { db, .. } => db.rules.read().name.clone(),
}
}
@ -437,6 +471,7 @@ impl DatabaseState {
match self {
DatabaseState::Known { object_store, .. } => Arc::clone(object_store),
DatabaseState::RulesLoaded { object_store, .. } => Arc::clone(object_store),
DatabaseState::Replay { db, .. } => Arc::clone(&db.store),
DatabaseState::Initialized { db, .. } => Arc::clone(&db.store),
}
}
@ -445,6 +480,7 @@ impl DatabaseState {
match self {
DatabaseState::Known { server_id, .. } => *server_id,
DatabaseState::RulesLoaded { server_id, .. } => *server_id,
DatabaseState::Replay { db, .. } => db.server_id,
DatabaseState::Initialized { db, .. } => db.server_id,
}
}
@ -453,6 +489,7 @@ impl DatabaseState {
match self {
DatabaseState::Known { .. } => None,
DatabaseState::RulesLoaded { rules, .. } => Some(rules.clone()),
DatabaseState::Replay { db, .. } => Some(db.rules.read().clone()),
DatabaseState::Initialized { db, .. } => Some(db.rules.read().clone()),
}
}
@ -531,6 +568,13 @@ impl<'a> DatabaseHandle<'a> {
self.state().rules()
}
/// Get database linked to this state.
///
/// This database may be uninitialized.
pub fn db_any_state(&self) -> Option<Arc<Db>> {
self.state().db_any_state()
}
/// Commit modification done to this handle to config.
///
/// After commiting a new handle for the same database can be created.
@ -579,8 +623,8 @@ impl<'a> DatabaseHandle<'a> {
}
}
/// Advance database state to [`Initialized`](DatabaseStateCode::Initialized).
pub fn advance_init(
/// Advance database state to [`Replay`](DatabaseStateCode::Replay).
pub fn advance_replay(
&mut self,
preserved_catalog: PreservedCatalog,
catalog: Catalog,
@ -593,13 +637,6 @@ impl<'a> DatabaseHandle<'a> {
server_id,
rules,
} => {
let name = rules.name.clone();
if self.config.shutdown.is_cancelled() {
error!("server is shutting down");
return Err(Error::ServerShuttingDown);
}
let database_to_commit = DatabaseToCommit {
server_id: *server_id,
object_store: Arc::clone(&object_store),
@ -611,10 +648,30 @@ impl<'a> DatabaseHandle<'a> {
};
let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs)));
self.state = Some(Arc::new(DatabaseState::Replay { db }));
Ok(())
}
state => Err(Error::InvalidDatabaseStateTransition {
actual: state.code(),
expected: DatabaseStateCode::RulesLoaded,
}),
}
}
/// Advance database state to [`Initialized`](DatabaseStateCode::Initialized).
pub fn advance_init(&mut self) -> Result<()> {
match self.state().as_ref() {
DatabaseState::Replay { db } => {
if self.config.shutdown.is_cancelled() {
error!("server is shutting down");
return Err(Error::ServerShuttingDown);
}
let shutdown = self.config.shutdown.child_token();
let shutdown_captured = shutdown.clone();
let db_captured = Arc::clone(&db);
let name_captured = name.clone();
let name_captured = db.rules.read().name.clone();
let handle = Some(tokio::spawn(async move {
db_captured
@ -624,7 +681,7 @@ impl<'a> DatabaseHandle<'a> {
}));
self.state = Some(Arc::new(DatabaseState::Initialized {
db,
db: Arc::clone(&db),
handle,
shutdown,
}));
@ -633,7 +690,7 @@ impl<'a> DatabaseHandle<'a> {
}
state => Err(Error::InvalidDatabaseStateTransition {
actual: state.code(),
expected: DatabaseStateCode::RulesLoaded,
expected: DatabaseStateCode::Replay,
}),
}
}
@ -714,7 +771,7 @@ mod test {
let err = config.block_db(name.clone()).unwrap_err();
assert!(matches!(err, Error::DatabaseReserved { .. }));
}
assert!(config.db(&name).is_none());
assert!(config.db_initialized(&name).is_none());
assert_eq!(config.db_names_sorted(), vec![]);
assert!(!config.has_uninitialized_database(&name));
@ -734,7 +791,7 @@ mod test {
.unwrap_err();
assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. }));
}
assert!(config.db(&name).is_none());
assert!(config.db_initialized(&name).is_none());
assert_eq!(config.db_names_sorted(), vec![]);
assert!(!config.has_uninitialized_database(&name));
@ -751,7 +808,7 @@ mod test {
db_reservation.abort();
}
assert!(config.db(&name).is_none());
assert!(config.db_initialized(&name).is_none());
assert_eq!(config.db_names_sorted(), vec![]);
assert!(!config.has_uninitialized_database(&name));
@ -778,12 +835,14 @@ mod test {
.await
.unwrap();
db_reservation
.advance_init(preserved_catalog, catalog, None)
.advance_replay(preserved_catalog, catalog, None)
.unwrap();
db_reservation.advance_init().unwrap();
db_reservation.commit();
}
assert!(config.db(&name).is_some());
assert!(config.db_initialized(&name).is_some());
assert_eq!(config.db_names_sorted(), vec![name.clone()]);
assert!(!config.has_uninitialized_database(&name));
@ -791,14 +850,14 @@ mod test {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
assert!(
config
.db(&name)
.db_initialized(&name)
.expect("expected database")
.worker_iterations_lifecycle()
> 0
);
assert!(
config
.db(&name)
.db_initialized(&name)
.expect("expected database")
.worker_iterations_cleanup()
> 0
@ -897,8 +956,14 @@ mod test {
.await
.unwrap();
db_reservation
.advance_init(preserved_catalog, catalog, None)
.advance_replay(preserved_catalog, catalog, None)
.unwrap();
assert_eq!(db_reservation.state_code(), DatabaseStateCode::Replay);
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), server_id);
assert!(db_reservation.rules().is_some());
db_reservation.advance_init().unwrap();
assert_eq!(db_reservation.state_code(), DatabaseStateCode::Initialized);
assert_eq!(db_reservation.db_name(), name);
assert_eq!(db_reservation.server_id(), server_id);
@ -906,7 +971,7 @@ mod test {
db_reservation.commit();
}
assert!(config.db(&name).is_some());
assert!(config.db_initialized(&name).is_some());
assert_eq!(config.db_names_sorted(), vec![name.clone()]);
assert!(!config.has_uninitialized_database(&name));
@ -1019,8 +1084,9 @@ mod test {
.unwrap();
db_reservation.advance_rules_loaded(rules).unwrap();
db_reservation
.advance_init(preserved_catalog, catalog, None)
.advance_replay(preserved_catalog, catalog, None)
.unwrap();
db_reservation.advance_init().unwrap();
db_reservation.commit();
// get shutdown token

View File

@ -486,6 +486,11 @@ impl Db {
self.worker_iterations_cleanup.load(Ordering::Relaxed)
}
/// Perform sequencer-driven replay for this DB.
pub async fn perform_replay(&self) {
// TODO: implement replay
}
/// Background worker function
pub async fn background_worker(
self: &Arc<Self>,

View File

@ -524,7 +524,21 @@ impl InitStatus {
write_buffer::new(&rules).context(CreateWriteBufferForWriting)?;
handle
.advance_init(preserved_catalog, catalog, write_buffer)
.advance_replay(preserved_catalog, catalog, write_buffer)
.map_err(Box::new)
.context(InitDbError)?;
// there is still more work to do for this DB
Ok(InitProgress::Unfinished)
}
DatabaseStateCode::Replay => {
let db = handle
.db_any_state()
.expect("DB should be available in this state");
db.perform_replay().await;
handle
.advance_init()
.map_err(Box::new)
.context(InitDbError)?;

View File

@ -538,7 +538,10 @@ where
.context(CannotCreatePreservedCatalog)?;
let write_buffer = write_buffer::new(&rules)
.map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?;
db_reservation.advance_init(preserved_catalog, catalog, write_buffer)?;
db_reservation.advance_replay(preserved_catalog, catalog, write_buffer)?;
// no actual replay required
db_reservation.advance_init()?;
// ready to commit
self.persist_database_rules(rules.clone()).await?;
@ -618,7 +621,7 @@ where
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let db = self
.config
.db(&db_name)
.db_initialized(&db_name)
.context(DatabaseNotFound { db_name: &*db_name })?;
// need to split this in two blocks because we cannot hold a lock across an async call.
@ -753,7 +756,7 @@ where
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let db = self
.config
.db(&db_name)
.db_initialized(&db_name)
.context(DatabaseNotFound { db_name: &*db_name })?;
let entry = entry_bytes.try_into().context(DecodingEntry)?;
@ -790,11 +793,13 @@ where
}
pub fn db(&self, name: &DatabaseName<'_>) -> Option<Arc<Db>> {
self.config.db(name)
self.config.db_initialized(name)
}
pub fn db_rules(&self, name: &DatabaseName<'_>) -> Option<DatabaseRules> {
self.config.db(name).map(|d| d.rules.read().clone())
self.config
.db_initialized(name)
.map(|d| d.rules.read().clone())
}
// Update database rules and save on success.
@ -861,7 +866,7 @@ where
let db = self
.config
.db(&name)
.db_initialized(&name)
.context(DatabaseNotFound { db_name: &db_name })?;
let chunk = db
@ -883,7 +888,7 @@ where
&self,
db_name: DatabaseName<'static>,
) -> Result<TaskTracker<Job>> {
if self.config.db(&db_name).is_some() {
if self.config.db_initialized(&db_name).is_some() {
return Err(Error::DatabaseAlreadyExists {
db_name: db_name.to_string(),
});