refactor: track ingest metrics in one place (#1503)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>pull/24376/head
parent
91a45fd380
commit
4f0e46bcd5
|
@ -566,18 +566,6 @@ impl<M: ConnectionManager> Server<M> {
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let num_fields: usize = lines.iter().map(|line| line.field_set.len()).sum();
|
|
||||||
let labels = &[
|
|
||||||
metrics::KeyValue::new("status", "ok"),
|
|
||||||
metrics::KeyValue::new("db_name", db_name.to_string()),
|
|
||||||
];
|
|
||||||
self.metrics
|
|
||||||
.ingest_lines_total
|
|
||||||
.add_with_labels(lines.len() as u64, labels);
|
|
||||||
self.metrics
|
|
||||||
.ingest_fields_total
|
|
||||||
.add_with_labels(num_fields as u64, labels);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -458,7 +458,16 @@ where
|
||||||
|
|
||||||
let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?;
|
let body = str::from_utf8(&body).context(ReadingBodyAsUtf8)?;
|
||||||
|
|
||||||
|
let mut num_fields = 0;
|
||||||
|
let mut num_lines = 0;
|
||||||
|
|
||||||
let lines = parse_lines(body)
|
let lines = parse_lines(body)
|
||||||
|
.inspect(|line| {
|
||||||
|
if let Ok(line) = line {
|
||||||
|
num_fields += line.field_set.len();
|
||||||
|
num_lines += 1;
|
||||||
|
}
|
||||||
|
})
|
||||||
.collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
|
.collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
|
||||||
.context(ParsingLineProtocol)?;
|
.context(ParsingLineProtocol)?;
|
||||||
|
|
||||||
|
@ -471,19 +480,21 @@ where
|
||||||
];
|
];
|
||||||
|
|
||||||
server.write_lines(&db_name, &lines).await.map_err(|e| {
|
server.write_lines(&db_name, &lines).await.map_err(|e| {
|
||||||
let num_fields: usize = lines.iter().map(|line| line.field_set.len()).sum();
|
|
||||||
let labels = &[
|
let labels = &[
|
||||||
metrics::KeyValue::new("status", "error"),
|
metrics::KeyValue::new("status", "error"),
|
||||||
metrics::KeyValue::new("db_name", db_name.to_string()),
|
metrics::KeyValue::new("db_name", db_name.to_string()),
|
||||||
];
|
];
|
||||||
|
|
||||||
server
|
server
|
||||||
.metrics
|
.metrics
|
||||||
.ingest_lines_total
|
.ingest_lines_total
|
||||||
.add_with_labels(lines.len() as u64, labels);
|
.add_with_labels(num_lines as u64, labels);
|
||||||
|
|
||||||
server
|
server
|
||||||
.metrics
|
.metrics
|
||||||
.ingest_fields_total
|
.ingest_fields_total
|
||||||
.add_with_labels(num_fields as u64, labels);
|
.add_with_labels(num_fields as u64, labels);
|
||||||
|
|
||||||
server.metrics.ingest_points_bytes_total.add_with_labels(
|
server.metrics.ingest_points_bytes_total.add_with_labels(
|
||||||
body.len() as u64,
|
body.len() as u64,
|
||||||
&[
|
&[
|
||||||
|
@ -491,7 +502,6 @@ where
|
||||||
metrics::KeyValue::new("db_name", db_name.to_string()),
|
metrics::KeyValue::new("db_name", db_name.to_string()),
|
||||||
],
|
],
|
||||||
);
|
);
|
||||||
let num_lines = lines.len();
|
|
||||||
debug!(?e, ?db_name, ?num_lines, "error writing lines");
|
debug!(?e, ?db_name, ?num_lines, "error writing lines");
|
||||||
|
|
||||||
obs.client_error_with_labels(&metric_kv); // user error
|
obs.client_error_with_labels(&metric_kv); // user error
|
||||||
|
@ -506,14 +516,27 @@ where
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
let labels = &[
|
||||||
|
metrics::KeyValue::new("status", "ok"),
|
||||||
|
metrics::KeyValue::new("db_name", db_name.to_string()),
|
||||||
|
];
|
||||||
|
|
||||||
|
server
|
||||||
|
.metrics
|
||||||
|
.ingest_lines_total
|
||||||
|
.add_with_labels(num_lines as u64, labels);
|
||||||
|
|
||||||
|
server
|
||||||
|
.metrics
|
||||||
|
.ingest_fields_total
|
||||||
|
.add_with_labels(num_fields as u64, labels);
|
||||||
|
|
||||||
// line protocol bytes successfully written
|
// line protocol bytes successfully written
|
||||||
server.metrics.ingest_points_bytes_total.add_with_labels(
|
server
|
||||||
body.len() as u64,
|
.metrics
|
||||||
&[
|
.ingest_points_bytes_total
|
||||||
metrics::KeyValue::new("status", "ok"),
|
.add_with_labels(body.len() as u64, labels);
|
||||||
metrics::KeyValue::new("db_name", db_name.to_string()),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
|
|
||||||
obs.ok_with_labels(&metric_kv); // request completed successfully
|
obs.ok_with_labels(&metric_kv); // request completed successfully
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
|
|
Loading…
Reference in New Issue