fix: also seed persistence windows when skipping replay

pull/24376/head
Marco Neumann 2021-08-09 15:54:34 +02:00
parent 2082042626
commit 65b1ca2071
1 changed files with 172 additions and 4 deletions

View File

@ -4,11 +4,13 @@ use std::{
time::Duration,
};
use chrono::Utc;
use entry::{Sequence, TableBatch};
use futures::TryStreamExt;
use observability_deps::tracing::info;
use persistence_windows::{
checkpoint::ReplayPlan, min_max_sequence::OptionalMinMaxSequence,
checkpoint::{PartitionCheckpoint, ReplayPlan},
min_max_sequence::OptionalMinMaxSequence,
persistence_windows::PersistenceWindows,
};
use snafu::{ResultExt, Snafu};
@ -93,11 +95,51 @@ pub async fn seek_to_end(db: &Db) -> Result<()> {
watermarks.push((sequencer_id, watermark));
}
for (sequencer_id, watermark) in watermarks {
for (sequencer_id, watermark) in &watermarks {
write_buffer
.seek(sequencer_id, watermark)
.seek(*sequencer_id, *watermark)
.await
.context(SeekError { sequencer_id })?;
.context(SeekError {
sequencer_id: *sequencer_id,
})?;
}
// remember max seen sequence numbers
let late_arrival_window = db.rules().lifecycle_rules.late_arrive_window();
let sequencer_numbers: BTreeMap<_, _> = watermarks
.into_iter()
.filter(|(_sequencer_id, watermark)| *watermark > 0)
.map(|(sequencer_id, watermark)| {
(
sequencer_id,
OptionalMinMaxSequence::new(None, watermark - 1),
)
})
.collect();
for partition in db.catalog.partitions() {
let mut partition = partition.write();
let dummy_checkpoint = PartitionCheckpoint::new(
Arc::from(partition.table_name()),
Arc::from(partition.key()),
sequencer_numbers.clone(),
Utc::now(),
);
match partition.persistence_windows_mut() {
Some(windows) => {
windows.mark_seen_and_persisted(&dummy_checkpoint);
}
None => {
let mut windows = PersistenceWindows::new(
partition.addr().clone(),
late_arrival_window,
db.background_worker_now(),
);
windows.mark_seen_and_persisted(&dummy_checkpoint);
partition.set_persistence_windows(windows);
}
}
}
}
@ -2169,6 +2211,132 @@ mod tests {
.await
}
#[tokio::test]
async fn skip_replay_initializes_max_seen_sequence_numbers() {
// Similar case to `replay_initializes_max_seen_sequence_numbers` but instead of replaying, we skip replay to
// provoke a similar outcome.
//
// This is a regression test for https://github.com/influxdata/influxdb_iox/issues/2215
ReplayTest {
n_sequencers: 2,
steps: vec![
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 0,
lp: "table_1,tag_partition_by=b bar=10 10",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 1,
lp: "table_1,tag_partition_by=a bar=20 20",
},
]),
Step::Await(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 10 | b | 1970-01-01T00:00:00.000000010Z |",
"| 20 | a | 1970-01-01T00:00:00.000000020Z |",
"+-----+------------------+--------------------------------+",
],
),
]),
Step::MakeWritesPersistable,
Step::Persist(vec![("table_1", "tag_partition_by_a")]),
Step::Restart,
Step::SkipReplay,
Step::Assert(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 20 | a | 1970-01-01T00:00:00.000000020Z |",
"+-----+------------------+--------------------------------+",
],
),
]),
Step::Ingest(vec![
TestSequencedEntry {
sequencer_id: 1,
sequence_number: 0,
lp: "table_1,tag_partition_by=a bar=30 30",
},
TestSequencedEntry {
sequencer_id: 0,
sequence_number: 2,
lp: "table_1,tag_partition_by=b bar=40 40",
},
]),
Step::Await(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 20 | a | 1970-01-01T00:00:00.000000020Z |",
"| 30 | a | 1970-01-01T00:00:00.000000030Z |",
"| 40 | b | 1970-01-01T00:00:00.000000040Z |",
"+-----+------------------+--------------------------------+",
],
),
]),
Step::MakeWritesPersistable,
Step::Persist(vec![("table_1", "tag_partition_by_a")]),
Step::Assert(vec![
// there should be two persisted chunks for partition a
Check::Query(
"select storage, count(*) as n from system.chunks where partition_key = 'tag_partition_by_a' group by storage order by storage",
vec![
"+--------------------------+---+",
"| storage | n |",
"+--------------------------+---+",
"| ObjectStoreOnly | 1 |",
"| ReadBufferAndObjectStore | 1 |",
"+--------------------------+---+",
],
),
]),
Step::Restart,
Step::Replay,
Step::Assert(vec![
Check::Query(
"select * from table_1 order by bar",
vec![
"+-----+------------------+--------------------------------+",
"| bar | tag_partition_by | time |",
"+-----+------------------+--------------------------------+",
"| 20 | a | 1970-01-01T00:00:00.000000020Z |",
"| 30 | a | 1970-01-01T00:00:00.000000030Z |",
"| 40 | b | 1970-01-01T00:00:00.000000040Z |",
"+-----+------------------+--------------------------------+",
],
),
// no additional chunk for partition a was created
Check::Query(
"select storage, count(*) as n from system.chunks where partition_key = 'tag_partition_by_a' group by storage order by storage",
vec![
"+-----------------+---+",
"| storage | n |",
"+-----------------+---+",
"| ObjectStoreOnly | 2 |",
"+-----------------+---+",
],
),
]),
],
..Default::default()
}
.run()
.await
}
#[tokio::test]
async fn replay_fail_sequencers_change() {
// create write buffer w/ sequencer 0 and 1