Merge pull request #1675 from influxdata/cn/kafka-docker-working

test: Starting Kafka Write Buffer integration tests
pull/24376/head
kodiakhq[bot] 2021-06-21 13:59:06 +00:00 committed by GitHub
commit 15aaf235b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 436 additions and 43 deletions

View File

@ -124,6 +124,15 @@ jobs:
command: cargo test --workspace command: cargo test --workspace
- cache_save - cache_save
test_kafka_integration:
machine: true
resource_class: xlarge
steps:
- checkout
- run:
name: Run integration tests with Docker Compose
command: docker-compose -f docker/ci-kafka-docker-compose.yml up --build --force-recreate --exit-code-from rust
# Integration tests for the influxdb2_client crate against InfluxDB 2.0 OSS. # Integration tests for the influxdb2_client crate against InfluxDB 2.0 OSS.
test_influxdb2_client: test_influxdb2_client:
docker: docker:
@ -278,6 +287,7 @@ workflows:
- lint - lint
- protobuf-lint - protobuf-lint
- test - test
- test_kafka_integration
- test_influxdb2_client - test_influxdb2_client
- build - build
- check-flatbuffers - check-flatbuffers

177
Cargo.lock generated
View File

@ -173,7 +173,7 @@ dependencies = [
"arrow", "arrow",
"hashbrown 0.11.2", "hashbrown 0.11.2",
"num-traits", "num-traits",
"rand 0.8.4", "rand 0.8.3",
"snafu", "snafu",
] ]
@ -851,7 +851,7 @@ dependencies = [
"parquet", "parquet",
"paste 1.0.5", "paste 1.0.5",
"pin-project-lite", "pin-project-lite",
"rand 0.8.4", "rand 0.8.3",
"smallvec", "smallvec",
"sqlparser", "sqlparser",
"tokio", "tokio",
@ -875,6 +875,17 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "derivative"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "difference" name = "difference"
version = "2.0.0" version = "2.0.0"
@ -1611,7 +1622,8 @@ dependencies = [
"prettytable-rs", "prettytable-rs",
"prost", "prost",
"query", "query",
"rand 0.8.4", "rand 0.8.3",
"rdkafka",
"read_buffer", "read_buffer",
"reqwest", "reqwest",
"routerify", "routerify",
@ -1650,7 +1662,7 @@ dependencies = [
"http", "http",
"hyper", "hyper",
"prost", "prost",
"rand 0.8.4", "rand 0.8.3",
"serde", "serde",
"serde_json", "serde_json",
"thiserror", "thiserror",
@ -1677,7 +1689,7 @@ dependencies = [
"hex", "hex",
"integer-encoding", "integer-encoding",
"observability_deps", "observability_deps",
"rand 0.8.4", "rand 0.8.3",
"snafu", "snafu",
"snap", "snap",
"test_helpers", "test_helpers",
@ -1713,9 +1725,9 @@ dependencies = [
[[package]] [[package]]
name = "ipnet" name = "ipnet"
version = "2.3.1" version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f2d64f2edebec4ce84ad108148e67e1064789bee435edc5b60ad398714a3a9" checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135"
[[package]] [[package]]
name = "itertools" name = "itertools"
@ -1820,6 +1832,18 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a" checksum = "c7d73b3f436185384286bd8098d17ec07c9a7d2388a6599f824d8502b529702a"
[[package]]
name = "libz-sys"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de5435b8549c16d423ed0c03dbaafe57cf6c3344744f1242520d59c9d8ecec66"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]] [[package]]
name = "lifecycle" name = "lifecycle"
version = "0.1.0" version = "0.1.0"
@ -1975,9 +1999,9 @@ dependencies = [
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.7.13" version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c2bdb6314ec10835cd3293dd268473a835c02b7b352e788be788b3c6ca6bb16" checksum = "cf80d3e903b34e0bd7282b218398aec54e082c840d9baf8339e0080a0c542956"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
@ -2239,6 +2263,28 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "num_enum"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "226b45a5c2ac4dd696ed30fa6b94b057ad909c7b7fc2e0d0808192bced894066"
dependencies = [
"derivative",
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c0fd9eba1d5db0994a239e09c1be402d35622277e35468ba891aa5e3188ce7e"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "oauth2" name = "oauth2"
version = "4.0.0-alpha.3" version = "4.0.0-alpha.3"
@ -2369,7 +2415,7 @@ dependencies = [
"lazy_static", "lazy_static",
"percent-encoding", "percent-encoding",
"pin-project 1.0.7", "pin-project 1.0.7",
"rand 0.8.4", "rand 0.8.3",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
@ -2467,7 +2513,7 @@ dependencies = [
"internal_types", "internal_types",
"observability_deps", "observability_deps",
"parquet", "parquet",
"rand 0.8.4", "rand 0.8.3",
"snafu", "snafu",
"test_helpers", "test_helpers",
] ]
@ -2499,7 +2545,7 @@ dependencies = [
"cfg-if", "cfg-if",
"instant", "instant",
"libc", "libc",
"redox_syscall 0.2.9", "redox_syscall 0.2.8",
"smallvec", "smallvec",
"winapi", "winapi",
] ]
@ -2796,6 +2842,15 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "proc-macro-crate"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d6ea3c4595b96363c13943497db34af4460fb474a95c43f4446ad341b8c9785"
dependencies = [
"toml",
]
[[package]] [[package]]
name = "proc-macro-error" name = "proc-macro-error"
version = "1.0.4" version = "1.0.4"
@ -3007,14 +3062,14 @@ dependencies = [
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.8.4" version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8" checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
dependencies = [ dependencies = [
"libc", "libc",
"rand_chacha 0.3.1", "rand_chacha 0.3.1",
"rand_core 0.6.3", "rand_core 0.6.2",
"rand_hc 0.3.1", "rand_hc 0.3.0",
] ]
[[package]] [[package]]
@ -3034,7 +3089,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
dependencies = [ dependencies = [
"ppv-lite86", "ppv-lite86",
"rand_core 0.6.3", "rand_core 0.6.2",
] ]
[[package]] [[package]]
@ -3048,21 +3103,21 @@ dependencies = [
[[package]] [[package]]
name = "rand_core" name = "rand_core"
version = "0.6.3" version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d34f1408f55294453790c48b2f1ebbb1c5b4b7563eb1f418bcfcfdbb06ebb4e7" checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7"
dependencies = [ dependencies = [
"getrandom 0.2.3", "getrandom 0.2.3",
] ]
[[package]] [[package]]
name = "rand_distr" name = "rand_distr"
version = "0.4.1" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "051b398806e42b9cd04ad9ec8f81e355d0a382c543ac6672c62f5a5b452ef142" checksum = "da9e8f32ad24fb80d07d2323a9a2ce8b30d68a62b8cb4df88119ff49a698f038"
dependencies = [ dependencies = [
"num-traits", "num-traits",
"rand 0.8.4", "rand 0.8.3",
] ]
[[package]] [[package]]
@ -3076,11 +3131,11 @@ dependencies = [
[[package]] [[package]]
name = "rand_hc" name = "rand_hc"
version = "0.3.1" version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d51e9f596de227fda2ea6c84607f5558e196eeaf43c986b724ba4fb8fdf497e7" checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73"
dependencies = [ dependencies = [
"rand_core 0.6.3", "rand_core 0.6.2",
] ]
[[package]] [[package]]
@ -3108,6 +3163,35 @@ dependencies = [
"num_cpus", "num_cpus",
] ]
[[package]]
name = "rdkafka"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af78bc431a82ef178c4ad6db537eb9cc25715a8591d27acc30455ee7227a76f4"
dependencies = [
"futures",
"libc",
"log",
"rdkafka-sys",
"serde",
"serde_derive",
"serde_json",
"slab",
"tokio",
]
[[package]]
name = "rdkafka-sys"
version = "4.0.0+1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54f24572851adfeb525fdc4a1d51185898e54fed4e8d8dba4fadb90c6b4f0422"
dependencies = [
"libc",
"libz-sys",
"num_enum",
"pkg-config",
]
[[package]] [[package]]
name = "read_buffer" name = "read_buffer"
version = "0.1.0" version = "0.1.0"
@ -3127,7 +3211,7 @@ dependencies = [
"packers", "packers",
"parking_lot", "parking_lot",
"permutation", "permutation",
"rand 0.8.4", "rand 0.8.3",
"rand_distr", "rand_distr",
"snafu", "snafu",
"test_helpers", "test_helpers",
@ -3141,9 +3225,9 @@ checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.9" version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ab49abadf3f9e1c4bc499e8845e152ad87d2ad2d30371841171169e9d75feee" checksum = "742739e41cd49414de871ea5e549afb7e2a3ac77b589bcbebe8c82fab37147fc"
dependencies = [ dependencies = [
"bitflags", "bitflags",
] ]
@ -3166,7 +3250,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64"
dependencies = [ dependencies = [
"getrandom 0.2.3", "getrandom 0.2.3",
"redox_syscall 0.2.9", "redox_syscall 0.2.8",
] ]
[[package]] [[package]]
@ -3660,7 +3744,7 @@ dependencies = [
"parking_lot", "parking_lot",
"parquet_file", "parquet_file",
"query", "query",
"rand 0.8.4", "rand 0.8.3",
"rand_distr", "rand_distr",
"read_buffer", "read_buffer",
"serde", "serde",
@ -3692,7 +3776,7 @@ dependencies = [
"packers", "packers",
"query", "query",
"query_tests", "query_tests",
"rand 0.8.4", "rand 0.8.3",
"server", "server",
"test_helpers", "test_helpers",
"tokio", "tokio",
@ -3941,9 +4025,9 @@ checksum = "1e81da0851ada1f3e9d4312c704aa4f8806f0f9d69faaf8df2f3464b4a9437c2"
[[package]] [[package]]
name = "symbolic-common" name = "symbolic-common"
version = "8.2.1" version = "8.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47ff4840b3bc0674313dcb4e6d5afc15b2d3fd4269716f06fcf581037645a56e" checksum = "7f8e101b55bbcf228c855fa34fc4312e4f58b4a3251f1298bc0f97b71557815d"
dependencies = [ dependencies = [
"debugid", "debugid",
"memmap", "memmap",
@ -3953,9 +4037,9 @@ dependencies = [
[[package]] [[package]]
name = "symbolic-demangle" name = "symbolic-demangle"
version = "8.2.1" version = "8.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34248c3797477de8fe190615c3f50d69ef79edd65179324f11ffec2ec922cb48" checksum = "5e257e28c2cbcf60a0c21089d32ff8b6cdc7efa6125b13693d29e8986aa1cd99"
dependencies = [ dependencies = [
"rustc-demangle", "rustc-demangle",
"symbolic-common", "symbolic-common",
@ -3998,8 +4082,8 @@ checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"rand 0.8.4", "rand 0.8.3",
"redox_syscall 0.2.9", "redox_syscall 0.2.8",
"remove_dir_all", "remove_dir_all",
"winapi", "winapi",
] ]
@ -4201,9 +4285,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.7.0" version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c79ba603c337335df6ba6dd6afc38c38a7d5e1b0c871678439ea973cd62a118e" checksum = "0a38d31d7831c6ed7aad00aa4c12d9375fd225a6dd77da1d25b707346319a975"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes",
@ -4276,6 +4360,15 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "toml"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
dependencies = [
"serde",
]
[[package]] [[package]]
name = "tonic" name = "tonic"
version = "0.4.3" version = "0.4.3"
@ -4357,7 +4450,7 @@ dependencies = [
"futures-util", "futures-util",
"indexmap", "indexmap",
"pin-project 1.0.7", "pin-project 1.0.7",
"rand 0.8.4", "rand 0.8.3",
"slab", "slab",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
@ -4593,9 +4686,9 @@ dependencies = [
[[package]] [[package]]
name = "vcpkg" name = "vcpkg"
version = "0.2.14" version = "0.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70455df2fdf4e9bf580a92e443f1eb0303c390d682e2ea817312c9e81f8c3399" checksum = "025ce40a007e1907e58d5bc1a594def78e5573bb0b1160bc389634e8f12e4faa"
[[package]] [[package]]
name = "vec_map" name = "vec_map"

View File

@ -124,5 +124,6 @@ flate2 = "1.0"
hex = "0.4.2" hex = "0.4.2"
predicates = "1.0.4" predicates = "1.0.4"
rand = "0.8.3" rand = "0.8.3"
rdkafka = "0.26.0"
reqwest = "0.11" reqwest = "0.11"
tempfile = "3.1.0" tempfile = "3.1.0"

View File

@ -0,0 +1,24 @@
###
# Dockerfile for integration tests that connect to Kafka
# It expects to be run with `docker-compose -f ci-kafka-docker-compose.yml`
##
# Reuse most of the configuration for the rest of the CI builds
FROM quay.io/influxdb/rust:ci
# Create a new directory that will contain the code checkout
ADD . /home/rust/iox
# Make the rust user the owner
RUN sudo chown -R rust:rust /home/rust/iox
# Work in this directory
WORKDIR /home/rust/iox
ENV CARGO_INCREMENTAL=0
ENV RUSTFLAGS="-C debuginfo=1"
ENV TEST_INTEGRATION=1
ENV KAFKA_CONNECT=kafka:9092
# Run the integration tests that connect to Kafka that will be running in another container
CMD ["sh", "-c", "cargo test -p influxdb_iox --test end-to-end write_buffer"]

View File

@ -0,0 +1 @@
target/

View File

@ -0,0 +1,36 @@
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: docker.io/bitnami/kafka:2
ports:
- "9093:9093"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
- KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
depends_on:
- zookeeper
rust:
build:
context: ..
dockerfile: docker/Dockerfile.ci.integration
depends_on:
- kafka
volumes:
zookeeper_data: {}
kafka_data: {}

View File

@ -39,3 +39,51 @@ integration tests against `influxd` running in a Docker container.
If you do not want to use Docker locally, but you do have `influxd` for InfluxDB If you do not want to use Docker locally, but you do have `influxd` for InfluxDB
2.0 locally, you can use that instead by running the tests with the environment variable 2.0 locally, you can use that instead by running the tests with the environment variable
`INFLUXDB_IOX_INTEGRATION_LOCAL=1`. `INFLUXDB_IOX_INTEGRATION_LOCAL=1`.
## Kafka Write Buffer
If you want to run integration tests with a Kafka instance serving as a write buffer, you will need
to set `TEST_INTEGRATION=1`.
You will also need to set `KAFKA_CONNECT` to the host and port where the tests can connect to a
running Kafka instance.
There is a Docker Compose file for running Kafka and Zookeeper using Docker in
`docker/ci-kafka-docker-compose.yml` that CI also uses to run the integration tests.
You have two options for running `cargo test`: on your local (host) machine (likely what you
normally do with tests), or within another Docker container (what CI does).
### Running `cargo test` on the host machine
If you want to compile the tests and run `cargo test` on your local machine, you can start Kafka
using the Docker Compose file with:
```
$ docker-compose -f docker/ci-kafka-docker-compose.yml up kafka
```
You can then run the tests with `KAFKA_CONNECT=localhost:9093`. To run just the Kafka integration
tests, the full command would then be:
```
TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo test -p influxdb_iox --test end-to-end write_buffer
```
### Running `cargo test` in a Docker container
Alternatively, you can do what CI does by compiling the tests and running `cargo test` in a Docker
container as well. First, make sure you have the latest `rust:ci` image by running:
```
docker image pull quay.io/influxdb/rust:ci
```
Then run this Docker Compose command that uses `docker/Dockerfile.ci.integration`:
```
docker-compose -f docker/ci-kafka-docker-compose.yml up --build --force-recreate --exit-code-from rust
```
Because the `rust` service depends on the `kafka` service in the Docker Compose file, you don't
need to start the `kafka` service separately.

View File

@ -12,4 +12,5 @@ mod sql_cli;
pub mod storage_api; pub mod storage_api;
mod system_tables; mod system_tables;
pub mod write_api; pub mod write_api;
mod write_buffer;
pub mod write_cli; pub mod write_cli;

View File

@ -293,6 +293,17 @@ pub fn rand_id() -> String {
pub async fn create_readable_database( pub async fn create_readable_database(
db_name: impl Into<String>, db_name: impl Into<String>,
channel: tonic::transport::Channel, channel: tonic::transport::Channel,
) {
create_readable_database_plus(db_name, channel, std::convert::identity).await
}
/// given a channel to talk with the management api, create a new
/// database with the specified name configured with a 10MB mutable
/// buffer, partitioned on table
pub async fn create_readable_database_plus(
db_name: impl Into<String>,
channel: tonic::transport::Channel,
modify_rules: impl FnOnce(DatabaseRules) -> DatabaseRules,
) { ) {
let mut management_client = influxdb_iox_client::management::Client::new(channel); let mut management_client = influxdb_iox_client::management::Client::new(channel);
@ -310,8 +321,10 @@ pub async fn create_readable_database(
..Default::default() ..Default::default()
}; };
let rules = modify_rules(rules);
management_client management_client
.create_database(rules.clone()) .create_database(rules)
.await .await
.expect("create database failed"); .expect("create database failed");
} }

View File

@ -0,0 +1,166 @@
use futures::{stream::FuturesUnordered, StreamExt};
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
client::DefaultClientContext,
consumer::{Consumer, StreamConsumer},
producer::{FutureProducer, FutureRecord},
ClientConfig, Message, Offset, TopicPartitionList,
};
use std::{
array,
convert::TryInto,
time::{SystemTime, UNIX_EPOCH},
};
/// If `TEST_INTEGRATION` and `KAFKA_CONNECT` are set, return the Kafka connection URL to the
/// caller.
///
/// If `TEST_INTEGRATION` is set but `KAFKA_CONNECT` is not set, fail the tests and provide
/// guidance for setting `KAFKA_CONNECTION`.
///
/// If `TEST_INTEGRATION` is not set, skip the calling test by returning early.
macro_rules! maybe_skip_integration {
() => {{
use std::env;
dotenv::dotenv().ok();
match (
env::var("TEST_INTEGRATION").is_ok(),
env::var("KAFKA_CONNECT").ok(),
) {
(true, Some(kafka_connection)) => kafka_connection,
(true, None) => {
panic!(
"TEST_INTEGRATION is set which requires running integration tests, but \
KAFKA_CONNECT is not set. Please run Kafka, perhaps by using the command \
`docker-compose -f docker/ci-kafka-docker-compose.yml up kafka`, then \
set KAFKA_CONNECT to the host and port where Kafka is accessible. If \
running the `docker-compose` command and the Rust tests on the host, the \
value for `KAFKA_CONNECT` should be `localhost:9093`. If running the Rust \
tests in another container in the `docker-compose` network as on CI, \
`KAFKA_CONNECT` should be `kafka:9092`."
)
}
(false, Some(_)) => {
eprintln!("skipping Kafka integration tests - set TEST_INTEGRATION to run");
return;
}
(false, None) => {
eprintln!(
"skipping Kafka integration tests - set TEST_INTEGRATION and KAFKA_CONNECT to \
run"
);
return;
}
}
}};
}
// This is the test I actually want to write but can't yet because the code doesn't exist
// #[tokio::test]
// async fn writes_go_to_kafka() {
// // start up kafka
//
// // set up a database with a write buffer pointing at kafka
//
// // write some points
//
// // check the data is in kafka
//
// // stop kafka
// }
// This test validates the Kafka/Docker Compose setup and that the Rust tests can use it
#[tokio::test]
async fn can_connect_to_kafka() {
// TODO: this should be the database name and managed by IOx
const TOPIC: &str = "my-topic22227";
// TODO: this should go away
const NUM_MSGS: usize = 10;
let kafka_connection = maybe_skip_integration!();
// connect to kafka, produce, and consume
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", kafka_connection);
let admin_cfg = cfg.clone();
let mut producer_cfg = cfg.clone();
producer_cfg.set("message.timeout.ms", "5000");
let mut consumer_cfg = cfg.clone();
consumer_cfg.set("session.timeout.ms", "6000");
consumer_cfg.set("enable.auto.commit", "false");
consumer_cfg.set("group.id", "placeholder");
let admin: AdminClient<DefaultClientContext> = admin_cfg.create().unwrap();
let producer: FutureProducer = producer_cfg.create().unwrap();
let consumer: StreamConsumer = consumer_cfg.create().unwrap();
let topic = NewTopic::new(TOPIC, 1, TopicReplication::Fixed(1));
let opts = AdminOptions::default();
admin.create_topics(&[topic], &opts).await.unwrap();
let mut topics = TopicPartitionList::new();
topics.add_partition(TOPIC, 0);
topics
.set_partition_offset(TOPIC, 0, Offset::Beginning)
.unwrap();
consumer.assign(&topics).unwrap();
let consumer_task = tokio::spawn(async move {
eprintln!("Consumer task starting");
let mut counter = NUM_MSGS;
loop {
let p = consumer.recv().await.unwrap();
eprintln!("Received a {:?}", p.payload().map(String::from_utf8_lossy));
counter -= 1;
if counter == 0 {
break;
}
}
assert_eq!(counter, 0);
eprintln!("Exiting Consumer");
});
// TODO all the producing should move to server/src/write_buffer.rs
let producer_task = tokio::spawn(async move {
eprintln!("Producer task starting");
for i in 0..NUM_MSGS {
let s = format!("hello! {}", i);
let record = FutureRecord::to(TOPIC).key(&s).payload(&s).timestamp(now());
match producer.send_result(record) {
Ok(x) => match x.await.unwrap() {
Ok((partition, offset)) => {
// TODO remove all the dbg
dbg!(&s, partition, offset);
}
Err((e, msg)) => panic!("oh no {}, {:?}", e, msg),
},
Err((e, msg)) => panic!("oh no {}, {:?}", e, msg),
}
eprintln!("Sent {}", i);
}
eprintln!("exiting producer");
});
let mut tasks: FuturesUnordered<_> =
array::IntoIter::new([consumer_task, producer_task]).collect();
while let Some(t) = tasks.next().await {
t.unwrap();
}
}
fn now() -> i64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.unwrap()
}