diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index 82a433a68f..f912cf9351 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -14,7 +14,10 @@ use crate::{ use async_trait::async_trait; use data_types::{KafkaPartition, Sequence, SequenceNumber}; use dml::{DmlMeta, DmlOperation}; -use futures::{stream::BoxStream, StreamExt}; +use futures::{ + stream::{self, BoxStream}, + StreamExt, TryStreamExt, +}; use iox_time::{Time, TimeProvider}; use observability_deps::tracing::warn; use parking_lot::Mutex; @@ -435,13 +438,20 @@ async fn setup_topic( // check if topic already exists let topics = client.list_topics().await?; if let Some(topic) = topics.into_iter().find(|t| t.name == topic_name) { - let mut partition_clients = BTreeMap::new(); - for partition in topic.partitions { - let c = client.partition_client(&topic_name, partition).await?; - let partition = u32::try_from(partition).map_err(WriteBufferError::invalid_data)?; - partition_clients.insert(partition, c); - } - return Ok(partition_clients); + // Instantiate 10 partition clients concurrently until all are ready + // speed up server init. + let client_ref = &client; + return stream::iter(topic.partitions.into_iter().map(|p| { + let topic_name = topic_name.clone(); + async move { + let partition = u32::try_from(p).map_err(WriteBufferError::invalid_data)?; + let c = client_ref.partition_client(&topic_name, p).await?; + Ok((partition, c)) + } + })) + .buffer_unordered(10) + .try_collect::>() + .await; } // create topic