diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index 5ddf97ec16..b7ed726acb 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -131,6 +131,22 @@ impl PersistenceWindows { self.late_arrival_period = late_arrival_period; } + /// Marks sequence numbers as seen and persisted. + /// + /// This can be used during replay to keep in-memory information in sync with the already persisted data. + pub fn mark_seen_and_persisted(&mut self, partition_checkpoint: &PartitionCheckpoint) { + for (sequencer_id, min_max) in partition_checkpoint.sequencer_numbers_iter() { + match self.max_sequence_numbers.entry(sequencer_id) { + Entry::Occupied(mut occupied) => { + *occupied.get_mut() = (*occupied.get()).max(min_max.max()); + } + Entry::Vacant(vacant) => { + vacant.insert(min_max.max()); + } + } + } + } + /// Updates the windows with the information from a batch of rows from a single sequencer /// to the same partition. The min and max times are the times on the row data. The `received_at` /// Instant is when the data was received. Taking it in this function is really just about @@ -1524,4 +1540,37 @@ mod tests { let ckpt_sequencer_numbers: BTreeMap<_, _> = ckpt.sequencer_numbers_iter().collect(); assert_eq!(w.sequencer_numbers(), ckpt_sequencer_numbers); } + + #[test] + fn test_mark_seen_and_persisted() { + let late_arrival_period = Duration::from_secs(100); + let mut w = make_windows(late_arrival_period); + + let mut sequencer_numbers1 = BTreeMap::new(); + sequencer_numbers1.insert(1, OptionalMinMaxSequence::new(Some(1), 2)); + let ckpt1 = PartitionCheckpoint::new( + Arc::from("foo"), + Arc::from("bar"), + sequencer_numbers1, + Utc::now(), + ); + w.mark_seen_and_persisted(&ckpt1); + + let mut sequencer_numbers2 = BTreeMap::new(); + sequencer_numbers2.insert(1, OptionalMinMaxSequence::new(Some(0), 1)); + sequencer_numbers2.insert(2, OptionalMinMaxSequence::new(None, 3)); + let ckpt2 = PartitionCheckpoint::new( + Arc::from("foo"), + Arc::from("bar"), + sequencer_numbers2, + Utc::now(), + ); + w.mark_seen_and_persisted(&ckpt2); + + let actual = w.sequencer_numbers(); + let mut expected = BTreeMap::new(); + expected.insert(1, OptionalMinMaxSequence::new(None, 2)); + expected.insert(2, OptionalMinMaxSequence::new(None, 3)); + assert_eq!(actual, expected); + } } diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index 3e11412ee1..3ed5e62212 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -7,7 +7,10 @@ use std::{ use entry::{Sequence, TableBatch}; use futures::TryStreamExt; use observability_deps::tracing::info; -use persistence_windows::{checkpoint::ReplayPlan, min_max_sequence::OptionalMinMaxSequence}; +use persistence_windows::{ + checkpoint::ReplayPlan, min_max_sequence::OptionalMinMaxSequence, + persistence_windows::PersistenceWindows, +}; use snafu::{ResultExt, Snafu}; use write_buffer::config::WriteBufferConfig; @@ -231,6 +234,32 @@ pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> { } } } + + // remember max seen sequence numbers even for partitions that were not touched during replay + let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window(); + for (table_name, partition_key) in replay_plan.partitions() { + if let Ok(partition) = db.partition(&table_name, &partition_key) { + let mut partition = partition.write(); + let partition_checkpoint = replay_plan + .last_partition_checkpoint(&table_name, &partition_key) + .expect("replay plan inconsistent"); + + match partition.persistence_windows_mut() { + Some(windows) => { + windows.mark_seen_and_persisted(partition_checkpoint); + } + None => { + let mut windows = PersistenceWindows::new( + partition.addr().clone(), + late_arrival_window, + db.background_worker_now(), + ); + windows.mark_seen_and_persisted(partition_checkpoint); + partition.set_persistence_windows(windows); + } + } + } + } } Ok(()) @@ -2010,6 +2039,136 @@ mod tests { ); } + #[tokio::test] + async fn replay_initializes_max_seen_sequence_numbers() { + // Ensures that either replay or the catalog loading initializes the maximum seen sequence numbers (per + // partition) correctly. Before this test (and its fix), sequence numbers were only written if there was any + // unpersisted range during replay. + // + // This is a regression test for https://github.com/influxdata/influxdb_iox/issues/2215 + ReplayTest { + n_sequencers: 2, + steps: vec![ + Step::Ingest(vec![ + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 0, + lp: "table_1,tag_partition_by=b bar=10 10", + }, + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 1, + lp: "table_1,tag_partition_by=a bar=20 20", + }, + ]), + Step::Await(vec![ + Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | b | 1970-01-01T00:00:00.000000010Z |", + "| 20 | a | 1970-01-01T00:00:00.000000020Z |", + "+-----+------------------+--------------------------------+", + ], + ), + ]), + Step::MakeWritesPersistable, + Step::Persist(vec![("table_1", "tag_partition_by_a")]), + Step::Restart, + Step::Replay, + Step::Assert(vec![ + Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | b | 1970-01-01T00:00:00.000000010Z |", + "| 20 | a | 1970-01-01T00:00:00.000000020Z |", + "+-----+------------------+--------------------------------+", + ], + ), + ]), + Step::Ingest(vec![ + TestSequencedEntry { + sequencer_id: 1, + sequence_number: 0, + lp: "table_1,tag_partition_by=a bar=30 30", + }, + TestSequencedEntry { + sequencer_id: 0, + sequence_number: 2, + lp: "table_1,tag_partition_by=b bar=40 40", + }, + ]), + Step::Await(vec![ + Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | b | 1970-01-01T00:00:00.000000010Z |", + "| 20 | a | 1970-01-01T00:00:00.000000020Z |", + "| 30 | a | 1970-01-01T00:00:00.000000030Z |", + "| 40 | b | 1970-01-01T00:00:00.000000040Z |", + "+-----+------------------+--------------------------------+", + ], + ), + ]), + Step::MakeWritesPersistable, + Step::Persist(vec![("table_1", "tag_partition_by_a")]), + Step::Assert(vec![ + // there should be two persisted chunks for partition a + Check::Query( + "select storage, count(*) as n from system.chunks where partition_key = 'tag_partition_by_a' group by storage order by storage", + vec![ + "+--------------------------+---+", + "| storage | n |", + "+--------------------------+---+", + "| ObjectStoreOnly | 1 |", + "| ReadBufferAndObjectStore | 1 |", + "+--------------------------+---+", + ], + ), + ]), + Step::Restart, + Step::Replay, + Step::Assert(vec![ + Check::Query( + "select * from table_1 order by bar", + vec![ + "+-----+------------------+--------------------------------+", + "| bar | tag_partition_by | time |", + "+-----+------------------+--------------------------------+", + "| 10 | b | 1970-01-01T00:00:00.000000010Z |", + "| 20 | a | 1970-01-01T00:00:00.000000020Z |", + "| 30 | a | 1970-01-01T00:00:00.000000030Z |", + "| 40 | b | 1970-01-01T00:00:00.000000040Z |", + "+-----+------------------+--------------------------------+", + ], + ), + // no additional chunk for partition a was created + Check::Query( + "select storage, count(*) as n from system.chunks where partition_key = 'tag_partition_by_a' group by storage order by storage", + vec![ + "+-----------------+---+", + "| storage | n |", + "+-----------------+---+", + "| ObjectStoreOnly | 2 |", + "+-----------------+---+", + ], + ), + ]), + ], + ..Default::default() + } + .run() + .await + } + #[tokio::test] async fn replay_fail_sequencers_change() { // create write buffer w/ sequencer 0 and 1