fix: always remember max seen sequ. numbers during replay

Do not forget max seen sequence numbers for partition-sequencer
combinations that can be skipped during replay.

Fixes #2215.
pull/24376/head
Marco Neumann 2021-08-09 12:41:20 +02:00
parent 8626e9980b
commit 2eaf486eac
2 changed files with 209 additions and 1 deletions

View File

@ -131,6 +131,22 @@ impl PersistenceWindows {
self.late_arrival_period = late_arrival_period;
}
/// Marks sequence numbers as seen and persisted.
///
/// This can be used during replay to keep in-memory information in sync with the already persisted data.
pub fn mark_seen_and_persisted(&mut self, partition_checkpoint: &PartitionCheckpoint) {
for (sequencer_id, min_max) in partition_checkpoint.sequencer_numbers_iter() {
match self.max_sequence_numbers.entry(sequencer_id) {
Entry::Occupied(mut occupied) => {
*occupied.get_mut() = (*occupied.get()).max(min_max.max());
}
Entry::Vacant(vacant) => {
vacant.insert(min_max.max());
}
}
}
}
/// Updates the windows with the information from a batch of rows from a single sequencer
/// to the same partition. The min and max times are the times on the row data. The `received_at`
/// Instant is when the data was received. Taking it in this function is really just about
@ -1524,4 +1540,37 @@ mod tests {
let ckpt_sequencer_numbers: BTreeMap<_, _> = ckpt.sequencer_numbers_iter().collect();
assert_eq!(w.sequencer_numbers(), ckpt_sequencer_numbers);
}
#[test]
fn test_mark_seen_and_persisted() {
let late_arrival_period = Duration::from_secs(100);
let mut w = make_windows(late_arrival_period);
let mut sequencer_numbers1 = BTreeMap::new();
sequencer_numbers1.insert(1, OptionalMinMaxSequence::new(Some(1), 2));
let ckpt1 = PartitionCheckpoint::new(
Arc::from("foo"),
Arc::from("bar"),
sequencer_numbers1,
Utc::now(),
);
w.mark_seen_and_persisted(&ckpt1);
let mut sequencer_numbers2 = BTreeMap::new();
sequencer_numbers2.insert(1, OptionalMinMaxSequence::new(Some(0), 1));
sequencer_numbers2.insert(2, OptionalMinMaxSequence::new(None, 3));
let ckpt2 = PartitionCheckpoint::new(
Arc::from("foo"),
Arc::from("bar"),
sequencer_numbers2,
Utc::now(),
);
w.mark_seen_and_persisted(&ckpt2);
let actual = w.sequencer_numbers();
let mut expected = BTreeMap::new();
expected.insert(1, OptionalMinMaxSequence::new(None, 2));
expected.insert(2, OptionalMinMaxSequence::new(None, 3));
assert_eq!(actual, expected);
}
}

View File

@ -7,7 +7,10 @@ use std::{
use entry::{Sequence, TableBatch};
use futures::TryStreamExt;
use observability_deps::tracing::info;
use persistence_windows::{checkpoint::ReplayPlan, min_max_sequence::OptionalMinMaxSequence};
use persistence_windows::{
checkpoint::ReplayPlan, min_max_sequence::OptionalMinMaxSequence,
persistence_windows::PersistenceWindows,
};
use snafu::{ResultExt, Snafu};
use write_buffer::config::WriteBufferConfig;
@ -231,6 +234,32 @@ pub async fn perform_replay(db: &Db, replay_plan: &ReplayPlan) -> Result<()> {
}
}
}
// remember max seen sequence numbers even for partitions that were not touched during replay
let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window();
for (table_name, partition_key) in replay_plan.partitions() {
if let Ok(partition) = db.partition(&table_name, &partition_key) {
let mut partition = partition.write();
let partition_checkpoint = replay_plan
.last_partition_checkpoint(&table_name, &partition_key)
.expect("replay plan inconsistent");
match partition.persistence_windows_mut() {
Some(windows) => {
windows.mark_seen_and_persisted(partition_checkpoint);
}
None => {
let mut windows = PersistenceWindows::new(
partition.addr().clone(),
late_arrival_window,
db.background_worker_now(),
);
windows.mark_seen_and_persisted(partition_checkpoint);
partition.set_persistence_windows(windows);
}
}
}
}
}
Ok(())
@ -2010,6 +2039,136 @@ mod tests {
);
}
#[tokio::test]
async fn replay_initializes_max_seen_sequence_numbers() {
// Ensures that either replay or the catalog loading initializes the maximum seen sequence numbers (per
// partition) correctly. Before this test (and its fix), sequence numbers were only written if there was any
// unpersisted range during replay.
//
// This is a regression test for https://github.com/influxdata/influxdb_iox/issues/2215
ReplayTest {
n_sequencers: 2,
steps: vec![
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 0,
lp: "table_1,tag_partition_by=b bar=10 10",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_1,tag_partition_by=a bar=20 20",
},
]),
Step::Await(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | b | 1970-01-01T00:00:00.000000010Z |",
"| 20 | a | 1970-01-01T00:00:00.000000020Z |",
"+-----+------------------+--------------------------------+",
],
),
]),
Step::MakeWritesPersistable,
Step::Persist(vec![("table_1", "tag_partition_by_a")]),
Step::Restart,
Step::Replay,
Step::Assert(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | b | 1970-01-01T00:00:00.000000010Z |",
"| 20 | a | 1970-01-01T00:00:00.000000020Z |",
"+-----+------------------+--------------------------------+",
],
),
]),
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 1,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=30 30",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 2,
lp: "table_1,tag_partition_by=b bar=40 40",
},
]),
Step::Await(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | b | 1970-01-01T00:00:00.000000010Z |",
"| 20 | a | 1970-01-01T00:00:00.000000020Z |",
"| 30 | a | 1970-01-01T00:00:00.000000030Z |",
"| 40 | b | 1970-01-01T00:00:00.000000040Z |",
"+-----+------------------+--------------------------------+",
],
),
]),
Step::MakeWritesPersistable,
Step::Persist(vec![("table_1", "tag_partition_by_a")]),
Step::Assert(vec![
// there should be two persisted chunks for partition a
Check::Query(
"select storage, count(*) as n from system.chunks where partition_key = 'tag_partition_by_a' group by storage order by storage",
vec![
"+--------------------------+---+",
"| storage | n |",
"+--------------------------+---+",
"| ObjectStoreOnly | 1 |",
"| ReadBufferAndObjectStore | 1 |",
"+--------------------------+---+",
],
),
]),
Step::Restart,
Step::Replay,
Step::Assert(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | b | 1970-01-01T00:00:00.000000010Z |",
"| 20 | a | 1970-01-01T00:00:00.000000020Z |",
"| 30 | a | 1970-01-01T00:00:00.000000030Z |",
"| 40 | b | 1970-01-01T00:00:00.000000040Z |",
"+-----+------------------+--------------------------------+",
],
),
// no additional chunk for partition a was created
Check::Query(
"select storage, count(*) as n from system.chunks where partition_key = 'tag_partition_by_a' group by storage order by storage",
vec![
"+-----------------+---+",
"| storage | n |",
"+-----------------+---+",
"| ObjectStoreOnly | 2 |",
"+-----------------+---+",
],
),
]),
],
..Default::default()
}
.run()
.await
}
#[tokio::test]
async fn replay_fail_sequencers_change() {
// create write buffer w/ sequencer 0 and 1