fix: Create one consumer group per server+database

This hasn't caused any problems for me yet, but it seemed like a good idea
because we want to be sure we don't trigger any of Kafka's consumer
rebalancing if we have multiple partitions.
pull/24376/head
Carol (Nichols || Goulding) 2021-07-02 16:17:39 -04:00
parent e5168936f5
commit c90ef7b14b
3 changed files with 25 additions and 13 deletions
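For context, a minimal standalone sketch of the idea, assuming the rdkafka crate; the `build_group_id` and `make_consumer` names are illustrative and not part of this commit, and the plain `u32` stands in for the real `ServerId` type. Because the group ID is unique to one server + database pair, each Kafka consumer group has exactly one member, so Kafka never needs to rebalance partition assignments between members even when the topic has multiple partitions.

    use rdkafka::{config::ClientConfig, consumer::StreamConsumer, error::KafkaError};

    // Illustrative helper (not in this commit): one consumer group per
    // server + database means each group has exactly one member, so no
    // rebalancing can occur within it.
    fn build_group_id(server_id: u32, database_name: &str) -> String {
        format!("{}-{}", server_id, database_name)
    }

    // Sketch of a consumer configured along the lines of this commit;
    // the `u32` server ID is a stand-in for the real `ServerId` type.
    fn make_consumer(
        conn: &str,
        server_id: u32,
        database_name: &str,
    ) -> Result<StreamConsumer, KafkaError> {
        let mut cfg = ClientConfig::new();
        cfg.set("bootstrap.servers", conn);
        cfg.set("session.timeout.ms", "6000");
        cfg.set("enable.auto.commit", "false");
        cfg.set("group.id", &build_group_id(server_id, database_name));
        cfg.create()
    }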


@@ -530,9 +530,11 @@ impl InitStatus {
         let rules = handle
             .rules()
             .expect("in this state rules should be loaded");
-        let write_buffer = WriteBufferConfig::new(&rules).context(CreateWriteBuffer {
-            config: rules.write_buffer_connection.clone(),
-        })?;
+        let write_buffer = WriteBufferConfig::new(handle.server_id(), &rules).context(
+            CreateWriteBuffer {
+                config: rules.write_buffer_connection.clone(),
+            },
+        )?;
         handle
             .advance_replay(preserved_catalog, catalog, write_buffer)


@@ -552,12 +552,13 @@ where
             .map_err(|e| Box::new(e) as _)
             .context(CannotCreatePreservedCatalog)?;
-        let write_buffer = write_buffer::WriteBufferConfig::new(&rules).map_err(|e| {
-            Error::CreatingWriteBuffer {
-                config: rules.write_buffer_connection.clone(),
-                source: e,
-            }
-        })?;
+        let write_buffer =
+            write_buffer::WriteBufferConfig::new(server_id, &rules).map_err(|e| {
+                Error::CreatingWriteBuffer {
+                    config: rules.write_buffer_connection.clone(),
+                    source: e,
+                }
+            })?;
         db_reservation.advance_replay(preserved_catalog, catalog, write_buffer)?;
         // no actual replay required


@@ -1,5 +1,8 @@
 use async_trait::async_trait;
-use data_types::database_rules::{DatabaseRules, WriteBufferConnection};
+use data_types::{
+    database_rules::{DatabaseRules, WriteBufferConnection},
+    server_id::ServerId,
+};
 use entry::{Entry, Sequence, SequencedEntry};
 use futures::{stream::BoxStream, StreamExt};
 use rdkafka::{
@@ -22,7 +25,10 @@ pub enum WriteBufferConfig {
 }

 impl WriteBufferConfig {
-    pub fn new(rules: &DatabaseRules) -> Result<Option<Self>, WriteBufferError> {
+    pub fn new(
+        server_id: ServerId,
+        rules: &DatabaseRules,
+    ) -> Result<Option<Self>, WriteBufferError> {
         let name = rules.db_name();

         // Right now, the Kafka producer and consumers are the only production implementations of the
@@ -36,7 +42,7 @@ impl WriteBufferConfig {
                 Ok(Some(Self::Writing(Arc::new(kafka_buffer) as _)))
             }
             Some(WriteBufferConnection::Reading(conn)) => {
-                let kafka_buffer = KafkaBufferConsumer::new(conn, name)?;
+                let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name)?;
                 Ok(Some(Self::Reading(Arc::new(kafka_buffer) as _)))
             }
@@ -174,6 +180,7 @@ impl WriteBufferReading for KafkaBufferConsumer {
 impl KafkaBufferConsumer {
     pub fn new(
         conn: impl Into<String>,
+        server_id: ServerId,
         database_name: impl Into<String>,
     ) -> Result<Self, KafkaError> {
         let conn = conn.into();
@@ -183,7 +190,9 @@ impl KafkaBufferConsumer {
         cfg.set("bootstrap.servers", &conn);
         cfg.set("session.timeout.ms", "6000");
         cfg.set("enable.auto.commit", "false");
-        cfg.set("group.id", "placeholder");
+        // Create a unique consumer group ID for this server + database pair so
+        // consumers never share a group and trigger Kafka's consumer rebalancing.
+        cfg.set("group.id", &format!("{}-{}", server_id, database_name));
         let consumer: StreamConsumer = cfg.create()?;
         let mut topics = TopicPartitionList::new();
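The diff view is truncated here. For completeness, a hedged sketch of what typically follows with rdkafka's `TopicPartitionList`; this is illustrative only, not the commit's remaining lines, and the choice of partition 0 is an assumption. Combined with the unique group ID above, the consumer reads its partition without coordinating with any other group member.

    use rdkafka::{
        consumer::{Consumer, StreamConsumer},
        error::KafkaError,
        TopicPartitionList,
    };

    // Illustrative only: manually assign one topic/partition to the consumer
    // instead of relying on group subscription and partition balancing.
    fn assign_single_partition(consumer: &StreamConsumer, topic: &str) -> Result<(), KafkaError> {
        let mut topics = TopicPartitionList::new();
        topics.add_partition(topic, 0); // partition 0 is an assumption
        consumer.assign(&topics)
    }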