From c0cc2397811afada8a4e896eb5580bce4603e963 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 6 Sep 2021 12:05:50 +0200 Subject: [PATCH] fix: improve Kafka error handling --- write_buffer/src/kafka.rs | 51 ++++++++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index 9ada289ca4..47185bc1d8 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -392,13 +392,17 @@ fn admin_client(kafka_connection: &str) -> Result, -) -> Result<(), KafkaError> { +) -> Result<(), WriteBufferError> { let admin = admin_client(kafka_connection)?; let mut topic = NewTopic::new( @@ -411,9 +415,19 @@ async fn create_kafka_topic( } let opts = AdminOptions::default(); - admin.create_topics([&topic], &opts).await?; - - Ok(()) + let mut results = admin.create_topics([&topic], &opts).await?; + assert_eq!(results.len(), 1, "created exactly one topic"); + let result = results.pop().expect("just checked the vector length"); + match result { + Ok(topic) | Err((topic, RDKafkaErrorCode::TopicAlreadyExists)) => { + assert_eq!(topic, database_name); + Ok(()) + } + Err((topic, code)) => { + assert_eq!(topic, database_name); + Err(format!("Cannot create topic '{}': {}", topic, code).into()) + } + } } pub mod test_utils { @@ -496,7 +510,10 @@ pub mod test_utils { let cfg = AlterConfig::new(ResourceSpecifier::Topic(database_name)).set("retention.ms", "-1"); - admin.alter_configs([&cfg], &opts).await.unwrap(); + let mut results = admin.alter_configs([&cfg], &opts).await.unwrap(); + assert_eq!(results.len(), 1, "created exactly one topic"); + let result = results.pop().expect("just checked the vector length"); + result.unwrap(); } /// Generated random topic name for testing. @@ -603,4 +620,26 @@ mod tests { perform_generic_tests(KafkaTestAdapter::new(conn)).await; } + + #[tokio::test] + async fn topic_create_twice() { + let conn = maybe_skip_kafka_integration!(); + let database_name = random_kafka_topic(); + create_kafka_topic( + &conn, + &database_name, + NonZeroU32::try_from(1).unwrap(), + &kafka_sequencer_creation_config(), + ) + .await + .unwrap(); + create_kafka_topic( + &conn, + &database_name, + NonZeroU32::try_from(1).unwrap(), + &kafka_sequencer_creation_config(), + ) + .await + .unwrap(); + } }