diff --git a/Cargo.lock b/Cargo.lock
index a9b68498b7..94a8241b72 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2619,6 +2619,7 @@ dependencies = [
  "arrow",
  "arrow_util",
  "bytes",
+ "chrono",
  "data_types",
  "datafusion 0.1.0",
  "datafusion_util",
diff --git a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
index d36a626ba4..a00f5a846c 100644
--- a/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
+++ b/generated_types/protos/influxdata/iox/catalog/v1/catalog.proto
@@ -1,6 +1,8 @@
 syntax = "proto3";
 package influxdata.iox.catalog.v1;
 
+import "google/protobuf/timestamp.proto";
+
 // Path for object store interaction.
 message Path {
   // Directory hierarchy.
@@ -69,4 +71,9 @@ message Transaction {
 
   // UUID of last commit.
   string previous_uuid = 5;
+
+  // Start timestamp.
+  //
+  // Timestamp of the start of the transaction.
+  google.protobuf.Timestamp start_timestamp = 6;
 }
\ No newline at end of file
diff --git a/parquet_file/Cargo.toml b/parquet_file/Cargo.toml
index b5eca7dba5..a9ed5f633e 100644
--- a/parquet_file/Cargo.toml
+++ b/parquet_file/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2018"
 [dependencies] # In alphabetical order
 arrow = { version = "4.0", features = ["prettyprint"] }
 bytes = "1.0"
+chrono = "0.4"
 data_types = { path = "../data_types" }
 datafusion = { path = "../datafusion" }
 datafusion_util = { path = "../datafusion_util" }
diff --git a/parquet_file/src/catalog.rs b/parquet_file/src/catalog.rs
index 6b6a316d4c..ac030be46c 100644
--- a/parquet_file/src/catalog.rs
+++ b/parquet_file/src/catalog.rs
@@ -4,13 +4,16 @@ use std::{
         hash_map::Entry::{Occupied, Vacant},
         HashMap,
     },
+    convert::TryInto,
     fmt::{Debug, Display},
+    num::TryFromIntError,
     str::FromStr,
     sync::Arc,
 };
 
 use crate::metadata::{parquet_metadata_to_thrift, thrift_to_parquet_metadata};
 use bytes::Bytes;
+use chrono::{DateTime, Utc};
 use data_types::server_id::ServerId;
 use futures::TryStreamExt;
 use generated_types::influxdata::iox::catalog::v1 as proto;
@@ -29,7 +32,7 @@ use uuid::Uuid;
 /// Current version for serialized transactions.
 ///
 /// For breaking changes, this will change.
-pub const TRANSACTION_VERSION: u32 = 1;
+pub const TRANSACTION_VERSION: u32 = 2;
 
 /// File suffix for transaction files in object store.
 pub const TRANSACTION_FILE_SUFFIX: &str = "txn";
@@ -164,6 +167,12 @@ pub enum Error {
 
     #[snafu(display("Catalog already exists"))]
     AlreadyExists {},
+
+    #[snafu(display("Internal: Datetime required but missing in serialized catalog"))]
+    DateTimeRequired {},
+
+    #[snafu(display("Internal: Cannot parse datetime in serialized catalog: {}", source))]
+    DateTimeParseError { source: TryFromIntError },
 }
 
 pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -209,6 +218,35 @@ pub trait CatalogState {
     fn remove(&self, path: DirsAndFileName) -> Result<()>;
 }
 
+/// Find last transaction-start-timestamp.
+///
+/// This method is designed to read and verify as little as possible and should also work on most broken catalogs.
+pub async fn find_last_transaction_timestamp(
+    object_store: &ObjectStore,
+    server_id: ServerId,
+    db_name: &str,
+) -> Result<Option<DateTime<Utc>>> {
+    let mut res = None;
+    for (path, _revision_counter, _uuid) in
+        list_transaction_files(object_store, server_id, db_name).await?
+    {
+        match load_transaction_proto(object_store, &path).await {
+            Ok(proto) => match parse_timestamp(&proto.start_timestamp) {
+                Ok(ts) => {
+                    res = Some(res.map_or(ts, |res: DateTime<Utc>| res.max(ts)));
+                }
+                Err(e) => warn!(%e, ?path, "Cannot parse timestamp"),
+            },
+            Err(e @ Error::Read { .. }) => {
+                // bubble up IO error
+                return Err(e);
+            }
+            Err(e) => warn!(%e, ?path, "Cannot read transaction"),
+        }
+    }
+    Ok(res)
+}
+
 /// Inner mutable part of the preserved catalog.
 struct PreservedCatalogInner<S>
 where
@@ -645,6 +683,16 @@ fn unparse_dirs_and_filename(path: &DirsAndFileName) -> proto::Path {
     }
 }
 
+/// Parse timestamp from protobuf.
+fn parse_timestamp(
+    ts: &Option<generated_types::google::protobuf::Timestamp>,
+) -> Result<DateTime<Utc>> {
+    let ts: generated_types::google::protobuf::Timestamp =
+        ts.as_ref().context(DateTimeRequired)?.clone();
+    let ts: DateTime<Utc> = ts.try_into().context(DateTimeParseError)?;
+    Ok(ts)
+}
+
 /// Key to address transactions.
 #[derive(Clone, Debug)]
 struct TransactionKey {
@@ -686,6 +734,7 @@ where
                 uuid: uuid.to_string(),
                 revision_counter,
                 previous_uuid,
+                start_timestamp: Some(Utc::now().into()),
             },
         }
     }
@@ -818,6 +867,8 @@ where
             }
             .fail()?;
         }
+        // verify we can parse the timestamp (checking that no error is raised)
+        parse_timestamp(&proto.start_timestamp)?;
 
         // apply
         for action in &proto.actions {
@@ -1279,7 +1330,7 @@ mod tests {
         .await;
         assert_eq!(
             res.unwrap_err().to_string(),
-            "Format version of transaction file for revision 2 is 42 but only [1] are supported"
+            format!("Format version of transaction file for revision 2 is 42 but only [{}] are supported", TRANSACTION_VERSION)
         );
     }
@@ -1499,6 +1550,42 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_broken_protobuf() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let data = Bytes::from("foo");
+        let len = data.len();
+        object_store
+            .put(
+                &path,
+                futures::stream::once(async move { Ok(data) }),
+                Some(len),
+            )
+            .await
+            .unwrap();
+
+        // loading catalog should fail now
+        let res = PreservedCatalog::<TestCatalogState>::load(
+            Arc::clone(&object_store),
+            server_id,
+            db_name.to_string(),
+            (),
+        )
+        .await;
+        assert_eq!(
+            res.unwrap_err().to_string(),
+            "Error during deserialization: failed to decode Protobuf message: invalid wire type value: 6"
+        );
+    }
+
     #[tokio::test]
     async fn test_transaction_handle_debug() {
         let object_store = make_object_store();
@@ -1599,6 +1686,71 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_missing_start_timestamp() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
+        proto.start_timestamp = None;
+        store_transaction_proto(&object_store, &path, &proto)
+            .await
+            .unwrap();
+
+        // loading catalog should fail now
+        let res = PreservedCatalog::<TestCatalogState>::load(
+            Arc::clone(&object_store),
+            server_id,
+            db_name.to_string(),
+            (),
+        )
+        .await;
+        assert_eq!(
+            res.unwrap_err().to_string(),
+            "Internal: Datetime required but missing in serialized catalog"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_broken_start_timestamp() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
+        proto.start_timestamp = Some(generated_types::google::protobuf::Timestamp {
+            seconds: 0,
+            nanos: -1,
+        });
+        store_transaction_proto(&object_store, &path, &proto)
+            .await
+            .unwrap();
+
+        // loading catalog should fail now
+        let res = PreservedCatalog::<TestCatalogState>::load(
+            Arc::clone(&object_store),
+            server_id,
+            db_name.to_string(),
+            (),
+        )
+        .await;
+        assert_eq!(
+            res.unwrap_err().to_string(),
+            "Internal: Cannot parse datetime in serialized catalog: out of range integral type conversion attempted"
+        );
+    }
+
     /// Get sorted list of catalog files from state
     fn get_catalog_parquet_files(state: &TestCatalogState) -> Vec<(String, ParquetMetaData)> {
         let mut files: Vec<(String, ParquetMetaData)> = state
@@ -1676,6 +1828,7 @@
     struct TestTrace {
         tkeys: Vec<TransactionKey>,
         states: Vec<TestCatalogState>,
+        post_timestamps: Vec<DateTime<Utc>>,
     }
 
     impl TestTrace {
@@ -1683,6 +1836,7 @@
             Self {
                 tkeys: vec![],
                 states: vec![],
+                post_timestamps: vec![],
             }
         }
 
@@ -1690,6 +1844,7 @@
             self.tkeys
                 .push(catalog.inner.read().previous_tkey.clone().unwrap());
             self.states.push(catalog.state().deref().clone());
+            self.post_timestamps.push(Utc::now());
         }
     }
 
@@ -1996,6 +2151,135 @@
         assert_eq!(t.uuid(), Uuid::nil());
     }
 
+    #[tokio::test]
+    async fn test_find_last_transaction_timestamp_ok() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // last trace entry is an aborted transaction, so the valid transaction timestamp is the third last
+        let second_last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 3];
+        assert!(
+            ts > second_last_committed_end_ts,
+            "failed: last start ts ({}) > second last committed end ts ({})",
+            ts,
+            second_last_committed_end_ts
+        );
+
+        let last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 2];
+        assert!(
+            ts < last_committed_end_ts,
+            "failed: last start ts ({}) < last committed end ts ({})",
+            ts,
+            last_committed_end_ts
+        );
+    }
+
+    #[tokio::test]
+    async fn test_find_last_transaction_timestamp_empty() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+
+        assert!(
+            find_last_transaction_timestamp(&object_store, server_id, db_name)
+                .await
+                .unwrap()
+                .is_none()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_find_last_transaction_timestamp_datetime_broken() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
+        proto.start_timestamp = None;
+        store_transaction_proto(&object_store, &path, &proto)
+            .await
+            .unwrap();
+
+        let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // last trace entry is an aborted transaction, so the valid transaction timestamp is the third last
+        let second_last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 3];
+        assert!(
+            ts > second_last_committed_end_ts,
+            "failed: last start ts ({}) > second last committed end ts ({})",
+            ts,
+            second_last_committed_end_ts
+        );
+
+        let last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 2];
+        assert!(
+            ts < last_committed_end_ts,
+            "failed: last start ts ({}) < last committed end ts ({})",
+            ts,
+            last_committed_end_ts
+        );
+    }
+
+    #[tokio::test]
+    async fn test_find_last_transaction_timestamp_protobuf_broken() {
+        let object_store = make_object_store();
+        let server_id = make_server_id();
+        let db_name = "db1";
+        let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
+
+        // break transaction file
+        assert!(trace.tkeys.len() >= 2);
+        let tkey = &trace.tkeys[0];
+        let path = transaction_path(&object_store, server_id, db_name, tkey);
+        let data = Bytes::from("foo");
+        let len = data.len();
+        object_store
+            .put(
+                &path,
+                futures::stream::once(async move { Ok(data) }),
+                Some(len),
+            )
+            .await
+            .unwrap();
+
+        let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
+            .await
+            .unwrap()
+            .unwrap();
+
+        // last trace entry is an aborted transaction, so the valid transaction timestamp is the third last
+        let second_last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 3];
+        assert!(
+            ts > second_last_committed_end_ts,
+            "failed: last start ts ({}) > second last committed end ts ({})",
+            ts,
+            second_last_committed_end_ts
+        );
+
+        let last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 2];
+        assert!(
+            ts < last_committed_end_ts,
+            "failed: last start ts ({}) < last committed end ts ({})",
+            ts,
+            last_committed_end_ts
+        );
+    }
+
     async fn assert_catalog_roundtrip_works(
         object_store: &Arc<ObjectStore>,
         server_id: ServerId,
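
Usage note (reviewer sketch, not part of the diff): find_last_transaction_timestamp only bubbles up object-store read errors (Error::Read); transaction files that cannot be decoded or whose timestamp cannot be parsed are logged via warn! and skipped, so a best-effort timestamp is still returned for partially broken catalogs. Below is a minimal sketch of how a caller could consume the new entry point, assuming the public items from parquet_file::catalog shown above; the replay_cutoff helper and its semantics are hypothetical and only illustrate the intended use:

    use chrono::{DateTime, Utc};
    use data_types::server_id::ServerId;
    use object_store::ObjectStore;
    use parquet_file::catalog::{find_last_transaction_timestamp, Result};

    /// Hypothetical helper (NOT part of this diff): pick the point in time from
    /// which a caller would need to replay writes after a restart.
    ///
    /// None means that no transaction file carried a parsable start timestamp
    /// (empty or fully broken catalog), so everything would have to be replayed.
    async fn replay_cutoff(
        object_store: &ObjectStore,
        server_id: ServerId,
        db_name: &str,
    ) -> Result<Option<DateTime<Utc>>> {
        // Scans all transaction files and returns the maximum start_timestamp
        // that could be read and parsed.
        find_last_transaction_timestamp(object_store, server_id, db_name).await
    }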