feat: Add database initialization state and errors to CLI and remove list_databases_detailed gRPC (#3377)
* feat: Add database initialization state and errors to CLI: * fix: do not use optional in protobuf * fix: clippy * fix: correct check I broke appeasing clippypull/24376/head
parent
7fe6897c59
commit
758b65dd29
|
@ -1,11 +0,0 @@
|
||||||
use crate::DatabaseName;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
/// Detailed metadata about an active database.
|
|
||||||
#[derive(Debug, Clone, PartialEq)]
|
|
||||||
pub struct ActiveDatabase {
|
|
||||||
/// The name of the database
|
|
||||||
pub name: DatabaseName<'static>,
|
|
||||||
/// The UUID of the database
|
|
||||||
pub uuid: Uuid,
|
|
||||||
}
|
|
|
@ -15,7 +15,6 @@ pub mod consistent_hasher;
|
||||||
mod database_name;
|
mod database_name;
|
||||||
pub mod database_rules;
|
pub mod database_rules;
|
||||||
pub mod delete_predicate;
|
pub mod delete_predicate;
|
||||||
pub mod detailed_database;
|
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod job;
|
pub mod job;
|
||||||
pub mod names;
|
pub mod names;
|
||||||
|
|
|
@ -34,9 +34,6 @@ service ManagementService {
|
||||||
// Claim a released database.
|
// Claim a released database.
|
||||||
rpc ClaimDatabase(ClaimDatabaseRequest) returns (ClaimDatabaseResponse);
|
rpc ClaimDatabase(ClaimDatabaseRequest) returns (ClaimDatabaseResponse);
|
||||||
|
|
||||||
// List databases with their metadata.
|
|
||||||
rpc ListDetailedDatabases(ListDetailedDatabasesRequest) returns (ListDetailedDatabasesResponse);
|
|
||||||
|
|
||||||
// List chunks available on this database
|
// List chunks available on this database
|
||||||
rpc ListChunks(ListChunksRequest) returns (ListChunksResponse);
|
rpc ListChunks(ListChunksRequest) returns (ListChunksResponse);
|
||||||
|
|
||||||
|
@ -179,29 +176,6 @@ message ClaimDatabaseResponse {
|
||||||
string db_name = 1;
|
string db_name = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
message ListDetailedDatabasesRequest {}
|
|
||||||
|
|
||||||
message ListDetailedDatabasesResponse {
|
|
||||||
repeated DetailedDatabase databases = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// This resource represents detailed information about a database.
|
|
||||||
message DetailedDatabase {
|
|
||||||
// Was the generation ID of the database.
|
|
||||||
reserved 1;
|
|
||||||
reserved "generation_id";
|
|
||||||
|
|
||||||
// Was the datetime at which this database was deleted, if applicable.
|
|
||||||
reserved 2;
|
|
||||||
reserved "deleted_at";
|
|
||||||
|
|
||||||
// The name of the database.
|
|
||||||
string db_name = 3;
|
|
||||||
|
|
||||||
// The UUID of the database.
|
|
||||||
bytes uuid = 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
message ListChunksRequest {
|
message ListChunksRequest {
|
||||||
// the name of the database
|
// the name of the database
|
||||||
string db_name = 1;
|
string db_name = 1;
|
||||||
|
@ -414,6 +388,9 @@ message DatabaseStatus {
|
||||||
|
|
||||||
// Current initialization state of the database.
|
// Current initialization state of the database.
|
||||||
DatabaseState state = 3;
|
DatabaseState state = 3;
|
||||||
|
|
||||||
|
// The UUID of the database, if known, empty otherwise
|
||||||
|
bytes uuid = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message Error {
|
message Error {
|
||||||
|
|
|
@ -1,13 +0,0 @@
|
||||||
use crate::influxdata::iox::management::v1 as management;
|
|
||||||
use data_types::detailed_database::ActiveDatabase;
|
|
||||||
|
|
||||||
impl From<ActiveDatabase> for management::DetailedDatabase {
|
|
||||||
fn from(database: ActiveDatabase) -> Self {
|
|
||||||
let ActiveDatabase { name, uuid } = database;
|
|
||||||
|
|
||||||
Self {
|
|
||||||
db_name: name.to_string(),
|
|
||||||
uuid: uuid.as_bytes().to_vec(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -204,8 +204,6 @@ pub mod database_state;
|
||||||
#[cfg(any(feature = "data_types_conversions", test))]
|
#[cfg(any(feature = "data_types_conversions", test))]
|
||||||
pub mod delete_predicate;
|
pub mod delete_predicate;
|
||||||
#[cfg(any(feature = "data_types_conversions", test))]
|
#[cfg(any(feature = "data_types_conversions", test))]
|
||||||
pub mod detailed_database;
|
|
||||||
#[cfg(any(feature = "data_types_conversions", test))]
|
|
||||||
pub mod job;
|
pub mod job;
|
||||||
#[cfg(any(feature = "data_types_conversions", test))]
|
#[cfg(any(feature = "data_types_conversions", test))]
|
||||||
pub mod router;
|
pub mod router;
|
||||||
|
|
|
@ -6,7 +6,7 @@ use influxdb_iox_client::{
|
||||||
connection::Connection,
|
connection::Connection,
|
||||||
flight,
|
flight,
|
||||||
format::QueryOutputFormat,
|
format::QueryOutputFormat,
|
||||||
management::{self, generated_types::*},
|
management::{self, generated_types::database_status::DatabaseState, generated_types::*},
|
||||||
write,
|
write,
|
||||||
};
|
};
|
||||||
use std::{fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr, time::Duration};
|
use std::{fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr, time::Duration};
|
||||||
|
@ -254,19 +254,53 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
||||||
Command::List(list) => {
|
Command::List(list) => {
|
||||||
let mut client = management::Client::new(connection);
|
let mut client = management::Client::new(connection);
|
||||||
if list.detailed {
|
if list.detailed {
|
||||||
let databases = client.list_detailed_databases().await?;
|
let ServerStatus {
|
||||||
|
initialized,
|
||||||
|
error,
|
||||||
|
database_statuses,
|
||||||
|
} = client.get_server_status().await?;
|
||||||
|
if !initialized {
|
||||||
|
eprintln!("Can not list databases. Server is not yet initialized");
|
||||||
|
if let Some(err) = error {
|
||||||
|
println!("WARNING: Server is in error state: {}", err.message);
|
||||||
|
}
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
if !databases.is_empty() {
|
if !database_statuses.is_empty() {
|
||||||
let mut table = Table::new();
|
let mut table = Table::new();
|
||||||
table.load_preset(TABLE_STYLE_SINGLE_LINE_BORDERS);
|
table.load_preset(TABLE_STYLE_SINGLE_LINE_BORDERS);
|
||||||
table.set_header(vec![Cell::new("Name"), Cell::new("UUID")]);
|
table.set_header(vec![
|
||||||
|
Cell::new("Name"),
|
||||||
|
Cell::new("UUID"),
|
||||||
|
Cell::new("State"),
|
||||||
|
Cell::new("Error"),
|
||||||
|
]);
|
||||||
|
|
||||||
for database in databases {
|
for database in database_statuses {
|
||||||
let uuid = Uuid::from_slice(&database.uuid)
|
let uuid = if !database.uuid.is_empty() {
|
||||||
.map(|u| u.to_string())
|
Uuid::from_slice(&database.uuid)
|
||||||
.unwrap_or_else(|_| String::from("<UUID parsing failed>"));
|
.map(|uuid| uuid.to_string())
|
||||||
|
.unwrap_or_else(|_| String::from("<UUID parsing failed>"))
|
||||||
|
} else {
|
||||||
|
String::from("<UUID not yet known>")
|
||||||
|
};
|
||||||
|
|
||||||
table.add_row(vec![Cell::new(&database.db_name), Cell::new(&uuid)]);
|
let state = DatabaseState::from_i32(database.state)
|
||||||
|
.map(|state| state.description())
|
||||||
|
.unwrap_or("UNKNOWN STATE");
|
||||||
|
|
||||||
|
let error = database
|
||||||
|
.error
|
||||||
|
.map(|e| e.message)
|
||||||
|
.unwrap_or_else(|| String::from("<none>"));
|
||||||
|
|
||||||
|
table.add_row(vec![
|
||||||
|
Cell::new(&database.db_name),
|
||||||
|
Cell::new(&uuid),
|
||||||
|
Cell::new(&state),
|
||||||
|
Cell::new(&error),
|
||||||
|
]);
|
||||||
}
|
}
|
||||||
|
|
||||||
print!("{}", table);
|
print!("{}", table);
|
||||||
|
|
|
@ -159,22 +159,6 @@ impl management_service_server::ManagementService for ManagementService {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn list_detailed_databases(
|
|
||||||
&self,
|
|
||||||
_: Request<ListDetailedDatabasesRequest>,
|
|
||||||
) -> Result<Response<ListDetailedDatabasesResponse>, Status> {
|
|
||||||
let databases = self
|
|
||||||
.server
|
|
||||||
.list_detailed_databases()
|
|
||||||
.await
|
|
||||||
.map_err(default_server_error_handler)?
|
|
||||||
.into_iter()
|
|
||||||
.map(Into::into)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
Ok(Response::new(ListDetailedDatabasesResponse { databases }))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn list_chunks(
|
async fn list_chunks(
|
||||||
&self,
|
&self,
|
||||||
request: Request<ListChunksRequest>,
|
request: Request<ListChunksRequest>,
|
||||||
|
@ -404,12 +388,16 @@ impl management_service_server::ManagementService for ManagementService {
|
||||||
message: e.to_string(),
|
message: e.to_string(),
|
||||||
}),
|
}),
|
||||||
state: database.state_code().into(),
|
state: database.state_code().into(),
|
||||||
|
uuid: database
|
||||||
|
.uuid()
|
||||||
|
.map(|uuid| uuid.as_bytes().to_vec())
|
||||||
|
.unwrap_or_default(),
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
})
|
})
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
// Sort output by database name
|
// Sort output by database name to ensure a nice output order
|
||||||
database_statuses.sort_unstable_by(|a, b| a.db_name.cmp(&b.db_name));
|
database_statuses.sort_unstable_by(|a, b| a.db_name.cmp(&b.db_name));
|
||||||
|
|
||||||
Ok(Response::new(GetServerStatusResponse {
|
Ok(Response::new(GetServerStatusResponse {
|
||||||
|
|
|
@ -274,17 +274,7 @@ async fn test_create_get_update_release_claim_database() {
|
||||||
42
|
42
|
||||||
);
|
);
|
||||||
|
|
||||||
let databases: Vec<_> = client
|
assert_eq!(get_uuid(&mut client, &db_name).await, Some(created_uuid));
|
||||||
.list_detailed_databases()
|
|
||||||
.await
|
|
||||||
.expect("list detailed databases failed")
|
|
||||||
.into_iter()
|
|
||||||
// names may contain the names of other databases created by
|
|
||||||
// concurrent tests as well
|
|
||||||
.filter(|db| db.db_name == db_name)
|
|
||||||
.collect();
|
|
||||||
assert_eq!(databases.len(), 1);
|
|
||||||
assert_eq!(Uuid::from_slice(&databases[0].uuid).unwrap(), created_uuid);
|
|
||||||
|
|
||||||
let released_uuid = client.release_database(&db_name, None).await.unwrap();
|
let released_uuid = client.release_database(&db_name, None).await.unwrap();
|
||||||
assert_eq!(created_uuid, released_uuid);
|
assert_eq!(created_uuid, released_uuid);
|
||||||
|
@ -346,6 +336,35 @@ async fn test_create_get_update_release_claim_database() {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// queries the server and returns the uuid, if any, for the specified database
|
||||||
|
async fn get_uuid(client: &mut Client, db_name: &str) -> Option<Uuid> {
|
||||||
|
let databases: Vec<_> = client
|
||||||
|
.get_server_status()
|
||||||
|
.await
|
||||||
|
.expect("get_server_status failed")
|
||||||
|
.database_statuses
|
||||||
|
.into_iter()
|
||||||
|
// names may contain the names of other databases created by
|
||||||
|
// concurrent tests as well
|
||||||
|
.filter(|db| db.db_name == db_name)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
databases.len() <= 1,
|
||||||
|
"found more than one entry for {}: {:?}",
|
||||||
|
db_name,
|
||||||
|
databases
|
||||||
|
);
|
||||||
|
|
||||||
|
databases.into_iter().next().and_then(|db| {
|
||||||
|
if !db.uuid.is_empty() {
|
||||||
|
Some(Uuid::from_slice(&db.uuid).unwrap())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn release_database() {
|
async fn release_database() {
|
||||||
test_helpers::maybe_start_logging();
|
test_helpers::maybe_start_logging();
|
||||||
|
@ -366,14 +385,7 @@ async fn release_database() {
|
||||||
assert_eq!(created_uuid, released_uuid);
|
assert_eq!(created_uuid, released_uuid);
|
||||||
|
|
||||||
// Released database is no longer in this server's database list
|
// Released database is no longer in this server's database list
|
||||||
assert!(!client
|
assert_eq!(get_uuid(&mut client, &db_name).await, None);
|
||||||
.list_detailed_databases()
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.into_iter()
|
|
||||||
// names may contain the names of other databases created by
|
|
||||||
// concurrent tests as well
|
|
||||||
.any(|db| db.db_name == db_name));
|
|
||||||
|
|
||||||
// Releasing the same database again is an error
|
// Releasing the same database again is an error
|
||||||
let err = client.release_database(&db_name, None).await.unwrap_err();
|
let err = client.release_database(&db_name, None).await.unwrap_err();
|
||||||
|
@ -429,18 +441,7 @@ async fn claim_database() {
|
||||||
client.claim_database(deleted_uuid, false).await.unwrap();
|
client.claim_database(deleted_uuid, false).await.unwrap();
|
||||||
|
|
||||||
// Claimed database is back in this server's database list
|
// Claimed database is back in this server's database list
|
||||||
assert_eq!(
|
assert_eq!(get_uuid(&mut client, &db_name).await, Some(deleted_uuid));
|
||||||
client
|
|
||||||
.list_detailed_databases()
|
|
||||||
.await
|
|
||||||
.unwrap()
|
|
||||||
.into_iter()
|
|
||||||
// names may contain the names of other databases created by
|
|
||||||
// concurrent tests as well
|
|
||||||
.filter(|db| db.db_name == db_name)
|
|
||||||
.count(),
|
|
||||||
1
|
|
||||||
);
|
|
||||||
|
|
||||||
// Claiming the same database again is an error
|
// Claiming the same database again is an error
|
||||||
let err = client
|
let err = client
|
||||||
|
|
|
@ -15,7 +15,7 @@ use generated_types::{
|
||||||
use predicates::prelude::*;
|
use predicates::prelude::*;
|
||||||
use std::{path::PathBuf, sync::Arc, time::Duration};
|
use std::{path::PathBuf, sync::Arc, time::Duration};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use test_helpers::make_temp_file;
|
use test_helpers::{assert_contains, make_temp_file};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -751,6 +751,50 @@ async fn force_claim_database() {
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn list_database_detailed() {
|
||||||
|
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
|
||||||
|
let addr = server_fixture.grpc_base();
|
||||||
|
let db_name = rand_name();
|
||||||
|
let db = &db_name;
|
||||||
|
let uuid = create_readable_database(&db_name, server_fixture.grpc_channel()).await;
|
||||||
|
|
||||||
|
// Listing the databases includes the db name, and status
|
||||||
|
let output = String::from_utf8(
|
||||||
|
Command::cargo_bin("influxdb_iox")
|
||||||
|
.unwrap()
|
||||||
|
.arg("database")
|
||||||
|
.arg("list")
|
||||||
|
.arg("--detailed")
|
||||||
|
.arg("--host")
|
||||||
|
.arg(addr)
|
||||||
|
.assert()
|
||||||
|
.success()
|
||||||
|
.get_output()
|
||||||
|
.stdout
|
||||||
|
.clone(),
|
||||||
|
)
|
||||||
|
.expect("non utf8 in output");
|
||||||
|
|
||||||
|
// Output looks like:
|
||||||
|
// +------------+--------------------------------------+-------------+--------+
|
||||||
|
// | Name | UUID | State | Error |
|
||||||
|
// +------------+--------------------------------------+-------------+--------+
|
||||||
|
// | ie9HrfSBQB | 299b541d-e3fb-47ef-bdd4-98f94ad1f1b3 | Initialized | <none> |
|
||||||
|
// +------------+--------------------------------------+-------------+--------+
|
||||||
|
|
||||||
|
println!("looking for {} in", db);
|
||||||
|
println!("{}", output);
|
||||||
|
let line = output
|
||||||
|
.split('\n')
|
||||||
|
.find(|line| line.contains(db))
|
||||||
|
.expect("can't find db name");
|
||||||
|
|
||||||
|
assert_contains!(line, uuid.to_string());
|
||||||
|
assert_contains!(line, "Initialized"); // state
|
||||||
|
assert_contains!(line, "<none>"); // error
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_get_chunks() {
|
async fn test_get_chunks() {
|
||||||
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
|
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
|
||||||
|
|
|
@ -131,15 +131,6 @@ impl Client {
|
||||||
Ok(names)
|
Ok(names)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// List databases and detailed metadata
|
|
||||||
pub async fn list_detailed_databases(&mut self) -> Result<Vec<DetailedDatabase>, Error> {
|
|
||||||
let response = self
|
|
||||||
.inner
|
|
||||||
.list_detailed_databases(ListDetailedDatabasesRequest {})
|
|
||||||
.await?;
|
|
||||||
Ok(response.into_inner().databases)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get database configuration
|
/// Get database configuration
|
||||||
///
|
///
|
||||||
/// If `omit_defaults` is false, return the current configuration
|
/// If `omit_defaults` is false, return the current configuration
|
||||||
|
|
|
@ -72,7 +72,6 @@ use ::lifecycle::{LockableChunk, LockablePartition};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{
|
use data_types::{
|
||||||
chunk_metadata::ChunkId,
|
chunk_metadata::ChunkId,
|
||||||
detailed_database::ActiveDatabase,
|
|
||||||
error::ErrorLogger,
|
error::ErrorLogger,
|
||||||
job::Job,
|
job::Job,
|
||||||
server_id::ServerId,
|
server_id::ServerId,
|
||||||
|
@ -788,20 +787,6 @@ impl Server {
|
||||||
Ok(db_name)
|
Ok(db_name)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// List active databases owned by this server, including their UUIDs.
|
|
||||||
pub async fn list_detailed_databases(&self) -> Result<Vec<ActiveDatabase>> {
|
|
||||||
Ok(self
|
|
||||||
.databases()?
|
|
||||||
.iter()
|
|
||||||
.filter_map(|db| {
|
|
||||||
db.uuid().map(|uuid| ActiveDatabase {
|
|
||||||
name: db.config().name.clone(),
|
|
||||||
uuid,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.collect())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write this server's databases out to the server config in object storage.
|
/// Write this server's databases out to the server config in object storage.
|
||||||
async fn persist_server_config(&self) -> Result<()> {
|
async fn persist_server_config(&self) -> Result<()> {
|
||||||
let (server_id, bytes) = {
|
let (server_id, bytes) = {
|
||||||
|
|
Loading…
Reference in New Issue