diff --git a/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto b/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto index 708649d3bc..bd649ecae0 100644 --- a/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto +++ b/generated_types/protos/influxdata/iox/catalog/v1/parquet_metadata.proto @@ -53,8 +53,8 @@ message PartitionCheckpoint { // Was Minimum unpersisted timestamp. reserved 2; - // Maximum persisted timestamp. - google.protobuf.Timestamp max_persisted_timestamp = 3; + // Flush timestamp. + google.protobuf.Timestamp flush_timestamp = 3; } // Record of the playback state for the whole database. diff --git a/lifecycle/src/lib.rs b/lifecycle/src/lib.rs index 8907c87551..643feeb30d 100644 --- a/lifecycle/src/lib.rs +++ b/lifecycle/src/lib.rs @@ -93,7 +93,7 @@ pub trait LockablePartition: Sized + std::fmt::Display { /// Combines and deduplicates the data in `chunks` into two new chunks: /// /// 1. A read buffer chunk that contains any rows with timestamps - /// prior to `max_persistable_timestamp` + /// prior to `flush_timestamp` /// /// 2. A read buffer chunk (also written to the object store) with /// all other rows diff --git a/parquet_file/src/catalog/dump.rs b/parquet_file/src/catalog/dump.rs index 2af86ab7d1..44442cc4c9 100644 --- a/parquet_file/src/catalog/dump.rs +++ b/parquet_file/src/catalog/dump.rs @@ -484,7 +484,7 @@ File { max: 28, }, }, - max_persisted_timestamp: 1970-01-01T00:00:10.000000020Z, + flush_timestamp: 1970-01-01T00:00:10.000000020Z, }, database_checkpoint: DatabaseCheckpoint { sequencer_numbers: { diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs index fc6d3a557d..0cdf8ff94d 100644 --- a/parquet_file/src/metadata.rs +++ b/parquet_file/src/metadata.rs @@ -338,15 +338,15 @@ impl IoxMetadata { } }) .collect::>>()?; - let max_persisted_timestamp = decode_timestamp_from_field( - proto_partition_checkpoint.max_persisted_timestamp, - "partition_checkpoint.max_persisted_timestamp", + let flush_timestamp = decode_timestamp_from_field( + proto_partition_checkpoint.flush_timestamp, + "partition_checkpoint.flush_timestamp", )?; let partition_checkpoint = PartitionCheckpoint::new( Arc::clone(&table_name), Arc::clone(&partition_key), sequencer_numbers, - max_persisted_timestamp, + flush_timestamp, ); // extract database checkpoint @@ -406,9 +406,7 @@ impl IoxMetadata { ) }) .collect(), - max_persisted_timestamp: Some( - self.partition_checkpoint.max_persisted_timestamp().into(), - ), + flush_timestamp: Some(self.partition_checkpoint.flush_timestamp().into()), }; let proto_database_checkpoint = proto::DatabaseCheckpoint { diff --git a/parquet_file/src/test_utils.rs b/parquet_file/src/test_utils.rs index baf21809df..67e5fd9c93 100644 --- a/parquet_file/src/test_utils.rs +++ b/parquet_file/src/test_utils.rs @@ -929,12 +929,12 @@ pub fn create_partition_and_database_checkpoint( let mut sequencer_numbers_1 = BTreeMap::new(); sequencer_numbers_1.insert(1, OptionalMinMaxSequence::new(None, 18)); sequencer_numbers_1.insert(2, OptionalMinMaxSequence::new(Some(25), 28)); - let max_persisted_timestamp = Utc.timestamp(10, 20); + let flush_timestamp = Utc.timestamp(10, 20); let partition_checkpoint_1 = PartitionCheckpoint::new( Arc::clone(&table_name), Arc::clone(&partition_key), sequencer_numbers_1, - max_persisted_timestamp, + flush_timestamp, ); // create second partition diff --git a/persistence_windows/src/checkpoint.rs b/persistence_windows/src/checkpoint.rs index 59509794b7..ac7a571054 100644 --- a/persistence_windows/src/checkpoint.rs +++ b/persistence_windows/src/checkpoint.rs @@ -333,10 +333,10 @@ pub type Result = std::result::Result; /// /// ```text /// ┌───────────────────┬ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┌─────────────────────┐ -/// │seq < min_sequence │ time <= max_persisted │ seq > max sequence │ +/// │seq < min_sequence │ time <= flush_ts │ seq > max sequence │ /// │ │ PERSISTED │ │ /// │ ├──────────────────────────┤ │ -/// │ PERSISTED │ time > max_persisted │ UNPERSISTED │ +/// │ PERSISTED │ time > flush_ts │ UNPERSISTED │ /// │ │ UNPERSISTED │ │ /// └───────────────────┤─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼─────────────────────┘ /// @@ -360,7 +360,7 @@ pub type Result = std::result::Result; /// sequencer 1 yield "less" and the ranges for sequencer 2 yield "greater"; or if the sequence number ranges yield /// "less" but the first checkpoint has more sequencers than the second. /// -/// Note that they are NOT compared based on the [`max_persisted_timestamp`](Self::max_persisted_timestamp) since +/// Note that they are NOT compared based on the [`flush_timestamp`](Self::flush_timestamp) since /// that one depends on the data ingested by the user and might go backwards during backfills. #[derive(Debug, Clone, PartialEq, Eq)] pub struct PartitionCheckpoint { @@ -373,10 +373,8 @@ pub struct PartitionCheckpoint { /// Maps `sequencer_id` to the to-be-persisted minimum and seen maximum sequence numbers. sequencer_numbers: BTreeMap, - /// Maximum persisted timestamp value of the - /// [`TIME_COLUMN_NAME`](internal_types::schema::TIME_COLUMN_NAME) - /// (aka "flush timestamp") - max_persisted_timestamp: DateTime, + /// Flush timestamp + flush_timestamp: DateTime, } impl PartitionCheckpoint { @@ -385,13 +383,13 @@ impl PartitionCheckpoint { table_name: Arc, partition_key: Arc, sequencer_numbers: BTreeMap, - max_persisted_timestamp: DateTime, + flush_timestamp: DateTime, ) -> Self { Self { table_name, partition_key, sequencer_numbers, - max_persisted_timestamp, + flush_timestamp, } } @@ -428,8 +426,8 @@ impl PartitionCheckpoint { } /// Maximum persisted timestamp. - pub fn max_persisted_timestamp(&self) -> DateTime { - self.max_persisted_timestamp + pub fn flush_timestamp(&self) -> DateTime { + self.flush_timestamp } } @@ -911,8 +909,8 @@ mod tests { ($table_name:expr, $partition_key:expr, {$($sequencer_number:expr => ($min:expr, $max:expr)),*}) => { { let sequencer_numbers = sequencer_numbers!{$($sequencer_number => ($min, $max)),*}; - let max_persisted_timestamp = DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc); - PartitionCheckpoint::new(Arc::from($table_name), Arc::from($partition_key), sequencer_numbers, max_persisted_timestamp) + let flush_timestamp = DateTime::from_utc(chrono::NaiveDateTime::from_timestamp(0, 0), Utc); + PartitionCheckpoint::new(Arc::from($table_name), Arc::from($partition_key), sequencer_numbers, flush_timestamp) } }; } diff --git a/persistence_windows/src/persistence_windows.rs b/persistence_windows/src/persistence_windows.rs index 386cfecdae..4a6f403a4d 100644 --- a/persistence_windows/src/persistence_windows.rs +++ b/persistence_windows/src/persistence_windows.rs @@ -1195,7 +1195,7 @@ mod tests { flush_checkpoint.sequencer_numbers(1).unwrap(), OptionalMinMaxSequence::new(Some(4), 4) ); - assert_eq!(flush_checkpoint.max_persisted_timestamp(), flush_t); + assert_eq!(flush_checkpoint.flush_timestamp(), flush_t); // The sequencer numbers on the partition should include everything let sequencer_numbers = w.sequencer_numbers(); @@ -1338,7 +1338,7 @@ mod tests { checkpoint.sequencer_numbers(1).unwrap(), OptionalMinMaxSequence::new(Some(6), 10) ); - assert_eq!(checkpoint.max_persisted_timestamp(), flush_t); + assert_eq!(checkpoint.flush_timestamp(), flush_t); // The sequencer numbers of partition should include everything let sequencer_numbers = w.sequencer_numbers(); @@ -1577,7 +1577,7 @@ mod tests { let handle = w.flush_handle(t2).unwrap(); assert_eq!(handle.timestamp(), MAX_DATETIME); let ckpt = handle.checkpoint(); - assert_eq!(ckpt.max_persisted_timestamp(), MAX_DATETIME); + assert_eq!(ckpt.flush_timestamp(), MAX_DATETIME); w.flush(handle); assert!(w.closed.is_empty()); diff --git a/server/src/db/replay.rs b/server/src/db/replay.rs index d084c8ffa4..7de3f2fc63 100644 --- a/server/src/db/replay.rs +++ b/server/src/db/replay.rs @@ -316,7 +316,7 @@ fn filter_entry( .map(|partition_checkpoint| { partition_checkpoint .sequencer_numbers(sequence.id) - .map(|min_max| (partition_checkpoint.max_persisted_timestamp(), min_max)) + .map(|min_max| (partition_checkpoint.flush_timestamp(), min_max)) }) .flatten();