feat: Add WriteSummary serialization and deserialization to protobuf (#4232)

* feat: Add WriteSummary serialization and deserialization to protobuf

* fix: clippy

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-04-05 05:57:32 -04:00 committed by GitHub
parent 756116b497
commit 5d66cd0a81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 136 additions and 24 deletions

1
Cargo.lock generated
View File

@ -6991,6 +6991,7 @@ version = "0.1.0"
dependencies = [
"base64 0.13.0",
"data_types",
"data_types2",
"dml",
"generated_types",
"serde_json",

View File

@ -11,8 +11,8 @@ message WriteSummary {
// Per sequencer information aout what sequence numbers contain part of a write
message SequencerWrite {
// Unique sequencer ID.
uint32 sequencer_id = 1;
int32 sequencer_id = 1;
// Which sequence numbers for this sequencer had data
repeated uint64 sequence_numbers = 13;
repeated int64 sequence_numbers = 2;
}

View File

@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
# Workspace dependencies, in alphabetical order
data_types2 = { path = "../data_types2" }
dml = { path = "../dml" }
generated_types = { path = "../generated_types" }

View File

@ -1,5 +1,6 @@
use std::collections::BTreeMap;
use data_types2::{SequenceNumber, SequencerId};
/// Protobuf to/from conversion
use generated_types::influxdata::iox::write_summary::v1 as proto;
@ -14,15 +15,37 @@ use dml::DmlMeta;
///
/// This struct contains sufficient information to determine the
/// current state of the write as a whole
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, PartialEq)]
/// Summary of a Vec<Vec<DmlMeta>>
pub struct WriteSummary {
metas: Vec<Vec<DmlMeta>>,
/// Key is the sequencer, value is the sequence numbers from that sequencer.
/// Note: BTreeMap to ensure the output is in a consistent order
sequencers: BTreeMap<SequencerId, Vec<SequenceNumber>>,
}
impl WriteSummary {
pub fn new(metas: Vec<Vec<DmlMeta>>) -> Self {
Self { metas }
let sequences = metas
.iter()
.flat_map(|v| v.iter())
.filter_map(|meta| meta.sequence());
let mut sequencers = BTreeMap::new();
for s in sequences {
let sequencer_id: i16 = s.sequencer_id.try_into().expect("Invalid sequencer id");
let sequence_number: i64 = s
.sequence_number
.try_into()
.expect("Invalid sequencer number");
sequencers
.entry(SequencerId::new(sequencer_id))
.or_insert_with(Vec::new)
.push(SequenceNumber::new(sequence_number))
}
Self { sequencers }
}
/// Return an opaque summary "token" of this summary
@ -33,31 +56,37 @@ impl WriteSummary {
.expect("unexpected error serializing token to json"),
)
}
/// Return a WriteSummary from the "token" (created with [to_token]), or error if not possible
pub fn try_from_token(token: &str) -> Result<Self, String> {
let data = base64::decode(token)
.map_err(|e| format!("Invalid write token, invalid base64: {}", e))?;
let json = String::from_utf8(data)
.map_err(|e| format!("Invalid write token, non utf8 data in write token: {}", e))?;
let proto = serde_json::from_str::<proto::WriteSummary>(&json)
.map_err(|e| format!("Invalid write token, protobuf decode error: {}", e))?;
proto
.try_into()
.map_err(|e| format!("Invalid write token, invalid content: {}", e))
}
/// return what sequencer ids were present in this write summary
pub fn sequencer_ids(&self) -> Vec<SequencerId> {
self.sequencers.keys().cloned().collect()
}
}
impl From<WriteSummary> for proto::WriteSummary {
fn from(summary: WriteSummary) -> Self {
// create a map from sequencer_id to sequences
let sequences = summary
.metas
.iter()
.flat_map(|v| v.iter())
.filter_map(|meta| meta.sequence());
// Use BTreeMap to ensure consistent output
let mut sequencers = BTreeMap::new();
for s in sequences {
sequencers
.entry(s.sequencer_id)
.or_insert_with(Vec::new)
.push(s.sequence_number)
}
let sequencers = sequencers
let sequencers = summary
.sequencers
.into_iter()
.map(|(sequencer_id, sequence_numbers)| proto::SequencerWrite {
sequencer_id,
sequence_numbers,
sequencer_id: sequencer_id.get() as i32,
sequence_numbers: sequence_numbers.into_iter().map(|v| v.get()).collect(),
})
.collect();
@ -65,6 +94,36 @@ impl From<WriteSummary> for proto::WriteSummary {
}
}
impl TryFrom<proto::WriteSummary> for WriteSummary {
type Error = String;
fn try_from(summary: proto::WriteSummary) -> Result<Self, Self::Error> {
let sequencers = summary
.sequencers
.into_iter()
.map(
|proto::SequencerWrite {
sequencer_id,
sequence_numbers,
}| {
let sequencer_id = sequencer_id.try_into().map_err(|e| {
format!("Invalid sequencer id {} in proto: {}", sequencer_id, e)
})?;
let sequence_numbers = sequence_numbers
.into_iter()
.map(SequenceNumber::new)
.collect::<Vec<_>>();
Ok((SequencerId::new(sequencer_id), sequence_numbers))
},
)
.collect::<Result<BTreeMap<_, _>, String>>()?;
Ok(Self { sequencers })
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -188,6 +247,57 @@ mod tests {
);
}
#[test]
fn token_parsing() {
let metas = vec![vec![make_meta(Sequence::new(1, 2))]];
let summary = WriteSummary::new(metas);
let token = summary.clone().to_token();
// round trip should parse to the same summary
let new_summary = WriteSummary::try_from_token(&token).expect("parsing successful");
assert_eq!(summary, new_summary);
}
#[test]
#[should_panic(expected = "Invalid write token, invalid base64")]
fn token_parsing_bad_base64() {
let token = "foo%%";
WriteSummary::try_from_token(token).unwrap();
}
#[test]
#[should_panic(expected = "Invalid write token, non utf8 data in write token")]
fn token_parsing_bad_utf8() {
let token = base64::encode(vec![0xa0, 0xa1]);
WriteSummary::try_from_token(&token).unwrap();
}
#[test]
#[should_panic(expected = "Invalid write token, protobuf decode error: key must be a string")]
fn token_parsing_bad_proto() {
let token = base64::encode("{not_valid_json}");
WriteSummary::try_from_token(&token).unwrap();
}
#[test]
#[should_panic(
expected = "Invalid write token, invalid content: Invalid sequencer id 2147483647 in proto"
)]
fn token_parsing_bad_sequencer() {
// construct a message with sequencer id that can not be converted into i16
let bad_proto = proto::WriteSummary {
sequencers: vec![proto::SequencerWrite {
sequencer_id: i32::MAX,
sequence_numbers: vec![2],
}],
};
let token = base64::encode(serde_json::to_string(&bad_proto).unwrap());
WriteSummary::try_from_token(&token).unwrap();
}
fn make_meta(s: Sequence) -> DmlMeta {
use time::TimeProvider;
let time_provider = time::SystemProvider::new();