diff --git a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs index 8b14681df4..552e47ecf7 100644 --- a/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs +++ b/influxdb_iox/src/influxdb_ioxd/server_type/database/rpc/management.rs @@ -84,12 +84,8 @@ impl management_service_server::ManagementService for ManagementService { .await .map_err(default_server_error_handler)?; - let uuid = database - .uuid() - .expect("Database should be initialized or an error should have been returned"); - Ok(Response::new(CreateDatabaseResponse { - uuid: uuid.as_bytes().to_vec(), + uuid: database.uuid().as_bytes().to_vec(), })) } @@ -389,10 +385,7 @@ impl management_service_server::ManagementService for ManagementService { message: e.to_string(), }), state: database.state_code().into(), - uuid: database - .uuid() - .map(|uuid| uuid.as_bytes().to_vec()) - .unwrap_or_default(), + uuid: database.uuid().as_bytes().to_vec(), }) .collect() }) diff --git a/iox_object_store/src/lib.rs b/iox_object_store/src/lib.rs index b50c86c8f0..b5c22aead1 100644 --- a/iox_object_store/src/lib.rs +++ b/iox_object_store/src/lib.rs @@ -165,7 +165,7 @@ impl IoxObjectStore { /// Access the database-specific object storage files for an existing database that has /// already been located and verified to be active. Does not check object storage. - fn existing(inner: Arc, root_path: RootPath) -> Self { + pub fn existing(inner: Arc, root_path: RootPath) -> Self { let data_path = root_path.data_path(); let transactions_path = root_path.transactions_path(); diff --git a/server/src/database.rs b/server/src/database.rs index 86eb11ff8c..5f0657a2d0 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -171,9 +171,18 @@ impl Database { "new database" ); + let path = IoxObjectStore::root_path_for(application.object_store(), config.database_uuid); + // The database state machine handles the case of this path not existing, as it will end + // up in [`DatabaseState::RulesLoadError`] or [`DatabaseState::OwnerInfoLoadError`] + let iox_object_store = Arc::new(IoxObjectStore::existing( + Arc::clone(application.object_store()), + path, + )); + let shared = Arc::new(DatabaseShared { config: RwLock::new(config), application, + iox_object_store, state: RwLock::new(Freezable::new(DatabaseState::Shutdown(None))), state_notify: Default::default(), }); @@ -191,20 +200,6 @@ impl Database { let db_name = self.name(); let db_name = db_name.as_str(); - let (iox_object_store, uuid) = { - let state = self.shared.state.read(); - - let store = state - .iox_object_store() - .context(CannotReleaseUnownedSnafu { db_name })?; - - let uuid = state - .uuid() - .context(CannotReleaseUnownedSnafu { db_name })?; - - (store, uuid) - }; - self.shutdown(); let _ = self.join().await.log_if_error("releasing database"); @@ -212,12 +207,12 @@ impl Database { None, None, self.shared.application.time_provider().now(), - &iox_object_store, + &self.shared.iox_object_store, ) .await .context(CannotReleaseSnafu { db_name })?; - Ok(uuid) + Ok(self.uuid()) } /// Triggers shutdown of this `Database` if it is running @@ -295,9 +290,9 @@ impl Database { self.shared.state.read().provided_rules() } - /// Returns the database UUID if it's loaded - pub fn uuid(&self) -> Option { - self.shared.state.read().uuid() + /// Returns the database UUID + pub fn uuid(&self) -> Uuid { + self.shared.config.read().database_uuid } /// Returns the info about the owning server if it has been loaded @@ -305,11 +300,6 @@ impl Database { self.shared.state.read().owner_info() } - /// Location in object store; may not actually exist yet - pub fn location(&self) -> String { - self.shared.config.read().location.clone() - } - /// Database name pub fn name(&self) -> DatabaseName<'static> { self.shared.config.read().name.clone() @@ -379,9 +369,9 @@ impl Database { } } - /// Returns the IoxObjectStore if it has been found - pub fn iox_object_store(&self) -> Option> { - self.shared.state.read().iox_object_store() + /// Returns the IoxObjectStore + pub fn iox_object_store(&self) -> Arc { + Arc::clone(&self.shared.iox_object_store) } /// Gets access to an initialized `Db` @@ -404,14 +394,11 @@ impl Database { // the notification being fired, and this task waking up match &**self.shared.state.read() { DatabaseState::Known(_) - | DatabaseState::DatabaseObjectStoreFound(_) | DatabaseState::OwnerInfoLoaded(_) | DatabaseState::RulesLoaded(_) | DatabaseState::CatalogLoaded(_) => {} // Non-terminal state DatabaseState::Initialized(_) => return Ok(()), - DatabaseState::DatabaseObjectStoreLookupError(_, e) - | DatabaseState::NoActiveDatabase(_, e) - | DatabaseState::OwnerInfoLoadError(_, e) + DatabaseState::OwnerInfoLoadError(_, e) | DatabaseState::RulesLoadError(_, e) | DatabaseState::CatalogLoadError(_, e) | DatabaseState::WriteBufferCreationError(_, e) @@ -427,29 +414,19 @@ impl Database { pub async fn wipe_preserved_catalog(self: &Arc) -> Result, Error> { let db_name = self.name(); - // TODO: Make IOxObjectStore immutable property of Database - let iox_object_store = match &**self.shared.state.read() { - DatabaseState::CatalogLoadError(rules_loaded, err) => { - warn!(%db_name, %err, "Requested wiping catalog in CatalogLoadError state"); - Arc::clone(rules_loaded.iox_object_store()) - } - DatabaseState::WriteBufferCreationError(catalog_loaded, err) => { - warn!(%db_name, %err, "Requested wiping catalog in WriteBufferCreationError state"); - catalog_loaded.iox_object_store() - } - DatabaseState::ReplayError(catalog_loaded, err) => { - warn!(%db_name, %err, "Requested wiping catalog in ReplayError state"); - catalog_loaded.iox_object_store() - } + match self.state_code() { + DatabaseStateCode::CatalogLoadError + | DatabaseStateCode::WriteBufferCreationError + | DatabaseStateCode::ReplayError => {} state => { return InvalidStateForWipePreservedCatalogSnafu { db_name, - state: state.state_code(), + state, expected: "CatalogLoadError, WriteBufferCreationError, ReplayError", } .fail() } - }; + } // Shutdown database self.shutdown(); @@ -470,7 +447,7 @@ impl Database { tokio::spawn( async move { // wipe the actual catalog - PreservedCatalog::wipe(&iox_object_store) + PreservedCatalog::wipe(&this.shared.iox_object_store) .await .map_err(Box::new) .context(WipePreservedCatalogSnafu { db_name: &db_name })?; @@ -507,15 +484,6 @@ impl Database { let shared = Arc::clone(&self.shared); let db_name = self.name(); - // TODO: Make IOxObjectStore immutable property of Database - let iox_object_store = self - .iox_object_store() - .context(InvalidStateForRebuildSnafu { - db_name: &db_name, - state: shared.state.read().state_code(), - expected: "Object store initialized", - })?; - // Shutdown database self.shutdown(); let _ = self.join().await.log_if_error("rebuilding catalog"); @@ -537,7 +505,7 @@ impl Database { info!(%db_name, "rebuilding catalog from parquet files"); // Now wipe the catalog and rebuild it from parquet files - PreservedCatalog::wipe(iox_object_store.as_ref()) + PreservedCatalog::wipe(&this.shared.iox_object_store) .await .map_err(Box::new) .context(WipePreservedCatalogSnafu { db_name: &db_name })?; @@ -545,7 +513,7 @@ impl Database { info!(%db_name, "wiped preserved catalog"); let config = PreservedCatalogConfig::new( - Arc::clone(&iox_object_store), + Arc::clone(&this.shared.iox_object_store), db_name.to_string(), Arc::clone(shared.application.time_provider()), ); @@ -821,6 +789,7 @@ mod tests { }; use std::time::Duration; use std::{num::NonZeroU32, time::Instant}; + use test_helpers::assert_contains; use uuid::Uuid; use write_buffer::mock::MockBufferSharedState; @@ -866,13 +835,13 @@ mod tests { let server_id = ServerId::try_from(1).unwrap(); let application = make_application(); - let db_name = DatabaseName::new("test").unwrap(); - let uuid = Uuid::new_v4(); - let provided_rules = ProvidedDatabaseRules::new_empty(db_name.clone()); + let name = DatabaseName::new("test").unwrap(); + let database_uuid = Uuid::new_v4(); + let provided_rules = ProvidedDatabaseRules::new_empty(name.clone()); - let location = create_empty_db_in_object_store( + create_empty_db_in_object_store( Arc::clone(&application), - uuid, + database_uuid, provided_rules, server_id, ) @@ -880,9 +849,9 @@ mod tests { .unwrap(); let db_config = DatabaseConfig { - name: db_name, - location, + name, server_id, + database_uuid, wipe_catalog_on_error: false, skip_replay: false, }; @@ -897,7 +866,7 @@ mod tests { let server_id = database.shared.config.read().server_id; let server_location = IoxObjectStore::server_config_path(application.object_store(), server_id).to_string(); - let iox_object_store = database.iox_object_store().unwrap(); + let iox_object_store = database.iox_object_store(); database.release().await.unwrap(); @@ -921,7 +890,7 @@ mod tests { let server_id = database.shared.config.read().server_id; let server_location = IoxObjectStore::server_config_path(application.object_store(), server_id).to_string(); - let iox_object_store = database.iox_object_store().unwrap(); + let iox_object_store = database.iox_object_store(); let new_server_id = ServerId::try_from(2).unwrap(); let new_server_location = IoxObjectStore::server_config_path(application.object_store(), new_server_id) @@ -985,16 +954,13 @@ mod tests { assert!(database.is_initialized()); // Delete the rules - let iox_object_store = database.iox_object_store().unwrap(); + let iox_object_store = database.iox_object_store(); iox_object_store.delete_database_rules_file().await.unwrap(); // Restart should fail - let err = database.restart().await.unwrap_err(); - assert!( - matches!(err.as_ref(), InitError::DatabaseObjectStoreLookup { .. }), - "{:?}", - err - ); + let err = database.restart().await.unwrap_err().to_string(); + assert_contains!(&err, "error loading database rules"); + assert_contains!(&err, "not found"); } #[tokio::test] @@ -1033,7 +999,8 @@ mod tests { ..Default::default() }), }; - let location = create_empty_db_in_object_store( + + create_empty_db_in_object_store( Arc::clone(&application), uuid, make_provided_rules(rules), @@ -1041,10 +1008,11 @@ mod tests { ) .await .unwrap(); + let db_config = DatabaseConfig { name: db_name, - location, server_id, + database_uuid: uuid, wipe_catalog_on_error: false, skip_replay: false, }; @@ -1142,7 +1110,8 @@ mod tests { ..Default::default() }), }; - let location = create_empty_db_in_object_store( + + create_empty_db_in_object_store( Arc::clone(&application), uuid, make_provided_rules(rules), @@ -1150,10 +1119,11 @@ mod tests { ) .await .unwrap(); + let db_config = DatabaseConfig { name: db_name, - location, server_id, + database_uuid: uuid, wipe_catalog_on_error: false, skip_replay: false, }; @@ -1185,7 +1155,7 @@ mod tests { #[tokio::test] async fn database_init_recovery() { let (application, database) = initialized_database().await; - let iox_object_store = database.iox_object_store().unwrap(); + let iox_object_store = database.iox_object_store(); let config = database.shared.config.read().clone(); // shutdown first database diff --git a/server/src/database/init.rs b/server/src/database/init.rs index 951ea52e56..01f550d997 100644 --- a/server/src/database/init.rs +++ b/server/src/database/init.rs @@ -36,9 +36,6 @@ pub enum InitError { source: iox_object_store::IoxObjectStoreError, }, - #[snafu(display("no active database found in object storage, not loading"))] - NoActiveDatabase, - #[snafu(display( "Database name in deserialized rules ({}) does not match expected value ({})", actual, @@ -106,15 +103,9 @@ pub enum InitError { /// ```text /// (start) /// | -/// o-o o--------|-------------------o o-o -/// | V V V V V | -/// [NoActiveDatabase]<--[Known]------------->[DatabaseObjectStoreLookupError] -/// | | | -/// o----------------+---------------------------o -/// | -/// | o-o -/// V V | -/// [DatabaseObjectStoreFound]------>[OwnerInfoLoadError] +/// |----------------------------o o-o +/// V V V | +/// [Known]-------------->[OwnerInfoLoadError] /// | | /// +---------------------------o /// | @@ -163,7 +154,6 @@ pub(crate) enum DatabaseState { Shutdown(Option>), // Basic initialization sequence states: Known(DatabaseStateKnown), - DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound), OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded), RulesLoaded(DatabaseStateRulesLoaded), CatalogLoaded(DatabaseStateCatalogLoaded), @@ -172,9 +162,7 @@ pub(crate) enum DatabaseState { Initialized(DatabaseStateInitialized), // Error states, we'll try to recover from them - NoActiveDatabase(DatabaseStateKnown, Arc), - DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc), - OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc), + OwnerInfoLoadError(DatabaseStateKnown, Arc), RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc), CatalogLoadError(DatabaseStateRulesLoaded, Arc), WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc), @@ -197,17 +185,10 @@ impl DatabaseState { match self { DatabaseState::Shutdown(_) => DatabaseStateCode::Shutdown, DatabaseState::Known(_) => DatabaseStateCode::Known, - DatabaseState::DatabaseObjectStoreFound(_) => { - DatabaseStateCode::DatabaseObjectStoreFound - } DatabaseState::OwnerInfoLoaded(_) => DatabaseStateCode::OwnerInfoLoaded, DatabaseState::RulesLoaded(_) => DatabaseStateCode::RulesLoaded, DatabaseState::CatalogLoaded(_) => DatabaseStateCode::CatalogLoaded, DatabaseState::Initialized(_) => DatabaseStateCode::Initialized, - DatabaseState::DatabaseObjectStoreLookupError(_, _) => { - DatabaseStateCode::DatabaseObjectStoreLookupError - } - DatabaseState::NoActiveDatabase(_, _) => DatabaseStateCode::NoActiveDatabase, DatabaseState::OwnerInfoLoadError(_, _) => DatabaseStateCode::OwnerInfoLoadError, DatabaseState::RulesLoadError(_, _) => DatabaseStateCode::RulesLoadError, DatabaseState::CatalogLoadError(_, _) => DatabaseStateCode::CatalogLoadError, @@ -222,14 +203,11 @@ impl DatabaseState { match self { DatabaseState::Known(_) | DatabaseState::Shutdown(_) - | DatabaseState::DatabaseObjectStoreFound(_) | DatabaseState::OwnerInfoLoaded(_) | DatabaseState::RulesLoaded(_) | DatabaseState::CatalogLoaded(_) | DatabaseState::Initialized(_) => None, - DatabaseState::DatabaseObjectStoreLookupError(_, e) - | DatabaseState::NoActiveDatabase(_, e) - | DatabaseState::OwnerInfoLoadError(_, e) + DatabaseState::OwnerInfoLoadError(_, e) | DatabaseState::RulesLoadError(_, e) | DatabaseState::CatalogLoadError(_, e) | DatabaseState::WriteBufferCreationError(_, e) @@ -241,9 +219,6 @@ impl DatabaseState { match self { DatabaseState::Known(_) | DatabaseState::Shutdown(_) - | DatabaseState::DatabaseObjectStoreFound(_) - | DatabaseState::DatabaseObjectStoreLookupError(_, _) - | DatabaseState::NoActiveDatabase(_, _) | DatabaseState::OwnerInfoLoaded(_) | DatabaseState::OwnerInfoLoadError(_, _) | DatabaseState::RulesLoadError(_, _) => None, @@ -257,33 +232,10 @@ impl DatabaseState { } } - pub(crate) fn uuid(&self) -> Option { - match self { - DatabaseState::Known(_) - | DatabaseState::Shutdown(_) - | DatabaseState::DatabaseObjectStoreFound(_) - | DatabaseState::DatabaseObjectStoreLookupError(_, _) - | DatabaseState::NoActiveDatabase(_, _) - | DatabaseState::OwnerInfoLoaded(_) - | DatabaseState::OwnerInfoLoadError(_, _) - | DatabaseState::RulesLoadError(_, _) => None, - DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => { - Some(state.uuid) - } - DatabaseState::CatalogLoaded(state) - | DatabaseState::WriteBufferCreationError(state, _) - | DatabaseState::ReplayError(state, _) => Some(state.uuid), - DatabaseState::Initialized(state) => Some(state.uuid), - } - } - pub(crate) fn owner_info(&self) -> Option { match self { DatabaseState::Known(_) | DatabaseState::Shutdown(_) - | DatabaseState::DatabaseObjectStoreFound(_) - | DatabaseState::DatabaseObjectStoreLookupError(_, _) - | DatabaseState::NoActiveDatabase(_, _) | DatabaseState::OwnerInfoLoadError(_, _) | DatabaseState::RulesLoadError(_, _) => None, DatabaseState::OwnerInfoLoaded(state) => Some(state.owner_info.clone()), @@ -297,29 +249,6 @@ impl DatabaseState { } } - pub(crate) fn iox_object_store(&self) -> Option> { - match self { - DatabaseState::Known(_) - | DatabaseState::Shutdown(_) - | DatabaseState::DatabaseObjectStoreLookupError(_, _) - | DatabaseState::NoActiveDatabase(_, _) => None, - DatabaseState::DatabaseObjectStoreFound(state) - | DatabaseState::OwnerInfoLoadError(state, _) => { - Some(Arc::clone(&state.iox_object_store)) - } - DatabaseState::OwnerInfoLoaded(state) | DatabaseState::RulesLoadError(state, _) => { - Some(Arc::clone(&state.iox_object_store)) - } - DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => { - Some(Arc::clone(&state.iox_object_store)) - } - DatabaseState::CatalogLoaded(state) - | DatabaseState::WriteBufferCreationError(state, _) - | DatabaseState::ReplayError(state, _) => Some(state.db.iox_object_store()), - DatabaseState::Initialized(state) => Some(state.db.iox_object_store()), - } - } - /// Whether this is shutdown pub(crate) fn is_shutdown(&self) -> bool { matches!(self, DatabaseState::Shutdown(_)) @@ -337,37 +266,12 @@ impl DatabaseState { pub(crate) struct DatabaseStateKnown {} impl DatabaseStateKnown { - /// Find active object storage for this database - async fn advance( - &self, - shared: &DatabaseShared, - ) -> Result { - let location = shared.config.read().location.clone(); - let iox_object_store = IoxObjectStore::load_at_root_path( - Arc::clone(shared.application.object_store()), - &location, - ) - .await - .context(DatabaseObjectStoreLookupSnafu)?; - - Ok(DatabaseStateDatabaseObjectStoreFound { - iox_object_store: Arc::new(iox_object_store), - }) - } -} - -#[derive(Debug, Clone)] -pub(crate) struct DatabaseStateDatabaseObjectStoreFound { - iox_object_store: Arc, -} - -impl DatabaseStateDatabaseObjectStoreFound { /// Load owner info from object storage and verify it matches the current owner async fn advance( &self, shared: &DatabaseShared, ) -> Result { - let owner_info = fetch_owner_info(&self.iox_object_store) + let owner_info = fetch_owner_info(&shared.iox_object_store) .await .context(FetchingOwnerInfoSnafu)?; @@ -380,17 +284,13 @@ impl DatabaseStateDatabaseObjectStoreFound { .fail(); } - Ok(DatabaseStateOwnerInfoLoaded { - owner_info, - iox_object_store: Arc::clone(&self.iox_object_store), - }) + Ok(DatabaseStateOwnerInfoLoaded { owner_info }) } } #[derive(Debug, Clone)] pub(crate) struct DatabaseStateOwnerInfoLoaded { owner_info: management::v1::OwnerInfo, - iox_object_store: Arc, } impl DatabaseStateOwnerInfoLoaded { @@ -399,7 +299,7 @@ impl DatabaseStateOwnerInfoLoaded { &self, shared: &DatabaseShared, ) -> Result { - let rules = PersistedDatabaseRules::load(&self.iox_object_store) + let rules = PersistedDatabaseRules::load(&shared.iox_object_store) .await .context(LoadingRulesSnafu)?; @@ -416,7 +316,6 @@ impl DatabaseStateOwnerInfoLoaded { provided_rules: rules.provided_rules(), uuid: rules.uuid(), owner_info: self.owner_info.clone(), - iox_object_store: Arc::clone(&self.iox_object_store), }) } } @@ -426,14 +325,9 @@ pub(crate) struct DatabaseStateRulesLoaded { provided_rules: Arc, uuid: Uuid, owner_info: management::v1::OwnerInfo, - iox_object_store: Arc, } impl DatabaseStateRulesLoaded { - pub(crate) fn iox_object_store(&self) -> &Arc { - &self.iox_object_store - } - /// Load catalog from object storage async fn advance( &self, @@ -450,7 +344,7 @@ impl DatabaseStateRulesLoaded { }; let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog( db_name.as_str(), - Arc::clone(&self.iox_object_store), + Arc::clone(&shared.iox_object_store), Arc::clone(shared.application.metric_registry()), Arc::clone(shared.application.time_provider()), wipe_catalog_on_error, @@ -461,7 +355,7 @@ impl DatabaseStateRulesLoaded { let database_to_commit = DatabaseToCommit { server_id, - iox_object_store: Arc::clone(&self.iox_object_store), + iox_object_store: Arc::clone(&shared.iox_object_store), exec: Arc::clone(shared.application.executor()), rules: Arc::clone(self.provided_rules.rules()), preserved_catalog, @@ -499,10 +393,6 @@ pub(crate) struct DatabaseStateCatalogLoaded { } impl DatabaseStateCatalogLoaded { - pub(crate) fn iox_object_store(&self) -> Arc { - self.db.iox_object_store() - } - /// Perform replay async fn advance( &self, @@ -564,7 +454,6 @@ impl DatabaseStateCatalogLoaded { provided_rules: Arc::clone(&self.provided_rules), uuid: self.uuid, owner_info: self.owner_info.clone(), - iox_object_store: self.db.iox_object_store(), } } } @@ -629,20 +518,12 @@ pub(crate) async fn initialize_database(shared: &DatabaseShared, shutdown: Cance // Try to advance to the next state let next_state = match state { - DatabaseState::Known(state) - | DatabaseState::DatabaseObjectStoreLookupError(state, _) - | DatabaseState::NoActiveDatabase(state, _) => match state.advance(shared).await { - Ok(state) => DatabaseState::DatabaseObjectStoreFound(state), - Err(InitError::NoActiveDatabase) => { - DatabaseState::NoActiveDatabase(state, Arc::new(InitError::NoActiveDatabase)) + DatabaseState::Known(state) | DatabaseState::OwnerInfoLoadError(state, _) => { + match state.advance(shared).await { + Ok(state) => DatabaseState::OwnerInfoLoaded(state), + Err(e) => DatabaseState::OwnerInfoLoadError(state, Arc::new(e)), } - Err(e) => DatabaseState::DatabaseObjectStoreLookupError(state, Arc::new(e)), - }, - DatabaseState::DatabaseObjectStoreFound(state) - | DatabaseState::OwnerInfoLoadError(state, _) => match state.advance(shared).await { - Ok(state) => DatabaseState::OwnerInfoLoaded(state), - Err(e) => DatabaseState::OwnerInfoLoadError(state, Arc::new(e)), - }, + } DatabaseState::OwnerInfoLoaded(state) | DatabaseState::RulesLoadError(state, _) => { match state.advance(shared).await { Ok(state) => DatabaseState::RulesLoaded(state), diff --git a/server/src/database/state.rs b/server/src/database/state.rs index 4e764d1a4d..750c756a49 100644 --- a/server/src/database/state.rs +++ b/server/src/database/state.rs @@ -2,9 +2,11 @@ use crate::ApplicationState; use data_types::{server_id::ServerId, DatabaseName}; use internal_types::freezable::Freezable; +use iox_object_store::IoxObjectStore; use parking_lot::RwLock; use std::sync::Arc; use tokio::sync::Notify; +use uuid::Uuid; use super::init::DatabaseState; @@ -13,8 +15,8 @@ use super::init::DatabaseState; /// and how to perform startup activities. pub struct DatabaseConfig { pub name: DatabaseName<'static>, - pub location: String, pub server_id: ServerId, + pub database_uuid: Uuid, pub wipe_catalog_on_error: bool, pub skip_replay: bool, } @@ -28,6 +30,9 @@ pub(crate) struct DatabaseShared { /// Application-global state pub(crate) application: Arc, + /// Database object store + pub(crate) iox_object_store: Arc, + /// The initialization state of the `Database`, wrapped in a /// `Freezable` to ensure there is only one task with an /// outstanding intent to write at any time. diff --git a/server/src/lib.rs b/server/src/lib.rs index 20c62eef6d..df65a89366 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -286,6 +286,15 @@ pub enum InitError { #[snafu(display("error persisting initial server config to object storage: {}", source))] PersistInitialServerConfig { source: object_store::Error }, + + #[snafu(display("invalid database name in server config: {}", source))] + InvalidDatabase { source: DatabaseNameError }, + + #[snafu(display( + "invalid database uuid in server config while finding location: {}", + source + ))] + InvalidDatabaseLocation { source: uuid::Error }, } /// The stage of the server in the startup process @@ -348,6 +357,75 @@ struct ServerStateInitReady { server_id: ServerId, } +impl ServerStateInitReady { + /// Parse the UUID from an object storage path + /// + /// TODO: Encode this data directly in server config + fn parse_location(location: &str) -> Result { + // Strip trailing / if any + let location = location.strip_suffix('/').unwrap_or(location); + let uuid = location.rsplit('/').next().unwrap(); + std::str::FromStr::from_str(uuid).context(InvalidDatabaseLocationSnafu) + } + + async fn advance(&self, shared: &ServerShared) -> Result { + let fetch_result = IoxObjectStore::get_server_config_file( + shared.application.object_store(), + self.server_id, + ) + .await; + + let server_config_bytes = match fetch_result { + Ok(bytes) => bytes, + // If this is the first time starting up this server and there is no config file yet, + // this isn't a problem. Start an empty server config. + Err(object_store::Error::NotFound { .. }) => bytes::Bytes::new(), + Err(source) => return Err(InitError::GetServerConfig { source }), + }; + + let server_config = + generated_types::server_config::decode_persisted_server_config(server_config_bytes) + .map_err(|source| InitError::DeserializeServerConfig { source })?; + + let databases = server_config + .databases + .into_iter() + .map(|(name, location)| { + let database_name = DatabaseName::new(name).context(InvalidDatabaseSnafu)?; + let database_uuid = Self::parse_location(&location)?; + + let database = Database::new( + Arc::clone(&shared.application), + DatabaseConfig { + name: database_name.clone(), + database_uuid, + server_id: self.server_id, + wipe_catalog_on_error: self.wipe_catalog_on_error, + skip_replay: self.skip_replay_and_seek_instead, + }, + ); + + Ok((database_name, Arc::new(database))) + }) + .collect::>()?; + + let next_state = ServerStateInitialized { + server_id: self.server_id, + databases, + }; + + IoxObjectStore::put_server_config_file( + shared.application.object_store(), + self.server_id, + next_state.server_config(), + ) + .await + .map_err(|source| InitError::PersistInitialServerConfig { source })?; + + Ok(next_state) + } +} + #[derive(Debug)] struct ServerStateInitialized { server_id: ServerId, @@ -366,36 +444,19 @@ impl ServerStateInitialized { shared: &ServerShared, config: DatabaseConfig, ) -> Result<&Arc> { + use hashbrown::hash_map::Entry; + let db_name = config.name.clone(); - let database = match self.databases.entry(db_name.clone()) { - hashbrown::hash_map::Entry::Vacant(vacant) => vacant.insert(Arc::new(Database::new( + match self.databases.entry(db_name.clone()) { + Entry::Vacant(vacant) => Ok(vacant.insert(Arc::new(Database::new( Arc::clone(&shared.application), config, - ))), - hashbrown::hash_map::Entry::Occupied(mut existing) => { - if let Some(init_error) = existing.get().init_error() { - if matches!(&*init_error, database::init::InitError::NoActiveDatabase) { - existing.insert(Arc::new(Database::new( - Arc::clone(&shared.application), - config, - ))); - existing.into_mut() - } else { - return DatabaseAlreadyExistsSnafu { - db_name: config.name, - } - .fail(); - } - } else { - return DatabaseAlreadyExistsSnafu { - db_name: config.name, - } - .fail(); - } + )))), + Entry::Occupied(_) => DatabaseAlreadyExistsSnafu { + db_name: config.name, } - }; - - Ok(database) + .fail(), + } } /// Serialize the list of databases this server owns with their names and object storage @@ -405,7 +466,7 @@ impl ServerStateInitialized { databases: self .databases .iter() - .map(|(name, database)| (name.to_string(), database.location())) + .map(|(name, database)| (name.to_string(), database.iox_object_store().root_path())) .collect(), }; @@ -578,15 +639,14 @@ impl Server { initialized.server_id }; - let res = create_empty_db_in_object_store( + create_empty_db_in_object_store( Arc::clone(&self.shared.application), uuid, rules, server_id, ) - .await; - - let location = res.context(CannotCreateDatabaseSnafu)?; + .await + .context(CannotCreateDatabaseSnafu)?; let database = { let mut state = self.shared.state.write(); @@ -600,8 +660,8 @@ impl Server { &self.shared, DatabaseConfig { name: db_name, - location, server_id, + database_uuid: uuid, wipe_catalog_on_error: false, skip_replay: false, }, @@ -632,9 +692,7 @@ impl Server { let handle = handle_fut.await; let database = self.database(db_name)?; - let current = database - .uuid() - .expect("Previous line should return not found if the database is inactive"); + let current = database.uuid(); // If a UUID has been specified, it has to match this database's UUID // Should this check be here or in database.release? @@ -705,7 +763,7 @@ impl Server { // Check that this name is unique among currently active databases if let Ok(existing_db) = self.database(&db_name) { - if matches!(existing_db.uuid(), Some(existing_uuid) if existing_uuid == uuid) { + if existing_db.uuid() == uuid { return DatabaseAlreadyOwnedByThisServerSnafu { uuid }.fail(); } else { return DatabaseAlreadyExistsSnafu { db_name }.fail(); @@ -714,7 +772,7 @@ impl Server { // Mark the database as claimed in object storage and get its location for the server // config file - let location = claim_database_in_object_store( + claim_database_in_object_store( Arc::clone(&self.shared.application), &db_name, uuid, @@ -736,8 +794,8 @@ impl Server { &self.shared, DatabaseConfig { name: db_name.clone(), - location, server_id, + database_uuid: uuid, wipe_catalog_on_error: false, skip_replay: false, }, @@ -933,86 +991,10 @@ async fn maybe_initialize_server(shared: &ServerShared) { (init_ready, handle) }; - let maybe_databases = IoxObjectStore::get_server_config_file( - shared.application.object_store(), - init_ready.server_id, - ) - .await - .or_else(|e| match e { - // If this is the first time starting up this server and there is no config file yet, - // this isn't a problem. Start an empty server config. - object_store::Error::NotFound { .. } => Ok(bytes::Bytes::new()), - // Any other error is a problem. - _ => Err(InitError::GetServerConfig { source: e }), - }) - .and_then(|config| { - generated_types::server_config::decode_persisted_server_config(config) - .map_err(|e| InitError::DeserializeServerConfig { source: e }) - }) - .map(|server_config| { - server_config - .databases - .into_iter() - .map(|(name, location)| { - ( - DatabaseName::new(name).expect("serialized db names should be valid"), - location, - ) - }) - .collect::>() - }); - - let next_state = match maybe_databases { - Ok(databases) => { - let mut state = ServerStateInitialized { - server_id: init_ready.server_id, - databases: HashMap::with_capacity(databases.len()), - }; - - for (db_name, location) in databases { - state - .new_database( - shared, - DatabaseConfig { - name: db_name, - location, - server_id: init_ready.server_id, - wipe_catalog_on_error: init_ready.wipe_catalog_on_error, - skip_replay: init_ready.skip_replay_and_seek_instead, - }, - ) - .expect("database unique"); - } - - let bytes = state.server_config(); - - let config_written = IoxObjectStore::put_server_config_file( - shared.application.object_store(), - init_ready.server_id, - bytes, - ) - .await; - - match config_written { - Ok(_) => { - info!(server_id=%init_ready.server_id, "server initialized"); - ServerState::Initialized(state) - } - Err(e) => { - error!( - server_id=%init_ready.server_id, - %e, - "error persisting initial server config to object storage" - ); - ServerState::InitError( - init_ready, - Arc::new(InitError::PersistInitialServerConfig { source: e }), - ) - } - } - } + let next_state = match init_ready.advance(shared).await { + Ok(initialized) => ServerState::Initialized(initialized), Err(e) => { - error!(server_id=%init_ready.server_id, %e, "error initializing server"); + error!(%e, "error attempting to initialize server"); ServerState::InitError(init_ready, Arc::new(e)) } }; @@ -1258,7 +1240,7 @@ mod tests { .await .expect("failed to create database"); - let iox_object_store = bananas.iox_object_store().unwrap(); + let iox_object_store = bananas.iox_object_store(); let read_rules = PersistedDatabaseRules::load(&iox_object_store) .await .unwrap(); @@ -1290,7 +1272,7 @@ mod tests { .create_database(provided_rules2) .await .expect("failed to create 2nd db"); - let awesome_uuid = awesome.uuid().unwrap(); + let awesome_uuid = awesome.uuid(); // assert server config file exists and has 2 entries let config = server_config(application.object_store(), server_id).await; @@ -1387,7 +1369,7 @@ mod tests { let bananas = create_simple_database(&server, "bananas") .await .expect("failed to create database"); - let bananas_uuid = bananas.uuid().unwrap(); + let bananas_uuid = bananas.uuid(); assert!(bananas.is_initialized()); @@ -1409,11 +1391,11 @@ mod tests { .expect("failed to create database"); assert!(apples.is_initialized()); - let apples_uuid = apples.uuid().unwrap(); + let apples_uuid = apples.uuid(); assert_eq!(server.db_names_sorted(), vec!["apples", "bananas"]); - let bananas_object_store = bananas.iox_object_store().unwrap(); + let bananas_object_store = bananas.iox_object_store(); // Shutdown server to demonstrate that the server shutdown // causes the databases to shutdown @@ -1453,134 +1435,10 @@ mod tests { assert!(apples.init_error().is_none()); let err = bananas.wait_for_init().await.unwrap_err(); - assert_contains!(err.to_string(), "No rules found to load"); + assert_contains!(err.to_string(), "rules.pb not found"); assert!(Arc::ptr_eq(&err, &bananas.init_error().unwrap())); } - #[tokio::test] - async fn old_database_object_store_path() { - let application = make_application(); - let server = make_server(Arc::clone(&application)); - let server_id = ServerId::try_from(1).unwrap(); - let object_store = application.object_store(); - - // Databases used to be stored under the server ID. Construct a database in the old - // location and list that in the serialized server config. - let old_loc_db_uuid = Uuid::new_v4(); - let old_loc_db_name = DatabaseName::new("old").unwrap(); - - // Construct path in the old database location containing server ID - let mut old_root = object_store.new_path(); - old_root.push_dir(&server_id.to_string()); - old_root.push_dir(&old_loc_db_uuid.to_string()); - - // Write out a database owner file in the old location - let mut old_owner_path = old_root.clone(); - old_owner_path.set_file_name("owner.pb"); - let owner_info = management::v1::OwnerInfo { - id: server_id.get_u32(), - location: IoxObjectStore::server_config_path(object_store, server_id).to_string(), - transactions: vec![], - }; - let mut encoded_owner_info = bytes::BytesMut::new(); - generated_types::server_config::encode_database_owner_info( - &owner_info, - &mut encoded_owner_info, - ) - .expect("owner info serialization should be valid"); - let encoded_owner_info = encoded_owner_info.freeze(); - object_store - .put(&old_owner_path, encoded_owner_info) - .await - .unwrap(); - - // Write out a database rules file in the old location - let mut old_db_rules_path = old_root.clone(); - old_db_rules_path.set_file_name("rules.pb"); - let rules = management::v1::DatabaseRules { - name: old_loc_db_name.to_string(), - ..Default::default() - }; - let persisted_database_rules = management::v1::PersistedDatabaseRules { - uuid: old_loc_db_uuid.as_bytes().to_vec(), - rules: Some(rules), - }; - let mut encoded_rules = bytes::BytesMut::new(); - generated_types::database_rules::encode_persisted_database_rules( - &persisted_database_rules, - &mut encoded_rules, - ) - .unwrap(); - let encoded_rules = encoded_rules.freeze(); - object_store - .put(&old_db_rules_path, encoded_rules) - .await - .unwrap(); - - // Write out the server config with the database name and pointing to the old location - let old_location = old_root.to_raw().to_string(); - let server_config = management::v1::ServerConfig { - databases: [(old_loc_db_name.to_string(), old_location.clone())] - .into_iter() - .collect(), - }; - let mut encoded_server_config = bytes::BytesMut::new(); - generated_types::server_config::encode_persisted_server_config( - &server_config, - &mut encoded_server_config, - ) - .unwrap(); - let encoded_server_config = encoded_server_config.freeze(); - IoxObjectStore::put_server_config_file(object_store, server_id, encoded_server_config) - .await - .unwrap(); - - // The server should initialize successfully - server.set_id(server_id).unwrap(); - server.wait_for_init().await.unwrap(); - - // The database should initialize successfully - let old_loc_db = server.database(&old_loc_db_name).unwrap(); - old_loc_db.wait_for_init().await.unwrap(); - - // Database rules can be updated - let mut new_rules = DatabaseRules::new(old_loc_db_name.clone()); - new_rules.worker_cleanup_avg_sleep = Duration::from_secs(22); - server - .update_db_rules(make_provided_rules(new_rules.clone())) - .await - .unwrap(); - let updated = old_loc_db.provided_rules().unwrap(); - assert_eq!( - updated.rules().worker_cleanup_avg_sleep, - new_rules.worker_cleanup_avg_sleep - ); - - // Location remains the same - assert_eq!(old_loc_db.location(), old_location); - - // New databases are created in the current database location, `dbs` - let new_loc_db_name = DatabaseName::new("new").unwrap(); - let new_loc_rules = DatabaseRules::new(new_loc_db_name.clone()); - let new_loc_db = server - .create_database(make_provided_rules(new_loc_rules)) - .await - .unwrap(); - let new_loc_db_uuid = new_loc_db.uuid().unwrap(); - assert_eq!(new_loc_db.location(), format!("dbs/{}/", new_loc_db_uuid)); - - // Restarting the server with a database in the "old" location and a database in the "new" - // location works - std::mem::drop(server); - let server = make_server(Arc::clone(&application)); - server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.wait_for_init().await.unwrap(); - let old_loc_db = server.database(&old_loc_db_name).unwrap(); - old_loc_db.wait_for_init().await.unwrap(); - let new_loc_db = server.database(&new_loc_db_name).unwrap(); - new_loc_db.wait_for_init().await.unwrap(); - } - #[tokio::test] async fn old_server_config_object_store_path() { let application = make_application(); @@ -1847,7 +1705,7 @@ mod tests { .expect("failed to create database"); // make the db rules for bar invalid - let iox_object_store = bar.iox_object_store().unwrap(); + let iox_object_store = bar.iox_object_store(); iox_object_store .put_database_rules_file(Bytes::from("x")) @@ -1861,7 +1719,7 @@ mod tests { .expect("failed to create database"); // make the owner info for baz say it's owned by a different server - let baz_iox_object_store = baz.iox_object_store().unwrap(); + let baz_iox_object_store = baz.iox_object_store(); let owner_info = management::v1::OwnerInfo { id: 2, location: "nodes/2/config.pb".to_string(), @@ -1990,7 +1848,7 @@ mod tests { // create database let foo = create_simple_database(&server, &foo_db_name).await.unwrap(); - let first_foo_uuid = foo.uuid().unwrap(); + let first_foo_uuid = foo.uuid(); // release database by name let released_uuid = server.release_database(&foo_db_name, None).await.unwrap(); @@ -2006,7 +1864,7 @@ mod tests { // create another database let foo = create_simple_database(&server, &foo_db_name).await.unwrap(); - let second_foo_uuid = foo.uuid().unwrap(); + let second_foo_uuid = foo.uuid(); // release database specifying UUID; error if UUID doesn't match let incorrect_uuid = Uuid::new_v4(); @@ -2107,7 +1965,7 @@ mod tests { let database = create_simple_database(&server1, &foo_db_name) .await .unwrap(); - let uuid = database.uuid().unwrap(); + let uuid = database.uuid(); // start server 2 let server2 = make_server(Arc::clone(&application)); @@ -2200,13 +2058,12 @@ mod tests { // tamper store to break one database rules_broken .iox_object_store() - .unwrap() .put_database_rules_file(Bytes::from("x")) .await .unwrap(); let config = PreservedCatalogConfig::new( - catalog_broken.iox_object_store().unwrap(), + catalog_broken.iox_object_store(), db_name_catalog_broken.to_string(), Arc::clone(application.time_provider()), ); @@ -2218,7 +2075,6 @@ mod tests { rules_broken .iox_object_store() - .unwrap() .get_database_rules_file() .await .unwrap(); @@ -2268,11 +2124,9 @@ mod tests { "error wiping preserved catalog: database (db_existing) in invalid state (Initialized) \ for wiping preserved catalog. Expected CatalogLoadError, WriteBufferCreationError, ReplayError" ); - assert!( - PreservedCatalog::exists(&existing.iox_object_store().unwrap()) - .await - .unwrap() - ); + assert!(PreservedCatalog::exists(&existing.iox_object_store()) + .await + .unwrap()); // 2. cannot wipe non-existent DB assert!(matches!( @@ -2336,11 +2190,9 @@ mod tests { database.wait_for_init().await.unwrap(); - assert!( - PreservedCatalog::exists(&catalog_broken.iox_object_store().unwrap()) - .await - .unwrap() - ); + assert!(PreservedCatalog::exists(&catalog_broken.iox_object_store()) + .await + .unwrap()); assert!(database.init_error().is_none()); let db = server.db(&db_name_catalog_broken).unwrap(); @@ -2363,11 +2215,9 @@ mod tests { "error wiping preserved catalog: database (db_created) in invalid state (Initialized) \ for wiping preserved catalog. Expected CatalogLoadError, WriteBufferCreationError, ReplayError" ); - assert!( - PreservedCatalog::exists(&created.iox_object_store().unwrap()) - .await - .unwrap() - ); + assert!(PreservedCatalog::exists(&created.iox_object_store()) + .await + .unwrap()); } fn default_rules(db_name: DatabaseName<'static>) -> ProvidedDatabaseRules { diff --git a/server/tests/delete.rs b/server/tests/delete.rs index 10a6d28ab3..685c533889 100644 --- a/server/tests/delete.rs +++ b/server/tests/delete.rs @@ -196,7 +196,7 @@ async fn delete_predicate_preservation() { database.restart().await.unwrap(); // ==================== do: remove checkpoint files ==================== - let iox_object_store = database.iox_object_store().unwrap(); + let iox_object_store = database.iox_object_store(); let files = iox_object_store .catalog_transaction_files() diff --git a/server/tests/write_buffer_lifecycle.rs b/server/tests/write_buffer_lifecycle.rs index ca93c6a8ef..f4d2164cf3 100644 --- a/server/tests/write_buffer_lifecycle.rs +++ b/server/tests/write_buffer_lifecycle.rs @@ -131,7 +131,7 @@ async fn write_buffer_lifecycle() { let database = databases.into_iter().next().unwrap(); database.wait_for_init().await.unwrap(); - let database_uuid = database.uuid().unwrap(); + let database_uuid = database.uuid(); let db = database.initialized_db().unwrap(); let batches = run_query(Arc::clone(&db), "select sum(bar) as n from table_1").await;