refactor: "min unpersisted ts" => "max persisted ts"

Store the "maximum persisted timestamp" instead of the "minimum
unpersisted timestamp". This avoids the need to calculate the next
timestamp from the current one (which was done via "max TS + 1ns").

The old calculation was prone to overflow panics. Since the
timestamps in this calculation originate from user-provided data (and
not the wall clock), this was an easy DoS vector that could be triggered
via the following line protocol:

```text
table_1 foo=1 <i64::MAX>
```

which is

```text
table_1 foo=1 9223372036854775807
```

Bonus points: the timestamp persisted in the partition
checkpoints is now the very same that was used by the split query during
persistence. Consistency FTW!

Fixes #2225.
pull/24376/head
Marco Neumann 2021-10-08 11:52:49 +02:00
parent 8817ee8d61
commit 63a932fa37
9 changed files with 101 additions and 57 deletions

View File

@ -50,8 +50,11 @@ message PartitionCheckpoint {
// Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers. // Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers.
map<uint32, OptionalMinMaxSequence> sequencer_numbers = 1; map<uint32, OptionalMinMaxSequence> sequencer_numbers = 1;
// Minimum unpersisted timestamp. // Was Minimum unpersisted timestamp.
google.protobuf.Timestamp min_unpersisted_timestamp = 2; reserved 2;
// Maximum persisted timestamp.
google.protobuf.Timestamp max_persisted_timestamp = 3;
} }
// Record of the playback state for the whole database. // Record of the playback state for the whole database.

View File

@ -39,7 +39,7 @@ pub use crate::catalog::internals::proto_parse::Error as ProtoParseError;
/// Current version for serialized transactions. /// Current version for serialized transactions.
/// ///
/// For breaking changes, this will change. /// For breaking changes, this will change.
pub const TRANSACTION_VERSION: u32 = 18; pub const TRANSACTION_VERSION: u32 = 19;
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {

View File

@ -272,7 +272,7 @@ File {
is_checkpoint: false, is_checkpoint: false,
proto: Ok( proto: Ok(
Transaction { Transaction {
version: 18, version: 19,
actions: [], actions: [],
revision_counter: 0, revision_counter: 0,
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
@ -297,7 +297,7 @@ File {
is_checkpoint: false, is_checkpoint: false,
proto: Ok( proto: Ok(
Transaction { Transaction {
version: 18, version: 19,
actions: [ actions: [
Action { Action {
action: Some( action: Some(
@ -396,7 +396,7 @@ File {
is_checkpoint: false, is_checkpoint: false,
proto: Ok( proto: Ok(
Transaction { Transaction {
version: 18, version: 19,
actions: [], actions: [],
revision_counter: 0, revision_counter: 0,
uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", uuid: b"\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
@ -421,7 +421,7 @@ File {
is_checkpoint: false, is_checkpoint: false,
proto: Ok( proto: Ok(
Transaction { Transaction {
version: 18, version: 19,
actions: [ actions: [
Action { Action {
action: Some( action: Some(
@ -484,7 +484,7 @@ File {
max: 28, max: 28,
}, },
}, },
min_unpersisted_timestamp: 1970-01-01T00:00:10.000000020Z, max_persisted_timestamp: 1970-01-01T00:00:10.000000020Z,
}, },
database_checkpoint: DatabaseCheckpoint { database_checkpoint: DatabaseCheckpoint {
sequencer_numbers: { sequencer_numbers: {

View File

@ -121,7 +121,7 @@ use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputPro
/// ///
/// **Important: When changing this structure, consider bumping the /// **Important: When changing this structure, consider bumping the
/// [catalog transaction version](crate::catalog::core::TRANSACTION_VERSION)!** /// [catalog transaction version](crate::catalog::core::TRANSACTION_VERSION)!**
pub const METADATA_VERSION: u32 = 9; pub const METADATA_VERSION: u32 = 10;
/// File-level metadata key to store the IOx-specific data. /// File-level metadata key to store the IOx-specific data.
/// ///
@ -338,15 +338,15 @@ impl IoxMetadata {
} }
}) })
.collect::<Result<BTreeMap<u32, OptionalMinMaxSequence>>>()?; .collect::<Result<BTreeMap<u32, OptionalMinMaxSequence>>>()?;
let min_unpersisted_timestamp = decode_timestamp_from_field( let max_persisted_timestamp = decode_timestamp_from_field(
proto_partition_checkpoint.min_unpersisted_timestamp, proto_partition_checkpoint.max_persisted_timestamp,
"partition_checkpoint.min_unpersisted_timestamp", "partition_checkpoint.max_persisted_timestamp",
)?; )?;
let partition_checkpoint = PartitionCheckpoint::new( let partition_checkpoint = PartitionCheckpoint::new(
Arc::clone(&table_name), Arc::clone(&table_name),
Arc::clone(&partition_key), Arc::clone(&partition_key),
sequencer_numbers, sequencer_numbers,
min_unpersisted_timestamp, max_persisted_timestamp,
); );
// extract database checkpoint // extract database checkpoint
@ -406,8 +406,8 @@ impl IoxMetadata {
) )
}) })
.collect(), .collect(),
min_unpersisted_timestamp: Some( max_persisted_timestamp: Some(
self.partition_checkpoint.min_unpersisted_timestamp().into(), self.partition_checkpoint.max_persisted_timestamp().into(),
), ),
}; };

View File

@ -929,12 +929,12 @@ pub fn create_partition_and_database_checkpoint(
let mut sequencer_numbers_1 = BTreeMap::new(); let mut sequencer_numbers_1 = BTreeMap::new();
sequencer_numbers_1.insert(1, OptionalMinMaxSequence::new(None, 18)); sequencer_numbers_1.insert(1, OptionalMinMaxSequence::new(None, 18));
sequencer_numbers_1.insert(2, OptionalMinMaxSequence::new(Some(25), 28)); sequencer_numbers_1.insert(2, OptionalMinMaxSequence::new(Some(25), 28));
let min_unpersisted_timestamp = Utc.timestamp(10, 20); let max_persisted_timestamp = Utc.timestamp(10, 20);
let partition_checkpoint_1 = PartitionCheckpoint::new( let partition_checkpoint_1 = PartitionCheckpoint::new(
Arc::clone(&table_name), Arc::clone(&table_name),
Arc::clone(&partition_key), Arc::clone(&partition_key),
sequencer_numbers_1, sequencer_numbers_1,
min_unpersisted_timestamp, max_persisted_timestamp,
); );
// create second partition // create second partition

View File

@ -316,9 +316,9 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// ///
/// This structure contains the minimum and maximum sequence numbers /// This structure contains the minimum and maximum sequence numbers
/// for each sequencer for a specific partition along with the /// for each sequencer for a specific partition along with the
/// min_unpersisted timestamp ("flush timestamp"). /// `max_persisted` timestamp ("flush timestamp").
/// ///
/// The min_unpersisted timestamp is relative to the value in /// The `max_persisted` timestamp is relative to the value in
/// [`TIME_COLUMN_NAME`](internal_types::schema::TIME_COLUMN_NAME). The /// [`TIME_COLUMN_NAME`](internal_types::schema::TIME_COLUMN_NAME). The
/// min/max sequence numbers are relative to their respective /// min/max sequence numbers are relative to their respective
/// sequencers. /// sequencers.
@ -333,10 +333,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// ///
/// ```text /// ```text
/// ┌───────────────────┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┌─────────────────────┐ /// ┌───────────────────┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┌─────────────────────┐
/// │seq < min_sequence │ time < min_unpersisted │ seq > max sequence │ /// │seq < min_sequence │ time <= max_persisted │ seq > max sequence │
/// │ │ PERSISTED │ │ /// │ │ PERSISTED │ │
/// │ ├──────────────────────────┤ │ /// │ ├──────────────────────────┤ │
/// │ PERSISTED │ time >= min_unpersisted │ UNPERSISTED │ /// │ PERSISTED │ time > max_persisted │ UNPERSISTED │
/// │ │ UNPERSISTED │ │ /// │ │ UNPERSISTED │ │
/// └───────────────────┤─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼─────────────────────┘ /// └───────────────────┤─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼─────────────────────┘
/// ///
@ -360,7 +360,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// sequencer 1 yield "less" and the ranges for sequencer 2 yield "greater"; or if the sequence number ranges yield /// sequencer 1 yield "less" and the ranges for sequencer 2 yield "greater"; or if the sequence number ranges yield
/// "less" but the first checkpoint has more sequencers than the second. /// "less" but the first checkpoint has more sequencers than the second.
/// ///
/// Note that they are NOT compared based on the [`min_unpersisted_timestamp`](Self::min_unpersisted_timestamp) since /// Note that they are NOT compared based on the [`max_persisted_timestamp`](Self::max_persisted_timestamp) since
/// that one depends on the data ingested by the user and might go backwards during backfills. /// that one depends on the data ingested by the user and might go backwards during backfills.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct PartitionCheckpoint { pub struct PartitionCheckpoint {
@ -373,10 +373,10 @@ pub struct PartitionCheckpoint {
/// Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers. /// Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers.
sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>, sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>,
/// Minimum unpersisted timestamp value of the /// Maximum persisted timestamp value of the
/// [`TIME_COLUMN_NAME`](internal_types::schema::TIME_COLUMN_NAME) + 1ns /// [`TIME_COLUMN_NAME`](internal_types::schema::TIME_COLUMN_NAME)
/// (aka "flush timestamp" + 1ns) /// (aka "flush timestamp")
min_unpersisted_timestamp: DateTime<Utc>, max_persisted_timestamp: DateTime<Utc>,
} }
impl PartitionCheckpoint { impl PartitionCheckpoint {
@ -385,13 +385,13 @@ impl PartitionCheckpoint {
table_name: Arc<str>, table_name: Arc<str>,
partition_key: Arc<str>, partition_key: Arc<str>,
sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>, sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>,
min_unpersisted_timestamp: DateTime<Utc>, max_persisted_timestamp: DateTime<Utc>,
) -> Self { ) -> Self {
Self { Self {
table_name, table_name,
partition_key, partition_key,
sequencer_numbers, sequencer_numbers,
min_unpersisted_timestamp, max_persisted_timestamp,
} }
} }
@ -427,9 +427,9 @@ impl PartitionCheckpoint {
.map(|(sequencer_id, min_max)| (*sequencer_id, *min_max)) .map(|(sequencer_id, min_max)| (*sequencer_id, *min_max))
} }
/// Minimum unpersisted timestamp. /// Maximum persisted timestamp.
pub fn min_unpersisted_timestamp(&self) -> DateTime<Utc> { pub fn max_persisted_timestamp(&self) -> DateTime<Utc> {
self.min_unpersisted_timestamp self.max_persisted_timestamp
} }
} }
@ -911,8 +911,8 @@ mod tests {
($table_name:expr, $partition_key:expr, {$($sequencer_number:expr => ($min:expr, $max:expr)),*}) => { ($table_name:expr, $partition_key:expr, {$($sequencer_number:expr => ($min:expr, $max:expr)),*}) => {
{ {
let sequencer_numbers = sequencer_numbers!{$($sequencer_number => ($min, $max)),*}; let sequencer_numbers = sequencer_numbers!{$($sequencer_number => ($min, $max)),*};
let min_unpersisted_timestamp = DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc); let max_persisted_timestamp = DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc);
PartitionCheckpoint::new(Arc::from($table_name), Arc::from($partition_key), sequencer_numbers, min_unpersisted_timestamp) PartitionCheckpoint::new(Arc::from($table_name), Arc::from($partition_key), sequencer_numbers, max_persisted_timestamp)
} }
}; };
} }

View File

@ -98,7 +98,7 @@ impl FlushHandle {
Arc::clone(&self.addr.table_name), Arc::clone(&self.addr.table_name),
Arc::clone(&self.addr.partition_key), Arc::clone(&self.addr.partition_key),
self.sequencer_numbers.clone(), self.sequencer_numbers.clone(),
self.timestamp + chrono::Duration::nanoseconds(1), self.timestamp,
) )
} }
} }
@ -350,18 +350,26 @@ impl PersistenceWindows {
persistable.max_time, timestamp, persistable.max_time, timestamp,
"persistable max time doesn't match handle" "persistable max time doesn't match handle"
); );
// Everything up to and including persistable max time will have been persisted
let new_min = persistable.max_time + chrono::Duration::nanoseconds(1);
for w in self.closed.iter_mut().take(closed_count) {
if w.min_time < new_min {
w.min_time = new_min;
}
}
// Drop any now empty windows // Everything up to and including persistable max time will have been persisted
let mut tail = self.closed.split_off(closed_count); if let Some(new_min) = persistable
self.closed.retain(|w| w.max_time >= new_min); .max_time
self.closed.append(&mut tail); .checked_add_signed(chrono::Duration::nanoseconds(1))
{
for w in self.closed.iter_mut().take(closed_count) {
if w.min_time < new_min {
w.min_time = new_min;
}
}
// Drop any now empty windows
let mut tail = self.closed.split_off(closed_count);
self.closed.retain(|w| w.max_time >= new_min);
self.closed.append(&mut tail);
} else {
// drop all windows (persisted everything)
self.closed.clear();
}
} }
/// Returns an iterator over the windows starting with the oldest /// Returns an iterator over the windows starting with the oldest
@ -548,7 +556,7 @@ impl Window {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use chrono::TimeZone; use chrono::{TimeZone, MAX_DATETIME, MIN_DATETIME};
use super::*; use super::*;
@ -1187,7 +1195,7 @@ mod tests {
flush_checkpoint.sequencer_numbers(1).unwrap(), flush_checkpoint.sequencer_numbers(1).unwrap(),
OptionalMinMaxSequence::new(Some(4), 4) OptionalMinMaxSequence::new(Some(4), 4)
); );
assert_eq!(flush_checkpoint.min_unpersisted_timestamp(), truncated_time); assert_eq!(flush_checkpoint.max_persisted_timestamp(), flush_t);
// The sequencer numbers on the partition should include everything // The sequencer numbers on the partition should include everything
let sequencer_numbers = w.sequencer_numbers(); let sequencer_numbers = w.sequencer_numbers();
@ -1330,7 +1338,7 @@ mod tests {
checkpoint.sequencer_numbers(1).unwrap(), checkpoint.sequencer_numbers(1).unwrap(),
OptionalMinMaxSequence::new(Some(6), 10) OptionalMinMaxSequence::new(Some(6), 10)
); );
assert_eq!(checkpoint.min_unpersisted_timestamp(), truncated_time); assert_eq!(checkpoint.max_persisted_timestamp(), flush_t);
// The sequencer numbers of partition should include everything // The sequencer numbers of partition should include everything
let sequencer_numbers = w.sequencer_numbers(); let sequencer_numbers = w.sequencer_numbers();
@ -1542,4 +1550,37 @@ mod tests {
expected.insert(2, OptionalMinMaxSequence::new(None, 3)); expected.insert(2, OptionalMinMaxSequence::new(None, 3));
assert_eq!(actual, expected); assert_eq!(actual, expected);
} }
#[test]
fn flush_min_max_timestamp() {
let mut w = make_windows(StdDuration::from_secs(30));
let t0 = Utc::now();
let t1 = t0 + Duration::seconds(30);
let t2 = t1 + Duration::seconds(3);
w.add_range(
Some(&Sequence { id: 1, number: 2 }),
NonZeroUsize::new(2).unwrap(),
MIN_DATETIME,
MAX_DATETIME,
t0,
);
w.add_range(
Some(&Sequence { id: 1, number: 3 }),
NonZeroUsize::new(2).unwrap(),
MIN_DATETIME,
MAX_DATETIME,
t1,
);
let handle = w.flush_handle(t2).unwrap();
assert_eq!(handle.timestamp(), MAX_DATETIME);
let ckpt = handle.checkpoint();
assert_eq!(ckpt.max_persisted_timestamp(), MAX_DATETIME);
w.flush(handle);
assert!(w.closed.is_empty());
assert!(w.persistable.is_none());
}
} }

View File

@ -1807,7 +1807,7 @@ mod tests {
.id(); .id();
// A chunk is now in the object store and still in read buffer // A chunk is now in the object store and still in read buffer
let expected_parquet_size = 1234; let expected_parquet_size = 1233;
catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", expected_read_buffer_size);
// now also in OS // now also in OS
catalog_chunk_size_bytes_metric_eq(registry, "object_store", expected_parquet_size); catalog_chunk_size_bytes_metric_eq(registry, "object_store", expected_parquet_size);
@ -2240,7 +2240,7 @@ mod tests {
// Read buffer + Parquet chunk size // Read buffer + Parquet chunk size
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0); catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700);
catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1233); catalog_chunk_size_bytes_metric_eq(registry, "object_store", 1231);
// All the chunks should have different IDs // All the chunks should have different IDs
assert_ne!(mb_chunk.id(), rb_chunk.id()); assert_ne!(mb_chunk.id(), rb_chunk.id());
@ -2350,7 +2350,7 @@ mod tests {
let registry = test_db.metric_registry.as_ref(); let registry = test_db.metric_registry.as_ref();
// Read buffer + Parquet chunk size // Read buffer + Parquet chunk size
let object_store_bytes = 1233; let object_store_bytes = 1231;
catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0); catalog_chunk_size_bytes_metric_eq(registry, "mutable_buffer", 0);
catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700); catalog_chunk_size_bytes_metric_eq(registry, "read_buffer", 1700);
catalog_chunk_size_bytes_metric_eq(registry, "object_store", object_store_bytes); catalog_chunk_size_bytes_metric_eq(registry, "object_store", object_store_bytes);

View File

@ -311,17 +311,17 @@ fn filter_entry(
let table_name = table_batch.name(); let table_name = table_batch.name();
// Check if we have a partition checkpoint that contains data for this specific sequencer // Check if we have a partition checkpoint that contains data for this specific sequencer
let min_unpersisted_ts_and_sequence_range = replay_plan let max_persisted_ts_and_sequence_range = replay_plan
.last_partition_checkpoint(table_name, partition_key) .last_partition_checkpoint(table_name, partition_key)
.map(|partition_checkpoint| { .map(|partition_checkpoint| {
partition_checkpoint partition_checkpoint
.sequencer_numbers(sequence.id) .sequencer_numbers(sequence.id)
.map(|min_max| (partition_checkpoint.min_unpersisted_timestamp(), min_max)) .map(|min_max| (partition_checkpoint.max_persisted_timestamp(), min_max))
}) })
.flatten(); .flatten();
match min_unpersisted_ts_and_sequence_range { match max_persisted_ts_and_sequence_range {
Some((min_unpersisted_ts, min_max)) => { Some((max_persisted_ts, min_max)) => {
// Figure out what the sequence number tells us about the entire batch // Figure out what the sequence number tells us about the entire batch
match SequenceNumberSection::compare(sequence.number, min_max) { match SequenceNumberSection::compare(sequence.number, min_max) {
SequenceNumberSection::Persisted => { SequenceNumberSection::Persisted => {
@ -331,10 +331,10 @@ fn filter_entry(
SequenceNumberSection::PartiallyPersisted => { SequenceNumberSection::PartiallyPersisted => {
// TODO: implement row filtering, for now replay the entire batch // TODO: implement row filtering, for now replay the entire batch
let maybe_mask = table_batch.timestamps().ok().map(|timestamps| { let maybe_mask = table_batch.timestamps().ok().map(|timestamps| {
let min_unpersisted_ts = min_unpersisted_ts.timestamp_nanos(); let max_persisted_ts = max_persisted_ts.timestamp_nanos();
timestamps timestamps
.into_iter() .into_iter()
.map(|ts_row| ts_row >= min_unpersisted_ts) .map(|ts_row| ts_row > max_persisted_ts)
.collect::<Vec<bool>>() .collect::<Vec<bool>>()
}); });
(true, maybe_mask) (true, maybe_mask)