feat: check that replay plan and write buffer are in-sync
parent
db0f501b02
commit
0c89930b7c
|
@ -42,7 +42,7 @@ use persistence_windows::persistence_windows::PersistenceWindows;
|
|||
use query::{exec::Executor, predicate::Predicate, QueryDatabase};
|
||||
use rand_distr::{Distribution, Poisson};
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::{
|
||||
any::Any,
|
||||
collections::HashMap,
|
||||
|
@ -186,6 +186,16 @@ pub enum Error {
|
|||
sequencer_id: u32,
|
||||
source: Box<Error>,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Replay plan references unknown sequencer {}, known sequencers are {:?}",
|
||||
sequencer_id,
|
||||
sequencer_ids,
|
||||
))]
|
||||
ReplayUnknownSequencer {
|
||||
sequencer_id: u32,
|
||||
sequencer_ids: Vec<u32>,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
@ -767,12 +777,22 @@ impl Db {
|
|||
.try_lock()
|
||||
.expect("no streams should exist at this point");
|
||||
|
||||
// determine replay ranges based on the plan
|
||||
let sequencer_ids: Vec<_> = write_buffer
|
||||
// check if write buffer and replay plan agree on the set of sequencer ids
|
||||
let sequencer_ids: BTreeSet<_> = write_buffer
|
||||
.streams()
|
||||
.into_iter()
|
||||
.map(|(sequencer_id, _stream)| sequencer_id)
|
||||
.collect();
|
||||
for sequencer_id in replay_plan.sequencer_ids() {
|
||||
if !sequencer_ids.contains(&sequencer_id) {
|
||||
return Err(Error::ReplayUnknownSequencer {
|
||||
sequencer_id,
|
||||
sequencer_ids: sequencer_ids.iter().copied().collect(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// determine replay ranges based on the plan
|
||||
let replay_ranges: BTreeMap<_, _> = sequencer_ids
|
||||
.into_iter()
|
||||
.filter_map(|sequencer_id| {
|
||||
|
@ -1430,7 +1450,10 @@ mod tests {
|
|||
metadata::IoxParquetMetaData,
|
||||
test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
|
||||
};
|
||||
use persistence_windows::min_max_sequence::MinMaxSequence;
|
||||
use persistence_windows::{
|
||||
checkpoint::{PartitionCheckpoint, PersistCheckpointBuilder, ReplayPlanner},
|
||||
min_max_sequence::{MinMaxSequence, OptionalMinMaxSequence},
|
||||
};
|
||||
use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
|
||||
use std::{
|
||||
collections::{BTreeMap, HashSet},
|
||||
|
@ -4378,6 +4401,45 @@ mod tests {
|
|||
.await;
|
||||
}
|
||||
|
||||
// Checks that `Db::perform_replay` returns an error when the replay plan
// names a sequencer that the write buffer does not provide.
//
// Setup: a mock write buffer created with two sequencers, and a replay plan
// built from a partition checkpoint whose sequencer map has keys 0 and 2.
// The test passes when the error text contains
// "Replay plan references unknown sequencer".
#[tokio::test]
|
||||
async fn replay_fail_sequencers_change() {
|
||||
// create write buffer w/ sequencer 0 and 1
// NOTE(review): ids 0 and 1 are presumably what `empty_with_n_sequencers(2)`
// assigns; confirm against the mock implementation.
|
||||
let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(2);
|
||||
let write_buffer = MockBufferForReading::new(write_buffer_state);
|
||||
|
||||
// create DB
// The mock is handed to the DB as a read-side write buffer
// (`WriteBufferConfig::Reading`), wrapped in `Arc<tokio::sync::Mutex<Box<_>>>`.
|
||||
let db = TestDb::builder()
|
||||
.write_buffer(WriteBufferConfig::Reading(Arc::new(
|
||||
tokio::sync::Mutex::new(Box::new(write_buffer) as _),
|
||||
)))
|
||||
.build()
|
||||
.await
|
||||
.db;
|
||||
|
||||
// construct replay plan for sequencers 0 and 2
// Key 2 is the mismatch: per the setup comment above the write buffer only
// has sequencers 0 and 1.
|
||||
let mut sequencer_numbers = BTreeMap::new();
|
||||
sequencer_numbers.insert(0, OptionalMinMaxSequence::new(Some(0), 1));
|
||||
sequencer_numbers.insert(2, OptionalMinMaxSequence::new(Some(0), 1));
|
||||
let partition_checkpoint = PartitionCheckpoint::new(
|
||||
Arc::from("table"),
|
||||
Arc::from("partition"),
|
||||
sequencer_numbers,
|
||||
Utc::now(),
|
||||
);
|
||||
// Build the partition and database checkpoints and register both with the
// planner; `build()` on the planner is expected to succeed here (`unwrap`).
let builder = PersistCheckpointBuilder::new(partition_checkpoint);
|
||||
let (partition_checkpoint, database_checkpoint) = builder.build();
|
||||
let mut replay_planner = ReplayPlanner::new();
|
||||
replay_planner.register_checkpoints(&partition_checkpoint, &database_checkpoint);
|
||||
let replay_plan = replay_planner.build().unwrap();
|
||||
|
||||
// replay fails
|
||||
let res = db.perform_replay(&replay_plan).await;
|
||||
// Only the fixed message prefix is compared, not the sequencer ids that
// follow it (`assert_contains!` is presumably a substring check; confirm).
assert_contains!(
|
||||
res.unwrap_err().to_string(),
|
||||
"Replay plan references unknown sequencer"
|
||||
);
|
||||
}
|
||||
|
||||
async fn create_parquet_chunk(db: &Arc<Db>) -> (String, String, u32) {
|
||||
write_lp(db, "cpu bar=1 10").await;
|
||||
let partition_key = "1970-01-01T00";
|
||||
|
|
Loading…
Reference in New Issue