test: prepare write buffer test suite for failable reader and writer creation

This is required to work w/ sequencer auto-creation.
pull/24376/head
Marco Neumann 2021-09-03 10:42:18 +02:00
parent f1864813f4
commit 82f9750ba7
3 changed files with 30 additions and 28 deletions

View File

@ -82,7 +82,7 @@ pub mod test_utils {
use entry::{test_helpers::lp_to_entry, Entry};
use futures::{StreamExt, TryStreamExt};
use super::{WriteBufferReading, WriteBufferWriting};
use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
/// Adapter to make a concrete write buffer implementation work w/ [`perform_generic_tests`].
#[async_trait]
@ -109,10 +109,10 @@ pub mod test_utils {
type Reading: WriteBufferReading;
/// Create new writer.
fn writing(&self) -> Self::Writing;
async fn writing(&self) -> Result<Self::Writing, WriteBufferError>;
/// Create new reader.
async fn reading(&self) -> Self::Reading;
async fn reading(&self) -> Result<Self::Reading, WriteBufferError>;
}
/// Generic test suite that must be passed by all proper write buffer implementations.
@ -150,8 +150,8 @@ pub mod test_utils {
let entry_2 = lp_to_entry("upc user=2 200");
let entry_3 = lp_to_entry("upc user=3 300");
let writer = context.writing();
let mut reader = context.reading().await;
let writer = context.writing().await.unwrap();
let mut reader = context.reading().await.unwrap();
let mut streams = reader.streams();
assert_eq!(streams.len(), 1);
@ -203,8 +203,8 @@ pub mod test_utils {
let entry_2 = lp_to_entry("upc user=2 200");
let entry_3 = lp_to_entry("upc user=3 300");
let writer = context.writing();
let mut reader = context.reading().await;
let writer = context.writing().await.unwrap();
let mut reader = context.reading().await.unwrap();
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
@ -266,8 +266,8 @@ pub mod test_utils {
let entry_2 = lp_to_entry("upc user=2 200");
let entry_3 = lp_to_entry("upc user=3 300");
let writer = context.writing();
let mut reader = context.reading().await;
let writer = context.writing().await.unwrap();
let mut reader = context.reading().await.unwrap();
let mut streams = reader.streams();
assert_eq!(streams.len(), 2);
@ -325,10 +325,10 @@ pub mod test_utils {
let entry_east_2 = lp_to_entry("upc,region=east user=2 200");
let entry_west_1 = lp_to_entry("upc,region=west user=1 200");
let writer_1 = context.writing();
let writer_2 = context.writing();
let mut reader_1 = context.reading().await;
let mut reader_2 = context.reading().await;
let writer_1 = context.writing().await.unwrap();
let writer_2 = context.writing().await.unwrap();
let mut reader_1 = context.reading().await.unwrap();
let mut reader_2 = context.reading().await.unwrap();
// TODO: do not hard-code sequencer IDs here but provide a proper interface
writer_1.store_entry(&entry_east_1, 0).await.unwrap();
@ -368,13 +368,13 @@ pub mod test_utils {
let entry_east_3 = lp_to_entry("upc,region=east user=3 300");
let entry_west_1 = lp_to_entry("upc,region=west user=1 200");
let writer = context.writing();
let writer = context.writing().await.unwrap();
let _sequence_number_east_1 = writer.store_entry(&entry_east_1, 0).await.unwrap().0.number;
let sequence_number_east_2 = writer.store_entry(&entry_east_2, 0).await.unwrap().0.number;
let _sequence_number_west_1 = writer.store_entry(&entry_west_1, 1).await.unwrap().0.number;
let mut reader_1 = context.reading().await;
let mut reader_2 = context.reading().await;
let mut reader_1 = context.reading().await.unwrap();
let mut reader_2 = context.reading().await.unwrap();
// forward seek
reader_1.seek(0, sequence_number_east_2).await.unwrap();
@ -429,8 +429,8 @@ pub mod test_utils {
let entry_east_2 = lp_to_entry("upc,region=east user=2 200");
let entry_west_1 = lp_to_entry("upc,region=west user=1 200");
let writer = context.writing();
let mut reader = context.reading().await;
let writer = context.writing().await.unwrap();
let mut reader = context.reading().await.unwrap();
let mut streams = reader.streams();
assert_eq!(streams.len(), 2);
@ -471,8 +471,8 @@ pub mod test_utils {
let entry = lp_to_entry("upc user=1 100");
let writer = context.writing();
let mut reader = context.reading().await;
let writer = context.writing().await.unwrap();
let mut reader = context.reading().await.unwrap();
let mut streams = reader.streams();
assert_eq!(streams.len(), 1);

View File

@ -494,13 +494,15 @@ mod tests {
type Reading = KafkaBufferConsumer;
fn writing(&self) -> Self::Writing {
KafkaBufferProducer::new(&self.conn, &self.database_name, &Default::default()).unwrap()
async fn writing(&self) -> Result<Self::Writing, WriteBufferError> {
KafkaBufferProducer::new(&self.conn, &self.database_name, &Default::default())
.map_err(|e| e.into())
}
async fn reading(&self) -> Self::Reading {
async fn reading(&self) -> Result<Self::Reading, WriteBufferError> {
let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst);
let server_id = ServerId::try_from(server_id).unwrap();
KafkaBufferConsumer::new(
&self.conn,
server_id,
@ -508,7 +510,7 @@ mod tests {
&Default::default(),
)
.await
.unwrap()
.map_err(|e| e.into())
}
}

View File

@ -407,12 +407,12 @@ mod tests {
type Reading = MockBufferForReading;
fn writing(&self) -> Self::Writing {
MockBufferForWriting::new(self.state.clone())
async fn writing(&self) -> Result<Self::Writing, WriteBufferError> {
Ok(MockBufferForWriting::new(self.state.clone()))
}
async fn reading(&self) -> Self::Reading {
MockBufferForReading::new(self.state.clone())
async fn reading(&self) -> Result<Self::Reading, WriteBufferError> {
Ok(MockBufferForReading::new(self.state.clone()))
}
}