refactor(wal): Implement `Iterator` for ClosedSegmentFileReader
The ClosedSegmentFileReader is pretty much an iterator anyways, this just enables using all the juicy combinators with it more easily.pull/24376/head
parent
c8a7a8ec91
commit
fa69994358
|
@ -189,15 +189,15 @@ where
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let ops = match file.next_batch() {
|
let ops = match file.next() {
|
||||||
Ok(Some(v)) => v,
|
Some(Ok(v)) => v,
|
||||||
Ok(None) => {
|
Some(Err(e)) => return Err(WalReplayError::ReadEntry(e)),
|
||||||
|
None => {
|
||||||
// This file is complete, return the last observed sequence
|
// This file is complete, return the last observed sequence
|
||||||
// number.
|
// number.
|
||||||
debug!("wal file replayed in {:?}", start.elapsed());
|
debug!("wal file replayed in {:?}", start.elapsed());
|
||||||
return Ok(max_sequence);
|
return Ok(max_sequence);
|
||||||
}
|
}
|
||||||
Err(e) => return Err(WalReplayError::ReadEntry(e)),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
for op in ops {
|
for op in ops {
|
||||||
|
|
|
@ -219,7 +219,7 @@ mod tests {
|
||||||
|
|
||||||
// Obtain all the ops in the file
|
// Obtain all the ops in the file
|
||||||
let mut ops = Vec::new();
|
let mut ops = Vec::new();
|
||||||
while let Ok(Some(mut batch)) = reader.next_batch() {
|
while let Some(Ok(mut batch)) = reader.next() {
|
||||||
ops.append(&mut batch);
|
ops.append(&mut batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -428,12 +428,7 @@ async fn graceful_shutdown() {
|
||||||
.expect("failed to open wal segment");
|
.expect("failed to open wal segment");
|
||||||
|
|
||||||
// Assert the file contains no operations
|
// Assert the file contains no operations
|
||||||
assert_matches!(
|
assert_matches!(reader.next(), None);
|
||||||
reader
|
|
||||||
.next_batch()
|
|
||||||
.expect("failed to read wal segment contents"),
|
|
||||||
None
|
|
||||||
);
|
|
||||||
|
|
||||||
// Validate the parquet files were added to the catalog during shutdown.
|
// Validate the parquet files were added to the catalog during shutdown.
|
||||||
let parquet_files = catalog
|
let parquet_files = catalog
|
||||||
|
|
|
@ -553,12 +553,19 @@ pub struct ClosedSegmentFileReader {
|
||||||
file: RawClosedSegmentFileReader<BufReader<File>>,
|
file: RawClosedSegmentFileReader<BufReader<File>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ClosedSegmentFileReader {
|
impl Iterator for ClosedSegmentFileReader {
|
||||||
/// Get the next batch of sequenced wal ops from the file
|
type Item = Result<Vec<SequencedWalOp>>;
|
||||||
pub fn next_batch(&mut self) -> Result<Option<Vec<SequencedWalOp>>> {
|
|
||||||
self.file.next_batch().context(UnableToReadNextOpsSnafu)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/// Read the next batch of sequenced WAL operations from the file
|
||||||
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
|
self.file
|
||||||
|
.next_batch()
|
||||||
|
.context(UnableToReadNextOpsSnafu)
|
||||||
|
.transpose()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClosedSegmentFileReader {
|
||||||
/// Return the segment file id
|
/// Return the segment file id
|
||||||
pub fn id(&self) -> SegmentId {
|
pub fn id(&self) -> SegmentId {
|
||||||
self.id
|
self.id
|
||||||
|
@ -624,9 +631,8 @@ impl Iterator for WriteOpEntryDecoder {
|
||||||
fn next(&mut self) -> Option<Self::Item> {
|
fn next(&mut self) -> Option<Self::Item> {
|
||||||
Some(
|
Some(
|
||||||
self.reader
|
self.reader
|
||||||
.next_batch()
|
.next()?
|
||||||
.context(FailedToReadWalSnafu)
|
.context(FailedToReadWalSnafu)
|
||||||
.transpose()?
|
|
||||||
.map(|batch| {
|
.map(|batch| {
|
||||||
batch
|
batch
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -730,7 +736,7 @@ mod tests {
|
||||||
let mut reader = wal.reader_for_segment(closed.id).unwrap();
|
let mut reader = wal.reader_for_segment(closed.id).unwrap();
|
||||||
|
|
||||||
let mut ops = vec![];
|
let mut ops = vec![];
|
||||||
while let Ok(Some(mut batch)) = reader.next_batch() {
|
while let Some(Ok(mut batch)) = reader.next() {
|
||||||
ops.append(&mut batch);
|
ops.append(&mut batch);
|
||||||
}
|
}
|
||||||
assert_eq!(vec![op1, op2, op3, op4], ops);
|
assert_eq!(vec![op1, op2, op3, op4], ops);
|
||||||
|
@ -796,7 +802,7 @@ mod tests {
|
||||||
|
|
||||||
// There aren't any entries in the closed segment because nothing was written
|
// There aren't any entries in the closed segment because nothing was written
|
||||||
let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap();
|
let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap();
|
||||||
assert!(reader.next_batch().unwrap().is_none());
|
assert!(reader.next().is_none());
|
||||||
|
|
||||||
// Can delete an empty segment, leaving no closed segments again
|
// Can delete an empty segment, leaving no closed segments again
|
||||||
wal.delete(closed_segment_details.id()).await.unwrap();
|
wal.delete(closed_segment_details.id()).await.unwrap();
|
||||||
|
|
|
@ -57,13 +57,13 @@ async fn crud() {
|
||||||
// ensuring the per-partition sequence numbers match up to the current
|
// ensuring the per-partition sequence numbers match up to the current
|
||||||
// op-level sequence number while it is the source of truth.
|
// op-level sequence number while it is the source of truth.
|
||||||
let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap();
|
let mut reader = wal.reader_for_segment(closed_segment_details.id()).unwrap();
|
||||||
let op = reader.next_batch().unwrap().unwrap();
|
let op = reader.next().unwrap().unwrap();
|
||||||
assert_eq!(op[0].sequence_number, 42);
|
assert_eq!(op[0].sequence_number, 42);
|
||||||
op[0]
|
op[0]
|
||||||
.table_write_sequence_numbers
|
.table_write_sequence_numbers
|
||||||
.values()
|
.values()
|
||||||
.for_each(|sequence_number| assert_eq!(*sequence_number, op[0].sequence_number));
|
.for_each(|sequence_number| assert_eq!(*sequence_number, op[0].sequence_number));
|
||||||
let op = reader.next_batch().unwrap().unwrap();
|
let op = reader.next().unwrap().unwrap();
|
||||||
assert_eq!(op[0].sequence_number, 43);
|
assert_eq!(op[0].sequence_number, 43);
|
||||||
op[0]
|
op[0]
|
||||||
.table_write_sequence_numbers
|
.table_write_sequence_numbers
|
||||||
|
@ -107,7 +107,7 @@ async fn replay() {
|
||||||
// ensuring the per-partition sequence numbers match up to the current
|
// ensuring the per-partition sequence numbers match up to the current
|
||||||
// op-level sequence number while it is the source of truth.
|
// op-level sequence number while it is the source of truth.
|
||||||
let mut reader = wal.reader_for_segment(closed_segment_ids[0]).unwrap();
|
let mut reader = wal.reader_for_segment(closed_segment_ids[0]).unwrap();
|
||||||
let op = reader.next_batch().unwrap().unwrap();
|
let op = reader.next().unwrap().unwrap();
|
||||||
assert_eq!(op[0].sequence_number, 42);
|
assert_eq!(op[0].sequence_number, 42);
|
||||||
op[0]
|
op[0]
|
||||||
.table_write_sequence_numbers
|
.table_write_sequence_numbers
|
||||||
|
@ -116,7 +116,7 @@ async fn replay() {
|
||||||
|
|
||||||
// Can read the written entries from the previously open segment
|
// Can read the written entries from the previously open segment
|
||||||
let mut reader = wal.reader_for_segment(closed_segment_ids[1]).unwrap();
|
let mut reader = wal.reader_for_segment(closed_segment_ids[1]).unwrap();
|
||||||
let op = reader.next_batch().unwrap().unwrap();
|
let op = reader.next().unwrap().unwrap();
|
||||||
assert_eq!(op[0].sequence_number, 43);
|
assert_eq!(op[0].sequence_number, 43);
|
||||||
op[0]
|
op[0]
|
||||||
.table_write_sequence_numbers
|
.table_write_sequence_numbers
|
||||||
|
|
Loading…
Reference in New Issue