feat: Implement disown in Server and Database
parent
489fad0040
commit
fb7bde527f
|
@ -75,6 +75,7 @@ pub fn default_server_error_handler(error: server::Error) -> tonic::Status {
|
|||
Error::DatabaseAlreadyActive { .. } | Error::DatabaseAlreadyExists { .. } => {
|
||||
tonic::Status::already_exists(error.to_string())
|
||||
}
|
||||
Error::UuidMismatch { .. } => tonic::Status::invalid_argument(error.to_string()),
|
||||
Error::CouldNotGetDatabaseNameFromRules {
|
||||
source: DatabaseNameFromRulesError::DatabaseRulesNotFound { uuid, .. },
|
||||
} => tonic::Status::not_found(format!("Could not find a database with UUID `{}`", uuid)),
|
||||
|
@ -138,9 +139,13 @@ pub fn default_database_error_handler(error: server::database::Error) -> tonic::
|
|||
error!(%source, "Unexpected error deleting database");
|
||||
InternalError {}.into()
|
||||
}
|
||||
Error::CannotDeleteInactiveDatabase { .. } => {
|
||||
Error::CannotDeleteInactiveDatabase { .. } | Error::CannotDisownUnowned { .. } => {
|
||||
tonic::Status::failed_precondition(error.to_string())
|
||||
}
|
||||
Error::CannotDisown { source, .. } => {
|
||||
error!(%source, "Unexpected error disowning database");
|
||||
InternalError {}.into()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -91,6 +91,18 @@ pub enum Error {
|
|||
db_name
|
||||
))]
|
||||
CannotDeleteInactiveDatabase { db_name: String },
|
||||
|
||||
#[snafu(display(
|
||||
"cannot disown database named {} that has already been disowned",
|
||||
db_name
|
||||
))]
|
||||
CannotDisownUnowned { db_name: String },
|
||||
|
||||
#[snafu(display("cannot disown database {}: {}", db_name, source))]
|
||||
CannotDisown {
|
||||
db_name: String,
|
||||
source: OwnerInfoUpdateError,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
|
@ -260,6 +272,50 @@ impl Database {
|
|||
Ok(uuid)
|
||||
}
|
||||
|
||||
/// Disown this database from this server.
|
||||
pub async fn disown(&self, context: Option<String>) -> Result<Uuid, Error> {
|
||||
let db_name = &self.shared.config.name;
|
||||
|
||||
let handle = self.shared.state.read().freeze();
|
||||
let handle = handle.await;
|
||||
|
||||
let uuid = {
|
||||
let state = self.shared.state.read();
|
||||
// Can't disown an already disowned database
|
||||
ensure!(state.is_active(), CannotDisownUnowned { db_name });
|
||||
|
||||
state.uuid().expect("Active databases have UUIDs")
|
||||
};
|
||||
|
||||
info!(%db_name, %uuid, "disowning database");
|
||||
|
||||
// If there is an object store for this database, update its owner file to indicate the
|
||||
// database is now unowned and record the history of the state change.
|
||||
// If there isn't an object store, something is wrong and we shouldn't switch the
|
||||
// state without being able to write to object storage.
|
||||
let iox_object_store = self.iox_object_store().with_context(|| {
|
||||
let state = self.shared.state.read();
|
||||
TransitionInProgress {
|
||||
db_name: db_name.clone(),
|
||||
state: state.state_code(),
|
||||
}
|
||||
})?;
|
||||
|
||||
update_owner_info(None, None, &iox_object_store)
|
||||
.await
|
||||
.context(CannotDisown { db_name })?;
|
||||
|
||||
let mut state = self.shared.state.write();
|
||||
let mut state = state.unfreeze(handle);
|
||||
*state = DatabaseState::NoActiveDatabase(
|
||||
DatabaseStateKnown {},
|
||||
Arc::new(InitError::NoActiveDatabase),
|
||||
);
|
||||
self.shared.state_notify.notify_waiters();
|
||||
|
||||
Ok(uuid)
|
||||
}
|
||||
|
||||
/// Create a restored database without any state. Returns its location in object storage
|
||||
/// for saving in the server config file.
|
||||
pub async fn restore(
|
||||
|
@ -289,7 +345,7 @@ impl Database {
|
|||
let server_location =
|
||||
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
|
||||
|
||||
update_owner_info(server_id, server_location, &iox_object_store)
|
||||
update_owner_info(Some(server_id), Some(server_location), &iox_object_store)
|
||||
.await
|
||||
.context(UpdatingOwnerInfo)?;
|
||||
|
||||
|
@ -1226,16 +1282,18 @@ pub enum OwnerInfoUpdateError {
|
|||
/// history, and overwrite the contents of the owner file. Errors if the owner info file does NOT
|
||||
/// currently exist.
|
||||
async fn update_owner_info(
|
||||
new_server_id: ServerId,
|
||||
new_server_location: String,
|
||||
new_server_id: Option<ServerId>,
|
||||
new_server_location: Option<String>,
|
||||
iox_object_store: &IoxObjectStore,
|
||||
) -> Result<(), OwnerInfoUpdateError> {
|
||||
let mut owner_info = fetch_owner_info(iox_object_store)
|
||||
.await
|
||||
.context(CouldNotFetch)?;
|
||||
|
||||
owner_info.id = new_server_id.get_u32();
|
||||
owner_info.location = new_server_location;
|
||||
// 0 is not a valid server ID, so it indicates "unowned".
|
||||
owner_info.id = new_server_id.map(|s| s.get_u32()).unwrap_or_default();
|
||||
// Owner location is empty when the database is unowned.
|
||||
owner_info.location = new_server_location.unwrap_or_default();
|
||||
|
||||
let mut encoded = bytes::BytesMut::new();
|
||||
generated_types::server_config::encode_database_owner_info(&owner_info, &mut encoded)
|
||||
|
@ -1606,6 +1664,45 @@ mod tests {
|
|||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn database_disown() {
|
||||
let (_application, database) = initialized_database().await;
|
||||
let iox_object_store = database.iox_object_store().unwrap();
|
||||
|
||||
database.disown(None).await.unwrap();
|
||||
|
||||
assert_eq!(database.state_code(), DatabaseStateCode::NoActiveDatabase);
|
||||
assert!(matches!(
|
||||
database.init_error().unwrap().as_ref(),
|
||||
InitError::NoActiveDatabase
|
||||
));
|
||||
|
||||
let owner_info = fetch_owner_info(&iox_object_store).await.unwrap();
|
||||
assert_eq!(owner_info.id, 0);
|
||||
assert_eq!(owner_info.location, "");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn database_disown_with_context() {
|
||||
let (_application, database) = initialized_database().await;
|
||||
let iox_object_store = database.iox_object_store().unwrap();
|
||||
|
||||
database
|
||||
.disown(Some("I don't like this database anymore".into()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(database.state_code(), DatabaseStateCode::NoActiveDatabase);
|
||||
assert!(matches!(
|
||||
database.init_error().unwrap().as_ref(),
|
||||
InitError::NoActiveDatabase
|
||||
));
|
||||
|
||||
let owner_info = fetch_owner_info(&iox_object_store).await.unwrap();
|
||||
assert_eq!(owner_info.id, 0);
|
||||
assert_eq!(owner_info.location, "");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn database_restart() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
|
|
@ -158,6 +158,9 @@ pub enum Error {
|
|||
#[snafu(display("{}", source))]
|
||||
CannotMarkDatabaseDeleted { source: crate::database::Error },
|
||||
|
||||
#[snafu(display("{}", source))]
|
||||
CannotDisownDatabase { source: crate::database::Error },
|
||||
|
||||
#[snafu(display("{}", source))]
|
||||
CannotRestoreDatabase { source: crate::database::InitError },
|
||||
|
||||
|
@ -167,6 +170,18 @@ pub enum Error {
|
|||
#[snafu(display("The database with UUID `{}` named `{}` is already active", uuid, name))]
|
||||
DatabaseAlreadyActive { name: String, uuid: Uuid },
|
||||
|
||||
#[snafu(display(
|
||||
"Could not disown {}: the UUID specified ({}) does not match the current UUID ({})",
|
||||
db_name,
|
||||
specified,
|
||||
current
|
||||
))]
|
||||
UuidMismatch {
|
||||
db_name: String,
|
||||
specified: Uuid,
|
||||
current: Uuid,
|
||||
},
|
||||
|
||||
#[snafu(display("Server error: {}", source))]
|
||||
ServerError { source: std::io::Error },
|
||||
|
||||
|
@ -753,7 +768,48 @@ where
|
|||
uuid: Option<Uuid>,
|
||||
context: Option<String>,
|
||||
) -> Result<Uuid> {
|
||||
unimplemented!()
|
||||
// Wait for exclusive access to mutate server state
|
||||
let handle_fut = self.shared.state.read().freeze();
|
||||
let handle = handle_fut.await;
|
||||
|
||||
let database = self.database(db_name)?;
|
||||
let current = database
|
||||
.uuid()
|
||||
.expect("Previous line should return not found if the database is inactive");
|
||||
|
||||
// If a UUID has been specified, it has to match this database's UUID
|
||||
// Should this check be here or in database.disown?
|
||||
if matches!(uuid, Some(specified) if specified != current) {
|
||||
return UuidMismatch {
|
||||
db_name: db_name.to_string(),
|
||||
specified: uuid.unwrap(),
|
||||
current,
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let returned_uuid = database
|
||||
.disown(context)
|
||||
.await
|
||||
.context(CannotDisownDatabase)?;
|
||||
|
||||
{
|
||||
let mut state = self.shared.state.write();
|
||||
|
||||
// Exchange FreezeHandle for mutable access via WriteGuard
|
||||
let mut state = state.unfreeze(handle);
|
||||
|
||||
match &mut *state {
|
||||
ServerState::Initialized(initialized) => {
|
||||
initialized.databases.remove(db_name);
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
self.persist_server_config().await?;
|
||||
|
||||
Ok(returned_uuid)
|
||||
}
|
||||
|
||||
/// Restore a database that has been marked as deleted. Return an error if no database with
|
||||
|
@ -2377,6 +2433,75 @@ mod tests {
|
|||
server.update_db_rules(provided_rules).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn disown_database_removes_from_memory_and_persisted_config() {
|
||||
let application = make_application();
|
||||
let server_id = ServerId::try_from(1).unwrap();
|
||||
|
||||
let foo_db_name = DatabaseName::new("foo").unwrap();
|
||||
|
||||
// start server
|
||||
let server = make_server(Arc::clone(&application));
|
||||
server.set_id(server_id).unwrap();
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
// create database
|
||||
let foo = create_simple_database(&server, &foo_db_name).await.unwrap();
|
||||
let first_foo_uuid = foo.uuid().unwrap();
|
||||
|
||||
// disown database by name
|
||||
let disowned_uuid = server
|
||||
.disown_database(&foo_db_name, None, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(first_foo_uuid, disowned_uuid);
|
||||
|
||||
assert_error!(
|
||||
server.database(&foo_db_name),
|
||||
Error::DatabaseNotFound { .. },
|
||||
);
|
||||
|
||||
let config = server_config(application.object_store(), server_id).await;
|
||||
assert_config_contents(&config, &[]);
|
||||
|
||||
// create another database
|
||||
let foo = create_simple_database(&server, &foo_db_name).await.unwrap();
|
||||
let second_foo_uuid = foo.uuid().unwrap();
|
||||
|
||||
// disown database specifying UUID; error if UUID doesn't match
|
||||
let incorrect_uuid = Uuid::new_v4();
|
||||
assert_error!(
|
||||
server
|
||||
.disown_database(&foo_db_name, Some(incorrect_uuid), None)
|
||||
.await,
|
||||
Error::UuidMismatch { .. }
|
||||
);
|
||||
|
||||
// disown database specifying UUID works if UUID *does* match
|
||||
server
|
||||
.disown_database(&foo_db_name, Some(second_foo_uuid), None)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cant_disown_nonexistent_database() {
|
||||
let application = make_application();
|
||||
let server_id = ServerId::try_from(1).unwrap();
|
||||
|
||||
let foo_db_name = DatabaseName::new("foo").unwrap();
|
||||
|
||||
// start server
|
||||
let server = make_server(Arc::clone(&application));
|
||||
server.set_id(server_id).unwrap();
|
||||
server.wait_for_init().await.unwrap();
|
||||
|
||||
assert_error!(
|
||||
server.disown_database(&foo_db_name, None, None).await,
|
||||
Error::DatabaseNotFound { .. },
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cant_restore_nonexistent_database() {
|
||||
let application = make_application();
|
||||
|
|
Loading…
Reference in New Issue