feat: metric for ingest wall-clock time

pull/24376/head
Marco Neumann 2021-07-26 16:50:20 +02:00
parent e736bc6953
commit 7b1301851a
1 changed files with 28 additions and 2 deletions

View File

@ -205,6 +205,12 @@ impl WriteBufferIngestMetrics {
"Maximum timestamp of last write as unix timestamp in nanoseconds", "Maximum timestamp of last write as unix timestamp in nanoseconds",
&labels, &labels,
); );
let last_ingest_ts = self.domain.register_gauge_metric_with_labels(
"last_ingest_ts",
None,
"Last seen ingest timestamp as unix timestamp in nanoseconds",
&labels,
);
SequencerMetrics { SequencerMetrics {
red, red,
@ -213,6 +219,7 @@ impl WriteBufferIngestMetrics {
sequence_number_lag, sequence_number_lag,
last_min_ts, last_min_ts,
last_max_ts, last_max_ts,
last_ingest_ts,
} }
} }
} }
@ -240,6 +247,9 @@ struct SequencerMetrics {
/// Maximum timestamp of last write as unix timestamp in nanoseconds. /// Maximum timestamp of last write as unix timestamp in nanoseconds.
last_max_ts: metrics::Gauge, last_max_ts: metrics::Gauge,
/// Last seen ingest timestamp as unix timestamp in nanoseconds.
last_ingest_ts: metrics::Gauge,
} }
/// This is the main IOx Database object. It is the root object of any /// This is the main IOx Database object. It is the root object of any
@ -869,6 +879,7 @@ impl Db {
// - lag // - lag
// - min ts // - min ts
// - max ts // - max ts
// - ingest ts
let sequence = sequenced_entry let sequence = sequenced_entry
.sequence() .sequence()
.expect("entry from write buffer must be sequenced"); .expect("entry from write buffer must be sequenced");
@ -923,6 +934,9 @@ impl Db {
.last_max_ts .last_max_ts
.set(max_ts.timestamp_nanos() as usize, &[]); .set(max_ts.timestamp_nanos() as usize, &[]);
} }
metrics
.last_ingest_ts
.set(sequence.ingest_timestamp.timestamp_nanos() as usize, &[]);
} }
} }
@ -1468,16 +1482,18 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn read_from_write_buffer_write_to_mutable_buffer() { async fn read_from_write_buffer_write_to_mutable_buffer() {
let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1); let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
let ingest_ts1 = Utc.timestamp_millis(42);
let ingest_ts2 = Utc.timestamp_millis(1337);
write_buffer_state.push_entry( write_buffer_state.push_entry(
SequencedEntry::new_from_sequence( SequencedEntry::new_from_sequence(
Sequence::new(0, 0, Utc::now()), Sequence::new(0, 0, ingest_ts1),
lp_to_entry("mem foo=1 10"), lp_to_entry("mem foo=1 10"),
) )
.unwrap(), .unwrap(),
); );
write_buffer_state.push_entry( write_buffer_state.push_entry(
SequencedEntry::new_from_sequence( SequencedEntry::new_from_sequence(
Sequence::new(0, 7, Utc::now()), Sequence::new(0, 7, ingest_ts2),
lp_to_entry("cpu bar=2 20\ncpu bar=3 30"), lp_to_entry("cpu bar=2 20\ncpu bar=3 30"),
) )
.unwrap(), .unwrap(),
@ -1583,6 +1599,16 @@ mod tests {
.gauge() .gauge()
.eq(30.0) .eq(30.0)
.unwrap(); .unwrap();
metrics
.has_metric_family("write_buffer_last_ingest_ts")
.with_labels(&[
("db_name", "placeholder"),
("svr_id", "1"),
("sequencer_id", "0"),
])
.gauge()
.eq(ingest_ts2.timestamp_nanos() as f64)
.unwrap();
// do: stop background task loop // do: stop background task loop
shutdown.cancel(); shutdown.cancel();