Merge branch 'main' into dependabot/cargo/tokio-1.11.0
commit
f6e040df3d
|
@ -4107,7 +4107,6 @@ dependencies = [
|
||||||
"snafu",
|
"snafu",
|
||||||
"snap",
|
"snap",
|
||||||
"test_helpers",
|
"test_helpers",
|
||||||
"tikv-jemalloc-ctl",
|
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"trace",
|
"trace",
|
||||||
|
|
|
@ -43,7 +43,7 @@
|
||||||
//! associated with a single `Observation`.
|
//! associated with a single `Observation`.
|
||||||
//!
|
//!
|
||||||
//! It is common for each set of `Attributes` to be recorded independently despite sharing
|
//! It is common for each set of `Attributes` to be recorded independently despite sharing
|
||||||
//! the same `Instrument` name. To avoid code duplication `Metric` encodes this scenario.
|
//! the same metric name. `Metric` is an `Instrument` that encodes this scenario.
|
||||||
//!
|
//!
|
||||||
//! A `MetricObserver` is an object that reports a single `Observation`.
|
//! A `MetricObserver` is an object that reports a single `Observation`.
|
||||||
//!
|
//!
|
||||||
|
@ -197,12 +197,22 @@ impl Registry {
|
||||||
|
|
||||||
/// If an instrument already exists with the provided `name`, returns it
|
/// If an instrument already exists with the provided `name`, returns it
|
||||||
///
|
///
|
||||||
/// Otherwise, invokes `create` to create a new `Instrument`,
|
/// Otherwise, invokes `create` to create a new `Instrument`, stores it in this `Registry`,
|
||||||
/// stores it in this `Registry` and returns it
|
/// and returns it
|
||||||
|
///
|
||||||
|
/// An application might choose to register a custom Instrument, instead of using a Metric,
|
||||||
|
/// when it wishes to defer some computation to report time. For example, reporting
|
||||||
|
/// metrics from systems that cannot be instrumented with `Metric` directly (e.g. jemalloc)
|
||||||
|
///
|
||||||
|
/// Note: An instrument name is not required to match the metric name(s) reported by the
|
||||||
|
/// instrument, however:
|
||||||
|
///
|
||||||
|
/// - instruments will report in order of their instrument name
|
||||||
|
/// - not all reporters may handle the same metric name being reported multiple times
|
||||||
///
|
///
|
||||||
/// Panics if an `Instrument` has already been registered with this name but a different type
|
/// Panics if an `Instrument` has already been registered with this name but a different type
|
||||||
///
|
///
|
||||||
/// Panics if the metric name is illegal
|
/// Panics if the instrument name is illegal
|
||||||
pub fn register_instrument<F: FnOnce() -> I, I: Instrument + Clone + 'static>(
|
pub fn register_instrument<F: FnOnce() -> I, I: Instrument + Clone + 'static>(
|
||||||
&self,
|
&self,
|
||||||
name: &'static str,
|
name: &'static str,
|
||||||
|
|
|
@ -47,7 +47,6 @@ serde = "1.0"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
snafu = "0.6"
|
snafu = "0.6"
|
||||||
snap = "1.0.0"
|
snap = "1.0.0"
|
||||||
tikv-jemalloc-ctl = "0.4.0"
|
|
||||||
trace = { path = "../trace" }
|
trace = { path = "../trace" }
|
||||||
tokio = { version = "1.11", features = ["macros", "time"] }
|
tokio = { version = "1.11", features = ["macros", "time"] }
|
||||||
tokio-util = { version = "0.6.3" }
|
tokio-util = { version = "0.6.3" }
|
||||||
|
|
|
@ -86,7 +86,6 @@ use influxdb_line_protocol::ParsedLine;
|
||||||
use internal_types::freezable::Freezable;
|
use internal_types::freezable::Freezable;
|
||||||
use iox_object_store::IoxObjectStore;
|
use iox_object_store::IoxObjectStore;
|
||||||
use lifecycle::LockableChunk;
|
use lifecycle::LockableChunk;
|
||||||
use metrics::{KeyValue, MetricObserverBuilder};
|
|
||||||
use observability_deps::tracing::{error, info, warn};
|
use observability_deps::tracing::{error, info, warn};
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use rand::seq::SliceRandom;
|
use rand::seq::SliceRandom;
|
||||||
|
@ -268,38 +267,6 @@ impl ServerMetrics {
|
||||||
// Server manages multiple domains.
|
// Server manages multiple domains.
|
||||||
let http_domain = registry.register_domain("http");
|
let http_domain = registry.register_domain("http");
|
||||||
let ingest_domain = registry.register_domain("ingest");
|
let ingest_domain = registry.register_domain("ingest");
|
||||||
let jemalloc_domain = registry.register_domain("jemalloc");
|
|
||||||
|
|
||||||
// This isn't really a property of the server, perhaps it should be somewhere else?
|
|
||||||
jemalloc_domain.register_observer(None, &[], |observer: MetricObserverBuilder<'_>| {
|
|
||||||
observer.register_gauge_u64(
|
|
||||||
"memstats",
|
|
||||||
Some("bytes"),
|
|
||||||
"jemalloc memstats",
|
|
||||||
|observer| {
|
|
||||||
use tikv_jemalloc_ctl::{epoch, stats};
|
|
||||||
epoch::advance().unwrap();
|
|
||||||
|
|
||||||
let active = stats::active::read().unwrap();
|
|
||||||
observer.observe(active as u64, &[KeyValue::new("stat", "active")]);
|
|
||||||
|
|
||||||
let allocated = stats::allocated::read().unwrap();
|
|
||||||
observer.observe(allocated as u64, &[KeyValue::new("stat", "alloc")]);
|
|
||||||
|
|
||||||
let metadata = stats::metadata::read().unwrap();
|
|
||||||
observer.observe(metadata as u64, &[KeyValue::new("stat", "metadata")]);
|
|
||||||
|
|
||||||
let mapped = stats::mapped::read().unwrap();
|
|
||||||
observer.observe(mapped as u64, &[KeyValue::new("stat", "mapped")]);
|
|
||||||
|
|
||||||
let resident = stats::resident::read().unwrap();
|
|
||||||
observer.observe(resident as u64, &[KeyValue::new("stat", "resident")]);
|
|
||||||
|
|
||||||
let retained = stats::retained::read().unwrap();
|
|
||||||
observer.observe(retained as u64, &[KeyValue::new("stat", "retained")]);
|
|
||||||
},
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
http_requests: http_domain.register_red_metric(None),
|
http_requests: http_domain.register_red_metric(None),
|
||||||
|
|
|
@ -16,6 +16,7 @@ use std::{convert::TryFrom, net::SocketAddr, sync::Arc};
|
||||||
use trace::TraceCollector;
|
use trace::TraceCollector;
|
||||||
|
|
||||||
mod http;
|
mod http;
|
||||||
|
mod jemalloc;
|
||||||
mod planner;
|
mod planner;
|
||||||
mod rpc;
|
mod rpc;
|
||||||
pub(crate) mod serving_readiness;
|
pub(crate) mod serving_readiness;
|
||||||
|
@ -177,6 +178,12 @@ pub async fn main(config: Config) -> Result<()> {
|
||||||
std::mem::forget(f);
|
std::mem::forget(f);
|
||||||
|
|
||||||
let application = make_application(&config).await?;
|
let application = make_application(&config).await?;
|
||||||
|
|
||||||
|
// Register jemalloc metrics
|
||||||
|
application
|
||||||
|
.metric_registry_v2()
|
||||||
|
.register_instrument("jemalloc_metrics", jemalloc::JemallocMetrics::new);
|
||||||
|
|
||||||
let app_server = make_server(Arc::clone(&application), &config);
|
let app_server = make_server(Arc::clone(&application), &config);
|
||||||
|
|
||||||
let grpc_listener = grpc_listener(config.grpc_bind_address).await?;
|
let grpc_listener = grpc_listener(config.grpc_bind_address).await?;
|
||||||
|
|
|
@ -0,0 +1,79 @@
|
||||||
|
use std::any::Any;
|
||||||
|
|
||||||
|
use tikv_jemalloc_ctl::{epoch, stats};
|
||||||
|
|
||||||
|
use metric::{Attributes, MetricKind, Observation, Reporter};
|
||||||
|
|
||||||
|
/// A `metric::Instrument` that reports jemalloc memory statistics, specifically:
|
||||||
|
///
|
||||||
|
/// - a u64 gauge called "jemalloc_memstats_bytes"
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct JemallocMetrics {
|
||||||
|
active: Attributes,
|
||||||
|
alloc: Attributes,
|
||||||
|
metadata: Attributes,
|
||||||
|
mapped: Attributes,
|
||||||
|
resident: Attributes,
|
||||||
|
retained: Attributes,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JemallocMetrics {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
active: Attributes::from(&[("stat", "active")]),
|
||||||
|
alloc: Attributes::from(&[("stat", "alloc")]),
|
||||||
|
metadata: Attributes::from(&[("stat", "metadata")]),
|
||||||
|
mapped: Attributes::from(&[("stat", "mapped")]),
|
||||||
|
resident: Attributes::from(&[("stat", "resident")]),
|
||||||
|
retained: Attributes::from(&[("stat", "retained")]),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl metric::Instrument for JemallocMetrics {
|
||||||
|
fn report(&self, reporter: &mut dyn Reporter) {
|
||||||
|
reporter.start_metric(
|
||||||
|
"jemalloc_memstats_bytes",
|
||||||
|
"jemalloc metrics - http://jemalloc.net/jemalloc.3.html#stats.active",
|
||||||
|
MetricKind::U64Gauge,
|
||||||
|
);
|
||||||
|
|
||||||
|
epoch::advance().unwrap();
|
||||||
|
|
||||||
|
reporter.report_observation(
|
||||||
|
&self.active,
|
||||||
|
Observation::U64Gauge(stats::active::read().unwrap() as u64),
|
||||||
|
);
|
||||||
|
|
||||||
|
reporter.report_observation(
|
||||||
|
&self.alloc,
|
||||||
|
Observation::U64Gauge(stats::allocated::read().unwrap() as u64),
|
||||||
|
);
|
||||||
|
|
||||||
|
reporter.report_observation(
|
||||||
|
&self.metadata,
|
||||||
|
Observation::U64Gauge(stats::metadata::read().unwrap() as u64),
|
||||||
|
);
|
||||||
|
|
||||||
|
reporter.report_observation(
|
||||||
|
&self.mapped,
|
||||||
|
Observation::U64Gauge(stats::mapped::read().unwrap() as u64),
|
||||||
|
);
|
||||||
|
|
||||||
|
reporter.report_observation(
|
||||||
|
&self.resident,
|
||||||
|
Observation::U64Gauge(stats::resident::read().unwrap() as u64),
|
||||||
|
);
|
||||||
|
|
||||||
|
reporter.report_observation(
|
||||||
|
&self.retained,
|
||||||
|
Observation::U64Gauge(stats::retained::read().unwrap() as u64),
|
||||||
|
);
|
||||||
|
|
||||||
|
reporter.finish_metric();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_any(&self) -> &dyn Any {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,5 +1,6 @@
|
||||||
use crate::common::server_fixture::ServerFixture;
|
use crate::common::server_fixture::ServerFixture;
|
||||||
use crate::end_to_end_cases::scenario::Scenario;
|
use crate::end_to_end_cases::scenario::Scenario;
|
||||||
|
use test_helpers::assert_contains;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
pub async fn test_row_timestamp() {
|
pub async fn test_row_timestamp() {
|
||||||
|
@ -37,3 +38,27 @@ pub async fn test_row_timestamp() {
|
||||||
.iter()
|
.iter()
|
||||||
.all(|x| x.contains("table=\"system\"") && x.contains(&db_name_attribute)));
|
.all(|x| x.contains("table=\"system\"") && x.contains(&db_name_attribute)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn test_jemalloc_metrics() {
|
||||||
|
let server_fixture = ServerFixture::create_shared().await;
|
||||||
|
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
let url = format!("{}/metrics", server_fixture.http_base());
|
||||||
|
|
||||||
|
let payload = client.get(&url).send().await.unwrap().text().await.unwrap();
|
||||||
|
|
||||||
|
let lines: Vec<_> = payload
|
||||||
|
.trim()
|
||||||
|
.split('\n')
|
||||||
|
.filter(|x| x.starts_with("jemalloc_memstats_bytes"))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
assert_eq!(lines.len(), 6);
|
||||||
|
assert_contains!(lines[0], "jemalloc_memstats_bytes{stat=\"active\"}");
|
||||||
|
assert_contains!(lines[1], "jemalloc_memstats_bytes{stat=\"alloc\"}");
|
||||||
|
assert_contains!(lines[2], "jemalloc_memstats_bytes{stat=\"metadata\"}");
|
||||||
|
assert_contains!(lines[3], "jemalloc_memstats_bytes{stat=\"mapped\"}");
|
||||||
|
assert_contains!(lines[4], "jemalloc_memstats_bytes{stat=\"resident\"}");
|
||||||
|
assert_contains!(lines[5], "jemalloc_memstats_bytes{stat=\"retained\"}");
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue