refactor: improve directory names

pull/24376/head
Marco Neumann 2021-10-19 16:15:22 +02:00
parent 6ec0bd5bab
commit a5f15e6e76
1 changed files with 64 additions and 61 deletions

View File

@ -9,25 +9,25 @@
//! //!
//! ```text //! ```text
//! <root>/<db_name>/ //! <root>/<db_name>/
//! /cur | Location of current state //! /active | Location of current state
//! : | //! : |
//! : | //! : |
//! : | symlink //! : | symlink
//! : | //! : |
//! : +----+ //! : +--------+
//! : | //! : |
//! : | //! : |
//! : | //! : |
//! /pad/ V //! /version/ V
//! /<uuid>/ //! /<uuid>/
//! : /0/ //! : /0/
//! : : /cur/0 \ //! : : /committed/0 \
//! : : : /1 | Message files //! : : : /1 | Message files
//! : : : /2 | (finished) //! : : : /2 | (finished)
//! : : : ... / //! : : : ... /
//! : : : //! : : :
//! : : : //! : : :
//! : : /pad/<uuid> \ //! : : /temp/<uuid> \
//! : : /<uuid> | Message files //! : : /<uuid> | Message files
//! : : /<uuid> | (to be committed) //! : : /<uuid> | (to be committed)
//! : : ... / //! : : ... /
@ -224,13 +224,13 @@ impl WriteBufferWriting for FileBufferProducer {
message.extend(b"\n"); message.extend(b"\n");
message.extend(entry.data()); message.extend(entry.data());
// write data to scratchpad file // write data to scratchpad file in temp directory
let pad_file = sequencer_path.join("pad").join(Uuid::new_v4().to_string()); let temp_file = sequencer_path.join("temp").join(Uuid::new_v4().to_string());
tokio::fs::write(&pad_file, &message).await?; tokio::fs::write(&temp_file, &message).await?;
// scan existing files to figure out new sequence number // scan existing files to figure out new sequence number
let cur = sequencer_path.join("cur"); let committed = sequencer_path.join("committed");
let existing_files = scan_dir::<u64>(&cur, FileType::File).await?; let existing_files = scan_dir::<u64>(&committed, FileType::File).await?;
let mut sequence_number = if let Some(max) = existing_files.keys().max() { let mut sequence_number = if let Some(max) = existing_files.keys().max() {
max.checked_add(1).ok_or_else::<WriteBufferError, _>(|| { max.checked_add(1).ok_or_else::<WriteBufferError, _>(|| {
"Overflow during sequence number calculation" "Overflow during sequence number calculation"
@ -243,8 +243,11 @@ impl WriteBufferWriting for FileBufferProducer {
// try to link scratchpad file to "current" dir // try to link scratchpad file to "current" dir
loop { loop {
let cur_file = cur.join(sequence_number.to_string()); let committed_file = committed.join(sequence_number.to_string());
if tokio::fs::hard_link(&pad_file, &cur_file).await.is_ok() { if tokio::fs::hard_link(&temp_file, &committed_file)
.await
.is_ok()
{
break; break;
} }
sequence_number = sequence_number sequence_number = sequence_number
@ -257,7 +260,7 @@ impl WriteBufferWriting for FileBufferProducer {
} }
// unlink scratchpad file (and ignore error) // unlink scratchpad file (and ignore error)
tokio::fs::remove_file(&pad_file).await.ok(); tokio::fs::remove_file(&temp_file).await.ok();
Ok((Sequence::new(sequencer_id, sequence_number), now)) Ok((Sequence::new(sequencer_id, sequence_number), now))
} }
@ -302,21 +305,21 @@ impl WriteBufferReading for FileBufferConsumer {
let mut streams = BTreeMap::default(); let mut streams = BTreeMap::default();
for (sequencer_id, (sequencer_path, next_sequence_number)) in &self.dirs { for (sequencer_id, (sequencer_path, next_sequence_number)) in &self.dirs {
let cur = sequencer_path.join("cur"); let committed = sequencer_path.join("committed");
let stream = ConsumerStream::new( let stream = ConsumerStream::new(
*sequencer_id, *sequencer_id,
cur.clone(), committed.clone(),
Arc::clone(next_sequence_number), Arc::clone(next_sequence_number),
self.trace_collector.clone(), self.trace_collector.clone(),
) )
.boxed(); .boxed();
let fetch_high_watermark = move || { let fetch_high_watermark = move || {
let cur = cur.clone(); let committed = committed.clone();
let fut = async move { let fut = async move {
let files = scan_dir::<u64>(&cur, FileType::File).await?; let files = scan_dir::<u64>(&committed, FileType::File).await?;
let watermark = files.keys().max().map(|n| n + 1).unwrap_or(0); let watermark = files.keys().max().map(|n| n + 1).unwrap_or(0);
Ok(watermark) Ok(watermark)
@ -525,48 +528,48 @@ async fn maybe_auto_create_directories(
creation_config: Option<&WriteBufferCreationConfig>, creation_config: Option<&WriteBufferCreationConfig>,
) -> Result<BTreeMap<u32, PathBuf>, WriteBufferError> { ) -> Result<BTreeMap<u32, PathBuf>, WriteBufferError> {
loop { loop {
// figure out if a current version exists // figure out if a active version exists
let cur = root.join("cur"); let active = root.join("active");
if tokio::fs::metadata(&cur).await.is_ok() { if tokio::fs::metadata(&active).await.is_ok() {
// Scan for directories // Scan for directories
let directories = scan_dir(&cur, FileType::Dir).await?; let directories = scan_dir(&active, FileType::Dir).await?;
if directories.is_empty() { if directories.is_empty() {
return Err("Current configuration has zero sequencers." return Err("Active configuration has zero sequencers."
.to_string() .to_string()
.into()); .into());
} }
return Ok(directories); return Ok(directories);
} }
// no current config exists // no active config exists
if let Some(creation_config) = creation_config { if let Some(creation_config) = creation_config {
// create scratchpad directory // create version directory
let pad = root.join("pad").join(Uuid::new_v4().to_string()); let version = root.join("version").join(Uuid::new_v4().to_string());
tokio::fs::create_dir_all(&pad).await?; tokio::fs::create_dir_all(&version).await?;
let mut directories = BTreeMap::new(); let mut directories = BTreeMap::new();
for sequencer_id in 0..creation_config.n_sequencers.get() { for sequencer_id in 0..creation_config.n_sequencers.get() {
let sequencer_path_in_pad = pad.join(sequencer_id.to_string()); let sequencer_path_in_version = version.join(sequencer_id.to_string());
tokio::fs::create_dir(&sequencer_path_in_pad).await?; tokio::fs::create_dir(&sequencer_path_in_version).await?;
let sequencer_path_in_pad_cur = sequencer_path_in_pad.join("cur"); let committed = sequencer_path_in_version.join("committed");
tokio::fs::create_dir(&sequencer_path_in_pad_cur).await?; tokio::fs::create_dir(&committed).await?;
let sequencer_path_in_pad_pad = sequencer_path_in_pad.join("pad"); let temp = sequencer_path_in_version.join("temp");
tokio::fs::create_dir(&sequencer_path_in_pad_pad).await?; tokio::fs::create_dir(&temp).await?;
let sequencer_path_in_cur = cur.join(sequencer_id.to_string()); let sequencer_path_in_active = active.join(sequencer_id.to_string());
directories.insert(sequencer_id, sequencer_path_in_cur); directories.insert(sequencer_id, sequencer_path_in_active);
} }
// symlink cur->pad // symlink active->version
if tokio::fs::symlink(&pad, cur).await.is_ok() { if tokio::fs::symlink(&version, active).await.is_ok() {
// linking worked // linking worked
return Ok(directories); return Ok(directories);
} else { } else {
// linking did not work, assuming a concurrent initialization process. Remove scratchpad and and try again // linking did not work, assuming a concurrent initialization process. Remove version and and try again.
tokio::fs::remove_dir_all(&pad).await?; tokio::fs::remove_dir_all(&version).await?;
continue; continue;
} }
} else { } else {