Merge pull request #8553 from influxdata/dom/partition-id-proto
feat: proto serialisation of TransitionPartitionIdpull/24376/head
commit
cdd20a2337
|
@ -98,6 +98,61 @@ impl TransitionPartitionId {
|
|||
}
|
||||
}
|
||||
|
||||
/// Errors deserialising protobuf representations of [`TransitionPartitionId`].
|
||||
#[derive(Debug, Error)]
|
||||
pub enum PartitionIdProtoError {
|
||||
/// The proto type does not contain an ID.
|
||||
#[error("no id specified for partition id")]
|
||||
NoId,
|
||||
|
||||
/// The specified hash ID is invalid.
|
||||
#[error(transparent)]
|
||||
InvalidHashId(#[from] PartitionHashIdError),
|
||||
}
|
||||
|
||||
/// Serialise a [`TransitionPartitionId`] to a protobuf representation.
|
||||
impl From<TransitionPartitionId>
|
||||
for generated_types::influxdata::iox::catalog::v1::PartitionIdentifier
|
||||
{
|
||||
fn from(value: TransitionPartitionId) -> Self {
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
match value {
|
||||
TransitionPartitionId::Deprecated(id) => proto::PartitionIdentifier {
|
||||
id: Some(proto::partition_identifier::Id::CatalogId(id.get())),
|
||||
},
|
||||
TransitionPartitionId::Deterministic(hash) => proto::PartitionIdentifier {
|
||||
id: Some(proto::partition_identifier::Id::HashId(
|
||||
hash.as_bytes().to_owned(),
|
||||
)),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Deserialise a [`TransitionPartitionId`] from a protobuf representation.
|
||||
impl TryFrom<generated_types::influxdata::iox::catalog::v1::PartitionIdentifier>
|
||||
for TransitionPartitionId
|
||||
{
|
||||
type Error = PartitionIdProtoError;
|
||||
|
||||
fn try_from(
|
||||
value: generated_types::influxdata::iox::catalog::v1::PartitionIdentifier,
|
||||
) -> Result<Self, Self::Error> {
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
|
||||
let id = value.id.ok_or(PartitionIdProtoError::NoId)?;
|
||||
|
||||
Ok(match id {
|
||||
proto::partition_identifier::Id::CatalogId(v) => {
|
||||
TransitionPartitionId::Deprecated(PartitionId::new(v))
|
||||
}
|
||||
proto::partition_identifier::Id::HashId(hash) => {
|
||||
TransitionPartitionId::Deterministic(PartitionHashId::try_from(hash.as_slice())?)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Unique ID for a `Partition`
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, sqlx::Type, sqlx::FromRow)]
|
||||
#[sqlx(transparent)]
|
||||
|
@ -477,6 +532,8 @@ impl Partition {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
use assert_matches::assert_matches;
|
||||
use proptest::{prelude::*, proptest};
|
||||
|
||||
/// A fixture test asserting the deterministic partition ID generation
|
||||
|
@ -496,14 +553,27 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
prop_compose! {
|
||||
/// Return an arbitrary [`TransitionPartitionId`] with a randomised ID
|
||||
/// value.
|
||||
fn arbitrary_partition_id()(
|
||||
use_hash in any::<bool>(),
|
||||
row_id in any::<i64>(),
|
||||
hash_id in any::<[u8; PARTITION_HASH_ID_SIZE_BYTES]>()
|
||||
) -> TransitionPartitionId {
|
||||
match use_hash {
|
||||
true => TransitionPartitionId::Deterministic(PartitionHashId(hash_id.into())),
|
||||
false => TransitionPartitionId::Deprecated(PartitionId::new(row_id)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
proptest! {
|
||||
#[test]
|
||||
fn partition_hash_id_representations(
|
||||
table_id in 0..i64::MAX,
|
||||
partition_key in any::<String>(),
|
||||
partition_key in ".+",
|
||||
) {
|
||||
prop_assume!(!partition_key.is_empty());
|
||||
|
||||
let table_id = TableId::new(table_id);
|
||||
let partition_key = PartitionKey::from(partition_key);
|
||||
|
||||
|
@ -536,5 +606,47 @@ mod tests {
|
|||
let from_string = PartitionHashId::try_from(&bytes_from_string[..]).unwrap();
|
||||
assert_eq!(from_string, partition_hash_id);
|
||||
}
|
||||
|
||||
/// Assert a [`TransitionPartitionId`] is round-trippable through proto
|
||||
/// serialisation.
|
||||
#[test]
|
||||
fn prop_partition_id_proto_round_trip(id in arbitrary_partition_id()) {
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
|
||||
// Encoding is infallible
|
||||
let encoded = proto::PartitionIdentifier::from(id.clone());
|
||||
|
||||
// Decoding a valid ID is infallible.
|
||||
let decoded = TransitionPartitionId::try_from(encoded).unwrap();
|
||||
|
||||
// The deserialised value must match the input (round trippable)
|
||||
assert_eq!(decoded, id);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_proto_no_id() {
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
|
||||
let msg = proto::PartitionIdentifier { id: None };
|
||||
|
||||
assert_matches!(
|
||||
TransitionPartitionId::try_from(msg),
|
||||
Err(PartitionIdProtoError::NoId)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_proto_bad_hash() {
|
||||
use generated_types::influxdata::iox::catalog::v1 as proto;
|
||||
|
||||
let msg = proto::PartitionIdentifier {
|
||||
id: Some(proto::partition_identifier::Id::HashId(vec![42])),
|
||||
};
|
||||
|
||||
assert_matches!(
|
||||
TransitionPartitionId::try_from(msg),
|
||||
Err(PartitionIdProtoError::InvalidHashId(_))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue