feat: return DatabaseRules for ListDatabases request (#2431)
parent
07b1722412
commit
2f49e47a23
|
@ -15,6 +15,9 @@ service ManagementService {
|
|||
|
||||
rpc SetServingReadiness(SetServingReadinessRequest) returns (SetServingReadinessResponse);
|
||||
|
||||
// List all databases on this server.
|
||||
//
|
||||
// Roughly follows the <https://google.aip.dev/132> pattern, except we wrap the response
|
||||
rpc ListDatabases(ListDatabasesRequest) returns (ListDatabasesResponse);
|
||||
|
||||
// Return a specific database by name
|
||||
|
@ -110,10 +113,18 @@ message SetServingReadinessRequest {
|
|||
|
||||
message SetServingReadinessResponse {}
|
||||
|
||||
message ListDatabasesRequest {}
|
||||
message ListDatabasesRequest {
|
||||
// If true, returns only explciitly defined values. See additional
|
||||
// details on `GetDatabaseRequest`.
|
||||
bool omit_defaults = 1;
|
||||
}
|
||||
|
||||
message ListDatabasesResponse {
|
||||
repeated string names = 1;
|
||||
// old version of this API returned names only.
|
||||
// repeated string names = 1;
|
||||
|
||||
/// database rules (configuration) for each database
|
||||
repeated DatabaseRules rules = 2;
|
||||
}
|
||||
|
||||
message GetDatabaseRequest {
|
||||
|
|
|
@ -501,16 +501,38 @@ impl Client {
|
|||
}
|
||||
|
||||
/// List databases.
|
||||
pub async fn list_databases(&mut self) -> Result<Vec<String>, ListDatabaseError> {
|
||||
///
|
||||
/// See [`Self::get_database`] for the semanitcs of `omit_defaults`
|
||||
pub async fn list_databases(
|
||||
&mut self,
|
||||
omit_defaults: bool,
|
||||
) -> Result<Vec<DatabaseRules>, ListDatabaseError> {
|
||||
let response = self
|
||||
.inner
|
||||
.list_databases(ListDatabasesRequest {})
|
||||
.list_databases(ListDatabasesRequest { omit_defaults })
|
||||
.await
|
||||
.map_err(|status| match status.code() {
|
||||
tonic::Code::Unavailable => ListDatabaseError::Unavailable(status),
|
||||
_ => ListDatabaseError::ServerError(status),
|
||||
})?;
|
||||
Ok(response.into_inner().names)
|
||||
|
||||
Ok(response.into_inner().rules)
|
||||
}
|
||||
|
||||
/// List databases names
|
||||
pub async fn list_database_names(&mut self) -> Result<Vec<String>, ListDatabaseError> {
|
||||
// doesn't really matter as the name is present in all forms
|
||||
// of the config. Pick true to minimize bandwidth.
|
||||
let omit_defaults = true;
|
||||
|
||||
let databases = self.list_databases(omit_defaults).await?;
|
||||
|
||||
let names = databases
|
||||
.iter()
|
||||
.map(|rules| rules.name.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(names)
|
||||
}
|
||||
|
||||
/// Get database configuration
|
||||
|
|
|
@ -226,9 +226,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync {
|
|||
/// The type of error this DataBase store generates
|
||||
type Error: std::error::Error + Send + Sync + 'static;
|
||||
|
||||
/// List the database names.
|
||||
fn db_names_sorted(&self) -> Vec<String>;
|
||||
|
||||
/// Retrieve the database specified by `name` returning None if no
|
||||
/// such database exists
|
||||
fn db(&self, name: &str) -> Option<Arc<Self::Database>>;
|
||||
|
@ -635,11 +632,21 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Returns a list of `Database` for this `Server` in no particular order
|
||||
/// Returns a list of `Database` for this `Server` sorted by name
|
||||
pub fn databases(&self) -> Result<Vec<Arc<Database>>> {
|
||||
let state = self.shared.state.read();
|
||||
let initialized = state.initialized()?;
|
||||
Ok(initialized.databases.values().cloned().collect())
|
||||
let mut databases: Vec<_> = initialized.databases.iter().collect();
|
||||
|
||||
// ensure the databases come back sorted by name
|
||||
databases.sort_by_key(|(name, _db)| (*name).clone());
|
||||
|
||||
let databases = databases
|
||||
.into_iter()
|
||||
.map(|(_name, db)| Arc::clone(db))
|
||||
.collect();
|
||||
|
||||
Ok(databases)
|
||||
}
|
||||
|
||||
/// Get the `Database` by name
|
||||
|
@ -1173,24 +1180,6 @@ where
|
|||
type Database = Db;
|
||||
type Error = Error;
|
||||
|
||||
fn db_names_sorted(&self) -> Vec<String> {
|
||||
self.shared
|
||||
.state
|
||||
.read()
|
||||
.initialized()
|
||||
.map(|initialized| {
|
||||
let mut keys: Vec<_> = initialized
|
||||
.databases
|
||||
.keys()
|
||||
.map(ToString::to_string)
|
||||
.collect();
|
||||
|
||||
keys.sort_unstable();
|
||||
keys
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
|
||||
DatabaseName::new(name)
|
||||
.ok()
|
||||
|
@ -1216,6 +1205,32 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl<M> Server<M>
|
||||
where
|
||||
M: ConnectionManager + Send + Sync,
|
||||
{
|
||||
/// For tests: list of database names in this server, regardless
|
||||
/// of their initialization state
|
||||
fn db_names_sorted(&self) -> Vec<String> {
|
||||
self.shared
|
||||
.state
|
||||
.read()
|
||||
.initialized()
|
||||
.map(|initialized| {
|
||||
let mut keys: Vec<_> = initialized
|
||||
.databases
|
||||
.keys()
|
||||
.map(ToString::to_string)
|
||||
.collect();
|
||||
|
||||
keys.sort_unstable();
|
||||
keys
|
||||
})
|
||||
.unwrap_or_default()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
@ -216,8 +216,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
|
|||
}
|
||||
Command::List(_) => {
|
||||
let mut client = management::Client::new(connection);
|
||||
let databases = client.list_databases().await?;
|
||||
println!("{}", databases.join("\n"))
|
||||
let names = client.list_database_names().await?;
|
||||
println!("{}", names.join("\n"))
|
||||
}
|
||||
Command::Get(get) => {
|
||||
let Get {
|
||||
|
|
|
@ -108,7 +108,7 @@ async fn load_remote_system_tables(
|
|||
let mut management_client = influxdb_iox_client::management::Client::new(connection.clone());
|
||||
|
||||
let db_names = management_client
|
||||
.list_databases()
|
||||
.list_database_names()
|
||||
.await
|
||||
.context(LoadingDatabaseNames)?;
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ impl RemoteState {
|
|||
management_client: &mut influxdb_iox_client::management::Client,
|
||||
) -> Result<Self> {
|
||||
let db_names = management_client
|
||||
.list_databases()
|
||||
.list_database_names()
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(LoadingRemoteState)?;
|
||||
|
|
|
@ -601,7 +601,7 @@ mod tests {
|
|||
|
||||
let mut client = influxdb_iox_client::management::Client::new(client);
|
||||
|
||||
client.list_databases().await.unwrap();
|
||||
client.list_database_names().await.unwrap();
|
||||
|
||||
assert_eq!(trace_collector.spans().len(), 0);
|
||||
|
||||
|
@ -624,12 +624,12 @@ mod tests {
|
|||
|
||||
let mut b3_tracing_client = influxdb_iox_client::management::Client::new(b3_tracing_client);
|
||||
|
||||
b3_tracing_client.list_databases().await.unwrap();
|
||||
b3_tracing_client.list_database_names().await.unwrap();
|
||||
b3_tracing_client.get_server_status().await.unwrap();
|
||||
|
||||
let conn = jaeger_client(addr, "34f9495:30e34:0:1").await;
|
||||
influxdb_iox_client::management::Client::new(conn)
|
||||
.list_databases()
|
||||
.list_database_names()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
@ -747,7 +747,7 @@ mod tests {
|
|||
let (addr, server, join) = tracing_server(&collector).await;
|
||||
let conn = jaeger_client(addr, "34f8495:30e34:0:1").await;
|
||||
influxdb_iox_client::management::Client::new(conn)
|
||||
.list_databases()
|
||||
.list_database_names()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -8,7 +8,7 @@ use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt,
|
|||
use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *};
|
||||
use query::QueryDatabase;
|
||||
use server::rules::ProvidedDatabaseRules;
|
||||
use server::{ApplicationState, ConnectionManager, DatabaseStore, Error, Server};
|
||||
use server::{ApplicationState, ConnectionManager, Error, Server};
|
||||
use tonic::{Request, Response, Status};
|
||||
|
||||
struct ManagementService<M: ConnectionManager> {
|
||||
|
@ -59,10 +59,20 @@ where
|
|||
|
||||
async fn list_databases(
|
||||
&self,
|
||||
_: Request<ListDatabasesRequest>,
|
||||
request: Request<ListDatabasesRequest>,
|
||||
) -> Result<Response<ListDatabasesResponse>, Status> {
|
||||
let names = self.server.db_names_sorted();
|
||||
Ok(Response::new(ListDatabasesResponse { names }))
|
||||
let ListDatabasesRequest { omit_defaults } = request.into_inner();
|
||||
|
||||
let rules = self
|
||||
.server
|
||||
.databases()
|
||||
.map_err(default_server_error_handler)?
|
||||
.into_iter()
|
||||
.filter_map(|db| db.provided_rules())
|
||||
.map(|rules| format_rules(rules, omit_defaults))
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
Ok(Response::new(ListDatabasesResponse { rules }))
|
||||
}
|
||||
|
||||
async fn get_database(
|
||||
|
@ -80,25 +90,17 @@ where
|
|||
.database(&name)
|
||||
.map_err(default_server_error_handler)?;
|
||||
|
||||
match database.provided_rules() {
|
||||
Some(provided_rules) => {
|
||||
let rules: DatabaseRules = if omit_defaults {
|
||||
// return rules as originally provided by the user
|
||||
provided_rules.original().clone()
|
||||
} else {
|
||||
// return the active rules (which have all default values filled in)
|
||||
provided_rules.rules().as_ref().clone().into()
|
||||
};
|
||||
|
||||
Ok(Response::new(GetDatabaseResponse { rules: Some(rules) }))
|
||||
}
|
||||
None => {
|
||||
return Err(tonic::Status::unavailable(format!(
|
||||
let rules = database
|
||||
.provided_rules()
|
||||
.map(|rules| format_rules(rules, omit_defaults))
|
||||
.ok_or_else(|| {
|
||||
tonic::Status::unavailable(format!(
|
||||
"Rules have not yet been loaded for database ({})",
|
||||
name
|
||||
)))
|
||||
}
|
||||
}
|
||||
))
|
||||
})?;
|
||||
|
||||
Ok(Response::new(GetDatabaseResponse { rules: Some(rules) }))
|
||||
}
|
||||
|
||||
async fn create_database(
|
||||
|
@ -559,6 +561,19 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// returns [`DatabaseRules`] formated accordingo the omit_defaults
|
||||
/// flag. If omit_defaults is true, returns the stored config,
|
||||
/// otherwise returns the actual configu
|
||||
fn format_rules(provided_rules: Arc<ProvidedDatabaseRules>, omit_defaults: bool) -> DatabaseRules {
|
||||
if omit_defaults {
|
||||
// return rules as originally provided by the user
|
||||
provided_rules.original().clone()
|
||||
} else {
|
||||
// return the active rules (which have all default values filled in)
|
||||
provided_rules.rules().as_ref().clone().into()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn make_server<M>(
|
||||
application: Arc<ApplicationState>,
|
||||
server: Arc<Server<M>>,
|
||||
|
|
|
@ -2495,13 +2495,6 @@ mod tests {
|
|||
type Database = TestDatabase;
|
||||
type Error = TestError;
|
||||
|
||||
/// List the database names.
|
||||
fn db_names_sorted(&self) -> Vec<String> {
|
||||
let databases = self.databases.lock();
|
||||
|
||||
databases.keys().cloned().collect()
|
||||
}
|
||||
|
||||
/// Retrieve the database specified name
|
||||
fn db(&self, name: &str) -> Option<Arc<Self::Database>> {
|
||||
let databases = self.databases.lock();
|
||||
|
|
|
@ -189,20 +189,72 @@ async fn test_list_databases() {
|
|||
let server_fixture = ServerFixture::create_shared().await;
|
||||
let mut client = server_fixture.management_client();
|
||||
|
||||
let name = rand_name();
|
||||
let name1 = rand_name();
|
||||
let rules1 = DatabaseRules {
|
||||
name: name1.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
client
|
||||
.create_database(DatabaseRules {
|
||||
name: name.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.create_database(rules1)
|
||||
.await
|
||||
.expect("create database failed");
|
||||
|
||||
let names = client
|
||||
.list_databases()
|
||||
let name2 = rand_name();
|
||||
// Only set the worker cleanup rules.
|
||||
let rules2 = DatabaseRules {
|
||||
name: name2.clone(),
|
||||
worker_cleanup_avg_sleep: Some(Duration {
|
||||
seconds: 2,
|
||||
nanos: 0,
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
client
|
||||
.create_database(rules2)
|
||||
.await
|
||||
.expect("list databases failed");
|
||||
assert!(names.contains(&name));
|
||||
.expect("create database failed");
|
||||
|
||||
// By default, should get both databases names back
|
||||
let omit_defaults = false;
|
||||
let databases: Vec<_> = client
|
||||
.list_databases(omit_defaults)
|
||||
.await
|
||||
.expect("list databases failed")
|
||||
.into_iter()
|
||||
// names may contain the names of other databases created by
|
||||
// concurrent tests as well
|
||||
.filter(|rules| rules.name == name1 || rules.name == name2)
|
||||
.collect();
|
||||
|
||||
let names: Vec<_> = databases.iter().map(|rules| rules.name.clone()).collect();
|
||||
|
||||
assert!(dbg!(&names).contains(&name1));
|
||||
assert!(dbg!(&names).contains(&name2));
|
||||
|
||||
// validate that both rules have the defaults filled in
|
||||
for rules in &databases {
|
||||
assert!(rules.lifecycle_rules.is_some());
|
||||
}
|
||||
|
||||
// now fetch without defaults, and neither should have their rules filled in
|
||||
let omit_defaults = true;
|
||||
let databases: Vec<_> = client
|
||||
.list_databases(omit_defaults)
|
||||
.await
|
||||
.expect("list databases failed")
|
||||
.into_iter()
|
||||
// names may contain the names of other databases created by
|
||||
// concurrent tests as well
|
||||
.filter(|rules| rules.name == name1 || rules.name == name2)
|
||||
.collect();
|
||||
|
||||
let names: Vec<_> = databases.iter().map(|rules| rules.name.clone()).collect();
|
||||
assert!(dbg!(&names).contains(&name1));
|
||||
assert!(dbg!(&names).contains(&name2));
|
||||
|
||||
for rules in &databases {
|
||||
assert!(rules.lifecycle_rules.is_none());
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
Loading…
Reference in New Issue