feat: Implement delete WAL file

pull/24376/head
Carol (Nichols || Goulding) 2022-11-21 16:00:53 -05:00
parent a2c25f5191
commit d5b439732d
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
2 changed files with 50 additions and 15 deletions

View File

@ -16,7 +16,7 @@
use generated_types::influxdata::{iox::delete::v1::DeletePayload, pbdata::v1::DatabaseBatch};
use serde::{Deserialize, Serialize};
use snafu::prelude::*;
use std::{io, path::PathBuf, time::SystemTime};
use std::{collections::HashMap, io, path::PathBuf, time::SystemTime};
use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;
@ -90,6 +90,15 @@ pub enum Error {
filename: String,
source: uuid::Error,
},
SegmentNotFound {
id: SegmentId,
},
DeleteClosedSegment {
source: std::io::Error,
path: PathBuf,
},
}
/// A specialized `Result` for WAL-related errors
@ -105,7 +114,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
pub type SequenceNumber = u64;
/// Segments are identified by a type 4 UUID
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SegmentId(Uuid);
#[allow(missing_docs)]
@ -169,7 +178,7 @@ const SEGMENT_FILE_EXTENSION: &str = "dat";
#[derive(Debug)]
pub struct Wal {
root: PathBuf,
closed_segments: RwLock<Vec<ClosedSegment>>,
closed_segments: RwLock<HashMap<SegmentId, ClosedSegment>>,
open_segment: OpenSegmentFile,
}
@ -193,7 +202,7 @@ impl Wal {
.await
.context(UnableToReadDirectoryContentsSnafu { path: &root })?;
let mut closed_segments = Vec::new();
let mut closed_segments = HashMap::new();
while let Some(child) = dir
.next_entry()
@ -208,15 +217,16 @@ impl Wal {
let child_path = child.path();
let filename = child_path.file_name().unwrap();
let filename = filename.to_str().unwrap();
let id = Uuid::parse_str(filename)
.context(InvalidUuidSnafu { filename })?
.into();
let segment = ClosedSegment {
id: Uuid::parse_str(filename)
.context(InvalidUuidSnafu { filename })?
.into(),
id,
path: child.path(),
size: metadata.len(),
created_at: metadata.created().context(UnableToReadFileMetadataSnafu)?,
};
closed_segments.push(segment);
closed_segments.insert(id, segment);
}
}
@ -271,7 +281,13 @@ pub struct WalReader<'a>(&'a Wal);
impl<'a> WalReader<'a> {
/// Gets a list of the closed segments
pub async fn closed_segments(&self) -> Vec<ClosedSegment> {
self.0.closed_segments.read().await.clone()
self.0
.closed_segments
.read()
.await
.values()
.cloned()
.collect()
}
/// Opens a reader for a given segment from the WAL
@ -294,13 +310,24 @@ impl<'a> WalRotator<'a> {
(),
)
.await?;
self.0.closed_segments.write().await.push(closed.clone());
self.0
.closed_segments
.write()
.await
.insert(closed.id, closed.clone());
Ok(closed)
}
/// Deletes the specified segment from disk.
pub async fn delete(&self, _id: SegmentId) -> Result<()> {
todo!();
pub async fn delete(&self, id: SegmentId) -> Result<()> {
let closed = self
.0
.closed_segments
.write()
.await
.remove(&id)
.context(SegmentNotFoundSnafu { id })?;
std::fs::remove_file(&closed.path).context(DeleteClosedSegmentSnafu { path: closed.path })
}
}

View File

@ -58,12 +58,20 @@ async fn crud() {
.unwrap();
let op = reader.next_ops().await.unwrap().unwrap();
assert_eq!(op.sequence_number, 42);
let op = reader.next_ops().await.unwrap().unwrap();
assert_eq!(op.sequence_number, 43);
// Can delete a segment
// wal_rotator.delete()
// Can delete a segment, leaving no closed segments again
wal_rotator
.delete(closed_segment_details.id())
.await
.unwrap();
let closed = wal_reader.closed_segments().await;
assert!(
closed.is_empty(),
"Expected empty closed segments; got {:?}",
closed
);
}
fn arbitrary_sequenced_wal_op(sequence_number: u64) -> SequencedWalOp {