diff --git a/Cargo.lock b/Cargo.lock index 556b5fb77d..ebc711939a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6991,6 +6991,7 @@ version = "0.1.0" dependencies = [ "base64 0.13.0", "data_types", + "data_types2", "dml", "generated_types", "serde_json", diff --git a/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto b/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto index f539001e15..3ea7b49140 100644 --- a/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto +++ b/generated_types/protos/influxdata/iox/write_summary/v1/write_summary.proto @@ -11,8 +11,8 @@ message WriteSummary { // Per sequencer information aout what sequence numbers contain part of a write message SequencerWrite { // Unique sequencer ID. - uint32 sequencer_id = 1; + int32 sequencer_id = 1; // Which sequence numbers for this sequencer had data - repeated uint64 sequence_numbers = 13; + repeated int64 sequence_numbers = 2; } diff --git a/write_summary/Cargo.toml b/write_summary/Cargo.toml index d776e6e2a5..351bdfccec 100644 --- a/write_summary/Cargo.toml +++ b/write_summary/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] # Workspace dependencies, in alphabetical order +data_types2 = { path = "../data_types2" } dml = { path = "../dml" } generated_types = { path = "../generated_types" } diff --git a/write_summary/src/lib.rs b/write_summary/src/lib.rs index 3dfc2ab33c..f155b2ac78 100644 --- a/write_summary/src/lib.rs +++ b/write_summary/src/lib.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use data_types2::{SequenceNumber, SequencerId}; /// Protobuf to/from conversion use generated_types::influxdata::iox::write_summary::v1 as proto; @@ -14,15 +15,37 @@ use dml::DmlMeta; /// /// This struct contains sufficient information to determine the /// current state of the write as a whole -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, PartialEq)] /// Summary of a Vec> pub struct WriteSummary { - metas: Vec>, + /// Key is the sequencer, value is the sequence numbers from that sequencer. + /// Note: BTreeMap to ensure the output is in a consistent order + sequencers: BTreeMap>, } impl WriteSummary { pub fn new(metas: Vec>) -> Self { - Self { metas } + let sequences = metas + .iter() + .flat_map(|v| v.iter()) + .filter_map(|meta| meta.sequence()); + + let mut sequencers = BTreeMap::new(); + for s in sequences { + let sequencer_id: i16 = s.sequencer_id.try_into().expect("Invalid sequencer id"); + + let sequence_number: i64 = s + .sequence_number + .try_into() + .expect("Invalid sequencer number"); + + sequencers + .entry(SequencerId::new(sequencer_id)) + .or_insert_with(Vec::new) + .push(SequenceNumber::new(sequence_number)) + } + + Self { sequencers } } /// Return an opaque summary "token" of this summary @@ -33,31 +56,37 @@ impl WriteSummary { .expect("unexpected error serializing token to json"), ) } + + /// Return a WriteSummary from the "token" (created with [to_token]), or error if not possible + pub fn try_from_token(token: &str) -> Result { + let data = base64::decode(token) + .map_err(|e| format!("Invalid write token, invalid base64: {}", e))?; + + let json = String::from_utf8(data) + .map_err(|e| format!("Invalid write token, non utf8 data in write token: {}", e))?; + + let proto = serde_json::from_str::(&json) + .map_err(|e| format!("Invalid write token, protobuf decode error: {}", e))?; + + proto + .try_into() + .map_err(|e| format!("Invalid write token, invalid content: {}", e)) + } + + /// return what sequencer ids were present in this write summary + pub fn sequencer_ids(&self) -> Vec { + self.sequencers.keys().cloned().collect() + } } impl From for proto::WriteSummary { fn from(summary: WriteSummary) -> Self { - // create a map from sequencer_id to sequences - let sequences = summary - .metas - .iter() - .flat_map(|v| v.iter()) - .filter_map(|meta| meta.sequence()); - - // Use BTreeMap to ensure consistent output - let mut sequencers = BTreeMap::new(); - for s in sequences { - sequencers - .entry(s.sequencer_id) - .or_insert_with(Vec::new) - .push(s.sequence_number) - } - - let sequencers = sequencers + let sequencers = summary + .sequencers .into_iter() .map(|(sequencer_id, sequence_numbers)| proto::SequencerWrite { - sequencer_id, - sequence_numbers, + sequencer_id: sequencer_id.get() as i32, + sequence_numbers: sequence_numbers.into_iter().map(|v| v.get()).collect(), }) .collect(); @@ -65,6 +94,36 @@ impl From for proto::WriteSummary { } } +impl TryFrom for WriteSummary { + type Error = String; + + fn try_from(summary: proto::WriteSummary) -> Result { + let sequencers = summary + .sequencers + .into_iter() + .map( + |proto::SequencerWrite { + sequencer_id, + sequence_numbers, + }| { + let sequencer_id = sequencer_id.try_into().map_err(|e| { + format!("Invalid sequencer id {} in proto: {}", sequencer_id, e) + })?; + + let sequence_numbers = sequence_numbers + .into_iter() + .map(SequenceNumber::new) + .collect::>(); + + Ok((SequencerId::new(sequencer_id), sequence_numbers)) + }, + ) + .collect::, String>>()?; + + Ok(Self { sequencers }) + } +} + #[cfg(test)] mod tests { use super::*; @@ -188,6 +247,57 @@ mod tests { ); } + #[test] + fn token_parsing() { + let metas = vec![vec![make_meta(Sequence::new(1, 2))]]; + let summary = WriteSummary::new(metas); + + let token = summary.clone().to_token(); + + // round trip should parse to the same summary + let new_summary = WriteSummary::try_from_token(&token).expect("parsing successful"); + assert_eq!(summary, new_summary); + } + + #[test] + #[should_panic(expected = "Invalid write token, invalid base64")] + fn token_parsing_bad_base64() { + let token = "foo%%"; + WriteSummary::try_from_token(token).unwrap(); + } + + #[test] + #[should_panic(expected = "Invalid write token, non utf8 data in write token")] + fn token_parsing_bad_utf8() { + let token = base64::encode(vec![0xa0, 0xa1]); + WriteSummary::try_from_token(&token).unwrap(); + } + + #[test] + #[should_panic(expected = "Invalid write token, protobuf decode error: key must be a string")] + fn token_parsing_bad_proto() { + let token = base64::encode("{not_valid_json}"); + WriteSummary::try_from_token(&token).unwrap(); + } + + #[test] + #[should_panic( + expected = "Invalid write token, invalid content: Invalid sequencer id 2147483647 in proto" + )] + fn token_parsing_bad_sequencer() { + // construct a message with sequencer id that can not be converted into i16 + let bad_proto = proto::WriteSummary { + sequencers: vec![proto::SequencerWrite { + sequencer_id: i32::MAX, + sequence_numbers: vec![2], + }], + }; + + let token = base64::encode(serde_json::to_string(&bad_proto).unwrap()); + + WriteSummary::try_from_token(&token).unwrap(); + } + fn make_meta(s: Sequence) -> DmlMeta { use time::TimeProvider; let time_provider = time::SystemProvider::new();