refactor: make ingest metrics easier to understand

pull/24376/head
Marco Neumann 2021-07-21 13:57:53 +02:00
parent fb931bb1ca
commit 7d597d1d5c
1 changed file with 64 additions and 52 deletions

@@ -168,16 +168,16 @@ impl IngestMetrics {
             "Bytes read from sequencer",
             labels.clone(),
         );
-        let watermark_iox = self.domain.register_gauge_metric_with_labels(
-            "watermark_iox",
+        let last_sequence_number = self.domain.register_gauge_metric_with_labels(
+            "last_sequence_number",
             None,
-            "High watermark of IOx (aka next sequence number that will be ingested)",
+            "Last consumed sequence number (e.g. Kafka offset)",
             &labels,
         );
-        let watermark_sequencer = self.domain.register_gauge_metric_with_labels(
-            "watermark_sequencer",
+        let sequence_number_lag = self.domain.register_gauge_metric_with_labels(
+            "sequence_number_lag",
             None,
-            "High watermark of the sequencer (aka next sequence number that will be added)",
+            "The difference between the last consumed sequence number (e.g. Kafka offset) and the last sequence number available",
             &labels,
         );
         let last_min_ts = self.domain.register_gauge_metric_with_labels(
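
Note: the names registered here are relative to the metrics domain; judging by the test assertions further down, the domain contributes an ingest_ prefix, so these gauges surface as ingest_last_sequence_number and ingest_sequence_number_lag.
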
@@ -196,8 +196,8 @@ impl IngestMetrics {
         SequencerMetrics {
             red,
             bytes_read,
-            watermark_iox,
-            watermark_sequencer,
+            last_sequence_number,
+            sequence_number_lag,
             last_min_ts,
             last_max_ts,
         }
@@ -215,15 +215,12 @@ struct SequencerMetrics {
     /// This metric is independent of the success / error state of the entries.
     bytes_read: metrics::Counter,

-    /// Watermark of ingested data.
-    ///
-    /// This represents the next sequence number that will be ingested.
-    watermark_iox: metrics::Gauge,
+    /// Last consumed sequence number (e.g. Kafka offset).
+    last_sequence_number: metrics::Gauge,

-    /// Watermark of to-be-ingested data.
-    ///
-    /// This represents the next sequence number that will be added to the sequencer.
-    watermark_sequencer: metrics::Gauge,
+    /// The difference between the last consumed sequence number (e.g. Kafka offset) and the last sequence number
+    /// available.
+    sequence_number_lag: metrics::Gauge,

     /// Minimum unix timestamp of last write as unix timestamp in nanoseconds.
     last_min_ts: metrics::Gauge,
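
For intuition: the sequencer's high watermark is the next sequence number that will be assigned, so the lag between the last consumed and the last available sequence number is watermark - last_sequence_number - 1, clamped at zero. A minimal, self-contained sketch of that arithmetic (the free function sequence_number_lag is illustrative, not part of this codebase):

/// Illustrative helper (not in the codebase): lag derived from the
/// sequencer's high watermark (the next sequence number that will be
/// assigned) and the last consumed sequence number.
fn sequence_number_lag(watermark: u64, last_consumed: u64) -> u64 {
    // Saturating subtraction keeps the result at 0 instead of
    // underflowing when a stale cached watermark is not larger than
    // the sequence number just consumed.
    watermark.saturating_sub(last_consumed).saturating_sub(1)
}

fn main() {
    assert_eq!(sequence_number_lag(8, 7), 0); // fully caught up
    assert_eq!(sequence_number_lag(10, 7), 2); // entries 8 and 9 still pending
    assert_eq!(sequence_number_lag(0, 7), 0); // stale watermark saturates
}
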
@@ -803,7 +800,8 @@ impl Db {
         f_mark: FetchHighWatermark<'a>,
         mut metrics: SequencerMetrics,
     ) {
-        let mut last_watermark_update: Option<Instant> = None;
+        let mut watermark_last_updated: Option<Instant> = None;
+        let mut watermark = 0;

         while let Some(sequenced_entry_result) = stream.next().await {
             let red_observation = metrics.red.observation();
@@ -833,19 +831,42 @@
                 }
             }

+            // maybe update sequencer watermark
+            // We are not updating this watermark every round because asking the sequencer for that watermark can be
+            // quite expensive.
+            if watermark_last_updated
+                .map(|ts| ts.elapsed() > Duration::from_secs(60))
+                .unwrap_or(true)
+            {
+                match f_mark().await {
+                    Ok(w) => {
+                        watermark = w;
+                    }
+                    Err(e) => {
+                        debug!(%e, "Error while reading sequencer watermark")
+                    }
+                }
+                watermark_last_updated = Some(Instant::now());
+            }
+
             // update:
             // - bytes read
-            // - iox watermark
+            // - last sequence number
+            // - lag
             // - min ts
             // - max ts
             let sequence = sequenced_entry
                 .sequence()
                 .expect("entry from write buffer must be sequenced");
             let entry = sequenced_entry.entry();
-            metrics
-                .watermark_iox
-                .set((sequence.number + 1) as usize, &[]);
             metrics.bytes_read.add(entry.data().len() as u64);
+            metrics
+                .last_sequence_number
+                .set(sequence.number as usize, &[]);
+            metrics.sequence_number_lag.set(
+                watermark.saturating_sub(sequence.number).saturating_sub(1) as usize,
+                &[],
+            );
             if let Some(min_ts) = entry
                 .partition_writes()
                 .map(|partition_writes| {
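
The watermark_last_updated / Duration::from_secs(60) dance above is a plain time-based throttle: the None start value forces a fetch on the first entry, after which the watermark is re-fetched at most once a minute, since asking the sequencer for it is expensive. A standalone sketch of the same pattern (the Throttle type and its names are illustrative, not from this codebase):

use std::time::{Duration, Instant};

/// Illustrative throttle mirroring the pattern above: the first call
/// always fires (no timestamp recorded yet), later calls fire only
/// once the interval has elapsed.
struct Throttle {
    last_run: Option<Instant>,
    interval: Duration,
}

impl Throttle {
    fn should_run(&mut self) -> bool {
        let due = self
            .last_run
            .map(|ts| ts.elapsed() > self.interval)
            .unwrap_or(true);
        if due {
            self.last_run = Some(Instant::now());
        }
        due
    }
}

fn main() {
    let mut throttle = Throttle {
        last_run: None,
        interval: Duration::from_secs(60),
    };
    assert!(throttle.should_run()); // first call fires immediately
    assert!(!throttle.should_run()); // 60 s have not elapsed yet
}
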
@@ -888,24 +909,6 @@ impl Db {
                 metrics
                     .last_max_ts
                     .set(max_ts.timestamp_nanos() as usize, &[]);
             }
-
-            // maybe update sequencer watermark
-            // We are not updating this watermark every round because asking the sequencer for that watermark can be
-            // quite expensive.
-            if last_watermark_update
-                .map(|ts| ts.elapsed() > Duration::from_secs(60))
-                .unwrap_or(true)
-            {
-                match f_mark().await {
-                    Ok(watermark) => {
-                        metrics.watermark_sequencer.set(watermark as usize, &[]);
-                    }
-                    Err(e) => {
-                        debug!(%e, "Error while reading sequencer watermark")
-                    }
-                }
-                last_watermark_update = Some(Instant::now());
-            }
         }
     }
@@ -1414,10 +1417,18 @@ mod tests {

     #[tokio::test]
     async fn read_from_write_buffer_write_to_mutable_buffer() {
-        let entry = lp_to_entry("cpu bar=1 10");
         let write_buffer_state = MockBufferSharedState::empty_with_n_sequencers(1);
-        write_buffer_state
-            .push_entry(SequencedEntry::new_from_sequence(Sequence::new(0, 0), entry).unwrap());
+        write_buffer_state.push_entry(
+            SequencedEntry::new_from_sequence(Sequence::new(0, 0), lp_to_entry("mem foo=1 10"))
+                .unwrap(),
+        );
+        write_buffer_state.push_entry(
+            SequencedEntry::new_from_sequence(
+                Sequence::new(0, 7),
+                lp_to_entry("cpu bar=2 20\ncpu bar=3 30"),
+            )
+            .unwrap(),
+        );
         let write_buffer = MockBufferForReading::new(write_buffer_state);

         let test_db = TestDb::builder()
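
This setup drives the updated assertions below: two successfully read entries, 528 bytes in total, and a last consumed sequence number of 7. Assuming the mock reports a high watermark of 8 (one past the highest pushed sequence number), the lag is 8 - 7 - 1 = 0; the second entry's line protocol carries timestamps 20 and 30 nanoseconds, which become the new min/max values.
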
@@ -1467,7 +1478,7 @@
                 ("status", "ok"),
             ])
             .counter()
-            .eq(1.0)
+            .eq(2.0)
             .unwrap();
         metrics
             .has_metric_family("ingest_read_bytes_total")
@@ -1477,27 +1488,27 @@
                 ("sequencer_id", "0"),
             ])
             .counter()
-            .eq(256.0)
+            .eq(528.0)
             .unwrap();
         metrics
-            .has_metric_family("ingest_watermark_iox")
+            .has_metric_family("ingest_last_sequence_number")
             .with_labels(&[
                 ("db_name", "placeholder"),
                 ("svr_id", "1"),
                 ("sequencer_id", "0"),
             ])
             .gauge()
-            .eq(1.0)
+            .eq(7.0)
             .unwrap();
         metrics
-            .has_metric_family("ingest_watermark_sequencer")
+            .has_metric_family("ingest_sequence_number_lag")
             .with_labels(&[
                 ("db_name", "placeholder"),
                 ("svr_id", "1"),
                 ("sequencer_id", "0"),
             ])
             .gauge()
-            .eq(1.0)
+            .eq(0.0)
             .unwrap();
         metrics
             .has_metric_family("ingest_last_min_ts")
@@ -1507,7 +1518,7 @@
                 ("sequencer_id", "0"),
             ])
             .gauge()
-            .eq(10.0)
+            .eq(20.0)
             .unwrap();
         metrics
             .has_metric_family("ingest_last_max_ts")
@@ -1517,7 +1528,7 @@
                 ("sequencer_id", "0"),
             ])
             .gauge()
-            .eq(10.0)
+            .eq(30.0)
             .unwrap();

         // do: stop background task loop
@@ -1525,13 +1536,14 @@
         join_handle.await.unwrap();

         // check: the expected results should be there
-        let batches = run_query(db, "select * from cpu").await;
+        let batches = run_query(db, "select * from cpu order by time").await;
         let expected = vec![
             "+-----+-------------------------------+",
             "| bar | time                          |",
             "+-----+-------------------------------+",
-            "| 1   | 1970-01-01 00:00:00.000000010 |",
+            "| 2   | 1970-01-01 00:00:00.000000020 |",
+            "| 3   | 1970-01-01 00:00:00.000000030 |",
             "+-----+-------------------------------+",
         ];
         assert_batches_eq!(expected, &batches);