diff --git a/wal/src/lib.rs b/wal/src/lib.rs
index 804d09207e..56bf5feb70 100644
--- a/wal/src/lib.rs
+++ b/wal/src/lib.rs
@@ -147,8 +147,10 @@ pub enum Error {
 /// A specialized `Result` for WAL-related errors
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
+// todo: change to newtypes
 /// SequenceNumber is a u64 monotonically-increasing number provided by users of the WAL for
 /// their tracking purposes of data getting written into a segment.
+// who enforces monotonicity? what happens if WAL receives a lower sequence number?
 pub type SequenceNumber = u64;
 
 /// Segments are identified by a type 4 UUID
@@ -167,7 +169,7 @@ pub enum WalOp {
     Persist(PersistOp),
 }
 
-/// Wal operation with a sequence number, which is used to inform read buffers when to evict data
+/// WAL operation with a sequence number, which is used to inform read buffers when to evict data
 /// from the buffer
 #[derive(Debug, Serialize, Deserialize, PartialEq)]
 pub struct SequencedWalOp {
@@ -175,39 +177,42 @@ pub struct SequencedWalOp {
     pub op: WalOp,
 }
 
-/// Serializable and deserializable persist information that can be saved to the wal. This is
+/// Serializable and deserializable persist information that can be saved to the WAL. This is
 /// used during replay to evict data from memory.
 #[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
 pub struct PersistOp {
+    // todo: use data_types for these
     namespace_id: i64,
     table_id: i64,
     partition_id: i64,
     parquet_file_uuid: String,
 }
 
-/// Methods for reading ops from a segment in a wal
+/// Methods for reading ops from a segment in a WAL
 #[async_trait]
 pub trait OpReader {
     // get the next collection of ops. Since ops are batched into Segments, they come
-    // back as a collection. Each SegmentEntry will encode a vec of WalOps
+    // back as a collection. Each `SegmentEntry` will encode a `Vec<WalOp>`.
+    // todo: change to Result<Vec<WalOp>>, or a stream of `Vec<WalOp>`?
     async fn next(&mut self) -> Result<Option<Vec<WalOp>>>;
 }
 
-/// Methods for a Segment
+/// Methods for a `Segment`
#[async_trait]
 pub trait Segment {
     /// Get the id of the segment
     fn id(&self) -> SegmentId;
 
-    /// Persist an operation into the segment. This is meant ot be an accumulator that will
-    /// batch ops together and return when the collection have been persisted to the segment
+    /// Persist an operation into the segment. The `Segment` trait implementer is meant to be an
+    /// accumulator that will batch ops together, and `write_op` calls will return when the
+    /// collection has been persisted to the segment file.
     async fn write_op(&self, op: &SequencedWalOp) -> Result<WriteSummary>;
 
     /// Return a reader for the ops in the segment
     async fn reader(&self) -> Result<Box<dyn OpReader>>;
 }
 
-/// Methods for working with segments (a wal)
+/// Methods for working with segments (a WAL)
 #[async_trait]
 pub trait SegmentWal {
     /// Closes the currently open segment and opens a new one, returning the closed segment details
@@ -217,7 +222,7 @@ pub trait SegmentWal {
     /// Gets a list of the closed segments
     async fn closed_segments() -> Vec<ClosedSegment>;
 
-    /// Opens a reader for a given segment from the wal
+    /// Opens a reader for a given segment from the WAL
     async fn reader_for_segment(id: SegmentId) -> Result<Box<dyn OpReader>>;
 
     /// Returns a handle to the open segment
@@ -230,7 +235,7 @@ pub trait SegmentWal {
 /// Data for a Segment entry
 #[derive(Debug, Eq, PartialEq)]
 pub struct SegmentEntry {
-    /// The crc checksum of the uncompressed data
+    /// The CRC checksum of the uncompressed data
     pub checksum: u32,
     /// The uncompressed data
     pub data: Vec<u8>,
@@ -240,19 +245,19 @@ pub struct SegmentEntry {
 #[derive(Debug, Copy, Clone)]
 pub struct WriteSummary {
     /// Total size of the segment in bytes
-    pub segment_size_bytes: usize,
-    /// Number of bytes written to segment
+    pub total_bytes: usize,
+    /// Number of bytes written to segment in this write
     pub bytes_written: usize,
     /// Checksum for the compressed data written to segment
     pub checksum: u32,
 }
 
-/// A Segment in a wal, which is a single file
+/// A Segment in a WAL. One segment is stored in one file.
 #[derive(Debug)]
 pub struct SegmentFile {
-    id: Uuid,
+    id: Uuid, // todo: use SegmentId type
     path: PathBuf,
-    state: RwLock<SegmentFileInner>,
+    state: RwLock<SegmentFileInner>, // todo: is this lock needed?
 }
 
 #[derive(Debug)]
@@ -296,13 +301,13 @@ impl SegmentFile {
     async fn write(&self, data: &[u8]) -> Result<WriteSummary> {
         // Only designed to support chunks up to `u32::max` bytes long.
         let uncompressed_len = data.len();
-        let _ = u32::try_from(uncompressed_len).context(ChunkSizeTooLargeSnafu {
+        u32::try_from(uncompressed_len).context(ChunkSizeTooLargeSnafu {
             actual: uncompressed_len,
         })?;
 
         let mut encoder = snap::raw::Encoder::new();
         let compressed_data = encoder
-            .compress_vec(&data)
+            .compress_vec(data)
             .context(UnableToCompressDataSnafu)?;
         let actual_compressed_len = compressed_data.len();
         let actual_compressed_len =
@@ -337,12 +342,13 @@ impl SegmentFile {
 
         Ok(WriteSummary {
             checksum,
-            segment_size_bytes: state.bytes_written,
+            total_bytes: state.bytes_written,
             bytes_written,
         })
     }
 
     async fn write_ops(&self, ops: &[SequencedWalOp]) -> Result<WriteSummary> {
+        // todo: bincode instead of serde_json
         let encoded = serde_json::to_vec(&ops).unwrap();
         self.write(&encoded).await
     }
@@ -367,14 +373,15 @@ impl SegmentFile {
 }
 
 struct SegmentFileReader {
+    // todo: pin necessary?
     f: Pin<Box<dyn AsyncRead>>,
     id: SegmentId,
 }
 
 impl SegmentFileReader {
     pub async fn new(path: impl AsRef<Path>) -> Result<Self> {
-        let path = path.as_ref().to_owned();
-        let f = tokio::fs::File::open(&path)
+        let path = path.as_ref();
+        let f = tokio::fs::File::open(path)
             .await
             .context(UnableToOpenFileSnafu { path })?;
 
@@ -410,27 +417,6 @@ impl SegmentFileReader {
     }
 }
 
-async fn file_to_entries(path: impl AsRef<Path>) -> Result<Vec<SegmentEntry>> {
-    let path = path.as_ref().to_owned();
-    let f = tokio::fs::File::open(&path)
-        .await
-        .context(UnableToOpenFileSnafu { path })?;
-
-    let mut f: Pin<Box<dyn AsyncRead>> = Box::pin(f);
-
-    // read the identifier from the beginning of the file
-    is_segment_stream(&mut f).await?;
-
-    let _ = read_id(&mut f).await?;
-
-    let mut entries = Vec::new();
-    while let Some(entry) = read_entry(&mut f).await? {
-        entries.push(entry);
-    }
-
-    Ok(entries)
-}
-
 async fn is_segment_stream(f: &mut Pin<Box<dyn AsyncRead>>) -> Result<()> {
     let mut header = vec![0u8; FILE_TYPE_IDENTIFIER.len()];
     f.read_exact(&mut header)
@@ -517,21 +503,21 @@ pub struct Wal {
 
 impl Wal {
     pub async fn new(root: impl AsRef<Path>) -> Result<Self> {
-        let root = root.as_ref().to_owned();
-        tokio::fs::create_dir_all(&root)
+        let root = root.as_ref();
+        tokio::fs::create_dir_all(root)
             .await
-            .context(UnableToCreateWalDirSnafu { path: &root })?;
+            .context(UnableToCreateWalDirSnafu { path: root })?;
 
-        let mut dir = tokio::fs::read_dir(&root)
+        let mut dir = tokio::fs::read_dir(root)
             .await
-            .context(UnableToReadDirectoryContentsSnafu { path: &root })?;
+            .context(UnableToReadDirectoryContentsSnafu { path: root })?;
 
         let mut closed_segments = Vec::new();
 
         while let Some(child) = dir
             .next_entry()
             .await
-            .context(UnableToReadDirectoryContentsSnafu { path: &root })?
+            .context(UnableToReadDirectoryContentsSnafu { path: root })?
         {
             let metadata = child
                 .metadata()
@@ -547,10 +533,10 @@ impl Wal {
             }
         }
 
-        let open_segment = Arc::new(SegmentFile::new_writer(&root).await?);
+        let open_segment = Arc::new(SegmentFile::new_writer(root).await?);
 
         Ok(Self {
-            root,
+            root: root.to_owned(),
             closed_segments,
             open_segment,
         })
@@ -563,7 +549,6 @@ mod tests {
     use data_types::{NamespaceId, TableId};
     use dml::DmlWrite;
     use mutable_batch_lp::lines_to_batches;
-    use mutable_batch_pb;
 
     #[tokio::test]
     async fn segment_file_write_and_read_entries() {
@@ -628,6 +613,8 @@ mod tests {
         assert_eq!(ops, read_ops);
     }
 
+    // test delete and persist ops
+
     // open wal and write and read ops from segment
 
     // rotates wal to new segment
@@ -640,8 +627,10 @@ mod tests {
 
     // read segment works even if last entry is truncated
 
+    // writes get batched
+
     fn test_data(lp: &str) -> DatabaseBatch {
-        let batches = lines_to_batches(&lp, 0).unwrap();
+        let batches = lines_to_batches(lp, 0).unwrap();
         let ids = batches
             .keys()
             .enumerate()
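
A rough sketch (not part of the diff) of what the "todo: change to newtypes" comment near the top could look like in practice. The derives and method names below are assumptions rather than existing wal-crate code; the point is only that wrapping the aliases keeps sequence numbers from being mixed up with arbitrary u64s, and segment ids from being mixed up with arbitrary UUIDs.

use uuid::Uuid;

// Hypothetical replacement for `pub type SequenceNumber = u64;`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SequenceNumber(u64);

impl SequenceNumber {
    pub fn new(n: u64) -> Self {
        Self(n)
    }

    pub fn get(&self) -> u64 {
        self.0
    }
}

// Hypothetical replacement for the `SegmentId` alias over a type 4 UUID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SegmentId(Uuid);

impl SegmentId {
    pub fn new() -> Self {
        Self(Uuid::new_v4())
    }
}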
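
Similarly, one possible shape for the "todo: use data_types for these" comment on PersistOp, keeping the field names from the diff. The test module already imports NamespaceId and TableId from data_types, and PartitionId lives in the same crate, but this sketch also assumes those id newtypes implement (or can be made to implement) Serialize/Deserialize, which the diff does not confirm.

use data_types::{NamespaceId, PartitionId, TableId};
use serde::{Deserialize, Serialize};

// PersistOp with typed ids instead of raw i64s.
#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
pub struct PersistOp {
    namespace_id: NamespaceId,
    table_id: TableId,
    partition_id: PartitionId,
    parquet_file_uuid: String,
}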
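
The hunks around SegmentFile::write only show fragments of its length-guard / compress / checksum sequence, so here is a standalone sketch of that flow under stated assumptions: crc32fast is a guess (the diff only shows that a CRC checksum ends up in WriteSummary), and the local error enum below is an invented stand-in that just mirrors the ChunkSizeTooLargeSnafu / UnableToCompressDataSnafu selectors used in the diff.

use snafu::{ensure, ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum FrameError {
    #[snafu(display("chunk of {actual} bytes is larger than u32::MAX"))]
    ChunkSizeTooLarge { actual: usize },

    #[snafu(display("unable to compress data"))]
    UnableToCompressData { source: snap::Error },
}

/// Returns the snappy-compressed bytes plus a CRC32 over the compressed bytes,
/// matching the `WriteSummary` doc ("Checksum for the compressed data written to segment").
fn frame_entry(data: &[u8]) -> Result<(Vec<u8>, u32), FrameError> {
    // Entries are only designed to support chunks up to `u32::MAX` bytes long.
    ensure!(
        u32::try_from(data.len()).is_ok(),
        ChunkSizeTooLargeSnafu { actual: data.len() }
    );

    // Same snap raw encoder the diff uses.
    let compressed = snap::raw::Encoder::new()
        .compress_vec(data)
        .context(UnableToCompressDataSnafu)?;

    // crc32fast is an assumption; any CRC32 implementation would do here.
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&compressed);

    Ok((compressed, hasher.finalize()))
}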