diff --git a/Cargo.lock b/Cargo.lock index 8e1a043bc6..83eb93002d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2858,18 +2858,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565" -[[package]] -name = "libz-sys" -version = "1.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf" -dependencies = [ - "cc", - "libc", - "pkg-config", - "vcpkg", -] - [[package]] name = "link-cplusplus" version = "1.0.7" @@ -3297,27 +3285,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_enum" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" -dependencies = [ - "num_enum_derive", -] - -[[package]] -name = "num_enum_derive" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" -dependencies = [ - "proc-macro-crate", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "object" version = "0.29.0" @@ -3843,17 +3810,6 @@ dependencies = [ "syn", ] -[[package]] -name = "proc-macro-crate" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eda0fc3b0fb7c975631757e14d9049da17374063edb6ebbcbc54d880d4fe94e9" -dependencies = [ - "once_cell", - "thiserror", - "toml", -] - [[package]] name = "proc-macro-error" version = "1.0.4" @@ -4248,35 +4204,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "rdkafka" -version = "0.28.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1de127f294f2dba488ed46760b129d5ecbeabbd337ccbf3739cb29d50db2161c" -dependencies = [ - "futures", - "libc", - "log", - "rdkafka-sys", - "serde", - "serde_derive", - "serde_json", - "slab", - "tokio", -] - -[[package]] -name = "rdkafka-sys" -version = "4.2.0+1.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e542c6863b04ce0fa0c5719bc6b7b348cf8dd21af1bb03c9db5f9805b2a6473" -dependencies = [ - "libc", - "libz-sys", - "num_enum", - "pkg-config", -] - [[package]] name = "read_buffer" version = "0.1.0" @@ -5891,12 +5818,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" -[[package]] -name = "vcpkg" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" - [[package]] name = "version_check" version = "0.9.4" @@ -6248,7 +6169,6 @@ dependencies = [ "parking_lot 0.12.1", "pin-project", "prost 0.11.0", - "rdkafka", "rskafka", "schema", "tempfile", diff --git a/clap_blocks/src/write_buffer.rs b/clap_blocks/src/write_buffer.rs index 9747ec85a3..d2bfdbb714 100644 --- a/clap_blocks/src/write_buffer.rs +++ b/clap_blocks/src/write_buffer.rs @@ -202,40 +202,4 @@ mod tests { ]); assert_eq!(actual, expected); } - - #[test] - fn test_k8s_idpe_connection_config() { - let cfg = WriteBufferConfig::try_parse_from([ - "my_binary", - "--write-buffer", - "kafka", - "--write-buffer-addr", - "localhost:1234", - "--write-buffer-connection-config", - "max_message_size=10485760,\ - producer_max_batch_size=2621440,\ - consumer_min_batch_size=1048576,\ - consumer_max_batch_size=5242880,\ - consumer_max_wait_ms=10", - ]) - .unwrap(); - let actual = cfg.connection_config(); - let expected = BTreeMap::from([ - ( - String::from("consumer_max_batch_size"), - String::from("5242880"), - ), - (String::from("consumer_max_wait_ms"), String::from("10")), - ( - String::from("consumer_min_batch_size"), - String::from("1048576"), - ), - (String::from("max_message_size"), String::from("10485760")), - ( - String::from("producer_max_batch_size"), - String::from("2621440"), - ), - ]); - assert_eq!(actual, expected); - } } diff --git a/write_buffer/Cargo.toml b/write_buffer/Cargo.toml index bfc9fcc7b1..5bab0fe55b 100644 --- a/write_buffer/Cargo.toml +++ b/write_buffer/Cargo.toml @@ -22,7 +22,6 @@ observability_deps = { path = "../observability_deps" } parking_lot = "0.12" pin-project = "1.0" prost = "0.11" -rdkafka = "0.28.0" rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="3208e4742f08048bbab4e8fc4e0a775507fe3e66", default-features = false, features = ["compression-snappy", "transport-socks5"] } schema = { path = "../schema" } tokio = { version = "1.21", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] } diff --git a/write_buffer/src/config.rs b/write_buffer/src/config.rs index a716a56ed5..1d02e60cc9 100644 --- a/write_buffer/src/config.rs +++ b/write_buffer/src/config.rs @@ -1,7 +1,7 @@ use crate::{ core::{WriteBufferError, WriteBufferReading, WriteBufferWriting}, file::{FileBufferConsumer, FileBufferProducer}, - kafka::RSKafkaConsumer, + kafka::{RSKafkaConsumer, RSKafkaProducer}, mock::{ MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting, MockBufferForWritingThatAlwaysErrors, MockBufferSharedState, @@ -152,8 +152,8 @@ impl WriteBufferConfigFactory { pub async fn new_config_write( &self, db_name: &str, - _partitions: Option>, - _trace_collector: Option<&Arc>, + partitions: Option>, + trace_collector: Option<&Arc>, cfg: &WriteBufferConnection, ) -> Result, WriteBufferError> { let writer = match &cfg.type_[..] { @@ -168,7 +168,20 @@ impl WriteBufferConfigFactory { .await?; Arc::new(file_buffer) as _ } - "kafka" => self.kafka_buffer_producer(db_name, cfg).await?, + "kafka" => { + let rskafa_buffer = RSKafkaProducer::new( + cfg.connection.clone(), + db_name.to_owned(), + &cfg.connection_config, + Arc::clone(&self.time_provider), + cfg.creation_config.as_ref(), + partitions, + trace_collector.map(Arc::clone), + &*self.metric_registry, + ) + .await?; + Arc::new(rskafa_buffer) as _ + } "mock" => match self.get_mock(&cfg.connection)? { Mock::Normal(state) => { let mock_buffer = MockBufferForWriting::new( @@ -191,24 +204,6 @@ impl WriteBufferConfigFactory { Ok(writer) } - async fn kafka_buffer_producer( - &self, - db_name: &str, - cfg: &WriteBufferConnection, - ) -> Result, WriteBufferError> { - let kafka_buffer = crate::kafka::rdkafka::KafkaBufferProducer::new( - &cfg.connection, - db_name, - &cfg.connection_config, - cfg.creation_config.as_ref(), - Arc::clone(&self.time_provider), - &self.metric_registry, - ) - .await?; - - Ok(Arc::new(kafka_buffer) as _) - } - /// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`] pub async fn new_config_read( &self, diff --git a/write_buffer/src/core.rs b/write_buffer/src/core.rs index afbe16d82e..04d08faa13 100644 --- a/write_buffer/src/core.rs +++ b/write_buffer/src/core.rs @@ -80,15 +80,6 @@ impl From for WriteBufferError { } } -impl From for WriteBufferError { - fn from(e: rdkafka::error::KafkaError) -> Self { - Self { - inner: Box::new(e), - kind: WriteBufferErrorKind::IO, - } - } -} - impl From for WriteBufferError { fn from(e: rskafka::client::error::Error) -> Self { Self { diff --git a/write_buffer/src/kafka/instrumentation.rs b/write_buffer/src/kafka/instrumentation.rs index 4c29cc8ab7..283dc281fd 100644 --- a/write_buffer/src/kafka/instrumentation.rs +++ b/write_buffer/src/kafka/instrumentation.rs @@ -36,7 +36,6 @@ pub struct KafkaProducerMetrics

{ impl KafkaProducerMetrics { /// Decorate the specified [`ProducerClient`] implementation with an /// instrumentation layer. - #[allow(dead_code)] pub fn new( client: Box, kafka_topic_name: String, diff --git a/write_buffer/src/kafka/mod.rs b/write_buffer/src/kafka/mod.rs index c33f65c541..99aad39d1f 100644 --- a/write_buffer/src/kafka/mod.rs +++ b/write_buffer/src/kafka/mod.rs @@ -1,17 +1,24 @@ -use self::config::{ClientConfig, ConsumerConfig, TopicCreationConfig}; +use self::{ + config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig}, + instrumentation::KafkaProducerMetrics, + record_aggregator::RecordAggregator, +}; use crate::{ codec::IoxHeaders, config::WriteBufferCreationConfig, - core::{WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler}, + core::{ + WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler, + WriteBufferWriting, + }, }; use async_trait::async_trait; use data_types::{Sequence, SequenceNumber, ShardIndex}; -use dml::DmlOperation; +use dml::{DmlMeta, DmlOperation}; use futures::{ stream::{self, BoxStream}, StreamExt, TryStreamExt, }; -use iox_time::Time; +use iox_time::{Time, TimeProvider}; use observability_deps::tracing::warn; use parking_lot::Mutex; use rskafka::{ @@ -19,6 +26,7 @@ use rskafka::{ consumer::{StartOffset, StreamConsumerBuilder}, error::{Error as RSKafkaError, ProtocolError}, partition::{OffsetAt, PartitionClient, UnknownTopicHandling}, + producer::{BatchProducer, BatchProducerBuilder}, ClientBuilder, }, record::RecordAndOffset, @@ -35,13 +43,112 @@ use trace::TraceCollector; mod config; mod instrumentation; -pub(crate) mod rdkafka; +mod record_aggregator; /// Maximum number of jobs buffered and decoded concurrently. const CONCURRENT_DECODE_JOBS: usize = 10; type Result = std::result::Result; +#[derive(Debug)] +pub struct RSKafkaProducer { + producers: BTreeMap>, +} + +impl RSKafkaProducer { + #[allow(clippy::too_many_arguments)] + pub async fn new<'a>( + conn: String, + topic_name: String, + connection_config: &'a BTreeMap, + time_provider: Arc, + creation_config: Option<&'a WriteBufferCreationConfig>, + partitions: Option>, + _trace_collector: Option>, + metric_registry: &'a metric::Registry, + ) -> Result { + let partition_clients = setup_topic( + conn, + topic_name.clone(), + connection_config, + creation_config, + partitions, + ) + .await?; + + let producer_config = ProducerConfig::try_from(connection_config)?; + + let producers = partition_clients + .into_iter() + .map(|(shard_index, partition_client)| { + // Instrument this kafka partition client. + let partition_client = KafkaProducerMetrics::new( + Box::new(partition_client), + topic_name.clone(), + shard_index, + metric_registry, + ); + + let mut producer_builder = + BatchProducerBuilder::new_with_client(Arc::new(partition_client)); + if let Some(linger) = producer_config.linger { + producer_builder = producer_builder.with_linger(linger); + } + let producer = producer_builder.build(RecordAggregator::new( + shard_index, + producer_config.max_batch_size, + Arc::clone(&time_provider), + )); + + (shard_index, producer) + }) + .collect(); + + Ok(Self { producers }) + } +} + +#[async_trait] +impl WriteBufferWriting for RSKafkaProducer { + fn shard_indexes(&self) -> BTreeSet { + self.producers.keys().copied().collect() + } + + async fn store_operation( + &self, + shard_index: ShardIndex, + operation: DmlOperation, + ) -> Result { + // Sanity check to ensure only partitioned writes are pushed into Kafka. + if let DmlOperation::Write(w) = &operation { + assert!( + w.partition_key().is_some(), + "enqueuing unpartitioned write into kafka" + ) + } + + let producer = self + .producers + .get(&shard_index) + .ok_or_else::(|| { + format!("Unknown shard index: {}", shard_index).into() + })?; + + Ok(producer.produce(operation).await?) + } + + async fn flush(&self) -> Result<(), WriteBufferError> { + for producer in self.producers.values() { + producer.flush().await?; + } + Ok(()) + } + + fn type_name(&self) -> &'static str { + "kafka" + } +} + #[derive(Debug)] pub struct RSKafkaStreamHandler { partition_client: Arc, @@ -418,17 +525,14 @@ async fn setup_topic( mod tests { use super::*; use crate::{ - core::{ - test_utils::{ - assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name, - set_pop_first, TestAdapter, TestContext, - }, - WriteBufferWriting, + core::test_utils::{ + assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name, + set_pop_first, TestAdapter, TestContext, }, maybe_skip_kafka_integration, }; use data_types::{DeletePredicate, PartitionKey, TimestampRange}; - use dml::{test_util::assert_write_op_eq, DmlDelete, DmlMeta, DmlWrite}; + use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite}; use futures::{stream::FuturesUnordered, TryStreamExt}; use iox_time::TimeProvider; use rskafka::{client::partition::Compression, record::Record}; @@ -491,17 +595,19 @@ mod tests { #[async_trait] impl TestContext for RSKafkaTestContext { - type Writing = rdkafka::KafkaBufferProducer; + type Writing = RSKafkaProducer; type Reading = RSKafkaConsumer; async fn writing(&self, creation_config: bool) -> Result { - rdkafka::KafkaBufferProducer::new( + RSKafkaProducer::new( self.conn.clone(), self.topic_name.clone(), &BTreeMap::default(), - self.creation_config(creation_config).as_ref(), Arc::clone(&self.time_provider), + self.creation_config(creation_config).as_ref(), + None, + Some(self.trace_collector() as Arc<_>), &self.metrics, ) .await @@ -744,9 +850,9 @@ mod tests { .unwrap(); } - async fn write( + async fn write( namespace: &str, - producer: &T, + producer: &RSKafkaProducer, trace_collector: &Arc, shard_index: ShardIndex, partition_key: impl Into + Send, @@ -763,9 +869,9 @@ mod tests { producer.store_operation(shard_index, op).await.unwrap() } - async fn delete( + async fn delete( namespace: &str, - producer: &T, + producer: &RSKafkaProducer, trace_collector: &Arc, shard_index: ShardIndex, ) -> DmlMeta { diff --git a/write_buffer/src/kafka/rdkafka.rs b/write_buffer/src/kafka/rdkafka.rs deleted file mode 100644 index 9e04d2fc12..0000000000 --- a/write_buffer/src/kafka/rdkafka.rs +++ /dev/null @@ -1,452 +0,0 @@ -use crate::{ - codec::{ContentType, IoxHeaders}, - core::{WriteBufferError, WriteBufferWriting}, - kafka::WriteBufferCreationConfig, -}; -use async_trait::async_trait; -use data_types::{Sequence, SequenceNumber, ShardIndex}; -use dml::{DmlMeta, DmlOperation}; -use iox_time::{Time, TimeProvider}; -use metric::{Attributes, DurationHistogram, Metric}; -use observability_deps::tracing::{debug, info}; -use rdkafka::{ - admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}, - client::DefaultClientContext, - consumer::{BaseConsumer, Consumer}, - error::KafkaError, - message::{Headers, OwnedHeaders}, - producer::{FutureProducer, FutureRecord, Producer}, - types::RDKafkaErrorCode, - util::Timeout, - ClientConfig, -}; -use std::{ - collections::{BTreeMap, BTreeSet}, - num::NonZeroU32, - sync::Arc, - time::Duration, -}; - -/// Default timeout supplied to rdkafka client for kafka operations. -/// -/// Chosen to be a value less than the default gRPC timeout (30 -/// seconds) so we can detect kafka errors and return them prior to -/// the gRPC requests to IOx timing out. -/// -/// More context in -/// -const KAFKA_OPERATION_TIMEOUT_MS: u64 = 20_000; - -impl From<&IoxHeaders> for OwnedHeaders { - fn from(iox_headers: &IoxHeaders) -> Self { - let mut res = Self::new(); - - for (header, value) in iox_headers.headers() { - res = res.add(header, value.as_ref()); - } - - res - } -} - -pub struct KafkaBufferProducer { - conn: String, - database_name: String, - time_provider: Arc, - producer: Arc>, - partitions: BTreeSet, - enqueue: Metric, -} - -// Needed because rdkafka's FutureProducer doesn't impl Debug -impl std::fmt::Debug for KafkaBufferProducer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("KafkaBufferProducer") - .field("conn", &self.conn) - .field("database_name", &self.database_name) - .finish() - } -} - -#[async_trait] -impl WriteBufferWriting for KafkaBufferProducer { - fn shard_indexes(&self) -> BTreeSet { - self.partitions.clone() - } - - /// Send a [`DmlOperation`] to the write buffer using the specified shard index. - async fn store_operation( - &self, - shard_index: ShardIndex, - operation: DmlOperation, - ) -> Result { - // Sanity check to ensure only partitioned writes are pushed into Kafka. - if let DmlOperation::Write(w) = &operation { - assert!( - w.partition_key().is_some(), - "enqueuing unpartitioned write into kafka" - ) - } - - // Only send writes with known shard indexes to Kafka. - if !self.partitions.contains(&shard_index) { - return Err(format!("Unknown shard index: {}", shard_index).into()); - } - - let kafka_partition_id = shard_index.get(); - - let enqueue_start = self.time_provider.now(); - - // truncate milliseconds from timestamps because that's what Kafka supports - let now = operation - .meta() - .producer_ts() - .unwrap_or_else(|| self.time_provider.now()); - - let timestamp_millis = now.date_time().timestamp_millis(); - let timestamp = Time::from_timestamp_millis(timestamp_millis); - - let headers = IoxHeaders::new( - ContentType::Protobuf, - operation.meta().span_context().cloned(), - operation.namespace().to_string(), - ); - - let mut buf = Vec::new(); - crate::codec::encode_operation(&self.database_name, &operation, &mut buf)?; - - // This type annotation is necessary because `FutureRecord` is generic over key type, but - // key is optional and we're not setting a key. `String` is arbitrary. - let record: FutureRecord<'_, String, _> = FutureRecord::to(&self.database_name) - .payload(&buf) - .partition(kafka_partition_id) - .timestamp(timestamp_millis) - .headers((&headers).into()); - let kafka_write_size = estimate_message_size( - record.payload.map(|v| v.as_ref()), - record.key.map(|s| s.as_bytes()), - record.headers.as_ref(), - ); - - debug!(db_name=%self.database_name, kafka_partition_id, size=buf.len(), "writing to kafka"); - - let res = self.producer.send(record, Timeout::Never).await; - - if let Some(delta) = self - .time_provider - .now() - .checked_duration_since(enqueue_start) - { - let result_attr = match &res { - Ok(_) => "success", - Err(_) => "error", - }; - - let attr = Attributes::from([ - ("kafka_partition", shard_index.to_string().into()), - ("kafka_topic", self.database_name.clone().into()), - ("result", result_attr.into()), - ]); - - let recorder = self.enqueue.recorder(attr); - recorder.record(delta); - } - - let (partition, offset) = res.map_err(|(e, _owned_message)| e)?; - - debug!(db_name=%self.database_name, %offset, %partition, size=buf.len(), "wrote to kafka"); - - Ok(DmlMeta::sequenced( - Sequence::new(shard_index, SequenceNumber::new(offset)), - timestamp, - operation.meta().span_context().cloned(), - kafka_write_size, - )) - } - - async fn flush(&self) -> Result<(), WriteBufferError> { - let producer = Arc::clone(&self.producer); - - tokio::task::spawn_blocking(move || { - producer.flush(Timeout::Never); - }) - .await - .expect("subtask failed"); - - Ok(()) - } - - fn type_name(&self) -> &'static str { - "kafka" - } -} - -impl KafkaBufferProducer { - pub async fn new( - conn: impl Into + Send, - database_name: impl Into + Send, - connection_config: &BTreeMap, - creation_config: Option<&WriteBufferCreationConfig>, - time_provider: Arc, - metric_registry: &metric::Registry, - ) -> Result { - let conn = conn.into(); - let database_name = database_name.into(); - - let mut cfg = ClientConfig::new(); - - // these configs can be overwritten - cfg.set("message.timeout.ms", "5000"); - cfg.set("message.max.bytes", "31457280"); - cfg.set("message.send.max.retries", "10"); - cfg.set("queue.buffering.max.kbytes", "31457280"); - cfg.set("request.required.acks", "all"); // equivalent to acks=-1 - cfg.set("compression.type", "snappy"); - cfg.set("statistics.interval.ms", "15000"); - - // user overrides - if let Some(max_batch_size) = connection_config.get("producer_max_batch_size") { - cfg.set("batch.size", max_batch_size); - } - if let Some(linger) = connection_config.get("producer_linger_ms") { - cfg.set("linger.ms", linger); - } - if let Some(max_message_size) = connection_config.get("max_message_size") { - cfg.set("message.max.bytes", max_message_size); - } - - // these configs are set in stone - cfg.set("bootstrap.servers", &conn); - cfg.set("allow.auto.create.topics", "false"); - - // handle auto-creation - let partitions = - maybe_auto_create_topics(&conn, &database_name, creation_config, &cfg).await?; - - let producer = cfg.create()?; - - let enqueue = metric_registry.register_metric::( - "write_buffer_client_produce_duration", - "duration of time taken to push a set of records to kafka \ - - includes codec, protocol, and network overhead", - ); - - Ok(Self { - conn, - database_name, - time_provider, - producer: Arc::new(producer), - partitions, - enqueue, - }) - } -} - -/// Iterate over the kafka messages -fn header_iter(headers: Option<&H>) -> impl Iterator -where - H: Headers, -{ - headers - .into_iter() - .flat_map(|headers| (0..headers.count()).map(|idx| headers.get(idx).unwrap())) -} - -/// Estimate size of data read from kafka as payload len + key len + headers -fn estimate_message_size( - payload: Option<&[u8]>, - key: Option<&[u8]>, - headers: Option<&H>, -) -> usize -where - H: Headers, -{ - payload.map(|payload| payload.len()).unwrap_or_default() - + key.map(|key| key.len()).unwrap_or_default() - + header_iter(headers) - .map(|(key, value)| key.len() + value.len()) - .sum::() -} - -/// Get Kafka partition IDs (IOx ShardIndexes) for the database-specific Kafka topic. -/// -/// Will return `None` if the topic is unknown and has to be created. -/// -/// This will check that the partition is is non-empty. -async fn get_partitions( - database_name: &str, - cfg: &ClientConfig, -) -> Result>, WriteBufferError> { - let database_name = database_name.to_string(); - let cfg = cfg.clone(); - - let metadata = tokio::task::spawn_blocking(move || { - let probe_consumer: BaseConsumer = cfg.create()?; - - probe_consumer.fetch_metadata( - Some(&database_name), - Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS), - ) - }) - .await - .expect("subtask failed")?; - - let topic_metadata = metadata.topics().get(0).expect("requested a single topic"); - - match topic_metadata.error() { - None => { - let partitions: BTreeSet<_> = topic_metadata - .partitions() - .iter() - .map(|partition_metdata| ShardIndex::new(partition_metdata.id())) - .collect(); - - if partitions.is_empty() { - Err("Topic exists but has no partitions".to_string().into()) - } else { - Ok(Some(partitions)) - } - } - Some(error_code) => { - let error_code: RDKafkaErrorCode = error_code.into(); - match error_code { - RDKafkaErrorCode::UnknownTopic | RDKafkaErrorCode::UnknownTopicOrPartition => { - // The caller is responsible for creating the topic, so this is somewhat OK. - Ok(None) - } - _ => Err(KafkaError::MetadataFetch(error_code).into()), - } - } - } -} - -fn admin_client(kafka_connection: &str) -> Result, KafkaError> { - let mut cfg = ClientConfig::new(); - cfg.set("bootstrap.servers", kafka_connection); - cfg.set("message.timeout.ms", "5000"); - cfg.create() -} - -/// Create Kafka topic based on the provided configs. -/// -/// This will create a topic with `n_sequencers` Kafka partitions. -/// -/// This will NOT fail if the topic already exists! `maybe_auto_create_topics` will only call this -/// if there are no partitions. Production should always have partitions already created, so -/// `create_kafka_topic` shouldn't run in production and is only for test/dev environments. -async fn create_kafka_topic( - kafka_connection: &str, - database_name: &str, - n_sequencers: NonZeroU32, - cfg: &BTreeMap, -) -> Result<(), WriteBufferError> { - let admin = admin_client(kafka_connection)?; - - let mut topic = NewTopic::new( - database_name, - n_sequencers.get() as i32, - TopicReplication::Fixed(1), - ); - for (k, v) in cfg { - topic = topic.set(k, v); - } - - let opts = AdminOptions::default(); - let mut results = admin.create_topics([&topic], &opts).await?; - assert_eq!(results.len(), 1, "created exactly one topic"); - let result = results.pop().expect("just checked the vector length"); - match result { - Ok(topic) | Err((topic, RDKafkaErrorCode::TopicAlreadyExists)) => { - assert_eq!(topic, database_name); - Ok(()) - } - Err((topic, code)) => { - assert_eq!(topic, database_name); - Err(format!("Cannot create topic '{}': {}", topic, code).into()) - } - } -} - -/// If there are no Kafka partitions, then create a topic. Production should have Kafka partitions -/// created already, so this should only create a topic in test/dev environments. -async fn maybe_auto_create_topics( - kafka_connection: &str, - database_name: &str, - creation_config: Option<&WriteBufferCreationConfig>, - cfg: &ClientConfig, -) -> Result, WriteBufferError> { - const N_TRIES: usize = 10; - - for i in 0..N_TRIES { - if let Some(partitions) = get_partitions(database_name, cfg).await? { - return Ok(partitions); - } - - // debounce after first round - if i > 0 { - info!( - topic=%database_name, - "Topic does not have partitions after creating it, wait a bit and try again." - ); - tokio::time::sleep(Duration::from_millis(250)).await; - } - - if let Some(creation_config) = creation_config { - create_kafka_topic( - kafka_connection, - database_name, - creation_config.n_shards, - &creation_config.options, - ) - .await?; - } else { - return Err("no partitions found and auto-creation not requested" - .to_string() - .into()); - } - } - - Err(format!("Could not auto-create topic after {} tries.", N_TRIES).into()) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::maybe_skip_kafka_integration; - use iox_time::MockProvider; - - #[tokio::test] - async fn can_parse_write_buffer_connection_config() { - let conn = maybe_skip_kafka_integration!(); - - let connection_config = BTreeMap::from([ - ( - String::from("consumer_max_batch_size"), - String::from("5242880"), - ), - (String::from("consumer_max_wait_ms"), String::from("10")), - ( - String::from("consumer_min_batch_size"), - String::from("1048576"), - ), - (String::from("max_message_size"), String::from("10485760")), - ( - String::from("producer_max_batch_size"), - String::from("2621440"), - ), - ]); - let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0))); - let metric_registry = Arc::new(metric::Registry::new()); - - KafkaBufferProducer::new( - conn, - "my_db", - &connection_config, - Some(&WriteBufferCreationConfig::default()), - time_provider, - &metric_registry, - ) - .await - .unwrap(); - } -} diff --git a/write_buffer/src/kafka/record_aggregator.rs b/write_buffer/src/kafka/record_aggregator.rs new file mode 100644 index 0000000000..23cc68aefd --- /dev/null +++ b/write_buffer/src/kafka/record_aggregator.rs @@ -0,0 +1,324 @@ +use std::sync::Arc; + +use data_types::{Sequence, SequenceNumber, ShardIndex}; +use dml::{DmlMeta, DmlOperation}; +use iox_time::{Time, TimeProvider}; +use observability_deps::tracing::warn; +use rskafka::{ + client::producer::aggregator::{ + Aggregator, Error, RecordAggregator as RecordAggregatorDelegate, + RecordAggregatorStatusDeaggregator, StatusDeaggregator, TryPush, + }, + record::Record, +}; +use trace::ctx::SpanContext; + +use crate::codec::{ContentType, IoxHeaders}; + +/// The [`Tag`] is a data-carrying token identifier used to de-aggregate +/// responses from a batch aggregated of requests using the +/// [`DmlMetaDeaggregator`]. +#[derive(Debug)] +pub struct Tag { + /// The tag into the batch returned by the + /// [`RecordAggregatorDelegate::try_push()`] call. + idx: usize, + + /// The timestamp assigned to the resulting Kafka [`Record`]. + timestamp: Time, + /// A span extracted from the original [`DmlOperation`]. + span_ctx: Option, + /// The approximate byte size of the serialised [`Record`], as calculated by + /// [`Record::approximate_size()`]. + approx_kafka_write_size: usize, +} + +/// A [`RecordAggregator`] implements [rskafka]'s abstract [`Aggregator`] +/// behaviour to provide batching of requests for a single Kafka partition. +/// +/// Specifically the [`RecordAggregator`] maps [`DmlOperation`] instances to +/// Kafka [`Record`] instances, and delegates the batching to the +/// [`RecordAggregatorDelegate`] implementation maintained within [rskafka] +/// itself. +/// +/// [rskafka]: https://github.com/influxdata/rskafka +#[derive(Debug)] +pub struct RecordAggregator { + time_provider: Arc, + + /// The shard index (Kafka partition number) this aggregator batches ops for (from Kafka, + /// not the catalog). + shard_index: ShardIndex, + + /// The underlying record aggregator the non-IOx-specific batching is + /// delegated to. + aggregator: RecordAggregatorDelegate, +} + +impl RecordAggregator { + /// Initialise a new [`RecordAggregator`] to aggregate up to + /// `max_batch_size` number of bytes per message. + pub fn new( + shard_index: ShardIndex, + max_batch_size: usize, + time_provider: Arc, + ) -> Self { + Self { + shard_index, + aggregator: RecordAggregatorDelegate::new(max_batch_size), + time_provider, + } + } +} + +impl RecordAggregator { + /// Serialise the [`DmlOperation`] destined for the specified `db_name` into a + /// [`Record`], returning the producer timestamp assigned to it. + fn to_record(&self, op: &DmlOperation) -> Result<(Record, Time), Error> { + let now = op + .meta() + .producer_ts() + .unwrap_or_else(|| self.time_provider.now()); + + let headers = IoxHeaders::new( + ContentType::Protobuf, + op.meta().span_context().cloned(), + op.namespace().to_owned(), + ); + + let mut buf = Vec::new(); + crate::codec::encode_operation(op.namespace(), op, &mut buf)?; + buf.shrink_to_fit(); + + let record = Record { + key: None, + value: Some(buf), + headers: headers + .headers() + .map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec())) + .collect(), + timestamp: now.date_time(), + }; + + Ok((record, now)) + } +} + +impl Aggregator for RecordAggregator { + type Input = DmlOperation; + type Tag = ::Tag; + type StatusDeaggregator = DmlMetaDeaggregator; + + /// Callers should retain the returned [`Tag`] in order to de-aggregate the + /// [`DmlMeta`] from the request response. + fn try_push(&mut self, op: Self::Input) -> Result, Error> { + // Encode the DML op to a Record + let (record, timestamp) = self.to_record(&op)?; + + // Capture various metadata necessary to construct the Tag/DmlMeta for + // the caller once a batch has been flushed. + let span_ctx = op.meta().span_context().cloned(); + let approx_kafka_write_size = record.approximate_size(); + + // And delegate batching to rskafka's RecordAggregator implementation + Ok(match self.aggregator.try_push(record)? { + // NoCapacity returns the original input to the caller when the + // batching fails. + // + // The RecordBatcher delegate is returning the Record encoded from + // op above, but the caller of this fn is expecting the original op. + // + // Map to the original input op this fn was called with, discarding + // the encoded Record. + TryPush::NoCapacity(_) => { + // Log a warning if this occurs - this allows an operator to + // increase the maximum Kafka message size, or lower the linger + // time to minimise latency while still producing large enough + // batches for it to be worth while. + warn!("aggregated batch reached maximum capacity"); + TryPush::NoCapacity(op) + } + + // A successful delegate aggregation returns the tag for offset + // de-aggregation later. For simplicity, the tag this layer returns + // also carries the various (small) metadata elements needed to + // construct the DmlMeta at the point of de-aggregation. + TryPush::Aggregated(idx) => TryPush::Aggregated(Tag { + idx, + timestamp, + span_ctx, + approx_kafka_write_size, + }), + }) + } + + fn flush(&mut self) -> Result<(Vec, Self::StatusDeaggregator), Error> { + let records = self.aggregator.flush()?.0; + Ok((records, DmlMetaDeaggregator::new(self.shard_index))) + } +} + +/// The de-aggregation half of the [`RecordAggregator`], this type consumes the +/// caller's [`Tag`] obtained from the aggregator to return the corresponding +/// [`DmlMeta`] from the batched response. +/// +/// The [`DmlMetaDeaggregator`] is a stateless wrapper over the (also stateless) +/// [`RecordAggregatorStatusDeaggregator`] delegate, with most of the metadata +/// elements carried in the [`Tag`] itself. +#[derive(Debug)] +pub struct DmlMetaDeaggregator { + shard_index: ShardIndex, +} + +impl DmlMetaDeaggregator { + pub fn new(shard_index: ShardIndex) -> Self { + Self { shard_index } + } +} + +impl StatusDeaggregator for DmlMetaDeaggregator { + type Status = DmlMeta; + type Tag = Tag; + + fn deaggregate(&self, input: &[i64], tag: Self::Tag) -> Result { + // Delegate de-aggregation to the (stateless) record batch + // de-aggregator for forwards compatibility. + let offset = RecordAggregatorStatusDeaggregator::default() + .deaggregate(input, tag.idx) + .expect("invalid de-aggregation index"); + + Ok(DmlMeta::sequenced( + Sequence::new(self.shard_index, SequenceNumber::new(offset)), + tag.timestamp, + tag.span_ctx, + tag.approx_kafka_write_size, + )) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use dml::DmlWrite; + use hashbrown::HashMap; + use iox_time::MockProvider; + use mutable_batch::{writer::Writer, MutableBatch}; + use trace::LogTraceCollector; + + use crate::codec::{ + CONTENT_TYPE_PROTOBUF, HEADER_CONTENT_TYPE, HEADER_NAMESPACE, HEADER_TRACE_CONTEXT, + }; + + use super::*; + + const NAMESPACE: &str = "bananas"; + const SHARD_INDEX: ShardIndex = ShardIndex::new(42); + const TIMESTAMP_MILLIS: i64 = 1659990497000; + + fn test_op() -> DmlOperation { + let mut batch = MutableBatch::new(); + let mut writer = Writer::new(&mut batch, 1); + writer + // Date: "1970-01-01" + .write_time("time", [42].into_iter()) + .unwrap(); + writer + .write_i64("A", Some(&[0b00000001]), [1].into_iter()) + .unwrap(); + writer.commit(); + + let mut m = HashMap::default(); + m.insert("table".to_string(), batch); + + let span = SpanContext::new(Arc::new(LogTraceCollector::new())); + + DmlOperation::Write(DmlWrite::new( + NAMESPACE.to_string(), + m, + Some("1970-01-01".into()), + DmlMeta::unsequenced(Some(span)), + )) + } + + #[test] + fn test_record_aggregate() { + let clock = Arc::new(MockProvider::new(Time::from_timestamp_millis( + TIMESTAMP_MILLIS, + ))); + let mut agg = RecordAggregator::new(SHARD_INDEX, usize::MAX, clock); + let write = test_op(); + + let res = agg.try_push(write).expect("aggregate call should succeed"); + let tag = match res { + TryPush::NoCapacity(_) => panic!("unexpected no capacity"), + TryPush::Aggregated(tag) => tag, + }; + + // Flush the aggregator to acquire the records + let (records, deagg) = agg.flush().expect("should flush"); + assert_eq!(records.len(), 1); + + // Another flush should not yield the same records + let (records2, _) = agg.flush().expect("should flush"); + assert!(records2.is_empty()); + + // Assert properties of the resulting record + let record = records[0].clone(); + assert_eq!(record.key, None); + assert!(record.value.is_some()); + assert_eq!( + *record + .headers + .get(HEADER_CONTENT_TYPE) + .expect("no content type"), + Vec::::from(CONTENT_TYPE_PROTOBUF), + ); + assert_eq!( + *record + .headers + .get(HEADER_NAMESPACE) + .expect("no namespace header"), + Vec::::from(NAMESPACE), + ); + assert!(record.headers.get(HEADER_TRACE_CONTEXT).is_some()); + assert_eq!(record.timestamp.timestamp(), 1659990497); + + // Extract the DmlMeta from the de-aggregator + let got = deagg + .deaggregate(&[4242], tag) + .expect("de-aggregate should succeed"); + + // Assert the metadata properties + assert!(got.span_context().is_some()); + assert_eq!( + *got.sequence().expect("should be sequenced"), + Sequence::new(SHARD_INDEX, SequenceNumber::new(4242)) + ); + assert_eq!( + got.producer_ts().expect("no producer timestamp"), + Time::from_timestamp_millis(TIMESTAMP_MILLIS) + ); + assert_eq!( + got.bytes_read().expect("no approx size"), + record.approximate_size() + ); + } + + #[test] + fn test_record_aggregate_no_capacity() { + let clock = Arc::new(MockProvider::new(Time::from_timestamp_millis( + TIMESTAMP_MILLIS, + ))); + let mut agg = RecordAggregator::new(SHARD_INDEX, usize::MIN, clock); + let write = test_op(); + + let res = agg + .try_push(write.clone()) + .expect("aggregate call should succeed"); + match res { + TryPush::NoCapacity(res) => assert_eq!(res.namespace(), write.namespace()), + TryPush::Aggregated(_) => panic!("expected no capacity"), + }; + } +}