From 36a7d9b8f3b63809bba0bc05a66ac366ca0565f2 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 1 Feb 2022 15:16:23 +0000 Subject: [PATCH] feat: flush interface for write buffer producers (#3595) Closes #3504. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- write_buffer/src/core.rs | 54 +++++++++++++++++++++++++++++++-- write_buffer/src/file.rs | 4 +++ write_buffer/src/kafka.rs | 16 ++++++++-- write_buffer/src/mock.rs | 8 +++++ write_buffer/src/rskafka/mod.rs | 6 ++++ 5 files changed, 83 insertions(+), 5 deletions(-) diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 5ee7d4c81d..7556f7d905 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -25,7 +25,12 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static { /// /// The [`dml::DmlMeta`] will be propagated where applicable /// - /// Returns the metadata that was written + /// This call may "async block" (i.e. be in a pending state) to accumulate multiple operations into a single batch. + /// After this method returns the operation was actually written (i.e. it is NOT buffered any longer). You may use + /// [`flush`](Self::flush) to trigger an early submission (e.g. before some linger time expired), which can be + /// helpful for controlled shutdown. + /// + /// Returns the metadata that was written. async fn store_operation( &self, sequencer_id: u32, @@ -47,6 +52,13 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static { .await } + /// Flush all currently blocking store operations ([`store_operation`](Self::store_operation) / + /// [`store_lp`](Self::store_lp)). + /// + /// This call is pending while outstanding data is being submitted and will return AFTER the flush completed. + /// However you still need to poll the store operations to get the metadata for every write. + async fn flush(&self); + /// Return type (like `"mock"` or `"kafka"`) of this writer. 
fn type_name(&self) -> &'static str; } @@ -102,7 +114,7 @@ pub mod test_utils { use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting}; use async_trait::async_trait; use dml::{test_util::assert_write_op_eq, DmlMeta, DmlOperation, DmlWrite}; - use futures::{StreamExt, TryStreamExt}; + use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt}; use std::{ collections::{BTreeMap, BTreeSet}, convert::TryFrom, @@ -184,6 +196,7 @@ pub mod test_utils { test_span_context(&adapter).await; test_unknown_sequencer_write(&adapter).await; test_multi_namespaces(&adapter).await; + test_flush(&adapter).await; } /// Writes line protocol and returns the [`DmlWrite`] that was written @@ -718,6 +731,43 @@ pub mod test_utils { assert_reader_content(&mut reader, &[(sequencer_id, &[&w1, &w2])]).await; } + /// Dummy test to ensure that flushing somewhat works. + async fn test_flush<T>(adapter: &T) + where + T: TestAdapter, + { + let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await; + + let writer = Arc::new(context.writing(true).await.unwrap()); + + let mut sequencer_ids = writer.sequencer_ids(); + assert_eq!(sequencer_ids.len(), 1); + let sequencer_id = set_pop_first(&mut sequencer_ids).unwrap(); + + let mut write_tasks: FuturesUnordered<_> = (0..20) + .map(|i| { + let writer = Arc::clone(&writer); + + async move { + let entry = format!("upc,region=east user={} {}", i, i); + + write("ns", writer.as_ref(), &entry, sequencer_id, None).await; + } + }) + .collect(); + + let write_tasks = tokio::spawn(async move { while write_tasks.next().await.is_some() {} }); + + tokio::time::sleep(Duration::from_millis(1)).await; + + writer.flush().await; + + tokio::time::timeout(Duration::from_millis(1_000), write_tasks) + .await + .unwrap() + .unwrap(); + } + /// Assert that the content of the reader is as expected. /// /// This will read `expected.len()` from the reader and then ensures that the stream is pending. 
diff --git a/write_buffer/src/file.rs b/write_buffer/src/file.rs index d572591dd9..6337137963 100644 --- a/write_buffer/src/file.rs +++ b/write_buffer/src/file.rs @@ -251,6 +251,10 @@ impl WriteBufferWriting for FileBufferProducer { )) } + async fn flush(&self) { + // no buffer + } + fn type_name(&self) -> &'static str { "file" } diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index 52e855da11..07f3dfbd84 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -7,7 +7,7 @@ use rdkafka::{ consumer::{BaseConsumer, Consumer, ConsumerContext, StreamConsumer}, error::KafkaError, message::{Headers, OwnedHeaders}, - producer::{FutureProducer, FutureRecord}, + producer::{FutureProducer, FutureRecord, Producer}, types::RDKafkaErrorCode, util::Timeout, ClientConfig, ClientContext, Message, Offset, TopicPartitionList, @@ -61,7 +61,7 @@ pub struct KafkaBufferProducer { conn: String, database_name: String, time_provider: Arc, - producer: FutureProducer, + producer: Arc>, partitions: BTreeSet, } @@ -138,6 +138,16 @@ impl WriteBufferWriting for KafkaBufferProducer { )) } + async fn flush(&self) { + let producer = Arc::clone(&self.producer); + + tokio::task::spawn_blocking(move || { + producer.flush(Timeout::Never); + }) + .await + .expect("subtask failed"); + } + fn type_name(&self) -> &'static str { "kafka" } @@ -186,7 +196,7 @@ impl KafkaBufferProducer { conn, database_name, time_provider, - producer, + producer: Arc::new(producer), partitions, }) } diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index b06148dc31..2f4b7a349e 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -305,6 +305,10 @@ impl WriteBufferWriting for MockBufferForWriting { Ok(meta) } + async fn flush(&self) { + // no buffer + } + fn type_name(&self) -> &'static str { "mock" } @@ -331,6 +335,10 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors { .into()) } + async fn flush(&self) { + // no buffer + } + fn 
type_name(&self) -> &'static str { "mock_failing" } diff --git a/write_buffer/src/rskafka/mod.rs b/write_buffer/src/rskafka/mod.rs index 6742b36edd..00b2bbda91 100644 --- a/write_buffer/src/rskafka/mod.rs +++ b/write_buffer/src/rskafka/mod.rs @@ -92,6 +92,12 @@ impl WriteBufferWriting for RSKafkaProducer { Ok(producer.produce(operation.clone()).await?) } + async fn flush(&self) { + for producer in self.producers.values() { + producer.flush().await; + } + } + fn type_name(&self) -> &'static str { "rskafka" }