Merge pull request #2566 from influxdata/cn/detailed-db-list

feat: Detailed database list command
kodiakhq[bot] 2021-09-17 20:23:10 +00:00 committed by GitHub
commit 63a1ed147f
13 changed files with 255 additions and 83 deletions

View File

@@ -2,12 +2,15 @@ use crate::DatabaseName;
use chrono::{DateTime, Utc};
use std::{fmt, str::FromStr};
/// Metadata about a deleted database that could be restored or permanently deleted.
/// Detailed metadata about a database.
#[derive(Debug, Clone, PartialEq)]
pub struct DeletedDatabase {
pub struct DetailedDatabase {
/// The name of the database
pub name: DatabaseName<'static>,
/// The generation ID of the database in object storage
pub generation_id: GenerationId,
pub deleted_at: DateTime<Utc>,
/// The UTC datetime at which this database was deleted, if applicable
pub deleted_at: Option<DateTime<Utc>>,
}
/// Identifier for a generation of a particular database
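For orientation, a minimal sketch of populating the renamed struct; because deleted_at is now an Option, the same type describes both active and deleted generations. The database name, generation numbers, and timestamp below are illustrative.

use chrono::Utc;
use data_types::{
    detailed_database::{DetailedDatabase, GenerationId},
    DatabaseName,
};

fn main() {
    // Active generation: no tombstone in object storage, so no deletion timestamp.
    let active = DetailedDatabase {
        name: DatabaseName::new("my_db").unwrap(),
        generation_id: GenerationId { inner: 1 },
        deleted_at: None,
    };

    // Deleted generation: deleted_at records when the tombstone file was written.
    let deleted = DetailedDatabase {
        name: DatabaseName::new("my_db").unwrap(),
        generation_id: GenerationId { inner: 0 },
        deleted_at: Some(Utc::now()),
    };

    println!("{:?}\n{:?}", active, deleted);
}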

View File

@@ -14,7 +14,7 @@ pub mod chunk_metadata;
pub mod consistent_hasher;
mod database_name;
pub mod database_rules;
pub mod deleted_database;
pub mod detailed_database;
pub mod error;
pub mod instant;
pub mod job;

View File

@@ -42,6 +42,9 @@ service ManagementService {
// List deleted databases and their metadata.
rpc ListDeletedDatabases(ListDeletedDatabasesRequest) returns (ListDeletedDatabasesResponse);
// List all databases and their metadata.
rpc ListDetailedDatabases(ListDetailedDatabasesRequest) returns (ListDetailedDatabasesResponse);
// List chunks available on this database
rpc ListChunks(ListChunksRequest) returns (ListChunksResponse);
@@ -189,18 +192,24 @@ message RestoreDatabaseResponse {}
message ListDeletedDatabasesRequest {}
message ListDeletedDatabasesResponse {
repeated DeletedDatabase deleted_databases = 1;
repeated DetailedDatabase deleted_databases = 1;
}
// This resource represents a deleted database.
message DeletedDatabase {
// The generation ID of the deleted database.
message ListDetailedDatabasesRequest {}
message ListDetailedDatabasesResponse {
repeated DetailedDatabase databases = 1;
}
// This resource represents detailed information about a database.
message DetailedDatabase {
// The generation ID of the database.
uint64 generation_id = 1;
// The UTC datetime at which this database was deleted.
// The UTC datetime at which this database was deleted, if applicable.
google.protobuf.Timestamp deleted_at = 2;
// The name of the deleted database.
// The name of the database.
string db_name = 3;
}
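A sketch of the corresponding generated Rust types, assuming the generated_types crate path that the conversion code below aliases as management (field values illustrative):

use generated_types::influxdata::iox::management::v1 as management;

fn main() {
    // The request carries no fields; the response holds one entry per database generation.
    let _request = management::ListDetailedDatabasesRequest {};

    let response = management::ListDetailedDatabasesResponse {
        databases: vec![management::DetailedDatabase {
            db_name: "my_db".to_string(),
            generation_id: 0,
            // None for an active generation; Some(timestamp) once a tombstone exists.
            deleted_at: None,
        }],
    };

    println!("{} database generation(s)", response.databases.len());
}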

View File

@@ -1,18 +0,0 @@
use crate::influxdata::iox::management::v1 as management;
use data_types::deleted_database::DeletedDatabase;
impl From<DeletedDatabase> for management::DeletedDatabase {
fn from(deleted: DeletedDatabase) -> Self {
let DeletedDatabase {
name,
generation_id,
deleted_at,
} = deleted;
Self {
db_name: name.to_string(),
generation_id: generation_id.inner as u64,
deleted_at: Some(deleted_at.into()),
}
}
}

View File

@@ -0,0 +1,18 @@
use crate::influxdata::iox::management::v1 as management;
use data_types::detailed_database::DetailedDatabase;
impl From<DetailedDatabase> for management::DetailedDatabase {
fn from(database: DetailedDatabase) -> Self {
let DetailedDatabase {
name,
generation_id,
deleted_at,
} = database;
Self {
db_name: name.to_string(),
generation_id: generation_id.inner as u64,
deleted_at: deleted_at.map(Into::into),
}
}
}

View File

@@ -143,7 +143,7 @@ pub use influxdata::platform::storage::*;
pub mod chunk;
pub mod database_rules;
pub mod database_state;
pub mod deleted_database;
pub mod detailed_database;
pub mod google;
pub mod job;

View File

@@ -604,7 +604,7 @@ impl Client {
/// List deleted databases and metadata
pub async fn list_deleted_databases(
&mut self,
) -> Result<Vec<DeletedDatabase>, ListDatabaseError> {
) -> Result<Vec<DetailedDatabase>, ListDatabaseError> {
let response = self
.inner
.list_deleted_databases(ListDeletedDatabasesRequest {})
@@ -616,6 +616,21 @@ impl Client {
Ok(response.into_inner().deleted_databases)
}
/// List all databases and detailed metadata
pub async fn list_detailed_databases(
&mut self,
) -> Result<Vec<DetailedDatabase>, ListDatabaseError> {
let response = self
.inner
.list_detailed_databases(ListDetailedDatabasesRequest {})
.await
.map_err(|status| match status.code() {
tonic::Code::Unavailable => ListDatabaseError::Unavailable(status),
_ => ListDatabaseError::ServerError(status),
})?;
Ok(response.into_inner().databases)
}
/// Get database configuration
///
/// If `omit_defaults` is false, return the current configuration
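A hedged sketch of calling the new client method from application code, assuming the client crate's connection::Builder and a reachable IOx gRPC endpoint (the address is illustrative):

use influxdb_iox_client::{connection::Builder, management};

#[tokio::main]
async fn main() {
    // Assumed endpoint; point this at a running IOx server's gRPC address.
    let connection = Builder::default()
        .build("http://127.0.0.1:8082")
        .await
        .expect("failed to connect");

    let mut client = management::Client::new(connection);

    // One entry per generation of every database, whether active or deleted.
    let databases = client
        .list_detailed_databases()
        .await
        .expect("list_detailed_databases failed");

    for db in databases {
        println!(
            "{} generation={} deleted_at={:?}",
            db.db_name, db.generation_id, db.deleted_at
        );
    }
}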

View File

@@ -17,7 +17,7 @@
use bytes::{Bytes, BytesMut};
use chrono::{DateTime, Utc};
use data_types::{
deleted_database::{DeletedDatabase, GenerationId},
detailed_database::{DetailedDatabase, GenerationId},
error::ErrorLogger,
server_id::ServerId,
DatabaseName,
@@ -96,19 +96,19 @@ pub struct IoxObjectStore {
#[derive(Debug, Copy, Clone, PartialEq)]
struct Generation {
id: GenerationId,
deleted: Option<DateTime<Utc>>,
deleted_at: Option<DateTime<Utc>>,
}
impl Generation {
fn active(id: usize) -> Self {
Self {
id: GenerationId { inner: id },
deleted: None,
deleted_at: None,
}
}
fn is_active(&self) -> bool {
self.deleted.is_none()
self.deleted_at.is_none()
}
}
@@ -147,28 +147,46 @@ impl IoxObjectStore {
pub async fn list_deleted_databases(
inner: &ObjectStore,
server_id: ServerId,
) -> Result<Vec<DeletedDatabase>> {
let mut deleted_databases = vec![];
) -> Result<Vec<DetailedDatabase>> {
Ok(Self::list_all_databases(inner, server_id)
.await?
.into_iter()
.flat_map(|(name, generations)| {
let name = Arc::new(name);
generations.into_iter().filter_map(move |gen| {
let name = Arc::clone(&name);
gen.deleted_at.map(|_| DetailedDatabase {
name: (*name).clone(),
generation_id: gen.id,
deleted_at: gen.deleted_at,
})
})
})
.collect())
}
let all_dbs = Self::list_all_databases(inner, server_id).await;
for (name, generations) in all_dbs? {
for deleted_gen in generations {
if let Generation {
id,
deleted: Some(deleted_at),
} = deleted_gen
{
deleted_databases.push(DeletedDatabase {
name: name.clone(),
generation_id: id,
deleted_at,
});
}
}
}
Ok(deleted_databases)
/// List all databases in object storage along with their generation IDs and if/when they
/// were deleted. Useful for visibility into object storage and finding databases to restore or
/// permanently delete.
pub async fn list_detailed_databases(
inner: &ObjectStore,
server_id: ServerId,
) -> Result<Vec<DetailedDatabase>> {
Ok(Self::list_all_databases(inner, server_id)
.await?
.into_iter()
.flat_map(|(name, generations)| {
let name = Arc::new(name);
generations.into_iter().map(move |gen| {
let name = Arc::clone(&name);
DetailedDatabase {
name: (*name).clone(),
generation_id: gen.id,
deleted_at: gen.deleted_at,
}
})
})
.collect())
}
/// List database names in object storage along with all existing generations for each database
@@ -228,13 +246,13 @@ impl IoxObjectStore {
let generation_list_result = inner.list_with_delimiter(&prefix).await?;
let tombstone_file = TombstonePath::new_from_object_store_path(&prefix);
let deleted = generation_list_result
let deleted_at = generation_list_result
.objects
.into_iter()
.find(|object| object.location == tombstone_file.inner)
.map(|object| object.last_modified);
generations.push(Generation { id, deleted });
generations.push(Generation { id, deleted_at });
} else {
// Deliberately ignoring errors with parsing; if the directory isn't a usize, it's
// not a valid database generation directory and we should skip it.
@@ -965,7 +983,7 @@ mod tests {
generations[0],
Generation {
id: GenerationId { inner: 0 },
deleted: None,
deleted_at: None,
}
);
@@ -999,7 +1017,7 @@
generations[1],
Generation {
id: GenerationId { inner: 1 },
deleted: None,
deleted_at: None,
}
);
}
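A sketch of driving the two listing entry points directly; the helper function is hypothetical, and `inner` and `server_id` are assumed to come from existing server setup:

use data_types::{detailed_database::DetailedDatabase, server_id::ServerId};
use iox_object_store::IoxObjectStore;
use object_store::ObjectStore;

// Hypothetical helper: prints one line per database generation found in object storage.
async fn report_databases(inner: &ObjectStore, server_id: ServerId) {
    // Every generation of every database; deleted_at is Some(..) whenever a
    // tombstone file exists for that generation.
    let all: Vec<DetailedDatabase> = IoxObjectStore::list_detailed_databases(inner, server_id)
        .await
        .expect("listing all databases");

    // Only the generations that have been marked deleted.
    let deleted = IoxObjectStore::list_deleted_databases(inner, server_id)
        .await
        .expect("listing deleted databases");

    println!("{} generation(s), {} deleted", all.len(), deleted.len());
    for db in all {
        // DetailedDatabase derives Debug, so this shows name, generation ID, and deleted_at.
        println!("{:?}", db);
    }
}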

View File

@@ -9,7 +9,7 @@ use crate::{
};
use chrono::{DateTime, Utc};
use data_types::{
database_rules::WriteBufferDirection, deleted_database::GenerationId, server_id::ServerId,
database_rules::WriteBufferDirection, detailed_database::GenerationId, server_id::ServerId,
DatabaseName,
};
use futures::{

View File

@@ -72,7 +72,7 @@ use async_trait::async_trait;
use chrono::Utc;
use data_types::{
database_rules::{NodeGroup, RoutingRules, ShardId, Sink},
deleted_database::DeletedDatabase,
detailed_database::DetailedDatabase,
error::ErrorLogger,
job::Job,
server_id::ServerId,
@@ -241,6 +241,9 @@ pub enum Error {
#[snafu(display("error listing deleted databases in object storage: {}", source))]
ListDeletedDatabases { source: object_store::Error },
#[snafu(display("error listing detailed databases in object storage: {}", source))]
ListDetailedDatabases { source: object_store::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -725,8 +728,8 @@ where
Ok(())
}
/// List all deleted databases in object storage.
pub async fn list_deleted_databases(&self) -> Result<Vec<DeletedDatabase>> {
/// List deleted databases in object storage.
pub async fn list_deleted_databases(&self) -> Result<Vec<DetailedDatabase>> {
let server_id = {
let state = self.shared.state.read();
let initialized = state.initialized()?;
@@ -741,6 +744,22 @@ where
.context(ListDeletedDatabases)?)
}
/// List all databases, active and deleted, in object storage, including their generation IDs.
pub async fn list_detailed_databases(&self) -> Result<Vec<DetailedDatabase>> {
let server_id = {
let state = self.shared.state.read();
let initialized = state.initialized()?;
initialized.server_id
};
Ok(IoxObjectStore::list_detailed_databases(
self.shared.application.object_store(),
server_id,
)
.await
.context(ListDetailedDatabases)?)
}
pub async fn write_pb(&self, database_batch: pb::DatabaseBatch) -> Result<()> {
let db_name = DatabaseName::new(database_batch.database_name.as_str())
.context(InvalidDatabaseName)?;

View File

@@ -11,6 +11,7 @@ use influxdb_iox_client::{
},
write::{self, WriteError},
};
use prettytable::{format, Cell, Row, Table};
use std::{
convert::TryInto, fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr,
time::Duration,
@@ -141,6 +142,11 @@ struct List {
/// Whether to list databases marked as deleted instead, to restore or permanently delete.
#[structopt(long)]
deleted: bool,
/// Whether to list detailed information, including generation IDs, about all databases,
/// whether they are active or marked as deleted.
#[structopt(long)]
detailed: bool,
}
/// Return configuration of specific database
@@ -258,23 +264,37 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
}
Command::List(list) => {
let mut client = management::Client::new(connection);
if list.deleted {
let deleted = client.list_deleted_databases().await?;
println!("Deleted at | Generation ID | Name");
println!("--------------------------------+---------------+--------");
for database in deleted {
if list.deleted || list.detailed {
let databases = if list.deleted {
client.list_deleted_databases().await?
} else {
client.list_detailed_databases().await?
};
let mut table = Table::new();
table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE);
table.set_titles(Row::new(vec![
Cell::new("Deleted at"),
Cell::new("Generation ID"),
Cell::new("Name"),
]));
for database in databases {
let deleted_at = database
.deleted_at
.and_then(|t| {
let dt: Result<DateTime<Utc>, _> = t.try_into();
dt.ok().map(|d| d.to_string())
})
.unwrap_or_else(|| String::from("Unknown"));
println!(
"{:<33}{:<16}{}",
deleted_at, database.generation_id, database.db_name,
);
.unwrap_or_else(String::new);
table.add_row(Row::new(vec![
Cell::new(&deleted_at),
Cell::new(&database.generation_id.to_string()),
Cell::new(&database.db_name),
]));
}
print!("{}", table);
} else {
let names = client.list_database_names().await?;
println!("{}", names.join("\n"))
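With this change, both `--deleted` and `--detailed` render through the same prettytable layout. A rough sketch of an invocation and its output; the server address, database name, and timestamp are illustrative, and the borders drawn by FORMAT_NO_LINESEP_WITH_TITLE are only approximated here:

influxdb_iox database list --detailed --host <grpc-address>

| Deleted at                      | Generation ID | Name  |
+---------------------------------+---------------+-------+
| 2021-09-17 20:23:10.123456 UTC  | 0             | my_db |
|                                 | 1             | my_db |

An empty "Deleted at" cell marks an active generation; the end-to-end tests below assert exactly this row shape.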

View File

@@ -213,6 +213,22 @@ where
}))
}
async fn list_detailed_databases(
&self,
_: Request<ListDetailedDatabasesRequest>,
) -> Result<Response<ListDetailedDatabasesResponse>, Status> {
let databases = self
.server
.list_detailed_databases()
.await
.map_err(default_server_error_handler)?
.into_iter()
.map(Into::into)
.collect();
Ok(Response::new(ListDetailedDatabasesResponse { databases }))
}
async fn list_chunks(
&self,
request: Request<ListChunksRequest>,

View File

@@ -183,6 +183,24 @@ async fn test_create_database_immutable() {
.stdout(predicate::str::contains(r#""immutable": true"#));
}
const DELETED_DB_DATETIME: &str = r#"[\d-]+\s[\d:\.]+\s[A-Z]+"#;
fn deleted_db_match(db: &str, generation_id: usize) -> predicates::str::RegexPredicate {
predicate::str::is_match(format!(
r#"(?m)^\|\s+{}\s+\|\s+{}\s+\|\s+{}\s+\|$"#,
DELETED_DB_DATETIME, generation_id, db
))
.unwrap()
}
fn active_db_match(db: &str, generation_id: usize) -> predicates::str::RegexPredicate {
predicate::str::is_match(format!(
r#"(?m)^\|\s+\|\s+{}\s+\|\s+{}\s+\|$"#,
generation_id, db
))
.unwrap()
}
#[tokio::test]
async fn delete_database() {
let server_fixture = ServerFixture::create_shared().await;
@@ -224,6 +242,18 @@ async fn delete_database() {
.success()
.stdout(predicate::str::contains(db).not());
// Listing detailed database info does include the active database, along with its generation
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--detailed")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(active_db_match(db, 0));
// Delete the database
Command::cargo_bin("influxdb_iox")
.unwrap()
@@ -257,7 +287,19 @@ async fn delete_database() {
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(db));
.stdout(deleted_db_match(db, 0));
// Listing detailed database info does include the deleted database
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--detailed")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(deleted_db_match(db, 0));
// Deleting the database again is an error
Command::cargo_bin("influxdb_iox")
@@ -306,7 +348,19 @@ async fn delete_database() {
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains(db));
.stdout(deleted_db_match(db, 0));
// Listing detailed database info includes both active and deleted
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--detailed")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(deleted_db_match(db, 0).and(active_db_match(db, 1)));
// Delete the 2nd database
Command::cargo_bin("influxdb_iox")
@@ -341,10 +395,19 @@ async fn delete_database() {
.arg(addr)
.assert()
.success()
.stdout(
predicate::str::contains(format!("0 {}", db))
.and(predicate::str::contains(format!("1 {}", db))),
);
.stdout(deleted_db_match(db, 0).and(deleted_db_match(db, 1)));
// Listing detailed database info includes both deleted generations
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--detailed")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(deleted_db_match(db, 0).and(deleted_db_match(db, 1)));
// Restore generation 0
Command::cargo_bin("influxdb_iox")
@@ -383,10 +446,19 @@ async fn delete_database() {
.arg(addr)
.assert()
.success()
.stdout(
predicate::str::contains(format!("1 {}", db))
.and(predicate::str::contains(format!("0 {}", db)).not()),
);
.stdout(deleted_db_match(db, 0).not().and(deleted_db_match(db, 1)));
// Listing detailed database info includes both active and deleted
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("database")
.arg("list")
.arg("--detailed")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(active_db_match(db, 0).and(deleted_db_match(db, 1)));
}
#[tokio::test]