refactor(wal): Remove op-level `sequence_number`, use per table map

This commit removes the op-level sequence number from the proto
definition, now reading and writing solely to the per table (and thus
per partition) sequence number map. Tables/partitions within the same
write op are still assigned the same number for now, so there should be
no semantic different
pull/24376/head
Fraser Savage 2023-06-23 15:19:50 +01:00
parent 5815df5e6d
commit 30939cfe96
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
9 changed files with 51 additions and 51 deletions

View File

@ -22,7 +22,9 @@ message PersistOp {
// WAL operation with a sequence number, used to inform read buffers when to evict data
message SequencedWalOp {
uint64 sequence_number = 1;
// Was the op-level sequence number.
reserved "sequence_number";
reserved 1;
// A mapping which assigns a sequence number per table ID affected by this WAL
// operation.

View File

@ -61,9 +61,11 @@ where
let formatter = reader
.flatten_ok()
.filter_ok(|op| {
sequence_number_range
.as_ref()
.map_or(true, |range| range.contains(&op.sequence_number))
sequence_number_range.as_ref().map_or(true, |range| {
op.table_write_sequence_numbers
.values()
.any(|seq| range.contains(seq))
})
})
.format_with(",\n", |op, f| match op {
Ok(op) => f(&format_args!("{:#?}", op)),
@ -87,6 +89,7 @@ where
#[cfg(test)]
mod tests {
use data_types::TableId;
use generated_types::influxdata::iox::wal::v1::sequenced_wal_op::Op as WalOp;
use proptest::{prelude::*, prop_assert};
@ -94,8 +97,7 @@ mod tests {
fn arbitrary_sequence_wal_op(seq_number: u64) -> SequencedWalOp {
SequencedWalOp {
sequence_number: seq_number,
table_write_sequence_numbers: Default::default(),
table_write_sequence_numbers: [(TableId::new(0), seq_number)].into(),
op: WalOp::Write(Default::default()),
}
}
@ -126,8 +128,21 @@ mod tests {
// Expect two operations inspected, with the appropriate sequence numbers
assert_eq!(results.matches("SequencedWalOp").count(), 2);
assert_eq!(results.matches("sequence_number: 2").count(), 1);
assert_eq!(results.matches("sequence_number: 3").count(), 1);
// Strip the whitespace before checking the output
let results: String = results.chars().filter(|c| !c.is_whitespace()).collect();
assert_eq!(
results
.matches("table_write_sequence_numbers:{TableId(0,):2,},")
.count(),
1
);
assert_eq!(
results
.matches("table_write_sequence_numbers:{TableId(0,):3,},")
.count(),
1
);
}
proptest! {

View File

@ -29,7 +29,6 @@ async fn init() -> tempfile::TempDir {
// Write a single line of LP to the WAL
wal.write_op(SequencedWalOp {
sequence_number: 42,
table_write_sequence_numbers: [(TableId::new(0), 42)].into_iter().collect(),
op: WalOp::Write(lp_to_writes("bananas,tag1=A,tag2=B val=42i 1")),
})

View File

@ -194,12 +194,16 @@ where
for op in ops {
let SequencedWalOp {
sequence_number,
table_write_sequence_numbers: _, // TODO(savage): Use sequence numbers assigned per-partition
table_write_sequence_numbers, // TODO(savage): Use sequence numbers assigned per-partition
op,
} = op;
let sequence_number = SequenceNumber::new(sequence_number);
let sequence_number = SequenceNumber::new(
*table_write_sequence_numbers
.values()
.next()
.expect("attempt to replay unsequenced wal entry"),
);
max_sequence = max_sequence.max(Some(sequence_number));

View File

@ -116,10 +116,6 @@ impl WalAppender for Arc<wal::Wal> {
};
self.write_op(SequencedWalOp {
sequence_number: *partition_sequence_numbers
.values()
.next()
.expect("tried to append unsequenced WAL operation"),
table_write_sequence_numbers: partition_sequence_numbers,
op: wal_op,
})
@ -232,7 +228,6 @@ mod tests {
// Extract the op payload read from the WAL
let read_op = assert_matches!(&*ops, [op] => op, "expected 1 DML operation");
assert_eq!(read_op.sequence_number, 42);
assert_eq!(
read_op.table_write_sequence_numbers,
[(ARBITRARY_TABLE_ID, 42), (SECOND_TABLE_ID, 42)]

View File

@ -470,8 +470,6 @@ impl WalBuffer {
/// A wal operation with a sequence number
#[derive(Debug, PartialEq, Clone)]
pub struct SequencedWalOp {
/// The to-be-deprecated sequence number used to sequence WAL operations
pub sequence_number: u64,
/// This mapping assigns a sequence number to table ID modified by this
/// write.
pub table_write_sequence_numbers: std::collections::HashMap<TableId, u64>,
@ -484,13 +482,11 @@ impl TryFrom<ProtoSequencedWalOp> for SequencedWalOp {
fn try_from(proto: ProtoSequencedWalOp) -> Result<Self, Self::Error> {
let ProtoSequencedWalOp {
sequence_number,
table_write_sequence_numbers,
op,
} = proto;
Ok(Self {
sequence_number,
table_write_sequence_numbers: table_write_sequence_numbers
.into_iter()
.map(|(table_id, sequence_number)| (TableId::new(table_id), sequence_number))
@ -503,13 +499,11 @@ impl TryFrom<ProtoSequencedWalOp> for SequencedWalOp {
impl From<SequencedWalOp> for ProtoSequencedWalOp {
fn from(seq_op: SequencedWalOp) -> Self {
let SequencedWalOp {
sequence_number,
table_write_sequence_numbers,
op,
} = seq_op;
Self {
sequence_number,
table_write_sequence_numbers: table_write_sequence_numbers
.into_iter()
.map(|(table_id, sequence_number)| (table_id.get(), sequence_number))
@ -705,22 +699,18 @@ mod tests {
let w2 = test_data("m1,t=foo v=2i 2");
let op1 = SequencedWalOp {
sequence_number: 0,
table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(),
op: WalOp::Write(w1),
};
let op2 = SequencedWalOp {
sequence_number: 1,
table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(),
op: WalOp::Write(w2),
};
let op3 = SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
op: WalOp::Delete(test_delete()),
};
let op4 = SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
op: WalOp::Persist(test_persist()),
};
@ -822,28 +812,23 @@ mod tests {
let w3 = test_data("m1,t=foo v=3i 3");
let op1 = SequencedWalOp {
sequence_number: 0,
table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(),
op: WalOp::Write(w1.to_owned()),
};
let op2 = SequencedWalOp {
sequence_number: 1,
table_write_sequence_numbers: vec![(TableId::new(0), 1)].into_iter().collect(),
op: WalOp::Write(w2.to_owned()),
};
let op3 = SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
op: WalOp::Delete(test_delete()),
};
let op4 = SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: vec![(TableId::new(0), 2)].into_iter().collect(),
op: WalOp::Persist(test_persist()),
};
// A third write entry coming after a delete and persist entry must still be yielded
let op5 = SequencedWalOp {
sequence_number: 3,
table_write_sequence_numbers: vec![(TableId::new(0), 3)].into_iter().collect(),
op: WalOp::Write(w3.to_owned()),
};
@ -888,7 +873,6 @@ mod tests {
// Log a write operation to test recovery from a tail-corrupted WAL.
let good_write = test_data("m3,a=baz b=4i 1");
wal.write_op(SequencedWalOp {
sequence_number: 0,
table_write_sequence_numbers: vec![(TableId::new(0), 0)].into_iter().collect(),
op: WalOp::Write(good_write.to_owned()),
})

View File

@ -122,7 +122,16 @@ impl WriterIoThread {
.ops
.into_iter()
.map(|v| {
let id = SequenceNumber::new(v.sequence_number as _);
// TODO(savage): Extract all [`SequenceNumber`] used for
// the op and include them in the batch once the tables
// sequenced independently.
let id = SequenceNumber::new(
*v.table_write_sequence_numbers
.values()
.next()
.expect("attempt to encode unsequence wal operation")
as _,
);
(proto::SequencedWalOp::from(v), id)
})
.unzip();

View File

@ -24,14 +24,14 @@ async fn crud() {
// Can write an entry to the open segment
let op = arbitrary_sequenced_wal_op(42);
let summary = unwrap_summary(wal.write_op(op)).await;
assert_eq!(summary.total_bytes, 128);
assert_eq!(summary.bytes_written, 112);
assert_eq!(summary.total_bytes, 126);
assert_eq!(summary.bytes_written, 110);
// Can write another entry; total_bytes accumulates
let op = arbitrary_sequenced_wal_op(43);
let summary = unwrap_summary(wal.write_op(op)).await;
assert_eq!(summary.total_bytes, 240);
assert_eq!(summary.bytes_written, 112);
assert_eq!(summary.total_bytes, 236);
assert_eq!(summary.bytes_written, 110);
// Still no closed segments
let closed = wal.closed_segments();
@ -42,7 +42,7 @@ async fn crud() {
// Can't read entries from the open segment; have to rotate first
let (closed_segment_details, ids) = wal.rotate().unwrap();
assert_eq!(closed_segment_details.size(), 240);
assert_eq!(closed_segment_details.size(), 236);
assert_eq!(
ids.iter().collect::<Vec<_>>(),
[SequenceNumber::new(42), SequenceNumber::new(43)]
@ -58,17 +58,15 @@ async fn crud() {
// op-level sequence number while it is the source of truth.
let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap();
let op = reader.next().unwrap().unwrap();
assert_eq!(op[0].sequence_number, 42);
op[0]
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, op[0].sequence_number));
.for_each(|sequence_number| assert_eq!(*sequence_number, 42));
let op = reader.next().unwrap().unwrap();
assert_eq!(op[0].sequence_number, 43);
op[0]
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, op[0].sequence_number));
.for_each(|sequence_number| assert_eq!(*sequence_number, 43));
// Can delete a segment, leaving no closed segments again
wal.delete(closed_segment_details.id()).await.unwrap();
@ -108,20 +106,18 @@ async fn replay() {
// op-level sequence number while it is the source of truth.
let mut reader = wal.reader_for_segment(closed_segment_ids[0]).unwrap();
let op = reader.next().unwrap().unwrap();
assert_eq!(op[0].sequence_number, 42);
op[0]
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, op[0].sequence_number));
.for_each(|sequence_number| assert_eq!(*sequence_number, 42));
// Can read the written entries from the previously open segment
let mut reader = wal.reader_for_segment(closed_segment_ids[1]).unwrap();
let op = reader.next().unwrap().unwrap();
assert_eq!(op[0].sequence_number, 43);
op[0]
.table_write_sequence_numbers
.values()
.for_each(|sequence_number| assert_eq!(*sequence_number, op[0].sequence_number));
.for_each(|sequence_number| assert_eq!(*sequence_number, 43));
}
#[tokio::test]
@ -171,7 +167,6 @@ async fn ordering() {
fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {
let w = test_data("m1,t=foo v=1i 1");
SequencedWalOp {
sequence_number,
table_write_sequence_numbers: w
.table_batches
.iter()

View File

@ -204,17 +204,14 @@ mod tests {
// Generate a single entry
wal.write_op(SequencedWalOp {
sequence_number: 0,
table_write_sequence_numbers: [(TableId::new(1), 0)].into_iter().collect(),
op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line1)),
});
wal.write_op(SequencedWalOp {
sequence_number: 1,
table_write_sequence_numbers: [(TableId::new(2), 1)].into_iter().collect(),
op: Op::Write(encode_line(NamespaceId::new(2), &table_id_index, line2)),
});
wal.write_op(SequencedWalOp {
sequence_number: 2,
table_write_sequence_numbers: [(TableId::new(1), 2)].into_iter().collect(),
op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line3)),
})