fix: Cleanups, notes, and clarifications noticed while reading

pull/24376/head
Carol (Nichols || Goulding) 2022-11-17 15:51:23 -05:00
parent ef6eda6399
commit d7134d4846
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
1 changed file with 40 additions and 51 deletions


@@ -147,8 +147,10 @@ pub enum Error {
/// A specialized `Result` for WAL-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
// todo: change to newtypes
/// A `SequenceNumber` is a monotonically increasing u64 provided by users of the WAL so they
/// can track which data has been written into a segment.
// who enforces monotonicity? what happens if WAL receives a lower sequence number?
pub type SequenceNumber = u64;
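One shape the newtype todo above could take, as a minimal sketch (hypothetical, not part of this commit): a tuple struct replacing the alias above, so sequence numbers can't be mixed up with arbitrary u64 values.

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SequenceNumber(u64);

impl SequenceNumber {
    /// Wrap a raw value provided by the WAL user
    pub fn new(v: u64) -> Self {
        Self(v)
    }

    /// Return the raw value, e.g. for serialization
    pub fn get(&self) -> u64 {
        self.0
    }
}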
/// Segments are identified by a type 4 UUID
@@ -167,7 +169,7 @@ pub enum WalOp {
Persist(PersistOp),
}
/// Wal operation with a sequence number, which is used to inform read buffers when to evict data
/// WAL operation with a sequence number, which is used to inform read buffers when to evict data
/// from the buffer
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub struct SequencedWalOp {
@@ -175,39 +177,42 @@ pub struct SequencedWalOp {
pub op: WalOp,
}
/// Serializable and deserializable persist information that can be saved to the wal. This is
/// Serializable and deserializable persist information that can be saved to the WAL. This is
/// used during replay to evict data from memory.
#[derive(Debug, Deserialize, Serialize, Eq, PartialEq)]
pub struct PersistOp {
// todo: use data_types for these
namespace_id: i64,
table_id: i64,
partition_id: i64,
parquet_file_uuid: String,
}
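A sketch of what the data_types todo above might look like. `NamespaceId` and `TableId` are imported by the tests below; `PartitionId` is assumed here to live in the same crate.

use data_types::{NamespaceId, PartitionId, TableId};

struct TypedPersistOp {
    namespace_id: NamespaceId,
    table_id: TableId,
    partition_id: PartitionId,
    parquet_file_uuid: String,
}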
/// Methods for reading ops from a segment in a wal
/// Methods for reading ops from a segment in a WAL
#[async_trait]
pub trait OpReader {
// get the next collection of ops. Since ops are batched into Segments, they come
// back as a collection. Each SegmentEntry will encode a vec of WalOps
// back as a collection. Each `SegmentEntry` will encode a `Vec<WalOp>`.
// todo: change to Result<Vec<WalOp>>, or a stream of `Vec<WalOp>`?
async fn next(&mut self) -> Result<Option<Vec<WalOp>>>;
}
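For illustration, draining a reader with this interface could look like the following sketch (assuming only the trait as declared above):

async fn all_ops(mut reader: Box<dyn OpReader>) -> Result<Vec<WalOp>> {
    let mut all = Vec::new();
    // Each call yields one batch (one `SegmentEntry`'s worth of ops)
    // until the segment is exhausted.
    while let Some(batch) = reader.next().await? {
        all.extend(batch);
    }
    Ok(all)
}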
/// Methods for a Segment
/// Methods for a `Segment`
#[async_trait]
pub trait Segment {
/// Get the id of the segment
fn id(&self) -> SegmentId;
/// Persist an operation into the segment. This is meant ot be an accumulator that will
/// batch ops together and return when the collection have been persisted to the segment
/// Persist an operation into the segment. The `Segment` trait implementer is meant to be an
/// accumulator that will batch ops together, and `write_op` calls will return when the
/// collection has been persisted to the segment file.
async fn write_op(&self, op: &SequencedWalOp) -> Result<WriteSummary>;
/// Return a reader for the ops in the segment
async fn reader(&self) -> Result<Box<dyn OpReader>>;
}
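A hypothetical caller, showing the accumulator contract described above: the await completes only once the batch containing this op has been persisted.

async fn append(segment: &dyn Segment, op: SequencedWalOp) -> Result<u32> {
    let summary = segment.write_op(&op).await?;
    // The checksum covers the compressed batch this op was written in
    Ok(summary.checksum)
}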
/// Methods for working with segments (a wal)
/// Methods for working with segments (a WAL)
#[async_trait]
pub trait SegmentWal {
/// Closes the currently open segment and opens a new one, returning the closed segment details
@@ -217,7 +222,7 @@ pub trait SegmentWal {
/// Gets a list of the closed segments
async fn closed_segments() -> Vec<ClosedSegment>;
/// Opens a reader for a given segment from the wal
/// Opens a reader for a given segment from the WAL
async fn reader_for_segment(id: SegmentId) -> Result<Box<dyn OpReader>>;
/// Returns a handle to the open segment
@@ -230,7 +235,7 @@ pub trait SegmentWal {
/// Data for a Segment entry
#[derive(Debug, Eq, PartialEq)]
pub struct SegmentEntry {
/// The crc checksum of the uncompressed data
/// The CRC checksum of the uncompressed data
pub checksum: u32,
/// The uncompressed data
pub data: Vec<u8>,
@@ -240,19 +245,19 @@ pub struct SegmentEntry {
#[derive(Debug, Copy, Clone)]
pub struct WriteSummary {
/// Total size of the segment in bytes
pub segment_size_bytes: usize,
/// Number of bytes written to segment
pub total_bytes: usize,
/// Number of bytes written to segment in this write
pub bytes_written: usize,
/// Checksum for the compressed data written to segment
pub checksum: u32,
}
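The checksum fields above are CRC32s; computing one looks like this sketch using the crc32fast crate (an assumption; the crate this file actually uses is not visible in these hunks):

fn crc32(data: &[u8]) -> u32 {
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(data);
    hasher.finalize()
}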
/// A Segment in a wal, which is a single file
/// A Segment in a WAL. One segment is stored in one file.
#[derive(Debug)]
pub struct SegmentFile {
id: Uuid,
id: Uuid, // todo: use SegmentId type
path: PathBuf,
state: RwLock<SegmentFileInner>,
state: RwLock<SegmentFileInner>, // todo: is this lock needed?
}
#[derive(Debug)]
@@ -296,13 +301,13 @@ impl SegmentFile {
async fn write(&self, data: &[u8]) -> Result<WriteSummary> {
// Only designed to support chunks up to `u32::MAX` bytes long.
let uncompressed_len = data.len();
let _ = u32::try_from(uncompressed_len).context(ChunkSizeTooLargeSnafu {
u32::try_from(uncompressed_len).context(ChunkSizeTooLargeSnafu {
actual: uncompressed_len,
})?;
let mut encoder = snap::raw::Encoder::new();
let compressed_data = encoder
.compress_vec(&data)
.compress_vec(data)
.context(UnableToCompressDataSnafu)?;
let actual_compressed_len = compressed_data.len();
let actual_compressed_len =
@@ -337,12 +342,13 @@ impl SegmentFile {
Ok(WriteSummary {
checksum,
segment_size_bytes: state.bytes_written,
total_bytes: state.bytes_written,
bytes_written,
})
}
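For reference, a sketch of the raw (unframed) Snappy round trip that `write` relies on, using the same snap crate calls as the code above:

fn snap_round_trip(data: &[u8]) -> Vec<u8> {
    // Compress with the raw format, exactly as `write` does
    let compressed = snap::raw::Encoder::new()
        .compress_vec(data)
        .expect("compressing an in-memory buffer should succeed");
    // The read path performs the matching decompression per entry
    snap::raw::Decoder::new()
        .decompress_vec(&compressed)
        .expect("decompressing freshly compressed data should succeed")
}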
async fn write_ops(&self, ops: &[SequencedWalOp]) -> Result<WriteSummary> {
// todo: bincode instead of serde_json
let encoded = serde_json::to_vec(&ops).unwrap();
self.write(&encoded).await
}
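The bincode todo above could look like this module-internal sketch (assuming a bincode 1.x dependency, which these hunks do not show being declared):

async fn write_ops_bincode(segment: &SegmentFile, ops: &[SequencedWalOp]) -> Result<WriteSummary> {
    // bincode yields a compact binary encoding instead of JSON text
    let encoded = bincode::serialize(ops).expect("encoding in-memory ops should not fail");
    segment.write(&encoded).await
}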
@@ -367,14 +373,15 @@ impl SegmentFile {
}
struct SegmentFileReader {
// todo: pin necessary?
f: Pin<Box<dyn AsyncRead>>,
id: SegmentId,
}
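One possible answer to the pin todo: tokio's AsyncReadExt helpers only require the reader to be Unpin, so an Unpin bound on the boxed trait object works too. A sketch, not part of this commit:

struct SegmentFileReaderAlt {
    // `Pin<Box<dyn AsyncRead>>` achieves the same effect, since the
    // pinned box itself is Unpin.
    f: Box<dyn tokio::io::AsyncRead + Unpin>,
    id: SegmentId,
}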
impl SegmentFileReader {
pub async fn new(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_owned();
let f = tokio::fs::File::open(&path)
let path = path.as_ref();
let f = tokio::fs::File::open(path)
.await
.context(UnableToOpenFileSnafu { path })?;
@@ -410,27 +417,6 @@ impl SegmentFileReader {
}
}
async fn file_to_entries(path: impl AsRef<Path>) -> Result<Vec<SegmentEntry>> {
let path = path.as_ref().to_owned();
let f = tokio::fs::File::open(&path)
.await
.context(UnableToOpenFileSnafu { path })?;
let mut f: Pin<Box<dyn AsyncRead>> = Box::pin(f);
// read the identifier from the beginning of the file
is_segment_stream(&mut f).await?;
let _ = read_id(&mut f).await?;
let mut entries = Vec::new();
while let Some(entry) = read_entry(&mut f).await? {
entries.push(entry);
}
Ok(entries)
}
async fn is_segment_stream(f: &mut Pin<Box<dyn AsyncRead>>) -> Result<()> {
let mut header = vec![0u8; FILE_TYPE_IDENTIFIER.len()];
f.read_exact(&mut header)
@@ -517,21 +503,21 @@ pub struct Wal {
impl Wal {
pub async fn new(root: impl AsRef<Path>) -> Result<Self> {
let root = root.as_ref().to_owned();
tokio::fs::create_dir_all(&root)
let root = root.as_ref();
tokio::fs::create_dir_all(root)
.await
.context(UnableToCreateWalDirSnafu { path: &root })?;
.context(UnableToCreateWalDirSnafu { path: root })?;
let mut dir = tokio::fs::read_dir(&root)
let mut dir = tokio::fs::read_dir(root)
.await
.context(UnableToReadDirectoryContentsSnafu { path: &root })?;
.context(UnableToReadDirectoryContentsSnafu { path: root })?;
let mut closed_segments = Vec::new();
while let Some(child) = dir
.next_entry()
.await
.context(UnableToReadDirectoryContentsSnafu { path: &root })?
.context(UnableToReadDirectoryContentsSnafu { path: root })?
{
let metadata = child
.metadata()
@@ -547,10 +533,10 @@ impl Wal {
}
}
let open_segment = Arc::new(SegmentFile::new_writer(&root).await?);
let open_segment = Arc::new(SegmentFile::new_writer(root).await?);
Ok(Self {
root,
root: root.to_owned(),
closed_segments,
open_segment,
})
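Hypothetical usage, with an invented directory path:

async fn open_wal() -> Result<Wal> {
    // Creates the directory if needed, collects existing files as closed
    // segments, and opens a fresh segment for writing
    Wal::new("/var/lib/iox/wal").await
}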
@@ -563,7 +549,6 @@ mod tests {
use data_types::{NamespaceId, TableId};
use dml::DmlWrite;
use mutable_batch_lp::lines_to_batches;
use mutable_batch_pb;
#[tokio::test]
async fn segment_file_write_and_read_entries() {
@@ -628,6 +613,8 @@ mod tests {
assert_eq!(ops, read_ops);
}
// test delete and persist ops
// open wal and write and read ops from segment
// rotates wal to new segment
@@ -640,8 +627,10 @@ mod tests {
// read segment works even if last entry is truncated
// writes get batched
fn test_data(lp: &str) -> DatabaseBatch {
let batches = lines_to_batches(&lp, 0).unwrap();
let batches = lines_to_batches(lp, 0).unwrap();
let ids = batches
.keys()
.enumerate()