refactor: require UUID to create Database (#3715)

* refactor: require UUID to create Database

* chore: review feedback

* chore: fmt

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Raphael Taylor-Davies 2022-02-10 20:04:06 +00:00 committed by GitHub
parent d3bd03e37a
commit 910f381355
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 196 additions and 497 deletions

View File

@ -84,12 +84,8 @@ impl management_service_server::ManagementService for ManagementService {
.await
.map_err(default_server_error_handler)?;
let uuid = database
.uuid()
.expect("Database should be initialized or an error should have been returned");
Ok(Response::new(CreateDatabaseResponse {
uuid: uuid.as_bytes().to_vec(),
uuid: database.uuid().as_bytes().to_vec(),
}))
}
@ -389,10 +385,7 @@ impl management_service_server::ManagementService for ManagementService {
message: e.to_string(),
}),
state: database.state_code().into(),
uuid: database
.uuid()
.map(|uuid| uuid.as_bytes().to_vec())
.unwrap_or_default(),
uuid: database.uuid().as_bytes().to_vec(),
})
.collect()
})

View File

@ -165,7 +165,7 @@ impl IoxObjectStore {
/// Access the database-specific object storage files for an existing database that has
/// already been located and verified to be active. Does not check object storage.
fn existing(inner: Arc<ObjectStore>, root_path: RootPath) -> Self {
pub fn existing(inner: Arc<ObjectStore>, root_path: RootPath) -> Self {
let data_path = root_path.data_path();
let transactions_path = root_path.transactions_path();

View File

@ -171,9 +171,18 @@ impl Database {
"new database"
);
let path = IoxObjectStore::root_path_for(application.object_store(), config.database_uuid);
// The database state machine handles the case of this path not existing, as it will end
// up in [`DatabaseState::RulesLoadError`] or [`DatabaseState::OwnerInfoLoadError`]
let iox_object_store = Arc::new(IoxObjectStore::existing(
Arc::clone(application.object_store()),
path,
));
let shared = Arc::new(DatabaseShared {
config: RwLock::new(config),
application,
iox_object_store,
state: RwLock::new(Freezable::new(DatabaseState::Shutdown(None))),
state_notify: Default::default(),
});
@ -191,20 +200,6 @@ impl Database {
let db_name = self.name();
let db_name = db_name.as_str();
let (iox_object_store, uuid) = {
let state = self.shared.state.read();
let store = state
.iox_object_store()
.context(CannotReleaseUnownedSnafu { db_name })?;
let uuid = state
.uuid()
.context(CannotReleaseUnownedSnafu { db_name })?;
(store, uuid)
};
self.shutdown();
let _ = self.join().await.log_if_error("releasing database");
@ -212,12 +207,12 @@ impl Database {
None,
None,
self.shared.application.time_provider().now(),
&iox_object_store,
&self.shared.iox_object_store,
)
.await
.context(CannotReleaseSnafu { db_name })?;
Ok(uuid)
Ok(self.uuid())
}
/// Triggers shutdown of this `Database` if it is running
@ -295,9 +290,9 @@ impl Database {
self.shared.state.read().provided_rules()
}
/// Returns the database UUID if it's loaded
pub fn uuid(&self) -> Option<Uuid> {
self.shared.state.read().uuid()
/// Returns the database UUID
pub fn uuid(&self) -> Uuid {
self.shared.config.read().database_uuid
}
/// Returns the info about the owning server if it has been loaded
@ -305,11 +300,6 @@ impl Database {
self.shared.state.read().owner_info()
}
/// Location in object store; may not actually exist yet
pub fn location(&self) -> String {
self.shared.config.read().location.clone()
}
/// Database name
pub fn name(&self) -> DatabaseName<'static> {
self.shared.config.read().name.clone()
@ -379,9 +369,9 @@ impl Database {
}
}
/// Returns the IoxObjectStore if it has been found
pub fn iox_object_store(&self) -> Option<Arc<IoxObjectStore>> {
self.shared.state.read().iox_object_store()
/// Returns the IoxObjectStore
pub fn iox_object_store(&self) -> Arc<IoxObjectStore> {
Arc::clone(&self.shared.iox_object_store)
}
/// Gets access to an initialized `Db`
@ -404,14 +394,11 @@ impl Database {
// the notification being fired, and this task waking up
match &**self.shared.state.read() {
DatabaseState::Known(_)
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_) => {} // Non-terminal state
DatabaseState::Initialized(_) => return Ok(()),
DatabaseState::DatabaseObjectStoreLookupError(_, e)
| DatabaseState::NoActiveDatabase(_, e)
| DatabaseState::OwnerInfoLoadError(_, e)
DatabaseState::OwnerInfoLoadError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::WriteBufferCreationError(_, e)
@ -427,29 +414,19 @@ impl Database {
pub async fn wipe_preserved_catalog(self: &Arc<Self>) -> Result<TaskTracker<Job>, Error> {
let db_name = self.name();
// TODO: Make IOxObjectStore immutable property of Database
let iox_object_store = match &**self.shared.state.read() {
DatabaseState::CatalogLoadError(rules_loaded, err) => {
warn!(%db_name, %err, "Requested wiping catalog in CatalogLoadError state");
Arc::clone(rules_loaded.iox_object_store())
}
DatabaseState::WriteBufferCreationError(catalog_loaded, err) => {
warn!(%db_name, %err, "Requested wiping catalog in WriteBufferCreationError state");
catalog_loaded.iox_object_store()
}
DatabaseState::ReplayError(catalog_loaded, err) => {
warn!(%db_name, %err, "Requested wiping catalog in ReplayError state");
catalog_loaded.iox_object_store()
}
match self.state_code() {
DatabaseStateCode::CatalogLoadError
| DatabaseStateCode::WriteBufferCreationError
| DatabaseStateCode::ReplayError => {}
state => {
return InvalidStateForWipePreservedCatalogSnafu {
db_name,
state: state.state_code(),
state,
expected: "CatalogLoadError, WriteBufferCreationError, ReplayError",
}
.fail()
}
};
}
// Shutdown database
self.shutdown();
@ -470,7 +447,7 @@ impl Database {
tokio::spawn(
async move {
// wipe the actual catalog
PreservedCatalog::wipe(&iox_object_store)
PreservedCatalog::wipe(&this.shared.iox_object_store)
.await
.map_err(Box::new)
.context(WipePreservedCatalogSnafu { db_name: &db_name })?;
@ -507,15 +484,6 @@ impl Database {
let shared = Arc::clone(&self.shared);
let db_name = self.name();
// TODO: Make IOxObjectStore immutable property of Database
let iox_object_store = self
.iox_object_store()
.context(InvalidStateForRebuildSnafu {
db_name: &db_name,
state: shared.state.read().state_code(),
expected: "Object store initialized",
})?;
// Shutdown database
self.shutdown();
let _ = self.join().await.log_if_error("rebuilding catalog");
@ -537,7 +505,7 @@ impl Database {
info!(%db_name, "rebuilding catalog from parquet files");
// Now wipe the catalog and rebuild it from parquet files
PreservedCatalog::wipe(iox_object_store.as_ref())
PreservedCatalog::wipe(&this.shared.iox_object_store)
.await
.map_err(Box::new)
.context(WipePreservedCatalogSnafu { db_name: &db_name })?;
@ -545,7 +513,7 @@ impl Database {
info!(%db_name, "wiped preserved catalog");
let config = PreservedCatalogConfig::new(
Arc::clone(&iox_object_store),
Arc::clone(&this.shared.iox_object_store),
db_name.to_string(),
Arc::clone(shared.application.time_provider()),
);
@ -821,6 +789,7 @@ mod tests {
};
use std::time::Duration;
use std::{num::NonZeroU32, time::Instant};
use test_helpers::assert_contains;
use uuid::Uuid;
use write_buffer::mock::MockBufferSharedState;
@ -866,13 +835,13 @@ mod tests {
let server_id = ServerId::try_from(1).unwrap();
let application = make_application();
let db_name = DatabaseName::new("test").unwrap();
let uuid = Uuid::new_v4();
let provided_rules = ProvidedDatabaseRules::new_empty(db_name.clone());
let name = DatabaseName::new("test").unwrap();
let database_uuid = Uuid::new_v4();
let provided_rules = ProvidedDatabaseRules::new_empty(name.clone());
let location = create_empty_db_in_object_store(
create_empty_db_in_object_store(
Arc::clone(&application),
uuid,
database_uuid,
provided_rules,
server_id,
)
@ -880,9 +849,9 @@ mod tests {
.unwrap();
let db_config = DatabaseConfig {
name: db_name,
location,
name,
server_id,
database_uuid,
wipe_catalog_on_error: false,
skip_replay: false,
};
@ -897,7 +866,7 @@ mod tests {
let server_id = database.shared.config.read().server_id;
let server_location =
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
let iox_object_store = database.iox_object_store().unwrap();
let iox_object_store = database.iox_object_store();
database.release().await.unwrap();
@ -921,7 +890,7 @@ mod tests {
let server_id = database.shared.config.read().server_id;
let server_location =
IoxObjectStore::server_config_path(application.object_store(), server_id).to_string();
let iox_object_store = database.iox_object_store().unwrap();
let iox_object_store = database.iox_object_store();
let new_server_id = ServerId::try_from(2).unwrap();
let new_server_location =
IoxObjectStore::server_config_path(application.object_store(), new_server_id)
@ -985,16 +954,13 @@ mod tests {
assert!(database.is_initialized());
// Delete the rules
let iox_object_store = database.iox_object_store().unwrap();
let iox_object_store = database.iox_object_store();
iox_object_store.delete_database_rules_file().await.unwrap();
// Restart should fail
let err = database.restart().await.unwrap_err();
assert!(
matches!(err.as_ref(), InitError::DatabaseObjectStoreLookup { .. }),
"{:?}",
err
);
let err = database.restart().await.unwrap_err().to_string();
assert_contains!(&err, "error loading database rules");
assert_contains!(&err, "not found");
}
#[tokio::test]
@ -1033,7 +999,8 @@ mod tests {
..Default::default()
}),
};
let location = create_empty_db_in_object_store(
create_empty_db_in_object_store(
Arc::clone(&application),
uuid,
make_provided_rules(rules),
@ -1041,10 +1008,11 @@ mod tests {
)
.await
.unwrap();
let db_config = DatabaseConfig {
name: db_name,
location,
server_id,
database_uuid: uuid,
wipe_catalog_on_error: false,
skip_replay: false,
};
@ -1142,7 +1110,8 @@ mod tests {
..Default::default()
}),
};
let location = create_empty_db_in_object_store(
create_empty_db_in_object_store(
Arc::clone(&application),
uuid,
make_provided_rules(rules),
@ -1150,10 +1119,11 @@ mod tests {
)
.await
.unwrap();
let db_config = DatabaseConfig {
name: db_name,
location,
server_id,
database_uuid: uuid,
wipe_catalog_on_error: false,
skip_replay: false,
};
@ -1185,7 +1155,7 @@ mod tests {
#[tokio::test]
async fn database_init_recovery() {
let (application, database) = initialized_database().await;
let iox_object_store = database.iox_object_store().unwrap();
let iox_object_store = database.iox_object_store();
let config = database.shared.config.read().clone();
// shutdown first database

View File

@ -36,9 +36,6 @@ pub enum InitError {
source: iox_object_store::IoxObjectStoreError,
},
#[snafu(display("no active database found in object storage, not loading"))]
NoActiveDatabase,
#[snafu(display(
"Database name in deserialized rules ({}) does not match expected value ({})",
actual,
@ -106,15 +103,9 @@ pub enum InitError {
/// ```text
/// (start)
/// |
/// o-o o--------|-------------------o o-o
/// | V V V V V |
/// [NoActiveDatabase]<--[Known]------------->[DatabaseObjectStoreLookupError]
/// | | |
/// o----------------+---------------------------o
/// |
/// | o-o
/// V V |
/// [DatabaseObjectStoreFound]------>[OwnerInfoLoadError]
/// |----------------------------o o-o
/// V V V |
/// [Known]-------------->[OwnerInfoLoadError]
/// | |
/// +---------------------------o
/// |
@ -163,7 +154,6 @@ pub(crate) enum DatabaseState {
Shutdown(Option<Arc<JoinError>>),
// Basic initialization sequence states:
Known(DatabaseStateKnown),
DatabaseObjectStoreFound(DatabaseStateDatabaseObjectStoreFound),
OwnerInfoLoaded(DatabaseStateOwnerInfoLoaded),
RulesLoaded(DatabaseStateRulesLoaded),
CatalogLoaded(DatabaseStateCatalogLoaded),
@ -172,9 +162,7 @@ pub(crate) enum DatabaseState {
Initialized(DatabaseStateInitialized),
// Error states, we'll try to recover from them
NoActiveDatabase(DatabaseStateKnown, Arc<InitError>),
DatabaseObjectStoreLookupError(DatabaseStateKnown, Arc<InitError>),
OwnerInfoLoadError(DatabaseStateDatabaseObjectStoreFound, Arc<InitError>),
OwnerInfoLoadError(DatabaseStateKnown, Arc<InitError>),
RulesLoadError(DatabaseStateOwnerInfoLoaded, Arc<InitError>),
CatalogLoadError(DatabaseStateRulesLoaded, Arc<InitError>),
WriteBufferCreationError(DatabaseStateCatalogLoaded, Arc<InitError>),
@ -197,17 +185,10 @@ impl DatabaseState {
match self {
DatabaseState::Shutdown(_) => DatabaseStateCode::Shutdown,
DatabaseState::Known(_) => DatabaseStateCode::Known,
DatabaseState::DatabaseObjectStoreFound(_) => {
DatabaseStateCode::DatabaseObjectStoreFound
}
DatabaseState::OwnerInfoLoaded(_) => DatabaseStateCode::OwnerInfoLoaded,
DatabaseState::RulesLoaded(_) => DatabaseStateCode::RulesLoaded,
DatabaseState::CatalogLoaded(_) => DatabaseStateCode::CatalogLoaded,
DatabaseState::Initialized(_) => DatabaseStateCode::Initialized,
DatabaseState::DatabaseObjectStoreLookupError(_, _) => {
DatabaseStateCode::DatabaseObjectStoreLookupError
}
DatabaseState::NoActiveDatabase(_, _) => DatabaseStateCode::NoActiveDatabase,
DatabaseState::OwnerInfoLoadError(_, _) => DatabaseStateCode::OwnerInfoLoadError,
DatabaseState::RulesLoadError(_, _) => DatabaseStateCode::RulesLoadError,
DatabaseState::CatalogLoadError(_, _) => DatabaseStateCode::CatalogLoadError,
@ -222,14 +203,11 @@ impl DatabaseState {
match self {
DatabaseState::Known(_)
| DatabaseState::Shutdown(_)
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::RulesLoaded(_)
| DatabaseState::CatalogLoaded(_)
| DatabaseState::Initialized(_) => None,
DatabaseState::DatabaseObjectStoreLookupError(_, e)
| DatabaseState::NoActiveDatabase(_, e)
| DatabaseState::OwnerInfoLoadError(_, e)
DatabaseState::OwnerInfoLoadError(_, e)
| DatabaseState::RulesLoadError(_, e)
| DatabaseState::CatalogLoadError(_, e)
| DatabaseState::WriteBufferCreationError(_, e)
@ -241,9 +219,6 @@ impl DatabaseState {
match self {
DatabaseState::Known(_)
| DatabaseState::Shutdown(_)
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::DatabaseObjectStoreLookupError(_, _)
| DatabaseState::NoActiveDatabase(_, _)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::OwnerInfoLoadError(_, _)
| DatabaseState::RulesLoadError(_, _) => None,
@ -257,33 +232,10 @@ impl DatabaseState {
}
}
pub(crate) fn uuid(&self) -> Option<Uuid> {
match self {
DatabaseState::Known(_)
| DatabaseState::Shutdown(_)
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::DatabaseObjectStoreLookupError(_, _)
| DatabaseState::NoActiveDatabase(_, _)
| DatabaseState::OwnerInfoLoaded(_)
| DatabaseState::OwnerInfoLoadError(_, _)
| DatabaseState::RulesLoadError(_, _) => None,
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
Some(state.uuid)
}
DatabaseState::CatalogLoaded(state)
| DatabaseState::WriteBufferCreationError(state, _)
| DatabaseState::ReplayError(state, _) => Some(state.uuid),
DatabaseState::Initialized(state) => Some(state.uuid),
}
}
pub(crate) fn owner_info(&self) -> Option<management::v1::OwnerInfo> {
match self {
DatabaseState::Known(_)
| DatabaseState::Shutdown(_)
| DatabaseState::DatabaseObjectStoreFound(_)
| DatabaseState::DatabaseObjectStoreLookupError(_, _)
| DatabaseState::NoActiveDatabase(_, _)
| DatabaseState::OwnerInfoLoadError(_, _)
| DatabaseState::RulesLoadError(_, _) => None,
DatabaseState::OwnerInfoLoaded(state) => Some(state.owner_info.clone()),
@ -297,29 +249,6 @@ impl DatabaseState {
}
}
pub(crate) fn iox_object_store(&self) -> Option<Arc<IoxObjectStore>> {
match self {
DatabaseState::Known(_)
| DatabaseState::Shutdown(_)
| DatabaseState::DatabaseObjectStoreLookupError(_, _)
| DatabaseState::NoActiveDatabase(_, _) => None,
DatabaseState::DatabaseObjectStoreFound(state)
| DatabaseState::OwnerInfoLoadError(state, _) => {
Some(Arc::clone(&state.iox_object_store))
}
DatabaseState::OwnerInfoLoaded(state) | DatabaseState::RulesLoadError(state, _) => {
Some(Arc::clone(&state.iox_object_store))
}
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
Some(Arc::clone(&state.iox_object_store))
}
DatabaseState::CatalogLoaded(state)
| DatabaseState::WriteBufferCreationError(state, _)
| DatabaseState::ReplayError(state, _) => Some(state.db.iox_object_store()),
DatabaseState::Initialized(state) => Some(state.db.iox_object_store()),
}
}
/// Whether this is shutdown
pub(crate) fn is_shutdown(&self) -> bool {
matches!(self, DatabaseState::Shutdown(_))
@ -337,37 +266,12 @@ impl DatabaseState {
pub(crate) struct DatabaseStateKnown {}
impl DatabaseStateKnown {
/// Find active object storage for this database
async fn advance(
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateDatabaseObjectStoreFound, InitError> {
let location = shared.config.read().location.clone();
let iox_object_store = IoxObjectStore::load_at_root_path(
Arc::clone(shared.application.object_store()),
&location,
)
.await
.context(DatabaseObjectStoreLookupSnafu)?;
Ok(DatabaseStateDatabaseObjectStoreFound {
iox_object_store: Arc::new(iox_object_store),
})
}
}
#[derive(Debug, Clone)]
pub(crate) struct DatabaseStateDatabaseObjectStoreFound {
iox_object_store: Arc<IoxObjectStore>,
}
impl DatabaseStateDatabaseObjectStoreFound {
/// Load owner info from object storage and verify it matches the current owner
async fn advance(
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateOwnerInfoLoaded, InitError> {
let owner_info = fetch_owner_info(&self.iox_object_store)
let owner_info = fetch_owner_info(&shared.iox_object_store)
.await
.context(FetchingOwnerInfoSnafu)?;
@ -380,17 +284,13 @@ impl DatabaseStateDatabaseObjectStoreFound {
.fail();
}
Ok(DatabaseStateOwnerInfoLoaded {
owner_info,
iox_object_store: Arc::clone(&self.iox_object_store),
})
Ok(DatabaseStateOwnerInfoLoaded { owner_info })
}
}
#[derive(Debug, Clone)]
pub(crate) struct DatabaseStateOwnerInfoLoaded {
owner_info: management::v1::OwnerInfo,
iox_object_store: Arc<IoxObjectStore>,
}
impl DatabaseStateOwnerInfoLoaded {
@ -399,7 +299,7 @@ impl DatabaseStateOwnerInfoLoaded {
&self,
shared: &DatabaseShared,
) -> Result<DatabaseStateRulesLoaded, InitError> {
let rules = PersistedDatabaseRules::load(&self.iox_object_store)
let rules = PersistedDatabaseRules::load(&shared.iox_object_store)
.await
.context(LoadingRulesSnafu)?;
@ -416,7 +316,6 @@ impl DatabaseStateOwnerInfoLoaded {
provided_rules: rules.provided_rules(),
uuid: rules.uuid(),
owner_info: self.owner_info.clone(),
iox_object_store: Arc::clone(&self.iox_object_store),
})
}
}
@ -426,14 +325,9 @@ pub(crate) struct DatabaseStateRulesLoaded {
provided_rules: Arc<ProvidedDatabaseRules>,
uuid: Uuid,
owner_info: management::v1::OwnerInfo,
iox_object_store: Arc<IoxObjectStore>,
}
impl DatabaseStateRulesLoaded {
pub(crate) fn iox_object_store(&self) -> &Arc<IoxObjectStore> {
&self.iox_object_store
}
/// Load catalog from object storage
async fn advance(
&self,
@ -450,7 +344,7 @@ impl DatabaseStateRulesLoaded {
};
let (preserved_catalog, catalog, replay_plan) = load_or_create_preserved_catalog(
db_name.as_str(),
Arc::clone(&self.iox_object_store),
Arc::clone(&shared.iox_object_store),
Arc::clone(shared.application.metric_registry()),
Arc::clone(shared.application.time_provider()),
wipe_catalog_on_error,
@ -461,7 +355,7 @@ impl DatabaseStateRulesLoaded {
let database_to_commit = DatabaseToCommit {
server_id,
iox_object_store: Arc::clone(&self.iox_object_store),
iox_object_store: Arc::clone(&shared.iox_object_store),
exec: Arc::clone(shared.application.executor()),
rules: Arc::clone(self.provided_rules.rules()),
preserved_catalog,
@ -499,10 +393,6 @@ pub(crate) struct DatabaseStateCatalogLoaded {
}
impl DatabaseStateCatalogLoaded {
pub(crate) fn iox_object_store(&self) -> Arc<IoxObjectStore> {
self.db.iox_object_store()
}
/// Perform replay
async fn advance(
&self,
@ -564,7 +454,6 @@ impl DatabaseStateCatalogLoaded {
provided_rules: Arc::clone(&self.provided_rules),
uuid: self.uuid,
owner_info: self.owner_info.clone(),
iox_object_store: self.db.iox_object_store(),
}
}
}
@ -629,20 +518,12 @@ pub(crate) async fn initialize_database(shared: &DatabaseShared, shutdown: Cance
// Try to advance to the next state
let next_state = match state {
DatabaseState::Known(state)
| DatabaseState::DatabaseObjectStoreLookupError(state, _)
| DatabaseState::NoActiveDatabase(state, _) => match state.advance(shared).await {
Ok(state) => DatabaseState::DatabaseObjectStoreFound(state),
Err(InitError::NoActiveDatabase) => {
DatabaseState::NoActiveDatabase(state, Arc::new(InitError::NoActiveDatabase))
DatabaseState::Known(state) | DatabaseState::OwnerInfoLoadError(state, _) => {
match state.advance(shared).await {
Ok(state) => DatabaseState::OwnerInfoLoaded(state),
Err(e) => DatabaseState::OwnerInfoLoadError(state, Arc::new(e)),
}
Err(e) => DatabaseState::DatabaseObjectStoreLookupError(state, Arc::new(e)),
},
DatabaseState::DatabaseObjectStoreFound(state)
| DatabaseState::OwnerInfoLoadError(state, _) => match state.advance(shared).await {
Ok(state) => DatabaseState::OwnerInfoLoaded(state),
Err(e) => DatabaseState::OwnerInfoLoadError(state, Arc::new(e)),
},
}
DatabaseState::OwnerInfoLoaded(state) | DatabaseState::RulesLoadError(state, _) => {
match state.advance(shared).await {
Ok(state) => DatabaseState::RulesLoaded(state),

View File

@ -2,9 +2,11 @@
use crate::ApplicationState;
use data_types::{server_id::ServerId, DatabaseName};
use internal_types::freezable::Freezable;
use iox_object_store::IoxObjectStore;
use parking_lot::RwLock;
use std::sync::Arc;
use tokio::sync::Notify;
use uuid::Uuid;
use super::init::DatabaseState;
@ -13,8 +15,8 @@ use super::init::DatabaseState;
/// and how to perform startup activities.
pub struct DatabaseConfig {
pub name: DatabaseName<'static>,
pub location: String,
pub server_id: ServerId,
pub database_uuid: Uuid,
pub wipe_catalog_on_error: bool,
pub skip_replay: bool,
}
@ -28,6 +30,9 @@ pub(crate) struct DatabaseShared {
/// Application-global state
pub(crate) application: Arc<ApplicationState>,
/// Database object store
pub(crate) iox_object_store: Arc<IoxObjectStore>,
/// The initialization state of the `Database`, wrapped in a
/// `Freezable` to ensure there is only one task with an
/// outstanding intent to write at any time.

View File

@ -286,6 +286,15 @@ pub enum InitError {
#[snafu(display("error persisting initial server config to object storage: {}", source))]
PersistInitialServerConfig { source: object_store::Error },
#[snafu(display("invalid database name in server config: {}", source))]
InvalidDatabase { source: DatabaseNameError },
#[snafu(display(
"invalid database uuid in server config while finding location: {}",
source
))]
InvalidDatabaseLocation { source: uuid::Error },
}
/// The stage of the server in the startup process
@ -348,6 +357,75 @@ struct ServerStateInitReady {
server_id: ServerId,
}
impl ServerStateInitReady {
/// Parse the UUID from an object storage path
///
/// TODO: Encode this data directly in server config
fn parse_location(location: &str) -> Result<Uuid, InitError> {
// Strip trailing / if any
let location = location.strip_suffix('/').unwrap_or(location);
let uuid = location.rsplit('/').next().unwrap();
std::str::FromStr::from_str(uuid).context(InvalidDatabaseLocationSnafu)
}
async fn advance(&self, shared: &ServerShared) -> Result<ServerStateInitialized, InitError> {
let fetch_result = IoxObjectStore::get_server_config_file(
shared.application.object_store(),
self.server_id,
)
.await;
let server_config_bytes = match fetch_result {
Ok(bytes) => bytes,
// If this is the first time starting up this server and there is no config file yet,
// this isn't a problem. Start an empty server config.
Err(object_store::Error::NotFound { .. }) => bytes::Bytes::new(),
Err(source) => return Err(InitError::GetServerConfig { source }),
};
let server_config =
generated_types::server_config::decode_persisted_server_config(server_config_bytes)
.map_err(|source| InitError::DeserializeServerConfig { source })?;
let databases = server_config
.databases
.into_iter()
.map(|(name, location)| {
let database_name = DatabaseName::new(name).context(InvalidDatabaseSnafu)?;
let database_uuid = Self::parse_location(&location)?;
let database = Database::new(
Arc::clone(&shared.application),
DatabaseConfig {
name: database_name.clone(),
database_uuid,
server_id: self.server_id,
wipe_catalog_on_error: self.wipe_catalog_on_error,
skip_replay: self.skip_replay_and_seek_instead,
},
);
Ok((database_name, Arc::new(database)))
})
.collect::<Result<_, InitError>>()?;
let next_state = ServerStateInitialized {
server_id: self.server_id,
databases,
};
IoxObjectStore::put_server_config_file(
shared.application.object_store(),
self.server_id,
next_state.server_config(),
)
.await
.map_err(|source| InitError::PersistInitialServerConfig { source })?;
Ok(next_state)
}
}
#[derive(Debug)]
struct ServerStateInitialized {
server_id: ServerId,
@ -366,36 +444,19 @@ impl ServerStateInitialized {
shared: &ServerShared,
config: DatabaseConfig,
) -> Result<&Arc<Database>> {
use hashbrown::hash_map::Entry;
let db_name = config.name.clone();
let database = match self.databases.entry(db_name.clone()) {
hashbrown::hash_map::Entry::Vacant(vacant) => vacant.insert(Arc::new(Database::new(
match self.databases.entry(db_name.clone()) {
Entry::Vacant(vacant) => Ok(vacant.insert(Arc::new(Database::new(
Arc::clone(&shared.application),
config,
))),
hashbrown::hash_map::Entry::Occupied(mut existing) => {
if let Some(init_error) = existing.get().init_error() {
if matches!(&*init_error, database::init::InitError::NoActiveDatabase) {
existing.insert(Arc::new(Database::new(
Arc::clone(&shared.application),
config,
)));
existing.into_mut()
} else {
return DatabaseAlreadyExistsSnafu {
db_name: config.name,
}
.fail();
}
} else {
return DatabaseAlreadyExistsSnafu {
db_name: config.name,
}
.fail();
}
)))),
Entry::Occupied(_) => DatabaseAlreadyExistsSnafu {
db_name: config.name,
}
};
Ok(database)
.fail(),
}
}
/// Serialize the list of databases this server owns with their names and object storage
@ -405,7 +466,7 @@ impl ServerStateInitialized {
databases: self
.databases
.iter()
.map(|(name, database)| (name.to_string(), database.location()))
.map(|(name, database)| (name.to_string(), database.iox_object_store().root_path()))
.collect(),
};
@ -578,15 +639,14 @@ impl Server {
initialized.server_id
};
let res = create_empty_db_in_object_store(
create_empty_db_in_object_store(
Arc::clone(&self.shared.application),
uuid,
rules,
server_id,
)
.await;
let location = res.context(CannotCreateDatabaseSnafu)?;
.await
.context(CannotCreateDatabaseSnafu)?;
let database = {
let mut state = self.shared.state.write();
@ -600,8 +660,8 @@ impl Server {
&self.shared,
DatabaseConfig {
name: db_name,
location,
server_id,
database_uuid: uuid,
wipe_catalog_on_error: false,
skip_replay: false,
},
@ -632,9 +692,7 @@ impl Server {
let handle = handle_fut.await;
let database = self.database(db_name)?;
let current = database
.uuid()
.expect("Previous line should return not found if the database is inactive");
let current = database.uuid();
// If a UUID has been specified, it has to match this database's UUID
// Should this check be here or in database.release?
@ -705,7 +763,7 @@ impl Server {
// Check that this name is unique among currently active databases
if let Ok(existing_db) = self.database(&db_name) {
if matches!(existing_db.uuid(), Some(existing_uuid) if existing_uuid == uuid) {
if existing_db.uuid() == uuid {
return DatabaseAlreadyOwnedByThisServerSnafu { uuid }.fail();
} else {
return DatabaseAlreadyExistsSnafu { db_name }.fail();
@ -714,7 +772,7 @@ impl Server {
// Mark the database as claimed in object storage and get its location for the server
// config file
let location = claim_database_in_object_store(
claim_database_in_object_store(
Arc::clone(&self.shared.application),
&db_name,
uuid,
@ -736,8 +794,8 @@ impl Server {
&self.shared,
DatabaseConfig {
name: db_name.clone(),
location,
server_id,
database_uuid: uuid,
wipe_catalog_on_error: false,
skip_replay: false,
},
@ -933,86 +991,10 @@ async fn maybe_initialize_server(shared: &ServerShared) {
(init_ready, handle)
};
let maybe_databases = IoxObjectStore::get_server_config_file(
shared.application.object_store(),
init_ready.server_id,
)
.await
.or_else(|e| match e {
// If this is the first time starting up this server and there is no config file yet,
// this isn't a problem. Start an empty server config.
object_store::Error::NotFound { .. } => Ok(bytes::Bytes::new()),
// Any other error is a problem.
_ => Err(InitError::GetServerConfig { source: e }),
})
.and_then(|config| {
generated_types::server_config::decode_persisted_server_config(config)
.map_err(|e| InitError::DeserializeServerConfig { source: e })
})
.map(|server_config| {
server_config
.databases
.into_iter()
.map(|(name, location)| {
(
DatabaseName::new(name).expect("serialized db names should be valid"),
location,
)
})
.collect::<Vec<_>>()
});
let next_state = match maybe_databases {
Ok(databases) => {
let mut state = ServerStateInitialized {
server_id: init_ready.server_id,
databases: HashMap::with_capacity(databases.len()),
};
for (db_name, location) in databases {
state
.new_database(
shared,
DatabaseConfig {
name: db_name,
location,
server_id: init_ready.server_id,
wipe_catalog_on_error: init_ready.wipe_catalog_on_error,
skip_replay: init_ready.skip_replay_and_seek_instead,
},
)
.expect("database unique");
}
let bytes = state.server_config();
let config_written = IoxObjectStore::put_server_config_file(
shared.application.object_store(),
init_ready.server_id,
bytes,
)
.await;
match config_written {
Ok(_) => {
info!(server_id=%init_ready.server_id, "server initialized");
ServerState::Initialized(state)
}
Err(e) => {
error!(
server_id=%init_ready.server_id,
%e,
"error persisting initial server config to object storage"
);
ServerState::InitError(
init_ready,
Arc::new(InitError::PersistInitialServerConfig { source: e }),
)
}
}
}
let next_state = match init_ready.advance(shared).await {
Ok(initialized) => ServerState::Initialized(initialized),
Err(e) => {
error!(server_id=%init_ready.server_id, %e, "error initializing server");
error!(%e, "error attempting to initialize server");
ServerState::InitError(init_ready, Arc::new(e))
}
};
@ -1258,7 +1240,7 @@ mod tests {
.await
.expect("failed to create database");
let iox_object_store = bananas.iox_object_store().unwrap();
let iox_object_store = bananas.iox_object_store();
let read_rules = PersistedDatabaseRules::load(&iox_object_store)
.await
.unwrap();
@ -1290,7 +1272,7 @@ mod tests {
.create_database(provided_rules2)
.await
.expect("failed to create 2nd db");
let awesome_uuid = awesome.uuid().unwrap();
let awesome_uuid = awesome.uuid();
// assert server config file exists and has 2 entries
let config = server_config(application.object_store(), server_id).await;
@ -1387,7 +1369,7 @@ mod tests {
let bananas = create_simple_database(&server, "bananas")
.await
.expect("failed to create database");
let bananas_uuid = bananas.uuid().unwrap();
let bananas_uuid = bananas.uuid();
assert!(bananas.is_initialized());
@ -1409,11 +1391,11 @@ mod tests {
.expect("failed to create database");
assert!(apples.is_initialized());
let apples_uuid = apples.uuid().unwrap();
let apples_uuid = apples.uuid();
assert_eq!(server.db_names_sorted(), vec!["apples", "bananas"]);
let bananas_object_store = bananas.iox_object_store().unwrap();
let bananas_object_store = bananas.iox_object_store();
// Shutdown server to demonstrate that the server shutdown
// causes the databases to shutdown
@ -1453,134 +1435,10 @@ mod tests {
assert!(apples.init_error().is_none());
let err = bananas.wait_for_init().await.unwrap_err();
assert_contains!(err.to_string(), "No rules found to load");
assert_contains!(err.to_string(), "rules.pb not found");
assert!(Arc::ptr_eq(&err, &bananas.init_error().unwrap()));
}
#[tokio::test]
async fn old_database_object_store_path() {
let application = make_application();
let server = make_server(Arc::clone(&application));
let server_id = ServerId::try_from(1).unwrap();
let object_store = application.object_store();
// Databases used to be stored under the server ID. Construct a database in the old
// location and list that in the serialized server config.
let old_loc_db_uuid = Uuid::new_v4();
let old_loc_db_name = DatabaseName::new("old").unwrap();
// Construct path in the old database location containing server ID
let mut old_root = object_store.new_path();
old_root.push_dir(&server_id.to_string());
old_root.push_dir(&old_loc_db_uuid.to_string());
// Write out a database owner file in the old location
let mut old_owner_path = old_root.clone();
old_owner_path.set_file_name("owner.pb");
let owner_info = management::v1::OwnerInfo {
id: server_id.get_u32(),
location: IoxObjectStore::server_config_path(object_store, server_id).to_string(),
transactions: vec![],
};
let mut encoded_owner_info = bytes::BytesMut::new();
generated_types::server_config::encode_database_owner_info(
&owner_info,
&mut encoded_owner_info,
)
.expect("owner info serialization should be valid");
let encoded_owner_info = encoded_owner_info.freeze();
object_store
.put(&old_owner_path, encoded_owner_info)
.await
.unwrap();
// Write out a database rules file in the old location
let mut old_db_rules_path = old_root.clone();
old_db_rules_path.set_file_name("rules.pb");
let rules = management::v1::DatabaseRules {
name: old_loc_db_name.to_string(),
..Default::default()
};
let persisted_database_rules = management::v1::PersistedDatabaseRules {
uuid: old_loc_db_uuid.as_bytes().to_vec(),
rules: Some(rules),
};
let mut encoded_rules = bytes::BytesMut::new();
generated_types::database_rules::encode_persisted_database_rules(
&persisted_database_rules,
&mut encoded_rules,
)
.unwrap();
let encoded_rules = encoded_rules.freeze();
object_store
.put(&old_db_rules_path, encoded_rules)
.await
.unwrap();
// Write out the server config with the database name and pointing to the old location
let old_location = old_root.to_raw().to_string();
let server_config = management::v1::ServerConfig {
databases: [(old_loc_db_name.to_string(), old_location.clone())]
.into_iter()
.collect(),
};
let mut encoded_server_config = bytes::BytesMut::new();
generated_types::server_config::encode_persisted_server_config(
&server_config,
&mut encoded_server_config,
)
.unwrap();
let encoded_server_config = encoded_server_config.freeze();
IoxObjectStore::put_server_config_file(object_store, server_id, encoded_server_config)
.await
.unwrap();
// The server should initialize successfully
server.set_id(server_id).unwrap();
server.wait_for_init().await.unwrap();
// The database should initialize successfully
let old_loc_db = server.database(&old_loc_db_name).unwrap();
old_loc_db.wait_for_init().await.unwrap();
// Database rules can be updated
let mut new_rules = DatabaseRules::new(old_loc_db_name.clone());
new_rules.worker_cleanup_avg_sleep = Duration::from_secs(22);
server
.update_db_rules(make_provided_rules(new_rules.clone()))
.await
.unwrap();
let updated = old_loc_db.provided_rules().unwrap();
assert_eq!(
updated.rules().worker_cleanup_avg_sleep,
new_rules.worker_cleanup_avg_sleep
);
// Location remains the same
assert_eq!(old_loc_db.location(), old_location);
// New databases are created in the current database location, `dbs`
let new_loc_db_name = DatabaseName::new("new").unwrap();
let new_loc_rules = DatabaseRules::new(new_loc_db_name.clone());
let new_loc_db = server
.create_database(make_provided_rules(new_loc_rules))
.await
.unwrap();
let new_loc_db_uuid = new_loc_db.uuid().unwrap();
assert_eq!(new_loc_db.location(), format!("dbs/{}/", new_loc_db_uuid));
// Restarting the server with a database in the "old" location and a database in the "new"
// location works
std::mem::drop(server);
let server = make_server(Arc::clone(&application));
server.set_id(ServerId::try_from(1).unwrap()).unwrap();
server.wait_for_init().await.unwrap();
let old_loc_db = server.database(&old_loc_db_name).unwrap();
old_loc_db.wait_for_init().await.unwrap();
let new_loc_db = server.database(&new_loc_db_name).unwrap();
new_loc_db.wait_for_init().await.unwrap();
}
#[tokio::test]
async fn old_server_config_object_store_path() {
let application = make_application();
@ -1847,7 +1705,7 @@ mod tests {
.expect("failed to create database");
// make the db rules for bar invalid
let iox_object_store = bar.iox_object_store().unwrap();
let iox_object_store = bar.iox_object_store();
iox_object_store
.put_database_rules_file(Bytes::from("x"))
@ -1861,7 +1719,7 @@ mod tests {
.expect("failed to create database");
// make the owner info for baz say it's owned by a different server
let baz_iox_object_store = baz.iox_object_store().unwrap();
let baz_iox_object_store = baz.iox_object_store();
let owner_info = management::v1::OwnerInfo {
id: 2,
location: "nodes/2/config.pb".to_string(),
@ -1990,7 +1848,7 @@ mod tests {
// create database
let foo = create_simple_database(&server, &foo_db_name).await.unwrap();
let first_foo_uuid = foo.uuid().unwrap();
let first_foo_uuid = foo.uuid();
// release database by name
let released_uuid = server.release_database(&foo_db_name, None).await.unwrap();
@ -2006,7 +1864,7 @@ mod tests {
// create another database
let foo = create_simple_database(&server, &foo_db_name).await.unwrap();
let second_foo_uuid = foo.uuid().unwrap();
let second_foo_uuid = foo.uuid();
// release database specifying UUID; error if UUID doesn't match
let incorrect_uuid = Uuid::new_v4();
@ -2107,7 +1965,7 @@ mod tests {
let database = create_simple_database(&server1, &foo_db_name)
.await
.unwrap();
let uuid = database.uuid().unwrap();
let uuid = database.uuid();
// start server 2
let server2 = make_server(Arc::clone(&application));
@ -2200,13 +2058,12 @@ mod tests {
// tamper store to break one database
rules_broken
.iox_object_store()
.unwrap()
.put_database_rules_file(Bytes::from("x"))
.await
.unwrap();
let config = PreservedCatalogConfig::new(
catalog_broken.iox_object_store().unwrap(),
catalog_broken.iox_object_store(),
db_name_catalog_broken.to_string(),
Arc::clone(application.time_provider()),
);
@ -2218,7 +2075,6 @@ mod tests {
rules_broken
.iox_object_store()
.unwrap()
.get_database_rules_file()
.await
.unwrap();
@ -2268,11 +2124,9 @@ mod tests {
"error wiping preserved catalog: database (db_existing) in invalid state (Initialized) \
for wiping preserved catalog. Expected CatalogLoadError, WriteBufferCreationError, ReplayError"
);
assert!(
PreservedCatalog::exists(&existing.iox_object_store().unwrap())
.await
.unwrap()
);
assert!(PreservedCatalog::exists(&existing.iox_object_store())
.await
.unwrap());
// 2. cannot wipe non-existent DB
assert!(matches!(
@ -2336,11 +2190,9 @@ mod tests {
database.wait_for_init().await.unwrap();
assert!(
PreservedCatalog::exists(&catalog_broken.iox_object_store().unwrap())
.await
.unwrap()
);
assert!(PreservedCatalog::exists(&catalog_broken.iox_object_store())
.await
.unwrap());
assert!(database.init_error().is_none());
let db = server.db(&db_name_catalog_broken).unwrap();
@ -2363,11 +2215,9 @@ mod tests {
"error wiping preserved catalog: database (db_created) in invalid state (Initialized) \
for wiping preserved catalog. Expected CatalogLoadError, WriteBufferCreationError, ReplayError"
);
assert!(
PreservedCatalog::exists(&created.iox_object_store().unwrap())
.await
.unwrap()
);
assert!(PreservedCatalog::exists(&created.iox_object_store())
.await
.unwrap());
}
fn default_rules(db_name: DatabaseName<'static>) -> ProvidedDatabaseRules {

View File

@ -196,7 +196,7 @@ async fn delete_predicate_preservation() {
database.restart().await.unwrap();
// ==================== do: remove checkpoint files ====================
let iox_object_store = database.iox_object_store().unwrap();
let iox_object_store = database.iox_object_store();
let files = iox_object_store
.catalog_transaction_files()

View File

@ -131,7 +131,7 @@ async fn write_buffer_lifecycle() {
let database = databases.into_iter().next().unwrap();
database.wait_for_init().await.unwrap();
let database_uuid = database.uuid().unwrap();
let database_uuid = database.uuid();
let db = database.initialized_db().unwrap();
let batches = run_query(Arc::clone(&db), "select sum(bar) as n from table_1").await;