Merge branch 'main' into crepererum/write_buffer_optional_span_ctx

pull/24376/head
kodiakhq[bot] 2021-10-15 07:25:21 +00:00 committed by GitHub
commit 45c2c26168
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 1 deletions

View File

@ -164,6 +164,7 @@ pub mod test_utils {
test_sequencer_auto_creation(&adapter).await;
test_sequencer_ids(&adapter).await;
test_span_context(&adapter).await;
test_unknown_sequencer_write(&adapter).await;
}
/// Test IO with a single writer and single reader stream.
@ -696,6 +697,26 @@ pub mod test_utils {
assert_span_context_eq(actual_context_2, &span_context_2);
}
/// Test that writing to an unknown sequencer produces an error
async fn test_unknown_sequencer_write<T>(adapter: &T)
where
T: TestAdapter,
{
let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await;
let entry = lp_to_entry("upc user=1 100");
let writer = context.writing(true).await.unwrap();
// flip bits to get an unknown sequencer
let sequencer_id = !set_pop_first(&mut writer.sequencer_ids()).unwrap();
writer
.store_entry(&entry, sequencer_id, None)
.await
.unwrap_err();
}
/// Assert that the content of the reader is as expected.
///
/// This will read `expected.len()` from the reader and then ensures that the stream is pending.

View File

@ -241,7 +241,11 @@ impl WriteBufferWriting for MockBufferForWriting {
) -> Result<(Sequence, Time), WriteBufferError> {
let mut guard = self.state.entries.lock();
let entries = guard.as_mut().unwrap();
let sequencer_entries = entries.get_mut(&sequencer_id).unwrap();
let sequencer_entries = entries
.get_mut(&sequencer_id)
.ok_or_else::<WriteBufferError, _>(|| {
format!("Unknown sequencer: {}", sequencer_id).into()
})?;
let sequence_number = sequencer_entries.max_seqno.map(|n| n + 1).unwrap_or(0);