refactor: more API metric coverage

pull/24376/head
Edd Robinson 2021-04-22 18:43:12 +01:00 committed by kodiakhq[bot]
parent 87de656d23
commit d3218802ab
3 changed files with 115 additions and 70 deletions

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use snafu::{OptionExt, Snafu}; use snafu::{OptionExt, Snafu};
use observability_deps::prometheus::proto::{ use observability_deps::prometheus::proto::{
Counter as PromCounter, Histogram as PromHistogram, Metric, MetricFamily, Counter as PromCounter, Histogram as PromHistogram, MetricFamily,
}; };
use crate::MetricRegistry; use crate::MetricRegistry;

View File

@ -244,43 +244,34 @@ impl ServerConfig {
// A collection of metrics used to instrument the Server. // A collection of metrics used to instrument the Server.
#[derive(Debug)] #[derive(Debug)]
pub struct ServerMetrics { pub struct ServerMetrics {
/// This metric tracks all requests associated with writes using the HTTP /// This metric tracks all requests to the Server
/// API. pub http_requests: metrics::RedMetric,
pub write_requests: metrics::RedMetric,
/// This metric tracks all requests associated with non-write API calls using /// The number of LP points written
/// the HTTP API. pub ingest_points_total: metrics::Counter,
pub api_requests: metrics::RedMetric,
/// The number of LP points written via the HTTP API /// The number of bytes written
pub points_written: metrics::Counter, pub ingest_points_bytes_total: metrics::Counter,
/// The number of bytes written via the HTTP API
pub bytes_written: metrics::Counter,
/// The metrics registry associated with the server. Usually this doesn't
/// need to be held, but in this case the Server needs to expose it via
/// the metrics endpoint.
pub registry: Arc<metrics::MetricRegistry>,
} }
impl ServerMetrics { impl ServerMetrics {
pub fn new(registry: Arc<metrics::MetricRegistry>) -> Self { pub fn new(registry: Arc<metrics::MetricRegistry>) -> Self {
let domain = registry.register_domain("http"); // Server manages multiple domains.
let http_domain = registry.register_domain("http");
let ingest_domain = registry.register_domain("ingest");
Self { Self {
write_requests: domain.register_red_metric(Some("write")), http_requests: http_domain.register_red_metric(None),
api_requests: domain.register_red_metric(Some("api")), ingest_points_total: ingest_domain.register_counter_metric(
points_written: domain.register_counter_metric(
"points", "points",
None, None,
"total LP points written", "total LP points written",
), ),
bytes_written: domain.register_counter_metric( ingest_points_bytes_total: ingest_domain.register_counter_metric(
"points", "points",
Some("bytes".to_owned()), Some("bytes".to_owned()),
"total LP bytes written", "total LP points bytes written",
), ),
registry,
} }
} }
} }
@ -297,6 +288,11 @@ pub struct Server<M: ConnectionManager> {
exec: Arc<Executor>, exec: Arc<Executor>,
jobs: Arc<JobRegistry>, jobs: Arc<JobRegistry>,
pub metrics: Arc<ServerMetrics>, pub metrics: Arc<ServerMetrics>,
/// The metrics registry associated with the server. It is held here not to
/// record telemetry, but because the server hosts the /metrics endpoint
/// and populates that endpoint from this registry.
pub registry: Arc<metrics::MetricRegistry>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -331,6 +327,7 @@ impl<M: ConnectionManager> Server<M> {
exec: Arc::new(Executor::new(num_worker_threads)), exec: Arc::new(Executor::new(num_worker_threads)),
jobs, jobs,
metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))), metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))),
registry: Arc::clone(&metric_registry),
} }
} }
@ -494,6 +491,14 @@ impl<M: ConnectionManager> Server<M> {
) )
.await?; .await?;
self.metrics.ingest_points_total.add_with_labels(
lines.len() as u64,
&[
metrics::KeyValue::new("status", "ok"),
metrics::KeyValue::new("db_name", db_name.to_string()),
],
);
Ok(()) Ok(())
} }

View File

@ -315,7 +315,7 @@ where
Ok(res) Ok(res)
})) // this endpoint is for API backward compatibility with InfluxDB 2.x })) // this endpoint is for API backward compatibility with InfluxDB 2.x
.post("/api/v2/write", write::<M>) .post("/api/v2/write", write::<M>)
.get("/health", health) .get("/health", health::<M>)
.get("/metrics", handle_metrics::<M>) .get("/metrics", handle_metrics::<M>)
.get("/iox/api/v1/databases/:name/query", query::<M>) .get("/iox/api/v1/databases/:name/query", query::<M>)
.get( .get(
@ -420,8 +420,11 @@ async fn write<M>(req: Request<Body>) -> Result<Response<Body>, ApplicationError
where where
M: ConnectionManager + Send + Sync + Debug + 'static, M: ConnectionManager + Send + Sync + Debug + 'static,
{ {
let path = req.uri().path().to_string();
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state")); let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
let obs = server.metrics.write_requests.observation(); // instrument request
// TODO(edd): figure out the best way of catching all errors in this observation.
let obs = server.metrics.http_requests.observation(); // instrument request
// TODO - metrics. Implement a macro/something that will catch all the // TODO - metrics. Implement a macro/something that will catch all the
// early returns. // early returns.
@ -445,18 +448,20 @@ where
debug!(num_lines=lines.len(), %db_name, org=%write_info.org, bucket=%write_info.bucket, "inserting lines into database"); debug!(num_lines=lines.len(), %db_name, org=%write_info.org, bucket=%write_info.bucket, "inserting lines into database");
let mut metric_kv = vec![ let metric_kv = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("org", write_info.org.to_string()), KeyValue::new("org", write_info.org.to_string()),
KeyValue::new("bucket", write_info.bucket.to_string()), KeyValue::new("bucket", write_info.bucket.to_string()),
KeyValue::new("path", path),
]; ];
server.write_lines(&db_name, &lines).await.map_err(|e| { server.write_lines(&db_name, &lines).await.map_err(|e| {
metric_kv.push(metrics::KeyValue::new("status", "error")); server.metrics.ingest_points_total.add_with_labels(
server lines.len() as u64,
.metrics &[
.points_written metrics::KeyValue::new("status", "error"),
.add_with_labels(lines.len() as u64, &metric_kv); metrics::KeyValue::new("db_name", db_name.to_string()),
],
);
let num_lines = lines.len(); let num_lines = lines.len();
debug!(?e, ?db_name, ?num_lines, "error writing lines"); debug!(?e, ?db_name, ?num_lines, "error writing lines");
@ -472,19 +477,13 @@ where
}, },
} }
})?; })?;
// line protocol bytes successfully written
server.metrics.ingest_points_bytes_total.add_with_labels(
body.len() as u64,
&[metrics::KeyValue::new("db_name", db_name.to_string())],
);
server
.metrics
.bytes_written
.add_with_labels(body.len() as u64, &metric_kv);
obs.ok_with_labels(&metric_kv); // request completed successfully obs.ok_with_labels(&metric_kv); // request completed successfully
metric_kv.push(metrics::KeyValue::new("status", "ok"));
server
.metrics
.points_written
.add_with_labels(lines.len() as u64, &metric_kv);
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::NO_CONTENT) .status(StatusCode::NO_CONTENT)
.body(Body::empty()) .body(Body::empty())
@ -507,8 +506,12 @@ fn default_format() -> String {
async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>( async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>, req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> { ) -> Result<Response<Body>, ApplicationError> {
let path = req.uri().path().to_string();
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state")); let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
// TODO(edd): figure out the best way of catching all errors in this observation.
let obs = server.metrics.http_requests.observation(); // instrument request
let uri_query = req.uri().query().context(ExpectedQueryString {})?; let uri_query = req.uri().query().context(ExpectedQueryString {})?;
let QueryParams { q, format } = let QueryParams { q, format } =
@ -523,6 +526,11 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
.expect("db name must have been set by routerify") .expect("db name must have been set by routerify")
.clone(); .clone();
let metric_kv = vec![
KeyValue::new("db_name", db_name_str.clone()),
KeyValue::new("path", path),
];
let db_name = DatabaseName::new(&db_name_str).context(DatabaseNameError)?; let db_name = DatabaseName::new(&db_name_str).context(DatabaseNameError)?;
debug!(uri = ?req.uri(), %q, ?format, %db_name, "running SQL query"); debug!(uri = ?req.uri(), %q, ?format, %db_name, "running SQL query");
@ -555,6 +563,9 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
.body(body) .body(body)
.context(CreatingResponse)?; .context(CreatingResponse)?;
// successful query
obs.ok_with_labels(&metric_kv);
Ok(response) Ok(response)
} }
@ -563,12 +574,20 @@ async fn get_write_buffer_meta<M: ConnectionManager + Send + Sync + Debug + 'sta
req: Request<Body>, req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> { ) -> Result<Response<Body>, ApplicationError> {
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state")); let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
let path = req.uri().path().to_string();
// TODO - catch error conditions
let obs = server.metrics.http_requests.observation();
let db_name_str = req let db_name_str = req
.param("name") .param("name")
.expect("db name must have been set") .expect("db name must have been set")
.clone(); .clone();
let metric_kv = vec![
KeyValue::new("db_name", db_name_str.clone()),
KeyValue::new("path", path),
];
let query: WriteBufferMetadataQuery = req let query: WriteBufferMetadataQuery = req
.uri() .uri()
.query() .query()
@ -611,11 +630,22 @@ async fn get_write_buffer_meta<M: ConnectionManager + Send + Sync + Debug + 'sta
)) ))
.expect("builder should be successful"); .expect("builder should be successful");
obs.ok_with_labels(&metric_kv);
Ok(response) Ok(response)
} }
#[tracing::instrument(level = "debug")] #[tracing::instrument(level = "debug")]
async fn health(_: Request<Body>) -> Result<Response<Body>, ApplicationError> { async fn health<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> {
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
let path = req.uri().path().to_string();
server
.metrics
.http_requests
.observation()
.ok_with_labels(&[metrics::KeyValue::new("path", path)]);
let response_body = "OK"; let response_body = "OK";
Ok(Response::new(Body::from(response_body.to_string()))) Ok(Response::new(Body::from(response_body.to_string())))
} }
@ -625,9 +655,13 @@ async fn handle_metrics<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>, req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> { ) -> Result<Response<Body>, ApplicationError> {
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state")); let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
Ok(Response::new(Body::from( let path = req.uri().path().to_string();
server.metrics.registry.metrics_as_text(), server
))) .metrics
.http_requests
.observation()
.ok_with_labels(&[metrics::KeyValue::new("path", path)]);
Ok(Response::new(Body::from(server.registry.metrics_as_text())))
} }
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
@ -641,7 +675,12 @@ struct DatabaseInfo {
async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>( async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(
req: Request<Body>, req: Request<Body>,
) -> Result<Response<Body>, ApplicationError> { ) -> Result<Response<Body>, ApplicationError> {
let path = req.uri().path().to_string();
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state")); let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
// TODO - catch error conditions
let obs = server.metrics.http_requests.observation();
let query = req.uri().query().context(ExpectedQueryString {})?; let query = req.uri().query().context(ExpectedQueryString {})?;
let info: DatabaseInfo = serde_urlencoded::from_str(query).context(InvalidQueryString { let info: DatabaseInfo = serde_urlencoded::from_str(query).context(InvalidQueryString {
@ -651,6 +690,11 @@ async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(
let db_name = let db_name =
org_and_bucket_to_database(&info.org, &info.bucket).context(BucketMappingError)?; org_and_bucket_to_database(&info.org, &info.bucket).context(BucketMappingError)?;
let metric_kv = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("path", path),
];
let db = server.db(&db_name).context(BucketNotFound { let db = server.db(&db_name).context(BucketNotFound {
org: &info.org, org: &info.org,
bucket: &info.bucket, bucket: &info.bucket,
@ -666,6 +710,7 @@ async fn list_partitions<M: ConnectionManager + Send + Sync + Debug + 'static>(
let result = serde_json::to_string(&partition_keys).context(JsonGenerationError)?; let result = serde_json::to_string(&partition_keys).context(JsonGenerationError)?;
obs.ok_with_labels(&metric_kv);
Ok(Response::new(Body::from(result))) Ok(Response::new(Body::from(result)))
} }
@ -684,7 +729,10 @@ async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static
) -> Result<Response<Body>, ApplicationError> { ) -> Result<Response<Body>, ApplicationError> {
use object_store::path::ObjectStorePath; use object_store::path::ObjectStorePath;
let path = req.uri().path().to_string();
let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state")); let server = Arc::clone(&req.data::<Arc<AppServer<M>>>().expect("server state"));
// TODO - catch error conditions
let obs = server.metrics.http_requests.observation();
let query = req.uri().query().context(ExpectedQueryString {})?; let query = req.uri().query().context(ExpectedQueryString {})?;
let snapshot: SnapshotInfo = serde_urlencoded::from_str(query).context(InvalidQueryString { let snapshot: SnapshotInfo = serde_urlencoded::from_str(query).context(InvalidQueryString {
@ -694,6 +742,11 @@ async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static
let db_name = let db_name =
org_and_bucket_to_database(&snapshot.org, &snapshot.bucket).context(BucketMappingError)?; org_and_bucket_to_database(&snapshot.org, &snapshot.bucket).context(BucketMappingError)?;
let metric_kv = vec![
KeyValue::new("db_name", db_name.to_string()),
KeyValue::new("path", path),
];
// TODO: refactor the rest of this out of the http route and into the server // TODO: refactor the rest of this out of the http route and into the server
// crate. // crate.
let db = server.db(&db_name).context(BucketNotFound { let db = server.db(&db_name).context(BucketNotFound {
@ -729,6 +782,7 @@ async fn snapshot_partition<M: ConnectionManager + Send + Sync + Debug + 'static
) )
.unwrap(); .unwrap();
obs.ok_with_labels(&metric_kv);
let ret = format!("{}", snapshot.id); let ret = format!("{}", snapshot.id);
Ok(Response::new(Body::from(ret))) Ok(Response::new(Body::from(ret)))
} }
@ -871,11 +925,11 @@ mod tests {
// The request completed successfully // The request completed successfully
metrics_registry metrics_registry
.has_metric_family("http_write_request_duration_seconds") .has_metric_family("http_request_duration_seconds")
.with_labels(&[ .with_labels(&[
("bucket", "MetricsBucket"), ("bucket", "MetricsBucket"),
("db_name", "MetricsOrg_MetricsBucket"),
("org", "MetricsOrg"), ("org", "MetricsOrg"),
("path", "/api/v2/write"),
("status", "ok"), ("status", "ok"),
]) ])
.histogram() .histogram()
@ -884,25 +938,16 @@ mod tests {
// A single successful point landed // A single successful point landed
metrics_registry metrics_registry
.has_metric_family("http_points_total") .has_metric_family("ingest_points_total")
.with_labels(&[ .with_labels(&[("db_name", "MetricsOrg_MetricsBucket"), ("status", "ok")])
("bucket", "MetricsBucket"),
("db_name", "MetricsOrg_MetricsBucket"),
("org", "MetricsOrg"),
("status", "ok"),
])
.counter() .counter()
.eq(1.0) .eq(1.0)
.unwrap(); .unwrap();
// Bytes of data were written // Bytes of data were written
metrics_registry metrics_registry
.has_metric_family("http_points_bytes_total") .has_metric_family("ingest_points_bytes_total")
.with_labels(&[ .with_labels(&[("db_name", "MetricsOrg_MetricsBucket")])
("bucket", "MetricsBucket"),
("db_name", "MetricsOrg_MetricsBucket"),
("org", "MetricsOrg"),
])
.counter() .counter()
.eq(98.0) .eq(98.0)
.unwrap(); .unwrap();
@ -920,13 +965,8 @@ mod tests {
// A single point was rejected // A single point was rejected
metrics_registry metrics_registry
.has_metric_family("http_points_total") .has_metric_family("ingest_points_total")
.with_labels(&[ .with_labels(&[("db_name", "NotMyOrg_NotMyBucket"), ("status", "error")])
("bucket", "NotMyBucket"),
("db_name", "NotMyOrg_NotMyBucket"),
("org", "NotMyOrg"),
("status", "error"),
])
.counter() .counter()
.eq(1.0) .eq(1.0)
.unwrap(); .unwrap();