refactor: Have server determine database generation from object store

pull/24376/head
Carol (Nichols || Goulding) 2021-08-23 13:03:24 -04:00
parent 1f0e37c9d1
commit c7eceac8a3
7 changed files with 157 additions and 70 deletions

View File

@ -4,6 +4,9 @@ pub enum DatabaseStateCode {
/// Database is known but nothing is loaded.
Known,
/// Database object storage has been found
ObjectStoreFound,
/// Rules are loaded
RulesLoaded,
@ -28,6 +31,7 @@ impl DatabaseStateCode {
pub fn description(&self) -> &'static str {
match self {
DatabaseStateCode::Known => "Known",
DatabaseStateCode::ObjectStoreFound => "ObjectStoreFound",
DatabaseStateCode::RulesLoaded => "RulesLoaded",
DatabaseStateCode::CatalogLoaded => "CatalogLoaded",
DatabaseStateCode::RulesLoadError => "RulesLoadError",

View File

@ -301,6 +301,9 @@ message DatabaseStatus {
// Database is known but nothing is loaded.
DATABASE_STATE_KNOWN = 1;
// Database object storage has been found
DATABASE_STATE_OBJECT_STORE_FOUND = 8;
// Rules are loaded
DATABASE_STATE_RULES_LOADED = 2;

View File

@ -5,6 +5,7 @@ impl From<DatabaseStateCode> for management::database_status::DatabaseState {
fn from(state_code: DatabaseStateCode) -> Self {
match state_code {
DatabaseStateCode::Known => Self::Known,
DatabaseStateCode::ObjectStoreFound => Self::ObjectStoreFound,
DatabaseStateCode::RulesLoaded => Self::RulesLoaded,
DatabaseStateCode::CatalogLoaded => Self::CatalogLoaded,
DatabaseStateCode::Initialized => Self::Initialized,

View File

@ -24,7 +24,7 @@ use object_store::{
path::{parsed::DirsAndFileName, ObjectStorePath, Path},
ObjectStore, ObjectStoreApi, Result,
};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use snafu::{ensure, ResultExt, Snafu};
use std::{io, sync::Arc};
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
@ -51,9 +51,6 @@ pub enum IoxObjectStoreError {
))]
DatabaseAlreadyExists { name: String, generation: usize },
#[snafu(display("No active database found in object storage"))]
ActiveDatabaseNotFound,
#[snafu(display("Multiple active databases found in object storage"))]
MultipleActiveDatabasesFound,
}
@ -85,13 +82,12 @@ impl Generation {
}
impl IoxObjectStore {
/// List valid, unique, active database names in object storage that could be used with
/// `IoxObjectStore::existing` on this server.
// TODO: this lies and lists all databases for now and assumes generation 0
pub async fn list_active_databases(
/// List database names in object storage that need to be further checked for generations
/// and whether they're marked as deleted or not.
pub async fn list_possible_databases(
inner: &ObjectStore,
server_id: ServerId,
) -> Result<Vec<(DatabaseName<'static>, usize)>> {
) -> Result<Vec<DatabaseName<'static>>> {
let path = paths::all_databases_path(inner, server_id);
let list_result = inner.list_with_delimiter(&path).await?;
@ -109,11 +105,7 @@ impl IoxObjectStore {
.log_if_error("invalid database directory")
.ok()?;
// TODO: Check each generation directory and only return this database if there
// is a generation directory that does not contain a tombstone file, otherwise
// there is not actually an active database for this name
// See: https://github.com/influxdata/influxdb_iox/issues/2197
Some((db_name, 0))
Some(db_name)
})
.collect())
}
@ -208,7 +200,7 @@ impl IoxObjectStore {
inner: Arc<ObjectStore>,
server_id: ServerId,
database_name: &DatabaseName<'static>,
) -> Result<Self, IoxObjectStoreError> {
) -> Result<Option<Self>, IoxObjectStoreError> {
let root_path = RootPath::new(&inner, server_id, database_name);
let generations = Self::list_generations(&inner, &root_path)
@ -217,13 +209,22 @@ impl IoxObjectStore {
let mut active_generations = generations.iter().filter(|g| g.is_active());
let active = active_generations.next().context(ActiveDatabaseNotFound)?;
let active = match active_generations.next() {
Some(a) => a,
None => return Ok(None),
};
ensure!(
active_generations.next().is_none(),
MultipleActiveDatabasesFound
);
Ok(Self::existing(inner, server_id, database_name, active.id))
Ok(Some(Self::existing(
inner,
server_id,
database_name,
active.id,
)))
}
/// Look in object storage for an existing database with this name and a non-deleted

View File

@ -21,7 +21,7 @@ use observability_deps::tracing::{error, info, warn};
use parking_lot::RwLock;
use parquet_file::catalog::PreservedCatalog;
use persistence_windows::checkpoint::ReplayPlan;
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
use std::{future::Future, sync::Arc, time::Duration};
use tokio::{sync::Notify, task::JoinError};
use tokio_util::sync::CancellationToken;
@ -90,7 +90,6 @@ pub struct Database {
// Settings handed to `Database::new` that identify a database and control how it is loaded.
pub struct DatabaseConfig {
/// Name of the database
pub name: DatabaseName<'static>,
/// Server the database belongs to; used when locating the database in object storage
pub server_id: ServerId,
/// Generation passed to `IoxObjectStore::existing` when opening the database
pub generation_id: usize,
/// Forwarded to `load_or_create_preserved_catalog` when the catalog is loaded
pub wipe_catalog_on_error: bool,
/// NOTE(review): presumably skips replay during initialization — confirm against the
/// replay step, which is not visible here
pub skip_replay: bool,
}
@ -101,16 +100,8 @@ impl Database {
/// This is backed by an existing database, which was [created](Self::create) some time in the
/// past.
pub fn new(application: Arc<ApplicationState>, config: DatabaseConfig) -> Self {
let iox_object_store = Arc::new(IoxObjectStore::existing(
Arc::clone(application.object_store()),
config.server_id,
&config.name,
config.generation_id,
));
info!(
db_name=%config.name,
store_prefix=%iox_object_store.debug_database_path(),
"new database"
);
@ -120,7 +111,6 @@ impl Database {
shutdown: Default::default(),
state: RwLock::new(Freezable::new(DatabaseState::Known(DatabaseStateKnown {}))),
state_notify: Default::default(),
iox_object_store,
});
let handle = tokio::spawn(background_worker(Arc::clone(&shared)));
@ -193,6 +183,11 @@ impl Database {
self.shared.state.read().rules()
}
/// Returns this database's `IoxObjectStore`, or `None` when the current
/// initialization state does not expose one.
pub fn iox_object_store(&self) -> Option<Arc<IoxObjectStore>> {
    let state = self.shared.state.read();
    state.iox_object_store()
}
/// Gets access to an initialized `Db`
pub fn initialized_db(&self) -> Option<Arc<Db>> {
self.shared
@ -202,10 +197,6 @@ impl Database {
.map(|state| Arc::clone(&state.db))
}
/// Returns a new handle to the `IoxObjectStore` held in this database's shared data.
pub fn iox_object_store(&self) -> Arc<IoxObjectStore> {
    let store = &self.shared.iox_object_store;
    Arc::clone(store)
}
/// Returns Ok(()) when the Database is initialized, or the error
/// if one is encountered
pub async fn wait_for_init(&self) -> Result<(), Arc<InitError>> {
@ -218,10 +209,12 @@ impl Database {
// the notification being fired, and this task waking up
match &**self.shared.state.read() {
DatabaseState::Known(_)
| DatabaseState::ObjectStoreFound(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_) => {} // Non-terminal state
DatabaseState::Initialized(_) => return Ok(()),
DatabaseState::RulesLoadError(_, e)
DatabaseState::ObjectStoreLookupError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::ReplayError(_, e) => return Err(Arc::clone(e)),
}
@ -259,7 +252,7 @@ impl Database {
Ok(async move {
let db_name = &shared.config.name;
PreservedCatalog::wipe(&shared.iox_object_store)
PreservedCatalog::wipe(&current_state.iox_object_store)
.await
.map_err(Box::new)
.context(WipePreservedCatalog { db_name })?;
@ -349,9 +342,6 @@ struct DatabaseShared {
/// Notify that the database state has changed
state_notify: Notify,
/// The object store interface for this database
iox_object_store: Arc<IoxObjectStore>,
}
/// The background worker for `Database` - there should only ever be one
@ -430,6 +420,7 @@ async fn initialize_database(shared: &DatabaseShared) {
DatabaseState::Initialized(_) => break,
// Can perform work
DatabaseState::Known(_)
| DatabaseState::ObjectStoreFound(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_) => {
match state.try_freeze() {
@ -442,7 +433,8 @@ async fn initialize_database(shared: &DatabaseShared) {
}
}
// Operator intervention required
DatabaseState::RulesLoadError(_, e)
DatabaseState::ObjectStoreLookupError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::ReplayError(_, e) => {
error!(%db_name, %e, %state, "database in error state - operator intervention required");
@ -466,6 +458,10 @@ async fn initialize_database(shared: &DatabaseShared) {
// Try to advance to the next state
let next_state = match state {
DatabaseState::Known(state) => match state.advance(shared).await {
Ok(state) => DatabaseState::ObjectStoreFound(state),
Err(e) => DatabaseState::ObjectStoreLookupError(state, Arc::new(e)),
},
DatabaseState::ObjectStoreFound(state) => match state.advance(shared).await {
Ok(state) => DatabaseState::RulesLoaded(state),
Err(e) => DatabaseState::RulesLoadError(state, Arc::new(e)),
},
@ -494,6 +490,17 @@ async fn initialize_database(shared: &DatabaseShared) {
/// Errors encountered during initialization of a database
#[derive(Debug, Snafu)]
pub enum InitError {
#[snafu(display(
"error finding active generation directory in object storage: {}",
source
))]
ObjectStoreLookup {
source: iox_object_store::IoxObjectStoreError,
},
#[snafu(display("no active generation directory found, not loading"))]
NoActiveDatabase,
#[snafu(display("error fetching rules: {}", source))]
RulesFetch { source: object_store::Error },
@ -545,12 +552,14 @@ pub enum InitError {
// Initialization state of a database, advanced step by step in `initialize_database`.
// The non-error variants hold whatever has been loaded so far; each `*Error` variant
// holds the state that failed to advance together with the error that stopped it.
#[derive(Debug, Clone)]
enum DatabaseState {
Known(DatabaseStateKnown),
ObjectStoreFound(DatabaseStateObjectStoreFound),
RulesLoaded(DatabaseStateRulesLoaded),
CatalogLoaded(DatabaseStateCatalogLoaded),
Initialized(DatabaseStateInitialized),
RulesLoadError(DatabaseStateKnown, Arc<InitError>),
ObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>),
RulesLoadError(DatabaseStateObjectStoreFound, Arc<InitError>),
CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>),
ReplayError(DatabaseStateCatalogLoaded, Arc<InitError>),
}
@ -565,10 +574,12 @@ impl DatabaseState {
fn state_code(&self) -> DatabaseStateCode {
match self {
DatabaseState::Known(_) => DatabaseStateCode::Known,
DatabaseState::ObjectStoreFound(_) => DatabaseStateCode::ObjectStoreFound,
DatabaseState::RulesLoaded(_) => DatabaseStateCode::RulesLoaded,
DatabaseState::CatalogLoaded(_) => DatabaseStateCode::CatalogLoaded,
DatabaseState::Initialized(_) => DatabaseStateCode::Initialized,
DatabaseState::RulesLoadError(_, _) => DatabaseStateCode::Known,
DatabaseState::ObjectStoreLookupError(_, _) => DatabaseStateCode::Known,
DatabaseState::RulesLoadError(_, _) => DatabaseStateCode::ObjectStoreFound,
DatabaseState::CatalogLoadError(_, _) => DatabaseStateCode::RulesLoaded,
DatabaseState::ReplayError(_, _) => DatabaseStateCode::CatalogLoaded,
}
@ -577,10 +588,12 @@ impl DatabaseState {
fn error(&self) -> Option<&Arc<InitError>> {
match self {
DatabaseState::Known(_)
| DatabaseState::ObjectStoreFound(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_)
| DatabaseState::Initialized(_) => None,
DatabaseState::RulesLoadError(_, e)
DatabaseState::ObjectStoreLookupError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::ReplayError(_, e) => Some(e),
}
@ -588,7 +601,10 @@ impl DatabaseState {
fn rules(&self) -> Option<Arc<DatabaseRules>> {
match self {
DatabaseState::Known(_) | DatabaseState::RulesLoadError(_, _) => None,
DatabaseState::Known(_)
| DatabaseState::ObjectStoreFound(_)
| DatabaseState::ObjectStoreLookupError(_, _)
| DatabaseState::RulesLoadError(_, _) => None,
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
Some(Arc::clone(&state.rules))
}
@ -599,6 +615,22 @@ impl DatabaseState {
}
}
/// Returns the `IoxObjectStore` carried by the current state, if one has been found.
///
/// `None` is returned only for `Known` and `ObjectStoreLookupError`, the two states
/// that do not hold an object store. Every later state, including the error states
/// that retain a previously reached state, returns the store it carries.
fn iox_object_store(&self) -> Option<Arc<IoxObjectStore>> {
    match self {
        DatabaseState::Known(_) | DatabaseState::ObjectStoreLookupError(_, _) => None,
        // `RulesLoadError` retains the `DatabaseStateObjectStoreFound` it failed to
        // advance from, so the object store located earlier is still available.
        DatabaseState::ObjectStoreFound(state) | DatabaseState::RulesLoadError(state, _) => {
            Some(Arc::clone(&state.iox_object_store))
        }
        DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
            Some(Arc::clone(&state.iox_object_store))
        }
        // From here on the store is reached through the `Db` held by the state.
        DatabaseState::CatalogLoaded(state) | DatabaseState::ReplayError(state, _) => {
            Some(state.db.iox_object_store())
        }
        DatabaseState::Initialized(state) => Some(state.db.iox_object_store()),
    }
}
fn get_initialized(&self) -> Option<&DatabaseStateInitialized> {
match self {
DatabaseState::Initialized(state) => Some(state),
@ -611,13 +643,39 @@ impl DatabaseState {
struct DatabaseStateKnown {}
impl DatabaseStateKnown {
    /// Look up the active object storage for this database.
    ///
    /// Fails with `InitError::ObjectStoreLookup` when the lookup itself errors, and
    /// with `InitError::NoActiveDatabase` when the lookup succeeds but yields nothing.
    async fn advance(
        &self,
        shared: &DatabaseShared,
    ) -> Result<DatabaseStateObjectStoreFound, InitError> {
        let config = &shared.config;
        let object_store = Arc::clone(shared.application.object_store());

        let lookup =
            IoxObjectStore::find_existing(object_store, config.server_id, &config.name).await;
        let maybe_found = lookup.context(ObjectStoreLookup)?;
        let found = maybe_found.context(NoActiveDatabase)?;

        let iox_object_store = Arc::new(found);
        Ok(DatabaseStateObjectStoreFound { iox_object_store })
    }
}
/// State reached once the active `IoxObjectStore` for the database has been located.
#[derive(Debug, Clone)]
struct DatabaseStateObjectStoreFound {
/// Object store handle produced by `DatabaseStateKnown::advance`
iox_object_store: Arc<IoxObjectStore>,
}
impl DatabaseStateObjectStoreFound {
/// Load database rules from object storage
async fn advance(
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateRulesLoaded, InitError> {
// TODO: Retry this
let bytes = shared
let bytes = self
.iox_object_store
.get_database_rules_file()
.await
@ -635,6 +693,7 @@ impl DatabaseStateKnown {
Ok(DatabaseStateRulesLoaded {
rules: Arc::new(rules),
iox_object_store: Arc::clone(&self.iox_object_store),
})
}
}
@ -642,6 +701,7 @@ impl DatabaseStateKnown {
// State holding the database rules together with the object store they were read from.
#[derive(Debug, Clone)]
struct DatabaseStateRulesLoaded {
/// Database rules loaded from object storage
rules: Arc<DatabaseRules>,
/// Object store handle carried over from `DatabaseStateObjectStoreFound`
iox_object_store: Arc<IoxObjectStore>,
}
impl DatabaseStateRulesLoaded {
@ -652,7 +712,7 @@ impl DatabaseStateRulesLoaded {
) -> Result<DatabaseStateCatalogLoaded, InitError> {
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
shared.config.name.as_str(),
Arc::clone(&shared.iox_object_store),
Arc::clone(&self.iox_object_store),
shared.config.server_id,
Arc::clone(shared.application.metric_registry()),
shared.config.wipe_catalog_on_error,
@ -670,7 +730,7 @@ impl DatabaseStateRulesLoaded {
let database_to_commit = DatabaseToCommit {
server_id: shared.config.server_id,
iox_object_store: Arc::clone(&shared.iox_object_store),
iox_object_store: Arc::clone(&self.iox_object_store),
exec: Arc::clone(shared.application.executor()),
rules: Arc::clone(&self.rules),
preserved_catalog,
@ -718,7 +778,7 @@ struct DatabaseStateInitialized {
db: Arc<Db>,
}
/// Persist the `DatabaseRules` given the `Database` store prefix
/// Persist the `DatabaseRules` given the database object storage
pub(super) async fn persist_database_rules(
object_store: &IoxObjectStore,
rules: DatabaseRules,
@ -756,7 +816,6 @@ mod tests {
DatabaseConfig {
name: DatabaseName::new("test").unwrap(),
server_id: ServerId::new(NonZeroU32::new(23).unwrap()),
generation_id: 0,
wipe_catalog_on_error: false,
skip_replay: false,
},
@ -764,7 +823,11 @@ mod tests {
// Should have failed to load (this isn't important to the test)
let err = database.wait_for_init().await.unwrap_err();
assert!(matches!(err.as_ref(), InitError::RulesFetch { .. }));
assert!(
matches!(err.as_ref(), InitError::NoActiveDatabase { .. }),
"got {:?}",
err
);
// Database should be running
assert!(database.join().now_or_never().is_none());
@ -855,7 +918,6 @@ mod tests {
let db_config = DatabaseConfig {
name: db_name,
server_id,
generation_id,
wipe_catalog_on_error: false,
skip_replay: false,
};

View File

@ -537,6 +537,11 @@ impl Db {
Ok(new_rules)
}
/// Returns a new handle to the object storage used by this database.
pub fn iox_object_store(&self) -> Arc<IoxObjectStore> {
    let store = &self.iox_object_store;
    Arc::clone(store)
}
/// Rolls over the active chunk in the database's specified
/// partition. Returns the previously open (now closed) Chunk if
/// there was any.

View File

@ -684,7 +684,7 @@ where
initialized.server_id
};
let generation_id =
let _generation_id =
Database::create(Arc::clone(&self.shared.application), rules, server_id)
.await
.context(CannotCreateDatabase)?;
@ -702,7 +702,6 @@ where
DatabaseConfig {
name: db_name,
server_id,
generation_id,
wipe_catalog_on_error: false,
skip_replay: false,
},
@ -955,7 +954,7 @@ where
let rules = db.update_rules(update).map_err(UpdateError::Closure)?;
// TODO: Handle failure
persist_database_rules(&database.iox_object_store(), rules.as_ref().clone())
persist_database_rules(&db.iox_object_store(), rules.as_ref().clone())
.await
.context(CannotPersistUpdatedRules)?;
Ok(rules)
@ -1115,7 +1114,7 @@ async fn maybe_initialize_server(shared: &ServerShared) {
(init_ready, handle)
};
let maybe_databases = IoxObjectStore::list_active_databases(
let maybe_databases = IoxObjectStore::list_possible_databases(
shared.application.object_store(),
init_ready.server_id,
)
@ -1128,14 +1127,13 @@ async fn maybe_initialize_server(shared: &ServerShared) {
databases: HashMap::with_capacity(databases.len()),
};
for (db_name, generation_id) in databases {
for db_name in databases {
state
.new_database(
shared,
DatabaseConfig {
name: db_name,
server_id: init_ready.server_id,
generation_id,
wipe_catalog_on_error: init_ready.wipe_catalog_on_error,
skip_replay: init_ready.skip_replay_and_seek_instead,
},
@ -1310,6 +1308,7 @@ mod tests {
let read_data = bananas
.iox_object_store()
.unwrap()
.get_database_rules_file()
.await
.unwrap();
@ -1415,6 +1414,7 @@ mod tests {
bananas
.iox_object_store()
.unwrap()
.delete_database_rules_file()
.await
.expect("cannot delete rules file");
@ -1963,15 +1963,18 @@ mod tests {
// tamper store to break one database
rules_broken
.iox_object_store()
.unwrap()
.put_database_rules_file(Bytes::from("x"))
.await
.unwrap();
let (preserved_catalog, _catalog) =
PreservedCatalog::load::<TestCatalogState>(catalog_broken.iox_object_store(), ())
.await
.unwrap()
.unwrap();
let (preserved_catalog, _catalog) = PreservedCatalog::load::<TestCatalogState>(
catalog_broken.iox_object_store().unwrap(),
(),
)
.await
.unwrap()
.unwrap();
parquet_file::catalog::test_helpers::break_catalog_with_weird_version(&preserved_catalog)
.await;
@ -1979,6 +1982,7 @@ mod tests {
rules_broken
.iox_object_store()
.unwrap()
.get_database_rules_file()
.await
.unwrap();
@ -2031,9 +2035,11 @@ mod tests {
.to_string(),
"error wiping preserved catalog: database (db_existing) in invalid state (Initialized) for transition (WipePreservedCatalog)"
);
assert!(PreservedCatalog::exists(&existing.iox_object_store(),)
.await
.unwrap());
assert!(
PreservedCatalog::exists(&existing.iox_object_store().unwrap())
.await
.unwrap()
);
// 2. cannot wipe non-existent DB
assert!(matches!(
@ -2072,7 +2078,8 @@ mod tests {
.wipe_preserved_catalog(&db_name_rules_broken)
.unwrap_err()
.to_string(),
"error wiping preserved catalog: database (db_broken_rules) in invalid state (Known) for transition (WipePreservedCatalog)"
"error wiping preserved catalog: database (db_broken_rules) in invalid state \
(ObjectStoreFound) for transition (WipePreservedCatalog)"
);
// 4. wipe DB with broken catalog, this will bring the DB back to life
@ -2092,9 +2099,11 @@ mod tests {
database.wait_for_init().await.unwrap();
assert!(PreservedCatalog::exists(&catalog_broken.iox_object_store())
.await
.unwrap());
assert!(
PreservedCatalog::exists(&catalog_broken.iox_object_store().unwrap())
.await
.unwrap()
);
assert!(database.init_error().is_none());
assert!(server.db(&db_name_catalog_broken).is_ok());
@ -2118,9 +2127,11 @@ mod tests {
.to_string(),
"error wiping preserved catalog: database (db_created) in invalid state (Initialized) for transition (WipePreservedCatalog)"
);
assert!(PreservedCatalog::exists(&created.iox_object_store())
.await
.unwrap());
assert!(
PreservedCatalog::exists(&created.iox_object_store().unwrap())
.await
.unwrap()
);
}
#[tokio::test]