refactor: use kafka produce instrumentation
This commit changes the IOx write buffer initialisation code to add the KafkaProducerMetrics instrumentation to the per-partition Kafka clients.pull/24376/head
parent
284a3069ce
commit
c133cf22c6
|
@ -1,6 +1,7 @@
|
||||||
use self::{
|
use self::{
|
||||||
aggregator::DmlAggregator,
|
aggregator::DmlAggregator,
|
||||||
config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig},
|
config::{ClientConfig, ConsumerConfig, ProducerConfig, TopicCreationConfig},
|
||||||
|
instrumentation::KafkaProducerMetrics,
|
||||||
};
|
};
|
||||||
use crate::{
|
use crate::{
|
||||||
codec::IoxHeaders,
|
codec::IoxHeaders,
|
||||||
|
@ -11,7 +12,7 @@ use crate::{
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use data_types::{Sequence, SequenceNumber};
|
use data_types::{KafkaPartition, Sequence, SequenceNumber};
|
||||||
use dml::{DmlMeta, DmlOperation};
|
use dml::{DmlMeta, DmlOperation};
|
||||||
use futures::{stream::BoxStream, StreamExt};
|
use futures::{stream::BoxStream, StreamExt};
|
||||||
use iox_time::{Time, TimeProvider};
|
use iox_time::{Time, TimeProvider};
|
||||||
|
@ -73,7 +74,16 @@ impl RSKafkaProducer {
|
||||||
let producers = partition_clients
|
let producers = partition_clients
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(sequencer_id, partition_client)| {
|
.map(|(sequencer_id, partition_client)| {
|
||||||
let mut producer_builder = BatchProducerBuilder::new(Arc::new(partition_client));
|
// Instrument this kafka partition client.
|
||||||
|
let partition_client = KafkaProducerMetrics::new(
|
||||||
|
Box::new(partition_client),
|
||||||
|
database_name.clone(),
|
||||||
|
KafkaPartition::new(sequencer_id.try_into().unwrap()),
|
||||||
|
&*metric_registry,
|
||||||
|
);
|
||||||
|
|
||||||
|
let mut producer_builder =
|
||||||
|
BatchProducerBuilder::new_with_client(Arc::new(partition_client));
|
||||||
if let Some(linger) = producer_config.linger {
|
if let Some(linger) = producer_config.linger {
|
||||||
producer_builder = producer_builder.with_linger(linger);
|
producer_builder = producer_builder.with_linger(linger);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue