feat: Make a top-level influxdb_iox feature for kafka functionality

Thread the feature through router and server to the write buffer crate.
Move an end-to-end test that uses Kafka behind the feature flag.
pull/24376/head
Carol (Nichols || Goulding) 2021-12-08 17:04:27 -05:00
parent 8c7b3966de
commit 471c3181bb
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
6 changed files with 48 additions and 31 deletions

View File

@ -124,3 +124,7 @@ jemalloc_replacing_malloc = ["tikv-jemalloc-sys"]
# Implicit feature selected when running under `clippy --all-features` to accept mutually exclusive features during
# linting
clippy = []
# Enable the write buffer implemented with Kafka. Disabled by default to save build time when not
# working on Kafka write buffer-related code.
kafka = ["router/kafka", "server/kafka"]

View File

@ -0,0 +1,34 @@
use crate::common::server_fixture::{ServerFixture, ServerType};
use influxdb_iox_client::management::generated_types::*;
use std::time::Instant;
use test_helpers::assert_contains;
/// End-to-end test: creating a database whose write buffer points at an
/// address that is not a Kafka broker must fail with a broker-connection
/// error, not a generic timeout.
#[tokio::test]
async fn test_create_database_invalid_kafka() {
    let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
    let mut client = server_fixture.management_client();

    let rules = DatabaseRules {
        name: "db_with_bad_kafka_address".into(),
        // Configure a Kafka-type write buffer pointing at a host that is
        // deliberately not a Kafka server.
        write_buffer_connection: Some(WriteBufferConnection {
            r#type: "kafka".into(),
            connection: "i_am_not_a_kafka_server:1234".into(),
            ..Default::default()
        }),
        ..Default::default()
    };

    let start = Instant::now();
    let err = client
        .create_database(rules)
        .await
        .expect_err("expected request to fail");

    // Timing is printed for diagnostics only; the assertion below is what
    // guards against a hang-until-timeout failure mode.
    println!("Failed after {:?}", Instant::now() - start);

    // expect that this error has a useful error related to kafka (not "timeout")
    assert_contains!(
        err.to_string(),
        "error creating write buffer: Meta data fetch error: BrokerTransportFailure"
    );
}

View File

@ -7,7 +7,6 @@ use influxdb_iox_client::{
generated_types::{database_status::DatabaseState, operation_metadata::Job, *},
Client,
},
router::generated_types::WriteBufferConnection,
};
use std::{fs::set_permissions, num::NonZeroU32, os::unix::fs::PermissionsExt};
use test_helpers::{assert_contains, assert_error};
@ -85,36 +84,6 @@ async fn test_create_database_invalid_name() {
}
}
#[tokio::test]
async fn test_create_database_invalid_kafka() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
let mut client = server_fixture.management_client();
let rules = DatabaseRules {
name: "db_with_bad_kafka_address".into(),
write_buffer_connection: Some(WriteBufferConnection {
r#type: "kafka".into(),
connection: "i_am_not_a_kafka_server:1234".into(),
..Default::default()
}),
..Default::default()
};
let start = Instant::now();
let err = client
.create_database(rules)
.await
.expect_err("expected request to fail");
println!("Failed after {:?}", Instant::now() - start);
// expect that this error has a useful error related to kafka (not "timeout")
assert_contains!(
err.to_string(),
"error creating write buffer: Meta data fetch error: BrokerTransportFailure"
);
}
#[tokio::test]
async fn test_list_databases() {
let server_fixture = ServerFixture::create_shared(ServerType::Database).await;

View File

@ -8,6 +8,10 @@ mod flight_api;
mod freeze;
mod http;
mod influxdb_ioxd;
// Kafka end-to-end tests compile only when the `kafka` feature is enabled,
// matching the feature gate threaded through router/server to write_buffer.
#[cfg(feature = "kafka")]
mod kafka;
mod management_api;
mod management_cli;
mod metrics;

View File

@ -25,3 +25,6 @@ workspace-hack = { path = "../workspace-hack"}
mutable_batch_lp = { path = "../mutable_batch_lp" }
regex = "1"
tokio = { version = "1.13", features = ["macros", "parking_lot"] }
[features]
# Enable the Kafka write buffer implementation. Disabled by default to save
# build time when not working on Kafka write buffer-related code.
kafka = ["write_buffer/kafka"]

View File

@ -65,5 +65,8 @@ test_helpers = { path = "../test_helpers" }
[features]
default = []
# Enable features for benchmarking
bench = ["mutable_buffer/nocache"]
# Enable the Kafka write buffer implementation. Disabled by default to save
# build time when not working on Kafka write buffer-related code.
kafka = ["write_buffer/kafka"]