feat: instrument kafka aggregated DML batch size

The Kafka write buffer implementation (and only the Kafka impl) merges
together successive DML writes for the same namespace & partition within
a window of time.

This commit records the number of DML writes that have been merged
together to form a single batched op before it is dispatched to Kafka.
pull/24376/head
Dom Dwyer 2022-08-04 11:59:59 +02:00
parent 5f9ade103f
commit 77fd967517
3 changed files with 85 additions and 1 deletion

View File

@ -174,6 +174,7 @@ impl WriteBufferConfigFactory {
cfg.creation_config.as_ref(),
Arc::clone(&self.time_provider),
trace_collector.map(Arc::clone),
&*self.metric_registry,
)
.await?;
Arc::new(rskafa_buffer) as _

View File

@ -3,6 +3,7 @@ use data_types::{PartitionKey, Sequence, SequenceNumber};
use dml::{DmlMeta, DmlOperation, DmlWrite};
use hashbrown::{hash_map::Entry, HashMap};
use iox_time::{Time, TimeProvider};
use metric::{U64Histogram, U64HistogramOptions};
use mutable_batch::MutableBatch;
use observability_deps::tracing::{debug, error, warn};
use rskafka::{
@ -45,10 +46,21 @@ struct WriteAggregator {
/// Trace collector
collector: Option<Arc<dyn TraceCollector>>,
/// Number of DML writes coalesced into this aggregator.
num_ops: u64,
/// A metric recorder that observes num_ops when finalising the batch.
num_ops_recorder: U64Histogram,
}
impl WriteAggregator {
fn new(write: DmlWrite, collector: Option<Arc<dyn TraceCollector>>, tag: Tag) -> Self {
fn new(
write: DmlWrite,
collector: Option<Arc<dyn TraceCollector>>,
tag: Tag,
batch_coalesced_ops: U64Histogram,
) -> Self {
let mut span_recorder = None;
Self::record_span(&mut span_recorder, write.meta().span_context(), &collector);
@ -64,6 +76,8 @@ impl WriteAggregator {
span_recorder,
tag,
collector,
num_ops: 1, // Inclusive of the write op this aggregator was created with
num_ops_recorder: batch_coalesced_ops,
}
}
@ -135,6 +149,8 @@ impl WriteAggregator {
&self.partition_key
);
self.num_ops += 1;
Self::record_span(
&mut self.span_recorder,
write.meta().span_context(),
@ -158,6 +174,9 @@ impl WriteAggregator {
if let Some(span_recorder) = self.span_recorder.as_mut() {
span_recorder.ok("aggregated");
}
// Record the number of ops that were aggregated into this batch.
self.num_ops_recorder.record(self.num_ops);
}
/// Encode into DML write.
@ -290,6 +309,10 @@ pub struct DmlAggregator {
/// Time provider.
time_provider: Arc<dyn TimeProvider>,
/// A metric capturing the distribution of coalesced [`DmlWrite`] count per
/// aggregated batch.
batch_coalesced_ops: U64Histogram,
}
impl DmlAggregator {
@ -299,7 +322,22 @@ impl DmlAggregator {
max_size: usize,
sequencer_id: u32,
time_provider: Arc<dyn TimeProvider>,
metric_registry: &metric::Registry,
) -> Self {
// Register a metric recording the number of ops batched together before
// being pushed to kafka.
//
// Register here and pass the recorder into each WriteAggregator to
// avoid locking the metric registry each time a WriteAggregator is
// created.
let batch_coalesced_ops: U64Histogram = metric_registry
.register_metric_with_options::<U64Histogram, _>(
"write_buffer_batch_coalesced_write_ops",
"number of dml writes aggregated into a single kafka produce operation",
|| U64HistogramOptions::new([1, 2, 4, 8, 16, 32, 64, 128]),
)
.recorder(&[]);
Self {
collector,
database_name: database_name.into(),
@ -307,6 +345,7 @@ impl DmlAggregator {
sequencer_id,
state: DmlAggregatorState::default(),
time_provider,
batch_coalesced_ops,
}
}
}
@ -378,6 +417,7 @@ impl Aggregator for DmlAggregator {
write,
self.collector.as_ref().map(Arc::clone),
new_tag,
self.batch_coalesced_ops.clone(),
);
// Replace current aggregator
@ -397,6 +437,7 @@ impl Aggregator for DmlAggregator {
write,
self.collector.as_ref().map(Arc::clone),
tag,
self.batch_coalesced_ops.clone(),
),
op_record,
op_md,

View File

@ -57,6 +57,7 @@ impl RSKafkaProducer {
creation_config: Option<&WriteBufferCreationConfig>,
time_provider: Arc<dyn TimeProvider>,
trace_collector: Option<Arc<dyn TraceCollector>>,
metric_registry: &metric::Registry,
) -> Result<Self> {
let partition_clients = setup_topic(
conn,
@ -81,6 +82,7 @@ impl RSKafkaProducer {
producer_config.max_batch_size,
sequencer_id,
Arc::clone(&time_provider),
metric_registry,
));
(sequencer_id, producer)
@ -463,6 +465,7 @@ mod tests {
use data_types::{DeletePredicate, PartitionKey, TimestampRange};
use dml::{test_util::assert_write_op_eq, DmlDelete, DmlWrite};
use futures::{stream::FuturesUnordered, TryStreamExt};
use metric::{Metric, U64Histogram};
use rskafka::{client::partition::Compression, record::Record};
use std::num::NonZeroU32;
use test_helpers::assert_contains;
@ -493,6 +496,7 @@ mod tests {
n_sequencers,
time_provider,
trace_collector: Arc::new(RingBufferTraceCollector::new(100)),
metrics: metric::Registry::default(),
}
}
}
@ -503,6 +507,7 @@ mod tests {
n_sequencers: NonZeroU32,
time_provider: Arc<dyn TimeProvider>,
trace_collector: Arc<RingBufferTraceCollector>,
metrics: metric::Registry,
}
impl RSKafkaTestContext {
@ -512,6 +517,10 @@ mod tests {
..Default::default()
})
}
/// Returns a reference to the metric registry stored in `self.metrics`.
fn metrics(&self) -> &metric::Registry {
&self.metrics
}
}
#[async_trait]
@ -528,6 +537,7 @@ mod tests {
self.creation_config(creation_config).as_ref(),
Arc::clone(&self.time_provider),
Some(self.trace_collector() as Arc<_>),
&self.metrics,
)
.await
}
@ -676,6 +686,38 @@ mod tests {
assert_ne!(w2_1.sequence().unwrap(), w1_1.sequence().unwrap());
assert_eq!(w2_1.sequence().unwrap(), w2_2.sequence().unwrap());
// Assert the batch sizes were captured in the metrics
let histogram = ctx
.metrics()
.get_instrument::<Metric<U64Histogram>>("write_buffer_batch_coalesced_write_ops")
.expect("failed to read metric")
.get_observer(&[].into())
.expect("failed to get observer")
.fetch();
assert_eq!(
histogram.sample_count(),
3,
"metric did not record expected batch count"
);
// Validate the expected bucket values.
[
(1, 0),
(2, 3),
(4, 0),
(8, 0),
(16, 0),
(32, 0),
(64, 0),
(128, 0),
]
.into_iter()
.enumerate()
.for_each(|(i, (le, count))| {
let bucket = &histogram.buckets[i];
assert_eq!(bucket.le, le);
assert_eq!(bucket.count, count, "bucket le={}", le);
});
let consumer = ctx.reading(true).await.unwrap();
let mut handler = consumer.stream_handler(sequencer_id).await.unwrap();
let mut stream = handler.stream().await;