feat: capture Kafka message size distribution

Adds instrumentation to the low-level (post-aggregation) Kafka client,
capturing the approximate, uncompressed message size (calculated as the
sum of Record::approximate_size() over all records in the batch, ignoring
the largely static framing overhead).
pull/24376/head
Dom Dwyer 2022-08-29 13:34:18 +02:00
parent 80eb8efbe5
commit 175cae2f56
1 changed file with 70 additions and 2 deletions
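In short, before delegating to the wrapped producer, the instrumentation sums
rskafka's Record::approximate_size() across the batch and records that single
total in a U64Histogram. A minimal sketch of the recording path (variable
names are illustrative; the exact code is in the diff below):

    // Sketch: sum the approximate, uncompressed size of each record in the
    // batch and record it as one histogram observation. `msg_size` is the
    // U64Histogram recorder registered as "write_buffer_client_payload_size".
    let approx_payload_bytes: u64 = records
        .iter()
        .map(|r| r.approximate_size() as u64)
        .sum();
    msg_size.record(approx_payload_bytes);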


@@ -3,7 +3,7 @@ use std::result::Result;
 use data_types::KafkaPartition;
 use futures::future::BoxFuture;
 use iox_time::{SystemProvider, TimeProvider};
-use metric::{Attributes, DurationHistogram};
+use metric::{Attributes, DurationHistogram, U64Histogram, U64HistogramOptions};
 use rskafka::{
     client::{partition::Compression, producer::ProducerClient},
     record::Record,
@@ -14,6 +14,12 @@ use rskafka::{
 /// underlying [`ProducerClient::produce()`] call, which includes serialisation
 /// & protocol overhead, as well as the actual network I/O.
 ///
+/// Captures the approximate, uncompressed size of the resulting Kafka message
+/// payload written to the wire by summing [`Record::approximate_size()`] over
+/// the batch. This value reflects the size of the message before client or
+/// broker compression is applied - messages on the wire may be significantly
+/// smaller.
+///
 /// The metrics created by this instrumentation are labelled with the kafka
 /// topic & partition specified at initialisation.
 #[derive(Debug)]
@@ -23,6 +29,8 @@ pub struct KafkaProducerMetrics<P = SystemProvider> {
     enqueue_success: DurationHistogram,
     enqueue_error: DurationHistogram,
+    msg_size: U64Histogram,
 }

 impl KafkaProducerMetrics {
@@ -39,6 +47,24 @@ impl KafkaProducerMetrics {
             ("kafka_topic", kafka_topic_name.into()),
         ]);

+        // Capture the distribution of message sizes (sum of Record size)
+        let msg_size = metrics
+            .register_metric_with_options::<U64Histogram, _>(
+                "write_buffer_client_payload_size",
+                "distribution of approximate uncompressed message \
+                 payload size written to Kafka",
+                || {
+                    U64HistogramOptions::new(
+                        // 512 bytes to 16MiB buckets.
+                        [
+                            512, 1024, 2048, 4096, 8192, 16384, 32768, 65536, 131072, 262144,
+                            524288, 1048576, 2097152, 4194304, 8388608, 16777216,
+                        ],
+                    )
+                },
+            )
+            .recorder(attr.clone());
+
         let enqueue = metrics.register_metric::<DurationHistogram>(
             "write_buffer_client_produce_duration",
             "duration of time taken to push a set of records to kafka \
@@ -60,6 +86,7 @@ impl KafkaProducerMetrics {
             time_provider: Default::default(),
             enqueue_success,
             enqueue_error,
+            msg_size,
         }
     }
 }
@@ -75,6 +102,7 @@ where
             time_provider,
             enqueue_error: self.enqueue_error,
             enqueue_success: self.enqueue_success,
+            msg_size: self.msg_size,
         }
     }
@@ -85,6 +113,10 @@ where
         records: Vec<Record>,
         compression: Compression,
     ) -> Result<Vec<i64>, rskafka::client::error::Error> {
+        // Capture the approximate message size.
+        self.msg_size
+            .record(records.iter().map(|v| v.approximate_size() as u64).sum());
+
         let t = self.time_provider.now();
         let res = self.inner.produce(records, compression).await;
@@ -120,6 +152,7 @@ mod tests {
     use iox_time::Time;
     use metric::Metric;
     use parking_lot::Mutex;
+    use rskafka::time::OffsetDateTime;

     use super::*;
@@ -170,8 +203,15 @@
             KafkaProducerMetrics::new(producer, KAFKA_TOPIC.to_string(), KAFKA_PARTITION, &metrics)
                 .with_time_provider(Arc::clone(&clock));

+        let record = Record {
+            key: Some("bananas".into()),
+            value: None,
+            headers: Default::default(),
+            timestamp: OffsetDateTime::UNIX_EPOCH,
+        };
+
         wrapper
-            .produce(Vec::new(), Compression::Snappy)
+            .produce(vec![record.clone()], Compression::Snappy)
             .await
             .expect("produce call should succeed");
@@ -189,6 +229,20 @@
         assert_eq!(histogram.sample_count(), 1);
         assert_eq!(histogram.total, CALL_LATENCY);

+        // Ensure the size was captured
+        let histogram = metrics
+            .get_instrument::<Metric<U64Histogram>>("write_buffer_client_payload_size")
+            .expect("failed to read metric")
+            .get_observer(&Attributes::from(&[
+                ("kafka_topic", KAFKA_TOPIC),
+                ("kafka_partition", "42"),
+            ]))
+            .expect("failed to get observer")
+            .fetch();
+
+        assert_eq!(histogram.sample_count(), 1);
+        assert_eq!(histogram.total, record.approximate_size() as u64);
     }

     #[tokio::test]
@@ -225,5 +279,19 @@
         assert_eq!(histogram.sample_count(), 1);
         assert_eq!(histogram.total, CALL_LATENCY);

+        // Ensure the size was captured
+        let histogram = metrics
+            .get_instrument::<Metric<U64Histogram>>("write_buffer_client_payload_size")
+            .expect("failed to read metric")
+            .get_observer(&Attributes::from(&[
+                ("kafka_topic", KAFKA_TOPIC),
+                ("kafka_partition", "42"),
+            ]))
+            .expect("failed to get observer")
+            .fetch();
+
+        assert_eq!(histogram.sample_count(), 1);
+        assert_eq!(histogram.total, 0);
     }
 }
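For completeness, the wrapper is constructed the same way the tests above do
it: wrap the inner producer, pass the topic name and partition that become the
metric labels, and hand over a metric registry; every later produce() call then
feeds both the payload-size and duration histograms. A hedged sketch, assuming
kafka_client, topic_name, partition, and metric_registry already exist in the
surrounding write-buffer setup:

    // Sketch only - mirrors the test setup above, not the production wiring.
    // `kafka_client`, `topic_name`, `partition`, and `metric_registry` are
    // assumed to be provided by the caller.
    let wrapper = KafkaProducerMetrics::new(
        kafka_client,            // inner rskafka ProducerClient being wrapped
        topic_name.to_string(),  // becomes the `kafka_topic` label
        partition,               // KafkaPartition; becomes the `kafka_partition` label
        &metric_registry,        // registry the histograms are created in
    );

    // Payload size is recorded before the inner call, duration after it:
    // wrapper.produce(records, Compression::Snappy).await?;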