diff --git a/internal_types/src/data.rs b/internal_types/src/data.rs index 3219ad7e90..f5eb24ef59 100644 --- a/internal_types/src/data.rs +++ b/internal_types/src/data.rs @@ -6,12 +6,17 @@ use data_types::{database_rules::Partitioner, server_id::ServerId}; use generated_types::wb; use influxdb_line_protocol::{FieldValue, ParsedLine}; -use std::{collections::BTreeMap, convert::TryFrom, fmt}; +use std::{ + collections::BTreeMap, + convert::{TryFrom, TryInto}, + fmt, +}; use chrono::Utc; use crc32fast::Hasher; use flatbuffers::FlatBufferBuilder; use ouroboros::self_referencing; +use snafu::{ResultExt, Snafu}; pub fn type_description(value: wb::ColumnValue) -> &'static str { match value { @@ -37,7 +42,6 @@ pub struct ReplicatedWrite { #[borrows(data)] #[covariant] write_buffer_batch: Option>, - server_id: ServerId, } impl ReplicatedWrite { @@ -63,7 +67,10 @@ impl ReplicatedWrite { } pub fn server_id(&self) -> ServerId { - *self.borrow_server_id() + self.fb() + .server_id() + .try_into() + .expect("ServerId should have been validated when this was built from flatbuffers") } /// Returns the serialized bytes for the write @@ -78,24 +85,46 @@ impl ReplicatedWrite { } } -impl TryFrom<(Vec, ServerId)> for ReplicatedWrite { - type Error = flatbuffers::InvalidFlatbuffer; +#[derive(Debug, Snafu)] +pub enum ReplicatedWriteError { + #[snafu(display("{}", source))] + InvalidFlatbuffer { + source: flatbuffers::InvalidFlatbuffer, + }, + #[snafu(display("{}", source))] + InvalidServerId { + source: data_types::server_id::Error, + }, +} - fn try_from(data: (Vec, ServerId)) -> Result { +impl TryFrom> for ReplicatedWrite { + type Error = ReplicatedWriteError; + + fn try_from(data: Vec) -> Result { ReplicatedWriteTryBuilder { - data: data.0, - fb_builder: |data| flatbuffers::root::>(data), + data, + fb_builder: |data| { + let fb = flatbuffers::root::>(data) + .context(InvalidFlatbuffer)?; + + // Raise an error now if the server ID is invalid so that `SequencedEntry`'s + // `server_id` method can assume it has a valid `ServerId` + TryInto::::try_into(fb.server_id()).context(InvalidServerId)?; + + Ok(fb) + }, write_buffer_batch_builder: |data| match flatbuffers::root::>( data, - )? + ) + .context(InvalidFlatbuffer)? .payload() { - Some(payload) => Ok(Some(flatbuffers::root::>( - &payload, - )?)), + Some(payload) => Ok(Some( + flatbuffers::root::>(&payload) + .context(InvalidFlatbuffer)?, + )), None => Ok(None), }, - server_id: data.1, } .try_build() } @@ -210,7 +239,7 @@ pub fn lines_to_replicated_write( fbb.finish(write, None); let (mut data, idx) = fbb.collapse(); - ReplicatedWrite::try_from((data.split_off(idx), server_id)) + ReplicatedWrite::try_from(data.split_off(idx)) .expect("Flatbuffer data just constructed should be valid") } diff --git a/internal_types/src/entry.rs b/internal_types/src/entry.rs index 02a5b84fad..83bc23567c 100644 --- a/internal_types/src/entry.rs +++ b/internal_types/src/entry.rs @@ -9,7 +9,10 @@ use data_types::{ use generated_types::entry as entry_fb; use influxdb_line_protocol::{FieldValue, ParsedLine}; -use std::{collections::BTreeMap, convert::TryFrom}; +use std::{ + collections::BTreeMap, + convert::{TryFrom, TryInto}, +}; use chrono::{DateTime, Utc}; use flatbuffers::{FlatBufferBuilder, Follow, ForwardsUOffset, Vector, VectorIter, WIPOffset}; @@ -1183,7 +1186,6 @@ pub struct SequencedEntry { #[borrows(data)] #[covariant] entry: Option>, - server_id: ServerId, } impl SequencedEntry { @@ -1214,7 +1216,7 @@ impl SequencedEntry { fbb.finish(sequenced_entry, None); let (mut data, idx) = fbb.collapse(); - let sequenced_entry = Self::try_from((data.split_off(idx), server_id)) + let sequenced_entry = Self::try_from(data.split_off(idx)) .expect("Flatbuffer data just constructed should be valid"); Ok(sequenced_entry) @@ -1243,24 +1245,49 @@ impl SequencedEntry { } pub fn server_id(&self) -> ServerId { - *self.borrow_server_id() + self.fb() + .server_id() + .try_into() + .expect("ServerId should have been validated when this was built from flatbuffers") } } -impl TryFrom<(Vec, ServerId)> for SequencedEntry { - type Error = flatbuffers::InvalidFlatbuffer; +#[derive(Debug, Snafu)] +pub enum SequencedEntryError { + #[snafu(display("{}", source))] + InvalidFlatbuffer { + source: flatbuffers::InvalidFlatbuffer, + }, + #[snafu(display("{}", source))] + InvalidServerId { + source: data_types::server_id::Error, + }, +} - fn try_from(data: (Vec, ServerId)) -> Result { +impl TryFrom> for SequencedEntry { + type Error = SequencedEntryError; + + fn try_from(data: Vec) -> Result { SequencedEntryTryBuilder { - server_id: data.1, - data: data.0, - fb_builder: |data| flatbuffers::root::>(data), - entry_builder: |data| match flatbuffers::root::>(data)? + data, + fb_builder: |data| { + let fb = flatbuffers::root::>(data) + .context(InvalidFlatbuffer)?; + + // Raise an error now if the server ID is invalid so that `SequencedEntry`'s + // `server_id` method can assume it has a valid `ServerId` + TryInto::::try_into(fb.server_id()).context(InvalidServerId)?; + + Ok(fb) + }, + entry_builder: |data| match flatbuffers::root::>(data) + .context(InvalidFlatbuffer)? .entry_bytes() { - Some(entry_bytes) => Ok(Some(flatbuffers::root::>( - &entry_bytes, - )?)), + Some(entry_bytes) => Ok(Some( + flatbuffers::root::>(&entry_bytes) + .context(InvalidFlatbuffer)?, + )), None => Ok(None), }, } @@ -1958,4 +1985,44 @@ mod tests { let values = col.values().f64_values().unwrap(); assert_eq!(&values, &[None, Some(23.2), None]); } + + #[test] + fn validate_sequenced_entry_server_id() { + let lp = vec![ + "a,host=a val=23i 983", + "a,host=a,region=west val2=23.2 2343", + "a val=21i,bool=true,string=\"hello\" 222", + ] + .join("\n"); + let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect(); + + let sharded_entries = + lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap(); + + let entry_bytes = sharded_entries.first().unwrap().entry.data(); + + const OVERHEAD: usize = 4 * std::mem::size_of::(); + let mut fbb = FlatBufferBuilder::new_with_capacity(entry_bytes.len() + OVERHEAD); + + let entry_bytes = fbb.create_vector_direct(entry_bytes); + let sequenced_entry = entry_fb::SequencedEntry::create( + &mut fbb, + &entry_fb::SequencedEntryArgs { + clock_value: 3, + server_id: 0, // <----- IMPORTANT PART this is invalid and should error + entry_bytes: Some(entry_bytes), + }, + ); + + fbb.finish(sequenced_entry, None); + + let (mut data, idx) = fbb.collapse(); + let result = SequencedEntry::try_from(data.split_off(idx)); + + assert!( + matches!(result, Err(SequencedEntryError::InvalidServerId { .. })), + "result was {:?}", + result + ); + } } diff --git a/server/src/buffer.rs b/server/src/buffer.rs index f38b871d75..a9f3e4babe 100644 --- a/server/src/buffer.rs +++ b/server/src/buffer.rs @@ -49,11 +49,6 @@ pub enum Error { #[snafu(display("segment id must be between [1, 1,000,000,000)"))] SegmentIdOutOfBounds, - #[snafu(display("Server ID must not be 0: {}", source))] - ServerIdInvalid { - source: data_types::server_id::Error, - }, - #[snafu(display("unable to compress segment id {}: {}", segment_id, source))] UnableToCompressData { segment_id: u64, @@ -76,6 +71,11 @@ pub enum Error { source: flatbuffers::InvalidFlatbuffer, }, + #[snafu(display("Invalid ReplicatedWrite: {}", source))] + InvalidReplicatedWrite { + source: internal_types::data::ReplicatedWriteError, + }, + #[snafu(display("the flatbuffers size is too small; only found {} bytes", bytes))] FlatbuffersSegmentTooSmall { bytes: usize }, @@ -520,14 +520,12 @@ impl Segment { .writes() .context(FlatbuffersMissingField { field: "writes" })?; let mut segment = Self::new_with_capacity(fb_segment.id(), writes.len()); - let server_id = ServerId::try_from(fb_segment.server_id()).context(ServerIdInvalid)?; for w in writes { let data = w .payload() .context(FlatbuffersMissingField { field: "payload" })? .to_vec(); - let rw = - ReplicatedWrite::try_from((data, server_id)).context(InvalidFlatbuffersSegment)?; + let rw = ReplicatedWrite::try_from(data).context(InvalidReplicatedWrite)?; segment.append(Arc::new(rw))?; }