test: harden `assert_reader_content` a bit

- entries should be sorted by the stream, there is no need to sort the
  results
- ensure that there are no leftover entries in the stream by asserting
  that it is "pending"
pull/24376/head
Marco Neumann 2021-09-03 10:24:51 +02:00
parent d5662328b0
commit 2ea3b600d0
1 changed files with 21 additions and 5 deletions

View File

@ -454,6 +454,7 @@ pub mod test_utils {
where
R: WriteBufferReading,
{
// Ensure content of the streams
let mut streams = reader.streams();
assert_eq!(streams.len(), expected.len());
streams.sort_by_key(|(sequencer_id, _stream)| *sequencer_id);
@ -464,19 +465,34 @@ pub mod test_utils {
assert_eq!(actual_sequencer_id, *expected_sequencer_id);
// we need to limit the stream to `expected.len()` elements, otherwise it might be pending forever
let mut results: Vec<_> = actual_stream
let results: Vec<_> = actual_stream
.stream
.take(expected_entries.len())
.try_collect()
.await
.unwrap();
results.sort_by_key(|entry| {
let sequence = entry.sequence().unwrap();
(sequence.id, sequence.number)
});
let actual_entries: Vec<_> = results.iter().map(|entry| entry.entry()).collect();
assert_eq!(&&actual_entries[..], expected_entries);
}
// Ensure that streams a pending
let mut streams = reader.streams();
assert_eq!(streams.len(), expected.len());
streams.sort_by_key(|(sequencer_id, _stream)| *sequencer_id);
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
for (
(actual_sequencer_id, mut actual_stream),
(expected_sequencer_id, _expected_entries),
) in streams.into_iter().zip(expected.iter())
{
assert_eq!(actual_sequencer_id, *expected_sequencer_id);
// empty stream is pending
assert!(actual_stream.stream.poll_next_unpin(&mut cx).is_pending());
}
}
/// Return largest "milliseconds only" timestamp less than or equal to the given timestamp.