fix(ingester): connect to assigned Kafka partition

During initialisation, the ingester connects to the Kafka brokers - this
involves per-partition leadership discovery & connection establishment.
These connections are then retained for the lifetime of the process.

Prior to this commit, the ingester would establish a connection to all
partition leaders for a given topic. After this commit, the ingester
connects to only the partition leaders it is going to consume from
(for those shards that it is assigned.)
pull/24376/head
Dom Dwyer 2022-09-07 13:14:58 +02:00
parent cbfd37540a
commit d1ca29c029
5 changed files with 84 additions and 34 deletions

View File

@ -1,7 +1,7 @@
//! Config for [`write_buffer`].
use iox_time::SystemProvider;
use observability_deps::tracing::*;
use std::{collections::BTreeMap, num::NonZeroU32, path::PathBuf, sync::Arc};
use std::{collections::BTreeMap, num::NonZeroU32, ops::Range, path::PathBuf, sync::Arc};
use tempfile::TempDir;
use trace::TraceCollector;
use write_buffer::{
@ -94,12 +94,13 @@ impl WriteBufferConfig {
pub async fn writing(
&self,
metrics: Arc<metric::Registry>,
partitions: Option<Range<i32>>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
let conn = self.conn();
let factory = Self::factory(metrics);
factory
.new_config_write(&self.topic, trace_collector.as_ref(), &conn)
.new_config_write(&self.topic, partitions, trace_collector.as_ref(), &conn)
.await
}
@ -107,12 +108,13 @@ impl WriteBufferConfig {
pub async fn reading(
&self,
metrics: Arc<metric::Registry>,
partitions: Option<Range<i32>>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Arc<dyn WriteBufferReading>, WriteBufferError> {
let conn = self.conn();
let factory = Self::factory(metrics);
factory
.new_config_read(&self.topic, trace_collector.as_ref(), &conn)
.new_config_read(&self.topic, partitions, trace_collector.as_ref(), &conn)
.await
}

View File

@ -156,10 +156,9 @@ pub async fn create_ingester_server_type(
return Err(Error::ShardIndexRange);
}
let shard_indexes: Vec<_> = (ingester_config.shard_index_range_start
..=ingester_config.shard_index_range_end)
.map(ShardIndex::new)
.collect();
let shard_range =
ingester_config.shard_index_range_start..(ingester_config.shard_index_range_end + 1);
let shard_indexes: Vec<_> = shard_range.clone().map(ShardIndex::new).collect();
let mut shards = BTreeMap::new();
for shard_index in shard_indexes {
@ -171,7 +170,11 @@ pub async fn create_ingester_server_type(
let trace_collector = common_state.trace_collector();
let write_buffer = write_buffer_config
.reading(Arc::clone(&metric_registry), trace_collector.clone())
.reading(
Arc::clone(&metric_registry),
Some(shard_range),
trace_collector.clone(),
)
.await?;
let lifecycle_config = LifecycleConfig::new(

View File

@ -320,7 +320,7 @@ async fn init_write_buffer(
)> {
let write_buffer = Arc::new(
write_buffer_config
.writing(Arc::clone(&metrics), trace_collector)
.writing(Arc::clone(&metrics), None, trace_collector)
.await?,
);

View File

@ -12,6 +12,7 @@ use parking_lot::RwLock;
use std::{
collections::{btree_map::Entry, BTreeMap},
num::NonZeroU32,
ops::Range,
path::PathBuf,
sync::Arc,
};
@ -151,6 +152,7 @@ impl WriteBufferConfigFactory {
pub async fn new_config_write(
&self,
db_name: &str,
partitions: Option<Range<i32>>,
trace_collector: Option<&Arc<dyn TraceCollector>>,
cfg: &WriteBufferConnection,
) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
@ -173,6 +175,7 @@ impl WriteBufferConfigFactory {
&cfg.connection_config,
Arc::clone(&self.time_provider),
cfg.creation_config.as_ref(),
partitions,
trace_collector.map(Arc::clone),
&*self.metric_registry,
)
@ -205,6 +208,7 @@ impl WriteBufferConfigFactory {
pub async fn new_config_read(
&self,
db_name: &str,
partitions: Option<Range<i32>>,
trace_collector: Option<&Arc<dyn TraceCollector>>,
cfg: &WriteBufferConnection,
) -> Result<Arc<dyn WriteBufferReading>, WriteBufferError> {
@ -226,6 +230,7 @@ impl WriteBufferConfigFactory {
db_name.to_owned(),
&cfg.connection_config,
cfg.creation_config.as_ref(),
partitions,
trace_collector.map(Arc::clone),
)
.await?;
@ -275,7 +280,7 @@ mod tests {
};
let conn = factory
.new_config_write(db_name.as_str(), None, &cfg)
.new_config_write(db_name.as_str(), None, None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "file");
@ -294,7 +299,7 @@ mod tests {
};
let conn = factory
.new_config_read(db_name.as_str(), None, &cfg)
.new_config_read(db_name.as_str(), None, None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "file");
@ -316,7 +321,7 @@ mod tests {
};
let conn = factory
.new_config_write(db_name.as_str(), None, &cfg)
.new_config_write(db_name.as_str(), None, None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "mock");
@ -328,7 +333,7 @@ mod tests {
..Default::default()
};
let err = factory
.new_config_write(db_name.as_str(), None, &cfg)
.new_config_write(db_name.as_str(), None, None, &cfg)
.await
.unwrap_err();
assert!(err.to_string().contains("Unknown mock ID:"));
@ -350,7 +355,7 @@ mod tests {
};
let conn = factory
.new_config_read(db_name.as_str(), None, &cfg)
.new_config_read(db_name.as_str(), None, None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "mock");
@ -362,7 +367,7 @@ mod tests {
..Default::default()
};
let err = factory
.new_config_read(db_name.as_str(), None, &cfg)
.new_config_read(db_name.as_str(), None, None, &cfg)
.await
.unwrap_err();
assert!(err.to_string().contains("Unknown mock ID:"));
@ -383,7 +388,7 @@ mod tests {
};
let conn = factory
.new_config_write(db_name.as_str(), None, &cfg)
.new_config_write(db_name.as_str(), None, None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "mock_failing");
@ -395,7 +400,7 @@ mod tests {
..Default::default()
};
let err = factory
.new_config_write(db_name.as_str(), None, &cfg)
.new_config_write(db_name.as_str(), None, None, &cfg)
.await
.unwrap_err();
assert!(err.to_string().contains("Unknown mock ID:"));
@ -416,7 +421,7 @@ mod tests {
};
let conn = factory
.new_config_read(db_name.as_str(), None, &cfg)
.new_config_read(db_name.as_str(), None, None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "mock_failing");
@ -428,7 +433,7 @@ mod tests {
..Default::default()
};
let err = factory
.new_config_read(db_name.as_str(), None, &cfg)
.new_config_read(db_name.as_str(), None, None, &cfg)
.await
.unwrap_err();
assert!(err.to_string().contains("Unknown mock ID:"));
@ -464,7 +469,7 @@ mod tests {
};
let conn = factory
.new_config_write(db_name.as_str(), None, &cfg)
.new_config_write(db_name.as_str(), None, None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "kafka");
@ -484,7 +489,7 @@ mod tests {
};
let conn = factory
.new_config_read(db_name.as_str(), None, &cfg)
.new_config_read(db_name.as_str(), None, None, &cfg)
.await
.unwrap();
assert_eq!(conn.type_name(), "kafka");

View File

@ -33,6 +33,7 @@ use rskafka::{
};
use std::{
collections::{BTreeMap, BTreeSet},
ops::Range,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@ -55,17 +56,25 @@ pub struct RSKafkaProducer {
}
impl RSKafkaProducer {
#[allow(clippy::too_many_arguments)]
pub async fn new<'a>(
conn: String,
topic_name: String,
connection_config: &'a BTreeMap<String, String>,
time_provider: Arc<dyn TimeProvider>,
creation_config: Option<&'a WriteBufferCreationConfig>,
partitions: Option<Range<i32>>,
_trace_collector: Option<Arc<dyn TraceCollector>>,
metric_registry: &'a metric::Registry,
) -> Result<Self> {
let partition_clients =
setup_topic(conn, topic_name.clone(), connection_config, creation_config).await?;
let partition_clients = setup_topic(
conn,
topic_name.clone(),
connection_config,
creation_config,
partitions,
)
.await?;
let producer_config = ProducerConfig::try_from(connection_config)?;
@ -358,10 +367,17 @@ impl RSKafkaConsumer {
topic_name: String,
connection_config: &BTreeMap<String, String>,
creation_config: Option<&WriteBufferCreationConfig>,
partitions: Option<Range<i32>>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Self> {
let partition_clients =
setup_topic(conn, topic_name.clone(), connection_config, creation_config).await?;
let partition_clients = setup_topic(
conn,
topic_name.clone(),
connection_config,
creation_config,
partitions,
)
.await?;
let partition_clients = partition_clients
.into_iter()
@ -428,6 +444,7 @@ async fn setup_topic(
topic_name: String,
connection_config: &BTreeMap<String, String>,
creation_config: Option<&WriteBufferCreationConfig>,
partitions: Option<Range<i32>>,
) -> Result<BTreeMap<ShardIndex, PartitionClient>> {
let client_config = ClientConfig::try_from(connection_config)?;
let mut client_builder = ClientBuilder::new(vec![conn]);
@ -447,17 +464,37 @@ async fn setup_topic(
// Instantiate 10 partition clients concurrently until all are ready
// speed up server init.
let client_ref = &client;
return stream::iter(topic.partitions.into_iter().map(|p| {
let clients = stream::iter(
topic
.partitions
.into_iter()
.filter(|p| {
partitions
.as_ref()
.map(|want| want.contains(p))
.unwrap_or(true)
})
.map(|p| {
let topic_name = topic_name.clone();
async move {
let shard_index = ShardIndex::new(p);
let c = client_ref.partition_client(&topic_name, p).await?;
Ok((shard_index, c))
Result::<_, WriteBufferError>::Ok((shard_index, c))
}
}))
}),
)
.buffer_unordered(10)
.try_collect::<BTreeMap<_, _>>()
.await;
.await?;
if let Some(p) = partitions {
assert_eq!(
p.len(),
clients.len(),
"requested partition clients not initialised"
);
}
return Ok(clients);
}
// create topic
@ -576,6 +613,7 @@ mod tests {
&BTreeMap::default(),
Arc::clone(&self.time_provider),
self.creation_config(creation_config).as_ref(),
None,
Some(self.trace_collector() as Arc<_>),
&self.metrics,
)
@ -588,6 +626,7 @@ mod tests {
self.topic_name.clone(),
&BTreeMap::default(),
self.creation_config(creation_config).as_ref(),
None,
Some(self.trace_collector() as Arc<_>),
)
.await
@ -625,6 +664,7 @@ mod tests {
n_shards,
..Default::default()
}),
None,
)
.await
.unwrap();