test: ensure that Kafka producers also generate sane metrics
parent
b7952c15a6
commit
459c14035c
|
@ -1009,7 +1009,38 @@ mod tests {
|
|||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn metrics() {
|
||||
async fn metrics_producer() {
|
||||
let conn = maybe_skip_kafka_integration!();
|
||||
let adapter = KafkaTestAdapter::new(conn);
|
||||
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
|
||||
|
||||
let _writer = ctx.writing(true).await.unwrap();
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(5), async {
|
||||
loop {
|
||||
if let Some(instrument) = ctx
|
||||
.metric_registry
|
||||
.get_instrument::<Metric<U64Gauge>>("kafka_rx_bytes")
|
||||
{
|
||||
if let Some(observer) = instrument.get_observer(&metric::Attributes::from([
|
||||
("database", ctx.database_name.clone().into()),
|
||||
("client_type", "producer".into()),
|
||||
])) {
|
||||
let observation = observer.fetch();
|
||||
assert_ne!(observation, 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn metrics_consumer() {
|
||||
let conn = maybe_skip_kafka_integration!();
|
||||
let adapter = KafkaTestAdapter::new(conn);
|
||||
let ctx = adapter.new_context(NonZeroU32::new(1).unwrap()).await;
|
||||
|
|
Loading…
Reference in New Issue