parent
910f381355
commit
b7a4ef46dd
|
@ -787,6 +787,7 @@ mod tests {
|
|||
sequence::Sequence,
|
||||
write_buffer::WriteBufferConnection,
|
||||
};
|
||||
use object_store::{ObjectStore, ObjectStoreIntegration, ThrottleConfig};
|
||||
use std::time::Duration;
|
||||
use std::{num::NonZeroU32, time::Instant};
|
||||
use test_helpers::assert_contains;
|
||||
|
@ -963,6 +964,59 @@ mod tests {
|
|||
assert_contains!(&err, "not found");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn database_abort() {
|
||||
test_helpers::maybe_start_logging();
|
||||
|
||||
// Create a throttled object store that will stall the init process
|
||||
let throttle_config = ThrottleConfig {
|
||||
wait_get_per_call: Duration::from_secs(100),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let store = Arc::new(ObjectStore::new_in_memory_throttled(throttle_config));
|
||||
let application = Arc::new(ApplicationState::new(Arc::clone(&store), None, None));
|
||||
|
||||
let db_config = DatabaseConfig {
|
||||
name: DatabaseName::new("test").unwrap(),
|
||||
database_uuid: Uuid::new_v4(),
|
||||
server_id: ServerId::try_from(1).unwrap(),
|
||||
wipe_catalog_on_error: false,
|
||||
skip_replay: false,
|
||||
};
|
||||
|
||||
let database = Database::new(Arc::clone(&application), db_config.clone());
|
||||
|
||||
// Should fail to initialize in a timely manner
|
||||
tokio::time::timeout(Duration::from_millis(10), database.wait_for_init())
|
||||
.await
|
||||
.expect_err("should timeout");
|
||||
|
||||
assert_eq!(database.state_code(), DatabaseStateCode::Known);
|
||||
|
||||
database.shutdown();
|
||||
database.join().await.unwrap();
|
||||
|
||||
assert_eq!(database.state_code(), DatabaseStateCode::Shutdown);
|
||||
|
||||
// Disable throttling
|
||||
match &store.integration {
|
||||
ObjectStoreIntegration::InMemoryThrottled(s) => {
|
||||
s.config_mut(|c| *c = Default::default())
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
|
||||
// Restart should recover from aborted state, but will now error due to missing config
|
||||
let error = tokio::time::timeout(Duration::from_secs(1), database.restart())
|
||||
.await
|
||||
.expect("no timeout")
|
||||
.unwrap_err()
|
||||
.to_string();
|
||||
|
||||
assert_contains!(error, "error getting database owner info");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skip_replay() {
|
||||
// create write buffer
|
||||
|
|
|
@ -260,6 +260,55 @@ impl DatabaseState {
|
|||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Try to advance to the next state
|
||||
///
|
||||
/// # Panic
|
||||
///
|
||||
/// Panics if the database cannot be advanced (already initialized or shutdown)
|
||||
async fn advance(self, shared: &DatabaseShared) -> Self {
|
||||
match self {
|
||||
Self::Known(state) | Self::OwnerInfoLoadError(state, _) => {
|
||||
match state.advance(shared).await {
|
||||
Ok(state) => Self::OwnerInfoLoaded(state),
|
||||
Err(e) => Self::OwnerInfoLoadError(state, Arc::new(e)),
|
||||
}
|
||||
}
|
||||
Self::OwnerInfoLoaded(state) | Self::RulesLoadError(state, _) => {
|
||||
match state.advance(shared).await {
|
||||
Ok(state) => Self::RulesLoaded(state),
|
||||
Err(e) => Self::RulesLoadError(state, Arc::new(e)),
|
||||
}
|
||||
}
|
||||
Self::RulesLoaded(state) | Self::CatalogLoadError(state, _) => {
|
||||
match state.advance(shared).await {
|
||||
Ok(state) => Self::CatalogLoaded(state),
|
||||
Err(e) => Self::CatalogLoadError(state, Arc::new(e)),
|
||||
}
|
||||
}
|
||||
Self::CatalogLoaded(state) | Self::WriteBufferCreationError(state, _) => {
|
||||
match state.advance(shared).await {
|
||||
Ok(state) => Self::Initialized(state),
|
||||
Err(e @ InitError::CreateWriteBuffer { .. }) => {
|
||||
Self::WriteBufferCreationError(state, Arc::new(e))
|
||||
}
|
||||
Err(e) => Self::ReplayError(state, Arc::new(e)),
|
||||
}
|
||||
}
|
||||
Self::ReplayError(state, _) => {
|
||||
let state2 = state.rollback();
|
||||
match state2.advance(shared).await {
|
||||
Ok(state2) => match state2.advance(shared).await {
|
||||
Ok(state2) => Self::Initialized(state2),
|
||||
Err(e) => Self::ReplayError(state, Arc::new(e)),
|
||||
},
|
||||
Err(e) => Self::ReplayError(state, Arc::new(e)),
|
||||
}
|
||||
}
|
||||
Self::Initialized(_) => unreachable!(),
|
||||
Self::Shutdown(_) => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
|
@ -516,51 +565,23 @@ pub(crate) async fn initialize_database(shared: &DatabaseShared, shutdown: Cance
|
|||
|
||||
info!(%db_name, %state, "attempting to advance database initialization state");
|
||||
|
||||
match &state {
|
||||
DatabaseState::Initialized(_) => break,
|
||||
DatabaseState::Shutdown(_) => {
|
||||
info!(%db_name, "database in shutdown - aborting initialization");
|
||||
shutdown.cancel();
|
||||
return;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Try to advance to the next state
|
||||
let next_state = match state {
|
||||
DatabaseState::Known(state) | DatabaseState::OwnerInfoLoadError(state, _) => {
|
||||
match state.advance(shared).await {
|
||||
Ok(state) => DatabaseState::OwnerInfoLoaded(state),
|
||||
Err(e) => DatabaseState::OwnerInfoLoadError(state, Arc::new(e)),
|
||||
}
|
||||
let next_state = tokio::select! {
|
||||
next_state = state.advance(shared) => next_state,
|
||||
_ = shutdown.cancelled() => {
|
||||
info!(%db_name, "initialization aborted by shutdown");
|
||||
return
|
||||
}
|
||||
DatabaseState::OwnerInfoLoaded(state) | DatabaseState::RulesLoadError(state, _) => {
|
||||
match state.advance(shared).await {
|
||||
Ok(state) => DatabaseState::RulesLoaded(state),
|
||||
Err(e) => DatabaseState::RulesLoadError(state, Arc::new(e)),
|
||||
}
|
||||
}
|
||||
DatabaseState::RulesLoaded(state) | DatabaseState::CatalogLoadError(state, _) => {
|
||||
match state.advance(shared).await {
|
||||
Ok(state) => DatabaseState::CatalogLoaded(state),
|
||||
Err(e) => DatabaseState::CatalogLoadError(state, Arc::new(e)),
|
||||
}
|
||||
}
|
||||
DatabaseState::CatalogLoaded(state)
|
||||
| DatabaseState::WriteBufferCreationError(state, _) => {
|
||||
match state.advance(shared).await {
|
||||
Ok(state) => DatabaseState::Initialized(state),
|
||||
Err(e @ InitError::CreateWriteBuffer { .. }) => {
|
||||
DatabaseState::WriteBufferCreationError(state, Arc::new(e))
|
||||
}
|
||||
Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
|
||||
}
|
||||
}
|
||||
DatabaseState::ReplayError(state, _) => {
|
||||
let state2 = state.rollback();
|
||||
match state2.advance(shared).await {
|
||||
Ok(state2) => match state2.advance(shared).await {
|
||||
Ok(state2) => DatabaseState::Initialized(state2),
|
||||
Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
|
||||
},
|
||||
Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
|
||||
}
|
||||
}
|
||||
DatabaseState::Initialized(_) => {
|
||||
// Already initialized
|
||||
break;
|
||||
}
|
||||
DatabaseState::Shutdown(_) => unreachable!(),
|
||||
};
|
||||
|
||||
let state_code = next_state.state_code();
|
||||
|
|
Loading…
Reference in New Issue