Merge pull request #3927 from influxdata/dom/more-router-metrics
feat: more router metricspull/24376/head
commit
81b942cd13
|
@ -227,7 +227,7 @@ async fn init_write_buffer(
|
|||
let write_buffer = Arc::new(
|
||||
config
|
||||
.write_buffer_config
|
||||
.writing(metrics, trace_collector)
|
||||
.writing(Arc::clone(&metrics), trace_collector)
|
||||
.await?,
|
||||
);
|
||||
|
||||
|
@ -248,7 +248,7 @@ async fn init_write_buffer(
|
|||
Ok(ShardedWriteBuffer::new(
|
||||
shards
|
||||
.into_iter()
|
||||
.map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer)))
|
||||
.map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
|
||||
.map(Arc::new)
|
||||
.collect::<JumpHash<_>>(),
|
||||
))
|
||||
|
|
|
@ -47,6 +47,10 @@ schema = { path = "../schema" }
|
|||
name = "sharder"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "schema_validator"
|
||||
harness = false
|
||||
|
||||
[[bench]]
|
||||
name = "e2e"
|
||||
harness = false
|
||||
|
|
|
@ -35,7 +35,7 @@ fn init_write_buffer(n_sequencers: u32) -> ShardedWriteBuffer<JumpHash<Arc<Seque
|
|||
ShardedWriteBuffer::new(
|
||||
shards
|
||||
.into_iter()
|
||||
.map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer)))
|
||||
.map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &Default::default()))
|
||||
.map(Arc::new)
|
||||
.collect::<JumpHash<_>>(),
|
||||
)
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
use std::{iter, sync::Arc};
|
||||
|
||||
use criterion::{
|
||||
criterion_group, criterion_main, measurement::WallTime, BatchSize, BenchmarkGroup, Criterion,
|
||||
Throughput,
|
||||
};
|
||||
use data_types2::DatabaseName;
|
||||
use hashbrown::HashMap;
|
||||
use iox_catalog::mem::MemCatalog;
|
||||
use mutable_batch::MutableBatch;
|
||||
use router2::{
|
||||
dml_handlers::{DmlHandler, SchemaValidator},
|
||||
namespace_cache::{MemoryNamespaceCache, ShardedCache},
|
||||
};
|
||||
use schema::selection::Selection;
|
||||
use tokio::runtime::Runtime;
|
||||
|
||||
lazy_static::lazy_static! {
|
||||
static ref NAMESPACE: DatabaseName<'static> = "bananas".try_into().unwrap();
|
||||
}
|
||||
|
||||
fn runtime() -> Runtime {
|
||||
tokio::runtime::Builder::new_current_thread()
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn sharder_benchmarks(c: &mut Criterion) {
|
||||
let mut group = c.benchmark_group("schema_validator");
|
||||
|
||||
bench(&mut group, 1, 1);
|
||||
|
||||
bench(&mut group, 1, 100);
|
||||
bench(&mut group, 1, 10000);
|
||||
|
||||
bench(&mut group, 100, 1);
|
||||
bench(&mut group, 10000, 1);
|
||||
|
||||
group.finish();
|
||||
}
|
||||
|
||||
fn bench(group: &mut BenchmarkGroup<WallTime>, tables: usize, columns_per_table: usize) {
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
|
||||
let catalog = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
|
||||
let ns_cache = Arc::new(ShardedCache::new(
|
||||
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
|
||||
));
|
||||
let validator = SchemaValidator::new(catalog, ns_cache);
|
||||
|
||||
for i in 0..65_000 {
|
||||
let write = lp_to_writes(format!("{}{}", i + 10_000_000, generate_lp(1, 1)).as_str());
|
||||
let _ = runtime().block_on(validator.write(&*NAMESPACE, write, None));
|
||||
}
|
||||
|
||||
let write = lp_to_writes(&generate_lp(tables, columns_per_table));
|
||||
let column_count = write
|
||||
.values()
|
||||
.fold(0, |acc, b| acc + b.schema(Selection::All).unwrap().len());
|
||||
|
||||
group.throughput(Throughput::Elements(column_count as _));
|
||||
group.bench_function(format!("{tables}x{columns_per_table}"), |b| {
|
||||
b.to_async(runtime()).iter_batched(
|
||||
|| write.clone(),
|
||||
|write| validator.write(&*NAMESPACE, write, None),
|
||||
BatchSize::SmallInput,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
fn generate_lp(tables: usize, columns_per_table: usize) -> String {
|
||||
(0..tables)
|
||||
.map(|i| {
|
||||
let cols = (0..columns_per_table)
|
||||
.map(|i| format!("val{}=42i", i))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",");
|
||||
|
||||
format!("table{i},tag=A {cols}")
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
}
|
||||
|
||||
// Parse `lp` into a table-keyed MutableBatch map.
|
||||
fn lp_to_writes(lp: &str) -> HashMap<String, MutableBatch> {
|
||||
let (writes, _) = mutable_batch_lp::lines_to_batches_stats(lp, 42)
|
||||
.expect("failed to build test writes from LP");
|
||||
writes
|
||||
}
|
||||
|
||||
criterion_group!(benches, sharder_benchmarks);
|
||||
criterion_main!(benches);
|
|
@ -237,7 +237,11 @@ mod tests {
|
|||
|
||||
// Configure the sharder to return shards containing the mock write
|
||||
// buffer.
|
||||
let shard = Arc::new(Sequencer::new(0, Arc::new(write_buffer)));
|
||||
let shard = Arc::new(Sequencer::new(
|
||||
0,
|
||||
Arc::new(write_buffer),
|
||||
&Default::default(),
|
||||
));
|
||||
let sharder = Arc::new(MockSharder::default().with_return([
|
||||
Arc::clone(&shard),
|
||||
Arc::clone(&shard),
|
||||
|
@ -285,13 +289,21 @@ mod tests {
|
|||
// Configure the first shard to write to one write buffer
|
||||
let write_buffer1 = init_write_buffer(1);
|
||||
let write_buffer1_state = write_buffer1.state();
|
||||
let shard1 = Arc::new(Sequencer::new(0, Arc::new(write_buffer1)));
|
||||
let shard1 = Arc::new(Sequencer::new(
|
||||
0,
|
||||
Arc::new(write_buffer1),
|
||||
&Default::default(),
|
||||
));
|
||||
|
||||
// Configure the second shard to write to a different write buffer in
|
||||
// order to see which buffer saw what write.
|
||||
let write_buffer2 = init_write_buffer(2);
|
||||
let write_buffer2_state = write_buffer2.state();
|
||||
let shard2 = Arc::new(Sequencer::new(1, Arc::new(write_buffer2)));
|
||||
let shard2 = Arc::new(Sequencer::new(
|
||||
1,
|
||||
Arc::new(write_buffer2),
|
||||
&Default::default(),
|
||||
));
|
||||
|
||||
let sharder = Arc::new(MockSharder::default().with_return([
|
||||
// 4 tables, 3 mapped to the first shard
|
||||
|
@ -359,12 +371,20 @@ mod tests {
|
|||
// Configure the first shard to write to one write buffer
|
||||
let write_buffer1 = init_write_buffer(1);
|
||||
let write_buffer1_state = write_buffer1.state();
|
||||
let shard1 = Arc::new(Sequencer::new(0, Arc::new(write_buffer1)));
|
||||
let shard1 = Arc::new(Sequencer::new(
|
||||
0,
|
||||
Arc::new(write_buffer1),
|
||||
&Default::default(),
|
||||
));
|
||||
|
||||
// Configure the second shard to write to a write buffer that always fails
|
||||
let write_buffer2 = init_write_buffer(1);
|
||||
// Non-existant sequencer ID to trigger an error.
|
||||
let shard2 = Arc::new(Sequencer::new(13, Arc::new(write_buffer2)));
|
||||
let shard2 = Arc::new(Sequencer::new(
|
||||
13,
|
||||
Arc::new(write_buffer2),
|
||||
&Default::default(),
|
||||
));
|
||||
|
||||
let sharder = Arc::new(
|
||||
MockSharder::default().with_return([Arc::clone(&shard1), Arc::clone(&shard2)]),
|
||||
|
@ -403,7 +423,11 @@ mod tests {
|
|||
|
||||
// Configure the sharder to return shards containing the mock write
|
||||
// buffer.
|
||||
let shard = Arc::new(Sequencer::new(0, Arc::new(write_buffer)));
|
||||
let shard = Arc::new(Sequencer::new(
|
||||
0,
|
||||
Arc::new(write_buffer),
|
||||
&Default::default(),
|
||||
));
|
||||
let sharder = Arc::new(MockSharder::default().with_return([Arc::clone(&shard)]));
|
||||
|
||||
let w = ShardedWriteBuffer::new(Arc::clone(&sharder));
|
||||
|
|
|
@ -2,42 +2,54 @@
|
|||
|
||||
use super::NamespaceCache;
|
||||
use data_types2::{DatabaseName, NamespaceSchema};
|
||||
use metric::{Metric, U64Counter, U64Gauge};
|
||||
use metric::{Metric, U64Gauge, U64Histogram, U64HistogramOptions};
|
||||
use std::sync::Arc;
|
||||
use time::{SystemProvider, TimeProvider};
|
||||
|
||||
/// An [`InstrumentedCache`] decorates a [`NamespaceCache`] with cache read
|
||||
/// hit/miss and cache put insert/update metrics.
|
||||
#[derive(Debug)]
|
||||
pub struct InstrumentedCache<T> {
|
||||
pub struct InstrumentedCache<T, P = SystemProvider> {
|
||||
inner: T,
|
||||
time_provider: P,
|
||||
|
||||
/// Metrics derived from the [`NamespaceSchema`] held within the cache.
|
||||
table_count: U64Gauge,
|
||||
column_count: U64Gauge,
|
||||
|
||||
/// A cache read hit
|
||||
get_hit_counter: U64Counter,
|
||||
get_hit: U64Histogram,
|
||||
/// A cache read miss
|
||||
get_miss_counter: U64Counter,
|
||||
get_miss: U64Histogram,
|
||||
|
||||
/// A cache put for a namespace that did not previously exist.
|
||||
put_insert_counter: U64Counter,
|
||||
put_insert: U64Histogram,
|
||||
/// A cache put replacing a namespace that previously had a cache entry.
|
||||
put_update_counter: U64Counter,
|
||||
put_update: U64Histogram,
|
||||
}
|
||||
|
||||
impl<T> InstrumentedCache<T> {
|
||||
/// Instrument `T`, recording cache operations to `registry`.
|
||||
pub fn new(inner: T, registry: &metric::Registry) -> Self {
|
||||
let get_counter: Metric<U64Counter> =
|
||||
registry.register_metric("namespace_cache_get_count", "cache read requests");
|
||||
let get_hit_counter = get_counter.recorder(&[("result", "hit")]);
|
||||
let get_miss_counter = get_counter.recorder(&[("result", "miss")]);
|
||||
let buckets = || {
|
||||
U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX])
|
||||
};
|
||||
|
||||
let put_counter: Metric<U64Counter> =
|
||||
registry.register_metric("namespace_cache_put_count", "cache put requests");
|
||||
let put_insert_counter = put_counter.recorder(&[("op", "insert")]);
|
||||
let put_update_counter = put_counter.recorder(&[("op", "update")]);
|
||||
let get_counter: Metric<U64Histogram> = registry.register_metric_with_options(
|
||||
"namespace_cache_get_duration_ms",
|
||||
"cache read call duration in milliseconds",
|
||||
buckets,
|
||||
);
|
||||
let get_hit = get_counter.recorder(&[("result", "hit")]);
|
||||
let get_miss = get_counter.recorder(&[("result", "miss")]);
|
||||
|
||||
let put_counter: Metric<U64Histogram> = registry.register_metric_with_options(
|
||||
"namespace_cache_put_duration_ms",
|
||||
"cache put call duration in milliseconds",
|
||||
buckets,
|
||||
);
|
||||
let put_insert = put_counter.recorder(&[("op", "insert")]);
|
||||
let put_update = put_counter.recorder(&[("op", "update")]);
|
||||
|
||||
let table_count = registry
|
||||
.register_metric::<U64Gauge>(
|
||||
|
@ -54,31 +66,36 @@ impl<T> InstrumentedCache<T> {
|
|||
|
||||
Self {
|
||||
inner,
|
||||
time_provider: Default::default(),
|
||||
table_count,
|
||||
column_count,
|
||||
get_hit_counter,
|
||||
get_miss_counter,
|
||||
put_insert_counter,
|
||||
put_update_counter,
|
||||
get_hit,
|
||||
get_miss,
|
||||
put_insert,
|
||||
put_update,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> NamespaceCache for Arc<InstrumentedCache<T>>
|
||||
impl<T, P> NamespaceCache for Arc<InstrumentedCache<T, P>>
|
||||
where
|
||||
T: NamespaceCache,
|
||||
P: TimeProvider,
|
||||
{
|
||||
fn get_schema(&self, namespace: &DatabaseName<'_>) -> Option<Arc<NamespaceSchema>> {
|
||||
match self.inner.get_schema(namespace) {
|
||||
Some(v) => {
|
||||
self.get_hit_counter.inc(1);
|
||||
Some(v)
|
||||
}
|
||||
None => {
|
||||
self.get_miss_counter.inc(1);
|
||||
None
|
||||
}
|
||||
let t = self.time_provider.now();
|
||||
let res = self.inner.get_schema(namespace);
|
||||
|
||||
// Avoid exploding if time goes backwards - simply drop the measurement
|
||||
// if it happens.
|
||||
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
|
||||
match &res {
|
||||
Some(_) => self.get_hit.record(delta.as_millis() as _),
|
||||
None => self.get_miss.record(delta.as_millis() as _),
|
||||
};
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
fn put_schema(
|
||||
|
@ -89,9 +106,14 @@ where
|
|||
let schema = schema.into();
|
||||
let stats = NamespaceStats::new(&*schema);
|
||||
|
||||
match self.inner.put_schema(namespace, schema) {
|
||||
let t = self.time_provider.now();
|
||||
let res = self.inner.put_schema(namespace, schema);
|
||||
|
||||
match res {
|
||||
Some(v) => {
|
||||
self.put_update_counter.inc(1);
|
||||
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
|
||||
self.put_update.record(delta.as_millis() as _);
|
||||
}
|
||||
|
||||
// Figure out the difference between the new namespace and the
|
||||
// evicted old namespace
|
||||
|
@ -106,7 +128,9 @@ where
|
|||
Some(v)
|
||||
}
|
||||
None => {
|
||||
self.put_insert_counter.inc(1);
|
||||
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
|
||||
self.put_insert.record(delta.as_millis() as _);
|
||||
}
|
||||
|
||||
// Add the new namespace stats to the counts.
|
||||
self.table_count.inc(stats.table_count);
|
||||
|
@ -143,7 +167,7 @@ mod tests {
|
|||
ColumnId, ColumnSchema, ColumnType, KafkaTopicId, NamespaceId, QueryPoolId, TableId,
|
||||
TableSchema,
|
||||
};
|
||||
use metric::{MetricObserver, Observation};
|
||||
use metric::{Attributes, MetricObserver, Observation};
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
/// Deterministically generate a schema containing tables with the specified
|
||||
|
@ -184,6 +208,26 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
fn assert_histogram_hit(
|
||||
metrics: &metric::Registry,
|
||||
metric_name: &'static str,
|
||||
attr: (&'static str, &'static str),
|
||||
count: u64,
|
||||
) {
|
||||
let histogram = metrics
|
||||
.get_instrument::<Metric<U64Histogram>>(metric_name)
|
||||
.expect("failed to read metric")
|
||||
.get_observer(&Attributes::from(&[attr]))
|
||||
.expect("failed to get observer")
|
||||
.fetch();
|
||||
|
||||
let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
|
||||
assert_eq!(
|
||||
hit_count, count,
|
||||
"metric did not record correct number of calls"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_put() {
|
||||
let ns = DatabaseName::new("test").expect("database name is valid");
|
||||
|
@ -194,13 +238,17 @@ mod tests {
|
|||
// No tables
|
||||
let schema = new_schema(&[]);
|
||||
assert!(cache.put_schema(ns.clone(), schema).is_none());
|
||||
assert_eq!(
|
||||
cache.put_insert_counter.observe(),
|
||||
Observation::U64Counter(1)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "insert"),
|
||||
1,
|
||||
);
|
||||
assert_eq!(
|
||||
cache.put_update_counter.observe(),
|
||||
Observation::U64Counter(0)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "update"),
|
||||
0,
|
||||
);
|
||||
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(0));
|
||||
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(0));
|
||||
|
@ -208,13 +256,17 @@ mod tests {
|
|||
// Add a table with 1 column
|
||||
let schema = new_schema(&[1]);
|
||||
assert!(cache.put_schema(ns.clone(), schema).is_some());
|
||||
assert_eq!(
|
||||
cache.put_insert_counter.observe(),
|
||||
Observation::U64Counter(1)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "insert"),
|
||||
1,
|
||||
); // Unchanged
|
||||
assert_eq!(
|
||||
cache.put_update_counter.observe(),
|
||||
Observation::U64Counter(1)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "update"),
|
||||
1,
|
||||
);
|
||||
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(1));
|
||||
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(1));
|
||||
|
@ -222,13 +274,17 @@ mod tests {
|
|||
// Increase the number of columns in this one table
|
||||
let schema = new_schema(&[5]);
|
||||
assert!(cache.put_schema(ns.clone(), schema).is_some());
|
||||
assert_eq!(
|
||||
cache.put_insert_counter.observe(),
|
||||
Observation::U64Counter(1)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "insert"),
|
||||
1,
|
||||
); // Unchanged
|
||||
assert_eq!(
|
||||
cache.put_update_counter.observe(),
|
||||
Observation::U64Counter(2)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "update"),
|
||||
2,
|
||||
);
|
||||
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(1));
|
||||
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(5));
|
||||
|
@ -236,13 +292,17 @@ mod tests {
|
|||
// Decrease the number of columns
|
||||
let schema = new_schema(&[2]);
|
||||
assert!(cache.put_schema(ns.clone(), schema).is_some());
|
||||
assert_eq!(
|
||||
cache.put_insert_counter.observe(),
|
||||
Observation::U64Counter(1)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "insert"),
|
||||
1,
|
||||
); // Unchanged
|
||||
assert_eq!(
|
||||
cache.put_update_counter.observe(),
|
||||
Observation::U64Counter(3)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "update"),
|
||||
3,
|
||||
);
|
||||
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(1));
|
||||
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(2));
|
||||
|
@ -250,13 +310,17 @@ mod tests {
|
|||
// Add another table
|
||||
let schema = new_schema(&[2, 5]);
|
||||
assert!(cache.put_schema(ns.clone(), schema).is_some());
|
||||
assert_eq!(
|
||||
cache.put_insert_counter.observe(),
|
||||
Observation::U64Counter(1)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "insert"),
|
||||
1,
|
||||
); // Unchanged
|
||||
assert_eq!(
|
||||
cache.put_update_counter.observe(),
|
||||
Observation::U64Counter(4)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "update"),
|
||||
4,
|
||||
);
|
||||
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(2));
|
||||
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(7));
|
||||
|
@ -264,13 +328,17 @@ mod tests {
|
|||
// Add another table and adjust the existing tables (one up, one down)
|
||||
let schema = new_schema(&[1, 10, 4]);
|
||||
assert!(cache.put_schema(ns.clone(), schema).is_some());
|
||||
assert_eq!(
|
||||
cache.put_insert_counter.observe(),
|
||||
Observation::U64Counter(1)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "insert"),
|
||||
1,
|
||||
); // Unchanged
|
||||
assert_eq!(
|
||||
cache.put_update_counter.observe(),
|
||||
Observation::U64Counter(5)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "update"),
|
||||
5,
|
||||
);
|
||||
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(3));
|
||||
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(15));
|
||||
|
@ -278,13 +346,17 @@ mod tests {
|
|||
// Remove a table
|
||||
let schema = new_schema(&[1, 10]);
|
||||
assert!(cache.put_schema(ns, schema).is_some());
|
||||
assert_eq!(
|
||||
cache.put_insert_counter.observe(),
|
||||
Observation::U64Counter(1)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "insert"),
|
||||
1,
|
||||
); // Unchanged
|
||||
assert_eq!(
|
||||
cache.put_update_counter.observe(),
|
||||
Observation::U64Counter(6)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "update"),
|
||||
6,
|
||||
);
|
||||
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(2));
|
||||
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(11));
|
||||
|
@ -292,16 +364,28 @@ mod tests {
|
|||
// Add a new namespace
|
||||
let ns = DatabaseName::new("another").expect("database name is valid");
|
||||
let schema = new_schema(&[10, 12, 9]);
|
||||
assert!(cache.put_schema(ns, schema).is_none());
|
||||
assert_eq!(
|
||||
cache.put_insert_counter.observe(),
|
||||
Observation::U64Counter(2)
|
||||
assert!(cache.put_schema(ns.clone(), schema).is_none());
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "insert"),
|
||||
2,
|
||||
);
|
||||
assert_eq!(
|
||||
cache.put_update_counter.observe(),
|
||||
Observation::U64Counter(6)
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_put_duration_ms",
|
||||
("op", "update"),
|
||||
6,
|
||||
);
|
||||
assert_eq!(cache.table_count.observe(), Observation::U64Gauge(5));
|
||||
assert_eq!(cache.column_count.observe(), Observation::U64Gauge(42));
|
||||
|
||||
let _got = cache.get_schema(&ns).expect("should exist");
|
||||
assert_histogram_hit(
|
||||
®istry,
|
||||
"namespace_cache_get_duration_ms",
|
||||
("result", "hit"),
|
||||
1,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,15 +1,21 @@
|
|||
//! A representation of a single operation sequencer.
|
||||
|
||||
use std::{hash::Hash, sync::Arc};
|
||||
use std::{borrow::Cow, hash::Hash, sync::Arc};
|
||||
|
||||
use dml::{DmlMeta, DmlOperation};
|
||||
use metric::{Metric, U64Histogram, U64HistogramOptions};
|
||||
use time::{SystemProvider, TimeProvider};
|
||||
use write_buffer::core::{WriteBufferError, WriteBufferWriting};
|
||||
|
||||
/// A sequencer tags an write buffer with a sequencer ID.
|
||||
#[derive(Debug)]
|
||||
pub struct Sequencer {
|
||||
pub struct Sequencer<P = SystemProvider> {
|
||||
id: usize,
|
||||
inner: Arc<dyn WriteBufferWriting>,
|
||||
time_provider: P,
|
||||
|
||||
enqueue_success: U64Histogram,
|
||||
enqueue_error: U64Histogram,
|
||||
}
|
||||
|
||||
impl Eq for Sequencer {}
|
||||
|
@ -28,8 +34,32 @@ impl Hash for Sequencer {
|
|||
|
||||
impl Sequencer {
|
||||
/// Tag `inner` with the specified `id`.
|
||||
pub fn new(id: usize, inner: Arc<dyn WriteBufferWriting>) -> Self {
|
||||
Self { id, inner }
|
||||
pub fn new(id: usize, inner: Arc<dyn WriteBufferWriting>, metrics: &metric::Registry) -> Self {
|
||||
let buckets = || {
|
||||
U64HistogramOptions::new([5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, u64::MAX])
|
||||
};
|
||||
let write: Metric<U64Histogram> = metrics.register_metric_with_options(
|
||||
"sequencer_enqueue_duration_ms",
|
||||
"sequencer enqueue call duration in milliseconds",
|
||||
buckets,
|
||||
);
|
||||
|
||||
let enqueue_success = write.recorder([
|
||||
("shard_id", Cow::from(id.to_string())),
|
||||
("result", Cow::from("success")),
|
||||
]);
|
||||
let enqueue_error = write.recorder([
|
||||
("shard_id", Cow::from(id.to_string())),
|
||||
("result", Cow::from("error")),
|
||||
]);
|
||||
|
||||
Self {
|
||||
id,
|
||||
inner,
|
||||
enqueue_success,
|
||||
enqueue_error,
|
||||
time_provider: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the ID of this sequencer.
|
||||
|
@ -43,6 +73,17 @@ impl Sequencer {
|
|||
/// behaviour of the [`WriteBufferWriting::store_operation()`]
|
||||
/// implementation this [`Sequencer`] wraps.
|
||||
pub async fn enqueue<'a>(&self, op: DmlOperation) -> Result<DmlMeta, WriteBufferError> {
|
||||
self.inner.store_operation(self.id as u32, &op).await
|
||||
let t = self.time_provider.now();
|
||||
|
||||
let res = self.inner.store_operation(self.id as u32, &op).await;
|
||||
|
||||
if let Some(delta) = self.time_provider.now().checked_duration_since(t) {
|
||||
match &res {
|
||||
Ok(_) => self.enqueue_success.record(delta.as_millis() as _),
|
||||
Err(_) => self.enqueue_error.record(delta.as_millis() as _),
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,6 +155,7 @@ pub struct HttpDelegate<D, T = SystemProvider> {
|
|||
|
||||
write_metric_lines: U64Counter,
|
||||
write_metric_fields: U64Counter,
|
||||
write_metric_tables: U64Counter,
|
||||
write_metric_body_size: U64Counter,
|
||||
delete_metric_body_size: U64Counter,
|
||||
}
|
||||
|
@ -178,6 +179,12 @@ impl<D> HttpDelegate<D, SystemProvider> {
|
|||
"cumulative number of line protocol fields successfully routed",
|
||||
)
|
||||
.recorder(&[]);
|
||||
let write_metric_tables = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"http_write_tables_total",
|
||||
"cumulative number of tables in each write request",
|
||||
)
|
||||
.recorder(&[]);
|
||||
let write_metric_body_size = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"http_write_body_bytes_total",
|
||||
|
@ -197,6 +204,7 @@ impl<D> HttpDelegate<D, SystemProvider> {
|
|||
dml_handler,
|
||||
write_metric_lines,
|
||||
write_metric_fields,
|
||||
write_metric_tables,
|
||||
write_metric_body_size,
|
||||
delete_metric_body_size,
|
||||
}
|
||||
|
@ -245,9 +253,11 @@ where
|
|||
Err(e) => return Err(Error::ParseLineProtocol(e)),
|
||||
};
|
||||
|
||||
let num_tables = batches.len();
|
||||
debug!(
|
||||
num_lines=stats.num_lines,
|
||||
num_fields=stats.num_fields,
|
||||
num_tables,
|
||||
body_size=body.len(),
|
||||
%namespace,
|
||||
org=%account.org,
|
||||
|
@ -262,6 +272,7 @@ where
|
|||
|
||||
self.write_metric_lines.inc(stats.num_lines as _);
|
||||
self.write_metric_fields.inc(stats.num_fields as _);
|
||||
self.write_metric_tables.inc(num_tables as _);
|
||||
self.write_metric_body_size.inc(body.len() as _);
|
||||
|
||||
Ok(())
|
||||
|
@ -493,6 +504,7 @@ mod tests {
|
|||
if $uri.contains("/api/v2/write") {
|
||||
assert_metric_hit(&metrics, "http_write_lines_total", None);
|
||||
assert_metric_hit(&metrics, "http_write_fields_total", None);
|
||||
assert_metric_hit(&metrics, "http_write_tables_total", None);
|
||||
assert_metric_hit(&metrics, "http_write_body_bytes_total", Some($body.len() as _));
|
||||
} else {
|
||||
assert_metric_hit(&metrics, "http_delete_body_bytes_total", Some($body.len() as _));
|
||||
|
|
|
@ -82,7 +82,7 @@ impl TestContext {
|
|||
let sharded_write_buffer = ShardedWriteBuffer::new(
|
||||
shards
|
||||
.into_iter()
|
||||
.map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer)))
|
||||
.map(|id| Sequencer::new(id as _, Arc::clone(&write_buffer), &metrics))
|
||||
.map(Arc::new)
|
||||
.collect::<JumpHash<_>>(),
|
||||
);
|
||||
|
@ -217,4 +217,17 @@ async fn test_write_ok() {
|
|||
.fetch(),
|
||||
1
|
||||
);
|
||||
|
||||
let histogram = ctx
|
||||
.metrics()
|
||||
.get_instrument::<Metric<U64Histogram>>("sequencer_enqueue_duration_ms")
|
||||
.expect("failed to read metric")
|
||||
.get_observer(&Attributes::from(&[
|
||||
("shard_id", "0"),
|
||||
("result", "success"),
|
||||
]))
|
||||
.expect("failed to get observer")
|
||||
.fetch();
|
||||
let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
|
||||
assert_eq!(hit_count, 1);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue