Merge branch 'main' into crepererum/fix_job_metrics
commit
a9e2ed4c14
|
@ -98,11 +98,16 @@ pub struct DurationHistogram {
|
|||
|
||||
impl DurationHistogram {
|
||||
pub fn record(&self, value: Duration) {
|
||||
self.inner.record(
|
||||
self.record_multiple(value, 1)
|
||||
}
|
||||
|
||||
pub fn record_multiple(&self, value: Duration, count: u64) {
|
||||
self.inner.record_multiple(
|
||||
value
|
||||
.as_nanos()
|
||||
.try_into()
|
||||
.expect("cannot fit duration into u64"),
|
||||
count,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -50,6 +50,10 @@ impl U64Histogram {
|
|||
}
|
||||
|
||||
pub fn record(&self, value: u64) {
|
||||
self.record_multiple(value, 1)
|
||||
}
|
||||
|
||||
pub fn record_multiple(&self, value: u64, count: u64) {
|
||||
let mut state = self.shared.lock();
|
||||
if let Some(bucket) = state
|
||||
.buckets
|
||||
|
@ -57,8 +61,8 @@ impl U64Histogram {
|
|||
.find(|bucket| value <= bucket.le)
|
||||
.as_mut()
|
||||
{
|
||||
bucket.count = bucket.count.wrapping_add(1);
|
||||
state.total = state.total.wrapping_add(value);
|
||||
bucket.count = bucket.count.wrapping_add(count);
|
||||
state.total = state.total.wrapping_add(value * count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2319,20 +2319,27 @@ mod tests {
|
|||
summary.record(Utc.timestamp_nanos(650000000000));
|
||||
summary.record(Utc.timestamp_nanos(650000000010));
|
||||
|
||||
for (minute, count) in summary.cumulative_counts() {
|
||||
let minute = (minute * 60).to_string();
|
||||
test_db
|
||||
.metric_registry
|
||||
.has_metric_family("catalog_row_time_seconds_bucket")
|
||||
.with_attributes(&[
|
||||
("svr_id", "1"),
|
||||
("db_name", "placeholder"),
|
||||
("table", "write_metrics_test"),
|
||||
("le", minute.as_str()),
|
||||
])
|
||||
.counter()
|
||||
.eq(count as _)
|
||||
.unwrap();
|
||||
let mut reporter = metric::RawReporter::default();
|
||||
test_db.metrics_registry_v2.report(&mut reporter);
|
||||
|
||||
let observation = reporter
|
||||
.metric("catalog_row_time")
|
||||
.unwrap()
|
||||
.observation(&[("db_name", "placeholder"), ("table", "write_metrics_test")])
|
||||
.unwrap();
|
||||
|
||||
let histogram = match observation {
|
||||
Observation::DurationHistogram(histogram) => histogram,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
assert_eq!(histogram.buckets.len(), 60);
|
||||
|
||||
for ((minute, count), observation) in
|
||||
summary.counts.iter().enumerate().zip(&histogram.buckets)
|
||||
{
|
||||
let minute = Duration::from_secs((minute * 60) as u64);
|
||||
assert_eq!(observation.le, minute);
|
||||
assert_eq!(*count as u64, observation.count)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -242,7 +242,7 @@ pub struct ChunkMetrics {
|
|||
pub(super) memory_metrics: StorageGauge,
|
||||
|
||||
/// Track ingested timestamps
|
||||
pub(super) timestamp_histogram: TimestampHistogram,
|
||||
pub(super) timestamp_histogram: Option<TimestampHistogram>,
|
||||
}
|
||||
|
||||
impl ChunkMetrics {
|
||||
|
@ -497,7 +497,9 @@ impl CatalogChunk {
|
|||
/// `time_of_write` is the wall clock time of the write
|
||||
/// `timestamps` is a summary of the row timestamps contained in the write
|
||||
pub fn record_write(&mut self, time_of_write: DateTime<Utc>, timestamps: &TimestampSummary) {
|
||||
self.metrics.timestamp_histogram.add(timestamps);
|
||||
if let Some(timestamp_histogram) = self.metrics.timestamp_histogram.as_ref() {
|
||||
timestamp_histogram.add(timestamps)
|
||||
}
|
||||
self.access_recorder.record_access_now();
|
||||
|
||||
self.time_of_first_write = self.time_of_first_write.min(time_of_write);
|
||||
|
|
|
@ -1,10 +1,15 @@
|
|||
use crate::db::catalog::chunk::ChunkMetrics;
|
||||
use data_types::write_summary::TimestampSummary;
|
||||
use metrics::{Counter, Gauge, GaugeValue, Histogram, KeyValue, MetricObserverBuilder};
|
||||
use parking_lot::Mutex;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use parking_lot::Mutex;
|
||||
|
||||
use data_types::write_summary::TimestampSummary;
|
||||
use metric::{Attributes, DurationHistogram, DurationHistogramOptions};
|
||||
use metrics::{Counter, Gauge, GaugeValue, Histogram, KeyValue};
|
||||
use tracker::{LockMetrics, RwLock};
|
||||
|
||||
use crate::db::catalog::chunk::ChunkMetrics;
|
||||
|
||||
const TIMESTAMP_METRICS_ENABLE_ENV: &str = "INFLUXDB_IOX_ROW_TIMESTAMP_METRICS";
|
||||
fn report_timestamp_metrics(table_name: &str) -> bool {
|
||||
std::env::var(TIMESTAMP_METRICS_ENABLE_ENV)
|
||||
|
@ -53,26 +58,29 @@ impl CatalogMetrics {
|
|||
}
|
||||
|
||||
pub(super) fn new_table_metrics(&self, table_name: &str) -> TableMetrics {
|
||||
let mut attributes = metric::Attributes::from([
|
||||
let base_attributes = metric::Attributes::from([
|
||||
("db_name", self.db_name.to_string().into()),
|
||||
("table", table_name.to_string().into()),
|
||||
]);
|
||||
|
||||
attributes.insert("lock", "table");
|
||||
let mut lock_attributes = base_attributes.clone();
|
||||
lock_attributes.insert("lock", "table");
|
||||
let table_lock_metrics = Arc::new(LockMetrics::new(
|
||||
self.metrics_registry.as_ref(),
|
||||
attributes.clone(),
|
||||
lock_attributes.clone(),
|
||||
));
|
||||
|
||||
attributes.insert("lock", "partition");
|
||||
lock_attributes.insert("lock", "partition");
|
||||
let partition_lock_metrics = Arc::new(LockMetrics::new(
|
||||
self.metrics_registry.as_ref(),
|
||||
attributes.clone(),
|
||||
lock_attributes.clone(),
|
||||
));
|
||||
|
||||
attributes.insert("lock", "chunk");
|
||||
let chunk_lock_metrics =
|
||||
Arc::new(LockMetrics::new(self.metrics_registry.as_ref(), attributes));
|
||||
lock_attributes.insert("lock", "chunk");
|
||||
let chunk_lock_metrics = Arc::new(LockMetrics::new(
|
||||
self.metrics_registry.as_ref(),
|
||||
lock_attributes,
|
||||
));
|
||||
|
||||
let storage_gauge = self.metrics_domain.register_gauge_metric_with_attributes(
|
||||
"loaded",
|
||||
|
@ -88,14 +96,8 @@ impl CatalogMetrics {
|
|||
&[KeyValue::new("table", table_name.to_string())],
|
||||
);
|
||||
|
||||
let timestamp_histogram = Default::default();
|
||||
if report_timestamp_metrics(table_name) {
|
||||
self.metrics_domain.register_observer(
|
||||
None,
|
||||
&[KeyValue::new("table", table_name.to_string())],
|
||||
&timestamp_histogram,
|
||||
);
|
||||
}
|
||||
let timestamp_histogram = report_timestamp_metrics(table_name)
|
||||
.then(|| TimestampHistogram::new(self.metrics_registry.as_ref(), base_attributes));
|
||||
|
||||
TableMetrics {
|
||||
metrics_domain: Arc::clone(&self.metrics_domain),
|
||||
|
@ -134,7 +136,7 @@ pub struct TableMetrics {
|
|||
chunk_lock_metrics: Arc<LockMetrics>,
|
||||
|
||||
/// Track ingested timestamps
|
||||
timestamp_histogram: TimestampHistogram,
|
||||
timestamp_histogram: Option<TimestampHistogram>,
|
||||
}
|
||||
|
||||
impl TableMetrics {
|
||||
|
@ -192,7 +194,7 @@ pub struct PartitionMetrics {
|
|||
chunk_lock_metrics: Arc<LockMetrics>,
|
||||
|
||||
/// Track ingested timestamps
|
||||
timestamp_histogram: TimestampHistogram,
|
||||
timestamp_histogram: Option<TimestampHistogram>,
|
||||
}
|
||||
|
||||
impl PartitionMetrics {
|
||||
|
@ -337,35 +339,34 @@ impl StorageGauge {
|
|||
|
||||
/// A Histogram-inspired metric for reporting `TimestampSummary`
|
||||
///
|
||||
/// This is partly to workaround limitations defining custom Histogram bucketing in OTEL
|
||||
/// and also because it can be implemented more efficiently as the set of values is fixed
|
||||
///
|
||||
/// Like `TimestampSummary`, this is bucketed based on minute within the hour
|
||||
/// It will therefore wrap around on the hour
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub(super) struct TimestampHistogram {
|
||||
inner: Arc<Mutex<TimestampSummary>>,
|
||||
inner: metric::DurationHistogram,
|
||||
}
|
||||
|
||||
impl TimestampHistogram {
|
||||
pub(super) fn add(&self, summary: &TimestampSummary) {
|
||||
self.inner.lock().merge(summary)
|
||||
}
|
||||
}
|
||||
|
||||
impl metrics::MetricObserver for &TimestampHistogram {
|
||||
fn register(self, builder: MetricObserverBuilder<'_>) {
|
||||
let inner = Arc::clone(&self.inner);
|
||||
builder.register_histogram_bucket(
|
||||
"row_time",
|
||||
Some("seconds"),
|
||||
fn new(registry: &metric::Registry, attributes: impl Into<Attributes>) -> Self {
|
||||
let histogram: metric::Metric<DurationHistogram> = registry.register_metric_with_options(
|
||||
"catalog_row_time",
|
||||
"The cumulative distribution of the ingested row timestamps",
|
||||
move |result| {
|
||||
let inner = inner.lock();
|
||||
for (min, total) in inner.cumulative_counts() {
|
||||
result.observe(total, &[KeyValue::new("le", (min * 60).to_string())])
|
||||
}
|
||||
|| {
|
||||
DurationHistogramOptions::new(
|
||||
(0..60).map(|minute| Duration::from_secs(minute * 60)),
|
||||
)
|
||||
},
|
||||
)
|
||||
);
|
||||
|
||||
Self {
|
||||
inner: histogram.recorder(attributes),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn add(&self, summary: &TimestampSummary) {
|
||||
for (minute, count) in summary.counts.iter().enumerate() {
|
||||
self.inner
|
||||
.record_multiple(Duration::from_secs(minute as u64 * 60), *count as u64)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ pub async fn test_row_timestamp() {
|
|||
let db_name_attribute = format!("db_name=\"{}\"", db_name);
|
||||
|
||||
// Should only be enabled for the system table
|
||||
assert_eq!(lines.len(), 60);
|
||||
assert_eq!(lines.len(), 61);
|
||||
assert!(lines
|
||||
.iter()
|
||||
.all(|x| x.contains("table=\"system\"") && x.contains(&db_name_attribute)));
|
||||
|
|
Loading…
Reference in New Issue