refactor: improve replay test naming and add more docs
parent
43cb148566
commit
aa61eb2732
|
@ -229,7 +229,7 @@ mod tests {
|
|||
|
||||
/// Different checks for replay tests
|
||||
#[derive(Debug)]
|
||||
enum ReplayTestCheck {
|
||||
enum Check {
|
||||
/// Check that the set of partitions is as expected.
|
||||
///
|
||||
/// The partitions are by table name and partition key.
|
||||
|
@ -243,7 +243,7 @@ mod tests {
|
|||
|
||||
/// Action or check for replay test.
|
||||
#[derive(Debug)]
|
||||
enum ReplayTestAOC {
|
||||
enum ActionOrCheck {
|
||||
/// Ingest new entries into the write buffer.
|
||||
Ingest(Vec<TestSequencedEntry>),
|
||||
|
||||
|
@ -259,12 +259,12 @@ mod tests {
|
|||
Persist(Vec<(&'static str, &'static str)>),
|
||||
|
||||
/// Assert that all these checks pass.
|
||||
Assert(Vec<ReplayTestCheck>),
|
||||
Assert(Vec<Check>),
|
||||
|
||||
/// Wait that background loop generates desired state (all checks pass).
|
||||
///
|
||||
/// Background loop is started before the check and stopped afterwards.
|
||||
Await(Vec<ReplayTestCheck>),
|
||||
Await(Vec<Check>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -272,8 +272,21 @@ mod tests {
|
|||
/// Number of sequencers in this test setup.
|
||||
n_sequencers: u32,
|
||||
|
||||
/// What to do in which order
|
||||
actions_and_checks: Vec<ReplayTestAOC>,
|
||||
/// What to do in which order.
|
||||
///
|
||||
/// # Serialization
|
||||
/// The execution of the entire test is purely serial with the exception of [`Await`](ActionOrCheck::Await) (see
|
||||
/// next section). That means that nothing happens concurrently during each step. Every step is finished and
|
||||
/// checked for errors before the next is started (e.g. [`Replay`](ActionOrCheck::Replay) is fully executed and
|
||||
/// it is ensured that there were no errors before a subsequent [`Assert`](ActionOrCheck::Assert) is evaluated).
|
||||
/// The database background worker is NOT active during any non-[`Await`](ActionOrCheck::Await)
|
||||
///
|
||||
/// # Await
|
||||
/// Sometimes the background worker is needed to perform something, e.g. to consume some data from the write
|
||||
/// buffer. In that case [`Await`](ActionOrCheck::Await) can be used. During this check (and only during this
|
||||
/// check) the background worker is active and the checks passed to [`Await`](ActionOrCheck::Await) are
|
||||
/// evaluated until they succeed. The background worker is stopped before the next test step is evaluated.
|
||||
steps: Vec<ActionOrCheck>,
|
||||
}
|
||||
|
||||
impl ReplayTest {
|
||||
|
@ -301,16 +314,16 @@ mod tests {
|
|||
);
|
||||
|
||||
// ==================== do: main loop ====================
|
||||
for (step, action_or_check) in self.actions_and_checks.into_iter().enumerate() {
|
||||
for (step, action_or_check) in self.steps.into_iter().enumerate() {
|
||||
println!("===== step {} =====\n{:?}", step + 1, action_or_check);
|
||||
|
||||
match action_or_check {
|
||||
ReplayTestAOC::Ingest(entries) => {
|
||||
ActionOrCheck::Ingest(entries) => {
|
||||
for se in entries {
|
||||
write_buffer_state.push_entry(se.build(&partition_template));
|
||||
}
|
||||
}
|
||||
ReplayTestAOC::Restart => {
|
||||
ActionOrCheck::Restart => {
|
||||
// first drop old DB
|
||||
#[allow(unused_assignments)]
|
||||
{
|
||||
|
@ -329,7 +342,7 @@ mod tests {
|
|||
.await,
|
||||
);
|
||||
}
|
||||
ReplayTestAOC::Replay => {
|
||||
ActionOrCheck::Replay => {
|
||||
let test_db = test_db.as_ref().unwrap();
|
||||
|
||||
test_db
|
||||
|
@ -338,7 +351,7 @@ mod tests {
|
|||
.await
|
||||
.unwrap();
|
||||
}
|
||||
ReplayTestAOC::Persist(partitions) => {
|
||||
ActionOrCheck::Persist(partitions) => {
|
||||
let test_db = test_db.as_ref().unwrap();
|
||||
let db = &test_db.db;
|
||||
|
||||
|
@ -367,11 +380,11 @@ mod tests {
|
|||
.unwrap();
|
||||
}
|
||||
}
|
||||
ReplayTestAOC::Assert(checks) => {
|
||||
ActionOrCheck::Assert(checks) => {
|
||||
let test_db = test_db.as_ref().unwrap();
|
||||
Self::eval_checks(&checks, true, test_db).await;
|
||||
}
|
||||
ReplayTestAOC::Await(checks) => {
|
||||
ActionOrCheck::Await(checks) => {
|
||||
let test_db = test_db.as_ref().unwrap();
|
||||
let db = &test_db.db;
|
||||
|
||||
|
@ -450,18 +463,18 @@ mod tests {
|
|||
.await
|
||||
}
|
||||
|
||||
async fn eval_checks(
|
||||
checks: &[ReplayTestCheck],
|
||||
use_assert: bool,
|
||||
test_db: &TestDb,
|
||||
) -> bool {
|
||||
/// Evaluates given checks.
|
||||
///
|
||||
/// If `use_assert = true` this function will panic in case of a failure. If `use_assert = false` it will just
|
||||
/// return `false` in the error case. `true` is returned in all checks passed.
|
||||
async fn eval_checks(checks: &[Check], use_assert: bool, test_db: &TestDb) -> bool {
|
||||
let db = &test_db.db;
|
||||
|
||||
for (step, check) in checks.iter().enumerate() {
|
||||
println!("check {}: {:?}", step + 1, check);
|
||||
|
||||
let res = match check {
|
||||
ReplayTestCheck::Partitions(partitions) => {
|
||||
Check::Partitions(partitions) => {
|
||||
let partitions_actual = Self::get_partitions(db);
|
||||
let partitions_actual: Vec<(&str, &str)> = partitions_actual
|
||||
.iter()
|
||||
|
@ -475,7 +488,7 @@ mod tests {
|
|||
use_assert,
|
||||
)
|
||||
}
|
||||
ReplayTestCheck::Query(query, expected) => {
|
||||
Check::Query(query, expected) => {
|
||||
let batches = run_query(Arc::clone(&db), query).await;
|
||||
|
||||
// we are throwing away the record batches after the assert, so we don't care about interior
|
||||
|
@ -499,11 +512,23 @@ mod tests {
|
|||
true
|
||||
}
|
||||
|
||||
fn eval_assert<F>(f: F, bubble: bool) -> bool
|
||||
/// Evaluates given function that may contain an `assert`.
|
||||
///
|
||||
/// This helper allows you use the same `assert` statement no matter if you want the error case to panic or if
|
||||
/// you just want to have an boolean expression that states "did it succeed?".
|
||||
///
|
||||
/// Behavior dependson `raise`:
|
||||
///
|
||||
/// - `raise = true`: The given function `f` will be executed. If it panics, `eval_assert` will just bubble up
|
||||
/// the error (aka panic as well). If the function `f` does not panic, `true` is returned (aka "it
|
||||
/// succeeded").
|
||||
/// - `raise = false`: The given function `f` will be executed but unwinds will be caught. If `f` did panic
|
||||
/// `false` will be returned (aka "it failed"), otherwise `true` will be returned (aka "it succeeded").
|
||||
fn eval_assert<F>(f: F, raise: bool) -> bool
|
||||
where
|
||||
F: Fn() + std::panic::UnwindSafe,
|
||||
{
|
||||
if bubble {
|
||||
if raise {
|
||||
f();
|
||||
true
|
||||
} else {
|
||||
|
@ -517,11 +542,11 @@ mod tests {
|
|||
async fn replay_metatest_fail_assert_partitions() {
|
||||
ReplayTest {
|
||||
n_sequencers: 1,
|
||||
actions_and_checks: vec![
|
||||
steps: vec![
|
||||
// that passes
|
||||
ReplayTestAOC::Assert(vec![ReplayTestCheck::Partitions(vec![])]),
|
||||
ActionOrCheck::Assert(vec![Check::Partitions(vec![])]),
|
||||
// that fails
|
||||
ReplayTestAOC::Assert(vec![ReplayTestCheck::Partitions(vec![(
|
||||
ActionOrCheck::Assert(vec![Check::Partitions(vec![(
|
||||
"table_2",
|
||||
"tag_partition_by_a",
|
||||
)])]),
|
||||
|
@ -536,18 +561,18 @@ mod tests {
|
|||
async fn replay_metatest_fail_assert_query() {
|
||||
ReplayTest {
|
||||
n_sequencers: 1,
|
||||
actions_and_checks: vec![
|
||||
ReplayTestAOC::Ingest(vec![TestSequencedEntry {
|
||||
steps: vec![
|
||||
ActionOrCheck::Ingest(vec![TestSequencedEntry {
|
||||
sequencer_id: 0,
|
||||
sequence_number: 0,
|
||||
lp: "table_1,tag_partition_by=a bar=1 0",
|
||||
}]),
|
||||
ReplayTestAOC::Await(vec![ReplayTestCheck::Partitions(vec![(
|
||||
ActionOrCheck::Await(vec![Check::Partitions(vec![(
|
||||
"table_1",
|
||||
"tag_partition_by_a",
|
||||
)])]),
|
||||
// that fails
|
||||
ReplayTestAOC::Assert(vec![ReplayTestCheck::Query(
|
||||
ActionOrCheck::Assert(vec![Check::Query(
|
||||
"select * from table_1",
|
||||
vec![
|
||||
"+-----+------------------+----------------------+",
|
||||
|
@ -567,8 +592,8 @@ mod tests {
|
|||
async fn replay_ok_two_partitions_persist_second() {
|
||||
ReplayTest {
|
||||
n_sequencers: 1,
|
||||
actions_and_checks: vec![
|
||||
ReplayTestAOC::Ingest(vec![
|
||||
steps: vec![
|
||||
ActionOrCheck::Ingest(vec![
|
||||
TestSequencedEntry {
|
||||
sequencer_id: 0,
|
||||
sequence_number: 0,
|
||||
|
@ -580,17 +605,17 @@ mod tests {
|
|||
lp: "table_2,tag_partition_by=a bar=20 0",
|
||||
},
|
||||
]),
|
||||
ReplayTestAOC::Await(vec![ReplayTestCheck::Partitions(vec![
|
||||
ActionOrCheck::Await(vec![Check::Partitions(vec![
|
||||
("table_1", "tag_partition_by_a"),
|
||||
("table_2", "tag_partition_by_a"),
|
||||
])]),
|
||||
ReplayTestAOC::Persist(vec![("table_2", "tag_partition_by_a")]),
|
||||
ReplayTestAOC::Restart,
|
||||
ReplayTestAOC::Assert(vec![ReplayTestCheck::Partitions(vec![(
|
||||
ActionOrCheck::Persist(vec![("table_2", "tag_partition_by_a")]),
|
||||
ActionOrCheck::Restart,
|
||||
ActionOrCheck::Assert(vec![Check::Partitions(vec![(
|
||||
"table_2",
|
||||
"tag_partition_by_a",
|
||||
)])]),
|
||||
ReplayTestAOC::Ingest(vec![
|
||||
ActionOrCheck::Ingest(vec![
|
||||
TestSequencedEntry {
|
||||
sequencer_id: 0,
|
||||
sequence_number: 2,
|
||||
|
@ -602,13 +627,13 @@ mod tests {
|
|||
lp: "table_2,tag_partition_by=b bar=21 10",
|
||||
},
|
||||
]),
|
||||
ReplayTestAOC::Replay,
|
||||
ReplayTestAOC::Assert(vec![
|
||||
ReplayTestCheck::Partitions(vec![
|
||||
ActionOrCheck::Replay,
|
||||
ActionOrCheck::Assert(vec![
|
||||
Check::Partitions(vec![
|
||||
("table_1", "tag_partition_by_a"),
|
||||
("table_2", "tag_partition_by_a"),
|
||||
]),
|
||||
ReplayTestCheck::Query(
|
||||
Check::Query(
|
||||
"select * from table_1 order by time",
|
||||
vec![
|
||||
"+-----+------------------+----------------------+",
|
||||
|
@ -618,7 +643,7 @@ mod tests {
|
|||
"+-----+------------------+----------------------+",
|
||||
],
|
||||
),
|
||||
ReplayTestCheck::Query(
|
||||
Check::Query(
|
||||
"select * from table_2 order by time",
|
||||
vec![
|
||||
"+-----+------------------+----------------------+",
|
||||
|
@ -629,14 +654,14 @@ mod tests {
|
|||
],
|
||||
),
|
||||
]),
|
||||
ReplayTestAOC::Await(vec![ReplayTestCheck::Partitions(vec![
|
||||
ActionOrCheck::Await(vec![Check::Partitions(vec![
|
||||
("table_1", "tag_partition_by_a"),
|
||||
("table_1", "tag_partition_by_b"),
|
||||
("table_2", "tag_partition_by_a"),
|
||||
("table_2", "tag_partition_by_b"),
|
||||
])]),
|
||||
ReplayTestAOC::Assert(vec![
|
||||
ReplayTestCheck::Query(
|
||||
ActionOrCheck::Assert(vec![
|
||||
Check::Query(
|
||||
"select * from table_1 order by time",
|
||||
vec![
|
||||
"+-----+------------------+--------------------------------+",
|
||||
|
@ -647,7 +672,7 @@ mod tests {
|
|||
"+-----+------------------+--------------------------------+",
|
||||
],
|
||||
),
|
||||
ReplayTestCheck::Query(
|
||||
Check::Query(
|
||||
"select * from table_2 order by time",
|
||||
vec![
|
||||
"+-----+------------------+--------------------------------+",
|
||||
|
@ -669,8 +694,8 @@ mod tests {
|
|||
async fn replay_ok_two_partitions_persist_first() {
|
||||
ReplayTest {
|
||||
n_sequencers: 1,
|
||||
actions_and_checks: vec![
|
||||
ReplayTestAOC::Ingest(vec![
|
||||
steps: vec![
|
||||
ActionOrCheck::Ingest(vec![
|
||||
TestSequencedEntry {
|
||||
sequencer_id: 0,
|
||||
sequence_number: 0,
|
||||
|
@ -682,17 +707,17 @@ mod tests {
|
|||
lp: "table_2,tag_partition_by=a bar=20 0",
|
||||
},
|
||||
]),
|
||||
ReplayTestAOC::Await(vec![ReplayTestCheck::Partitions(vec![
|
||||
ActionOrCheck::Await(vec![Check::Partitions(vec![
|
||||
("table_1", "tag_partition_by_a"),
|
||||
("table_2", "tag_partition_by_a"),
|
||||
])]),
|
||||
ReplayTestAOC::Persist(vec![("table_1", "tag_partition_by_a")]),
|
||||
ReplayTestAOC::Restart,
|
||||
ReplayTestAOC::Assert(vec![ReplayTestCheck::Partitions(vec![(
|
||||
ActionOrCheck::Persist(vec![("table_1", "tag_partition_by_a")]),
|
||||
ActionOrCheck::Restart,
|
||||
ActionOrCheck::Assert(vec![Check::Partitions(vec![(
|
||||
"table_1",
|
||||
"tag_partition_by_a",
|
||||
)])]),
|
||||
ReplayTestAOC::Ingest(vec![
|
||||
ActionOrCheck::Ingest(vec![
|
||||
TestSequencedEntry {
|
||||
sequencer_id: 0,
|
||||
sequence_number: 2,
|
||||
|
@ -704,13 +729,13 @@ mod tests {
|
|||
lp: "table_2,tag_partition_by=b bar=21 10",
|
||||
},
|
||||
]),
|
||||
ReplayTestAOC::Replay,
|
||||
ReplayTestAOC::Assert(vec![
|
||||
ReplayTestCheck::Partitions(vec![
|
||||
ActionOrCheck::Replay,
|
||||
ActionOrCheck::Assert(vec![
|
||||
Check::Partitions(vec![
|
||||
("table_1", "tag_partition_by_a"),
|
||||
("table_2", "tag_partition_by_a"),
|
||||
]),
|
||||
ReplayTestCheck::Query(
|
||||
Check::Query(
|
||||
"select * from table_1 order by time",
|
||||
vec![
|
||||
"+-----+------------------+----------------------+",
|
||||
|
@ -720,7 +745,7 @@ mod tests {
|
|||
"+-----+------------------+----------------------+",
|
||||
],
|
||||
),
|
||||
ReplayTestCheck::Query(
|
||||
Check::Query(
|
||||
"select * from table_2 order by time",
|
||||
vec![
|
||||
"+-----+------------------+----------------------+",
|
||||
|
@ -731,14 +756,14 @@ mod tests {
|
|||
],
|
||||
),
|
||||
]),
|
||||
ReplayTestAOC::Await(vec![ReplayTestCheck::Partitions(vec![
|
||||
ActionOrCheck::Await(vec![Check::Partitions(vec![
|
||||
("table_1", "tag_partition_by_a"),
|
||||
("table_1", "tag_partition_by_b"),
|
||||
("table_2", "tag_partition_by_a"),
|
||||
("table_2", "tag_partition_by_b"),
|
||||
])]),
|
||||
ReplayTestAOC::Assert(vec![
|
||||
ReplayTestCheck::Query(
|
||||
ActionOrCheck::Assert(vec![
|
||||
Check::Query(
|
||||
"select * from table_1 order by time",
|
||||
vec![
|
||||
"+-----+------------------+--------------------------------+",
|
||||
|
@ -749,7 +774,7 @@ mod tests {
|
|||
"+-----+------------------+--------------------------------+",
|
||||
],
|
||||
),
|
||||
ReplayTestCheck::Query(
|
||||
Check::Query(
|
||||
"select * from table_2 order by time",
|
||||
vec![
|
||||
"+-----+------------------+--------------------------------+",
|
||||
|
@ -771,21 +796,21 @@ mod tests {
|
|||
async fn replay_ok_nothing_to_replay() {
|
||||
ReplayTest {
|
||||
n_sequencers: 1,
|
||||
actions_and_checks: vec![
|
||||
ReplayTestAOC::Restart,
|
||||
ReplayTestAOC::Assert(vec![ReplayTestCheck::Partitions(vec![])]),
|
||||
ReplayTestAOC::Ingest(vec![TestSequencedEntry {
|
||||
steps: vec![
|
||||
ActionOrCheck::Restart,
|
||||
ActionOrCheck::Assert(vec![Check::Partitions(vec![])]),
|
||||
ActionOrCheck::Ingest(vec![TestSequencedEntry {
|
||||
sequencer_id: 0,
|
||||
sequence_number: 2,
|
||||
lp: "table_1,tag_partition_by=a bar=1 0",
|
||||
}]),
|
||||
ReplayTestAOC::Replay,
|
||||
ReplayTestAOC::Assert(vec![ReplayTestCheck::Partitions(vec![])]),
|
||||
ReplayTestAOC::Await(vec![ReplayTestCheck::Partitions(vec![(
|
||||
ActionOrCheck::Replay,
|
||||
ActionOrCheck::Assert(vec![Check::Partitions(vec![])]),
|
||||
ActionOrCheck::Await(vec![Check::Partitions(vec![(
|
||||
"table_1",
|
||||
"tag_partition_by_a",
|
||||
)])]),
|
||||
ReplayTestAOC::Assert(vec![ReplayTestCheck::Query(
|
||||
ActionOrCheck::Assert(vec![Check::Query(
|
||||
"select * from table_1 order by time",
|
||||
vec![
|
||||
"+-----+------------------+----------------------+",
|
||||
|
@ -814,8 +839,8 @@ mod tests {
|
|||
// table 2: partition a: from sequencer 1, not persisted but recovered during replay
|
||||
ReplayTest {
|
||||
n_sequencers: 3,
|
||||
actions_and_checks: vec![
|
||||
ReplayTestAOC::Ingest(vec![
|
||||
steps: vec![
|
||||
ActionOrCheck::Ingest(vec![
|
||||
TestSequencedEntry {
|
||||
sequencer_id: 1,
|
||||
sequence_number: 0,
|
||||
|
@ -832,28 +857,28 @@ mod tests {
|
|||
lp: "table_2,tag_partition_by=a bar=20 0",
|
||||
},
|
||||
]),
|
||||
ReplayTestAOC::Await(vec![ReplayTestCheck::Partitions(vec![
|
||||
ActionOrCheck::Await(vec![Check::Partitions(vec![
|
||||
("table_1", "tag_partition_by_a"),
|
||||
("table_2", "tag_partition_by_a"),
|
||||
])]),
|
||||
ReplayTestAOC::Persist(vec![("table_1", "tag_partition_by_a")]),
|
||||
ReplayTestAOC::Restart,
|
||||
ReplayTestAOC::Assert(vec![ReplayTestCheck::Partitions(vec![(
|
||||
ActionOrCheck::Persist(vec![("table_1", "tag_partition_by_a")]),
|
||||
ActionOrCheck::Restart,
|
||||
ActionOrCheck::Assert(vec![Check::Partitions(vec![(
|
||||
"table_1",
|
||||
"tag_partition_by_a",
|
||||
)])]),
|
||||
ReplayTestAOC::Ingest(vec![TestSequencedEntry {
|
||||
ActionOrCheck::Ingest(vec![TestSequencedEntry {
|
||||
sequencer_id: 0,
|
||||
sequence_number: 1,
|
||||
lp: "table_1,tag_partition_by=b bar=12 20",
|
||||
}]),
|
||||
ReplayTestAOC::Replay,
|
||||
ReplayTestAOC::Assert(vec![
|
||||
ReplayTestCheck::Partitions(vec![
|
||||
ActionOrCheck::Replay,
|
||||
ActionOrCheck::Assert(vec![
|
||||
Check::Partitions(vec![
|
||||
("table_1", "tag_partition_by_a"),
|
||||
("table_2", "tag_partition_by_a"),
|
||||
]),
|
||||
ReplayTestCheck::Query(
|
||||
Check::Query(
|
||||
"select * from table_1 order by time",
|
||||
vec![
|
||||
"+-----+------------------+--------------------------------+",
|
||||
|
@ -864,7 +889,7 @@ mod tests {
|
|||
"+-----+------------------+--------------------------------+",
|
||||
],
|
||||
),
|
||||
ReplayTestCheck::Query(
|
||||
Check::Query(
|
||||
"select * from table_2 order by time",
|
||||
vec![
|
||||
"+-----+------------------+----------------------+",
|
||||
|
@ -875,13 +900,13 @@ mod tests {
|
|||
],
|
||||
),
|
||||
]),
|
||||
ReplayTestAOC::Await(vec![ReplayTestCheck::Partitions(vec![
|
||||
ActionOrCheck::Await(vec![Check::Partitions(vec![
|
||||
("table_1", "tag_partition_by_a"),
|
||||
("table_1", "tag_partition_by_b"),
|
||||
("table_2", "tag_partition_by_a"),
|
||||
])]),
|
||||
ReplayTestAOC::Assert(vec![
|
||||
ReplayTestCheck::Query(
|
||||
ActionOrCheck::Assert(vec![
|
||||
Check::Query(
|
||||
"select * from table_1 order by time",
|
||||
vec![
|
||||
"+-----+------------------+--------------------------------+",
|
||||
|
@ -893,7 +918,7 @@ mod tests {
|
|||
"+-----+------------------+--------------------------------+",
|
||||
],
|
||||
),
|
||||
ReplayTestCheck::Query(
|
||||
Check::Query(
|
||||
"select * from table_2 order by time",
|
||||
vec![
|
||||
"+-----+------------------+----------------------+",
|
||||
|
|
Loading…
Reference in New Issue