test: Add a test of reset_to_earliest for all write buffer implementations

This is the basic test case; I've filed #4651 for the more complex test
needing deletion of records from the write buffer.
pull/24376/head
Carol (Nichols || Goulding) 2022-05-20 13:21:50 -04:00
parent bcbf7b4f46
commit e5e08e5b16
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed files with 46 additions and 0 deletions

View File

@ -350,6 +350,7 @@ pub mod test_utils {
test_multi_sequencer_io(&adapter).await;
test_multi_writer_multi_reader(&adapter).await;
test_seek(&adapter).await;
test_reset_to_earliest(&adapter).await;
test_watermark(&adapter).await;
test_timestamp(&adapter).await;
test_sequencer_auto_creation(&adapter).await;
@ -668,6 +669,51 @@ pub mod test_utils {
assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2, &w_east_3]).await;
}
/// Test reset to earliest implemention of readers.
///
/// This tests that:
///
/// - Calling the function jumps to the earliest available sequence number if the earliest
/// available sequence number is earlier than the current sequence number
/// - Calling the function jumps to the earliest available sequence number if the earliest
/// available sequence number is later than the current sequence number
async fn test_reset_to_earliest<T>(adapter: &T)
where
T: TestAdapter,
{
let context = adapter.new_context(NonZeroU32::try_from(2).unwrap()).await;
let entry_east_1 = "upc,region=east user=1 100";
let entry_east_2 = "upc,region=east user=2 200";
let writer = context.writing(true).await.unwrap();
let mut sequencer_ids = writer.sequencer_ids();
let sequencer_id_1 = set_pop_first(&mut sequencer_ids).unwrap();
let w_east_1 = write("namespace", &writer, entry_east_1, sequencer_id_1, None).await;
let w_east_2 = write("namespace", &writer, entry_east_2, sequencer_id_1, None).await;
let reader_1 = context.reading(true).await.unwrap();
let mut handler_1_1_a = reader_1.stream_handler(sequencer_id_1).await.unwrap();
// forward seek
handler_1_1_a
.seek(w_east_2.meta().sequence().unwrap().sequence_number)
.await
.unwrap();
assert_reader_content(&mut handler_1_1_a, &[&w_east_2]).await;
// reset to earliest goes back to 0; stream re-fetches earliest record
handler_1_1_a.reset_to_earliest();
assert_reader_content(&mut handler_1_1_a, &[&w_east_1, &w_east_2]).await;
// TODO: https://github.com/influxdata/influxdb_iox/issues/4651
// Remove first write operation to simulate retention policies evicting some records
// reset to earliest goes to whatever's available
}
/// Test watermark fetching.
///
/// This tests that: