refactor(wal): Remove `IncompleteEntry` reader error
parent
a160b97977
commit
f5bfe72c7b
|
@ -194,9 +194,10 @@ impl SegmentedWalOpBatchReader for wal::ClosedSegmentFileReader {
|
||||||
///
|
///
|
||||||
/// # Warnings
|
/// # Warnings
|
||||||
///
|
///
|
||||||
/// This function relies on the [`wal::Error::UnableToReadNextOps`] error
|
/// This function relies on the [`wal::blocking::ReaderError::UnableToReadData`]
|
||||||
/// meaning that there are no more valid completed writes which can be read
|
/// error sourced from an unexpected eof error to mean that there are no more
|
||||||
/// from the provided `batches` and that it is safe to ignore them.
|
/// valid completed writes which can be read from the provided `batches` and
|
||||||
|
/// that it is safe to ignore them.
|
||||||
async fn replay_file<T, F>(
|
async fn replay_file<T, F>(
|
||||||
file: F,
|
file: F,
|
||||||
sink: &T,
|
sink: &T,
|
||||||
|
@ -214,14 +215,15 @@ where
|
||||||
for batch in file {
|
for batch in file {
|
||||||
if let Err(
|
if let Err(
|
||||||
err @ wal::Error::UnableToReadNextOps {
|
err @ wal::Error::UnableToReadNextOps {
|
||||||
source: wal::blocking::ReaderError::IncompleteEntry { .. },
|
source: wal::blocking::ReaderError::UnableToReadData { source: io_err },
|
||||||
},
|
},
|
||||||
) = batch
|
) = &batch
|
||||||
{
|
{
|
||||||
warn!(%err, ?segment_id, "detected truncated WAL write, ending replay for file early");
|
if io_err.kind() == std::io::ErrorKind::UnexpectedEof {
|
||||||
break;
|
warn!(%err, ?segment_id, "detected truncated WAL write, ending replay for file early");
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let ops = batch.map_err(WalReplayError::ReadEntry)?;
|
let ops = batch.map_err(WalReplayError::ReadEntry)?;
|
||||||
|
|
||||||
for op in ops {
|
for op in ops {
|
||||||
|
@ -703,7 +705,7 @@ mod tests {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect()),
|
.collect()),
|
||||||
Err(wal::Error::UnableToReadNextOps {
|
Err(wal::Error::UnableToReadNextOps {
|
||||||
source: wal::blocking::ReaderError::IncompleteEntry {
|
source: wal::blocking::ReaderError::UnableToReadData {
|
||||||
source: std::io::Error::new(
|
source: std::io::Error::new(
|
||||||
std::io::ErrorKind::UnexpectedEof,
|
std::io::ErrorKind::UnexpectedEof,
|
||||||
"gremlins in the drive",
|
"gremlins in the drive",
|
||||||
|
|
|
@ -60,12 +60,8 @@ where
|
||||||
let mut decompressing_read = FrameDecoder::new(hashing_read);
|
let mut decompressing_read = FrameDecoder::new(hashing_read);
|
||||||
|
|
||||||
let mut data = Vec::with_capacity(100);
|
let mut data = Vec::with_capacity(100);
|
||||||
match decompressing_read.read_to_end(&mut data) {
|
let other = decompressing_read.read_to_end(&mut data);
|
||||||
Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
|
other.context(UnableToReadDataSnafu)?;
|
||||||
Err(e).context(IncompleteEntrySnafu)
|
|
||||||
}
|
|
||||||
other => other.context(UnableToReadDataSnafu),
|
|
||||||
}?;
|
|
||||||
|
|
||||||
let (actual_compressed_len, actual_checksum) = decompressing_read.into_inner().checksum();
|
let (actual_compressed_len, actual_checksum) = decompressing_read.into_inner().checksum();
|
||||||
|
|
||||||
|
@ -164,12 +160,6 @@ pub enum Error {
|
||||||
source: io::Error,
|
source: io::Error,
|
||||||
},
|
},
|
||||||
|
|
||||||
/// An [`Error::IncompleteEntry`] error is returned when the reader is unable to
|
|
||||||
/// read an entry because of an unexpected end of file.
|
|
||||||
IncompleteEntry {
|
|
||||||
source: io::Error,
|
|
||||||
},
|
|
||||||
|
|
||||||
LengthMismatch {
|
LengthMismatch {
|
||||||
expected: u64,
|
expected: u64,
|
||||||
actual: u64,
|
actual: u64,
|
||||||
|
@ -199,6 +189,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::{SegmentId, FILE_TYPE_IDENTIFIER};
|
use crate::{SegmentId, FILE_TYPE_IDENTIFIER};
|
||||||
|
use assert_matches::assert_matches;
|
||||||
use byteorder::WriteBytesExt;
|
use byteorder::WriteBytesExt;
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use test_helpers::assert_error;
|
use test_helpers::assert_error;
|
||||||
|
@ -265,7 +256,9 @@ mod tests {
|
||||||
assert_eq!(uuid, segment_file.id.as_bytes());
|
assert_eq!(uuid, segment_file.id.as_bytes());
|
||||||
|
|
||||||
let read_fail = reader.one_entry();
|
let read_fail = reader.one_entry();
|
||||||
assert_error!(read_fail, Error::IncompleteEntry { .. });
|
assert_matches!(read_fail, Err(Error::UnableToReadData { source: e }) => {
|
||||||
|
assert_matches!(e.kind(), std::io::ErrorKind::UnexpectedEof);
|
||||||
|
});
|
||||||
// Trying to continue reading will fail as well, see:
|
// Trying to continue reading will fail as well, see:
|
||||||
// <https://github.com/influxdata/influxdb_iox/issues/6222>
|
// <https://github.com/influxdata/influxdb_iox/issues/6222>
|
||||||
assert_error!(reader.one_entry(), Error::UnableToReadData { .. });
|
assert_error!(reader.one_entry(), Error::UnableToReadData { .. });
|
||||||
|
@ -290,7 +283,9 @@ mod tests {
|
||||||
assert_eq!(uuid, segment_file.id.as_bytes());
|
assert_eq!(uuid, segment_file.id.as_bytes());
|
||||||
|
|
||||||
let read_fail = reader.one_entry();
|
let read_fail = reader.one_entry();
|
||||||
assert_error!(read_fail, Error::IncompleteEntry { .. });
|
assert_matches!(read_fail, Err(Error::UnableToReadData { source: e }) => {
|
||||||
|
assert_matches!(e.kind(), std::io::ErrorKind::UnexpectedEof);
|
||||||
|
});
|
||||||
// Trying to continue reading will fail as well, see:
|
// Trying to continue reading will fail as well, see:
|
||||||
// <https://github.com/influxdata/influxdb_iox/issues/6222>
|
// <https://github.com/influxdata/influxdb_iox/issues/6222>
|
||||||
assert_error!(reader.one_entry(), Error::UnableToReadData { .. });
|
assert_error!(reader.one_entry(), Error::UnableToReadData { .. });
|
||||||
|
|
Loading…
Reference in New Issue