Merge pull request #1017 from influxdata/cn/check-in-flatbuffers

chore: Upgrade flatbuffers and check in generated code
pull/24376/head
kodiakhq[bot] 2021-03-22 19:23:48 +00:00 committed by GitHub
commit 9817dbef2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 3465 additions and 181 deletions

1
.gitattributes vendored
View File

@ -1,2 +1,3 @@
generated_types/protos/google/ linguist-generated=true
generated_types/protos/grpc/ linguist-generated=true
generated_types/src/wal_generated.rs linguist-generated=true

View File

@ -186,3 +186,20 @@ cargo clippy --all-targets --workspace -- -D warnings
[`rustfmt`]: https://github.com/rust-lang/rustfmt
[`clippy`]: https://github.com/rust-lang/rust-clippy
## Upgrading the `flatbuffers` crate
IOx uses Flatbuffers for its write-ahead log. The structure is defined in
[`generated_types/protos/wal.fbs`]. We have then used the `flatc` Flatbuffers compiler to generate
the corresponding Rust code in [`generated_types/src/wal_generated.rs`], which is checked in to the
repository.
The checked-in code is compatible with the `flatbuffers` crate version in the `Cargo.lock` file. If
upgrading the version of the `flatbuffers` crate that IOx depends on, the generated code will need
to be updated as well.
Instructions for updating the generated code are in [`docs/regenerating_flatbuffers.md`].
[`generated_types/protos/wal.fbs`]: generated_types/protos/wal.fbs
[`generated_types/src/wal_generated.rs`]: generated_types/src/wal_generated.rs
[`docs/regenerating_flatbuffers.md`]: docs/regenerating_flatbuffers.md

59
Cargo.lock generated
View File

@ -1,5 +1,15 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "Inflector"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
dependencies = [
"lazy_static",
"regex",
]
[[package]]
name = "RustyXML"
version = "0.3.0"
@ -106,7 +116,7 @@ dependencies = [
"cfg_aliases",
"chrono",
"csv",
"flatbuffers 0.8.3",
"flatbuffers",
"hex",
"indexmap",
"lazy_static",
@ -949,15 +959,6 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d"
[[package]]
name = "flatbuffers"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a788f068dd10687940565bf4b5480ee943176cbd114b12e811074bcf7c04e4b9"
dependencies = [
"smallvec",
]
[[package]]
name = "flatbuffers"
version = "0.8.3"
@ -1133,7 +1134,7 @@ dependencies = [
name = "generated_types"
version = "0.1.0"
dependencies = [
"flatbuffers 0.6.1",
"flatbuffers",
"futures",
"google_types",
"prost",
@ -1548,9 +1549,10 @@ dependencies = [
"crc32fast",
"criterion",
"data_types",
"flatbuffers 0.6.1",
"flatbuffers",
"generated_types",
"influxdb_line_protocol",
"ouroboros",
"snafu",
"tracing",
]
@ -1843,7 +1845,7 @@ dependencies = [
"chrono",
"criterion",
"data_types",
"flatbuffers 0.6.1",
"flatbuffers",
"generated_types",
"influxdb_line_protocol",
"internal_types",
@ -2150,6 +2152,29 @@ dependencies = [
"num-traits",
]
[[package]]
name = "ouroboros"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f6d5c203fe8d786d9d7bec8203cbbff3eb2cf8410c0d70cfd05b3d5f5d545da"
dependencies = [
"ouroboros_macro",
"stable_deref_trait",
]
[[package]]
name = "ouroboros_macro"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "129943a960e6a08c7e70ca5a09f113c273fe7f10ae8420992c78293e3dffdf65"
dependencies = [
"Inflector",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "packed_simd_2"
version = "0.3.4"
@ -3135,7 +3160,7 @@ dependencies = [
"chrono",
"crc32fast",
"data_types",
"flatbuffers 0.6.1",
"flatbuffers",
"futures",
"generated_types",
"hashbrown",
@ -3282,6 +3307,12 @@ dependencies = [
"log",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]]
name = "standback"
version = "0.2.15"

View File

@ -28,8 +28,7 @@ We're also hosting monthly tech talks and community office hours on the project
## Quick Start
To compile and run InfluxDB IOx from source, you'll need a Rust compiler and a `flatc` FlatBuffers
compiler.
To compile and run InfluxDB IOx from source, you'll need a Rust compiler and `clang`.
### Build a Docker Image
@ -80,36 +79,6 @@ rustc --version
and you should see a nightly version of Rust!
### Installing `flatc`
InfluxDB IOx uses the [FlatBuffer] serialization format for its write-ahead log. The [`flatc`
compiler] reads the schema in `generated_types/wal.fbs` and generates the corresponding Rust code.
Install `flatc` >= 1.12.0 with one of these methods as appropriate to your operating system:
* Using a [Windows binary release]
* Using the [`flatbuffers` package for conda]
* Using the [`flatbuffers` package for Arch Linux]
* Using the [`flatbuffers` package for Homebrew]
Once you have installed the packages, you should be able to run:
```shell
flatc --version
```
and see the version displayed.
You won't have to run `flatc` directly; once it's available, Rust's Cargo build tool manages the
compilation process by calling `flatc` for you.
[FlatBuffer]: https://google.github.io/flatbuffers/
[`flatc` compiler]: https://google.github.io/flatbuffers/flatbuffers_guide_using_schema_compiler.html
[Windows binary release]: https://github.com/google/flatbuffers/releases
[`flatbuffers` package for conda]: https://anaconda.org/conda-forge/flatbuffers
[`flatbuffers` package for Arch Linux]: https://www.archlinux.org/packages/community/x86_64/flatbuffers/
[`flatbuffers` package for Homebrew]: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/flatbuffers.rb
### Installing `clang`
An installation of `clang` is required to build the [`croaring`] dependency - if
@ -143,7 +112,6 @@ provided [example](docs/env.example) as a template if you want:
cp docs/env.example .env
```
### Compiling and Starting the Server
InfluxDB IOx is built using Cargo, Rust's package manager and build tool.

View File

@ -9,25 +9,9 @@
# docker build -f docker/Dockerfile.ci .
##
# Build any binaries that can be copied into the CI image
# Note we build flatbuffers from source (pinned to a particular version)
FROM rust:slim-buster AS flatc
ARG flatbuffers_version="v1.12.0"
RUN apt-get update \
&& mkdir -p /usr/share/man/man1 \
&& apt-get install -y \
git make clang cmake llvm \
--no-install-recommends \
&& git clone -b ${flatbuffers_version} -- https://github.com/google/flatbuffers.git /usr/local/src/flatbuffers \
&& cmake -S /usr/local/src/flatbuffers -B /usr/local/src/flatbuffers \
-G "Unix Makefiles" \
-DCMAKE_BUILD_TYPE=Release \
&& make -C /usr/local/src/flatbuffers -j $(nproc) flatc
# Build actual image used for CI pipeline
FROM rust:slim-buster
COPY --from=flatc /usr/local/src/flatbuffers/flatc /usr/bin/flatc
# make Apt non-interactive
RUN echo 'APT::Get::Assume-Yes "true";' > /etc/apt/apt.conf.d/90ci \
&& echo 'DPkg::Options "--force-confnew";' >> /etc/apt/apt.conf.d/90ci

View File

@ -22,3 +22,5 @@ We hold monthly Tech Talks that explain the project's technical underpinnings. Y
* Thoughts on using multiple cores: [multi_core_tasks.md](multi_core_tasks.md)
* [Query Engine Docs](../query/README.md)
* [Testing documentation](testing.md) for developers of IOx
* [Regenerating Flatbuffers code](regenerating_flatbuffers.md) when updating the version of the `flatbuffers` crate

View File

@ -0,0 +1,7 @@
# Regenerating Flatbuffers code
When updating the version of the [flatbuffers](https://crates.io/crates/flatbuffers) Rust crate used as a dependency in the IOx workspace, the generated Rust code in `generated_types/src/wal_generated.rs` also needs to be updated in sync.
To update the generated code, edit `generated_types/regenerate-flatbuffers.sh` and set the `FB_COMMIT` variable at the top of the file to the commit SHA of the same commit in the [flatbuffers repository](https://github.com/google/flatbuffers) where the `flatbuffers` Rust crate version was updated. This ensures we'll be [using the same version of `flatc` that the crate was tested with](https://github.com/google/flatbuffers/issues/6199#issuecomment-714562121).
Then run the `generated_types/regenerate-flatbuffers.sh` script and check in any changes to the generated code. Verify that the whole project builds.

2
generated_types/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.flatbuffers

View File

@ -5,7 +5,9 @@ authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
[dependencies] # In alphabetical order
flatbuffers = "0.6" # TODO: Update to 0.8
# See docs/regenerating_flatbuffers.md about updating generated code when updating the
# version of the flatbuffers crate
flatbuffers = "0.8"
futures = "0.3.1"
prost = "0.7"
prost-types = "0.7"

View File

@ -1,10 +1,6 @@
//! Compiles Protocol Buffers and FlatBuffers schema definitions into
//! native Rust types.
//! Compiles Protocol Buffers into native Rust types.
use std::{
path::{Path, PathBuf},
process::Command,
};
use std::path::{Path, PathBuf};
type Error = Box<dyn std::error::Error>;
type Result<T, E = Error> = std::result::Result<T, E>;
@ -13,7 +9,6 @@ fn main() -> Result<()> {
let root = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("protos");
generate_grpc_types(&root)?;
generate_wal_types(&root)?;
Ok(())
}
@ -66,30 +61,3 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
Ok(())
}
/// Schema used in the WAL
///
/// Creates `wal_generated.rs`
fn generate_wal_types(root: &Path) -> Result<()> {
let wal_file = root.join("wal.fbs");
println!("cargo:rerun-if-changed={}", wal_file.display());
let out_dir: PathBuf = std::env::var_os("OUT_DIR")
.expect("Could not determine `OUT_DIR`")
.into();
let status = Command::new("flatc")
.arg("--rust")
.arg("-o")
.arg(&out_dir)
.arg(wal_file)
.status();
match status {
Ok(status) if !status.success() => panic!("`flatc` failed to compile the .fbs to Rust"),
Ok(_status) => {} // Successfully compiled
Err(err) => panic!("Could not execute `flatc`: {}", err),
}
Ok(())
}

View File

@ -0,0 +1,49 @@
#!/bin/bash -e

# Regenerates `src/wal_generated.rs` from `protos/wal.fbs` using a `flatc`
# binary built from source at the exact flatbuffers commit pinned below, so
# the generated code matches the `flatbuffers` crate version in `Cargo.lock`.

# The commit where the Rust `flatbuffers` crate version was changed to the version in `Cargo.lock`
# Update this, rerun this script, and check in the changes in the generated code when the
# `flatbuffers` crate version is updated.
FB_COMMIT="86401e078d0746d2381735415f8c2dfe849f3f52"

# Change to the generated_types crate directory, where this script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
# Quote all path expansions so the script works when the checkout path contains spaces
pushd "$DIR"

echo "Building flatc from source ..."

FB_URL="https://github.com/google/flatbuffers"
FB_DIR=".flatbuffers"
FLATC="$FB_DIR/bazel-bin/flatc"

# `command -v` is the portable, robust existence check (a shell builtin; avoids
# the word-splitting pitfalls of an unquoted `$(which ...)` inside `[ -z ... ]`)
if ! command -v bazel >/dev/null 2>&1; then
  echo "bazel is required to build flatc"
  exit 1
fi

echo "Bazel version: $(bazel version | head -1 | awk -F':' '{print $2}')"

# Clone the flatbuffers repository on first run; afterwards just fast-forward it
if [ ! -e "$FB_DIR" ]; then
  echo "git clone $FB_URL ..."
  git clone -b master --no-tag "$FB_URL" "$FB_DIR"
else
  echo "git pull $FB_URL ..."
  git -C "$FB_DIR" pull --ff-only
fi

# Pin to the exact commit matching the crate version (see FB_COMMIT above)
echo "hard reset to $FB_COMMIT"
git -C "$FB_DIR" reset --hard "$FB_COMMIT"

pushd "$FB_DIR"
echo "run: bazel build :flatc ..."
bazel build :flatc
popd

WAL_FBS="$DIR/protos/wal.fbs"
WAL_RS_DIR="$DIR/src"

# Regenerate the Rust code, then format it to the workspace style
"$FLATC" --rust -o "$WAL_RS_DIR" "$WAL_FBS"

cargo fmt
popd

echo "DONE! Please run 'cargo test' and check in any changes."

View File

@ -74,7 +74,9 @@ pub mod grpc {
}
}
include!(concat!(env!("OUT_DIR"), "/wal_generated.rs"));
/// Generated Flatbuffers code for working with the write-ahead log
pub mod wal_generated;
pub use wal_generated::wal;
/// gRPC Storage Service
pub const STORAGE_SERVICE: &str = "influxdata.platform.storage.Storage";

3220
generated_types/src/wal_generated.rs generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -6,16 +6,17 @@ edition = "2018"
description = "InfluxDB IOx internal types, shared between IOx instances"
readme = "README.md"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
arrow_deps = { path = "../arrow_deps" }
crc32fast = "1.2.0"
chrono = { version = "0.4", features = ["serde"] }
data_types = { path = "../data_types" }
flatbuffers = "0.6" # TODO: Update to 0.8
# See docs/regenerating_flatbuffers.md about updating generated code when updating the
# version of the flatbuffers crate
flatbuffers = "0.8"
generated_types = { path = "../generated_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ouroboros = "0.8.3"
snafu = "0.6"
tracing = "0.1"

View File

@ -4,9 +4,12 @@ use data_types::database_rules::{DatabaseRules, PartitionTemplate, TemplatePart}
use generated_types::wal as wb;
use influxdb_line_protocol::{parse_lines, ParsedLine};
use internal_types::data::{lines_to_replicated_write as lines_to_rw, ReplicatedWrite};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::time::Duration;
use std::{
collections::{BTreeMap, BTreeSet},
convert::TryFrom,
fmt,
time::Duration,
};
const NEXT_ENTRY_NS: i64 = 1_000_000_000;
const STARTING_TIMESTAMP_NS: i64 = 0;
@ -61,7 +64,7 @@ fn replicated_write_into_bytes(c: &mut Criterion) {
assert_eq!(write.entry_count(), config.partition_count);
b.iter(|| {
let _ = write.bytes().len();
let _ = write.data().len();
});
},
);
@ -73,7 +76,7 @@ fn bytes_into_struct(c: &mut Criterion) {
run_group("bytes_into_struct", c, |lines, rules, config, b| {
let write = lines_to_rw(0, 0, &lines, rules);
assert_eq!(write.entry_count(), config.partition_count);
let data = write.bytes();
let data = write.data();
b.iter(|| {
let mut db = Db::default();
@ -160,7 +163,7 @@ struct Db {
impl Db {
fn deserialize_write(&mut self, data: &[u8]) {
let write = ReplicatedWrite::from(data);
let write = ReplicatedWrite::try_from(data.to_vec()).unwrap();
if let Some(batch) = write.write_buffer_batch() {
if let Some(entries) = batch.entries() {

View File

@ -6,87 +6,98 @@ use data_types::database_rules::Partitioner;
use generated_types::wal as wb;
use influxdb_line_protocol::{FieldValue, ParsedLine};
use std::{collections::BTreeMap, fmt};
use std::{collections::BTreeMap, convert::TryFrom, fmt};
use chrono::Utc;
use crc32fast::Hasher;
use flatbuffers::FlatBufferBuilder;
use ouroboros::self_referencing;
pub fn type_description(value: wb::ColumnValue) -> &'static str {
use wb::ColumnValue::*;
match value {
NONE => "none",
TagValue => "tag",
I64Value => "i64",
U64Value => "u64",
F64Value => "f64",
BoolValue => "bool",
StringValue => "String",
wb::ColumnValue::TagValue => "tag",
wb::ColumnValue::I64Value => "i64",
wb::ColumnValue::U64Value => "u64",
wb::ColumnValue::F64Value => "f64",
wb::ColumnValue::BoolValue => "bool",
wb::ColumnValue::StringValue => "String",
wb::ColumnValue::NONE => "none",
_ => "none",
}
}
/// A friendlier wrapper to help deal with the Flatbuffers write data
#[derive(Debug, Default, Clone, PartialEq)]
#[self_referencing]
#[derive(Debug, Clone, PartialEq)]
pub struct ReplicatedWrite {
pub data: Vec<u8>,
data: Vec<u8>,
#[borrows(data)]
#[covariant]
fb: wb::ReplicatedWrite<'this>,
#[borrows(data)]
#[covariant]
write_buffer_batch: Option<wb::WriteBufferBatch<'this>>,
}
impl ReplicatedWrite {
/// Returns the Flatbuffers struct represented by the raw bytes.
pub fn to_fb(&self) -> wb::ReplicatedWrite<'_> {
flatbuffers::get_root::<wb::ReplicatedWrite<'_>>(&self.data)
}
/// Returns the Flatbuffers struct for the WriteBufferBatch in the raw bytes
/// of the payload of the ReplicatedWrite.
pub fn write_buffer_batch(&self) -> Option<wb::WriteBufferBatch<'_>> {
match self.to_fb().payload() {
Some(d) => Some(flatbuffers::get_root::<wb::WriteBufferBatch<'_>>(&d)),
None => None,
}
pub fn write_buffer_batch(&self) -> Option<&wb::WriteBufferBatch<'_>> {
self.borrow_write_buffer_batch().as_ref()
}
/// Returns the Flatbuffers struct for the ReplicatedWrite
pub fn fb(&self) -> &wb::ReplicatedWrite<'_> {
self.borrow_fb()
}
/// Returns true if this replicated write matches the writer and sequence.
pub fn equal_to_writer_and_sequence(&self, writer_id: u32, sequence_number: u64) -> bool {
let fb = self.to_fb();
fb.writer() == writer_id && fb.sequence() == sequence_number
self.fb().writer() == writer_id && self.fb().sequence() == sequence_number
}
/// Returns the writer id and sequence number
pub fn writer_and_sequence(&self) -> (u32, u64) {
let fb = self.to_fb();
(fb.writer(), fb.sequence())
(self.fb().writer(), self.fb().sequence())
}
/// Returns the serialized bytes for the write. (used for benchmarking)
pub fn bytes(&self) -> &Vec<u8> {
&self.data
/// Returns the serialized bytes for the write
pub fn data(&self) -> &[u8] {
self.borrow_data()
}
/// Returns the number of write buffer entries in this replicated write
pub fn entry_count(&self) -> usize {
if let Some(batch) = self.write_buffer_batch() {
if let Some(entries) = batch.entries() {
return entries.len();
}
}
0
self.write_buffer_batch()
.map_or(0, |wbb| wbb.entries().map_or(0, |entries| entries.len()))
}
}
impl From<&[u8]> for ReplicatedWrite {
fn from(data: &[u8]) -> Self {
Self {
data: Vec::from(data),
impl TryFrom<Vec<u8>> for ReplicatedWrite {
type Error = flatbuffers::InvalidFlatbuffer;
fn try_from(data: Vec<u8>) -> Result<Self, Self::Error> {
ReplicatedWriteTryBuilder {
data,
fb_builder: |data| flatbuffers::root::<wb::ReplicatedWrite<'_>>(data),
write_buffer_batch_builder: |data| match flatbuffers::root::<wb::ReplicatedWrite<'_>>(
data,
)?
.payload()
{
Some(payload) => Ok(Some(flatbuffers::root::<wb::WriteBufferBatch<'_>>(
&payload,
)?)),
None => Ok(None),
},
}
.try_build()
}
}
impl fmt::Display for ReplicatedWrite {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let fb = self.to_fb();
let fb = self.fb();
write!(
f,
"\nwriter:{}, sequence:{}, checksum:{}\n",
@ -143,6 +154,7 @@ impl fmt::Display for ReplicatedWrite {
.unwrap_or("")
.to_string(),
wb::ColumnValue::NONE => "".to_string(),
_ => "".to_string(),
};
write!(f, " {}:{}", value.column().unwrap_or(""), val)?;
}
@ -192,9 +204,8 @@ pub fn lines_to_replicated_write(
fbb.finish(write, None);
let (mut data, idx) = fbb.collapse();
ReplicatedWrite {
data: data.split_off(idx),
}
ReplicatedWrite::try_from(data.split_off(idx))
.expect("Flatbuffer data just constructed should be valid")
}
pub fn split_lines_into_write_entry_partitions(

View File

@ -18,7 +18,9 @@ arrow_deps = { path = "../arrow_deps" }
async-trait = "0.1"
chrono = "0.4"
data_types = { path = "../data_types" }
flatbuffers = "0.6" # TODO: Update to 0.8
# See docs/regenerating_flatbuffers.md about updating generated code when updating the
# version of the flatbuffers crate
flatbuffers = "0.8"
generated_types = { path = "../generated_types" }
internal_types = { path = "../internal_types" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }

View File

@ -46,10 +46,8 @@ impl Column {
capacity: usize,
value: wb::Value<'_>,
) -> Result<Self> {
use wb::ColumnValue::*;
Ok(match value.value_type() {
F64Value => {
wb::ColumnValue::F64Value => {
let val = value
.value_as_f64value()
.expect("f64 value should be present")
@ -58,7 +56,7 @@ impl Column {
vals.push(Some(val));
Self::F64(vals, StatValues::new(val))
}
I64Value => {
wb::ColumnValue::I64Value => {
let val = value
.value_as_i64value()
.expect("i64 value should be present")
@ -67,7 +65,7 @@ impl Column {
vals.push(Some(val));
Self::I64(vals, StatValues::new(val))
}
U64Value => {
wb::ColumnValue::U64Value => {
let val = value
.value_as_u64value()
.expect("u64 value should be present")
@ -76,7 +74,7 @@ impl Column {
vals.push(Some(val));
Self::U64(vals, StatValues::new(val))
}
StringValue => {
wb::ColumnValue::StringValue => {
let val = value
.value_as_string_value()
.expect("string value should be present")
@ -86,7 +84,7 @@ impl Column {
vals.push(Some(val.to_string()));
Self::String(vals, StatValues::new(val.to_string()))
}
BoolValue => {
wb::ColumnValue::BoolValue => {
let val = value
.value_as_bool_value()
.expect("bool value should be present")
@ -95,7 +93,7 @@ impl Column {
vals.push(Some(val));
Self::Bool(vals, StatValues::new(val))
}
TagValue => {
wb::ColumnValue::TagValue => {
let val = value
.value_as_tag_value()
.expect("tag value should be present")

View File

@ -188,7 +188,7 @@ impl MutableBufferDb {
Some(b) => self.write_entries_to_partitions(&b)?,
None => {
return MissingPayload {
writer: write.to_fb().writer(),
writer: write.fb().writer(),
}
.fail()
}

View File

@ -929,7 +929,7 @@ mod tests {
let lines: Vec<_> = parse_lines(&lp_string).map(|l| l.unwrap()).collect();
let data = split_lines_into_write_entry_partitions(|_| partition.key().into(), &lines);
let batch = flatbuffers::get_root::<wb::WriteBufferBatch<'_>>(&data);
let batch = flatbuffers::root::<wb::WriteBufferBatch<'_>>(&data).unwrap();
let entries = batch.entries().unwrap();
for entry in entries {

View File

@ -808,7 +808,7 @@ mod tests {
let data = split_lines_into_write_entry_partitions(chunk_key_func, &lines);
let batch = flatbuffers::get_root::<wb::WriteBufferBatch<'_>>(&data);
let batch = flatbuffers::root::<wb::WriteBufferBatch<'_>>(&data).unwrap();
let entries = batch.entries().expect("at least one entry");
for entry in entries {

View File

@ -11,7 +11,9 @@ bytes = "1.0"
chrono = "0.4"
crc32fast = "1.2.0"
data_types = { path = "../data_types" }
flatbuffers = "0.6" # TODO: Update to 0.8
# See docs/regenerating_flatbuffers.md about updating generated code when updating the
# version of the flatbuffers crate
flatbuffers = "0.8"
futures = "0.3.7"
generated_types = { path = "../generated_types" }
hashbrown = "0.9.1"

View File

@ -69,8 +69,16 @@ pub enum Error {
#[snafu(display("checksum mismatch for segment"))]
ChecksumMismatch,
#[snafu(display("the flatbuffers Segment is invalid"))]
InvalidFlatbuffersSegment,
#[snafu(display("the flatbuffers Segment is invalid: {}", source))]
InvalidFlatbuffersSegment {
source: flatbuffers::InvalidFlatbuffer,
},
#[snafu(display("the flatbuffers size is too small; only found {} bytes", bytes))]
FlatbuffersSegmentTooSmall { bytes: usize },
#[snafu(display("the flatbuffers Segment is missing an expected value for {}", field))]
FlatbuffersMissingField { field: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -111,7 +119,7 @@ impl Buffer {
/// by accepting the write, the oldest (first) of the closed segments
/// will be dropped, if it is persisted. Otherwise, an error is returned.
pub fn append(&mut self, write: Arc<ReplicatedWrite>) -> Result<Option<Arc<Segment>>> {
let write_size = u64::try_from(write.data.len())
let write_size = u64::try_from(write.data().len())
.expect("appended data must be less than a u64 in length");
while self.current_size + write_size > self.max_size {
@ -310,7 +318,7 @@ impl Segment {
let (writer_id, sequence_number) = write.writer_and_sequence();
self.validate_and_update_sequence_summary(writer_id, sequence_number)?;
let size = write.data.len();
let size = write.data().len();
let size = u64::try_from(size).expect("appended data must be less than a u64 in length");
self.size += size;
@ -418,7 +426,7 @@ impl Segment {
.writes
.iter()
.map(|rw| {
let payload = fbb.create_vector_direct(&rw.data);
let payload = fbb.create_vector_direct(rw.data());
wal::ReplicatedWriteData::create(
&mut fbb,
&wal::ReplicatedWriteDataArgs {
@ -483,7 +491,7 @@ impl Segment {
/// deserializes it into a Segment struct.
pub fn from_file_bytes(data: &[u8]) -> Result<Self> {
if data.len() < std::mem::size_of::<u32>() {
return Err(Error::InvalidFlatbuffersSegment);
return FlatbuffersSegmentTooSmall { bytes: data.len() }.fail();
}
let (data, checksum) = data.split_at(data.len() - std::mem::size_of::<u32>());
@ -501,15 +509,21 @@ impl Segment {
.decompress_vec(data)
.context(UnableToDecompressData)?;
let fb_segment = flatbuffers::get_root::<wal::Segment<'_>>(&data);
// Use verified flatbuffer functionality here
let fb_segment =
flatbuffers::root::<wal::Segment<'_>>(&data).context(InvalidFlatbuffersSegment)?;
let writes = fb_segment.writes().context(InvalidFlatbuffersSegment)?;
let writes = fb_segment
.writes()
.context(FlatbuffersMissingField { field: "writes" })?;
let mut segment = Self::new_with_capacity(fb_segment.id(), writes.len());
for w in writes {
let data = w.payload().context(InvalidFlatbuffersSegment)?;
let rw = ReplicatedWrite {
data: data.to_vec(),
};
let data = w
.payload()
.context(FlatbuffersMissingField { field: "payload" })?
.to_vec();
let rw = ReplicatedWrite::try_from(data).context(InvalidFlatbuffersSegment)?;
segment.append(Arc::new(rw))?;
}
@ -579,7 +593,7 @@ mod tests {
let mut buf = Buffer::new(max, segment, WalBufferRollover::ReturnError, false);
let write = lp_to_replicated_write(1, 1, "cpu val=1 10");
let size = write.data.len() as u64;
let size = write.data().len() as u64;
assert_eq!(0, buf.size());
let segment = buf.append(write).unwrap();
assert_eq!(size, buf.size());
@ -732,7 +746,7 @@ mod tests {
fn all_writes_since() {
let max = 1 << 63;
let write = lp_to_replicated_write(1, 1, "cpu val=1 10");
let segment = (write.data.len() + 1) as u64;
let segment = (write.data().len() + 1) as u64;
let mut buf = Buffer::new(max, segment, WalBufferRollover::ReturnError, false);
let segment = buf.append(write).unwrap();
@ -789,7 +803,7 @@ mod tests {
fn writes_since() {
let max = 1 << 63;
let write = lp_to_replicated_write(1, 1, "cpu val=1 10");
let segment = (write.data.len() + 1) as u64;
let segment = (write.data().len() + 1) as u64;
let mut buf = Buffer::new(max, segment, WalBufferRollover::ReturnError, false);
let segment = buf.append(write).unwrap();
@ -836,7 +850,7 @@ mod tests {
fn returns_error_if_sequence_decreases() {
let max = 1 << 63;
let write = lp_to_replicated_write(1, 3, "cpu val=1 10");
let segment = (write.data.len() + 1) as u64;
let segment = (write.data().len() + 1) as u64;
let mut buf = Buffer::new(max, segment, WalBufferRollover::ReturnError, false);
let segment = buf.append(write).unwrap();