refactor: make `MinMaxSequence` safer to use

pull/24376/head
Marco Neumann 2021-06-30 16:37:48 +02:00
parent 403f7297e1
commit 043890369f
3 changed files with 67 additions and 37 deletions

View File

@ -357,7 +357,7 @@ impl PersistCheckpointBuilder {
partition_checkpoint
.sequencer_numbers
.iter()
.map(|(sequencer_id, min_max)| (*sequencer_id, min_max.min))
.map(|(sequencer_id, min_max)| (*sequencer_id, min_max.min()))
.collect()
}
}
@ -391,12 +391,16 @@ impl ReplayPlanner {
match self.replay_ranges.entry(*sequencer_id) {
Vacant(v) => {
// unknown sequencer => just store max value for now
v.insert((None, Some(min_max.max)));
v.insert((None, Some(min_max.max())));
}
Occupied(mut o) => {
// known sequencer => fold in max value (we take the max of all maximums!)
let min_max2 = o.get_mut();
min_max2.1 = Some(min_max2.1.map_or(min_max.max, |max| max.max(min_max.max)));
min_max2.1 = Some(
min_max2
.1
.map_or(min_max.max(), |max| max.max(min_max.max())),
);
}
}
}
@ -443,7 +447,7 @@ impl ReplayPlanner {
for (sequencer_id, min_max) in self.replay_ranges {
match min_max {
(Some(min), Some(max)) => {
replay_ranges.insert(sequencer_id, MinMaxSequence{min, max});
replay_ranges.insert(sequencer_id, MinMaxSequence::new(min, max));
}
(None, Some(_max)) => {
// We've got data for this sequencer via a PartitionCheckpoint but did not see corresponding data in
@ -468,10 +472,10 @@ impl ReplayPlanner {
for (sequencer_id, min_max) in &partition_checkpoint.sequencer_numbers {
let database_wide_min_max = replay_ranges.get(sequencer_id).expect("every partition checkpoint should have resulted in a replay range or an error by now");
if min_max.min < database_wide_min_max.min {
if min_max.min() < database_wide_min_max.min() {
return Err(Error::PartitionCheckpointMinimumBeforeDatabase {
partition_checkpoint_sequence_number: min_max.min,
database_checkpoint_sequence_number: database_wide_min_max.min,
partition_checkpoint_sequence_number: min_max.min(),
database_checkpoint_sequence_number: database_wide_min_max.min(),
table_name: table_name.clone(),
partition_key: partition_key.clone(),
});
@ -546,7 +550,7 @@ mod tests {
{
let mut sequencer_numbers = BTreeMap::new();
$(
sequencer_numbers.insert($sequencer_number, MinMaxSequence{min: $min, max: $max});
sequencer_numbers.insert($sequencer_number, MinMaxSequence::new($min, $max));
)*
let min_unpersisted_timestamp = DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc);
@ -577,11 +581,11 @@ mod tests {
assert_eq!(pckpt.partition_key(), "partition_1");
assert_eq!(
pckpt.sequencer_numbers(1).unwrap(),
MinMaxSequence { min: 10, max: 20 }
MinMaxSequence::new(10, 20)
);
assert_eq!(
pckpt.sequencer_numbers(2).unwrap(),
MinMaxSequence { min: 5, max: 15 }
MinMaxSequence::new(5, 15)
);
assert!(pckpt.sequencer_numbers(3).is_none());
assert_eq!(pckpt.sequencer_ids(), vec![1, 2]);
@ -662,18 +666,9 @@ mod tests {
let plan = planner.build().unwrap();
assert_eq!(plan.sequencer_ids(), vec![1, 2, 3]);
assert_eq!(
plan.replay_range(1).unwrap(),
MinMaxSequence { min: 11, max: 19 }
);
assert_eq!(
plan.replay_range(2).unwrap(),
MinMaxSequence { min: 20, max: 28 }
);
assert_eq!(
plan.replay_range(3).unwrap(),
MinMaxSequence { min: 30, max: 39 }
);
assert_eq!(plan.replay_range(1).unwrap(), MinMaxSequence::new(11, 19));
assert_eq!(plan.replay_range(2).unwrap(), MinMaxSequence::new(20, 28));
assert_eq!(plan.replay_range(3).unwrap(), MinMaxSequence::new(30, 39));
assert!(plan.replay_range(4).is_none());
assert_eq!(

View File

@ -193,8 +193,31 @@ struct Window {
/// The minimum and maximum sequence numbers seen for a given sequencer
#[derive(Debug, Copy, Clone, PartialEq)]
pub struct MinMaxSequence {
pub min: u64,
pub max: u64,
min: u64,
max: u64,
}
impl MinMaxSequence {
/// Create new min-max sequence range.
///
/// This panics if `min > max`.
pub fn new(min: u64, max: u64) -> Self {
assert!(
min <= max,
"min ({}) is greater than max ({}) sequence",
min,
max
);
Self { min, max }
}
pub fn min(&self) -> u64 {
self.min
}
pub fn max(&self) -> u64 {
self.max
}
}
impl Window {
@ -209,10 +232,7 @@ impl Window {
if let Some(sequence) = sequence {
sequencer_numbers.insert(
sequence.id,
MinMaxSequence {
min: sequence.number,
max: sequence.number,
},
MinMaxSequence::new(sequence.number, sequence.number),
);
}
@ -250,10 +270,7 @@ impl Window {
None => {
self.sequencer_numbers.insert(
sequence.id,
MinMaxSequence {
min: sequence.number,
max: sequence.number,
},
MinMaxSequence::new(sequence.number, sequence.number),
);
}
}
@ -287,6 +304,24 @@ impl Window {
mod tests {
use super::*;
#[test]
fn test_min_max_getters() {
let min_max = MinMaxSequence::new(10, 20);
assert_eq!(min_max.min(), 10);
assert_eq!(min_max.max(), 20);
}
#[test]
fn test_min_max_accepts_equal_values() {
MinMaxSequence::new(10, 10);
}
#[test]
#[should_panic(expected = "min (11) is greater than max (10) sequence")]
fn test_min_max_checks_values() {
MinMaxSequence::new(11, 10);
}
#[test]
fn starts_open_window() {
let mut w = PersistenceWindows::new(Duration::from_secs(60));
@ -334,11 +369,11 @@ mod tests {
assert_eq!(open.row_count, 14);
assert_eq!(
open.sequencer_numbers.get(&1).unwrap(),
&MinMaxSequence { min: 2, max: 10 }
&MinMaxSequence::new(2, 10)
);
assert_eq!(
open.sequencer_numbers.get(&2).unwrap(),
&MinMaxSequence { min: 23, max: 23 }
&MinMaxSequence::new(23, 23)
);
}
@ -380,7 +415,7 @@ mod tests {
let closed = w.closed.get(0).unwrap();
assert_eq!(
closed.sequencer_numbers.get(&1).unwrap(),
&MinMaxSequence { min: 2, max: 3 }
&MinMaxSequence::new(2, 3)
);
assert_eq!(closed.row_count, 2);
assert_eq!(closed.min_time, start_time);
@ -392,7 +427,7 @@ mod tests {
assert_eq!(open.max_time, open_time);
assert_eq!(
open.sequencer_numbers.get(&1).unwrap(),
&MinMaxSequence { min: 6, max: 6 }
&MinMaxSequence::new(6, 6)
)
}

View File

@ -1975,7 +1975,7 @@ mod tests {
let seq = windows.minimum_unpersisted_sequence().unwrap();
let seq = seq.get(&0).unwrap();
assert_eq!(seq, &MinMaxSequence { min: 0, max: 2 });
assert_eq!(seq, &MinMaxSequence::new(0, 2));
}
#[tokio::test]