refactor: Extract functions for creating and updating owner info to match the fetching function
parent
d42c890333
commit
e160954b57
|
@ -190,22 +190,13 @@ impl Database {
|
|||
},
|
||||
);
|
||||
|
||||
let location = iox_object_store.root_path();
|
||||
let database_location = iox_object_store.root_path();
|
||||
let server_location =
|
||||
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
|
||||
|
||||
let owner_info = management::v1::OwnerInfo {
|
||||
id: server_id.get_u32(),
|
||||
location: IoxObjectStore::server_config_path(application.object_store(), server_id)
|
||||
.to_string(),
|
||||
};
|
||||
let mut encoded = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
|
||||
.expect("owner info serialization should be valid");
|
||||
let encoded = encoded.freeze();
|
||||
|
||||
iox_object_store
|
||||
.put_owner_file(encoded)
|
||||
create_owner_info(server_id, server_location, &iox_object_store)
|
||||
.await
|
||||
.context(SavingOwner)?;
|
||||
.context(CreatingOwnerInfo)?;
|
||||
|
||||
let rules_to_persist = PersistedDatabaseRules::new(uuid, provided_rules);
|
||||
rules_to_persist
|
||||
|
@ -223,7 +214,7 @@ impl Database {
|
|||
.await
|
||||
.context(CannotCreatePreservedCatalog)?;
|
||||
|
||||
Ok(location)
|
||||
Ok(database_location)
|
||||
}
|
||||
|
||||
/// Mark this database as deleted.
|
||||
|
@ -294,24 +285,15 @@ impl Database {
|
|||
other => other.context(IoxObjectStoreError)?,
|
||||
};
|
||||
|
||||
let location = iox_object_store.root_path();
|
||||
let database_location = iox_object_store.root_path();
|
||||
let server_location =
|
||||
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
|
||||
|
||||
let owner_info = management::v1::OwnerInfo {
|
||||
id: server_id.get_u32(),
|
||||
location: IoxObjectStore::server_config_path(application.object_store(), server_id)
|
||||
.to_string(),
|
||||
};
|
||||
let mut encoded = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
|
||||
.expect("owner info serialization should be valid");
|
||||
let encoded = encoded.freeze();
|
||||
|
||||
iox_object_store
|
||||
.put_owner_file(encoded)
|
||||
update_owner_info(server_id, server_location, &iox_object_store)
|
||||
.await
|
||||
.context(SavingOwner)?;
|
||||
.context(UpdatingOwnerInfo)?;
|
||||
|
||||
Ok(location)
|
||||
Ok(database_location)
|
||||
}
|
||||
|
||||
/// Triggers shutdown of this `Database`
|
||||
|
@ -925,11 +907,14 @@ pub enum InitError {
|
|||
#[snafu(display("error during replay: {}", source))]
|
||||
Replay { source: crate::db::Error },
|
||||
|
||||
#[snafu(display("error saving database owner: {}", source))]
|
||||
SavingOwner { source: object_store::Error },
|
||||
#[snafu(display("error creating database owner info: {}", source))]
|
||||
CreatingOwnerInfo { source: OwnerInfoCreateError },
|
||||
|
||||
#[snafu(display("error getting database owner info: {}", source))]
|
||||
OwnerInfo { source: OwnerInfoError },
|
||||
FetchingOwnerInfo { source: OwnerInfoFetchError },
|
||||
|
||||
#[snafu(display("error updating database owner info: {}", source))]
|
||||
UpdatingOwnerInfo { source: OwnerInfoUpdateError },
|
||||
|
||||
#[snafu(display(
|
||||
"Server ID in the database's owner info file ({}) does not match this server's ID ({})",
|
||||
|
@ -1151,7 +1136,7 @@ impl DatabaseStateDatabaseObjectStoreFound {
|
|||
) -> Result<DatabaseStateOwnerInfoLoaded, InitError> {
|
||||
let owner_info = fetch_owner_info(&self.iox_object_store)
|
||||
.await
|
||||
.context(OwnerInfo)?;
|
||||
.context(FetchingOwnerInfo)?;
|
||||
|
||||
if owner_info.id != shared.config.server_id.get_u32() {
|
||||
return DatabaseOwnerMismatch {
|
||||
|
@ -1169,7 +1154,7 @@ impl DatabaseStateDatabaseObjectStoreFound {
|
|||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum OwnerInfoError {
|
||||
pub enum OwnerInfoFetchError {
|
||||
#[snafu(display("error loading owner info: {}", source))]
|
||||
Loading { source: object_store::Error },
|
||||
|
||||
|
@ -1181,12 +1166,89 @@ pub enum OwnerInfoError {
|
|||
|
||||
async fn fetch_owner_info(
|
||||
iox_object_store: &IoxObjectStore,
|
||||
) -> Result<management::v1::OwnerInfo, OwnerInfoError> {
|
||||
) -> Result<management::v1::OwnerInfo, OwnerInfoFetchError> {
|
||||
let raw_owner_info = iox_object_store.get_owner_file().await.context(Loading)?;
|
||||
|
||||
generated_types::server_config::decode_database_owner_info(raw_owner_info).context(Decoding)
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum OwnerInfoCreateError {
|
||||
#[snafu(display("could not create new owner info file; it already exists"))]
|
||||
OwnerFileAlreadyExists,
|
||||
|
||||
#[snafu(display("error creating database owner info file: {}", source))]
|
||||
CreatingOwnerFile { source: object_store::Error },
|
||||
}
|
||||
|
||||
/// Create a new owner info file for this database. Existing content at this location in object
|
||||
/// storage is an error.
|
||||
async fn create_owner_info(
|
||||
server_id: ServerId,
|
||||
server_location: String,
|
||||
iox_object_store: &IoxObjectStore,
|
||||
) -> Result<(), OwnerInfoCreateError> {
|
||||
ensure!(
|
||||
matches!(
|
||||
iox_object_store.get_owner_file().await,
|
||||
Err(object_store::Error::NotFound { .. })
|
||||
),
|
||||
OwnerFileAlreadyExists,
|
||||
);
|
||||
|
||||
let owner_info = management::v1::OwnerInfo {
|
||||
id: server_id.get_u32(),
|
||||
location: server_location,
|
||||
};
|
||||
let mut encoded = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
|
||||
.expect("owner info serialization should be valid");
|
||||
let encoded = encoded.freeze();
|
||||
|
||||
iox_object_store
|
||||
.put_owner_file(encoded)
|
||||
.await
|
||||
.context(CreatingOwnerFile)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum OwnerInfoUpdateError {
|
||||
#[snafu(display("could not fetch existing owner info: {}", source))]
|
||||
CouldNotFetch { source: OwnerInfoFetchError },
|
||||
|
||||
#[snafu(display("error updating database owner info file: {}", source))]
|
||||
UpdatingOwnerFile { source: object_store::Error },
|
||||
}
|
||||
|
||||
/// Fetch existing owner info, set the `id` and `location`, insert a new entry into the transaction
|
||||
/// history, and overwrite the contents of the owner file. Errors if the owner info file does NOT
|
||||
/// currently exist.
|
||||
async fn update_owner_info(
|
||||
new_server_id: ServerId,
|
||||
new_server_location: String,
|
||||
iox_object_store: &IoxObjectStore,
|
||||
) -> Result<(), OwnerInfoUpdateError> {
|
||||
let mut owner_info = fetch_owner_info(iox_object_store)
|
||||
.await
|
||||
.context(CouldNotFetch)?;
|
||||
|
||||
owner_info.id = new_server_id.get_u32();
|
||||
owner_info.location = new_server_location;
|
||||
|
||||
let mut encoded = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
|
||||
.expect("owner info serialization should be valid");
|
||||
let encoded = encoded.freeze();
|
||||
|
||||
iox_object_store
|
||||
.put_owner_file(encoded)
|
||||
.await
|
||||
.context(UpdatingOwnerFile)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct DatabaseStateOwnerInfoLoaded {
|
||||
owner_info: management::v1::OwnerInfo,
|
||||
|
|
Loading…
Reference in New Issue