feat: http ingest metrics
Records LP line count, field count & request body size (decompressed, byte size) for writes, and request body byte size for deletes.pull/24376/head
parent
44cfdc7aca
commit
bd64f55658
|
@ -199,7 +199,11 @@ pub async fn command(config: Config) -> Result<()> {
|
|||
let handler_stack =
|
||||
InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack);
|
||||
|
||||
let http = HttpDelegate::new(config.run_config.max_http_request_size, handler_stack);
|
||||
let http = HttpDelegate::new(
|
||||
config.run_config.max_http_request_size,
|
||||
handler_stack,
|
||||
&metrics,
|
||||
);
|
||||
let router_server = RouterServer::new(
|
||||
http,
|
||||
Default::default(),
|
||||
|
|
|
@ -54,7 +54,7 @@ fn e2e_benchmarks(c: &mut Criterion) {
|
|||
|
||||
let delegate = {
|
||||
let metrics = Arc::new(metric::Registry::new());
|
||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(metrics));
|
||||
let catalog: Arc<dyn Catalog> = Arc::new(MemCatalog::new(Arc::clone(&metrics)));
|
||||
let ns_cache = Arc::new(ShardedCache::new(
|
||||
iter::repeat_with(|| Arc::new(MemoryNamespaceCache::default())).take(10),
|
||||
));
|
||||
|
@ -68,7 +68,7 @@ fn e2e_benchmarks(c: &mut Criterion) {
|
|||
let handler_stack =
|
||||
schema_validator.and_then(partitioner.and_then(FanOutAdaptor::new(write_buffer)));
|
||||
|
||||
HttpDelegate::new(1024, handler_stack)
|
||||
HttpDelegate::new(1024, handler_stack, &metrics)
|
||||
};
|
||||
|
||||
let body_str = "platanos,tag1=A,tag2=B val=42i 123456";
|
||||
|
|
|
@ -8,6 +8,7 @@ use data_types::names::{org_and_bucket_to_database, OrgBucketMappingError};
|
|||
use futures::StreamExt;
|
||||
use hashbrown::HashMap;
|
||||
use hyper::{header::CONTENT_ENCODING, Body, Method, Request, Response, StatusCode};
|
||||
use metric::U64Counter;
|
||||
use mutable_batch::MutableBatch;
|
||||
use observability_deps::tracing::*;
|
||||
use predicate::delete_predicate::{parse_delete_predicate, parse_http_delete_request};
|
||||
|
@ -154,6 +155,11 @@ pub struct HttpDelegate<D, T = SystemProvider> {
|
|||
max_request_bytes: usize,
|
||||
time_provider: T,
|
||||
dml_handler: D,
|
||||
|
||||
write_metric_lines: U64Counter,
|
||||
write_metric_fields: U64Counter,
|
||||
write_metric_body_size: U64Counter,
|
||||
delete_metric_body_size: U64Counter,
|
||||
}
|
||||
|
||||
impl<D> HttpDelegate<D, SystemProvider> {
|
||||
|
@ -162,11 +168,40 @@ impl<D> HttpDelegate<D, SystemProvider> {
|
|||
///
|
||||
/// HTTP request bodies are limited to `max_request_bytes` in size,
|
||||
/// returning an error if exceeded.
|
||||
pub fn new(max_request_bytes: usize, dml_handler: D) -> Self {
|
||||
pub fn new(max_request_bytes: usize, dml_handler: D, metrics: &metric::Registry) -> Self {
|
||||
let write_metric_lines = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"http_write_lines_total",
|
||||
"cumulative number of line protocol lines successfully routed",
|
||||
)
|
||||
.recorder(&[]);
|
||||
let write_metric_fields = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"http_write_fields_total",
|
||||
"cumulative number of line protocol fields successfully routed",
|
||||
)
|
||||
.recorder(&[]);
|
||||
let write_metric_body_size = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"http_write_body_bytes_total",
|
||||
"cumulative byte size of successfully routed (decompressed) line protocol write requests",
|
||||
)
|
||||
.recorder(&[]);
|
||||
let delete_metric_body_size = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"http_delete_body_bytes_total",
|
||||
"cumulative byte size of successfully routed (decompressed) delete requests",
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
Self {
|
||||
max_request_bytes,
|
||||
time_provider: SystemProvider::default(),
|
||||
dml_handler,
|
||||
write_metric_lines,
|
||||
write_metric_fields,
|
||||
write_metric_body_size,
|
||||
delete_metric_body_size,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -228,6 +263,10 @@ where
|
|||
.await
|
||||
.map_err(Into::into)?;
|
||||
|
||||
self.write_metric_lines.inc(stats.num_lines as _);
|
||||
self.write_metric_fields.inc(stats.num_fields as _);
|
||||
self.write_metric_body_size.inc(body.len() as _);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -274,6 +313,8 @@ where
|
|||
.await
|
||||
.map_err(Into::into)?;
|
||||
|
||||
self.delete_metric_body_size.inc(body.len() as _);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -350,6 +391,7 @@ mod tests {
|
|||
|
||||
use flate2::{write::GzEncoder, Compression};
|
||||
use hyper::header::HeaderValue;
|
||||
use metric::{Attributes, Metric};
|
||||
|
||||
use crate::dml_handlers::mock::{MockDmlHandler, MockDmlHandlerCall};
|
||||
|
||||
|
@ -357,6 +399,20 @@ mod tests {
|
|||
|
||||
const MAX_BYTES: usize = 1024;
|
||||
|
||||
fn assert_metric_hit(metrics: &metric::Registry, name: &'static str, value: Option<u64>) {
|
||||
let counter = metrics
|
||||
.get_instrument::<Metric<U64Counter>>(name)
|
||||
.expect("failed to read metric")
|
||||
.get_observer(&Attributes::from(&[]))
|
||||
.expect("failed to get observer")
|
||||
.fetch();
|
||||
|
||||
assert!(counter > 0, "metric {} did not record any values", name);
|
||||
if let Some(want) = value {
|
||||
assert_eq!(want, counter, "metric does not have expected value");
|
||||
}
|
||||
}
|
||||
|
||||
// Generate two HTTP handler tests - one for a plain request and one with a
|
||||
// gzip-encoded body (and appropriate header), asserting the handler return
|
||||
// value & write op.
|
||||
|
@ -427,14 +483,23 @@ mod tests {
|
|||
.with_write_return($dml_write_handler)
|
||||
.with_delete_return($dml_delete_handler)
|
||||
);
|
||||
let delegate = HttpDelegate::new(MAX_BYTES, Arc::clone(&dml_handler));
|
||||
let metrics = Arc::new(metric::Registry::default());
|
||||
let delegate = HttpDelegate::new(MAX_BYTES, Arc::clone(&dml_handler), &metrics);
|
||||
|
||||
let got = delegate.route(request).await;
|
||||
assert_matches!(got, $want_result);
|
||||
|
||||
// All successful responses should have a NO_CONTENT code
|
||||
// and metrics should be recorded.
|
||||
if let Ok(v) = got {
|
||||
assert_eq!(v.status(), StatusCode::NO_CONTENT);
|
||||
if $uri.contains("/api/v2/write") {
|
||||
assert_metric_hit(&metrics, "http_write_lines_total", None);
|
||||
assert_metric_hit(&metrics, "http_write_fields_total", None);
|
||||
assert_metric_hit(&metrics, "http_write_body_bytes_total", Some($body.len() as _));
|
||||
} else {
|
||||
assert_metric_hit(&metrics, "http_delete_body_bytes_total", Some($body.len() as _));
|
||||
}
|
||||
}
|
||||
|
||||
let calls = dml_handler.calls();
|
||||
|
|
|
@ -9,7 +9,7 @@ use iox_catalog::{
|
|||
interface::{Catalog, KafkaTopicId, QueryPoolId},
|
||||
mem::MemCatalog,
|
||||
};
|
||||
use metric::{Attributes, Metric, Registry, U64Histogram};
|
||||
use metric::{Attributes, Metric, Registry, U64Counter, U64Histogram};
|
||||
use mutable_batch::MutableBatch;
|
||||
use router2::{
|
||||
dml_handlers::{
|
||||
|
@ -115,7 +115,7 @@ impl TestContext {
|
|||
let handler_stack =
|
||||
InstrumentationDecorator::new("request", Arc::clone(&metrics), handler_stack);
|
||||
|
||||
let delegate = HttpDelegate::new(1024, handler_stack);
|
||||
let delegate = HttpDelegate::new(1024, handler_stack, &metrics);
|
||||
|
||||
Self {
|
||||
delegate,
|
||||
|
@ -209,4 +209,14 @@ async fn test_write_ok() {
|
|||
.fetch();
|
||||
let hit_count = histogram.buckets.iter().fold(0, |acc, v| acc + v.count);
|
||||
assert_eq!(hit_count, 1);
|
||||
|
||||
assert_eq!(
|
||||
ctx.metrics()
|
||||
.get_instrument::<Metric<U64Counter>>("http_write_lines_total")
|
||||
.expect("failed to read metric")
|
||||
.get_observer(&Attributes::from(&[]))
|
||||
.expect("failed to get observer")
|
||||
.fetch(),
|
||||
1
|
||||
);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue