From 2f49e47a2362f6b30774f278a88b09eabe24aaab Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 28 Aug 2021 06:53:24 -0400 Subject: [PATCH] feat: return DatabaseRules for ListDatabases request (#2431) --- .../iox/management/v1/service.proto | 15 +++- influxdb_iox_client/src/client/management.rs | 28 +++++++- server/src/lib.rs | 61 ++++++++++------ src/commands/database.rs | 4 +- src/commands/sql/observer.rs | 2 +- src/commands/sql/repl.rs | 2 +- src/influxdb_ioxd.rs | 8 +-- src/influxdb_ioxd/rpc/management.rs | 57 +++++++++------ src/influxdb_ioxd/rpc/storage/service.rs | 7 -- tests/end_to_end_cases/management_api.rs | 70 ++++++++++++++++--- 10 files changed, 181 insertions(+), 73 deletions(-) diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index ce7499fbcd..b2237dc869 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -15,6 +15,9 @@ service ManagementService { rpc SetServingReadiness(SetServingReadinessRequest) returns (SetServingReadinessResponse); + // List all databases on this server. + // + // Roughly follows the pattern, except we wrap the response rpc ListDatabases(ListDatabasesRequest) returns (ListDatabasesResponse); // Return a specific database by name @@ -110,10 +113,18 @@ message SetServingReadinessRequest { message SetServingReadinessResponse {} -message ListDatabasesRequest {} +message ListDatabasesRequest { + // If true, returns only explciitly defined values. See additional + // details on `GetDatabaseRequest`. + bool omit_defaults = 1; +} message ListDatabasesResponse { - repeated string names = 1; + // old version of this API returned names only. + // repeated string names = 1; + + /// database rules (configuration) for each database + repeated DatabaseRules rules = 2; } message GetDatabaseRequest { diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index b0c358dd7c..05e7aaa392 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -501,16 +501,38 @@ impl Client { } /// List databases. - pub async fn list_databases(&mut self) -> Result, ListDatabaseError> { + /// + /// See [`Self::get_database`] for the semanitcs of `omit_defaults` + pub async fn list_databases( + &mut self, + omit_defaults: bool, + ) -> Result, ListDatabaseError> { let response = self .inner - .list_databases(ListDatabasesRequest {}) + .list_databases(ListDatabasesRequest { omit_defaults }) .await .map_err(|status| match status.code() { tonic::Code::Unavailable => ListDatabaseError::Unavailable(status), _ => ListDatabaseError::ServerError(status), })?; - Ok(response.into_inner().names) + + Ok(response.into_inner().rules) + } + + /// List databases names + pub async fn list_database_names(&mut self) -> Result, ListDatabaseError> { + // doesn't really matter as the name is present in all forms + // of the config. Pick true to minimize bandwidth. + let omit_defaults = true; + + let databases = self.list_databases(omit_defaults).await?; + + let names = databases + .iter() + .map(|rules| rules.name.to_string()) + .collect::>(); + + Ok(names) } /// Get database configuration diff --git a/server/src/lib.rs b/server/src/lib.rs index 5b1a48b676..f88eabc3df 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -226,9 +226,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync { /// The type of error this DataBase store generates type Error: std::error::Error + Send + Sync + 'static; - /// List the database names. - fn db_names_sorted(&self) -> Vec; - /// Retrieve the database specified by `name` returning None if no /// such database exists fn db(&self, name: &str) -> Option>; @@ -635,11 +632,21 @@ where } } - /// Returns a list of `Database` for this `Server` in no particular order + /// Returns a list of `Database` for this `Server` sorted by name pub fn databases(&self) -> Result>> { let state = self.shared.state.read(); let initialized = state.initialized()?; - Ok(initialized.databases.values().cloned().collect()) + let mut databases: Vec<_> = initialized.databases.iter().collect(); + + // ensure the databases come back sorted by name + databases.sort_by_key(|(name, _db)| (*name).clone()); + + let databases = databases + .into_iter() + .map(|(_name, db)| Arc::clone(db)) + .collect(); + + Ok(databases) } /// Get the `Database` by name @@ -1173,24 +1180,6 @@ where type Database = Db; type Error = Error; - fn db_names_sorted(&self) -> Vec { - self.shared - .state - .read() - .initialized() - .map(|initialized| { - let mut keys: Vec<_> = initialized - .databases - .keys() - .map(ToString::to_string) - .collect(); - - keys.sort_unstable(); - keys - }) - .unwrap_or_default() - } - fn db(&self, name: &str) -> Option> { DatabaseName::new(name) .ok() @@ -1216,6 +1205,32 @@ where } } +#[cfg(test)] +impl Server +where + M: ConnectionManager + Send + Sync, +{ + /// For tests: list of database names in this server, regardless + /// of their initialization state + fn db_names_sorted(&self) -> Vec { + self.shared + .state + .read() + .initialized() + .map(|initialized| { + let mut keys: Vec<_> = initialized + .databases + .keys() + .map(ToString::to_string) + .collect(); + + keys.sort_unstable(); + keys + }) + .unwrap_or_default() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/commands/database.rs b/src/commands/database.rs index e99f8eec42..cec9c1dbb4 100644 --- a/src/commands/database.rs +++ b/src/commands/database.rs @@ -216,8 +216,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> { } Command::List(_) => { let mut client = management::Client::new(connection); - let databases = client.list_databases().await?; - println!("{}", databases.join("\n")) + let names = client.list_database_names().await?; + println!("{}", names.join("\n")) } Command::Get(get) => { let Get { diff --git a/src/commands/sql/observer.rs b/src/commands/sql/observer.rs index 6224836a66..dcf89f7228 100644 --- a/src/commands/sql/observer.rs +++ b/src/commands/sql/observer.rs @@ -108,7 +108,7 @@ async fn load_remote_system_tables( let mut management_client = influxdb_iox_client::management::Client::new(connection.clone()); let db_names = management_client - .list_databases() + .list_database_names() .await .context(LoadingDatabaseNames)?; diff --git a/src/commands/sql/repl.rs b/src/commands/sql/repl.rs index 5b3f5d01b9..322c539ad6 100644 --- a/src/commands/sql/repl.rs +++ b/src/commands/sql/repl.rs @@ -57,7 +57,7 @@ impl RemoteState { management_client: &mut influxdb_iox_client::management::Client, ) -> Result { let db_names = management_client - .list_databases() + .list_database_names() .await .map_err(|e| Box::new(e) as _) .context(LoadingRemoteState)?; diff --git a/src/influxdb_ioxd.rs b/src/influxdb_ioxd.rs index 6d82f8cb02..dbeaea1c48 100644 --- a/src/influxdb_ioxd.rs +++ b/src/influxdb_ioxd.rs @@ -601,7 +601,7 @@ mod tests { let mut client = influxdb_iox_client::management::Client::new(client); - client.list_databases().await.unwrap(); + client.list_database_names().await.unwrap(); assert_eq!(trace_collector.spans().len(), 0); @@ -624,12 +624,12 @@ mod tests { let mut b3_tracing_client = influxdb_iox_client::management::Client::new(b3_tracing_client); - b3_tracing_client.list_databases().await.unwrap(); + b3_tracing_client.list_database_names().await.unwrap(); b3_tracing_client.get_server_status().await.unwrap(); let conn = jaeger_client(addr, "34f9495:30e34:0:1").await; influxdb_iox_client::management::Client::new(conn) - .list_databases() + .list_database_names() .await .unwrap(); @@ -747,7 +747,7 @@ mod tests { let (addr, server, join) = tracing_server(&collector).await; let conn = jaeger_client(addr, "34f8495:30e34:0:1").await; influxdb_iox_client::management::Client::new(conn) - .list_databases() + .list_database_names() .await .unwrap(); diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index ed54109304..c35ce23136 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -8,7 +8,7 @@ use generated_types::google::{AlreadyExists, FieldViolation, FieldViolationExt, use generated_types::influxdata::iox::management::v1::{Error as ProtobufError, *}; use query::QueryDatabase; use server::rules::ProvidedDatabaseRules; -use server::{ApplicationState, ConnectionManager, DatabaseStore, Error, Server}; +use server::{ApplicationState, ConnectionManager, Error, Server}; use tonic::{Request, Response, Status}; struct ManagementService { @@ -59,10 +59,20 @@ where async fn list_databases( &self, - _: Request, + request: Request, ) -> Result, Status> { - let names = self.server.db_names_sorted(); - Ok(Response::new(ListDatabasesResponse { names })) + let ListDatabasesRequest { omit_defaults } = request.into_inner(); + + let rules = self + .server + .databases() + .map_err(default_server_error_handler)? + .into_iter() + .filter_map(|db| db.provided_rules()) + .map(|rules| format_rules(rules, omit_defaults)) + .collect::>(); + + Ok(Response::new(ListDatabasesResponse { rules })) } async fn get_database( @@ -80,25 +90,17 @@ where .database(&name) .map_err(default_server_error_handler)?; - match database.provided_rules() { - Some(provided_rules) => { - let rules: DatabaseRules = if omit_defaults { - // return rules as originally provided by the user - provided_rules.original().clone() - } else { - // return the active rules (which have all default values filled in) - provided_rules.rules().as_ref().clone().into() - }; - - Ok(Response::new(GetDatabaseResponse { rules: Some(rules) })) - } - None => { - return Err(tonic::Status::unavailable(format!( + let rules = database + .provided_rules() + .map(|rules| format_rules(rules, omit_defaults)) + .ok_or_else(|| { + tonic::Status::unavailable(format!( "Rules have not yet been loaded for database ({})", name - ))) - } - } + )) + })?; + + Ok(Response::new(GetDatabaseResponse { rules: Some(rules) })) } async fn create_database( @@ -559,6 +561,19 @@ where } } +/// returns [`DatabaseRules`] formated accordingo the omit_defaults +/// flag. If omit_defaults is true, returns the stored config, +/// otherwise returns the actual configu +fn format_rules(provided_rules: Arc, omit_defaults: bool) -> DatabaseRules { + if omit_defaults { + // return rules as originally provided by the user + provided_rules.original().clone() + } else { + // return the active rules (which have all default values filled in) + provided_rules.rules().as_ref().clone().into() + } +} + pub fn make_server( application: Arc, server: Arc>, diff --git a/src/influxdb_ioxd/rpc/storage/service.rs b/src/influxdb_ioxd/rpc/storage/service.rs index ca709a8589..b5592ad18b 100644 --- a/src/influxdb_ioxd/rpc/storage/service.rs +++ b/src/influxdb_ioxd/rpc/storage/service.rs @@ -2495,13 +2495,6 @@ mod tests { type Database = TestDatabase; type Error = TestError; - /// List the database names. - fn db_names_sorted(&self) -> Vec { - let databases = self.databases.lock(); - - databases.keys().cloned().collect() - } - /// Retrieve the database specified name fn db(&self, name: &str) -> Option> { let databases = self.databases.lock(); diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 450ba63ba0..acdab2e282 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -189,20 +189,72 @@ async fn test_list_databases() { let server_fixture = ServerFixture::create_shared().await; let mut client = server_fixture.management_client(); - let name = rand_name(); + let name1 = rand_name(); + let rules1 = DatabaseRules { + name: name1.clone(), + ..Default::default() + }; client - .create_database(DatabaseRules { - name: name.clone(), - ..Default::default() - }) + .create_database(rules1) .await .expect("create database failed"); - let names = client - .list_databases() + let name2 = rand_name(); + // Only set the worker cleanup rules. + let rules2 = DatabaseRules { + name: name2.clone(), + worker_cleanup_avg_sleep: Some(Duration { + seconds: 2, + nanos: 0, + }), + ..Default::default() + }; + client + .create_database(rules2) .await - .expect("list databases failed"); - assert!(names.contains(&name)); + .expect("create database failed"); + + // By default, should get both databases names back + let omit_defaults = false; + let databases: Vec<_> = client + .list_databases(omit_defaults) + .await + .expect("list databases failed") + .into_iter() + // names may contain the names of other databases created by + // concurrent tests as well + .filter(|rules| rules.name == name1 || rules.name == name2) + .collect(); + + let names: Vec<_> = databases.iter().map(|rules| rules.name.clone()).collect(); + + assert!(dbg!(&names).contains(&name1)); + assert!(dbg!(&names).contains(&name2)); + + // validate that both rules have the defaults filled in + for rules in &databases { + assert!(rules.lifecycle_rules.is_some()); + } + + // now fetch without defaults, and neither should have their rules filled in + let omit_defaults = true; + let databases: Vec<_> = client + .list_databases(omit_defaults) + .await + .expect("list databases failed") + .into_iter() + // names may contain the names of other databases created by + // concurrent tests as well + .filter(|rules| rules.name == name1 || rules.name == name2) + .collect(); + + let names: Vec<_> = databases.iter().map(|rules| rules.name.clone()).collect(); + assert!(dbg!(&names).contains(&name1)); + assert!(dbg!(&names).contains(&name2)); + + for rules in &databases { + assert!(rules.lifecycle_rules.is_none()); + } } #[tokio::test]