feat: Add an API for restoring a database that was marked deleted

pull/24376/head
Carol (Nichols || Goulding) 2021-09-03 10:48:53 -04:00
parent 185f45f56b
commit 7b6d8f9327
9 changed files with 379 additions and 15 deletions

View File

@ -37,6 +37,8 @@ service ManagementService {
rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse);
// Restore a database generation that was previously marked as deleted.
rpc RestoreDatabase(RestoreDatabaseRequest) returns (RestoreDatabaseResponse);
// List deleted databases and their metadata.
rpc ListDeletedDatabases(ListDeletedDatabasesRequest) returns (ListDeletedDatabasesResponse);
@ -174,6 +176,16 @@ message DeleteDatabaseRequest {
message DeleteDatabaseResponse {}
// Request to restore one deleted generation of a database.
message RestoreDatabaseRequest {
// The generation ID of the deleted database.
uint64 generation_id = 1;
// The name of the database.
string db_name = 2;
}
// Response to RestoreDatabase; the message has no fields.
message RestoreDatabaseResponse {}
message ListDeletedDatabasesRequest {}
message ListDeletedDatabasesResponse {
@ -461,7 +473,7 @@ message DeleteRequest {
// table name
string table_name = 2;
// start time range
// start time range
string start_time = 3;
// stop time range

View File

@ -145,6 +145,26 @@ pub enum DeleteDatabaseError {
ServerError(tonic::Status),
}
/// Errors returned by Client::restore_database
#[derive(Debug, Error)]
pub enum RestoreDatabaseError {
/// Database not found
#[error("Database not found")]
DatabaseNotFound,
/// Server indicated that it is not (yet) available
#[error("Server unavailable: {}", .0.message())]
Unavailable(tonic::Status),
/// Server ID is not set
#[error("Server ID not set")]
NoServerId,
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
}
/// Errors returned by Client::list_chunks
#[derive(Debug, Error)]
pub enum ListChunksError {
@ -639,6 +659,28 @@ impl Client {
Ok(())
}
/// Restore a database generation that was previously marked as deleted.
///
/// Sends a `RestoreDatabaseRequest` carrying `db_name` and `generation_id`.
///
/// # Errors
///
/// A failing gRPC status is translated by its code: `NotFound` becomes
/// `DatabaseNotFound`, `FailedPrecondition` becomes `NoServerId`,
/// `Unavailable` becomes `Unavailable`, and any other code becomes
/// `ServerError`.
pub async fn restore_database(
    &mut self,
    db_name: impl Into<String> + Send,
    generation_id: usize,
) -> Result<(), RestoreDatabaseError> {
    let request = RestoreDatabaseRequest {
        db_name: db_name.into(),
        generation_id: generation_id as u64,
    };

    match self.inner.restore_database(request).await {
        Ok(_) => Ok(()),
        Err(status) => Err(match status.code() {
            tonic::Code::NotFound => RestoreDatabaseError::DatabaseNotFound,
            tonic::Code::FailedPrecondition => RestoreDatabaseError::NoServerId,
            tonic::Code::Unavailable => RestoreDatabaseError::Unavailable(status),
            _ => RestoreDatabaseError::ServerError(status),
        }),
    }
}
/// List chunks in a database.
pub async fn list_chunks(
&mut self,

View File

@ -31,7 +31,7 @@ use object_store::{
ObjectStore, ObjectStoreApi, Result,
};
use observability_deps::tracing::warn;
use snafu::{ensure, ResultExt, Snafu};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use std::{collections::BTreeMap, io, sync::Arc};
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
@ -56,6 +56,27 @@ pub enum IoxObjectStoreError {
#[snafu(display("Multiple active databases found in object storage"))]
MultipleActiveDatabasesFound,
#[snafu(display("Cannot restore; there is already an active database named `{}`", name))]
ActiveDatabaseAlreadyExists { name: String },
#[snafu(display("Generation `{}` not found for database `{}`", generation_id, name))]
GenerationNotFound {
generation_id: GenerationId,
name: String,
},
#[snafu(display(
"Could not restore generation `{}` of database `{}`: {}",
generation_id,
name,
source
))]
RestoreFailed {
generation_id: GenerationId,
name: String,
source: object_store::Error,
},
}
/// Handles persistence of data for a particular database. Writes within its directory/prefix.
@ -150,18 +171,6 @@ impl IoxObjectStore {
Ok(deleted_databases)
}
// TODO: implement a function to restore a deleted database generation, given the
// relevant information returned from [`list_deleted_databases`].
// See https://github.com/influxdata/influxdb_iox/issues/2199
// pub async fn restore_deleted_database(
// inner: &ObjectStore,
// server_id: ServerId,
// name: &DatabaseName<'_>,
// generation_id: GenerationId,
// ) -> Result<()> {
//
// }
/// List database names in object storage along with all existing generations for each database
/// and whether the generations are marked as deleted or not. Useful for finding candidates
/// to restore or to permanently delete. Makes many more calls to object storage than
@ -363,6 +372,58 @@ impl IoxObjectStore {
.await
}
/// Reactivate a deleted database generation by deleting its tombstone file.
///
/// # Errors
///
/// - `UnderlyingObjectStoreError` if the generations cannot be listed.
/// - `ActiveDatabaseAlreadyExists` if any generation of `database_name` is
///   currently active (including the requested one).
/// - `GenerationNotFound` if no inactive generation has `generation_id`.
/// - `RestoreFailed` if deleting the tombstone from object storage fails.
///
/// On success, returns the `IoxObjectStore` for the reactivated generation.
pub async fn restore_database(
    inner: Arc<ObjectStore>,
    server_id: ServerId,
    database_name: &DatabaseName<'static>,
    generation_id: GenerationId,
) -> Result<Self, IoxObjectStoreError> {
    let root_path = RootPath::new(&inner, server_id, database_name);

    let all_generations = Self::list_generations(&inner, &root_path)
        .await
        .context(UnderlyingObjectStoreError)?;

    // Refuse to restore while some generation of this database is still active.
    let has_active = all_generations.iter().any(|g| g.is_active());
    ensure!(
        !has_active,
        ActiveDatabaseAlreadyExists {
            name: database_name
        }
    );

    // Only a generation that is currently marked deleted can be restored.
    let to_restore = all_generations
        .iter()
        .find(|g| g.id == generation_id && !g.is_active())
        .context(GenerationNotFound {
            generation_id,
            name: database_name.as_str(),
        })?;

    let generation_path = root_path.generation_path(*to_restore);
    let tombstone_path = TombstonePath::new(&generation_path);

    // Deleting the tombstone is what makes the generation active again.
    let removal = inner.delete(&tombstone_path.inner).await;
    removal.context(RestoreFailed {
        generation_id,
        name: database_name.as_str(),
    })?;

    let restored = Self::existing(
        inner,
        server_id,
        database_name,
        Generation::active(generation_id.inner),
        root_path,
    );
    Ok(restored)
}
// Catalog transaction file methods ===========================================================
/// List all the catalog transaction files in object storage for this database.
@ -1237,4 +1298,80 @@ mod tests {
assert_eq!(deleted_dbs[3].name, db_reincarnated);
assert_eq!(deleted_dbs[3].generation_id.inner, 0);
}
/// Test helper: restore `generation_id` of `database_name` by calling
/// `IoxObjectStore::restore_database` and returning its result unchanged.
async fn restore_database(
    object_store: Arc<ObjectStore>,
    server_id: ServerId,
    database_name: &DatabaseName<'static>,
    generation_id: GenerationId,
) -> Result<IoxObjectStore, IoxObjectStoreError> {
    // `object_store` is already an owned `Arc`, so it is passed through
    // directly rather than cloned and then dropped.
    IoxObjectStore::restore_database(object_store, server_id, database_name, generation_id).await
}
/// Creates and deletes the same database twice, restores one of the two
/// deleted generations, and checks that restoring the remaining deleted
/// generation is rejected because an active database with that name exists.
#[tokio::test]
async fn restore_deleted_database() {
let object_store = make_object_store();
let server_id = make_server_id();
// Create a database
let db = DatabaseName::new("db").unwrap();
let db_iox_store = create_database(Arc::clone(&object_store), server_id, &db).await;
// Delete the database
delete_database(&db_iox_store).await;
// Create and delete it again so there are two deleted generations
let db_iox_store = create_database(Arc::clone(&object_store), server_id, &db).await;
delete_database(&db_iox_store).await;
// Get one generation ID from the list of deleted databases
let deleted_dbs = IoxObjectStore::list_deleted_databases(&object_store, server_id)
.await
.unwrap();
assert_eq!(deleted_dbs.len(), 2);
// Both entries share the same name, so `find` yields the first one listed.
let deleted_db = deleted_dbs.iter().find(|d| d.name == db).unwrap();
// Restore the generation
restore_database(
Arc::clone(&object_store),
server_id,
&db,
deleted_db.generation_id,
)
.await
.unwrap();
// The database should be in the list of all databases again
let all_dbs = IoxObjectStore::list_all_databases(&object_store, server_id)
.await
.unwrap();
assert_eq!(all_dbs.len(), 1);
// The other deleted generation should be the only item in the deleted list
let deleted_dbs = IoxObjectStore::list_deleted_databases(&object_store, server_id)
.await
.unwrap();
assert_eq!(deleted_dbs.len(), 1);
// Try to restore the other deleted database
let deleted_db = deleted_dbs.iter().find(|d| d.name == db).unwrap();
let err = restore_database(
Arc::clone(&object_store),
server_id,
&db,
deleted_db.generation_id,
)
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"Cannot restore; there is already an active database named `db`"
);
}
}

View File

@ -86,6 +86,9 @@ pub enum Error {
#[snafu(display("no active database named {} to delete", db_name))]
NoActiveDatabaseToDelete { db_name: String },
#[snafu(display("cannot restore database named {} that is already active", db_name))]
CannotRestoreActiveDatabase { db_name: String },
}
#[derive(Debug, Snafu)]
@ -244,6 +247,36 @@ impl Database {
Ok(())
}
/// Mark this database as restored.
///
/// Returns `CannotRestoreActiveDatabase` if the current state reports itself
/// as active, or `TransitionInProgress` if the state cannot be frozen. On
/// success the state is replaced with `DatabaseObjectStoreFound` holding the
/// given `iox_object_store`.
pub async fn restore(&self, iox_object_store: IoxObjectStore) -> Result<(), Error> {
let db_name = &self.shared.config.name;
// The read guard lives only inside this block, so it is released before the
// write lock is taken further down.
let handle = {
let state = self.shared.state.read();
// Can't restore an already active database.
ensure!(!state.is_active(), CannotRestoreActiveDatabase { db_name });
// NOTE(review): the freeze handle presumably blocks concurrent state
// transitions until `unfreeze` is called below; confirm against the
// state wrapper's documentation.
state.try_freeze().context(TransitionInProgress {
db_name,
state: state.state_code(),
})?
};
let shared = Arc::clone(&self.shared);
{
// Reset the state to initializing with the given iox object storage
let mut state = shared.state.write();
*state.unfreeze(handle) =
DatabaseState::DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound {
iox_object_store: Arc::new(iox_object_store),
});
}
Ok(())
}
/// Triggers shutdown of this `Database`
pub fn shutdown(&self) {
info!(db_name=%self.shared.config.name, "database shutting down");

View File

@ -152,6 +152,14 @@ pub enum Error {
#[snafu(display("{}", source))]
CannotMarkDatabaseDeleted { source: crate::database::Error },
#[snafu(display("{}", source))]
CannotRestoreDatabaseInObjectStorage {
source: iox_object_store::IoxObjectStoreError,
},
#[snafu(display("{}", source))]
CannotRestoreDatabase { source: crate::database::Error },
#[snafu(display("database already exists: {}", db_name))]
DatabaseAlreadyExists { db_name: String },
@ -684,6 +692,53 @@ where
Ok(())
}
/// Restore a deleted generation of a database.
///
/// # Errors
///
/// - Propagates the error from `initialized()` if the server state is not
///   initialized.
/// - `DatabaseNotFound` if `db_name` is not among this server's databases.
/// - `DatabaseAlreadyExists` if the database reports an initialization error
///   other than `InitError::NoActiveDatabase`.
/// - `CannotRestoreDatabaseInObjectStorage` if the object-store restore fails
///   (for example, the generation is missing or another one is active).
/// - `CannotRestoreDatabase` if the in-memory database cannot be restored.
pub async fn restore_database(
    &self,
    db_name: &DatabaseName<'static>,
    generation_id: u64,
) -> Result<()> {
    // The server state read guard is confined to this block so that it is not
    // held across the awaits below.
    let (server_id, database) = {
        let state = self.shared.state.read();
        let initialized = state.initialized()?;

        let found = initialized
            .databases
            .get(db_name)
            .context(DatabaseNotFound { db_name })?;
        let database = Arc::clone(found);

        match database.init_error() {
            Some(init_error)
                if !matches!(&*init_error, database::InitError::NoActiveDatabase) =>
            {
                return DatabaseAlreadyExists { db_name }.fail();
            }
            _ => {}
        }

        (initialized.server_id, database)
    };

    let object_store = Arc::clone(self.shared.application.object_store());
    let target_generation = GenerationId {
        inner: generation_id as usize,
    };

    let iox_object_store =
        IoxObjectStore::restore_database(object_store, server_id, db_name, target_generation)
            .await
            .context(CannotRestoreDatabaseInObjectStorage)?;

    database
        .restore(iox_object_store)
        .await
        .context(CannotRestoreDatabase)?;

    Ok(())
}
/// List all deleted databases in object storage.
pub async fn list_deleted_databases(&self) -> Result<Vec<DeletedDatabase>> {
let server_id = {

View File

@ -7,7 +7,7 @@ use influxdb_iox_client::{
format::QueryOutputFormat,
management::{
self, generated_types::*, CreateDatabaseError, DeleteDatabaseError, GetDatabaseError,
ListDatabaseError,
ListDatabaseError, RestoreDatabaseError,
},
write::{self, WriteError},
};
@ -37,6 +37,9 @@ pub enum Error {
#[error("Error deleting database: {0}")]
DeleteDatabaseError(#[from] DeleteDatabaseError),
#[error("Error restoring database: {0}")]
RestoreDatabaseError(#[from] RestoreDatabaseError),
#[error("Error reading file {:?}: {}", file_name, source)]
ReadingFile {
file_name: PathBuf,
@ -184,6 +187,16 @@ struct Delete {
name: String,
}
/// Restore a deleted database generation
#[derive(Debug, StructOpt)]
struct Restore {
    /// The generation ID of the database to restore
    generation_id: usize,

    /// The name of the database to restore
    name: String,
}
/// All possible subcommands for database
#[derive(Debug, StructOpt)]
enum Command {
@ -196,6 +209,7 @@ enum Command {
Partition(partition::Config),
Recover(recover::Config),
Delete(Delete),
Restore(Restore),
}
pub async fn command(connection: Connection, config: Config) -> Result<()> {
@ -331,6 +345,13 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
client.delete_database(&command.name).await?;
println!("Deleted database {}", command.name);
}
Command::Restore(command) => {
let mut client = management::Client::new(connection);
client
.restore_database(&command.name, command.generation_id)
.await?;
println!("Restored database {}", command.name);
}
}
Ok(())

View File

@ -133,6 +133,9 @@ pub fn default_database_error_handler(error: server::database::Error) -> tonic::
..Default::default()
}
.into(),
Error::CannotRestoreActiveDatabase { .. } => {
tonic::Status::failed_precondition(error.to_string())
}
}
}

View File

@ -179,6 +179,22 @@ where
Ok(Response::new(DeleteDatabaseResponse {}))
}
async fn restore_database(
&self,
request: Request<RestoreDatabaseRequest>,
) -> Result<Response<RestoreDatabaseResponse>, Status> {
let request = request.into_inner();
let db_name = DatabaseName::new(request.db_name).field("db_name")?;
let generation_id = request.generation_id;
self.server
.restore_database(&db_name, generation_id)
.await
.map_err(default_server_error_handler)?;
Ok(Response::new(RestoreDatabaseResponse {}))
}
async fn list_deleted_databases(
&self,
_: Request<ListDeletedDatabasesRequest>,

View File

@ -342,7 +342,52 @@ async fn delete_database() {
.arg(addr)
.assert()
.success()
.stdout(
predicate::str::contains(format!("0 {}", db))
.and(predicate::str::contains(format!("1 {}", db))),
);
// Restore generation 0
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("restore")
.arg("0")
.arg(db)
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(format!(
"Restored database {}",
db
)));
// This database is back in the active list
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(db));
// Only generation 1 is in the deleted list
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--deleted")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(
predicate::str::contains(format!("1 {}", db))
.and(predicate::str::contains(format!("0 {}", db)).not()),
);
}
#[tokio::test]