fix: Validate ServerId when creating structs from flatbuffers
When we get the flatbuffers, we won't have the server ID in addition to the flatbuffers-- it's in the flatbuffers. But we want to validate the `ServerId` once when the `SequencedEntry` is created so that its `server_id` method can assume it has a valid `ServerId`.pull/24376/head
parent
b6447a1363
commit
2f4d7189ff
|
@ -6,12 +6,17 @@ use data_types::{database_rules::Partitioner, server_id::ServerId};
|
|||
use generated_types::wb;
|
||||
use influxdb_line_protocol::{FieldValue, ParsedLine};
|
||||
|
||||
use std::{collections::BTreeMap, convert::TryFrom, fmt};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
convert::{TryFrom, TryInto},
|
||||
fmt,
|
||||
};
|
||||
|
||||
use chrono::Utc;
|
||||
use crc32fast::Hasher;
|
||||
use flatbuffers::FlatBufferBuilder;
|
||||
use ouroboros::self_referencing;
|
||||
use snafu::{ResultExt, Snafu};
|
||||
|
||||
pub fn type_description(value: wb::ColumnValue) -> &'static str {
|
||||
match value {
|
||||
|
@ -37,7 +42,6 @@ pub struct ReplicatedWrite {
|
|||
#[borrows(data)]
|
||||
#[covariant]
|
||||
write_buffer_batch: Option<wb::WriteBufferBatch<'this>>,
|
||||
server_id: ServerId,
|
||||
}
|
||||
|
||||
impl ReplicatedWrite {
|
||||
|
@ -63,7 +67,10 @@ impl ReplicatedWrite {
|
|||
}
|
||||
|
||||
pub fn server_id(&self) -> ServerId {
|
||||
*self.borrow_server_id()
|
||||
self.fb()
|
||||
.server_id()
|
||||
.try_into()
|
||||
.expect("ServerId should have been validated when this was built from flatbuffers")
|
||||
}
|
||||
|
||||
/// Returns the serialized bytes for the write
|
||||
|
@ -78,24 +85,46 @@ impl ReplicatedWrite {
|
|||
}
|
||||
}
|
||||
|
||||
impl TryFrom<(Vec<u8>, ServerId)> for ReplicatedWrite {
|
||||
type Error = flatbuffers::InvalidFlatbuffer;
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum ReplicatedWriteError {
|
||||
#[snafu(display("{}", source))]
|
||||
InvalidFlatbuffer {
|
||||
source: flatbuffers::InvalidFlatbuffer,
|
||||
},
|
||||
#[snafu(display("{}", source))]
|
||||
InvalidServerId {
|
||||
source: data_types::server_id::Error,
|
||||
},
|
||||
}
|
||||
|
||||
fn try_from(data: (Vec<u8>, ServerId)) -> Result<Self, Self::Error> {
|
||||
impl TryFrom<Vec<u8>> for ReplicatedWrite {
|
||||
type Error = ReplicatedWriteError;
|
||||
|
||||
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
|
||||
ReplicatedWriteTryBuilder {
|
||||
data: data.0,
|
||||
fb_builder: |data| flatbuffers::root::<wb::ReplicatedWrite<'_>>(data),
|
||||
data,
|
||||
fb_builder: |data| {
|
||||
let fb = flatbuffers::root::<wb::ReplicatedWrite<'_>>(data)
|
||||
.context(InvalidFlatbuffer)?;
|
||||
|
||||
// Raise an error now if the server ID is invalid so that `SequencedEntry`'s
|
||||
// `server_id` method can assume it has a valid `ServerId`
|
||||
TryInto::<ServerId>::try_into(fb.server_id()).context(InvalidServerId)?;
|
||||
|
||||
Ok(fb)
|
||||
},
|
||||
write_buffer_batch_builder: |data| match flatbuffers::root::<wb::ReplicatedWrite<'_>>(
|
||||
data,
|
||||
)?
|
||||
)
|
||||
.context(InvalidFlatbuffer)?
|
||||
.payload()
|
||||
{
|
||||
Some(payload) => Ok(Some(flatbuffers::root::<wb::WriteBufferBatch<'_>>(
|
||||
&payload,
|
||||
)?)),
|
||||
Some(payload) => Ok(Some(
|
||||
flatbuffers::root::<wb::WriteBufferBatch<'_>>(&payload)
|
||||
.context(InvalidFlatbuffer)?,
|
||||
)),
|
||||
None => Ok(None),
|
||||
},
|
||||
server_id: data.1,
|
||||
}
|
||||
.try_build()
|
||||
}
|
||||
|
@ -210,7 +239,7 @@ pub fn lines_to_replicated_write(
|
|||
fbb.finish(write, None);
|
||||
|
||||
let (mut data, idx) = fbb.collapse();
|
||||
ReplicatedWrite::try_from((data.split_off(idx), server_id))
|
||||
ReplicatedWrite::try_from(data.split_off(idx))
|
||||
.expect("Flatbuffer data just constructed should be valid")
|
||||
}
|
||||
|
||||
|
|
|
@ -9,7 +9,10 @@ use data_types::{
|
|||
use generated_types::entry as entry_fb;
|
||||
use influxdb_line_protocol::{FieldValue, ParsedLine};
|
||||
|
||||
use std::{collections::BTreeMap, convert::TryFrom};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
convert::{TryFrom, TryInto},
|
||||
};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use flatbuffers::{FlatBufferBuilder, Follow, ForwardsUOffset, Vector, VectorIter, WIPOffset};
|
||||
|
@ -1183,7 +1186,6 @@ pub struct SequencedEntry {
|
|||
#[borrows(data)]
|
||||
#[covariant]
|
||||
entry: Option<entry_fb::Entry<'this>>,
|
||||
server_id: ServerId,
|
||||
}
|
||||
|
||||
impl SequencedEntry {
|
||||
|
@ -1214,7 +1216,7 @@ impl SequencedEntry {
|
|||
fbb.finish(sequenced_entry, None);
|
||||
|
||||
let (mut data, idx) = fbb.collapse();
|
||||
let sequenced_entry = Self::try_from((data.split_off(idx), server_id))
|
||||
let sequenced_entry = Self::try_from(data.split_off(idx))
|
||||
.expect("Flatbuffer data just constructed should be valid");
|
||||
|
||||
Ok(sequenced_entry)
|
||||
|
@ -1243,24 +1245,49 @@ impl SequencedEntry {
|
|||
}
|
||||
|
||||
pub fn server_id(&self) -> ServerId {
|
||||
*self.borrow_server_id()
|
||||
self.fb()
|
||||
.server_id()
|
||||
.try_into()
|
||||
.expect("ServerId should have been validated when this was built from flatbuffers")
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<(Vec<u8>, ServerId)> for SequencedEntry {
|
||||
type Error = flatbuffers::InvalidFlatbuffer;
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum SequencedEntryError {
|
||||
#[snafu(display("{}", source))]
|
||||
InvalidFlatbuffer {
|
||||
source: flatbuffers::InvalidFlatbuffer,
|
||||
},
|
||||
#[snafu(display("{}", source))]
|
||||
InvalidServerId {
|
||||
source: data_types::server_id::Error,
|
||||
},
|
||||
}
|
||||
|
||||
fn try_from(data: (Vec<u8>, ServerId)) -> Result<Self, Self::Error> {
|
||||
impl TryFrom<Vec<u8>> for SequencedEntry {
|
||||
type Error = SequencedEntryError;
|
||||
|
||||
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
|
||||
SequencedEntryTryBuilder {
|
||||
server_id: data.1,
|
||||
data: data.0,
|
||||
fb_builder: |data| flatbuffers::root::<entry_fb::SequencedEntry<'_>>(data),
|
||||
entry_builder: |data| match flatbuffers::root::<entry_fb::SequencedEntry<'_>>(data)?
|
||||
data,
|
||||
fb_builder: |data| {
|
||||
let fb = flatbuffers::root::<entry_fb::SequencedEntry<'_>>(data)
|
||||
.context(InvalidFlatbuffer)?;
|
||||
|
||||
// Raise an error now if the server ID is invalid so that `SequencedEntry`'s
|
||||
// `server_id` method can assume it has a valid `ServerId`
|
||||
TryInto::<ServerId>::try_into(fb.server_id()).context(InvalidServerId)?;
|
||||
|
||||
Ok(fb)
|
||||
},
|
||||
entry_builder: |data| match flatbuffers::root::<entry_fb::SequencedEntry<'_>>(data)
|
||||
.context(InvalidFlatbuffer)?
|
||||
.entry_bytes()
|
||||
{
|
||||
Some(entry_bytes) => Ok(Some(flatbuffers::root::<entry_fb::Entry<'_>>(
|
||||
&entry_bytes,
|
||||
)?)),
|
||||
Some(entry_bytes) => Ok(Some(
|
||||
flatbuffers::root::<entry_fb::Entry<'_>>(&entry_bytes)
|
||||
.context(InvalidFlatbuffer)?,
|
||||
)),
|
||||
None => Ok(None),
|
||||
},
|
||||
}
|
||||
|
@ -1958,4 +1985,44 @@ mod tests {
|
|||
let values = col.values().f64_values().unwrap();
|
||||
assert_eq!(&values, &[None, Some(23.2), None]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn validate_sequenced_entry_server_id() {
|
||||
let lp = vec![
|
||||
"a,host=a val=23i 983",
|
||||
"a,host=a,region=west val2=23.2 2343",
|
||||
"a val=21i,bool=true,string=\"hello\" 222",
|
||||
]
|
||||
.join("\n");
|
||||
let lines: Vec<_> = parse_lines(&lp).map(|l| l.unwrap()).collect();
|
||||
|
||||
let sharded_entries =
|
||||
lines_to_sharded_entries(&lines, sharder(1).as_ref(), &partitioner(1)).unwrap();
|
||||
|
||||
let entry_bytes = sharded_entries.first().unwrap().entry.data();
|
||||
|
||||
const OVERHEAD: usize = 4 * std::mem::size_of::<u64>();
|
||||
let mut fbb = FlatBufferBuilder::new_with_capacity(entry_bytes.len() + OVERHEAD);
|
||||
|
||||
let entry_bytes = fbb.create_vector_direct(entry_bytes);
|
||||
let sequenced_entry = entry_fb::SequencedEntry::create(
|
||||
&mut fbb,
|
||||
&entry_fb::SequencedEntryArgs {
|
||||
clock_value: 3,
|
||||
server_id: 0, // <----- IMPORTANT PART this is invalid and should error
|
||||
entry_bytes: Some(entry_bytes),
|
||||
},
|
||||
);
|
||||
|
||||
fbb.finish(sequenced_entry, None);
|
||||
|
||||
let (mut data, idx) = fbb.collapse();
|
||||
let result = SequencedEntry::try_from(data.split_off(idx));
|
||||
|
||||
assert!(
|
||||
matches!(result, Err(SequencedEntryError::InvalidServerId { .. })),
|
||||
"result was {:?}",
|
||||
result
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,11 +49,6 @@ pub enum Error {
|
|||
#[snafu(display("segment id must be between [1, 1,000,000,000)"))]
|
||||
SegmentIdOutOfBounds,
|
||||
|
||||
#[snafu(display("Server ID must not be 0: {}", source))]
|
||||
ServerIdInvalid {
|
||||
source: data_types::server_id::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("unable to compress segment id {}: {}", segment_id, source))]
|
||||
UnableToCompressData {
|
||||
segment_id: u64,
|
||||
|
@ -76,6 +71,11 @@ pub enum Error {
|
|||
source: flatbuffers::InvalidFlatbuffer,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid ReplicatedWrite: {}", source))]
|
||||
InvalidReplicatedWrite {
|
||||
source: internal_types::data::ReplicatedWriteError,
|
||||
},
|
||||
|
||||
#[snafu(display("the flatbuffers size is too small; only found {} bytes", bytes))]
|
||||
FlatbuffersSegmentTooSmall { bytes: usize },
|
||||
|
||||
|
@ -520,14 +520,12 @@ impl Segment {
|
|||
.writes()
|
||||
.context(FlatbuffersMissingField { field: "writes" })?;
|
||||
let mut segment = Self::new_with_capacity(fb_segment.id(), writes.len());
|
||||
let server_id = ServerId::try_from(fb_segment.server_id()).context(ServerIdInvalid)?;
|
||||
for w in writes {
|
||||
let data = w
|
||||
.payload()
|
||||
.context(FlatbuffersMissingField { field: "payload" })?
|
||||
.to_vec();
|
||||
let rw =
|
||||
ReplicatedWrite::try_from((data, server_id)).context(InvalidFlatbuffersSegment)?;
|
||||
let rw = ReplicatedWrite::try_from(data).context(InvalidReplicatedWrite)?;
|
||||
|
||||
segment.append(Arc::new(rw))?;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue