Merge pull request #2084 from influxdata/crepererum/fix_checkpoints_again

refactor: correctly track "seen" ranges in persistence checkpoints
pull/24376/head
kodiakhq[bot] 2021-07-22 11:45:39 +00:00 committed by GitHub
commit f4b9fe20fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 447 additions and 171 deletions

View File

@ -39,8 +39,8 @@ message IoxMetadata {
// checkpoint is usually serialized as part of `IoxMetadata`, the partition information is NOT repeated as part of this
// message.
message PartitionCheckpoint {
// Maps sequencer_id to the minimum and maximum sequence numbers seen.
map<uint32, MinMaxSequence> sequencer_numbers = 1;
// Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers.
map<uint32, OptionalMinMaxSequence> sequencer_numbers = 1;
// Minimum unpersisted timestamp.
FixedSizeTimestamp min_unpersisted_timestamp = 2;
@ -54,12 +54,19 @@ message DatabaseCheckpoint {
// was `min_sequence_numbers`
reserved 1;
// Maps sequencer_id to the minimum and maximum sequence numbers seen.
map<uint32, MinMaxSequence> sequencer_numbers = 2;
// Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers.
map<uint32, OptionalMinMaxSequence> sequencer_numbers = 2;
}
// The minimum and maximum sequence numbers seen for a given sequencer.
message MinMaxSequence {
uint64 min = 1;
// An optional uint64.
message OptionalUint64 {
uint64 value = 1;
}
// The optional to-be-replayed minimum and seen maximum sequence numbers for a given sequencer.
//
// If the minimum value is missing, no replay is required for this sequencer.
message OptionalMinMaxSequence {
OptionalUint64 min = 1;
uint64 max = 2;
}

View File

@ -31,7 +31,7 @@ use uuid::Uuid;
/// Current version for serialized transactions.
///
/// For breaking changes, this will change.
pub const TRANSACTION_VERSION: u32 = 9;
pub const TRANSACTION_VERSION: u32 = 10;
/// File suffix for transaction files in object store.
pub const TRANSACTION_FILE_SUFFIX: &str = "txn";

View File

@ -109,7 +109,7 @@ use parquet::{
};
use persistence_windows::{
checkpoint::{DatabaseCheckpoint, PartitionCheckpoint},
min_max_sequence::MinMaxSequence,
min_max_sequence::OptionalMinMaxSequence,
};
use prost::Message;
use snafu::{OptionExt, ResultExt, Snafu};
@ -121,7 +121,7 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputPro
///
/// **Important: When changing this structure, consider bumping the
/// [catalog transaction version](crate::catalog::TRANSACTION_VERSION)!**
pub const METADATA_VERSION: u32 = 3;
pub const METADATA_VERSION: u32 = 4;
/// File-level metadata key to store the IOx-specific data.
///
@ -311,13 +311,15 @@ impl IoxMetadata {
.sequencer_numbers
.into_iter()
.map(|(sequencer_id, min_max)| {
if min_max.min <= min_max.max {
Ok((sequencer_id, MinMaxSequence::new(min_max.min, min_max.max)))
let min = min_max.min.as_ref().map(|min| min.value);
if min.map(|min| min <= min_max.max).unwrap_or(true) {
Ok((sequencer_id, OptionalMinMaxSequence::new(min, min_max.max)))
} else {
Err(Error::IoxMetadataMinMax)
}
})
.collect::<Result<BTreeMap<u32, MinMaxSequence>>>()?;
.collect::<Result<BTreeMap<u32, OptionalMinMaxSequence>>>()?;
let min_unpersisted_timestamp = decode_timestamp(
proto_partition_checkpoint
.min_unpersisted_timestamp
@ -343,13 +345,15 @@ impl IoxMetadata {
.sequencer_numbers
.into_iter()
.map(|(sequencer_id, min_max)| {
if min_max.min <= min_max.max {
Ok((sequencer_id, MinMaxSequence::new(min_max.min, min_max.max)))
let min = min_max.min.as_ref().map(|min| min.value);
if min.map(|min| min <= min_max.max).unwrap_or(true) {
Ok((sequencer_id, OptionalMinMaxSequence::new(min, min_max.max)))
} else {
Err(Error::IoxMetadataMinMax)
}
})
.collect::<Result<BTreeMap<u32, MinMaxSequence>>>()?;
.collect::<Result<BTreeMap<u32, OptionalMinMaxSequence>>>()?;
let database_checkpoint = DatabaseCheckpoint::new(sequencer_numbers);
Ok(Self {
@ -371,8 +375,10 @@ impl IoxMetadata {
.map(|(sequencer_id, min_max)| {
(
sequencer_id,
proto::MinMaxSequence {
min: min_max.min(),
proto::OptionalMinMaxSequence {
min: min_max
.min()
.map(|min| proto::OptionalUint64 { value: min }),
max: min_max.max(),
},
)
@ -390,8 +396,10 @@ impl IoxMetadata {
.map(|(sequencer_id, min_max)| {
(
sequencer_id,
proto::MinMaxSequence {
min: min_max.min(),
proto::OptionalMinMaxSequence {
min: min_max
.min()
.map(|min| proto::OptionalUint64 { value: min }),
max: min_max.max(),
},
)

View File

@ -28,7 +28,7 @@ use parquet::{
};
use persistence_windows::{
checkpoint::{DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder},
min_max_sequence::MinMaxSequence,
min_max_sequence::OptionalMinMaxSequence,
};
use crate::{
@ -758,8 +758,8 @@ pub fn create_partition_and_database_checkpoint(
) -> (PartitionCheckpoint, DatabaseCheckpoint) {
// create first partition checkpoint
let mut sequencer_numbers = BTreeMap::new();
sequencer_numbers.insert(1, MinMaxSequence::new(15, 18));
sequencer_numbers.insert(2, MinMaxSequence::new(25, 28));
sequencer_numbers.insert(1, OptionalMinMaxSequence::new(None, 18));
sequencer_numbers.insert(2, OptionalMinMaxSequence::new(Some(25), 28));
let min_unpersisted_timestamp = Utc.timestamp(10, 20);
let partition_checkpoint_1 = PartitionCheckpoint::new(
Arc::clone(&table_name),
@ -770,8 +770,8 @@ pub fn create_partition_and_database_checkpoint(
// create second partition checkpoint
let mut sequencer_numbers = BTreeMap::new();
sequencer_numbers.insert(2, MinMaxSequence::new(24, 29));
sequencer_numbers.insert(3, MinMaxSequence::new(35, 38));
sequencer_numbers.insert(2, OptionalMinMaxSequence::new(Some(24), 29));
sequencer_numbers.insert(3, OptionalMinMaxSequence::new(Some(35), 38));
let min_unpersisted_timestamp = Utc.timestamp(20, 30);
let partition_checkpoint_2 = PartitionCheckpoint::new(
table_name,

View File

@ -177,11 +177,14 @@
//! let mut planner = ReplayPlanner::new();
//!
//! // scan preserved catalog
//! // Important: Files MUST be scanned in order in which they were added to the catalog!
//! // Note: While technically we only need to scan the last parquet file per partition,
//! // it is totally valid to scan the whole catalog.
//! for file in catalog.files() {
//! planner.register_partition_checkpoint(&file.extract_partition_checkpoint());
//! planner.register_database_checkpoint(&file.extract_database_checkpoint());
//! planner.register_checkpoints(
//! &file.extract_partition_checkpoint(),
//! &file.extract_database_checkpoint(),
//! );
//! }
//!
//! // create replay plan
@ -206,21 +209,60 @@ use std::{
};
use chrono::{DateTime, Utc};
use snafu::Snafu;
use snafu::{OptionExt, Snafu};
use crate::min_max_sequence::MinMaxSequence;
use crate::min_max_sequence::OptionalMinMaxSequence;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Did not find a DatabaseCheckpoint for sequencer {}", sequencer_id))]
NoDatabaseCheckpointFound { sequencer_id: u32 },
#[snafu(
display(
"Got sequence range in partition checkpoint but no database-wide number for partition {}:{} and sequencer {}",
table_name,
partition_key,
sequencer_id,
)
)]
PartitionCheckpointWithoutDatabase {
table_name: Arc<str>,
partition_key: Arc<str>,
sequencer_id: u32,
},
#[snafu(display("Minimum sequence number in partition checkpoint ({}) is lower than database-wide number ({}) for partition {}:{}", partition_checkpoint_sequence_number, database_checkpoint_sequence_number, table_name, partition_key))]
#[snafu(
display(
"Minimum sequence number in partition checkpoint ({}) is larger than database-wide number ({}) for partition {}:{} and sequencer {}",
partition_checkpoint_sequence_number,
database_checkpoint_sequence_number,
table_name,
partition_key,
sequencer_id,
)
)]
PartitionCheckpointMaximumAfterDatabase {
partition_checkpoint_sequence_number: u64,
database_checkpoint_sequence_number: u64,
table_name: Arc<str>,
partition_key: Arc<str>,
sequencer_id: u32,
},
#[snafu(
display(
"Minimum sequence number in partition checkpoint ({}) is lower than database-wide number ({}) for partition {}:{} and sequencer {}",
partition_checkpoint_sequence_number,
database_checkpoint_sequence_number,
table_name,
partition_key,
sequencer_id,
)
)]
PartitionCheckpointMinimumBeforeDatabase {
partition_checkpoint_sequence_number: u64,
database_checkpoint_sequence_number: u64,
table_name: Arc<str>,
partition_key: Arc<str>,
sequencer_id: u32,
},
}
@ -235,8 +277,8 @@ pub struct PartitionCheckpoint {
/// Partition key.
partition_key: Arc<str>,
/// Maps `sequencer_id` to the minimum and maximum sequence numbers seen.
sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
/// Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers.
sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>,
/// Minimum unpersisted timestamp.
min_unpersisted_timestamp: DateTime<Utc>,
@ -247,7 +289,7 @@ impl PartitionCheckpoint {
pub fn new(
table_name: Arc<str>,
partition_key: Arc<str>,
sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>,
min_unpersisted_timestamp: DateTime<Utc>,
) -> Self {
Self {
@ -268,11 +310,11 @@ impl PartitionCheckpoint {
&self.partition_key
}
/// Maps `sequencer_id` to the minimum and maximum sequence numbers seen.
/// Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers.
///
/// Will return `None` if the sequencer was not yet seen in which case there is no need to replay data from this
/// sequencer for this partition.
pub fn sequencer_numbers(&self, sequencer_id: u32) -> Option<MinMaxSequence> {
pub fn sequencer_numbers(&self, sequencer_id: u32) -> Option<OptionalMinMaxSequence> {
self.sequencer_numbers.get(&sequencer_id).copied()
}
@ -282,7 +324,9 @@ impl PartitionCheckpoint {
}
/// Iterate over sequencer numbers.
pub fn sequencer_numbers_iter(&self) -> impl Iterator<Item = (u32, MinMaxSequence)> + '_ {
pub fn sequencer_numbers_iter(
&self,
) -> impl Iterator<Item = (u32, OptionalMinMaxSequence)> + '_ {
self.sequencer_numbers
.iter()
.map(|(sequencer_id, min_max)| (*sequencer_id, *min_max))
@ -300,7 +344,7 @@ impl PartitionCheckpoint {
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseCheckpoint {
/// Maps `sequencer_id` to the minimum and maximum sequence numbers seen.
sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>,
}
impl DatabaseCheckpoint {
@ -308,18 +352,21 @@ impl DatabaseCheckpoint {
///
/// **This should only rarely be be used directly. Consider using [`PersistCheckpointBuilder`] to collect
/// database-wide checkpoints!**
pub fn new(sequencer_numbers: BTreeMap<u32, MinMaxSequence>) -> Self {
pub fn new(sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>) -> Self {
Self { sequencer_numbers }
}
/// Get sequence number range that should be used during replay of the given sequencer.
///
/// If the range only has the maximum but not the minimum set, then this partition is fully persisted up to and
/// including that maximum. The caller must continue normal playback AFTER the maximum.
///
/// This will return `None` for unknown sequencer. This might have multiple reasons, e.g. in case of Apache Kafka it
/// might be that a partition has not delivered any data yet (for a quite young database) or that the partitioning
/// was wrongly reconfigured (to more partitions in which case the ordering would be wrong). The latter case MUST be
/// detected by another layer, e.g. using persisted database rules. The reaction to `None` in this layer should be
/// "no replay required for this sequencer, just continue with normal playback".
pub fn sequencer_number(&self, sequencer_id: u32) -> Option<MinMaxSequence> {
pub fn sequencer_number(&self, sequencer_id: u32) -> Option<OptionalMinMaxSequence> {
self.sequencer_numbers.get(&sequencer_id).copied()
}
@ -329,7 +376,9 @@ impl DatabaseCheckpoint {
}
/// Iterate over sequencer numbers
pub fn sequencer_numbers_iter(&self) -> impl Iterator<Item = (u32, MinMaxSequence)> + '_ {
pub fn sequencer_numbers_iter(
&self,
) -> impl Iterator<Item = (u32, OptionalMinMaxSequence)> + '_ {
self.sequencer_numbers
.iter()
.map(|(sequencer_id, min_max)| (*sequencer_id, *min_max))
@ -376,10 +425,14 @@ impl PersistCheckpointBuilder {
}
Occupied(mut o) => {
let existing_min_max = o.get_mut();
*existing_min_max = MinMaxSequence::new(
existing_min_max.min().min(min_max.min()),
existing_min_max.max().max(min_max.max()),
);
let min = match (existing_min_max.min(), min_max.min()) {
(Some(a), Some(b)) => Some(a.min(b)),
(Some(a), None) => Some(a),
(None, Some(b)) => Some(b),
(None, None) => None,
};
let max = existing_min_max.max().max(min_max.max());
*existing_min_max = OptionalMinMaxSequence::new(min, max);
}
}
}
@ -403,7 +456,7 @@ impl PersistCheckpointBuilder {
#[derive(Debug)]
pub struct ReplayPlanner {
/// Range (inclusive minimum, inclusive maximum) of sequence number to be replayed for each sequencer.
replay_ranges: BTreeMap<u32, (Option<u64>, u64)>,
replay_ranges: BTreeMap<u32, OptionalMinMaxSequence>,
/// Last known partition checkpoint, mapped via table name and partition key.
last_partition_checkpoints: BTreeMap<(Arc<str>, Arc<str>), PartitionCheckpoint>,
@ -418,25 +471,14 @@ impl ReplayPlanner {
}
}
/// Register a partition checkpoint that was found in the catalog.
pub fn register_partition_checkpoint(&mut self, partition_checkpoint: &PartitionCheckpoint) {
for (sequencer_id, min_max) in &partition_checkpoint.sequencer_numbers {
match self.replay_ranges.entry(*sequencer_id) {
Vacant(v) => {
// unknown sequencer => just store max value for now
v.insert((None, min_max.max()));
}
Occupied(mut o) => {
// known sequencer => fold in:
// - max value (we take the max of all maximums!)
//
// We are NOT folding in the min value here since other partitions might not be that far yet.
let existing_min_max = o.get_mut();
existing_min_max.1 = existing_min_max.1.max(min_max.max());
}
}
}
/// Register a partition and database checkpoint that was found in the catalog.
///
/// **Note: The checkpoints MUST be added in the same order as they were written to the preserved catalog!**
pub fn register_checkpoints(
&mut self,
partition_checkpoint: &PartitionCheckpoint,
database_checkpoint: &DatabaseCheckpoint,
) {
match self.last_partition_checkpoints.entry((
Arc::clone(partition_checkpoint.table_name()),
Arc::clone(partition_checkpoint.partition_key()),
@ -446,36 +488,21 @@ impl ReplayPlanner {
v.insert(partition_checkpoint.clone());
}
Occupied(mut o) => {
// known partition => check which one is the newer checkpoint
if o.get().min_unpersisted_timestamp
< partition_checkpoint.min_unpersisted_timestamp
{
o.insert(partition_checkpoint.clone());
}
// known partition, but added afterwards => insert
o.insert(partition_checkpoint.clone());
}
}
}
/// Register a database checkpoint that was found in the catalog.
pub fn register_database_checkpoint(&mut self, database_checkpoint: &DatabaseCheckpoint) {
for (sequencer_id, min_max) in &database_checkpoint.sequencer_numbers {
match self.replay_ranges.entry(*sequencer_id) {
Vacant(v) => {
// unknown sequencer => store min value (we keep the latest value for that one) and max value (in
// case when we don't find another partition checkpoint for this sequencer, so we have a fallback)
v.insert((Some(min_max.min()), min_max.max()));
v.insert(*min_max);
}
Occupied(mut o) => {
// known sequencer => fold in:
// - min value (we take the max of all mins!)
// - max value (we take the max of all max!)
let existing_min_max = o.get_mut();
existing_min_max.0 = Some(
existing_min_max
.0
.map_or(min_max.min(), |min2| min2.max(min_max.min())),
);
existing_min_max.1 = existing_min_max.1.max(min_max.max());
// known sequencer => take the later value
o.insert(*min_max);
}
}
}
@ -483,34 +510,41 @@ impl ReplayPlanner {
/// Build plan that is then used for replay.
pub fn build(self) -> Result<ReplayPlan> {
let mut replay_ranges = BTreeMap::new();
for (sequencer_id, min_max) in self.replay_ranges {
match min_max {
(Some(min), max) => {
replay_ranges.insert(sequencer_id, MinMaxSequence::new(min, max));
}
(None, _max) => {
// We've got data for this sequencer via a PartitionCheckpoint but did not see corresponding data in
// any of the DatabaseCheckpoints. This is clearly a data error, because for the PartitionCheckpoint
// in question we should have persisted (and read back) a DatabaseCheckpoint in the very same
// Parquet file that contains a value for that sequencer.
return Err(Error::NoDatabaseCheckpointFound { sequencer_id });
}
}
}
let Self {
replay_ranges,
last_partition_checkpoints,
} = self;
// sanity-check partition checkpoints
for ((table_name, partition_key), partition_checkpoint) in &self.last_partition_checkpoints
{
for ((table_name, partition_key), partition_checkpoint) in &last_partition_checkpoints {
for (sequencer_id, min_max) in &partition_checkpoint.sequencer_numbers {
let database_wide_min_max = replay_ranges.get(sequencer_id).expect("every partition checkpoint should have resulted in a replay range or an error by now");
if min_max.min() < database_wide_min_max.min() {
return Err(Error::PartitionCheckpointMinimumBeforeDatabase {
partition_checkpoint_sequence_number: min_max.min(),
database_checkpoint_sequence_number: database_wide_min_max.min(),
let database_wide_min_max = replay_ranges.get(sequencer_id).context(
PartitionCheckpointWithoutDatabase {
table_name: Arc::clone(table_name),
partition_key: Arc::clone(partition_key),
sequencer_id: *sequencer_id,
},
)?;
if let (Some(min), Some(db_min)) = (min_max.min(), database_wide_min_max.min()) {
if min < db_min {
return Err(Error::PartitionCheckpointMinimumBeforeDatabase {
partition_checkpoint_sequence_number: min,
database_checkpoint_sequence_number: db_min,
table_name: Arc::clone(table_name),
partition_key: Arc::clone(partition_key),
sequencer_id: *sequencer_id,
});
}
}
if min_max.max() > database_wide_min_max.max() {
return Err(Error::PartitionCheckpointMaximumAfterDatabase {
partition_checkpoint_sequence_number: min_max.max(),
database_checkpoint_sequence_number: database_wide_min_max.max(),
table_name: Arc::clone(table_name),
partition_key: Arc::clone(partition_key),
sequencer_id: *sequencer_id,
});
}
}
@ -518,7 +552,7 @@ impl ReplayPlanner {
Ok(ReplayPlan {
replay_ranges,
last_partition_checkpoints: self.last_partition_checkpoints,
last_partition_checkpoints,
})
}
}
@ -534,8 +568,9 @@ impl Default for ReplayPlanner {
pub struct ReplayPlan {
/// Replay range (inclusive minimum sequence number, inclusive maximum sequence number) for every sequencer.
///
/// For sequencers not included in this map, no replay is required.
replay_ranges: BTreeMap<u32, MinMaxSequence>,
/// For sequencers not included in this map, no replay is required. For sequencer that only have a maximum, only
/// seeking to maximum plus 1 is required (but no replay).
replay_ranges: BTreeMap<u32, OptionalMinMaxSequence>,
/// Last known partition checkpoint, mapped via table name and partition key.
last_partition_checkpoints: BTreeMap<(Arc<str>, Arc<str>), PartitionCheckpoint>,
@ -544,8 +579,11 @@ pub struct ReplayPlan {
impl ReplayPlan {
/// Get replay range for a sequencer.
///
/// When this returns `None`, no replay is required for this sequencer and we can just start to playback normally.
pub fn replay_range(&self, sequencer_id: u32) -> Option<MinMaxSequence> {
/// If only a maximum but no minimum is returned, then no replay is required but the caller must seek the sequencer
/// to maximum plus 1.
///
/// If this returns `None`, no replay is required for this sequencer and we can just start to playback normally.
pub fn replay_range(&self, sequencer_id: u32) -> Option<OptionalMinMaxSequence> {
self.replay_ranges.get(&sequencer_id).copied()
}
@ -583,7 +621,7 @@ mod tests {
{
let mut sequencer_numbers = BTreeMap::new();
$(
sequencer_numbers.insert($sequencer_number, MinMaxSequence::new($min, $max));
sequencer_numbers.insert($sequencer_number, OptionalMinMaxSequence::new($min, $max));
)*
let min_unpersisted_timestamp = DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc);
@ -599,7 +637,7 @@ mod tests {
{
let mut sequencer_numbers = BTreeMap::new();
$(
sequencer_numbers.insert($sequencer_number, MinMaxSequence::new($min, $max));
sequencer_numbers.insert($sequencer_number, OptionalMinMaxSequence::new($min, $max));
)*
DatabaseCheckpoint{sequencer_numbers}
}
@ -608,72 +646,101 @@ mod tests {
#[test]
fn test_partition_checkpoint() {
let pckpt = part_ckpt!("table_1", "partition_1", {1 => (10, 20), 2 => (5, 15)});
let pckpt = part_ckpt!("table_1", "partition_1", {1 => (Some(10), 20), 2 => (None, 15)});
assert_eq!(pckpt.table_name().as_ref(), "table_1");
assert_eq!(pckpt.partition_key().as_ref(), "partition_1");
assert_eq!(
pckpt.sequencer_numbers(1).unwrap(),
MinMaxSequence::new(10, 20)
OptionalMinMaxSequence::new(Some(10), 20)
);
assert_eq!(
pckpt.sequencer_numbers(2).unwrap(),
MinMaxSequence::new(5, 15)
OptionalMinMaxSequence::new(None, 15)
);
assert!(pckpt.sequencer_numbers(3).is_none());
assert_eq!(pckpt.sequencer_ids(), vec![1, 2]);
assert_eq!(
pckpt,
part_ckpt!("table_1", "partition_1", {1 => (10, 20), 2 => (5, 15)})
part_ckpt!("table_1", "partition_1", {1 => (Some(10), 20), 2 => (None, 15)})
);
}
#[test]
fn test_database_checkpoint() {
let dckpt = db_ckpt!({1 => (10, 20), 2 => (5, 15)});
let dckpt = db_ckpt!({1 => (Some(10), 20), 2 => (None, 15)});
assert_eq!(
dckpt.sequencer_number(1).unwrap(),
MinMaxSequence::new(10, 20)
OptionalMinMaxSequence::new(Some(10), 20)
);
assert_eq!(
dckpt.sequencer_number(2).unwrap(),
MinMaxSequence::new(5, 15)
OptionalMinMaxSequence::new(None, 15)
);
assert!(dckpt.sequencer_number(3).is_none());
assert_eq!(dckpt.sequencer_ids(), vec![1, 2]);
assert_eq!(dckpt, db_ckpt!({1 => (10, 20), 2 => (5, 15)}));
assert_eq!(dckpt, db_ckpt!({1 => (Some(10), 20), 2 => (None, 15)}));
}
#[test]
fn test_persist_checkpoint_builder_no_other() {
let pckpt_orig = part_ckpt!("table_1", "partition_1", {1 => (10, 20), 2 => (5, 15)});
let pckpt_orig =
part_ckpt!("table_1", "partition_1", {1 => (Some(10), 20), 2 => (None, 15)});
let builder = PersistCheckpointBuilder::new(pckpt_orig.clone());
let (pckpt, dckpt) = builder.build();
assert_eq!(pckpt, pckpt_orig);
assert_eq!(dckpt, db_ckpt!({1 => (10, 20), 2 => (5, 15)}));
assert_eq!(dckpt, db_ckpt!({1 => (Some(10), 20), 2 => (None, 15)}));
}
#[test]
fn test_persist_checkpoint_builder_others() {
let pckpt_orig =
part_ckpt!("table_1", "partition_1", {1 => (10, 20), 2 => (5, 15), 3 => (15, 26)});
let pckpt_orig = part_ckpt!(
"table_1",
"partition_1",
{
1 => (Some(10), 20),
2 => (Some(5), 15),
3 => (Some(15), 26),
5 => (None, 10),
7 => (None, 11),
8 => (Some(5), 10)
}
);
let mut builder = PersistCheckpointBuilder::new(pckpt_orig.clone());
builder.register_other_partition(
&part_ckpt!("table_1", "partition_2", {2 => (2, 16), 3 => (20, 25), 4 => (13, 14)}),
);
builder.register_other_partition(&part_ckpt!(
"table_1",
"partition_2",
{
2 => (Some(2), 16),
3 => (Some(20), 25),
4 => (Some(13), 14),
6 => (None, 10),
7 => (Some(5), 10),
8 => (None, 11)
}
));
let (pckpt, dckpt) = builder.build();
assert_eq!(pckpt, pckpt_orig);
assert_eq!(
dckpt,
db_ckpt!({1 => (10, 20), 2 => (2, 16), 3 => (15, 26), 4 => (13, 14)})
db_ckpt!({
1 => (Some(10), 20),
2 => (Some(2), 16),
3 => (Some(15), 26),
4 => (Some(13), 14),
5 => (None, 10),
6 => (None, 10),
7 => (Some(5), 11),
8 => (Some(5), 11)
})
);
}
@ -695,25 +762,109 @@ mod tests {
fn test_replay_planner_normal() {
let mut planner = ReplayPlanner::new();
planner.register_partition_checkpoint(
&part_ckpt!("table_1", "partition_1", {1 => (15, 19), 2 => (21, 28)}),
planner.register_checkpoints(
&part_ckpt!(
"table_1",
"partition_1",
{
1 => (Some(15), 19),
2 => (Some(21), 27),
5 => (None, 50),
7 => (None, 70),
8 => (None, 80),
9 => (None, 90),
10 => (None, 100),
11 => (None, 110)
}
),
&db_ckpt!({
1 => (Some(10), 19),
2 => (Some(20), 28),
5 => (None, 51),
6 => (None, 60),
7 => (Some(69), 70),
8 => (Some(79), 80),
9 => (Some(88), 90),
10 => (None, 100),
11 => (None, 110)
}),
);
planner.register_database_checkpoint(&db_ckpt!({1 => (10, 19), 2 => (20, 27)}));
planner.register_partition_checkpoint(
&part_ckpt!("table_1", "partition_2", {2 => (22, 27), 3 => (35, 39)}),
planner.register_checkpoints(
&part_ckpt!(
"table_1",
"partition_2",
{
2 => (Some(22), 26),
3 => (Some(35), 39),
8 => (None, 80),
9 => (Some(89), 90),
10 => (None, 101),
11 => (Some(109), 111)
}
),
&db_ckpt!({
1 => (Some(11), 20),
3 => (Some(30), 40),
4 => (Some(40), 50),
8 => (None, 80),
9 => (Some(89), 90),
10 => (None, 101),
11 => (Some(109), 111)
}),
);
planner
.register_database_checkpoint(&db_ckpt!({1 => (11, 20), 3 => (30, 40), 4 => (40, 50)}));
let plan = planner.build().unwrap();
assert_eq!(plan.sequencer_ids(), vec![1, 2, 3, 4]);
assert_eq!(plan.replay_range(1).unwrap(), MinMaxSequence::new(11, 20));
assert_eq!(plan.replay_range(2).unwrap(), MinMaxSequence::new(20, 28));
assert_eq!(plan.replay_range(3).unwrap(), MinMaxSequence::new(30, 40));
assert_eq!(plan.replay_range(4).unwrap(), MinMaxSequence::new(40, 50));
assert!(plan.replay_range(5).is_none());
assert_eq!(
plan.sequencer_ids(),
vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
);
assert_eq!(
plan.replay_range(1).unwrap(),
OptionalMinMaxSequence::new(Some(11), 20)
);
assert_eq!(
plan.replay_range(2).unwrap(),
OptionalMinMaxSequence::new(Some(20), 28)
);
assert_eq!(
plan.replay_range(3).unwrap(),
OptionalMinMaxSequence::new(Some(30), 40)
);
assert_eq!(
plan.replay_range(4).unwrap(),
OptionalMinMaxSequence::new(Some(40), 50)
);
assert_eq!(
plan.replay_range(5).unwrap(),
OptionalMinMaxSequence::new(None, 51)
);
assert_eq!(
plan.replay_range(6).unwrap(),
OptionalMinMaxSequence::new(None, 60)
);
assert_eq!(
plan.replay_range(7).unwrap(),
OptionalMinMaxSequence::new(Some(69), 70)
);
assert_eq!(
plan.replay_range(8).unwrap(),
OptionalMinMaxSequence::new(None, 80)
);
assert_eq!(
plan.replay_range(9).unwrap(),
OptionalMinMaxSequence::new(Some(89), 90)
);
assert_eq!(
plan.replay_range(10).unwrap(),
OptionalMinMaxSequence::new(None, 101)
);
assert_eq!(
plan.replay_range(11).unwrap(),
OptionalMinMaxSequence::new(Some(109), 111)
);
assert!(plan.replay_range(12).is_none());
assert_eq!(
plan.partitions(),
@ -725,12 +876,36 @@ mod tests {
assert_eq!(
plan.last_partition_checkpoint("table_1", "partition_1")
.unwrap(),
&part_ckpt!("table_1", "partition_1", {1 => (15, 19), 2 => (21, 28)})
&part_ckpt!(
"table_1",
"partition_1",
{
1 => (Some(15), 19),
2 => (Some(21), 27),
5 => (None, 50),
7 => (None, 70),
8 => (None, 80),
9 => (None, 90),
10 => (None, 100),
11 => (None, 110)
}
),
);
assert_eq!(
plan.last_partition_checkpoint("table_1", "partition_2")
.unwrap(),
&part_ckpt!("table_1", "partition_2", {2 => (22, 27), 3 => (35, 39)})
&part_ckpt!(
"table_1",
"partition_2",
{
2 => (Some(22), 26),
3 => (Some(35), 39),
8 => (None, 80),
9 => (Some(89), 90),
10 => (None, 101),
11 => (Some(109), 111)
}
),
);
assert!(plan
.last_partition_checkpoint("table_1", "partition_3")
@ -741,22 +916,26 @@ mod tests {
fn test_replay_planner_fail_missing_database_checkpoint() {
let mut planner = ReplayPlanner::new();
planner.register_partition_checkpoint(
&part_ckpt!("table_1", "partition_1", {1 => (11, 12), 2 => (21, 22)}),
planner.register_checkpoints(
&part_ckpt!("table_1", "partition_1", {1 => (Some(11), 12), 2 => (Some(21), 22)}),
&db_ckpt!({1 => (Some(10), 20), 3 => (Some(30), 40)}),
);
planner.register_database_checkpoint(&db_ckpt!({1 => (10, 20), 3 => (30, 40)}));
let err = planner.build().unwrap_err();
assert!(matches!(err, Error::NoDatabaseCheckpointFound { .. }));
assert!(matches!(
err,
Error::PartitionCheckpointWithoutDatabase { .. }
));
}
#[test]
fn test_replay_planner_fail_minima_out_of_sync() {
let mut planner = ReplayPlanner::new();
planner
.register_partition_checkpoint(&part_ckpt!("table_1", "partition_1", {1 => (10, 12)}));
planner.register_database_checkpoint(&db_ckpt!({1 => (11, 20)}));
planner.register_checkpoints(
&part_ckpt!("table_1", "partition_1", {1 => (Some(10), 12)}),
&db_ckpt!({1 => (Some(11), 20)}),
);
let err = planner.build().unwrap_err();
assert!(matches!(
@ -764,4 +943,20 @@ mod tests {
Error::PartitionCheckpointMinimumBeforeDatabase { .. }
));
}
#[test]
fn test_replay_planner_fail_maximum_out_of_sync() {
let mut planner = ReplayPlanner::new();
planner.register_checkpoints(
&part_ckpt!("table_1", "partition_1", {1 => (Some(11), 20)}),
&db_ckpt!({1 => (Some(10), 12)}),
);
let err = planner.build().unwrap_err();
assert!(matches!(
err,
Error::PartitionCheckpointMaximumAfterDatabase { .. }
));
}
}

View File

@ -30,25 +30,81 @@ impl MinMaxSequence {
}
}
/// The optional minimum and maximum sequence numbers seen for a given sequencer.
///
/// **IMPORTANT: These ranges include their start and their end (aka `[start, end]`)!**
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct OptionalMinMaxSequence {
min: Option<u64>,
max: u64,
}
impl OptionalMinMaxSequence {
/// Create new min-max sequence range.
///
/// This panics if `min > max`.
pub fn new(min: Option<u64>, max: u64) -> Self {
if let Some(min) = min {
assert!(
min <= max,
"min ({}) is greater than max ({}) sequence",
min,
max
);
}
Self { min, max }
}
pub fn min(&self) -> Option<u64> {
self.min
}
pub fn max(&self) -> u64 {
self.max
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_getters() {
fn test_min_max_getters() {
let min_max = MinMaxSequence::new(10, 20);
assert_eq!(min_max.min(), 10);
assert_eq!(min_max.max(), 20);
}
#[test]
fn test_accepts_equal_values() {
fn test_opt_min_max_getters() {
let min_max = OptionalMinMaxSequence::new(Some(10), 20);
assert_eq!(min_max.min(), Some(10));
assert_eq!(min_max.max(), 20);
let min_max = OptionalMinMaxSequence::new(None, 20);
assert_eq!(min_max.min(), None);
assert_eq!(min_max.max(), 20);
}
#[test]
fn test_min_max_accepts_equal_values() {
MinMaxSequence::new(10, 10);
}
#[test]
fn test_opt_min_max_accepts_equal_values() {
OptionalMinMaxSequence::new(Some(10), 10);
}
#[test]
#[should_panic(expected = "min (11) is greater than max (10) sequence")]
fn test_checks_values() {
fn test_min_max_checks_values() {
MinMaxSequence::new(11, 10);
}
#[test]
#[should_panic(expected = "min (11) is greater than max (10) sequence")]
fn test_opt_min_max_checks_values() {
OptionalMinMaxSequence::new(Some(11), 10);
}
}

View File

@ -11,8 +11,8 @@ use data_types::{partition_metadata::PartitionAddr, write_summary::WriteSummary}
use entry::Sequence;
use internal_types::guard::{ReadGuard, ReadLock};
use crate::checkpoint::PartitionCheckpoint;
use crate::min_max_sequence::MinMaxSequence;
use crate::{checkpoint::PartitionCheckpoint, min_max_sequence::OptionalMinMaxSequence};
use data_types::instant::to_approximate_datetime;
const DEFAULT_CLOSED_WINDOW_PERIOD: Duration = Duration::from_secs(30);
@ -80,7 +80,7 @@ pub struct FlushHandle {
timestamp: DateTime<Utc>,
/// The sequence number ranges not including those persisted by this flush
sequencer_numbers: BTreeMap<u32, MinMaxSequence>,
sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>,
}
impl FlushHandle {
@ -230,7 +230,7 @@ impl PersistenceWindows {
/// Returns the sequence number range of unpersisted writes described by this instance
///
/// Can optionally skip the persistable window if any
fn sequence_numbers(&self, skip_persistable: bool) -> BTreeMap<u32, MinMaxSequence> {
fn sequence_numbers(&self, skip_persistable: bool) -> BTreeMap<u32, OptionalMinMaxSequence> {
if self.is_empty() {
Default::default()
}
@ -242,20 +242,23 @@ impl PersistenceWindows {
self.max_sequence_numbers
.iter()
.filter_map(|(sequencer_id, max_sequence_number)| {
.map(|(sequencer_id, max_sequence_number)| {
// Find first window containing writes from sequencer_id
let window = self
.windows()
.skip(skip)
.filter_map(|window| window.sequencer_numbers.get(sequencer_id))
.next()?;
.next();
assert!(window.max() <= *max_sequence_number);
let min = window.map(|window| {
assert!(window.max() <= *max_sequence_number);
window.min()
});
Some((
(
*sequencer_id,
MinMaxSequence::new(window.min(), *max_sequence_number),
))
OptionalMinMaxSequence::new(min, *max_sequence_number),
)
})
.collect()
}
@ -1131,7 +1134,7 @@ mod tests {
let flush_checkpoint = guard.checkpoint();
assert_eq!(
flush_checkpoint.sequencer_numbers(1).unwrap(),
MinMaxSequence::new(4, 4)
OptionalMinMaxSequence::new(Some(4), 4)
);
assert_eq!(flush_checkpoint.min_unpersisted_timestamp(), truncated_time);
@ -1139,7 +1142,7 @@ mod tests {
let checkpoint = w.checkpoint().unwrap();
assert_eq!(
checkpoint.sequencer_numbers(1).unwrap(),
MinMaxSequence::new(2, 4)
OptionalMinMaxSequence::new(Some(2), 4)
);
assert_eq!(checkpoint.min_unpersisted_timestamp(), start);
@ -1160,6 +1163,13 @@ mod tests {
let guard = w.flush_handle(instant + Duration::from_secs(240)).unwrap();
// that checkpoint has an optional minimum
let flush_checkpoint = guard.checkpoint();
assert_eq!(
flush_checkpoint.sequencer_numbers(1).unwrap(),
OptionalMinMaxSequence::new(None, 4)
);
w.add_range(
Some(&Sequence { id: 1, number: 9 }),
9,
@ -1258,7 +1268,7 @@ mod tests {
let checkpoint = flush.checkpoint();
assert_eq!(
checkpoint.sequencer_numbers(1).unwrap(),
MinMaxSequence::new(6, 10)
OptionalMinMaxSequence::new(Some(6), 10)
);
assert_eq!(checkpoint.min_unpersisted_timestamp(), truncated_time);
@ -1266,7 +1276,7 @@ mod tests {
let checkpoint = w.checkpoint().unwrap();
assert_eq!(
checkpoint.sequencer_numbers(1).unwrap(),
MinMaxSequence::new(2, 14)
OptionalMinMaxSequence::new(Some(2), 14)
);
assert_eq!(checkpoint.min_unpersisted_timestamp(), start);
@ -1276,7 +1286,7 @@ mod tests {
let checkpoint = w.checkpoint().unwrap();
assert_eq!(
checkpoint.sequencer_numbers(1).unwrap(),
MinMaxSequence::new(6, 14)
OptionalMinMaxSequence::new(Some(6), 14)
);
assert_eq!(checkpoint.min_unpersisted_timestamp(), start);