Merge pull request #3347 from influxdata/cn/optional-kafka

feat: Optionally build Kafka

commit bef1024fbb
@@ -185,7 +185,7 @@ jobs:
       - cache_restore
       - run:
           name: Cargo test
-          command: cargo test --workspace
+          command: cargo test --workspace --features=kafka
       - cache_save

   # end to end tests with Heappy (heap profiling enabled)
@@ -240,7 +240,7 @@ jobs:
             - cargo-lock-{{ checksum "Cargo.lock" }}
       - run:
           name: Prime Rust build cache
-          command: cargo build --package influxdb_iox --bin influxdb_iox --package iox_data_generator --bin iox_data_generator
+          command: cargo build --package influxdb_iox --bin influxdb_iox --package iox_data_generator --bin iox_data_generator --features=kafka
       - save_cache:
           key: cargo-lock-{{ checksum "Cargo.lock" }}
           paths:
@@ -277,8 +277,8 @@ jobs:
           name: Build benches
           command: cargo test --workspace --benches --no-run
       - run:
-          name: Build with object store + exporter support + HEAP profiling
-          command: cargo build --no-default-features --features="aws,gcp,azure,heappy,pprof"
+          name: Build with object store + exporter support + HEAP profiling + kafka
+          command: cargo build --no-default-features --features="aws,gcp,azure,heappy,pprof,kafka"
       - cache_save

   # Lint protobufs.
@@ -15,7 +15,7 @@ WORKDIR /influxdb_iox

 ARG CARGO_INCREMENTAL=yes
 ARG PROFILE=release
-ARG FEATURES=aws,gcp,azure,jemalloc_replacing_malloc
+ARG FEATURES=aws,gcp,azure,jemalloc_replacing_malloc,kafka
 ARG ROARING_ARCH="haswell"
 ARG RUSTFLAGS=""
 ENV CARGO_INCREMENTAL=$CARGO_INCREMENTAL \
@@ -124,3 +124,7 @@ jemalloc_replacing_malloc = ["tikv-jemalloc-sys"]
 # Implicit feature selected when running under `clippy --all-features` to accept mutable exclusive features during
 # linting
 clippy = []
+
+# Enable the write buffer implemented with Kafka. Disabled by default to save build time when not
+# working on Kafka write buffer-related code.
+kafka = ["router/kafka", "server/kafka"]
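The new `kafka` feature only forwards to the `router` and `server` crates, so a default build compiles none of the Kafka write buffer code; the CI, Docker, and build-script changes in this diff opt back in explicitly. A minimal sketch of the cfg-gating idiom this relies on, using a hypothetical `broker_hint` function rather than anything from this change:

    // Compiled only when the crate is built with `--features=kafka`.
    #[cfg(feature = "kafka")]
    fn broker_hint() -> &'static str {
        "kafka write buffer available"
    }

    // Stand-in used by builds without the feature, so callers still compile.
    #[cfg(not(feature = "kafka"))]
    fn broker_hint() -> &'static str {
        "kafka support not compiled in"
    }

    fn main() {
        println!("{}", broker_hint());
    }

With the feature declared in Cargo.toml, `cargo build` picks the second body and `cargo build --features=kafka` picks the first.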
@@ -0,0 +1,34 @@
+use crate::common::server_fixture::{ServerFixture, ServerType};
+use influxdb_iox_client::management::generated_types::*;
+use std::time::Instant;
+use test_helpers::assert_contains;
+
+#[tokio::test]
+async fn test_create_database_invalid_kafka() {
+    let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
+    let mut client = server_fixture.management_client();
+
+    let rules = DatabaseRules {
+        name: "db_with_bad_kafka_address".into(),
+        write_buffer_connection: Some(WriteBufferConnection {
+            r#type: "kafka".into(),
+            connection: "i_am_not_a_kafka_server:1234".into(),
+            ..Default::default()
+        }),
+        ..Default::default()
+    };
+
+    let start = Instant::now();
+    let err = client
+        .create_database(rules)
+        .await
+        .expect_err("expected request to fail");
+
+    println!("Failed after {:?}", Instant::now() - start);
+
+    // expect that this error has a useful error related to kafka (not "timeout")
+    assert_contains!(
+        err.to_string(),
+        "error creating write buffer: Meta data fetch error: BrokerTransportFailure"
+    );
+}
@@ -1,3 +1,10 @@
+use crate::{
+    common::server_fixture::{ServerFixture, ServerType, TestConfig, DEFAULT_SERVER_ID},
+    end_to_end_cases::scenario::{
+        create_readable_database, create_two_partition_database, create_unreadable_database,
+        fixture_broken_catalog, rand_name, wait_for_exact_chunk_states, DatabaseBuilder,
+    },
+};
 use data_types::chunk_metadata::ChunkId;
 use generated_types::google::protobuf::{Duration, Empty};
 use influxdb_iox_client::{
@@ -7,22 +14,9 @@ use influxdb_iox_client::{
         generated_types::{database_status::DatabaseState, operation_metadata::Job, *},
         Client,
     },
-    router::generated_types::WriteBufferConnection,
 };
-use std::{fs::set_permissions, num::NonZeroU32, os::unix::fs::PermissionsExt};
+use std::{fs::set_permissions, num::NonZeroU32, os::unix::fs::PermissionsExt, time::Instant};
 use test_helpers::{assert_contains, assert_error};
-
-use super::scenario::{
-    create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
-};
-use crate::common::server_fixture::{TestConfig, DEFAULT_SERVER_ID};
-use crate::{
-    common::server_fixture::{ServerFixture, ServerType},
-    end_to_end_cases::scenario::{
-        fixture_broken_catalog, wait_for_exact_chunk_states, DatabaseBuilder,
-    },
-};
-use std::time::Instant;
 use uuid::Uuid;

 #[tokio::test]
@@ -85,36 +79,6 @@ async fn test_create_database_invalid_name() {
     }
 }
-
-#[tokio::test]
-async fn test_create_database_invalid_kafka() {
-    let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
-    let mut client = server_fixture.management_client();
-
-    let rules = DatabaseRules {
-        name: "db_with_bad_kafka_address".into(),
-        write_buffer_connection: Some(WriteBufferConnection {
-            r#type: "kafka".into(),
-            connection: "i_am_not_a_kafka_server:1234".into(),
-            ..Default::default()
-        }),
-        ..Default::default()
-    };
-
-    let start = Instant::now();
-    let err = client
-        .create_database(rules)
-        .await
-        .expect_err("expected request to fail");
-
-    println!("Failed after {:?}", Instant::now() - start);
-
-    // expect that this error has a useful error related to kafka (not "timeout")
-    assert_contains!(
-        err.to_string(),
-        "error creating write buffer: Meta data fetch error: BrokerTransportFailure"
-    );
-}

 #[tokio::test]
 async fn test_list_databases() {
     let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
@@ -8,6 +8,10 @@ mod flight_api;
 mod freeze;
 mod http;
 mod influxdb_ioxd;
+
+#[cfg(feature = "kafka")]
+mod kafka;
+
 mod management_api;
 mod management_cli;
 mod metrics;
@@ -397,7 +397,7 @@ def cargo_build_iox(debug=False, build_with_aws=True):
     t = time.time()
     print('building IOx')

-    features = []
+    features = ['kafka']
     if build_with_aws:
         features.append('aws')
     features = ','.join(features)
@@ -25,3 +25,6 @@ workspace-hack = { path = "../workspace-hack"}
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 regex = "1"
 tokio = { version = "1.13", features = ["macros", "parking_lot"] }
+
+[features]
+kafka = ["write_buffer/kafka"]
@@ -65,5 +65,8 @@ test_helpers = { path = "../test_helpers" }

 [features]
 default = []

 # Enable features for benchmarking
 bench = ["mutable_buffer/nocache"]
+
+kafka = ["write_buffer/kafka"]
@@ -19,7 +19,7 @@ observability_deps = { path = "../observability_deps" }
 parking_lot = "0.11.2"
 pin-project = "1.0"
 prost = "0.8"
-rdkafka = "0.28.0"
+rdkafka = { version = "0.28.0", optional = true }
 time = { path = "../time" }
 tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
 tokio-util = "0.6.9"
@@ -28,6 +28,9 @@ trace_http = { path = "../trace_http" }
 uuid = { version = "0.8", features = ["v4"] }
 workspace-hack = { path = "../workspace-hack"}

+[features]
+kafka = ["rdkafka"]
+
 [dev-dependencies]
 tempfile = "3.1.0"
@@ -100,11 +100,13 @@ impl IoxHeaders {
     }

     /// Gets the content type
+    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
     pub fn content_type(&self) -> ContentType {
         self.content_type
     }

     /// Gets the span context if any
+    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
     pub fn span_context(&self) -> Option<&SpanContext> {
         self.span_context.as_ref()
     }
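These accessors are still compiled in every build, but their only callers now sit behind `#[cfg(feature = "kafka")]`, so a build without the feature would otherwise warn that they are never used; the `#[allow(dead_code)]` attributes silence exactly that warning. A stand-alone sketch of the situation, with a hypothetical `Headers` type rather than the real `IoxHeaders`:

    struct Headers {
        content_type: String,
    }

    impl Headers {
        // Only called from feature-gated code; without the allow, a build
        // that skips the `kafka` feature reports this method as dead code.
        #[allow(dead_code)]
        fn content_type(&self) -> &str {
            &self.content_type
        }
    }

    // The only caller is compiled solely when the feature is enabled.
    #[cfg(feature = "kafka")]
    fn log_content_type(headers: &Headers) {
        println!("{}", headers.content_type());
    }

    fn main() {
        let _headers = Headers { content_type: "application/json".to_string() };
    }

The same reasoning applies to the `metric_registry` field that gets the same attribute further down in the write buffer config changes.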
@@ -1,24 +1,21 @@
+use crate::{
+    core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
+    file::{FileBufferConsumer, FileBufferProducer},
+    mock::{
+        MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
+        MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
+    },
+};
+use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection};
 use parking_lot::RwLock;
 use std::{
     collections::{btree_map::Entry, BTreeMap},
     path::PathBuf,
     sync::Arc,
 };
-
-use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection};
 use time::TimeProvider;
 use trace::TraceCollector;
-
-use crate::{
-    core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
-    file::{FileBufferConsumer, FileBufferProducer},
-    kafka::{KafkaBufferConsumer, KafkaBufferProducer},
-    mock::{
-        MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
-        MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
-    },
-};

 #[derive(Debug)]
 pub enum WriteBufferConfig {
     Writing(Arc<dyn WriteBufferWriting>),
@@ -37,6 +34,7 @@ enum Mock {
 pub struct WriteBufferConfigFactory {
     mocks: RwLock<BTreeMap<String, Mock>>,
     time_provider: Arc<dyn TimeProvider>,
+    #[allow(dead_code)] // this field is only used in optionally-compiled kafka code
     metric_registry: Arc<metric::Registry>,
 }

@@ -108,18 +106,7 @@ impl WriteBufferConfigFactory {
                 .await?;
                 Arc::new(file_buffer) as _
             }
-            "kafka" => {
-                let kafka_buffer = KafkaBufferProducer::new(
-                    &cfg.connection,
-                    db_name,
-                    &cfg.connection_config,
-                    cfg.creation_config.as_ref(),
-                    Arc::clone(&self.time_provider),
-                    &self.metric_registry,
-                )
-                .await?;
-                Arc::new(kafka_buffer) as _
-            }
+            "kafka" => self.kafka_buffer_producer(db_name, cfg).await?,
             "mock" => match self.get_mock(&cfg.connection)? {
                 Mock::Normal(state) => {
                     let mock_buffer = MockBufferForWriting::new(
@@ -142,6 +129,38 @@ impl WriteBufferConfigFactory {
         Ok(writer)
     }

+    #[cfg(feature = "kafka")]
+    async fn kafka_buffer_producer(
+        &self,
+        db_name: &str,
+        cfg: &WriteBufferConnection,
+    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
+        let kafka_buffer = crate::kafka::KafkaBufferProducer::new(
+            &cfg.connection,
+            db_name,
+            &cfg.connection_config,
+            cfg.creation_config.as_ref(),
+            Arc::clone(&self.time_provider),
+            &self.metric_registry,
+        )
+        .await?;
+
+        Ok(Arc::new(kafka_buffer) as _)
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    async fn kafka_buffer_producer(
+        &self,
+        _db_name: &str,
+        _cfg: &WriteBufferConnection,
+    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
+        Err(String::from(
+            "`WriteBufferWriting` of type `kafka` requested, but Kafka support was not included \
+             in this build by enabling the `kafka` feature",
+        )
+        .into())
+    }
+
     /// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
     pub async fn new_config_read(
         &self,
@@ -163,17 +182,8 @@ impl WriteBufferConfigFactory {
                 Box::new(file_buffer) as _
             }
             "kafka" => {
-                let kafka_buffer = KafkaBufferConsumer::new(
-                    &cfg.connection,
-                    server_id,
-                    db_name,
-                    &cfg.connection_config,
-                    cfg.creation_config.as_ref(),
-                    trace_collector,
-                    &self.metric_registry,
-                )
-                .await?;
-                Box::new(kafka_buffer) as _
+                self.kafka_buffer_consumer(server_id, db_name, trace_collector, cfg)
+                    .await?
             }
             "mock" => match self.get_mock(&cfg.connection)? {
                 Mock::Normal(state) => {
@@ -193,21 +203,52 @@ impl WriteBufferConfigFactory {

         Ok(reader)
     }
+
+    #[cfg(feature = "kafka")]
+    async fn kafka_buffer_consumer(
+        &self,
+        server_id: ServerId,
+        db_name: &str,
+        trace_collector: Option<&Arc<dyn TraceCollector>>,
+        cfg: &WriteBufferConnection,
+    ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
+        let kafka_buffer = crate::kafka::KafkaBufferConsumer::new(
+            &cfg.connection,
+            server_id,
+            db_name,
+            &cfg.connection_config,
+            cfg.creation_config.as_ref(),
+            trace_collector,
+            &self.metric_registry,
+        )
+        .await?;
+
+        Ok(Box::new(kafka_buffer) as _)
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    async fn kafka_buffer_consumer(
+        &self,
+        _server_id: ServerId,
+        _db_name: &str,
+        _trace_collector: Option<&Arc<dyn TraceCollector>>,
+        _cfg: &WriteBufferConnection,
+    ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
+        Err(String::from(
+            "`WriteBufferReading` of type `kafka` requested, but Kafka support was not included \
+             in this build by enabling the `kafka` feature",
+        )
+        .into())
+    }
 }

 #[cfg(test)]
 mod tests {
-    use std::{convert::TryFrom, num::NonZeroU32};
-
-    use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
-    use tempfile::TempDir;
-
-    use crate::{
-        kafka::test_utils::random_kafka_topic, maybe_skip_kafka_integration,
-        mock::MockBufferSharedState,
-    };
-
     use super::*;
+    use crate::{core::test_utils::random_topic_name, mock::MockBufferSharedState};
+    use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
+    use std::{convert::TryFrom, num::NonZeroU32};
+    use tempfile::TempDir;

     #[tokio::test]
     async fn test_writing_file() {
@@ -248,46 +289,6 @@ mod tests {
         assert_eq!(conn.type_name(), "file");
     }
-
-    #[tokio::test]
-    async fn test_writing_kafka() {
-        let conn = maybe_skip_kafka_integration!();
-        let factory = factory();
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
-        let cfg = WriteBufferConnection {
-            type_: "kafka".to_string(),
-            connection: conn,
-            creation_config: Some(WriteBufferCreationConfig::default()),
-            ..Default::default()
-        };
-
-        let conn = factory
-            .new_config_write(db_name.as_str(), &cfg)
-            .await
-            .unwrap();
-        assert_eq!(conn.type_name(), "kafka");
-    }
-
-    #[tokio::test]
-    async fn test_reading_kafka() {
-        let conn = maybe_skip_kafka_integration!();
-        let factory = factory();
-        let server_id = ServerId::try_from(1).unwrap();
-
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
-        let cfg = WriteBufferConnection {
-            type_: "kafka".to_string(),
-            connection: conn,
-            creation_config: Some(WriteBufferCreationConfig::default()),
-            ..Default::default()
-        };
-
-        let conn = factory
-            .new_config_read(server_id, db_name.as_str(), None, &cfg)
-            .await
-            .unwrap();
-        assert_eq!(conn.type_name(), "kafka");
-    }

     #[tokio::test]
     async fn test_writing_mock() {
         let factory = factory();
@@ -297,7 +298,7 @@ mod tests {
         let mock_name = "some_mock";
         factory.register_mock(mock_name.to_string(), state);

-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -333,7 +334,7 @@ mod tests {
         factory.register_mock(mock_name.to_string(), state);

         let server_id = ServerId::try_from(1).unwrap();
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -366,7 +367,7 @@ mod tests {
         let mock_name = "some_mock";
         factory.register_always_fail_mock(mock_name.to_string());

-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -444,4 +445,99 @@ mod tests {
         let registry = Arc::new(metric::Registry::new());
         WriteBufferConfigFactory::new(time, registry)
     }
+
+    #[cfg(feature = "kafka")]
+    mod kafka {
+        use super::*;
+        use crate::maybe_skip_kafka_integration;
+
+        #[tokio::test]
+        async fn test_writing_kafka() {
+            let conn = maybe_skip_kafka_integration!();
+            let factory = factory();
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                connection: conn,
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let conn = factory
+                .new_config_write(db_name.as_str(), &cfg)
+                .await
+                .unwrap();
+            assert_eq!(conn.type_name(), "kafka");
+        }
+
+        #[tokio::test]
+        async fn test_reading_kafka() {
+            let conn = maybe_skip_kafka_integration!();
+            let factory = factory();
+            let server_id = ServerId::try_from(1).unwrap();
+
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                connection: conn,
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let conn = factory
+                .new_config_read(server_id, db_name.as_str(), None, &cfg)
+                .await
+                .unwrap();
+            assert_eq!(conn.type_name(), "kafka");
+        }
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    mod no_kafka {
+        use super::*;
+
+        #[tokio::test]
+        async fn writing_to_kafka_without_kafka_feature_returns_error() {
+            let factory = factory();
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let err = factory
+                .new_config_write(db_name.as_str(), &cfg)
+                .await
+                .unwrap_err();
+            assert_eq!(
+                err.to_string(),
+                "`WriteBufferWriting` of type `kafka` requested, but Kafka support was not \
+                 included in this build by enabling the `kafka` feature"
+            );
+        }
+
+        #[tokio::test]
+        async fn reading_from_kafka_without_kafka_feature_returns_error() {
+            let factory = factory();
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let server_id = ServerId::try_from(1).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let err = factory
+                .new_config_read(server_id, db_name.as_str(), None, &cfg)
+                .await
+                .unwrap_err();
+
+            assert_eq!(
+                err.to_string(),
+                "`WriteBufferReading` of type `kafka` requested, but Kafka support was not \
+                 included in this build by enabling the `kafka` feature"
+            );
+        }
+    }
 }
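Taken together, the `#[cfg(not(feature = "kafka"))]` stubs keep the factory compiling in a Kafka-less build while turning a request for a `kafka` write buffer into a plain runtime error, which is exactly what the new `no_kafka` tests assert. The `String::from(..).into()` conversion in those stubs relies on boxed error trait objects implementing `From<String>`; a stand-alone sketch of that idiom, assuming `WriteBufferError` is a boxed error alias (its definition is not shown in this diff):

    use std::error::Error;

    // Assumed alias; the real `WriteBufferError` is not shown here, but a boxed
    // error trait object is what makes `.into()` on a String compile.
    type WriteBufferError = Box<dyn Error + Send + Sync>;

    fn kafka_disabled() -> Result<(), WriteBufferError> {
        Err(String::from("Kafka support was not included in this build").into())
    }

    fn main() {
        let err = kafka_disabled().unwrap_err();
        println!("{}", err);
    }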
@@ -99,6 +99,10 @@ pub trait WriteBufferReading: Sync + Send + Debug + 'static {

 pub mod test_utils {
     //! Generic tests for all write buffer implementations.
+    use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
+    use async_trait::async_trait;
+    use dml::{test_util::assert_write_op_eq, DmlMeta, DmlOperation, DmlWrite};
+    use futures::{StreamExt, TryStreamExt};
     use std::{
         collections::{BTreeMap, BTreeSet},
         convert::TryFrom,
@@ -106,14 +110,14 @@ pub mod test_utils {
         sync::Arc,
         time::Duration,
     };
-
-    use async_trait::async_trait;
-    use dml::{test_util::assert_write_op_eq, DmlMeta, DmlOperation, DmlWrite};
-    use futures::{StreamExt, TryStreamExt};
     use time::{Time, TimeProvider};
     use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
+    use uuid::Uuid;

-    use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
+    /// Generated random topic name for testing.
+    pub fn random_topic_name() -> String {
+        format!("test_topic_{}", Uuid::new_v4())
+    }

     /// Adapter to make a concrete write buffer implementation work w/ [`perform_generic_tests`].
     #[async_trait]
@@ -1,11 +1,3 @@
-use std::{
-    collections::{BTreeMap, BTreeSet},
-    convert::{TryFrom, TryInto},
-    num::NonZeroU32,
-    sync::Arc,
-    time::Duration,
-};
-
 use async_trait::async_trait;
 use futures::{FutureExt, StreamExt};
 use metric::{Metric, U64Gauge, U64Histogram, U64HistogramOptions};
@@ -23,15 +15,13 @@ use rdkafka::{
     util::Timeout,
     ClientConfig, ClientContext, Message, Offset, TopicPartitionList,
 };
-use data_types::{
-    sequence::Sequence, server_id::ServerId, write_buffer::WriteBufferCreationConfig,
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    convert::{TryFrom, TryInto},
+    num::NonZeroU32,
+    sync::Arc,
+    time::Duration,
 };
-use dml::{DmlMeta, DmlOperation};
-use observability_deps::tracing::{debug, info};
-use time::{Time, TimeProvider};
-use tokio::task::JoinHandle;
-use trace::TraceCollector;

 use crate::{
     codec::{ContentType, IoxHeaders},
@@ -40,6 +30,14 @@ use crate::{
         WriteBufferWriting, WriteStream,
     },
 };
+use data_types::{
+    sequence::Sequence, server_id::ServerId, write_buffer::WriteBufferCreationConfig,
+};
+use dml::{DmlMeta, DmlOperation};
+use observability_deps::tracing::{debug, info};
+use time::{Time, TimeProvider};
+use tokio::task::JoinHandle;
+use trace::TraceCollector;

 /// Default timeout supplied to rdkafka client for kafka operations.
 ///
@@ -745,12 +743,9 @@ impl ClientContext for ClientContextImpl {}
 impl ConsumerContext for ClientContextImpl {}

 pub mod test_utils {
-    use std::{collections::BTreeMap, time::Duration};
-
-    use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
-    use uuid::Uuid;
-
     use super::admin_client;
+    use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
+    use std::{collections::BTreeMap, time::Duration};

     /// Get the testing Kafka connection string or return current scope.
     ///
@@ -829,37 +824,28 @@ pub mod test_utils {
         let result = results.pop().expect("just checked the vector length");
         result.unwrap();
     }
-
-    /// Generated random topic name for testing.
-    pub fn random_kafka_topic() -> String {
-        format!("test_topic_{}", Uuid::new_v4())
-    }
 }

 /// Kafka tests (only run when in integration test mode and kafka is running).
 /// see [`crate::maybe_skip_kafka_integration`] for more details.
 #[cfg(test)]
 mod tests {
+    use super::{test_utils::kafka_sequencer_options, *};
+    use crate::{
+        codec::HEADER_CONTENT_TYPE,
+        core::test_utils::{
+            map_pop_first, perform_generic_tests, random_topic_name, set_pop_first,
+            write as write_to_writer, TestAdapter, TestContext,
+        },
+        maybe_skip_kafka_integration,
+    };
     use std::{
         num::NonZeroU32,
         sync::atomic::{AtomicU32, Ordering},
     };

     use time::TimeProvider;
     use trace::{RingBufferTraceCollector, TraceCollector};
-
-    use crate::codec::HEADER_CONTENT_TYPE;
-    use crate::{
-        core::test_utils::{
-            map_pop_first, perform_generic_tests, set_pop_first, write as write_to_writer,
-            TestAdapter, TestContext,
-        },
-        kafka::test_utils::random_kafka_topic,
-        maybe_skip_kafka_integration,
-    };
-
-    use super::{test_utils::kafka_sequencer_options, *};

     struct KafkaTestAdapter {
         conn: String,
     }
@@ -881,7 +867,7 @@ mod tests {
     ) -> Self::Context {
         KafkaTestContext {
             conn: self.conn.clone(),
-            database_name: random_kafka_topic(),
+            database_name: random_topic_name(),
             server_id_counter: AtomicU32::new(1),
             n_sequencers,
             time_provider,
@@ -964,7 +950,7 @@ mod tests {
     #[tokio::test]
     async fn topic_create_twice() {
         let conn = maybe_skip_kafka_integration!();
-        let database_name = random_kafka_topic();
+        let database_name = random_topic_name();

         create_kafka_topic(
             &conn,
@@ -12,5 +12,8 @@ pub(crate) mod codec;
 pub mod config;
 pub mod core;
 pub mod file;
+
+#[cfg(feature = "kafka")]
 pub mod kafka;
+
 pub mod mock;