fix: Make metric names potentially less confusing
parent
a08a91c5ba
commit
6ce6a38094
|
@ -453,7 +453,7 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.get_observer(&Attributes::from(&[
|
.get_observer(&Attributes::from(&[
|
||||||
("kafka_topic", "whatevs"),
|
("kafka_topic", "whatevs"),
|
||||||
("sequencer_id", "0"),
|
("kafka_partition", "0"),
|
||||||
("result", "success"),
|
("result", "success"),
|
||||||
]))
|
]))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -467,7 +467,7 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.get_observer(&Attributes::from(&[
|
.get_observer(&Attributes::from(&[
|
||||||
("kafka_topic", "whatevs"),
|
("kafka_topic", "whatevs"),
|
||||||
("sequencer_id", "0"),
|
("kafka_partition", "0"),
|
||||||
]))
|
]))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.fetch();
|
.fetch();
|
||||||
|
@ -479,7 +479,7 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.get_observer(&Attributes::from(&[
|
.get_observer(&Attributes::from(&[
|
||||||
("kafka_topic", "whatevs"),
|
("kafka_topic", "whatevs"),
|
||||||
("sequencer_id", "0"),
|
("kafka_partition", "0"),
|
||||||
]))
|
]))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.fetch();
|
.fetch();
|
||||||
|
@ -491,7 +491,7 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.get_observer(&Attributes::from(&[
|
.get_observer(&Attributes::from(&[
|
||||||
("kafka_topic", "whatevs"),
|
("kafka_topic", "whatevs"),
|
||||||
("sequencer_id", "0"),
|
("kafka_partition", "0"),
|
||||||
]))
|
]))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.fetch();
|
.fetch();
|
||||||
|
@ -503,7 +503,7 @@ mod tests {
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.get_observer(&Attributes::from(&[
|
.get_observer(&Attributes::from(&[
|
||||||
("kafka_topic", "whatevs"),
|
("kafka_topic", "whatevs"),
|
||||||
("sequencer_id", "0"),
|
("kafka_partition", "0"),
|
||||||
]))
|
]))
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.fetch();
|
.fetch();
|
||||||
|
|
|
@ -414,7 +414,7 @@ fn metric_attrs(
|
||||||
data_loss: bool,
|
data_loss: bool,
|
||||||
) -> Attributes {
|
) -> Attributes {
|
||||||
let mut attr = Attributes::from([
|
let mut attr = Attributes::from([
|
||||||
("sequencer_id", partition.to_string().into()),
|
("kafka_partition", partition.to_string().into()),
|
||||||
("kafka_topic", topic.to_string().into()),
|
("kafka_topic", topic.to_string().into()),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
@ -649,7 +649,7 @@ mod tests {
|
||||||
.expect("did not find ttbr metric")
|
.expect("did not find ttbr metric")
|
||||||
.get_observer(&Attributes::from([
|
.get_observer(&Attributes::from([
|
||||||
("kafka_topic", TEST_KAFKA_TOPIC.into()),
|
("kafka_topic", TEST_KAFKA_TOPIC.into()),
|
||||||
("sequencer_id", TEST_KAFKA_PARTITION.to_string().into()),
|
("kafka_partition", TEST_KAFKA_PARTITION.to_string().into()),
|
||||||
]))
|
]))
|
||||||
.expect("did not match metric attributes")
|
.expect("did not match metric attributes")
|
||||||
.fetch();
|
.fetch();
|
||||||
|
|
|
@ -84,7 +84,7 @@ where
|
||||||
metrics: &metric::Registry,
|
metrics: &metric::Registry,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let attr = Attributes::from([
|
let attr = Attributes::from([
|
||||||
("sequencer_id", kafka_partition.to_string().into()),
|
("kafka_partition", kafka_partition.to_string().into()),
|
||||||
("kafka_topic", kafka_topic_name.into()),
|
("kafka_topic", kafka_topic_name.into()),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
@ -199,7 +199,7 @@ where
|
||||||
.expect("entry from write buffer must be sequenced");
|
.expect("entry from write buffer must be sequenced");
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
sequence.sequencer_id as i32, self.sequencer_id,
|
sequence.sequencer_id as i32, self.sequencer_id,
|
||||||
"instrumentation for sequencer {} saw op from sequencer {}",
|
"instrumentation for kafka partition {} saw op from kafka partition {}",
|
||||||
self.sequencer_id, sequence.sequencer_id,
|
self.sequencer_id, sequence.sequencer_id,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -279,7 +279,7 @@ mod tests {
|
||||||
/// The attributes assigned to the metrics emitted by the
|
/// The attributes assigned to the metrics emitted by the
|
||||||
/// instrumentation when using the above sequencer / kafka topic values.
|
/// instrumentation when using the above sequencer / kafka topic values.
|
||||||
static ref DEFAULT_ATTRS: Attributes = Attributes::from([
|
static ref DEFAULT_ATTRS: Attributes = Attributes::from([
|
||||||
("sequencer_id", SEQUENCER_ID.to_string().into()),
|
("kafka_partition", SEQUENCER_ID.to_string().into()),
|
||||||
("kafka_topic", TEST_KAFKA_TOPIC.into()),
|
("kafka_topic", TEST_KAFKA_TOPIC.into()),
|
||||||
]);
|
]);
|
||||||
}
|
}
|
||||||
|
@ -621,12 +621,12 @@ mod tests {
|
||||||
|
|
||||||
// The instrumentation emits per-sequencer metrics, so upon observing an op
|
// The instrumentation emits per-sequencer metrics, so upon observing an op
|
||||||
// for a different sequencer it should panic.
|
// for a different sequencer it should panic.
|
||||||
#[should_panic = "instrumentation for sequencer 42 saw op from sequencer 52"]
|
#[should_panic = "instrumentation for kafka partition 42 saw op from kafka partition 52"]
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_op_different_sequencer_id() {
|
async fn test_op_different_sequencer_id() {
|
||||||
let metrics = metric::Registry::default();
|
let metrics = metric::Registry::default();
|
||||||
let meta = DmlMeta::sequenced(
|
let meta = DmlMeta::sequenced(
|
||||||
// A different sequencer ID from what the handler is configured to
|
// A different kafka partition ID from what the handler is configured to
|
||||||
// be instrumenting
|
// be instrumenting
|
||||||
Sequence::new(SEQUENCER_ID + 10, 100),
|
Sequence::new(SEQUENCER_ID + 10, 100),
|
||||||
*TEST_TIME,
|
*TEST_TIME,
|
||||||
|
|
|
@ -119,7 +119,7 @@ where
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
%partition_key,
|
%partition_key,
|
||||||
sequencer_id=%sequencer.id(),
|
kafka_partition=%sequencer.id(),
|
||||||
tables=%dml.table_count(),
|
tables=%dml.table_count(),
|
||||||
%namespace,
|
%namespace,
|
||||||
approx_size=%dml.size(),
|
approx_size=%dml.size(),
|
||||||
|
|
|
@ -45,11 +45,11 @@ impl Sequencer {
|
||||||
);
|
);
|
||||||
|
|
||||||
let enqueue_success = write.recorder([
|
let enqueue_success = write.recorder([
|
||||||
("shard_id", Cow::from(id.to_string())),
|
("kafka_partition", Cow::from(id.to_string())),
|
||||||
("result", Cow::from("success")),
|
("result", Cow::from("success")),
|
||||||
]);
|
]);
|
||||||
let enqueue_error = write.recorder([
|
let enqueue_error = write.recorder([
|
||||||
("shard_id", Cow::from(id.to_string())),
|
("kafka_partition", Cow::from(id.to_string())),
|
||||||
("result", Cow::from("error")),
|
("result", Cow::from("error")),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
|
|
|
@ -227,7 +227,7 @@ async fn test_write_ok() {
|
||||||
.get_instrument::<Metric<U64Histogram>>("sequencer_enqueue_duration_ms")
|
.get_instrument::<Metric<U64Histogram>>("sequencer_enqueue_duration_ms")
|
||||||
.expect("failed to read metric")
|
.expect("failed to read metric")
|
||||||
.get_observer(&Attributes::from(&[
|
.get_observer(&Attributes::from(&[
|
||||||
("shard_id", "0"),
|
("kafka_partition", "0"),
|
||||||
("result", "success"),
|
("result", "success"),
|
||||||
]))
|
]))
|
||||||
.expect("failed to get observer")
|
.expect("failed to get observer")
|
||||||
|
|
|
@ -47,7 +47,10 @@ impl WriteSummary {
|
||||||
|
|
||||||
let mut sequencers = BTreeMap::new();
|
let mut sequencers = BTreeMap::new();
|
||||||
for s in sequences {
|
for s in sequences {
|
||||||
let sequencer_id: i32 = s.sequencer_id.try_into().expect("Invalid sequencer id");
|
let sequencer_id: i32 = s
|
||||||
|
.sequencer_id
|
||||||
|
.try_into()
|
||||||
|
.expect("Invalid Kafka Partition id");
|
||||||
|
|
||||||
// This is super confusing: "sequencer_id" in the router
|
// This is super confusing: "sequencer_id" in the router
|
||||||
// and other parts of the codebase refers to what the
|
// and other parts of the codebase refers to what the
|
||||||
|
|
Loading…
Reference in New Issue