diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs index 97f144f191..6293b18fb6 100644 --- a/influxdb_iox/src/commands/run/router2.rs +++ b/influxdb_iox/src/commands/run/router2.rs @@ -227,7 +227,7 @@ async fn init_write_buffer( let write_buffer = Arc::new( config .write_buffer_config - .writing(metrics, trace_collector) + .writing(Arc::clone(&metrics), trace_collector) .await?, ); @@ -248,7 +248,7 @@ async fn init_write_buffer( Ok(ShardedWriteBuffer::new( shards .into_iter() - .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer))) + .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics)) .map(Arc::new) .collect::>(), )) diff --git a/router2/Cargo.toml b/router2/Cargo.toml index 2c39435ea2..48281be2a6 100644 --- a/router2/Cargo.toml +++ b/router2/Cargo.toml @@ -47,6 +47,10 @@ schema = { path = "../schema" } name = "sharder" harness = false +[[bench]] +name = "schema_validator" +harness = false + [[bench]] name = "e2e" harness = false diff --git a/router2/benches/e2e.rs b/router2/benches/e2e.rs index 5ea87269cb..aad9cc6d84 100644 --- a/router2/benches/e2e.rs +++ b/router2/benches/e2e.rs @@ -35,7 +35,7 @@ fn init_write_buffer(n_sequencers: u32) -> ShardedWriteBuffer>(), ) diff --git a/router2/benches/schema_validator.rs b/router2/benches/schema_validator.rs new file mode 100644 index 0000000000..0899597406 --- /dev/null +++ b/router2/benches/schema_validator.rs @@ -0,0 +1,93 @@ +use std::{iter, sync::Arc}; + +use criterion::{ + criterion_group, criterion_main, measurement::WallTime, BatchSize, BenchmarkGroup, Criterion, + Throughput, +}; +use data_types2::DatabaseName; +use hashbrown::HashMap; +use iox_catalog::mem::MemCatalog; +use mutable_batch::MutableBatch; +use router2::{ + dml_handlers::{DmlHandler, SchemaValidator}, + namespace_cache::{MemoryNamespaceCache, ShardedCache}, +}; +use schema::selection::Selection; +use tokio::runtime::Runtime; + +lazy_static::lazy_static! { + static ref NAMESPACE: DatabaseName<'static> = "bananas".try_into().unwrap(); +} + +fn runtime() -> Runtime { + tokio::runtime::Builder::new_current_thread() + .build() + .unwrap() +} + +fn sharder_benchmarks(c: &mut Criterion) { + let mut group = c.benchmark_group("schema_validator"); + + bench(&mut group, 1, 1); + + bench(&mut group, 1, 100); + bench(&mut group, 1, 10000); + + bench(&mut group, 100, 1); + bench(&mut group, 10000, 1); + + group.finish(); +} + +fn bench(group: &mut BenchmarkGroup, tables: usize, columns_per_table: usize) { + let metrics = Arc::new(metric::Registry::default()); + + let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics))); + let ns_cache = Arc::new(ShardedCache::new( + iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), + )); + let validator = SchemaValidator::new(catalog, ns_cache); + + for i in 0..65_000 { + let write = lp_to_writes(format!("{}{}", i + 10_000_000, generate_lp(1, 1)).as_str()); + let _ = runtime().block_on(validator.write(&*NAMESPACE, write, None)); + } + + let write = lp_to_writes(&generate_lp(tables, columns_per_table)); + let column_count = write + .values() + .fold(0, |acc, b| acc + b.schema(Selection::All).unwrap().len()); + + group.throughput(Throughput::Elements(column_count as _)); + group.bench_function(format!("{tables}x{columns_per_table}"), |b| { + b.to_async(runtime()).iter_batched( + || write.clone(), + |write| validator.write(&*NAMESPACE, write, None), + BatchSize::SmallInput, + ); + }); +} + +fn generate_lp(tables: usize, columns_per_table: usize) -> String { + (0..tables) + .map(|i| { + let cols = (0..columns_per_table) + .map(|i| format!("val{}=42i", i)) + .collect::>() + .join(","); + + format!("table{i},tag=A {cols}") + }) + .collect::>() + .join("\n") +} + +// Parse `lp` into a table-keyed MutableBatch map. +fn lp_to_writes(lp: &str) -> HashMap { + let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42) + .expect("failed to build test writes from LP"); + writes +} + +criterion_group!(benches, sharder_benchmarks); +criterion_main!(benches); diff --git a/router2/src/dml_handlers/sharded_write_buffer.rs b/router2/src/dml_handlers/sharded_write_buffer.rs index 2f12fe0c69..51953215cb 100644 --- a/router2/src/dml_handlers/sharded_write_buffer.rs +++ b/router2/src/dml_handlers/sharded_write_buffer.rs @@ -237,7 +237,11 @@ mod tests { // Configure the sharder to return shards containing the mock write // buffer. - let shard = Arc::new(Sequencer::new(0, Arc::new(write_buffer))); + let shard = Arc::new(Sequencer::new( + 0, + Arc::new(write_buffer), + &Default::default(), + )); let sharder = Arc::new(MockSharder::default().with_return([ Arc::clone(&shard), Arc::clone(&shard), @@ -285,13 +289,21 @@ mod tests { // Configure the first shard to write to one write buffer let write_buffer1 = init_write_buffer(1); let write_buffer1_state = write_buffer1.state(); - let shard1 = Arc::new(Sequencer::new(0, Arc::new(write_buffer1))); + let shard1 = Arc::new(Sequencer::new( + 0, + Arc::new(write_buffer1), + &Default::default(), + )); // Configure the second shard to write to a different write buffer in // order to see which buffer saw what write. let write_buffer2 = init_write_buffer(2); let write_buffer2_state = write_buffer2.state(); - let shard2 = Arc::new(Sequencer::new(1, Arc::new(write_buffer2))); + let shard2 = Arc::new(Sequencer::new( + 1, + Arc::new(write_buffer2), + &Default::default(), + )); let sharder = Arc::new(MockSharder::default().with_return([ // 4 tables, 3 mapped to the first shard @@ -359,12 +371,20 @@ mod tests { // Configure the first shard to write to one write buffer let write_buffer1 = init_write_buffer(1); let write_buffer1_state = write_buffer1.state(); - let shard1 = Arc::new(Sequencer::new(0, Arc::new(write_buffer1))); + let shard1 = Arc::new(Sequencer::new( + 0, + Arc::new(write_buffer1), + &Default::default(), + )); // Configure the second shard to write to a write buffer that always fails let write_buffer2 = init_write_buffer(1); // Non-existant sequencer ID to trigger an error. - let shard2 = Arc::new(Sequencer::new(13, Arc::new(write_buffer2))); + let shard2 = Arc::new(Sequencer::new( + 13, + Arc::new(write_buffer2), + &Default::default(), + )); let sharder = Arc::new( MockSharder::default().with_return([Arc::clone(&shard1), Arc::clone(&shard2)]), @@ -403,7 +423,11 @@ mod tests { // Configure the sharder to return shards containing the mock write // buffer. - let shard = Arc::new(Sequencer::new(0, Arc::new(write_buffer))); + let shard = Arc::new(Sequencer::new( + 0, + Arc::new(write_buffer), + &Default::default(), + )); let sharder = Arc::new(MockSharder::default().with_return([Arc::clone(&shard)])); let w = ShardedWriteBuffer::new(Arc::clone(&sharder)); diff --git a/router2/src/namespace_cache/metrics.rs b/router2/src/namespace_cache/metrics.rs index 3751795b14..614bb37494 100644 --- a/router2/src/namespace_cache/metrics.rs +++ b/router2/src/namespace_cache/metrics.rs @@ -2,42 +2,54 @@ use super::NamespaceCache; use data_types2::{DatabaseName, NamespaceSchema}; -use metric::{Metric, U64Counter, U64Gauge}; +use metric::{Metric, U64Gauge, U64Histogram, U64HistogramOptions}; use std::sync::Arc; +use time::{SystemProvider, TimeProvider}; /// An [`InstrumentedCache`] decorates a [`NamespaceCache`] with cache read /// hit/miss and cache put insert/update metrics. #[derive(Debug)] -pub struct InstrumentedCache { +pub struct InstrumentedCache { inner: T, + time_provider: P, /// Metrics derived from the [`NamespaceSchema`] held within the cache. table_count: U64Gauge, column_count: U64Gauge, /// A cache read hit - get_hit_counter: U64Counter, + get_hit: U64Histogram, /// A cache read miss - get_miss_counter: U64Counter, + get_miss: U64Histogram, /// A cache put for a namespace that did not previously exist. - put_insert_counter: U64Counter, + put_insert: U64Histogram, /// A cache put replacing a namespace that previously had a cache entry. - put_update_counter: U64Counter, + put_update: U64Histogram, } impl InstrumentedCache { /// Instrument `T`, recording cache operations to `registry`. pub fn new(inner: T, registry: &metric::Registry) -> Self { - let get_counter: Metric = - registry.register_metric("namespace_cache_get_count", "cache read requests"); - let get_hit_counter = get_counter.recorder(&[("result", "hit")]); - let get_miss_counter = get_counter.recorder(&[("result", "miss")]); + let buckets = || { + U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX]) + }; - let put_counter: Metric = - registry.register_metric("namespace_cache_put_count", "cache put requests"); - let put_insert_counter = put_counter.recorder(&[("op", "insert")]); - let put_update_counter = put_counter.recorder(&[("op", "update")]); + let get_counter: Metric = registry.register_metric_with_options( + "namespace_cache_get_duration_ms", + "cache read call duration in milliseconds", + buckets, + ); + let get_hit = get_counter.recorder(&[("result", "hit")]); + let get_miss = get_counter.recorder(&[("result", "miss")]); + + let put_counter: Metric = registry.register_metric_with_options( + "namespace_cache_put_duration_ms", + "cache put call duration in milliseconds", + buckets, + ); + let put_insert = put_counter.recorder(&[("op", "insert")]); + let put_update = put_counter.recorder(&[("op", "update")]); let table_count = registry .register_metric::( @@ -54,31 +66,36 @@ impl InstrumentedCache { Self { inner, + time_provider: Default::default(), table_count, column_count, - get_hit_counter, - get_miss_counter, - put_insert_counter, - put_update_counter, + get_hit, + get_miss, + put_insert, + put_update, } } } -impl NamespaceCache for Arc> +impl NamespaceCache for Arc> where T: NamespaceCache, + P: TimeProvider, { fn get_schema(&self, namespace: &DatabaseName<'_>) -> Option> { - match self.inner.get_schema(namespace) { - Some(v) => { - self.get_hit_counter.inc(1); - Some(v) - } - None => { - self.get_miss_counter.inc(1); - None - } + let t = self.time_provider.now(); + let res = self.inner.get_schema(namespace); + + // Avoid exploding if time goes backwards - simply drop the measurement + // if it happens. + if let Some(delta) = self.time_provider.now().checked_duration_since(t) { + match &res { + Some(_) => self.get_hit.record(delta.as_millis() as _), + None => self.get_miss.record(delta.as_millis() as _), + }; } + + res } fn put_schema( @@ -89,9 +106,14 @@ where let schema = schema.into(); let stats = NamespaceStats::new(&*schema); - match self.inner.put_schema(namespace, schema) { + let t = self.time_provider.now(); + let res = self.inner.put_schema(namespace, schema); + + match res { Some(v) => { - self.put_update_counter.inc(1); + if let Some(delta) = self.time_provider.now().checked_duration_since(t) { + self.put_update.record(delta.as_millis() as _); + } // Figure out the difference between the new namespace and the // evicted old namespace @@ -106,7 +128,9 @@ where Some(v) } None => { - self.put_insert_counter.inc(1); + if let Some(delta) = self.time_provider.now().checked_duration_since(t) { + self.put_insert.record(delta.as_millis() as _); + } // Add the new namespace stats to the counts. self.table_count.inc(stats.table_count); @@ -143,7 +167,7 @@ mod tests { ColumnId, ColumnSchema, ColumnType, KafkaTopicId, NamespaceId, QueryPoolId, TableId, TableSchema, }; - use metric::{MetricObserver, Observation}; + use metric::{Attributes, MetricObserver, Observation}; use std::collections::BTreeMap; /// Deterministically generate a schema containing tables with the specified @@ -184,6 +208,26 @@ mod tests { } } + fn assert_histogram_hit( + metrics: &metric::Registry, + metric_name: &'static str, + attr: (&'static str, &'static str), + count: u64, + ) { + let histogram = metrics + .get_instrument::>(metric_name) + .expect("failed to read metric") + .get_observer(&Attributes::from(&[attr])) + .expect("failed to get observer") + .fetch(); + + let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + assert_eq!( + hit_count, count, + "metric did not record correct number of calls" + ); + } + #[test] fn test_put() { let ns = DatabaseName::new("test").expect("database name is valid"); @@ -194,13 +238,17 @@ mod tests { // No tables let schema = new_schema(&[]); assert!(cache.put_schema(ns.clone(), schema).is_none()); - assert_eq!( - cache.put_insert_counter.observe(), - Observation::U64Counter(1) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "insert"), + 1, ); - assert_eq!( - cache.put_update_counter.observe(), - Observation::U64Counter(0) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "update"), + 0, ); assert_eq!(cache.table_count.observe(), Observation::U64Gauge(0)); assert_eq!(cache.column_count.observe(), Observation::U64Gauge(0)); @@ -208,13 +256,17 @@ mod tests { // Add a table with 1 column let schema = new_schema(&[1]); assert!(cache.put_schema(ns.clone(), schema).is_some()); - assert_eq!( - cache.put_insert_counter.observe(), - Observation::U64Counter(1) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "insert"), + 1, ); // Unchanged - assert_eq!( - cache.put_update_counter.observe(), - Observation::U64Counter(1) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "update"), + 1, ); assert_eq!(cache.table_count.observe(), Observation::U64Gauge(1)); assert_eq!(cache.column_count.observe(), Observation::U64Gauge(1)); @@ -222,13 +274,17 @@ mod tests { // Increase the number of columns in this one table let schema = new_schema(&[5]); assert!(cache.put_schema(ns.clone(), schema).is_some()); - assert_eq!( - cache.put_insert_counter.observe(), - Observation::U64Counter(1) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "insert"), + 1, ); // Unchanged - assert_eq!( - cache.put_update_counter.observe(), - Observation::U64Counter(2) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "update"), + 2, ); assert_eq!(cache.table_count.observe(), Observation::U64Gauge(1)); assert_eq!(cache.column_count.observe(), Observation::U64Gauge(5)); @@ -236,13 +292,17 @@ mod tests { // Decrease the number of columns let schema = new_schema(&[2]); assert!(cache.put_schema(ns.clone(), schema).is_some()); - assert_eq!( - cache.put_insert_counter.observe(), - Observation::U64Counter(1) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "insert"), + 1, ); // Unchanged - assert_eq!( - cache.put_update_counter.observe(), - Observation::U64Counter(3) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "update"), + 3, ); assert_eq!(cache.table_count.observe(), Observation::U64Gauge(1)); assert_eq!(cache.column_count.observe(), Observation::U64Gauge(2)); @@ -250,13 +310,17 @@ mod tests { // Add another table let schema = new_schema(&[2, 5]); assert!(cache.put_schema(ns.clone(), schema).is_some()); - assert_eq!( - cache.put_insert_counter.observe(), - Observation::U64Counter(1) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "insert"), + 1, ); // Unchanged - assert_eq!( - cache.put_update_counter.observe(), - Observation::U64Counter(4) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "update"), + 4, ); assert_eq!(cache.table_count.observe(), Observation::U64Gauge(2)); assert_eq!(cache.column_count.observe(), Observation::U64Gauge(7)); @@ -264,13 +328,17 @@ mod tests { // Add another table and adjust the existing tables (one up, one down) let schema = new_schema(&[1, 10, 4]); assert!(cache.put_schema(ns.clone(), schema).is_some()); - assert_eq!( - cache.put_insert_counter.observe(), - Observation::U64Counter(1) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "insert"), + 1, ); // Unchanged - assert_eq!( - cache.put_update_counter.observe(), - Observation::U64Counter(5) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "update"), + 5, ); assert_eq!(cache.table_count.observe(), Observation::U64Gauge(3)); assert_eq!(cache.column_count.observe(), Observation::U64Gauge(15)); @@ -278,13 +346,17 @@ mod tests { // Remove a table let schema = new_schema(&[1, 10]); assert!(cache.put_schema(ns, schema).is_some()); - assert_eq!( - cache.put_insert_counter.observe(), - Observation::U64Counter(1) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "insert"), + 1, ); // Unchanged - assert_eq!( - cache.put_update_counter.observe(), - Observation::U64Counter(6) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "update"), + 6, ); assert_eq!(cache.table_count.observe(), Observation::U64Gauge(2)); assert_eq!(cache.column_count.observe(), Observation::U64Gauge(11)); @@ -292,16 +364,28 @@ mod tests { // Add a new namespace let ns = DatabaseName::new("another").expect("database name is valid"); let schema = new_schema(&[10, 12, 9]); - assert!(cache.put_schema(ns, schema).is_none()); - assert_eq!( - cache.put_insert_counter.observe(), - Observation::U64Counter(2) + assert!(cache.put_schema(ns.clone(), schema).is_none()); + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "insert"), + 2, ); - assert_eq!( - cache.put_update_counter.observe(), - Observation::U64Counter(6) + assert_histogram_hit( + ®istry, + "namespace_cache_put_duration_ms", + ("op", "update"), + 6, ); assert_eq!(cache.table_count.observe(), Observation::U64Gauge(5)); assert_eq!(cache.column_count.observe(), Observation::U64Gauge(42)); + + let _got = cache.get_schema(&ns).expect("should exist"); + assert_histogram_hit( + ®istry, + "namespace_cache_get_duration_ms", + ("result", "hit"), + 1, + ); } } diff --git a/router2/src/sequencer.rs b/router2/src/sequencer.rs index 97dacb7980..eb9c8c0941 100644 --- a/router2/src/sequencer.rs +++ b/router2/src/sequencer.rs @@ -1,15 +1,21 @@ //! A representation of a single operation sequencer. -use std::{hash::Hash, sync::Arc}; +use std::{borrow::Cow, hash::Hash, sync::Arc}; use dml::{DmlMeta, DmlOperation}; +use metric::{Metric, U64Histogram, U64HistogramOptions}; +use time::{SystemProvider, TimeProvider}; use write_buffer::core::{WriteBufferError, WriteBufferWriting}; /// A sequencer tags an write buffer with a sequencer ID. #[derive(Debug)] -pub struct Sequencer { +pub struct Sequencer

{ id: usize, inner: Arc, + time_provider: P, + + enqueue_success: U64Histogram, + enqueue_error: U64Histogram, } impl Eq for Sequencer {} @@ -28,8 +34,32 @@ impl Hash for Sequencer { impl Sequencer { /// Tag `inner` with the specified `id`. - pub fn new(id: usize, inner: Arc) -> Self { - Self { id, inner } + pub fn new(id: usize, inner: Arc, metrics: &metric::Registry) -> Self { + let buckets = || { + U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX]) + }; + let write: Metric = metrics.register_metric_with_options( + "sequencer_enqueue_duration_ms", + "sequencer enqueue call duration in milliseconds", + buckets, + ); + + let enqueue_success = write.recorder([ + ("shard_id", Cow::from(id.to_string())), + ("result", Cow::from("success")), + ]); + let enqueue_error = write.recorder([ + ("shard_id", Cow::from(id.to_string())), + ("result", Cow::from("error")), + ]); + + Self { + id, + inner, + enqueue_success, + enqueue_error, + time_provider: Default::default(), + } } /// Return the ID of this sequencer. @@ -43,6 +73,17 @@ impl Sequencer { /// behaviour of the [`WriteBufferWriting::store_operation()`] /// implementation this [`Sequencer`] wraps. pub async fn enqueue<'a>(&self, op: DmlOperation) -> Result { - self.inner.store_operation(self.id as u32, &op).await + let t = self.time_provider.now(); + + let res = self.inner.store_operation(self.id as u32, &op).await; + + if let Some(delta) = self.time_provider.now().checked_duration_since(t) { + match &res { + Ok(_) => self.enqueue_success.record(delta.as_millis() as _), + Err(_) => self.enqueue_error.record(delta.as_millis() as _), + } + } + + res } } diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 713b03b4ed..20721ae67b 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -155,6 +155,7 @@ pub struct HttpDelegate { write_metric_lines: U64Counter, write_metric_fields: U64Counter, + write_metric_tables: U64Counter, write_metric_body_size: U64Counter, delete_metric_body_size: U64Counter, } @@ -178,6 +179,12 @@ impl HttpDelegate { "cumulative number of line protocol fields successfully routed", ) .recorder(&[]); + let write_metric_tables = metrics + .register_metric::( + "http_write_tables_total", + "cumulative number of tables in each write request", + ) + .recorder(&[]); let write_metric_body_size = metrics .register_metric::( "http_write_body_bytes_total", @@ -197,6 +204,7 @@ impl HttpDelegate { dml_handler, write_metric_lines, write_metric_fields, + write_metric_tables, write_metric_body_size, delete_metric_body_size, } @@ -245,9 +253,11 @@ where Err(e) => return Err(Error::ParseLineProtocol(e)), }; + let num_tables = batches.len(); debug!( num_lines=stats.num_lines, num_fields=stats.num_fields, + num_tables, body_size=body.len(), %namespace, org=%account.org, @@ -262,6 +272,7 @@ where self.write_metric_lines.inc(stats.num_lines as _); self.write_metric_fields.inc(stats.num_fields as _); + self.write_metric_tables.inc(num_tables as _); self.write_metric_body_size.inc(body.len() as _); Ok(()) @@ -493,6 +504,7 @@ mod tests { if $uri.contains("/api/v2/write") { assert_metric_hit(&metrics, "http_write_lines_total", None); assert_metric_hit(&metrics, "http_write_fields_total", None); + assert_metric_hit(&metrics, "http_write_tables_total", None); assert_metric_hit(&metrics, "http_write_body_bytes_total", Some($body.len() as _)); } else { assert_metric_hit(&metrics, "http_delete_body_bytes_total", Some($body.len() as _)); diff --git a/router2/tests/http.rs b/router2/tests/http.rs index 6dbb30be21..625136b5c3 100644 --- a/router2/tests/http.rs +++ b/router2/tests/http.rs @@ -82,7 +82,7 @@ impl TestContext { let sharded_write_buffer = ShardedWriteBuffer::new( shards .into_iter() - .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer))) + .map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics)) .map(Arc::new) .collect::>(), ); @@ -217,4 +217,17 @@ async fn test_write_ok() { .fetch(), 1 ); + + let histogram = ctx + .metrics() + .get_instrument::>("sequencer_enqueue_duration_ms") + .expect("failed to read metric") + .get_observer(&Attributes::from(&[ + ("shard_id", "0"), + ("result", "success"), + ])) + .expect("failed to get observer") + .fetch(); + let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); + assert_eq!(hit_count, 1); }