fix: address review comments

pull/24376/head
Marco Neumann 2021-12-07 09:43:47 +01:00
parent ef9c49087e
commit e558f4c708
2 changed files with 20 additions and 9 deletions

View File

@ -1854,9 +1854,10 @@ mod tests {
// wait for a bit so the database fails because the mock is missing // wait for a bit so the database fails because the mock is missing
let database_captured = Arc::clone(&database); let database_captured = Arc::clone(&database);
tokio::time::timeout(Duration::from_millis(100), async move { tokio::time::timeout(
database_captured.wait_for_init().await.unwrap(); Duration::from_millis(100),
}) database_captured.wait_for_init(),
)
.await .await
.unwrap_err(); .unwrap_err();
@ -1867,11 +1868,10 @@ mod tests {
.write_buffer_factory() .write_buffer_factory()
.register_mock("my_mock".to_string(), state.clone()); .register_mock("my_mock".to_string(), state.clone());
tokio::time::timeout(Duration::from_secs(10), async move { tokio::time::timeout(Duration::from_secs(10), database.wait_for_init())
database.wait_for_init().await.unwrap(); .await
}) .unwrap()
.await .unwrap();
.unwrap();
} }
/// Normally database rules are provided as grpc messages, but in /// Normally database rules are provided as grpc messages, but in

View File

@ -549,6 +549,7 @@ where
let error_code: RDKafkaErrorCode = error_code.into(); let error_code: RDKafkaErrorCode = error_code.into();
match error_code { match error_code {
RDKafkaErrorCode::UnknownTopic | RDKafkaErrorCode::UnknownTopicOrPartition => { RDKafkaErrorCode::UnknownTopic | RDKafkaErrorCode::UnknownTopicOrPartition => {
// The caller is responsible for creating the topic, so this is somewhat OK.
Ok(None) Ok(None)
} }
_ => Err(KafkaError::MetadataFetch(error_code).into()), _ => Err(KafkaError::MetadataFetch(error_code).into()),
@ -612,11 +613,19 @@ where
C: Consumer<D> + Send + Sync + 'static, C: Consumer<D> + Send + Sync + 'static,
D: ConsumerContext, D: ConsumerContext,
{ {
loop { const N_TRIES: usize = 10;
for i in 0..N_TRIES {
if let Some(partitions) = get_partitions(database_name, consumer).await? { if let Some(partitions) = get_partitions(database_name, consumer).await? {
return Ok(partitions); return Ok(partitions);
} }
// debounce after first round
if i > 0 {
info!(topic=%database_name, "Topic does not have partitions after creating it, wait a bit and try again.");
tokio::time::sleep(Duration::from_millis(250)).await;
}
if let Some(creation_config) = creation_config { if let Some(creation_config) = creation_config {
create_kafka_topic( create_kafka_topic(
kafka_connection, kafka_connection,
@ -631,6 +640,8 @@ where
.into()); .into());
} }
} }
Err(format!("Could not auto-create topic after {} tries.", N_TRIES).into())
} }
/// Our own implementation of [`ClientContext`] to overwrite certain logging behavior. /// Our own implementation of [`ClientContext`] to overwrite certain logging behavior.