Merge pull request #7962 from influxdata/savage/inspect-wal-contents

feat(cli): Add `influxdb_iox debug wal inspect` command
pull/24376/head
kodiakhq[bot] 2023-06-12 14:27:41 +00:00 committed by GitHub
commit f58e647d3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 276 additions and 49 deletions

1
Cargo.lock generated
View File

@ -2608,6 +2608,7 @@ dependencies = [
"predicate",
"predicates",
"pretty_assertions",
"proptest",
"prost",
"rustyline",
"schema",

View File

@ -91,6 +91,7 @@ mutable_batch = { path = "../mutable_batch" }
predicate = { path = "../predicate" }
predicates = "3.0.3"
pretty_assertions = "1.3.0"
proptest = "1.2.0"
serde = "1.0.164"
test_helpers = { path = "../test_helpers", features = ["future_timeout"] }
test_helpers_end_to_end = { path = "../test_helpers_end_to_end" }

View File

@ -0,0 +1,144 @@
//! A module providing a CLI command to inspect the contents of a WAL file.
use std::{io::Write, ops::RangeInclusive, path::PathBuf};
use itertools::Itertools;
use wal::SequencedWalOp;
use super::Error;
#[derive(Debug, clap::Parser)]
pub struct Config {
/// The path to the input WAL file
#[clap(value_parser)]
input: PathBuf,
/// An optional range of sequence numbers to restrict the inspection to, in
/// the format "%d-%d". Only entries that have a sequence number falling
/// within the range (inclusive) will be displayed
#[clap(long, short, value_parser = parse_sequence_number_range)]
sequence_number_range: Option<RangeInclusive<u64>>,
}
fn parse_sequence_number_range(s: &str) -> Result<RangeInclusive<u64>, String> {
let parts: Vec<&str> = s.split('-').collect();
if parts.len() != 2 {
return Err("sequence number range provided does not use format <START>-<END>".to_string());
}
let min = parts[0]
.parse()
.map_err(|_| format!("{} isn't a valid sequence number", parts[0]))?;
let max = parts[1]
.parse()
.map_err(|_| format!("{} isn't a valid sequence number", parts[1]))?;
if max < min {
Err(format!(
"invalid sequence number range provided, {max} is less than {min}"
))
} else {
Ok(RangeInclusive::new(min, max))
}
}
pub fn command(config: Config) -> Result<(), Error> {
let reader = wal::ClosedSegmentFileReader::from_path(&config.input)
.map_err(Error::UnableToReadWalFile)?;
inspect(config.sequence_number_range, &mut std::io::stdout(), reader)
}
fn inspect<W, R>(
sequence_number_range: Option<RangeInclusive<u64>>,
output: &mut W,
reader: R,
) -> Result<(), Error>
where
W: Write,
R: Iterator<Item = Result<Vec<SequencedWalOp>, wal::Error>>,
{
let mut inspect_errors = Vec::<wal::Error>::new();
let formatter = reader
.flatten_ok()
.filter_ok(|op| {
sequence_number_range
.as_ref()
.map_or(true, |range| range.contains(&op.sequence_number))
})
.format_with(",\n", |op, f| match op {
Ok(op) => f(&format_args!("{:#?}", op)),
Err(e) => {
let err_string = e.to_string();
inspect_errors.push(e);
f(&err_string)
}
});
let result = writeln!(output, "{}", formatter);
if inspect_errors.is_empty() {
result.map_err(Error::IoFailure)
} else {
Err(Error::IncompleteInspection {
sources: inspect_errors,
})
}
}
#[cfg(test)]
mod tests {
use generated_types::influxdata::iox::wal::v1::sequenced_wal_op::Op as WalOp;
use proptest::{prelude::*, prop_assert};
use super::*;
fn arbitrary_sequence_wal_op(seq_number: u64) -> SequencedWalOp {
SequencedWalOp {
sequence_number: seq_number,
table_write_sequence_numbers: Default::default(),
op: WalOp::Write(Default::default()),
}
}
#[test]
fn test_range_filters_operations() {
let mut sink = Vec::<u8>::new();
inspect(
Some(RangeInclusive::new(2, 3)),
&mut sink,
[
Ok(vec![
arbitrary_sequence_wal_op(1),
arbitrary_sequence_wal_op(2),
]),
Ok(vec![
arbitrary_sequence_wal_op(3),
arbitrary_sequence_wal_op(4),
arbitrary_sequence_wal_op(5),
]),
]
.into_iter(),
)
.expect("should inspect entries given without error");
let results = String::from_utf8(sink).expect("failed to recover string from write sink");
// Expect two operations inspected, with the appropriate sequence numbers
assert_eq!(results.matches("SequencedWalOp").count(), 2);
assert_eq!(results.matches("sequence_number: 2").count(), 1);
assert_eq!(results.matches("sequence_number: 3").count(), 1);
}
proptest! {
#[test]
fn test_sequence_number_range_parsing(a in any::<u64>(), b in any::<u64>()) {
let input = format!("{}-{}", a, b);
match parse_sequence_number_range(input.as_str()) {
Ok(_) => prop_assert!(a <= b),
Err(_) => prop_assert!(a > b),
}
}
}
}

View File

@ -5,14 +5,15 @@ use futures::Future;
use influxdb_iox_client::connection::Connection;
use thiserror::Error;
mod inspect;
mod regenerate_lp;
/// A command level error type to decorate WAL errors with some extra
/// "human" context for the user
#[derive(Debug, Error)]
pub enum Error {
#[error("could not open WAL file: {0}")]
UnableToOpenWalFile(#[from] wal::Error),
#[error("could not read WAL file: {0}")]
UnableToReadWalFile(#[from] wal::Error),
#[error("failed to decode write entries from the WAL file: {0}")]
FailedToDecodeWriteOpEntry(#[from] wal::DecodeError),
@ -27,6 +28,9 @@ pub enum Error {
#[error("i/o failure: {0}")]
IoFailure(#[from] std::io::Error),
#[error("errors occurred during inspection of the WAL file: {sources:?}")]
IncompleteInspection { sources: Vec<wal::Error> },
}
/// A set of non-fatal errors which can occur during the regeneration of write
@ -48,6 +52,8 @@ pub struct Config {
/// Subcommands for debugging the ingester WAL
#[derive(Debug, clap::Parser)]
enum Command {
/// Inspect the encoded contents of a WAL file in a human readable manner
Inspect(inspect::Config),
/// Regenerate line protocol writes from the contents of a WAL file. When
/// looking up measurement names from IOx, the target host must implement
/// the namespace and schema APIs
@ -61,6 +67,7 @@ where
CFut: Send + Future<Output = Connection>,
{
match config.command {
Command::Inspect(config) => inspect::command(config),
Command::RegenerateLp(config) => regenerate_lp::command(connection, config).await,
}
}

View File

@ -114,7 +114,7 @@ where
CFut: Send + Future<Output = Connection>,
{
let decoder = WriteOpEntryDecoder::from(
ClosedSegmentFileReader::from_path(&config.input).map_err(Error::UnableToOpenWalFile)?,
ClosedSegmentFileReader::from_path(&config.input).map_err(Error::UnableToReadWalFile)?,
);
let table_name_indexer = if config.skip_measurement_lookup {

View File

@ -1,4 +1,5 @@
//! Tests CLI commands
use std::fs::read_dir;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -1210,7 +1211,7 @@ async fn write_lp_from_wal() {
let wal_dir = Arc::clone(&wal_dir);
async move {
let mut reader =
fs::read_dir(wal_dir.as_path()).expect("failed to read WAL directory");
read_dir(wal_dir.as_path()).expect("failed to read WAL directory");
let segment_file_path = reader
.next()
.expect("no segment file found")
@ -1246,7 +1247,7 @@ async fn write_lp_from_wal() {
.success();
let mut reader =
fs::read_dir(out_dir.path()).expect("failed to read output directory");
read_dir(out_dir.path()).expect("failed to read output directory");
let regenerated_file_path = reader
.next()
@ -1327,3 +1328,84 @@ async fn assert_ingester_contains_results(
assert_batches_sorted_eq!(expected, &ingester_response.record_batches);
}
#[tokio::test]
async fn inspect_entries_from_wal() {
test_helpers::maybe_start_logging();
let database_url = maybe_skip_integration!();
let ingester_config = TestConfig::new_ingester_never_persist(&database_url);
let router_config = TestConfig::new_router(&ingester_config);
let wal_dir = Arc::new(std::path::PathBuf::from(
ingester_config
.wal_dir()
.as_ref()
.expect("missing WAL dir")
.path(),
));
let mut cluster = MiniCluster::new()
.with_ingester(ingester_config)
.await
.with_router(router_config)
.await;
StepTest::new(
&mut cluster,
vec![
// Perform 3 separate writes, then inspect the WAL and ensure that
// they can all be accounted for in the output by the sequencing.
Step::WriteLineProtocol("bananas,quality=fresh,taste=good val=42i 123456".to_string()),
Step::WriteLineProtocol("arán,quality=fresh,taste=best val=42i 654321".to_string()),
Step::WriteLineProtocol("arán,quality=stale,taste=crunchy val=42i 654456".to_string()),
Step::Custom(Box::new(move |_| {
let wal_dir = Arc::clone(&wal_dir);
async move {
let mut reader =
read_dir(wal_dir.as_path()).expect("failed to read WAL directory");
let segment_file_path = reader
.next()
.expect("no segment file found")
.unwrap()
.path();
assert_matches!(reader.next(), None);
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("debug")
.arg("wal")
.arg("inspect")
.arg(
segment_file_path
.to_str()
.expect("should be able to get segment file path as string"),
)
.assert()
.success()
.stdout(predicate::str::contains("SequencedWalOp").count(3));
// Re-inspect the log, but filter for WAL operations with
// sequence numbers in a range.
Command::cargo_bin("influxdb_iox")
.unwrap()
.arg("debug")
.arg("wal")
.arg("inspect")
.arg("--sequence-number-range")
.arg("1-2")
.arg(
segment_file_path
.to_str()
.expect("should be able to get segment file path as string"),
)
.assert()
.success()
.stdout(predicate::str::contains("SequencedWalOp").count(2));
}
.boxed()
})),
],
)
.run()
.await
}

View File

@ -178,7 +178,7 @@ where
/// Replay the entries in `file`, applying them to `buffer`. Returns the highest
/// sequence number observed in the file, or [`None`] if the file was empty.
async fn replay_file<T>(
mut file: wal::ClosedSegmentFileReader,
file: wal::ClosedSegmentFileReader,
sink: &T,
op_count_metric: &U64Counter,
) -> Result<Option<SequenceNumber>, WalReplayError>
@ -188,17 +188,8 @@ where
let mut max_sequence = None;
let start = Instant::now();
loop {
let ops = match file.next_batch() {
Ok(Some(v)) => v,
Ok(None) => {
// This file is complete, return the last observed sequence
// number.
debug!("wal file replayed in {:?}", start.elapsed());
return Ok(max_sequence);
}
Err(e) => return Err(WalReplayError::ReadEntry(e)),
};
for batch in file {
let ops = batch.map_err(WalReplayError::ReadEntry)?;
for op in ops {
let SequencedWalOp {
@ -251,6 +242,11 @@ where
op_count_metric.inc(1);
}
}
// This file is complete, return the last observed sequence
// number.
debug!("wal file replayed in {:?}", start.elapsed());
Ok(max_sequence)
}
#[cfg(test)]

View File

@ -213,15 +213,11 @@ mod tests {
let file = assert_matches!(&*files, [f] => f, "expected 1 file");
// Open a reader
let mut reader = wal
let ops: Vec<SequencedWalOp> = wal
.reader_for_segment(file.id())
.expect("failed to obtain reader");
// Obtain all the ops in the file
let mut ops = Vec::new();
while let Ok(Some(mut batch)) = reader.next_batch() {
ops.append(&mut batch);
}
.expect("failed to obtain reader for WAL segment")
.flat_map(|batch| batch.expect("failed to read WAL op batch"))
.collect();
// Extract the op payload read from the WAL
let read_op = assert_matches!(&*ops, [op] => op, "expected 1 DML operation");

View File

@ -428,12 +428,7 @@ async fn graceful_shutdown() {
.expect("failed to open wal segment");
// Assert the file contains no operations
assert_matches!(
reader
.next_batch()
.expect("failed to read wal segment contents"),
None
);
assert_matches!(reader.next(), None);
// Validate the parquet files were added to the catalog during shutdown.
let parquet_files = catalog

View File

@ -553,12 +553,19 @@ pub struct ClosedSegmentFileReader {
file: RawClosedSegmentFileReader<BufReader<File>>,
}
impl ClosedSegmentFileReader {
/// Get the next batch of sequenced wal ops from the file
pub fn next_batch(&mut self) -> Result<Option<Vec<SequencedWalOp>>> {
self.file.next_batch().context(UnableToReadNextOpsSnafu)
}
impl Iterator for ClosedSegmentFileReader {
type Item = Result<Vec<SequencedWalOp>>;
/// Read the next batch of sequenced WAL operations from the file
fn next(&mut self) -> Option<Self::Item> {
self.file
.next_batch()
.context(UnableToReadNextOpsSnafu)
.transpose()
}
}
impl ClosedSegmentFileReader {
/// Return the segment file id
pub fn id(&self) -> SegmentId {
self.id
@ -624,9 +631,8 @@ impl Iterator for WriteOpEntryDecoder {
fn next(&mut self) -> Option<Self::Item> {
Some(
self.reader
.next_batch()
.next()?
.context(FailedToReadWalSnafu)
.transpose()?
.map(|batch| {
batch
.into_iter()
@ -727,12 +733,11 @@ mod tests {
// TODO(savage): Returned SequenceNumberSet should reflect `partition_sequence_numbers` post-change.
let (closed, ids) = wal.rotate().unwrap();
let mut reader = wal.reader_for_segment(closed.id).unwrap();
let mut ops = vec![];
while let Ok(Some(mut batch)) = reader.next_batch() {
ops.append(&mut batch);
}
let ops: Vec<SequencedWalOp> = wal
.reader_for_segment(closed.id)
.expect("should be able to open reader for closed WAL segment")
.flat_map(|batch| batch.expect("failed to read WAL op batch"))
.collect();
assert_eq!(vec![op1, op2, op3, op4], ops);
// Assert the set has recorded the op IDs.
@ -796,7 +801,7 @@ mod tests {
// There aren't any entries in the closed segment because nothing was written
let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap();
assert!(reader.next_batch().unwrap().is_none());
assert!(reader.next().is_none());
// Can delete an empty segment, leaving no closed segments again
wal.delete(closed_segment_details.id()).await.unwrap();

View File

@ -57,13 +57,13 @@ async fn crud() {
// ensuring the per-partition sequence numbers match up to the current
// op-level sequence number while it is the source of truth.
let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap();
let op = reader.next_batch().unwrap().unwrap();
let op = reader.next().unwrap().unwrap();
assert_eq!(op[0].sequence_number, 42);
op[0]
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, op[0].sequence_number));
let op = reader.next_batch().unwrap().unwrap();
let op = reader.next().unwrap().unwrap();
assert_eq!(op[0].sequence_number, 43);
op[0]
.table_write_sequence_numbers
@ -107,7 +107,7 @@ async fn replay() {
// ensuring the per-partition sequence numbers match up to the current
// op-level sequence number while it is the source of truth.
let mut reader = wal.reader_for_segment(closed_segment_ids[0]).unwrap();
let op = reader.next_batch().unwrap().unwrap();
let op = reader.next().unwrap().unwrap();
assert_eq!(op[0].sequence_number, 42);
op[0]
.table_write_sequence_numbers
@ -116,7 +116,7 @@ async fn replay() {
// Can read the written entries from the previously open segment
let mut reader = wal.reader_for_segment(closed_segment_ids[1]).unwrap();
let op = reader.next_batch().unwrap().unwrap();
let op = reader.next().unwrap().unwrap();
assert_eq!(op[0].sequence_number, 43);
op[0]
.table_write_sequence_numbers