diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 06714a0504..c34b42a609 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -454,6 +454,7 @@ pub mod test_utils { where R: WriteBufferReading, { + // Ensure content of the streams let mut streams = reader.streams(); assert_eq!(streams.len(), expected.len()); streams.sort_by_key(|(sequencer_id, _stream)| *sequencer_id); @@ -464,19 +465,34 @@ pub mod test_utils { assert_eq!(actual_sequencer_id, *expected_sequencer_id); // we need to limit the stream to `expected.len()` elements, otherwise it might be pending forever - let mut results: Vec<_> = actual_stream + let results: Vec<_> = actual_stream .stream .take(expected_entries.len()) .try_collect() .await .unwrap(); - results.sort_by_key(|entry| { - let sequence = entry.sequence().unwrap(); - (sequence.id, sequence.number) - }); let actual_entries: Vec<_> = results.iter().map(|entry| entry.entry()).collect(); assert_eq!(&&actual_entries[..], expected_entries); } + + // Ensure that streams a pending + let mut streams = reader.streams(); + assert_eq!(streams.len(), expected.len()); + streams.sort_by_key(|(sequencer_id, _stream)| *sequencer_id); + + let waker = futures::task::noop_waker(); + let mut cx = futures::task::Context::from_waker(&waker); + + for ( + (actual_sequencer_id, mut actual_stream), + (expected_sequencer_id, _expected_entries), + ) in streams.into_iter().zip(expected.iter()) + { + assert_eq!(actual_sequencer_id, *expected_sequencer_id); + + // empty stream is pending + assert!(actual_stream.stream.poll_next_unpin(&mut cx).is_pending()); + } } /// Return largest "milliseconds only" timestamp less than or equal to the given timestamp.