feat(wal): Make `wal` WriteOpEntryDecoder an interator

Also, implement drop on `wal_inspect`'s LineProtoWriter and
bubble up flush to the caller.
pull/24376/head
Fraser Savage 2023-05-15 20:46:45 +01:00
parent 6cdc95e49d
commit fcd80060be
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
2 changed files with 98 additions and 73 deletions

View File

@ -585,36 +585,43 @@ pub struct WriteOpEntryDecoder {
reader: ClosedSegmentFileReader,
}
impl WriteOpEntryDecoder {
impl From<ClosedSegmentFileReader> for WriteOpEntryDecoder {
/// Creates a decoder which will use the closed segment file of `reader` to
/// decode write ops from their on-disk format.
pub fn from_closed_segment(reader: ClosedSegmentFileReader) -> Self {
fn from(reader: ClosedSegmentFileReader) -> Self {
Self { reader }
}
}
impl Iterator for WriteOpEntryDecoder {
type Item = Result<Vec<WriteOpEntry>, DecodeError>;
/// Reads a collection of write op entries in the next WAL entry batch from the
/// underlying closed segment. A returned Ok(None) indicates that there are no
/// more entries to be decoded from the underlying segment. A zero-length vector
/// may be returned if there are no writes in a WAL entry batch, but does not
/// indicate the decoder is consumed.
pub fn next_write_op_entry_batch(&mut self) -> Result<Option<Vec<WriteOpEntry>>, DecodeError> {
match self.reader.next_batch().context(FailedToReadWalSnafu)? {
Some(batch) => Ok(batch
.into_iter()
.filter_map(|sequenced_op| match sequenced_op.op {
WalOp::Write(w) => Some(w),
_ => None,
})
.map(|w| -> Result<WriteOpEntry, DecodeError> {
Ok(WriteOpEntry {
namespace: NamespaceId::new(w.database_id),
table_batches: decode_database_batch(&w)
.context(UnableToCreateMutableBatchSnafu)?,
fn next(&mut self) -> Option<Self::Item> {
match self.reader.next_batch().context(FailedToReadWalSnafu) {
Ok(Some(batch)) => Some(
batch
.into_iter()
.filter_map(|sequenced_op| match sequenced_op.op {
WalOp::Write(w) => Some(w),
WalOp::Delete(..) => None,
WalOp::Persist(..) => None,
})
})
.collect::<Result<Vec<WriteOpEntry>, DecodeError>>()?
.into()),
None => Ok(None),
.map(|w| -> Result<WriteOpEntry, DecodeError> {
Ok(WriteOpEntry {
namespace: NamespaceId::new(w.database_id),
table_batches: decode_database_batch(&w)
.context(UnableToCreateMutableBatchSnafu)?,
})
})
.collect::<Self::Item>(),
),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}
@ -783,15 +790,15 @@ mod tests {
let (closed, _) = wal.rotate().unwrap();
let mut decoder = WriteOpEntryDecoder::from_closed_segment(
let decoder = WriteOpEntryDecoder::from(
wal.reader_for_segment(closed.id)
.expect("failed to open reader for closed WAL segment"),
);
let mut write_op_entries = vec![];
while let Ok(Some(mut entry_batch)) = decoder.next_write_op_entry_batch() {
write_op_entries.append(&mut entry_batch);
}
let write_op_entries = decoder
.into_iter()
.flat_map(|r| r.expect("unexpected bad entry"))
.collect::<Vec<_>>();
// The decoder should find 2 entries, each containing a single table write
assert_eq!(write_op_entries.len(), 2);
assert_matches!(write_op_entries.get(0), Some(got_op1) => {

View File

@ -43,13 +43,38 @@ pub enum WriteError {
/// Provides namespaced write functionality from table-based mutable batches
/// to namespaced line protocol output.
#[derive(Debug)]
pub struct LineProtoWriter<W, F> {
pub struct LineProtoWriter<W, F>
where
W: Write,
{
namespaced_output: HashMap<NamespaceId, W>,
new_write_sink: F,
table_name_index: HashMap<TableId, String>,
}
impl<W, F> LineProtoWriter<W, F>
where
W: Write,
{
/// Flushes all write destinations opened by the [`LineProtoWriter`].
pub fn flush(&mut self) -> Result<(), WriteError> {
for w in self.namespaced_output.values_mut() {
w.flush()?
}
Ok(())
}
}
impl<W, F> Drop for LineProtoWriter<W, F>
where
W: Write,
{
fn drop(&mut self) {
_ = self.flush()
}
}
impl<W, F> LineProtoWriter<W, F>
where
W: Write,
@ -80,13 +105,7 @@ where
.entry(ns)
.or_insert((self.new_write_sink)(ns)?);
match write_batches_as_line_proto(sink, &self.table_name_index, table_batches) {
Ok(_) => sink.flush().map_err(WriteError::IoError),
Err(e) => {
_ = sink.flush();
Err(e)
}
}
write_batches_as_line_proto(sink, &self.table_name_index, table_batches)
}
}
@ -127,7 +146,7 @@ mod tests {
iox::wal::v1::sequenced_wal_op::Op, pbdata::v1::DatabaseBatch,
};
use mutable_batch_lp::lines_to_batches;
use wal::{DecodeError, SequencedWalOp, WriteOpEntryDecoder};
use wal::{DecodeError, SequencedWalOp, WriteOpEntry, WriteOpEntryDecoder};
use super::*;
@ -163,32 +182,31 @@ mod tests {
// Rotate the WAL and create the translator.
let (closed, _) = wal.rotate().expect("failed to rotate WAL");
let mut decoder = WriteOpEntryDecoder::from_closed_segment(
let decoder = WriteOpEntryDecoder::from(
wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"),
);
let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), table_name_index);
let mut decoded_entries = 0;
let mut decoded_ops = 0;
while let Some(new_entries) = decoder
.next_write_op_entry_batch()
.expect("decoder error should not occur")
{
decoded_entries += 1;
decoded_ops += new_entries.len();
let decoded_entries = decoder
.into_iter()
.map(|r| r.expect("unexpected bad entry"))
.collect::<Vec<_>>();
assert_eq!(decoded_entries.len(), 1);
let decoded_ops = decoded_entries
.into_iter()
.flatten()
.collect::<Vec<WriteOpEntry>>();
assert_eq!(decoded_ops.len(), 3);
for entry in new_entries {
writer
.write_namespaced_table_batches(entry.namespace, entry.table_batches)
.expect("batch write should not fail");
}
for entry in decoded_ops {
writer
.write_namespaced_table_batches(entry.namespace, entry.table_batches)
.expect("batch write should not fail");
}
// The WAL has been given a single entry containing three write ops
assert_eq!(decoded_entries, 1);
assert_eq!(decoded_ops, 3);
let results = writer.namespaced_output;
let results = &writer.namespaced_output;
// Assert that the namespaced writes contain ONLY the following:
//
@ -269,36 +287,36 @@ mod tests {
}
// Create the translator and read as much as possible out of the bad segment file
let mut decoder = WriteOpEntryDecoder::from_closed_segment(
let decoder = WriteOpEntryDecoder::from(
wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"),
);
let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), table_name_index);
let mut decoded_entries = 0;
let mut decoded_ops = 0;
loop {
match decoder.next_write_op_entry_batch() {
// If the translator returns `None` indicating successful translation
// then something is broken.
Ok(v) => assert_matches!(v, Some(new_entries) => {
decoded_entries += 1;
decoded_ops += new_entries.len();
for entry in new_entries {
writer.write_namespaced_table_batches(entry.namespace, entry.table_batches).expect("batch write should not fail");
}
}),
Err(e) => {
assert_matches!(e, DecodeError::FailedToReadWal { .. });
break;
}
};
// The translator should be able to read all 2 good entries containing 4 write ops
let decoded_entries = decoder
.into_iter()
.map_while(|r| {
r.map_err(|e| match e {
DecodeError::FailedToReadWal { .. } => None::<()>,
_ => panic!("unexpected error"),
})
.ok()
})
.collect::<Vec<_>>();
assert_eq!(decoded_entries.len(), 2);
let decoded_ops = decoded_entries
.into_iter()
.flatten()
.collect::<Vec<WriteOpEntry>>();
assert_eq!(decoded_ops.len(), 4);
for entry in decoded_ops {
writer
.write_namespaced_table_batches(entry.namespace, entry.table_batches)
.expect("batch write should not fail");
}
// The translator should have read all 2 good entries containing 4 write ops
assert_eq!(decoded_entries, 2);
assert_eq!(decoded_ops, 4);
let results = writer.namespaced_output;
let results = &writer.namespaced_output;
// Assert that the namespaced writes contain ONLY the following:
//