feat: Implement deserializing IoxMetadata from protobuf (#3589)
Fixes #3587. Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
8c5f12cd99
commit
62a2ad289b
|
@ -243,6 +243,9 @@ pub enum Error {
|
||||||
CannotDecodeChunkId {
|
CannotDecodeChunkId {
|
||||||
source: data_types::chunk_metadata::ChunkIdConversionError,
|
source: data_types::chunk_metadata::ChunkIdConversionError,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
#[snafu(display("Cannot parse UUID: {}", source))]
|
||||||
|
UuidParse { source: uuid::Error },
|
||||||
}
|
}
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
|
||||||
|
@ -532,12 +535,72 @@ impl IoxMetadata {
|
||||||
Ok(buf)
|
Ok(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read from protobuf message
|
||||||
|
#[allow(dead_code)]
|
||||||
|
fn from_protobuf(data: &[u8]) -> Result<Self> {
|
||||||
|
// extract protobuf message from bytes
|
||||||
|
let proto_msg = proto::IoxMetadata::decode(data)
|
||||||
|
.map_err(|err| Box::new(err) as _)
|
||||||
|
.context(IoxMetadataBrokenSnafu)?;
|
||||||
|
|
||||||
|
// extract creation timestamp
|
||||||
|
let creation_timestamp =
|
||||||
|
decode_timestamp_from_field(proto_msg.creation_timestamp, "creation_timestamp")?;
|
||||||
|
// extract time of first write
|
||||||
|
let time_of_first_write =
|
||||||
|
decode_timestamp_from_field(proto_msg.time_of_first_write, "time_of_first_write")?;
|
||||||
|
// extract time of last write
|
||||||
|
let time_of_last_write =
|
||||||
|
decode_timestamp_from_field(proto_msg.time_of_last_write, "time_of_last_write")?;
|
||||||
|
|
||||||
|
// extract strings
|
||||||
|
let namespace_name = Arc::from(proto_msg.namespace_name.as_ref());
|
||||||
|
let table_name = Arc::from(proto_msg.table_name.as_ref());
|
||||||
|
let partition_key = Arc::from(proto_msg.partition_key.as_ref());
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
object_store_id: parse_uuid(&proto_msg.object_store_id)?.ok_or_else(|| {
|
||||||
|
Error::IoxMetadataFieldMissing {
|
||||||
|
field: "object_store_id".to_string(),
|
||||||
|
}
|
||||||
|
})?,
|
||||||
|
creation_timestamp,
|
||||||
|
namespace_id: NamespaceId::new(proto_msg.namespace_id),
|
||||||
|
namespace_name,
|
||||||
|
sequencer_id: SequencerId::new(
|
||||||
|
proto_msg
|
||||||
|
.sequencer_id
|
||||||
|
.try_into()
|
||||||
|
.map_err(|err| Box::new(err) as _)
|
||||||
|
.context(IoxMetadataBrokenSnafu)?,
|
||||||
|
),
|
||||||
|
table_id: TableId::new(proto_msg.table_id),
|
||||||
|
table_name,
|
||||||
|
partition_id: PartitionId::new(proto_msg.partition_id),
|
||||||
|
partition_key,
|
||||||
|
time_of_first_write,
|
||||||
|
time_of_last_write,
|
||||||
|
min_sequence_number: SequenceNumber::new(proto_msg.min_sequence_number),
|
||||||
|
max_sequence_number: SequenceNumber::new(proto_msg.max_sequence_number),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// verify uuid
|
/// verify uuid
|
||||||
pub fn match_object_store_id(&self, uuid: Uuid) -> bool {
|
pub fn match_object_store_id(&self, uuid: Uuid) -> bool {
|
||||||
uuid == self.object_store_id
|
uuid == self.object_store_id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Parse big-endian UUID from protobuf.
|
||||||
|
pub fn parse_uuid(bytes: &[u8]) -> Result<Option<Uuid>> {
|
||||||
|
if bytes.is_empty() {
|
||||||
|
Ok(None)
|
||||||
|
} else {
|
||||||
|
let uuid = Uuid::from_slice(bytes).context(UuidParseSnafu {})?;
|
||||||
|
Ok(Some(uuid))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn decode_timestamp_from_field(
|
fn decode_timestamp_from_field(
|
||||||
value: Option<pbjson_types::Timestamp>,
|
value: Option<pbjson_types::Timestamp>,
|
||||||
field: &'static str,
|
field: &'static str,
|
||||||
|
@ -1111,4 +1174,30 @@ mod tests {
|
||||||
let parquet_metadata = chunk.parquet_metadata();
|
let parquet_metadata = chunk.parquet_metadata();
|
||||||
assert_eq!(parquet_metadata.size(), 3719);
|
assert_eq!(parquet_metadata.size(), 3719);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn iox_metadata_protobuf_round_trip() {
|
||||||
|
let object_store_id = Uuid::new_v4();
|
||||||
|
let iox_metadata = IoxMetadata {
|
||||||
|
object_store_id,
|
||||||
|
creation_timestamp: Time::from_timestamp(3234, 0),
|
||||||
|
namespace_id: NamespaceId::new(2),
|
||||||
|
namespace_name: Arc::from("hi"),
|
||||||
|
sequencer_id: SequencerId::new(1),
|
||||||
|
table_id: TableId::new(3),
|
||||||
|
table_name: Arc::from("weather"),
|
||||||
|
partition_id: PartitionId::new(4),
|
||||||
|
partition_key: Arc::from("part"),
|
||||||
|
time_of_first_write: Time::from_timestamp(3234, 0),
|
||||||
|
time_of_last_write: Time::from_timestamp(3234, 3456),
|
||||||
|
min_sequence_number: SequenceNumber::new(5),
|
||||||
|
max_sequence_number: SequenceNumber::new(6),
|
||||||
|
};
|
||||||
|
|
||||||
|
let proto = iox_metadata.to_protobuf().unwrap();
|
||||||
|
|
||||||
|
let iox_metadata_again = IoxMetadata::from_protobuf(&proto).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(iox_metadata, iox_metadata_again);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue