refactor: Rename to clarify closed segment types

pull/24376/head
Carol (Nichols || Goulding) 2022-11-21 10:49:01 -05:00
parent 4335edc6d1
commit be2207bcd3
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
3 changed files with 30 additions and 23 deletions

View File

@ -1,5 +1,5 @@
mod reader;
pub use reader::{Error as ReaderError, Result as ReaderResult, SegmentFileReader};
pub use reader::{ClosedSegmentFileReader, Error as ReaderError, Result as ReaderResult};
mod writer;
pub use writer::{Error as WriterError, OpenSegmentFileWriter, Result as WriterResult};

View File

@ -9,9 +9,9 @@ use std::{
path::{Path, PathBuf},
};
pub struct SegmentFileReader<R>(R);
pub struct ClosedSegmentFileReader<R>(R);
impl SegmentFileReader<BufReader<File>> {
impl ClosedSegmentFileReader<BufReader<File>> {
pub fn from_path(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref();
let f = File::open(path).context(UnableToOpenFileSnafu { path })?;
@ -20,7 +20,7 @@ impl SegmentFileReader<BufReader<File>> {
}
}
impl<R> SegmentFileReader<R>
impl<R> ClosedSegmentFileReader<R>
where
R: Read,
{
@ -101,7 +101,7 @@ where
}
}
impl<R> Iterator for SegmentFileReader<R>
impl<R> Iterator for ClosedSegmentFileReader<R>
where
R: io::Read,
{

View File

@ -224,7 +224,7 @@ pub trait SegmentWal {
fn closed_segments(&self) -> &[ClosedSegment];
/// Opens a reader for a given segment from the WAL
async fn reader_for_segment(&self, id: SegmentId) -> Result<SegmentFileReader>;
async fn reader_for_segment(&self, id: SegmentId) -> Result<ClosedSegmentFileReader>;
/// Returns a handle to the WAL to commit entries to the currently active segment.
async fn write_handle(&self) -> OpenSegment;
@ -245,9 +245,9 @@ impl SegmentWal for Wal {
&self.closed_segments
}
async fn reader_for_segment(&self, id: SegmentId) -> Result<SegmentFileReader> {
async fn reader_for_segment(&self, id: SegmentId) -> Result<ClosedSegmentFileReader> {
let path = fnamex(&self.root, id);
SegmentFileReader::from_path(path).await
ClosedSegmentFileReader::from_path(path).await
}
async fn write_handle(&self) -> OpenSegment {
@ -430,7 +430,7 @@ impl OpenSegment {
}
#[derive(Debug)]
enum SegmentFileReaderRequest {
enum ClosedSegmentFileReaderRequest {
ReadHeader(oneshot::Sender<blocking::ReaderResult<(FileTypeIdentifier, uuid::Bytes)>>),
Entries(oneshot::Sender<blocking::ReaderResult<Vec<SegmentEntry>>>),
@ -440,20 +440,20 @@ enum SegmentFileReaderRequest {
/// Enables reading a particular closed segment's entries.
#[derive(Debug)]
pub struct SegmentFileReader {
pub struct ClosedSegmentFileReader {
id: SegmentId,
tx: mpsc::Sender<SegmentFileReaderRequest>,
tx: mpsc::Sender<ClosedSegmentFileReaderRequest>,
task: tokio::task::JoinHandle<Result<()>>,
}
impl SegmentFileReader {
impl ClosedSegmentFileReader {
async fn from_path(path: impl Into<PathBuf>) -> Result<Self> {
let path = path.into();
let (tx, rx) = mpsc::channel::<SegmentFileReaderRequest>(10);
let (tx, rx) = mpsc::channel::<ClosedSegmentFileReaderRequest>(10);
let task = tokio::task::spawn_blocking(|| Self::task_main(rx, path));
let (file_type, id) = Self::one_command(&tx, SegmentFileReaderRequest::ReadHeader)
let (file_type, id) = Self::one_command(&tx, ClosedSegmentFileReaderRequest::ReadHeader)
.await?
.context(UnableToReadFileHeaderSnafu)?;
@ -468,12 +468,15 @@ impl SegmentFileReader {
Ok(Self { id, tx, task })
}
fn task_main(mut rx: mpsc::Receiver<SegmentFileReaderRequest>, path: PathBuf) -> Result<()> {
let mut reader = blocking::SegmentFileReader::from_path(&path)
fn task_main(
mut rx: mpsc::Receiver<ClosedSegmentFileReaderRequest>,
path: PathBuf,
) -> Result<()> {
let mut reader = blocking::ClosedSegmentFileReader::from_path(&path)
.context(UnableToOpenFileSnafu { path })?;
while let Some(req) = rx.blocking_recv() {
use SegmentFileReaderRequest::*;
use ClosedSegmentFileReaderRequest::*;
// We don't care if we can't respond to the request.
match req {
@ -495,11 +498,11 @@ impl SegmentFileReader {
}
async fn one_command<Req, Resp>(
tx: &mpsc::Sender<SegmentFileReaderRequest>,
tx: &mpsc::Sender<ClosedSegmentFileReaderRequest>,
req: Req,
) -> Result<Resp>
where
Req: FnOnce(oneshot::Sender<Resp>) -> SegmentFileReaderRequest,
Req: FnOnce(oneshot::Sender<Resp>) -> ClosedSegmentFileReaderRequest,
{
let (req_tx, req_rx) = oneshot::channel();
tx.send(req(req_tx))
@ -513,13 +516,13 @@ impl SegmentFileReader {
// TODO: Should this return a stream instead of a big vector?
async fn entries(&mut self) -> Result<Vec<SegmentEntry>> {
Self::one_command(&self.tx, SegmentFileReaderRequest::Entries)
Self::one_command(&self.tx, ClosedSegmentFileReaderRequest::Entries)
.await?
.context(UnableToReadEntriesSnafu)
}
pub async fn next_ops(&mut self) -> Result<Option<Vec<SequencedWalOp>>> {
Self::one_command(&self.tx, SegmentFileReaderRequest::NextOps)
Self::one_command(&self.tx, ClosedSegmentFileReaderRequest::NextOps)
.await?
.context(UnableToReadNextOpsSnafu)
}
@ -565,7 +568,9 @@ mod tests {
let closed = sf.rotate().await.unwrap();
let mut reader = SegmentFileReader::from_path(&closed.path).await.unwrap();
let mut reader = ClosedSegmentFileReader::from_path(&closed.path)
.await
.unwrap();
let entries = reader.entries().await.unwrap();
assert_eq!(
&entries,
@ -604,7 +609,9 @@ mod tests {
let closed = segment.rotate().await.unwrap();
let mut reader = SegmentFileReader::from_path(&closed.path).await.unwrap();
let mut reader = ClosedSegmentFileReader::from_path(&closed.path)
.await
.unwrap();
let read_ops = reader.next_ops().await.unwrap().unwrap();
assert_eq!(ops, read_ops);
}