refactor: Rename to clarify open segment types
parent
70b12a8112
commit
4335edc6d1
|
@ -2,4 +2,4 @@ mod reader;
|
|||
pub use reader::{Error as ReaderError, Result as ReaderResult, SegmentFileReader};
|
||||
|
||||
mod writer;
|
||||
pub use writer::{Error as WriterError, Result as WriterResult, SegmentFileWriter};
|
||||
pub use writer::{Error as WriterError, OpenSegmentFileWriter, Result as WriterResult};
|
||||
|
|
|
@ -9,14 +9,14 @@ use std::{
|
|||
path::PathBuf,
|
||||
};
|
||||
|
||||
pub struct SegmentFileWriter {
|
||||
pub struct OpenSegmentFileWriter {
|
||||
id: SegmentId,
|
||||
path: PathBuf,
|
||||
f: File,
|
||||
bytes_written: usize,
|
||||
}
|
||||
|
||||
impl SegmentFileWriter {
|
||||
impl OpenSegmentFileWriter {
|
||||
pub fn new_in_directory(dir: impl Into<PathBuf>) -> Result<Self> {
|
||||
let id = SegmentId::new();
|
||||
let path = crate::fnamex(dir, id);
|
||||
|
|
|
@ -162,7 +162,7 @@ const SEGMENT_FILE_EXTENSION: &str = "dat";
|
|||
pub struct Wal {
|
||||
root: PathBuf,
|
||||
closed_segments: Vec<ClosedSegment>,
|
||||
open_segment: SegmentFile,
|
||||
open_segment: OpenSegmentFile,
|
||||
}
|
||||
|
||||
impl Wal {
|
||||
|
@ -203,7 +203,7 @@ impl Wal {
|
|||
}
|
||||
}
|
||||
|
||||
let open_segment = SegmentFile::new_in_directory(&root).await?;
|
||||
let open_segment = OpenSegmentFile::new_in_directory(&root).await?;
|
||||
|
||||
Ok(Self {
|
||||
root,
|
||||
|
@ -331,19 +331,19 @@ pub struct WriteSummary {
|
|||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum SegmentFileWriterRequest {
|
||||
enum OpenSegmentFileWriterRequest {
|
||||
Write(oneshot::Sender<WriteSummary>, Vec<u8>), // todo Bytes
|
||||
Rotate(oneshot::Sender<ClosedSegment>, ()),
|
||||
}
|
||||
|
||||
/// A Segment in a WAL. One segment is stored in one file.
|
||||
/// An open segment in a WAL.
|
||||
#[derive(Debug)]
|
||||
struct SegmentFile {
|
||||
tx: mpsc::Sender<SegmentFileWriterRequest>,
|
||||
struct OpenSegmentFile {
|
||||
tx: mpsc::Sender<OpenSegmentFileWriterRequest>,
|
||||
task: tokio::task::JoinHandle<Result<()>>,
|
||||
}
|
||||
|
||||
impl SegmentFile {
|
||||
impl OpenSegmentFile {
|
||||
async fn new_in_directory(dir: impl Into<PathBuf>) -> Result<Self> {
|
||||
let dir = dir.into();
|
||||
let (tx, rx) = mpsc::channel(10);
|
||||
|
@ -352,14 +352,14 @@ impl SegmentFile {
|
|||
}
|
||||
|
||||
fn task_main(
|
||||
mut rx: tokio::sync::mpsc::Receiver<SegmentFileWriterRequest>,
|
||||
mut rx: tokio::sync::mpsc::Receiver<OpenSegmentFileWriterRequest>,
|
||||
dir: PathBuf,
|
||||
) -> Result<()> {
|
||||
let new_writ = || Ok(blocking::SegmentFileWriter::new_in_directory(&dir).unwrap());
|
||||
let new_writ = || Ok(blocking::OpenSegmentFileWriter::new_in_directory(&dir).unwrap());
|
||||
let mut open_write = new_writ()?;
|
||||
|
||||
while let Some(req) = rx.blocking_recv() {
|
||||
use SegmentFileWriterRequest::*;
|
||||
use OpenSegmentFileWriterRequest::*;
|
||||
|
||||
match req {
|
||||
Write(tx, data) => {
|
||||
|
@ -379,12 +379,12 @@ impl SegmentFile {
|
|||
}
|
||||
|
||||
async fn one_command<Req, Resp, Args>(
|
||||
tx: &mpsc::Sender<SegmentFileWriterRequest>,
|
||||
tx: &mpsc::Sender<OpenSegmentFileWriterRequest>,
|
||||
req: Req,
|
||||
args: Args,
|
||||
) -> Result<Resp>
|
||||
where
|
||||
Req: FnOnce(oneshot::Sender<Resp>, Args) -> SegmentFileWriterRequest,
|
||||
Req: FnOnce(oneshot::Sender<Resp>, Args) -> OpenSegmentFileWriterRequest,
|
||||
{
|
||||
let (req_tx, req_rx) = oneshot::channel();
|
||||
|
||||
|
@ -397,7 +397,7 @@ impl SegmentFile {
|
|||
}
|
||||
|
||||
async fn write(&self, data: &[u8]) -> Result<WriteSummary> {
|
||||
Self::one_command(&self.tx, SegmentFileWriterRequest::Write, data.to_vec()).await
|
||||
Self::one_command(&self.tx, OpenSegmentFileWriterRequest::Write, data.to_vec()).await
|
||||
}
|
||||
|
||||
async fn write_ops(&self, ops: &[SequencedWalOp]) -> Result<WriteSummary> {
|
||||
|
@ -407,17 +407,18 @@ impl SegmentFile {
|
|||
}
|
||||
|
||||
async fn rotate(&self) -> Result<ClosedSegment> {
|
||||
Self::one_command(&self.tx, SegmentFileWriterRequest::Rotate, ()).await
|
||||
Self::one_command(&self.tx, OpenSegmentFileWriterRequest::Rotate, ()).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle to the one currently open segment for users of the WAL to send [`SequencedWalOp`]s to.
|
||||
#[derive(Debug)]
|
||||
pub struct OpenSegment(mpsc::Sender<SegmentFileWriterRequest>);
|
||||
pub struct OpenSegment(mpsc::Sender<OpenSegmentFileWriterRequest>);
|
||||
|
||||
impl OpenSegment {
|
||||
async fn write(&self, data: &[u8]) -> Result<WriteSummary> {
|
||||
SegmentFile::one_command(&self.0, SegmentFileWriterRequest::Write, data.to_vec()).await
|
||||
OpenSegmentFile::one_command(&self.0, OpenSegmentFileWriterRequest::Write, data.to_vec())
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn write_op(&self, op: &SequencedWalOp) -> Result<WriteSummary> {
|
||||
|
@ -554,7 +555,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn segment_file_write_and_read_entries() {
|
||||
let dir = test_helpers::tmp_dir().unwrap();
|
||||
let sf = SegmentFile::new_in_directory(dir.path()).await.unwrap();
|
||||
let sf = OpenSegmentFile::new_in_directory(dir.path()).await.unwrap();
|
||||
|
||||
let data = b"whatevs";
|
||||
let write_summary = sf.write(data).await.unwrap();
|
||||
|
@ -584,7 +585,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn segment_file_write_and_read_ops() {
|
||||
let dir = test_helpers::tmp_dir().unwrap();
|
||||
let segment = SegmentFile::new_in_directory(dir.path()).await.unwrap();
|
||||
let segment = OpenSegmentFile::new_in_directory(dir.path()).await.unwrap();
|
||||
|
||||
let w1 = test_data("m1,t=foo v=1i 1");
|
||||
let w2 = test_data("m1,t=foo v=2i 2");
|
||||
|
|
Loading…
Reference in New Issue