diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 54058d5e2e..1ecbfbd742 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -585,36 +585,43 @@ pub struct WriteOpEntryDecoder { reader: ClosedSegmentFileReader, } -impl WriteOpEntryDecoder { +impl From for WriteOpEntryDecoder { /// Creates a decoder which will use the closed segment file of `reader` to /// decode write ops from their on-disk format. - pub fn from_closed_segment(reader: ClosedSegmentFileReader) -> Self { + fn from(reader: ClosedSegmentFileReader) -> Self { Self { reader } } +} + +impl Iterator for WriteOpEntryDecoder { + type Item = Result, DecodeError>; /// Reads a collection of write op entries in the next WAL entry batch from the /// underlying closed segment. A returned Ok(None) indicates that there are no /// more entries to be decoded from the underlying segment. A zero-length vector /// may be returned if there are no writes in a WAL entry batch, but does not /// indicate the decoder is consumed. - pub fn next_write_op_entry_batch(&mut self) -> Result>, DecodeError> { - match self.reader.next_batch().context(FailedToReadWalSnafu)? { - Some(batch) => Ok(batch - .into_iter() - .filter_map(|sequenced_op| match sequenced_op.op { - WalOp::Write(w) => Some(w), - _ => None, - }) - .map(|w| -> Result { - Ok(WriteOpEntry { - namespace: NamespaceId::new(w.database_id), - table_batches: decode_database_batch(&w) - .context(UnableToCreateMutableBatchSnafu)?, + fn next(&mut self) -> Option { + match self.reader.next_batch().context(FailedToReadWalSnafu) { + Ok(Some(batch)) => Some( + batch + .into_iter() + .filter_map(|sequenced_op| match sequenced_op.op { + WalOp::Write(w) => Some(w), + WalOp::Delete(..) => None, + WalOp::Persist(..) => None, }) - }) - .collect::, DecodeError>>()? - .into()), - None => Ok(None), + .map(|w| -> Result { + Ok(WriteOpEntry { + namespace: NamespaceId::new(w.database_id), + table_batches: decode_database_batch(&w) + .context(UnableToCreateMutableBatchSnafu)?, + }) + }) + .collect::(), + ), + Ok(None) => None, + Err(e) => Some(Err(e)), } } } @@ -783,15 +790,15 @@ mod tests { let (closed, _) = wal.rotate().unwrap(); - let mut decoder = WriteOpEntryDecoder::from_closed_segment( + let decoder = WriteOpEntryDecoder::from( wal.reader_for_segment(closed.id) .expect("failed to open reader for closed WAL segment"), ); - let mut write_op_entries = vec![]; - while let Ok(Some(mut entry_batch)) = decoder.next_write_op_entry_batch() { - write_op_entries.append(&mut entry_batch); - } + let write_op_entries = decoder + .into_iter() + .flat_map(|r| r.expect("unexpected bad entry")) + .collect::>(); // The decoder should find 2 entries, each containing a single table write assert_eq!(write_op_entries.len(), 2); assert_matches!(write_op_entries.get(0), Some(got_op1) => { diff --git a/wal_inspect/src/lib.rs b/wal_inspect/src/lib.rs index c0a5e12e08..ea913f107b 100644 --- a/wal_inspect/src/lib.rs +++ b/wal_inspect/src/lib.rs @@ -43,13 +43,38 @@ pub enum WriteError { /// Provides namespaced write functionality from table-based mutable batches /// to namespaced line protocol output. #[derive(Debug)] -pub struct LineProtoWriter { +pub struct LineProtoWriter +where + W: Write, +{ namespaced_output: HashMap, new_write_sink: F, table_name_index: HashMap, } +impl LineProtoWriter +where + W: Write, +{ + /// Flushes all write destinations opened by the [`LineProtoWriter`]. + pub fn flush(&mut self) -> Result<(), WriteError> { + for w in self.namespaced_output.values_mut() { + w.flush()? + } + Ok(()) + } +} + +impl Drop for LineProtoWriter +where + W: Write, +{ + fn drop(&mut self) { + _ = self.flush() + } +} + impl LineProtoWriter where W: Write, @@ -80,13 +105,7 @@ where .entry(ns) .or_insert((self.new_write_sink)(ns)?); - match write_batches_as_line_proto(sink, &self.table_name_index, table_batches) { - Ok(_) => sink.flush().map_err(WriteError::IoError), - Err(e) => { - _ = sink.flush(); - Err(e) - } - } + write_batches_as_line_proto(sink, &self.table_name_index, table_batches) } } @@ -127,7 +146,7 @@ mod tests { iox::wal::v1::sequenced_wal_op::Op, pbdata::v1::DatabaseBatch, }; use mutable_batch_lp::lines_to_batches; - use wal::{DecodeError, SequencedWalOp, WriteOpEntryDecoder}; + use wal::{DecodeError, SequencedWalOp, WriteOpEntry, WriteOpEntryDecoder}; use super::*; @@ -163,32 +182,31 @@ mod tests { // Rotate the WAL and create the translator. let (closed, _) = wal.rotate().expect("failed to rotate WAL"); - let mut decoder = WriteOpEntryDecoder::from_closed_segment( + let decoder = WriteOpEntryDecoder::from( wal.reader_for_segment(closed.id()) .expect("failed to open reader for closed segment"), ); let mut writer = LineProtoWriter::new(|_| Ok(Vec::::new()), table_name_index); - let mut decoded_entries = 0; - let mut decoded_ops = 0; - while let Some(new_entries) = decoder - .next_write_op_entry_batch() - .expect("decoder error should not occur") - { - decoded_entries += 1; - decoded_ops += new_entries.len(); + let decoded_entries = decoder + .into_iter() + .map(|r| r.expect("unexpected bad entry")) + .collect::>(); + assert_eq!(decoded_entries.len(), 1); + let decoded_ops = decoded_entries + .into_iter() + .flatten() + .collect::>(); + assert_eq!(decoded_ops.len(), 3); - for entry in new_entries { - writer - .write_namespaced_table_batches(entry.namespace, entry.table_batches) - .expect("batch write should not fail"); - } + for entry in decoded_ops { + writer + .write_namespaced_table_batches(entry.namespace, entry.table_batches) + .expect("batch write should not fail"); } // The WAL has been given a single entry containing three write ops - assert_eq!(decoded_entries, 1); - assert_eq!(decoded_ops, 3); - let results = writer.namespaced_output; + let results = &writer.namespaced_output; // Assert that the namespaced writes contain ONLY the following: // @@ -269,36 +287,36 @@ mod tests { } // Create the translator and read as much as possible out of the bad segment file - let mut decoder = WriteOpEntryDecoder::from_closed_segment( + let decoder = WriteOpEntryDecoder::from( wal.reader_for_segment(closed.id()) .expect("failed to open reader for closed segment"), ); let mut writer = LineProtoWriter::new(|_| Ok(Vec::::new()), table_name_index); - let mut decoded_entries = 0; - let mut decoded_ops = 0; - loop { - match decoder.next_write_op_entry_batch() { - // If the translator returns `None` indicating successful translation - // then something is broken. - Ok(v) => assert_matches!(v, Some(new_entries) => { - decoded_entries += 1; - decoded_ops += new_entries.len(); - for entry in new_entries { - writer.write_namespaced_table_batches(entry.namespace, entry.table_batches).expect("batch write should not fail"); - } - }), - Err(e) => { - assert_matches!(e, DecodeError::FailedToReadWal { .. }); - break; - } - }; + // The translator should be able to read all 2 good entries containing 4 write ops + let decoded_entries = decoder + .into_iter() + .map_while(|r| { + r.map_err(|e| match e { + DecodeError::FailedToReadWal { .. } => None::<()>, + _ => panic!("unexpected error"), + }) + .ok() + }) + .collect::>(); + assert_eq!(decoded_entries.len(), 2); + let decoded_ops = decoded_entries + .into_iter() + .flatten() + .collect::>(); + assert_eq!(decoded_ops.len(), 4); + for entry in decoded_ops { + writer + .write_namespaced_table_batches(entry.namespace, entry.table_batches) + .expect("batch write should not fail"); } - // The translator should have read all 2 good entries containing 4 write ops - assert_eq!(decoded_entries, 2); - assert_eq!(decoded_ops, 4); - let results = writer.namespaced_output; + let results = &writer.namespaced_output; // Assert that the namespaced writes contain ONLY the following: //