feat: expose DB state in gRPC interface

pull/24376/head
Marco Neumann 2021-07-02 11:24:36 +02:00
parent 8386b1528e
commit 54fbb60740
10 changed files with 127 additions and 34 deletions

View File

@ -0,0 +1,18 @@
/// Simple representation of the state a database can be in.
///
/// The state machine is a simple linear state machine:
///
/// ```text
/// Known -> RulesLoaded -> Initialized
/// ```
#[derive(Debug, PartialEq, Eq)]
pub enum DatabaseStateCode {
/// Database is known but nothing is loaded.
Known,
/// Rules are loaded
RulesLoaded,
/// Fully initialized database.
Initialized,
}

View File

@ -15,6 +15,7 @@ pub mod consistent_hasher;
mod database_name;
pub use database_name::*;
pub mod database_rules;
pub mod database_state;
pub mod error;
pub mod job;
pub mod names;

View File

@ -266,6 +266,26 @@ message DatabaseStatus {
// If present, the database reports an error condition.
Error error = 2;
// Current initialization state of the database.
enum DatabaseState {
DATABASE_STATE_UNSPECIFIED = 0;
// Database is known but nothing is loaded.
DATABASE_STATE_KNOWN = 1;
// Rules are loaded
DATABASE_STATE_RULES_LOADED = 2;
// Here will be the "preserved catalog loaded but replay is pending" state
reserved 3;
// Fully initialized database.
DATABASE_STATE_INITIALIZED = 4;
}
// Current initialization state of the database.
DatabaseState state = 3;
}
message Error {

View File

@ -0,0 +1,21 @@
use crate::influxdata::iox::management::v1 as management;
use data_types::database_state::DatabaseStateCode;
impl From<DatabaseStateCode> for management::database_status::DatabaseState {
fn from(state_code: DatabaseStateCode) -> Self {
match state_code {
DatabaseStateCode::Known => Self::Known,
DatabaseStateCode::RulesLoaded => Self::RulesLoaded,
DatabaseStateCode::Initialized => Self::Initialized,
}
}
}
impl From<Option<DatabaseStateCode>> for management::database_status::DatabaseState {
fn from(state_code: Option<DatabaseStateCode>) -> Self {
match state_code {
Some(state_code) => state_code.into(),
None => Self::Unspecified,
}
}
}

View File

@ -129,6 +129,7 @@ pub use influxdata::platform::storage::*;
pub mod chunk;
pub mod database_rules;
pub mod database_state;
pub mod google;
pub mod job;

View File

@ -3,7 +3,10 @@ use std::{
sync::{Arc, RwLock},
};
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use data_types::{
database_rules::DatabaseRules, database_state::DatabaseStateCode, server_id::ServerId,
DatabaseName,
};
use metrics::MetricRegistry;
use object_store::{path::ObjectStorePath, ObjectStore};
use parquet_file::catalog::PreservedCatalog;
@ -194,6 +197,12 @@ impl Config {
.unwrap_or(false)
}
/// Current database init state
pub(crate) fn db_state(&self, name: &DatabaseName<'_>) -> Option<DatabaseStateCode> {
let state = self.state.read().expect("mutex poisoned");
state.databases.get(name).map(|db_state| db_state.code())
}
/// Get all database names in all states (blocked, uninitialized, fully initialized).
pub(crate) fn db_names_sorted(&self) -> Vec<DatabaseName<'static>> {
let state = self.state.read().expect("mutex poisoned");
@ -465,25 +474,6 @@ impl Drop for DatabaseState {
}
}
/// Simple representation of the state a database can be in.
///
/// The state machine is a simple linear state machine:
///
/// ```text
/// Known -> RulesLoaded -> Initialized
/// ```
#[derive(Debug, PartialEq, Eq)]
pub enum DatabaseStateCode {
/// Database is known but nothing is loaded.
Known,
/// Rules are loaded
RulesLoaded,
/// Fully initialized database.
Initialized,
}
/// This handle is returned when a call is made to [`create_db`](Config::create_db) or
/// [`recover_db`](Config::recover_db) on the Config struct. The handle can be used to hold a reservation for the
/// database name. Calling `commit` on the handle will consume the struct and move the database from reserved to being

View File

@ -1,5 +1,8 @@
//! Routines to initialize a server.
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
use data_types::{
database_rules::DatabaseRules, database_state::DatabaseStateCode, server_id::ServerId,
DatabaseName,
};
use futures::TryStreamExt;
use generated_types::database_rules::decode_database_rules;
use internal_types::once::OnceNonZeroU32;
@ -22,10 +25,7 @@ use std::{
use tokio::sync::Semaphore;
use crate::{
config::{
object_store_path_for_database_config, Config, DatabaseHandle, DatabaseStateCode,
DB_RULES_FILE_NAME,
},
config::{object_store_path_for_database_config, Config, DatabaseHandle, DB_RULES_FILE_NAME},
db::load::load_or_create_preserved_catalog,
write_buffer, DatabaseError,
};

View File

@ -73,7 +73,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::BytesMut;
use config::DatabaseStateCode;
use db::load::create_preserved_catalog;
use init::InitStatus;
use observability_deps::tracing::{debug, info, warn};
@ -81,7 +80,8 @@ use parking_lot::Mutex;
use snafu::{OptionExt, ResultExt, Snafu};
use data_types::{
database_rules::DatabaseRules,
database_rules::{DatabaseRules, NodeGroup, RoutingRules, Shard, ShardConfig, ShardId},
database_state::DatabaseStateCode,
job::Job,
server_id::ServerId,
{DatabaseName, DatabaseNameError},
@ -97,7 +97,6 @@ use tracker::{TaskId, TaskRegistration, TaskRegistryWithHistory, TaskTracker, Tr
pub use crate::config::RemoteTemplate;
use crate::config::{object_store_path_for_database_config, Config, GRpcConnectionString};
use cache_loader_async::cache_api::LoadingCache;
use data_types::database_rules::{NodeGroup, RoutingRules, Shard, ShardConfig, ShardId};
pub use db::Db;
use generated_types::database_rules::encode_database_rules;
use influxdb_iox_client::{connection::Builder, write};
@ -489,6 +488,15 @@ where
self.init_status.error_database(db_name)
}
/// Current database init state.
pub fn database_state(&self, name: &str) -> Option<DatabaseStateCode> {
if let Ok(name) = DatabaseName::new(name) {
self.config.db_state(&name)
} else {
None
}
}
/// Require that server is loaded. Databases are loaded and server is ready to read/write.
fn require_initialized(&self) -> Result<ServerId> {
// since a server ID is the pre-requirement for init, check this first

View File

@ -387,7 +387,14 @@ where
message: e.to_string(),
});
DatabaseStatus { db_name, error }
let state: database_status::DatabaseState =
self.server.database_state(&db_name).into();
DatabaseStatus {
db_name,
error,
state: state.into(),
}
})
.collect()
} else {

View File

@ -1,8 +1,10 @@
use std::{collections::HashSet, fs::set_permissions, os::unix::fs::PermissionsExt};
use std::{fs::set_permissions, os::unix::fs::PermissionsExt};
use generated_types::{
google::protobuf::{Duration, Empty},
influxdata::iox::management::v1::{database_rules::RoutingRules, *},
influxdata::iox::management::v1::{
database_rules::RoutingRules, database_status::DatabaseState, *,
},
};
use influxdb_iox_client::{management::CreateDatabaseError, operations, write::WriteError};
@ -862,14 +864,34 @@ async fn test_get_server_status_ok() {
.expect("create database failed");
// databases are listed
// output is sorted by db name
let (db_name1, db_name2) = if db_name1 < db_name2 {
(db_name1, db_name2)
} else {
(db_name2, db_name1)
};
let status = client.get_server_status().await.unwrap();
let names_actual: HashSet<_> = status
let names: Vec<_> = status
.database_statuses
.iter()
.map(|db_status| db_status.db_name.clone())
.collect();
let names_expected: HashSet<_> = [db_name1, db_name2].iter().cloned().collect();
assert_eq!(names_actual, names_expected);
let errors: Vec<_> = status
.database_statuses
.iter()
.map(|db_status| db_status.error.clone())
.collect();
let states: Vec<_> = status
.database_statuses
.iter()
.map(|db_status| DatabaseState::from_i32(db_status.state).unwrap())
.collect();
assert_eq!(names, vec![db_name1, db_name2]);
assert_eq!(errors, vec![None, None]);
assert_eq!(
states,
vec![DatabaseState::Initialized, DatabaseState::Initialized]
);
}
#[tokio::test]
@ -894,6 +916,7 @@ async fn test_get_server_status_global_error() {
let status = client.get_server_status().await.unwrap();
if let Some(err) = status.error {
assert!(dbg!(err.message).starts_with("store error:"));
assert!(status.database_statuses.is_empty());
return;
}
@ -930,4 +953,8 @@ async fn test_get_server_status_db_error() {
assert_eq!(db_status.db_name, "my_db");
assert!(dbg!(&db_status.error.as_ref().unwrap().message)
.starts_with("error deserializing database rules from protobuf:"));
assert_eq!(
DatabaseState::from_i32(db_status.state).unwrap(),
DatabaseState::Known
);
}