diff --git a/Cargo.lock b/Cargo.lock index 9551cf4d6b..4365e64912 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6328,6 +6328,7 @@ dependencies = [ name = "wal" version = "0.1.0" dependencies = [ + "assert_matches", "async-trait", "byteorder", "bytes", diff --git a/wal/Cargo.toml b/wal/Cargo.toml index 048d49b972..de1d55c06f 100644 --- a/wal/Cargo.toml +++ b/wal/Cargo.toml @@ -29,6 +29,7 @@ tokio-util = "0.7" workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] # In alphabetical order +assert_matches = "1.5.0" dml = { path = "../dml" } mutable_batch_lp = { path = "../mutable_batch_lp" } test_helpers = { path = "../test_helpers" } diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 4a46b34fbe..b60b85a475 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -562,23 +562,33 @@ impl std::fmt::Debug for ClosedSegmentFileReader { } } +/// An in-memory representation of a WAL write operation entry. #[derive(Debug)] pub struct WriteOpEntry { pub namespace: NamespaceId, pub table_batches: HashMap, } +/// A decoder that reads from a closed segment file and parses write +/// operations from their on-disk format to an internal format. #[derive(Debug)] -pub struct WriteOpDecoder { +pub struct WriteOpEntryDecoder { reader: ClosedSegmentFileReader, } -impl WriteOpDecoder { +impl WriteOpEntryDecoder { + /// Creates a decoder which will use the closed segment file of `reader` to + /// decode write ops from their on-disk format. pub fn from_closed_segment(reader: ClosedSegmentFileReader) -> Self { Self { reader } } - pub fn next_write_entry_batch(&mut self) -> Result>> { + /// Reads a collection of write op entries in the next WAL entry batch from the + /// underlying closed segment. A returned Ok(None) indicates that there are no + /// more entries to be decoded from the underlying segment. A zero-length vector + /// may be returned if there are no writes in a WAL entry batch, but does not + /// indicate the decoder is consumed. + pub fn next_write_op_entry_batch(&mut self) -> Result>> { match self.reader.next_batch()? { Some(batch) => Ok(batch .into_iter() @@ -621,7 +631,7 @@ impl ClosedSegment { #[cfg(test)] mod tests { - use super::*; + use assert_matches::assert_matches; use data_types::{NamespaceId, SequenceNumber, TableId}; use dml::DmlWrite; use generated_types::influxdata::{ @@ -630,6 +640,10 @@ mod tests { }; use mutable_batch_lp::lines_to_batches; + use super::*; + + const TEST_NAMESPACE_ID: NamespaceId = NamespaceId::new(42); + #[tokio::test] async fn wal_write_and_read_ops() { let dir = test_helpers::tmp_dir().unwrap(); @@ -727,6 +741,64 @@ mod tests { ); } + #[tokio::test] + async fn decode_write_op_entries() { + let dir = test_helpers::tmp_dir().unwrap(); + let wal = Wal::new(&dir.path()).await.unwrap(); + + let w1 = test_data("m1,t=foo v=1i 1"); + let w2 = test_data("m1,t=foo v=2i 2"); + + let op1 = SequencedWalOp { + sequence_number: 0, + op: WalOp::Write(w1), + }; + let op2 = SequencedWalOp { + sequence_number: 1, + op: WalOp::Delete(test_delete()), + }; + let op3 = SequencedWalOp { + sequence_number: 1, + op: WalOp::Persist(test_persist()), + }; + // A second write entry coming after a delete and persist entry must still be yielded + let op4 = SequencedWalOp { + sequence_number: 2, + op: WalOp::Write(w2), + }; + + wal.write_op(op1.clone()); + wal.write_op(op2.clone()).changed().await.unwrap(); + wal.write_op(op3.clone()); + wal.write_op(op4.clone()).changed().await.unwrap(); + + let (closed, _) = wal.rotate().unwrap(); + + let mut decoder = WriteOpEntryDecoder::from_closed_segment( + wal.reader_for_segment(closed.id) + .expect("failed to open reader for closed WAL segment"), + ); + + let mut write_op_entries = vec![]; + while let Ok(Some(mut entry_batch)) = decoder.next_write_op_entry_batch() { + write_op_entries.append(&mut entry_batch); + } + // The decoder should find 2 entries, each containing a single table write + assert_eq!(write_op_entries.len(), 2); + assert_matches!(write_op_entries.get(0), Some(got_op1) => { + assert_eq!(got_op1.namespace, TEST_NAMESPACE_ID); + assert_eq!(got_op1.table_batches.len(), 1); + let mb = got_op1.table_batches.get(&0).expect("no mutable batch for table ID 0"); + assert_eq!(mb.column_names(), vec!["t", "v", "time"].into_iter().collect()); + }); + assert_matches!(write_op_entries.get(1), Some(got_op2) => { + assert_eq!(got_op2.namespace, TEST_NAMESPACE_ID); + assert_eq!(got_op2.table_batches.len(), 1); + let mb = got_op2.table_batches.get(&0).expect("no mutable batch for table ID 0"); + assert_eq!(mb.column_names(), vec!["t", "v", "time"].into_iter().collect()); + }); + } + fn test_data(lp: &str) -> DatabaseBatch { let batches = lines_to_batches(lp, 0).unwrap(); let batches = batches @@ -736,7 +808,7 @@ mod tests { .collect(); let write = DmlWrite::new( - NamespaceId::new(42), + TEST_NAMESPACE_ID, batches, "bananas".into(), Default::default(), @@ -747,7 +819,7 @@ mod tests { fn test_delete() -> DeletePayload { DeletePayload { - database_id: 42, + database_id: TEST_NAMESPACE_ID.get(), predicate: None, table_name: "bananas".into(), } @@ -755,7 +827,7 @@ mod tests { fn test_persist() -> PersistOp { PersistOp { - namespace_id: 42, + namespace_id: TEST_NAMESPACE_ID.get(), parquet_file_uuid: "b4N4N4Z".into(), partition_id: 43, table_id: 44, diff --git a/wal_inspect/src/lib.rs b/wal_inspect/src/lib.rs index 77fcdc3042..0b37839d80 100644 --- a/wal_inspect/src/lib.rs +++ b/wal_inspect/src/lib.rs @@ -127,12 +127,12 @@ mod tests { iox::wal::v1::sequenced_wal_op::Op, pbdata::v1::DatabaseBatch, }; use mutable_batch_lp::lines_to_batches; - use wal::{Error as WalError, SequencedWalOp, WriteOpDecoder}; + use wal::{Error as WalError, SequencedWalOp, WriteOpEntryDecoder}; use super::*; #[tokio::test] - async fn translate_good_wal() { + async fn translate_good_wal_segment_file() { let test_dir = test_helpers::tmp_dir().expect("failed to create test dir"); let wal = wal::Wal::new(test_dir.path()).await.unwrap(); @@ -163,7 +163,7 @@ mod tests { // Rotate the WAL and create the translator. let (closed, _) = wal.rotate().expect("failed to rotate WAL"); - let mut decoder = WriteOpDecoder::from_closed_segment( + let mut decoder = WriteOpEntryDecoder::from_closed_segment( wal.reader_for_segment(closed.id()) .expect("failed to open reader for closed segment"), ); @@ -172,7 +172,7 @@ mod tests { let mut decoded_entries = 0; let mut decoded_ops = 0; while let Some(new_entries) = decoder - .next_write_entry_batch() + .next_write_op_entry_batch() .expect("decoder error should not occur") { decoded_entries += 1; @@ -213,7 +213,7 @@ mod tests { } #[tokio::test] - async fn partial_translate_bad_wal() { + async fn partial_translate_bad_wal_segment_file() { let test_dir = test_helpers::tmp_dir().expect("failed to create test dir"); let wal = wal::Wal::new(test_dir.path()).await.unwrap(); @@ -269,7 +269,7 @@ mod tests { } // Create the translator and read as much as possible out of the bad segment file - let mut decoder = WriteOpDecoder::from_closed_segment( + let mut decoder = WriteOpEntryDecoder::from_closed_segment( wal.reader_for_segment(closed.id()) .expect("failed to open reader for closed segment"), ); @@ -278,7 +278,7 @@ mod tests { let mut decoded_entries = 0; let mut decoded_ops = 0; loop { - match decoder.next_write_entry_batch() { + match decoder.next_write_op_entry_batch() { // If the translator returns `None` indicating successful translation // then something is broken. Ok(v) => assert_matches!(v, Some(new_entries) => {