From 65b1ca2071b0d99783a77255cf25b276313000ab Mon Sep 17 00:00:00 2001
From: Marco Neumann <marco@crepererum.net>
Date: Mon, 9 Aug 2021 15:54:34 +0200
Subject: [PATCH] fix: also seed persistence windows when skipping replay

---
 server/src/db/replay.rs | 176 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 172 insertions(+), 4 deletions(-)

diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs
index 3ed5e62212..5f045d49db 100644
--- a/server/src/db/replay.rs
+++ b/server/src/db/replay.rs
@@ -4,11 +4,13 @@ use std::{
     time::Duration,
 };
 
+use chrono::Utc;
 use entry::{Sequence, TableBatch};
 use futures::TryStreamExt;
 use observability_deps::tracing::info;
 use persistence_windows::{
-    checkpoint::ReplayPlan, min_max_sequence::OptionalMinMaxSequence,
+    checkpoint::{PartitionCheckpoint, ReplayPlan},
+    min_max_sequence::OptionalMinMaxSequence,
     persistence_windows::PersistenceWindows,
 };
 use snafu::{ResultExt, Snafu};
@@ -93,11 +95,51 @@ pub async fn seek_to_end(db: &Db) -> Result<()> {
             watermarks.push((sequencer_id, watermark));
         }
 
-        for (sequencer_id, watermark) in watermarks {
+        for (sequencer_id, watermark) in &watermarks {
             write_buffer
-                .seek(sequencer_id, watermark)
+                .seek(*sequencer_id, *watermark)
                 .await
-                .context(SeekError { sequencer_id })?;
+                .context(SeekError {
+                    sequencer_id: *sequencer_id,
+                })?;
+        }
+
+        // remember max seen sequence numbers
+        let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window();
+        let sequencer_numbers: BTreeMap<_, _> = watermarks
+            .into_iter()
+            .filter(|(_sequencer_id, watermark)| *watermark > 0)
+            .map(|(sequencer_id, watermark)| {
+                (
+                    sequencer_id,
+                    OptionalMinMaxSequence::new(None, watermark - 1),
+                )
+            })
+            .collect();
+        for partition in db.catalog.partitions() {
+            let mut partition = partition.write();
+
+            let dummy_checkpoint = PartitionCheckpoint::new(
+                Arc::from(partition.table_name()),
+                Arc::from(partition.key()),
+                sequencer_numbers.clone(),
+                Utc::now(),
+            );
+
+            match partition.persistence_windows_mut() {
+                Some(windows) => {
+                    windows.mark_seen_and_persisted(&dummy_checkpoint);
+                }
+                None => {
+                    let mut windows = PersistenceWindows::new(
+                        partition.addr().clone(),
+                        late_arrival_window,
+                        db.background_worker_now(),
+                    );
+                    windows.mark_seen_and_persisted(&dummy_checkpoint);
+                    partition.set_persistence_windows(windows);
+                }
+            }
         }
     }
 
@@ -2169,6 +2211,132 @@ mod tests {
         .await
     }
 
+    #[tokio::test]
+    async fn skip_replay_initializes_max_seen_sequence_numbers() {
+        // Similar case to `replay_initializes_max_seen_sequence_numbers` but instead of replaying, we skip replay to
+        // provoke a similar outcome.
+        //
+        // This is a regression test for https://github.com/influxdata/influxdb_iox/issues/2215
+        ReplayTest {
+            n_sequencers: 2,
+            steps: vec![
+                Step::Ingest(vec![
+                    TestSequencedEntry {
+                        sequencer_id: 0,
+                        sequence_number: 0,
+                        lp: "table_1,tag_partition_by=b bar=10 10",
+                    },
+                    TestSequencedEntry {
+                        sequencer_id: 0,
+                        sequence_number: 1,
+                        lp: "table_1,tag_partition_by=a bar=20 20",
+                    },
+                ]),
+                Step::Await(vec![
+                    Check::Query(
+                        "select * from table_1 order by bar",
+                        vec![
+                            "+-----+------------------+--------------------------------+",
+                            "| bar | tag_partition_by | time                           |",
+                            "+-----+------------------+--------------------------------+",
+                            "| 10  | b                | 1970-01-01T00:00:00.000000010Z |",
+                            "| 20  | a                | 1970-01-01T00:00:00.000000020Z |",
+                            "+-----+------------------+--------------------------------+",
+                        ],
+                    ),
+                ]),
+                Step::MakeWritesPersistable,
+                Step::Persist(vec![("table_1", "tag_partition_by_a")]),
+                Step::Restart,
+                Step::SkipReplay,
+                Step::Assert(vec![
+                    Check::Query(
+                        "select * from table_1 order by bar",
+                        vec![
+                            "+-----+------------------+--------------------------------+",
+                            "| bar | tag_partition_by | time                           |",
+                            "+-----+------------------+--------------------------------+",
+                            "| 20  | a                | 1970-01-01T00:00:00.000000020Z |",
+                            "+-----+------------------+--------------------------------+",
+                        ],
+                    ),
+                ]),
+                Step::Ingest(vec![
+                    TestSequencedEntry {
+                        sequencer_id: 1,
+                        sequence_number: 0,
+                        lp: "table_1,tag_partition_by=a bar=30 30",
+                    },
+                    TestSequencedEntry {
+                        sequencer_id: 0,
+                        sequence_number: 2,
+                        lp: "table_1,tag_partition_by=b bar=40 40",
+                    },
+                ]),
+                Step::Await(vec![
+                    Check::Query(
+                        "select * from table_1 order by bar",
+                        vec![
+                            "+-----+------------------+--------------------------------+",
+                            "| bar | tag_partition_by | time                           |",
+                            "+-----+------------------+--------------------------------+",
+                            "| 20  | a                | 1970-01-01T00:00:00.000000020Z |",
+                            "| 30  | a                | 1970-01-01T00:00:00.000000030Z |",
+                            "| 40  | b                | 1970-01-01T00:00:00.000000040Z |",
+                            "+-----+------------------+--------------------------------+",
+                        ],
+                    ),
+                ]),
+                Step::MakeWritesPersistable,
+                Step::Persist(vec![("table_1", "tag_partition_by_a")]),
+                Step::Assert(vec![
+                    // there should be two persisted chunks for partition a
+                    Check::Query(
+                        "select storage, count(*) as n from system.chunks where partition_key = 'tag_partition_by_a' group by storage order by storage",
+                        vec![
+                            "+--------------------------+---+",
+                            "| storage                  | n |",
+                            "+--------------------------+---+",
+                            "| ObjectStoreOnly          | 1 |",
+                            "| ReadBufferAndObjectStore | 1 |",
+                            "+--------------------------+---+",
+                        ],
+                    ),
+                ]),
+                Step::Restart,
+                Step::Replay,
+                Step::Assert(vec![
+                    Check::Query(
+                        "select * from table_1 order by bar",
+                        vec![
+                            "+-----+------------------+--------------------------------+",
+                            "| bar | tag_partition_by | time                           |",
+                            "+-----+------------------+--------------------------------+",
+                            "| 20  | a                | 1970-01-01T00:00:00.000000020Z |",
+                            "| 30  | a                | 1970-01-01T00:00:00.000000030Z |",
+                            "| 40  | b                | 1970-01-01T00:00:00.000000040Z |",
+                            "+-----+------------------+--------------------------------+",
+                        ],
+                    ),
+                    // no additional chunk for partition a was created
+                    Check::Query(
+                        "select storage, count(*) as n from system.chunks where partition_key = 'tag_partition_by_a' group by storage order by storage",
+                        vec![
+                            "+-----------------+---+",
+                            "| storage         | n |",
+                            "+-----------------+---+",
+                            "| ObjectStoreOnly | 2 |",
+                            "+-----------------+---+",
+                        ],
+                    ),
+                ]),
+            ],
+            ..Default::default()
+        }
+        .run()
+        .await
+    }
+
     #[tokio::test]
     async fn replay_fail_sequencers_change() {
         // create write buffer w/ sequencer 0 and 1