diff --git a/data_types/src/database_state.rs b/data_types/src/database_state.rs index de05189805..47ec36b074 100644 --- a/data_types/src/database_state.rs +++ b/data_types/src/database_state.rs @@ -4,6 +4,9 @@ pub enum DatabaseStateCode { /// Database is known but nothing is loaded. Known, + /// Database object storage has been found + ObjectStoreFound, + /// Rules are loaded RulesLoaded, @@ -28,6 +31,7 @@ impl DatabaseStateCode { pub fn description(&self) -> &'static str { match self { DatabaseStateCode::Known => "Known", + DatabaseStateCode::ObjectStoreFound => "ObjectStoreFound", DatabaseStateCode::RulesLoaded => "RulesLoaded", DatabaseStateCode::CatalogLoaded => "CatalogLoaded", DatabaseStateCode::RulesLoadError => "RulesLoadError", diff --git a/generated_types/protos/influxdata/iox/management/v1/service.proto b/generated_types/protos/influxdata/iox/management/v1/service.proto index a57d3b5274..7e4dc4c5ed 100644 --- a/generated_types/protos/influxdata/iox/management/v1/service.proto +++ b/generated_types/protos/influxdata/iox/management/v1/service.proto @@ -301,6 +301,9 @@ message DatabaseStatus { // Database is known but nothing is loaded. 
DATABASE_STATE_KNOWN = 1; + // Database object storage has been found + DATABASE_STATE_OBJECT_STORE_FOUND = 8; + // Rules are loaded DATABASE_STATE_RULES_LOADED = 2; diff --git a/generated_types/src/database_state.rs b/generated_types/src/database_state.rs index 8f5393a732..60591f0180 100644 --- a/generated_types/src/database_state.rs +++ b/generated_types/src/database_state.rs @@ -5,6 +5,7 @@ impl From for management::database_status::DatabaseState { fn from(state_code: DatabaseStateCode) -> Self { match state_code { DatabaseStateCode::Known => Self::Known, + DatabaseStateCode::ObjectStoreFound => Self::ObjectStoreFound, DatabaseStateCode::RulesLoaded => Self::RulesLoaded, DatabaseStateCode::CatalogLoaded => Self::CatalogLoaded, DatabaseStateCode::Initialized => Self::Initialized, diff --git a/iox_object_store/src/lib.rs b/iox_object_store/src/lib.rs index 732c7d537d..b9318d82ab 100644 --- a/iox_object_store/src/lib.rs +++ b/iox_object_store/src/lib.rs @@ -24,7 +24,7 @@ use object_store::{ path::{parsed::DirsAndFileName, ObjectStorePath, Path}, ObjectStore, ObjectStoreApi, Result, }; -use snafu::{ensure, OptionExt, ResultExt, Snafu}; +use snafu::{ensure, ResultExt, Snafu}; use std::{io, sync::Arc}; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; @@ -51,9 +51,6 @@ pub enum IoxObjectStoreError { ))] DatabaseAlreadyExists { name: String, generation: usize }, - #[snafu(display("No active database found in object storage"))] - ActiveDatabaseNotFound, - #[snafu(display("Multiple active databases found in object storage"))] MultipleActiveDatabasesFound, } @@ -85,13 +82,12 @@ impl Generation { } impl IoxObjectStore { - /// List valid, unique, active database names in object storage that could be used with - /// `IoxObjectStore::existing` on this server. 
- // TODO: this lies and lists all databases for now and assumes generation 0 - pub async fn list_active_databases( + /// List database names in object storage that need to be further checked for generations + /// and whether they're marked as deleted or not. + pub async fn list_possible_databases( inner: &ObjectStore, server_id: ServerId, - ) -> Result, usize)>> { + ) -> Result>> { let path = paths::all_databases_path(inner, server_id); let list_result = inner.list_with_delimiter(&path).await?; @@ -109,11 +105,7 @@ impl IoxObjectStore { .log_if_error("invalid database directory") .ok()?; - // TODO: Check each generation directory and only return this database if there - // is a generation directory that does not contain a tombstone file, otherwise - // there is not actually an active database for this name - // See: https://github.com/influxdata/influxdb_iox/issues/2197 - Some((db_name, 0)) + Some(db_name) }) .collect()) } @@ -208,7 +200,7 @@ impl IoxObjectStore { inner: Arc, server_id: ServerId, database_name: &DatabaseName<'static>, - ) -> Result { + ) -> Result, IoxObjectStoreError> { let root_path = RootPath::new(&inner, server_id, database_name); let generations = Self::list_generations(&inner, &root_path) @@ -217,13 +209,22 @@ impl IoxObjectStore { let mut active_generations = generations.iter().filter(|g| g.is_active()); - let active = active_generations.next().context(ActiveDatabaseNotFound)?; + let active = match active_generations.next() { + Some(a) => a, + None => return Ok(None), + }; + ensure!( active_generations.next().is_none(), MultipleActiveDatabasesFound ); - Ok(Self::existing(inner, server_id, database_name, active.id)) + Ok(Some(Self::existing( + inner, + server_id, + database_name, + active.id, + ))) } /// Look in object storage for an existing database with this name and a non-deleted diff --git a/server/src/database.rs b/server/src/database.rs index cad3f384b3..edbae2506f 100644 --- a/server/src/database.rs +++ b/server/src/database.rs @@ 
-21,7 +21,7 @@ use observability_deps::tracing::{error, info, warn}; use parking_lot::RwLock; use parquet_file::catalog::PreservedCatalog; use persistence_windows::checkpoint::ReplayPlan; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt, ResultExt, Snafu}; use std::{future::Future, sync::Arc, time::Duration}; use tokio::{sync::Notify, task::JoinError}; use tokio_util::sync::CancellationToken; @@ -90,7 +90,6 @@ pub struct Database { pub struct DatabaseConfig { pub name: DatabaseName<'static>, pub server_id: ServerId, - pub generation_id: usize, pub wipe_catalog_on_error: bool, pub skip_replay: bool, } @@ -101,16 +100,8 @@ impl Database { /// This is backed by an existing database, which was [created](Self::create) some time in the /// past. pub fn new(application: Arc, config: DatabaseConfig) -> Self { - let iox_object_store = Arc::new(IoxObjectStore::existing( - Arc::clone(application.object_store()), - config.server_id, - &config.name, - config.generation_id, - )); - info!( db_name=%config.name, - store_prefix=%iox_object_store.debug_database_path(), "new database" ); @@ -120,7 +111,6 @@ impl Database { shutdown: Default::default(), state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))), state_notify: Default::default(), - iox_object_store, }); let handle = tokio::spawn(background_worker(Arc::clone(&shared))); @@ -193,6 +183,11 @@ impl Database { self.shared.state.read().rules() } + /// Returns the IoxObjectStore if it has been found + pub fn iox_object_store(&self) -> Option> { + self.shared.state.read().iox_object_store() + } + /// Gets access to an initialized `Db` pub fn initialized_db(&self) -> Option> { self.shared @@ -202,10 +197,6 @@ impl Database { .map(|state| Arc::clone(&state.db)) } - pub fn iox_object_store(&self) -> Arc { - Arc::clone(&self.shared.iox_object_store) - } - /// Returns Ok(()) when the Database is initialized, or the error /// if one is encountered pub async fn wait_for_init(&self) -> Result<(), Arc> { @@ 
-218,10 +209,12 @@ impl Database { // the notification being fired, and this task waking up match &**self.shared.state.read() { DatabaseState::Known(_) + | DatabaseState::ObjectStoreFound(_) | DatabaseState::RulesLoaded(_) | DatabaseState::CatalogLoaded(_) => {} // Non-terminal state DatabaseState::Initialized(_) => return Ok(()), - DatabaseState::RulesLoadError(_, e) + DatabaseState::ObjectStoreLookupError(_, e) + | DatabaseState::RulesLoadError(_, e) | DatabaseState::CatalogLoadError(_, e) | DatabaseState::ReplayError(_, e) => return Err(Arc::clone(e)), } @@ -259,7 +252,7 @@ impl Database { Ok(async move { let db_name = &shared.config.name; - PreservedCatalog::wipe(&shared.iox_object_store) + PreservedCatalog::wipe(&current_state.iox_object_store) .await .map_err(Box::new) .context(WipePreservedCatalog { db_name })?; @@ -349,9 +342,6 @@ struct DatabaseShared { /// Notify that the database state has changed state_notify: Notify, - - /// The object store interface for this database - iox_object_store: Arc, } /// The background worker for `Database` - there should only ever be one @@ -430,6 +420,7 @@ async fn initialize_database(shared: &DatabaseShared) { DatabaseState::Initialized(_) => break, // Can perform work DatabaseState::Known(_) + | DatabaseState::ObjectStoreFound(_) | DatabaseState::RulesLoaded(_) | DatabaseState::CatalogLoaded(_) => { match state.try_freeze() { @@ -442,7 +433,8 @@ async fn initialize_database(shared: &DatabaseShared) { } } // Operator intervention required - DatabaseState::RulesLoadError(_, e) + DatabaseState::ObjectStoreLookupError(_, e) + | DatabaseState::RulesLoadError(_, e) | DatabaseState::CatalogLoadError(_, e) | DatabaseState::ReplayError(_, e) => { error!(%db_name, %e, %state, "database in error state - operator intervention required"); @@ -466,6 +458,10 @@ async fn initialize_database(shared: &DatabaseShared) { // Try to advance to the next state let next_state = match state { DatabaseState::Known(state) => match 
state.advance(shared).await { + Ok(state) => DatabaseState::ObjectStoreFound(state), + Err(e) => DatabaseState::ObjectStoreLookupError(state, Arc::new(e)), + }, + DatabaseState::ObjectStoreFound(state) => match state.advance(shared).await { Ok(state) => DatabaseState::RulesLoaded(state), Err(e) => DatabaseState::RulesLoadError(state, Arc::new(e)), }, @@ -494,6 +490,17 @@ async fn initialize_database(shared: &DatabaseShared) { /// Errors encountered during initialization of a database #[derive(Debug, Snafu)] pub enum InitError { + #[snafu(display( + "error finding active generation directory in object storage: {}", + source + ))] + ObjectStoreLookup { + source: iox_object_store::IoxObjectStoreError, + }, + + #[snafu(display("no active generation directory found, not loading"))] + NoActiveDatabase, + #[snafu(display("error fetching rules: {}", source))] RulesFetch { source: object_store::Error }, @@ -545,12 +552,14 @@ pub enum InitError { #[derive(Debug, Clone)] enum DatabaseState { Known(DatabaseStateKnown), + ObjectStoreFound(DatabaseStateObjectStoreFound), RulesLoaded(DatabaseStateRulesLoaded), CatalogLoaded(DatabaseStateCatalogLoaded), Initialized(DatabaseStateInitialized), - RulesLoadError(DatabaseStateKnown, Arc), + ObjectStoreLookupError(DatabaseStateKnown, Arc), + RulesLoadError(DatabaseStateObjectStoreFound, Arc), CatalogLoadError(DatabaseStateRulesLoaded, Arc), ReplayError(DatabaseStateCatalogLoaded, Arc), } @@ -565,10 +574,12 @@ impl DatabaseState { fn state_code(&self) -> DatabaseStateCode { match self { DatabaseState::Known(_) => DatabaseStateCode::Known, + DatabaseState::ObjectStoreFound(_) => DatabaseStateCode::ObjectStoreFound, DatabaseState::RulesLoaded(_) => DatabaseStateCode::RulesLoaded, DatabaseState::CatalogLoaded(_) => DatabaseStateCode::CatalogLoaded, DatabaseState::Initialized(_) => DatabaseStateCode::Initialized, - DatabaseState::RulesLoadError(_, _) => DatabaseStateCode::Known, + DatabaseState::ObjectStoreLookupError(_, _) => 
DatabaseStateCode::Known, + DatabaseState::RulesLoadError(_, _) => DatabaseStateCode::ObjectStoreFound, DatabaseState::CatalogLoadError(_, _) => DatabaseStateCode::RulesLoaded, DatabaseState::ReplayError(_, _) => DatabaseStateCode::CatalogLoaded, } @@ -577,10 +588,12 @@ impl DatabaseState { fn error(&self) -> Option<&Arc> { match self { DatabaseState::Known(_) + | DatabaseState::ObjectStoreFound(_) | DatabaseState::RulesLoaded(_) | DatabaseState::CatalogLoaded(_) | DatabaseState::Initialized(_) => None, - DatabaseState::RulesLoadError(_, e) + DatabaseState::ObjectStoreLookupError(_, e) + | DatabaseState::RulesLoadError(_, e) | DatabaseState::CatalogLoadError(_, e) | DatabaseState::ReplayError(_, e) => Some(e), } @@ -588,7 +601,10 @@ impl DatabaseState { fn rules(&self) -> Option> { match self { - DatabaseState::Known(_) | DatabaseState::RulesLoadError(_, _) => None, + DatabaseState::Known(_) + | DatabaseState::ObjectStoreFound(_) + | DatabaseState::ObjectStoreLookupError(_, _) + | DatabaseState::RulesLoadError(_, _) => None, DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => { Some(Arc::clone(&state.rules)) } @@ -599,6 +615,22 @@ impl DatabaseState { } } + fn iox_object_store(&self) -> Option> { + match self { + DatabaseState::Known(_) + | DatabaseState::ObjectStoreLookupError(_, _) + | DatabaseState::RulesLoadError(_, _) => None, + DatabaseState::ObjectStoreFound(state) => Some(Arc::clone(&state.iox_object_store)), + DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => { + Some(Arc::clone(&state.iox_object_store)) + } + DatabaseState::CatalogLoaded(state) | DatabaseState::ReplayError(state, _) => { + Some(state.db.iox_object_store()) + } + DatabaseState::Initialized(state) => Some(state.db.iox_object_store()), + } + } + fn get_initialized(&self) -> Option<&DatabaseStateInitialized> { match self { DatabaseState::Initialized(state) => Some(state), @@ -611,13 +643,39 @@ impl DatabaseState { struct 
DatabaseStateKnown {} impl DatabaseStateKnown { + /// Find active object storage for this database + async fn advance( + &self, + shared: &DatabaseShared, + ) -> Result { + let iox_object_store = IoxObjectStore::find_existing( + Arc::clone(shared.application.object_store()), + shared.config.server_id, + &shared.config.name, + ) + .await + .context(ObjectStoreLookup)? + .context(NoActiveDatabase)?; + + Ok(DatabaseStateObjectStoreFound { + iox_object_store: Arc::new(iox_object_store), + }) + } +} + +#[derive(Debug, Clone)] +struct DatabaseStateObjectStoreFound { + iox_object_store: Arc, +} + +impl DatabaseStateObjectStoreFound { /// Load database rules from object storage async fn advance( &self, shared: &DatabaseShared, ) -> Result { // TODO: Retry this - let bytes = shared + let bytes = self .iox_object_store .get_database_rules_file() .await @@ -635,6 +693,7 @@ impl DatabaseStateKnown { Ok(DatabaseStateRulesLoaded { rules: Arc::new(rules), + iox_object_store: Arc::clone(&self.iox_object_store), }) } } @@ -642,6 +701,7 @@ impl DatabaseStateKnown { #[derive(Debug, Clone)] struct DatabaseStateRulesLoaded { rules: Arc, + iox_object_store: Arc, } impl DatabaseStateRulesLoaded { @@ -652,7 +712,7 @@ impl DatabaseStateRulesLoaded { ) -> Result { let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog( shared.config.name.as_str(), - Arc::clone(&shared.iox_object_store), + Arc::clone(&self.iox_object_store), shared.config.server_id, Arc::clone(shared.application.metric_registry()), shared.config.wipe_catalog_on_error, @@ -670,7 +730,7 @@ impl DatabaseStateRulesLoaded { let database_to_commit = DatabaseToCommit { server_id: shared.config.server_id, - iox_object_store: Arc::clone(&shared.iox_object_store), + iox_object_store: Arc::clone(&self.iox_object_store), exec: Arc::clone(shared.application.executor()), rules: Arc::clone(&self.rules), preserved_catalog, @@ -718,7 +778,7 @@ struct DatabaseStateInitialized { db: Arc, } -/// Persist the the 
`DatabaseRules` given the `Database` store prefix +/// Persist the `DatabaseRules` given the database object storage pub(super) async fn persist_database_rules( object_store: &IoxObjectStore, rules: DatabaseRules, @@ -756,7 +816,6 @@ mod tests { DatabaseConfig { name: DatabaseName::new("test").unwrap(), server_id: ServerId::new(NonZeroU32::new(23).unwrap()), - generation_id: 0, wipe_catalog_on_error: false, skip_replay: false, }, @@ -764,7 +823,11 @@ mod tests { // Should have failed to load (this isn't important to the test) let err = database.wait_for_init().await.unwrap_err(); - assert!(matches!(err.as_ref(), InitError::RulesFetch { .. })); + assert!( + matches!(err.as_ref(), InitError::NoActiveDatabase { .. }), + "got {:?}", + err + ); // Database should be running assert!(database.join().now_or_never().is_none()); @@ -855,7 +918,6 @@ mod tests { let db_config = DatabaseConfig { name: db_name, server_id, - generation_id, wipe_catalog_on_error: false, skip_replay: false, }; diff --git a/server/src/db.rs b/server/src/db.rs index a6bb0f67d7..fc043ade28 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -537,6 +537,11 @@ impl Db { Ok(new_rules) } + /// Return the current database's object storage + pub fn iox_object_store(&self) -> Arc { + Arc::clone(&self.iox_object_store) + } + /// Rolls over the active chunk in the database's specified /// partition. Returns the previously open (now closed) Chunk if /// there was any. 
diff --git a/server/src/lib.rs b/server/src/lib.rs index 95a556a9cc..213137385d 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -684,7 +684,7 @@ where initialized.server_id }; - let generation_id = + let _generation_id = Database::create(Arc::clone(&self.shared.application), rules, server_id) .await .context(CannotCreateDatabase)?; @@ -702,7 +702,6 @@ where DatabaseConfig { name: db_name, server_id, - generation_id, wipe_catalog_on_error: false, skip_replay: false, }, @@ -955,7 +954,7 @@ where let rules = db.update_rules(update).map_err(UpdateError::Closure)?; // TODO: Handle failure - persist_database_rules(&database.iox_object_store(), rules.as_ref().clone()) + persist_database_rules(&db.iox_object_store(), rules.as_ref().clone()) .await .context(CannotPersistUpdatedRules)?; Ok(rules) @@ -1115,7 +1114,7 @@ async fn maybe_initialize_server(shared: &ServerShared) { (init_ready, handle) }; - let maybe_databases = IoxObjectStore::list_active_databases( + let maybe_databases = IoxObjectStore::list_possible_databases( shared.application.object_store(), init_ready.server_id, ) @@ -1128,14 +1127,13 @@ async fn maybe_initialize_server(shared: &ServerShared) { databases: HashMap::with_capacity(databases.len()), }; - for (db_name, generation_id) in databases { + for db_name in databases { state .new_database( shared, DatabaseConfig { name: db_name, server_id: init_ready.server_id, - generation_id, wipe_catalog_on_error: init_ready.wipe_catalog_on_error, skip_replay: init_ready.skip_replay_and_seek_instead, }, @@ -1310,6 +1308,7 @@ mod tests { let read_data = bananas .iox_object_store() + .unwrap() .get_database_rules_file() .await .unwrap(); @@ -1415,6 +1414,7 @@ mod tests { bananas .iox_object_store() + .unwrap() .delete_database_rules_file() .await .expect("cannot delete rules file"); @@ -1963,15 +1963,18 @@ mod tests { // tamper store to break one database rules_broken .iox_object_store() + .unwrap() .put_database_rules_file(Bytes::from("x")) .await .unwrap(); 
- let (preserved_catalog, _catalog) = - PreservedCatalog::load::(catalog_broken.iox_object_store(), ()) - .await - .unwrap() - .unwrap(); + let (preserved_catalog, _catalog) = PreservedCatalog::load::( + catalog_broken.iox_object_store().unwrap(), + (), + ) + .await + .unwrap() + .unwrap(); parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog) .await; @@ -1979,6 +1982,7 @@ mod tests { rules_broken .iox_object_store() + .unwrap() .get_database_rules_file() .await .unwrap(); @@ -2031,9 +2035,11 @@ mod tests { .to_string(), "error wiping preserved catalog: database (db_existing) in invalid state (Initialized) for transition (WipePreservedCatalog)" ); - assert!(PreservedCatalog::exists(&existing.iox_object_store(),) - .await - .unwrap()); + assert!( + PreservedCatalog::exists(&existing.iox_object_store().unwrap()) + .await + .unwrap() + ); // 2. cannot wipe non-existent DB assert!(matches!( @@ -2072,7 +2078,8 @@ mod tests { .wipe_preserved_catalog(&db_name_rules_broken) .unwrap_err() .to_string(), - "error wiping preserved catalog: database (db_broken_rules) in invalid state (Known) for transition (WipePreservedCatalog)" + "error wiping preserved catalog: database (db_broken_rules) in invalid state \ + (ObjectStoreFound) for transition (WipePreservedCatalog)" ); // 4. 
wipe DB with broken catalog, this will bring the DB back to life @@ -2092,9 +2099,11 @@ mod tests { database.wait_for_init().await.unwrap(); - assert!(PreservedCatalog::exists(&catalog_broken.iox_object_store()) - .await - .unwrap()); + assert!( + PreservedCatalog::exists(&catalog_broken.iox_object_store().unwrap()) + .await + .unwrap() + ); assert!(database.init_error().is_none()); assert!(server.db(&db_name_catalog_broken).is_ok()); @@ -2118,9 +2127,11 @@ mod tests { .to_string(), "error wiping preserved catalog: database (db_created) in invalid state (Initialized) for transition (WipePreservedCatalog)" ); - assert!(PreservedCatalog::exists(&created.iox_object_store()) - .await - .unwrap()); + assert!( + PreservedCatalog::exists(&created.iox_object_store().unwrap()) + .await + .unwrap() + ); } #[tokio::test]