From 4c2945719a2f0d261ea8d196a7bb6e19a96c4a89 Mon Sep 17 00:00:00 2001
From: Dom Dwyer <dom@itsallbroken.com>
Date: Wed, 23 Aug 2023 15:35:47 +0200
Subject: [PATCH] feat: proto serialisation of ParquetFile

Adds conversion functions to serialise a ParquetFile into a protobuf
representation, and back again.

Adds randomised testing to assert round-trip equality.
---
 data_types/src/lib.rs       | 151 +++++++++++++++++++++++++++++++++++-
 data_types/src/partition.rs |   4 +-
 2 files changed, 152 insertions(+), 3 deletions(-)

diff --git a/data_types/src/lib.rs b/data_types/src/lib.rs
index e002581ac7..c1f629b2f4 100644
--- a/data_types/src/lib.rs
+++ b/data_types/src/lib.rs
@@ -15,6 +15,7 @@
     unused_crate_dependencies
 )]
 
+use thiserror::Error;
 // Workaround for "unused crate" lint false positives.
 use workspace_hack as _;
 
@@ -639,6 +640,79 @@ impl ParquetFile {
     }
 }
 
+impl From<ParquetFile> for generated_types::influxdata::iox::catalog::v1::ParquetFile {
+    fn from(v: ParquetFile) -> Self {
+        Self {
+            id: v.id.get(),
+            namespace_id: v.namespace_id.get(),
+            table_id: v.table_id.get(),
+            partition_identifier: Some(v.partition_id.into()),
+            object_store_id: v.object_store_id.to_string(),
+            min_time: v.min_time.get(),
+            max_time: v.max_time.get(),
+            to_delete: v.to_delete.map(|v| v.get()),
+            file_size_bytes: v.file_size_bytes,
+            row_count: v.row_count,
+            compaction_level: v.compaction_level as i32,
+            created_at: v.created_at.get(),
+            column_set: v.column_set.iter().map(|v| v.get()).collect(),
+            max_l0_created_at: v.max_l0_created_at.get(),
+        }
+    }
+}
+
+/// Errors deserialising a protobuf serialised [`ParquetFile`].
+#[derive(Debug, Error)]
+pub enum ParquetFileProtoError {
+    /// The proto type does not contain a partition ID.
+    #[error("no partition id specified for parquet file")]
+    NoPartitionId,
+
+    /// The specified partition ID is invalid.
+    #[error(transparent)]
+    InvalidPartitionId(#[from] PartitionIdProtoError),
+
+    /// The specified object store UUID is invalid.
+    #[error("invalid object store ID: {0}")]
+    InvalidObjectStoreId(uuid::Error),
+
+    /// The specified compaction level value is invalid.
+    #[error("invalid compaction level: {0}")]
+    InvalidCompactionLevel(Box<dyn std::error::Error>),
+}
+
+impl TryFrom<generated_types::influxdata::iox::catalog::v1::ParquetFile> for ParquetFile {
+    type Error = ParquetFileProtoError;
+
+    fn try_from(
+        v: generated_types::influxdata::iox::catalog::v1::ParquetFile,
+    ) -> Result<Self, Self::Error> {
+        Ok(Self {
+            id: ParquetFileId::new(v.id),
+            namespace_id: NamespaceId::new(v.namespace_id),
+            table_id: TableId::new(v.table_id),
+            partition_id: TransitionPartitionId::try_from(
+                v.partition_identifier
+                    .ok_or(ParquetFileProtoError::NoPartitionId)?,
+            )?,
+            object_store_id: v
+                .object_store_id
+                .parse()
+                .map_err(ParquetFileProtoError::InvalidObjectStoreId)?,
+            min_time: Timestamp::new(v.min_time),
+            max_time: Timestamp::new(v.max_time),
+            to_delete: v.to_delete.map(Timestamp::new),
+            file_size_bytes: v.file_size_bytes,
+            row_count: v.row_count,
+            compaction_level: CompactionLevel::try_from(v.compaction_level)
+                .map_err(ParquetFileProtoError::InvalidCompactionLevel)?,
+            created_at: Timestamp::new(v.created_at),
+            column_set: ColumnSet::new(v.column_set.into_iter().map(ColumnId::new)),
+            max_l0_created_at: Timestamp::new(v.max_l0_created_at),
+        })
+    }
+}
+
 /// Data for a parquet file to be inserted into the catalog.
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct ParquetFileParams {
@@ -1624,10 +1698,12 @@ pub struct FileRange {
 
 #[cfg(test)]
 mod tests {
+    use super::*;
+
     use std::borrow::Cow;
 
-    use super::*;
     use ordered_float::OrderedFloat;
+    use proptest::{prelude::*, proptest};
 
     #[test]
     fn test_chunk_id_new() {
@@ -2635,4 +2711,77 @@ mod tests {
         assert_eq!(tr.start(), 1);
         assert_eq!(tr.end(), 1);
     }
+
+    use crate::partition::tests::arbitrary_partition_id;
+
+    prop_compose! {
+        /// Return an arbitrary [`Timestamp`].
+        pub fn arbitrary_timestamp()(value in any::<i64>()) -> Timestamp {
+            Timestamp::new(value)
+        }
+    }
+
+    fn arbitrary_compaction_level() -> impl prop::strategy::Strategy<Value = CompactionLevel> {
+        prop_oneof![
+            Just(CompactionLevel::Initial),
+            Just(CompactionLevel::FileNonOverlapped),
+            Just(CompactionLevel::Final),
+        ]
+    }
+
+    prop_compose! {
+        /// Return an arbitrary [`ParquetFile`] with a randomised values.
+        fn arbitrary_parquet_file()(
+            partition_id in arbitrary_partition_id(),
+            parquet_file_id in any::<i64>(),
+            namespace_id in any::<i64>(),
+            table_id in any::<i64>(),
+            min_time in arbitrary_timestamp(),
+            max_time in arbitrary_timestamp(),
+            to_delete in prop::option::of(arbitrary_timestamp()),
+            file_size_bytes in any::<i64>(),
+            row_count in any::<i64>(),
+            compaction_level in arbitrary_compaction_level(),
+            created_at in arbitrary_timestamp(),
+            column_set in prop::collection::vec(any::<i64>(), 0..10),
+            max_l0_created_at in arbitrary_timestamp(),
+        ) -> ParquetFile {
+            let column_set = ColumnSet::new(column_set.into_iter().map(ColumnId::new));
+
+            ParquetFile {
+                id: ParquetFileId::new(parquet_file_id),
+                namespace_id: NamespaceId::new(namespace_id),
+                table_id: TableId::new(table_id),
+                partition_id,
+                object_store_id: Uuid::new_v4(),
+                min_time,
+                max_time,
+                to_delete,
+                file_size_bytes,
+                row_count,
+                compaction_level,
+                created_at,
+                column_set,
+                max_l0_created_at,
+            }
+        }
+    }
+
+    proptest! {
+        /// Assert a [`ParquetFile`] is round-trippable through proto
+        /// serialisation.
+        #[test]
+        fn prop_parquet_file_proto_round_trip(file in arbitrary_parquet_file()) {
+            use generated_types::influxdata::iox::catalog::v1 as proto;
+
+            // Encoding is infallible
+            let encoded = proto::ParquetFile::from(file.clone());
+
+            // Decoding a valid proto ParquetFile is infallible.
+            let decoded = ParquetFile::try_from(encoded).unwrap();
+
+            // The deserialised value must match the input (round trippable)
+            assert_eq!(decoded, file);
+        }
+    }
 }
diff --git a/data_types/src/partition.rs b/data_types/src/partition.rs
index b1d8750649..21b54642cd 100644
--- a/data_types/src/partition.rs
+++ b/data_types/src/partition.rs
@@ -530,7 +530,7 @@ impl Partition {
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
     use super::*;
 
     use assert_matches::assert_matches;
@@ -556,7 +556,7 @@ mod tests {
     prop_compose! {
         /// Return an arbitrary [`TransitionPartitionId`] with a randomised ID
         /// value.
-        fn arbitrary_partition_id()(
+        pub fn arbitrary_partition_id()(
             use_hash in any::<bool>(),
             row_id in any::<i64>(),
             hash_id in any::<[u8; PARTITION_HASH_ID_SIZE_BYTES]>()