feat: Kafka client produce() instrumentation

Adds a decorator over the underlying kafka client to capture the latency
distribution of the low-level kafka writes, independent of the
aggregation/DML batching framework that sits "above" this client.

The latency measurements include the serialisation overhead, protocol
overhead, and actual network I/O.
pull/24376/head
Dom Dwyer 2022-08-04 16:32:31 +02:00
parent d003fe0047
commit 284a3069ce
5 changed files with 233 additions and 3 deletions

2
Cargo.lock generated
View File

@ -4187,7 +4187,7 @@ dependencies = [
[[package]]
name = "rskafka"
version = "0.3.0"
source = "git+https://github.com/influxdata/rskafka.git?rev=e34f6f485db9256a7614220cabea86f7c44f5eb6#e34f6f485db9256a7614220cabea86f7c44f5eb6"
source = "git+https://github.com/influxdata/rskafka.git?rev=9f1e36e8f34a4b0a7323bde623667470b4559a7d#9f1e36e8f34a4b0a7323bde623667470b4559a7d"
dependencies = [
"async-socks5",
"async-trait",

View File

@ -118,4 +118,4 @@ incremental = true
[patch.crates-io]
# remove and bump object_store dep version once this revision is released.
object_store = { git = 'https://github.com/influxdata/object_store_rs', rev = "3c51870ac41a90942c2e45bb499a893d514ed1da"}
object_store = { git = 'https://github.com/influxdata/object_store_rs', rev = "3c51870ac41a90942c2e45bb499a893d514ed1da"}

View File

@ -22,7 +22,7 @@ observability_deps = { path = "../observability_deps" }
parking_lot = "0.12"
pin-project = "1.0"
prost = "0.10"
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="e34f6f485db9256a7614220cabea86f7c44f5eb6", default-features = false, features = ["compression-snappy", "transport-socks5"] }
rskafka = { git = "https://github.com/influxdata/rskafka.git", rev="9f1e36e8f34a4b0a7323bde623667470b4559a7d", default-features = false, features = ["compression-snappy", "transport-socks5"] }
schema = { path = "../schema" }
tokio = { version = "1.20", features = ["fs", "macros", "parking_lot", "rt", "sync", "time"] }
tokio-util = "0.7.3"

View File

@ -0,0 +1,229 @@
use std::result::Result;
use data_types::KafkaPartition;
use futures::future::BoxFuture;
use iox_time::{SystemProvider, TimeProvider};
use metric::{Attributes, DurationHistogram};
use rskafka::{
client::{partition::Compression, producer::ProducerClient},
record::Record,
};
/// An instrumentation layer that decorates a [`ProducerClient`] implementation,
/// recording the latency distribution and success/error result of the
/// underlying [`ProducerClient::produce()`] call, which includes serialisation
/// & protocol overhead, as well as the actual network I/O.
///
/// The metrics created by this instrumentation are labelled with the kafka
/// topic & partition specified at initialisation.
#[derive(Debug)]
pub struct KafkaProducerMetrics<P = SystemProvider> {
inner: Box<dyn ProducerClient>,
time_provider: P,
enqueue_success: DurationHistogram,
enqueue_error: DurationHistogram,
}
impl KafkaProducerMetrics {
/// Decorate the specified [`ProducerClient`] implementation with an
/// instrumentation layer.
pub fn new(
client: Box<dyn ProducerClient>,
kafka_topic_name: String,
kafka_partition: KafkaPartition,
metrics: &metric::Registry,
) -> Self {
let attr = Attributes::from([
("kafka_partition", kafka_partition.to_string().into()),
("kafka_topic", kafka_topic_name.into()),
]);
let enqueue = metrics.register_metric::<DurationHistogram>(
"write_buffer_client_produce_duration",
"duration of time taken to push a set of records to kafka \
- includes codec, protocol, and network overhead",
);
let enqueue_success = enqueue.recorder({
let mut attr = attr.clone();
attr.insert("result", "success");
attr
});
let enqueue_error = enqueue.recorder({
let mut attr = attr;
attr.insert("result", "error");
attr
});
Self {
inner: client,
time_provider: Default::default(),
enqueue_success,
enqueue_error,
}
}
}
impl<P> KafkaProducerMetrics<P>
where
P: TimeProvider,
{
#[cfg(test)]
fn with_time_provider<T>(self, time_provider: T) -> KafkaProducerMetrics<T> {
KafkaProducerMetrics {
inner: self.inner,
time_provider,
enqueue_error: self.enqueue_error,
enqueue_success: self.enqueue_success,
}
}
/// Call the inner [`ProducerClient`] implementation and record latency in
/// the appropriate success/error metric depending on the result.
async fn instrument(
&self,
records: Vec<Record>,
compression: Compression,
) -> Result<Vec<i64>, rskafka::client::error::Error> {
let t = self.time_provider.now();
let res = self.inner.produce(records, compression).await;
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
match &res {
Ok(_) => self.enqueue_success.record(delta),
Err(_) => self.enqueue_error.record(delta),
}
}
res
}
}
impl<P> rskafka::client::producer::ProducerClient for KafkaProducerMetrics<P>
where
P: TimeProvider,
{
fn produce(
&self,
records: Vec<Record>,
compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, rskafka::client::error::Error>> {
Box::pin(self.instrument(records, compression))
}
}
#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};
use iox_time::Time;
use metric::Metric;
use parking_lot::Mutex;
use super::*;
const KAFKA_TOPIC: &str = "bananas";
const KAFKA_PARTITION: KafkaPartition = KafkaPartition::new(42);
/// The duration of time the MockProducer::produce() takes to "execute"
const CALL_LATENCY: Duration = Duration::from_secs(1);
#[derive(Debug)]
pub struct MockProducer {
clock: Arc<iox_time::MockProvider>,
ret: Mutex<Option<Result<Vec<i64>, rskafka::client::error::Error>>>,
}
impl MockProducer {
pub fn new(
clock: Arc<iox_time::MockProvider>,
ret: Result<Vec<i64>, rskafka::client::error::Error>,
) -> Self {
Self {
clock,
ret: Mutex::new(Some(ret)),
}
}
}
impl rskafka::client::producer::ProducerClient for MockProducer {
fn produce(
&self,
_records: Vec<Record>,
_compression: Compression,
) -> BoxFuture<'_, Result<Vec<i64>, rskafka::client::error::Error>> {
// Jump the clock by 1s so the metrics to observe a 1s latency
self.clock.inc(CALL_LATENCY);
// And return the configured response
Box::pin(async { self.ret.lock().take().expect("multiple calls to mock") })
}
}
#[tokio::test]
async fn test_produce_instrumentation_success() {
let clock = Arc::new(iox_time::MockProvider::new(Time::MIN));
let producer = Box::new(MockProducer::new(Arc::clone(&clock), Ok(Vec::default())));
let metrics = metric::Registry::default();
let wrapper =
KafkaProducerMetrics::new(producer, KAFKA_TOPIC.to_string(), KAFKA_PARTITION, &metrics)
.with_time_provider(Arc::clone(&clock));
wrapper
.produce(Vec::new(), Compression::Snappy)
.await
.expect("produce call should succeed");
// Ensure the latency was correctly recorded.
let histogram = metrics
.get_instrument::<Metric<DurationHistogram>>("write_buffer_client_produce_duration")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[
("kafka_topic", KAFKA_TOPIC),
("kafka_partition", "42"),
("result", "success"),
]))
.expect("failed to get observer")
.fetch();
assert_eq!(histogram.sample_count(), 1);
assert_eq!(histogram.total, CALL_LATENCY);
}
#[tokio::test]
async fn test_produce_instrumentation_error() {
let clock = Arc::new(iox_time::MockProvider::new(Time::MIN));
let producer = Box::new(MockProducer::new(
Arc::clone(&clock),
Err(rskafka::client::error::Error::InvalidResponse(
"bananas".to_string(),
)),
));
let metrics = metric::Registry::default();
let wrapper =
KafkaProducerMetrics::new(producer, KAFKA_TOPIC.to_string(), KAFKA_PARTITION, &metrics)
.with_time_provider(Arc::clone(&clock));
wrapper
.produce(Vec::new(), Compression::Snappy)
.await
.expect_err("produce call should fail");
// Ensure the latency was correctly recorded.
let histogram = metrics
.get_instrument::<Metric<DurationHistogram>>("write_buffer_client_produce_duration")
.expect("failed to read metric")
.get_observer(&Attributes::from(&[
("kafka_topic", KAFKA_TOPIC),
("kafka_partition", "42"),
("result", "error"),
]))
.expect("failed to get observer")
.fetch();
assert_eq!(histogram.sample_count(), 1);
assert_eq!(histogram.total, CALL_LATENCY);
}
}

View File

@ -38,6 +38,7 @@ use trace::TraceCollector;
mod aggregator;
mod config;
mod instrumentation;
/// Maximum number of jobs buffered and decoded concurrently.
const CONCURRENT_DECODE_JOBS: usize = 10;