Merge branch 'main' into ntran/group

pull/24376/head
kodiakhq[bot] 2021-10-26 21:25:57 +00:00 committed by GitHub
commit 6760212e86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
189 changed files with 2162 additions and 1245 deletions

View File

@ -354,15 +354,15 @@ jobs:
- checkout
- rust_components
- cache_restore
- run:
name: Print rustc target CPU options
command: cargo run --release --no-default-features --features="aws,gcp,azure,jemalloc_replacing_malloc" --bin print_cpu
- run:
name: Cargo release build with target arch set for CRoaring
command: cargo build --release --no-default-features --features="aws,gcp,azure,jemalloc_replacing_malloc"
- run: |
echo sha256sum after build is
sha256sum target/release/influxdb_iox
- run:
name: Print rustc target CPU options
command: target/release/influxdb_iox debug print-cpu
- setup_remote_docker:
# There seems to be a cache invalidation bug in docker
# or in the way that circleci implements layer caching.

35
Cargo.lock generated
View File

@ -283,9 +283,9 @@ dependencies = [
[[package]]
name = "backtrace"
version = "0.3.61"
version = "0.3.62"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7a905d892734eea339e896738c14b9afce22b5318f64b951e70bf3844419b01"
checksum = "091bcdf2da9950f96aa522681ce805e6857f6ca8df73833d35736ab2dc78e152"
dependencies = [
"addr2line",
"cc",
@ -1429,9 +1429,9 @@ dependencies = [
[[package]]
name = "http-body"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
dependencies = [
"bytes",
"http",
@ -1458,9 +1458,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.13"
version = "0.14.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593"
checksum = "2b91bb1f221b6ea1f1e4371216b70f40748774c2fb5971b450c07773fb92d26b"
dependencies = [
"bytes",
"futures-channel",
@ -1635,7 +1635,6 @@ dependencies = [
"prost",
"query",
"rand",
"rdkafka",
"read_buffer",
"reqwest",
"rustyline",
@ -1931,9 +1930,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.104"
version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b2f96d100e1cf1929e7719b7edb3b90ab5298072638fccd77be9ce942ecdfce"
checksum = "869d572136620d55835903746bcb5cdc54cb2851fd0aeec53220b4bb65ef3013"
[[package]]
name = "libloading"
@ -2035,9 +2034,9 @@ checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"
[[package]]
name = "matchers"
version = "0.0.1"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
@ -2219,6 +2218,7 @@ dependencies = [
"bytes",
"criterion",
"flate2",
"hashbrown",
"influxdb_line_protocol",
"mutable_batch",
"schema",
@ -2504,9 +2504,9 @@ dependencies = [
[[package]]
name = "object"
version = "0.26.2"
version = "0.27.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39f37e50073ccad23b6d09bcb5b263f4e76d3bb6038e4a3c08e52162ffa8abc2"
checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9"
dependencies = [
"memchr",
]
@ -4518,9 +4518,9 @@ dependencies = [
[[package]]
name = "tower"
version = "0.4.9"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d15a6b60cdff0cb039d81d3b37f8bc3d7e53dca09069aae3ef2502ca4834fe30"
checksum = "c00e500fff5fa1131c866b246041a6bf96da9c965f8fe4128cb1421f23e93c00"
dependencies = [
"futures-core",
"futures-util",
@ -4658,12 +4658,11 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.2.25"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e0d2eaa99c3c2e41547cfa109e910a68ea03823cccad4a0525dcbc9b01e8c71"
checksum = "5cf865b5ddc38e503a29c41c4843e616a73028ae18c637bc3eb2afaef4909c84"
dependencies = [
"ansi_term 0.12.1",
"chrono",
"lazy_static",
"matchers",
"regex",

View File

@ -1,45 +1,17 @@
[package]
name = "influxdb_iox"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
default-run = "influxdb_iox"
readme = "README.md"
exclude = [
"*.md",
"*.txt",
".circleci/",
".editorconfig",
".git*",
".github/",
".kodiak.toml",
"Dockerfile*",
"LICENSE*",
"buf.yaml",
"docker/",
"docs/",
"massif.out.*",
"perf/",
"scripts/",
"tools/",
]
[[bin]]
name = "print_cpu"
path = "src/print_cpu.rs"
[workspace] # In alphabetical order
[workspace]
# In alphabetical order
members = [
"arrow_util",
"data_types",
"client_util",
"data_types",
"datafusion",
"datafusion_util",
"entry",
"generated_types",
"grpc-router",
"grpc-router-test-gen",
"influxdb2_client",
"influxdb_iox",
"influxdb_iox_client",
"influxdb_line_protocol",
"influxdb_storage_client",
@ -66,6 +38,7 @@ members = [
"query",
"query_tests",
"read_buffer",
"schema",
"server",
"server_benchmarks",
"test_helpers",
@ -75,131 +48,32 @@ members = [
"trace_http",
"tracker",
"trogging",
"schema",
"grpc-router",
"grpc-router/grpc-router-test-gen",
"write_buffer",
]
default-members = ["influxdb_iox"]
exclude = [
"*.md",
"*.txt",
".circleci/",
".editorconfig",
".git*",
".github/",
".kodiak.toml",
"Dockerfile*",
"LICENSE*",
"buf.yaml",
"docker/",
"docs/",
"massif.out.*",
"perf/",
"scripts/",
"test_fixtures/",
"tools/",
]
[profile.release]
debug = true
[profile.bench]
debug = true
[dependencies]
# Workspace dependencies, in alphabetical order
datafusion = { path = "datafusion" }
data_types = { path = "data_types" }
entry = { path = "entry" }
generated_types = { path = "generated_types" }
influxdb_iox_client = { path = "influxdb_iox_client", features = ["format"] }
influxdb_line_protocol = { path = "influxdb_line_protocol" }
internal_types = { path = "internal_types" }
iox_object_store = { path = "iox_object_store" }
logfmt = { path = "logfmt" }
metric = { path = "metric" }
metric_exporters = { path = "metric_exporters" }
mutable_buffer = { path = "mutable_buffer" }
num_cpus = "1.13.0"
object_store = { path = "object_store" }
observability_deps = { path = "observability_deps" }
panic_logging = { path = "panic_logging" }
parquet_catalog = { path = "parquet_catalog" }
parquet_file = { path = "parquet_file" }
predicate = { path = "predicate" }
query = { path = "query" }
read_buffer = { path = "read_buffer" }
server = { path = "server" }
trace = { path = "trace" }
trace_exporters = { path = "trace_exporters" }
trace_http = { path = "trace_http" }
tracker = { path = "tracker" }
trogging = { path = "trogging", default-features = false, features = ["structopt"] }
time = { path = "time" }
# Crates.io dependencies, in alphabetical order
arrow = { version = "6.0", features = ["prettyprint"] }
arrow-flight = "6.0"
backtrace = "0.3"
byteorder = "1.3.4"
bytes = "1.0"
chrono = "0.4"
clap = "2.33.1"
csv = "1.1"
dirs = "4.0.0"
dotenv = "0.15.0"
flate2 = "1.0"
futures = "0.3"
hashbrown = "0.11"
http = "0.2.0"
humantime = "2.1.0"
hyper = "0.14"
libc = { version = "0.2" }
log = "0.4"
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.2"
itertools = "0.10.1"
parquet = "6.0"
pin-project = "1.0"
# used by arrow/datafusion anyway
comfy-table = { version = "4.0", default-features = false }
pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"], optional = true }
prost = "0.8"
rustyline = { version = "9.0", default-features = false }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.67"
serde_urlencoded = "0.7.0"
snafu = "0.6.9"
structopt = "0.3.25"
thiserror = "1.0.30"
tikv-jemalloc-ctl = { version = "0.4.0" }
tokio = { version = "1.11", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] }
tokio-stream = { version = "0.1.2", features = ["net"] }
tokio-util = { version = "0.6.3" }
tonic = "0.5.0"
tonic-health = "0.4.0"
tonic-reflection = "0.2.0"
tower = "0.4"
uuid = { version = "0.8", features = ["v4"] }
# jemalloc-sys with unprefixed_malloc_on_supported_platforms feature and heappy are mutually exclusive
tikv-jemalloc-sys = { version = "0.4.0", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] }
heappy = { git = "https://github.com/mkmik/heappy", rev = "20aa466524ac9ce34a4bae29f27ec11869b50e21", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
[dev-dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "arrow_util" }
entry = { path = "entry" }
influxdb2_client = { path = "influxdb2_client" }
influxdb_storage_client = { path = "influxdb_storage_client" }
influxdb_iox_client = { path = "influxdb_iox_client", features = ["flight"] }
test_helpers = { path = "test_helpers" }
parking_lot = "0.11.2"
write_buffer = { path = "write_buffer" }
# Crates.io dependencies, in alphabetical order
assert_cmd = "2.0.2"
flate2 = "1.0"
hex = "0.4.2"
predicates = "2.0.3"
rand = "0.8.3"
rdkafka = "0.27.0"
reqwest = "0.11"
tempfile = "3.1.0"
[features]
default = ["jemalloc_replacing_malloc"]
azure = ["object_store/azure"] # Optional Azure Object store support
gcp = ["object_store/gcp"] # Optional GCP object store support
aws = ["object_store/aws"] # Optional AWS / S3 object store support
# pprof is an optional feature for pprof support
# heappy is an optional feature; Not on by default as it
# runtime overhead on all allocations (calls to malloc).
# Cargo cannot currently implement mutually exclusive features so let's force every build
# to pick either heappy or jemalloc_replacing_malloc feature at least until we figure out something better.
jemalloc_replacing_malloc = ["tikv-jemalloc-sys"]

View File

@ -2,7 +2,7 @@
name = "arrow_util"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "Apache Arrow utilities"
[dependencies]

View File

@ -29,6 +29,12 @@ impl BitSet {
bitset
}
/// Reserve space for `count` further bits
pub fn reserve(&mut self, count: usize) {
let new_buf_len = (self.len + count + 7) >> 3;
self.buffer.reserve(new_buf_len);
}
/// Appends `count` unset bits
pub fn append_unset(&mut self, count: usize) {
self.len += count;

View File

@ -3,7 +3,7 @@ name = "client_util"
version = "0.1.0"
authors = ["Raphael Taylor-Davies <r.taylordavies@googlemail.com>"]
description = "Shared code for IOx clients"
edition = "2018"
edition = "2021"
[dependencies]
http = "0.2.3"
@ -13,4 +13,4 @@ tonic = { version = "0.5.0" }
tower = "0.4"
[dev-dependencies]
tokio = { version = "1.11", features = ["macros"] }
tokio = { version = "1.11", features = ["macros"] }

View File

@ -3,7 +3,7 @@ name = "data_types"
version = "0.1.0"
authors = ["pauldix <paul@pauldix.net>"]
description = "InfluxDB IOx data_types, shared between IOx instances and IOx clients"
edition = "2018"
edition = "2021"
readme = "README.md"
[dependencies] # In alphabetical order

View File

@ -2,7 +2,7 @@
name = "datafusion"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "Re-exports datafusion at a specific version"
[dependencies]

View File

@ -2,7 +2,7 @@
name = "datafusion_util"
version = "0.1.0"
authors = ["Andrew Lamb <andrew@nerdnetworks.org>"]
edition = "2018"
edition = "2021"
description = "Datafusion utilities"
[dependencies]

View File

@ -21,4 +21,4 @@ ENV TEST_INTEGRATION=1
ENV KAFKA_CONNECT=kafka:9092
# Run the integration tests that connect to Kafka that will be running in another container
CMD ["sh", "-c", "./docker/integration_test.sh"]
CMD ["sh", "-c", "cargo test -p write_buffer kafka -- --nocapture"]

View File

@ -1,7 +0,0 @@
#!/bin/bash
set -euxo pipefail
cargo test -p write_buffer kafka -- --nocapture
cargo test -p influxdb_iox --test end_to_end skip_replay -- --nocapture
cargo test -p influxdb_iox --test end_to_end write_buffer -- --nocapture

View File

@ -136,7 +136,7 @@ You can then run the tests with `KAFKA_CONNECT=localhost:9093`. To run just the
tests, the full command would then be:
```
TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo test -p influxdb_iox --test end_to_end write_buffer
TEST_INTEGRATION=1 KAFKA_CONNECT=localhost:9093 cargo test -p write_buffer kafka --nocapture
```
### Running `cargo test` in a Docker container

View File

@ -2,7 +2,7 @@
name = "entry"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
description = "The entry format used by the write buffer"
[dependencies]

View File

@ -2,7 +2,7 @@
name = "generated_types"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
bytes = "1.0"

View File

@ -21,38 +21,44 @@ fn main() -> Result<()> {
/// - `com.github.influxdata.idpe.storage.read.rs`
/// - `influxdata.iox.catalog.v1.rs`
/// - `influxdata.iox.management.v1.rs`
/// - `influxdata.iox.router.v1.rs`
/// - `influxdata.iox.write.v1.rs`
/// - `influxdata.platform.storage.rs`
fn generate_grpc_types(root: &Path) -> Result<()> {
let storage_path = root.join("influxdata/platform/storage");
let idpe_path = root.join("com/github/influxdata/idpe/storage/read");
let catalog_path = root.join("influxdata/iox/catalog/v1");
let idpe_path = root.join("com/github/influxdata/idpe/storage/read");
let management_path = root.join("influxdata/iox/management/v1");
let router_path = root.join("influxdata/iox/router/v1");
let storage_path = root.join("influxdata/platform/storage");
let write_path = root.join("influxdata/iox/write/v1");
let proto_files = vec![
storage_path.join("test.proto"),
storage_path.join("predicate.proto"),
storage_path.join("storage_common.proto"),
storage_path.join("service.proto"),
storage_path.join("storage_common_idpe.proto"),
idpe_path.join("source.proto"),
catalog_path.join("catalog.proto"),
catalog_path.join("parquet_metadata.proto"),
catalog_path.join("predicate.proto"),
management_path.join("database_rules.proto"),
idpe_path.join("source.proto"),
management_path.join("chunk.proto"),
management_path.join("database_rules.proto"),
management_path.join("jobs.proto"),
management_path.join("partition.proto"),
management_path.join("partition_template.proto"),
management_path.join("server_config.proto"),
management_path.join("service.proto"),
management_path.join("shard.proto"),
management_path.join("jobs.proto"),
write_path.join("service.proto"),
root.join("influxdata/pbdata/v1/influxdb_pb_data_protocol.proto"),
root.join("grpc/health/v1/service.proto"),
management_path.join("write_buffer.proto"),
root.join("google/longrunning/operations.proto"),
root.join("google/rpc/error_details.proto"),
root.join("google/rpc/status.proto"),
root.join("grpc/health/v1/service.proto"),
root.join("influxdata/pbdata/v1/influxdb_pb_data_protocol.proto"),
router_path.join("router.proto"),
router_path.join("service.proto"),
storage_path.join("predicate.proto"),
storage_path.join("service.proto"),
storage_path.join("storage_common.proto"),
storage_path.join("storage_common_idpe.proto"),
storage_path.join("test.proto"),
write_path.join("service.proto"),
];
// Tell cargo to recompile if any of these proto files are changed

View File

@ -3,34 +3,9 @@ package influxdata.iox.management.v1;
option go_package = "github.com/influxdata/iox/management/v1";
import "google/protobuf/duration.proto";
import "google/protobuf/empty.proto";
import "influxdata/iox/management/v1/partition_template.proto";
import "influxdata/iox/management/v1/shard.proto";
// `PartitionTemplate` is used to compute the partition key of each row that
// gets written. It can consist of the table name, a column name and its value,
// a formatted time, or a string column and regex captures of its value. For
// columns that do not appear in the input row, a blank value is output.
//
// The key is constructed in order of the template parts; thus ordering changes
// what partition key is generated.
message PartitionTemplate {
message Part {
message ColumnFormat {
string column = 1;
string format = 2;
}
oneof part {
google.protobuf.Empty table = 1;
string column = 2;
string time = 3;
ColumnFormat regex = 4;
ColumnFormat strf_time = 5;
}
}
repeated Part parts = 1;
}
import "influxdata/iox/management/v1/write_buffer.proto";
message LifecycleRules {
// Once the total amount of buffered data in memory reaches this size start
@ -111,6 +86,9 @@ message LifecycleRules {
uint64 parquet_cache_limit = 17;
}
// Database rules.
//
// TODO(marco): add `WriteSources` to this message.
message DatabaseRules {
// The unencoded name of the database
//
@ -128,6 +106,8 @@ message DatabaseRules {
LifecycleRules lifecycle_rules = 3;
// If not specified, does not configure any routing
//
// TODO(marco): remove this
oneof routing_rules {
// Shard config
ShardConfig shard_config = 8;
@ -146,6 +126,8 @@ message DatabaseRules {
// Optionally, the connection for the write buffer for writing or reading/restoring data.
//
// If not specified, does not configure a write buffer
//
// TODO(marco): remove this
WriteBufferConnection write_buffer_connection = 13;
}
@ -158,61 +140,6 @@ message PersistedDatabaseRules {
DatabaseRules rules = 2;
}
// Configures the use of a write buffer.
message WriteBufferConnection {
enum Direction {
// Unspecified direction, will be treated as an error.
DIRECTION_UNSPECIFIED = 0;
// Writes into the buffer aka "producer".
DIRECTION_WRITE = 1;
// Reads from the buffer aka "consumer".
DIRECTION_READ = 2;
}
// If the buffer is used for reading or writing.
Direction direction = 1;
// Which type should be used (e.g. "kafka", "mock")
string type = 2;
// Connection string, depends on `type`.
string connection = 3;
// Old non-nested auto-creation config.
reserved 4, 5, 7;
// Special configs to be applied when establishing the connection.
//
// This depends on `type` and can configure aspects like timeouts.
map<string, string> connection_config = 6;
// Specifies if the sequencers (e.g. for Kafka in form of a topic w/ `n_sequencers` partitions) should be
// automatically created if they do not existing prior to reading or writing.
WriteBufferCreationConfig creation_config = 8;
}
// Configs sequencer auto-creation for write buffers.
//
// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/ `n_sequencers`
// partitions.
message WriteBufferCreationConfig {
// Number of sequencers.
//
// How they are implemented depends on `type`, e.g. for Kafka this is mapped to the number of partitions.
//
// If 0, a server-side default is used
uint32 n_sequencers = 1;
// Special configs to by applied when sequencers are created.
//
// This depends on `type` and can setup parameters like retention policy.
//
// Contains 0 or more key value pairs
map<string, string> options = 2;
}
message RoutingConfig {
Sink sink = 2;
}

View File

@ -0,0 +1,31 @@
syntax = "proto3";
package influxdata.iox.management.v1;
option go_package = "github.com/influxdata/iox/management/v1";
import "google/protobuf/empty.proto";
// `PartitionTemplate` is used to compute the partition key of each row that
// gets written. It can consist of the table name, a column name and its value,
// a formatted time, or a string column and regex captures of its value. For
// columns that do not appear in the input row, a blank value is output.
//
// The key is constructed in order of the template parts; thus ordering changes
// what partition key is generated.
message PartitionTemplate {
message Part {
message ColumnFormat {
string column = 1;
string format = 2;
}
oneof part {
google.protobuf.Empty table = 1;
string column = 2;
string time = 3;
ColumnFormat regex = 4;
ColumnFormat strf_time = 5;
}
}
repeated Part parts = 1;
}

View File

@ -30,10 +30,14 @@ message ShardConfig {
/// If set to true the router will ignore any errors sent by the remote
/// targets in this route. That is, the write request will succeed
/// regardless of this route's success.
///
/// TODO(marco): remove this
bool ignore_errors = 3;
/// Mapping between shard IDs and node groups. Other sharding rules use
/// ShardId as targets.
///
/// TODO(marco): remove this
map<uint32, Sink> shards = 4;
}

View File

@ -0,0 +1,58 @@
syntax = "proto3";
package influxdata.iox.management.v1;
option go_package = "github.com/influxdata/iox/management/v1";
// Configures the use of a write buffer.
message WriteBufferConnection {
enum Direction {
// Unspecified direction, will be treated as an error.
DIRECTION_UNSPECIFIED = 0;
// Writes into the buffer aka "producer".
DIRECTION_WRITE = 1;
// Reads from the buffer aka "consumer".
DIRECTION_READ = 2;
}
// If the buffer is used for reading or writing.
Direction direction = 1;
// Which type should be used (e.g. "kafka", "mock")
string type = 2;
// Connection string, depends on `type`.
string connection = 3;
// Old non-nested auto-creation config.
reserved 4, 5, 7;
// Special configs to be applied when establishing the connection.
//
// This depends on `type` and can configure aspects like timeouts.
map<string, string> connection_config = 6;
// Specifies if the sequencers (e.g. for Kafka in form of a topic w/ `n_sequencers` partitions) should be
// automatically created if they do not existing prior to reading or writing.
WriteBufferCreationConfig creation_config = 8;
}
// Configs sequencer auto-creation for write buffers.
//
// What that means depends on the used write buffer, e.g. for Kafka this will create a new topic w/ `n_sequencers`
// partitions.
message WriteBufferCreationConfig {
// Number of sequencers.
//
// How they are implemented depends on `type`, e.g. for Kafka this is mapped to the number of partitions.
//
// If 0, a server-side default is used
uint32 n_sequencers = 1;
// Special configs to by applied when sequencers are created.
//
// This depends on `type` and can setup parameters like retention policy.
//
// Contains 0 or more key value pairs
map<string, string> options = 2;
}

View File

@ -0,0 +1,148 @@
syntax = "proto3";
package influxdata.iox.router.v1;
option go_package = "github.com/influxdata/iox/router/v1";
import "influxdata/iox/management/v1/partition_template.proto";
import "influxdata/iox/management/v1/shard.proto";
import "influxdata/iox/management/v1/write_buffer.proto";
// Router for writes and queries.
//
// A router acts similar to a "real" database except that it does NOT store or manage any data by itself but forwards
// this responsiblity to other nodes (which then in turn provide an actual database or another routing layer).
//
// # Write Routing
//
// ## Overall Picture
// Data is accepted from all sources, is sharded, and is (according to the sharding) written into the sink sets. There
// may be a prioritization for sources that is "HTTP and gRPC first, and write buffers in declared order".
//
// ```text
// ( HTTP )--+ +------->( sink set 1 )
// | |
// ( gRPC )--+-->( sharder )--> ...
// | |
// ( Write Buffer 1 )--+ +------->( sink set n )
// ... |
// ( Write Buffer n )--+
// ```
//
// ## Sharder
// A sharder takes data and for every row/line:
//
// 1. Checks if a matcher matches the row, first matcher wins. If that's the case, the row/line is directly sent to the
// sink set.
// 2. If no matcher matches the row/line is handled by the hash ring.
//
// ```text
// --->[ matcher 1? ]-{no}---...--->[ matcher n? ]-{no}---+
// | | |
// {yes} {yes} |
// | | |
// V V |
// ( sink set 1 ) ( sink set n ) |
// ^ ^ |
// | | |
// +--------( hash ring )-------+ |
// ^ |
// | |
// +-----------------------------+
// ```
//
// ## Sink Set
// Data is written to all sinks in the set in implementation-defined order. Errors do NOT short-circuit. If an error
// occurs for at least one sink that has `ignore_errors = false`, an error is returned. An empty sink set acts as NULL
// sink and always succeeds.
//
// **IMPORTANT: Queries are NOT distributed! The are always only answered by a single node.**
//
// # Query Routing
// Queries always arrive via gRPC and are forwarded one sink. The specific sink is selected via an engine that might
// take the following features into account:
//
// - **freshness:** For each sink what are the lasted sequence numbers pulled from the write buffer.
// - **stickyness:** The same client should ideally reach the same sink in subsequent requests to improve caching.
// - **circuit breaking:** If a sink is unhealthy it should be excluded from the candidate list for a while.
//
// ```text
// ( gRPC )-->[ selection engine ]-->( sink 1 )
// | ...
// +---->( sink n )
// ```
message Router {
// Router name.
//
// The name is unique for this node.
string name = 1;
// Sources of write requests.
WriteSources write_sources = 2;
// Write sharder.
//
// NOTE: This only uses the `specific_targets` and `hash_ring` config of the sharder. The other fields are ignored.
//
// TODO(marco): remove the note above once the `ShardConfig` was cleaned up.
influxdata.iox.management.v1.ShardConfig write_sharder = 3;
// Sinks for write requests.
map<uint32, WriteSinkSet> write_sinks = 4;
// Sinks for query requests.
QuerySinks query_sinks = 5;
// Template that generates a partition key for each row inserted into the database.
//
// This is a temporary config until the partition is moved entirely into the database.
//
// If not specified, a server-side default is used
//
// TODO(marco): remove this
influxdata.iox.management.v1.PartitionTemplate partition_template = 6;
}
// Sources of write request aka new data.
//
// Data is accepted from these sources and a status is provided back to it.
message WriteSources {
// If set writes via gRPC and HTTP are accepted.
//
// You may want to disable this when incoming data should solely be received via write buffer(s).
bool allow_unsequenced_inputs = 2;
// Write buffer connections.
repeated influxdata.iox.management.v1.WriteBufferConnection write_buffers = 3;
}
// Sink of write requests aka new data.
//
// Data is sent to this sink and a status is received from it.
message WriteSink {
// Where the data goes.
oneof sink {
// gRPC-based remote, addressed by its server ID.
uint32 grpc_remote = 1;
// Write buffer connection.
influxdata.iox.management.v1.WriteBufferConnection write_buffer = 2;
}
// If set, errors during writing to this sink are ignored and do NOT lead to an overall failure.
bool ignore_errors = 3;
}
// Set of write sinks.
message WriteSinkSet {
// Sinks within the set.
repeated WriteSink sinks = 1;
}
// Sinks for query requests.
//
// Queries are sent to one of these sinks and the resulting data is received from it.
//
// Note that the query results are flowing into the opposite direction (aka a query sink is a result source).
message QuerySinks {
// gRPC-based remotes, addressed by their server IDs.
repeated uint32 grpc_remotes = 1;
}

View File

@ -0,0 +1,76 @@
syntax = "proto3";
package influxdata.iox.router.v1;
option go_package = "github.com/influxdata/iox/router/v1";
import "influxdata/iox/router/v1/router.proto";
service RouterService {
// List remote IOx servers we know about.
rpc ListRemotes(ListRemotesRequest) returns (ListRemotesResponse);
// Update information about a remote IOx server (upsert).
rpc UpdateRemote(UpdateRemoteRequest) returns (UpdateRemoteResponse);
// Delete a reference to remote IOx server.
rpc DeleteRemote(DeleteRemoteRequest) returns (DeleteRemoteResponse);
// List configured routers.
rpc ListRouter(ListRouterRequest) returns (ListRouterResponse);
// Update router config (upsert).
rpc UpdateRouter(UpdateRouterRequest) returns (UpdateRouterResponse);
// Delete router.
rpc DeleteRouter(DeleteRouterRequest) returns (DeleteRouterResponse);
}
message ListRemotesRequest {}
message ListRemotesResponse {
repeated Remote remotes = 1;
}
// This resource represents a remote IOx server.
message Remote {
// The server ID associated with a remote IOx server.
uint32 id = 1;
// The address of the remote IOx server gRPC endpoint.
string connection_string = 2;
}
// Updates information about a remote IOx server.
//
// If a remote for a given `id` already exists, it is updated in place.
message UpdateRemoteRequest {
// If omitted, the remote associated with `id` will be removed.
Remote remote = 1;
// TODO(#917): add an optional flag to test the connection or not before adding it.
}
message UpdateRemoteResponse {}
message ListRouterRequest {}
message ListRouterResponse {
repeated Router routers = 1;
}
message DeleteRemoteRequest{
uint32 id = 1;
}
message DeleteRemoteResponse {}
message UpdateRouterRequest {
Router router = 1;
}
message UpdateRouterResponse {}
message DeleteRouterRequest {
string router_name = 1;
}
message DeleteRouterResponse {}

View File

@ -2,7 +2,7 @@
name = "grpc-router-test-gen"
version = "0.1.0"
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
edition = "2018"
edition = "2021"
description = "Protobuf used in test for the grpc-router crate; need to be in a separate create because of linter limitations"
[dependencies]

View File

@ -2,7 +2,7 @@
name = "grpc-router"
version = "0.1.0"
authors = ["Marko Mikulicic <mkm@influxdata.com>"]
edition = "2018"
edition = "2021"
[dependencies]
bytes = "1.0"
@ -25,4 +25,4 @@ prost-build = "0.8"
tonic-build = "0.5"
[dev-dependencies]
grpc-router-test-gen = { path = "./grpc-router-test-gen" }
grpc-router-test-gen = { path = "../grpc-router-test-gen" }

View File

@ -2,7 +2,7 @@
name = "influxdb2_client"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2018"
edition = "2021"
[dependencies] # In alphabetical order
bytes = "1.0"

122
influxdb_iox/Cargo.toml Normal file
View File

@ -0,0 +1,122 @@
[package]
name = "influxdb_iox"
version = "0.1.0"
authors = ["Paul Dix <paul@pauldix.net>"]
edition = "2021"
default-run = "influxdb_iox"
[dependencies]
# Workspace dependencies, in alphabetical order
datafusion = { path = "../datafusion" }
data_types = { path = "../data_types" }
entry = { path = "../entry" }
generated_types = { path = "../generated_types" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight", "format"] }
influxdb_line_protocol = { path = "../influxdb_line_protocol" }
internal_types = { path = "../internal_types" }
iox_object_store = { path = "../iox_object_store" }
logfmt = { path = "../logfmt" }
metric = { path = "../metric" }
metric_exporters = { path = "../metric_exporters" }
mutable_buffer = { path = "../mutable_buffer" }
num_cpus = "1.13.0"
object_store = { path = "../object_store" }
observability_deps = { path = "../observability_deps" }
panic_logging = { path = "../panic_logging" }
parquet_catalog = { path = "../parquet_catalog" }
parquet_file = { path = "../parquet_file" }
predicate = { path = "../predicate" }
query = { path = "../query" }
read_buffer = { path = "../read_buffer" }
server = { path = "../server" }
trace = { path = "../trace" }
trace_exporters = { path = "../trace_exporters" }
trace_http = { path = "../trace_http" }
tracker = { path = "../tracker" }
trogging = { path = "../trogging", default-features = false, features = ["structopt"] }
time = { path = "../time" }
# Crates.io dependencies, in alphabetical order
arrow = { version = "6.0", features = ["prettyprint"] }
arrow-flight = "6.0"
backtrace = "0.3"
byteorder = "1.3.4"
bytes = "1.0"
chrono = "0.4"
clap = "2.33.1"
csv = "1.1"
dirs = "4.0.0"
dotenv = "0.15.0"
flate2 = "1.0"
futures = "0.3"
hashbrown = "0.11"
http = "0.2.0"
humantime = "2.1.0"
hyper = "0.14"
libc = { version = "0.2" }
log = "0.4"
once_cell = { version = "1.4.0", features = ["parking_lot"] }
parking_lot = "0.11.2"
itertools = "0.10.1"
parquet = "6.0"
pin-project = "1.0"
# used by arrow/datafusion anyway
comfy-table = { version = "4.0", default-features = false }
pprof = { version = "^0.5", default-features = false, features = ["flamegraph", "protobuf"], optional = true }
prost = "0.8"
rustyline = { version = "9.0", default-features = false }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.67"
serde_urlencoded = "0.7.0"
snafu = "0.6.9"
structopt = "0.3.25"
thiserror = "1.0.30"
tikv-jemalloc-ctl = { version = "0.4.0" }
tokio = { version = "1.11", features = ["macros", "rt-multi-thread", "parking_lot", "signal"] }
tokio-stream = { version = "0.1.2", features = ["net"] }
tokio-util = { version = "0.6.3" }
tonic = "0.5.0"
tonic-health = "0.4.0"
tonic-reflection = "0.2.0"
tower = "0.4"
uuid = { version = "0.8", features = ["v4"] }
# jemalloc-sys with unprefixed_malloc_on_supported_platforms feature and heappy are mutually exclusive
tikv-jemalloc-sys = { version = "0.4.0", optional = true, features = ["unprefixed_malloc_on_supported_platforms"] }
heappy = { git = "https://github.com/mkmik/heappy", rev = "20aa466524ac9ce34a4bae29f27ec11869b50e21", features = ["enable_heap_profiler", "jemalloc_shim", "measure_free"], optional = true }
[dev-dependencies]
# Workspace dependencies, in alphabetical order
arrow_util = { path = "../arrow_util" }
entry = { path = "../entry" }
influxdb2_client = { path = "../influxdb2_client" }
influxdb_storage_client = { path = "../influxdb_storage_client" }
influxdb_iox_client = { path = "../influxdb_iox_client", features = ["flight"] }
test_helpers = { path = "../test_helpers" }
parking_lot = "0.11.2"
write_buffer = { path = "../write_buffer" }
# Crates.io dependencies, in alphabetical order
assert_cmd = "2.0.2"
flate2 = "1.0"
hex = "0.4.2"
predicates = "2.0.3"
rand = "0.8.3"
reqwest = "0.11"
tempfile = "3.1.0"
[features]
default = ["jemalloc_replacing_malloc"]
azure = ["object_store/azure"] # Optional Azure Object store support
gcp = ["object_store/gcp"] # Optional GCP object store support
aws = ["object_store/aws"] # Optional AWS / S3 object store support
# pprof is an optional feature for pprof support
# heappy is an optional feature; Not on by default as it
# runtime overhead on all allocations (calls to malloc).
# Cargo cannot currently implement mutually exclusive features so let's force every build
# to pick either heappy or jemalloc_replacing_malloc feature at least until we figure out something better.
jemalloc_replacing_malloc = ["tikv-jemalloc-sys"]

View File

@ -5,13 +5,13 @@ use snafu::{OptionExt, ResultExt, Snafu};
use std::{convert::TryFrom, sync::Arc};
use structopt::StructOpt;
use crate::{object_store::ObjectStoreConfig, server_id::ServerIdConfig};
use crate::structopt_blocks::{object_store::ObjectStoreConfig, server_id::ServerIdConfig};
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Cannot parse object store config: {}", source))]
ObjectStoreParsing {
source: crate::object_store::ParseError,
source: crate::structopt_blocks::object_store::ParseError,
},
#[snafu(display("No server ID provided"))]
@ -38,22 +38,9 @@ pub enum Error {
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Interrogate internal database data
#[derive(Debug, StructOpt)]
pub struct Config {
#[structopt(subcommand)]
command: Command,
}
#[derive(Debug, StructOpt)]
enum Command {
/// Dump preserved catalog.
DumpCatalog(DumpCatalog),
}
/// Dump preserved catalog.
#[derive(Debug, StructOpt)]
struct DumpCatalog {
pub struct Config {
// object store config
#[structopt(flatten)]
object_store_config: ObjectStoreConfig,
@ -71,7 +58,7 @@ struct DumpCatalog {
}
#[derive(Debug, StructOpt)]
struct DumpOptions {
pub struct DumpOptions {
/// Show debug output of `DecodedIoxParquetMetaData` if decoding succeeds, show the decoding error otherwise.
///
/// Since this contains the entire Apache Parquet metadata object this is quite verbose and is usually not
@ -116,29 +103,21 @@ impl From<DumpOptions> for parquet_catalog::dump::DumpOptions {
}
pub async fn command(config: Config) -> Result<()> {
match config.command {
Command::DumpCatalog(dump_catalog) => {
let object_store = ObjectStore::try_from(&dump_catalog.object_store_config)
.context(ObjectStoreParsing)?;
let database_name =
DatabaseName::try_from(dump_catalog.db_name).context(InvalidDbName)?;
let server_id = dump_catalog
.server_id_config
.server_id
.context(NoServerId)?;
let iox_object_store =
IoxObjectStore::find_existing(Arc::new(object_store), server_id, &database_name)
.await
.context(IoxObjectStoreFailure)?
.context(NoIoxObjectStore)?;
let object_store =
ObjectStore::try_from(&config.object_store_config).context(ObjectStoreParsing)?;
let database_name = DatabaseName::try_from(config.db_name).context(InvalidDbName)?;
let server_id = config.server_id_config.server_id.context(NoServerId)?;
let iox_object_store =
IoxObjectStore::find_existing(Arc::new(object_store), server_id, &database_name)
.await
.context(IoxObjectStoreFailure)?
.context(NoIoxObjectStore)?;
let mut writer = std::io::stdout();
let options = dump_catalog.dump_options.into();
parquet_catalog::dump::dump(&iox_object_store, &mut writer, options)
.await
.context(DumpCatalogFailure)?;
}
}
let mut writer = std::io::stdout();
let options = config.dump_options.into();
parquet_catalog::dump::dump(&iox_object_store, &mut writer, options)
.await
.context(DumpCatalogFailure)?;
Ok(())
}

View File

@ -0,0 +1,41 @@
use snafu::{ResultExt, Snafu};
use structopt::StructOpt;
mod dump_catalog;
mod print_cpu;
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Error in dump-catalog subcommand: {}", source))]
DumpCatalogError { source: dump_catalog::Error },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
/// Interrogate internal database data
#[derive(Debug, StructOpt)]
pub struct Config {
#[structopt(subcommand)]
command: Command,
}
#[derive(Debug, StructOpt)]
enum Command {
/// Dump preserved catalog.
DumpCatalog(dump_catalog::Config),
/// Prints what CPU features are used by the compiler by default.
PrintCpu,
}
pub async fn command(config: Config) -> Result<()> {
match config.command {
Command::DumpCatalog(dump_catalog) => dump_catalog::command(dump_catalog)
.await
.context(DumpCatalogError),
Command::PrintCpu => {
print_cpu::main();
Ok(())
}
}
}

View File

@ -1,4 +1,3 @@
#![recursion_limit = "512"]
/// Prints what CPU features are used by the compiler by default.
///
/// Script from:
@ -29,7 +28,7 @@ macro_rules! print_if_feature_enabled {
}
}
fn main() {
pub fn main() {
println!("rustc is using the following target options");
print_if_feature_enabled!(

View File

@ -2,8 +2,7 @@
use crate::{
influxdb_ioxd::{self, serving_readiness::ServingReadinessState},
object_store::ObjectStoreConfig,
server_id::ServerIdConfig,
structopt_blocks::{object_store::ObjectStoreConfig, server_id::ServerIdConfig},
};
use std::{net::SocketAddr, net::ToSocketAddrs};
use structopt::StructOpt;

View File

@ -1,6 +1,6 @@
use crate::{
commands::run::Config,
object_store::{check_object_store, warn_about_inmem_store},
structopt_blocks::object_store::{check_object_store, warn_about_inmem_store},
};
use futures::{future::FusedFuture, pin_mut, FutureExt};
use hyper::server::conn::AddrIncoming;
@ -44,12 +44,12 @@ pub enum Error {
#[snafu(display("Cannot parse object store config: {}", source))]
ObjectStoreParsing {
source: crate::object_store::ParseError,
source: crate::structopt_blocks::object_store::ParseError,
},
#[snafu(display("Cannot check object store config: {}", source))]
ObjectStoreCheck {
source: crate::object_store::CheckError,
source: crate::structopt_blocks::object_store::CheckError,
},
#[snafu(display("Cannot create tracing pipeline: {}", source))]
@ -79,7 +79,10 @@ async fn wait_for_signal() {
let _ = tokio::signal::ctrl_c().await;
}
async fn make_application(config: &Config) -> Result<Arc<ApplicationState>> {
async fn make_application(
config: &Config,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<Arc<ApplicationState>> {
warn_about_inmem_store(&config.object_store_config);
let object_store =
ObjectStore::try_from(&config.object_store_config).context(ObjectStoreParsing)?;
@ -91,6 +94,7 @@ async fn make_application(config: &Config) -> Result<Arc<ApplicationState>> {
Ok(Arc::new(ApplicationState::new(
object_storage,
config.num_worker_threads,
trace_collector,
)))
}
@ -178,7 +182,11 @@ pub async fn main(config: Config) -> Result<()> {
let f = SendPanicsToTracing::new();
std::mem::forget(f);
let application = make_application(&config).await?;
let async_exporter = config.tracing_config.build().context(Tracing)?;
let trace_collector = async_exporter
.clone()
.map(|x| -> Arc<dyn TraceCollector> { x });
let application = make_application(&config, trace_collector).await?;
// Register jemalloc metrics
application
@ -189,17 +197,12 @@ pub async fn main(config: Config) -> Result<()> {
let grpc_listener = grpc_listener(config.grpc_bind_address).await?;
let http_listener = http_listener(config.http_bind_address).await?;
let async_exporter = config.tracing_config.build().context(Tracing)?;
let trace_collector = async_exporter
.clone()
.map(|x| -> Arc<dyn TraceCollector> { x });
let r = serve(
config,
application,
grpc_listener,
http_listener,
trace_collector,
app_server,
)
.await;
@ -241,7 +244,6 @@ async fn serve(
application: Arc<ApplicationState>,
grpc_listener: tokio::net::TcpListener,
http_listener: AddrIncoming,
trace_collector: Option<Arc<dyn TraceCollector>>,
app_server: Arc<AppServer<ConnectionManager>>,
) -> Result<()> {
// Construct a token to trigger shutdown of API services
@ -262,7 +264,6 @@ async fn serve(
Arc::clone(&application),
Arc::clone(&app_server),
trace_header_parser.clone(),
trace_collector.clone(),
frontend_shutdown.clone(),
config.initial_serving_state.into(),
)
@ -279,7 +280,6 @@ async fn serve(
frontend_shutdown.clone(),
max_http_request_size,
trace_header_parser,
trace_collector,
)
.fuse();
info!("HTTP server listening");
@ -381,7 +381,7 @@ mod tests {
use super::*;
use ::http::{header::HeaderName, HeaderValue};
use data_types::{database_rules::DatabaseRules, DatabaseName};
use influxdb_iox_client::connection::Connection;
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
use server::rules::ProvidedDatabaseRules;
use std::{convert::TryInto, num::NonZeroU64};
use structopt::StructOpt;
@ -412,16 +412,9 @@ mod tests {
let grpc_listener = grpc_listener(config.grpc_bind_address).await.unwrap();
let http_listener = http_listener(config.grpc_bind_address).await.unwrap();
serve(
config,
application,
grpc_listener,
http_listener,
None,
server,
)
.await
.unwrap()
serve(config, application, grpc_listener, http_listener, server)
.await
.unwrap()
}
#[tokio::test]
@ -430,7 +423,7 @@ mod tests {
// Create a server and wait for it to initialize
let config = test_config(Some(23));
let application = make_application(&config).await.unwrap();
let application = make_application(&config, None).await.unwrap();
let server = make_server(Arc::clone(&application), &config);
server.wait_for_init().await.unwrap();
@ -458,7 +451,7 @@ mod tests {
async fn test_server_shutdown_uninit() {
// Create a server but don't set a server id
let config = test_config(None);
let application = make_application(&config).await.unwrap();
let application = make_application(&config, None).await.unwrap();
let server = make_server(Arc::clone(&application), &config);
let serve_fut = test_serve(config, application, Arc::clone(&server)).fuse();
@ -489,7 +482,7 @@ mod tests {
async fn test_server_panic() {
// Create a server and wait for it to initialize
let config = test_config(Some(999999999));
let application = make_application(&config).await.unwrap();
let application = make_application(&config, None).await.unwrap();
let server = make_server(Arc::clone(&application), &config);
server.wait_for_init().await.unwrap();
@ -516,7 +509,7 @@ mod tests {
async fn test_database_panic() {
// Create a server and wait for it to initialize
let config = test_config(Some(23));
let application = make_application(&config).await.unwrap();
let application = make_application(&config, None).await.unwrap();
let server = make_server(Arc::clone(&application), &config);
server.wait_for_init().await.unwrap();
@ -597,7 +590,9 @@ mod tests {
JoinHandle<Result<()>>,
) {
let config = test_config(Some(23));
let application = make_application(&config).await.unwrap();
let application = make_application(&config, Some(Arc::<T>::clone(collector)))
.await
.unwrap();
let server = make_server(Arc::clone(&application), &config);
server.wait_for_init().await.unwrap();
@ -611,7 +606,6 @@ mod tests {
application,
grpc_listener,
http_listener,
Some(Arc::<T>::clone(collector)),
Arc::clone(&server),
);
@ -690,6 +684,11 @@ mod tests {
join.await.unwrap().unwrap();
}
/// Ensure that query is fully executed.
async fn consume_query(mut query: PerformQuery) {
while query.next().await.unwrap().is_some() {}
}
#[tokio::test]
async fn test_query_tracing() {
let collector = Arc::new(RingBufferTraceCollector::new(100));
@ -721,10 +720,13 @@ mod tests {
.unwrap();
let mut flight = influxdb_iox_client::flight::Client::new(conn.clone());
flight
.perform_query(db_info.db_name(), "select * from cpu;")
.await
.unwrap();
consume_query(
flight
.perform_query(db_info.db_name(), "select * from cpu;")
.await
.unwrap(),
)
.await;
flight
.perform_query("nonexistent", "select * from cpu;")

View File

@ -52,7 +52,6 @@ use std::{
};
use tokio_util::sync::CancellationToken;
use tower::Layer;
use trace::TraceCollector;
use trace_http::tower::TraceLayer;
/// Constants used in API error codes.
@ -865,12 +864,12 @@ pub async fn serve<M>(
shutdown: CancellationToken,
max_request_size: usize,
trace_header_parser: TraceHeaderParser,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> Result<(), hyper::Error>
where
M: ConnectionManager + Send + Sync + Debug + 'static,
{
let metric_registry = Arc::clone(application.metric_registry());
let trace_collector = application.trace_collector().clone();
let trace_layer = TraceLayer::new(trace_header_parser, metric_registry, trace_collector, false);
let lp_metrics = Arc::new(LineProtocolMetrics::new(
@ -924,6 +923,7 @@ mod tests {
Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
None,
))
}
@ -939,7 +939,7 @@ mod tests {
async fn test_health() {
let application = make_application();
let app_server = make_server(Arc::clone(&application));
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
let response = client.get(&format!("{}/health", server_url)).send().await;
@ -958,7 +958,7 @@ mod tests {
.register_metric("my_metric", "description");
let app_server = make_server(Arc::clone(&application));
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
metric.recorder(&[("tag", "value")]).inc(20);
@ -998,15 +998,15 @@ mod tests {
#[tokio::test]
async fn test_tracing() {
let application = make_application();
let app_server = make_server(Arc::clone(&application));
let trace_collector = Arc::new(RingBufferTraceCollector::new(5));
let server_url = test_server(
application,
Arc::clone(&app_server),
let application = Arc::new(ApplicationState::new(
Arc::new(ObjectStore::new_in_memory()),
None,
Some(Arc::<RingBufferTraceCollector>::clone(&trace_collector)),
);
));
let app_server = make_server(Arc::clone(&application));
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
let response = client
@ -1036,7 +1036,7 @@ mod tests {
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
@ -1083,7 +1083,7 @@ mod tests {
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
// Set up client
let client = Client::new();
@ -1209,7 +1209,7 @@ mod tests {
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
@ -1399,7 +1399,7 @@ mod tests {
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
let client = Client::new();
@ -1693,7 +1693,6 @@ mod tests {
fn test_server(
application: Arc<ApplicationState>,
server: Arc<AppServer<ConnectionManagerImpl>>,
trace_collector: Option<Arc<dyn TraceCollector>>,
) -> String {
// NB: specify port 0 to let the OS pick the port.
let bind_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
@ -1710,7 +1709,6 @@ mod tests {
CancellationToken::new(),
TEST_MAX_REQUEST_SIZE,
trace_header_parser,
trace_collector,
));
println!("Started server at {}", server_url);
server_url
@ -1734,7 +1732,7 @@ mod tests {
.create_database(make_rules("MyOrg_MyBucket"))
.await
.unwrap();
let server_url = test_server(application, Arc::clone(&app_server), None);
let server_url = test_server(application, Arc::clone(&app_server));
(app_server, server_url)
}

View File

@ -7,11 +7,7 @@ use query::{
exec::IOxExecutionContext,
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
group_by::{Aggregate, WindowDuration},
plan::{
fieldlist::FieldListPlan,
seriesset::SeriesSetPlans,
stringset::{StringSetPlan, TableNamePlanBuilder},
},
plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan},
QueryDatabase,
};
@ -54,7 +50,7 @@ impl Planner {
&self,
database: Arc<D>,
predicate: Predicate,
) -> Result<TableNamePlanBuilder>
) -> Result<StringSetPlan>
where
D: QueryDatabase + 'static,
{

View File

@ -11,7 +11,6 @@ use trace_http::ctx::TraceHeaderParser;
use crate::influxdb_ioxd::serving_readiness::ServingReadiness;
use server::{connection::ConnectionManager, ApplicationState, Server};
use trace::TraceCollector;
pub mod error;
mod flight;
@ -90,7 +89,6 @@ pub async fn serve<M>(
application: Arc<ApplicationState>,
server: Arc<Server<M>>,
trace_header_parser: TraceHeaderParser,
trace_collector: Option<Arc<dyn TraceCollector>>,
shutdown: CancellationToken,
serving_readiness: ServingReadiness,
) -> Result<()>
@ -109,7 +107,7 @@ where
let mut builder = builder.layer(trace_http::tower::TraceLayer::new(
trace_header_parser,
Arc::clone(application.metric_registry()),
trace_collector,
application.trace_collector().clone(),
true,
));

View File

@ -724,14 +724,14 @@ where
let db = db_store.db(db_name).context(DatabaseNotFound { db_name })?;
let ctx = db.new_query_context(span_ctx);
let builder = Planner::new(&ctx)
let plan = Planner::new(&ctx)
.table_names(db, predicate)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTables { db_name })?;
let table_names = ctx
.to_table_names(builder)
.to_string_set(plan)
.await
.map_err(|e| Box::new(e) as _)
.context(ListingTables { db_name })?;
@ -1116,11 +1116,11 @@ mod tests {
let chunk0 = TestChunk::new("h2o")
.with_id(0)
.with_predicate_match(PredicateMatch::AtLeastOne);
.with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
let chunk1 = TestChunk::new("o2")
.with_id(1)
.with_predicate_match(PredicateMatch::AtLeastOne);
.with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
fixture
.test_storage
@ -1474,7 +1474,8 @@ mod tests {
tag_key: [0].into(),
};
let chunk = TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOne);
let chunk =
TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
fixture
.test_storage
@ -1724,7 +1725,8 @@ mod tests {
// Note we don't include the actual line / column in the
// expected panic message to avoid needing to update the test
// whenever the source code file changed.
let expected_error = "panicked at 'This is a test panic', src/influxdb_ioxd/rpc/testing.rs";
let expected_error =
"panicked at 'This is a test panic', influxdb_iox/src/influxdb_ioxd/rpc/testing.rs";
assert_contains!(captured_logs, expected_error);
// Ensure that panics don't exhaust the tokio executor by

View File

@ -1,4 +1,5 @@
//! Entrypoint of InfluxDB IOx binary
#![recursion_limit = "512"] // required for print_cpu
#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
#![warn(
missing_debug_implementations,
@ -31,8 +32,7 @@ mod commands {
pub mod tracing;
}
mod object_store;
mod server_id;
mod structopt_blocks;
pub mod influxdb_ioxd;

View File

@ -0,0 +1,5 @@
//! Building blocks for [`structopt`]-driven configs.
//!
//! They can easily be re-used using `#[structopt(flatten)]`.
pub mod object_store;
pub mod server_id;

View File

@ -292,7 +292,7 @@ struct TestServer {
}
// Options for creating test servers
#[derive(Default, Debug)]
#[derive(Default, Debug, Clone)]
pub struct TestConfig {
/// Additional environment variables
env: Vec<(String, String)>,

View File

@ -27,3 +27,16 @@ async fn test_dump_catalog() {
predicate::str::contains("Transaction").and(predicate::str::contains("DecodeError")),
);
}
#[tokio::test]
async fn test_print_cpu() {
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("debug")
.arg("print-cpu")
.assert()
.success()
.stdout(predicate::str::contains(
"rustc is using the following target options",
));
}

View File

@ -1121,8 +1121,8 @@ async fn test_get_server_status_global_error() {
let server_fixture = ServerFixture::create_single_use().await;
let mut client = server_fixture.management_client();
// we need to "break" the object store AFTER the server was started, otherwise the server process will exit
// immediately
// we need to "break" the object store AFTER the server was started, otherwise the server
// process will exit immediately
let metadata = server_fixture.dir().metadata().unwrap();
let mut permissions = metadata.permissions();
permissions.set_mode(0o000);
@ -1137,7 +1137,8 @@ async fn test_get_server_status_global_error() {
loop {
let status = client.get_server_status().await.unwrap();
if let Some(err) = status.error {
assert!(dbg!(err.message).starts_with("error listing databases in object storage:"));
assert!(dbg!(err.message)
.starts_with("error getting server config from object storage:"));
assert!(status.database_statuses.is_empty());
return;
}
@ -1208,6 +1209,33 @@ async fn test_get_server_status_db_error() {
other_gen_path.push("rules.pb");
std::fs::write(other_gen_path, "foo").unwrap();
// create the server config listing the ownership of these three databases
let mut path = server_fixture.dir().to_path_buf();
path.push("42");
path.push("config.pb");
let data = ServerConfig {
databases: vec![
(String::from("my_db"), String::from("42/my_db")),
(
String::from("soft_deleted"),
String::from("42/soft_deleted"),
),
(
String::from("multiple_active"),
String::from("42/multiple_active"),
),
]
.into_iter()
.collect(),
};
let mut encoded = bytes::BytesMut::new();
generated_types::server_config::encode_persisted_server_config(&data, &mut encoded)
.expect("server config serialization should be valid");
let encoded = encoded.freeze();
std::fs::write(path, encoded).unwrap();
// initialize
client.update_server_id(42).await.expect("set ID failed");
server_fixture.wait_server_initialized().await;

View File

@ -8,8 +8,8 @@ use generated_types::google::longrunning::IoxOperation;
use generated_types::influxdata::iox::management::v1::{
operation_metadata::Job, WipePreservedCatalog,
};
use tempfile::TempDir;
use test_helpers::make_temp_file;
use write_buffer::maybe_skip_kafka_integration;
use crate::{
common::server_fixture::ServerFixture,
@ -910,9 +910,9 @@ async fn test_wipe_persisted_catalog_error_db_exists() {
#[tokio::test]
async fn test_skip_replay() {
let kafka_connection = maybe_skip_kafka_integration!();
let write_buffer_dir = TempDir::new().unwrap();
let db_name = rand_name();
let server_fixture = fixture_replay_broken(&db_name, &kafka_connection).await;
let server_fixture = fixture_replay_broken(&db_name, write_buffer_dir.path()).await;
let addr = server_fixture.grpc_base();
Command::cargo_bin("influxdb_iox")

View File

@ -1,6 +1,5 @@
use std::convert::TryFrom;
use std::iter::once;
use std::num::NonZeroU32;
use std::path::Path;
use std::time::Duration;
use std::{convert::TryInto, str, u32};
use std::{sync::Arc, time::SystemTime};
@ -31,9 +30,9 @@ use generated_types::{
ReadSource, TimestampRange,
};
use influxdb_iox_client::{connection::Connection, flight::PerformQuery};
use time::SystemProvider;
use write_buffer::core::WriteBufferWriting;
use write_buffer::kafka::test_utils::{kafka_sequencer_options, purge_kafka_topic};
use write_buffer::kafka::KafkaBufferProducer;
use write_buffer::file::FileBufferProducer;
use crate::common::server_fixture::{ServerFixture, TestConfig, DEFAULT_SERVER_ID};
@ -655,7 +654,7 @@ pub async fn fixture_broken_catalog(db_name: &str) -> ServerFixture {
}
/// Creates a database that cannot be replayed
pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> ServerFixture {
pub async fn fixture_replay_broken(db_name: &str, write_buffer_path: &Path) -> ServerFixture {
let server_id = DEFAULT_SERVER_ID;
let test_config = TestConfig::new().with_env("INFLUXDB_IOX_SKIP_REPLAY", "no");
@ -680,11 +679,11 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
name: db_name.to_string(),
write_buffer_connection: Some(WriteBufferConnection {
direction: write_buffer_connection::Direction::Read.into(),
r#type: "kafka".to_string(),
connection: kafka_connection.to_string(),
r#type: "file".to_string(),
connection: write_buffer_path.display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
options: kafka_sequencer_options(),
..Default::default()
}),
..Default::default()
}),
@ -708,45 +707,42 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
.unwrap();
// ingest data as mixed throughput
let creation_config = Some(data_types::database_rules::WriteBufferCreationConfig {
n_sequencers: NonZeroU32::try_from(1).unwrap(),
options: kafka_sequencer_options(),
});
let producer = KafkaBufferProducer::new(
kafka_connection,
let time_provider = Arc::new(SystemProvider::new());
let producer = FileBufferProducer::new(
write_buffer_path,
db_name,
&Default::default(),
creation_config.as_ref(),
Arc::new(time::SystemProvider::new()),
Default::default(),
time_provider,
)
.await
.unwrap();
producer
let sequencer_id = producer.sequencer_ids().into_iter().next().unwrap();
let (sequence_1, _) = producer
.store_entry(
&lp_to_entries("table_1,partition_by=a foo=1 10", &partition_template)
.pop()
.unwrap(),
0,
sequencer_id,
None,
)
.await
.unwrap();
producer
let (sequence_2, _) = producer
.store_entry(
&lp_to_entries("table_1,partition_by=b foo=2 20", &partition_template)
.pop()
.unwrap(),
0,
sequencer_id,
None,
)
.await
.unwrap();
producer
let (sequence_3, _) = producer
.store_entry(
&lp_to_entries("table_1,partition_by=b foo=3 30", &partition_template)
.pop()
.unwrap(),
0,
sequencer_id,
None,
)
.await
@ -766,8 +762,7 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
)
.await;
// purge data from Kafka
purge_kafka_topic(kafka_connection, db_name).await;
// add new entry to the end
producer
.store_entry(
&lp_to_entries("table_1,partition_by=c foo=4 40", &partition_template)
@ -779,6 +774,29 @@ pub async fn fixture_replay_broken(db_name: &str, kafka_connection: &str) -> Ser
.await
.unwrap();
// purge data from write buffer
write_buffer::file::test_utils::remove_entry(
write_buffer_path,
db_name,
sequencer_id,
sequence_1.number,
)
.await;
write_buffer::file::test_utils::remove_entry(
write_buffer_path,
db_name,
sequencer_id,
sequence_2.number,
)
.await;
write_buffer::file::test_utils::remove_entry(
write_buffer_path,
db_name,
sequencer_id,
sequence_3.number,
)
.await;
// Try to replay and error
let fixture = fixture.restart_server().await;

View File

@ -1,9 +1,13 @@
use crate::{
common::server_fixture::ServerFixture,
common::{
server_fixture::{ServerFixture, TestConfig},
udp_listener::UdpCapture,
},
end_to_end_cases::scenario::{rand_name, DatabaseBuilder},
};
use arrow_util::assert_batches_sorted_eq;
use entry::{test_helpers::lp_to_entry, Entry};
use entry::test_helpers::lp_to_entry;
use futures::StreamExt;
use generated_types::influxdata::iox::management::v1::{
write_buffer_connection::Direction as WriteBufferDirection, WriteBufferConnection,
};
@ -11,35 +15,35 @@ use influxdb_iox_client::{
management::{generated_types::WriteBufferCreationConfig, CreateDatabaseError},
write::WriteError,
};
use rdkafka::{
consumer::{Consumer, StreamConsumer},
producer::{FutureProducer, FutureRecord},
ClientConfig, Message, Offset, TopicPartitionList,
};
use std::convert::TryFrom;
use std::sync::Arc;
use tempfile::TempDir;
use test_helpers::assert_contains;
use write_buffer::{kafka::test_utils::kafka_sequencer_options, maybe_skip_kafka_integration};
use time::SystemProvider;
use write_buffer::{
core::{WriteBufferReading, WriteBufferWriting},
file::{FileBufferConsumer, FileBufferProducer},
};
#[tokio::test]
async fn writes_go_to_kafka() {
let kafka_connection = maybe_skip_kafka_integration!();
async fn writes_go_to_write_buffer() {
let write_buffer_dir = TempDir::new().unwrap();
// set up a database with a write buffer pointing at kafka
// set up a database with a write buffer pointing at write buffer
let server = ServerFixture::create_shared().await;
let db_name = rand_name();
let write_buffer_connection = WriteBufferConnection {
direction: WriteBufferDirection::Write.into(),
r#type: "kafka".to_string(),
connection: kafka_connection.to_string(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
options: kafka_sequencer_options(),
..Default::default()
}),
..Default::default()
};
DatabaseBuilder::new(db_name.clone())
.write_buffer(write_buffer_connection)
.write_buffer(write_buffer_connection.clone())
.build(server.grpc_channel())
.await;
@ -58,43 +62,32 @@ async fn writes_go_to_kafka() {
.expect("cannot write");
assert_eq!(num_lines_written, 3);
// check the data is in kafka
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", kafka_connection);
cfg.set("session.timeout.ms", "6000");
cfg.set("enable.auto.commit", "false");
cfg.set("group.id", "placeholder");
let consumer: StreamConsumer = cfg.create().unwrap();
let mut topics = TopicPartitionList::new();
topics.add_partition(&db_name, 0);
topics
.set_partition_offset(&db_name, 0, Offset::Beginning)
.unwrap();
consumer.assign(&topics).unwrap();
let message = consumer.recv().await.unwrap();
assert_eq!(message.topic(), db_name);
let entry = Entry::try_from(message.payload().unwrap().to_vec()).unwrap();
// check the data is in write buffer
let mut consumer =
FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None)
.await
.unwrap();
let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
let sequenced_entry = stream.stream.next().await.unwrap().unwrap();
let entry = sequenced_entry.entry();
let partition_writes = entry.partition_writes().unwrap();
assert_eq!(partition_writes.len(), 2);
}
#[tokio::test]
async fn writes_go_to_kafka_whitelist() {
let kafka_connection = maybe_skip_kafka_integration!();
async fn writes_go_to_write_buffer_whitelist() {
let write_buffer_dir = TempDir::new().unwrap();
// set up a database with a write buffer pointing at kafka
// set up a database with a write buffer pointing at write buffer
let server = ServerFixture::create_shared().await;
let db_name = rand_name();
let write_buffer_connection = WriteBufferConnection {
direction: WriteBufferDirection::Write.into(),
r#type: "kafka".to_string(),
connection: kafka_connection.to_string(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
options: kafka_sequencer_options(),
..Default::default()
}),
..Default::default()
};
@ -121,99 +114,74 @@ async fn writes_go_to_kafka_whitelist() {
.expect("cannot write");
assert_eq!(num_lines_written, 4);
// check the data is in kafka
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", kafka_connection);
cfg.set("session.timeout.ms", "6000");
cfg.set("enable.auto.commit", "false");
cfg.set("group.id", "placeholder");
let consumer: StreamConsumer = cfg.create().unwrap();
let mut topics = TopicPartitionList::new();
topics.add_partition(&db_name, 0);
topics
.set_partition_offset(&db_name, 0, Offset::Beginning)
.unwrap();
consumer.assign(&topics).unwrap();
let message = consumer.recv().await.unwrap();
assert_eq!(message.topic(), db_name);
let entry = Entry::try_from(message.payload().unwrap().to_vec()).unwrap();
// check the data is in write buffer
let mut consumer =
FileBufferConsumer::new(write_buffer_dir.path(), &db_name, Default::default(), None)
.await
.unwrap();
let (_, mut stream) = consumer.streams().into_iter().next().unwrap();
let sequenced_entry = stream.stream.next().await.unwrap().unwrap();
let entry = sequenced_entry.entry();
let partition_writes = entry.partition_writes().unwrap();
assert_eq!(partition_writes.len(), 1);
}
async fn produce_to_kafka_directly(
producer: &FutureProducer,
lp: &str,
topic: &str,
partition: Option<i32>,
) {
let entry = lp_to_entry(lp);
let mut record: FutureRecord<'_, String, _> = FutureRecord::to(topic).payload(entry.data());
if let Some(pid) = partition {
record = record.partition(pid);
}
producer
.send_result(record)
.unwrap()
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn reads_come_from_kafka() {
let kafka_connection = maybe_skip_kafka_integration!();
async fn reads_come_from_write_buffer() {
let write_buffer_dir = TempDir::new().unwrap();
// set up a database to read from Kafka
// set up a database to read from write buffer
let server = ServerFixture::create_shared().await;
let db_name = rand_name();
let write_buffer_connection = WriteBufferConnection {
direction: WriteBufferDirection::Read.into(),
r#type: "kafka".to_string(),
connection: kafka_connection.to_string(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 2,
options: kafka_sequencer_options(),
..Default::default()
}),
..Default::default()
};
// Common Kafka config
let mut cfg = ClientConfig::new();
cfg.set("bootstrap.servers", &kafka_connection);
cfg.set("message.timeout.ms", "5000");
DatabaseBuilder::new(db_name.clone())
.write_buffer(write_buffer_connection)
.build(server.grpc_channel())
.await;
// put some points in Kafka
let producer: FutureProducer = cfg.create().unwrap();
// put some points in write buffer
let time_provider = Arc::new(SystemProvider::new());
let producer = FileBufferProducer::new(
write_buffer_dir.path(),
&db_name,
Default::default(),
time_provider,
)
.await
.unwrap();
let mut sequencer_ids = producer.sequencer_ids().into_iter();
let sequencer_id_1 = sequencer_ids.next().unwrap();
let sequencer_id_2 = sequencer_ids.next().unwrap();
// Kafka partitions must be configured based on the primary key because ordering across Kafka
// partitions is undefined, so the upsert semantics would be undefined. Entries that can
// potentially be merged must end up in the same Kafka partition. This test follows that
// constraint, but doesn't actually encode it.
// Put some data for `upc,region=west` in partition 0
// Put some data for `upc,region=west` in sequencer 1
let lp_lines = [
"upc,region=west user=23.2 100",
"upc,region=west user=21.0 150",
];
produce_to_kafka_directly(&producer, &lp_lines.join("\n"), &db_name, Some(0)).await;
producer
.store_entry(&lp_to_entry(&lp_lines.join("\n")), sequencer_id_1, None)
.await
.unwrap();
// Put some data for `upc,region=east` in partition 1
// Put some data for `upc,region=east` in sequencer 2
let lp_lines = [
"upc,region=east user=76.2 300",
"upc,region=east user=88.7 350",
];
produce_to_kafka_directly(&producer, &lp_lines.join("\n"), &db_name, Some(1)).await;
producer
.store_entry(&lp_to_entry(&lp_lines.join("\n")), sequencer_id_2, None)
.await
.unwrap();
let check = async {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
@ -261,19 +229,19 @@ async fn reads_come_from_kafka() {
}
#[tokio::test]
async fn cant_write_to_db_reading_from_kafka() {
let kafka_connection = maybe_skip_kafka_integration!();
async fn cant_write_to_db_reading_from_write_buffer() {
let write_buffer_dir = TempDir::new().unwrap();
// set up a database to read from Kafka
// set up a database to read from write buffer
let server = ServerFixture::create_shared().await;
let db_name = rand_name();
let write_buffer_connection = WriteBufferConnection {
direction: WriteBufferDirection::Read.into(),
r#type: "kafka".to_string(),
connection: kafka_connection.to_string(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
options: kafka_sequencer_options(),
..Default::default()
}),
..Default::default()
};
@ -283,7 +251,7 @@ async fn cant_write_to_db_reading_from_kafka() {
.build(server.grpc_channel())
.await;
// Writing to this database is an error; all data comes from Kafka
// Writing to this database is an error; all data comes from write buffer
let mut write_client = server.write_client();
let err = write_client
.write(&db_name, "temp,region=south color=1")
@ -302,15 +270,15 @@ async fn cant_write_to_db_reading_from_kafka() {
#[tokio::test]
async fn test_create_database_missing_write_buffer_sequencers() {
let kafka_connection = maybe_skip_kafka_integration!();
let write_buffer_dir = TempDir::new().unwrap();
// set up a database to read from Kafka
// set up a database to read from write buffer
let server = ServerFixture::create_shared().await;
let db_name = rand_name();
let write_buffer_connection = WriteBufferConnection {
direction: WriteBufferDirection::Read.into(),
r#type: "kafka".to_string(),
connection: kafka_connection.to_string(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
..Default::default()
};
@ -325,3 +293,89 @@ async fn test_create_database_missing_write_buffer_sequencers() {
&err
);
}
#[tokio::test]
pub async fn test_cross_write_buffer_tracing() {
let write_buffer_dir = TempDir::new().unwrap();
// setup tracing
let udp_capture = UdpCapture::new().await;
let test_config = TestConfig::new()
.with_env("TRACES_EXPORTER", "jaeger")
.with_env("TRACES_EXPORTER_JAEGER_AGENT_HOST", udp_capture.ip())
.with_env("TRACES_EXPORTER_JAEGER_AGENT_PORT", udp_capture.port())
.with_client_header("jaeger-debug-id", "some-debug-id");
// we need to use two servers but the same DB name here because the write buffer topic is named after the DB name
let db_name = rand_name();
// create producer server
let server_write = ServerFixture::create_single_use_with_config(test_config.clone()).await;
server_write
.management_client()
.update_server_id(1)
.await
.unwrap();
server_write.wait_server_initialized().await;
let conn_write = WriteBufferConnection {
direction: WriteBufferDirection::Write.into(),
r#type: "file".to_string(),
connection: write_buffer_dir.path().display().to_string(),
creation_config: Some(WriteBufferCreationConfig {
n_sequencers: 1,
..Default::default()
}),
..Default::default()
};
DatabaseBuilder::new(db_name.clone())
.write_buffer(conn_write.clone())
.build(server_write.grpc_channel())
.await;
// create consumer DB
let server_read = ServerFixture::create_single_use_with_config(test_config).await;
server_read
.management_client()
.update_server_id(2)
.await
.unwrap();
server_read.wait_server_initialized().await;
let conn_read = WriteBufferConnection {
direction: WriteBufferDirection::Read.into(),
..conn_write
};
DatabaseBuilder::new(db_name.clone())
.write_buffer(conn_read)
.build(server_read.grpc_channel())
.await;
// write some points
let mut write_client = server_write.write_client();
let lp_lines = [
"cpu,region=west user=23.2 100",
"cpu,region=west user=21.0 150",
"disk,region=east bytes=99i 200",
];
let num_lines_written = write_client
.write(&db_name, lp_lines.join("\n"))
.await
.expect("cannot write");
assert_eq!(num_lines_written, 3);
// "shallow" packet inspection and verify the UDP server got
// something that had some expected results (maybe we could
// eventually verify the payload here too)
udp_capture
.wait_for(|m| m.to_string().contains("IOx write buffer"))
.await;
udp_capture
.wait_for(|m| m.to_string().contains("stored entry"))
.await;
// // debugging assistance
// tokio::time::sleep(std::time::Duration::from_secs(10)).await;
// println!("Traces received (1):\n\n{:#?}", udp_capture.messages());
// wait for the UDP server to shutdown
udp_capture.stop().await
}

View File

@ -2,11 +2,11 @@
name = "influxdb_iox_client"
version = "0.1.0"
authors = ["Dom Dwyer <dom@itsallbroken.com>"]
edition = "2018"
edition = "2021"
[features]
flight = ["arrow", "arrow-flight", "arrow_util", "serde/derive", "serde_json", "futures-util"]
format = ["arrow"]
format = ["arrow", "arrow_util"]
[dependencies]
# Workspace dependencies, in alphabetical order

Some files were not shown because too many files have changed in this diff Show More