feat: migrate http metrics to metric crate (#2508)
* feat: migrate http metrics
* chore: review feedback

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>

parent 525e99610e
commit f3bcafcfea
@@ -4901,9 +4901,11 @@ name = "trace_http"
version = "0.1.0"
dependencies = [
 "futures",
 "hashbrown",
 "http",
 "http-body",
 "itertools 0.10.1",
 "metric",
 "observability_deps",
 "parking_lot",
 "pin-project 1.0.8",
@@ -97,6 +97,22 @@ pub struct DurationHistogram {
}

impl DurationHistogram {
    pub fn fetch(&self) -> HistogramObservation<Duration> {
        let inner = self.inner.fetch();

        HistogramObservation {
            total: Duration::from_nanos(inner.total),
            buckets: inner
                .buckets
                .into_iter()
                .map(|bucket| ObservationBucket {
                    le: Duration::from_nanos(bucket.le),
                    count: bucket.count,
                })
                .collect(),
        }
    }

    pub fn record(&self, value: Duration) {
        self.record_multiple(value, 1)
    }
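For orientation: the new `fetch` method exposes a point-in-time snapshot of the histogram, which is what the updated tests later in this diff consume. A minimal sketch of the intended call pattern, assuming the `metric` crate API as it appears elsewhere in this diff (`Registry: Default`, `register_metric`, `recorder`, `sample_count`):

```rust
use std::time::Duration;

use metric::{Attributes, DurationHistogram, Metric, Registry};

fn main() {
    let registry = Registry::default();
    let metric: Metric<DurationHistogram> =
        registry.register_metric("http_request_duration", "distribution of request latencies");

    // One recorder per distinct attribute set
    let recorder = metric.recorder(Attributes::from(&[
        ("path", "/api/v2/write"),
        ("status", "ok"),
    ]));
    recorder.record(Duration::from_millis(20));

    // `fetch` snapshots the histogram; `sample_count` sums the bucket counts
    let observation = recorder.fetch();
    assert_eq!(observation.sample_count(), 1);
}
```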
@@ -175,19 +191,7 @@ impl MetricObserver for DurationHistogram {
    }

    fn observe(&self) -> Observation {
        let inner = self.inner.fetch();

        Observation::DurationHistogram(HistogramObservation {
            total: Duration::from_nanos(inner.total),
            buckets: inner
                .buckets
                .into_iter()
                .map(|bucket| ObservationBucket {
                    le: Duration::from_nanos(bucket.le),
                    count: bucket.count,
                })
                .collect(),
        })
        Observation::DurationHistogram(self.fetch())
    }
}
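`observe` is now a thin wrapper over `fetch`, so callers that only need the typed snapshot can skip the `Observation` enum entirely. The equivalence, as a sketch assuming the `MetricObserver` trait and `Observation` enum are exported from the `metric` crate:

```rust
use metric::{DurationHistogram, MetricObserver, Observation};

// Both paths now yield the same data: `observe()` simply wraps `fetch()`.
fn sample_count_via_observe(histogram: &DurationHistogram) -> u64 {
    match histogram.observe() {
        Observation::DurationHistogram(obs) => obs.sample_count(),
        _ => unreachable!("a DurationHistogram always observes as a duration histogram"),
    }
}
```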
@@ -414,6 +414,12 @@ pub struct HistogramObservation<T> {
    pub buckets: Vec<ObservationBucket<T>>,
}

impl<T> HistogramObservation<T> {
    pub fn sample_count(&self) -> u64 {
        self.buckets.iter().map(|bucket| bucket.count).sum()
    }
}

/// A bucketed observation
///
/// Stores the number of values that were less than or equal to `le` and
@@ -258,9 +258,6 @@ pub trait DatabaseStore: std::fmt::Debug + Send + Sync {
/// A collection of metrics used to instrument the Server.
#[derive(Debug)]
pub struct ServerMetrics {
    /// This metric tracks all requests to the Server
    pub http_requests: metrics::RedMetric,

    /// The number of LP lines ingested
    pub ingest_lines_total: metrics::Counter,

@@ -277,11 +274,9 @@ pub struct ServerMetrics {
impl ServerMetrics {
    pub fn new(registry: Arc<metrics::MetricRegistry>) -> Self {
        // Server manages multiple domains.
        let http_domain = registry.register_domain("http");
        let ingest_domain = registry.register_domain("ingest");

        Self {
            http_requests: http_domain.register_red_metric(None),
            ingest_lines_total: ingest_domain.register_counter_metric(
                "points",
                None,
@@ -35,7 +35,6 @@ use chrono::Utc;
use futures::{self, StreamExt};
use http::header::{CONTENT_ENCODING, CONTENT_TYPE};
use hyper::{http::HeaderValue, Body, Method, Request, Response, StatusCode};
use metrics::KeyValue;
use observability_deps::tracing::{self, debug, error};
use routerify::{prelude::*, Middleware, RequestInfo, Router, RouterError};
use serde::Deserialize;
@@ -485,7 +484,6 @@ async fn write<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError
where
    M: ConnectionManager + Send + Sync + Debug + 'static,
{
    let path = req.uri().path().to_string();
    let Server {
        app_server: server,
        max_request_size,

@@ -493,12 +491,6 @@ where
    let max_request_size = *max_request_size;
    let server = Arc::clone(server);

    // TODO(edd): figure out best way of catching all errors in this observation.
    let obs = server.metrics().http_requests.observation(); // instrument request

    // TODO - metrics. Implement a macro/something that will catch all the
    // early returns.

    let query = req.uri().query().context(ExpectedQueryString)?;

    let write_info: WriteInfo = serde_urlencoded::from_str(query).context(InvalidQueryString {

@@ -530,12 +522,6 @@ where
    let num_lines = lines.len();
    debug!(num_lines, num_fields, body_size=body.len(), %db_name, org=%write_info.org, bucket=%write_info.bucket, "inserting lines into database");

    let metric_kv = vec![
        KeyValue::new("org", write_info.org.to_string()),
        KeyValue::new("bucket", write_info.bucket.to_string()),
        KeyValue::new("path", path),
    ];

    server
        .write_lines(&db_name, &lines, default_time)
        .await

@@ -567,7 +553,6 @@ where
    );
    debug!(?e, ?db_name, ?num_lines, "error writing lines");

    obs.client_error_with_attributes(&metric_kv); // user error
    match e {
        server::Error::DatabaseNotFound { .. } => ApplicationError::DatabaseNotFound {
            db_name: db_name.to_string(),

@@ -601,7 +586,6 @@ where
        .ingest_points_bytes_total
        .add_with_attributes(body.len() as u64, attributes);

    obs.ok_with_attributes(&metric_kv); // request completed successfully
    Ok(Response::builder()
        .status(StatusCode::NO_CONTENT)
        .body(Body::empty())
@@ -624,12 +608,8 @@ fn default_format() -> String {
async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
    req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
    let path = req.uri().path().to_string();
    let server = Arc::clone(&req.data::<Server<M>>().expect("server state").app_server);

    // TODO(edd): figure out best way of catching all errors in this observation.
    let obs = server.metrics().http_requests.observation(); // instrument request

    let uri_query = req.uri().query().context(ExpectedQueryString {})?;

    let QueryParams { q, format } =

@@ -644,11 +624,6 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
        .expect("db name must have been set by routerify")
        .clone();

    let metric_kv = vec![
        KeyValue::new("db_name", db_name_str.clone()),
        KeyValue::new("path", path),
    ];

    let db_name = DatabaseName::new(&db_name_str).context(DatabaseNameError)?;
    debug!(uri = ?req.uri(), %q, ?format, %db_name, "running SQL query");

@@ -676,9 +651,6 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
        .body(body)
        .context(CreatingResponse)?;

    // successful query
    obs.ok_with_attributes(&metric_kv);

    Ok(response)
}
@@ -686,14 +658,6 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
async fn health<M: ConnectionManager + Send + Sync + Debug + 'static>(
    req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
    let server = Arc::clone(&req.data::<Server<M>>().expect("server state").app_server);
    let path = req.uri().path().to_string();
    server
        .metrics()
        .http_requests
        .observation()
        .ok_with_attributes(&[metrics::KeyValue::new("path", path)]);

    let response_body = "OK";
    Ok(Response::new(Body::from(response_body.to_string())))
}

@@ -702,14 +666,6 @@ async fn health<M: ConnectionManager + Send + Sync + Debug + 'static>(
async fn handle_metrics<M: ConnectionManager + Send + Sync + Debug + 'static>(
    req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
    let server = Arc::clone(&req.data::<Server<M>>().expect("server state").app_server);
    let path = req.uri().path().to_string();
    server
        .metrics()
        .http_requests
        .observation()
        .ok_with_attributes(&[metrics::KeyValue::new("path", path)]);

    let application = req
        .data::<Arc<ApplicationState>>()
        .expect("application state");
@@ -736,12 +692,8 @@ struct DatabaseInfo {
async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(
    req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
    let path = req.uri().path().to_string();

    let server = Arc::clone(&req.data::<Server<M>>().expect("server state").app_server);

    // TODO - catch error conditions
    let obs = server.metrics().http_requests.observation();
    let query = req.uri().query().context(ExpectedQueryString {})?;

    let info: DatabaseInfo = serde_urlencoded::from_str(query).context(InvalidQueryString {

@@ -751,11 +703,6 @@ async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(
    let db_name =
        org_and_bucket_to_database(&info.org, &info.bucket).context(BucketMappingError)?;

    let metric_kv = vec![
        KeyValue::new("db_name", db_name.to_string()),
        KeyValue::new("path", path),
    ];

    let db = server.db(&db_name)?;

    let partition_keys =

@@ -768,7 +715,6 @@ async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(

    let result = serde_json::to_string(&partition_keys).context(JsonGenerationError)?;

    obs.ok_with_attributes(&metric_kv);
    Ok(Response::new(Body::from(result)))
}
@@ -958,8 +904,9 @@ pub async fn serve<M>(
where
    M: ConnectionManager + Send + Sync + Debug + 'static,
{
    let metric_registry = Arc::clone(application.metric_registry_v2());
    let router = router(application, server, max_request_size);
    let new_service = tower::MakeService::new(router, trace_collector);
    let new_service = tower::MakeService::new(router, trace_collector, metric_registry);

    hyper::Server::builder(addr)
        .serve(new_service)
@@ -980,6 +927,7 @@ mod tests {
    use reqwest::{Client, Response};

    use data_types::{database_rules::DatabaseRules, server_id::ServerId, DatabaseName};
    use metric::{Attributes, DurationHistogram, Metric};
    use metrics::TestMetricRegistry;
    use object_store::ObjectStore;
    use serde::de::DeserializeOwned;
@@ -1038,8 +986,29 @@ mod tests {

        let data = response.text().await.unwrap();

        assert!(data.contains(&"\nhttp_requests_total{path=\"/metrics\",status=\"ok\"} 1\n"));
        assert!(data.contains(&"\nmy_metric_total{tag=\"value\"} 20\n"));

        let response = client
            .get(&format!("{}/nonexistent", server_url))
            .send()
            .await
            .unwrap();

        assert_eq!(response.status().as_u16(), 404);

        let response = client
            .get(&format!("{}/metrics", server_url))
            .send()
            .await
            .unwrap();

        let data = response.text().await.unwrap();

        // Should include previous metrics scrape but not the current one
        assert!(data.contains(&"\nhttp_requests_total{path=\"/metrics\",status=\"ok\"} 1\n"));
        // Should include 404 but not encode the path
        assert!(!data.contains(&"nonexistent"));
        assert!(data.contains(&"\nhttp_requests_total{status=\"client_error\"} 1\n"));
    }

    #[tokio::test]
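The 404 assertions above depend on the `PathNotFound` handling introduced in `trace_http/src/metrics.rs` further down in this diff: a not-found response drops the recorded path, so unknown URLs cannot mint unbounded metric series. A sketch of that flow (these types are crate-private in this diff, so this is illustrative rather than a public API):

```rust
use std::sync::Arc;

// Illustrative sketch only: MetricsCollection and Classification are private to trace_http.
fn sketch() {
    let registry = Arc::new(metric::Registry::default());
    let collection = Arc::new(MetricsCollection::new(Arc::clone(&registry), false));

    let request = http::Request::builder()
        .uri("/nonexistent")
        .body(())
        .unwrap();

    let mut recorder = collection.recorder(&request);
    // PathNotFound clears the stored path before the recorder drops...
    recorder.set_classification(Classification::PathNotFound);
    // ...so the drop below increments http_requests{status="client_error"}
    // with no `path` attribute at all, which is exactly what the test asserts.
    drop(recorder);
}
```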
@@ -1123,6 +1092,7 @@ mod tests {
        let application = make_application();
        let app_server = make_server(Arc::clone(&application));
        let metric_registry = TestMetricRegistry::new(Arc::clone(application.metric_registry()));
        let metric_registry_v2 = Arc::clone(application.metric_registry_v2());

        app_server.set_id(ServerId::try_from(1).unwrap()).unwrap();
        app_server.wait_for_init().await.unwrap();
@@ -1151,17 +1121,18 @@ mod tests {
            .expect("sent data");

        // The request completed successfully
        metric_registry
            .has_metric_family("http_request_duration_seconds")
            .with_attributes(&[
                ("bucket", "MetricsBucket"),
                ("org", "MetricsOrg"),
        let request_count = metric_registry_v2
            .get_instrument::<Metric<DurationHistogram>>("http_request_duration")
            .unwrap()
            .get_observer(&Attributes::from(&[
                ("path", "/api/v2/write"),
                ("status", "ok"),
            ])
            .histogram()
            .sample_count_eq(1)
            .unwrap();
            ]))
            .unwrap()
            .fetch()
            .sample_count();

        assert_eq!(request_count, 1);

        // A single successful point landed
        metric_registry
@@ -28,10 +28,11 @@ impl MakeService {
    pub fn new(
        router: Router<Body, ApplicationError>,
        collector: Option<Arc<dyn TraceCollector>>,
        metric_registry: Arc<metric::Registry>,
    ) -> Self {
        Self {
            inner: RouterService::new(router).unwrap(),
            trace_layer: TraceLayer::new(collector),
            trace_layer: TraceLayer::new(metric_registry, collector, false),
        }
    }
}
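For anyone wiring this up outside `MakeService`: `TraceLayer` is an ordinary `tower::Layer`, so the same constructor composes with any service. A minimal sketch, assuming only the `TraceLayer::new` signature and `Layer` impl introduced in this diff:

```rust
use std::sync::Arc;

use tower::Layer;
use trace_http::tower::{TraceLayer, TraceService};

// Decorate any tower `Service` with span propagation and RED metrics.
// `None` disables distributed-trace collection; `false` selects the
// http_requests / http_request_duration metric names (`true` selects grpc_*).
fn instrument<S>(service: S, registry: Arc<metric::Registry>) -> TraceService<S> {
    TraceLayer::new(registry, None, false).layer(service)
}
```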
@@ -104,7 +104,11 @@ where
        .context(ReflectionError)?;

    let builder = tonic::transport::Server::builder();
    let mut builder = builder.layer(trace_http::tower::TraceLayer::new(trace_collector));
    let mut builder = builder.layer(trace_http::tower::TraceLayer::new(
        Arc::clone(application.metric_registry_v2()),
        trace_collector,
        true,
    ));

    // important that this one is NOT gated so that it can answer health requests
    add_service!(builder, health_reporter, health_service);

@@ -114,10 +118,7 @@ where
        builder,
        health_reporter,
        serving_readiness,
        storage::make_server(
            Arc::clone(&server),
            Arc::clone(application.metric_registry()),
        )
        storage::make_server(Arc::clone(&server),)
    );
    add_gated_service!(
        builder,
@@ -15,7 +15,6 @@ pub mod input;
pub mod service;

use generated_types::storage_server::{Storage, StorageServer};
use metrics::{MetricRegistry, RedMetric};
use server::DatabaseStore;
use std::sync::Arc;

@@ -23,34 +22,8 @@ use std::sync::Arc;
#[derive(Debug)]
struct StorageService<T: DatabaseStore> {
    pub db_store: Arc<T>,
    pub metrics: Metrics,
}

pub fn make_server<T: DatabaseStore + 'static>(
    db_store: Arc<T>,
    metrics_registry: Arc<MetricRegistry>,
) -> StorageServer<impl Storage> {
    StorageServer::new(StorageService {
        db_store,
        metrics: Metrics::new(metrics_registry),
    })
}

// These are the metrics associated with the gRPC server.
#[derive(Debug)]
pub struct Metrics {
    /// The metric tracking the outcome of requests to the gRPC service
    pub requests: RedMetric,

    // Holding the registry allows it to be replaced for a test implementation
    pub(crate) metrics_registry: Arc<MetricRegistry>,
}

impl Metrics {
    fn new(registry: Arc<MetricRegistry>) -> Self {
        Self {
            requests: registry.register_domain("gRPC").register_red_metric(None),
            metrics_registry: registry,
        }
    }
pub fn make_server<T: DatabaseStore + 'static>(db_store: Arc<T>) -> StorageServer<impl Storage> {
    StorageServer::new(StorageService { db_store })
}
@@ -18,7 +18,6 @@ use generated_types::{
    ReadWindowAggregateRequest, StringValuesResponse, TagKeysRequest, TagValuesRequest,
    TimestampRange,
};
use metrics::KeyValue;
use observability_deps::tracing::{error, info};
use query::{
    exec::{fieldlist::FieldList, seriesset::Error as SeriesSetError, ExecutionContextProvider},

@@ -206,10 +205,6 @@ impl Error {
            Self::NotYetImplemented { .. } => Status::internal(self.to_string()),
        }
    }

    fn is_internal(&self) -> bool {
        matches!(self.to_status().code(), tonic::Code::Internal)
    }
}

/// Implements the protobuf defined Storage service for a DatabaseStore
@@ -237,27 +232,12 @@ where

        info!(%db_name, ?range, predicate=%predicate.loggable(), "read filter");

        let ob = self.metrics.requests.observation();
        let attributes = &[
            KeyValue::new("operation", "read_filter"),
            KeyValue::new("db_name", db_name.to_string()),
        ];

        let results = read_filter_impl(self.db_store.as_ref(), db_name, range, predicate, span_ctx)
            .await
            .map_err(|e| {
                if e.is_internal() {
                    ob.error_with_attributes(attributes);
                } else {
                    ob.client_error_with_attributes(attributes);
                }
                e.to_status()
            })?
            .await?
            .into_iter()
            .map(Ok)
            .collect::<Vec<_>>();

        ob.ok_with_attributes(attributes);
        Ok(tonic::Response::new(futures::stream::iter(results)))
    }
@@ -284,14 +264,7 @@ where

        info!(%db_name, ?range, ?group_keys, ?group, ?aggregate, predicate=%predicate.loggable(), "read_group");

        let ob = self.metrics.requests.observation();
        let attributes = &[
            KeyValue::new("operation", "read_group"),
            KeyValue::new("db_name", db_name.to_string()),
        ];

        if hints != 0 {
            ob.error_with_attributes(attributes);
            InternalHintsFieldNotSupported { hints }.fail()?
        }

@@ -300,21 +273,12 @@ where
            aggregate, group, group_keys
        );

        let group = expr::convert_group_type(group)
            .context(ConvertingReadGroupType {
                aggregate_string: &aggregate_string,
            })
            .map_err(|e| {
                ob.client_error_with_attributes(attributes);
                e
            })?;
        let group = expr::convert_group_type(group).context(ConvertingReadGroupType {
            aggregate_string: &aggregate_string,
        })?;

        let gby_agg = expr::make_read_group_aggregate(aggregate, group, group_keys)
            .context(ConvertingReadGroupAggregate { aggregate_string })
            .map_err(|e| {
                ob.client_error_with_attributes(attributes);
                e
            })?;
            .context(ConvertingReadGroupAggregate { aggregate_string })?;

        let results = query_group_impl(
            self.db_store.as_ref(),

@@ -325,19 +289,11 @@ where
            span_ctx,
        )
        .await
        .map_err(|e| {
            if e.is_internal() {
                ob.error_with_attributes(attributes);
            } else {
                ob.client_error_with_attributes(attributes);
            }
            e.to_status()
        })?
        .map_err(|e| e.to_status())?
        .into_iter()
        .map(Ok)
        .collect::<Vec<_>>();

        ob.ok_with_attributes(attributes);
        Ok(tonic::Response::new(futures::stream::iter(results)))
    }
@@ -365,23 +321,13 @@ where

        info!(%db_name, ?range, ?window_every, ?offset, ?aggregate, ?window, predicate=%predicate.loggable(), "read_window_aggregate");

        let ob = self.metrics.requests.observation();
        let attributes = &[
            KeyValue::new("operation", "read_window_aggregate"),
            KeyValue::new("db_name", db_name.to_string()),
        ];

        let aggregate_string = format!(
            "aggregate: {:?}, window_every: {:?}, offset: {:?}, window: {:?}",
            aggregate, window_every, offset, window
        );

        let gby_agg = expr::make_read_window_aggregate(aggregate, window_every, offset, window)
            .context(ConvertingWindowAggregate { aggregate_string })
            .map_err(|e| {
                ob.client_error_with_attributes(attributes);
                e
            })?;
            .context(ConvertingWindowAggregate { aggregate_string })?;

        let results = query_group_impl(
            self.db_store.as_ref(),

@@ -392,19 +338,11 @@ where
            span_ctx,
        )
        .await
        .map_err(|e| {
            if e.is_internal() {
                ob.error_with_attributes(attributes);
            } else {
                ob.client_error_with_attributes(attributes);
            }
            e.to_status()
        })?
        .map_err(|e| e.to_status())?
        .into_iter()
        .map(Ok)
        .collect::<Vec<_>>();

        ob.ok_with_attributes(attributes);
        Ok(tonic::Response::new(futures::stream::iter(results)))
    }
@@ -429,12 +367,6 @@ where

        info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_keys");

        let ob = self.metrics.requests.observation();
        let attributes = &[
            KeyValue::new("operation", "tag_keys"),
            KeyValue::new("db_name", db_name.to_string()),
        ];

        let measurement = None;

        let response = tag_keys_impl(

@@ -446,20 +378,12 @@ where
            span_ctx,
        )
        .await
        .map_err(|e| {
            if e.is_internal() {
                ob.error_with_attributes(attributes);
            } else {
                ob.client_error_with_attributes(attributes);
            }
            e.to_status()
        });
        .map_err(|e| e.to_status());

        tx.send(response)
            .await
            .expect("sending tag_keys response to server");

        ob.ok_with_attributes(attributes);
        Ok(tonic::Response::new(ReceiverStream::new(rx)))
    }
@@ -476,12 +400,6 @@ where

        let db_name = get_database_name(&tag_values_request)?;

        let ob = self.metrics.requests.observation();
        let attributes = &[
            KeyValue::new("operation", "tag_values"),
            KeyValue::new("db_name", db_name.to_string()),
        ];

        let TagValuesRequest {
            tags_source: _tag_source,
            range,

@@ -497,23 +415,13 @@ where
            info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_values with tag_key=[x00] (measurement name)");

            if predicate.is_some() {
                ob.client_error_with_attributes(attributes);
                return Err(Error::NotYetImplemented {
                    operation: "tag_value for a measurement, with general predicate".to_string(),
                }
                .to_status());
            }

            measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx)
                .await
                .map_err(|e| {
                    if e.is_internal() {
                        ob.error_with_attributes(attributes);
                    } else {
                        ob.client_error_with_attributes(attributes);
                    }
                    e
                })
            measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx).await
        } else if tag_key.is_field() {
            info!(%db_name, ?range, predicate=%predicate.loggable(), "tag_values with tag_key=[xff] (field name)");

@@ -525,15 +433,7 @@ where
                predicate,
                span_ctx,
            )
            .await
            .map_err(|e| {
                if e.is_internal() {
                    ob.error_with_attributes(attributes);
                } else {
                    ob.client_error_with_attributes(attributes);
                }
                e
            })?;
            .await?;

            // Pick out the field names into a Vec<Vec<u8>> for return
            let values = fieldlist

@@ -544,12 +444,7 @@ where

            Ok(StringValuesResponse { values })
        } else {
            let tag_key = String::from_utf8(tag_key)
                .context(ConvertingTagKeyInTagValues)
                .map_err(|e| {
                    ob.client_error_with_attributes(attributes);
                    e
                })?;
            let tag_key = String::from_utf8(tag_key).context(ConvertingTagKeyInTagValues)?;

            info!(%db_name, ?range, %tag_key, predicate=%predicate.loggable(), "tag_values",);

@@ -571,7 +466,6 @@ where
            .await
            .expect("sending tag_values response to server");

        ob.ok_with_attributes(attributes);
        Ok(tonic::Response::new(ReceiverStream::new(rx)))
    }
@@ -652,28 +546,14 @@ where

        info!(%db_name, ?range, predicate=%predicate.loggable(), "measurement_names");

        let ob = self.metrics.requests.observation();
        let attributes = &[
            KeyValue::new("operation", "measurement_names"),
            KeyValue::new("db_name", db_name.to_string()),
        ];

        let response = measurement_name_impl(self.db_store.as_ref(), db_name, range, span_ctx)
            .await
            .map_err(|e| {
                if e.is_internal() {
                    ob.error_with_attributes(attributes);
                } else {
                    ob.client_error_with_attributes(attributes);
                }
                e.to_status()
            });
            .map_err(|e| e.to_status());

        tx.send(response)
            .await
            .expect("sending measurement names response to server");

        ob.ok_with_attributes(attributes);
        Ok(tonic::Response::new(ReceiverStream::new(rx)))
    }
@@ -699,12 +579,6 @@ where

        info!(%db_name, ?range, %measurement, predicate=%predicate.loggable(), "measurement_tag_keys");

        let ob = self.metrics.requests.observation();
        let attributes = &[
            KeyValue::new("operation", "measurement_tag_keys"),
            KeyValue::new("db_name", db_name.to_string()),
        ];

        let measurement = Some(measurement);

        let response = tag_keys_impl(

@@ -716,20 +590,12 @@ where
            span_ctx,
        )
        .await
        .map_err(|e| {
            if e.is_internal() {
                ob.error_with_attributes(attributes);
            } else {
                ob.client_error_with_attributes(attributes);
            }
            e.to_status()
        });
        .map_err(|e| e.to_status());

        tx.send(response)
            .await
            .expect("sending measurement_tag_keys response to server");

        ob.ok_with_attributes(attributes);
        Ok(tonic::Response::new(ReceiverStream::new(rx)))
    }
@@ -756,12 +622,6 @@ where

        info!(%db_name, ?range, %measurement, %tag_key, predicate=%predicate.loggable(), "measurement_tag_values");

        let ob = self.metrics.requests.observation();
        let attributes = &[
            KeyValue::new("operation", "measurement_tag_values"),
            KeyValue::new("db_name", db_name.to_string()),
        ];

        let measurement = Some(measurement);

        let response = tag_values_impl(

@@ -774,20 +634,12 @@ where
            span_ctx,
        )
        .await
        .map_err(|e| {
            if e.is_internal() {
                ob.error_with_attributes(attributes);
            } else {
                ob.client_error_with_attributes(attributes);
            }
            e.to_status()
        });
        .map_err(|e| e.to_status());

        tx.send(response)
            .await
            .expect("sending measurement_tag_values response to server");

        ob.ok_with_attributes(attributes);
        Ok(tonic::Response::new(ReceiverStream::new(rx)))
    }
@@ -813,12 +665,6 @@ where

        info!(%db_name, ?range, predicate=%predicate.loggable(), "measurement_fields");

        let ob = self.metrics.requests.observation();
        let attributes = &[
            KeyValue::new("operation", "measurement_fields"),
            KeyValue::new("db_name", db_name.to_string()),
        ];

        let measurement = Some(measurement);

        let response = field_names_impl(

@@ -835,20 +681,12 @@ where
                .context(ConvertingFieldList)
                .map_err(|e| e.to_status())
        })
        .map_err(|e| {
            if e.is_internal() {
                ob.error_with_attributes(attributes);
            } else {
                ob.client_error_with_attributes(attributes);
            }
            e.to_status()
        })?;
        .map_err(|e| e.to_status())?;

        tx.send(response)
            .await
            .expect("sending measurement_fields response to server");

        ob.ok_with_attributes(attributes);
        Ok(tonic::Response::new(ReceiverStream::new(rx)))
    }
}
@@ -1220,6 +1058,7 @@ mod tests {
    use test_helpers::{assert_contains, tracing::TracingCapture};

    use super::*;
    use metric::{Attributes, Metric, U64Counter};

    fn to_str_vec(s: &[&str]) -> Vec<String> {
        s.iter().map(|s| s.to_string()).collect()
@@ -1229,21 +1068,28 @@ mod tests {
    // correctly updated.
    fn grpc_request_metric_has_count(
        fixture: &Fixture,
        operation: &'static str,
        path: &'static str,
        status: &'static str,
        count: usize,
    ) -> Result<(), metrics::Error> {
        fixture
        count: u64,
    ) {
        let metric = fixture
            .test_storage
            .metrics_registry
            .has_metric_family("gRPC_requests_total")
            .with_attributes(&[
                ("operation", operation),
                ("db_name", "000000000000007b_00000000000001c8"),
                ("status", status),
            ])
            .counter()
            .eq(count as f64)
            .metric_registry
            .get_instrument::<Metric<U64Counter>>("grpc_requests")
            .unwrap();

        let observation = metric
            .get_observer(&Attributes::from([
                (
                    "path",
                    format!("/influxdata.platform.storage.Storage/{}", path).into(),
                ),
                ("status", status.into()),
            ]))
            .unwrap()
            .fetch();

        assert_eq!(observation, count);
    }

    #[tokio::test]
@@ -1353,7 +1199,7 @@ mod tests {
            expected_predicate
        );

        grpc_request_metric_has_count(&fixture, "measurement_names", "ok", 2).unwrap();
        grpc_request_metric_has_count(&fixture, "MeasurementNames", "ok", 2);
    }

    /// test the plumbing of the RPC layer for tag_keys -- specifically that

@@ -1422,7 +1268,7 @@ mod tests {
            expected_predicate
        );

        grpc_request_metric_has_count(&fixture, "tag_keys", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "TagKeys", "ok", 1);
    }

    #[tokio::test]

@@ -1456,7 +1302,7 @@ mod tests {
        let response = fixture.storage_client.tag_keys(request).await;
        assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down");

        grpc_request_metric_has_count(&fixture, "tag_keys", "client_error", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "TagKeys", "client_error", 1);
    }

    /// test the plumbing of the RPC layer for measurement_tag_keys--

@@ -1536,7 +1382,7 @@ mod tests {
            expected_predicate
        );

        grpc_request_metric_has_count(&fixture, "measurement_tag_keys", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "MeasurementTagKeys", "ok", 1);
    }

    #[tokio::test]

@@ -1572,7 +1418,7 @@ mod tests {
        let response = fixture.storage_client.measurement_tag_keys(request).await;
        assert_contains!(response.unwrap_err().to_string(), "This is an error");

        grpc_request_metric_has_count(&fixture, "measurement_tag_keys", "client_error", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "MeasurementTagKeys", "client_error", 1);
    }

    /// test the plumbing of the RPC layer for tag_keys -- specifically that

@@ -1611,7 +1457,7 @@ mod tests {
        let actual_tag_values = fixture.storage_client.tag_values(request).await.unwrap();
        assert_eq!(actual_tag_values, vec!["MA"]);

        grpc_request_metric_has_count(&fixture, "tag_values", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "TagValues", "ok", 1);
    }

    /// test the plumbing of the RPC layer for tag_values

@@ -1655,7 +1501,7 @@ mod tests {
            "unexpected tag values while getting tag values for measurement names"
        );

        grpc_request_metric_has_count(&fixture, "tag_values", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "TagValues", "ok", 1);
    }

    #[tokio::test]

@@ -1699,7 +1545,7 @@ mod tests {
            "unexpected tag values while getting tag values for field names"
        );

        grpc_request_metric_has_count(&fixture, "tag_values", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "TagValues", "ok", 1);
    }

    #[tokio::test]

@@ -1762,7 +1608,7 @@ mod tests {
            "Error converting tag_key to UTF-8 in tag_values request"
        );

        grpc_request_metric_has_count(&fixture, "tag_values", "client_error", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "TagValues", "client_error", 2);
    }

    /// test the plumbing of the RPC layer for measurement_tag_values

@@ -1808,7 +1654,7 @@ mod tests {
            "unexpected tag values while getting tag values",
        );

        grpc_request_metric_has_count(&fixture, "measurement_tag_values", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "MeasurementTagValues", "ok", 1);
    }

    #[tokio::test]

@@ -1851,7 +1697,7 @@ mod tests {

        assert_contains!(response_string, "Sugar we are going down");

        grpc_request_metric_has_count(&fixture, "measurement_tag_values", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "MeasurementTagValues", "client_error", 1);
    }

    #[tokio::test]

@@ -1946,7 +1792,7 @@ mod tests {
            "unexpected frames returned by query_series"
        );

        grpc_request_metric_has_count(&fixture, "read_filter", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "ReadFilter", "ok", 1);
    }

    #[tokio::test]

@@ -1981,7 +1827,7 @@ mod tests {
        let response = fixture.storage_client.read_filter(request).await;
        assert_contains!(response.unwrap_err().to_string(), "Sugar we are going down");

        grpc_request_metric_has_count(&fixture, "read_filter", "client_error", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "ReadFilter", "client_error", 1);
    }

    #[tokio::test]

@@ -2024,7 +1870,7 @@ mod tests {

        assert_eq!(frames.len(), 1);

        grpc_request_metric_has_count(&fixture, "read_group", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "ReadGroup", "ok", 1);
    }

    #[tokio::test]

@@ -2074,7 +1920,7 @@ mod tests {
            "Unexpected hint value on read_group request. Expected 0, got 42"
        );

        grpc_request_metric_has_count(&fixture, "read_group", "error", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "ReadGroup", "server_error", 1);

        // ---
        // test error returned in database processing

@@ -2100,7 +1946,7 @@ mod tests {
            .to_string();
        assert_contains!(response_string, "Sugar we are going down");

        grpc_request_metric_has_count(&fixture, "read_group", "client_error", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "ReadGroup", "client_error", 1);
    }

    #[tokio::test]

@@ -2154,7 +2000,7 @@ mod tests {
            "unexpected frames returned by query_groups"
        );

        grpc_request_metric_has_count(&fixture, "read_window_aggregate", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "ReadWindowAggregate", "ok", 1);
    }

    #[tokio::test]

@@ -2266,8 +2112,7 @@ mod tests {

        assert_contains!(response_string, "Sugar we are going down");

        grpc_request_metric_has_count(&fixture, "read_window_aggregate", "client_error", 1)
            .unwrap();
        grpc_request_metric_has_count(&fixture, "ReadWindowAggregate", "client_error", 1);
    }

    #[tokio::test]

@@ -2314,7 +2159,7 @@ mod tests {
            "unexpected frames returned by measurement_fields"
        );

        grpc_request_metric_has_count(&fixture, "measurement_fields", "ok", 1).unwrap();
        grpc_request_metric_has_count(&fixture, "MeasurementFields", "ok", 1);
    }

    #[tokio::test]
@@ -2432,11 +2277,15 @@ mod tests {
        );

        let router = tonic::transport::Server::builder()
            .layer(trace_http::tower::TraceLayer::new(
                Arc::clone(&test_storage.metric_registry),
                None,
                true,
            ))
            .add_service(crate::influxdb_ioxd::rpc::testing::make_server())
            .add_service(crate::influxdb_ioxd::rpc::storage::make_server(
                Arc::clone(&test_storage),
                test_storage.metrics_registry.registry(),
            ));
            .add_service(crate::influxdb_ioxd::rpc::storage::make_server(Arc::clone(
                &test_storage,
            )));

        let server = async move {
            let stream = TcpListenerStream::new(socket);
@@ -2471,7 +2320,7 @@ mod tests {
    pub struct TestDatabaseStore {
        databases: Mutex<BTreeMap<String, Arc<TestDatabase>>>,
        executor: Arc<Executor>,
        pub metrics_registry: metrics::TestMetricRegistry,
        pub metric_registry: Arc<metric::Registry>,
    }

    impl TestDatabaseStore {

@@ -2485,7 +2334,7 @@ mod tests {
            Self {
                databases: Mutex::new(BTreeMap::new()),
                executor: Arc::new(Executor::new(1)),
                metrics_registry: metrics::TestMetricRegistry::default(),
                metric_registry: Default::default(),
            }
        }
    }
@@ -9,9 +9,11 @@ description = "Distributed tracing support for HTTP services"

trace = { path = "../trace" }
futures = "0.3"
hashbrown = "0.11"
http = "0.2"
http-body = "0.4"
itertools = "0.10"
metric = { path = "../metric" }
observability_deps = { path = "../observability_deps" }
parking_lot = "0.11"
pin-project = "1.0"
@@ -0,0 +1,90 @@
use std::borrow::Cow;

/// A classification of whether a given request was successful
///
/// Note: the variant order defines the override order for classification
/// e.g. a request that encounters both a ClientErr and a ServerErr will
/// be recorded as a ServerErr
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
pub enum Classification {
    /// Successful request
    Ok,
    /// The request was to an unrecognised path
    ///
    /// This is used by the metrics collection to avoid generating a new set of metrics
    /// for a request path that doesn't correspond to a valid route
    PathNotFound,
    /// The request was unsuccessful but it was not the fault of the service
    ClientErr,
    /// The request was unsuccessful and it was the fault of the service
    ServerErr,
}

pub fn classify_response<B>(response: &http::Response<B>) -> (Cow<'static, str>, Classification) {
    let status = response.status();
    match status {
        http::StatusCode::OK | http::StatusCode::CREATED | http::StatusCode::NO_CONTENT => {
            classify_headers(Some(response.headers()))
        }
        http::StatusCode::BAD_REQUEST => ("bad request".into(), Classification::ClientErr),
        // This is potentially over-zealous but errs on the side of caution
        http::StatusCode::NOT_FOUND => ("not found".into(), Classification::PathNotFound),
        http::StatusCode::TOO_MANY_REQUESTS => {
            ("too many requests".into(), Classification::ClientErr)
        }
        http::StatusCode::INTERNAL_SERVER_ERROR => {
            ("internal server error".into(), Classification::ServerErr)
        }
        _ => (
            format!("unexpected status code: {}", status).into(),
            Classification::ServerErr,
        ),
    }
}

/// gRPC indicates failure via a [special][1] header allowing it to signal an error
/// at the end of an HTTP chunked stream as part of the [response trailer][2]
///
/// [1]: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
/// [2]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
pub fn classify_headers(
    headers: Option<&http::header::HeaderMap>,
) -> (Cow<'static, str>, Classification) {
    match headers.and_then(|headers| headers.get("grpc-status")) {
        Some(header) => {
            let value = match header.to_str() {
                Ok(value) => value,
                Err(_) => return ("grpc status not string".into(), Classification::ServerErr),
            };
            let value: i32 = match value.parse() {
                Ok(value) => value,
                Err(_) => return ("grpc status not integer".into(), Classification::ServerErr),
            };

            match value {
                0 => ("ok".into(), Classification::Ok),
                1 => ("cancelled".into(), Classification::ClientErr),
                2 => ("unknown".into(), Classification::ServerErr),
                3 => ("invalid argument".into(), Classification::ClientErr),
                4 => ("deadline exceeded".into(), Classification::ServerErr),
                5 => ("not found".into(), Classification::ClientErr),
                6 => ("already exists".into(), Classification::ClientErr),
                7 => ("permission denied".into(), Classification::ClientErr),
                8 => ("resource exhausted".into(), Classification::ServerErr),
                9 => ("failed precondition".into(), Classification::ClientErr),
                10 => ("aborted".into(), Classification::ClientErr),
                11 => ("out of range".into(), Classification::ClientErr),
                12 => ("unimplemented".into(), Classification::ServerErr),
                13 => ("internal".into(), Classification::ServerErr),
                14 => ("unavailable".into(), Classification::ServerErr),
                15 => ("data loss".into(), Classification::ServerErr),
                16 => ("unauthenticated".into(), Classification::ClientErr),
                _ => (
                    format!("unrecognised status code: {}", value).into(),
                    Classification::ServerErr,
                ),
            }
        }
        None => ("ok".into(), Classification::Ok),
    }
}
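The derived `Ord` above is load-bearing: `MetricsRecorder::set_classification` (added in `trace_http/src/metrics.rs` below) keeps the `max` of every classification it has seen, so the variant order is the override order. A small illustration (the `classify` module is private, so this is a sketch rather than a public API):

```rust
fn main() {
    // Variant order encodes severity, so `max` picks the "worst" outcome seen.
    assert!(Classification::ServerErr > Classification::ClientErr);
    assert!(Classification::ClientErr > Classification::PathNotFound);
    assert!(Classification::PathNotFound > Classification::Ok);

    // A request that hits both a client error and a server error is recorded
    // as a server error, matching the doc comment on the enum.
    let worst = Classification::ClientErr.max(Classification::ServerErr);
    assert_eq!(worst, Classification::ServerErr);
}
```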
@@ -7,5 +7,7 @@
    clippy::future_not_send
)]

mod classify;
pub mod ctx;
mod metrics;
pub mod tower;
@@ -0,0 +1,158 @@
use crate::classify::Classification;
use hashbrown::HashMap;
use metric::{Attributes, DurationHistogram, Metric, U64Counter};
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
use std::sync::Arc;
use std::time::Instant;

/// `MetricsCollection` is used to retrieve `MetricsRecorder` for instrumenting http requests
#[derive(Debug)]
pub struct MetricsCollection {
    /// Whether this `MetricsCollection` should publish to grpc_request* or http_request*
    is_grpc: bool,

    /// Metric registry for registering new metrics
    metric_registry: Arc<metric::Registry>,

    /// Metrics keyed by request path or None for 404 responses
    metrics: Mutex<HashMap<Option<String>, Metrics>>,
}

impl MetricsCollection {
    pub fn new(metric_registry: Arc<metric::Registry>, is_grpc: bool) -> Self {
        Self {
            is_grpc,
            metric_registry,
            metrics: Default::default(),
        }
    }

    /// Gets the `MetricsRecorder` for a given http request
    pub fn recorder<B>(self: &Arc<Self>, request: &http::Request<B>) -> MetricsRecorder {
        MetricsRecorder {
            metrics: Arc::clone(self),
            start_instant: Instant::now(),
            path: Some(request.uri().path().to_string()),
            classification: None,
        }
    }

    fn request_metrics(&self, path: Option<String>) -> MappedMutexGuard<'_, Metrics> {
        MutexGuard::map(self.metrics.lock(), |metrics| {
            let (_, request_metrics) =
                metrics.raw_entry_mut().from_key(&path).or_insert_with(|| {
                    let attributes = match path.as_ref() {
                        Some(path) => Attributes::from([("path", path.clone().into())]),
                        None => Attributes::from([]),
                    };

                    let metrics =
                        Metrics::new(self.metric_registry.as_ref(), attributes, self.is_grpc);
                    (path, metrics)
                });
            request_metrics
        })
    }
}

/// The request metrics for a specific set of attributes (e.g. path)
#[derive(Debug)]
struct Metrics {
    ok: U64Counter,
    client_error: U64Counter,
    server_error: U64Counter,
    aborted: U64Counter,

    duration_ok: DurationHistogram,
    duration_client_error: DurationHistogram,
    duration_server_error: DurationHistogram,
}

impl Metrics {
    fn new(registry: &metric::Registry, attributes: impl Into<Attributes>, is_grpc: bool) -> Self {
        let (counter, duration) = match is_grpc {
            true => ("grpc_requests", "grpc_request_duration"),
            false => ("http_requests", "http_request_duration"),
        };

        let counter: Metric<U64Counter> =
            registry.register_metric(counter, "accumulated total requests");

        let duration: Metric<DurationHistogram> =
            registry.register_metric(duration, "distribution of request latencies");

        let mut attributes = attributes.into();

        attributes.insert("status", "ok");
        let ok = counter.recorder(attributes.clone());
        let duration_ok = duration.recorder(attributes.clone());

        attributes.insert("status", "client_error");
        let client_error = counter.recorder(attributes.clone());
        let duration_client_error = duration.recorder(attributes.clone());

        attributes.insert("status", "server_error");
        let server_error = counter.recorder(attributes.clone());
        let duration_server_error = duration.recorder(attributes.clone());

        attributes.insert("status", "aborted");
        let aborted = counter.recorder(attributes.clone());

        Self {
            ok,
            client_error,
            server_error,
            aborted,
            duration_ok,
            duration_client_error,
            duration_server_error,
        }
    }
}

/// A `MetricsRecorder` is used to record metrics for a given http request
#[derive(Debug)]
pub struct MetricsRecorder {
    metrics: Arc<MetricsCollection>,
    start_instant: Instant,
    path: Option<String>,
    classification: Option<Classification>,
}

impl MetricsRecorder {
    /// Sets the classification of this request, keeping the more severe
    /// classification if one has already been set
    pub fn set_classification(&mut self, classification: Classification) {
        if matches!(classification, Classification::PathNotFound) {
            // Don't want to pollute metrics with invalid paths
            self.path = None
        }

        self.classification = Some(match self.classification {
            Some(existing) => existing.max(classification),
            None => classification,
        });
    }
}

impl Drop for MetricsRecorder {
    fn drop(&mut self) {
        let metrics = self.metrics.request_metrics(self.path.take());

        let duration = self.start_instant.elapsed();
        match self.classification {
            Some(Classification::Ok) => {
                metrics.ok.inc(1);
                metrics.duration_ok.record(duration);
            }
            Some(Classification::ClientErr) | Some(Classification::PathNotFound) => {
                metrics.client_error.inc(1);
                metrics.duration_client_error.record(duration);
            }
            Some(Classification::ServerErr) => {
                metrics.server_error.inc(1);
                metrics.duration_server_error.record(duration);
            }
            None => metrics.aborted.inc(1),
        }
    }
}
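To summarise the lifecycle this file implements: `TraceLayer` creates one `MetricsRecorder` per request, the classification is updated as the response, body, and trailers are observed, and the metrics are emitted exactly once, when the recorder drops. A sketch of that flow (crate-private types, illustrative only):

```rust
use std::sync::Arc;

// Illustrative sketch only: these types are private to trace_http.
fn sketch() {
    let collection = Arc::new(MetricsCollection::new(
        Arc::new(metric::Registry::default()),
        false, // http_* rather than grpc_* metric names
    ));

    let request = http::Request::builder()
        .uri("/api/v2/write")
        .body(())
        .unwrap();

    // Captures the request path and the start Instant
    let mut recorder = collection.recorder(&request);
    recorder.set_classification(Classification::Ok);

    // Drop is the single emission point: it increments the `ok` counter and
    // records the elapsed time in `duration_ok`. A recorder dropped without
    // any classification (e.g. the client hung up mid-request) counts as `aborted`.
    drop(recorder);
}
```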
@@ -12,30 +12,49 @@
//! - This Body contains the data payload (potentially streamed)
//!

use crate::ctx::parse_span_ctx;
use futures::ready;
use http::{Request, Response};
use http_body::SizeHint;
use observability_deps::tracing::error;
use pin_project::pin_project;
use std::borrow::Cow;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures::ready;
use http::{Request, Response};
use http_body::SizeHint;
use pin_project::pin_project;
use tower::{Layer, Service};

use observability_deps::tracing::error;
use trace::{span::SpanRecorder, TraceCollector};

use crate::classify::{classify_headers, classify_response, Classification};
use crate::ctx::parse_span_ctx;
use crate::metrics::{MetricsCollection, MetricsRecorder};

/// `TraceLayer` implements `tower::Layer` and can be used to decorate a
/// `tower::Service` to collect information about requests flowing through it
///
/// Including:
///
/// - Extracting distributed trace context and attaching span context
/// - Collecting count and duration metrics - [RED metrics][1]
///
/// [1]: https://www.weave.works/blog/the-red-method-key-metrics-for-microservices-architecture/
#[derive(Debug, Clone)]
pub struct TraceLayer {
    metrics: Arc<MetricsCollection>,
    collector: Option<Arc<dyn TraceCollector>>,
}

impl TraceLayer {
    pub fn new(collector: Option<Arc<dyn TraceCollector>>) -> Self {
        Self { collector }
    pub fn new(
        metric_registry: Arc<metric::Registry>,
        collector: Option<Arc<dyn TraceCollector>>,
        is_grpc: bool,
    ) -> Self {
        Self {
            metrics: Arc::new(MetricsCollection::new(metric_registry, is_grpc)),
            collector,
        }
    }
}

@@ -46,6 +65,7 @@ impl<S> Layer<S> for TraceLayer {
        TraceService {
            service,
            collector: self.collector.clone(),
            metrics: Arc::clone(&self.metrics),
        }
    }
}

@@ -55,6 +75,7 @@ impl<S> Layer<S> for TraceLayer {
pub struct TraceService<S> {
    service: S,
    collector: Option<Arc<dyn TraceCollector>>,
    metrics: Arc<MetricsCollection>,
}

impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for TraceService<S>

@@ -71,11 +92,14 @@ where
    }

    fn call(&mut self, mut request: Request<ReqBody>) -> Self::Future {
        let metrics_recorder = Some(self.metrics.recorder(&request));

        let collector = match self.collector.as_ref() {
            Some(collector) => collector,
            None => {
                return TracedFuture {
                    recorder: SpanRecorder::new(None),
                    metrics_recorder,
                    span_recorder: SpanRecorder::new(None),
                    inner: self.service.call(request),
                }
            }

@@ -99,7 +123,8 @@ where
        };

        TracedFuture {
            recorder: SpanRecorder::new(span),
            metrics_recorder,
            span_recorder: SpanRecorder::new(span),
            inner: self.service.call(request),
        }
    }
@@ -110,7 +135,8 @@ where
#[pin_project]
#[derive(Debug)]
pub struct TracedFuture<F> {
    recorder: SpanRecorder,
    span_recorder: SpanRecorder,
    metrics_recorder: Option<MetricsRecorder>,
    #[pin]
    inner: F,
}

@@ -126,19 +152,34 @@ where
        let result: Result<Response<ResBody>, Error> =
            ready!(self.as_mut().project().inner.poll(cx));

        let recorder = self.as_mut().project().recorder;
        let projected = self.as_mut().project();
        let span_recorder = projected.span_recorder;
        let mut metrics_recorder = projected.metrics_recorder.take().unwrap();
        match &result {
            Ok(response) => match classify_response(response) {
                Ok(_) => recorder.event("request processed"),
                Err(e) => recorder.error(e),
                (_, Classification::Ok) => match response.body().is_end_stream() {
                    true => {
                        metrics_recorder.set_classification(Classification::Ok);
                        span_recorder.ok("request processed with empty response")
                    }
                    false => span_recorder.event("request processed"),
                },
                (error, c) => {
                    metrics_recorder.set_classification(c);
                    span_recorder.error(error);
                }
            },
            Err(_) => recorder.error("error processing request"),
            Err(_) => {
                metrics_recorder.set_classification(Classification::ServerErr);
                span_recorder.error("error processing request")
            }
        }

        match result {
            Ok(response) => Poll::Ready(Ok(response.map(|body| TracedBody {
                recorder: self.as_mut().project().recorder.take(),
                span_recorder: self.as_mut().project().span_recorder.take(),
                inner: body,
                metrics_recorder,
            }))),
            Err(e) => Poll::Ready(Err(e)),
        }
@@ -149,7 +190,8 @@ where
#[pin_project]
#[derive(Debug)]
pub struct TracedBody<B> {
    recorder: SpanRecorder,
    span_recorder: SpanRecorder,
    metrics_recorder: MetricsRecorder,
    #[pin]
    inner: B,
}

@@ -168,10 +210,21 @@ impl<B: http_body::Body> http_body::Body for TracedBody<B> {
            None => return Poll::Ready(None),
        };

        let recorder = self.as_mut().project().recorder;
        let projected = self.as_mut().project();
        let span_recorder = projected.span_recorder;
        let metrics_recorder = projected.metrics_recorder;
        match &result {
            Ok(_) => recorder.event("returned body data"),
            Err(_) => recorder.error("eos getting body"),
            Ok(_) => match projected.inner.is_end_stream() {
                true => {
                    metrics_recorder.set_classification(Classification::Ok);
                    span_recorder.ok("returned body data and no trailers")
                }
                false => span_recorder.event("returned body data"),
            },
            Err(_) => {
                metrics_recorder.set_classification(Classification::ServerErr);
                span_recorder.error("error getting body");
            }
        }
        Poll::Ready(Some(result))
    }

@@ -183,13 +236,24 @@ impl<B: http_body::Body> http_body::Body for TracedBody<B> {
        let result: Result<Option<http::header::HeaderMap>, Self::Error> =
            ready!(self.as_mut().project().inner.poll_trailers(cx));

        let recorder = self.as_mut().project().recorder;
        let projected = self.as_mut().project();
        let span_recorder = projected.span_recorder;
        let metrics_recorder = projected.metrics_recorder;
        match &result {
            Ok(headers) => match classify_headers(headers.as_ref()) {
                Ok(_) => recorder.ok("returned trailers"),
                Err(error) => recorder.error(error),
                (_, Classification::Ok) => {
                    metrics_recorder.set_classification(Classification::Ok);
                    span_recorder.ok("returned trailers")
                }
                (error, c) => {
                    metrics_recorder.set_classification(c);
                    span_recorder.error(error)
                }
            },
            Err(_) => recorder.error("eos getting trailers"),
            Err(_) => {
                metrics_recorder.set_classification(Classification::ServerErr);
                span_recorder.error("error getting trailers")
            }
        }

        Poll::Ready(result)
@@ -203,51 +267,3 @@ impl<B: http_body::Body> http_body::Body for TracedBody<B> {
        self.inner.size_hint()
    }
}

fn classify_response<B>(response: &http::Response<B>) -> Result<(), Cow<'static, str>> {
    let status = response.status();
    match status {
        http::StatusCode::OK | http::StatusCode::CREATED | http::StatusCode::NO_CONTENT => {
            classify_headers(Some(response.headers()))
        }
        http::StatusCode::BAD_REQUEST => Err("bad request".into()),
        http::StatusCode::NOT_FOUND => Err("not found".into()),
        http::StatusCode::INTERNAL_SERVER_ERROR => Err("internal server error".into()),
        _ => Err(format!("unexpected status code: {}", status).into()),
    }
}

/// gRPC indicates failure via a [special][1] header allowing it to signal an error
/// at the end of an HTTP chunked stream as part of the [response trailer][2]
///
/// [1]: https://grpc.github.io/grpc/core/md_doc_statuscodes.html
/// [2]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
fn classify_headers(headers: Option<&http::header::HeaderMap>) -> Result<(), Cow<'static, str>> {
    match headers.and_then(|headers| headers.get("grpc-status")) {
        Some(header) => {
            let value = header.to_str().map_err(|_| "grpc status not string")?;
            let value: i32 = value.parse().map_err(|_| "grpc status not integer")?;
            match value {
                0 => Ok(()),
                1 => Err("cancelled".into()),
                2 => Err("unknown".into()),
                3 => Err("invalid argument".into()),
                4 => Err("deadline exceeded".into()),
                5 => Err("not found".into()),
                6 => Err("already exists".into()),
                7 => Err("permission denied".into()),
                8 => Err("resource exhausted".into()),
                9 => Err("failed precondition".into()),
                10 => Err("aborted".into()),
                11 => Err("out of range".into()),
                12 => Err("unimplemented".into()),
                13 => Err("internal".into()),
                14 => Err("unavailable".into()),
                15 => Err("data loss".into()),
                16 => Err("unauthenticated".into()),
                _ => Err(format!("unrecognised status code: {}", value).into()),
            }
        }
        None => Ok(()),
    }
}
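The old `Result`-returning classifiers above are what the new tuple-returning versions in `classify.rs` replace; the tuple form lets `TracedBody` feed both the span message and the metrics classification from one call. A usage sketch of the new `classify_headers` (it lives in the private `classify` module, so this is illustrative):

```rust
use http::header::{HeaderMap, HeaderValue};

fn main() {
    // gRPC can return HTTP 200 and still signal failure in the trailers,
    // which is why TracedBody re-classifies once the trailers arrive.
    let mut trailers = HeaderMap::new();
    trailers.insert("grpc-status", HeaderValue::from_static("3")); // INVALID_ARGUMENT

    let (msg, classification) = classify_headers(Some(&trailers));
    assert_eq!(msg, "invalid argument");
    assert_eq!(classification, Classification::ClientErr);

    // No grpc-status trailer means the stream completed successfully.
    let (_, classification) = classify_headers(None);
    assert_eq!(classification, Classification::Ok);
}
```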