From 7b1301851a002bc57d289c5e6a862fd7275b7797 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Mon, 26 Jul 2021 16:50:20 +0200 Subject: [PATCH] feat: metric for ingest wall-clock time --- server/src/db.rs | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/server/src/db.rs b/server/src/db.rs index bccf5c45b7..1b0581a86f 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -205,6 +205,12 @@ impl WriteBufferIngestMetrics { "Maximum timestamp of last write as unix timestamp in nanoseconds", &labels, ); + let last_ingest_ts = self.domain.register_gauge_metric_with_labels( + "last_ingest_ts", + None, + "Last seen ingest timestamp as unix timestamp in nanoseconds", + &labels, + ); SequencerMetrics { red, @@ -213,6 +219,7 @@ impl WriteBufferIngestMetrics { sequence_number_lag, last_min_ts, last_max_ts, + last_ingest_ts, } } } @@ -240,6 +247,9 @@ struct SequencerMetrics { /// Maximum timestamp of last write as unix timestamp in nanoseconds. last_max_ts: metrics::Gauge, + + /// Last seen ingest timestamp as unix timestamp in nanoseconds. + last_ingest_ts: metrics::Gauge, } /// This is the main IOx Database object. It is the root object of any @@ -869,6 +879,7 @@ impl Db { // - lag // - min ts // - max ts + // - ingest ts let sequence = sequenced_entry .sequence() .expect("entry from write buffer must be sequenced"); @@ -923,6 +934,9 @@ impl Db { .last_max_ts .set(max_ts.timestamp_nanos() as usize, &[]); } + metrics + .last_ingest_ts + .set(sequence.ingest_timestamp.timestamp_nanos() as usize, &[]); } } @@ -1468,16 +1482,18 @@ mod tests { #[tokio::test] async fn read_from_write_buffer_write_to_mutable_buffer() { let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); + let ingest_ts1 = Utc.timestamp_millis(42); + let ingest_ts2 = Utc.timestamp_millis(1337); write_buffer_state.push_entry( SequencedEntry::new_from_sequence( - Sequence::new(0, 0, Utc::now()), + Sequence::new(0, 0, ingest_ts1), lp_to_entry("mem foo=1 10"), ) .unwrap(), ); write_buffer_state.push_entry( SequencedEntry::new_from_sequence( - Sequence::new(0, 7, Utc::now()), + Sequence::new(0, 7, ingest_ts2), lp_to_entry("cpu bar=2 20\ncpu bar=3 30"), ) .unwrap(), @@ -1583,6 +1599,16 @@ mod tests { .gauge() .eq(30.0) .unwrap(); + metrics + .has_metric_family("write_buffer_last_ingest_ts") + .with_labels(&[ + ("db_name", "placeholder"), + ("svr_id", "1"), + ("sequencer_id", "0"), + ]) + .gauge() + .eq(ingest_ts2.timestamp_nanos() as f64) + .unwrap(); // do: stop background task loop shutdown.cancel();