fix: retry write buffer creation during database init
parent
8f098d3ca1
commit
ef9c49087e
|
@ -884,6 +884,11 @@ async fn initialize_database(shared: &DatabaseShared) {
|
|||
},
|
||||
DatabaseState::CatalogLoaded(state) => match state.advance(shared).await {
|
||||
Ok(state) => DatabaseState::Initialized(state),
|
||||
Err(InitError::CreateWriteBuffer { source }) => {
|
||||
info!(%db_name, e=%source, "cannot create write buffer, wait a bit and try again");
|
||||
tokio::time::sleep(INIT_BACKOFF).await;
|
||||
DatabaseState::CatalogLoaded(state)
|
||||
}
|
||||
Err(e) => DatabaseState::ReplayError(state, Arc::new(e)),
|
||||
},
|
||||
state => unreachable!("{:?}", state),
|
||||
|
@ -1806,6 +1811,69 @@ mod tests {
|
|||
database.join().await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn write_buffer_creation_error() {
|
||||
// ensure that we're retrying write buffer creation (e.g. after connection errors or cluster issues)
|
||||
|
||||
// setup application
|
||||
let application = make_application();
|
||||
application.write_buffer_factory();
|
||||
|
||||
let server_id = ServerId::try_from(1).unwrap();
|
||||
|
||||
// setup DB
|
||||
let db_name = DatabaseName::new("test_db").unwrap();
|
||||
let uuid = Uuid::new_v4();
|
||||
let rules = data_types::database_rules::DatabaseRules {
|
||||
name: db_name.clone(),
|
||||
lifecycle_rules: Default::default(),
|
||||
partition_template: Default::default(),
|
||||
worker_cleanup_avg_sleep: Duration::from_secs(2),
|
||||
write_buffer_connection: Some(WriteBufferConnection {
|
||||
type_: "mock".to_string(),
|
||||
connection: "my_mock".to_string(),
|
||||
..Default::default()
|
||||
}),
|
||||
};
|
||||
let location = Database::create(
|
||||
Arc::clone(&application),
|
||||
uuid,
|
||||
make_provided_rules(rules),
|
||||
server_id,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let db_config = DatabaseConfig {
|
||||
name: db_name,
|
||||
location,
|
||||
server_id,
|
||||
wipe_catalog_on_error: false,
|
||||
skip_replay: false,
|
||||
};
|
||||
let database = Arc::new(Database::new(Arc::clone(&application), db_config.clone()));
|
||||
|
||||
// wait for a bit so the database fails because the mock is missing
|
||||
let database_captured = Arc::clone(&database);
|
||||
tokio::time::timeout(Duration::from_millis(100), async move {
|
||||
database_captured.wait_for_init().await.unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap_err();
|
||||
|
||||
// create write buffer
|
||||
let state =
|
||||
MockBufferSharedState::empty_with_n_sequencers(NonZeroU32::try_from(1).unwrap());
|
||||
application
|
||||
.write_buffer_factory()
|
||||
.register_mock("my_mock".to_string(), state.clone());
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(10), async move {
|
||||
database.wait_for_init().await.unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
/// Normally database rules are provided as grpc messages, but in
|
||||
/// tests they are constructed from database rules structures
|
||||
/// themselves.
|
||||
|
|
Loading…
Reference in New Issue