refactor: migrate "DBs initialized" RPC to "server status"

pull/24376/head
Marco Neumann 2021-06-04 11:33:36 +02:00
parent e876eed8c4
commit e06d65bb2a
11 changed files with 177 additions and 101 deletions

View File

@ -62,8 +62,8 @@ service ManagementService {
// Close a chunk and move it to the read buffer
rpc ClosePartitionChunk(ClosePartitionChunkRequest) returns (ClosePartitionChunkResponse);
// Checks if databases are loaded and server is ready for reads and writes.
rpc AreDatabasesLoaded(AreDatabasesLoadedRequest) returns (AreDatabasesLoadedResponse);
// Get server status
rpc GetServerStatus(GetServerStatusRequest) returns (ServerStatus);
}
message GetServerIdRequest {}
@ -239,9 +239,29 @@ message ClosePartitionChunkResponse {
google.longrunning.Operation operation = 1;
}
message AreDatabasesLoadedRequest {}
message GetServerStatusRequest {}
message AreDatabasesLoadedResponse {
// Are DBs loaded?
bool databases_loaded = 1;
// Overall status of the server, as returned by the `GetServerStatus` RPC.
message ServerStatus {
// Server is initialized, i.e. databases are loaded and accept read/write operations. Furthermore database rules can
// be updated and new databases can be created.
bool initialized = 1;
// If present, the server reports a global error condition.
Error error = 2;
// If `initialized` is true, this contains a complete list of databases.
repeated DatabaseStatus database_statuses = 3;
}
// Status of a single database; used as the element type of `ServerStatus.database_statuses`.
message DatabaseStatus {
// The name of the database.
string db_name = 1;
// If present, the database reports an error condition.
Error error = 2;
}
// Error condition that can be attached to a `ServerStatus` or a `DatabaseStatus`.
message Error {
// Message describing the error.
string message = 1;
}

View File

@ -249,9 +249,9 @@ pub enum ClosePartitionChunkError {
ServerError(tonic::Status),
}
/// Errors returned by [`Client::are_databases_loaded`]
/// Errors returned by [`Client::get_server_status`]
#[derive(Debug, Error)]
pub enum AreDatabasesLoadedError {
pub enum GetServerStatusError {
/// Client received an unexpected error from the server
#[error("Unexpected server error: {}: {}", .0.code(), .0.message())]
ServerError(tonic::Status),
@ -330,14 +330,14 @@ impl Client {
}
/// Get the server status, which reports whether databases are loaded and ready for read and write.
pub async fn are_databases_loaded(&mut self) -> Result<bool, AreDatabasesLoadedError> {
pub async fn get_server_status(&mut self) -> Result<ServerStatus, GetServerStatusError> {
let response = self
.inner
.are_databases_loaded(AreDatabasesLoadedRequest {})
.get_server_status(GetServerStatusRequest {})
.await
.map_err(AreDatabasesLoadedError::ServerError)?;
.map_err(GetServerStatusError::ServerError)?;
Ok(response.get_ref().databases_loaded)
Ok(response.into_inner())
}
/// Set serving readiness.

View File

@ -413,10 +413,10 @@ pub struct Server<M: ConnectionManager> {
pub registry: Arc<metrics::MetricRegistry>,
/// Flags that databases are loaded and server is ready to read/write data.
databases_loaded: AtomicBool,
initialized: AtomicBool,
/// Semaphore that limits the number of jobs that load DBs when the serverID is set.
db_load_semaphore: Semaphore,
initialize_semaphore: Semaphore,
}
#[derive(Debug)]
@ -457,8 +457,8 @@ impl<M: ConnectionManager> Server<M> {
jobs,
metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))),
registry: Arc::clone(&metric_registry),
databases_loaded: AtomicBool::new(false),
db_load_semaphore: Semaphore::new(1),
initialized: AtomicBool::new(false),
initialize_semaphore: Semaphore::new(1),
}
}
@ -475,19 +475,19 @@ impl<M: ConnectionManager> Server<M> {
self.id.get()
}
/// Check if databases are loaded and server is ready to read/write.
pub fn databases_loaded(&self) -> bool {
/// Check if server is loaded. Databases are loaded and server is ready to read/write.
pub fn initialized(&self) -> bool {
// ordering here isn't that important since this method is not used to check-and-modify the flag
self.databases_loaded.load(Ordering::Relaxed)
self.initialized.load(Ordering::Relaxed)
}
/// Requires that databases are loaded and server is ready to read/write.
fn require_dbs_loaded(&self) -> Result<ServerId> {
// since a server ID is the prerequisite for readiness, check this first
/// Require that server is loaded. Databases are loaded and server is ready to read/write.
fn require_initialized(&self) -> Result<ServerId> {
// since a server ID is the prerequisite for init, check this first
let server_id = self.require_id()?;
// ordering here isn't that important since this method is not used to check-and-modify the flag
if self.databases_loaded() {
if self.initialized() {
Ok(server_id)
} else {
Err(Error::DatabasesNotLoaded)
@ -497,7 +497,7 @@ impl<M: ConnectionManager> Server<M> {
/// Tells the server the set of rules for a database.
pub async fn create_database(&self, rules: DatabaseRules) -> Result<()> {
// Return an error if this server is not yet ready
let server_id = self.require_dbs_loaded()?;
let server_id = self.require_initialized()?;
let preserved_catalog = load_or_create_preserved_catalog(
rules.db_name(),
@ -556,16 +556,16 @@ impl<M: ConnectionManager> Server<M> {
/// replaced.
///
/// This requires the serverID to be set. It will be a no-op if the configs are already loaded and the server is ready.
pub async fn maybe_load_database_configs(&self) -> Result<()> {
pub async fn maybe_initialize_server(&self) -> Result<()> {
let _guard = self
.db_load_semaphore
.initialize_semaphore
.acquire()
.await
.expect("semaphore should not be closed");
// Note that we use Acquire-Release ordering for the atomic within the semaphore to ensure that another thread
// that enters this semaphore after we've left actually sees the correct `initialized` flag.
if self.databases_loaded.load(Ordering::Acquire) {
if self.initialized.load(Ordering::Acquire) {
// already loaded, so do nothing
return Ok(());
}
@ -604,8 +604,8 @@ impl<M: ConnectionManager> Server<M> {
futures::future::join_all(handles).await;
// mark as ready (use correct ordering for Acquire-Release)
self.databases_loaded.store(true, Ordering::Release);
info!("loaded databases, server is ready");
self.initialized.store(true, Ordering::Release);
info!("loaded databases, server is initalized");
Ok(())
}
@ -667,7 +667,7 @@ impl<M: ConnectionManager> Server<M> {
default_time: i64,
) -> Result<()> {
// Return an error if this server is not yet ready
self.require_dbs_loaded()?;
self.require_initialized()?;
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let db = self
@ -802,7 +802,7 @@ impl<M: ConnectionManager> Server<M> {
pub async fn write_entry(&self, db_name: &str, entry_bytes: Vec<u8>) -> Result<()> {
// Return an error if this server is not yet ready
self.require_dbs_loaded()?;
self.require_initialized()?;
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let db = self
@ -952,7 +952,7 @@ impl<M: ConnectionManager> Server<M> {
while !shutdown.is_cancelled() {
if self.require_id().is_ok() {
if let Err(e) = self.maybe_load_database_configs().await {
if let Err(e) = self.maybe_initialize_server().await {
error!(%e, "error during DB loading");
}
}
@ -1261,7 +1261,7 @@ mod tests {
let store = config.store();
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_load_database_configs().await.unwrap();
server.maybe_initialize_server().await.unwrap();
let name = DatabaseName::new("bananas").unwrap();
@ -1314,7 +1314,7 @@ mod tests {
.with_num_worker_threads(1);
let server2 = Server::new(manager, config2);
server2.set_id(ServerId::try_from(1).unwrap()).unwrap();
server2.maybe_load_database_configs().await.unwrap();
server2.maybe_initialize_server().await.unwrap();
let _ = server2.db(&db2).unwrap();
let _ = server2.db(&name).unwrap();
@ -1327,7 +1327,7 @@ mod tests {
let manager = TestConnectionManager::new();
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_load_database_configs().await.unwrap();
server.maybe_initialize_server().await.unwrap();
let name = DatabaseName::new("bananas").unwrap();
@ -1378,7 +1378,7 @@ mod tests {
let config = config_with_store(store);
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_load_database_configs().await.unwrap();
server.maybe_initialize_server().await.unwrap();
create_simple_database(&server, "bananas")
.await
.expect("failed to create database");
@ -1394,10 +1394,7 @@ mod tests {
let config = config_with_store(store);
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server
.maybe_load_database_configs()
.await
.expect("load config");
server.maybe_initialize_server().await.expect("load config");
create_simple_database(&server, "apples")
.await
@ -1417,10 +1414,7 @@ mod tests {
let config = config_with_store(store);
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server
.maybe_load_database_configs()
.await
.expect("load config");
server.maybe_initialize_server().await.expect("load config");
assert_eq!(server.db_names_sorted(), vec!["apples"]);
}
@ -1430,10 +1424,7 @@ mod tests {
let manager = TestConnectionManager::new();
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server
.maybe_load_database_configs()
.await
.expect("load config");
server.maybe_initialize_server().await.expect("load config");
let names = vec!["bar", "baz"];
@ -1454,7 +1445,7 @@ mod tests {
let manager = TestConnectionManager::new();
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_load_database_configs().await.unwrap();
server.maybe_initialize_server().await.unwrap();
let name = DatabaseName::new("foo".to_string()).unwrap();
server
@ -1495,7 +1486,7 @@ mod tests {
let manager = TestConnectionManager::new();
let server = Server::new(manager, config);
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_load_database_configs().await.unwrap();
server.maybe_initialize_server().await.unwrap();
let name = DatabaseName::new("foo".to_string()).unwrap();
server
@ -1582,7 +1573,7 @@ mod tests {
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_load_database_configs().await.unwrap();
server.maybe_initialize_server().await.unwrap();
let db_name = DatabaseName::new("foo").unwrap();
server
@ -1663,7 +1654,7 @@ mod tests {
let background_handle = spawn_worker(Arc::clone(&server), cancel_token.clone());
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_load_database_configs().await.unwrap();
server.maybe_initialize_server().await.unwrap();
let db_name = DatabaseName::new("foo").unwrap();
server
@ -1825,7 +1816,7 @@ mod tests {
let manager = TestConnectionManager::new();
let server = Server::new(manager, config());
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.maybe_load_database_configs().await.unwrap();
server.maybe_initialize_server().await.unwrap();
let name = DatabaseName::new("foo".to_string()).unwrap();
server
@ -1907,7 +1898,7 @@ mod tests {
let t_0 = Instant::now();
loop {
if server.require_dbs_loaded().is_ok() {
if server.require_initialized().is_ok() {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));

View File

@ -22,7 +22,7 @@ pub enum Error {
ConnectionError(#[from] influxdb_iox_client::connection::Error),
#[error("Error checking if databases are loded: {0}")]
AreDatabasesLoadedError(#[from] AreDatabasesLoadedError),
AreDatabasesLoadedError(#[from] GetServerStatusError),
#[error("Timeout waiting for databases to be loaded")]
TimeoutDatabasesLoaded,
@ -45,8 +45,8 @@ enum Command {
/// Get server ID
Get,
/// Wait until databases are loaded
WaitDatabasesLoaded(WaitDatabasesLoaded),
/// Wait until server is initialized.
WaitServerInitialized(WaitSeverInitialized),
Remote(crate::commands::server_remote::Config),
}
@ -58,9 +58,9 @@ struct Set {
id: u32,
}
/// Wait until databases are loaded
/// Wait until server is initialized.
#[derive(Debug, StructOpt)]
struct WaitDatabasesLoaded {
struct WaitSeverInitialized {
/// Timeout in seconds.
#[structopt(short, default_value = "10")]
timeout: u64,
@ -83,11 +83,11 @@ pub async fn command(url: String, config: Config) -> Result<()> {
println!("{}", id);
Ok(())
}
Command::WaitDatabasesLoaded(command) => {
Command::WaitServerInitialized(command) => {
let end = Instant::now() + Duration::from_secs(command.timeout);
loop {
if client.are_databases_loaded().await? {
println!("Databases loaded.");
if client.get_server_status().await?.initialized {
println!("Server initialized.");
return Ok(());
}
if Instant::now() >= end {

View File

@ -911,7 +911,7 @@ mod tests {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_load_database_configs().await.unwrap();
app_server.maybe_initialize_server().await.unwrap();
app_server
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
@ -959,7 +959,7 @@ mod tests {
let (metrics_registry, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_load_database_configs().await.unwrap();
app_server.maybe_initialize_server().await.unwrap();
app_server
.create_database(DatabaseRules::new(
DatabaseName::new("MetricsOrg_MetricsBucket").unwrap(),
@ -1049,7 +1049,7 @@ mod tests {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_load_database_configs().await.unwrap();
app_server.maybe_initialize_server().await.unwrap();
app_server
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
@ -1186,7 +1186,7 @@ mod tests {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_load_database_configs().await.unwrap();
app_server.maybe_initialize_server().await.unwrap();
app_server
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),
@ -1235,7 +1235,7 @@ mod tests {
let (_, config) = config();
let app_server = Arc::new(AppServer::new(ConnectionManagerImpl {}, config));
app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
app_server.maybe_load_database_configs().await.unwrap();
app_server.maybe_initialize_server().await.unwrap();
app_server
.create_database(DatabaseRules::new(
DatabaseName::new("MyOrg_MyBucket").unwrap(),

View File

@ -372,12 +372,30 @@ where
Ok(Response::new(SetServingReadinessResponse {}))
}
async fn are_databases_loaded(
async fn get_server_status(
&self,
_request: Request<AreDatabasesLoadedRequest>,
) -> Result<Response<AreDatabasesLoadedResponse>, Status> {
Ok(Response::new(AreDatabasesLoadedResponse {
databases_loaded: self.server.databases_loaded(),
_request: Request<GetServerStatusRequest>,
) -> Result<Response<ServerStatus>, Status> {
// TODO: wire up errors (https://github.com/influxdata/influxdb_iox/issues/1624)
let initialized = self.server.initialized();
let database_statuses: Vec<_> = if initialized {
self.server
.db_names_sorted()
.into_iter()
.map(|db_name| DatabaseStatus {
db_name,
error: None,
})
.collect()
} else {
Default::default()
};
Ok(Response::new(ServerStatus {
initialized,
error: None,
database_statuses,
}))
}
}

View File

@ -216,12 +216,12 @@ impl ServerFixture {
}
}
/// Wait until server returns that databases are loaded.
pub async fn wait_dbs_loaded(&self) {
/// Wait until server is initialized. Databases should be loaded.
pub async fn wait_server_initialized(&self) {
let mut client = self.management_client();
let t_0 = Instant::now();
loop {
if client.are_databases_loaded().await.unwrap() {
if client.get_server_status().await.unwrap().initialized {
break;
}
assert!(t_0.elapsed() < Duration::from_secs(10));
@ -422,16 +422,14 @@ impl TestServer {
let mut interval = tokio::time::interval(Duration::from_millis(500));
loop {
match management_client.are_databases_loaded().await {
Ok(loaded) => {
if loaded {
if management_client
.get_server_status()
.await
.unwrap()
.initialized
{
return;
}
}
Err(e) => {
println!("Waiting for databases being loaded: {}", e);
}
}
interval.tick().await;
}
};

View File

@ -1,3 +1,5 @@
use std::collections::HashSet;
use generated_types::{
google::protobuf::{Duration, Empty},
influxdata::iox::management::v1::{database_rules::RoutingRules, *},
@ -25,7 +27,7 @@ async fn test_serving_readiness() {
.update_server_id(42)
.await
.expect("set ID failed");
server_fixture.wait_dbs_loaded().await;
server_fixture.wait_server_initialized().await;
mgmt_client
.create_database(DatabaseRules {
name: name.to_string(),
@ -784,3 +786,50 @@ fn normalize_chunks(chunks: Vec<Chunk>) -> Vec<Chunk> {
})
.collect::<Vec<_>>()
}
/// End-to-end check of the `get_server_status` management call: the server must report
/// `initialized == false` before a server ID is set, `initialized == true` once
/// initialization has completed, and must list every created database afterwards.
#[tokio::test]
async fn test_get_server_status() {
    let fixture = ServerFixture::create_single_use().await;
    let mut mgmt = fixture.management_client();

    // A fresh server without an ID must not claim to be initialized.
    let initialized_before = mgmt.get_server_status().await.unwrap().initialized;
    assert!(!initialized_before);

    // Assign the server ID, then block until the server reports itself initialized.
    mgmt.update_server_id(42).await.expect("set ID failed");
    fixture.wait_server_initialized().await;

    // The status call must now agree with the fixture's wait helper.
    let initialized_after = mgmt.get_server_status().await.unwrap().initialized;
    assert!(initialized_after);

    // Pick both random names up front, then create the databases in that order.
    let db_names = [rand_name(), rand_name()];
    for db_name in db_names.iter() {
        let rules = DatabaseRules {
            name: db_name.clone(),
            ..Default::default()
        };
        mgmt.create_database(rules)
            .await
            .expect("create database failed");
    }

    // Compare as sets: only membership matters here, not the reported order.
    let reported = mgmt.get_server_status().await.unwrap().database_statuses;
    let names_actual: HashSet<_> = reported
        .into_iter()
        .map(|db_status| db_status.db_name)
        .collect();
    let names_expected: HashSet<_> = db_names.iter().cloned().collect();
    assert_eq!(names_actual, names_expected);
}

View File

@ -18,7 +18,7 @@ async fn test_query_chunk_after_restart() {
.update_server_id(server_id)
.await
.expect("set ID failed");
fixture.wait_dbs_loaded().await;
fixture.wait_server_initialized().await;
// create DB and a RB chunk
create_readable_database(&db_name, fixture.grpc_channel()).await;
@ -47,7 +47,7 @@ async fn test_query_chunk_after_restart() {
// restart server
let fixture = fixture.restart_server().await;
fixture.wait_dbs_loaded().await;
fixture.wait_server_initialized().await;
// query data after restart
assert_chunk_query_works(&fixture, &db_name).await;

View File

@ -13,7 +13,7 @@ pub async fn test() {
let addr = server_fixture.grpc_base();
set_server_id(addr).await;
wait_databases_loaded(addr).await;
wait_server_initialized(addr).await;
create_database(&db_name, addr).await;
test_read_default(&db_name, addr).await;
test_read_format_pretty(&db_name, addr).await;
@ -35,16 +35,16 @@ async fn set_server_id(addr: &str) {
.stdout(predicate::str::contains("Ok"));
}
async fn wait_databases_loaded(addr: &str) {
async fn wait_server_initialized(addr: &str) {
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("server")
.arg("wait-databases-loaded")
.arg("wait-server-initialized")
.arg("--host")
.arg(addr)
.assert()
.success()
.stdout(predicate::str::contains("Databases loaded."));
.stdout(predicate::str::contains("Server initialized."));
}
async fn create_database(db_name: &str, addr: &str) {

View File

@ -148,7 +148,7 @@ async fn test_write_routed() {
.update_server_id(TEST_ROUTER_ID)
.await
.expect("set ID failed");
router.wait_dbs_loaded().await;
router.wait_server_initialized().await;
let target_1 = ServerFixture::create_single_use().await;
let mut target_1_mgmt = target_1.management_client();
@ -156,7 +156,7 @@ async fn test_write_routed() {
.update_server_id(TEST_TARGET_ID_1)
.await
.expect("set ID failed");
target_1.wait_dbs_loaded().await;
target_1.wait_server_initialized().await;
router_mgmt
.update_remote(TEST_REMOTE_ID_1, target_1.grpc_base())
@ -169,7 +169,7 @@ async fn test_write_routed() {
.update_server_id(TEST_TARGET_ID_2)
.await
.expect("set ID failed");
target_2.wait_dbs_loaded().await;
target_2.wait_server_initialized().await;
router_mgmt
.update_remote(TEST_REMOTE_ID_2, target_2.grpc_base())
@ -182,7 +182,7 @@ async fn test_write_routed() {
.update_server_id(TEST_TARGET_ID_3)
.await
.expect("set ID failed");
target_3.wait_dbs_loaded().await;
target_3.wait_server_initialized().await;
router_mgmt
.update_remote(TEST_REMOTE_ID_3, target_3.grpc_base())
@ -378,7 +378,7 @@ async fn test_write_routed_errors() {
.update_server_id(TEST_ROUTER_ID)
.await
.expect("set ID failed");
router.wait_dbs_loaded().await;
router.wait_server_initialized().await;
let db_name = rand_name();
create_readable_database(&db_name, router.grpc_channel()).await;
@ -454,7 +454,7 @@ async fn test_write_routed_no_shard() {
.update_server_id(TEST_ROUTER_ID)
.await
.expect("set ID failed");
router.wait_dbs_loaded().await;
router.wait_server_initialized().await;
let target_1 = ServerFixture::create_single_use().await;
let mut target_1_mgmt = target_1.management_client();
@ -462,7 +462,7 @@ async fn test_write_routed_no_shard() {
.update_server_id(TEST_TARGET_ID_1)
.await
.expect("set ID failed");
target_1.wait_dbs_loaded().await;
target_1.wait_server_initialized().await;
router_mgmt
.update_remote(TEST_REMOTE_ID_1, target_1.grpc_base())
@ -475,7 +475,7 @@ async fn test_write_routed_no_shard() {
.update_server_id(TEST_TARGET_ID_2)
.await
.expect("set ID failed");
target_2.wait_dbs_loaded().await;
target_2.wait_server_initialized().await;
router_mgmt
.update_remote(TEST_REMOTE_ID_2, target_2.grpc_base())
@ -488,7 +488,7 @@ async fn test_write_routed_no_shard() {
.update_server_id(TEST_TARGET_ID_3)
.await
.expect("set ID failed");
target_3.wait_dbs_loaded().await;
target_3.wait_server_initialized().await;
router_mgmt
.update_remote(TEST_REMOTE_ID_3, target_3.grpc_base())