From a0764cbafd6b13446b76974ead9ffb0888a82ce0 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Wed, 28 Jul 2021 14:00:52 +0200 Subject: [PATCH] test: add failing replay test --- server/src/db/replay.rs | 239 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index f2ec4892b4..412f602c57 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -951,6 +951,245 @@ mod tests { .await; } + #[tokio::test] + async fn replay_ok_interleaved_writes() { + ReplayTest { + n_sequencers: 1, + steps: vec![ + // let's ingest some data for two partitions a and b + Step::Ingest(vec![ + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 0, + lp: "table_1,tag_partition_by=a bar=10 0", + }, + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 1, + lp: "table_1,tag_partition_by=b bar=20 0", + }, + ]), + Step::Await(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+----------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+----------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 20 | b | 1970-01-01T00:00:00Z |", + "+-----+------------------+----------------------+", + ], + )]), + // only persist partition a + Step::Persist(vec![("table_1", "tag_partition_by_a")]), + // ================================================================================ + // after restart the non-persisted partition (B) is gone + Step::Restart, + Step::Assert(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+----------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+----------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "+-----+------------------+----------------------+", + ], + )]), + // ...but replay can bring the data back without ingesting more data + Step::Ingest(vec![ + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 2, + lp: "table_1,tag_partition_by=a bar=11 10", + }, + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 3, + lp: "table_1,tag_partition_by=b bar=21 10", + }, + ]), + Step::Replay, + Step::Assert(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+----------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+----------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 20 | b | 1970-01-01T00:00:00Z |", + "+-----+------------------+----------------------+", + ], + )]), + // now wait for all the to-be-ingested data + Step::Await(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 11 | a | 1970-01-01T00:00:00.000000010Z |", + "| 20 | b | 1970-01-01T00:00:00Z |", + "| 21 | b | 1970-01-01T00:00:00.000000010Z |", + "+-----+------------------+--------------------------------+", + ], + )]), + // ...and only persist partition a (a 2nd time) + Step::Persist(vec![("table_1", "tag_partition_by_a")]), + // ================================================================================ + // after restart partition b will be gone again + Step::Restart, + Step::Assert(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 11 | a | 1970-01-01T00:00:00.000000010Z |", + "+-----+------------------+--------------------------------+", + ], + )]), + // ...but again replay can bring the data back without ingesting more data + Step::Ingest(vec![ + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 4, + lp: "table_1,tag_partition_by=b bar=22 20", + }, + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 5, + lp: "table_1,tag_partition_by=a bar=12 20", + }, + ]), + Step::Replay, + Step::Assert(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 11 | a | 1970-01-01T00:00:00.000000010Z |", + "| 20 | b | 1970-01-01T00:00:00Z |", + "| 21 | b | 1970-01-01T00:00:00.000000010Z |", + "+-----+------------------+--------------------------------+", + ], + )]), + // now wait for all the to-be-ingested data + Step::Await(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 11 | a | 1970-01-01T00:00:00.000000010Z |", + "| 12 | a | 1970-01-01T00:00:00.000000020Z |", + "| 20 | b | 1970-01-01T00:00:00Z |", + "| 21 | b | 1970-01-01T00:00:00.000000010Z |", + "| 22 | b | 1970-01-01T00:00:00.000000020Z |", + "+-----+------------------+--------------------------------+", + ], + )]), + // this time only persist partition b + Step::Persist(vec![("table_1", "tag_partition_by_b")]), + // ================================================================================ + // after restart partition b will be fully present but the latest data for partition a is gone + Step::Restart, + Step::Assert(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 11 | a | 1970-01-01T00:00:00.000000010Z |", + "| 20 | b | 1970-01-01T00:00:00Z |", + "| 21 | b | 1970-01-01T00:00:00.000000010Z |", + "| 22 | b | 1970-01-01T00:00:00.000000020Z |", + "+-----+------------------+--------------------------------+", + ], + )]), + // ...but again replay can bring the data back without ingesting more data + Step::Ingest(vec![ + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 6, + lp: "table_1,tag_partition_by=b bar=23 30", + }, + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 7, + lp: "table_1,tag_partition_by=a bar=13 30", + }, + ]), + Step::Replay, + Step::Assert(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 11 | a | 1970-01-01T00:00:00.000000010Z |", + "| 12 | a | 1970-01-01T00:00:00.000000020Z |", + "| 20 | b | 1970-01-01T00:00:00Z |", + "| 21 | b | 1970-01-01T00:00:00.000000010Z |", + "| 22 | b | 1970-01-01T00:00:00.000000020Z |", + "+-----+------------------+--------------------------------+", + ], + )]), + // now wait for all the to-be-ingested data + Step::Await(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 11 | a | 1970-01-01T00:00:00.000000010Z |", + "| 12 | a | 1970-01-01T00:00:00.000000020Z |", + "| 13 | a | 1970-01-01T00:00:00.000000030Z |", + "| 20 | b | 1970-01-01T00:00:00Z |", + "| 21 | b | 1970-01-01T00:00:00.000000010Z |", + "| 22 | b | 1970-01-01T00:00:00.000000020Z |", + "| 23 | b | 1970-01-01T00:00:00.000000030Z |", + "+-----+------------------+--------------------------------+", + ], + )]), + // finally persist both partitions + Step::Persist(vec![ + ("table_1", "tag_partition_by_a"), + ("table_1", "tag_partition_by_b"), + ]), + // ================================================================================ + // and after this restart nothing is lost + Step::Restart, + Step::Assert(vec![Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | a | 1970-01-01T00:00:00Z |", + "| 11 | a | 1970-01-01T00:00:00.000000010Z |", + "| 12 | a | 1970-01-01T00:00:00.000000020Z |", + "| 13 | a | 1970-01-01T00:00:00.000000030Z |", + "| 20 | b | 1970-01-01T00:00:00Z |", + "| 21 | b | 1970-01-01T00:00:00.000000010Z |", + "| 22 | b | 1970-01-01T00:00:00.000000020Z |", + "| 23 | b | 1970-01-01T00:00:00.000000030Z |", + "+-----+------------------+--------------------------------+", + ], + )]), + ], + } + .run() + .await; + } + #[tokio::test] async fn replay_fail_sequencers_change() { // create write buffer w/ sequencer 0 and 1