refactor: Rename KafkaBuffer to KafkaBufferProducer

pull/24376/head
Carol (Nichols || Goulding) 2021-06-21 12:51:00 -04:00
parent e5de73133c
commit 63d26f6f3f
1 changed files with 9 additions and 9 deletions

View File

@ -13,13 +13,13 @@ pub type WriteBufferError = Box<dyn std::error::Error + Sync + Send>;
pub fn new(rules: &DatabaseRules) -> Result<Option<Arc<dyn WriteBufferWriting>>, WriteBufferError> { pub fn new(rules: &DatabaseRules) -> Result<Option<Arc<dyn WriteBufferWriting>>, WriteBufferError> {
let name = rules.db_name(); let name = rules.db_name();
// Right now, `KafkaBuffer` is the only production implementation of the `WriteBufferWriting` // Right now, `KafkaBufferProducer` is the only production implementation of the
// trait, so always use `KafkaBuffer` when there is a write buffer writing connection string // `WriteBufferWriting` trait, so always use `KafkaBufferProducer` when there is a write buffer
// specified. If/when there are other kinds of write buffers, additional configuration will // writing connection string specified. If/when there are other kinds of write buffers,
// be needed to determine what kind of write buffer to use here. // additional configuration will be needed to determine what kind of write buffer to use here.
match rules.write_buffer_connection.as_ref() { match rules.write_buffer_connection.as_ref() {
Some(WriteBufferConnection::Writing(conn)) => { Some(WriteBufferConnection::Writing(conn)) => {
let kafka_buffer = KafkaBuffer::new(conn, name)?; let kafka_buffer = KafkaBufferProducer::new(conn, name)?;
Ok(Some(Arc::new(kafka_buffer) as _)) Ok(Some(Arc::new(kafka_buffer) as _))
} }
@ -40,14 +40,14 @@ pub trait WriteBufferWriting: Sync + Send + std::fmt::Debug + 'static {
// async fn restore_from(&self, sequence: &Sequence) -> Result<Stream<Entry>, Err>; // async fn restore_from(&self, sequence: &Sequence) -> Result<Stream<Entry>, Err>;
} }
pub struct KafkaBuffer { pub struct KafkaBufferProducer {
conn: String, conn: String,
database_name: String, database_name: String,
producer: FutureProducer, producer: FutureProducer,
} }
// Needed because rdkafka's FutureProducer doesn't impl Debug // Needed because rdkafka's FutureProducer doesn't impl Debug
impl std::fmt::Debug for KafkaBuffer { impl std::fmt::Debug for KafkaBufferProducer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("KafkaBuffer") f.debug_struct("KafkaBuffer")
.field("conn", &self.conn) .field("conn", &self.conn)
@ -57,7 +57,7 @@ impl std::fmt::Debug for KafkaBuffer {
} }
#[async_trait] #[async_trait]
impl WriteBufferWriting for KafkaBuffer { impl WriteBufferWriting for KafkaBufferProducer {
/// Send an `Entry` to Kafka and return the partition ID as the sequencer ID and the offset /// Send an `Entry` to Kafka and return the partition ID as the sequencer ID and the offset
/// as the sequence number. /// as the sequence number.
async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError> { async fn store_entry(&self, entry: &Entry) -> Result<Sequence, WriteBufferError> {
@ -84,7 +84,7 @@ impl WriteBufferWriting for KafkaBuffer {
} }
} }
impl KafkaBuffer { impl KafkaBufferProducer {
pub fn new( pub fn new(
conn: impl Into<String>, conn: impl Into<String>,
database_name: impl Into<String>, database_name: impl Into<String>,