feat: remove flatbuffer entry (#3045)

Raphael Taylor-Davies 2021-11-05 20:19:24 +00:00 committed by GitHub
parent 9d0f78788d
commit 60f0deaf1e
25 changed files with 23 additions and 6240 deletions


@ -313,22 +313,6 @@ jobs:
# other stuff that may have been added to main since last merge)
MERGE_BASE=$(git merge-base origin/main $CIRCLE_BRANCH) sh -c 'buf breaking --against ".git#ref=$MERGE_BASE"'
# Check that any generated files are up-to-date with the changes in this PR.
# named "check-flatbuffers" because that name is hardcoded into github checks
check-flatbuffers:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: xlarge # use of a smaller executor tends to crash on link
steps:
- checkout
- rust_components # Regenerating flatbuffers uses rustfmt
- run:
name: Check Query Tests
command: ./query_tests/check-generated.sh
- run:
name: Check Flatbuffers
command: INFLUXDB_IOX_INTEGRATION_LOCAL=1 ./entry/check-flatbuffers.sh
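For reference, the two generated-code checks this removed job performed amount to the following commands (a sketch, taken directly from the job definition above and run from the repository root):

```bash
# Check that the generated query_tests cases are up to date
./query_tests/check-generated.sh

# Check that the checked-in flatbuffers code is up to date (local mode, no Docker)
INFLUXDB_IOX_INTEGRATION_LOCAL=1 ./entry/check-flatbuffers.sh
```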
# Compile a cargo "release" profile binary for branches that end in `/perf`
#
# Uses the latest ci_image (influxdb/rust below) to build a release binary and
@ -450,7 +434,6 @@ workflows:
- test_kafka_integration
- test_influxdb2_client
- build
- check-flatbuffers
- doc
- perf_image:
filters:
@ -467,7 +450,6 @@ workflows:
- test_kafka_integration
- test_influxdb2_client
- build
- check-flatbuffers
- doc
# Manual build of CI image


@ -224,23 +224,6 @@ cargo clippy --all-targets --workspace -- -D warnings
[`rustfmt`]: https://github.com/rust-lang/rustfmt
[`clippy`]: https://github.com/rust-lang/rust-clippy
## Upgrading the `flatbuffers` crate
IOx uses Flatbuffers for some of its messages. The structure is defined in [`entry/src/entry.fbs`].
We then use the `flatc` Flatbuffers compiler to generate the corresponding Rust code in
[`entry/src/entry_generated.rs`], which is checked into the repository.
The checked-in code is compatible with the `flatbuffers` crate version in the `Cargo.lock` file. If
upgrading the version of the `flatbuffers` crate that IOx depends on, the generated code will need
to be updated as well.
Instructions for updating the generated code are in [`docs/regenerating_flatbuffers.md`].
[`entry/src/entry.fbs`]: entry/src/entry.fbs
[`entry/src/entry_generated.rs`]: entry/src/entry_generated.rs
[`docs/regenerating_flatbuffers.md`]: docs/regenerating_flatbuffers.md
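For reference, the upgrade workflow this section described (and which this commit removes) boiled down to the following commands, run from the repository root. This is a sketch assembled from the instructions above and from `docs/regenerating_flatbuffers.md`:

```bash
# 1. If upgrading the flatbuffers crate, point the regeneration script at the
#    matching flatc commit by editing FB_COMMIT in entry/regenerate-flatbuffers.sh.
# 2. Regenerate the checked-in Rust code from entry/src/entry.fbs.
./entry/regenerate-flatbuffers.sh
# 3. Make sure everything still works, then check in the regenerated code.
cargo test
```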
## Distributed Tracing
See [tracing.md](docs/tracing.md) for more information on the distributed tracing functionality within IOx

Cargo.lock generated

@ -2,12 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "Inflector"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "RustyXML"
version = "0.3.0"
@ -49,12 +43,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]]
name = "alloc-no-stdlib"
version = "2.0.3"
@ -972,23 +960,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]]
name = "entry"
version = "0.1.0"
dependencies = [
"bytes",
"chrono",
"data_types",
"flatbuffers",
"generated_types",
"influxdb_line_protocol",
"ouroboros",
"schema",
"snafu",
"time 0.1.0",
"trace",
]
[[package]]
name = "env_logger"
version = "0.8.4"
@ -1598,7 +1569,6 @@ dependencies = [
"datafusion 0.1.0",
"dirs",
"dotenv",
"entry",
"flate2",
"futures",
"generated_types",
@ -1621,7 +1591,6 @@ dependencies = [
"metric",
"metric_exporters",
"mutable_batch",
"mutable_batch_entry",
"mutable_batch_lp",
"mutable_batch_pb",
"mutable_buffer",
@ -2220,20 +2189,6 @@ dependencies = [
"trace",
]
[[package]]
name = "mutable_batch_entry"
version = "0.1.0"
dependencies = [
"arrow_util",
"data_types",
"entry",
"flatbuffers",
"hashbrown",
"mutable_batch",
"schema",
"snafu",
]
[[package]]
name = "mutable_batch_lp"
version = "0.1.0"
@ -2265,11 +2220,9 @@ version = "0.1.0"
dependencies = [
"bytes",
"criterion",
"entry",
"flate2",
"generated_types",
"mutable_batch",
"mutable_batch_entry",
"mutable_batch_lp",
"mutable_batch_pb",
"prost",
@ -2282,7 +2235,6 @@ dependencies = [
"arrow",
"arrow_util",
"data_types",
"entry",
"metric",
"mutable_batch",
"mutable_batch_lp",
@ -2663,30 +2615,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "ouroboros"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f357ef82d1b4db66fbed0b8d542cbd3c22d0bf5b393b3c257b9ba4568e70c9c3"
dependencies = [
"aliasable",
"ouroboros_macro",
"stable_deref_trait",
]
[[package]]
name = "ouroboros_macro"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44a0b52c2cbaef7dffa5fec1a43274afe8bd2a644fa9fc50a9ef4ff0269b1257"
dependencies = [
"Inflector",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "packers"
version = "0.1.0"
@ -3957,7 +3885,6 @@ dependencies = [
"criterion",
"data_types",
"datafusion 0.1.0",
"entry",
"flate2",
"influxdb_line_protocol",
"influxdb_tsm",
@ -5056,13 +4983,11 @@ dependencies = [
"async-trait",
"data_types",
"dotenv",
"entry",
"futures",
"generated_types",
"http",
"httparse",
"mutable_batch",
"mutable_batch_entry",
"mutable_batch_lp",
"mutable_batch_pb",
"observability_deps",


@ -6,7 +6,6 @@ members = [
"data_types",
"datafusion",
"datafusion_util",
"entry",
"generated_types",
"grpc-router",
"grpc-router-test-gen",
@ -24,7 +23,6 @@ members = [
"metric",
"metric_exporters",
"mutable_batch",
"mutable_batch_entry",
"mutable_batch_lp",
"mutable_batch_pb",
"mutable_batch_tests",


@ -48,17 +48,6 @@ RUN apt-key add /tmp/redpanda.gpg \
&& apt-get install -y redpanda \
&& rm -rf /var/lib/{apt,dpkg,cache,log}
# Install bazel using the installer script to enable building of flatc in the flatbuffers check
ENV BAZEL_VERSION=4.0.0
ENV BAZEL_DOWNLOAD_BASE="https://github.com/bazelbuild/bazel/releases/download"
RUN curl ${CURL_FLAGS} https://bazel.build/bazel-release.pub.gpg | gpg --import - \
&& curl ${CURL_FLAGS} -LO ${BAZEL_DOWNLOAD_BASE}/${BAZEL_VERSION}/bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh \
&& curl ${CURL_FLAGS} -LO ${BAZEL_DOWNLOAD_BASE}/${BAZEL_VERSION}/bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh.sig \
&& gpg --verify bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh.sig bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh \
&& chmod +x bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh \
&& ./bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh \
&& rm bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh.sig
# Install InfluxDB 2.0 OSS to enable integration tests of the influxdb2_client crate
ENV INFLUXDB2_VERSION=2.0.4
ENV INFLUXDB2_DOWNLOAD_BASE="https://dl.influxdata.com/influxdb/releases"


@ -32,7 +32,6 @@ We hold monthly Tech Talks that explain the project's technical underpinnings. Y
* Thoughts on using multiple cores / thread pools: [multi_core_tasks.md](multi_core_tasks.md)
* [Query Engine Docs](../query/README.md)
* [Testing documentation](testing.md) for developers of IOx
* [Regenerating Flatbuffers code](regenerating_flatbuffers.md) when updating the version of the `flatbuffers` crate
* Protobuf tips and tricks: [Protobuf](protobuf.md).
* Catalog Persistence: [`catalog_persistence.md`](catalog_persistence.md).
* SQL command line tips and tricks: [SQL](sql.md).


@ -1,31 +0,0 @@
# Regenerating Flatbuffers code
If you have changed some `*.fbs` files:
- Run `./entry/regenerate-flatbuffers.sh` to regenerate the corresponding Rust code.
- Run `cargo test` to make sure everything works as you would expect.
- Check in the changes to the generated code along with your changes to the `*.fbs` files.
- You should not need to edit the `entry/regenerate-flatbuffers.sh` script.
If you are updating the version of the `flatbuffers` crate in `Cargo.lock`, either because a new
patch release has come out that is compatible with the version range in `Cargo.toml` and you have
run `cargo update`, or because you've updated the version constraint in `Cargo.toml` to change to a
new major or minor version:
- The `flatbuffers` crate gets developed in sync with the `flatc` compiler in the same repo,
so when updating the `flatbuffers` crate we also need to update the `flatc` compiler we're
using.
- Go to https://github.com/google/flatbuffers/blame/master/rust/flatbuffers/Cargo.toml and find
the commit SHA where the `version` metadata was updated to the version of the `flatbuffers`
crate we now want to have in our `Cargo.lock`.
- In the `entry/regenerate-flatbuffers.sh` script, put that commit SHA in the `FB_COMMIT`
variable.
- Run `./entry/regenerate-flatbuffers.sh` to regenerate the corresponding Rust code.
- Run `cargo test` to make sure everything works as you would expect.
- Check in the changes to the generated code along with your changes to the `Cargo.lock` file,
`Cargo.toml` file if relevant, and the `entry/regenerate-flatbuffers.sh` script.
By default, the `entry/regenerate-flatbuffers.sh` script will run a Docker container that
uses the same image we use in CI that will have all the necessary dependencies. If you don't want
to use Docker, run this script with `INFLUXDB_IOX_INTEGRATION_LOCAL=1`, which will require you to
have `bazel` available. You can likely install `bazel` with your favourite package manager.
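For reference, the two invocation modes described above amount to roughly the following (a sketch; both forms are taken from the text of this now-removed document):

```bash
# Default: run inside the same Docker image used in CI, which has all dependencies.
./entry/regenerate-flatbuffers.sh

# Local mode: skip Docker; requires bazel on the host to build flatc.
INFLUXDB_IOX_INTEGRATION_LOCAL=1 ./entry/regenerate-flatbuffers.sh
```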


@ -1,21 +0,0 @@
[package]
name = "entry"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2021"
description = "The entry format used by the write buffer"
[dependencies]
bytes = "1.0"
chrono = "0.4"
data_types = { path = "../data_types" }
# See docs/regenerating_flatbuffers.md about updating generated code when updating the
# version of the flatbuffers crate
flatbuffers = "2"
snafu = "0.6"
time = { path = "../time" }
trace = { path = "../trace" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ouroboros = "0.13.0"
schema = { path = "../schema" }
generated_types = { path = "../generated_types" }


@ -1,24 +0,0 @@
#!/bin/bash -eu
# Change to the generated_types crate directory, where this script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd $DIR
echo "Regenerating flatbuffers code..."
./regenerate-flatbuffers.sh
echo "Checking for uncommitted changes..."
if ! git diff --quiet HEAD --; then
echo "git diff found:"
git diff HEAD
echo "************************************************************"
echo "* Found uncommitted changes to generated flatbuffers code! *"
echo "* Please run \`entry/regenerate-flatbuffers.sh\` *"
echo "* to regenerate the flatbuffers code and check it in! *"
echo "************************************************************"
exit 1
else
echo "No uncommitted changes; this is fine."
fi


@ -1,99 +0,0 @@
#!/bin/bash -e
# Instructions
#
# If you have changed some `*.fbs` files:
#
# - Run this script to regenerate the corresponding Rust code.
# - Run `cargo test` to make sure everything works as you would expect.
# - Check in the changes to the generated code along with your changes to the `*.fbs` files.
# - You should not need to edit this script.
#
# If you are updating the version of the `flatbuffers` crate in `Cargo.lock`:
#
# - The `flatbuffers` crate gets developed in sync with the `flatc` compiler in the same repo,
# so when updating the `flatbuffers` crate we also need to update the `flatc` compiler we're
# using.
# - Go to https://github.com/google/flatbuffers/blame/master/rust/flatbuffers/Cargo.toml and find
# the commit SHA where the `version` metadata was updated to the version of the `flatbuffers`
# crate we now want to have in our `Cargo.lock`.
# - Put that commit SHA in this variable:
FB_COMMIT="a9a295fecf3fbd5a4f571f53b01f63202a3e2113"
# - Run this script to regenerate the corresponding Rust code.
# - Run `cargo test` to make sure everything works as you would expect.
# - Check in the changes to the generated code along with your changes to the `Cargo.lock` file and
# this script.
# By default, this script will run a Docker container that uses the same image we use in CI that
# will have all the necessary dependencies. If you don't want to run Docker, run this script with
# INFLUXDB_IOX_INTEGRATION_LOCAL=1.
if [ -z "${INFLUXDB_IOX_INTEGRATION_LOCAL}" ]; then
echo "Running in Docker..."
CI_IMAGE=quay.io/influxdb/rust:ci
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd "$DIR"
DOCKER_IOX_DIR=/home/rust/influxdb_iox
docker rm --force flatc || true
docker pull ${CI_IMAGE} || true
docker run \
-it \
--detach \
--name=flatc \
--volume "${DIR}/..:${DOCKER_IOX_DIR}" \
${CI_IMAGE}
docker exec -e INFLUXDB_IOX_INTEGRATION_LOCAL=1 flatc .${DOCKER_IOX_DIR}/entry/regenerate-flatbuffers.sh
docker rm --force flatc || true
else
echo "Running locally..."
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd "$DIR"
echo "Building flatc from source ..."
FB_URL="https://github.com/google/flatbuffers"
FB_DIR=".flatbuffers"
FLATC="$FB_DIR/bazel-bin/flatc"
if [ -z "$(which bazel)" ]; then
echo "bazel is required to build flatc"
exit 1
fi
echo "Bazel version: $(bazel version | head -1 | awk -F':' '{print $2}')"
if [ ! -e $FB_DIR ]; then
echo "git clone $FB_URL ..."
git clone -b master --no-tag $FB_URL $FB_DIR
else
echo "git pull $FB_URL ..."
git -C $FB_DIR pull --ff-only
fi
echo "hard reset to $FB_COMMIT"
git -C $FB_DIR reset --hard $FB_COMMIT
pushd $FB_DIR
echo "run: bazel build :flatc ..."
bazel build :flatc
popd
RUST_DIR="$DIR/src"
while read -r FBS_FILE; do
echo "Compiling ${FBS_FILE}"
$FLATC --rust -o "$RUST_DIR" "$FBS_FILE"
done < <(git ls-files "$DIR"/*.fbs)
cargo fmt
popd
echo "DONE! Please run 'cargo test' and check in any changes."
fi


@ -1,113 +0,0 @@
namespace influxdata.iox.write.v1;
// Every modification to a database is represented as an entry. These can be forwarded
// on to other IOx servers or to the write buffer.
// An entry can only be one of these Operation types
union Operation {
write: WriteOperations,
delete: DeleteOperations,
}
table WriteOperations {
// A collection of partition writes. A given partition will have at most one
// write in this collection.
partition_writes: [PartitionWrite];
}
table DeleteOperations {
// A collection of deletes. Each delete targets a single table, with each table
// having no more than one delete. Deletes can span partitions because they
// only have a predicate and do not target any specific partition.
deletes: [Delete];
}
table Entry {
operation: Operation;
}
// A write to a partition. If the IOx server creating this PartitionWrite has
// no rules for generating partition keys, the key will be null, representing
// the empty string.
table PartitionWrite {
key: string;
table_batches: [TableWriteBatch];
}
// A delete from a single table with a predicate. Deletes can span partitions since
// they're concerned with data that has already been written. Partitioning is a way
// to split up writes as they land.
table Delete {
table_name: string;
predicate: string;
}
// A collection of rows in a table in column oriented representation
table TableWriteBatch {
name: string;
// every column must have the same number of bytes in its null_mask. They also must
// have the same number of rows n such that for each column c:
// c.values().len() + count_ones(null_mask) = n
columns: [Column];
}
enum LogicalColumnType : byte { IOx, Tag, Field, Time }
union ColumnValues {
I64Values,
F64Values,
U64Values,
StringValues,
BoolValues,
BytesValues,
}
table Column {
name: string;
// this keeps a mapping of what kind of InfluxDB or IOx type this column came from
logical_column_type: LogicalColumnType;
// the set of non-null values for this column. Their position in this array does not
// map to their row position in the batch. The bitmask must be used to map the row
// position to the position in this array.
values: ColumnValues;
// mask that maps the position to if that value is null. Null positions will
// not have a value represented in the values array. To read the values out of the
// column requires traversing the mask to determine what position in the values
// array that index is located in. Here's what it might look like:
// position: 0 8 9 24
// bit: 00100011 00111000 00000001
// An on bit (1) indicates that the value at that position is null. If there are
// no null values in the column, the null_mask is omitted from the flatbuffers.
null_mask: [ubyte];
}
table I64Values {
values: [int64];
}
table F64Values {
values: [float64];
}
table U64Values {
values: [uint64];
}
table StringValues {
values: [string];
}
table BoolValues {
values: [bool];
}
table BytesValues {
values: [BytesValue];
}
table BytesValue {
data: [ubyte];
}
root_type Entry;
file_identifier "IOxE";

File diff suppressed because it is too large

File diff suppressed because it is too large


@ -1,9 +0,0 @@
mod entry;
#[allow(unused_imports, clippy::needless_borrow)]
mod entry_generated;
pub use crate::entry::*;
/// Generated Flatbuffers code for replicating and writing data between IOx
/// servers
pub use entry_generated::influxdata::iox::write::v_1 as entry_fb;


@ -9,7 +9,6 @@ default-run = "influxdb_iox"
# Workspace dependencies, in alphabetical order
datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" }
entry = { path = "../entry" }
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
@ -20,7 +19,6 @@ logfmt = { path = "../logfmt" }
metric = { path = "../metric" }
metric_exporters = { path = "../metric_exporters" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_entry = { path = "../mutable_batch_entry" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
mutable_buffer = { path = "../mutable_buffer" }
@ -96,7 +94,6 @@ heappy = { git = "https://github.com/mkmik/heappy", rev = "20aa466524ac9ce34a4ba
[dev-dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util" }
entry = { path = "../entry" }
influxdb2_client = { path = "../influxdb2_client" }
influxdb_storage_client = { path = "../influxdb_storage_client" }
test_helpers = { path = "../test_helpers" }


@ -1,17 +0,0 @@
[package]
name = "mutable_batch_entry"
version = "0.1.0"
edition = "2018"
description = "Conversion logic for entry flatbuffer <-> MutableBatch"
[dependencies]
entry = { path = "../entry" }
hashbrown = "0.11"
mutable_batch = { path = "../mutable_batch" }
schema = { path = "../schema" }
snafu = "0.6"
[dev-dependencies]
arrow_util = { path = "../arrow_util" }
data_types = { path = "../data_types" }
flatbuffers = "2"


@ -1,375 +0,0 @@
//! Code to convert entry to [`MutableBatch`]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
use entry::{Column as EntryColumn, Entry, SequencedEntry, TableBatch};
use hashbrown::{HashMap, HashSet};
use mutable_batch::{
payload::{DbWrite, WriteMeta},
writer::Writer,
MutableBatch,
};
use schema::{InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME};
use snafu::{ensure, ResultExt, Snafu};
/// Error type for entry conversion
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display(
"Invalid null mask, expected to be {} bytes but was {}",
expected_bytes,
actual_bytes
))]
InvalidNullMask {
expected_bytes: usize,
actual_bytes: usize,
},
#[snafu(display("duplicate column name: {}", column))]
DuplicateColumnName { column: String },
#[snafu(display("table batch must contain time column"))]
MissingTime,
#[snafu(display("time column must not contain nulls"))]
NullTime,
#[snafu(display("entry contained empty table batch"))]
EmptyTableBatch,
#[snafu(display("error writing column {}: {}", column, source))]
Write {
source: mutable_batch::writer::Error,
column: String,
},
}
/// Result type for entry conversion
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Converts a [`SequencedEntry`] to a [`DbWrite`]
pub fn sequenced_entry_to_write(entry: &SequencedEntry) -> Result<DbWrite> {
let sequence = entry.sequence().cloned().expect("entry must be sequenced");
let timestamp = entry
.producer_wallclock_timestamp()
.expect("entry must be timestamped");
let meta = WriteMeta::sequenced(
sequence,
timestamp,
entry.span_context().cloned(),
entry.entry().data().len(),
);
let tables = entry_to_batches(entry.entry())?;
Ok(DbWrite::new(tables, meta))
}
/// Converts an [`Entry`] to a collection of [`MutableBatch`] keyed by table name
///
/// Note: this flattens partitioning
pub fn entry_to_batches(entry: &Entry) -> Result<HashMap<String, MutableBatch>> {
let mut batches = HashMap::new();
for partition_write in entry.partition_writes_iter() {
for table_batch in partition_write.table_batches_iter() {
let table_name = table_batch.name();
let (_, batch) = batches
.raw_entry_mut()
.from_key(table_name)
.or_insert_with(|| (table_name.to_string(), MutableBatch::new()));
write_table_batch(batch, table_batch)?;
}
}
Ok(batches)
}
/// Writes the provided [`TableBatch`] to a [`MutableBatch`]; on error any changes made
/// to `batch` are reverted
fn write_table_batch(batch: &mut MutableBatch, table_batch: TableBatch<'_>) -> Result<()> {
let row_count = table_batch.row_count();
ensure!(row_count != 0, EmptyTableBatch);
let columns = table_batch.columns();
let mut column_names = HashSet::with_capacity(columns.len());
for column in &columns {
ensure!(
column_names.insert(column.name()),
DuplicateColumnName {
column: column.name()
}
);
}
// Batch must contain a time column
ensure!(column_names.contains(TIME_COLUMN_NAME), MissingTime);
let mut writer = Writer::new(batch, row_count);
for column in &columns {
let valid_mask = construct_valid_mask(column)?;
let valid_mask = valid_mask.as_deref();
let inner = column.inner();
match column.influx_type() {
InfluxColumnType::Field(InfluxFieldType::Float) => {
let values = inner
.values_as_f64values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_f64(column.name(), valid_mask, values)
}
InfluxColumnType::Field(InfluxFieldType::Integer) => {
let values = inner
.values_as_i64values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_i64(column.name(), valid_mask, values)
}
InfluxColumnType::Field(InfluxFieldType::UInteger) => {
let values = inner
.values_as_u64values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_u64(column.name(), valid_mask, values)
}
InfluxColumnType::Tag => {
let values = inner
.values_as_string_values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_tag(column.name(), valid_mask, values)
}
InfluxColumnType::Field(InfluxFieldType::String) => {
let values = inner
.values_as_string_values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_string(column.name(), valid_mask, values)
}
InfluxColumnType::Field(InfluxFieldType::Boolean) => {
let values = inner
.values_as_bool_values()
.unwrap()
.values()
.into_iter()
.flatten()
.cloned();
writer.write_bool(column.name(), valid_mask, values)
}
InfluxColumnType::Timestamp => {
if valid_mask.is_some() {
return Err(Error::NullTime);
}
let values = inner
.values_as_i64values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_time(column.name(), values)
}
InfluxColumnType::IOx(_) => unimplemented!(),
}
.context(Write {
column: column.name(),
})?;
}
writer.commit();
Ok(())
}
/// Construct a validity mask from the given column's null mask
fn construct_valid_mask(column: &EntryColumn<'_>) -> Result<Option<Vec<u8>>> {
let buf_len = (column.row_count + 7) >> 3;
let data = match column.inner().null_mask() {
Some(data) => data,
None => return Ok(None),
};
if data.iter().all(|x| *x == 0) {
return Ok(None);
}
ensure!(
data.len() == buf_len,
InvalidNullMask {
expected_bytes: buf_len,
actual_bytes: data.len()
}
);
Ok(Some(
data.iter()
.map(|x| {
// Currently the bit mask is backwards
!x.reverse_bits()
})
.collect(),
))
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_util::assert_batches_eq;
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use entry::test_helpers::{lp_to_entries, lp_to_entry};
use entry::Entry;
use schema::selection::Selection;
fn first_batch(entry: &Entry) -> TableBatch<'_> {
entry.table_batches().next().unwrap().1
}
#[test]
fn test_basic() {
let mut batch = MutableBatch::new();
let lp = "foo,t1=asdf iv=1i,uv=1u,fv=1.0,bv=true,sv=\"hi\" 1";
let entry = lp_to_entry(lp);
write_table_batch(&mut batch, first_batch(&entry)).unwrap();
let expected = &[
"+------+----+----+----+------+--------------------------------+----+",
"| bv | fv | iv | sv | t1 | time | uv |",
"+------+----+----+----+------+--------------------------------+----+",
"| true | 1 | 1 | hi | asdf | 1970-01-01T00:00:00.000000001Z | 1 |",
"+------+----+----+----+------+--------------------------------+----+",
];
assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]);
let lp = "foo t1=\"string\" 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column t1: Unable to insert iox::column_type::field::string type into a column of iox::column_type::tag");
let lp = "foo iv=1u 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column iv: Unable to insert iox::column_type::field::uinteger type into a column of iox::column_type::field::integer");
let lp = "foo fv=1i 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column fv: Unable to insert iox::column_type::field::integer type into a column of iox::column_type::field::float");
let lp = "foo bv=1 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column bv: Unable to insert iox::column_type::field::float type into a column of iox::column_type::field::boolean");
let lp = "foo sv=true 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column sv: Unable to insert iox::column_type::field::boolean type into a column of iox::column_type::field::string");
let lp = "foo,sv=\"bar\" f=3i 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column sv: Unable to insert iox::column_type::tag type into a column of iox::column_type::field::string");
assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]);
let lp = r#"
foo,t1=v1 fv=3.0,bv=false 2
foo,t1=v3 fv=3.0,bv=false 2
foo,t2=v6 bv=true,iv=3i 3
"#;
let entry = lp_to_entry(lp);
write_table_batch(&mut batch, first_batch(&entry)).unwrap();
assert_batches_eq!(
&[
"+-------+----+----+----+------+----+--------------------------------+----+",
"| bv | fv | iv | sv | t1 | t2 | time | uv |",
"+-------+----+----+----+------+----+--------------------------------+----+",
"| true | 1 | 1 | hi | asdf | | 1970-01-01T00:00:00.000000001Z | 1 |",
"| false | 3 | | | v1 | | 1970-01-01T00:00:00.000000002Z | |",
"| false | 3 | | | v3 | | 1970-01-01T00:00:00.000000002Z | |",
"| true | | 3 | | | v6 | 1970-01-01T00:00:00.000000003Z | |",
"+-------+----+----+----+------+----+--------------------------------+----+",
],
&[batch.to_arrow(Selection::All).unwrap()]
);
}
#[test]
fn test_entry_to_batches() {
let lp = r#"
cpu,part=a,tag1=v1 fv=3.0,sv="32" 1
cpu,part=a,tag1=v1 fv=3.0 2
cpu,part=a,tag2=v1 iv=3 1
cpu,part=a,tag2=v1 iv=3 2
mem,part=a,tag1=v2 v=2 1
mem,part=a,tag1=v2 v=2,b=true 2
mem,part=b,tag1=v2 v=2 2
"#;
let entries = lp_to_entries(
lp,
&PartitionTemplate {
parts: vec![TemplatePart::Column("part".to_string())],
},
);
assert_eq!(entries.len(), 1);
let entry = entries.into_iter().next().unwrap();
assert_eq!(entry.table_batches().count(), 3);
// Should flatten partitioning
let batches = entry_to_batches(&entry).unwrap();
assert_eq!(batches.len(), 2);
assert_batches_eq!(
&[
"+----+----+------+----+------+------+--------------------------------+",
"| fv | iv | part | sv | tag1 | tag2 | time |",
"+----+----+------+----+------+------+--------------------------------+",
"| 3 | | a | 32 | v1 | | 1970-01-01T00:00:00.000000001Z |",
"| 3 | | a | | v1 | | 1970-01-01T00:00:00.000000002Z |",
"| | 3 | a | | | v1 | 1970-01-01T00:00:00.000000001Z |",
"| | 3 | a | | | v1 | 1970-01-01T00:00:00.000000002Z |",
"+----+----+------+----+------+------+--------------------------------+",
],
&[batches["cpu"].to_arrow(Selection::All).unwrap()]
);
assert_batches_eq!(
&[
"+------+------+------+--------------------------------+---+",
"| b | part | tag1 | time | v |",
"+------+------+------+--------------------------------+---+",
"| | a | v2 | 1970-01-01T00:00:00.000000001Z | 2 |",
"| true | a | v2 | 1970-01-01T00:00:00.000000002Z | 2 |",
"| | b | v2 | 1970-01-01T00:00:00.000000002Z | 2 |",
"+------+------+------+--------------------------------+---+",
],
&[batches["mem"].to_arrow(Selection::All).unwrap()]
);
}
}


@ -5,11 +5,9 @@ edition = "2021"
description = "MutableBatch integration tests and benchmarks"
[dependencies]
entry = { path = "../entry" }
flate2 = "1.0"
generated_types = { path = "../generated_types" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_entry = { path = "../mutable_batch_entry" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
prost = "0.8"
@ -22,10 +20,6 @@ criterion = "0.3"
name = "write_lp"
harness = false
[[bench]]
name = "write_entry"
harness = false
[[bench]]
name = "write_pb"


@ -1,38 +0,0 @@
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use entry::Entry;
use mutable_batch_entry::entry_to_batches;
use mutable_batch_tests::benchmark_lp;
fn generate_entry_bytes() -> Vec<(String, (usize, Bytes))> {
benchmark_lp()
.into_iter()
.map(|(bench, lp)| {
(
bench,
(lp.len(), entry::test_helpers::lp_to_entry(&lp).into()),
)
})
.collect()
}
pub fn write_entry(c: &mut Criterion) {
let mut group = c.benchmark_group("write_entry");
for (bench, (lp_bytes, entry_bytes)) in generate_entry_bytes() {
group.throughput(Throughput::Bytes(lp_bytes as u64));
group.bench_function(BenchmarkId::from_parameter(bench), |b| {
b.iter(|| {
let entry: Entry = entry_bytes.clone().try_into().unwrap();
let batches = entry_to_batches(&entry).unwrap();
assert_eq!(batches.len(), 1);
});
});
}
group.finish();
}
criterion_group!(benches, write_entry);
criterion_main!(benches);


@ -7,7 +7,6 @@ edition = "2021"
[dependencies] # In alphabetical order
arrow = { version = "6.0", features = ["prettyprint"] }
data_types = { path = "../data_types" }
entry = { path = "../entry" }
schema = { path = "../schema" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }


@ -1,26 +0,0 @@
#!/bin/bash -eu
# Change to the query_tests crate directory, where this script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd $DIR
echo "Regenerating query_tests..."
(cd generate && cargo run)
echo "Checking for uncommitted changes..."
if ! git diff --quiet HEAD --; then
echo "git diff found:"
git diff HEAD
echo "************************************************************"
echo "* Found uncommitted changes to generated flatbuffers code! *"
echo "* Please do:"
echo "* cd query_tests/generate"
echo "* cargo run"
echo "* to regenerate the query_tests code and check it in! *"
echo "************************************************************"
exit 1
else
echo "No uncommitted changes; everything is awesome."
fi


@ -12,7 +12,6 @@ description = "Server related benchmarks, grouped into their own crate to minimiz
[dev-dependencies] # In alphabetical order
arrow_util = { path = "../arrow_util" }
entry = { path = "../entry" }
criterion = { version = "0.3.4", features = ["async_tokio"] }
chrono = { version = "0.4", features = ["serde"] }
datafusion = { path = "../datafusion" }


@ -7,13 +7,11 @@ edition = "2021"
async-trait = "0.1"
data_types = { path = "../data_types" }
dotenv = "0.15.0"
entry = { path = "../entry" }
futures = "0.3"
generated_types = { path = "../generated_types" }
http = "0.2"
httparse = "1.5"
mutable_batch = { path = "../mutable_batch" }
mutable_batch_entry = { path = "../mutable_batch_entry" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
observability_deps = { path = "../observability_deps" }


@ -7,11 +7,9 @@ use http::{HeaderMap, HeaderValue};
use prost::Message;
use data_types::sequence::Sequence;
use entry::{Entry, SequencedEntry};
use generated_types::influxdata::iox::write_buffer::v1::write_buffer_payload::Payload;
use generated_types::influxdata::iox::write_buffer::v1::WriteBufferPayload;
use mutable_batch::{DbWrite, WriteMeta};
use mutable_batch_entry::sequenced_entry_to_write;
use mutable_batch_pb::decode::decode_database_batch;
use time::Time;
use trace::ctx::SpanContext;
@ -20,16 +18,6 @@ use trace_http::ctx::{format_jaeger_trace_context, TraceHeaderParser};
use crate::core::WriteBufferError;
/// Current flatbuffer-based content type.
///
/// This is a value for [`HEADER_CONTENT_TYPE`].
///
/// Inspired by:
/// - <https://stackoverflow.com/a/56502135>
/// - <https://stackoverflow.com/a/48051331>
pub const CONTENT_TYPE_FLATBUFFER: &str =
r#"application/x-flatbuffers; schema="influxdata.iox.write.v1.Entry""#;
/// Pbdata based content type
pub const CONTENT_TYPE_PROTOBUF: &str =
r#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#;
@ -42,7 +30,6 @@ pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id";
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ContentType {
Entry,
Protobuf,
}
@ -62,29 +49,20 @@ impl IoxHeaders {
}
}
/// Create new headers where all information is missing.
fn empty() -> Self {
Self {
// Fallback for now https://github.com/influxdata/influxdb_iox/issues/2805
content_type: ContentType::Entry,
span_context: None,
}
}
/// Creates a new IoxHeaders from an iterator of headers
pub fn from_headers(
headers: impl IntoIterator<Item = (impl AsRef<str>, impl AsRef<[u8]>)>,
trace_collector: Option<&Arc<dyn TraceCollector>>,
) -> Result<Self, WriteBufferError> {
let mut res = Self::empty();
let mut span_context = None;
let mut content_type = None;
for (name, value) in headers {
let name = name.as_ref();
if name.eq_ignore_ascii_case(HEADER_CONTENT_TYPE) {
res.content_type = match std::str::from_utf8(value.as_ref()) {
Ok(CONTENT_TYPE_FLATBUFFER) => ContentType::Entry,
Ok(CONTENT_TYPE_PROTOBUF) => ContentType::Protobuf,
content_type = match std::str::from_utf8(value.as_ref()) {
Ok(CONTENT_TYPE_PROTOBUF) => Some(ContentType::Protobuf),
Ok(c) => return Err(format!("Unknown message format: {}", c).into()),
Err(e) => {
return Err(format!("Error decoding content type header: {}", e).into())
@ -101,7 +79,7 @@ impl IoxHeaders {
let parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(HEADER_TRACE_CONTEXT);
res.span_context = match parser.parse(trace_collector, &headers) {
span_context = match parser.parse(trace_collector, &headers) {
Ok(ctx) => ctx,
Err(e) => {
return Err(format!("Error decoding trace context: {}", e).into())
@ -112,7 +90,10 @@ impl IoxHeaders {
}
}
Ok(res)
Ok(Self {
content_type: content_type.ok_or_else(|| "No content type header".to_string())?,
span_context,
})
}
/// Gets the content type
@ -128,7 +109,6 @@ impl IoxHeaders {
/// Returns the header map to encode
pub fn headers(&self) -> impl Iterator<Item = (&str, Cow<'static, str>)> + '_ {
let content_type = match self.content_type {
ContentType::Entry => CONTENT_TYPE_FLATBUFFER.into(),
ContentType::Protobuf => CONTENT_TYPE_PROTOBUF.into(),
};
@ -154,16 +134,6 @@ pub fn decode(
producer_ts: Time,
) -> Result<DbWrite, WriteBufferError> {
match headers.content_type {
ContentType::Entry => {
let entry = Entry::try_from(data.to_vec())?;
let entry = SequencedEntry::new_from_sequence_and_span_context(
sequence,
producer_ts,
entry,
headers.span_context,
);
sequenced_entry_to_write(&entry).map_err(|e| Box::new(e) as WriteBufferError)
}
ContentType::Protobuf => {
let payload: WriteBufferPayload = prost::Message::decode(data)
.map_err(|e| format!("failed to decode WriteBufferPayload: {}", e))?;
@ -209,7 +179,7 @@ mod tests {
let span_context_parent = SpanContext::new(Arc::clone(&collector));
let span_context = span_context_parent.child("foo").ctx;
let iox_headers1 = IoxHeaders::new(ContentType::Entry, Some(span_context));
let iox_headers1 = IoxHeaders::new(ContentType::Protobuf, Some(span_context));
let encoded: Vec<_> = iox_headers1
.headers()


@ -587,7 +587,6 @@ mod tests {
sync::atomic::{AtomicU32, Ordering},
};
use entry::test_helpers::lp_to_entry;
use time::TimeProvider;
use trace::{RingBufferTraceCollector, TraceCollector};
@ -720,10 +719,17 @@ mod tests {
let adapter = KafkaTestAdapter::new(conn);
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let headers = IoxHeaders::new(ContentType::Protobuf, None);
let mut owned_headers = OwnedHeaders::new();
for (name, value) in headers.headers() {
owned_headers = owned_headers.add(name, value.as_bytes());
}
let writer = ctx.writing(true).await.unwrap();
let partition = set_pop_first(&mut writer.sequencer_ids()).unwrap() as i32;
let record: FutureRecord<'_, String, [u8]> =
FutureRecord::to(&writer.database_name).partition(partition);
let record: FutureRecord<'_, String, [u8]> = FutureRecord::to(&writer.database_name)
.partition(partition)
.headers(owned_headers);
writer.producer.send(record, Timeout::Never).await.unwrap();
let mut reader = ctx.reading(true).await.unwrap();
@ -743,9 +749,8 @@ mod tests {
let writer = ctx.writing(true).await.unwrap();
let partition = set_pop_first(&mut writer.sequencer_ids()).unwrap() as i32;
let entry = lp_to_entry("upc,region=east user=1 100");
let record: FutureRecord<'_, String, _> = FutureRecord::to(&writer.database_name)
.payload(entry.data())
.payload(&[0])
.partition(partition);
writer.producer.send(record, Timeout::Never).await.unwrap();
@ -753,7 +758,8 @@ mod tests {
let mut streams = reader.streams();
assert_eq!(streams.len(), 1);
let (_sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
stream.stream.next().await.unwrap().unwrap();
let err = stream.stream.next().await.unwrap().unwrap_err();
assert_eq!(err.to_string(), "No content type header");
}
#[tokio::test]
@ -764,9 +770,8 @@ mod tests {
let writer = ctx.writing(true).await.unwrap();
let partition = set_pop_first(&mut writer.sequencer_ids()).unwrap() as i32;
let entry = lp_to_entry("upc,region=east user=1 100");
let record: FutureRecord<'_, String, _> = FutureRecord::to(&writer.database_name)
.payload(entry.data())
.payload(&[0])
.partition(partition)
.headers(OwnedHeaders::new().add(HEADER_CONTENT_TYPE, "foo"));
writer.producer.send(record, Timeout::Never).await.unwrap();