diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml
index d3a69cb7bb..5472f2a4b4 100644
--- a/write_buffer/Cargo.toml
+++ b/write_buffer/Cargo.toml
@@ -19,7 +19,7 @@ observability_deps = { path = "../observability_deps" }
 parking_lot = "0.11.2"
 pin-project = "1.0"
 prost = "0.8"
-rdkafka = "0.28.0"
+rdkafka = { version = "0.28.0", optional = true }
 time = { path = "../time" }
 tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
 tokio-util = "0.6.9"
@@ -28,6 +28,9 @@ trace_http = { path = "../trace_http" }
 uuid = { version = "0.8", features = ["v4"] }
 workspace-hack = { path = "../workspace-hack"}

+[features]
+kafka = ["rdkafka"]
+
 [dev-dependencies]
 tempfile = "3.1.0"

diff --git a/write_buffer/src/codec.rs b/write_buffer/src/codec.rs
index 89d5eb4d1e..38e39f4a84 100644
--- a/write_buffer/src/codec.rs
+++ b/write_buffer/src/codec.rs
@@ -100,11 +100,13 @@ impl IoxHeaders {
     }

     /// Gets the content type
+    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
     pub fn content_type(&self) -> ContentType {
         self.content_type
     }

     /// Gets the span context if any
+    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
     pub fn span_context(&self) -> Option<&SpanContext> {
         self.span_context.as_ref()
     }
diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs
index 3672773376..7597b5ec61 100644
--- a/write_buffer/src/config.rs
+++ b/write_buffer/src/config.rs
@@ -12,7 +12,6 @@ use trace::TraceCollector;
 use crate::{
     core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
     file::{FileBufferConsumer, FileBufferProducer},
-    kafka::{KafkaBufferConsumer, KafkaBufferProducer},
     mock::{
         MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
         MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
@@ -37,6 +36,7 @@ enum Mock {
 pub struct WriteBufferConfigFactory {
     mocks: RwLock<BTreeMap<String, Mock>>,
     time_provider: Arc<dyn TimeProvider>,
+    #[allow(dead_code)] // this field is only used in optionally-compiled kafka code
     metric_registry: Arc<metric::Registry>,
 }

@@ -108,18 +108,7 @@ impl WriteBufferConfigFactory {
                 .await?;
                 Arc::new(file_buffer) as _
             }
-            "kafka" => {
-                let kafka_buffer = KafkaBufferProducer::new(
-                    &cfg.connection,
-                    db_name,
-                    &cfg.connection_config,
-                    cfg.creation_config.as_ref(),
-                    Arc::clone(&self.time_provider),
-                    &self.metric_registry,
-                )
-                .await?;
-                Arc::new(kafka_buffer) as _
-            }
+            "kafka" => self.kafka_buffer_producer(db_name, cfg).await?,
             "mock" => match self.get_mock(&cfg.connection)? {
                 Mock::Normal(state) => {
                     let mock_buffer = MockBufferForWriting::new(
@@ -142,6 +131,37 @@ impl WriteBufferConfigFactory {
         Ok(writer)
     }

+    #[cfg(feature = "kafka")]
+    async fn kafka_buffer_producer(
+        &self,
+        db_name: &str,
+        cfg: &WriteBufferConnection,
+    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
+        let kafka_buffer = crate::kafka::KafkaBufferProducer::new(
+            &cfg.connection,
+            db_name,
+            &cfg.connection_config,
+            cfg.creation_config.as_ref(),
+            Arc::clone(&self.time_provider),
+            &self.metric_registry,
+        )
+        .await?;
+
+        Ok(Arc::new(kafka_buffer) as _)
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    async fn kafka_buffer_producer(
+        &self,
+        _db_name: &str,
+        _cfg: &WriteBufferConnection,
+    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
+        panic!(
+            "`WriteBufferWriting` of type `kafka` requested, but Kafka support was not compiled \
+             into this build; recompile with the `kafka` feature enabled"
+        );
+    }
+
     /// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
     pub async fn new_config_read(
         &self,
@@ -163,17 +183,8 @@ impl WriteBufferConfigFactory {
                 Box::new(file_buffer) as _
             }
             "kafka" => {
-                let kafka_buffer = KafkaBufferConsumer::new(
-                    &cfg.connection,
-                    server_id,
-                    db_name,
-                    &cfg.connection_config,
-                    cfg.creation_config.as_ref(),
-                    trace_collector,
-                    &self.metric_registry,
-                )
-                .await?;
-                Box::new(kafka_buffer) as _
+                self.kafka_buffer_consumer(server_id, db_name, trace_collector, cfg)
+                    .await?
             }
             "mock" => match self.get_mock(&cfg.connection)? {
                 Mock::Normal(state) => {
@@ -193,6 +204,42 @@ impl WriteBufferConfigFactory {

         Ok(reader)
     }
+
+    #[cfg(feature = "kafka")]
+    async fn kafka_buffer_consumer(
+        &self,
+        server_id: ServerId,
+        db_name: &str,
+        trace_collector: Option<&Arc<dyn TraceCollector>>,
+        cfg: &WriteBufferConnection,
+    ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
+        let kafka_buffer = crate::kafka::KafkaBufferConsumer::new(
+            &cfg.connection,
+            server_id,
+            db_name,
+            &cfg.connection_config,
+            cfg.creation_config.as_ref(),
+            trace_collector,
+            &self.metric_registry,
+        )
+        .await?;
+
+        Ok(Box::new(kafka_buffer) as _)
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    async fn kafka_buffer_consumer(
+        &self,
+        _server_id: ServerId,
+        _db_name: &str,
+        _trace_collector: Option<&Arc<dyn TraceCollector>>,
+        _cfg: &WriteBufferConnection,
+    ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
+        panic!(
+            "`WriteBufferReading` of type `kafka` requested, but Kafka support was not compiled \
+             into this build; recompile with the `kafka` feature enabled"
+        );
+    }
 }

 #[cfg(test)]
@@ -202,10 +249,7 @@ mod tests {
     use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
     use tempfile::TempDir;

-    use crate::{
-        kafka::test_utils::random_kafka_topic, maybe_skip_kafka_integration,
-        mock::MockBufferSharedState,
-    };
+    use crate::{core::test_utils::random_topic_name, mock::MockBufferSharedState};

     use super::*;

@@ -248,46 +292,6 @@ mod tests {
         assert_eq!(conn.type_name(), "file");
     }

-    #[tokio::test]
-    async fn test_writing_kafka() {
-        let conn = maybe_skip_kafka_integration!();
-        let factory = factory();
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
-        let cfg = WriteBufferConnection {
-            type_: "kafka".to_string(),
-            connection: conn,
-            creation_config: Some(WriteBufferCreationConfig::default()),
-            ..Default::default()
-        };
-
-        let conn = factory
-            .new_config_write(db_name.as_str(), &cfg)
-            .await
-            .unwrap();
-        assert_eq!(conn.type_name(), "kafka");
-    }
-
-    #[tokio::test]
-    async fn test_reading_kafka() {
-        let conn = maybe_skip_kafka_integration!();
-        let factory = factory();
-        let server_id = ServerId::try_from(1).unwrap();
-
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
-        let cfg = WriteBufferConnection {
-            type_: "kafka".to_string(),
-            connection: conn,
-            creation_config: Some(WriteBufferCreationConfig::default()),
-            ..Default::default()
-        };
-
-        let conn = factory
-            .new_config_read(server_id, db_name.as_str(), None, &cfg)
-            .await
-            .unwrap();
-        assert_eq!(conn.type_name(), "kafka");
-    }
-
     #[tokio::test]
     async fn test_writing_mock() {
         let factory = factory();
@@ -297,7 +301,7 @@ mod tests {
         let mock_name = "some_mock";
         factory.register_mock(mock_name.to_string(), state);

-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -333,7 +337,7 @@ mod tests {
         factory.register_mock(mock_name.to_string(), state);

         let server_id = ServerId::try_from(1).unwrap();
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -366,7 +370,7 @@ mod tests {
         let mock_name = "some_mock";
         factory.register_always_fail_mock(mock_name.to_string());

-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -444,4 +448,50 @@ mod tests {
         let registry = Arc::new(metric::Registry::new());
         WriteBufferConfigFactory::new(time, registry)
     }
+
+    #[cfg(feature = "kafka")]
+    mod kafka {
+        use super::*;
+        use crate::maybe_skip_kafka_integration;
+
+        #[tokio::test]
+        async fn test_writing_kafka() {
+            let conn = maybe_skip_kafka_integration!();
+            let factory = factory();
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                connection: conn,
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let conn = factory
+                .new_config_write(db_name.as_str(), &cfg)
+                .await
+                .unwrap();
+            assert_eq!(conn.type_name(), "kafka");
+        }
+
+        #[tokio::test]
+        async fn test_reading_kafka() {
+            let conn = maybe_skip_kafka_integration!();
+            let factory = factory();
+            let server_id = ServerId::try_from(1).unwrap();
+
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                connection: conn,
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let conn = factory
+                .new_config_read(server_id, db_name.as_str(), None, &cfg)
+                .await
+                .unwrap();
+            assert_eq!(conn.type_name(), "kafka");
+        }
+    }
 }
diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs
index 22da8dd407..3dbdce4d22 100644
--- a/write_buffer/src/core.rs
+++ b/write_buffer/src/core.rs
@@ -112,9 +112,15 @@ pub mod test_utils {
     use futures::{StreamExt, TryStreamExt};
     use time::{Time, TimeProvider};
     use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
+    use uuid::Uuid;

     use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};

+    /// Generates a random topic name for testing.
+    pub fn random_topic_name() -> String {
+        format!("test_topic_{}", Uuid::new_v4())
+    }
+
     /// Adapter to make a concrete write buffer implementation work w/ [`perform_generic_tests`].
 #[async_trait]
 pub trait TestAdapter: Send + Sync {
diff --git a/write_buffer/src/kafka.rs b/write_buffer/src/kafka.rs
index 621e04f16b..ddd3a27af1 100644
--- a/write_buffer/src/kafka.rs
+++ b/write_buffer/src/kafka.rs
@@ -748,7 +748,6 @@ pub mod test_utils {
     use std::{collections::BTreeMap, time::Duration};

     use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
-    use uuid::Uuid;

     use super::admin_client;

@@ -829,11 +828,6 @@ pub mod test_utils {
         let result = results.pop().expect("just checked the vector length");
         result.unwrap();
     }
-
-    /// Generated random topic name for testing.
-    pub fn random_kafka_topic() -> String {
-        format!("test_topic_{}", Uuid::new_v4())
-    }
 }

 /// Kafka tests (only run when in integration test mode and kafka is running).
@@ -850,11 +844,11 @@ mod tests {

     use crate::codec::HEADER_CONTENT_TYPE;
     use crate::{
+        core::test_utils::random_topic_name,
         core::test_utils::{
             map_pop_first, perform_generic_tests, set_pop_first, write as write_to_writer,
             TestAdapter, TestContext,
         },
-        kafka::test_utils::random_kafka_topic,
         maybe_skip_kafka_integration,
     };

@@ -881,7 +875,7 @@ mod tests {
         ) -> Self::Context {
             KafkaTestContext {
                 conn: self.conn.clone(),
-                database_name: random_kafka_topic(),
+                database_name: random_topic_name(),
                 server_id_counter: AtomicU32::new(1),
                 n_sequencers,
                 time_provider,
@@ -964,7 +958,7 @@ mod tests {
     #[tokio::test]
     async fn topic_create_twice() {
         let conn = maybe_skip_kafka_integration!();
-        let database_name = random_kafka_topic();
+        let database_name = random_topic_name();

         create_kafka_topic(
             &conn,
diff --git a/write_buffer/src/lib.rs b/write_buffer/src/lib.rs
index 2405e03729..aaf97776fe 100644
--- a/write_buffer/src/lib.rs
+++ b/write_buffer/src/lib.rs
@@ -12,5 +12,8 @@ pub(crate) mod codec;
 pub mod config;
 pub mod core;
 pub mod file;
+
+#[cfg(feature = "kafka")]
 pub mod kafka;
+
 pub mod mock;
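
With this patch, Kafka support (and the rdkafka dependency) is compiled only when the
`kafka` feature is enabled; requesting a `kafka` write buffer from a build without it
hits the panicking stubs added above. A sketch of the corresponding build and test
invocations, assuming they are run from the workspace root:

    # default build: rdkafka is not compiled, "kafka" write buffers panic at runtime
    cargo build -p write_buffer
    # opt back in to the Kafka backend
    cargo build -p write_buffer --features kafka
    # run the feature-gated Kafka test module (the integration tests still skip
    # themselves unless a broker is available via maybe_skip_kafka_integration!)
    cargo test -p write_buffer --features kafka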
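Downstream crates that construct Kafka write buffers need to forward the feature through
their own manifests. A hypothetical dependent's Cargo.toml entries might look like this
(the crate layout and feature name on the consumer side are illustrative):

    [dependencies]
    write_buffer = { path = "../write_buffer", features = ["kafka"] }

    # or, to let the final binary make the choice, re-export the flag instead:
    [features]
    kafka = ["write_buffer/kafka"]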