From 7b6d8f9327563801b483407d8611732082ebbef7 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 3 Sep 2021 10:48:53 -0400 Subject: [PATCH] feat: Add an API for restoring a database that was marked deleted --- .../iox/management/v1/service.proto | 14 +- influxdb_iox_client/src/client/management.rs | 42 +++++ iox_object_store/src/lib.rs | 163 ++++++++++++++++-- server/src/database.rs | 33 ++++ server/src/lib.rs | 55 ++++++ src/commands/database.rs | 23 ++- src/influxdb_ioxd/rpc/error.rs | 3 + src/influxdb_ioxd/rpc/management.rs | 16 ++ tests/end_to_end_cases/management_cli.rs | 45 +++++ 9 files changed, 379 insertions(+), 15 deletions(-) diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index c7ce8b9a88..734679d118 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -37,6 +37,8 @@ service ManagementService { rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse); + rpc RestoreDatabase(RestoreDatabaseRequest) returns (RestoreDatabaseResponse); + // List deleted databases and their metadata. rpc ListDeletedDatabases(ListDeletedDatabasesRequest) returns (ListDeletedDatabasesResponse); @@ -174,6 +176,16 @@ message DeleteDatabaseRequest { message DeleteDatabaseResponse {} +message RestoreDatabaseRequest { + // The generation ID of the deleted database. + uint64 generation_id = 1; + + // the name of the database + string db_name = 2; +} + +message RestoreDatabaseResponse {} + message ListDeletedDatabasesRequest {} message ListDeletedDatabasesResponse { @@ -461,7 +473,7 @@ message DeleteRequest { // table name string table_name = 2; - // start time range + // start time range string start_time = 3; // stop time range diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index bc3672ff6f..5b6392281e 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -145,6 +145,26 @@ pub enum DeleteDatabaseError { ServerError(tonic::Status), } +/// Errors returned by Client::delete_database +#[derive(Debug, Error)] +pub enum RestoreDatabaseError { + /// Database not found + #[error("Database not found")] + DatabaseNotFound, + + /// Server indicated that it is not (yet) available + #[error("Server unavailable: {}", .0.message())] + Unavailable(tonic::Status), + + /// Server ID is not set + #[error("Server ID not set")] + NoServerId, + + /// Client received an unexpected error from the server + #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] + ServerError(tonic::Status), +} + /// Errors returned by Client::list_chunks #[derive(Debug, Error)] pub enum ListChunksError { @@ -639,6 +659,28 @@ impl Client { Ok(()) } + /// Restore database + pub async fn restore_database( + &mut self, + db_name: impl Into + Send, + generation_id: usize, + ) -> Result<(), RestoreDatabaseError> { + self.inner + .restore_database(RestoreDatabaseRequest { + db_name: db_name.into(), + generation_id: generation_id as u64, + }) + .await + .map_err(|status| match status.code() { + tonic::Code::NotFound => RestoreDatabaseError::DatabaseNotFound, + tonic::Code::FailedPrecondition => RestoreDatabaseError::NoServerId, + tonic::Code::Unavailable => RestoreDatabaseError::Unavailable(status), + _ => RestoreDatabaseError::ServerError(status), + })?; + + Ok(()) + } + /// List chunks in a database. pub async fn list_chunks( &mut self, diff --git a/iox_object_store/src/lib.rs b/iox_object_store/src/lib.rs index 6ea15b8f8c..28798ab1d6 100644 --- a/iox_object_store/src/lib.rs +++ b/iox_object_store/src/lib.rs @@ -31,7 +31,7 @@ use object_store::{ ObjectStore, ObjectStoreApi, Result, }; use observability_deps::tracing::warn; -use snafu::{ensure, ResultExt, Snafu}; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; use std::{collections::BTreeMap, io, sync::Arc}; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; @@ -56,6 +56,27 @@ pub enum IoxObjectStoreError { #[snafu(display("Multiple active databases found in object storage"))] MultipleActiveDatabasesFound, + + #[snafu(display("Cannot restore; there is already an active database named `{}`", name))] + ActiveDatabaseAlreadyExists { name: String }, + + #[snafu(display("Generation `{}` not found for database `{}`", generation_id, name))] + GenerationNotFound { + generation_id: GenerationId, + name: String, + }, + + #[snafu(display( + "Could not restore generation `{}` of database `{}`: {}", + generation_id, + name, + source + ))] + RestoreFailed { + generation_id: GenerationId, + name: String, + source: object_store::Error, + }, } /// Handles persistence of data for a particular database. Writes within its directory/prefix. @@ -150,18 +171,6 @@ impl IoxObjectStore { Ok(deleted_databases) } - // TODO: implement a function to restore a deleted database generation, given the - // relevant information returned from [`list_deleted_databases`]. - // See https://github.com/influxdata/influxdb_iox/issues/2199 - // pub async fn restore_deleted_database( - // inner: &ObjectStore, - // server_id: ServerId, - // name: &DatabaseName<'_>, - // generation_id: GenerationId, - // ) -> Result<()> { - // - // } - /// List database names in object storage along with all existing generations for each database /// and whether the generations are marked as deleted or not. Useful for finding candidates /// to restore or to permanently delete. Makes many more calls to object storage than @@ -363,6 +372,58 @@ impl IoxObjectStore { .await } + /// Remove the tombstone file to restore a database generation. Will return an error if this + /// generation is already active or if there is another database generation already active for + /// this database name. Returns the reactivated IoxObjectStore. + pub async fn restore_database( + inner: Arc, + server_id: ServerId, + database_name: &DatabaseName<'static>, + generation_id: GenerationId, + ) -> Result { + let root_path = RootPath::new(&inner, server_id, database_name); + + let generations = Self::list_generations(&inner, &root_path) + .await + .context(UnderlyingObjectStoreError)?; + + let active = generations.iter().find(|g| g.is_active()); + + ensure!( + active.is_none(), + ActiveDatabaseAlreadyExists { + name: database_name + } + ); + + let restore_candidate = generations + .iter() + .find(|g| g.id == generation_id && !g.is_active()) + .context(GenerationNotFound { + generation_id, + name: database_name.as_str(), + })?; + + let generation_path = root_path.generation_path(*restore_candidate); + let tombstone_path = TombstonePath::new(&generation_path); + + inner + .delete(&tombstone_path.inner) + .await + .context(RestoreFailed { + generation_id, + name: database_name.as_str(), + })?; + + Ok(Self::existing( + inner, + server_id, + database_name, + Generation::active(generation_id.inner), + root_path, + )) + } + // Catalog transaction file methods =========================================================== /// List all the catalog transaction files in object storage for this database. @@ -1237,4 +1298,80 @@ mod tests { assert_eq!(deleted_dbs[3].name, db_reincarnated); assert_eq!(deleted_dbs[3].generation_id.inner, 0); } + + async fn restore_database( + object_store: Arc, + server_id: ServerId, + database_name: &DatabaseName<'static>, + generation_id: GenerationId, + ) -> Result { + IoxObjectStore::restore_database( + Arc::clone(&object_store), + server_id, + database_name, + generation_id, + ) + .await + } + + #[tokio::test] + async fn restore_deleted_database() { + let object_store = make_object_store(); + let server_id = make_server_id(); + + // Create a database + let db = DatabaseName::new("db").unwrap(); + let db_iox_store = create_database(Arc::clone(&object_store), server_id, &db).await; + + // Delete the database + delete_database(&db_iox_store).await; + + // Create and delete it again so there are two deleted generations + let db_iox_store = create_database(Arc::clone(&object_store), server_id, &db).await; + delete_database(&db_iox_store).await; + + // Get one generation ID from the list of deleted databases + let deleted_dbs = IoxObjectStore::list_deleted_databases(&object_store, server_id) + .await + .unwrap(); + assert_eq!(deleted_dbs.len(), 2); + let deleted_db = deleted_dbs.iter().find(|d| d.name == db).unwrap(); + + // Restore the generation + restore_database( + Arc::clone(&object_store), + server_id, + &db, + deleted_db.generation_id, + ) + .await + .unwrap(); + + // The database should be in the list of all databases again + let all_dbs = IoxObjectStore::list_all_databases(&object_store, server_id) + .await + .unwrap(); + assert_eq!(all_dbs.len(), 1); + + // The other deleted generation should be the only item in the deleted list + let deleted_dbs = IoxObjectStore::list_deleted_databases(&object_store, server_id) + .await + .unwrap(); + assert_eq!(deleted_dbs.len(), 1); + + // Try to restore the other deleted database + let deleted_db = deleted_dbs.iter().find(|d| d.name == db).unwrap(); + let err = restore_database( + Arc::clone(&object_store), + server_id, + &db, + deleted_db.generation_id, + ) + .await + .unwrap_err(); + assert_eq!( + err.to_string(), + "Cannot restore; there is already an active database named `db`" + ); + } } diff --git a/server/src/database.rs b/server/src/database.rs index 93d717fe5c..92031481fb 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ -86,6 +86,9 @@ pub enum Error { #[snafu(display("no active database named {} to delete", db_name))] NoActiveDatabaseToDelete { db_name: String }, + + #[snafu(display("cannot restore database named {} that is already active", db_name))] + CannotRestoreActiveDatabase { db_name: String }, } #[derive(Debug, Snafu)] @@ -244,6 +247,36 @@ impl Database { Ok(()) } + /// Mark this database as restored. + pub async fn restore(&self, iox_object_store: IoxObjectStore) -> Result<(), Error> { + let db_name = &self.shared.config.name; + + let handle = { + let state = self.shared.state.read(); + + // Can't restore an already active database. + ensure!(!state.is_active(), CannotRestoreActiveDatabase { db_name }); + + state.try_freeze().context(TransitionInProgress { + db_name, + state: state.state_code(), + })? + }; + + let shared = Arc::clone(&self.shared); + + { + // Reset the state to initializing with the given iox object storage + let mut state = shared.state.write(); + *state.unfreeze(handle) = + DatabaseState::DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound { + iox_object_store: Arc::new(iox_object_store), + }); + } + + Ok(()) + } + /// Triggers shutdown of this `Database` pub fn shutdown(&self) { info!(db_name=%self.shared.config.name, "database shutting down"); diff --git a/server/src/lib.rs b/server/src/lib.rs index 8b31c6a958..ffffd9736a 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -152,6 +152,14 @@ pub enum Error { #[snafu(display("{}", source))] CannotMarkDatabaseDeleted { source: crate::database::Error }, + #[snafu(display("{}", source))] + CannotRestoreDatabaseInObjectStorage { + source: iox_object_store::IoxObjectStoreError, + }, + + #[snafu(display("{}", source))] + CannotRestoreDatabase { source: crate::database::Error }, + #[snafu(display("database already exists: {}", db_name))] DatabaseAlreadyExists { db_name: String }, @@ -684,6 +692,53 @@ where Ok(()) } + /// Restore a database and generation that has been marked as deleted. Return an error if no + /// database with this generation can be found, or if there's already an active database with + /// this name. + pub async fn restore_database( + &self, + db_name: &DatabaseName<'static>, + generation_id: u64, + ) -> Result<()> { + let (server_id, database) = { + let state = self.shared.state.read(); + let initialized = state.initialized()?; + + let database = Arc::clone( + initialized + .databases + .get(db_name) + .context(DatabaseNotFound { db_name })?, + ); + + if let Some(init_error) = database.init_error() { + if !matches!(&*init_error, database::InitError::NoActiveDatabase) { + return DatabaseAlreadyExists { db_name }.fail(); + } + } + + (initialized.server_id, database) + }; + + let iox_object_store = IoxObjectStore::restore_database( + Arc::clone(self.shared.application.object_store()), + server_id, + db_name, + GenerationId { + inner: generation_id as usize, + }, + ) + .await + .context(CannotRestoreDatabaseInObjectStorage)?; + + database + .restore(iox_object_store) + .await + .context(CannotRestoreDatabase)?; + + Ok(()) + } + /// List all deleted databases in object storage. pub async fn list_deleted_databases(&self) -> Result> { let server_id = { diff --git a/src/commands/database.rs b/src/commands/database.rs index 6a5f355390..d0dd7678c5 100644 --- a/src/commands/database.rs +++ b/src/commands/database.rs @@ -7,7 +7,7 @@ use influxdb_iox_client::{ format::QueryOutputFormat, management::{ self, generated_types::*, CreateDatabaseError, DeleteDatabaseError, GetDatabaseError, - ListDatabaseError, + ListDatabaseError, RestoreDatabaseError, }, write::{self, WriteError}, }; @@ -37,6 +37,9 @@ pub enum Error { #[error("Error deleting database: {0}")] DeleteDatabaseError(#[from] DeleteDatabaseError), + #[error("Error restoring database: {0}")] + RestoreDatabaseError(#[from] RestoreDatabaseError), + #[error("Error reading file {:?}: {}", file_name, source)] ReadingFile { file_name: PathBuf, @@ -184,6 +187,16 @@ struct Delete { name: String, } +/// Restore a deleted database generation +#[derive(Debug, StructOpt)] +struct Restore { + /// The generation ID of the database to restore + generation_id: usize, + + /// The name of the database to delete + name: String, +} + /// All possible subcommands for database #[derive(Debug, StructOpt)] enum Command { @@ -196,6 +209,7 @@ enum Command { Partition(partition::Config), Recover(recover::Config), Delete(Delete), + Restore(Restore), } pub async fn command(connection: Connection, config: Config) -> Result<()> { @@ -331,6 +345,13 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { client.delete_database(&command.name).await?; println!("Deleted database {}", command.name); } + Command::Restore(command) => { + let mut client = management::Client::new(connection); + client + .restore_database(&command.name, command.generation_id) + .await?; + println!("Restored database {}", command.name); + } } Ok(()) diff --git a/src/influxdb_ioxd/rpc/error.rs b/src/influxdb_ioxd/rpc/error.rs index 1928b22f3d..48b7d2435d 100644 --- a/src/influxdb_ioxd/rpc/error.rs +++ b/src/influxdb_ioxd/rpc/error.rs @@ -133,6 +133,9 @@ pub fn default_database_error_handler(error: server::database::Error) -> tonic:: ..Default::default() } .into(), + Error::CannotRestoreActiveDatabase { .. } => { + tonic::Status::failed_precondition(error.to_string()) + } } } diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 2b97ff5407..278ee303ac 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -179,6 +179,22 @@ where Ok(Response::new(DeleteDatabaseResponse {})) } + async fn restore_database( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let db_name = DatabaseName::new(request.db_name).field("db_name")?; + let generation_id = request.generation_id; + + self.server + .restore_database(&db_name, generation_id) + .await + .map_err(default_server_error_handler)?; + + Ok(Response::new(RestoreDatabaseResponse {})) + } + async fn list_deleted_databases( &self, _: Request, diff --git a/tests/end_to_end_cases/management_cli.rs b/tests/end_to_end_cases/management_cli.rs index 366fb96381..ce03987073 100644 --- a/tests/end_to_end_cases/management_cli.rs +++ b/tests/end_to_end_cases/management_cli.rs @@ -342,7 +342,52 @@ async fn delete_database() { .arg(addr) .assert() .success() + .stdout( + predicate::str::contains(format!("0 {}", db)) + .and(predicate::str::contains(format!("1 {}", db))), + ); + + // Restore generation 0 + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("restore") + .arg("0") + .arg(db) + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout(predicate::str::contains(format!( + "Restored database {}", + db + ))); + + // This database is back in the active list + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("list") + .arg("--host") + .arg(addr) + .assert() + .success() .stdout(predicate::str::contains(db)); + + // Only generation 1 is in the deleted list + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("database") + .arg("list") + .arg("--deleted") + .arg("--host") + .arg(addr) + .assert() + .success() + .stdout( + predicate::str::contains(format!("1 {}", db)) + .and(predicate::str::contains(format!("0 {}", db)).not()), + ); } #[tokio::test]