From bd88ac6149e2f8eeb501ca8d42626a032734d2af Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Fri, 12 Aug 2022 14:34:57 +0200 Subject: [PATCH] refactor: parallelise Kafka partition client init Changes the Kafka write buffer impl to parallelise initialisation of the PartitionClient instances. Now that the PartitionClient constructor also performs leader discovery (using cached metadata, influxdata/rskafka#164) and establishes a broker connection (influxdata/rskafka#166) executing them in parallel will cause a proportional decrease in the time taken to bring IOx up. --- write_buffer/src/kafka/mod.rs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index 82a433a68f..f912cf9351 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -14,7 +14,10 @@ use crate::{ use async_trait::async_trait; use data_types::{KafkaPartition, Sequence, SequenceNumber}; use dml::{DmlMeta, DmlOperation}; -use futures::{stream::BoxStream, StreamExt}; +use futures::{ + stream::{self, BoxStream}, + StreamExt, TryStreamExt, +}; use iox_time::{Time, TimeProvider}; use observability_deps::tracing::warn; use parking_lot::Mutex; @@ -435,13 +438,20 @@ async fn setup_topic( // check if topic already exists let topics = client.list_topics().await?; if let Some(topic) = topics.into_iter().find(|t| t.name == topic_name) { - let mut partition_clients = BTreeMap::new(); - for partition in topic.partitions { - let c = client.partition_client(&topic_name, partition).await?; - let partition = u32::try_from(partition).map_err(WriteBufferError::invalid_data)?; - partition_clients.insert(partition, c); - } - return Ok(partition_clients); + // Instantiate 10 partition clients concurrently until all are ready + // speed up server init. + let client_ref = &client; + return stream::iter(topic.partitions.into_iter().map(|p| { + let topic_name = topic_name.clone(); + async move { + let partition = u32::try_from(p).map_err(WriteBufferError::invalid_data)?; + let c = client_ref.partition_client(&topic_name, p).await?; + Ok((partition, c)) + } + })) + .buffer_unordered(10) + .try_collect::>() + .await; } // create topic