feat: proto serialisation of ParquetFile

Adds conversion functions to serialise a ParquetFile into a protobuf
representation, and back again.

Adds randomised testing to assert round-trip equality.
pull/24376/head
Dom Dwyer 2023-08-23 15:35:47 +02:00
parent ae3f73f65e
commit 4c2945719a
2 changed files with 152 additions and 3 deletions
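
As a usage sketch (not part of this commit): the conversions added below support a hypothetical `round_trip` helper along these lines, using only the types and paths shown in the diff:

use generated_types::influxdata::iox::catalog::v1 as proto;

// Serialise a ParquetFile into its protobuf representation and back.
// Encoding is infallible; decoding validates the partition ID, object
// store UUID and compaction level, so it returns a Result.
fn round_trip(file: ParquetFile) -> Result<ParquetFile, ParquetFileProtoError> {
    let encoded = proto::ParquetFile::from(file);
    ParquetFile::try_from(encoded)
}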


@@ -15,6 +15,7 @@
unused_crate_dependencies
)]
use thiserror::Error;
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
@@ -639,6 +640,79 @@ impl ParquetFile {
}
}
impl From<ParquetFile> for generated_types::influxdata::iox::catalog::v1::ParquetFile {
fn from(v: ParquetFile) -> Self {
Self {
id: v.id.get(),
namespace_id: v.namespace_id.get(),
table_id: v.table_id.get(),
partition_identifier: Some(v.partition_id.into()),
object_store_id: v.object_store_id.to_string(),
min_time: v.min_time.get(),
max_time: v.max_time.get(),
to_delete: v.to_delete.map(|v| v.get()),
file_size_bytes: v.file_size_bytes,
row_count: v.row_count,
compaction_level: v.compaction_level as i32,
created_at: v.created_at.get(),
column_set: v.column_set.iter().map(|v| v.get()).collect(),
max_l0_created_at: v.max_l0_created_at.get(),
}
}
}
/// Errors deserialising a protobuf-serialised [`ParquetFile`].
#[derive(Debug, Error)]
pub enum ParquetFileProtoError {
/// The proto type does not contain a partition ID.
#[error("no partition id specified for parquet file")]
NoPartitionId,
/// The specified partition ID is invalid.
#[error(transparent)]
InvalidPartitionId(#[from] PartitionIdProtoError),
/// The specified object store UUID is invalid.
#[error("invalid object store ID: {0}")]
InvalidObjectStoreId(uuid::Error),
/// The specified compaction level value is invalid.
#[error("invalid compaction level: {0}")]
InvalidCompactionLevel(Box<dyn std::error::Error + Send + Sync + 'static>),
}
impl TryFrom<generated_types::influxdata::iox::catalog::v1::ParquetFile> for ParquetFile {
type Error = ParquetFileProtoError;
fn try_from(
v: generated_types::influxdata::iox::catalog::v1::ParquetFile,
) -> Result<Self, Self::Error> {
Ok(Self {
id: ParquetFileId::new(v.id),
namespace_id: NamespaceId::new(v.namespace_id),
table_id: TableId::new(v.table_id),
partition_id: TransitionPartitionId::try_from(
v.partition_identifier
.ok_or(ParquetFileProtoError::NoPartitionId)?,
)?,
object_store_id: v
.object_store_id
.parse()
.map_err(ParquetFileProtoError::InvalidObjectStoreId)?,
min_time: Timestamp::new(v.min_time),
max_time: Timestamp::new(v.max_time),
to_delete: v.to_delete.map(Timestamp::new),
file_size_bytes: v.file_size_bytes,
row_count: v.row_count,
compaction_level: CompactionLevel::try_from(v.compaction_level)
.map_err(ParquetFileProtoError::InvalidCompactionLevel)?,
created_at: Timestamp::new(v.created_at),
column_set: ColumnSet::new(v.column_set.into_iter().map(ColumnId::new)),
max_l0_created_at: Timestamp::new(v.max_l0_created_at),
})
}
}
/// Data for a parquet file to be inserted into the catalog.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParquetFileParams {
@@ -1624,10 +1698,12 @@ pub struct FileRange {
#[cfg(test)]
mod tests {
use std::borrow::Cow;

use super::*;
use ordered_float::OrderedFloat;
use proptest::{prelude::*, proptest};
#[test]
fn test_chunk_id_new() {
@@ -2635,4 +2711,77 @@ mod tests {
assert_eq!(tr.start(), 1);
assert_eq!(tr.end(), 1);
}
use crate::partition::tests::arbitrary_partition_id;
prop_compose! {
/// Return an arbitrary [`Timestamp`].
pub fn arbitrary_timestamp()(value in any::<i64>()) -> Timestamp {
Timestamp::new(value)
}
}
fn arbitrary_compaction_level() -> impl prop::strategy::Strategy<Value = CompactionLevel> {
prop_oneof![
Just(CompactionLevel::Initial),
Just(CompactionLevel::FileNonOverlapped),
Just(CompactionLevel::Final),
]
}
prop_compose! {
/// Return an arbitrary [`ParquetFile`] with randomised values.
fn arbitrary_parquet_file()(
partition_id in arbitrary_partition_id(),
parquet_file_id in any::<i64>(),
namespace_id in any::<i64>(),
table_id in any::<i64>(),
min_time in arbitrary_timestamp(),
max_time in arbitrary_timestamp(),
to_delete in prop::option::of(arbitrary_timestamp()),
file_size_bytes in any::<i64>(),
row_count in any::<i64>(),
compaction_level in arbitrary_compaction_level(),
created_at in arbitrary_timestamp(),
column_set in prop::collection::vec(any::<i64>(), 0..10),
max_l0_created_at in arbitrary_timestamp(),
) -> ParquetFile {
let column_set = ColumnSet::new(column_set.into_iter().map(ColumnId::new));
ParquetFile {
id: ParquetFileId::new(parquet_file_id),
namespace_id: NamespaceId::new(namespace_id),
table_id: TableId::new(table_id),
partition_id,
object_store_id: Uuid::new_v4(),
min_time,
max_time,
to_delete,
file_size_bytes,
row_count,
compaction_level,
created_at,
column_set,
max_l0_created_at,
}
}
}
proptest! {
/// Assert a [`ParquetFile`] is round-trippable through proto
/// serialisation.
#[test]
fn prop_parquet_file_proto_round_trip(file in arbitrary_parquet_file()) {
use generated_types::influxdata::iox::catalog::v1 as proto;
// Encoding a ParquetFile into its proto representation is infallible.
let encoded = proto::ParquetFile::from(file.clone());
// Decoding a valid proto ParquetFile is infallible.
let decoded = ParquetFile::try_from(encoded).unwrap();
// The deserialised value must match the input (round-trippable).
assert_eq!(decoded, file);
}
}
}
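
A minimal error-path sketch (not part of this commit), assuming the prost-generated proto types implement `Default` as prost messages normally do: a message with no `partition_identifier` set fails to decode with the new `NoPartitionId` variant, since that is the first validation performed by the `TryFrom` impl above.

use generated_types::influxdata::iox::catalog::v1 as proto;

// A default proto ParquetFile carries no partition_identifier, so
// decoding it must fail with ParquetFileProtoError::NoPartitionId.
let invalid = proto::ParquetFile::default();
assert!(matches!(
    ParquetFile::try_from(invalid),
    Err(ParquetFileProtoError::NoPartitionId)
));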


@@ -530,7 +530,7 @@ impl Partition {
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use assert_matches::assert_matches;
@@ -556,7 +556,7 @@ mod tests {
prop_compose! {
/// Return an arbitrary [`TransitionPartitionId`] with a randomised ID
/// value.
pub fn arbitrary_partition_id()(
use_hash in any::<bool>(),
row_id in any::<i64>(),
hash_id in any::<[u8; PARTITION_HASH_ID_SIZE_BYTES]>()