diff --git a/server/src/config.rs b/server/src/config.rs index 34a5c969fa..9015f1946e 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -67,7 +67,7 @@ impl Config { db_name: DatabaseName<'static>, ) -> Result> { let mut state = self.state.write().expect("mutex poisoned"); - if state.reservations.contains(&db_name) || state.databases.contains_key(&db_name) { + if state.reservations.contains(&db_name) { return Err(Error::DatabaseAlreadyExists { db_name: db_name.to_string(), }); @@ -87,7 +87,7 @@ impl Config { pub(crate) fn db_names_sorted(&self) -> Vec> { let state = self.state.read().expect("mutex poisoned"); - state.databases.keys().cloned().collect() + state.reservations.iter().cloned().collect() } pub(crate) fn update_db_rules( @@ -144,10 +144,7 @@ impl Config { preserved_catalog: PreservedCatalog, ) { let mut state = self.state.write().expect("mutex poisoned"); - let name = state - .reservations - .take(&rules.name) - .expect("reservation doesn't exist"); + let name = rules.name.clone(); if self.shutdown.is_cancelled() { error!("server is shutting down"); @@ -335,6 +332,7 @@ impl<'a> CreateDatabaseHandle<'a> { ) -> Result<()> { let db_name = self.db_name.take().expect("not committed"); if db_name != rules.name { + self.config.rollback(&db_name); return Err(Error::RulesDatabaseNameMismatch { actual: rules.name.to_string(), expected: db_name.to_string(), @@ -346,6 +344,10 @@ impl<'a> CreateDatabaseHandle<'a> { Ok(()) } + + pub(crate) fn abort_without_rollback(mut self) { + self.db_name.take(); + } } impl<'a> Drop for CreateDatabaseHandle<'a> { diff --git a/server/src/init.rs b/server/src/init.rs index 2a86803983..945ba8a32e 100644 --- a/server/src/init.rs +++ b/server/src/init.rs @@ -1,25 +1,30 @@ //! Routines to initialize a server. -use data_types::{server_id::ServerId, DatabaseName}; +use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName}; use futures::TryStreamExt; use generated_types::database_rules::decode_database_rules; use internal_types::once::OnceNonZeroU32; +use metrics::MetricRegistry; use object_store::{ path::{parsed::DirsAndFileName, ObjectStorePath, Path}, ObjectStore, ObjectStoreApi, }; use observability_deps::tracing::{debug, error, info, warn}; use parking_lot::Mutex; +use parquet_file::catalog::PreservedCatalog; use query::exec::Executor; use snafu::{OptionExt, ResultExt, Snafu}; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, +use std::{ + collections::HashMap, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, }; use tokio::sync::Semaphore; use crate::{ config::{Config, DB_RULES_FILE_NAME}, - db::load_or_create_preserved_catalog, + db::{catalog::Catalog, load_or_create_preserved_catalog}, DatabaseError, }; @@ -94,6 +99,9 @@ pub struct InitStatus { /// Error occurred during generic server init (e.g. listing store content). error_generic: Mutex>>, + + /// Errors that occurred during some DB init. + errors_databases: Arc>>>, } impl InitStatus { @@ -105,6 +113,7 @@ impl InitStatus { // Always set semaphore permits to `1`, see design comments in `Server::initialize_semaphore`. initialize_semaphore: Semaphore::new(1), error_generic: Default::default(), + errors_databases: Default::default(), } } @@ -129,6 +138,20 @@ impl InitStatus { guard.clone() } + /// List all databases with errors in sorted order. + pub fn databases_with_errors(&self) -> Vec { + let guard = self.errors_databases.lock(); + let mut names: Vec<_> = guard.keys().cloned().collect(); + names.sort(); + names + } + + /// Error that occurred during initialization of a specific database. + pub fn error_database(&self, db_name: &str) -> Option> { + let guard = self.errors_databases.lock(); + guard.get(db_name).cloned() + } + /// Loads the database configurations based on the databases in the /// object store. Any databases in the config already won't be /// replaced. @@ -203,20 +226,39 @@ impl InitStatus { let handles: Vec<_> = list_result .common_prefixes .into_iter() - .map(|mut path| { + .filter_map(|mut path| { let store = Arc::clone(&store); let config = Arc::clone(&config); let exec = Arc::clone(&exec); + let errors_databases = Arc::clone(&self.errors_databases); path.set_file_name(DB_RULES_FILE_NAME); - tokio::task::spawn(async move { - if let Err(e) = - Self::load_database_config(server_id, store, config, exec, path).await - { - error!(%e, "cannot load database"); + match db_name_from_rules_path(&path) { + Ok(db_name) => { + let handle = tokio::task::spawn(async move { + if let Err(e) = Self::load_database_config( + server_id, + store, + config, + exec, + path, + db_name.clone(), + ) + .await + { + error!(%e, "cannot load database"); + let mut guard = errors_databases.lock(); + guard.insert(db_name.to_string(), Arc::new(e)); + } + }); + Some(handle) } - }) + Err(e) => { + error!(%e, "invalid database path"); + None + } + } }) .collect(); @@ -231,28 +273,59 @@ impl InitStatus { config: Arc, exec: Arc, path: Path, + db_name: DatabaseName<'static>, ) -> Result<()> { - // Parse DB name from path before doing anything else, so we can already reserve the DB name. That way the name - // stays reserved even when we cannot decode the rules file (e.g. due to a broken IO). - let path_parsed: DirsAndFileName = path.clone().into(); - let db_name = path_parsed - .directories - .last() - .map(|part| part.encoded().to_string()) - .unwrap_or_else(String::new); - let db_name = DatabaseName::new(db_name).context(DatabaseNameError)?; + // Reserve name before expensive IO (e.g. loading the preserved catalog) let handle = config .create_db(db_name) .map_err(Box::new) .context(CreateDbError)?; + let metrics_registry = config.metrics_registry(); + + match Self::load_database_config_with_handle( + server_id, + Arc::clone(&store), + metrics_registry, + path, + ) + .await + { + Ok(Some((rules, preserved_catalog))) => { + // successfully loaded + handle + .commit(server_id, store, exec, preserved_catalog, rules) + .map_err(Box::new) + .context(CreateDbError)?; + + Ok(()) + } + Ok(None) => { + // no DB there, drop handle to initiate rollback + drop(handle); + Ok(()) + } + Err(e) => { + // abort transaction but keep DB registered + handle.abort_without_rollback(); + Err(e) + } + } + } + + async fn load_database_config_with_handle( + server_id: ServerId, + store: Arc, + metrics_registry: Arc, + path: Path, + ) -> Result)>> { let serialized_rules = loop { match get_database_config_bytes(&path, &store).await { Ok(data) => break data, Err(e) => { if let Error::NoDatabaseConfigError { location } = &e { warn!(?location, "{}", e); - return Ok(()); + return Ok(None); } error!( "error getting database config {:?} from object store: {}", @@ -270,18 +343,13 @@ impl InitStatus { rules.db_name(), Arc::clone(&store), server_id, - config.metrics_registry(), + metrics_registry, ) .await .map_err(|e| Box::new(e) as _) .context(CatalogLoadError)?; - handle - .commit(server_id, store, exec, preserved_catalog, rules) - .map_err(Box::new) - .context(CreateDbError)?; - - Ok(()) + Ok(Some((rules, preserved_catalog))) } } @@ -321,6 +389,17 @@ async fn get_database_config_bytes( get_store_bytes(location, store).await } +/// Helper to extract the DB name from the rules file path. +fn db_name_from_rules_path(path: &Path) -> Result> { + let path_parsed: DirsAndFileName = path.clone().into(); + let db_name = path_parsed + .directories + .last() + .map(|part| part.encoded().to_string()) + .unwrap_or_else(String::new); + DatabaseName::new(db_name).context(DatabaseNameError) +} + #[cfg(test)] mod tests { use object_store::{memory::InMemory, path::ObjectStorePath}; diff --git a/server/src/lib.rs b/server/src/lib.rs index 2082c5cb73..593e140b0b 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -449,6 +449,16 @@ where self.init_status.error_generic() } + /// List all databases with errors in sorted order. + pub fn databases_with_errors(&self) -> Vec { + self.init_status.databases_with_errors() + } + + /// Error that occurred during initialization of a specific database. + pub fn error_database(&self, db_name: &str) -> Option> { + self.init_status.error_database(db_name) + } + /// Require that server is loaded. Databases are loaded and server is ready to read/write. fn require_initialized(&self) -> Result { // since a server ID is the pre-requirement for init, check this first @@ -983,10 +993,12 @@ mod tests { use std::{ collections::BTreeMap, convert::TryFrom, + sync::Arc, time::{Duration, Instant}, }; use async_trait::async_trait; + use bytes::Bytes; use futures::TryStreamExt; use generated_types::database_rules::decode_database_rules; use snafu::Snafu; @@ -1734,10 +1746,89 @@ mod tests { let manager = TestConnectionManager::new(); let config = config_with_store(store); - let server = Arc::new(Server::new(manager, config)); + let server = Server::new(manager, config); server.set_id(ServerId::try_from(1).unwrap()).unwrap(); server.maybe_initialize_server().await; assert!(dbg!(server.error_generic().unwrap().to_string()).starts_with("store error:")); } + + #[tokio::test] + async fn init_error_database() { + let store = ObjectStore::new_in_memory(InMemory::new()); + let server_id = ServerId::try_from(1).unwrap(); + + // Create temporary server to create single database + let manager = TestConnectionManager::new(); + let config = config_with_store(store); + let store = config.store(); + + let server = Server::new(manager, config); + server.set_id(server_id).unwrap(); + server.maybe_initialize_server().await; + + create_simple_database(&server, "foo") + .await + .expect("failed to create database"); + let root = server.init_status.root_path(&store).unwrap(); + server.config.drain().await; + drop(server); + + // tamper store + let path = object_store_path_for_database_config(&root, &DatabaseName::new("bar").unwrap()); + let data = Bytes::from("x"); + let len = data.len(); + store + .put( + &path, + futures::stream::once(async move { Ok(data) }), + Some(len), + ) + .await + .unwrap(); + + // start server + let store = Arc::try_unwrap(store).unwrap(); + store.get(&path).await.unwrap(); + let manager = TestConnectionManager::new(); + let config = config_with_store(store); + + let server = Server::new(manager, config); + server.set_id(server_id).unwrap(); + server.maybe_initialize_server().await; + + // generic error MUST NOT be set + assert!(server.error_generic().is_none()); + + // server is initialized + assert!(server.initialized()); + + // DB-specific error is set for `bar` but not for `foo` + assert_eq!(server.databases_with_errors(), vec!["bar".to_string()]); + assert!(dbg!(server.error_database("foo")).is_none()); + assert!(dbg!(server.error_database("bar").unwrap().to_string()) + .starts_with("error deserializing database rules from protobuf:")); + + // DB names contain all DBs + assert_eq!( + server.db_names_sorted(), + vec!["bar".to_string(), "foo".to_string()] + ); + + // can only write to successfully created DBs + let lines = parsed_lines("cpu foo=1 10"); + server + .write_lines("foo", &lines, ARBITRARY_DEFAULT_TIME) + .await + .unwrap(); + let err = server + .write_lines("bar", &lines, ARBITRARY_DEFAULT_TIME) + .await + .unwrap_err(); + assert_eq!(err.to_string(), "database not found: bar"); + + // creating failed DBs does not work + let err = create_simple_database(&server, "bar").await.unwrap_err(); + assert_eq!(err.to_string(), "database already exists"); + } }