test: add failing replay test

pull/24376/head
Marco Neumann 2021-07-28 14:00:52 +02:00
parent 29ddc36154
commit a0764cbafd
1 changed files with 239 additions and 0 deletions

View File

@ -951,6 +951,245 @@ mod tests {
.await;
}
#[tokio::test]
async fn replay_ok_interleaved_writes() {
ReplayTest {
n_sequencers: 1,
steps: vec![
// let's ingest some data for two partitions a and b
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=10 0",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_1,tag_partition_by=b bar=20 0",
},
]),
Step::Await(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+----------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+----------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 20 | b | 1970-01-01T00:00:00Z |",
"+-----+------------------+----------------------+",
],
)]),
// only persist partition a
Step::Persist(vec![("table_1", "tag_partition_by_a")]),
// ================================================================================
// after restart the non-persisted partition (B) is gone
Step::Restart,
Step::Assert(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+----------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+----------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"+-----+------------------+----------------------+",
],
)]),
// ...but replay can bring the data back without ingesting more data
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 2,
lp: "table_1,tag_partition_by=a bar=11 10",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 3,
lp: "table_1,tag_partition_by=b bar=21 10",
},
]),
Step::Replay,
Step::Assert(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+----------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+----------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 20 | b | 1970-01-01T00:00:00Z |",
"+-----+------------------+----------------------+",
],
)]),
// now wait for all the to-be-ingested data
Step::Await(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 11 | a | 1970-01-01T00:00:00.000000010Z |",
"| 20 | b | 1970-01-01T00:00:00Z |",
"| 21 | b | 1970-01-01T00:00:00.000000010Z |",
"+-----+------------------+--------------------------------+",
],
)]),
// ...and only persist partition a (a 2nd time)
Step::Persist(vec![("table_1", "tag_partition_by_a")]),
// ================================================================================
// after restart partition b will be gone again
Step::Restart,
Step::Assert(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 11 | a | 1970-01-01T00:00:00.000000010Z |",
"+-----+------------------+--------------------------------+",
],
)]),
// ...but again replay can bring the data back without ingesting more data
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 4,
lp: "table_1,tag_partition_by=b bar=22 20",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 5,
lp: "table_1,tag_partition_by=a bar=12 20",
},
]),
Step::Replay,
Step::Assert(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 11 | a | 1970-01-01T00:00:00.000000010Z |",
"| 20 | b | 1970-01-01T00:00:00Z |",
"| 21 | b | 1970-01-01T00:00:00.000000010Z |",
"+-----+------------------+--------------------------------+",
],
)]),
// now wait for all the to-be-ingested data
Step::Await(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 11 | a | 1970-01-01T00:00:00.000000010Z |",
"| 12 | a | 1970-01-01T00:00:00.000000020Z |",
"| 20 | b | 1970-01-01T00:00:00Z |",
"| 21 | b | 1970-01-01T00:00:00.000000010Z |",
"| 22 | b | 1970-01-01T00:00:00.000000020Z |",
"+-----+------------------+--------------------------------+",
],
)]),
// this time only persist partition b
Step::Persist(vec![("table_1", "tag_partition_by_b")]),
// ================================================================================
// after restart partition b will be fully present but the latest data for partition a is gone
Step::Restart,
Step::Assert(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 11 | a | 1970-01-01T00:00:00.000000010Z |",
"| 20 | b | 1970-01-01T00:00:00Z |",
"| 21 | b | 1970-01-01T00:00:00.000000010Z |",
"| 22 | b | 1970-01-01T00:00:00.000000020Z |",
"+-----+------------------+--------------------------------+",
],
)]),
// ...but again replay can bring the data back without ingesting more data
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 6,
lp: "table_1,tag_partition_by=b bar=23 30",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 7,
lp: "table_1,tag_partition_by=a bar=13 30",
},
]),
Step::Replay,
Step::Assert(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 11 | a | 1970-01-01T00:00:00.000000010Z |",
"| 12 | a | 1970-01-01T00:00:00.000000020Z |",
"| 20 | b | 1970-01-01T00:00:00Z |",
"| 21 | b | 1970-01-01T00:00:00.000000010Z |",
"| 22 | b | 1970-01-01T00:00:00.000000020Z |",
"+-----+------------------+--------------------------------+",
],
)]),
// now wait for all the to-be-ingested data
Step::Await(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 11 | a | 1970-01-01T00:00:00.000000010Z |",
"| 12 | a | 1970-01-01T00:00:00.000000020Z |",
"| 13 | a | 1970-01-01T00:00:00.000000030Z |",
"| 20 | b | 1970-01-01T00:00:00Z |",
"| 21 | b | 1970-01-01T00:00:00.000000010Z |",
"| 22 | b | 1970-01-01T00:00:00.000000020Z |",
"| 23 | b | 1970-01-01T00:00:00.000000030Z |",
"+-----+------------------+--------------------------------+",
],
)]),
// finally persist both partitions
Step::Persist(vec![
("table_1", "tag_partition_by_a"),
("table_1", "tag_partition_by_b"),
]),
// ================================================================================
// and after this restart nothing is lost
Step::Restart,
Step::Assert(vec![Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | a | 1970-01-01T00:00:00Z |",
"| 11 | a | 1970-01-01T00:00:00.000000010Z |",
"| 12 | a | 1970-01-01T00:00:00.000000020Z |",
"| 13 | a | 1970-01-01T00:00:00.000000030Z |",
"| 20 | b | 1970-01-01T00:00:00Z |",
"| 21 | b | 1970-01-01T00:00:00.000000010Z |",
"| 22 | b | 1970-01-01T00:00:00.000000020Z |",
"| 23 | b | 1970-01-01T00:00:00.000000030Z |",
"+-----+------------------+--------------------------------+",
],
)]),
],
}
.run()
.await;
}
#[tokio::test]
async fn replay_fail_sequencers_change() {
// create write buffer w/ sequencer 0 and 1