From c90ef7b14b6ab14da462444e2d37ee158f19c1a4 Mon Sep 17 00:00:00 2001
From: "Carol (Nichols || Goulding)"
Date: Fri, 2 Jul 2021 16:17:39 -0400
Subject: [PATCH] fix: Create one consumer group per server+database

This hasn't caused any problems for me yet, but it seemed like a good
idea because we want to be sure we don't get any of Kafka's consumer
rebalancing if we have multiple partitions.
---
 server/src/init.rs         |  8 +++++---
 server/src/lib.rs          | 13 +++++++------
 server/src/write_buffer.rs | 17 +++++++++++++---
 3 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/server/src/init.rs b/server/src/init.rs
index 3417c7da7a..ec80330842 100644
--- a/server/src/init.rs
+++ b/server/src/init.rs
@@ -530,9 +530,11 @@ impl InitStatus {
         let rules = handle
             .rules()
             .expect("in this state rules should be loaded");
-        let write_buffer = WriteBufferConfig::new(&rules).context(CreateWriteBuffer {
-            config: rules.write_buffer_connection.clone(),
-        })?;
+        let write_buffer = WriteBufferConfig::new(handle.server_id(), &rules).context(
+            CreateWriteBuffer {
+                config: rules.write_buffer_connection.clone(),
+            },
+        )?;
 
         handle
             .advance_replay(preserved_catalog, catalog, write_buffer)
diff --git a/server/src/lib.rs b/server/src/lib.rs
index 8ef2c12c06..0e8ea1da0c 100644
--- a/server/src/lib.rs
+++ b/server/src/lib.rs
@@ -552,12 +552,13 @@ where
             .map_err(|e| Box::new(e) as _)
             .context(CannotCreatePreservedCatalog)?;
 
-        let write_buffer = write_buffer::WriteBufferConfig::new(&rules).map_err(|e| {
-            Error::CreatingWriteBuffer {
-                config: rules.write_buffer_connection.clone(),
-                source: e,
-            }
-        })?;
+        let write_buffer =
+            write_buffer::WriteBufferConfig::new(server_id, &rules).map_err(|e| {
+                Error::CreatingWriteBuffer {
+                    config: rules.write_buffer_connection.clone(),
+                    source: e,
+                }
+            })?;
         db_reservation.advance_replay(preserved_catalog, catalog, write_buffer)?;
 
         // no actual replay required
diff --git a/server/src/write_buffer.rs b/server/src/write_buffer.rs
index 1cc90ae8a3..2cda968130 100644
--- a/server/src/write_buffer.rs
+++ b/server/src/write_buffer.rs
@@ -1,5 +1,8 @@
 use async_trait::async_trait;
-use data_types::database_rules::{DatabaseRules, WriteBufferConnection};
+use data_types::{
+    database_rules::{DatabaseRules, WriteBufferConnection},
+    server_id::ServerId,
+};
 use entry::{Entry, Sequence, SequencedEntry};
 use futures::{stream::BoxStream, StreamExt};
 use rdkafka::{
@@ -22,7 +25,10 @@ pub enum WriteBufferConfig {
 }
 
 impl WriteBufferConfig {
-    pub fn new(rules: &DatabaseRules) -> Result<Option<Self>, WriteBufferError> {
+    pub fn new(
+        server_id: ServerId,
+        rules: &DatabaseRules,
+    ) -> Result<Option<Self>, WriteBufferError> {
         let name = rules.db_name();
 
         // Right now, the Kafka producer and consumers are the only production implementations of the
@@ -36,7 +42,7 @@ impl WriteBufferConfig {
                 Ok(Some(Self::Writing(Arc::new(kafka_buffer) as _)))
             }
             Some(WriteBufferConnection::Reading(conn)) => {
-                let kafka_buffer = KafkaBufferConsumer::new(conn, name)?;
+                let kafka_buffer = KafkaBufferConsumer::new(conn, server_id, name)?;
 
                 Ok(Some(Self::Reading(Arc::new(kafka_buffer) as _)))
             }
@@ -174,6 +180,7 @@ impl WriteBufferReading for KafkaBufferConsumer {
 impl KafkaBufferConsumer {
     pub fn new(
         conn: impl Into<String>,
+        server_id: ServerId,
         database_name: impl Into<String>,
     ) -> Result<Self, WriteBufferError> {
         let conn = conn.into();
@@ -183,7 +190,9 @@ impl KafkaBufferConsumer {
         cfg.set("bootstrap.servers", &conn);
         cfg.set("session.timeout.ms", "6000");
         cfg.set("enable.auto.commit", "false");
-        cfg.set("group.id", "placeholder");
+        // Create a unique group ID for this server+database's consumer so no other
+        // consumer joins its group and Kafka never triggers a consumer rebalance.
+        cfg.set("group.id", &format!("{}-{}", server_id, database_name));
         let consumer: StreamConsumer = cfg.create()?;
 
         let mut topics = TopicPartitionList::new();
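
Note on the group ID scheme: the rebalancing behavior this patch avoids can be
demonstrated with rdkafka alone. The sketch below is not part of the patch;
unique_group_consumer, its brokers parameter, and the plain u32 server ID are
illustrative stand-ins for IOx's KafkaBufferConsumer::new and its ServerId
newtype. Because each server+database pair gets a group ID nothing else shares,
the Kafka group coordinator never has a second group member to rebalance
partitions onto, no matter how many partitions the topic has.

use rdkafka::{
    config::ClientConfig,
    consumer::{Consumer, StreamConsumer},
    error::KafkaResult,
    TopicPartitionList,
};

/// Build a consumer whose group ID is unique to one server+database pair.
/// Because nothing else ever joins this group, Kafka's group coordinator
/// never rebalances the topic's partitions away from this consumer.
fn unique_group_consumer(
    brokers: &str,       // hypothetical; the patch takes this from the connection
    server_id: u32,      // stand-in for IOx's ServerId newtype
    database_name: &str, // in IOx the Kafka topic is named after the database
) -> KafkaResult<StreamConsumer> {
    let mut cfg = ClientConfig::new();
    cfg.set("bootstrap.servers", brokers);
    cfg.set("session.timeout.ms", "6000");
    cfg.set("enable.auto.commit", "false");
    // One group per server+database: servers 1 and 2 reading the same database
    // get "1-mydb" and "2-mydb", so their groups can never collide.
    cfg.set("group.id", &format!("{}-{}", server_id, database_name));

    let consumer: StreamConsumer = cfg.create()?;

    // Assign partition 0 explicitly instead of subscribing, mirroring the
    // patch's use of TopicPartitionList; explicit assignment sidesteps
    // group-managed partition distribution entirely.
    let mut topics = TopicPartitionList::new();
    topics.add_partition(database_name, 0);
    consumer.assign(&topics)?;

    Ok(consumer)
}

With a shared group ID (the old "placeholder" value), two servers reading the
same topic would join one consumer group and Kafka would split the partitions
between them; with the per-server+database ID, each server keeps every
partition it assigns itself.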