refactor: parallelise Kafka partition client init
Changes the Kafka write buffer impl to parallelise initialisation of the PartitionClient instances. Now that the PartitionClient constructor also performs leader discovery (using cached metadata, influxdata/rskafka#164) and establishes a broker connection (influxdata/rskafka#166) executing them in parallel will cause a proportional decrease in the time taken to bring IOx up.pull/24376/head
parent
b2e50f0c1a
commit
bd88ac6149
|
@ -14,7 +14,10 @@ use crate::{
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{KafkaPartition, Sequence, SequenceNumber};
|
use data_types::{KafkaPartition, Sequence, SequenceNumber};
|
||||||
use dml::{DmlMeta, DmlOperation};
|
use dml::{DmlMeta, DmlOperation};
|
||||||
use futures::{stream::BoxStream, StreamExt};
|
use futures::{
|
||||||
|
stream::{self, BoxStream},
|
||||||
|
StreamExt, TryStreamExt,
|
||||||
|
};
|
||||||
use iox_time::{Time, TimeProvider};
|
use iox_time::{Time, TimeProvider};
|
||||||
use observability_deps::tracing::warn;
|
use observability_deps::tracing::warn;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
|
@ -435,13 +438,20 @@ async fn setup_topic(
|
||||||
// check if topic already exists
|
// check if topic already exists
|
||||||
let topics = client.list_topics().await?;
|
let topics = client.list_topics().await?;
|
||||||
if let Some(topic) = topics.into_iter().find(|t| t.name == topic_name) {
|
if let Some(topic) = topics.into_iter().find(|t| t.name == topic_name) {
|
||||||
let mut partition_clients = BTreeMap::new();
|
// Instantiate 10 partition clients concurrently until all are ready
|
||||||
for partition in topic.partitions {
|
// speed up server init.
|
||||||
let c = client.partition_client(&topic_name, partition).await?;
|
let client_ref = &client;
|
||||||
let partition = u32::try_from(partition).map_err(WriteBufferError::invalid_data)?;
|
return stream::iter(topic.partitions.into_iter().map(|p| {
|
||||||
partition_clients.insert(partition, c);
|
let topic_name = topic_name.clone();
|
||||||
}
|
async move {
|
||||||
return Ok(partition_clients);
|
let partition = u32::try_from(p).map_err(WriteBufferError::invalid_data)?;
|
||||||
|
let c = client_ref.partition_client(&topic_name, p).await?;
|
||||||
|
Ok((partition, c))
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.buffer_unordered(10)
|
||||||
|
.try_collect::<BTreeMap<_, _>>()
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
// create topic
|
// create topic
|
||||||
|
|
Loading…
Reference in New Issue