Merge pull request #2850 from influxdata/crepererum/write_buffer_unknown_sequencer
fix: do not panic when writing to an unknown sequencerpull/24376/head
commit
5e09061ac8
|
@ -164,6 +164,7 @@ pub mod test_utils {
|
||||||
test_sequencer_auto_creation(&adapter).await;
|
test_sequencer_auto_creation(&adapter).await;
|
||||||
test_sequencer_ids(&adapter).await;
|
test_sequencer_ids(&adapter).await;
|
||||||
test_span_context(&adapter).await;
|
test_span_context(&adapter).await;
|
||||||
|
test_unknown_sequencer_write(&adapter).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Test IO with a single writer and single reader stream.
|
/// Test IO with a single writer and single reader stream.
|
||||||
|
@ -696,6 +697,26 @@ pub mod test_utils {
|
||||||
assert_span_context_eq(actual_context_2, &span_context_2);
|
assert_span_context_eq(actual_context_2, &span_context_2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Test that writing to an unknown sequencer produces an error
|
||||||
|
async fn test_unknown_sequencer_write<T>(adapter: &T)
|
||||||
|
where
|
||||||
|
T: TestAdapter,
|
||||||
|
{
|
||||||
|
let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await;
|
||||||
|
|
||||||
|
let entry = lp_to_entry("upc user=1 100");
|
||||||
|
|
||||||
|
let writer = context.writing(true).await.unwrap();
|
||||||
|
|
||||||
|
// flip bits to get an unknown sequencer
|
||||||
|
let sequencer_id = !set_pop_first(&mut writer.sequencer_ids()).unwrap();
|
||||||
|
|
||||||
|
writer
|
||||||
|
.store_entry(&entry, sequencer_id, None)
|
||||||
|
.await
|
||||||
|
.unwrap_err();
|
||||||
|
}
|
||||||
|
|
||||||
/// Assert that the content of the reader is as expected.
|
/// Assert that the content of the reader is as expected.
|
||||||
///
|
///
|
||||||
/// This will read `expected.len()` from the reader and then ensures that the stream is pending.
|
/// This will read `expected.len()` from the reader and then ensures that the stream is pending.
|
||||||
|
|
|
@ -241,7 +241,11 @@ impl WriteBufferWriting for MockBufferForWriting {
|
||||||
) -> Result<(Sequence, Time), WriteBufferError> {
|
) -> Result<(Sequence, Time), WriteBufferError> {
|
||||||
let mut guard = self.state.entries.lock();
|
let mut guard = self.state.entries.lock();
|
||||||
let entries = guard.as_mut().unwrap();
|
let entries = guard.as_mut().unwrap();
|
||||||
let sequencer_entries = entries.get_mut(&sequencer_id).unwrap();
|
let sequencer_entries = entries
|
||||||
|
.get_mut(&sequencer_id)
|
||||||
|
.ok_or_else::<WriteBufferError, _>(|| {
|
||||||
|
format!("Unknown sequencer: {}", sequencer_id).into()
|
||||||
|
})?;
|
||||||
|
|
||||||
let sequence_number = sequencer_entries.max_seqno.map(|n| n + 1).unwrap_or(0);
|
let sequence_number = sequencer_entries.max_seqno.map(|n| n + 1).unwrap_or(0);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue