diff --git a/influxdb_iox/src/commands/run/router2.rs b/influxdb_iox/src/commands/run/router2.rs index 3dbc711dbc..2290d1c948 100644 --- a/influxdb_iox/src/commands/run/router2.rs +++ b/influxdb_iox/src/commands/run/router2.rs @@ -199,7 +199,11 @@ pub async fn command(config: Config) -> Result<()> { let handler_stack = InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack); - let http = HttpDelegate::new(config.run_config.max_http_request_size, handler_stack); + let http = HttpDelegate::new( + config.run_config.max_http_request_size, + handler_stack, + &metrics, + ); let router_server = RouterServer::new( http, Default::default(), diff --git a/router2/benches/e2e.rs b/router2/benches/e2e.rs index c04fdcd478..9fd8276576 100644 --- a/router2/benches/e2e.rs +++ b/router2/benches/e2e.rs @@ -54,7 +54,7 @@ fn e2e_benchmarks(c: &mut Criterion) { let delegate = { let metrics = Arc::new(metric::Registry::new()); - let catalog: Arc = Arc::new(MemCatalog::new(metrics)); + let catalog: Arc = Arc::new(MemCatalog::new(Arc::clone(&metrics))); let ns_cache = Arc::new(ShardedCache::new( iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10), )); @@ -68,7 +68,7 @@ fn e2e_benchmarks(c: &mut Criterion) { let handler_stack = schema_validator.and_then(partitioner.and_then(FanOutAdaptor::new(write_buffer))); - HttpDelegate::new(1024, handler_stack) + HttpDelegate::new(1024, handler_stack, &metrics) }; let body_str = "platanos,tag1=A,tag2=B val=42i 123456"; diff --git a/router2/src/server/http.rs b/router2/src/server/http.rs index 01893e8897..d835dc7da5 100644 --- a/router2/src/server/http.rs +++ b/router2/src/server/http.rs @@ -8,6 +8,7 @@ use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError}; use futures::StreamExt; use hashbrown::HashMap; use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode}; +use metric::U64Counter; use mutable_batch::MutableBatch; use observability_deps::tracing::*; use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request}; @@ -154,6 +155,11 @@ pub struct HttpDelegate { max_request_bytes: usize, time_provider: T, dml_handler: D, + + write_metric_lines: U64Counter, + write_metric_fields: U64Counter, + write_metric_body_size: U64Counter, + delete_metric_body_size: U64Counter, } impl HttpDelegate { @@ -162,11 +168,40 @@ impl HttpDelegate { /// /// HTTP request bodies are limited to `max_request_bytes` in size, /// returning an error if exceeded. - pub fn new(max_request_bytes: usize, dml_handler: D) -> Self { + pub fn new(max_request_bytes: usize, dml_handler: D, metrics: &metric::Registry) -> Self { + let write_metric_lines = metrics + .register_metric::( + "http_write_lines_total", + "cumulative number of line protocol lines successfully routed", + ) + .recorder(&[]); + let write_metric_fields = metrics + .register_metric::( + "http_write_fields_total", + "cumulative number of line protocol fields successfully routed", + ) + .recorder(&[]); + let write_metric_body_size = metrics + .register_metric::( + "http_write_body_bytes_total", + "cumulative byte size of successfully routed (decompressed) line protocol write requests", + ) + .recorder(&[]); + let delete_metric_body_size = metrics + .register_metric::( + "http_delete_body_bytes_total", + "cumulative byte size of successfully routed (decompressed) delete requests", + ) + .recorder(&[]); + Self { max_request_bytes, time_provider: SystemProvider::default(), dml_handler, + write_metric_lines, + write_metric_fields, + write_metric_body_size, + delete_metric_body_size, } } } @@ -228,6 +263,10 @@ where .await .map_err(Into::into)?; + self.write_metric_lines.inc(stats.num_lines as _); + self.write_metric_fields.inc(stats.num_fields as _); + self.write_metric_body_size.inc(body.len() as _); + Ok(()) } @@ -274,6 +313,8 @@ where .await .map_err(Into::into)?; + self.delete_metric_body_size.inc(body.len() as _); + Ok(()) } @@ -350,6 +391,7 @@ mod tests { use flate2::{write::GzEncoder, Compression}; use hyper::header::HeaderValue; + use metric::{Attributes, Metric}; use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall}; @@ -357,6 +399,20 @@ mod tests { const MAX_BYTES: usize = 1024; + fn assert_metric_hit(metrics: &metric::Registry, name: &'static str, value: Option) { + let counter = metrics + .get_instrument::>(name) + .expect("failed to read metric") + .get_observer(&Attributes::from(&[])) + .expect("failed to get observer") + .fetch(); + + assert!(counter > 0, "metric {} did not record any values", name); + if let Some(want) = value { + assert_eq!(want, counter, "metric does not have expected value"); + } + } + // Generate two HTTP handler tests - one for a plain request and one with a // gzip-encoded body (and appropriate header), asserting the handler return // value & write op. @@ -427,14 +483,23 @@ mod tests { .with_write_return($dml_write_handler) .with_delete_return($dml_delete_handler) ); - let delegate = HttpDelegate::new(MAX_BYTES, Arc::clone(&dml_handler)); + let metrics = Arc::new(metric::Registry::default()); + let delegate = HttpDelegate::new(MAX_BYTES, Arc::clone(&dml_handler), &metrics); let got = delegate.route(request).await; assert_matches!(got, $want_result); // All successful responses should have a NO_CONTENT code + // and metrics should be recorded. if let Ok(v) = got { assert_eq!(v.status(), StatusCode::NO_CONTENT); + if $uri.contains("/api/v2/write") { + assert_metric_hit(&metrics, "http_write_lines_total", None); + assert_metric_hit(&metrics, "http_write_fields_total", None); + assert_metric_hit(&metrics, "http_write_body_bytes_total", Some($body.len() as _)); + } else { + assert_metric_hit(&metrics, "http_delete_body_bytes_total", Some($body.len() as _)); + } } let calls = dml_handler.calls(); diff --git a/router2/tests/http.rs b/router2/tests/http.rs index 9094f0d985..9a26e93c2c 100644 --- a/router2/tests/http.rs +++ b/router2/tests/http.rs @@ -9,7 +9,7 @@ use iox_catalog::{ interface::{Catalog, KafkaTopicId, QueryPoolId}, mem::MemCatalog, }; -use metric::{Attributes, Metric, Registry, U64Histogram}; +use metric::{Attributes, Metric, Registry, U64Counter, U64Histogram}; use mutable_batch::MutableBatch; use router2::{ dml_handlers::{ @@ -115,7 +115,7 @@ impl TestContext { let handler_stack = InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack); - let delegate = HttpDelegate::new(1024, handler_stack); + let delegate = HttpDelegate::new(1024, handler_stack, &metrics); Self { delegate, @@ -209,4 +209,14 @@ async fn test_write_ok() { .fetch(); let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count); assert_eq!(hit_count, 1); + + assert_eq!( + ctx.metrics() + .get_instrument::>("http_write_lines_total") + .expect("failed to read metric") + .get_observer(&Attributes::from(&[])) + .expect("failed to get observer") + .fetch(), + 1 + ); }