diff --git a/wal/src/blocking.rs b/wal/src/blocking.rs index 309f2bc7aa..f8bf89df0f 100644 --- a/wal/src/blocking.rs +++ b/wal/src/blocking.rs @@ -1,5 +1,5 @@ mod reader; -pub use reader::{Error as ReaderError, Result as ReaderResult, SegmentFileReader}; +pub use reader::{ClosedSegmentFileReader, Error as ReaderError, Result as ReaderResult}; mod writer; pub use writer::{Error as WriterError, OpenSegmentFileWriter, Result as WriterResult}; diff --git a/wal/src/blocking/reader.rs b/wal/src/blocking/reader.rs index b4bfae2f3c..230db8a671 100644 --- a/wal/src/blocking/reader.rs +++ b/wal/src/blocking/reader.rs @@ -9,9 +9,9 @@ use std::{ path::{Path, PathBuf}, }; -pub struct SegmentFileReader(R); +pub struct ClosedSegmentFileReader(R); -impl SegmentFileReader> { +impl ClosedSegmentFileReader> { pub fn from_path(path: impl AsRef) -> Result { let path = path.as_ref(); let f = File::open(path).context(UnableToOpenFileSnafu { path })?; @@ -20,7 +20,7 @@ impl SegmentFileReader> { } } -impl SegmentFileReader +impl ClosedSegmentFileReader where R: Read, { @@ -101,7 +101,7 @@ where } } -impl Iterator for SegmentFileReader +impl Iterator for ClosedSegmentFileReader where R: io::Read, { diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 52149611c2..aedec25aa9 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -224,7 +224,7 @@ pub trait SegmentWal { fn closed_segments(&self) -> &[ClosedSegment]; /// Opens a reader for a given segment from the WAL - async fn reader_for_segment(&self, id: SegmentId) -> Result; + async fn reader_for_segment(&self, id: SegmentId) -> Result; /// Returns a handle to the WAL to commit entries to the currently active segment. async fn write_handle(&self) -> OpenSegment; @@ -245,9 +245,9 @@ impl SegmentWal for Wal { &self.closed_segments } - async fn reader_for_segment(&self, id: SegmentId) -> Result { + async fn reader_for_segment(&self, id: SegmentId) -> Result { let path = fnamex(&self.root, id); - SegmentFileReader::from_path(path).await + ClosedSegmentFileReader::from_path(path).await } async fn write_handle(&self) -> OpenSegment { @@ -430,7 +430,7 @@ impl OpenSegment { } #[derive(Debug)] -enum SegmentFileReaderRequest { +enum ClosedSegmentFileReaderRequest { ReadHeader(oneshot::Sender>), Entries(oneshot::Sender>>), @@ -440,20 +440,20 @@ enum SegmentFileReaderRequest { /// Enables reading a particular closed segment's entries. #[derive(Debug)] -pub struct SegmentFileReader { +pub struct ClosedSegmentFileReader { id: SegmentId, - tx: mpsc::Sender, + tx: mpsc::Sender, task: tokio::task::JoinHandle>, } -impl SegmentFileReader { +impl ClosedSegmentFileReader { async fn from_path(path: impl Into) -> Result { let path = path.into(); - let (tx, rx) = mpsc::channel::(10); + let (tx, rx) = mpsc::channel::(10); let task = tokio::task::spawn_blocking(|| Self::task_main(rx, path)); - let (file_type, id) = Self::one_command(&tx, SegmentFileReaderRequest::ReadHeader) + let (file_type, id) = Self::one_command(&tx, ClosedSegmentFileReaderRequest::ReadHeader) .await? .context(UnableToReadFileHeaderSnafu)?; @@ -468,12 +468,15 @@ impl SegmentFileReader { Ok(Self { id, tx, task }) } - fn task_main(mut rx: mpsc::Receiver, path: PathBuf) -> Result<()> { - let mut reader = blocking::SegmentFileReader::from_path(&path) + fn task_main( + mut rx: mpsc::Receiver, + path: PathBuf, + ) -> Result<()> { + let mut reader = blocking::ClosedSegmentFileReader::from_path(&path) .context(UnableToOpenFileSnafu { path })?; while let Some(req) = rx.blocking_recv() { - use SegmentFileReaderRequest::*; + use ClosedSegmentFileReaderRequest::*; // We don't care if we can't respond to the request. match req { @@ -495,11 +498,11 @@ impl SegmentFileReader { } async fn one_command( - tx: &mpsc::Sender, + tx: &mpsc::Sender, req: Req, ) -> Result where - Req: FnOnce(oneshot::Sender) -> SegmentFileReaderRequest, + Req: FnOnce(oneshot::Sender) -> ClosedSegmentFileReaderRequest, { let (req_tx, req_rx) = oneshot::channel(); tx.send(req(req_tx)) @@ -513,13 +516,13 @@ impl SegmentFileReader { // TODO: Should this return a stream instead of a big vector? async fn entries(&mut self) -> Result> { - Self::one_command(&self.tx, SegmentFileReaderRequest::Entries) + Self::one_command(&self.tx, ClosedSegmentFileReaderRequest::Entries) .await? .context(UnableToReadEntriesSnafu) } pub async fn next_ops(&mut self) -> Result>> { - Self::one_command(&self.tx, SegmentFileReaderRequest::NextOps) + Self::one_command(&self.tx, ClosedSegmentFileReaderRequest::NextOps) .await? .context(UnableToReadNextOpsSnafu) } @@ -565,7 +568,9 @@ mod tests { let closed = sf.rotate().await.unwrap(); - let mut reader = SegmentFileReader::from_path(&closed.path).await.unwrap(); + let mut reader = ClosedSegmentFileReader::from_path(&closed.path) + .await + .unwrap(); let entries = reader.entries().await.unwrap(); assert_eq!( &entries, @@ -604,7 +609,9 @@ mod tests { let closed = segment.rotate().await.unwrap(); - let mut reader = SegmentFileReader::from_path(&closed.path).await.unwrap(); + let mut reader = ClosedSegmentFileReader::from_path(&closed.path) + .await + .unwrap(); let read_ops = reader.next_ops().await.unwrap().unwrap(); assert_eq!(ops, read_ops); }