refactor: `max_persisted_timestamp` => `flush_timestamp`

There might still be data before this timestamp that was not persisted
(e.g. data that arrived while persistence was running).
pull/24376/head
Marco Neumann 2021-10-08 12:34:05 +02:00
parent f59ee631c1
commit d3de6bb6e4
8 changed files with 26 additions and 30 deletions
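
For reference, a minimal sketch of the renamed API, mirroring the test code further down in this diff. The `use` paths are assumptions (the diff does not show the import lines), and the values are arbitrary.

use std::collections::BTreeMap;
use std::sync::Arc;

use chrono::{TimeZone, Utc};
// Assumed module paths; not shown in the diff below.
use persistence_windows::checkpoint::PartitionCheckpoint;
use persistence_windows::min_max_sequence::OptionalMinMaxSequence;

fn checkpoint_example() {
    let mut sequencer_numbers = BTreeMap::new();
    sequencer_numbers.insert(1, OptionalMinMaxSequence::new(None, 18));
    sequencer_numbers.insert(2, OptionalMinMaxSequence::new(Some(25), 28));

    // Formerly `max_persisted_timestamp`: rows at or before this timestamp are
    // not guaranteed to be persisted (they may have arrived during persistence).
    let flush_timestamp = Utc.timestamp(10, 20);

    let checkpoint = PartitionCheckpoint::new(
        Arc::from("table_1"),
        Arc::from("partition_1"),
        sequencer_numbers,
        flush_timestamp,
    );
    assert_eq!(checkpoint.flush_timestamp(), flush_timestamp);
}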


@@ -53,8 +53,8 @@ message PartitionCheckpoint {
 // Was Minimum unpersisted timestamp.
 reserved 2;
-// Maximum persisted timestamp.
-google.protobuf.Timestamp max_persisted_timestamp = 3;
+// Flush timestamp.
+google.protobuf.Timestamp flush_timestamp = 3;
 }
 // Record of the playback state for the whole database.


@@ -93,7 +93,7 @@ pub trait LockablePartition: Sized + std::fmt::Display {
 /// Combines and deduplicates the data in `chunks` into two new chunks:
 ///
 /// 1. A read buffer chunk that contains any rows with timestamps
-/// prior to `max_persistable_timestamp`
+/// prior to `flush_timestamp`
 ///
 /// 2. A read buffer chunk (also written to the object store) with
 /// all other rows
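
A minimal, self-contained sketch of the split described above (illustrative only; the real code operates on chunks and query plans, not row vectors): one group holds rows with timestamps prior to the flush timestamp, the other holds all remaining rows.

use chrono::{DateTime, Utc};

// A stand-in row type for the sketch; the real chunks are columnar.
struct Row {
    time: DateTime<Utc>,
    // remaining columns elided
}

fn split_at_flush_timestamp(
    rows: Vec<Row>,
    flush_timestamp: DateTime<Utc>,
) -> (Vec<Row>, Vec<Row>) {
    // First element: rows prior to the flush timestamp; second: all other rows.
    rows.into_iter().partition(|row| row.time < flush_timestamp)
}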


@@ -484,7 +484,7 @@ File {
 max: 28,
 },
 },
-max_persisted_timestamp: 1970-01-01T00:00:10.000000020Z,
+flush_timestamp: 1970-01-01T00:00:10.000000020Z,
 },
 database_checkpoint: DatabaseCheckpoint {
 sequencer_numbers: {


@@ -338,15 +338,15 @@ impl IoxMetadata {
 }
 })
 .collect::<Result<BTreeMap<u32, OptionalMinMaxSequence>>>()?;
-let max_persisted_timestamp = decode_timestamp_from_field(
-proto_partition_checkpoint.max_persisted_timestamp,
-"partition_checkpoint.max_persisted_timestamp",
+let flush_timestamp = decode_timestamp_from_field(
+proto_partition_checkpoint.flush_timestamp,
+"partition_checkpoint.flush_timestamp",
 )?;
 let partition_checkpoint = PartitionCheckpoint::new(
 Arc::clone(&table_name),
 Arc::clone(&partition_key),
 sequencer_numbers,
-max_persisted_timestamp,
+flush_timestamp,
 );
 // extract database checkpoint
@@ -406,9 +406,7 @@ impl IoxMetadata {
 )
 })
 .collect(),
-max_persisted_timestamp: Some(
-self.partition_checkpoint.max_persisted_timestamp().into(),
-),
+flush_timestamp: Some(self.partition_checkpoint.flush_timestamp().into()),
 };
 let proto_database_checkpoint = proto::DatabaseCheckpoint {


@@ -929,12 +929,12 @@ pub fn create_partition_and_database_checkpoint(
 let mut sequencer_numbers_1 = BTreeMap::new();
 sequencer_numbers_1.insert(1, OptionalMinMaxSequence::new(None, 18));
 sequencer_numbers_1.insert(2, OptionalMinMaxSequence::new(Some(25), 28));
-let max_persisted_timestamp = Utc.timestamp(10, 20);
+let flush_timestamp = Utc.timestamp(10, 20);
 let partition_checkpoint_1 = PartitionCheckpoint::new(
 Arc::clone(&table_name),
 Arc::clone(&partition_key),
 sequencer_numbers_1,
-max_persisted_timestamp,
+flush_timestamp,
 );
 // create second partition


@@ -333,10 +333,10 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 ///
 /// ```text
 /// ┌───────────────────┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┌─────────────────────┐
-/// │seq < min_sequence │ time <= max_persisted │ seq > max sequence │
+/// │seq < min_sequence │ time <= flush_ts │ seq > max sequence │
 /// │ │ PERSISTED │ │
 /// │ ├──────────────────────────┤ │
-/// │ PERSISTED │ time > max_persisted │ UNPERSISTED │
+/// │ PERSISTED │ time > flush_ts │ UNPERSISTED │
 /// │ │ UNPERSISTED │ │
 /// └───────────────────┤─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼─────────────────────┘
 ///
@@ -360,7 +360,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// sequencer 1 yield "less" and the ranges for sequencer 2 yield "greater"; or if the sequence number ranges yield
 /// "less" but the first checkpoint has more sequencers than the second.
 ///
-/// Note that they are NOT compared based on the [`max_persisted_timestamp`](Self::max_persisted_timestamp) since
+/// Note that they are NOT compared based on the [`flush_timestamp`](Self::flush_timestamp) since
 /// that one depends on the data ingested by the user and might go backwards during backfills.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct PartitionCheckpoint {
@@ -373,10 +373,8 @@ pub struct PartitionCheckpoint {
 /// Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers.
 sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>,
-/// Maximum persisted timestamp value of the
-/// [`TIME_COLUMN_NAME`](internal_types::schema::TIME_COLUMN_NAME)
-/// (aka "flush timestamp")
-max_persisted_timestamp: DateTime<Utc>,
+/// Flush timestamp
+flush_timestamp: DateTime<Utc>,
 }
 impl PartitionCheckpoint {
@@ -385,13 +383,13 @@ impl PartitionCheckpoint {
 table_name: Arc<str>,
 partition_key: Arc<str>,
 sequencer_numbers: BTreeMap<u32, OptionalMinMaxSequence>,
-max_persisted_timestamp: DateTime<Utc>,
+flush_timestamp: DateTime<Utc>,
 ) -> Self {
 Self {
 table_name,
 partition_key,
 sequencer_numbers,
-max_persisted_timestamp,
+flush_timestamp,
 }
 }
@@ -428,8 +426,8 @@ impl PartitionCheckpoint {
 }
 /// Maximum persisted timestamp.
-pub fn max_persisted_timestamp(&self) -> DateTime<Utc> {
-self.max_persisted_timestamp
+pub fn flush_timestamp(&self) -> DateTime<Utc> {
+self.flush_timestamp
 }
 }
@@ -911,8 +909,8 @@ mod tests {
 ($table_name:expr, $partition_key:expr, {$($sequencer_number:expr => ($min:expr, $max:expr)),*}) => {
 {
 let sequencer_numbers = sequencer_numbers!{$($sequencer_number => ($min, $max)),*};
-let max_persisted_timestamp = DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc);
-PartitionCheckpoint::new(Arc::from($table_name), Arc::from($partition_key), sequencer_numbers, max_persisted_timestamp)
+let flush_timestamp = DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc);
+PartitionCheckpoint::new(Arc::from($table_name), Arc::from($partition_key), sequencer_numbers, flush_timestamp)
 }
 };
 }
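
The doc-comment diagram above amounts to a three-way classification per sequencer. The helper below is a hypothetical, simplified mirror of that diagram (it ignores the optional minimum in `OptionalMinMaxSequence`) and is not part of the crate.

use chrono::{DateTime, Utc};

// Rows below the checkpointed minimum sequence number are persisted, rows above
// the maximum are not, and rows in between are persisted iff their time column
// is at or before the flush timestamp.
fn row_was_persisted(
    sequence_number: u64,
    min_sequence: u64,
    max_sequence: u64,
    time: DateTime<Utc>,
    flush_timestamp: DateTime<Utc>,
) -> bool {
    if sequence_number < min_sequence {
        true
    } else if sequence_number > max_sequence {
        false
    } else {
        time <= flush_timestamp
    }
}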


@@ -1195,7 +1195,7 @@ mod tests {
 flush_checkpoint.sequencer_numbers(1).unwrap(),
 OptionalMinMaxSequence::new(Some(4), 4)
 );
-assert_eq!(flush_checkpoint.max_persisted_timestamp(), flush_t);
+assert_eq!(flush_checkpoint.flush_timestamp(), flush_t);
 // The sequencer numbers on the partition should include everything
 let sequencer_numbers = w.sequencer_numbers();
@@ -1338,7 +1338,7 @@ mod tests {
 checkpoint.sequencer_numbers(1).unwrap(),
 OptionalMinMaxSequence::new(Some(6), 10)
 );
-assert_eq!(checkpoint.max_persisted_timestamp(), flush_t);
+assert_eq!(checkpoint.flush_timestamp(), flush_t);
 // The sequencer numbers of partition should include everything
 let sequencer_numbers = w.sequencer_numbers();
@@ -1577,7 +1577,7 @@ mod tests {
 let handle = w.flush_handle(t2).unwrap();
 assert_eq!(handle.timestamp(), MAX_DATETIME);
 let ckpt = handle.checkpoint();
-assert_eq!(ckpt.max_persisted_timestamp(), MAX_DATETIME);
+assert_eq!(ckpt.flush_timestamp(), MAX_DATETIME);
 w.flush(handle);
 assert!(w.closed.is_empty());


@@ -316,7 +316,7 @@ fn filter_entry(
 .map(|partition_checkpoint| {
 partition_checkpoint
 .sequencer_numbers(sequence.id)
-.map(|min_max| (partition_checkpoint.max_persisted_timestamp(), min_max))
+.map(|min_max| (partition_checkpoint.flush_timestamp(), min_max))
 })
 .flatten();