diff --git a/server/src/database.rs b/server/src/database.rs index 9e42e9d603..bc34dd87dc 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -190,22 +190,13 @@ impl Database { }, ); - let location = iox_object_store.root_path(); + let database_location = iox_object_store.root_path(); + let server_location = + IoxObjectStore::server_config_path(application.object_store(), server_id).to_string(); - let owner_info = management::v1::OwnerInfo { - id: server_id.get_u32(), - location: IoxObjectStore::server_config_path(application.object_store(), server_id) - .to_string(), - }; - let mut encoded = bytes::BytesMut::new(); - generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded) - .expect("owner info serialization should be valid"); - let encoded = encoded.freeze(); - - iox_object_store - .put_owner_file(encoded) + create_owner_info(server_id, server_location, &iox_object_store) .await - .context(SavingOwner)?; + .context(CreatingOwnerInfo)?; let rules_to_persist = PersistedDatabaseRules::new(uuid, provided_rules); rules_to_persist @@ -223,7 +214,7 @@ impl Database { .await .context(CannotCreatePreservedCatalog)?; - Ok(location) + Ok(database_location) } /// Mark this database as deleted. @@ -294,24 +285,15 @@ impl Database { other => other.context(IoxObjectStoreError)?, }; - let location = iox_object_store.root_path(); + let database_location = iox_object_store.root_path(); + let server_location = + IoxObjectStore::server_config_path(application.object_store(), server_id).to_string(); - let owner_info = management::v1::OwnerInfo { - id: server_id.get_u32(), - location: IoxObjectStore::server_config_path(application.object_store(), server_id) - .to_string(), - }; - let mut encoded = bytes::BytesMut::new(); - generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded) - .expect("owner info serialization should be valid"); - let encoded = encoded.freeze(); - - iox_object_store - .put_owner_file(encoded) + update_owner_info(server_id, server_location, &iox_object_store) .await - .context(SavingOwner)?; + .context(UpdatingOwnerInfo)?; - Ok(location) + Ok(database_location) } /// Triggers shutdown of this `Database` @@ -925,11 +907,14 @@ pub enum InitError { #[snafu(display("error during replay: {}", source))] Replay { source: crate::db::Error }, - #[snafu(display("error saving database owner: {}", source))] - SavingOwner { source: object_store::Error }, + #[snafu(display("error creating database owner info: {}", source))] + CreatingOwnerInfo { source: OwnerInfoCreateError }, #[snafu(display("error getting database owner info: {}", source))] - OwnerInfo { source: OwnerInfoError }, + FetchingOwnerInfo { source: OwnerInfoFetchError }, + + #[snafu(display("error updating database owner info: {}", source))] + UpdatingOwnerInfo { source: OwnerInfoUpdateError }, #[snafu(display( "Server ID in the database's owner info file ({}) does not match this server's ID ({})", @@ -1151,7 +1136,7 @@ impl DatabaseStateDatabaseObjectStoreFound { ) -> Result { let owner_info = fetch_owner_info(&self.iox_object_store) .await - .context(OwnerInfo)?; + .context(FetchingOwnerInfo)?; if owner_info.id != shared.config.server_id.get_u32() { return DatabaseOwnerMismatch { @@ -1169,7 +1154,7 @@ impl DatabaseStateDatabaseObjectStoreFound { } #[derive(Debug, Snafu)] -pub enum OwnerInfoError { +pub enum OwnerInfoFetchError { #[snafu(display("error loading owner info: {}", source))] Loading { source: object_store::Error }, @@ -1181,12 +1166,89 @@ pub enum OwnerInfoError { async fn fetch_owner_info( iox_object_store: &IoxObjectStore, -) -> Result { +) -> Result { let raw_owner_info = iox_object_store.get_owner_file().await.context(Loading)?; generated_types::server_config::decode_database_owner_info(raw_owner_info).context(Decoding) } +#[derive(Debug, Snafu)] +pub enum OwnerInfoCreateError { + #[snafu(display("could not create new owner info file; it already exists"))] + OwnerFileAlreadyExists, + + #[snafu(display("error creating database owner info file: {}", source))] + CreatingOwnerFile { source: object_store::Error }, +} + +/// Create a new owner info file for this database. Existing content at this location in object +/// storage is an error. +async fn create_owner_info( + server_id: ServerId, + server_location: String, + iox_object_store: &IoxObjectStore, +) -> Result<(), OwnerInfoCreateError> { + ensure!( + matches!( + iox_object_store.get_owner_file().await, + Err(object_store::Error::NotFound { .. }) + ), + OwnerFileAlreadyExists, + ); + + let owner_info = management::v1::OwnerInfo { + id: server_id.get_u32(), + location: server_location, + }; + let mut encoded = bytes::BytesMut::new(); + generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded) + .expect("owner info serialization should be valid"); + let encoded = encoded.freeze(); + + iox_object_store + .put_owner_file(encoded) + .await + .context(CreatingOwnerFile)?; + + Ok(()) +} + +#[derive(Debug, Snafu)] +pub enum OwnerInfoUpdateError { + #[snafu(display("could not fetch existing owner info: {}", source))] + CouldNotFetch { source: OwnerInfoFetchError }, + + #[snafu(display("error updating database owner info file: {}", source))] + UpdatingOwnerFile { source: object_store::Error }, +} + +/// Fetch existing owner info, set the `id` and `location`, insert a new entry into the transaction +/// history, and overwrite the contents of the owner file. Errors if the owner info file does NOT +/// currently exist. +async fn update_owner_info( + new_server_id: ServerId, + new_server_location: String, + iox_object_store: &IoxObjectStore, +) -> Result<(), OwnerInfoUpdateError> { + let mut owner_info = fetch_owner_info(iox_object_store) + .await + .context(CouldNotFetch)?; + + owner_info.id = new_server_id.get_u32(); + owner_info.location = new_server_location; + + let mut encoded = bytes::BytesMut::new(); + generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded) + .expect("owner info serialization should be valid"); + let encoded = encoded.freeze(); + + iox_object_store + .put_owner_file(encoded) + .await + .context(UpdatingOwnerFile)?; + Ok(()) +} + #[derive(Debug, Clone)] struct DatabaseStateOwnerInfoLoaded { owner_info: management::v1::OwnerInfo,