chore: merge main to branch

pull/24376/head
Nga Tran 2021-11-08 09:28:29 -05:00
commit abbfafcabd
68 changed files with 1518 additions and 6680 deletions

View File

@ -35,6 +35,10 @@ ignore = [
# upstream issue: https://github.com/apache/arrow-rs/issues/774
"RUSTSEC-2021-0118",
# title: Generated code can read and write out of bounds in safe code
# why needed: part of `arrow`
# upstream issue: https://github.com/google/flatbuffers/issues/6627
"RUSTSEC-2021-0122",
]
[output]

View File

@ -7,20 +7,25 @@
# pushed to `quay.io/influxdb/rust:ci`. This build image is then used to run
# the CI tasks for the day.
#
# CI runs for git branches ending in `/perf`:
# Every commit:
#
# Runs tests, fmt, & lints and then compiles binaries using the "release"
# cargo target and pushes a container with the binary to
# `quay.io/influxdb/fusion` (see perf_image below).
# The CI for every PR and merge to main runs tests, fmt, lints and compiles debug binaries
#
# CI for all other branches:
# On main if all these checks pass it will then additionally compile in "release" mode and
# publish a docker image to quay.io/influxdb/iox:$COMMIT_SHA
#
# Manual CI Image:
#
# It is possible to manually trigger a rebuild of the image used in CI. To do this, navigate to
# https://app.circleci.com/pipelines/github/influxdata/influxdb_iox?branch=main (overriding the
# branch name if desired). Then:
# - Click "Run Pipeline" in the top-right
# - Expand "Add Parameters"
# - Add a "boolean" parameter called "ci_image" with the value true
# - Click "Run Pipeline"
#
# If you refresh the page you should see a newly running ci_image workflow
#
# - cargo build with the default cargo profile ("dev")
# - cargo test
# - cargo fmt
# - clippy (with warnings denied)
# - lint protobufs
# - check if generated flatbuffers code is up to date
version: 2.1
@ -305,34 +310,18 @@ jobs:
if wget -O - https://api.github.com/repos/influxdata/influxdb_iox/issues/$(echo $CIRCLE_PULL_REQUEST | grep -oE "[^/pull]+$") | grep "$SKIP_LABEL" ; then echo "SKIPPING (FOUND LABEL)" && exit ; else echo "CHECKING (NO LABEL FOUND)"; fi
git fetch origin main
# compare against only changes in this branch (not against
# other stuff that may have been added to master since last merge)
# other stuff that may have been added to main since last merge)
MERGE_BASE=$(git merge-base origin/main $CIRCLE_BRANCH) sh -c 'buf breaking --against ".git#ref=$MERGE_BASE"'
# Check that any generated files are up-to-date with the changes in this PR.
# named "check-flatbuffers" because that name is hardcoded into github checks
check-flatbuffers:
docker:
- image: quay.io/influxdb/rust:ci
resource_class: xlarge # use of a smaller executor tends to crash on link
steps:
- checkout
- rust_components # Regenerating flatbuffers uses rustfmt
- run:
name: Check Query Tests
command: ./query_tests/check-generated.sh
- run:
name: Check Flatbuffers
command: INFLUXDB_IOX_INTEGRATION_LOCAL=1 ./entry/check-flatbuffers.sh
# Compile a cargo "release" profile binary for branches that end in `/perf`
#
# Uses the latest ci_image (influxdb/rust below) to build a release binary and
# copies it to a minimal container image based upon `rust:slim-buster`. This
# minimal image is then pushed to `quay.io/influxdb/fusion:${BRANCH}` with '/'
# minimal image is then pushed to `quay.io/influxdb/iox:${BRANCH}` with '/'
# replaced by '.' - as an example:
#
# git branch: dom/my-awesome-feature/perf
# container: quay.io/influxdb/fusion:dom.my-awesome-feature.perf
# container: quay.io/influxdb/iox:dom.my-awesome-feature.perf
#
# Subsequent CI runs will overwrite the tag if you push more changes, so watch
# out for parallel CI runs!
@ -376,7 +365,7 @@ jobs:
sudo apt-get update
sudo apt-get install -y docker.io
- run: |
echo "$QUAY_PASS" | docker login quay.io --username $QUAY_USER --password-stdin
echo "$QUAY_INFLUXDB_IOX_PASS" | docker login quay.io --username $QUAY_INFLUXDB_IOX_USER --password-stdin
- run:
# Docker has functionality to support per-Dockerfile .dockerignore
# This was added in https://github.com/moby/buildkit/pull/901
@ -390,8 +379,8 @@ jobs:
echo sha256sum after build is
sha256sum target/release/influxdb_iox
COMMIT_SHA=$(git rev-parse --short HEAD)
docker build -t quay.io/influxdb/fusion:$COMMIT_SHA -f docker/Dockerfile.iox .
docker push quay.io/influxdb/fusion:$COMMIT_SHA
docker build -t quay.io/influxdb/iox:$COMMIT_SHA -f docker/Dockerfile.iox .
docker push quay.io/influxdb/iox:$COMMIT_SHA
echo "export COMMIT_SHA=${COMMIT_SHA}" >> $BASH_ENV
- run:
name: Deploy tags
@ -419,11 +408,21 @@ jobs:
docker push quay.io/influxdb/rust:$COMMIT_SHA
docker push quay.io/influxdb/rust:ci
parameters:
# Trigger build of CI image
ci_image:
type: boolean
default: false
workflows:
version: 2
# CI for all pull requests.
ci:
when:
not: << pipeline.parameters.ci_image >>
jobs:
- fmt
- lint
@ -435,34 +434,7 @@ workflows:
- test_kafka_integration
- test_influxdb2_client
- build
- check-flatbuffers
- doc
# Internal pipeline for perf builds.
#
# Filter ensures this only runs for git branches ending in `/perf`.
perf_build:
jobs:
- fmt:
filters:
branches:
only: main
- lint:
filters:
branches:
only: main
- cargo_audit:
filters:
branches:
only: main
- test:
filters:
branches:
only: main
- build:
filters:
branches:
only: main
- perf_image:
filters:
branches:
@ -471,11 +443,23 @@ workflows:
- fmt
- lint
- cargo_audit
- protobuf-lint
- test
- test_heappy
- test_perf
- test_kafka_integration
- test_influxdb2_client
- build
- doc
# Manual build of CI image
ci_image:
when: << pipeline.parameters.ci_image >>
jobs:
- ci_image
# Nightly rebuild of the build container
ci_image:
ci_image_nightly:
triggers:
- schedule:
cron: "0 5 * * *"

View File

@ -20,7 +20,7 @@
set -euo pipefail
DOCKER_IMAGE_TAG=${1}
DOCKER_IMAGE="quay.io/influxdb/fusion"
DOCKER_IMAGE="quay.io/influxdb/iox"
APP_NAME="IOx"
DOCKER_IMAGE_DIGEST="$(docker image inspect "${DOCKER_IMAGE}:${DOCKER_IMAGE_TAG}" --format '{{ if eq (len .RepoDigests) 1 }}{{index .RepoDigests 0}}{{ end }}')"

View File

@ -224,23 +224,6 @@ cargo clippy --all-targets --workspace -- -D warnings
[`rustfmt`]: https://github.com/rust-lang/rustfmt
[`clippy`]: https://github.com/rust-lang/rust-clippy
## Upgrading the `flatbuffers` crate
IOx uses Flatbuffers for some of its messages. The structure is defined in [`entry/src/entry.fbs`].
We have then used the `flatc` Flatbuffers compiler to generate the corresponding Rust code in
[`entry/src/entry_generated.rs`], which is checked in to the repository.
The checked-in code is compatible with the `flatbuffers` crate version in the `Cargo.lock` file. If
upgrading the version of the `flatbuffers` crate that IOx depends on, the generated code will need
to be updated as well.
Instructions for updating the generated code are in [`docs/regenerating_flatbuffers.md`].
[`entry/src/entry.fbs`]: entry/src/entry.fbs
[`entry/src/entry_generated.rs`]: entry/src/entry_generated.rs
[`docs/regenerating_flatbuffers.md`]: docs/regenerating_flatbuffers.md
## Distributed Tracing
See [tracing.md](docs/tracing.md) for more information on the distributed tracing functionality within IOx

Cargo.lock (generated)
View File

@ -2,12 +2,6 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "Inflector"
version = "0.11.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3"
[[package]]
name = "RustyXML"
version = "0.3.0"
@ -49,12 +43,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "aliasable"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "250f629c0161ad8107cf89319e990051fae62832fd343083bea452d93e2205fd"
[[package]]
name = "alloc-no-stdlib"
version = "2.0.3"
@ -111,7 +99,7 @@ checksum = "a900a6164aa0abf26d7a6dfea727116a3619456decfbf573e60c7615bf49e84c"
dependencies = [
"bitflags",
"chrono",
"comfy-table",
"comfy-table 4.1.1",
"csv",
"flatbuffers",
"hex",
@ -151,7 +139,7 @@ dependencies = [
"ahash",
"arrow",
"chrono",
"comfy-table",
"comfy-table 5.0.0",
"hashbrown",
"num-traits",
"rand",
@ -599,8 +587,19 @@ version = "4.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11e95a3e867422fd8d04049041f5671f94d53c32a9dcd82e2be268714942f3f3"
dependencies = [
"strum",
"strum_macros",
"strum 0.21.0",
"strum_macros 0.21.1",
"unicode-width",
]
[[package]]
name = "comfy-table"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42350b81f044f576ff88ac750419f914abb46a03831bb1747134344ee7a4e64"
dependencies = [
"strum 0.22.0",
"strum_macros 0.22.0",
"unicode-width",
]
@ -972,23 +971,6 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]]
name = "entry"
version = "0.1.0"
dependencies = [
"bytes",
"chrono",
"data_types",
"flatbuffers",
"generated_types",
"influxdb_line_protocol",
"ouroboros",
"schema",
"snafu",
"time 0.1.0",
"trace",
]
[[package]]
name = "env_logger"
version = "0.8.4"
@ -1346,9 +1328,9 @@ checksum = "ac5956d4e63858efaec57e0d6c1c2f6a41e1487f830314a324ccd7e2223a7ca0"
[[package]]
name = "handlebars"
version = "4.1.3"
version = "4.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66b09e2322d20d14bc2572401ce7c1d60b4748580a76c230ed9c1f8938f0c833"
checksum = "e1874024f4a29f47d609014caec0b1c866f1c1eb0661a09c9733ecc4757f5f88"
dependencies = [
"log",
"pest",
@ -1592,13 +1574,12 @@ dependencies = [
"bytes",
"chrono",
"clap",
"comfy-table",
"comfy-table 5.0.0",
"csv",
"data_types",
"datafusion 0.1.0",
"dirs",
"dotenv",
"entry",
"flate2",
"futures",
"generated_types",
@ -1621,7 +1602,6 @@ dependencies = [
"metric",
"metric_exporters",
"mutable_batch",
"mutable_batch_entry",
"mutable_batch_lp",
"mutable_batch_pb",
"mutable_buffer",
@ -1684,6 +1664,9 @@ dependencies = [
"client_util",
"futures-util",
"generated_types",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"prost",
"rand",
"serde",
@ -2217,20 +2200,6 @@ dependencies = [
"trace",
]
[[package]]
name = "mutable_batch_entry"
version = "0.1.0"
dependencies = [
"arrow_util",
"data_types",
"entry",
"flatbuffers",
"hashbrown",
"mutable_batch",
"schema",
"snafu",
]
[[package]]
name = "mutable_batch_lp"
version = "0.1.0"
@ -2262,11 +2231,9 @@ version = "0.1.0"
dependencies = [
"bytes",
"criterion",
"entry",
"flate2",
"generated_types",
"mutable_batch",
"mutable_batch_entry",
"mutable_batch_lp",
"mutable_batch_pb",
"prost",
@ -2279,7 +2246,6 @@ dependencies = [
"arrow",
"arrow_util",
"data_types",
"entry",
"metric",
"mutable_batch",
"mutable_batch_lp",
@ -2660,30 +2626,6 @@ dependencies = [
"num-traits",
]
[[package]]
name = "ouroboros"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f357ef82d1b4db66fbed0b8d542cbd3c22d0bf5b393b3c257b9ba4568e70c9c3"
dependencies = [
"aliasable",
"ouroboros_macro",
"stable_deref_trait",
]
[[package]]
name = "ouroboros_macro"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44a0b52c2cbaef7dffa5fec1a43274afe8bd2a644fa9fc50a9ef4ff0269b1257"
dependencies = [
"Inflector",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "packers"
version = "0.1.0"
@ -2834,9 +2776,9 @@ dependencies = [
[[package]]
name = "paste"
version = "1.0.5"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58"
checksum = "0744126afe1a6dd7f394cb50a716dbe086cb06e255e53d8d0185d82828358fb5"
[[package]]
name = "pbjson"
@ -3575,16 +3517,23 @@ dependencies = [
name = "router"
version = "0.1.0"
dependencies = [
"async-trait",
"cache_loader_async",
"data_types",
"hashbrown",
"influxdb_iox_client",
"metric",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"observability_deps",
"parking_lot",
"regex",
"snafu",
"time 0.1.0",
"tokio",
"trace",
"write_buffer",
]
[[package]]
@ -3857,9 +3806,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.68"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8"
checksum = "e466864e431129c7e0d3476b92f20458e5879919a0596c6472738d9fa2d342f8"
dependencies = [
"indexmap",
"itoa",
@ -3954,7 +3903,6 @@ dependencies = [
"criterion",
"data_types",
"datafusion 0.1.0",
"entry",
"flate2",
"influxdb_line_protocol",
"influxdb_tsm",
@ -4162,6 +4110,12 @@ version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2"
[[package]]
name = "strum"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7ac893c7d471c8a21f31cfe213ec4f6d9afeed25537c772e08ef3f005f8729e"
[[package]]
name = "strum_macros"
version = "0.21.1"
@ -4174,6 +4128,18 @@ dependencies = [
"syn",
]
[[package]]
name = "strum_macros"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "339f799d8b549e3744c7ac7feb216383e4005d94bdb22561b3ab8f3b808ae9fb"
dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "subtle"
version = "2.4.1"
@ -5053,13 +5019,11 @@ dependencies = [
"async-trait",
"data_types",
"dotenv",
"entry",
"futures",
"generated_types",
"http",
"httparse",
"mutable_batch",
"mutable_batch_entry",
"mutable_batch_lp",
"mutable_batch_pb",
"observability_deps",

View File

@ -6,7 +6,6 @@ members = [
"data_types",
"datafusion",
"datafusion_util",
"entry",
"generated_types",
"grpc-router",
"grpc-router-test-gen",
@ -24,7 +23,6 @@ members = [
"metric",
"metric_exporters",
"mutable_batch",
"mutable_batch_entry",
"mutable_batch_lp",
"mutable_batch_pb",
"mutable_batch_tests",

View File

@ -14,7 +14,7 @@ snafu = "0.6"
hashbrown = "0.11"
# used by arrow anyway (needed for printing workaround)
chrono = "0.4"
comfy-table = { version = "4.0", default-features = false }
comfy-table = { version = "5.0", default-features = false }
[dev-dependencies]
rand = "0.8.3"

View File

@ -19,6 +19,12 @@ impl ShardId {
}
}
impl std::fmt::Display for ShardId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ShardId({})", self.get())
}
}
/// ShardConfig defines rules for assigning a line/row to an individual
/// host or a group of hosts. A shard
/// is a logical concept, but the usage is meant to split data into

View File

@ -1,7 +1,7 @@
use std::{collections::HashMap, num::NonZeroU32};
use std::{collections::BTreeMap, num::NonZeroU32};
/// If the buffer is used for reading or writing.
#[derive(Debug, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub enum WriteBufferDirection {
/// Writes into the buffer aka "producer".
Write,
@ -13,7 +13,7 @@ pub enum WriteBufferDirection {
pub const DEFAULT_N_SEQUENCERS: u32 = 1;
/// Configures the use of a write buffer.
#[derive(Debug, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferConnection {
/// If the buffer is used for reading or writing.
pub direction: WriteBufferDirection,
@ -27,7 +27,9 @@ pub struct WriteBufferConnection {
/// Special configs to be applied when establishing the connection.
///
/// This depends on [`type_`](Self::type_) and can configure aspects like timeouts.
pub connection_config: HashMap<String, String>,
///
/// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
pub connection_config: BTreeMap<String, String>,
/// Specifies if the sequencers (e.g. for Kafka in form of a topic) should be automatically created if they do not
/// exist prior to reading or writing.
@ -50,7 +52,7 @@ impl Default for WriteBufferConnection {
///
/// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/
/// [`n_sequencers`](Self::n_sequencers) partitions.
#[derive(Debug, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
pub struct WriteBufferCreationConfig {
/// Number of sequencers.
///
@ -61,7 +63,9 @@ pub struct WriteBufferCreationConfig {
/// Special configs to be applied when sequencers are created.
///
/// This depends on [type](WriteBufferConnection::type_) and can set up parameters like retention policy.
pub options: HashMap<String, String>,
///
/// Note: This config should be a [`BTreeMap`] to ensure a stable hash.
pub options: BTreeMap<String, String>,
}
impl Default for WriteBufferCreationConfig {
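
A side note on the HashMap -> BTreeMap switch above: `std::collections::HashMap` does not implement `Hash` and iterates in arbitrary order, while `BTreeMap` iterates in key order and implements `Hash` whenever its keys and values do, which is what makes the new `derive(Hash)` lines possible. A minimal standalone sketch (illustrative types, not the IOx structs):

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};

// Deriving `Hash` compiles because `BTreeMap<String, String>` implements
// `Hash`; the same derive with a `HashMap` field would be rejected by the
// compiler, since `HashMap` does not implement `Hash`.
#[derive(Debug, Eq, PartialEq, Clone, Hash)]
struct ConnectionConfig {
    options: BTreeMap<String, String>,
}

fn main() {
    let mut options = BTreeMap::new();
    options.insert("timeout".to_string(), "5s".to_string());
    options.insert("acks".to_string(), "all".to_string());
    let config = ConnectionConfig { options };

    // Insertion order cannot affect the result because a BTreeMap always
    // iterates in sorted key order.
    let mut hasher = DefaultHasher::new();
    config.hash(&mut hasher);
    println!("hash = {}", hasher.finish());
}
```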

View File

@ -48,17 +48,6 @@ RUN apt-key add /tmp/redpanda.gpg \
&& apt-get install -y redpanda \
&& rm -rf /var/lib/{apt,dpkg,cache,log}
# Install bazel using the installer script to enable building of flatc in the flatbuffers check
ENV BAZEL_VERSION=4.0.0
ENV BAZEL_DOWNLOAD_BASE="https://github.com/bazelbuild/bazel/releases/download"
RUN curl ${CURL_FLAGS} https://bazel.build/bazel-release.pub.gpg | gpg --import - \
&& curl ${CURL_FLAGS} -LO ${BAZEL_DOWNLOAD_BASE}/${BAZEL_VERSION}/bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh \
&& curl ${CURL_FLAGS} -LO ${BAZEL_DOWNLOAD_BASE}/${BAZEL_VERSION}/bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh.sig \
&& gpg --verify bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh.sig bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh \
&& chmod +x bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh \
&& ./bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh \
&& rm bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh bazel-${BAZEL_VERSION}-installer-linux-x86_64.sh.sig
# Install InfluxDB 2.0 OSS to enable integration tests of the influxdb2_client crate
ENV INFLUXDB2_VERSION=2.0.4
ENV INFLUXDB2_DOWNLOAD_BASE="https://dl.influxdata.com/influxdb/releases"

View File

@ -32,7 +32,6 @@ We hold monthly Tech Talks that explain the project's technical underpinnings. Y
* Thoughts on using multiple cores / thread pools: [multi_core_tasks.md](multi_core_tasks.md)
* [Query Engine Docs](../query/README.md)
* [Testing documentation](testing.md) for developers of IOx
* [Regenerating Flatbuffers code](regenerating_flatbuffers.md) when updating the version of the `flatbuffers` crate
* Protobuf tips and tricks: [Protobuf](protobuf.md).
* Catalog Persistence: [`catalog_persistence.md`](catalog_persistence.md).
* SQL command line tips and tricks: [SQL](sql.md).

View File

@ -1,31 +0,0 @@
# Regenerating Flatbuffers code
If you have changed some `*.fbs` files:
- Run `./entry/regenerate-flatbuffers.sh` to regenerate the corresponding Rust code.
- Run `cargo test` to make sure everything works as you would expect.
- Check in the changes to the generated code along with your changes to the `*.fbs` files.
- You should not need to edit the `entry/regenerate-flatbuffers.sh` script.
If you are updating the version of the `flatbuffers` crate in `Cargo.lock`, either because a new
patch release has come out that is compatible with the version range in `Cargo.toml` and you have
run `cargo update`, or because you've updated the version constraint in `Cargo.toml` to change to a
new major or minor version:
- The `flatbuffers` crate gets developed in sync with the `flatc` compiler in the same repo,
so when updating the `flatbuffers` crate we also need to update the `flatc` compiler we're
using.
- Go to https://github.com/google/flatbuffers/blame/master/rust/flatbuffers/Cargo.toml and find
the commit SHA where the `version` metadata was updated to the version of the `flatbuffers`
crate we now want to have in our `Cargo.lock`.
- In the `entry/regenerate-flatbuffers.sh` script, put that commit SHA in the `FB_COMMIT`
variable.
- Run `./entry/regenerate-flatbuffers.sh` to regenerate the corresponding Rust code.
- Run `cargo test` to make sure everything works as you would expect.
- Check in the changes to the generated code along with your changes to the `Cargo.lock` file,
`Cargo.toml` file if relevant, and the `entry/regenerate-flatbuffers.sh` script.
By default, the `entry/regenerate-flatbuffers.sh` script will run a Docker container that
uses the same image we use in CI that will have all the necessary dependencies. If you don't want
to use Docker, run this script with `INFLUXDB_IOX_INTEGRATION_LOCAL=1`, which will require you to
have `bazel` available. You can likely install `bazel` with your favourite package manager.

View File

@ -1,21 +0,0 @@
[package]
name = "entry"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2021"
description = "The entry format used by the write buffer"
[dependencies]
bytes = "1.0"
chrono = "0.4"
data_types = { path = "../data_types" }
# See docs/regenerating_flatbuffers.md about updating generated code when updating the
# version of the flatbuffers crate
flatbuffers = "2"
snafu = "0.6"
time = { path = "../time" }
trace = { path = "../trace" }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
ouroboros = "0.13.0"
schema = { path = "../schema" }
generated_types = { path = "../generated_types" }

View File

@ -1,24 +0,0 @@
#!/bin/bash -eu
# Change to the generated_types crate directory, where this script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd $DIR
echo "Regenerating flatbuffers code..."
./regenerate-flatbuffers.sh
echo "Checking for uncommitted changes..."
if ! git diff --quiet HEAD --; then
echo "git diff found:"
git diff HEAD
echo "************************************************************"
echo "* Found uncommitted changes to generated flatbuffers code! *"
echo "* Please run \`entry/regenerate-flatbuffers.sh\` *"
echo "* to regenerate the flatbuffers code and check it in! *"
echo "************************************************************"
exit 1
else
echo "No uncommitted changes; this is fine."
fi

View File

@ -1,99 +0,0 @@
#!/bin/bash -e
# Instructions
#
# If you have changed some `*.fbs` files:
#
# - Run this script to regenerate the corresponding Rust code.
# - Run `cargo test` to make sure everything works as you would expect.
# - Check in the changes to the generated code along with your changes to the `*.fbs` files.
# - You should not need to edit this script.
#
# If you are updating the version of the `flatbuffers` crate in `Cargo.lock`:
#
# - The `flatbuffers` crate gets developed in sync with the `flatc` compiler in the same repo,
# so when updating the `flatbuffers` crate we also need to update the `flatc` compiler we're
# using.
# - Go to https://github.com/google/flatbuffers/blame/master/rust/flatbuffers/Cargo.toml and find
# the commit SHA where the `version` metadata was updated to the version of the `flatbuffers`
# crate we now want to have in our `Cargo.lock`.
# - Put that commit SHA in this variable:
FB_COMMIT="a9a295fecf3fbd5a4f571f53b01f63202a3e2113"
# - Run this script to regenerate the corresponding Rust code.
# - Run `cargo test` to make sure everything works as you would expect.
# - Check in the changes to the generated code along with your changes to the `Cargo.lock` file and
# this script.
# By default, this script will run a Docker container that uses the same image we use in CI that
# will have all the necessary dependencies. If you don't want to run Docker, run this script with
# INFLUXDB_IOX_INTEGRATION_LOCAL=1.
if [ -z "${INFLUXDB_IOX_INTEGRATION_LOCAL}" ]; then
echo "Running in Docker..."
CI_IMAGE=quay.io/influxdb/rust:ci
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd "$DIR"
DOCKER_IOX_DIR=/home/rust/influxdb_iox
docker rm --force flatc || true
docker pull ${CI_IMAGE} || true
docker run \
-it \
--detach \
--name=flatc \
--volume "${DIR}/..:${DOCKER_IOX_DIR}" \
${CI_IMAGE}
docker exec -e INFLUXDB_IOX_INTEGRATION_LOCAL=1 flatc .${DOCKER_IOX_DIR}/entry/regenerate-flatbuffers.sh
docker rm --force flatc || true
else
echo "Running locally..."
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd "$DIR"
echo "Building flatc from source ..."
FB_URL="https://github.com/google/flatbuffers"
FB_DIR=".flatbuffers"
FLATC="$FB_DIR/bazel-bin/flatc"
if [ -z "$(which bazel)" ]; then
echo "bazel is required to build flatc"
exit 1
fi
echo "Bazel version: $(bazel version | head -1 | awk -F':' '{print $2}')"
if [ ! -e $FB_DIR ]; then
echo "git clone $FB_URL ..."
git clone -b master --no-tag $FB_URL $FB_DIR
else
echo "git pull $FB_URL ..."
git -C $FB_DIR pull --ff-only
fi
echo "hard reset to $FB_COMMIT"
git -C $FB_DIR reset --hard $FB_COMMIT
pushd $FB_DIR
echo "run: bazel build :flatc ..."
bazel build :flatc
popd
RUST_DIR="$DIR/src"
while read -r FBS_FILE; do
echo "Compiling ${FBS_FILE}"
$FLATC --rust -o "$RUST_DIR" "$FBS_FILE"
done < <(git ls-files "$DIR"/*.fbs)
cargo fmt
popd
echo "DONE! Please run 'cargo test' and check in any changes."
fi

View File

@ -1,113 +0,0 @@
namespace influxdata.iox.write.v1;
// Every modification to a database is represented as an entry. These can be forwarded
// on to other IOx servers or to the write buffer.
// An entry can only be one of these Operation types
union Operation {
write: WriteOperations,
delete: DeleteOperations,
}
table WriteOperations {
// A collection of partition writes. A given partition will have at most one
// write in this collection.
partition_writes: [PartitionWrite];
}
table DeleteOperations {
// A collection of deletes. Each delete targets a single table, with each table
// having no more than one delete. Deletes can span partitions because they
// only have a predicate and do not target any specific partition.
deletes: [Delete];
}
table Entry {
operation: Operation;
}
// A write to a partition. If the IOx server creating this PartitionWrite has
// no rules for generating partition keys, the key will be null, representing
// the empty string.
table PartitionWrite {
key: string;
table_batches: [TableWriteBatch];
}
// A delete from a single table with a predicate. Deletes can span partitions since
// they're concerned with data that has already been written. Partitioning is a way
// to split up writes as they land.
table Delete {
table_name: string;
predicate: string;
}
// A collection of rows in a table in column oriented representation
table TableWriteBatch {
name: string;
// every column must have the same number of bytes in its null_mask. They also must
// have the same number of rows n such that for each column c:
// c.values().len() + count_ones(null_mask) = n
columns: [Column];
}
enum LogicalColumnType : byte { IOx, Tag, Field, Time }
union ColumnValues {
I64Values,
F64Values,
U64Values,
StringValues,
BoolValues,
BytesValues,
}
table Column {
name: string;
// this keeps a mapping of what kind of InfluxDB or IOx type this column came from
logical_column_type: LogicalColumnType;
// the set of non-null values for this column. Their position in this array does not
// map to their row position in the batch. The bitmask must be used to map the row
// position to the position in this array.
values: ColumnValues;
// mask that maps the position to if that value is null. Null positions will
// not have a value represented in the values array. To read the values out of the
// column requires traversing the mask to determine what position in the values
// array that index is located in. Here's what it might look like:
// position: 0 8 9 24
// bit: 00100011 00111000 00000001
// An on bit (1) indicates that the value at that position is null. If there are
// no null values in the column, the null_mask is omitted from the flatbuffers.
null_mask: [ubyte];
}
table I64Values {
values: [int64];
}
table F64Values {
values: [float64];
}
table U64Values {
values: [uint64];
}
table StringValues {
values: [string];
}
table BoolValues {
values: [bool];
}
table BytesValues {
values: [BytesValue];
}
table BytesValue {
data: [ubyte];
}
root_type Entry;
file_identifier "IOxE";
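
The null_mask comment in the Column table above is easier to see in code. A rough sketch, not taken from the IOx codebase and assuming least-significant-bit-first ordering within each mask byte (the real flatbuffers encoding may order bits differently), of mapping a row index to its slot in the values array:

```rust
/// Returns the index into the non-null `values` array for `row`, or None
/// if that row is null. Assumes bit i of the mask (LSB-first within each
/// byte) is 1 when row i is null.
fn value_index(null_mask: &[u8], row: usize) -> Option<usize> {
    let is_null = |r: usize| {
        null_mask
            .get(r / 8)
            .map_or(false, |byte| byte & (1 << (r % 8)) != 0)
    };
    if is_null(row) {
        // Null rows have no slot in the values array.
        return None;
    }
    // The slot is the number of non-null rows that precede this one.
    Some((0..row).filter(|&r| !is_null(r)).count())
}

fn main() {
    // Mask 0b0000_0100 marks row 2 as null; rows 0, 1, 3 map to slots 0, 1, 2.
    let mask = [0b0000_0100u8];
    assert_eq!(value_index(&mask, 1), Some(1));
    assert_eq!(value_index(&mask, 2), None);
    assert_eq!(value_index(&mask, 3), Some(2));
}
```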

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@ -1,9 +0,0 @@
mod entry;
#[allow(unused_imports, clippy::needless_borrow)]
mod entry_generated;
pub use crate::entry::*;
/// Generated Flatbuffers code for replicating and writing data between IOx
/// servers
pub use entry_generated::influxdata::iox::write::v_1 as entry_fb;

View File

@ -33,12 +33,12 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
let deployment_path = root.join("influxdata/iox/deployment/v1");
let idpe_path = root.join("com/github/influxdata/idpe/storage/read");
let management_path = root.join("influxdata/iox/management/v1");
let predicate_path = root.join("influxdata/iox/predicate/v1");
let preserved_catalog_path = root.join("influxdata/iox/preserved_catalog/v1");
let remote_path = root.join("influxdata/iox/remote/v1");
let router_path = root.join("influxdata/iox/router/v1");
let storage_path = root.join("influxdata/platform/storage");
let write_buffer_path = root.join("influxdata/iox/write_buffer/v1");
let write_path = root.join("influxdata/iox/write/v1");
let proto_files = vec![
delete_path.join("service.proto"),
@ -52,9 +52,9 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
management_path.join("server_config.proto"),
management_path.join("service.proto"),
management_path.join("shard.proto"),
predicate_path.join("predicate.proto"),
preserved_catalog_path.join("catalog.proto"),
preserved_catalog_path.join("parquet_metadata.proto"),
preserved_catalog_path.join("predicate.proto"),
root.join("google/longrunning/operations.proto"),
root.join("google/rpc/error_details.proto"),
root.join("google/rpc/status.proto"),
@ -70,7 +70,6 @@ fn generate_grpc_types(root: &Path) -> Result<()> {
storage_path.join("storage_common.proto"),
storage_path.join("storage_common_idpe.proto"),
storage_path.join("test.proto"),
write_path.join("service.proto"),
write_buffer_path.join("write_buffer.proto"),
];

View File

@ -1,5 +1,5 @@
syntax = "proto3";
package influxdata.iox.preserved_catalog.v1;
package influxdata.iox.predicate.v1;
// Represents a parsed predicate for evaluation by the InfluxDB IOx query engine.
message Predicate {
@ -21,18 +21,6 @@ message Predicate {
repeated Expr exprs = 5;
}
// An optional string set.
//
// This is used instead of a `repeated string` to differentiate between "empty set" and "none".
message OptionalStringSet {
repeated string values = 1;
}
// An optional string.
message OptionalString {
string value = 1;
}
// Specifies a continuous range of nanosecond timestamps.
message TimestampRange {
// Start defines the inclusive lower bound.

View File

@ -2,7 +2,7 @@ syntax = "proto3";
package influxdata.iox.preserved_catalog.v1;
import "google/protobuf/timestamp.proto";
import "influxdata/iox/preserved_catalog/v1/predicate.proto";
import "influxdata/iox/predicate/v1/predicate.proto";
// Path for object store interaction.
message Path {
@ -68,7 +68,7 @@ message ChunkAddr {
// Register new delete predicate
message DeletePredicate {
// Predicate to be applied.
Predicate predicate = 1;
influxdata.iox.predicate.v1.Predicate predicate = 1;
// Chunks that are affected by the predicate.
repeated ChunkAddr chunks = 2;

View File

@ -1,42 +0,0 @@
syntax = "proto3";
package influxdata.iox.write.v1;
service WriteService {
// write data into a specific Database
rpc Write(WriteRequest) returns (WriteResponse) {
option deprecated = true;
};
// write an entry into a Database
rpc WriteEntry(WriteEntryRequest) returns (WriteEntryResponse);
}
message WriteRequest {
// name of database into which to write
string db_name = 1;
// data, in [LineProtocol] format
//
// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
string lp_data = 2;
}
message WriteResponse {
// how many lines were parsed and written into the database
uint64 lines_written = 1;
}
message WriteEntryRequest {
// name of database into which to write
string db_name = 1;
// entry, in serialized flatbuffers [Entry] format
//
// [Entry]: https://github.com/influxdata/influxdb_iox/blob/main/generated_types/protos/influxdata/iox/write/v1/entry.fbs
bytes entry = 2;
}
message WriteEntryResponse {
}

View File

@ -62,6 +62,16 @@ pub mod influxdata {
}
}
pub mod predicate {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.predicate.v1.rs"));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.predicate.v1.serde.rs"
));
}
}
pub mod preserved_catalog {
pub mod v1 {
include!(concat!(
@ -95,16 +105,6 @@ pub mod influxdata {
}
}
pub mod write {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/influxdata.iox.write.v1.rs"));
include!(concat!(
env!("OUT_DIR"),
"/influxdata.iox.write.v1.serde.rs"
));
}
}
pub mod write_buffer {
pub mod v1 {
include!(concat!(

View File

@ -17,7 +17,7 @@ impl From<WriteBufferConnection> for write_buffer::WriteBufferConnection {
direction: direction.into(),
r#type: v.type_,
connection: v.connection,
connection_config: v.connection_config,
connection_config: v.connection_config.into_iter().collect(),
creation_config: v.creation_config.map(|x| x.into()),
}
}
@ -36,7 +36,7 @@ impl From<WriteBufferCreationConfig> for write_buffer::WriteBufferCreationConfig
fn from(v: WriteBufferCreationConfig) -> Self {
Self {
n_sequencers: v.n_sequencers.get(),
options: v.options,
options: v.options.into_iter().collect(),
}
}
}
@ -57,7 +57,7 @@ impl TryFrom<write_buffer::WriteBufferConnection> for WriteBufferConnection {
direction: direction.try_into()?,
type_: proto.r#type,
connection: proto.connection,
connection_config: proto.connection_config,
connection_config: proto.connection_config.into_iter().collect(),
creation_config: proto.creation_config.optional("creation_config")?,
})
}
@ -86,7 +86,7 @@ impl TryFrom<write_buffer::WriteBufferCreationConfig> for WriteBufferCreationCon
Ok(Self {
n_sequencers: NonZeroU32::try_from(proto.n_sequencers)
.unwrap_or_else(|_| NonZeroU32::try_from(DEFAULT_N_SEQUENCERS).unwrap()),
options: proto.options,
options: proto.options.into_iter().collect(),
})
}
}
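
The repeated `.into_iter().collect()` calls in this hunk are how the internal `BTreeMap` fields cross the protobuf boundary, since the generated proto types still use `HashMap`; `collect()` works in both directions because both map types implement `FromIterator`. A minimal sketch of the pattern with illustrative signatures (not the generated proto types):

```rust
use std::collections::{BTreeMap, HashMap};

// Internal config -> proto field: BTreeMap into HashMap.
fn to_proto(config: BTreeMap<String, String>) -> HashMap<String, String> {
    config.into_iter().collect()
}

// Proto field -> internal config: HashMap into BTreeMap.
fn from_proto(config: HashMap<String, String>) -> BTreeMap<String, String> {
    config.into_iter().collect()
}

fn main() {
    let mut internal = BTreeMap::new();
    internal.insert("timeout".to_string(), "5s".to_string());
    let proto = to_proto(internal);
    let round_tripped = from_proto(proto);
    assert_eq!(round_tripped.get("timeout").map(String::as_str), Some("5s"));
}
```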

View File

@ -9,7 +9,7 @@ bytes = "1.0"
cache_loader_async = {version = "0.1.2", features = ["ttl-cache"] }
futures = "0.3"
observability_deps = { path = "../observability_deps" }
paste = "1.0.5"
paste = "1.0.6"
prost = "0.8"
prost-types = "0.8"
thiserror = "1.0.30"
@ -20,7 +20,7 @@ tonic = "0.5"
tonic-reflection = "0.2.0"
[build-dependencies]
paste = "1.0.5"
paste = "1.0.6"
prost-build = "0.8"
tonic-build = "0.5"

View File

@ -9,7 +9,7 @@ bytes = "1.0"
futures = { version = "0.3", default-features = false }
reqwest = { version = "0.11", features = ["stream", "json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.67"
serde_json = "1.0.69"
snafu = "0.6.6"
url = "2.1.1"

View File

@ -9,10 +9,9 @@ default-run = "influxdb_iox"
# Workspace dependencies, in alphabetical order
datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" }
entry = { path = "../entry" }
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format", "write_lp"] }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
internal_types = { path = "../internal_types" }
iox_object_store = { path = "../iox_object_store" }
@ -20,7 +19,6 @@ logfmt = { path = "../logfmt" }
metric = { path = "../metric" }
metric_exporters = { path = "../metric_exporters" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_entry = { path = "../mutable_batch_entry" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
mutable_buffer = { path = "../mutable_buffer" }
@ -68,12 +66,12 @@ itertools = "0.10.1"
parquet = "6.0"
pin-project = "1.0"
# used by arrow/datafusion anyway
comfy-table = { version = "4.0", default-features = false }
comfy-table = { version = "5.0", default-features = false }
pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"], optional = true }
prost = "0.8"
rustyline = { version = "9.0", default-features = false }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.67"
serde_json = "1.0.69"
serde_urlencoded = "0.7.0"
snafu = "0.6.9"
structopt = "0.3.25"
@ -96,10 +94,8 @@ heappy = { git = "https://github.com/mkmik/heappy", rev = "20aa466524ac9ce34a4ba
[dev-dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util" }
entry = { path = "../entry" }
influxdb2_client = { path = "../influxdb2_client" }
influxdb_storage_client = { path = "../influxdb_storage_client" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight"] }
test_helpers = { path = "../test_helpers" }
parking_lot = "0.11.2"
write_buffer = { path = "../write_buffer" }

View File

@ -15,6 +15,7 @@ use influxdb_iox_client::{
use std::{fs::File, io::Read, num::NonZeroU64, path::PathBuf, str::FromStr, time::Duration};
use structopt::StructOpt;
use thiserror::Error;
use time::TimeProvider;
use uuid::Uuid;
mod chunk;
@ -303,7 +304,8 @@ pub async fn command(connection: Connection, config: Config) -> Result<()> {
source: e,
})?;
let lines_written = client.write(write.name, lp_data).await?;
let default_time = time::SystemProvider::new().now().timestamp_nanos();
let lines_written = client.write_lp(write.name, lp_data, default_time).await?;
println!("{} Lines OK", lines_written);
}

View File

@ -15,6 +15,7 @@ use crate::{
use router::{resolver::RemoteTemplate, server::RouterServer};
use structopt::StructOpt;
use thiserror::Error;
use time::SystemProvider;
#[derive(Debug, Error)]
pub enum Error {
@ -63,10 +64,15 @@ pub async fn command(config: Config) -> Result<()> {
let common_state = CommonServerState::from_config(config.run_config.clone())?;
let remote_template = config.remote_template.map(RemoteTemplate::new);
let router_server = Arc::new(RouterServer::new(
remote_template,
common_state.trace_collector(),
));
let time_provider = Arc::new(SystemProvider::new());
let router_server = Arc::new(
RouterServer::new(
remote_template,
common_state.trace_collector(),
time_provider,
)
.await,
);
let server_type = Arc::new(RouterServerType::new(router_server, &common_state));
Ok(influxdb_ioxd::main(common_state, server_type).await?)

View File

@ -482,7 +482,7 @@ mod tests {
let mut write = influxdb_iox_client::write::Client::new(conn.clone());
write
.write(db_info.db_name(), "cpu,tag0=foo val=1 100\n")
.write_lp(db_info.db_name(), "cpu,tag0=foo val=1 100\n", 0)
.await
.unwrap();

View File

@ -15,7 +15,6 @@ mod management;
mod operations;
mod remote;
mod storage;
mod write;
mod write_pb;
pub async fn server_grpc<M>(
@ -39,7 +38,6 @@ where
builder,
delete::make_server(Arc::clone(&server_type.server))
);
add_gated_service!(builder, write::make_server(Arc::clone(&server_type.server)));
add_gated_service!(
builder,
write_pb::make_server(Arc::clone(&server_type.server))

View File

@ -1,98 +0,0 @@
use std::convert::TryFrom;
use std::fmt::Debug;
use std::sync::Arc;
use chrono::Utc;
use tonic::Response;
use data_types::DatabaseName;
use generated_types::{
google::{FieldViolation, FieldViolationExt},
influxdata::iox::write::v1::*,
};
use mutable_batch::{DbWrite, WriteMeta};
use observability_deps::tracing::debug;
use server::{connection::ConnectionManager, Server};
use super::error::default_server_error_handler;
/// Implementation of the write service
struct WriteService<M: ConnectionManager> {
server: Arc<Server<M>>,
}
#[tonic::async_trait]
impl<M> write_service_server::WriteService for WriteService<M>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
async fn write(
&self,
request: tonic::Request<WriteRequest>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
let span_ctx = request.extensions().get().cloned();
let request = request.into_inner();
// The time, in nanoseconds since the epoch, to assign to any points that don't
// contain a timestamp
let default_time = Utc::now().timestamp_nanos();
let lp_data = request.lp_data;
let db_name = DatabaseName::new(&request.db_name).field("db_name")?;
let (tables, stats) = mutable_batch_lp::lines_to_batches_stats(&lp_data, default_time)
.map_err(|e| FieldViolation {
field: "lp_data".into(),
description: format!("Invalid Line Protocol: {}", e),
})?;
debug!(%db_name, lp_line_count=stats.num_lines, body_size=lp_data.len(), num_fields=stats.num_fields, "Writing lines into database");
let write = DbWrite::new(tables, WriteMeta::unsequenced(span_ctx));
self.server
.write(&db_name, write)
.await
.map_err(default_server_error_handler)?;
Ok(Response::new(WriteResponse {
lines_written: stats.num_lines as u64,
}))
}
async fn write_entry(
&self,
request: tonic::Request<WriteEntryRequest>,
) -> Result<tonic::Response<WriteEntryResponse>, tonic::Status> {
let span_ctx = request.extensions().get().cloned();
let request = request.into_inner();
let db_name = DatabaseName::new(&request.db_name).field("db_name")?;
if request.entry.is_empty() {
return Err(FieldViolation::required("entry").into());
}
let entry = entry::Entry::try_from(request.entry).field("entry")?;
let tables = mutable_batch_entry::entry_to_batches(&entry).map_err(|e| FieldViolation {
field: "entry".into(),
description: format!("Invalid Entry: {}", e),
})?;
let write = DbWrite::new(tables, WriteMeta::unsequenced(span_ctx));
self.server
.write(&db_name, write)
.await
.map_err(default_server_error_handler)?;
Ok(Response::new(WriteEntryResponse {}))
}
}
/// Instantiate the write service
pub fn make_server<M>(
server: Arc<Server<M>>,
) -> write_service_server::WriteServiceServer<impl write_service_server::WriteService>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
write_service_server::WriteServiceServer::new(WriteService { server })
}

View File

@ -34,7 +34,7 @@ async fn test_delete() {
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");

View File

@ -25,7 +25,7 @@ async fn test_querying_deleted_database() {
.await
.expect("create database failed");
let num_lines_written = write_client
.write(&db_name, "cpu,region=west user=12.3 100")
.write_lp(&db_name, "cpu,region=west user=12.3 100", 0)
.await
.expect("cannot write");
@ -60,7 +60,7 @@ async fn test_querying_deleted_database() {
.await
.expect("create database failed");
let num_lines_written = write_client
.write(&db_name, "cpu,region=east user=99.9 200")
.write_lp(&db_name, "cpu,region=east user=99.9 200", 0)
.await
.expect("cannot write");

View File

@ -32,7 +32,7 @@ async fn test_serving_readiness_database() {
.set_serving_readiness(false)
.await
.unwrap();
let err = write_client.write(name, lp_data).await.unwrap_err();
let err = write_client.write_lp(name, lp_data, 0).await.unwrap_err();
assert!(
matches!(&err, WriteError::ServerError(status) if status.code() == Code::Unavailable),
"{}",
@ -43,7 +43,7 @@ async fn test_serving_readiness_database() {
deployment_client.set_serving_readiness(true).await.unwrap();
assert!(deployment_client.get_serving_readiness().await.unwrap());
write_client.write(name, lp_data).await.unwrap();
write_client.write_lp(name, lp_data, 0).await.unwrap();
}
// TODO(marco): add `test_serving_readiness_router` once we have some other API that we could use for testing

View File

@ -28,7 +28,7 @@ async fn test_mub_freeze() {
.collect();
let num_lines_written = write_client
.write(&db_name, lp_lines.iter().join("\n"))
.write_lp(&db_name, lp_lines.iter().join("\n"), 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 20);
@ -38,7 +38,7 @@ async fn test_mub_freeze() {
assert_eq!(chunks[0].storage, ChunkStorage::ClosedMutableBuffer);
let num_lines_written = write_client
.write(&db_name, lp_lines.iter().take(10).join("\n"))
.write_lp(&db_name, lp_lines.iter().take(10).join("\n"), 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 10);
@ -51,7 +51,7 @@ async fn test_mub_freeze() {
assert_eq!(chunks[1].storage, ChunkStorage::ClosedMutableBuffer);
let num_lines_written = write_client
.write(&db_name, lp_lines.iter().take(10).join("\n"))
.write_lp(&db_name, lp_lines.iter().take(10).join("\n"), 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 10);

View File

@ -51,7 +51,7 @@ async fn test_serving_readiness() {
.expect("create database failed");
mgmt_client.set_serving_readiness(false).await.unwrap();
let err = write_client.write(name, lp_data).await.unwrap_err();
let err = write_client.write_lp(name, lp_data, 0).await.unwrap_err();
assert!(
matches!(&err, WriteError::ServerError(status) if status.code() == Code::Unavailable),
"{}",
@ -59,7 +59,7 @@ async fn test_serving_readiness() {
);
mgmt_client.set_serving_readiness(true).await.unwrap();
write_client.write(name, lp_data).await.unwrap();
write_client.write_lp(name, lp_data, 0).await.unwrap();
}
#[tokio::test]
@ -542,7 +542,7 @@ async fn test_chunk_get() {
];
write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
@ -711,7 +711,7 @@ async fn test_partition_get_error() {
vec!["processes,host=foo running=4i,sleeping=514i,total=519i 1591894310000000000"];
write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
@ -739,7 +739,7 @@ async fn test_list_partition_chunks() {
];
write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
@ -802,7 +802,7 @@ async fn test_new_partition_chunk() {
let lp_lines = vec!["cpu,region=west user=23.2 100"];
write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
@ -826,7 +826,7 @@ async fn test_new_partition_chunk() {
let lp_lines = vec!["cpu,region=west user=21.0 150"];
write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeeded");
@ -906,7 +906,7 @@ async fn test_close_partition_chunk() {
let lp_lines = vec!["cpu,region=west user=23.2 100"];
write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
@ -998,7 +998,7 @@ async fn test_chunk_lifecycle() {
let lp_lines = vec!["cpu,region=west user=23.2 100"];
write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
@ -1305,7 +1305,7 @@ async fn test_unload_read_buffer() {
.collect();
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 1000);
@ -1353,7 +1353,10 @@ async fn test_chunk_access_time() {
.build(fixture.grpc_channel())
.await;
write_client.write(&db_name, "cpu foo=1 10").await.unwrap();
write_client
.write_lp(&db_name, "cpu foo=1 10", 0)
.await
.unwrap();
let to_datetime = |a: Option<&generated_types::google::protobuf::Timestamp>| -> DateTime<Utc> {
a.unwrap().clone().try_into().unwrap()
@ -1381,7 +1384,10 @@ async fn test_chunk_access_time() {
assert_eq!(chunks.len(), 1);
let t2 = to_datetime(chunks[0].time_of_last_access.as_ref());
write_client.write(&db_name, "cpu foo=1 20").await.unwrap();
write_client
.write_lp(&db_name, "cpu foo=1 20", 0)
.await
.unwrap();
let chunks = management_client.list_chunks(&db_name).await.unwrap();
assert_eq!(chunks.len(), 1);
@ -1424,7 +1430,7 @@ async fn test_drop_partition() {
.collect();
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 1000);
@ -1476,7 +1482,7 @@ async fn test_drop_partition_error() {
.collect();
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 1000);
@ -1532,7 +1538,7 @@ async fn test_delete() {
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
@ -1665,7 +1671,7 @@ async fn test_persist_partition() {
.await;
let num_lines_written = write_client
.write(&db_name, "data foo=1 10")
.write_lp(&db_name, "data foo=1 10", 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 1);
@ -1721,7 +1727,7 @@ async fn test_persist_partition_error() {
.await;
let num_lines_written = write_client
.write(&db_name, "data foo=1 10")
.write_lp(&db_name, "data foo=1 10", 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 1);

View File

@ -30,7 +30,7 @@ async fn test_chunk_is_persisted_automatically() {
.collect();
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, 1000);
@ -68,7 +68,7 @@ async fn write_data(
// Writing the same data multiple times should be compacted away
for _ in 0..=num_duplicates {
let num_lines_written = write_client
.write(db_name, payload)
.write_lp(db_name, payload, 0)
.await
.expect("successful write");
assert_eq!(num_lines_written, payload_size as usize);
@ -257,7 +257,7 @@ async fn create_readbuffer_chunk(fixture: &ServerFixture, db_name: &str) -> Chun
let lp_lines = vec!["cpu,region=west user=23.2 100"];
write_client
.write(db_name, lp_lines.join("\n"))
.write_lp(db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");

View File

@ -490,7 +490,7 @@ pub async fn create_two_partition_database(db_name: impl Into<String>, channel:
];
write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("write succeded");
}

View File

@ -22,7 +22,7 @@ async fn test_operations() {
let lp_lines = vec!["cpu,region=west user=23.2 100"];
write_client
.write(&db_name1, lp_lines.join("\n"))
.write_lp(&db_name1, lp_lines.join("\n"), 0)
.await
.expect("write succeded");

View File

@ -8,16 +8,11 @@ use crate::{
use super::scenario::{create_readable_database, rand_name};
use arrow_util::assert_batches_sorted_eq;
use entry::{
lines_to_sharded_entries,
test_helpers::{partitioner, sharder},
};
use generated_types::influxdata::iox::management::v1::database_rules::RoutingRules;
use generated_types::influxdata::iox::management::v1::{
node_group::Node, sink, HashRing, Matcher, MatcherToShard, NodeGroup, RoutingConfig,
ShardConfig, Sink,
};
use influxdb_line_protocol::parse_lines;
use std::collections::HashMap;
#[tokio::test]
@ -46,7 +41,7 @@ async fn test_write() {
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("cannot write");
@ -54,19 +49,19 @@ async fn test_write() {
// ---- test bad data ----
let err = write_client
.write(&db_name, "XXX")
.write_lp(&db_name, "XXX", 0)
.await
.expect_err("expected write to fail");
assert_contains!(
err.to_string(),
r#"Client specified an invalid argument: Violation for field "lp_data": Invalid Line Protocol: error parsing line 1: A generic parsing error occurred"#
r#"Error converting lines: error parsing line 1: A generic parsing error occurred: TakeWhile1"#
);
assert!(matches!(dbg!(err), WriteError::InvalidArgument(_)));
assert!(matches!(dbg!(err), WriteError::LinesConversion(_)));
// ---- test non existent database ----
let err = write_client
.write("Non_existent_database", lp_lines.join("\n"))
.write_lp("Non_existent_database", lp_lines.join("\n"), 0)
.await
.expect_err("expected write to fail");
@ -83,7 +78,10 @@ async fn test_write() {
let lp_lines: Vec<_> = (0..1_000)
.map(|j| format!("flood,tag1={},tag2={} x={},y={} 0", i, j, i, j))
.collect();
if let Err(err) = write_client.write(&db_name, lp_lines.join("\n")).await {
if let Err(err) = write_client
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
{
maybe_err = Some(err);
break;
}
@ -100,48 +98,6 @@ async fn test_write() {
// useless. Don't append any tests after the "hard limit" test!
}
#[tokio::test]
async fn test_write_entry() {
let fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut write_client = fixture.write_client();
let db_name = rand_name();
create_readable_database(&db_name, fixture.grpc_channel()).await;
let lp_data = vec!["cpu bar=1 10", "cpu bar=2 20"].join("\n");
let lines: Vec<_> = parse_lines(&lp_data).map(|l| l.unwrap()).collect();
let default_time = 456;
let sharded_entries =
lines_to_sharded_entries(&lines, default_time, sharder(1).as_ref(), &partitioner(1))
.unwrap();
let entry = sharded_entries.into_iter().next().unwrap().entry;
write_client.write_entry(&db_name, entry).await.unwrap();
let mut query_results = fixture
.flight_client()
.perform_query(&db_name, "select * from cpu")
.await
.unwrap();
let mut batches = Vec::new();
while let Some(data) = query_results.next().await.unwrap() {
batches.push(data);
}
let expected = vec![
"+-----+--------------------------------+",
"| bar | time |",
"+-----+--------------------------------+",
"| 1 | 1970-01-01T00:00:00.000000010Z |",
"| 2 | 1970-01-01T00:00:00.000000020Z |",
"+-----+--------------------------------+",
];
assert_batches_sorted_eq!(&expected, &batches);
}
#[tokio::test]
async fn test_write_routed() {
const TEST_ROUTER_ID: u32 = 1;
@ -288,7 +244,7 @@ async fn test_write_routed() {
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("cannot write");
@ -430,7 +386,7 @@ async fn test_write_routed_errors() {
let mut write_client = router.write_client();
let lp_lines = vec!["cpu bar=1 100", "cpu bar=2 200"];
let err = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.unwrap_err();
@ -495,14 +451,14 @@ async fn test_write_dev_null() {
let mut write_client = router.write_client();
let lp_lines = vec!["cpu bar=1 100", "cpu bar=2 200"];
write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("dev null eats them all");
// Rows not matching that shard won't be send to "/dev/null".
let lp_lines = vec!["mem bar=1 1", "mem bar=2 2"];
let err = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.unwrap_err();
@ -613,7 +569,7 @@ async fn test_write_routed_no_shard() {
for (&ref db_name, &ref line) in &[(&db_name_1, line_1), (&db_name_2, line_2)] {
let num_lines_written = write_client
.write(db_name, line)
.write_lp(db_name, line, 0)
.await
.expect("cannot write");
assert_eq!(num_lines_written, 1);
@ -709,12 +665,12 @@ async fn test_write_schema_mismatch() {
create_readable_database(&db_name, fixture.grpc_channel()).await;
write_client
.write(&db_name, "table field=1i 10")
.write_lp(&db_name, "table field=1i 10", 0)
.await
.expect("cannot write");
let err = write_client
.write(&db_name, "table field=1.1 10")
.write_lp(&db_name, "table field=1.1 10", 0)
.await
.unwrap_err();
assert_contains!(err.to_string(), "Table batch has mismatching schema");

View File

@ -56,7 +56,7 @@ async fn writes_go_to_write_buffer() {
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("cannot write");
assert_eq!(num_lines_written, 3);
@ -106,7 +106,7 @@ async fn writes_go_to_write_buffer_whitelist() {
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("cannot write");
assert_eq!(num_lines_written, 4);
@ -249,7 +249,7 @@ async fn cant_write_to_db_reading_from_write_buffer() {
// Writing to this database is an error; all data comes from write buffer
let mut write_client = server.write_client();
let err = write_client
.write(&db_name, "temp,region=south color=1")
.write_lp(&db_name, "temp,region=south color=1", 0)
.await
.expect_err("expected write to fail");
@ -352,7 +352,7 @@ pub async fn test_cross_write_buffer_tracing() {
"disk,region=east bytes=99i 200",
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.write_lp(&db_name, lp_lines.join("\n"), 0)
.await
.expect("cannot write");
assert_eq!(num_lines_written, 3);

View File

@ -7,6 +7,7 @@ edition = "2021"
[features]
flight = ["arrow", "arrow-flight", "arrow_util", "serde/derive", "serde_json", "futures-util"]
format = ["arrow", "arrow_util"]
write_lp = ["mutable_batch", "mutable_batch_lp", "mutable_batch_pb"]
[dependencies]
# Workspace dependencies, in alphabetical order
@ -19,10 +20,13 @@ arrow = { version = "6.0", optional = true }
arrow-flight = { version = "6.0", optional = true }
bytes = "1.0"
futures-util = { version = "0.3.1", optional = true }
mutable_batch = { path = "../mutable_batch", optional = true }
mutable_batch_lp = { path = "../mutable_batch_lp", optional = true }
mutable_batch_pb = { path = "../mutable_batch_pb", optional = true }
prost = "0.8"
rand = "0.8.3"
serde = "1.0.128"
serde_json = { version = "1.0.67", optional = true }
serde_json = { version = "1.0.69", optional = true }
thiserror = "1.0.30"
tonic = { version = "0.5.0" }
uuid = { version = "0.8", features = ["v4"] }

View File

@ -1,10 +1,11 @@
use bytes::Bytes;
use thiserror::Error;
use generated_types::influxdata::iox::write::v1 as write;
use generated_types::influxdata::iox::write::v1::write_service_client::WriteServiceClient;
use generated_types::influxdata::pbdata::v1 as write_pb;
use generated_types::influxdata::pbdata::v1::write_service_client::WriteServiceClient as PBWriteServiceClient;
/// Re-export generated_types
pub mod generated_types {
pub use generated_types::influxdata::pbdata::v1::*;
}
use self::generated_types::write_service_client::WriteServiceClient;
use crate::connection::Connection;
@ -18,6 +19,11 @@ pub enum WriteError {
/// Server returned an invalid argument error
#[error("Invalid argument: {}: {}", .0.code(), .0.message())]
InvalidArgument(tonic::Status),
#[cfg(feature = "write_lp")]
/// Error converting lines
#[error("Error converting lines: {}", .0)]
LinesConversion(#[from] mutable_batch_lp::Error),
}
/// An IOx Write API client.
@ -39,7 +45,7 @@ pub enum WriteError {
///
/// // write a line of line protocol data
/// client
/// .write("bananas", "cpu,region=west user=23.2 100")
/// .write_lp("bananas", "cpu,region=west user=23.2 100",0)
/// .await
/// .expect("failed to create database");
/// # }
@ -47,66 +53,52 @@ pub enum WriteError {
#[derive(Debug, Clone)]
pub struct Client {
inner: WriteServiceClient<Connection>,
inner_pb: PBWriteServiceClient<Connection>,
}
impl Client {
/// Creates a new client with the provided connection
pub fn new(channel: Connection) -> Self {
Self {
inner: WriteServiceClient::new(channel.clone()),
inner_pb: PBWriteServiceClient::new(channel),
inner: WriteServiceClient::new(channel),
}
}
/// Write the [LineProtocol] formatted data in `lp_data` to
/// database `name`. Returns the number of lines which were parsed
/// and written to the database
/// database `name`. Lines without a timestamp will be assigned `default_time`
///
/// Returns the number of lines which were parsed and written to the database
///
/// [LineProtocol]: https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
pub async fn write(
#[cfg(feature = "write_lp")]
pub async fn write_lp(
&mut self,
db_name: impl Into<String> + Send,
lp_data: impl Into<String> + Send,
db_name: impl AsRef<str> + Send,
lp_data: impl AsRef<str> + Send,
default_time: i64,
) -> Result<usize, WriteError> {
let db_name = db_name.into();
let lp_data = lp_data.into();
let response = self
.inner
.write(write::WriteRequest { db_name, lp_data })
.await
.map_err(Self::map_err)?;
let tables = mutable_batch_lp::lines_to_batches(lp_data.as_ref(), default_time)?;
let meta = mutable_batch::WriteMeta::unsequenced(None);
let write = mutable_batch::DbWrite::new(tables, meta);
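// Each parsed line becomes one row, so the reported line count is the total row count across all table batches.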
let lines = write.tables().map(|(_, table)| table.rows()).sum();
Ok(response.into_inner().lines_written as usize)
}
let database_batch = mutable_batch_pb::encode::encode_write(db_name.as_ref(), &write);
/// Write an [Entry] to database `name`.
///
/// An Entry is a unit of write payload encoded as a Flatbuffer structure
/// and passed as a bytes field in the gRPC protobuf API.
///
/// [Entry]: https://github.com/influxdata/influxdb_iox/blob/main/entry/src/entry.fbs
pub async fn write_entry(
&mut self,
db_name: impl Into<String> + Send,
entry: impl Into<Bytes> + Send,
) -> Result<(), WriteError> {
let db_name = db_name.into();
let entry = entry.into();
self.inner
.write_entry(write::WriteEntryRequest { db_name, entry })
.write(generated_types::WriteRequest {
database_batch: Some(database_batch),
})
.await
.map_err(Self::map_err)?;
Ok(())
Ok(lines)
}
/// Write a protobuf batch.
pub async fn write_pb(
&mut self,
write_request: write_pb::WriteRequest,
write_request: generated_types::WriteRequest,
) -> Result<(), WriteError> {
self.inner_pb
self.inner
.write(write_request)
.await
.map_err(Self::map_err)?;

View File

@ -10,7 +10,7 @@ chrono = "0.4.13"
chrono-english = "0.1.4"
clap = "2.33.1"
futures = "0.3.5"
handlebars = "4.1.2"
handlebars = "4.1.4"
humantime = "2.1.0"
data_types = { path = "../data_types" }
generated_types = { path = "../generated_types" }
@ -19,7 +19,7 @@ influxdb_iox_client = { path = "../influxdb_iox_client" }
itertools = "0.10.0"
rand = { version = "0.8.3", features = ["small_rng"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.67"
serde_json = "1.0.69"
snafu = "0.6.8"
tokio = { version = "1.13", features = ["macros", "rt-multi-thread"] }
toml = "0.5.6"

View File

@ -11,10 +11,8 @@
)]
use clap::{App, Arg};
use generated_types::influxdata::iox::{
management::v1::{self as management, database_rules::*, lifecycle_rules::*, *},
write_buffer::v1::*,
};
use influxdb_iox_client::management::generated_types::*;
use influxdb_iox_client::write::generated_types::*;
#[tokio::main]
async fn main() {
@ -85,9 +83,9 @@ Examples:
..Default::default()
}),
worker_cleanup_avg_sleep: None,
routing_rules: Some(RoutingRules::RoutingConfig(RoutingConfig {
sink: Some(management::Sink {
sink: Some(management::sink::Sink::Kafka(KafkaProducer {})),
routing_rules: Some(database_rules::RoutingRules::RoutingConfig(RoutingConfig {
sink: Some(Sink {
sink: Some(sink::Sink::Kafka(KafkaProducer {})),
}),
})),
write_buffer_connection: Some(WriteBufferConnection {
@ -110,15 +108,17 @@ Examples:
buffer_size_soft: 1024 * 1024 * 1024,
buffer_size_hard: 1024 * 1024 * 1024 * 2,
worker_backoff_millis: 100,
max_active_compactions_cfg: Some(MaxActiveCompactionsCfg::MaxActiveCompactions(1)),
max_active_compactions_cfg: Some(
lifecycle_rules::MaxActiveCompactionsCfg::MaxActiveCompactions(1),
),
persist: true,
persist_row_threshold: 10 * 1000 * 1000,
..Default::default()
}),
worker_cleanup_avg_sleep: None,
routing_rules: Some(RoutingRules::RoutingConfig(RoutingConfig {
sink: Some(management::Sink {
sink: Some(management::sink::Sink::Kafka(KafkaProducer {})),
routing_rules: Some(database_rules::RoutingRules::RoutingConfig(RoutingConfig {
sink: Some(Sink {
sink: Some(sink::Sink::Kafka(KafkaProducer {})),
}),
})),
write_buffer_connection: Some(WriteBufferConnection {
@ -144,16 +144,10 @@ Examples:
// Write a few points
let mut write_client = influxdb_iox_client::write::Client::new(writer_grpc_channel);
let lp_lines = [
"write_test,region=west user=23.2 100",
"write_test,region=west user=21.0 150",
"write_test,region=east bytes=99i 200",
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
write_client
.write_pb(test_write(&db_name))
.await
.expect("cannot write");
assert_eq!(num_lines_written, 3);
// Create the reader db
let reader_grpc_bind_addr = format!("http://{}", reader);
@ -170,3 +164,62 @@ Examples:
println!("Created database {}", db_name);
}
/// 3 rows of test data
///
/// "write_test,region=west user=23.2 100"
// "write_test,region=west user=21.0 150"
// "write_test,region=east bytes=99i 200"
fn test_write(db_name: &str) -> WriteRequest {
WriteRequest {
database_batch: Some(DatabaseBatch {
database_name: db_name.to_string(),
table_batches: vec![TableBatch {
table_name: "write_test".to_string(),
columns: vec![
Column {
column_name: "time".to_string(),
semantic_type: column::SemanticType::Time as _,
values: Some(column::Values {
i64_values: vec![100, 150, 200],
..Default::default()
}),
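// An empty null mask means no nulls: every row has a time value.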
null_mask: vec![],
},
Column {
column_name: "region".to_string(),
semantic_type: column::SemanticType::Tag as _,
values: Some(column::Values {
string_values: vec![
"west".to_string(),
"west".to_string(),
"east".to_string(),
],
..Default::default()
}),
null_mask: vec![],
},
Column {
column_name: "user".to_string(),
semantic_type: column::SemanticType::Field as _,
values: Some(column::Values {
f64_values: vec![23.2, 21.0],
..Default::default()
}),
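// Bit 2 set: the third row ("east") has no `user` value, so only two f64 values are supplied.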
null_mask: vec![0b00000100],
},
Column {
column_name: "bytes".to_string(),
semantic_type: column::SemanticType::Field as _,
values: Some(column::Values {
i64_values: vec![99],
..Default::default()
}),
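// Bits 0 and 1 set: the first two rows have no `bytes` value; only the third row carries 99.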
null_mask: vec![0b00000011],
},
],
row_count: 3,
}],
}),
}
}
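// Hypothetical alternative (a sketch only, assuming the `mutable_batch`, `mutable_batch_lp` and
// `mutable_batch_pb` crates were available as dependencies of this binary): the same request
// could be built from line protocol instead of hand-assembled protobuf columns.
//
// fn test_write_from_lp(db_name: &str) -> WriteRequest {
//     let lp = "write_test,region=west user=23.2 100\n\
//               write_test,region=west user=21.0 150\n\
//               write_test,region=east bytes=99i 200";
//     let tables = mutable_batch_lp::lines_to_batches(lp, 0).unwrap();
//     let write = mutable_batch::DbWrite::new(tables, mutable_batch::WriteMeta::unsequenced(None));
//     WriteRequest {
//         database_batch: Some(mutable_batch_pb::encode::encode_write(db_name, &write)),
//     }
// }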

View File

@ -1,17 +0,0 @@
[package]
name = "mutable_batch_entry"
version = "0.1.0"
edition = "2018"
description = "Conversion logic for entry flatbuffer <-> MutableBatch"
[dependencies]
entry = { path = "../entry" }
hashbrown = "0.11"
mutable_batch = { path = "../mutable_batch" }
schema = { path = "../schema" }
snafu = "0.6"
[dev-dependencies]
arrow_util = { path = "../arrow_util" }
data_types = { path = "../data_types" }
flatbuffers = "2"

View File

@ -1,375 +0,0 @@
//! Code to convert entry to [`MutableBatch`]
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_copy_implementations,
missing_debug_implementations,
missing_docs,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::use_self,
clippy::clone_on_ref_ptr
)]
use entry::{Column as EntryColumn, Entry, SequencedEntry, TableBatch};
use hashbrown::{HashMap, HashSet};
use mutable_batch::{
payload::{DbWrite, WriteMeta},
writer::Writer,
MutableBatch,
};
use schema::{InfluxColumnType, InfluxFieldType, TIME_COLUMN_NAME};
use snafu::{ensure, ResultExt, Snafu};
/// Error type for entry conversion
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
pub enum Error {
#[snafu(display(
"Invalid null mask, expected to be {} bytes but was {}",
expected_bytes,
actual_bytes
))]
InvalidNullMask {
expected_bytes: usize,
actual_bytes: usize,
},
#[snafu(display("duplicate column name: {}", column))]
DuplicateColumnName { column: String },
#[snafu(display("table batch must contain time column"))]
MissingTime,
#[snafu(display("time column must not contain nulls"))]
NullTime,
#[snafu(display("entry contained empty table batch"))]
EmptyTableBatch,
#[snafu(display("error writing column {}: {}", column, source))]
Write {
source: mutable_batch::writer::Error,
column: String,
},
}
/// Result type for entry conversion
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Converts a [`SequencedEntry`] to a [`DbWrite`]
pub fn sequenced_entry_to_write(entry: &SequencedEntry) -> Result<DbWrite> {
let sequence = entry.sequence().cloned().expect("entry must be sequenced");
let timestamp = entry
.producer_wallclock_timestamp()
.expect("entry must be timestamped");
let meta = WriteMeta::sequenced(
sequence,
timestamp,
entry.span_context().cloned(),
entry.entry().data().len(),
);
let tables = entry_to_batches(entry.entry())?;
Ok(DbWrite::new(tables, meta))
}
/// Converts an [`Entry`] to a collection of [`MutableBatch`] keyed by table name
///
/// Note: this flattens partitioning
pub fn entry_to_batches(entry: &Entry) -> Result<HashMap<String, MutableBatch>> {
let mut batches = HashMap::new();
for partition_write in entry.partition_writes_iter() {
for table_batch in partition_write.table_batches_iter() {
let table_name = table_batch.name();
let (_, batch) = batches
.raw_entry_mut()
.from_key(table_name)
.or_insert_with(|| (table_name.to_string(), MutableBatch::new()));
write_table_batch(batch, table_batch)?;
}
}
Ok(batches)
}
/// Writes the provided [`TableBatch`] to a [`MutableBatch`] on error any changes made
/// to `batch` are reverted
fn write_table_batch(batch: &mut MutableBatch, table_batch: TableBatch<'_>) -> Result<()> {
let row_count = table_batch.row_count();
ensure!(row_count != 0, EmptyTableBatch);
let columns = table_batch.columns();
let mut column_names = HashSet::with_capacity(columns.len());
for column in &columns {
ensure!(
column_names.insert(column.name()),
DuplicateColumnName {
column: column.name()
}
);
}
// Batch must contain a time column
ensure!(column_names.contains(TIME_COLUMN_NAME), MissingTime);
let mut writer = Writer::new(batch, row_count);
for column in &columns {
let valid_mask = construct_valid_mask(column)?;
let valid_mask = valid_mask.as_deref();
let inner = column.inner();
match column.influx_type() {
InfluxColumnType::Field(InfluxFieldType::Float) => {
let values = inner
.values_as_f64values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_f64(column.name(), valid_mask, values)
}
InfluxColumnType::Field(InfluxFieldType::Integer) => {
let values = inner
.values_as_i64values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_i64(column.name(), valid_mask, values)
}
InfluxColumnType::Field(InfluxFieldType::UInteger) => {
let values = inner
.values_as_u64values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_u64(column.name(), valid_mask, values)
}
InfluxColumnType::Tag => {
let values = inner
.values_as_string_values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_tag(column.name(), valid_mask, values)
}
InfluxColumnType::Field(InfluxFieldType::String) => {
let values = inner
.values_as_string_values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_string(column.name(), valid_mask, values)
}
InfluxColumnType::Field(InfluxFieldType::Boolean) => {
let values = inner
.values_as_bool_values()
.unwrap()
.values()
.into_iter()
.flatten()
.cloned();
writer.write_bool(column.name(), valid_mask, values)
}
InfluxColumnType::Timestamp => {
if valid_mask.is_some() {
return Err(Error::NullTime);
}
let values = inner
.values_as_i64values()
.unwrap()
.values()
.into_iter()
.flatten();
writer.write_time(column.name(), values)
}
InfluxColumnType::IOx(_) => unimplemented!(),
}
.context(Write {
column: column.name(),
})?;
}
writer.commit();
Ok(())
}
/// Construct a validity mask from the given column's null mask
fn construct_valid_mask(column: &EntryColumn<'_>) -> Result<Option<Vec<u8>>> {
let buf_len = (column.row_count + 7) >> 3;
let data = match column.inner().null_mask() {
Some(data) => data,
None => return Ok(None),
};
if data.iter().all(|x| *x == 0) {
return Ok(None);
}
ensure!(
data.len() == buf_len,
InvalidNullMask {
expected_bytes: buf_len,
actual_bytes: data.len()
}
);
Ok(Some(
data.iter()
.map(|x| {
// Currently the bit mask is backwards
!x.reverse_bits()
})
.collect(),
))
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_util::assert_batches_eq;
use data_types::database_rules::{PartitionTemplate, TemplatePart};
use entry::test_helpers::{lp_to_entries, lp_to_entry};
use entry::Entry;
use schema::selection::Selection;
fn first_batch(entry: &Entry) -> TableBatch<'_> {
entry.table_batches().next().unwrap().1
}
#[test]
fn test_basic() {
let mut batch = MutableBatch::new();
let lp = "foo,t1=asdf iv=1i,uv=1u,fv=1.0,bv=true,sv=\"hi\" 1";
let entry = lp_to_entry(lp);
write_table_batch(&mut batch, first_batch(&entry)).unwrap();
let expected = &[
"+------+----+----+----+------+--------------------------------+----+",
"| bv | fv | iv | sv | t1 | time | uv |",
"+------+----+----+----+------+--------------------------------+----+",
"| true | 1 | 1 | hi | asdf | 1970-01-01T00:00:00.000000001Z | 1 |",
"+------+----+----+----+------+--------------------------------+----+",
];
assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]);
let lp = "foo t1=\"string\" 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column t1: Unable to insert iox::column_type::field::string type into a column of iox::column_type::tag");
let lp = "foo iv=1u 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column iv: Unable to insert iox::column_type::field::uinteger type into a column of iox::column_type::field::integer");
let lp = "foo fv=1i 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column fv: Unable to insert iox::column_type::field::integer type into a column of iox::column_type::field::float");
let lp = "foo bv=1 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column bv: Unable to insert iox::column_type::field::float type into a column of iox::column_type::field::boolean");
let lp = "foo sv=true 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column sv: Unable to insert iox::column_type::field::boolean type into a column of iox::column_type::field::string");
let lp = "foo,sv=\"bar\" f=3i 1";
let entry = lp_to_entry(lp);
let err = write_table_batch(&mut batch, first_batch(&entry)).unwrap_err();
assert_eq!(err.to_string(), "error writing column sv: Unable to insert iox::column_type::tag type into a column of iox::column_type::field::string");
assert_batches_eq!(expected, &[batch.to_arrow(Selection::All).unwrap()]);
let lp = r#"
foo,t1=v1 fv=3.0,bv=false 2
foo,t1=v3 fv=3.0,bv=false 2
foo,t2=v6 bv=true,iv=3i 3
"#;
let entry = lp_to_entry(lp);
write_table_batch(&mut batch, first_batch(&entry)).unwrap();
assert_batches_eq!(
&[
"+-------+----+----+----+------+----+--------------------------------+----+",
"| bv | fv | iv | sv | t1 | t2 | time | uv |",
"+-------+----+----+----+------+----+--------------------------------+----+",
"| true | 1 | 1 | hi | asdf | | 1970-01-01T00:00:00.000000001Z | 1 |",
"| false | 3 | | | v1 | | 1970-01-01T00:00:00.000000002Z | |",
"| false | 3 | | | v3 | | 1970-01-01T00:00:00.000000002Z | |",
"| true | | 3 | | | v6 | 1970-01-01T00:00:00.000000003Z | |",
"+-------+----+----+----+------+----+--------------------------------+----+",
],
&[batch.to_arrow(Selection::All).unwrap()]
);
}
#[test]
fn test_entry_to_batches() {
let lp = r#"
cpu,part=a,tag1=v1 fv=3.0,sv="32" 1
cpu,part=a,tag1=v1 fv=3.0 2
cpu,part=a,tag2=v1 iv=3 1
cpu,part=a,tag2=v1 iv=3 2
mem,part=a,tag1=v2 v=2 1
mem,part=a,tag1=v2 v=2,b=true 2
mem,part=b,tag1=v2 v=2 2
"#;
let entries = lp_to_entries(
lp,
&PartitionTemplate {
parts: vec![TemplatePart::Column("part".to_string())],
},
);
assert_eq!(entries.len(), 1);
let entry = entries.into_iter().next().unwrap();
assert_eq!(entry.table_batches().count(), 3);
// Should flatten partitioning
let batches = entry_to_batches(&entry).unwrap();
assert_eq!(batches.len(), 2);
assert_batches_eq!(
&[
"+----+----+------+----+------+------+--------------------------------+",
"| fv | iv | part | sv | tag1 | tag2 | time |",
"+----+----+------+----+------+------+--------------------------------+",
"| 3 | | a | 32 | v1 | | 1970-01-01T00:00:00.000000001Z |",
"| 3 | | a | | v1 | | 1970-01-01T00:00:00.000000002Z |",
"| | 3 | a | | | v1 | 1970-01-01T00:00:00.000000001Z |",
"| | 3 | a | | | v1 | 1970-01-01T00:00:00.000000002Z |",
"+----+----+------+----+------+------+--------------------------------+",
],
&[batches["cpu"].to_arrow(Selection::All).unwrap()]
);
assert_batches_eq!(
&[
"+------+------+------+--------------------------------+---+",
"| b | part | tag1 | time | v |",
"+------+------+------+--------------------------------+---+",
"| | a | v2 | 1970-01-01T00:00:00.000000001Z | 2 |",
"| true | a | v2 | 1970-01-01T00:00:00.000000002Z | 2 |",
"| | b | v2 | 1970-01-01T00:00:00.000000002Z | 2 |",
"+------+------+------+--------------------------------+---+",
],
&[batches["mem"].to_arrow(Selection::All).unwrap()]
);
}
}

View File

@ -5,11 +5,9 @@ edition = "2021"
description = "MutableBatch integration tests and benchmarks"
[dependencies]
entry = { path = "../entry" }
flate2 = "1.0"
generated_types = { path = "../generated_types" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_entry = { path = "../mutable_batch_entry" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
prost = "0.8"
@ -22,10 +20,6 @@ criterion = "0.3"
name = "write_lp"
harness = false
[[bench]]
name = "write_entry"
harness = false
[[bench]]
name = "write_pb"

View File

@ -1,38 +0,0 @@
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use entry::Entry;
use mutable_batch_entry::entry_to_batches;
use mutable_batch_tests::benchmark_lp;
fn generate_entry_bytes() -> Vec<(String, (usize, Bytes))> {
benchmark_lp()
.into_iter()
.map(|(bench, lp)| {
(
bench,
(lp.len(), entry::test_helpers::lp_to_entry(&lp).into()),
)
})
.collect()
}
pub fn write_entry(c: &mut Criterion) {
let mut group = c.benchmark_group("write_entry");
for (bench, (lp_bytes, entry_bytes)) in generate_entry_bytes() {
group.throughput(Throughput::Bytes(lp_bytes as u64));
group.bench_function(BenchmarkId::from_parameter(bench), |b| {
b.iter(|| {
let entry: Entry = entry_bytes.clone().try_into().unwrap();
let batches = entry_to_batches(&entry).unwrap();
assert_eq!(batches.len(), 1);
});
});
}
group.finish();
}
criterion_group!(benches, write_entry);
criterion_main!(benches);

View File

@ -7,7 +7,6 @@ edition = "2021"
[dependencies] # In alphabetical order
arrow = { version = "6.0", features = ["prettyprint"] }
data_types = { path = "../data_types" }
entry = { path = "../entry" }
schema = { path = "../schema" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }

View File

@ -14,7 +14,7 @@ schema = { path = "../schema" }
observability_deps = { path = "../observability_deps" }
ordered-float = "2"
regex = "1"
serde_json = "1.0.67"
serde_json = "1.0.69"
snafu = "0.6.9"
sqlparser = "0.12.0"

View File

@ -3,7 +3,7 @@ use std::{
ops::Deref,
};
use generated_types::influxdata::iox::preserved_catalog::v1 as proto;
use generated_types::influxdata::iox::predicate::v1 as proto;
use ordered_float::OrderedFloat;
use snafu::{OptionExt, ResultExt, Snafu};

View File

@ -9,7 +9,7 @@
use std::convert::TryInto;
use data_types::timestamp::TimestampRange;
use generated_types::influxdata::iox::preserved_catalog::v1 as proto;
use generated_types::influxdata::iox::predicate::v1 as proto;
use snafu::{ResultExt, Snafu};
use crate::{delete_expr::DeleteExpr, delete_predicate::DeletePredicate};

View File

@ -1,26 +0,0 @@
#!/bin/bash -eu
# Change to the query_tests crate directory, where this script is located
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
pushd $DIR
echo "Regenerating query_tests..."
(cd generate && cargo run)
echo "Checking for uncommitted changes..."
if ! git diff --quiet HEAD --; then
echo "git diff found:"
git diff HEAD
echo "************************************************************"
echo "* Found uncommitted changes to generated flatbuffers code! *"
echo "* Please do:"
echo "* cd query_tests/generate"
echo "* cargo run"
echo "* to regenerate the query_tests code and check it in! *"
echo "************************************************************"
exit 1
else
echo "No uncommitted changes; everything is awesome."
fi

View File

@ -11,10 +11,11 @@ use crate::scenarios::{
use super::scenarios::*;
use arrow::record_batch::RecordBatch;
use arrow_util::assert_batches_sorted_eq;
use datafusion::error::DataFusionError;
use query::{exec::ExecutionContextProvider, frontend::sql::SqlQueryPlanner};
use test_helpers::assert_contains;
/// Runs table_names(predicate) and compares it to the expected
/// output.
/// Runs the query in `sql` and compares it to the expected output.
async fn run_sql_test_case<D>(db_setup: D, sql: &str, expected_lines: &[&str])
where
D: DbSetup,
@ -42,6 +43,38 @@ where
}
}
/// Runs the query in `sql` which is expected to error, and ensures
/// the output contains the expected message.
async fn run_sql_error_test_case<D>(db_setup: D, sql: &str, expected_error: &str)
where
D: DbSetup,
{
test_helpers::maybe_start_logging();
let sql = sql.to_string();
for scenario in db_setup.make().await {
let DbScenario {
scenario_name, db, ..
} = scenario;
println!("Running scenario '{}'", scenario_name);
println!("SQL: '{:#?}'", sql);
let planner = SqlQueryPlanner::default();
let ctx = db.new_query_context(None);
let result: Result<(), DataFusionError> = async {
let physical_plan = planner.query(&sql, &ctx).await?;
ctx.collect(physical_plan).await?;
Ok(())
}
.await;
let err = result.expect_err("Expected failure to plan");
assert_contains!(err.to_string(), expected_error);
}
}
#[tokio::test]
async fn sql_select_with_schema_merge() {
let expected = vec![
@ -920,4 +953,16 @@ async fn sql_select_time_max_foo_gb_time() {
.await;
}
#[tokio::test]
async fn sql_create_external_table() {
let expected_error = "Unsupported logical plan: CreateExternalTable";
// Datafusion supports CREATE EXTERNAL TABLE, but IOx should not (as that would be a security hole)
run_sql_error_test_case(
scenarios::delete::NoDeleteOneChunk {},
"CREATE EXTERNAL TABLE foo(ts TIMESTAMP) STORED AS CSV LOCATION '/tmp/foo.csv';",
expected_error,
)
.await;
}
// --------------------------------------------------------

View File

@ -4,15 +4,22 @@ version = "0.1.0"
edition = "2021"
[dependencies]
async-trait = "0.1"
cache_loader_async = "0.1.2"
data_types = { path = "../data_types" }
hashbrown = "0.11"
influxdb_iox_client = { path = "../influxdb_iox_client" }
metric = { path = "../metric" }
mutable_batch = { path = "../mutable_batch" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
observability_deps = { path = "../observability_deps" }
trace = { path = "../trace" }
parking_lot = "0.11.2"
snafu = "0.6"
time = { path = "../time" }
write_buffer = { path = "../write_buffer" }
[dev-dependencies]
mutable_batch_lp = { path = "../mutable_batch_lp" }
regex = "1.4"
time = { path = "../time" }
tokio = { version = "1.13", features = ["macros", "time"] }

View File

@ -0,0 +1,152 @@
use std::sync::Arc;
use cache_loader_async::cache_api::LoadingCache;
use data_types::write_buffer::WriteBufferConnection;
use observability_deps::tracing::debug;
use write_buffer::{
config::WriteBufferConfigFactory,
core::{WriteBufferError, WriteBufferWriting},
};
use crate::grpc_client::GrpcClient;
type KeyWriteBufferProducer = (String, WriteBufferConnection);
pub type ConnectionError = Arc<dyn std::error::Error + Send + Sync + 'static>;
/// Stupid hack to fit the `Box<dyn ...>` in `WriteBufferError` into an `Arc`
struct EWrapper(WriteBufferError);
impl std::fmt::Debug for EWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::fmt::Display for EWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::error::Error for EWrapper {}
/// Connection pool for the entire routing server.
///
/// This avoids:
/// 1. That every [`Router`](crate::router::Router) uses its own connections
/// 2. That we open too many connections in total.
#[derive(Debug)]
pub struct ConnectionPool {
grpc_clients: LoadingCache<String, Arc<dyn GrpcClient>, ConnectionError>,
write_buffer_producers:
LoadingCache<KeyWriteBufferProducer, Arc<dyn WriteBufferWriting>, ConnectionError>,
}
impl ConnectionPool {
/// Create new connection pool.
///
/// If `use_mock_grpc` is set only mock gRPC clients are created.
pub async fn new(use_mock_grpc: bool, wb_factory: WriteBufferConfigFactory) -> Self {
// Note: this function is async even though it does not contain any `.await` calls, because `LoadingCache::new`
// requires a running tokio runtime; making the constructor async enforces that requirement rather than relying on documentation alone.
let grpc_clients = if use_mock_grpc {
LoadingCache::new(|_connection_string: String| async move {
use crate::grpc_client::MockClient;
Ok(Arc::new(MockClient::default()) as Arc<dyn GrpcClient>)
})
} else {
LoadingCache::new(|connection_string: String| async move {
use crate::grpc_client::RealClient;
use influxdb_iox_client::connection::Builder;
let connection = Builder::default()
.build(&connection_string)
.await
.map_err(|e| Arc::new(e) as ConnectionError)?;
Ok(Arc::new(RealClient::new(connection)) as Arc<dyn GrpcClient>)
})
};
let wb_factory = Arc::new(wb_factory);
let write_buffer_producers = LoadingCache::new(move |key: KeyWriteBufferProducer| {
let wb_factory = Arc::clone(&wb_factory);
async move {
wb_factory
.new_config_write(&key.0, &key.1)
.await
.map_err(|e| Arc::new(EWrapper(e)) as ConnectionError)
}
});
Self {
grpc_clients,
write_buffer_producers,
}
}
/// Create new connection factory for testing purposes.
#[cfg(test)]
pub async fn new_testing() -> Self {
use time::SystemProvider;
let time_provider = Arc::new(SystemProvider::new());
Self::new(true, WriteBufferConfigFactory::new(time_provider)).await
}
/// Get gRPC client given a connection string.
pub async fn grpc_client(
&self,
connection_string: &str,
) -> Result<Arc<dyn GrpcClient>, ConnectionError> {
let res = self
.grpc_clients
.get_with_meta(connection_string.to_string())
.await
.map_err(|e| Arc::new(e) as ConnectionError)?;
debug!(was_cached=%res.cached, %connection_string, "getting IOx client");
Ok(res.result)
}
/// Get write buffer producer given a DB name and config.
pub async fn write_buffer_producer(
&self,
db_name: &str,
cfg: &WriteBufferConnection,
) -> Result<Arc<dyn WriteBufferWriting>, ConnectionError> {
let res = self
.write_buffer_producers
.get_with_meta((db_name.to_string(), cfg.clone()))
.await
.map_err(|e| Arc::new(e) as ConnectionError)?;
debug!(was_cached=%res.cached, %db_name, "getting write buffer");
Ok(res.result)
}
}
#[cfg(test)]
mod tests {
use time::{SystemProvider, TimeProvider};
use crate::grpc_client::MockClient;
use super::*;
#[tokio::test]
async fn test_grpc_mocking() {
let time_provider: Arc<dyn TimeProvider> = Arc::new(SystemProvider::new());
let pool1 = ConnectionPool::new(
false,
WriteBufferConfigFactory::new(Arc::clone(&time_provider)),
)
.await;
// connection will fail
pool1.grpc_client("foo").await.unwrap_err();
let pool2 = ConnectionPool::new(true, WriteBufferConfigFactory::new(time_provider)).await;
let client2 = pool2.grpc_client("foo").await.unwrap();
client2.as_any().downcast_ref::<MockClient>().unwrap();
}
}

226
router/src/grpc_client.rs Normal file
View File

@ -0,0 +1,226 @@
//! gRPC client abstraction.
//!
//! This abstraction was created for easier testing.
use std::{
any::Any,
sync::atomic::{AtomicBool, Ordering},
};
use async_trait::async_trait;
use mutable_batch::DbWrite;
use parking_lot::RwLock;
/// Generic write error.
pub type WriteError = Box<dyn std::error::Error + Send + Sync>;
/// An abstract IOx gRPC client.
#[async_trait]
pub trait GrpcClient: Sync + Send + std::fmt::Debug + 'static {
/// Write data to the given database.
async fn write(&self, db_name: &str, write: &DbWrite) -> Result<(), WriteError>;
/// Cast client to [`Any`], useful for downcasting.
fn as_any(&self) -> &dyn Any;
}
/// A real, network-driven gRPC client.
#[derive(Debug)]
pub struct RealClient {
/// Write client for IOx.
write_client: influxdb_iox_client::write::Client,
}
impl RealClient {
/// Create new client from established connection.
pub fn new(connection: influxdb_iox_client::connection::Connection) -> Self {
Self {
write_client: influxdb_iox_client::write::Client::new(connection),
}
}
}
#[async_trait]
impl GrpcClient for RealClient {
async fn write(&self, db_name: &str, write: &DbWrite) -> Result<(), WriteError> {
use influxdb_iox_client::write::generated_types::WriteRequest;
use mutable_batch_pb::encode::encode_write;
let write_request = WriteRequest {
database_batch: Some(encode_write(db_name, write)),
};
// cheap, see https://docs.rs/tonic/0.4.2/tonic/client/index.html#concurrent-usage
let mut client = self.write_client.clone();
client
.write_pb(write_request)
.await
.map_err(|e| Box::new(e) as _)
}
fn as_any(&self) -> &dyn Any {
self
}
}
/// Mock client for testing.
#[derive(Debug, Default)]
pub struct MockClient {
/// All writes recorded by this client.
writes: RwLock<Vec<(String, DbWrite)>>,
/// Poison pill.
///
/// If set to `true` all writes will fail.
poisoned: AtomicBool,
}
impl MockClient {
/// Take poison pill.
///
/// All subsequent writes will fail.
pub fn poison(&self) {
self.poisoned.store(true, Ordering::SeqCst)
}
/// Get a copy of all recorded writes.
pub fn writes(&self) -> Vec<(String, DbWrite)> {
self.writes.read().clone()
}
/// Assert that writes are as expected.
pub fn assert_writes(&self, expected: &[(String, DbWrite)]) {
use mutable_batch::test_util::assert_writes_eq;
let actual = self.writes();
assert_eq!(
actual.len(),
expected.len(),
"number of writes differ ({} VS {})",
actual.len(),
expected.len()
);
for ((actual_db, actual_write), (expected_db, expected_write)) in
actual.iter().zip(expected)
{
assert_eq!(
actual_db, expected_db,
"database names differ (\"{}\" VS \"{}\")",
actual_db, expected_db
);
assert_writes_eq(actual_write, expected_write);
}
}
}
#[async_trait]
impl GrpcClient for MockClient {
async fn write(&self, db_name: &str, write: &DbWrite) -> Result<(), WriteError> {
if self.poisoned.load(Ordering::SeqCst) {
return Err("poisened".to_string().into());
}
self.writes
.write()
.push((db_name.to_string(), write.clone()));
Ok(())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use mutable_batch_lp::lines_to_batches;
use super::*;
#[tokio::test]
async fn test_mock() {
let client = MockClient::default();
let write1 = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
let write2 = DbWrite::new(
lines_to_batches("foo x=2 2", 0).unwrap(),
Default::default(),
);
let write3 = DbWrite::new(
lines_to_batches("foo x=3 3", 0).unwrap(),
Default::default(),
);
client.write("db1", &write1).await.unwrap();
client.write("db2", &write1).await.unwrap();
client.write("db1", &write2).await.unwrap();
let expected_writes = vec![
(String::from("db1"), write1.clone()),
(String::from("db2"), write1.clone()),
(String::from("db1"), write2.clone()),
];
client.assert_writes(&expected_writes);
client.poison();
client.write("db1", &write3).await.unwrap_err();
client.assert_writes(&expected_writes);
}
#[tokio::test]
#[should_panic(expected = "number of writes differ (1 VS 0)")]
async fn test_assert_writes_fail_count() {
let client = MockClient::default();
let write1 = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
client.write("db1", &write1).await.unwrap();
let expected_writes = [];
client.assert_writes(&expected_writes);
}
#[tokio::test]
#[should_panic(expected = "database names differ (\"db1\" VS \"db2\")")]
async fn test_assert_writes_fail_db_name() {
let client = MockClient::default();
let write = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
client.write("db1", &write).await.unwrap();
let expected_writes = vec![(String::from("db2"), write)];
client.assert_writes(&expected_writes);
}
#[tokio::test]
#[should_panic(expected = "batches for table \"foo\" differ")]
async fn test_assert_writes_fail_batch() {
let client = MockClient::default();
let write1 = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
let write2 = DbWrite::new(
lines_to_batches("foo x=2 2", 0).unwrap(),
Default::default(),
);
client.write("db1", &write1).await.unwrap();
let expected_writes = vec![(String::from("db1"), write2)];
client.assert_writes(&expected_writes);
}
}

View File

@ -8,7 +8,10 @@
clippy::clone_on_ref_ptr
)]
pub mod connection_pool;
pub mod grpc_client;
pub mod resolver;
pub mod router;
pub mod server;
pub mod sharder;
pub mod write_sink;

View File

@ -1,16 +1,91 @@
use data_types::router::Router as RouterConfig;
use std::{
collections::{BTreeMap, HashMap},
fmt::Write,
sync::Arc,
};
use data_types::router::{Router as RouterConfig, ShardId};
use mutable_batch::DbWrite;
use snafu::{ResultExt, Snafu};
use crate::{
connection_pool::ConnectionPool, resolver::Resolver, sharder::shard_write,
write_sink::WriteSinkSet,
};
#[derive(Debug, Snafu)]
pub enum WriteErrorShard {
#[snafu(display("Did not find sink set for shard ID {}", shard_id.get()))]
NoSinkSetFound { shard_id: ShardId },
#[snafu(display("Write to sink set failed: {}", source))]
SinkSetFailure { source: crate::write_sink::Error },
}
#[derive(Debug, Snafu)]
pub enum WriteError {
#[snafu(display("One or more writes failed: {}", fmt_write_errors(errors)))]
MultiWriteFailure {
errors: BTreeMap<ShardId, WriteErrorShard>,
},
}
fn fmt_write_errors(errors: &BTreeMap<ShardId, WriteErrorShard>) -> String {
const MAX_ERRORS: usize = 2;
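// Render at most MAX_ERRORS entries in the form `ShardId(n) => "message"`; any remaining errors are elided with "...".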
let mut out = String::new();
for (shard_id, error) in errors.iter().take(MAX_ERRORS) {
if !out.is_empty() {
write!(&mut out, ", ").expect("write to string failed?!");
}
write!(&mut out, "{} => \"{}\"", shard_id, error).expect("write to string failed?!");
}
if errors.len() > MAX_ERRORS {
write!(&mut out, "...").expect("write to string failed?!");
}
out
}
/// Router for a single database.
#[derive(Debug)]
pub struct Router {
/// Router config.
config: RouterConfig,
/// We use a [`HashMap`] here for `O(1)` lookups. Do not rely on the iteration order.
write_sink_sets: HashMap<ShardId, WriteSinkSet>,
}
impl Router {
/// Create new router from config.
pub fn new(config: RouterConfig) -> Self {
Self { config }
pub fn new(
config: RouterConfig,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
) -> Self {
let write_sink_sets = config
.write_sinks
.iter()
.map(|(shard_id, set_config)| {
(
*shard_id,
WriteSinkSet::new(
&config.name,
set_config.clone(),
Arc::clone(&resolver),
Arc::clone(&connection_pool),
),
)
})
.collect();
Self {
config,
write_sink_sets,
}
}
/// Router config.
@ -24,22 +99,252 @@ impl Router {
pub fn name(&self) -> &str {
&self.config.name
}
/// Shard and write data.
pub async fn write(&self, write: DbWrite) -> Result<(), WriteError> {
let mut errors: BTreeMap<ShardId, WriteErrorShard> = Default::default();
// The iteration order is stable here due to the [`BTreeMap`], so behavior and error ordering are deterministic.
let sharded: BTreeMap<_, _> = shard_write(&write, &self.config.write_sharder);
for (shard_id, write) in sharded {
if let Err(e) = self.write_shard(shard_id, &write).await {
errors.insert(shard_id, e);
}
}
if errors.is_empty() {
Ok(())
} else {
Err(WriteError::MultiWriteFailure { errors })
}
}
async fn write_shard(&self, shard_id: ShardId, write: &DbWrite) -> Result<(), WriteErrorShard> {
match self.write_sink_sets.get(&shard_id) {
Some(sink_set) => sink_set.write(write).await.context(SinkSetFailure),
None => Err(WriteErrorShard::NoSinkSetFound { shard_id }),
}
}
}
#[cfg(test)]
mod tests {
use crate::{grpc_client::MockClient, resolver::RemoteTemplate};
use super::*;
#[test]
fn test_getters() {
use data_types::{
router::{
Matcher, MatcherToShard, ShardConfig, WriteSink as WriteSinkConfig,
WriteSinkSet as WriteSinkSetConfig, WriteSinkVariant as WriteSinkVariantConfig,
},
sequence::Sequence,
server_id::ServerId,
};
use mutable_batch::WriteMeta;
use mutable_batch_lp::lines_to_batches;
use regex::Regex;
use time::Time;
#[tokio::test]
async fn test_getters() {
let resolver = Arc::new(Resolver::new(None));
let connection_pool = Arc::new(ConnectionPool::new_testing().await);
let cfg = RouterConfig {
name: String::from("my_router"),
write_sharder: Default::default(),
write_sinks: Default::default(),
query_sinks: Default::default(),
};
let router = Router::new(cfg.clone());
let router = Router::new(cfg.clone(), resolver, connection_pool);
assert_eq!(router.config(), &cfg);
assert_eq!(router.name(), "my_router");
}
#[tokio::test]
async fn test_write() {
let server_id_1 = ServerId::try_from(1).unwrap();
let server_id_2 = ServerId::try_from(2).unwrap();
let server_id_3 = ServerId::try_from(3).unwrap();
let resolver = Arc::new(Resolver::new(Some(RemoteTemplate::new("{id}"))));
let connection_pool = Arc::new(ConnectionPool::new_testing().await);
let client_1 = connection_pool.grpc_client("1").await.unwrap();
let client_2 = connection_pool.grpc_client("2").await.unwrap();
let client_3 = connection_pool.grpc_client("3").await.unwrap();
let client_1 = client_1.as_any().downcast_ref::<MockClient>().unwrap();
let client_2 = client_2.as_any().downcast_ref::<MockClient>().unwrap();
let client_3 = client_3.as_any().downcast_ref::<MockClient>().unwrap();
let cfg = RouterConfig {
name: String::from("my_router"),
write_sharder: ShardConfig {
specific_targets: vec![
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("foo_bar").unwrap()),
},
shard: ShardId::new(10),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("foo_three").unwrap()),
},
shard: ShardId::new(30),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("foo_.*").unwrap()),
},
shard: ShardId::new(20),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("doom").unwrap()),
},
shard: ShardId::new(40),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new("nooo").unwrap()),
},
shard: ShardId::new(50),
},
MatcherToShard {
matcher: Matcher {
table_name_regex: Some(Regex::new(".*").unwrap()),
},
shard: ShardId::new(20),
},
],
hash_ring: None,
},
write_sinks: BTreeMap::from([
(
ShardId::new(10),
WriteSinkSetConfig {
sinks: vec![WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_1),
ignore_errors: false,
}],
},
),
(
ShardId::new(20),
WriteSinkSetConfig {
sinks: vec![WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_2),
ignore_errors: false,
}],
},
),
(
ShardId::new(30),
WriteSinkSetConfig {
sinks: vec![WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_3),
ignore_errors: false,
}],
},
),
]),
query_sinks: Default::default(),
};
let router = Router::new(cfg.clone(), resolver, connection_pool);
// clean write
let meta_1 = WriteMeta::sequenced(
Sequence::new(1, 2),
Time::from_timestamp_nanos(1337),
None,
10,
);
let write_1 = db_write(
&["foo_x x=2 2", "foo_bar x=1 1", "foo_y x=3 3", "www x=4 4"],
&meta_1,
);
router.write(write_1).await.unwrap();
client_1.assert_writes(&[(
String::from("my_router"),
db_write(&["foo_bar x=1 1"], &meta_1),
)]);
client_2.assert_writes(&[(
String::from("my_router"),
db_write(&["foo_x x=2 2", "foo_y x=3 3", "www x=4 4"], &meta_1),
)]);
// write w/ errors
client_2.poison();
let meta_2 = WriteMeta::sequenced(
Sequence::new(3, 4),
Time::from_timestamp_nanos(42),
None,
20,
);
let write_2 = db_write(
&[
"foo_bar x=5 5",
"doom x=6 6",
"foo_bar x=7 7",
"www x=8 8",
"foo_bar x=9 9",
"nooo x=10 10",
"foo_bar x=11 11",
"foo_three x=12 12",
"doom x=13 13",
"foo_three x=14 14",
"www x=15 15",
"foo_three x=16 16",
"nooo x=17 17",
"foo_three x=18 18",
],
&meta_2,
);
let err = router.write(write_2).await.unwrap_err();
assert_eq!(err.to_string(), "One or more writes failed: ShardId(20) => \"Write to sink set failed: Cannot write: poisened\", ShardId(40) => \"Did not find sink set for shard ID 40\"...");
client_1.assert_writes(&[
(
String::from("my_router"),
db_write(&["foo_bar x=1 1"], &meta_1),
),
(
String::from("my_router"),
db_write(
&[
"foo_bar x=5 5",
"foo_bar x=7 7",
"foo_bar x=9 9",
"foo_bar x=11 11",
],
&meta_2,
),
),
]);
client_2.assert_writes(&[(
String::from("my_router"),
db_write(&["foo_x x=2 2", "foo_y x=3 3", "www x=4 4"], &meta_1),
)]);
client_3.assert_writes(&[(
String::from("my_router"),
db_write(
&[
"foo_three x=12 12",
"foo_three x=14 14",
"foo_three x=16 16",
"foo_three x=18 18",
],
&meta_2,
),
)]);
}
fn db_write(lines: &[&str], meta: &WriteMeta) -> DbWrite {
DbWrite::new(
lines_to_batches(&lines.join("\n"), 0).unwrap(),
meta.clone(),
)
}
}

View File

@ -4,9 +4,12 @@ use data_types::{router::Router as RouterConfig, server_id::ServerId};
use metric::Registry as MetricRegistry;
use parking_lot::RwLock;
use snafu::Snafu;
use time::TimeProvider;
use trace::TraceCollector;
use write_buffer::config::WriteBufferConfigFactory;
use crate::{
connection_pool::ConnectionPool,
resolver::{RemoteTemplate, Resolver},
router::Router,
};
@ -26,12 +29,14 @@ pub struct RouterServer {
trace_collector: Option<Arc<dyn TraceCollector>>,
routers: RwLock<BTreeMap<String, Arc<Router>>>,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
}
impl RouterServer {
pub fn new(
pub async fn new(
remote_template: Option<RemoteTemplate>,
trace_collector: Option<Arc<dyn TraceCollector>>,
time_provider: Arc<dyn TimeProvider>,
) -> Self {
let metric_registry = Arc::new(metric::Registry::new());
@ -41,6 +46,9 @@ impl RouterServer {
trace_collector,
routers: Default::default(),
resolver: Arc::new(Resolver::new(remote_template)),
connection_pool: Arc::new(
ConnectionPool::new(false, WriteBufferConfigFactory::new(time_provider)).await,
),
}
}
@ -86,7 +94,11 @@ impl RouterServer {
///
/// Returns `true` if the router already existed.
pub fn update_router(&self, config: RouterConfig) -> bool {
let router = Router::new(config);
let router = Router::new(
config,
Arc::clone(&self.resolver),
Arc::clone(&self.connection_pool),
);
self.routers
.write()
.insert(router.name().to_string(), Arc::new(router))
@ -96,8 +108,15 @@ impl RouterServer {
/// Delete router.
///
/// Returns `true` if the router existed.
pub fn delete_router(&self, name: &str) -> bool {
self.routers.write().remove(name).is_some()
pub fn delete_router(&self, router_name: &str) -> bool {
self.routers.write().remove(router_name).is_some()
}
/// Get registered router, if any.
///
/// The router name is identical to the database for which this router handles data.
pub fn router(&self, router_name: &str) -> Option<Arc<Router>> {
self.routers.read().get(router_name).cloned()
}
/// Resolver associated with this server.
@ -107,10 +126,14 @@ impl RouterServer {
}
pub mod test_utils {
use std::sync::Arc;
use time::SystemProvider;
use super::RouterServer;
pub fn make_router_server() -> RouterServer {
RouterServer::new(None, None)
pub async fn make_router_server() -> RouterServer {
RouterServer::new(None, None, Arc::new(SystemProvider::new())).await
}
}
@ -122,13 +145,13 @@ mod tests {
use super::*;
#[test]
fn test_server_id() {
#[tokio::test]
async fn test_server_id() {
let id13 = ServerId::try_from(13).unwrap();
let id42 = ServerId::try_from(42).unwrap();
// server starts w/o any ID
let server = make_router_server();
let server = make_router_server().await;
assert_eq!(server.server_id(), None);
// setting ID
@ -144,9 +167,9 @@ mod tests {
assert!(matches!(err, SetServerIdError::AlreadySet { .. }));
}
#[test]
fn test_router_crud() {
let server = make_router_server();
#[tokio::test]
async fn test_router_crud() {
let server = make_router_server().await;
let cfg_foo_1 = RouterConfig {
name: String::from("foo"),
@ -180,6 +203,8 @@ mod tests {
assert_eq!(routers.len(), 2);
assert_eq!(routers[0].config(), &cfg_bar);
assert_eq!(routers[1].config(), &cfg_foo_1);
assert_eq!(server.router("bar").unwrap().config(), &cfg_bar);
assert_eq!(server.router("foo").unwrap().config(), &cfg_foo_1);
// update router
assert!(server.update_router(cfg_foo_2.clone()));
@ -187,12 +212,18 @@ mod tests {
assert_eq!(routers.len(), 2);
assert_eq!(routers[0].config(), &cfg_bar);
assert_eq!(routers[1].config(), &cfg_foo_2);
assert_eq!(server.router("bar").unwrap().config(), &cfg_bar);
assert_eq!(server.router("foo").unwrap().config(), &cfg_foo_2);
// delete routers
assert!(server.delete_router("foo"));
let routers = server.routers();
assert_eq!(routers.len(), 1);
assert_eq!(routers[0].config(), &cfg_bar);
assert_eq!(server.router("bar").unwrap().config(), &cfg_bar);
assert!(server.router("foo").is_none());
// deleting router a 2nd time works
assert!(!server.delete_router("foo"));
}
}

368
router/src/write_sink.rs Normal file
View File

@ -0,0 +1,368 @@
use std::sync::Arc;
use data_types::{
router::{
WriteSink as WriteSinkConfig, WriteSinkSet as WriteSinkSetConfig,
WriteSinkVariant as WriteSinkVariantConfig,
},
server_id::ServerId,
write_buffer::WriteBufferConnection,
};
use mutable_batch::DbWrite;
use snafu::{OptionExt, ResultExt, Snafu};
use crate::{
connection_pool::{ConnectionError, ConnectionPool},
resolver::Resolver,
};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("No remote for server ID {}", server_id))]
NoRemote { server_id: ServerId },
#[snafu(display("Cannot connect: {}", source))]
ConnectionFailure { source: ConnectionError },
#[snafu(display("Cannot write: {}", source))]
WriteFailure {
source: Box<dyn std::error::Error + Send + Sync>,
},
}
#[derive(Debug)]
struct VariantGrpcRemote {
db_name: String,
server_id: ServerId,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
}
impl VariantGrpcRemote {
fn new(
db_name: String,
server_id: ServerId,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
) -> Self {
Self {
db_name,
server_id,
resolver,
connection_pool,
}
}
async fn write(&self, write: &DbWrite) -> Result<(), Error> {
let connection_string = self
.resolver
.resolve_remote(self.server_id)
.context(NoRemote {
server_id: self.server_id,
})?;
let client = self
.connection_pool
.grpc_client(&connection_string)
.await
.context(ConnectionFailure)?;
client
.write(&self.db_name, write)
.await
.context(WriteFailure)
}
}
#[derive(Debug)]
struct VariantWriteBuffer {
db_name: String,
write_buffer_cfg: WriteBufferConnection,
connection_pool: Arc<ConnectionPool>,
}
impl VariantWriteBuffer {
fn new(
db_name: String,
write_buffer_cfg: WriteBufferConnection,
connection_pool: Arc<ConnectionPool>,
) -> Self {
Self {
db_name,
write_buffer_cfg,
connection_pool,
}
}
async fn write(&self, write: &DbWrite) -> Result<(), Error> {
let write_buffer = self
.connection_pool
.write_buffer_producer(&self.db_name, &self.write_buffer_cfg)
.await
.context(ConnectionFailure)?;
// TODO(marco): use multiple sequencers
write_buffer
.store_write(0, write)
.await
.context(WriteFailure)?;
Ok(())
}
}
#[derive(Debug)]
enum WriteSinkVariant {
/// Send write to a remote server via gRPC
GrpcRemote(VariantGrpcRemote),
/// Send write to a write buffer (which may be backed by kafka, local disk, etc)
WriteBuffer(VariantWriteBuffer),
}
/// Write sink abstraction.
#[derive(Debug)]
pub struct WriteSink {
ignore_errors: bool,
variant: WriteSinkVariant,
}
impl WriteSink {
pub fn new(
db_name: &str,
config: WriteSinkConfig,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
) -> Self {
let variant = match config.sink {
WriteSinkVariantConfig::GrpcRemote(server_id) => WriteSinkVariant::GrpcRemote(
VariantGrpcRemote::new(db_name.to_string(), server_id, resolver, connection_pool),
),
WriteSinkVariantConfig::WriteBuffer(write_buffer_cfg) => WriteSinkVariant::WriteBuffer(
VariantWriteBuffer::new(db_name.to_string(), write_buffer_cfg, connection_pool),
),
};
Self {
ignore_errors: config.ignore_errors,
variant,
}
}
pub async fn write(&self, write: &DbWrite) -> Result<(), Error> {
let res = match &self.variant {
WriteSinkVariant::GrpcRemote(v) => v.write(write).await,
WriteSinkVariant::WriteBuffer(v) => v.write(write).await,
};
match res {
Ok(()) => Ok(()),
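// A failed write is swallowed when this sink is configured with `ignore_errors`.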
Err(_) if self.ignore_errors => Ok(()),
e => e,
}
}
}
/// A set of [`WriteSink`]s.
#[derive(Debug)]
pub struct WriteSinkSet {
sinks: Vec<WriteSink>,
}
impl WriteSinkSet {
/// Create new set from config.
pub fn new(
db_name: &str,
config: WriteSinkSetConfig,
resolver: Arc<Resolver>,
connection_pool: Arc<ConnectionPool>,
) -> Self {
Self {
sinks: config
.sinks
.into_iter()
.map(|sink_config| {
WriteSink::new(
db_name,
sink_config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
)
})
.collect(),
}
}
/// Write to sinks. Fails on first error.
pub async fn write(&self, write: &DbWrite) -> Result<(), Error> {
for sink in &self.sinks {
sink.write(write).await?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use data_types::write_buffer::WriteBufferDirection;
use mutable_batch_lp::lines_to_batches;
use time::SystemProvider;
use write_buffer::config::WriteBufferConfigFactory;
use crate::grpc_client::MockClient;
use super::*;
#[tokio::test]
async fn test_write_sink_error_handling() {
let server_id = ServerId::try_from(1).unwrap();
let resolver = Arc::new(Resolver::new(None));
resolver.update_remote(server_id, String::from("1.2.3.4"));
let time_provider = Arc::new(SystemProvider::new());
let wb_factory = WriteBufferConfigFactory::new(time_provider);
wb_factory.register_always_fail_mock(String::from("failing_wb"));
let connection_pool = Arc::new(ConnectionPool::new(true, wb_factory).await);
let client_grpc = connection_pool.grpc_client("1.2.3.4").await.unwrap();
let client_grpc = client_grpc.as_any().downcast_ref::<MockClient>().unwrap();
client_grpc.poison();
let write = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
// gRPC, do NOT ignore errors
let config = WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id),
ignore_errors: false,
};
let sink = WriteSink::new(
"my_db",
config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
);
sink.write(&write).await.unwrap_err();
// gRPC, ignore errors
let config = WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id),
ignore_errors: true,
};
let sink = WriteSink::new(
"my_db",
config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
);
sink.write(&write).await.unwrap();
// write buffer, do NOT ignore errors
let write_buffer_cfg = WriteBufferConnection {
direction: WriteBufferDirection::Write,
type_: String::from("mock"),
connection: String::from("failing_wb"),
..Default::default()
};
let config = WriteSinkConfig {
sink: WriteSinkVariantConfig::WriteBuffer(write_buffer_cfg.clone()),
ignore_errors: false,
};
let sink = WriteSink::new(
"my_db",
config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
);
sink.write(&write).await.unwrap_err();
// write buffer, ignore errors
let config = WriteSinkConfig {
sink: WriteSinkVariantConfig::WriteBuffer(write_buffer_cfg),
ignore_errors: true,
};
let sink = WriteSink::new(
"my_db",
config,
Arc::clone(&resolver),
Arc::clone(&connection_pool),
);
sink.write(&write).await.unwrap();
}
#[tokio::test]
async fn test_write_sink_set() {
let server_id_1 = ServerId::try_from(1).unwrap();
let server_id_2 = ServerId::try_from(2).unwrap();
let server_id_3 = ServerId::try_from(3).unwrap();
let resolver = Arc::new(Resolver::new(None));
resolver.update_remote(server_id_1, String::from("1"));
resolver.update_remote(server_id_2, String::from("2"));
resolver.update_remote(server_id_3, String::from("3"));
let connection_pool = Arc::new(ConnectionPool::new_testing().await);
let client_1 = connection_pool.grpc_client("1").await.unwrap();
let client_2 = connection_pool.grpc_client("2").await.unwrap();
let client_3 = connection_pool.grpc_client("3").await.unwrap();
let client_1 = client_1.as_any().downcast_ref::<MockClient>().unwrap();
let client_2 = client_2.as_any().downcast_ref::<MockClient>().unwrap();
let client_3 = client_3.as_any().downcast_ref::<MockClient>().unwrap();
let sink_set = WriteSinkSet::new(
"my_db",
WriteSinkSetConfig {
sinks: vec![
WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_1),
ignore_errors: false,
},
WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_2),
ignore_errors: false,
},
WriteSinkConfig {
sink: WriteSinkVariantConfig::GrpcRemote(server_id_3),
ignore_errors: false,
},
],
},
resolver,
connection_pool,
);
let write_1 = DbWrite::new(
lines_to_batches("foo x=1 1", 0).unwrap(),
Default::default(),
);
sink_set.write(&write_1).await.unwrap();
let writes_1 = [(String::from("my_db"), write_1.clone())];
client_1.assert_writes(&writes_1);
client_2.assert_writes(&writes_1);
client_3.assert_writes(&writes_1);
client_2.poison();
let write_2 = DbWrite::new(
lines_to_batches("foo x=2 2", 0).unwrap(),
Default::default(),
);
sink_set.write(&write_2).await.unwrap_err();
// The sink set stops on first non-ignored error. So
// - client 1 got the new data
// - client 2 failed, but still has the data from the first write
// - client 3 got skipped due to the failure, but still has the data from the first write
let writes_2 = [
(String::from("my_db"), write_1.clone()),
(String::from("my_db"), write_2.clone()),
];
client_1.assert_writes(&writes_2);
client_2.assert_writes(&writes_1);
client_3.assert_writes(&writes_1);
}
}

View File

@ -12,7 +12,6 @@ description = "Server related bechmarks, grouped into their own crate to minimiz
[dev-dependencies] # In alphabetical order
arrow_util = { path = "../arrow_util" }
entry = { path = "../entry" }
criterion = { version = "0.3.4", features = ["async_tokio"] }
chrono = { version = "0.4", features = ["serde"] }
datafusion = { path = "../datafusion" }

View File

@ -7,13 +7,11 @@ edition = "2021"
async-trait = "0.1"
data_types = { path = "../data_types" }
dotenv = "0.15.0"
entry = { path = "../entry" }
futures = "0.3"
generated_types = { path = "../generated_types" }
http = "0.2"
httparse = "1.5"
mutable_batch = { path = "../mutable_batch" }
mutable_batch_entry = { path = "../mutable_batch_entry" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
observability_deps = { path = "../observability_deps" }

View File

@ -7,11 +7,9 @@ use http::{HeaderMap, HeaderValue};
use prost::Message;
use data_types::sequence::Sequence;
use entry::{Entry, SequencedEntry};
use generated_types::influxdata::iox::write_buffer::v1::write_buffer_payload::Payload;
use generated_types::influxdata::iox::write_buffer::v1::WriteBufferPayload;
use mutable_batch::{DbWrite, WriteMeta};
use mutable_batch_entry::sequenced_entry_to_write;
use mutable_batch_pb::decode::decode_database_batch;
use time::Time;
use trace::ctx::SpanContext;
@ -20,16 +18,6 @@ use trace_http::ctx::{format_jaeger_trace_context, TraceHeaderParser};
use crate::core::WriteBufferError;
/// Current flatbuffer-based content type.
///
/// This is a value for [`HEADER_CONTENT_TYPE`].
///
/// Inspired by:
/// - <https://stackoverflow.com/a/56502135>
/// - <https://stackoverflow.com/a/48051331>
pub const CONTENT_TYPE_FLATBUFFER: &str =
r#"application/x-flatbuffers; schema="influxdata.iox.write.v1.Entry""#;
/// Pbdata-based content type
pub const CONTENT_TYPE_PROTOBUF: &str =
r#"application/x-protobuf; schema="influxdata.iox.write_buffer.v1.WriteBufferPayload""#;
@ -42,7 +30,6 @@ pub const HEADER_TRACE_CONTEXT: &str = "uber-trace-id";
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum ContentType {
Entry,
Protobuf,
}
@ -62,29 +49,20 @@ impl IoxHeaders {
}
}
/// Create new headers where all information is missing.
fn empty() -> Self {
Self {
// Fallback for now https://github.com/influxdata/influxdb_iox/issues/2805
content_type: ContentType::Entry,
span_context: None,
}
}
/// Creates a new IoxHeaders from an iterator of headers
pub fn from_headers(
headers: impl IntoIterator<Item = (impl AsRef<str>, impl AsRef<[u8]>)>,
trace_collector: Option<&Arc<dyn TraceCollector>>,
) -> Result<Self, WriteBufferError> {
let mut res = Self::empty();
let mut span_context = None;
let mut content_type = None;
for (name, value) in headers {
let name = name.as_ref();
if name.eq_ignore_ascii_case(HEADER_CONTENT_TYPE) {
res.content_type = match std::str::from_utf8(value.as_ref()) {
Ok(CONTENT_TYPE_FLATBUFFER) => ContentType::Entry,
Ok(CONTENT_TYPE_PROTOBUF) => ContentType::Protobuf,
content_type = match std::str::from_utf8(value.as_ref()) {
Ok(CONTENT_TYPE_PROTOBUF) => Some(ContentType::Protobuf),
Ok(c) => return Err(format!("Unknown message format: {}", c).into()),
Err(e) => {
return Err(format!("Error decoding content type header: {}", e).into())
@ -101,7 +79,7 @@ impl IoxHeaders {
let parser = TraceHeaderParser::new()
.with_jaeger_trace_context_header_name(HEADER_TRACE_CONTEXT);
res.span_context = match parser.parse(trace_collector, &headers) {
span_context = match parser.parse(trace_collector, &headers) {
Ok(ctx) => ctx,
Err(e) => {
return Err(format!("Error decoding trace context: {}", e).into())
@ -112,7 +90,10 @@ impl IoxHeaders {
}
}
Ok(res)
Ok(Self {
content_type: content_type.ok_or_else(|| "No content type header".to_string())?,
span_context,
})
}
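
As a hypothetical usage sketch of the reworked parsing (assuming a spot in this crate where `IoxHeaders`, `HEADER_CONTENT_TYPE`, and `CONTENT_TYPE_PROTOBUF` are in scope), a missing content-type header is now a hard error instead of a silent fallback to the removed flatbuffer format:

// Parsing without a content-type header now fails outright...
let no_headers: Vec<(&str, &[u8])> = vec![];
let err = IoxHeaders::from_headers(no_headers, None).err().unwrap();
assert_eq!(err.to_string(), "No content type header");

// ...while an explicit protobuf content type still parses.
let with_type = vec![(HEADER_CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.as_bytes())];
assert!(IoxHeaders::from_headers(with_type, None).is_ok());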
/// Gets the content type
@ -128,7 +109,6 @@ impl IoxHeaders {
/// Returns the header map to encode
pub fn headers(&self) -> impl Iterator<Item = (&str, Cow<'static, str>)> + '_ {
let content_type = match self.content_type {
ContentType::Entry => CONTENT_TYPE_FLATBUFFER.into(),
ContentType::Protobuf => CONTENT_TYPE_PROTOBUF.into(),
};
@ -154,16 +134,6 @@ pub fn decode(
producer_ts: Time,
) -> Result<DbWrite, WriteBufferError> {
match headers.content_type {
ContentType::Entry => {
let entry = Entry::try_from(data.to_vec())?;
let entry = SequencedEntry::new_from_sequence_and_span_context(
sequence,
producer_ts,
entry,
headers.span_context,
);
sequenced_entry_to_write(&entry).map_err(|e| Box::new(e) as WriteBufferError)
}
ContentType::Protobuf => {
let payload: WriteBufferPayload = prost::Message::decode(data)
.map_err(|e| format!("failed to decode WriteBufferPayload: {}", e))?;
@ -209,7 +179,7 @@ mod tests {
let span_context_parent = SpanContext::new(Arc::clone(&collector));
let span_context = span_context_parent.child("foo").ctx;
let iox_headers1 = IoxHeaders::new(ContentType::Entry, Some(span_context));
let iox_headers1 = IoxHeaders::new(ContentType::Protobuf, Some(span_context));
let encoded: Vec<_> = iox_headers1
.headers()

View File

@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
collections::{BTreeMap, BTreeSet},
convert::{TryFrom, TryInto},
num::NonZeroU32,
sync::Arc,
@ -129,7 +129,7 @@ impl KafkaBufferProducer {
pub async fn new(
conn: impl Into<String> + Send,
database_name: impl Into<String> + Send,
connection_config: &HashMap<String, String>,
connection_config: &BTreeMap<String, String>,
creation_config: Option<&WriteBufferCreationConfig>,
time_provider: Arc<dyn TimeProvider>,
) -> Result<Self, WriteBufferError> {
@ -313,7 +313,7 @@ impl KafkaBufferConsumer {
conn: impl Into<String> + Send + Sync,
server_id: ServerId,
database_name: impl Into<String> + Send + Sync,
connection_config: &HashMap<String, String>,
connection_config: &BTreeMap<String, String>,
creation_config: Option<&WriteBufferCreationConfig>,
// `trace_collector` has to be a reference due to https://github.com/rust-lang/rust/issues/63033
trace_collector: Option<&Arc<dyn TraceCollector>>,
@ -426,7 +426,7 @@ async fn create_kafka_topic(
kafka_connection: &str,
database_name: &str,
n_sequencers: NonZeroU32,
cfg: &HashMap<String, String>,
cfg: &BTreeMap<String, String>,
) -> Result<(), WriteBufferError> {
let admin = admin_client(kafka_connection)?;
@ -489,7 +489,7 @@ async fn maybe_auto_create_topics(
}
pub mod test_utils {
use std::{collections::HashMap, time::Duration};
use std::{collections::BTreeMap, time::Duration};
use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
use uuid::Uuid;
@ -544,12 +544,12 @@ pub mod test_utils {
}
/// Create topic creation config that is ideal for testing and works with [`purge_kafka_topic`]
pub fn kafka_sequencer_options() -> HashMap<String, String> {
let mut cfg: HashMap<String, String> = Default::default();
cfg.insert("cleanup.policy".to_string(), "delete".to_string());
cfg.insert("retention.ms".to_string(), "-1".to_string());
cfg.insert("segment.ms".to_string(), "10".to_string());
cfg
pub fn kafka_sequencer_options() -> BTreeMap<String, String> {
BTreeMap::from([
("cleanup.policy".to_string(), "delete".to_string()),
("retention.ms".to_string(), "-1".to_string()),
("segment.ms".to_string(), "10".to_string()),
])
}
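
The connection and topic configuration switches from `HashMap` to `BTreeMap` throughout this file, presumably so that key iteration order is deterministic; the `BTreeMap::from([...])` constructor used above (stable since Rust 1.56) keeps the literal form compact. A small, self-contained sketch of the ordering guarantee:

use std::collections::BTreeMap;

fn main() {
    // Keys inserted in arbitrary order...
    let cfg = BTreeMap::from([
        ("retention.ms".to_string(), "-1".to_string()),
        ("segment.ms".to_string(), "10".to_string()),
        ("cleanup.policy".to_string(), "delete".to_string()),
    ]);

    // ...come back out sorted, so anything derived from the map (rendered
    // client config, logs, test snapshots) is stable across runs, unlike
    // `HashMap`, whose iteration order is not deterministic.
    let keys: Vec<&str> = cfg.keys().map(String::as_str).collect();
    assert_eq!(keys, ["cleanup.policy", "retention.ms", "segment.ms"]);
}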
/// Purge all records from given topic.
@ -587,7 +587,6 @@ mod tests {
sync::atomic::{AtomicU32, Ordering},
};
use entry::test_helpers::lp_to_entry;
use time::TimeProvider;
use trace::{RingBufferTraceCollector, TraceCollector};
@ -720,10 +719,17 @@ mod tests {
let adapter = KafkaTestAdapter::new(conn);
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
let headers = IoxHeaders::new(ContentType::Protobuf, None);
let mut owned_headers = OwnedHeaders::new();
for (name, value) in headers.headers() {
owned_headers = owned_headers.add(name, value.as_bytes());
}
let writer = ctx.writing(true).await.unwrap();
let partition = set_pop_first(&mut writer.sequencer_ids()).unwrap() as i32;
let record: FutureRecord<'_, String, [u8]> =
FutureRecord::to(&writer.database_name).partition(partition);
let record: FutureRecord<'_, String, [u8]> = FutureRecord::to(&writer.database_name)
.partition(partition)
.headers(owned_headers);
writer.producer.send(record, Timeout::Never).await.unwrap();
let mut reader = ctx.reading(true).await.unwrap();
@ -743,9 +749,8 @@ mod tests {
let writer = ctx.writing(true).await.unwrap();
let partition = set_pop_first(&mut writer.sequencer_ids()).unwrap() as i32;
let entry = lp_to_entry("upc,region=east user=1 100");
let record: FutureRecord<'_, String, _> = FutureRecord::to(&writer.database_name)
.payload(entry.data())
.payload(&[0])
.partition(partition);
writer.producer.send(record, Timeout::Never).await.unwrap();
@ -753,7 +758,8 @@ mod tests {
let mut streams = reader.streams();
assert_eq!(streams.len(), 1);
let (_sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
stream.stream.next().await.unwrap().unwrap();
let err = stream.stream.next().await.unwrap().unwrap_err();
assert_eq!(err.to_string(), "No content type header");
}
#[tokio::test]
@ -764,9 +770,8 @@ mod tests {
let writer = ctx.writing(true).await.unwrap();
let partition = set_pop_first(&mut writer.sequencer_ids()).unwrap() as i32;
let entry = lp_to_entry("upc,region=east user=1 100");
let record: FutureRecord<'_, String, _> = FutureRecord::to(&writer.database_name)
.payload(entry.data())
.payload(&[0])
.partition(partition)
.headers(OwnedHeaders::new().add(HEADER_CONTENT_TYPE, "foo"));
writer.producer.send(record, Timeout::Never).await.unwrap();