From 0a625b50e656716521fbdd4964a30d5911845851 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 31 May 2021 13:49:58 +0200
Subject: [PATCH 01/10] feat: store transaction timestamp in preserved catalog

---
 Cargo.lock                                    |  1 +
 .../influxdata/iox/catalog/v1/catalog.proto   |  7 +++
 parquet_file/Cargo.toml                       |  1 +
 parquet_file/src/catalog.rs                   | 46 +++++++++++++++++++
 4 files changed, 55 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index a9b68498b7..94a8241b72 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2619,6 +2619,7 @@ dependencies = [
  "arrow",
  "arrow_util",
  "bytes",
+ "chrono",
  "data_types",
  "datafusion 0.1.0",
  "datafusion_util",
diff --git a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
index d36a626ba4..79c06b398b 100644
--- a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
+++ b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
@@ -69,4 +69,11 @@ message Transaction {
 
   // UUID of last commit.
   string previous_uuid = 5;
+
+  // Start timestamp.
+  //
+  // Timestamp of the start of the transaction encoded as [RFC 3339].
+  //
+  // [RFC 3339]: https://datatracker.ietf.org/doc/html/rfc3339
+  string start_timestamp = 6;
 }
\ No newline at end of file
diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml
index b5eca7dba5..a9ed5f633e 100644
--- a/parquet_file/Cargo.toml
+++ b/parquet_file/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2018"
 [dependencies] # In alphabetical order
 arrow = { version = "4.0", features = ["prettyprint"] }
 bytes = "1.0"
+chrono = "0.4"
 data_types = { path = "../data_types" }
 datafusion = { path = "../datafusion" }
 datafusion_util = { path = "../datafusion_util" }
diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 6b6a316d4c..3ddd80b748 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -11,6 +11,7 @@ use std::{
 
 use crate::metadata::{parquet_metadata_to_thrift, thrift_to_parquet_metadata};
 use bytes::Bytes;
+use chrono::{DateTime, FixedOffset, Utc};
 use data_types::server_id::ServerId;
 use futures::TryStreamExt;
 use generated_types::influxdata::iox::catalog::v1 as proto;
@@ -164,6 +165,9 @@ pub enum Error {
 
     #[snafu(display("Catalog already exists"))]
     AlreadyExists {},
+
+    #[snafu(display("Cannot parse datetime: {}", source))]
+    DateTimeParseError { source: chrono::ParseError },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -645,6 +649,15 @@ fn unparse_dirs_and_filename(path: &DirsAndFileName) -> proto::Path {
     }
 }
 
+/// Parse timestamp encoded as [RFC 3339].
+///
+/// [RFC 3339]: https://datatracker.ietf.org/doc/html/rfc3339
+fn parse_timestamp(s: &str) -> Result<DateTime<Utc>> {
+    Ok(DateTime::<Utc>::from(
+        DateTime::<FixedOffset>::parse_from_rfc3339(s).context(DateTimeParseError)?,
+    ))
+}
+
 /// Key to address transactions.
 #[derive(Clone, Debug)]
 struct TransactionKey {
@@ -686,6 +699,7 @@ where
                 uuid: uuid.to_string(),
                 revision_counter,
                 previous_uuid,
+                start_timestamp: Utc::now().to_rfc3339(),
             },
         }
     }
@@ -818,6 +832,7 @@ where
             }
             .fail()?;
         }
+        parse_timestamp(&proto.start_timestamp)?;
 
         // apply
         for action in &proto.actions {
@@ -1599,6 +1614,37 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_missing_start_timestamp() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
+        proto.start_timestamp = String::new();
+        store_transaction_proto(&object_store, &path, &proto)
+            .await
+            .unwrap();
+
+        // loading catalog should fail now
+        let res = PreservedCatalog::<TestCatalogState>::load(
+            Arc::clone(&object_store),
+            server_id,
+            db_name.to_string(),
+            (),
+        )
+        .await;
+        assert_eq!(
+            res.unwrap_err().to_string(),
+            "Cannot parse datetime: premature end of input"
+        );
+    }
+
     /// Get sorted list of catalog files from state
     fn get_catalog_parquet_files(state: &TestCatalogState) -> Vec<(String, ParquetMetaData)> {
         let mut files: Vec<(String, ParquetMetaData)> = state
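
As context for the encoding chosen in patch 01: the RFC 3339 round-trip it relies on can be reproduced with chrono alone. The following is a minimal illustrative sketch, not code from this series; the names `store` and `restore` are made up here, with `restore` mirroring the `parse_timestamp` added above:

    use chrono::{DateTime, FixedOffset, ParseError, Utc};

    /// Encode a UTC timestamp the way the transaction writer does.
    fn store(ts: DateTime<Utc>) -> String {
        ts.to_rfc3339()
    }

    /// Decode it again; the fixed-offset result is normalized back to UTC.
    fn restore(s: &str) -> Result<DateTime<Utc>, ParseError> {
        Ok(DateTime::<Utc>::from(
            DateTime::<FixedOffset>::parse_from_rfc3339(s)?,
        ))
    }

    fn main() {
        let now = Utc::now();
        // to_rfc3339 keeps sub-second precision, so the round trip is lossless.
        assert_eq!(restore(&store(now)).unwrap(), now);
        // An empty string is the "premature end of input" case the test above exercises.
        assert!(restore("").is_err());
    }

`parse_from_rfc3339` is an associated function of `DateTime<FixedOffset>`; going through `FixedOffset` first accepts arbitrary offsets in the input before converting to UTC.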
From 9aee961e2af8825b1b0dbf279bba4c4674f67e48 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 31 May 2021 14:28:53 +0200
Subject: [PATCH 02/10] test: test loading catalogs from broken protobufs

---
 parquet_file/src/catalog.rs | 36 ++++++++++++++++++++++++++++++++++++
 1 file changed, 36 insertions(+)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 3ddd80b748..c6e7238c4b 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -1514,6 +1514,42 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_broken_protobuf() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let data = Bytes::from("foo");
+        let len = data.len();
+        object_store
+            .put(
+                &path,
+                futures::stream::once(async move { Ok(data) }),
+                Some(len),
+            )
+            .await
+            .unwrap();
+
+        // loading catalog should fail now
+        let res = PreservedCatalog::<TestCatalogState>::load(
+            Arc::clone(&object_store),
+            server_id,
+            db_name.to_string(),
+            (),
+        )
+        .await;
+        assert_eq!(
+            res.unwrap_err().to_string(),
+            "Error during deserialization: failed to decode Protobuf message: invalid wire type value: 6"
+        );
+    }
+
     #[tokio::test]
     async fn test_transaction_handle_debug() {
         let object_store = make_object_store();

From 5f77b7b92b60eddb6c06f59768e92098dd938022 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 31 May 2021 14:29:17 +0200
Subject: [PATCH 03/10] feat: add
 `parquet_file::catalog::find_last_transaction_timestamp`

---
 parquet_file/src/catalog.rs | 113 ++++++++++++++++++++++++++++++++++++
 1 file changed, 113 insertions(+)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index c6e7238c4b..51d32d6a5c 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -213,6 +213,35 @@ pub trait CatalogState {
     fn remove(&self, path: DirsAndFileName) -> Result<()>;
 }
 
+/// Find last transaction-start-timestamp.
+///
+/// This method is designed to read and verify as little as possible and should also work on most
+/// broken catalogs.
+pub async fn find_last_transaction_timestamp(
+    object_store: &ObjectStore,
+    server_id: ServerId,
+    db_name: &str,
+) -> Result<Option<DateTime<Utc>>> {
+    let mut res = None;
+    for (path, _revision_counter, _uuid) in
+        list_transaction_files(object_store, server_id, db_name).await?
+    {
+        match load_transaction_proto(object_store, &path).await {
+            Ok(proto) => match parse_timestamp(&proto.start_timestamp) {
+                Ok(ts) => {
+                    res = Some(res.map_or(ts, |res: DateTime<Utc>| res.max(ts)));
+                }
+                Err(e) => warn!("Cannot parse timestamp from {:?}: {}", path, e),
+            },
+            Err(e @ Error::Read { .. }) => {
+                // bubble up IO error
+                return Err(e);
+            }
+            Err(e) => warn!("Cannot read transaction from {:?}: {}", path, e),
+        }
+    }
+    Ok(res)
+}
+
 /// Inner mutable part of the preserved catalog.
 struct PreservedCatalogInner<S>
 where
@@ -2078,6 +2107,90 @@ mod tests {
         assert_eq!(t.uuid(), Uuid::nil());
     }
 
+    #[tokio::test]
+    async fn test_find_last_transaction_timestamp_ok() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        assert!(
+            find_last_transaction_timestamp(&object_store, server_id, db_name)
+                .await
+                .unwrap()
+                .is_some()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_find_last_transaction_timestamp_empty() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+
+        assert!(
+            find_last_transaction_timestamp(&object_store, server_id, db_name)
+                .await
+                .unwrap()
+                .is_none()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_find_last_transaction_timestamp_datetime_broken() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
+        proto.start_timestamp = String::new();
+        store_transaction_proto(&object_store, &path, &proto)
+            .await
+            .unwrap();
+
+        assert!(
+            find_last_transaction_timestamp(&object_store, server_id, db_name)
+                .await
+                .unwrap()
+                .is_some()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_find_last_transaction_timestamp_protobuf_broken() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let data = Bytes::from("foo");
+        let len = data.len();
+        object_store
+            .put(
+                &path,
+                futures::stream::once(async move { Ok(data) }),
+                Some(len),
+            )
+            .await
+            .unwrap();
+
+        assert!(
+            find_last_transaction_timestamp(&object_store, server_id, db_name)
+                .await
+                .unwrap()
+                .is_some()
+        );
+    }
+
     async fn assert_catalog_roundtrip_works(
         object_store: &Arc<ObjectStore>,
         server_id: ServerId,
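
To make the intended call pattern of the new free function concrete, a caller probing a possibly damaged catalog could look roughly like the sketch below. This is illustrative only — it assumes the imports and `Result` alias of this module, and `report_catalog_age` is a hypothetical name:

    /// Sketch: report the most recent transaction start, if any is readable.
    async fn report_catalog_age(
        object_store: &ObjectStore,
        server_id: ServerId,
        db_name: &str,
    ) -> Result<()> {
        // IO errors bubble up; unreadable or unparseable transactions are
        // skipped with a warning, per the loop above.
        match find_last_transaction_timestamp(object_store, server_id, db_name).await? {
            Some(ts) => println!("last known transaction started at {}", ts),
            None => println!("no transaction with a readable timestamp found"),
        }
        Ok(())
    }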
From 9b9400803bcf4ab8d4253ad1e18d106b6b613221 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 31 May 2021 14:33:42 +0200
Subject: [PATCH 04/10] refactor!: bump transaction version to 2

---
 parquet_file/src/catalog.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 51d32d6a5c..8256b7ea2a 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -30,7 +30,7 @@ use uuid::Uuid;
 /// Current version for serialized transactions.
 ///
 /// For breaking changes, this will change.
-pub const TRANSACTION_VERSION: u32 = 1;
+pub const TRANSACTION_VERSION: u32 = 2;
 
 /// File suffix for transaction files in object store.
 pub const TRANSACTION_FILE_SUFFIX: &str = "txn";
@@ -1323,7 +1323,7 @@ mod tests {
         .await;
         assert_eq!(
             res.unwrap_err().to_string(),
-            "Format version of transaction file for revision 2 is 42 but only [1] are supported"
+            format!("Format version of transaction file for revision 2 is 42 but only [{}] are supported", TRANSACTION_VERSION)
         );
     }

From 77aeb5ca5d409dedb8f06a296ad230095b17cb13 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Mon, 31 May 2021 15:26:32 +0200
Subject: [PATCH 05/10] refactor: use protobuf-native Timestamp instead of
 string

---
 .../influxdata/iox/catalog/v1/catalog.proto |  8 ++---
 parquet_file/src/catalog.rs                 | 31 +++++++++++--------
 2 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
index 79c06b398b..a00f5a846c 100644
--- a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
+++ b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
@@ -1,6 +1,8 @@
 syntax = "proto3";
 package influxdata.iox.catalog.v1;
 
+import "google/protobuf/timestamp.proto";
+
 // Path for object store interaction.
 message Path {
   // Directory hierarchy.
@@ -72,8 +74,6 @@ message Transaction {
 
   // Start timestamp.
   //
-  // Timestamp of the start of the transaction encoded as [RFC 3339].
-  //
-  // [RFC 3339]: https://datatracker.ietf.org/doc/html/rfc3339
-  string start_timestamp = 6;
+  // Timestamp of the start of the transaction.
+  google.protobuf.Timestamp start_timestamp = 6;
 }
\ No newline at end of file
diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 8256b7ea2a..9746ac17a9 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -4,6 +4,7 @@ use std::{
         hash_map::Entry::{Occupied, Vacant},
         HashMap,
     },
+    convert::{Infallible, TryInto},
     fmt::{Debug, Display},
     str::FromStr,
     sync::Arc,
@@ -11,7 +12,7 @@ use std::{
 
 use crate::metadata::{parquet_metadata_to_thrift, thrift_to_parquet_metadata};
 use bytes::Bytes;
-use chrono::{DateTime, FixedOffset, Utc};
+use chrono::{DateTime, Utc};
 use data_types::server_id::ServerId;
 use futures::TryStreamExt;
 use generated_types::influxdata::iox::catalog::v1 as proto;
@@ -166,8 +167,11 @@ pub enum Error {
     #[snafu(display("Catalog already exists"))]
     AlreadyExists {},
 
+    #[snafu(display("Datetime required but missing"))]
+    DateTimeRequired {},
+
     #[snafu(display("Cannot parse datetime: {}", source))]
-    DateTimeParseError { source: chrono::ParseError },
+    DateTimeParseError { source: Infallible },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -678,13 +682,14 @@ fn unparse_dirs_and_filename(path: &DirsAndFileName) -> proto::Path {
     }
 }
 
-/// Parse timestamp encoded as [RFC 3339].
-///
-/// [RFC 3339]: https://datatracker.ietf.org/doc/html/rfc3339
-fn parse_timestamp(s: &str) -> Result<DateTime<Utc>> {
-    Ok(DateTime::<Utc>::from(
-        DateTime::<FixedOffset>::parse_from_rfc3339(s).context(DateTimeParseError)?,
-    ))
+/// Parse timestamp from protobuf.
+fn parse_timestamp(
+    ts: &Option<generated_types::google::protobuf::Timestamp>,
+) -> Result<DateTime<Utc>> {
+    let ts: generated_types::google::protobuf::Timestamp =
+        ts.as_ref().context(DateTimeRequired)?.clone();
+    let ts: DateTime<Utc> = ts.try_into().unwrap();
+    Ok(ts)
 }
 
 /// Key to address transactions.
@@ -728,7 +733,7 @@ where
                 uuid: uuid.to_string(),
                 revision_counter,
                 previous_uuid,
-                start_timestamp: Utc::now().to_rfc3339(),
+                start_timestamp: Some(Utc::now().into()),
             },
         }
     }
@@ -1691,7 +1696,7 @@ mod tests {
         let tkey = &trace.tkeys[0];
         let path = transaction_path(&object_store, server_id, db_name, tkey);
         let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
-        proto.start_timestamp = String::new();
+        proto.start_timestamp = None;
         store_transaction_proto(&object_store, &path, &proto)
             .await
             .unwrap();
@@ -1706,7 +1711,7 @@ mod tests {
         )
         .await;
         assert_eq!(
            res.unwrap_err().to_string(),
-            "Cannot parse datetime: premature end of input"
+            "Datetime required but missing"
         );
     }
@@ -2148,7 +2153,7 @@ mod tests {
         let tkey = &trace.tkeys[0];
         let path = transaction_path(&object_store, server_id, db_name, tkey);
         let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
-        proto.start_timestamp = String::new();
+        proto.start_timestamp = None;
         store_transaction_proto(&object_store, &path, &proto)
             .await
             .unwrap();
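
One remark on the conversion that `ts.try_into()` performs above: turning a protobuf timestamp into a `DateTime<Utc>` boils down to splitting the message into seconds and nanoseconds, where the nanoseconds field is an `i32` that must fit into a `u32`. The following is a stand-alone sketch of the same idea — it uses a hand-rolled struct instead of the generated type, so it is an assumption-laden illustration rather than the generated code:

    use std::convert::TryFrom;
    use std::num::TryFromIntError;

    use chrono::{DateTime, TimeZone, Utc};

    /// Stand-in for the generated `google.protobuf.Timestamp` message.
    struct Timestamp {
        seconds: i64,
        nanos: i32,
    }

    fn to_datetime(ts: &Timestamp) -> Result<DateTime<Utc>, TryFromIntError> {
        // Negative nanos fail here with "out of range integral type
        // conversion attempted" -- exactly the failure mode that patch 09
        // below stops unwrapping.
        let nanos = u32::try_from(ts.nanos)?;
        Ok(Utc.timestamp(ts.seconds, nanos))
    }

The `.unwrap()` in patch 05 means a catalog containing such a value would panic the reader; the follow-up patches turn that into a proper error.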
From 64bf8c5182e341656d1374633bf729dc6e3682e2 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Wed, 2 Jun 2021 09:12:06 +0200
Subject: [PATCH 06/10] docs: add code comment explaining why we parse
 transaction timestamps

Co-authored-by: Andrew Lamb
---
 parquet_file/src/catalog.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 9746ac17a9..2ff7c98b22 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -866,6 +866,7 @@ where
             }
             .fail()?;
         }
+        // verify we can parse the timestamp (checking that no error is raised)
         parse_timestamp(&proto.start_timestamp)?;
 
         // apply

From 2a0b2698c6af07e909f1d335bc9c50d2ba12455a Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Wed, 2 Jun 2021 09:16:39 +0200
Subject: [PATCH 07/10] fix: use structured logging

Co-authored-by: Andrew Lamb
---
 parquet_file/src/catalog.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 2ff7c98b22..02ca3ada89 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -234,13 +234,13 @@ pub async fn find_last_transaction_timestamp(
                 Ok(ts) => {
                     res = Some(res.map_or(ts, |res: DateTime<Utc>| res.max(ts)));
                 }
-                Err(e) => warn!("Cannot parse timestamp from {:?}: {}", path, e),
+                Err(e) => warn!(%e, ?path, "Cannot parse timestamp"),
             },
             Err(e @ Error::Read { .. }) => {
                 // bubble up IO error
                 return Err(e);
             }
-            Err(e) => warn!("Cannot read transaction from {:?}: {}", path, e),
+            Err(e) => warn!(%e, ?path, "Cannot read transaction"),
         }
     }
     Ok(res)

From fc0a74920fb2080cfb566a5cb9132a6ccc179811 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Wed, 2 Jun 2021 09:16:53 +0200
Subject: [PATCH 08/10] fix: use clearer error text

---
 parquet_file/src/catalog.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 02ca3ada89..e478d20a06 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -167,7 +167,7 @@ pub enum Error {
     #[snafu(display("Catalog already exists"))]
     AlreadyExists {},
 
-    #[snafu(display("Datetime required but missing"))]
+    #[snafu(display("Internal: Datetime required but missing in serialized catalog"))]
     DateTimeRequired {},
 
     #[snafu(display("Cannot parse datetime: {}", source))]
@@ -1712,7 +1712,7 @@ mod tests {
         .await;
         assert_eq!(
             res.unwrap_err().to_string(),
-            "Datetime required but missing"
+            "Internal: Datetime required but missing in serialized catalog"
         );
     }
From 98e413d5a95070895218d288217b12f9768c7114 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Wed, 2 Jun 2021 09:39:51 +0200
Subject: [PATCH 09/10] fix: do not unwrap broken timestamps in serialized
 catalog

---
 parquet_file/src/catalog.rs | 43 +++++++++++++++++++++++++++++++++----
 1 file changed, 39 insertions(+), 4 deletions(-)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index e478d20a06..de5889f4be 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -4,8 +4,9 @@ use std::{
         hash_map::Entry::{Occupied, Vacant},
         HashMap,
     },
-    convert::{Infallible, TryInto},
+    convert::TryInto,
     fmt::{Debug, Display},
+    num::TryFromIntError,
     str::FromStr,
     sync::Arc,
 };
@@ -170,8 +171,8 @@ pub enum Error {
     #[snafu(display("Internal: Datetime required but missing in serialized catalog"))]
     DateTimeRequired {},
 
-    #[snafu(display("Cannot parse datetime: {}", source))]
-    DateTimeParseError { source: Infallible },
+    #[snafu(display("Internal: Cannot parse datetime in serialized catalog: {}", source))]
+    DateTimeParseError { source: TryFromIntError },
 }
 
@@ -688,7 +689,7 @@ fn parse_timestamp(
 ) -> Result<DateTime<Utc>> {
     let ts: generated_types::google::protobuf::Timestamp =
         ts.as_ref().context(DateTimeRequired)?.clone();
-    let ts: DateTime<Utc> = ts.try_into().unwrap();
+    let ts: DateTime<Utc> = ts.try_into().context(DateTimeParseError)?;
     Ok(ts)
 }
@@ -1716,6 +1717,40 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_broken_start_timestamp() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
+        proto.start_timestamp = Some(generated_types::google::protobuf::Timestamp {
+            seconds: 0,
+            nanos: -1,
+        });
+        store_transaction_proto(&object_store, &path, &proto)
+            .await
+            .unwrap();
+
+        // loading catalog should fail now
+        let res = PreservedCatalog::<TestCatalogState>::load(
+            Arc::clone(&object_store),
+            server_id,
+            db_name.to_string(),
+            (),
+        )
+        .await;
+        assert_eq!(
+            res.unwrap_err().to_string(),
+            "Internal: Cannot parse datetime in serialized catalog: out of range integral type conversion attempted"
+        );
+    }
+
     /// Get sorted list of catalog files from state
     fn get_catalog_parquet_files(state: &TestCatalogState) -> Vec<(String, ParquetMetaData)> {
         let mut files: Vec<(String, ParquetMetaData)> = state
From e5b65e10ac330ed1227ab56624d8ea672cc8ba41 Mon Sep 17 00:00:00 2001
From: Marco Neumann
Date: Wed, 2 Jun 2021 09:55:58 +0200
Subject: [PATCH 10/10] test: ensure that `find_last_transaction_timestamp`
 indeed returns the last timestamp

---
 parquet_file/src/catalog.rs | 74 ++++++++++++++++++++++++++++++-------
 1 file changed, 61 insertions(+), 13 deletions(-)

diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index de5889f4be..ac030be46c 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -1828,6 +1828,7 @@ mod tests {
     struct TestTrace {
         tkeys: Vec<TransactionKey>,
         states: Vec<TestCatalogState>,
+        post_timestamps: Vec<DateTime<Utc>>,
     }
 
     impl TestTrace {
@@ -1835,6 +1836,7 @@ impl TestTrace {
         fn new() -> Self {
            Self {
                 tkeys: vec![],
                 states: vec![],
+                post_timestamps: vec![],
             }
         }
@@ -1842,6 +1844,7 @@ impl TestTrace {
            self.tkeys
                 .push(catalog.inner.read().previous_tkey.clone().unwrap());
             self.states.push(catalog.state().deref().clone());
+            self.post_timestamps.push(Utc::now());
         }
     }
@@ -2153,13 +2156,28 @@ mod tests {
         let object_store = make_object_store();
         let server_id = make_server_id();
         let db_name = "db1";
-        assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
 
+        let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // last trace entry is an aborted transaction, so the valid transaction timestamp is the third last
+        let second_last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 3];
         assert!(
-            find_last_transaction_timestamp(&object_store, server_id, db_name)
-                .await
-                .unwrap()
-                .is_some()
+            ts > second_last_committed_end_ts,
+            "failed: last start ts ({}) > second last committed end ts ({})",
+            ts,
+            second_last_committed_end_ts
+        );
+
+        let last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 2];
+        assert!(
+            ts < last_committed_end_ts,
+            "failed: last start ts ({}) < last committed end ts ({})",
+            ts,
+            last_committed_end_ts
         );
     }
@@ -2194,11 +2212,26 @@ mod tests {
         store_transaction_proto(&object_store, &path, &proto)
             .await
             .unwrap();
 
+        let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // last trace entry is an aborted transaction, so the valid transaction timestamp is the third last
+        let second_last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 3];
         assert!(
-            find_last_transaction_timestamp(&object_store, server_id, db_name)
-                .await
-                .unwrap()
-                .is_some()
+            ts > second_last_committed_end_ts,
+            "failed: last start ts ({}) > second last committed end ts ({})",
+            ts,
+            second_last_committed_end_ts
+        );
+
+        let last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 2];
+        assert!(
+            ts < last_committed_end_ts,
+            "failed: last start ts ({}) < last committed end ts ({})",
+            ts,
+            last_committed_end_ts
         );
     }
@@ -2224,11 +2257,26 @@ mod tests {
             )
             .await
             .unwrap();
 
+        let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // last trace entry is an aborted transaction, so the valid transaction timestamp is the third last
+        let second_last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 3];
         assert!(
-            find_last_transaction_timestamp(&object_store, server_id, db_name)
-                .await
-                .unwrap()
-                .is_some()
+            ts > second_last_committed_end_ts,
+            "failed: last start ts ({}) > second last committed end ts ({})",
+            ts,
+            second_last_committed_end_ts
+        );
+
+        let last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 2];
+        assert!(
+            ts < last_committed_end_ts,
+            "failed: last start ts ({}) < last committed end ts ({})",
+            ts,
+            last_committed_end_ts
         );
     }
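
Taken together, the assertions added in patch 10 encode an ordering invariant: with a commit marker recorded after every transaction (and the trailing trace entry belonging to an aborted transaction), the start timestamp of the last committed transaction must fall strictly between the last two commit markers. Reduced to plain integers, the checked relation is simply the following — hypothetical values, mirroring the `len() - 3` / `len() - 2` indices used in the tests:

    fn main() {
        // Markers recorded after each transaction; the last belongs to the aborted one.
        let post_timestamps = [10, 20, 30, 40];
        // Start of the last *committed* transaction.
        let ts = 25;
        let n = post_timestamps.len();
        // end(second-to-last commit) < start(last commit) < end(last commit)
        assert!(post_timestamps[n - 3] < ts && ts < post_timestamps[n - 2]);
    }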