revert: rdkafka/rskafka swapping (#5844)

This reverts commit 442a7ff2a4.

This commit restores rskafka as the producer Kafka client, effectively
undoing the change (and its follow-up PRs) made in:

    https://github.com/influxdata/influxdb_iox/pull/5800
pull/24376/head
Dom Dwyer 2022-10-17 12:34:28 +02:00
parent eec6bf40ce
commit 5d835d5047
9 changed files with 466 additions and 620 deletions

80
Cargo.lock generated
View File

@ -2858,18 +2858,6 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565"
[[package]]
name = "libz-sys"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9702761c3935f8cc2f101793272e202c72b99da8f4224a19ddcf1279a6450bbf"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "link-cplusplus"
version = "1.0.7"
@ -3297,27 +3285,6 @@ dependencies = [
"libc",
]
[[package]]
name = "num_enum"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "object"
version = "0.29.0"
@ -3843,17 +3810,6 @@ dependencies = [
"syn",
]
[[package]]
name = "proc-macro-crate"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eda0fc3b0fb7c975631757e14d9049da17374063edb6ebbcbc54d880d4fe94e9"
dependencies = [
"once_cell",
"thiserror",
"toml",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
@ -4248,35 +4204,6 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "rdkafka"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1de127f294f2dba488ed46760b129d5ecbeabbd337ccbf3739cb29d50db2161c"
dependencies = [
"futures",
"libc",
"log",
"rdkafka-sys",
"serde",
"serde_derive",
"serde_json",
"slab",
"tokio",
]
[[package]]
name = "rdkafka-sys"
version = "4.2.0+1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e542c6863b04ce0fa0c5719bc6b7b348cf8dd21af1bb03c9db5f9805b2a6473"
dependencies = [
"libc",
"libz-sys",
"num_enum",
"pkg-config",
]
[[package]]
name = "read_buffer"
version = "0.1.0"
@ -5891,12 +5818,6 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
@ -6248,7 +6169,6 @@ dependencies = [
"parking_lot 0.12.1",
"pin-project",
"prost 0.11.0",
"rdkafka",
"rskafka",
"schema",
"tempfile",

View File

@ -202,40 +202,4 @@ mod tests {
]);
assert_eq!(actual, expected);
}
#[test]
fn test_k8s_idpe_connection_config() {
let cfg = WriteBufferConfig::try_parse_from([
"my_binary",
"--write-buffer",
"kafka",
"--write-buffer-addr",
"localhost:1234",
"--write-buffer-connection-config",
"max_message_size=10485760,\
producer_max_batch_size=2621440,\
consumer_min_batch_size=1048576,\
consumer_max_batch_size=5242880,\
consumer_max_wait_ms=10",
])
.unwrap();
let actual = cfg.connection_config();
let expected = BTreeMap::from([
(
String::from("consumer_max_batch_size"),
String::from("5242880"),
),
(String::from("consumer_max_wait_ms"), String::from("10")),
(
String::from("consumer_min_batch_size"),
String::from("1048576"),
),
(String::from("max_message_size"), String::from("10485760")),
(
String::from("producer_max_batch_size"),
String::from("2621440"),
),
]);
assert_eq!(actual, expected);
}
}

View File

@ -22,7 +22,6 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
pin-project = "1.0"
prost = "0.11"
rdkafka = "0.28.0"
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="3208e4742f08048bbab4e8fc4e0a775507fe3e66", default-features = false, features = ["compression-snappy", "transport-socks5"] }
schema = { path = "../schema" }
tokio = { version = "1.21", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }

View File

@ -1,7 +1,7 @@
use crate::{
core::{WriteBufferError, WriteBufferReading, WriteBufferWriting},
file::{FileBufferConsumer, FileBufferProducer},
kafka::RSKafkaConsumer,
kafka::{RSKafkaConsumer, RSKafkaProducer},
mock::{
MockBufferForReading, MockBufferForReadingThatAlwaysErrors, MockBufferForWriting,
MockBufferForWritingThatAlwaysErrors, MockBufferSharedState,
@ -152,8 +152,8 @@ impl WriteBufferConfigFactory {
pub async fn new_config_write(
&self,
db_name: &str,
_partitions: Option<Range<i32>>,
_trace_collector: Option<&Arc<dyn TraceCollector>>,
partitions: Option<Range<i32>>,
trace_collector: Option<&Arc<dyn TraceCollector>>,
cfg: &WriteBufferConnection,
) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
let writer = match &cfg.type_[..] {
@ -168,7 +168,20 @@ impl WriteBufferConfigFactory {
.await?;
Arc::new(file_buffer) as _
}
"kafka" => self.kafka_buffer_producer(db_name, cfg).await?,
"kafka" => {
let rskafa_buffer = RSKafkaProducer::new(
cfg.connection.clone(),
db_name.to_owned(),
&cfg.connection_config,
Arc::clone(&self.time_provider),
cfg.creation_config.as_ref(),
partitions,
trace_collector.map(Arc::clone),
&*self.metric_registry,
)
.await?;
Arc::new(rskafa_buffer) as _
}
"mock" => match self.get_mock(&cfg.connection)? {
Mock::Normal(state) => {
let mock_buffer = MockBufferForWriting::new(
@ -191,24 +204,6 @@ impl WriteBufferConfigFactory {
Ok(writer)
}
async fn kafka_buffer_producer(
&self,
db_name: &str,
cfg: &WriteBufferConnection,
) -> Result<Arc<dyn WriteBufferWriting>, WriteBufferError> {
let kafka_buffer = crate::kafka::rdkafka::KafkaBufferProducer::new(
&cfg.connection,
db_name,
&cfg.connection_config,
cfg.creation_config.as_ref(),
Arc::clone(&self.time_provider),
&self.metric_registry,
)
.await?;
Ok(Arc::new(kafka_buffer) as _)
}
/// Returns a new [`WriteBufferReading`] for the provided [`WriteBufferConnection`]
pub async fn new_config_read(
&self,

View File

@ -80,15 +80,6 @@ impl From<std::io::Error> for WriteBufferError {
}
}
impl From<rdkafka::error::KafkaError> for WriteBufferError {
fn from(e: rdkafka::error::KafkaError) -> Self {
Self {
inner: Box::new(e),
kind: WriteBufferErrorKind::IO,
}
}
}
impl From<rskafka::client::error::Error> for WriteBufferError {
fn from(e: rskafka::client::error::Error) -> Self {
Self {

View File

@ -36,7 +36,6 @@ pub struct KafkaProducerMetrics<P = SystemProvider> {
impl KafkaProducerMetrics {
/// Decorate the specified [`ProducerClient`] implementation with an
/// instrumentation layer.
#[allow(dead_code)]
pub fn new(
client: Box<dyn ProducerClient>,
kafka_topic_name: String,

View File

@ -1,17 +1,24 @@
use self::config::{ClientConfig, ConsumerConfig, TopicCreationConfig};
use self::{
config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig},
instrumentation::KafkaProducerMetrics,
record_aggregator::RecordAggregator,
};
use crate::{
codec::IoxHeaders,
config::WriteBufferCreationConfig,
core::{WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler},
core::{
WriteBufferError, WriteBufferErrorKind, WriteBufferReading, WriteBufferStreamHandler,
WriteBufferWriting,
},
};
use async_trait::async_trait;
use data_types::{Sequence, SequenceNumber, ShardIndex};
use dml::DmlOperation;
use dml::{DmlMeta, DmlOperation};
use futures::{
stream::{self, BoxStream},
StreamExt, TryStreamExt,
};
use iox_time::Time;
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::warn;
use parking_lot::Mutex;
use rskafka::{
@ -19,6 +26,7 @@ use rskafka::{
consumer::{StartOffset, StreamConsumerBuilder},
error::{Error as RSKafkaError, ProtocolError},
partition::{OffsetAt, PartitionClient, UnknownTopicHandling},
producer::{BatchProducer, BatchProducerBuilder},
ClientBuilder,
},
record::RecordAndOffset,
@ -35,13 +43,112 @@ use trace::TraceCollector;
mod config;
mod instrumentation;
pub(crate) mod rdkafka;
mod record_aggregator;
/// Maximum number of jobs buffered and decoded concurrently.
const CONCURRENT_DECODE_JOBS: usize = 10;
type Result<T, E = WriteBufferError> = std::result::Result<T, E>;
/// A Kafka write-buffer producer backed by rskafka's `BatchProducer`.
///
/// Maintains one batching producer per shard index (Kafka partition);
/// writes for a shard are routed to the matching producer.
#[derive(Debug)]
pub struct RSKafkaProducer {
    // One aggregating producer per Kafka partition / IOx shard index.
    producers: BTreeMap<ShardIndex, BatchProducer<RecordAggregator>>,
}
impl RSKafkaProducer {
    /// Connect to `conn`, resolve (and optionally create, per
    /// `creation_config`) the partitions for `topic_name`, and build one
    /// instrumented, batching producer per partition.
    ///
    /// `partitions` optionally restricts which Kafka partitions are used;
    /// `connection_config` supplies producer tuning (e.g. linger, batch size).
    /// The trace collector is currently unused on the producer side.
    #[allow(clippy::too_many_arguments)]
    pub async fn new<'a>(
        conn: String,
        topic_name: String,
        connection_config: &'a BTreeMap<String, String>,
        time_provider: Arc<dyn TimeProvider>,
        creation_config: Option<&'a WriteBufferCreationConfig>,
        partitions: Option<Range<i32>>,
        _trace_collector: Option<Arc<dyn TraceCollector>>,
        metric_registry: &'a metric::Registry,
    ) -> Result<Self> {
        // Resolve one PartitionClient per shard index (creating the topic
        // first if requested).
        let partition_clients = setup_topic(
            conn,
            topic_name.clone(),
            connection_config,
            creation_config,
            partitions,
        )
        .await?;
        let producer_config = ProducerConfig::try_from(connection_config)?;
        let producers = partition_clients
            .into_iter()
            .map(|(shard_index, partition_client)| {
                // Instrument this kafka partition client.
                let partition_client = KafkaProducerMetrics::new(
                    Box::new(partition_client),
                    topic_name.clone(),
                    shard_index,
                    metric_registry,
                );
                let mut producer_builder =
                    BatchProducerBuilder::new_with_client(Arc::new(partition_client));
                if let Some(linger) = producer_config.linger {
                    producer_builder = producer_builder.with_linger(linger);
                }
                // The RecordAggregator maps DmlOperation -> Record and batches
                // up to max_batch_size bytes per produce call.
                let producer = producer_builder.build(RecordAggregator::new(
                    shard_index,
                    producer_config.max_batch_size,
                    Arc::clone(&time_provider),
                ));
                (shard_index, producer)
            })
            .collect();
        Ok(Self { producers })
    }
}
#[async_trait]
impl WriteBufferWriting for RSKafkaProducer {
    /// Return the set of shard indexes (Kafka partitions) this producer can
    /// write to.
    fn shard_indexes(&self) -> BTreeSet<ShardIndex> {
        self.producers.keys().copied().collect()
    }
    /// Enqueue `operation` into the batch producer for `shard_index`,
    /// returning the sequenced [`DmlMeta`] once the batch is produced.
    ///
    /// Errors if `shard_index` is unknown; panics if handed an unpartitioned
    /// write (an upstream invariant violation).
    async fn store_operation(
        &self,
        shard_index: ShardIndex,
        operation: DmlOperation,
    ) -> Result<DmlMeta, WriteBufferError> {
        // Sanity check to ensure only partitioned writes are pushed into Kafka.
        if let DmlOperation::Write(w) = &operation {
            assert!(
                w.partition_key().is_some(),
                "enqueuing unpartitioned write into kafka"
            )
        }
        let producer = self
            .producers
            .get(&shard_index)
            .ok_or_else::<WriteBufferError, _>(|| {
                format!("Unknown shard index: {}", shard_index).into()
            })?;
        Ok(producer.produce(operation).await?)
    }
    /// Flush every per-shard producer, forcing pending batches out to Kafka.
    async fn flush(&self) -> Result<(), WriteBufferError> {
        for producer in self.producers.values() {
            producer.flush().await?;
        }
        Ok(())
    }
    fn type_name(&self) -> &'static str {
        "kafka"
    }
}
#[derive(Debug)]
pub struct RSKafkaStreamHandler {
partition_client: Arc<PartitionClient>,
@ -418,17 +525,14 @@ async fn setup_topic(
mod tests {
use super::*;
use crate::{
core::{
test_utils::{
assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name,
set_pop_first, TestAdapter, TestContext,
},
WriteBufferWriting,
core::test_utils::{
assert_span_context_eq_or_linked, perform_generic_tests, random_topic_name,
set_pop_first, TestAdapter, TestContext,
},
maybe_skip_kafka_integration,
};
use data_types::{DeletePredicate, PartitionKey, TimestampRange};
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlMeta, DmlWrite};
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite};
use futures::{stream::FuturesUnordered, TryStreamExt};
use iox_time::TimeProvider;
use rskafka::{client::partition::Compression, record::Record};
@ -491,17 +595,19 @@ mod tests {
#[async_trait]
impl TestContext for RSKafkaTestContext {
type Writing = rdkafka::KafkaBufferProducer;
type Writing = RSKafkaProducer;
type Reading = RSKafkaConsumer;
async fn writing(&self, creation_config: bool) -> Result<Self::Writing, WriteBufferError> {
rdkafka::KafkaBufferProducer::new(
RSKafkaProducer::new(
self.conn.clone(),
self.topic_name.clone(),
&BTreeMap::default(),
self.creation_config(creation_config).as_ref(),
Arc::clone(&self.time_provider),
self.creation_config(creation_config).as_ref(),
None,
Some(self.trace_collector() as Arc<_>),
&self.metrics,
)
.await
@ -744,9 +850,9 @@ mod tests {
.unwrap();
}
async fn write<T: WriteBufferWriting>(
async fn write(
namespace: &str,
producer: &T,
producer: &RSKafkaProducer,
trace_collector: &Arc<RingBufferTraceCollector>,
shard_index: ShardIndex,
partition_key: impl Into<PartitionKey> + Send,
@ -763,9 +869,9 @@ mod tests {
producer.store_operation(shard_index, op).await.unwrap()
}
async fn delete<T: WriteBufferWriting>(
async fn delete(
namespace: &str,
producer: &T,
producer: &RSKafkaProducer,
trace_collector: &Arc<RingBufferTraceCollector>,
shard_index: ShardIndex,
) -> DmlMeta {

View File

@ -1,452 +0,0 @@
use crate::{
codec::{ContentType, IoxHeaders},
core::{WriteBufferError, WriteBufferWriting},
kafka::WriteBufferCreationConfig,
};
use async_trait::async_trait;
use data_types::{Sequence, SequenceNumber, ShardIndex};
use dml::{DmlMeta, DmlOperation};
use iox_time::{Time, TimeProvider};
use metric::{Attributes, DurationHistogram, Metric};
use observability_deps::tracing::{debug, info};
use rdkafka::{
admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
client::DefaultClientContext,
consumer::{BaseConsumer, Consumer},
error::KafkaError,
message::{Headers, OwnedHeaders},
producer::{FutureProducer, FutureRecord, Producer},
types::RDKafkaErrorCode,
util::Timeout,
ClientConfig,
};
use std::{
collections::{BTreeMap, BTreeSet},
num::NonZeroU32,
sync::Arc,
time::Duration,
};
/// Default timeout supplied to rdkafka client for kafka operations.
///
/// Chosen to be a value less than the default gRPC timeout (30
/// seconds) so we can detect kafka errors and return them prior to
/// the gRPC requests to IOx timing out.
///
/// More context in
/// <https://github.com/influxdata/influxdb_iox/issues/3029>
const KAFKA_OPERATION_TIMEOUT_MS: u64 = 20_000;
// Convert IOx message headers into rdkafka's owned header list so they can be
// attached to an outgoing `FutureRecord`.
impl From<&IoxHeaders> for OwnedHeaders {
    fn from(iox_headers: &IoxHeaders) -> Self {
        let mut res = Self::new();
        for (header, value) in iox_headers.headers() {
            res = res.add(header, value.as_ref());
        }
        res
    }
}
/// A write-buffer producer backed by rdkafka's `FutureProducer`.
pub struct KafkaBufferProducer {
    // Kafka bootstrap server address (kept for Debug output).
    conn: String,
    // Database name; doubles as the Kafka topic name.
    database_name: String,
    time_provider: Arc<dyn TimeProvider>,
    producer: Arc<FutureProducer<DefaultClientContext>>,
    // Known Kafka partition IDs for the topic (IOx shard indexes).
    partitions: BTreeSet<ShardIndex>,
    // Histogram of enqueue durations, labelled by partition/topic/result.
    enqueue: Metric<DurationHistogram>,
}
// Needed because rdkafka's FutureProducer doesn't impl Debug; only the cheap,
// identifying fields are printed.
impl std::fmt::Debug for KafkaBufferProducer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("KafkaBufferProducer")
            .field("conn", &self.conn)
            .field("database_name", &self.database_name)
            .finish()
    }
}
#[async_trait]
impl WriteBufferWriting for KafkaBufferProducer {
    /// Return the known Kafka partition IDs for the topic.
    fn shard_indexes(&self) -> BTreeSet<ShardIndex> {
        self.partitions.clone()
    }
    /// Send a [`DmlOperation`] to the write buffer using the specified shard index.
    ///
    /// Encodes the op to protobuf, produces it to the Kafka partition matching
    /// `shard_index`, records the enqueue duration metric, and returns a
    /// sequenced [`DmlMeta`] carrying the resulting Kafka offset.
    async fn store_operation(
        &self,
        shard_index: ShardIndex,
        operation: DmlOperation,
    ) -> Result<DmlMeta, WriteBufferError> {
        // Sanity check to ensure only partitioned writes are pushed into Kafka.
        if let DmlOperation::Write(w) = &operation {
            assert!(
                w.partition_key().is_some(),
                "enqueuing unpartitioned write into kafka"
            )
        }
        // Only send writes with known shard indexes to Kafka.
        if !self.partitions.contains(&shard_index) {
            return Err(format!("Unknown shard index: {}", shard_index).into());
        }
        let kafka_partition_id = shard_index.get();
        let enqueue_start = self.time_provider.now();
        // truncate milliseconds from timestamps because that's what Kafka supports
        let now = operation
            .meta()
            .producer_ts()
            .unwrap_or_else(|| self.time_provider.now());
        let timestamp_millis = now.date_time().timestamp_millis();
        let timestamp = Time::from_timestamp_millis(timestamp_millis);
        let headers = IoxHeaders::new(
            ContentType::Protobuf,
            operation.meta().span_context().cloned(),
            operation.namespace().to_string(),
        );
        let mut buf = Vec::new();
        crate::codec::encode_operation(&self.database_name, &operation, &mut buf)?;
        // This type annotation is necessary because `FutureRecord` is generic over key type, but
        // key is optional and we're not setting a key. `String` is arbitrary.
        let record: FutureRecord<'_, String, _> = FutureRecord::to(&self.database_name)
            .payload(&buf)
            .partition(kafka_partition_id)
            .timestamp(timestamp_millis)
            .headers((&headers).into());
        // Approximate wire size: payload + key + headers.
        let kafka_write_size = estimate_message_size(
            record.payload.map(|v| v.as_ref()),
            record.key.map(|s| s.as_bytes()),
            record.headers.as_ref(),
        );
        debug!(db_name=%self.database_name, kafka_partition_id, size=buf.len(), "writing to kafka");
        let res = self.producer.send(record, Timeout::Never).await;
        // Record the enqueue duration, labelled by success/error outcome.
        if let Some(delta) = self
            .time_provider
            .now()
            .checked_duration_since(enqueue_start)
        {
            let result_attr = match &res {
                Ok(_) => "success",
                Err(_) => "error",
            };
            let attr = Attributes::from([
                ("kafka_partition", shard_index.to_string().into()),
                ("kafka_topic", self.database_name.clone().into()),
                ("result", result_attr.into()),
            ]);
            let recorder = self.enqueue.recorder(attr);
            recorder.record(delta);
        }
        let (partition, offset) = res.map_err(|(e, _owned_message)| e)?;
        debug!(db_name=%self.database_name, %offset, %partition, size=buf.len(), "wrote to kafka");
        Ok(DmlMeta::sequenced(
            Sequence::new(shard_index, SequenceNumber::new(offset)),
            timestamp,
            operation.meta().span_context().cloned(),
            kafka_write_size,
        ))
    }
    /// Block until all in-flight messages have been delivered.
    async fn flush(&self) -> Result<(), WriteBufferError> {
        let producer = Arc::clone(&self.producer);
        // rdkafka's flush is blocking, so run it off the async runtime.
        tokio::task::spawn_blocking(move || {
            producer.flush(Timeout::Never);
        })
        .await
        .expect("subtask failed");
        Ok(())
    }
    fn type_name(&self) -> &'static str {
        "kafka"
    }
}
impl KafkaBufferProducer {
    /// Create a new rdkafka producer for `database_name` (used as the topic
    /// name) against broker `conn`.
    ///
    /// Applies built-in defaults, then user overrides from
    /// `connection_config`, then non-overridable settings; optionally
    /// auto-creates the topic per `creation_config` (test/dev only).
    pub async fn new(
        conn: impl Into<String> + Send,
        database_name: impl Into<String> + Send,
        connection_config: &BTreeMap<String, String>,
        creation_config: Option<&WriteBufferCreationConfig>,
        time_provider: Arc<dyn TimeProvider>,
        metric_registry: &metric::Registry,
    ) -> Result<Self, WriteBufferError> {
        let conn = conn.into();
        let database_name = database_name.into();
        let mut cfg = ClientConfig::new();
        // these configs can be overwritten
        cfg.set("message.timeout.ms", "5000");
        cfg.set("message.max.bytes", "31457280");
        cfg.set("message.send.max.retries", "10");
        cfg.set("queue.buffering.max.kbytes", "31457280");
        cfg.set("request.required.acks", "all"); // equivalent to acks=-1
        cfg.set("compression.type", "snappy");
        cfg.set("statistics.interval.ms", "15000");
        // user overrides
        if let Some(max_batch_size) = connection_config.get("producer_max_batch_size") {
            cfg.set("batch.size", max_batch_size);
        }
        if let Some(linger) = connection_config.get("producer_linger_ms") {
            cfg.set("linger.ms", linger);
        }
        if let Some(max_message_size) = connection_config.get("max_message_size") {
            cfg.set("message.max.bytes", max_message_size);
        }
        // these configs are set in stone
        cfg.set("bootstrap.servers", &conn);
        cfg.set("allow.auto.create.topics", "false");
        // handle auto-creation
        let partitions =
            maybe_auto_create_topics(&conn, &database_name, creation_config, &cfg).await?;
        let producer = cfg.create()?;
        let enqueue = metric_registry.register_metric::<DurationHistogram>(
            "write_buffer_client_produce_duration",
            "duration of time taken to push a set of records to kafka \
            - includes codec, protocol, and network overhead",
        );
        Ok(Self {
            conn,
            database_name,
            time_provider,
            producer: Arc::new(producer),
            partitions,
            enqueue,
        })
    }
}
/// Iterate over the headers of a Kafka message, yielding `(name, value)`
/// pairs; yields nothing when `headers` is `None`.
fn header_iter<H>(headers: Option<&H>) -> impl Iterator<Item = (&str, &[u8])>
where
    H: Headers,
{
    headers
        .into_iter()
        .flat_map(|headers| (0..headers.count()).map(|idx| headers.get(idx).unwrap()))
}
/// Estimate size of data read from kafka as payload len + key len + headers
/// (header sizes are the sum of each name + value length). Absent components
/// count as zero.
fn estimate_message_size<H>(
    payload: Option<&[u8]>,
    key: Option<&[u8]>,
    headers: Option<&H>,
) -> usize
where
    H: Headers,
{
    payload.map(|payload| payload.len()).unwrap_or_default()
        + key.map(|key| key.len()).unwrap_or_default()
        + header_iter(headers)
            .map(|(key, value)| key.len() + value.len())
            .sum::<usize>()
}
/// Get Kafka partition IDs (IOx ShardIndexes) for the database-specific Kafka topic.
///
/// Will return `None` if the topic is unknown and has to be created.
///
/// This will check that the partition set is non-empty.
async fn get_partitions(
    database_name: &str,
    cfg: &ClientConfig,
) -> Result<Option<BTreeSet<ShardIndex>>, WriteBufferError> {
    let database_name = database_name.to_string();
    let cfg = cfg.clone();
    // rdkafka's metadata fetch is blocking; run it off the async runtime.
    let metadata = tokio::task::spawn_blocking(move || {
        let probe_consumer: BaseConsumer = cfg.create()?;
        probe_consumer.fetch_metadata(
            Some(&database_name),
            Duration::from_millis(KAFKA_OPERATION_TIMEOUT_MS),
        )
    })
    .await
    .expect("subtask failed")?;
    let topic_metadata = metadata.topics().get(0).expect("requested a single topic");
    match topic_metadata.error() {
        None => {
            let partitions: BTreeSet<_> = topic_metadata
                .partitions()
                .iter()
                .map(|partition_metdata| ShardIndex::new(partition_metdata.id()))
                .collect();
            if partitions.is_empty() {
                Err("Topic exists but has no partitions".to_string().into())
            } else {
                Ok(Some(partitions))
            }
        }
        Some(error_code) => {
            let error_code: RDKafkaErrorCode = error_code.into();
            match error_code {
                RDKafkaErrorCode::UnknownTopic | RDKafkaErrorCode::UnknownTopicOrPartition => {
                    // The caller is responsible for creating the topic, so this is somewhat OK.
                    Ok(None)
                }
                _ => Err(KafkaError::MetadataFetch(error_code).into()),
            }
        }
    }
}
/// Build an rdkafka admin client (for topic management) against the given
/// bootstrap server address.
fn admin_client(kafka_connection: &str) -> Result<AdminClient<DefaultClientContext>, KafkaError> {
    let mut cfg = ClientConfig::new();
    cfg.set("bootstrap.servers", kafka_connection);
    cfg.set("message.timeout.ms", "5000");
    cfg.create()
}
/// Create Kafka topic based on the provided configs.
///
/// This will create a topic with `n_sequencers` Kafka partitions.
///
/// This will NOT fail if the topic already exists! `maybe_auto_create_topics` will only call this
/// if there are no partitions. Production should always have partitions already created, so
/// `create_kafka_topic` shouldn't run in production and is only for test/dev environments.
async fn create_kafka_topic(
    kafka_connection: &str,
    database_name: &str,
    n_sequencers: NonZeroU32,
    cfg: &BTreeMap<String, String>,
) -> Result<(), WriteBufferError> {
    let admin = admin_client(kafka_connection)?;
    // Single replica: this path only runs in test/dev environments.
    let mut topic = NewTopic::new(
        database_name,
        n_sequencers.get() as i32,
        TopicReplication::Fixed(1),
    );
    for (k, v) in cfg {
        topic = topic.set(k, v);
    }
    let opts = AdminOptions::default();
    let mut results = admin.create_topics([&topic], &opts).await?;
    assert_eq!(results.len(), 1, "created exactly one topic");
    let result = results.pop().expect("just checked the vector length");
    match result {
        // An already-existing topic is treated as success (see doc comment).
        Ok(topic) | Err((topic, RDKafkaErrorCode::TopicAlreadyExists)) => {
            assert_eq!(topic, database_name);
            Ok(())
        }
        Err((topic, code)) => {
            assert_eq!(topic, database_name);
            Err(format!("Cannot create topic '{}': {}", topic, code).into())
        }
    }
}
/// If there are no Kafka partitions, then create a topic. Production should have Kafka partitions
/// created already, so this should only create a topic in test/dev environments.
///
/// Retries up to `N_TRIES` times, sleeping briefly between attempts, because a
/// freshly created topic may not report its partitions immediately.
async fn maybe_auto_create_topics(
    kafka_connection: &str,
    database_name: &str,
    creation_config: Option<&WriteBufferCreationConfig>,
    cfg: &ClientConfig,
) -> Result<BTreeSet<ShardIndex>, WriteBufferError> {
    const N_TRIES: usize = 10;
    for i in 0..N_TRIES {
        if let Some(partitions) = get_partitions(database_name, cfg).await? {
            return Ok(partitions);
        }
        // debounce after first round
        if i > 0 {
            info!(
                topic=%database_name,
                "Topic does not have partitions after creating it, wait a bit and try again."
            );
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
        if let Some(creation_config) = creation_config {
            create_kafka_topic(
                kafka_connection,
                database_name,
                creation_config.n_shards,
                &creation_config.options,
            )
            .await?;
        } else {
            return Err("no partitions found and auto-creation not requested"
                .to_string()
                .into());
        }
    }
    Err(format!("Could not auto-create topic after {} tries.", N_TRIES).into())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::maybe_skip_kafka_integration;
    use iox_time::MockProvider;
    // Integration test (skipped unless a Kafka broker is configured): verifies
    // that a producer can be constructed from a full connection-config map.
    #[tokio::test]
    async fn can_parse_write_buffer_connection_config() {
        let conn = maybe_skip_kafka_integration!();
        let connection_config = BTreeMap::from([
            (
                String::from("consumer_max_batch_size"),
                String::from("5242880"),
            ),
            (String::from("consumer_max_wait_ms"), String::from("10")),
            (
                String::from("consumer_min_batch_size"),
                String::from("1048576"),
            ),
            (String::from("max_message_size"), String::from("10485760")),
            (
                String::from("producer_max_batch_size"),
                String::from("2621440"),
            ),
        ]);
        let time_provider = Arc::new(MockProvider::new(Time::from_timestamp_nanos(0)));
        let metric_registry = Arc::new(metric::Registry::new());
        KafkaBufferProducer::new(
            conn,
            "my_db",
            &connection_config,
            Some(&WriteBufferCreationConfig::default()),
            time_provider,
            &metric_registry,
        )
        .await
        .unwrap();
    }
}

View File

@ -0,0 +1,324 @@
use std::sync::Arc;
use data_types::{Sequence, SequenceNumber, ShardIndex};
use dml::{DmlMeta, DmlOperation};
use iox_time::{Time, TimeProvider};
use observability_deps::tracing::warn;
use rskafka::{
client::producer::aggregator::{
Aggregator, Error, RecordAggregator as RecordAggregatorDelegate,
RecordAggregatorStatusDeaggregator, StatusDeaggregator, TryPush,
},
record::Record,
};
use trace::ctx::SpanContext;
use crate::codec::{ContentType, IoxHeaders};
/// The [`Tag`] is a data-carrying token identifier used to de-aggregate
/// responses from a batch of aggregated requests using the
/// [`DmlMetaDeaggregator`].
#[derive(Debug)]
pub struct Tag {
    /// The tag into the batch returned by the
    /// [`RecordAggregatorDelegate::try_push()`] call.
    idx: usize,
    /// The timestamp assigned to the resulting Kafka [`Record`].
    timestamp: Time,
    /// A span extracted from the original [`DmlOperation`].
    span_ctx: Option<SpanContext>,
    /// The approximate byte size of the serialised [`Record`], as calculated by
    /// [`Record::approximate_size()`].
    approx_kafka_write_size: usize,
}
/// A [`RecordAggregator`] implements [rskafka]'s abstract [`Aggregator`]
/// behaviour to provide batching of requests for a single Kafka partition.
///
/// Specifically the [`RecordAggregator`] maps [`DmlOperation`] instances to
/// Kafka [`Record`] instances, and delegates the batching to the
/// [`RecordAggregatorDelegate`] implementation maintained within [rskafka]
/// itself.
///
/// [rskafka]: https://github.com/influxdata/rskafka
#[derive(Debug)]
pub struct RecordAggregator {
    /// Supplies producer timestamps for ops that lack one.
    time_provider: Arc<dyn TimeProvider>,
    /// The shard index (Kafka partition number) this aggregator batches ops for (from Kafka,
    /// not the catalog).
    shard_index: ShardIndex,
    /// The underlying record aggregator the non-IOx-specific batching is
    /// delegated to.
    aggregator: RecordAggregatorDelegate,
}
impl RecordAggregator {
    /// Initialise a new [`RecordAggregator`] to aggregate up to
    /// `max_batch_size` number of bytes per message.
    pub fn new(
        shard_index: ShardIndex,
        max_batch_size: usize,
        time_provider: Arc<dyn TimeProvider>,
    ) -> Self {
        Self {
            shard_index,
            aggregator: RecordAggregatorDelegate::new(max_batch_size),
            time_provider,
        }
    }
}
impl RecordAggregator {
    /// Serialise the [`DmlOperation`] into a Kafka [`Record`] (encoded under
    /// the op's own namespace), returning the producer timestamp assigned to
    /// it.
    fn to_record(&self, op: &DmlOperation) -> Result<(Record, Time), Error> {
        // Prefer the producer timestamp already attached to the op; otherwise
        // stamp it with the current time.
        let now = op
            .meta()
            .producer_ts()
            .unwrap_or_else(|| self.time_provider.now());
        let headers = IoxHeaders::new(
            ContentType::Protobuf,
            op.meta().span_context().cloned(),
            op.namespace().to_owned(),
        );
        let mut buf = Vec::new();
        crate::codec::encode_operation(op.namespace(), op, &mut buf)?;
        // The buffer may be held for a while before being flushed; don't carry
        // excess capacity around.
        buf.shrink_to_fit();
        let record = Record {
            key: None,
            value: Some(buf),
            headers: headers
                .headers()
                .map(|(k, v)| (k.to_owned(), v.as_bytes().to_vec()))
                .collect(),
            timestamp: now.date_time(),
        };
        Ok((record, now))
    }
}
impl Aggregator for RecordAggregator {
    type Input = DmlOperation;
    type Tag = <DmlMetaDeaggregator as StatusDeaggregator>::Tag;
    type StatusDeaggregator = DmlMetaDeaggregator;
    /// Callers should retain the returned [`Tag`] in order to de-aggregate the
    /// [`DmlMeta`] from the request response.
    fn try_push(&mut self, op: Self::Input) -> Result<TryPush<Self::Input, Self::Tag>, Error> {
        // Encode the DML op to a Record
        let (record, timestamp) = self.to_record(&op)?;
        // Capture various metadata necessary to construct the Tag/DmlMeta for
        // the caller once a batch has been flushed.
        let span_ctx = op.meta().span_context().cloned();
        let approx_kafka_write_size = record.approximate_size();
        // And delegate batching to rskafka's RecordAggregator implementation
        Ok(match self.aggregator.try_push(record)? {
            // NoCapacity returns the original input to the caller when the
            // batching fails.
            //
            // The RecordBatcher delegate is returning the Record encoded from
            // op above, but the caller of this fn is expecting the original op.
            //
            // Map to the original input op this fn was called with, discarding
            // the encoded Record.
            TryPush::NoCapacity(_) => {
                // Log a warning if this occurs - this allows an operator to
                // increase the maximum Kafka message size, or lower the linger
                // time to minimise latency while still producing large enough
                // batches for it to be worth while.
                warn!("aggregated batch reached maximum capacity");
                TryPush::NoCapacity(op)
            }
            // A successful delegate aggregation returns the tag for offset
            // de-aggregation later. For simplicity, the tag this layer returns
            // also carries the various (small) metadata elements needed to
            // construct the DmlMeta at the point of de-aggregation.
            TryPush::Aggregated(idx) => TryPush::Aggregated(Tag {
                idx,
                timestamp,
                span_ctx,
                approx_kafka_write_size,
            }),
        })
    }
    /// Drain the buffered records, returning them together with the
    /// de-aggregator that maps per-record offsets back to [`DmlMeta`]s.
    fn flush(&mut self) -> Result<(Vec<Record>, Self::StatusDeaggregator), Error> {
        let records = self.aggregator.flush()?.0;
        Ok((records, DmlMetaDeaggregator::new(self.shard_index)))
    }
}
/// The de-aggregation half of the [`RecordAggregator`], this type consumes the
/// caller's [`Tag`] obtained from the aggregator to return the corresponding
/// [`DmlMeta`] from the batched response.
///
/// The [`DmlMetaDeaggregator`] is a stateless wrapper over the (also stateless)
/// [`RecordAggregatorStatusDeaggregator`] delegate, with most of the metadata
/// elements carried in the [`Tag`] itself.
#[derive(Debug)]
pub struct DmlMetaDeaggregator {
    /// The shard index baked into every returned [`DmlMeta`] sequence.
    shard_index: ShardIndex,
}
impl DmlMetaDeaggregator {
    /// Create a de-aggregator for responses from the given shard index.
    pub fn new(shard_index: ShardIndex) -> Self {
        Self { shard_index }
    }
}
impl StatusDeaggregator for DmlMetaDeaggregator {
    type Status = DmlMeta;
    type Tag = Tag;
    /// Resolve the Kafka offset for `tag` from the batch response `input` and
    /// rebuild the sequenced [`DmlMeta`] from it plus the metadata the tag
    /// carries.
    fn deaggregate(&self, input: &[i64], tag: Self::Tag) -> Result<Self::Status, Error> {
        // Delegate de-aggregation to the (stateless) record batch
        // de-aggregator for forwards compatibility.
        let offset = RecordAggregatorStatusDeaggregator::default()
            .deaggregate(input, tag.idx)
            .expect("invalid de-aggregation index");
        Ok(DmlMeta::sequenced(
            Sequence::new(self.shard_index, SequenceNumber::new(offset)),
            tag.timestamp,
            tag.span_ctx,
            tag.approx_kafka_write_size,
        ))
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use dml::DmlWrite;
    use hashbrown::HashMap;
    use iox_time::MockProvider;
    use mutable_batch::{writer::Writer, MutableBatch};
    use trace::LogTraceCollector;
    use crate::codec::{
        CONTENT_TYPE_PROTOBUF, HEADER_CONTENT_TYPE, HEADER_NAMESPACE, HEADER_TRACE_CONTEXT,
    };
    use super::*;
    const NAMESPACE: &str = "bananas";
    const SHARD_INDEX: ShardIndex = ShardIndex::new(42);
    const TIMESTAMP_MILLIS: i64 = 1659990497000;
    /// Build a single-table, single-row write op with a span context attached.
    fn test_op() -> DmlOperation {
        let mut batch = MutableBatch::new();
        let mut writer = Writer::new(&mut batch, 1);
        writer
            // Date: "1970-01-01"
            .write_time("time", [42].into_iter())
            .unwrap();
        writer
            .write_i64("A", Some(&[0b00000001]), [1].into_iter())
            .unwrap();
        writer.commit();
        let mut m = HashMap::default();
        m.insert("table".to_string(), batch);
        let span = SpanContext::new(Arc::new(LogTraceCollector::new()));
        DmlOperation::Write(DmlWrite::new(
            NAMESPACE.to_string(),
            m,
            Some("1970-01-01".into()),
            DmlMeta::unsequenced(Some(span)),
        ))
    }
    /// Happy path: aggregate one op, flush it, and verify both the emitted
    /// record's contents and the DmlMeta recovered via de-aggregation.
    #[test]
    fn test_record_aggregate() {
        let clock = Arc::new(MockProvider::new(Time::from_timestamp_millis(
            TIMESTAMP_MILLIS,
        )));
        let mut agg = RecordAggregator::new(SHARD_INDEX, usize::MAX, clock);
        let write = test_op();
        let res = agg.try_push(write).expect("aggregate call should succeed");
        let tag = match res {
            TryPush::NoCapacity(_) => panic!("unexpected no capacity"),
            TryPush::Aggregated(tag) => tag,
        };
        // Flush the aggregator to acquire the records
        let (records, deagg) = agg.flush().expect("should flush");
        assert_eq!(records.len(), 1);
        // Another flush should not yield the same records
        let (records2, _) = agg.flush().expect("should flush");
        assert!(records2.is_empty());
        // Assert properties of the resulting record
        let record = records[0].clone();
        assert_eq!(record.key, None);
        assert!(record.value.is_some());
        assert_eq!(
            *record
                .headers
                .get(HEADER_CONTENT_TYPE)
                .expect("no content type"),
            Vec::<u8>::from(CONTENT_TYPE_PROTOBUF),
        );
        assert_eq!(
            *record
                .headers
                .get(HEADER_NAMESPACE)
                .expect("no namespace header"),
            Vec::<u8>::from(NAMESPACE),
        );
        assert!(record.headers.get(HEADER_TRACE_CONTEXT).is_some());
        assert_eq!(record.timestamp.timestamp(), 1659990497);
        // Extract the DmlMeta from the de-aggregator
        let got = deagg
            .deaggregate(&[4242], tag)
            .expect("de-aggregate should succeed");
        // Assert the metadata properties
        assert!(got.span_context().is_some());
        assert_eq!(
            *got.sequence().expect("should be sequenced"),
            Sequence::new(SHARD_INDEX, SequenceNumber::new(4242))
        );
        assert_eq!(
            got.producer_ts().expect("no producer timestamp"),
            Time::from_timestamp_millis(TIMESTAMP_MILLIS)
        );
        assert_eq!(
            got.bytes_read().expect("no approx size"),
            record.approximate_size()
        );
    }
    /// A zero-capacity aggregator must return the original op unchanged via
    /// `TryPush::NoCapacity`.
    #[test]
    fn test_record_aggregate_no_capacity() {
        let clock = Arc::new(MockProvider::new(Time::from_timestamp_millis(
            TIMESTAMP_MILLIS,
        )));
        let mut agg = RecordAggregator::new(SHARD_INDEX, usize::MIN, clock);
        let write = test_op();
        let res = agg
            .try_push(write.clone())
            .expect("aggregate call should succeed");
        match res {
            TryPush::NoCapacity(res) => assert_eq!(res.namespace(), write.namespace()),
            TryPush::Aggregated(_) => panic!("expected no capacity"),
        };
    }
}