refactor(wal): Add test & docs for WriteOpEntryDecoder

Adds documentation for the WriteOpEntryDecoder and a unit test
asserting that it skips over non-write entries and can still be
consumed from afterwards.
pull/24376/head
Fraser Savage 2023-05-03 11:50:14 +01:00
parent f6dea224e8
commit b2e5ea2266
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
4 changed files with 88 additions and 14 deletions

1
Cargo.lock generated
View File

@ -6328,6 +6328,7 @@ dependencies = [
name = "wal"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"byteorder",
"bytes",

View File

@ -29,6 +29,7 @@ tokio-util = "0.7"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies] # In alphabetical order
assert_matches = "1.5.0"
dml = { path = "../dml" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
test_helpers = { path = "../test_helpers" }

View File

@ -562,23 +562,33 @@ impl std::fmt::Debug for ClosedSegmentFileReader {
}
}
/// An in-memory representation of a WAL write operation entry.
#[derive(Debug)]
pub struct WriteOpEntry {
/// The ID of the namespace the write was made to.
pub namespace: NamespaceId,
/// The decoded mutable batches of the write, keyed by an `i64` ID
/// (the tests in this file look batches up by table ID).
pub table_batches: HashMap<i64, MutableBatch>,
}
/// A decoder that reads from a closed segment file and parses write
/// operations from their on-disk format to an internal format.
#[derive(Debug)]
pub struct WriteOpDecoder {
pub struct WriteOpEntryDecoder {
reader: ClosedSegmentFileReader,
}
impl WriteOpDecoder {
impl WriteOpEntryDecoder {
/// Creates a decoder which will use the closed segment file of `reader` to
/// decode write ops from their on-disk format.
pub fn from_closed_segment(reader: ClosedSegmentFileReader) -> Self {
Self { reader }
}
pub fn next_write_entry_batch(&mut self) -> Result<Option<Vec<WriteOpEntry>>> {
/// Reads a collection of write op entries in the next WAL entry batch from the
/// underlying closed segment. A returned Ok(None) indicates that there are no
/// more entries to be decoded from the underlying segment. A zero-length vector
/// may be returned if there are no writes in a WAL entry batch, but does not
/// indicate the decoder is consumed.
pub fn next_write_op_entry_batch(&mut self) -> Result<Option<Vec<WriteOpEntry>>> {
match self.reader.next_batch()? {
Some(batch) => Ok(batch
.into_iter()
@ -621,7 +631,7 @@ impl ClosedSegment {
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use data_types::{NamespaceId, SequenceNumber, TableId};
use dml::DmlWrite;
use generated_types::influxdata::{
@ -630,6 +640,10 @@ mod tests {
};
use mutable_batch_lp::lines_to_batches;
use super::*;
const TEST_NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
#[tokio::test]
async fn wal_write_and_read_ops() {
let dir = test_helpers::tmp_dir().unwrap();
@ -727,6 +741,64 @@ mod tests {
);
}
/// Asserts that a [`WriteOpEntryDecoder`] built over a closed segment yields
/// only the write entries, skipping the interleaved delete and persist
/// entries, and that it can keep being consumed after skipping them.
#[tokio::test]
async fn decode_write_op_entries() {
    let dir = test_helpers::tmp_dir().unwrap();
    let wal = Wal::new(dir.path()).await.unwrap();

    let w1 = test_data("m1,t=foo v=1i 1");
    let w2 = test_data("m1,t=foo v=2i 2");

    let op1 = SequencedWalOp {
        sequence_number: 0,
        op: WalOp::Write(w1),
    };
    // NOTE(review): op2 and op3 share sequence number 1 - presumably
    // intentional for this test, confirm.
    let op2 = SequencedWalOp {
        sequence_number: 1,
        op: WalOp::Delete(test_delete()),
    };
    let op3 = SequencedWalOp {
        sequence_number: 1,
        op: WalOp::Persist(test_persist()),
    };
    // A second write entry coming after a delete and persist entry must still be yielded
    let op4 = SequencedWalOp {
        sequence_number: 2,
        op: WalOp::Write(w2),
    };

    // The handles returned for op1 and op3 are dropped on purpose; only the
    // write issued after each of them is awaited.
    wal.write_op(op1.clone());
    wal.write_op(op2.clone()).changed().await.unwrap();
    wal.write_op(op3.clone());
    wal.write_op(op4.clone()).changed().await.unwrap();

    let (closed, _) = wal.rotate().unwrap();

    let mut decoder = WriteOpEntryDecoder::from_closed_segment(
        wal.reader_for_segment(closed.id)
            .expect("failed to open reader for closed WAL segment"),
    );

    // Drain the decoder. A decode error must fail the test loudly rather
    // than silently ending the loop, so the `Result` is unwrapped here
    // instead of being pattern-matched away.
    let mut write_op_entries = vec![];
    while let Some(mut entry_batch) = decoder
        .next_write_op_entry_batch()
        .expect("decoder error should not occur")
    {
        write_op_entries.append(&mut entry_batch);
    }

    // The decoder should find 2 entries, each containing a single table write
    assert_eq!(write_op_entries.len(), 2);
    assert_matches!(write_op_entries.get(0), Some(got_op1) => {
        assert_eq!(got_op1.namespace, TEST_NAMESPACE_ID);
        assert_eq!(got_op1.table_batches.len(), 1);
        let mb = got_op1.table_batches.get(&0).expect("no mutable batch for table ID 0");
        assert_eq!(mb.column_names(), vec!["t", "v", "time"].into_iter().collect());
    });
    assert_matches!(write_op_entries.get(1), Some(got_op2) => {
        assert_eq!(got_op2.namespace, TEST_NAMESPACE_ID);
        assert_eq!(got_op2.table_batches.len(), 1);
        let mb = got_op2.table_batches.get(&0).expect("no mutable batch for table ID 0");
        assert_eq!(mb.column_names(), vec!["t", "v", "time"].into_iter().collect());
    });
}
fn test_data(lp: &str) -> DatabaseBatch {
let batches = lines_to_batches(lp, 0).unwrap();
let batches = batches
@ -736,7 +808,7 @@ mod tests {
.collect();
let write = DmlWrite::new(
NamespaceId::new(42),
TEST_NAMESPACE_ID,
batches,
"bananas".into(),
Default::default(),
@ -747,7 +819,7 @@ mod tests {
fn test_delete() -> DeletePayload {
DeletePayload {
database_id: 42,
database_id: TEST_NAMESPACE_ID.get(),
predicate: None,
table_name: "bananas".into(),
}
@ -755,7 +827,7 @@ mod tests {
fn test_persist() -> PersistOp {
PersistOp {
namespace_id: 42,
namespace_id: TEST_NAMESPACE_ID.get(),
parquet_file_uuid: "b4N4N4Z".into(),
partition_id: 43,
table_id: 44,

View File

@ -127,12 +127,12 @@ mod tests {
iox::wal::v1::sequenced_wal_op::Op, pbdata::v1::DatabaseBatch,
};
use mutable_batch_lp::lines_to_batches;
use wal::{Error as WalError, SequencedWalOp, WriteOpDecoder};
use wal::{Error as WalError, SequencedWalOp, WriteOpEntryDecoder};
use super::*;
#[tokio::test]
async fn translate_good_wal() {
async fn translate_good_wal_segment_file() {
let test_dir = test_helpers::tmp_dir().expect("failed to create test dir");
let wal = wal::Wal::new(test_dir.path()).await.unwrap();
@ -163,7 +163,7 @@ mod tests {
// Rotate the WAL and create the translator.
let (closed, _) = wal.rotate().expect("failed to rotate WAL");
let mut decoder = WriteOpDecoder::from_closed_segment(
let mut decoder = WriteOpEntryDecoder::from_closed_segment(
wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"),
);
@ -172,7 +172,7 @@ mod tests {
let mut decoded_entries = 0;
let mut decoded_ops = 0;
while let Some(new_entries) = decoder
.next_write_entry_batch()
.next_write_op_entry_batch()
.expect("decoder error should not occur")
{
decoded_entries += 1;
@ -213,7 +213,7 @@ mod tests {
}
#[tokio::test]
async fn partial_translate_bad_wal() {
async fn partial_translate_bad_wal_segment_file() {
let test_dir = test_helpers::tmp_dir().expect("failed to create test dir");
let wal = wal::Wal::new(test_dir.path()).await.unwrap();
@ -269,7 +269,7 @@ mod tests {
}
// Create the translator and read as much as possible out of the bad segment file
let mut decoder = WriteOpDecoder::from_closed_segment(
let mut decoder = WriteOpEntryDecoder::from_closed_segment(
wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"),
);
@ -278,7 +278,7 @@ mod tests {
let mut decoded_entries = 0;
let mut decoded_ops = 0;
loop {
match decoder.next_write_entry_batch() {
match decoder.next_write_op_entry_batch() {
// If the translator returns `None` indicating successful translation
// then something is broken.
Ok(v) => assert_matches!(v, Some(new_entries) => {