diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index eabaafe818..1767a644f2 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -82,7 +82,7 @@ pub mod test_utils { use entry::{test_helpers::lp_to_entry, Entry}; use futures::{StreamExt, TryStreamExt}; - use super::{WriteBufferReading, WriteBufferWriting}; + use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting}; /// Adapter to make a concrete write buffer implementation work w/ [`perform_generic_tests`]. #[async_trait] @@ -109,10 +109,10 @@ pub mod test_utils { type Reading: WriteBufferReading; /// Create new writer. - fn writing(&self) -> Self::Writing; + async fn writing(&self) -> Result; /// Create new reader. - async fn reading(&self) -> Self::Reading; + async fn reading(&self) -> Result; } /// Generic test suite that must be passed by all proper write buffer implementations. @@ -150,8 +150,8 @@ pub mod test_utils { let entry_2 = lp_to_entry("upc user=2 200"); let entry_3 = lp_to_entry("upc user=3 300"); - let writer = context.writing(); - let mut reader = context.reading().await; + let writer = context.writing().await.unwrap(); + let mut reader = context.reading().await.unwrap(); let mut streams = reader.streams(); assert_eq!(streams.len(), 1); @@ -203,8 +203,8 @@ pub mod test_utils { let entry_2 = lp_to_entry("upc user=2 200"); let entry_3 = lp_to_entry("upc user=3 300"); - let writer = context.writing(); - let mut reader = context.reading().await; + let writer = context.writing().await.unwrap(); + let mut reader = context.reading().await.unwrap(); let waker = futures::task::noop_waker(); let mut cx = futures::task::Context::from_waker(&waker); @@ -266,8 +266,8 @@ pub mod test_utils { let entry_2 = lp_to_entry("upc user=2 200"); let entry_3 = lp_to_entry("upc user=3 300"); - let writer = context.writing(); - let mut reader = context.reading().await; + let writer = context.writing().await.unwrap(); + let mut reader = context.reading().await.unwrap(); let mut streams = reader.streams(); assert_eq!(streams.len(), 2); @@ -325,10 +325,10 @@ pub mod test_utils { let entry_east_2 = lp_to_entry("upc,region=east user=2 200"); let entry_west_1 = lp_to_entry("upc,region=west user=1 200"); - let writer_1 = context.writing(); - let writer_2 = context.writing(); - let mut reader_1 = context.reading().await; - let mut reader_2 = context.reading().await; + let writer_1 = context.writing().await.unwrap(); + let writer_2 = context.writing().await.unwrap(); + let mut reader_1 = context.reading().await.unwrap(); + let mut reader_2 = context.reading().await.unwrap(); // TODO: do not hard-code sequencer IDs here but provide a proper interface writer_1.store_entry(&entry_east_1, 0).await.unwrap(); @@ -368,13 +368,13 @@ pub mod test_utils { let entry_east_3 = lp_to_entry("upc,region=east user=3 300"); let entry_west_1 = lp_to_entry("upc,region=west user=1 200"); - let writer = context.writing(); + let writer = context.writing().await.unwrap(); let _sequence_number_east_1 = writer.store_entry(&entry_east_1, 0).await.unwrap().0.number; let sequence_number_east_2 = writer.store_entry(&entry_east_2, 0).await.unwrap().0.number; let _sequence_number_west_1 = writer.store_entry(&entry_west_1, 1).await.unwrap().0.number; - let mut reader_1 = context.reading().await; - let mut reader_2 = context.reading().await; + let mut reader_1 = context.reading().await.unwrap(); + let mut reader_2 = context.reading().await.unwrap(); // forward seek reader_1.seek(0, sequence_number_east_2).await.unwrap(); @@ -429,8 +429,8 @@ pub mod test_utils { let entry_east_2 = lp_to_entry("upc,region=east user=2 200"); let entry_west_1 = lp_to_entry("upc,region=west user=1 200"); - let writer = context.writing(); - let mut reader = context.reading().await; + let writer = context.writing().await.unwrap(); + let mut reader = context.reading().await.unwrap(); let mut streams = reader.streams(); assert_eq!(streams.len(), 2); @@ -471,8 +471,8 @@ pub mod test_utils { let entry = lp_to_entry("upc user=1 100"); - let writer = context.writing(); - let mut reader = context.reading().await; + let writer = context.writing().await.unwrap(); + let mut reader = context.reading().await.unwrap(); let mut streams = reader.streams(); assert_eq!(streams.len(), 1); diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs index e49340d3b8..03ac020c97 100644 --- a/write_buffer/src/kafka.rs +++ b/write_buffer/src/kafka.rs @@ -494,13 +494,15 @@ mod tests { type Reading = KafkaBufferConsumer; - fn writing(&self) -> Self::Writing { - KafkaBufferProducer::new(&self.conn, &self.database_name, &Default::default()).unwrap() + async fn writing(&self) -> Result { + KafkaBufferProducer::new(&self.conn, &self.database_name, &Default::default()) + .map_err(|e| e.into()) } - async fn reading(&self) -> Self::Reading { + async fn reading(&self) -> Result { let server_id = self.server_id_counter.fetch_add(1, Ordering::SeqCst); let server_id = ServerId::try_from(server_id).unwrap(); + KafkaBufferConsumer::new( &self.conn, server_id, @@ -508,7 +510,7 @@ mod tests { &Default::default(), ) .await - .unwrap() + .map_err(|e| e.into()) } } diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index 36953bc6f4..a218a91168 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -407,12 +407,12 @@ mod tests { type Reading = MockBufferForReading; - fn writing(&self) -> Self::Writing { - MockBufferForWriting::new(self.state.clone()) + async fn writing(&self) -> Result { + Ok(MockBufferForWriting::new(self.state.clone())) } - async fn reading(&self) -> Self::Reading { - MockBufferForReading::new(self.state.clone()) + async fn reading(&self) -> Result { + Ok(MockBufferForReading::new(self.state.clone())) } }