Merge pull request #1589 from influxdata/crepererum/catalog_transaction_ts

feat: preserve and read timestamps to/from preserved catalog
kodiakhq[bot] 2021-06-02 08:20:23 +00:00 committed by GitHub
commit f58a045d99
4 changed files with 295 additions and 2 deletions

Cargo.lock (generated)

@@ -2619,6 +2619,7 @@ dependencies = [
"arrow",
"arrow_util",
"bytes",
"chrono",
"data_types",
"datafusion 0.1.0",
"datafusion_util",


@@ -1,6 +1,8 @@
syntax = "proto3";
package influxdata.iox.catalog.v1;
import "google/protobuf/timestamp.proto";
// Path for object store interaction.
message Path {
// Directory hierarchy.
@@ -69,4 +71,9 @@ message Transaction {
// UUID of last commit.
string previous_uuid = 5;
// Start timestamp.
//
// Timestamp of the start of the transaction.
google.protobuf.Timestamp start_timestamp = 6;
}
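The new `start_timestamp` field reuses the well-known `google.protobuf.Timestamp` type. As a minimal sketch of the round trip between `chrono::DateTime<Utc>` and such a timestamp (assuming `prost_types`; the actual conversions live in this repo's `generated_types` crate, and `to_proto`/`from_proto` are hypothetical helpers, not part of this diff):

use chrono::{DateTime, TimeZone, Utc};
use std::convert::TryInto;

// Encode: split a UTC instant into epoch seconds plus sub-second nanos
// (assumes a post-1970 instant, so both fields are non-negative).
fn to_proto(dt: DateTime<Utc>) -> prost_types::Timestamp {
    prost_types::Timestamp {
        seconds: dt.timestamp(),
        nanos: dt.timestamp_subsec_nanos() as i32,
    }
}

// Decode: reject out-of-range fields (e.g. negative nanos) instead of
// panicking; the catalog code below surfaces this as DateTimeParseError.
fn from_proto(ts: &prost_types::Timestamp) -> Option<DateTime<Utc>> {
    let nanos: u32 = ts.nanos.try_into().ok()?;
    Utc.timestamp_opt(ts.seconds, nanos).single()
}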


@@ -7,6 +7,7 @@ edition = "2018"
[dependencies] # In alphabetical order
arrow = { version = "4.0", features = ["prettyprint"] }
bytes = "1.0"
chrono = "0.4"
data_types = { path = "../data_types" }
datafusion = { path = "../datafusion" }
datafusion_util = { path = "../datafusion_util" }


@@ -4,13 +4,16 @@ use std::{
hash_map::Entry::{Occupied, Vacant},
HashMap,
},
convert::TryInto,
fmt::{Debug, Display},
num::TryFromIntError,
str::FromStr,
sync::Arc,
};
use crate::metadata::{parquet_metadata_to_thrift, thrift_to_parquet_metadata};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use data_types::server_id::ServerId;
use futures::TryStreamExt;
use generated_types::influxdata::iox::catalog::v1 as proto;
@@ -29,7 +32,7 @@ use uuid::Uuid;
/// Current version for serialized transactions.
///
/// For breaking changes, this will change.
-pub const TRANSACTION_VERSION: u32 = 1;
+pub const TRANSACTION_VERSION: u32 = 2;
/// File suffix for transaction files in object store.
pub const TRANSACTION_FILE_SUFFIX: &str = "txn";
@@ -164,6 +167,12 @@ pub enum Error {
#[snafu(display("Catalog already exists"))]
AlreadyExists {},
#[snafu(display("Internal: Datetime required but missing in serialized catalog"))]
DateTimeRequired {},
#[snafu(display("Internal: Cannot parse datetime in serialized catalog: {}", source))]
DateTimeParseError { source: TryFromIntError },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -209,6 +218,35 @@ pub trait CatalogState {
fn remove(&self, path: DirsAndFileName) -> Result<()>;
}
/// Find last transaction-start-timestamp.
///
/// This method is designed to read and verify as little as possible and should also work on most broken catalogs.
pub async fn find_last_transaction_timestamp(
object_store: &ObjectStore,
server_id: ServerId,
db_name: &str,
) -> Result<Option<DateTime<Utc>>> {
let mut res = None;
for (path, _revision_counter, _uuid) in
list_transaction_files(object_store, server_id, db_name).await?
{
match load_transaction_proto(object_store, &path).await {
Ok(proto) => match parse_timestamp(&proto.start_timestamp) {
Ok(ts) => {
res = Some(res.map_or(ts, |res: DateTime<Utc>| res.max(ts)));
}
Err(e) => warn!(%e, ?path, "Cannot parse timestamp"),
},
Err(e @ Error::Read { .. }) => {
// bubble up IO error
return Err(e);
}
Err(e) => warn!(%e, ?path, "Cannot read transaction"),
}
}
Ok(res)
}
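A hypothetical call site (not part of this diff) that uses the helper to report the last catalog write for a database; `object_store` and `server_id` have the same types as in the function above, and `Result` is the module's alias:

// Hypothetical usage sketch: log the newest transaction start time, if any.
async fn log_last_write(
    object_store: &ObjectStore,
    server_id: ServerId,
    db_name: &str,
) -> Result<()> {
    match find_last_transaction_timestamp(object_store, server_id, db_name).await? {
        Some(ts) => println!("{}: last transaction started at {}", db_name, ts),
        None => println!("{}: no readable transaction files", db_name),
    }
    Ok(())
}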
/// Inner mutable part of the preserved catalog.
struct PreservedCatalogInner<S>
where
@@ -645,6 +683,16 @@ fn unparse_dirs_and_filename(path: &DirsAndFileName) -> proto::Path {
}
}
/// Parse timestamp from protobuf.
fn parse_timestamp(
ts: &Option<generated_types::google::protobuf::Timestamp>,
) -> Result<DateTime<Utc>> {
let ts: generated_types::google::protobuf::Timestamp =
ts.as_ref().context(DateTimeRequired)?.clone();
let ts: DateTime<Utc> = ts.try_into().context(DateTimeParseError)?;
Ok(ts)
}
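For illustration, the two failure modes this helper distinguishes (hypothetical assertions; the tests below exercise the same paths end-to-end through catalog loading):

// A missing timestamp maps to DateTimeRequired ...
assert!(matches!(
    parse_timestamp(&None),
    Err(Error::DateTimeRequired {})
));
// ... while a negative `nanos` fails the integer conversion inside the
// TryFrom impl and maps to DateTimeParseError.
let broken = generated_types::google::protobuf::Timestamp { seconds: 0, nanos: -1 };
assert!(matches!(
    parse_timestamp(&Some(broken)),
    Err(Error::DateTimeParseError { .. })
));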
/// Key to address transactions.
#[derive(Clone, Debug)]
struct TransactionKey {
@@ -686,6 +734,7 @@ where
uuid: uuid.to_string(),
revision_counter,
previous_uuid,
start_timestamp: Some(Utc::now().into()),
},
}
}
@@ -818,6 +867,8 @@ where
}
.fail()?;
}
// verify we can parse the timestamp (checking that no error is raised)
parse_timestamp(&proto.start_timestamp)?;
// apply
for action in &proto.actions {
@@ -1279,7 +1330,7 @@ mod tests {
.await;
assert_eq!(
res.unwrap_err().to_string(),
"Format version of transaction file for revision 2 is 42 but only [1] are supported"
format!("Format version of transaction file for revision 2 is 42 but only [{}] are supported", TRANSACTION_VERSION)
);
}
@@ -1499,6 +1550,42 @@ mod tests {
);
}
#[tokio::test]
async fn test_broken_protobuf() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let path = transaction_path(&object_store, server_id, db_name, tkey);
let data = Bytes::from("foo");
let len = data.len();
object_store
.put(
&path,
futures::stream::once(async move { Ok(data) }),
Some(len),
)
.await
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
"Error during deserialization: failed to decode Protobuf message: invalid wire type value: 6"
);
}
#[tokio::test]
async fn test_transaction_handle_debug() {
let object_store = make_object_store();
@@ -1599,6 +1686,71 @@ mod tests {
);
}
#[tokio::test]
async fn test_missing_start_timestamp() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let path = transaction_path(&object_store, server_id, db_name, tkey);
let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
proto.start_timestamp = None;
store_transaction_proto(&object_store, &path, &proto)
.await
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
"Internal: Datetime required but missing in serialized catalog"
);
}
#[tokio::test]
async fn test_broken_start_timestamp() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let path = transaction_path(&object_store, server_id, db_name, tkey);
let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
proto.start_timestamp = Some(generated_types::google::protobuf::Timestamp {
seconds: 0,
nanos: -1,
});
store_transaction_proto(&object_store, &path, &proto)
.await
.unwrap();
// loading catalog should fail now
let res = PreservedCatalog::<TestCatalogState>::load(
Arc::clone(&object_store),
server_id,
db_name.to_string(),
(),
)
.await;
assert_eq!(
res.unwrap_err().to_string(),
"Internal: Cannot parse datetime in serialized catalog: out of range integral type conversion attempted"
);
}
/// Get sorted list of catalog files from state
fn get_catalog_parquet_files(state: &TestCatalogState) -> Vec<(String, ParquetMetaData)> {
let mut files: Vec<(String, ParquetMetaData)> = state
@@ -1676,6 +1828,7 @@ mod tests {
struct TestTrace {
tkeys: Vec<TransactionKey>,
states: Vec<TestCatalogState>,
post_timestamps: Vec<DateTime<Utc>>,
}
impl TestTrace {
@@ -1683,6 +1836,7 @@ mod tests {
Self {
tkeys: vec![],
states: vec![],
post_timestamps: vec![],
}
}
@@ -1690,6 +1844,7 @@ mod tests {
self.tkeys
.push(catalog.inner.read().previous_tkey.clone().unwrap());
self.states.push(catalog.state().deref().clone());
self.post_timestamps.push(Utc::now());
}
}
@@ -1996,6 +2151,135 @@ mod tests {
assert_eq!(t.uuid(), Uuid::nil());
}
#[tokio::test]
async fn test_find_last_transaction_timestamp_ok() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
// the last trace entry is an aborted transaction, so the last committed
// transaction's start timestamp must lie between the third-to-last and
// second-to-last recorded timestamps
let second_last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 3];
assert!(
ts > second_last_committed_end_ts,
"failed: last start ts ({}) > second last committed end ts ({})",
ts,
second_last_committed_end_ts
);
let last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 2];
assert!(
ts < last_committed_end_ts,
"failed: last start ts ({}) < last committed end ts ({})",
ts,
last_committed_end_ts
);
}
#[tokio::test]
async fn test_find_last_transaction_timestamp_empty() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
assert!(
find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.is_none()
);
}
#[tokio::test]
async fn test_find_last_transaction_timestamp_datetime_broken() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let path = transaction_path(&object_store, server_id, db_name, tkey);
let mut proto = load_transaction_proto(&object_store, &path).await.unwrap();
proto.start_timestamp = None;
store_transaction_proto(&object_store, &path, &proto)
.await
.unwrap();
let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
// the last trace entry is an aborted transaction, so the last committed
// transaction's start timestamp must lie between the third-to-last and
// second-to-last recorded timestamps
let second_last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 3];
assert!(
ts > second_last_committed_end_ts,
"failed: last start ts ({}) > second last committed end ts ({})",
ts,
second_last_committed_end_ts
);
let last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 2];
assert!(
ts < last_committed_end_ts,
"failed: last start ts ({}) < last committed end ts ({})",
ts,
last_committed_end_ts
);
}
#[tokio::test]
async fn test_find_last_transaction_timestamp_protobuf_broken() {
let object_store = make_object_store();
let server_id = make_server_id();
let db_name = "db1";
let trace = assert_single_catalog_inmem_works(&object_store, server_id, db_name).await;
// break transaction file
assert!(trace.tkeys.len() >= 2);
let tkey = &trace.tkeys[0];
let path = transaction_path(&object_store, server_id, db_name, tkey);
let data = Bytes::from("foo");
let len = data.len();
object_store
.put(
&path,
futures::stream::once(async move { Ok(data) }),
Some(len),
)
.await
.unwrap();
let ts = find_last_transaction_timestamp(&object_store, server_id, db_name)
.await
.unwrap()
.unwrap();
// the last trace entry is an aborted transaction, so the last committed
// transaction's start timestamp must lie between the third-to-last and
// second-to-last recorded timestamps
let second_last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 3];
assert!(
ts > second_last_committed_end_ts,
"failed: last start ts ({}) > second last committed end ts ({})",
ts,
second_last_committed_end_ts
);
let last_committed_end_ts = trace.post_timestamps[trace.post_timestamps.len() - 2];
assert!(
ts < last_committed_end_ts,
"failed: last start ts ({}) < last committed end ts ({})",
ts,
last_committed_end_ts
);
}
async fn assert_catalog_roundtrip_works(
object_store: &Arc<ObjectStore>,
server_id: ServerId,