Merge pull request #6817 from influxdata/dom/wal-metrics
feat(metrics): ingester2 WAL replaypull/24376/head
commit
7055abcddf
|
@ -64,9 +64,14 @@ fn wal_replay_bench(c: &mut Criterion) {
|
|||
let persist = ingester2::persist::queue::benches::MockPersistQueue::default();
|
||||
|
||||
// Replay the wal into the NOP.
|
||||
ingester2::benches::replay(&wal, &sink, Arc::new(persist))
|
||||
.await
|
||||
.expect("WAL replay error");
|
||||
ingester2::benches::replay(
|
||||
&wal,
|
||||
&sink,
|
||||
Arc::new(persist),
|
||||
&metric::Registry::default(),
|
||||
)
|
||||
.await
|
||||
.expect("WAL replay error");
|
||||
},
|
||||
// Use the WAL for one test invocation only, and re-create a new one
|
||||
// for the next iteration.
|
||||
|
|
|
@ -320,9 +320,10 @@ where
|
|||
let wal = Wal::new(wal_directory).await.map_err(InitError::WalInit)?;
|
||||
|
||||
// Replay the WAL log files, if any.
|
||||
let max_sequence_number = wal_replay::replay(&wal, &buffer, Arc::clone(&persist_handle))
|
||||
.await
|
||||
.map_err(|e| InitError::WalReplay(e.into()))?;
|
||||
let max_sequence_number =
|
||||
wal_replay::replay(&wal, &buffer, Arc::clone(&persist_handle), &metrics)
|
||||
.await
|
||||
.map_err(|e| InitError::WalReplay(e.into()))?;
|
||||
|
||||
// Build the chain of DmlSink that forms the write path.
|
||||
let write_path = DmlSinkInstrumentation::new(
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use data_types::{NamespaceId, PartitionKey, Sequence, SequenceNumber, TableId};
|
||||
use dml::{DmlMeta, DmlOperation, DmlWrite};
|
||||
use generated_types::influxdata::iox::wal::v1::sequenced_wal_op::Op;
|
||||
use metric::U64Counter;
|
||||
use mutable_batch_pb::decode::decode_database_batch;
|
||||
use observability_deps::tracing::*;
|
||||
use std::time::Instant;
|
||||
|
@ -47,6 +48,7 @@ pub async fn replay<T, P>(
|
|||
wal: &Wal,
|
||||
sink: &T,
|
||||
persist: P,
|
||||
metrics: &metric::Registry,
|
||||
) -> Result<Option<SequenceNumber>, WalReplayError>
|
||||
where
|
||||
T: DmlSink + PartitionIter,
|
||||
|
@ -62,6 +64,24 @@ where
|
|||
return Ok(None);
|
||||
}
|
||||
|
||||
// Initialise metrics to track the progress of the WAL replay.
|
||||
//
|
||||
// The file count tracks the number of WAL files that have started
|
||||
// replaying, as opposed to finished replaying - this gives us the ability
|
||||
// to monitor WAL replays that hang or otherwise go wrong.
|
||||
let file_count_metric = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"ingester_wal_replay_files_started",
|
||||
"Number of WAL files that have started to be replayed",
|
||||
)
|
||||
.recorder(&[]);
|
||||
let op_count_metric = metrics
|
||||
.register_metric::<U64Counter>(
|
||||
"ingester_wal_replay_ops",
|
||||
"Number of operations successfully replayed from the WAL",
|
||||
)
|
||||
.recorder(&[]);
|
||||
|
||||
let n_files = files.len();
|
||||
info!(n_files, "found wal files for replay");
|
||||
|
||||
|
@ -74,6 +94,8 @@ where
|
|||
// Map 0-based iter index to 1 based file count
|
||||
let file_number = index + 1;
|
||||
|
||||
file_count_metric.inc(1);
|
||||
|
||||
// Read the segment
|
||||
let reader = wal
|
||||
.reader_for_segment(file.id())
|
||||
|
@ -90,7 +112,7 @@ where
|
|||
);
|
||||
|
||||
// Replay this segment file
|
||||
match replay_file(reader, sink).await? {
|
||||
match replay_file(reader, sink, &op_count_metric).await? {
|
||||
v @ Some(_) => max_sequence = max_sequence.max(v),
|
||||
None => {
|
||||
// This file was empty and should be deleted.
|
||||
|
@ -159,6 +181,7 @@ where
|
|||
async fn replay_file<T>(
|
||||
mut file: wal::ClosedSegmentFileReader,
|
||||
sink: &T,
|
||||
op_count_metric: &U64Counter,
|
||||
) -> Result<Option<SequenceNumber>, WalReplayError>
|
||||
where
|
||||
T: DmlSink,
|
||||
|
@ -227,6 +250,8 @@ where
|
|||
sink.apply(DmlOperation::Write(op))
|
||||
.await
|
||||
.map_err(Into::<DmlError>::into)?;
|
||||
|
||||
op_count_metric.inc(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -238,6 +263,7 @@ mod tests {
|
|||
use assert_matches::assert_matches;
|
||||
use async_trait::async_trait;
|
||||
use data_types::{NamespaceId, PartitionId, PartitionKey, ShardId, TableId};
|
||||
use metric::{Attributes, Metric};
|
||||
use parking_lot::Mutex;
|
||||
use wal::Wal;
|
||||
|
||||
|
@ -384,7 +410,8 @@ mod tests {
|
|||
partitions: vec![Arc::new(Mutex::new(partition))],
|
||||
};
|
||||
|
||||
let max_sequence_number = replay(&wal, &mock_iter, Arc::clone(&persist))
|
||||
let metrics = metric::Registry::default();
|
||||
let max_sequence_number = replay(&wal, &mock_iter, Arc::clone(&persist), &metrics)
|
||||
.await
|
||||
.expect("failed to replay WAL");
|
||||
|
||||
|
@ -416,5 +443,21 @@ mod tests {
|
|||
.expect("failed to initialise WAL");
|
||||
|
||||
assert_eq!(wal.closed_segments().len(), 1);
|
||||
|
||||
// Validate the expected metric values were populated.
|
||||
let files = metrics
|
||||
.get_instrument::<Metric<U64Counter>>("ingester_wal_replay_files_started")
|
||||
.expect("file counter not found")
|
||||
.get_observer(&Attributes::from([]))
|
||||
.expect("attributes not found")
|
||||
.fetch();
|
||||
assert_eq!(files, 2);
|
||||
let ops = metrics
|
||||
.get_instrument::<Metric<U64Counter>>("ingester_wal_replay_ops")
|
||||
.expect("file counter not found")
|
||||
.get_observer(&Attributes::from([]))
|
||||
.expect("attributes not found")
|
||||
.fetch();
|
||||
assert_eq!(ops, 3);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue