From e06d65bb2ac24c19f63885ebd05086d44a5b8ac8 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 4 Jun 2021 11:33:36 +0200 Subject: [PATCH] refactor: migrate "DBs initialized" RPC to "server status" --- .../iox/management/v1/service.proto | 34 +++++++-- influxdb_iox_client/src/client/management.rs | 12 +-- server/src/lib.rs | 75 ++++++++----------- src/commands/server.rs | 16 ++-- src/influxdb_ioxd/http.rs | 10 +-- src/influxdb_ioxd/rpc/management.rs | 28 +++++-- tests/common/server_fixture.rs | 22 +++--- tests/end_to_end_cases/management_api.rs | 51 ++++++++++++- tests/end_to_end_cases/preservation.rs | 4 +- tests/end_to_end_cases/read_cli.rs | 8 +- tests/end_to_end_cases/write_api.rs | 18 ++--- 11 files changed, 177 insertions(+), 101 deletions(-) diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index 7319d1f167..4f475fb673 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -62,8 +62,8 @@ service ManagementService { // Close a chunk and move it to the read buffer rpc ClosePartitionChunk(ClosePartitionChunkRequest) returns (ClosePartitionChunkResponse); - // Checks if databases are loaded and server is ready for reads and writes. - rpc AreDatabasesLoaded(AreDatabasesLoadedRequest) returns (AreDatabasesLoadedResponse); + // Get server status + rpc GetServerStatus(GetServerStatusRequest) returns (ServerStatus); } message GetServerIdRequest {} @@ -239,9 +239,29 @@ message ClosePartitionChunkResponse { google.longrunning.Operation operation = 1; } -message AreDatabasesLoadedRequest {} +message GetServerStatusRequest {} -message AreDatabasesLoadedResponse { - // Are DBs loaded? - bool databases_loaded = 1; -} \ No newline at end of file +message ServerStatus { + // Server is initialized, i.e. databases are loaded and accept read/write operations. Furthermore database rules can + // be updaded and new databases can be created. + bool initialized = 1; + + // If present, the server reports a global error condition. + Error error = 2; + + // If `initialized` is true, this contains a complete list of databases. + repeated DatabaseStatus database_statuses = 3; +} + +message DatabaseStatus { + // The name of the database. + string db_name = 1; + + // If present, the database reports an error condition. + Error error = 2; +} + +message Error { + // Message descripting the error. + string message = 1; +} diff --git a/influxdb_iox_client/src/client/management.rs b/influxdb_iox_client/src/client/management.rs index f6dc081e3c..f817b0cabe 100644 --- a/influxdb_iox_client/src/client/management.rs +++ b/influxdb_iox_client/src/client/management.rs @@ -249,9 +249,9 @@ pub enum ClosePartitionChunkError { ServerError(tonic::Status), } -/// Errors returned by [`Client::are_databases_loaded`] +/// Errors returned by [`Client::get_server_status`] #[derive(Debug, Error)] -pub enum AreDatabasesLoadedError { +pub enum GetServerStatusError { /// Client received an unexpected error from the server #[error("Unexpected server error: {}: {}", .0.code(), .0.message())] ServerError(tonic::Status), @@ -330,14 +330,14 @@ impl Client { } /// Check if databases are loaded and ready for read and write. - pub async fn are_databases_loaded(&mut self) -> Result { + pub async fn get_server_status(&mut self) -> Result { let response = self .inner - .are_databases_loaded(AreDatabasesLoadedRequest {}) + .get_server_status(GetServerStatusRequest {}) .await - .map_err(AreDatabasesLoadedError::ServerError)?; + .map_err(GetServerStatusError::ServerError)?; - Ok(response.get_ref().databases_loaded) + Ok(response.into_inner()) } /// Set serving readiness. diff --git a/server/src/lib.rs b/server/src/lib.rs index e39205435a..e2d2fa7588 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -413,10 +413,10 @@ pub struct Server { pub registry: Arc, /// Flags that databases are loaded and server is ready to read/write data. - databases_loaded: AtomicBool, + initialized: AtomicBool, /// Semaphore that limits the number of jobs that load DBs when the serverID is set. - db_load_semaphore: Semaphore, + initialize_semaphore: Semaphore, } #[derive(Debug)] @@ -457,8 +457,8 @@ impl Server { jobs, metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))), registry: Arc::clone(&metric_registry), - databases_loaded: AtomicBool::new(false), - db_load_semaphore: Semaphore::new(1), + initialized: AtomicBool::new(false), + initialize_semaphore: Semaphore::new(1), } } @@ -475,19 +475,19 @@ impl Server { self.id.get() } - /// Check if databases are loaded and server is ready to read/write. - pub fn databases_loaded(&self) -> bool { + /// Check if server is loaded. Databases are loaded and server is ready to read/write. + pub fn initialized(&self) -> bool { // ordering here isn't that important since this method is not used to check-and-modify the flag - self.databases_loaded.load(Ordering::Relaxed) + self.initialized.load(Ordering::Relaxed) } - /// Requires that databases are loaded and server is ready to read/write. - fn require_dbs_loaded(&self) -> Result { - // since a server ID is the pre-requirement for readyness, check this first + /// Require that server is loaded. Databases are loaded and server is ready to read/write. + fn require_initialized(&self) -> Result { + // since a server ID is the pre-requirement for init, check this first let server_id = self.require_id()?; // ordering here isn't that important since this method is not used to check-and-modify the flag - if self.databases_loaded() { + if self.initialized() { Ok(server_id) } else { Err(Error::DatabasesNotLoaded) @@ -497,7 +497,7 @@ impl Server { /// Tells the server the set of rules for a database. pub async fn create_database(&self, rules: DatabaseRules) -> Result<()> { // Return an error if this server is not yet ready - let server_id = self.require_dbs_loaded()?; + let server_id = self.require_initialized()?; let preserved_catalog = load_or_create_preserved_catalog( rules.db_name(), @@ -556,16 +556,16 @@ impl Server { /// replaced. /// /// This requires the serverID to be set. It will be a no-op if the configs are already loaded and the server is ready. - pub async fn maybe_load_database_configs(&self) -> Result<()> { + pub async fn maybe_initialize_server(&self) -> Result<()> { let _guard = self - .db_load_semaphore + .initialize_semaphore .acquire() .await .expect("semaphore should not be closed"); // Note that we use Acquire-Release ordering for the atomic within the semaphore to ensure that another thread // that enters this semaphore after we've left actually sees the correct `is_ready` flag. - if self.databases_loaded.load(Ordering::Acquire) { + if self.initialized.load(Ordering::Acquire) { // already loaded, so do nothing return Ok(()); } @@ -604,8 +604,8 @@ impl Server { futures::future::join_all(handles).await; // mark as ready (use correct ordering for Acquire-Release) - self.databases_loaded.store(true, Ordering::Release); - info!("loaded databases, server is ready"); + self.initialized.store(true, Ordering::Release); + info!("loaded databases, server is initalized"); Ok(()) } @@ -667,7 +667,7 @@ impl Server { default_time: i64, ) -> Result<()> { // Return an error if this server is not yet ready - self.require_dbs_loaded()?; + self.require_initialized()?; let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; let db = self @@ -802,7 +802,7 @@ impl Server { pub async fn write_entry(&self, db_name: &str, entry_bytes: Vec) -> Result<()> { // Return an error if this server is not yet ready - self.require_dbs_loaded()?; + self.require_initialized()?; let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?; let db = self @@ -952,7 +952,7 @@ impl Server { while !shutdown.is_cancelled() { if self.require_id().is_ok() { - if let Err(e) = self.maybe_load_database_configs().await { + if let Err(e) = self.maybe_initialize_server().await { error!(%e, "error during DB loading"); } } @@ -1261,7 +1261,7 @@ mod tests { let store = config.store(); let server = Server::new(manager, config); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.maybe_load_database_configs().await.unwrap(); + server.maybe_initialize_server().await.unwrap(); let name = DatabaseName::new("bananas").unwrap(); @@ -1314,7 +1314,7 @@ mod tests { .with_num_worker_threads(1); let server2 = Server::new(manager, config2); server2.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server2.maybe_load_database_configs().await.unwrap(); + server2.maybe_initialize_server().await.unwrap(); let _ = server2.db(&db2).unwrap(); let _ = server2.db(&name).unwrap(); @@ -1327,7 +1327,7 @@ mod tests { let manager = TestConnectionManager::new(); let server = Server::new(manager, config()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.maybe_load_database_configs().await.unwrap(); + server.maybe_initialize_server().await.unwrap(); let name = DatabaseName::new("bananas").unwrap(); @@ -1378,7 +1378,7 @@ mod tests { let config = config_with_store(store); let server = Server::new(manager, config); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.maybe_load_database_configs().await.unwrap(); + server.maybe_initialize_server().await.unwrap(); create_simple_database(&server, "bananas") .await .expect("failed to create database"); @@ -1394,10 +1394,7 @@ mod tests { let config = config_with_store(store); let server = Server::new(manager, config); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server - .maybe_load_database_configs() - .await - .expect("load config"); + server.maybe_initialize_server().await.expect("load config"); create_simple_database(&server, "apples") .await @@ -1417,10 +1414,7 @@ mod tests { let config = config_with_store(store); let server = Server::new(manager, config); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server - .maybe_load_database_configs() - .await - .expect("load config"); + server.maybe_initialize_server().await.expect("load config"); assert_eq!(server.db_names_sorted(), vec!["apples"]); } @@ -1430,10 +1424,7 @@ mod tests { let manager = TestConnectionManager::new(); let server = Server::new(manager, config()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server - .maybe_load_database_configs() - .await - .expect("load config"); + server.maybe_initialize_server().await.expect("load config"); let names = vec!["bar", "baz"]; @@ -1454,7 +1445,7 @@ mod tests { let manager = TestConnectionManager::new(); let server = Server::new(manager, config()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.maybe_load_database_configs().await.unwrap(); + server.maybe_initialize_server().await.unwrap(); let name = DatabaseName::new("foo".to_string()).unwrap(); server @@ -1495,7 +1486,7 @@ mod tests { let manager = TestConnectionManager::new(); let server = Server::new(manager, config); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.maybe_load_database_configs().await.unwrap(); + server.maybe_initialize_server().await.unwrap(); let name = DatabaseName::new("foo".to_string()).unwrap(); server @@ -1582,7 +1573,7 @@ mod tests { let server = Server::new(manager, config()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.maybe_load_database_configs().await.unwrap(); + server.maybe_initialize_server().await.unwrap(); let db_name = DatabaseName::new("foo").unwrap(); server @@ -1663,7 +1654,7 @@ mod tests { let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.maybe_load_database_configs().await.unwrap(); + server.maybe_initialize_server().await.unwrap(); let db_name = DatabaseName::new("foo").unwrap(); server @@ -1825,7 +1816,7 @@ mod tests { let manager = TestConnectionManager::new(); let server = Server::new(manager, config()); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - server.maybe_load_database_configs().await.unwrap(); + server.maybe_initialize_server().await.unwrap(); let name = DatabaseName::new("foo".to_string()).unwrap(); server @@ -1907,7 +1898,7 @@ mod tests { let t_0 = Instant::now(); loop { - if server.require_dbs_loaded().is_ok() { + if server.require_initialized().is_ok() { break; } assert!(t_0.elapsed() < Duration::from_secs(10)); diff --git a/src/commands/server.rs b/src/commands/server.rs index f7948cf5c9..41ad45a356 100644 --- a/src/commands/server.rs +++ b/src/commands/server.rs @@ -22,7 +22,7 @@ pub enum Error { ConnectionError(#[from] influxdb_iox_client::connection::Error), #[error("Error checking if databases are loded: {0}")] - AreDatabasesLoadedError(#[from] AreDatabasesLoadedError), + AreDatabasesLoadedError(#[from] GetServerStatusError), #[error("Timeout waiting for databases to be loaded")] TimeoutDatabasesLoaded, @@ -45,8 +45,8 @@ enum Command { /// Get server ID Get, - /// Wait until databases are loaded - WaitDatabasesLoaded(WaitDatabasesLoaded), + /// Wait until server is initialized. + WaitServerInitialized(WaitSeverInitialized), Remote(crate::commands::server_remote::Config), } @@ -58,9 +58,9 @@ struct Set { id: u32, } -/// Wait until databases are loaded +/// Wait until server is initialized. #[derive(Debug, StructOpt)] -struct WaitDatabasesLoaded { +struct WaitSeverInitialized { /// Timeout in seconds. #[structopt(short, default_value = "10")] timeout: u64, @@ -83,11 +83,11 @@ pub async fn command(url: String, config: Config) -> Result<()> { println!("{}", id); Ok(()) } - Command::WaitDatabasesLoaded(command) => { + Command::WaitServerInitialized(command) => { let end = Instant::now() + Duration::from_secs(command.timeout); loop { - if client.are_databases_loaded().await? { - println!("Databases loaded."); + if client.get_server_status().await?.initialized { + println!("Server initialized."); return Ok(()); } if Instant::now() >= end { diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index 64e89cda92..b2312c1ec0 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -911,7 +911,7 @@ mod tests { let (_, config) = config(); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - app_server.maybe_load_database_configs().await.unwrap(); + app_server.maybe_initialize_server().await.unwrap(); app_server .create_database(DatabaseRules::new( DatabaseName::new("MyOrg_MyBucket").unwrap(), @@ -959,7 +959,7 @@ mod tests { let (metrics_registry, config) = config(); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - app_server.maybe_load_database_configs().await.unwrap(); + app_server.maybe_initialize_server().await.unwrap(); app_server .create_database(DatabaseRules::new( DatabaseName::new("MetricsOrg_MetricsBucket").unwrap(), @@ -1049,7 +1049,7 @@ mod tests { let (_, config) = config(); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - app_server.maybe_load_database_configs().await.unwrap(); + app_server.maybe_initialize_server().await.unwrap(); app_server .create_database(DatabaseRules::new( DatabaseName::new("MyOrg_MyBucket").unwrap(), @@ -1186,7 +1186,7 @@ mod tests { let (_, config) = config(); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - app_server.maybe_load_database_configs().await.unwrap(); + app_server.maybe_initialize_server().await.unwrap(); app_server .create_database(DatabaseRules::new( DatabaseName::new("MyOrg_MyBucket").unwrap(), @@ -1235,7 +1235,7 @@ mod tests { let (_, config) = config(); let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config)); app_server.set_id(ServerId::try_from(1).unwrap()).unwrap(); - app_server.maybe_load_database_configs().await.unwrap(); + app_server.maybe_initialize_server().await.unwrap(); app_server .create_database(DatabaseRules::new( DatabaseName::new("MyOrg_MyBucket").unwrap(), diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 7e0273ffb8..a5e6de329e 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -372,12 +372,30 @@ where Ok(Response::new(SetServingReadinessResponse {})) } - async fn are_databases_loaded( + async fn get_server_status( &self, - _request: Request, - ) -> Result, Status> { - Ok(Response::new(AreDatabasesLoadedResponse { - databases_loaded: self.server.databases_loaded(), + _request: Request, + ) -> Result, Status> { + // TODO: wire up errors (https://github.com/influxdata/influxdb_iox/issues/1624) + let initialized = self.server.initialized(); + + let database_statuses: Vec<_> = if initialized { + self.server + .db_names_sorted() + .into_iter() + .map(|db_name| DatabaseStatus { + db_name, + error: None, + }) + .collect() + } else { + Default::default() + }; + + Ok(Response::new(ServerStatus { + initialized, + error: None, + database_statuses, })) } } diff --git a/tests/common/server_fixture.rs b/tests/common/server_fixture.rs index 0c2f8373b8..af7f3a3341 100644 --- a/tests/common/server_fixture.rs +++ b/tests/common/server_fixture.rs @@ -216,12 +216,12 @@ impl ServerFixture { } } - /// Wait until server returns that databases are loaded. - pub async fn wait_dbs_loaded(&self) { + /// Wait until server is initialized. Databases should be loaded. + pub async fn wait_server_initialized(&self) { let mut client = self.management_client(); let t_0 = Instant::now(); loop { - if client.are_databases_loaded().await.unwrap() { + if client.get_server_status().await.unwrap().initialized { break; } assert!(t_0.elapsed() < Duration::from_secs(10)); @@ -422,15 +422,13 @@ impl TestServer { let mut interval = tokio::time::interval(Duration::from_millis(500)); loop { - match management_client.are_databases_loaded().await { - Ok(loaded) => { - if loaded { - return; - } - } - Err(e) => { - println!("Waiting for databases being loaded: {}", e); - } + if management_client + .get_server_status() + .await + .unwrap() + .initialized + { + return; } interval.tick().await; } diff --git a/tests/end_to_end_cases/management_api.rs b/tests/end_to_end_cases/management_api.rs index 1ab6e9dfe0..5257041bb3 100644 --- a/tests/end_to_end_cases/management_api.rs +++ b/tests/end_to_end_cases/management_api.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use generated_types::{ google::protobuf::{Duration, Empty}, influxdata::iox::management::v1::{database_rules::RoutingRules, *}, @@ -25,7 +27,7 @@ async fn test_serving_readiness() { .update_server_id(42) .await .expect("set ID failed"); - server_fixture.wait_dbs_loaded().await; + server_fixture.wait_server_initialized().await; mgmt_client .create_database(DatabaseRules { name: name.to_string(), @@ -784,3 +786,50 @@ fn normalize_chunks(chunks: Vec) -> Vec { }) .collect::>() } + +#[tokio::test] +async fn test_get_server_status() { + let server_fixture = ServerFixture::create_single_use().await; + let mut client = server_fixture.management_client(); + + // not initalized + let status = client.get_server_status().await.unwrap(); + assert!(!status.initialized); + + // initialize + client.update_server_id(42).await.expect("set ID failed"); + server_fixture.wait_server_initialized().await; + + // now initalized + let status = client.get_server_status().await.unwrap(); + assert!(status.initialized); + + // create DBs + let db_name1 = rand_name(); + let db_name2 = rand_name(); + client + .create_database(DatabaseRules { + name: db_name1.clone(), + ..Default::default() + }) + .await + .expect("create database failed"); + + client + .create_database(DatabaseRules { + name: db_name2.clone(), + ..Default::default() + }) + .await + .expect("create database failed"); + + // databases are listed + let status = client.get_server_status().await.unwrap(); + let names_actual: HashSet<_> = status + .database_statuses + .iter() + .map(|db_status| db_status.db_name.clone()) + .collect(); + let names_expected: HashSet<_> = [db_name1, db_name2].iter().cloned().collect(); + assert_eq!(names_actual, names_expected); +} diff --git a/tests/end_to_end_cases/preservation.rs b/tests/end_to_end_cases/preservation.rs index f1f92a229d..3ff519f394 100644 --- a/tests/end_to_end_cases/preservation.rs +++ b/tests/end_to_end_cases/preservation.rs @@ -18,7 +18,7 @@ async fn test_query_chunk_after_restart() { .update_server_id(server_id) .await .expect("set ID failed"); - fixture.wait_dbs_loaded().await; + fixture.wait_server_initialized().await; // create DB and a RB chunk create_readable_database(&db_name, fixture.grpc_channel()).await; @@ -47,7 +47,7 @@ async fn test_query_chunk_after_restart() { // restart server let fixture = fixture.restart_server().await; - fixture.wait_dbs_loaded().await; + fixture.wait_server_initialized().await; // query data after restart assert_chunk_query_works(&fixture, &db_name).await; diff --git a/tests/end_to_end_cases/read_cli.rs b/tests/end_to_end_cases/read_cli.rs index f4cc1f6caf..cfba2af9dc 100644 --- a/tests/end_to_end_cases/read_cli.rs +++ b/tests/end_to_end_cases/read_cli.rs @@ -13,7 +13,7 @@ pub async fn test() { let addr = server_fixture.grpc_base(); set_server_id(addr).await; - wait_databases_loaded(addr).await; + wait_server_initialized(addr).await; create_database(&db_name, addr).await; test_read_default(&db_name, addr).await; test_read_format_pretty(&db_name, addr).await; @@ -35,16 +35,16 @@ async fn set_server_id(addr: &str) { .stdout(predicate::str::contains("Ok")); } -async fn wait_databases_loaded(addr: &str) { +async fn wait_server_initialized(addr: &str) { Command::cargo_bin("influxdb_iox") .unwrap() .arg("server") - .arg("wait-databases-loaded") + .arg("wait-server-initialized") .arg("--host") .arg(addr) .assert() .success() - .stdout(predicate::str::contains("Databases loaded.")); + .stdout(predicate::str::contains("Server initialized.")); } async fn create_database(db_name: &str, addr: &str) { diff --git a/tests/end_to_end_cases/write_api.rs b/tests/end_to_end_cases/write_api.rs index 9cb9060be3..a1ec46fa42 100644 --- a/tests/end_to_end_cases/write_api.rs +++ b/tests/end_to_end_cases/write_api.rs @@ -148,7 +148,7 @@ async fn test_write_routed() { .update_server_id(TEST_ROUTER_ID) .await .expect("set ID failed"); - router.wait_dbs_loaded().await; + router.wait_server_initialized().await; let target_1 = ServerFixture::create_single_use().await; let mut target_1_mgmt = target_1.management_client(); @@ -156,7 +156,7 @@ async fn test_write_routed() { .update_server_id(TEST_TARGET_ID_1) .await .expect("set ID failed"); - target_1.wait_dbs_loaded().await; + target_1.wait_server_initialized().await; router_mgmt .update_remote(TEST_REMOTE_ID_1, target_1.grpc_base()) @@ -169,7 +169,7 @@ async fn test_write_routed() { .update_server_id(TEST_TARGET_ID_2) .await .expect("set ID failed"); - target_2.wait_dbs_loaded().await; + target_2.wait_server_initialized().await; router_mgmt .update_remote(TEST_REMOTE_ID_2, target_2.grpc_base()) @@ -182,7 +182,7 @@ async fn test_write_routed() { .update_server_id(TEST_TARGET_ID_3) .await .expect("set ID failed"); - target_3.wait_dbs_loaded().await; + target_3.wait_server_initialized().await; router_mgmt .update_remote(TEST_REMOTE_ID_3, target_3.grpc_base()) @@ -378,7 +378,7 @@ async fn test_write_routed_errors() { .update_server_id(TEST_ROUTER_ID) .await .expect("set ID failed"); - router.wait_dbs_loaded().await; + router.wait_server_initialized().await; let db_name = rand_name(); create_readable_database(&db_name, router.grpc_channel()).await; @@ -454,7 +454,7 @@ async fn test_write_routed_no_shard() { .update_server_id(TEST_ROUTER_ID) .await .expect("set ID failed"); - router.wait_dbs_loaded().await; + router.wait_server_initialized().await; let target_1 = ServerFixture::create_single_use().await; let mut target_1_mgmt = target_1.management_client(); @@ -462,7 +462,7 @@ async fn test_write_routed_no_shard() { .update_server_id(TEST_TARGET_ID_1) .await .expect("set ID failed"); - target_1.wait_dbs_loaded().await; + target_1.wait_server_initialized().await; router_mgmt .update_remote(TEST_REMOTE_ID_1, target_1.grpc_base()) @@ -475,7 +475,7 @@ async fn test_write_routed_no_shard() { .update_server_id(TEST_TARGET_ID_2) .await .expect("set ID failed"); - target_2.wait_dbs_loaded().await; + target_2.wait_server_initialized().await; router_mgmt .update_remote(TEST_REMOTE_ID_2, target_2.grpc_base()) @@ -488,7 +488,7 @@ async fn test_write_routed_no_shard() { .update_server_id(TEST_TARGET_ID_3) .await .expect("set ID failed"); - target_3.wait_dbs_loaded().await; + target_3.wait_server_initialized().await; router_mgmt .update_remote(TEST_REMOTE_ID_3, target_3.grpc_base())