From 67903a4bf2d0bffd6c49e3ee933282a14081d19c Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Thu, 2 Feb 2023 14:52:09 +0100 Subject: [PATCH] feat(metrics): ingester2 WAL replay Adds two metrics: * Number of files replayed (counted when replay of a file starts, not when it completes) * Number of applied ops This will help identify when WAL replay is happening (an indication of an ungraceful shutdown & potential temporary read unavailability). --- ingester2/benches/wal.rs | 11 ++++++-- ingester2/src/init.rs | 7 +++-- ingester2/src/init/wal_replay.rs | 47 ++++++++++++++++++++++++++++++-- 3 files changed, 57 insertions(+), 8 deletions(-) diff --git a/ingester2/benches/wal.rs b/ingester2/benches/wal.rs index 5d197b78d9..cb81c310d3 100644 --- a/ingester2/benches/wal.rs +++ b/ingester2/benches/wal.rs @@ -64,9 +64,14 @@ fn wal_replay_bench(c: &mut Criterion) { let persist = ingester2::persist::queue::benches::MockPersistQueue::default(); // Replay the wal into the NOP. - ingester2::benches::replay(&wal, &sink, Arc::new(persist)) - .await - .expect("WAL replay error"); + ingester2::benches::replay( + &wal, + &sink, + Arc::new(persist), + &metric::Registry::default(), + ) + .await + .expect("WAL replay error"); }, // Use the WAL for one test invocation only, and re-create a new one // for the next iteration. diff --git a/ingester2/src/init.rs b/ingester2/src/init.rs index e179c22915..c2a329be42 100644 --- a/ingester2/src/init.rs +++ b/ingester2/src/init.rs @@ -320,9 +320,10 @@ where let wal = Wal::new(wal_directory).await.map_err(InitError::WalInit)?; // Replay the WAL log files, if any. - let max_sequence_number = wal_replay::replay(&wal, &buffer, Arc::clone(&persist_handle)) - .await - .map_err(|e| InitError::WalReplay(e.into()))?; + let max_sequence_number = + wal_replay::replay(&wal, &buffer, Arc::clone(&persist_handle), &metrics) + .await + .map_err(|e| InitError::WalReplay(e.into()))?; // Build the chain of DmlSink that forms the write path. 
let write_path = DmlSinkInstrumentation::new( diff --git a/ingester2/src/init/wal_replay.rs b/ingester2/src/init/wal_replay.rs index 2cba5dd743..a8bdad10ab 100644 --- a/ingester2/src/init/wal_replay.rs +++ b/ingester2/src/init/wal_replay.rs @@ -1,6 +1,7 @@ use data_types::{NamespaceId, PartitionKey, Sequence, SequenceNumber, TableId}; use dml::{DmlMeta, DmlOperation, DmlWrite}; use generated_types::influxdata::iox::wal::v1::sequenced_wal_op::Op; +use metric::U64Counter; use mutable_batch_pb::decode::decode_database_batch; use observability_deps::tracing::*; use std::time::Instant; @@ -47,6 +48,7 @@ pub async fn replay( wal: &Wal, sink: &T, persist: P, + metrics: &metric::Registry, ) -> Result, WalReplayError> where T: DmlSink + PartitionIter, @@ -62,6 +64,24 @@ where return Ok(None); } + // Initialise metrics to track the progress of the WAL replay. + // + // The file count tracks the number of WAL files that have started + // replaying, as opposed to finished replaying - this gives us the ability + // to monitor WAL replays that hang or otherwise go wrong. + let file_count_metric = metrics + .register_metric::( + "ingester_wal_replay_files_started", + "Number of WAL files that have started to be replayed", + ) + .recorder(&[]); + let op_count_metric = metrics + .register_metric::( + "ingester_wal_replay_ops", + "Number of operations successfully replayed from the WAL", + ) + .recorder(&[]); + let n_files = files.len(); info!(n_files, "found wal files for replay"); @@ -74,6 +94,8 @@ where // Map 0-based iter index to 1 based file count let file_number = index + 1; + file_count_metric.inc(1); + // Read the segment let reader = wal .reader_for_segment(file.id()) @@ -90,7 +112,7 @@ where ); // Replay this segment file - match replay_file(reader, sink).await? { + match replay_file(reader, sink, &op_count_metric).await? { v @ Some(_) => max_sequence = max_sequence.max(v), None => { // This file was empty and should be deleted. 
@@ -159,6 +181,7 @@ where async fn replay_file( mut file: wal::ClosedSegmentFileReader, sink: &T, + op_count_metric: &U64Counter, ) -> Result, WalReplayError> where T: DmlSink, @@ -227,6 +250,8 @@ where sink.apply(DmlOperation::Write(op)) .await .map_err(Into::::into)?; + + op_count_metric.inc(1); } } } @@ -238,6 +263,7 @@ mod tests { use assert_matches::assert_matches; use async_trait::async_trait; use data_types::{NamespaceId, PartitionId, PartitionKey, ShardId, TableId}; + use metric::{Attributes, Metric}; use parking_lot::Mutex; use wal::Wal; @@ -384,7 +410,8 @@ mod tests { partitions: vec![Arc::new(Mutex::new(partition))], }; - let max_sequence_number = replay(&wal, &mock_iter, Arc::clone(&persist)) + let metrics = metric::Registry::default(); + let max_sequence_number = replay(&wal, &mock_iter, Arc::clone(&persist), &metrics) .await .expect("failed to replay WAL"); @@ -416,5 +443,21 @@ mod tests { .expect("failed to initialise WAL"); assert_eq!(wal.closed_segments().len(), 1); + + // Validate the expected metric values were populated. + let files = metrics + .get_instrument::>("ingester_wal_replay_files_started") + .expect("file counter not found") + .get_observer(&Attributes::from([])) + .expect("attributes not found") + .fetch(); + assert_eq!(files, 2); + let ops = metrics + .get_instrument::>("ingester_wal_replay_ops") + .expect("file counter not found") + .get_observer(&Attributes::from([])) + .expect("attributes not found") + .fetch(); + assert_eq!(ops, 3); } }