diff --git a/Cargo.lock b/Cargo.lock index d78dcedd8c..dc3c3c3be8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2686,6 +2686,18 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565" +[[package]] +name = "libz-sys" +version = "1.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.0.46" @@ -3104,6 +3116,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "object" version = "0.29.0" @@ -3520,6 +3553,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" + [[package]] name = "pprof" version = "0.10.1" @@ -3623,6 +3662,17 @@ dependencies = [ "syn", ] +[[package]] +name = "proc-macro-crate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eda0fc3b0fb7c975631757e14d9049da17374063edb6ebbcbc54d880d4fe94e9" +dependencies = [ + "once_cell", + "thiserror", + "toml", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -4017,6 +4067,35 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "rdkafka" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1de127f294f2dba488ed46760b129d5ecbeabbd337ccbf3739cb29d50db2161c" +dependencies = [ + "futures", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.2.0+1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e542c6863b04ce0fa0c5719bc6b7b348cf8dd21af1bb03c9db5f9805b2a6473" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "read_buffer" version = "0.1.0" @@ -5632,6 +5711,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "version_check" version = "0.9.4" @@ -5981,6 +6066,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project", "prost 0.11.0", + "rdkafka", "rskafka", "schema", "tempfile", diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index 5bab0fe55b..bfc9fcc7b1 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -22,6 +22,7 @@ observability_deps = { path = "../observability_deps" } parking_lot = "0.12" pin-project = "1.0" prost = "0.11" +rdkafka = "0.28.0" rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="3208e4742f08048bbab4e8fc4e0a775507fe3e66", default-features = false, features = ["compression-snappy", "transport-socks5"] } schema = { path = "../schema" } tokio = { version = "1.21", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] } diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs index 1d02e60cc9..a716a56ed5 100644 --- a/write_buffer/src/config.rs +++ b/write_buffer/src/config.rs @@ -1,7 +1,7 @@ use crate::{ core::{WriteBufferError, WriteBufferReading, WriteBufferWriting}, file::{FileBufferConsumer, FileBufferProducer}, - kafka::{RSKafkaConsumer, RSKafkaProducer}, + kafka::RSKafkaConsumer, mock::{ MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferSharedState, @@ -152,8 +152,8 @@ impl WriteBufferConfigFactory { pub async fn new_config_write( &self, db_name: &str, - partitions: Option>, - trace_collector: Option<&Arc>, + _partitions: Option>, + _trace_collector: Option<&Arc>, cfg: &WriteBufferConnection, ) -> Result, WriteBufferError> { let writer = match &cfg.type_[..] { @@ -168,20 +168,7 @@ impl WriteBufferConfigFactory { .await?; Arc::new(file_buffer) as _ } - "kafka" => { - let rskafa_buffer = RSKafkaProducer::new( - cfg.connection.clone(), - db_name.to_owned(), - &cfg.connection_config, - Arc::clone(&self.time_provider), - cfg.creation_config.as_ref(), - partitions, - trace_collector.map(Arc::clone), - &*self.metric_registry, - ) - .await?; - Arc::new(rskafa_buffer) as _ - } + "kafka" => self.kafka_buffer_producer(db_name, cfg).await?, "mock" => match self.get_mock(&cfg.connection)? { Mock::Normal(state) => { let mock_buffer = MockBufferForWriting::new( @@ -204,6 +191,24 @@ impl WriteBufferConfigFactory { Ok(writer) } + async fn kafka_buffer_producer( + &self, + db_name: &str, + cfg: &WriteBufferConnection, + ) -> Result, WriteBufferError> { + let kafka_buffer = crate::kafka::rdkafka::KafkaBufferProducer::new( + &cfg.connection, + db_name, + &cfg.connection_config, + cfg.creation_config.as_ref(), + Arc::clone(&self.time_provider), + &self.metric_registry, + ) + .await?; + + Ok(Arc::new(kafka_buffer) as _) + } + /// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`] pub async fn new_config_read( &self, diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index 04d08faa13..afbe16d82e 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -80,6 +80,15 @@ impl From for WriteBufferError { } } +impl From for WriteBufferError { + fn from(e: rdkafka::error::KafkaError) -> Self { + Self { + inner: Box::new(e), + kind: WriteBufferErrorKind::IO, + } + } +} + impl From for WriteBufferError { fn from(e: rskafka::client::error::Error) -> Self { Self { diff --git a/write_buffer/src/kafka/instrumentation.rs b/write_buffer/src/kafka/instrumentation.rs index 283dc281fd..4c29cc8ab7 100644 --- a/write_buffer/src/kafka/instrumentation.rs +++ b/write_buffer/src/kafka/instrumentation.rs @@ -36,6 +36,7 @@ pub struct KafkaProducerMetrics

{ impl KafkaProducerMetrics { /// Decorate the specified [`ProducerClient`] implementation with an /// instrumentation layer. + #[allow(dead_code)] pub fn new( client: Box, kafka_topic_name: String, diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index 99aad39d1f..c33f65c541 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -1,24 +1,17 @@ -use self::{ - config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig}, - instrumentation::KafkaProducerMetrics, - record_aggregator::RecordAggregator, -}; +use self::config::{ClientConfig, ConsumerConfig, TopicCreationConfig}; use crate::{ codec::IoxHeaders, config::WriteBufferCreationConfig, - core::{ - WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler, - WriteBufferWriting, - }, + core::{WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler}, }; use async_trait::async_trait; use data_types::{Sequence, SequenceNumber, ShardIndex}; -use dml::{DmlMeta, DmlOperation}; +use dml::DmlOperation; use futures::{ stream::{self, BoxStream}, StreamExt, TryStreamExt, }; -use iox_time::{Time, TimeProvider}; +use iox_time::Time; use observability_deps::tracing::warn; use parking_lot::Mutex; use rskafka::{ @@ -26,7 +19,6 @@ use rskafka::{ consumer::{StartOffset, StreamConsumerBuilder}, error::{Error as RSKafkaError, ProtocolError}, partition::{OffsetAt, PartitionClient, UnknownTopicHandling}, - producer::{BatchProducer, BatchProducerBuilder}, ClientBuilder, }, record::RecordAndOffset, @@ -43,112 +35,13 @@ use trace::TraceCollector; mod config; mod instrumentation; -mod record_aggregator; +pub(crate) mod rdkafka; /// Maximum number of jobs buffered and decoded concurrently. const CONCURRENT_DECODE_JOBS: usize = 10; type Result = std::result::Result; -#[derive(Debug)] -pub struct RSKafkaProducer { - producers: BTreeMap>, -} - -impl RSKafkaProducer { - #[allow(clippy::too_many_arguments)] - pub async fn new<'a>( - conn: String, - topic_name: String, - connection_config: &'a BTreeMap, - time_provider: Arc, - creation_config: Option<&'a WriteBufferCreationConfig>, - partitions: Option>, - _trace_collector: Option>, - metric_registry: &'a metric::Registry, - ) -> Result { - let partition_clients = setup_topic( - conn, - topic_name.clone(), - connection_config, - creation_config, - partitions, - ) - .await?; - - let producer_config = ProducerConfig::try_from(connection_config)?; - - let producers = partition_clients - .into_iter() - .map(|(shard_index, partition_client)| { - // Instrument this kafka partition client. - let partition_client = KafkaProducerMetrics::new( - Box::new(partition_client), - topic_name.clone(), - shard_index, - metric_registry, - ); - - let mut producer_builder = - BatchProducerBuilder::new_with_client(Arc::new(partition_client)); - if let Some(linger) = producer_config.linger { - producer_builder = producer_builder.with_linger(linger); - } - let producer = producer_builder.build(RecordAggregator::new( - shard_index, - producer_config.max_batch_size, - Arc::clone(&time_provider), - )); - - (shard_index, producer) - }) - .collect(); - - Ok(Self { producers }) - } -} - -#[async_trait] -impl WriteBufferWriting for RSKafkaProducer { - fn shard_indexes(&self) -> BTreeSet { - self.producers.keys().copied().collect() - } - - async fn store_operation( - &self, - shard_index: ShardIndex, - operation: DmlOperation, - ) -> Result { - // Sanity check to ensure only partitioned writes are pushed into Kafka. - if let DmlOperation::Write(w) = &operation { - assert!( - w.partition_key().is_some(), - "enqueuing unpartitioned write into kafka" - ) - } - - let producer = self - .producers - .get(&shard_index) - .ok_or_else::(|| { - format!("Unknown shard index: {}", shard_index).into() - })?; - - Ok(producer.produce(operation).await?) - } - - async fn flush(&self) -> Result<(), WriteBufferError> { - for producer in self.producers.values() { - producer.flush().await?; - } - Ok(()) - } - - fn type_name(&self) -> &'static str { - "kafka" - } -} - #[derive(Debug)] pub struct RSKafkaStreamHandler { partition_client: Arc, @@ -525,14 +418,17 @@ async fn setup_topic( mod tests { use super::*; use crate::{ - core::test_utils::{ - assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name, - set_pop_first, TestAdapter, TestContext, + core::{ + test_utils::{ + assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name, + set_pop_first, TestAdapter, TestContext, + }, + WriteBufferWriting, }, maybe_skip_kafka_integration, }; use data_types::{DeletePredicate, PartitionKey, TimestampRange}; - use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite}; + use dml::{test_util::assert_write_op_eq, DmlDelete, DmlMeta, DmlWrite}; use futures::{stream::FuturesUnordered, TryStreamExt}; use iox_time::TimeProvider; use rskafka::{client::partition::Compression, record::Record}; @@ -595,19 +491,17 @@ mod tests { #[async_trait] impl TestContext for RSKafkaTestContext { - type Writing = RSKafkaProducer; + type Writing = rdkafka::KafkaBufferProducer; type Reading = RSKafkaConsumer; async fn writing(&self, creation_config: bool) -> Result { - RSKafkaProducer::new( + rdkafka::KafkaBufferProducer::new( self.conn.clone(), self.topic_name.clone(), &BTreeMap::default(), - Arc::clone(&self.time_provider), self.creation_config(creation_config).as_ref(), - None, - Some(self.trace_collector() as Arc<_>), + Arc::clone(&self.time_provider), &self.metrics, ) .await @@ -850,9 +744,9 @@ mod tests { .unwrap(); } - async fn write( + async fn write( namespace: &str, - producer: &RSKafkaProducer, + producer: &T, trace_collector: &Arc, shard_index: ShardIndex, partition_key: impl Into + Send, @@ -869,9 +763,9 @@ mod tests { producer.store_operation(shard_index, op).await.unwrap() } - async fn delete( + async fn delete( namespace: &str, - producer: &RSKafkaProducer, + producer: &T, trace_collector: &Arc, shard_index: ShardIndex, ) -> DmlMeta { diff --git a/write_buffer/src/kafka/rdkafka.rs b/write_buffer/src/kafka/rdkafka.rs new file mode 100644 index 0000000000..33ca8a74f7 --- /dev/null +++ b/write_buffer/src/kafka/rdkafka.rs @@ -0,0 +1,404 @@ +use crate::{ + codec::{ContentType, IoxHeaders}, + core::{WriteBufferError, WriteBufferWriting}, + kafka::WriteBufferCreationConfig, +}; +use async_trait::async_trait; +use data_types::{Sequence, SequenceNumber, ShardIndex}; +use dml::{DmlMeta, DmlOperation}; +use iox_time::{Time, TimeProvider}; +use metric::{Attributes, DurationHistogram, Metric}; +use observability_deps::tracing::{debug, info}; +use rdkafka::{ + admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, + client::DefaultClientContext, + consumer::{BaseConsumer, Consumer}, + error::KafkaError, + message::{Headers, OwnedHeaders}, + producer::{FutureProducer, FutureRecord, Producer}, + types::RDKafkaErrorCode, + util::Timeout, + ClientConfig, +}; +use std::{ + collections::{BTreeMap, BTreeSet}, + num::NonZeroU32, + sync::Arc, + time::Duration, +}; + +/// Default timeout supplied to rdkafka client for kafka operations. +/// +/// Chosen to be a value less than the default gRPC timeout (30 +/// seconds) so we can detect kafka errors and return them prior to +/// the gRPC requests to IOx timing out. +/// +/// More context in +/// +const KAFKA_OPERATION_TIMEOUT_MS: u64 = 20_000; + +impl From<&IoxHeaders> for OwnedHeaders { + fn from(iox_headers: &IoxHeaders) -> Self { + let mut res = Self::new(); + + for (header, value) in iox_headers.headers() { + res = res.add(header, value.as_ref()); + } + + res + } +} + +pub struct KafkaBufferProducer { + conn: String, + database_name: String, + time_provider: Arc, + producer: Arc>, + partitions: BTreeSet, + enqueue: Metric, +} + +// Needed because rdkafka's FutureProducer doesn't impl Debug +impl std::fmt::Debug for KafkaBufferProducer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("KafkaBufferProducer") + .field("conn", &self.conn) + .field("database_name", &self.database_name) + .finish() + } +} + +#[async_trait] +impl WriteBufferWriting for KafkaBufferProducer { + fn shard_indexes(&self) -> BTreeSet { + self.partitions.clone() + } + + /// Send a [`DmlOperation`] to the write buffer using the specified shard index. + async fn store_operation( + &self, + shard_index: ShardIndex, + operation: DmlOperation, + ) -> Result { + // Sanity check to ensure only partitioned writes are pushed into Kafka. + if let DmlOperation::Write(w) = &operation { + assert!( + w.partition_key().is_some(), + "enqueuing unpartitioned write into kafka" + ) + } + + // Only send writes with known shard indexes to Kafka. + if !self.partitions.contains(&shard_index) { + return Err(format!("Unknown shard index: {}", shard_index).into()); + } + + let kafka_partition_id = shard_index.get(); + + let enqueue_start = self.time_provider.now(); + + // truncate milliseconds from timestamps because that's what Kafka supports + let now = operation + .meta() + .producer_ts() + .unwrap_or_else(|| self.time_provider.now()); + + let timestamp_millis = now.date_time().timestamp_millis(); + let timestamp = Time::from_timestamp_millis(timestamp_millis); + + let headers = IoxHeaders::new( + ContentType::Protobuf, + operation.meta().span_context().cloned(), + operation.namespace().to_string(), + ); + + let mut buf = Vec::new(); + crate::codec::encode_operation(&self.database_name, &operation, &mut buf)?; + + // This type annotation is necessary because `FutureRecord` is generic over key type, but + // key is optional and we're not setting a key. `String` is arbitrary. + let record: FutureRecord<'_, String, _> = FutureRecord::to(&self.database_name) + .payload(&buf) + .partition(kafka_partition_id) + .timestamp(timestamp_millis) + .headers((&headers).into()); + let kafka_write_size = estimate_message_size( + record.payload.map(|v| v.as_ref()), + record.key.map(|s| s.as_bytes()), + record.headers.as_ref(), + ); + + debug!(db_name=%self.database_name, kafka_partition_id, size=buf.len(), "writing to kafka"); + + let res = self.producer.send(record, Timeout::Never).await; + + if let Some(delta) = self + .time_provider + .now() + .checked_duration_since(enqueue_start) + { + let result_attr = match &res { + Ok(_) => "success", + Err(_) => "error", + }; + + let attr = Attributes::from([ + ("kafka_partition", shard_index.to_string().into()), + ("kafka_topic", self.database_name.clone().into()), + ("result", result_attr.into()), + ]); + + let recorder = self.enqueue.recorder(attr); + recorder.record(delta); + } + + let (partition, offset) = res.map_err(|(e, _owned_message)| e)?; + + debug!(db_name=%self.database_name, %offset, %partition, size=buf.len(), "wrote to kafka"); + + Ok(DmlMeta::sequenced( + Sequence::new(shard_index, SequenceNumber::new(offset)), + timestamp, + operation.meta().span_context().cloned(), + kafka_write_size, + )) + } + + async fn flush(&self) -> Result<(), WriteBufferError> { + let producer = Arc::clone(&self.producer); + + tokio::task::spawn_blocking(move || { + producer.flush(Timeout::Never); + }) + .await + .expect("subtask failed"); + + Ok(()) + } + + fn type_name(&self) -> &'static str { + "kafka" + } +} + +impl KafkaBufferProducer { + pub async fn new( + conn: impl Into + Send, + database_name: impl Into + Send, + connection_config: &BTreeMap, + creation_config: Option<&WriteBufferCreationConfig>, + time_provider: Arc, + metric_registry: &metric::Registry, + ) -> Result { + let conn = conn.into(); + let database_name = database_name.into(); + + let mut cfg = ClientConfig::new(); + + // these configs can be overwritten + cfg.set("message.timeout.ms", "5000"); + cfg.set("message.max.bytes", "31457280"); + cfg.set("message.send.max.retries", "10"); + cfg.set("queue.buffering.max.kbytes", "31457280"); + cfg.set("request.required.acks", "all"); // equivalent to acks=-1 + cfg.set("compression.type", "snappy"); + cfg.set("statistics.interval.ms", "15000"); + + // user overrides + for (k, v) in connection_config { + cfg.set(k, v); + } + + // these configs are set in stone + cfg.set("bootstrap.servers", &conn); + cfg.set("allow.auto.create.topics", "false"); + + // handle auto-creation + let partitions = + maybe_auto_create_topics(&conn, &database_name, creation_config, &cfg).await?; + + let producer = cfg.create()?; + + let enqueue = metric_registry.register_metric::( + "write_buffer_client_produce_duration", + "duration of time taken to push a set of records to kafka \ + - includes codec, protocol, and network overhead", + ); + + Ok(Self { + conn, + database_name, + time_provider, + producer: Arc::new(producer), + partitions, + enqueue, + }) + } +} + +/// Iterate over the kafka messages +fn header_iter(headers: Option<&H>) -> impl Iterator +where + H: Headers, +{ + headers + .into_iter() + .flat_map(|headers| (0..headers.count()).map(|idx| headers.get(idx).unwrap())) +} + +/// Estimate size of data read from kafka as payload len + key len + headers +fn estimate_message_size( + payload: Option<&[u8]>, + key: Option<&[u8]>, + headers: Option<&H>, +) -> usize +where + H: Headers, +{ + payload.map(|payload| payload.len()).unwrap_or_default() + + key.map(|key| key.len()).unwrap_or_default() + + header_iter(headers) + .map(|(key, value)| key.len() + value.len()) + .sum::() +} + +/// Get Kafka partition IDs (IOx ShardIndexes) for the database-specific Kafka topic. +/// +/// Will return `None` if the topic is unknown and has to be created. +/// +/// This will check that the partition is is non-empty. +async fn get_partitions( + database_name: &str, + cfg: &ClientConfig, +) -> Result>, WriteBufferError> { + let database_name = database_name.to_string(); + let cfg = cfg.clone(); + + let metadata = tokio::task::spawn_blocking(move || { + let probe_consumer: BaseConsumer = cfg.create()?; + + probe_consumer.fetch_metadata( + Some(&database_name), + Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS), + ) + }) + .await + .expect("subtask failed")?; + + let topic_metadata = metadata.topics().get(0).expect("requested a single topic"); + + match topic_metadata.error() { + None => { + let partitions: BTreeSet<_> = topic_metadata + .partitions() + .iter() + .map(|partition_metdata| ShardIndex::new(partition_metdata.id())) + .collect(); + + if partitions.is_empty() { + Err("Topic exists but has no partitions".to_string().into()) + } else { + Ok(Some(partitions)) + } + } + Some(error_code) => { + let error_code: RDKafkaErrorCode = error_code.into(); + match error_code { + RDKafkaErrorCode::UnknownTopic | RDKafkaErrorCode::UnknownTopicOrPartition => { + // The caller is responsible for creating the topic, so this is somewhat OK. + Ok(None) + } + _ => Err(KafkaError::MetadataFetch(error_code).into()), + } + } + } +} + +fn admin_client(kafka_connection: &str) -> Result, KafkaError> { + let mut cfg = ClientConfig::new(); + cfg.set("bootstrap.servers", kafka_connection); + cfg.set("message.timeout.ms", "5000"); + cfg.create() +} + +/// Create Kafka topic based on the provided configs. +/// +/// This will create a topic with `n_sequencers` Kafka partitions. +/// +/// This will NOT fail if the topic already exists! `maybe_auto_create_topics` will only call this +/// if there are no partitions. Production should always have partitions already created, so +/// `create_kafka_topic` shouldn't run in production and is only for test/dev environments. +async fn create_kafka_topic( + kafka_connection: &str, + database_name: &str, + n_sequencers: NonZeroU32, + cfg: &BTreeMap, +) -> Result<(), WriteBufferError> { + let admin = admin_client(kafka_connection)?; + + let mut topic = NewTopic::new( + database_name, + n_sequencers.get() as i32, + TopicReplication::Fixed(1), + ); + for (k, v) in cfg { + topic = topic.set(k, v); + } + + let opts = AdminOptions::default(); + let mut results = admin.create_topics([&topic], &opts).await?; + assert_eq!(results.len(), 1, "created exactly one topic"); + let result = results.pop().expect("just checked the vector length"); + match result { + Ok(topic) | Err((topic, RDKafkaErrorCode::TopicAlreadyExists)) => { + assert_eq!(topic, database_name); + Ok(()) + } + Err((topic, code)) => { + assert_eq!(topic, database_name); + Err(format!("Cannot create topic '{}': {}", topic, code).into()) + } + } +} + +/// If there are no Kafka partitions, then create a topic. Production should have Kafka partitions +/// created already, so this should only create a topic in test/dev environments. +async fn maybe_auto_create_topics( + kafka_connection: &str, + database_name: &str, + creation_config: Option<&WriteBufferCreationConfig>, + cfg: &ClientConfig, +) -> Result, WriteBufferError> { + const N_TRIES: usize = 10; + + for i in 0..N_TRIES { + if let Some(partitions) = get_partitions(database_name, cfg).await? { + return Ok(partitions); + } + + // debounce after first round + if i > 0 { + info!( + topic=%database_name, + "Topic does not have partitions after creating it, wait a bit and try again." + ); + tokio::time::sleep(Duration::from_millis(250)).await; + } + + if let Some(creation_config) = creation_config { + create_kafka_topic( + kafka_connection, + database_name, + creation_config.n_shards, + &creation_config.options, + ) + .await?; + } else { + return Err("no partitions found and auto-creation not requested" + .to_string() + .into()); + } + } + + Err(format!("Could not auto-create topic after {} tries.", N_TRIES).into()) +} diff --git a/write_buffer/src/kafka/record_aggregator.rs b/write_buffer/src/kafka/record_aggregator.rs deleted file mode 100644 index 23cc68aefd..0000000000 --- a/write_buffer/src/kafka/record_aggregator.rs +++ /dev/null @@ -1,324 +0,0 @@ -use std::sync::Arc; - -use data_types::{Sequence, SequenceNumber, ShardIndex}; -use dml::{DmlMeta, DmlOperation}; -use iox_time::{Time, TimeProvider}; -use observability_deps::tracing::warn; -use rskafka::{ - client::producer::aggregator::{ - Aggregator, Error, RecordAggregator as RecordAggregatorDelegate, - RecordAggregatorStatusDeaggregator, StatusDeaggregator, TryPush, - }, - record::Record, -}; -use trace::ctx::SpanContext; - -use crate::codec::{ContentType, IoxHeaders}; - -/// The [`Tag`] is a data-carrying token identifier used to de-aggregate -/// responses from a batch aggregated of requests using the -/// [`DmlMetaDeaggregator`]. -#[derive(Debug)] -pub struct Tag { - /// The tag into the batch returned by the - /// [`RecordAggregatorDelegate::try_push()`] call. - idx: usize, - - /// The timestamp assigned to the resulting Kafka [`Record`]. - timestamp: Time, - /// A span extracted from the original [`DmlOperation`]. - span_ctx: Option, - /// The approximate byte size of the serialised [`Record`], as calculated by - /// [`Record::approximate_size()`]. - approx_kafka_write_size: usize, -} - -/// A [`RecordAggregator`] implements [rskafka]'s abstract [`Aggregator`] -/// behaviour to provide batching of requests for a single Kafka partition. -/// -/// Specifically the [`RecordAggregator`] maps [`DmlOperation`] instances to -/// Kafka [`Record`] instances, and delegates the batching to the -/// [`RecordAggregatorDelegate`] implementation maintained within [rskafka] -/// itself. -/// -/// [rskafka]: https://github.com/influxdata/rskafka -#[derive(Debug)] -pub struct RecordAggregator { - time_provider: Arc, - - /// The shard index (Kafka partition number) this aggregator batches ops for (from Kafka, - /// not the catalog). - shard_index: ShardIndex, - - /// The underlying record aggregator the non-IOx-specific batching is - /// delegated to. - aggregator: RecordAggregatorDelegate, -} - -impl RecordAggregator { - /// Initialise a new [`RecordAggregator`] to aggregate up to - /// `max_batch_size` number of bytes per message. - pub fn new( - shard_index: ShardIndex, - max_batch_size: usize, - time_provider: Arc, - ) -> Self { - Self { - shard_index, - aggregator: RecordAggregatorDelegate::new(max_batch_size), - time_provider, - } - } -} - -impl RecordAggregator { - /// Serialise the [`DmlOperation`] destined for the specified `db_name` into a - /// [`Record`], returning the producer timestamp assigned to it. - fn to_record(&self, op: &DmlOperation) -> Result<(Record, Time), Error> { - let now = op - .meta() - .producer_ts() - .unwrap_or_else(|| self.time_provider.now()); - - let headers = IoxHeaders::new( - ContentType::Protobuf, - op.meta().span_context().cloned(), - op.namespace().to_owned(), - ); - - let mut buf = Vec::new(); - crate::codec::encode_operation(op.namespace(), op, &mut buf)?; - buf.shrink_to_fit(); - - let record = Record { - key: None, - value: Some(buf), - headers: headers - .headers() - .map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec())) - .collect(), - timestamp: now.date_time(), - }; - - Ok((record, now)) - } -} - -impl Aggregator for RecordAggregator { - type Input = DmlOperation; - type Tag = ::Tag; - type StatusDeaggregator = DmlMetaDeaggregator; - - /// Callers should retain the returned [`Tag`] in order to de-aggregate the - /// [`DmlMeta`] from the request response. - fn try_push(&mut self, op: Self::Input) -> Result, Error> { - // Encode the DML op to a Record - let (record, timestamp) = self.to_record(&op)?; - - // Capture various metadata necessary to construct the Tag/DmlMeta for - // the caller once a batch has been flushed. - let span_ctx = op.meta().span_context().cloned(); - let approx_kafka_write_size = record.approximate_size(); - - // And delegate batching to rskafka's RecordAggregator implementation - Ok(match self.aggregator.try_push(record)? { - // NoCapacity returns the original input to the caller when the - // batching fails. - // - // The RecordBatcher delegate is returning the Record encoded from - // op above, but the caller of this fn is expecting the original op. - // - // Map to the original input op this fn was called with, discarding - // the encoded Record. - TryPush::NoCapacity(_) => { - // Log a warning if this occurs - this allows an operator to - // increase the maximum Kafka message size, or lower the linger - // time to minimise latency while still producing large enough - // batches for it to be worth while. - warn!("aggregated batch reached maximum capacity"); - TryPush::NoCapacity(op) - } - - // A successful delegate aggregation returns the tag for offset - // de-aggregation later. For simplicity, the tag this layer returns - // also carries the various (small) metadata elements needed to - // construct the DmlMeta at the point of de-aggregation. - TryPush::Aggregated(idx) => TryPush::Aggregated(Tag { - idx, - timestamp, - span_ctx, - approx_kafka_write_size, - }), - }) - } - - fn flush(&mut self) -> Result<(Vec, Self::StatusDeaggregator), Error> { - let records = self.aggregator.flush()?.0; - Ok((records, DmlMetaDeaggregator::new(self.shard_index))) - } -} - -/// The de-aggregation half of the [`RecordAggregator`], this type consumes the -/// caller's [`Tag`] obtained from the aggregator to return the corresponding -/// [`DmlMeta`] from the batched response. -/// -/// The [`DmlMetaDeaggregator`] is a stateless wrapper over the (also stateless) -/// [`RecordAggregatorStatusDeaggregator`] delegate, with most of the metadata -/// elements carried in the [`Tag`] itself. -#[derive(Debug)] -pub struct DmlMetaDeaggregator { - shard_index: ShardIndex, -} - -impl DmlMetaDeaggregator { - pub fn new(shard_index: ShardIndex) -> Self { - Self { shard_index } - } -} - -impl StatusDeaggregator for DmlMetaDeaggregator { - type Status = DmlMeta; - type Tag = Tag; - - fn deaggregate(&self, input: &[i64], tag: Self::Tag) -> Result { - // Delegate de-aggregation to the (stateless) record batch - // de-aggregator for forwards compatibility. - let offset = RecordAggregatorStatusDeaggregator::default() - .deaggregate(input, tag.idx) - .expect("invalid de-aggregation index"); - - Ok(DmlMeta::sequenced( - Sequence::new(self.shard_index, SequenceNumber::new(offset)), - tag.timestamp, - tag.span_ctx, - tag.approx_kafka_write_size, - )) - } -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use dml::DmlWrite; - use hashbrown::HashMap; - use iox_time::MockProvider; - use mutable_batch::{writer::Writer, MutableBatch}; - use trace::LogTraceCollector; - - use crate::codec::{ - CONTENT_TYPE_PROTOBUF, HEADER_CONTENT_TYPE, HEADER_NAMESPACE, HEADER_TRACE_CONTEXT, - }; - - use super::*; - - const NAMESPACE: &str = "bananas"; - const SHARD_INDEX: ShardIndex = ShardIndex::new(42); - const TIMESTAMP_MILLIS: i64 = 1659990497000; - - fn test_op() -> DmlOperation { - let mut batch = MutableBatch::new(); - let mut writer = Writer::new(&mut batch, 1); - writer - // Date: "1970-01-01" - .write_time("time", [42].into_iter()) - .unwrap(); - writer - .write_i64("A", Some(&[0b00000001]), [1].into_iter()) - .unwrap(); - writer.commit(); - - let mut m = HashMap::default(); - m.insert("table".to_string(), batch); - - let span = SpanContext::new(Arc::new(LogTraceCollector::new())); - - DmlOperation::Write(DmlWrite::new( - NAMESPACE.to_string(), - m, - Some("1970-01-01".into()), - DmlMeta::unsequenced(Some(span)), - )) - } - - #[test] - fn test_record_aggregate() { - let clock = Arc::new(MockProvider::new(Time::from_timestamp_millis( - TIMESTAMP_MILLIS, - ))); - let mut agg = RecordAggregator::new(SHARD_INDEX, usize::MAX, clock); - let write = test_op(); - - let res = agg.try_push(write).expect("aggregate call should succeed"); - let tag = match res { - TryPush::NoCapacity(_) => panic!("unexpected no capacity"), - TryPush::Aggregated(tag) => tag, - }; - - // Flush the aggregator to acquire the records - let (records, deagg) = agg.flush().expect("should flush"); - assert_eq!(records.len(), 1); - - // Another flush should not yield the same records - let (records2, _) = agg.flush().expect("should flush"); - assert!(records2.is_empty()); - - // Assert properties of the resulting record - let record = records[0].clone(); - assert_eq!(record.key, None); - assert!(record.value.is_some()); - assert_eq!( - *record - .headers - .get(HEADER_CONTENT_TYPE) - .expect("no content type"), - Vec::::from(CONTENT_TYPE_PROTOBUF), - ); - assert_eq!( - *record - .headers - .get(HEADER_NAMESPACE) - .expect("no namespace header"), - Vec::::from(NAMESPACE), - ); - assert!(record.headers.get(HEADER_TRACE_CONTEXT).is_some()); - assert_eq!(record.timestamp.timestamp(), 1659990497); - - // Extract the DmlMeta from the de-aggregator - let got = deagg - .deaggregate(&[4242], tag) - .expect("de-aggregate should succeed"); - - // Assert the metadata properties - assert!(got.span_context().is_some()); - assert_eq!( - *got.sequence().expect("should be sequenced"), - Sequence::new(SHARD_INDEX, SequenceNumber::new(4242)) - ); - assert_eq!( - got.producer_ts().expect("no producer timestamp"), - Time::from_timestamp_millis(TIMESTAMP_MILLIS) - ); - assert_eq!( - got.bytes_read().expect("no approx size"), - record.approximate_size() - ); - } - - #[test] - fn test_record_aggregate_no_capacity() { - let clock = Arc::new(MockProvider::new(Time::from_timestamp_millis( - TIMESTAMP_MILLIS, - ))); - let mut agg = RecordAggregator::new(SHARD_INDEX, usize::MIN, clock); - let write = test_op(); - - let res = agg - .try_push(write.clone()) - .expect("aggregate call should succeed"); - match res { - TryPush::NoCapacity(res) => assert_eq!(res.namespace(), write.namespace()), - TryPush::Aggregated(_) => panic!("expected no capacity"), - }; - } -}