diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 2566f2e376..97e22ac8e9 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -164,6 +164,7 @@ pub mod test_utils { test_sequencer_auto_creation(&adapter).await; test_sequencer_ids(&adapter).await; test_span_context(&adapter).await; + test_unknown_sequencer_write(&adapter).await; } /// Test IO with a single writer and single reader stream. @@ -696,6 +697,26 @@ pub mod test_utils { assert_span_context_eq(actual_context_2, &span_context_2); } + /// Test that writing to an unknown sequencer produces an error + async fn test_unknown_sequencer_write(adapter: &T) + where + T: TestAdapter, + { + let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await; + + let entry = lp_to_entry("upc user=1 100"); + + let writer = context.writing(true).await.unwrap(); + + // flip bits to get an unknown sequencer + let sequencer_id = !set_pop_first(&mut writer.sequencer_ids()).unwrap(); + + writer + .store_entry(&entry, sequencer_id, None) + .await + .unwrap_err(); + } + /// Assert that the content of the reader is as expected. /// /// This will read `expected.len()` from the reader and then ensures that the stream is pending. diff --git a/write_buffer/src/mock.rs b/write_buffer/src/mock.rs index ec3d999a0e..ce636904cd 100644 --- a/write_buffer/src/mock.rs +++ b/write_buffer/src/mock.rs @@ -241,7 +241,11 @@ impl WriteBufferWriting for MockBufferForWriting { ) -> Result<(Sequence, Time), WriteBufferError> { let mut guard = self.state.entries.lock(); let entries = guard.as_mut().unwrap(); - let sequencer_entries = entries.get_mut(&sequencer_id).unwrap(); + let sequencer_entries = entries + .get_mut(&sequencer_id) + .ok_or_else::(|| { + format!("Unknown sequencer: {}", sequencer_id).into() + })?; let sequence_number = sequencer_entries.max_seqno.map(|n| n + 1).unwrap_or(0);