fix: improve Kafka error handling

pull/24376/head
Marco Neumann 2021-09-06 12:05:50 +02:00
parent 801cf08be7
commit c0cc239781
1 changed files with 45 additions and 6 deletions

View File

@ -392,13 +392,17 @@ fn admin_client(kafka_connection: &str) -> Result<AdminClient<DefaultClientConte
cfg.create()
}
/// Create Kafka topic for testing.
/// Create Kafka topic based on the provided configs.
///
/// This is create a topic with `n_sequencers` partitions.
///
/// This will NOT fail if the topic already exists!
async fn create_kafka_topic(
kafka_connection: &str,
database_name: &str,
n_sequencers: NonZeroU32,
cfg: &HashMap<String, String>,
) -> Result<(), KafkaError> {
) -> Result<(), WriteBufferError> {
let admin = admin_client(kafka_connection)?;
let mut topic = NewTopic::new(
@ -411,9 +415,19 @@ async fn create_kafka_topic(
}
let opts = AdminOptions::default();
admin.create_topics([&topic], &opts).await?;
let mut results = admin.create_topics([&topic], &opts).await?;
assert_eq!(results.len(), 1, "created exactly one topic");
let result = results.pop().expect("just checked the vector length");
match result {
Ok(topic) | Err((topic, RDKafkaErrorCode::TopicAlreadyExists)) => {
assert_eq!(topic, database_name);
Ok(())
}
Err((topic, code)) => {
assert_eq!(topic, database_name);
Err(format!("Cannot create topic '{}': {}", topic, code).into())
}
}
}
pub mod test_utils {
@ -496,7 +510,10 @@ pub mod test_utils {
let cfg =
AlterConfig::new(ResourceSpecifier::Topic(database_name)).set("retention.ms", "-1");
admin.alter_configs([&cfg], &opts).await.unwrap();
let mut results = admin.alter_configs([&cfg], &opts).await.unwrap();
assert_eq!(results.len(), 1, "created exactly one topic");
let result = results.pop().expect("just checked the vector length");
result.unwrap();
}
/// Generated random topic name for testing.
@ -603,4 +620,26 @@ mod tests {
perform_generic_tests(KafkaTestAdapter::new(conn)).await;
}
#[tokio::test]
async fn topic_create_twice() {
let conn = maybe_skip_kafka_integration!();
let database_name = random_kafka_topic();
create_kafka_topic(
&conn,
&database_name,
NonZeroU32::try_from(1).unwrap(),
&kafka_sequencer_creation_config(),
)
.await
.unwrap();
create_kafka_topic(
&conn,
&database_name,
NonZeroU32::try_from(1).unwrap(),
&kafka_sequencer_creation_config(),
)
.await
.unwrap();
}
}