diff --git a/.circleci/config.yml b/.circleci/config.yml
index 664745a0e7..99a8004e9f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -185,7 +185,7 @@ jobs:
       - cache_restore
       - run:
           name: Cargo test
-          command: cargo test --workspace
+          command: cargo test --workspace --features=kafka
       - cache_save

  # end to end tests with Heappy (heap profiling enabled)
@@ -240,7 +240,7 @@ jobs:
             - cargo-lock-{{ checksum "Cargo.lock" }}
       - run:
           name: Prime Rust build cache
-          command: cargo build --package influxdb_iox --bin influxdb_iox --package iox_data_generator --bin iox_data_generator
+          command: cargo build --package influxdb_iox --bin influxdb_iox --package iox_data_generator --bin iox_data_generator --features=kafka
       - save_cache:
           key: cargo-lock-{{ checksum "Cargo.lock" }}
           paths:
@@ -277,8 +277,8 @@ jobs:
           name: Build benches
           command: cargo test --workspace --benches --no-run
       - run:
-          name: Build with object store + exporter support + HEAP profiling
-          command: cargo build --no-default-features --features="aws,gcp,azure,heappy,pprof"
+          name: Build with object store + exporter support + HEAP profiling + kafka
+          command: cargo build --no-default-features --features="aws,gcp,azure,heappy,pprof,kafka"
       - cache_save

  # Lint protobufs.
diff --git a/Dockerfile b/Dockerfile
index 24a47f82cc..7626feedbb 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -15,7 +15,7 @@ WORKDIR /influxdb_iox
 ARG CARGO_INCREMENTAL=yes
 ARG PROFILE=release
-ARG FEATURES=aws,gcp,azure,jemalloc_replacing_malloc
+ARG FEATURES=aws,gcp,azure,jemalloc_replacing_malloc,kafka
 ARG ROARING_ARCH="haswell"
 ARG RUSTFLAGS=""
 ENV CARGO_INCREMENTAL=$CARGO_INCREMENTAL \
diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml
index f73cadb89c..bb6e0736bc 100644
--- a/influxdb_iox/Cargo.toml
+++ b/influxdb_iox/Cargo.toml
@@ -124,3 +124,7 @@ jemalloc_replacing_malloc = ["tikv-jemalloc-sys"]
 # Implicit feature selected when running under `clippy --all-features` to accept mutable exclusive features during
 # linting
 clippy = []
+
+# Enable the write buffer implemented with Kafka. Disabled by default to save build time when not
+# working on Kafka write buffer-related code.
+kafka = ["router/kafka", "server/kafka"]
diff --git a/influxdb_iox/tests/end_to_end_cases/kafka.rs b/influxdb_iox/tests/end_to_end_cases/kafka.rs
new file mode 100644
index 0000000000..05a95cf793
--- /dev/null
+++ b/influxdb_iox/tests/end_to_end_cases/kafka.rs
@@ -0,0 +1,34 @@
+use crate::common::server_fixture::{ServerFixture, ServerType};
+use influxdb_iox_client::management::generated_types::*;
+use std::time::Instant;
+use test_helpers::assert_contains;
+
+#[tokio::test]
+async fn test_create_database_invalid_kafka() {
+    let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
+    let mut client = server_fixture.management_client();
+
+    let rules = DatabaseRules {
+        name: "db_with_bad_kafka_address".into(),
+        write_buffer_connection: Some(WriteBufferConnection {
+            r#type: "kafka".into(),
+            connection: "i_am_not_a_kafka_server:1234".into(),
+            ..Default::default()
+        }),
+        ..Default::default()
+    };
+
+    let start = Instant::now();
+    let err = client
+        .create_database(rules)
+        .await
+        .expect_err("expected request to fail");
+
+    println!("Failed after {:?}", Instant::now() - start);
+
+    // expect that this error has a useful error related to kafka (not "timeout")
+    assert_contains!(
+        err.to_string(),
+        "error creating write buffer: Meta data fetch error: BrokerTransportFailure"
+    );
+}
diff --git a/influxdb_iox/tests/end_to_end_cases/management_api.rs b/influxdb_iox/tests/end_to_end_cases/management_api.rs
index fdae0d48a8..b560adacab 100644
--- a/influxdb_iox/tests/end_to_end_cases/management_api.rs
+++ b/influxdb_iox/tests/end_to_end_cases/management_api.rs
@@ -1,3 +1,10 @@
+use crate::{
+    common::server_fixture::{ServerFixture, ServerType, TestConfig, DEFAULT_SERVER_ID},
+    end_to_end_cases::scenario::{
+        create_readable_database, create_two_partition_database, create_unreadable_database,
+        fixture_broken_catalog, rand_name, wait_for_exact_chunk_states, DatabaseBuilder,
+    },
+};
 use data_types::chunk_metadata::ChunkId;
 use generated_types::google::protobuf::{Duration, Empty};
 use influxdb_iox_client::{
@@ -7,22 +14,9 @@ use influxdb_iox_client::{
         generated_types::{database_status::DatabaseState, operation_metadata::Job, *},
         Client,
     },
-    router::generated_types::WriteBufferConnection,
 };
-use std::{fs::set_permissions, num::NonZeroU32, os::unix::fs::PermissionsExt};
+use std::{fs::set_permissions, num::NonZeroU32, os::unix::fs::PermissionsExt, time::Instant};
 use test_helpers::{assert_contains, assert_error};
-
-use super::scenario::{
-    create_readable_database, create_two_partition_database, create_unreadable_database, rand_name,
-};
-use crate::common::server_fixture::{TestConfig, DEFAULT_SERVER_ID};
-use crate::{
-    common::server_fixture::{ServerFixture, ServerType},
-    end_to_end_cases::scenario::{
-        fixture_broken_catalog, wait_for_exact_chunk_states, DatabaseBuilder,
-    },
-};
-use std::time::Instant;
 use uuid::Uuid;

 #[tokio::test]
@@ -85,36 +79,6 @@ async fn test_create_database_invalid_name() {
     }
 }

-#[tokio::test]
-async fn test_create_database_invalid_kafka() {
-    let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
-    let mut client = server_fixture.management_client();
-
-    let rules = DatabaseRules {
-        name: "db_with_bad_kafka_address".into(),
-        write_buffer_connection: Some(WriteBufferConnection {
-            r#type: "kafka".into(),
-            connection: "i_am_not_a_kafka_server:1234".into(),
-            ..Default::default()
-        }),
-        ..Default::default()
-    };
-
-    let start = Instant::now();
-    let err = client
-        .create_database(rules)
-        .await
-        .expect_err("expected request to fail");
-
-    println!("Failed after {:?}", Instant::now() - start);
-
-    // expect that this error has a useful error related to kafka (not "timeout")
-    assert_contains!(
-        err.to_string(),
-        "error creating write buffer: Meta data fetch error: BrokerTransportFailure"
-    );
-}
-
 #[tokio::test]
 async fn test_list_databases() {
     let server_fixture = ServerFixture::create_shared(ServerType::Database).await;
diff --git a/influxdb_iox/tests/end_to_end_cases/mod.rs b/influxdb_iox/tests/end_to_end_cases/mod.rs
index df22d58e96..a992e290a0 100644
--- a/influxdb_iox/tests/end_to_end_cases/mod.rs
+++ b/influxdb_iox/tests/end_to_end_cases/mod.rs
@@ -8,6 +8,10 @@ mod flight_api;
 mod freeze;
 mod http;
 mod influxdb_ioxd;
+
+#[cfg(feature = "kafka")]
+mod kafka;
+
 mod management_api;
 mod management_cli;
 mod metrics;
diff --git a/perf/perf.py b/perf/perf.py
index 64ced3dc02..10889df6d6 100755
--- a/perf/perf.py
+++ b/perf/perf.py
@@ -397,7 +397,7 @@ def cargo_build_iox(debug=False, build_with_aws=True):
     t = time.time()
     print('building IOx')

-    features = []
+    features = ['kafka']
     if build_with_aws:
         features.append('aws')
     features = ','.join(features)
diff --git a/router/Cargo.toml b/router/Cargo.toml
index 50658be6d6..9f27c2cad8 100644
--- a/router/Cargo.toml
+++ b/router/Cargo.toml
@@ -25,3 +25,6 @@ workspace-hack = { path = "../workspace-hack"}
 mutable_batch_lp = { path = "../mutable_batch_lp" }
 regex = "1"
 tokio = { version = "1.13", features = ["macros", "parking_lot"] }
+
+[features]
+kafka = ["write_buffer/kafka"]
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 53aebac984..1ddcb5b3ca 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -65,5 +65,8 @@ test_helpers = { path = "../test_helpers" }

 [features]
 default = []
+
 # Enable features for benchmarking
 bench = ["mutable_buffer/nocache"]
+
+kafka = ["write_buffer/kafka"]
diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml
index d3a69cb7bb..5472f2a4b4 100644
--- a/write_buffer/Cargo.toml
+++ b/write_buffer/Cargo.toml
@@ -19,7 +19,7 @@ observability_deps = { path = "../observability_deps" }
 parking_lot = "0.11.2"
 pin-project = "1.0"
 prost = "0.8"
-rdkafka = "0.28.0"
+rdkafka = { version = "0.28.0", optional = true }
 time = { path = "../time" }
 tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
 tokio-util = "0.6.9"
@@ -28,6 +28,9 @@ trace_http = { path = "../trace_http" }
 uuid = { version = "0.8", features = ["v4"] }
 workspace-hack = { path = "../workspace-hack"}

+[features]
+kafka = ["rdkafka"]
+
 [dev-dependencies]
 tempfile = "3.1.0"
diff --git a/write_buffer/src/codec.rs b/write_buffer/src/codec.rs
index 89d5eb4d1e..38e39f4a84 100644
--- a/write_buffer/src/codec.rs
+++ b/write_buffer/src/codec.rs
@@ -100,11 +100,13 @@ impl IoxHeaders {
     }

     /// Gets the content type
+    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
     pub fn content_type(&self) -> ContentType {
         self.content_type
     }

     /// Gets the span context if any
+    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
     pub fn span_context(&self) -> Option<&SpanContext> {
         self.span_context.as_ref()
     }
diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs
index 3672773376..7baaee6166 100644
--- a/write_buffer/src/config.rs
+++ b/write_buffer/src/config.rs
@@ -1,24 +1,21 @@
+use crate::{
+    core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
+    file::{FileBufferConsumer, FileBufferProducer},
+    mock::{
+        MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
+        MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
+    },
+};
+use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection};
 use parking_lot::RwLock;
 use std::{
     collections::{btree_map::Entry, BTreeMap},
     path::PathBuf,
     sync::Arc,
 };
-
-use data_types::{server_id::ServerId, write_buffer::WriteBufferConnection};
 use time::TimeProvider;
 use trace::TraceCollector;
-use crate::{
-    core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
-    file::{FileBufferConsumer, FileBufferProducer},
-    kafka::{KafkaBufferConsumer, KafkaBufferProducer},
-    mock::{
-        MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
-        MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
-    },
-};
-
 #[derive(Debug)]
 pub enum WriteBufferConfig {
     Writing(Arc<dyn WriteBufferWriting>),
@@ -37,6 +34,7 @@ enum Mock {
 pub struct WriteBufferConfigFactory {
     mocks: RwLock<BTreeMap<String, Mock>>,
     time_provider: Arc<dyn TimeProvider>,
+    #[allow(dead_code)] // this field is only used in optionally-compiled kafka code
     metric_registry: Arc<metric::Registry>,
 }
@@ -108,18 +106,7 @@
                 .await?;
                 Arc::new(file_buffer) as _
             }
-            "kafka" => {
-                let kafka_buffer = KafkaBufferProducer::new(
-                    &cfg.connection,
-                    db_name,
-                    &cfg.connection_config,
-                    cfg.creation_config.as_ref(),
-                    Arc::clone(&self.time_provider),
-                    &self.metric_registry,
-                )
-                .await?;
-                Arc::new(kafka_buffer) as _
-            }
+            "kafka" => self.kafka_buffer_producer(db_name, cfg).await?,
             "mock" => match self.get_mock(&cfg.connection)? {
                 Mock::Normal(state) => {
                     let mock_buffer = MockBufferForWriting::new(
@@ -142,6 +129,38 @@
         Ok(writer)
     }
+
+    #[cfg(feature = "kafka")]
+    async fn kafka_buffer_producer(
+        &self,
+        db_name: &str,
+        cfg: &WriteBufferConnection,
+    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
+        let kafka_buffer = crate::kafka::KafkaBufferProducer::new(
+            &cfg.connection,
+            db_name,
+            &cfg.connection_config,
+            cfg.creation_config.as_ref(),
+            Arc::clone(&self.time_provider),
+            &self.metric_registry,
+        )
+        .await?;
+
+        Ok(Arc::new(kafka_buffer) as _)
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    async fn kafka_buffer_producer(
+        &self,
+        _db_name: &str,
+        _cfg: &WriteBufferConnection,
+    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
+        Err(String::from(
+            "`WriteBufferWriting` of type `kafka` requested, but Kafka support was not included \
+             in this build by enabling the `kafka` feature",
+        )
+        .into())
+    }
+
     /// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
     pub async fn new_config_read(
         &self,
@@ -163,17 +182,8 @@
                 Box::new(file_buffer) as _
             }
             "kafka" => {
-                let kafka_buffer = KafkaBufferConsumer::new(
-                    &cfg.connection,
-                    server_id,
-                    db_name,
-                    &cfg.connection_config,
-                    cfg.creation_config.as_ref(),
-                    trace_collector,
-                    &self.metric_registry,
-                )
-                .await?;
-                Box::new(kafka_buffer) as _
+                self.kafka_buffer_consumer(server_id, db_name, trace_collector, cfg)
+                    .await?
             }
             "mock" => match self.get_mock(&cfg.connection)? {
                 Mock::Normal(state) => {
@@ -193,21 +203,52 @@
         Ok(reader)
     }
+
+    #[cfg(feature = "kafka")]
+    async fn kafka_buffer_consumer(
+        &self,
+        server_id: ServerId,
+        db_name: &str,
+        trace_collector: Option<&Arc<dyn TraceCollector>>,
+        cfg: &WriteBufferConnection,
+    ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
+        let kafka_buffer = crate::kafka::KafkaBufferConsumer::new(
+            &cfg.connection,
+            server_id,
+            db_name,
+            &cfg.connection_config,
+            cfg.creation_config.as_ref(),
+            trace_collector,
+            &self.metric_registry,
+        )
+        .await?;
+
+        Ok(Box::new(kafka_buffer) as _)
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    async fn kafka_buffer_consumer(
+        &self,
+        _server_id: ServerId,
+        _db_name: &str,
+        _trace_collector: Option<&Arc<dyn TraceCollector>>,
+        _cfg: &WriteBufferConnection,
+    ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
+        Err(String::from(
+            "`WriteBufferReading` of type `kafka` requested, but Kafka support was not included \
+             in this build by enabling the `kafka` feature",
+        )
+        .into())
+    }
 }

 #[cfg(test)]
 mod tests {
-    use std::{convert::TryFrom, num::NonZeroU32};
-
-    use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
-    use tempfile::TempDir;
-
-    use crate::{
-        kafka::test_utils::random_kafka_topic, maybe_skip_kafka_integration,
-        mock::MockBufferSharedState,
-    };
-
     use super::*;
+    use crate::{core::test_utils::random_topic_name, mock::MockBufferSharedState};
+    use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
+    use std::{convert::TryFrom, num::NonZeroU32};
+    use tempfile::TempDir;

     #[tokio::test]
     async fn test_writing_file() {
@@ -248,46 +289,6 @@
         assert_eq!(conn.type_name(), "file");
     }

-    #[tokio::test]
-    async fn test_writing_kafka() {
-        let conn = maybe_skip_kafka_integration!();
-        let factory = factory();
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
-        let cfg = WriteBufferConnection {
-            type_: "kafka".to_string(),
-            connection: conn,
-            creation_config: Some(WriteBufferCreationConfig::default()),
-            ..Default::default()
-        };
-
-        let conn = factory
-            .new_config_write(db_name.as_str(), &cfg)
-            .await
-            .unwrap();
-        assert_eq!(conn.type_name(), "kafka");
-    }
-
-    #[tokio::test]
-    async fn test_reading_kafka() {
-        let conn = maybe_skip_kafka_integration!();
-        let factory = factory();
-        let server_id = ServerId::try_from(1).unwrap();
-
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
-        let cfg = WriteBufferConnection {
-            type_: "kafka".to_string(),
-            connection: conn,
-            creation_config: Some(WriteBufferCreationConfig::default()),
-            ..Default::default()
-        };
-
-        let conn = factory
-            .new_config_read(server_id, db_name.as_str(), None, &cfg)
-            .await
-            .unwrap();
-        assert_eq!(conn.type_name(), "kafka");
-    }
-
     #[tokio::test]
     async fn test_writing_mock() {
         let factory = factory();
@@ -297,7 +298,7 @@
         let mock_name = "some_mock";
         factory.register_mock(mock_name.to_string(), state);

-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -333,7 +334,7 @@
         factory.register_mock(mock_name.to_string(), state);
         let server_id = ServerId::try_from(1).unwrap();

-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -366,7 +367,7 @@
         let mock_name = "some_mock";
         factory.register_always_fail_mock(mock_name.to_string());

-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -444,4 +445,99 @@
         let registry = Arc::new(metric::Registry::new());
         WriteBufferConfigFactory::new(time, registry)
     }
+
+    #[cfg(feature = "kafka")]
+    mod kafka {
+        use super::*;
+        use crate::maybe_skip_kafka_integration;
+
+        #[tokio::test]
+        async fn test_writing_kafka() {
+            let conn = maybe_skip_kafka_integration!();
+            let factory = factory();
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                connection: conn,
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let conn = factory
+                .new_config_write(db_name.as_str(), &cfg)
+                .await
+                .unwrap();
+            assert_eq!(conn.type_name(), "kafka");
+        }
+
+        #[tokio::test]
+        async fn test_reading_kafka() {
+            let conn = maybe_skip_kafka_integration!();
+            let factory = factory();
+            let server_id = ServerId::try_from(1).unwrap();
+
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                connection: conn,
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let conn = factory
+                .new_config_read(server_id, db_name.as_str(), None, &cfg)
+                .await
+                .unwrap();
+            assert_eq!(conn.type_name(), "kafka");
+        }
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    mod no_kafka {
+        use super::*;
+
+        #[tokio::test]
+        async fn writing_to_kafka_without_kafka_feature_returns_error() {
+            let factory = factory();
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let err = factory
+                .new_config_write(db_name.as_str(), &cfg)
+                .await
+                .unwrap_err();
+            assert_eq!(
+                err.to_string(),
+                "`WriteBufferWriting` of type `kafka` requested, but Kafka support was not \
+                 included in this build by enabling the `kafka` feature"
+            );
+        }
+
+        #[tokio::test]
+        async fn reading_from_kafka_without_kafka_feature_returns_error() {
+            let factory = factory();
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let server_id = ServerId::try_from(1).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let err = factory
+                .new_config_read(server_id, db_name.as_str(), None, &cfg)
+                .await
+                .unwrap_err();
+
+            assert_eq!(
+                err.to_string(),
+                "`WriteBufferReading` of type `kafka` requested, but Kafka support was not \
+                 included in this build by enabling the `kafka` feature"
+            );
+        }
+    }
 }
diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs
index 22da8dd407..f59c0fb8da 100644
--- a/write_buffer/src/core.rs
+++ b/write_buffer/src/core.rs
@@ -99,6 +99,10 @@ pub trait WriteBufferReading: Sync + Send + Debug + 'static {
 pub mod test_utils {
     //! Generic tests for all write buffer implementations.
+    use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
+    use async_trait::async_trait;
+    use dml::{test_util::assert_write_op_eq, DmlMeta, DmlOperation, DmlWrite};
+    use futures::{StreamExt, TryStreamExt};
     use std::{
         collections::{BTreeMap, BTreeSet},
         convert::TryFrom,
@@ -106,14 +110,14 @@ pub mod test_utils {
         sync::Arc,
         time::Duration,
     };
-
-    use async_trait::async_trait;
-    use dml::{test_util::assert_write_op_eq, DmlMeta, DmlOperation, DmlWrite};
-    use futures::{StreamExt, TryStreamExt};
     use time::{Time, TimeProvider};
     use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
+    use uuid::Uuid;

-    use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
+    /// Generated random topic name for testing.
+    pub fn random_topic_name() -> String {
+        format!("test_topic_{}", Uuid::new_v4())
+    }

     /// Adapter to make a concrete write buffer implementation work w/ [`perform_generic_tests`].
     #[async_trait]
diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs
index 621e04f16b..6895bdbec5 100644
--- a/write_buffer/src/kafka.rs
+++ b/write_buffer/src/kafka.rs
@@ -1,11 +1,3 @@
-use std::{
-    collections::{BTreeMap, BTreeSet},
-    convert::{TryFrom, TryInto},
-    num::NonZeroU32,
-    sync::Arc,
-    time::Duration,
-};
-
 use async_trait::async_trait;
 use futures::{FutureExt, StreamExt};
 use metric::{Metric, U64Gauge, U64Histogram, U64HistogramOptions};
@@ -23,15 +15,13 @@ use rdkafka::{
     util::Timeout,
     ClientConfig, ClientContext, Message, Offset, TopicPartitionList,
 };
-
-use data_types::{
-    sequence::Sequence, server_id::ServerId, write_buffer::WriteBufferCreationConfig,
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    convert::{TryFrom, TryInto},
+    num::NonZeroU32,
+    sync::Arc,
+    time::Duration,
 };
-use dml::{DmlMeta, DmlOperation};
-use observability_deps::tracing::{debug, info};
-use time::{Time, TimeProvider};
-use tokio::task::JoinHandle;
-use trace::TraceCollector;

 use crate::{
     codec::{ContentType, IoxHeaders},
@@ -40,6 +30,14 @@ use crate::{
         WriteBufferWriting, WriteStream,
     },
 };
+use data_types::{
+    sequence::Sequence, server_id::ServerId, write_buffer::WriteBufferCreationConfig,
+};
+use dml::{DmlMeta, DmlOperation};
+use observability_deps::tracing::{debug, info};
+use time::{Time, TimeProvider};
+use tokio::task::JoinHandle;
+use trace::TraceCollector;

 /// Default timeout supplied to rdkafka client for kafka operations.
 ///
@@ -745,12 +743,9 @@ impl ClientContext for ClientContextImpl {}
 impl ConsumerContext for ClientContextImpl {}

 pub mod test_utils {
-    use std::{collections::BTreeMap, time::Duration};
-
-    use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
-    use uuid::Uuid;
-
     use super::admin_client;
+    use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
+    use std::{collections::BTreeMap, time::Duration};

     /// Get the testing Kafka connection string or return current scope.
     ///
@@ -829,37 +824,28 @@
         let result = results.pop().expect("just checked the vector length");
         result.unwrap();
     }
-
-    /// Generated random topic name for testing.
-    pub fn random_kafka_topic() -> String {
-        format!("test_topic_{}", Uuid::new_v4())
-    }
 }

 /// Kafka tests (only run when in integration test mode and kafka is running).
 /// see [`crate::maybe_skip_kafka_integration`] for more details.
 #[cfg(test)]
 mod tests {
+    use super::{test_utils::kafka_sequencer_options, *};
+    use crate::{
+        codec::HEADER_CONTENT_TYPE,
+        core::test_utils::{
+            map_pop_first, perform_generic_tests, random_topic_name, set_pop_first,
+            write as write_to_writer, TestAdapter, TestContext,
+        },
+        maybe_skip_kafka_integration,
+    };
     use std::{
         num::NonZeroU32,
         sync::atomic::{AtomicU32, Ordering},
     };
-
     use time::TimeProvider;
     use trace::{RingBufferTraceCollector, TraceCollector};

-    use crate::codec::HEADER_CONTENT_TYPE;
-    use crate::{
-        core::test_utils::{
-            map_pop_first, perform_generic_tests, set_pop_first, write as write_to_writer,
-            TestAdapter, TestContext,
-        },
-        kafka::test_utils::random_kafka_topic,
-        maybe_skip_kafka_integration,
-    };
-
-    use super::{test_utils::kafka_sequencer_options, *};
-
     struct KafkaTestAdapter {
         conn: String,
     }
@@ -881,7 +867,7 @@
     ) -> Self::Context {
         KafkaTestContext {
             conn: self.conn.clone(),
-            database_name: random_kafka_topic(),
+            database_name: random_topic_name(),
             server_id_counter: AtomicU32::new(1),
             n_sequencers,
             time_provider,
@@ -964,7 +950,7 @@
     #[tokio::test]
     async fn topic_create_twice() {
         let conn = maybe_skip_kafka_integration!();
-        let database_name = random_kafka_topic();
+        let database_name = random_topic_name();

         create_kafka_topic(
             &conn,
diff --git a/write_buffer/src/lib.rs b/write_buffer/src/lib.rs
index 2405e03729..aaf97776fe 100644
--- a/write_buffer/src/lib.rs
+++ b/write_buffer/src/lib.rs
@@ -12,5 +12,8 @@ pub(crate) mod codec;
 pub mod config;
 pub mod core;
 pub mod file;
+
+#[cfg(feature = "kafka")]
 pub mod kafka;
+
 pub mod mock;