From d3218802ab122638cb6a0d31420f2bcd8494d2a5 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Thu, 22 Apr 2021 18:43:12 +0100 Subject: [PATCH] refactor: more API metric coverage --- metrics/src/tests.rs | 2 +- server/src/lib.rs | 51 ++++++++------- src/influxdb_ioxd/http.rs | 132 +++++++++++++++++++++++++------------- 3 files changed, 115 insertions(+), 70 deletions(-) diff --git a/metrics/src/tests.rs b/metrics/src/tests.rs index ed9820de7c..1d7f2d048d 100644 --- a/metrics/src/tests.rs +++ b/metrics/src/tests.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use snafu::{OptionExt, Snafu}; use observability_deps::prometheus::proto::{ - Counter as PromCounter, Histogram as PromHistogram, Metric, MetricFamily, + Counter as PromCounter, Histogram as PromHistogram, MetricFamily, }; use crate::MetricRegistry; diff --git a/server/src/lib.rs b/server/src/lib.rs index 8765eeee0e..e2003a3d10 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -244,43 +244,34 @@ impl ServerConfig { // A collection of metrics used to instrument the Server. #[derive(Debug)] pub struct ServerMetrics { - /// This metric tracks all requests associated with writes using the HTTP - /// API. - pub write_requests: metrics::RedMetric, + /// This metric tracks all requests to the Server + pub http_requests: metrics::RedMetric, - /// This metric tracks all requests associated with non-write API calls using - /// the HTTP API. - pub api_requests: metrics::RedMetric, + /// The number of LP points written + pub ingest_points_total: metrics::Counter, - /// The number of LP points written via the HTTP API - pub points_written: metrics::Counter, - - /// The number of bytes written via the HTTP API - pub bytes_written: metrics::Counter, - - /// The metrics registry associated with the server. Usually this doesn't - /// need to be held, but in this case the Server needs to expose it via - /// the metrics endpoint. 
- pub registry: Arc, + /// The number of bytes written + pub ingest_points_bytes_total: metrics::Counter, } impl ServerMetrics { pub fn new(registry: Arc) -> Self { - let domain = registry.register_domain("http"); + // Server manages multiple domains. + let http_domain = registry.register_domain("http"); + let ingest_domain = registry.register_domain("ingest"); + Self { - write_requests: domain.register_red_metric(Some("write")), - api_requests: domain.register_red_metric(Some("api")), - points_written: domain.register_counter_metric( + http_requests: http_domain.register_red_metric(None), + ingest_points_total: ingest_domain.register_counter_metric( "points", None, "total LP points written", ), - bytes_written: domain.register_counter_metric( + ingest_points_bytes_total: ingest_domain.register_counter_metric( "points", Some("bytes".to_owned()), - "total LP bytes written", + "total LP points bytes written", ), - registry, } } } @@ -297,6 +288,11 @@ pub struct Server { exec: Arc, jobs: Arc, pub metrics: Arc, + + /// The metrics registry associated with the server. This is needed not for + /// recording telemetry, but because the server hosts the /metrics endpoint + /// and populates the endpoint with this data. 
+ pub registry: Arc, } #[derive(Debug)] @@ -331,6 +327,7 @@ impl Server { exec: Arc::new(Executor::new(num_worker_threads)), jobs, metrics: Arc::new(ServerMetrics::new(Arc::clone(&metric_registry))), + registry: Arc::clone(&metric_registry), } } @@ -494,6 +491,14 @@ impl Server { ) .await?; + self.metrics.ingest_points_total.add_with_labels( + lines.len() as u64, + &[ + metrics::KeyValue::new("status", "ok"), + metrics::KeyValue::new("db_name", db_name.to_string()), + ], + ); + Ok(()) } diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index 1756ef5ef5..d8bc8efa51 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -315,7 +315,7 @@ where Ok(res) })) // this endpoint is for API backward compatibility with InfluxDB 2.x .post("/api/v2/write", write::) - .get("/health", health) + .get("/health", health::) .get("/metrics", handle_metrics::) .get("/iox/api/v1/databases/:name/query", query::) .get( @@ -420,8 +420,11 @@ async fn write(req: Request) -> Result, ApplicationError where M: ConnectionManager + Send + Sync + Debug + 'static, { + let path = req.uri().path().to_string(); let server = Arc::clone(&req.data::>>().expect("server state")); - let obs = server.metrics.write_requests.observation(); // instrument request + + // TODO(edd): figure out best way of catching all errors in this observation. + let obs = server.metrics.http_requests.observation(); // instrument request // TODO - metrics. Implement a macro/something that will catch all the // early returns. 
@@ -445,18 +448,20 @@ where debug!(num_lines=lines.len(), %db_name, org=%write_info.org, bucket=%write_info.bucket, "inserting lines into database"); - let mut metric_kv = vec![ - KeyValue::new("db_name", db_name.to_string()), + let metric_kv = vec![ KeyValue::new("org", write_info.org.to_string()), KeyValue::new("bucket", write_info.bucket.to_string()), + KeyValue::new("path", path), ]; server.write_lines(&db_name, &lines).await.map_err(|e| { - metric_kv.push(metrics::KeyValue::new("status", "error")); - server - .metrics - .points_written - .add_with_labels(lines.len() as u64, &metric_kv); + server.metrics.ingest_points_total.add_with_labels( + lines.len() as u64, + &[ + metrics::KeyValue::new("status", "error"), + metrics::KeyValue::new("db_name", db_name.to_string()), + ], + ); let num_lines = lines.len(); debug!(?e, ?db_name, ?num_lines, "error writing lines"); @@ -472,19 +477,13 @@ where }, } })?; + // line protocol bytes successfully written + server.metrics.ingest_points_bytes_total.add_with_labels( + body.len() as u64, + &[metrics::KeyValue::new("db_name", db_name.to_string())], + ); - server - .metrics - .bytes_written - .add_with_labels(body.len() as u64, &metric_kv); obs.ok_with_labels(&metric_kv); // request completed successfully - - metric_kv.push(metrics::KeyValue::new("status", "ok")); - server - .metrics - .points_written - .add_with_labels(lines.len() as u64, &metric_kv); - Ok(Response::builder() .status(StatusCode::NO_CONTENT) .body(Body::empty()) @@ -507,8 +506,12 @@ fn default_format() -> String { async fn query( req: Request, ) -> Result, ApplicationError> { + let path = req.uri().path().to_string(); let server = Arc::clone(&req.data::>>().expect("server state")); + // TODO(edd): figure out best way of catching all errors in this observation. 
+ let obs = server.metrics.http_requests.observation(); // instrument request + let uri_query = req.uri().query().context(ExpectedQueryString {})?; let QueryParams { q, format } = @@ -523,6 +526,11 @@ async fn query( .expect("db name must have been set by routerify") .clone(); + let metric_kv = vec![ + KeyValue::new("db_name", db_name_str.clone()), + KeyValue::new("path", path), + ]; + let db_name = DatabaseName::new(&db_name_str).context(DatabaseNameError)?; debug!(uri = ?req.uri(), %q, ?format, %db_name, "running SQL query"); @@ -555,6 +563,9 @@ async fn query( .body(body) .context(CreatingResponse)?; + // successful query + obs.ok_with_labels(&metric_kv); + Ok(response) } @@ -563,12 +574,20 @@ async fn get_write_buffer_meta, ) -> Result, ApplicationError> { let server = Arc::clone(&req.data::>>().expect("server state")); + let path = req.uri().path().to_string(); + // TODO - catch error conditions + let obs = server.metrics.http_requests.observation(); let db_name_str = req .param("name") .expect("db name must have been set") .clone(); + let metric_kv = vec![ + KeyValue::new("db_name", db_name_str.clone()), + KeyValue::new("path", path), + ]; + let query: WriteBufferMetadataQuery = req .uri() .query() @@ -611,11 +630,22 @@ async fn get_write_buffer_meta) -> Result, ApplicationError> { +async fn health( + req: Request, +) -> Result, ApplicationError> { + let server = Arc::clone(&req.data::>>().expect("server state")); + let path = req.uri().path().to_string(); + server + .metrics + .http_requests + .observation() + .ok_with_labels(&[metrics::KeyValue::new("path", path)]); + let response_body = "OK"; Ok(Response::new(Body::from(response_body.to_string()))) } @@ -625,9 +655,13 @@ async fn handle_metrics( req: Request, ) -> Result, ApplicationError> { let server = Arc::clone(&req.data::>>().expect("server state")); - Ok(Response::new(Body::from( - server.metrics.registry.metrics_as_text(), - ))) + let path = req.uri().path().to_string(); + server + .metrics + 
.http_requests + .observation() + .ok_with_labels(&[metrics::KeyValue::new("path", path)]); + Ok(Response::new(Body::from(server.registry.metrics_as_text()))) } #[derive(Deserialize, Debug)] @@ -641,7 +675,12 @@ struct DatabaseInfo { async fn list_partitions( req: Request, ) -> Result, ApplicationError> { + let path = req.uri().path().to_string(); + let server = Arc::clone(&req.data::>>().expect("server state")); + + // TODO - catch error conditions + let obs = server.metrics.http_requests.observation(); let query = req.uri().query().context(ExpectedQueryString {})?; let info: DatabaseInfo = serde_urlencoded::from_str(query).context(InvalidQueryString { @@ -651,6 +690,11 @@ async fn list_partitions( let db_name = org_and_bucket_to_database(&info.org, &info.bucket).context(BucketMappingError)?; + let metric_kv = vec![ + KeyValue::new("db_name", db_name.to_string()), + KeyValue::new("path", path), + ]; + let db = server.db(&db_name).context(BucketNotFound { org: &info.org, bucket: &info.bucket, @@ -666,6 +710,7 @@ async fn list_partitions( let result = serde_json::to_string(&partition_keys).context(JsonGenerationError)?; + obs.ok_with_labels(&metric_kv); Ok(Response::new(Body::from(result))) } @@ -684,7 +729,10 @@ async fn snapshot_partition Result, ApplicationError> { use object_store::path::ObjectStorePath; + let path = req.uri().path().to_string(); let server = Arc::clone(&req.data::>>().expect("server state")); + // TODO - catch error conditions + let obs = server.metrics.http_requests.observation(); let query = req.uri().query().context(ExpectedQueryString {})?; let snapshot: SnapshotInfo = serde_urlencoded::from_str(query).context(InvalidQueryString { @@ -694,6 +742,11 @@ async fn snapshot_partition