diff --git a/server/src/config.rs b/server/src/config.rs index 15e43b1688..370b960177 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -12,7 +12,8 @@ use query::exec::Executor; /// This module contains code for managing the configuration of the server. use crate::{ db::{catalog::Catalog, DatabaseToCommit, Db}, - write_buffer, Error, JobRegistry, Result, + write_buffer::WriteBuffer, + Error, JobRegistry, Result, }; use observability_deps::tracing::{self, error, info, warn, Instrument}; use tokio::task::JoinHandle; @@ -64,6 +65,10 @@ impl Config { /// Get handle to create a database. /// + /// The handle presents a database in the [`Known`](DatabaseStateCode::Known) state. Note that until the handle is + /// [committed](DatabaseHandle::commit) the database will not be present in the config. Hence + /// [aborting](DatabaseHandle::abort) will discard the to-be-created database. + /// /// While the handle is held, no other operations for the given database can be executed. /// /// This only works if the database is not yet known. To recover a database out of an uninitialized state, see /// without initializing it, see [`block_db`](Self::block_db). 
pub(crate) fn create_db( &self, + object_store: Arc, + exec: Arc, + server_id: ServerId, db_name: DatabaseName<'static>, - ) -> Result> { + ) -> Result> { let mut state = self.state.write().expect("mutex poisoned"); - if state.reservations.contains(&db_name) - || state.databases.contains_key(&db_name) - || state.uninitialized_databases.contains_key(&db_name) - { + if state.reservations.contains(&db_name) { + return Err(Error::DatabaseReserved { + db_name: db_name.to_string(), + }); + } + if state.databases.contains_key(&db_name) { return Err(Error::DatabaseAlreadyExists { db_name: db_name.to_string(), }); } state.reservations.insert(db_name.clone()); - Ok(CreateDatabaseHandle { - db_name: Some(db_name), + Ok(DatabaseHandle { + state: Some(Arc::new(DatabaseState::Known { + object_store, + exec, + server_id, + db_name, + })), config: &self, }) } /// Get handle to recover database out of an uninitialized state. /// - /// If there are already rules known for this database, they will be passed to the handle. + /// The state of the handle will be identical to the one that was last committed. /// /// While the handle is held, no other operations for the given database can be executed. /// /// This only works if the database is known but is uninitialized. To create a new database that is not yet known, /// see [`create_db`](Self::create_db). To do maintainance work on data linked to the database (e.g. the catalog) /// without initializing it, see [`block_db`](Self::block_db). 
- pub(crate) fn recover_db( - &self, - db_name: DatabaseName<'static>, - ) -> Result> { + pub(crate) fn recover_db(&self, db_name: DatabaseName<'static>) -> Result> { let mut state = self.state.write().expect("mutex poisoned"); - if state.reservations.contains(&db_name) || state.databases.contains_key(&db_name) { + if state.reservations.contains(&db_name) { + return Err(Error::DatabaseReserved { + db_name: db_name.to_string(), + }); + } + + let db_state = + state + .databases + .get(&db_name) + .cloned() + .ok_or_else(|| Error::DatabaseNotFound { + db_name: db_name.to_string(), + })?; + + if db_state.is_initialized() { return Err(Error::DatabaseAlreadyExists { db_name: db_name.to_string(), }); } - let rules = state - .uninitialized_databases - .get(&db_name) - .cloned() - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db_name.to_string(), - })?; state.reservations.insert(db_name.clone()); - Ok(RecoverDatabaseHandle { - db_name: Some(db_name), - rules, + Ok(DatabaseHandle { + state: Some(db_state), config: &self, }) } @@ -137,10 +156,12 @@ impl Config { db_name: DatabaseName<'static>, ) -> Result> { let mut state = self.state.write().expect("mutex poisoned"); - if state.reservations.contains(&db_name) - || state.databases.contains_key(&db_name) - || state.uninitialized_databases.contains_key(&db_name) - { + if state.reservations.contains(&db_name) { + return Err(Error::DatabaseReserved { + db_name: db_name.to_string(), + }); + } + if state.databases.contains_key(&db_name) { return Err(Error::DatabaseAlreadyExists { db_name: db_name.to_string(), }); @@ -156,13 +177,21 @@ impl Config { /// Get database, if registered and fully initialized. 
pub(crate) fn db(&self, name: &DatabaseName<'_>) -> Option> { let state = self.state.read().expect("mutex poisoned"); - state.databases.get(name).map(|x| Arc::clone(&x.db)) + state + .databases + .get(name) + .map(|db_state| db_state.db()) + .flatten() } /// Check if there is a database with the given name that is registered but is uninitialized. pub(crate) fn has_uninitialized_database(&self, name: &DatabaseName<'_>) -> bool { let state = self.state.read().expect("mutex poisoned"); - state.uninitialized_databases.contains_key(name) + state + .databases + .get(name) + .map(|db_state| !db_state.is_initialized()) + .unwrap_or(false) } /// Get all database names in all states (blocked, uninitialized, fully initialized). @@ -173,7 +202,6 @@ impl Config { .iter() .cloned() .chain(state.databases.keys().cloned()) - .chain(state.uninitialized_databases.keys().cloned()) .collect(); names.sort(); names @@ -188,15 +216,11 @@ impl Config { where F: FnOnce(DatabaseRules) -> std::result::Result, { - let state = self.state.read().expect("mutex poisoned"); - let db_state = state - .databases - .get(db_name) - .ok_or_else(|| Error::DatabaseNotFound { - db_name: db_name.to_string(), - })?; + let db = self.db(db_name).ok_or_else(|| Error::DatabaseNotFound { + db_name: db_name.to_string(), + })?; - let mut rules = db_state.db.rules.write(); + let mut rules = db.rules.write(); *rules = update(rules.clone()).map_err(UpdateError::Closure)?; Ok(rules.clone()) } @@ -229,53 +253,11 @@ impl Config { .or_else(|| state.remote_template.as_ref().map(|t| t.get(&id))) } - /// Creates database in initialized state. - fn commit_db(&self, database_to_commit: DatabaseToCommit) { + /// Commit new or unchanged database state. 
+ fn commit_db(&self, db_state: Arc) { let mut state = self.state.write().expect("mutex poisoned"); - let name = database_to_commit.rules.name.clone(); - - if self.shutdown.is_cancelled() { - error!("server is shutting down"); - return; - } - - let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.jobs))); - - let shutdown = self.shutdown.child_token(); - let shutdown_captured = shutdown.clone(); - let db_captured = Arc::clone(&db); - let name_captured = name.clone(); - - let handle = Some(tokio::spawn(async move { - db_captured - .background_worker(shutdown_captured) - .instrument(tracing::info_span!("db_worker", database=%name_captured)) - .await - })); - - assert!(state - .databases - .insert( - name.clone(), - DatabaseState { - db, - handle, - shutdown - } - ) - .is_none()); - state.reservations.remove(&name); - state.uninitialized_databases.remove(&name); - } - - /// Creates a database in an uninitialized state but remembers rules that are supposed to use for the database. 
- fn commit_uninitialized(&self, name: DatabaseName<'static>, rules: Option) { - let mut state = self.state.write().expect("mutex poisoned"); - - assert!(state - .uninitialized_databases - .insert(name.clone(), rules,) - .is_none()); + let name = db_state.db_name(); + state.databases.insert(name.clone(), db_state); state.reservations.remove(&name); } @@ -292,14 +274,21 @@ impl Config { // This will cancel all background child tasks self.shutdown.cancel(); - let handles: Vec<_> = self - .state - .write() - .expect("mutex poisoned") - .databases - .iter_mut() - .filter_map(|(_, v)| v.join()) - .collect(); + let handles: Vec<_> = { + let mut state = self.state.write().expect("mutex poisoned"); + + let mut databases = BTreeMap::new(); + std::mem::swap(&mut databases, &mut state.databases); + + databases + .into_iter() + .filter_map(|(_, db_state)| { + Arc::try_unwrap(db_state) + .expect("who else has a DB handle here?!") + .join() + }) + .collect() + }; for handle in handles { let _ = handle.await; @@ -308,11 +297,14 @@ impl Config { info!("database background workers shutdown"); } + /// Metrics registry associated with this config and that should be used to create all databases. pub fn metrics_registry(&self) -> Arc { Arc::clone(&self.metric_registry) } } +/// Get object store path for the database config under the given root (= path under which the server with the current ID +/// stores all its data). pub fn object_store_path_for_database_config( root: &P, name: &DatabaseName<'_>, @@ -329,14 +321,11 @@ pub type GRpcConnectionString = String; /// Inner config state that is protected by a lock. #[derive(Default, Debug)] struct ConfigState { - /// Databases that are known (but not exist due to init errors). + /// Databases for which there are handles but that are not yet committed to `databases`. reservations: BTreeSet>, - /// Rules for databases that are reserved but that could not be initialized. 
- uninitialized_databases: BTreeMap, Option>, - - /// Databases that work. - databases: BTreeMap, DatabaseState>, + /// Databases in different states. + databases: BTreeMap, Arc>, /// Map between remote IOx server IDs and management API connection strings. remotes: BTreeMap, @@ -374,184 +363,296 @@ impl RemoteTemplate { } #[derive(Debug)] -struct DatabaseState { - db: Arc, - handle: Option>, - shutdown: CancellationToken, +#[allow(clippy::large_enum_variant)] +enum DatabaseState { + /// Database is known but nothing is loaded. + Known { + object_store: Arc, + exec: Arc, + server_id: ServerId, + db_name: DatabaseName<'static>, + }, + + /// Rules are loaded + RulesLoaded { + object_store: Arc, + exec: Arc, + server_id: ServerId, + rules: DatabaseRules, + }, + + /// Fully initialized database. + Initialized { + db: Arc, + handle: Option>, + shutdown: CancellationToken, + }, } impl DatabaseState { fn join(&mut self) -> Option> { - self.handle.take() + match self { + DatabaseState::Initialized { handle, .. } => handle.take(), + _ => None, + } + } + + fn code(&self) -> DatabaseStateCode { + match self { + DatabaseState::Known { .. } => DatabaseStateCode::Known, + DatabaseState::RulesLoaded { .. } => DatabaseStateCode::RulesLoaded, + DatabaseState::Initialized { .. } => DatabaseStateCode::Initialized, + } + } + + fn is_initialized(&self) -> bool { + matches!(self, DatabaseState::Initialized { .. }) + } + + fn db(&self) -> Option> { + match self { + DatabaseState::Initialized { db, .. } => Some(Arc::clone(&db)), + _ => None, + } + } + + fn db_name(&self) -> DatabaseName<'static> { + match self { + DatabaseState::Known { db_name, .. } => db_name.clone(), + DatabaseState::RulesLoaded { rules, .. } => rules.name.clone(), + DatabaseState::Initialized { db, .. } => db.rules.read().name.clone(), + } + } + + fn object_store(&self) -> Arc { + match self { + DatabaseState::Known { object_store, .. } => Arc::clone(object_store), + DatabaseState::RulesLoaded { object_store, .. 
} => Arc::clone(object_store), + DatabaseState::Initialized { db, .. } => Arc::clone(&db.store), + } + } + + fn server_id(&self) -> ServerId { + match self { + DatabaseState::Known { server_id, .. } => *server_id, + DatabaseState::RulesLoaded { server_id, .. } => *server_id, + DatabaseState::Initialized { db, .. } => db.server_id, + } + } + + fn rules(&self) -> Option { + match self { + DatabaseState::Known { .. } => None, + DatabaseState::RulesLoaded { rules, .. } => Some(rules.clone()), + DatabaseState::Initialized { db, .. } => Some(db.rules.read().clone()), + } } } impl Drop for DatabaseState { fn drop(&mut self) { - if self.handle.is_some() { - // Join should be called on `DatabaseState` prior to dropping, for example, by - // calling drain() on the owning `Config` - warn!("DatabaseState dropped without waiting for background task to complete"); - self.shutdown.cancel(); + if let DatabaseState::Initialized { + handle, shutdown, .. + } = self + { + if handle.is_some() { + // Join should be called on `DatabaseState` prior to dropping, for example, by + // calling drain() on the owning `Config` + warn!("DatabaseState dropped without waiting for background task to complete"); + shutdown.cancel(); + } } } } -/// CreateDatabaseHandle is returned when a call is made to `create_db` on -/// the Config struct. The handle can be used to hold a reservation for the -/// database name. Calling `commit` on the handle will consume the struct -/// and move the database from reserved to being in the config. +/// Simple representation of the state a database can be in. /// -/// The goal is to ensure that database names can be reserved with -/// minimal time holding a write lock on the config state. This allows -/// the caller (the server) to reserve the database name, persist its -/// configuration and then commit the change in-memory after it has been -/// persisted. 
+/// The state machine is a simple linear state machine: +/// +/// ```text +/// Known -> RulesLoaded -> Initialized +/// ``` +#[derive(Debug, PartialEq, Eq)] +pub enum DatabaseStateCode { + /// Database is known but nothing is loaded. + Known, + + /// Rules are loaded + RulesLoaded, + + /// Fully initialized database. + Initialized, +} + +/// This handle is returned when a call is made to [`create_db`](Config::create_db) or +/// [`recover_db`](Config::recover_db) on the Config struct. The handle can be used to hold a reservation for the +/// database name. Calling `commit` on the handle will consume the struct and move the database from reserved to being +/// in the config. +/// +/// # Concurrent Actions +/// The goal is to ensure that database names can be reserved with minimal time holding a write lock on the config +/// state. This allows the caller (the server) to reserve the database name, setup the database (e.g. load the preserved +/// catalog), persist its configuration and then commit the change in-memory after it has been persisted. +/// +/// # State +/// The handle represents databases in different states. The state can be queried with [`DatabaseHandle::state_code`]. +/// See [`DatabaseStateCode`] for the description of the different states. States can be advanced by using one of the +/// `advance_*` methods. #[derive(Debug)] -pub(crate) struct CreateDatabaseHandle<'a> { +pub(crate) struct DatabaseHandle<'a> { /// Partial moves aren't supported on structures that implement Drop /// so use Option to allow taking DatabaseRules out in `commit` - db_name: Option>, + state: Option>, config: &'a Config, } -impl<'a> CreateDatabaseHandle<'a> { - /// Create initialized database. - /// - /// Will fail if database name used to create this handle and the name within `rules` do not match. In this case, - /// the database will be de-registered. 
- pub(crate) fn commit_db(mut self, database_to_commit: DatabaseToCommit) -> Result<()> { - let db_name = self.db_name.take().expect("not committed"); - if db_name != database_to_commit.rules.name { - self.config.forget_reservation(&db_name); - return Err(Error::RulesDatabaseNameMismatch { - actual: database_to_commit.rules.name.to_string(), - expected: db_name.to_string(), - }); - } - - self.config.commit_db(database_to_commit); - - Ok(()) +impl<'a> DatabaseHandle<'a> { + fn state(&self) -> Arc { + Arc::clone(&self.state.as_ref().expect("not committed")) } - /// Create database in uninitialized state and only remember rules. - /// - /// Use [`Config::recover_db`] to recover database from that state. - /// - /// Will fail if database name used to create this handle and the name within `rules` do not match. In this case, - /// the database will be de-registered. - pub(crate) fn commit_rules_only(mut self, rules: DatabaseRules) -> Result<()> { - let db_name = self.db_name.take().expect("not committed"); - if db_name != rules.name { - self.config.forget_reservation(&db_name); - return Err(Error::RulesDatabaseNameMismatch { - actual: rules.name.to_string(), - expected: db_name.to_string(), - }); - } - - self.config.commit_uninitialized(db_name, Some(rules)); - - Ok(()) + /// Get current [`DatabaseStateCode`] associated with this handle. + pub fn state_code(&self) -> DatabaseStateCode { + self.state().code() } - /// Create database in uninitialized state without any rules. - /// - /// Use [`Config::recover_db`] to recover database from that state. - pub(crate) fn commit_no_rules(mut self) { - let db_name = self.db_name.take().expect("not committed"); - - self.config.commit_uninitialized(db_name, None); + /// Get database name. 
+ pub fn db_name(&self) -> DatabaseName<'static> { + self.state().db_name() } -} -impl<'a> Drop for CreateDatabaseHandle<'a> { - fn drop(&mut self) { - if let Some(db_name) = self.db_name.take() { - self.config.forget_reservation(&db_name) + /// Get object store. + pub fn object_store(&self) -> Arc { + self.state().object_store() + } + + /// Get server ID. + pub fn server_id(&self) -> ServerId { + self.state().server_id() + } + + /// Get metrics registry. + pub fn metrics_registry(&self) -> Arc { + self.config.metrics_registry() + } + + /// Get rules, if already known in the current state. + pub fn rules(&self) -> Option { + self.state().rules() + } + + /// Commit modification done to this handle to config. + /// + /// After committing a new handle for the same database can be created. + pub fn commit(mut self) { + let state = self.state.take().expect("not committed"); + self.config.commit_db(state); + } + + /// Discard modification done to this handle. + /// + /// After aborting a new handle for the same database can be created. + pub fn abort(mut self) { + let state = self.state.take().expect("not committed"); + self.config.forget_reservation(&state.db_name()) + } + + /// Advance database state to [`RulesLoaded`](DatabaseStateCode::RulesLoaded). 
+ pub fn advance_rules_loaded(&mut self, rules: DatabaseRules) -> Result<()> { + match self.state().as_ref() { + DatabaseState::Known { + object_store, + exec, + server_id, + db_name, + } => { + if db_name != &rules.name { + return Err(Error::RulesDatabaseNameMismatch { + actual: rules.name.to_string(), + expected: db_name.to_string(), + }); + } + + self.state = Some(Arc::new(DatabaseState::RulesLoaded { + object_store: Arc::clone(&object_store), + exec: Arc::clone(&exec), + server_id: *server_id, + rules, + })); + + Ok(()) + } + state => Err(Error::InvalidDatabaseStateTransition { + actual: state.code(), + expected: DatabaseStateCode::Known, + }), } } -} -#[derive(Debug)] -pub(crate) struct RecoverDatabaseHandle<'a> { - /// Partial moves aren't supported on structures that implement Drop - /// so use Option to allow taking DatabaseRules out in `commit` - db_name: Option>, - rules: Option, - config: &'a Config, -} - -impl<'a> RecoverDatabaseHandle<'a> { - /// Create initialized database. - /// - /// Rules are taken from the `rules` argument. If that is `None`, the rules that were previously recorded are used. - /// If both are `None` this method will fail and the database is kept uninitialized and the registered rules will - /// not change. - /// - /// Will fail if database name used to create this handle and the name within `rules` do not match. In this case, - /// the database will be kept uninitialized and the registered rules will not change. - pub(crate) fn commit_db( - mut self, - server_id: ServerId, - object_store: Arc, - exec: Arc, + /// Advance database state to [`Initialized`](DatabaseStateCode::Initialized). 
+ pub fn advance_init( + &mut self, preserved_catalog: PreservedCatalog, catalog: Catalog, - rules: Option, + write_buffer: Option>, ) -> Result<()> { - let db_name = self.db_name.take().expect("not committed"); - let rules = rules - .or_else(|| self.rules.take()) - .ok_or_else(|| Error::NoRulesLoaded { - db_name: db_name.to_string(), - })?; - if db_name != rules.name { - self.config.forget_reservation(&db_name); - return Err(Error::RulesDatabaseNameMismatch { - actual: rules.name.to_string(), - expected: db_name.to_string(), - }); + match self.state().as_ref() { + DatabaseState::RulesLoaded { + object_store, + exec, + server_id, + rules, + } => { + let name = rules.name.clone(); + + if self.config.shutdown.is_cancelled() { + error!("server is shutting down"); + return Err(Error::ServerShuttingDown); + } + + let database_to_commit = DatabaseToCommit { + server_id: *server_id, + object_store: Arc::clone(&object_store), + exec: Arc::clone(&exec), + preserved_catalog, + catalog, + rules: rules.clone(), + write_buffer, + }; + let db = Arc::new(Db::new(database_to_commit, Arc::clone(&self.config.jobs))); + + let shutdown = self.config.shutdown.child_token(); + let shutdown_captured = shutdown.clone(); + let db_captured = Arc::clone(&db); + let name_captured = name.clone(); + + let handle = Some(tokio::spawn(async move { + db_captured + .background_worker(shutdown_captured) + .instrument(tracing::info_span!("db_worker", database=%name_captured)) + .await + })); + + self.state = Some(Arc::new(DatabaseState::Initialized { + db, + handle, + shutdown, + })); + + Ok(()) + } + state => Err(Error::InvalidDatabaseStateTransition { + actual: state.code(), + expected: DatabaseStateCode::RulesLoaded, + }), } - - let write_buffer = write_buffer::new(&rules) - .map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?; - - let database_to_commit = DatabaseToCommit { - server_id, - object_store, - exec, - preserved_catalog, - catalog, - rules, - write_buffer, - }; - - 
self.config.commit_db(database_to_commit); - - Ok(()) - } - - /// Check if there are already rules known for this database. - pub(crate) fn has_rules(&self) -> bool { - self.rules.is_some() - } - - /// Abort recovery process. - /// - /// This keeps the database in an uninitialized state and does not alter the potentially registered rules. - pub(crate) fn abort(mut self) { - let db_name = self.db_name.take().expect("not committed"); - - self.config.forget_reservation(&db_name); } } -impl<'a> Drop for RecoverDatabaseHandle<'a> { +impl<'a> Drop for DatabaseHandle<'a> { fn drop(&mut self) { - if let Some(db_name) = self.db_name.take() { - self.config.forget_reservation(&db_name) + if let Some(state) = self.state.as_ref() { + self.config.forget_reservation(&state.db_name()) } } } @@ -586,75 +687,118 @@ mod test { #[tokio::test] async fn create_db() { + // setup let name = DatabaseName::new("foo").unwrap(); + let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); + let exec = Arc::new(Executor::new(1)); + let server_id = ServerId::try_from(1).unwrap(); let metric_registry = Arc::new(metrics::MetricRegistry::new()); let config = Config::new( Arc::new(JobRegistry::new()), Arc::clone(&metric_registry), None, ); - - { - let _db_reservation = config.create_db(name.clone()).unwrap(); - let err = config.create_db(name.clone()).unwrap_err(); - assert!(matches!(err, Error::DatabaseAlreadyExists { .. 
})); - } - - let server_id = ServerId::try_from(1).unwrap(); - let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); - let exec = Arc::new(Executor::new(1)); - let (preserved_catalog, catalog) = load_or_create_preserved_catalog( - &name, - Arc::clone(&store), - server_id, - config.metrics_registry(), - false, - ) - .await - .unwrap(); let rules = DatabaseRules::new(name.clone()); + // getting handle while DB is reserved => fails { - let db_reservation = config.create_db(DatabaseName::new("bar").unwrap()).unwrap(); - let database_to_commit = DatabaseToCommit { - server_id, - object_store: Arc::clone(&store), - exec: Arc::clone(&exec), - preserved_catalog, - catalog, - rules: rules.clone(), - write_buffer: None, - }; + let _db_reservation = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap(); - let err = db_reservation.commit_db(database_to_commit).unwrap_err(); + let err = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap_err(); + assert!(matches!(err, Error::DatabaseReserved { .. })); + + let err = config.block_db(name.clone()).unwrap_err(); + assert!(matches!(err, Error::DatabaseReserved { .. })); + } + assert!(config.db(&name).is_none()); + assert_eq!(config.db_names_sorted(), vec![]); + assert!(!config.has_uninitialized_database(&name)); + + // name in rules must match reserved name + { + let mut db_reservation = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + DatabaseName::new("bar").unwrap(), + ) + .unwrap(); + + let err = db_reservation + .advance_rules_loaded(rules.clone()) + .unwrap_err(); assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. 
})); } + assert!(config.db(&name).is_none()); + assert_eq!(config.db_names_sorted(), vec![]); + assert!(!config.has_uninitialized_database(&name)); - let (preserved_catalog, catalog) = load_or_create_preserved_catalog( - &name, - Arc::clone(&store), - server_id, - config.metrics_registry(), - false, - ) - .await - .unwrap(); - let db_reservation = config.create_db(name.clone()).unwrap(); - let database_to_commit = DatabaseToCommit { - server_id, - object_store: store, - exec, - preserved_catalog, - catalog, - rules, - write_buffer: None, - }; - db_reservation.commit_db(database_to_commit).unwrap(); + // handle.abort just works (aka does not mess up the transaction afterwards) + { + let db_reservation = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + DatabaseName::new("bar").unwrap(), + ) + .unwrap(); + + db_reservation.abort(); + } + assert!(config.db(&name).is_none()); + assert_eq!(config.db_names_sorted(), vec![]); + assert!(!config.has_uninitialized_database(&name)); + + // create DB successfull + { + let mut db_reservation = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap(); + + db_reservation.advance_rules_loaded(rules).unwrap(); + + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( + &name, + Arc::clone(&store), + server_id, + config.metrics_registry(), + false, + ) + .await + .unwrap(); + db_reservation + .advance_init(preserved_catalog, catalog, None) + .unwrap(); + + db_reservation.commit(); + } assert!(config.db(&name).is_some()); assert_eq!(config.db_names_sorted(), vec![name.clone()]); + assert!(!config.has_uninitialized_database(&name)); + // check that background workers is running tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; - assert!( config .db(&name) @@ -670,115 +814,142 @@ mod test { > 0 ); + // recover a fully initialzed DB => fail let err = config.recover_db(name.clone()).unwrap_err(); assert!(matches!(err, 
Error::DatabaseAlreadyExists { .. })); - let err = config.create_db(name.clone()).unwrap_err(); + // create DB as second time => fail + let err = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap_err(); assert!(matches!(err, Error::DatabaseAlreadyExists { .. })); + // block fully initiliazed DB => fail let err = config.block_db(name.clone()).unwrap_err(); assert!(matches!(err, Error::DatabaseAlreadyExists { .. })); + // cleanup config.drain().await } #[tokio::test] async fn recover_db() { + // setup let name = DatabaseName::new("foo").unwrap(); + let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); + let exec = Arc::new(Executor::new(1)); + let server_id = ServerId::try_from(1).unwrap(); let metric_registry = Arc::new(metrics::MetricRegistry::new()); let config = Config::new( Arc::new(JobRegistry::new()), Arc::clone(&metric_registry), None, ); - - { - let db_reservation = config.create_db(name.clone()).unwrap(); - - let err = db_reservation - .commit_rules_only(DatabaseRules::new(DatabaseName::new("bar").unwrap())) - .unwrap_err(); - assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. })); - } - - assert!(!config.has_uninitialized_database(&name)); - let rules = DatabaseRules::new(name.clone()); - { - let db_reservation = config.create_db(name.clone()).unwrap(); - db_reservation.commit_rules_only(rules).unwrap(); - } + // create DB but don't continue with rules loaded (e.g. 
because the rules file is broken) + { + let db_reservation = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap(); + db_reservation.commit(); + } assert!(config.has_uninitialized_database(&name)); - let err = config.create_db(name.clone()).unwrap_err(); + // create DB while it is uninitialized => fail + let err = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap_err(); assert!(matches!(err, Error::DatabaseAlreadyExists { .. })); + // recover an unknown DB => fail let err = config .recover_db(DatabaseName::new("bar").unwrap()) .unwrap_err(); assert!(matches!(err, Error::DatabaseNotFound { .. })); - let server_id = ServerId::try_from(1).unwrap(); - let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); - let exec = Arc::new(Executor::new(1)); - let (preserved_catalog, catalog) = load_or_create_preserved_catalog( - &name, - Arc::clone(&store), - server_id, - config.metrics_registry(), - false, - ) - .await - .unwrap(); + // recover DB { - let db_reservation = config.recover_db(name.clone()).unwrap(); - let err = db_reservation - .commit_db( - server_id, - Arc::clone(&store), - Arc::clone(&exec), - preserved_catalog, - catalog, - Some(DatabaseRules::new(DatabaseName::new("bar").unwrap())), - ) - .unwrap_err(); - assert!(matches!(err, Error::RulesDatabaseNameMismatch { .. 
})); - } + let mut db_reservation = config.recover_db(name.clone()).unwrap(); + assert_eq!(db_reservation.state_code(), DatabaseStateCode::Known); + assert_eq!(db_reservation.db_name(), name); + assert_eq!(db_reservation.server_id(), server_id); + assert!(db_reservation.rules().is_none()); - let (preserved_catalog, catalog) = load_or_create_preserved_catalog( - &name, - Arc::clone(&store), - server_id, - config.metrics_registry(), - false, - ) - .await - .unwrap(); - let db_reservation = config.recover_db(name.clone()).unwrap(); - assert!(db_reservation.has_rules()); - db_reservation - .commit_db(server_id, store, exec, preserved_catalog, catalog, None) + db_reservation.advance_rules_loaded(rules).unwrap(); + assert_eq!(db_reservation.state_code(), DatabaseStateCode::RulesLoaded); + assert_eq!(db_reservation.db_name(), name); + assert_eq!(db_reservation.server_id(), server_id); + assert!(db_reservation.rules().is_some()); + + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( + &name, + Arc::clone(&store), + server_id, + config.metrics_registry(), + false, + ) + .await .unwrap(); + db_reservation + .advance_init(preserved_catalog, catalog, None) + .unwrap(); + assert_eq!(db_reservation.state_code(), DatabaseStateCode::Initialized); + assert_eq!(db_reservation.db_name(), name); + assert_eq!(db_reservation.server_id(), server_id); + assert!(db_reservation.rules().is_some()); + + db_reservation.commit(); + } assert!(config.db(&name).is_some()); assert_eq!(config.db_names_sorted(), vec![name.clone()]); assert!(!config.has_uninitialized_database(&name)); + // recover DB a second time => fail let err = config.recover_db(name.clone()).unwrap_err(); assert!(matches!(err, Error::DatabaseAlreadyExists { .. 
})); - let err = config.create_db(name.clone()).unwrap_err(); + // create recovered DB => fail + let err = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap_err(); assert!(matches!(err, Error::DatabaseAlreadyExists { .. })); + // block recovered DB => fail let err = config.block_db(name.clone()).unwrap_err(); assert!(matches!(err, Error::DatabaseAlreadyExists { .. })); + // cleanup config.drain().await } #[tokio::test] async fn block_db() { + // setup let name = DatabaseName::new("foo").unwrap(); + let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); + let exec = Arc::new(Executor::new(1)); + let server_id = ServerId::try_from(1).unwrap(); let metric_registry = Arc::new(metrics::MetricRegistry::new()); let config = Config::new( Arc::new(JobRegistry::new()), @@ -786,27 +957,50 @@ mod test { None, ); + // block DB let handle = config.block_db(name.clone()).unwrap(); + // create while blocked => fail + let err = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap_err(); + assert!(matches!(err, Error::DatabaseReserved { .. })); + + // recover while blocked => fail let err = config.recover_db(name.clone()).unwrap_err(); - assert!(matches!(err, Error::DatabaseAlreadyExists { .. })); - - let err = config.create_db(name.clone()).unwrap_err(); - assert!(matches!(err, Error::DatabaseAlreadyExists { .. })); + assert!(matches!(err, Error::DatabaseReserved { .. })); + // block while blocked => fail let err = config.block_db(name.clone()).unwrap_err(); - assert!(matches!(err, Error::DatabaseAlreadyExists { .. })); + assert!(matches!(err, Error::DatabaseReserved { .. 
})); + // unblock => DB can be created drop(handle); + config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap(); - config.create_db(name.clone()).unwrap(); - + // cleanup config.drain().await } #[tokio::test] async fn test_db_drop() { + // setup let name = DatabaseName::new("foo").unwrap(); + let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); + let exec = Arc::new(Executor::new(1)); + let server_id = ServerId::try_from(1).unwrap(); let metric_registry = Arc::new(metrics::MetricRegistry::new()); let config = Config::new( Arc::new(JobRegistry::new()), @@ -814,11 +1008,6 @@ mod test { None, ); let rules = DatabaseRules::new(name.clone()); - - let db_reservation = config.create_db(name.clone()).unwrap(); - let server_id = ServerId::try_from(1).unwrap(); - let store = Arc::new(ObjectStore::new_in_memory(InMemory::new())); - let exec = Arc::new(Executor::new(1)); let (preserved_catalog, catalog) = load_or_create_preserved_catalog( &name, Arc::clone(&store), @@ -829,27 +1018,34 @@ mod test { .await .unwrap(); - let database_to_commit = DatabaseToCommit { - server_id, - object_store: store, - exec, - preserved_catalog, - catalog, - rules, - write_buffer: None, - }; + // create DB + let mut db_reservation = config + .create_db( + Arc::clone(&store), + Arc::clone(&exec), + server_id, + name.clone(), + ) + .unwrap(); + db_reservation.advance_rules_loaded(rules).unwrap(); + db_reservation + .advance_init(preserved_catalog, catalog, None) + .unwrap(); + db_reservation.commit(); - db_reservation.commit_db(database_to_commit).unwrap(); - - let token = config + // get shutdown token + let token = match config .state .read() .expect("lock poisoned") .databases .get(&name) .unwrap() - .shutdown - .clone(); + .as_ref() + { + DatabaseState::Initialized { shutdown, .. 
} => shutdown.clone(), + _ => panic!("wrong state"), + }; // Drop config without calling drain std::mem::drop(config); diff --git a/server/src/init.rs b/server/src/init.rs index ee826d07c5..404fdb9e08 100644 --- a/server/src/init.rs +++ b/server/src/init.rs @@ -22,8 +22,11 @@ use std::{ use tokio::sync::Semaphore; use crate::{ - config::{Config, DB_RULES_FILE_NAME}, - db::{load::load_or_create_preserved_catalog, DatabaseToCommit}, + config::{ + object_store_path_for_database_config, Config, DatabaseHandle, DatabaseStateCode, + DB_RULES_FILE_NAME, + }, + db::load::load_or_create_preserved_catalog, write_buffer, DatabaseError, }; @@ -54,14 +57,14 @@ pub enum Error { #[snafu(display("store error: {}", source))] StoreError { source: object_store::Error }, - #[snafu(display("Cannot create DB: {}", source))] - CreateDbError { source: Box }, - #[snafu(display("Cannot recover DB: {}", source))] RecoverDbError { source: Arc, }, + #[snafu(display("Cannot init DB: {}", source))] + InitDbError { source: Box }, + #[snafu(display("Cannot wipe preserved catalog DB: {}", source))] PreservedCatalogWipeError { source: Box, @@ -74,6 +77,12 @@ pub enum Error { #[snafu(display("Cannot create write buffer for writing: {}", source))] CreateWriteBufferForWriting { source: DatabaseError }, + + #[snafu(display( + "Cannot wipe catalog because DB init progress has already read it: {}", + db_name + ))] + DbPartiallyInitialized { db_name: String }, } pub type Result = std::result::Result; @@ -243,13 +252,12 @@ impl InitStatus { exec: Arc, server_id: ServerId, ) -> Result<()> { + let root = self.root_path(&store)?; + // get the database names from the object store prefixes // TODO: update object store to pull back all common prefixes by // following the next tokens. - let list_result = store - .list_with_delimiter(&self.root_path(&store)?) 
- .await - .context(StoreError)?; + let list_result = store.list_with_delimiter(&root).await.context(StoreError)?; let handles: Vec<_> = list_result .common_prefixes @@ -260,26 +268,32 @@ impl InitStatus { let exec = Arc::clone(&exec); let errors_databases = Arc::clone(&self.errors_databases); let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed); + let root = root.clone(); path.set_file_name(DB_RULES_FILE_NAME); match db_name_from_rules_path(&path) { Ok(db_name) => { let handle = tokio::task::spawn(async move { - if let Err(e) = Self::initialize_database( + match Self::initialize_database( server_id, store, config, exec, - path, + root, db_name.clone(), wipe_on_error, ) .await { - error!(%e, "cannot load database"); - let mut guard = errors_databases.lock(); - guard.insert(db_name.to_string(), Arc::new(e)); + Ok(()) => { + info!(%db_name, "database initialized"); + } + Err(e) => { + error!(%e, %db_name, "cannot load database"); + let mut guard = errors_databases.lock(); + guard.insert(db_name.to_string(), Arc::new(e)); + } } }); Some(handle) @@ -302,71 +316,36 @@ impl InitStatus { store: Arc, config: Arc, exec: Arc, - path: Path, + root: Path, db_name: DatabaseName<'static>, wipe_on_error: bool, ) -> Result<()> { // Reserve name before expensive IO (e.g. 
loading the preserved catalog) - let handle = config - .create_db(db_name) + let mut handle = config + .create_db(store, exec, server_id, db_name) .map_err(Box::new) - .context(CreateDbError)?; + .context(InitDbError)?; - let metrics_registry = config.metrics_registry(); - - match Self::load_database_rules(Arc::clone(&store), path).await { - Ok(Some(rules)) => { - // loaded rules, continue w/ preserved catalog - match load_or_create_preserved_catalog( - rules.db_name(), - Arc::clone(&store), - server_id, - metrics_registry, - wipe_on_error, - ) - .await - .map_err(|e| Box::new(e) as _) - .context(CatalogLoadError) - { - Ok((preserved_catalog, catalog)) => { - let write_buffer = - write_buffer::new(&rules).context(CreateWriteBufferForWriting)?; - - let database_to_commit = DatabaseToCommit { - server_id, - object_store: store, - exec, - preserved_catalog, - catalog, - rules, - write_buffer, - }; - - // everything is there, can create DB - handle - .commit_db(database_to_commit) - .map_err(Box::new) - .context(CreateDbError)?; - Ok(()) - } - Err(e) => { - // catalog loading failed, at least remember the rules we have loaded - handle - .commit_rules_only(rules) - .map_err(Box::new) - .context(CreateDbError)?; - Err(e) - } - } + match Self::try_advance_database_init_process_until_complete( + &mut handle, + &root, + wipe_on_error, + ) + .await + { + Ok(true) => { + // finished init and keep DB + handle.commit(); + Ok(()) } - Ok(None) => { - // no DB there, drop handle to initiate rollback - drop(handle); + Ok(false) => { + // finished but do not keep DB + handle.abort(); Ok(()) } Err(e) => { - // abort transaction but keep DB registered - handle.commit_no_rules(); + // encountered some error, still commit intermediate result + handle.commit(); Err(e) } } @@ -403,67 +382,60 @@ impl InitStatus { &self, store: Arc, config: Arc, - exec: Arc, server_id: ServerId, db_name: DatabaseName<'static>, ) -> Result<()> { if config.has_uninitialized_database(&db_name) { - let handle = 
config + let mut handle = config .recover_db(db_name.clone()) .map_err(|e| Arc::new(e) as _) .context(RecoverDbError)?; - PreservedCatalog::wipe(&store, server_id, &db_name) + if !((handle.state_code() == DatabaseStateCode::Known) + || (handle.state_code() == DatabaseStateCode::RulesLoaded)) + { + // cannot wipe because init state is already too far + return Err(Error::DbPartiallyInitialized { + db_name: db_name.to_string(), + }); + } + + // wipe while holding handle so no other init/wipe process can interact with the catalog + PreservedCatalog::wipe(&store, handle.server_id(), &db_name) .await .map_err(Box::new) .context(PreservedCatalogWipeError)?; - if handle.has_rules() { - // can recover - let metrics_registry = config.metrics_registry(); - let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed); + let root = self.root_path(&store)?; + let wipe_on_error = self.wipe_on_error.load(Ordering::Relaxed); + match Self::try_advance_database_init_process_until_complete( + &mut handle, + &root, + wipe_on_error, + ) + .await + { + Ok(_) => { + // yeah, recovered DB + handle.commit(); - match load_or_create_preserved_catalog( - &db_name, - Arc::clone(&store), - server_id, - metrics_registry, - wipe_on_error, - ) - .await - .map_err(|e| Box::new(e) as _) - .context(CatalogLoadError) - { - Ok((preserved_catalog, catalog)) => { - handle - .commit_db(server_id, store, exec, preserved_catalog, catalog, None) - .map_err(|e | { - warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover"); - Box::new(e) - }) - .context(CreateDbError)?; - let mut guard = self.errors_databases.lock(); - guard.remove(&db_name.to_string()); + let mut guard = self.errors_databases.lock(); + guard.remove(&db_name.to_string()); - info!(%db_name, "wiped preserved catalog of registered database and recovered"); - Ok(()) - } - Err(e) => { - let mut guard = self.errors_databases.lock(); - let e = Arc::new(e); - guard.insert(db_name.to_string(), Arc::clone(&e)); - 
drop(handle); - - warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover"); - Err(Error::RecoverDbError { source: e }) - } + info!(%db_name, "wiped preserved catalog of registered database and recovered"); + Ok(()) } - } else { - // cannot recover, don't have any rules (yet) - handle.abort(); + Err(e) => { + // could not recover, but still keep new result + handle.commit(); - info!(%db_name, "wiped preserved catalog of registered database but do not have rules ready to recover"); - Ok(()) + let mut guard = self.errors_databases.lock(); + let e = Arc::new(e); + guard.insert(db_name.to_string(), Arc::clone(&e)); + + warn!(%db_name, %e, "wiped preserved catalog of registered database but still cannot recover"); + Err(Error::RecoverDbError { source: e }) + } } } else { let handle = config @@ -482,6 +454,95 @@ impl InitStatus { Ok(()) } } + + /// Try to make as much progress as possible with DB init. + /// + /// Returns an error if there was an error along the way (in which case the handle should still be committed to save + /// the intermediate result). Returns `Ok(true)` if DB init is finished and `Ok(false)` if the DB can be forgotten + /// (e.g. because no rules file is present.) + async fn try_advance_database_init_process_until_complete( + handle: &mut DatabaseHandle<'_>, + root: &Path, + wipe_on_error: bool, + ) -> Result { + loop { + match Self::try_advance_database_init_process(handle, root, wipe_on_error).await? { + InitProgress::Unfinished => {} + InitProgress::Done => { + return Ok(true); + } + InitProgress::Forget => { + return Ok(false); + } + } + } + } + + /// Try to make some progress in the DB init. 
+ async fn try_advance_database_init_process( + handle: &mut DatabaseHandle<'_>, + root: &Path, + wipe_on_error: bool, + ) -> Result { + match handle.state_code() { + DatabaseStateCode::Known => { + // known => load DB rules + let path = object_store_path_for_database_config(root, &handle.db_name()); + match Self::load_database_rules(handle.object_store(), path).await? { + Some(rules) => { + handle + .advance_rules_loaded(rules) + .map_err(Box::new) + .context(InitDbError)?; + + // there is still more work to do for this DB + Ok(InitProgress::Unfinished) + } + None => { + // no rules file present, advise to forget this DB + Ok(InitProgress::Forget) + } + } + } + DatabaseStateCode::RulesLoaded => { + // rules already loaded => continue with loading preserved catalog + let (preserved_catalog, catalog) = load_or_create_preserved_catalog( + &handle.db_name(), + handle.object_store(), + handle.server_id(), + handle.metrics_registry(), + wipe_on_error, + ) + .await + .map_err(|e| Box::new(e) as _) + .context(CatalogLoadError)?; + + let rules = handle + .rules() + .expect("in this state rules should be loaded"); + let write_buffer = + write_buffer::new(&rules).context(CreateWriteBufferForWriting)?; + + handle + .advance_init(preserved_catalog, catalog, write_buffer) + .map_err(Box::new) + .context(InitDbError)?; + + // there is still more work to do for this DB + Ok(InitProgress::Unfinished) + } + DatabaseStateCode::Initialized => { + // database fully initialized => nothing to do + Ok(InitProgress::Done) + } + } + } +} + +enum InitProgress { + Unfinished, + Done, + Forget, } // get bytes from the location in object store diff --git a/server/src/lib.rs b/server/src/lib.rs index 4ece3f9bdf..ed807da543 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -73,7 +73,8 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::BytesMut; -use db::{load::create_preserved_catalog, DatabaseToCommit}; +use config::DatabaseStateCode; +use db::load::create_preserved_catalog; 
use init::InitStatus; use observability_deps::tracing::{debug, info, warn}; use parking_lot::Mutex; @@ -161,9 +162,12 @@ pub enum Error { #[snafu(display("store error: {}", source))] StoreError { source: object_store::Error }, - #[snafu(display("database already exists"))] + #[snafu(display("database already exists: {}", db_name))] DatabaseAlreadyExists { db_name: String }, + #[snafu(display("database currently reserved: {}", db_name))] + DatabaseReserved { db_name: String }, + #[snafu(display("no rules loaded for database: {}", db_name))] NoRulesLoaded { db_name: String }, @@ -210,6 +214,19 @@ pub enum Error { #[snafu(display("cannot create write buffer for writing: {}", source))] CreatingWriteBufferForWriting { source: DatabaseError }, + + #[snafu(display( + "Invalid database state transition, expected {:?} but got {:?}", + expected, + actual + ))] + InvalidDatabaseStateTransition { + actual: DatabaseStateCode, + expected: DatabaseStateCode, + }, + + #[snafu(display("server is shutting down"))] + ServerShuttingDown, } pub type Result = std::result::Result; @@ -487,9 +504,17 @@ where let server_id = self.require_initialized()?; // Reserve name before expensive IO (e.g. 
loading the preserved catalog) - let db_reservation = self.config.create_db(rules.name.clone())?; - self.persist_database_rules(rules.clone()).await?; + let mut db_reservation = self.config.create_db( + Arc::clone(&self.store), + Arc::clone(&self.exec), + server_id, + rules.name.clone(), + )?; + // register rules + db_reservation.advance_rules_loaded(rules.clone())?; + + // load preserved catalog let (preserved_catalog, catalog) = create_preserved_catalog( rules.db_name(), Arc::clone(&self.store), @@ -499,21 +524,13 @@ where .await .map_err(|e| Box::new(e) as _) .context(CannotCreatePreservedCatalog)?; - let write_buffer = write_buffer::new(&rules) .map_err(|e| Error::CreatingWriteBufferForWriting { source: e })?; + db_reservation.advance_init(preserved_catalog, catalog, write_buffer)?; - let database_to_commit = DatabaseToCommit { - server_id, - object_store: Arc::clone(&self.store), - exec: Arc::clone(&self.exec), - preserved_catalog, - catalog, - rules, - write_buffer, - }; - - db_reservation.commit_db(database_to_commit)?; + // ready to commit + self.persist_database_rules(rules.clone()).await?; + db_reservation.commit(); Ok(()) } @@ -854,18 +871,11 @@ where }); let object_store = Arc::clone(&self.store); let config = Arc::clone(&self.config); - let exec = Arc::clone(&self.exec); let server_id = self.require_id()?; let init_status = Arc::clone(&self.init_status); let task = async move { init_status - .wipe_preserved_catalog_and_maybe_recover( - object_store, - config, - exec, - server_id, - db_name, - ) + .wipe_preserved_catalog_and_maybe_recover(object_store, config, server_id, db_name) .await }; tokio::spawn(task.track(registration)); @@ -1918,7 +1928,7 @@ mod tests { // creating failed DBs does not work let err = create_simple_database(&server, "bar").await.unwrap_err(); - assert_eq!(err.to_string(), "database already exists"); + assert_eq!(err.to_string(), "database already exists: bar"); } #[tokio::test] @@ -2015,7 +2025,7 @@ mod tests { 
.wipe_preserved_catalog(db_name_existing.clone()) .unwrap_err() .to_string(), - "database already exists" + "database already exists: db_existing" ); assert!(PreservedCatalog::exists( &server.store, @@ -2112,7 +2122,7 @@ mod tests { .wipe_preserved_catalog(db_name_created.clone()) .unwrap_err() .to_string(), - "database already exists" + "database already exists: db_created" ); assert!(PreservedCatalog::exists( &server.store,