feat: Put kafka write_buffer code behind a feature flag

The feature is off by default. This makes rdkafka an optional dependency,
minimizing build-time dependencies for users who don't plan on using a
Kafka write buffer.
pull/24376/head
Carol (Nichols || Goulding) 2021-12-08 16:53:28 -05:00
parent 46b43d3c30
commit 403dcae93c
6 changed files with 139 additions and 81 deletions

write_buffer/Cargo.toml

@@ -19,7 +19,7 @@ observability_deps = { path = "../observability_deps" }
 parking_lot = "0.11.2"
 pin-project = "1.0"
 prost = "0.8"
-rdkafka = "0.28.0"
+rdkafka = { version = "0.28.0", optional = true }
 time = { path = "../time" }
 tokio = { version = "1.13", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
 tokio-util = "0.6.9"
@@ -28,6 +28,9 @@ trace_http = { path = "../trace_http" }
 uuid = { version = "0.8", features = ["v4"] }
 workspace-hack = { path = "../workspace-hack"}
 
+[features]
+kafka = ["rdkafka"]
+
 [dev-dependencies]
 tempfile = "3.1.0"
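Note on the manifest change: marking rdkafka `optional = true` and wiring it to a `kafka = ["rdkafka"]` feature means rdkafka (and its native librdkafka build, typically the slow part) is compiled only when the feature is requested, e.g. with `cargo build -p write_buffer --features kafka`, or from a dependent crate's manifest with something like `write_buffer = { path = "../write_buffer", features = ["kafka"] }` (path hypothetical).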

write_buffer/src/codec.rs

@@ -100,11 +100,13 @@ impl IoxHeaders {
     }
 
     /// Gets the content type
+    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
     pub fn content_type(&self) -> ContentType {
         self.content_type
     }
 
     /// Gets the span context if any
+    #[allow(dead_code)] // this function is only used in optionally-compiled kafka code
     pub fn span_context(&self) -> Option<&SpanContext> {
         self.span_context.as_ref()
     }
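A note on the `#[allow(dead_code)]` additions: once the kafka module is compiled out, these accessors lose their only callers, so a default (feature-less) build would otherwise emit dead-code warnings. A minimal standalone sketch of the mechanism, with hypothetical names:

#[allow(dead_code)] // sole caller is behind #[cfg(feature = "kafka")]
fn content_type() -> &'static str {
    "application/x-protobuf"
}

#[cfg(feature = "kafka")]
fn kafka_only_caller() {
    // In a build without the `kafka` feature this function is compiled
    // out, and with it the only use of `content_type` above.
    println!("{}", content_type());
}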

write_buffer/src/config.rs

@@ -12,7 +12,6 @@ use trace::TraceCollector;
 use crate::{
     core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
     file::{FileBufferConsumer, FileBufferProducer},
-    kafka::{KafkaBufferConsumer, KafkaBufferProducer},
     mock::{
         MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
         MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
@@ -37,6 +36,7 @@ enum Mock {
 pub struct WriteBufferConfigFactory {
     mocks: RwLock<BTreeMap<String, Mock>>,
     time_provider: Arc<dyn TimeProvider>,
+    #[allow(dead_code)] // this field is only used in optionally-compiled kafka code
     metric_registry: Arc<metric::Registry>,
 }
@@ -108,18 +108,7 @@ impl WriteBufferConfigFactory {
                 .await?;
                 Arc::new(file_buffer) as _
             }
-            "kafka" => {
-                let kafka_buffer = KafkaBufferProducer::new(
-                    &cfg.connection,
-                    db_name,
-                    &cfg.connection_config,
-                    cfg.creation_config.as_ref(),
-                    Arc::clone(&self.time_provider),
-                    &self.metric_registry,
-                )
-                .await?;
-                Arc::new(kafka_buffer) as _
-            }
+            "kafka" => self.kafka_buffer_producer(db_name, cfg).await?,
             "mock" => match self.get_mock(&cfg.connection)? {
                 Mock::Normal(state) => {
                     let mock_buffer = MockBufferForWriting::new(
@@ -142,6 +131,37 @@ impl WriteBufferConfigFactory {
         Ok(writer)
     }
 
+    #[cfg(feature = "kafka")]
+    async fn kafka_buffer_producer(
+        &self,
+        db_name: &str,
+        cfg: &WriteBufferConnection,
+    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
+        let kafka_buffer = crate::kafka::KafkaBufferProducer::new(
+            &cfg.connection,
+            db_name,
+            &cfg.connection_config,
+            cfg.creation_config.as_ref(),
+            Arc::clone(&self.time_provider),
+            &self.metric_registry,
+        )
+        .await?;
+
+        Ok(Arc::new(kafka_buffer) as _)
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    async fn kafka_buffer_producer(
+        &self,
+        _db_name: &str,
+        _cfg: &WriteBufferConnection,
+    ) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
+        panic!(
+            "`WriteBufferWriting` of type `kafka` requested, but Kafka support was not \
+            included in this build by enabling the `kafka` feature"
+        );
+    }
+
     /// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
     pub async fn new_config_read(
         &self,
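The refactoring above is the heart of the change: the `"kafka"` match arm now calls a helper that has two same-signature implementations, selected at compile time. With the feature enabled, the real `KafkaBufferProducer` is constructed; without it, the `#[cfg(not(feature = "kafka"))]` stub keeps every call site compiling and turns a kafka request into a runtime panic instead of a compile error. A minimal sketch of the idiom, with simplified, hypothetical names:

#[cfg(feature = "kafka")]
fn connect_kafka(conn: &str) -> Result<String, String> {
    // A real implementation would construct an rdkafka client here.
    Ok(format!("connected to {}", conn))
}

#[cfg(not(feature = "kafka"))]
fn connect_kafka(_conn: &str) -> Result<String, String> {
    // Same signature, so callers compile unchanged; reaching this at
    // runtime means the binary was built without `--features kafka`.
    panic!("Kafka support was not included in this build");
}

Returning a `WriteBufferError` from the stub would be the gentler alternative; the diff opts for `panic!`, so a deployment that asks for kafka without having built it fails fast and loudly.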
@@ -163,17 +183,8 @@ impl WriteBufferConfigFactory {
                 Box::new(file_buffer) as _
             }
             "kafka" => {
-                let kafka_buffer = KafkaBufferConsumer::new(
-                    &cfg.connection,
-                    server_id,
-                    db_name,
-                    &cfg.connection_config,
-                    cfg.creation_config.as_ref(),
-                    trace_collector,
-                    &self.metric_registry,
-                )
-                .await?;
-                Box::new(kafka_buffer) as _
+                self.kafka_buffer_consumer(server_id, db_name, trace_collector, cfg)
+                    .await?
             }
             "mock" => match self.get_mock(&cfg.connection)? {
                 Mock::Normal(state) => {
@@ -193,6 +204,42 @@ impl WriteBufferConfigFactory {
         Ok(reader)
     }
 
+    #[cfg(feature = "kafka")]
+    async fn kafka_buffer_consumer(
+        &self,
+        server_id: ServerId,
+        db_name: &str,
+        trace_collector: Option<&Arc<dyn TraceCollector>>,
+        cfg: &WriteBufferConnection,
+    ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
+        let kafka_buffer = crate::kafka::KafkaBufferConsumer::new(
+            &cfg.connection,
+            server_id,
+            db_name,
+            &cfg.connection_config,
+            cfg.creation_config.as_ref(),
+            trace_collector,
+            &self.metric_registry,
+        )
+        .await?;
+
+        Ok(Box::new(kafka_buffer) as _)
+    }
+
+    #[cfg(not(feature = "kafka"))]
+    async fn kafka_buffer_consumer(
+        &self,
+        _server_id: ServerId,
+        _db_name: &str,
+        _trace_collector: Option<&Arc<dyn TraceCollector>>,
+        _cfg: &WriteBufferConnection,
+    ) -> Result<Box<dyn WriteBufferReading>, WriteBufferError> {
+        panic!(
+            "`WriteBufferReading` of type `kafka` requested, but Kafka support was not \
+            included in this build by enabling the `kafka` feature"
+        );
+    }
 }
 
 #[cfg(test)]
@@ -202,10 +249,7 @@ mod tests {
     use data_types::{write_buffer::WriteBufferCreationConfig, DatabaseName};
     use tempfile::TempDir;
 
-    use crate::{
-        kafka::test_utils::random_kafka_topic, maybe_skip_kafka_integration,
-        mock::MockBufferSharedState,
-    };
+    use crate::{core::test_utils::random_topic_name, mock::MockBufferSharedState};
 
     use super::*;
 
@@ -248,46 +292,6 @@ mod tests {
         assert_eq!(conn.type_name(), "file");
     }
 
-    #[tokio::test]
-    async fn test_writing_kafka() {
-        let conn = maybe_skip_kafka_integration!();
-        let factory = factory();
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
-
-        let cfg = WriteBufferConnection {
-            type_: "kafka".to_string(),
-            connection: conn,
-            creation_config: Some(WriteBufferCreationConfig::default()),
-            ..Default::default()
-        };
-
-        let conn = factory
-            .new_config_write(db_name.as_str(), &cfg)
-            .await
-            .unwrap();
-        assert_eq!(conn.type_name(), "kafka");
-    }
-
-    #[tokio::test]
-    async fn test_reading_kafka() {
-        let conn = maybe_skip_kafka_integration!();
-        let factory = factory();
-        let server_id = ServerId::try_from(1).unwrap();
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
-
-        let cfg = WriteBufferConnection {
-            type_: "kafka".to_string(),
-            connection: conn,
-            creation_config: Some(WriteBufferCreationConfig::default()),
-            ..Default::default()
-        };
-
-        let conn = factory
-            .new_config_read(server_id, db_name.as_str(), None, &cfg)
-            .await
-            .unwrap();
-        assert_eq!(conn.type_name(), "kafka");
-    }
-
     #[tokio::test]
     async fn test_writing_mock() {
         let factory = factory();
@@ -297,7 +301,7 @@ mod tests {
         let mock_name = "some_mock";
         factory.register_mock(mock_name.to_string(), state);
 
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -333,7 +337,7 @@ mod tests {
         factory.register_mock(mock_name.to_string(), state);
 
         let server_id = ServerId::try_from(1).unwrap();
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
         let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -366,7 +370,7 @@ mod tests {
         let mock_name = "some_mock";
         factory.register_always_fail_mock(mock_name.to_string());
 
-        let db_name = DatabaseName::try_from(random_kafka_topic()).unwrap();
+        let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
        let cfg = WriteBufferConnection {
             type_: "mock".to_string(),
             connection: mock_name.to_string(),
@@ -444,4 +448,50 @@ mod tests {
         let registry = Arc::new(metric::Registry::new());
         WriteBufferConfigFactory::new(time, registry)
     }
+
+    #[cfg(feature = "kafka")]
+    mod kafka {
+        use super::*;
+        use crate::maybe_skip_kafka_integration;
+
+        #[tokio::test]
+        async fn test_writing_kafka() {
+            let conn = maybe_skip_kafka_integration!();
+            let factory = factory();
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                connection: conn,
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let conn = factory
+                .new_config_write(db_name.as_str(), &cfg)
+                .await
+                .unwrap();
+            assert_eq!(conn.type_name(), "kafka");
+        }
+
+        #[tokio::test]
+        async fn test_reading_kafka() {
+            let conn = maybe_skip_kafka_integration!();
+            let factory = factory();
+            let server_id = ServerId::try_from(1).unwrap();
+            let db_name = DatabaseName::try_from(random_topic_name()).unwrap();
+
+            let cfg = WriteBufferConnection {
+                type_: "kafka".to_string(),
+                connection: conn,
+                creation_config: Some(WriteBufferCreationConfig::default()),
+                ..Default::default()
+            };
+
+            let conn = factory
                .new_config_read(server_id, db_name.as_str(), None, &cfg)
+                .await
+                .unwrap();
+            assert_eq!(conn.type_name(), "kafka");
+        }
+    }
 }

write_buffer/src/core.rs

@@ -112,9 +112,15 @@ pub mod test_utils {
     use futures::{StreamExt, TryStreamExt};
     use time::{Time, TimeProvider};
     use trace::{ctx::SpanContext, RingBufferTraceCollector, TraceCollector};
+    use uuid::Uuid;
 
     use super::{WriteBufferError, WriteBufferReading, WriteBufferWriting};
 
+    /// Generated random topic name for testing.
+    pub fn random_topic_name() -> String {
+        format!("test_topic_{}", Uuid::new_v4())
+    }
+
     /// Adapter to make a concrete write buffer implementation work w/ [`perform_generic_tests`].
     #[async_trait]
     pub trait TestAdapter: Send + Sync {
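Moving `random_topic_name` into `core::test_utils` (it was `kafka::test_utils::random_kafka_topic`, removed below) lets the mock and file tests keep generating unique topic names without compiling rdkafka; the helper only needs the `uuid` crate.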

write_buffer/src/kafka.rs

@@ -748,7 +748,6 @@ pub mod test_utils {
     use std::{collections::BTreeMap, time::Duration};
 
     use rdkafka::admin::{AdminOptions, AlterConfig, ResourceSpecifier};
-    use uuid::Uuid;
 
     use super::admin_client;
 
@@ -829,11 +828,6 @@ pub mod test_utils {
         let result = results.pop().expect("just checked the vector length");
         result.unwrap();
     }
-
-    /// Generated random topic name for testing.
-    pub fn random_kafka_topic() -> String {
-        format!("test_topic_{}", Uuid::new_v4())
-    }
 }
 
 /// Kafka tests (only run when in integration test mode and kafka is running).
@@ -850,11 +844,11 @@ mod tests {
     use crate::codec::HEADER_CONTENT_TYPE;
     use crate::{
+        core::test_utils::random_topic_name,
         core::test_utils::{
             map_pop_first, perform_generic_tests, set_pop_first, write as write_to_writer,
             TestAdapter, TestContext,
        },
-        kafka::test_utils::random_kafka_topic,
         maybe_skip_kafka_integration,
     };
 
@@ -881,7 +875,7 @@ mod tests {
         ) -> Self::Context {
             KafkaTestContext {
                 conn: self.conn.clone(),
-                database_name: random_kafka_topic(),
+                database_name: random_topic_name(),
                 server_id_counter: AtomicU32::new(1),
                 n_sequencers,
                 time_provider,
@@ -964,7 +958,7 @@ mod tests {
     #[tokio::test]
     async fn topic_create_twice() {
         let conn = maybe_skip_kafka_integration!();
-        let database_name = random_kafka_topic();
+        let database_name = random_topic_name();
         create_kafka_topic(
             &conn,

write_buffer/src/lib.rs

@@ -12,5 +12,8 @@ pub(crate) mod codec;
 pub mod config;
 pub mod core;
 pub mod file;
+
+#[cfg(feature = "kafka")]
 pub mod kafka;
+
 pub mod mock;
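Gating the module declaration is what actually drops rdkafka from default builds: without the feature, `crate::kafka` simply does not exist, so every reference to it has to sit behind the same flag, as `config.rs` does above. A small sketch of the failure mode, with assumed names:

#[cfg(feature = "kafka")]
pub mod kafka {
    pub fn type_name() -> &'static str {
        "kafka"
    }
}

#[cfg(feature = "kafka")]
pub fn gated_ok() -> &'static str {
    kafka::type_name() // compiles: gated exactly like the module
}

// pub fn ungated_broken() -> &'static str { kafka::type_name() }
// ^ under a plain `cargo build` (no `kafka` feature) this fails with
//   error[E0433]: failed to resolve: use of undeclared crate or module `kafka`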