diff --git a/data_types/src/database_state.rs b/data_types/src/database_state.rs index f62380d723..dc631154f6 100644 --- a/data_types/src/database_state.rs +++ b/data_types/src/database_state.rs @@ -3,7 +3,7 @@ /// The state machine is a simple linear state machine: /// /// ```text -/// Known -> RulesLoaded -> Initialized +/// Known -> RulesLoaded -> Replay -> Initialized /// ``` #[derive(Debug, PartialEq, Eq)] pub enum DatabaseStateCode { @@ -13,6 +13,9 @@ pub enum DatabaseStateCode { /// Rules are loaded RulesLoaded, + /// Catalog is loaded but data from sequencers / write buffers is not yet replayed. + Replay, + /// Fully initialized database. Initialized, } diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index 15121f68b4..a55429ce25 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -277,8 +277,8 @@ message DatabaseStatus { // Rules are loaded DATABASE_STATE_RULES_LOADED = 2; - // Here will be the "preserved catalog loaded but replay is pending" state - reserved 3; + // Catalog is loaded but data from sequencers / write buffers is not yet replayed. + DATABASE_STATE_REPLAY = 3; // Fully initialized database. 
DATABASE_STATE_INITIALIZED = 4; diff --git a/generated_types/src/database_state.rs b/generated_types/src/database_state.rs index 79b5de2a68..2b8f95e026 100644 --- a/generated_types/src/database_state.rs +++ b/generated_types/src/database_state.rs @@ -6,6 +6,7 @@ impl From for management::database_status::DatabaseState { match state_code { DatabaseStateCode::Known => Self::Known, DatabaseStateCode::RulesLoaded => Self::RulesLoaded, + DatabaseStateCode::Replay => Self::Replay, DatabaseStateCode::Initialized => Self::Initialized, } } diff --git a/server/src/config.rs b/server/src/config.rs index f2c89ecf53..3994b485b5 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -178,12 +178,12 @@ impl Config { } /// Get database, if registered and fully initialized. - pub(crate) fn db(&self, name: &DatabaseName<'_>) -> Option> { + pub(crate) fn db_initialized(&self, name: &DatabaseName<'_>) -> Option> { let state = self.state.read().expect("mutex poisoned"); state .databases .get(name) - .map(|db_state| db_state.db()) + .map(|db_state| db_state.db_initialized()) .flatten() } @@ -225,9 +225,12 @@ impl Config { where F: FnOnce(DatabaseRules) -> std::result::Result, { - let db = self.db(db_name).ok_or_else(|| Error::DatabaseNotFound { - db_name: db_name.to_string(), - })?; + // TODO: implement for non-initialized databases + let db = self + .db_initialized(db_name) + .ok_or_else(|| Error::DatabaseNotFound { + db_name: db_name.to_string(), + })?; let mut rules = db.rules.write(); *rules = update(rules.clone()).map_err(UpdateError::Closure)?; @@ -371,6 +374,24 @@ impl RemoteTemplate { } } +/// Internal representation of the different database states. +/// +/// # Shared Data During Transitions +/// The following elements can safely be shared between states because they won't be poisoned by any half-done +/// transition (e.g.
starting a transition and then failing due to an IO error): +/// - `object_store` +/// - `exec` +/// +/// The following elements can trivially be copied from one state to the next: +/// - `server_id` +/// - `db_name` +/// +/// The following elements MUST be copied from one state to the next because partial modifications are not allowed: +/// - `rules` +/// +/// Exceptions to the above rules are the following states: +/// - [`Replay`](Self::Replay): replaying twice should (apart from some performance penalties) not do much harm +/// - [`Initialized`](Self::Initialized): the final state is not advanced to anything else #[derive(Debug)] #[allow(clippy::large_enum_variant)] enum DatabaseState { @@ -390,6 +411,9 @@ enum DatabaseState { rules: DatabaseRules, }, + /// Catalog is loaded but data from sequencers / write buffers is not yet replayed. + Replay { db: Arc }, + /// Fully initialized database. Initialized { db: Arc, @@ -410,6 +434,7 @@ impl DatabaseState { match self { DatabaseState::Known { .. } => DatabaseStateCode::Known, DatabaseState::RulesLoaded { .. } => DatabaseStateCode::RulesLoaded, + DatabaseState::Replay { .. } => DatabaseStateCode::Replay, DatabaseState::Initialized { .. } => DatabaseStateCode::Initialized, } } @@ -418,7 +443,15 @@ impl DatabaseState { matches!(self, DatabaseState::Initialized { .. }) } - fn db(&self) -> Option> { + fn db_any_state(&self) -> Option> { + match self { + DatabaseState::Replay { db, .. } => Some(Arc::clone(&db)), + DatabaseState::Initialized { db, .. } => Some(Arc::clone(&db)), + _ => None, + } + } + + fn db_initialized(&self) -> Option> { match self { DatabaseState::Initialized { db, .. } => Some(Arc::clone(&db)), _ => None, @@ -429,6 +462,7 @@ impl DatabaseState { match self { DatabaseState::Known { db_name, .. } => db_name.clone(), DatabaseState::RulesLoaded { rules, .. } => rules.name.clone(), + DatabaseState::Replay { db, .. } => db.rules.read().name.clone(), DatabaseState::Initialized { db, .. 
} => db.rules.read().name.clone(), } } @@ -437,6 +471,7 @@ impl DatabaseState { match self { DatabaseState::Known { object_store, .. } => Arc::clone(object_store), DatabaseState::RulesLoaded { object_store, .. } => Arc::clone(object_store), + DatabaseState::Replay { db, .. } => Arc::clone(&db.store), DatabaseState::Initialized { db, .. } => Arc::clone(&db.store), } } @@ -445,6 +480,7 @@ impl DatabaseState { match self { DatabaseState::Known { server_id, .. } => *server_id, DatabaseState::RulesLoaded { server_id, .. } => *server_id, + DatabaseState::Replay { db, .. } => db.server_id, DatabaseState::Initialized { db, .. } => db.server_id, } } @@ -453,6 +489,7 @@ impl DatabaseState { match self { DatabaseState::Known { .. } => None, DatabaseState::RulesLoaded { rules, .. } => Some(rules.clone()), + DatabaseState::Replay { db, .. } => Some(db.rules.read().clone()), DatabaseState::Initialized { db, .. } => Some(db.rules.read().clone()), } } @@ -531,6 +568,13 @@ impl<'a> DatabaseHandle<'a> { self.state().rules() } + /// Get database linked to this state. + /// + /// This database may be uninitialized. + pub fn db_any_state(&self) -> Option> { + self.state().db_any_state() + } + /// Commit modification done to this handle to config. /// /// After commiting a new handle for the same database can be created. @@ -579,8 +623,8 @@ impl<'a> DatabaseHandle<'a> { } } - /// Advance database state to [`Initialized`](DatabaseStateCode::Initialized). - pub fn advance_init( + /// Advance database state to [`Replay`](DatabaseStateCode::Replay). 
+ pub fn advance_replay( &mut self, preserved_catalog: PreservedCatalog, catalog: Catalog, @@ -593,13 +637,6 @@ impl<'a> DatabaseHandle<'a> { server_id, rules, } => { - let name = rules.name.clone(); - - if self.config.shutdown.is_cancelled() { - error!("server is shutting down"); - return Err(Error::ServerShuttingDown); - } - let database_to_commit = DatabaseToCommit { server_id: *server_id, object_store: Arc::clone(&object_store), @@ -611,10 +648,30 @@ impl<'a> DatabaseHandle<'a> { }; let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs))); + self.state = Some(Arc::new(DatabaseState::Replay { db })); + + Ok(()) + } + state => Err(Error::InvalidDatabaseStateTransition { + actual: state.code(), + expected: DatabaseStateCode::RulesLoaded, + }), + } + } + + /// Advance database state to [`Initialized`](DatabaseStateCode::Initialized). + pub fn advance_init(&mut self) -> Result<()> { + match self.state().as_ref() { + DatabaseState::Replay { db } => { + if self.config.shutdown.is_cancelled() { + error!("server is shutting down"); + return Err(Error::ServerShuttingDown); + } + let shutdown = self.config.shutdown.child_token(); let shutdown_captured = shutdown.clone(); let db_captured = Arc::clone(&db); - let name_captured = name.clone(); + let name_captured = db.rules.read().name.clone(); let handle = Some(tokio::spawn(async move { db_captured @@ -624,7 +681,7 @@ impl<'a> DatabaseHandle<'a> { })); self.state = Some(Arc::new(DatabaseState::Initialized { - db, + db: Arc::clone(&db), handle, shutdown, })); @@ -633,7 +690,7 @@ impl<'a> DatabaseHandle<'a> { } state => Err(Error::InvalidDatabaseStateTransition { actual: state.code(), - expected: DatabaseStateCode::RulesLoaded, + expected: DatabaseStateCode::Replay, }), } } @@ -714,7 +771,7 @@ mod test { let err = config.block_db(name.clone()).unwrap_err(); assert!(matches!(err, Error::DatabaseReserved { .. 
})); } - assert!(config.db(&name).is_none()); + assert!(config.db_initialized(&name).is_none()); assert_eq!(config.db_names_sorted(), vec![]); assert!(!config.has_uninitialized_database(&name)); @@ -734,7 +791,7 @@ mod test { .unwrap_err(); assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. })); } - assert!(config.db(&name).is_none()); + assert!(config.db_initialized(&name).is_none()); assert_eq!(config.db_names_sorted(), vec![]); assert!(!config.has_uninitialized_database(&name)); @@ -751,7 +808,7 @@ mod test { db_reservation.abort(); } - assert!(config.db(&name).is_none()); + assert!(config.db_initialized(&name).is_none()); assert_eq!(config.db_names_sorted(), vec![]); assert!(!config.has_uninitialized_database(&name)); @@ -778,12 +835,14 @@ mod test { .await .unwrap(); db_reservation - .advance_init(preserved_catalog, catalog, None) + .advance_replay(preserved_catalog, catalog, None) .unwrap(); + db_reservation.advance_init().unwrap(); + db_reservation.commit(); } - assert!(config.db(&name).is_some()); + assert!(config.db_initialized(&name).is_some()); assert_eq!(config.db_names_sorted(), vec![name.clone()]); assert!(!config.has_uninitialized_database(&name)); @@ -791,14 +850,14 @@ mod test { tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; assert!( config - .db(&name) + .db_initialized(&name) .expect("expected database") .worker_iterations_lifecycle() > 0 ); assert!( config - .db(&name) + .db_initialized(&name) .expect("expected database") .worker_iterations_cleanup() > 0 @@ -897,8 +956,14 @@ mod test { .await .unwrap(); db_reservation - .advance_init(preserved_catalog, catalog, None) + .advance_replay(preserved_catalog, catalog, None) .unwrap(); + assert_eq!(db_reservation.state_code(), DatabaseStateCode::Replay); + assert_eq!(db_reservation.db_name(), name); + assert_eq!(db_reservation.server_id(), server_id); + assert!(db_reservation.rules().is_some()); + + db_reservation.advance_init().unwrap(); 
assert_eq!(db_reservation.state_code(), DatabaseStateCode::Initialized); assert_eq!(db_reservation.db_name(), name); assert_eq!(db_reservation.server_id(), server_id); @@ -906,7 +971,7 @@ mod test { db_reservation.commit(); } - assert!(config.db(&name).is_some()); + assert!(config.db_initialized(&name).is_some()); assert_eq!(config.db_names_sorted(), vec![name.clone()]); assert!(!config.has_uninitialized_database(&name)); @@ -1019,8 +1084,9 @@ mod test { .unwrap(); db_reservation.advance_rules_loaded(rules).unwrap(); db_reservation - .advance_init(preserved_catalog, catalog, None) + .advance_replay(preserved_catalog, catalog, None) .unwrap(); + db_reservation.advance_init().unwrap(); db_reservation.commit(); // get shutdown token diff --git a/server/src/db.rs b/server/src/db.rs index e0fa7c8cbf..d74cc53160 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -486,6 +486,11 @@ impl Db { self.worker_iterations_cleanup.load(Ordering::Relaxed) } + /// Perform sequencer-driven replay for this DB. 
+ pub async fn perform_replay(&self) { + // TODO: implement replay + } + /// Background worker function pub async fn background_worker( self: &Arc, diff --git a/server/src/init.rs b/server/src/init.rs index c5448f0504..021c8d1d47 100644 --- a/server/src/init.rs +++ b/server/src/init.rs @@ -524,7 +524,21 @@ impl InitStatus { write_buffer::new(&rules).context(CreateWriteBufferForWriting)?; handle - .advance_init(preserved_catalog, catalog, write_buffer) + .advance_replay(preserved_catalog, catalog, write_buffer) + .map_err(Box::new) + .context(InitDbError)?; + + // there is still more work to do for this DB + Ok(InitProgress::Unfinished) + } + DatabaseStateCode::Replay => { + let db = handle + .db_any_state() + .expect("DB should be available in this state"); + db.perform_replay().await; + + handle + .advance_init() .map_err(Box::new) .context(InitDbError)?; diff --git a/server/src/lib.rs b/server/src/lib.rs index c7920e77c6..bfe333a2cc 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -538,7 +538,10 @@ where .context(CannotCreatePreservedCatalog)?; let write_buffer = write_buffer::new(&rules) .map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?; - db_reservation.advance_init(preserved_catalog, catalog, write_buffer)?; + db_reservation.advance_replay(preserved_catalog, catalog, write_buffer)?; + + // no actual replay required + db_reservation.advance_init()?; // ready to commit self.persist_database_rules(rules.clone()).await?; @@ -618,7 +621,7 @@ where let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; let db = self .config - .db(&db_name) + .db_initialized(&db_name) .context(DatabaseNotFound { db_name: &*db_name })?; // need to split this in two blocks because we cannot hold a lock across an async call. 
@@ -753,7 +756,7 @@ where let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; let db = self .config - .db(&db_name) + .db_initialized(&db_name) .context(DatabaseNotFound { db_name: &*db_name })?; let entry = entry_bytes.try_into().context(DecodingEntry)?; @@ -790,11 +793,13 @@ where } pub fn db(&self, name: &DatabaseName<'_>) -> Option> { - self.config.db(name) + self.config.db_initialized(name) } pub fn db_rules(&self, name: &DatabaseName<'_>) -> Option { - self.config.db(name).map(|d| d.rules.read().clone()) + self.config + .db_initialized(name) + .map(|d| d.rules.read().clone()) } // Update database rules and save on success. @@ -861,7 +866,7 @@ where let db = self .config - .db(&name) + .db_initialized(&name) .context(DatabaseNotFound { db_name: &db_name })?; let chunk = db @@ -883,7 +888,7 @@ where &self, db_name: DatabaseName<'static>, ) -> Result> { - if self.config.db(&db_name).is_some() { + if self.config.db_initialized(&db_name).is_some() { return Err(Error::DatabaseAlreadyExists { db_name: db_name.to_string(), });