diff --git a/wal/src/lib.rs b/wal/src/lib.rs index b60b85a475..54058d5e2e 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -134,10 +134,19 @@ pub enum Error { UnableToCreateSegmentFile { source: blocking::WriterError, }, +} - UnableToDecodeRecordBatch { +/// Errors that occur when decoding internal types from a WAL file. +#[derive(Debug, Snafu)] +#[snafu(visibility(pub(crate)))] +pub enum DecodeError { + UnableToCreateMutableBatch { source: mutable_batch_pb::decode::Error, }, + + FailedToReadWal { + source: Error, + }, } /// A specialized `Result` for WAL-related errors @@ -588,22 +597,22 @@ impl WriteOpEntryDecoder { /// more entries to be decoded from the underlying segment. A zero-length vector /// may be returned if there are no writes in a WAL entry batch, but does not /// indicate the decoder is consumed. - pub fn next_write_op_entry_batch(&mut self) -> Result>> { - match self.reader.next_batch()? { + pub fn next_write_op_entry_batch(&mut self) -> Result>, DecodeError> { + match self.reader.next_batch().context(FailedToReadWalSnafu)? { Some(batch) => Ok(batch .into_iter() .filter_map(|sequenced_op| match sequenced_op.op { WalOp::Write(w) => Some(w), _ => None, }) - .map(|w| -> Result { + .map(|w| -> Result { Ok(WriteOpEntry { namespace: NamespaceId::new(w.database_id), table_batches: decode_database_batch(&w) - .context(UnableToDecodeRecordBatchSnafu)?, + .context(UnableToCreateMutableBatchSnafu)?, }) }) - .collect::>>()? + .collect::, DecodeError>>()? .into()), None => Ok(None), } diff --git a/wal_inspect/src/lib.rs b/wal_inspect/src/lib.rs index 0b37839d80..c0a5e12e08 100644 --- a/wal_inspect/src/lib.rs +++ b/wal_inspect/src/lib.rs @@ -127,7 +127,7 @@ mod tests { iox::wal::v1::sequenced_wal_op::Op, pbdata::v1::DatabaseBatch, }; use mutable_batch_lp::lines_to_batches; - use wal::{Error as WalError, SequencedWalOp, WriteOpEntryDecoder}; + use wal::{DecodeError, SequencedWalOp, WriteOpEntryDecoder}; use super::*; @@ -289,7 +289,7 @@ mod tests { } }), Err(e) => { - assert_matches!(e, WalError::UnableToReadNextOps { .. }); + assert_matches!(e, DecodeError::FailedToReadWal { .. }); break; } };