feat: allow initialization of a single DB to fail
- keep errors encountered during DB init
- treat failed DB inits as existing DBs
- effectively poison failed DBs (for now there is no way to recover except by restarting the server)
pull/24376/head
parent
0b5552f131
commit
2ea24b6467
|
@ -67,7 +67,7 @@ impl Config {
|
|||
db_name: DatabaseName<'static>,
|
||||
) -> Result<CreateDatabaseHandle<'_>> {
|
||||
let mut state = self.state.write().expect("mutex poisoned");
|
||||
if state.reservations.contains(&db_name) || state.databases.contains_key(&db_name) {
|
||||
if state.reservations.contains(&db_name) {
|
||||
return Err(Error::DatabaseAlreadyExists {
|
||||
db_name: db_name.to_string(),
|
||||
});
|
||||
|
@ -87,7 +87,7 @@ impl Config {
|
|||
|
||||
pub(crate) fn db_names_sorted(&self) -> Vec<DatabaseName<'static>> {
|
||||
let state = self.state.read().expect("mutex poisoned");
|
||||
state.databases.keys().cloned().collect()
|
||||
state.reservations.iter().cloned().collect()
|
||||
}
|
||||
|
||||
pub(crate) fn update_db_rules<F, E>(
|
||||
|
@ -144,10 +144,7 @@ impl Config {
|
|||
preserved_catalog: PreservedCatalog<Catalog>,
|
||||
) {
|
||||
let mut state = self.state.write().expect("mutex poisoned");
|
||||
let name = state
|
||||
.reservations
|
||||
.take(&rules.name)
|
||||
.expect("reservation doesn't exist");
|
||||
let name = rules.name.clone();
|
||||
|
||||
if self.shutdown.is_cancelled() {
|
||||
error!("server is shutting down");
|
||||
|
@ -335,6 +332,7 @@ impl<'a> CreateDatabaseHandle<'a> {
|
|||
) -> Result<()> {
|
||||
let db_name = self.db_name.take().expect("not committed");
|
||||
if db_name != rules.name {
|
||||
self.config.rollback(&db_name);
|
||||
return Err(Error::RulesDatabaseNameMismatch {
|
||||
actual: rules.name.to_string(),
|
||||
expected: db_name.to_string(),
|
||||
|
@ -346,6 +344,10 @@ impl<'a> CreateDatabaseHandle<'a> {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn abort_without_rollback(mut self) {
|
||||
self.db_name.take();
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Drop for CreateDatabaseHandle<'a> {
|
||||
|
|
|
@ -1,25 +1,30 @@
|
|||
//! Routines to initialize a server.
|
||||
use data_types::{server_id::ServerId, DatabaseName};
|
||||
use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
|
||||
use futures::TryStreamExt;
|
||||
use generated_types::database_rules::decode_database_rules;
|
||||
use internal_types::once::OnceNonZeroU32;
|
||||
use metrics::MetricRegistry;
|
||||
use object_store::{
|
||||
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
|
||||
ObjectStore, ObjectStoreApi,
|
||||
};
|
||||
use observability_deps::tracing::{debug, error, info, warn};
|
||||
use parking_lot::Mutex;
|
||||
use parquet_file::catalog::PreservedCatalog;
|
||||
use query::exec::Executor;
|
||||
use snafu::{OptionExt, ResultExt, Snafu};
|
||||
use std::sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
Arc,
|
||||
},
|
||||
};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::{
|
||||
config::{Config, DB_RULES_FILE_NAME},
|
||||
db::load_or_create_preserved_catalog,
|
||||
db::{catalog::Catalog, load_or_create_preserved_catalog},
|
||||
DatabaseError,
|
||||
};
|
||||
|
||||
|
@ -94,6 +99,9 @@ pub struct InitStatus {
|
|||
|
||||
/// Error occurred during generic server init (e.g. listing store content).
|
||||
error_generic: Mutex<Option<Arc<Error>>>,
|
||||
|
||||
/// Errors that occurred during some DB init.
|
||||
errors_databases: Arc<Mutex<HashMap<String, Arc<Error>>>>,
|
||||
}
|
||||
|
||||
impl InitStatus {
|
||||
|
@ -105,6 +113,7 @@ impl InitStatus {
|
|||
// Always set semaphore permits to `1`, see design comments in `Server::initialize_semaphore`.
|
||||
initialize_semaphore: Semaphore::new(1),
|
||||
error_generic: Default::default(),
|
||||
errors_databases: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,6 +138,20 @@ impl InitStatus {
|
|||
guard.clone()
|
||||
}
|
||||
|
||||
/// List all databases with errors in sorted order.
|
||||
pub fn databases_with_errors(&self) -> Vec<String> {
|
||||
let guard = self.errors_databases.lock();
|
||||
let mut names: Vec<_> = guard.keys().cloned().collect();
|
||||
names.sort();
|
||||
names
|
||||
}
|
||||
|
||||
/// Error that occurred during initialization of a specific database.
|
||||
pub fn error_database(&self, db_name: &str) -> Option<Arc<Error>> {
|
||||
let guard = self.errors_databases.lock();
|
||||
guard.get(db_name).cloned()
|
||||
}
|
||||
|
||||
/// Loads the database configurations based on the databases in the
|
||||
/// object store. Any databases in the config already won't be
|
||||
/// replaced.
|
||||
|
@ -203,20 +226,39 @@ impl InitStatus {
|
|||
let handles: Vec<_> = list_result
|
||||
.common_prefixes
|
||||
.into_iter()
|
||||
.map(|mut path| {
|
||||
.filter_map(|mut path| {
|
||||
let store = Arc::clone(&store);
|
||||
let config = Arc::clone(&config);
|
||||
let exec = Arc::clone(&exec);
|
||||
let errors_databases = Arc::clone(&self.errors_databases);
|
||||
|
||||
path.set_file_name(DB_RULES_FILE_NAME);
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(e) =
|
||||
Self::load_database_config(server_id, store, config, exec, path).await
|
||||
{
|
||||
error!(%e, "cannot load database");
|
||||
match db_name_from_rules_path(&path) {
|
||||
Ok(db_name) => {
|
||||
let handle = tokio::task::spawn(async move {
|
||||
if let Err(e) = Self::load_database_config(
|
||||
server_id,
|
||||
store,
|
||||
config,
|
||||
exec,
|
||||
path,
|
||||
db_name.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!(%e, "cannot load database");
|
||||
let mut guard = errors_databases.lock();
|
||||
guard.insert(db_name.to_string(), Arc::new(e));
|
||||
}
|
||||
});
|
||||
Some(handle)
|
||||
}
|
||||
})
|
||||
Err(e) => {
|
||||
error!(%e, "invalid database path");
|
||||
None
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
@ -231,28 +273,59 @@ impl InitStatus {
|
|||
config: Arc<Config>,
|
||||
exec: Arc<Executor>,
|
||||
path: Path,
|
||||
db_name: DatabaseName<'static>,
|
||||
) -> Result<()> {
|
||||
// Parse DB name from path before doing anything else, so we can already reserve the DB name. That way the name
|
||||
// stays reserved even when we cannot decode the rules file (e.g. due to a broken IO).
|
||||
let path_parsed: DirsAndFileName = path.clone().into();
|
||||
let db_name = path_parsed
|
||||
.directories
|
||||
.last()
|
||||
.map(|part| part.encoded().to_string())
|
||||
.unwrap_or_else(String::new);
|
||||
let db_name = DatabaseName::new(db_name).context(DatabaseNameError)?;
|
||||
// Reserve name before expensive IO (e.g. loading the preserved catalog)
|
||||
let handle = config
|
||||
.create_db(db_name)
|
||||
.map_err(Box::new)
|
||||
.context(CreateDbError)?;
|
||||
|
||||
let metrics_registry = config.metrics_registry();
|
||||
|
||||
match Self::load_database_config_with_handle(
|
||||
server_id,
|
||||
Arc::clone(&store),
|
||||
metrics_registry,
|
||||
path,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some((rules, preserved_catalog))) => {
|
||||
// successfully loaded
|
||||
handle
|
||||
.commit(server_id, store, exec, preserved_catalog, rules)
|
||||
.map_err(Box::new)
|
||||
.context(CreateDbError)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(None) => {
|
||||
// no DB there, drop handle to initiate rollback
|
||||
drop(handle);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
// abort transaction but keep DB registered
|
||||
handle.abort_without_rollback();
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_database_config_with_handle(
|
||||
server_id: ServerId,
|
||||
store: Arc<ObjectStore>,
|
||||
metrics_registry: Arc<MetricRegistry>,
|
||||
path: Path,
|
||||
) -> Result<Option<(DatabaseRules, PreservedCatalog<Catalog>)>> {
|
||||
let serialized_rules = loop {
|
||||
match get_database_config_bytes(&path, &store).await {
|
||||
Ok(data) => break data,
|
||||
Err(e) => {
|
||||
if let Error::NoDatabaseConfigError { location } = &e {
|
||||
warn!(?location, "{}", e);
|
||||
return Ok(());
|
||||
return Ok(None);
|
||||
}
|
||||
error!(
|
||||
"error getting database config {:?} from object store: {}",
|
||||
|
@ -270,18 +343,13 @@ impl InitStatus {
|
|||
rules.db_name(),
|
||||
Arc::clone(&store),
|
||||
server_id,
|
||||
config.metrics_registry(),
|
||||
metrics_registry,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| Box::new(e) as _)
|
||||
.context(CatalogLoadError)?;
|
||||
|
||||
handle
|
||||
.commit(server_id, store, exec, preserved_catalog, rules)
|
||||
.map_err(Box::new)
|
||||
.context(CreateDbError)?;
|
||||
|
||||
Ok(())
|
||||
Ok(Some((rules, preserved_catalog)))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -321,6 +389,17 @@ async fn get_database_config_bytes(
|
|||
get_store_bytes(location, store).await
|
||||
}
|
||||
|
||||
/// Helper to extract the DB name from the rules file path.
|
||||
fn db_name_from_rules_path(path: &Path) -> Result<DatabaseName<'static>> {
|
||||
let path_parsed: DirsAndFileName = path.clone().into();
|
||||
let db_name = path_parsed
|
||||
.directories
|
||||
.last()
|
||||
.map(|part| part.encoded().to_string())
|
||||
.unwrap_or_else(String::new);
|
||||
DatabaseName::new(db_name).context(DatabaseNameError)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use object_store::{memory::InMemory, path::ObjectStorePath};
|
||||
|
|
|
@ -449,6 +449,16 @@ where
|
|||
self.init_status.error_generic()
|
||||
}
|
||||
|
||||
/// List all databases with errors in sorted order.
|
||||
pub fn databases_with_errors(&self) -> Vec<String> {
|
||||
self.init_status.databases_with_errors()
|
||||
}
|
||||
|
||||
/// Error that occurred during initialization of a specific database.
|
||||
pub fn error_database(&self, db_name: &str) -> Option<Arc<crate::init::Error>> {
|
||||
self.init_status.error_database(db_name)
|
||||
}
|
||||
|
||||
/// Require that server is loaded. Databases are loaded and server is ready to read/write.
|
||||
fn require_initialized(&self) -> Result<ServerId> {
|
||||
// since a server ID is the pre-requirement for init, check this first
|
||||
|
@ -983,10 +993,12 @@ mod tests {
|
|||
use std::{
|
||||
collections::BTreeMap,
|
||||
convert::TryFrom,
|
||||
sync::Arc,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures::TryStreamExt;
|
||||
use generated_types::database_rules::decode_database_rules;
|
||||
use snafu::Snafu;
|
||||
|
@ -1734,10 +1746,89 @@ mod tests {
|
|||
|
||||
let manager = TestConnectionManager::new();
|
||||
let config = config_with_store(store);
|
||||
let server = Arc::new(Server::new(manager, config));
|
||||
let server = Server::new(manager, config);
|
||||
|
||||
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
assert!(dbg!(server.error_generic().unwrap().to_string()).starts_with("store error:"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_error_database() {
|
||||
let store = ObjectStore::new_in_memory(InMemory::new());
|
||||
let server_id = ServerId::try_from(1).unwrap();
|
||||
|
||||
// Create temporary server to create single database
|
||||
let manager = TestConnectionManager::new();
|
||||
let config = config_with_store(store);
|
||||
let store = config.store();
|
||||
|
||||
let server = Server::new(manager, config);
|
||||
server.set_id(server_id).unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
create_simple_database(&server, "foo")
|
||||
.await
|
||||
.expect("failed to create database");
|
||||
let root = server.init_status.root_path(&store).unwrap();
|
||||
server.config.drain().await;
|
||||
drop(server);
|
||||
|
||||
// tamper store
|
||||
let path = object_store_path_for_database_config(&root, &DatabaseName::new("bar").unwrap());
|
||||
let data = Bytes::from("x");
|
||||
let len = data.len();
|
||||
store
|
||||
.put(
|
||||
&path,
|
||||
futures::stream::once(async move { Ok(data) }),
|
||||
Some(len),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// start server
|
||||
let store = Arc::try_unwrap(store).unwrap();
|
||||
store.get(&path).await.unwrap();
|
||||
let manager = TestConnectionManager::new();
|
||||
let config = config_with_store(store);
|
||||
|
||||
let server = Server::new(manager, config);
|
||||
server.set_id(server_id).unwrap();
|
||||
server.maybe_initialize_server().await;
|
||||
|
||||
// generic error MUST NOT be set
|
||||
assert!(server.error_generic().is_none());
|
||||
|
||||
// server is initialized
|
||||
assert!(server.initialized());
|
||||
|
||||
// DB-specific error is set for `bar` but not for `foo`
|
||||
assert_eq!(server.databases_with_errors(), vec!["bar".to_string()]);
|
||||
assert!(dbg!(server.error_database("foo")).is_none());
|
||||
assert!(dbg!(server.error_database("bar").unwrap().to_string())
|
||||
.starts_with("error deserializing database rules from protobuf:"));
|
||||
|
||||
// DB names contain all DBs
|
||||
assert_eq!(
|
||||
server.db_names_sorted(),
|
||||
vec!["bar".to_string(), "foo".to_string()]
|
||||
);
|
||||
|
||||
// can only write to successfully created DBs
|
||||
let lines = parsed_lines("cpu foo=1 10");
|
||||
server
|
||||
.write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME)
|
||||
.await
|
||||
.unwrap();
|
||||
let err = server
|
||||
.write_lines("bar", &lines, ARBITRARY_DEFAULT_TIME)
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert_eq!(err.to_string(), "database not found: bar");
|
||||
|
||||
// creating failed DBs does not work
|
||||
let err = create_simple_database(&server, "bar").await.unwrap_err();
|
||||
assert_eq!(err.to_string(), "database already exists");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue