refactor(wal): Use TableId type for look-aside map key

This adds a little extra layer of type safety and should be optimised
by the compiler. This commit also makes sure the ingester's WAL sink
tests assert the behaviour for partitioned sequence numbering on an
operation that hits multiple tables & thus partitions.
pull/24376/head
Fraser Savage 2023-06-08 11:39:23 +01:00
parent 6daec564d0
commit fad34c375e
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
5 changed files with 65 additions and 35 deletions

View File

@ -27,7 +27,7 @@ async fn init() -> tempfile::TempDir {
// Write a single line of LP to the WAL
wal.write_op(SequencedWalOp {
sequence_number: 42,
table_write_sequence_numbers: [(0, 42)].into_iter().collect(),
table_write_sequence_numbers: [(TableId::new(0), 42)].into_iter().collect(),
op: WalOp::Write(lp_to_writes("bananas,tag1=A,tag2=B val=42i 1")),
})
.changed()

View File

@ -110,7 +110,7 @@ impl WalAppender for Arc<wal::Wal> {
DmlOperation::Write(w) => {
let partition_sequence_numbers = w
.tables()
.map(|(table_id, _)| (table_id.get(), sequence_number))
.map(|(table_id, _)| (*table_id, sequence_number))
.collect();
(
Op::Write(encode_write(namespace_id.get(), w)),
@ -134,7 +134,9 @@ mod tests {
use std::{future, sync::Arc};
use assert_matches::assert_matches;
use data_types::{NamespaceId, PartitionKey, TableId};
use data_types::{NamespaceId, PartitionKey, SequenceNumber, TableId};
use dml::{DmlMeta, DmlWrite};
use mutable_batch_lp::lines_to_batches;
use wal::Wal;
use crate::{dml_sink::mock_sink::MockDmlSink, test_util::make_write_op};
@ -150,14 +152,36 @@ mod tests {
async fn test_append() {
let dir = tempfile::tempdir().unwrap();
// Generate the test op that will be appended and read back
let op = make_write_op(
&PartitionKey::from("p1"),
const SECOND_TABLE_ID: TableId = TableId::new(45);
const SECOND_TABLE_NAME: &str = "banani";
// Generate a test op containing writes for multiple tables that will
// be appended and read back
let mut tables_by_name = lines_to_batches(
r#"bananas,region=Madrid temp=35 4242424242
banani,region=Iceland temp=25 7676767676"#,
0,
)
.expect("invalid line proto");
let op = DmlWrite::new(
NAMESPACE_ID,
TABLE_NAME,
TABLE_ID,
42,
r#"bananas,region=Madrid temp=35 4242424242"#,
[
(
TABLE_ID,
tables_by_name
.remove(TABLE_NAME)
.expect("table does not exist in LP"),
),
(
SECOND_TABLE_ID,
tables_by_name
.remove(SECOND_TABLE_NAME)
.expect("second table does not exist in LP"),
),
]
.into_iter()
.collect(),
PartitionKey::from("p1"),
DmlMeta::sequenced(SequenceNumber::new(42), iox_time::Time::MIN, None, 42),
);
// The write portion of this test.
@ -204,9 +228,9 @@ mod tests {
assert_eq!(read_op.sequence_number, 42);
assert_eq!(
read_op.table_write_sequence_numbers,
[(TABLE_ID.get(), 42)]
[(TABLE_ID, 42), (SECOND_TABLE_ID, 42)]
.into_iter()
.collect::<std::collections::HashMap<i64, u64>>()
.collect::<std::collections::HashMap<TableId, u64>>()
);
let payload =
assert_matches!(&read_op.op, Op::Write(w) => w, "expected DML write WAL entry");

View File

@ -474,7 +474,7 @@ pub struct SequencedWalOp {
pub sequence_number: u64,
/// This mapping assigns a sequence number to table ID modified by this
/// write.
pub table_write_sequence_numbers: std::collections::HashMap<i64, u64>,
pub table_write_sequence_numbers: std::collections::HashMap<TableId, u64>,
/// The underlying WAL operation which this wrapper sequences.
pub op: WalOp,
}
@ -491,7 +491,10 @@ impl TryFrom<ProtoSequencedWalOp> for SequencedWalOp {
Ok(Self {
sequence_number,
table_write_sequence_numbers,
table_write_sequence_numbers: table_write_sequence_numbers
.into_iter()
.map(|(table_id, sequence_number)| (TableId::new(table_id), sequence_number))
.collect(),
op: op.unwrap_field("op")?,
})
}
@ -507,7 +510,10 @@ impl From<SequencedWalOp> for ProtoSequencedWalOp {
Self {
sequence_number,
table_write_sequence_numbers,
table_write_sequence_numbers: table_write_sequence_numbers
.into_iter()
.map(|(table_id, sequence_number)| (table_id.get(), sequence_number))
.collect(),
op: Some(op),
}
}
@ -694,22 +700,22 @@ mod tests {
let op1 = SequencedWalOp {
sequence_number: 0,
table_write_sequence_numbers: vec![(0, 0)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(),
op: WalOp::Write(w1),
};
let op2 = SequencedWalOp {
sequence_number: 1,
table_write_sequence_numbers: vec![(0, 1)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(),
op: WalOp::Write(w2),
};
let op3 = SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: vec![(0, 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
op: WalOp::Delete(test_delete()),
};
let op4 = SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: vec![(0, 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
op: WalOp::Persist(test_persist()),
};
@ -749,15 +755,15 @@ mod tests {
assert_eq!(
ops.into_iter()
.map(|op| op.table_write_sequence_numbers)
.collect::<Vec<std::collections::HashMap<i64, u64>>>(),
.collect::<Vec<std::collections::HashMap<TableId, u64>>>(),
[
[(0, 0)].into_iter().collect(),
[(0, 1)].into_iter().collect(),
[(0, 2)].into_iter().collect(),
[(0, 2)].into_iter().collect(),
[(TableId::new(0), 0)].into_iter().collect(),
[(TableId::new(0), 1)].into_iter().collect(),
[(TableId::new(0), 2)].into_iter().collect(),
[(TableId::new(0), 2)].into_iter().collect(),
]
.into_iter()
.collect::<Vec<std::collections::HashMap<i64, u64>>>(),
.collect::<Vec<std::collections::HashMap<TableId, u64>>>(),
);
}
@ -812,28 +818,28 @@ mod tests {
let op1 = SequencedWalOp {
sequence_number: 0,
table_write_sequence_numbers: vec![(0, 0)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(),
op: WalOp::Write(w1.to_owned()),
};
let op2 = SequencedWalOp {
sequence_number: 1,
table_write_sequence_numbers: vec![(0, 1)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(),
op: WalOp::Write(w2.to_owned()),
};
let op3 = SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: vec![(0, 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
op: WalOp::Delete(test_delete()),
};
let op4 = SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: vec![(0, 2)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
op: WalOp::Persist(test_persist()),
};
// A third write entry coming after a delete and persist entry must still be yielded
let op5 = SequencedWalOp {
sequence_number: 3,
table_write_sequence_numbers: vec![(0, 3)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Write(w3.to_owned()),
};
@ -878,7 +884,7 @@ mod tests {
let good_write = test_data("m3,a=baz b=4i 1");
wal.write_op(SequencedWalOp {
sequence_number: 0,
table_write_sequence_numbers: vec![(0, 0)].into_iter().collect(),
table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(),
op: WalOp::Write(good_write.to_owned()),
})
.changed()

View File

@ -175,7 +175,7 @@ fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {
table_write_sequence_numbers: w
.table_batches
.iter()
.map(|table_batch| (table_batch.table_id, sequence_number))
.map(|table_batch| (TableId::new(table_batch.table_id), sequence_number))
.collect(),
op: WalOp::Write(w),
}

View File

@ -205,17 +205,17 @@ mod tests {
// Generate a single entry
wal.write_op(SequencedWalOp {
sequence_number: 0,
table_write_sequence_numbers: [(1, 0)].into_iter().collect(),
table_write_sequence_numbers: [(TableId::new(1), 0)].into_iter().collect(),
op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line1)),
});
wal.write_op(SequencedWalOp {
sequence_number: 1,
table_write_sequence_numbers: [(2, 1)].into_iter().collect(),
table_write_sequence_numbers: [(TableId::new(2), 1)].into_iter().collect(),
op: Op::Write(encode_line(NamespaceId::new(2), &table_id_index, line2)),
});
wal.write_op(SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: [(1, 2)].into_iter().collect(),
table_write_sequence_numbers: [(TableId::new(1), 2)].into_iter().collect(),
op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line3)),
})
.changed()