Merge pull request #2460 from influxdata/crepererum/issue2455b

feat: forward connection config to Kafka write buffer
pull/24376/head
kodiakhq[bot] 2021-09-03 07:02:39 +00:00 committed by GitHub
commit bba040a1cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 29 additions and 9 deletions

View File

@ -705,7 +705,8 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
.unwrap();
// ingest data as mixed throughput
let producer = KafkaBufferProducer::new(kafka_connection, db_name).unwrap();
let producer =
KafkaBufferProducer::new(kafka_connection, db_name, &Default::default()).unwrap();
producer
.store_entry(
&lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template)

View File

@ -109,7 +109,8 @@ impl WriteBufferConfigFactory {
let writer = match &cfg.type_[..] {
"kafka" => {
let kafka_buffer = KafkaBufferProducer::new(&cfg.connection, db_name)?;
let kafka_buffer =
KafkaBufferProducer::new(&cfg.connection, db_name, &cfg.connection_config)?;
Arc::new(kafka_buffer) as _
}
"mock" => match self.get_mock(&cfg.connection)? {
@ -140,8 +141,13 @@ impl WriteBufferConfigFactory {
let reader = match &cfg.type_[..] {
"kafka" => {
let kafka_buffer =
KafkaBufferConsumer::new(&cfg.connection, server_id, db_name).await?;
let kafka_buffer = KafkaBufferConsumer::new(
&cfg.connection,
server_id,
db_name,
&cfg.connection_config,
)
.await?;
Box::new(kafka_buffer) as _
}
"mock" => match self.get_mock(&cfg.connection)? {

View File

@ -1,5 +1,5 @@
use std::{
collections::BTreeMap,
collections::{BTreeMap, HashMap},
convert::{TryFrom, TryInto},
sync::Arc,
time::Duration,
@ -90,6 +90,7 @@ impl KafkaBufferProducer {
pub fn new(
conn: impl Into<String>,
database_name: impl Into<String>,
connection_config: &HashMap<String, String>,
) -> Result<Self, KafkaError> {
let conn = conn.into();
let database_name = database_name.into();
@ -102,6 +103,9 @@ impl KafkaBufferProducer {
cfg.set("request.required.acks", "all"); // equivalent to acks=-1
cfg.set("compression.type", "snappy");
cfg.set("statistics.interval.ms", "15000");
for (k, v) in connection_config {
cfg.set(k, v);
}
let producer: FutureProducer = cfg.create()?;
@ -246,6 +250,7 @@ impl KafkaBufferConsumer {
conn: impl Into<String> + Send + Sync,
server_id: ServerId,
database_name: impl Into<String> + Send + Sync,
connection_config: &HashMap<String, String>,
) -> Result<Self, KafkaError> {
let conn = conn.into();
let database_name = database_name.into();
@ -256,6 +261,9 @@ impl KafkaBufferConsumer {
cfg.set("enable.auto.commit", "false");
cfg.set("statistics.interval.ms", "15000");
cfg.set("queued.max.messages.kbytes", "10000");
for (k, v) in connection_config {
cfg.set(k, v);
}
// Create a unique group ID for this database's consumer as we don't want to create
// consumer groups.
@ -484,15 +492,20 @@ mod tests {
type Reading = KafkaBufferConsumer;
fn writing(&self) -> Self::Writing {
KafkaBufferProducer::new(&self.conn, &self.database_name).unwrap()
KafkaBufferProducer::new(&self.conn, &self.database_name, &Default::default()).unwrap()
}
async fn reading(&self) -> Self::Reading {
let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst);
let server_id = ServerId::try_from(server_id).unwrap();
KafkaBufferConsumer::new(&self.conn, server_id, &self.database_name)
.await
.unwrap()
KafkaBufferConsumer::new(
&self.conn,
server_id,
&self.database_name,
&Default::default(),
)
.await
.unwrap()
}
}