Merge branch 'main' into er/refactor/read_buffer_clippy

pull/24376/head
Edd Robinson 2022-02-01 16:25:12 +00:00 committed by GitHub
commit 37ad778603
5 changed files with 83 additions and 5 deletions

View File

@@ -25,7 +25,12 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
    ///
    /// The [`dml::DmlMeta`] will be propagated where applicable
    ///
    /// Returns the metadata that was written
    /// This call may "async block" (i.e. stay in a pending state) while it accumulates multiple operations into a
    /// single batch. Once this method returns, the operation has actually been written (i.e. it is NOT buffered any
    /// longer). You may use [`flush`](Self::flush) to trigger an early submission (e.g. before some linger time has
    /// expired), which can be helpful for a controlled shutdown.
    ///
    /// Returns the metadata that was written.
    async fn store_operation(
        &self,
        sequencer_id: u32,
@@ -47,6 +52,13 @@ pub trait WriteBufferWriting: Sync + Send + Debug + 'static {
        .await
    }

    /// Flush all currently blocking store operations ([`store_operation`](Self::store_operation) /
    /// [`store_lp`](Self::store_lp)).
    ///
    /// This call is pending while outstanding data is being submitted and will return AFTER the flush has
    /// completed. However, you still need to poll the store operations to get the metadata for every write.
    async fn flush(&self);

    /// Return the type (like `"mock"` or `"kafka"`) of this writer.
    fn type_name(&self) -> &'static str;
}
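
Taken together, the two methods let a caller start several `store_operation` futures and then force any buffered batch out early with `flush` instead of waiting for the linger time. The following is a minimal sketch of that pattern and is not part of this diff: the helper name `write_then_flush` and its inputs are made up for illustration, `WriteBufferWriting` is assumed to be in scope, and `store_operation` (abridged in the hunk above) is assumed to take a `&DmlOperation` and return the written metadata.

use std::sync::Arc;

use dml::DmlOperation;
use futures::{stream::FuturesUnordered, StreamExt};

// Hypothetical helper illustrating the contract above: start a batch of
// writes, then flush so the batch is submitted without waiting for the
// linger time (e.g. during a controlled shutdown).
async fn write_then_flush(
    writer: Arc<dyn WriteBufferWriting>,
    sequencer_id: u32,
    operations: Vec<DmlOperation>,
) {
    // Drive the store operations on a separate task; each future may stay
    // pending while the implementation accumulates operations into a batch.
    let writer_task = Arc::clone(&writer);
    let writes = tokio::spawn(async move {
        let mut pending: FuturesUnordered<_> = operations
            .into_iter()
            .map(|op| {
                let writer = Arc::clone(&writer_task);
                async move {
                    // Every store operation still has to be polled to
                    // completion to obtain the metadata for that write.
                    writer
                        .store_operation(sequencer_id, &op)
                        .await
                        .expect("write failed");
                }
            })
            .collect();

        while pending.next().await.is_some() {}
    });

    // Submit whatever is currently buffered instead of waiting for the
    // linger time to expire.
    writer.flush().await;

    writes.await.expect("write task panicked");
}
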
@@ -102,7 +114,7 @@ pub mod test_utils {
    use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
    use async_trait::async_trait;
    use dml::{test_util::assert_write_op_eq, DmlMeta, DmlOperation, DmlWrite};
    use futures::{StreamExt, TryStreamExt};
    use futures::{stream::FuturesUnordered, StreamExt, TryStreamExt};
    use std::{
        collections::{BTreeMap, BTreeSet},
        convert::TryFrom,
@@ -184,6 +196,7 @@ pub mod test_utils {
        test_span_context(&adapter).await;
        test_unknown_sequencer_write(&adapter).await;
        test_multi_namespaces(&adapter).await;
        test_flush(&adapter).await;
    }

    /// Writes line protocol and returns the [`DmlWrite`] that was written
@@ -718,6 +731,43 @@ pub mod test_utils {
        assert_reader_content(&mut reader, &[(sequencer_id, &[&w1, &w2])]).await;
    }

    /// Dummy test to ensure that flushing somewhat works.
    async fn test_flush<T>(adapter: &T)
    where
        T: TestAdapter,
    {
        let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await;
        let writer = Arc::new(context.writing(true).await.unwrap());

        let mut sequencer_ids = writer.sequencer_ids();
        assert_eq!(sequencer_ids.len(), 1);
        let sequencer_id = set_pop_first(&mut sequencer_ids).unwrap();

        let mut write_tasks: FuturesUnordered<_> = (0..20)
            .map(|i| {
                let writer = Arc::clone(&writer);

                async move {
                    let entry = format!("upc,region=east user={} {}", i, i);

                    write("ns", writer.as_ref(), &entry, sequencer_id, None).await;
                }
            })
            .collect();

        let write_tasks = tokio::spawn(async move { while write_tasks.next().await.is_some() {} });

        tokio::time::sleep(Duration::from_millis(1)).await;

        writer.flush().await;

        tokio::time::timeout(Duration::from_millis(1_000), write_tasks)
            .await
            .unwrap()
            .unwrap();
    }
    /// Assert that the content of the reader is as expected.
    ///
    /// This will read `expected.len()` operations from the reader and then ensure that the stream is pending.

View File

@@ -251,6 +251,10 @@ impl WriteBufferWriting for FileBufferProducer {
        ))
    }

    async fn flush(&self) {
        // no buffer
    }

    fn type_name(&self) -> &'static str {
        "file"
    }
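
Since the trait contract requires that an operation is fully written by the time `store_operation` returns, implementations without internal batching, such as this file-backed producer and the mock producers further down, can implement `flush` as a no-op.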

View File

@@ -7,7 +7,7 @@ use rdkafka::{
    consumer::{BaseConsumer, Consumer, ConsumerContext, StreamConsumer},
    error::KafkaError,
    message::{Headers, OwnedHeaders},
    producer::{FutureProducer, FutureRecord},
    producer::{FutureProducer, FutureRecord, Producer},
    types::RDKafkaErrorCode,
    util::Timeout,
    ClientConfig, ClientContext, Message, Offset, TopicPartitionList,
@@ -61,7 +61,7 @@ pub struct KafkaBufferProducer {
    conn: String,
    database_name: String,
    time_provider: Arc<dyn TimeProvider>,
    producer: FutureProducer<ClientContextImpl>,
    producer: Arc<FutureProducer<ClientContextImpl>>,
    partitions: BTreeSet<u32>,
}
@@ -138,6 +138,16 @@ impl WriteBufferWriting for KafkaBufferProducer {
        ))
    }

    async fn flush(&self) {
        let producer = Arc::clone(&self.producer);

        tokio::task::spawn_blocking(move || {
            producer.flush(Timeout::Never);
        })
        .await
        .expect("subtask failed");
    }

    fn type_name(&self) -> &'static str {
        "kafka"
    }
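
rdkafka's `flush` is a synchronous call that blocks the calling thread until outstanding messages have been delivered (here with `Timeout::Never`, i.e. indefinitely), so it is moved onto Tokio's blocking thread pool via `tokio::task::spawn_blocking` rather than being run on the async executor; wrapping the producer in an `Arc` (see the constructor change below) is what allows it to be moved into that closure.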
@@ -186,7 +196,7 @@ impl KafkaBufferProducer {
            conn,
            database_name,
            time_provider,
            producer,
            producer: Arc::new(producer),
            partitions,
        })
    }

View File

@@ -305,6 +305,10 @@ impl WriteBufferWriting for MockBufferForWriting {
        Ok(meta)
    }

    async fn flush(&self) {
        // no buffer
    }

    fn type_name(&self) -> &'static str {
        "mock"
    }
@@ -331,6 +335,10 @@ impl WriteBufferWriting for MockBufferForWritingThatAlwaysErrors {
        .into())
    }

    async fn flush(&self) {
        // no buffer
    }

    fn type_name(&self) -> &'static str {
        "mock_failing"
    }

View File

@@ -92,6 +92,12 @@ impl WriteBufferWriting for RSKafkaProducer {
        Ok(producer.produce(operation.clone()).await?)
    }

    async fn flush(&self) {
        for producer in self.producers.values() {
            producer.flush().await;
        }
    }

    fn type_name(&self) -> &'static str {
        "rskafka"
    }