feat: Add kafka write buffer consumer metrics (#3129)
* feat: Add kafka write buffer consumer metrics
* refactor: use unwrap_or_else
* fix: Update bucket boundaries
parent 47acd181c5
commit b5a7bf03da
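In short, the commit registers a new `write_buffer_ingest_entry_size` histogram per database and records one observation per consumed Kafka message. Below is a minimal sketch of that flow using the calls visible in the diff; the `metric::Registry` parameter type is assumed from context, and the database name, shortened bucket list, and recorded size are illustrative placeholders rather than values from the commit.

use metric::{Metric, U64Histogram, U64HistogramOptions};

// Sketch only: register the histogram with explicit byte-size buckets, then
// record the estimated size of each consumed message under a per-database attribute.
fn sketch(metric_registry: &metric::Registry) {
    let histogram: Metric<U64Histogram> = metric_registry.register_metric_with_options(
        "write_buffer_ingest_entry_size",
        "distribution of ingested Kafka message sizes",
        || U64HistogramOptions::new([1024, 16 * 1024, 256 * 1024, u64::MAX]),
    );

    // One recorder per database / sequencer stream.
    let recorder = histogram.recorder(metric::Attributes::from([(
        "database",
        "placeholder_db".into(),
    )]));

    // Estimated size of one message: payload len + key len + header keys/values.
    recorder.record(197);
}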
@@ -8,13 +8,13 @@ use std::{
 use async_trait::async_trait;
 use futures::{FutureExt, StreamExt};
-use metric::{Metric, U64Gauge};
+use metric::{Metric, U64Gauge, U64Histogram, U64HistogramOptions};
 use rdkafka::{
     admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
     client::DefaultClientContext,
     consumer::{BaseConsumer, Consumer, ConsumerContext, StreamConsumer},
     error::KafkaError,
-    message::{Headers, OwnedHeaders},
+    message::{BorrowedMessage, Headers, OwnedHeaders},
     producer::{FutureProducer, FutureRecord},
     types::RDKafkaErrorCode,
     util::Timeout,
@@ -192,6 +192,7 @@ pub struct KafkaBufferConsumer {
     database_name: String,
     consumers: BTreeMap<u32, Arc<StreamConsumer<ClientContextImpl>>>,
     trace_collector: Option<Arc<dyn TraceCollector>>,
+    write_buffer_ingest_entry_size: Metric<U64Histogram>,
 }

 // Needed because rdkafka's StreamConsumer doesn't impl Debug
@@ -204,6 +205,14 @@ impl std::fmt::Debug for KafkaBufferConsumer {
     }
 }

+// Iterate over the headers of a kafka message
+fn header_iter<'a>(message: &'a BorrowedMessage<'_>) -> impl Iterator<Item = (&'a str, &'a [u8])> {
+    message
+        .headers()
+        .into_iter()
+        .flat_map(|headers| (0..headers.count()).map(|idx| headers.get(idx).unwrap()))
+}
+
 #[async_trait]
 impl WriteBufferReading for KafkaBufferConsumer {
     fn streams(&mut self) -> BTreeMap<u32, WriteStream<'_>> {
@@ -215,18 +224,30 @@ impl WriteBufferReading for KafkaBufferConsumer {
             let database_name = self.database_name.clone();
             let trace_collector = self.trace_collector.clone();

+            // prepare a metric recorder for this sequencer
+            let attributes = metric::Attributes::from([("database", database_name.clone().into())]);
+            let write_buffer_ingest_entry_size =
+                self.write_buffer_ingest_entry_size.recorder(attributes);
+
             let stream = consumer
                 .stream()
                 .map(move |message| {
                     let message = message?;

-                    let kafka_headers = message.headers().into_iter().flat_map(|headers| (0..headers.count()).map(|idx| headers.get(idx).unwrap()));
+                    let kafka_headers = header_iter(&message);
                     let headers = IoxHeaders::from_headers(kafka_headers, trace_collector.as_ref())?;

                     let payload = message.payload().ok_or_else::<WriteBufferError, _>(|| {
                         "Payload missing".to_string().into()
                     })?;

+                    // Estimate size of data read from kafka as
+                    // payload len + key len + headers
+                    let kafka_read_size: usize = payload.len() +
+                        message.key().map(|key| key.len()).unwrap_or_default() +
+                        header_iter(&message).map(|(key, value)| key.len() + value.len()).sum::<usize>();
+                    write_buffer_ingest_entry_size
+                        .record(kafka_read_size as u64);
+
                     // Timestamps were added as part of
                     // [KIP-32](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message).
                     // The tracking issue [KAFKA-2511](https://issues.apache.org/jira/browse/KAFKA-2511) states that
@@ -395,11 +416,30 @@ impl KafkaBufferConsumer {
             .collect::<Result<BTreeMap<u32, Arc<StreamConsumer<ClientContextImpl>>>, KafkaError>>(
             )?;

+        let write_buffer_ingest_entry_size: Metric<U64Histogram> = metric_registry
+            .register_metric_with_options(
+                "write_buffer_ingest_entry_size",
+                "distribution of ingested Kafka message sizes",
+                || {
+                    U64HistogramOptions::new([
+                        1024,
+                        16 * 1024,
+                        256 * 1024,
+                        768 * 1024,
+                        1024 * 1024,
+                        3 * 1024 * 1024,
+                        10 * 1024 * 1024,
+                        u64::MAX,
+                    ])
+                },
+            );
+
         Ok(Self {
             conn,
             database_name,
             consumers,
             trace_collector: trace_collector.map(|x| Arc::clone(x)),
+            write_buffer_ingest_entry_size,
         })
     }
 }
@@ -703,6 +743,8 @@ pub mod test_utils {
     }
 }

+/// Kafka tests (only run when in integration test mode and kafka is running).
+/// see [`crate::maybe_skip_kafka_integration`] for more details.
 #[cfg(test)]
 mod tests {
     use std::{
@@ -716,7 +758,8 @@ mod tests {
     use crate::codec::HEADER_CONTENT_TYPE;
     use crate::{
         core::test_utils::{
-            map_pop_first, perform_generic_tests, set_pop_first, TestAdapter, TestContext,
+            map_pop_first, perform_generic_tests, set_pop_first, write as write_to_writer,
+            TestAdapter, TestContext,
         },
         kafka::test_utils::random_kafka_topic,
         maybe_skip_kafka_integration,
@@ -961,4 +1004,59 @@ mod tests {

         background_task.abort();
     }
+
+    #[tokio::test]
+    async fn test_ingest_metrics() {
+        let conn = maybe_skip_kafka_integration!();
+        let adapter = KafkaTestAdapter::new(conn);
+
+        let context = adapter.new_context(NonZeroU32::try_from(1).unwrap()).await;
+
+        let entry_1 = "upc user=1 100";
+
+        let writer = context.writing(true).await.unwrap();
+        let mut reader = context.reading(true).await.unwrap();
+
+        let mut streams = reader.streams();
+        assert_eq!(streams.len(), 1);
+        let (sequencer_id, mut stream) = map_pop_first(&mut streams).unwrap();
+
+        // Send some data into the buffer and read it out
+        write_to_writer(&writer, entry_1, sequencer_id, None).await;
+        stream.stream.next().await.unwrap().unwrap();
+
+        let metric: Metric<U64Histogram> = context
+            .metric_registry
+            .get_instrument("write_buffer_ingest_entry_size")
+            .unwrap();
+
+        let observation = metric
+            .get_observer(&metric::Attributes::from([(
+                "database",
+                context.database_name.clone().into(),
+            )]))
+            .unwrap()
+            .fetch();
+
+        assert_eq!(observation.total, 197, "Observation: {:#?}", observation);
+        assert_eq!(
+            observation.buckets.len(),
+            8,
+            "Observation: {:#?}",
+            observation
+        );
+        assert_eq!(
+            observation.buckets[0],
+            metric::ObservationBucket { le: 1024, count: 1 },
+            "Observation: {:#?}",
+            observation
+        );
+
+        // should be no other observations in this histogram
+        observation
+            .buckets
+            .iter()
+            .skip(1)
+            .for_each(|bucket| assert_eq!(bucket.count, 0, "{:#?}", observation));
+    }
 }
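As the new test's assertions indicate, each recorded size is counted in the bucket with the first `le` (less-than-or-equal) boundary it does not exceed, so the 197-byte test entry lands in the `le: 1024` bucket. A small illustration of that mapping against the boundaries registered in this commit; the sizes other than 197 are made-up examples, not values from the test.

// Map a recorded size to the `le` boundary of the bucket that counts it.
fn bucket_for(size: u64) -> u64 {
    [
        1024,
        16 * 1024,
        256 * 1024,
        768 * 1024,
        1024 * 1024,
        3 * 1024 * 1024,
        10 * 1024 * 1024,
        u64::MAX,
    ]
    .into_iter()
    .find(|&le| size <= le)
    .unwrap()
}

#[test]
fn bucket_examples() {
    assert_eq!(bucket_for(197), 1024);                  // the test entry above
    assert_eq!(bucket_for(20 * 1024), 256 * 1024);      // hypothetical 20 KiB message
    assert_eq!(bucket_for(50 * 1024 * 1024), u64::MAX); // hypothetical oversized message
}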