From d5b439732daab9048f7383dafa32531ebffb3959 Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Mon, 21 Nov 2022 16:00:53 -0500 Subject: [PATCH] feat: Implement delete WAL file --- wal/src/lib.rs | 51 +++++++++++++++++++++++++++++++---------- wal/tests/end_to_end.rs | 14 ++++++++--- 2 files changed, 50 insertions(+), 15 deletions(-) diff --git a/wal/src/lib.rs b/wal/src/lib.rs index 057eb6d716..3682614a27 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -16,7 +16,7 @@ use generated_types::influxdata::{iox::delete::v1::DeletePayload, pbdata::v1::DatabaseBatch}; use serde::{Deserialize, Serialize}; use snafu::prelude::*; -use std::{io, path::PathBuf, time::SystemTime}; +use std::{collections::HashMap, io, path::PathBuf, time::SystemTime}; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; @@ -90,6 +90,15 @@ pub enum Error { filename: String, source: uuid::Error, }, + + SegmentNotFound { + id: SegmentId, + }, + + DeleteClosedSegment { + source: std::io::Error, + path: PathBuf, + }, } /// A specialized `Result` for WAL-related errors @@ -105,7 +114,7 @@ pub type Result = std::result::Result; pub type SequenceNumber = u64; /// Segments are identified by a type 4 UUID -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct SegmentId(Uuid); #[allow(missing_docs)] @@ -169,7 +178,7 @@ const SEGMENT_FILE_EXTENSION: &str = "dat"; #[derive(Debug)] pub struct Wal { root: PathBuf, - closed_segments: RwLock>, + closed_segments: RwLock>, open_segment: OpenSegmentFile, } @@ -193,7 +202,7 @@ impl Wal { .await .context(UnableToReadDirectoryContentsSnafu { path: &root })?; - let mut closed_segments = Vec::new(); + let mut closed_segments = HashMap::new(); while let Some(child) = dir .next_entry() @@ -208,15 +217,16 @@ impl Wal { let child_path = child.path(); let filename = child_path.file_name().unwrap(); let filename = filename.to_str().unwrap(); + let id = Uuid::parse_str(filename) + .context(InvalidUuidSnafu { filename })? + .into(); let segment = ClosedSegment { - id: Uuid::parse_str(filename) - .context(InvalidUuidSnafu { filename })? - .into(), + id, path: child.path(), size: metadata.len(), created_at: metadata.created().context(UnableToReadFileMetadataSnafu)?, }; - closed_segments.push(segment); + closed_segments.insert(id, segment); } } @@ -271,7 +281,13 @@ pub struct WalReader<'a>(&'a Wal); impl<'a> WalReader<'a> { /// Gets a list of the closed segments pub async fn closed_segments(&self) -> Vec { - self.0.closed_segments.read().await.clone() + self.0 + .closed_segments + .read() + .await + .values() + .cloned() + .collect() } /// Opens a reader for a given segment from the WAL @@ -294,13 +310,24 @@ impl<'a> WalRotator<'a> { (), ) .await?; - self.0.closed_segments.write().await.push(closed.clone()); + self.0 + .closed_segments + .write() + .await + .insert(closed.id, closed.clone()); Ok(closed) } /// Deletes the specified segment from disk. - pub async fn delete(&self, _id: SegmentId) -> Result<()> { - todo!(); + pub async fn delete(&self, id: SegmentId) -> Result<()> { + let closed = self + .0 + .closed_segments + .write() + .await + .remove(&id) + .context(SegmentNotFoundSnafu { id })?; + std::fs::remove_file(&closed.path).context(DeleteClosedSegmentSnafu { path: closed.path }) } } diff --git a/wal/tests/end_to_end.rs b/wal/tests/end_to_end.rs index 0bb7b721c1..a18e70f8c6 100644 --- a/wal/tests/end_to_end.rs +++ b/wal/tests/end_to_end.rs @@ -58,12 +58,20 @@ async fn crud() { .unwrap(); let op = reader.next_ops().await.unwrap().unwrap(); assert_eq!(op.sequence_number, 42); - let op = reader.next_ops().await.unwrap().unwrap(); assert_eq!(op.sequence_number, 43); - // Can delete a segment - // wal_rotator.delete() + // Can delete a segment, leaving no closed segments again + wal_rotator + .delete(closed_segment_details.id()) + .await + .unwrap(); + let closed = wal_reader.closed_segments().await; + assert!( + closed.is_empty(), + "Expected empty closed segments; got {:?}", + closed + ); } fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {