diff --git a/ingester/benches/wal.rs b/ingester/benches/wal.rs index 75f99a3d84..08befd3e85 100644 --- a/ingester/benches/wal.rs +++ b/ingester/benches/wal.rs @@ -27,7 +27,7 @@ async fn init() -> tempfile::TempDir { // Write a single line of LP to the WAL wal.write_op(SequencedWalOp { sequence_number: 42, - table_write_sequence_numbers: [(0, 42)].into_iter().collect(), + table_write_sequence_numbers: [(TableId::new(0), 42)].into_iter().collect(), op: WalOp::Write(lp_to_writes("bananas,tag1=A,tag2=B val=42i 1")), }) .changed() diff --git a/ingester/src/wal/wal_sink.rs b/ingester/src/wal/wal_sink.rs index 2eadefcf0b..e9299aa431 100644 --- a/ingester/src/wal/wal_sink.rs +++ b/ingester/src/wal/wal_sink.rs @@ -110,7 +110,7 @@ impl WalAppender for Arc { DmlOperation::Write(w) => { let partition_sequence_numbers = w .tables() - .map(|(table_id, _)| (table_id.get(), sequence_number)) + .map(|(table_id, _)| (*table_id, sequence_number)) .collect(); ( Op::Write(encode_write(namespace_id.get(), w)), @@ -134,7 +134,9 @@ mod tests { use std::{future, sync::Arc}; use assert_matches::assert_matches; - use data_types::{NamespaceId, PartitionKey, TableId}; + use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId}; + use dml::{DmlMeta, DmlWrite}; + use mutable_batch_lp::lines_to_batches; use wal::Wal; use crate::{dml_sink::mock_sink::MockDmlSink, test_util::make_write_op}; @@ -150,14 +152,36 @@ mod tests { async fn test_append() { let dir = tempfile::tempdir().unwrap(); - // Generate the test op that will be appended and read back - let op = make_write_op( - &PartitionKey::from("p1"), + const SECOND_TABLE_ID: TableId = TableId::new(45); + const SECOND_TABLE_NAME: &str = "banani"; + // Generate a test op containing writes for multiple tables that will + // be appended and read back + let mut tables_by_name = lines_to_batches( + r#"bananas,region=Madrid temp=35 4242424242 +banani,region=Iceland temp=25 7676767676"#, + 0, + ) + .expect("invalid line proto"); + let op = DmlWrite::new( NAMESPACE_ID, - TABLE_NAME, - TABLE_ID, - 42, - r#"bananas,region=Madrid temp=35 4242424242"#, + [ + ( + TABLE_ID, + tables_by_name + .remove(TABLE_NAME) + .expect("table does not exist in LP"), + ), + ( + SECOND_TABLE_ID, + tables_by_name + .remove(SECOND_TABLE_NAME) + .expect("second table does not exist in LP"), + ), + ] + .into_iter() + .collect(), + PartitionKey::from("p1"), + DmlMeta::sequenced(SequenceNumber::new(42), iox_time::Time::MIN, None, 42), ); // The write portion of this test. @@ -204,9 +228,9 @@ mod tests { assert_eq!(read_op.sequence_number, 42); assert_eq!( read_op.table_write_sequence_numbers, - [(TABLE_ID.get(), 42)] + [(TABLE_ID, 42), (SECOND_TABLE_ID, 42)] .into_iter() - .collect::>() + .collect::>() ); let payload = assert_matches!(&read_op.op, Op::Write(w) => w, "expected DML write WAL entry"); diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 4ac9770f5d..98d7347a41 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -474,7 +474,7 @@ pub struct SequencedWalOp { pub sequence_number: u64, /// This mapping assigns a sequence number to table ID modified by this /// write. - pub table_write_sequence_numbers: std::collections::HashMap, + pub table_write_sequence_numbers: std::collections::HashMap, /// The underlying WAL operation which this wrapper sequences. pub op: WalOp, } @@ -491,7 +491,10 @@ impl TryFrom for SequencedWalOp { Ok(Self { sequence_number, - table_write_sequence_numbers, + table_write_sequence_numbers: table_write_sequence_numbers + .into_iter() + .map(|(table_id, sequence_number)| (TableId::new(table_id), sequence_number)) + .collect(), op: op.unwrap_field("op")?, }) } @@ -507,7 +510,10 @@ impl From for ProtoSequencedWalOp { Self { sequence_number, - table_write_sequence_numbers, + table_write_sequence_numbers: table_write_sequence_numbers + .into_iter() + .map(|(table_id, sequence_number)| (table_id.get(), sequence_number)) + .collect(), op: Some(op), } } @@ -694,22 +700,22 @@ mod tests { let op1 = SequencedWalOp { sequence_number: 0, - table_write_sequence_numbers: vec![(0, 0)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(), op: WalOp::Write(w1), }; let op2 = SequencedWalOp { sequence_number: 1, - table_write_sequence_numbers: vec![(0, 1)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(), op: WalOp::Write(w2), }; let op3 = SequencedWalOp { sequence_number: 2, - table_write_sequence_numbers: vec![(0, 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), op: WalOp::Delete(test_delete()), }; let op4 = SequencedWalOp { sequence_number: 2, - table_write_sequence_numbers: vec![(0, 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), op: WalOp::Persist(test_persist()), }; @@ -749,15 +755,15 @@ mod tests { assert_eq!( ops.into_iter() .map(|op| op.table_write_sequence_numbers) - .collect::>>(), + .collect::>>(), [ - [(0, 0)].into_iter().collect(), - [(0, 1)].into_iter().collect(), - [(0, 2)].into_iter().collect(), - [(0, 2)].into_iter().collect(), + [(TableId::new(0), 0)].into_iter().collect(), + [(TableId::new(0), 1)].into_iter().collect(), + [(TableId::new(0), 2)].into_iter().collect(), + [(TableId::new(0), 2)].into_iter().collect(), ] .into_iter() - .collect::>>(), + .collect::>>(), ); } @@ -812,28 +818,28 @@ mod tests { let op1 = SequencedWalOp { sequence_number: 0, - table_write_sequence_numbers: vec![(0, 0)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(), op: WalOp::Write(w1.to_owned()), }; let op2 = SequencedWalOp { sequence_number: 1, - table_write_sequence_numbers: vec![(0, 1)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(), op: WalOp::Write(w2.to_owned()), }; let op3 = SequencedWalOp { sequence_number: 2, - table_write_sequence_numbers: vec![(0, 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), op: WalOp::Delete(test_delete()), }; let op4 = SequencedWalOp { sequence_number: 2, - table_write_sequence_numbers: vec![(0, 2)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(), op: WalOp::Persist(test_persist()), }; // A third write entry coming after a delete and persist entry must still be yielded let op5 = SequencedWalOp { sequence_number: 3, - table_write_sequence_numbers: vec![(0, 3)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(), op: WalOp::Write(w3.to_owned()), }; @@ -878,7 +884,7 @@ mod tests { let good_write = test_data("m3,a=baz b=4i 1"); wal.write_op(SequencedWalOp { sequence_number: 0, - table_write_sequence_numbers: vec![(0, 0)].into_iter().collect(), + table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(), op: WalOp::Write(good_write.to_owned()), }) .changed() diff --git a/wal/tests/end_to_end.rs b/wal/tests/end_to_end.rs index 0dcdda2dda..f01b86f9c0 100644 --- a/wal/tests/end_to_end.rs +++ b/wal/tests/end_to_end.rs @@ -175,7 +175,7 @@ fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp { table_write_sequence_numbers: w .table_batches .iter() - .map(|table_batch| (table_batch.table_id, sequence_number)) + .map(|table_batch| (TableId::new(table_batch.table_id), sequence_number)) .collect(), op: WalOp::Write(w), } diff --git a/wal_inspect/src/lib.rs b/wal_inspect/src/lib.rs index c88af1baab..45fa70c132 100644 --- a/wal_inspect/src/lib.rs +++ b/wal_inspect/src/lib.rs @@ -205,17 +205,17 @@ mod tests { // Generate a single entry wal.write_op(SequencedWalOp { sequence_number: 0, - table_write_sequence_numbers: [(1, 0)].into_iter().collect(), + table_write_sequence_numbers: [(TableId::new(1), 0)].into_iter().collect(), op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line1)), }); wal.write_op(SequencedWalOp { sequence_number: 1, - table_write_sequence_numbers: [(2, 1)].into_iter().collect(), + table_write_sequence_numbers: [(TableId::new(2), 1)].into_iter().collect(), op: Op::Write(encode_line(NamespaceId::new(2), &table_id_index, line2)), }); wal.write_op(SequencedWalOp { sequence_number: 2, - table_write_sequence_numbers: [(1, 2)].into_iter().collect(), + table_write_sequence_numbers: [(TableId::new(1), 2)].into_iter().collect(), op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line3)), }) .changed()