fix: Pass the database to the KafkaBuffer to use as the topic

pull/24376/head
Carol (Nichols || Goulding) 2021-06-16 14:14:52 -04:00
parent 93881da016
commit 250b9362a6
2 changed files with 7 additions and 3 deletions

View File

@ -255,7 +255,7 @@ impl Config {
let write_buffer = rules
.write_buffer_connection_string
.as_ref()
.map(|conn| Arc::new(KafkaBuffer::new(conn)) as _);
.map(|conn| Arc::new(KafkaBuffer::new(conn, &name)) as _);
let db = Arc::new(Db::new(
rules,

View File

@ -19,6 +19,7 @@ pub trait WriteBuffer: Sync + Send + std::fmt::Debug + 'static {
#[derive(Debug)]
pub struct KafkaBuffer {
conn: String,
database_name: String,
}
#[async_trait]
@ -32,8 +33,11 @@ impl WriteBuffer for KafkaBuffer {
}
impl KafkaBuffer {
pub fn new(conn: impl Into<String>) -> Self {
Self { conn: conn.into() }
pub fn new(conn: impl Into<String>, database_name: impl Into<String>) -> Self {
Self {
conn: conn.into(),
database_name: database_name.into(),
}
}
}