diff --git a/Cargo.lock b/Cargo.lock index ad8476e532..750b705120 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2608,6 +2608,7 @@ dependencies = [ "predicate", "predicates", "pretty_assertions", + "proptest", "prost", "rustyline", "schema", diff --git a/influxdb_iox/Cargo.toml b/influxdb_iox/Cargo.toml index 5b6ec55837..80a622ced9 100644 --- a/influxdb_iox/Cargo.toml +++ b/influxdb_iox/Cargo.toml @@ -91,6 +91,7 @@ mutable_batch = { path = "../mutable_batch" } predicate = { path = "../predicate" } predicates = "3.0.3" pretty_assertions = "1.3.0" +proptest = "1.2.0" serde = "1.0.164" test_helpers = { path = "../test_helpers", features = ["future_timeout"] } test_helpers_end_to_end = { path = "../test_helpers_end_to_end" } diff --git a/influxdb_iox/src/commands/debug/wal/inspect.rs b/influxdb_iox/src/commands/debug/wal/inspect.rs new file mode 100644 index 0000000000..e95dcfe7bc --- /dev/null +++ b/influxdb_iox/src/commands/debug/wal/inspect.rs @@ -0,0 +1,144 @@ +//! A module providing a CLI command to inspect the contents of a WAL file. +use std::{io::Write, ops::RangeInclusive, path::PathBuf}; + +use itertools::Itertools; +use wal::SequencedWalOp; + +use super::Error; + +#[derive(Debug, clap::Parser)] +pub struct Config { + /// The path to the input WAL file + #[clap(value_parser)] + input: PathBuf, + + /// An optional range of sequence numbers to restrict the inspection to, in + /// the format "%d-%d". Only entries that have a sequence number falling + /// within the range (inclusive) will be displayed + #[clap(long, short, value_parser = parse_sequence_number_range)] + sequence_number_range: Option>, +} + +fn parse_sequence_number_range(s: &str) -> Result, String> { + let parts: Vec<&str> = s.split('-').collect(); + if parts.len() != 2 { + return Err("sequence number range provided does not use format -".to_string()); + } + + let min = parts[0] + .parse() + .map_err(|_| format!("{} isn't a valid sequence number", parts[0]))?; + let max = parts[1] + .parse() + .map_err(|_| format!("{} isn't a valid sequence number", parts[1]))?; + if max < min { + Err(format!( + "invalid sequence number range provided, {max} is less than {min}" + )) + } else { + Ok(RangeInclusive::new(min, max)) + } +} + +pub fn command(config: Config) -> Result<(), Error> { + let reader = wal::ClosedSegmentFileReader::from_path(&config.input) + .map_err(Error::UnableToReadWalFile)?; + + inspect(config.sequence_number_range, &mut std::io::stdout(), reader) +} + +fn inspect( + sequence_number_range: Option>, + output: &mut W, + reader: R, +) -> Result<(), Error> +where + W: Write, + R: Iterator, wal::Error>>, +{ + let mut inspect_errors = Vec::::new(); + + let formatter = reader + .flatten_ok() + .filter_ok(|op| { + sequence_number_range + .as_ref() + .map_or(true, |range| range.contains(&op.sequence_number)) + }) + .format_with(",\n", |op, f| match op { + Ok(op) => f(&format_args!("{:#?}", op)), + Err(e) => { + let err_string = e.to_string(); + inspect_errors.push(e); + f(&err_string) + } + }); + + let result = writeln!(output, "{}", formatter); + + if inspect_errors.is_empty() { + result.map_err(Error::IoFailure) + } else { + Err(Error::IncompleteInspection { + sources: inspect_errors, + }) + } +} + +#[cfg(test)] +mod tests { + use generated_types::influxdata::iox::wal::v1::sequenced_wal_op::Op as WalOp; + use proptest::{prelude::*, prop_assert}; + + use super::*; + + fn arbitrary_sequence_wal_op(seq_number: u64) -> SequencedWalOp { + SequencedWalOp { + sequence_number: seq_number, + table_write_sequence_numbers: Default::default(), + op: WalOp::Write(Default::default()), + } + } + + #[test] + fn test_range_filters_operations() { + let mut sink = Vec::::new(); + + inspect( + Some(RangeInclusive::new(2, 3)), + &mut sink, + [ + Ok(vec![ + arbitrary_sequence_wal_op(1), + arbitrary_sequence_wal_op(2), + ]), + Ok(vec![ + arbitrary_sequence_wal_op(3), + arbitrary_sequence_wal_op(4), + arbitrary_sequence_wal_op(5), + ]), + ] + .into_iter(), + ) + .expect("should inspect entries given without error"); + + let results = String::from_utf8(sink).expect("failed to recover string from write sink"); + + // Expect two operations inspected, with the appropriate sequence numbers + assert_eq!(results.matches("SequencedWalOp").count(), 2); + assert_eq!(results.matches("sequence_number: 2").count(), 1); + assert_eq!(results.matches("sequence_number: 3").count(), 1); + } + + proptest! { + #[test] + fn test_sequence_number_range_parsing(a in any::(), b in any::()) { + let input = format!("{}-{}", a, b); + + match parse_sequence_number_range(input.as_str()) { + Ok(_) => prop_assert!(a <= b), + Err(_) => prop_assert!(a > b), + } + } + } +} diff --git a/influxdb_iox/src/commands/debug/wal/mod.rs b/influxdb_iox/src/commands/debug/wal/mod.rs index f0d1438e80..072779e87d 100644 --- a/influxdb_iox/src/commands/debug/wal/mod.rs +++ b/influxdb_iox/src/commands/debug/wal/mod.rs @@ -5,14 +5,15 @@ use futures::Future; use influxdb_iox_client::connection::Connection; use thiserror::Error; +mod inspect; mod regenerate_lp; /// A command level error type to decorate WAL errors with some extra /// "human" context for the user #[derive(Debug, Error)] pub enum Error { - #[error("could not open WAL file: {0}")] - UnableToOpenWalFile(#[from] wal::Error), + #[error("could not read WAL file: {0}")] + UnableToReadWalFile(#[from] wal::Error), #[error("failed to decode write entries from the WAL file: {0}")] FailedToDecodeWriteOpEntry(#[from] wal::DecodeError), @@ -27,6 +28,9 @@ pub enum Error { #[error("i/o failure: {0}")] IoFailure(#[from] std::io::Error), + + #[error("errors occurred during inspection of the WAL file: {sources:?}")] + IncompleteInspection { sources: Vec }, } /// A set of non-fatal errors which can occur during the regeneration of write @@ -48,6 +52,8 @@ pub struct Config { /// Subcommands for debugging the ingester WAL #[derive(Debug, clap::Parser)] enum Command { + /// Inspect the encoded contents of a WAL file in a human readable manner + Inspect(inspect::Config), /// Regenerate line protocol writes from the contents of a WAL file. When /// looking up measurement names from IOx, the target host must implement /// the namespace and schema APIs @@ -61,6 +67,7 @@ where CFut: Send + Future, { match config.command { + Command::Inspect(config) => inspect::command(config), Command::RegenerateLp(config) => regenerate_lp::command(connection, config).await, } } diff --git a/influxdb_iox/src/commands/debug/wal/regenerate_lp.rs b/influxdb_iox/src/commands/debug/wal/regenerate_lp.rs index 1c00ca4a4c..8f0f747264 100644 --- a/influxdb_iox/src/commands/debug/wal/regenerate_lp.rs +++ b/influxdb_iox/src/commands/debug/wal/regenerate_lp.rs @@ -114,7 +114,7 @@ where CFut: Send + Future, { let decoder = WriteOpEntryDecoder::from( - ClosedSegmentFileReader::from_path(&config.input).map_err(Error::UnableToOpenWalFile)?, + ClosedSegmentFileReader::from_path(&config.input).map_err(Error::UnableToReadWalFile)?, ); let table_name_indexer = if config.skip_measurement_lookup { diff --git a/influxdb_iox/tests/end_to_end_cases/cli.rs b/influxdb_iox/tests/end_to_end_cases/cli.rs index 8cb0d184cc..732e4d4ffa 100644 --- a/influxdb_iox/tests/end_to_end_cases/cli.rs +++ b/influxdb_iox/tests/end_to_end_cases/cli.rs @@ -1,4 +1,5 @@ //! Tests CLI commands +use std::fs::read_dir; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -1210,7 +1211,7 @@ async fn write_lp_from_wal() { let wal_dir = Arc::clone(&wal_dir); async move { let mut reader = - fs::read_dir(wal_dir.as_path()).expect("failed to read WAL directory"); + read_dir(wal_dir.as_path()).expect("failed to read WAL directory"); let segment_file_path = reader .next() .expect("no segment file found") @@ -1246,7 +1247,7 @@ async fn write_lp_from_wal() { .success(); let mut reader = - fs::read_dir(out_dir.path()).expect("failed to read output directory"); + read_dir(out_dir.path()).expect("failed to read output directory"); let regenerated_file_path = reader .next() @@ -1327,3 +1328,84 @@ async fn assert_ingester_contains_results( assert_batches_sorted_eq!(expected, &ingester_response.record_batches); } + +#[tokio::test] +async fn inspect_entries_from_wal() { + test_helpers::maybe_start_logging(); + let database_url = maybe_skip_integration!(); + + let ingester_config = TestConfig::new_ingester_never_persist(&database_url); + let router_config = TestConfig::new_router(&ingester_config); + let wal_dir = Arc::new(std::path::PathBuf::from( + ingester_config + .wal_dir() + .as_ref() + .expect("missing WAL dir") + .path(), + )); + + let mut cluster = MiniCluster::new() + .with_ingester(ingester_config) + .await + .with_router(router_config) + .await; + + StepTest::new( + &mut cluster, + vec![ + // Perform 3 separate writes, then inspect the WAL and ensure that + // they can all be accounted for in the output by the sequencing. + Step::WriteLineProtocol("bananas,quality=fresh,taste=good val=42i 123456".to_string()), + Step::WriteLineProtocol("arán,quality=fresh,taste=best val=42i 654321".to_string()), + Step::WriteLineProtocol("arán,quality=stale,taste=crunchy val=42i 654456".to_string()), + Step::Custom(Box::new(move |_| { + let wal_dir = Arc::clone(&wal_dir); + async move { + let mut reader = + read_dir(wal_dir.as_path()).expect("failed to read WAL directory"); + let segment_file_path = reader + .next() + .expect("no segment file found") + .unwrap() + .path(); + assert_matches!(reader.next(), None); + + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("debug") + .arg("wal") + .arg("inspect") + .arg( + segment_file_path + .to_str() + .expect("should be able to get segment file path as string"), + ) + .assert() + .success() + .stdout(predicate::str::contains("SequencedWalOp").count(3)); + + // Re-inspect the log, but filter for WAL operations with + // sequence numbers in a range. + Command::cargo_bin("influxdb_iox") + .unwrap() + .arg("debug") + .arg("wal") + .arg("inspect") + .arg("--sequence-number-range") + .arg("1-2") + .arg( + segment_file_path + .to_str() + .expect("should be able to get segment file path as string"), + ) + .assert() + .success() + .stdout(predicate::str::contains("SequencedWalOp").count(2)); + } + .boxed() + })), + ], + ) + .run() + .await +} diff --git a/ingester/src/init/wal_replay.rs b/ingester/src/init/wal_replay.rs index 3b2ddf5041..deb3388a29 100644 --- a/ingester/src/init/wal_replay.rs +++ b/ingester/src/init/wal_replay.rs @@ -178,7 +178,7 @@ where /// Replay the entries in `file`, applying them to `buffer`. Returns the highest /// sequence number observed in the file, or [`None`] if the file was empty. async fn replay_file( - mut file: wal::ClosedSegmentFileReader, + file: wal::ClosedSegmentFileReader, sink: &T, op_count_metric: &U64Counter, ) -> Result, WalReplayError> @@ -188,17 +188,8 @@ where let mut max_sequence = None; let start = Instant::now(); - loop { - let ops = match file.next_batch() { - Ok(Some(v)) => v, - Ok(None) => { - // This file is complete, return the last observed sequence - // number. - debug!("wal file replayed in {:?}", start.elapsed()); - return Ok(max_sequence); - } - Err(e) => return Err(WalReplayError::ReadEntry(e)), - }; + for batch in file { + let ops = batch.map_err(WalReplayError::ReadEntry)?; for op in ops { let SequencedWalOp { @@ -251,6 +242,11 @@ where op_count_metric.inc(1); } } + + // This file is complete, return the last observed sequence + // number. + debug!("wal file replayed in {:?}", start.elapsed()); + Ok(max_sequence) } #[cfg(test)] diff --git a/ingester/src/wal/wal_sink.rs b/ingester/src/wal/wal_sink.rs index 1609832579..b112b6e7b0 100644 --- a/ingester/src/wal/wal_sink.rs +++ b/ingester/src/wal/wal_sink.rs @@ -213,15 +213,11 @@ mod tests { let file = assert_matches!(&*files, [f] => f, "expected 1 file"); // Open a reader - let mut reader = wal + let ops: Vec = wal .reader_for_segment(file.id()) - .expect("failed to obtain reader"); - - // Obtain all the ops in the file - let mut ops = Vec::new(); - while let Ok(Some(mut batch)) = reader.next_batch() { - ops.append(&mut batch); - } + .expect("failed to obtain reader for WAL segment") + .flat_map(|batch| batch.expect("failed to read WAL op batch")) + .collect(); // Extract the op payload read from the WAL let read_op = assert_matches!(&*ops, [op] => op, "expected 1 DML operation"); diff --git a/ingester/tests/write.rs b/ingester/tests/write.rs index e7f9a20bdb..05de79f0e5 100644 --- a/ingester/tests/write.rs +++ b/ingester/tests/write.rs @@ -428,12 +428,7 @@ async fn graceful_shutdown() { .expect("failed to open wal segment"); // Assert the file contains no operations - assert_matches!( - reader - .next_batch() - .expect("failed to read wal segment contents"), - None - ); + assert_matches!(reader.next(), None); // Validate the parquet files were added to the catalog during shutdown. let parquet_files = catalog diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 98d7347a41..ed7b3028c0 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -553,12 +553,19 @@ pub struct ClosedSegmentFileReader { file: RawClosedSegmentFileReader>, } -impl ClosedSegmentFileReader { - /// Get the next batch of sequenced wal ops from the file - pub fn next_batch(&mut self) -> Result>> { - self.file.next_batch().context(UnableToReadNextOpsSnafu) - } +impl Iterator for ClosedSegmentFileReader { + type Item = Result>; + /// Read the next batch of sequenced WAL operations from the file + fn next(&mut self) -> Option { + self.file + .next_batch() + .context(UnableToReadNextOpsSnafu) + .transpose() + } +} + +impl ClosedSegmentFileReader { /// Return the segment file id pub fn id(&self) -> SegmentId { self.id @@ -624,9 +631,8 @@ impl Iterator for WriteOpEntryDecoder { fn next(&mut self) -> Option { Some( self.reader - .next_batch() + .next()? .context(FailedToReadWalSnafu) - .transpose()? .map(|batch| { batch .into_iter() @@ -727,12 +733,11 @@ mod tests { // TODO(savage): Returned SequenceNumberSet should reflect `partition_sequence_numbers` post-change. let (closed, ids) = wal.rotate().unwrap(); - let mut reader = wal.reader_for_segment(closed.id).unwrap(); - - let mut ops = vec![]; - while let Ok(Some(mut batch)) = reader.next_batch() { - ops.append(&mut batch); - } + let ops: Vec = wal + .reader_for_segment(closed.id) + .expect("should be able to open reader for closed WAL segment") + .flat_map(|batch| batch.expect("failed to read WAL op batch")) + .collect(); assert_eq!(vec![op1, op2, op3, op4], ops); // Assert the set has recorded the op IDs. @@ -796,7 +801,7 @@ mod tests { // There aren't any entries in the closed segment because nothing was written let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap(); - assert!(reader.next_batch().unwrap().is_none()); + assert!(reader.next().is_none()); // Can delete an empty segment, leaving no closed segments again wal.delete(closed_segment_details.id()).await.unwrap(); diff --git a/wal/tests/end_to_end.rs b/wal/tests/end_to_end.rs index f01b86f9c0..8859ba38d4 100644 --- a/wal/tests/end_to_end.rs +++ b/wal/tests/end_to_end.rs @@ -57,13 +57,13 @@ async fn crud() { // ensuring the per-partition sequence numbers match up to the current // op-level sequence number while it is the source of truth. let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap(); - let op = reader.next_batch().unwrap().unwrap(); + let op = reader.next().unwrap().unwrap(); assert_eq!(op[0].sequence_number, 42); op[0] .table_write_sequence_numbers .values() .for_each(|sequence_number| assert_eq!(*sequence_number, op[0].sequence_number)); - let op = reader.next_batch().unwrap().unwrap(); + let op = reader.next().unwrap().unwrap(); assert_eq!(op[0].sequence_number, 43); op[0] .table_write_sequence_numbers @@ -107,7 +107,7 @@ async fn replay() { // ensuring the per-partition sequence numbers match up to the current // op-level sequence number while it is the source of truth. let mut reader = wal.reader_for_segment(closed_segment_ids[0]).unwrap(); - let op = reader.next_batch().unwrap().unwrap(); + let op = reader.next().unwrap().unwrap(); assert_eq!(op[0].sequence_number, 42); op[0] .table_write_sequence_numbers @@ -116,7 +116,7 @@ async fn replay() { // Can read the written entries from the previously open segment let mut reader = wal.reader_for_segment(closed_segment_ids[1]).unwrap(); - let op = reader.next_batch().unwrap().unwrap(); + let op = reader.next().unwrap().unwrap(); assert_eq!(op[0].sequence_number, 43); op[0] .table_write_sequence_numbers